From cb7a556edf2d99bd699546e4815d122f6d3d3f42 Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Sat, 25 Apr 2020 10:24:25 -0700 Subject: [PATCH] Conversion to listless socket management. --- EPoll.cpp | 75 +++++++++++++----------------------------------------- EPoll.h | 7 ++--- Socket.cpp | 28 ++++++++++---------- Socket.h | 2 +- Thread.cpp | 5 ++-- 5 files changed, 37 insertions(+), 80 deletions(-) diff --git a/EPoll.cpp b/EPoll.cpp index 31bf186..9b4f0c9 100644 --- a/EPoll.cpp +++ b/EPoll.cpp @@ -69,44 +69,17 @@ namespace core { } bool EPoll::registerSocket(Socket *socket) { - lock.lock(); - std::map::iterator temp = sockets.find(socket->getDescriptor()); - if(temp != sockets.end()) - throw coreutils::Exception("Attempt to register socket that is already registered."); coreutils::Log(coreutils::LOG_DEBUG_3) << "Registering socket " << socket->getDescriptor() << "."; - sockets.insert(std::pair(socket->getDescriptor(), socket)); enableSocket(socket); - lock.unlock(); return true; } bool EPoll::unregisterSocket(Socket *socket) { - lock.lock(); - disableSocket(socket); coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << "."; - std::map::iterator temp = sockets.find(socket->getDescriptor()); - if(temp == sockets.end()) - throw coreutils::Exception("Attempt to unregister socket that is not registered."); - sockets.erase(temp); - lock.unlock(); + disableSocket(socket); return true; } - - void EPoll::eventReceived(struct epoll_event event, pid_t threadId) { - lock.lock(); - std::map::iterator socket = sockets.find(event.data.fd); - lock.unlock(); - Socket *socket1 = (Socket *)event.data.ptr; - coreutils::Log(coreutils::LOG_DEBUG_3) << "Sockets: " << socket->second << ":" << socket1 << "."; - if(socket != sockets.end()) { - socket->second->reset = true; - socket->second->eventReceived(event, threadId); -// resetSocket(socket->second); - } - else - throw coreutils::Exception("Reference to socket that has no object."); - } - + int EPoll::getDescriptor() { return epfd; } @@ -122,40 +95,26 @@ namespace core { } void EPoll::enableSocket(Socket *socket) { + coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << socket->getDescriptor() << " for events."; + struct epoll_event event; + event.data.ptr = socket; + event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; + epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event); + } + + void EPoll::disableSocket(Socket *socket) { + coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << socket->getDescriptor() << " from events."; + epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); + } + + void EPoll::resetSocket(Socket *socket) { + coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for read."; struct epoll_event event; - event.data.fd = socket->getDescriptor(); event.data.ptr = socket; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; if(socket->needsToWrite()) event.events |= EPOLLWRNORM; - epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event); - socket->reset = false; - coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << socket->getDescriptor() << " for events."; - } - - void EPoll::disableSocket(Socket *socket) { - epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); - socket->reset = false; - coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << socket->getDescriptor() << " from events."; - } - - void EPoll::resetRead(Socket *socket) { - struct epoll_event event; - event.data.fd = socket->getDescriptor(); - event.data.ptr = socket; - event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; - epoll_ctl(epfd, EPOLL_CTL_MOD, event.data.fd, &event); - socket->reset = false; - coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for read."; - } - - void EPoll::resetWrite(Socket *socket) { - struct epoll_event event; - event.data.fd = socket->getDescriptor(); - event.data.ptr = socket; - event.events = EPOLLWRNORM | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; - epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event); - coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for write."; + epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event); } } diff --git a/EPoll.h b/EPoll.h index d891cd1..ad4d7d7 100644 --- a/EPoll.h +++ b/EPoll.h @@ -101,7 +101,7 @@ namespace core { /// Receive the epoll events and dispatch the event to the socket making the request. /// - void eventReceived(struct epoll_event event, pid_t threadId); ///< Dispatch event to appropriate socket. + void eventReceived(struct epoll_event event); ///< Dispatch event to appropriate socket. /// /// The processCommand() method displays the thread array to the requesting console via the @@ -112,17 +112,14 @@ namespace core { int processCommand(std::string command, TCPSession *session, std::stringstream &data) override; /// sockets; std::vector threads; volatile bool terminateThreads; - std::mutex lock; void enableSocket(Socket *socket); void disableSocket(Socket *socket); diff --git a/Socket.cpp b/Socket.cpp index a864965..adcbe83 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -58,26 +58,25 @@ namespace core { void Socket::onUnregistered() {} - bool Socket::eventReceived(struct epoll_event event, int threadId) { + bool Socket::eventReceived(struct epoll_event event) { -// coreutils::Log(coreutils::LOG_DEBUG_2) << "waiting for lock on socket " << descriptor << "."; + coreutils::Log(coreutils::LOG_DEBUG_2) << "waiting for lock on socket " << descriptor << "."; -// lock.lock(); -// coreutils::Log(coreutils::LOG_DEBUG_2) << "Obtained lock on socket " << descriptor << "."; + lock.lock(); + coreutils::Log(coreutils::LOG_DEBUG_2) << "Obtained lock on socket " << descriptor << "."; if(event.events & EPOLLRDHUP) { coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLRDHUP " << descriptor; readHangup = true; shutdown("hangup received"); -// lock.unlock(); -// coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". RDHUP"; + lock.unlock(); + coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". RDHUP"; return false; } if(event.events & EPOLLIN) { coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLIN " << descriptor; receiveData(buffer, length); - ePoll.resetRead(this); } if(event.events & EPOLLWRNORM) { @@ -88,14 +87,15 @@ namespace core { if(event.events & EPOLLHUP) { coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLHUP " << descriptor; shutdown(); -// lock.unlock(); -// coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". HUP"; + lock.unlock(); + coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". HUP"; return false; } -// lock.unlock(); -// coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << " with reset " << reset << "."; + lock.unlock(); + coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << " with reset " << reset << "."; + reset = true; return reset; } @@ -146,8 +146,8 @@ namespace core { coreutils::Log(coreutils::LOG_DEBUG_3) << "Writing data to socket " << getDescriptor() << " [" << fifo.front() << "]."; ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); -// if(shutDown && !needsToWrite()) -// delete this; + if(shutDown && !needsToWrite()) + delete this; outlock.unlock(); } } @@ -158,7 +158,7 @@ namespace core { fifo.emplace(data); coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling write on socket " << getDescriptor() << " with " << fifo.size() << " entries to write."; outlock.unlock(); - ePoll.resetWrite(this); + ePoll.resetSocket(this); return 1; } diff --git a/Socket.h b/Socket.h index 4ff90fc..920d900 100644 --- a/Socket.h +++ b/Socket.h @@ -76,7 +76,7 @@ namespace core { /// simulated. /// - bool eventReceived(struct epoll_event event, pid_t threadId); ///< Parse epoll event and call specified callbacks. + bool eventReceived(struct epoll_event event); ///< Parse epoll event and call specified callbacks. /// /// Write data to the socket. diff --git a/Thread.cpp b/Thread.cpp index 4b44c93..6bd45de 100644 --- a/Thread.cpp +++ b/Thread.cpp @@ -59,8 +59,9 @@ namespace core { break; } else if(rc > 0) { for(int ix = 0; ix < rc; ++ix) { - ++count; - ePoll.eventReceived(events[ix], threadId); + ++count; + if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix])) + ePoll.resetSocket((Socket *)events[ix].data.ptr); } } }