events.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package events provides event subscription and polling functionality.
  7. package events
  8. import (
  9. "context"
  10. "encoding/json"
  11. "errors"
  12. "fmt"
  13. "runtime"
  14. "time"
  15. "github.com/thejerf/suture/v4"
  16. "github.com/syncthing/syncthing/lib/sync"
  17. )
  18. type EventType int64
  19. const (
  20. Starting EventType = 1 << iota
  21. StartupComplete
  22. DeviceDiscovered
  23. DeviceConnected
  24. DeviceDisconnected
  25. DeviceRejected
  26. DevicePaused
  27. DeviceResumed
  28. LocalChangeDetected
  29. RemoteChangeDetected
  30. LocalIndexUpdated
  31. RemoteIndexUpdated
  32. ItemStarted
  33. ItemFinished
  34. StateChanged
  35. FolderRejected
  36. ConfigSaved
  37. DownloadProgress
  38. RemoteDownloadProgress
  39. FolderSummary
  40. FolderCompletion
  41. FolderErrors
  42. FolderScanProgress
  43. FolderPaused
  44. FolderResumed
  45. FolderWatchStateChanged
  46. ListenAddressesChanged
  47. LoginAttempt
  48. Failure
  49. AllEvents = (1 << iota) - 1
  50. )
  51. var (
  52. runningTests = false
  53. errNoop = errors.New("method of a noop object called")
  54. )
  55. const eventLogTimeout = 15 * time.Millisecond
  56. func (t EventType) String() string {
  57. switch t {
  58. case Starting:
  59. return "Starting"
  60. case StartupComplete:
  61. return "StartupComplete"
  62. case DeviceDiscovered:
  63. return "DeviceDiscovered"
  64. case DeviceConnected:
  65. return "DeviceConnected"
  66. case DeviceDisconnected:
  67. return "DeviceDisconnected"
  68. case DeviceRejected:
  69. return "DeviceRejected"
  70. case LocalChangeDetected:
  71. return "LocalChangeDetected"
  72. case RemoteChangeDetected:
  73. return "RemoteChangeDetected"
  74. case LocalIndexUpdated:
  75. return "LocalIndexUpdated"
  76. case RemoteIndexUpdated:
  77. return "RemoteIndexUpdated"
  78. case ItemStarted:
  79. return "ItemStarted"
  80. case ItemFinished:
  81. return "ItemFinished"
  82. case StateChanged:
  83. return "StateChanged"
  84. case FolderRejected:
  85. return "FolderRejected"
  86. case ConfigSaved:
  87. return "ConfigSaved"
  88. case DownloadProgress:
  89. return "DownloadProgress"
  90. case RemoteDownloadProgress:
  91. return "RemoteDownloadProgress"
  92. case FolderSummary:
  93. return "FolderSummary"
  94. case FolderCompletion:
  95. return "FolderCompletion"
  96. case FolderErrors:
  97. return "FolderErrors"
  98. case DevicePaused:
  99. return "DevicePaused"
  100. case DeviceResumed:
  101. return "DeviceResumed"
  102. case FolderScanProgress:
  103. return "FolderScanProgress"
  104. case FolderPaused:
  105. return "FolderPaused"
  106. case FolderResumed:
  107. return "FolderResumed"
  108. case ListenAddressesChanged:
  109. return "ListenAddressesChanged"
  110. case LoginAttempt:
  111. return "LoginAttempt"
  112. case FolderWatchStateChanged:
  113. return "FolderWatchStateChanged"
  114. case Failure:
  115. return "Failure"
  116. default:
  117. return "Unknown"
  118. }
  119. }
  120. func (t EventType) MarshalText() ([]byte, error) {
  121. return []byte(t.String()), nil
  122. }
  123. func (t *EventType) UnmarshalJSON(b []byte) error {
  124. var s string
  125. if err := json.Unmarshal(b, &s); err != nil {
  126. return err
  127. }
  128. *t = UnmarshalEventType(s)
  129. return nil
  130. }
  131. func UnmarshalEventType(s string) EventType {
  132. switch s {
  133. case "Starting":
  134. return Starting
  135. case "StartupComplete":
  136. return StartupComplete
  137. case "DeviceDiscovered":
  138. return DeviceDiscovered
  139. case "DeviceConnected":
  140. return DeviceConnected
  141. case "DeviceDisconnected":
  142. return DeviceDisconnected
  143. case "DeviceRejected":
  144. return DeviceRejected
  145. case "LocalChangeDetected":
  146. return LocalChangeDetected
  147. case "RemoteChangeDetected":
  148. return RemoteChangeDetected
  149. case "LocalIndexUpdated":
  150. return LocalIndexUpdated
  151. case "RemoteIndexUpdated":
  152. return RemoteIndexUpdated
  153. case "ItemStarted":
  154. return ItemStarted
  155. case "ItemFinished":
  156. return ItemFinished
  157. case "StateChanged":
  158. return StateChanged
  159. case "FolderRejected":
  160. return FolderRejected
  161. case "ConfigSaved":
  162. return ConfigSaved
  163. case "DownloadProgress":
  164. return DownloadProgress
  165. case "RemoteDownloadProgress":
  166. return RemoteDownloadProgress
  167. case "FolderSummary":
  168. return FolderSummary
  169. case "FolderCompletion":
  170. return FolderCompletion
  171. case "FolderErrors":
  172. return FolderErrors
  173. case "DevicePaused":
  174. return DevicePaused
  175. case "DeviceResumed":
  176. return DeviceResumed
  177. case "FolderScanProgress":
  178. return FolderScanProgress
  179. case "FolderPaused":
  180. return FolderPaused
  181. case "FolderResumed":
  182. return FolderResumed
  183. case "ListenAddressesChanged":
  184. return ListenAddressesChanged
  185. case "LoginAttempt":
  186. return LoginAttempt
  187. case "FolderWatchStateChanged":
  188. return FolderWatchStateChanged
  189. case "Failure":
  190. return Failure
  191. default:
  192. return 0
  193. }
  194. }
