TransactionQueue.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file TransactionQueue.cpp
  15. * @author Gav Wood <i@gavwood.com>
  16. * @date 2014
  17. */
  18. #include "TransactionQueue.h"
  19. #include <libdevcore/Log.h>
  20. #include <libethcore/Exceptions.h>
  21. #include "Transaction.h"
  22. using namespace std;
  23. using namespace dev;
  24. using namespace dev::eth;
  25. const char* TransactionQueueChannel::name() { return EthCyan "┉┅▶"; }
  26. const char* TransactionQueueTraceChannel::name() { return EthCyan " ┅▶"; }
  /// Maximum number of entries kept in m_unverified; enqueue() stops accepting
  /// transactions once the unverified queue has reached this size.
  const size_t c_maxVerificationQueueSize = 8 * 1024;
  28. TransactionQueue::TransactionQueue(unsigned _limit, unsigned _futureLimit):
  29. m_current(PriorityCompare { *this }),
  30. m_limit(_limit),
  31. m_futureLimit(_futureLimit)
  32. {
  33. unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
  34. for (unsigned i = 0; i < verifierThreads; ++i)
  35. m_verifiers.emplace_back([=](){
  36. setThreadName("txcheck" + toString(i));
  37. this->verifierBody();
  38. });
  39. }
  40. TransactionQueue::~TransactionQueue()
  41. {
  42. m_aborting = true;
  43. m_queueReady.notify_all();
  44. for (auto& i: m_verifiers)
  45. i.join();
  46. }
  47. ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik)
  48. {
  49. // Check if we already know this transaction.
  50. h256 h = sha3(_transactionRLP);
  51. Transaction t;
  52. ImportResult ir;
  53. {
  54. UpgradableGuard l(m_lock);
  55. ir = check_WITH_LOCK(h, _ik);
  56. if (ir != ImportResult::Success)
  57. return ir;
  58. try
  59. {
  60. // Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender.
  61. // If it doesn't work, the signature is bad.
  62. // The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction).
  63. t = Transaction(_transactionRLP, CheckTransaction::Everything);
  64. UpgradeGuard ul(l);
  65. // cdebug << "Importing" << t;
  66. ir = manageImport_WITH_LOCK(h, t);
  67. }
  68. catch (...)
  69. {
  70. return ImportResult::Malformed;
  71. }
  72. }
  73. return ir;
  74. }
  75. ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik)
  76. {
  77. if (m_known.count(_h))
  78. return ImportResult::AlreadyKnown;
  79. if (m_dropped.count(_h) && _ik == IfDropped::Ignore)
  80. return ImportResult::AlreadyInChain;
  81. return ImportResult::Success;
  82. }
  83. ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
  84. {
  85. // Check if we already know this transaction.
  86. h256 h = _transaction.sha3(WithSignature);
  87. ImportResult ret;
  88. {
  89. UpgradableGuard l(m_lock);
  90. auto ir = check_WITH_LOCK(h, _ik);
  91. if (ir != ImportResult::Success)
  92. return ir;
  93. {
  94. _transaction.safeSender(); // Perform EC recovery outside of the write lock
  95. UpgradeGuard ul(l);
  96. ret = manageImport_WITH_LOCK(h, _transaction);
  97. }
  98. }
  99. return ret;
  100. }
  101. Transactions TransactionQueue::topTransactions(unsigned _limit, h256Hash const& _avoid) const
  102. {
  103. ReadGuard l(m_lock);
  104. Transactions ret;
  105. for (auto t = m_current.begin(); ret.size() < _limit && t != m_current.end(); ++t)
  106. if (!_avoid.count(t->transaction.sha3()))
  107. ret.push_back(t->transaction);
  108. return ret;
  109. }
  110. h256Hash TransactionQueue::knownTransactions() const
  111. {
  112. ReadGuard l(m_lock);
  113. return m_known;
  114. }
  115. ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction)
  116. {
  117. try
  118. {
  119. assert(_h == _transaction.sha3());
  120. // Remove any prior transaction with the same nonce but a lower gas price.
  121. // Bomb out if there's a prior transaction with higher gas price.
  122. auto cs = m_currentByAddressAndNonce.find(_transaction.from());
  123. if (cs != m_currentByAddressAndNonce.end())
  124. {
  125. auto t = cs->second.find(_transaction.nonce());
  126. if (t != cs->second.end())
  127. {
  128. if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
  129. return ImportResult::OverbidGasPrice;
  130. else
  131. {
  132. h256 dropped = (*t->second).transaction.sha3();
  133. remove_WITH_LOCK(dropped);
  134. m_onReplaced(dropped);
  135. }
  136. }
  137. }
  138. auto fs = m_future.find(_transaction.from());
  139. if (fs != m_future.end())
  140. {
  141. auto t = fs->second.find(_transaction.nonce());
  142. if (t != fs->second.end())
  143. {
  144. if (_transaction.gasPrice() < t->second.transaction.gasPrice())
  145. return ImportResult::OverbidGasPrice;
  146. else
  147. {
  148. fs->second.erase(t);
  149. --m_futureSize;
  150. if (fs->second.empty())
  151. m_future.erase(fs);
  152. }
  153. }
  154. }
  155. // If valid, append to transactions.
  156. insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
  157. clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h;
  158. while (m_current.size() > m_limit)
  159. {
  160. clog(TransactionQueueTraceChannel) << "Dropping out of bounds transaction" << _h;
  161. remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
  162. }
  163. m_onReady();
  164. }
  165. catch (Exception const& _e)
  166. {
  167. ctxq << "Ignoring invalid transaction: " << diagnostic_information(_e);
  168. return ImportResult::Malformed;
  169. }
  170. catch (std::exception const& _e)
  171. {
  172. ctxq << "Ignoring invalid transaction: " << _e.what();
  173. return ImportResult::Malformed;
  174. }
  175. return ImportResult::Success;
  176. }
  177. u256 TransactionQueue::maxNonce(Address const& _a) const
  178. {
  179. ReadGuard l(m_lock);
  180. return maxNonce_WITH_LOCK(_a);
  181. }
  182. u256 TransactionQueue::maxNonce_WITH_LOCK(Address const& _a) const
  183. {
  184. u256 ret = 0;
  185. auto cs = m_currentByAddressAndNonce.find(_a);
  186. if (cs != m_currentByAddressAndNonce.end() && !cs->second.empty())
  187. ret = cs->second.rbegin()->first + 1;
  188. auto fs = m_future.find(_a);
  189. if (fs != m_future.end() && !fs->second.empty())
  190. ret = std::max(ret, fs->second.rbegin()->first + 1);
  191. return ret;
  192. }
  193. void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
  194. {
  195. if (m_currentByHash.count(_p.first))
  196. {
  197. cwarn << "Transaction hash" << _p.first << "already in current?!";
  198. return;
  199. }
  200. Transaction const& t = _p.second;
  201. // Insert into current
  202. auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
  203. PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
  204. inserted.first->second = handle;
  205. m_currentByHash[_p.first] = handle;
  206. // Move following transactions from future to current
  207. makeCurrent_WITH_LOCK(t);
  208. m_known.insert(_p.first);
  209. }
  210. bool TransactionQueue::remove_WITH_LOCK(h256 const& _txHash)
  211. {
  212. auto t = m_currentByHash.find(_txHash);
  213. if (t == m_currentByHash.end())
  214. return false;
  215. Address from = (*t->second).transaction.from();
  216. auto it = m_currentByAddressAndNonce.find(from);
  217. assert (it != m_currentByAddressAndNonce.end());
  218. it->second.erase((*t->second).transaction.nonce());
  219. m_current.erase(t->second);
  220. m_currentByHash.erase(t);
  221. if (it->second.empty())
  222. m_currentByAddressAndNonce.erase(it);
  223. m_known.erase(_txHash);
  224. return true;
  225. }
  226. unsigned TransactionQueue::waiting(Address const& _a) const
  227. {
  228. ReadGuard l(m_lock);
  229. unsigned ret = 0;
  230. auto cs = m_currentByAddressAndNonce.find(_a);
  231. if (cs != m_currentByAddressAndNonce.end())
  232. ret = cs->second.size();
  233. auto fs = m_future.find(_a);
  234. if (fs != m_future.end())
  235. ret += fs->second.size();
  236. return ret;
  237. }
  238. void TransactionQueue::setFuture(h256 const& _txHash)
  239. {
  240. WriteGuard l(m_lock);
  241. auto it = m_currentByHash.find(_txHash);
  242. if (it == m_currentByHash.end())
  243. return;
  244. VerifiedTransaction const& st = *(it->second);
  245. Address from = st.transaction.from();
  246. auto& queue = m_currentByAddressAndNonce[from];
  247. auto& target = m_future[from];
  248. auto cutoff = queue.lower_bound(st.transaction.nonce());
  249. for (auto m = cutoff; m != queue.end(); ++m)
  250. {
  251. VerifiedTransaction& t = const_cast<VerifiedTransaction&>(*(m->second)); // set has only const iterators. Since we are moving out of container that's fine
  252. m_currentByHash.erase(t.transaction.sha3());
  253. target.emplace(t.transaction.nonce(), move(t));
  254. m_current.erase(m->second);
  255. ++m_futureSize;
  256. }
  257. queue.erase(cutoff, queue.end());
  258. if (queue.empty())
  259. m_currentByAddressAndNonce.erase(from);
  260. }
  261. void TransactionQueue::makeCurrent_WITH_LOCK(Transaction const& _t)
  262. {
  263. bool newCurrent = false;
  264. auto fs = m_future.find(_t.from());
  265. if (fs != m_future.end())
  266. {
  267. u256 nonce = _t.nonce() + 1;
  268. auto fb = fs->second.find(nonce);
  269. if (fb != fs->second.end())
  270. {
  271. auto ft = fb;
  272. while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
  273. {
  274. auto inserted = m_currentByAddressAndNonce[_t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
  275. PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
  276. inserted.first->second = handle;
  277. m_currentByHash[(*handle).transaction.sha3()] = handle;
  278. --m_futureSize;
  279. ++ft;
  280. ++nonce;
  281. newCurrent = true;
  282. }
  283. fs->second.erase(fb, ft);
  284. if (fs->second.empty())
  285. m_future.erase(_t.from());
  286. }
  287. }
  288. while (m_futureSize > m_futureLimit)
  289. {
  290. // TODO: priority queue for future transactions
  291. // For now just drop random chain end
  292. --m_futureSize;
  293. clog(TransactionQueueTraceChannel) << "Dropping out of bounds future transaction" << m_future.begin()->second.rbegin()->second.transaction.sha3();
  294. m_future.begin()->second.erase(--m_future.begin()->second.end());
  295. if (m_future.begin()->second.empty())
  296. m_future.erase(m_future.begin());
  297. }
  298. if (newCurrent)
  299. m_onReady();
  300. }
  301. void TransactionQueue::drop(h256 const& _txHash)
  302. {
  303. UpgradableGuard l(m_lock);
  304. if (!m_known.count(_txHash))
  305. return;
  306. UpgradeGuard ul(l);
  307. m_dropped.insert(_txHash);
  308. remove_WITH_LOCK(_txHash);
  309. }
  310. void TransactionQueue::dropGood(Transaction const& _t)
  311. {
  312. WriteGuard l(m_lock);
  313. makeCurrent_WITH_LOCK(_t);
  314. if (!m_known.count(_t.sha3()))
  315. return;
  316. remove_WITH_LOCK(_t.sha3());
  317. }
  318. void TransactionQueue::clear()
  319. {
  320. WriteGuard l(m_lock);
  321. m_known.clear();
  322. m_current.clear();
  323. m_currentByAddressAndNonce.clear();
  324. m_currentByHash.clear();
  325. m_future.clear();
  326. m_futureSize = 0;
  327. }
  328. void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId)
  329. {
  330. bool queued = false;
  331. {
  332. Guard l(x_queue);
  333. unsigned itemCount = _data.itemCount();
  334. for (unsigned i = 0; i < itemCount; ++i)
  335. {
  336. if (m_unverified.size() >= c_maxVerificationQueueSize)
  337. {
  338. clog(TransactionQueueChannel) << "Transaction verification queue is full. Dropping" << itemCount - i << "transactions";
  339. break;
  340. }
  341. m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
  342. queued = true;
  343. }
  344. }
  345. if (queued)
  346. m_queueReady.notify_all();
  347. }
  348. void TransactionQueue::verifierBody()
  349. {
  350. while (!m_aborting)
  351. {
  352. UnverifiedTransaction work;
  353. {
  354. unique_lock<Mutex> l(x_queue);
  355. m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
  356. if (m_aborting)
  357. return;
  358. work = move(m_unverified.front());
  359. m_unverified.pop_front();
  360. }
  361. try
  362. {
  363. Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
  364. ImportResult ir = import(t);
  365. m_onImport(ir, t.sha3(), work.nodeId);
  366. }
  367. catch (...)
  368. {
  369. // should not happen as exceptions are handled in import.
  370. cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
  371. }
  372. }
  373. }