// ECThreadManager.h
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #ifndef ECTHREADMANAGER_H
  18. #define ECTHREADMANAGER_H
  #include <kopano/zcdefs.h>
  #include <condition_variable>
  #include <ctime>
  #include <list>
  #include <map>
  #include <mutex>
  #include <queue>
  #include <set>
  #include <pthread.h>
  #include <kopano/ECConfig.h>
  #include <kopano/kcodes.h>
  #include "SOAPUtils.h"
  #include "soapH.h"
  29. /*
  30. * A single work item - it doesn't contain much since we defer all processing, including XML
  31. * parsing until a worker thread starts processing
  32. */
  struct WORKITEM {
      /* Connection the work belongs to: carries the socket and the state associated with it. */
      struct soap *soap;
      /* Time at which activity was detected on the socket. */
      double dblReceiveStamp;
  };
  37. struct ACTIVESOCKET _kc_final {
  38. struct soap *soap;
  39. time_t ulLastActivity;
  40. bool operator < (const ACTIVESOCKET &a) const { return a.soap->socket < this->soap->socket; };
  41. };
  42. class FindSocket _kc_final {
  43. public:
  44. FindSocket(SOAP_SOCKET s) { this->s = s; };
  45. bool operator()(const ACTIVESOCKET &a) const { return a.soap->socket == s; }
  46. private:
  47. SOAP_SOCKET s;
  48. };
  49. class FindListenSocket _kc_final {
  50. public:
  51. FindListenSocket(SOAP_SOCKET s) { this->s = s; };
  52. bool operator()(struct soap *soap) const { return soap->socket == s; }
  53. private:
  54. SOAP_SOCKET s;
  55. };
  56. class ECThreadManager;
  57. class ECDispatcher;
  58. /*
  59. * Each instance of ECWorkerThread represents a single worker thread; the thread is started
  60. * when constructed, and exits when it is deleted. It needs access to the thread manager to notify
  61. * it of the thread state, possibly exiting if needed, and needs access to the dispatcher to retrieve
  62. * the next work item.
  63. */
  64. class ECWorkerThread {
  65. public:
  66. ECWorkerThread(ECThreadManager *, ECDispatcher *, bool nostart = false);
  67. protected:
  68. // The destructor is protected since we self-cleanup; you cannot delete this object externally.
  69. virtual ~ECWorkerThread(void) _kc_impdtor;
  70. static void *Work(void *param);
  71. pthread_t m_thread;
  72. ECThreadManager *m_lpManager;
  73. ECDispatcher *m_lpDispatcher;
  74. };
  75. class _kc_export_dycast ECPriorityWorkerThread _kc_final :
  76. public ECWorkerThread {
  77. public:
  78. _kc_hidden ECPriorityWorkerThread(ECThreadManager *, ECDispatcher *);
  79. // The destructor is public since this thread isn't detached, we wait for the thread and clean it
  80. _kc_hidden ~ECPriorityWorkerThread(void);
  81. };
  /*
   * It is the thread manager's job to keep track of processing threads, and to add or remove
   * threads when requested.
   */
  86. class ECThreadManager _kc_final {
  87. public:
  88. // ulThreads is the normal number of threads that are started; These threads are pre-started and will be in an idle state.
  89. ECThreadManager(ECDispatcher *, unsigned int threads);
  90. ~ECThreadManager();
  91. // Adds n threads above the standard thread count. Threads are removed back to the normal thread count whenever the message
  92. // queue hits size 0 and there is an idle thread.
  93. ECRESULT ForceAddThread(int nThreads);
  94. // Some statistics
  95. ECRESULT GetThreadCount(unsigned int *lpulThreads);
  96. // This is the same parameter as passed in the constructor
  97. ECRESULT SetThreadCount(unsigned int ulThreads);
  98. // Called by the worker thread when it is idle. *lpfStop is set to TRUE then the thread will terminate and delete itself.
  99. ECRESULT NotifyIdle(ECWorkerThread *, bool *lpfStop);
  100. private:
  101. std::mutex m_mutexThreads;
  102. std::list<ECWorkerThread *> m_lstThreads;
  103. ECPriorityWorkerThread * m_lpPrioWorker;
  104. ECDispatcher * m_lpDispatcher;
  105. unsigned int m_ulThreads;
  106. };
  107. /*
  108. * Represents the watchdog thread. This monitors the dispatcher and acts when needed.
  109. *
  110. * We check the age of the first item in the queue dblMaxFreq times per second. If it is higher
  111. * than dblMaxAge, a new thread is added
  112. *
  113. * Thread deletion is done by the Thread Manager.
  114. */
  115. class ECWatchDog _kc_final {
  116. public:
  117. ECWatchDog(ECConfig *, ECDispatcher *, ECThreadManager *);
  118. ~ECWatchDog();
  119. private:
  120. // Main watch thread
  121. static void *Watch(void *);
  122. ECConfig * m_lpConfig;
  123. ECDispatcher * m_lpDispatcher;
  124. ECThreadManager* m_lpThreadManager;
  125. pthread_t m_thread;
  126. bool m_bExit = false;
  127. std::mutex m_mutexExit;
  128. std::condition_variable m_condExit;
  129. };
  130. /*
  131. * The main dispatcher; The dispatcher monitors open connections for activity and queues processing on the
  132. * work item queue. It is the owner of the thread manager and watchdog. Workers will query the dispatcher for
  133. * work items and will inform the dispatcher when a work item is done.
  134. */
  135. typedef SOAP_SOCKET (*CREATEPIPESOCKETCALLBACK)(void *lpParam);
  136. class ECDispatcher {
  137. public:
  138. ECDispatcher(ECConfig *, CREATEPIPESOCKETCALLBACK, void *cbparam);
  139. virtual ~ECDispatcher(void) _kc_impdtor;
  140. // Statistics
  141. ECRESULT GetIdle(unsigned int *lpulIdle); // Idle threads
  142. ECRESULT GetThreadCount(unsigned int *lpulThreads, unsigned int *lpulIdleThreads); // Total threads + idle threads
  143. ECRESULT GetFrontItemAge(double *lpdblAge); // Age of the front queue item (time since the item was queued and now)
  144. ECRESULT GetQueueLength(unsigned int *lpulQueueLength); // Number of requests in the queue
  145. ECRESULT SetThreadCount(unsigned int ulThreads);
  146. // Add a listen socket
  147. ECRESULT AddListenSocket(struct soap *soap);
  148. // Add soap socket in the work queue
  149. ECRESULT QueueItem(struct soap *soap);
  150. // Get the next work item on the queue, if bWait is TRUE, will block until a work item is available. The returned
  151. // workitem should not be freed, but returned to the class via NotifyDone(), at which point it will be cleaned up
  152. ECRESULT GetNextWorkItem(WORKITEM **item, bool bWait, bool bPrio);
  153. // Reload variables from config
  154. ECRESULT DoHUP();
  155. // Called asynchronously during MainLoop() to shutdown the server
  156. virtual ECRESULT ShutDown();
  157. // Inform that a soap request was processed and is finished. This will cause the dispatcher to start listening
  158. // on that socket for activity again
  159. ECRESULT NotifyDone(struct soap *soap);
  160. virtual ECRESULT NotifyRestart(SOAP_SOCKET s) = 0;
  161. // Goes into main listen loop, accepting sockets and monitoring existing accepted sockets for activity. Also closes
  162. // sockets which are idle for more than ulSocketTimeout
  163. virtual ECRESULT MainLoop() = 0;
  164. protected:
  165. ECConfig * m_lpConfig;
  166. ECThreadManager *m_lpThreadManager = nullptr;
  167. std::mutex m_mutexItems;
  168. std::queue<WORKITEM *> m_queueItems;
  169. std::condition_variable m_condItems;
  170. std::queue<WORKITEM *> m_queuePrioItems;
  171. std::condition_variable m_condPrioItems;
  172. std::map<int, ACTIVESOCKET> m_setSockets;
  173. std::map<int, struct soap *> m_setListenSockets;
  174. std::mutex m_mutexSockets;
  175. bool m_bExit = false;
  176. std::mutex m_mutexIdle;
  177. unsigned int m_ulIdle = 0;
  178. CREATEPIPESOCKETCALLBACK m_lpCreatePipeSocketCallback;
  179. void * m_lpCreatePipeSocketParam;
  180. // Socket settings (TCP + SSL)
  181. int m_nMaxKeepAlive;
  182. int m_nRecvTimeout;
  183. int m_nReadTimeout;
  184. int m_nSendTimeout;
  185. };
  186. class ECDispatcherSelect _kc_final : public ECDispatcher {
  187. private:
  188. int m_fdRescanRead;
  189. int m_fdRescanWrite;
  190. public:
  191. ECDispatcherSelect(ECConfig *, CREATEPIPESOCKETCALLBACK, void *cbparam);
  192. virtual ECRESULT MainLoop();
  193. virtual ECRESULT ShutDown();
  194. virtual ECRESULT NotifyRestart(SOAP_SOCKET s);
  195. };
  196. #ifdef HAVE_EPOLL_CREATE
  197. class ECDispatcherEPoll _kc_final : public ECDispatcher {
  198. private:
  199. int m_fdMax;
  200. int m_epFD;
  201. public:
  202. ECDispatcherEPoll(ECConfig *, CREATEPIPESOCKETCALLBACK, void *cbparam);
  203. virtual ~ECDispatcherEPoll();
  204. virtual ECRESULT MainLoop();
  205. //virtual ECRESULT ShutDown();
  206. virtual ECRESULT NotifyRestart(SOAP_SOCKET s);
  207. };
  208. #endif
  209. #endif