122 lines
4.0 KiB
C++
122 lines
4.0 KiB
C++
#include "Thread.h"
|
|
#include "EPoll.h"
|
|
#include "Command.h"
|
|
#include "Exception.h"
|
|
|
|
namespace core {
|
|
|
|
EPoll::EPoll() : Command() {
|
|
|
|
coreutils::Log(coreutils::LOG_DEBUG_2) << "EPoll object being constructed.";
|
|
|
|
maxSockets = 1000;
|
|
epfd = epoll_create1(0);
|
|
terminateThreads = false;
|
|
}
|
|
|
|
EPoll::~EPoll() {
|
|
coreutils::Log(coreutils::LOG_DEBUG_2) << "BMAEPoll destructed.";
|
|
}
|
|
|
|
bool EPoll::start(int numberOfThreads, int maxSockets) {
|
|
|
|
coreutils::Log(coreutils::LOG_DEBUG_2) << "Starting epoll event processing.";
|
|
|
|
this->numberOfThreads = numberOfThreads;
|
|
|
|
coreutils::Log(coreutils::LOG_DEBUG_3) << "Number of threads starting is " << numberOfThreads << ".";
|
|
coreutils::Log(coreutils::LOG_DEBUG_3) << "Maximum connections is " << maxSockets << ".";
|
|
|
|
// TODO: Set the number of maximum open files to the maxSockets value.
|
|
//
|
|
//----------------------------------------------------------------------
|
|
// Create thread objects into vector for number of threads requested.
|
|
// Hand all the threads a pointer to the EPoll object so they can run
|
|
// the socket handlers.
|
|
//----------------------------------------------------------------------
|
|
|
|
for(int ix = 0; ix < numberOfThreads; ++ix)
|
|
threads.emplace_back(*this);
|
|
|
|
for(int ix = 0; ix < numberOfThreads; ++ix)
|
|
threads[ix].start();
|
|
|
|
return true;
|
|
}
|
|
|
|
bool EPoll::stop() {
|
|
|
|
terminateThreads = true;
|
|
|
|
//--------------------------------------------------------
|
|
// Kill and join all the threads that have been started.
|
|
//--------------------------------------------------------
|
|
|
|
for(int ix = 0; ix < numberOfThreads; ++ix)
|
|
threads[ix].join();
|
|
|
|
//--------------------------
|
|
// Close the epoll socket.
|
|
//--------------------------
|
|
|
|
close(epfd);
|
|
|
|
return true;
|
|
}
|
|
|
|
bool EPoll::isStopping() {
|
|
return terminateThreads;
|
|
}
|
|
|
|
bool EPoll::registerSocket(Socket *socket) {
|
|
lock.lock();
|
|
std::map<int, Socket *>::iterator temp = sockets.find(socket->getDescriptor());
|
|
if(temp != sockets.end())
|
|
throw coreutils::Exception("Attempt to register socket that is already registered.");
|
|
coreutils::Log(coreutils::LOG_DEBUG_3) << "Registering socket " << socket->getDescriptor() << ".";
|
|
sockets.insert(std::pair<int, Socket *>(socket->getDescriptor(), socket));
|
|
lock.unlock();
|
|
socket->enable(true);
|
|
return true;
|
|
}
|
|
|
|
bool EPoll::unregisterSocket(Socket *socket /**< The Socket to unregister. */) {
|
|
lock.lock();
|
|
socket->enable(false);
|
|
coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << ".";
|
|
std::map<int, Socket *>::iterator temp = sockets.find(socket->getDescriptor());
|
|
if(temp == sockets.end())
|
|
throw coreutils::Exception("Attempt to unregister socket that is not registered.");
|
|
sockets.erase(temp);
|
|
lock.unlock();
|
|
return true;
|
|
}
|
|
|
|
void EPoll::eventReceived(struct epoll_event event) {
|
|
lock.lock();
|
|
std::map<int, Socket *>::iterator socket = sockets.find(event.data.fd);
|
|
lock.unlock();
|
|
if(socket != sockets.end()) {
|
|
(socket->second)->eventReceived(event);
|
|
} else {
|
|
coreutils::Log(coreutils::LOG_WARN) << "System problem. Reference to socket " << event.data.fd << " that has no object.";
|
|
throw coreutils::Exception("System problem occurred.");
|
|
}
|
|
}
|
|
|
|
int EPoll::getDescriptor() {
|
|
return epfd;
|
|
}
|
|
|
|
int EPoll::processCommand(std::string command, TCPSession *session, std::stringstream &data) {
|
|
int sequence = 0;
|
|
for(auto threadx : threads) {
|
|
data << "|" << ++sequence;
|
|
threadx.output(data);
|
|
data << "|" << std::endl;
|
|
}
|
|
|
|
}
|
|
|
|
}
|