session_test.go 22 KB


  1. package gfsmux_test
  2. import (
  3. "bytes"
  4. crand "crypto/rand"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "log"
  9. "math/rand"
  10. "net"
  11. "net/http"
  12. _ "net/http/pprof"
  13. "strings"
  14. "sync"
  15. "testing"
  16. "time"
  17. smux "go.gridfinity.dev/gfsmux"
  18. u "go.gridfinity.dev/leaktestfe"
  19. )
  20. func init() {
  21. go func() {
  22. log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
  23. }()
  24. }
  25. // setupServer starts new server listening on a random localhost port and
  26. // returns address of the server, function to stop the server, new client
  27. // Connection to this server or an error.
  28. func setupServer(tb testing.TB) (addr string, stopfunc func(), client net.Conn, err error) {
  29. ln, err := net.Listen("tcp", "localhost:0")
  30. if err != nil {
  31. return "", nil, nil, err
  32. }
  33. go func() {
  34. Conn, err := ln.Accept()
  35. if err != nil {
  36. return
  37. }
  38. go handleConnection(Conn)
  39. }()
  40. addr = ln.Addr().String()
  41. Conn, err := net.Dial("tcp", addr)
  42. if err != nil {
  43. ln.Close()
  44. return "", nil, nil, err
  45. }
  46. return ln.Addr().String(), func() { ln.Close() }, Conn, nil
  47. }
  48. func handleConnection(Conn net.Conn) {
  49. session, _ := smux.Server(Conn, nil)
  50. for {
  51. if stream, err := session.AcceptStream(); err == nil {
  52. go func(s io.ReadWriteCloser) {
  53. buf := make([]byte, 65536)
  54. for {
  55. n, err := s.Read(buf)
  56. if err != nil {
  57. return
  58. }
  59. s.Write(buf[:n])
  60. }
  61. }(stream)
  62. } else {
  63. return
  64. }
  65. }
  66. }
  67. // setupServer starts new server listening on a random localhost port and
  68. // returns address of the server, function to stop the server, new client
  69. // Connection to this server or an error.
  70. func setupServerV2(tb testing.TB) (addr string, stopfunc func(), client net.Conn, err error) {
  71. ln, err := net.Listen("tcp", "localhost:0")
  72. if err != nil {
  73. return "", nil, nil, err
  74. }
  75. go func() {
  76. Conn, err := ln.Accept()
  77. if err != nil {
  78. return
  79. }
  80. go handleConnectionV2(Conn)
  81. }()
  82. addr = ln.Addr().String()
  83. Conn, err := net.Dial("tcp", addr)
  84. if err != nil {
  85. ln.Close()
  86. return "", nil, nil, err
  87. }
  88. return ln.Addr().String(), func() { ln.Close() }, Conn, nil
  89. }
  90. func handleConnectionV2(Conn net.Conn) {
  91. Config := smux.DefaultConfig()
  92. Config.Version = 2
  93. session, _ := smux.Server(Conn, Config)
  94. for {
  95. if stream, err := session.AcceptStream(); err == nil {
  96. go func(s io.ReadWriteCloser) {
  97. buf := make([]byte, 65536)
  98. for {
  99. n, err := s.Read(buf)
  100. if err != nil {
  101. return
  102. }
  103. s.Write(buf[:n])
  104. }
  105. }(stream)
  106. } else {
  107. return
  108. }
  109. }
  110. }
  111. func TestEcho(t *testing.T) {
  112. defer u.Leakplug(t)
  113. _, stop, cli, err := setupServer(t)
  114. if err != nil {
  115. t.Fatal(err)
  116. }
  117. defer stop()
  118. session, _ := smux.Client(cli, nil)
  119. stream, _ := session.OpenStream()
  120. const N = 100
  121. buf := make([]byte, 10)
  122. var sent string
  123. var received string
  124. for i := 0; i < N; i++ {
  125. msg := fmt.Sprintf("hello%v", i)
  126. stream.Write([]byte(msg))
  127. sent += msg
  128. if n, err := stream.Read(buf); err != nil {
  129. t.Fatal(err)
  130. } else {
  131. received += string(buf[:n])
  132. }
  133. }
  134. if sent != received {
  135. t.Fatal("data mimatch")
  136. }
  137. session.Close()
  138. }
  139. func TestWriteTo(t *testing.T) {
  140. defer u.Leakplug(t)
  141. const N = 1 << 20
  142. // server
  143. ln, err := net.Listen("tcp", "localhost:0")
  144. if err != nil {
  145. t.Fatal(err)
  146. }
  147. defer ln.Close()
  148. go func() {
  149. Conn, err := ln.Accept()
  150. if err != nil {
  151. return
  152. }
  153. session, _ := smux.Server(Conn, nil)
  154. for {
  155. if stream, err := session.AcceptStream(); err == nil {
  156. go func(s io.ReadWriteCloser) {
  157. numBytes := 0
  158. buf := make([]byte, 65536)
  159. for {
  160. n, err := s.Read(buf)
  161. if err != nil {
  162. return
  163. }
  164. s.Write(buf[:n])
  165. numBytes += n
  166. if numBytes == N {
  167. s.Close()
  168. return
  169. }
  170. }
  171. }(stream)
  172. } else {
  173. return
  174. }
  175. }
  176. }()
  177. addr := ln.Addr().String()
  178. Conn, err := net.Dial("tcp", addr)
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. defer Conn.Close()
  183. // client
  184. session, _ := smux.Client(Conn, nil)
  185. stream, _ := session.OpenStream()
  186. sndbuf := make([]byte, N)
  187. for i := range sndbuf {
  188. sndbuf[i] = byte(rand.Int())
  189. }
  190. go stream.Write(sndbuf)
  191. var rcvbuf bytes.Buffer
  192. nw, ew := stream.WriteTo(&rcvbuf)
  193. if ew != io.EOF {
  194. t.Fatal(ew)
  195. }
  196. if nw != N {
  197. t.Fatal("WriteTo nw mismatch", nw)
  198. }
  199. if bytes.Compare(sndbuf, rcvbuf.Bytes()) != 0 {
  200. t.Fatal("mismatched echo bytes")
  201. }
  202. }
  203. func TestWriteToV2(t *testing.T) {
  204. defer u.Leakplug(t)
  205. Config := smux.DefaultConfig()
  206. Config.Version = 2
  207. const N = 1 << 20
  208. // server
  209. ln, err := net.Listen("tcp", "localhost:0")
  210. if err != nil {
  211. t.Fatal(err)
  212. }
  213. defer ln.Close()
  214. go func() {
  215. Conn, err := ln.Accept()
  216. if err != nil {
  217. return
  218. }
  219. session, _ := smux.Server(Conn, Config)
  220. for {
  221. if stream, err := session.AcceptStream(); err == nil {
  222. go func(s io.ReadWriteCloser) {
  223. numBytes := 0
  224. buf := make([]byte, 65536)
  225. for {
  226. n, err := s.Read(buf)
  227. if err != nil {
  228. return
  229. }
  230. s.Write(buf[:n])
  231. numBytes += n
  232. if numBytes == N {
  233. s.Close()
  234. return
  235. }
  236. }
  237. }(stream)
  238. } else {
  239. return
  240. }
  241. }
  242. }()
  243. addr := ln.Addr().String()
  244. Conn, err := net.Dial("tcp", addr)
  245. if err != nil {
  246. t.Fatal(err)
  247. }
  248. defer Conn.Close()
  249. // client
  250. session, _ := smux.Client(Conn, Config)
  251. stream, _ := session.OpenStream()
  252. sndbuf := make([]byte, N)
  253. for i := range sndbuf {
  254. sndbuf[i] = byte(rand.Int())
  255. }
  256. go stream.Write(sndbuf)
  257. var rcvbuf bytes.Buffer
  258. nw, ew := stream.WriteTo(&rcvbuf)
  259. if ew != io.EOF {
  260. t.Fatal(ew)
  261. }
  262. if nw != N {
  263. t.Fatal("WriteTo nw mismatch", nw)
  264. }
  265. if bytes.Compare(sndbuf, rcvbuf.Bytes()) != 0 {
  266. t.Fatal("mismatched echo bytes")
  267. }
  268. }
  269. func TestGetDieCh(t *testing.T) {
  270. defer u.Leakplug(t)
  271. cs, ss, err := getSmuxStreamPair()
  272. if err != nil {
  273. t.Fatal(err)
  274. }
  275. defer ss.Close()
  276. dieCh := ss.GetDieCh()
  277. go func() {
  278. select {
  279. case <-dieCh:
  280. case <-time.Tick(time.Second):
  281. t.Fatal("wait die chan timeout")
  282. }
  283. }()
  284. cs.Close()
  285. }
  286. func TestSpeed(t *testing.T) {
  287. defer u.Leakplug(t)
  288. _, stop, cli, err := setupServer(t)
  289. if err != nil {
  290. t.Fatal(err)
  291. }
  292. defer stop()
  293. session, _ := smux.Client(cli, nil)
  294. stream, _ := session.OpenStream()
  295. t.Log(stream.LocalAddr(), stream.RemoteAddr())
  296. start := time.Now()
  297. var wg sync.WaitGroup
  298. wg.Add(1)
  299. go func() {
  300. buf := make([]byte, 1024*1024)
  301. nrecv := 0
  302. for {
  303. n, err := stream.Read(buf)
  304. if err != nil {
  305. t.Error(err)
  306. break
  307. } else {
  308. nrecv += n
  309. if nrecv == 4096*4096 {
  310. break
  311. }
  312. }
  313. }
  314. stream.Close()
  315. t.Log("time for 16MB rtt", time.Since(start))
  316. wg.Done()
  317. }()
  318. msg := make([]byte, 8192)
  319. for i := 0; i < 2048; i++ {
  320. stream.Write(msg)
  321. }
  322. wg.Wait()
  323. session.Close()
  324. }
  325. func TestParallel(t *testing.T) {
  326. defer u.Leakplug(t)
  327. _, stop, cli, err := setupServer(t)
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. defer stop()
  332. session, _ := smux.Client(cli, nil)
  333. par := 1000
  334. messages := 100
  335. var wg sync.WaitGroup
  336. wg.Add(par)
  337. for i := 0; i < par; i++ {
  338. stream, _ := session.OpenStream()
  339. go func(s *smux.Stream) {
  340. buf := make([]byte, 20)
  341. for j := 0; j < messages; j++ {
  342. msg := fmt.Sprintf("hello%v", j)
  343. s.Write([]byte(msg))
  344. if _, err := s.Read(buf); err != nil {
  345. break
  346. }
  347. }
  348. s.Close()
  349. wg.Done()
  350. }(stream)
  351. }
  352. t.Log("created", session.NumStreams(), "streams")
  353. wg.Wait()
  354. session.Close()
  355. }
  356. func TestParallelV2(t *testing.T) {
  357. defer u.Leakplug(t)
  358. Config := smux.DefaultConfig()
  359. Config.Version = 2
  360. _, stop, cli, err := setupServerV2(t)
  361. if err != nil {
  362. t.Fatal(err)
  363. }
  364. defer stop()
  365. session, _ := smux.Client(cli, Config)
  366. par := 1000
  367. messages := 100
  368. var wg sync.WaitGroup
  369. wg.Add(par)
  370. for i := 0; i < par; i++ {
  371. stream, _ := session.OpenStream()
  372. go func(s *smux.Stream) {
  373. buf := make([]byte, 20)
  374. for j := 0; j < messages; j++ {
  375. msg := fmt.Sprintf("hello%v", j)
  376. s.Write([]byte(msg))
  377. if _, err := s.Read(buf); err != nil {
  378. break
  379. }
  380. }
  381. s.Close()
  382. wg.Done()
  383. }(stream)
  384. }
  385. t.Log("created", session.NumStreams(), "streams")
  386. wg.Wait()
  387. session.Close()
  388. }
  389. func TestCloseThenOpen(t *testing.T) {
  390. defer u.Leakplug(t)
  391. _, stop, cli, err := setupServer(t)
  392. if err != nil {
  393. t.Fatal(err)
  394. }
  395. defer stop()
  396. session, _ := smux.Client(cli, nil)
  397. session.Close()
  398. if _, err := session.OpenStream(); err == nil {
  399. t.Fatal("opened after close")
  400. }
  401. }
  402. func TestSessionDoubleClose(t *testing.T) {
  403. defer u.Leakplug(t)
  404. _, stop, cli, err := setupServer(t)
  405. if err != nil {
  406. t.Fatal(err)
  407. }
  408. defer stop()
  409. session, _ := smux.Client(cli, nil)
  410. session.Close()
  411. if err := session.Close(); err == nil {
  412. t.Fatal("session double close doesn't return error")
  413. }
  414. }
  415. func TestStreamDoubleClose(t *testing.T) {
  416. defer u.Leakplug(t)
  417. _, stop, cli, err := setupServer(t)
  418. if err != nil {
  419. t.Fatal(err)
  420. }
  421. defer stop()
  422. session, _ := smux.Client(cli, nil)
  423. stream, _ := session.OpenStream()
  424. stream.Close()
  425. if err := stream.Close(); err == nil {
  426. t.Fatal("stream double close doesn't return error")
  427. }
  428. session.Close()
  429. }
  430. func TestConcurrentClose(t *testing.T) {
  431. defer u.Leakplug(t)
  432. _, stop, cli, err := setupServer(t)
  433. if err != nil {
  434. t.Fatal(err)
  435. }
  436. defer stop()
  437. session, _ := smux.Client(cli, nil)
  438. numStreams := 100
  439. streams := make([]*smux.Stream, 0, numStreams)
  440. var wg sync.WaitGroup
  441. wg.Add(numStreams)
  442. for i := 0; i < 100; i++ {
  443. stream, _ := session.OpenStream()
  444. streams = append(streams, stream)
  445. }
  446. for _, s := range streams {
  447. stream := s
  448. go func() {
  449. stream.Close()
  450. wg.Done()
  451. }()
  452. }
  453. session.Close()
  454. wg.Wait()
  455. }
  456. func TestTinyReadBuffer(t *testing.T) {
  457. defer u.Leakplug(t)
  458. _, stop, cli, err := setupServer(t)
  459. if err != nil {
  460. t.Fatal(err)
  461. }
  462. defer stop()
  463. session, _ := smux.Client(cli, nil)
  464. stream, _ := session.OpenStream()
  465. const N = 100
  466. tinybuf := make([]byte, 6)
  467. var sent string
  468. var received string
  469. for i := 0; i < N; i++ {
  470. msg := fmt.Sprintf("hello%v", i)
  471. sent += msg
  472. nsent, err := stream.Write([]byte(msg))
  473. if err != nil {
  474. t.Fatal("cannot write")
  475. }
  476. nrecv := 0
  477. for nrecv < nsent {
  478. if n, err := stream.Read(tinybuf); err == nil {
  479. nrecv += n
  480. received += string(tinybuf[:n])
  481. } else {
  482. t.Fatal("cannot read with tiny buffer")
  483. }
  484. }
  485. }
  486. if sent != received {
  487. t.Fatal("data mimatch")
  488. }
  489. session.Close()
  490. }
  491. func TestIsClose(t *testing.T) {
  492. defer u.Leakplug(t)
  493. _, stop, cli, err := setupServer(t)
  494. if err != nil {
  495. t.Fatal(err)
  496. }
  497. defer stop()
  498. session, _ := smux.Client(cli, nil)
  499. session.Close()
  500. if !session.IsClosed() {
  501. t.Fatal("still open after close")
  502. }
  503. }
  504. func TestKeepAliveTimeout(t *testing.T) {
  505. defer u.Leakplug(t)
  506. ln, err := net.Listen("tcp", "localhost:0")
  507. if err != nil {
  508. t.Fatal(err)
  509. }
  510. defer ln.Close()
  511. go func() {
  512. ln.Accept()
  513. }()
  514. cli, err := net.Dial("tcp", ln.Addr().String())
  515. if err != nil {
  516. t.Fatal(err)
  517. }
  518. defer cli.Close()
  519. Config := smux.DefaultConfig()
  520. Config.KeepAliveInterval = time.Second
  521. Config.KeepAliveTimeout = 2 * time.Second
  522. session, _ := smux.Client(cli, Config)
  523. time.Sleep(3 * time.Second)
  524. if !session.IsClosed() {
  525. t.Fatal("keepalive-timeout failed")
  526. }
  527. }
  528. type blockWriteConn struct {
  529. net.Conn
  530. }
  531. func (c *blockWriteConn) Write(b []byte) (n int, err error) {
  532. forever := time.Hour * 24
  533. time.Sleep(forever)
  534. return c.Conn.Write(b)
  535. }
  536. func TestKeepAliveBlockWriteTimeout(t *testing.T) {
  537. defer u.Leakplug(t)
  538. ln, err := net.Listen("tcp", "localhost:0")
  539. if err != nil {
  540. t.Fatal(err)
  541. }
  542. defer ln.Close()
  543. go func() {
  544. ln.Accept()
  545. }()
  546. cli, err := net.Dial("tcp", ln.Addr().String())
  547. if err != nil {
  548. t.Fatal(err)
  549. }
  550. defer cli.Close()
  551. // when WriteFrame block, keepalive in old version never timeout
  552. blockWriteCli := &blockWriteConn{cli}
  553. Config := smux.DefaultConfig()
  554. Config.KeepAliveInterval = time.Second
  555. Config.KeepAliveTimeout = 2 * time.Second
  556. session, _ := smux.Client(blockWriteCli, Config)
  557. time.Sleep(3 * time.Second)
  558. if !session.IsClosed() {
  559. t.Fatal("keepalive-timeout failed")
  560. }
  561. }
  562. func TestServerEcho(t *testing.T) {
  563. ln, err := net.Listen("tcp", "localhost:0")
  564. if err != nil {
  565. t.Fatal(err)
  566. }
  567. defer ln.Close()
  568. go func() {
  569. err := func() error {
  570. Conn, err := ln.Accept()
  571. if err != nil {
  572. return err
  573. }
  574. defer Conn.Close()
  575. session, err := smux.Server(Conn, nil)
  576. if err != nil {
  577. return err
  578. }
  579. defer session.Close()
  580. buf := make([]byte, 10)
  581. stream, err := session.OpenStream()
  582. if err != nil {
  583. return err
  584. }
  585. defer stream.Close()
  586. for i := 0; i < 100; i++ {
  587. msg := fmt.Sprintf("hello%v", i)
  588. stream.Write([]byte(msg))
  589. n, err := stream.Read(buf)
  590. if err != nil {
  591. return err
  592. }
  593. if got := string(buf[:n]); got != msg {
  594. return fmt.Errorf("got: %q, want: %q", got, msg)
  595. }
  596. }
  597. return nil
  598. }()
  599. if err != nil {
  600. t.Error(err)
  601. }
  602. }()
  603. cli, err := net.Dial("tcp", ln.Addr().String())
  604. if err != nil {
  605. t.Fatal(err)
  606. }
  607. defer cli.Close()
  608. if session, err := smux.Client(cli, nil); err == nil {
  609. if stream, err := session.AcceptStream(); err == nil {
  610. buf := make([]byte, 65536)
  611. for {
  612. n, err := stream.Read(buf)
  613. if err != nil {
  614. break
  615. }
  616. stream.Write(buf[:n])
  617. }
  618. } else {
  619. t.Fatal(err)
  620. }
  621. } else {
  622. t.Fatal(err)
  623. }
  624. }
  625. func TestSendWithoutRecv(t *testing.T) {
  626. _, stop, cli, err := setupServer(t)
  627. if err != nil {
  628. t.Fatal(err)
  629. }
  630. defer stop()
  631. session, _ := smux.Client(cli, nil)
  632. stream, _ := session.OpenStream()
  633. const N = 100
  634. for i := 0; i < N; i++ {
  635. msg := fmt.Sprintf("hello%v", i)
  636. stream.Write([]byte(msg))
  637. }
  638. buf := make([]byte, 1)
  639. if _, err := stream.Read(buf); err != nil {
  640. t.Fatal(err)
  641. }
  642. stream.Close()
  643. }
  644. func TestWriteAfterClose(t *testing.T) {
  645. _, stop, cli, err := setupServer(t)
  646. if err != nil {
  647. t.Fatal(err)
  648. }
  649. defer stop()
  650. session, _ := smux.Client(cli, nil)
  651. stream, _ := session.OpenStream()
  652. stream.Close()
  653. if _, err := stream.Write([]byte("write after close")); err == nil {
  654. t.Fatal("write after close failed")
  655. }
  656. }
  657. func TestReadStreamAfterSessionClose(t *testing.T) {
  658. _, stop, cli, err := setupServer(t)
  659. if err != nil {
  660. t.Fatal(err)
  661. }
  662. defer stop()
  663. session, _ := smux.Client(cli, nil)
  664. stream, _ := session.OpenStream()
  665. session.Close()
  666. buf := make([]byte, 10)
  667. if _, err := stream.Read(buf); err != nil {
  668. t.Log(err)
  669. } else {
  670. t.Fatal("read stream after session close succeeded")
  671. }
  672. }
  673. func TestWriteStreamAfterConnectionClose(t *testing.T) {
  674. _, stop, cli, err := setupServer(t)
  675. if err != nil {
  676. t.Fatal(err)
  677. }
  678. defer stop()
  679. session, _ := smux.Client(cli, nil)
  680. stream, _ := session.OpenStream()
  681. session.Conn.Close()
  682. if _, err := stream.Write([]byte("write after Connection close")); err == nil {
  683. t.Fatal("write after Connection close failed")
  684. }
  685. }
  686. func TestNumStreamAfterClose(t *testing.T) {
  687. _, stop, cli, err := setupServer(t)
  688. if err != nil {
  689. t.Fatal(err)
  690. }
  691. defer stop()
  692. session, _ := smux.Client(cli, nil)
  693. if _, err := session.OpenStream(); err == nil {
  694. if session.NumStreams() != 1 {
  695. t.Fatal("wrong number of streams after opened")
  696. }
  697. session.Close()
  698. if session.NumStreams() != 0 {
  699. t.Fatal("wrong number of streams after session closed")
  700. }
  701. } else {
  702. t.Fatal(err)
  703. }
  704. cli.Close()
  705. }
  706. func TestRandomFrame(t *testing.T) {
  707. addr, stop, cli, err := setupServer(t)
  708. if err != nil {
  709. t.Fatal(err)
  710. }
  711. defer stop()
  712. // pure random
  713. session, _ := smux.Client(cli, nil)
  714. for i := 0; i < 100; i++ {
  715. rnd := make([]byte, rand.Uint32()%1024)
  716. io.ReadFull(crand.Reader, rnd)
  717. session.Conn.Write(rnd)
  718. }
  719. cli.Close()
  720. // double syn
  721. cli, err = net.Dial("tcp", addr)
  722. if err != nil {
  723. t.Fatal(err)
  724. }
  725. session, _ = smux.Client(cli, nil)
  726. for i := 0; i < 100; i++ {
  727. f := smux.NewFrame(1, smux.CmdSyn, 1000)
  728. session.WriteFrame(f)
  729. }
  730. cli.Close()
  731. // random cmds
  732. cli, err = net.Dial("tcp", addr)
  733. if err != nil {
  734. t.Fatal(err)
  735. }
  736. allcmds := []byte{smux.CmdSyn, smux.CmdFin, smux.CmdPsh, smux.CmdNop}
  737. session, _ = smux.Client(cli, nil)
  738. for i := 0; i < 100; i++ {
  739. f := smux.NewFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32())
  740. session.WriteFrame(f)
  741. }
  742. cli.Close()
  743. // random cmds & Sids
  744. cli, err = net.Dial("tcp", addr)
  745. if err != nil {
  746. t.Fatal(err)
  747. }
  748. session, _ = smux.Client(cli, nil)
  749. for i := 0; i < 100; i++ {
  750. f := smux.NewFrame(1, byte(rand.Uint32()), rand.Uint32())
  751. session.WriteFrame(f)
  752. }
  753. cli.Close()
  754. // random version
  755. cli, err = net.Dial("tcp", addr)
  756. if err != nil {
  757. t.Fatal(err)
  758. }
  759. session, _ = smux.Client(cli, nil)
  760. for i := 0; i < 100; i++ {
  761. f := smux.NewFrame(1, byte(rand.Uint32()), rand.Uint32())
  762. f.Ver = byte(rand.Uint32())
  763. session.WriteFrame(f)
  764. }
  765. cli.Close()
  766. // incorrect size
  767. cli, err = net.Dial("tcp", addr)
  768. if err != nil {
  769. t.Fatal(err)
  770. }
  771. session, _ = smux.Client(cli, nil)
  772. f := smux.NewFrame(1, byte(rand.Uint32()), rand.Uint32())
  773. rnd := make([]byte, rand.Uint32()%1024)
  774. io.ReadFull(crand.Reader, rnd)
  775. f.Data = rnd
  776. buf := make([]byte, smux.HeaderSize+len(f.Data))
  777. buf[0] = f.Ver
  778. buf[1] = f.Cmd
  779. binary.LittleEndian.PutUint16(buf[2:], uint16(len(rnd)+1)) /// incorrect size
  780. binary.LittleEndian.PutUint32(buf[4:], f.Sid)
  781. copy(buf[smux.HeaderSize:], f.Data)
  782. session.Conn.Write(buf)
  783. cli.Close()
  784. // WriteFrame after die
  785. cli, err = net.Dial("tcp", addr)
  786. if err != nil {
  787. t.Fatal(err)
  788. }
  789. session, _ = smux.Client(cli, nil)
  790. // close first
  791. session.Close()
  792. for i := 0; i < 100; i++ {
  793. f := smux.NewFrame(1, byte(rand.Uint32()), rand.Uint32())
  794. session.WriteFrame(f)
  795. }
  796. }
  797. func TestWriteFrameInternal(t *testing.T) {
  798. addr, stop, cli, err := setupServer(t)
  799. if err != nil {
  800. t.Fatal(err)
  801. }
  802. defer stop()
  803. // pure random
  804. session, _ := smux.Client(cli, nil)
  805. for i := 0; i < 100; i++ {
  806. rnd := make([]byte, rand.Uint32()%1024)
  807. io.ReadFull(crand.Reader, rnd)
  808. session.Conn.Write(rnd)
  809. }
  810. cli.Close()
  811. // WriteFrame after die
  812. cli, err = net.Dial("tcp", addr)
  813. if err != nil {
  814. t.Fatal(err)
  815. }
  816. session, _ = smux.Client(cli, nil)
  817. // close first
  818. session.Close()
  819. for i := 0; i < 100; i++ {
  820. f := smux.NewFrame(1, byte(rand.Uint32()), rand.Uint32())
  821. session.WriteFrameInternal(f, time.After(session.Config.KeepAliveTimeout), 0)
  822. }
  823. // random cmds
  824. cli, err = net.Dial("tcp", addr)
  825. if err != nil {
  826. t.Fatal(err)
  827. }
  828. allcmds := []byte{smux.CmdSyn, smux.CmdFin, smux.CmdPsh, smux.CmdNop}
  829. session, _ = smux.Client(cli, nil)
  830. for i := 0; i < 100; i++ {
  831. f := smux.NewFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32())
  832. session.WriteFrameInternal(f, time.After(session.Config.KeepAliveTimeout), 0)
  833. }
  834. // deadline occur
  835. {
  836. c := make(chan time.Time)
  837. close(c)
  838. f := smux.NewFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32())
  839. _, err := session.WriteFrameInternal(f, c, 0)
  840. if !strings.Contains(err.Error(), "timeout") {
  841. t.Fatal("write frame with deadline failed", err)
  842. }
  843. netErr, ok := err.(net.Error)
  844. if !ok {
  845. t.Fatal("expected net.Error for timeout")
  846. }
  847. if netErr.Timeout() == false {
  848. t.Fatal("expected Timeout() to be true on timeout error ", err)
  849. }
  850. if netErr.Temporary() == false {
  851. t.Fatal("expected Temporary() to be true on timeout error", err)
  852. }
  853. }
  854. cli.Close()
  855. {
  856. cli, err = net.Dial("tcp", addr)
  857. if err != nil {
  858. t.Fatal(err)
  859. }
  860. Config := smux.DefaultConfig()
  861. Config.KeepAliveInterval = time.Second
  862. Config.KeepAliveTimeout = 2 * time.Second
  863. session, _ = smux.Client(&blockWriteConn{cli}, Config)
  864. f := smux.NewFrame(1, byte(rand.Uint32()), rand.Uint32())
  865. c := make(chan time.Time)
  866. go func() {
  867. // die first, deadline second, better for coverage
  868. time.Sleep(time.Second)
  869. session.Close()
  870. time.Sleep(time.Second)
  871. close(c)
  872. }()
  873. _, err = session.WriteFrameInternal(f, c, 0)
  874. if !strings.Contains(err.Error(), "closed pipe") {
  875. t.Fatal("write frame with to closed Conn failed", err)
  876. }
  877. }
  878. }
  879. func TestReadDeadline(t *testing.T) {
  880. _, stop, cli, err := setupServer(t)
  881. if err != nil {
  882. t.Fatal(err)
  883. }
  884. defer stop()
  885. session, _ := smux.Client(cli, nil)
  886. stream, _ := session.OpenStream()
  887. const N = 100
  888. buf := make([]byte, 10)
  889. var readErr error
  890. for i := 0; i < N; i++ {
  891. stream.SetReadDeadline(time.Now().Add(-1 * time.Minute))
  892. if _, readErr = stream.Read(buf); readErr != nil {
  893. break
  894. }
  895. }
  896. if readErr != nil {
  897. if !strings.Contains(readErr.Error(), "timeout") {
  898. t.Fatalf("Wrong error: %v", readErr)
  899. }
  900. } else {
  901. t.Fatal("No error when reading with past deadline")
  902. }
  903. session.Close()
  904. }
  905. func TestWriteDeadline(t *testing.T) {
  906. _, stop, cli, err := setupServer(t)
  907. if err != nil {
  908. t.Fatal(err)
  909. }
  910. defer stop()
  911. session, _ := smux.Client(cli, nil)
  912. stream, _ := session.OpenStream()
  913. buf := make([]byte, 10)
  914. var writeErr error
  915. for {
  916. stream.SetWriteDeadline(time.Now().Add(-1 * time.Minute))
  917. if _, writeErr = stream.Write(buf); writeErr != nil {
  918. if !strings.Contains(writeErr.Error(), "timeout") {
  919. t.Fatalf("Wrong error: %v", writeErr)
  920. }
  921. break
  922. }
  923. }
  924. session.Close()
  925. }
  926. func BenchmarkAcceptClose(b *testing.B) {
  927. _, stop, cli, err := setupServer(b)
  928. if err != nil {
  929. b.Fatal(err)
  930. }
  931. defer stop()
  932. session, _ := smux.Client(cli, nil)
  933. for i := 0; i < b.N; i++ {
  934. if stream, err := session.OpenStream(); err == nil {
  935. stream.Close()
  936. } else {
  937. b.Fatal(err)
  938. }
  939. }
  940. }
  941. func BenchmarkConnSmux(b *testing.B) {
  942. cs, ss, err := getSmuxStreamPair()
  943. if err != nil {
  944. b.Fatal(err)
  945. }
  946. defer cs.Close()
  947. defer ss.Close()
  948. bench(b, cs, ss)
  949. }
  950. func BenchmarkConnTCP(b *testing.B) {
  951. cs, ss, err := getTCPConnectionPair()
  952. if err != nil {
  953. b.Fatal(err)
  954. }
  955. defer cs.Close()
  956. defer ss.Close()
  957. bench(b, cs, ss)
  958. }
  959. func getSmuxStreamPair() (*smux.Stream, *smux.Stream, error) {
  960. c1, c2, err := getTCPConnectionPair()
  961. if err != nil {
  962. return nil, nil, err
  963. }
  964. s, err := smux.Server(c2, nil)
  965. if err != nil {
  966. return nil, nil, err
  967. }
  968. c, err := smux.Client(c1, nil)
  969. if err != nil {
  970. return nil, nil, err
  971. }
  972. var ss *smux.Stream
  973. done := make(chan error)
  974. go func() {
  975. var rerr error
  976. ss, rerr = s.AcceptStream()
  977. done <- rerr
  978. close(done)
  979. }()
  980. cs, err := c.OpenStream()
  981. if err != nil {
  982. return nil, nil, err
  983. }
  984. err = <-done
  985. if err != nil {
  986. return nil, nil, err
  987. }
  988. return cs, ss, nil
  989. }
  990. func getTCPConnectionPair() (net.Conn, net.Conn, error) {
  991. lst, err := net.Listen("tcp", "localhost:0")
  992. if err != nil {
  993. return nil, nil, err
  994. }
  995. defer lst.Close()
  996. var Conn0 net.Conn
  997. var err0 error
  998. done := make(chan struct{})
  999. go func() {
  1000. Conn0, err0 = lst.Accept()
  1001. close(done)
  1002. }()
  1003. Conn1, err := net.Dial("tcp", lst.Addr().String())
  1004. if err != nil {
  1005. return nil, nil, err
  1006. }
  1007. <-done
  1008. if err0 != nil {
  1009. return nil, nil, err0
  1010. }
  1011. return Conn0, Conn1, nil
  1012. }
  1013. func bench(b *testing.B, rd io.Reader, wr io.Writer) {
  1014. buf := make([]byte, 128*1024)
  1015. buf2 := make([]byte, 128*1024)
  1016. b.SetBytes(128 * 1024)
  1017. b.ResetTimer()
  1018. b.ReportAllocs()
  1019. var wg sync.WaitGroup
  1020. wg.Add(1)
  1021. go func() {
  1022. defer wg.Done()
  1023. count := 0
  1024. for {
  1025. n, _ := rd.Read(buf2)
  1026. count += n
  1027. if count == 128*1024*b.N {
  1028. return
  1029. }
  1030. }
  1031. }()
  1032. for i := 0; i < b.N; i++ {
  1033. wr.Write(buf)
  1034. }
  1035. wg.Wait()
  1036. }