diff --git a/Socket.cpp b/Socket.cpp index c372090..a17e063 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -67,8 +67,6 @@ namespace core { if(event.events & EPOLLRDHUP) { readHangup = true; shutdown("hangup received"); - lock.unlock(); - return false; } if(event.events & EPOLLIN) { @@ -82,14 +80,11 @@ namespace core { if(event.events & EPOLLHUP) { shutdown(); - lock.unlock(); - return false; } lock.unlock(); - reset = true; - return reset; + return !shutDown; } void Socket::onDataReceived(std::string data) { @@ -172,7 +167,7 @@ namespace core { shutDown = true; reset = false; // if(!needsToWrite()) - delete this; +// delete this; } } diff --git a/Socket.h b/Socket.h index cdfdb81..2063089 100644 --- a/Socket.h +++ b/Socket.h @@ -53,7 +53,7 @@ namespace core { /// /// Use the shutdown() method to terminate the socket connection and remove resources. /// This method is provided to ensure that all destructors are called for all inherited - /// objects without a virtual destructor. + /// objects with a virtual destructor. /// void shutdown(std::string text = "unknown"); @@ -76,6 +76,9 @@ namespace core { /// from any user extended classes unless an epoll event is being /// simulated. /// + /// The return value of false will delete the socket object causing the destructors to run. + /// The return value of true will enable the socket on ePoll to receive more events. + /// bool eventReceived(struct epoll_event event); ///< Parse epoll event and call specified callbacks. diff --git a/Subscription.cpp b/Subscription.cpp index ba16cd3..97e09be 100644 --- a/Subscription.cpp +++ b/Subscription.cpp @@ -1,13 +1,19 @@ #include "Subscription.h" #include "TCPSession.h" +#include "Log.h" +#include namespace core { - Subscription::Subscription(std::string id) : id(id), owner(NULL) {} - Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(&session) {} + Subscription::Subscription(std::string id, std::string mode) + : id(id), mode(mode), owner(NULL) {} + + Subscription::Subscription(std::string id, TCPSession &session, std::string mode) + : id(id), mode(mode), owner(&session) {} Subscription::~Subscription() { + coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor...."; std::stringstream out; out << "cancel:" << id << std::endl; for(auto subscriber : subscribers) { @@ -42,6 +48,10 @@ namespace core { return 1; } + bool Subscription::ifSubscriber(TCPSession &session) { + return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end()); + } + int Subscription::onSubscribe(TCPSession &session) { return 0; } diff --git a/Subscription.h b/Subscription.h index 4b11c76..31336e3 100644 --- a/Subscription.h +++ b/Subscription.h @@ -12,8 +12,8 @@ namespace core { class Subscription { public: - Subscription(std::string id); - Subscription(std::string id, TCPSession &session); + Subscription(std::string id, std::string mode = "*AUTHOR"); + Subscription(std::string id, TCPSession &session, std::string mode = "*AUTHOR"); virtual ~Subscription(); int subscribe(TCPSession &session); @@ -24,9 +24,12 @@ namespace core { int event(std::stringstream &out); + bool ifSubscriber(TCPSession &session); + // int processCommand(coreutils::ZString &request, TCPSession &session) override; std::string id; + std::string mode; TCPSession *owner; std::vector subscribers; diff --git a/SubscriptionManager.cpp b/SubscriptionManager.cpp index f315d7d..dcc754b 100644 --- a/SubscriptionManager.cpp +++ b/SubscriptionManager.cpp @@ -1,5 +1,6 @@ #include "SubscriptionManager.h" #include "Log.h" +#include namespace core { @@ -13,15 +14,23 @@ namespace core { int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) { int countSubscribed = 0; int countPublished = 0; - for(auto subscription = subscriptions.begin(); subscription != subscriptions.end();) { - countSubscribed += (*subscription).second->unsubscribe(session); - if((*subscription).second->owner == &session) { - subscription = subscriptions.erase(subscription); -// delete (*subscription).second; - ++countPublished; - } else - ++subscription; + std::string temp = ""; + for(auto [key, subscription] : subscriptions) { + if(temp != "") { + subscriptions.erase(temp); + temp = ""; + } + countSubscribed += subscription->unsubscribe(session); + if(subscription->owner == &session) { + temp = key; + delete subscription; + ++countPublished; + } + } + if(temp != "") { + subscriptions.erase(temp); + temp = ""; } coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; @@ -30,15 +39,13 @@ namespace core { int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) { if(request[0].equals("publish")) { - Subscription *newSubscription = new Subscription(request[1].str(), session); + Subscription *newSubscription = new Subscription(request[1].str(), session, request[2].str()); subscriptions.insert(std::make_pair(request[1].str(), newSubscription)); return 1; } else if(request[0].equals("catalog")) { session.out << ":catalog:"; for(auto const& [key, subscription] : subscriptions) { - session.out << subscription->id << "|"; -// subscription->processCommand(request, session); - session.out << (";"); + session.out << subscription->id << ";"; } session.out << std::endl; return 1; @@ -51,19 +58,29 @@ namespace core { subscriptions.erase(request[1].str()); } else if(request[0].equals("subscribe")) { subscription->subscribe(session); + return 1; } else if(request[0].equals("unsubscribe")) { subscription->unsubscribe(session); + return 1; } else if(request[0].equals("event")) { -// coreutils::Log(coreutils::LOG_DEBUG_2) << "request: [" << request << "]."; - if(subscription->owner == &session) { - std::stringstream out; - subscription->process(request, out); + std::stringstream out; + subscription->process(request, out); + if(subscription->mode == "*ANYONE") { subscription->event(out); + return 1; + } else if(subscription->mode == "*SUBSCRIBERS") { + if(subscription->ifSubscriber(session)) { + subscription->event(out); + return 1; + } + } else if(subscription->mode == "*AUTHOR") { + if(subscription->owner == &session) { + subscription->event(out); + return 1; + } } - else - return 0; } - return 1; + return 0; } return 0; } diff --git a/TCPSession.cpp b/TCPSession.cpp index 45bd1e0..59283c2 100644 --- a/TCPSession.cpp +++ b/TCPSession.cpp @@ -10,7 +10,6 @@ namespace core { TCPSession::~TCPSession() { server.removeFromSessionList(this); server.subscriptions.removeSessionSubscriptions(*this); - coreutils::Log(coreutils::LOG_DEBUG_1) << "Terminating TCPSession level."; } void TCPSession::output(std::stringstream &data) { diff --git a/Thread.cpp b/Thread.cpp index 4464439..56b463b 100644 --- a/Thread.cpp +++ b/Thread.cpp @@ -4,6 +4,7 @@ namespace core { Thread::Thread(EPoll &ePoll) : ePoll(ePoll) {} + Thread::Thread(EPoll &ePoll, ThreadScope *scope) : ePoll(ePoll) {} Thread::~Thread() {} @@ -45,25 +46,28 @@ namespace core { while(1) { - if(ePoll.isStopping()) + if(ePoll.isStopping()) break; - status = "WAITING"; - int rc = epoll_wait(ePoll.getDescriptor(), events, 50, -1); - status = "RUNNING"; + status = "WAITING"; + int rc = epoll_wait(ePoll.getDescriptor(), events, 50, -1); + status = "RUNNING"; - if(rc < 0) { - // TODO: Make log entry indicating status received and ignore for now. - } else if(rc == 0) { - break; - } else if(rc > 0) { - for(int ix = 0; ix < rc; ++ix) { - ++count; - if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix])) - ePoll.resetSocket((Socket *)events[ix].data.ptr); - } - } - } + if(rc < 0) { + // TODO: Make log entry indicating status received and ignore for now. + } else if(rc == 0) { + break; + } else if(rc > 0) { + for(int ix = 0; ix < rc; ++ix) { + ++count; + if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix])) { + ePoll.resetSocket((Socket *)events[ix].data.ptr); + } else { + delete (Socket *)events[ix].data.ptr; + } + } + } + } coreutils::Log(coreutils::LOG_DEBUG_1) << "Thread ending with thread id " << threadId << "."; } diff --git a/Thread.h b/Thread.h index 8c52613..023123a 100644 --- a/Thread.h +++ b/Thread.h @@ -5,6 +5,7 @@ #include "Log.h" #include "Object.h" #include "TCPSession.h" +#include "ThreadScope.h" namespace core { @@ -22,6 +23,7 @@ namespace core { public: Thread(EPoll &ePoll); + Thread(EPoll &ePoll, ThreadScope *thread); ~Thread(); /// @@ -43,6 +45,7 @@ namespace core { void print_thread_start_log(); pid_t threadId; void run(); + ThreadScope *thread; }; diff --git a/ThreadScope.h b/ThreadScope.h new file mode 100644 index 0000000..48ca912 --- /dev/null +++ b/ThreadScope.h @@ -0,0 +1,15 @@ +#ifndef __ThreadScope_h__ +#define __ThreadScope_h__ + +namespace core { + + class ThreadScope { + + public: + ThreadScope() {} + + }; + +} + +#endif