#include "TCPServer.h"
#include "EPoll.h"
#include "TCPSession.h"
#include "Exception.h"
#include "Log.h"

#include <algorithm>
#include <mutex>

namespace core {

   /// Builds a listening TCP server socket.
   ///
   /// Registers the subscription commands, creates an AF_INET stream socket,
   /// enables SO_REUSEADDR on it, binds it to `address` and starts listening
   /// with a backlog of 20.
   ///
   /// @param ePoll     forwarded to the TCPSocket base class.
   /// @param address   local address the socket is bound to.
   /// @param delimiter forwarded to the command list.
   /// @param depth     forwarded to the command list.
   /// @param text      forwarded to the TCPSocket base class.
   /// @throws coreutils::Exception if bind() or listen() fails.
   TCPServer::TCPServer(EPoll &ePoll, IPAddress address, std::string delimiter, int depth, std::string text)
      : TCPSocket(ePoll, text), commands(delimiter, depth) {

      commands.add(subscriptions, "publish");
      commands.add(subscriptions, "unpublish");
      commands.add(subscriptions, "subscribe");
      commands.add(subscriptions, "unsubscribe");
      commands.add(subscriptions, "catalog");
      commands.add(subscriptions, "event");

      setDescriptor(socket(AF_INET, SOCK_STREAM, 0));

      int yes = 1;
      setsockopt(getDescriptor(), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

      if(bind(getDescriptor(), address.getPointer(), address.addressLength) < 0)
         throw coreutils::Exception("Error on bind to socket: " + std::to_string(errno));

      if(listen(getDescriptor(), 20) < 0)
         throw coreutils::Exception("Error on listen to socket");
   }

   /// Logs the shutdown and closes the server descriptor.
   TCPServer::~TCPServer() {
      coreutils::Log(coreutils::LOG_DEBUG_2) << "Closing server socket " << getDescriptor() << ".";
      close(getDescriptor());
   }

   /// Accepts one pending connection and records the resulting session.
   ///
   /// The `data` argument is not used. The session list is modified under
   /// `lock`; a scoped guard is used so the lock is released even if
   /// push_back throws.
   void TCPServer::onDataReceived(std::string data) {
      std::lock_guard<decltype(lock)> guard(lock);
      TCPSession *session = accept();
      if(session)
         sessions.push_back(session);
   }

   /// Creates a session through getSocketAccept() and gives it the descriptor
   /// returned by ::accept() on the server socket.
   ///
   /// @return the new session, or nullptr if an exception was raised while
   ///         setting it up (the exception is logged, not propagated).
   TCPSession * TCPServer::accept() {

      try {

         TCPSession *session = getSocketAccept(ePoll);
         // NOTE(review): the result of ::accept() is passed straight to
         // setDescriptor() without checking for -1 — confirm how
         // setDescriptor() treats a failed accept.
         session->setDescriptor(::accept(getDescriptor(), (struct sockaddr *)&session->ipAddress.addr, &session->ipAddress.addressLength));

         // if(blackList && blackList->contains(session->ipAddress.getClientAddress())) {
         //    session->shutdown();
         //    Log(LOG_WARN) << "Client at IP address " << session->ipAddress.getClientAddress() << " is blacklisted and was denied a connection.";
         //    return NULL;
         // }
         // if(whiteList && !whiteList->contains(session->ipAddress.getClientAddress())) {
         //    session->shutdown();
         //    Log(LOG_WARN) << "Client at IP address " << session->ipAddress.getClientAddress() << " is not authorized and was denied a connection.";
         //    return NULL;
         // }

         return session;
      }
      catch(const coreutils::Exception &e) {
         coreutils::Log(coreutils::LOG_EXCEPT) << "Major error on session initialization. Error is '" << e.text << "'.";
      }
      catch(...) {
         coreutils::Log(coreutils::LOG_EXCEPT) << "Unnspecified error on session initialization.";
      }
      return nullptr;
   }

   /// Removes the first entry equal to `session` from the session list, if
   /// any. The session object itself is not destroyed here.
   void TCPServer::removeFromSessionList(TCPSession *session) {
      std::lock_guard<decltype(lock)> guard(lock);
      auto position = std::find(sessions.begin(), sessions.end(), session);
      if(position != sessions.end())
         sessions.erase(position);
   }

   /// Converts a session error into a coreutils::Exception. `out` is unused.
   ///
   /// @throws coreutils::Exception always, carrying `errorString`.
   void TCPServer::sessionErrorHandler(std::string errorString, std::stringstream &out) {
      throw coreutils::Exception(errorString);
   }

   /// Factory for the session object used by accept(). Returns a
   /// heap-allocated TCPSession bound to this server.
   TCPSession * TCPServer::getSocketAccept(EPoll &ePoll) {
      return new TCPSession(ePoll, *this);
   }

   /// Writes a one-line usage hint to `out`.
   void TCPServer::output(std::stringstream &out) {
      out << "Use the 'help' command to list the commands for this server." << std::endl;
   }

   /// Lists every known session on the requesting session's output stream,
   /// one per line, as "|<sequence><session output>|". `request` is unused.
   ///
   /// @return always 1.
   int TCPServer::processCommand(coreutils::ZString &request, TCPSession &session) {
      int sequence = 0;
      for(auto *entry : sessions) {
         session.out << "|" << ++sequence;
         entry->output(session.out);
         session.out << "|" << std::endl;
      }
      return 1;
   }

   /// Writes the contents of `data` to every session, then empties `data`.
   void TCPServer::sendToAll(std::stringstream &data) {
      // Take the buffer once instead of copying it out for every session.
      const std::string payload = data.str();
      for(auto *session : sessions)
         session->write(payload, nullptr);
      data.str("");
   }

   /// Writes the contents of `data` to every session except `sender`, then
   /// empties `data`.
   void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender) {
      const std::string payload = data.str();
      for(auto *session : sessions)
         if(session != &sender)
            session->write(payload, &sender);
      data.str("");
   }

   /// Writes the contents of `data` to every session that passes `filter`
   /// and is not `sender`, then empties `data`. The filter is evaluated
   /// before the sender comparison, for every session.
   void TCPServer::sendToAll(std::stringstream &data, TCPSession &sender, SessionFilter filter) {
      const std::string payload = data.str();
      for(auto *session : sessions)
         if(filter.test(*session))
            if(session != &sender)
               session->write(payload, &sender);
      data.str("");
   }

}