Convert Subscription and SubscriptionManager to MString.

This commit is contained in:
Brad Arant 2023-10-26 18:28:51 -07:00
parent 3a75e920d9
commit 57f47fdecb
8 changed files with 100 additions and 179 deletions

View File

@ -5,74 +5,58 @@
#include "TCPSocket.h" #include "TCPSocket.h"
#include <algorithm> #include <algorithm>
namespace core namespace core {
{
Subscription::Subscription(std::string id, std::string mode) Subscription::Subscription(coreutils::MString id, coreutils::MString mode)
: id(id), mode(mode), owner(""), handler(NULL) {} : id(id), mode(mode), owner(""), handler(NULL) {}
Subscription::Subscription(std::string id, coreutils::MString alias, std::string mode) Subscription::Subscription(coreutils::MString id, coreutils::MString owner, coreutils::MString mode)
: id(id), mode(mode), owner(alias), handler(NULL) {} : id(id), mode(mode), owner(owner), handler(NULL) {}
Subscription::Subscription(std::string id, coreutils::MString alias, std::string ownership, std::string mode, SubscriptionHandler *handler, std::string will, std::string timer)
: id(id), ownership(ownership), mode(mode), owner(alias), handler(handler), will(will), timer(timer)
{
Subscription::Subscription(coreutils::MString id, coreutils::MString owner, coreutils::MString ownership, coreutils::MString mode, SubscriptionHandler *handler, coreutils::MString will, coreutils::MString timer)
: id(id), ownership(ownership), mode(mode), owner(owner), handler(handler), will(will), timer(timer) {
coreutils::Log(coreutils::LOG_DEBUG_3) << owner; coreutils::Log(coreutils::LOG_DEBUG_3) << owner;
coreutils::Log(coreutils::LOG_DEBUG_3) << will; coreutils::Log(coreutils::LOG_DEBUG_3) << will;
subscriptionOwner(owner, will); subscriptionOwner(owner, will);
} }
Subscription::~Subscription() Subscription::~Subscription() {
{
std::stringstream out; std::stringstream out;
out << "cancel:" << id << std::endl; out << "cancel:" << id << std::endl;
for (auto subscriber : subscribers) for (auto subscriber : subscribers) {
{
subscriber->write(out.str()); subscriber->write(out.str());
} }
} }
int Subscription::subscribe(TCPSession &session) int Subscription::subscribe(TCPSession &session) {
{
if (handler) if (handler)
handler->onSubscribe(session, this); handler->onSubscribe(session, this);
else else
onSubscribe(session); onSubscribe(session);
subscribers.push_back(&session); subscribers.push_back(&session);
return 1; return 1;
} }
int Subscription::unsubscribe(TCPSession &session) int Subscription::unsubscribe(TCPSession &session) {
{
for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
{ if (*subscriber == &session) {
if (*subscriber == &session) subscribers.erase(subscriber++);
{ return 1;
subscribers.erase(subscriber++); }
return 1;
}
}
return 0; return 0;
} }
bool Subscription::subscriptionOwner(coreutils::MString alias, std::string ownerSelection)
{ bool Subscription::subscriptionOwner(coreutils::MString alias, coreutils::MString ownerSelection) {
subscriptionOwners = alias; subscriptionOwners = alias;
selectionOfOwnerForSubscription = ownerSelection; selectionOfOwnerForSubscription = ownerSelection;
coreutils::Log(coreutils::LOG_DEBUG_1) << "Owner of Subscription: " << subscriptionOwners; coreutils::Log(coreutils::LOG_DEBUG_1) << "Owner of Subscription: " << subscriptionOwners;
coreutils::Log(coreutils::LOG_DEBUG_1) << "Will of Subscription: " << selectionOfOwnerForSubscription; coreutils::Log(coreutils::LOG_DEBUG_1) << "Will of Subscription: " << selectionOfOwnerForSubscription;
return true; return true;
} }
int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session) int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session) {
{
if (handler) if (handler)
handler->process(request, out, session, this); handler->process(request, out, session, this);
else else
@ -80,38 +64,28 @@ namespace core
return 1; return 1;
} }
int Subscription::event(std::stringstream &out) int Subscription::event(std::stringstream &out) {
{
for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
{ (*subscriber)->write(out.str());
(*subscriber)->write(out.str());
}
return 1; return 1;
} }
bool Subscription::ifSubscriber(TCPSession &session) bool Subscription::ifSubscriber(TCPSession &session) {
{
return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end()); return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end());
} }
int Subscription::onSubscribe(TCPSession &session) int Subscription::onSubscribe(TCPSession &session) {
{
return 0; return 0;
} }
bool Subscription::subInvite(TCPSession &session) bool Subscription::subInvite(TCPSession &session) {
{
return 0; return 0;
} }
void Subscription::sendToAll(std::stringstream &data, TCPSession &sender) void Subscription::sendToAll(std::stringstream &data, TCPSession &sender) {
{
for (auto session : subscribers) for (auto session : subscribers)
if (session != &sender) if (session != &sender)
session->write(data.str());
session->write(data.str());
data.str(""); data.str("");
} }

