ServerCore/.history/SubscriptionManager_20221014002952.cpp
Brad Arant db199eaec3 ?
2022-10-18 19:06:28 -07:00

142 lines
3.7 KiB
C++

#include "Log.h"
#include "SubscriptionManager.h"
#include <algorithm>
#include <mutex>
namespace core
{
/// Creates a manager without performing any explicit member initialisation here.
SubscriptionManager::SubscriptionManager() = default;
/**
 * Registers a subscription under its own id.
 *
 * Only the address of the caller's object is stored, not a copy, so the
 * referenced Subscription must stay alive for as long as it is registered.
 *
 * @param subscription subscription to register; keyed by subscription.id.
 * @return always 1. If the id is already registered the existing entry is
 *         kept and the call has no effect.
 */
int SubscriptionManager::add(Subscription &subscription)
{
    // Scoped guard: the lock is released on every exit path, including an
    // exception thrown by the container while allocating the new node.
    std::lock_guard<decltype(lock)> guard(lock);
    subscriptions.emplace(subscription.id, &subscription);
    return 1;
}
/**
 * Registers a subscription under its own id and attaches a named handler.
 *
 * Only the address of the caller's object is stored, not a copy, so the
 * referenced Subscription must stay alive for as long as it is registered.
 *
 * @param subscription subscription to register; keyed by subscription.id.
 * @param handler      name of a handler previously registered with
 *                     addHandler(). When no handler with that name exists the
 *                     subscription's handler is set to null.
 * @return always 1.
 */
int SubscriptionManager::add(Subscription &subscription, std::string handler)
{
    // Scoped guard: the lock is released on every exit path, including an
    // exception thrown by the container while allocating the new node.
    std::lock_guard<decltype(lock)> guard(lock);
    subscriptions.emplace(subscription.id, &subscription);

    // Look the handler up with find(): operator[] would silently create a
    // null entry in `handlers` for every unknown name that is asked for.
    const auto found = handlers.find(handler);
    subscription.setHandler(found != handlers.end() ? found->second : nullptr);
    return 1;
}
int SubscriptionManager::addHandler(std::string name, SubscriptionHandler &handler)
{
lock.lock();
handlers.insert(std::make_pair(name, &handler));
lock.unlock();
return 1;
}
/**
 * Detaches a session from every registered subscription and cancels the
 * subscriptions that the session owns.
 *
 * For each entry the session is unsubscribed; entries whose `owner` is this
 * session are additionally deleted and removed from the map.
 *
 * @param session session that is going away.
 * @return sum of the values returned by Subscription::unsubscribe() over all
 *         entries (logged as the number of subscriptions the session was
 *         removed from).
 */
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session)
{
    int countSubscribed = 0;
    int countPublished = 0;

    // Scoped guard: held for the whole sweep and the log lines below, and
    // released on every exit path.
    std::lock_guard<decltype(lock)> guard(lock);

    // Iterator loop so entries can be erased in place: erase() hands back the
    // next valid iterator, which avoids copying each key/value pair and a
    // second lookup by key.
    for (auto entry = subscriptions.begin(); entry != subscriptions.end();)
    {
        auto subscription = entry->second;
        if (subscription == nullptr)
        {
            // An entry can hold a null pointer (e.g. one default-inserted by
            // a map operator[] lookup); drop it instead of dereferencing it.
            entry = subscriptions.erase(entry);
            continue;
        }

        countSubscribed += subscription->unsubscribe(session);
        if (subscription->owner == &session)
        {
            delete subscription;
            entry = subscriptions.erase(entry);
            ++countPublished;
        }
        else
        {
            ++entry;
        }
    }

    coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
    coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";
    return countSubscribed;
}
/**
 * Dispatches one parsed subscription command.
 *
 * Field 0 of the request selects the command and field 1, where used, names
 * the subscription:
 *   - "publish"     creates a subscription owned by `session`. Fields 1 and 2
 *                   are required; an optional field 3 is forwarded as a
 *                   fourth constructor argument.
 *   - "catalog"     writes ":catalog:" followed by every subscription id,
 *                   each terminated by ';', to session.out.
 *   - "unpublish"   removes the named subscription from the manager.
 *   - "subscribe"   adds the session to the named subscription.
 *   - "unsubscribe" removes the session from the named subscription.
 *   - "event"       lets the subscription process the request and, if the
 *                   subscription's mode permits this session, raises the
 *                   event.
 *
 * @param request parsed command.
 * @param session session that issued the command.
 * @return 1 when the command was carried out; 0 when it was not recognised,
 *         was malformed, named an unknown subscription, tried to publish a
 *         name that already exists, or was an event the session is not
 *         permitted to raise. "unpublish" also returns 0 (see note below).
 *
 * NOTE(review): unlike add()/addHandler()/removeSessionSubscriptions() this
 * function does not take `lock` while touching `subscriptions` — confirm
 * whether callers serialise access before adding locking here.
 */
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session)
{
    if (request[0].equals("publish"))
    {
        const auto fieldCount = request.getList().size();
        if (fieldCount < 3)
            return 0; // name and third field are both mandatory

        const auto name = request[1].str();
        // insert() never replaces an existing key, so constructing a new
        // object for a name that is already taken would only leak it.
        if (subscriptions.find(name) != subscriptions.end())
            return 0;

        // The four-argument form reads field 3, so it is only valid when at
        // least four fields are present.
        Subscription *newSubscription = nullptr;
        if (fieldCount > 3)
            newSubscription = new Subscription(request[1].str(), session, request[2].str(), request[3].str());
        else
            newSubscription = new Subscription(request[1].str(), session, request[2].str());
        subscriptions.insert(std::make_pair(name, newSubscription));
        return 1;
    }

    if (request[0].equals("catalog"))
    {
        session.out << ":catalog:";
        for (const auto &entry : subscriptions)
        {
            if (entry.second != nullptr)
                session.out << entry.second->id << ";";
        }
        session.out << std::endl;
        return 1;
    }

    // Every remaining command addresses an existing subscription by name.
    if (request.getList().size() < 2)
        return 0;

    // find() instead of operator[]: operator[] would insert a null pointer
    // for an unknown name, which would then be dereferenced below.
    const auto found = subscriptions.find(request[1].str());
    if (found == subscriptions.end() || found->second == nullptr)
        return 0;

    auto subscription = found->second;
    if (!request[1].equals(subscription->id))
        return 0;

    if (request[0].equals("unpublish"))
    {
        // NOTE(review): the entry is dropped without deleting the object and
        // the result stays 0, exactly as before. Objects created by "publish"
        // are heap-allocated, but add() also registers caller-owned objects,
        // so who should free the object here needs confirming.
        subscriptions.erase(found);
    }
    else if (request[0].equals("subscribe"))
    {
        subscription->subscribe(session);
        return 1;
    }
    else if (request[0].equals("unsubscribe"))
    {
        subscription->unsubscribe(session);
        return 1;
    }
    else if (request[0].equals("event"))
    {
        std::stringstream out;
        // process() runs before the permission check, as in the original.
        subscription->process(request, out, session);
        if (subscription->mode == "*ANYONE")
        {
            subscription->event(out);
            return 1;
        }
        else if (subscription->mode == "*SUBSCRIBERS")
        {
            if (subscription->ifSubscriber(session))
            {
                subscription->event(out);
                return 1;
            }
        }
        else if (subscription->mode == "*AUTHOR")
        {
            if (subscription->owner == &session)
            {
                subscription->event(out);
                return 1;
            }
        }
    }
    return 0;
}
}