UDP.h 8.3 KB

  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file UDP.h
  15. * @author Alex Leverington <nessence@gmail.com>
  16. * @date 2014
  17. */
  18. #pragma once
  #include <array>
  #include <atomic>
  #include <cassert>
  #include <chrono>
  #include <deque>
  #include <memory>
  #include <utility>
  #include <vector>
  #include <libdevcore/Guards.h>
  #include <libdevcrypto/Common.h>
  #include <libdevcore/SHA3.h>
  #include <libdevcore/Log.h>
  #include <libdevcore/RLP.h>
  #include "Common.h"
  30. namespace ba = boost::asio;
  31. namespace bi = ba::ip;
  32. namespace dev
  33. {
  34. namespace p2p
  35. {
  36. struct RLPXWarn: public LogChannel { static const char* name(); static const int verbosity = 0; };
  37. struct RLPXNote: public LogChannel { static const char* name(); static const int verbosity = 1; };
  38. /**
  39. * UDP Datagram
  40. * @todo make data protected/functional
  41. */
  42. class UDPDatagram
  43. {
  44. public:
  45. UDPDatagram(bi::udp::endpoint const& _ep): locus(_ep) {}
  46. UDPDatagram(bi::udp::endpoint const& _ep, bytes _data): data(_data), locus(_ep) {}
  47. bi::udp::endpoint const& endpoint() const { return locus; }
  48. bytes data;
  49. protected:
  50. bi::udp::endpoint locus;
  51. };
  52. /**
  53. * @brief RLPX Datagram which can be signed.
  54. */
  55. struct RLPXDatagramFace: public UDPDatagram
  56. {
  57. static uint32_t futureFromEpoch(std::chrono::seconds _sec) { return static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count()); }
  58. static uint32_t secondsSinceEpoch() { return static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now()).time_since_epoch()).count()); }
  59. static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp);
  60. RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
  61. virtual h256 sign(Secret const& _from);
  62. virtual uint8_t packetType() const = 0;
  63. virtual void streamRLP(RLPStream&) const = 0;
  64. virtual void interpretRLP(bytesConstRef _bytes) = 0;
  65. };
  66. /**
  67. * @brief Interface which UDPSocket will implement.
  68. */
  69. struct UDPSocketFace
  70. {
  71. virtual bool send(UDPDatagram const& _msg) = 0;
  72. virtual void disconnect() = 0;
  73. };
  74. /**
  75. * @brief Interface which a UDPSocket's owner must implement.
  76. */
  77. struct UDPSocketEvents
  78. {
  79. virtual void onDisconnected(UDPSocketFace*) {}
  80. virtual void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packetData) = 0;
  81. };
  82. /**
  83. * @brief UDP Interface
  84. * Handler must implement UDPSocketEvents.
  85. *
  86. * @todo multiple endpoints (we cannot advertise
  87. * @todo decouple deque from UDPDatagram and add ref() to datagram for fire&forget
  88. */
  89. template <typename Handler, unsigned MaxDatagramSize>
  90. class UDPSocket: UDPSocketFace, public std::enable_shared_from_this<UDPSocket<Handler, MaxDatagramSize>>
  91. {
  92. public:
  93. enum { maxDatagramSize = MaxDatagramSize };
  94. static_assert((unsigned)maxDatagramSize < 65507u, "UDP datagrams cannot be larger than 65507 bytes");
  95. /// Create socket for specific endpoint.
  96. UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, bi::udp::endpoint _endpoint): m_host(_host), m_endpoint(_endpoint), m_socket(_io) { m_started.store(false); m_closed.store(true); };
  97. /// Create socket which listens to all ports.
  98. UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_endpoint(bi::udp::v4(), _port), m_socket(_io) { m_started.store(false); m_closed.store(true); };
  99. virtual ~UDPSocket() { disconnect(); }
  100. /// Socket will begin listening for and delivering packets
  101. void connect();
  102. /// Send datagram.
  103. bool send(UDPDatagram const& _datagram);
  104. /// Returns if socket is open.
  105. bool isOpen() { return !m_closed; }
  106. /// Disconnect socket.
  107. void disconnect() { disconnectWithError(boost::asio::error::connection_reset); }
  108. protected:
  109. void doRead();
  110. void doWrite();
  111. void disconnectWithError(boost::system::error_code _ec);
  112. std::atomic<bool> m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start and disconnectWithError.
  113. std::atomic<bool> m_closed; ///< Connection availability.
  114. UDPSocketEvents& m_host; ///< Interface which owns this socket.
  115. bi::udp::endpoint m_endpoint; ///< Endpoint which we listen to.
  116. Mutex x_sendQ;
  117. std::deque<UDPDatagram> m_sendQ; ///< Queue for egress data.
  118. std::array<byte, maxDatagramSize> m_recvData; ///< Buffer for ingress data.
  119. bi::udp::endpoint m_recvEndpoint; ///< Endpoint data was received from.
  120. bi::udp::socket m_socket; ///< Boost asio udp socket.
  121. Mutex x_socketError; ///< Mutex for error which can be set from host or IO thread.
  122. boost::system::error_code m_socketError; ///< Set when shut down due to error.
  123. };
  124. template <typename Handler, unsigned MaxDatagramSize>
  125. void UDPSocket<Handler, MaxDatagramSize>::connect()
  126. {
  127. bool expect = false;
  128. if (!m_started.compare_exchange_strong(expect, true))
  129. return;
  130. m_socket.open(bi::udp::v4());
  131. try
  132. {
  133. m_socket.bind(m_endpoint);
  134. }
  135. catch (...)
  136. {
  137. m_socket.bind(bi::udp::endpoint(bi::udp::v4(), m_endpoint.port()));
  138. }
  139. // clear write queue so reconnect doesn't send stale messages
  140. Guard l(x_sendQ);
  141. m_sendQ.clear();
  142. m_closed = false;
  143. doRead();
  144. }
  145. template <typename Handler, unsigned MaxDatagramSize>
  146. bool UDPSocket<Handler, MaxDatagramSize>::send(UDPDatagram const& _datagram)
  147. {
  148. if (m_closed)
  149. return false;
  150. Guard l(x_sendQ);
  151. m_sendQ.push_back(_datagram);
  152. if (m_sendQ.size() == 1)
  153. doWrite();
  154. return true;
  155. }
  156. template <typename Handler, unsigned MaxDatagramSize>
  157. void UDPSocket<Handler, MaxDatagramSize>::doRead()
  158. {
  159. if (m_closed)
  160. return;
  161. auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
  162. m_socket.async_receive_from(boost::asio::buffer(m_recvData), m_recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len)
  163. {
  164. if (m_closed)
  165. return disconnectWithError(_ec);
  166. if (_ec != boost::system::errc::success)
  167. clog(NetWarn) << "Receiving UDP message failed. " << _ec.value() << ":" << _ec.message();
  168. if (_len)
  169. m_host.onReceived(this, m_recvEndpoint, bytesConstRef(m_recvData.data(), _len));
  170. doRead();
  171. });
  172. }
  173. template <typename Handler, unsigned MaxDatagramSize>
  174. void UDPSocket<Handler, MaxDatagramSize>::doWrite()
  175. {
  176. if (m_closed)
  177. return;
  178. const UDPDatagram& datagram = m_sendQ[0];
  179. auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
  180. bi::udp::endpoint endpoint(datagram.endpoint());
  181. m_socket.async_send_to(boost::asio::buffer(datagram.data), endpoint, [this, self, endpoint](boost::system::error_code _ec, std::size_t)
  182. {
  183. if (m_closed)
  184. return disconnectWithError(_ec);
  185. if (_ec != boost::system::errc::success)
  186. clog(NetWarn) << "Failed delivering UDP message. " << _ec.value() << ":" << _ec.message();
  187. Guard l(x_sendQ);
  188. m_sendQ.pop_front();
  189. if (m_sendQ.empty())
  190. return;
  191. doWrite();
  192. });
  193. }
  194. template <typename Handler, unsigned MaxDatagramSize>
  195. void UDPSocket<Handler, MaxDatagramSize>::disconnectWithError(boost::system::error_code _ec)
  196. {
  197. // If !started and already stopped, shutdown has already occured. (EOF or Operation canceled)
  198. if (!m_started && m_closed && !m_socket.is_open() /* todo: veirfy this logic*/)
  199. return;
  200. assert(_ec);
  201. {
  202. // disconnect-operation following prior non-zero errors are ignored
  203. Guard l(x_socketError);
  204. if (m_socketError != boost::system::error_code())
  205. return;
  206. m_socketError = _ec;
  207. }
  208. // TODO: (if non-zero error) schedule high-priority writes
  209. // prevent concurrent disconnect
  210. bool expected = true;
  211. if (!m_started.compare_exchange_strong(expected, false))
  212. return;
  213. // set m_closed to true to prevent undeliverable egress messages
  214. bool wasClosed = m_closed;
  215. m_closed = true;
  216. // close sockets
  217. boost::system::error_code ec;
  218. m_socket.shutdown(bi::udp::socket::shutdown_both, ec);
  219. m_socket.close();
  220. // socket never started if it never left stopped-state (pre-handshake)
  221. if (wasClosed)
  222. return;
  223. m_host.onDisconnected(this);
  224. }
  225. }
  226. }