// ServerCore/EPoll.cpp
// (File-listing header captured with the source: 155 lines, 5.5 KiB, C++.)

#include "Thread.h"
#include "EPoll.h"
#include "Command.h"
#include "Exception.h"
namespace core {
/// Constructs the EPoll manager and creates the kernel epoll instance.
///
/// The descriptor returned by epoll_create1(0) is stored in epfd (see
/// getDescriptor()). No worker threads are created here; start() does that.
/// maxSockets defaults to 1000.
///
/// @throws coreutils::Exception if the epoll instance cannot be created.
EPoll::EPoll() : Command() {
   coreutils::Log(coreutils::LOG_DEBUG_2) << "EPoll object being constructed.";
   maxSockets = 1000;
   // Give the thread count a defined value before start() assigns the real one.
   numberOfThreads = 0;
   terminateThreads = false;
   epfd = epoll_create1(0);
   // epoll_create1 returns -1 on failure. Every later epoll_ctl call would
   // silently fail against that descriptor, so report the problem here.
   if(epfd < 0)
      throw coreutils::Exception("Unable to create the epoll descriptor.");
}
// Destructor. It only writes a debug log entry; it neither joins the worker
// threads nor closes epfd -- both of those are done by stop().
// NOTE(review): the log text says "BMAEPoll", which looks like an earlier name
// for this class -- confirm before changing the message.
EPoll::~EPoll() {
coreutils::Log(coreutils::LOG_DEBUG_2) << "BMAEPoll destructed.";
}
/// Starts epoll event processing by creating and launching the worker threads.
///
/// @param numberOfThreads Number of Thread objects to create and start.
/// @param maxSockets      Maximum number of connections; stored on the object
///                        and written to the debug log.
/// @return Always true.
bool EPoll::start(int numberOfThreads, int maxSockets) {
   coreutils::Log(coreutils::LOG_DEBUG_2) << "Starting epoll event processing.";
   this->numberOfThreads = numberOfThreads;
   // The parameter shadows the member of the same name. It used to be logged
   // only, which left the member at the constructor default no matter what
   // the caller asked for.
   this->maxSockets = maxSockets;
   coreutils::Log(coreutils::LOG_DEBUG_3) << "Number of threads starting is " << numberOfThreads << ".";
   coreutils::Log(coreutils::LOG_DEBUG_3) << "Maximum connections is " << maxSockets << ".";

   // TODO: Set the number of maximum open files to the maxSockets value.

   //----------------------------------------------------------------------
   // Create thread objects into vector for number of threads requested.
   // Hand all the threads a pointer to the EPoll object so they can run
   // the socket handlers.
   //
   // Creation and start-up are separate passes; presumably this is so that
   // growing the container cannot relocate a Thread that is already running.
   //----------------------------------------------------------------------
   for(int ix = 0; ix < numberOfThreads; ++ix)
      threads.emplace_back(*this);

   for(int ix = 0; ix < numberOfThreads; ++ix)
      threads[ix].start();

   return true;
}
bool EPoll::stop() {
terminateThreads = true;
//--------------------------------------------------------
// Kill and join all the threads that have been started.
//--------------------------------------------------------
for(int ix = 0; ix < numberOfThreads; ++ix)
threads[ix].join();
//--------------------------
// Close the epoll socket.
//--------------------------
close(epfd);
return true;
}
// Reports whether shutdown has been requested. Returns terminateThreads, which
// the constructor clears and stop() sets.
bool EPoll::isStopping() {
return terminateThreads;
}
/// Adds a socket to the descriptor-to-Socket map and enables it for epoll
/// events.
///
/// @param socket The Socket to register; it is keyed by its descriptor.
/// @return Always true.
/// @throws coreutils::Exception if a socket with the same descriptor is
///         already registered.
bool EPoll::registerSocket(Socket *socket) {
   const int descriptor = socket->getDescriptor();

   // unregisterSocket() modifies this map while holding `lock`, so the
   // insertion takes the same lock. insert() leaves the map untouched and
   // reports false when the key is already present, which replaces the
   // separate find-then-insert.
   lock.lock();
   const bool inserted = sockets.insert(std::pair<int, Socket *>(descriptor, socket)).second;
   lock.unlock();

   // The lock is released before throwing so a failed registration cannot
   // leave it held.
   if(!inserted)
      throw coreutils::Exception("Attempt to register socket that is already registered.");

   coreutils::Log(coreutils::LOG_DEBUG_3) << "Registering socket " << descriptor << ".";
   enableSocket(socket);
   return true;
}
/// Removes a socket from epoll event delivery and from the descriptor-to-Socket
/// map.
///
/// The socket is disabled first, even when it later turns out not to be
/// registered.
///
/// @return Always true.
/// @throws coreutils::Exception if the socket's descriptor is not in the map.
bool EPoll::unregisterSocket(Socket *socket /**< The Socket to unregister. */) {
   lock.lock();
   disableSocket(socket);
   coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << ".";
   // erase-by-key returns the number of entries removed (0 or 1 for a map).
   const bool erased = sockets.erase(socket->getDescriptor()) != 0;
   lock.unlock();

   // Throw only after the lock has been released. Throwing while it was held
   // left it locked forever and blocked every later unregisterSocket() call.
   if(!erased)
      throw coreutils::Exception("Attempt to unregister socket that is not registered.");
   return true;
}
void EPoll::eventReceived(struct epoll_event event) {
std::map<int, Socket *>::iterator socket = sockets.find(event.data.fd);
if(socket != sockets.end()) {
if(socket->second->eventReceived(event)) {
coreutils::Log(coreutils::LOG_DEBUG_4) << "resetSocket from eventReceived.";
resetSocket(socket->second);
}
}
else
throw coreutils::Exception("Reference to socket that has no object.");
}
// Returns epfd, the descriptor obtained from epoll_create1() in the
// constructor. stop() closes this descriptor.
int EPoll::getDescriptor() {
return epfd;
}
int EPoll::processCommand(std::string command, TCPSession *session, std::stringstream &data) {
int sequence = 0;
for(auto threadx : threads) {
data << "|" << ++sequence;
threadx.output(data);
data << "|" << std::endl;
}
}
/// Adds the socket's descriptor to the epoll interest list and marks the
/// socket active.
///
/// The socket is registered for EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET,
/// plus EPOLLWRNORM when socket->needsToWrite() is true.
///
/// If epoll_ctl() fails the failure is logged and socket->active is left
/// unchanged, so the socket is not marked active when it was not added.
///
/// @param socket The Socket to enable.
void EPoll::enableSocket(Socket *socket) {
   const int descriptor = socket->getDescriptor();

   // Value-initialise so the bytes of the data union beyond `fd` are defined
   // when the struct is handed to the kernel.
   struct epoll_event event = {};
   event.data.fd = descriptor;
   event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
   if(socket->needsToWrite())
      event.events |= EPOLLWRNORM;

   // The result used to be ignored, which marked the socket active even when
   // it had not been added and would therefore never receive events.
   if(epoll_ctl(epfd, EPOLL_CTL_ADD, descriptor, &event) != 0) {
      coreutils::Log(coreutils::LOG_DEBUG_2) << "Unable to enable socket " << descriptor << " for events.";
      return;
   }

   socket->active = true;
   coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << descriptor << " for events.";
}
/// Removes the socket's descriptor from the epoll interest list and clears the
/// socket's active flag. The result of epoll_ctl() is not examined.
///
/// @param socket The Socket to disable.
void EPoll::disableSocket(Socket *socket) {
   const int descriptor = socket->getDescriptor();
   // No event structure is passed for EPOLL_CTL_DEL.
   epoll_ctl(epfd, EPOLL_CTL_DEL, descriptor, nullptr);
   socket->active = false;
   coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << descriptor << " from events.";
}
/// Re-arms an active socket with EPOLL_CTL_MOD, using the same event mask that
/// enableSocket() installs. The mask contains EPOLLONESHOT.
///
/// Does nothing when socket->active is false. The result of epoll_ctl() is not
/// examined.
///
/// @param socket The Socket to re-arm.
void EPoll::resetSocket(Socket *socket) {
   if(socket->active) {
      coreutils::Log(coreutils::LOG_DEBUG_4) << "ResetSocket " << socket;

      const int descriptor = socket->getDescriptor();
      struct epoll_event rearm;
      rearm.data.fd = descriptor;
      rearm.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
      // Ask for writability notification only when there is pending output.
      const bool wantsWrite = socket->needsToWrite();
      if(wantsWrite) {
         rearm.events |= EPOLLWRNORM;
      }

      epoll_ctl(epfd, EPOLL_CTL_MOD, descriptor, &rearm);
      coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << descriptor << " for events.";
   }
}
}