gfcp_sess_test.go 12 KB


  1. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  2. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  3. // Copyright © 2020 Gridfinity, LLC. <admin@gridfinity.com>.
  4. // Copyright © 2020 Jeffrey H. Johnson <jeff@gridfinity.com>.
  5. //
  6. // All rights reserved.
  7. //
  8. // All use of this code is governed by the MIT license.
  9. // The complete license is available in the LICENSE file.
  10. package gfcp_test
  11. import (
  12. "fmt"
  13. // "hash/fnv"
  14. "io"
  15. "log"
  16. "net"
  17. "net/http"
  18. _ "net/http/pprof"
  19. "runtime"
  20. "sync"
  21. "testing"
  22. "time"
  23. "go.gridfinity.dev/gfcp"
  24. u "go.gridfinity.dev/leaktestfe"
  25. // "golang.org/x/crypto/pbkdf2"
  26. )
  // Loopback addresses used by the fixtures in this file. Despite the "port"
  // prefix, each value is a full host:port pair.
  const (
  	// portEcho is where the echo server listens.
  	portEcho = "127.0.0.1:9079"
  	// portSink is where the discard-everything sink server listens.
  	portSink = "127.0.0.1:19609"
  	// portTinyBufferEcho is where the tiny-read-buffer echo server listens.
  	portTinyBufferEcho = "127.0.0.1:29609"
  	// portListerner is the address bound by TestListenerClose.
  	portListerner = "127.0.0.1:9078"
  )
  33. /*var (
  34. key = []byte(
  35. "testkey",
  36. )
  37. pass = pbkdf2.Key(
  38. key,
  39. []byte(portSink),
  40. 4096,
  41. 32,
  42. fnv.New128a,
  43. )
  44. )*/
  45. func init() {
  46. go func() {
  47. log.Println(
  48. http.ListenAndServe(
  49. "127.0.0.1:8881",
  50. nil,
  51. ),
  52. )
  53. }()
  54. go echoServer()
  55. go sinkServer()
  56. go tinyBufferEchoServer()
  57. }
  58. func dialEcho() (
  59. *gfcp.UDPSession,
  60. error,
  61. ) {
  62. sess, err := gfcp.DialWithOptions(
  63. portEcho,
  64. 10,
  65. 3,
  66. )
  67. if err != nil {
  68. panic(
  69. err,
  70. )
  71. }
  72. sess.SetStreamMode(
  73. true,
  74. )
  75. sess.SetStreamMode(
  76. false,
  77. )
  78. sess.SetStreamMode(
  79. true,
  80. )
  81. sess.SetWindowSize(
  82. 1380,
  83. 1380,
  84. )
  85. sess.SetReadBuffer(
  86. 64 * 1024 * 1024,
  87. )
  88. sess.SetWriteBuffer(
  89. 64 * 1024 * 1024,
  90. )
  91. sess.SetStreamMode(
  92. true,
  93. )
  94. sess.SetNoDelay(
  95. 1,
  96. 10,
  97. 2,
  98. 1,
  99. )
  100. sess.SetMtu(
  101. 1400,
  102. )
  103. sess.SetMtu(
  104. 9000,
  105. )
  106. sess.SetMtu(
  107. 1400,
  108. )
  109. sess.SetACKNoDelay(
  110. true,
  111. )
  112. sess.SetACKNoDelay(
  113. false,
  114. )
  115. sess.SetDeadline(
  116. time.Now().Add(
  117. time.Minute,
  118. ),
  119. )
  120. return sess, err
  121. }
  122. func dialSink() (
  123. *gfcp.UDPSession,
  124. error,
  125. ) {
  126. sess, err := gfcp.DialWithOptions(
  127. portSink,
  128. 0,
  129. 0,
  130. )
  131. if err != nil {
  132. panic(
  133. err,
  134. )
  135. }
  136. sess.SetStreamMode(
  137. true,
  138. )
  139. sess.SetWindowSize(
  140. 1380,
  141. 1380,
  142. )
  143. sess.SetReadBuffer(
  144. 64 * 1024 * 1024,
  145. )
  146. sess.SetWriteBuffer(
  147. 64 * 1024 * 1024,
  148. )
  149. sess.SetStreamMode(
  150. true,
  151. )
  152. sess.SetNoDelay(
  153. 1,
  154. 10,
  155. 2,
  156. 1,
  157. )
  158. sess.SetMtu(
  159. 1400,
  160. )
  161. sess.SetACKNoDelay(
  162. false,
  163. )
  164. sess.SetDeadline(
  165. time.Now().Add(
  166. time.Minute,
  167. ),
  168. )
  169. return sess, err
  170. }
  171. func dialTinyBufferEcho() (
  172. *gfcp.UDPSession,
  173. error,
  174. ) {
  175. sess, err := gfcp.DialWithOptions(
  176. portTinyBufferEcho,
  177. 10,
  178. 3,
  179. )
  180. if err != nil {
  181. panic(
  182. err,
  183. )
  184. }
  185. return sess, err
  186. }
  187. func listenEcho() (
  188. net.Listener,
  189. error,
  190. ) {
  191. return gfcp.ListenWithOptions(
  192. portEcho,
  193. 10,
  194. 3,
  195. )
  196. }
  197. func listenTinyBufferEcho() (
  198. net.Listener,
  199. error,
  200. ) {
  201. return gfcp.ListenWithOptions(
  202. portTinyBufferEcho,
  203. 10,
  204. 3,
  205. )
  206. }
  207. func listenSink() (
  208. net.Listener,
  209. error,
  210. ) {
  211. return gfcp.ListenWithOptions(
  212. portSink,
  213. 0,
  214. 0,
  215. )
  216. }
  217. func echoServer() {
  218. l, err := listenEcho()
  219. if err != nil {
  220. panic(
  221. err,
  222. )
  223. }
  224. go func() {
  225. GFcplistener := l.(*gfcp.Listener)
  226. GFcplistener.SetReadBuffer(
  227. 64 * 1024 * 1024,
  228. )
  229. GFcplistener.SetWriteBuffer(
  230. 64 * 1024 * 1024,
  231. )
  232. GFcplistener.SetDSCP(
  233. 46,
  234. )
  235. for {
  236. s, err := l.Accept()
  237. if err != nil {
  238. return
  239. }
  240. s.(*gfcp.UDPSession).SetReadBuffer(
  241. 512 * 1024 * 1024,
  242. )
  243. s.(*gfcp.UDPSession).SetWriteBuffer(
  244. 512 * 1024 * 1024,
  245. )
  246. go handleEcho(s.(*gfcp.UDPSession))
  247. }
  248. }()
  249. }
  250. func sinkServer() {
  251. l, err := listenSink()
  252. if err != nil {
  253. panic(
  254. err,
  255. )
  256. }
  257. go func() {
  258. GFcplistener := l.(*gfcp.Listener)
  259. GFcplistener.SetReadBuffer(
  260. 64 * 1024 * 1024,
  261. )
  262. GFcplistener.SetWriteBuffer(
  263. 64 * 1024 * 1024,
  264. )
  265. GFcplistener.SetDSCP(
  266. 46,
  267. )
  268. for {
  269. s, err := l.Accept()
  270. if err != nil {
  271. return
  272. }
  273. go handleSink(s.(*gfcp.UDPSession))
  274. }
  275. }()
  276. }
  277. func tinyBufferEchoServer() {
  278. l, err := listenTinyBufferEcho()
  279. if err != nil {
  280. panic(
  281. err,
  282. )
  283. }
  284. go func() {
  285. for {
  286. s, err := l.Accept()
  287. if err != nil {
  288. return
  289. }
  290. go handleTinyBufferEcho(s.(*gfcp.UDPSession))
  291. }
  292. }()
  293. }
  294. func handleEcho(
  295. conn *gfcp.UDPSession,
  296. ) {
  297. conn.SetStreamMode(
  298. true,
  299. )
  300. conn.SetWindowSize(
  301. 8192,
  302. 8192,
  303. )
  304. conn.SetNoDelay(
  305. 1,
  306. 10,
  307. 2,
  308. 1,
  309. )
  310. conn.SetDSCP(
  311. 46,
  312. )
  313. conn.SetMtu(
  314. 1480,
  315. )
  316. conn.SetACKNoDelay(
  317. false,
  318. )
  319. conn.SetReadDeadline(
  320. time.Now().Add(
  321. time.Minute,
  322. ),
  323. )
  324. conn.SetWriteDeadline(
  325. time.Now().Add(
  326. time.Minute,
  327. ),
  328. )
  329. buf := make(
  330. []byte,
  331. 65536,
  332. )
  333. for {
  334. n, err := conn.Read(
  335. buf,
  336. )
  337. if err != nil {
  338. panic(
  339. err,
  340. )
  341. }
  342. conn.Write(
  343. buf[:n],
  344. )
  345. }
  346. }
  347. func handleSink(
  348. conn *gfcp.UDPSession,
  349. ) {
  350. conn.SetStreamMode(
  351. true,
  352. )
  353. conn.SetWindowSize(
  354. 8192,
  355. 8192,
  356. )
  357. conn.SetNoDelay(
  358. 1,
  359. 10,
  360. 2,
  361. 1,
  362. )
  363. conn.SetDSCP(
  364. 46,
  365. )
  366. conn.SetMtu(
  367. 1400,
  368. )
  369. conn.SetACKNoDelay(
  370. false,
  371. )
  372. conn.SetReadDeadline(
  373. time.Now().Add(
  374. time.Minute,
  375. ),
  376. )
  377. conn.SetWriteDeadline(
  378. time.Now().Add(
  379. time.Minute,
  380. ),
  381. )
  382. buf := make(
  383. []byte,
  384. 65536,
  385. )
  386. for {
  387. _, err := conn.Read(
  388. buf,
  389. )
  390. if err != nil {
  391. panic(
  392. err,
  393. )
  394. }
  395. }
  396. }
  397. func handleTinyBufferEcho(
  398. conn *gfcp.UDPSession,
  399. ) {
  400. conn.SetStreamMode(
  401. true,
  402. )
  403. buf := make(
  404. []byte,
  405. 2,
  406. )
  407. for {
  408. n, err := conn.Read(
  409. buf,
  410. )
  411. if err != nil {
  412. panic(
  413. err,
  414. )
  415. }
  416. conn.Write(
  417. buf[:n],
  418. )
  419. }
  420. }
  421. func TestTimeout(
  422. t *testing.T,
  423. ) {
  424. defer u.Leakplug(
  425. t,
  426. )
  427. cli, err := dialEcho()
  428. if err != nil {
  429. panic(
  430. err,
  431. )
  432. }
  433. buf := make(
  434. []byte,
  435. 10,
  436. )
  437. cli.SetDeadline(
  438. time.Now().Add(
  439. time.Second,
  440. ),
  441. )
  442. <-time.After(
  443. 2 * time.Second,
  444. )
  445. n, err := cli.Read(
  446. buf,
  447. )
  448. if n != 0 || err == nil {
  449. t.Fail()
  450. }
  451. cli.Close()
  452. }
  453. func TestSendRecv(
  454. t *testing.T,
  455. ) {
  456. defer u.Leakplug(
  457. t,
  458. )
  459. cli, err := dialEcho()
  460. if err != nil {
  461. panic(
  462. err,
  463. )
  464. }
  465. cli.SetWriteDelay(
  466. true,
  467. )
  468. cli.SetDUP(
  469. 1,
  470. )
  471. const (
  472. N = 100
  473. )
  474. buf := make(
  475. []byte,
  476. 10,
  477. )
  478. for i := 0; i < N; i++ {
  479. msg := fmt.Sprintf(
  480. "hello%v",
  481. i,
  482. )
  483. cli.Write(
  484. []byte(
  485. msg,
  486. ),
  487. )
  488. if n, err := cli.Read(
  489. buf,
  490. ); err == nil {
  491. if string(
  492. buf[:n],
  493. ) != msg {
  494. t.Fail()
  495. }
  496. } else {
  497. panic(
  498. err,
  499. )
  500. }
  501. }
  502. cli.Close()
  503. }
  504. func TestSendVector(
  505. t *testing.T,
  506. ) {
  507. defer u.Leakplug(
  508. t,
  509. )
  510. cli, err := dialEcho()
  511. if err != nil {
  512. panic(
  513. err,
  514. )
  515. }
  516. cli.SetWriteDelay(
  517. false,
  518. )
  519. const N = 100
  520. buf := make(
  521. []byte,
  522. 20,
  523. )
  524. v := make(
  525. [][]byte,
  526. 2,
  527. )
  528. for i := 0; i < N; i++ {
  529. v[0] = []byte(
  530. fmt.Sprintf(
  531. "holas%v",
  532. i,
  533. ))
  534. v[1] = []byte(
  535. fmt.Sprintf(
  536. "amigo%v",
  537. i,
  538. ))
  539. msg :=
  540. fmt.Sprintf(
  541. "holas%vamigo%v",
  542. i,
  543. i,
  544. )
  545. cli.WriteBuffers(
  546. v,
  547. )
  548. if n, err := cli.Read(
  549. buf,
  550. ); err == nil {
  551. if string(
  552. buf[:n],
  553. ) != msg {
  554. t.Error(
  555. string(
  556. buf[:n],
  557. ),
  558. msg,
  559. )
  560. }
  561. } else {
  562. panic(
  563. err,
  564. )
  565. }
  566. }
  567. cli.Close()
  568. }
  569. func TestTinyBufferReceiver(
  570. t *testing.T,
  571. ) {
  572. defer u.Leakplug(
  573. t,
  574. )
  575. cli, err := dialTinyBufferEcho()
  576. if err != nil {
  577. panic(
  578. err,
  579. )
  580. }
  581. const (
  582. N = 100
  583. )
  584. snd := byte(
  585. 0,
  586. )
  587. fillBuffer := func(
  588. buf []byte,
  589. ) {
  590. for i := 0; i < len(
  591. buf,
  592. ); i++ {
  593. buf[i] = snd
  594. snd++
  595. }
  596. }
  597. rcv := byte(
  598. 0,
  599. )
  600. check := func(
  601. buf []byte,
  602. ) bool {
  603. for i := 0; i < len(
  604. buf,
  605. ); i++ {
  606. if buf[i] != rcv {
  607. return false
  608. }
  609. rcv++
  610. }
  611. return true
  612. }
  613. sndbuf := make(
  614. []byte,
  615. 7,
  616. )
  617. rcvbuf := make(
  618. []byte,
  619. 7,
  620. )
  621. for i := 0; i < N; i++ {
  622. fillBuffer(
  623. sndbuf,
  624. )
  625. cli.Write(
  626. sndbuf,
  627. )
  628. if n, err := io.ReadFull(
  629. cli,
  630. rcvbuf,
  631. ); err == nil {
  632. if !check(
  633. rcvbuf[:n],
  634. ) {
  635. t.Fail()
  636. }
  637. } else {
  638. panic(
  639. err,
  640. )
  641. }
  642. }
  643. cli.Close()
  644. }
  645. func TestClose(
  646. t *testing.T,
  647. ) {
  648. defer u.Leakplug(
  649. t,
  650. )
  651. cli, err := dialEcho()
  652. if err != nil {
  653. panic(
  654. err,
  655. )
  656. }
  657. buf := make(
  658. []byte,
  659. 10,
  660. )
  661. cli.Close()
  662. if cli.Close() == nil {
  663. t.Fail()
  664. }
  665. n, err := cli.Write(
  666. buf,
  667. )
  668. if n != 0 || err == nil {
  669. t.Fail()
  670. }
  671. n, err = cli.Read(
  672. buf,
  673. )
  674. if n != 0 || err == nil {
  675. t.Fail()
  676. }
  677. cli.Close()
  678. }
  679. func TestParallel(
  680. t *testing.T,
  681. ) {
  682. concurrent := 1024
  683. if runtime.GOOS == "darwin" {
  684. t.Log(
  685. "\n--- WARN: Running on macOS: Retargetting concurrency:\t128",
  686. )
  687. concurrent = 128
  688. }
  689. t.Log(
  690. fmt.Sprintf(
  691. "\n--- INFO: Target concurrency:\t%v",
  692. concurrent,
  693. ),
  694. )
  695. t.Parallel()
  696. t.Log(
  697. fmt.Sprintf(
  698. "\tStage 1/2:\tGoroutines:\t%v",
  699. runtime.NumGoroutine(),
  700. ),
  701. )
  702. defer u.Leakplug(
  703. t,
  704. )
  705. var wg sync.WaitGroup
  706. wg.Add(
  707. concurrent,
  708. )
  709. for i := 0; i < concurrent; i++ {
  710. go parallel_client(
  711. &wg,
  712. )
  713. }
  714. t.Log(
  715. fmt.Sprintf(
  716. "\tStage 2/2:\tGoroutines:\t%v",
  717. runtime.NumGoroutine(),
  718. ),
  719. )
  720. wg.Wait()
  721. t.Log(
  722. fmt.Sprintf(
  723. "\tStage 2/3:\tGoroutines:\t%v",
  724. runtime.NumGoroutine(),
  725. ),
  726. )
  727. }
  728. func parallel_client(
  729. wg *sync.WaitGroup,
  730. ) (
  731. err error,
  732. ) {
  733. cli, err := dialEcho()
  734. if err != nil {
  735. panic(
  736. err,
  737. )
  738. }
  739. err = echo_tester(
  740. cli,
  741. 64,
  742. 64,
  743. )
  744. wg.Done()
  745. return
  746. }
  747. func BenchmarkEchoSpeed1K(
  748. b *testing.B,
  749. ) {
  750. speedclient(
  751. b,
  752. 1*1000,
  753. )
  754. }
  755. func BenchmarkEchoSpeed4K(
  756. b *testing.B,
  757. ) {
  758. speedclient(
  759. b,
  760. 4*1000,
  761. )
  762. }
  763. func BenchmarkEchoSpeed64K(
  764. b *testing.B,
  765. ) {
  766. speedclient(
  767. b,
  768. 64*1000,
  769. )
  770. }
  771. func BenchmarkEchoSpeed256K(
  772. b *testing.B,
  773. ) {
  774. speedclient(
  775. b,
  776. 256*1000,
  777. )
  778. }
  779. func BenchmarkEchoSpeed512K(
  780. b *testing.B,
  781. ) {
  782. speedclient(
  783. b,
  784. 512*1000,
  785. )
  786. }
  787. func BenchmarkEchoSpeed1M(
  788. b *testing.B,
  789. ) {
  790. speedclient(
  791. b,
  792. 1*1000*1000,
  793. )
  794. }
  795. func BenchmarkEchoSpeed4M(
  796. b *testing.B,
  797. ) {
  798. speedclient(
  799. b,
  800. 4*1000*1000,
  801. )
  802. }
  803. func BenchmarkEchoSpeed8M(
  804. b *testing.B,
  805. ) {
  806. speedclient(
  807. b,
  808. 8*1000*1000,
  809. )
  810. }
  811. func speedclient(
  812. b *testing.B,
  813. nbytes int,
  814. ) {
  815. b.ReportAllocs()
  816. cli, err := dialEcho()
  817. if err != nil {
  818. panic(
  819. err,
  820. )
  821. }
  822. if err := echo_tester(
  823. cli,
  824. nbytes,
  825. b.N,
  826. ); err != nil {
  827. b.Fail()
  828. }
  829. b.SetBytes(
  830. int64(
  831. nbytes,
  832. ),
  833. )
  834. }
  835. func BenchmarkSinkSpeed1K(
  836. b *testing.B,
  837. ) {
  838. sinkclient(
  839. b,
  840. 1*1000,
  841. )
  842. }
  843. func BenchmarkSinkSpeed4K(
  844. b *testing.B,
  845. ) {
  846. sinkclient(
  847. b,
  848. 4*1000,
  849. )
  850. }
  851. func BenchmarkSinkSpeed64K(
  852. b *testing.B,
  853. ) {
  854. sinkclient(
  855. b,
  856. 64*1000,
  857. )
  858. }
  859. func BenchmarkSinkSpeed256K(
  860. b *testing.B,
  861. ) {
  862. sinkclient(
  863. b,
  864. 256*1000,
  865. )
  866. }
  867. func BenchmarkSinkSpeed512K(
  868. b *testing.B,
  869. ) {
  870. sinkclient(
  871. b,
  872. 512*1000,
  873. )
  874. }
  875. func BenchmarkSinkSpeed1M(
  876. b *testing.B,
  877. ) {
  878. sinkclient(
  879. b,
  880. 1*1000*1000,
  881. )
  882. }
  883. func BenchmarkSinkSpeed4M(
  884. b *testing.B,
  885. ) {
  886. sinkclient(
  887. b,
  888. 4*1000*1000,
  889. )
  890. }
  891. func BenchmarkSinkSpeed8M(
  892. b *testing.B,
  893. ) {
  894. sinkclient(
  895. b,
  896. 8*1000*1000,
  897. )
  898. }
  899. func sinkclient(
  900. b *testing.B,
  901. nbytes int,
  902. ) {
  903. b.ReportAllocs()
  904. cli, err := dialSink()
  905. if err != nil {
  906. panic(
  907. err,
  908. )
  909. }
  910. sink_tester(
  911. cli,
  912. nbytes,
  913. b.N,
  914. )
  915. b.SetBytes(
  916. int64(
  917. nbytes,
  918. ),
  919. )
  920. }
  // echo_tester performs msgcount echo round trips over cli. Each round trip
  // writes a zero-filled message of msglen bytes and then reads until exactly
  // msglen bytes have come back.
  //
  // It returns the first write or read error, or nil once every round trip has
  // completed. io.ReadFull does the reassembly, so a reply delivered in several
  // short reads is handled and the read can never run past msglen (the earlier
  // hand-rolled loop compared the running count with == and would spin forever
  // if it ever overshot). With msglen == 0 no Read call is issued.
  func echo_tester(
  	cli net.Conn,
  	msglen,
  	msgcount int,
  ) error {
  	buf := make([]byte, msglen)
  	for i := 0; i < msgcount; i++ {
  		if _, err := cli.Write(buf); err != nil {
  			return err
  		}
  		if _, err := io.ReadFull(cli, buf); err != nil {
  			return err
  		}
  	}
  	return nil
  }
  953. func sink_tester(
  954. cli *gfcp.UDPSession,
  955. msglen,
  956. msgcount int,
  957. ) error {
  958. buf := make(
  959. []byte,
  960. msglen,
  961. )
  962. for i := 0; i < msgcount; i++ {
  963. if _, err := cli.Write(
  964. buf,
  965. ); err != nil {
  966. return err
  967. }
  968. }
  969. return nil
  970. }
  971. func TestSnsi(
  972. t *testing.T,
  973. ) {
  974. defer u.Leakplug(
  975. t,
  976. )
  977. t.Log(
  978. *gfcp.DefaultSnsi.Copy(),
  979. )
  980. t.Log(
  981. gfcp.DefaultSnsi.Header(),
  982. )
  983. t.Log(
  984. gfcp.DefaultSnsi.ToSlice(),
  985. )
  986. t.Log(
  987. "Resetting Snsi counters",
  988. )
  989. gfcp.DefaultSnsi.Reset()
  990. t.Log(
  991. gfcp.DefaultSnsi.ToSlice(),
  992. )
  993. }
  994. func TestListenerClose(
  995. t *testing.T,
  996. ) {
  997. defer u.Leakplug(
  998. t,
  999. )
  1000. l, err := gfcp.ListenWithOptions(
  1001. portListerner,
  1002. 10,
  1003. 3,
  1004. )
  1005. if err != nil {
  1006. t.Fail()
  1007. }
  1008. l.SetReadDeadline(
  1009. time.Now().Add(
  1010. 3 * time.Second,
  1011. ),
  1012. )
  1013. l.SetWriteDeadline(
  1014. time.Now().Add(
  1015. 3 * time.Second,
  1016. ),
  1017. )
  1018. l.SetDeadline(
  1019. time.Now().Add(
  1020. 3 * time.Second,
  1021. ),
  1022. )
  1023. time.Sleep(
  1024. 1 * time.Millisecond,
  1025. )
  1026. if _, err := l.Accept(); err == nil {
  1027. t.Fail()
  1028. }
  1029. l.Close()
  1030. fakeaddr, _ := net.ResolveUDPAddr(
  1031. "udp6",
  1032. "127.0.0.1:7162",
  1033. )
  1034. if l.CloseSession(
  1035. fakeaddr,
  1036. ) {
  1037. t.Fail()
  1038. }
  1039. }