A commit before I change everything on the the way events are handled.

This commit is contained in:
Brad Arant 2020-04-25 08:19:26 -07:00
parent 28292e924a
commit 796ddd511f
7 changed files with 65 additions and 41 deletions

View File

@ -80,7 +80,7 @@ namespace core {
return true; return true;
} }
bool EPoll::unregisterSocket(Socket *socket /**< The Socket to unregister. */) { bool EPoll::unregisterSocket(Socket *socket) {
lock.lock(); lock.lock();
disableSocket(socket); disableSocket(socket);
coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << "."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Unregistering socket " << socket->getDescriptor() << ".";
@ -96,9 +96,12 @@ namespace core {
lock.lock(); lock.lock();
std::map<int, Socket *>::iterator socket = sockets.find(event.data.fd); std::map<int, Socket *>::iterator socket = sockets.find(event.data.fd);
lock.unlock(); lock.unlock();
Socket *socket1 = (Socket *)event.data.ptr;
coreutils::Log(coreutils::LOG_DEBUG_3) << "Sockets: " << socket->second << ":" << socket1 << ".";
if(socket != sockets.end()) { if(socket != sockets.end()) {
if(socket->second->eventReceived(event, threadId)) socket->second->reset = true;
resetSocket(socket->second); socket->second->eventReceived(event, threadId);
// resetSocket(socket->second);
} }
else else
throw coreutils::Exception("Reference to socket that has no object."); throw coreutils::Exception("Reference to socket that has no object.");
@ -121,29 +124,38 @@ namespace core {
void EPoll::enableSocket(Socket *socket) { void EPoll::enableSocket(Socket *socket) {
struct epoll_event event; struct epoll_event event;
event.data.fd = socket->getDescriptor(); event.data.fd = socket->getDescriptor();
event.data.ptr = socket;
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
if(socket->needsToWrite()) if(socket->needsToWrite())
event.events |= EPOLLWRNORM; event.events |= EPOLLWRNORM;
epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event); epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event);
socket->active = true; socket->reset = false;
coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << socket->getDescriptor() << " for events."; coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling socket " << socket->getDescriptor() << " for events.";
} }
void EPoll::disableSocket(Socket *socket) { void EPoll::disableSocket(Socket *socket) {
epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL);
socket->active = false; socket->reset = false;
coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << socket->getDescriptor() << " from events."; coreutils::Log(coreutils::LOG_DEBUG_4) << "Disabling socket " << socket->getDescriptor() << " from events.";
} }
void EPoll::resetSocket(Socket *socket) { void EPoll::resetRead(Socket *socket) {
coreutils::Log(coreutils::LOG_DEBUG_4) << "ResetSocket " << socket;
struct epoll_event event; struct epoll_event event;
event.data.fd = socket->getDescriptor(); event.data.fd = socket->getDescriptor();
event.data.ptr = socket;
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
if(socket->needsToWrite())
event.events |= EPOLLWRNORM;
epoll_ctl(epfd, EPOLL_CTL_MOD, event.data.fd, &event); epoll_ctl(epfd, EPOLL_CTL_MOD, event.data.fd, &event);
coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for events."; socket->reset = false;
coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for read.";
}
void EPoll::resetWrite(Socket *socket) {
struct epoll_event event;
event.data.fd = socket->getDescriptor();
event.data.ptr = socket;
event.events = EPOLLWRNORM | EPOLLONESHOT | EPOLLRDHUP | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, event.data.fd, &event);
coreutils::Log(coreutils::LOG_DEBUG_4) << "Resetting socket " << socket->getDescriptor() << " for write.";
} }
} }

View File

