123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422 |
- /*
- This file is part of cpp-ethereum.
- cpp-ethereum is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- cpp-ethereum is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
- */
- /** @file TransactionQueue.cpp
- * @author Gav Wood <i@gavwood.com>
- * @date 2014
- */
- #include "TransactionQueue.h"
- #include <libdevcore/Log.h>
- #include <libethcore/Exceptions.h>
- #include "Transaction.h"
- using namespace std;
- using namespace dev;
- using namespace dev::eth;
- const char* TransactionQueueChannel::name() { return EthCyan "┉┅▶"; }
- const char* TransactionQueueTraceChannel::name() { return EthCyan " ┅▶"; }
- const size_t c_maxVerificationQueueSize = 8192;
- TransactionQueue::TransactionQueue(unsigned _limit, unsigned _futureLimit):
- m_current(PriorityCompare { *this }),
- m_limit(_limit),
- m_futureLimit(_futureLimit)
- {
- unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
- for (unsigned i = 0; i < verifierThreads; ++i)
- m_verifiers.emplace_back([=](){
- setThreadName("txcheck" + toString(i));
- this->verifierBody();
- });
- }
- TransactionQueue::~TransactionQueue()
- {
- m_aborting = true;
- m_queueReady.notify_all();
- for (auto& i: m_verifiers)
- i.join();
- }
- ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik)
- {
- // Check if we already know this transaction.
- h256 h = sha3(_transactionRLP);
- Transaction t;
- ImportResult ir;
- {
- UpgradableGuard l(m_lock);
- ir = check_WITH_LOCK(h, _ik);
- if (ir != ImportResult::Success)
- return ir;
- try
- {
- // Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender.
- // If it doesn't work, the signature is bad.
- // The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction).
- t = Transaction(_transactionRLP, CheckTransaction::Everything);
- UpgradeGuard ul(l);
- // cdebug << "Importing" << t;
- ir = manageImport_WITH_LOCK(h, t);
- }
- catch (...)
- {
- return ImportResult::Malformed;
- }
- }
- return ir;
- }
- ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik)
- {
- if (m_known.count(_h))
- return ImportResult::AlreadyKnown;
- if (m_dropped.count(_h) && _ik == IfDropped::Ignore)
- return ImportResult::AlreadyInChain;
- return ImportResult::Success;
- }
- ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
- {
- // Check if we already know this transaction.
- h256 h = _transaction.sha3(WithSignature);
- ImportResult ret;
- {
- UpgradableGuard l(m_lock);
- auto ir = check_WITH_LOCK(h, _ik);
- if (ir != ImportResult::Success)
- return ir;
- {
- _transaction.safeSender(); // Perform EC recovery outside of the write lock
- UpgradeGuard ul(l);
- ret = manageImport_WITH_LOCK(h, _transaction);
- }
- }
- return ret;
- }
- Transactions TransactionQueue::topTransactions(unsigned _limit, h256Hash const& _avoid) const
- {
- ReadGuard l(m_lock);
- Transactions ret;
- for (auto t = m_current.begin(); ret.size() < _limit && t != m_current.end(); ++t)
- if (!_avoid.count(t->transaction.sha3()))
- ret.push_back(t->transaction);
- return ret;
- }
- h256Hash TransactionQueue::knownTransactions() const
- {
- ReadGuard l(m_lock);
- return m_known;
- }
- ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction)
- {
- try
- {
- assert(_h == _transaction.sha3());
- // Remove any prior transaction with the same nonce but a lower gas price.
- // Bomb out if there's a prior transaction with higher gas price.
- auto cs = m_currentByAddressAndNonce.find(_transaction.from());
- if (cs != m_currentByAddressAndNonce.end())
- {
- auto t = cs->second.find(_transaction.nonce());
- if (t != cs->second.end())
- {
- if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
- return ImportResult::OverbidGasPrice;
- else
- {
- h256 dropped = (*t->second).transaction.sha3();
- remove_WITH_LOCK(dropped);
- m_onReplaced(dropped);
- }
- }
- }
- auto fs = m_future.find(_transaction.from());
- if (fs != m_future.end())
- {
- auto t = fs->second.find(_transaction.nonce());
- if (t != fs->second.end())
- {
- if (_transaction.gasPrice() < t->second.transaction.gasPrice())
- return ImportResult::OverbidGasPrice;
- else
- {
- fs->second.erase(t);
- --m_futureSize;
- if (fs->second.empty())
- m_future.erase(fs);
- }
- }
- }
- // If valid, append to transactions.
- insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
- clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h;
- while (m_current.size() > m_limit)
- {
- clog(TransactionQueueTraceChannel) << "Dropping out of bounds transaction" << _h;
- remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
- }
- m_onReady();
- }
- catch (Exception const& _e)
- {
- ctxq << "Ignoring invalid transaction: " << diagnostic_information(_e);
- return ImportResult::Malformed;
- }
- catch (std::exception const& _e)
- {
- ctxq << "Ignoring invalid transaction: " << _e.what();
- return ImportResult::Malformed;
- }
- return ImportResult::Success;
- }
- u256 TransactionQueue::maxNonce(Address const& _a) const
- {
- ReadGuard l(m_lock);
- return maxNonce_WITH_LOCK(_a);
- }
- u256 TransactionQueue::maxNonce_WITH_LOCK(Address const& _a) const
- {
- u256 ret = 0;
- auto cs = m_currentByAddressAndNonce.find(_a);
- if (cs != m_currentByAddressAndNonce.end() && !cs->second.empty())
- ret = cs->second.rbegin()->first + 1;
- auto fs = m_future.find(_a);
- if (fs != m_future.end() && !fs->second.empty())
- ret = std::max(ret, fs->second.rbegin()->first + 1);
- return ret;
- }
- void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
- {
- if (m_currentByHash.count(_p.first))
- {
- cwarn << "Transaction hash" << _p.first << "already in current?!";
- return;
- }
- Transaction const& t = _p.second;
- // Insert into current
- auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
- PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
- inserted.first->second = handle;
- m_currentByHash[_p.first] = handle;
- // Move following transactions from future to current
- makeCurrent_WITH_LOCK(t);
- m_known.insert(_p.first);
- }
- bool TransactionQueue::remove_WITH_LOCK(h256 const& _txHash)
- {
- auto t = m_currentByHash.find(_txHash);
- if (t == m_currentByHash.end())
- return false;
- Address from = (*t->second).transaction.from();
- auto it = m_currentByAddressAndNonce.find(from);
- assert (it != m_currentByAddressAndNonce.end());
- it->second.erase((*t->second).transaction.nonce());
- m_current.erase(t->second);
- m_currentByHash.erase(t);
- if (it->second.empty())
- m_currentByAddressAndNonce.erase(it);
- m_known.erase(_txHash);
- return true;
- }
- unsigned TransactionQueue::waiting(Address const& _a) const
- {
- ReadGuard l(m_lock);
- unsigned ret = 0;
- auto cs = m_currentByAddressAndNonce.find(_a);
- if (cs != m_currentByAddressAndNonce.end())
- ret = cs->second.size();
- auto fs = m_future.find(_a);
- if (fs != m_future.end())
- ret += fs->second.size();
- return ret;
- }
- void TransactionQueue::setFuture(h256 const& _txHash)
- {
- WriteGuard l(m_lock);
- auto it = m_currentByHash.find(_txHash);
- if (it == m_currentByHash.end())
- return;
- VerifiedTransaction const& st = *(it->second);
- Address from = st.transaction.from();
- auto& queue = m_currentByAddressAndNonce[from];
- auto& target = m_future[from];
- auto cutoff = queue.lower_bound(st.transaction.nonce());
- for (auto m = cutoff; m != queue.end(); ++m)
- {
- VerifiedTransaction& t = const_cast<VerifiedTransaction&>(*(m->second)); // set has only const iterators. Since we are moving out of container that's fine
- m_currentByHash.erase(t.transaction.sha3());
- target.emplace(t.transaction.nonce(), move(t));
- m_current.erase(m->second);
- ++m_futureSize;
- }
- queue.erase(cutoff, queue.end());
- if (queue.empty())
- m_currentByAddressAndNonce.erase(from);
- }
- void TransactionQueue::makeCurrent_WITH_LOCK(Transaction const& _t)
- {
- bool newCurrent = false;
- auto fs = m_future.find(_t.from());
- if (fs != m_future.end())
- {
- u256 nonce = _t.nonce() + 1;
- auto fb = fs->second.find(nonce);
- if (fb != fs->second.end())
- {
- auto ft = fb;
- while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
- {
- auto inserted = m_currentByAddressAndNonce[_t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
- PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
- inserted.first->second = handle;
- m_currentByHash[(*handle).transaction.sha3()] = handle;
- --m_futureSize;
- ++ft;
- ++nonce;
- newCurrent = true;
- }
- fs->second.erase(fb, ft);
- if (fs->second.empty())
- m_future.erase(_t.from());
- }
- }
- while (m_futureSize > m_futureLimit)
- {
- // TODO: priority queue for future transactions
- // For now just drop random chain end
- --m_futureSize;
- clog(TransactionQueueTraceChannel) << "Dropping out of bounds future transaction" << m_future.begin()->second.rbegin()->second.transaction.sha3();
- m_future.begin()->second.erase(--m_future.begin()->second.end());
- if (m_future.begin()->second.empty())
- m_future.erase(m_future.begin());
- }
- if (newCurrent)
- m_onReady();
- }
- void TransactionQueue::drop(h256 const& _txHash)
- {
- UpgradableGuard l(m_lock);
- if (!m_known.count(_txHash))
- return;
- UpgradeGuard ul(l);
- m_dropped.insert(_txHash);
- remove_WITH_LOCK(_txHash);
- }
- void TransactionQueue::dropGood(Transaction const& _t)
- {
- WriteGuard l(m_lock);
- makeCurrent_WITH_LOCK(_t);
- if (!m_known.count(_t.sha3()))
- return;
- remove_WITH_LOCK(_t.sha3());
- }
- void TransactionQueue::clear()
- {
- WriteGuard l(m_lock);
- m_known.clear();
- m_current.clear();
- m_currentByAddressAndNonce.clear();
- m_currentByHash.clear();
- m_future.clear();
- m_futureSize = 0;
- }
- void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId)
- {
- bool queued = false;
- {
- Guard l(x_queue);
- unsigned itemCount = _data.itemCount();
- for (unsigned i = 0; i < itemCount; ++i)
- {
- if (m_unverified.size() >= c_maxVerificationQueueSize)
- {
- clog(TransactionQueueChannel) << "Transaction verification queue is full. Dropping" << itemCount - i << "transactions";
- break;
- }
- m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
- queued = true;
- }
- }
- if (queued)
- m_queueReady.notify_all();
- }
- void TransactionQueue::verifierBody()
- {
- while (!m_aborting)
- {
- UnverifiedTransaction work;
- {
- unique_lock<Mutex> l(x_queue);
- m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
- if (m_aborting)
- return;
- work = move(m_unverified.front());
- m_unverified.pop_front();
- }
- try
- {
- Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
- ImportResult ir = import(t);
- m_onImport(ir, t.sha3(), work.nodeId);
- }
- catch (...)
- {
- // should not happen as exceptions are handled in import.
- cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
- }
- }
- }
|