Subscription system improvements.

This commit is contained in:
Brad Arant 2022-01-29 15:52:04 +00:00
parent 3ef82e0fce
commit 16ffd60e75
7 changed files with 33 additions and 23 deletions

View File

@ -17,7 +17,7 @@ namespace core {
else { else {
if(request.equals("")) if(request.equals(""))
return false; return false;
request.split(delimiter, 2); request.split(delimiter, 10);
request.reset(); request.reset();
try { try {
auto command = commands.at(request[0].str()); auto command = commands.at(request[0].str());

1
Sihen.txt Normal file
View File

@ -0,0 +1 @@
0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos:

View File

@ -1,8 +1,11 @@
#include "Subscription.h" #include "Subscription.h"
#include "TCPSession.h"
namespace core { namespace core {
Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(session) {} Subscription::Subscription(std::string id) : id(id), owner(NULL) {}
Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(&session) {}
Subscription::~Subscription() { Subscription::~Subscription() {
std::stringstream out; std::stringstream out;
@ -13,6 +16,7 @@ namespace core {
} }
int Subscription::subscribe(TCPSession &session) { int Subscription::subscribe(TCPSession &session) {
onSubscribe(session);
subscribers.push_back(&session); subscribers.push_back(&session);
return 1; return 1;
} }
@ -38,9 +42,8 @@ namespace core {
return 1; return 1;
} }
int Subscription::processCommand(coreutils::ZString &request, TCPSession &session) { int Subscription::onSubscribe(TCPSession &session) {
session.out << "Generic subscription passthrough"; return 0;
return 1;
} }
} }

View File

@ -1,30 +1,33 @@
#ifndef __Subscription_h__ #ifndef __Subscription_h__
#define __Subscription_h__ #define __Subscription_h__
#include "TCPSession.h"
#include "Command.h"
#include "ZString.h" #include "ZString.h"
#include <vector> #include <vector>
#include <string> #include <string>
namespace core { namespace core {
class Subscription : public Command { class TCPSession;
class Subscription {
public: public:
Subscription(std::string id);
Subscription(std::string id, TCPSession &session); Subscription(std::string id, TCPSession &session);
~Subscription(); virtual ~Subscription();
int subscribe(TCPSession &session); int subscribe(TCPSession &session);
int unsubscribe(TCPSession &session); int unsubscribe(TCPSession &session);
virtual int process(coreutils::ZString &request, std::stringstream &out); virtual int process(coreutils::ZString &request, std::stringstream &out);
virtual int onSubscribe(TCPSession &session);
int event(std::stringstream &out); int event(std::stringstream &out);
int processCommand(coreutils::ZString &request, TCPSession &session) override; // int processCommand(coreutils::ZString &request, TCPSession &session) override;
std::string id; std::string id;
TCPSession &owner; TCPSession *owner;
std::vector<TCPSession *> subscribers; std::vector<TCPSession *> subscribers;

View File

@ -5,12 +5,17 @@ namespace core {
SubscriptionManager::SubscriptionManager() {} SubscriptionManager::SubscriptionManager() {}
int SubscriptionManager::add(Subscription &subscription) {
subscriptions.insert(std::make_pair(subscription.id, &subscription));
return 1;
}
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) { int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
int countSubscribed = 0; int countSubscribed = 0;
int countPublished = 0; int countPublished = 0;
for(auto subscription = subscriptions.begin(); subscription != subscriptions.end();) { for(auto subscription = subscriptions.begin(); subscription != subscriptions.end();) {
countSubscribed += (*subscription).second->unsubscribe(session); countSubscribed += (*subscription).second->unsubscribe(session);
if(&(*subscription).second->owner == &session) { if((*subscription).second->owner == &session) {
subscription = subscriptions.erase(subscription); subscription = subscriptions.erase(subscription);
delete (*subscription).second; delete (*subscription).second;
++countPublished; ++countPublished;
@ -24,19 +29,15 @@ namespace core {
} }
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) { int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Processing subscription request: " << request << ".";
if(request[0].equals("publish")) { if(request[0].equals("publish")) {
Subscription *newSubscription = new Subscription(request[1].str(), session); Subscription *newSubscription = new Subscription(request[1].str(), session);
subscriptions.insert(std::make_pair(request[1].str(), newSubscription)); subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
return 1; return 1;
} else if(request[0].equals("catalog")) { } else if(request[0].equals("catalog")) {
session.out << ":catalog:"; session.out << ":catalog:";
for(auto const& [key, subscription] : subscriptions) { for(auto const& [key, subscription] : subscriptions) {
session.out << subscription->id << "|"; session.out << subscription->id << "|";
subscription->processCommand(request, session); // subscription->processCommand(request, session);
session.out << (";"); session.out << (";");
} }
session.out << std::endl; session.out << std::endl;
@ -53,7 +54,7 @@ namespace core {
} else if(request[0].equals("unsubscribe")) { } else if(request[0].equals("unsubscribe")) {
subscription->unsubscribe(session); subscription->unsubscribe(session);
} else if(request[0].equals("event")) { } else if(request[0].equals("event")) {
if(&subscription->owner == &session) { if(subscription->owner == &session) {
std::stringstream out; std::stringstream out;
subscription->process(request, out); subscription->process(request, out);
subscription->event(out); subscription->event(out);

View File

@ -15,6 +15,8 @@ namespace core {
public: public:
SubscriptionManager(); SubscriptionManager();
int add(Subscription &subscription);
int removeSessionSubscriptions(TCPSession &session); int removeSessionSubscriptions(TCPSession &session);
int processCommand(coreutils::ZString &request, TCPSession &session) override; int processCommand(coreutils::ZString &request, TCPSession &session) override;

Binary file not shown.