ServerCore/EPoll.cpp
2021-08-05 13:07:53 -07:00

121 lines
3.7 KiB
C++

#include "Thread.h"
#include "EPoll.h"
#include "Command.h"
#include "Exception.h"
namespace core {
EPoll::EPoll() : Command() {
coreutils::Log(coreutils::LOG_DEBUG_2) << "EPoll object being constructed.";
maxSockets = 1000;
epfd = epoll_create1(0);
terminateThreads = false;
}
EPoll::~EPoll() {
coreutils::Log(coreutils::LOG_DEBUG_2) << "BMAEPoll destructed.";
}
bool EPoll::start(int numberOfThreads, int maxSockets) {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Starting epoll event processing.";
this->numberOfThreads = numberOfThreads;
coreutils::Log(coreutils::LOG_DEBUG_3) << "Number of threads starting is " << numberOfThreads << ".";
coreutils::Log(coreutils::LOG_DEBUG_3) << "Maximum connections is " << maxSockets << ".";
// TODO: Set the number of maximum open files to the maxSockets value.
//
//----------------------------------------------------------------------
// Create thread objects into vector for number of threads requested.
// Hand all the threads a pointer to the EPoll object so they can run
// the socket handlers.
//----------------------------------------------------------------------
for(int ix = 0; ix < numberOfThreads; ++ix)
threads.emplace_back(*this);
for(int ix = 0; ix < numberOfThreads; ++ix)
threads[ix].start();
return true;
}
bool EPoll::stop() {
terminateThreads = true;
//--------------------------------------------------------
// Kill and join all the threads that have been started.
//--------------------------------------------------------
for(int ix = 0; ix < numberOfThreads; ++ix)
threads[ix].join();
//--------------------------
// Close the epoll socket.
//--------------------------
close(epfd);
return true;
}
bool EPoll::isStopping() {
return terminateThreads;
}
bool EPoll::registerSocket(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Registering socket " << socket->getDescriptor() << ".";
enableSocket(socket);
return true;
}
bool EPoll::unregisterSocket(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << ".";
disableSocket(socket);
return true;
}
int EPoll::getDescriptor() {
return epfd;
}
int EPoll::processCommand(coreutils::ZString &request, TCPSession &session) {
int sequence = 0;
for(auto threadx : threads) {
session.out << "|" << ++sequence;
threadx.output(session.out);
session.out << "|" << std::endl;
}
return 1;
}
void EPoll::enableSocket(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << socket->getDescriptor() << " for events.";
struct epoll_event event;
event.data.ptr = socket;
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event);
}
void EPoll::disableSocket(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << socket->getDescriptor() << " from events.";
epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL);
}
void EPoll::resetSocket(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for read.";
struct epoll_event event;
event.data.ptr = socket;
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
if(socket->needsToWrite())
event.events |= EPOLLWRNORM;
epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event);
}
}