RLPXFrameWriter.cpp 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file RLPXFrameWriter.cpp
  15. * @author Alex Leverington <nessence@gmail.com>
  16. * @date 2015
  17. */
  18. #include "RLPXFrameWriter.h"
  19. using namespace std;
  20. using namespace dev;
  21. using namespace dev::p2p;
  22. const uint16_t RLPXFrameWriter::EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC
  23. const uint16_t RLPXFrameWriter::MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC
  24. void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority)
  25. {
  26. if (!_p.isValid())
  27. return;
  28. WriterState& qs = _priority ? m_q.first : m_q.second;
  29. DEV_GUARDED(qs.x)
  30. qs.q.push_back(move(_p));
  31. }
  32. void RLPXFrameWriter::enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority)
  33. {
  34. enque(RLPXPacket(m_protocolId, (RLPStream() << _packetType), _payload), _priority);
  35. }
  36. size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, deque<bytes>& o_toWrite)
  37. {
  38. static const size_t c_blockSize = h128::size;
  39. static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC
  40. if (_size < c_overhead + c_blockSize)
  41. return 0;
  42. size_t ret = 0;
  43. size_t frameLen = _size / 16 * 16;
  44. bytes payload(0);
  45. bool swapQueues = false;
  46. while (frameLen >= c_overhead + c_blockSize)
  47. {
  48. bool highPending = false;
  49. bool lowPending = false;
  50. DEV_GUARDED(m_q.first.x)
  51. highPending = !!m_q.first.q.size();
  52. DEV_GUARDED(m_q.second.x)
  53. lowPending = !!m_q.second.q.size();
  54. if (!highPending && !lowPending)
  55. return ret;
  56. // first run when !swapQueues, high > low, otherwise low > high
  57. bool high = highPending && !swapQueues ? true : !lowPending;
  58. WriterState &qs = high ? m_q.first : m_q.second;
  59. size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead;
  60. size_t offset = 0;
  61. size_t length = 0;
  62. while (frameAllot >= c_blockSize)
  63. {
  64. // Invariants:
  65. // !qs.writing && payload.empty() initial entry
  66. // !qs.writing && !payload.empty() continuation (multiple packets per frame)
  67. // qs.writing && payload.empty() initial entry, continuation (multiple frames per packet)
  68. // qs.writing && !payload.empty() INVALID
  69. // write packet-type
  70. if (qs.writing == nullptr)
  71. {
  72. {
  73. Guard l(qs.x);
  74. if (!qs.q.empty())
  75. qs.writing = &qs.q[0];
  76. else
  77. break;
  78. }
  79. // break here if we can't write-out packet-type
  80. // or payload is packed and next packet won't fit (implicit)
  81. qs.multiFrame = qs.writing->size() > frameAllot;
  82. assert(qs.writing->type().size());
  83. if (qs.writing->type().size() > frameAllot || (qs.multiFrame && !payload.empty()))
  84. {
  85. qs.writing = nullptr;
  86. qs.remaining = 0;
  87. qs.multiFrame = false;
  88. break;
  89. }
  90. else if (qs.multiFrame)
  91. qs.sequence = ++m_sequenceId;
  92. frameAllot -= qs.writing->type().size();
  93. payload += qs.writing->type();
  94. qs.remaining = qs.writing->data().size();
  95. }
  96. // write payload w/remaining allotment
  97. assert(qs.multiFrame || (!qs.multiFrame && frameAllot >= qs.remaining));
  98. if (frameAllot && qs.remaining)
  99. {
  100. offset = qs.writing->data().size() - qs.remaining;
  101. length = qs.remaining <= frameAllot ? qs.remaining : frameAllot;
  102. bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes();
  103. qs.remaining -= length;
  104. frameAllot -= portion.size();
  105. payload += portion;
  106. }
  107. assert((!qs.remaining && (offset > 0 || !qs.multiFrame)) || (qs.remaining && qs.multiFrame));
  108. if (!qs.remaining)
  109. {
  110. qs.writing = nullptr;
  111. DEV_GUARDED(qs.x)
  112. qs.q.pop_front();
  113. ret++;
  114. }
  115. // qs.writing is left alone for first frame of multi-frame packet
  116. if (qs.multiFrame)
  117. break;
  118. }
  119. if (!payload.empty())
  120. {
  121. if (qs.multiFrame)
  122. if (offset == 0 && qs.writing)
  123. // 1st frame of segmented packet writes total-size of packet
  124. _coder.writeFrame(m_protocolId, qs.sequence, qs.writing->size(), &payload, payload);
  125. else
  126. _coder.writeFrame(m_protocolId, qs.sequence, &payload, payload);
  127. else
  128. _coder.writeFrame(m_protocolId, &payload, payload);
  129. assert(frameLen >= payload.size());
  130. frameLen -= payload.size();
  131. o_toWrite.push_back(payload);
  132. payload.resize(0);
  133. if (!qs.remaining && qs.multiFrame)
  134. qs.multiFrame = false;
  135. }
  136. else if (swapQueues)
  137. break;
  138. swapQueues = true;
  139. }
  140. return ret;
  141. }