12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952 |
- /*
- This file is part of cpp-ethereum.
- cpp-ethereum is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- cpp-ethereum is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
- */
- /** @file Host.cpp
- * @author Alex Leverington <nessence@gmail.com>
- * @author Gav Wood <i@gavwood.com>
- * @date 2014
- */
- #include <set>
- #include <chrono>
- #include <thread>
- #include <mutex>
- #include <memory>
- #include <boost/algorithm/string.hpp>
- #include <libdevcore/Common.h>
- #include <libdevcore/Assertions.h>
- #include <libdevcore/CommonIO.h>
- #include <libdevcore/Exceptions.h>
- #include <libdevcore/FileSystem.h>
- #include "Session.h"
- #include "Common.h"
- #include "Capability.h"
- #include "UPnP.h"
- #include "RLPxHandshake.h"
- #include "Host.h"
- using namespace std;
- using namespace dev;
- using namespace dev::p2p;
- /// Minimum spacing (30 s) between ping rounds: Host::run calls keepAlivePeers on every pass, and keepAlivePeers returns early until this much time has elapsed since m_lastPing.
- std::chrono::seconds const c_keepAliveInterval = std::chrono::seconds(30);
- /// Grace period (1000 ms) after the last ping round before disconnectLatePeers drops sessions whose m_lastReceived is older than that ping.
- std::chrono::milliseconds const c_keepAliveTimeOut = std::chrono::milliseconds(1000);
- /// Binds this handler to the Host that node table events are forwarded to.
- HostNodeTableHandler::HostNodeTableHandler(Host& _host):
-     m_host(_host)
- {
- }
- /// Relays a node table event to the owning Host.
- /// @param _n  Node the event refers to.
- /// @param _e  Kind of event; passed through unchanged to Host::onNodeTableEvent.
- void HostNodeTableHandler::processEvent(NodeID const& _n, NodeTableEventType const& _e)
- {
-     Host& target = m_host;
-     target.onNodeTableEvent(_n, _e);
- }
- /// Creates an empty reputation store; all members are default-initialised.
- ReputationManager::ReputationManager() {}
- /// Marks the peer behind session _s as rude for sub-protocol _sub.
- /// The record is keyed by (node id, client version) and created on demand.
- void ReputationManager::noteRude(Session const& _s, std::string const& _sub)
- {
-     DEV_WRITE_GUARDED(x_nodes)
-     {
-         auto& record = m_nodes[make_pair(_s.id(), _s.info().clientVersion)];
-         record.subs[_sub].isRude = true;
-     }
- }
- /// Reports whether the peer behind session _s has been marked rude.
- /// @param _s    Session whose (node id, client version) pair identifies the record.
- /// @param _sub  Sub-protocol to query. When non-empty, a mark stored under the
- ///              empty sub-protocol name also counts.
- /// @returns true if a matching rude mark exists, false otherwise (including when
- ///          nothing is recorded for the peer).
- ///
- /// The fallback lookup used to be a recursive isRude(_s) call made while the read
- /// guard on x_nodes was already held, which re-acquired the shared lock on the
- /// same thread. Both lookups now happen inside the single guarded section.
- /// NOTE(review): this assumes the declaration's default for _sub is the empty
- /// string, which the old recursion needed in order to terminate - confirm in the header.
- bool ReputationManager::isRude(Session const& _s, std::string const& _sub) const
- {
-     DEV_READ_GUARDED(x_nodes)
-     {
-         auto nit = m_nodes.find(make_pair(_s.id(), _s.info().clientVersion));
-         if (nit == m_nodes.end())
-             return false;
-         auto const& subs = nit->second.subs;
-         // True when an entry exists under _key and carries the rude flag.
-         auto flagged = [&subs](std::string const& _key)
-         {
-             auto sit = subs.find(_key);
-             return sit != subs.end() && sit->second.isRude;
-         };
-         return flagged(_sub) || (!_sub.empty() && flagged(std::string()));
-     }
-     // Only reached formally: the guard macro wraps the block above.
-     return false;
- }
- /// Stores an opaque byte blob for sub-protocol _sub of the peer behind session _s,
- /// replacing whatever was stored before.
- void ReputationManager::setData(Session const& _s, std::string const& _sub, bytes const& _data)
- {
-     DEV_WRITE_GUARDED(x_nodes)
-     {
-         auto& record = m_nodes[make_pair(_s.id(), _s.info().clientVersion)];
-         record.subs[_sub].data = _data;
-     }
- }
- /// Fetches the byte blob previously stored with setData.
- /// @returns a copy of the stored bytes, or an empty blob when either the peer or
- ///          the sub-protocol has no record.
- bytes ReputationManager::data(Session const& _s, std::string const& _sub) const
- {
-     DEV_READ_GUARDED(x_nodes)
-     {
-         auto peerIt = m_nodes.find(make_pair(_s.id(), _s.info().clientVersion));
-         if (peerIt != m_nodes.end())
-         {
-             auto subIt = peerIt->second.subs.find(_sub);
-             if (subIt != peerIt->second.subs.end())
-                 return subIt->second.data;
-         }
-         return bytes();
-     }
-     return bytes();
- }
- /// Builds a Host with an explicit identity key.
- /// @param _clientVersion  Client version string kept in m_clientVersion.
- /// @param _alias          Key pair stored as this host's network identity.
- /// @param _n              Network preferences copied into m_netPrefs.
- /// Nothing is started here; the constructor only initialises members and logs the id.
- Host::Host(string const& _clientVersion, KeyPair const& _alias, NetworkPreferences const& _n):
-     Worker("p2p", 0),
-     m_clientVersion(_clientVersion),
-     m_netPrefs(_n),
-     m_ifAddresses(Network::getInterfaceAddresses()),
-     m_ioService(2),
-     m_tcp4Acceptor(m_ioService),
-     m_alias(_alias),
-     // earliest representable time, so the first keepAlivePeers pass is never throttled
-     m_lastPing(chrono::steady_clock::time_point::min())
- {
-     clog(NetNote) << "Id:" << id();
- }
- /// Builds a Host from a previously saved network blob.
- /// The identity key is taken from networkAlias(_restoreNetwork) and the raw blob
- /// is kept in m_restoreNetwork, which startedWorking later passes to restoreNetwork.
- Host::Host(string const& _clientVersion, NetworkPreferences const& _n, bytesConstRef _restoreNetwork):
-     Host(_clientVersion, networkAlias(_restoreNetwork), _n)
- {
-     // keep an owned copy; the caller's buffer is only a reference
-     m_restoreNetwork = _restoreNetwork.toBytes();
- }
- /// Stops the host on destruction; stop() returns immediately when already stopped.
- Host::~Host()
- {
-     stop();
- }
- /// Starts the worker thread and blocks until the network is available or the
- /// worker has stopped. If the worker is no longer running afterwards, the failure
- /// is logged and doneWorking() is called to clean up.
- void Host::start()
- {
-     DEV_TIMED_FUNCTION_ABOVE(500);
-     startWorking();
-     // poll in 10 ms steps: leave as soon as the worker stops or the network is up
-     for (;;)
-     {
-         if (!isWorking() || haveNetwork())
-             break;
-         this_thread::sleep_for(chrono::milliseconds(10));
-     }
-     if (!isWorking())
-     {
-         // network start failed!
-         clog(NetWarn) << "Network start failed!";
-         doneWorking();
-     }
- }
- /// Requests shutdown of the network and waits for it to finish.
- /// Clears m_run under x_runTimer, waits until run() has reset m_timer, then stops
- /// the worker thread. Does nothing when the host is already stopped or stopping.
- void Host::stop()
- {
-     // called to force io_service to kill any remaining tasks it might have -
-     // such tasks may involve socket reads from Capabilities that maintain references
-     // to resources we're about to free.
-     bool wasRunning = false;
-     {
-         // m_run is tied to the lifetime of m_timer, so x_runTimer guards both
-         Guard l(x_runTimer);
-         wasRunning = m_run;
-         // signal run() to prepare for shutdown and reset m_timer
-         m_run = false;
-     }
-     // ignore if already stopped/stopping
-     if (!wasRunning)
-         return;
-     // run() resets m_timer once the network scheduler has stopped
-     while (m_timer)
-         this_thread::sleep_for(chrono::milliseconds(50));
-     // stop worker thread
-     if (isWorking())
-         stopWorking();
- }
- /// Tears the network down after the io service has stopped: closes the acceptor,
- /// notifies capabilities, cancels pending handshakes, disconnects connected
- /// sessions (polling the io service by hand so queued work can complete) and
- /// finally empties m_sessions.
- void Host::doneWorking()
- {
-     // reset the io service so it can be polled manually below
-     m_ioService.reset();
-     DEV_GUARDED(x_timers)
-         m_timers.clear();
-     // shut the acceptor down
-     m_tcp4Acceptor.cancel();
-     if (m_tcp4Acceptor.is_open())
-         m_tcp4Acceptor.close();
-     // An incoming connection may have started but not finished; let the accept
-     // handler run to completion so the pending socket gets closed.
-     while (m_accepting)
-         m_ioService.poll();
-     // stop capabilities (eth: stops syncing or block/tx broadcast)
-     for (auto const& cap: m_capabilities)
-         cap.second->onStopping();
-     // cancel pending handshakes before peers, as a handshake may create a peer
-     while (true)
-     {
-         unsigned cancelled = 0;
-         {
-             Guard l(x_connecting);
-             for (auto const& pending: m_connecting)
-                 if (auto handshake = pending.lock())
-                 {
-                     handshake->cancel();
-                     ++cancelled;
-                 }
-         }
-         if (cancelled == 0)
-             break;
-         m_ioService.poll();
-     }
-     // disconnect peers, repeating until none reports itself connected
-     while (true)
-     {
-         unsigned dropped = 0;
-         {
-             RecursiveGuard l(x_sessions);
-             for (auto entry: m_sessions)
-             {
-                 auto session = entry.second.lock();
-                 if (session && session->isConnected())
-                 {
-                     session->disconnect(ClientQuit);
-                     ++dropped;
-                 }
-             }
-         }
-         if (dropped == 0)
-             break;
-         // poll so that peers send out disconnect packets
-         m_ioService.poll();
-     }
-     // stop network (again; helpful to call before subsequent reset())
-     m_ioService.stop();
-     // reset network (allows reusing ioservice in future)
-     m_ioService.reset();
-     // finally, clear out peers (in case they're lingering)
-     RecursiveGuard l(x_sessions);
-     m_sessions.clear();
- }
- /// Turns a completed handshake into a Session.
- /// @param _id   Node id established by the handshake.
- /// @param _rlp  Hello payload: [0] protocol version, [1] client version,
- ///              [2] capabilities, [3] listen port, [4] node id claimed by the peer.
- /// @param _io   Frame coder, moved into the new Session.
- /// @param _s    Socket the session runs on.
- ///
- /// The Peer record is looked up or created first, then the hello is validated.
- /// The session is disconnected (with the matching reason) on an incompatible
- /// protocol version, no mutually supported capability, an unexpected identity
- /// while pinning, an already connected session for the same id, or no free slot.
- /// Otherwise capabilities are attached, the session is started and registered.
- void Host::startPeerSession(Public const& _id, RLP const& _rlp, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s)
- {
-     // session maybe ingress or egress so m_peers and node table entries may not exist
-     shared_ptr<Peer> p;
-     DEV_RECURSIVE_GUARDED(x_sessions)
-     {
-         if (m_peers.count(_id))
-             p = m_peers[_id];
-         else
-         {
-             // peer doesn't exist, try to get port info from node table
-             if (m_nodeTable)
-                 if (Node n = m_nodeTable->node(_id))
-                     p = make_shared<Peer>(n);
-             if (!p)
-                 p = make_shared<Peer>(Node(_id, UnspecifiedNodeIPEndpoint));
-             m_peers[_id] = p;
-         }
-     }
-     if (p->isOffline())
-         p->m_lastConnected = std::chrono::system_clock::now();
-     p->endpoint.address = _s->remoteEndpoint().address();
-     auto protocolVersion = _rlp[0].toInt<unsigned>();
-     auto clientVersion = _rlp[1].toString();
-     auto caps = _rlp[2].toVector<CapDesc>();
-     auto listenPort = _rlp[3].toInt<unsigned short>();
-     auto pub = _rlp[4].toHash<Public>();
-     if (pub != _id)
-     {
-         cdebug << "Wrong ID: " << pub << " vs. " << _id;
-         return;
-     }
-     // clang error (previously: ... << hex << caps ...)
-     // "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments"
-     stringstream capslog;
-     // Leave only the highest mutually supported version of each capability.
-     // The comparison runs against an untouched snapshot: remove_if moves elements
-     // of caps while it works, so scanning caps itself could read moved-from entries.
-     auto const advertised = caps;
-     caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r)
-     {
-         if (!haveCapability(_r))
-             return true;
-         return any_of(advertised.begin(), advertised.end(), [&](CapDesc const& _o)
-         {
-             return _r.first == _o.first && _o.second > _r.second && haveCapability(_o);
-         });
-     }), caps.end());
-     for (auto const& cap: caps)
-         capslog << "(" << cap.first << "," << dec << cap.second << ")";
-     clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort;
-
-     // create session so disconnects are managed
-     auto ps = make_shared<Session>(this, move(_io), _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>(), protocolVersion}));
-     if (protocolVersion < dev::p2p::c_protocolVersion - 1)
-     {
-         ps->disconnect(IncompatibleProtocol);
-         return;
-     }
-     if (caps.empty())
-     {
-         ps->disconnect(UselessPeer);
-         return;
-     }
-     if (m_netPrefs.pin && !m_requiredPeers.count(_id))
-     {
-         cdebug << "Unexpected identity from peer (got" << _id << ", must be one of " << m_requiredPeers << ")";
-         ps->disconnect(UnexpectedIdentity);
-         return;
-     }
-
-     {
-         RecursiveGuard l(x_sessions);
-         // one lookup and one lock() instead of count + operator[] + two lock() calls
-         auto existing = m_sessions.find(_id);
-         if (existing != m_sessions.end())
-             if (auto s = existing->second.lock())
-                 if (s->isConnected())
-                 {
-                     // Already connected.
-                     clog(NetWarn) << "Session already exists for peer with id" << _id;
-                     ps->disconnect(DuplicatePeer);
-                     return;
-                 }
-
-         if (!peerSlotsAvailable())
-         {
-             ps->disconnect(TooManyPeers);
-             return;
-         }
-         unsigned offset = (unsigned)UserPacket;
-         uint16_t cnt = 1;
-         // todo: mutex Session::m_capabilities and move for(:caps) out of mutex.
-         for (auto const& i: caps)
-         {
-             auto pcap = m_capabilities[i];
-             if (!pcap)
-                 return ps->disconnect(IncompatibleProtocol);
-             if (Session::isFramingAllowedForVersion(protocolVersion))
-                 // framed sessions number capabilities 1, 2, ... and use no packet offset
-                 pcap->newPeerCapability(ps, 0, i, cnt++);
-             else
-             {
-                 // unframed sessions give each capability its own packet id range
-                 pcap->newPeerCapability(ps, offset, i, 0);
-                 offset += pcap->messageCount();
-             }
-         }
-         ps->start();
-         m_sessions[_id] = ps;
-     }
-
-     clog(NetP2PNote) << "p2p.host.peer.register" << _id;
- }
- /// Reacts to node table changes.
- /// - NodeEntryAdded: creates or refreshes the Peer record for _n (only if the
- ///   node table still knows the node) and connects when an egress slot is free.
- /// - NodeEntryDropped: forgets the peer, but only if it is of type Optional.
- /// Any other event type is ignored.
- void Host::onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e)
- {
-     if (_e == NodeEntryDropped)
-     {
-         clog(NetP2PNote) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
-         RecursiveGuard l(x_sessions);
-         auto known = m_peers.find(_n);
-         if (known != m_peers.end() && known->second->peerType == PeerType::Optional)
-             m_peers.erase(known);
-         return;
-     }
-     if (_e != NodeEntryAdded)
-         return;
-
-     clog(NetP2PNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
-     // only add iff node is in node table
-     Node n = m_nodeTable->node(_n);
-     if (!n)
-         return;
-     shared_ptr<Peer> p;
-     {
-         RecursiveGuard l(x_sessions);
-         auto known = m_peers.find(_n);
-         if (known != m_peers.end())
-         {
-             // existing record: adopt the endpoint the node table reports
-             p = known->second;
-             p->endpoint = n.endpoint;
-         }
-         else
-         {
-             p = make_shared<Peer>(n);
-             m_peers[_n] = p;
-             clog(NetP2PNote) << "p2p.host.peers.events.peerAdded " << _n << p->endpoint;
-         }
-     }
-     if (peerSlotsAvailable(Egress))
-         connect(p);
- }
- /// Chooses the endpoint stored in m_tcpPublic. The port is always m_listenPort;
- /// the address is chosen by precedence:
- /// listen address (if public) > configured public address > UPnP result > unspecified.
- /// Without NAT traversal only the configured public address, if any, is used.
- void Host::determinePublic()
- {
-     auto const localAddrs = Network::getInterfaceAddresses();
-     bi::address listenAddr;
-     if (!m_netPrefs.listenIPAddress.empty())
-         listenAddr = bi::address::from_string(m_netPrefs.listenIPAddress);
-     bi::address publicAddr;
-     if (!m_netPrefs.publicIPAddress.empty())
-         publicAddr = bi::address::from_string(m_netPrefs.publicIPAddress);
-     bool const haveListen = !listenAddr.is_unspecified();
-     bool const havePublic = !publicAddr.is_unspecified();
-
-     bool const listenIsPublic = haveListen && isPublicAddress(listenAddr);
-     // a configured public address that is also one of our own interface addresses
-     bool const publicIsHost = !haveListen && havePublic && localAddrs.count(publicAddr);
-
-     bi::tcp::endpoint ep(bi::address(), m_listenPort);
-     if (!m_netPrefs.traverseNAT)
-     {
-         if (havePublic)
-             ep.address(publicAddr);
-     }
-     else if (listenIsPublic)
-     {
-         clog(NetNote) << "Listen address set to Public address:" << listenAddr << ". UPnP disabled.";
-         ep.address(listenAddr);
-     }
-     else if (publicIsHost)
-     {
-         clog(NetNote) << "Public address set to Host configured address:" << publicAddr << ". UPnP disabled.";
-         ep.address(publicAddr);
-     }
-     else
-     {
-         bi::address natIFAddr;
-         // restrict traversal to the listen address when it is one of our interfaces
-         ep = Network::traverseNAT(haveListen && localAddrs.count(listenAddr) ? std::set<bi::address>({listenAddr}) : localAddrs, m_listenPort, natIFAddr);
-
-         if (haveListen && natIFAddr != listenAddr)
-             // if listen address is set, Host will use it, even if upnp returns different
-             clog(NetWarn) << "Listen address" << listenAddr << "differs from local address" << natIFAddr << "returned by UPnP!";
-
-         if (havePublic && ep.address() != publicAddr)
-         {
-             // if public address is set, Host will advertise it, even if upnp returns different
-             clog(NetWarn) << "Specified public address" << publicAddr << "differs from external address" << ep.address() << "returned by UPnP!";
-             ep.address(publicAddr);
-         }
-     }
-     m_tcpPublic = ep;
- }
- /// Arms one asynchronous accept on the listening socket, unless the host is not
- /// running or an accept is already outstanding.
- ///
- /// The completion handler clears m_accepting, then either
- /// - closes the socket and stops re-arming (error, or host shutting down),
- /// - closes the socket and re-arms (more peers than the ingress slot count), or
- /// - starts an RLPx handshake for the new connection and re-arms.
- ///
- /// m_connecting is now modified under x_connecting, as connect(), run() and
- /// doneWorking() already do. The former `ec.value() < 1` test before re-arming
- /// was always true on that path (the error code had just been checked) and is gone.
- void Host::runAcceptor()
- {
-     assert(m_listenPort > 0);
-     if (m_run && !m_accepting)
-     {
-         clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
-         m_accepting = true;
-         auto socket = make_shared<RLPXSocket>(m_ioService);
-         m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec)
-         {
-             m_accepting = false;
-             if (ec || !m_run)
-             {
-                 socket->close();
-                 return;
-             }
-             if (peerCount() > peerSlots(Ingress))
-             {
-                 clog(NetConnect) << "Dropping incoming connect due to maximum peer count (" << Ingress << " * ideal peer count): " << socket->remoteEndpoint();
-                 socket->close();
-                 runAcceptor();
-                 return;
-             }
-
-             bool success = false;
-             try
-             {
-                 // incoming connection; we don't yet know nodeid
-                 auto handshake = make_shared<RLPXHandshake>(this, socket);
-                 {
-                     Guard l(x_connecting);
-                     m_connecting.push_back(handshake);
-                 }
-                 handshake->start();
-                 success = true;
-             }
-             catch (Exception const& _e)
-             {
-                 clog(NetWarn) << "ERROR: " << diagnostic_information(_e);
-             }
-             catch (std::exception const& _e)
-             {
-                 clog(NetWarn) << "ERROR: " << _e.what();
-             }
-             if (!success)
-                 socket->ref().close();
-             runAcceptor();
-         });
-     }
- }
- /// Returns the built-in table mapping node public keys to "host:port" strings.
- /// The table is a function-local static, built on first use.
- /// NOTE(review): the first and last initialisers use the same public key
- /// (5374c1bf...). An unordered_map keeps one value per key, so only one of
- /// "gav.ethdev.com:30300" and "92.51.165.126:30303" is actually stored - confirm
- /// which key the last entry was meant to carry.
- std::unordered_map<Public, std::string> const& Host::pocHosts()
- {
-     static std::unordered_map<Public, std::string> const c_ret{
-         { Public("5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a"), "gav.ethdev.com:30300" },
-         { Public("e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc"), "gav.ethdev.com:30303" },
-         { Public("a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c"), "52.16.188.185:30303" },
-         { Public("7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034"), "54.207.93.166:30303" },
-         { Public("5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a"), "92.51.165.126:30303" },
-     };
-     return c_ret;
- }
- /// Registers a peer given as a NodeSpec.
- /// Optional peers go to addNode; every other peer type goes to requirePeer.
- void Host::addPeer(NodeSpec const& _s, PeerType _t)
- {
-     if (_t != PeerType::Optional)
-     {
-         requirePeer(_s.id(), _s.nodeIPEndpoint());
-         return;
-     }
-     addNode(_s.id(), _s.nodeIPEndpoint());
- }
- /// Adds a node to the node table once the network is available.
- /// Blocks in 50 ms steps while the network is still coming up and gives up
- /// silently if the worker stops in the meantime. A TCP port outside 30300-30305
- /// is logged but still accepted.
- void Host::addNode(NodeID const& _node, NodeIPEndpoint const& _endpoint)
- {
-     // return if network is stopped while waiting on Host::run() or nodeTable to start
-     while (!haveNetwork())
-     {
-         if (!isWorking())
-             return;
-         this_thread::sleep_for(chrono::milliseconds(50));
-     }
-     bool const standardPort = _endpoint.tcpPort >= 30300 && _endpoint.tcpPort <= 30305;
-     if (!standardPort)
-         clog(NetConnect) << "Non-standard port being recorded: " << _endpoint.tcpPort;
-     if (m_nodeTable)
-         m_nodeTable->addNode(Node(_node, _endpoint));
- }
- /// Marks a node as required and makes it known to the peer and node tables.
- /// @param _n         Node id; may be empty when only the endpoint is known.
- /// @param _endpoint  Endpoint to record for the node.
- ///
- /// The id is always added to m_requiredPeers; the rest is skipped while the host
- /// is not running. With a non-empty id the Peer record is created or updated and
- /// handed to the node table. With an empty id the endpoint is given to the node
- /// table and, 600 ms later, the call is repeated with whatever node the table
- /// reports for that id, if any.
- ///
- /// m_requiredPeers is now modified under x_requiredPeers, the mutex that
- /// relinquishPeer already takes for the same set.
- void Host::requirePeer(NodeID const& _n, NodeIPEndpoint const& _endpoint)
- {
-     {
-         Guard l(x_requiredPeers);
-         m_requiredPeers.insert(_n);
-     }
-     if (!m_run)
-         return;
-
-     Node node(_n, _endpoint, PeerType::Required);
-     if (_n)
-     {
-         // create or update m_peers entry
-         shared_ptr<Peer> p;
-         DEV_RECURSIVE_GUARDED(x_sessions)
-         {
-             if (m_peers.count(_n))
-             {
-                 p = m_peers[_n];
-                 p->endpoint = node.endpoint;
-                 p->peerType = PeerType::Required;
-             }
-             else
-             {
-                 p = make_shared<Peer>(node);
-                 m_peers[_n] = p;
-             }
-         }
-         // required for discovery
-         if (m_nodeTable)
-             m_nodeTable->addNode(*p, NodeTable::NodeRelation::Unknown);
-     }
-     else if (m_nodeTable)
-     {
-         m_nodeTable->addNode(node);
-         auto t = make_shared<boost::asio::deadline_timer>(m_ioService);
-         t->expires_from_now(boost::posix_time::milliseconds(600));
-         t->async_wait([this, _n](boost::system::error_code const& _ec)
-         {
-             if (!_ec)
-                 if (m_nodeTable)
-                     if (auto n = m_nodeTable->node(_n))
-                         requirePeer(n.id, n.endpoint);
-         });
-         // keep the timer alive; run() prunes expired timers from m_timers
-         DEV_GUARDED(x_timers)
-             m_timers.push_back(t);
-     }
- }
- /// Removes a node from the set of required peers; no effect if it is not in the set.
- void Host::relinquishPeer(NodeID const& _node)
- {
-     Guard l(x_requiredPeers);
-     // erase-by-key is already a no-op for a missing key
-     m_requiredPeers.erase(_node);
- }
- /// Starts an outgoing TCP connection to a peer, followed by an RLPx handshake
- /// when the connection succeeds.
- /// Returns without connecting when the host is not running, a session for the
- /// peer already exists, an Optional peer is unknown to the node table, or a
- /// connection attempt to the same Peer object is already pending.
- void Host::connect(std::shared_ptr<Peer> const& _p)
- {
-     if (!m_run)
-         return;
-
-     if (havePeerSession(_p->id))
-     {
-         clog(NetConnect) << "Aborted connect. Node already connected.";
-         return;
-     }
-     if (!!m_nodeTable && !m_nodeTable->haveNode(_p->id) && _p->peerType == PeerType::Optional)
-         return;
-     // prevent concurrently connecting to a node
-     Peer* nptr = _p.get();
-     {
-         Guard l(x_pendingNodeConns);
-         // insert() reports whether the pointer was newly added
-         if (!m_pendingPeerConns.insert(nptr).second)
-             return;
-     }
-     _p->m_lastAttempted = std::chrono::system_clock::now();
-
-     bi::tcp::endpoint ep(_p->endpoint);
-     clog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id();
-     auto socket = make_shared<RLPXSocket>(m_ioService);
-     socket->ref().async_connect(ep, [=](boost::system::error_code const& ec)
-     {
-         // the attempt timestamp and counter are updated whether or not it succeeded
-         _p->m_lastAttempted = std::chrono::system_clock::now();
-         _p->m_failedAttempts++;
-
-         if (!ec)
-         {
-             clog(NetConnect) << "Connecting to" << _p->id << "@" << ep;
-             auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id);
-             {
-                 Guard l(x_connecting);
-                 m_connecting.push_back(handshake);
-             }
-             handshake->start();
-         }
-         else
-         {
-             clog(NetConnect) << "Connection refused to node" << _p->id << "@" << ep << "(" << ec.message() << ")";
-             // Manually set error (session not present)
-             _p->m_lastDisconnect = TCPError;
-         }
-
-         Guard l(x_pendingNodeConns);
-         m_pendingPeerConns.erase(nptr);
-     });
- }
- /// Collects a copy of the session info of every connected session.
- /// @returns an empty list while the host is not running.
- PeerSessionInfos Host::peerSessionInfo() const
- {
-     if (!m_run)
-         return PeerSessionInfos();
-     std::vector<PeerSessionInfo> ret;
-     RecursiveGuard l(x_sessions);
-     for (auto& entry: m_sessions)
-     {
-         auto session = entry.second.lock();
-         if (!session || !session->isConnected())
-             continue;
-         // m_info is copied under the session's own x_info lock
-         DEV_GUARDED(session->x_info)
-             ret.push_back(session->m_info);
-     }
-     return ret;
- }
- /// Counts the sessions that are still alive and report themselves connected.
- size_t Host::peerCount() const
- {
-     RecursiveGuard l(x_sessions);
-     unsigned connected = 0;
-     for (auto& entry: m_sessions)
-     {
-         std::shared_ptr<Session> session = entry.second.lock();
-         if (session && session->isConnected())
-             ++connected;
-     }
-     return connected;
- }
- /// Periodic network maintenance, re-armed on m_timer every c_timerInterval ms.
- ///
- /// When the host has been asked to stop (m_run is false) it releases the node
- /// table, stops the io service and resets m_timer, which stop() waits for.
- /// Otherwise it processes node table events, prunes finished handshakes and
- /// expired timers, pings peers, and starts outgoing connections: Required peers
- /// first, then - unless pinning is enabled - Optional peers while slots remain.
- ///
- /// Fix: the Optional loop used `openSlots--` as its condition. That is false
- /// only at exactly zero and true again once the counter is negative, so after
- /// skipping one peer it resumed connecting. The loop now stops as soon as no
- /// slot is left.
- void Host::run(boost::system::error_code const&)
- {
-     if (!m_run)
-     {
-         // reset NodeTable
-         m_nodeTable.reset();
-         // stopping io service allows running manual network operations for shutdown
-         // and also stops blocking worker thread, allowing worker thread to exit
-         m_ioService.stop();
-         // resetting timer signals network that nothing else can be scheduled to run
-         m_timer.reset();
-         return;
-     }
-     m_nodeTable->processEvents();
-     // cleanup zombies
-     DEV_GUARDED(x_connecting)
-         m_connecting.remove_if([](std::weak_ptr<RLPXHandshake> h){ return h.expired(); });
-     DEV_GUARDED(x_timers)
-         m_timers.remove_if([](std::shared_ptr<boost::asio::deadline_timer> t)
-         {
-             return t->expires_from_now().total_milliseconds() < 0;
-         });
-     keepAlivePeers();
-
-     // At this time peers will be disconnected based on natural TCP timeout.
-     // disconnectLatePeers needs to be updated for the assumption that Session
-     // is always live and to ensure reputation and fallback timers are properly
-     // updated. // disconnectLatePeers();
-     // todo: update peerSlotsAvailable()
-
-     list<shared_ptr<Peer>> toConnect;
-     unsigned reqConn = 0;
-     {
-         RecursiveGuard l(x_sessions);
-         for (auto const& p: m_peers)
-         {
-             bool haveSession = havePeerSession(p.second->id);
-             bool required = p.second->peerType == PeerType::Required;
-             if (haveSession && required)
-                 reqConn++;
-             else if (!haveSession && p.second->shouldReconnect() && (!m_netPrefs.pin || required))
-                 toConnect.push_back(p.second);
-         }
-     }
-     // Required peers first; reqConn also counts the attempts made here.
-     for (auto const& p: toConnect)
-         if (p->peerType == PeerType::Required && reqConn++ < m_idealPeerCount)
-             connect(p);
-
-     if (!m_netPrefs.pin)
-     {
-         unsigned pendingCount = 0;
-         DEV_GUARDED(x_pendingNodeConns)
-             pendingCount = m_pendingPeerConns.size();
-         int openSlots = m_idealPeerCount - peerCount() - pendingCount + reqConn;
-         for (auto const& p: toConnect)
-         {
-             // stop for good once the budget is used up (or was never positive)
-             if (openSlots <= 0)
-                 break;
-             if (p->peerType == PeerType::Optional)
-             {
-                 --openSlots;
-                 connect(p);
-             }
-         }
-     }
-     auto runcb = [this](boost::system::error_code const& error) { run(error); };
-     m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));
-     m_timer->async_wait(runcb);
- }
- /// Brings the network up: creates the scheduling timer and sets m_run, notifies
- /// capabilities, opens the TCP listener, creates the node table, restores the
- /// saved network and runs the first maintenance pass.
- void Host::startedWorking()
- {
-     asserts(!m_timer);
-     {
-         // m_run and m_timer change together under x_runTimer: a concurrent stop()
-         // then either sees the host as not running yet, or finds a timer to wait on.
-         Guard l(x_runTimer);
-         m_timer.reset(new boost::asio::deadline_timer(m_ioService));
-         m_run = true;
-     }
-     // start capability threads (ready for incoming connections)
-     for (auto const& cap: m_capabilities)
-         cap.second->onStarting();
-
-     // try to open acceptor (todo: ipv6)
-     int const port = Network::tcp4Listen(m_tcp4Acceptor, m_netPrefs);
-     if (port <= 0)
-         clog(NetP2PNote) << "p2p.start.notice id:" << id() << "TCP Listen port is invalid or unavailable.";
-     else
-     {
-         m_listenPort = port;
-         determinePublic();
-         runAcceptor();
-     }
-     // the node table is created even when no listen port could be opened
-     auto nodeTable = make_shared<NodeTable>(
-         m_ioService,
-         m_alias,
-         NodeIPEndpoint(bi::address::from_string(listenAddress()), listenPort(), listenPort()),
-         m_netPrefs.discovery
-     );
-     nodeTable->setEventHandler(new HostNodeTableHandler(*this));
-     m_nodeTable = nodeTable;
-     restoreNetwork(&m_restoreNetwork);
-     clog(NetP2PNote) << "p2p.started id:" << id();
-     run(boost::system::error_code());
- }
- /// Worker body: runs the io service while the host is running.
- /// Exceptions derived from std::exception are logged, not propagated.
- void Host::doWork()
- {
-     if (!m_run)
-         return;
-     try
-     {
-         m_ioService.run();
-     }
-     catch (std::exception const& _e)
-     {
-         clog(NetP2PWarn) << "Exception in Network Thread:" << _e.what();
-         clog(NetP2PWarn) << "Network Restart is Recommended.";
-     }
- }
- /// Pings every live session, at most once per c_keepAliveInterval, and drops
- /// expired entries from m_sessions along the way. Records the time of the round
- /// in m_lastPing.
- void Host::keepAlivePeers()
- {
-     // still inside the interval since the previous round: nothing to do
-     if (m_lastPing > chrono::steady_clock::now() - c_keepAliveInterval)
-         return;
-     RecursiveGuard l(x_sessions);
-     auto it = m_sessions.begin();
-     while (it != m_sessions.end())
-     {
-         auto session = it->second.lock();
-         if (!session)
-         {
-             // the session object is gone; forget the stale entry
-             it = m_sessions.erase(it);
-             continue;
-         }
-         session->ping();
-         ++it;
-     }
-     m_lastPing = chrono::steady_clock::now();
- }
- /// Disconnects, with reason PingTimeout, every session that has received
- /// nothing since the last ping round, once more than c_keepAliveTimeOut has
- /// passed since that round.
- void Host::disconnectLatePeers()
- {
-     auto const now = chrono::steady_clock::now();
-     auto const deadline = now - c_keepAliveTimeOut;
-     if (deadline < m_lastPing)
-         return;
-     RecursiveGuard l(x_sessions);
-     for (auto entry: m_sessions)
-     {
-         auto session = entry.second.lock();
-         if (!session)
-             continue;
-         // strictly later than the last ping, and silent since that ping
-         if (deadline > m_lastPing && session->m_lastReceived < m_lastPing)
-             session->disconnect(PingTimeout);
-     }
- }
- /// Serialises the host's identity and known nodes.
- /// @returns an RLP list of three items: protocol version, the secret of m_alias,
- ///          and the list of node entries. Peers are written as 11-field entries,
- ///          node table entries as 4-field entries.
- bytes Host::saveNetwork() const
- {
-     // copy the peers under the lock; sorting and encoding happen on the copy
-     std::list<Peer> peers;
-     {
-         RecursiveGuard l(x_sessions);
-         for (auto entry: m_peers)
-             if (entry.second)
-                 peers.push_back(*entry.second);
-     }
-     peers.sort();
-     RLPStream network;
-     int count = 0;
-     for (auto const& p: peers)
-     {
-         // todo: ipv6
-         if (!p.endpoint.address.is_v4())
-             continue;
-         // Only save peers which have connected within 2 days, with properly-advertised port and public IP address
-         bool const recent = chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48);
-         if (!(recent && !!p.endpoint && p.id != id() && (p.peerType == PeerType::Required || p.endpoint.isAllowed())))
-             continue;
-         // 11 fields: inline endpoint, then the 8 values streamed below
-         network.appendList(11);
-         p.endpoint.streamRLP(network, NodeIPEndpoint::StreamInline);
-         network << p.id << (p.peerType == PeerType::Required);
-         network << chrono::duration_cast<chrono::seconds>(p.m_lastConnected.time_since_epoch()).count();
-         network << chrono::duration_cast<chrono::seconds>(p.m_lastAttempted.time_since_epoch()).count();
-         network << p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating;
-         ++count;
-     }
-     if (!!m_nodeTable)
-     {
-         auto state = m_nodeTable->snapshot();
-         state.sort();
-         for (auto const& entry: state)
-         {
-             // 4 fields: inline endpoint plus the node id
-             network.appendList(4);
-             entry.endpoint.streamRLP(network, NodeIPEndpoint::StreamInline);
-             network << entry.id;
-             ++count;
-         }
-     }
-     // else: TODO: use previous configuration if available
-     RLPStream ret(3);
-     ret << dev::p2p::c_protocolVersion << m_alias.secret().ref();
-     ret.appendList(count);
-     if (count != 0)
-         ret.appendRaw(network.out(), count);
-     return ret.out();
- }
- /// Loads peers and node table entries from a blob written by saveNetwork.
- /// @param _b  Serialised network: [0] version, [1] key, [2] list of node entries.
- /// @throws NetworkStartRequired when called with data before the host is started.
- ///
- /// Nothing is restored for an empty blob, when m_dropPeers is set, or when the
- /// blob's version is missing, not an integer or older than c_protocolVersion - 1.
- /// Entries with 4 (or legacy 3) fields go to the node table only; entries with
- /// 11 (or legacy 10) fields rebuild a full Peer record.
- ///
- /// Fix: the version is now read only after item 0 has been checked to exist and
- /// be an integer. Before, toInt() ran ahead of the check meant to guard it.
- void Host::restoreNetwork(bytesConstRef _b)
- {
-     if (!_b.size())
-         return;
-
-     // nodes can only be added if network is added
-     if (!isStarted())
-         BOOST_THROW_EXCEPTION(NetworkStartRequired());
-     if (m_dropPeers)
-         return;
-
-     RecursiveGuard l(x_sessions);
-     RLP r(_b);
-     if (r.itemCount() == 0 || !r[0].isInt())
-         return;
-     unsigned const fileVersion = r[0].toInt<unsigned>();
-     if (!(fileVersion >= dev::p2p::c_protocolVersion - 1))
-         return;
-
-     // Rebuilds one saved peer. _first is the index of the "required" flag; the
-     // seven fields after it are lastConnected, lastAttempted, failedAttempts,
-     // lastDisconnect, score and rating, in that order.
-     auto restorePeer = [this](Node& _node, RLP const& _item, unsigned _first)
-     {
-         _node.peerType = _item[_first].toInt<bool>() ? PeerType::Required : PeerType::Optional;
-         if (!_node.endpoint.isAllowed() && _node.peerType == PeerType::Optional)
-             return;
-         shared_ptr<Peer> p = make_shared<Peer>(_node);
-         p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(_item[_first + 1].toInt<unsigned>()));
-         p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(_item[_first + 2].toInt<unsigned>()));
-         p->m_failedAttempts = _item[_first + 3].toInt<unsigned>();
-         p->m_lastDisconnect = (DisconnectReason)_item[_first + 4].toInt<unsigned>();
-         p->m_score = (int)_item[_first + 5].toInt<unsigned>();
-         p->m_rating = (int)_item[_first + 6].toInt<unsigned>();
-         m_peers[p->id] = p;
-         if (p->peerType == PeerType::Required)
-             requirePeer(p->id, _node.endpoint);
-         else
-             m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
-     };
-
-     // r[0] = version
-     // r[1] = key
-     // r[2] = nodes
-     for (auto i: r[2])
-     {
-         // todo: ipv6
-         if (i[0].itemCount() != 4 && i[0].size() != 4)
-             continue;
-         auto const fields = i.itemCount();
-         if (fields == 4 || fields == 11)
-         {
-             // current layout: endpoint streamed inline, node id at index 3
-             Node n((NodeID)i[3], NodeIPEndpoint(i));
-             if (fields == 4)
-             {
-                 if (n.endpoint.isAllowed())
-                     m_nodeTable->addNode(n);
-             }
-             else
-                 restorePeer(n, i, 4);
-         }
-         else if (fields == 3 || fields == 10)
-         {
-             // legacy layout: 4-byte address, one port used for both udp and tcp, node id at index 2
-             Node n((NodeID)i[2], NodeIPEndpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<uint16_t>(), i[1].toInt<uint16_t>()));
-             if (fields == 3)
-             {
-                 if (n.endpoint.isAllowed())
-                     m_nodeTable->addNode(n);
-             }
-             else
-                 restorePeer(n, i, 3);
-         }
-     }
- }
- /// Extracts the identity key from a saved network blob.
- /// @returns the key pair built from item 1 when the blob is a 3-item list whose
- ///          first item is an integer of at least 3; otherwise a freshly created
- ///          key pair.
- KeyPair Host::networkAlias(bytesConstRef _b)
- {
-     RLP r(_b);
-     bool const usable = r.itemCount() == 3 && r[0].isInt() && r[0].toInt<unsigned>() >= 3;
-     if (!usable)
-         return KeyPair::create();
-     return KeyPair(Secret(r[1].toBytes()));
- }
|