123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- /*
- This file is part of cpp-ethereum.
-
- cpp-ethereum is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- cpp-ethereum is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
- */
- /** @file RLPXFrameWriter.cpp
- * @author Alex Leverington <nessence@gmail.com>
- * @date 2015
- */
- #include "RLPXFrameWriter.h"
- using namespace std;
- using namespace dev;
- using namespace dev::p2p;
- const uint16_t RLPXFrameWriter::EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC
- const uint16_t RLPXFrameWriter::MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC
- void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority)
- {
- if (!_p.isValid())
- return;
- WriterState& qs = _priority ? m_q.first : m_q.second;
- DEV_GUARDED(qs.x)
- qs.q.push_back(move(_p));
- }
- void RLPXFrameWriter::enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority)
- {
- enque(RLPXPacket(m_protocolId, (RLPStream() << _packetType), _payload), _priority);
- }
- size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, deque<bytes>& o_toWrite)
- {
- static const size_t c_blockSize = h128::size;
- static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC
- if (_size < c_overhead + c_blockSize)
- return 0;
- size_t ret = 0;
- size_t frameLen = _size / 16 * 16;
- bytes payload(0);
- bool swapQueues = false;
- while (frameLen >= c_overhead + c_blockSize)
- {
- bool highPending = false;
- bool lowPending = false;
- DEV_GUARDED(m_q.first.x)
- highPending = !!m_q.first.q.size();
- DEV_GUARDED(m_q.second.x)
- lowPending = !!m_q.second.q.size();
- if (!highPending && !lowPending)
- return ret;
-
- // first run when !swapQueues, high > low, otherwise low > high
- bool high = highPending && !swapQueues ? true : !lowPending;
- WriterState &qs = high ? m_q.first : m_q.second;
- size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead;
- size_t offset = 0;
- size_t length = 0;
- while (frameAllot >= c_blockSize)
- {
- // Invariants:
- // !qs.writing && payload.empty() initial entry
- // !qs.writing && !payload.empty() continuation (multiple packets per frame)
- // qs.writing && payload.empty() initial entry, continuation (multiple frames per packet)
- // qs.writing && !payload.empty() INVALID
-
- // write packet-type
- if (qs.writing == nullptr)
- {
- {
- Guard l(qs.x);
- if (!qs.q.empty())
- qs.writing = &qs.q[0];
- else
- break;
- }
- // break here if we can't write-out packet-type
- // or payload is packed and next packet won't fit (implicit)
- qs.multiFrame = qs.writing->size() > frameAllot;
- assert(qs.writing->type().size());
- if (qs.writing->type().size() > frameAllot || (qs.multiFrame && !payload.empty()))
- {
- qs.writing = nullptr;
- qs.remaining = 0;
- qs.multiFrame = false;
- break;
- }
- else if (qs.multiFrame)
- qs.sequence = ++m_sequenceId;
-
- frameAllot -= qs.writing->type().size();
- payload += qs.writing->type();
-
- qs.remaining = qs.writing->data().size();
- }
-
- // write payload w/remaining allotment
- assert(qs.multiFrame || (!qs.multiFrame && frameAllot >= qs.remaining));
- if (frameAllot && qs.remaining)
- {
- offset = qs.writing->data().size() - qs.remaining;
- length = qs.remaining <= frameAllot ? qs.remaining : frameAllot;
- bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes();
- qs.remaining -= length;
- frameAllot -= portion.size();
- payload += portion;
- }
-
- assert((!qs.remaining && (offset > 0 || !qs.multiFrame)) || (qs.remaining && qs.multiFrame));
- if (!qs.remaining)
- {
- qs.writing = nullptr;
- DEV_GUARDED(qs.x)
- qs.q.pop_front();
- ret++;
- }
- // qs.writing is left alone for first frame of multi-frame packet
- if (qs.multiFrame)
- break;
- }
-
- if (!payload.empty())
- {
- if (qs.multiFrame)
- if (offset == 0 && qs.writing)
- // 1st frame of segmented packet writes total-size of packet
- _coder.writeFrame(m_protocolId, qs.sequence, qs.writing->size(), &payload, payload);
- else
- _coder.writeFrame(m_protocolId, qs.sequence, &payload, payload);
- else
- _coder.writeFrame(m_protocolId, &payload, payload);
- assert(frameLen >= payload.size());
- frameLen -= payload.size();
- o_toWrite.push_back(payload);
- payload.resize(0);
-
- if (!qs.remaining && qs.multiFrame)
- qs.multiFrame = false;
- }
- else if (swapQueues)
- break;
- swapQueues = true;
- }
- return ret;
- }
|