@ -112,7 +112,8 @@ namespace core {
int processCommand(std::string command, TCPSession *session, std::stringstream &data) override; ///<Output the threads array to the console. int processCommand(std::string command, TCPSession *session, std::stringstream &data) override; ///<Output the threads array to the console.
void resetSocket(Socket *socket); void resetRead(Socket *socket);
void resetWrite(Socket *socket);
private: private:

View File

@ -60,21 +60,24 @@ namespace core {
bool Socket::eventReceived(struct epoll_event event, int threadId) { bool Socket::eventReceived(struct epoll_event event, int threadId) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "eventReceived on " << descriptor << "; shutDown = " << shutDown << "; active = " << active << ";"; // coreutils::Log(coreutils::LOG_DEBUG_2) << "waiting for lock on socket " << descriptor << ".";
lock.lock(); // lock.lock();
// coreutils::Log(coreutils::LOG_DEBUG_2) << "Obtained lock on socket " << descriptor << ".";
if(event.events & EPOLLRDHUP) { if(event.events & EPOLLRDHUP) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLRDHUP " << descriptor; coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLRDHUP " << descriptor;
readHangup = true; readHangup = true;
shutdown("hangup received"); shutdown("hangup received");
lock.unlock(); // lock.unlock();
// coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". RDHUP";
return false; return false;
} }
if(event.events & EPOLLIN) { if(event.events & EPOLLIN) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLIN " << descriptor; coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLIN " << descriptor;
receiveData(buffer, length); receiveData(buffer, length);
ePoll.resetRead(this);
} }
if(event.events & EPOLLWRNORM) { if(event.events & EPOLLWRNORM) {
@ -84,16 +87,16 @@ namespace core {
if(event.events & EPOLLHUP) { if(event.events & EPOLLHUP) {
coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLHUP " << descriptor; coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLHUP " << descriptor;
coreutils::Log(coreutils::LOG_DEBUG_1) << "end shutting down" << descriptor;
shutdown(); shutdown();
lock.unlock(); // lock.unlock();
// coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << ". HUP";
return false; return false;
} }
coreutils::Log(coreutils::LOG_DEBUG_1) << "end with active = " << active << " on socket " << descriptor; // lock.unlock();
lock.unlock(); // coreutils::Log(coreutils::LOG_DEBUG_2) << "Release lock on socket " << descriptor << " with reset " << reset << ".";
return active; return reset;
} }
void Socket::onDataReceived(std::string data) { void Socket::onDataReceived(std::string data) {
@ -113,7 +116,6 @@ namespace core {
int error = -1; int error = -1;
if((len = ::read(getDescriptor(), buffer, bufferLength)) >= 0) { if((len = ::read(getDescriptor(), buffer, bufferLength)) >= 0) {
// coreutils::Log(coreutils::LOG_DEBUG_4) << "data[" << std::string(buffer, bufferLength) << "]";
onDataReceived(buffer, len); onDataReceived(buffer, len);
} }
else { else {
@ -144,10 +146,8 @@ namespace core {
coreutils::Log(coreutils::LOG_DEBUG_3) << "Writing data to socket " << getDescriptor() << " [" << fifo.front() << "]."; coreutils::Log(coreutils::LOG_DEBUG_3) << "Writing data to socket " << getDescriptor() << " [" << fifo.front() << "].";
::write(descriptor, fifo.front().c_str(), fifo.front().length()); ::write(descriptor, fifo.front().c_str(), fifo.front().length());
fifo.pop(); fifo.pop();
coreutils::Log(coreutils::LOG_DEBUG_4) << "resetSocket from writeSocket."; // if(shutDown && !needsToWrite())
ePoll.resetSocket(this); // delete this;
if(shutDown && !needsToWrite())
delete this;
outlock.unlock(); outlock.unlock();
} }
} }
@ -157,8 +157,8 @@ namespace core {
outlock.lock(); outlock.lock();
fifo.emplace(data); fifo.emplace(data);
coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling write on socket " << getDescriptor() << " with " << fifo.size() << " entries to write."; coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling write on socket " << getDescriptor() << " with " << fifo.size() << " entries to write.";
ePoll.resetSocket(this);
outlock.unlock(); outlock.unlock();
ePoll.resetWrite(this);
return 1; return 1;
} }
@ -174,9 +174,8 @@ namespace core {
void Socket::shutdown(std::string text) { void Socket::shutdown(std::string text) {
coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << ".";
shutDown = true; shutDown = true;
active = false; reset = false;
if(!needsToWrite()) { if(!needsToWrite()) {
active = false;
delete this; delete this;
} }

View File

@ -110,7 +110,7 @@ namespace core {
bool needsToWrite(); bool needsToWrite();
bool active = false; bool reset = false;
protected: protected:

View File

@ -26,6 +26,8 @@ namespace core {
onConnected(); onConnected();
protocol(); protocol();
send(); send();
if(term)
shutdown("termination requested");
} }
void TCPSession::onConnected() {} void TCPSession::onConnected() {}
@ -47,9 +49,7 @@ namespace core {
lineBufferSize -= lineLength; lineBufferSize -= lineLength;
if(lineBufferSize > 0) if(lineBufferSize > 0)
memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize); memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize);
coreutils::Log(coreutils::LOG_DEBUG_3) << "lineBufferSize=" << lineBufferSize << "; lineLength=" << lineLength << ";";
lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); lineBuffer = (char *)realloc(lineBuffer, lineBufferSize);
// coreutils::Log(coreutils::LOG_DEBUG_3) << "lineBuffer=" << std::string(lineBuffer, lineBufferSize);
} }
} }
} }
@ -58,6 +58,8 @@ namespace core {
coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << line << "]"; coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << line << "]";
protocol(line); protocol(line);
send(); send();
if(term)
shutdown("termination requested");
} }
void TCPSession::sendToAll() { void TCPSession::sendToAll() {
@ -76,8 +78,13 @@ namespace core {
} }
void TCPSession::send() { void TCPSession::send() {
if(out.tellp() > 0)
write(out.str()); write(out.str());
out.str(""); out.str("");
} }
void TCPSession::terminate() {
term = true;
}
} }

View File

@ -58,6 +58,16 @@ namespace core {
void sendToAll(SessionFilter filter); void sendToAll(SessionFilter filter);
///
///
///
void terminate();
///
///
///
TCPServer &server; TCPServer &server;
protected: protected:
@ -106,6 +116,7 @@ namespace core {
char *lineBuffer = NULL; char *lineBuffer = NULL;
int lineBufferSize = 0; int lineBufferSize = 0;
std::mutex mtx; std::mutex mtx;
bool term = false;
}; };

View File

@ -60,17 +60,11 @@ namespace core {
} else if(rc > 0) { } else if(rc > 0) {
for(int ix = 0; ix < rc; ++ix) { for(int ix = 0; ix < rc; ++ix) {
++count; ++count;
// std::cout << "Event " << events[ix].events << " on socket " << events[ix].data.fd << " on thread " << getThreadId() << ": ";
// std::cout << ((events[ix].events & EPOLLIN) ? "EPOLLIN ": "");
// std::cout << ((events[ix].events & EPOLLWRNORM) ? "EPOLLWRNORM ": "");
// std::cout << ((events[ix].events & EPOLLRDHUP) ? "EPOLLRDHUP ": "");
// std::cout << ((events[ix].events & EPOLLHUP) ? "EPOLLHUP ": "");
// std::cout << "." << std::endl;
ePoll.eventReceived(events[ix], threadId); ePoll.eventReceived(events[ix], threadId);
} }
} }
} }
coreutils::Log(coreutils::LOG_DEBUG_1) << "Thread ending with thread id " << threadId << ".";
} }
} }