From cfa70c58632a367d40a6220c0c265a858bc1ce62 Mon Sep 17 00:00:00 2001 From: Brad Arant Date: Sat, 11 Mar 2023 21:55:33 -0800 Subject: [PATCH] Fixed multithread sync issue and duplicate thread on termination. --- EPoll.cpp | 33 ----- EPoll.h | 23 +--- Socket.cpp | 70 +++++++--- Socket.h | 7 + Subscription.cpp | 2 +- Thread.cpp | 2 +- UDPServerSocket.cpp | 2 +- html/EPoll_8h_source.html | 71 ++++------ html/Socket_8h_source.html | 125 +++++++++--------- html/classcore_1_1ConsoleServer-members.html | 6 +- html/classcore_1_1ConsoleServer.html | 6 + html/classcore_1_1ConsoleSession-members.html | 2 + html/classcore_1_1ConsoleSession.html | 6 + html/classcore_1_1EPoll-members.html | 5 +- html/classcore_1_1EPoll.html | 58 -------- html/classcore_1_1Socket-members.html | 6 +- html/classcore_1_1Socket.html | 6 + html/classcore_1_1TCPServer-members.html | 6 +- html/classcore_1_1TCPServer.html | 6 + html/classcore_1_1TCPSession-members.html | 6 +- html/classcore_1_1TCPSession.html | 6 + html/classcore_1_1TCPSession2-members.html | 6 +- html/classcore_1_1TCPSession2.html | 6 + html/classcore_1_1TCPSocket-members.html | 6 +- html/classcore_1_1TCPSocket.html | 6 + html/classcore_1_1TLSServer-members.html | 2 + html/classcore_1_1TLSServer.html | 6 + html/classcore_1_1TLSSession-members.html | 2 + html/classcore_1_1TLSSession.html | 6 + .../classcore_1_1TerminalSession-members.html | 2 + html/classcore_1_1TerminalSession.html | 6 + .../classcore_1_1UDPServerSocket-members.html | 6 +- html/classcore_1_1UDPServerSocket.html | 6 + html/classcore_1_1UDPSocket-members.html | 2 + html/classcore_1_1UDPSocket.html | 6 + html/functions.html | 6 - html/functions_func.html | 10 -- html/menudata.js | 1 - html/search/all_9.js | 3 +- html/search/all_a.js | 32 ++--- html/search/all_b.js | 22 +-- html/search/all_c.js | 7 +- html/search/all_d.js | 4 +- html/search/all_e.js | 8 +- html/search/classes_0.js | 8 +- html/search/classes_1.js | 2 +- html/search/classes_2.js | 6 +- html/search/classes_3.js | 2 +- html/search/classes_4.js | 10 +- html/search/classes_5.js | 20 +-- html/search/classes_6.js | 4 +- html/search/functions_0.js | 2 +- html/search/functions_1.js | 6 +- html/search/functions_2.js | 4 +- html/search/functions_3.js | 16 +-- html/search/functions_4.js | 2 +- html/search/functions_5.js | 18 +-- html/search/functions_6.js | 6 +- html/search/functions_7.js | 5 +- html/search/functions_8.js | 20 +-- html/search/functions_9.js | 6 +- html/search/functions_a.js | 2 +- html/search/functions_b.js | 5 +- html/search/searchdata.js | 2 +- html/search/variables_0.js | 2 +- html/search/variables_1.js | 2 +- html/search/variables_2.js | 2 +- html/search/variables_3.js | 2 +- html/search/variables_4.js | 2 +- html/search/variables_5.js | 4 +- latex/classcore_1_1EPoll.tex | 41 +----- latex/classcore_1_1Socket.tex | 6 + 72 files changed, 395 insertions(+), 427 deletions(-) diff --git a/EPoll.cpp b/EPoll.cpp index 9a6420a..15c57b9 100644 --- a/EPoll.cpp +++ b/EPoll.cpp @@ -67,16 +67,6 @@ namespace core { return terminateThreads; } - bool EPoll::registerSocket(Socket *socket) { - enableSocket(socket); - return true; - } - - bool EPoll::unregisterSocket(Socket *socket) { - disableSocket(socket); - return true; - } - int EPoll::getDescriptor() { return epfd; } @@ -91,27 +81,4 @@ namespace core { return 1; } - void EPoll::enableSocket(Socket *socket) { - struct epoll_event event; - event.data.ptr = socket; - event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; - epoll_ctl(epfd, EPOLL_CTL_ADD, socket->getDescriptor(), &event); -// coreutils::Log(coreutils::LOG_DEBUG_4) << "BMAXenable: " << socket->getDescriptor(); - } - - void EPoll::disableSocket(Socket *socket) { - epoll_ctl(epfd, EPOLL_CTL_DEL, socket->getDescriptor(), NULL); -// coreutils::Log(coreutils::LOG_DEBUG_4) << "BMAXdisable: " << socket->getDescriptor(); - } - - void EPoll::resetSocket(Socket *socket) { - struct epoll_event event; - event.data.ptr = socket; - event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP | EPOLLET; - if(socket->needsToWrite() && !socket->shutDown) - event.events |= EPOLLWRNORM; - epoll_ctl(epfd, EPOLL_CTL_MOD, socket->getDescriptor(), &event); -// coreutils::Log(coreutils::LOG_DEBUG_4) << "BMAXreset: " << socket->getDescriptor(); - } - } diff --git a/EPoll.h b/EPoll.h index 3db2031..d1440e3 100644 --- a/EPoll.h +++ b/EPoll.h @@ -69,24 +69,7 @@ namespace core { /// bool isStopping(); ///< Returns a true if the stop command has been requested. - - /// - /// Use registerSocket to add a new socket to the ePoll event watch list. This enables - /// a new BMASocket object to receive events when data is received as well as to write - /// data output to the socket. - /// - /// @param socket a pointer to a BMASocket object. - /// @return a booelean that indicates the socket was registered or not. - /// - - bool registerSocket(Socket *socket); ///< Register a BMASocket for monitoring by BMAEPoll. - - /// - /// Use this method to remove a socket from receiving events from the epoll system. - /// - - bool unregisterSocket(Socket *socket); ///< Unregister a BMASocket from monitoring by BMAEPoll. - + /// /// Use this method to obtain the current descriptor socket number for the epoll function call. /// @@ -114,16 +97,12 @@ namespace core { int processCommand(coreutils::ZString &request, TCPSession &session) override; /// threads; volatile bool terminateThreads; - void enableSocket(Socket *socket); - void disableSocket(Socket *socket); }; diff --git a/Socket.cpp b/Socket.cpp index 02dcf9d..f6ecab7 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -18,7 +18,7 @@ namespace core Socket::~Socket() { shutDown = true; onUnregister(); - ePoll.unregisterSocket(this); + disableSocket(); coreutils::Log(coreutils::LOG_DEBUG_4) << "Free on socket " << descriptor; free(buffer); if(descriptor == -1) @@ -39,7 +39,7 @@ namespace core throw coreutils::Exception("Descriptor out of range", __FILE__, __LINE__); this->descriptor = descriptor; onRegister(); - ePoll.registerSocket(this); + enableSocket(); onRegistered(); } @@ -68,29 +68,39 @@ namespace core void Socket::onUnregistered() {} bool Socket::eventReceived(struct epoll_event event, long long eventId) { -// lock.lock(); +// coreutils::Log(coreutils::LOG_DEBUG_1) << "Event process beginning for socket " << getDescriptor(); + if(inHandler) + // coreutils::Log(coreutils::LOG_DEBUG_2) << "inHandler was already true."; inHandler = true; if(event.events & EPOLLRDHUP) { +// coreutils::Log(coreutils::LOG_DEBUG_2) << "EPOLLRDHUP"; readHangup = true; shutdown("hangup received"); } if(event.events & EPOLLIN) { +// coreutils::Log(coreutils::LOG_DEBUG_2) << "EPOLLIN"; coreutils::ZString zbuffer(buffer, length); + lock.lock(); receiveData(zbuffer); + if(!shutDown) { + inHandler = false; + lock.unlock(); + resetSocket(); + } } if(event.events & EPOLLWRNORM) { - writeSocket(); +// coreutils::Log(coreutils::LOG_DEBUG_2) << "EPOLLWRNORM"; + writeSocket(); + inHandler = false; + resetSocket(); } - if(event.events & EPOLLHUP) { - shutdown(); - } - inHandler = false; -// lock.unlock(); + inHandler = false; +// coreutils::Log(coreutils::LOG_DEBUG_1) << "Event process ending for socket " << getDescriptor(); return !shutDown; } - + void Socket::onDataReceived(std::string data) - { + { throw coreutils::Exception("Need to override onDataReceived.", __FILE__, __LINE__, -1); } @@ -107,7 +117,7 @@ namespace core int error = -1; if((len = ::read(getDescriptor(), buffer.getData(), buffer.getLength())) >= 0) { coreutils::ZString zbuffer(buffer.getData(), len); - coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; +// coreutils::Log(coreutils::LOG_DEBUG_1) << zbuffer; onDataReceived(zbuffer); } else @@ -135,21 +145,23 @@ namespace core } void Socket::writeSocket() { + outlock.lock(); +// coreutils::Log(coreutils::LOG_DEBUG_3) << "writing data to socket " << getDescriptor(); if(fifo.size() > 0) { - outlock.lock(); - if(!shutDown) - int rc = ::write(descriptor, fifo.front().c_str(), fifo.front().length()); + if(!shutDown) + int rc = ::write(descriptor, fifo.front().c_str(), fifo.front().length()); fifo.pop(); - outlock.unlock(); } + outlock.unlock(); } int Socket::write(std::string data) { outlock.lock(); fifo.emplace(data); outlock.unlock(); - if(!inHandler) { - ePoll.resetSocket(this); + if(lock.try_lock()) { + resetSocket(); + lock.unlock(); } return 1; } @@ -170,4 +182,26 @@ namespace core reset = false; } + void Socket::enableSocket() { + struct epoll_event event; + event.data.ptr = this; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT | EPOLLET; + epoll_ctl(ePoll.getDescriptor(), EPOLL_CTL_ADD, getDescriptor(), &event); + } + + void Socket::disableSocket() { + epoll_ctl(ePoll.getDescriptor(), EPOLL_CTL_DEL, getDescriptor(), NULL); + } + + void Socket::resetSocket() { + struct epoll_event event; + event.data.ptr = this; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT | EPOLLET; + if(fifo.size() > 0) + event.events |= EPOLLWRNORM; + if(!shutDown) + epoll_ctl(ePoll.getDescriptor(), EPOLL_CTL_MOD, getDescriptor(), &event); + } + + } diff --git a/Socket.h b/Socket.h index bf70108..fbd6395 100644 --- a/Socket.h +++ b/Socket.h @@ -117,6 +117,9 @@ namespace core { volatile bool shutDown = false; + void enableSocket(); + void disableSocket(); + protected: EPoll &ePoll; // The EPoll control object. @@ -189,6 +192,10 @@ namespace core { std::queue fifo; + void resetSocket(); + + std::mutex lock; + }; } diff --git a/Subscription.cpp b/Subscription.cpp index 006a37c..65152ad 100644 --- a/Subscription.cpp +++ b/Subscription.cpp @@ -51,7 +51,7 @@ namespace core int Subscription::process(coreutils::ZString &request, std::stringstream &out, TCPSession &session) { - std::cout << "(" << handler << ")" << std::endl; +// std::cout << "(" << handler << ")" << std::endl; if (handler) handler->process(request, out, session); else diff --git a/Thread.cpp b/Thread.cpp index cea1074..2a7628b 100644 --- a/Thread.cpp +++ b/Thread.cpp @@ -62,7 +62,7 @@ namespace core { ++count; if(((Socket *)events[ix].data.ptr)->eventReceived(events[ix], ++ePoll.eventId)) { // coreutils::Log(coreutils::LOG_DEBUG_4) << "return true"; - ePoll.resetSocket((Socket *)events[ix].data.ptr); +// ePoll.resetSocket((Socket *)events[ix].data.ptr); } else { ((Socket *)events[ix].data.ptr)->shutDown = true; // coreutils::Log(coreutils::LOG_DEBUG_4) << "return false"; diff --git a/UDPServerSocket.cpp b/UDPServerSocket.cpp index efc9b47..9458698 100644 --- a/UDPServerSocket.cpp +++ b/UDPServerSocket.cpp @@ -24,7 +24,7 @@ namespace core { listen(getDescriptor(), 10); - ePoll.registerSocket(this); +// ePoll.registerSocket(this); } diff --git a/html/EPoll_8h_source.html b/html/EPoll_8h_source.html index 5f25571..e75c626 100644 --- a/html/EPoll_8h_source.html +++ b/html/EPoll_8h_source.html @@ -97,56 +97,43 @@ $(function() {
65 
70 
71  bool isStopping();
-
72 
-
81 
-
82  bool registerSocket(Socket *socket);
-
83 
-
87 
-
88  bool unregisterSocket(Socket *socket);
-
89 
-
93 
-
94  int getDescriptor();
-
95 
-
99 
- -
101 
-
105 
-
106  void eventReceived(struct epoll_event event);
-
107 
-
114 
-
115  int processCommand(coreutils::ZString &request, TCPSession &session) override;
-
116 
-
117  void resetSocket(Socket *socket);
-
118 
-
119  private:
-
120 
-
121  int epfd;
-
122  int numberOfThreads;
-
123  std::vector<Thread> threads;
-
124  volatile bool terminateThreads;
-
125  void enableSocket(Socket *socket);
-
126  void disableSocket(Socket *socket);
-
127 
-
128  };
-
129 
-
130 }
-
131 
-
132 #endif
-
133 
+
72 
+
76 
+
77  int getDescriptor();
+
78 
+
82 
+
83  int maxSockets;
+
84 
+
88 
+
89  void eventReceived(struct epoll_event event);
+
90 
+
97 
+
98  int processCommand(coreutils::ZString &request, TCPSession &session) override;
+
99 
+
100  private:
+
101 
+
102  int epfd;
+
103  int numberOfThreads;
+
104  std::vector<Thread> threads;
+
105  volatile bool terminateThreads;
+
106 
+
107  };
+
108 
+
109 }
+
110 
+
111 #endif
+
112 
Definition: Command.h:22
Definition: EPoll.h:31
bool stop()
Stop and shut down the BMAEPoll processing.
Definition: EPoll.cpp:46
-
int getDescriptor()
Return the descriptor for the ePoll socket.
Definition: EPoll.cpp:80
-
int processCommand(coreutils::ZString &request, TCPSession &session) override
Output the threads array to the console.
Definition: EPoll.cpp:84
+
int getDescriptor()
Return the descriptor for the ePoll socket.
Definition: EPoll.cpp:70
+
int processCommand(coreutils::ZString &request, TCPSession &session) override
Output the threads array to the console.
Definition: EPoll.cpp:74
EPoll()
Definition: EPoll.cpp:9
bool isStopping()
Returns a true if the stop command has been requested.
Definition: EPoll.cpp:66
void eventReceived(struct epoll_event event)
Dispatch event to appropriate socket.
-
bool registerSocket(Socket *socket)
Register a BMASocket for monitoring by BMAEPoll.
Definition: EPoll.cpp:70
-
bool unregisterSocket(Socket *socket)
Unregister a BMASocket from monitoring by BMAEPoll.
Definition: EPoll.cpp:75
~EPoll()
Definition: EPoll.cpp:18
bool start(int numberOfThreads, int maxSockets)
Start the BMAEPoll processing.
Definition: EPoll.cpp:20
-
int maxSockets
The maximum number of socket allowed.
Definition: EPoll.h:100
-
Definition: Socket.h:34
+
int maxSockets
The maximum number of socket allowed.
Definition: EPoll.h:83
Definition: TCPSession.h:26
diff --git a/html/Socket_8h_source.html b/html/Socket_8h_source.html index 5ccd101..1d5babe 100644 --- a/html/Socket_8h_source.html +++ b/html/Socket_8h_source.html @@ -119,76 +119,83 @@ $(function() {
117 
118  volatile bool shutDown = false;
119 
-
120  protected:
-
121 
-
122  EPoll &ePoll; // The EPoll control object.
-
123 
-
124  void setBufferSize(int length);
-
125 
-
126  int getBufferSize();
-
127 
-
133 
-
134 // virtual void onConnected(); ///< Called when socket is open and ready to communicate.
-
135 
-
139 
-
140 // virtual void onDisconnected(); ///< Called when socket is closing and no longer ready to communicate.
-
141 
-
149 
-
150  virtual void onDataReceived(std::string data);
-
151 
-
155 
-
156  virtual void onDataReceived(coreutils::ZString &data);
-
157 
-
162 
-
163  virtual void receiveData(coreutils::ZString &buffer);
-
164 
-
165  private:
-
166 
-
167  std::string text;
-
168  int descriptor = -1;
-
169  std::mutex outlock;
-
170  bool readHangup = false;
-
171  volatile bool inHandler = false;
-
172 
-
173  //-------------------------------------------------------------------------------------
-
174  // the writeSocket is called when epoll has received a write request for a socket.
-
175  // Writing data to this socket is queued in the streambuf and permission is requested
-
176  // to write to the socket. This routine handles the writing of the streambuf data
-
177  // buffer to the socket.
-
178  //-------------------------------------------------------------------------------------
-
179 
-
180  void writeSocket();
-
181 
-
182  // int_type underflow();
-
183 // int_type uflow();
-
184 // int_type pbackfail(int_type ch);
-
185 // streamsize showmanyc();
-
186 
-
187  char *buffer; // This is a pointer to the managed buffer space.
-
188  int length; // This is the length of the buffer.
+
120  void enableSocket();
+
121  void disableSocket();
+
122 
+
123  protected:
+
124 
+
125  EPoll &ePoll; // The EPoll control object.
+
126 
+
127  void setBufferSize(int length);
+
128 
+
129  int getBufferSize();
+
130 
+
136 
+
137 // virtual void onConnected(); ///< Called when socket is open and ready to communicate.
+
138 
+
142 
+
143 // virtual void onDisconnected(); ///< Called when socket is closing and no longer ready to communicate.
+
144 
+
152 
+
153  virtual void onDataReceived(std::string data);
+
154 
+
158 
+
159  virtual void onDataReceived(coreutils::ZString &data);
+
160 
+
165 
+
166  virtual void receiveData(coreutils::ZString &buffer);
+
167 
+
168  private:
+
169 
+
170  std::string text;
+
171  int descriptor = -1;
+
172  std::mutex outlock;
+
173  bool readHangup = false;
+
174  volatile bool inHandler = false;
+
175 
+
176  //-------------------------------------------------------------------------------------
+
177  // the writeSocket is called when epoll has received a write request for a socket.
+
178  // Writing data to this socket is queued in the streambuf and permission is requested
+
179  // to write to the socket. This routine handles the writing of the streambuf data
+
180  // buffer to the socket.
+
181  //-------------------------------------------------------------------------------------
+
182 
+
183  void writeSocket();
+
184 
+
185  // int_type underflow();
+
186 // int_type uflow();
+
187 // int_type pbackfail(int_type ch);
+
188 // streamsize showmanyc();
189 
-
190  std::queue<std::string> fifo;
-
191 
-
192  };
-
193 
-
194 }
-
195 
-
196 #endif
-
197 
+
190  char *buffer; // This is a pointer to the managed buffer space.
+
191  int length; // This is the length of the buffer.
+
192 
+
193  std::queue<std::string> fifo;
+
194 
+
195  void resetSocket();
+
196 
+
197  std::mutex lock;
+
198 
+
199  };
+
200 
+
201 }
+
202 
+
203 #endif
+
204 
Definition: EPoll.h:31
Definition: Socket.h:34
int getDescriptor()
Get the descriptor for the socket.
Definition: Socket.cpp:46
-
int write(std::string data)
Definition: Socket.cpp:147
+
int write(std::string data)
Definition: Socket.cpp:158
bool eventReceived(struct epoll_event event, long long eventId)
Parse epoll event and call specified callbacks.
Definition: Socket.cpp:70
virtual void onRegistered()
Called after the socket has been registered with epoll processing.
Definition: Socket.cpp:64
-
virtual void receiveData(coreutils::ZString &buffer)
Definition: Socket.cpp:102
+
virtual void receiveData(coreutils::ZString &buffer)
Definition: Socket.cpp:112
Socket(EPoll &ePoll, std::string text="")
Definition: Socket.cpp:12
virtual void onRegister()
Called before the socket has registered with the epoll processing.
Definition: Socket.cpp:62
virtual ~Socket()
Definition: Socket.cpp:18
void setDescriptor(int descriptor)
Set the descriptor for the socket.
Definition: Socket.cpp:30
-
virtual void onDataReceived(std::string data)
Called when data is received from the socket.
Definition: Socket.cpp:92
+
virtual void onDataReceived(std::string data)
Called when data is received from the socket.
Definition: Socket.cpp:102
virtual void onUnregistered()
Called when the socket has finished unregistering for the epoll processing.
Definition: Socket.cpp:68
-
void shutdown(std::string text="unknown")
Definition: Socket.cpp:167
+
void shutdown(std::string text="unknown")
Definition: Socket.cpp:179