#include "Thread.h"
#include "EPoll.h"
#include "Command.h"
#include "Exception.h"

namespace core {

   /// Creates the epoll instance and sets the default limits.
   ///
   /// NOTE(review): the result of epoll_create1() is not checked here; a
   /// negative epfd would only surface later when the descriptor is used.
   EPoll::EPoll() : Command() {
      coreutils::Log(coreutils::LOG_DEBUG_2) << "EPoll object being constructed.";
      maxSockets = 1000;
      epfd = epoll_create1(0);
      terminateThreads = false;
   }

   /// Logs destruction. The epoll descriptor is closed by stop(), not here.
   EPoll::~EPoll() {
      coreutils::Log(coreutils::LOG_DEBUG_2) << "BMAEPoll destructed.";
   }

   /// Creates and starts the requested number of worker threads.
   ///
   /// @param numberOfThreads  number of Thread objects to create and start.
   /// @param maxSockets       only logged here; it shadows the member of the
   ///                         same name and is not stored.
   ///                         NOTE(review): confirm whether the member should
   ///                         be updated from this value.
   /// @return always true.
   bool EPoll::start(int numberOfThreads, int maxSockets) {

      coreutils::Log(coreutils::LOG_DEBUG_2) << "Starting epoll event processing.";

      this->numberOfThreads = numberOfThreads;

      coreutils::Log(coreutils::LOG_DEBUG_3) << "Number of threads starting is " << numberOfThreads << ".";
      coreutils::Log(coreutils::LOG_DEBUG_3) << "Maximum connections is " << maxSockets << ".";

      // TODO: Set the number of maximum open files to the maxSockets value.

      //----------------------------------------------------------------------
      // Create thread objects into vector for number of threads requested.
      // Hand all the threads a reference to the EPoll object so they can run
      // the socket handlers.
      //
      // All objects are created before any is started: growing the container
      // may relocate its elements, which must not happen to a Thread that is
      // already running.
      //----------------------------------------------------------------------

      for(int ix = 0; ix < numberOfThreads; ++ix)
         threads.emplace_back(*this);

      for(int ix = 0; ix < numberOfThreads; ++ix)
         threads[ix].start();

      return true;
   }

   /// Signals the worker threads to terminate, joins them and closes the
   /// epoll descriptor.
   ///
   /// @return always true.
   bool EPoll::stop() {

      terminateThreads = true;

      //--------------------------------------------------------
      // Join all the threads that have been created. Iterating
      // the container (rather than counting to numberOfThreads)
      // keeps this safe when start() was never called.
      //--------------------------------------------------------

      for(auto &thread : threads)
         thread.join();

      //--------------------------
      // Close the epoll socket.
      //--------------------------

      close(epfd);

      return true;
   }

   /// @return true once stop() has requested thread termination.
   bool EPoll::isStopping() {
      return terminateThreads;
   }

   /// Adds a socket to the descriptor map, then enables it and notifies it.
   ///
   /// @param socket  the Socket to register.
   /// @return always true.
   /// @throws coreutils::Exception if the descriptor is already registered.
   bool EPoll::registerSocket(Socket *socket /**< The Socket to register.*/) {
      lock.lock();
      auto existing = sockets.find(socket->getDescriptor());
      if(existing != sockets.end()) {
         // Release the lock before throwing; otherwise it stays held forever
         // and every later register/unregister/event call blocks.
         lock.unlock();
         throw coreutils::Exception("Attempt to register socket that is already registered.");
      }
      coreutils::Log(coreutils::LOG_DEBUG_3) << "Registering socket " << socket->getDescriptor() << ".";
      sockets.emplace(socket->getDescriptor(), socket);
      lock.unlock();

      // The socket callbacks run outside the lock, as before.
      socket->enable(true);
      socket->onRegistered();
      return true;
   }

   /// Disables a socket, removes it from the descriptor map and notifies it.
   ///
   /// @param socket  the Socket to unregister.
   /// @return always true.
   /// @throws coreutils::Exception if the descriptor is not registered.
   bool EPoll::unregisterSocket(Socket *socket /**< The Socket to unregister. */) {
      lock.lock();
      socket->enable(false);
      coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << ".";
      auto existing = sockets.find(socket->getDescriptor());
      if(existing == sockets.end()) {
         // Release the lock before throwing so the failure does not leave
         // the map permanently locked.
         lock.unlock();
         throw coreutils::Exception("Attempt to unregister socket that is not registered.");
      }
      sockets.erase(existing);
      lock.unlock();
      socket->onUnregistered();
      return true;
   }

   /// Dispatches an epoll event to the Socket registered for its descriptor.
   ///
   /// @param event  the event as delivered by epoll; event.data.fd is the key.
   /// @throws coreutils::Exception if no socket is registered for the descriptor.
   void EPoll::eventReceived(struct epoll_event event) {
      // Resolve the target while holding the lock. A map iterator must not be
      // compared or dereferenced after the lock is released, because another
      // thread may register or unregister sockets in between.
      Socket *target = nullptr;
      lock.lock();
      auto entry = sockets.find(event.data.fd);
      if(entry != sockets.end())
         target = entry->second;
      lock.unlock();

      if(target != nullptr) {
         target->eventReceived(event);
      } else {
         coreutils::Log(coreutils::LOG_WARN) << "System problem. Reference to socket " << event.data.fd << " that has no object.";
         throw coreutils::Exception("System problem occurred.");
      }
   }

   /// @return the epoll file descriptor created in the constructor.
   int EPoll::getDescriptor() {
      return epfd;
   }

   /// Writes one "|<sequence><thread output>|" line per worker thread to data.
   ///
   /// @param command  unused here.
   /// @param session  unused here.
   /// @param data     stream that receives the per-thread report.
   /// @return 1 after the report has been written.
   ///         NOTE(review): the original fell off the end of a non-void
   ///         function (undefined behaviour); 1 is assumed to be the
   ///         "handled" convention -- confirm against Command::processCommand.
   int EPoll::processCommand(std::string command, TCPSession *session, std::stringstream &data) {

      int sequence = 0;

      // Iterate by reference: copying a Thread per iteration is unnecessary.
      for(auto &threadx : threads) {
         data << "|" << ++sequence;
         threadx.output(data);
         data << "|" << std::endl;
      }

      return 1;
   }

}