*NEXT/*DIE

This commit is contained in:
Brad 2023-06-19 23:22:46 +00:00
parent 48be555724
commit 9a39b6224c
14 changed files with 352 additions and 209 deletions

View File

@ -1,14 +1,17 @@
#include "Command.h"
#include "Log.h"
#include "CommandList.h"
#include "Log.h"
namespace core {
namespace core
{
int Command::processCommand(coreutils::ZString &request, TCPSession &session) {
int Command::processCommand(coreutils::ZString &request, TCPSession &session)
{
return 0;
}
void Command::output(std::stringstream &out) {
void Command::output(std::stringstream &out)
{
out << "Write your own command description here for the help system." << std::endl;
}

View File

@ -1,12 +1,13 @@
#ifndef __Command_h__
#define __Command_h__
#include "includes"
#include "Object.h"
#include "TCPSession.h"
#include "ZString.h"
#include "includes"
namespace core {
namespace core
{
class CommandList;
@ -19,10 +20,10 @@ namespace core {
/// a list of functions that can be invoked as a result of processing a request.
///
class Command {
public:
class Command
{
public:
///
/// This method is used to implement the functionality of the requested command.
/// This pure virtual function must be implemented in your inheriting object.
@ -43,7 +44,6 @@ namespace core {
///
virtual void output(std::stringstream &out);
};
}

View File

@ -1,42 +1,56 @@
#include "Subscription.h"
#include "Log.h"
#include "TCPSession.h"
#include "TCPSocket.h"
#include <algorithm>
namespace core {
namespace core
{
Subscription::Subscription(std::string id, std::string mode)
: id(id), mode(mode), owner(NULL), handler(NULL) {}
: id(id), mode(mode), owner(NULL), handler(NULL) {}
Subscription::Subscription(std::string id, TCPSession &session, std::string mode)
: id(id), mode(mode), owner(&session), handler(NULL) {}
Subscription::Subscription(std::string id, coreutils::MString *alias, std::string mode)
: id(id), mode(mode), owner(alias), handler(NULL) {}
Subscription::Subscription(std::string id, TCPSession &session, std::string mode, SubscriptionHandler *handler)
: id(id), mode(mode), owner(&session), handler(handler) {
// coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription '" << id << "' with handler '" << handler->name << "'";
}
Subscription::Subscription(std::string id, coreutils::MString *alias, std::string ownership, std::string mode, SubscriptionHandler *handler, std::string will, std::string timer)
: id(id), ownership(ownership), mode(mode), owner(alias), handler(handler), will(will), timer(timer)
{
// coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription '" << id << "' with handler '" << handler->name << "'";
}
Subscription::~Subscription()
{
Subscription::~Subscription() {
std::stringstream out;
out << "cancel:" << id << std::endl;
for (auto subscriber : subscribers) {
for (auto subscriber : subscribers)
{
subscriber->write(out.str());
}
}
int Subscription::subscribe(TCPSession &session) {
int Subscription::subscribe(TCPSession &session)
{
if (handler)
handler->onSubscribe(session, this);
handler->onSubscribe(session, this);
else
onSubscribe(session);
onSubscribe(session);
subscribers.push_back(&session);
return 1;
}
int Subscription::unsubscribe(TCPSession &session) {
for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) {
if (*subscriber == &session) {
int Subscription::unsubscribe(TCPSession &session)
{
for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
{
if (*subscriber == &session)
{
subscribers.erase(subscriber++);
return 1;
}
@ -44,7 +58,24 @@ namespace core {
return 0;
}
int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session) {
bool Subscription::subscriptionOwner(coreutils::MString *alias, std::string ownerSelection)
{
if (owner != NULL)
{
owner = alias;
will == ownerSelection;
coreutils::Log(coreutils::LOG_DEBUG_1) << "Owner of Subscription: " << *(std::string *)owner;
}
else
{
coreutils::Log(coreutils::LOG_DEBUG_1) << "There is no owner to this subscription";
}
return true;
}
int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session)
{
if (handler)
handler->process(request, out, session, this);
else
@ -52,30 +83,39 @@ namespace core {
return 1;
}
int Subscription::event(std::stringstream &out) {
int Subscription::event(std::stringstream &out)
{
for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
{
(*subscriber)->write(out.str());
}
return 1;
}
bool Subscription::ifSubscriber(TCPSession &session) {
bool Subscription::ifSubscriber(TCPSession &session)
{
return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end());
}
int Subscription::onSubscribe(TCPSession &session) {
int Subscription::onSubscribe(TCPSession &session)
{
return 0;
}
bool Subscription::subInvite(TCPSession &session) {
bool Subscription::subInvite(TCPSession &session)
{
return 0;
}
void Subscription::sendToAll(std::stringstream &data, TCPSession &sender) {
void Subscription::sendToAll(std::stringstream &data, TCPSession &sender)
{
for (auto session : subscribers)
if (session != &sender)
session->write(data.str());
if (session != &sender)
session->write(data.str());
data.str("");
}
}

View File

@ -1,8 +1,9 @@
#ifndef __Subscription_h__
#define __Subscription_h__
#include "SubscriptionHandler.h"
#include "MString.h"
#include "SessionFilter.h"
#include "SubscriptionHandler.h"
#include "ZString.h"
#include <string>
#include <vector>
@ -12,12 +13,13 @@ namespace core
class TCPSession;
class Subscription {
class Subscription
{
public:
public:
Subscription(std::string id, std::string mode = "*AUTHOR");
Subscription(std::string id, TCPSession &session, std::string mode);
Subscription(std::string id, TCPSession &session, std::string mode, SubscriptionHandler *handler);
Subscription(std::string id, coreutils::MString *alias, std::string mode);
Subscription(std::string id, coreutils::MString *alias, std::string ownership, std::string mode, SubscriptionHandler *handler, std::string selection, std::string time);
virtual ~Subscription();
int subscribe(TCPSession &session);
@ -28,7 +30,7 @@ namespace core
virtual int onSubscribe(TCPSession &session);
int event(std::stringstream &out);
bool subscriptionOwner(coreutils::MString *alias, std::string selection);
bool ifSubscriber(TCPSession &session);
bool subInvite(TCPSession &session);
@ -38,8 +40,11 @@ namespace core
std::string id;
std::string mode;
TCPSession *owner;
coreutils::MString *owner;
std::string will;
std::string ownership;
std::string timer;
SubscriptionHandler *handler;
std::vector<TCPSession *> subscribers;

View File

@ -1,22 +1,35 @@
#include "SubscriptionManager.h"
#include "Log.h"
#include "Subscription.h"
#include "SubscriptionHandlerFactory.h"
#include "TCPServer.h"
#include <algorithm>
#include "SubscriptionHandlerFactory.h"
namespace core {
namespace core
{
int SubscriptionManager::add(Subscription &subscription) {
int SubscriptionManager::add(Subscription &subscription)
{
lock.lock();
subscriptions.insert(std::make_pair(subscription.id, &subscription));
lock.unlock();
return 1;
}
bool SubscriptionManager::onClearSubscription(std::string temp, std::string key)
{
temp = key;
temp = "";
key = "";
// delete subscription;
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
return true;
}
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session)
{
int countSubscribed = 0;
int countPublished = 0;
int count = 0;
lock.lock();
std::string temp = "";
@ -28,10 +41,37 @@ namespace core {
temp = "";
}
countSubscribed += subscription->unsubscribe(session);
if (subscription->owner == &session)
if (subscription->owner == session.alias)
{
temp = key;
delete subscription;
if (subscription->will == "*DIE") //--> This happens if the owner of the subscription sets it, or if not enough people in Vector -->//
{
onClearSubscription(temp, key);
}
else if (subscription->will == "*NEXT") //--> This is the other option
{
if (subscription->subscribers.size() < 1)
{
coreutils::Log(coreutils::LOG_DEBUG_1) << "There is not enough people to move to the next";
onClearSubscription(temp, key);
}
else
{
for (auto subscribed = subscription->subscribers.begin(); subscribed < subscription->subscribers.end(); subscribed++)
{
if (*subscribed == &session)
{
subscription->subscribers.erase(subscribed++);
}
subscription->subscriptionOwner((*subscribed)->alias, "*NEXT");
}
}
}
else
{
coreutils::Log(coreutils::LOG_DEBUG_1) << "Subscription doesn't exist";
}
++countPublished;
}
}
@ -46,58 +86,81 @@ namespace core {
return countSubscribed;
}
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
if (request[0].equals("publish")) {
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session)
{
if (request[0].equals("publish"))
{
SubscriptionHandler *handler = NULL;
if(request.getList().size() > 3) {
factory->getSubscriptionHandler(request[3].str());
if (request.getList().size() > 3)
{
handler = factory->getSubscriptionHandler(request[4].str());
}
coreutils::Log(coreutils::LOG_DEBUG_1) << request[1].str() << ":" << request[2].str();
if (request[2].str() == "*OWNER")
{
newSubscription = new Subscription(request[1].str(), session.alias, request[2].str(), request[3].str(), handler, request[5].str(), request[6].str());
subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
subscription->subscriptionOwner(session.alias, request[5].str());
return 1;
}
newSubscription = new Subscription(request[1].str(), session, request[2].str(), handler);
subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
newSubscription->owner = &session;
return 1;
} else if (request[0].equals("catalog")) {
}
else if (request[0].equals("catalog"))
{
session.out << ":catalog:";
for (auto const &[key, subscription] : subscriptions) {
for (auto const &[key, subscription] : subscriptions)
{
session.out << subscription->id << ";";
}
session.out << std::endl;
return 1;
} else if (request[0].equals("invite")) {
}
else if (request[0].equals("invite"))
{
std::stringstream out;
coreutils::Log(coreutils::LOG_DEBUG_1) << request[2];
std::string invitee = request[2].str();
TCPSession *tempSession = session.server.getSessionByAlias(&invitee);
std::stringstream temp;
temp << "invite:" << request[1] << ":" << *(std::string *)session.alias;
tempSession->write(temp.str());
// invitee = (coreutils::MString *)&request[2];
// TCPSession *tempSession = session.server.getSessionByAlias(invitee);
// std::stringstream temp;
// temp << "invite:" << request[1] << ":" << invitee;
// tempSession->write(temp.str());
return 1;
}
auto subscription = subscriptions[request[1].str()];
if (request[1].equals(subscription->id)) {
if (request[0].equals("unpublish")) {
if (request[1].equals(subscription->id))
{
if (request[0].equals("unpublish"))
{
subscriptions.erase(request[1].str());
} else if (request[0].equals("subscribe")) {
}
else if (request[0].equals("subscribe"))
{
subscription->subscribe(session);
return 1;
} else if (request[0].equals("unsubscribe")) {
}
else if (request[0].equals("unsubscribe"))
{
subscription->unsubscribe(session);
return 1;
} else if (request[0].equals("event")) {
}
else if (request[0].equals("event"))
{
std::stringstream out;
subscription->process(request, out, session);
if (subscription->mode == "*ANYONE") {
if (subscription->mode == "*ANYONE")
{
subscription->event(out);
return 1;
} else if (subscription->mode == "*SUBSCRIBERS") {
if (subscription->ifSubscriber(session)) {
subscription->event(out);
return 1;
}
} else if (subscription->mode == "*AUTHOR") {
if (subscription->owner == &session) {
}
else if (subscription->mode == "*SUBSCRIBERS")
{
if (subscription->ifSubscriber(session))
{
subscription->event(out);
return 1;
}

View File

@ -2,6 +2,8 @@
#define __SubscriptionManager_h__
#include "Command.h"
#include "MString.h"
#include "SessionFilter.h"
#include "Subscription.h"
#include "SubscriptionHandler.h"
#include "TCPSession.h"
@ -9,26 +11,33 @@
#include <string>
#include <vector>
namespace core {
namespace core
{
class SubscriptionHandlerFactory;
//*AUTHOR -> Dies when player disconnects
//*ANYONE -> Does not die when player disconnects
//*SUBSCRIPTION -> Does not die when player disconnects
//*OWNER -> Does die and transfers Ownership before he does
class SubscriptionManager : public Command {
class SubscriptionHandlerFactory;
public:
class SubscriptionManager : public Command
{
int add(Subscription &subscription);
int removeSessionSubscriptions(TCPSession &session);
int processCommand(coreutils::ZString &request, TCPSession &session) override;
public:
int add(Subscription &subscription);
int removeSessionSubscriptions(TCPSession &session);
int processCommand(coreutils::ZString &request, TCPSession &session) override;
bool onClearSubscription(std::string temp, std::string key);
SubscriptionHandlerFactory *factory = NULL;
SubscriptionHandlerFactory *factory = NULL;
private:
Subscription *subscription;
std::map<std::string, Subscription *> subscriptions;
Subscription *newSubscription;
std::mutex lock;
};
private:
Subscription *subscription;
std::map<std::string, Subscription *> subscriptions;
Subscription *newSubscription;
std::mutex lock;
coreutils::MString *invitee;
};
}

View File

@ -8,7 +8,8 @@ namespace core
{
TCPServer::TCPServer(EPoll &ePoll, IPAddress address, std::string delimiter, int depth, std::string text)
: TCPSocket(ePoll, text), commands(delimiter, depth) {
: TCPSocket(ePoll, text), commands(delimiter, depth)
{
commands.add(subscriptions, "publish");
commands.add(subscriptions, "unpublish");
@ -35,7 +36,8 @@ namespace core
close(getDescriptor());
}
void TCPServer::onDataReceived(std::string data) {
void TCPServer::onDataReceived(std::string data)
{
lock.lock();
TCPSession *session = accept();
if (session)
@ -43,9 +45,11 @@ namespace core
lock.unlock();
}
TCPSession * TCPServer::accept() {
TCPSession *TCPServer::accept()
{
try {
try
{
TCPSession *session = getSocketAccept(ePoll);
session->setDescriptor(::accept(getDescriptor(), (struct sockaddr *)&session->ipAddress.addr, &session->ipAddress.addressLength));
@ -72,30 +76,36 @@ namespace core
return NULL;
}
void TCPServer::removeFromSessionList(TCPSession *session) {
void TCPServer::removeFromSessionList(TCPSession *session)
{
std::vector<TCPSession *>::iterator cursor;
lock.lock();
for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor)
if(*cursor == session) {
sessions.erase(cursor);
break;
}
for (cursor = sessions.begin(); cursor < sessions.end(); ++cursor)
if (*cursor == session)
{
sessions.erase(cursor);
break;
}
lock.unlock();
}
void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) {
void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out)
{
throw coreutils::Exception(errorString);
}
TCPSession * TCPServer::getSocketAccept(EPoll &ePoll) {
TCPSession *TCPServer::getSocketAccept(EPoll &ePoll)
{
return new TCPSession(ePoll, *this);
}
void TCPServer::output(std::stringstream &out) {
void TCPServer::output(std::stringstream &out)
{
out << "Use the 'help' command to list the commands for this server." << std::endl;
}
int TCPServer::processCommand(coreutils::ZString &request, TCPSession &session) {
int TCPServer::processCommand(coreutils::ZString &request, TCPSession &session)
{
int sequence = 0;
for (auto *sessionx : sessions)
{
@ -130,11 +140,11 @@ namespace core
data.str("");
}
TCPSession *TCPServer::getSessionByAlias(void *alias)
TCPSession *TCPServer::getSessionByAlias(coreutils::MString *alias)
{
coreutils::Log(coreutils::LOG_DEBUG_1) << alias;
for (auto session : sessions)
if (session->compareAlias(alias))
if (session->compareAlias(*alias))
return session;
return NULL;
}

View File

@ -122,7 +122,7 @@ namespace core
/// of the alias pointer.
///
TCPSession *getSessionByAlias(void *alias);
TCPSession *getSessionByAlias(coreutils::MString *alias);
protected:
///

View File

@ -11,9 +11,9 @@ namespace core
{
uuid_t uuidObject;
uuid_generate(uuidObject);
// std::string aaUuid = {uuidObject, uuidObject + 16};
// std::string aaUuid = {uuidObject, uuidObject + 16};
coreutils::Log(coreutils::LOG_DEBUG_1) << uuidObject;
alias = (void *)uuidObject;
alias = (coreutils::MString *)uuidObject;
}
TCPSession::~TCPSession()
@ -38,15 +38,13 @@ namespace core
}
}
bool TCPSession::compareAlias(void *alias) {
return this->alias = alias;
bool TCPSession::compareAlias(coreutils::MString &alias)
{
return this->alias = &alias;
}
void TCPSession::outputAlias(std::stringstream &out) {
out << alias;
}
void TCPSession::onRegistered() {
void TCPSession::onRegistered()
{
onConnected();
send();
if (term)
@ -55,13 +53,17 @@ namespace core
void TCPSession::onConnected() {}
void TCPSession::onDataReceived(coreutils::ZString &data) {
if (data.getLength() > 0) {
void TCPSession::onDataReceived(coreutils::ZString &data)
{
if (data.getLength() > 0)
{
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength());
memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength());
lineBufferSize += data.getLength();
while (lineBufferSize > 0) {
if (blockSize == 0) {
while (lineBufferSize > 0)
{
if (blockSize == 0)
{
lineLength = strcspn(lineBuffer, "\r\n");
if (lineLength == lineBufferSize)
break;
@ -109,9 +111,10 @@ namespace core
shutdown("termination requested");
}
void TCPSession::send() {
if(out.tellp() > 0)
write(out.str());
void TCPSession::send()
{
if (out.tellp() > 0)
write(out.str());
out.str("");
}

View File

@ -1,9 +1,9 @@
#ifndef __Session_h__
#define __Session_h__
#include "MString.h"
#include "SessionFilter.h"
#include "TCPSocket.h"
namespace core
{
@ -81,15 +81,18 @@ namespace core
/// how it knows the contacts that this server manages.
///
void *alias;
coreutils::MString *alias;
///
///
///
///
///
///
virtual bool compareAlias(void *alias);
virtual void outputAlias(std::stringstream &out);
bool compareAlias(coreutils::MString &alias);
protected:
///

View File

@ -1,11 +1,12 @@
#ifndef __TCPSocket_h__
#define __TCPSocket_h__
#include "includes"
#include "Socket.h"
#include "IPAddress.h"
#include "Socket.h"
#include "includes"
namespace core {
namespace core
{
///
/// TCPSocket
@ -17,10 +18,10 @@ namespace core {
/// synchronous data connection.
///
class TCPSocket : public Socket {
public:
class TCPSocket : public Socket
{
public:
TCPSocket(EPoll &ePoll);
TCPSocket(EPoll &ePoll, std::string text);
virtual ~TCPSocket();
@ -37,7 +38,6 @@ namespace core {
///
virtual void output(std::stringstream &out);
};
}

View File

@ -1,16 +1,20 @@
#include "Timer.h"
namespace core {
namespace core
{
Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") {
Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer")
{
setDescriptor(timerfd_create(CLOCK_REALTIME, 0));
setTimer(delay);
}
Timer::~Timer() {
Timer::~Timer()
{
}
void Timer::setTimer(double delay) {
void Timer::setTimer(double delay)
{
double integer;
double fraction;
@ -27,10 +31,10 @@ namespace core {
timer.it_value.tv_nsec = (int)(fraction * 1000000000);
timerfd_settime(getDescriptor(), 0, &timer, NULL);
}
void Timer::clearTimer() {
void Timer::clearTimer()
{
struct itimerspec timer;
@ -42,19 +46,22 @@ namespace core {
timerfd_settime(getDescriptor(), 0, &timer, NULL);
}
double Timer::getElapsed() {
double Timer::getElapsed()
{
struct itimerspec timer;
timerfd_gettime(getDescriptor(), &timer);
double toTimeout = (double)((timer.it_value.tv_sec * 1000000000L) + timer.it_value.tv_nsec) / 1000000000L;
return delayValue - toTimeout;
}
void Timer::onDataReceived(std::string data) {
void Timer::onDataReceived(std::string data)
{
onTimeout();
}
double Timer::getEpoch() {
return (double)std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count() /1000000000L;
double Timer::getEpoch()
{
return (double)std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count() / 1000000000L;
}
}

86
Timer.h
View File

@ -1,65 +1,65 @@
#ifndef __Timer_h__
#define __Timer_h__
#include "Socket.h"
#include "EPoll.h"
#include "Socket.h"
namespace core {
namespace core
{
///
/// Timer
///
/// Set and trigger callback upon specified timeout.
///
/// The Timer is used to establish a timer using the timer socket
/// interface. It cannot be instantiated directly but must be extended.
///
///
/// Timer
///
/// Set and trigger callback upon specified timeout.
///
/// The Timer is used to establish a timer using the timer socket
/// interface. It cannot be instantiated directly but must be extended.
///
class Timer : Socket {
class Timer : Socket
{
public:
public:
Timer(EPoll &ePoll);
Timer(EPoll &ePoll, double delay);
~Timer();
Timer(EPoll &ePoll, double delay);
~Timer();
///
/// Use the setTimer() method to set the time out value for timer. Setting the timer
/// also starts the timer countdown. The clearTimer() method can be used to reset
/// the timer without triggering the onTimeout() callback.
///
/// @param delay the amount of time in seconds to wait before trigering the onTimeout function.
///
///
/// Use the setTimer() method to set the time out value for timer. Setting the timer
/// also starts the timer countdown. The clearTimer() method can be used to reset
/// the timer without triggering the onTimeout() callback.
///
/// @param delay the amount of time in seconds to wait before trigering the onTimeout function.
///
void setTimer(double delay);
void setTimer(double delay);
///
/// Use the clearTimer() to unset the timer and return the timer to an idle state.
///
///
/// Use the clearTimer() to unset the timer and return the timer to an idle state.
///
void clearTimer();
void clearTimer();
///
/// Use the getElapsed() method to obtain the amount of time that has elapsed since
/// the timer was set.
///
///
/// Use the getElapsed() method to obtain the amount of time that has elapsed since
/// the timer was set.
///
double getElapsed();
double getElapsed();
double getEpoch();
double getEpoch();
protected:
protected:
///
/// This method is called when the time out occurs.
///
///
/// This method is called when the time out occurs.
///
virtual void onTimeout() = 0;
virtual void onTimeout() = 0;
private:
void onDataReceived(std::string data) override;
double delayValue;
};
private:
void onDataReceived(std::string data) override;
double delayValue;
};
}

Binary file not shown.