udp.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright (c) 2017 Arista Networks, Inc.
  2. // Use of this source code is governed by the Apache License 2.0
  3. // that can be found in the COPYING file.
  4. package main
  import (
  	"io"
  	"math/rand"
  	"time"

  	kcp "github.com/xtaci/kcp-go"
  	"notabug.org/themusicgod1/glog"
  )
  11. type udpClient struct {
  12. addr string
  13. conn *kcp.UDPSession
  14. parity int
  15. timeout time.Duration
  16. }
  17. func newUDPClient(addr string, parity int, timeout time.Duration) OpenTSDBConn {
  18. return &udpClient{
  19. addr: addr,
  20. parity: parity,
  21. timeout: timeout,
  22. }
  23. }
  24. func (c *udpClient) Put(d *DataPoint) error {
  25. var err error
  26. if c.conn == nil {
  27. // Prevent a bunch of clients all disconnecting and attempting to reconnect
  28. // at nearly the same time.
  29. time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
  30. c.conn, err = kcp.DialWithOptions(c.addr, nil, 10, c.parity)
  31. if err != nil {
  32. return err
  33. }
  34. c.conn.SetNoDelay(1, 40, 1, 1) // Suggested by kcp-go to lower cpu usage
  35. }
  36. dStr := d.String()
  37. glog.V(3).Info(dStr)
  38. c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
  39. _, err = c.conn.Write([]byte(dStr))
  40. if err != nil {
  41. c.conn.Close()
  42. c.conn = nil
  43. }
  44. return err
  45. }
  46. type udpServer struct {
  47. lis *kcp.Listener
  48. telnet *telnetClient
  49. }
  50. func newUDPServer(udpAddr, tsdbAddr string, parity int) (*udpServer, error) {
  51. lis, err := kcp.ListenWithOptions(udpAddr, nil, 10, parity)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return &udpServer{
  56. lis: lis,
  57. telnet: newTelnetClient(tsdbAddr).(*telnetClient),
  58. }, nil
  59. }
  60. func (c *udpServer) Run() error {
  61. for {
  62. conn, err := c.lis.AcceptKCP()
  63. if err != nil {
  64. return err
  65. }
  66. conn.SetNoDelay(1, 40, 1, 1) // Suggested by kcp-go to lower cpu usage
  67. if glog.V(3) {
  68. glog.Infof("New connection from %s", conn.RemoteAddr())
  69. }
  70. go func() {
  71. defer conn.Close()
  72. var buf [4096]byte
  73. for {
  74. n, err := conn.Read(buf[:])
  75. if err != nil {
  76. if n != 0 { // Not EOF
  77. glog.Error(err)
  78. }
  79. return
  80. }
  81. if glog.V(3) {
  82. glog.Info(string(buf[:n]))
  83. }
  84. err = c.telnet.PutBytes(buf[:n])
  85. if err != nil {
  86. glog.Error(err)
  87. return
  88. }
  89. }
  90. }()
  91. }
  92. }