Host.cpp 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file Host.cpp
  15. * @author Alex Leverington <nessence@gmail.com>
  16. * @author Gav Wood <i@gavwood.com>
  17. * @date 2014
  18. */
  19. #include <set>
  20. #include <chrono>
  21. #include <thread>
  22. #include <mutex>
  23. #include <memory>
  24. #include <boost/algorithm/string.hpp>
  25. #include <libdevcore/Common.h>
  26. #include <libdevcore/Assertions.h>
  27. #include <libdevcore/CommonIO.h>
  28. #include <libdevcore/Exceptions.h>
  29. #include <libdevcore/FileSystem.h>
  30. #include "Session.h"
  31. #include "Common.h"
  32. #include "Capability.h"
  33. #include "UPnP.h"
  34. #include "RLPxHandshake.h"
  35. #include "Host.h"
  36. using namespace std;
  37. using namespace dev;
  38. using namespace dev::p2p;
  39. /// Interval at which Host::run will call keepAlivePeers to ping peers.
  40. std::chrono::seconds const c_keepAliveInterval = std::chrono::seconds(30);
  41. /// Disconnect timeout after failure to respond to keepAlivePeers ping.
  42. std::chrono::milliseconds const c_keepAliveTimeOut = std::chrono::milliseconds(1000);
  43. HostNodeTableHandler::HostNodeTableHandler(Host& _host): m_host(_host) {}
  44. void HostNodeTableHandler::processEvent(NodeID const& _n, NodeTableEventType const& _e)
  45. {
  46. m_host.onNodeTableEvent(_n, _e);
  47. }
  48. ReputationManager::ReputationManager()
  49. {
  50. }
  51. void ReputationManager::noteRude(Session const& _s, std::string const& _sub)
  52. {
  53. DEV_WRITE_GUARDED(x_nodes)
  54. m_nodes[make_pair(_s.id(), _s.info().clientVersion)].subs[_sub].isRude = true;
  55. }
  56. bool ReputationManager::isRude(Session const& _s, std::string const& _sub) const
  57. {
  58. DEV_READ_GUARDED(x_nodes)
  59. {
  60. auto nit = m_nodes.find(make_pair(_s.id(), _s.info().clientVersion));
  61. if (nit == m_nodes.end())
  62. return false;
  63. auto sit = nit->second.subs.find(_sub);
  64. bool ret = sit == nit->second.subs.end() ? false : sit->second.isRude;
  65. return _sub.empty() ? ret : (ret || isRude(_s));
  66. }
  67. return false;
  68. }
  69. void ReputationManager::setData(Session const& _s, std::string const& _sub, bytes const& _data)
  70. {
  71. DEV_WRITE_GUARDED(x_nodes)
  72. m_nodes[make_pair(_s.id(), _s.info().clientVersion)].subs[_sub].data = _data;
  73. }
  74. bytes ReputationManager::data(Session const& _s, std::string const& _sub) const
  75. {
  76. DEV_READ_GUARDED(x_nodes)
  77. {
  78. auto nit = m_nodes.find(make_pair(_s.id(), _s.info().clientVersion));
  79. if (nit == m_nodes.end())
  80. return bytes();
  81. auto sit = nit->second.subs.find(_sub);
  82. return sit == nit->second.subs.end() ? bytes() : sit->second.data;
  83. }
  84. return bytes();
  85. }
  86. Host::Host(string const& _clientVersion, KeyPair const& _alias, NetworkPreferences const& _n):
  87. Worker("p2p", 0),
  88. m_clientVersion(_clientVersion),
  89. m_netPrefs(_n),
  90. m_ifAddresses(Network::getInterfaceAddresses()),
  91. m_ioService(2),
  92. m_tcp4Acceptor(m_ioService),
  93. m_alias(_alias),
  94. m_lastPing(chrono::steady_clock::time_point::min())
  95. {
  96. clog(NetNote) << "Id:" << id();
  97. }
  98. Host::Host(string const& _clientVersion, NetworkPreferences const& _n, bytesConstRef _restoreNetwork):
  99. Host(_clientVersion, networkAlias(_restoreNetwork), _n)
  100. {
  101. m_restoreNetwork = _restoreNetwork.toBytes();
  102. }
  103. Host::~Host()
  104. {
  105. stop();
  106. }
  107. void Host::start()
  108. {
  109. DEV_TIMED_FUNCTION_ABOVE(500);
  110. startWorking();
  111. while (isWorking() && !haveNetwork())
  112. this_thread::sleep_for(chrono::milliseconds(10));
  113. // network start failed!
  114. if (isWorking())
  115. return;
  116. clog(NetWarn) << "Network start failed!";
  117. doneWorking();
  118. }
  119. void Host::stop()
  120. {
  121. // called to force io_service to kill any remaining tasks it might have -
  122. // such tasks may involve socket reads from Capabilities that maintain references
  123. // to resources we're about to free.
  124. {
  125. // Although m_run is set by stop() or start(), it effects m_runTimer so x_runTimer is used instead of a mutex for m_run.
  126. Guard l(x_runTimer);
  127. // ignore if already stopped/stopping
  128. if (!m_run)
  129. return;
  130. // signal run() to prepare for shutdown and reset m_timer
  131. m_run = false;
  132. }
  133. // wait for m_timer to reset (indicating network scheduler has stopped)
  134. while (!!m_timer)
  135. this_thread::sleep_for(chrono::milliseconds(50));
  136. // stop worker thread
  137. if (isWorking())
  138. stopWorking();
  139. }
  140. void Host::doneWorking()
  141. {
  142. // reset ioservice (cancels all timers and allows manually polling network, below)
  143. m_ioService.reset();
  144. DEV_GUARDED(x_timers)
  145. m_timers.clear();
  146. // shutdown acceptor
  147. m_tcp4Acceptor.cancel();
  148. if (m_tcp4Acceptor.is_open())
  149. m_tcp4Acceptor.close();
  150. // There maybe an incoming connection which started but hasn't finished.
  151. // Wait for acceptor to end itself instead of assuming it's complete.
  152. // This helps ensure a peer isn't stopped at the same time it's starting
  153. // and that socket for pending connection is closed.
  154. while (m_accepting)
  155. m_ioService.poll();
  156. // stop capabilities (eth: stops syncing or block/tx broadcast)
  157. for (auto const& h: m_capabilities)
  158. h.second->onStopping();
  159. // disconnect pending handshake, before peers, as a handshake may create a peer
  160. for (unsigned n = 0;; n = 0)
  161. {
  162. DEV_GUARDED(x_connecting)
  163. for (auto const& i: m_connecting)
  164. if (auto h = i.lock())
  165. {
  166. h->cancel();
  167. n++;
  168. }
  169. if (!n)
  170. break;
  171. m_ioService.poll();
  172. }
  173. // disconnect peers
  174. for (unsigned n = 0;; n = 0)
  175. {
  176. DEV_RECURSIVE_GUARDED(x_sessions)
  177. for (auto i: m_sessions)
  178. if (auto p = i.second.lock())
  179. if (p->isConnected())
  180. {
  181. p->disconnect(ClientQuit);
  182. n++;
  183. }
  184. if (!n)
  185. break;
  186. // poll so that peers send out disconnect packets
  187. m_ioService.poll();
  188. }
  189. // stop network (again; helpful to call before subsequent reset())
  190. m_ioService.stop();
  191. // reset network (allows reusing ioservice in future)
  192. m_ioService.reset();
  193. // finally, clear out peers (in case they're lingering)
  194. RecursiveGuard l(x_sessions);
  195. m_sessions.clear();
  196. }
  197. void Host::startPeerSession(Public const& _id, RLP const& _rlp, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s)
  198. {
  199. // session maybe ingress or egress so m_peers and node table entries may not exist
  200. shared_ptr<Peer> p;
  201. DEV_RECURSIVE_GUARDED(x_sessions)
  202. {
  203. if (m_peers.count(_id))
  204. p = m_peers[_id];
  205. else
  206. {
  207. // peer doesn't exist, try to get port info from node table
  208. if (m_nodeTable)
  209. if (Node n = m_nodeTable->node(_id))
  210. p = make_shared<Peer>(n);
  211. if (!p)
  212. p = make_shared<Peer>(Node(_id, UnspecifiedNodeIPEndpoint));
  213. m_peers[_id] = p;
  214. }
  215. }
  216. if (p->isOffline())
  217. p->m_lastConnected = std::chrono::system_clock::now();
  218. p->endpoint.address = _s->remoteEndpoint().address();
  219. auto protocolVersion = _rlp[0].toInt<unsigned>();
  220. auto clientVersion = _rlp[1].toString();
  221. auto caps = _rlp[2].toVector<CapDesc>();
  222. auto listenPort = _rlp[3].toInt<unsigned short>();
  223. auto pub = _rlp[4].toHash<Public>();
  224. if (pub != _id)
  225. {
  226. cdebug << "Wrong ID: " << pub << " vs. " << _id;
  227. return;
  228. }
  229. // clang error (previously: ... << hex << caps ...)
  230. // "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments"
  231. stringstream capslog;
  232. // leave only highset mutually supported capability version
  233. caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r){ return !haveCapability(_r) || any_of(caps.begin(), caps.end(), [&](CapDesc const& _o){ return _r.first == _o.first && _o.second > _r.second && haveCapability(_o); }); }), caps.end());
  234. for (auto cap: caps)
  235. capslog << "(" << cap.first << "," << dec << cap.second << ")";
  236. clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort;
  237. // create session so disconnects are managed
  238. auto ps = make_shared<Session>(this, move(_io), _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>(), protocolVersion}));
  239. if (protocolVersion < dev::p2p::c_protocolVersion - 1)
  240. {
  241. ps->disconnect(IncompatibleProtocol);
  242. return;
  243. }
  244. if (caps.empty())
  245. {
  246. ps->disconnect(UselessPeer);
  247. return;
  248. }
  249. if (m_netPrefs.pin && !m_requiredPeers.count(_id))
  250. {
  251. cdebug << "Unexpected identity from peer (got" << _id << ", must be one of " << m_requiredPeers << ")";
  252. ps->disconnect(UnexpectedIdentity);
  253. return;
  254. }
  255. {
  256. RecursiveGuard l(x_sessions);
  257. if (m_sessions.count(_id) && !!m_sessions[_id].lock())
  258. if (auto s = m_sessions[_id].lock())
  259. if(s->isConnected())
  260. {
  261. // Already connected.
  262. clog(NetWarn) << "Session already exists for peer with id" << _id;
  263. ps->disconnect(DuplicatePeer);
  264. return;
  265. }
  266. if (!peerSlotsAvailable())
  267. {
  268. ps->disconnect(TooManyPeers);
  269. return;
  270. }
  271. unsigned offset = (unsigned)UserPacket;
  272. uint16_t cnt = 1;
  273. // todo: mutex Session::m_capabilities and move for(:caps) out of mutex.
  274. for (auto const& i: caps)
  275. {
  276. auto pcap = m_capabilities[i];
  277. if (!pcap)
  278. return ps->disconnect(IncompatibleProtocol);
  279. if (Session::isFramingAllowedForVersion(protocolVersion))
  280. pcap->newPeerCapability(ps, 0, i, cnt++);
  281. else
  282. {
  283. pcap->newPeerCapability(ps, offset, i, 0);
  284. offset += pcap->messageCount();
  285. }
  286. }
  287. ps->start();
  288. m_sessions[_id] = ps;
  289. }
  290. clog(NetP2PNote) << "p2p.host.peer.register" << _id;
  291. }
  292. void Host::onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e)
  293. {
  294. if (_e == NodeEntryAdded)
  295. {
  296. clog(NetP2PNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
  297. // only add iff node is in node table
  298. if (Node n = m_nodeTable->node(_n))
  299. {
  300. shared_ptr<Peer> p;
  301. DEV_RECURSIVE_GUARDED(x_sessions)
  302. {
  303. if (m_peers.count(_n))
  304. {
  305. p = m_peers[_n];
  306. p->endpoint = n.endpoint;
  307. }
  308. else
  309. {
  310. p = make_shared<Peer>(n);
  311. m_peers[_n] = p;
  312. clog(NetP2PNote) << "p2p.host.peers.events.peerAdded " << _n << p->endpoint;
  313. }
  314. }
  315. if (peerSlotsAvailable(Egress))
  316. connect(p);
  317. }
  318. }
  319. else if (_e == NodeEntryDropped)
  320. {
  321. clog(NetP2PNote) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
  322. RecursiveGuard l(x_sessions);
  323. if (m_peers.count(_n) && m_peers[_n]->peerType == PeerType::Optional)
  324. m_peers.erase(_n);
  325. }
  326. }
  327. void Host::determinePublic()
  328. {
  329. // set m_tcpPublic := listenIP (if public) > public > upnp > unspecified address.
  330. auto ifAddresses = Network::getInterfaceAddresses();
  331. auto laddr = m_netPrefs.listenIPAddress.empty() ? bi::address() : bi::address::from_string(m_netPrefs.listenIPAddress);
  332. auto lset = !laddr.is_unspecified();
  333. auto paddr = m_netPrefs.publicIPAddress.empty() ? bi::address() : bi::address::from_string(m_netPrefs.publicIPAddress);
  334. auto pset = !paddr.is_unspecified();
  335. bool listenIsPublic = lset && isPublicAddress(laddr);
  336. bool publicIsHost = !lset && pset && ifAddresses.count(paddr);
  337. bi::tcp::endpoint ep(bi::address(), m_listenPort);
  338. if (m_netPrefs.traverseNAT && listenIsPublic)
  339. {
  340. clog(NetNote) << "Listen address set to Public address:" << laddr << ". UPnP disabled.";
  341. ep.address(laddr);
  342. }
  343. else if (m_netPrefs.traverseNAT && publicIsHost)
  344. {
  345. clog(NetNote) << "Public address set to Host configured address:" << paddr << ". UPnP disabled.";
  346. ep.address(paddr);
  347. }
  348. else if (m_netPrefs.traverseNAT)
  349. {
  350. bi::address natIFAddr;
  351. ep = Network::traverseNAT(lset && ifAddresses.count(laddr) ? std::set<bi::address>({laddr}) : ifAddresses, m_listenPort, natIFAddr);
  352. if (lset && natIFAddr != laddr)
  353. // if listen address is set, Host will use it, even if upnp returns different
  354. clog(NetWarn) << "Listen address" << laddr << "differs from local address" << natIFAddr << "returned by UPnP!";
  355. if (pset && ep.address() != paddr)
  356. {
  357. // if public address is set, Host will advertise it, even if upnp returns different
  358. clog(NetWarn) << "Specified public address" << paddr << "differs from external address" << ep.address() << "returned by UPnP!";
  359. ep.address(paddr);
  360. }
  361. }
  362. else if (pset)
  363. ep.address(paddr);
  364. m_tcpPublic = ep;
  365. }
  366. void Host::runAcceptor()
  367. {
  368. assert(m_listenPort > 0);
  369. if (m_run && !m_accepting)
  370. {
  371. clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
  372. m_accepting = true;
  373. auto socket = make_shared<RLPXSocket>(m_ioService);
  374. m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec)
  375. {
  376. m_accepting = false;
  377. if (ec || !m_run)
  378. {
  379. socket->close();
  380. return;
  381. }
  382. if (peerCount() > peerSlots(Ingress))
  383. {
  384. clog(NetConnect) << "Dropping incoming connect due to maximum peer count (" << Ingress << " * ideal peer count): " << socket->remoteEndpoint();
  385. socket->close();
  386. if (ec.value() < 1)
  387. runAcceptor();
  388. return;
  389. }
  390. bool success = false;
  391. try
  392. {
  393. // incoming connection; we don't yet know nodeid
  394. auto handshake = make_shared<RLPXHandshake>(this, socket);
  395. m_connecting.push_back(handshake);
  396. handshake->start();
  397. success = true;
  398. }
  399. catch (Exception const& _e)
  400. {
  401. clog(NetWarn) << "ERROR: " << diagnostic_information(_e);
  402. }
  403. catch (std::exception const& _e)
  404. {
  405. clog(NetWarn) << "ERROR: " << _e.what();
  406. }
  407. if (!success)
  408. socket->ref().close();
  409. runAcceptor();
  410. });
  411. }
  412. }
  413. std::unordered_map<Public, std::string> const& Host::pocHosts()
  414. {
  415. static const std::unordered_map<Public, std::string> c_ret = {
  416. { Public("5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a"), "gav.ethdev.com:30300" },
  417. { Public("e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc"), "gav.ethdev.com:30303" },
  418. { Public("a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c"), "52.16.188.185:30303" },
  419. { Public("7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034"), "54.207.93.166:30303" },
  420. { Public("5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a"), "92.51.165.126:30303" },
  421. };
  422. return c_ret;
  423. }
  424. void Host::addPeer(NodeSpec const& _s, PeerType _t)
  425. {
  426. if (_t == PeerType::Optional)
  427. addNode(_s.id(), _s.nodeIPEndpoint());
  428. else
  429. requirePeer(_s.id(), _s.nodeIPEndpoint());
  430. }
  431. void Host::addNode(NodeID const& _node, NodeIPEndpoint const& _endpoint)
  432. {
  433. // return if network is stopped while waiting on Host::run() or nodeTable to start
  434. while (!haveNetwork())
  435. if (isWorking())
  436. this_thread::sleep_for(chrono::milliseconds(50));
  437. else
  438. return;
  439. if (_endpoint.tcpPort < 30300 || _endpoint.tcpPort > 30305)
  440. clog(NetConnect) << "Non-standard port being recorded: " << _endpoint.tcpPort;
  441. if (m_nodeTable)
  442. m_nodeTable->addNode(Node(_node, _endpoint));
  443. }
  444. void Host::requirePeer(NodeID const& _n, NodeIPEndpoint const& _endpoint)
  445. {
  446. m_requiredPeers.insert(_n);
  447. if (!m_run)
  448. return;
  449. Node node(_n, _endpoint, PeerType::Required);
  450. if (_n)
  451. {
  452. // create or update m_peers entry
  453. shared_ptr<Peer> p;
  454. DEV_RECURSIVE_GUARDED(x_sessions)
  455. if (m_peers.count(_n))
  456. {
  457. p = m_peers[_n];
  458. p->endpoint = node.endpoint;
  459. p->peerType = PeerType::Required;
  460. }
  461. else
  462. {
  463. p = make_shared<Peer>(node);
  464. m_peers[_n] = p;
  465. }
  466. // required for discovery
  467. if (m_nodeTable)
  468. m_nodeTable->addNode(*p, NodeTable::NodeRelation::Unknown);
  469. }
  470. else if (m_nodeTable)
  471. {
  472. m_nodeTable->addNode(node);
  473. auto t = make_shared<boost::asio::deadline_timer>(m_ioService);
  474. t->expires_from_now(boost::posix_time::milliseconds(600));
  475. t->async_wait([this, _n](boost::system::error_code const& _ec)
  476. {
  477. if (!_ec)
  478. if (m_nodeTable)
  479. if (auto n = m_nodeTable->node(_n))
  480. requirePeer(n.id, n.endpoint);
  481. });
  482. DEV_GUARDED(x_timers)
  483. m_timers.push_back(t);
  484. }
  485. }
  486. void Host::relinquishPeer(NodeID const& _node)
  487. {
  488. Guard l(x_requiredPeers);
  489. if (m_requiredPeers.count(_node))
  490. m_requiredPeers.erase(_node);
  491. }
  492. void Host::connect(std::shared_ptr<Peer> const& _p)
  493. {
  494. if (!m_run)
  495. return;
  496. if (havePeerSession(_p->id))
  497. {
  498. clog(NetConnect) << "Aborted connect. Node already connected.";
  499. return;
  500. }
  501. if (!!m_nodeTable && !m_nodeTable->haveNode(_p->id) && _p->peerType == PeerType::Optional)
  502. return;
  503. // prevent concurrently connecting to a node
  504. Peer *nptr = _p.get();
  505. {
  506. Guard l(x_pendingNodeConns);
  507. if (m_pendingPeerConns.count(nptr))
  508. return;
  509. m_pendingPeerConns.insert(nptr);
  510. }
  511. _p->m_lastAttempted = std::chrono::system_clock::now();
  512. bi::tcp::endpoint ep(_p->endpoint);
  513. clog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id();
  514. auto socket = make_shared<RLPXSocket>(m_ioService);
  515. socket->ref().async_connect(ep, [=](boost::system::error_code const& ec)
  516. {
  517. _p->m_lastAttempted = std::chrono::system_clock::now();
  518. _p->m_failedAttempts++;
  519. if (ec)
  520. {
  521. clog(NetConnect) << "Connection refused to node" << _p->id << "@" << ep << "(" << ec.message() << ")";
  522. // Manually set error (session not present)
  523. _p->m_lastDisconnect = TCPError;
  524. }
  525. else
  526. {
  527. clog(NetConnect) << "Connecting to" << _p->id << "@" << ep;
  528. auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id);
  529. {
  530. Guard l(x_connecting);
  531. m_connecting.push_back(handshake);
  532. }
  533. handshake->start();
  534. }
  535. Guard l(x_pendingNodeConns);
  536. m_pendingPeerConns.erase(nptr);
  537. });
  538. }
  539. PeerSessionInfos Host::peerSessionInfo() const
  540. {
  541. if (!m_run)
  542. return PeerSessionInfos();
  543. std::vector<PeerSessionInfo> ret;
  544. RecursiveGuard l(x_sessions);
  545. for (auto& i: m_sessions)
  546. if (auto j = i.second.lock())
  547. if (j->isConnected())
  548. DEV_GUARDED(j->x_info)
  549. ret.push_back(j->m_info);
  550. return ret;
  551. }
  552. size_t Host::peerCount() const
  553. {
  554. unsigned retCount = 0;
  555. RecursiveGuard l(x_sessions);
  556. for (auto& i: m_sessions)
  557. if (std::shared_ptr<Session> j = i.second.lock())
  558. if (j->isConnected())
  559. retCount++;
  560. return retCount;
  561. }
  562. void Host::run(boost::system::error_code const&)
  563. {
  564. if (!m_run)
  565. {
  566. // reset NodeTable
  567. m_nodeTable.reset();
  568. // stopping io service allows running manual network operations for shutdown
  569. // and also stops blocking worker thread, allowing worker thread to exit
  570. m_ioService.stop();
  571. // resetting timer signals network that nothing else can be scheduled to run
  572. m_timer.reset();
  573. return;
  574. }
  575. m_nodeTable->processEvents();
  576. // cleanup zombies
  577. DEV_GUARDED(x_connecting)
  578. m_connecting.remove_if([](std::weak_ptr<RLPXHandshake> h){ return h.expired(); });
  579. DEV_GUARDED(x_timers)
  580. m_timers.remove_if([](std::shared_ptr<boost::asio::deadline_timer> t)
  581. {
  582. return t->expires_from_now().total_milliseconds() < 0;
  583. });
  584. keepAlivePeers();
  585. // At this time peers will be disconnected based on natural TCP timeout.
  586. // disconnectLatePeers needs to be updated for the assumption that Session
  587. // is always live and to ensure reputation and fallback timers are properly
  588. // updated. // disconnectLatePeers();
  589. // todo: update peerSlotsAvailable()
  590. list<shared_ptr<Peer>> toConnect;
  591. unsigned reqConn = 0;
  592. {
  593. RecursiveGuard l(x_sessions);
  594. for (auto const& p: m_peers)
  595. {
  596. bool haveSession = havePeerSession(p.second->id);
  597. bool required = p.second->peerType == PeerType::Required;
  598. if (haveSession && required)
  599. reqConn++;
  600. else if (!haveSession && p.second->shouldReconnect() && (!m_netPrefs.pin || required))
  601. toConnect.push_back(p.second);
  602. }
  603. }
  604. for (auto p: toConnect)
  605. if (p->peerType == PeerType::Required && reqConn++ < m_idealPeerCount)
  606. connect(p);
  607. if (!m_netPrefs.pin)
  608. {
  609. unsigned pendingCount = 0;
  610. DEV_GUARDED(x_pendingNodeConns)
  611. pendingCount = m_pendingPeerConns.size();
  612. int openSlots = m_idealPeerCount - peerCount() - pendingCount + reqConn;
  613. if (openSlots > 0)
  614. for (auto p: toConnect)
  615. if (p->peerType == PeerType::Optional && openSlots--)
  616. connect(p);
  617. }
  618. auto runcb = [this](boost::system::error_code const& error) { run(error); };
  619. m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));
  620. m_timer->async_wait(runcb);
  621. }
  622. void Host::startedWorking()
  623. {
  624. asserts(!m_timer);
  625. {
  626. // prevent m_run from being set to true at same time as set to false by stop()
  627. // don't release mutex until m_timer is set so in case stop() is called at same
  628. // time, stop will wait on m_timer and graceful network shutdown.
  629. Guard l(x_runTimer);
  630. // create deadline timer
  631. m_timer.reset(new boost::asio::deadline_timer(m_ioService));
  632. m_run = true;
  633. }
  634. // start capability threads (ready for incoming connections)
  635. for (auto const& h: m_capabilities)
  636. h.second->onStarting();
  637. // try to open acceptor (todo: ipv6)
  638. int port = Network::tcp4Listen(m_tcp4Acceptor, m_netPrefs);
  639. if (port > 0)
  640. {
  641. m_listenPort = port;
  642. determinePublic();
  643. runAcceptor();
  644. }
  645. else
  646. clog(NetP2PNote) << "p2p.start.notice id:" << id() << "TCP Listen port is invalid or unavailable.";
  647. auto nodeTable = make_shared<NodeTable>(
  648. m_ioService,
  649. m_alias,
  650. NodeIPEndpoint(bi::address::from_string(listenAddress()), listenPort(), listenPort()),
  651. m_netPrefs.discovery
  652. );
  653. nodeTable->setEventHandler(new HostNodeTableHandler(*this));
  654. m_nodeTable = nodeTable;
  655. restoreNetwork(&m_restoreNetwork);
  656. clog(NetP2PNote) << "p2p.started id:" << id();
  657. run(boost::system::error_code());
  658. }
  659. void Host::doWork()
  660. {
  661. try
  662. {
  663. if (m_run)
  664. m_ioService.run();
  665. }
  666. catch (std::exception const& _e)
  667. {
  668. clog(NetP2PWarn) << "Exception in Network Thread:" << _e.what();
  669. clog(NetP2PWarn) << "Network Restart is Recommended.";
  670. }
  671. }
  672. void Host::keepAlivePeers()
  673. {
  674. if (chrono::steady_clock::now() - c_keepAliveInterval < m_lastPing)
  675. return;
  676. RecursiveGuard l(x_sessions);
  677. for (auto it = m_sessions.begin(); it != m_sessions.end();)
  678. if (auto p = it->second.lock())
  679. {
  680. p->ping();
  681. ++it;
  682. }
  683. else
  684. it = m_sessions.erase(it);
  685. m_lastPing = chrono::steady_clock::now();
  686. }
  687. void Host::disconnectLatePeers()
  688. {
  689. auto now = chrono::steady_clock::now();
  690. if (now - c_keepAliveTimeOut < m_lastPing)
  691. return;
  692. RecursiveGuard l(x_sessions);
  693. for (auto p: m_sessions)
  694. if (auto pp = p.second.lock())
  695. if (now - c_keepAliveTimeOut > m_lastPing && pp->m_lastReceived < m_lastPing)
  696. pp->disconnect(PingTimeout);
  697. }
  698. bytes Host::saveNetwork() const
  699. {
  700. std::list<Peer> peers;
  701. {
  702. RecursiveGuard l(x_sessions);
  703. for (auto p: m_peers)
  704. if (p.second)
  705. peers.push_back(*p.second);
  706. }
  707. peers.sort();
  708. RLPStream network;
  709. int count = 0;
  710. for (auto const& p: peers)
  711. {
  712. // todo: ipv6
  713. if (!p.endpoint.address.is_v4())
  714. continue;
  715. // Only save peers which have connected within 2 days, with properly-advertised port and public IP address
  716. if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && !!p.endpoint && p.id != id() && (p.peerType == PeerType::Required || p.endpoint.isAllowed()))
  717. {
  718. network.appendList(11);
  719. p.endpoint.streamRLP(network, NodeIPEndpoint::StreamInline);
  720. network << p.id << (p.peerType == PeerType::Required ? true : false)
  721. << chrono::duration_cast<chrono::seconds>(p.m_lastConnected.time_since_epoch()).count()
  722. << chrono::duration_cast<chrono::seconds>(p.m_lastAttempted.time_since_epoch()).count()
  723. << p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating;
  724. count++;
  725. }
  726. }
  727. if (!!m_nodeTable)
  728. {
  729. auto state = m_nodeTable->snapshot();
  730. state.sort();
  731. for (auto const& entry: state)
  732. {
  733. network.appendList(4);
  734. entry.endpoint.streamRLP(network, NodeIPEndpoint::StreamInline);
  735. network << entry.id;
  736. count++;
  737. }
  738. }
  739. // else: TODO: use previous configuration if available
  740. RLPStream ret(3);
  741. ret << dev::p2p::c_protocolVersion << m_alias.secret().ref();
  742. ret.appendList(count);
  743. if (!!count)
  744. ret.appendRaw(network.out(), count);
  745. return ret.out();
  746. }
  747. void Host::restoreNetwork(bytesConstRef _b)
  748. {
  749. if (!_b.size())
  750. return;
  751. // nodes can only be added if network is added
  752. if (!isStarted())
  753. BOOST_THROW_EXCEPTION(NetworkStartRequired());
  754. if (m_dropPeers)
  755. return;
  756. RecursiveGuard l(x_sessions);
  757. RLP r(_b);
  758. unsigned fileVersion = r[0].toInt<unsigned>();
  759. if (r.itemCount() > 0 && r[0].isInt() && fileVersion >= dev::p2p::c_protocolVersion - 1)
  760. {
  761. // r[0] = version
  762. // r[1] = key
  763. // r[2] = nodes
  764. for (auto i: r[2])
  765. {
  766. // todo: ipv6
  767. if (i[0].itemCount() != 4 && i[0].size() != 4)
  768. continue;
  769. if (i.itemCount() == 4 || i.itemCount() == 11)
  770. {
  771. Node n((NodeID)i[3], NodeIPEndpoint(i));
  772. if (i.itemCount() == 4 && n.endpoint.isAllowed())
  773. m_nodeTable->addNode(n);
  774. else if (i.itemCount() == 11)
  775. {
  776. n.peerType = i[4].toInt<bool>() ? PeerType::Required : PeerType::Optional;
  777. if (!n.endpoint.isAllowed() && n.peerType == PeerType::Optional)
  778. continue;
  779. shared_ptr<Peer> p = make_shared<Peer>(n);
  780. p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
  781. p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[6].toInt<unsigned>()));
  782. p->m_failedAttempts = i[7].toInt<unsigned>();
  783. p->m_lastDisconnect = (DisconnectReason)i[8].toInt<unsigned>();
  784. p->m_score = (int)i[9].toInt<unsigned>();
  785. p->m_rating = (int)i[10].toInt<unsigned>();
  786. m_peers[p->id] = p;
  787. if (p->peerType == PeerType::Required)
  788. requirePeer(p->id, n.endpoint);
  789. else
  790. m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
  791. }
  792. }
  793. else if (i.itemCount() == 3 || i.itemCount() == 10)
  794. {
  795. Node n((NodeID)i[2], NodeIPEndpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<uint16_t>(), i[1].toInt<uint16_t>()));
  796. if (i.itemCount() == 3 && n.endpoint.isAllowed())
  797. m_nodeTable->addNode(n);
  798. else if (i.itemCount() == 10)
  799. {
  800. n.peerType = i[3].toInt<bool>() ? PeerType::Required : PeerType::Optional;
  801. if (!n.endpoint.isAllowed() && n.peerType == PeerType::Optional)
  802. continue;
  803. shared_ptr<Peer> p = make_shared<Peer>(n);
  804. p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
  805. p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
  806. p->m_failedAttempts = i[6].toInt<unsigned>();
  807. p->m_lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
  808. p->m_score = (int)i[8].toInt<unsigned>();
  809. p->m_rating = (int)i[9].toInt<unsigned>();
  810. m_peers[p->id] = p;
  811. if (p->peerType == PeerType::Required)
  812. requirePeer(p->id, n.endpoint);
  813. else
  814. m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
  815. }
  816. }
  817. }
  818. }
  819. }
  820. KeyPair Host::networkAlias(bytesConstRef _b)
  821. {
  822. RLP r(_b);
  823. if (r.itemCount() == 3 && r[0].isInt() && r[0].toInt<unsigned>() >= 3)
  824. return KeyPair(Secret(r[1].toBytes()));
  825. else
  826. return KeyPair::create();
  827. }