package gfsmux

import (
	"encoding/binary"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// Stream implements net.Conn.
type Stream struct {
	id   uint32
	sess *Session

	// slice heads are kept so the buffers can be recycled
	buffers [][]byte
	heads   [][]byte

	bufferLock sync.Mutex
	frameSize  int

	// notify a read event
	chReadEvent chan struct{}

	// flag the stream has closed
	die     chan struct{}
	dieOnce sync.Once

	// FIN command
	chFinEvent   chan struct{}
	finEventOnce sync.Once

	// deadlines
	readDeadline  atomic.Value
	writeDeadline atomic.Value

	// per-stream sliding window control
	numRead    uint32 // number of bytes consumed
	numWritten uint32 // number of bytes written
	incr       uint32 // bytes read since the last window update was sent

	// UPD (update) command
	peerConsumed uint32        // number of bytes the remote peer has consumed
	peerWindow   uint32        // peer window, initialized to 256KB, updated by peer
	chUpdate     chan struct{} // notify remote data consumed and window update
}

// newStream initializes a Stream struct.
func newStream(id uint32, frameSize int, sess *Session) *Stream {
	s := new(Stream)
	s.id = id
	s.chReadEvent = make(chan struct{}, 1)
	s.chUpdate = make(chan struct{}, 1)
	s.frameSize = frameSize
	s.sess = sess
	s.die = make(chan struct{})
	s.chFinEvent = make(chan struct{})
	s.peerWindow = initialPeerWindow // set to initial window size
	return s
}

// ID returns the unique stream ID.
func (s *Stream) ID() uint32 {
	return s.id
}

// Read implements net.Conn.
func (s *Stream) Read(b []byte) (n int, err error) {
	for {
		n, err = s.tryRead(b)
		if err == ErrWouldBlock {
			if ew := s.waitRead(); ew != nil {
				return 0, ew
			}
		} else {
			return n, err
		}
	}
}

// tryRead is the nonblocking version of Read.
func (s *Stream) tryRead(b []byte) (n int, err error) {
	if s.sess.Config.Version == 2 {
		return s.tryReadv2(b)
	}
	if len(b) == 0 {
		return 0, nil
	}

	s.bufferLock.Lock()
	if len(s.buffers) > 0 {
		n = copy(b, s.buffers[0])
		s.buffers[0] = s.buffers[0][n:]
		if len(s.buffers[0]) == 0 {
			s.buffers[0] = nil
			s.buffers = s.buffers[1:]
			defaultAllocator.Put(s.heads[0])
			s.heads = s.heads[1:]
		}
	}
	s.bufferLock.Unlock()

	if n > 0 {
		s.sess.returnTokens(n)
		return n, nil
	}

	select {
	case <-s.die:
		return 0, io.EOF
	default:
		return 0, ErrWouldBlock
	}
}

// tryReadv2 is the nonblocking version of Read for protocol version 2,
// which additionally schedules window updates back to the peer.
func (s *Stream) tryReadv2(b []byte) (n int, err error) {
	if len(b) == 0 {
		return 0, nil
	}

	var notifyConsumed uint32
	s.bufferLock.Lock()
	if len(s.buffers) > 0 {
		n = copy(b, s.buffers[0])
		s.buffers[0] = s.buffers[0][n:]
		if len(s.buffers[0]) == 0 {
			s.buffers[0] = nil
			s.buffers = s.buffers[1:]
			// full recycle
			defaultAllocator.Put(s.heads[0])
			s.heads = s.heads[1:]
		}
	}

	// Ideally, once more than half of the buffer has been consumed, send a
	// read ACK to the peer. Paced by the round-trip time of the ACK,
	// continuously flowing data won't stall waiting for ACKs as long as the
	// consumer keeps reading. Also notify the window on the very first read
	// (s.numRead == n).
	s.numRead += uint32(n)
	s.incr += uint32(n)
	if s.incr >= uint32(s.sess.Config.MaxStreamBuffer/2) || s.numRead == uint32(n) {
		notifyConsumed = s.numRead
		s.incr = 0
	}
	s.bufferLock.Unlock()

	if n > 0 {
		s.sess.returnTokens(n)
		if notifyConsumed > 0 {
			err := s.sendWindowUpdate(notifyConsumed)
			return n, err
		}
		return n, nil
	}

	select {
	case <-s.die:
		return 0, io.EOF
	default:
		return 0, ErrWouldBlock
	}
}
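
// windowUpdateDue is an illustrative sketch, not referenced by the methods
// above: it restates the condition under which tryReadv2 and writeTov2
// schedule a window update, i.e. once at least half of MaxStreamBuffer has
// been consumed since the last update, or on the stream's very first read.
func windowUpdateDue(incr, numRead, justRead, maxStreamBuffer uint32) bool {
	return incr >= maxStreamBuffer/2 || numRead == justRead
}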

// WriteTo implements io.WriterTo.
func (s *Stream) WriteTo(w io.Writer) (n int64, err error) {
	if s.sess.Config.Version == 2 {
		return s.writeTov2(w)
	}

	for {
		var buf []byte
		s.bufferLock.Lock()
		if len(s.buffers) > 0 {
			buf = s.buffers[0]
			s.buffers = s.buffers[1:]
			s.heads = s.heads[1:]
		}
		s.bufferLock.Unlock()

		if buf != nil {
			nw, ew := w.Write(buf)
			s.sess.returnTokens(len(buf))
			defaultAllocator.Put(buf)
			if nw > 0 {
				n += int64(nw)
			}
			if ew != nil {
				return n, ew
			}
		} else if ew := s.waitRead(); ew != nil {
			return n, ew
		}
	}
}

// writeTov2 is the version-2 variant of WriteTo, which additionally
// schedules window updates back to the peer.
func (s *Stream) writeTov2(w io.Writer) (n int64, err error) {
	for {
		var notifyConsumed uint32
		var buf []byte
		s.bufferLock.Lock()
		if len(s.buffers) > 0 {
			buf = s.buffers[0]
			s.buffers = s.buffers[1:]
			s.heads = s.heads[1:]
		}
		s.numRead += uint32(len(buf))
		s.incr += uint32(len(buf))
		if s.incr >= uint32(s.sess.Config.MaxStreamBuffer/2) || s.numRead == uint32(len(buf)) {
			notifyConsumed = s.numRead
			s.incr = 0
		}
		s.bufferLock.Unlock()

		if buf != nil {
			nw, ew := w.Write(buf)
			s.sess.returnTokens(len(buf))
			defaultAllocator.Put(buf)
			if nw > 0 {
				n += int64(nw)
			}
			if ew != nil {
				return n, ew
			}
			if notifyConsumed > 0 {
				if err := s.sendWindowUpdate(notifyConsumed); err != nil {
					return n, err
				}
			}
		} else if ew := s.waitRead(); ew != nil {
			return n, ew
		}
	}
}

// sendWindowUpdate sends a UPD frame carrying the number of bytes consumed
// and the local window size, respecting the read deadline.
func (s *Stream) sendWindowUpdate(consumed uint32) error {
	var timer *time.Timer
	var deadline <-chan time.Time
	if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() {
		timer = time.NewTimer(time.Until(d))
		defer timer.Stop()
		deadline = timer.C
	}

	frame := NewFrame(byte(s.sess.Config.Version), CmdUpd, s.id)
	var hdr updHeader
	binary.LittleEndian.PutUint32(hdr[:], consumed)
	binary.LittleEndian.PutUint32(hdr[4:], uint32(s.sess.Config.MaxStreamBuffer))
	frame.Data = hdr[:]
	_, err := s.sess.WriteFrameInternal(frame, deadline, 0)
	return err
}

// waitRead blocks until data is available, the stream or session is closed,
// or the read deadline is reached.
func (s *Stream) waitRead() error {
	var timer *time.Timer
	var deadline <-chan time.Time
	if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() {
		timer = time.NewTimer(time.Until(d))
		defer timer.Stop()
		deadline = timer.C
	}

	select {
	case <-s.chReadEvent:
		return nil
	case <-s.chFinEvent:
		// BUG(xtaci): fix for https://github.com/xtaci/smux/issues/82
		s.bufferLock.Lock()
		defer s.bufferLock.Unlock()
		if len(s.buffers) > 0 {
			return nil
		}
		return io.EOF
	case <-s.sess.chSocketReadError:
		return s.sess.socketReadError.Load().(error)
	case <-s.sess.chProtoError:
		return s.sess.protoError.Load().(error)
	case <-deadline:
		return ErrTimeout
	case <-s.die:
		return io.ErrClosedPipe
	}
}

// Write implements net.Conn.
//
// The behavior of concurrent writes from multiple goroutines is not
// deterministic, so their frames may interleave in arbitrary order.
func (s *Stream) Write(b []byte) (n int, err error) {
	if s.sess.Config.Version == 2 {
		return s.writeV2(b)
	}

	var deadline <-chan time.Time
	if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() {
		timer := time.NewTimer(time.Until(d))
		defer timer.Stop()
		deadline = timer.C
	}

	// check if the stream has closed
	select {
	case <-s.die:
		return 0, io.ErrClosedPipe
	default:
	}

	// split into frames and transmit
	sent := 0
	frame := NewFrame(byte(s.sess.Config.Version), CmdPsh, s.id)
	bts := b
	for len(bts) > 0 {
		sz := len(bts)
		if sz > s.frameSize {
			sz = s.frameSize
		}
		frame.Data = bts[:sz]
		bts = bts[sz:]
		n, err := s.sess.WriteFrameInternal(frame, deadline, uint64(s.numWritten))
		s.numWritten++
		sent += n
		if err != nil {
			return sent, err
		}
	}
	return sent, nil
}
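
// frameCount is an illustrative sketch of how Write splits a payload: a
// buffer of payload bytes goes out as ceil(payload/frameSize) PSH frames,
// each carrying at most frameSize bytes. It is not used by the code above.
func frameCount(payload, frameSize int) int {
	if frameSize <= 0 || payload <= 0 {
		return 0
	}
	return (payload + frameSize - 1) / frameSize
}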

// writeV2 is the version-2 variant of Write, which enforces the per-stream
// sliding window before transmitting.
func (s *Stream) writeV2(b []byte) (n int, err error) {
	// check for empty input
	if len(b) == 0 {
		return 0, nil
	}

	// check if the stream has closed
	select {
	case <-s.die:
		return 0, io.ErrClosedPipe
	default:
	}

	// create the write-deadline timer
	var deadline <-chan time.Time
	if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() {
		timer := time.NewTimer(time.Until(d))
		defer timer.Stop()
		deadline = timer.C
	}

	// split into frames and transmit
	sent := 0
	frame := NewFrame(byte(s.sess.Config.Version), CmdPsh, s.id)

	for {
		// per-stream sliding window control:
		// [.... [consumed... numWritten] ... win... ]
		// [.... [consumed...................+rmtwnd]]
		var bts []byte
		// note:
		// even if uint32 overflows, this arithmetic still works:
		// eg1: uint32(0) - uint32(math.MaxUint32) = 1
		// eg2: int32(uint32(0) - uint32(1)) = -1
		// sanity check against peer misbehavior
		inflight := int32(atomic.LoadUint32(&s.numWritten) - atomic.LoadUint32(&s.peerConsumed))
		if inflight < 0 {
			return 0, ErrConsumed
		}

		win := int32(atomic.LoadUint32(&s.peerWindow)) - inflight
		if win > 0 {
			if win > int32(len(b)) {
				bts = b
				b = nil
			} else {
				bts = b[:win]
				b = b[win:]
			}

			for len(bts) > 0 {
				sz := len(bts)
				if sz > s.frameSize {
					sz = s.frameSize
				}
				frame.Data = bts[:sz]
				bts = bts[sz:]
				n, err := s.sess.WriteFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten)))
				atomic.AddUint32(&s.numWritten, uint32(sz))
				sent += n
				if err != nil {
					return sent, err
				}
			}
		}

		// If there is data remaining to be sent, wait until the stream
		// closes, the window changes, or the deadline is reached. This
		// blocking behavior lets the upper layer apply flow control.
		if len(b) > 0 {
			select {
			case <-s.chFinEvent: // once FIN arrives, no further window update is possible
				return 0, io.EOF
			case <-s.die:
				return sent, io.ErrClosedPipe
			case <-deadline:
				return sent, ErrTimeout
			case <-s.sess.chSocketWriteError:
				return sent, s.sess.socketWriteError.Load().(error)
			case <-s.chUpdate:
				continue
			}
		} else {
			return sent, nil
		}
	}
}
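
// sendWindow is an illustrative sketch of the flow-control arithmetic in
// writeV2, not used elsewhere: bytes in flight are numWritten-peerConsumed
// (the subtraction stays valid across uint32 wraparound), and the sender
// may emit at most peerWindow-inflight further bytes; a negative inflight
// value indicates a misbehaving peer.
func sendWindow(numWritten, peerConsumed, peerWindow uint32) (win int32, ok bool) {
	inflight := int32(numWritten - peerConsumed)
	if inflight < 0 {
		return 0, false
	}
	return int32(peerWindow) - inflight, true
}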

// Close implements net.Conn.
func (s *Stream) Close() error {
	var once bool
	var err error
	s.dieOnce.Do(func() {
		close(s.die)
		once = true
	})

	if once {
		// notify the peer with FIN and release the stream from the session
		_, err = s.sess.WriteFrame(NewFrame(byte(s.sess.Config.Version), CmdFin, s.id))
		s.sess.streamClosed(s.id)
		return err
	}
	return io.ErrClosedPipe
}

// GetDieCh returns a read-only channel that becomes readable
// when the stream is about to close.
func (s *Stream) GetDieCh() <-chan struct{} {
	return s.die
}

// SetReadDeadline sets the read deadline as defined by
// net.Conn.SetReadDeadline.
// A zero time value disables the deadline.
func (s *Stream) SetReadDeadline(t time.Time) error {
	s.readDeadline.Store(t)
	s.notifyReadEvent()
	return nil
}

// SetWriteDeadline sets the write deadline as defined by
// net.Conn.SetWriteDeadline.
// A zero time value disables the deadline.
func (s *Stream) SetWriteDeadline(t time.Time) error {
	s.writeDeadline.Store(t)
	return nil
}

// SetDeadline sets both read and write deadlines as defined by
// net.Conn.SetDeadline.
// A zero time value disables the deadlines.
func (s *Stream) SetDeadline(t time.Time) error {
	if err := s.SetReadDeadline(t); err != nil {
		return err
	}
	if err := s.SetWriteDeadline(t); err != nil {
		return err
	}
	return nil
}

// sessionClose closes the stream when the owning session closes.
func (s *Stream) sessionClose() {
	s.dieOnce.Do(func() {
		close(s.die)
	})
}

// LocalAddr satisfies the net.Conn interface.
func (s *Stream) LocalAddr() net.Addr {
	if ts, ok := s.sess.Conn.(interface {
		LocalAddr() net.Addr
	}); ok {
		return ts.LocalAddr()
	}
	return nil
}

// RemoteAddr satisfies the net.Conn interface.
func (s *Stream) RemoteAddr() net.Addr {
	if ts, ok := s.sess.Conn.(interface {
		RemoteAddr() net.Addr
	}); ok {
		return ts.RemoteAddr()
	}
	return nil
}

// pushBytes appends buf to buffers.
func (s *Stream) pushBytes(buf []byte) (written int, err error) {
	s.bufferLock.Lock()
	s.buffers = append(s.buffers, buf)
	s.heads = append(s.heads, buf)
	s.bufferLock.Unlock()
	return
}

// recycleTokens transforms the remaining bytes into tokens (and truncates the buffers).
func (s *Stream) recycleTokens() (n int) {
	s.bufferLock.Lock()
	for k := range s.buffers {
		n += len(s.buffers[k])
		defaultAllocator.Put(s.heads[k])
	}
	s.buffers = nil
	s.heads = nil
	s.bufferLock.Unlock()
	return
}

// notifyReadEvent signals that data is ready to be read.
func (s *Stream) notifyReadEvent() {
	select {
	case s.chReadEvent <- struct{}{}:
	default:
	}
}

// update handles a UPD command from the peer, recording how much the peer
// has consumed and its current window.
func (s *Stream) update(consumed, window uint32) {
	atomic.StoreUint32(&s.peerConsumed, consumed)
	atomic.StoreUint32(&s.peerWindow, window)
	select {
	case s.chUpdate <- struct{}{}:
	default:
	}
}

// fin marks this stream as closed at the protocol level (FIN received).
func (s *Stream) fin() {
	s.finEventOnce.Do(func() {
		close(s.chFinEvent)
	})
}
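
// echoPing is an illustrative sketch of driving a Stream through its
// net.Conn surface (deadline, Write, Read, Close); it relies only on the
// methods defined above and is not called anywhere in the package.
func echoPing(s *Stream) error {
	defer s.Close()
	if err := s.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return err
	}
	if _, err := s.Write([]byte("ping")); err != nil {
		return err
	}
	buf := make([]byte, 4)
	_, err := io.ReadFull(s, buf) // Stream satisfies io.Reader, so ReadFull applies
	return err
}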