diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index be23ad4..7adc94f 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -11,7 +11,8 @@ "cStandard": "c17", "cppStandard": "gnu++20", "intelliSenseMode": "windows-gcc-x64", - "compileCommands": "./compile" + "compileCommands": "./compile", + "configurationProvider": "ms-vscode.cmake-tools" }, { "name": "config", diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..82cfcc9 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "cmake.configureOnOpen": false +} \ No newline at end of file diff --git a/CommandList.cpp b/CommandList.cpp index 23a30b5..b3ab300 100644 --- a/CommandList.cpp +++ b/CommandList.cpp @@ -17,11 +17,17 @@ namespace core { else { if(request.equals("")) return false; - request.split(delimiter); - auto command = commands[request[0].str()]; - return command->processCommand(request, session); + request.split(delimiter, 2); + request.reset(); + try { + auto command = commands.at(request[0].str()); + return command->processCommand(request, session); + } + catch(...) { + return false; + } } - return false; + return true; } bool CommandList::grabInput(TCPSession &session, Command &command) { diff --git a/ConsoleSession.cpp b/ConsoleSession.cpp index 28c6f63..a21affa 100644 --- a/ConsoleSession.cpp +++ b/ConsoleSession.cpp @@ -33,6 +33,7 @@ namespace core { out << "Password: "; status = WAIT_PASSWORD; break; + case WAIT_PASSWORD: status = PROMPT; protocol(blank); diff --git a/SessionFilter.h b/SessionFilter.h index 8cca085..56c27df 100644 --- a/SessionFilter.h +++ b/SessionFilter.h @@ -4,18 +4,18 @@ //#include "Session.h" namespace core { - + class TCPSession; - + class SessionFilter : public Object { - + public: virtual bool test(TCPSession &session) { return true; - } - + } + }; - + } #endif diff --git a/Socket.cpp b/Socket.cpp index 5e05ccf..c065148 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -5,13 +5,13 @@ #include "Log.h" namespace core { - + Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) { coreutils::Log(coreutils::LOG_DEBUG_2) << "Socket object created [" << text << "]."; buffer = (char *)malloc(4096); length = 4096; } - + Socket::~Socket() { free(buffer); if(descriptor == -1) @@ -21,7 +21,7 @@ namespace core { coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << "."; close(descriptor); } - + void Socket::setDescriptor(int descriptor) { if((descriptor == -1) && (errno == 24)) { shutdown("Too many files open"); @@ -37,84 +37,85 @@ namespace core { onRegistered(); lock.unlock(); } - + int Socket::getDescriptor() { return descriptor; } - + void Socket::setBufferSize(int length) { this->length = length; buffer = (char *)realloc(buffer, length); - + } - + int Socket::getBufferSize() { return length; } - + void Socket::onRegister() {} - + void Socket::onRegistered() {} - + void Socket::onUnregister() {} - + void Socket::onUnregistered() {} - + bool Socket::eventReceived(struct epoll_event event) { - + lock.lock(); - + if(event.events & EPOLLRDHUP) { readHangup = true; shutdown("hangup received"); - lock.unlock(); + lock.unlock(); return false; } - + if(event.events & EPOLLIN) { coreutils::ZString zbuffer(buffer, length); - receiveData(zbuffer); + receiveData(zbuffer); } - + if(event.events & EPOLLWRNORM) { - writeSocket(); + writeSocket(); } - + if(event.events & EPOLLHUP) { shutdown(); - lock.unlock(); - return false; + lock.unlock(); + return false; } - + lock.unlock(); - + reset = true; return reset; } - + void Socket::onDataReceived(std::string data) { throw coreutils::Exception("Need to override onDataReceived.", __FILE__, __LINE__, -1); } - + void Socket::onDataReceived(coreutils::ZString &data) { onDataReceived(std::string(data.getData(), data.getLength())); } - + void Socket::receiveData(coreutils::ZString &buffer) { - + coreutils::ZString blank(""); - + if(buffer.getLength() <= 0) throw coreutils::Exception("Request to receive data with a zero buffer length.", __FILE__, __LINE__, -1); - + int len; int error = -1; - + // for(int ix = 0; ix < buffer.getLength(); ++ix) // buffer[ix] = 0; if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { coreutils::ZString zbuffer(buffer.getData(), len); +// coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; onDataReceived(zbuffer); } else { @@ -129,27 +130,27 @@ namespace core { case ENOTCONN: onDataReceived(blank); break; - + case ECONNRESET: break; - + default: throw coreutils::Exception("Error in read of data from socket.", __FILE__, __LINE__, error); } } } - + void Socket::writeSocket() { if(fifo.size() > 0) { - outlock.lock(); + outlock.lock(); ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); - if(shutDown && !needsToWrite()) - delete this; - outlock.unlock(); + if(shutDown && !needsToWrite()) + delete this; + outlock.unlock(); } } - + int Socket::write(std::string data) { outlock.lock(); fifo.emplace(data); @@ -157,22 +158,22 @@ namespace core { ePoll.resetSocket(this); return 1; } - + void Socket::output(std::stringstream &out) { out << "|" << descriptor << "|"; } - + bool Socket::needsToWrite() { return fifo.size() > 0; } - + void Socket::shutdown(std::string text) { coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; shutDown = true; reset = false; // if(!needsToWrite()) delete this; - + } - + } diff --git a/Subscription.cpp b/Subscription.cpp index 2d8201f..94231cd 100644 --- a/Subscription.cpp +++ b/Subscription.cpp @@ -2,7 +2,15 @@ namespace core { - Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(&session) {} + Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(session) {} + + Subscription::~Subscription() { + std::stringstream out; + out << "cancel:" << id << std::endl; + for(auto subscriber : subscribers) { + subscriber->write(out.str()); + } + } int Subscription::subscribe(TCPSession &session) { subscribers.push_back(&session); @@ -12,7 +20,7 @@ namespace core { int Subscription::unsubscribe(TCPSession &session) { for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) { if(*subscriber == &session) { - subscribers.erase(subscriber); + subscribers.erase(subscriber++); return 1; } } diff --git a/Subscription.h b/Subscription.h index 1d2625d..93e307f 100644 --- a/Subscription.h +++ b/Subscription.h @@ -5,6 +5,7 @@ #include "Command.h" #include "ZString.h" #include +#include namespace core { @@ -12,6 +13,7 @@ namespace core { public: Subscription(std::string id, TCPSession &session); + ~Subscription(); int subscribe(TCPSession &session); int unsubscribe(TCPSession &session); @@ -22,7 +24,7 @@ namespace core { int processCommand(coreutils::ZString &request, TCPSession &session) override; std::string id; - TCPSession *owner; + TCPSession &owner; std::vector subscribers; diff --git a/SubscriptionManager.cpp b/SubscriptionManager.cpp index 56881b9..c93c530 100644 --- a/SubscriptionManager.cpp +++ b/SubscriptionManager.cpp @@ -6,49 +6,62 @@ namespace core { SubscriptionManager::SubscriptionManager() {} int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) { - int count = 0; - for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) { - count += (*subscription)->unsubscribe(session); + int countSubscribed = 0; + int countPublished = 0; + for(auto subscription = subscriptions.begin(); subscription != subscriptions.end();) { + countSubscribed += (*subscription).second->unsubscribe(session); + if(&(*subscription).second->owner == &session) { + subscription = subscriptions.erase(subscription); + delete (*subscription).second; + ++countPublished; + } else + ++subscription; + } - coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << count << " subscriptions."; - return count; + coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; + coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; + return countSubscribed; } int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) { + coreutils::Log(coreutils::LOG_DEBUG_2) << "Processing subscription request: " << request << "."; + if(request[0].equals("publish")) { - subscriptions.push_back(new Subscription(request[1].str(), session)); + Subscription *newSubscription = new Subscription(request[1].str(), session); + subscriptions.insert(std::make_pair(request[1].str(), newSubscription)); + return 1; } else if(request[0].equals("catalog")) { session.out << ":catalog:"; - for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) { - session.out << (*subscription)->id << "|"; - (*subscription)->processCommand(request, session); + for(auto const& [key, subscription] : subscriptions) { + session.out << subscription->id << "|"; + subscription->processCommand(request, session); session.out << (";"); } session.out << std::endl; return 1; } - for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) { - if(request[1].equals((*subscription)->id)) { - if(request[0].equals("unpublish")) { - subscriptions.erase(subscription); - } else if(request[0].equals("subscribe")) { - (*subscription)->subscribe(session); - } else if(request[0].equals("unsubscribe")) { - (*subscription)->unsubscribe(session); - } else if(request[0].equals("event")) { - if((*subscription)->owner == &session) { - std::stringstream out; - (*subscription)->process(request, out); - (*subscription)->event(out); - } - else - return 0; + auto subscription = subscriptions[request[1].str()]; + + if(request[1].equals(subscription->id)) { + if(request[0].equals("unpublish")) { + subscriptions.erase(request[1].str()); + } else if(request[0].equals("subscribe")) { + subscription->subscribe(session); + } else if(request[0].equals("unsubscribe")) { + subscription->unsubscribe(session); + } else if(request[0].equals("event")) { + if(&subscription->owner == &session) { + std::stringstream out; + subscription->process(request, out); + subscription->event(out); } - return 1; + else + return 0; } + return 1; } return 0; } diff --git a/SubscriptionManager.h b/SubscriptionManager.h index aa11409..0f32db9 100644 --- a/SubscriptionManager.h +++ b/SubscriptionManager.h @@ -6,6 +6,7 @@ #include "Command.h" #include "ZString.h" #include +#include namespace core { @@ -19,7 +20,7 @@ namespace core { int processCommand(coreutils::ZString &request, TCPSession &session) override; private: - std::vector subscriptions; + std::map subscriptions; }; } diff --git a/TCPServer.h b/TCPServer.h index 0bd3a1c..98aa8f3 100644 --- a/TCPServer.h +++ b/TCPServer.h @@ -109,6 +109,12 @@ namespace core { void sendToAll(std::stringstream &out, TCPSession &sender); + /// + /// The Subscription Manager tracks all subscriptions on the server. + /// + + SubscriptionManager subscriptions; + protected: /// @@ -136,7 +142,6 @@ namespace core { TCPSession * accept(); std::mutex lock; - SubscriptionManager subscriptions; }; diff --git a/TCPSession.cpp b/TCPSession.cpp index ef51955..45bd1e0 100644 --- a/TCPSession.cpp +++ b/TCPSession.cpp @@ -9,6 +9,7 @@ namespace core { TCPSession::~TCPSession() { server.removeFromSessionList(this); + server.subscriptions.removeSessionSubscriptions(*this); coreutils::Log(coreutils::LOG_DEBUG_1) << "Terminating TCPSession level."; } @@ -17,8 +18,10 @@ namespace core { } void TCPSession::protocol(coreutils::ZString &data) { - if(!server.commands.processRequest(data, *this)) { - coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str(); + if(data.getLength() != 0) { + if(!server.commands.processRequest(data, *this)) { + coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str(); + } } } @@ -35,33 +38,33 @@ namespace core { void TCPSession::onDataReceived(coreutils::ZString &data) { if(data.getLength() > 0) { - lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength()); - memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength()); - lineBufferSize += data.getLength(); - while(lineBufferSize > 0) { - if(blockSize == 0) { - lineLength = strcspn(lineBuffer, "\r\n"); - if(lineLength == lineBufferSize) - break; + lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength()); + memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength()); + lineBufferSize += data.getLength(); + while(lineBufferSize > 0) { + if(blockSize == 0) { + lineLength = strcspn(lineBuffer, "\r\n"); + if(lineLength == lineBufferSize) + break; coreutils::ZString zLine(lineBuffer, lineLength); - onLineReceived(zLine); - if(lineBuffer[lineLength] == '\r') - ++lineLength; - if(lineBuffer[lineLength] == '\n') - ++lineLength; - lineBufferSize -= lineLength; - if(lineBufferSize > 0) - memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize); - lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); + onLineReceived(zLine); + if(lineBuffer[lineLength] == '\r') + ++lineLength; + if(lineBuffer[lineLength] == '\n') + ++lineLength; + lineBufferSize -= lineLength; + if(lineBufferSize > 0) + memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize); + lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); } else if(lineBufferSize >= blockLength) { - coreutils::ZString zBlock(lineBuffer, blockLength); - onBlockReceived(zBlock); - lineBufferSize -= blockLength; - if(lineBufferSize > 0) - memmove(lineBuffer, lineBuffer + blockLength, lineBufferSize); - lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); - } - } + coreutils::ZString zBlock(lineBuffer, blockLength); + onBlockReceived(zBlock); + lineBufferSize -= blockLength; + if(lineBufferSize > 0) + memmove(lineBuffer, lineBuffer + blockLength, lineBufferSize); + lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); + } + } } } @@ -84,7 +87,7 @@ namespace core { void TCPSession::send() { if(out.tellp() > 0) - write(out.str()); + write(out.str()); out.str(""); } diff --git a/compile b/compile index 404498b..ca6498e 100755 --- a/compile +++ b/compile @@ -5,7 +5,7 @@ do filename="${file%.*}" list="$list $filename.o" echo -n "Compiling $filename..." - g++ -g -c -I../CoreUtils $file & + g++ -g -c -std=c++17 -I../CoreUtils $file & if [ $? = '0' ] then echo "OK" diff --git a/output/main b/output/main index 3b68dd8..99f2acb 100755 Binary files a/output/main and b/output/main differ