This commit is contained in:
Brad Arant 2022-01-24 18:36:47 -08:00
parent 96d9295df8
commit 3ef82e0fce
14 changed files with 159 additions and 115 deletions

View File

@ -11,7 +11,8 @@
"cStandard": "c17",
"cppStandard": "gnu++20",
"intelliSenseMode": "windows-gcc-x64",
"compileCommands": "./compile"
"compileCommands": "./compile",
"configurationProvider": "ms-vscode.cmake-tools"
},
{
"name": "config",

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"cmake.configureOnOpen": false
}

View File

@ -17,11 +17,17 @@ namespace core {
else {
if(request.equals(""))
return false;
request.split(delimiter);
auto command = commands[request[0].str()];
return command->processCommand(request, session);
request.split(delimiter, 2);
request.reset();
try {
auto command = commands.at(request[0].str());
return command->processCommand(request, session);
}
catch(...) {
return false;
}
}
return false;
return true;
}
bool CommandList::grabInput(TCPSession &session, Command &command) {

View File

@ -33,6 +33,7 @@ namespace core {
out << "Password: ";
status = WAIT_PASSWORD;
break;
case WAIT_PASSWORD:
status = PROMPT;
protocol(blank);

View File

@ -4,18 +4,18 @@
//#include "Session.h"
namespace core {
class TCPSession;
class SessionFilter : public Object {
public:
virtual bool test(TCPSession &session) {
return true;
}
}
};
}
#endif

View File

@ -5,13 +5,13 @@
#include "Log.h"
namespace core {
Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Socket object created [" << text << "].";
buffer = (char *)malloc(4096);
length = 4096;
}
Socket::~Socket() {
free(buffer);
if(descriptor == -1)
@ -21,7 +21,7 @@ namespace core {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << ".";
close(descriptor);
}
void Socket::setDescriptor(int descriptor) {
if((descriptor == -1) && (errno == 24)) {
shutdown("Too many files open");
@ -37,84 +37,85 @@ namespace core {
onRegistered();
lock.unlock();
}
int Socket::getDescriptor() {
return descriptor;
}
void Socket::setBufferSize(int length) {
this->length = length;
buffer = (char *)realloc(buffer, length);
}
int Socket::getBufferSize() {
return length;
}
void Socket::onRegister() {}
void Socket::onRegistered() {}
void Socket::onUnregister() {}
void Socket::onUnregistered() {}
bool Socket::eventReceived(struct epoll_event event) {
lock.lock();
if(event.events & EPOLLRDHUP) {
readHangup = true;
shutdown("hangup received");
lock.unlock();
lock.unlock();
return false;
}
if(event.events & EPOLLIN) {
coreutils::ZString zbuffer(buffer, length);
receiveData(zbuffer);
receiveData(zbuffer);
}
if(event.events & EPOLLWRNORM) {
writeSocket();
writeSocket();
}
if(event.events & EPOLLHUP) {
shutdown();
lock.unlock();
return false;
lock.unlock();
return false;
}
lock.unlock();
reset = true;
return reset;
}
void Socket::onDataReceived(std::string data) {
throw coreutils::Exception("Need to override onDataReceived.", __FILE__, __LINE__, -1);
}
void Socket::onDataReceived(coreutils::ZString &data) {
onDataReceived(std::string(data.getData(), data.getLength()));
}
void Socket::receiveData(coreutils::ZString &buffer) {
coreutils::ZString blank("");
if(buffer.getLength() <= 0)
throw coreutils::Exception("Request to receive data with a zero buffer length.", __FILE__, __LINE__, -1);
int len;
int error = -1;
// for(int ix = 0; ix < buffer.getLength(); ++ix)
// buffer[ix] = 0;
if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) {
coreutils::ZString zbuffer(buffer.getData(), len);
// coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer;
onDataReceived(zbuffer);
}
else {
@ -129,27 +130,27 @@ namespace core {
case ENOTCONN:
onDataReceived(blank);
break;
case ECONNRESET:
break;
default:
throw coreutils::Exception("Error in read of data from socket.", __FILE__, __LINE__, error);
}
}
}
void Socket::writeSocket() {
if(fifo.size() > 0) {
outlock.lock();
outlock.lock();
::write(descriptor, fifo.front().c_str(), fifo.front().length());
fifo.pop();
if(shutDown && !needsToWrite())
delete this;
outlock.unlock();
if(shutDown && !needsToWrite())
delete this;
outlock.unlock();
}
}
int Socket::write(std::string data) {
outlock.lock();
fifo.emplace(data);
@ -157,22 +158,22 @@ namespace core {
ePoll.resetSocket(this);
return 1;
}
void Socket::output(std::stringstream &out) {
out << "|" << descriptor << "|";
}
bool Socket::needsToWrite() {
return fifo.size() > 0;
}
void Socket::shutdown(std::string text) {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << ".";
shutDown = true;
reset = false;
// if(!needsToWrite())
delete this;
}
}

View File

@ -2,7 +2,15 @@
namespace core {
Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(&session) {}
Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(session) {}
Subscription::~Subscription() {
std::stringstream out;
out << "cancel:" << id << std::endl;
for(auto subscriber : subscribers) {
subscriber->write(out.str());
}
}
int Subscription::subscribe(TCPSession &session) {
subscribers.push_back(&session);
@ -12,7 +20,7 @@ namespace core {
int Subscription::unsubscribe(TCPSession &session) {
for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) {
if(*subscriber == &session) {
subscribers.erase(subscriber);
subscribers.erase(subscriber++);
return 1;
}
}

View File

@ -5,6 +5,7 @@
#include "Command.h"
#include "ZString.h"
#include <vector>
#include <string>
namespace core {
@ -12,6 +13,7 @@ namespace core {
public:
Subscription(std::string id, TCPSession &session);
~Subscription();
int subscribe(TCPSession &session);
int unsubscribe(TCPSession &session);
@ -22,7 +24,7 @@ namespace core {
int processCommand(coreutils::ZString &request, TCPSession &session) override;
std::string id;
TCPSession *owner;
TCPSession &owner;
std::vector<TCPSession *> subscribers;

View File

@ -6,49 +6,62 @@ namespace core {
SubscriptionManager::SubscriptionManager() {}
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
int count = 0;
for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) {
count += (*subscription)->unsubscribe(session);
int countSubscribed = 0;
int countPublished = 0;
for(auto subscription = subscriptions.begin(); subscription != subscriptions.end();) {
countSubscribed += (*subscription).second->unsubscribe(session);
if(&(*subscription).second->owner == &session) {
subscription = subscriptions.erase(subscription);
delete (*subscription).second;
++countPublished;
} else
++subscription;
}
coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << count << " subscriptions.";
return count;
coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";
return countSubscribed;
}
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Processing subscription request: " << request << ".";
if(request[0].equals("publish")) {
subscriptions.push_back(new Subscription(request[1].str(), session));
Subscription *newSubscription = new Subscription(request[1].str(), session);
subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
return 1;
} else if(request[0].equals("catalog")) {
session.out << ":catalog:";
for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) {
session.out << (*subscription)->id << "|";
(*subscription)->processCommand(request, session);
for(auto const& [key, subscription] : subscriptions) {
session.out << subscription->id << "|";
subscription->processCommand(request, session);
session.out << (";");
}
session.out << std::endl;
return 1;
}
for(auto subscription = subscriptions.begin(); subscription < subscriptions.end(); ++subscription) {
if(request[1].equals((*subscription)->id)) {
if(request[0].equals("unpublish")) {
subscriptions.erase(subscription);
} else if(request[0].equals("subscribe")) {
(*subscription)->subscribe(session);
} else if(request[0].equals("unsubscribe")) {
(*subscription)->unsubscribe(session);
} else if(request[0].equals("event")) {
if((*subscription)->owner == &session) {
std::stringstream out;
(*subscription)->process(request, out);
(*subscription)->event(out);
}
else
return 0;
auto subscription = subscriptions[request[1].str()];
if(request[1].equals(subscription->id)) {
if(request[0].equals("unpublish")) {
subscriptions.erase(request[1].str());
} else if(request[0].equals("subscribe")) {
subscription->subscribe(session);
} else if(request[0].equals("unsubscribe")) {
subscription->unsubscribe(session);
} else if(request[0].equals("event")) {
if(&subscription->owner == &session) {
std::stringstream out;
subscription->process(request, out);
subscription->event(out);
}
return 1;
else
return 0;
}
return 1;
}
return 0;
}

View File

@ -6,6 +6,7 @@
#include "Command.h"
#include "ZString.h"
#include <vector>
#include <string>
namespace core {
@ -19,7 +20,7 @@ namespace core {
int processCommand(coreutils::ZString &request, TCPSession &session) override;
private:
std::vector<Subscription *> subscriptions;
std::map<std::string, Subscription *> subscriptions;
};
}

View File

@ -109,6 +109,12 @@ namespace core {
void sendToAll(std::stringstream &out, TCPSession &sender);
///
/// The Subscription Manager tracks all subscriptions on the server.
///
SubscriptionManager subscriptions;
protected:
///
@ -136,7 +142,6 @@ namespace core {
TCPSession * accept();
std::mutex lock;
SubscriptionManager subscriptions;
};

View File

@ -9,6 +9,7 @@ namespace core {
TCPSession::~TCPSession() {
server.removeFromSessionList(this);
server.subscriptions.removeSessionSubscriptions(*this);
coreutils::Log(coreutils::LOG_DEBUG_1) << "Terminating TCPSession level.";
}
@ -17,8 +18,10 @@ namespace core {
}
void TCPSession::protocol(coreutils::ZString &data) {
if(!server.commands.processRequest(data, *this)) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str();
if(data.getLength() != 0) {
if(!server.commands.processRequest(data, *this)) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str();
}
}
}
@ -35,33 +38,33 @@ namespace core {
void TCPSession::onDataReceived(coreutils::ZString &data) {
if(data.getLength() > 0) {
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength());
memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength());
lineBufferSize += data.getLength();
while(lineBufferSize > 0) {
if(blockSize == 0) {
lineLength = strcspn(lineBuffer, "\r\n");
if(lineLength == lineBufferSize)
break;
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength());
memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength());
lineBufferSize += data.getLength();
while(lineBufferSize > 0) {
if(blockSize == 0) {
lineLength = strcspn(lineBuffer, "\r\n");
if(lineLength == lineBufferSize)
break;
coreutils::ZString zLine(lineBuffer, lineLength);
onLineReceived(zLine);
if(lineBuffer[lineLength] == '\r')
++lineLength;
if(lineBuffer[lineLength] == '\n')
++lineLength;
lineBufferSize -= lineLength;
if(lineBufferSize > 0)
memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize);
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize);
onLineReceived(zLine);
if(lineBuffer[lineLength] == '\r')
++lineLength;
if(lineBuffer[lineLength] == '\n')
++lineLength;
lineBufferSize -= lineLength;
if(lineBufferSize > 0)
memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize);
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize);
} else if(lineBufferSize >= blockLength) {
coreutils::ZString zBlock(lineBuffer, blockLength);
onBlockReceived(zBlock);
lineBufferSize -= blockLength;
if(lineBufferSize > 0)
memmove(lineBuffer, lineBuffer + blockLength, lineBufferSize);
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize);
}
}
coreutils::ZString zBlock(lineBuffer, blockLength);
onBlockReceived(zBlock);
lineBufferSize -= blockLength;
if(lineBufferSize > 0)
memmove(lineBuffer, lineBuffer + blockLength, lineBufferSize);
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize);
}
}
}
}
@ -84,7 +87,7 @@ namespace core {
void TCPSession::send() {
if(out.tellp() > 0)
write(out.str());
write(out.str());
out.str("");
}

View File

@ -5,7 +5,7 @@ do
filename="${file%.*}"
list="$list $filename.o"
echo -n "Compiling $filename..."
g++ -g -c -I../CoreUtils $file &
g++ -g -c -std=c++17 -I../CoreUtils $file &
if [ $? = '0' ]
then
echo "OK"

Binary file not shown.