// BufferSize is the capacity of the logger's incoming event channel and of
// each subscription's event channel.
const BufferSize = 64
// Logger accepts events and distributes them to subscribers. It is a
// suture.Service; the implementation in this file does its dispatching in
// Serve.
type Logger interface {
	suture.Service
	// Log records an event of type t carrying data.
	Log(t EventType, data interface{})
	// Subscribe returns a Subscription that receives the events whose type
	// is included in mask.
	Subscribe(mask EventType) Subscription
}
// logger is the Logger implementation. Its subscription state (subs,
// nextSubscriptionIDs, nextGlobalID, timeout) is only touched from code that
// Serve runs; other goroutines communicate with it through the channels.
type logger struct {
	// subs holds the active subscriptions.
	subs []*subscription
	// nextSubscriptionIDs is parallel to subs: element i is the ID the next
	// event matched by subs[i] will get.
	nextSubscriptionIDs []int
	// nextGlobalID is incremented before each event is dispatched, so it
	// holds the global ID of the most recent event.
	nextGlobalID int
	// timeout is the reusable timer that bounds each delivery attempt in
	// sendEvent.
	timeout *time.Timer
	// events carries events from Log to Serve.
	events chan Event
	// funcs carries functions to be run by Serve; Subscribe uses it.
	funcs chan func(context.Context)
	// toUnsubscribe carries subscriptions that asked to be removed.
	toUnsubscribe chan *subscription
	// NOTE(review): stop is neither initialised by NewLogger nor referenced
	// anywhere in the visible code - possibly a leftover; confirm.
	stop chan struct{}
}
// Event is a single logged event as delivered to subscribers and encoded to
// JSON.
type Event struct {
	// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
	SubscriptionID int `json:"id"`
	// Global ID of the event across all subscriptions
	GlobalID int         `json:"globalID"`
	Time     time.Time   `json:"time"`
	Type     EventType   `json:"type"`
	Data     interface{} `json:"data"`
}
// Subscription is a consumer's handle on a stream of events.
type Subscription interface {
	// C returns the channel events are delivered on.
	C() <-chan Event
	// Poll waits up to timeout for the next event.
	Poll(timeout time.Duration) (Event, error)
	// Mask returns the event types this subscription was created for.
	Mask() EventType
	// Unsubscribe ends the subscription.
	Unsubscribe()
}
// subscription is the Subscription handed out by logger.Subscribe.
type subscription struct {
	// mask selects which event types are delivered.
	mask EventType
	// events is the buffered channel the logger sends matching events on
	// and closes when the subscription ends.
	events chan Event
	// toUnsubscribe is the owning logger's unsubscribe channel.
	toUnsubscribe chan *subscription
	// timeout is the timer reused by Poll.
	timeout *time.Timer
	// ctx is the context Serve was running with when the subscription was
	// created; Unsubscribe gives up once it is done.
	ctx context.Context
}
var (
	// ErrTimeout is returned by Poll when no event arrives within the
	// timeout.
	ErrTimeout = errors.New("timeout")
	// ErrClosed is returned by Poll when the subscription's event channel
	// has been closed.
	ErrClosed = errors.New("closed")
)
  237. func NewLogger() Logger {
  238. l := &logger{
  239. timeout: time.NewTimer(time.Second),
  240. events: make(chan Event, BufferSize),
  241. funcs: make(chan func(context.Context)),
  242. toUnsubscribe: make(chan *subscription),
  243. }
  244. // Make sure the timer is in the stopped state and hasn't fired anything
  245. // into the channel.
  246. if !l.timeout.Stop() {
  247. <-l.timeout.C
  248. }
  249. return l
  250. }
  251. func (l *logger) Serve(ctx context.Context) error {
  252. loop:
  253. for {
  254. select {
  255. case e := <-l.events:
  256. // Incoming events get sent
  257. l.sendEvent(e)
  258. case fn := <-l.funcs:
  259. // Subscriptions are handled here.
  260. fn(ctx)
  261. case s := <-l.toUnsubscribe:
  262. l.unsubscribe(s)
  263. case <-ctx.Done():
  264. break loop
  265. }
  266. }
  267. // Closing the event channels corresponds to what happens when a
  268. // subscription is unsubscribed; this stops any BufferedSubscription,
  269. // makes Poll() return ErrClosed, etc.
  270. for _, s := range l.subs {
  271. close(s.events)
  272. }
  273. return nil
  274. }
  275. func (l *logger) Log(t EventType, data interface{}) {
  276. l.events <- Event{
  277. Time: time.Now(),
  278. Type: t,
  279. Data: data,
  280. // SubscriptionID and GlobalID are set in sendEvent
  281. }
  282. }
