gfcp_sess_test.go 12 KB


  1. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  2. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  3. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  4. //
  5. // All rights reserved.
  6. //
  7. // All use of this code is governed by the MIT license.
  8. // The complete license is available in the LICENSE file.
  9. package gfcp_test
  10. import (
  11. "fmt"
  12. "io"
  13. "log"
  14. "net"
  15. "net/http"
  16. _ "net/http/pprof"
  17. "runtime"
  18. "sync"
  19. "testing"
  20. "time"
  21. "go.gridfinity.dev/gfcp"
  22. u "go.gridfinity.dev/leaktestfe"
  23. )
// Loopback addresses used by the servers and clients in this test file.
const (
	// portEcho is the address used by listenEcho and dialEcho.
	portEcho = "127.0.0.1:9079"
	// portSink is the address used by listenSink and dialSink.
	portSink = "127.0.0.1:19609"
	// portTinyBufferEcho is the address used by listenTinyBufferEcho and
	// dialTinyBufferEcho.
	portTinyBufferEcho = "127.0.0.1:29609"
	// portListerner is the address bound by TestListenerClose.
	portListerner = "127.0.0.1:9078"
)
  30. func init() {
  31. go func() {
  32. log.Println(
  33. http.ListenAndServe(
  34. "127.0.0.1:8881",
  35. nil,
  36. ),
  37. )
  38. }()
  39. go echoServer()
  40. go sinkServer()
  41. go tinyBufferEchoServer()
  42. }
  43. func dialEcho() (
  44. *gfcp.UDPSession,
  45. error,
  46. ) {
  47. sess, err := gfcp.DialWithOptions(
  48. portEcho,
  49. 10,
  50. 3,
  51. )
  52. if err != nil {
  53. panic(
  54. err,
  55. )
  56. }
  57. sess.SetStreamMode(
  58. true,
  59. )
  60. sess.SetStreamMode(
  61. false,
  62. )
  63. sess.SetStreamMode(
  64. true,
  65. )
  66. sess.SetWindowSize(
  67. 1380,
  68. 1380,
  69. )
  70. sess.SetReadBuffer(
  71. 64 * 1024 * 1024,
  72. )
  73. sess.SetWriteBuffer(
  74. 64 * 1024 * 1024,
  75. )
  76. sess.SetStreamMode(
  77. true,
  78. )
  79. sess.SetNoDelay(
  80. 1,
  81. 10,
  82. 2,
  83. 1,
  84. )
  85. sess.SetMtu(
  86. 1400,
  87. )
  88. sess.SetMtu(
  89. 9000,
  90. )
  91. sess.SetMtu(
  92. 1400,
  93. )
  94. sess.SetACKNoDelay(
  95. true,
  96. )
  97. sess.SetACKNoDelay(
  98. false,
  99. )
  100. sess.SetDeadline(
  101. time.Now().Add(
  102. time.Minute,
  103. ),
  104. )
  105. return sess, err
  106. }
  107. func dialSink() (
  108. *gfcp.UDPSession,
  109. error,
  110. ) {
  111. sess, err := gfcp.DialWithOptions(
  112. portSink,
  113. 0,
  114. 0,
  115. )
  116. if err != nil {
  117. panic(
  118. err,
  119. )
  120. }
  121. sess.SetStreamMode(
  122. true,
  123. )
  124. sess.SetWindowSize(
  125. 1380,
  126. 1380,
  127. )
  128. sess.SetReadBuffer(
  129. 64 * 1024 * 1024,
  130. )
  131. sess.SetWriteBuffer(
  132. 64 * 1024 * 1024,
  133. )
  134. sess.SetStreamMode(
  135. true,
  136. )
  137. sess.SetNoDelay(
  138. 1,
  139. 10,
  140. 2,
  141. 1,
  142. )
  143. sess.SetMtu(
  144. 1400,
  145. )
  146. sess.SetACKNoDelay(
  147. false,
  148. )
  149. sess.SetDeadline(
  150. time.Now().Add(
  151. time.Minute,
  152. ),
  153. )
  154. return sess, err
  155. }
  156. func dialTinyBufferEcho() (
  157. *gfcp.UDPSession,
  158. error,
  159. ) {
  160. sess, err := gfcp.DialWithOptions(
  161. portTinyBufferEcho,
  162. 10,
  163. 3,
  164. )
  165. if err != nil {
  166. panic(
  167. err,
  168. )
  169. }
  170. return sess, err
  171. }
  172. func listenEcho() (
  173. net.Listener,
  174. error,
  175. ) {
  176. return gfcp.ListenWithOptions(
  177. portEcho,
  178. 10,
  179. 3,
  180. )
  181. }
  182. func listenTinyBufferEcho() (
  183. net.Listener,
  184. error,
  185. ) {
  186. return gfcp.ListenWithOptions(
  187. portTinyBufferEcho,
  188. 10,
  189. 3,
  190. )
  191. }
  192. func listenSink() (
  193. net.Listener,
  194. error,
  195. ) {
  196. return gfcp.ListenWithOptions(
  197. portSink,
  198. 0,
  199. 0,
  200. )
  201. }
  202. func echoServer() {
  203. l, err := listenEcho()
  204. if err != nil {
  205. panic(
  206. err,
  207. )
  208. }
  209. go func() {
  210. GFcplistener := l.(*gfcp.Listener)
  211. GFcplistener.SetReadBuffer(
  212. 64 * 1024 * 1024,
  213. )
  214. GFcplistener.SetWriteBuffer(
  215. 64 * 1024 * 1024,
  216. )
  217. GFcplistener.SetDSCP(
  218. 46,
  219. )
  220. for {
  221. s, err := l.Accept()
  222. if err != nil {
  223. return
  224. }
  225. s.(*gfcp.UDPSession).SetReadBuffer(
  226. 512 * 1024 * 1024,
  227. )
  228. s.(*gfcp.UDPSession).SetWriteBuffer(
  229. 512 * 1024 * 1024,
  230. )
  231. go handleEcho(s.(*gfcp.UDPSession))
  232. }
  233. }()
  234. }
  235. func sinkServer() {
  236. l, err := listenSink()
  237. if err != nil {
  238. panic(
  239. err,
  240. )
  241. }
  242. go func() {
  243. GFcplistener := l.(*gfcp.Listener)
  244. GFcplistener.SetReadBuffer(
  245. 64 * 1024 * 1024,
  246. )
  247. GFcplistener.SetWriteBuffer(
  248. 64 * 1024 * 1024,
  249. )
  250. GFcplistener.SetDSCP(
  251. 46,
  252. )
  253. for {
  254. s, err := l.Accept()
  255. if err != nil {
  256. return
  257. }
  258. go handleSink(s.(*gfcp.UDPSession))
  259. }
  260. }()
  261. }
  262. func tinyBufferEchoServer() {
  263. l, err := listenTinyBufferEcho()
  264. if err != nil {
  265. panic(
  266. err,
  267. )
  268. }
  269. go func() {
  270. for {
  271. s, err := l.Accept()
  272. if err != nil {
  273. return
  274. }
  275. go handleTinyBufferEcho(s.(*gfcp.UDPSession))
  276. }
  277. }()
  278. }
  279. func handleEcho(
  280. conn *gfcp.UDPSession,
  281. ) {
  282. conn.SetStreamMode(
  283. true,
  284. )
  285. conn.SetWindowSize(
  286. 8192,
  287. 8192,
  288. )
  289. conn.SetNoDelay(
  290. 1,
  291. 10,
  292. 2,
  293. 1,
  294. )
  295. conn.SetDSCP(
  296. 46,
  297. )
  298. conn.SetMtu(
  299. 1480,
  300. )
  301. conn.SetACKNoDelay(
  302. false,
  303. )
  304. conn.SetReadDeadline(
  305. time.Now().Add(
  306. time.Minute,
  307. ),
  308. )
  309. conn.SetWriteDeadline(
  310. time.Now().Add(
  311. time.Minute,
  312. ),
  313. )
  314. buf := make(
  315. []byte,
  316. 65536,
  317. )
  318. for {
  319. n, err := conn.Read(
  320. buf,
  321. )
  322. if err != nil {
  323. panic(
  324. err,
  325. )
  326. }
  327. conn.Write(
  328. buf[:n],
  329. )
  330. }
  331. }
  332. func handleSink(
  333. conn *gfcp.UDPSession,
  334. ) {
  335. conn.SetStreamMode(
  336. true,
  337. )
  338. conn.SetWindowSize(
  339. 8192,
  340. 8192,
  341. )
  342. conn.SetNoDelay(
  343. 1,
  344. 10,
  345. 2,
  346. 1,
  347. )
  348. conn.SetDSCP(
  349. 46,
  350. )
  351. conn.SetMtu(
  352. 1400,
  353. )
  354. conn.SetACKNoDelay(
  355. false,
  356. )
  357. conn.SetReadDeadline(
  358. time.Now().Add(
  359. time.Minute,
  360. ),
  361. )
  362. conn.SetWriteDeadline(
  363. time.Now().Add(
  364. time.Minute,
  365. ),
  366. )
  367. buf := make(
  368. []byte,
  369. 65536,
  370. )
  371. for {
  372. _, err := conn.Read(
  373. buf,
  374. )
  375. if err != nil {
  376. panic(
  377. err,
  378. )
  379. }
  380. }
  381. }
  382. func handleTinyBufferEcho(
  383. conn *gfcp.UDPSession,
  384. ) {
  385. conn.SetStreamMode(
  386. true,
  387. )
  388. buf := make(
  389. []byte,
  390. 2,
  391. )
  392. for {
  393. n, err := conn.Read(
  394. buf,
  395. )
  396. if err != nil {
  397. panic(
  398. err,
  399. )
  400. }
  401. conn.Write(
  402. buf[:n],
  403. )
  404. }
  405. }
  406. func TestTimeout(
  407. t *testing.T,
  408. ) {
  409. defer u.Leakplug(
  410. t,
  411. )
  412. cli, err := dialEcho()
  413. if err != nil {
  414. panic(
  415. err,
  416. )
  417. }
  418. buf := make(
  419. []byte,
  420. 10,
  421. )
  422. cli.SetDeadline(
  423. time.Now().Add(
  424. time.Second,
  425. ),
  426. )
  427. <-time.After(
  428. 2 * time.Second,
  429. )
  430. n, err := cli.Read(
  431. buf,
  432. )
  433. if n != 0 || err == nil {
  434. t.Fail()
  435. }
  436. cli.Close()
  437. }
  438. func TestSendRecv(
  439. t *testing.T,
  440. ) {
  441. defer u.Leakplug(
  442. t,
  443. )
  444. cli, err := dialEcho()
  445. if err != nil {
  446. panic(
  447. err,
  448. )
  449. }
  450. cli.SetWriteDelay(
  451. true,
  452. )
  453. cli.SetDUP(
  454. 1,
  455. )
  456. const (
  457. N = 100
  458. )
  459. buf := make(
  460. []byte,
  461. 10,
  462. )
  463. for i := 0; i < N; i++ {
  464. msg := fmt.Sprintf(
  465. "hello%v",
  466. i,
  467. )
  468. cli.Write(
  469. []byte(
  470. msg,
  471. ),
  472. )
  473. if n, err := cli.Read(
  474. buf,
  475. ); err == nil {
  476. if string(
  477. buf[:n],
  478. ) != msg {
  479. t.Fail()
  480. }
  481. } else {
  482. panic(
  483. err,
  484. )
  485. }
  486. }
  487. cli.Close()
  488. }
  489. func TestSendVector(
  490. t *testing.T,
  491. ) {
  492. defer u.Leakplug(
  493. t,
  494. )
  495. cli, err := dialEcho()
  496. if err != nil {
  497. panic(
  498. err,
  499. )
  500. }
  501. cli.SetWriteDelay(
  502. false,
  503. )
  504. const N = 100
  505. buf := make(
  506. []byte,
  507. 20,
  508. )
  509. v := make(
  510. [][]byte,
  511. 2,
  512. )
  513. for i := 0; i < N; i++ {
  514. v[0] = []byte(
  515. fmt.Sprintf(
  516. "holas%v",
  517. i,
  518. ))
  519. v[1] = []byte(
  520. fmt.Sprintf(
  521. "amigo%v",
  522. i,
  523. ))
  524. msg :=
  525. fmt.Sprintf(
  526. "holas%vamigo%v",
  527. i,
  528. i,
  529. )
  530. cli.WriteBuffers(
  531. v,
  532. )
  533. if n, err := cli.Read(
  534. buf,
  535. ); err == nil {
  536. if string(
  537. buf[:n],
  538. ) != msg {
  539. t.Error(
  540. string(
  541. buf[:n],
  542. ),
  543. msg,
  544. )
  545. }
  546. } else {
  547. panic(
  548. err,
  549. )
  550. }
  551. }
  552. cli.Close()
  553. }
  554. func TestTinyBufferReceiver(
  555. t *testing.T,
  556. ) {
  557. defer u.Leakplug(
  558. t,
  559. )
  560. cli, err := dialTinyBufferEcho()
  561. if err != nil {
  562. panic(
  563. err,
  564. )
  565. }
  566. const (
  567. N = 100
  568. )
  569. snd := byte(
  570. 0,
  571. )
  572. fillBuffer := func(
  573. buf []byte,
  574. ) {
  575. for i := 0; i < len(
  576. buf,
  577. ); i++ {
  578. buf[i] = snd
  579. snd++
  580. }
  581. }
  582. rcv := byte(
  583. 0,
  584. )
  585. check := func(
  586. buf []byte,
  587. ) bool {
  588. for i := 0; i < len(
  589. buf,
  590. ); i++ {
  591. if buf[i] != rcv {
  592. return false
  593. }
  594. rcv++
  595. }
  596. return true
  597. }
  598. sndbuf := make(
  599. []byte,
  600. 7,
  601. )
  602. rcvbuf := make(
  603. []byte,
  604. 7,
  605. )
  606. for i := 0; i < N; i++ {
  607. fillBuffer(
  608. sndbuf,
  609. )
  610. cli.Write(
  611. sndbuf,
  612. )
  613. if n, err := io.ReadFull(
  614. cli,
  615. rcvbuf,
  616. ); err == nil {
  617. if !check(
  618. rcvbuf[:n],
  619. ) {
  620. t.Fail()
  621. }
  622. } else {
  623. panic(
  624. err,
  625. )
  626. }
  627. }
  628. cli.Close()
  629. }
  630. func TestClose(
  631. t *testing.T,
  632. ) {
  633. defer u.Leakplug(
  634. t,
  635. )
  636. cli, err := dialEcho()
  637. if err != nil {
  638. panic(
  639. err,
  640. )
  641. }
  642. buf := make(
  643. []byte,
  644. 10,
  645. )
  646. cli.Close()
  647. if cli.Close() == nil {
  648. t.Fail()
  649. }
  650. n, err := cli.Write(
  651. buf,
  652. )
  653. if n != 0 || err == nil {
  654. t.Fail()
  655. }
  656. n, err = cli.Read(
  657. buf,
  658. )
  659. if n != 0 || err == nil {
  660. t.Fail()
  661. }
  662. cli.Close()
  663. }
  664. func TestParallel(
  665. t *testing.T,
  666. ) {
  667. concurrent := 128
  668. if runtime.GOOS == "darwin" {
  669. t.Log(
  670. "\n--- WARN: Running on macOS: Retargetting concurrency:\t32",
  671. )
  672. concurrent = 32
  673. }
  674. if runtime.GOOS == "windows" {
  675. t.Log(
  676. "\n--- WARN: Running on Windows: Retargetting concurrency:\t8",
  677. )
  678. concurrent = 8
  679. }
  680. t.Log(
  681. fmt.Sprintf(
  682. "\n--- INFO: Target concurrency:\t%v",
  683. concurrent,
  684. ),
  685. )
  686. t.Parallel()
  687. t.Log(
  688. fmt.Sprintf(
  689. "\tStage 1/2:\tGoroutines:\t%v",
  690. runtime.NumGoroutine(),
  691. ),
  692. )
  693. defer u.Leakplug(
  694. t,
  695. )
  696. var wg sync.WaitGroup
  697. wg.Add(
  698. concurrent,
  699. )
  700. for i := 0; i < concurrent; i++ {
  701. go parallelClient(
  702. &wg,
  703. )
  704. }
  705. t.Log(
  706. fmt.Sprintf(
  707. "\tStage 2/2:\tGoroutines:\t%v",
  708. runtime.NumGoroutine(),
  709. ),
  710. )
  711. wg.Wait()
  712. t.Log(
  713. fmt.Sprintf(
  714. "\tStage 2/3:\tGoroutines:\t%v",
  715. runtime.NumGoroutine(),
  716. ),
  717. )
  718. }
  719. func parallelClient(
  720. wg *sync.WaitGroup,
  721. ) (
  722. err error,
  723. ) {
  724. cli, err := dialEcho()
  725. if err != nil {
  726. panic(
  727. err,
  728. )
  729. }
  730. err = echoTester(
  731. cli,
  732. 64,
  733. 64,
  734. )
  735. wg.Done()
  736. return
  737. }
  738. func BenchmarkEchoSpeed1K(
  739. b *testing.B,
  740. ) {
  741. speedclient(
  742. b,
  743. 1*1000,
  744. )
  745. }
  746. func BenchmarkEchoSpeed4K(
  747. b *testing.B,
  748. ) {
  749. speedclient(
  750. b,
  751. 4*1000,
  752. )
  753. }
  754. func BenchmarkEchoSpeed64K(
  755. b *testing.B,
  756. ) {
  757. speedclient(
  758. b,
  759. 64*1000,
  760. )
  761. }
  762. func BenchmarkEchoSpeed256K(
  763. b *testing.B,
  764. ) {
  765. speedclient(
  766. b,
  767. 256*1000,
  768. )
  769. }
  770. func BenchmarkEchoSpeed512K(
  771. b *testing.B,
  772. ) {
  773. speedclient(
  774. b,
  775. 512*1000,
  776. )
  777. }
  778. func BenchmarkEchoSpeed1M(
  779. b *testing.B,
  780. ) {
  781. speedclient(
  782. b,
  783. 1*1000*1000,
  784. )
  785. }
  786. func BenchmarkEchoSpeed4M(
  787. b *testing.B,
  788. ) {
  789. speedclient(
  790. b,
  791. 4*1000*1000,
  792. )
  793. }
  794. func BenchmarkEchoSpeed8M(
  795. b *testing.B,
  796. ) {
  797. speedclient(
  798. b,
  799. 8*1000*1000,
  800. )
  801. }
  802. func speedclient(
  803. b *testing.B,
  804. nbytes int,
  805. ) {
  806. b.ReportAllocs()
  807. cli, err := dialEcho()
  808. if err != nil {
  809. panic(
  810. err,
  811. )
  812. }
  813. if err := echoTester(
  814. cli,
  815. nbytes,
  816. b.N,
  817. ); err != nil {
  818. b.Fail()
  819. }
  820. b.SetBytes(
  821. int64(
  822. nbytes,
  823. ),
  824. )
  825. }
  826. func BenchmarkSinkSpeed1K(
  827. b *testing.B,
  828. ) {
  829. sinkclient(
  830. b,
  831. 1*1000,
  832. )
  833. }
  834. func BenchmarkSinkSpeed4K(
  835. b *testing.B,
  836. ) {
  837. sinkclient(
  838. b,
  839. 4*1000,
  840. )
  841. }
  842. func BenchmarkSinkSpeed64K(
  843. b *testing.B,
  844. ) {
  845. sinkclient(
  846. b,
  847. 64*1000,
  848. )
  849. }
  850. func BenchmarkSinkSpeed256K(
  851. b *testing.B,
  852. ) {
  853. sinkclient(
  854. b,
  855. 256*1000,
  856. )
  857. }
  858. func BenchmarkSinkSpeed512K(
  859. b *testing.B,
  860. ) {
  861. sinkclient(
  862. b,
  863. 512*1000,
  864. )
  865. }
  866. func BenchmarkSinkSpeed1M(
  867. b *testing.B,
  868. ) {
  869. sinkclient(
  870. b,
  871. 1*1000*1000,
  872. )
  873. }
  874. func BenchmarkSinkSpeed4M(
  875. b *testing.B,
  876. ) {
  877. sinkclient(
  878. b,
  879. 4*1000*1000,
  880. )
  881. }
  882. func BenchmarkSinkSpeed8M(
  883. b *testing.B,
  884. ) {
  885. sinkclient(
  886. b,
  887. 8*1000*1000,
  888. )
  889. }
  890. func sinkclient(
  891. b *testing.B,
  892. nbytes int,
  893. ) {
  894. b.ReportAllocs()
  895. cli, err := dialSink()
  896. if err != nil {
  897. panic(
  898. err,
  899. )
  900. }
  901. sinkTester(
  902. cli,
  903. nbytes,
  904. b.N,
  905. )
  906. b.SetBytes(
  907. int64(
  908. nbytes,
  909. ),
  910. )
  911. }
  912. func echoTester(
  913. cli net.Conn,
  914. msglen,
  915. msgcount int,
  916. ) error {
  917. buf := make(
  918. []byte,
  919. msglen,
  920. )
  921. for i := 0; i < msgcount; i++ {
  922. if _, err := cli.Write(
  923. buf,
  924. ); err != nil {
  925. return err
  926. }
  927. nrecv := 0
  928. for {
  929. n, err := cli.Read(
  930. buf,
  931. )
  932. if err != nil {
  933. return err
  934. }
  935. nrecv += n
  936. if nrecv == msglen {
  937. break
  938. }
  939. }
  940. }
  941. return nil
  942. }
  943. func sinkTester(
  944. cli *gfcp.UDPSession,
  945. msglen,
  946. msgcount int,
  947. ) error {
  948. buf := make(
  949. []byte,
  950. msglen,
  951. )
  952. for i := 0; i < msgcount; i++ {
  953. if _, err := cli.Write(
  954. buf,
  955. ); err != nil {
  956. return err
  957. }
  958. }
  959. return nil
  960. }
  961. func TestSnsi(
  962. t *testing.T,
  963. ) {
  964. defer u.Leakplug(
  965. t,
  966. )
  967. t.Log(
  968. *gfcp.DefaultSnsi.Copy(),
  969. )
  970. t.Log(
  971. gfcp.DefaultSnsi.Header(),
  972. )
  973. t.Log(
  974. gfcp.DefaultSnsi.ToSlice(),
  975. )
  976. t.Log(
  977. "Resetting Snsi counters",
  978. )
  979. gfcp.DefaultSnsi.Reset()
  980. t.Log(
  981. gfcp.DefaultSnsi.ToSlice(),
  982. )
  983. }
  984. func TestListenerClose(
  985. t *testing.T,
  986. ) {
  987. defer u.Leakplug(
  988. t,
  989. )
  990. l, err := gfcp.ListenWithOptions(
  991. portListerner,
  992. 10,
  993. 3,
  994. )
  995. if err != nil {
  996. t.Fail()
  997. }
  998. l.SetReadDeadline(
  999. time.Now().Add(
  1000. 3 * time.Second,
  1001. ),
  1002. )
  1003. l.SetWriteDeadline(
  1004. time.Now().Add(
  1005. 3 * time.Second,
  1006. ),
  1007. )
  1008. l.SetDeadline(
  1009. time.Now().Add(
  1010. 3 * time.Second,
  1011. ),
  1012. )
  1013. time.Sleep(
  1014. 1 * time.Millisecond,
  1015. )
  1016. if _, err := l.Accept(); err == nil {
  1017. t.Fail()
  1018. }
  1019. l.Close()
  1020. fakeaddr, _ := net.ResolveUDPAddr(
  1021. "udp6",
  1022. "127.0.0.1:7162",
  1023. )
  1024. if l.CloseSession(
  1025. fakeaddr,
  1026. ) {
  1027. t.Fail()
  1028. }
  1029. }