123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- // Copyright (C) 2018 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- //go:generate go run ../../proto/scripts/protofmt.go database.proto
- //go:generate protoc -I ../../ -I . --gogofast_out=. database.proto
- package main
- import (
- "context"
- "log"
- "sort"
- "time"
- "github.com/syncthing/syncthing/lib/sliceutil"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/storage"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
- type clock interface {
- Now() time.Time
- }
- type defaultClock struct{}
- func (defaultClock) Now() time.Time {
- return time.Now()
- }
- type database interface {
- put(key string, rec DatabaseRecord) error
- merge(key string, addrs []DatabaseAddress, seen int64) error
- get(key string) (DatabaseRecord, error)
- }
- type levelDBStore struct {
- db *leveldb.DB
- inbox chan func()
- clock clock
- marshalBuf []byte
- }
- func newLevelDBStore(dir string) (*levelDBStore, error) {
- db, err := leveldb.OpenFile(dir, levelDBOptions)
- if err != nil {
- return nil, err
- }
- return &levelDBStore{
- db: db,
- inbox: make(chan func(), 16),
- clock: defaultClock{},
- }, nil
- }
- func newMemoryLevelDBStore() (*levelDBStore, error) {
- db, err := leveldb.Open(storage.NewMemStorage(), nil)
- if err != nil {
- return nil, err
- }
- return &levelDBStore{
- db: db,
- inbox: make(chan func(), 16),
- clock: defaultClock{},
- }, nil
- }
- func (s *levelDBStore) put(key string, rec DatabaseRecord) error {
- t0 := time.Now()
- defer func() {
- databaseOperationSeconds.WithLabelValues(dbOpPut).Observe(time.Since(t0).Seconds())
- }()
- rc := make(chan error)
- s.inbox <- func() {
- size := rec.Size()
- if len(s.marshalBuf) < size {
- s.marshalBuf = make([]byte, size)
- }
- n, _ := rec.MarshalTo(s.marshalBuf)
- rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
- }
- err := <-rc
- if err != nil {
- databaseOperations.WithLabelValues(dbOpPut, dbResError).Inc()
- } else {
- databaseOperations.WithLabelValues(dbOpPut, dbResSuccess).Inc()
- }
- return err
- }
- func (s *levelDBStore) merge(key string, addrs []DatabaseAddress, seen int64) error {
- t0 := time.Now()
- defer func() {
- databaseOperationSeconds.WithLabelValues(dbOpMerge).Observe(time.Since(t0).Seconds())
- }()
- rc := make(chan error)
- newRec := DatabaseRecord{
- Addresses: addrs,
- Seen: seen,
- }
- s.inbox <- func() {
- // grab the existing record
- oldRec, err := s.get(key)
- if err != nil {
- // "not found" is not an error from get, so this is serious
- // stuff only
- rc <- err
- return
- }
- newRec = merge(newRec, oldRec)
- // We replicate s.put() functionality here ourselves instead of
- // calling it because we want to serialize our get above together
- // with the put in the same function.
- size := newRec.Size()
- if len(s.marshalBuf) < size {
- s.marshalBuf = make([]byte, size)
- }
- n, _ := newRec.MarshalTo(s.marshalBuf)
- rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
- }
- err := <-rc
- if err != nil {
- databaseOperations.WithLabelValues(dbOpMerge, dbResError).Inc()
- } else {
- databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
- }
- return err
- }
- func (s *levelDBStore) get(key string) (DatabaseRecord, error) {
- t0 := time.Now()
- defer func() {
- databaseOperationSeconds.WithLabelValues(dbOpGet).Observe(time.Since(t0).Seconds())
- }()
- keyBs := []byte(key)
- val, err := s.db.Get(keyBs, nil)
- if err == leveldb.ErrNotFound {
- databaseOperations.WithLabelValues(dbOpGet, dbResNotFound).Inc()
- return DatabaseRecord{}, nil
- }
- if err != nil {
- databaseOperations.WithLabelValues(dbOpGet, dbResError).Inc()
- return DatabaseRecord{}, err
- }
- var rec DatabaseRecord
- if err := rec.Unmarshal(val); err != nil {
- databaseOperations.WithLabelValues(dbOpGet, dbResUnmarshalError).Inc()
- return DatabaseRecord{}, nil
- }
- rec.Addresses = expire(rec.Addresses, s.clock.Now().UnixNano())
- databaseOperations.WithLabelValues(dbOpGet, dbResSuccess).Inc()
- return rec, nil
- }
- func (s *levelDBStore) Serve(ctx context.Context) error {
- t := time.NewTimer(0)
- defer t.Stop()
- defer s.db.Close()
- // Start the statistics serve routine. It will exit with us when
- // statisticsTrigger is closed.
- statisticsTrigger := make(chan struct{})
- statisticsDone := make(chan struct{})
- go s.statisticsServe(statisticsTrigger, statisticsDone)
- loop:
- for {
- select {
- case fn := <-s.inbox:
- // Run function in serialized order.
- fn()
- case <-t.C:
- // Trigger the statistics routine to do its thing in the
- // background.
- statisticsTrigger <- struct{}{}
- case <-statisticsDone:
- // The statistics routine is done with one iteratation, schedule
- // the next.
- t.Reset(databaseStatisticsInterval)
- case <-ctx.Done():
- // We're done.
- close(statisticsTrigger)
- break loop
- }
- }
- // Also wait for statisticsServe to return
- <-statisticsDone
- return nil
- }
- func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- struct{}) {
- defer close(done)
- for range trigger {
- t0 := time.Now()
- nowNanos := t0.UnixNano()
- cutoff24h := t0.Add(-24 * time.Hour).UnixNano()
- cutoff1w := t0.Add(-7 * 24 * time.Hour).UnixNano()
- cutoff2Mon := t0.Add(-60 * 24 * time.Hour).UnixNano()
- current, last24h, last1w, inactive, errors := 0, 0, 0, 0, 0
- iter := s.db.NewIterator(&util.Range{}, nil)
- for iter.Next() {
- // Attempt to unmarshal the record and count the
- // failure if there's something wrong with it.
- var rec DatabaseRecord
- if err := rec.Unmarshal(iter.Value()); err != nil {
- errors++
- continue
- }
- // If there are addresses that have not expired it's a current
- // record, otherwise account it based on when it was last seen
- // (last 24 hours or last week) or finally as inactice.
- switch {
- case len(expire(rec.Addresses, nowNanos)) > 0:
- current++
- case rec.Seen > cutoff24h:
- last24h++
- case rec.Seen > cutoff1w:
- last1w++
- case rec.Seen > cutoff2Mon:
- inactive++
- case rec.Missed < cutoff2Mon:
- // It hasn't been seen lately and we haven't recorded
- // someone asking for this device in a long time either;
- // delete the record.
- if err := s.db.Delete(iter.Key(), nil); err != nil {
- databaseOperations.WithLabelValues(dbOpDelete, dbResError).Inc()
- } else {
- databaseOperations.WithLabelValues(dbOpDelete, dbResSuccess).Inc()
- }
- default:
- inactive++
- }
- }
- iter.Release()
- databaseKeys.WithLabelValues("current").Set(float64(current))
- databaseKeys.WithLabelValues("last24h").Set(float64(last24h))
- databaseKeys.WithLabelValues("last1w").Set(float64(last1w))
- databaseKeys.WithLabelValues("inactive").Set(float64(inactive))
- databaseKeys.WithLabelValues("error").Set(float64(errors))
- databaseStatisticsSeconds.Set(time.Since(t0).Seconds())
- // Signal that we are done and can be scheduled again.
- done <- struct{}{}
- }
- }
- // merge returns the merged result of the two database records a and b. The
- // result is the union of the two address sets, with the newer expiry time
- // chosen for any duplicates.
- func merge(a, b DatabaseRecord) DatabaseRecord {
- // Both lists must be sorted for this to work.
- if !sort.IsSorted(databaseAddressOrder(a.Addresses)) {
- log.Println("Warning: bug: addresses not correctly sorted in merge")
- a.Addresses = sortedAddressCopy(a.Addresses)
- }
- if !sort.IsSorted(databaseAddressOrder(b.Addresses)) {
- // no warning because this is the side we read from disk and it may
- // legitimately predate correct sorting.
- b.Addresses = sortedAddressCopy(b.Addresses)
- }
- res := DatabaseRecord{
- Addresses: make([]DatabaseAddress, 0, len(a.Addresses)+len(b.Addresses)),
- Seen: a.Seen,
- }
- if b.Seen > a.Seen {
- res.Seen = b.Seen
- }
- aIdx := 0
- bIdx := 0
- aAddrs := a.Addresses
- bAddrs := b.Addresses
- loop:
- for {
- switch {
- case aIdx == len(aAddrs) && bIdx == len(bAddrs):
- // both lists are exhausted, we are done
- break loop
- case aIdx == len(aAddrs):
- // a is exhausted, pick from b and continue
- res.Addresses = append(res.Addresses, bAddrs[bIdx])
- bIdx++
- continue
- case bIdx == len(bAddrs):
- // b is exhausted, pick from a and continue
- res.Addresses = append(res.Addresses, aAddrs[aIdx])
- aIdx++
- continue
- }
- // We have values left on both sides.
- aVal := aAddrs[aIdx]
- bVal := bAddrs[bIdx]
- switch {
- case aVal.Address == bVal.Address:
- // update for same address, pick newer
- if aVal.Expires > bVal.Expires {
- res.Addresses = append(res.Addresses, aVal)
- } else {
- res.Addresses = append(res.Addresses, bVal)
- }
- aIdx++
- bIdx++
- case aVal.Address < bVal.Address:
- // a is smallest, pick it and continue
- res.Addresses = append(res.Addresses, aVal)
- aIdx++
- default:
- // b is smallest, pick it and continue
- res.Addresses = append(res.Addresses, bVal)
- bIdx++
- }
- }
- return res
- }
- // expire returns the list of addresses after removing expired entries.
- // Expiration happen in place, so the slice given as the parameter is
- // destroyed. Internal order is not preserved.
- func expire(addrs []DatabaseAddress, now int64) []DatabaseAddress {
- i := 0
- for i < len(addrs) {
- if addrs[i].Expires < now {
- addrs = sliceutil.RemoveAndZero(addrs, i)
- continue
- }
- i++
- }
- return addrs
- }
- func sortedAddressCopy(addrs []DatabaseAddress) []DatabaseAddress {
- sorted := make([]DatabaseAddress, len(addrs))
- copy(sorted, addrs)
- sort.Sort(databaseAddressOrder(sorted))
- return sorted
- }
- type databaseAddressOrder []DatabaseAddress
- func (s databaseAddressOrder) Less(a, b int) bool {
- return s[a].Address < s[b].Address
- }
- func (s databaseAddressOrder) Swap(a, b int) {
- s[a], s[b] = s[b], s[a]
- }
- func (s databaseAddressOrder) Len() int {
- return len(s)
- }
|