// sendEvent assigns the next global ID to e and offers it to every
// subscription whose mask includes the event's type. Each delivery attempt
// waits at most eventLogTimeout; after that the event is dropped for that
// subscription. The per-subscription ID is consumed before the attempt, so
// it advances even when the event ends up being dropped.
func (l *logger) sendEvent(e Event) {
	l.nextGlobalID++
	dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)

	e.GlobalID = l.nextGlobalID

	for i, s := range l.subs {
		if s.mask&e.Type != 0 {
			e.SubscriptionID = l.nextSubscriptionIDs[i]
			l.nextSubscriptionIDs[i]++

			l.timeout.Reset(eventLogTimeout)
			timedOut := false

			select {
			case s.events <- e:
			case <-l.timeout.C:
				// if s.events is not ready, drop the event
				timedOut = true
			}

			// If stop returns false it already sent something to the
			// channel. If we didn't already read it above we must do so now
			// or we get a spurious timeout on the next loop.
			if !l.timeout.Stop() && !timedOut {
				<-l.timeout.C
			}
		}
	}
}
// Subscribe registers a new subscription for the event types in mask and
// returns it. The registration itself is a function handed to the Serve
// loop through l.funcs, and the result is passed back over an unbuffered
// channel, so the call blocks until Serve has picked it up and run it.
func (l *logger) Subscribe(mask EventType) Subscription {
	res := make(chan Subscription)
	l.funcs <- func(ctx context.Context) {
		dl.Debugln("subscribe", mask)

		s := &subscription{
			mask:          mask,
			events:        make(chan Event, BufferSize),
			toUnsubscribe: l.toUnsubscribe,
			timeout:       time.NewTimer(0),
			ctx:           ctx,
		}

		// We need to create the timeout timer in the stopped, non-fired state so
		// that Subscription.Poll() can safely reset it and select on the timeout
		// channel. This ensures the timer is stopped and the channel drained.
		if runningTests {
			// Make the behavior stable when running tests to avoid randomly
			// varying test coverage. This ensures, in practice if not in
			// theory, that the timer fires and we take the true branch of the
			// next if.
			runtime.Gosched()
		}
		if !s.timeout.Stop() {
			<-s.timeout.C
		}

		// subs and nextSubscriptionIDs are kept in step; per-subscription
		// event IDs start at 1.
		l.subs = append(l.subs, s)
		l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
		res <- s
	}
	return <-res
}
  338. func (l *logger) unsubscribe(s *subscription) {
  339. dl.Debugln("unsubscribe", s.mask)
  340. for i, ss := range l.subs {
  341. if s == ss {
  342. last := len(l.subs) - 1
  343. l.subs[i] = l.subs[last]
  344. l.subs[last] = nil
  345. l.subs = l.subs[:last]
  346. l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
  347. l.nextSubscriptionIDs[last] = 0
  348. l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
  349. break
  350. }
  351. }
  352. close(s.events)
  353. }
