From 03a37998881db9de5a9cb0ece91552f3ef8d339b Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Mon, 14 Mar 2022 18:48:58 +0000 Subject: [PATCH 1/4] Mods to improve stability for Zephora. --- EPoll.cpp | 1 + Socket.cpp | 8 ++++--- Subscription.cpp | 52 ++++++++++++++++++++++++---------------- Subscription.h | 12 ++++++---- SubscriptionManager.cpp | 2 +- TCPServer.cpp | 2 ++ output/main | Bin 106536 -> 106576 bytes 7 files changed, 47 insertions(+), 30 deletions(-) diff --git a/EPoll.cpp b/EPoll.cpp index db7bad6..4cbd9a0 100644 --- a/EPoll.cpp +++ b/EPoll.cpp @@ -2,6 +2,7 @@ #include "EPoll.h" #include "Command.h" #include "Exception.h" +#include namespace core { diff --git a/Socket.cpp b/Socket.cpp index a17e063..3e89ac0 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -6,6 +6,8 @@ namespace core { + void sigpipe_handler(int unused) {} + Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) { coreutils::Log(coreutils::LOG_DEBUG_2) << "Socket object created [" << text << "]."; buffer = (char *)malloc(4096); @@ -110,7 +112,7 @@ namespace core { if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { coreutils::ZString zbuffer(buffer.getData(), len); -// coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; + coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; onDataReceived(zbuffer); } else { @@ -140,8 +142,8 @@ namespace core { outlock.lock(); ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); - if(shutDown && !needsToWrite()) - delete this; +// if(shutDown && !needsToWrite()) +// delete this; outlock.unlock(); } } diff --git a/Subscription.cpp b/Subscription.cpp index 97e09be..cd00ec8 100644 --- a/Subscription.cpp +++ b/Subscription.cpp @@ -3,33 +3,39 @@ #include "Log.h" #include -namespace core { +namespace core +{ + Subscription::Subscription(std::string id, std::string mode) + : id(id), mode(mode), owner(NULL) {} - Subscription::Subscription(std::string id, std::string mode) - : id(id), mode(mode), owner(NULL) {} + Subscription::Subscription(std::string id, TCPSession &session, std::string mode) + : id(id), mode(mode), owner(&session) {} - Subscription::Subscription(std::string id, TCPSession &session, std::string mode) - : id(id), mode(mode), owner(&session) {} - - Subscription::~Subscription() { + Subscription::~Subscription() + { coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor...."; std::stringstream out; out << "cancel:" << id << std::endl; - for(auto subscriber : subscribers) { + for (auto subscriber : subscribers) + { subscriber->write(out.str()); } } - int Subscription::subscribe(TCPSession &session) { - onSubscribe(session); + int Subscription::subscribe(TCPSession &session) + { + onSubscribe(session); subscribers.push_back(&session); - return 1; + return 1; } - int Subscription::unsubscribe(TCPSession &session) { - for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) { - if(*subscriber == &session) { + int Subscription::unsubscribe(TCPSession &session) + { + for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) + { + if (*subscriber == &session) + { subscribers.erase(subscriber++); return 1; } @@ -37,23 +43,27 @@ namespace core { return 0; } - int Subscription::process(coreutils::ZString &request, std::stringstream &out) { + int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session) + { out << "event:" << request[1] << ":" << request[2] << std::endl; return 1; } - int Subscription::event(std::stringstream &out) { - for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) - (*subscriber)->write(out.str()); + int Subscription::event(std::stringstream &out) + { + for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) + (*subscriber)->write(out.str()); return 1; } - bool Subscription::ifSubscriber(TCPSession &session) { + bool Subscription::ifSubscriber(TCPSession &session) + { return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end()); } - int Subscription::onSubscribe(TCPSession &session) { + int Subscription::onSubscribe(TCPSession &session) + { return 0; } - + } diff --git a/Subscription.h b/Subscription.h index 31336e3..371b6c4 100644 --- a/Subscription.h +++ b/Subscription.h @@ -5,11 +5,13 @@ #include #include -namespace core { +namespace core +{ class TCPSession; - class Subscription { + class Subscription + { public: Subscription(std::string id, std::string mode = "*AUTHOR"); @@ -19,13 +21,14 @@ namespace core { int subscribe(TCPSession &session); int unsubscribe(TCPSession &session); - virtual int process(coreutils::ZString &request, std::stringstream &out); + virtual int process(coreutils::ZString &request, std::stringstream &out, TCPSession &session); + virtual int onSubscribe(TCPSession &session); int event(std::stringstream &out); bool ifSubscriber(TCPSession &session); - + // int processCommand(coreutils::ZString &request, TCPSession &session) override; std::string id; @@ -33,7 +36,6 @@ namespace core { TCPSession *owner; std::vector subscribers; - }; } diff --git a/SubscriptionManager.cpp b/SubscriptionManager.cpp index dcc754b..d5575e8 100644 --- a/SubscriptionManager.cpp +++ b/SubscriptionManager.cpp @@ -64,7 +64,7 @@ namespace core { return 1; } else if(request[0].equals("event")) { std::stringstream out; - subscription->process(request, out); + subscription->process(request, out, session); if(subscription->mode == "*ANYONE") { subscription->event(out); return 1; diff --git a/TCPServer.cpp b/TCPServer.cpp index 9bacc86..61dc17e 100644 --- a/TCPServer.cpp +++ b/TCPServer.cpp @@ -70,9 +70,11 @@ namespace core { void TCPServer::removeFromSessionList(TCPSession *session) { std::vector::iterator cursor; + lock.lock(); for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) if(*cursor == session) sessions.erase(cursor); + lock.unlock(); } void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) { diff --git a/output/main b/output/main index b3e266c4e10dfdfc23eca62f978556db9fb095c3..71318ccd0a992ac70416356d3e4622123bf3d84d 100755 GIT binary patch delta 152 zcmZ2+fbGHowh0=H0UI^#nMIUtgxj6jy~4q8{lz{B$@5KD)21|Tu3=uRBWa>%pleu~ zRGL>(YFKAvpl4vBsbHxG6m8}=*v@aj$P>L?rkimBJEP(DwUZd5xfz3}bIxNlXH1^% sIFHeuaq{#=5Os0-K?ub-AIQ5g-4R4-Om75H6Q&;oQQxLBE?`s#01nMG5&!@I delta 139 zcmca`fNjMAwh0=H8XGn3nMGV#o7HPXpU>DP#INxCwf^}fJf~$h*Dx>EY4$MK?qR_A zCu+NMH{(QhMuX`aCNYX_e?5uOm77s(y5T%Vb4IJ_ne!O!8MCIZ1W_xdzl2bl^MSne f(=$O7!}OIPDr5RfAjNrZDkB33L`^rG$tVv1^pr46 From 3a6883e77f1b560ca0b39dbf470a685548c5c0a5 Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Tue, 15 Mar 2022 21:02:35 +0000 Subject: [PATCH 2/4] Cleaned up socket scheduling. Very clean now. --- EPoll.cpp | 6 +++--- Socket.cpp | 35 +++++++++-------------------------- Socket.h | 2 +- Subscription.cpp | 1 - SubscriptionManager.cpp | 4 ++++ SubscriptionManager.h | 3 ++- TCPServer.cpp | 26 ++++++++++++++------------ Timer.cpp | 2 +- 8 files changed, 34 insertions(+), 45 deletions(-) diff --git a/EPoll.cpp b/EPoll.cpp index 4cbd9a0..9f669e9 100644 --- a/EPoll.cpp +++ b/EPoll.cpp @@ -97,11 +97,11 @@ namespace core { event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event); } - + void EPoll::disableSocket(Socket *socket) { epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); } - + void EPoll::resetSocket(Socket *socket) { struct epoll_event event; event.data.ptr = socket; @@ -110,5 +110,5 @@ namespace core { event.events |= EPOLLWRNORM; epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event); } - + } diff --git a/Socket.cpp b/Socket.cpp index 3e89ac0..8b0cdf6 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -17,7 +17,7 @@ namespace core { Socket::~Socket() { free(buffer); if(descriptor == -1) - return; + return; onUnregister(); ePoll.unregisterSocket(this); coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << "."; @@ -26,10 +26,9 @@ namespace core { void Socket::setDescriptor(int descriptor) { if((descriptor == -1) && (errno == 24)) { - shutdown("Too many files open"); - throw coreutils::Exception("Too many files open. Refusing connection."); + shutdown("Too many files open"); + throw coreutils::Exception("Too many files open. Refusing connection."); } - lock.lock(); coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket."; if(descriptor < 3) throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__); @@ -37,7 +36,6 @@ namespace core { onRegister(); ePoll.registerSocket(this); onRegistered(); - lock.unlock(); } int Socket::getDescriptor() { @@ -63,29 +61,20 @@ namespace core { void Socket::onUnregistered() {} bool Socket::eventReceived(struct epoll_event event) { - - lock.lock(); - if(event.events & EPOLLRDHUP) { readHangup = true; shutdown("hangup received"); } - - if(event.events & EPOLLIN) { + else if(event.events & EPOLLIN) { coreutils::ZString zbuffer(buffer, length); receiveData(zbuffer); } - - if(event.events & EPOLLWRNORM) { + else if(event.events & EPOLLWRNORM) { writeSocket(); } - - if(event.events & EPOLLHUP) { + else if(event.events & EPOLLHUP) { shutdown(); } - - lock.unlock(); - return !shutDown; } @@ -107,9 +96,6 @@ namespace core { int len; int error = -1; -// for(int ix = 0; ix < buffer.getLength(); ++ix) -// buffer[ix] = 0; - if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { coreutils::ZString zbuffer(buffer.getData(), len); coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; @@ -140,10 +126,9 @@ namespace core { void Socket::writeSocket() { if(fifo.size() > 0) { outlock.lock(); - ::write(descriptor, fifo.front().c_str(), fifo.front().length()); + if(!shutDown) + ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); -// if(shutDown && !needsToWrite()) -// delete this; outlock.unlock(); } } @@ -152,7 +137,7 @@ namespace core { outlock.lock(); fifo.emplace(data); outlock.unlock(); - ePoll.resetSocket(this); +// ePoll.resetSocket(this); return 1; } @@ -168,8 +153,6 @@ namespace core { coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; shutDown = true; reset = false; - // if(!needsToWrite()) -// delete this; } } diff --git a/Socket.h b/Socket.h index 2063089..9700bcd 100644 --- a/Socket.h +++ b/Socket.h @@ -167,7 +167,7 @@ namespace core { std::string text; int descriptor = -1; - std::mutex lock; +// std::mutex lock; std::mutex outlock; bool readHangup = false; diff --git a/Subscription.cpp b/Subscription.cpp index cd00ec8..ce40556 100644 --- a/Subscription.cpp +++ b/Subscription.cpp @@ -14,7 +14,6 @@ namespace core Subscription::~Subscription() { - coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor...."; std::stringstream out; out << "cancel:" << id << std::endl; for (auto subscriber : subscribers) diff --git a/SubscriptionManager.cpp b/SubscriptionManager.cpp index d5575e8..44c2468 100644 --- a/SubscriptionManager.cpp +++ b/SubscriptionManager.cpp @@ -7,7 +7,9 @@ namespace core { SubscriptionManager::SubscriptionManager() {} int SubscriptionManager::add(Subscription &subscription) { + lock.lock(); subscriptions.insert(std::make_pair(subscription.id, &subscription)); + lock.unlock(); return 1; } @@ -15,6 +17,7 @@ namespace core { int countSubscribed = 0; int countPublished = 0; + lock.lock(); std::string temp = ""; for(auto [key, subscription] : subscriptions) { if(temp != "") { @@ -34,6 +37,7 @@ namespace core { } coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; + lock.unlock(); return countSubscribed; } diff --git a/SubscriptionManager.h b/SubscriptionManager.h index e0d19a3..6fa042a 100644 --- a/SubscriptionManager.h +++ b/SubscriptionManager.h @@ -23,7 +23,8 @@ namespace core { private: std::map subscriptions; - + std::mutex lock; + }; } diff --git a/TCPServer.cpp b/TCPServer.cpp index 61dc17e..24adf2d 100644 --- a/TCPServer.cpp +++ b/TCPServer.cpp @@ -72,8 +72,10 @@ namespace core { std::vector::iterator cursor; lock.lock(); for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) - if(*cursor == session) - sessions.erase(cursor); + if(*cursor == session) { + sessions.erase(cursor); + break; + } lock.unlock(); } @@ -98,26 +100,26 @@ namespace core { } return 1; } - + void TCPServer::sendToAll(std::stringstream &data) { for(auto session : sessions) - session->write(data.str()); + session->write(data.str()); data.str(""); } - + void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) { for(auto session : sessions) - if(session != &sender) - session->write(data.str()); + if(session != &sender) + session->write(data.str()); data.str(""); } - + void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) { for(auto session : sessions) - if(filter.test(*session)) - if(session != &sender) - session->write(data.str()); + if(filter.test(*session)) + if(session != &sender) + session->write(data.str()); data.str(""); } - + } diff --git a/Timer.cpp b/Timer.cpp index 8031ae8..de868db 100644 --- a/Timer.cpp +++ b/Timer.cpp @@ -4,7 +4,7 @@ namespace core { Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") { setDescriptor(timerfd_create(CLOCK_REALTIME, 0)); - ePoll.registerSocket(this); +// ePoll.registerSocket(this); setTimer(delay); } From c918e4229050c0d65780630560b3bf96d8b0c832 Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Tue, 15 Mar 2022 21:10:48 +0000 Subject: [PATCH 3/4] Removed some comments. --- Socket.cpp | 1 - Timer.cpp | 1 - 2 files changed, 2 deletions(-) diff --git a/Socket.cpp b/Socket.cpp index 8b0cdf6..522202a 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -137,7 +137,6 @@ namespace core { outlock.lock(); fifo.emplace(data); outlock.unlock(); -// ePoll.resetSocket(this); return 1; } diff --git a/Timer.cpp b/Timer.cpp index de868db..8f74c8d 100644 --- a/Timer.cpp +++ b/Timer.cpp @@ -4,7 +4,6 @@ namespace core { Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") { setDescriptor(timerfd_create(CLOCK_REALTIME, 0)); -// ePoll.registerSocket(this); setTimer(delay); } From 08561f6dc3e6f8c31bd68df17ea351dcd74d0cb6 Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Tue, 15 Mar 2022 23:53:22 +0000 Subject: [PATCH 4/4] Fixed cross socket writing issue with epoll scheduler. --- Socket.cpp | 4 ++++ Socket.h | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Socket.cpp b/Socket.cpp index 522202a..7872f84 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -61,6 +61,7 @@ namespace core { void Socket::onUnregistered() {} bool Socket::eventReceived(struct epoll_event event) { + inHandler = true; if(event.events & EPOLLRDHUP) { readHangup = true; shutdown("hangup received"); @@ -75,6 +76,7 @@ namespace core { else if(event.events & EPOLLHUP) { shutdown(); } + inHandler = false; return !shutDown; } @@ -137,6 +139,8 @@ namespace core { outlock.lock(); fifo.emplace(data); outlock.unlock(); + if(!inHandler) + ePoll.resetSocket(this); return 1; } diff --git a/Socket.h b/Socket.h index 9700bcd..d0049c0 100644 --- a/Socket.h +++ b/Socket.h @@ -170,7 +170,7 @@ namespace core { // std::mutex lock; std::mutex outlock; bool readHangup = false; - + bool inHandler = false; // struct epoll_event event; // Event selection construction structure. //-------------------------------------------------------------------------------------