Conversion to listless socket management.

This commit is contained in:
Brad Arant 2020-04-25 10:24:25 -07:00
parent 796ddd511f
commit cb7a556edf
5 changed files with 37 additions and 80 deletions

View File

@ -69,44 +69,17 @@ namespace core {
} }
bool EPoll::registerSocket(Socket *socket) { bool EPoll::registerSocket(Socket *socket) {
lock.lock();
std::map<int, Socket *>::iterator temp = sockets.find(socket->getDescriptor());
if(temp != sockets.end())
throw coreutils::Exception("Attempt to register socket that is already registered.");
coreutils::Log(coreutils::LOG_DEBUG_3) << "Registering socket " << socket->getDescriptor() << "."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Registering socket " << socket->getDescriptor() << ".";
sockets.insert(std::pair<int, Socket *>(socket->getDescriptor(), socket));
enableSocket(socket); enableSocket(socket);
lock.unlock();
return true; return true;
} }
bool EPoll::unregisterSocket(Socket *socket) { bool EPoll::unregisterSocket(Socket *socket) {
lock.lock();
disableSocket(socket);
coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << "."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << ".";
std::map<int, Socket *>::iterator temp = sockets.find(socket->getDescriptor()); disableSocket(socket);
if(temp == sockets.end())
throw coreutils::Exception("Attempt to unregister socket that is not registered.");
sockets.erase(temp);
lock.unlock();
return true; return true;
} }
void EPoll::eventReceived(struct epoll_event event, pid_t threadId) {
lock.lock();
std::map<int, Socket *>::iterator socket = sockets.find(event.data.fd);
lock.unlock();
Socket *socket1 = (Socket *)event.data.ptr;
coreutils::Log(coreutils::LOG_DEBUG_3) << "Sockets: " << socket->second << ":" << socket1 << ".";
if(socket != sockets.end()) {
socket->second->reset = true;
socket->second->eventReceived(event, threadId);
// resetSocket(socket->second);
}
else
throw coreutils::Exception("Reference to socket that has no object.");
}
int EPoll::getDescriptor() { int EPoll::getDescriptor() {
return epfd; return epfd;
} }
@ -122,40 +95,26 @@ namespace core {
} }
void EPoll::enableSocket(Socket *socket) { void EPoll::enableSocket(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << socket->getDescriptor() << " for events.";
struct epoll_event event;
event.data.ptr = socket;
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event);
}
void EPoll::disableSocket(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << socket->getDescriptor() << " from events.";
epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL);
}
void EPoll::resetSocket(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for read.";
struct epoll_event event; struct epoll_event event;
event.data.fd = socket->getDescriptor();
event.data.ptr = socket; event.data.ptr = socket;
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
if(socket->needsToWrite()) if(socket->needsToWrite())
event.events |= EPOLLWRNORM; event.events |= EPOLLWRNORM;
epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event); epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event);
socket->reset = false;
coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << socket->getDescriptor() << " for events.";
}
void EPoll::disableSocket(Socket *socket) {
epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL);
socket->reset = false;
coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << socket->getDescriptor() << " from events.";
}
void EPoll::resetRead(Socket *socket) {
struct epoll_event event;
event.data.fd = socket->getDescriptor();
event.data.ptr = socket;
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, event.data.fd, &event);
socket->reset = false;
coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for read.";
}
void EPoll::resetWrite(Socket *socket) {
struct epoll_event event;
event.data.fd = socket->getDescriptor();
event.data.ptr = socket;
event.events = EPOLLWRNORM | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event);
coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for write.";
} }
} }

View File

