diff --git a/CommandList.cpp b/CommandList.cpp index a999c72..919836e 100644 --- a/CommandList.cpp +++ b/CommandList.cpp @@ -2,18 +2,19 @@ #include "Log.h" namespace core { - + CommandList::CommandList(std::string delimiter, int depth) : delimiter(delimiter), depth(depth) {} - + void CommandList::add(Command &command, std::string name) { commands.insert(std::make_pair(name, &command)); } - + void CommandList::remove(Command &command) {} - + int CommandList::processRequest(coreutils::ZString &request, TCPSession &session) { - if(session.grab != NULL) - return session.grab->processCommand(request, session); + if(session.grab != NULL) { + return session.grab->processCommand(request, session); + } else { if(request.equals("")) return 0; @@ -29,20 +30,20 @@ namespace core { } return true; } - + bool CommandList::grabInput(TCPSession &session, Command &command) { session.grab = &command; return true; } - + void CommandList::clearGrab(TCPSession &session) { session.grab = NULL; } - + int CommandList::processCommand(coreutils::ZString &request, TCPSession &session) { // for(Command *command : commands) // session.out << command->getName() << std::endl; return true; } - + } diff --git a/EPoll.cpp b/EPoll.cpp index 9f669e9..9a6420a 100644 --- a/EPoll.cpp +++ b/EPoll.cpp @@ -96,19 +96,22 @@ namespace core { event.data.ptr = socket; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event); +// coreutils::Log(coreutils::LOG_DEBUG_4) << "BMAXenable: " << socket->getDescriptor(); } - + void EPoll::disableSocket(Socket *socket) { epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); +// coreutils::Log(coreutils::LOG_DEBUG_4) << "BMAXdisable: " << socket->getDescriptor(); } - + void EPoll::resetSocket(Socket *socket) { struct epoll_event event; event.data.ptr = socket; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; - if(socket->needsToWrite()) + if(socket->needsToWrite() && !socket->shutDown) event.events |= EPOLLWRNORM; epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event); +// coreutils::Log(coreutils::LOG_DEBUG_4) << "BMAXreset: " << socket->getDescriptor(); } - + } diff --git a/EPoll.h b/EPoll.h index dcd389b..3db2031 100644 --- a/EPoll.h +++ b/EPoll.h @@ -32,6 +32,8 @@ namespace core { public: + volatile long long eventId = 0; + /// /// The constructor for the BMAEPoll object. /// @@ -41,9 +43,9 @@ namespace core { /// /// The destructor for the BMAEPoll object. /// - + ~EPoll(); - + /// /// Use the start() method to initiate the threads and begin epoll queue processing. /// @@ -109,22 +111,22 @@ namespace core { /// /// @param session the session to write the requested data to. /// - + int processCommand(coreutils::ZString &request, TCPSession &session) override; /// threads; - volatile bool terminateThreads; + int numberOfThreads; + std::vector threads; + volatile bool terminateThreads; void enableSocket(Socket *socket); void disableSocket(Socket *socket); - + }; - + } #endif diff --git a/INotify.cpp b/INotify.cpp index da82e79..010435c 100644 --- a/INotify.cpp +++ b/INotify.cpp @@ -1,17 +1,18 @@ #include "INotify.h" #include "Log.h" +#include "ZString.h" namespace core { - + INotify::INotify(EPoll &ePoll) : Socket(ePoll, "INotify") { - setDescriptor(inotify_init()); - } - + setDescriptor(inotify_init()); + } + INotify::~INotify() { shutdown(); } - - int INotify::addWatch(std::string watch) { + + int INotify::addWatch(coreutils::ZString &watch) { return inotify_add_watch(getDescriptor(), watch.c_str(), IN_ALL_EVENTS); } @@ -22,37 +23,37 @@ namespace core { void INotify::onDataReceived(coreutils::ZString &buffer) { const struct inotify_event *event; char *ptr; - for (ptr = buffer.getData(); ptr < buffer.getData() + buffer.getLength(); + for (ptr = buffer.getData(); + ptr < buffer.getData() + buffer.getLength(); ptr += sizeof(struct inotify_event) + event->len) { event = (const struct inotify_event *) ptr; - - if(event->mask & IN_ACCESS) - inAccess(std::string(event->name)); - if(event->mask & IN_ATTRIB) + coreutils::ZString name(event->name); + + if(event->mask & IN_ACCESS) + inAccess(name); + if(event->mask & IN_ATTRIB) inAttrib(std::string(event->name)); - if(event->mask & IN_CLOSE_WRITE) + if(event->mask & IN_CLOSE_WRITE) inCloseWrite(std::string(event->name)); - if(event->mask & IN_CLOSE_NOWRITE) + if(event->mask & IN_CLOSE_NOWRITE) inCloseNoWrite(std::string(event->name)); - if(event->mask & IN_CREATE) - inCreate(std::string(event->name)); - if(event->mask & IN_DELETE) + if(event->mask & IN_CREATE) + inCreate(name); + if(event->mask & IN_DELETE) inDelete(std::string(event->name)); - if(event->mask & IN_DELETE_SELF) + if(event->mask & IN_DELETE_SELF) inDeleteSelf(std::string(event->name)); - if(event->mask & IN_MODIFY) + if(event->mask & IN_MODIFY) inModify(std::string(event->name)); - if(event->mask & IN_MOVE_SELF) + if(event->mask & IN_MOVE_SELF) inMoveSelf(std::string(event->name)); - if(event->mask & IN_MOVED_FROM) + if(event->mask & IN_MOVED_FROM) inMovedFrom(std::string(event->name)); - if(event->mask & IN_MOVED_TO) + if(event->mask & IN_MOVED_TO) inMovedTo(std::string(event->name)); - if(event->mask & IN_OPEN) + if(event->mask & IN_OPEN) inOpen(std::string(event->name)); - - } + } } - - + } diff --git a/INotify.h b/INotify.h index b63abf9..4961710 100644 --- a/INotify.h +++ b/INotify.h @@ -5,33 +5,33 @@ #include "Socket.h" namespace core { - + class INotify : Socket { - + public: INotify(EPoll &ePoll); ~INotify(); - - int addWatch(std::string watch); + + int addWatch(coreutils::ZString &watch); void removeWatch(int wd); - void onDataReceived(coreutils::ZString &data) override; + void onDataReceived(coreutils::ZString &data) override; - virtual void inAccess(std::string name) {} - virtual void inAttrib(std::string name) {} + virtual void inAccess(coreutils::ZString name) {} + virtual void inAttrib(std::string name) {} virtual void inCloseWrite(std::string name) {} virtual void inCloseNoWrite(std::string name) {} - virtual void inCreate(std::string name) {} - virtual void inDelete(std::string name) {} - virtual void inDeleteSelf(std::string name) {} - virtual void inModify(std::string name) {} - virtual void inMoveSelf(std::string name) {} - virtual void inMovedFrom(std::string name) {} - virtual void inMovedTo(std::string name) {} - virtual void inOpen(std::string name) {} - + virtual void inCreate(coreutils::ZString &name) {} + virtual void inDelete(std::string name) {} + virtual void inDeleteSelf(std::string name) {} + virtual void inModify(std::string name) {} + virtual void inMoveSelf(std::string name) {} + virtual void inMovedFrom(std::string name) {} + virtual void inMovedTo(std::string name) {} + virtual void inOpen(std::string name) {} + }; - + } #endif diff --git a/IPAddress.cpp b/IPAddress.cpp index c4f5e69..c1cb39b 100644 --- a/IPAddress.cpp +++ b/IPAddress.cpp @@ -1,7 +1,7 @@ #include "IPAddress.h" namespace core { - + IPAddress::IPAddress() { addressLength = sizeof(addr); } @@ -14,7 +14,7 @@ namespace core { convert >> port; IPAddress(url, port); } - + IPAddress::IPAddress(std::string address, int port) { memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; @@ -25,7 +25,7 @@ namespace core { } IPAddress::~IPAddress() { - + } struct sockaddr * IPAddress::getPointer() { @@ -36,16 +36,16 @@ namespace core { std::string result; return result; } - + std::string IPAddress::getClientAddressAndPort() { std::stringstream out; out << inet_ntoa(addr.sin_addr); out << ":" << addr.sin_port; return out.str(); } - + int IPAddress::getClientPort() { - int result = -1; + int result = -1; return result; } diff --git a/IPAddress.h b/IPAddress.h index 4beca82..d1029f7 100644 --- a/IPAddress.h +++ b/IPAddress.h @@ -7,20 +7,20 @@ namespace core { class IPAddress : public Object { - + public: IPAddress(); IPAddress(std::string address); IPAddress(std::string address, int port); ~IPAddress(); - - struct sockaddr_in addr; + + struct sockaddr_in addr; socklen_t addressLength; struct sockaddr * getPointer(); std::string getClientAddress(); ///descriptor = descriptor; @@ -45,7 +48,6 @@ namespace core { void Socket::setBufferSize(int length) { this->length = length; buffer = (char *)realloc(buffer, length); - } int Socket::getBufferSize() { @@ -60,20 +62,21 @@ namespace core { void Socket::onUnregistered() {} - bool Socket::eventReceived(struct epoll_event event) { + bool Socket::eventReceived(struct epoll_event event, long long eventId) { +// coreutils::Log(coreutils::LOG_DEBUG_3) << "eventReceived " << eventId << ": " << descriptor << ":" << event.events; inHandler = true; if(event.events & EPOLLRDHUP) { readHangup = true; shutdown("hangup received"); } - else if(event.events & EPOLLIN) { + if(event.events & EPOLLIN) { coreutils::ZString zbuffer(buffer, length); receiveData(zbuffer); } - else if(event.events & EPOLLWRNORM) { + if(event.events & EPOLLWRNORM) { writeSocket(); } - else if(event.events & EPOLLHUP) { + if(event.events & EPOLLHUP) { shutdown(); } inHandler = false; @@ -89,15 +92,11 @@ namespace core { } void Socket::receiveData(coreutils::ZString &buffer) { - coreutils::ZString blank(""); - if(buffer.getLength() <= 0) throw coreutils::Exception("Request to receive data with a zero buffer length.", __FILE__, __LINE__, -1); - int len; int error = -1; - if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { coreutils::ZString zbuffer(buffer.getData(), len); coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; @@ -129,7 +128,7 @@ namespace core { if(fifo.size() > 0) { outlock.lock(); if(!shutDown) - ::write(descriptor, fifo.front().c_str(), fifo.front().length()); + int rc = ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); outlock.unlock(); } @@ -139,8 +138,9 @@ namespace core { outlock.lock(); fifo.emplace(data); outlock.unlock(); - if(!inHandler) - ePoll.resetSocket(this); +// coreutils::Log(coreutils::LOG_DEBUG_4) << "inHandler " << descriptor << " " << inHandler << ":" << shutDown << ":[" << data << "]"; + if(!inHandler) + ePoll.resetSocket(this); return 1; } @@ -153,7 +153,7 @@ namespace core { } void Socket::shutdown(std::string text) { - coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested on socket " << descriptor << " with reason " << text << "."; + coreutils::Log(coreutils::LOG_DEBUG_2) << "Shutdown requested from " << this->text << " (" << descriptor << ") with reason: " << text << "."; shutDown = true; reset = false; } diff --git a/Socket.h b/Socket.h index d0049c0..bf70108 100644 --- a/Socket.h +++ b/Socket.h @@ -60,39 +60,39 @@ namespace core { /// /// setDescriptor establishes the file descriptor for the socket and registers the socket - /// on the EPoll controller. setDescriptor will invoke the onRegister() event. + /// on the EPoll controller. setDescriptor will invoke the onRegister() event. /// - + void setDescriptor(int descriptor); /// fifo; - + }; } diff --git a/SubscriptionManager.cpp b/SubscriptionManager.cpp index 44c2468..48fb233 100644 --- a/SubscriptionManager.cpp +++ b/SubscriptionManager.cpp @@ -5,19 +5,19 @@ namespace core { SubscriptionManager::SubscriptionManager() {} - + int SubscriptionManager::add(Subscription &subscription) { lock.lock(); subscriptions.insert(std::make_pair(subscription.id, &subscription)); lock.unlock(); return 1; } - + int SubscriptionManager::removeSessionSubscriptions(TCPSession &session) { int countSubscribed = 0; int countPublished = 0; - lock.lock(); + lock.lock(); std::string temp = ""; for(auto [key, subscription] : subscriptions) { if(temp != "") { @@ -29,14 +29,14 @@ namespace core { temp = key; delete subscription; ++countPublished; - } + } } if(temp != "") { subscriptions.erase(temp); temp = ""; } - coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; - coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; +// coreutils::Log(coreutils::LOG_DEBUG_2) << "Removed session from " << countSubscribed << " subscription(s)."; +// coreutils::Log(coreutils::LOG_DEBUG_2) << "Cancelled " << countPublished << " channel(s) for session."; lock.unlock(); return countSubscribed; } diff --git a/TCPSession.cpp b/TCPSession.cpp index 67c25ae..d9fe0db 100644 --- a/TCPSession.cpp +++ b/TCPSession.cpp @@ -6,35 +6,32 @@ namespace core { TCPSession::TCPSession(EPoll &ePoll, TCPServer &server, std::string text) : TCPSocket(ePoll, text), server(server) {} - + TCPSession::~TCPSession() { server.removeFromSessionList(this); server.subscriptions.removeSessionSubscriptions(*this); } - + void TCPSession::output(std::stringstream &data) { data << "|" << ipAddress.getClientAddressAndPort(); } - + void TCPSession::protocol(coreutils::ZString &data) { - if(data.getLength() != 0) { - if(server.commands.processRequest(data, *this) == 0) { - coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << data.str(); - } - } + if(server.commands.processRequest(data, *this) == 0) + coreutils::Log(coreutils::LOG_DEBUG_1) << "Received data could not be parsed: " << getDescriptor() << " [" << data.str() << "]"; } - + void TCPSession::onRegistered() { onConnected(); - coreutils::ZString blank(""); - protocol(blank); +// coreutils::ZString blank(""); +// protocol(blank); send(); if(term) shutdown("termination requested"); } - + void TCPSession::onConnected() {} - + void TCPSession::onDataReceived(coreutils::ZString &data) { if(data.getLength() > 0) { lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength()); @@ -66,32 +63,32 @@ namespace core { } } } - + void TCPSession::setBlockSize(int blockSize) { this->blockSize = blockSize; } - + void TCPSession::onLineReceived(coreutils::ZString &line) { protocol(line); send(); if(term) shutdown("termination requested"); } - + void TCPSession::onBlockReceived(coreutils::ZString &block) { coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << block.getLength() << "]"; if(term) shutdown("termination requested"); } - + void TCPSession::send() { if(out.tellp() > 0) write(out.str()); out.str(""); } - + void TCPSession::terminate() { term = true; } - + } diff --git a/TCPSession2.cpp b/TCPSession2.cpp new file mode 100644 index 0000000..af272aa --- /dev/null +++ b/TCPSession2.cpp @@ -0,0 +1,85 @@ +#include "TCPSession2.h" +#include "Exception.h" +#include "Log.h" + +namespace core { + + TCPSession2::TCPSession2(EPoll &ePoll, std::string text) : TCPSocket(ePoll, text) {} + + TCPSession2::~TCPSession2() {} + + void TCPSession2::output(std::stringstream &data) { + data << "|" << ipAddress.getClientAddressAndPort(); + } + + void TCPSession2::protocol(coreutils::ZString &data) {} + + void TCPSession2::onRegistered() { + onConnected(); + send(); + if(term) + TCPSocket::shutdown("termination requested"); + } + + void TCPSession2::onConnected() {} + + void TCPSession2::onDataReceived(coreutils::ZString &data) { + if(data.getLength() > 0) { + lineBuffer = (char *)realloc(lineBuffer, lineBufferSize + data.getLength()); + memcpy(lineBuffer + lineBufferSize, data.getData(), data.getLength()); + lineBufferSize += data.getLength(); + while(lineBufferSize > 0) { + if(blockSize == 0) { + lineLength = strcspn(lineBuffer, "\r\n"); + if(lineLength == lineBufferSize) + break; + coreutils::ZString zLine(lineBuffer, lineLength); + onLineReceived(zLine); + if(lineBuffer[lineLength] == '\r') + ++lineLength; + if(lineBuffer[lineLength] == '\n') + ++lineLength; + lineBufferSize -= lineLength; + if(lineBufferSize > 0) + memmove(lineBuffer, lineBuffer + lineLength, lineBufferSize); + lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); + } else if(lineBufferSize >= blockLength) { + coreutils::ZString zBlock(lineBuffer, blockLength); + onBlockReceived(zBlock); + lineBufferSize -= blockLength; + if(lineBufferSize > 0) + memmove(lineBuffer, lineBuffer + blockLength, lineBufferSize); + lineBuffer = (char *)realloc(lineBuffer, lineBufferSize); + } + } + } + } + + void TCPSession2::setBlockSize(int blockSize) { + this->blockSize = blockSize; + } + + void TCPSession2::onLineReceived(coreutils::ZString &line) { + protocol(line); + send(); + if(term) + TCPSocket::shutdown("termination requested"); + } + + void TCPSession2::onBlockReceived(coreutils::ZString &block) { + coreutils::Log(coreutils::LOG_DEBUG_3) << "[" << block.getLength() << "]"; + if(term) + TCPSocket::shutdown("termination requested"); + } + + void TCPSession2::send() { + if(out.tellp() > 0) + TCPSocket::write(out.str()); + out.str(""); + } + + void TCPSession2::terminate() { + term = true; + } + +} diff --git a/TCPSession2.h b/TCPSession2.h new file mode 100644 index 0000000..8ad4a20 --- /dev/null +++ b/TCPSession2.h @@ -0,0 +1,139 @@ +#ifndef __TCPSession2_h__ +# define __TCPSession2_h__ + +#include "TCPSocket.h" +#include "Timer.h" +#include "SessionFilter.h" + +namespace core { + + class Command; + class TCPServer; + + /// + /// TCPSession2 + /// + /// TCPSession defines the nature of the interaction with the client + /// and stores persistent data for a defined session. TCPSession objects + /// are not sockets but instead provide a communications control + /// mechanism. Protocol conversations are provided through extensions + /// from this object. + /// + /// TCPSession2 is designed to be 'connected' instead of being served + /// by a server. + /// + + class TCPSession2 : public TCPSocket { + + public: + + /// + /// + /// + + TCPSession2(EPoll &ePoll, std::string text = ""); + + /// + /// + /// + + virtual ~TCPSession2(); + + Command *grab = NULL; + + virtual void output(std::stringstream &data); + + /// + /// The send method is used to output the contents of the out stream + /// to the session containing the stream. + /// + + void send(); + + /// + /// Use this method to terminate this TCPSession. + /// + + void terminate(); + + /// + /// Use out to send data to the session socket or other session sockets. + /// + + std::stringstream out; + + protected: + + /// + /// + /// + + virtual void onRegistered() override; + + /// + /// Override this method to receive data directly from the socket as data is + /// received. If you need data split by line termination characters then + /// override the onLineReceived method instead. + /// + virtual void onDataReceived(coreutils::ZString &data) override; + + /// + /// Override the onLineReceived method to receive a string of characters that + /// represents a single line of data terminated by a LF or CRLF. If onDataReceived + /// was overriden this method will not be called unless the onDataReceived calls + /// this method explicitly using the class and member name. + /// + + virtual void onLineReceived(coreutils::ZString &line); + + /// + /// Override the onBlockReceived method to receive a string of characters that + /// represents a single block of data of length determined by the block length value. If + /// onDataReceived was overriden this method will not be called unless the onDataReceived + /// calls this method explicitly using the class and member name. + /// + + virtual void onBlockReceived(coreutils::ZString &block); + + /// + /// This method is called from within the protocol method when protocol is called + /// on the initial connection where the data is an empty string. Use this method + /// to deliver a message to the connection upon connection. + /// + + virtual void onConnected(); + + /// + /// Override the protocol method to manage and control the session communications + /// in your inherited session. If you do not override this method then the Session + /// default will process the 'commands' added to the server object using the + /// processRequest method on the session input. + /// + /// When data is received within the session two modes are available to pass the + /// data through the protocol method: LINE or BLOCK. + /// + + virtual void protocol(coreutils::ZString &data); + + /// + /// Use setBlockSize to set the amount of data that should be read at once from the + /// session data buffer. + /// If this value is set to 0 then the data will be retrieved + /// + + void setBlockSize(int size = 0); + + private: + char *lineBuffer = NULL; + int lineBufferSize = 0; + int lineLength = 0; + int blockLength = 0; + std::mutex mtx; + bool term = false; + int blockSize = 0; + + }; + +} + +#endif diff --git a/TCPSocket.cpp b/TCPSocket.cpp index 3345032..20afa4a 100644 --- a/TCPSocket.cpp +++ b/TCPSocket.cpp @@ -2,6 +2,7 @@ #include "EPoll.h" #include "Log.h" #include "Exception.h" +#include "errno.h" namespace core { @@ -14,7 +15,8 @@ namespace core { void TCPSocket::connect(IPAddress &address) { setDescriptor(socket(AF_INET, SOCK_STREAM, 0)); if(::connect(getDescriptor(), (struct sockaddr *)&address.addr, address.addressLength) == -1) - throw coreutils::Exception("Error on connect to TCP socket."); + throw coreutils::Exception("Error on connect to TCP socket." + errno); + coreutils::Log(coreutils::LOG_DEBUG_3) << "Connected to IP..." << address.getClientAddressAndPort(); } void TCPSocket::output(std::stringstream &out) { diff --git a/Thread.cpp b/Thread.cpp index d942b51..cea1074 100644 --- a/Thread.cpp +++ b/Thread.cpp @@ -60,9 +60,12 @@ namespace core { } else if(rc > 0) { for(int ix = 0; ix < rc; ++ix) { ++count; - if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix])) { + if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix], ++ePoll.eventId)) { +// coreutils::Log(coreutils::LOG_DEBUG_4) << "return true"; ePoll.resetSocket((Socket *)events[ix].data.ptr); - } else { + } else { + ((Socket *)events[ix].data.ptr)->shutDown = true; +// coreutils::Log(coreutils::LOG_DEBUG_4) << "return false"; delete (Socket *)events[ix].data.ptr; } } diff --git a/Thread.h b/Thread.h index 023123a..41baa8e 100644 --- a/Thread.h +++ b/Thread.h @@ -10,7 +10,7 @@ namespace core { class EPoll; - + /// /// Thread /// @@ -18,37 +18,37 @@ namespace core { /// object to allow maintaining a status value for monitoring the thread activity. EPoll will instantiate /// a Thread object for each thread specified in the EPoll's start method. /// - + class Thread : public Object { - - public: + + public: Thread(EPoll &ePoll); Thread(EPoll &ePoll, ThreadScope *thread); ~Thread(); - + /// /// Start the thread object. This will cause the epoll scheduler to commence reading the epoll queue. /// - + void start(); void join(); - std::string getStatus(); + std::string getStatus(); pid_t getThreadId(); int getCount(); - void output(std::stringstream &data); - + void output(std::stringstream &data); + private: EPoll &ePoll; // The EPoll control object. std::string status; - int count; + int count; std::thread *_thread; void print_thread_start_log(); pid_t threadId; - void run(); + void run(); ThreadScope *thread; - + }; - + } #endif diff --git a/Timer.cpp b/Timer.cpp index 8f74c8d..9b3d536 100644 --- a/Timer.cpp +++ b/Timer.cpp @@ -1,61 +1,60 @@ #include "Timer.h" namespace core { - + Timer::Timer(EPoll &ePoll, double delay = 0.0f) : Socket(ePoll, "Timer") { setDescriptor(timerfd_create(CLOCK_REALTIME, 0)); setTimer(delay); } - + Timer::~Timer() { } - + void Timer::setTimer(double delay) { - + double integer; double fraction; struct itimerspec timer; - + delayValue = delay; - + timer.it_interval.tv_sec = 0; timer.it_interval.tv_nsec = 0; - + fraction = modf(delay, &integer); - + timer.it_value.tv_sec = (int)integer; timer.it_value.tv_nsec = (int)(fraction * 1000000000); - + timerfd_settime(getDescriptor(), 0, &timer, NULL); - + } - + void Timer::clearTimer() { - + struct itimerspec timer; - + timer.it_interval.tv_sec = 0; timer.it_interval.tv_nsec = 0; timer.it_value.tv_sec = 0; timer.it_value.tv_nsec = 0; - + timerfd_settime(getDescriptor(), 0, &timer, NULL); - } - + double Timer::getElapsed() { struct itimerspec timer; timerfd_gettime(getDescriptor(), &timer); double toTimeout = (double)((timer.it_value.tv_sec * 1000000000L) + timer.it_value.tv_nsec) / 1000000000L; return delayValue - toTimeout; } - - void Timer::onDataReceived(std::string data) { + + void Timer::onDataReceived(std::string data) { onTimeout(); - } - + } + double Timer::getEpoch() { return (double)std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() /1000000000L; } - + } diff --git a/html/CommandList_8h_source.html b/html/CommandList_8h_source.html index b013265..04f32e6 100644 --- a/html/CommandList_8h_source.html +++ b/html/CommandList_8h_source.html @@ -112,8 +112,8 @@ $(function() {
78 
79 #endif
Definition: CommandList.h:18
-
int processCommand(coreutils::ZString &request, TCPSession &session)
Definition: CommandList.cpp:42
-
bool grabInput(TCPSession &session, Command &command)
Definition: CommandList.cpp:33
+
int processCommand(coreutils::ZString &request, TCPSession &session)
Definition: CommandList.cpp:43
+
bool grabInput(TCPSession &session, Command &command)
Definition: CommandList.cpp:34
void add(Command &command, std::string name="")
Definition: CommandList.cpp:8
void remove(Command &command)
Definition: CommandList.cpp:12
int processRequest(coreutils::ZString &request, TCPSession &session)
Definition: CommandList.cpp:14
diff --git a/html/EPoll_8h_source.html b/html/EPoll_8h_source.html index 757633f..ebf61e2 100644 --- a/html/EPoll_8h_source.html +++ b/html/EPoll_8h_source.html @@ -81,56 +81,58 @@ $(function() {
32 
33  public:
34 
-
38 
-
39  EPoll();
+
35  volatile long long eventId = 0;
+
36 
40 
-
44 
-
45  ~EPoll();
-
46 
-
53 
-
54  bool start(int numberOfThreads, int maxSockets);
+
41  EPoll();
+
42 
+
46 
+
47  ~EPoll();
+
48 
55 
-
61 
-
62  bool stop();
+
56  bool start(int numberOfThreads, int maxSockets);
+
57 
63 
-
68 
-
69  bool isStopping();
+
64  bool stop();
+
65 
70 
-
79 
-
80  bool registerSocket(Socket *socket);
+
71  bool isStopping();
+
72 
81 
-
85 
-
86  bool unregisterSocket(Socket *socket);
+
82  bool registerSocket(Socket *socket);
+
83 
87 
-
91 
-
92  int getDescriptor();
+
88  bool unregisterSocket(Socket *socket);
+
89 
93 
-
97 
-
98  int maxSockets;
+
94  int getDescriptor();
+
95 
99 
-
103 
-
104  void eventReceived(struct epoll_event event);
+ +
101 
105 
-
112 
-
113  int processCommand(coreutils::ZString &request, TCPSession &session) override;
+
106  void eventReceived(struct epoll_event event);
+
107 
114 
-
115  void resetSocket(Socket *socket);
+
115  int processCommand(coreutils::ZString &request, TCPSession &session) override;
116 
-
117  private:
-
118 
-
119  int epfd;
-
120  int numberOfThreads;
-
121  std::vector<Thread> threads;
-
122  volatile bool terminateThreads;
-
123  void enableSocket(Socket *socket);
-
124  void disableSocket(Socket *socket);
-
125 
-
126  };
-
127 
-
128 }
+
117  void resetSocket(Socket *socket);
+
118 
+
119  private:
+
120 
+
121  int epfd;
+
122  int numberOfThreads;
+
123  std::vector<Thread> threads;
+
124  volatile bool terminateThreads;
+
125  void enableSocket(Socket *socket);
+
126  void disableSocket(Socket *socket);
+
127 
+
128  };
129 
-
130 #endif
+
130 }
131 
+
132 #endif
+
133 
Definition: Command.h:22
Definition: EPoll.h:31
bool stop()
Stop and shut down the BMAEPoll processing.
Definition: EPoll.cpp:46
@@ -143,7 +145,7 @@ $(function() {
bool unregisterSocket(Socket *socket)
Unregister a BMASocket from monitoring by BMAEPoll.
Definition: EPoll.cpp:75
~EPoll()
Definition: EPoll.cpp:18
bool start(int numberOfThreads, int maxSockets)
Start the BMAEPoll processing.
Definition: EPoll.cpp:20
-
int maxSockets
The maximum number of socket allowed.
Definition: EPoll.h:98
+
int maxSockets
The maximum number of socket allowed.
Definition: EPoll.h:100
Definition: Socket.h:34
Definition: TCPSession.h:24
diff --git a/html/INotify_8h_source.html b/html/INotify_8h_source.html index 1f79a59..d9b1771 100644 --- a/html/INotify_8h_source.html +++ b/html/INotify_8h_source.html @@ -72,33 +72,33 @@ $(function() {
5 #include "Socket.h"
6 
7 namespace core {
-
8 
+
8 
9  class INotify : Socket {
-
10 
+
10 
11  public:
12  INotify(EPoll &ePoll);
13  ~INotify();
-
14 
-
15  int addWatch(std::string watch);
+
14 
+
15  int addWatch(coreutils::ZString &watch);
16  void removeWatch(int wd);
17 
-
18  void onDataReceived(coreutils::ZString &data) override;
+
18  void onDataReceived(coreutils::ZString &data) override;
19 
-
20  virtual void inAccess(std::string name) {}
-
21  virtual void inAttrib(std::string name) {}
+
20  virtual void inAccess(coreutils::ZString name) {}
+
21  virtual void inAttrib(std::string name) {}
22  virtual void inCloseWrite(std::string name) {}
23  virtual void inCloseNoWrite(std::string name) {}
-
24  virtual void inCreate(std::string name) {}
-
25  virtual void inDelete(std::string name) {}
-
26  virtual void inDeleteSelf(std::string name) {}
-
27  virtual void inModify(std::string name) {}
-
28  virtual void inMoveSelf(std::string name) {}
-
29  virtual void inMovedFrom(std::string name) {}
-
30  virtual void inMovedTo(std::string name) {}
-
31  virtual void inOpen(std::string name) {}
-
32 
+
24  virtual void inCreate(coreutils::ZString &name) {}
+
25  virtual void inDelete(std::string name) {}
+
26  virtual void inDeleteSelf(std::string name) {}
+
27  virtual void inModify(std::string name) {}
+
28  virtual void inMoveSelf(std::string name) {}
+
29  virtual void inMovedFrom(std::string name) {}
+
30  virtual void inMovedTo(std::string name) {}
+
31  virtual void inOpen(std::string name) {}
+
32 
33  };
-
34 
+
34 
35 }
36 
37 #endif
diff --git a/html/IPAddress_8h_source.html b/html/IPAddress_8h_source.html index 376e6e9..33793d8 100644 --- a/html/IPAddress_8h_source.html +++ b/html/IPAddress_8h_source.html @@ -74,14 +74,14 @@ $(function() {
7 namespace core {
8 
9  class IPAddress : public Object {
-
10 
+
10 
11  public:
12  IPAddress();
13  IPAddress(std::string address);
14  IPAddress(std::string address, int port);
15  ~IPAddress();
16 
-
17  struct sockaddr_in addr;
+
17  struct sockaddr_in addr;
18  socklen_t addressLength;
19 
20  struct sockaddr * getPointer();
diff --git a/html/Socket_8h_source.html b/html/Socket_8h_source.html index 1be47c5..925b28f 100644 --- a/html/Socket_8h_source.html +++ b/html/Socket_8h_source.html @@ -90,15 +90,15 @@ $(function() {
58 
59  void shutdown(std::string text = "unknown");
60 
-
65 
+
65 
66  void setDescriptor(int descriptor);
-
67 
+
67 
68  int getDescriptor();
-
69 
-
82 
-
83  bool eventReceived(struct epoll_event event);
+
69 
+
82 
+
83  bool eventReceived(struct epoll_event event, long long eventId);
84 
-
88 
+
88 
89  int write(std::string data);
90  void write(char *buffer, int length);
91 
@@ -107,93 +107,87 @@ $(function() {
100 
101  virtual void onRegister();
102  virtual void onRegistered();
-
103 
+
103 
104  virtual void onUnregister();
-
105 
-
112 
-
113  virtual void onUnregistered();
-
114 
-
115  bool needsToWrite();
-
116 
-
117  bool reset = false;
-
118 
-
119  protected:
-
120 
-
121  EPoll &ePoll; // The EPoll control object.
-
122 
-
123  bool shutDown = false;
-
124 
-
125  void setBufferSize(int length);
-
126 
-
127  int getBufferSize();
-
128 
-
134 
-
135 // virtual void onConnected(); ///< Called when socket is open and ready to communicate.
-
136 
-
140 
-
141 // virtual void onDisconnected(); ///< Called when socket is closing and no longer ready to communicate.
-
142 
-
150 
-
151  virtual void onDataReceived(std::string data);
-
152 
-
156 
-
157  virtual void onDataReceived(coreutils::ZString &data);
-
158 
-
163 
-
164  virtual void receiveData(coreutils::ZString &buffer);
-
165 
-
166  private:
-
167 
-
168  std::string text;
-
169  int descriptor = -1;
-
170 // std::mutex lock;
-
171  std::mutex outlock;
-
172  bool readHangup = false;
-
173  bool inHandler = false;
-
174 // struct epoll_event event; // Event selection construction structure.
-
175 
-
176  //-------------------------------------------------------------------------------------
-
177  // the writeSocket is called when epoll has received a write request for a socket.
-
178  // Writing data to this socket is queued in the streambuf and permission is requested
-
179  // to write to the socket. This routine handles the writing of the streambuf data
-
180  // buffer to the socket.
-
181  //-------------------------------------------------------------------------------------
-
182 
-
183  void writeSocket();
-
184 
-
185  // int_type underflow();
-
186 // int_type uflow();
-
187 // int_type pbackfail(int_type ch);
-
188 // streamsize showmanyc();
+
105 
+
111 
+
112  virtual void onUnregistered();
+
113 
+
114  bool needsToWrite();
+
115 
+
116  bool reset = false;
+
117 
+
118  volatile bool shutDown = false;
+
119 
+
120  protected:
+
121 
+
122  EPoll &ePoll; // The EPoll control object.
+
123 
+
124  void setBufferSize(int length);
+
125 
+
126  int getBufferSize();
+
127 
+
133 
+
134 // virtual void onConnected(); ///< Called when socket is open and ready to communicate.
+
135 
+
139 
+
140 // virtual void onDisconnected(); ///< Called when socket is closing and no longer ready to communicate.
+
141 
+
149 
+
150  virtual void onDataReceived(std::string data);
+
151 
+
155 
+
156  virtual void onDataReceived(coreutils::ZString &data);
+
157 
+
162 
+
163  virtual void receiveData(coreutils::ZString &buffer);
+
164 
+
165  private:
+
166 
+
167  std::string text;
+
168  int descriptor = -1;
+
169  std::mutex outlock;
+
170  bool readHangup = false;
+
171  volatile bool inHandler = false;
+
172 
+
173  //-------------------------------------------------------------------------------------
+
174  // the writeSocket is called when epoll has received a write request for a socket.
+
175  // Writing data to this socket is queued in the streambuf and permission is requested
+
176  // to write to the socket. This routine handles the writing of the streambuf data
+
177  // buffer to the socket.
+
178  //-------------------------------------------------------------------------------------
+
179 
+
180  void writeSocket();
+
181 
+
182  // int_type underflow();
+
183 // int_type uflow();
+
184 // int_type pbackfail(int_type ch);
+
185 // streamsize showmanyc();
+
186 
+
187  char *buffer; // This is a pointer to the managed buffer space.
+
188  int length; // This is the length of the buffer.
189 
-
190  char *buffer; // This is a pointer to the managed buffer space.
-
191  int length; // This is the length of the buffer.
-
192 
-
193 // const char * const begin_;
-
194 // const char * const end_;
-
195 // const char * const current_;
-
196 
-
197  std::queue<std::string> fifo;
-
198 
-
199  };
-
200 
-
201 }
-
202 
-
203 #endif
-
204 
+
190  std::queue<std::string> fifo;
+
191 
+
192  };
+
193 
+
194 }
+
195 
+
196 #endif
+
197 
Definition: EPoll.h:31
Definition: Socket.h:34
-
int getDescriptor()
Get the descriptor for the socket.
Definition: Socket.cpp:41
-
int write(std::string data)
Definition: Socket.cpp:138
-
virtual void onRegistered()
Called after the socket has been registered with epoll processing.
Definition: Socket.cpp:57
-
virtual void receiveData(coreutils::ZString &buffer)
Definition: Socket.cpp:91
+
int getDescriptor()
Get the descriptor for the socket.
Definition: Socket.cpp:44
+
int write(std::string data)
Definition: Socket.cpp:137
+
bool eventReceived(struct epoll_event event, long long eventId)
Parse epoll event and call specified callbacks.
Definition: Socket.cpp:65
+
virtual void onRegistered()
Called after the socket has been registered with epoll processing.
Definition: Socket.cpp:59
+
virtual void receiveData(coreutils::ZString &buffer)
Definition: Socket.cpp:94
Socket(EPoll &ePoll, std::string text="")
Definition: Socket.cpp:11
-
bool eventReceived(struct epoll_event event)
Parse epoll event and call specified callbacks.
Definition: Socket.cpp:63
-
virtual void onRegister()
Called before the socket has registered with the epoll processing.
Definition: Socket.cpp:55
-
virtual ~Socket()
Definition: Socket.cpp:17
-
void setDescriptor(int descriptor)
Set the descriptor for the socket.
Definition: Socket.cpp:27
-
virtual void onDataReceived(std::string data)
Called when data is received from the socket.
Definition: Socket.cpp:83
-
virtual void onUnregistered()
Called when the socket has finished unregistering for the epoll processing.
Definition: Socket.cpp:61
+
virtual void onRegister()
Called before the socket has registered with the epoll processing.
Definition: Socket.cpp:57
+
virtual ~Socket()
Definition: Socket.cpp:16
+
void setDescriptor(int descriptor)
Set the descriptor for the socket.
Definition: Socket.cpp:29
+
virtual void onDataReceived(std::string data)
Called when data is received from the socket.
Definition: Socket.cpp:86
+
virtual void onUnregistered()
Called when the socket has finished unregistering for the epoll processing.
Definition: Socket.cpp:63
void shutdown(std::string text="unknown")
Definition: Socket.cpp:155
diff --git a/html/TCPSession2_8h_source.html b/html/TCPSession2_8h_source.html new file mode 100644 index 0000000..6eafeb1 --- /dev/null +++ b/html/TCPSession2_8h_source.html @@ -0,0 +1,161 @@ + + + + + + + +My Project: TCPSession2.h Source File + + + + + + + + + +
+
+ + + + + + +
+
My Project +
+
+
+ + + + + + + +
+ +
+
+ + +
+ +
+ +
+
+
TCPSession2.h
+
+
+
1 #ifndef __TCPSession2_h__
+
2 # define __TCPSession2_h__
+
3 
+
4 #include "TCPSocket.h"
+
5 #include "Timer.h"
+
6 #include "SessionFilter.h"
+
7 
+
8 namespace core {
+
9 
+
10  class Command;
+
11  class TCPServer;
+
12 
+
25 
+
26  class TCPSession2 : public TCPSocket {
+
27 
+
28  public:
+
29 
+
33 
+
34  TCPSession2(EPoll &ePoll, std::string text = "");
+
35 
+
39 
+
40  virtual ~TCPSession2();
+
41 
+
42  Command *grab = NULL;
+
43 
+
44  virtual void output(std::stringstream &data);
+
45 
+
50 
+
51  void send();
+
52 
+
56 
+
57  void terminate();
+
58 
+
62 
+
63  std::stringstream out;
+
64 
+
65  protected:
+
66 
+
70 
+
71  virtual void onRegistered() override;
+
72 
+
78  virtual void onDataReceived(coreutils::ZString &data) override;
+
79 
+
86 
+
87  virtual void onLineReceived(coreutils::ZString &line);
+
88 
+
95 
+
96  virtual void onBlockReceived(coreutils::ZString &block);
+
97 
+
103 
+
104  virtual void onConnected();
+
105 
+
115 
+
116  virtual void protocol(coreutils::ZString &data);
+
117 
+
123 
+
124  void setBlockSize(int size = 0);
+
125 
+
126  private:
+
127  char *lineBuffer = NULL;
+
128  int lineBufferSize = 0;
+
129  int lineLength = 0;
+
130  int blockLength = 0;
+
131  std::mutex mtx;
+
132  bool term = false;
+
133  int blockSize = 0;
+
134 
+
135  };
+
136 
+
137 }
+
138 
+
139 #endif
+
Definition: Command.h:22
+
Definition: EPoll.h:31
+
Definition: TCPSession2.h:26
+
virtual void protocol(coreutils::ZString &data)
Definition: TCPSession2.cpp:15
+
virtual void onDataReceived(coreutils::ZString &data) override
Definition: TCPSession2.cpp:26
+
std::stringstream out
Definition: TCPSession2.h:63
+
void terminate()
Definition: TCPSession2.cpp:81
+
virtual void onBlockReceived(coreutils::ZString &block)
Definition: TCPSession2.cpp:69
+
virtual void onLineReceived(coreutils::ZString &line)
Definition: TCPSession2.cpp:62
+
void setBlockSize(int size=0)
Definition: TCPSession2.cpp:58
+
virtual void onRegistered() override
Called after the socket has been registered with epoll processing.
Definition: TCPSession2.cpp:17
+
void send()
Definition: TCPSession2.cpp:75
+
virtual void output(std::stringstream &data)
Definition: TCPSession2.cpp:11
+
virtual void onConnected()
Definition: TCPSession2.cpp:24
+
Definition: TCPSocket.h:20
+
+ + + + diff --git a/html/TCPSession_8h_source.html b/html/TCPSession_8h_source.html index cac4bce..4249d08 100644 --- a/html/TCPSession_8h_source.html +++ b/html/TCPSession_8h_source.html @@ -143,17 +143,17 @@ $(function() {
Definition: EPoll.h:31
Definition: TCPServer.h:25
Definition: TCPSession.h:24
-
void send()
Definition: TCPSession.cpp:87
-
void terminate()
Definition: TCPSession.cpp:93
-
virtual void onLineReceived(coreutils::ZString &line)
Definition: TCPSession.cpp:74
+
void send()
Definition: TCPSession.cpp:84
+
void terminate()
Definition: TCPSession.cpp:90
+
virtual void onLineReceived(coreutils::ZString &line)
Definition: TCPSession.cpp:71
virtual void output(std::stringstream &data)
Definition: TCPSession.cpp:15
-
void setBlockSize(int size=0)
Definition: TCPSession.cpp:70
-
virtual void onConnected()
Definition: TCPSession.cpp:36
+
void setBlockSize(int size=0)
Definition: TCPSession.cpp:67
+
virtual void onConnected()
Definition: TCPSession.cpp:33
virtual void protocol(coreutils::ZString &data)
Definition: TCPSession.cpp:19
-
virtual void onBlockReceived(coreutils::ZString &block)
Definition: TCPSession.cpp:81
+
virtual void onBlockReceived(coreutils::ZString &block)
Definition: TCPSession.cpp:78
std::stringstream out
Definition: TCPSession.h:67
-
virtual void onRegistered() override
Called after the socket has been registered with epoll processing.
Definition: TCPSession.cpp:27
-
virtual void onDataReceived(coreutils::ZString &data) override
Definition: TCPSession.cpp:38
+
virtual void onRegistered() override
Called after the socket has been registered with epoll processing.
Definition: TCPSession.cpp:24
+
virtual void onDataReceived(coreutils::ZString &data) override
Definition: TCPSession.cpp:35
Definition: TCPSocket.h:20
diff --git a/html/TCPSocket_8h_source.html b/html/TCPSocket_8h_source.html index d57da81..d09b5f5 100644 --- a/html/TCPSocket_8h_source.html +++ b/html/TCPSocket_8h_source.html @@ -99,7 +99,7 @@ $(function() {
Definition: IPAddress.h:9
Definition: Socket.h:34
Definition: TCPSocket.h:20
-
virtual void output(std::stringstream &out)
Definition: TCPSocket.cpp:20
+
virtual void output(std::stringstream &out)
Definition: TCPSocket.cpp:22