Mods to improve stability for Zephora.

This commit is contained in:
Brad Arant 2022-03-14 18:48:58 +00:00
parent d13e3f9fb5
commit 03a3799888
7 changed files with 47 additions and 30 deletions

View File

@ -2,6 +2,7 @@
#include "EPoll.h" #include "EPoll.h"
#include "Command.h" #include "Command.h"
#include "Exception.h" #include "Exception.h"
#include <signal.h>
namespace core { namespace core {

View File

@ -6,6 +6,8 @@
namespace core { namespace core {
void sigpipe_handler(int unused) {}
Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) { Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Socket object created [" << text << "]."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Socket object created [" << text << "].";
buffer = (char *)malloc(4096); buffer = (char *)malloc(4096);
@ -110,7 +112,7 @@ namespace core {
if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) {
coreutils::ZString zbuffer(buffer.getData(), len); coreutils::ZString zbuffer(buffer.getData(), len);
// coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer;
onDataReceived(zbuffer); onDataReceived(zbuffer);
} }
else { else {
@ -140,8 +142,8 @@ namespace core {
outlock.lock(); outlock.lock();
::write(descriptor, fifo.front().c_str(), fifo.front().length()); ::write(descriptor, fifo.front().c_str(), fifo.front().length());
fifo.pop(); fifo.pop();
if(shutDown && !needsToWrite()) // if(shutDown && !needsToWrite())
delete this; // delete this;
outlock.unlock(); outlock.unlock();
} }
} }

View File

@ -3,8 +3,8 @@
#include "Log.h" #include "Log.h"
#include <algorithm> #include <algorithm>
namespace core { namespace core
{
Subscription::Subscription(std::string id, std::string mode) Subscription::Subscription(std::string id, std::string mode)
: id(id), mode(mode), owner(NULL) {} : id(id), mode(mode), owner(NULL) {}
@ -12,24 +12,30 @@ namespace core {
Subscription::Subscription(std::string id, TCPSession &session, std::string mode) Subscription::Subscription(std::string id, TCPSession &session, std::string mode)
: id(id), mode(mode), owner(&session) {} : id(id), mode(mode), owner(&session) {}
Subscription::~Subscription() { Subscription::~Subscription()
{
coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor...."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor....";
std::stringstream out; std::stringstream out;
out << "cancel:" << id << std::endl; out << "cancel:" << id << std::endl;
for(auto subscriber : subscribers) { for (auto subscriber : subscribers)
{
subscriber->write(out.str()); subscriber->write(out.str());
} }
} }
int Subscription::subscribe(TCPSession &session) { int Subscription::subscribe(TCPSession &session)
{
onSubscribe(session); onSubscribe(session);
subscribers.push_back(&session); subscribers.push_back(&session);
return 1; return 1;
} }
int Subscription::unsubscribe(TCPSession &session) { int Subscription::unsubscribe(TCPSession &session)
for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) { {
if(*subscriber == &session) { for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
{
if (*subscriber == &session)
{
subscribers.erase(subscriber++); subscribers.erase(subscriber++);
return 1; return 1;
} }
@ -37,22 +43,26 @@ namespace core {
return 0; return 0;
} }
int Subscription::process(coreutils::ZString &request, std::stringstream &out) { int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session)
{
out << "event:" << request[1] << ":" << request[2] << std::endl; out << "event:" << request[1] << ":" << request[2] << std::endl;
return 1; return 1;
} }
int Subscription::event(std::stringstream &out) { int Subscription::event(std::stringstream &out)
{
for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
(*subscriber)->write(out.str()); (*subscriber)->write(out.str());
return 1; return 1;
} }
bool Subscription::ifSubscriber(TCPSession &session) { bool Subscription::ifSubscriber(TCPSession &session)
{
return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end()); return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end());
} }
int Subscription::onSubscribe(TCPSession &session) { int Subscription::onSubscribe(TCPSession &session)
{
return 0; return 0;
} }

View File

@ -5,11 +5,13 @@
#include <vector> #include <vector>
#include <string> #include <string>
namespace core { namespace core
{
class TCPSession; class TCPSession;
class Subscription { class Subscription
{
public: public:
Subscription(std::string id, std::string mode = "*AUTHOR"); Subscription(std::string id, std::string mode = "*AUTHOR");
@ -19,7 +21,8 @@ namespace core {
int subscribe(TCPSession &session); int subscribe(TCPSession &session);
int unsubscribe(TCPSession &session); int unsubscribe(TCPSession &session);
virtual int process(coreutils::ZString &request, std::stringstream &out); virtual int process(coreutils::ZString &request, std::stringstream &out, TCPSession &session);
virtual int onSubscribe(TCPSession &session); virtual int onSubscribe(TCPSession &session);
int event(std::stringstream &out); int event(std::stringstream &out);
@ -33,7 +36,6 @@ namespace core {
TCPSession *owner; TCPSession *owner;
std::vector<TCPSession *> subscribers; std::vector<TCPSession *> subscribers;
}; };
} }

View File

@ -64,7 +64,7 @@ namespace core {
return 1; return 1;
} else if(request[0].equals("event")) { } else if(request[0].equals("event")) {
std::stringstream out; std::stringstream out;
subscription->process(request, out); subscription->process(request, out, session);
if(subscription->mode == "*ANYONE") { if(subscription->mode == "*ANYONE") {
subscription->event(out); subscription->event(out);
return 1; return 1;

View File

@ -70,9 +70,11 @@ namespace core {
void TCPServer::removeFromSessionList(TCPSession *session) { void TCPServer::removeFromSessionList(TCPSession *session) {
std::vector<TCPSession *>::iterator cursor; std::vector<TCPSession *>::iterator cursor;
lock.lock();
for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor)
if(*cursor == session) if(*cursor == session)
sessions.erase(cursor); sessions.erase(cursor);
lock.unlock();
} }
void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) { void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) {

Binary file not shown.