@ -101,7 +101,7 @@ namespace core {
/// Receive the epoll events and dispatch the event to the socket making the request. /// Receive the epoll events and dispatch the event to the socket making the request.
/// ///
void eventReceived(struct epoll_event event, pid_t threadId); ///< Dispatch event to appropriate socket. void eventReceived(struct epoll_event event); ///< Dispatch event to appropriate socket.
/// ///
/// The processCommand() method displays the thread array to the requesting console via the /// The processCommand() method displays the thread array to the requesting console via the
@ -112,17 +112,14 @@ namespace core {
int processCommand(std::string command, TCPSession *session, std::stringstream &data) override; ///<Output the threads array to the console. int processCommand(std::string command, TCPSession *session, std::stringstream &data) override; ///<Output the threads array to the console.
void resetRead(Socket *socket); void resetSocket(Socket *socket);
void resetWrite(Socket *socket);
private: private:
int epfd; int epfd;
int numberOfThreads; int numberOfThreads;
std::map<int, Socket *> sockets;
std::vector<Thread> threads; std::vector<Thread> threads;
volatile bool terminateThreads; volatile bool terminateThreads;
std::mutex lock;
void enableSocket(Socket *socket); void enableSocket(Socket *socket);
void disableSocket(Socket *socket); void disableSocket(Socket *socket);

View File

@ -58,26 +58,25 @@ namespace core {
void Socket::onUnregistered() {} void Socket::onUnregistered() {}
bool Socket::eventReceived(struct epoll_event event, int threadId) { bool Socket::eventReceived(struct epoll_event event) {
// coreutils::Log(coreutils::LOG_DEBUG_2) << "waiting for lock on socket " << descriptor << "."; coreutils::Log(coreutils::LOG_DEBUG_2) << "waiting for lock on socket " << descriptor << ".";
// lock.lock(); lock.lock();
// coreutils::Log(coreutils::LOG_DEBUG_2) << "Obtained lock on socket " << descriptor << "."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Obtained lock on socket " << descriptor << ".";
if(event.events & EPOLLRDHUP) { if(event.events & EPOLLRDHUP) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLRDHUP " << descriptor; coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLRDHUP " << descriptor;
readHangup = true; readHangup = true;
shutdown("hangup received"); shutdown("hangup received");
// lock.unlock(); lock.unlock();
// coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". RDHUP"; coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". RDHUP";
return false; return false;
} }
if(event.events & EPOLLIN) { if(event.events & EPOLLIN) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLIN " << descriptor; coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLIN " << descriptor;
receiveData(buffer, length); receiveData(buffer, length);
ePoll.resetRead(this);
} }
if(event.events & EPOLLWRNORM) { if(event.events & EPOLLWRNORM) {
@ -88,14 +87,15 @@ namespace core {
if(event.events & EPOLLHUP) { if(event.events & EPOLLHUP) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLHUP " << descriptor; coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLHUP " << descriptor;
shutdown(); shutdown();
// lock.unlock(); lock.unlock();
// coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". HUP"; coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". HUP";
return false; return false;
} }
// lock.unlock(); lock.unlock();
// coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << " with reset " << reset << "."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << " with reset " << reset << ".";
reset = true;
return reset; return reset;
} }
@ -146,8 +146,8 @@ namespace core {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Writing data to socket " << getDescriptor() << " [" << fifo.front() << "]."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Writing data to socket " << getDescriptor() << " [" << fifo.front() << "].";
::write(descriptor, fifo.front().c_str(), fifo.front().length()); ::write(descriptor, fifo.front().c_str(), fifo.front().length());
fifo.pop(); fifo.pop();
// if(shutDown && !needsToWrite()) if(shutDown && !needsToWrite())
// delete this; delete this;
outlock.unlock(); outlock.unlock();
} }
} }
@ -158,7 +158,7 @@ namespace core {
fifo.emplace(data); fifo.emplace(data);
coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling write on socket " << getDescriptor() << " with " << fifo.size() << " entries to write."; coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling write on socket " << getDescriptor() << " with " << fifo.size() << " entries to write.";
outlock.unlock(); outlock.unlock();
ePoll.resetWrite(this); ePoll.resetSocket(this);
return 1; return 1;
} }

View File

@ -76,7 +76,7 @@ namespace core {
/// simulated. /// simulated.
/// ///
bool eventReceived(struct epoll_event event, pid_t threadId); ///< Parse epoll event and call specified callbacks. bool eventReceived(struct epoll_event event); ///< Parse epoll event and call specified callbacks.
/// ///
/// Write data to the socket. /// Write data to the socket.

View File

@ -60,7 +60,8 @@ namespace core {
} else if(rc > 0) { } else if(rc > 0) {
for(int ix = 0; ix < rc; ++ix) { for(int ix = 0; ix < rc; ++ix) {
++count; ++count;
ePoll.eventReceived(events[ix], threadId); if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix]))
ePoll.resetSocket((Socket *)events[ix].data.ptr);
} }
} }
} }