From 3a6883e77f1b560ca0b39dbf470a685548c5c0a5 Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Tue, 15 Mar 2022 21:02:35 +0000 Subject: [PATCH] Cleaned up socket scheduling. Very clean now. --- EPoll.cpp | 6 +++--- Socket.cpp | 35 +++++++++-------------------------- Socket.h | 2 +- Subscription.cpp | 1 - SubscriptionManager.cpp | 4 ++++ SubscriptionManager.h | 3 ++- TCPServer.cpp | 26 ++++++++++++++------------ Timer.cpp | 2 +- 8 files changed, 34 insertions(+), 45 deletions(-) diff --git a/EPoll.cpp b/EPoll.cpp index 4cbd9a0..9f669e9 100644 --- a/EPoll.cpp +++ b/EPoll.cpp @@ -97,11 +97,11 @@ namespace core { event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event); } - + void EPoll::disableSocket(Socket *socket) { epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); } - + void EPoll::resetSocket(Socket *socket) { struct epoll_event event; event.data.ptr = socket; @@ -110,5 +110,5 @@ namespace core { event.events |= EPOLLWRNORM; epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event); } - + } diff --git a/Socket.cpp b/Socket.cpp index 3e89ac0..8b0cdf6 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -17,7 +17,7 @@ namespace core { Socket::~Socket() { free(buffer); if(descriptor == -1) - return; + return; onUnregister(); ePoll.unregisterSocket(this); coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << "."; @@ -26,10 +26,9 @@ namespace core { void Socket::setDescriptor(int descriptor) { if((descriptor == -1) && (errno == 24)) { - shutdown("Too many files open"); - throw coreutils::Exception("Too many files open. Refusing connection."); + shutdown("Too many files open"); + throw coreutils::Exception("Too many files open. Refusing connection."); } - lock.lock(); coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket."; if(descriptor < 3) throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__); @@ -37,7 +36,6 @@ namespace core { onRegister(); ePoll.registerSocket(this); onRegistered(); - lock.unlock(); } int Socket::getDescriptor() { @@ -63,29 +61,20 @@ namespace core { void Socket::onUnregistered() {} bool Socket::eventReceived(struct epoll_event event) { - - lock.lock(); - if(event.events & EPOLLRDHUP) { readHangup = true; shutdown("hangup received"); } - - if(event.events & EPOLLIN) { + else if(event.events & EPOLLIN) { coreutils::ZString zbuffer(buffer, length); receiveData(zbuffer); } - - if(event.events & EPOLLWRNORM) { + else if(event.events & EPOLLWRNORM) { writeSocket(); } - - if(event.events & EPOLLHUP) { + else if(event.events & EPOLLHUP) { shutdown(); } - - lock.unlock(); - return !shutDown; } @@ -107,9 +96,6 @@ namespace core { int len; int error = -1; -// for(int ix = 0; ix < buffer.getLength(); ++ix) -// buffer[ix] = 0; - if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { coreutils::ZString zbuffer(buffer.getData(), len); coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; @@ -140,10 +126,9 @@ namespace core { void Socket::writeSocket() { if(fifo.size() > 0) { outlock.lock(); - ::write(descriptor, fifo.front().c_str(), fifo.front().length()); + if(!shutDown) + ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); -// if(shutDown && !needsToWrite()) -// delete this; outlock.unlock(); } } @@ -152,7 +137,7 @@ namespace core { outlock.lock(); fifo.emplace(data); outlock.unlock(); - ePoll.resetSocket(this); +// ePoll.resetSocket(this); return 1; } @@ -168,8 +153,6 @@ namespace core { coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; shutDown = true; reset = false; - // if(!needsToWrite()) -// delete this; } } diff --git a/Socket.h b/Socket.h index 2063089..9700bcd 100644 --- a/Socket.h +++ b/Socket.h @@ -167,7 +167,7 @@ namespace core { std::string text; int descriptor = -1; - std::mutex lock; +// std::mutex lock; std::mutex outlock; bool readHangup = false; diff --git a/Subscription.cpp b/Subscription.cpp index cd00ec8..ce40556 100644 --- a/Subscription.cpp +++ b/Subscription.cpp @@ -14,7 +14,6 @@ namespace core Subscription::~Subscription() { - coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor...."; std::stringstream out; out << "cancel:" << id << std::endl; for (auto subscriber : subscribers) diff --git a/SubscriptionManager.cpp b/SubscriptionManager.cpp index d5575e8..44c2468 100644 --- a/SubscriptionManager.cpp +++ b/SubscriptionManager.cpp @@ -7,7 +7,9 @@ namespace core { SubscriptionManager::SubscriptionManager() {} int SubscriptionManager::add(Subscription &subscription) { + lock.lock(); subscriptions.insert(std::make_pair(subscription.id, &subscription)); + lock.unlock(); return 1; } @@ -15,6 +17,7 @@ namespace core { int countSubscribed = 0; int countPublished = 0; + lock.lock(); std::string temp = ""; for(auto [key, subscription] : subscriptions) { if(temp != "") { @@ -34,6 +37,7 @@ namespace core { } coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; + lock.unlock(); return countSubscribed; } diff --git a/SubscriptionManager.h b/SubscriptionManager.h index e0d19a3..6fa042a 100644 --- a/SubscriptionManager.h +++ b/SubscriptionManager.h @@ -23,7 +23,8 @@ namespace core { private: std::map subscriptions; - + std::mutex lock; + }; } diff --git a/TCPServer.cpp b/TCPServer.cpp index 61dc17e..24adf2d 100644 --- a/TCPServer.cpp +++ b/TCPServer.cpp @@ -72,8 +72,10 @@ namespace core { std::vector::iterator cursor; lock.lock(); for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) - if(*cursor == session) - sessions.erase(cursor); + if(*cursor == session) { + sessions.erase(cursor); + break; + } lock.unlock(); } @@ -98,26 +100,26 @@ namespace core { } return 1; } - + void TCPServer::sendToAll(std::stringstream &data) { for(auto session : sessions) - session->write(data.str()); + session->write(data.str()); data.str(""); } - + void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) { for(auto session : sessions) - if(session != &sender) - session->write(data.str()); + if(session != &sender) + session->write(data.str()); data.str(""); } - + void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) { for(auto session : sessions) - if(filter.test(*session)) - if(session != &sender) - session->write(data.str()); + if(filter.test(*session)) + if(session != &sender) + session->write(data.str()); data.str(""); } - + } diff --git a/Timer.cpp b/Timer.cpp index 8031ae8..de868db 100644 --- a/Timer.cpp +++ b/Timer.cpp @@ -4,7 +4,7 @@ namespace core { Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") { setDescriptor(timerfd_create(CLOCK_REALTIME, 0)); - ePoll.registerSocket(this); +// ePoll.registerSocket(this); setTimer(delay); }