collector.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. // Copyright (c) 2017 Arista Networks, Inc.
  2. // Use of this source code is governed by the Apache License 2.0
  3. // that can be found in the COPYING file.
  4. package main
  5. import (
  6. "encoding/json"
  7. "strings"
  8. "sync"
  9. "notabug.org/themusicgod1/glog"
  10. "github.com/golang/protobuf/proto"
  11. "github.com/openconfig/reference/rpc/openconfig"
  12. "github.com/prometheus/client_golang/prometheus"
  13. )
  14. // A metric source.
  15. type source struct {
  16. addr string
  17. path string
  18. }
  19. // Since the labels are fixed per-path and per-device we can cache them here,
  20. // to avoid recomputing them.
  21. type labelledMetric struct {
  22. metric prometheus.Metric
  23. labels []string
  24. }
  25. type collector struct {
  26. // Protects access to metrics map
  27. m sync.Mutex
  28. metrics map[source]*labelledMetric
  29. config *Config
  30. }
  31. func newCollector(config *Config) *collector {
  32. return &collector{
  33. metrics: make(map[source]*labelledMetric),
  34. config: config,
  35. }
  36. }
  37. // Process a notfication and update or create the corresponding metrics.
  38. func (c *collector) update(addr string, message proto.Message) {
  39. resp, ok := message.(*openconfig.SubscribeResponse)
  40. if !ok {
  41. glog.Errorf("Unexpected type of message: %T", message)
  42. return
  43. }
  44. notif := resp.GetUpdate()
  45. if notif == nil {
  46. return
  47. }
  48. device := strings.Split(addr, ":")[0]
  49. prefix := "/" + strings.Join(notif.Prefix.Element, "/")
  50. // Process deletes first
  51. for _, del := range notif.Delete {
  52. path := prefix + "/" + strings.Join(del.Element, "/")
  53. key := source{addr: device, path: path}
  54. c.m.Lock()
  55. delete(c.metrics, key)
  56. c.m.Unlock()
  57. }
  58. // Process updates next
  59. for _, update := range notif.Update {
  60. // We only use JSON encoded values
  61. if update.Value == nil || update.Value.Type != openconfig.Type_JSON {
  62. glog.V(9).Infof("Ignoring incompatible update value in %s", update)
  63. continue
  64. }
  65. path := prefix + "/" + strings.Join(update.Path.Element, "/")
  66. value, suffix, ok := parseValue(update)
  67. if !ok {
  68. continue
  69. }
  70. if suffix != "" {
  71. path += "/" + suffix
  72. }
  73. src := source{addr: device, path: path}
  74. c.m.Lock()
  75. // Use the cached labels and descriptor if available
  76. if m, ok := c.metrics[src]; ok {
  77. m.metric = prometheus.MustNewConstMetric(m.metric.Desc(), prometheus.GaugeValue, value,
  78. m.labels...)
  79. c.m.Unlock()
  80. continue
  81. }
  82. c.m.Unlock()
  83. // Get the descriptor and labels for this source
  84. desc, labelValues := c.config.getDescAndLabels(src)
  85. if desc == nil {
  86. glog.V(8).Infof("Ignoring unmatched update at %s:%s: %+v", device, path, update.Value)
  87. continue
  88. }
  89. c.m.Lock()
  90. // Save the metric and labels in the cache
  91. metric := prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  92. c.metrics[src] = &labelledMetric{
  93. metric: metric,
  94. labels: labelValues,
  95. }
  96. c.m.Unlock()
  97. }
  98. }
  99. func parseValue(update *openconfig.Update) (float64, string, bool) {
  100. // All metrics in Prometheus are floats, so only try to unmarshal as float64.
  101. var intf interface{}
  102. if err := json.Unmarshal(update.Value.Value, &intf); err != nil {
  103. glog.Errorf("Can't parse value in update %v: %v", update, err)
  104. return 0, "", false
  105. }
  106. switch value := intf.(type) {
  107. case float64:
  108. return value, "", true
  109. case map[string]interface{}:
  110. if vIntf, ok := value["value"]; ok {
  111. if val, ok := vIntf.(float64); ok {
  112. return val, "value", true
  113. }
  114. }
  115. case bool:
  116. if value {
  117. return 1, "", true
  118. }
  119. return 0, "", true
  120. default:
  121. glog.V(9).Infof("Ignorig non-numeric update: %v", update)
  122. }
  123. return 0, "", false
  124. }
  125. // Describe implements prometheus.Collector interface
  126. func (c *collector) Describe(ch chan<- *prometheus.Desc) {
  127. c.config.getAllDescs(ch)
  128. }
  129. // Collect implements prometheus.Collector interface
  130. func (c *collector) Collect(ch chan<- prometheus.Metric) {
  131. c.m.Lock()
  132. for _, m := range c.metrics {
  133. ch <- m.metric
  134. }
  135. c.m.Unlock()
  136. }