session_test.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039
  1. package gfsmux_test
  2. import (
  3. "bytes"
  4. crand "crypto/rand"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "log"
  9. "math/rand"
  10. "net"
  11. "net/http"
  12. _ "net/http/pprof"
  13. "strings"
  14. "sync"
  15. "testing"
  16. "time"
  17. smux "github.com/johnsonjh/gfsmux"
  18. u "github.com/johnsonjh/leaktestfe"
  19. )
  20. func init() {
  21. go func() {
  22. log.Println(
  23. http.ListenAndServe(
  24. "0.0.0.0:6060",
  25. nil,
  26. ),
  27. )
  28. }()
  29. }
  30. // setupServer starts new server listening on a random localhost port and
  31. // returns address of the server, function to stop the server, new client
  32. // Connection to this server or an error.
  33. func setupServer(
  34. tb testing.TB,
  35. ) (
  36. addr string,
  37. stopfunc func(),
  38. client net.Conn,
  39. err error,
  40. ) {
  41. ln, err := net.Listen(
  42. "tcp",
  43. "localhost:0",
  44. )
  45. if err != nil {
  46. return "", nil, nil, err
  47. }
  48. go func() {
  49. Conn, err := ln.Accept()
  50. if err != nil {
  51. return
  52. }
  53. go handleConnection(
  54. Conn,
  55. )
  56. }()
  57. addr = ln.Addr().String()
  58. Conn, err := net.Dial(
  59. "tcp",
  60. addr,
  61. )
  62. if err != nil {
  63. ln.Close()
  64. return "", nil, nil, err
  65. }
  66. return ln.Addr().String(), func() { ln.Close() }, Conn, nil
  67. }
  68. func handleConnection(
  69. Conn net.Conn,
  70. ) {
  71. session, _ := smux.Server(
  72. Conn,
  73. nil,
  74. )
  75. for {
  76. if stream, err := session.AcceptStream(); err == nil {
  77. go func(
  78. s io.ReadWriteCloser,
  79. ) {
  80. buf := make(
  81. []byte,
  82. 65536,
  83. )
  84. for {
  85. n, err := s.Read(
  86. buf,
  87. )
  88. if err != nil {
  89. return
  90. }
  91. s.Write(
  92. buf[:n],
  93. )
  94. }
  95. }(
  96. stream,
  97. )
  98. } else {
  99. return
  100. }
  101. }
  102. }
  103. // setupServer starts new server listening on a random localhost port and
  104. // returns address of the server, function to stop the server, new client
  105. // Connection to this server or an error.
  106. func setupServerV2(
  107. tb testing.TB,
  108. ) (
  109. addr string,
  110. stopfunc func(),
  111. client net.Conn,
  112. err error,
  113. ) {
  114. ln, err := net.Listen(
  115. "tcp",
  116. "localhost:0",
  117. )
  118. if err != nil {
  119. return "", nil, nil, err
  120. }
  121. go func() {
  122. Conn, err := ln.Accept()
  123. if err != nil {
  124. return
  125. }
  126. go handleConnectionV2(
  127. Conn,
  128. )
  129. }()
  130. addr = ln.Addr().String()
  131. Conn, err := net.Dial(
  132. "tcp",
  133. addr,
  134. )
  135. if err != nil {
  136. ln.Close()
  137. return "", nil, nil, err
  138. }
  139. return ln.Addr().String(), func() { ln.Close() }, Conn, nil
  140. }
  141. func handleConnectionV2(
  142. Conn net.Conn,
  143. ) {
  144. Config := smux.DefaultConfig()
  145. Config.Version = 2
  146. session, _ := smux.Server(
  147. Conn,
  148. Config,
  149. )
  150. for {
  151. if stream, err := session.AcceptStream(); err == nil {
  152. go func(
  153. s io.ReadWriteCloser,
  154. ) {
  155. buf := make(
  156. []byte,
  157. 65536,
  158. )
  159. for {
  160. n, err := s.Read(
  161. buf,
  162. )
  163. if err != nil {
  164. return
  165. }
  166. s.Write(
  167. buf[:n],
  168. )
  169. }
  170. }(
  171. stream,
  172. )
  173. } else {
  174. return
  175. }
  176. }
  177. }
  178. func TestEcho(
  179. t *testing.T,
  180. ) {
  181. defer u.Leakplug(
  182. t,
  183. )
  184. _, stop, cli, err := setupServer(
  185. t,
  186. )
  187. if err != nil {
  188. t.Fatal(
  189. err,
  190. )
  191. }
  192. defer stop()
  193. session, _ := smux.Client(
  194. cli,
  195. nil,
  196. )
  197. stream, _ := session.OpenStream()
  198. const N = 100
  199. buf := make(
  200. []byte,
  201. 10,
  202. )
  203. var sent string
  204. var received string
  205. for i := 0; i < N; i++ {
  206. msg := fmt.Sprintf(
  207. "hello%v",
  208. i,
  209. )
  210. stream.Write(
  211. []byte(msg),
  212. )
  213. sent += msg
  214. if n, err := stream.Read(
  215. buf,
  216. ); err != nil {
  217. t.Fatal(
  218. err,
  219. )
  220. } else {
  221. received += string(buf[:n])
  222. }
  223. }
  224. if sent != received {
  225. t.Fatal(
  226. "data mimatch",
  227. )
  228. }
  229. session.Close()
  230. }
  231. func TestWriteTo(
  232. t *testing.T,
  233. ) {
  234. defer u.Leakplug(
  235. t,
  236. )
  237. const N = 1 << 20
  238. // server
  239. ln, err := net.Listen(
  240. "tcp",
  241. "localhost:0",
  242. )
  243. if err != nil {
  244. t.Fatal(
  245. err,
  246. )
  247. }
  248. defer ln.Close()
  249. go func() {
  250. Conn, err := ln.Accept()
  251. if err != nil {
  252. return
  253. }
  254. session, _ := smux.Server(
  255. Conn,
  256. nil,
  257. )
  258. for {
  259. if stream, err := session.AcceptStream(); err == nil {
  260. go func(
  261. s io.ReadWriteCloser,
  262. ) {
  263. numBytes := 0
  264. buf := make(
  265. []byte,
  266. 65536,
  267. )
  268. for {
  269. n, err := s.Read(
  270. buf,
  271. )
  272. if err != nil {
  273. return
  274. }
  275. s.Write(
  276. buf[:n],
  277. )
  278. numBytes += n
  279. if numBytes == N {
  280. s.Close()
  281. return
  282. }
  283. }
  284. }(
  285. stream,
  286. )
  287. } else {
  288. return
  289. }
  290. }
  291. }()
  292. addr := ln.Addr().String()
  293. Conn, err := net.Dial(
  294. "tcp",
  295. addr,
  296. )
  297. if err != nil {
  298. t.Fatal(
  299. err,
  300. )
  301. }
  302. defer Conn.Close()
  303. // client
  304. session, _ := smux.Client(
  305. Conn,
  306. nil,
  307. )
  308. stream, _ := session.OpenStream()
  309. sndbuf := make(
  310. []byte,
  311. N,
  312. )
  313. for i := range sndbuf {
  314. sndbuf[i] = byte(
  315. rand.Int(),
  316. )
  317. }
  318. go stream.Write(
  319. sndbuf,
  320. )
  321. var rcvbuf bytes.Buffer
  322. nw, ew := stream.WriteTo(
  323. &rcvbuf,
  324. )
  325. if ew != io.EOF {
  326. t.Fatal(
  327. ew,
  328. )
  329. }
  330. if nw != N {
  331. t.Fatal(
  332. "WriteTo nw mismatch",
  333. nw,
  334. )
  335. }
  336. if !bytes.Equal(
  337. sndbuf,
  338. rcvbuf.Bytes(),
  339. ) {
  340. t.Fatal(
  341. "mismatched echo bytes",
  342. )
  343. }
  344. }
  345. func TestWriteToV2(
  346. t *testing.T,
  347. ) {
  348. defer u.Leakplug(
  349. t,
  350. )
  351. Config := smux.DefaultConfig()
  352. Config.Version = 2
  353. const N = 1 << 20
  354. // server
  355. ln, err := net.Listen(
  356. "tcp",
  357. "localhost:0",
  358. )
  359. if err != nil {
  360. t.Fatal(
  361. err,
  362. )
  363. }
  364. defer ln.Close()
  365. go func() {
  366. Conn, err := ln.Accept()
  367. if err != nil {
  368. return
  369. }
  370. session, _ := smux.Server(
  371. Conn,
  372. Config,
  373. )
  374. for {
  375. if stream, err := session.AcceptStream(); err == nil {
  376. go func(
  377. s io.ReadWriteCloser,
  378. ) {
  379. numBytes := 0
  380. buf := make(
  381. []byte,
  382. 65536,
  383. )
  384. for {
  385. n, err := s.Read(
  386. buf,
  387. )
  388. if err != nil {
  389. return
  390. }
  391. s.Write(
  392. buf[:n],
  393. )
  394. numBytes += n
  395. if numBytes == N {
  396. s.Close()
  397. return
  398. }
  399. }
  400. }(
  401. stream,
  402. )
  403. } else {
  404. return
  405. }
  406. }
  407. }()
  408. addr := ln.Addr().String()
  409. Conn, err := net.Dial(
  410. "tcp",
  411. addr,
  412. )
  413. if err != nil {
  414. t.Fatal(
  415. err,
  416. )
  417. }
  418. defer Conn.Close()
  419. // client
  420. session, _ := smux.Client(
  421. Conn,
  422. Config,
  423. )
  424. stream, _ := session.OpenStream()
  425. sndbuf := make(
  426. []byte,
  427. N,
  428. )
  429. for i := range sndbuf {
  430. sndbuf[i] = byte(
  431. rand.Int(),
  432. )
  433. }
  434. go stream.Write(
  435. sndbuf,
  436. )
  437. var rcvbuf bytes.Buffer
  438. nw, ew := stream.WriteTo(
  439. &rcvbuf,
  440. )
  441. if ew != io.EOF {
  442. t.Fatal(
  443. ew,
  444. )
  445. }
  446. if nw != N {
  447. t.Fatal(
  448. "WriteTo nw mismatch",
  449. nw,
  450. )
  451. }
  452. if !bytes.Equal(
  453. sndbuf,
  454. rcvbuf.Bytes(),
  455. ) {
  456. t.Fatal(
  457. "mismatched echo bytes",
  458. )
  459. }
  460. }
  461. func TestGetDieCh(
  462. t *testing.T,
  463. ) {
  464. defer u.Leakplug(
  465. t,
  466. )
  467. cs, ss, err := getSmuxStreamPair()
  468. if err != nil {
  469. t.Fatal(
  470. err,
  471. )
  472. }
  473. defer ss.Close()
  474. dieCh := ss.GetDieCh()
  475. go func() {
  476. select {
  477. case <-dieCh:
  478. case <-time.Tick(time.Second):
  479. t.Fatal(
  480. "wait die chan timeout",
  481. )
  482. }
  483. }()
  484. cs.Close()
  485. }
  486. func TestSpeed(
  487. t *testing.T,
  488. ) {
  489. defer u.Leakplug(
  490. t,
  491. )
  492. _, stop, cli, err := setupServer(
  493. t,
  494. )
  495. if err != nil {
  496. t.Fatal(
  497. err,
  498. )
  499. }
  500. defer stop()
  501. session, _ := smux.Client(
  502. cli,
  503. nil,
  504. )
  505. stream, _ := session.OpenStream()
  506. t.Log(stream.LocalAddr(), stream.RemoteAddr())
  507. start := time.Now()
  508. var wg sync.WaitGroup
  509. wg.Add(
  510. 1,
  511. )
  512. go func() {
  513. buf := make(
  514. []byte,
  515. 1024*1024,
  516. )
  517. nrecv := 0
  518. for {
  519. n, err := stream.Read(
  520. buf,
  521. )
  522. if err != nil {
  523. t.Error(
  524. err,
  525. )
  526. break
  527. } else {
  528. nrecv += n
  529. if nrecv == 4096*4096 {
  530. break
  531. }
  532. }
  533. }
  534. stream.Close()
  535. t.Log(
  536. "time for 16MB rtt",
  537. time.Since(
  538. start,
  539. ),
  540. )
  541. wg.Done()
  542. }()
  543. msg := make(
  544. []byte,
  545. 8192,
  546. )
  547. for i := 0; i < 2048; i++ {
  548. stream.Write(
  549. msg,
  550. )
  551. }
  552. wg.Wait()
  553. session.Close()
  554. }
  555. func TestParallel(
  556. t *testing.T,
  557. ) {
  558. defer u.Leakplug(
  559. t,
  560. )
  561. _, stop, cli, err := setupServer(
  562. t,
  563. )
  564. if err != nil {
  565. t.Fatal(
  566. err,
  567. )
  568. }
  569. defer stop()
  570. session, _ := smux.Client(
  571. cli,
  572. nil,
  573. )
  574. par := 1000
  575. messages := 100
  576. var wg sync.WaitGroup
  577. wg.Add(
  578. par,
  579. )
  580. for i := 0; i < par; i++ {
  581. stream, _ := session.OpenStream()
  582. go func(
  583. s *smux.Stream,
  584. ) {
  585. buf := make(
  586. []byte,
  587. 20,
  588. )
  589. for j := 0; j < messages; j++ {
  590. msg := fmt.Sprintf(
  591. "hello%v",
  592. j,
  593. )
  594. s.Write(
  595. []byte(msg),
  596. )
  597. if _, err := s.Read(
  598. buf,
  599. ); err != nil {
  600. break
  601. }
  602. }
  603. s.Close()
  604. wg.Done()
  605. }(
  606. stream,
  607. )
  608. }
  609. t.Log(
  610. "created",
  611. session.NumStreams(),
  612. "streams",
  613. )
  614. wg.Wait()
  615. session.Close()
  616. }
  617. func TestParallelV2(
  618. t *testing.T,
  619. ) {
  620. defer u.Leakplug(
  621. t,
  622. )
  623. Config := smux.DefaultConfig()
  624. Config.Version = 2
  625. _, stop, cli, err := setupServerV2(
  626. t,
  627. )
  628. if err != nil {
  629. t.Fatal(
  630. err,
  631. )
  632. }
  633. defer stop()
  634. session, _ := smux.Client(
  635. cli,
  636. Config,
  637. )
  638. par := 1000
  639. messages := 100
  640. var wg sync.WaitGroup
  641. wg.Add(
  642. par,
  643. )
  644. for i := 0; i < par; i++ {
  645. stream, _ := session.OpenStream()
  646. go func(
  647. s *smux.Stream,
  648. ) {
  649. buf := make(
  650. []byte,
  651. 20,
  652. )
  653. for j := 0; j < messages; j++ {
  654. msg := fmt.Sprintf(
  655. "hello%v",
  656. j,
  657. )
  658. s.Write(
  659. []byte(msg),
  660. )
  661. if _, err := s.Read(
  662. buf,
  663. ); err != nil {
  664. break
  665. }
  666. }
  667. s.Close()
  668. wg.Done()
  669. }(
  670. stream,
  671. )
  672. }
  673. t.Log(
  674. "created",
  675. session.NumStreams(),
  676. "streams",
  677. )
  678. wg.Wait()
  679. session.Close()
  680. }
  681. func TestCloseThenOpen(
  682. t *testing.T,
  683. ) {
  684. defer u.Leakplug(
  685. t,
  686. )
  687. _, stop, cli, err := setupServer(
  688. t,
  689. )
  690. if err != nil {
  691. t.Fatal(
  692. err,
  693. )
  694. }
  695. defer stop()
  696. session, _ := smux.Client(
  697. cli,
  698. nil,
  699. )
  700. session.Close()
  701. if _, err := session.OpenStream(); err == nil {
  702. t.Fatal(
  703. "opened after close",
  704. )
  705. }
  706. }
  707. func TestSessionDoubleClose(
  708. t *testing.T,
  709. ) {
  710. defer u.Leakplug(
  711. t,
  712. )
  713. _, stop, cli, err := setupServer(
  714. t,
  715. )
  716. if err != nil {
  717. t.Fatal(
  718. err,
  719. )
  720. }
  721. defer stop()
  722. session, _ := smux.Client(
  723. cli,
  724. nil,
  725. )
  726. session.Close()
  727. if err := session.Close(); err == nil {
  728. t.Fatal(
  729. "session double close doesn't return error",
  730. )
  731. }
  732. }
  733. func TestStreamDoubleClose(
  734. t *testing.T,
  735. ) {
  736. defer u.Leakplug(
  737. t,
  738. )
  739. _, stop, cli, err := setupServer(
  740. t,
  741. )
  742. if err != nil {
  743. t.Fatal(
  744. err,
  745. )
  746. }
  747. defer stop()
  748. session, _ := smux.Client(
  749. cli,
  750. nil,
  751. )
  752. stream, _ := session.OpenStream()
  753. stream.Close()
  754. if err := stream.Close(); err == nil {
  755. t.Fatal(
  756. "stream double close doesn't return error",
  757. )
  758. }
  759. session.Close()
  760. }
  761. func TestConcurrentClose(
  762. t *testing.T,
  763. ) {
  764. defer u.Leakplug(
  765. t,
  766. )
  767. _, stop, cli, err := setupServer(
  768. t,
  769. )
  770. if err != nil {
  771. t.Fatal(
  772. err,
  773. )
  774. }
  775. defer stop()
  776. session, _ := smux.Client(
  777. cli,
  778. nil,
  779. )
  780. numStreams := 100
  781. streams := make(
  782. []*smux.Stream,
  783. 0,
  784. numStreams,
  785. )
  786. var wg sync.WaitGroup
  787. wg.Add(
  788. numStreams,
  789. )
  790. for i := 0; i < 100; i++ {
  791. stream, _ := session.OpenStream()
  792. streams = append(
  793. streams,
  794. stream,
  795. )
  796. }
  797. for _, s := range streams {
  798. stream := s
  799. go func() {
  800. stream.Close()
  801. wg.Done()
  802. }()
  803. }
  804. session.Close()
  805. wg.Wait()
  806. }
  807. func TestTinyReadBuffer(
  808. t *testing.T,
  809. ) {
  810. defer u.Leakplug(
  811. t,
  812. )
  813. _, stop, cli, err := setupServer(
  814. t,
  815. )
  816. if err != nil {
  817. t.Fatal(
  818. err,
  819. )
  820. }
  821. defer stop()
  822. session, _ := smux.Client(
  823. cli,
  824. nil,
  825. )
  826. stream, _ := session.OpenStream()
  827. const N = 100
  828. tinybuf := make(
  829. []byte,
  830. 6,
  831. )
  832. var sent string
  833. var received string
  834. for i := 0; i < N; i++ {
  835. msg := fmt.Sprintf(
  836. "hello%v",
  837. i,
  838. )
  839. sent += msg
  840. nsent, err := stream.Write(
  841. []byte(msg),
  842. )
  843. if err != nil {
  844. t.Fatal(
  845. "cannot write",
  846. )
  847. }
  848. nrecv := 0
  849. for nrecv < nsent {
  850. if n, err := stream.Read(
  851. tinybuf,
  852. ); err == nil {
  853. nrecv += n
  854. received += string(tinybuf[:n])
  855. } else {
  856. t.Fatal(
  857. "cannot read with tiny buffer",
  858. )
  859. }
  860. }
  861. }
  862. if sent != received {
  863. t.Fatal(
  864. "data mimatch",
  865. )
  866. }
  867. session.Close()
  868. }
  869. func TestIsClose(
  870. t *testing.T,
  871. ) {
  872. defer u.Leakplug(
  873. t,
  874. )
  875. _, stop, cli, err := setupServer(
  876. t,
  877. )
  878. if err != nil {
  879. t.Fatal(
  880. err,
  881. )
  882. }
  883. defer stop()
  884. session, _ := smux.Client(
  885. cli,
  886. nil,
  887. )
  888. session.Close()
  889. if !session.IsClosed() {
  890. t.Fatal(
  891. "still open after close",
  892. )
  893. }
  894. }
  895. func TestKeepAliveTimeout(
  896. t *testing.T,
  897. ) {
  898. defer u.Leakplug(
  899. t,
  900. )
  901. ln, err := net.Listen(
  902. "tcp",
  903. "localhost:0",
  904. )
  905. if err != nil {
  906. t.Fatal(
  907. err,
  908. )
  909. }
  910. defer ln.Close()
  911. go func() {
  912. ln.Accept()
  913. }()
  914. cli, err := net.Dial(
  915. "tcp",
  916. ln.Addr().String(),
  917. )
  918. if err != nil {
  919. t.Fatal(
  920. err,
  921. )
  922. }
  923. defer cli.Close()
  924. Config := smux.DefaultConfig()
  925. Config.KeepAliveInterval = time.Second
  926. Config.KeepAliveTimeout = 2 * time.Second
  927. session, _ := smux.Client(
  928. cli,
  929. Config,
  930. )
  931. time.Sleep(3 * time.Second)
  932. if !session.IsClosed() {
  933. t.Fatal(
  934. "keepalive-timeout failed",
  935. )
  936. }
  937. }
  938. type blockWriteConn struct {
  939. net.Conn
  940. }
  941. func (
  942. c *blockWriteConn,
  943. ) Write(
  944. b []byte,
  945. ) (
  946. n int,
  947. err error,
  948. ) {
  949. forever := time.Hour * 24
  950. time.Sleep(
  951. forever,
  952. )
  953. return c.Conn.Write(
  954. b,
  955. )
  956. }
  957. func TestKeepAliveBlockWriteTimeout(
  958. t *testing.T,
  959. ) {
  960. defer u.Leakplug(
  961. t,
  962. )
  963. ln, err := net.Listen(
  964. "tcp",
  965. "localhost:0",
  966. )
  967. if err != nil {
  968. t.Fatal(
  969. err,
  970. )
  971. }
  972. defer ln.Close()
  973. go func() {
  974. ln.Accept()
  975. }()
  976. cli, err := net.Dial(
  977. "tcp",
  978. ln.Addr().String(),
  979. )
  980. if err != nil {
  981. t.Fatal(
  982. err,
  983. )
  984. }
  985. defer cli.Close()
  986. // when WriteFrame block, keepalive in old version never timeout
  987. blockWriteCli := &blockWriteConn{cli}
  988. Config := smux.DefaultConfig()
  989. Config.KeepAliveInterval = time.Second
  990. Config.KeepAliveTimeout = 2 * time.Second
  991. session, _ := smux.Client(
  992. blockWriteCli,
  993. Config,
  994. )
  995. time.Sleep(3 * time.Second)
  996. if !session.IsClosed() {
  997. t.Fatal(
  998. "keepalive-timeout failed",
  999. )
  1000. }
  1001. }
  1002. func TestServerEcho(
  1003. t *testing.T,
  1004. ) {
  1005. ln, err := net.Listen(
  1006. "tcp",
  1007. "localhost:0",
  1008. )
  1009. if err != nil {
  1010. t.Fatal(
  1011. err,
  1012. )
  1013. }
  1014. defer ln.Close()
  1015. go func() {
  1016. err := func() error {
  1017. Conn, err := ln.Accept()
  1018. if err != nil {
  1019. return err
  1020. }
  1021. defer Conn.Close()
  1022. session, err := smux.Server(
  1023. Conn,
  1024. nil,
  1025. )
  1026. if err != nil {
  1027. return err
  1028. }
  1029. defer session.Close()
  1030. buf := make(
  1031. []byte,
  1032. 10,
  1033. )
  1034. stream, err := session.OpenStream()
  1035. if err != nil {
  1036. return err
  1037. }
  1038. defer stream.Close()
  1039. for i := 0; i < 100; i++ {
  1040. msg := fmt.Sprintf(
  1041. "hello%v",
  1042. i,
  1043. )
  1044. stream.Write(
  1045. []byte(msg),
  1046. )
  1047. n, err := stream.Read(
  1048. buf,
  1049. )
  1050. if err != nil {
  1051. return err
  1052. }
  1053. if got := string(buf[:n]); got != msg {
  1054. return fmt.Errorf(
  1055. "got: %q, want: %q",
  1056. got,
  1057. msg,
  1058. )
  1059. }
  1060. }
  1061. return nil
  1062. }()
  1063. if err != nil {
  1064. t.Error(
  1065. err,
  1066. )
  1067. }
  1068. }()
  1069. cli, err := net.Dial(
  1070. "tcp",
  1071. ln.Addr().String(),
  1072. )
  1073. if err != nil {
  1074. t.Fatal(
  1075. err,
  1076. )
  1077. }
  1078. defer cli.Close()
  1079. if session, err := smux.Client(
  1080. cli,
  1081. nil,
  1082. ); err == nil {
  1083. if stream, err := session.AcceptStream(); err == nil {
  1084. buf := make(
  1085. []byte,
  1086. 65536,
  1087. )
  1088. for {
  1089. n, err := stream.Read(
  1090. buf,
  1091. )
  1092. if err != nil {
  1093. break
  1094. }
  1095. stream.Write(
  1096. buf[:n],
  1097. )
  1098. }
  1099. } else {
  1100. t.Fatal(
  1101. err,
  1102. )
  1103. }
  1104. } else {
  1105. t.Fatal(
  1106. err,
  1107. )
  1108. }
  1109. }
  1110. func TestSendWithoutRecv(
  1111. t *testing.T,
  1112. ) {
  1113. _, stop, cli, err := setupServer(
  1114. t,
  1115. )
  1116. if err != nil {
  1117. t.Fatal(
  1118. err,
  1119. )
  1120. }
  1121. defer stop()
  1122. session, _ := smux.Client(
  1123. cli,
  1124. nil,
  1125. )
  1126. stream, _ := session.OpenStream()
  1127. const N = 100
  1128. for i := 0; i < N; i++ {
  1129. msg := fmt.Sprintf(
  1130. "hello%v",
  1131. i,
  1132. )
  1133. stream.Write(
  1134. []byte(msg),
  1135. )
  1136. }
  1137. buf := make(
  1138. []byte,
  1139. 1,
  1140. )
  1141. if _, err := stream.Read(
  1142. buf,
  1143. ); err != nil {
  1144. t.Fatal(
  1145. err,
  1146. )
  1147. }
  1148. stream.Close()
  1149. }
  1150. func TestWriteAfterClose(
  1151. t *testing.T,
  1152. ) {
  1153. _, stop, cli, err := setupServer(
  1154. t,
  1155. )
  1156. if err != nil {
  1157. t.Fatal(
  1158. err,
  1159. )
  1160. }
  1161. defer stop()
  1162. session, _ := smux.Client(
  1163. cli,
  1164. nil,
  1165. )
  1166. stream, _ := session.OpenStream()
  1167. stream.Close()
  1168. if _, err := stream.Write(
  1169. []byte(
  1170. "write after close",
  1171. ),
  1172. ); err == nil {
  1173. t.Fatal(
  1174. "write after close failed",
  1175. )
  1176. }
  1177. }
  1178. func TestReadStreamAfterSessionClose(
  1179. t *testing.T,
  1180. ) {
  1181. _, stop, cli, err := setupServer(
  1182. t,
  1183. )
  1184. if err != nil {
  1185. t.Fatal(
  1186. err,
  1187. )
  1188. }
  1189. defer stop()
  1190. session, _ := smux.Client(
  1191. cli,
  1192. nil,
  1193. )
  1194. stream, _ := session.OpenStream()
  1195. session.Close()
  1196. buf := make(
  1197. []byte,
  1198. 10,
  1199. )
  1200. if _, err := stream.Read(
  1201. buf,
  1202. ); err != nil {
  1203. t.Log(
  1204. err,
  1205. )
  1206. } else {
  1207. t.Fatal(
  1208. "read stream after session close succeeded",
  1209. )
  1210. }
  1211. }
  1212. func TestWriteStreamAfterConnectionClose(
  1213. t *testing.T,
  1214. ) {
  1215. _, stop, cli, err := setupServer(
  1216. t,
  1217. )
  1218. if err != nil {
  1219. t.Fatal(
  1220. err,
  1221. )
  1222. }
  1223. defer stop()
  1224. session, _ := smux.Client(
  1225. cli,
  1226. nil,
  1227. )
  1228. stream, _ := session.OpenStream()
  1229. session.Conn.Close()
  1230. if _, err := stream.Write(
  1231. []byte(
  1232. "write after Connection close",
  1233. ),
  1234. ); err == nil {
  1235. t.Fatal(
  1236. "write after Connection close failed",
  1237. )
  1238. }
  1239. }
  1240. func TestNumStreamAfterClose(
  1241. t *testing.T,
  1242. ) {
  1243. _, stop, cli, err := setupServer(
  1244. t,
  1245. )
  1246. if err != nil {
  1247. t.Fatal(
  1248. err,
  1249. )
  1250. }
  1251. defer stop()
  1252. session, _ := smux.Client(
  1253. cli,
  1254. nil,
  1255. )
  1256. if _, err := session.OpenStream(); err == nil {
  1257. if session.NumStreams() != 1 {
  1258. t.Fatal(
  1259. "wrong number of streams after opened",
  1260. )
  1261. }
  1262. session.Close()
  1263. if session.NumStreams() != 0 {
  1264. t.Fatal(
  1265. "wrong number of streams after session closed",
  1266. )
  1267. }
  1268. } else {
  1269. t.Fatal(
  1270. err,
  1271. )
  1272. }
  1273. cli.Close()
  1274. }
  1275. func TestRandomFrame(
  1276. t *testing.T,
  1277. ) {
  1278. addr, stop, cli, err := setupServer(
  1279. t,
  1280. )
  1281. if err != nil {
  1282. t.Fatal(
  1283. err,
  1284. )
  1285. }
  1286. defer stop()
  1287. session, _ := smux.Client(
  1288. cli,
  1289. nil,
  1290. )
  1291. for i := 0; i < 100; i++ {
  1292. rnd := make(
  1293. []byte,
  1294. rand.Uint32()%1024,
  1295. )
  1296. io.ReadFull(
  1297. crand.Reader,
  1298. rnd,
  1299. )
  1300. session.Conn.Write(
  1301. rnd,
  1302. )
  1303. }
  1304. cli.Close()
  1305. // double syn
  1306. cli, err = net.Dial(
  1307. "tcp",
  1308. addr,
  1309. )
  1310. if err != nil {
  1311. t.Fatal(
  1312. err,
  1313. )
  1314. }
  1315. session, _ = smux.Client(
  1316. cli,
  1317. nil,
  1318. )
  1319. for i := 0; i < 100; i++ {
  1320. f := smux.NewFrame(
  1321. 1,
  1322. smux.CmdSyn,
  1323. 1000,
  1324. )
  1325. session.WriteFrame(
  1326. f,
  1327. )
  1328. }
  1329. cli.Close()
  1330. // random cmds
  1331. cli, err = net.Dial(
  1332. "tcp",
  1333. addr,
  1334. )
  1335. if err != nil {
  1336. t.Fatal(
  1337. err,
  1338. )
  1339. }
  1340. allcmds := []byte{smux.CmdSyn, smux.CmdFin, smux.CmdPsh, smux.CmdNop}
  1341. session, _ = smux.Client(
  1342. cli,
  1343. nil,
  1344. )
  1345. for i := 0; i < 100; i++ {
  1346. f := smux.NewFrame(
  1347. 1,
  1348. allcmds[rand.Int()%len(allcmds)],
  1349. rand.Uint32(),
  1350. )
  1351. session.WriteFrame(
  1352. f,
  1353. )
  1354. }
  1355. cli.Close()
  1356. // random cmds & Sids
  1357. cli, err = net.Dial(
  1358. "tcp",
  1359. addr,
  1360. )
  1361. if err != nil {
  1362. t.Fatal(
  1363. err,
  1364. )
  1365. }
  1366. session, _ = smux.Client(
  1367. cli,
  1368. nil,
  1369. )
  1370. for i := 0; i < 100; i++ {
  1371. f := smux.NewFrame(
  1372. 1,
  1373. byte(rand.Uint32()),
  1374. rand.Uint32(),
  1375. )
  1376. session.WriteFrame(
  1377. f,
  1378. )
  1379. }
  1380. cli.Close()
  1381. // random version
  1382. cli, err = net.Dial(
  1383. "tcp",
  1384. addr,
  1385. )
  1386. if err != nil {
  1387. t.Fatal(
  1388. err,
  1389. )
  1390. }
  1391. session, _ = smux.Client(
  1392. cli,
  1393. nil,
  1394. )
  1395. for i := 0; i < 100; i++ {
  1396. f := smux.NewFrame(
  1397. 1,
  1398. byte(rand.Uint32()),
  1399. rand.Uint32(),
  1400. )
  1401. f.Ver = byte(
  1402. rand.Uint32(),
  1403. )
  1404. session.WriteFrame(
  1405. f,
  1406. )
  1407. }
  1408. cli.Close()
  1409. // incorrect size
  1410. cli, err = net.Dial(
  1411. "tcp",
  1412. addr,
  1413. )
  1414. if err != nil {
  1415. t.Fatal(
  1416. err,
  1417. )
  1418. }
  1419. session, _ = smux.Client(
  1420. cli,
  1421. nil,
  1422. )
  1423. f := smux.NewFrame(
  1424. 1,
  1425. byte(rand.Uint32()),
  1426. rand.Uint32(),
  1427. )
  1428. rnd := make(
  1429. []byte,
  1430. rand.Uint32()%1024,
  1431. )
  1432. io.ReadFull(
  1433. crand.Reader,
  1434. rnd,
  1435. )
  1436. f.Data = rnd
  1437. buf := make(
  1438. []byte,
  1439. smux.HeaderSize+len(
  1440. f.Data,
  1441. ),
  1442. )
  1443. buf[0] = f.Ver
  1444. buf[1] = f.Cmd
  1445. binary.LittleEndian.PutUint16(
  1446. buf[2:],
  1447. uint16(
  1448. len(
  1449. rnd,
  1450. )+1,
  1451. ),
  1452. ) // incorrect size
  1453. binary.LittleEndian.PutUint32(
  1454. buf[4:],
  1455. f.Sid,
  1456. )
  1457. copy(
  1458. buf[smux.HeaderSize:],
  1459. f.Data,
  1460. )
  1461. session.Conn.Write(
  1462. buf,
  1463. )
  1464. cli.Close()
  1465. // WriteFrame after die
  1466. cli, err = net.Dial(
  1467. "tcp",
  1468. addr,
  1469. )
  1470. if err != nil {
  1471. t.Fatal(
  1472. err,
  1473. )
  1474. }
  1475. session, _ = smux.Client(
  1476. cli,
  1477. nil,
  1478. )
  1479. // close first
  1480. session.Close()
  1481. for i := 0; i < 100; i++ {
  1482. f := smux.NewFrame(
  1483. 1,
  1484. byte(rand.Uint32()),
  1485. rand.Uint32(),
  1486. )
  1487. session.WriteFrame(
  1488. f,
  1489. )
  1490. }
  1491. }
  1492. func TestWriteFrameInternal(
  1493. t *testing.T,
  1494. ) {
  1495. addr, stop, cli, err := setupServer(
  1496. t,
  1497. )
  1498. if err != nil {
  1499. t.Fatal(
  1500. err,
  1501. )
  1502. }
  1503. defer stop()
  1504. session, _ := smux.Client(
  1505. cli,
  1506. nil,
  1507. )
  1508. for i := 0; i < 100; i++ {
  1509. rnd := make(
  1510. []byte,
  1511. rand.Uint32()%1024,
  1512. )
  1513. io.ReadFull(
  1514. crand.Reader,
  1515. rnd,
  1516. )
  1517. session.Conn.Write(
  1518. rnd,
  1519. )
  1520. }
  1521. cli.Close()
  1522. // WriteFrame after die
  1523. cli, err = net.Dial(
  1524. "tcp",
  1525. addr,
  1526. )
  1527. if err != nil {
  1528. t.Fatal(
  1529. err,
  1530. )
  1531. }
  1532. session, _ = smux.Client(
  1533. cli,
  1534. nil,
  1535. )
  1536. // close first
  1537. session.Close()
  1538. for i := 0; i < 100; i++ {
  1539. f := smux.NewFrame(
  1540. 1,
  1541. byte(rand.Uint32()),
  1542. rand.Uint32(),
  1543. )
  1544. session.WriteFrameInternal(
  1545. f,
  1546. time.After(
  1547. session.Config.KeepAliveTimeout,
  1548. ),
  1549. 0,
  1550. )
  1551. }
  1552. // random cmds
  1553. cli, err = net.Dial(
  1554. "tcp",
  1555. addr,
  1556. )
  1557. if err != nil {
  1558. t.Fatal(
  1559. err,
  1560. )
  1561. }
  1562. allcmds := []byte{smux.CmdSyn, smux.CmdFin, smux.CmdPsh, smux.CmdNop}
  1563. session, _ = smux.Client(
  1564. cli,
  1565. nil,
  1566. )
  1567. for i := 0; i < 100; i++ {
  1568. f := smux.NewFrame(
  1569. 1,
  1570. allcmds[rand.Int()%len(allcmds)],
  1571. rand.Uint32(),
  1572. )
  1573. session.WriteFrameInternal(
  1574. f,
  1575. time.After(
  1576. session.Config.KeepAliveTimeout,
  1577. ),
  1578. 0,
  1579. )
  1580. }
  1581. // deadline occur
  1582. {
  1583. c := make(
  1584. chan time.Time,
  1585. )
  1586. close(
  1587. c,
  1588. )
  1589. f := smux.NewFrame(
  1590. 1,
  1591. allcmds[rand.Int()%len(allcmds)],
  1592. rand.Uint32(),
  1593. )
  1594. _, err := session.WriteFrameInternal(f, c, 0)
  1595. if !strings.Contains(err.Error(), "timeout") {
  1596. t.Fatal(
  1597. "write frame with deadline failed",
  1598. err,
  1599. )
  1600. }
  1601. netErr, ok := err.(net.Error)
  1602. if !ok {
  1603. t.Fatal(
  1604. "expected net.Error for timeout",
  1605. )
  1606. }
  1607. if netErr.Timeout() == false {
  1608. t.Fatal(
  1609. "expected Timeout() to be true on timeout error ",
  1610. err,
  1611. )
  1612. }
  1613. if netErr.Temporary() == false {
  1614. t.Fatal(
  1615. "expected Temporary() to be true on timeout error ",
  1616. err,
  1617. )
  1618. }
  1619. }
  1620. cli.Close()
  1621. {
  1622. cli, err = net.Dial(
  1623. "tcp",
  1624. addr,
  1625. )
  1626. if err != nil {
  1627. t.Fatal(
  1628. err,
  1629. )
  1630. }
  1631. Config := smux.DefaultConfig()
  1632. Config.KeepAliveInterval = time.Second
  1633. Config.KeepAliveTimeout = 2 * time.Second
  1634. session, _ = smux.Client(
  1635. &blockWriteConn{cli},
  1636. Config,
  1637. )
  1638. f := smux.NewFrame(
  1639. 1,
  1640. byte(rand.Uint32()),
  1641. rand.Uint32(),
  1642. )
  1643. c := make(
  1644. chan time.Time,
  1645. )
  1646. go func() {
  1647. // die first, deadline second, better for coverage
  1648. time.Sleep(
  1649. time.Second,
  1650. )
  1651. session.Close()
  1652. time.Sleep(
  1653. time.Second,
  1654. )
  1655. close(
  1656. c,
  1657. )
  1658. }()
  1659. _, err = session.WriteFrameInternal(
  1660. f,
  1661. c,
  1662. 0,
  1663. )
  1664. if !strings.Contains(
  1665. err.Error(),
  1666. "closed pipe",
  1667. ) {
  1668. t.Fatal(
  1669. "write frame with to closed Conn failed ",
  1670. err,
  1671. )
  1672. }
  1673. }
  1674. }
  1675. func TestReadDeadline(
  1676. t *testing.T,
  1677. ) {
  1678. _, stop, cli, err := setupServer(
  1679. t,
  1680. )
  1681. if err != nil {
  1682. t.Fatal(
  1683. err,
  1684. )
  1685. }
  1686. defer stop()
  1687. session, _ := smux.Client(
  1688. cli,
  1689. nil,
  1690. )
  1691. stream, _ := session.OpenStream()
  1692. const N = 100
  1693. buf := make(
  1694. []byte,
  1695. 10,
  1696. )
  1697. var readErr error
  1698. for i := 0; i < N; i++ {
  1699. stream.SetReadDeadline(
  1700. time.Now().Add(
  1701. -1 * time.Minute,
  1702. ),
  1703. )
  1704. if _, readErr = stream.Read(
  1705. buf,
  1706. ); readErr != nil {
  1707. break
  1708. }
  1709. }
  1710. if readErr != nil {
  1711. if !strings.Contains(
  1712. readErr.Error(),
  1713. "timeout",
  1714. ) {
  1715. t.Fatalf(
  1716. "Wrong error: %v",
  1717. readErr,
  1718. )
  1719. }
  1720. } else {
  1721. t.Fatal(
  1722. "No error when reading with past deadline",
  1723. )
  1724. }
  1725. session.Close()
  1726. }
  1727. func TestWriteDeadline(
  1728. t *testing.T,
  1729. ) {
  1730. _, stop, cli, err := setupServer(
  1731. t,
  1732. )
  1733. if err != nil {
  1734. t.Fatal(
  1735. err,
  1736. )
  1737. }
  1738. defer stop()
  1739. session, _ := smux.Client(
  1740. cli,
  1741. nil,
  1742. )
  1743. stream, _ := session.OpenStream()
  1744. buf := make(
  1745. []byte,
  1746. 10,
  1747. )
  1748. var writeErr error
  1749. for {
  1750. stream.SetWriteDeadline(
  1751. time.Now().Add(
  1752. -1 * time.Minute,
  1753. ),
  1754. )
  1755. if _, writeErr = stream.Write(
  1756. buf,
  1757. ); writeErr != nil {
  1758. if !strings.Contains(
  1759. writeErr.Error(),
  1760. "timeout",
  1761. ) {
  1762. t.Fatalf(
  1763. "Wrong error: %v",
  1764. writeErr,
  1765. )
  1766. }
  1767. break
  1768. }
  1769. }
  1770. session.Close()
  1771. }
  1772. func BenchmarkAcceptClose(
  1773. b *testing.B,
  1774. ) {
  1775. _, stop, cli, err := setupServer(
  1776. b,
  1777. )
  1778. if err != nil {
  1779. b.Fatal(
  1780. err,
  1781. )
  1782. }
  1783. defer stop()
  1784. session, _ := smux.Client(
  1785. cli,
  1786. nil,
  1787. )
  1788. for i := 0; i < b.N; i++ {
  1789. if stream, err := session.OpenStream(); err == nil {
  1790. stream.Close()
  1791. } else {
  1792. b.Fatal(
  1793. err,
  1794. )
  1795. }
  1796. }
  1797. }
  1798. func BenchmarkConnSmux(
  1799. b *testing.B,
  1800. ) {
  1801. cs, ss, err := getSmuxStreamPair()
  1802. if err != nil {
  1803. b.Fatal(
  1804. err,
  1805. )
  1806. }
  1807. defer cs.Close()
  1808. defer ss.Close()
  1809. bench(
  1810. b,
  1811. cs,
  1812. ss,
  1813. )
  1814. }
  1815. func BenchmarkConnTCP(
  1816. b *testing.B,
  1817. ) {
  1818. cs, ss, err := getTCPConnectionPair()
  1819. if err != nil {
  1820. b.Fatal(
  1821. err,
  1822. )
  1823. }
  1824. defer cs.Close()
  1825. defer ss.Close()
  1826. bench(
  1827. b,
  1828. cs,
  1829. ss,
  1830. )
  1831. }
  1832. func getSmuxStreamPair() (
  1833. *smux.Stream,
  1834. *smux.Stream,
  1835. error,
  1836. ) {
  1837. c1, c2, err := getTCPConnectionPair()
  1838. if err != nil {
  1839. return nil, nil, err
  1840. }
  1841. s, err := smux.Server(
  1842. c2,
  1843. nil,
  1844. )
  1845. if err != nil {
  1846. return nil, nil, err
  1847. }
  1848. c, err := smux.Client(
  1849. c1,
  1850. nil,
  1851. )
  1852. if err != nil {
  1853. return nil, nil, err
  1854. }
  1855. var ss *smux.Stream
  1856. done := make(
  1857. chan error,
  1858. )
  1859. go func() {
  1860. var rerr error
  1861. ss, rerr = s.AcceptStream()
  1862. done <- rerr
  1863. close(
  1864. done,
  1865. )
  1866. }()
  1867. cs, err := c.OpenStream()
  1868. if err != nil {
  1869. return nil, nil, err
  1870. }
  1871. err = <-done
  1872. if err != nil {
  1873. return nil, nil, err
  1874. }
  1875. return cs, ss, nil
  1876. }
  1877. func getTCPConnectionPair() (
  1878. net.Conn,
  1879. net.Conn,
  1880. error,
  1881. ) {
  1882. lst, err := net.Listen(
  1883. "tcp",
  1884. "localhost:0",
  1885. )
  1886. if err != nil {
  1887. return nil, nil, err
  1888. }
  1889. defer lst.Close()
  1890. var Conn0 net.Conn
  1891. var err0 error
  1892. done := make(
  1893. chan struct{},
  1894. )
  1895. go func() {
  1896. Conn0, err0 = lst.Accept()
  1897. close(
  1898. done,
  1899. )
  1900. }()
  1901. Conn1, err := net.Dial(
  1902. "tcp",
  1903. lst.Addr().String(),
  1904. )
  1905. if err != nil {
  1906. return nil, nil, err
  1907. }
  1908. <-done
  1909. if err0 != nil {
  1910. return nil, nil, err0
  1911. }
  1912. return Conn0, Conn1, nil
  1913. }
  1914. func bench(
  1915. b *testing.B,
  1916. rd io.Reader,
  1917. wr io.Writer,
  1918. ) {
  1919. buf := make(
  1920. []byte,
  1921. 128*1024,
  1922. )
  1923. buf2 := make(
  1924. []byte,
  1925. 128*1024,
  1926. )
  1927. b.SetBytes(
  1928. 128 * 1024,
  1929. )
  1930. b.ResetTimer()
  1931. b.ReportAllocs()
  1932. var wg sync.WaitGroup
  1933. wg.Add(
  1934. 1,
  1935. )
  1936. go func() {
  1937. defer wg.Done()
  1938. count := 0
  1939. for {
  1940. n, _ := rd.Read(
  1941. buf2,
  1942. )
  1943. count += n
  1944. if count == 128*1024*b.N {
  1945. return
  1946. }
  1947. }
  1948. }()
  1949. for i := 0; i < b.N; i++ {
  1950. wr.Write(
  1951. buf,
  1952. )
  1953. }
  1954. wg.Wait()
  1955. }