// String identifies this logger instance by its address.
func (l *logger) String() string {
	return fmt.Sprintf("events.Logger/@%p", l)
}
// Poll returns an event from the subscription or an error if the poll times
// out or the event channel is closed. Poll should not be called concurrently
// from multiple goroutines for a single subscription.
//
// The error is ErrTimeout when no event arrived within timeout and
// ErrClosed when the event channel has been closed.
func (s *subscription) Poll(timeout time.Duration) (Event, error) {
	dl.Debugln("poll", timeout)

	s.timeout.Reset(timeout)

	select {
	case e, ok := <-s.events:
		if !ok {
			// NOTE(review): the timer is left running on this path; only
			// the successful path below stops and drains it.
			return e, ErrClosed
		}
		if runningTests {
			// Make the behavior stable when running tests to avoid randomly
			// varying test coverage. This ensures, in practice if not in
			// theory, that the timer fires and we take the true branch of
			// the next if.
			s.timeout.Reset(0)
			runtime.Gosched()
		}
		if !s.timeout.Stop() {
			// The timeout must be stopped and possibly drained to be ready
			// for reuse in the next call.
			<-s.timeout.C
		}
		return e, nil
	case <-s.timeout.C:
		return Event{}, ErrTimeout
	}
}
// C returns the subscription's event channel for receiving. It is the same
// channel Poll reads from.
func (s *subscription) C() <-chan Event {
	return s.events
}
// Mask returns the event type mask the subscription was created with.
func (s *subscription) Mask() EventType {
	return s.mask
}
// Unsubscribe hands the subscription to the logger for removal. It blocks
// until the logger accepts the request or until the context the logger was
// serving with is done, whichever happens first.
func (s *subscription) Unsubscribe() {
	select {
	case s.toUnsubscribe <- s:
	case <-s.ctx.Done():
	}
}
// bufferedSubscription keeps the most recent events of a Subscription in a
// fixed-size ring buffer so they can be fetched by ID through Since.
type bufferedSubscription struct {
	// sub is the subscription the events are read from.
	sub Subscription
	// buf is the ring buffer of events.
	buf []Event
	// next is the index in buf the next event is written to.
	next int
	cur  int // Current SubscriptionID
	// mut guards buf, next and cur.
	mut sync.Mutex
	// cond is broadcast on whenever an event has been stored.
	cond *sync.TimeoutCond
}
// BufferedSubscription gives ID-based access to recently received events.
type BufferedSubscription interface {
	// Since appends the buffered events with a subscription ID greater than
	// id to into and returns the result, waiting up to timeout for such an
	// event to arrive if there is none yet.
	Since(id int, into []Event, timeout time.Duration) []Event
	// Mask returns the event type mask of the underlying subscription.
	Mask() EventType
}
  410. func NewBufferedSubscription(s Subscription, size int) BufferedSubscription {
  411. bs := &bufferedSubscription{
  412. sub: s,
  413. buf: make([]Event, size),
  414. mut: sync.NewMutex(),
  415. }
  416. bs.cond = sync.NewTimeoutCond(bs.mut)
  417. go bs.pollingLoop()
  418. return bs
  419. }
  420. func (s *bufferedSubscription) pollingLoop() {
  421. for ev := range s.sub.C() {
  422. s.mut.Lock()
  423. s.buf[s.next] = ev
  424. s.next = (s.next + 1) % len(s.buf)
  425. s.cur = ev.SubscriptionID
  426. s.cond.Broadcast()
  427. s.mut.Unlock()
  428. }
  429. }
  430. func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
  431. s.mut.Lock()
  432. defer s.mut.Unlock()
  433. // Check once first before generating the TimeoutCondWaiter
  434. if id >= s.cur {
  435. waiter := s.cond.SetupWait(timeout)
  436. defer waiter.Stop()
  437. for id >= s.cur {
  438. if eventsAvailable := waiter.Wait(); !eventsAvailable {
  439. // Timed out
  440. return into
  441. }
  442. }
  443. }
  444. for i := s.next; i < len(s.buf); i++ {
  445. if s.buf[i].SubscriptionID > id {
  446. into = append(into, s.buf[i])
  447. }
  448. }
  449. for i := 0; i < s.next; i++ {
  450. if s.buf[i].SubscriptionID > id {
  451. into = append(into, s.buf[i])
  452. }
  453. }
  454. return into
  455. }
