Fixing command list delimiter depth.

This commit is contained in:
Brad Arant 2022-07-26 16:07:37 -07:00
parent 45aee7ab6f
commit 9af17daed6
5 changed files with 101 additions and 105 deletions

View File

@ -2,50 +2,47 @@
#include "Log.h" #include "Log.h"
namespace core { namespace core {
CommandList::CommandList(std::string delimiter, int depth) : delimiter(delimiter), depth(depth) {} CommandList::CommandList(std::string delimiter, int depth) : delimiter(delimiter), depth(depth) {}
void CommandList::add(Command &command, std::string name) { void CommandList::add(Command &command, std::string name) {
commands.insert(std::make_pair(name, &command)); commands.insert(std::make_pair(name, &command));
} }
void CommandList::remove(Command &command) {} void CommandList::remove(Command &command) {}
bool CommandList::processRequest(coreutils::ZString &request, TCPSession &session) { int CommandList::processRequest(coreutils::ZString &request, TCPSession &session) {
if(session.grab != NULL) if(session.grab != NULL)
return session.grab->processCommand(request, session); return session.grab->processCommand(request, session);
else { else {
if(request.equals("")) if(request.equals(""))
return false; return 0;
request.split(delimiter, 10); request.split(delimiter, depth);
request.reset(); request.reset();
try { try {
auto command = commands.at(request[0].str()); auto command = commands.at(request[0].str());
return command->processCommand(request, session); return command->processCommand(request, session);
} }
catch(...) { catch(...) {
return false; return 0;
} }
} }
return true; return true;
} }
bool CommandList::grabInput(TCPSession &session, Command &command) { bool CommandList::grabInput(TCPSession &session, Command &command) {
session.grab = &command; session.grab = &command;
return true; return true;
} }
void CommandList::clearGrab(TCPSession &session) { void CommandList::clearGrab(TCPSession &session) {
session.grab = NULL; session.grab = NULL;
} }
int CommandList::processCommand(coreutils::ZString &request, TCPSession &session) { int CommandList::processCommand(coreutils::ZString &request, TCPSession &session) {
// for(Command *command : commands) // for(Command *command : commands)
// session.out << command->getName() << std::endl; // session.out << command->getName() << std::endl;
return true; return true;
} }
} }

View File

