Streaming.cpp 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221
  1. #include "Crypto.h"
  2. #include "Log.h"
  3. #include "RouterInfo.h"
  4. #include "RouterContext.h"
  5. #include "Tunnel.h"
  6. #include "Timestamp.h"
  7. #include "Destination.h"
  8. #include "Streaming.h"
  9. namespace i2p
  10. {
  11. namespace stream
  12. {
  13. void SendBufferQueue::Add (const uint8_t * buf, size_t len, SendHandler handler)
  14. {
  15. m_Buffers.push_back (std::make_shared<SendBuffer>(buf, len, handler));
  16. m_Size += len;
  17. }
  18. size_t SendBufferQueue::Get (uint8_t * buf, size_t len)
  19. {
  20. size_t offset = 0;
  21. while (!m_Buffers.empty () && offset < len)
  22. {
  23. auto nextBuffer = m_Buffers.front ();
  24. auto rem = nextBuffer->GetRemainingSize ();
  25. if (offset + rem <= len)
  26. {
  27. // whole buffer
  28. memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem);
  29. offset += rem;
  30. m_Buffers.pop_front (); // delete it
  31. }
  32. else
  33. {
  34. // partially
  35. rem = len - offset;
  36. memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), len - offset);
  37. nextBuffer->offset += (len - offset);
  38. offset = len; // break
  39. }
  40. }
  41. m_Size -= offset;
  42. return offset;
  43. }
  44. void SendBufferQueue::CleanUp ()
  45. {
  46. if (!m_Buffers.empty ())
  47. {
  48. for (auto it: m_Buffers)
  49. it->Cancel ();
  50. m_Buffers.clear ();
  51. m_Size = 0;
  52. }
  53. }
  54. Stream::Stream (boost::asio::io_service& service, StreamingDestination& local,
  55. std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service),
  56. m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
  57. m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
  58. m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service),
  59. m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
  60. m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO),
  61. m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),
  62. m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0)
  63. {
  64. RAND_bytes ((uint8_t *)&m_RecvStreamID, 4);
  65. m_RemoteIdentity = remote->GetIdentity ();
  66. }
  67. Stream::Stream (boost::asio::io_service& service, StreamingDestination& local):
  68. m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
  69. m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
  70. m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
  71. m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE),
  72. m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),
  73. m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0)
  74. {
  75. RAND_bytes ((uint8_t *)&m_RecvStreamID, 4);
  76. }
  77. Stream::~Stream ()
  78. {
  79. CleanUp ();
  80. LogPrint (eLogDebug, "Streaming: Stream deleted");
  81. }
  82. void Stream::Terminate ()
  83. {
  84. m_AckSendTimer.cancel ();
  85. m_ReceiveTimer.cancel ();
  86. m_ResendTimer.cancel ();
  87. //CleanUp (); /* Need to recheck - broke working on windows */
  88. m_LocalDestination.DeleteStream (shared_from_this ());
  89. }
  90. void Stream::CleanUp ()
  91. {
  92. {
  93. std::unique_lock<std::mutex> l(m_SendBufferMutex);
  94. m_SendBuffer.CleanUp ();
  95. }
  96. while (!m_ReceiveQueue.empty ())
  97. {
  98. auto packet = m_ReceiveQueue.front ();
  99. m_ReceiveQueue.pop ();
  100. m_LocalDestination.DeletePacket (packet);
  101. }
  102. for (auto it: m_SentPackets)
  103. m_LocalDestination.DeletePacket (it);
  104. m_SentPackets.clear ();
  105. for (auto it: m_SavedPackets)
  106. m_LocalDestination.DeletePacket (it);
  107. m_SavedPackets.clear ();
  108. }
  109. void Stream::HandleNextPacket (Packet * packet)
  110. {
  111. m_NumReceivedBytes += packet->GetLength ();
  112. if (!m_SendStreamID)
  113. m_SendStreamID = packet->GetReceiveStreamID ();
  114. if (!packet->IsNoAck ()) // ack received
  115. ProcessAck (packet);
  116. int32_t receivedSeqn = packet->GetSeqn ();
  117. bool isSyn = packet->IsSYN ();
  118. if (!receivedSeqn && !isSyn)
  119. {
  120. // plain ack
  121. LogPrint (eLogDebug, "Streaming: Plain ACK received");
  122. m_LocalDestination.DeletePacket (packet);
  123. return;
  124. }
  125. LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn, " on sSID=", m_SendStreamID);
  126. if (receivedSeqn == m_LastReceivedSequenceNumber + 1)
  127. {
  128. // we have received next in sequence message
  129. ProcessPacket (packet);
  130. // we should also try stored messages if any
  131. for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();)
  132. {
  133. if ((*it)->GetSeqn () == (uint32_t)(m_LastReceivedSequenceNumber + 1))
  134. {
  135. Packet * savedPacket = *it;
  136. m_SavedPackets.erase (it++);
  137. ProcessPacket (savedPacket);
  138. }
  139. else
  140. break;
  141. }
  142. // schedule ack for last message
  143. if (m_Status == eStreamStatusOpen)
  144. {
  145. if (!m_IsAckSendScheduled)
  146. {
  147. m_IsAckSendScheduled = true;
  148. auto ackTimeout = m_RTT/10;
  149. if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay;
  150. m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(ackTimeout));
  151. m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer,
  152. shared_from_this (), std::placeholders::_1));
  153. }
  154. }
  155. else if (isSyn)
  156. // we have to send SYN back to incoming connection
  157. SendBuffer (); // also sets m_IsOpen
  158. }
  159. else
  160. {
  161. if (receivedSeqn <= m_LastReceivedSequenceNumber)
  162. {
  163. // we have received duplicate
  164. LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID);
  165. SendQuickAck (); // resend ack for previous message again
  166. m_LocalDestination.DeletePacket (packet); // packet dropped
  167. }
  168. else
  169. {
  170. LogPrint (eLogWarning, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1);
  171. // save message and wait for missing message again
  172. SavePacket (packet);
  173. if (m_LastReceivedSequenceNumber >= 0)
  174. {
  175. // send NACKs for missing messages ASAP
  176. if (m_IsAckSendScheduled)
  177. {
  178. m_IsAckSendScheduled = false;
  179. m_AckSendTimer.cancel ();
  180. }
  181. SendQuickAck ();
  182. }
  183. else
  184. {
  185. // wait for SYN
  186. m_IsAckSendScheduled = true;
  187. m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(SYN_TIMEOUT));
  188. m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer,
  189. shared_from_this (), std::placeholders::_1));
  190. }
  191. }
  192. }
  193. }
  194. void Stream::SavePacket (Packet * packet)
  195. {
  196. if (!m_SavedPackets.insert (packet).second)
  197. m_LocalDestination.DeletePacket (packet);
  198. }
  199. void Stream::ProcessPacket (Packet * packet)
  200. {
  201. uint32_t receivedSeqn = packet->GetSeqn ();
  202. uint16_t flags = packet->GetFlags ();
  203. LogPrint (eLogDebug, "Streaming: Process seqn=", receivedSeqn, ", flags=", flags);
  204. if (!ProcessOptions (flags, packet))
  205. {
  206. m_LocalDestination.DeletePacket (packet);
  207. Terminate ();
  208. return;
  209. }
  210. packet->offset = packet->GetPayload () - packet->buf;
  211. if (packet->GetLength () > 0)
  212. {
  213. m_ReceiveQueue.push (packet);
  214. m_ReceiveTimer.cancel ();
  215. }
  216. else
  217. m_LocalDestination.DeletePacket (packet);
  218. m_LastReceivedSequenceNumber = receivedSeqn;
  219. if (flags & PACKET_FLAG_RESET)
  220. {
  221. LogPrint (eLogDebug, "Streaming: closing stream sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ": reset flag received in packet #", receivedSeqn);
  222. m_Status = eStreamStatusReset;
  223. Close ();
  224. }
  225. else if (flags & PACKET_FLAG_CLOSE)
  226. {
  227. if (m_Status != eStreamStatusClosed)
  228. SendClose ();
  229. m_Status = eStreamStatusClosed;
  230. Terminate ();
  231. }
  232. }
  233. bool Stream::ProcessOptions (uint16_t flags, Packet * packet)
  234. {
  235. const uint8_t * optionData = packet->GetOptionData ();
  236. size_t optionSize = packet->GetOptionSize ();
  237. if (flags & PACKET_FLAG_DELAY_REQUESTED)
  238. optionData += 2;
  239. if (flags & PACKET_FLAG_FROM_INCLUDED)
  240. {
  241. if (m_RemoteLeaseSet) m_RemoteIdentity = m_RemoteLeaseSet->GetIdentity ();
  242. if (!m_RemoteIdentity)
  243. m_RemoteIdentity = std::make_shared<i2p::data::IdentityEx>(optionData, optionSize);
  244. if (m_RemoteIdentity->IsRSA ())
  245. {
  246. LogPrint (eLogInfo, "Streaming: Incoming stream from RSA destination ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " Discarded");
  247. return false;
  248. }
  249. optionData += m_RemoteIdentity->GetFullLen ();
  250. if (!m_RemoteLeaseSet)
  251. LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), ", sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID);
  252. }
  253. if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED)
  254. {
  255. uint16_t maxPacketSize = bufbe16toh (optionData);
  256. LogPrint (eLogDebug, "Streaming: Max packet size ", maxPacketSize);
  257. optionData += 2;
  258. }
  259. if (flags & PACKET_FLAG_OFFLINE_SIGNATURE)
  260. {
  261. if (!m_RemoteIdentity)
  262. {
  263. LogPrint (eLogInfo, "Streaming: offline signature without identity");
  264. return false;
  265. }
  266. // if we have it in LeaseSet already we don't need to parse it again
  267. if (m_RemoteLeaseSet) m_TransientVerifier = m_RemoteLeaseSet->GetTransientVerifier ();
  268. if (m_TransientVerifier)
  269. {
  270. // skip option data
  271. optionData += 6; // timestamp and key type
  272. optionData += m_TransientVerifier->GetPublicKeyLen (); // public key
  273. optionData += m_RemoteIdentity->GetSignatureLen (); // signature
  274. }
  275. else
  276. {
  277. // transient key
  278. size_t offset = 0;
  279. m_TransientVerifier = i2p::data::ProcessOfflineSignature (m_RemoteIdentity, optionData, optionSize - (optionData - packet->GetOptionData ()), offset);
  280. optionData += offset;
  281. if (!m_TransientVerifier)
  282. {
  283. LogPrint (eLogError, "Streaming: offline signature failed");
  284. return false;
  285. }
  286. }
  287. }
  288. if (flags & PACKET_FLAG_SIGNATURE_INCLUDED)
  289. {
  290. uint8_t signature[256];
  291. auto signatureLen = m_RemoteIdentity->GetSignatureLen ();
  292. if(signatureLen <= sizeof(signature))
  293. {
  294. memcpy (signature, optionData, signatureLen);
  295. memset (const_cast<uint8_t *>(optionData), 0, signatureLen);
  296. bool verified = m_TransientVerifier ?
  297. m_TransientVerifier->Verify (packet->GetBuffer (), packet->GetLength (), signature) :
  298. m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature);
  299. if (!verified)
  300. {
  301. LogPrint (eLogError, "Streaming: Signature verification failed, sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID);
  302. Close ();
  303. flags |= PACKET_FLAG_CLOSE;
  304. }
  305. memcpy (const_cast<uint8_t *>(optionData), signature, signatureLen);
  306. optionData += signatureLen;
  307. }
  308. else
  309. {
  310. LogPrint (eLogError, "Streaming: Signature too big, ", signatureLen, " bytes");
  311. return false;
  312. }
  313. }
  314. return true;
  315. }
  316. void Stream::ProcessAck (Packet * packet)
  317. {
  318. bool acknowledged = false;
  319. auto ts = i2p::util::GetMillisecondsSinceEpoch ();
  320. uint32_t ackThrough = packet->GetAckThrough ();
  321. if (ackThrough > m_SequenceNumber)
  322. {
  323. LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber);
  324. return;
  325. }
  326. int nackCount = packet->GetNACKCount ();
  327. for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();)
  328. {
  329. auto seqn = (*it)->GetSeqn ();
  330. if (seqn <= ackThrough)
  331. {
  332. if (nackCount > 0)
  333. {
  334. bool nacked = false;
  335. for (int i = 0; i < nackCount; i++)
  336. if (seqn == packet->GetNACK (i))
  337. {
  338. nacked = true;
  339. break;
  340. }
  341. if (nacked)
  342. {
  343. LogPrint (eLogDebug, "Streaming: Packet ", seqn, " NACK");
  344. ++it;
  345. continue;
  346. }
  347. }
  348. auto sentPacket = *it;
  349. uint64_t rtt = ts - sentPacket->sendTime;
  350. if(ts < sentPacket->sendTime)
  351. {
  352. LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime);
  353. rtt = 1;
  354. }
  355. m_RTT = (m_RTT*seqn + rtt)/(seqn + 1);
  356. m_RTO = m_RTT*1.5; // TODO: implement it better
  357. LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);
  358. m_SentPackets.erase (it++);
  359. m_LocalDestination.DeletePacket (sentPacket);
  360. acknowledged = true;
  361. if (m_WindowSize < WINDOW_SIZE)
  362. m_WindowSize++; // slow start
  363. else
  364. {
  365. // linear growth
  366. if (ts > m_LastWindowSizeIncreaseTime + m_RTT)
  367. {
  368. m_WindowSize++;
  369. if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE;
  370. m_LastWindowSizeIncreaseTime = ts;
  371. }
  372. }
  373. if (!seqn && m_RoutingSession) // first message confirmed
  374. m_RoutingSession->SetSharedRoutingPath (
  375. std::make_shared<i2p::garlic::GarlicRoutingPath> (
  376. i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, m_RTT, 0, 0}));
  377. }
  378. else
  379. break;
  380. }
  381. if (m_SentPackets.empty ())
  382. m_ResendTimer.cancel ();
  383. if (acknowledged)
  384. {
  385. m_NumResendAttempts = 0;
  386. SendBuffer ();
  387. }
  388. if (m_Status == eStreamStatusClosed)
  389. Terminate ();
  390. else if (m_Status == eStreamStatusClosing)
  391. Close (); // check is all outgoing messages have been sent and we can send close
  392. }
  393. size_t Stream::Send (const uint8_t * buf, size_t len)
  394. {
  395. size_t sent = len;
  396. while(len > MAX_PACKET_SIZE)
  397. {
  398. AsyncSend (buf, MAX_PACKET_SIZE, nullptr);
  399. buf += MAX_PACKET_SIZE;
  400. len -= MAX_PACKET_SIZE;
  401. }
  402. AsyncSend (buf, len, nullptr);
  403. return sent;
  404. }
  405. void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler)
  406. {
  407. if (len > 0 && buf)
  408. {
  409. std::unique_lock<std::mutex> l(m_SendBufferMutex);
  410. m_SendBuffer.Add (buf, len, handler);
  411. }
  412. else if (handler)
  413. handler(boost::system::error_code ());
  414. m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ()));
  415. }
  416. void Stream::SendBuffer ()
  417. {
  418. int numMsgs = m_WindowSize - m_SentPackets.size ();
  419. if (numMsgs <= 0) return; // window is full
  420. bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet
  421. std::vector<Packet *> packets;
  422. {
  423. std::unique_lock<std::mutex> l(m_SendBufferMutex);
  424. while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0))
  425. {
  426. Packet * p = m_LocalDestination.NewPacket ();
  427. uint8_t * packet = p->GetBuffer ();
  428. // TODO: implement setters
  429. size_t size = 0;
  430. htobe32buf (packet + size, m_SendStreamID);
  431. size += 4; // sendStreamID
  432. htobe32buf (packet + size, m_RecvStreamID);
  433. size += 4; // receiveStreamID
  434. htobe32buf (packet + size, m_SequenceNumber++);
  435. size += 4; // sequenceNum
  436. if (isNoAck)
  437. htobuf32 (packet + size, 0);
  438. else
  439. htobe32buf (packet + size, m_LastReceivedSequenceNumber);
  440. size += 4; // ack Through
  441. packet[size] = 0;
  442. size++; // NACK count
  443. packet[size] = m_RTO/1000;
  444. size++; // resend delay
  445. if (m_Status == eStreamStatusNew)
  446. {
  447. // initial packet
  448. m_Status = eStreamStatusOpen;
  449. uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED |
  450. PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED;
  451. if (isNoAck) flags |= PACKET_FLAG_NO_ACK;
  452. bool isOfflineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().IsOfflineSignature ();
  453. if (isOfflineSignature) flags |= PACKET_FLAG_OFFLINE_SIGNATURE;
  454. htobe16buf (packet + size, flags);
  455. size += 2; // flags
  456. size_t identityLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetFullLen ();
  457. size_t signatureLen = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetSignatureLen ();
  458. uint8_t * optionsSize = packet + size; // set options size later
  459. size += 2; // options size
  460. m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen);
  461. size += identityLen; // from
  462. htobe16buf (packet + size, STREAMING_MTU);
  463. size += 2; // max packet size
  464. if (isOfflineSignature)
  465. {
  466. const auto& offlineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetOfflineSignature ();
  467. memcpy (packet + size, offlineSignature.data (), offlineSignature.size ());
  468. size += offlineSignature.size (); // offline signature
  469. }
  470. uint8_t * signature = packet + size; // set it later
  471. memset (signature, 0, signatureLen); // zeroes for now
  472. size += signatureLen; // signature
  473. htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size
  474. size += m_SendBuffer.Get (packet + size, STREAMING_MTU - size); // payload
  475. m_LocalDestination.GetOwner ()->Sign (packet, size, signature);
  476. }
  477. else
  478. {
  479. // follow on packet
  480. htobuf16 (packet + size, 0);
  481. size += 2; // flags
  482. htobuf16 (packet + size, 0); // no options
  483. size += 2; // options size
  484. size += m_SendBuffer.Get(packet + size, STREAMING_MTU - size); // payload
  485. }
  486. p->len = size;
  487. packets.push_back (p);
  488. numMsgs--;
  489. }
  490. }
  491. if (packets.size () > 0)
  492. {
  493. if (m_SavedPackets.empty ()) // no NACKS
  494. {
  495. m_IsAckSendScheduled = false;
  496. m_AckSendTimer.cancel ();
  497. }
  498. bool isEmpty = m_SentPackets.empty ();
  499. auto ts = i2p::util::GetMillisecondsSinceEpoch ();
  500. for (auto& it: packets)
  501. {
  502. it->sendTime = ts;
  503. m_SentPackets.insert (it);
  504. }
  505. SendPackets (packets);
  506. if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ())
  507. SendClose ();
  508. if (isEmpty)
  509. ScheduleResend ();
  510. }
  511. }
  512. void Stream::SendQuickAck ()
  513. {
  514. int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber;
  515. if (!m_SavedPackets.empty ())
  516. {
  517. int32_t seqn = (*m_SavedPackets.rbegin ())->GetSeqn ();
  518. if (seqn > lastReceivedSeqn) lastReceivedSeqn = seqn;
  519. }
  520. if (lastReceivedSeqn < 0)
  521. {
  522. LogPrint (eLogError, "Streaming: No packets have been received yet");
  523. return;
  524. }
  525. Packet p;
  526. uint8_t * packet = p.GetBuffer ();
  527. size_t size = 0;
  528. htobe32buf (packet + size, m_SendStreamID);
  529. size += 4; // sendStreamID
  530. htobe32buf (packet + size, m_RecvStreamID);
  531. size += 4; // receiveStreamID
  532. htobuf32 (packet + size, 0); // this is plain Ack message
  533. size += 4; // sequenceNum
  534. htobe32buf (packet + size, lastReceivedSeqn);
  535. size += 4; // ack Through
  536. uint8_t numNacks = 0;
  537. if (lastReceivedSeqn > m_LastReceivedSequenceNumber)
  538. {
  539. // fill NACKs
  540. uint8_t * nacks = packet + size + 1;
  541. auto nextSeqn = m_LastReceivedSequenceNumber + 1;
  542. for (auto it: m_SavedPackets)
  543. {
  544. auto seqn = it->GetSeqn ();
  545. if (numNacks + (seqn - nextSeqn) >= 256)
  546. {
  547. LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn);
  548. htobe32buf (packet + 12, nextSeqn); // change ack Through
  549. break;
  550. }
  551. for (uint32_t i = nextSeqn; i < seqn; i++)
  552. {
  553. htobe32buf (nacks, i);
  554. nacks += 4;
  555. numNacks++;
  556. }
  557. nextSeqn = seqn + 1;
  558. }
  559. packet[size] = numNacks;
  560. size++; // NACK count
  561. size += numNacks*4; // NACKs
  562. }
  563. else
  564. {
  565. // No NACKs
  566. packet[size] = 0;
  567. size++; // NACK count
  568. }
  569. size++; // resend delay
  570. htobuf16 (packet + size, 0); // no flags set
  571. size += 2; // flags
  572. htobuf16 (packet + size, 0); // no options
  573. size += 2; // options size
  574. p.len = size;
  575. SendPackets (std::vector<Packet *> { &p });
  576. LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs");
  577. }
  578. void Stream::Close ()
  579. {
  580. LogPrint(eLogDebug, "Streaming: closing stream with sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ", status=", m_Status);
  581. switch (m_Status)
  582. {
  583. case eStreamStatusOpen:
  584. m_Status = eStreamStatusClosing;
  585. Close (); // recursion
  586. if (m_Status == eStreamStatusClosing) //still closing
  587. LogPrint (eLogDebug, "Streaming: Trying to send stream data before closing, sSID=", m_SendStreamID);
  588. break;
  589. case eStreamStatusReset:
  590. // TODO: send reset
  591. Terminate ();
  592. break;
  593. case eStreamStatusClosing:
  594. if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send
  595. {
  596. m_Status = eStreamStatusClosed;
  597. SendClose();
  598. }
  599. break;
  600. case eStreamStatusClosed:
  601. // already closed
  602. Terminate ();
  603. break;
  604. default:
  605. LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status, "sSID=", m_SendStreamID);
  606. };
  607. }
  608. void Stream::SendClose ()
  609. {
  610. Packet * p = m_LocalDestination.NewPacket ();
  611. uint8_t * packet = p->GetBuffer ();
  612. size_t size = 0;
  613. htobe32buf (packet + size, m_SendStreamID);
  614. size += 4; // sendStreamID
  615. htobe32buf (packet + size, m_RecvStreamID);
  616. size += 4; // receiveStreamID
  617. htobe32buf (packet + size, m_SequenceNumber++);
  618. size += 4; // sequenceNum
  619. htobe32buf (packet + size, m_LastReceivedSequenceNumber >= 0 ? m_LastReceivedSequenceNumber : 0);
  620. size += 4; // ack Through
  621. packet[size] = 0;
  622. size++; // NACK count
  623. size++; // resend delay
  624. htobe16buf (packet + size, PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED);
  625. size += 2; // flags
  626. size_t signatureLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetSignatureLen ();
  627. htobe16buf (packet + size, signatureLen); // signature only
  628. size += 2; // options size
  629. uint8_t * signature = packet + size;
  630. memset (packet + size, 0, signatureLen);
  631. size += signatureLen; // signature
  632. m_LocalDestination.GetOwner ()->Sign (packet, size, signature);
  633. p->len = size;
  634. m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p));
  635. LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID);
  636. }
  637. size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len)
  638. {
  639. size_t pos = 0;
  640. while (pos < len && !m_ReceiveQueue.empty ())
  641. {
  642. Packet * packet = m_ReceiveQueue.front ();
  643. size_t l = std::min (packet->GetLength (), len - pos);
  644. memcpy (buf + pos, packet->GetBuffer (), l);
  645. pos += l;
  646. packet->offset += l;
  647. if (!packet->GetLength ())
  648. {
  649. m_ReceiveQueue.pop ();
  650. m_LocalDestination.DeletePacket (packet);
  651. }
  652. }
  653. return pos;
  654. }
  655. bool Stream::SendPacket (Packet * packet)
  656. {
  657. if (packet)
  658. {
  659. if (m_IsAckSendScheduled)
  660. {
  661. m_IsAckSendScheduled = false;
  662. m_AckSendTimer.cancel ();
  663. }
  664. SendPackets (std::vector<Packet *> { packet });
  665. bool isEmpty = m_SentPackets.empty ();
  666. m_SentPackets.insert (packet);
  667. if (isEmpty)
  668. ScheduleResend ();
  669. return true;
  670. }
  671. else
  672. return false;
  673. }
  674. void Stream::SendPackets (const std::vector<Packet *>& packets)
  675. {
  676. if (!m_RemoteLeaseSet)
  677. {
  678. UpdateCurrentRemoteLease ();
  679. if (!m_RemoteLeaseSet)
  680. {
  681. LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet, sSID=", m_SendStreamID);
  682. return;
  683. }
  684. }
  685. if (!m_RoutingSession || !m_RoutingSession->GetOwner ()) // expired and detached
  686. m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true);
  687. if (!m_CurrentOutboundTunnel && m_RoutingSession) // first message to send
  688. {
  689. // try to get shared path first
  690. auto routingPath = m_RoutingSession->GetSharedRoutingPath ();
  691. if (routingPath)
  692. {
  693. m_CurrentOutboundTunnel = routingPath->outboundTunnel;
  694. m_CurrentRemoteLease = routingPath->remoteLease;
  695. m_RTT = routingPath->rtt;
  696. m_RTO = m_RTT*1.5; // TODO: implement it better
  697. }
  698. }
  699. if (!m_CurrentOutboundTunnel || !m_CurrentOutboundTunnel->IsEstablished ())
  700. m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel);
  701. if (!m_CurrentOutboundTunnel)
  702. {
  703. LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID);
  704. return;
  705. }
  706. auto ts = i2p::util::GetMillisecondsSinceEpoch ();
  707. if (!m_CurrentRemoteLease || !m_CurrentRemoteLease->endDate || // excluded from LeaseSet
  708. ts >= m_CurrentRemoteLease->endDate - i2p::data::LEASE_ENDDATE_THRESHOLD)
  709. UpdateCurrentRemoteLease (true);
  710. if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD)
  711. {
  712. std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
  713. for (auto it: packets)
  714. {
  715. auto msg = m_RoutingSession->WrapSingleMessage (m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength (), m_Port));
  716. msgs.push_back (i2p::tunnel::TunnelMessageBlock
  717. {
  718. i2p::tunnel::eDeliveryTypeTunnel,
  719. m_CurrentRemoteLease->tunnelGateway, m_CurrentRemoteLease->tunnelID,
  720. msg
  721. });
  722. m_NumSentBytes += it->GetLength ();
  723. }
  724. m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs);
  725. }
  726. else
  727. {
  728. LogPrint (eLogWarning, "Streaming: Remote lease is not available, sSID=", m_SendStreamID);
  729. if (m_RoutingSession)
  730. m_RoutingSession->SetSharedRoutingPath (nullptr); // invalidate routing path
  731. }
  732. }
  733. void Stream::SendUpdatedLeaseSet ()
  734. {
  735. if (m_RoutingSession)
  736. {
  737. if (m_RoutingSession->IsLeaseSetNonConfirmed ())
  738. {
  739. auto ts = i2p::util::GetMillisecondsSinceEpoch ();
  740. if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASET_CONFIRMATION_TIMEOUT)
  741. {
  742. // LeaseSet was not confirmed, should try other tunnels
  743. LogPrint (eLogWarning, "Streaming: LeaseSet was not confirmed in ", i2p::garlic::LEASET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit");
  744. m_RoutingSession->SetSharedRoutingPath (nullptr);
  745. m_CurrentOutboundTunnel = nullptr;
  746. m_CurrentRemoteLease = nullptr;
  747. SendQuickAck ();
  748. }
  749. }
  750. else if (m_RoutingSession->IsLeaseSetUpdated ())
  751. {
  752. LogPrint (eLogDebug, "Streaming: sending updated LeaseSet");
  753. SendQuickAck ();
  754. }
  755. }
  756. }
  757. void Stream::ScheduleResend ()
  758. {
  759. m_ResendTimer.cancel ();
  760. // check for invalid value
  761. if (m_RTO <= 0) m_RTO = INITIAL_RTO;
  762. m_ResendTimer.expires_from_now (boost::posix_time::milliseconds(m_RTO));
  763. m_ResendTimer.async_wait (std::bind (&Stream::HandleResendTimer,
  764. shared_from_this (), std::placeholders::_1));
  765. }
  766. void Stream::HandleResendTimer (const boost::system::error_code& ecode)
  767. {
  768. if (ecode != boost::asio::error::operation_aborted)
  769. {
  770. // check for resend attempts
  771. if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS)
  772. {
  773. LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
  774. m_Status = eStreamStatusReset;
  775. Close ();
  776. return;
  777. }
  778. // collect packets to resend
  779. auto ts = i2p::util::GetMillisecondsSinceEpoch ();
  780. std::vector<Packet *> packets;
  781. for (auto it : m_SentPackets)
  782. {
  783. if (ts >= it->sendTime + m_RTO)
  784. {
  785. it->sendTime = ts;
  786. packets.push_back (it);
  787. }
  788. }
  789. // select tunnels if necessary and send
  790. if (packets.size () > 0)
  791. {
  792. m_NumResendAttempts++;
  793. m_RTO *= 2;
  794. switch (m_NumResendAttempts)
  795. {
  796. case 1: // congesion avoidance
  797. m_WindowSize /= 2;
  798. if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE;
  799. break;
  800. case 2:
  801. m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change first time
  802. // no break here
  803. case 4:
  804. if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
  805. UpdateCurrentRemoteLease (); // pick another lease
  806. LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
  807. break;
  808. case 3:
  809. // pick another outbound tunnel
  810. if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
  811. m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
  812. LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID);
  813. break;
  814. default: ;
  815. }
  816. SendPackets (packets);
  817. }
  818. ScheduleResend ();
  819. }
  820. }
  821. void Stream::HandleAckSendTimer (const boost::system::error_code& ecode)
  822. {
  823. if (m_IsAckSendScheduled)
  824. {
  825. if (m_LastReceivedSequenceNumber < 0)
  826. {
  827. LogPrint (eLogWarning, "Streaming: SYN has not been received after ", SYN_TIMEOUT, " milliseconds after follow on, terminate rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
  828. m_Status = eStreamStatusReset;
  829. Close ();
  830. return;
  831. }
  832. if (m_Status == eStreamStatusOpen)
  833. {
  834. if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ())
  835. {
  836. // seems something went wrong and we should re-select tunnels
  837. m_CurrentOutboundTunnel = nullptr;
  838. m_CurrentRemoteLease = nullptr;
  839. }
  840. SendQuickAck ();
  841. }
  842. m_IsAckSendScheduled = false;
  843. }
  844. }
  845. void Stream::UpdateCurrentRemoteLease (bool expired)
  846. {
  847. if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ())
  848. {
  849. m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ());
  850. if (!m_RemoteLeaseSet)
  851. {
  852. LogPrint (eLogWarning, "Streaming: LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " not found");
  853. m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ()); // try to request for a next attempt
  854. }
  855. else
  856. {
  857. // LeaseSet updated
  858. m_RemoteIdentity = m_RemoteLeaseSet->GetIdentity ();
  859. m_TransientVerifier = m_RemoteLeaseSet->GetTransientVerifier ();
  860. }
  861. }
  862. if (m_RemoteLeaseSet)
  863. {
  864. if (!m_RoutingSession)
  865. m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true);
  866. auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (false); // try without threshold first
  867. if (leases.empty ())
  868. {
  869. expired = false;
  870. // time to request
  871. if (m_RemoteLeaseSet->GetOrigStoreType () == i2p::data::NETDB_STORE_TYPE_ENCRYPTED_LEASESET2)
  872. m_LocalDestination.GetOwner ()->RequestDestinationWithEncryptedLeaseSet (
  873. std::make_shared<i2p::data::BlindedPublicKey>(m_RemoteIdentity));
  874. else
  875. m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ());
  876. leases = m_RemoteLeaseSet->GetNonExpiredLeases (true); // then with threshold
  877. }
  878. if (!leases.empty ())
  879. {
  880. bool updated = false;
  881. if (expired && m_CurrentRemoteLease)
  882. {
  883. for (const auto& it: leases)
  884. if ((it->tunnelGateway == m_CurrentRemoteLease->tunnelGateway) && (it->tunnelID != m_CurrentRemoteLease->tunnelID))
  885. {
  886. m_CurrentRemoteLease = it;
  887. updated = true;
  888. break;
  889. }
  890. }
  891. if (!updated)
  892. {
  893. uint32_t i = rand () % leases.size ();
  894. if (m_CurrentRemoteLease && leases[i]->tunnelID == m_CurrentRemoteLease->tunnelID)
  895. // make sure we don't select previous
  896. i = (i + 1) % leases.size (); // if so, pick next
  897. m_CurrentRemoteLease = leases[i];
  898. }
  899. }
  900. else
  901. {
  902. LogPrint (eLogWarning, "Streaming: All remote leases are expired");
  903. m_RemoteLeaseSet = nullptr;
  904. m_CurrentRemoteLease = nullptr;
  905. // we have requested expired before, no need to do it twice
  906. }
  907. }
  908. else
  909. {
  910. LogPrint (eLogWarning, "Streaming: Remote LeaseSet not found");
  911. m_CurrentRemoteLease = nullptr;
  912. }
  913. }
  914. StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip):
  915. m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip),
  916. m_PendingIncomingTimer (m_Owner->GetService ())
  917. {
  918. }
  919. StreamingDestination::~StreamingDestination ()
  920. {
  921. for (auto& it: m_SavedPackets)
  922. {
  923. for (auto it1: it.second) DeletePacket (it1);
  924. it.second.clear ();
  925. }
  926. m_SavedPackets.clear ();
  927. }
  928. void StreamingDestination::Start ()
  929. {
  930. }
  931. void StreamingDestination::Stop ()
  932. {
  933. ResetAcceptor ();
  934. m_PendingIncomingTimer.cancel ();
  935. m_PendingIncomingStreams.clear ();
  936. {
  937. std::unique_lock<std::mutex> l(m_StreamsMutex);
  938. m_Streams.clear ();
  939. }
  940. }
  941. void StreamingDestination::HandleNextPacket (Packet * packet)
  942. {
  943. uint32_t sendStreamID = packet->GetSendStreamID ();
  944. if (sendStreamID)
  945. {
  946. auto it = m_Streams.find (sendStreamID);
  947. if (it != m_Streams.end ())
  948. it->second->HandleNextPacket (packet);
  949. else
  950. {
  951. LogPrint (eLogInfo, "Streaming: Unknown stream sSID=", sendStreamID);
  952. DeletePacket (packet);
  953. }
  954. }
  955. else
  956. {
  957. if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream
  958. {
  959. uint32_t receiveStreamID = packet->GetReceiveStreamID ();
  960. auto it1 = m_IncomingStreams.find (receiveStreamID);
  961. if (it1 != m_IncomingStreams.end ())
  962. {
  963. // already pending
  964. LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists");
  965. DeletePacket (packet); // drop it, because previous should be connected
  966. return;
  967. }
  968. auto incomingStream = CreateNewIncomingStream (receiveStreamID);
  969. incomingStream->HandleNextPacket (packet); // SYN
  970. auto ident = incomingStream->GetRemoteIdentity();
  971. // handle saved packets if any
  972. {
  973. auto it = m_SavedPackets.find (receiveStreamID);
  974. if (it != m_SavedPackets.end ())
  975. {
  976. LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for rSID=", receiveStreamID);
  977. for (auto it1: it->second)
  978. incomingStream->HandleNextPacket (it1);
  979. m_SavedPackets.erase (it);
  980. }
  981. }
  982. // accept
  983. if (m_Acceptor != nullptr)
  984. m_Acceptor (incomingStream);
  985. else
  986. {
  987. LogPrint (eLogWarning, "Streaming: Acceptor for incoming stream is not set");
  988. if (m_PendingIncomingStreams.size () < MAX_PENDING_INCOMING_BACKLOG)
  989. {
  990. m_PendingIncomingStreams.push_back (incomingStream);
  991. m_PendingIncomingTimer.cancel ();
  992. m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT));
  993. m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer,
  994. shared_from_this (), std::placeholders::_1));
  995. LogPrint (eLogDebug, "Streaming: Pending incoming stream added, rSID=", receiveStreamID);
  996. }
  997. else
  998. {
  999. LogPrint (eLogWarning, "Streaming: Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG);
  1000. incomingStream->Close ();
  1001. }
  1002. }
  1003. }
  1004. else // follow on packet without SYN
  1005. {
  1006. uint32_t receiveStreamID = packet->GetReceiveStreamID ();
  1007. auto it1 = m_IncomingStreams.find (receiveStreamID);
  1008. if (it1 != m_IncomingStreams.end ())
  1009. {
  1010. // found
  1011. it1->second->HandleNextPacket (packet);
  1012. return;
  1013. }
  1014. // save follow on packet
  1015. auto it = m_SavedPackets.find (receiveStreamID);
  1016. if (it != m_SavedPackets.end ())
  1017. it->second.push_back (packet);
  1018. else
  1019. {
  1020. m_SavedPackets[receiveStreamID] = std::list<Packet *>{ packet };
  1021. auto timer = std::make_shared<boost::asio::deadline_timer> (m_Owner->GetService ());
  1022. timer->expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT));
  1023. auto s = shared_from_this ();
  1024. timer->async_wait ([s,timer,receiveStreamID](const boost::system::error_code& ecode)
  1025. {
  1026. if (ecode != boost::asio::error::operation_aborted)
  1027. {
  1028. auto it = s->m_SavedPackets.find (receiveStreamID);
  1029. if (it != s->m_SavedPackets.end ())
  1030. {
  1031. for (auto it1: it->second) s->DeletePacket (it1);
  1032. it->second.clear ();
  1033. s->m_SavedPackets.erase (it);
  1034. }
  1035. }
  1036. });
  1037. }
  1038. }
  1039. }
  1040. }
  1041. std::shared_ptr<Stream> StreamingDestination::CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port)
  1042. {
  1043. auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, port);
  1044. std::unique_lock<std::mutex> l(m_StreamsMutex);
  1045. m_Streams[s->GetRecvStreamID ()] = s;
  1046. return s;
  1047. }
  1048. std::shared_ptr<Stream> StreamingDestination::CreateNewIncomingStream (uint32_t receiveStreamID)
  1049. {
  1050. auto s = std::make_shared<Stream> (m_Owner->GetService (), *this);
  1051. std::unique_lock<std::mutex> l(m_StreamsMutex);
  1052. m_Streams[s->GetRecvStreamID ()] = s;
  1053. m_IncomingStreams[receiveStreamID] = s;
  1054. return s;
  1055. }
  1056. void StreamingDestination::DeleteStream (std::shared_ptr<Stream> stream)
  1057. {
  1058. if (stream)
  1059. {
  1060. std::unique_lock<std::mutex> l(m_StreamsMutex);
  1061. m_Streams.erase (stream->GetRecvStreamID ());
  1062. m_IncomingStreams.erase (stream->GetSendStreamID ());
  1063. }
  1064. }
  1065. void StreamingDestination::SetAcceptor (const Acceptor& acceptor)
  1066. {
  1067. m_Acceptor = acceptor; // we must set it immediately for IsAcceptorSet
  1068. auto s = shared_from_this ();
  1069. m_Owner->GetService ().post([s](void)
  1070. {
  1071. // take care about incoming queue
  1072. for (auto& it: s->m_PendingIncomingStreams)
  1073. if (it->GetStatus () == eStreamStatusOpen) // still open?
  1074. s->m_Acceptor (it);
  1075. s->m_PendingIncomingStreams.clear ();
  1076. s->m_PendingIncomingTimer.cancel ();
  1077. });
  1078. }
  1079. void StreamingDestination::ResetAcceptor ()
  1080. {
  1081. if (m_Acceptor) m_Acceptor (nullptr);
  1082. m_Acceptor = nullptr;
  1083. }
  1084. void StreamingDestination::AcceptOnce (const Acceptor& acceptor)
  1085. {
  1086. m_Owner->GetService ().post([acceptor, this](void)
  1087. {
  1088. if (!m_PendingIncomingStreams.empty ())
  1089. {
  1090. acceptor (m_PendingIncomingStreams.front ());
  1091. m_PendingIncomingStreams.pop_front ();
  1092. if (m_PendingIncomingStreams.empty ())
  1093. m_PendingIncomingTimer.cancel ();
  1094. }
  1095. else // we must save old acceptor and set it back
  1096. {
  1097. m_Acceptor = std::bind (&StreamingDestination::AcceptOnceAcceptor, this,
  1098. std::placeholders::_1, acceptor, m_Acceptor);
  1099. }
  1100. });
  1101. }
  1102. void StreamingDestination::AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev)
  1103. {
  1104. m_Acceptor = prev;
  1105. acceptor (stream);
  1106. }
  1107. void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode)
  1108. {
  1109. if (ecode != boost::asio::error::operation_aborted)
  1110. {
  1111. LogPrint (eLogWarning, "Streaming: Pending incoming timeout expired");
  1112. for (auto& it: m_PendingIncomingStreams)
  1113. it->Close ();
  1114. m_PendingIncomingStreams.clear ();
  1115. }
  1116. }
  1117. void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len)
  1118. {
  1119. // unzip it
  1120. Packet * uncompressed = NewPacket ();
  1121. uncompressed->offset = 0;
  1122. uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE);
  1123. if (uncompressed->len)
  1124. HandleNextPacket (uncompressed);
  1125. else
  1126. DeletePacket (uncompressed);
  1127. }
  1128. std::shared_ptr<I2NPMessage> StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort)
  1129. {
  1130. auto msg = NewI2NPShortMessage ();
  1131. if (!m_Gzip || len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE)
  1132. m_Deflator.SetCompressionLevel (Z_NO_COMPRESSION);
  1133. else
  1134. m_Deflator.SetCompressionLevel (Z_DEFAULT_COMPRESSION);
  1135. uint8_t * buf = msg->GetPayload ();
  1136. buf += 4; // reserve for lengthlength
  1137. msg->len += 4;
  1138. size_t size = m_Deflator.Deflate (payload, len, buf, msg->maxLen - msg->len);
  1139. if (size)
  1140. {
  1141. htobe32buf (msg->GetPayload (), size); // length
  1142. htobe16buf (buf + 4, m_LocalPort); // source port
  1143. htobe16buf (buf + 6, toPort); // destination port
  1144. buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol
  1145. msg->len += size;
  1146. msg->FillI2NPMessageHeader (eI2NPData);
  1147. }
  1148. else
  1149. msg = nullptr;
  1150. return msg;
  1151. }
  1152. }
  1153. }