Streaming.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. #ifndef STREAMING_H__
  2. #define STREAMING_H__
  #include <inttypes.h>
  #include <cstring>
  #include <string>
  #include <map>
  #include <set>
  #include <list>
  #include <vector>
  #include <queue>
  #include <functional>
  #include <memory>
  #include <mutex>
  #include <boost/asio.hpp>
  #include "Base.h"
  #include "I2PEndian.h"
  #include "Identity.h"
  #include "LeaseSet.h"
  #include "I2NPProtocol.h"
  #include "Garlic.h"
  #include "Tunnel.h"
  #include "util.h" // MemoryPool
  20. namespace i2p
  21. {
  // Forward declaration only: this header refers to ClientDestination
  // exclusively through std::shared_ptr, so the full definition is not needed here.
  namespace client { class ClientDestination; }
  26. namespace stream
  27. {
  // Bit masks for the 16-bit flags field of a streaming packet
  // (see Packet::GetFlags, Packet::IsSYN and Packet::IsNoAck).
  constexpr uint16_t PACKET_FLAG_SYNCHRONIZE              = 0x0001;
  constexpr uint16_t PACKET_FLAG_CLOSE                    = 0x0002;
  constexpr uint16_t PACKET_FLAG_RESET                    = 0x0004;
  constexpr uint16_t PACKET_FLAG_SIGNATURE_INCLUDED       = 0x0008;
  constexpr uint16_t PACKET_FLAG_SIGNATURE_REQUESTED      = 0x0010;
  constexpr uint16_t PACKET_FLAG_FROM_INCLUDED            = 0x0020;
  constexpr uint16_t PACKET_FLAG_DELAY_REQUESTED          = 0x0040;
  constexpr uint16_t PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED = 0x0080;
  constexpr uint16_t PACKET_FLAG_PROFILE_INTERACTIVE      = 0x0100;
  constexpr uint16_t PACKET_FLAG_ECHO                     = 0x0200;
  constexpr uint16_t PACKET_FLAG_NO_ACK                   = 0x0400;
  constexpr uint16_t PACKET_FLAG_OFFLINE_SIGNATURE        = 0x0800;

  // Size limits, in bytes. MAX_PACKET_SIZE is the capacity of Packet::buf.
  constexpr size_t STREAMING_MTU              = 1730;
  constexpr size_t MAX_PACKET_SIZE            = 4096;
  constexpr size_t COMPRESSION_THRESHOLD_SIZE = 66;

  // Retransmission and send-window tuning.
  constexpr int MAX_NUM_RESEND_ATTEMPTS = 6;
  constexpr int WINDOW_SIZE             = 6;    // in messages
  constexpr int MIN_WINDOW_SIZE         = 1;
  constexpr int MAX_WINDOW_SIZE         = 128;
  constexpr int INITIAL_RTT             = 8000; // in milliseconds
  constexpr int INITIAL_RTO             = 9000; // in milliseconds

  // Timeouts and backlog limits.
  constexpr int    SYN_TIMEOUT                  = 200; // how long we wait for SYN after follow-on, in milliseconds
  constexpr size_t MAX_PENDING_INCOMING_BACKLOG = 128;
  constexpr int    PENDING_INCOMING_TIMEOUT     = 10;  // in seconds
  constexpr int    MAX_RECEIVE_TIMEOUT          = 30;  // in seconds
  53. struct Packet
  54. {
  55. size_t len, offset;
  56. uint8_t buf[MAX_PACKET_SIZE];
  57. uint64_t sendTime;
  58. Packet (): len (0), offset (0), sendTime (0) {};
  59. uint8_t * GetBuffer () { return buf + offset; };
  60. size_t GetLength () const { return len - offset; };
  61. uint32_t GetSendStreamID () const { return bufbe32toh (buf); };
  62. uint32_t GetReceiveStreamID () const { return bufbe32toh (buf + 4); };
  63. uint32_t GetSeqn () const { return bufbe32toh (buf + 8); };
  64. uint32_t GetAckThrough () const { return bufbe32toh (buf + 12); };
  65. uint8_t GetNACKCount () const { return buf[16]; };
  66. uint32_t GetNACK (int i) const { return bufbe32toh (buf + 17 + 4 * i); };
  67. const uint8_t * GetOption () const { return buf + 17 + GetNACKCount ()*4 + 3; }; // 3 = resendDelay + flags
  68. uint16_t GetFlags () const { return bufbe16toh (GetOption () - 2); };
  69. uint16_t GetOptionSize () const { return bufbe16toh (GetOption ()); };
  70. const uint8_t * GetOptionData () const { return GetOption () + 2; };
  71. const uint8_t * GetPayload () const { return GetOptionData () + GetOptionSize (); };
  72. bool IsSYN () const { return GetFlags () & PACKET_FLAG_SYNCHRONIZE; };
  73. bool IsNoAck () const { return GetFlags () & PACKET_FLAG_NO_ACK; };
  74. };
  75. struct PacketCmp
  76. {
  77. bool operator() (const Packet * p1, const Packet * p2) const
  78. {
  79. return p1->GetSeqn () < p2->GetSeqn ();
  80. };
  81. };
  82. typedef std::function<void (const boost::system::error_code& ecode)> SendHandler;
  83. struct SendBuffer
  84. {
  85. uint8_t * buf;
  86. size_t len, offset;
  87. SendHandler handler;
  88. SendBuffer (const uint8_t * b, size_t l, SendHandler h):
  89. len(l), offset (0), handler(h)
  90. {
  91. buf = new uint8_t[len];
  92. memcpy (buf, b, len);
  93. }
  94. ~SendBuffer ()
  95. {
  96. delete[] buf;
  97. if (handler) handler(boost::system::error_code ());
  98. }
  99. size_t GetRemainingSize () const { return len - offset; };
  100. const uint8_t * GetRemaningBuffer () const { return buf + offset; };
  101. void Cancel () { if (handler) handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted)); handler = nullptr; };
  102. };
  103. class SendBufferQueue
  104. {
  105. public:
  106. SendBufferQueue (): m_Size (0) {};
  107. ~SendBufferQueue () { CleanUp (); };
  108. void Add (const uint8_t * buf, size_t len, SendHandler handler);
  109. size_t Get (uint8_t * buf, size_t len);
  110. size_t GetSize () const { return m_Size; };
  111. bool IsEmpty () const { return m_Buffers.empty (); };
  112. void CleanUp ();
  113. private:
  114. std::list<std::shared_ptr<SendBuffer> > m_Buffers;
  115. size_t m_Size;
  116. };
  // State of a Stream as reported by Stream::GetStatus ().
  // Enumerators are numbered consecutively starting at 0.
  enum StreamStatus
  {
      eStreamStatusNew     = 0,
      eStreamStatusOpen    = 1,
      eStreamStatusReset   = 2,
      eStreamStatusClosing = 3,
      eStreamStatusClosed  = 4
  };
  125. class StreamingDestination;
  126. class Stream: public std::enable_shared_from_this<Stream>
  127. {
  128. public:
  129. Stream (boost::asio::io_service& service, StreamingDestination& local,
  130. std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0); // outgoing
  131. Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming
  132. ~Stream ();
  133. uint32_t GetSendStreamID () const { return m_SendStreamID; };
  134. uint32_t GetRecvStreamID () const { return m_RecvStreamID; };
  135. std::shared_ptr<const i2p::data::LeaseSet> GetRemoteLeaseSet () const { return m_RemoteLeaseSet; };
  136. std::shared_ptr<const i2p::data::IdentityEx> GetRemoteIdentity () const { return m_RemoteIdentity; };
  137. bool IsOpen () const { return m_Status == eStreamStatusOpen; };
  138. bool IsEstablished () const { return m_SendStreamID; };
  139. StreamStatus GetStatus () const { return m_Status; };
  140. StreamingDestination& GetLocalDestination () { return m_LocalDestination; };
  141. void HandleNextPacket (Packet * packet);
  142. size_t Send (const uint8_t * buf, size_t len);
  143. void AsyncSend (const uint8_t * buf, size_t len, SendHandler handler);
  144. template<typename Buffer, typename ReceiveHandler>
  145. void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0);
  146. size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); };
  147. void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); };
  148. /** only call close from destination thread, use Stream::AsyncClose for other threads */
  149. void Close ();
  150. void Cancel () { m_ReceiveTimer.cancel (); };
  151. size_t GetNumSentBytes () const { return m_NumSentBytes; };
  152. size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
  153. size_t GetSendQueueSize () const { return m_SentPackets.size (); };
  154. size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); };
  155. size_t GetSendBufferSize () const { return m_SendBuffer.GetSize (); };
  156. int GetWindowSize () const { return m_WindowSize; };
  157. int GetRTT () const { return m_RTT; };
  158. /** don't call me */
  159. void Terminate ();
  160. private:
  161. void CleanUp ();
  162. void SendBuffer ();
  163. void SendQuickAck ();
  164. void SendClose ();
  165. bool SendPacket (Packet * packet);
  166. void SendPackets (const std::vector<Packet *>& packets);
  167. void SendUpdatedLeaseSet ();
  168. void SavePacket (Packet * packet);
  169. void ProcessPacket (Packet * packet);
  170. bool ProcessOptions (uint16_t flags, Packet * packet);
  171. void ProcessAck (Packet * packet);
  172. size_t ConcatenatePackets (uint8_t * buf, size_t len);
  173. void UpdateCurrentRemoteLease (bool expired = false);
  174. template<typename Buffer, typename ReceiveHandler>
  175. void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout);
  176. void ScheduleResend ();
  177. void HandleResendTimer (const boost::system::error_code& ecode);
  178. void HandleAckSendTimer (const boost::system::error_code& ecode);
  179. private:
  180. boost::asio::io_service& m_Service;
  181. uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber;
  182. int32_t m_LastReceivedSequenceNumber;
  183. StreamStatus m_Status;
  184. bool m_IsAckSendScheduled;
  185. StreamingDestination& m_LocalDestination;
  186. std::shared_ptr<const i2p::data::IdentityEx> m_RemoteIdentity;
  187. std::shared_ptr<const i2p::crypto::Verifier> m_TransientVerifier; // in case of offline key
  188. std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet;
  189. std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession;
  190. std::shared_ptr<const i2p::data::Lease> m_CurrentRemoteLease;
  191. std::shared_ptr<i2p::tunnel::OutboundTunnel> m_CurrentOutboundTunnel;
  192. std::queue<Packet *> m_ReceiveQueue;
  193. std::set<Packet *, PacketCmp> m_SavedPackets;
  194. std::set<Packet *, PacketCmp> m_SentPackets;
  195. boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer, m_AckSendTimer;
  196. size_t m_NumSentBytes, m_NumReceivedBytes;
  197. uint16_t m_Port;
  198. std::mutex m_SendBufferMutex;
  199. SendBufferQueue m_SendBuffer;
  200. int m_WindowSize, m_RTT, m_RTO, m_AckDelay;
  201. uint64_t m_LastWindowSizeIncreaseTime;
  202. int m_NumResendAttempts;
  203. };
  204. class StreamingDestination: public std::enable_shared_from_this<StreamingDestination>
  205. {
  206. public:
  207. typedef std::function<void (std::shared_ptr<Stream>)> Acceptor;
  208. StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort = 0, bool gzip = true);
  209. ~StreamingDestination ();
  210. void Start ();
  211. void Stop ();
  212. std::shared_ptr<Stream> CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0);
  213. void DeleteStream (std::shared_ptr<Stream> stream);
  214. void SetAcceptor (const Acceptor& acceptor);
  215. void ResetAcceptor ();
  216. bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
  217. void AcceptOnce (const Acceptor& acceptor);
  218. std::shared_ptr<i2p::client::ClientDestination> GetOwner () const { return m_Owner; };
  219. void SetOwner (std::shared_ptr<i2p::client::ClientDestination> owner) { m_Owner = owner; };
  220. uint16_t GetLocalPort () const { return m_LocalPort; };
  221. void HandleDataMessagePayload (const uint8_t * buf, size_t len);
  222. std::shared_ptr<I2NPMessage> CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort);
  223. Packet * NewPacket () { return m_PacketsPool.Acquire(); }
  224. void DeletePacket (Packet * p) { return m_PacketsPool.Release(p); }
  225. void AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev);
  226. private:
  227. void HandleNextPacket (Packet * packet);
  228. std::shared_ptr<Stream> CreateNewIncomingStream (uint32_t receiveStreamID);
  229. void HandlePendingIncomingTimer (const boost::system::error_code& ecode);
  230. private:
  231. std::shared_ptr<i2p::client::ClientDestination> m_Owner;
  232. uint16_t m_LocalPort;
  233. bool m_Gzip; // gzip compression of data messages
  234. std::mutex m_StreamsMutex;
  235. std::map<uint32_t, std::shared_ptr<Stream> > m_Streams; // sendStreamID->stream
  236. std::map<uint32_t, std::shared_ptr<Stream> > m_IncomingStreams; // receiveStreamID->stream
  237. Acceptor m_Acceptor;
  238. std::list<std::shared_ptr<Stream> > m_PendingIncomingStreams;
  239. boost::asio::deadline_timer m_PendingIncomingTimer;
  240. std::map<uint32_t, std::list<Packet *> > m_SavedPackets; // receiveStreamID->packets, arrived before SYN
  241. i2p::util::MemoryPool<Packet> m_PacketsPool;
  242. public:
  243. i2p::data::GzipInflator m_Inflator;
  244. i2p::data::GzipDeflator m_Deflator;
  245. // for HTTP only
  246. const decltype(m_Streams)& GetStreams () const { return m_Streams; };
  247. };
  248. //-------------------------------------------------
  249. template<typename Buffer, typename ReceiveHandler>
  250. void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout)
  251. {
  252. auto s = shared_from_this();
  253. m_Service.post ([s, buffer, handler, timeout](void)
  254. {
  255. if (!s->m_ReceiveQueue.empty () || s->m_Status == eStreamStatusReset)
  256. s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0);
  257. else
  258. {
  259. int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout;
  260. s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t));
  261. int left = timeout - t;
  262. auto self = s->shared_from_this();
  263. self->m_ReceiveTimer.async_wait (
  264. [self, buffer, handler, left](const boost::system::error_code & ec)
  265. {
  266. self->HandleReceiveTimer(ec, buffer, handler, left);
  267. });
  268. }
  269. });
  270. }
  271. template<typename Buffer, typename ReceiveHandler>
  272. void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout)
  273. {
  274. size_t received = ConcatenatePackets (boost::asio::buffer_cast<uint8_t *>(buffer), boost::asio::buffer_size(buffer));
  275. if (received > 0)
  276. handler (boost::system::error_code (), received);
  277. else if (ecode == boost::asio::error::operation_aborted)
  278. {
  279. // timeout not expired
  280. if (m_Status == eStreamStatusReset)
  281. handler (boost::asio::error::make_error_code (boost::asio::error::connection_reset), 0);
  282. else
  283. handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), 0);
  284. }
  285. else
  286. {
  287. // timeout expired
  288. if (remainingTimeout <= 0)
  289. handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), received);
  290. else
  291. {
  292. // itermediate iterrupt
  293. SendUpdatedLeaseSet (); // send our leaseset if applicable
  294. AsyncReceive (buffer, handler, remainingTimeout);
  295. }
  296. }
  297. }
  298. }
  299. }
  300. #endif