ECThreadManager.cpp

/*
 * Copyright 2005 - 2016 Zarafa and its licensors
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */
#include <kopano/platform.h>
#include <chrono>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include "ECThreadManager.h"
#include <cmath>
#include <cerrno>
#include <csignal>
#include <cstdlib>
#include <cstring>
#include <algorithm>
#include <poll.h>
#include <unistd.h>
#include <kopano/lockhelper.hpp>
#include <kopano/stringutil.h>
#ifdef HAVE_EPOLL_CREATE
#include <sys/epoll.h>
#endif
#include <kopano/CommonUtil.h>
#include "ECSessionManager.h"
#include "ECStatsCollector.h"
#include "ECServerEntrypoint.h"
#include "ECSoapServerConnection.h"

// errors from stdsoap2.h, differs per gSOAP release
#define RETURN_CASE(x) \
    case x: \
        return #x;

static std::string GetSoapError(int err)
{
    switch (err) {
    RETURN_CASE(SOAP_EOF)
    RETURN_CASE(SOAP_CLI_FAULT)
    RETURN_CASE(SOAP_SVR_FAULT)
    RETURN_CASE(SOAP_TAG_MISMATCH)
    RETURN_CASE(SOAP_TYPE)
    RETURN_CASE(SOAP_SYNTAX_ERROR)
    RETURN_CASE(SOAP_NO_TAG)
    RETURN_CASE(SOAP_IOB)
    RETURN_CASE(SOAP_MUSTUNDERSTAND)
    RETURN_CASE(SOAP_NAMESPACE)
    RETURN_CASE(SOAP_USER_ERROR)
    RETURN_CASE(SOAP_FATAL_ERROR)
    RETURN_CASE(SOAP_FAULT)
    RETURN_CASE(SOAP_NO_METHOD)
    RETURN_CASE(SOAP_NO_DATA)
    RETURN_CASE(SOAP_GET_METHOD)
    RETURN_CASE(SOAP_PUT_METHOD)
    RETURN_CASE(SOAP_DEL_METHOD)
    RETURN_CASE(SOAP_HEAD_METHOD)
    RETURN_CASE(SOAP_HTTP_METHOD)
    RETURN_CASE(SOAP_EOM)
    RETURN_CASE(SOAP_MOE)
    RETURN_CASE(SOAP_HDR)
    RETURN_CASE(SOAP_NULL)
    RETURN_CASE(SOAP_DUPLICATE_ID)
    RETURN_CASE(SOAP_MISSING_ID)
    RETURN_CASE(SOAP_HREF)
    RETURN_CASE(SOAP_UDP_ERROR)
    RETURN_CASE(SOAP_TCP_ERROR)
    RETURN_CASE(SOAP_HTTP_ERROR)
    RETURN_CASE(SOAP_SSL_ERROR)
    RETURN_CASE(SOAP_ZLIB_ERROR)
    RETURN_CASE(SOAP_DIME_ERROR)
    RETURN_CASE(SOAP_DIME_HREF)
    RETURN_CASE(SOAP_DIME_MISMATCH)
    RETURN_CASE(SOAP_DIME_END)
    RETURN_CASE(SOAP_MIME_ERROR)
    RETURN_CASE(SOAP_MIME_HREF)
    RETURN_CASE(SOAP_MIME_END)
    RETURN_CASE(SOAP_VERSIONMISMATCH)
    RETURN_CASE(SOAP_PLUGIN_ERROR)
    RETURN_CASE(SOAP_DATAENCODINGUNKNOWN)
    RETURN_CASE(SOAP_REQUIRED)
    RETURN_CASE(SOAP_PROHIBITED)
    RETURN_CASE(SOAP_OCCURS)
    RETURN_CASE(SOAP_LENGTH)
    RETURN_CASE(SOAP_FD_EXCEEDED)
    }
    // Unknown code: fall back to its decimal representation
    return stringify(err);
}

static void kcsrv_blocksigs(void)
{
    sigset_t m;
    sigemptyset(&m);
    sigaddset(&m, SIGINT);
    sigaddset(&m, SIGHUP);
    sigaddset(&m, SIGTERM);
    // Apply the mask to the calling thread; merely building the set has no effect.
    pthread_sigmask(SIG_BLOCK, &m, nullptr);
}
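
/*
 * Worker and watchdog threads call kcsrv_blocksigs() so that SIGINT, SIGHUP
 * and SIGTERM are not delivered to them; the intent is that these signals are
 * handled by the main thread only (shutdown and configuration reload).
 */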

ECWorkerThread::ECWorkerThread(ECThreadManager *lpManager,
    ECDispatcher *lpDispatcher, bool bDoNotStart)
{
    m_lpManager = lpManager;
    m_lpDispatcher = lpDispatcher;
    if (bDoNotStart) {
        memset(&m_thread, 0, sizeof(m_thread));
        return;
    }
    // pthread_create() returns the error code instead of setting errno
    int err = pthread_create(&m_thread, NULL, ECWorkerThread::Work, this);
    if (err != 0) {
        ec_log_crit("Unable to start thread: %s", strerror(err));
        return;
    }
    set_thread_name(m_thread, "ECWorkerThread");
    pthread_detach(m_thread);
}

ECPriorityWorkerThread::ECPriorityWorkerThread(ECThreadManager *lpManager,
    ECDispatcher *lpDispatcher) :
    ECWorkerThread(lpManager, lpDispatcher, true)
{
    int err = pthread_create(&m_thread, NULL, ECWorkerThread::Work, this);
    if (err != 0)
        ec_log_crit("Unable to start thread: %s", strerror(err));
    else
        set_thread_name(m_thread, "ECPriorityWorkerThread");
    // do not detach
}

ECPriorityWorkerThread::~ECPriorityWorkerThread()
{
    pthread_join(m_thread, NULL);
}
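
/*
 * Lifecycle note: regular worker threads are detached and delete themselves
 * when told to stop (see the end of Work()), whereas the single priority
 * worker is joined here, so its object must not self-delete.
 */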

void *ECWorkerThread::Work(void *lpParam)
{
    kcsrv_blocksigs();
    auto lpThis = static_cast<ECWorkerThread *>(lpParam);
    auto lpPrio = dynamic_cast<ECPriorityWorkerThread *>(lpThis);
    WORKITEM *lpWorkItem = NULL;
    ECRESULT er = erSuccess;
    bool fStop = false;
    int err = 0;

    ec_log_debug("Started%sthread %08x", lpPrio ? " priority " : " ", (ULONG)pthread_self());
    while (1) {
        set_thread_name(pthread_self(), "z-s: idle thread");
        // Get the next work item, don't wait for new items
        if (lpThis->m_lpDispatcher->GetNextWorkItem(&lpWorkItem, false, lpPrio != NULL) != erSuccess) {
            // Nothing in the queue, notify that we're idle now
            lpThis->m_lpManager->NotifyIdle(lpThis, &fStop);
            // We were requested to exit due to idle state
            if (fStop) {
                ec_log_debug("Thread %08x idle and requested to exit", (ULONG)pthread_self());
                break;
            }
            // Wait for the next work item in the queue
            er = lpThis->m_lpDispatcher->GetNextWorkItem(&lpWorkItem, true, lpPrio != NULL);
            if (er != erSuccess)
                // This can happen when we were woken up because we are exiting
                continue;
        }
        set_thread_name(pthread_self(), format("z-s: %s", lpWorkItem->soap->host).c_str());
        // For SSL connections, we first must do the handshake and pass it back to the queue
        if (lpWorkItem->soap->ctx && !lpWorkItem->soap->ssl) {
            err = soap_ssl_accept(lpWorkItem->soap);
            if (err) {
                ec_log_warn("%s", soap_faultdetail(lpWorkItem->soap)[0]);
                ec_log_debug("%s: %s", GetSoapError(err).c_str(), soap_faultstring(lpWorkItem->soap)[0]);
            }
        } else {
            err = 0;
            // Record the start of handling of this request
            double dblStart = GetTimeOfDay(), dblEnd = 0;
            // Reset the last session ID so we can use it reliably after the call is done
            auto info = soap_info(lpWorkItem->soap);
            info->ulLastSessionId = 0;
            // Pass information on the start time of the request into soap->user, so that
            // it can be applied to the correct session after XML parsing
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &info->threadstart);
            info->start = GetTimeOfDay();
            info->szFname = nullptr;
            info->fdone = NULL;
            // Do the processing of the work item
            soap_begin(lpWorkItem->soap);
            if (soap_begin_recv(lpWorkItem->soap)) {
                if (lpWorkItem->soap->error < SOAP_STOP) {
                    // The Client Updater returns 404 to the client to say it doesn't need to update, so skip this HTTP error
                    if (lpWorkItem->soap->error != SOAP_EOF && lpWorkItem->soap->error != 404)
                        ec_log_debug("gSOAP error on receiving request: %s", GetSoapError(lpWorkItem->soap->error).c_str());
                    soap_send_fault(lpWorkItem->soap);
                    goto done;
                }
                soap_closesock(lpWorkItem->soap);
                goto done;
            }
            // WARNING
            //
            // From the moment we call soap_serve_request(), the soap object MAY be handled
            // by another thread. In that case, soap_serve_request() returns SOAP_NULL. We
            // can NOT rely on soap->error being that value, since the other thread may
            // already have overwritten the error value.
            if (soap_envelope_begin_in(lpWorkItem->soap) ||
                soap_recv_header(lpWorkItem->soap) ||
                soap_body_begin_in(lpWorkItem->soap)) {
                err = lpWorkItem->soap->error;
            } else {
                try {
                    err = soap_serve_request(lpWorkItem->soap);
                } catch (int) {
                    // Reply processing is handled by the callee; totally ignore the rest of the processing for this item
                    delete lpWorkItem;
                    continue;
                }
            }
            if (err) {
                ec_log_debug("gSOAP error on processing request: %s", GetSoapError(err).c_str());
                soap_send_fault(lpWorkItem->soap);
                goto done;
            }
 done:
            if (info->fdone != nullptr)
                info->fdone(lpWorkItem->soap, info->fdoneparam);
            dblEnd = GetTimeOfDay();
            // Tell the session we're done processing the request for this session. This also
            // tells the session that this thread is done processing the item, so any time spent
            // in this thread until now can be accounted to that session.
            g_lpSessionManager->RemoveBusyState(info->ulLastSessionId, pthread_self());
            // Track CPU usage server-wide
            g_lpStatsCollector->Increment(SCN_SOAP_REQUESTS);
            g_lpStatsCollector->Increment(SCN_PROCESSING_TIME, int64_t((dblEnd - dblStart) * 1000));
            g_lpStatsCollector->Increment(SCN_RESPONSE_TIME, int64_t((dblEnd - lpWorkItem->dblReceiveStamp) * 1000));
        }
        // Clear the memory used by the soap calls. Note that this does not actually undo
        // our soap_new2() call, so the soap object is still valid after these calls
        soap_destroy(lpWorkItem->soap);
        soap_end(lpWorkItem->soap);
        // We're done processing the item; the work item's socket is returned to the queue
        lpThis->m_lpDispatcher->NotifyDone(lpWorkItem->soap);
        delete lpWorkItem;
    }
    /* free ssl error data */
    ERR_remove_state(0);
    // We're detached, so we should clean up ourselves
    if (lpPrio == NULL)
        delete lpThis;
    return NULL;
}

ECThreadManager::ECThreadManager(ECDispatcher *lpDispatcher,
    unsigned int ulThreads) :
    m_lpDispatcher(lpDispatcher), m_ulThreads(ulThreads)
{
    scoped_lock l_thr(m_mutexThreads);
    // Start our worker threads
    m_lpPrioWorker = new ECPriorityWorkerThread(this, lpDispatcher);
    for (unsigned int i = 0; i < ulThreads; ++i)
        m_lstThreads.push_back(new ECWorkerThread(this, lpDispatcher));
}

ECThreadManager::~ECThreadManager()
{
    unsigned int ulThreads;
    // Wait for the threads to exit
    while (1) {
        ulock_normal l_thr(m_mutexThreads);
        ulThreads = m_lstThreads.size();
        l_thr.unlock();
        if (ulThreads == 0)
            break;
        ec_log_notice("Still waiting for %u worker threads to exit", ulThreads);
        Sleep(1000);
    }
    delete m_lpPrioWorker;
}

ECRESULT ECThreadManager::ForceAddThread(int nThreads)
{
    scoped_lock l_thr(m_mutexThreads);
    for (int i = 0; i < nThreads; ++i)
        m_lstThreads.push_back(new ECWorkerThread(this, m_lpDispatcher));
    return erSuccess;
}

ECRESULT ECThreadManager::GetThreadCount(unsigned int *lpulThreads)
{
    scoped_lock l_thr(m_mutexThreads);
    *lpulThreads = m_lstThreads.size();
    return erSuccess;
}

ECRESULT ECThreadManager::SetThreadCount(unsigned int ulThreads)
{
    // If we're under the number of threads at the moment, start new ones
    scoped_lock l_thr(m_mutexThreads);
    // Set the default thread count
    m_ulThreads = ulThreads;
    while (ulThreads > m_lstThreads.size())
        m_lstThreads.push_back(new ECWorkerThread(this, m_lpDispatcher));
    // If we are OVER the number of threads, the code in NotifyIdle() will bring this down
    return erSuccess;
}

// Called by a worker thread only when it is idle. This is the only place
// where the worker thread may be deleted.
ECRESULT ECThreadManager::NotifyIdle(ECWorkerThread *lpThread, bool *lpfStop)
{
    *lpfStop = false;
    scoped_lock l_thr(m_mutexThreads);
    // Special case for the priority worker
    if (lpThread == m_lpPrioWorker) {
        // Exit requested?
        *lpfStop = (m_ulThreads == 0);
        return erSuccess;
    }
    if (m_ulThreads >= m_lstThreads.size())
        return erSuccess;
    // We are currently running more threads than we want, so tell this thread to stop
    auto iterThreads = std::find(m_lstThreads.begin(), m_lstThreads.end(), lpThread);
    if (iterThreads == m_lstThreads.end()) {
        ec_log_crit("An unknown thread reported itself as idle ...");
        return KCERR_NOT_FOUND;
    }
    // Remove the thread from our running thread list
    m_lstThreads.erase(iterThreads);
    // Tell the thread to exit. The thread cleans up after itself, so we need neither
    // delete the object nor join with the running thread.
    *lpfStop = true;
    return erSuccess;
}
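
/*
 * Shrinking the pool is therefore lazy: SetThreadCount() only lowers
 * m_ulThreads, and each surplus thread exits the next time it goes idle and
 * calls NotifyIdle().
 */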

ECWatchDog::ECWatchDog(ECConfig *lpConfig, ECDispatcher *lpDispatcher,
    ECThreadManager *lpThreadManager) :
    m_lpConfig(lpConfig), m_lpDispatcher(lpDispatcher),
    m_lpThreadManager(lpThreadManager)
{
    int err = pthread_create(&m_thread, NULL, ECWatchDog::Watch, this);
    if (err != 0)
        ec_log_crit("Unable to start watchdog thread: %s", strerror(err));
    else
        set_thread_name(m_thread, "ECWatchDog");
}

ECWatchDog::~ECWatchDog()
{
    void *ret;
    ulock_normal l_exit(m_mutexExit);
    m_bExit = true;
    m_condExit.notify_one();
    l_exit.unlock();
    pthread_join(m_thread, &ret);
}

void *ECWatchDog::Watch(void *lpParam)
{
    auto lpThis = static_cast<ECWatchDog *>(lpParam);
    double dblAge;
    kcsrv_blocksigs();
    while (1) {
        if (lpThis->m_bExit == true)
            break;
        double dblMaxFreq = atoi(lpThis->m_lpConfig->GetSetting("watchdog_frequency"));
        double dblMaxAge = atoi(lpThis->m_lpConfig->GetSetting("watchdog_max_age")) / 1000.0;
        if (dblMaxFreq <= 0)
            dblMaxFreq = 1; // guard against a zero/negative watchdog_frequency setting
        // If the age of the front item in the queue exceeds the specified maximum age,
        // force a new thread to be started
        if (lpThis->m_lpDispatcher->GetFrontItemAge(&dblAge) == erSuccess && dblAge > dblMaxAge)
            lpThis->m_lpThreadManager->ForceAddThread(1);
        // Check whether the exit flag is set, and limit the rate to dblMaxFreq Hz
        ulock_normal l_exit(lpThis->m_mutexExit);
        if (lpThis->m_bExit == false) {
            lpThis->m_condExit.wait_for(l_exit, std::chrono::duration<double>(1 / dblMaxFreq));
            if (lpThis->m_bExit == true)
                break;
        }
    }
    return NULL;
}

ECDispatcher::ECDispatcher(ECConfig *lpConfig,
    CREATEPIPESOCKETCALLBACK lpCallback, void *lpParam)
{
    m_lpConfig = lpConfig;
    // Default socket settings
    m_nMaxKeepAlive = atoi(m_lpConfig->GetSetting("server_max_keep_alive_requests"));
    m_nRecvTimeout = atoi(m_lpConfig->GetSetting("server_recv_timeout"));
    m_nReadTimeout = atoi(m_lpConfig->GetSetting("server_read_timeout"));
    m_nSendTimeout = atoi(m_lpConfig->GetSetting("server_send_timeout"));
    m_lpCreatePipeSocketCallback = lpCallback;
    m_lpCreatePipeSocketParam = lpParam;
}

ECRESULT ECDispatcher::GetThreadCount(unsigned int *lpulThreads, unsigned int *lpulIdleThreads)
{
    ECRESULT er = m_lpThreadManager->GetThreadCount(lpulThreads);
    if (er != erSuccess)
        return er;
    *lpulIdleThreads = m_ulIdle;
    return erSuccess;
}

// Get the age (in seconds) of the next-in-line item in the queue, or 0 if the queue is empty
ECRESULT ECDispatcher::GetFrontItemAge(double *lpdblAge)
{
    double dblNow = GetTimeOfDay();
    double dblAge = 0;
    scoped_lock lock(m_mutexItems);
    if (m_queueItems.empty() && m_queuePrioItems.empty())
        dblAge = 0;
    else if (m_queueItems.empty()) // the normal item queue is more important when checking queue age
        dblAge = dblNow - m_queuePrioItems.front()->dblReceiveStamp;
    else
        dblAge = dblNow - m_queueItems.front()->dblReceiveStamp;
    *lpdblAge = dblAge;
    return erSuccess;
}

ECRESULT ECDispatcher::GetQueueLength(unsigned int *lpulLength)
{
    scoped_lock lock(m_mutexItems);
    *lpulLength = m_queueItems.size() + m_queuePrioItems.size();
    return erSuccess;
}

ECRESULT ECDispatcher::AddListenSocket(struct soap *soap)
{
    soap->max_keep_alive = m_nMaxKeepAlive;
    soap->recv_timeout = m_nReadTimeout; // Use m_nReadTimeout, the value for timeouts during XML reads
    soap->send_timeout = m_nSendTimeout;
    m_setListenSockets.insert(std::make_pair(soap->socket, soap));
    return erSuccess;
}

ECRESULT ECDispatcher::QueueItem(struct soap *soap)
{
    auto item = new WORKITEM;
    item->soap = soap;
    item->dblReceiveStamp = GetTimeOfDay();
    CONNECTION_TYPE ulType = SOAP_CONNECTION_TYPE(soap);
    scoped_lock lock(m_mutexItems);
    if (ulType == CONNECTION_TYPE_NAMED_PIPE_PRIORITY) {
        m_queuePrioItems.push(item);
        m_condPrioItems.notify_one();
    } else {
        m_queueItems.push(item);
        m_condItems.notify_one();
    }
    return erSuccess;
}
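
/*
 * Requests arriving on the priority pipe get their own queue and a dedicated
 * worker (see ECPriorityWorkerThread); the intent, presumably, is that
 * administrative requests are still served when all regular workers are busy.
 */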

/**
 * Called by worker threads to get an item to work on
 *
 * @param[out] lppItem soap call to process
 * @param[in] bWait wait for an item until one is present, or return immediately
 * @param[in] bPrio handle the priority or the normal queue
 *
 * @return error code
 * @retval KCERR_NOT_FOUND no soap call present in the queue
 */
ECRESULT ECDispatcher::GetNextWorkItem(WORKITEM **lppItem, bool bWait, bool bPrio)
{
    WORKITEM *lpItem = NULL;
    ECRESULT er = erSuccess;
    std::queue<WORKITEM *> *queue = bPrio ? &m_queuePrioItems : &m_queueItems;
    auto &condItems = bPrio ? m_condPrioItems : m_condItems;
    ulock_normal l_item(m_mutexItems);
    // Check the queue
    if (!queue->empty()) {
        // An item is waiting, return that
        lpItem = queue->front();
        queue->pop();
    } else if (!bWait || m_bExit) {
        // No wait requested, return not found
        return KCERR_NOT_FOUND;
    } else {
        // No item waiting
        ulock_normal l_idle(m_mutexIdle);
        ++m_ulIdle;
        l_idle.unlock();
        /* If requested, wait until an item is available */
        condItems.wait(l_item);
        l_idle.lock();
        --m_ulIdle;
        l_idle.unlock();
        if (queue->empty() || m_bExit)
            // The condition fired, but there is still nothing there: an exit request,
            // a spurious wakeup, or a signal meant for the other queue
            return KCERR_NOT_FOUND;
        lpItem = queue->front();
        queue->pop();
    }
    *lppItem = lpItem;
    return er;
}
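
/*
 * Note on wakeups: the condition variables may fire without a matching item
 * (shutdown broadcast, or a notify for the other queue), which is why callers
 * such as Work() treat KCERR_NOT_FOUND after a blocking call as a cue to loop
 * and re-check their state rather than as an error.
 */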

// Called by a worker thread when it's done with an item
ECRESULT ECDispatcher::NotifyDone(struct soap *soap)
{
    // During exit, don't requeue active sockets, but close them
    if (m_bExit) {
        kopano_end_soap_connection(soap);
        soap_free(soap);
    } else {
        --soap->max_keep_alive;
        if (soap->max_keep_alive == 0)
            soap->keep_alive = 0;
        if (soap->socket != SOAP_INVALID_SOCKET) {
            SOAP_SOCKET socket = soap->socket;
            ACTIVESOCKET sActive;
            sActive.soap = soap;
            time(&sActive.ulLastActivity);
            ulock_normal l_sock(m_mutexSockets);
            m_setSockets.insert(std::make_pair(soap->socket, sActive));
            l_sock.unlock();
            // Notify the event loop to restart, passing the socket number that is done
            NotifyRestart(socket);
        } else {
            // SOAP has closed the socket, no need to requeue
            kopano_end_soap_connection(soap);
            soap_free(soap);
        }
    }
    return erSuccess;
}

// Set the nominal thread count
ECRESULT ECDispatcher::SetThreadCount(unsigned int ulThreads)
{
    // If we receive a signal before MainLoop() has started, we don't have a thread manager yet
    if (m_lpThreadManager == NULL)
        return erSuccess;
    ECRESULT er = m_lpThreadManager->SetThreadCount(ulThreads);
    if (er != erSuccess)
        return er;
    // Since the threads may be blocking while waiting for the next queue item, broadcast
    // a wakeup to all threads so that they re-check their idle state (and exit if the
    // thread count is now lower)
    scoped_lock l_item(m_mutexItems);
    m_condItems.notify_all();
    return erSuccess;
}

ECRESULT ECDispatcher::DoHUP()
{
    m_nMaxKeepAlive = atoi(m_lpConfig->GetSetting("server_max_keep_alive_requests"));
    m_nRecvTimeout = atoi(m_lpConfig->GetSetting("server_recv_timeout"));
    m_nReadTimeout = atoi(m_lpConfig->GetSetting("server_read_timeout"));
    m_nSendTimeout = atoi(m_lpConfig->GetSetting("server_send_timeout"));
    ECRESULT er = SetThreadCount(atoi(m_lpConfig->GetSetting("threads")));
    if (er != erSuccess)
        return er;
    for (auto const &p : m_setListenSockets) {
        auto ulType = SOAP_CONNECTION_TYPE(p.second);
        if (ulType != CONNECTION_TYPE_SSL)
            continue;
        if (soap_ssl_server_context(p.second, SOAP_SSL_DEFAULT,
            m_lpConfig->GetSetting("server_ssl_key_file"),
            m_lpConfig->GetSetting("server_ssl_key_pass", "", NULL),
            m_lpConfig->GetSetting("server_ssl_ca_file", "", NULL),
            m_lpConfig->GetSetting("server_ssl_ca_path", "", NULL),
            NULL, NULL, "EC")) {
            ec_log_crit("K-3904: Unable to setup ssl context: %s", *soap_faultdetail(p.second));
            return KCERR_CALL_FAILED;
        }
        char *server_ssl_protocols = strdup(m_lpConfig->GetSetting("server_ssl_protocols"));
        er = kc_ssl_options(p.second, server_ssl_protocols,
            m_lpConfig->GetSetting("server_ssl_ciphers"),
            m_lpConfig->GetSetting("server_ssl_prefer_server_ciphers"));
        free(server_ssl_protocols);
        if (er != erSuccess)
            return er;
    }
    return erSuccess;
}
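
/*
 * DoHUP() is the configuration-reload path (run on SIGHUP): socket timeouts,
 * the worker thread count and the SSL context of the SSL listeners are
 * refreshed from the configuration without restarting the server.
 */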

ECRESULT ECDispatcher::ShutDown()
{
    m_bExit = true;
    return erSuccess;
}

ECDispatcherSelect::ECDispatcherSelect(ECConfig *lpConfig,
    CREATEPIPESOCKETCALLBACK lpCallback, void *lpCallbackParam) :
    ECDispatcher(lpConfig, lpCallback, lpCallbackParam)
{
    // Create a pipe that we can use to trigger the poll loop to return
    int pipes[2];
    if (pipe(pipes) < 0)
        ec_log_crit("Unable to create the rescan pipe: %s", strerror(errno));
    m_fdRescanRead = pipes[0];
    m_fdRescanWrite = pipes[1];
}
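
/*
 * This is the classic self-pipe trick: writing a byte to m_fdRescanWrite makes
 * the read end readable, which wakes up the poll() in MainLoop() so it can
 * rebuild its fd list or notice m_bExit.
 */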

ECRESULT ECDispatcherSelect::MainLoop()
{
    ECRESULT er = erSuccess;
    ECWatchDog *lpWatchDog = NULL;
    int maxfds = getdtablesize();
    if (maxfds < 0)
        throw std::runtime_error("getdtablesize failed");
    char s = 0;
    time_t now;
    CONNECTION_TYPE ulType;
    std::unique_ptr<struct pollfd[]> pollfd(new struct pollfd[maxfds]);
    for (int n = 0; n < maxfds; ++n) {
        /*
         * Use an identity mapping, quite like fd_set, but without the
         * limits of FD_SETSIZE.
         */
        pollfd[n].fd = n;
        pollfd[n].events = 0;
    }
    // This will start the threads
    m_lpThreadManager = new ECThreadManager(this, atoui(m_lpConfig->GetSetting("threads")));
    // Start the watchdog
    lpWatchDog = new ECWatchDog(m_lpConfig, this, m_lpThreadManager);
    // Main loop
    while (!m_bExit) {
        int nfds = 0, pfd_begin_sock, pfd_begin_listen;
        time(&now);
        // Listen on the rescan trigger
        pollfd[0].fd = m_fdRescanRead;
        pollfd[0].events = POLLIN | POLLRDHUP;
        ++nfds;
        // Listen on the active sockets
        ulock_normal l_sock(m_mutexSockets);
        pfd_begin_sock = nfds;
        for (const auto &p : m_setSockets) {
            ulType = SOAP_CONNECTION_TYPE(p.second.soap);
            if (ulType != CONNECTION_TYPE_NAMED_PIPE &&
                ulType != CONNECTION_TYPE_NAMED_PIPE_PRIORITY &&
                now - static_cast<time_t>(p.second.ulLastActivity) > m_nRecvTimeout)
                // The socket has been inactive for more than server_recv_timeout seconds; close it
                shutdown(p.second.soap->socket, SHUT_RDWR);
            pollfd[nfds].fd = p.second.soap->socket;
            pollfd[nfds++].events = POLLIN | POLLRDHUP;
        }
        // Listen on the listener sockets
        pfd_begin_listen = nfds;
        for (const auto &p : m_setListenSockets) {
            pollfd[nfds].fd = p.second->socket;
            pollfd[nfds++].events = POLLIN | POLLRDHUP;
        }
        l_sock.unlock();
        // Wait for activity, for at most 1 second, so that we can close inactive sockets
        auto n = poll(pollfd.get(), nfds, 1 * 1000);
        if (n < 0)
            continue; // signal caught, restart
        if (pollfd[0].revents & (POLLIN | POLLRDHUP)) {
            char buf[128];
            // A socket rescan has been triggered; we don't need to do anything, just read
            // the data, discard it and restart the poll call
            read(m_fdRescanRead, buf, sizeof(buf));
        }
        // Search for activity on the active sockets
        l_sock.lock();
        auto iterSockets = m_setSockets.cbegin();
        for (int i = pfd_begin_sock; i < pfd_begin_listen; ++i) {
            if (!(pollfd[i].revents & (POLLIN | POLLRDHUP)))
                continue;
            /*
             * Forward to the data structure belonging to the pollfd.
             * (The order of pollfd and m_setSockets is the same,
             * so the element has to be there.)
             */
            while (iterSockets != m_setSockets.cend() && iterSockets->second.soap->socket != pollfd[i].fd)
                ++iterSockets;
            if (iterSockets == m_setSockets.cend()) {
                ec_log_err("K-1577: socket lost");
                /* something is very off - try again at the next iteration */
                break;
            }
            // Activity on a TCP/pipe socket
            // First, check for EOF
            if (recv(pollfd[i].fd, &s, 1, MSG_PEEK) == 0) {
                // EOF occurred; just close the socket and remove it from the socket list
                kopano_end_soap_connection(iterSockets->second.soap);
                soap_free(iterSockets->second.soap);
                m_setSockets.erase(iterSockets++);
                continue;
            }
            // Actual data is waiting, push it onto the processing queue
            QueueItem(iterSockets->second.soap);
            // Remove the socket from the watched list for now, since we're already handling
            // data there and don't want to interfere with the thread that is now handling
            // that socket. It will be passed back to us when the request is done.
            m_setSockets.erase(iterSockets++);
        }
        l_sock.unlock();
        // Search for activity on the listen sockets
        auto sockiter = m_setListenSockets.cbegin();
        for (int i = pfd_begin_listen; i < nfds; ++i) {
            if (!(pollfd[i].revents & (POLLIN | POLLRDHUP)))
                continue;
            while (sockiter != m_setListenSockets.cend() && sockiter->second->socket != pollfd[i].fd)
                ++sockiter;
            if (sockiter == m_setListenSockets.cend()) {
                ec_log_err("K-1578: socket lost");
                break;
            }
            const auto &p = *sockiter;
            ACTIVESOCKET sActive;
            auto newsoap = soap_copy(p.second);
            if (newsoap == NULL) {
                ec_log_crit("Unable to accept new connection: out of memory");
                continue;
            }
            kopano_new_soap_connection(SOAP_CONNECTION_TYPE(p.second), newsoap);
            // Record the last activity (now)
            time(&sActive.ulLastActivity);
            ulType = SOAP_CONNECTION_TYPE(p.second);
            if (ulType == CONNECTION_TYPE_NAMED_PIPE ||
                ulType == CONNECTION_TYPE_NAMED_PIPE_PRIORITY)
                newsoap->socket = accept(newsoap->master, NULL, 0);
            else
                soap_accept(newsoap);
            if (newsoap->socket == SOAP_INVALID_SOCKET) {
                if (ulType == CONNECTION_TYPE_NAMED_PIPE)
                    ec_log_debug("Error accepting incoming connection from file://%s", m_lpConfig->GetSetting("server_pipe_name"));
                else if (ulType == CONNECTION_TYPE_NAMED_PIPE_PRIORITY)
                    ec_log_debug("Error accepting incoming connection from file://%s", m_lpConfig->GetSetting("server_pipe_priority"));
                else
                    ec_log_debug("Error accepting incoming connection from network.");
                kopano_end_soap_connection(newsoap);
                soap_free(newsoap);
                continue;
            }
            if (ulType == CONNECTION_TYPE_NAMED_PIPE)
                ec_log_debug("Accepted incoming connection from file://%s", m_lpConfig->GetSetting("server_pipe_name"));
            else if (ulType == CONNECTION_TYPE_NAMED_PIPE_PRIORITY)
                ec_log_debug("Accepted incoming connection from file://%s", m_lpConfig->GetSetting("server_pipe_priority"));
            else
                ec_log_debug("Accepted incoming%sconnection from %s",
                    ulType == CONNECTION_TYPE_SSL ? " SSL " : " ",
                    newsoap->host);
            newsoap->socket = ec_relocate_fd(newsoap->socket);
            g_lpStatsCollector->Max(SCN_MAX_SOCKET_NUMBER, (LONGLONG)newsoap->socket);
            g_lpStatsCollector->Increment(SCN_SERVER_CONNECTIONS);
            sActive.soap = newsoap;
            l_sock.lock();
            m_setSockets.insert(std::make_pair(sActive.soap->socket, sActive));
            l_sock.unlock();
        }
    }
    // Delete the watchdog. This makes sure no new threads will be started.
    delete lpWatchDog;
    // Set the thread count to zero so that the threads will exit
    m_lpThreadManager->SetThreadCount(0);
    // Notify the threads that they should re-query their idle state (and exit)
    ulock_normal l_item(m_mutexItems);
    m_condItems.notify_all();
    m_condPrioItems.notify_all();
    l_item.unlock();
    // Delete the thread manager (this waits for the threads to become idle). During this
    // time the threads may report back a work item as being done; if so, we directly
    // close that socket too.
    delete m_lpThreadManager;
    // Empty the queues
    l_item.lock();
    while (!m_queueItems.empty()) {
        auto item = m_queueItems.front();
        kopano_end_soap_connection(item->soap);
        soap_free(item->soap);
        delete item; // also release the WORKITEM itself
        m_queueItems.pop();
    }
    while (!m_queuePrioItems.empty()) {
        auto item = m_queuePrioItems.front();
        kopano_end_soap_connection(item->soap);
        soap_free(item->soap);
        delete item;
        m_queuePrioItems.pop();
    }
    l_item.unlock();
    // Close all listener sockets.
    for (const auto &p : m_setListenSockets) {
        kopano_end_soap_listener(p.second);
        soap_free(p.second);
    }
    // Close all sockets. This causes all clients that we were listening on to receive an EOF.
    ulock_normal l_sock(m_mutexSockets);
    for (const auto &p : m_setSockets) {
        kopano_end_soap_connection(p.second.soap);
        soap_free(p.second.soap);
    }
    l_sock.unlock();
    return er;
}

ECRESULT ECDispatcherSelect::ShutDown()
{
    ECDispatcher::ShutDown();
    char s = 0;
    // Notify the poll loop to wake up
    write(m_fdRescanWrite, &s, 1);
    return erSuccess;
}

ECRESULT ECDispatcherSelect::NotifyRestart(SOAP_SOCKET s)
{
    write(m_fdRescanWrite, &s, sizeof(SOAP_SOCKET));
    return erSuccess;
}
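
/*
 * The payload written here is only a wakeup token: MainLoop() reads and
 * discards whatever arrives on the rescan pipe and then rebuilds its pollfd
 * list, which is how the finished socket re-enters the poll set.
 */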

#ifdef HAVE_EPOLL_CREATE
ECDispatcherEPoll::ECDispatcherEPoll(ECConfig *lpConfig,
    CREATEPIPESOCKETCALLBACK lpCallback, void *lpCallbackParam) :
    ECDispatcher(lpConfig, lpCallback, lpCallbackParam)
{
    m_fdMax = getdtablesize();
    m_epFD = epoll_create(m_fdMax);
}

ECDispatcherEPoll::~ECDispatcherEPoll()
{
    close(m_epFD);
}
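
/*
 * Note: since Linux 2.6.8 the size argument of epoll_create() is ignored (it
 * merely has to be positive); it is kept here as a hint and reused as the
 * allocation size of the event array in MainLoop().
 */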

ECRESULT ECDispatcherEPoll::MainLoop()
{
    ECRESULT er = erSuccess;
    ECWatchDog *lpWatchDog = NULL;
    time_t now = 0;
    time_t last = 0;
    std::map<int, ACTIVESOCKET>::iterator iterSockets;
    std::map<int, struct soap *>::const_iterator iterListenSockets;
    CONNECTION_TYPE ulType;
    epoll_event epevent;
    auto epevents = new epoll_event[m_fdMax];
    int n;
    // Set up epoll for the listen sockets
    memset(&epevent, 0, sizeof(epoll_event));
    epevent.events = EPOLLIN | EPOLLPRI; // wait for input and priority (?) events
    for (iterListenSockets = m_setListenSockets.begin();
         iterListenSockets != m_setListenSockets.end();
         ++iterListenSockets) {
        epevent.data.fd = iterListenSockets->second->socket;
        epoll_ctl(m_epFD, EPOLL_CTL_ADD, iterListenSockets->second->socket, &epevent);
    }
    // This will start the threads
    m_lpThreadManager = new ECThreadManager(this, atoui(m_lpConfig->GetSetting("threads")));
    // Start the watchdog
    lpWatchDog = new ECWatchDog(m_lpConfig, this, m_lpThreadManager);
    while (!m_bExit) {
        time(&now);
        // Find timed-out sockets once per second
        ulock_normal l_sock(m_mutexSockets);
        if (now > last) {
            for (iterSockets = m_setSockets.begin(); iterSockets != m_setSockets.end(); ++iterSockets) {
                ulType = SOAP_CONNECTION_TYPE(iterSockets->second.soap);
                if (ulType != CONNECTION_TYPE_NAMED_PIPE &&
                    ulType != CONNECTION_TYPE_NAMED_PIPE_PRIORITY &&
                    now - static_cast<time_t>(iterSockets->second.ulLastActivity) > m_nRecvTimeout)
                    // The socket has been inactive for more than server_recv_timeout seconds; close it
                    shutdown(iterSockets->second.soap->socket, SHUT_RDWR);
            }
            last = now;
        }
        l_sock.unlock();
        // Wait at most 1 second, so the timeout scan above keeps running
        n = epoll_wait(m_epFD, epevents, m_fdMax, 1000);
        l_sock.lock();
        for (int i = 0; i < n; ++i) {
            iterListenSockets = m_setListenSockets.find(epevents[i].data.fd);
            if (iterListenSockets != m_setListenSockets.end()) {
                // This was a listen socket: accept and continue
                ACTIVESOCKET sActive;
                auto newsoap = soap_copy(iterListenSockets->second);
                if (newsoap == NULL) {
                    ec_log_crit("Unable to accept new connection: out of memory");
                    continue;
                }
                kopano_new_soap_connection(SOAP_CONNECTION_TYPE(iterListenSockets->second), newsoap);
                // Record the last activity (now)
                time(&sActive.ulLastActivity);
                ulType = SOAP_CONNECTION_TYPE(iterListenSockets->second);
                if (ulType == CONNECTION_TYPE_NAMED_PIPE || ulType == CONNECTION_TYPE_NAMED_PIPE_PRIORITY)
                    newsoap->socket = accept(newsoap->master, NULL, 0);
                else
                    soap_accept(newsoap);
                if (newsoap->socket == SOAP_INVALID_SOCKET) {
                    if (ulType == CONNECTION_TYPE_NAMED_PIPE)
                        ec_log_debug("Error accepting incoming connection from file://%s", m_lpConfig->GetSetting("server_pipe_name"));
                    else if (ulType == CONNECTION_TYPE_NAMED_PIPE_PRIORITY)
                        ec_log_debug("Error accepting incoming connection from file://%s", m_lpConfig->GetSetting("server_pipe_priority"));
                    else
                        ec_log_debug("Error accepting incoming connection from network.");
                    kopano_end_soap_connection(newsoap);
                    soap_free(newsoap);
                } else {
                    if (ulType == CONNECTION_TYPE_NAMED_PIPE)
                        ec_log_debug("Accepted incoming connection from file://%s", m_lpConfig->GetSetting("server_pipe_name"));
                    else if (ulType == CONNECTION_TYPE_NAMED_PIPE_PRIORITY)
                        ec_log_debug("Accepted incoming connection from file://%s", m_lpConfig->GetSetting("server_pipe_priority"));
                    else
                        ec_log_debug("Accepted incoming%sconnection from %s",
                            ulType == CONNECTION_TYPE_SSL ? " SSL " : " ",
                            newsoap->host);
                    newsoap->socket = ec_relocate_fd(newsoap->socket);
                    g_lpStatsCollector->Max(SCN_MAX_SOCKET_NUMBER, (LONGLONG)newsoap->socket);
                    g_lpStatsCollector->Increment(SCN_SERVER_CONNECTIONS);
                    // Directly make the socket active
                    sActive.soap = newsoap;
                    m_setSockets.insert(std::make_pair(sActive.soap->socket, sActive));
                    NotifyRestart(newsoap->socket);
                }
            } else {
                // This is a new request from an existing client
                iterSockets = m_setSockets.find(epevents[i].data.fd);
                if (iterSockets == m_setSockets.end())
                    continue; // guard: the fd is in neither map (assumed unreachable)
                // Remove the fd from the epoll set; either the socket gets closed below,
                // or it is re-added later via NotifyRestart()
                epevent.data.fd = iterSockets->second.soap->socket;
                epoll_ctl(m_epFD, EPOLL_CTL_DEL, iterSockets->second.soap->socket, &epevent);
                if (epevents[i].events & EPOLLHUP) {
                    kopano_end_soap_connection(iterSockets->second.soap);
                    soap_free(iterSockets->second.soap);
                    m_setSockets.erase(iterSockets);
                } else {
                    QueueItem(iterSockets->second.soap);
                    // Remove the socket from the watched list for now, since we're already
                    // handling data there and don't want to interfere with the thread that
                    // is now handling that socket. It will be passed back to us when the
                    // request is done.
                    m_setSockets.erase(iterSockets);
                }
            }
        }
        l_sock.unlock();
    }
    // Delete the watchdog. This makes sure no new threads will be started.
    delete lpWatchDog;
    // Set the thread count to zero so that the threads will exit
    m_lpThreadManager->SetThreadCount(0);
    // Notify the threads that they should re-query their idle state (and exit)
    ulock_normal l_item(m_mutexItems);
    m_condItems.notify_all();
    m_condPrioItems.notify_all();
    l_item.unlock();
    delete m_lpThreadManager;
    // Empty the queues
    l_item.lock();
    while (!m_queueItems.empty()) {
        auto item = m_queueItems.front();
        kopano_end_soap_connection(item->soap);
        soap_free(item->soap);
        delete item; // also release the WORKITEM itself
        m_queueItems.pop();
    }
    while (!m_queuePrioItems.empty()) {
        auto item = m_queuePrioItems.front();
        kopano_end_soap_connection(item->soap);
        soap_free(item->soap);
        delete item;
        m_queuePrioItems.pop();
    }
    l_item.unlock();
    // Close all listener sockets.
    for (iterListenSockets = m_setListenSockets.begin();
         iterListenSockets != m_setListenSockets.end();
         ++iterListenSockets) {
        kopano_end_soap_listener(iterListenSockets->second);
        soap_free(iterListenSockets->second);
    }
    // Close all sockets. This causes all clients that we were listening on to receive an EOF.
    ulock_normal l_sock(m_mutexSockets);
    for (iterSockets = m_setSockets.begin(); iterSockets != m_setSockets.end(); ++iterSockets) {
        kopano_end_soap_connection(iterSockets->second.soap);
        soap_free(iterSockets->second.soap);
    }
    l_sock.unlock();
    delete[] epevents;
    return er;
}

ECRESULT ECDispatcherEPoll::NotifyRestart(SOAP_SOCKET s)
{
    // (Re-)add the soap socket to the epoll fd
    epoll_event epevent;
    memset(&epevent, 0, sizeof(epoll_event));
    epevent.events = EPOLLIN | EPOLLPRI; // wait for input and priority (?) events
    epevent.data.fd = s;
    epoll_ctl(m_epFD, EPOLL_CTL_ADD, epevent.data.fd, &epevent);
    return erSuccess;
}
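
/*
 * MainLoop() removes a busy fd with EPOLL_CTL_DEL before queueing it, and
 * NotifyDone() funnels it back here once the request has been served, giving
 * one-shot semantics on top of level-triggered epoll.
 */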

#endif