trivial.go 850 B

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. package rx
  2. type TrivialScheduler struct {
  3. EventLoop *EventLoop
  4. }
  5. func (sched TrivialScheduler) dispatch(ev event) {
  6. sched.EventLoop.dispatch(ev)
  7. }
  8. func (sched TrivialScheduler) commit(t task) {
  9. sched.EventLoop.commit(t)
  10. }
  11. func (sched TrivialScheduler) run(observable Observable, output *observer) {
  12. if output.context.disposed {
  13. panic("cannot run an observable within a disposed context")
  14. }
  15. var terminated = false
  16. observable.effect(sched, &observer {
  17. context: output.context,
  18. next: func(x Object) {
  19. if !terminated && !output.context.disposed {
  20. output.next(x)
  21. }
  22. },
  23. error: func(e Object) {
  24. if !terminated && !output.context.disposed {
  25. terminated = true
  26. output.error(e)
  27. }
  28. },
  29. complete: func() {
  30. if !terminated && !output.context.disposed {
  31. terminated = true
  32. output.complete()
  33. }
  34. },
  35. })
  36. }