database.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate go run ../../proto/scripts/protofmt.go database.proto
  7. //go:generate protoc -I ../../ -I . --gogofast_out=. database.proto
  8. package main
  9. import (
  10. "context"
  11. "log"
  12. "sort"
  13. "time"
  14. "github.com/syncthing/syncthing/lib/sliceutil"
  15. "github.com/syndtr/goleveldb/leveldb"
  16. "github.com/syndtr/goleveldb/leveldb/storage"
  17. "github.com/syndtr/goleveldb/leveldb/util"
  18. )
  19. type clock interface {
  20. Now() time.Time
  21. }
  22. type defaultClock struct{}
  23. func (defaultClock) Now() time.Time {
  24. return time.Now()
  25. }
  26. type database interface {
  27. put(key string, rec DatabaseRecord) error
  28. merge(key string, addrs []DatabaseAddress, seen int64) error
  29. get(key string) (DatabaseRecord, error)
  30. }
// levelDBStore is a record store on top of a LevelDB database. Writes are
// handed to the Serve loop as closures via inbox, which runs them one at a
// time.
type levelDBStore struct {
	// db is the underlying LevelDB handle; it is closed when Serve returns.
	db *leveldb.DB
	// inbox carries the write closures queued by put and merge; Serve
	// receives and runs them in order.
	inbox chan func()
	// clock supplies the current time used when expiring addresses in get.
	clock clock
	// marshalBuf is a scratch buffer reused between writes; it is grown
	// when a record does not fit and is only touched from closures sent
	// through inbox.
	marshalBuf []byte
}
  37. func newLevelDBStore(dir string) (*levelDBStore, error) {
  38. db, err := leveldb.OpenFile(dir, levelDBOptions)
  39. if err != nil {
  40. return nil, err
  41. }
  42. return &levelDBStore{
  43. db: db,
  44. inbox: make(chan func(), 16),
  45. clock: defaultClock{},
  46. }, nil
  47. }
  48. func newMemoryLevelDBStore() (*levelDBStore, error) {
  49. db, err := leveldb.Open(storage.NewMemStorage(), nil)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return &levelDBStore{
  54. db: db,
  55. inbox: make(chan func(), 16),
  56. clock: defaultClock{},
  57. }, nil
  58. }
  59. func (s *levelDBStore) put(key string, rec DatabaseRecord) error {
  60. t0 := time.Now()
  61. defer func() {
  62. databaseOperationSeconds.WithLabelValues(dbOpPut).Observe(time.Since(t0).Seconds())
  63. }()
  64. rc := make(chan error)
  65. s.inbox <- func() {
  66. size := rec.Size()
  67. if len(s.marshalBuf) < size {
  68. s.marshalBuf = make([]byte, size)
  69. }
  70. n, _ := rec.MarshalTo(s.marshalBuf)
  71. rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
  72. }
  73. err := <-rc
  74. if err != nil {
  75. databaseOperations.WithLabelValues(dbOpPut, dbResError).Inc()
  76. } else {
  77. databaseOperations.WithLabelValues(dbOpPut, dbResSuccess).Inc()
  78. }
  79. return err
  80. }
  81. func (s *levelDBStore) merge(key string, addrs []DatabaseAddress, seen int64) error {
  82. t0 := time.Now()
  83. defer func() {
  84. databaseOperationSeconds.WithLabelValues(dbOpMerge).Observe(time.Since(t0).Seconds())
  85. }()
  86. rc := make(chan error)
  87. newRec := DatabaseRecord{
  88. Addresses: addrs,
  89. Seen: seen,
  90. }
  91. s.inbox <- func() {
  92. // grab the existing record
  93. oldRec, err := s.get(key)
  94. if err != nil {
  95. // "not found" is not an error from get, so this is serious
  96. // stuff only
  97. rc <- err
  98. return
  99. }
  100. newRec = merge(newRec, oldRec)
  101. // We replicate s.put() functionality here ourselves instead of
  102. // calling it because we want to serialize our get above together
  103. // with the put in the same function.
  104. size := newRec.Size()
  105. if len(s.marshalBuf) < size {
  106. s.marshalBuf = make([]byte, size)
  107. }
  108. n, _ := newRec.MarshalTo(s.marshalBuf)
  109. rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
  110. }
  111. err := <-rc
  112. if err != nil {
  113. databaseOperations.WithLabelValues(dbOpMerge, dbResError).Inc()
  114. } else {
  115. databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
  116. }
  117. return err
  118. }
  119. func (s *levelDBStore) get(key string) (DatabaseRecord, error) {
  120. t0 := time.Now()
  121. defer func() {
  122. databaseOperationSeconds.WithLabelValues(dbOpGet).Observe(time.Since(t0).Seconds())
  123. }()
  124. keyBs := []byte(key)
  125. val, err := s.db.Get(keyBs, nil)
  126. if err == leveldb.ErrNotFound {
  127. databaseOperations.WithLabelValues(dbOpGet, dbResNotFound).Inc()
  128. return DatabaseRecord{}, nil
  129. }
  130. if err != nil {
  131. databaseOperations.WithLabelValues(dbOpGet, dbResError).Inc()
  132. return DatabaseRecord{}, err
  133. }
  134. var rec DatabaseRecord
  135. if err := rec.Unmarshal(val); err != nil {
  136. databaseOperations.WithLabelValues(dbOpGet, dbResUnmarshalError).Inc()
  137. return DatabaseRecord{}, nil
  138. }
  139. rec.Addresses = expire(rec.Addresses, s.clock.Now().UnixNano())
  140. databaseOperations.WithLabelValues(dbOpGet, dbResSuccess).Inc()
  141. return rec, nil
  142. }
  143. func (s *levelDBStore) Serve(ctx context.Context) error {
  144. t := time.NewTimer(0)
  145. defer t.Stop()
  146. defer s.db.Close()
  147. // Start the statistics serve routine. It will exit with us when
  148. // statisticsTrigger is closed.
  149. statisticsTrigger := make(chan struct{})
  150. statisticsDone := make(chan struct{})
  151. go s.statisticsServe(statisticsTrigger, statisticsDone)
  152. loop:
  153. for {
  154. select {
  155. case fn := <-s.inbox:
  156. // Run function in serialized order.
  157. fn()
  158. case <-t.C:
  159. // Trigger the statistics routine to do its thing in the
  160. // background.
  161. statisticsTrigger <- struct{}{}
  162. case <-statisticsDone:
  163. // The statistics routine is done with one iteratation, schedule
  164. // the next.
  165. t.Reset(databaseStatisticsInterval)
  166. case <-ctx.Done():
  167. // We're done.
  168. close(statisticsTrigger)
  169. break loop
  170. }
  171. }
  172. // Also wait for statisticsServe to return
  173. <-statisticsDone
  174. return nil
  175. }
  176. func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- struct{}) {
  177. defer close(done)
  178. for range trigger {
  179. t0 := time.Now()
  180. nowNanos := t0.UnixNano()
  181. cutoff24h := t0.Add(-24 * time.Hour).UnixNano()
  182. cutoff1w := t0.Add(-7 * 24 * time.Hour).UnixNano()
  183. cutoff2Mon := t0.Add(-60 * 24 * time.Hour).UnixNano()
  184. current, last24h, last1w, inactive, errors := 0, 0, 0, 0, 0
  185. iter := s.db.NewIterator(&util.Range{}, nil)
  186. for iter.Next() {
  187. // Attempt to unmarshal the record and count the
  188. // failure if there's something wrong with it.
  189. var rec DatabaseRecord
  190. if err := rec.Unmarshal(iter.Value()); err != nil {
  191. errors++
  192. continue
  193. }
  194. // If there are addresses that have not expired it's a current
  195. // record, otherwise account it based on when it was last seen
  196. // (last 24 hours or last week) or finally as inactice.
  197. switch {
  198. case len(expire(rec.Addresses, nowNanos)) > 0:
  199. current++
  200. case rec.Seen > cutoff24h:
  201. last24h++
  202. case rec.Seen > cutoff1w:
  203. last1w++
  204. case rec.Seen > cutoff2Mon:
  205. inactive++
  206. case rec.Missed < cutoff2Mon:
  207. // It hasn't been seen lately and we haven't recorded
  208. // someone asking for this device in a long time either;
  209. // delete the record.
  210. if err := s.db.Delete(iter.Key(), nil); err != nil {
  211. databaseOperations.WithLabelValues(dbOpDelete, dbResError).Inc()
  212. } else {
  213. databaseOperations.WithLabelValues(dbOpDelete, dbResSuccess).Inc()
  214. }
  215. default:
  216. inactive++
  217. }
  218. }
  219. iter.Release()
  220. databaseKeys.WithLabelValues("current").Set(float64(current))
  221. databaseKeys.WithLabelValues("last24h").Set(float64(last24h))
  222. databaseKeys.WithLabelValues("last1w").Set(float64(last1w))
  223. databaseKeys.WithLabelValues("inactive").Set(float64(inactive))
  224. databaseKeys.WithLabelValues("error").Set(float64(errors))
  225. databaseStatisticsSeconds.Set(time.Since(t0).Seconds())
  226. // Signal that we are done and can be scheduled again.
  227. done <- struct{}{}
  228. }
  229. }
  230. // merge returns the merged result of the two database records a and b. The
  231. // result is the union of the two address sets, with the newer expiry time
  232. // chosen for any duplicates.
  233. func merge(a, b DatabaseRecord) DatabaseRecord {
  234. // Both lists must be sorted for this to work.
  235. if !sort.IsSorted(databaseAddressOrder(a.Addresses)) {
  236. log.Println("Warning: bug: addresses not correctly sorted in merge")
  237. a.Addresses = sortedAddressCopy(a.Addresses)
  238. }
  239. if !sort.IsSorted(databaseAddressOrder(b.Addresses)) {
  240. // no warning because this is the side we read from disk and it may
  241. // legitimately predate correct sorting.
  242. b.Addresses = sortedAddressCopy(b.Addresses)
  243. }
  244. res := DatabaseRecord{
  245. Addresses: make([]DatabaseAddress, 0, len(a.Addresses)+len(b.Addresses)),
  246. Seen: a.Seen,
  247. }
  248. if b.Seen > a.Seen {
  249. res.Seen = b.Seen
  250. }
  251. aIdx := 0
  252. bIdx := 0
  253. aAddrs := a.Addresses
  254. bAddrs := b.Addresses
  255. loop:
  256. for {
  257. switch {
  258. case aIdx == len(aAddrs) && bIdx == len(bAddrs):
  259. // both lists are exhausted, we are done
  260. break loop
  261. case aIdx == len(aAddrs):
  262. // a is exhausted, pick from b and continue
  263. res.Addresses = append(res.Addresses, bAddrs[bIdx])
  264. bIdx++
  265. continue
  266. case bIdx == len(bAddrs):
  267. // b is exhausted, pick from a and continue
  268. res.Addresses = append(res.Addresses, aAddrs[aIdx])
  269. aIdx++
  270. continue
  271. }
  272. // We have values left on both sides.
  273. aVal := aAddrs[aIdx]
  274. bVal := bAddrs[bIdx]
  275. switch {
  276. case aVal.Address == bVal.Address:
  277. // update for same address, pick newer
  278. if aVal.Expires > bVal.Expires {
  279. res.Addresses = append(res.Addresses, aVal)
  280. } else {
  281. res.Addresses = append(res.Addresses, bVal)
  282. }
  283. aIdx++
  284. bIdx++
  285. case aVal.Address < bVal.Address:
  286. // a is smallest, pick it and continue
  287. res.Addresses = append(res.Addresses, aVal)
  288. aIdx++
  289. default:
  290. // b is smallest, pick it and continue
  291. res.Addresses = append(res.Addresses, bVal)
  292. bIdx++
  293. }
  294. }
  295. return res
  296. }
  297. // expire returns the list of addresses after removing expired entries.
  298. // Expiration happen in place, so the slice given as the parameter is
  299. // destroyed. Internal order is not preserved.
  300. func expire(addrs []DatabaseAddress, now int64) []DatabaseAddress {
  301. i := 0
  302. for i < len(addrs) {
  303. if addrs[i].Expires < now {
  304. addrs = sliceutil.RemoveAndZero(addrs, i)
  305. continue
  306. }
  307. i++
  308. }
  309. return addrs
  310. }
  311. func sortedAddressCopy(addrs []DatabaseAddress) []DatabaseAddress {
  312. sorted := make([]DatabaseAddress, len(addrs))
  313. copy(sorted, addrs)
  314. sort.Sort(databaseAddressOrder(sorted))
  315. return sorted
  316. }
  317. type databaseAddressOrder []DatabaseAddress
  318. func (s databaseAddressOrder) Less(a, b int) bool {
  319. return s[a].Address < s[b].Address
  320. }
  321. func (s databaseAddressOrder) Swap(a, b int) {
  322. s[a], s[b] = s[b], s[a]
  323. }
  324. func (s databaseAddressOrder) Len() int {
  325. return len(s)
  326. }