From ec39c1df1a81a53290b8cc9d8143325a0af2b87c Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Tue, 31 Aug 2021 17:57:21 -0700 Subject: [PATCH] Added subscription service capability. --- Command.cpp | 12 --------- Command.h | 32 ---------------------- CommandList.cpp | 30 ++++++++++----------- CommandList.h | 2 +- Socket.cpp | 2 +- Socket.h | 2 +- Subscription.cpp | 38 ++++++++++++++++++++++++++ Subscription.h | 32 ++++++++++++++++++++++ SubscriptionManager.cpp | 56 +++++++++++++++++++++++++++++++++++++++ SubscriptionManager.h | 27 +++++++++++++++++++ TCPServer.cpp | 43 +++++++++++++++++------------- TCPServer.h | 4 ++- TCPSession.cpp | 27 ++++++++++--------- TCPSession.h | 2 +- TCPSocket.h | 2 +- servercore.code-workspace | 6 ++++- 16 files changed, 220 insertions(+), 97 deletions(-) create mode 100644 Subscription.cpp create mode 100644 Subscription.h create mode 100644 SubscriptionManager.cpp create mode 100644 SubscriptionManager.h diff --git a/Command.cpp b/Command.cpp index 6505ffd..76b0c87 100644 --- a/Command.cpp +++ b/Command.cpp @@ -12,16 +12,4 @@ namespace core { out << "Write your own command description here for the help system." << std::endl; } - bool Command::check(coreutils::ZString &request) { - return request[0].equals(name); - } - - void Command::setName(std::string name) { - this->name = name; - } - - std::string Command::getName() { - return name; - } - } diff --git a/Command.h b/Command.h index 3b162b8..e2d1b7f 100644 --- a/Command.h +++ b/Command.h @@ -23,22 +23,6 @@ namespace core { public: - /// - /// Implement check method to provide a special check rule upon the request to see - /// if the command should be processed. - /// - /// The default rule is to verify that the first token in the request string matches - /// the name given on the registration of the command to the CommandList. This can - /// be overridden by implementing the check() method to perform the test and return - /// the condition of the command. - /// - /// @param request The request passed to the parser to check the rule. - /// @return Return true to execute the command. Returning false will cause no action - /// on this command. - /// - - virtual bool check(coreutils::ZString &request); - /// /// This method is used to implement the functionality of the requested command. /// This pure virtual function must be implemented in your inheriting object. @@ -60,22 +44,6 @@ namespace core { virtual void output(std::stringstream &out); - /// - /// Set the name of this command used in default rule checking during request parsing. - /// NOTE: You do not need to call this under normal conditions as adding a Command - /// to a CommandList using the add() method contains a parameter to pass the name - /// of the Command. - /// - /// @param name Specify the name of this command for default parsing. - /// - - void setName(std::string name); - - std::string getName(); - - private: - std::string name; - }; } diff --git a/CommandList.cpp b/CommandList.cpp index 2ac92cc..9c22744 100644 --- a/CommandList.cpp +++ b/CommandList.cpp @@ -4,41 +4,41 @@ namespace core { CommandList::CommandList(std::string delimiter) : delimiter(delimiter) {} - + void CommandList::add(Command &command, std::string name) { - command.setName(name); - commands.push_back(&command); + commands.insert(std::make_pair(name, &command)); } - + void CommandList::remove(Command &command) {} - + bool CommandList::processRequest(coreutils::ZString &request, TCPSession &session) { if(session.grab != NULL) - return session.grab->processCommand(request, session); + return session.grab->processCommand(request, session); else { + if(request.equals("")) + return false; request.split(delimiter); - for(auto *command : commands) - if(command->check(request)) - return command->processCommand(request, session); + auto command = commands.find(request[0].str())->second; + return command->processCommand(request, session); } return false; } - + bool CommandList::grabInput(TCPSession &session, Command &command) { session.grab = &command; return true; } - + void CommandList::clearGrab(TCPSession &session) { session.grab = NULL; } - + int CommandList::processCommand(coreutils::ZString &request, TCPSession &session) { - for(Command *command : commands) - session.out << command->getName() << std::endl; +// for(Command *command : commands) +// session.out << command->getName() << std::endl; return true; } - + } diff --git a/CommandList.h b/CommandList.h index 9e506ef..061f84f 100644 --- a/CommandList.h +++ b/CommandList.h @@ -68,7 +68,7 @@ namespace core { /// The vector of all registered commands. /// - std::vector commands; + std::map commands; std::string delimiter; }; diff --git a/Socket.cpp b/Socket.cpp index 5173b7c..0e6f22c 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -167,7 +167,7 @@ namespace core { coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; shutDown = true; reset = false; - if(!needsToWrite()) +// if(!needsToWrite()) delete this; } diff --git a/Socket.h b/Socket.h index f440110..cdfdb81 100644 --- a/Socket.h +++ b/Socket.h @@ -48,7 +48,7 @@ namespace core { /// Destructor /// - ~Socket(); + virtual ~Socket(); /// /// Use the shutdown() method to terminate the socket connection and remove resources. diff --git a/Subscription.cpp b/Subscription.cpp new file mode 100644 index 0000000..2d8201f --- /dev/null +++ b/Subscription.cpp @@ -0,0 +1,38 @@ +#include "Subscription.h" + +namespace core { + + Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(&session) {} + + int Subscription::subscribe(TCPSession &session) { + subscribers.push_back(&session); + return 1; + } + + int Subscription::unsubscribe(TCPSession &session) { + for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) { + if(*subscriber == &session) { + subscribers.erase(subscriber); + return 1; + } + } + return 0; + } + + int Subscription::process(coreutils::ZString &request, std::stringstream &out) { + out << "event:" << request[1] << ":" << request[2] << std::endl; + return 1; + } + + int Subscription::event(std::stringstream &out) { + for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) + (*subscriber)->write(out.str()); + return 1; + } + + int Subscription::processCommand(coreutils::ZString &request, TCPSession &session) { + session.out << "Generic subscription passthrough"; + return 1; + } + +} diff --git a/Subscription.h b/Subscription.h new file mode 100644 index 0000000..1d2625d --- /dev/null +++ b/Subscription.h @@ -0,0 +1,32 @@ +#ifndef __Subscription_h__ +#define __Subscription_h__ + +#include "TCPSession.h" +#include "Command.h" +#include "ZString.h" +#include + +namespace core { + + class Subscription : public Command { + + public: + Subscription(std::string id, TCPSession &session); + + int subscribe(TCPSession &session); + int unsubscribe(TCPSession &session); + virtual int process(coreutils::ZString &request, std::stringstream &out); + + int event(std::stringstream &out); + + int processCommand(coreutils::ZString &request, TCPSession &session) override; + + std::string id; + TCPSession *owner; + + std::vector subscribers; + + }; +} + +#endif diff --git a/SubscriptionManager.cpp b/SubscriptionManager.cpp new file mode 100644 index 0000000..56881b9 --- /dev/null +++ b/SubscriptionManager.cpp @@ -0,0 +1,56 @@ +#include "SubscriptionManager.h" +#include "Log.h" + +namespace core { + + SubscriptionManager::SubscriptionManager() {} + + int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) { + int count = 0; + for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) { + count += (*subscription)->unsubscribe(session); + } + coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << count << " subscriptions."; + return count; + } + + int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) { + + if(request[0].equals("publish")) { + subscriptions.push_back(new Subscription(request[1].str(), session)); + return 1; + } else if(request[0].equals("catalog")) { + session.out << ":catalog:"; + for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) { + session.out << (*subscription)->id << "|"; + (*subscription)->processCommand(request, session); + session.out << (";"); + } + session.out << std::endl; + return 1; + } + + for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) { + if(request[1].equals((*subscription)->id)) { + if(request[0].equals("unpublish")) { + subscriptions.erase(subscription); + } else if(request[0].equals("subscribe")) { + (*subscription)->subscribe(session); + } else if(request[0].equals("unsubscribe")) { + (*subscription)->unsubscribe(session); + } else if(request[0].equals("event")) { + if((*subscription)->owner == &session) { + std::stringstream out; + (*subscription)->process(request, out); + (*subscription)->event(out); + } + else + return 0; + } + return 1; + } + } + return 0; + } + +} diff --git a/SubscriptionManager.h b/SubscriptionManager.h new file mode 100644 index 0000000..aa11409 --- /dev/null +++ b/SubscriptionManager.h @@ -0,0 +1,27 @@ +#ifndef __SubscriptionManager_h__ +#define __SubscriptionManager_h__ + +#include "TCPSession.h" +#include "Subscription.h" +#include "Command.h" +#include "ZString.h" +#include + +namespace core { + + class SubscriptionManager : public Command { + + public: + SubscriptionManager(); + + int removeSessionSubscriptions(TCPSession &session); + + int processCommand(coreutils::ZString &request, TCPSession &session) override; + + private: + std::vector subscriptions; + + }; +} + +#endif diff --git a/TCPServer.cpp b/TCPServer.cpp index eeeb269..d75819d 100644 --- a/TCPServer.cpp +++ b/TCPServer.cpp @@ -5,10 +5,17 @@ #include "Log.h" namespace core { - + TCPServer::TCPServer(EPoll &ePoll, IPAddress address, std::string delimiter, std::string text) : TCPSocket(ePoll, text), commands(delimiter) { - + + commands.add(subscriptions, "publish"); + commands.add(subscriptions, "unpublish"); + commands.add(subscriptions, "subscribe"); + commands.add(subscriptions, "unsubscribe"); + commands.add(subscriptions, "catalog"); + commands.add(subscriptions, "event"); + setDescriptor(socket(AF_INET, SOCK_STREAM, 0)); int yes = 1; setsockopt(getDescriptor(), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); @@ -17,24 +24,24 @@ namespace core { if(listen(getDescriptor(), 20) < 0) throw coreutils::Exception("Error on listen to socket"); } - + TCPServer::~TCPServer() { coreutils::Log(coreutils::LOG_DEBUG_2) << "Closing server socket " << getDescriptor() << "."; close(getDescriptor()); } - + void TCPServer::onDataReceived(std::string data) { lock.lock(); TCPSession *session = accept(); if(session) - sessions.push_back(session); + sessions.push_back(session); lock.unlock(); } - + TCPSession * TCPServer::accept() { - + try { - + TCPSession *session = getSocketAccept(ePoll); session->setDescriptor(::accept(getDescriptor(), (struct sockaddr *)&session->ipAddress.addr, &session->ipAddress.addressLength)); // if(blackList && blackList->contains(session->ipAddress.getClientAddress())) { @@ -55,28 +62,28 @@ namespace core { catch(...) { coreutils::Log(coreutils::LOG_EXCEPT) << "Unnspecified error on session initialization."; } - return NULL; + return NULL; } - + void TCPServer::removeFromSessionList(TCPSession *session) { std::vector::iterator cursor; for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) if(*cursor == session) sessions.erase(cursor); } - + void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) { throw coreutils::Exception(errorString); } - + TCPSession * TCPServer::getSocketAccept(EPoll &ePoll) { return new TCPSession(ePoll, *this); } - + void TCPServer::output(std::stringstream &out) { out << "Use the 'help' command to list the commands for this server." << std::endl; } - + int TCPServer::processCommand(coreutils::ZString &request, TCPSession &session) { int sequence = 0; for(auto *sessionx : sessions) { @@ -86,20 +93,20 @@ namespace core { } return 1; } - + void TCPServer::sendToAll(std::stringstream &data) { for(auto session : sessions) session->write(data.str()); data.str(""); } - + void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) { for(auto session : sessions) if(session != &sender) session->write(data.str()); data.str(""); } - + void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) { for(auto session : sessions) if(filter.test(*session)) @@ -107,5 +114,5 @@ namespace core { session->write(data.str()); data.str(""); } - + } diff --git a/TCPServer.h b/TCPServer.h index 21c4560..aad7a2c 100644 --- a/TCPServer.h +++ b/TCPServer.h @@ -6,6 +6,7 @@ #include "IPAddressList.h" #include "Command.h" #include "CommandList.h" +#include "SubscriptionManager.h" namespace core { @@ -40,7 +41,7 @@ namespace core { /// The destructor for this object. /// - ~TCPServer(); + virtual ~TCPServer(); virtual void sessionErrorHandler(std::string errorString, std::stringstream &out); @@ -135,6 +136,7 @@ namespace core { TCPSession * accept(); std::mutex lock; + SubscriptionManager subscriptions; }; diff --git a/TCPSession.cpp b/TCPSession.cpp index ed285d5..ef51955 100644 --- a/TCPSession.cpp +++ b/TCPSession.cpp @@ -9,6 +9,7 @@ namespace core { TCPSession::~TCPSession() { server.removeFromSessionList(this); + coreutils::Log(coreutils::LOG_DEBUG_1) << "Terminating TCPSession level."; } void TCPSession::output(std::stringstream &data) { @@ -16,22 +17,22 @@ namespace core { } void TCPSession::protocol(coreutils::ZString &data) { - if(!server.commands.processRequest(data, *this)) { - coreutils::Log(coreutils::LOG_DEBUG_1) << data.str(); - } + if(!server.commands.processRequest(data, *this)) { + coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str(); + } } - + void TCPSession::onRegistered() { onConnected(); coreutils::ZString blank(""); protocol(blank); send(); if(term) - shutdown("termination requested"); + shutdown("termination requested"); } - + void TCPSession::onConnected() {} - + void TCPSession::onDataReceived(coreutils::ZString &data) { if(data.getLength() > 0) { lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength()); @@ -63,32 +64,32 @@ namespace core { } } } - + void TCPSession::setBlockSize(int blockSize) { this->blockSize = blockSize; } - + void TCPSession::onLineReceived(coreutils::ZString &line) { protocol(line); send(); if(term) shutdown("termination requested"); } - + void TCPSession::onBlockReceived(coreutils::ZString &block) { coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << block.getLength() << "]"; if(term) shutdown("termination requested"); } - + void TCPSession::send() { if(out.tellp() > 0) write(out.str()); out.str(""); } - + void TCPSession::terminate() { term = true; } - + } diff --git a/TCPSession.h b/TCPSession.h index e0a9edf..61a26b9 100644 --- a/TCPSession.h +++ b/TCPSession.h @@ -35,7 +35,7 @@ namespace core { /// /// - ~TCPSession(); + virtual ~TCPSession(); Command *grab = NULL; diff --git a/TCPSocket.h b/TCPSocket.h index fd888db..94bb97b 100644 --- a/TCPSocket.h +++ b/TCPSocket.h @@ -23,7 +23,7 @@ namespace core { TCPSocket(EPoll &ePoll); TCPSocket(EPoll &ePoll, std::string text); - ~TCPSocket(); + virtual ~TCPSocket(); void connect(IPAddress &address); diff --git a/servercore.code-workspace b/servercore.code-workspace index 876a149..5a0d140 100644 --- a/servercore.code-workspace +++ b/servercore.code-workspace @@ -4,5 +4,9 @@ "path": "." } ], - "settings": {} + "settings": { + "files.associations": { + "iosfwd": "c" + } + } } \ No newline at end of file