stats.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright (C) 2018 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
  2. package main
  3. import (
  4. "encoding/json"
  5. "net"
  6. "net/http"
  7. "os"
  8. "time"
  9. "github.com/prometheus/client_golang/prometheus"
  10. "github.com/prometheus/client_golang/prometheus/collectors"
  11. "github.com/syncthing/syncthing/lib/sync"
  12. )
  13. func init() {
  14. processCollectorOpts := collectors.ProcessCollectorOpts{
  15. Namespace: "syncthing_relaypoolsrv",
  16. PidFn: func() (int, error) {
  17. return os.Getpid(), nil
  18. },
  19. }
  20. prometheus.MustRegister(
  21. collectors.NewProcessCollector(processCollectorOpts),
  22. )
  23. }
  24. var (
  25. statusClient = http.Client{
  26. Timeout: 5 * time.Second,
  27. }
  28. apiRequestsTotal = makeCounter("api_requests_total", "Number of API requests.", "type", "result")
  29. apiRequestsSeconds = makeSummary("api_requests_seconds", "Latency of API requests.", "type")
  30. relayTestsTotal = makeCounter("tests_total", "Number of relay tests.", "result")
  31. relayTestActionsSeconds = makeSummary("test_actions_seconds", "Latency of relay test actions.", "type")
  32. locationLookupSeconds = makeSummary("location_lookup_seconds", "Latency of location lookups.").WithLabelValues()
  33. metricsRequestsSeconds = makeSummary("metrics_requests_seconds", "Latency of metric requests.").WithLabelValues()
  34. scrapeSeconds = makeSummary("relay_scrape_seconds", "Latency of metric scrapes from remote relays.", "result")
  35. relayUptime = makeGauge("relay_uptime", "Uptime of relay", "relay")
  36. relayPendingSessionKeys = makeGauge("relay_pending_session_keys", "Number of pending session keys (two keys per session, one per each side of the connection)", "relay")
  37. relayActiveSessions = makeGauge("relay_active_sessions", "Number of sessions that are happening, a session contains two parties", "relay")
  38. relayConnections = makeGauge("relay_connections", "Number of devices connected to the relay", "relay")
  39. relayProxies = makeGauge("relay_proxies", "Number of active proxy routines sending data between peers (two proxies per session, one for each way)", "relay")
  40. relayBytesProxied = makeGauge("relay_bytes_proxied", "Number of bytes proxied by the relay", "relay")
  41. relayGoRoutines = makeGauge("relay_go_routines", "Number of Go routines in the process", "relay")
  42. relaySessionRate = makeGauge("relay_session_rate", "Rate applied per session", "relay")
  43. relayGlobalRate = makeGauge("relay_global_rate", "Global rate applied on the whole relay", "relay")
  44. relayBuildInfo = makeGauge("relay_build_info", "Build information about a relay", "relay", "go_version", "go_os", "go_arch")
  45. relayLocationInfo = makeGauge("relay_location_info", "Location information about a relay", "relay", "city", "country", "continent")
  46. lastStats = make(map[string]stats)
  47. )
  48. func makeGauge(name string, help string, labels ...string) *prometheus.GaugeVec {
  49. gauge := prometheus.NewGaugeVec(
  50. prometheus.GaugeOpts{
  51. Namespace: "syncthing",
  52. Subsystem: "relaypoolsrv",
  53. Name: name,
  54. Help: help,
  55. },
  56. labels,
  57. )
  58. prometheus.MustRegister(gauge)
  59. return gauge
  60. }
  61. func makeSummary(name string, help string, labels ...string) *prometheus.SummaryVec {
  62. summary := prometheus.NewSummaryVec(
  63. prometheus.SummaryOpts{
  64. Namespace: "syncthing",
  65. Subsystem: "relaypoolsrv",
  66. Name: name,
  67. Help: help,
  68. Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
  69. },
  70. labels,
  71. )
  72. prometheus.MustRegister(summary)
  73. return summary
  74. }
  75. func makeCounter(name string, help string, labels ...string) *prometheus.CounterVec {
  76. counter := prometheus.NewCounterVec(
  77. prometheus.CounterOpts{
  78. Namespace: "syncthing",
  79. Subsystem: "relaypoolsrv",
  80. Name: name,
  81. Help: help,
  82. },
  83. labels,
  84. )
  85. prometheus.MustRegister(counter)
  86. return counter
  87. }
  88. func statsRefresher(interval time.Duration) {
  89. ticker := time.NewTicker(interval)
  90. for range ticker.C {
  91. refreshStats()
  92. }
  93. }
  94. type statsFetchResult struct {
  95. relay *relay
  96. stats *stats
  97. }
  98. func refreshStats() {
  99. mut.RLock()
  100. relays := append(permanentRelays, knownRelays...)
  101. mut.RUnlock()
  102. now := time.Now()
  103. wg := sync.NewWaitGroup()
  104. results := make(chan statsFetchResult, len(relays))
  105. for _, rel := range relays {
  106. wg.Add(1)
  107. go func(rel *relay) {
  108. t0 := time.Now()
  109. stats := fetchStats(rel)
  110. duration := time.Since(t0).Seconds()
  111. result := "success"
  112. if stats == nil {
  113. result = "failed"
  114. }
  115. scrapeSeconds.WithLabelValues(result).Observe(duration)
  116. results <- statsFetchResult{
  117. relay: rel,
  118. stats: fetchStats(rel),
  119. }
  120. wg.Done()
  121. }(rel)
  122. }
  123. wg.Wait()
  124. close(results)
  125. mut.Lock()
  126. relayBuildInfo.Reset()
  127. relayLocationInfo.Reset()
  128. for result := range results {
  129. result.relay.StatsRetrieved = now
  130. result.relay.Stats = result.stats
  131. if result.stats == nil {
  132. deleteMetrics(result.relay.uri.Host)
  133. } else {
  134. updateMetrics(result.relay.uri.Host, *result.stats, result.relay.Location)
  135. }
  136. }
  137. mut.Unlock()
  138. }
  139. func fetchStats(relay *relay) *stats {
  140. statusAddr := relay.uri.Query().Get("statusAddr")
  141. if statusAddr == "" {
  142. statusAddr = ":22070"
  143. }
  144. statusHost, statusPort, err := net.SplitHostPort(statusAddr)
  145. if err != nil {
  146. return nil
  147. }
  148. if statusHost == "" {
  149. if host, _, err := net.SplitHostPort(relay.uri.Host); err != nil {
  150. return nil
  151. } else {
  152. statusHost = host
  153. }
  154. }
  155. url := "http://" + net.JoinHostPort(statusHost, statusPort) + "/status"
  156. response, err := statusClient.Get(url)
  157. if err != nil {
  158. return nil
  159. }
  160. var stats stats
  161. if json.NewDecoder(response.Body).Decode(&stats); err != nil {
  162. return nil
  163. }
  164. return &stats
  165. }
  166. func updateMetrics(host string, stats stats, location location) {
  167. if stats.GoVersion != "" || stats.GoOS != "" || stats.GoArch != "" {
  168. relayBuildInfo.WithLabelValues(host, stats.GoVersion, stats.GoOS, stats.GoArch).Add(1)
  169. }
  170. if location.City != "" || location.Country != "" || location.Continent != "" {
  171. relayLocationInfo.WithLabelValues(host, location.City, location.Country, location.Continent).Add(1)
  172. }
  173. if lastStat, ok := lastStats[host]; ok {
  174. stats = mergeStats(stats, lastStat)
  175. }
  176. relayUptime.WithLabelValues(host).Set(float64(stats.UptimeSeconds))
  177. relayPendingSessionKeys.WithLabelValues(host).Set(float64(stats.PendingSessionKeys))
  178. relayActiveSessions.WithLabelValues(host).Set(float64(stats.ActiveSessions))
  179. relayConnections.WithLabelValues(host).Set(float64(stats.Connections))
  180. relayProxies.WithLabelValues(host).Set(float64(stats.Proxies))
  181. relayBytesProxied.WithLabelValues(host).Set(float64(stats.BytesProxied))
  182. relayGoRoutines.WithLabelValues(host).Set(float64(stats.GoRoutines))
  183. relaySessionRate.WithLabelValues(host).Set(float64(stats.Options.SessionRate))
  184. relayGlobalRate.WithLabelValues(host).Set(float64(stats.Options.GlobalRate))
  185. lastStats[host] = stats
  186. }
  187. func deleteMetrics(host string) {
  188. relayUptime.DeleteLabelValues(host)
  189. relayPendingSessionKeys.DeleteLabelValues(host)
  190. relayActiveSessions.DeleteLabelValues(host)
  191. relayConnections.DeleteLabelValues(host)
  192. relayProxies.DeleteLabelValues(host)
  193. relayBytesProxied.DeleteLabelValues(host)
  194. relayGoRoutines.DeleteLabelValues(host)
  195. relaySessionRate.DeleteLabelValues(host)
  196. relayGlobalRate.DeleteLabelValues(host)
  197. delete(lastStats, host)
  198. }
  199. // Due to some unexplainable behaviour, some of the numbers sometimes travel slightly backwards (by less than 1%)
  200. // This happens between scrapes, which is 30s, so this can't be a race.
  201. // This causes prometheus to assume a "rate reset", hence causes phenomenal spikes.
  202. // One of the number that moves backwards is BytesProxied, which atomically increments a counter with numeric value
  203. // returned by net.Conn.Read(). I don't think that can return a negative value, so I have no idea what's going on.
  204. func mergeStats(new stats, old stats) stats {
  205. new.UptimeSeconds = mergeValue(new.UptimeSeconds, old.UptimeSeconds)
  206. new.PendingSessionKeys = mergeValue(new.PendingSessionKeys, old.PendingSessionKeys)
  207. new.ActiveSessions = mergeValue(new.ActiveSessions, old.ActiveSessions)
  208. new.Connections = mergeValue(new.Connections, old.Connections)
  209. new.Proxies = mergeValue(new.Proxies, old.Proxies)
  210. new.BytesProxied = mergeValue(new.BytesProxied, old.BytesProxied)
  211. new.GoRoutines = mergeValue(new.GoRoutines, old.GoRoutines)
  212. new.Options.SessionRate = mergeValue(new.Options.SessionRate, old.Options.SessionRate)
  213. new.Options.GlobalRate = mergeValue(new.Options.GlobalRate, old.Options.GlobalRate)
  214. return new
  215. }
  216. func mergeValue(new, old int) int {
  217. if new >= old {
  218. return new // normal increase
  219. }
  220. if float64(new) > 0.99*float64(old) {
  221. return old // slight backward movement
  222. }
  223. return new // reset (relay restart)
  224. }