View File

@ -8,18 +8,16 @@
#include <string> #include <string>
#include <vector> #include <vector>
namespace core namespace core {
{
class TCPSession; class TCPSession;
class Subscription class Subscription {
{
public: public:
Subscription(std::string id, std::string mode = "*AUTHOR"); Subscription(coreutils::MString id, coreutils::MString mode = "*AUTHOR");
Subscription(std::string id, coreutils::MString alias, std::string mode); Subscription(coreutils::MString id, coreutils::MString owner, coreutils::MString mode);
Subscription(std::string id, coreutils::MString alias, std::string ownership, std::string mode, SubscriptionHandler *handler, std::string selection, std::string time); Subscription(coreutils::MString id, coreutils::MString owner, coreutils::MString ownership, coreutils::MString mode, SubscriptionHandler *handler, coreutils::MString selection, coreutils::MString time);
virtual ~Subscription(); virtual ~Subscription();
int subscribe(TCPSession &session); int subscribe(TCPSession &session);
@ -30,7 +28,7 @@ namespace core
virtual int onSubscribe(TCPSession &session); virtual int onSubscribe(TCPSession &session);
int event(std::stringstream &out); int event(std::stringstream &out);
bool subscriptionOwner(coreutils::MString alias, std::string selection); bool subscriptionOwner(coreutils::MString owner, coreutils::MString selection);
bool ifSubscriber(TCPSession &session); bool ifSubscriber(TCPSession &session);
bool subInvite(TCPSession &session); bool subInvite(TCPSession &session);
@ -38,17 +36,18 @@ namespace core
void sendToAll(std::stringstream &data, TCPSession &sender); void sendToAll(std::stringstream &data, TCPSession &sender);
void sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter); void sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter);
std::string id; coreutils::MString id;
std::string mode; coreutils::MString mode;
coreutils::MString owner; coreutils::MString owner;
coreutils::MString subscriptionOwners; coreutils::MString subscriptionOwners;
std::string selectionOfOwnerForSubscription; coreutils::MString selectionOfOwnerForSubscription;
std::string will; coreutils::MString will;
std::string ownership; coreutils::MString ownership;
std::string timer; coreutils::MString timer;
SubscriptionHandler *handler; SubscriptionHandler *handler;
std::vector<TCPSession *> subscribers; std::vector<TCPSession *> subscribers;
}; };
} }

View File

