events.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6
  7. //go:generate counterfeiter -o mocks/buffered_subscription.go --fake-name BufferedSubscription . BufferedSubscription
  8. // Package events provides event subscription and polling functionality.
  9. package events
  10. import (
  11. "context"
  12. "encoding/json"
  13. "errors"
  14. "fmt"
  15. "runtime"
  16. "time"
  17. "github.com/thejerf/suture/v4"
  18. "github.com/syncthing/syncthing/lib/sync"
  19. )
// EventType identifies the kind of an event. Each defined value is a single
// bit, so values can be combined into a mask.
type EventType int64
// The event types. Every constant gets its own bit (1 << iota), which is what
// allows a subscription mask to be tested with a bitwise AND. The numeric
// value of each constant follows from its position in this list.
const (
	Starting EventType = 1 << iota
	StartupComplete
	DeviceDiscovered
	DeviceConnected
	DeviceDisconnected
	DeviceRejected // DEPRECATED, superseded by PendingDevicesChanged
	PendingDevicesChanged
	DevicePaused
	DeviceResumed
	ClusterConfigReceived
	LocalChangeDetected
	RemoteChangeDetected
	LocalIndexUpdated
	RemoteIndexUpdated
	ItemStarted
	ItemFinished
	StateChanged
	FolderRejected // DEPRECATED, superseded by PendingFoldersChanged
	PendingFoldersChanged
	ConfigSaved
	DownloadProgress
	RemoteDownloadProgress
	FolderSummary
	FolderCompletion
	FolderErrors
	FolderScanProgress
	FolderPaused
	FolderResumed
	FolderWatchStateChanged
	ListenAddressesChanged
	LoginAttempt
	Failure

	// AllEvents is the mask with the bit of every event type listed above
	// set. It must stay last so that iota counts all preceding constants.
	AllEvents = (1 << iota) - 1
)
var (
	// runningTests defaults to false. When it is true, Subscribe and Poll
	// yield to the scheduler around their timer handling so that the
	// timer-drain branches are taken predictably.
	runningTests = false
	// errNoop is the error returned by noopSubscription.Poll.
	errNoop = errors.New("method of a noop object called")
)
// eventLogTimeout bounds how long sendEvent waits for a single subscriber to
// accept an event before the event is dropped for that subscriber.
const eventLogTimeout = 15 * time.Millisecond
  61. func (t EventType) String() string {
  62. switch t {
  63. case Starting:
  64. return "Starting"
  65. case StartupComplete:
  66. return "StartupComplete"
  67. case DeviceDiscovered:
  68. return "DeviceDiscovered"
  69. case DeviceConnected:
  70. return "DeviceConnected"
  71. case DeviceDisconnected:
  72. return "DeviceDisconnected"
  73. case DeviceRejected:
  74. return "DeviceRejected"
  75. case PendingDevicesChanged:
  76. return "PendingDevicesChanged"
  77. case LocalChangeDetected:
  78. return "LocalChangeDetected"
  79. case RemoteChangeDetected:
  80. return "RemoteChangeDetected"
  81. case LocalIndexUpdated:
  82. return "LocalIndexUpdated"
  83. case RemoteIndexUpdated:
  84. return "RemoteIndexUpdated"
  85. case ItemStarted:
  86. return "ItemStarted"
  87. case ItemFinished:
  88. return "ItemFinished"
  89. case StateChanged:
  90. return "StateChanged"
  91. case FolderRejected:
  92. return "FolderRejected"
  93. case PendingFoldersChanged:
  94. return "PendingFoldersChanged"
  95. case ConfigSaved:
  96. return "ConfigSaved"
  97. case DownloadProgress:
  98. return "DownloadProgress"
  99. case RemoteDownloadProgress:
  100. return "RemoteDownloadProgress"
  101. case FolderSummary:
  102. return "FolderSummary"
  103. case FolderCompletion:
  104. return "FolderCompletion"
  105. case FolderErrors:
  106. return "FolderErrors"
  107. case DevicePaused:
  108. return "DevicePaused"
  109. case DeviceResumed:
  110. return "DeviceResumed"
  111. case ClusterConfigReceived:
  112. return "ClusterConfigReceived"
  113. case FolderScanProgress:
  114. return "FolderScanProgress"
  115. case FolderPaused:
  116. return "FolderPaused"
  117. case FolderResumed:
  118. return "FolderResumed"
  119. case ListenAddressesChanged:
  120. return "ListenAddressesChanged"
  121. case LoginAttempt:
  122. return "LoginAttempt"
  123. case FolderWatchStateChanged:
  124. return "FolderWatchStateChanged"
  125. case Failure:
  126. return "Failure"
  127. default:
  128. return "Unknown"
  129. }
  130. }
  131. func (t EventType) MarshalText() ([]byte, error) {
  132. return []byte(t.String()), nil
  133. }
  134. func (t *EventType) UnmarshalJSON(b []byte) error {
  135. var s string
  136. if err := json.Unmarshal(b, &s); err != nil {
  137. return err
  138. }
  139. *t = UnmarshalEventType(s)
  140. return nil
  141. }
  142. func UnmarshalEventType(s string) EventType {
  143. switch s {
  144. case "Starting":
  145. return Starting
  146. case "StartupComplete":
  147. return StartupComplete
  148. case "DeviceDiscovered":
  149. return DeviceDiscovered
  150. case "DeviceConnected":
  151. return DeviceConnected
  152. case "DeviceDisconnected":
  153. return DeviceDisconnected
  154. case "DeviceRejected":
  155. return DeviceRejected
  156. case "PendingDevicesChanged":
  157. return PendingDevicesChanged
  158. case "LocalChangeDetected":
  159. return LocalChangeDetected
  160. case "RemoteChangeDetected":
  161. return RemoteChangeDetected
  162. case "LocalIndexUpdated":
  163. return LocalIndexUpdated
  164. case "RemoteIndexUpdated":
  165. return RemoteIndexUpdated
  166. case "ItemStarted":
  167. return ItemStarted
  168. case "ItemFinished":
  169. return ItemFinished
  170. case "StateChanged":
  171. return StateChanged
  172. case "FolderRejected":
  173. return FolderRejected
  174. case "PendingFoldersChanged":
  175. return PendingFoldersChanged
  176. case "ConfigSaved":
  177. return ConfigSaved
  178. case "DownloadProgress":
  179. return DownloadProgress
  180. case "RemoteDownloadProgress":
  181. return RemoteDownloadProgress
  182. case "FolderSummary":
  183. return FolderSummary
  184. case "FolderCompletion":
  185. return FolderCompletion
  186. case "FolderErrors":
  187. return FolderErrors
  188. case "DevicePaused":
  189. return DevicePaused
  190. case "DeviceResumed":
  191. return DeviceResumed
  192. case "ClusterConfigReceived":
  193. return ClusterConfigReceived
  194. case "FolderScanProgress":
  195. return FolderScanProgress
  196. case "FolderPaused":
  197. return FolderPaused
  198. case "FolderResumed":
  199. return FolderResumed
  200. case "ListenAddressesChanged":
  201. return ListenAddressesChanged
  202. case "LoginAttempt":
  203. return LoginAttempt
  204. case "FolderWatchStateChanged":
  205. return FolderWatchStateChanged
  206. case "Failure":
  207. return Failure
  208. default:
  209. return 0
  210. }
  211. }
