WSMessageStreamImporter.cpp 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #include <kopano/platform.h>
  18. #include <new>
  19. #include "SOAPUtils.h"
  20. #include "WSMessageStreamImporter.h"
  21. #include "WSUtil.h"
  22. #include "ECSyncSettings.h"
  23. /**
  24. * Create a new WSMessageStreamSink instance
  25. * @param[in] lpFifoBuffer The fifobuffer to write the data into.
  26. * @param[in] ulTimeout The timeout in ms to use when writing to the
  27. * fifobuffer.
  28. * @param[out] lppSink The newly created object
  29. */
  30. HRESULT WSMessageStreamSink::Create(ECFifoBuffer *lpFifoBuffer, ULONG ulTimeout, WSMessageStreamImporter *lpImporter, WSMessageStreamSink **lppSink)
  31. {
  32. if (lpFifoBuffer == NULL || lppSink == NULL)
  33. return MAPI_E_INVALID_PARAMETER;
  34. WSMessageStreamSinkPtr ptrSink(new(std::nothrow) WSMessageStreamSink(lpFifoBuffer, ulTimeout, lpImporter));
  35. if (ptrSink == nullptr)
  36. return MAPI_E_NOT_ENOUGH_MEMORY;
  37. *lppSink = ptrSink.release();
  38. return hrSuccess;
  39. }
  40. /**
  41. * Write data into the underlaying fifo buffer.
  42. * @param[in] lpData Pointer to the data
  43. * @param[in] cbData The amount of data in bytes.
  44. */
  45. HRESULT WSMessageStreamSink::Write(LPVOID lpData, ULONG cbData)
  46. {
  47. HRESULT hr = hrSuccess;
  48. HRESULT hrAsync = hrSuccess;
  49. hr = kcerr_to_mapierr(m_lpFifoBuffer->Write(lpData, cbData, 0, NULL));
  50. if(hr != hrSuccess) {
  51. // Write failed, close the write-side of the FIFO
  52. m_lpFifoBuffer->Close(ECFifoBuffer::cfWrite);
  53. // Failure writing to the fifo. This means there must have been some error
  54. // on the other side of the FIFO. Since that is the root cause of the write failure,
  55. // return that instead of the error from the FIFO buffer (most probably a network
  56. // error, but others also possible, eg logon failure, session lost, etc)
  57. m_lpImporter->GetAsyncResult(&hrAsync);
  58. // Make sure that we only use the async error if there really was an error
  59. if(hrAsync != hrSuccess)
  60. hr = hrAsync;
  61. }
  62. return hr;
  63. }
  64. /**
  65. * @param[in] lpFifoBuffer The fifobuffer to write the data into.
  66. */
  67. WSMessageStreamSink::WSMessageStreamSink(ECFifoBuffer *lpFifoBuffer, ULONG ulTimeout, WSMessageStreamImporter *lpImporter)
  68. : m_lpFifoBuffer(lpFifoBuffer)
  69. , m_lpImporter(lpImporter)
  70. { }
  71. /**
  72. * Closes the underlaying fifo buffer, causing the reader to stop reading.
  73. */
  74. WSMessageStreamSink::~WSMessageStreamSink()
  75. {
  76. m_lpFifoBuffer->Close(ECFifoBuffer::cfWrite);
  77. }
  78. HRESULT WSMessageStreamImporter::Create(ULONG ulFlags, ULONG ulSyncId, ULONG cbEntryID, LPENTRYID lpEntryID, ULONG cbFolderEntryID, LPENTRYID lpFolderEntryID, bool bNewMessage, LPSPropValue lpConflictItems, WSTransport *lpTransport, WSMessageStreamImporter **lppStreamImporter)
  79. {
  80. HRESULT hr = hrSuccess;
  81. entryId sEntryId = {0};
  82. entryId sFolderEntryId = {0};
  83. struct propVal sConflictItems{__gszeroinit};
  84. WSMessageStreamImporterPtr ptrStreamImporter;
  85. ECSyncSettings* lpSyncSettings = NULL;
  86. if (lppStreamImporter == NULL ||
  87. lpEntryID == NULL || cbEntryID == 0 ||
  88. lpFolderEntryID == NULL || cbFolderEntryID == 0 ||
  89. (bNewMessage == true && lpConflictItems != NULL) ||
  90. lpTransport == NULL)
  91. {
  92. hr = MAPI_E_INVALID_PARAMETER;
  93. goto exit;
  94. }
  95. hr = CopyMAPIEntryIdToSOAPEntryId(cbEntryID, lpEntryID, &sEntryId, false);
  96. if (hr != hrSuccess)
  97. goto exit;
  98. hr = CopyMAPIEntryIdToSOAPEntryId(cbFolderEntryID, lpFolderEntryID, &sFolderEntryId, false);
  99. if (hr != hrSuccess)
  100. goto exit;
  101. if (lpConflictItems) {
  102. hr = CopyMAPIPropValToSOAPPropVal(&sConflictItems, lpConflictItems);
  103. if (hr != hrSuccess)
  104. goto exit;
  105. }
  106. lpSyncSettings = ECSyncSettings::GetInstance();
  107. ptrStreamImporter.reset(new(std::nothrow) WSMessageStreamImporter(ulFlags, ulSyncId, sEntryId, sFolderEntryId, bNewMessage, sConflictItems, lpTransport, lpSyncSettings->StreamBufferSize(), lpSyncSettings->StreamTimeout()));
  108. if (ptrStreamImporter == nullptr) {
  109. hr = MAPI_E_NOT_ENOUGH_MEMORY;
  110. goto exit;
  111. }
  112. // The following are now owned by the stream importer
  113. sEntryId.__ptr = NULL;
  114. sFolderEntryId.__ptr = NULL;
  115. sConflictItems.Value.bin = NULL;
  116. *lppStreamImporter = ptrStreamImporter.release();
  117. exit:
  118. s_free(nullptr, sEntryId.__ptr);
  119. s_free(nullptr, sFolderEntryId.__ptr);
  120. if (sConflictItems.Value.bin)
  121. s_free(nullptr, sConflictItems.Value.bin->__ptr);
  122. s_free(nullptr, sConflictItems.Value.bin);
  123. return hr;
  124. }
  125. HRESULT WSMessageStreamImporter::StartTransfer(WSMessageStreamSink **lppSink)
  126. {
  127. HRESULT hr;
  128. WSMessageStreamSinkPtr ptrSink;
  129. if (!m_threadPool.dispatch(this))
  130. return MAPI_E_CALL_FAILED;
  131. hr = WSMessageStreamSink::Create(&m_fifoBuffer, m_ulTimeout, this, &~ptrSink);
  132. if (hr != hrSuccess) {
  133. m_fifoBuffer.Close(ECFifoBuffer::cfWrite);
  134. return hr;
  135. }
  136. AddChild(ptrSink);
  137. *lppSink = ptrSink.release();
  138. return hrSuccess;
  139. }
  140. HRESULT WSMessageStreamImporter::GetAsyncResult(HRESULT *lphrResult)
  141. {
  142. if (lphrResult == NULL)
  143. return MAPI_E_INVALID_PARAMETER;
  144. if (!wait(m_ulTimeout))
  145. return MAPI_E_TIMEOUT;
  146. *lphrResult = m_hr;
  147. return hrSuccess;
  148. }
  149. WSMessageStreamImporter::WSMessageStreamImporter(ULONG ulFlags, ULONG ulSyncId, const entryId &sEntryId, const entryId &sFolderEntryId, bool bNewMessage, const propVal &sConflictItems, WSTransport *lpTransport, ULONG ulBufferSize, ULONG ulTimeout)
  150. : m_ulFlags(ulFlags)
  151. , m_ulSyncId(ulSyncId)
  152. , m_sEntryId(sEntryId)
  153. , m_sFolderEntryId(sFolderEntryId)
  154. , m_bNewMessage(bNewMessage)
  155. , m_sConflictItems(sConflictItems)
  156. , m_ptrTransport(lpTransport, true)
  157. , m_fifoBuffer(ulBufferSize)
  158. , m_threadPool(1)
  159. , m_ulTimeout(ulTimeout)
  160. {
  161. }
  162. WSMessageStreamImporter::~WSMessageStreamImporter()
  163. {
  164. s_free(nullptr, m_sEntryId.__ptr);
  165. s_free(nullptr, m_sFolderEntryId.__ptr);
  166. if (m_sConflictItems.Value.bin)
  167. s_free(nullptr, m_sConflictItems.Value.bin->__ptr);
  168. s_free(nullptr, m_sConflictItems.Value.bin);
  169. }
  170. void WSMessageStreamImporter::run()
  171. {
  172. unsigned int ulResult = 0;
  173. struct xsd__Binary sStreamData{__gszeroinit};
  174. struct soap *lpSoap = m_ptrTransport->m_lpCmd->soap;
  175. propVal *lpsConflictItems = NULL;
  176. if (m_sConflictItems.ulPropTag != 0)
  177. lpsConflictItems = &m_sConflictItems;
  178. sStreamData.xop__Include.__ptr = (unsigned char*)this;
  179. sStreamData.xop__Include.type = const_cast<char *>("application/binary");
  180. m_ptrTransport->LockSoap();
  181. soap_set_omode(lpSoap, SOAP_ENC_MTOM | SOAP_IO_CHUNK);
  182. lpSoap->mode &= ~SOAP_XML_TREE;
  183. lpSoap->omode &= ~SOAP_XML_TREE;
  184. lpSoap->fmimereadopen = &StaticMTOMReadOpen;
  185. lpSoap->fmimeread = &StaticMTOMRead;
  186. lpSoap->fmimereadclose = &StaticMTOMReadClose;
  187. m_hr = hrSuccess;
  188. if (m_ptrTransport->m_lpCmd->ns__importMessageFromStream(m_ptrTransport->m_ecSessionId, m_ulFlags, m_ulSyncId, m_sFolderEntryId, m_sEntryId, m_bNewMessage, lpsConflictItems, sStreamData, &ulResult) != SOAP_OK)
  189. m_hr = MAPI_E_NETWORK_ERROR;
  190. else if (m_hr == hrSuccess) // Could be set from callback
  191. m_hr = kcerr_to_mapierr(ulResult, MAPI_E_NOT_FOUND);
  192. m_ptrTransport->UnLockSoap();
  193. }
  194. void* WSMessageStreamImporter::StaticMTOMReadOpen(struct soap *soap, void *handle, const char *id, const char *type, const char *description)
  195. {
  196. return static_cast<WSMessageStreamImporter *>(handle)->MTOMReadOpen(soap, handle, id, type, description);
  197. }
  198. size_t WSMessageStreamImporter::StaticMTOMRead(struct soap *soap, void *handle, char *buf, size_t len)
  199. {
  200. return static_cast<WSMessageStreamImporter *>(handle)->MTOMRead(soap, handle, buf, len);
  201. }
  202. void WSMessageStreamImporter::StaticMTOMReadClose(struct soap *soap, void *handle)
  203. {
  204. static_cast<WSMessageStreamImporter *>(handle)->MTOMReadClose(soap, handle);
  205. }
  206. void* WSMessageStreamImporter::MTOMReadOpen(struct soap* /*soap*/, void *handle, const char* /*id*/, const char* /*type*/, const char* /*description*/)
  207. {
  208. return handle;
  209. }
  210. size_t WSMessageStreamImporter::MTOMRead(struct soap* soap, void* /*handle*/, char *buf, size_t len)
  211. {
  212. ECRESULT er = erSuccess;
  213. ECFifoBuffer::size_type cbRead = 0;
  214. er = m_fifoBuffer.Read(buf, len, 0, &cbRead);
  215. if (er != erSuccess) {
  216. m_hr = kcerr_to_mapierr(er);
  217. return 0;
  218. }
  219. return cbRead;
  220. }
  221. void WSMessageStreamImporter::MTOMReadClose(struct soap* /*soap*/, void* /*handle*/)
  222. {
  223. m_fifoBuffer.Close(ECFifoBuffer::cfRead);
  224. }