93 lines
3.0 KiB
C++
93 lines
3.0 KiB
C++
#include "SubscriptionManager.h"
#include "Log.h"

#include <algorithm>
#include <mutex>
|
|
|
|
namespace core {
|
|
|
|
/// Default constructor: performs no explicit setup; every member is
/// default-initialized.
SubscriptionManager::SubscriptionManager() = default;
|
|
|
|
/// Registers an existing subscription with the manager under its id.
///
/// The manager stores the ADDRESS of `subscription`, not a copy, so the
/// caller must keep the object alive for as long as it stays registered.
/// If an entry with the same id is already present the map is left unchanged.
///
/// @param subscription subscription to register; keyed by `subscription.id`.
/// @return always 1.
int SubscriptionManager::add(Subscription &subscription) {
   // Scoped guard instead of manual lock()/unlock(): the lock is released
   // even if the insertion throws (e.g. on allocation failure).
   std::lock_guard guard(lock);

   subscriptions.insert(std::make_pair(subscription.id, &subscription));
   return 1;
}
|
|
|
|
/// Detaches a session from every registered subscription.
///
/// For each entry the session is unsubscribed; entries whose `owner` is this
/// session are additionally deleted and removed from the map.
///
/// NOTE(review): entries registered through add() point at objects owned by
/// the caller; if such an object has this session as its owner it will be
/// deleted here as well — confirm that is intended.
///
/// @param session session that is going away.
/// @return sum of the values returned by Subscription::unsubscribe() for
///         this session.
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
   int countSubscribed = 0;
   int countPublished = 0;

   // Scoped guard: released on every exit path, including exceptions.
   std::lock_guard guard(lock);

   // Iterator loop so entries can be erased in place: erase() returns the
   // iterator following the removed element, so no key has to be remembered
   // across iterations and no pair is copied.
   for(auto it = subscriptions.begin(); it != subscriptions.end(); ) {
      auto *subscription = it->second;

      // A null entry has nothing to unsubscribe or delete (a lookup through
      // operator[] for an unknown key would create one); drop it rather
      // than dereference it.
      if(subscription == nullptr) {
         it = subscriptions.erase(it);
         continue;
      }

      countSubscribed += subscription->unsubscribe(session);

      if(subscription->owner == &session) {
         // The session published this channel: cancel it entirely.
         delete subscription;
         it = subscriptions.erase(it);
         ++countPublished;
      } else {
         ++it;
      }
   }

   // coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
   // coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";

   return countSubscribed;
}
|
|
|
|
/// Dispatches one subscription command received from a session.
///
/// `request[0]` is the command verb; for every verb except "catalog",
/// `request[1]` names the subscription:
///   - "publish"     registers a new Subscription built from request[1],
///                   `session` and request[2]; an already registered name
///                   is left untouched.
///   - "catalog"     writes ":catalog:" followed by each registered id and a
///                   ";" to `session.out`, then ends the line.
///   - "unpublish"   removes the named entry from the map.
///   - "subscribe"   calls subscribe(session) on the named subscription.
///   - "unsubscribe" calls unsubscribe(session) on the named subscription.
///   - "event"       runs process() on the subscription, then delivers the
///                   result through event() if the subscription's mode
///                   allows it: "*ANYONE" always, "*SUBSCRIBERS" only when
///                   ifSubscriber(session) is true, "*AUTHOR" only when the
///                   session is the owner.
///
/// NOTE(review): request[1] / request[2] are read without checking how many
/// tokens the request holds — confirm ZString::operator[] tolerates short
/// requests.
/// NOTE(review): this function reads and modifies `subscriptions` without
/// taking `lock`, unlike add() and removeSessionSubscriptions() — confirm
/// callers serialize access.
///
/// @param request tokenized command.
/// @param session session that issued the command.
/// @return 1 for "publish", "catalog", "subscribe", "unsubscribe" and for an
///         "event" that was delivered; 0 otherwise (unknown subscription,
///         unknown verb, "unpublish", or an event the session may not send).
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
   if(request[0].equals("publish")) {
      // insert() never replaces an existing key, so allocating first would
      // leak the new object when the name is already taken. Check first.
      if(subscriptions.find(request[1].str()) == subscriptions.end()) {
         Subscription *newSubscription = new Subscription(request[1].str(), session, request[2].str());
         subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
      }
      return 1;
   }

   if(request[0].equals("catalog")) {
      session.out << ":catalog:";
      for(auto const& [key, subscription] : subscriptions) {
         // Skip null entries instead of dereferencing them.
         if(subscription != nullptr) {
            session.out << subscription->id << ";";
         }
      }
      session.out << std::endl;
      return 1;
   }

   // find() rather than operator[]: operator[] would insert a null entry for
   // an unknown name, which was then dereferenced below.
   const auto found = subscriptions.find(request[1].str());
   if(found == subscriptions.end() || found->second == nullptr) {
      return 0;
   }

   auto *subscription = found->second;

   if(!request[1].equals(subscription->id)) {
      return 0;
   }

   if(request[0].equals("unpublish")) {
      // NOTE(review): the Subscription object itself is not deleted here and
      // any session may unpublish; the result is 0 even though the entry was
      // removed. All of this matches the previous behavior — confirm intent.
      subscriptions.erase(found);
      return 0;
   }

   if(request[0].equals("subscribe")) {
      subscription->subscribe(session);
      return 1;
   }

   if(request[0].equals("unsubscribe")) {
      subscription->unsubscribe(session);
      return 1;
   }

   if(request[0].equals("event")) {
      std::stringstream out;

      // process() runs before the permission check, exactly as before.
      subscription->process(request, out, session);

      if(subscription->mode == "*ANYONE") {
         subscription->event(out);
         return 1;
      } else if(subscription->mode == "*SUBSCRIBERS") {
         if(subscription->ifSubscriber(session)) {
            subscription->event(out);
            return 1;
         }
      } else if(subscription->mode == "*AUTHOR") {
         if(subscription->owner == &session) {
            subscription->event(out);
            return 1;
         }
      }
   }

   return 0;
}
|
|
|
|
}
|