gfcp_fec.go 7.5 KB


  1. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  2. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  3. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  4. //
  5. // All rights reserved.
  6. //
  7. // All use of this code is governed by the MIT license.
  8. // The complete license is available in the LICENSE file.
  9. package gfcp // import "go.gridfinity.dev/gfcp"
  10. import (
  11. "encoding/binary"
  12. "sync/atomic"
  13. "github.com/klauspost/reedsolomon"
  14. )
  15. const (
  16. fecHeaderSize = 6
  17. fecHeaderSizePlus2 = fecHeaderSize + 2
  18. // KTypeData ...
  19. KTypeData = 0xf1
  20. // KTypeParity ...
  21. KTypeParity = 0xf2
  22. )
  23. // FecPacket ...
  24. type FecPacket []byte
  25. func (
  26. bts FecPacket,
  27. ) seqid() uint32 {
  28. return binary.LittleEndian.Uint32(
  29. bts,
  30. )
  31. }
  32. func (
  33. bts FecPacket,
  34. ) flag() uint16 {
  35. return binary.LittleEndian.Uint16(
  36. bts[4:],
  37. )
  38. }
  39. func (
  40. bts FecPacket,
  41. ) data() []byte {
  42. return bts[6:]
  43. }
  44. // FecDecoder ...
  45. type FecDecoder struct {
  46. rxlimit int
  47. dataShards int
  48. parityShards int
  49. shardSize int
  50. rx []FecPacket
  51. DecodeCache [][]byte
  52. flagCache []bool
  53. zeros []byte
  54. codec reedsolomon.Encoder
  55. }
  56. // NewFECDecoder ...
  57. func NewFECDecoder(
  58. rxlimit,
  59. dataShards,
  60. parityShards int,
  61. ) *FecDecoder {
  62. if dataShards <= 0 || parityShards <= 0 {
  63. return nil
  64. }
  65. if rxlimit < dataShards+parityShards {
  66. return nil
  67. }
  68. dec := new(
  69. FecDecoder,
  70. )
  71. dec.rxlimit = rxlimit
  72. dec.dataShards = dataShards
  73. dec.parityShards = parityShards
  74. dec.shardSize = dataShards + parityShards
  75. codec, err := reedsolomon.New(
  76. dataShards,
  77. parityShards,
  78. )
  79. if err != nil {
  80. return nil
  81. }
  82. dec.codec = codec
  83. dec.DecodeCache = make(
  84. [][]byte,
  85. dec.shardSize,
  86. )
  87. dec.flagCache = make(
  88. []bool,
  89. dec.shardSize,
  90. )
  91. dec.zeros = make(
  92. []byte,
  93. GFcpMtuLimit,
  94. )
  95. return dec
  96. }
  97. // Decode ...
  98. func (
  99. dec *FecDecoder,
  100. ) Decode(
  101. in FecPacket,
  102. ) (
  103. recovered [][]byte,
  104. ) {
  105. n := len(
  106. dec.rx,
  107. ) - 1
  108. insertIdx := 0
  109. for i := n; i >= 0; i-- {
  110. if in.seqid() == dec.rx[i].seqid() {
  111. return nil
  112. } else if _itimediff(
  113. in.seqid(),
  114. dec.rx[i].seqid(),
  115. ) > 0 {
  116. insertIdx = i + 1
  117. break
  118. }
  119. }
  120. // make a copy
  121. pkt := FecPacket(KxmitBuf.Get().([]byte)[:len(in)])
  122. copy(
  123. pkt,
  124. in,
  125. )
  126. if insertIdx == n+1 {
  127. dec.rx = append(
  128. dec.rx,
  129. pkt,
  130. )
  131. } else {
  132. dec.rx = append(
  133. dec.rx,
  134. FecPacket{},
  135. )
  136. copy(
  137. dec.rx[insertIdx+1:],
  138. dec.rx[insertIdx:],
  139. )
  140. dec.rx[insertIdx] = pkt
  141. }
  142. shardBegin := pkt.seqid() - pkt.seqid()%uint32(dec.shardSize)
  143. shardEnd := shardBegin + uint32(dec.shardSize) - 1
  144. searchBegin := insertIdx - int(pkt.seqid()%uint32(dec.shardSize))
  145. if searchBegin < 0 {
  146. searchBegin = 0
  147. }
  148. searchEnd := searchBegin + dec.shardSize - 1
  149. if searchEnd >= len(
  150. dec.rx,
  151. ) {
  152. searchEnd = len(
  153. dec.rx,
  154. ) - 1
  155. }
  156. if searchEnd-searchBegin+1 >= dec.dataShards {
  157. var numshard, numDataShard, first, maxlen int
  158. shards := dec.DecodeCache
  159. shardsflag := dec.flagCache
  160. for k := range dec.DecodeCache {
  161. shards[k] = nil
  162. shardsflag[k] = false
  163. }
  164. for i := searchBegin; i <= searchEnd; i++ {
  165. seqid := dec.rx[i].seqid()
  166. if _itimediff(
  167. seqid,
  168. shardEnd,
  169. ) > 0 {
  170. break
  171. } else if _itimediff(
  172. seqid,
  173. shardBegin,
  174. ) >= 0 {
  175. shards[seqid%uint32(
  176. dec.shardSize,
  177. )] = dec.rx[i].data()
  178. shardsflag[seqid%uint32(
  179. dec.shardSize,
  180. )] = true
  181. numshard++
  182. if dec.rx[i].flag() == KTypeData {
  183. numDataShard++
  184. }
  185. if numshard == 1 {
  186. first = i
  187. }
  188. if len(
  189. dec.rx[i].data(),
  190. ) > maxlen {
  191. maxlen = len(
  192. dec.rx[i].data(),
  193. )
  194. }
  195. }
  196. }
  197. if numDataShard == dec.dataShards {
  198. dec.rx = dec.freeRange(
  199. first,
  200. numshard,
  201. dec.rx,
  202. )
  203. } else if numshard >= dec.dataShards {
  204. for k := range shards {
  205. if shards[k] != nil {
  206. dlen := len(
  207. shards[k],
  208. )
  209. shards[k] = shards[k][:maxlen]
  210. copy(shards[k][dlen:], dec.zeros)
  211. } else {
  212. shards[k] = KxmitBuf.Get().([]byte)[:0]
  213. }
  214. }
  215. if err := dec.codec.ReconstructData(
  216. shards,
  217. ); err == nil {
  218. for k := range shards[:dec.dataShards] {
  219. if !shardsflag[k] {
  220. recovered = append(
  221. recovered,
  222. shards[k],
  223. )
  224. }
  225. }
  226. }
  227. dec.rx = dec.freeRange(
  228. first,
  229. numshard,
  230. dec.rx,
  231. )
  232. }
  233. }
  234. if len(dec.rx) > dec.rxlimit {
  235. if dec.rx[0].flag() == KTypeData {
  236. atomic.AddUint64(
  237. &DefaultSnsi.GFcpFECRuntShards,
  238. 1,
  239. )
  240. }
  241. dec.rx = dec.freeRange(
  242. 0,
  243. 1,
  244. dec.rx,
  245. )
  246. }
  247. return
  248. }
  249. func (
  250. dec *FecDecoder,
  251. ) freeRange(
  252. first,
  253. n int,
  254. q []FecPacket,
  255. ) []FecPacket {
  256. for i := first; i < first+n; i++ {
  257. // TODO(jhj): Switch to pointer to avoid allocation.
  258. KxmitBuf.Put(
  259. []byte(
  260. q[i],
  261. ),
  262. )
  263. }
  264. if first == 0 && n < (cap(q)/2) {
  265. return q[n:]
  266. }
  267. copy(
  268. q[first:],
  269. q[first+n:],
  270. )
  271. return q[:len(
  272. q,
  273. )-n]
  274. }
  275. type (
  276. // FecEncoder ...
  277. FecEncoder struct {
  278. dataShards int
  279. parityShards int
  280. shardSize int
  281. paws uint32 // Protect Against Wrapped Sequence numbers
  282. next uint32 // next seqid
  283. shardCount int // count the number of datashards collected
  284. maxSize int // track maximum data length in datashard
  285. headerOffset int // FEC header offset
  286. payloadOffset int // FEC payload offset
  287. shardCache [][]byte
  288. EncodeCache [][]byte
  289. zeros []byte
  290. codec reedsolomon.Encoder
  291. }
  292. )
  293. // NewFECEncoder ...
  294. func NewFECEncoder(
  295. dataShards,
  296. parityShards,
  297. offset int,
  298. ) *FecEncoder {
  299. if dataShards <= 0 || parityShards <= 0 {
  300. return nil
  301. }
  302. enc := new(
  303. FecEncoder,
  304. )
  305. enc.dataShards = dataShards
  306. enc.parityShards = parityShards
  307. enc.shardSize = dataShards + parityShards
  308. enc.paws = (0xFFFFFFFF/uint32(enc.shardSize) - 1) * uint32(enc.shardSize)
  309. enc.headerOffset = offset
  310. enc.payloadOffset = enc.headerOffset + fecHeaderSize
  311. codec, err := reedsolomon.New(
  312. dataShards,
  313. parityShards,
  314. )
  315. if err != nil {
  316. return nil
  317. }
  318. enc.codec = codec
  319. enc.EncodeCache = make(
  320. [][]byte,
  321. enc.shardSize,
  322. )
  323. enc.shardCache = make(
  324. [][]byte,
  325. enc.shardSize,
  326. )
  327. for k := range enc.shardCache {
  328. enc.shardCache[k] = make(
  329. []byte,
  330. GFcpMtuLimit,
  331. )
  332. }
  333. enc.zeros = make(
  334. []byte,
  335. GFcpMtuLimit,
  336. )
  337. return enc
  338. }
  339. // Encode ...
  340. func (
  341. enc *FecEncoder,
  342. ) Encode(
  343. b []byte,
  344. ) (
  345. ps [][]byte,
  346. ) {
  347. enc.markData(
  348. b[enc.headerOffset:],
  349. )
  350. binary.LittleEndian.PutUint16(
  351. b[enc.payloadOffset:],
  352. uint16(
  353. len(
  354. b[enc.payloadOffset:],
  355. ),
  356. ),
  357. )
  358. sz := len(
  359. b,
  360. )
  361. enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz]
  362. copy(
  363. enc.shardCache[enc.shardCount][enc.payloadOffset:],
  364. b[enc.payloadOffset:],
  365. )
  366. enc.shardCount++
  367. if sz > enc.maxSize {
  368. enc.maxSize = sz
  369. }
  370. if enc.shardCount == enc.dataShards {
  371. for i := 0; i < enc.dataShards; i++ {
  372. shard := enc.shardCache[i]
  373. slen := len(
  374. shard,
  375. )
  376. copy(
  377. shard[slen:enc.maxSize],
  378. enc.zeros,
  379. )
  380. }
  381. cache := enc.EncodeCache
  382. for k := range cache {
  383. cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize]
  384. }
  385. if err := enc.codec.Encode(
  386. cache,
  387. ); err == nil {
  388. ps = enc.shardCache[enc.dataShards:]
  389. for k := range ps {
  390. enc.markParity(
  391. ps[k][enc.headerOffset:],
  392. )
  393. ps[k] = ps[k][:enc.maxSize]
  394. }
  395. }
  396. enc.shardCount = 0
  397. enc.maxSize = 0
  398. }
  399. return
  400. }
  401. func (
  402. enc *FecEncoder,
  403. ) markData(
  404. data []byte,
  405. ) {
  406. binary.LittleEndian.PutUint32(
  407. data,
  408. enc.next,
  409. )
  410. binary.LittleEndian.PutUint16(
  411. data[4:],
  412. KTypeData,
  413. )
  414. enc.next++
  415. }
  416. func (
  417. enc *FecEncoder,
  418. ) markParity(
  419. data []byte,
  420. ) {
  421. binary.LittleEndian.PutUint32(
  422. data,
  423. enc.next,
  424. )
  425. binary.LittleEndian.PutUint16(
  426. data[4:],
  427. KTypeParity,
  428. )
  429. enc.next = (enc.next + 1) % enc.paws
  430. }