// ServerCore/SubscriptionManager.cpp
// (file-viewer header: 137 lines, 4.7 KiB, C++)

#include "SubscriptionManager.h"
#include "Log.h"
#include "Subscription.h"
#include "SubscriptionHandlerFactory.h"
#include "TCPServer.h"
#include <algorithm>
#include <mutex>
namespace core {
/**
 * Registers a subscription with the manager under its id.
 *
 * Only the address of the subscription is stored; the object is not copied,
 * so it has to stay alive for as long as its entry remains in the map.
 * NOTE(review): with std::map-style insert semantics an already-present id
 * leaves the map unchanged — confirm the container type in the header.
 *
 * @param subscription the subscription to register.
 * @return always 1.
 */
int SubscriptionManager::add(Subscription &subscription) {
   // Scoped guard instead of manual lock()/unlock(): the lock is released
   // even when the insertion throws (for example on allocation failure).
   std::lock_guard guard(lock);
   subscriptions.insert(std::make_pair(subscription.id, &subscription));
   return 1;
}
/**
 * Called by removeSessionSubscriptions() when a subscription is to be cleared.
 *
 * Both arguments are received by value, so blanking them below only touches
 * this function's private copies; the caller's strings are not modified.
 * NOTE(review): the caller appears to expect `temp` to be updated so that the
 * entry is erased later — confirm whether the parameters were meant to be
 * references (that would require a matching change in the header).
 *
 * @param temp copy of the caller's pending-erase key.
 * @param key  copy of the key of the subscription being cleared.
 * @return always true.
 */
bool SubscriptionManager::onClearSubscription(coreutils::MString temp, coreutils::MString key) {
   key = "";
   temp = "";
   return true;
}
/**
 * Detaches a session from every subscription held by this manager.
 *
 * Every subscription is asked to unsubscribe the session. For each
 * subscription whose subscriptionOwners value equals the session's alias the
 * following is then applied:
 *   - selectionOfOwnerForSubscription == "*DIE": onClearSubscription() is
 *     called;
 *   - will == "*NEXT": when the subscriber list is empty
 *     onClearSubscription() is called; otherwise subscribers whose alias
 *     matches the session are dropped from the list and
 *     subscriptionOwner(alias, "*NEXT") is invoked once per loop pass;
 *   - anything else: a debug message is logged.
 *
 * NOTE(review): onClearSubscription() takes `temp` and `key` by value, so
 * `temp` is never filled in and the deferred erase below never fires. The
 * erase logic is kept so that it starts working if that hook is corrected;
 * confirm the intended behaviour before relying on entries being removed.
 *
 * @param session the session that is going away.
 * @return the sum of the values returned by Subscription::unsubscribe().
 */
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
   int countSubscribed = 0;
   int countPublished = 0;

   // Scoped guard instead of manual lock()/unlock(): the lock is released on
   // every exit path, including exceptions thrown by the calls below.
   std::lock_guard guard(lock);

   // Key of an entry scheduled for removal. It is erased one iteration late
   // so that the element currently being visited is never the one erased.
   coreutils::MString temp = "";

   // Bind by reference: the original copied every key/pointer pair.
   for (const auto &[key, subscription] : subscriptions) {
      if (temp != "") {
         subscriptions.erase(temp);
         temp = "";
      }

      countSubscribed += subscription->unsubscribe(session);

      if (subscription->subscriptionOwners == session.alias) {
         if (subscription->selectionOfOwnerForSubscription == "*DIE") {
            onClearSubscription(temp, key);
         }
         else if (subscription->will == "*NEXT") {
            auto &subscribers = subscription->subscribers;
            if (subscribers.empty()) {
               coreutils::Log(coreutils::LOG_DEBUG_1) << "There is not enough people to move to the next";
               onClearSubscription(temp, key);
            }
            else {
               // erase() invalidates the erased iterator, so continue from
               // the iterator it returns. The original advanced twice after
               // an erase, skipping an element and possibly stepping past
               // end().
               for (auto subscribed = subscribers.begin(); subscribed != subscribers.end();) {
                  if ((*subscribed)->alias == session.alias) {
                     subscribed = subscribers.erase(subscribed);
                  }
                  else {
                     ++subscribed;
                  }
                  // Kept inside the loop, as in the original.
                  // NOTE(review): confirm whether one call per departing
                  // owner was intended instead of one per loop pass.
                  subscription->subscriptionOwner(session.alias, "*NEXT");
               }
            }
         }
         else {
            coreutils::Log(coreutils::LOG_DEBUG_1) << "Subscription doesn't exist";
         }
         ++countPublished;
      }
   }

   // Flush an erase that was scheduled while visiting the last entry.
   if (temp != "") {
      subscriptions.erase(temp);
      temp = "";
   }

   coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
   coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";
   return countSubscribed;
}
/**
 * Dispatches one subscription-related request.
 *
 * request[0] selects the command:
 *   - "publish":  when request[2] is "*OWNER", creates a Subscription from
 *                 fields 1, 2, 3, 5 and 6 (plus the handler named by field 4)
 *                 and stores it under request[1]. Returns 1, or 0 when an
 *                 "*OWNER" publish carries fewer than seven fields.
 *   - "catalog":  writes ":catalog:" followed by every subscription id and a
 *                 ';' to session.out. Returns 1.
 *   - "invite":   writes "invite:<request[1]>:<request[2]>" to the session
 *                 looked up by the alias in request[2]. Returns 1, or 0 when
 *                 that session is not found.
 *   - "unpublish", "subscribe", "unsubscribe", "event": act on the
 *                 subscription registered under request[1]; return 0 when no
 *                 such subscription exists.
 *
 * NOTE(review): unlike add() and removeSessionSubscriptions(), this function
 * reads and modifies `subscriptions` without taking `lock` — confirm whether
 * callers serialise access.
 *
 * @param request the already tokenised request; fields are read by index.
 * @param session the session that issued the request.
 * @return 1 when the command was carried out, 0 otherwise ("unpublish"
 *         returns 0 as well, see below).
 */
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
   if (request[0] == "publish") {
      const auto fieldCount = request.getList().size();

      SubscriptionHandler *handler = nullptr;
      // Field 4 only exists when there are at least five fields; the original
      // tested "> 3" and could read one element past the end of the list.
      if (fieldCount > 4) {
         handler = factory->getSubscriptionHandler(request[4].str());
      }

      if (fieldCount > 2 && request[2] == "*OWNER") {
         // The constructor call below reads fields up to index 6.
         if (fieldCount < 7) {
            return 0;
         }
         // NOTE(review): if request[1] is already registered, a std::map-style
         // insert leaves the map unchanged and the new object is reachable
         // only through newSubscription — confirm who releases it.
         newSubscription = new Subscription(request[1].str(), session.alias, request[2].str(), request[3].str(), handler, request[5].str(), request[6].str());
         subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
         return 1;
      }
      return 1;
   }

   if (request[0] == "catalog") {
      session.out << ":catalog:";
      for (auto const &[key, subscription] : subscriptions) {
         session.out << subscription->id << ";";
      }
      session.out << std::endl;
      return 1;
   }

   if (request[0] == "invite") {
      // Fields 1 (subscription name) and 2 (invitee alias) are both needed.
      if (request.getList().size() < 3) {
         return 0;
      }
      coreutils::Log(coreutils::LOG_DEBUG_1) << request[2];
      // NOTE(review): cast kept from the original; it assumes a ZString can be
      // viewed as an MString by getSessionByAlias() — confirm.
      invitee = (coreutils::MString *)&request[2];
      TCPSession *tempSession = session.server.getSessionByAlias(invitee);
      // Presumably null when no session carries that alias; do not
      // dereference it in that case.
      if (tempSession == nullptr) {
         return 0;
      }
      std::stringstream temp;
      // Stream the alias text itself. The original streamed the pointer
      // `invitee`, which puts an address into the message.
      temp << "invite:" << request[1] << ":" << request[2];
      tempSession->write(temp.str());
      return 1;
   }

   // All remaining commands address an existing subscription by name.
   if (request.getList().size() < 2) {
      return 0;
   }
   // find() instead of operator[]: operator[] inserts a null entry for an
   // unknown name, which was then dereferenced here and would also be
   // dereferenced later by "catalog" and removeSessionSubscriptions().
   const auto entry = subscriptions.find(request[1].str());
   if (entry == subscriptions.end() || entry->second == nullptr) {
      return 0;
   }
   auto subscription = entry->second;

   if (!request[1].equals(subscription->id)) {
      return 0;
   }

   if (request[0] == "unpublish") {
      subscriptions.erase(entry);
      // NOTE(review): returns 0 exactly like the original, although other
      // successful commands return 1 — confirm whether that is intended.
      return 0;
   }

   if (request[0] == "subscribe") {
      subscription->subscribe(session);
      return 1;
   }

   if (request[0] == "unsubscribe") {
      subscription->unsubscribe(session);
      return 1;
   }

   if (request[0] == "event") {
      std::stringstream out;
      subscription->process(request, out, session);
      // The event is forwarded for mode "*ANYONE", or for mode "*SUBSCRIBERS"
      // when the sender passes ifSubscriber().
      const bool allowed =
         subscription->mode == "*ANYONE" ||
         (subscription->mode == "*SUBSCRIBERS" && subscription->ifSubscriber(session));
      if (allowed) {
         subscription->event(out);
         return 1;
      }
   }

   return 0;
}
}