diff --git a/EPoll.cpp b/EPoll.cpp index db7bad6..9f669e9 100644 --- a/EPoll.cpp +++ b/EPoll.cpp @@ -2,6 +2,7 @@ #include "EPoll.h" #include "Command.h" #include "Exception.h" +#include namespace core { @@ -96,11 +97,11 @@ namespace core { event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event); } - + void EPoll::disableSocket(Socket *socket) { epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); } - + void EPoll::resetSocket(Socket *socket) { struct epoll_event event; event.data.ptr = socket; @@ -109,5 +110,5 @@ namespace core { event.events |= EPOLLWRNORM; epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event); } - + } diff --git a/Socket.cpp b/Socket.cpp index a17e063..7872f84 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -6,6 +6,8 @@ namespace core { + void sigpipe_handler(int unused) {} + Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) { coreutils::Log(coreutils::LOG_DEBUG_2) << "Socket object created [" << text << "]."; buffer = (char *)malloc(4096); @@ -15,7 +17,7 @@ namespace core { Socket::~Socket() { free(buffer); if(descriptor == -1) - return; + return; onUnregister(); ePoll.unregisterSocket(this); coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << "."; @@ -24,10 +26,9 @@ namespace core { void Socket::setDescriptor(int descriptor) { if((descriptor == -1) && (errno == 24)) { - shutdown("Too many files open"); - throw coreutils::Exception("Too many files open. Refusing connection."); + shutdown("Too many files open"); + throw coreutils::Exception("Too many files open. Refusing connection."); } - lock.lock(); coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket."; if(descriptor < 3) throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__); @@ -35,7 +36,6 @@ namespace core { onRegister(); ePoll.registerSocket(this); onRegistered(); - lock.unlock(); } int Socket::getDescriptor() { @@ -61,29 +61,22 @@ namespace core { void Socket::onUnregistered() {} bool Socket::eventReceived(struct epoll_event event) { - - lock.lock(); - + inHandler = true; if(event.events & EPOLLRDHUP) { readHangup = true; shutdown("hangup received"); } - - if(event.events & EPOLLIN) { + else if(event.events & EPOLLIN) { coreutils::ZString zbuffer(buffer, length); receiveData(zbuffer); } - - if(event.events & EPOLLWRNORM) { + else if(event.events & EPOLLWRNORM) { writeSocket(); } - - if(event.events & EPOLLHUP) { + else if(event.events & EPOLLHUP) { shutdown(); } - - lock.unlock(); - + inHandler = false; return !shutDown; } @@ -105,12 +98,9 @@ namespace core { int len; int error = -1; -// for(int ix = 0; ix < buffer.getLength(); ++ix) -// buffer[ix] = 0; - if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { coreutils::ZString zbuffer(buffer.getData(), len); -// coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; + coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; onDataReceived(zbuffer); } else { @@ -138,10 +128,9 @@ namespace core { void Socket::writeSocket() { if(fifo.size() > 0) { outlock.lock(); - ::write(descriptor, fifo.front().c_str(), fifo.front().length()); + if(!shutDown) + ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); - if(shutDown && !needsToWrite()) - delete this; outlock.unlock(); } } @@ -150,7 +139,8 @@ namespace core { outlock.lock(); fifo.emplace(data); outlock.unlock(); - ePoll.resetSocket(this); + if(!inHandler) + ePoll.resetSocket(this); return 1; } @@ -166,8 +156,6 @@ namespace core { coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; shutDown = true; reset = false; - // if(!needsToWrite()) -// delete this; } } diff --git a/Socket.h b/Socket.h index 2063089..d0049c0 100644 --- a/Socket.h +++ b/Socket.h @@ -167,10 +167,10 @@ namespace core { std::string text; int descriptor = -1; - std::mutex lock; +// std::mutex lock; std::mutex outlock; bool readHangup = false; - + bool inHandler = false; // struct epoll_event event; // Event selection construction structure. //------------------------------------------------------------------------------------- diff --git a/Subscription.cpp b/Subscription.cpp index 97e09be..ce40556 100644 --- a/Subscription.cpp +++ b/Subscription.cpp @@ -3,33 +3,38 @@ #include "Log.h" #include -namespace core { +namespace core +{ + Subscription::Subscription(std::string id, std::string mode) + : id(id), mode(mode), owner(NULL) {} - Subscription::Subscription(std::string id, std::string mode) - : id(id), mode(mode), owner(NULL) {} + Subscription::Subscription(std::string id, TCPSession &session, std::string mode) + : id(id), mode(mode), owner(&session) {} - Subscription::Subscription(std::string id, TCPSession &session, std::string mode) - : id(id), mode(mode), owner(&session) {} - - Subscription::~Subscription() { - coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor...."; + Subscription::~Subscription() + { std::stringstream out; out << "cancel:" << id << std::endl; - for(auto subscriber : subscribers) { + for (auto subscriber : subscribers) + { subscriber->write(out.str()); } } - int Subscription::subscribe(TCPSession &session) { - onSubscribe(session); + int Subscription::subscribe(TCPSession &session) + { + onSubscribe(session); subscribers.push_back(&session); - return 1; + return 1; } - int Subscription::unsubscribe(TCPSession &session) { - for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) { - if(*subscriber == &session) { + int Subscription::unsubscribe(TCPSession &session) + { + for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) + { + if (*subscriber == &session) + { subscribers.erase(subscriber++); return 1; } @@ -37,23 +42,27 @@ namespace core { return 0; } - int Subscription::process(coreutils::ZString &request, std::stringstream &out) { + int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session) + { out << "event:" << request[1] << ":" << request[2] << std::endl; return 1; } - int Subscription::event(std::stringstream &out) { - for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) - (*subscriber)->write(out.str()); + int Subscription::event(std::stringstream &out) + { + for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) + (*subscriber)->write(out.str()); return 1; } - bool Subscription::ifSubscriber(TCPSession &session) { + bool Subscription::ifSubscriber(TCPSession &session) + { return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end()); } - int Subscription::onSubscribe(TCPSession &session) { + int Subscription::onSubscribe(TCPSession &session) + { return 0; } - + } diff --git a/Subscription.h b/Subscription.h index 31336e3..371b6c4 100644 --- a/Subscription.h +++ b/Subscription.h @@ -5,11 +5,13 @@ #include #include -namespace core { +namespace core +{ class TCPSession; - class Subscription { + class Subscription + { public: Subscription(std::string id, std::string mode = "*AUTHOR"); @@ -19,13 +21,14 @@ namespace core { int subscribe(TCPSession &session); int unsubscribe(TCPSession &session); - virtual int process(coreutils::ZString &request, std::stringstream &out); + virtual int process(coreutils::ZString &request, std::stringstream &out, TCPSession &session); + virtual int onSubscribe(TCPSession &session); int event(std::stringstream &out); bool ifSubscriber(TCPSession &session); - + // int processCommand(coreutils::ZString &request, TCPSession &session) override; std::string id; @@ -33,7 +36,6 @@ namespace core { TCPSession *owner; std::vector subscribers; - }; } diff --git a/SubscriptionManager.cpp b/SubscriptionManager.cpp index dcc754b..44c2468 100644 --- a/SubscriptionManager.cpp +++ b/SubscriptionManager.cpp @@ -7,7 +7,9 @@ namespace core { SubscriptionManager::SubscriptionManager() {} int SubscriptionManager::add(Subscription &subscription) { + lock.lock(); subscriptions.insert(std::make_pair(subscription.id, &subscription)); + lock.unlock(); return 1; } @@ -15,6 +17,7 @@ namespace core { int countSubscribed = 0; int countPublished = 0; + lock.lock(); std::string temp = ""; for(auto [key, subscription] : subscriptions) { if(temp != "") { @@ -34,6 +37,7 @@ namespace core { } coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; + lock.unlock(); return countSubscribed; } @@ -64,7 +68,7 @@ namespace core { return 1; } else if(request[0].equals("event")) { std::stringstream out; - subscription->process(request, out); + subscription->process(request, out, session); if(subscription->mode == "*ANYONE") { subscription->event(out); return 1; diff --git a/SubscriptionManager.h b/SubscriptionManager.h index e0d19a3..6fa042a 100644 --- a/SubscriptionManager.h +++ b/SubscriptionManager.h @@ -23,7 +23,8 @@ namespace core { private: std::map subscriptions; - + std::mutex lock; + }; } diff --git a/TCPServer.cpp b/TCPServer.cpp index 9bacc86..24adf2d 100644 --- a/TCPServer.cpp +++ b/TCPServer.cpp @@ -70,9 +70,13 @@ namespace core { void TCPServer::removeFromSessionList(TCPSession *session) { std::vector::iterator cursor; + lock.lock(); for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) - if(*cursor == session) - sessions.erase(cursor); + if(*cursor == session) { + sessions.erase(cursor); + break; + } + lock.unlock(); } void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) { @@ -96,26 +100,26 @@ namespace core { } return 1; } - + void TCPServer::sendToAll(std::stringstream &data) { for(auto session : sessions) - session->write(data.str()); + session->write(data.str()); data.str(""); } - + void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) { for(auto session : sessions) - if(session != &sender) - session->write(data.str()); + if(session != &sender) + session->write(data.str()); data.str(""); } - + void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) { for(auto session : sessions) - if(filter.test(*session)) - if(session != &sender) - session->write(data.str()); + if(filter.test(*session)) + if(session != &sender) + session->write(data.str()); data.str(""); } - + } diff --git a/Timer.cpp b/Timer.cpp index 8031ae8..8f74c8d 100644 --- a/Timer.cpp +++ b/Timer.cpp @@ -4,7 +4,6 @@ namespace core { Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") { setDescriptor(timerfd_create(CLOCK_REALTIME, 0)); - ePoll.registerSocket(this); setTimer(delay); } diff --git a/output/main b/output/main index b3e266c..71318cc 100755 Binary files a/output/main and b/output/main differ