gfcp_fec.go 7.6 KB


  1. // Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
  2. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  3. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  4. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  5. //
  6. // All rights reserved.
  7. //
  8. // All use of this code is governed by the MIT license.
  9. // The complete license is available in the LICENSE file.
  10. package gfcp
  11. import (
  12. "encoding/binary"
  13. "sync/atomic"
  14. "github.com/klauspost/reedsolomon"
  15. )
  // FEC wire-format sizes and shard type flags.
  const (
  	// fecHeaderSize is the length in bytes of the FEC header: a 4-byte
  	// little-endian sequence id followed by a 2-byte little-endian type flag.
  	fecHeaderSize = 6

  	// fecHeaderSizePlus2 is the FEC header length plus two further bytes.
  	// NOTE(review): presumably the extra two bytes are the 16-bit size field
  	// that Encode writes at the start of the payload - confirm against callers.
  	fecHeaderSizePlus2 = fecHeaderSize + 2

  	// KTypeData is the flag value stamped on data shards by markData.
  	KTypeData = 0xF1

  	// KTypeParity is the flag value stamped on parity shards by markParity.
  	KTypeParity = 0xF2
  )
  // FecPacket is a raw FEC datagram viewed as bytes: a 4-byte little-endian
  // sequence id, a 2-byte little-endian type flag, then the payload.
  type FecPacket []byte

  // seqid returns the sequence id stored in the first four bytes.
  // It panics if the packet holds fewer than four bytes.
  func (p FecPacket) seqid() uint32 {
  	return binary.LittleEndian.Uint32(p)
  }

  // flag returns the 16-bit type flag stored at byte offset 4.
  // It panics if the packet holds fewer than six bytes.
  func (p FecPacket) flag() uint16 {
  	tail := p[4:]
  	return binary.LittleEndian.Uint16(tail)
  }

  // data returns everything after the 6-byte header. The result aliases the
  // packet's storage; no copy is made.
  func (p FecPacket) data() []byte {
  	return p[6:]
  }
  45. // FecDecoder ...
  46. type FecDecoder struct {
  47. rxlimit int
  48. dataShards int
  49. parityShards int
  50. shardSize int
  51. rx []FecPacket
  52. DecodeCache [][]byte
  53. flagCache []bool
  54. zeros []byte
  55. codec reedsolomon.Encoder
  56. }
  57. // NewFECDecoder ...
  58. func NewFECDecoder(
  59. rxlimit,
  60. dataShards,
  61. parityShards int,
  62. ) *FecDecoder {
  63. if dataShards <= 0 || parityShards <= 0 {
  64. return nil
  65. }
  66. if rxlimit < dataShards+parityShards {
  67. return nil
  68. }
  69. dec := new(
  70. FecDecoder,
  71. )
  72. dec.rxlimit = rxlimit
  73. dec.dataShards = dataShards
  74. dec.parityShards = parityShards
  75. dec.shardSize = dataShards + parityShards
  76. codec, err := reedsolomon.New(
  77. dataShards,
  78. parityShards,
  79. )
  80. if err != nil {
  81. return nil
  82. }
  83. dec.codec = codec
  84. dec.DecodeCache = make(
  85. [][]byte,
  86. dec.shardSize,
  87. )
  88. dec.flagCache = make(
  89. []bool,
  90. dec.shardSize,
  91. )
  92. dec.zeros = make(
  93. []byte,
  94. GFcpMtuLimit,
  95. )
  96. return dec
  97. }
  98. // Decode ...
  99. func (
  100. dec *FecDecoder,
  101. ) Decode(
  102. in FecPacket,
  103. ) (
  104. recovered [][]byte,
  105. ) {
  106. n := len(
  107. dec.rx,
  108. ) - 1
  109. insertIdx := 0
  110. for i := n; i >= 0; i-- {
  111. if in.seqid() == dec.rx[i].seqid() {
  112. return nil
  113. } else if _itimediff(
  114. in.seqid(),
  115. dec.rx[i].seqid(),
  116. ) > 0 {
  117. insertIdx = i + 1
  118. break
  119. }
  120. }
  121. // make a copy
  122. pkt := FecPacket(KxmitBuf.Get().([]byte)[:len(in)])
  123. copy(
  124. pkt,
  125. in,
  126. )
  127. if insertIdx == n+1 {
  128. dec.rx = append(
  129. dec.rx,
  130. pkt,
  131. )
  132. } else {
  133. dec.rx = append(
  134. dec.rx,
  135. FecPacket{},
  136. )
  137. copy(
  138. dec.rx[insertIdx+1:],
  139. dec.rx[insertIdx:],
  140. )
  141. dec.rx[insertIdx] = pkt
  142. }
  143. shardBegin := pkt.seqid() - pkt.seqid()%uint32(dec.shardSize)
  144. shardEnd := shardBegin + uint32(dec.shardSize) - 1
  145. searchBegin := insertIdx - int(pkt.seqid()%uint32(dec.shardSize))
  146. if searchBegin < 0 {
  147. searchBegin = 0
  148. }
  149. searchEnd := searchBegin + dec.shardSize - 1
  150. if searchEnd >= len(
  151. dec.rx,
  152. ) {
  153. searchEnd = len(
  154. dec.rx,
  155. ) - 1
  156. }
  157. if searchEnd-searchBegin+1 >= dec.dataShards {
  158. var numshard, numDataShard, first, maxlen int
  159. shards := dec.DecodeCache
  160. shardsflag := dec.flagCache
  161. for k := range dec.DecodeCache {
  162. shards[k] = nil
  163. shardsflag[k] = false
  164. }
  165. for i := searchBegin; i <= searchEnd; i++ {
  166. seqid := dec.rx[i].seqid()
  167. if _itimediff(
  168. seqid,
  169. shardEnd,
  170. ) > 0 {
  171. break
  172. } else if _itimediff(
  173. seqid,
  174. shardBegin,
  175. ) >= 0 {
  176. shards[seqid%uint32(
  177. dec.shardSize,
  178. )] = dec.rx[i].data()
  179. shardsflag[seqid%uint32(
  180. dec.shardSize,
  181. )] = true
  182. numshard++
  183. if dec.rx[i].flag() == KTypeData {
  184. numDataShard++
  185. }
  186. if numshard == 1 {
  187. first = i
  188. }
  189. if len(
  190. dec.rx[i].data(),
  191. ) > maxlen {
  192. maxlen = len(
  193. dec.rx[i].data(),
  194. )
  195. }
  196. }
  197. }
  198. if numDataShard == dec.dataShards {
  199. dec.rx = dec.freeRange(
  200. first,
  201. numshard,
  202. dec.rx,
  203. )
  204. } else if numshard >= dec.dataShards {
  205. for k := range shards {
  206. if shards[k] != nil {
  207. dlen := len(
  208. shards[k],
  209. )
  210. shards[k] = shards[k][:maxlen]
  211. copy(shards[k][dlen:], dec.zeros)
  212. } else if k < dec.dataShards {
  213. shards[k] = KxmitBuf.Get().([]byte)[:0]
  214. }
  215. }
  216. if err := dec.codec.ReconstructData(
  217. shards,
  218. ); err == nil {
  219. for k := range shards[:dec.dataShards] {
  220. if !shardsflag[k] {
  221. recovered = append(
  222. recovered,
  223. shards[k],
  224. )
  225. }
  226. }
  227. }
  228. dec.rx = dec.freeRange(
  229. first,
  230. numshard,
  231. dec.rx,
  232. )
  233. }
  234. }
  235. if len(dec.rx) > dec.rxlimit {
  236. if dec.rx[0].flag() == KTypeData {
  237. atomic.AddUint64(
  238. &DefaultSnsi.GFcpFECRuntShards,
  239. 1,
  240. )
  241. }
  242. dec.rx = dec.freeRange(
  243. 0,
  244. 1,
  245. dec.rx,
  246. )
  247. }
  248. return
  249. }
  250. func (
  251. dec *FecDecoder,
  252. ) freeRange(
  253. first,
  254. n int,
  255. q []FecPacket,
  256. ) []FecPacket {
  257. for i := first; i < first+n; i++ {
  258. // TODO(jhj): Switch to pointer to avoid allocation.
  259. KxmitBuf.Put(
  260. []byte(
  261. q[i],
  262. ),
  263. )
  264. }
  265. if first == 0 && n < (cap(q)/2) {
  266. return q[n:]
  267. }
  268. copy(
  269. q[first:],
  270. q[first+n:],
  271. )
  272. return q[:len(
  273. q,
  274. )-n]
  275. }
  276. type (
  277. // FecEncoder ...
  278. FecEncoder struct {
  279. dataShards int
  280. parityShards int
  281. shardSize int
  282. paws uint32 // Protect Against Wrapped Sequence numbers
  283. next uint32 // next seqid
  284. shardCount int // count the number of datashards collected
  285. maxSize int // track maximum data length in datashard
  286. headerOffset int // FEC header offset
  287. payloadOffset int // FEC payload offset
  288. shardCache [][]byte
  289. EncodeCache [][]byte
  290. zeros []byte
  291. codec reedsolomon.Encoder
  292. }
  293. )
  294. // NewFECEncoder ...
  295. func NewFECEncoder(
  296. dataShards,
  297. parityShards,
  298. offset int,
  299. ) *FecEncoder {
  300. if dataShards <= 0 || parityShards <= 0 {
  301. return nil
  302. }
  303. enc := new(
  304. FecEncoder,
  305. )
  306. enc.dataShards = dataShards
  307. enc.parityShards = parityShards
  308. enc.shardSize = dataShards + parityShards
  309. enc.paws = (0xFFFFFFFF/uint32(enc.shardSize) - 1) * uint32(enc.shardSize)
  310. enc.headerOffset = offset
  311. enc.payloadOffset = enc.headerOffset + fecHeaderSize
  312. codec, err := reedsolomon.New(
  313. dataShards,
  314. parityShards,
  315. )
  316. if err != nil {
  317. return nil
  318. }
  319. enc.codec = codec
  320. enc.EncodeCache = make(
  321. [][]byte,
  322. enc.shardSize,
  323. )
  324. enc.shardCache = make(
  325. [][]byte,
  326. enc.shardSize,
  327. )
  328. for k := range enc.shardCache {
  329. enc.shardCache[k] = make(
  330. []byte,
  331. GFcpMtuLimit,
  332. )
  333. }
  334. enc.zeros = make(
  335. []byte,
  336. GFcpMtuLimit,
  337. )
  338. return enc
  339. }
  340. // Encode ...
  341. func (
  342. enc *FecEncoder,
  343. ) Encode(
  344. b []byte,
  345. ) (
  346. ps [][]byte,
  347. ) {
  348. enc.markData(
  349. b[enc.headerOffset:],
  350. )
  351. binary.LittleEndian.PutUint16(
  352. b[enc.payloadOffset:],
  353. uint16(
  354. len(
  355. b[enc.payloadOffset:],
  356. ),
  357. ),
  358. )
  359. sz := len(
  360. b,
  361. )
  362. enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz]
  363. copy(
  364. enc.shardCache[enc.shardCount][enc.payloadOffset:],
  365. b[enc.payloadOffset:],
  366. )
  367. enc.shardCount++
  368. if sz > enc.maxSize {
  369. enc.maxSize = sz
  370. }
  371. if enc.shardCount == enc.dataShards {
  372. for i := 0; i < enc.dataShards; i++ {
  373. shard := enc.shardCache[i]
  374. slen := len(
  375. shard,
  376. )
  377. copy(
  378. shard[slen:enc.maxSize],
  379. enc.zeros,
  380. )
  381. }
  382. cache := enc.EncodeCache
  383. for k := range cache {
  384. cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize]
  385. }
  386. if err := enc.codec.Encode(
  387. cache,
  388. ); err == nil {
  389. ps = enc.shardCache[enc.dataShards:]
  390. for k := range ps {
  391. enc.markParity(
  392. ps[k][enc.headerOffset:],
  393. )
  394. ps[k] = ps[k][:enc.maxSize]
  395. }
  396. }
  397. enc.shardCount = 0
  398. enc.maxSize = 0
  399. }
  400. return
  401. }
  402. func (
  403. enc *FecEncoder,
  404. ) markData(
  405. data []byte,
  406. ) {
  407. binary.LittleEndian.PutUint32(
  408. data,
  409. enc.next,
  410. )
  411. binary.LittleEndian.PutUint16(
  412. data[4:],
  413. KTypeData,
  414. )
  415. enc.next++
  416. }
  417. func (
  418. enc *FecEncoder,
  419. ) markParity(
  420. data []byte,
  421. ) {
  422. binary.LittleEndian.PutUint32(
  423. data,
  424. enc.next,
  425. )
  426. binary.LittleEndian.PutUint16(
  427. data[4:],
  428. KTypeParity,
  429. )
  430. enc.next = (enc.next + 1) % enc.paws
  431. }