Added subscription service capability.

This commit is contained in:
Brad Arant 2021-08-31 17:57:21 -07:00
parent 0a03b3553d
commit ec39c1df1a
16 changed files with 220 additions and 97 deletions

View File

@ -12,16 +12,4 @@ namespace core {
out << "Write your own command description here for the help system." << std::endl;
}
bool Command::check(coreutils::ZString &request) {
return request[0].equals(name);
}
void Command::setName(std::string name) {
this->name = name;
}
std::string Command::getName() {
return name;
}
}

View File

@ -23,22 +23,6 @@ namespace core {
public:
///
/// Implement check method to provide a special check rule upon the request to see
/// if the command should be processed.
///
/// The default rule is to verify that the first token in the request string matches
/// the name given on the registration of the command to the CommandList. This can
/// be overridden by implementing the check() method to perform the test and return
/// the condition of the command.
///
/// @param request The request passed to the parser to check the rule.
/// @return Return true to execute the command. Returning false will cause no action
/// on this command.
///
virtual bool check(coreutils::ZString &request);
///
/// This method is used to implement the functionality of the requested command.
/// This pure virtual function must be implemented in your inheriting object.
@ -60,22 +44,6 @@ namespace core {
virtual void output(std::stringstream &out);
///
/// Set the name of this command used in default rule checking during request parsing.
/// NOTE: You do not need to call this under normal conditions as adding a Command
/// to a CommandList using the add() method contains a parameter to pass the name
/// of the Command.
///
/// @param name Specify the name of this command for default parsing.
///
void setName(std::string name);
std::string getName();
private:
std::string name;
};
}

View File

@ -4,41 +4,41 @@
namespace core {
CommandList::CommandList(std::string delimiter) : delimiter(delimiter) {}
void CommandList::add(Command &command, std::string name) {
command.setName(name);
commands.push_back(&command);
commands.insert(std::make_pair(name, &command));
}
void CommandList::remove(Command &command) {}
bool CommandList::processRequest(coreutils::ZString &request, TCPSession &session) {
if(session.grab != NULL)
return session.grab->processCommand(request, session);
return session.grab->processCommand(request, session);
else {
if(request.equals(""))
return false;
request.split(delimiter);
for(auto *command : commands)
if(command->check(request))
return command->processCommand(request, session);
auto command = commands.find(request[0].str())->second;
return command->processCommand(request, session);
}
return false;
}
bool CommandList::grabInput(TCPSession &session, Command &command) {
session.grab = &command;
return true;
}
void CommandList::clearGrab(TCPSession &session) {
session.grab = NULL;
}
int CommandList::processCommand(coreutils::ZString &request, TCPSession &session) {
for(Command *command : commands)
session.out << command->getName() << std::endl;
// for(Command *command : commands)
// session.out << command->getName() << std::endl;
return true;
}
}

View File

@ -68,7 +68,7 @@ namespace core {
/// The vector of all registered commands.
///
std::vector<Command *> commands;
std::map<std::string, Command *> commands;
std::string delimiter;
};

View File

@ -167,7 +167,7 @@ namespace core {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << ".";
shutDown = true;
reset = false;
if(!needsToWrite())
// if(!needsToWrite())
delete this;
}

View File

@ -48,7 +48,7 @@ namespace core {
/// Destructor
///
~Socket();
virtual ~Socket();
///
/// Use the shutdown() method to terminate the socket connection and remove resources.

38
Subscription.cpp Normal file
View File

@ -0,0 +1,38 @@
#include "Subscription.h"
namespace core {
Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(&session) {}
int Subscription::subscribe(TCPSession &session) {
subscribers.push_back(&session);
return 1;
}
int Subscription::unsubscribe(TCPSession &session) {
for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) {
if(*subscriber == &session) {
subscribers.erase(subscriber);
return 1;
}
}
return 0;
}
int Subscription::process(coreutils::ZString &request, std::stringstream &out) {
out << "event:" << request[1] << ":" << request[2] << std::endl;
return 1;
}
int Subscription::event(std::stringstream &out) {
for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
(*subscriber)->write(out.str());
return 1;
}
int Subscription::processCommand(coreutils::ZString &request, TCPSession &session) {
session.out << "Generic subscription passthrough";
return 1;
}
}

