gfcp_sess.go 22 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426
  1. // Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
  2. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  3. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  4. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  5. //
  6. // All rights reserved.
  7. //
  8. // All use of this code is governed by the MIT license.
  9. // The complete license is available in the LICENSE file.
  10. package gfcp
  11. import (
  12. "crypto/rand"
  13. "encoding/binary"
  14. "fmt"
  15. "net"
  16. "sync"
  17. "sync/atomic"
  18. "time"
  19. "github.com/pkg/errors"
  20. "golang.org/x/net/ipv4"
  21. "golang.org/x/net/ipv6"
  22. )
  23. type errTimeout struct {
  24. error
  25. }
  26. func (
  27. errTimeout,
  28. ) Timeout() bool {
  29. return true
  30. }
  31. func (
  32. errTimeout,
  33. ) Temporary() bool {
  34. return true
  35. }
  36. func (
  37. errTimeout,
  38. ) Error() string {
  39. return "i/o timeout"
  40. }
  41. const (
  42. // GFcpMtuLimit ...
  43. GFcpMtuLimit = 9000
  44. rxFECMulti = 3
  45. acceptBacklog = 1024
  46. )
  47. const (
  48. errBrokenPipe = "broken pipe"
  49. errInvalidOperation = "invalid operation"
  50. )
  51. // KxmitBuf ...
  52. var KxmitBuf sync.Pool
  53. func init() {
  54. KxmitBuf.New = func() interface{} {
  55. return make(
  56. []byte,
  57. GFcpMtuLimit,
  58. )
  59. }
  60. }
  61. type (
  62. // UDPSession ...
  63. UDPSession struct {
  64. updaterIdx int // record slice index in updater
  65. conn net.PacketConn // the underlying packet connection
  66. GFcp *GFCP // GFCP ARQ protocol
  67. l *Listener // pointing to the Listener object if it's been accepted by a Listener
  68. recvbuf []byte
  69. bufptr []byte
  70. // FecDecoder ...
  71. FecDecoder *FecDecoder
  72. // FecEncoder ...
  73. FecEncoder *FecEncoder
  74. remote net.Addr // remote peer address
  75. rd time.Time // read deadline
  76. wd time.Time // write deadline
  77. headerSize int // the header size additional to a GFCP frame
  78. ackNoDelay bool // send ack immediately for each incoming packet(testing purpose)
  79. writeDelay bool // delay GFcp.flush() for Write() for bulk transfer
  80. dup int // duplicate udp packets(testing purpose)
  81. die chan struct{} // notify current session has Closed
  82. chReadEvent chan struct{} // notify Read() can be called without blocking
  83. chWriteEvent chan struct{} // notify Write() can be called without blocking
  84. chReadError chan error // notify PacketConn.Read() have an error
  85. chWriteError chan error // notify PacketConn.Write() have an error
  86. nonce Entropy
  87. isClosed bool // flag the session has Closed
  88. mu sync.Mutex
  89. }
  90. setReadBuffer interface {
  91. SetReadBuffer(
  92. bytes int,
  93. ) error
  94. }
  95. setWriteBuffer interface {
  96. SetWriteBuffer(
  97. bytes int,
  98. ) error
  99. }
  100. )
  101. // newUDPSession creates a new UDP session (client or server)
  102. func newUDPSession(
  103. conv uint32,
  104. dataShards,
  105. parityShards int,
  106. l *Listener,
  107. conn net.PacketConn,
  108. remote net.Addr,
  109. ) *UDPSession {
  110. sess := new(
  111. UDPSession,
  112. )
  113. sess.die = make(
  114. chan struct{},
  115. )
  116. sess.nonce = new(
  117. Nonce,
  118. )
  119. sess.nonce.Init()
  120. sess.chReadEvent = make(
  121. chan struct{},
  122. 1,
  123. )
  124. sess.chWriteEvent = make(
  125. chan struct{},
  126. 1,
  127. )
  128. sess.chReadError = make(
  129. chan error,
  130. 1,
  131. )
  132. sess.chWriteError = make(
  133. chan error,
  134. 1,
  135. )
  136. sess.remote = remote
  137. sess.conn = conn
  138. sess.l = l
  139. sess.recvbuf = make(
  140. []byte,
  141. GFcpMtuLimit,
  142. )
  143. sess.FecDecoder = NewFECDecoder(
  144. rxFECMulti*(dataShards+parityShards),
  145. dataShards,
  146. parityShards,
  147. )
  148. sess.FecEncoder = NewFECEncoder(
  149. dataShards,
  150. parityShards,
  151. 0,
  152. )
  153. if sess.FecEncoder != nil {
  154. sess.headerSize += fecHeaderSizePlus2
  155. }
  156. sess.GFcp = NewGFCP(conv, func(
  157. buf []byte,
  158. size int,
  159. ) {
  160. if size >= GfcpOverhead+sess.headerSize {
  161. sess.output(
  162. buf[:size],
  163. )
  164. }
  165. })
  166. sess.GFcp.ReserveBytes(
  167. sess.headerSize,
  168. )
  169. updater.addSession(
  170. sess,
  171. )
  172. if sess.l == nil {
  173. go sess.readLoop()
  174. atomic.AddUint64(
  175. &DefaultSnsi.GFcpActiveOpen,
  176. 1,
  177. )
  178. } else {
  179. atomic.AddUint64(
  180. &DefaultSnsi.GFcpPassiveOpen,
  181. 1,
  182. )
  183. }
  184. currestab := atomic.AddUint64(
  185. &DefaultSnsi.GFcpNowEstablished,
  186. 1,
  187. )
  188. maxconn := atomic.LoadUint64(
  189. &DefaultSnsi.GFcpMaxConn,
  190. )
  191. if currestab > maxconn {
  192. atomic.CompareAndSwapUint64(
  193. &DefaultSnsi.GFcpMaxConn,
  194. maxconn,
  195. currestab,
  196. )
  197. }
  198. return sess
  199. }
  200. // Read implements net.Conn
  201. // Function is safe for concurrent access.
  202. func (
  203. s *UDPSession,
  204. ) Read(
  205. b []byte,
  206. ) (
  207. n int,
  208. err error,
  209. ) {
  210. for {
  211. s.mu.Lock()
  212. if len(
  213. s.bufptr,
  214. ) > 0 {
  215. n = copy(
  216. b,
  217. s.bufptr,
  218. )
  219. s.bufptr = s.bufptr[n:]
  220. s.mu.Unlock()
  221. atomic.AddUint64(
  222. &DefaultSnsi.GFcpBytesReceived,
  223. uint64(n),
  224. )
  225. return n, nil
  226. }
  227. if s.isClosed {
  228. s.mu.Unlock()
  229. return 0, errors.New(
  230. errBrokenPipe,
  231. )
  232. }
  233. if size := s.GFcp.PeekSize(); size > 0 {
  234. if len(b) >= size {
  235. s.GFcp.Recv(
  236. b,
  237. )
  238. s.mu.Unlock()
  239. atomic.AddUint64(
  240. &DefaultSnsi.GFcpBytesReceived,
  241. uint64(size),
  242. )
  243. return size, nil
  244. }
  245. if cap(
  246. s.recvbuf,
  247. ) < size {
  248. s.recvbuf = make(
  249. []byte,
  250. size,
  251. )
  252. }
  253. s.recvbuf = s.recvbuf[:size]
  254. s.GFcp.Recv(
  255. s.recvbuf,
  256. )
  257. n = copy(
  258. b,
  259. s.recvbuf,
  260. )
  261. s.bufptr = s.recvbuf[n:]
  262. s.mu.Unlock()
  263. atomic.AddUint64(
  264. &DefaultSnsi.GFcpBytesReceived,
  265. uint64(n),
  266. )
  267. return n, nil
  268. }
  269. var timeout *time.Timer
  270. var c <-chan time.Time
  271. if !s.rd.IsZero() {
  272. if time.Now().After(
  273. s.rd,
  274. ) {
  275. s.mu.Unlock()
  276. return 0, errTimeout{}
  277. }
  278. delay := time.Until(
  279. s.rd,
  280. )
  281. timeout = time.NewTimer(
  282. delay,
  283. )
  284. c = timeout.C
  285. }
  286. s.mu.Unlock()
  287. select {
  288. case <-s.chReadEvent:
  289. case <-c:
  290. case <-s.die:
  291. case err = <-s.chReadError:
  292. if timeout != nil {
  293. timeout.Stop()
  294. }
  295. return n, err
  296. }
  297. if timeout != nil {
  298. timeout.Stop()
  299. }
  300. }
  301. }
  302. func (
  303. s *UDPSession,
  304. ) Write(
  305. b []byte,
  306. ) (
  307. n int,
  308. err error,
  309. ) {
  310. return s.WriteBuffers(
  311. [][]byte{b},
  312. )
  313. }
  314. // WriteBuffers ...
  315. func (
  316. s *UDPSession,
  317. ) WriteBuffers(
  318. v [][]byte,
  319. ) (
  320. n int,
  321. err error,
  322. ) {
  323. for {
  324. s.mu.Lock()
  325. if s.isClosed {
  326. s.mu.Unlock()
  327. return 0,
  328. errors.New(
  329. errBrokenPipe,
  330. )
  331. }
  332. if s.GFcp.WaitSnd() < int(s.GFcp.sndWnd) {
  333. for _, b := range v {
  334. n += len(
  335. b)
  336. for {
  337. if len(
  338. b,
  339. ) <= int(
  340. s.GFcp.mss,
  341. ) {
  342. s.GFcp.Send(
  343. b,
  344. )
  345. break
  346. }
  347. s.GFcp.Send(
  348. b[:s.GFcp.mss],
  349. )
  350. b = b[s.GFcp.mss:]
  351. }
  352. }
  353. if s.GFcp.WaitSnd() >= int(
  354. s.GFcp.sndWnd,
  355. ) || !s.writeDelay {
  356. s.GFcp.Flush(
  357. false,
  358. )
  359. }
  360. s.mu.Unlock()
  361. atomic.AddUint64(
  362. &DefaultSnsi.GFcpBytesSent,
  363. uint64(
  364. n,
  365. ),
  366. )
  367. return n, nil
  368. }
  369. var timeout *time.Timer
  370. var c <-chan time.Time
  371. if !s.wd.IsZero() {
  372. if time.Now().After(
  373. s.wd,
  374. ) {
  375. s.mu.Unlock()
  376. return 0, errTimeout{}
  377. }
  378. delay := time.Until(
  379. s.wd,
  380. )
  381. timeout = time.NewTimer(
  382. delay,
  383. )
  384. c = timeout.C
  385. }
  386. s.mu.Unlock()
  387. select {
  388. case <-s.chWriteEvent:
  389. case <-c:
  390. case <-s.die:
  391. case err = <-s.chWriteError:
  392. if timeout != nil {
  393. timeout.Stop()
  394. }
  395. return n, err
  396. }
  397. if timeout != nil {
  398. timeout.Stop()
  399. }
  400. }
  401. }
  402. // Close ...
  403. func (
  404. s *UDPSession,
  405. ) Close() error {
  406. updater.removeSession(
  407. s,
  408. )
  409. if s.l != nil {
  410. s.l.CloseSession(
  411. s.remote,
  412. )
  413. }
  414. s.mu.Lock()
  415. defer s.mu.Unlock()
  416. if s.isClosed {
  417. return errors.New(
  418. errBrokenPipe,
  419. )
  420. }
  421. close(
  422. s.die,
  423. )
  424. s.isClosed = true
  425. atomic.AddUint64(
  426. &DefaultSnsi.GFcpNowEstablished,
  427. ^uint64(
  428. 0,
  429. ),
  430. )
  431. if s.l == nil {
  432. return s.conn.Close()
  433. }
  434. return nil
  435. }
  436. // LocalAddr returns the local network address.
  437. // The address returned is shared by all invocations of LocalAddr - do not modify it.
  438. func (
  439. s *UDPSession,
  440. ) LocalAddr() net.Addr {
  441. return s.conn.LocalAddr()
  442. }
  443. // RemoteAddr returns the remote network address.
  444. // The address returned is shared by all invocations of RemoteAddr - do not modify it.
  445. func (
  446. s *UDPSession,
  447. ) RemoteAddr() net.Addr {
  448. return s.remote
  449. }
  450. // SetDeadline sets a deadline associated with the listener.
  451. // A zero time value disables a deadline.
  452. func (
  453. s *UDPSession,
  454. ) SetDeadline(
  455. t time.Time,
  456. ) error {
  457. s.mu.Lock()
  458. defer s.mu.Unlock()
  459. s.rd = t
  460. s.wd = t
  461. s.notifyReadEvent()
  462. s.notifyWriteEvent()
  463. return nil
  464. }
  465. // SetReadDeadline implements the Conn SetReadDeadline method.
  466. func (
  467. s *UDPSession,
  468. ) SetReadDeadline(
  469. t time.Time,
  470. ) error {
  471. s.mu.Lock()
  472. defer s.mu.Unlock()
  473. s.rd = t
  474. s.notifyReadEvent()
  475. return nil
  476. }
  477. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  478. func (
  479. s *UDPSession,
  480. ) SetWriteDeadline(
  481. t time.Time,
  482. ) error {
  483. s.mu.Lock()
  484. defer s.mu.Unlock()
  485. s.wd = t
  486. s.notifyWriteEvent()
  487. return nil
  488. }
  489. // SetWriteDelay delays writes for bulk transfers, until the next update interval.
  490. func (
  491. s *UDPSession,
  492. ) SetWriteDelay(
  493. delay bool,
  494. ) {
  495. s.mu.Lock()
  496. defer s.mu.Unlock()
  497. s.writeDelay = delay
  498. }
  499. // SetWindowSize sets the maximum window size
  500. func (
  501. s *UDPSession,
  502. ) SetWindowSize(
  503. sndwnd,
  504. rcvwnd int,
  505. ) {
  506. s.mu.Lock()
  507. defer s.mu.Unlock()
  508. s.GFcp.WndSize(
  509. sndwnd,
  510. rcvwnd,
  511. )
  512. }
  513. // SetMtu sets the maximum transmission unit
  514. // This size does not including UDP header itself.
  515. func (
  516. s *UDPSession,
  517. ) SetMtu(
  518. mtu int,
  519. ) bool {
  520. if mtu > GFcpMtuLimit {
  521. return false
  522. }
  523. s.mu.Lock()
  524. defer s.mu.Unlock()
  525. s.GFcp.SetMtu(
  526. mtu,
  527. )
  528. return true
  529. }
  530. // SetStreamMode toggles the streaming mode on or off
  531. func (s *UDPSession) SetStreamMode(
  532. enable bool,
  533. ) {
  534. s.mu.Lock()
  535. defer s.mu.Unlock()
  536. if enable {
  537. s.GFcp.stream = 1
  538. } else {
  539. s.GFcp.stream = 0
  540. }
  541. }
  542. // SetACKNoDelay changes the ACK flushing option.
  543. // If set to true, ACKs are flusghed immediately,
  544. func (
  545. s *UDPSession,
  546. ) SetACKNoDelay(
  547. nodelay bool,
  548. ) {
  549. s.mu.Lock()
  550. defer s.mu.Unlock()
  551. s.ackNoDelay = nodelay
  552. }
  553. // SetDUP duplicates UDP packets for GFcp output.
  554. // Useful for testing, not for normal use.
  555. func (
  556. s *UDPSession,
  557. ) SetDUP(
  558. dup int,
  559. ) {
  560. s.mu.Lock()
  561. defer s.mu.Unlock()
  562. s.dup = dup
  563. }
  564. // SetNoDelay sets TCP_DELAY, for GFcp.
  565. func (
  566. s *UDPSession,
  567. ) SetNoDelay(
  568. nodelay,
  569. interval,
  570. resend,
  571. nc int,
  572. ) {
  573. s.mu.Lock()
  574. defer s.mu.Unlock()
  575. s.GFcp.NoDelay(
  576. nodelay,
  577. interval,
  578. resend,
  579. nc,
  580. )
  581. }
  582. // SetDSCP sets the 6-bit DSCP field of IP header.
  583. // Has no effect, unless accepted by your Listener.
  584. func (
  585. s *UDPSession,
  586. ) SetDSCP(
  587. dscp int,
  588. ) error {
  589. s.mu.Lock()
  590. defer s.mu.Unlock()
  591. if s.l == nil {
  592. if nc, ok := s.conn.(net.Conn); ok {
  593. addr, _ := net.ResolveUDPAddr(
  594. "udp",
  595. nc.LocalAddr().String(),
  596. )
  597. if addr.IP.To4() != nil {
  598. return ipv4.NewConn(
  599. nc,
  600. ).SetTOS(
  601. dscp << 2,
  602. )
  603. }
  604. return ipv6.NewConn(
  605. nc,
  606. ).SetTrafficClass(
  607. dscp,
  608. )
  609. }
  610. }
  611. return errors.New(
  612. errInvalidOperation,
  613. )
  614. }
  615. // SetReadBuffer sets the socket read buffer.
  616. // Has no effect, unless it's accepted by your Listener.
  617. func (
  618. s *UDPSession,
  619. ) SetReadBuffer(
  620. bytes int,
  621. ) error {
  622. s.mu.Lock()
  623. defer s.mu.Unlock()
  624. if s.l == nil {
  625. if nc, ok := s.conn.(setReadBuffer); ok {
  626. return nc.SetReadBuffer(
  627. bytes,
  628. )
  629. }
  630. }
  631. return errors.New(
  632. errInvalidOperation,
  633. )
  634. }
  635. // SetWriteBuffer sets the socket write buffer.
  636. // Has no effect, unless it's accepted by your Listener.
  637. func (
  638. s *UDPSession,
  639. ) SetWriteBuffer(
  640. bytes int,
  641. ) error {
  642. s.mu.Lock()
  643. defer s.mu.Unlock()
  644. if s.l == nil {
  645. if nc, ok := s.conn.(setWriteBuffer); ok {
  646. return nc.SetWriteBuffer(
  647. bytes,
  648. )
  649. }
  650. }
  651. return errors.New(
  652. errInvalidOperation,
  653. )
  654. }
  655. func (
  656. s *UDPSession,
  657. ) output(
  658. buf []byte,
  659. ) {
  660. var ecc [][]byte
  661. if s.FecEncoder != nil {
  662. ecc = s.FecEncoder.Encode(
  663. buf,
  664. )
  665. }
  666. nbytes := 0
  667. npkts := 0
  668. for i := 0; i < s.dup+1; i++ {
  669. if n, err := s.conn.WriteTo(
  670. buf,
  671. s.remote,
  672. ); err == nil {
  673. nbytes += n
  674. npkts++
  675. } else {
  676. s.notifyWriteError(
  677. err,
  678. )
  679. }
  680. }
  681. for k := range ecc {
  682. if n, err := s.conn.WriteTo(
  683. ecc[k],
  684. s.remote,
  685. ); err == nil {
  686. nbytes += n
  687. npkts++
  688. } else {
  689. s.notifyWriteError(
  690. err,
  691. )
  692. }
  693. }
  694. atomic.AddUint64(
  695. &DefaultSnsi.GFcpOutputPackets,
  696. uint64(
  697. npkts,
  698. ),
  699. )
  700. atomic.AddUint64(
  701. &DefaultSnsi.GFcpOutputBytes,
  702. uint64(
  703. nbytes,
  704. ),
  705. )
  706. }
  707. func (
  708. s *UDPSession,
  709. ) update() (
  710. interval time.Duration,
  711. ) {
  712. s.mu.Lock()
  713. waitsnd := s.GFcp.WaitSnd()
  714. interval = time.Duration(
  715. s.GFcp.Flush(
  716. false,
  717. ),
  718. ) * time.Millisecond
  719. if s.GFcp.WaitSnd() < waitsnd {
  720. s.notifyWriteEvent()
  721. }
  722. s.mu.Unlock()
  723. return
  724. }
  725. // GetConv ...
  726. func (
  727. s *UDPSession,
  728. ) GetConv() uint32 {
  729. return s.GFcp.conv
  730. }
  731. func (
  732. s *UDPSession,
  733. ) notifyReadEvent() {
  734. select {
  735. case s.chReadEvent <- struct{}{}:
  736. default:
  737. }
  738. }
  739. func (
  740. s *UDPSession,
  741. ) notifyWriteEvent() {
  742. select {
  743. case s.chWriteEvent <- struct{}{}:
  744. default:
  745. }
  746. }
  747. func (
  748. s *UDPSession,
  749. ) notifyWriteError(
  750. err error,
  751. ) {
  752. select {
  753. case s.chWriteError <- err:
  754. default:
  755. }
  756. }
  757. func (
  758. s *UDPSession,
  759. ) packetInput(
  760. data []byte,
  761. ) {
  762. s.GFcpInput(
  763. data,
  764. )
  765. }
  766. // GFcpInput ...
  767. func (
  768. s *UDPSession,
  769. ) GFcpInput(
  770. data []byte,
  771. ) {
  772. var GFcpInErrors,
  773. fecErrs,
  774. fecRecovered,
  775. fecParityShards uint64
  776. if s.FecDecoder != nil {
  777. if len(
  778. data,
  779. ) > fecHeaderSize {
  780. f := FecPacket(
  781. data,
  782. )
  783. if f.flag() == KTypeData || f.flag() == KTypeParity {
  784. if f.flag() == KTypeParity {
  785. fecParityShards++
  786. }
  787. s.mu.Lock()
  788. recovers := s.FecDecoder.Decode(
  789. f,
  790. )
  791. waitsnd := s.GFcp.WaitSnd()
  792. if f.flag() == KTypeData {
  793. if ret := s.GFcp.Input(
  794. data[fecHeaderSizePlus2:],
  795. true,
  796. s.ackNoDelay,
  797. ); ret != 0 {
  798. GFcpInErrors++
  799. }
  800. }
  801. for _, r := range recovers {
  802. if len(
  803. r,
  804. ) >= 2 {
  805. sz := binary.LittleEndian.Uint16(
  806. r,
  807. )
  808. if int(
  809. sz,
  810. ) <= len(
  811. r,
  812. ) && sz >= 2 {
  813. if ret := s.GFcp.Input(
  814. r[2:sz],
  815. false,
  816. s.ackNoDelay,
  817. ); ret == 0 {
  818. fecRecovered++
  819. } else {
  820. GFcpInErrors++
  821. }
  822. } else {
  823. fecErrs++
  824. }
  825. } else {
  826. fecErrs++
  827. }
  828. // TODO(jhj): Switch to pointer to avoid allocation.
  829. KxmitBuf.Put(
  830. r,
  831. )
  832. }
  833. if n := s.GFcp.PeekSize(); n > 0 {
  834. s.notifyReadEvent()
  835. }
  836. if s.GFcp.WaitSnd() < waitsnd {
  837. s.notifyWriteEvent()
  838. }
  839. s.mu.Unlock()
  840. } else {
  841. atomic.AddUint64(
  842. &DefaultSnsi.GFcpPreInputErrors,
  843. 1,
  844. )
  845. }
  846. } else {
  847. atomic.AddUint64(
  848. &DefaultSnsi.GFcpInputErrors,
  849. 1,
  850. )
  851. }
  852. } else {
  853. s.mu.Lock()
  854. waitsnd := s.GFcp.WaitSnd()
  855. if ret := s.GFcp.Input(
  856. data,
  857. true,
  858. s.ackNoDelay,
  859. ); ret != 0 {
  860. GFcpInErrors++
  861. }
  862. if n := s.GFcp.PeekSize(); n > 0 {
  863. s.notifyReadEvent()
  864. }
  865. if s.GFcp.WaitSnd() < waitsnd {
  866. s.notifyWriteEvent()
  867. }
  868. s.mu.Unlock()
  869. }
  870. atomic.AddUint64(
  871. &DefaultSnsi.GFcpInputPackets,
  872. 1,
  873. )
  874. atomic.AddUint64(
  875. &DefaultSnsi.GFcpInputBytes,
  876. uint64(
  877. len(
  878. data,
  879. ),
  880. ),
  881. )
  882. if fecParityShards > 0 {
  883. atomic.AddUint64(
  884. &DefaultSnsi.GFcpFECParityShards,
  885. fecParityShards,
  886. )
  887. }
  888. if GFcpInErrors > 0 {
  889. atomic.AddUint64(
  890. &DefaultSnsi.GFcpInputErrors,
  891. GFcpInErrors,
  892. )
  893. }
  894. if fecErrs > 0 {
  895. atomic.AddUint64(
  896. &DefaultSnsi.GFcpFailures,
  897. fecErrs,
  898. )
  899. }
  900. if fecRecovered > 0 {
  901. atomic.AddUint64(
  902. &DefaultSnsi.GFcpFECRecovered,
  903. fecRecovered,
  904. )
  905. }
  906. }
  907. type (
  908. // Listener ...
  909. Listener struct {
  910. dataShards int // FEC data shard
  911. parityShards int // FEC parity shard
  912. /// FecDecoder ...
  913. FecDecoder *FecDecoder // FEC mock initialization
  914. conn net.PacketConn // the underlying packet connection
  915. sessions map[string]*UDPSession // all sessions accepted by this Listener
  916. sessionLock sync.Mutex
  917. chAccepts chan *UDPSession // Listen() backlog
  918. chSessionClosed chan net.Addr // session close queue
  919. headerSize int // additional header for a GFcp frame
  920. die chan struct{} // notify when the Listener has closed
  921. rd atomic.Value // read deadline for Accept()
  922. wd atomic.Value
  923. }
  924. )
  925. func (
  926. l *Listener,
  927. ) packetInput(
  928. data []byte,
  929. addr net.Addr,
  930. ) {
  931. l.sessionLock.Lock()
  932. s, ok := l.sessions[addr.String()]
  933. l.sessionLock.Unlock()
  934. if !ok {
  935. if len(
  936. l.chAccepts,
  937. ) < cap(
  938. l.chAccepts,
  939. ) {
  940. var conv uint32
  941. convValid := false
  942. if l.FecDecoder != nil {
  943. isfec := binary.LittleEndian.Uint16(
  944. data[4:],
  945. )
  946. if isfec == KTypeData {
  947. conv = binary.LittleEndian.Uint32(
  948. data[fecHeaderSizePlus2:],
  949. )
  950. convValid = true
  951. }
  952. } else {
  953. conv = binary.LittleEndian.Uint32(
  954. data,
  955. )
  956. convValid = true
  957. }
  958. if convValid {
  959. s := newUDPSession(
  960. conv,
  961. l.dataShards,
  962. l.parityShards,
  963. l,
  964. l.conn,
  965. addr,
  966. )
  967. s.GFcpInput(
  968. data,
  969. )
  970. l.sessionLock.Lock()
  971. l.sessions[addr.String()] = s
  972. l.sessionLock.Unlock()
  973. l.chAccepts <- s
  974. }
  975. }
  976. } else {
  977. s.GFcpInput(
  978. data,
  979. )
  980. }
  981. }
  982. // SetReadBuffer sets the socket read buffer for the Listener.
  983. func (
  984. l *Listener,
  985. ) SetReadBuffer(
  986. bytes int,
  987. ) error {
  988. if nc, ok := l.conn.(setReadBuffer); ok {
  989. return nc.SetReadBuffer(
  990. bytes,
  991. )
  992. }
  993. return errors.New(
  994. errInvalidOperation,
  995. )
  996. }
  997. // SetWriteBuffer sets the socket write buffer for the Listener.
  998. func (
  999. l *Listener,
  1000. ) SetWriteBuffer(
  1001. bytes int,
  1002. ) error {
  1003. if nc, ok := l.conn.(setWriteBuffer); ok {
  1004. return nc.SetWriteBuffer(
  1005. bytes,
  1006. )
  1007. }
  1008. return errors.New(
  1009. errInvalidOperation,
  1010. )
  1011. }
  1012. // SetDSCP sets the 6-bit DSCP field of IP header.
  1013. func (
  1014. l *Listener,
  1015. ) SetDSCP(
  1016. dscp int,
  1017. ) error {
  1018. if nc, ok := l.conn.(net.Conn); ok {
  1019. addr, _ := net.ResolveUDPAddr(
  1020. "udp",
  1021. nc.LocalAddr().String(),
  1022. )
  1023. if addr.IP.To4() != nil {
  1024. return ipv4.NewConn(
  1025. nc,
  1026. ).SetTOS(
  1027. dscp << 2,
  1028. )
  1029. }
  1030. return ipv6.NewConn(
  1031. nc,
  1032. ).SetTrafficClass(
  1033. dscp,
  1034. )
  1035. }
  1036. return errors.New(
  1037. errInvalidOperation,
  1038. )
  1039. }
  1040. // Accept implements the Accept method in the Listener interface.
  1041. // It waits until the next call, then returns a generic 'Conn'.
  1042. func (
  1043. l *Listener,
  1044. ) Accept() (
  1045. net.Conn,
  1046. error,
  1047. ) {
  1048. return l.AcceptGFCP()
  1049. }
  1050. // AcceptGFCP accepts a GFcp connection
  1051. func (
  1052. l *Listener,
  1053. ) AcceptGFCP() (
  1054. *UDPSession,
  1055. error,
  1056. ) {
  1057. var timeout <-chan time.Time
  1058. if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
  1059. timeout = time.After(
  1060. time.Since(
  1061. tdeadline,
  1062. ),
  1063. )
  1064. }
  1065. select {
  1066. case <-timeout:
  1067. return nil, &errTimeout{}
  1068. case c := <-l.chAccepts:
  1069. return c, nil
  1070. case <-l.die:
  1071. return nil, errors.New(
  1072. errBrokenPipe,
  1073. )
  1074. }
  1075. }
  1076. // SetDeadline sets the deadline associated with the Listener.
  1077. // A zero value will disable all deadlines.
  1078. func (
  1079. l *Listener,
  1080. ) SetDeadline(
  1081. t time.Time,
  1082. ) error {
  1083. var err error
  1084. err = l.SetReadDeadline(
  1085. t,
  1086. )
  1087. if err != nil {
  1088. panic(
  1089. fmt.Sprintf(
  1090. "SetReadDeadLine failure: %v",
  1091. err,
  1092. ),
  1093. )
  1094. }
  1095. err = l.SetWriteDeadline(
  1096. t,
  1097. )
  1098. if err != nil {
  1099. panic(
  1100. fmt.Sprintf(
  1101. "SetWriteDeadline failure: %v",
  1102. err,
  1103. ),
  1104. )
  1105. }
  1106. return nil
  1107. }
  1108. // SetReadDeadline implements the Conn SetReadDeadline method.
  1109. func (
  1110. l *Listener,
  1111. ) SetReadDeadline(
  1112. t time.Time,
  1113. ) error {
  1114. l.rd.Store(
  1115. t,
  1116. )
  1117. return nil
  1118. }
  1119. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  1120. func (
  1121. l *Listener,
  1122. ) SetWriteDeadline(
  1123. t time.Time,
  1124. ) error {
  1125. l.wd.Store(
  1126. t,
  1127. )
  1128. return nil
  1129. }
  1130. // Close stops listening on the UDP address.
  1131. // Any already accepted connections will not be closed.
  1132. func (
  1133. l *Listener,
  1134. ) Close() error {
  1135. close(
  1136. l.die,
  1137. )
  1138. return l.conn.Close()
  1139. }
  1140. // CloseSession notifies the Listener when a Session is Closed.
  1141. func (
  1142. l *Listener,
  1143. ) CloseSession(
  1144. remote net.Addr,
  1145. ) (
  1146. ret bool,
  1147. ) {
  1148. l.sessionLock.Lock()
  1149. defer l.sessionLock.Unlock()
  1150. if _, ok := l.sessions[remote.String()]; ok {
  1151. delete(
  1152. l.sessions,
  1153. remote.String(),
  1154. )
  1155. return true
  1156. }
  1157. return false
  1158. }
  1159. // Addr returns the listener's network address.
  1160. // The address returned is shared by all invocations of Addr - do not modify it.
  1161. func (
  1162. l *Listener,
  1163. ) Addr() net.Addr {
  1164. return l.conn.LocalAddr()
  1165. }
  1166. // Listen listens for incoming GFcp packets addressed to our local address (laddr) via "udp"
  1167. func Listen(
  1168. laddr string,
  1169. ) (
  1170. net.Listener,
  1171. error,
  1172. ) {
  1173. return ListenWithOptions(
  1174. laddr,
  1175. 0,
  1176. 0,
  1177. )
  1178. }
  1179. // ListenWithOptions listens for incoming GFcp packets addressed to our local address (laddr) via "udp"
  1180. // Porvides for encryption, sharding, parity, and RS coding parameters to be specified.
  1181. func ListenWithOptions(
  1182. laddr string,
  1183. dataShards,
  1184. parityShards int,
  1185. ) (
  1186. *Listener,
  1187. error,
  1188. ) {
  1189. udpaddr,
  1190. err := net.ResolveUDPAddr(
  1191. "udp",
  1192. laddr,
  1193. )
  1194. if err != nil {
  1195. return nil,
  1196. errors.Wrap(
  1197. err,
  1198. "net.ResolveUDPAddr",
  1199. )
  1200. }
  1201. conn, err := net.ListenUDP(
  1202. "udp",
  1203. udpaddr,
  1204. )
  1205. if err != nil {
  1206. return nil,
  1207. errors.Wrap(
  1208. err,
  1209. "net.ListenUDP",
  1210. )
  1211. }
  1212. return ServeConn(
  1213. dataShards,
  1214. parityShards,
  1215. conn,
  1216. )
  1217. }
  1218. // ServeConn serves the GFcp protocol - a single packet is processed.
  1219. func ServeConn(
  1220. dataShards,
  1221. parityShards int,
  1222. conn net.PacketConn,
  1223. ) (
  1224. *Listener,
  1225. error,
  1226. ) {
  1227. l := new(
  1228. Listener,
  1229. )
  1230. l.conn = conn
  1231. l.sessions = make(
  1232. map[string]*UDPSession,
  1233. )
  1234. l.chAccepts = make(
  1235. chan *UDPSession,
  1236. acceptBacklog,
  1237. )
  1238. l.chSessionClosed = make(
  1239. chan net.Addr,
  1240. )
  1241. l.die = make(
  1242. chan struct{},
  1243. )
  1244. l.dataShards = dataShards
  1245. l.parityShards = parityShards
  1246. l.FecDecoder = NewFECDecoder(
  1247. rxFECMulti*(dataShards+parityShards),
  1248. dataShards,
  1249. parityShards,
  1250. )
  1251. if l.FecDecoder != nil {
  1252. l.headerSize += fecHeaderSizePlus2
  1253. }
  1254. go l.monitor()
  1255. return l, nil
  1256. }
  1257. // Dial connects to the remote address "raddr" via "udp"
  1258. func Dial(
  1259. raddr string,
  1260. ) (
  1261. net.Conn,
  1262. error,
  1263. ) {
  1264. return DialWithOptions(
  1265. raddr,
  1266. 0,
  1267. 0,
  1268. )
  1269. }
  1270. // DialWithOptions connects to the remote address "raddr" via "udp" with encryption options.
  1271. func DialWithOptions(
  1272. raddr string,
  1273. dataShards,
  1274. parityShards int,
  1275. ) (
  1276. *UDPSession,
  1277. error,
  1278. ) {
  1279. udpaddr, err := net.ResolveUDPAddr(
  1280. "udp",
  1281. raddr,
  1282. )
  1283. if err != nil {
  1284. return nil, errors.Wrap(
  1285. err,
  1286. "net.ResolveUDPAddr",
  1287. )
  1288. }
  1289. network := "udp4"
  1290. if udpaddr.IP.To4() == nil {
  1291. network = "udp"
  1292. }
  1293. conn, err := net.ListenUDP(
  1294. network,
  1295. nil,
  1296. )
  1297. if err != nil {
  1298. return nil, errors.Wrap(
  1299. err,
  1300. "net.DialUDP",
  1301. )
  1302. }
  1303. return NewConn(
  1304. raddr,
  1305. dataShards,
  1306. parityShards,
  1307. conn,
  1308. )
  1309. }
  1310. // NewConn establishes a session, talking GFcp over a packet connection.
  1311. func NewConn(
  1312. raddr string,
  1313. dataShards,
  1314. parityShards int,
  1315. conn net.PacketConn,
  1316. ) (
  1317. *UDPSession,
  1318. error,
  1319. ) {
  1320. udpaddr, err := net.ResolveUDPAddr(
  1321. "udp",
  1322. raddr,
  1323. )
  1324. if err != nil {
  1325. return nil, errors.Wrap(
  1326. err,
  1327. "net.ResolveUDPAddr",
  1328. )
  1329. }
  1330. var convid uint32
  1331. err = binary.Read(
  1332. rand.Reader,
  1333. binary.LittleEndian,
  1334. &convid,
  1335. )
  1336. if err != nil {
  1337. panic(
  1338. "binary.Read failure",
  1339. )
  1340. }
  1341. return newUDPSession(
  1342. convid,
  1343. dataShards,
  1344. parityShards,
  1345. nil,
  1346. conn,
  1347. udpaddr,
  1348. ), nil
  1349. }
  1350. var refTime = time.Now()
  1351. // CurrentMs ...
  1352. func CurrentMs() uint32 {
  1353. return uint32(time.Since(refTime) / time.Millisecond)
  1354. }