Cleaned up socket scheduling. Very clean now.

This commit is contained in:
Brad Arant 2022-03-15 21:02:35 +00:00
parent 4017ee9175
commit 3a6883e77f
8 changed files with 34 additions and 45 deletions

View File

@ -29,7 +29,6 @@ namespace core {
shutdown("Too many files open"); shutdown("Too many files open");
throw coreutils::Exception("Too many files open. Refusing connection."); throw coreutils::Exception("Too many files open. Refusing connection.");
} }
lock.lock();
coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket.";
if(descriptor < 3) if(descriptor < 3)
throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__); throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__);
@ -37,7 +36,6 @@ namespace core {
onRegister(); onRegister();
ePoll.registerSocket(this); ePoll.registerSocket(this);
onRegistered(); onRegistered();
lock.unlock();
} }
int Socket::getDescriptor() { int Socket::getDescriptor() {
@ -63,29 +61,20 @@ namespace core {
void Socket::onUnregistered() {} void Socket::onUnregistered() {}
bool Socket::eventReceived(struct epoll_event event) { bool Socket::eventReceived(struct epoll_event event) {
lock.lock();
if(event.events & EPOLLRDHUP) { if(event.events & EPOLLRDHUP) {
readHangup = true; readHangup = true;
shutdown("hangup received"); shutdown("hangup received");
} }
else if(event.events & EPOLLIN) {
if(event.events & EPOLLIN) {
coreutils::ZString zbuffer(buffer, length); coreutils::ZString zbuffer(buffer, length);
receiveData(zbuffer); receiveData(zbuffer);
} }
else if(event.events & EPOLLWRNORM) {
if(event.events & EPOLLWRNORM) {
writeSocket(); writeSocket();
} }
else if(event.events & EPOLLHUP) {
if(event.events & EPOLLHUP) {
shutdown(); shutdown();
} }
lock.unlock();
return !shutDown; return !shutDown;
} }
@ -107,9 +96,6 @@ namespace core {
int len; int len;
int error = -1; int error = -1;
// for(int ix = 0; ix < buffer.getLength(); ++ix)
// buffer[ix] = 0;
if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) {
coreutils::ZString zbuffer(buffer.getData(), len); coreutils::ZString zbuffer(buffer.getData(), len);
coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer;
@ -140,10 +126,9 @@ namespace core {
void Socket::writeSocket() { void Socket::writeSocket() {
if(fifo.size() > 0) { if(fifo.size() > 0) {
outlock.lock(); outlock.lock();
if(!shutDown)
::write(descriptor, fifo.front().c_str(), fifo.front().length()); ::write(descriptor, fifo.front().c_str(), fifo.front().length());
fifo.pop(); fifo.pop();
// if(shutDown && !needsToWrite())
// delete this;
outlock.unlock(); outlock.unlock();
} }
} }
@ -152,7 +137,7 @@ namespace core {
outlock.lock(); outlock.lock();
fifo.emplace(data); fifo.emplace(data);
outlock.unlock(); outlock.unlock();
ePoll.resetSocket(this); // ePoll.resetSocket(this);
return 1; return 1;
} }
@ -168,8 +153,6 @@ namespace core {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << ".";
shutDown = true; shutDown = true;
reset = false; reset = false;
// if(!needsToWrite())
// delete this;
} }
} }

View File

@ -167,7 +167,7 @@ namespace core {
std::string text; std::string text;
int descriptor = -1; int descriptor = -1;
std::mutex lock; // std::mutex lock;
std::mutex outlock; std::mutex outlock;
bool readHangup = false; bool readHangup = false;

View File

@ -14,7 +14,6 @@ namespace core
Subscription::~Subscription() Subscription::~Subscription()
{ {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Subscription destructor....";
std::stringstream out; std::stringstream out;
out << "cancel:" << id << std::endl; out << "cancel:" << id << std::endl;
for (auto subscriber : subscribers) for (auto subscriber : subscribers)

View File

@ -7,7 +7,9 @@ namespace core {
SubscriptionManager::SubscriptionManager() {} SubscriptionManager::SubscriptionManager() {}
int SubscriptionManager::add(Subscription &subscription) { int SubscriptionManager::add(Subscription &subscription) {
lock.lock();
subscriptions.insert(std::make_pair(subscription.id, &subscription)); subscriptions.insert(std::make_pair(subscription.id, &subscription));
lock.unlock();
return 1; return 1;
} }
@ -15,6 +17,7 @@ namespace core {
int countSubscribed = 0; int countSubscribed = 0;
int countPublished = 0; int countPublished = 0;
lock.lock();
std::string temp = ""; std::string temp = "";
for(auto [key, subscription] : subscriptions) { for(auto [key, subscription] : subscriptions) {
if(temp != "") { if(temp != "") {
@ -34,6 +37,7 @@ namespace core {
} }
coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s).";
coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session.";
lock.unlock();
return countSubscribed; return countSubscribed;
} }

View File

@ -23,6 +23,7 @@ namespace core {
private: private:
std::map<std::string, Subscription *> subscriptions; std::map<std::string, Subscription *> subscriptions;
std::mutex lock;
}; };
} }

View File

@ -72,8 +72,10 @@ namespace core {
std::vector<TCPSession *>::iterator cursor; std::vector<TCPSession *>::iterator cursor;
lock.lock(); lock.lock();
for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor) for(cursor = sessions.begin(); cursor < sessions.end(); ++cursor)
if(*cursor == session) if(*cursor == session) {
sessions.erase(cursor); sessions.erase(cursor);
break;
}
lock.unlock(); lock.unlock();
} }

View File

@ -4,7 +4,7 @@ namespace core {
Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") { Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") {
setDescriptor(timerfd_create(CLOCK_REALTIME, 0)); setDescriptor(timerfd_create(CLOCK_REALTIME, 0));
ePoll.registerSocket(this); // ePoll.registerSocket(this);
setTimer(delay); setTimer(delay);
} }