123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562 |
- /*
- This file is part of cpp-ethereum.
- cpp-ethereum is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- cpp-ethereum is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
- */
- /** @file EthereumPeer.cpp
- * @author Gav Wood <i@gavwood.com>
- * @date 2014
- */
- #include "EthereumPeer.h"
- #include <chrono>
- #include <libdevcore/Common.h>
- #include <libethcore/Exceptions.h>
- #include <libp2p/Session.h>
- #include <libp2p/Host.h>
- #include "BlockChain.h"
- #include "EthereumHost.h"
- #include "TransactionQueue.h"
- #include "BlockQueue.h"
- #include "BlockChainSync.h"
- using namespace std;
- using namespace dev;
- using namespace dev::eth;
- using namespace p2p;
/// Largest NewBlockHashes announcement accepted; a packet with more entries gets the peer disabled.
static unsigned const c_maxIncomingNewHashes = 1024;
/// Cap on the number of headers returned for a single GetBlockHeaders request.
static unsigned const c_maxHeadersToSend = 1024;
- static string toString(Asking _a)
- {
- switch (_a)
- {
- case Asking::BlockHeaders: return "BlockHeaders";
- case Asking::BlockBodies: return "BlockBodies";
- case Asking::NodeData: return "NodeData";
- case Asking::Receipts: return "Receipts";
- case Asking::Nothing: return "Nothing";
- case Asking::State: return "State";
- }
- return "?";
- }
- EthereumPeer::EthereumPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap, uint16_t _capID):
- Capability(_s, _h, _i, _capID),
- m_peerCapabilityVersion(_cap.second)
- {
- session()->addNote("manners", isRude() ? "RUDE" : "nice");
- m_syncHashNumber = host()->chain().number() + 1;
- requestStatus();
- }
- EthereumPeer::~EthereumPeer()
- {
- if (m_asking != Asking::Nothing)
- {
- clog(NetAllDetail) << "Peer aborting while being asked for " << ::toString(m_asking);
- setRude();
- }
- abortSync();
- }
- bool EthereumPeer::isRude() const
- {
- auto s = session();
- if (s)
- return repMan().isRude(*s, name());
- return false;
- }
- unsigned EthereumPeer::askOverride() const
- {
- std::string static const badGeth = "Geth/v0.9.27";
- auto s = session();
- if (!s)
- return c_maxBlocksAsk;
- if (s->info().clientVersion.substr(0, badGeth.size()) == badGeth)
- return 1;
- bytes const& d = repMan().data(*s, name());
- return d.empty() ? c_maxBlocksAsk : RLP(d).toInt<unsigned>(RLP::LaissezFaire);
- }
- void EthereumPeer::setRude()
- {
- auto s = session();
- if (!s)
- return;
- auto old = askOverride();
- repMan().setData(*s, name(), rlp(askOverride() / 2 + 1));
- cnote << "Rude behaviour; askOverride now" << askOverride() << ", was" << old;
- repMan().noteRude(*s, name());
- session()->addNote("manners", "RUDE");
- }
- void EthereumPeer::abortSync()
- {
- host()->onPeerAborting();
- }
- EthereumHost* EthereumPeer::host() const
- {
- return static_cast<EthereumHost*>(Capability::hostCapability());
- }
- /*
- * Possible asking/syncing states for two peers:
- */
- void EthereumPeer::setIdle()
- {
- setAsking(Asking::Nothing);
- }
- void EthereumPeer::requestStatus()
- {
- assert(m_asking == Asking::Nothing);
- setAsking(Asking::State);
- m_requireTransactions = true;
- RLPStream s;
- bool latest = m_peerCapabilityVersion == host()->protocolVersion();
- prep(s, StatusPacket, 5)
- << (latest ? host()->protocolVersion() : EthereumHost::c_oldProtocolVersion)
- << host()->networkId()
- << host()->chain().details().totalDifficulty
- << host()->chain().currentHash()
- << host()->chain().genesisHash();
- sealAndSend(s);
- }
- void EthereumPeer::requestBlockHeaders(unsigned _startNumber, unsigned _count, unsigned _skip, bool _reverse)
- {
- if (m_asking != Asking::Nothing)
- {
- clog(NetWarn) << "Asking headers while requesting " << ::toString(m_asking);
- }
- setAsking(Asking::BlockHeaders);
- RLPStream s;
- prep(s, GetBlockHeadersPacket, 4) << _startNumber << _count << _skip << (_reverse ? 1 : 0);
- clog(NetMessageDetail) << "Requesting " << _count << " block headers starting from " << _startNumber << (_reverse ? " in reverse" : "");
- m_syncHashNumber = _startNumber;
- m_lastAskedHeaders = _count;
- sealAndSend(s);
- }
- void EthereumPeer::requestBlockHeaders(h256 const& _startHash, unsigned _count, unsigned _skip, bool _reverse)
- {
- if (m_asking != Asking::Nothing)
- {
- clog(NetWarn) << "Asking headers while requesting " << ::toString(m_asking);
- }
- setAsking(Asking::BlockHeaders);
- RLPStream s;
- prep(s, GetBlockHeadersPacket, 4) << _startHash << _count << _skip << (_reverse ? 1 : 0);
- clog(NetMessageDetail) << "Requesting " << _count << " block headers starting from " << _startHash << (_reverse ? " in reverse" : "");
- m_syncHash = _startHash;
- m_lastAskedHeaders = _count;
- sealAndSend(s);
- }
- void EthereumPeer::requestBlockBodies(h256s const& _blocks)
- {
- if (m_asking != Asking::Nothing)
- {
- clog(NetWarn) << "Asking headers while requesting " << ::toString(m_asking);
- }
- setAsking(Asking::BlockBodies);
- if (_blocks.size())
- {
- RLPStream s;
- prep(s, GetBlockBodiesPacket, _blocks.size());
- for (auto const& i: _blocks)
- s << i;
- sealAndSend(s);
- }
- else
- setIdle();
- }
- void EthereumPeer::setAsking(Asking _a)
- {
- m_asking = _a;
- m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
- auto s = session();
- if (s)
- {
- s->addNote("ask", ::toString(_a));
- s->addNote("sync", string(isCriticalSyncing() ? "ONGOING" : "holding") + (needsSyncing() ? " & needed" : ""));
- }
- }
- void EthereumPeer::tick()
- {
- auto s = session();
- time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
- if (s && (now - m_lastAsk > 10 && m_asking != Asking::Nothing))
- // timeout
- s->disconnect(PingTimeout);
- }
- bool EthereumPeer::isConversing() const
- {
- return m_asking != Asking::Nothing;
- }
- bool EthereumPeer::isCriticalSyncing() const
- {
- return m_asking == Asking::BlockHeaders || m_asking == Asking::State || (m_asking == Asking::BlockBodies && m_protocolVersion == 62);
- }
- bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
- {
- m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
- try
- {
- switch (_id)
- {
- case StatusPacket:
- {
- m_protocolVersion = _r[0].toInt<unsigned>();
- m_networkId = _r[1].toInt<u256>();
- m_totalDifficulty = _r[2].toInt<u256>();
- m_latestHash = _r[3].toHash<h256>();
- m_genesisHash = _r[4].toHash<h256>();
- if (m_peerCapabilityVersion == host()->protocolVersion())
- m_protocolVersion = host()->protocolVersion();
- clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
- setIdle();
- host()->onPeerStatus(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())));
- break;
- }
- case TransactionsPacket:
- {
- host()->onPeerTransactions(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())), _r);
- break;
- }
- case GetBlockHeadersPacket:
- {
- /// Packet layout:
- /// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
- const auto blockId = _r[0];
- const auto maxHeaders = _r[1].toInt<u256>();
- const auto skip = _r[2].toInt<u256>();
- const auto reverse = _r[3].toInt<bool>();
- auto& bc = host()->chain();
- auto numHeadersToSend = maxHeaders <= c_maxHeadersToSend ? static_cast<unsigned>(maxHeaders) : c_maxHeadersToSend;
- if (skip > std::numeric_limits<unsigned>::max() - 1)
- {
- clog(NetAllDetail) << "Requested block skip is too big: " << skip;
- break;
- }
- auto step = static_cast<unsigned>(skip) + 1;
- assert(step > 0 && "step must not be 0");
- h256 blockHash;
- if (blockId.size() == 32) // block id is a hash
- {
- blockHash = blockId.toHash<h256>();
- //blockNumber = host()->chain().number(blockHash);
- clog(NetMessageSummary) << "GetBlockHeaders (block (hash): " << blockHash
- << ", maxHeaders: " << maxHeaders
- << ", skip: " << skip << ", reverse: " << reverse << ")";
- if (!reverse)
- {
- auto n = bc.number(blockHash);
- if (numHeadersToSend == 0)
- blockHash = {};
- else if (n != 0 || blockHash == bc.genesisHash())
- {
- auto top = n + uint64_t(step) * numHeadersToSend - 1;
- auto lastBlock = bc.number();
- if (top > lastBlock)
- {
- numHeadersToSend = (lastBlock - n) / step + 1;
- top = n + step * (numHeadersToSend - 1);
- }
- assert(top <= lastBlock && "invalid top block calculated");
- blockHash = bc.numberHash(static_cast<unsigned>(top)); // override start block hash with the hash of the top block we have
- }
- else
- blockHash = {};
- }
- else if (!bc.isKnown(blockHash))
- blockHash = {};
- }
- else // block id is a number
- {
- auto n = blockId.toInt<bigint>();
- clog(NetMessageSummary) << "GetBlockHeaders (" << n
- << "max: " << maxHeaders
- << "skip: " << skip << (reverse ? "reverse" : "") << ")";
- if (!reverse)
- {
- auto lastBlock = bc.number();
- if (n > lastBlock || numHeadersToSend == 0)
- blockHash = {};
- else
- {
- bigint top = n + uint64_t(step) * (numHeadersToSend - 1);
- if (top > lastBlock)
- {
- numHeadersToSend = (lastBlock - static_cast<unsigned>(n)) / step + 1;
- top = n + step * (numHeadersToSend - 1);
- }
- assert(top <= lastBlock && "invalid top block calculated");
- blockHash = bc.numberHash(static_cast<unsigned>(top)); // override start block hash with the hash of the top block we have
- }
- }
- else if (n <= std::numeric_limits<unsigned>::max())
- blockHash = bc.numberHash(static_cast<unsigned>(n));
- else
- blockHash = {};
- }
- auto nextHash = [&bc](h256 _h, unsigned _step)
- {
- static const unsigned c_blockNumberUsageLimit = 1000;
- const auto lastBlock = bc.number();
- const auto limitBlock = lastBlock > c_blockNumberUsageLimit ? lastBlock - c_blockNumberUsageLimit : 0; // find the number of the block below which we don't expect BC changes.
- while (_step) // parent hash traversal
- {
- auto details = bc.details(_h);
- if (details.number < limitBlock)
- break; // stop using parent hash traversal, fallback to using block numbers
- _h = details.parent;
- --_step;
- }
- if (_step) // still need lower block
- {
- auto n = bc.number(_h);
- if (n >= _step)
- _h = bc.numberHash(n - _step);
- else
- _h = {};
- }
- return _h;
- };
- bytes rlp;
- unsigned itemCount = 0;
- vector<h256> hashes;
- for (unsigned i = 0; i != numHeadersToSend; ++i)
- {
- if (!blockHash || !bc.isKnown(blockHash))
- break;
- hashes.push_back(blockHash);
- ++itemCount;
- blockHash = nextHash(blockHash, step);
- }
- for (unsigned i = 0; i < hashes.size() && rlp.size() < c_maxPayload; ++i)
- rlp += bc.headerData(hashes[reverse ? i : hashes.size() - 1 - i]);
- RLPStream s;
- prep(s, BlockHeadersPacket, itemCount).appendRaw(rlp, itemCount);
- sealAndSend(s);
- addRating(0);
- break;
- }
- case BlockHeadersPacket:
- {
- if (m_asking != Asking::BlockHeaders)
- clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them.";
- else
- {
- setIdle();
- host()->onPeerBlockHeaders(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
- }
- break;
- }
- case GetBlockBodiesPacket:
- {
- unsigned count = static_cast<unsigned>(_r.itemCount());
- clog(NetMessageSummary) << "GetBlockBodies (" << dec << count << "entries)";
- if (!count)
- {
- clog(NetImpolite) << "Zero-entry GetBlockBodies: Not replying.";
- addRating(-10);
- break;
- }
- // return the requested blocks.
- bytes rlp;
- unsigned n = 0;
- auto numBodiesToSend = std::min(count, c_maxBlocks);
- for (unsigned i = 0; i < numBodiesToSend && rlp.size() < c_maxPayload; ++i)
- {
- auto h = _r[i].toHash<h256>();
- if (host()->chain().isKnown(h))
- {
- bytes blockBytes = host()->chain().block(h);
- RLP block{blockBytes};
- RLPStream body;
- body.appendList(2);
- body.appendRaw(block[1].data()); // transactions
- body.appendRaw(block[2].data()); // uncles
- auto bodyBytes = body.out();
- rlp.insert(rlp.end(), bodyBytes.begin(), bodyBytes.end());
- ++n;
- }
- }
- if (count > 20 && n == 0)
- clog(NetWarn) << "all" << count << "unknown blocks requested; peer on different chain?";
- else
- clog(NetMessageSummary) << n << "blocks known and returned;" << (numBodiesToSend - n) << "blocks unknown;" << (count > c_maxBlocks ? count - c_maxBlocks : 0) << "blocks ignored";
- addRating(0);
- RLPStream s;
- prep(s, BlockBodiesPacket, n).appendRaw(rlp, n);
- sealAndSend(s);
- break;
- }
- case BlockBodiesPacket:
- {
- if (m_asking != Asking::BlockBodies)
- clog(NetImpolite) << "Peer giving us block bodies when we didn't ask for them.";
- else
- {
- setIdle();
- host()->onPeerBlockBodies(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
- }
- break;
- }
- case NewBlockPacket:
- {
- host()->onPeerNewBlock(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
- break;
- }
- case NewBlockHashesPacket:
- {
- unsigned itemCount = _r.itemCount();
- clog(NetMessageSummary) << "BlockHashes (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreHashes");
- if (itemCount > c_maxIncomingNewHashes)
- {
- disable("Too many new hashes");
- break;
- }
- vector<pair<h256, u256>> hashes(itemCount);
- for (unsigned i = 0; i < itemCount; ++i)
- hashes[i] = std::make_pair(_r[i][0].toHash<h256>(), _r[i][1].toInt<u256>());
- host()->onPeerNewHashes(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), hashes);
- break;
- }
- case GetNodeDataPacket:
- {
- unsigned count = static_cast<unsigned>(_r.itemCount());
- if (!count)
- {
- clog(NetImpolite) << "Zero-entry GetNodeData: Not replying.";
- addRating(-10);
- break;
- }
- clog(NetMessageSummary) << "GetNodeData (" << dec << count << " entries)";
- // return the requested nodes.
- strings data;
- unsigned n = 0;
- size_t payloadSize = 0;
- auto numItemsToSend = std::min(count, c_maxNodes);
- for (unsigned i = 0; i < numItemsToSend && payloadSize < c_maxPayload; ++i)
- {
- auto h = _r[i].toHash<h256>();
- auto node = host()->db().lookup(h);
- if (!node.empty())
- {
- payloadSize += node.length();
- data.push_back(move(node));
- ++n;
- }
- }
- clog(NetMessageSummary) << n << " nodes known and returned;" << (numItemsToSend - n) << " unknown;" << (count > c_maxNodes ? count - c_maxNodes : 0) << " ignored";
- addRating(0);
- RLPStream s;
- prep(s, NodeDataPacket, n);
- for (auto const& element: data)
- s.append(element);
- sealAndSend(s);
- break;
- }
- case GetReceiptsPacket:
- {
- unsigned count = static_cast<unsigned>(_r.itemCount());
- if (!count)
- {
- clog(NetImpolite) << "Zero-entry GetReceipts: Not replying.";
- addRating(-10);
- break;
- }
- clog(NetMessageSummary) << "GetReceipts (" << dec << count << " entries)";
- // return the requested receipts.
- bytes rlp;
- unsigned n = 0;
- auto numItemsToSend = std::min(count, c_maxReceipts);
- for (unsigned i = 0; i < numItemsToSend && rlp.size() < c_maxPayload; ++i)
- {
- auto h = _r[i].toHash<h256>();
- if (host()->chain().isKnown(h))
- {
- auto const receipts = host()->chain().receipts(h);
- auto receiptsRlpList = receipts.rlp();
- rlp.insert(rlp.end(), receiptsRlpList.begin(), receiptsRlpList.end());
- ++n;
- }
- }
- clog(NetMessageSummary) << n << " receipt lists known and returned;" << (numItemsToSend - n) << " unknown;" << (count > c_maxReceipts ? count - c_maxReceipts : 0) << " ignored";
- addRating(0);
- RLPStream s;
- prep(s, ReceiptsPacket, n).appendRaw(rlp, n);
- sealAndSend(s);
- break;
- }
- default:
- return false;
- }
- }
- catch (Exception const&)
- {
- clog(NetWarn) << "Peer causing an Exception:" << boost::current_exception_diagnostic_information() << _r;
- }
- catch (std::exception const& _e)
- {
- clog(NetWarn) << "Peer causing an exception:" << _e.what() << _r;
- }
- return true;
- }
|