32
Subscription.h Normal file
View File

@ -0,0 +1,32 @@
#ifndef __Subscription_h__
#define __Subscription_h__
#include "TCPSession.h"
#include "Command.h"
#include "ZString.h"
#include <vector>
namespace core {
class Subscription : public Command {
public:
Subscription(std::string id, TCPSession &session);
int subscribe(TCPSession &session);
int unsubscribe(TCPSession &session);
virtual int process(coreutils::ZString &request, std::stringstream &out);
int event(std::stringstream &out);
int processCommand(coreutils::ZString &request, TCPSession &session) override;
std::string id;
TCPSession *owner;
std::vector<TCPSession *> subscribers;
};
}
#endif

56
SubscriptionManager.cpp Normal file
View File

@ -0,0 +1,56 @@
#include "SubscriptionManager.h"
#include "Log.h"
namespace core {
SubscriptionManager::SubscriptionManager() {}
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
int count = 0;
for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) {
count += (*subscription)->unsubscribe(session);
}
coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << count << " subscriptions.";
return count;
}
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
if(request[0].equals("publish")) {
subscriptions.push_back(new Subscription(request[1].str(), session));
return 1;
} else if(request[0].equals("catalog")) {
session.out << ":catalog:";
for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) {
session.out << (*subscription)->id << "|";
(*subscription)->processCommand(request, session);
session.out << (";");
}
session.out << std::endl;
return 1;
}
for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) {
if(request[1].equals((*subscription)->id)) {
if(request[0].equals("unpublish")) {
subscriptions.erase(subscription);
} else if(request[0].equals("subscribe")) {
(*subscription)->subscribe(session);
} else if(request[0].equals("unsubscribe")) {
(*subscription)->unsubscribe(session);
} else if(request[0].equals("event")) {
if((*subscription)->owner == &session) {
std::stringstream out;
(*subscription)->process(request, out);
(*subscription)->event(out);
}
else
return 0;
}
return 1;
}
}
return 0;
}
}

27
SubscriptionManager.h Normal file
View File

@ -0,0 +1,27 @@
#ifndef __SubscriptionManager_h__
#define __SubscriptionManager_h__
#include "TCPSession.h"
#include "Subscription.h"
#include "Command.h"
#include "ZString.h"
#include <vector>
namespace core {
class SubscriptionManager : public Command {
public:
SubscriptionManager();
int removeSessionSubscriptions(TCPSession &session);
int processCommand(coreutils::ZString &request, TCPSession &session) override;
private:
std::vector<Subscription *> subscriptions;
};
}
#endif

View File

