lkcp9_sess.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461
  1. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  2. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  3. // Copyright © 2020 Gridfinity, LLC. <admin@gridfinity.com>.
  4. // Copyright © 2020 Jeffrey H. Johnson <jeff@gridfinity.com>.
  5. //
  6. // All rights reserved.
  7. //
  8. // All use of this code is governed by the MIT license.
  9. // The complete license is available in the LICENSE file.
  10. package lkcp9 // import "go.gridfinity.dev/lkcp9"
  11. import (
  12. "crypto/rand"
  13. "encoding/binary"
  14. "hash/crc32"
  15. "net"
  16. "sync"
  17. "sync/atomic"
  18. "time"
  19. "github.com/pkg/errors"
  20. "golang.org/x/net/ipv4"
  21. "golang.org/x/net/ipv6"
  22. )
  23. type errTimeout struct {
  24. error
  25. }
  26. func (
  27. errTimeout,
  28. ) Timeout() bool {
  29. return true
  30. }
  31. func (
  32. errTimeout,
  33. ) Temporary() bool {
  34. return true
  35. }
  36. func (
  37. errTimeout,
  38. ) Error() string {
  39. return "i/o timeout"
  40. }
  41. const (
  42. nonceSize = 16
  43. crcSize = 4
  44. cryptHeaderSize = nonceSize + crcSize
  45. KcpMtuLimit = 1500
  46. rxFECMulti = 3
  47. acceptBacklog = 128
  48. )
  49. const (
  50. errBrokenPipe = "broken pipe"
  51. errInvalidOperation = "invalid operation"
  52. )
  53. var KxmitBuf sync.Pool
  54. func init() {
  55. KxmitBuf.New = func() interface{} {
  56. return make(
  57. []byte,
  58. KcpMtuLimit,
  59. )
  60. }
  61. }
  62. type (
  63. UDPSession struct {
  64. updaterIdx int // record slice index in updater
  65. conn net.PacketConn // the underlying packet connection
  66. Kcp *KCP // KCP ARQ protocol
  67. l *Listener // pointing to the Listener object if it's been accepted by a Listener
  68. block BlockCrypt // block encryption object
  69. recvbuf []byte
  70. bufptr []byte
  71. FecDecoder *FecDecoder
  72. FecEncoder *FecEncoder
  73. remote net.Addr // remote peer address
  74. rd time.Time // read deadline
  75. wd time.Time // write deadline
  76. headerSize int // the header size additional to a KCP frame
  77. ackNoDelay bool // send ack immediately for each incoming packet(testing purpose)
  78. writeDelay bool // delay Kcp.flush() for Write() for bulk transfer
  79. dup int // duplicate udp packets(testing purpose)
  80. die chan struct{} // notify current session has Closed
  81. chReadEvent chan struct{} // notify Read() can be called without blocking
  82. chWriteEvent chan struct{} // notify Write() can be called without blocking
  83. chReadError chan error // notify PacketConn.Read() have an error
  84. chWriteError chan error // notify PacketConn.Write() have an error
  85. nonce Entropy
  86. isClosed bool // flag the session has Closed
  87. mu sync.Mutex
  88. }
  89. setReadBuffer interface {
  90. SetReadBuffer(
  91. bytes int,
  92. ) error
  93. }
  94. setWriteBuffer interface {
  95. SetWriteBuffer(
  96. bytes int,
  97. ) error
  98. }
  99. )
  100. // newUDPSession creates a new UDP session (client or server)
  101. func newUDPSession(
  102. conv uint32,
  103. dataShards,
  104. parityShards int,
  105. l *Listener,
  106. conn net.PacketConn,
  107. remote net.Addr,
  108. block BlockCrypt,
  109. ) *UDPSession {
  110. sess := new(
  111. UDPSession,
  112. )
  113. sess.die = make(
  114. chan struct{},
  115. )
  116. sess.nonce = new(
  117. KcpNonceAES128,
  118. )
  119. sess.nonce.Init()
  120. sess.chReadEvent = make(
  121. chan struct{},
  122. 1,
  123. )
  124. sess.chWriteEvent = make(
  125. chan struct{},
  126. 1,
  127. )
  128. sess.chReadError = make(
  129. chan error,
  130. 1,
  131. )
  132. sess.chWriteError = make(
  133. chan error,
  134. 1,
  135. )
  136. sess.remote = remote
  137. sess.conn = conn
  138. sess.l = l
  139. sess.block = block
  140. sess.recvbuf = make(
  141. []byte,
  142. KcpMtuLimit,
  143. )
  144. sess.FecDecoder = KcpNewDECDecoder(
  145. rxFECMulti*(dataShards+parityShards),
  146. dataShards,
  147. parityShards,
  148. )
  149. if sess.block != nil {
  150. sess.FecEncoder = KcpNewDECEncoder(
  151. dataShards,
  152. parityShards,
  153. cryptHeaderSize,
  154. )
  155. } else {
  156. sess.FecEncoder = KcpNewDECEncoder(
  157. dataShards,
  158. parityShards,
  159. 0,
  160. )
  161. }
  162. if sess.block != nil {
  163. sess.headerSize += cryptHeaderSize
  164. }
  165. if sess.FecEncoder != nil {
  166. sess.headerSize += fecHeaderSizePlus2
  167. }
  168. sess.Kcp = NewKCP(conv, func(
  169. buf []byte,
  170. size int,
  171. ) {
  172. if size >= IKCP_OVERHEAD+sess.headerSize {
  173. sess.output(
  174. buf[:size],
  175. )
  176. }
  177. })
  178. sess.Kcp.ReserveBytes(
  179. sess.headerSize,
  180. )
  181. updater.addSession(
  182. sess,
  183. )
  184. if sess.l == nil {
  185. go sess.readLoop()
  186. atomic.AddUint64(
  187. &DefaultSnsi.KcpActiveOpen,
  188. 1,
  189. )
  190. } else {
  191. atomic.AddUint64(
  192. &DefaultSnsi.KcpPassiveOpen,
  193. 1,
  194. )
  195. }
  196. currestab := atomic.AddUint64(
  197. &DefaultSnsi.KcpNowEstablished,
  198. 1,
  199. )
  200. maxconn := atomic.LoadUint64(
  201. &DefaultSnsi.MaxConn,
  202. )
  203. if currestab > maxconn {
  204. atomic.CompareAndSwapUint64(
  205. &DefaultSnsi.MaxConn,
  206. maxconn,
  207. currestab,
  208. )
  209. }
  210. return sess
  211. }
  212. // Read implements net.Conn
  213. // Function is safe for concurrent access.
  214. func (
  215. s *UDPSession,
  216. ) Read(
  217. b []byte,
  218. ) (
  219. n int,
  220. err error,
  221. ) {
  222. for {
  223. s.mu.Lock()
  224. if len(
  225. s.bufptr,
  226. ) > 0 {
  227. n = copy(
  228. b,
  229. s.bufptr,
  230. )
  231. s.bufptr = s.bufptr[n:]
  232. s.mu.Unlock()
  233. atomic.AddUint64(
  234. &DefaultSnsi.KcpBytesReceived,
  235. uint64(n),
  236. )
  237. return n, nil
  238. }
  239. if s.isClosed {
  240. s.mu.Unlock()
  241. return 0, errors.New(
  242. errBrokenPipe,
  243. )
  244. }
  245. if size := s.Kcp.PeekSize(); size > 0 {
  246. if len(b) >= size {
  247. s.Kcp.Recv(
  248. b,
  249. )
  250. s.mu.Unlock()
  251. atomic.AddUint64(&DefaultSnsi.KcpBytesReceived, uint64(size))
  252. return size, nil
  253. }
  254. if cap(s.recvbuf) < size {
  255. s.recvbuf = make([]byte, size)
  256. }
  257. s.recvbuf = s.recvbuf[:size]
  258. s.Kcp.Recv(
  259. s.recvbuf,
  260. )
  261. n = copy(
  262. b,
  263. s.recvbuf,
  264. )
  265. s.bufptr = s.recvbuf[n:]
  266. s.mu.Unlock()
  267. atomic.AddUint64(
  268. &DefaultSnsi.KcpBytesReceived,
  269. uint64(n),
  270. )
  271. return n, nil
  272. }
  273. var timeout *time.Timer
  274. var c <-chan time.Time
  275. if !s.rd.IsZero() {
  276. if time.Now().After(
  277. s.rd,
  278. ) {
  279. s.mu.Unlock()
  280. return 0, errTimeout{}
  281. }
  282. delay := s.rd.Sub(
  283. time.Now(),
  284. )
  285. timeout = time.NewTimer(
  286. delay,
  287. )
  288. c = timeout.C
  289. }
  290. s.mu.Unlock()
  291. select {
  292. case <-s.chReadEvent:
  293. case <-c:
  294. case <-s.die:
  295. case err = <-s.chReadError:
  296. if timeout != nil {
  297. timeout.Stop()
  298. }
  299. return n, err
  300. }
  301. if timeout != nil {
  302. timeout.Stop()
  303. }
  304. }
  305. }
  306. func (
  307. s *UDPSession,
  308. ) Write(
  309. b []byte,
  310. ) (
  311. n int,
  312. err error,
  313. ) {
  314. return s.WriteBuffers(
  315. [][]byte{b},
  316. )
  317. }
  318. func (
  319. s *UDPSession,
  320. ) WriteBuffers(
  321. v [][]byte,
  322. ) (
  323. n int,
  324. err error,
  325. ) {
  326. for {
  327. s.mu.Lock()
  328. if s.isClosed {
  329. s.mu.Unlock()
  330. return 0,
  331. errors.New(
  332. errBrokenPipe,
  333. )
  334. }
  335. if s.Kcp.WaitSnd() < int(
  336. s.Kcp.snd_wnd,
  337. ) {
  338. for _, b := range v {
  339. n += len(
  340. b)
  341. for {
  342. if len(
  343. b,
  344. ) <= int(
  345. s.Kcp.mss,
  346. ) {
  347. s.Kcp.Send(
  348. b,
  349. )
  350. break
  351. } else {
  352. s.Kcp.Send(
  353. b[:s.Kcp.mss],
  354. )
  355. b = b[s.Kcp.mss:]
  356. }
  357. }
  358. }
  359. if s.Kcp.WaitSnd() >= int(s.Kcp.snd_wnd) || !s.writeDelay {
  360. s.Kcp.Flush(
  361. false,
  362. )
  363. }
  364. s.mu.Unlock()
  365. atomic.AddUint64(
  366. &DefaultSnsi.KcpBytesSent,
  367. uint64(n),
  368. )
  369. return n, nil
  370. }
  371. var timeout *time.Timer
  372. var c <-chan time.Time
  373. if !s.wd.IsZero() {
  374. if time.Now().After(s.wd) {
  375. s.mu.Unlock()
  376. return 0, errTimeout{}
  377. }
  378. delay := s.wd.Sub(time.Now())
  379. timeout = time.NewTimer(delay)
  380. c = timeout.C
  381. }
  382. s.mu.Unlock()
  383. select {
  384. case <-s.chWriteEvent:
  385. case <-c:
  386. case <-s.die:
  387. case err = <-s.chWriteError:
  388. if timeout != nil {
  389. timeout.Stop()
  390. }
  391. return n, err
  392. }
  393. if timeout != nil {
  394. timeout.Stop()
  395. }
  396. }
  397. }
  398. func (
  399. s *UDPSession,
  400. ) Close() error {
  401. updater.removeSession(s)
  402. if s.l != nil {
  403. s.l.CloseSession(
  404. s.remote,
  405. )
  406. }
  407. s.mu.Lock()
  408. defer s.mu.Unlock()
  409. if s.isClosed {
  410. return errors.New(
  411. errBrokenPipe,
  412. )
  413. }
  414. close(
  415. s.die,
  416. )
  417. s.isClosed = true
  418. atomic.AddUint64(
  419. &DefaultSnsi.KcpNowEstablished,
  420. ^uint64(0),
  421. )
  422. if s.l == nil {
  423. return s.conn.Close()
  424. }
  425. return nil
  426. }
  427. // LocalAddr returns the local network address.
  428. // The address returned is shared by all invocations of LocalAddr - do not modify it.
  429. func (
  430. s *UDPSession,
  431. ) LocalAddr() net.Addr {
  432. return s.conn.LocalAddr()
  433. }
  434. // RemoteAddr returns the remote network address.
  435. // The adress returned is shared by all invocations of RemoteAddr - do not modify it.
  436. func (
  437. s *UDPSession,
  438. ) RemoteAddr() net.Addr {
  439. return s.remote
  440. }
  441. // SetDeadline sets a deadline associated with the listener.
  442. // A zero time value disables a deadline.
  443. func (
  444. s *UDPSession,
  445. ) SetDeadline(
  446. t time.Time,
  447. ) error {
  448. s.mu.Lock()
  449. defer s.mu.Unlock()
  450. s.rd = t
  451. s.wd = t
  452. s.notifyReadEvent()
  453. s.notifyWriteEvent()
  454. return nil
  455. }
  456. // SetReadDeadline implements the Conn SetReadDeadline method.
  457. func (
  458. s *UDPSession,
  459. ) SetReadDeadline(
  460. t time.Time,
  461. ) error {
  462. s.mu.Lock()
  463. defer s.mu.Unlock()
  464. s.rd = t
  465. s.notifyReadEvent()
  466. return nil
  467. }
  468. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  469. func (
  470. s *UDPSession,
  471. ) SetWriteDeadline(
  472. t time.Time,
  473. ) error {
  474. s.mu.Lock()
  475. defer s.mu.Unlock()
  476. s.wd = t
  477. s.notifyWriteEvent()
  478. return nil
  479. }
  480. // SetWriteDelay delays writes for bulk transfers, until the next update interval.
  481. func (
  482. s *UDPSession,
  483. ) SetWriteDelay(
  484. delay bool,
  485. ) {
  486. s.mu.Lock()
  487. defer s.mu.Unlock()
  488. s.writeDelay = delay
  489. }
  490. // SetWindowSize sets the maximum window size
  491. func (
  492. s *UDPSession,
  493. ) SetWindowSize(
  494. sndwnd,
  495. rcvwnd int,
  496. ) {
  497. s.mu.Lock()
  498. defer s.mu.Unlock()
  499. s.Kcp.WndSize(
  500. sndwnd,
  501. rcvwnd,
  502. )
  503. }
  504. // SetMtu sets the maximum transmission unit
  505. // This size does not including UDP header itself.
  506. func (
  507. s *UDPSession,
  508. ) SetMtu(
  509. mtu int,
  510. ) bool {
  511. if mtu > KcpMtuLimit {
  512. return false
  513. }
  514. s.mu.Lock()
  515. defer s.mu.Unlock()
  516. s.Kcp.SetMtu(
  517. mtu,
  518. )
  519. return true
  520. }
  521. // SetStreamMode toggles the streaming mode on or off
  522. func (s *UDPSession) SetStreamMode(
  523. enable bool,
  524. ) {
  525. s.mu.Lock()
  526. defer s.mu.Unlock()
  527. if enable {
  528. s.Kcp.stream = 1
  529. } else {
  530. s.Kcp.stream = 0
  531. }
  532. }
  533. // SetACKNoDelay changes the ACK flushing option.
  534. // If set to true, ACKs are flusghed immediately,
  535. func (
  536. s *UDPSession,
  537. ) SetACKNoDelay(
  538. nodelay bool,
  539. ) {
  540. s.mu.Lock()
  541. defer s.mu.Unlock()
  542. s.ackNoDelay = nodelay
  543. }
  544. // SetDUP duplicates UDP packets for Kcp output.
  545. // Useful for testing, not for normal use.
  546. func (
  547. s *UDPSession,
  548. ) SetDUP(
  549. dup int,
  550. ) {
  551. s.mu.Lock()
  552. defer s.mu.Unlock()
  553. s.dup = dup
  554. }
  555. // SetNoDelay sets TCP_DELAY, for Kcp.
  556. func (
  557. s *UDPSession,
  558. ) SetNoDelay(
  559. nodelay,
  560. interval,
  561. resend,
  562. nc int,
  563. ) {
  564. s.mu.Lock()
  565. defer s.mu.Unlock()
  566. s.Kcp.NoDelay(
  567. nodelay,
  568. interval,
  569. resend,
  570. nc,
  571. )
  572. }
  573. // SetDSCP sets the 6-bit DSCP field of IP header.
  574. // Has no effect, unless accepted by your Listener.
  575. func (
  576. s *UDPSession,
  577. ) SetDSCP(
  578. dscp int,
  579. ) error {
  580. s.mu.Lock()
  581. defer s.mu.Unlock()
  582. if s.l == nil {
  583. if nc, ok := s.conn.(net.Conn); ok {
  584. addr, _ := net.ResolveUDPAddr(
  585. "udp",
  586. nc.LocalAddr().String(),
  587. )
  588. if addr.IP.To4() != nil {
  589. return ipv4.NewConn(nc).SetTOS(
  590. dscp << 2,
  591. )
  592. } else {
  593. return ipv6.NewConn(nc).SetTrafficClass(
  594. dscp,
  595. )
  596. }
  597. }
  598. }
  599. return errors.New(
  600. errInvalidOperation,
  601. )
  602. }
  603. // SetReadBuffer sets the socket read buffer.
  604. // Has no effect, unless it's accepted by your Listener.
  605. func (
  606. s *UDPSession,
  607. ) SetReadBuffer(
  608. bytes int,
  609. ) error {
  610. s.mu.Lock()
  611. defer s.mu.Unlock()
  612. if s.l == nil {
  613. if nc, ok := s.conn.(setReadBuffer); ok {
  614. return nc.SetReadBuffer(
  615. bytes,
  616. )
  617. }
  618. }
  619. return errors.New(
  620. errInvalidOperation,
  621. )
  622. }
  623. // SetWriteBuffer sets the socket write buffer.
  624. // Has no effect, unless it's accepted by your Listener.
  625. func (
  626. s *UDPSession,
  627. ) SetWriteBuffer(
  628. bytes int,
  629. ) error {
  630. s.mu.Lock()
  631. defer s.mu.Unlock()
  632. if s.l == nil {
  633. if nc, ok := s.conn.(setWriteBuffer); ok {
  634. return nc.SetWriteBuffer(
  635. bytes,
  636. )
  637. }
  638. }
  639. return errors.New(
  640. errInvalidOperation,
  641. )
  642. }
  643. func (
  644. s *UDPSession,
  645. ) output(
  646. buf []byte,
  647. ) {
  648. var ecc [][]byte
  649. if s.FecEncoder != nil {
  650. ecc = s.FecEncoder.Encode(
  651. buf,
  652. )
  653. }
  654. if s.block != nil {
  655. s.nonce.Fill(
  656. buf[:nonceSize],
  657. )
  658. checksum := crc32.ChecksumIEEE(
  659. buf[cryptHeaderSize:],
  660. )
  661. binary.LittleEndian.PutUint32(
  662. buf[nonceSize:],
  663. checksum,
  664. )
  665. s.block.Encrypt(
  666. buf,
  667. buf,
  668. )
  669. for k := range ecc {
  670. s.nonce.Fill(ecc[k][:nonceSize])
  671. checksum := crc32.ChecksumIEEE(
  672. ecc[k][cryptHeaderSize:],
  673. )
  674. binary.LittleEndian.PutUint32(
  675. ecc[k][nonceSize:],
  676. checksum,
  677. )
  678. s.block.Encrypt(ecc[k],
  679. ecc[k],
  680. )
  681. }
  682. }
  683. nbytes := 0
  684. npkts := 0
  685. for i := 0; i < s.dup+1; i++ {
  686. if n, err := s.conn.WriteTo(
  687. buf,
  688. s.remote,
  689. ); err == nil {
  690. nbytes += n
  691. npkts++
  692. } else {
  693. s.notifyWriteError(
  694. err,
  695. )
  696. }
  697. }
  698. for k := range ecc {
  699. if n, err := s.conn.WriteTo(
  700. ecc[k],
  701. s.remote,
  702. ); err == nil {
  703. nbytes += n
  704. npkts++
  705. } else {
  706. s.notifyWriteError(
  707. err,
  708. )
  709. }
  710. }
  711. atomic.AddUint64(
  712. &DefaultSnsi.KcpOutputPackets,
  713. uint64(npkts),
  714. )
  715. atomic.AddUint64(
  716. &DefaultSnsi.KcpOutputBytes,
  717. uint64(nbytes),
  718. )
  719. }
  720. func (
  721. s *UDPSession,
  722. ) update() (
  723. interval time.Duration,
  724. ) {
  725. s.mu.Lock()
  726. waitsnd := s.Kcp.WaitSnd()
  727. interval = time.Duration(
  728. s.Kcp.Flush(false)) * time.Millisecond
  729. if s.Kcp.WaitSnd() < waitsnd {
  730. s.notifyWriteEvent()
  731. }
  732. s.mu.Unlock()
  733. return
  734. }
  735. func (
  736. s *UDPSession,
  737. ) GetConv() uint32 {
  738. return s.Kcp.conv
  739. }
  740. func (
  741. s *UDPSession,
  742. ) notifyReadEvent() {
  743. select {
  744. case s.chReadEvent <- struct{}{}:
  745. default:
  746. }
  747. }
  748. func (
  749. s *UDPSession,
  750. ) notifyWriteEvent() {
  751. select {
  752. case s.chWriteEvent <- struct{}{}:
  753. default:
  754. }
  755. }
  756. func (
  757. s *UDPSession,
  758. ) notifyWriteError(err error) {
  759. select {
  760. case s.chWriteError <- err:
  761. default:
  762. }
  763. }
  764. func (
  765. s *UDPSession,
  766. ) packetInput(
  767. data []byte,
  768. ) {
  769. dataValid := false
  770. if s.block != nil {
  771. s.block.Decrypt(
  772. data,
  773. data,
  774. )
  775. data = data[nonceSize:]
  776. checksum := crc32.ChecksumIEEE(
  777. data[crcSize:],
  778. )
  779. if checksum == binary.LittleEndian.Uint32(
  780. data,
  781. ) {
  782. data = data[crcSize:]
  783. dataValid = true
  784. } else {
  785. atomic.AddUint64(
  786. &DefaultSnsi.KcpChecksumFailures,
  787. 1,
  788. )
  789. }
  790. } else if s.block == nil {
  791. dataValid = true
  792. }
  793. if dataValid {
  794. s.KcpInput(
  795. data,
  796. )
  797. }
  798. }
  799. func (
  800. s *UDPSession,
  801. ) KcpInput(
  802. data []byte,
  803. ) {
  804. var KcpInErrors,
  805. fecErrs,
  806. fecRecovered,
  807. fecParityShards uint64
  808. if s.FecDecoder != nil {
  809. if len(
  810. data,
  811. ) > fecHeaderSize {
  812. f := FecPacket(
  813. data,
  814. )
  815. if f.flag() == KTypeData || f.flag() == KTypeParity {
  816. if f.flag() == KTypeParity {
  817. fecParityShards++
  818. }
  819. recovers := s.FecDecoder.Decode(
  820. f,
  821. )
  822. s.mu.Lock()
  823. waitsnd := s.Kcp.WaitSnd()
  824. if f.flag() == KTypeData {
  825. if ret := s.Kcp.Input(
  826. data[fecHeaderSizePlus2:],
  827. true,
  828. s.ackNoDelay,
  829. ); ret != 0 {
  830. KcpInErrors++
  831. }
  832. }
  833. for _, r := range recovers {
  834. if len(
  835. r,
  836. ) >= 2 {
  837. sz := binary.LittleEndian.Uint16(
  838. r,
  839. )
  840. if int(
  841. sz,
  842. ) <= len(
  843. r,
  844. ) && sz >= 2 {
  845. if ret := s.Kcp.Input(
  846. r[2:sz],
  847. false,
  848. s.ackNoDelay,
  849. ); ret == 0 {
  850. fecRecovered++
  851. } else {
  852. KcpInErrors++
  853. }
  854. } else {
  855. fecErrs++
  856. }
  857. } else {
  858. fecErrs++
  859. }
  860. KxmitBuf.Put(
  861. r,
  862. )
  863. }
  864. if n := s.Kcp.PeekSize(); n > 0 {
  865. s.notifyReadEvent()
  866. }
  867. if s.Kcp.WaitSnd() < waitsnd {
  868. s.notifyWriteEvent()
  869. }
  870. s.mu.Unlock()
  871. } else {
  872. atomic.AddUint64(
  873. &DefaultSnsi.KcpPreInputErrors,
  874. 1,
  875. )
  876. }
  877. } else {
  878. atomic.AddUint64(
  879. &DefaultSnsi.KcpInputErrors,
  880. 1,
  881. )
  882. }
  883. } else {
  884. s.mu.Lock()
  885. waitsnd := s.Kcp.WaitSnd()
  886. if ret := s.Kcp.Input(
  887. data,
  888. true,
  889. s.ackNoDelay,
  890. ); ret != 0 {
  891. KcpInErrors++
  892. }
  893. if n := s.Kcp.PeekSize(); n > 0 {
  894. s.notifyReadEvent()
  895. }
  896. if s.Kcp.WaitSnd() < waitsnd {
  897. s.notifyWriteEvent()
  898. }
  899. s.mu.Unlock()
  900. }
  901. atomic.AddUint64(
  902. &DefaultSnsi.KcpInputPackets,
  903. 1,
  904. )
  905. atomic.AddUint64(
  906. &DefaultSnsi.KcpInputBytes,
  907. uint64(len(
  908. data,
  909. )))
  910. if fecParityShards > 0 {
  911. atomic.AddUint64(
  912. &DefaultSnsi.KcpFECParityShards,
  913. fecParityShards,
  914. )
  915. }
  916. if KcpInErrors > 0 {
  917. atomic.AddUint64(
  918. &DefaultSnsi.KcpInputErrors,
  919. KcpInErrors,
  920. )
  921. }
  922. if fecErrs > 0 {
  923. atomic.AddUint64(
  924. &DefaultSnsi.KcpFailures,
  925. fecErrs,
  926. )
  927. }
  928. if fecRecovered > 0 {
  929. atomic.AddUint64(
  930. &DefaultSnsi.KcpFECRecovered,
  931. fecRecovered,
  932. )
  933. }
  934. }
  935. type (
  936. Listener struct {
  937. block BlockCrypt // block encryption
  938. dataShards int // FEC data shard
  939. parityShards int // FEC parity shard
  940. FecDecoder *FecDecoder // FEC mock initialization
  941. conn net.PacketConn // the underlying packet connection
  942. sessions map[string]*UDPSession // all sessions accepted by this Listener
  943. sessionLock sync.Mutex
  944. chAccepts chan *UDPSession // Listen() backlog
  945. chSessionClosed chan net.Addr // session close queue
  946. headerSize int // additional header for a Kcp frame
  947. die chan struct{} // notify when the Listener has closed
  948. rd atomic.Value // read deadline for Accept()
  949. wd atomic.Value
  950. }
  951. )
  952. func (
  953. l *Listener,
  954. ) packetInput(
  955. data []byte,
  956. addr net.Addr,
  957. ) {
  958. dataValid := false
  959. if l.block != nil {
  960. l.block.Decrypt(
  961. data,
  962. data,
  963. )
  964. data = data[nonceSize:]
  965. checksum := crc32.ChecksumIEEE(
  966. data[crcSize:],
  967. )
  968. if checksum == binary.LittleEndian.Uint32(
  969. data,
  970. ) {
  971. data = data[crcSize:]
  972. dataValid = true
  973. } else {
  974. atomic.AddUint64(
  975. &DefaultSnsi.KcpChecksumFailures,
  976. 1,
  977. )
  978. }
  979. } else if l.block == nil {
  980. dataValid = true
  981. }
  982. if dataValid {
  983. l.sessionLock.Lock()
  984. s, ok := l.sessions[addr.String()]
  985. l.sessionLock.Unlock()
  986. if !ok {
  987. if len(
  988. l.chAccepts,
  989. ) < cap(
  990. l.chAccepts,
  991. ) {
  992. var conv uint32
  993. convValid := false
  994. if l.FecDecoder != nil {
  995. isfec := binary.LittleEndian.Uint16(
  996. data[4:],
  997. )
  998. if isfec == KTypeData {
  999. conv = binary.LittleEndian.Uint32(
  1000. data[fecHeaderSizePlus2:],
  1001. )
  1002. convValid = true
  1003. }
  1004. } else {
  1005. conv = binary.LittleEndian.Uint32(
  1006. data,
  1007. )
  1008. convValid = true
  1009. }
  1010. if convValid {
  1011. s := newUDPSession(
  1012. conv,
  1013. l.dataShards,
  1014. l.parityShards,
  1015. l,
  1016. l.conn,
  1017. addr,
  1018. l.block,
  1019. )
  1020. s.KcpInput(
  1021. data,
  1022. )
  1023. l.sessionLock.Lock()
  1024. l.sessions[addr.String()] = s
  1025. l.sessionLock.Unlock()
  1026. l.chAccepts <- s
  1027. }
  1028. }
  1029. } else {
  1030. s.KcpInput(
  1031. data,
  1032. )
  1033. }
  1034. }
  1035. }
  1036. // SetReadBuffer sets the socket read buffer for the Listener.
  1037. func (
  1038. l *Listener,
  1039. ) SetReadBuffer(
  1040. bytes int,
  1041. ) error {
  1042. if nc, ok := l.conn.(setReadBuffer); ok {
  1043. return nc.SetReadBuffer(
  1044. bytes,
  1045. )
  1046. }
  1047. return errors.New(
  1048. errInvalidOperation,
  1049. )
  1050. }
  1051. // SetWriteBuffer sets the socket write buffer for the Listener.
  1052. func (
  1053. l *Listener,
  1054. ) SetWriteBuffer(
  1055. bytes int,
  1056. ) error {
  1057. if nc, ok := l.conn.(setWriteBuffer); ok {
  1058. return nc.SetWriteBuffer(
  1059. bytes,
  1060. )
  1061. }
  1062. return errors.New(
  1063. errInvalidOperation,
  1064. )
  1065. }
  1066. // SetDSCP sets the 6-bit DSCP field of IP header.
  1067. func (
  1068. l *Listener,
  1069. ) SetDSCP(
  1070. dscp int,
  1071. ) error {
  1072. if nc, ok := l.conn.(net.Conn); ok {
  1073. addr, _ := net.ResolveUDPAddr(
  1074. "udp",
  1075. nc.LocalAddr().String(),
  1076. )
  1077. if addr.IP.To4() != nil {
  1078. return ipv4.NewConn(
  1079. nc,
  1080. ).SetTOS(
  1081. dscp << 2,
  1082. )
  1083. } else {
  1084. return ipv6.NewConn(
  1085. nc,
  1086. ).SetTrafficClass(
  1087. dscp,
  1088. )
  1089. }
  1090. }
  1091. return errors.New(
  1092. errInvalidOperation,
  1093. )
  1094. }
  1095. // Accept implements the Accept method in the Listener interface.
  1096. // It waits until the next call, then returns a generic 'Conn'.
  1097. func (
  1098. l *Listener,
  1099. ) Accept() (
  1100. net.Conn,
  1101. error,
  1102. ) {
  1103. return l.AcceptKCP()
  1104. }
  1105. // AcceptKCP accepts a Kcp connection
  1106. func (
  1107. l *Listener,
  1108. ) AcceptKCP() (
  1109. *UDPSession,
  1110. error,
  1111. ) {
  1112. var timeout <-chan time.Time
  1113. if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
  1114. timeout = time.After(
  1115. tdeadline.Sub(
  1116. time.Now(),
  1117. ))
  1118. }
  1119. select {
  1120. case <-timeout:
  1121. return nil, &errTimeout{}
  1122. case c := <-l.chAccepts:
  1123. return c, nil
  1124. case <-l.die:
  1125. return nil, errors.New(
  1126. errBrokenPipe,
  1127. )
  1128. }
  1129. }
  1130. // SetDeadline sets the deadline associated with the Listener.
  1131. // A zero value will disable all deadlines.
  1132. func (
  1133. l *Listener,
  1134. ) SetDeadline(
  1135. t time.Time,
  1136. ) error {
  1137. l.SetReadDeadline(
  1138. t,
  1139. )
  1140. l.SetWriteDeadline(
  1141. t,
  1142. )
  1143. return nil
  1144. }
  1145. // SetReadDeadline implements the Conn SetReadDeadline method.
  1146. func (
  1147. l *Listener,
  1148. ) SetReadDeadline(
  1149. t time.Time,
  1150. ) error {
  1151. l.rd.Store(
  1152. t,
  1153. )
  1154. return nil
  1155. }
  1156. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  1157. func (
  1158. l *Listener,
  1159. ) SetWriteDeadline(
  1160. t time.Time,
  1161. ) error {
  1162. l.wd.Store(
  1163. t,
  1164. )
  1165. return nil
  1166. }
  1167. // Close stops listening on the UDP address.
  1168. // Any already accepted connections will not be closed.
  1169. func (
  1170. l *Listener,
  1171. ) Close() error {
  1172. close(
  1173. l.die,
  1174. )
  1175. return l.conn.Close()
  1176. }
  1177. // CloseSession notifies the Listener when a Session is Closed.
  1178. func (
  1179. l *Listener,
  1180. ) CloseSession(
  1181. remote net.Addr,
  1182. ) (
  1183. ret bool,
  1184. ) {
  1185. l.sessionLock.Lock()
  1186. defer l.sessionLock.Unlock()
  1187. if _, ok := l.sessions[remote.String()]; ok {
  1188. delete(
  1189. l.sessions,
  1190. remote.String(),
  1191. )
  1192. return true
  1193. }
  1194. return false
  1195. }
  1196. // Addr returns the listener's network address.
  1197. // The address returned is shared by all invocations of Addr - do not modify it.
  1198. func (
  1199. l *Listener,
  1200. ) Addr() net.Addr {
  1201. return l.conn.LocalAddr()
  1202. }
  1203. // Listen listens for incoming Kcp packets addressed to our local address (laddr) via "udp"
  1204. func Listen(laddr string) (
  1205. net.Listener, error) {
  1206. return ListenWithOptions(
  1207. laddr,
  1208. nil,
  1209. 0,
  1210. 0,
  1211. )
  1212. }
  1213. // ListenWithOptions listens for incoming Kcp packets addressed to our local address (laddr) via "udp"
  1214. // Porvides for encryption, sharding, parity, and RS coding parameters to be specified.
  1215. func ListenWithOptions(
  1216. laddr string,
  1217. block BlockCrypt,
  1218. dataShards,
  1219. parityShards int,
  1220. ) (
  1221. *Listener,
  1222. error,
  1223. ) {
  1224. udpaddr, err := net.ResolveUDPAddr(
  1225. "udp",
  1226. laddr,
  1227. )
  1228. if err != nil {
  1229. return nil, errors.Wrap(
  1230. err,
  1231. "net.ResolveUDPAddr",
  1232. )
  1233. }
  1234. conn, err := net.ListenUDP(
  1235. "udp",
  1236. udpaddr,
  1237. )
  1238. if err != nil {
  1239. return nil, errors.Wrap(
  1240. err,
  1241. "net.ListenUDP",
  1242. )
  1243. }
  1244. return ServeConn(
  1245. block,
  1246. dataShards,
  1247. parityShards,
  1248. conn,
  1249. )
  1250. }
  1251. // ServeConn serves the Kcp protocol - a single packet is processed.
  1252. func ServeConn(
  1253. block BlockCrypt,
  1254. dataShards,
  1255. parityShards int,
  1256. conn net.PacketConn,
  1257. ) (
  1258. *Listener,
  1259. error,
  1260. ) {
  1261. l := new(
  1262. Listener,
  1263. )
  1264. l.conn = conn
  1265. l.sessions = make(
  1266. map[string]*UDPSession,
  1267. )
  1268. l.chAccepts = make(
  1269. chan *UDPSession,
  1270. acceptBacklog,
  1271. )
  1272. l.chSessionClosed = make(
  1273. chan net.Addr,
  1274. )
  1275. l.die = make(
  1276. chan struct{},
  1277. )
  1278. l.dataShards = dataShards
  1279. l.parityShards = parityShards
  1280. l.block = block
  1281. l.FecDecoder = KcpNewDECDecoder(
  1282. rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  1283. if l.block != nil {
  1284. l.headerSize += cryptHeaderSize
  1285. }
  1286. if l.FecDecoder != nil {
  1287. l.headerSize += fecHeaderSizePlus2
  1288. }
  1289. go l.monitor()
  1290. return l, nil
  1291. }
  1292. // Dial connects to the remote address "raddr" via "udp"
  1293. func Dial(
  1294. raddr string,
  1295. ) (
  1296. net.Conn,
  1297. error,
  1298. ) {
  1299. return DialWithOptions(
  1300. raddr,
  1301. nil,
  1302. 0,
  1303. 0,
  1304. )
  1305. }
  1306. // DialWithOptions connects to the remote address "raddr" via "udp" with encryption options.
  1307. func DialWithOptions(
  1308. raddr string,
  1309. block BlockCrypt,
  1310. dataShards,
  1311. parityShards int,
  1312. ) (
  1313. *UDPSession,
  1314. error,
  1315. ) {
  1316. udpaddr, err := net.ResolveUDPAddr(
  1317. "udp",
  1318. raddr,
  1319. )
  1320. if err != nil {
  1321. return nil, errors.Wrap(
  1322. err,
  1323. "net.ResolveUDPAddr",
  1324. )
  1325. }
  1326. network := "udp4"
  1327. if udpaddr.IP.To4() == nil {
  1328. network = "udp"
  1329. }
  1330. conn, err := net.ListenUDP(
  1331. network,
  1332. nil,
  1333. )
  1334. if err != nil {
  1335. return nil, errors.Wrap(
  1336. err,
  1337. "net.DialUDP",
  1338. )
  1339. }
  1340. return NewConn(
  1341. raddr,
  1342. block,
  1343. dataShards,
  1344. parityShards,
  1345. conn,
  1346. )
  1347. }
  1348. // NewConn establishes a session, talking Kcp over a packet connection.
  1349. func NewConn(
  1350. raddr string,
  1351. block BlockCrypt,
  1352. dataShards,
  1353. parityShards int,
  1354. conn net.PacketConn,
  1355. ) (
  1356. *UDPSession,
  1357. error,
  1358. ) {
  1359. udpaddr, err := net.ResolveUDPAddr(
  1360. "udp",
  1361. raddr,
  1362. )
  1363. if err != nil {
  1364. return nil, errors.Wrap(
  1365. err,
  1366. "net.ResolveUDPAddr",
  1367. )
  1368. }
  1369. var convid uint32
  1370. binary.Read(
  1371. rand.Reader,
  1372. binary.LittleEndian,
  1373. &convid,
  1374. )
  1375. return newUDPSession(
  1376. convid,
  1377. dataShards,
  1378. parityShards,
  1379. nil,
  1380. conn,
  1381. udpaddr,
  1382. block,
  1383. ), nil
  1384. }
  1385. var refTime time.Time = time.Now()
  1386. func KcpCurrentMs() uint32 {
  1387. return uint32(time.Now().Sub(refTime) / time.Millisecond)
  1388. }