12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039 |
- package gfsmux_test
- import (
- "bytes"
- crand "crypto/rand"
- "encoding/binary"
- "fmt"
- "io"
- "log"
- "math/rand"
- "net"
- "net/http"
- _ "net/http/pprof"
- "strings"
- "sync"
- "testing"
- "time"
- smux "github.com/johnsonjh/gfsmux"
- u "github.com/johnsonjh/leaktestfe"
- )
// init starts a background HTTP server on 0.0.0.0:6060 with the default
// mux (populated by the blank net/http/pprof import) and logs whatever
// error ListenAndServe returns when it stops.
func init() {
	const pprofAddr = "0.0.0.0:6060"
	go func() {
		serveErr := http.ListenAndServe(pprofAddr, nil)
		log.Println(serveErr)
	}()
}
- // setupServer starts new server listening on a random localhost port and
- // returns address of the server, function to stop the server, new client
- // Connection to this server or an error.
- func setupServer(
- tb testing.TB,
- ) (
- addr string,
- stopfunc func(),
- client net.Conn,
- err error,
- ) {
- ln, err := net.Listen(
- "tcp",
- "localhost:0",
- )
- if err != nil {
- return "", nil, nil, err
- }
- go func() {
- Conn, err := ln.Accept()
- if err != nil {
- return
- }
- go handleConnection(
- Conn,
- )
- }()
- addr = ln.Addr().String()
- Conn, err := net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- ln.Close()
- return "", nil, nil, err
- }
- return ln.Addr().String(), func() { ln.Close() }, Conn, nil
- }
- func handleConnection(
- Conn net.Conn,
- ) {
- session, _ := smux.Server(
- Conn,
- nil,
- )
- for {
- if stream, err := session.AcceptStream(); err == nil {
- go func(
- s io.ReadWriteCloser,
- ) {
- buf := make(
- []byte,
- 65536,
- )
- for {
- n, err := s.Read(
- buf,
- )
- if err != nil {
- return
- }
- s.Write(
- buf[:n],
- )
- }
- }(
- stream,
- )
- } else {
- return
- }
- }
- }
- // setupServer starts new server listening on a random localhost port and
- // returns address of the server, function to stop the server, new client
- // Connection to this server or an error.
- func setupServerV2(
- tb testing.TB,
- ) (
- addr string,
- stopfunc func(),
- client net.Conn,
- err error,
- ) {
- ln, err := net.Listen(
- "tcp",
- "localhost:0",
- )
- if err != nil {
- return "", nil, nil, err
- }
- go func() {
- Conn, err := ln.Accept()
- if err != nil {
- return
- }
- go handleConnectionV2(
- Conn,
- )
- }()
- addr = ln.Addr().String()
- Conn, err := net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- ln.Close()
- return "", nil, nil, err
- }
- return ln.Addr().String(), func() { ln.Close() }, Conn, nil
- }
- func handleConnectionV2(
- Conn net.Conn,
- ) {
- Config := smux.DefaultConfig()
- Config.Version = 2
- session, _ := smux.Server(
- Conn,
- Config,
- )
- for {
- if stream, err := session.AcceptStream(); err == nil {
- go func(
- s io.ReadWriteCloser,
- ) {
- buf := make(
- []byte,
- 65536,
- )
- for {
- n, err := s.Read(
- buf,
- )
- if err != nil {
- return
- }
- s.Write(
- buf[:n],
- )
- }
- }(
- stream,
- )
- } else {
- return
- }
- }
- }
- func TestEcho(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- const N = 100
- buf := make(
- []byte,
- 10,
- )
- var sent string
- var received string
- for i := 0; i < N; i++ {
- msg := fmt.Sprintf(
- "hello%v",
- i,
- )
- stream.Write(
- []byte(msg),
- )
- sent += msg
- if n, err := stream.Read(
- buf,
- ); err != nil {
- t.Fatal(
- err,
- )
- } else {
- received += string(buf[:n])
- }
- }
- if sent != received {
- t.Fatal(
- "data mimatch",
- )
- }
- session.Close()
- }
- func TestWriteTo(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- const N = 1 << 20
- // server
- ln, err := net.Listen(
- "tcp",
- "localhost:0",
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer ln.Close()
- go func() {
- Conn, err := ln.Accept()
- if err != nil {
- return
- }
- session, _ := smux.Server(
- Conn,
- nil,
- )
- for {
- if stream, err := session.AcceptStream(); err == nil {
- go func(
- s io.ReadWriteCloser,
- ) {
- numBytes := 0
- buf := make(
- []byte,
- 65536,
- )
- for {
- n, err := s.Read(
- buf,
- )
- if err != nil {
- return
- }
- s.Write(
- buf[:n],
- )
- numBytes += n
- if numBytes == N {
- s.Close()
- return
- }
- }
- }(
- stream,
- )
- } else {
- return
- }
- }
- }()
- addr := ln.Addr().String()
- Conn, err := net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer Conn.Close()
- // client
- session, _ := smux.Client(
- Conn,
- nil,
- )
- stream, _ := session.OpenStream()
- sndbuf := make(
- []byte,
- N,
- )
- for i := range sndbuf {
- sndbuf[i] = byte(
- rand.Int(),
- )
- }
- go stream.Write(
- sndbuf,
- )
- var rcvbuf bytes.Buffer
- nw, ew := stream.WriteTo(
- &rcvbuf,
- )
- if ew != io.EOF {
- t.Fatal(
- ew,
- )
- }
- if nw != N {
- t.Fatal(
- "WriteTo nw mismatch",
- nw,
- )
- }
- if !bytes.Equal(
- sndbuf,
- rcvbuf.Bytes(),
- ) {
- t.Fatal(
- "mismatched echo bytes",
- )
- }
- }
- func TestWriteToV2(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- Config := smux.DefaultConfig()
- Config.Version = 2
- const N = 1 << 20
- // server
- ln, err := net.Listen(
- "tcp",
- "localhost:0",
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer ln.Close()
- go func() {
- Conn, err := ln.Accept()
- if err != nil {
- return
- }
- session, _ := smux.Server(
- Conn,
- Config,
- )
- for {
- if stream, err := session.AcceptStream(); err == nil {
- go func(
- s io.ReadWriteCloser,
- ) {
- numBytes := 0
- buf := make(
- []byte,
- 65536,
- )
- for {
- n, err := s.Read(
- buf,
- )
- if err != nil {
- return
- }
- s.Write(
- buf[:n],
- )
- numBytes += n
- if numBytes == N {
- s.Close()
- return
- }
- }
- }(
- stream,
- )
- } else {
- return
- }
- }
- }()
- addr := ln.Addr().String()
- Conn, err := net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer Conn.Close()
- // client
- session, _ := smux.Client(
- Conn,
- Config,
- )
- stream, _ := session.OpenStream()
- sndbuf := make(
- []byte,
- N,
- )
- for i := range sndbuf {
- sndbuf[i] = byte(
- rand.Int(),
- )
- }
- go stream.Write(
- sndbuf,
- )
- var rcvbuf bytes.Buffer
- nw, ew := stream.WriteTo(
- &rcvbuf,
- )
- if ew != io.EOF {
- t.Fatal(
- ew,
- )
- }
- if nw != N {
- t.Fatal(
- "WriteTo nw mismatch",
- nw,
- )
- }
- if !bytes.Equal(
- sndbuf,
- rcvbuf.Bytes(),
- ) {
- t.Fatal(
- "mismatched echo bytes",
- )
- }
- }
- func TestGetDieCh(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- cs, ss, err := getSmuxStreamPair()
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer ss.Close()
- dieCh := ss.GetDieCh()
- go func() {
- select {
- case <-dieCh:
- case <-time.Tick(time.Second):
- t.Fatal(
- "wait die chan timeout",
- )
- }
- }()
- cs.Close()
- }
- func TestSpeed(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- t.Log(stream.LocalAddr(), stream.RemoteAddr())
- start := time.Now()
- var wg sync.WaitGroup
- wg.Add(
- 1,
- )
- go func() {
- buf := make(
- []byte,
- 1024*1024,
- )
- nrecv := 0
- for {
- n, err := stream.Read(
- buf,
- )
- if err != nil {
- t.Error(
- err,
- )
- break
- } else {
- nrecv += n
- if nrecv == 4096*4096 {
- break
- }
- }
- }
- stream.Close()
- t.Log(
- "time for 16MB rtt",
- time.Since(
- start,
- ),
- )
- wg.Done()
- }()
- msg := make(
- []byte,
- 8192,
- )
- for i := 0; i < 2048; i++ {
- stream.Write(
- msg,
- )
- }
- wg.Wait()
- session.Close()
- }
- func TestParallel(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- par := 1000
- messages := 100
- var wg sync.WaitGroup
- wg.Add(
- par,
- )
- for i := 0; i < par; i++ {
- stream, _ := session.OpenStream()
- go func(
- s *smux.Stream,
- ) {
- buf := make(
- []byte,
- 20,
- )
- for j := 0; j < messages; j++ {
- msg := fmt.Sprintf(
- "hello%v",
- j,
- )
- s.Write(
- []byte(msg),
- )
- if _, err := s.Read(
- buf,
- ); err != nil {
- break
- }
- }
- s.Close()
- wg.Done()
- }(
- stream,
- )
- }
- t.Log(
- "created",
- session.NumStreams(),
- "streams",
- )
- wg.Wait()
- session.Close()
- }
- func TestParallelV2(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- Config := smux.DefaultConfig()
- Config.Version = 2
- _, stop, cli, err := setupServerV2(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- Config,
- )
- par := 1000
- messages := 100
- var wg sync.WaitGroup
- wg.Add(
- par,
- )
- for i := 0; i < par; i++ {
- stream, _ := session.OpenStream()
- go func(
- s *smux.Stream,
- ) {
- buf := make(
- []byte,
- 20,
- )
- for j := 0; j < messages; j++ {
- msg := fmt.Sprintf(
- "hello%v",
- j,
- )
- s.Write(
- []byte(msg),
- )
- if _, err := s.Read(
- buf,
- ); err != nil {
- break
- }
- }
- s.Close()
- wg.Done()
- }(
- stream,
- )
- }
- t.Log(
- "created",
- session.NumStreams(),
- "streams",
- )
- wg.Wait()
- session.Close()
- }
- func TestCloseThenOpen(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- session.Close()
- if _, err := session.OpenStream(); err == nil {
- t.Fatal(
- "opened after close",
- )
- }
- }
- func TestSessionDoubleClose(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- session.Close()
- if err := session.Close(); err == nil {
- t.Fatal(
- "session double close doesn't return error",
- )
- }
- }
- func TestStreamDoubleClose(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- stream.Close()
- if err := stream.Close(); err == nil {
- t.Fatal(
- "stream double close doesn't return error",
- )
- }
- session.Close()
- }
- func TestConcurrentClose(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- numStreams := 100
- streams := make(
- []*smux.Stream,
- 0,
- numStreams,
- )
- var wg sync.WaitGroup
- wg.Add(
- numStreams,
- )
- for i := 0; i < 100; i++ {
- stream, _ := session.OpenStream()
- streams = append(
- streams,
- stream,
- )
- }
- for _, s := range streams {
- stream := s
- go func() {
- stream.Close()
- wg.Done()
- }()
- }
- session.Close()
- wg.Wait()
- }
- func TestTinyReadBuffer(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- const N = 100
- tinybuf := make(
- []byte,
- 6,
- )
- var sent string
- var received string
- for i := 0; i < N; i++ {
- msg := fmt.Sprintf(
- "hello%v",
- i,
- )
- sent += msg
- nsent, err := stream.Write(
- []byte(msg),
- )
- if err != nil {
- t.Fatal(
- "cannot write",
- )
- }
- nrecv := 0
- for nrecv < nsent {
- if n, err := stream.Read(
- tinybuf,
- ); err == nil {
- nrecv += n
- received += string(tinybuf[:n])
- } else {
- t.Fatal(
- "cannot read with tiny buffer",
- )
- }
- }
- }
- if sent != received {
- t.Fatal(
- "data mimatch",
- )
- }
- session.Close()
- }
- func TestIsClose(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- session.Close()
- if !session.IsClosed() {
- t.Fatal(
- "still open after close",
- )
- }
- }
- func TestKeepAliveTimeout(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- ln, err := net.Listen(
- "tcp",
- "localhost:0",
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer ln.Close()
- go func() {
- ln.Accept()
- }()
- cli, err := net.Dial(
- "tcp",
- ln.Addr().String(),
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer cli.Close()
- Config := smux.DefaultConfig()
- Config.KeepAliveInterval = time.Second
- Config.KeepAliveTimeout = 2 * time.Second
- session, _ := smux.Client(
- cli,
- Config,
- )
- time.Sleep(3 * time.Second)
- if !session.IsClosed() {
- t.Fatal(
- "keepalive-timeout failed",
- )
- }
- }
// blockWriteConn wraps a net.Conn whose Write stalls for 24 hours before
// being forwarded, which the tests use to simulate a blocked writer.
type blockWriteConn struct {
	net.Conn
}

// Write sleeps for 24 hours and then forwards b to the embedded Conn,
// returning that call's results.
func (c *blockWriteConn) Write(b []byte) (n int, err error) {
	const stall = 24 * time.Hour
	time.Sleep(stall)
	return c.Conn.Write(b)
}
- func TestKeepAliveBlockWriteTimeout(
- t *testing.T,
- ) {
- defer u.Leakplug(
- t,
- )
- ln, err := net.Listen(
- "tcp",
- "localhost:0",
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer ln.Close()
- go func() {
- ln.Accept()
- }()
- cli, err := net.Dial(
- "tcp",
- ln.Addr().String(),
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer cli.Close()
- // when WriteFrame block, keepalive in old version never timeout
- blockWriteCli := &blockWriteConn{cli}
- Config := smux.DefaultConfig()
- Config.KeepAliveInterval = time.Second
- Config.KeepAliveTimeout = 2 * time.Second
- session, _ := smux.Client(
- blockWriteCli,
- Config,
- )
- time.Sleep(3 * time.Second)
- if !session.IsClosed() {
- t.Fatal(
- "keepalive-timeout failed",
- )
- }
- }
- func TestServerEcho(
- t *testing.T,
- ) {
- ln, err := net.Listen(
- "tcp",
- "localhost:0",
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer ln.Close()
- go func() {
- err := func() error {
- Conn, err := ln.Accept()
- if err != nil {
- return err
- }
- defer Conn.Close()
- session, err := smux.Server(
- Conn,
- nil,
- )
- if err != nil {
- return err
- }
- defer session.Close()
- buf := make(
- []byte,
- 10,
- )
- stream, err := session.OpenStream()
- if err != nil {
- return err
- }
- defer stream.Close()
- for i := 0; i < 100; i++ {
- msg := fmt.Sprintf(
- "hello%v",
- i,
- )
- stream.Write(
- []byte(msg),
- )
- n, err := stream.Read(
- buf,
- )
- if err != nil {
- return err
- }
- if got := string(buf[:n]); got != msg {
- return fmt.Errorf(
- "got: %q, want: %q",
- got,
- msg,
- )
- }
- }
- return nil
- }()
- if err != nil {
- t.Error(
- err,
- )
- }
- }()
- cli, err := net.Dial(
- "tcp",
- ln.Addr().String(),
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer cli.Close()
- if session, err := smux.Client(
- cli,
- nil,
- ); err == nil {
- if stream, err := session.AcceptStream(); err == nil {
- buf := make(
- []byte,
- 65536,
- )
- for {
- n, err := stream.Read(
- buf,
- )
- if err != nil {
- break
- }
- stream.Write(
- buf[:n],
- )
- }
- } else {
- t.Fatal(
- err,
- )
- }
- } else {
- t.Fatal(
- err,
- )
- }
- }
- func TestSendWithoutRecv(
- t *testing.T,
- ) {
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- const N = 100
- for i := 0; i < N; i++ {
- msg := fmt.Sprintf(
- "hello%v",
- i,
- )
- stream.Write(
- []byte(msg),
- )
- }
- buf := make(
- []byte,
- 1,
- )
- if _, err := stream.Read(
- buf,
- ); err != nil {
- t.Fatal(
- err,
- )
- }
- stream.Close()
- }
- func TestWriteAfterClose(
- t *testing.T,
- ) {
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- stream.Close()
- if _, err := stream.Write(
- []byte(
- "write after close",
- ),
- ); err == nil {
- t.Fatal(
- "write after close failed",
- )
- }
- }
- func TestReadStreamAfterSessionClose(
- t *testing.T,
- ) {
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- session.Close()
- buf := make(
- []byte,
- 10,
- )
- if _, err := stream.Read(
- buf,
- ); err != nil {
- t.Log(
- err,
- )
- } else {
- t.Fatal(
- "read stream after session close succeeded",
- )
- }
- }
- func TestWriteStreamAfterConnectionClose(
- t *testing.T,
- ) {
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- session.Conn.Close()
- if _, err := stream.Write(
- []byte(
- "write after Connection close",
- ),
- ); err == nil {
- t.Fatal(
- "write after Connection close failed",
- )
- }
- }
- func TestNumStreamAfterClose(
- t *testing.T,
- ) {
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- if _, err := session.OpenStream(); err == nil {
- if session.NumStreams() != 1 {
- t.Fatal(
- "wrong number of streams after opened",
- )
- }
- session.Close()
- if session.NumStreams() != 0 {
- t.Fatal(
- "wrong number of streams after session closed",
- )
- }
- } else {
- t.Fatal(
- err,
- )
- }
- cli.Close()
- }
- func TestRandomFrame(
- t *testing.T,
- ) {
- addr, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- for i := 0; i < 100; i++ {
- rnd := make(
- []byte,
- rand.Uint32()%1024,
- )
- io.ReadFull(
- crand.Reader,
- rnd,
- )
- session.Conn.Write(
- rnd,
- )
- }
- cli.Close()
- // double syn
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- session, _ = smux.Client(
- cli,
- nil,
- )
- for i := 0; i < 100; i++ {
- f := smux.NewFrame(
- 1,
- smux.CmdSyn,
- 1000,
- )
- session.WriteFrame(
- f,
- )
- }
- cli.Close()
- // random cmds
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- allcmds := []byte{smux.CmdSyn, smux.CmdFin, smux.CmdPsh, smux.CmdNop}
- session, _ = smux.Client(
- cli,
- nil,
- )
- for i := 0; i < 100; i++ {
- f := smux.NewFrame(
- 1,
- allcmds[rand.Int()%len(allcmds)],
- rand.Uint32(),
- )
- session.WriteFrame(
- f,
- )
- }
- cli.Close()
- // random cmds & Sids
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- session, _ = smux.Client(
- cli,
- nil,
- )
- for i := 0; i < 100; i++ {
- f := smux.NewFrame(
- 1,
- byte(rand.Uint32()),
- rand.Uint32(),
- )
- session.WriteFrame(
- f,
- )
- }
- cli.Close()
- // random version
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- session, _ = smux.Client(
- cli,
- nil,
- )
- for i := 0; i < 100; i++ {
- f := smux.NewFrame(
- 1,
- byte(rand.Uint32()),
- rand.Uint32(),
- )
- f.Ver = byte(
- rand.Uint32(),
- )
- session.WriteFrame(
- f,
- )
- }
- cli.Close()
- // incorrect size
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- session, _ = smux.Client(
- cli,
- nil,
- )
- f := smux.NewFrame(
- 1,
- byte(rand.Uint32()),
- rand.Uint32(),
- )
- rnd := make(
- []byte,
- rand.Uint32()%1024,
- )
- io.ReadFull(
- crand.Reader,
- rnd,
- )
- f.Data = rnd
- buf := make(
- []byte,
- smux.HeaderSize+len(
- f.Data,
- ),
- )
- buf[0] = f.Ver
- buf[1] = f.Cmd
- binary.LittleEndian.PutUint16(
- buf[2:],
- uint16(
- len(
- rnd,
- )+1,
- ),
- ) // incorrect size
- binary.LittleEndian.PutUint32(
- buf[4:],
- f.Sid,
- )
- copy(
- buf[smux.HeaderSize:],
- f.Data,
- )
- session.Conn.Write(
- buf,
- )
- cli.Close()
- // WriteFrame after die
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- session, _ = smux.Client(
- cli,
- nil,
- )
- // close first
- session.Close()
- for i := 0; i < 100; i++ {
- f := smux.NewFrame(
- 1,
- byte(rand.Uint32()),
- rand.Uint32(),
- )
- session.WriteFrame(
- f,
- )
- }
- }
- func TestWriteFrameInternal(
- t *testing.T,
- ) {
- addr, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- for i := 0; i < 100; i++ {
- rnd := make(
- []byte,
- rand.Uint32()%1024,
- )
- io.ReadFull(
- crand.Reader,
- rnd,
- )
- session.Conn.Write(
- rnd,
- )
- }
- cli.Close()
- // WriteFrame after die
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- session, _ = smux.Client(
- cli,
- nil,
- )
- // close first
- session.Close()
- for i := 0; i < 100; i++ {
- f := smux.NewFrame(
- 1,
- byte(rand.Uint32()),
- rand.Uint32(),
- )
- session.WriteFrameInternal(
- f,
- time.After(
- session.Config.KeepAliveTimeout,
- ),
- 0,
- )
- }
- // random cmds
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- allcmds := []byte{smux.CmdSyn, smux.CmdFin, smux.CmdPsh, smux.CmdNop}
- session, _ = smux.Client(
- cli,
- nil,
- )
- for i := 0; i < 100; i++ {
- f := smux.NewFrame(
- 1,
- allcmds[rand.Int()%len(allcmds)],
- rand.Uint32(),
- )
- session.WriteFrameInternal(
- f,
- time.After(
- session.Config.KeepAliveTimeout,
- ),
- 0,
- )
- }
- // deadline occur
- {
- c := make(
- chan time.Time,
- )
- close(
- c,
- )
- f := smux.NewFrame(
- 1,
- allcmds[rand.Int()%len(allcmds)],
- rand.Uint32(),
- )
- _, err := session.WriteFrameInternal(f, c, 0)
- if !strings.Contains(err.Error(), "timeout") {
- t.Fatal(
- "write frame with deadline failed",
- err,
- )
- }
- netErr, ok := err.(net.Error)
- if !ok {
- t.Fatal(
- "expected net.Error for timeout",
- )
- }
- if netErr.Timeout() == false {
- t.Fatal(
- "expected Timeout() to be true on timeout error ",
- err,
- )
- }
- if netErr.Temporary() == false {
- t.Fatal(
- "expected Temporary() to be true on timeout error ",
- err,
- )
- }
- }
- cli.Close()
- {
- cli, err = net.Dial(
- "tcp",
- addr,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- Config := smux.DefaultConfig()
- Config.KeepAliveInterval = time.Second
- Config.KeepAliveTimeout = 2 * time.Second
- session, _ = smux.Client(
- &blockWriteConn{cli},
- Config,
- )
- f := smux.NewFrame(
- 1,
- byte(rand.Uint32()),
- rand.Uint32(),
- )
- c := make(
- chan time.Time,
- )
- go func() {
- // die first, deadline second, better for coverage
- time.Sleep(
- time.Second,
- )
- session.Close()
- time.Sleep(
- time.Second,
- )
- close(
- c,
- )
- }()
- _, err = session.WriteFrameInternal(
- f,
- c,
- 0,
- )
- if !strings.Contains(
- err.Error(),
- "closed pipe",
- ) {
- t.Fatal(
- "write frame with to closed Conn failed ",
- err,
- )
- }
- }
- }
- func TestReadDeadline(
- t *testing.T,
- ) {
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- const N = 100
- buf := make(
- []byte,
- 10,
- )
- var readErr error
- for i := 0; i < N; i++ {
- stream.SetReadDeadline(
- time.Now().Add(
- -1 * time.Minute,
- ),
- )
- if _, readErr = stream.Read(
- buf,
- ); readErr != nil {
- break
- }
- }
- if readErr != nil {
- if !strings.Contains(
- readErr.Error(),
- "timeout",
- ) {
- t.Fatalf(
- "Wrong error: %v",
- readErr,
- )
- }
- } else {
- t.Fatal(
- "No error when reading with past deadline",
- )
- }
- session.Close()
- }
- func TestWriteDeadline(
- t *testing.T,
- ) {
- _, stop, cli, err := setupServer(
- t,
- )
- if err != nil {
- t.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- stream, _ := session.OpenStream()
- buf := make(
- []byte,
- 10,
- )
- var writeErr error
- for {
- stream.SetWriteDeadline(
- time.Now().Add(
- -1 * time.Minute,
- ),
- )
- if _, writeErr = stream.Write(
- buf,
- ); writeErr != nil {
- if !strings.Contains(
- writeErr.Error(),
- "timeout",
- ) {
- t.Fatalf(
- "Wrong error: %v",
- writeErr,
- )
- }
- break
- }
- }
- session.Close()
- }
- func BenchmarkAcceptClose(
- b *testing.B,
- ) {
- _, stop, cli, err := setupServer(
- b,
- )
- if err != nil {
- b.Fatal(
- err,
- )
- }
- defer stop()
- session, _ := smux.Client(
- cli,
- nil,
- )
- for i := 0; i < b.N; i++ {
- if stream, err := session.OpenStream(); err == nil {
- stream.Close()
- } else {
- b.Fatal(
- err,
- )
- }
- }
- }
- func BenchmarkConnSmux(
- b *testing.B,
- ) {
- cs, ss, err := getSmuxStreamPair()
- if err != nil {
- b.Fatal(
- err,
- )
- }
- defer cs.Close()
- defer ss.Close()
- bench(
- b,
- cs,
- ss,
- )
- }
- func BenchmarkConnTCP(
- b *testing.B,
- ) {
- cs, ss, err := getTCPConnectionPair()
- if err != nil {
- b.Fatal(
- err,
- )
- }
- defer cs.Close()
- defer ss.Close()
- bench(
- b,
- cs,
- ss,
- )
- }
- func getSmuxStreamPair() (
- *smux.Stream,
- *smux.Stream,
- error,
- ) {
- c1, c2, err := getTCPConnectionPair()
- if err != nil {
- return nil, nil, err
- }
- s, err := smux.Server(
- c2,
- nil,
- )
- if err != nil {
- return nil, nil, err
- }
- c, err := smux.Client(
- c1,
- nil,
- )
- if err != nil {
- return nil, nil, err
- }
- var ss *smux.Stream
- done := make(
- chan error,
- )
- go func() {
- var rerr error
- ss, rerr = s.AcceptStream()
- done <- rerr
- close(
- done,
- )
- }()
- cs, err := c.OpenStream()
- if err != nil {
- return nil, nil, err
- }
- err = <-done
- if err != nil {
- return nil, nil, err
- }
- return cs, ss, nil
- }
// getTCPConnectionPair creates a connected pair of TCP connections over a
// temporary localhost listener. The first result is the accepted (listener
// side) connection and the second is the dialed one. The listener is closed
// before returning.
func getTCPConnectionPair() (net.Conn, net.Conn, error) {
	lst, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return nil, nil, err
	}
	defer lst.Close()

	var (
		accepted  net.Conn
		acceptErr error
	)
	done := make(chan struct{})
	go func() {
		accepted, acceptErr = lst.Accept()
		close(done)
	}()

	dialed, err := net.Dial("tcp", lst.Addr().String())
	if err != nil {
		// The deferred lst.Close makes the pending Accept return.
		return nil, nil, err
	}
	<-done
	if acceptErr != nil {
		// Do not leave the dialed half open when its peer was never accepted.
		dialed.Close()
		return nil, nil, acceptErr
	}
	return accepted, dialed, nil
}
// bench writes b.N chunks of 128 KiB to wr while a goroutine reads from rd
// until the same number of bytes has arrived, and returns once the reader
// goroutine has finished.
//
// A read error that occurs before all bytes have arrived is reported with
// b.Errorf and stops the reader, so a failed reader cannot spin forever.
func bench(b *testing.B, rd io.Reader, wr io.Writer) {
	const chunk = 128 * 1024
	buf := make([]byte, chunk)
	buf2 := make([]byte, chunk)
	b.SetBytes(chunk)
	b.ResetTimer()
	b.ReportAllocs()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		want := chunk * b.N
		for count := 0; count < want; {
			n, err := rd.Read(buf2)
			count += n
			if err != nil && count < want {
				b.Errorf("read failed after %d of %d bytes: %v", count, want, err)
				return
			}
		}
	}()

	for i := 0; i < b.N; i++ {
		// Write errors are not checked: rd cannot be interrupted from here,
		// so stopping early would still leave the reader waiting.
		wr.Write(buf)
	}
	wg.Wait()
}
|