@ -5,10 +5,17 @@
#include "Log.h"
namespace core {
TCPServer::TCPServer(EPoll &ePoll, IPAddress address, std::string delimiter, std::string text)
: TCPSocket(ePoll, text), commands(delimiter) {
commands.add(subscriptions, "publish");
commands.add(subscriptions, "unpublish");
commands.add(subscriptions, "subscribe");
commands.add(subscriptions, "unsubscribe");
commands.add(subscriptions, "catalog");
commands.add(subscriptions, "event");
setDescriptor(socket(AF_INET, SOCK_STREAM, 0));
int yes = 1;
setsockopt(getDescriptor(), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
@ -17,24 +24,24 @@ namespace core {
if(listen(getDescriptor(), 20) < 0)
throw coreutils::Exception("Error on listen to socket");
}
TCPServer::~TCPServer() {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Closing server socket " << getDescriptor() << ".";
close(getDescriptor());
}
void TCPServer::onDataReceived(std::string data) {
lock.lock();
TCPSession *session = accept();
if(session)
sessions.push_back(session);
sessions.push_back(session);
lock.unlock();
}
TCPSession * TCPServer::accept() {
try {
TCPSession *session = getSocketAccept(ePoll);
session->setDescriptor(::accept(getDescriptor(), (struct sockaddr *)&session->ipAddress.addr, &session->ipAddress.addressLength));
// if(blackList && blackList->contains(session->ipAddress.getClientAddress())) {
@ -55,28 +62,28 @@ namespace core {
catch(...) {
coreutils::Log(coreutils::LOG_EXCEPT) << "Unnspecified error on session initialization.";
}
return NULL;
return NULL;
}
void TCPServer::removeFromSessionList(TCPSession *session) {
std::vector<TCPSession *>::iterator cursor;
for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor)
if(*cursor == session)
sessions.erase(cursor);
}
void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) {
throw coreutils::Exception(errorString);
}
TCPSession * TCPServer::getSocketAccept(EPoll &ePoll) {
return new TCPSession(ePoll, *this);
}
void TCPServer::output(std::stringstream &out) {
out << "Use the 'help' command to list the commands for this server." << std::endl;
}
int TCPServer::processCommand(coreutils::ZString &request, TCPSession &session) {
int sequence = 0;
for(auto *sessionx : sessions) {
@ -86,20 +93,20 @@ namespace core {
}
return 1;
}
void TCPServer::sendToAll(std::stringstream &data) {
for(auto session : sessions)
session->write(data.str());
data.str("");
}
void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) {
for(auto session : sessions)
if(session != &sender)
session->write(data.str());
data.str("");
}
void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) {
for(auto session : sessions)
if(filter.test(*session))
@ -107,5 +114,5 @@ namespace core {
session->write(data.str());
data.str("");
}
}

View File

@ -6,6 +6,7 @@
#include "IPAddressList.h"
#include "Command.h"
#include "CommandList.h"
#include "SubscriptionManager.h"
namespace core {
@ -40,7 +41,7 @@ namespace core {
/// The destructor for this object.
///
~TCPServer();
virtual ~TCPServer();
virtual void sessionErrorHandler(std::string errorString, std::stringstream &out);
@ -135,6 +136,7 @@ namespace core {
TCPSession * accept();
std::mutex lock;
SubscriptionManager subscriptions;
};

View File

@ -9,6 +9,7 @@ namespace core {
TCPSession::~TCPSession() {
server.removeFromSessionList(this);
coreutils::Log(coreutils::LOG_DEBUG_1) << "Terminating TCPSession level.";
}
void TCPSession::output(std::stringstream &data) {
@ -16,22 +17,22 @@ namespace core {
}
void TCPSession::protocol(coreutils::ZString &data) {
if(!server.commands.processRequest(data, *this)) {
coreutils::Log(coreutils::LOG_DEBUG_1) << data.str();
}
if(!server.commands.processRequest(data, *this)) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str();
}
}
void TCPSession::onRegistered() {
onConnected();
coreutils::ZString blank("");
protocol(blank);
send();
if(term)
shutdown("termination requested");
shutdown("termination requested");
}
void TCPSession::onConnected() {}
void TCPSession::onDataReceived(coreutils::ZString &data) {
if(data.getLength() > 0) {
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength());
@ -63,32 +64,32 @@ namespace core {
}
}
}
void TCPSession::setBlockSize(int blockSize) {
this->blockSize = blockSize;
}
void TCPSession::onLineReceived(coreutils::ZString &line) {
protocol(line);
send();
if(term)
shutdown("termination requested");
}
void TCPSession::onBlockReceived(coreutils::ZString &block) {
coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << block.getLength() << "]";
if(term)
shutdown("termination requested");
}
void TCPSession::send() {
if(out.tellp() > 0)
write(out.str());
out.str("");
}
void TCPSession::terminate() {
term = true;
}
}

View File

@ -35,7 +35,7 @@ namespace core {
///
///
~TCPSession();
virtual ~TCPSession();
Command *grab = NULL;

View File

@ -23,7 +23,7 @@ namespace core {
TCPSocket(EPoll &ePoll);
TCPSocket(EPoll &ePoll, std::string text);
~TCPSocket();
virtual ~TCPSocket();
void connect(IPAddress &address);

View File

@ -4,5 +4,9 @@
"path": "."
}
],
"settings": {}
"settings": {
"files.associations": {
"iosfwd": "c"
}
}
}