ECNotificationManager.cpp 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #include <kopano/platform.h>
  18. #include <chrono>
  19. #include <kopano/lockhelper.hpp>
  20. #include <pthread.h>
  21. #include "ECNotificationManager.h"
  22. #include "ECSession.h"
  23. #include "ECSessionManager.h"
  24. #include "ECStringCompat.h"
  25. #include "soapH.h"
  26. namespace KC {
  27. // Copied from generated soapServer.cpp
  28. static int soapresponse(struct notifyResponse notifications, struct soap *soap)
  29. {
  30. soap_serializeheader(soap);
  31. soap_serialize_notifyResponse(soap, &notifications);
  32. if (soap_begin_count(soap))
  33. return soap->error;
  34. if (soap->mode & SOAP_IO_LENGTH)
  35. { if (soap_envelope_begin_out(soap)
  36. || soap_putheader(soap)
  37. || soap_body_begin_out(soap)
  38. || soap_put_notifyResponse(soap, &notifications, "ns:notifyResponse", NULL)
  39. || soap_body_end_out(soap)
  40. || soap_envelope_end_out(soap))
  41. return soap->error;
  42. };
  43. if (soap_end_count(soap)
  44. || soap_response(soap, SOAP_OK)
  45. || soap_envelope_begin_out(soap)
  46. || soap_putheader(soap)
  47. || soap_body_begin_out(soap)
  48. || soap_put_notifyResponse(soap, &notifications, "ns:notifyResponse", NULL)
  49. || soap_body_end_out(soap)
  50. || soap_envelope_end_out(soap)
  51. || soap_end_send(soap))
  52. return soap->error;
  53. return soap_closesock(soap);
  54. }
  /*
   * Hook called with a SOAP connection once this manager has answered the
   * notification request that was parked on it. Defaults to a no-op;
   * presumably the embedding server assigns its own handler (confirm at the
   * assignment site).
   */
  void (*kopano_notify_done)(struct soap *) = [](struct soap *) {
      /* default: nothing to do */
  };
  56. ECNotificationManager::ECNotificationManager(void)
  57. {
  58. pthread_create(&m_thread, NULL, Thread, this);
  59. set_thread_name(m_thread, "NotificationManager");
  60. }
  61. ECNotificationManager::~ECNotificationManager()
  62. {
  63. ulock_normal l_ses(m_mutexSessions);
  64. m_bExit = true;
  65. m_condSessions.notify_all();
  66. l_ses.unlock();
  67. ec_log_info("Shutdown notification manager");
  68. pthread_join(m_thread, NULL);
  69. // Close and free any pending requests (clients will receive EOF)
  70. for (const auto &p : m_mapRequests) {
  71. // we can't call kopano_notify_done here, race condition on shutdown in ECSessionManager vs ECDispatcher
  72. kopano_end_soap_connection(p.second.soap);
  73. soap_destroy(p.second.soap);
  74. soap_end(p.second.soap);
  75. soap_free(p.second.soap);
  76. }
  77. }
  78. // Called by the SOAP handler
  79. HRESULT ECNotificationManager::AddRequest(ECSESSIONID ecSessionId, struct soap *soap)
  80. {
  81. struct soap *lpItem = NULL;
  82. ulock_normal l_req(m_mutexRequests);
  83. auto iterRequest = m_mapRequests.find(ecSessionId);
  84. if (iterRequest != m_mapRequests.cend()) {
  85. // Hm. There is already a SOAP request waiting for this session id. Apparently a second SOAP connection has now
  86. // requested notifications. Since this should only happen if the client thinks it has lost its connection and has
  87. // restarted the request, we will replace the existing request with this one.
  88. ec_log_warn("Replacing notification request for ID %llu",
  89. static_cast<unsigned long long>(ecSessionId));
  90. // Return the previous request as an error
  91. struct notifyResponse notifications;
  92. soap_default_notifyResponse(iterRequest->second.soap, &notifications);
  93. notifications.er = KCERR_NOT_FOUND; // Should be something like 'INTERRUPTED' or something
  94. if (soapresponse(notifications, iterRequest->second.soap))
  95. // Handle error on the response
  96. soap_send_fault(iterRequest->second.soap);
  97. soap_destroy(iterRequest->second.soap);
  98. soap_end(iterRequest->second.soap);
  99. lpItem = iterRequest->second.soap;
  100. // Pass the socket back to the socket manager (which will probably close it since the client should not be holding two notification sockets)
  101. kopano_notify_done(lpItem);
  102. }
  103. NOTIFREQUEST req;
  104. req.soap = soap;
  105. time(&req.ulRequestTime);
  106. m_mapRequests[ecSessionId] = req;
  107. l_req.unlock();
  108. // There may already be notifications waiting for this session, so post a change on this session so that the
  109. // thread will attempt to get notifications on this session
  110. NotifyChange(ecSessionId);
  111. return hrSuccess;
  112. }
  113. // Called by a session when it has a notification to send
  114. HRESULT ECNotificationManager::NotifyChange(ECSESSIONID ecSessionId)
  115. {
  116. // Simply mark the session in our set of active sessions
  117. scoped_lock l_ses(m_mutexSessions);
  118. m_setActiveSessions.insert(ecSessionId);
  119. m_condSessions.notify_all(); /* Wake up thread due to activity */
  120. return hrSuccess;
  121. }
  122. void * ECNotificationManager::Thread(void *lpParam)
  123. {
  124. return static_cast<ECNotificationManager *>(lpParam)->Work();
  125. }
  126. void *ECNotificationManager::Work() {
  127. ECRESULT er = erSuccess;
  128. ECSession *lpecSession = NULL;
  129. struct notifyResponse notifications;
  130. std::set<ECSESSIONID> setActiveSessions;
  131. struct soap *lpItem;
  132. time_t ulNow = 0;
  133. // Keep looping until we should exit
  134. while(1) {
  135. ulock_normal l_ses(m_mutexSessions);
  136. if (m_bExit)
  137. break;
  138. if (m_setActiveSessions.size() == 0)
  139. /* Wait for events for maximum of 1 sec */
  140. m_condSessions.wait_for(l_ses, std::chrono::seconds(1));
  141. // Make a copy of the session list so we can release the lock ASAP
  142. setActiveSessions = m_setActiveSessions;
  143. m_setActiveSessions.clear();
  144. l_ses.unlock();
  145. // Look at all the sessions that have signalled a change
  146. for (const auto &ses : setActiveSessions) {
  147. lpItem = NULL;
  148. ulock_normal l_req(m_mutexRequests);
  149. // Find the request for the session that had something to say
  150. auto iterRequest = m_mapRequests.find(ses);
  151. if (iterRequest != m_mapRequests.cend()) {
  152. // Reset notification response to default values
  153. soap_default_notifyResponse(iterRequest->second.soap, &notifications);
  154. if(g_lpSessionManager->ValidateSession(iterRequest->second.soap, ses, &lpecSession, true) == erSuccess) {
  155. // Get the notifications from the session
  156. er = lpecSession->GetNotifyItems(iterRequest->second.soap, &notifications);
  157. if(er == KCERR_NOT_FOUND) {
  158. if(time(NULL) - iterRequest->second.ulRequestTime < m_ulTimeout) {
  159. // No notifications - this means we have to wait. This can happen if the session was marked active since
  160. // the request was just made, and there may have been notifications still waiting for us
  161. l_req.unlock();
  162. lpecSession->Unlock();
  163. continue; // Totally ignore this item == wait
  164. } else {
  165. // No notifications and we're out of time, just respond OK with 0 notifications
  166. er = erSuccess;
  167. notifications.pNotificationArray = (struct notificationArray *)soap_malloc(iterRequest->second.soap, sizeof(notificationArray));
  168. soap_default_notificationArray(iterRequest->second.soap, notifications.pNotificationArray);
  169. }
  170. }
  171. ULONG ulCapabilities = lpecSession->GetCapabilities();
  172. if (er == erSuccess && (ulCapabilities & KOPANO_CAP_UNICODE) == 0) {
  173. ECStringCompat stringCompat(false);
  174. er = FixNotificationsEncoding(iterRequest->second.soap, stringCompat, notifications.pNotificationArray);
  175. }
  176. notifications.er = er;
  177. lpecSession->Unlock();
  178. } else {
  179. // The session is dead
  180. notifications.er = KCERR_END_OF_SESSION;
  181. }
  182. // Send the SOAP data
  183. if (soapresponse(notifications, iterRequest->second.soap))
  184. // Handle error on the response
  185. soap_send_fault(iterRequest->second.soap);
  186. // Free allocated SOAP data (in GetNotifyItems())
  187. soap_destroy(iterRequest->second.soap);
  188. soap_end(iterRequest->second.soap);
  189. // Since we have responded, remove the item from our request list and pass it back to the active socket list so
  190. // that the next SOAP call can be handled (probably another notification request)
  191. lpItem = iterRequest->second.soap;
  192. m_mapRequests.erase(iterRequest);
  193. } else {
  194. // Nobody was listening to this session, just ignore it
  195. }
  196. l_req.unlock();
  197. if(lpItem)
  198. kopano_notify_done(lpItem);
  199. }
  200. /* Find all notification requests which have not received any data for m_ulTimeout seconds. This makes sure
  201. * that the client get a response, even if there are no notifications. Since the client has a hard-coded
  202. * TCP timeout of 70 seconds, we need to respond well within those 70 seconds. We therefore use a timeout
  203. * value of 60 seconds here.
  204. */
  205. ulock_normal l_req(m_mutexRequests);
  206. time(&ulNow);
  207. for (const auto &req : m_mapRequests)
  208. if (ulNow - req.second.ulRequestTime > m_ulTimeout)
  209. // Mark the session as active so it will be processed in the next loop
  210. NotifyChange(req.first);
  211. }
  212. return NULL;
  213. }
  214. } /* namespace */