@ -40,7 +40,7 @@ namespace core {
/// then control is given to the process handler holding the grab on the input. /// then control is given to the process handler holding the grab on the input.
/// ///
bool processRequest(coreutils::ZString &request, TCPSession &session); int processRequest(coreutils::ZString &request, TCPSession &session);
/// ///
/// Use grabInput() within a Command object to force the requesting handler to receive /// Use grabInput() within a Command object to force the requesting handler to receive

View File

@ -1 +0,0 @@
0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos: 0.000000| pos:

View File

@ -8,31 +8,31 @@ namespace core {
TCPServer::TCPServer(EPoll &ePoll, IPAddress address, std::string delimiter, int depth, std::string text) TCPServer::TCPServer(EPoll &ePoll, IPAddress address, std::string delimiter, int depth, std::string text)
: TCPSocket(ePoll, text), commands(delimiter, depth) { : TCPSocket(ePoll, text), commands(delimiter, depth) {
commands.add(subscriptions, "publish"); commands.add(subscriptions, "publish");
commands.add(subscriptions, "unpublish"); commands.add(subscriptions, "unpublish");
commands.add(subscriptions, "subscribe"); commands.add(subscriptions, "subscribe");
commands.add(subscriptions, "unsubscribe"); commands.add(subscriptions, "unsubscribe");
commands.add(subscriptions, "catalog"); commands.add(subscriptions, "catalog");
commands.add(subscriptions, "event"); commands.add(subscriptions, "event");
setDescriptor(socket(AF_INET, SOCK_STREAM, 0)); setDescriptor(socket(AF_INET, SOCK_STREAM, 0));
int yes = 1; int yes = 1;
setsockopt(getDescriptor(), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); setsockopt(getDescriptor(), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
if(bind(getDescriptor(), address.getPointer(), address.addressLength) < 0) if(bind(getDescriptor(), address.getPointer(), address.addressLength) < 0)
throw coreutils::Exception("Error on bind to socket: " + std::to_string(errno)); throw coreutils::Exception("Error on bind to socket: " + std::to_string(errno));
if(listen(getDescriptor(), 20) < 0) if(listen(getDescriptor(), 20) < 0)
throw coreutils::Exception("Error on listen to socket"); throw coreutils::Exception("Error on listen to socket");
} }
TCPServer::~TCPServer() { TCPServer::~TCPServer() {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Closing server socket " << getDescriptor() << "."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Closing server socket " << getDescriptor() << ".";
close(getDescriptor()); close(getDescriptor());
} }
void TCPServer::onDataReceived(std::string data) { void TCPServer::onDataReceived(std::string data) {
lock.lock(); lock.lock();
TCPSession *session = accept(); TCPSession *session = accept();
@ -40,11 +40,11 @@ namespace core {
sessions.push_back(session); sessions.push_back(session);
lock.unlock(); lock.unlock();
} }
TCPSession * TCPServer::accept() { TCPSession * TCPServer::accept() {
try { try {
TCPSession *session = getSocketAccept(ePoll); TCPSession *session = getSocketAccept(ePoll);
session->setDescriptor(::accept(getDescriptor(), (struct sockaddr *)&session->ipAddress.addr, &session->ipAddress.addressLength)); session->setDescriptor(::accept(getDescriptor(), (struct sockaddr *)&session->ipAddress.addr, &session->ipAddress.addressLength));
// if(blackList && blackList->contains(session->ipAddress.getClientAddress())) { // if(blackList && blackList->contains(session->ipAddress.getClientAddress())) {
@ -67,30 +67,30 @@ namespace core {
} }
return NULL; return NULL;
} }
void TCPServer::removeFromSessionList(TCPSession *session) { void TCPServer::removeFromSessionList(TCPSession *session) {
std::vector<TCPSession *>::iterator cursor; std::vector<TCPSession *>::iterator cursor;
lock.lock(); lock.lock();
for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor)
if(*cursor == session) { if(*cursor == session) {
sessions.erase(cursor); sessions.erase(cursor);
break; break;
} }
lock.unlock(); lock.unlock();
} }
void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) { void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) {
throw coreutils::Exception(errorString); throw coreutils::Exception(errorString);
} }
TCPSession * TCPServer::getSocketAccept(EPoll &ePoll) { TCPSession * TCPServer::getSocketAccept(EPoll &ePoll) {
return new TCPSession(ePoll, *this); return new TCPSession(ePoll, *this);
} }
void TCPServer::output(std::stringstream &out) { void TCPServer::output(std::stringstream &out) {
out << "Use the 'help' command to list the commands for this server." << std::endl; out << "Use the 'help' command to list the commands for this server." << std::endl;
} }
int TCPServer::processCommand(coreutils::ZString &request, TCPSession &session) { int TCPServer::processCommand(coreutils::ZString &request, TCPSession &session) {
int sequence = 0; int sequence = 0;
for(auto *sessionx : sessions) { for(auto *sessionx : sessions) {
@ -100,26 +100,26 @@ namespace core {
} }
return 1; return 1;
} }
void TCPServer::sendToAll(std::stringstream &data) { void TCPServer::sendToAll(std::stringstream &data) {
for(auto session : sessions) for(auto session : sessions)
session->write(data.str()); session->write(data.str());
data.str(""); data.str("");
} }
void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) { void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) {
for(auto session : sessions) for(auto session : sessions)
if(session != &sender) if(session != &sender)
session->write(data.str()); session->write(data.str());
data.str(""); data.str("");
} }
void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) { void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) {
for(auto session : sessions) for(auto session : sessions)
if(filter.test(*session)) if(filter.test(*session))
if(session != &sender) if(session != &sender)
session->write(data.str()); session->write(data.str());
data.str(""); data.str("");
} }
} }

View File

