Merge branch 'develop' of ssh://barant.com/git/ServerCore into develop

This commit is contained in:
Brad Arant 2022-02-14 07:12:45 -08:00
commit e28468d79d
9 changed files with 97 additions and 48 deletions

View File

@ -67,8 +67,6 @@ namespace core {
if(event.events & EPOLLRDHUP) { if(event.events & EPOLLRDHUP) {
readHangup = true; readHangup = true;
shutdown("hangup received"); shutdown("hangup received");
lock.unlock();
return false;
} }
if(event.events & EPOLLIN) { if(event.events & EPOLLIN) {
@ -82,14 +80,11 @@ namespace core {
if(event.events & EPOLLHUP) { if(event.events & EPOLLHUP) {
shutdown(); shutdown();
lock.unlock();
return false;
} }
lock.unlock(); lock.unlock();
reset = true; return !shutDown;
return reset;
} }
void Socket::onDataReceived(std::string data) { void Socket::onDataReceived(std::string data) {
@ -172,7 +167,7 @@ namespace core {
shutDown = true; shutDown = true;
reset = false; reset = false;
// if(!needsToWrite()) // if(!needsToWrite())
delete this; // delete this;
} }
} }

View File

@ -53,7 +53,7 @@ namespace core {
/// ///
/// Use the shutdown() method to terminate the socket connection and remove resources. /// Use the shutdown() method to terminate the socket connection and remove resources.
/// This method is provided to ensure that all destructors are called for all inherited /// This method is provided to ensure that all destructors are called for all inherited
/// objects without a virtual destructor. /// objects with a virtual destructor.
/// ///
void shutdown(std::string text = "unknown"); void shutdown(std::string text = "unknown");
@ -76,6 +76,9 @@ namespace core {
/// from any user extended classes unless an epoll event is being /// from any user extended classes unless an epoll event is being
/// simulated. /// simulated.
/// ///
/// The return value of false will delete the socket object causing the destructors to run.
/// The return value of true will enable the socket on ePoll to receive more events.
///
bool eventReceived(struct epoll_event event); ///< Parse epoll event and call specified callbacks. bool eventReceived(struct epoll_event event); ///< Parse epoll event and call specified callbacks.

View File

@ -1,13 +1,19 @@
#include "Subscription.h" #include "Subscription.h"
#include "TCPSession.h" #include "TCPSession.h"
#include "Log.h"
#include <algorithm>
namespace core { namespace core {
Subscription::Subscription(std::string id) : id(id), owner(NULL) {}
Subscription::Subscription(std::string id, TCPSession &session) : id(id), owner(&session) {} Subscription::Subscription(std::string id, std::string mode)
: id(id), mode(mode), owner(NULL) {}
Subscription::Subscription(std::string id, TCPSession &session, std::string mode)
: id(id), mode(mode), owner(&session) {}
Subscription::~Subscription() { Subscription::~Subscription() {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor....";
std::stringstream out; std::stringstream out;
out << "cancel:" << id << std::endl; out << "cancel:" << id << std::endl;
for(auto subscriber : subscribers) { for(auto subscriber : subscribers) {
@ -42,6 +48,10 @@ namespace core {
return 1; return 1;
} }
bool Subscription::ifSubscriber(TCPSession &session) {
return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end());
}
int Subscription::onSubscribe(TCPSession &session) { int Subscription::onSubscribe(TCPSession &session) {
return 0; return 0;
} }

View File

@ -12,8 +12,8 @@ namespace core {
class Subscription { class Subscription {
public: public:
Subscription(std::string id); Subscription(std::string id, std::string mode = "*AUTHOR");
Subscription(std::string id, TCPSession &session); Subscription(std::string id, TCPSession &session, std::string mode = "*AUTHOR");
virtual ~Subscription(); virtual ~Subscription();
int subscribe(TCPSession &session); int subscribe(TCPSession &session);
@ -24,9 +24,12 @@ namespace core {
int event(std::stringstream &out); int event(std::stringstream &out);
bool ifSubscriber(TCPSession &session);
// int processCommand(coreutils::ZString &request, TCPSession &session) override; // int processCommand(coreutils::ZString &request, TCPSession &session) override;
std::string id; std::string id;
std::string mode;
TCPSession *owner; TCPSession *owner;
std::vector<TCPSession *> subscribers; std::vector<TCPSession *> subscribers;

View File

@ -1,5 +1,6 @@
#include "SubscriptionManager.h" #include "SubscriptionManager.h"
#include "Log.h" #include "Log.h"
#include <algorithm>
namespace core { namespace core {
@ -13,15 +14,23 @@ namespace core {
int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) { int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) {
int countSubscribed = 0; int countSubscribed = 0;
int countPublished = 0; int countPublished = 0;
for(auto subscription = subscriptions.begin(); subscription != subscriptions.end();) {
countSubscribed += (*subscription).second->unsubscribe(session);
if((*subscription).second->owner == &session) {
subscription = subscriptions.erase(subscription);
// delete (*subscription).second;
++countPublished;
} else
++subscription;
std::string temp = "";
for(auto [key, subscription] : subscriptions) {
if(temp != "") {
subscriptions.erase(temp);
temp = "";
}
countSubscribed += subscription->unsubscribe(session);
if(subscription->owner == &session) {
temp = key;
delete subscription;
++countPublished;
}
}
if(temp != "") {
subscriptions.erase(temp);
temp = "";
} }
coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";
@ -30,15 +39,13 @@ namespace core {
int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) { int SubscriptionManager::processCommand(coreutils::ZString &request, TCPSession &session) {
if(request[0].equals("publish")) { if(request[0].equals("publish")) {
Subscription *newSubscription = new Subscription(request[1].str(), session); Subscription *newSubscription = new Subscription(request[1].str(), session, request[2].str());
subscriptions.insert(std::make_pair(request[1].str(), newSubscription)); subscriptions.insert(std::make_pair(request[1].str(), newSubscription));
return 1; return 1;
} else if(request[0].equals("catalog")) { } else if(request[0].equals("catalog")) {
session.out << ":catalog:"; session.out << ":catalog:";
for(auto const& [key, subscription] : subscriptions) { for(auto const& [key, subscription] : subscriptions) {
session.out << subscription->id << "|"; session.out << subscription->id << ";";
// subscription->processCommand(request, session);
session.out << (";");
} }
session.out << std::endl; session.out << std::endl;
return 1; return 1;
@ -51,19 +58,29 @@ namespace core {
subscriptions.erase(request[1].str()); subscriptions.erase(request[1].str());
} else if(request[0].equals("subscribe")) { } else if(request[0].equals("subscribe")) {
subscription->subscribe(session); subscription->subscribe(session);
return 1;
} else if(request[0].equals("unsubscribe")) { } else if(request[0].equals("unsubscribe")) {
subscription->unsubscribe(session); subscription->unsubscribe(session);
return 1;
} else if(request[0].equals("event")) { } else if(request[0].equals("event")) {
// coreutils::Log(coreutils::LOG_DEBUG_2) << "request: [" << request << "]."; std::stringstream out;
if(subscription->owner == &session) { subscription->process(request, out);
std::stringstream out; if(subscription->mode == "*ANYONE") {
subscription->process(request, out);
subscription->event(out); subscription->event(out);
return 1;
} else if(subscription->mode == "*SUBSCRIBERS") {
if(subscription->ifSubscriber(session)) {
subscription->event(out);
return 1;
}
} else if(subscription->mode == "*AUTHOR") {
if(subscription->owner == &session) {
subscription->event(out);
return 1;
}
} }
else
return 0;
} }
return 1; return 0;
} }
return 0; return 0;
} }

View File

@ -10,7 +10,6 @@ namespace core {
TCPSession::~TCPSession() { TCPSession::~TCPSession() {
server.removeFromSessionList(this); server.removeFromSessionList(this);
server.subscriptions.removeSessionSubscriptions(*this); server.subscriptions.removeSessionSubscriptions(*this);
coreutils::Log(coreutils::LOG_DEBUG_1) << "Terminating TCPSession level.";
} }
void TCPSession::output(std::stringstream &data) { void TCPSession::output(std::stringstream &data) {

View File

@ -4,6 +4,7 @@
namespace core { namespace core {
Thread::Thread(EPoll &ePoll) : ePoll(ePoll) {} Thread::Thread(EPoll &ePoll) : ePoll(ePoll) {}
Thread::Thread(EPoll &ePoll, ThreadScope *scope) : ePoll(ePoll) {}
Thread::~Thread() {} Thread::~Thread() {}
@ -45,25 +46,28 @@ namespace core {
while(1) { while(1) {
if(ePoll.isStopping()) if(ePoll.isStopping())
break; break;
status = "WAITING"; status = "WAITING";
int rc = epoll_wait(ePoll.getDescriptor(), events, 50, -1); int rc = epoll_wait(ePoll.getDescriptor(), events, 50, -1);
status = "RUNNING"; status = "RUNNING";
if(rc < 0) { if(rc < 0) {
// TODO: Make log entry indicating status received and ignore for now. // TODO: Make log entry indicating status received and ignore for now.
} else if(rc == 0) { } else if(rc == 0) {
break; break;
} else if(rc > 0) { } else if(rc > 0) {
for(int ix = 0; ix < rc; ++ix) { for(int ix = 0; ix < rc; ++ix) {
++count; ++count;
if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix])) if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix])) {
ePoll.resetSocket((Socket *)events[ix].data.ptr); ePoll.resetSocket((Socket *)events[ix].data.ptr);
} } else {
} delete (Socket *)events[ix].data.ptr;
} }
}
}
}
coreutils::Log(coreutils::LOG_DEBUG_1) << "Thread ending with thread id " << threadId << "."; coreutils::Log(coreutils::LOG_DEBUG_1) << "Thread ending with thread id " << threadId << ".";
} }

View File

@ -5,6 +5,7 @@
#include "Log.h" #include "Log.h"
#include "Object.h" #include "Object.h"
#include "TCPSession.h" #include "TCPSession.h"
#include "ThreadScope.h"
namespace core { namespace core {
@ -22,6 +23,7 @@ namespace core {
public: public:
Thread(EPoll &ePoll); Thread(EPoll &ePoll);
Thread(EPoll &ePoll, ThreadScope *thread);
~Thread(); ~Thread();
/// ///
@ -43,6 +45,7 @@ namespace core {
void print_thread_start_log(); void print_thread_start_log();
pid_t threadId; pid_t threadId;
void run(); void run();
ThreadScope *thread;
}; };

15
ThreadScope.h Normal file
View File

@ -0,0 +1,15 @@
#ifndef __ThreadScope_h__
#define __ThreadScope_h__
namespace core {
class ThreadScope {
public:
ThreadScope() {}
};
}
#endif