173 lines
5.4 KiB
C++
173 lines
5.4 KiB
C++
#include "SubscriptionManager.h"
|
|
#include "Log.h"
|
|
#include "Subscription.h"
|
|
#include "SubscriptionHandlerFactory.h"
|
|
#include "TCPServer.h"
|
|
#include <algorithm>
|
|
|
|
namespace core
|
|
{
|
|
|
|
int SubscriptionManager::add(Subscription &subscription)
|
|
{
|
|
lock.lock();
|
|
subscriptions.insert(std::make_pair(subscription.id, &subscription));
|
|
lock.unlock();
|
|
return 1;
|
|
}
|
|
bool SubscriptionManager::onClearSubscription(std::string temp, std::string key)
|
|
{
|
|
// temp = key;
|
|
temp = "";
|
|
key = "";
|
|
return true;
|
|
}
|
|
|
|
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session)
|
|
{
|
|
int countSubscribed = 0;
|
|
int countPublished = 0;
|
|
int count = 0;
|
|
|
|
lock.lock();
|
|
std::string temp = "";
|
|
for (auto [key, subscription] : subscriptions)
|
|
{
|
|
if (temp != "")
|
|
{
|
|
subscriptions.erase(temp);
|
|
temp = "";
|
|
}
|
|
countSubscribed += subscription->unsubscribe(session);
|
|
if (subscription->subscriptionOwners == session.alias)
|
|
{
|
|
if (subscription->selectionOfOwnerForSubscription == "*DIE") //--> This happens if the owner of the subscription sets it, or if not enough people in Vector -->//
|
|
{
|
|
onClearSubscription(temp, key);
|
|
}
|
|
|
|
else if (subscription->will == "*NEXT") //--> This is the other option
|
|
{
|
|
if (subscription->subscribers.size() < 1)
|
|
{
|
|
coreutils::Log(coreutils::LOG_DEBUG_1) << "There is not enough people to move to the next";
|
|
onClearSubscription(temp, key);
|
|
}
|
|
else
|
|
{
|
|
|
|
for (auto subscribed = subscription->subscribers.begin(); subscribed < subscription->subscribers.end(); subscribed++)
|
|
{
|
|
if ((*subscribed)->alias == session.alias)
|
|
{
|
|
subscription->subscribers.erase(subscribed++);
|
|
}
|
|
subscription->subscriptionOwner(session.alias, "*NEXT");
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
coreutils::Log(coreutils::LOG_DEBUG_1) << "Subscription doesn't exist";
|
|
}
|
|
++countPublished;
|
|
}
|
|
}
|
|
if (temp != "")
|
|
{
|
|
subscriptions.erase(temp);
|
|
temp = "";
|
|
}
|
|
coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
|
|
coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";
|
|
lock.unlock();
|
|
return countSubscribed;
|
|
}
|
|
|
|
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session)
|
|
{
|
|
if (request[0].equals("publish"))
|
|
{
|
|
SubscriptionHandler *handler = NULL;
|
|
if (request.getList().size() > 3)
|
|
{
|
|
handler = factory->getSubscriptionHandler(request[4].str());
|
|
}
|
|
// coreutils::Log(coreutils::LOG_DEBUG_1) << request[1].str() << ":" << session.alias << ":" << request[2].str() << ":" << request[3].str() << ":" << request[4].str() << ":" << request[5].str() << ":" << request[6].str();
|
|
if (request[2].str() == "*OWNER")
|
|
{
|
|
|
|
newSubscription = new Subscription(request[1].str(), session.alias, request[2].str(), request[3].str(), handler, request[5].str(), request[6].str());
|
|
subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
|
|
|
|
return 1;
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
else if (request[0].equals("catalog"))
|
|
{
|
|
session.out << ":catalog:";
|
|
for (auto const &[key, subscription] : subscriptions)
|
|
{
|
|
session.out << subscription->id << ";";
|
|
}
|
|
session.out << std::endl;
|
|
return 1;
|
|
}
|
|
else if (request[0].equals("invite"))
|
|
{
|
|
std::stringstream out;
|
|
coreutils::Log(coreutils::LOG_DEBUG_1) << request[2];
|
|
invitee = (coreutils::MString *)&request[2];
|
|
|
|
TCPSession *tempSession = session.server.getSessionByAlias(invitee);
|
|
std::stringstream temp;
|
|
temp << "invite:" << request[1] << ":" << invitee;
|
|
tempSession->write(temp.str());
|
|
return 1;
|
|
}
|
|
|
|
auto subscription = subscriptions[request[1].str()];
|
|
|
|
if (request[1].equals(subscription->id))
|
|
{
|
|
if (request[0].equals("unpublish"))
|
|
{
|
|
subscriptions.erase(request[1].str());
|
|
}
|
|
else if (request[0].equals("subscribe"))
|
|
{
|
|
|
|
subscription->subscribe(session);
|
|
return 1;
|
|
}
|
|
else if (request[0].equals("unsubscribe"))
|
|
{
|
|
subscription->unsubscribe(session);
|
|
return 1;
|
|
}
|
|
else if (request[0].equals("event"))
|
|
{
|
|
std::stringstream out;
|
|
subscription->process(request, out, session);
|
|
if (subscription->mode == "*ANYONE")
|
|
{
|
|
subscription->event(out);
|
|
return 1;
|
|
}
|
|
else if (subscription->mode == "*SUBSCRIBERS")
|
|
{
|
|
if (subscription->ifSubscriber(session))
|
|
{
|
|
subscription->event(out);
|
|
return 1;
|
|
}
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
return 0;
|
|
}
|
|
}
|