main.go 8.3 KB


  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "database/sql"
  9. "log"
  10. "os"
  11. "time"
  12. _ "github.com/lib/pq"
  13. )
  14. var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable")
  // getEnvDefault returns the value of the environment variable key, or def
  // when the variable is unset or set to the empty string.
  func getEnvDefault(key, def string) string {
  	fromEnv := os.Getenv(key)
  	if fromEnv == "" {
  		return def
  	}
  	return fromEnv
  }
  21. func main() {
  22. log.SetFlags(log.Ltime | log.Ldate)
  23. log.SetOutput(os.Stdout)
  24. db, err := sql.Open("postgres", dbConn)
  25. if err != nil {
  26. log.Fatalln("database:", err)
  27. }
  28. err = setupDB(db)
  29. if err != nil {
  30. log.Fatalln("database:", err)
  31. }
  32. for {
  33. runAggregation(db)
  34. // Sleep until one minute past next midnight
  35. sleepUntilNext(24*time.Hour, 1*time.Minute)
  36. }
  37. }
  38. func runAggregation(db *sql.DB) {
  39. since := maxIndexedDay(db, "VersionSummary")
  40. log.Println("Aggregating VersionSummary data since", since)
  41. rows, err := aggregateVersionSummary(db, since.Add(24*time.Hour))
  42. if err != nil {
  43. log.Println("aggregate:", err)
  44. }
  45. log.Println("Inserted", rows, "rows")
  46. log.Println("Aggregating UserMovement data")
  47. rows, err = aggregateUserMovement(db)
  48. if err != nil {
  49. log.Println("aggregate:", err)
  50. }
  51. log.Println("Inserted", rows, "rows")
  52. since = maxIndexedDay(db, "Performance")
  53. log.Println("Aggregating Performance data since", since)
  54. rows, err = aggregatePerformance(db, since.Add(24*time.Hour))
  55. if err != nil {
  56. log.Println("aggregate:", err)
  57. }
  58. log.Println("Inserted", rows, "rows")
  59. since = maxIndexedDay(db, "BlockStats")
  60. log.Println("Aggregating BlockStats data since", since)
  61. rows, err = aggregateBlockStats(db, since.Add(24*time.Hour))
  62. if err != nil {
  63. log.Println("aggregate:", err)
  64. }
  65. log.Println("Inserted", rows, "rows")
  66. }
  // sleepUntilNext blocks until margin past the next multiple of intv, using
  // the current UTC time as the reference, and logs the wake-up time before
  // sleeping.
  func sleepUntilNext(intv, margin time.Duration) {
  	start := time.Now().UTC()
  	boundary := start.Truncate(intv).Add(intv)
  	wake := boundary.Add(margin)

  	log.Println("Sleeping until", wake)
  	time.Sleep(wake.Sub(start))
  }
  // setupDB creates the summary tables (VersionSummary, UserMovement,
  // Performance, BlockStats) and their indexes when they do not already exist.
  //
  // A failure to create a table is returned to the caller. Index creation is
  // best-effort: a failure there is logged, not returned, so that a missing
  // index never prevents startup but is no longer silently ignored.
  func setupDB(db *sql.DB) error {
  	tables := []string{
  		`CREATE TABLE IF NOT EXISTS VersionSummary (
  		Day TIMESTAMP NOT NULL,
  		Version VARCHAR(8) NOT NULL,
  		Count INTEGER NOT NULL
  	)`,
  		`CREATE TABLE IF NOT EXISTS UserMovement (
  		Day TIMESTAMP NOT NULL,
  		Added INTEGER NOT NULL,
  		Bounced INTEGER NOT NULL,
  		Removed INTEGER NOT NULL
  	)`,
  		`CREATE TABLE IF NOT EXISTS Performance (
  		Day TIMESTAMP NOT NULL,
  		TotFiles INTEGER NOT NULL,
  		TotMiB INTEGER NOT NULL,
  		SHA256Perf DOUBLE PRECISION NOT NULL,
  		MemorySize INTEGER NOT NULL,
  		MemoryUsageMiB INTEGER NOT NULL
  	)`,
  		`CREATE TABLE IF NOT EXISTS BlockStats (
  		Day TIMESTAMP NOT NULL,
  		Reports INTEGER NOT NULL,
  		Total INTEGER NOT NULL,
  		Renamed INTEGER NOT NULL,
  		Reused INTEGER NOT NULL,
  		Pulled INTEGER NOT NULL,
  		CopyOrigin INTEGER NOT NULL,
  		CopyOriginShifted INTEGER NOT NULL,
  		CopyElsewhere INTEGER NOT NULL
  	)`,
  	}
  	for _, stmt := range tables {
  		if _, err := db.Exec(stmt); err != nil {
  			return err
  		}
  	}

  	indexes := []struct {
  		name   string
  		create string
  	}{
  		{"UniqueDayVersionIndex", `CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`},
  		{"VersionDayIndex", `CREATE INDEX VersionDayIndex ON VersionSummary (Day)`},
  		{"MovementDayIndex", `CREATE INDEX MovementDayIndex ON UserMovement (Day)`},
  		{"PerformanceDayIndex", `CREATE INDEX PerformanceDayIndex ON Performance (Day)`},
  		{"BlockStatsDayIndex", `CREATE INDEX BlockStatsDayIndex ON BlockStats (Day)`},
  	}
  	for _, idx := range indexes {
  		// Resolving the name as a regclass succeeds only when a relation of
  		// that name exists; any error is taken to mean the index is missing.
  		// The names are the fixed constants above, never external input.
  		var t string
  		if err := db.QueryRow(`SELECT '` + idx.name + `'::regclass`).Scan(&t); err == nil {
  			continue
  		}
  		if _, err := db.Exec(idx.create); err != nil {
  			log.Printf("database: creating index %s: %v", idx.name, err)
  		}
  	}

  	return nil
  }
  // maxIndexedDay returns the newest Day value stored in table, truncated to
  // the day. It returns the zero time when the value cannot be read, which
  // includes the query failing and the result being NULL.
  func maxIndexedDay(db *sql.DB, table string) time.Time {
  	query := "SELECT MAX(DATE_TRUNC('day', Day)) FROM " + table

  	var newest time.Time
  	if err := db.QueryRow(query).Scan(&newest); err != nil {
  		return time.Time{}
  	}
  	return newest
  }
  // aggregateVersionSummary inserts one VersionSummary row per day and version
  // prefix, counting the reports in ReportsJson received after since and
  // before the start of the current day. It returns the number of rows
  // inserted.
  func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
  	const query = `INSERT INTO VersionSummary (
  	SELECT
  		DATE_TRUNC('day', Received) AS Day,
  		SUBSTRING(Report->>'version' FROM '^v\d.\d+') AS Ver,
  		COUNT(*) AS Count
  		FROM ReportsJson
  		WHERE
  			Received > $1
  			AND Received < DATE_TRUNC('day', NOW())
  			AND Report->>'version' like 'v_.%'
  		GROUP BY Day, Ver
  	);
  	`

  	result, err := db.Exec(query, since)
  	if err != nil {
  		return 0, err
  	}
  	return result.RowsAffected()
  }
  // aggregateUserMovement rebuilds the UserMovement table from scratch.
  //
  // It reads the day and unique ID of every report received before the start
  // of the current day, tracks the first and last day each ID was seen, and
  // derives per-day counts from those:
  //
  //   - added: IDs first seen on that day
  //   - removed: IDs last seen on that day
  //   - bounced: IDs both first and last seen on that day
  //
  // removed and bounced are only counted for days more than 30 days in the
  // past (presumably because a more recent last report does not yet mean the
  // user is gone). An ID counted as bounced is not also counted as added.
  //
  // The table is replaced inside a single transaction. The return value is the
  // number of rows inserted; it is zero whenever an error is returned.
  func aggregateUserMovement(db *sql.DB) (int64, error) {
  	rows, err := db.Query(`SELECT
  		DATE_TRUNC('day', Received) AS Day,
  		Report->>'uniqueID'
  		FROM ReportsJson
  		WHERE
  			Report->>'uniqueID' IS NOT NULL
  			AND Received < DATE_TRUNC('day', NOW())
  			AND Report->>'version' like 'v_.%'
  		ORDER BY Day
  	`)
  	if err != nil {
  		return 0, err
  	}
  	defer rows.Close()

  	firstSeen := make(map[string]time.Time)
  	lastSeen := make(map[string]time.Time)
  	var minTs time.Time
  	for rows.Next() {
  		var ts time.Time
  		var id string
  		if err := rows.Scan(&ts, &id); err != nil {
  			return 0, err
  		}

  		// Rows arrive ordered by day, so the first row carries the earliest
  		// day and each later row for an ID is its most recent sighting so far.
  		if len(firstSeen) == 0 {
  			minTs = ts
  		}
  		if _, ok := firstSeen[id]; !ok {
  			firstSeen[id] = ts
  		}
  		lastSeen[id] = ts
  	}
  	// A failure while iterating must abort here; otherwise the table would be
  	// wiped and rebuilt from a truncated result set.
  	if err := rows.Err(); err != nil {
  		return 0, err
  	}

  	type sumRow struct {
  		day     time.Time
  		added   int
  		removed int
  		bounced int
  	}
  	var sumRows []sumRow

  	// Without any report there is no first day to start from; walking from
  	// the zero time would produce one empty row per day since year 1.
  	if len(firstSeen) > 0 {
  		now := time.Now()
  		end := now.Truncate(24 * time.Hour)
  		cutoff := now.AddDate(0, 0, -30)

  		for t := minTs; t.Before(end); t = t.AddDate(0, 0, 1) {
  			var added, removed, bounced int
  			old := t.Before(cutoff)
  			for id, first := range firstSeen {
  				last := lastSeen[id]
  				if first.Equal(t) && last.Equal(t) && old {
  					bounced++
  					continue
  				}
  				if first.Equal(t) {
  					added++
  				}
  				if last.Equal(t) && old {
  					removed++
  				}
  			}
  			sumRows = append(sumRows, sumRow{t, added, removed, bounced})
  		}
  	}

  	tx, err := db.Begin()
  	if err != nil {
  		return 0, err
  	}

  	if _, err := tx.Exec("DELETE FROM UserMovement"); err != nil {
  		tx.Rollback()
  		return 0, err
  	}
  	for _, r := range sumRows {
  		if _, err := tx.Exec("INSERT INTO UserMovement (Day, Added, Removed, Bounced) VALUES ($1, $2, $3, $4)", r.day, r.added, r.removed, r.bounced); err != nil {
  			tx.Rollback()
  			return 0, err
  		}
  	}

  	if err := tx.Commit(); err != nil {
  		return 0, err
  	}
  	return int64(len(sumRows)), nil
  }
  // aggregatePerformance inserts one Performance row per day holding the
  // averages of the totFiles, totMiB, sha256Perf, memorySize and
  // memoryUsageMiB report fields, for reports in ReportsJson received after
  // since and before the start of the current day. It returns the number of
  // rows inserted.
  func aggregatePerformance(db *sql.DB, since time.Time) (int64, error) {
  	const query = `INSERT INTO Performance (
  	SELECT
  		DATE_TRUNC('day', Received) AS Day,
  		AVG((Report->>'totFiles')::numeric) As TotFiles,
  		AVG((Report->>'totMiB')::numeric) As TotMiB,
  		AVG((Report->>'sha256Perf')::numeric) As SHA256Perf,
  		AVG((Report->>'memorySize')::numeric) As MemorySize,
  		AVG((Report->>'memoryUsageMiB')::numeric) As MemoryUsageMiB
  		FROM ReportsJson
  		WHERE
  			Received > $1
  			AND Received < DATE_TRUNC('day', NOW())
  			AND Report->>'version' like 'v_.%'
  			/* Some custom implementation reported bytes when we expect megabytes, cap at petabyte */
  			AND (Report->>'memorySize')::numeric < 1073741824
  		GROUP BY Day
  	);
  	`

  	result, err := db.Exec(query, since)
  	if err != nil {
  		return 0, err
  	}
  	return result.RowsAffected()
  }
  // aggregateBlockStats inserts one BlockStats row per day holding the report
  // count and the sums of the blockStats counters, for reports in ReportsJson
  // received after since and before the start of the current day. It returns
  // the number of rows inserted.
  //
  // Only reports with urVersion >= 3 are considered, and versions v0.14.38
  // through v0.14.40 are excluded: releases prior to 0.14.41 reported sum
  // aggregations which made no sense.
  func aggregateBlockStats(db *sql.DB, since time.Time) (int64, error) {
  	const query = `INSERT INTO BlockStats (
  	SELECT
  		DATE_TRUNC('day', Received) AS Day,
  		COUNT(1) As Reports,
  		SUM((Report->'blockStats'->>'total')::numeric) AS Total,
  		SUM((Report->'blockStats'->>'renamed')::numeric) AS Renamed,
  		SUM((Report->'blockStats'->>'reused')::numeric) AS Reused,
  		SUM((Report->'blockStats'->>'pulled')::numeric) AS Pulled,
  		SUM((Report->'blockStats'->>'copyOrigin')::numeric) AS CopyOrigin,
  		SUM((Report->'blockStats'->>'copyOriginShifted')::numeric) AS CopyOriginShifted,
  		SUM((Report->'blockStats'->>'copyElsewhere')::numeric) AS CopyElsewhere
  		FROM ReportsJson
  		WHERE
  			Received > $1
  			AND Received < DATE_TRUNC('day', NOW())
  			AND (Report->>'urVersion')::numeric >= 3
  			AND Report->>'version' like 'v_.%'
  			AND Report->>'version' NOT LIKE 'v0.14.40%'
  			AND Report->>'version' NOT LIKE 'v0.14.39%'
  			AND Report->>'version' NOT LIKE 'v0.14.38%'
  		GROUP BY Day
  	);
  	`

  	result, err := db.Exec(query, since)
  	if err != nil {
  		return 0, err
  	}
  	return result.RowsAffected()
  }