debounce.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package rx
  2. import (
  3. "time"
  4. "sync"
  5. )
  6. func (e Observable) DebounceTime(dueTime uint) Observable {
  7. var dur = time.Duration(dueTime) * time.Millisecond
  8. return Observable { func(sched Scheduler, ob *observer) {
  9. var mutex sync.Mutex
  10. var current Object
  11. var current_index = uint64(0)
  12. var notify = make(chan struct{}, 1)
  13. sched.run(e, &observer {
  14. context: ob.context,
  15. next: func(val Object) {
  16. mutex.Lock()
  17. current = val
  18. current_index += 1
  19. mutex.Unlock()
  20. select {
  21. case notify <- struct{} {}:
  22. default:
  23. }
  24. },
  25. error: ob.error,
  26. complete: ob.complete,
  27. })
  28. go (func() {
  29. var latest Object
  30. var latest_index = ^(uint64(0))
  31. var timer *time.Timer
  32. for {
  33. select {
  34. case <- maybeTimerChannel(timer):
  35. sched.dispatch(event {
  36. kind: ev_next,
  37. payload: latest,
  38. observer: ob,
  39. })
  40. case <- notify:
  41. var prev = latest
  42. var prev_index = latest_index
  43. mutex.Lock()
  44. if current_index == prev_index {
  45. mutex.Unlock()
  46. continue
  47. }
  48. latest = current
  49. latest_index = current_index
  50. mutex.Unlock()
  51. if timer == nil {
  52. timer = time.NewTimer(dur)
  53. } else {
  54. if !(timer.Stop()) {
  55. select {
  56. case <- timer.C:
  57. sched.dispatch(event {
  58. kind: ev_next,
  59. payload: prev,
  60. observer: ob,
  61. })
  62. default:
  63. }
  64. }
  65. timer.Reset(dur)
  66. }
  67. } // select
  68. } // infinite loop
  69. })() // go func()
  70. } }
  71. }
  72. func maybeTimerChannel(timer *time.Timer) <-chan time.Time {
  73. if timer == nil {
  74. return nil
  75. } else {
  76. return timer.C
  77. }
  78. }