@ -5,76 +5,60 @@
#include "TCPServer.h" #include "TCPServer.h"
#include <algorithm> #include <algorithm>
namespace core namespace core {
{
int SubscriptionManager::add(Subscription &subscription) int SubscriptionManager::add(Subscription &subscription) {
{
lock.lock(); lock.lock();
subscriptions.insert(std::make_pair(subscription.id, &subscription)); subscriptions.insert(std::make_pair(subscription.id, &subscription));
lock.unlock(); lock.unlock();
return 1; return 1;
} }
bool SubscriptionManager::onClearSubscription(std::string temp, std::string key)
{ bool SubscriptionManager::onClearSubscription(coreutils::MString temp, coreutils::MString key) {
// temp = key;
temp = ""; temp = "";
key = ""; key = "";
return true; return true;
} }
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
{
int countSubscribed = 0; int countSubscribed = 0;
int countPublished = 0; int countPublished = 0;
int count = 0; int count = 0;
lock.lock(); lock.lock();
std::string temp = ""; coreutils::MString temp = "";
for (auto [key, subscription] : subscriptions) for (auto [key, subscription] : subscriptions) {
{ if (temp != "") {
if (temp != "")
{
subscriptions.erase(temp); subscriptions.erase(temp);
temp = ""; temp = "";
} }
countSubscribed += subscription->unsubscribe(session); countSubscribed += subscription->unsubscribe(session);
if (subscription->subscriptionOwners == session.alias) if (subscription->subscriptionOwners == session.alias) {
{ if (subscription->selectionOfOwnerForSubscription == "*DIE") {
if (subscription->selectionOfOwnerForSubscription == "*DIE") //--> This happens if the owner of the subscription sets it, or if not enough people in Vector -->//
{
onClearSubscription(temp, key); onClearSubscription(temp, key);
} }
else if (subscription->will == "*NEXT") //--> This is the other option else if (subscription->will == "*NEXT") {
{ if (subscription->subscribers.size() < 1) {
if (subscription->subscribers.size() < 1)
{
coreutils::Log(coreutils::LOG_DEBUG_1) << "There is not enough people to move to the next"; coreutils::Log(coreutils::LOG_DEBUG_1) << "There is not enough people to move to the next";
onClearSubscription(temp, key); onClearSubscription(temp, key);
} }
else else {
{ for (auto subscribed = subscription->subscribers.begin(); subscribed < subscription->subscribers.end(); subscribed++) {
if ((*subscribed)->alias == session.alias) {
for (auto subscribed = subscription->subscribers.begin(); subscribed < subscription->subscribers.end(); subscribed++)
{
if ((*subscribed)->alias == session.alias)
{
subscription->subscribers.erase(subscribed++); subscription->subscribers.erase(subscribed++);
} }
subscription->subscriptionOwner(session.alias, "*NEXT"); subscription->subscriptionOwner(session.alias, "*NEXT");
} }
} }
} }
else else {
{
coreutils::Log(coreutils::LOG_DEBUG_1) << "Subscription doesn't exist"; coreutils::Log(coreutils::LOG_DEBUG_1) << "Subscription doesn't exist";
} }
++countPublished; ++countPublished;
} }
} }
if (temp != "") if (temp != "") {
{
subscriptions.erase(temp); subscriptions.erase(temp);
temp = ""; temp = "";
} }
@ -84,43 +68,32 @@ namespace core
return countSubscribed; return countSubscribed;
} }
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
{ if (request[0] == "publish") {
if (request[0].equals("publish"))
{
SubscriptionHandler *handler = NULL; SubscriptionHandler *handler = NULL;
if (request.getList().size() > 3) if (request.getList().size() > 3) {
{
handler = factory->getSubscriptionHandler(request[4].str()); handler = factory->getSubscriptionHandler(request[4].str());
} }
// coreutils::Log(coreutils::LOG_DEBUG_1) << request[1].str() << ":" << session.alias << ":" << request[2].str() << ":" << request[3].str() << ":" << request[4].str() << ":" << request[5].str() << ":" << request[6].str(); if (request[2] == "*OWNER") {
if (request[2].str() == "*OWNER")
{
newSubscription = new Subscription(request[1].str(), session.alias, request[2].str(), request[3].str(), handler, request[5].str(), request[6].str()); newSubscription = new Subscription(request[1].str(), session.alias, request[2].str(), request[3].str(), handler, request[5].str(), request[6].str());
subscriptions.insert(std::make_pair(request[1].str(), newSubscription)); subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
return 1; return 1;
} }
return 1; return 1;
} }
else if (request[0].equals("catalog")) else if (request[0] == "catalog") {
{
session.out << ":catalog:"; session.out << ":catalog:";
for (auto const &[key, subscription] : subscriptions) for (auto const &[key, subscription] : subscriptions) {
{
session.out << subscription->id << ";"; session.out << subscription->id << ";";
} }
session.out << std::endl; session.out << std::endl;
return 1; return 1;
} }
else if (request[0].equals("invite")) else if (request[0] == "invite") {
{
std::stringstream out; std::stringstream out;
coreutils::Log(coreutils::LOG_DEBUG_1) << request[2]; coreutils::Log(coreutils::LOG_DEBUG_1) << request[2];
invitee = (coreutils::MString *)&request[2]; invitee = (coreutils::MString *)&request[2];
TCPSession *tempSession = session.server.getSessionByAlias(invitee); TCPSession *tempSession = session.server.getSessionByAlias(invitee);
std::stringstream temp; std::stringstream temp;
temp << "invite:" << request[1] << ":" << invitee; temp << "invite:" << request[1] << ":" << invitee;
@ -130,36 +103,27 @@ namespace core
auto subscription = subscriptions[request[1].str()]; auto subscription = subscriptions[request[1].str()];
if (request[1].equals(subscription->id)) if (request[1].equals(subscription->id)) {
{ if (request[0] == "unpublish") {
if (request[0].equals("unpublish"))
{
subscriptions.erase(request[1].str()); subscriptions.erase(request[1].str());
} }
else if (request[0].equals("subscribe")) else if (request[0] == "subscribe") {
{
subscription->subscribe(session); subscription->subscribe(session);
return 1; return 1;
} }
else if (request[0].equals("unsubscribe")) else if (request[0] == "unsubscribe") {
{
subscription->unsubscribe(session); subscription->unsubscribe(session);
return 1; return 1;
} }
else if (request[0].equals("event")) else if (request[0] == "event") {
{
std::stringstream out; std::stringstream out;
subscription->process(request, out, session); subscription->process(request, out, session);
if (subscription->mode == "*ANYONE") if (subscription->mode == "*ANYONE") {
{
subscription->event(out); subscription->event(out);
return 1; return 1;
} }
else if (subscription->mode == "*SUBSCRIBERS") else if (subscription->mode == "*SUBSCRIBERS") {
{ if (subscription->ifSubscriber(session)) {
if (subscription->ifSubscriber(session))
{
subscription->event(out); subscription->event(out);
return 1; return 1;
} }

View File

@ -8,11 +8,9 @@
#include "SubscriptionHandler.h" #include "SubscriptionHandler.h"
#include "TCPSession.h" #include "TCPSession.h"
#include "ZString.h" #include "ZString.h"
#include <string>
#include <vector> #include <vector>
namespace core namespace core {
{
//*AUTHOR -> Dies when player disconnects //*AUTHOR -> Dies when player disconnects
//*ANYONE -> Does not die when player disconnects //*ANYONE -> Does not die when player disconnects
@ -21,22 +19,22 @@ namespace core
class SubscriptionHandlerFactory; class SubscriptionHandlerFactory;
class SubscriptionManager : public Command class SubscriptionManager : public Command {
{
public: public:
int add(Subscription &subscription); int add(Subscription &subscription);
int removeSessionSubscriptions(TCPSession &session); int removeSessionSubscriptions(TCPSession &session);
int processCommand(coreutils::ZString &request, TCPSession &session) override; int processCommand(coreutils::ZString &request, TCPSession &session) override;
bool onClearSubscription(std::string temp, std::string key); bool onClearSubscription(coreutils::MString temp, coreutils::MString key);
SubscriptionHandlerFactory *factory = NULL; SubscriptionHandlerFactory *factory = NULL;
private: private:
Subscription *subscription; Subscription *subscription;
std::map<std::string, Subscription *> subscriptions; std::map<coreutils::MString, Subscription *> subscriptions;
Subscription *newSubscription; Subscription *newSubscription;
std::mutex lock; std::mutex lock;
coreutils::MString *invitee; coreutils::MString invitee;
}; };
} }

View File

@ -140,12 +140,10 @@ namespace core
data.str(""); data.str("");
} }
TCPSession *TCPServer::getSessionByAlias(coreutils::MString *alias) TCPSession *TCPServer::getSessionByAlias(coreutils::MString alias) {
{
coreutils::Log(coreutils::LOG_DEBUG_1) << alias;
for (auto session : sessions) for (auto session : sessions)
if (session->compareAlias(*alias)) if (session->alias == alias)
return session; return session;
return NULL; return NULL;
} }

View File

@ -122,7 +122,7 @@ namespace core
/// of the alias pointer. /// of the alias pointer.
/// ///
TCPSession *getSessionByAlias(coreutils::MString *alias); TCPSession *getSessionByAlias(coreutils::MString alias);
protected: protected:
/// ///

View File

@ -25,43 +25,32 @@ namespace core
data << "|" << ipAddress.getClientAddressAndPort(); data << "|" << ipAddress.getClientAddressAndPort();
} }
void TCPSession::protocol(coreutils::ZString &data) void TCPSession::protocol(coreutils::ZString &data) {
{
if (data.getLength() != 0) if (data.getLength() != 0)
{ if (!server.commands.processRequest(data, *this))
if (!server.commands.processRequest(data, *this)) coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str();
{
coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str();
}
}
} }
bool TCPSession::compareAlias(coreutils::MString &alias) // bool TCPSession::compareAlias(coreutils::MString alias) {
{ // return this->alias == alias;
return this->alias == alias; // }
}
void TCPSession::onRegistered() void TCPSession::onRegistered() {
{
onConnected(); onConnected();
send(); send();
if (term) if (term)
shutdown("termination requested"); shutdown("termination requested");
} }
void TCPSession::onConnected() {} void TCPSession::onConnected() {}
void TCPSession::onDataReceived(coreutils::ZString &data) void TCPSession::onDataReceived(coreutils::ZString &data) {
{ if (data.getLength() > 0) {
if (data.getLength() > 0)
{
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength()); lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength());
memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength()); memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength());
lineBufferSize += data.getLength(); lineBufferSize += data.getLength();
while (lineBufferSize > 0) while (lineBufferSize > 0) {
{ if (blockSize == 0) {
if (blockSize == 0)
{
lineLength = strcspn(lineBuffer, "\r\n"); lineLength = strcspn(lineBuffer, "\r\n");
if (lineLength == lineBufferSize) if (lineLength == lineBufferSize)
break; break;
@ -76,8 +65,7 @@ namespace core
memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize); memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize);
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); lineBuffer = (char *)realloc(lineBuffer, lineBufferSize);
} }
else if (lineBufferSize >= blockLength) else if (lineBufferSize >= blockLength) {
{
coreutils::ZString zBlock(lineBuffer, blockLength); coreutils::ZString zBlock(lineBuffer, blockLength);
onBlockReceived(zBlock); onBlockReceived(zBlock);
lineBufferSize -= blockLength; lineBufferSize -= blockLength;

View File

@ -89,7 +89,7 @@ namespace core
/// ///
/// ///
bool compareAlias(coreutils::MString &alias); // bool compareAlias(coreutils::MString alias);
protected: protected:
/// ///