gfcp_sess_test.go 12 KB


  1. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  2. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  3. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  4. // Copyright © 2021 Jeffrey H. Johnson <jeff@gridfinity.com>.
  5. //
  6. // All rights reserved.
  7. //
  8. // All use of this code is governed by the MIT license.
  9. // The complete license is available in the LICENSE file.
  10. package gfcp_test
  11. import (
  12. "fmt"
  13. "io"
  14. "log"
  15. "net"
  16. "net/http"
  17. _ "net/http/pprof"
  18. "runtime"
  19. "sync"
  20. "testing"
  21. "time"
  22. "go.gridfinity.dev/gfcp"
  23. u "go.gridfinity.dev/leaktestfe"
  24. )
  25. const (
  26. portEcho = "127.0.0.1:9079"
  27. portSink = "127.0.0.1:19609"
  28. portTinyBufferEcho = "127.0.0.1:29609"
  29. portListerner = "127.0.0.1:9078"
  30. )
  31. func init() {
  32. go func() {
  33. log.Println(
  34. http.ListenAndServe(
  35. "127.0.0.1:8881",
  36. nil,
  37. ),
  38. )
  39. }()
  40. go echoServer()
  41. go sinkServer()
  42. go tinyBufferEchoServer()
  43. }
  44. func dialEcho() (
  45. *gfcp.UDPSession,
  46. error,
  47. ) {
  48. sess, err := gfcp.DialWithOptions(
  49. portEcho,
  50. 10,
  51. 3,
  52. )
  53. if err != nil {
  54. panic(
  55. err,
  56. )
  57. }
  58. sess.SetStreamMode(
  59. true,
  60. )
  61. sess.SetStreamMode(
  62. false,
  63. )
  64. sess.SetStreamMode(
  65. true,
  66. )
  67. sess.SetWindowSize(
  68. 1380,
  69. 1380,
  70. )
  71. sess.SetReadBuffer(
  72. 64 * 1024 * 1024,
  73. )
  74. sess.SetWriteBuffer(
  75. 64 * 1024 * 1024,
  76. )
  77. sess.SetStreamMode(
  78. true,
  79. )
  80. sess.SetNoDelay(
  81. 1,
  82. 10,
  83. 2,
  84. 1,
  85. )
  86. sess.SetMtu(
  87. 1400,
  88. )
  89. sess.SetMtu(
  90. 9000,
  91. )
  92. sess.SetMtu(
  93. 1400,
  94. )
  95. sess.SetACKNoDelay(
  96. true,
  97. )
  98. sess.SetACKNoDelay(
  99. false,
  100. )
  101. sess.SetDeadline(
  102. time.Now().Add(
  103. time.Minute,
  104. ),
  105. )
  106. return sess, err
  107. }
  108. func dialSink() (
  109. *gfcp.UDPSession,
  110. error,
  111. ) {
  112. sess, err := gfcp.DialWithOptions(
  113. portSink,
  114. 0,
  115. 0,
  116. )
  117. if err != nil {
  118. panic(
  119. err,
  120. )
  121. }
  122. sess.SetStreamMode(
  123. true,
  124. )
  125. sess.SetWindowSize(
  126. 1380,
  127. 1380,
  128. )
  129. sess.SetReadBuffer(
  130. 64 * 1024 * 1024,
  131. )
  132. sess.SetWriteBuffer(
  133. 64 * 1024 * 1024,
  134. )
  135. sess.SetStreamMode(
  136. true,
  137. )
  138. sess.SetNoDelay(
  139. 1,
  140. 10,
  141. 2,
  142. 1,
  143. )
  144. sess.SetMtu(
  145. 1400,
  146. )
  147. sess.SetACKNoDelay(
  148. false,
  149. )
  150. sess.SetDeadline(
  151. time.Now().Add(
  152. time.Minute,
  153. ),
  154. )
  155. return sess, err
  156. }
  157. func dialTinyBufferEcho() (
  158. *gfcp.UDPSession,
  159. error,
  160. ) {
  161. sess, err := gfcp.DialWithOptions(
  162. portTinyBufferEcho,
  163. 10,
  164. 3,
  165. )
  166. if err != nil {
  167. panic(
  168. err,
  169. )
  170. }
  171. return sess, err
  172. }
  173. func listenEcho() (
  174. net.Listener,
  175. error,
  176. ) {
  177. return gfcp.ListenWithOptions(
  178. portEcho,
  179. 10,
  180. 3,
  181. )
  182. }
  183. func listenTinyBufferEcho() (
  184. net.Listener,
  185. error,
  186. ) {
  187. return gfcp.ListenWithOptions(
  188. portTinyBufferEcho,
  189. 10,
  190. 3,
  191. )
  192. }
  193. func listenSink() (
  194. net.Listener,
  195. error,
  196. ) {
  197. return gfcp.ListenWithOptions(
  198. portSink,
  199. 0,
  200. 0,
  201. )
  202. }
  203. func echoServer() {
  204. l, err := listenEcho()
  205. if err != nil {
  206. panic(
  207. err,
  208. )
  209. }
  210. go func() {
  211. GFcplistener := l.(*gfcp.Listener)
  212. GFcplistener.SetReadBuffer(
  213. 64 * 1024 * 1024,
  214. )
  215. GFcplistener.SetWriteBuffer(
  216. 64 * 1024 * 1024,
  217. )
  218. GFcplistener.SetDSCP(
  219. 46,
  220. )
  221. for {
  222. s, err := l.Accept()
  223. if err != nil {
  224. return
  225. }
  226. s.(*gfcp.UDPSession).SetReadBuffer(
  227. 512 * 1024 * 1024,
  228. )
  229. s.(*gfcp.UDPSession).SetWriteBuffer(
  230. 512 * 1024 * 1024,
  231. )
  232. go handleEcho(s.(*gfcp.UDPSession))
  233. }
  234. }()
  235. }
  236. func sinkServer() {
  237. l, err := listenSink()
  238. if err != nil {
  239. panic(
  240. err,
  241. )
  242. }
  243. go func() {
  244. GFcplistener := l.(*gfcp.Listener)
  245. GFcplistener.SetReadBuffer(
  246. 64 * 1024 * 1024,
  247. )
  248. GFcplistener.SetWriteBuffer(
  249. 64 * 1024 * 1024,
  250. )
  251. GFcplistener.SetDSCP(
  252. 46,
  253. )
  254. for {
  255. s, err := l.Accept()
  256. if err != nil {
  257. return
  258. }
  259. go handleSink(s.(*gfcp.UDPSession))
  260. }
  261. }()
  262. }
  263. func tinyBufferEchoServer() {
  264. l, err := listenTinyBufferEcho()
  265. if err != nil {
  266. panic(
  267. err,
  268. )
  269. }
  270. go func() {
  271. for {
  272. s, err := l.Accept()
  273. if err != nil {
  274. return
  275. }
  276. go handleTinyBufferEcho(s.(*gfcp.UDPSession))
  277. }
  278. }()
  279. }
  280. func handleEcho(
  281. conn *gfcp.UDPSession,
  282. ) {
  283. conn.SetStreamMode(
  284. true,
  285. )
  286. conn.SetWindowSize(
  287. 8192,
  288. 8192,
  289. )
  290. conn.SetNoDelay(
  291. 1,
  292. 10,
  293. 2,
  294. 1,
  295. )
  296. conn.SetDSCP(
  297. 46,
  298. )
  299. conn.SetMtu(
  300. 1480,
  301. )
  302. conn.SetACKNoDelay(
  303. false,
  304. )
  305. conn.SetReadDeadline(
  306. time.Now().Add(
  307. time.Minute,
  308. ),
  309. )
  310. conn.SetWriteDeadline(
  311. time.Now().Add(
  312. time.Minute,
  313. ),
  314. )
  315. buf := make(
  316. []byte,
  317. 65536,
  318. )
  319. for {
  320. n, err := conn.Read(
  321. buf,
  322. )
  323. if err != nil {
  324. panic(
  325. err,
  326. )
  327. }
  328. conn.Write(
  329. buf[:n],
  330. )
  331. }
  332. }
  333. func handleSink(
  334. conn *gfcp.UDPSession,
  335. ) {
  336. conn.SetStreamMode(
  337. true,
  338. )
  339. conn.SetWindowSize(
  340. 8192,
  341. 8192,
  342. )
  343. conn.SetNoDelay(
  344. 1,
  345. 10,
  346. 2,
  347. 1,
  348. )
  349. conn.SetDSCP(
  350. 46,
  351. )
  352. conn.SetMtu(
  353. 1400,
  354. )
  355. conn.SetACKNoDelay(
  356. false,
  357. )
  358. conn.SetReadDeadline(
  359. time.Now().Add(
  360. time.Minute,
  361. ),
  362. )
  363. conn.SetWriteDeadline(
  364. time.Now().Add(
  365. time.Minute,
  366. ),
  367. )
  368. buf := make(
  369. []byte,
  370. 65536,
  371. )
  372. for {
  373. _, err := conn.Read(
  374. buf,
  375. )
  376. if err != nil {
  377. panic(
  378. err,
  379. )
  380. }
  381. }
  382. }
  383. func handleTinyBufferEcho(
  384. conn *gfcp.UDPSession,
  385. ) {
  386. conn.SetStreamMode(
  387. true,
  388. )
  389. buf := make(
  390. []byte,
  391. 2,
  392. )
  393. for {
  394. n, err := conn.Read(
  395. buf,
  396. )
  397. if err != nil {
  398. panic(
  399. err,
  400. )
  401. }
  402. conn.Write(
  403. buf[:n],
  404. )
  405. }
  406. }
  407. func TestTimeout(
  408. t *testing.T,
  409. ) {
  410. defer u.Leakplug(
  411. t,
  412. )
  413. cli, err := dialEcho()
  414. if err != nil {
  415. panic(
  416. err,
  417. )
  418. }
  419. buf := make(
  420. []byte,
  421. 10,
  422. )
  423. cli.SetDeadline(
  424. time.Now().Add(
  425. time.Second,
  426. ),
  427. )
  428. <-time.After(
  429. 2 * time.Second,
  430. )
  431. n, err := cli.Read(
  432. buf,
  433. )
  434. if n != 0 || err == nil {
  435. t.Fail()
  436. }
  437. cli.Close()
  438. }
  439. func TestSendRecv(
  440. t *testing.T,
  441. ) {
  442. defer u.Leakplug(
  443. t,
  444. )
  445. cli, err := dialEcho()
  446. if err != nil {
  447. panic(
  448. err,
  449. )
  450. }
  451. cli.SetWriteDelay(
  452. true,
  453. )
  454. cli.SetDUP(
  455. 1,
  456. )
  457. const (
  458. N = 100
  459. )
  460. buf := make(
  461. []byte,
  462. 10,
  463. )
  464. for i := 0; i < N; i++ {
  465. msg := fmt.Sprintf(
  466. "hello%v",
  467. i,
  468. )
  469. cli.Write(
  470. []byte(
  471. msg,
  472. ),
  473. )
  474. if n, err := cli.Read(
  475. buf,
  476. ); err == nil {
  477. if string(
  478. buf[:n],
  479. ) != msg {
  480. t.Fail()
  481. }
  482. } else {
  483. panic(
  484. err,
  485. )
  486. }
  487. }
  488. cli.Close()
  489. }
  490. func TestSendVector(
  491. t *testing.T,
  492. ) {
  493. defer u.Leakplug(
  494. t,
  495. )
  496. cli, err := dialEcho()
  497. if err != nil {
  498. panic(
  499. err,
  500. )
  501. }
  502. cli.SetWriteDelay(
  503. false,
  504. )
  505. const N = 100
  506. buf := make(
  507. []byte,
  508. 20,
  509. )
  510. v := make(
  511. [][]byte,
  512. 2,
  513. )
  514. for i := 0; i < N; i++ {
  515. v[0] = []byte(
  516. fmt.Sprintf(
  517. "holas%v",
  518. i,
  519. ))
  520. v[1] = []byte(
  521. fmt.Sprintf(
  522. "amigo%v",
  523. i,
  524. ))
  525. msg :=
  526. fmt.Sprintf(
  527. "holas%vamigo%v",
  528. i,
  529. i,
  530. )
  531. cli.WriteBuffers(
  532. v,
  533. )
  534. if n, err := cli.Read(
  535. buf,
  536. ); err == nil {
  537. if string(
  538. buf[:n],
  539. ) != msg {
  540. t.Error(
  541. string(
  542. buf[:n],
  543. ),
  544. msg,
  545. )
  546. }
  547. } else {
  548. panic(
  549. err,
  550. )
  551. }
  552. }
  553. cli.Close()
  554. }
  555. func TestTinyBufferReceiver(
  556. t *testing.T,
  557. ) {
  558. defer u.Leakplug(
  559. t,
  560. )
  561. cli, err := dialTinyBufferEcho()
  562. if err != nil {
  563. panic(
  564. err,
  565. )
  566. }
  567. const (
  568. N = 100
  569. )
  570. snd := byte(
  571. 0,
  572. )
  573. fillBuffer := func(
  574. buf []byte,
  575. ) {
  576. for i := 0; i < len(
  577. buf,
  578. ); i++ {
  579. buf[i] = snd
  580. snd++
  581. }
  582. }
  583. rcv := byte(
  584. 0,
  585. )
  586. check := func(
  587. buf []byte,
  588. ) bool {
  589. for i := 0; i < len(
  590. buf,
  591. ); i++ {
  592. if buf[i] != rcv {
  593. return false
  594. }
  595. rcv++
  596. }
  597. return true
  598. }
  599. sndbuf := make(
  600. []byte,
  601. 7,
  602. )
  603. rcvbuf := make(
  604. []byte,
  605. 7,
  606. )
  607. for i := 0; i < N; i++ {
  608. fillBuffer(
  609. sndbuf,
  610. )
  611. cli.Write(
  612. sndbuf,
  613. )
  614. if n, err := io.ReadFull(
  615. cli,
  616. rcvbuf,
  617. ); err == nil {
  618. if !check(
  619. rcvbuf[:n],
  620. ) {
  621. t.Fail()
  622. }
  623. } else {
  624. panic(
  625. err,
  626. )
  627. }
  628. }
  629. cli.Close()
  630. }
  631. func TestClose(
  632. t *testing.T,
  633. ) {
  634. defer u.Leakplug(
  635. t,
  636. )
  637. cli, err := dialEcho()
  638. if err != nil {
  639. panic(
  640. err,
  641. )
  642. }
  643. buf := make(
  644. []byte,
  645. 10,
  646. )
  647. cli.Close()
  648. if cli.Close() == nil {
  649. t.Fail()
  650. }
  651. n, err := cli.Write(
  652. buf,
  653. )
  654. if n != 0 || err == nil {
  655. t.Fail()
  656. }
  657. n, err = cli.Read(
  658. buf,
  659. )
  660. if n != 0 || err == nil {
  661. t.Fail()
  662. }
  663. cli.Close()
  664. }
  665. func TestParallel(
  666. t *testing.T,
  667. ) {
  668. concurrent := 128
  669. if runtime.GOOS == "darwin" {
  670. t.Log(
  671. "\n--- WARN: Running on macOS: Retargetting concurrency:\t32",
  672. )
  673. concurrent = 32
  674. }
  675. if runtime.GOOS == "windows" {
  676. t.Log(
  677. "\n--- WARN: Running on Windows: Retargetting concurrency:\t8",
  678. )
  679. concurrent = 8
  680. }
  681. t.Log(
  682. fmt.Sprintf(
  683. "\n--- INFO: Target concurrency:\t%v",
  684. concurrent,
  685. ),
  686. )
  687. t.Parallel()
  688. t.Log(
  689. fmt.Sprintf(
  690. "\tStage 1/2:\tGoroutines:\t%v",
  691. runtime.NumGoroutine(),
  692. ),
  693. )
  694. defer u.Leakplug(
  695. t,
  696. )
  697. var wg sync.WaitGroup
  698. wg.Add(
  699. concurrent,
  700. )
  701. for i := 0; i < concurrent; i++ {
  702. go parallel_client(
  703. &wg,
  704. )
  705. }
  706. t.Log(
  707. fmt.Sprintf(
  708. "\tStage 2/2:\tGoroutines:\t%v",
  709. runtime.NumGoroutine(),
  710. ),
  711. )
  712. wg.Wait()
  713. t.Log(
  714. fmt.Sprintf(
  715. "\tStage 2/3:\tGoroutines:\t%v",
  716. runtime.NumGoroutine(),
  717. ),
  718. )
  719. }
  720. func parallel_client(
  721. wg *sync.WaitGroup,
  722. ) (
  723. err error,
  724. ) {
  725. cli, err := dialEcho()
  726. if err != nil {
  727. panic(
  728. err,
  729. )
  730. }
  731. err = echo_tester(
  732. cli,
  733. 64,
  734. 64,
  735. )
  736. wg.Done()
  737. return
  738. }
  739. func BenchmarkEchoSpeed1K(
  740. b *testing.B,
  741. ) {
  742. speedclient(
  743. b,
  744. 1*1000,
  745. )
  746. }
  747. func BenchmarkEchoSpeed4K(
  748. b *testing.B,
  749. ) {
  750. speedclient(
  751. b,
  752. 4*1000,
  753. )
  754. }
  755. func BenchmarkEchoSpeed64K(
  756. b *testing.B,
  757. ) {
  758. speedclient(
  759. b,
  760. 64*1000,
  761. )
  762. }
  763. func BenchmarkEchoSpeed256K(
  764. b *testing.B,
  765. ) {
  766. speedclient(
  767. b,
  768. 256*1000,
  769. )
  770. }
  771. func BenchmarkEchoSpeed512K(
  772. b *testing.B,
  773. ) {
  774. speedclient(
  775. b,
  776. 512*1000,
  777. )
  778. }
  779. func BenchmarkEchoSpeed1M(
  780. b *testing.B,
  781. ) {
  782. speedclient(
  783. b,
  784. 1*1000*1000,
  785. )
  786. }
  787. func BenchmarkEchoSpeed4M(
  788. b *testing.B,
  789. ) {
  790. speedclient(
  791. b,
  792. 4*1000*1000,
  793. )
  794. }
  795. func BenchmarkEchoSpeed8M(
  796. b *testing.B,
  797. ) {
  798. speedclient(
  799. b,
  800. 8*1000*1000,
  801. )
  802. }
  803. func speedclient(
  804. b *testing.B,
  805. nbytes int,
  806. ) {
  807. b.ReportAllocs()
  808. cli, err := dialEcho()
  809. if err != nil {
  810. panic(
  811. err,
  812. )
  813. }
  814. if err := echo_tester(
  815. cli,
  816. nbytes,
  817. b.N,
  818. ); err != nil {
  819. b.Fail()
  820. }
  821. b.SetBytes(
  822. int64(
  823. nbytes,
  824. ),
  825. )
  826. }
  827. func BenchmarkSinkSpeed1K(
  828. b *testing.B,
  829. ) {
  830. sinkclient(
  831. b,
  832. 1*1000,
  833. )
  834. }
  835. func BenchmarkSinkSpeed4K(
  836. b *testing.B,
  837. ) {
  838. sinkclient(
  839. b,
  840. 4*1000,
  841. )
  842. }
  843. func BenchmarkSinkSpeed64K(
  844. b *testing.B,
  845. ) {
  846. sinkclient(
  847. b,
  848. 64*1000,
  849. )
  850. }
  851. func BenchmarkSinkSpeed256K(
  852. b *testing.B,
  853. ) {
  854. sinkclient(
  855. b,
  856. 256*1000,
  857. )
  858. }
  859. func BenchmarkSinkSpeed512K(
  860. b *testing.B,
  861. ) {
  862. sinkclient(
  863. b,
  864. 512*1000,
  865. )
  866. }
  867. func BenchmarkSinkSpeed1M(
  868. b *testing.B,
  869. ) {
  870. sinkclient(
  871. b,
  872. 1*1000*1000,
  873. )
  874. }
  875. func BenchmarkSinkSpeed4M(
  876. b *testing.B,
  877. ) {
  878. sinkclient(
  879. b,
  880. 4*1000*1000,
  881. )
  882. }
  883. func BenchmarkSinkSpeed8M(
  884. b *testing.B,
  885. ) {
  886. sinkclient(
  887. b,
  888. 8*1000*1000,
  889. )
  890. }
  891. func sinkclient(
  892. b *testing.B,
  893. nbytes int,
  894. ) {
  895. b.ReportAllocs()
  896. cli, err := dialSink()
  897. if err != nil {
  898. panic(
  899. err,
  900. )
  901. }
  902. sink_tester(
  903. cli,
  904. nbytes,
  905. b.N,
  906. )
  907. b.SetBytes(
  908. int64(
  909. nbytes,
  910. ),
  911. )
  912. }
  913. func echo_tester(
  914. cli net.Conn,
  915. msglen,
  916. msgcount int,
  917. ) error {
  918. buf := make(
  919. []byte,
  920. msglen,
  921. )
  922. for i := 0; i < msgcount; i++ {
  923. if _, err := cli.Write(
  924. buf,
  925. ); err != nil {
  926. return err
  927. }
  928. nrecv := 0
  929. for {
  930. n, err := cli.Read(
  931. buf,
  932. )
  933. if err != nil {
  934. return err
  935. } else {
  936. nrecv += n
  937. if nrecv == msglen {
  938. break
  939. }
  940. }
  941. }
  942. }
  943. return nil
  944. }
  945. func sink_tester(
  946. cli *gfcp.UDPSession,
  947. msglen,
  948. msgcount int,
  949. ) error {
  950. buf := make(
  951. []byte,
  952. msglen,
  953. )
  954. for i := 0; i < msgcount; i++ {
  955. if _, err := cli.Write(
  956. buf,
  957. ); err != nil {
  958. return err
  959. }
  960. }
  961. return nil
  962. }
  963. func TestSnsi(
  964. t *testing.T,
  965. ) {
  966. defer u.Leakplug(
  967. t,
  968. )
  969. t.Log(
  970. *gfcp.DefaultSnsi.Copy(),
  971. )
  972. t.Log(
  973. gfcp.DefaultSnsi.Header(),
  974. )
  975. t.Log(
  976. gfcp.DefaultSnsi.ToSlice(),
  977. )
  978. t.Log(
  979. "Resetting Snsi counters",
  980. )
  981. gfcp.DefaultSnsi.Reset()
  982. t.Log(
  983. gfcp.DefaultSnsi.ToSlice(),
  984. )
  985. }
  986. func TestListenerClose(
  987. t *testing.T,
  988. ) {
  989. defer u.Leakplug(
  990. t,
  991. )
  992. l, err := gfcp.ListenWithOptions(
  993. portListerner,
  994. 10,
  995. 3,
  996. )
  997. if err != nil {
  998. t.Fail()
  999. }
  1000. l.SetReadDeadline(
  1001. time.Now().Add(
  1002. 3 * time.Second,
  1003. ),
  1004. )
  1005. l.SetWriteDeadline(
  1006. time.Now().Add(
  1007. 3 * time.Second,
  1008. ),
  1009. )
  1010. l.SetDeadline(
  1011. time.Now().Add(
  1012. 3 * time.Second,
  1013. ),
  1014. )
  1015. time.Sleep(
  1016. 1 * time.Millisecond,
  1017. )
  1018. if _, err := l.Accept(); err == nil {
  1019. t.Fail()
  1020. }
  1021. l.Close()
  1022. fakeaddr, _ := net.ResolveUDPAddr(
  1023. "udp6",
  1024. "127.0.0.1:7162",
  1025. )
  1026. if l.CloseSession(
  1027. fakeaddr,
  1028. ) {
  1029. t.Fail()
  1030. }
  1031. }