ECSessionGroup.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #include <kopano/platform.h>
  18. #include <mapidefs.h>
  19. #include <mapitags.h>
  20. #include <algorithm>
  21. #include <kopano/lockhelper.hpp>
  22. #include "ECSession.h"
  23. #include "ECSessionGroup.h"
  24. #include "ECSessionManager.h"
  25. #include "SOAPUtils.h"
  26. namespace KC {
  27. class FindChangeAdvise {
  28. public:
  29. FindChangeAdvise(ECSESSIONID ulSession, unsigned int ulConnection)
  30. : m_ulSession(ulSession)
  31. , m_ulConnection(ulConnection)
  32. { }
  33. bool operator()(const CHANGESUBSCRIBEMAP::value_type &rhs) const
  34. {
  35. return rhs.second.ulSession == m_ulSession && rhs.second.ulConnection == m_ulConnection;
  36. }
  37. private:
  38. ECSESSIONID m_ulSession;
  39. unsigned int m_ulConnection;
  40. };
  41. ECSessionGroup::ECSessionGroup(ECSESSIONGROUPID sessionGroupId,
  42. ECSessionManager *lpSessionManager) :
  43. m_sessionGroupId(sessionGroupId), m_lpSessionManager(lpSessionManager)
  44. {
  45. }
  46. ECSessionGroup::~ECSessionGroup()
  47. {
  48. /* Unsubscribe any subscribed stores */
  49. for (const auto &p : m_mapSubscribedStores)
  50. m_lpSessionManager->UnsubscribeObjectEvents(p.second, m_sessionGroupId);
  51. }
  52. void ECSessionGroup::Lock()
  53. {
  54. /* Increase our refcount by one */
  55. scoped_lock lock(m_hThreadReleasedMutex);
  56. ++m_ulRefCount;
  57. }
  58. void ECSessionGroup::Unlock()
  59. {
  60. // Decrease our refcount by one, signal ThreadReleased if RefCount == 0
  61. scoped_lock lock(m_hThreadReleasedMutex);
  62. --m_ulRefCount;
  63. if (!IsLocked())
  64. m_hThreadReleased.notify_one();
  65. }
  66. void ECSessionGroup::AddSession(ECSession *lpSession)
  67. {
  68. scoped_rlock lock(m_hSessionMapLock);
  69. m_mapSessions.insert(SESSIONINFOMAP::value_type(lpSession->GetSessionId(), sessionInfo(lpSession)));
  70. }
  71. void ECSessionGroup::ReleaseSession(ECSession *lpSession)
  72. {
  73. ulock_rec l_map(m_hSessionMapLock);
  74. m_mapSessions.erase(lpSession->GetSessionId());
  75. l_map.unlock();
  76. scoped_lock l_note(m_hNotificationLock);
  77. for (auto i = m_mapSubscribe.cbegin(); i != m_mapSubscribe.cend(); ) {
  78. if (i->second.ulSession != lpSession->GetSessionId()) {
  79. ++i;
  80. continue;
  81. }
  82. auto iRemove = i;
  83. ++i;
  84. m_mapSubscribe.erase(iRemove);
  85. }
  86. }
  87. void ECSessionGroup::ShutdownSession(ECSession *lpSession)
  88. {
  89. /* This session is used to get the notifications, stop GetNotifyItems() */
  90. if (m_getNotifySession == lpSession->GetSessionId())
  91. releaseListeners();
  92. }
  93. bool ECSessionGroup::isOrphan()
  94. {
  95. scoped_rlock lock(m_hSessionMapLock);
  96. return m_mapSessions.empty();
  97. }
  98. void ECSessionGroup::UpdateSessionTime()
  99. {
  100. scoped_rlock lock(m_hSessionMapLock);
  101. for (const auto &i : m_mapSessions)
  102. i.second.lpSession->UpdateSessionTime();
  103. }
  104. ECRESULT ECSessionGroup::AddAdvise(ECSESSIONID ulSessionId, unsigned int ulConnection, unsigned int ulKey, unsigned int ulEventMask)
  105. {
  106. ECRESULT hr = erSuccess;
  107. subscribeItem sSubscribeItem;
  108. sSubscribeItem.ulSession = ulSessionId;
  109. sSubscribeItem.ulConnection = ulConnection;
  110. sSubscribeItem.ulKey = ulKey;
  111. sSubscribeItem.ulEventMask = ulEventMask;
  112. {
  113. scoped_lock lock(m_hNotificationLock);
  114. m_mapSubscribe.insert(SUBSCRIBEMAP::value_type(ulConnection, sSubscribeItem));
  115. }
  116. if(ulEventMask & (fnevNewMail | fnevObjectModified | fnevObjectCreated | fnevObjectCopied | fnevObjectDeleted | fnevObjectMoved)) {
  117. // Object and new mail notifications should be subscribed at the session manager
  118. unsigned int ulStore = 0;
  119. m_lpSessionManager->GetCacheManager()->GetStore(ulKey, &ulStore, NULL);
  120. m_lpSessionManager->SubscribeObjectEvents(ulStore, this->m_sessionGroupId);
  121. scoped_lock lock(m_mutexSubscribedStores);
  122. m_mapSubscribedStores.insert(std::make_pair(ulKey, ulStore));
  123. }
  124. return hr;
  125. }
  126. ECRESULT ECSessionGroup::AddChangeAdvise(ECSESSIONID ulSessionId, unsigned int ulConnection, notifySyncState *lpSyncState)
  127. {
  128. changeSubscribeItem sSubscribeItem = {ulSessionId, ulConnection};
  129. if (lpSyncState == NULL)
  130. return KCERR_INVALID_PARAMETER;
  131. sSubscribeItem.sSyncState = *lpSyncState;
  132. scoped_lock lock(m_hNotificationLock);
  133. m_mapChangeSubscribe.insert(CHANGESUBSCRIBEMAP::value_type(lpSyncState->ulSyncId, sSubscribeItem));
  134. return erSuccess;
  135. }
  136. ECRESULT ECSessionGroup::DelAdvise(ECSESSIONID ulSessionId, unsigned int ulConnection)
  137. {
  138. ECRESULT hr = erSuccess;
  139. scoped_lock lock(m_hNotificationLock);
  140. auto iterSubscription = m_mapSubscribe.find(ulConnection);
  141. if (iterSubscription == m_mapSubscribe.cend()) {
  142. // Apparently the connection was used for change notifications.
  143. auto iterItem = find_if(m_mapChangeSubscribe.cbegin(),
  144. m_mapChangeSubscribe.cend(),
  145. FindChangeAdvise(ulSessionId, ulConnection));
  146. if (iterItem != m_mapChangeSubscribe.cend())
  147. m_mapChangeSubscribe.erase(iterItem);
  148. } else {
  149. if(iterSubscription->second.ulEventMask & (fnevObjectModified | fnevObjectCreated | fnevObjectCopied | fnevObjectDeleted | fnevObjectMoved)) {
  150. // Object notification - remove our subscription to the store
  151. scoped_lock lock(m_mutexSubscribedStores);
  152. // Find the store that the key was subscribed for
  153. auto iterSubscribed = m_mapSubscribedStores.find(iterSubscription->second.ulKey);
  154. if (iterSubscribed != m_mapSubscribedStores.cend()) {
  155. // Unsubscribe the store
  156. m_lpSessionManager->UnsubscribeObjectEvents(iterSubscribed->second, this->m_sessionGroupId);
  157. // Remove from our list
  158. m_mapSubscribedStores.erase(iterSubscribed);
  159. } else
  160. assert(false); // Unsubscribe for something that was not subscribed
  161. }
  162. m_mapSubscribe.erase(iterSubscription);
  163. }
  164. return hr;
  165. }
  166. ECRESULT ECSessionGroup::AddNotification(notification *notifyItem, unsigned int ulKey, unsigned int ulStore, ECSESSIONID ulSessionId)
  167. {
  168. ECRESULT hr = erSuccess;
  169. ulock_normal l_note(m_hNotificationLock);
  170. ECNotification notify(*notifyItem);
  171. for (const auto &i : m_mapSubscribe) {
  172. if ((ulSessionId != 0 && ulSessionId != i.second.ulSession) ||
  173. (ulKey != i.second.ulKey && i.second.ulKey != ulStore) ||
  174. !(notifyItem->ulEventType & i.second.ulEventMask))
  175. continue;
  176. notify.SetConnection(i.second.ulConnection);
  177. m_listNotification.push_back(notify);
  178. }
  179. l_note.unlock();
  180. // Since we now have a notification ready to send, tell the session manager that we have something to send. Since
  181. // a notification can be read from any session in the session group, we have to notify all of the sessions
  182. scoped_rlock l_ses(m_hSessionMapLock);
  183. for (const auto &p : m_mapSessions)
  184. m_lpSessionManager->NotifyNotificationReady(p.second.lpSession->GetSessionId());
  185. return hr;
  186. }
  187. ECRESULT ECSessionGroup::AddNotificationTable(ECSESSIONID ulSessionId, unsigned int ulType, unsigned int ulObjType, unsigned int ulTableId,
  188. sObjectTableKey* lpsChildRow, sObjectTableKey* lpsPrevRow, struct propValArray *lpRow)
  189. {
  190. ECRESULT hr = erSuccess;
  191. Lock();
  192. auto lpNotify = s_alloc<notification>(nullptr);
  193. memset(lpNotify, 0, sizeof(notification));
  194. lpNotify->tab = s_alloc<notificationTable>(nullptr);
  195. memset(lpNotify->tab, 0, sizeof(notificationTable));
  196. lpNotify->ulEventType = fnevTableModified;
  197. lpNotify->tab->ulTableEvent = ulType;
  198. if(lpsChildRow && (lpsChildRow->ulObjId > 0 || lpsChildRow->ulOrderId > 0)) {
  199. lpNotify->tab->propIndex.ulPropTag = PR_INSTANCE_KEY;
  200. lpNotify->tab->propIndex.__union = SOAP_UNION_propValData_bin;
  201. lpNotify->tab->propIndex.Value.bin = s_alloc<xsd__base64Binary>(nullptr);
  202. lpNotify->tab->propIndex.Value.bin->__ptr = s_alloc<unsigned char>(nullptr, sizeof(ULONG) * 2);
  203. lpNotify->tab->propIndex.Value.bin->__size = sizeof(ULONG)*2;
  204. memcpy(lpNotify->tab->propIndex.Value.bin->__ptr, &lpsChildRow->ulObjId, sizeof(ULONG));
  205. memcpy(lpNotify->tab->propIndex.Value.bin->__ptr+sizeof(ULONG), &lpsChildRow->ulOrderId, sizeof(ULONG));
  206. }else {
  207. lpNotify->tab->propIndex.ulPropTag = PR_NULL;
  208. lpNotify->tab->propIndex.__union = SOAP_UNION_propValData_ul;
  209. }
  210. if(lpsPrevRow && (lpsPrevRow->ulObjId > 0 || lpsPrevRow->ulOrderId > 0))
  211. {
  212. lpNotify->tab->propPrior.ulPropTag = PR_INSTANCE_KEY;
  213. lpNotify->tab->propPrior.__union = SOAP_UNION_propValData_bin;
  214. lpNotify->tab->propPrior.Value.bin = s_alloc<xsd__base64Binary>(nullptr);
  215. lpNotify->tab->propPrior.Value.bin->__ptr = s_alloc<unsigned char>(nullptr, sizeof(ULONG) * 2);
  216. lpNotify->tab->propPrior.Value.bin->__size = sizeof(ULONG)*2;
  217. memcpy(lpNotify->tab->propPrior.Value.bin->__ptr, &lpsPrevRow->ulObjId, sizeof(ULONG));
  218. memcpy(lpNotify->tab->propPrior.Value.bin->__ptr+sizeof(ULONG), &lpsPrevRow->ulOrderId, sizeof(ULONG));
  219. }else {
  220. lpNotify->tab->propPrior.__union = SOAP_UNION_propValData_ul;
  221. lpNotify->tab->propPrior.ulPropTag = PR_NULL;
  222. }
  223. lpNotify->tab->ulObjType = ulObjType;
  224. if(lpRow) {
  225. lpNotify->tab->pRow = s_alloc<propValArray>(nullptr);
  226. lpNotify->tab->pRow->__ptr = lpRow->__ptr;
  227. lpNotify->tab->pRow->__size = lpRow->__size;
  228. }
  229. AddNotification(lpNotify, ulTableId, 0, ulSessionId);
  230. //Free by lpRow
  231. if(lpNotify->tab->pRow){
  232. lpNotify->tab->pRow->__ptr = NULL;
  233. lpNotify->tab->pRow->__size = 0;
  234. }
  235. //Free struct
  236. FreeNotificationStruct(lpNotify);
  237. Unlock();
  238. return hr;
  239. }
  240. ECRESULT ECSessionGroup::AddChangeNotification(const std::set<unsigned int> &syncIds, unsigned int ulChangeId, unsigned int ulChangeType)
  241. {
  242. ECRESULT er = erSuccess;
  243. notification notifyItem{__gszeroinit};
  244. notificationICS ics{__gszeroinit};
  245. entryId syncStateBin = {0};
  246. notifySyncState syncState = {0, ulChangeId};
  247. std::map<ECSESSIONID,unsigned int> mapInserted;
  248. notifyItem.ulEventType = fnevKopanoIcsChange;
  249. notifyItem.ics = &ics;
  250. notifyItem.ics->pSyncState = &syncStateBin;
  251. notifyItem.ics->pSyncState->__size = sizeof(syncState);
  252. notifyItem.ics->pSyncState->__ptr = (unsigned char*)&syncState;
  253. notifyItem.ics->ulChangeType = ulChangeType;
  254. Lock();
  255. ulock_normal l_note(m_hNotificationLock);
  256. // Iterate through all sync ids
  257. for (auto sync_id : syncIds) {
  258. // Iterate through all subscribed clients for the current sync id
  259. auto iterRange = m_mapChangeSubscribe.equal_range(sync_id);
  260. for (auto iterItem = iterRange.first;
  261. iterItem != iterRange.second; ++iterItem) {
  262. // update sync state
  263. syncState.ulSyncId = sync_id;
  264. // create ECNotification
  265. ECNotification notify(notifyItem);
  266. notify.SetConnection(iterItem->second.ulConnection);
  267. m_listNotification.push_back(notify);
  268. mapInserted[iterItem->second.ulSession]++;
  269. }
  270. }
  271. l_note.unlock();
  272. // Since we now have a notification ready to send, tell the session manager that we have something to send. Since
  273. // a notifications can be read from any session in the session group, we have to notify all of the sessions
  274. ulock_rec l_ses(m_hSessionMapLock);
  275. for (const auto &p : m_mapSessions)
  276. m_lpSessionManager->NotifyNotificationReady(p.second.lpSession->GetSessionId());
  277. l_ses.unlock();
  278. Unlock();
  279. return er;
  280. }
  281. ECRESULT ECSessionGroup::AddChangeNotification(ECSESSIONID ulSessionId, unsigned int ulConnection, unsigned int ulSyncId, unsigned long ulChangeId)
  282. {
  283. ECRESULT er = erSuccess;
  284. notification notifyItem{__gszeroinit};
  285. notificationICS ics{__gszeroinit};
  286. entryId syncStateBin = {0};
  287. notifySyncState syncState = { ulSyncId, static_cast<unsigned int>(ulChangeId) };
  288. notifyItem.ulEventType = fnevKopanoIcsChange;
  289. notifyItem.ics = &ics;
  290. notifyItem.ics->pSyncState = &syncStateBin;
  291. notifyItem.ics->pSyncState->__size = sizeof(syncState);
  292. notifyItem.ics->pSyncState->__ptr = (unsigned char*)&syncState;
  293. Lock();
  294. ulock_normal l_note(m_hNotificationLock);
  295. // create ECNotification
  296. ECNotification notify(notifyItem);
  297. notify.SetConnection(ulConnection);
  298. m_listNotification.push_back(notify);
  299. l_note.unlock();
  300. // Since we now have a notification ready to send, tell the session manager that we have something to send. Since
  301. // a notifications can be read from any session in the session group, we have to notify all of the sessions
  302. ulock_rec l_ses(m_hSessionMapLock);
  303. for (const auto &p : m_mapSessions)
  304. m_lpSessionManager->NotifyNotificationReady(p.second.lpSession->GetSessionId());
  305. l_ses.unlock();
  306. Unlock();
  307. return er;
  308. }
  309. ECRESULT ECSessionGroup::GetNotifyItems(struct soap *soap, ECSESSIONID ulSessionId, struct notifyResponse *notifications)
  310. {
  311. ECRESULT er = erSuccess;
  312. /* Start waiting for notifications */
  313. Lock();
  314. /*
  315. * Store the session which requested the notifications.
  316. * We need this in case the session is removed and the
  317. * session must release all calls into ECSessionGroup.
  318. */
  319. m_getNotifySession = ulSessionId;
  320. /*
  321. * Update Session times for all sessions attached to this group.
  322. * This prevents any of the sessions to timeout while it was waiting
  323. * for notifications for the group.
  324. */
  325. UpdateSessionTime();
  326. memset(notifications, 0, sizeof(notifyResponse));
  327. ulock_normal l_note(m_hNotificationLock);
  328. /* May still be nothing in there, as the signal is also fired when we should exit */
  329. if (!m_listNotification.empty()) {
  330. ULONG ulSize = (ULONG)m_listNotification.size();
  331. notifications->pNotificationArray = s_alloc<notificationArray>(soap);
  332. notifications->pNotificationArray->__ptr = s_alloc<notification>(soap, ulSize);
  333. notifications->pNotificationArray->__size = ulSize;
  334. size_t nPos = 0;
  335. for (const auto i : m_listNotification)
  336. i.GetCopy(soap, notifications->pNotificationArray->__ptr[nPos++]);
  337. m_listNotification.clear();
  338. } else {
  339. er = KCERR_NOT_FOUND;
  340. }
  341. l_note.unlock();
  342. /* Reset GetNotifySession */
  343. m_getNotifySession = 0;
  344. Unlock();
  345. return er;
  346. }
  347. ECRESULT ECSessionGroup::releaseListeners()
  348. {
  349. scoped_lock lock(m_hNotificationLock);
  350. m_bExit = true;
  351. m_hNewNotificationEvent.notify_all();
  352. return erSuccess;
  353. }
  354. /**
  355. * Get object size
  356. *
  357. * @return Object size in bytes
  358. */
  359. size_t ECSessionGroup::GetObjectSize(void)
  360. {
  361. size_t ulSize = 0;
  362. ulock_normal l_note(m_hNotificationLock);
  363. ulSize += MEMORY_USAGE_MAP(m_mapSubscribe.size(), SUBSCRIBEMAP);
  364. ulSize += MEMORY_USAGE_MAP(m_mapChangeSubscribe.size(), CHANGESUBSCRIBEMAP);
  365. size_t ulItems = 0;
  366. for (const auto &n : m_listNotification) {
  367. ++ulItems;
  368. ulSize += n.GetObjectSize();
  369. }
  370. ulSize += MEMORY_USAGE_LIST(ulItems, ECNOTIFICATIONLIST);
  371. l_note.unlock();
  372. ulSize += sizeof(*this);
  373. ulock_rec l_ses(m_hSessionMapLock);
  374. ulSize += MEMORY_USAGE_MAP(m_mapSessions.size(), SESSIONINFOMAP);
  375. l_ses.unlock();
  376. ulock_normal l_sub(m_mutexSubscribedStores);
  377. ulSize += MEMORY_USAGE_MULTIMAP(m_mapSubscribedStores.size(), SUBSCRIBESTOREMULTIMAP);
  378. l_sub.unlock();
  379. return ulSize;
  380. }
  381. } /* namespace */