#include "EPoll.h"
#include "Socket.h"
#include "Exception.h"
#include "Log.h"

namespace core {

   ///
   /// Construct a socket bound to the given EPoll instance and allocate the
   /// initial 4096 byte receive buffer.
   ///
   /// @param ePoll the EPoll object this socket registers itself with.
   /// @param text  a descriptive label, used here only for logging.
   ///
   /// Throws coreutils::Exception if the receive buffer cannot be allocated.
   ///

   Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) {
      coreutils::Log(coreutils::LOG_DEBUG_2) << "BMASocket object created [" << text << "].";
      buffer = static_cast<char *>(malloc(4096));
      if(buffer == nullptr)
         throw coreutils::Exception("Unable to allocate receive buffer.", __FILE__, __LINE__);
      length = 4096;
   }

   ///
   /// Release the receive buffer and, if a descriptor was assigned,
   /// unregister from the EPoll instance and close the descriptor.
   ///
   /// NOTE(review): onUnregister() is called from the destructor; if it is
   /// declared virtual, overrides in derived classes will not run here.
   /// onUnregistered() is not invoked anywhere in this file.
   ///

   Socket::~Socket() {
      free(buffer);
      if(descriptor == -1)
         return;
      onUnregister();
      ePoll.unregisterSocket(this);
      coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << ".";
      close(descriptor);
   }

   ///
   /// Assign the file descriptor for this socket and register it with EPoll.
   ///
   /// @param descriptor the descriptor to adopt; must be 3 or greater.
   ///
   /// Throws coreutils::Exception when the descriptor is -1 with errno set to
   /// EMFILE (in which case the socket has already been shut down and
   /// deleted), or when the descriptor is less than 3.
   ///

   void Socket::setDescriptor(int descriptor) {
      if((descriptor == -1) && (errno == EMFILE)) {
         // shutdown() deletes this object, so nothing below may touch members;
         // the exception must actually be thrown to leave the method here.
         shutdown("Too many files open");
         throw coreutils::Exception("Too many files open. Refusing connection.");
      }
      coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket.";

      // Validate before taking the lock so that the throw cannot leave it held.
      if(descriptor < 3)
         throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__);

      lock.lock();
      this->descriptor = descriptor;
      onRegister();
      ePoll.registerSocket(this);
      onRegistered();
      lock.unlock();
   }

   /// Return the descriptor currently assigned to this socket.

   int Socket::getDescriptor() {
      return descriptor;
   }

   ///
   /// Resize the receive buffer.
   ///
   /// @param length the new buffer size in bytes; must be greater than zero.
   ///
   /// Throws coreutils::Exception if the size is not positive or if the
   /// reallocation fails. On failure the existing buffer and length are
   /// left untouched.
   ///

   void Socket::setBufferSize(int length) {
      if(length <= 0)
         throw coreutils::Exception("Buffer size must be greater than zero.", __FILE__, __LINE__);
      char *resized = static_cast<char *>(realloc(buffer, length));
      if(resized == nullptr)
         throw coreutils::Exception("Unable to resize receive buffer.", __FILE__, __LINE__);
      buffer = resized;
      this->length = length;
   }

   /// Return the current size of the receive buffer in bytes.

   int Socket::getBufferSize() {
      return length;
   }

   /// Hook called by setDescriptor() before the socket is registered with EPoll.

   void Socket::onRegister() {}

   /// Hook called by setDescriptor() after the socket is registered with EPoll.

   void Socket::onRegistered() {}

   /// Hook called by the destructor before the socket is unregistered from EPoll.

   void Socket::onUnregister() {}

   /// Hook with an empty default body; not called from within this file.

   void Socket::onUnregistered() {}

   ///
   /// Dispatch a single epoll event for this socket.
   ///
   /// @param event    the epoll event reported for this descriptor.
   /// @param threadId identifier of the calling thread (unused here).
   ///
   /// @return false when the event caused the socket to be shut down (and
   /// therefore deleted); otherwise the value of the active flag.
   ///

   bool Socket::eventReceived(struct epoll_event event, int threadId) {
      coreutils::Log(coreutils::LOG_DEBUG_1) << "eventReceived on " << descriptor << "; shutDown = " << shutDown << "; active = " << active << ";";

      lock.lock();

      if(event.events & EPOLLRDHUP) {
         coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLRDHUP " << descriptor;
         readHangup = true;
         // shutdown() deletes this object, so the lock has to be released
         // first; no member may be touched afterwards.
         lock.unlock();
         shutdown("hangup received");
         return false;
      }

      try {
         if(event.events & EPOLLIN) {
            coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLIN " << descriptor;
            receiveData(buffer, length);
         }

         if(event.events & EPOLLWRNORM) {
            coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLOUT " << descriptor;
            writeSocket();
         }
      }
      catch(...) {
         // receiveData() can throw; do not leave the lock held.
         lock.unlock();
         throw;
      }

      if(event.events & EPOLLHUP) {
         coreutils::Log(coreutils::LOG_DEBUG_1) << "start EPOLLHUP " << descriptor;
         coreutils::Log(coreutils::LOG_DEBUG_1) << "end shutting down" << descriptor;
         // Same ordering constraint as above: unlock, then self-delete.
         lock.unlock();
         shutdown();
         return false;
      }

      coreutils::Log(coreutils::LOG_DEBUG_1) << "end with active = " << active << " on socket " << descriptor;
      const bool stillActive = active;
      lock.unlock();
      return stillActive;
   }

   ///
   /// Default string based data handler. Always throws; see the
   /// (char *, int) overload, which forwards here unless overridden.
   ///

   void Socket::onDataReceived(std::string data) {
      throw coreutils::Exception("Need to override onDataReceived.", __FILE__, __LINE__, -1);
   }

   /// Buffer based data handler; forwards to the std::string overload.

   void Socket::onDataReceived(char *buffer, int len) {
      onDataReceived(std::string(buffer, len));
   }

   ///
   /// Read once from the descriptor into the supplied buffer and hand the
   /// bytes read (possibly zero) to onDataReceived().
   ///
   /// @param buffer       destination for the data read.
   /// @param bufferLength capacity of the destination in bytes.
   ///
   /// Throws coreutils::Exception if bufferLength is not positive or if the
   /// read fails with anything other than ENOTCONN or ECONNRESET.
   ///

   void Socket::receiveData(char *buffer, int bufferLength) {
      if(bufferLength <= 0)
         throw coreutils::Exception("Request to receive data with a zero buffer length.", __FILE__, __LINE__, -1);

      const ssize_t len = ::read(getDescriptor(), buffer, bufferLength);
      if(len >= 0) {
         onDataReceived(buffer, static_cast<int>(len));
         return;
      }

      const int error = errno;
      switch(error) {

         // When a listening socket receives a connection
         // request we get one of these.
         //
         case ENOTCONN:
            onDataReceived(std::string(buffer, 0));
            break;

         case ECONNRESET:
            break;

         default:
            throw coreutils::Exception("Error in read of data from socket.", __FILE__, __LINE__, error);
      }
   }

   ///
   /// Attempt to send the message at the head of the output queue.
   ///
   /// A fully written message is removed from the queue. If only part of it
   /// was written, or the write would block or was interrupted, the unsent
   /// bytes stay at the head of the queue for the next attempt. Any other
   /// write error drops the message, as before.
   ///

   void Socket::writeSocket() {
      if(shutDown)
         return;

      outlock.lock();
      // The queue is inspected under outlock because write() modifies it
      // under the same lock.
      if(fifo.size() > 0) {
         auto &pending = fifo.front();
         const ssize_t written = ::write(descriptor, pending.c_str(), pending.length());
         const int error = errno;

         if(written >= 0) {
            if(static_cast<size_t>(written) < pending.length())
               pending.erase(0, written);   // keep the unsent tail
            else
               fifo.pop();
         }
         else if((error != EAGAIN) && (error != EWOULDBLOCK) && (error != EINTR)) {
            fifo.pop();
         }

         coreutils::Log(coreutils::LOG_DEBUG_4) << "resetSocket from writeSocket.";
         if(active)
            ePoll.resetSocket(this);
      }
      outlock.unlock();
   }

   ///
   /// Queue data for output and ask EPoll to reset this socket.
   ///
   /// @param data the bytes to queue.
   /// @return -1 if the socket is not active, otherwise 1.
   ///

   int Socket::write(std::string data) {
      if(!active)
         return -1;

      outlock.lock();
      fifo.emplace(data);
      coreutils::Log(coreutils::LOG_DEBUG_4) << "resetSocket from write. active is " << active;
      if(active)
         ePoll.resetSocket(this);
      outlock.unlock();
      return 1;
   }

   /// Write the descriptor, delimited by '|' characters, to the stream.

   void Socket::output(std::stringstream &out) {
      out << "|" << descriptor << "|";
   }

   /// Return true when there is queued output waiting to be written.

   bool Socket::needsToWrite() {
      return fifo.size() > 0;
   }

   ///
   /// Mark the socket as shut down and inactive, then delete it.
   ///
   /// WARNING: this method ends with `delete this`. The object must not be
   /// used in any way once the call returns.
   ///
   /// @param text the reason for the shutdown, used for logging.
   ///

   void Socket::shutdown(std::string text) {
      coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << ".";
      shutDown = true;
      active = false;
      delete this;
   }

}