// Mask returns the event type mask of the underlying subscription.
func (s *bufferedSubscription) Mask() EventType {
	return s.sub.Mask()
}
  459. // Error returns a string pointer suitable for JSON marshalling errors. It
  460. // retains the "null on success" semantics, but ensures the error result is a
  461. // string regardless of the underlying concrete error type.
  462. func Error(err error) *string {
  463. if err == nil {
  464. return nil
  465. }
  466. str := err.Error()
  467. return &str
  468. }
// noopLogger is a Logger whose methods do nothing.
type noopLogger struct{}

// NoopLogger is a Logger that discards every event and hands out
// subscriptions that never deliver anything.
var NoopLogger Logger = &noopLogger{}

// Serve returns nil immediately.
func (*noopLogger) Serve(ctx context.Context) error { return nil }

// Stop does nothing.
func (*noopLogger) Stop() {}

// Log discards the event.
func (*noopLogger) Log(t EventType, data interface{}) {}

// Subscribe returns a subscription that never delivers events.
func (*noopLogger) Subscribe(mask EventType) Subscription {
	return &noopSubscription{}
}
// noopSubscription is the Subscription returned by noopLogger.Subscribe.
type noopSubscription struct{}

// C returns a nil channel; receiving from it blocks forever.
func (*noopSubscription) C() <-chan Event {
	return nil
}

// Poll returns immediately with an empty Event and errNoop, ignoring
// timeout.
func (*noopSubscription) Poll(timeout time.Duration) (Event, error) {
	return Event{}, errNoop
}

// Mask returns zero, i.e. no event types.
func (s *noopSubscription) Mask() EventType {
	return 0
}

// Unsubscribe does nothing.
func (*noopSubscription) Unsubscribe() {}