From 796ddd511f9abe0b14ddd7ded48b60dd8e9e6297 Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Sat, 25 Apr 2020 08:19:26 -0700 Subject: [PATCH] A commit before I change everything on the the way events are handled. --- EPoll.cpp | 34 +++++++++++++++++++++++----------- EPoll.h | 3 ++- Socket.cpp | 31 +++++++++++++++---------------- Socket.h | 4 ++-- TCPSession.cpp | 13 ++++++++++--- TCPSession.h | 11 +++++++++++ Thread.cpp | 10 ++-------- 7 files changed, 65 insertions(+), 41 deletions(-) diff --git a/EPoll.cpp b/EPoll.cpp index d778e3b..31bf186 100644 --- a/EPoll.cpp +++ b/EPoll.cpp @@ -80,7 +80,7 @@ namespace core { return true; } - bool EPoll::unregisterSocket(Socket *socket /**< The Socket to unregister. */) { + bool EPoll::unregisterSocket(Socket *socket) { lock.lock(); disableSocket(socket); coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << "."; @@ -96,9 +96,12 @@ namespace core { lock.lock(); std::map::iterator socket = sockets.find(event.data.fd); lock.unlock(); + Socket *socket1 = (Socket *)event.data.ptr; + coreutils::Log(coreutils::LOG_DEBUG_3) << "Sockets: " << socket->second << ":" << socket1 << "."; if(socket != sockets.end()) { - if(socket->second->eventReceived(event, threadId)) - resetSocket(socket->second); + socket->second->reset = true; + socket->second->eventReceived(event, threadId); +// resetSocket(socket->second); } else throw coreutils::Exception("Reference to socket that has no object."); @@ -121,29 +124,38 @@ namespace core { void EPoll::enableSocket(Socket *socket) { struct epoll_event event; event.data.fd = socket->getDescriptor(); + event.data.ptr = socket; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; if(socket->needsToWrite()) event.events |= EPOLLWRNORM; epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event); - socket->active = true; + socket->reset = false; coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << socket->getDescriptor() << " for events."; } void EPoll::disableSocket(Socket *socket) { epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); - socket->active = false; + socket->reset = false; coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << socket->getDescriptor() << " from events."; } - void EPoll::resetSocket(Socket *socket) { - coreutils::Log(coreutils::LOG_DEBUG_4) << "ResetSocket " << socket; + void EPoll::resetRead(Socket *socket) { struct epoll_event event; event.data.fd = socket->getDescriptor(); + event.data.ptr = socket; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; - if(socket->needsToWrite()) - event.events |= EPOLLWRNORM; epoll_ctl(epfd, EPOLL_CTL_MOD, event.data.fd, &event); - coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for events."; + socket->reset = false; + coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for read."; } - + + void EPoll::resetWrite(Socket *socket) { + struct epoll_event event; + event.data.fd = socket->getDescriptor(); + event.data.ptr = socket; + event.events = EPOLLWRNORM | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; + epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event); + coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for write."; + } + } diff --git a/EPoll.h b/EPoll.h index 3bbfbad..d891cd1 100644 --- a/EPoll.h +++ b/EPoll.h @@ -112,7 +112,8 @@ namespace core { int processCommand(std::string command, TCPSession *session, std::stringstream &data) override; ///= 0) { -// coreutils::Log(coreutils::LOG_DEBUG_4) << "data[" << std::string(buffer, bufferLength) << "]"; onDataReceived(buffer, len); } else { @@ -144,10 +146,8 @@ namespace core { coreutils::Log(coreutils::LOG_DEBUG_3) << "Writing data to socket " << getDescriptor() << " [" << fifo.front() << "]."; ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); - coreutils::Log(coreutils::LOG_DEBUG_4) << "resetSocket from writeSocket."; - ePoll.resetSocket(this); - if(shutDown && !needsToWrite()) - delete this; +// if(shutDown && !needsToWrite()) +// delete this; outlock.unlock(); } } @@ -157,8 +157,8 @@ namespace core { outlock.lock(); fifo.emplace(data); coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling write on socket " << getDescriptor() << " with " << fifo.size() << " entries to write."; - ePoll.resetSocket(this); outlock.unlock(); + ePoll.resetWrite(this); return 1; } @@ -174,9 +174,8 @@ namespace core { void Socket::shutdown(std::string text) { coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; shutDown = true; - active = false; + reset = false; if(!needsToWrite()) { - active = false; delete this; } diff --git a/Socket.h b/Socket.h index 3c3f126..4ff90fc 100644 --- a/Socket.h +++ b/Socket.h @@ -109,8 +109,8 @@ namespace core { virtual void onUnregistered(); ///< Called when the socket has finished unregistering for the epoll processing. bool needsToWrite(); - - bool active = false; + + bool reset = false; protected: diff --git a/TCPSession.cpp b/TCPSession.cpp index b984a3b..775957a 100644 --- a/TCPSession.cpp +++ b/TCPSession.cpp @@ -26,6 +26,8 @@ namespace core { onConnected(); protocol(); send(); + if(term) + shutdown("termination requested"); } void TCPSession::onConnected() {} @@ -47,9 +49,7 @@ namespace core { lineBufferSize -= lineLength; if(lineBufferSize > 0) memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize); - coreutils::Log(coreutils::LOG_DEBUG_3) << "lineBufferSize=" << lineBufferSize << "; lineLength=" << lineLength << ";"; lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); -// coreutils::Log(coreutils::LOG_DEBUG_3) << "lineBuffer=" << std::string(lineBuffer, lineBufferSize); } } } @@ -58,6 +58,8 @@ namespace core { coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << line << "]"; protocol(line); send(); + if(term) + shutdown("termination requested"); } void TCPSession::sendToAll() { @@ -76,8 +78,13 @@ namespace core { } void TCPSession::send() { - write(out.str()); + if(out.tellp() > 0) + write(out.str()); out.str(""); } + + void TCPSession::terminate() { + term = true; + } } diff --git a/TCPSession.h b/TCPSession.h index 782f611..ddb92fe 100644 --- a/TCPSession.h +++ b/TCPSession.h @@ -57,6 +57,16 @@ namespace core { /// void sendToAll(SessionFilter filter); + + /// + /// + /// + + void terminate(); + + /// + /// + /// TCPServer &server; @@ -106,6 +116,7 @@ namespace core { char *lineBuffer = NULL; int lineBufferSize = 0; std::mutex mtx; + bool term = false; }; diff --git a/Thread.cpp b/Thread.cpp index 4a65e09..4b44c93 100644 --- a/Thread.cpp +++ b/Thread.cpp @@ -59,18 +59,12 @@ namespace core { break; } else if(rc > 0) { for(int ix = 0; ix < rc; ++ix) { - ++count; -// std::cout << "Event " << events[ix].events << " on socket " << events[ix].data.fd << " on thread " << getThreadId() << ": "; -// std::cout << ((events[ix].events & EPOLLIN) ? "EPOLLIN ": ""); -// std::cout << ((events[ix].events & EPOLLWRNORM) ? "EPOLLWRNORM ": ""); -// std::cout << ((events[ix].events & EPOLLRDHUP) ? "EPOLLRDHUP ": ""); -// std::cout << ((events[ix].events & EPOLLHUP) ? "EPOLLHUP ": ""); -// std::cout << "." << std::endl; + ++count; ePoll.eventReceived(events[ix], threadId); } } } - + coreutils::Log(coreutils::LOG_DEBUG_1) << "Thread ending with thread id " << threadId << "."; } }