123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474 |
- // Copyright (C) 2016 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at http://mozilla.org/MPL/2.0/.
- package watchaggregator
- import (
- "context"
- "fmt"
- "path/filepath"
- "strings"
- "time"
- "github.com/syncthing/syncthing/lib/config"
- "github.com/syncthing/syncthing/lib/events"
- "github.com/syncthing/syncthing/lib/fs"
- )
// Aggregation limits. They are variables rather than constants only so that
// tests can override them; production code is not meant to change them.
var (
	// maxFiles is the number of tracked events at which the whole folder is
	// scanned instead; it is also the child limit of the root directory.
	maxFiles = 512
	// maxFilesPerDir is the child limit of every non-root directory before
	// the directory itself is tracked instead of its children.
	maxFilesPerDir = 128
)
- // aggregatedEvent represents potentially multiple events at and/or recursively
- // below one path until it times out and a scan is scheduled.
- // If it represents multiple events and there are events of both Remove and
- // NonRemove types, the evType attribute is Mixed (as returned by fs.Event.Merge).
- type aggregatedEvent struct {
- firstModTime time.Time
- lastModTime time.Time
- evType fs.EventType
- }
- // Stores pointers to both aggregated events directly within this directory and
- // child directories recursively containing aggregated events themselves.
- type eventDir struct {
- events map[string]*aggregatedEvent
- dirs map[string]*eventDir
- }
- func newEventDir() *eventDir {
- return &eventDir{
- events: make(map[string]*aggregatedEvent),
- dirs: make(map[string]*eventDir),
- }
- }
- func (dir *eventDir) childCount() int {
- return len(dir.events) + len(dir.dirs)
- }
- func (dir *eventDir) firstModTime() time.Time {
- if dir.childCount() == 0 {
- panic("bug: firstModTime must not be used on empty eventDir")
- }
- firstModTime := time.Now()
- for _, childDir := range dir.dirs {
- dirTime := childDir.firstModTime()
- if dirTime.Before(firstModTime) {
- firstModTime = dirTime
- }
- }
- for _, event := range dir.events {
- if event.firstModTime.Before(firstModTime) {
- firstModTime = event.firstModTime
- }
- }
- return firstModTime
- }
- func (dir *eventDir) eventType() fs.EventType {
- if dir.childCount() == 0 {
- panic("bug: eventType must not be used on empty eventDir")
- }
- var evType fs.EventType
- for _, childDir := range dir.dirs {
- evType |= childDir.eventType()
- if evType == fs.Mixed {
- return fs.Mixed
- }
- }
- for _, event := range dir.events {
- evType |= event.evType
- if evType == fs.Mixed {
- return fs.Mixed
- }
- }
- return evType
- }
- type aggregator struct {
- // folderID never changes and is accessed in CommitConfiguration, which
- // asynchronously updates folderCfg -> can't use folderCfg.ID (racy)
- folderID string
- folderCfg config.FolderConfiguration
- folderCfgUpdate chan config.FolderConfiguration
- // Time after which an event is scheduled for scanning when no modifications occur.
- notifyDelay time.Duration
- // Time after which an event is scheduled for scanning even though modifications occur.
- notifyTimeout time.Duration
- notifyTimer *time.Timer
- notifyTimerNeedsReset bool
- notifyTimerResetChan chan time.Duration
- counts eventCounter
- root *eventDir
- ctx context.Context
- }
// eventCounter tallies tracked events in two buckets: removes (which also
// includes mixed events) and nonRemoves.
type eventCounter struct {
	removes, nonRemoves int
}
- func (c *eventCounter) add(typ fs.EventType, n int) {
- if typ&fs.Remove != 0 {
- c.removes += n
- } else {
- c.nonRemoves += n
- }
- }
- func (c *eventCounter) total() int { return c.removes + c.nonRemoves }
- func newAggregator(ctx context.Context, folderCfg config.FolderConfiguration) *aggregator {
- a := &aggregator{
- folderID: folderCfg.ID,
- folderCfgUpdate: make(chan config.FolderConfiguration),
- notifyTimerNeedsReset: false,
- notifyTimerResetChan: make(chan time.Duration),
- root: newEventDir(),
- ctx: ctx,
- }
- a.updateConfig(folderCfg)
- return a
- }
- func Aggregate(ctx context.Context, in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger) {
- a := newAggregator(ctx, folderCfg)
- // Necessary for unit tests where the backend is mocked
- go a.mainLoop(in, out, cfg, evLogger)
- }
- func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg config.Wrapper, evLogger events.Logger) {
- a.notifyTimer = time.NewTimer(a.notifyDelay)
- defer a.notifyTimer.Stop()
- inProgressItemSubscription := evLogger.Subscribe(events.ItemStarted | events.ItemFinished)
- defer inProgressItemSubscription.Unsubscribe()
- cfg.Subscribe(a)
- defer cfg.Unsubscribe(a)
- inProgress := make(map[string]struct{})
- for {
- select {
- case event := <-in:
- a.newEvent(event, inProgress)
- case event, ok := <-inProgressItemSubscription.C():
- if ok {
- updateInProgressSet(event, inProgress)
- }
- case <-a.notifyTimer.C:
- a.actOnTimer(out)
- case interval := <-a.notifyTimerResetChan:
- a.resetNotifyTimer(interval)
- case folderCfg := <-a.folderCfgUpdate:
- a.updateConfig(folderCfg)
- case <-a.ctx.Done():
- l.Debugln(a, "Stopped")
- return
- }
- }
- }
- func (a *aggregator) newEvent(event fs.Event, inProgress map[string]struct{}) {
- if _, ok := a.root.events["."]; ok {
- l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name)
- return
- }
- if _, ok := inProgress[event.Name]; ok {
- l.Debugln(a, "Skipping path we modified:", event.Name)
- return
- }
- a.aggregateEvent(event, time.Now())
- }
- func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time) {
- if event.Name == "." || a.counts.total() == maxFiles {
- l.Debugln(a, "Scan entire folder")
- firstModTime := evTime
- if a.root.childCount() != 0 {
- event.Type = event.Type.Merge(a.root.eventType())
- firstModTime = a.root.firstModTime()
- }
- a.root.dirs = make(map[string]*eventDir)
- a.root.events = make(map[string]*aggregatedEvent)
- a.root.events["."] = &aggregatedEvent{
- firstModTime: firstModTime,
- lastModTime: evTime,
- evType: event.Type,
- }
- a.counts = eventCounter{}
- a.counts.add(event.Type, 1)
- a.resetNotifyTimerIfNeeded()
- return
- }
- parentDir := a.root
- // Check if any parent directory is already tracked or will exceed
- // events per directory limit bottom up
- pathSegments := strings.Split(filepath.ToSlash(event.Name), "/")
- // As root dir cannot be further aggregated, allow up to maxFiles
- // children.
- localMaxFilesPerDir := maxFiles
- var currPath string
- for i, name := range pathSegments[:len(pathSegments)-1] {
- currPath = filepath.Join(currPath, name)
- if ev, ok := parentDir.events[name]; ok {
- ev.lastModTime = evTime
- if merged := event.Type.Merge(ev.evType); ev.evType != merged {
- a.counts.add(ev.evType, -1)
- a.counts.add(merged, 1)
- ev.evType = merged
- }
- l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name)
- return
- }
- if parentDir.childCount() == localMaxFilesPerDir {
- l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name)
- event.Name = filepath.Dir(currPath)
- a.aggregateEvent(event, evTime)
- return
- }
- // If there are no events below path, but we need to recurse
- // into that path, create eventDir at path.
- if newParent, ok := parentDir.dirs[name]; ok {
- parentDir = newParent
- } else {
- l.Debugln(a, "Creating eventDir at:", currPath)
- newParent = newEventDir()
- parentDir.dirs[name] = newParent
- parentDir = newParent
- }
- // Reset allowed children count to maxFilesPerDir for non-root
- if i == 0 {
- localMaxFilesPerDir = maxFilesPerDir
- }
- }
- name := pathSegments[len(pathSegments)-1]
- if ev, ok := parentDir.events[name]; ok {
- ev.lastModTime = evTime
- if merged := event.Type.Merge(ev.evType); ev.evType != merged {
- a.counts.add(ev.evType, -1)
- a.counts.add(merged, 1)
- ev.evType = merged
- }
- l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name)
- return
- }
- childDir, ok := parentDir.dirs[name]
- // If a dir existed at path, it would be removed from dirs, thus
- // childCount would not increase.
- if !ok && parentDir.childCount() == localMaxFilesPerDir {
- l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name)
- event.Name = filepath.Dir(event.Name)
- a.aggregateEvent(event, evTime)
- return
- }
- firstModTime := evTime
- if ok {
- firstModTime = childDir.firstModTime()
- if merged := event.Type.Merge(childDir.eventType()); event.Type != merged {
- a.counts.add(event.Type, -1)
- event.Type = merged
- }
- delete(parentDir.dirs, name)
- }
- l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name)
- parentDir.events[name] = &aggregatedEvent{
- firstModTime: firstModTime,
- lastModTime: evTime,
- evType: event.Type,
- }
- a.counts.add(event.Type, 1)
- a.resetNotifyTimerIfNeeded()
- }
- func (a *aggregator) resetNotifyTimerIfNeeded() {
- if a.notifyTimerNeedsReset {
- a.resetNotifyTimer(a.notifyDelay)
- }
- }
- // resetNotifyTimer should only ever be called when notifyTimer has stopped
- // and notifyTimer.C been read from. Otherwise, call resetNotifyTimerIfNeeded.
- func (a *aggregator) resetNotifyTimer(duration time.Duration) {
- l.Debugln(a, "Resetting notifyTimer to", duration.String())
- a.notifyTimerNeedsReset = false
- a.notifyTimer.Reset(duration)
- }
- func (a *aggregator) actOnTimer(out chan<- []string) {
- c := a.counts.total()
- if c == 0 {
- l.Debugln(a, "No tracked events, waiting for new event.")
- a.notifyTimerNeedsReset = true
- return
- }
- oldEvents := make(map[string]*aggregatedEvent, c)
- a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true)
- if a.notifyDelay != a.notifyTimeout && a.counts.nonRemoves == 0 && a.counts.removes != 0 {
- // Only delayed events remaining, no need to delay them additionally
- a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false)
- }
- if len(oldEvents) == 0 {
- l.Debugln(a, "No old fs events")
- a.resetNotifyTimer(a.notifyDelay)
- return
- }
- // Sending to channel might block for a long time, but we need to keep
- // reading from notify backend channel to avoid overflow
- go a.notify(oldEvents, out)
- }
- // Schedule scan for given events dispatching deletes last and reset notification
- // afterwards to set up for the next scan scheduling.
- func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- []string) {
- timeBeforeSending := time.Now()
- l.Debugf("%v Notifying about %d fs events", a, len(oldEvents))
- separatedBatches := make(map[fs.EventType][]string)
- for path, event := range oldEvents {
- separatedBatches[event.evType] = append(separatedBatches[event.evType], path)
- }
- for _, evType := range [3]fs.EventType{fs.NonRemove, fs.Mixed, fs.Remove} {
- currBatch := separatedBatches[evType]
- if len(currBatch) != 0 {
- select {
- case out <- currBatch:
- case <-a.ctx.Done():
- return
- }
- }
- }
- // If sending to channel blocked for a long time,
- // shorten next notifyDelay accordingly.
- duration := time.Since(timeBeforeSending)
- buffer := time.Millisecond
- var nextDelay time.Duration
- switch {
- case duration < a.notifyDelay/10:
- nextDelay = a.notifyDelay
- case duration+buffer > a.notifyDelay:
- nextDelay = buffer
- default:
- nextDelay = a.notifyDelay - duration
- }
- select {
- case a.notifyTimerResetChan <- nextDelay:
- case <-a.ctx.Done():
- }
- }
- // popOldEvents finds events that should be scheduled for scanning recursively in dirs,
- // removes those events and empty eventDirs and returns a map with all the removed
- // events referenced by their filesystem path
- func (a *aggregator) popOldEventsTo(to map[string]*aggregatedEvent, dir *eventDir, dirPath string, currTime time.Time, delayRem bool) {
- for childName, childDir := range dir.dirs {
- a.popOldEventsTo(to, childDir, filepath.Join(dirPath, childName), currTime, delayRem)
- if childDir.childCount() == 0 {
- delete(dir.dirs, childName)
- }
- }
- for name, event := range dir.events {
- if a.isOld(event, currTime, delayRem) {
- to[filepath.Join(dirPath, name)] = event
- delete(dir.events, name)
- a.counts.add(event.evType, -1)
- }
- }
- }
- func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time, delayRem bool) bool {
- // Deletes should in general be scanned last, therefore they are delayed by
- // letting them time out. This behaviour is overridden by delayRem == false.
- // Refer to following comments as to why.
- // An event that has not registered any new modifications recently is scanned.
- // a.notifyDelay is the user facing value signifying the normal delay between
- // picking up a modification and scanning it. As scheduling scans happens at
- // regular intervals of a.notifyDelay the delay of a single event is not exactly
- // a.notifyDelay, but lies in the range of 0.5 to 1.5 times a.notifyDelay.
- if (!delayRem || ev.evType == fs.NonRemove) && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {
- return true
- }
- // When an event registers repeat modifications or involves removals it
- // is delayed to reduce resource usage, but after a certain time (notifyTimeout)
- // passed it is scanned anyway.
- // If only removals are remaining to be scanned, there is no point to delay
- // removals further, so this behaviour is overridden by delayRem == false.
- return currTime.Sub(ev.firstModTime) > a.notifyTimeout
- }
- func (a *aggregator) String() string {
- return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description())
- }
- func (a *aggregator) CommitConfiguration(_, to config.Configuration) bool {
- for _, folderCfg := range to.Folders {
- if folderCfg.ID == a.folderID {
- select {
- case a.folderCfgUpdate <- folderCfg:
- case <-a.ctx.Done():
- }
- return true
- }
- }
- // Nothing to do, model will soon stop this
- return true
- }
- func (a *aggregator) updateConfig(folderCfg config.FolderConfiguration) {
- a.notifyDelay = time.Duration(folderCfg.FSWatcherDelayS) * time.Second
- a.notifyTimeout = notifyTimeout(folderCfg.FSWatcherDelayS)
- a.folderCfg = folderCfg
- }
- func updateInProgressSet(event events.Event, inProgress map[string]struct{}) {
- if event.Type == events.ItemStarted {
- path := event.Data.(map[string]string)["item"]
- inProgress[path] = struct{}{}
- } else if event.Type == events.ItemFinished {
- path := event.Data.(map[string]interface{})["item"].(string)
- delete(inProgress, path)
- }
- }
// notifyTimeout returns the timeout for a watcher delay of eventDelayS
// seconds.
//
// Events that involve removals or continuously receive new modifications
// are delayed but must time out at some point. The following numbers come
// out of thin air, they were just considered as a sensible compromise
// between fast updates and saving resources. For delays shorter than 10
// seconds the timeout is 6 times the delay; for delays from 10 seconds up
// to, but not including, one minute it is one minute; for delays of one
// minute or longer the timeout equals the delay.
//
// Fractional delays are honoured: the value is scaled to nanoseconds before
// being converted, rather than being truncated to whole seconds first.
func notifyTimeout(eventDelayS float64) time.Duration {
	const (
		shortDelayS             = 10
		shortDelayMultiplicator = 6
		longDelayS              = 60
		longDelayTimeout        = time.Minute
	)
	switch {
	case eventDelayS < shortDelayS:
		return time.Duration(eventDelayS * shortDelayMultiplicator * float64(time.Second))
	case eventDelayS < longDelayS:
		return longDelayTimeout
	default:
		return time.Duration(eventDelayS * float64(time.Second))
	}
}
|