// BufferSize is the capacity of the logger's incoming event channel and of
// each subscription's event channel.
const BufferSize = 64
// Logger is a service that accepts events and hands them out to subscribers.
type Logger interface {
	suture.Service
	// Log records an event of type t carrying data.
	Log(t EventType, data interface{})
	// Subscribe returns a Subscription for the event types set in mask.
	Subscribe(mask EventType) Subscription
}
// logger is the Logger implementation returned by NewLogger. Within this
// file, subs, nextSubscriptionIDs, nextGlobalID and timeout are only touched
// by Serve and the functions it runs; other goroutines talk to it through the
// channels.
type logger struct {
	// subs holds the currently registered subscriptions.
	subs []*subscription
	// nextSubscriptionIDs runs parallel to subs: element i is the
	// SubscriptionID that the next event delivered to subs[i] will get.
	nextSubscriptionIDs []int
	// nextGlobalID is incremented for every event; the value after the
	// increment becomes the event's GlobalID.
	nextGlobalID int
	// timeout is the reusable timer that bounds each delivery in sendEvent.
	timeout *time.Timer
	// events carries events from Log to Serve.
	events chan Event
	// funcs carries functions to be executed by Serve (used by Subscribe).
	funcs chan func(context.Context)
	// toUnsubscribe carries subscriptions that asked to be removed.
	toUnsubscribe chan *subscription
}
// Event is a single logged event as delivered to a subscriber.
type Event struct {
	// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
	SubscriptionID int `json:"id"`
	// Global ID of the event across all subscriptions
	GlobalID int `json:"globalID"`
	// Time is when the event was logged (set in Log).
	Time time.Time `json:"time"`
	// Type is the event type passed to Log.
	Type EventType `json:"type"`
	// Data is the payload passed to Log.
	Data interface{} `json:"data"`
}
// Subscription is a handle through which a subscriber receives events.
type Subscription interface {
	// C returns the channel on which events are delivered.
	C() <-chan Event
	// Poll returns the next event, or an error if none could be returned
	// within timeout.
	Poll(timeout time.Duration) (Event, error)
	// Mask returns the event type mask of the subscription.
	Mask() EventType
	// Unsubscribe ends the subscription.
	Unsubscribe()
}
// subscription is the Subscription implementation handed out by
// logger.Subscribe.
type subscription struct {
	// mask selects which event types are delivered.
	mask EventType
	// events receives matching events from the logger; it is closed when the
	// subscription is removed or the logger's Serve returns.
	events chan Event
	// toUnsubscribe is the owning logger's unsubscribe request channel.
	toUnsubscribe chan *subscription
	// timeout is the reusable timer used by Poll.
	timeout *time.Timer
	// ctx is the context Serve was running with when the subscription was
	// created; Unsubscribe gives up once it is done.
	ctx context.Context
}
var (
	// ErrTimeout is returned by Poll when no event arrived within the
	// requested timeout.
	ErrTimeout = errors.New("timeout")
	// ErrClosed is returned by Poll when the subscription's event channel
	// has been closed.
	ErrClosed = errors.New("closed")
)
  253. func NewLogger() Logger {
  254. l := &logger{
  255. timeout: time.NewTimer(time.Second),
  256. events: make(chan Event, BufferSize),
  257. funcs: make(chan func(context.Context)),
  258. toUnsubscribe: make(chan *subscription),
  259. }
  260. // Make sure the timer is in the stopped state and hasn't fired anything
  261. // into the channel.
  262. if !l.timeout.Stop() {
  263. <-l.timeout.C
  264. }
  265. return l
  266. }
  267. func (l *logger) Serve(ctx context.Context) error {
  268. loop:
  269. for {
  270. select {
  271. case e := <-l.events:
  272. // Incoming events get sent
  273. l.sendEvent(e)
  274. metricEvents.WithLabelValues(e.Type.String(), metricEventStateCreated).Inc()
  275. case fn := <-l.funcs:
  276. // Subscriptions are handled here.
  277. fn(ctx)
  278. case s := <-l.toUnsubscribe:
  279. l.unsubscribe(s)
  280. case <-ctx.Done():
  281. break loop
  282. }
  283. }
  284. // Closing the event channels corresponds to what happens when a
  285. // subscription is unsubscribed; this stops any BufferedSubscription,
  286. // makes Poll() return ErrClosed, etc.
  287. for _, s := range l.subs {
  288. close(s.events)
  289. }
  290. return nil
  291. }
  292. func (l *logger) Log(t EventType, data interface{}) {
  293. l.events <- Event{
  294. Time: time.Now(), // intentionally high precision
  295. Type: t,
  296. Data: data,
  297. // SubscriptionID and GlobalID are set in sendEvent
  298. }
  299. }
  300. func (l *logger) sendEvent(e Event) {
  301. l.nextGlobalID++
  302. dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)
  303. e.GlobalID = l.nextGlobalID
  304. for i, s := range l.subs {
  305. if s.mask&e.Type != 0 {
  306. e.SubscriptionID = l.nextSubscriptionIDs[i]
  307. l.nextSubscriptionIDs[i]++
  308. l.timeout.Reset(eventLogTimeout)
  309. timedOut := false
  310. select {
  311. case s.events <- e:
  312. metricEvents.WithLabelValues(e.Type.String(), metricEventStateDelivered).Inc()
  313. case <-l.timeout.C:
  314. // if s.events is not ready, drop the event
  315. timedOut = true
  316. metricEvents.WithLabelValues(e.Type.String(), metricEventStateDropped).Inc()
  317. }
  318. // If stop returns false it already sent something to the
  319. // channel. If we didn't already read it above we must do so now
  320. // or we get a spurious timeout on the next loop.
  321. if !l.timeout.Stop() && !timedOut {
  322. <-l.timeout.C
  323. }
  324. }
  325. }
  326. }
  327. func (l *logger) Subscribe(mask EventType) Subscription {
  328. res := make(chan Subscription)
  329. l.funcs <- func(ctx context.Context) {
  330. dl.Debugln("subscribe", mask)
  331. s := &subscription{
  332. mask: mask,
  333. events: make(chan Event, BufferSize),
  334. toUnsubscribe: l.toUnsubscribe,
  335. timeout: time.NewTimer(0),
  336. ctx: ctx,
  337. }
  338. // We need to create the timeout timer in the stopped, non-fired state so
  339. // that Subscription.Poll() can safely reset it and select on the timeout
  340. // channel. This ensures the timer is stopped and the channel drained.
  341. if runningTests {
  342. // Make the behavior stable when running tests to avoid randomly
  343. // varying test coverage. This ensures, in practice if not in
  344. // theory, that the timer fires and we take the true branch of the
  345. // next if.
  346. runtime.Gosched()
  347. }
  348. if !s.timeout.Stop() {
  349. <-s.timeout.C
  350. }
  351. l.subs = append(l.subs, s)
  352. l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
  353. res <- s
  354. }
  355. return <-res
  356. }
  357. func (l *logger) unsubscribe(s *subscription) {
  358. dl.Debugln("unsubscribe", s.mask)
  359. for i, ss := range l.subs {
  360. if s == ss {
  361. last := len(l.subs) - 1
  362. l.subs[i] = l.subs[last]
  363. l.subs[last] = nil
  364. l.subs = l.subs[:last]
  365. l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
  366. l.nextSubscriptionIDs[last] = 0
  367. l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
  368. break
  369. }
  370. }
  371. close(s.events)
  372. }
  373. func (l *logger) String() string {
  374. return fmt.Sprintf("events.Logger/@%p", l)
  375. }
  376. // Poll returns an event from the subscription or an error if the poll times
  377. // out of the event channel is closed. Poll should not be called concurrently
  378. // from multiple goroutines for a single subscription.
  379. func (s *subscription) Poll(timeout time.Duration) (Event, error) {
  380. dl.Debugln("poll", timeout)
  381. s.timeout.Reset(timeout)
  382. select {
  383. case e, ok := <-s.events:
  384. if !ok {
  385. return e, ErrClosed
  386. }
  387. if runningTests {
  388. // Make the behavior stable when running tests to avoid randomly
  389. // varying test coverage. This ensures, in practice if not in
  390. // theory, that the timer fires and we take the true branch of
  391. // the next if.
  392. s.timeout.Reset(0)
  393. runtime.Gosched()
  394. }
  395. if !s.timeout.Stop() {
  396. // The timeout must be stopped and possibly drained to be ready
  397. // for reuse in the next call.
  398. <-s.timeout.C
  399. }
  400. return e, nil
  401. case <-s.timeout.C:
  402. return Event{}, ErrTimeout
  403. }
  404. }
  405. func (s *subscription) C() <-chan Event {
  406. return s.events
  407. }
  408. func (s *subscription) Mask() EventType {
  409. return s.mask
  410. }
  411. func (s *subscription) Unsubscribe() {
  412. select {
  413. case s.toUnsubscribe <- s:
  414. case <-s.ctx.Done():
  415. }
  416. }
