main.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. // Copyright (c) 2016 Arista Networks, Inc.
  2. // Use of this source code is governed by the Apache License 2.0
  3. // that can be found in the COPYING file.
  4. // The octsdb tool pushes OpenConfig telemetry to OpenTSDB.
  5. package main
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "flag"
  10. "os"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "notabug.org/themusicgod1/goarista/openconfig/client"
  16. "notabug.org/themusicgod1/glog"
  17. "github.com/golang/protobuf/proto"
  18. "github.com/openconfig/reference/rpc/openconfig"
  19. )
  20. func main() {
  21. tsdbFlag := flag.String("tsdb", "",
  22. "Address of the OpenTSDB server where to push telemetry to")
  23. textFlag := flag.Bool("text", false,
  24. "Print the output as simple text")
  25. configFlag := flag.String("config", "",
  26. "Config to turn OpenConfig telemetry into OpenTSDB put requests")
  27. isUDPServerFlag := flag.Bool("isudpserver", false,
  28. "Set to true to run as a UDP to TCP to OpenTSDB server.")
  29. udpAddrFlag := flag.String("udpaddr", "",
  30. "Address of the UDP server to connect to/serve on.")
  31. parityFlag := flag.Int("parityshards", 0,
  32. "Number of parity shards for the Reed Solomon Erasure Coding used for UDP."+
  33. " Clients and servers should have the same number.")
  34. udpTimeoutFlag := flag.Duration("udptimeout", 2*time.Second,
  35. "Timeout for each")
  36. username, password, subscriptions, addrs, opts := client.ParseFlags()
  37. if !(*tsdbFlag != "" || *textFlag || *udpAddrFlag != "") {
  38. glog.Fatal("Specify the address of the OpenTSDB server to write to with -tsdb")
  39. } else if *configFlag == "" {
  40. glog.Fatal("Specify a JSON configuration file with -config")
  41. }
  42. config, err := loadConfig(*configFlag)
  43. if err != nil {
  44. glog.Fatal(err)
  45. }
  46. // Ignore the default "subscribe-to-everything" subscription of the
  47. // -subscribe flag.
  48. if subscriptions[0] == "" {
  49. subscriptions = subscriptions[1:]
  50. }
  51. // Add the subscriptions from the config file.
  52. subscriptions = append(subscriptions, config.Subscriptions...)
  53. // Run a UDP server that forwards messages to OpenTSDB via Telnet (TCP)
  54. if *isUDPServerFlag {
  55. if *udpAddrFlag == "" {
  56. glog.Fatal("Specify the address for the UDP server to listen on with -udpaddr")
  57. }
  58. server, err := newUDPServer(*udpAddrFlag, *tsdbFlag, *parityFlag)
  59. if err != nil {
  60. glog.Fatal("Failed to create UDP server: ", err)
  61. }
  62. glog.Fatal(server.Run())
  63. }
  64. var c OpenTSDBConn
  65. if *textFlag {
  66. c = newTextDumper()
  67. } else if *udpAddrFlag != "" {
  68. c = newUDPClient(*udpAddrFlag, *parityFlag, *udpTimeoutFlag)
  69. } else {
  70. // TODO: support HTTP(S).
  71. c = newTelnetClient(*tsdbFlag)
  72. }
  73. wg := new(sync.WaitGroup)
  74. for _, addr := range addrs {
  75. wg.Add(1)
  76. publish := func(addr string, message proto.Message) {
  77. resp, ok := message.(*openconfig.SubscribeResponse)
  78. if !ok {
  79. glog.Errorf("Unexpected type of message: %T", message)
  80. return
  81. }
  82. if notif := resp.GetUpdate(); notif != nil {
  83. pushToOpenTSDB(addr, c, config, notif)
  84. }
  85. }
  86. c := client.New(username, password, addr, opts)
  87. go c.Subscribe(wg, subscriptions, publish)
  88. }
  89. wg.Wait()
  90. }
  91. func pushToOpenTSDB(addr string, conn OpenTSDBConn, config *Config,
  92. notif *openconfig.Notification) {
  93. if notif.Timestamp <= 0 {
  94. glog.Fatalf("Invalid timestamp %d in %s", notif.Timestamp, notif)
  95. }
  96. host := addr[:strings.IndexRune(addr, ':')]
  97. if host == "localhost" {
  98. // TODO: On Linux this reads /proc/sys/kernel/hostname each time,
  99. // which isn't the most efficient, but at least we don't have to
  100. // deal with detecting hostname changes.
  101. host, _ = os.Hostname()
  102. if host == "" {
  103. glog.Info("could not figure out localhost's hostname")
  104. return
  105. }
  106. }
  107. prefix := "/" + strings.Join(notif.Prefix.Element, "/")
  108. for _, update := range notif.Update {
  109. if update.Value == nil || update.Value.Type != openconfig.Type_JSON {
  110. glog.V(9).Infof("Ignoring incompatible update value in %s", update)
  111. continue
  112. }
  113. value := parseValue(update)
  114. if value == nil {
  115. continue
  116. }
  117. path := prefix + "/" + strings.Join(update.Path.Element, "/")
  118. metricName, tags := config.Match(path)
  119. if metricName == "" {
  120. glog.V(8).Infof("Ignoring unmatched update at %s: %+v", path, update.Value)
  121. continue
  122. }
  123. tags["host"] = host
  124. for i, v := range value {
  125. if len(value) > 1 {
  126. tags["index"] = strconv.Itoa(i)
  127. }
  128. err := conn.Put(&DataPoint{
  129. Metric: metricName,
  130. Timestamp: uint64(notif.Timestamp),
  131. Value: v,
  132. Tags: tags,
  133. })
  134. if err != nil {
  135. glog.Info("Failed to put datapoint: ", err)
  136. }
  137. }
  138. }
  139. }
  140. // parseValue returns either an integer/floating point value of the given update, or if
  141. // the value is a slice of integers/floating point values. If the value is neither of these
  142. // or if any element in the slice is non numerical, parseValue returns nil.
  143. func parseValue(update *openconfig.Update) []interface{} {
  144. var value interface{}
  145. decoder := json.NewDecoder(bytes.NewReader(update.Value.Value))
  146. decoder.UseNumber()
  147. err := decoder.Decode(&value)
  148. if err != nil {
  149. glog.Fatalf("Malformed JSON update %q in %s", update.Value.Value, update)
  150. }
  151. switch value := value.(type) {
  152. case json.Number:
  153. return []interface{}{parseNumber(value, update)}
  154. case []interface{}:
  155. for i, val := range value {
  156. jsonNum, ok := val.(json.Number)
  157. if !ok {
  158. // If any value is not a number, skip it.
  159. glog.Infof("Element %d: %v is %T, not json.Number", i, val, val)
  160. continue
  161. }
  162. num := parseNumber(jsonNum, update)
  163. value[i] = num
  164. }
  165. return value
  166. case map[string]interface{}:
  167. // Special case for simple value types that just have a "value"
  168. // attribute (common case).
  169. if val, ok := value["value"].(json.Number); ok && len(value) == 1 {
  170. return []interface{}{parseNumber(val, update)}
  171. }
  172. default:
  173. glog.V(9).Infof("Ignoring non-numeric or non-numeric slice value in %s", update)
  174. }
  175. return nil
  176. }
  177. // Convert our json.Number to either an int64, uint64, or float64.
  178. func parseNumber(num json.Number, update *openconfig.Update) interface{} {
  179. var value interface{}
  180. var err error
  181. if value, err = num.Int64(); err != nil {
  182. // num is either a large unsigned integer or a floating point.
  183. if strings.Contains(err.Error(), "value out of range") { // Sigh.
  184. value, err = strconv.ParseUint(num.String(), 10, 64)
  185. } else {
  186. value, err = num.Float64()
  187. if err != nil {
  188. glog.Fatalf("Malformed JSON number %q in %s", num, update)
  189. }
  190. }
  191. }
  192. return value
  193. }