timer.go 622 B

123456789101112131415161718192021222324252627282930313233
  1. package rx
  2. import "time"
  3. func Timer(timeout uint) Observable {
  4. return NewGoroutine(func(sender Sender) {
  5. var timer = time.NewTimer(time.Duration(timeout) * time.Millisecond)
  6. go (func() {
  7. <- timer.C
  8. sender.Next(nil)
  9. sender.Complete()
  10. })()
  11. sender.Context().WaitDispose(func() {
  12. timer.Stop()
  13. })
  14. })
  15. }
  16. func Ticker(interval uint) Observable {
  17. return NewGoroutine(func(sender Sender) {
  18. var ticker = time.NewTicker(time.Duration(interval) * time.Millisecond)
  19. go (func() {
  20. for range ticker.C {
  21. sender.Next(nil)
  22. }
  23. })()
  24. sender.Context().WaitDispose(func() {
  25. ticker.Stop()
  26. })
  27. })
  28. }