// bufferedSubscription keeps the most recent events of a Subscription in a
// ring buffer so that they can be queried by ID with Since.
type bufferedSubscription struct {
	// sub is the underlying subscription being drained.
	sub Subscription
	// buf is the ring buffer of stored events.
	buf []Event
	// next is the index in buf that the next event will be written to.
	next int
	cur  int // Current SubscriptionID
	// mut guards buf, next and cur.
	mut sync.Mutex
	// cond is broadcast whenever a new event has been stored.
	cond *sync.TimeoutCond
}
// BufferedSubscription is a subscription whose recent events are retained
// and can be fetched by ID.
type BufferedSubscription interface {
	// Since appends the buffered events with a SubscriptionID greater than
	// id to into and returns the result, waiting up to timeout if there are
	// none yet.
	Since(id int, into []Event, timeout time.Duration) []Event
	// Mask returns the event type mask of the underlying subscription.
	Mask() EventType
}
  429. func NewBufferedSubscription(s Subscription, size int) BufferedSubscription {
  430. bs := &bufferedSubscription{
  431. sub: s,
  432. buf: make([]Event, size),
  433. mut: sync.NewMutex(),
  434. }
  435. bs.cond = sync.NewTimeoutCond(bs.mut)
  436. go bs.pollingLoop()
  437. return bs
  438. }
  439. func (s *bufferedSubscription) pollingLoop() {
  440. for ev := range s.sub.C() {
  441. s.mut.Lock()
  442. s.buf[s.next] = ev
  443. s.next = (s.next + 1) % len(s.buf)
  444. s.cur = ev.SubscriptionID
  445. s.cond.Broadcast()
  446. s.mut.Unlock()
  447. }
  448. }
  449. func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
  450. s.mut.Lock()
  451. defer s.mut.Unlock()
  452. // Check once first before generating the TimeoutCondWaiter
  453. if id >= s.cur {
  454. waiter := s.cond.SetupWait(timeout)
  455. defer waiter.Stop()
  456. for id >= s.cur {
  457. if eventsAvailable := waiter.Wait(); !eventsAvailable {
  458. // Timed out
  459. return into
  460. }
  461. }
  462. }
  463. for i := s.next; i < len(s.buf); i++ {
  464. if s.buf[i].SubscriptionID > id {
  465. into = append(into, s.buf[i])
  466. }
  467. }
  468. for i := 0; i < s.next; i++ {
  469. if s.buf[i].SubscriptionID > id {
  470. into = append(into, s.buf[i])
  471. }
  472. }
  473. return into
  474. }
  475. func (s *bufferedSubscription) Mask() EventType {
  476. return s.sub.Mask()
  477. }
  478. // Error returns a string pointer suitable for JSON marshalling errors. It
  479. // retains the "null on success" semantics, but ensures the error result is a
  480. // string regardless of the underlying concrete error type.
  481. func Error(err error) *string {
  482. if err == nil {
  483. return nil
  484. }
  485. str := err.Error()
  486. return &str
  487. }
  488. type noopLogger struct{}
  489. var NoopLogger Logger = &noopLogger{}
  490. func (*noopLogger) Serve(_ context.Context) error { return nil }
  491. func (*noopLogger) Log(_ EventType, _ interface{}) {}
  492. func (*noopLogger) Subscribe(_ EventType) Subscription {
  493. return &noopSubscription{}
  494. }
  495. type noopSubscription struct{}
  496. func (*noopSubscription) C() <-chan Event {
  497. return nil
  498. }
  499. func (*noopSubscription) Poll(_ time.Duration) (Event, error) {
  500. return Event{}, errNoop
  501. }
  502. func (*noopSubscription) Mask() EventType {
  503. return 0
  504. }
  505. func (*noopSubscription) Unsubscribe() {}