#include "EPoll.h" #include "Socket.h" #include "Exception.h" #include "Log.h" namespace core { Socket::Socket(EPoll &ePoll, std::string text) : ePoll(ePoll), text(text) { coreutils::Log(coreutils::LOG_DEBUG_2) << "BMASocket object created [" << text << "]."; buffer = (char *)malloc(4096); length = 4096; } Socket::~Socket() { free(buffer); if(descriptor == -1) return; onUnregister(); ePoll.unregisterSocket(this); coreutils::Log(coreutils::LOG_DEBUG_3) << "Socket destroyed for socket " << descriptor << "."; close(descriptor); } void Socket::setDescriptor(int descriptor) { if((descriptor == -1) && (errno == 24)) { shutdown("Too many files open"); coreutils::Exception("Too many files open. Refusing connection.");; } lock.lock(); coreutils::Log(coreutils::LOG_DEBUG_3) << "Descriptor set to " << descriptor << " for Socket."; if(descriptor < 3) throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__); this->descriptor = descriptor; onRegister(); ePoll.registerSocket(this); onRegistered(); lock.unlock(); } int Socket::getDescriptor() { return descriptor; } void Socket::setBufferSize(int length) { this->length = length; buffer = (char *)realloc(buffer, length); } int Socket::getBufferSize() { return length; } void Socket::onRegister() {} void Socket::onRegistered() {} void Socket::onUnregister() {} void Socket::onUnregistered() {} bool Socket::eventReceived(struct epoll_event event) { lock.lock(); if(event.events & EPOLLRDHUP) { readHangup = true; shutdown("hangup received"); lock.unlock(); return false; } if(event.events & EPOLLIN) { receiveData(buffer); } if(event.events & EPOLLWRNORM) { writeSocket(); } if(event.events & EPOLLHUP) { shutdown(); lock.unlock(); return false; } lock.unlock(); reset = true; return reset; } void Socket::onDataReceived(std::string data) { throw coreutils::Exception("Need to override onDataReceived.", __FILE__, __LINE__, -1); } void Socket::onDataReceived(coreutils::ZString data) { onDataReceived(std::string(data.getData(), data.getLength())); } void Socket::receiveData(coreutils::ZString buffer) { if(buffer.getLength() <= 0) throw coreutils::Exception("Request to receive data with a zero buffer length.", __FILE__, __LINE__, -1); int len; int error = -1; if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { onDataReceived(buffer); } else { error = errno; switch (error) { // When a listening socket receives a connection // request we get one of these. // case ENOTCONN: onDataReceived(std::string(buffer.getData(), 0)); break; case ECONNRESET: break; default: throw coreutils::Exception("Error in read of data from socket.", __FILE__, __LINE__, error); } } } void Socket::writeSocket() { if(fifo.size() > 0) { outlock.lock(); coreutils::Log(coreutils::LOG_DEBUG_3) << "Writing data to socket " << getDescriptor() << " [" << fifo.front() << "]."; ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); if(shutDown && !needsToWrite()) delete this; outlock.unlock(); } } int Socket::write(std::string data) { coreutils::Log(coreutils::LOG_DEBUG_3) << "Writing data to socket " << getDescriptor() << " buffer [" << data << "]."; outlock.lock(); fifo.emplace(data); coreutils::Log(coreutils::LOG_DEBUG_4) << "Enabling write on socket " << getDescriptor() << " with " << fifo.size() << " entries to write."; outlock.unlock(); ePoll.resetSocket(this); return 1; } void Socket::output(std::stringstream &out) { out << "|" << descriptor << "|"; } bool Socket::needsToWrite() { coreutils::Log(coreutils::LOG_DEBUG_4) << "Socket " << getDescriptor() << " needs to write is " << (fifo.size() > 0) << "."; return fifo.size() > 0; } void Socket::shutdown(std::string text) { coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; shutDown = true; reset = false; if(!needsToWrite()) { delete this; } } }