@ -6,92 +6,92 @@
namespace core { namespace core {
TCPSession::TCPSession(EPoll &ePoll, TCPServer &server, std::string text) : TCPSocket(ePoll, text), server(server) {} TCPSession::TCPSession(EPoll &ePoll, TCPServer &server, std::string text) : TCPSocket(ePoll, text), server(server) {}
TCPSession::~TCPSession() { TCPSession::~TCPSession() {
server.removeFromSessionList(this); server.removeFromSessionList(this);
server.subscriptions.removeSessionSubscriptions(*this); server.subscriptions.removeSessionSubscriptions(*this);
} }
void TCPSession::output(std::stringstream &data) { void TCPSession::output(std::stringstream &data) {
data << "|" << ipAddress.getClientAddressAndPort(); data << "|" << ipAddress.getClientAddressAndPort();
} }
void TCPSession::protocol(coreutils::ZString &data) { void TCPSession::protocol(coreutils::ZString &data) {
if(data.getLength() != 0) { if(data.getLength() != 0) {
if(!server.commands.processRequest(data, *this)) { if(server.commands.processRequest(data, *this) == 0) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str(); coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str();
} }
} }
} }
void TCPSession::onRegistered() { void TCPSession::onRegistered() {
onConnected(); onConnected();
coreutils::ZString blank(""); coreutils::ZString blank("");
protocol(blank); protocol(blank);
send(); send();
if(term) if(term)
shutdown("termination requested"); shutdown("termination requested");
} }
void TCPSession::onConnected() {} void TCPSession::onConnected() {}
void TCPSession::onDataReceived(coreutils::ZString &data) { void TCPSession::onDataReceived(coreutils::ZString &data) {
if(data.getLength() > 0) { if(data.getLength() > 0) {
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength()); lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength());
memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength()); memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength());
lineBufferSize += data.getLength(); lineBufferSize += data.getLength();
while(lineBufferSize > 0) { while(lineBufferSize > 0) {
if(blockSize == 0) { if(blockSize == 0) {
lineLength = strcspn(lineBuffer, "\r\n"); lineLength = strcspn(lineBuffer, "\r\n");
if(lineLength == lineBufferSize) if(lineLength == lineBufferSize)
break; break;
coreutils::ZString zLine(lineBuffer, lineLength); coreutils::ZString zLine(lineBuffer, lineLength);
onLineReceived(zLine); onLineReceived(zLine);
if(lineBuffer[lineLength] == '\r') if(lineBuffer[lineLength] == '\r')
++lineLength; ++lineLength;
if(lineBuffer[lineLength] == '\n') if(lineBuffer[lineLength] == '\n')
++lineLength; ++lineLength;
lineBufferSize -= lineLength; lineBufferSize -= lineLength;
if(lineBufferSize > 0) if(lineBufferSize > 0)
memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize); memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize);
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); lineBuffer = (char *)realloc(lineBuffer, lineBufferSize);
} else if(lineBufferSize >= blockLength) { } else if(lineBufferSize >= blockLength) {
coreutils::ZString zBlock(lineBuffer, blockLength); coreutils::ZString zBlock(lineBuffer, blockLength);
onBlockReceived(zBlock); onBlockReceived(zBlock);
lineBufferSize -= blockLength; lineBufferSize -= blockLength;
if(lineBufferSize > 0) if(lineBufferSize > 0)
memmove(lineBuffer, lineBuffer + blockLength, lineBufferSize); memmove(lineBuffer, lineBuffer + blockLength, lineBufferSize);
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); lineBuffer = (char *)realloc(lineBuffer, lineBufferSize);
} }
} }
} }
} }
void TCPSession::setBlockSize(int blockSize) { void TCPSession::setBlockSize(int blockSize) {
this->blockSize = blockSize; this->blockSize = blockSize;
} }
void TCPSession::onLineReceived(coreutils::ZString &line) { void TCPSession::onLineReceived(coreutils::ZString &line) {
protocol(line); protocol(line);
send(); send();
if(term) if(term)
shutdown("termination requested"); shutdown("termination requested");
} }
void TCPSession::onBlockReceived(coreutils::ZString &block) { void TCPSession::onBlockReceived(coreutils::ZString &block) {
coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << block.getLength() << "]"; coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << block.getLength() << "]";
if(term) if(term)
shutdown("termination requested"); shutdown("termination requested");
} }
void TCPSession::send() { void TCPSession::send() {
if(out.tellp() > 0) if(out.tellp() > 0)
write(out.str()); write(out.str());
out.str(""); out.str("");
} }
void TCPSession::terminate() { void TCPSession::terminate() {
term = true; term = true;
} }
} }