Merge branch 'develop' of ssh://barant.com/git/ServerCore into develop

This commit is contained in:
Brad Arant 2022-03-15 20:09:34 -07:00
commit 565f119653
10 changed files with 82 additions and 74 deletions

View File

@ -2,6 +2,7 @@
#include "EPoll.h" #include "EPoll.h"
#include "Command.h" #include "Command.h"
#include "Exception.h" #include "Exception.h"
#include <signal.h>
namespace core { namespace core {
@ -96,11 +97,11 @@ namespace core {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event); epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event);
} }
void EPoll::disableSocket(Socket *socket) { void EPoll::disableSocket(Socket *socket) {
epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL);
} }
void EPoll::resetSocket(Socket *socket) { void EPoll::resetSocket(Socket *socket) {
struct epoll_event event; struct epoll_event event;
event.data.ptr = socket; event.data.ptr = socket;
@ -109,5 +110,5 @@ namespace core {
event.events |= EPOLLWRNORM; event.events |= EPOLLWRNORM;
epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event); epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event);
} }
} }

View File

@ -6,6 +6,8 @@
namespace core { namespace core {
void sigpipe_handler(int unused) {}
Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) { Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Socket object created [" << text << "]."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Socket object created [" << text << "].";
buffer = (char *)malloc(4096); buffer = (char *)malloc(4096);
@ -15,7 +17,7 @@ namespace core {
Socket::~Socket() { Socket::~Socket() {
free(buffer); free(buffer);
if(descriptor == -1) if(descriptor == -1)
return; return;
onUnregister(); onUnregister();
ePoll.unregisterSocket(this); ePoll.unregisterSocket(this);
coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << "."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << ".";
@ -24,10 +26,9 @@ namespace core {
void Socket::setDescriptor(int descriptor) { void Socket::setDescriptor(int descriptor) {
if((descriptor == -1) && (errno == 24)) { if((descriptor == -1) && (errno == 24)) {
shutdown("Too many files open"); shutdown("Too many files open");
throw coreutils::Exception("Too many files open. Refusing connection."); throw coreutils::Exception("Too many files open. Refusing connection.");
} }
lock.lock();
coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket.";
if(descriptor < 3) if(descriptor < 3)
throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__); throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__);
@ -35,7 +36,6 @@ namespace core {
onRegister(); onRegister();
ePoll.registerSocket(this); ePoll.registerSocket(this);
onRegistered(); onRegistered();
lock.unlock();
} }
int Socket::getDescriptor() { int Socket::getDescriptor() {
@ -61,29 +61,22 @@ namespace core {
void Socket::onUnregistered() {} void Socket::onUnregistered() {}
bool Socket::eventReceived(struct epoll_event event) { bool Socket::eventReceived(struct epoll_event event) {
inHandler = true;
lock.lock();
if(event.events & EPOLLRDHUP) { if(event.events & EPOLLRDHUP) {
readHangup = true; readHangup = true;
shutdown("hangup received"); shutdown("hangup received");
} }
else if(event.events & EPOLLIN) {
if(event.events & EPOLLIN) {
coreutils::ZString zbuffer(buffer, length); coreutils::ZString zbuffer(buffer, length);
receiveData(zbuffer); receiveData(zbuffer);
} }
else if(event.events & EPOLLWRNORM) {
if(event.events & EPOLLWRNORM) {
writeSocket(); writeSocket();
} }
else if(event.events & EPOLLHUP) {
if(event.events & EPOLLHUP) {
shutdown(); shutdown();
} }
inHandler = false;
lock.unlock();
return !shutDown; return !shutDown;
} }
@ -105,12 +98,9 @@ namespace core {
int len; int len;
int error = -1; int error = -1;
// for(int ix = 0; ix < buffer.getLength(); ++ix)
// buffer[ix] = 0;
if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) {
coreutils::ZString zbuffer(buffer.getData(), len); coreutils::ZString zbuffer(buffer.getData(), len);
// coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer;
onDataReceived(zbuffer); onDataReceived(zbuffer);
} }
else { else {
@ -138,10 +128,9 @@ namespace core {
void Socket::writeSocket() { void Socket::writeSocket() {
if(fifo.size() > 0) { if(fifo.size() > 0) {
outlock.lock(); outlock.lock();
::write(descriptor, fifo.front().c_str(), fifo.front().length()); if(!shutDown)
::write(descriptor, fifo.front().c_str(), fifo.front().length());
fifo.pop(); fifo.pop();
if(shutDown && !needsToWrite())
delete this;
outlock.unlock(); outlock.unlock();
} }
} }
@ -150,7 +139,8 @@ namespace core {
outlock.lock(); outlock.lock();
fifo.emplace(data); fifo.emplace(data);
outlock.unlock(); outlock.unlock();
ePoll.resetSocket(this); if(!inHandler)
ePoll.resetSocket(this);
return 1; return 1;
} }
@ -166,8 +156,6 @@ namespace core {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << ".";
shutDown = true; shutDown = true;
reset = false; reset = false;
// if(!needsToWrite())
// delete this;
} }
} }

View File

@ -167,10 +167,10 @@ namespace core {
std::string text; std::string text;
int descriptor = -1; int descriptor = -1;
std::mutex lock; // std::mutex lock;
std::mutex outlock; std::mutex outlock;
bool readHangup = false; bool readHangup = false;
bool inHandler = false;
// struct epoll_event event; // Event selection construction structure. // struct epoll_event event; // Event selection construction structure.
//------------------------------------------------------------------------------------- //-------------------------------------------------------------------------------------

View File

@ -3,33 +3,38 @@
#include "Log.h" #include "Log.h"
#include <algorithm> #include <algorithm>
namespace core { namespace core
{
Subscription::Subscription(std::string id, std::string mode)
: id(id), mode(mode), owner(NULL) {}
Subscription::Subscription(std::string id, std::string mode) Subscription::Subscription(std::string id, TCPSession &session, std::string mode)
: id(id), mode(mode), owner(NULL) {} : id(id), mode(mode), owner(&session) {}
Subscription::Subscription(std::string id, TCPSession &session, std::string mode) Subscription::~Subscription()
: id(id), mode(mode), owner(&session) {} {
Subscription::~Subscription() {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor....";
std::stringstream out; std::stringstream out;
out << "cancel:" << id << std::endl; out << "cancel:" << id << std::endl;
for(auto subscriber : subscribers) { for (auto subscriber : subscribers)
{
subscriber->write(out.str()); subscriber->write(out.str());
} }
} }
int Subscription::subscribe(TCPSession &session) { int Subscription::subscribe(TCPSession &session)
onSubscribe(session); {
onSubscribe(session);
subscribers.push_back(&session); subscribers.push_back(&session);
return 1; return 1;
} }
int Subscription::unsubscribe(TCPSession &session) { int Subscription::unsubscribe(TCPSession &session)
for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) { {
if(*subscriber == &session) { for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
{
if (*subscriber == &session)
{
subscribers.erase(subscriber++); subscribers.erase(subscriber++);
return 1; return 1;
} }
@ -37,23 +42,27 @@ namespace core {
return 0; return 0;
} }
int Subscription::process(coreutils::ZString &request, std::stringstream &out) { int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session)
{
out << "event:" << request[1] << ":" << request[2] << std::endl; out << "event:" << request[1] << ":" << request[2] << std::endl;
return 1; return 1;
} }
int Subscription::event(std::stringstream &out) { int Subscription::event(std::stringstream &out)
for(auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber) {
(*subscriber)->write(out.str()); for (auto subscriber = subscribers.begin(); subscriber < subscribers.end(); ++subscriber)
(*subscriber)->write(out.str());
return 1; return 1;
} }
bool Subscription::ifSubscriber(TCPSession &session) { bool Subscription::ifSubscriber(TCPSession &session)
{
return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end()); return (std::find(subscribers.begin(), subscribers.end(), &session) != subscribers.end());
} }
int Subscription::onSubscribe(TCPSession &session) { int Subscription::onSubscribe(TCPSession &session)
{
return 0; return 0;
} }
} }

View File

@ -5,11 +5,13 @@
#include <vector> #include <vector>
#include <string> #include <string>
namespace core { namespace core
{
class TCPSession; class TCPSession;
class Subscription { class Subscription
{
public: public:
Subscription(std::string id, std::string mode = "*AUTHOR"); Subscription(std::string id, std::string mode = "*AUTHOR");
@ -19,13 +21,14 @@ namespace core {
int subscribe(TCPSession &session); int subscribe(TCPSession &session);
int unsubscribe(TCPSession &session); int unsubscribe(TCPSession &session);
virtual int process(coreutils::ZString &request, std::stringstream &out); virtual int process(coreutils::ZString &request, std::stringstream &out, TCPSession &session);
virtual int onSubscribe(TCPSession &session); virtual int onSubscribe(TCPSession &session);
int event(std::stringstream &out); int event(std::stringstream &out);
bool ifSubscriber(TCPSession &session); bool ifSubscriber(TCPSession &session);
// int processCommand(coreutils::ZString &request, TCPSession &session) override; // int processCommand(coreutils::ZString &request, TCPSession &session) override;
std::string id; std::string id;
@ -33,7 +36,6 @@ namespace core {
TCPSession *owner; TCPSession *owner;
std::vector<TCPSession *> subscribers; std::vector<TCPSession *> subscribers;
}; };
} }

View File

@ -7,7 +7,9 @@ namespace core {
SubscriptionManager::SubscriptionManager() {} SubscriptionManager::SubscriptionManager() {}
int SubscriptionManager::add(Subscription &subscription) { int SubscriptionManager::add(Subscription &subscription) {
lock.lock();
subscriptions.insert(std::make_pair(subscription.id, &subscription)); subscriptions.insert(std::make_pair(subscription.id, &subscription));
lock.unlock();
return 1; return 1;
} }
@ -15,6 +17,7 @@ namespace core {
int countSubscribed = 0; int countSubscribed = 0;
int countPublished = 0; int countPublished = 0;
lock.lock();
std::string temp = ""; std::string temp = "";
for(auto [key, subscription] : subscriptions) { for(auto [key, subscription] : subscriptions) {
if(temp != "") { if(temp != "") {
@ -34,6 +37,7 @@ namespace core {
} }
coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";
lock.unlock();
return countSubscribed; return countSubscribed;
} }
@ -64,7 +68,7 @@ namespace core {
return 1; return 1;
} else if(request[0].equals("event")) { } else if(request[0].equals("event")) {
std::stringstream out; std::stringstream out;
subscription->process(request, out); subscription->process(request, out, session);
if(subscription->mode == "*ANYONE") { if(subscription->mode == "*ANYONE") {
subscription->event(out); subscription->event(out);
return 1; return 1;

View File

@ -23,7 +23,8 @@ namespace core {
private: private:
std::map<std::string, Subscription *> subscriptions; std::map<std::string, Subscription *> subscriptions;
std::mutex lock;
}; };
} }

View File

@ -70,9 +70,13 @@ namespace core {
void TCPServer::removeFromSessionList(TCPSession *session) { void TCPServer::removeFromSessionList(TCPSession *session) {
std::vector<TCPSession *>::iterator cursor; std::vector<TCPSession *>::iterator cursor;
lock.lock();
for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor)
if(*cursor == session) if(*cursor == session) {
sessions.erase(cursor); sessions.erase(cursor);
break;
}
lock.unlock();
} }
void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) { void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) {
@ -96,26 +100,26 @@ namespace core {
} }
return 1; return 1;
} }
void TCPServer::sendToAll(std::stringstream &data) { void TCPServer::sendToAll(std::stringstream &data) {
for(auto session : sessions) for(auto session : sessions)
session->write(data.str()); session->write(data.str());
data.str(""); data.str("");
} }
void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) { void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) {
for(auto session : sessions) for(auto session : sessions)
if(session != &sender) if(session != &sender)
session->write(data.str()); session->write(data.str());
data.str(""); data.str("");
} }
void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) { void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) {
for(auto session : sessions) for(auto session : sessions)
if(filter.test(*session)) if(filter.test(*session))
if(session != &sender) if(session != &sender)
session->write(data.str()); session->write(data.str());
data.str(""); data.str("");
} }
} }

View File

@ -4,7 +4,6 @@ namespace core {
Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") { Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") {
setDescriptor(timerfd_create(CLOCK_REALTIME, 0)); setDescriptor(timerfd_create(CLOCK_REALTIME, 0));
ePoll.registerSocket(this);
setTimer(delay); setTimer(delay);
} }

Binary file not shown.