gfcp.go 25 KB


  1. // Package gfcp - A Fast and Reliable ARQ Protocol
  2. //
  3. // Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
  4. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  5. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  6. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  7. //
  8. // All rights reserved.
  9. //
  10. // All use of this code is governed by the MIT license.
  11. // The complete license is available in the LICENSE file.
  12. package gfcp
  13. import (
  14. "encoding/binary"
  15. "math"
  16. "runtime/debug"
  17. "sync/atomic"
  18. gfcpLegal "go4.org/legal"
  19. )
  20. // Gfcp protocol constants
  21. const (
  22. GfcpRtoNdl = 20 // GfcpRtoNdl: NoDelay min RTO
  23. GfcpRtoMin = 120 // GfcpRtoMin: Regular min RTO
  24. GfcpRtoDef = 340
  25. GfcpRtoMax = 60000
  26. GfcpCmdPush = 81 // GfcpCmdPush: Push data
  27. GfcpCmdAck = 82 // GfcpCmdAck: Ack
  28. GfcpCmdWask = 83 // GfcpCmdWask: Get Window Size
  29. GfcpCmdWins = 84 // GfcpCmdWins: Set window Size
  30. GfcpAskSend = 1 // GfcpAskSend: Need to send GfcpCmdWask
  31. GfcpAskTell = 2 // GfcpAskTell: Need to send GfcpCmdWins
  32. GfcpWndSnd = 32
  33. GfcpWndRcv = 32
  34. GfcpMtuDef = 1480
  35. GfcpAckFast = 3
  36. GfcpInterval = 100
  37. GfcpOverhead = 24
  38. GfcpDeadLink = 20
  39. GfcpThreshInit = 2
  40. GfcpThreshMin = 2
  41. GfcpProbeInit = 7000 // 7s initial probe window
  42. GfcpProbeLimit = 102000 // 120s hard probe timeout
  43. )
  44. type outputCallback func(
  45. buf []byte,
  46. size int,
  47. )
  48. func gfcpEncode8u(
  49. p []byte,
  50. c byte,
  51. ) []byte {
  52. p[0] = c
  53. return p[1:]
  54. }
  55. func gfcpDecode8u(
  56. p []byte,
  57. c *byte,
  58. ) []byte {
  59. *c = p[0]
  60. return p[1:]
  61. }
  62. func gfcpEncode16u(
  63. p []byte,
  64. w uint16,
  65. ) []byte {
  66. binary.LittleEndian.PutUint16(
  67. p,
  68. w,
  69. )
  70. return p[2:]
  71. }
  72. func gfcpDecode16u(
  73. p []byte,
  74. w *uint16,
  75. ) []byte {
  76. *w = binary.LittleEndian.Uint16(
  77. p,
  78. )
  79. return p[2:]
  80. }
  81. func gfcpEncode32u(
  82. p []byte,
  83. l uint32,
  84. ) []byte {
  85. binary.LittleEndian.PutUint32(
  86. p,
  87. l,
  88. )
  89. return p[4:]
  90. }
  91. func gfcpDecode32u(
  92. p []byte,
  93. l *uint32,
  94. ) []byte {
  95. *l = binary.LittleEndian.Uint32(
  96. p,
  97. )
  98. return p[4:]
  99. }
  100. func _imin(
  101. a,
  102. b uint32,
  103. ) uint32 {
  104. if a <= b {
  105. return a
  106. }
  107. return b
  108. }
  109. func _imax(
  110. a,
  111. b uint32,
  112. ) uint32 {
  113. if a >= b {
  114. return a
  115. }
  116. return b
  117. }
  118. func _ibound(
  119. lower,
  120. middle,
  121. upper uint32,
  122. ) uint32 {
  123. return _imin(
  124. _imax(
  125. lower,
  126. middle,
  127. ),
  128. upper,
  129. )
  130. }
  131. func _itimediff(
  132. later,
  133. earlier uint32,
  134. ) int32 {
  135. return (int32)(later - earlier)
  136. }
  137. // Segment structure
  138. type Segment struct {
  139. conv uint32
  140. cmd uint8
  141. frg uint8
  142. wnd uint16
  143. ts uint32
  144. sn uint32
  145. una uint32
  146. rto uint32
  147. Kxmit uint32
  148. GFcpResendTs uint32
  149. fastack uint32
  150. acked uint32
  151. data []byte
  152. }
  153. func (
  154. GFcpSeg *Segment,
  155. ) encode(
  156. ptr []byte,
  157. ) []byte {
  158. ptr = gfcpEncode32u(
  159. ptr,
  160. GFcpSeg.conv,
  161. )
  162. ptr = gfcpEncode8u(
  163. ptr,
  164. GFcpSeg.cmd,
  165. )
  166. ptr = gfcpEncode8u(
  167. ptr,
  168. GFcpSeg.frg,
  169. )
  170. ptr = gfcpEncode16u(
  171. ptr,
  172. GFcpSeg.wnd,
  173. )
  174. ptr = gfcpEncode32u(
  175. ptr,
  176. GFcpSeg.ts,
  177. )
  178. ptr = gfcpEncode32u(
  179. ptr,
  180. GFcpSeg.sn,
  181. )
  182. ptr = gfcpEncode32u(
  183. ptr,
  184. GFcpSeg.una,
  185. )
  186. ptr = gfcpEncode32u(
  187. ptr, uint32(len(
  188. GFcpSeg.data,
  189. )))
  190. atomic.AddUint64(
  191. &DefaultSnsi.GFcpOutputSegments,
  192. 1,
  193. )
  194. return ptr
  195. }
  196. // GFCP primary structure
  197. type GFCP struct {
  198. conv, mtu, mss, state uint32
  199. sndUna, sndNxt, rcvNxt uint32
  200. ssthresh uint32
  201. rxRttVar, rxSrtt int32
  202. rxRto, rxMinRto uint32
  203. sndWnd, rcvWnd, rmtWnd, cwnd, probe uint32
  204. interval, tsFlush uint32
  205. nodelay, updated uint32
  206. tsProbe, probeWait uint32
  207. deadLink, incr uint32
  208. fastresend int32
  209. nocwnd, stream int32
  210. sndQueue []Segment
  211. rcvQueue []Segment
  212. SndBuf []Segment
  213. rcvBuf []Segment
  214. acklist []ackItem
  215. buffer []byte
  216. reserved int
  217. output outputCallback
  218. }
  219. type ackItem struct {
  220. sn uint32
  221. ts uint32
  222. }
  223. // NewGFCP creates a new GFcp control object.
  224. func NewGFCP(
  225. conv uint32,
  226. output outputCallback,
  227. ) *GFCP {
  228. GFcp := new(
  229. GFCP,
  230. )
  231. GFcp.conv = conv
  232. GFcp.sndWnd = GfcpWndSnd
  233. GFcp.rcvWnd = GfcpWndRcv
  234. GFcp.rmtWnd = GfcpWndRcv
  235. GFcp.mtu = GfcpMtuDef
  236. GFcp.mss = GFcp.mtu - GfcpOverhead
  237. GFcp.buffer = make(
  238. []byte,
  239. GFcp.mtu,
  240. )
  241. GFcp.rxRto = GfcpRtoDef
  242. GFcp.rxMinRto = GfcpRtoMin
  243. GFcp.interval = GfcpInterval
  244. GFcp.tsFlush = GfcpInterval
  245. GFcp.ssthresh = GfcpThreshInit
  246. GFcp.deadLink = GfcpDeadLink
  247. GFcp.output = output
  248. return GFcp
  249. }
  250. func (
  251. GFcp *GFCP,
  252. ) newSegment(
  253. size int,
  254. ) (
  255. GFcpSeg Segment,
  256. ) {
  257. GFcpSeg.data = KxmitBuf.Get().([]byte)[:size]
  258. return
  259. }
  260. func (
  261. GFcp *GFCP,
  262. ) delSegment(
  263. GFcpSeg *Segment,
  264. ) {
  265. if GFcpSeg.data != nil {
  266. KxmitBuf.Put(
  267. // TODO(jhj): Switch to pointer to avoid allocation
  268. GFcpSeg.data,
  269. )
  270. GFcpSeg.data = nil
  271. }
  272. }
  273. // ReserveBytes keeps 'n' bytes from the beginning of buffering.
  274. // Output callbacks use this to return 'false' if 'n' >= 'mss'.
  275. func (
  276. GFcp *GFCP,
  277. ) ReserveBytes(
  278. n int,
  279. ) bool {
  280. if n >= int(
  281. GFcp.mtu-GfcpOverhead,
  282. ) || n < 0 {
  283. return false
  284. }
  285. GFcp.reserved = n
  286. GFcp.mss = GFcp.mtu - GfcpOverhead - uint32(
  287. n,
  288. )
  289. return true
  290. }
  291. // PeekSize checks the size of next message in the receive queue.
  292. func (
  293. GFcp *GFCP,
  294. ) PeekSize() (
  295. length int,
  296. ) {
  297. if len(
  298. GFcp.rcvQueue,
  299. ) == 0 {
  300. return -1
  301. }
  302. GFcpSeg := &GFcp.rcvQueue[0]
  303. if GFcpSeg.frg == 0 {
  304. return len(
  305. GFcpSeg.data,
  306. )
  307. }
  308. if len(
  309. GFcp.rcvQueue,
  310. ) < int(
  311. GFcpSeg.frg+1,
  312. ) {
  313. return -1
  314. }
  315. for k := range GFcp.rcvQueue {
  316. GFcpSeg := &GFcp.rcvQueue[k]
  317. length += len(
  318. GFcpSeg.data,
  319. )
  320. if GFcpSeg.frg == 0 {
  321. break
  322. }
  323. }
  324. return
  325. }
  326. // Recv is upper level recviver; returns size or EAGAIN on error.
  327. func (
  328. GFcp *GFCP,
  329. ) Recv(
  330. buffer []byte,
  331. ) (
  332. n int,
  333. ) {
  334. if len(
  335. GFcp.rcvQueue,
  336. ) == 0 {
  337. return -1
  338. }
  339. peeksize := GFcp.PeekSize()
  340. if peeksize < 0 {
  341. return -2
  342. }
  343. if peeksize > len(
  344. buffer,
  345. ) {
  346. return -3
  347. }
  348. var fastRecovery bool
  349. if len(
  350. GFcp.rcvQueue,
  351. ) >= int(
  352. GFcp.rcvWnd,
  353. ) {
  354. fastRecovery = true
  355. }
  356. count := 0
  357. for k := range GFcp.rcvQueue {
  358. GFcpSeg := &GFcp.rcvQueue[k]
  359. copy(
  360. buffer,
  361. GFcpSeg.data,
  362. )
  363. buffer = buffer[len(
  364. GFcpSeg.data,
  365. ):]
  366. n += len(
  367. GFcpSeg.data,
  368. )
  369. count++
  370. GFcp.delSegment(
  371. GFcpSeg,
  372. )
  373. if GFcpSeg.frg == 0 {
  374. break
  375. }
  376. }
  377. if count > 0 {
  378. GFcp.rcvQueue = GFcp.removeFront(
  379. GFcp.rcvQueue,
  380. count,
  381. )
  382. }
  383. count = 0
  384. for k := range GFcp.rcvBuf {
  385. GFcpSeg := &GFcp.rcvBuf[k]
  386. if GFcpSeg.sn == GFcp.rcvNxt && len(
  387. GFcp.rcvQueue,
  388. ) < int(
  389. GFcp.rcvWnd,
  390. ) {
  391. GFcp.rcvNxt++
  392. count++
  393. } else {
  394. break
  395. }
  396. }
  397. if count > 0 {
  398. GFcp.rcvQueue = append(
  399. GFcp.rcvQueue,
  400. GFcp.rcvBuf[:count]...,
  401. )
  402. GFcp.rcvBuf = GFcp.removeFront(
  403. GFcp.rcvBuf,
  404. count,
  405. )
  406. }
  407. if len(
  408. GFcp.rcvQueue,
  409. ) < int(
  410. GFcp.rcvWnd,
  411. ) && fastRecovery {
  412. GFcp.probe |= GfcpAskTell
  413. }
  414. return
  415. }
  416. // Send is upper level sender, returns <0 on error.
  417. func (
  418. GFcp *GFCP,
  419. ) Send(
  420. buffer []byte,
  421. ) int {
  422. var count int
  423. if len(
  424. buffer,
  425. ) == 0 {
  426. return -1
  427. }
  428. if GFcp.stream != 0 {
  429. n := len(
  430. GFcp.sndQueue,
  431. )
  432. if n > 0 {
  433. GFcpSeg := &GFcp.sndQueue[n-1]
  434. if len(
  435. GFcpSeg.data,
  436. ) < int(
  437. GFcp.mss,
  438. ) {
  439. capacity := int(
  440. GFcp.mss,
  441. ) - len(
  442. GFcpSeg.data,
  443. )
  444. extend := capacity
  445. if len(
  446. buffer,
  447. ) < capacity {
  448. extend = len(
  449. buffer,
  450. )
  451. }
  452. oldlen := len(
  453. GFcpSeg.data,
  454. )
  455. GFcpSeg.data = GFcpSeg.data[:oldlen+extend]
  456. copy(
  457. GFcpSeg.data[oldlen:],
  458. buffer,
  459. )
  460. buffer = buffer[extend:]
  461. }
  462. }
  463. if len(
  464. buffer,
  465. ) == 0 {
  466. return 0
  467. }
  468. }
  469. if len(
  470. buffer,
  471. ) <= int(
  472. GFcp.mss,
  473. ) {
  474. count = 1
  475. } else {
  476. count = (len(
  477. buffer,
  478. ) + int(
  479. GFcp.mss,
  480. ) - 1) / int(
  481. GFcp.mss,
  482. )
  483. }
  484. if count > 255 {
  485. return -2
  486. }
  487. if count == 0 {
  488. count = 1
  489. }
  490. for i := 0; i < count; i++ {
  491. var size int
  492. if len(
  493. buffer,
  494. ) > int(
  495. GFcp.mss,
  496. ) {
  497. size = int(
  498. GFcp.mss,
  499. )
  500. } else {
  501. size = len(
  502. buffer,
  503. )
  504. }
  505. GFcpSeg := GFcp.newSegment(
  506. size,
  507. )
  508. copy(
  509. GFcpSeg.data,
  510. buffer[:size],
  511. )
  512. if GFcp.stream == 0 {
  513. GFcpSeg.frg = uint8(
  514. count - i - 1,
  515. )
  516. } else {
  517. GFcpSeg.frg = 0
  518. }
  519. GFcp.sndQueue = append(
  520. GFcp.sndQueue,
  521. GFcpSeg,
  522. )
  523. buffer = buffer[size:]
  524. }
  525. return 0
  526. }
  527. func (
  528. GFcp *GFCP,
  529. ) updateAck(
  530. rtt int32,
  531. ) {
  532. var rto uint32
  533. if GFcp.rxSrtt == 0 {
  534. GFcp.rxSrtt = rtt
  535. GFcp.rxRttVar = rtt >> 1
  536. } else {
  537. delta := rtt - GFcp.rxSrtt
  538. GFcp.rxSrtt += delta >> 3
  539. if delta < 0 {
  540. delta = -delta
  541. }
  542. if rtt < GFcp.rxSrtt-GFcp.rxRttVar {
  543. GFcp.rxRttVar += (delta - GFcp.rxRttVar) >> 5
  544. } else {
  545. GFcp.rxRttVar += (delta - GFcp.rxRttVar) >> 2
  546. }
  547. }
  548. rto = uint32(
  549. GFcp.rxSrtt,
  550. ) + _imax(
  551. GFcp.interval,
  552. uint32(
  553. GFcp.rxRttVar,
  554. )<<2)
  555. GFcp.rxRto = _ibound(
  556. GFcp.rxMinRto,
  557. rto,
  558. GfcpRtoMax,
  559. )
  560. }
  561. func (
  562. GFcp *GFCP,
  563. ) shrinkBuf() {
  564. if len(
  565. GFcp.SndBuf,
  566. ) > 0 {
  567. GFcpSeg := &GFcp.SndBuf[0]
  568. GFcp.sndUna = GFcpSeg.sn
  569. } else {
  570. GFcp.sndUna = GFcp.sndNxt
  571. }
  572. }
  573. func (
  574. GFcp *GFCP,
  575. ) parseAck(
  576. sn uint32,
  577. ) {
  578. if _itimediff(
  579. sn,
  580. GFcp.sndUna,
  581. ) < 0 || _itimediff(
  582. sn,
  583. GFcp.sndNxt,
  584. ) >= 0 {
  585. return
  586. }
  587. for k := range GFcp.SndBuf {
  588. GFcpSeg := &GFcp.SndBuf[k]
  589. if sn == GFcpSeg.sn {
  590. GFcpSeg.acked = 1
  591. GFcp.delSegment(
  592. GFcpSeg,
  593. )
  594. break
  595. }
  596. if _itimediff(
  597. sn,
  598. GFcpSeg.sn,
  599. ) < 0 {
  600. break
  601. }
  602. }
  603. }
  604. func (
  605. GFcp *GFCP,
  606. ) parseFastack(
  607. sn, ts uint32,
  608. ) {
  609. if _itimediff(
  610. sn,
  611. GFcp.sndUna,
  612. ) < 0 || _itimediff(
  613. sn,
  614. GFcp.sndNxt,
  615. ) >= 0 {
  616. return
  617. }
  618. for k := range GFcp.SndBuf {
  619. GFcpSeg := &GFcp.SndBuf[k]
  620. if _itimediff(
  621. sn,
  622. GFcpSeg.sn,
  623. ) < 0 {
  624. break
  625. } else if sn != GFcpSeg.sn && _itimediff(
  626. GFcpSeg.ts,
  627. ts,
  628. ) <= 0 {
  629. GFcpSeg.fastack++
  630. }
  631. }
  632. }
  633. func (
  634. GFcp *GFCP,
  635. ) parseUna(
  636. una uint32,
  637. ) {
  638. count := 0
  639. for k := range GFcp.SndBuf {
  640. GFcpSeg := &GFcp.SndBuf[k]
  641. if _itimediff(
  642. una,
  643. GFcpSeg.sn,
  644. ) > 0 {
  645. GFcp.delSegment(
  646. GFcpSeg,
  647. )
  648. count++
  649. } else {
  650. break
  651. }
  652. }
  653. if count > 0 {
  654. GFcp.SndBuf = GFcp.removeFront(
  655. GFcp.SndBuf,
  656. count,
  657. )
  658. }
  659. }
  660. func (
  661. GFcp *GFCP,
  662. ) ackPush(
  663. sn,
  664. ts uint32,
  665. ) {
  666. GFcp.acklist = append(
  667. GFcp.acklist,
  668. ackItem{
  669. sn,
  670. ts,
  671. })
  672. }
  673. func (
  674. GFcp *GFCP,
  675. ) parseData(
  676. newGFcpSeg Segment,
  677. ) bool {
  678. sn := newGFcpSeg.sn
  679. if _itimediff(
  680. sn,
  681. GFcp.rcvNxt+GFcp.rcvWnd,
  682. ) >= 0 ||
  683. _itimediff(
  684. sn,
  685. GFcp.rcvNxt,
  686. ) < 0 {
  687. return true
  688. }
  689. n := len(
  690. GFcp.rcvBuf,
  691. ) - 1
  692. insertIdx := 0
  693. repeat := false
  694. for i := n; i >= 0; i-- {
  695. GFcpSeg := &GFcp.rcvBuf[i]
  696. if GFcpSeg.sn == sn {
  697. repeat = true
  698. break
  699. }
  700. if _itimediff(
  701. sn,
  702. GFcpSeg.sn,
  703. ) > 0 {
  704. insertIdx = i + 1
  705. break
  706. }
  707. }
  708. if !repeat {
  709. dataCopy := KxmitBuf.Get().([]byte)[:len(newGFcpSeg.data)]
  710. copy(
  711. dataCopy,
  712. newGFcpSeg.data,
  713. )
  714. newGFcpSeg.data = dataCopy
  715. if insertIdx == n+1 {
  716. GFcp.rcvBuf = append(
  717. GFcp.rcvBuf,
  718. newGFcpSeg,
  719. )
  720. } else {
  721. GFcp.rcvBuf = append(
  722. GFcp.rcvBuf,
  723. Segment{},
  724. )
  725. copy(
  726. GFcp.rcvBuf[insertIdx+1:],
  727. GFcp.rcvBuf[insertIdx:],
  728. )
  729. GFcp.rcvBuf[insertIdx] = newGFcpSeg
  730. }
  731. }
  732. count := 0
  733. for k := range GFcp.rcvBuf {
  734. GFcpSeg := &GFcp.rcvBuf[k]
  735. if GFcpSeg.sn == GFcp.rcvNxt && len(
  736. GFcp.rcvQueue,
  737. ) < int(
  738. GFcp.rcvWnd,
  739. ) {
  740. GFcp.rcvNxt++
  741. count++
  742. } else {
  743. break
  744. }
  745. }
  746. if count > 0 {
  747. GFcp.rcvQueue = append(
  748. GFcp.rcvQueue,
  749. GFcp.rcvBuf[:count]...,
  750. )
  751. GFcp.rcvBuf = GFcp.removeFront(
  752. GFcp.rcvBuf,
  753. count,
  754. )
  755. }
  756. return repeat
  757. }
  758. // Input receives a (low-level) UDP packet, and determinines if
  759. // a full packet has been processsed (not by the FEC algorithm)
  760. func (
  761. GFcp *GFCP,
  762. ) Input(
  763. data []byte,
  764. regular,
  765. ackNoDelay bool,
  766. ) int {
  767. sndUna := GFcp.sndUna
  768. if len(
  769. data,
  770. ) < GfcpOverhead {
  771. return -1
  772. }
  773. var latest uint32
  774. var flag int
  775. var inSegs uint64
  776. for {
  777. var ts,
  778. sn,
  779. length,
  780. una,
  781. conv uint32
  782. var wnd uint16
  783. var cmd,
  784. frg uint8
  785. if len(
  786. data,
  787. ) < int(
  788. GfcpOverhead,
  789. ) {
  790. break
  791. }
  792. data = gfcpDecode32u(
  793. data,
  794. &conv,
  795. )
  796. if conv != GFcp.conv {
  797. return -1
  798. }
  799. data = gfcpDecode8u(
  800. data,
  801. &cmd,
  802. )
  803. data = gfcpDecode8u(
  804. data,
  805. &frg,
  806. )
  807. data = gfcpDecode16u(
  808. data,
  809. &wnd,
  810. )
  811. data = gfcpDecode32u(
  812. data,
  813. &ts,
  814. )
  815. data = gfcpDecode32u(
  816. data,
  817. &sn,
  818. )
  819. data = gfcpDecode32u(
  820. data,
  821. &una,
  822. )
  823. data = gfcpDecode32u(
  824. data,
  825. &length,
  826. )
  827. if len(
  828. data,
  829. ) < int(
  830. length,
  831. ) {
  832. return -2
  833. }
  834. if cmd != GfcpCmdPush && cmd != GfcpCmdAck &&
  835. cmd != GfcpCmdWask && cmd != GfcpCmdWins {
  836. return -3
  837. }
  838. if regular {
  839. GFcp.rmtWnd = uint32(
  840. wnd,
  841. )
  842. }
  843. GFcp.parseUna(
  844. una,
  845. )
  846. GFcp.shrinkBuf()
  847. if cmd == GfcpCmdAck {
  848. GFcp.parseAck(
  849. sn,
  850. )
  851. GFcp.parseFastack(
  852. sn,
  853. ts,
  854. )
  855. flag |= 1
  856. latest = ts
  857. } else if cmd == GfcpCmdPush {
  858. repeat := true
  859. if _itimediff(
  860. sn,
  861. GFcp.rcvNxt+GFcp.rcvWnd,
  862. ) < 0 {
  863. GFcp.ackPush(
  864. sn,
  865. ts,
  866. )
  867. if _itimediff(
  868. sn,
  869. GFcp.rcvNxt,
  870. ) >= 0 {
  871. var GFcpSeg Segment
  872. GFcpSeg.conv = conv
  873. GFcpSeg.cmd = cmd
  874. GFcpSeg.frg = frg
  875. GFcpSeg.wnd = wnd
  876. GFcpSeg.ts = ts
  877. GFcpSeg.sn = sn
  878. GFcpSeg.una = una
  879. GFcpSeg.data = data[:length]
  880. repeat = GFcp.parseData(
  881. GFcpSeg,
  882. )
  883. }
  884. }
  885. if regular && repeat {
  886. atomic.AddUint64(
  887. &DefaultSnsi.GFcpDupSegments,
  888. 1,
  889. )
  890. }
  891. } else if cmd == GfcpCmdWask {
  892. GFcp.probe |= GfcpAskTell
  893. //} else if cmd == GfcpCmdWins {
  894. // XXX(jhj) ??? FUCK YOU CHINKS
  895. } else {
  896. return -3
  897. }
  898. inSegs++
  899. data = data[length:]
  900. }
  901. atomic.AddUint64(
  902. &DefaultSnsi.GFcpInputSegments,
  903. inSegs,
  904. )
  905. if flag != 0 && regular {
  906. current := CurrentMs()
  907. if _itimediff(
  908. current,
  909. latest,
  910. ) >= 0 {
  911. GFcp.updateAck(
  912. _itimediff(
  913. current,
  914. latest,
  915. ),
  916. )
  917. }
  918. }
  919. if GFcp.nocwnd == 0 {
  920. if _itimediff(
  921. GFcp.sndUna,
  922. sndUna,
  923. ) > 0 {
  924. if GFcp.cwnd < GFcp.rmtWnd {
  925. mss := GFcp.mss
  926. if GFcp.cwnd < GFcp.ssthresh {
  927. GFcp.cwnd++
  928. GFcp.incr += mss
  929. } else {
  930. if GFcp.incr < mss {
  931. GFcp.incr = mss
  932. }
  933. GFcp.incr += (mss*mss)/GFcp.incr + (mss / 16)
  934. if (GFcp.cwnd+1)*mss <= GFcp.incr {
  935. GFcp.cwnd++
  936. }
  937. }
  938. if GFcp.cwnd > GFcp.rmtWnd {
  939. GFcp.cwnd = GFcp.rmtWnd
  940. GFcp.incr = GFcp.rmtWnd * mss
  941. }
  942. }
  943. }
  944. }
  945. if ackNoDelay && len(
  946. GFcp.acklist,
  947. ) > 0 {
  948. GFcp.Flush(
  949. true,
  950. )
  951. }
  952. return 0
  953. }
  954. func (
  955. GFcp *GFCP,
  956. ) wndUnused() uint16 {
  957. if len(
  958. GFcp.rcvQueue,
  959. ) < int(GFcp.rcvWnd) {
  960. return uint16(
  961. int(
  962. GFcp.rcvWnd,
  963. ) - len(
  964. GFcp.rcvQueue,
  965. ),
  966. )
  967. }
  968. return 0
  969. }
  970. // Flush ...
  971. func (
  972. GFcp *GFCP,
  973. ) Flush(
  974. ackOnly bool,
  975. ) uint32 {
  976. var GFcpSeg Segment
  977. GFcpSeg.conv = GFcp.conv
  978. GFcpSeg.cmd = GfcpCmdAck
  979. GFcpSeg.wnd = GFcp.wndUnused()
  980. GFcpSeg.una = GFcp.rcvNxt
  981. buffer := GFcp.buffer
  982. ptr := buffer[GFcp.reserved:]
  983. makeSpace := func(
  984. space int,
  985. ) {
  986. size := len(
  987. buffer,
  988. ) - len(
  989. ptr,
  990. )
  991. if size+space > int(
  992. GFcp.mtu,
  993. ) {
  994. GFcp.output(
  995. buffer,
  996. size,
  997. )
  998. ptr = buffer[GFcp.reserved:]
  999. }
  1000. }
  1001. FlushBuffer := func() {
  1002. size := len(
  1003. buffer,
  1004. ) - len(
  1005. ptr,
  1006. )
  1007. if size > GFcp.reserved {
  1008. GFcp.output(
  1009. buffer,
  1010. size,
  1011. )
  1012. }
  1013. }
  1014. for i, ack := range GFcp.acklist {
  1015. makeSpace(
  1016. GfcpOverhead,
  1017. )
  1018. if ack.sn >= GFcp.rcvNxt || len(
  1019. GFcp.acklist,
  1020. )-1 == i {
  1021. GFcpSeg.sn,
  1022. GFcpSeg.ts = ack.sn,
  1023. ack.ts
  1024. ptr = GFcpSeg.encode(
  1025. ptr,
  1026. )
  1027. }
  1028. }
  1029. GFcp.acklist = GFcp.acklist[0:0]
  1030. if ackOnly {
  1031. FlushBuffer()
  1032. return GFcp.interval
  1033. }
  1034. if GFcp.rmtWnd == 0 {
  1035. current := CurrentMs()
  1036. if GFcp.probeWait == 0 {
  1037. GFcp.probeWait = GfcpProbeInit
  1038. GFcp.tsProbe = current + GFcp.probeWait
  1039. } else if _itimediff(
  1040. current,
  1041. GFcp.tsProbe,
  1042. ) >= 0 {
  1043. if GFcp.probeWait < GfcpProbeInit {
  1044. GFcp.probeWait = GfcpProbeInit
  1045. }
  1046. GFcp.probeWait += GFcp.probeWait / 2
  1047. if GFcp.probeWait > GfcpProbeLimit {
  1048. GFcp.probeWait = GfcpProbeLimit
  1049. }
  1050. GFcp.tsProbe = current + GFcp.probeWait
  1051. GFcp.probe |= GfcpAskSend
  1052. }
  1053. }
  1054. GFcp.tsProbe = 0
  1055. GFcp.probeWait = 0
  1056. if (GFcp.probe & GfcpAskSend) != 0 {
  1057. GFcpSeg.cmd = GfcpCmdWask
  1058. makeSpace(
  1059. GfcpOverhead,
  1060. )
  1061. ptr = GFcpSeg.encode(
  1062. ptr,
  1063. )
  1064. }
  1065. if (GFcp.probe & GfcpAskTell) != 0 {
  1066. GFcpSeg.cmd = GfcpCmdWins
  1067. makeSpace(
  1068. GfcpOverhead,
  1069. )
  1070. ptr = GFcpSeg.encode(
  1071. ptr,
  1072. )
  1073. }
  1074. GFcp.probe = 0
  1075. cwnd := _imin(
  1076. GFcp.sndWnd,
  1077. GFcp.rmtWnd,
  1078. )
  1079. if GFcp.nocwnd == 0 {
  1080. cwnd = _imin(
  1081. GFcp.cwnd,
  1082. cwnd,
  1083. )
  1084. }
  1085. newSegsCount := 0
  1086. for k := range GFcp.sndQueue {
  1087. if _itimediff(
  1088. GFcp.sndNxt,
  1089. GFcp.sndUna+cwnd,
  1090. ) >= 0 {
  1091. break
  1092. }
  1093. newGFcpSeg := GFcp.sndQueue[k]
  1094. newGFcpSeg.conv = GFcp.conv
  1095. newGFcpSeg.cmd = GfcpCmdPush
  1096. newGFcpSeg.sn = GFcp.sndNxt
  1097. GFcp.SndBuf = append(
  1098. GFcp.SndBuf,
  1099. newGFcpSeg,
  1100. )
  1101. GFcp.sndNxt++
  1102. newSegsCount++
  1103. }
  1104. if newSegsCount > 0 {
  1105. GFcp.sndQueue = GFcp.removeFront(
  1106. GFcp.sndQueue,
  1107. newSegsCount,
  1108. )
  1109. }
  1110. resent := uint32(
  1111. GFcp.fastresend,
  1112. )
  1113. if GFcp.fastresend <= 0 {
  1114. resent = 0xFFFFFFFF
  1115. }
  1116. current := CurrentMs()
  1117. var change,
  1118. lostSegs,
  1119. fastGFcpRestransmittedSegments,
  1120. earlyGFcpRestransmittedSegments uint64
  1121. minrto := int32(
  1122. GFcp.interval,
  1123. )
  1124. ref := GFcp.SndBuf[:len(
  1125. GFcp.SndBuf,
  1126. )]
  1127. for k := range ref {
  1128. Segment := &ref[k]
  1129. needsend := false
  1130. if Segment.acked == 1 {
  1131. continue
  1132. }
  1133. if Segment.Kxmit == 0 {
  1134. needsend = true
  1135. Segment.rto = GFcp.rxRto
  1136. Segment.GFcpResendTs = current + Segment.rto
  1137. } else if _itimediff(
  1138. current,
  1139. Segment.GFcpResendTs,
  1140. ) >= 0 {
  1141. needsend = true
  1142. if GFcp.nodelay == 0 {
  1143. Segment.rto += GFcp.rxRto
  1144. } else {
  1145. Segment.rto += GFcp.rxRto / 2
  1146. }
  1147. Segment.GFcpResendTs = current + Segment.rto
  1148. lostSegs++
  1149. } else if Segment.fastack >= resent {
  1150. needsend = true
  1151. Segment.fastack = 0
  1152. Segment.rto = GFcp.rxRto
  1153. Segment.GFcpResendTs = current + Segment.rto
  1154. change++
  1155. fastGFcpRestransmittedSegments++
  1156. } else if Segment.fastack > 0 && newSegsCount == 0 {
  1157. needsend = true
  1158. Segment.fastack = 0
  1159. Segment.rto = GFcp.rxRto
  1160. Segment.GFcpResendTs = current + Segment.rto
  1161. change++
  1162. earlyGFcpRestransmittedSegments++
  1163. }
  1164. if needsend {
  1165. current = CurrentMs()
  1166. Segment.Kxmit++
  1167. Segment.ts = current
  1168. Segment.wnd = GFcpSeg.wnd
  1169. Segment.una = GFcpSeg.una
  1170. need := GfcpOverhead + len(
  1171. Segment.data,
  1172. )
  1173. makeSpace(
  1174. need,
  1175. )
  1176. ptr = Segment.encode(
  1177. ptr,
  1178. )
  1179. copy(
  1180. ptr,
  1181. Segment.data,
  1182. )
  1183. ptr = ptr[len(
  1184. Segment.data,
  1185. ):]
  1186. if Segment.Kxmit >= GFcp.deadLink {
  1187. GFcp.state = 0xFFFFFFFF
  1188. }
  1189. }
  1190. if rto := _itimediff(
  1191. Segment.GFcpResendTs,
  1192. current,
  1193. ); rto > 0 && rto < minrto {
  1194. minrto = rto
  1195. }
  1196. }
  1197. FlushBuffer()
  1198. sum := lostSegs
  1199. if lostSegs > 0 {
  1200. atomic.AddUint64(
  1201. &DefaultSnsi.GFcpLostSegments,
  1202. lostSegs,
  1203. )
  1204. }
  1205. if fastGFcpRestransmittedSegments > 0 {
  1206. atomic.AddUint64(
  1207. &DefaultSnsi.FastGFcpRestransmittedSegments,
  1208. fastGFcpRestransmittedSegments,
  1209. )
  1210. sum += fastGFcpRestransmittedSegments
  1211. }
  1212. if earlyGFcpRestransmittedSegments > 0 {
  1213. atomic.AddUint64(
  1214. &DefaultSnsi.EarlyGFcpRestransmittedSegments,
  1215. earlyGFcpRestransmittedSegments,
  1216. )
  1217. sum += earlyGFcpRestransmittedSegments
  1218. }
  1219. if sum > 0 {
  1220. atomic.AddUint64(
  1221. &DefaultSnsi.GFcpRestransmittedSegments,
  1222. sum,
  1223. )
  1224. }
  1225. if GFcp.nocwnd == 0 {
  1226. if change > 0 {
  1227. inflight := GFcp.sndNxt - GFcp.sndUna
  1228. GFcp.ssthresh = inflight / 2
  1229. if GFcp.ssthresh < GfcpThreshMin {
  1230. GFcp.ssthresh = GfcpThreshMin
  1231. }
  1232. GFcp.cwnd = GFcp.ssthresh + resent
  1233. GFcp.incr = GFcp.cwnd * GFcp.mss
  1234. }
  1235. if lostSegs > 0 {
  1236. GFcp.ssthresh = cwnd / 2
  1237. if GFcp.ssthresh < GfcpThreshMin {
  1238. GFcp.ssthresh = GfcpThreshMin
  1239. }
  1240. GFcp.cwnd = 1
  1241. GFcp.incr = GFcp.mss
  1242. }
  1243. if GFcp.cwnd < 1 {
  1244. GFcp.cwnd = 1
  1245. GFcp.incr = GFcp.mss
  1246. }
  1247. }
  1248. return uint32(
  1249. minrto,
  1250. )
  1251. }
  1252. // Update is called repeatedly, 10ms to 100ms, queried via gfcp_check
  1253. // without gfcp_input or _send executing, returning timestamp in ms.
  1254. func (
  1255. GFcp *GFCP,
  1256. ) Update() {
  1257. var slap int32
  1258. current := CurrentMs()
  1259. if GFcp.updated == 0 {
  1260. GFcp.updated = 1
  1261. GFcp.tsFlush = current
  1262. }
  1263. slap = _itimediff(
  1264. current,
  1265. GFcp.tsFlush,
  1266. )
  1267. if slap >= 10000 || slap < -10000 {
  1268. GFcp.tsFlush = current
  1269. slap = 0
  1270. }
  1271. if slap >= 0 {
  1272. GFcp.tsFlush += GFcp.interval
  1273. if _itimediff(
  1274. current,
  1275. GFcp.tsFlush,
  1276. ) >= 0 {
  1277. GFcp.tsFlush = current + GFcp.interval
  1278. }
  1279. GFcp.Flush(
  1280. false,
  1281. )
  1282. }
  1283. }
  1284. // Check function helps determine when to invoke an gfcp_update.
  1285. // It returns when you should invoke gfcp_update, in milliseconds,
  1286. // if there is no gfcp_input or _send calling. You may repeatdly
  1287. // call gfcp_update instead of update, to reduce most unnacessary
  1288. // gfcp_update invocations. This function may be used to schedule
  1289. // gfcp_updates, when implementing an epoll-like mechanism, or for
  1290. // optimizing an gfcp_update loop handling massive GFcp connections.
  1291. func (
  1292. GFcp *GFCP,
  1293. ) Check() uint32 {
  1294. current := CurrentMs()
  1295. tsFlush := GFcp.tsFlush
  1296. tmFlush := int32(
  1297. math.MaxInt32,
  1298. )
  1299. tmPacket := int32(
  1300. math.MaxInt32,
  1301. )
  1302. minimal := uint32(
  1303. 0,
  1304. )
  1305. if GFcp.updated == 0 {
  1306. return current
  1307. }
  1308. if _itimediff(
  1309. current,
  1310. tsFlush,
  1311. ) >= 10000 ||
  1312. _itimediff(
  1313. current,
  1314. tsFlush,
  1315. ) < -10000 {
  1316. tsFlush = current
  1317. }
  1318. if _itimediff(
  1319. current,
  1320. tsFlush,
  1321. ) >= 0 {
  1322. return current
  1323. }
  1324. tmFlush = _itimediff(
  1325. tsFlush,
  1326. current,
  1327. )
  1328. for k := range GFcp.SndBuf {
  1329. GFcpSeg := &GFcp.SndBuf[k]
  1330. diff := _itimediff(
  1331. GFcpSeg.GFcpResendTs,
  1332. current,
  1333. )
  1334. if diff <= 0 {
  1335. return current
  1336. }
  1337. if diff < tmPacket {
  1338. tmPacket = diff
  1339. }
  1340. }
  1341. minimal = uint32(
  1342. tmPacket,
  1343. )
  1344. if tmPacket >= tmFlush {
  1345. minimal = uint32(
  1346. tmFlush,
  1347. )
  1348. }
  1349. if minimal >= GFcp.interval {
  1350. minimal = GFcp.interval
  1351. }
  1352. return current + minimal
  1353. }
  1354. // SetMtu changes MTU size.
  1355. func (
  1356. GFcp *GFCP,
  1357. ) SetMtu(
  1358. mtu int,
  1359. ) int {
  1360. if mtu < 50 || mtu < GfcpOverhead {
  1361. return -1
  1362. }
  1363. if GFcp.reserved >= int(
  1364. GFcp.mtu-GfcpOverhead,
  1365. ) || GFcp.reserved < 0 {
  1366. return -1
  1367. }
  1368. buffer := make(
  1369. []byte,
  1370. mtu,
  1371. )
  1372. /*if buffer == nil {
  1373. return -2
  1374. }*/ // XXX(jhj): buffer can't be nil?
  1375. GFcp.mtu = uint32(
  1376. mtu,
  1377. )
  1378. GFcp.mss = GFcp.mtu - GfcpOverhead - uint32(
  1379. GFcp.reserved,
  1380. )
  1381. GFcp.buffer = buffer
  1382. return 0
  1383. }
  1384. // NoDelay options:
  1385. // * fastest: gfcp_nodelay(GFcp, 1, 20, 2, 1)
  1386. // * nodelay: 0: disable (default), 1: enable
  1387. // * interval: internal update timer interval in milliseconds, defaults to 100ms
  1388. // * resend: 0: disable fast resends (default), 1: enable fast resends
  1389. // * nc: 0: normal congestion control (default), 1: disable congestion control
  1390. func (
  1391. GFcp *GFCP,
  1392. ) NoDelay(
  1393. nodelay,
  1394. interval,
  1395. resend,
  1396. nc int,
  1397. ) int {
  1398. if nodelay >= 0 {
  1399. GFcp.nodelay = uint32(
  1400. nodelay,
  1401. )
  1402. if nodelay != 0 {
  1403. GFcp.rxMinRto = GfcpRtoNdl
  1404. } else {
  1405. GFcp.rxMinRto = GfcpRtoMin
  1406. }
  1407. }
  1408. if interval >= 0 {
  1409. if interval > 5000 {
  1410. interval = 5000
  1411. } else if interval < 10 {
  1412. interval = 10
  1413. }
  1414. GFcp.interval = uint32(
  1415. interval,
  1416. )
  1417. }
  1418. if resend >= 0 {
  1419. GFcp.fastresend = int32(
  1420. resend,
  1421. )
  1422. }
  1423. if nc >= 0 {
  1424. GFcp.nocwnd = int32(
  1425. nc,
  1426. )
  1427. }
  1428. return 0
  1429. }
  1430. // WndSize sets maximum window size (efaults: sndwnd=32 and rcvwnd=32)
  1431. func (
  1432. GFcp *GFCP,
  1433. ) WndSize(
  1434. sndwnd,
  1435. rcvwnd int,
  1436. ) int {
  1437. if sndwnd > 0 {
  1438. GFcp.sndWnd = uint32(
  1439. sndwnd,
  1440. )
  1441. }
  1442. if rcvwnd > 0 {
  1443. GFcp.rcvWnd = uint32(
  1444. rcvwnd,
  1445. )
  1446. }
  1447. return 0
  1448. }
  1449. // WaitSnd shows how many packets are queued to be sent
  1450. func (
  1451. GFcp *GFCP,
  1452. ) WaitSnd() int {
  1453. return len(
  1454. GFcp.SndBuf,
  1455. ) + len(
  1456. GFcp.sndQueue,
  1457. )
  1458. }
  1459. func (
  1460. GFcp *GFCP,
  1461. ) removeFront(
  1462. q []Segment,
  1463. n int,
  1464. ) []Segment {
  1465. if n > cap(
  1466. q,
  1467. )/2 {
  1468. newn := copy(
  1469. q,
  1470. q[n:],
  1471. )
  1472. return q[:newn]
  1473. }
  1474. return q[n:]
  1475. }
  1476. func init() {
  1477. debug.SetGCPercent(
  1478. 180,
  1479. )
  1480. gfcpLegal.RegisterLicense(
  1481. "\nThe MIT License (MIT)\n\nCopyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.\nCopyright © 2015 Daniel Fu <daniel820313@gmail.com>.\nCopyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.\nCopyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.\n\nPermission is hereby granted, free of charge, to any person obtaining a copy\nof this software and associated documentation files (the \"Software\"), to deal\nin the Software without restriction, including, without limitation, the rights\nto use, copy, modify, merge, publish, distribute, sub-license, and/or sell\ncopies of the Software, and to permit persons to whom the Software is\nfurnished to do so, subject to the following conditions:\n\nThe above copyright notice, and this permission notice, shall be\nincluded in all copies, or substantial portions, of the Software.\n\nTHE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\nIMPLIED, INCLUDING, BUT NOT LIMITED TO, THE WARRANTIES OF MERCHANTABILITY,\nFITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. IN NO EVENT SHALL THE\nAUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER\nLIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\nOUT OF, OR IN CONNECTION WITH THE SOFTWARE, OR THE USE OR OTHER DEALINGS IN\nTHE SOFTWARE.\n",
  1482. )
  1483. }