EthereumHost.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file EthereumHost.cpp
  15. * @author Gav Wood <i@gavwood.com>
  16. * @date 2014
  17. */
  18. #include "EthereumHost.h"
  19. #include <chrono>
  20. #include <thread>
  21. #include <libdevcore/Common.h>
  22. #include <libp2p/Host.h>
  23. #include <libp2p/Session.h>
  24. #include <libethcore/Exceptions.h>
  25. #include "BlockChain.h"
  26. #include "TransactionQueue.h"
  27. #include "BlockQueue.h"
  28. #include "EthereumPeer.h"
  29. #include "BlockChainSync.h"
  30. using namespace std;
  31. using namespace dev;
  32. using namespace dev::eth;
  33. using namespace p2p;
  34. unsigned const EthereumHost::c_oldProtocolVersion = 62; //TODO: remove this once v63+ is common
  35. static unsigned const c_maxSendTransactions = 256;
  36. char const* const EthereumHost::s_stateNames[static_cast<int>(SyncState::Size)] = {"NotSynced", "Idle", "Waiting", "Blocks", "State", "NewBlocks" };
  37. #if defined(_WIN32)
  38. const char* EthereumHostTrace::name() { return EthPurple "^" EthGray " "; }
  39. #else
  40. const char* EthereumHostTrace::name() { return EthPurple "⧫" EthGray " "; }
  41. #endif
  42. EthereumHost::EthereumHost(BlockChain const& _ch, OverlayDB const& _db, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId):
  43. HostCapability<EthereumPeer>(),
  44. Worker ("ethsync"),
  45. m_chain (_ch),
  46. m_db(_db),
  47. m_tq (_tq),
  48. m_bq (_bq),
  49. m_networkId (_networkId)
  50. {
  51. // TODO: Composition would be better. Left like that to avoid initialization
  52. // issues as BlockChainSync accesses other EthereumHost members.
  53. m_sync.reset(new BlockChainSync(*this));
  54. m_latestBlockSent = _ch.currentHash();
  55. m_tq.onImport([this](ImportResult _ir, h256 const& _h, h512 const& _nodeId) { onTransactionImported(_ir, _h, _nodeId); });
  56. }
  57. EthereumHost::~EthereumHost()
  58. {
  59. }
  60. bool EthereumHost::ensureInitialised()
  61. {
  62. if (!m_latestBlockSent)
  63. {
  64. // First time - just initialise.
  65. m_latestBlockSent = m_chain.currentHash();
  66. clog(EthereumHostTrace) << "Initialising: latest=" << m_latestBlockSent;
  67. Guard l(x_transactions);
  68. m_transactionsSent = m_tq.knownTransactions();
  69. return true;
  70. }
  71. return false;
  72. }
  73. void EthereumHost::reset()
  74. {
  75. RecursiveGuard l(x_sync);
  76. m_sync->abortSync();
  77. m_latestBlockSent = h256();
  78. Guard tl(x_transactions);
  79. m_transactionsSent.clear();
  80. }
  81. void EthereumHost::doWork()
  82. {
  83. bool netChange = ensureInitialised();
  84. auto h = m_chain.currentHash();
  85. // If we've finished our initial sync (including getting all the blocks into the chain so as to reduce invalid transactions), start trading transactions & blocks
  86. if (!isSyncing() && m_chain.isKnown(m_latestBlockSent))
  87. {
  88. if (m_newTransactions)
  89. {
  90. m_newTransactions = false;
  91. maintainTransactions();
  92. }
  93. if (m_newBlocks)
  94. {
  95. m_newBlocks = false;
  96. maintainBlocks(h);
  97. }
  98. }
  99. time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
  100. if (now - m_lastTick >= 1)
  101. {
  102. m_lastTick = now;
  103. foreachPeer([](std::shared_ptr<EthereumPeer> _p) { _p->tick(); return true; });
  104. }
  105. // return netChange;
  106. // TODO: Figure out what to do with netChange.
  107. (void)netChange;
  108. }
  109. void EthereumHost::maintainTransactions()
  110. {
  111. // Send any new transactions.
  112. unordered_map<std::shared_ptr<EthereumPeer>, std::vector<size_t>> peerTransactions;
  113. auto ts = m_tq.topTransactions(c_maxSendTransactions);
  114. {
  115. Guard l(x_transactions);
  116. for (size_t i = 0; i < ts.size(); ++i)
  117. {
  118. auto const& t = ts[i];
  119. bool unsent = !m_transactionsSent.count(t.sha3());
  120. auto peers = get<1>(randomSelection(0, [&](EthereumPeer* p) { return p->m_requireTransactions || (unsent && !p->m_knownTransactions.count(t.sha3())); }));
  121. for (auto const& p: peers)
  122. peerTransactions[p].push_back(i);
  123. }
  124. for (auto const& t: ts)
  125. m_transactionsSent.insert(t.sha3());
  126. }
  127. foreachPeer([&](shared_ptr<EthereumPeer> _p)
  128. {
  129. bytes b;
  130. unsigned n = 0;
  131. for (auto const& i: peerTransactions[_p])
  132. {
  133. _p->m_knownTransactions.insert(ts[i].sha3());
  134. b += ts[i].rlp();
  135. ++n;
  136. }
  137. _p->clearKnownTransactions();
  138. if (n || _p->m_requireTransactions)
  139. {
  140. RLPStream ts;
  141. _p->prep(ts, TransactionsPacket, n).appendRaw(b, n);
  142. _p->sealAndSend(ts);
  143. clog(EthereumHostTrace) << "Sent" << n << "transactions to " << _p->session()->info().clientVersion;
  144. }
  145. _p->m_requireTransactions = false;
  146. return true;
  147. });
  148. }
  149. void EthereumHost::foreachPeer(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const
  150. {
  151. //order peers by protocol, rating, connection age
  152. auto sessions = peerSessions();
  153. auto sessionLess = [](std::pair<std::shared_ptr<Session>, std::shared_ptr<Peer>> const& _left, std::pair<std::shared_ptr<Session>, std::shared_ptr<Peer>> const& _right)
  154. { return _left.first->rating() == _right.first->rating() ? _left.first->connectionTime() < _right.first->connectionTime() : _left.first->rating() > _right.first->rating(); };
  155. std::sort(sessions.begin(), sessions.end(), sessionLess);
  156. for (auto s: sessions)
  157. if (!_f(s.first->cap<EthereumPeer>()))
  158. return;
  159. sessions = peerSessions(c_oldProtocolVersion); //TODO: remove once v61+ is common
  160. std::sort(sessions.begin(), sessions.end(), sessionLess);
  161. for (auto s: sessions)
  162. if (!_f(s.first->cap<EthereumPeer>(c_oldProtocolVersion)))
  163. return;
  164. }
  165. tuple<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<Session>>> EthereumHost::randomSelection(unsigned _percent, std::function<bool(EthereumPeer*)> const& _allow)
  166. {
  167. vector<shared_ptr<EthereumPeer>> chosen;
  168. vector<shared_ptr<EthereumPeer>> allowed;
  169. vector<shared_ptr<Session>> sessions;
  170. size_t peerCount = 0;
  171. foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
  172. {
  173. if (_allow(_p.get()))
  174. {
  175. allowed.push_back(_p);
  176. sessions.push_back(_p->session());
  177. }
  178. ++peerCount;
  179. return true;
  180. });
  181. size_t chosenSize = (peerCount * _percent + 99) / 100;
  182. chosen.reserve(chosenSize);
  183. for (unsigned i = chosenSize; i && allowed.size(); i--)
  184. {
  185. unsigned n = rand() % allowed.size();
  186. chosen.push_back(std::move(allowed[n]));
  187. allowed.erase(allowed.begin() + n);
  188. }
  189. return make_tuple(move(chosen), move(allowed), move(sessions));
  190. }
  191. void EthereumHost::maintainBlocks(h256 const& _currentHash)
  192. {
  193. // Send any new blocks.
  194. auto detailsFrom = m_chain.details(m_latestBlockSent);
  195. auto detailsTo = m_chain.details(_currentHash);
  196. if (detailsFrom.totalDifficulty < detailsTo.totalDifficulty)
  197. {
  198. if (diff(detailsFrom.number, detailsTo.number) < 20)
  199. {
  200. // don't be sending more than 20 "new" blocks. if there are any more we were probably waaaay behind.
  201. clog(EthereumHostTrace) << "Sending a new block (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
  202. h256s blocks = get<0>(m_chain.treeRoute(m_latestBlockSent, _currentHash, false, false, true));
  203. auto s = randomSelection(25, [&](EthereumPeer* p){
  204. DEV_GUARDED(p->x_knownBlocks)
  205. return !p->m_knownBlocks.count(_currentHash);
  206. return false;
  207. });
  208. for (shared_ptr<EthereumPeer> const& p: get<0>(s))
  209. for (auto const& b: blocks)
  210. {
  211. RLPStream ts;
  212. p->prep(ts, NewBlockPacket, 2).appendRaw(m_chain.block(b), 1).append(m_chain.details(b).totalDifficulty);
  213. Guard l(p->x_knownBlocks);
  214. p->sealAndSend(ts);
  215. p->m_knownBlocks.clear();
  216. }
  217. for (shared_ptr<EthereumPeer> const& p: get<1>(s))
  218. {
  219. RLPStream ts;
  220. p->prep(ts, NewBlockHashesPacket, blocks.size());
  221. for (auto const& b: blocks)
  222. {
  223. ts.appendList(2);
  224. ts.append(b);
  225. ts.append(m_chain.number(b));
  226. }
  227. Guard l(p->x_knownBlocks);
  228. p->sealAndSend(ts);
  229. p->m_knownBlocks.clear();
  230. }
  231. }
  232. m_latestBlockSent = _currentHash;
  233. }
  234. }
  235. void EthereumHost::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
  236. {
  237. RecursiveGuard l(x_sync);
  238. try
  239. {
  240. m_sync->onPeerStatus(_peer);
  241. }
  242. catch (FailedInvariant const&)
  243. {
  244. // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
  245. clog(NetWarn) << "Failed invariant during sync, restarting sync";
  246. m_sync->restartSync();
  247. }
  248. }
  249. void EthereumHost::onPeerBlockHeaders(std::shared_ptr<EthereumPeer> _peer, RLP const& _headers)
  250. {
  251. RecursiveGuard l(x_sync);
  252. try
  253. {
  254. m_sync->onPeerBlockHeaders(_peer, _headers);
  255. }
  256. catch (FailedInvariant const&)
  257. {
  258. // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
  259. clog(NetWarn) << "Failed invariant during sync, restarting sync";
  260. m_sync->restartSync();
  261. }
  262. }
  263. void EthereumHost::onPeerBlockBodies(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
  264. {
  265. RecursiveGuard l(x_sync);
  266. try
  267. {
  268. m_sync->onPeerBlockBodies(_peer, _r);
  269. }
  270. catch (FailedInvariant const&)
  271. {
  272. // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
  273. clog(NetWarn) << "Failed invariant during sync, restarting sync";
  274. m_sync->restartSync();
  275. }
  276. }
  277. void EthereumHost::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, std::vector<std::pair<h256, u256>> const& _hashes)
  278. {
  279. RecursiveGuard l(x_sync);
  280. try
  281. {
  282. m_sync->onPeerNewHashes(_peer, _hashes);
  283. }
  284. catch (FailedInvariant const&)
  285. {
  286. // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
  287. clog(NetWarn) << "Failed invariant during sync, restarting sync";
  288. m_sync->restartSync();
  289. }
  290. }
  291. void EthereumHost::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
  292. {
  293. RecursiveGuard l(x_sync);
  294. try
  295. {
  296. m_sync->onPeerNewBlock(_peer, _r);
  297. }
  298. catch (FailedInvariant const&)
  299. {
  300. // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
  301. clog(NetWarn) << "Failed invariant during sync, restarting sync";
  302. m_sync->restartSync();
  303. }
  304. }
  305. void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
  306. {
  307. unsigned itemCount = _r.itemCount();
  308. clog(EthereumHostTrace) << "Transactions (" << dec << itemCount << "entries)";
  309. m_tq.enqueue(_r, _peer->session()->id());
  310. }
  311. void EthereumHost::onPeerAborting()
  312. {
  313. RecursiveGuard l(x_sync);
  314. try
  315. {
  316. m_sync->onPeerAborting();
  317. }
  318. catch (Exception&)
  319. {
  320. cwarn << "Exception on peer destruciton: " << boost::current_exception_diagnostic_information();
  321. }
  322. }
  323. bool EthereumHost::isSyncing() const
  324. {
  325. return m_sync->isSyncing();
  326. }
  327. SyncStatus EthereumHost::status() const
  328. {
  329. RecursiveGuard l(x_sync);
  330. return m_sync->status();
  331. }
  332. void EthereumHost::onTransactionImported(ImportResult _ir, h256 const& _h, h512 const& _nodeId)
  333. {
  334. auto session = host()->peerSession(_nodeId);
  335. if (!session)
  336. return;
  337. std::shared_ptr<EthereumPeer> peer = session->cap<EthereumPeer>();
  338. if (!peer)
  339. peer = session->cap<EthereumPeer>(c_oldProtocolVersion);
  340. if (!peer)
  341. return;
  342. Guard l(peer->x_knownTransactions);
  343. peer->m_knownTransactions.insert(_h);
  344. switch (_ir)
  345. {
  346. case ImportResult::Malformed:
  347. peer->addRating(-100);
  348. break;
  349. case ImportResult::AlreadyKnown:
  350. // if we already had the transaction, then don't bother sending it on.
  351. DEV_GUARDED(x_transactions)
  352. m_transactionsSent.insert(_h);
  353. peer->addRating(0);
  354. break;
  355. case ImportResult::Success:
  356. peer->addRating(100);
  357. break;
  358. default:;
  359. }
  360. }