Got the subscription event scope parameter added.

This commit is contained in:
Brad Arant 2022-02-13 00:23:56 +00:00
parent 6cf321ceb4
commit d13e3f9fb5
9 changed files with 97 additions and 48 deletions

View File

@ -67,8 +67,6 @@ namespace core {
if(event.events & EPOLLRDHUP) {
readHangup = true;
shutdown("hangup received");
lock.unlock();
return false;
}
if(event.events & EPOLLIN) {
@ -82,14 +80,11 @@ namespace core {
if(event.events & EPOLLHUP) {
shutdown();
lock.unlock();
return false;
}
lock.unlock();
reset = true;
return reset;
return !shutDown;
}
void Socket::onDataReceived(std::string data) {
@ -172,7 +167,7 @@ namespace core {
shutDown = true;
reset = false;
// if(!needsToWrite())
delete this;
// delete this;
}
}

View File

@ -53,7 +53,7 @@ namespace core {
///
/// Use the shutdown() method to terminate the socket connection and remove resources.
/// This method is provided to ensure that all destructors are called for all inherited
/// objects without a virtual destructor.
/// objects with a virtual destructor.
///
void shutdown(std::string text = "unknown");
@ -76,6 +76,9 @@ namespace core {
/// from any user extended classes unless an epoll event is being
/// simulated.
///
/// The return value of false will delete the socket object causing the destructors to run.
/// The return value of true will enable the socket on ePoll to receive more events.
///
bool eventReceived(struct epoll_event event); ///< Parse epoll event and call specified callbacks.

View File

@ -1,13 +1,19 @@
#include "Subscription.h"
#include "TCPSession.h"
#include "Log.h"
#include <algorithm>
namespace core {
Subscription::Subscription(std::string id) : id(id), owner(NULL) {}
Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(&session) {}
Subscription::Subscription(std::string id, std::string mode)
: id(id), mode(mode), owner(NULL) {}
Subscription::Subscription(std::string id, TCPSession &session, std::string mode)
: id(id), mode(mode), owner(&session) {}
Subscription::~Subscription() {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor....";
std::stringstream out;
out << "cancel:" << id << std::endl;
for(auto subscriber : subscribers) {
@ -42,6 +48,10 @@ namespace core {
return 1;
}
bool Subscription::ifSubscriber(TCPSession &session) {
return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end());
}
int Subscription::onSubscribe(TCPSession &session) {
return 0;
}

View File

@ -12,8 +12,8 @@ namespace core {
class Subscription {
public:
Subscription(std::string id);
Subscription(std::string id, TCPSession &session);
Subscription(std::string id, std::string mode = "*AUTHOR");
Subscription(std::string id, TCPSession &session, std::string mode = "*AUTHOR");
virtual ~Subscription();
int subscribe(TCPSession &session);
@ -24,9 +24,12 @@ namespace core {
int event(std::stringstream &out);
bool ifSubscriber(TCPSession &session);
// int processCommand(coreutils::ZString &request, TCPSession &session) override;
std::string id;
std::string mode;
TCPSession *owner;
std::vector<TCPSession *> subscribers;

View File

@ -1,5 +1,6 @@
#include "SubscriptionManager.h"
#include "Log.h"
#include <algorithm>
namespace core {
@ -13,15 +14,23 @@ namespace core {
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
int countSubscribed = 0;
int countPublished = 0;
for(auto subscription = subscriptions.begin(); subscription != subscriptions.end();) {
countSubscribed += (*subscription).second->unsubscribe(session);
if((*subscription).second->owner == &session) {
subscription = subscriptions.erase(subscription);
// delete (*subscription).second;
++countPublished;
} else
++subscription;
std::string temp = "";
for(auto [key, subscription] : subscriptions) {
if(temp != "") {
subscriptions.erase(temp);
temp = "";
}
countSubscribed += subscription->unsubscribe(session);
if(subscription->owner == &session) {
temp = key;
delete subscription;
++countPublished;
}
}
if(temp != "") {
subscriptions.erase(temp);
temp = "";
}
coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";
@ -30,15 +39,13 @@ namespace core {
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
if(request[0].equals("publish")) {
Subscription *newSubscription = new Subscription(request[1].str(), session);
Subscription *newSubscription = new Subscription(request[1].str(), session, request[2].str());
subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
return 1;
} else if(request[0].equals("catalog")) {
session.out << ":catalog:";
for(auto const& [key, subscription] : subscriptions) {
session.out << subscription->id << "|";
// subscription->processCommand(request, session);
session.out << (";");
session.out << subscription->id << ";";
}
session.out << std::endl;
return 1;
@ -51,19 +58,29 @@ namespace core {
subscriptions.erase(request[1].str());
} else if(request[0].equals("subscribe")) {
subscription->subscribe(session);
return 1;
} else if(request[0].equals("unsubscribe")) {
subscription->unsubscribe(session);
return 1;
} else if(request[0].equals("event")) {
// coreutils::Log(coreutils::LOG_DEBUG_2) << "request: [" << request << "].";
if(subscription->owner == &session) {
std::stringstream out;
subscription->process(request, out);
if(subscription->mode == "*ANYONE") {
subscription->event(out);
}
else
return 0;
}
return 1;
} else if(subscription->mode == "*SUBSCRIBERS") {
if(subscription->ifSubscriber(session)) {
subscription->event(out);
return 1;
}
} else if(subscription->mode == "*AUTHOR") {
if(subscription->owner == &session) {
subscription->event(out);
return 1;
}
}
}
return 0;
}
return 0;
}

View File

@ -10,7 +10,6 @@ namespace core {
TCPSession::~TCPSession() {
server.removeFromSessionList(this);
server.subscriptions.removeSessionSubscriptions(*this);
coreutils::Log(coreutils::LOG_DEBUG_1) << "Terminating TCPSession level.";
}
void TCPSession::output(std::stringstream &data) {

View File

@ -4,6 +4,7 @@
namespace core {
Thread::Thread(EPoll &ePoll) : ePoll(ePoll) {}
Thread::Thread(EPoll &ePoll, ThreadScope *scope) : ePoll(ePoll) {}
Thread::~Thread() {}
@ -59,8 +60,11 @@ namespace core {
} else if(rc > 0) {
for(int ix = 0; ix < rc; ++ix) {
++count;
if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix]))
if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix])) {
ePoll.resetSocket((Socket *)events[ix].data.ptr);
} else {
delete (Socket *)events[ix].data.ptr;
}
}
}
}

View File

@ -5,6 +5,7 @@
#include "Log.h"
#include "Object.h"
#include "TCPSession.h"
#include "ThreadScope.h"
namespace core {
@ -22,6 +23,7 @@ namespace core {
public:
Thread(EPoll &ePoll);
Thread(EPoll &ePoll, ThreadScope *thread);
~Thread();
///
@ -43,6 +45,7 @@ namespace core {
void print_thread_start_log();
pid_t threadId;
void run();
ThreadScope *thread;
};

15
ThreadScope.h Normal file
View File

@ -0,0 +1,15 @@
#ifndef __ThreadScope_h__
#define __ThreadScope_h__
namespace core {
class ThreadScope {
public:
ThreadScope() {}
};
}
#endif