#include "SubscriptionManager.h"
#include "Log.h"
#include "Subscription.h"
#include "SubscriptionHandlerFactory.h"
#include "TCPServer.h"
#include <mutex>
#include <sstream>
#include <string>
#include <utility>

namespace core {

   /// Registers an externally created subscription under its id.
   ///
   /// The map stores the address of @p subscription, so the caller must keep
   /// the object alive for as long as it is registered.
   ///
   /// @param subscription subscription to register.
   /// @return always 1.
   int SubscriptionManager::add(Subscription &subscription) {
      // RAII guard: the lock is released even if insert() throws.
      const std::lock_guard<decltype(lock)> guard(lock);
      subscriptions.insert(std::make_pair(subscription.id, &subscription));
      return 1;
   }

   /// Hook invoked when a subscription is about to be cleared.
   ///
   /// Both parameters are received by value, so the assignments below only
   /// touch local copies and are invisible to the caller.
   ///
   /// @return always true.
   bool SubscriptionManager::onClearSubscription(coreutils::MString temp, coreutils::MString key) {
      temp = "";
      key = "";
      return true;
   }

   /// Detaches a session from every subscription and handles the
   /// subscriptions whose owner is that session.
   ///
   /// For each owned subscription:
   ///  - selectionOfOwnerForSubscription == "*DIE": the subscription is removed;
   ///  - will == "*NEXT" with no subscribers left: the subscription is removed;
   ///  - will == "*NEXT" otherwise: the session is purged from the subscriber
   ///    list and subscriptionOwner() is asked to pick the next owner.
   ///
   /// @param session session that is going away.
   /// @return total of the values returned by Subscription::unsubscribe().
   int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
      int countSubscribed = 0;
      int countPublished = 0;

      const std::lock_guard<decltype(lock)> guard(lock);

      // Only ever handed (by value) to onClearSubscription(), mirroring the
      // argument the hook has always received.
      coreutils::MString temp = "";

      // Iterator loop so that entries can be erased safely while walking the map.
      for (auto entry = subscriptions.begin(); entry != subscriptions.end();) {
         auto subscription = entry->second;
         bool clearSubscription = false;

         countSubscribed += subscription->unsubscribe(session);

         if (subscription->subscriptionOwners == session.alias) {
            if (subscription->selectionOfOwnerForSubscription == "*DIE") {
               clearSubscription = onClearSubscription(temp, entry->first);
            } else if (subscription->will == "*NEXT") {
               if (subscription->subscribers.size() < 1) {
                  coreutils::Log(coreutils::LOG_DEBUG_1) << "There is not enough people to move to the next";
                  clearSubscription = onClearSubscription(temp, entry->first);
               } else {
                  auto &subscribers = subscription->subscribers;
                  for (auto subscribed = subscribers.begin(); subscribed != subscribers.end();) {
                     if ((*subscribed)->alias == session.alias) {
                        // erase() invalidates the iterator; continue from the one it returns.
                        subscribed = subscribers.erase(subscribed);
                     } else {
                        ++subscribed;
                     }
                     // NOTE(review): called once per visited subscriber, as before —
                     // confirm whether a single call after the loop is what is wanted.
                     subscription->subscriptionOwner(session.alias, "*NEXT");
                  }
               }
            } else {
               coreutils::Log(coreutils::LOG_DEBUG_1) << "Subscription doesn't exist";
            }
            ++countPublished;
         }

         if (clearSubscription) {
            // NOTE(review): the Subscription object itself is not deleted here because
            // entries registered through add() are not owned by this map.
            entry = subscriptions.erase(entry);
         } else {
            ++entry;
         }
      }

      coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
      coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";

      return countSubscribed;
   }

   /// Dispatches one already-tokenised subscription command.
   ///
   /// request[0] selects the command: publish, catalog, invite, unpublish,
   /// subscribe, unsubscribe or event. request[1] names the subscription for
   /// every command except catalog.
   ///
   /// NOTE(review): unlike add() and removeSessionSubscriptions(), this method
   /// does not take `lock` while touching `subscriptions` — confirm whether
   /// callers serialise access.
   ///
   /// @param request tokenised request.
   /// @param session session that issued the request.
   /// @return 1 when the command was handled, 0 when the request is malformed,
   ///         the subscription is unknown, or the event was not permitted.
   int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
      if (request[0] == "publish") {
         const auto fieldCount = request.getList().size();

         // Missing trailing fields are passed on as empty strings instead of
         // indexing past the end of the token list.
         const auto field = [&](std::size_t index) -> std::string {
            return index < fieldCount ? request[index].str() : std::string();
         };

         SubscriptionHandler *handler = nullptr;
         if (factory != nullptr && fieldCount > 4) {
            handler = factory->getSubscriptionHandler(request[4].str());
         }

         if (fieldCount > 2 && request[2] == "*OWNER") {
            // Creating a second object for an existing name would leak it, since
            // insert() keeps the entry that is already there.
            if (subscriptions.find(request[1].str()) == subscriptions.end()) {
               newSubscription = new Subscription(request[1].str(), session.alias, request[2].str(),
                                                  field(3), handler, field(5), field(6));
               subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
            }
            return 1;
         }
         return 1;
      } else if (request[0] == "catalog") {
         session.out << ":catalog:";
         for (auto const &[key, subscription] : subscriptions) {
            session.out << subscription->id << ";";
         }
         session.out << std::endl;
         return 1;
      } else if (request[0] == "invite") {
         if (request.getList().size() < 3) {
            return 0;
         }
         coreutils::Log(coreutils::LOG_DEBUG_1) << request[2];
         // NOTE(review): reinterprets the token as an MString; confirm that this is
         // valid for the type returned by ZString::operator[].
         invitee = (coreutils::MString *)&request[2];
         TCPSession *tempSession = session.server.getSessionByAlias(invitee);
         if (tempSession == nullptr) {
            // Presumably returned when no session carries that alias — nothing to notify.
            return 0;
         }
         std::stringstream temp;
         // Send the invitee's alias text, not the address held in `invitee`.
         temp << "invite:" << request[1] << ":" << request[2];
         tempSession->write(temp.str());
         return 1;
      }

      // Every remaining command addresses an existing subscription by name.
      if (request.getList().size() < 2) {
         return 0;
      }

      // find() rather than operator[]: an unknown name must not insert a null
      // entry into the map (and must not be dereferenced).
      const auto found = subscriptions.find(request[1].str());
      if (found == subscriptions.end()) {
         return 0;
      }

      auto subscription = found->second;
      if (subscription == nullptr || !request[1].equals(subscription->id)) {
         return 0;
      }

      if (request[0] == "unpublish") {
         subscriptions.erase(found);
         return 1;
      } else if (request[0] == "subscribe") {
         subscription->subscribe(session);
         return 1;
      } else if (request[0] == "unsubscribe") {
         subscription->unsubscribe(session);
         return 1;
      } else if (request[0] == "event") {
         std::stringstream out;
         subscription->process(request, out, session);
         if (subscription->mode == "*ANYONE") {
            subscription->event(out);
            return 1;
         } else if (subscription->mode == "*SUBSCRIBERS") {
            if (subscription->ifSubscriber(session)) {
               subscription->event(out);
               return 1;
            }
         }
      }

      return 0;
   }

}