switch.go 1003 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package rx
  2. func (e Observable) SwitchMap(f func(Object) Observable) Observable {
  3. return Observable { func(sched Scheduler, ob *observer) {
  4. var ctx, dispose = ob.context.create_disposable_child()
  5. var c = new_collector(ob, dispose)
  6. var cur_ctx, cur_dispose = ctx.create_disposable_child()
  7. var cur_terminated = false
  8. sched.run(e, &observer {
  9. context: ctx,
  10. next: func(x Object) {
  11. var item = f(x)
  12. c.new_child()
  13. if cur_terminated {
  14. cur_dispose(behaviour_terminate)
  15. } else {
  16. cur_dispose(behaviour_cancel)
  17. }
  18. cur_ctx, cur_dispose = ctx.create_disposable_child()
  19. sched.run(item, &observer {
  20. context: cur_ctx,
  21. next: func(x Object) {
  22. c.pass(x)
  23. },
  24. error: func(e Object) {
  25. c.throw(e)
  26. cur_terminated = true
  27. },
  28. complete: func() {
  29. c.delete_child()
  30. cur_terminated = true
  31. },
  32. })
  33. },
  34. error: func(e Object) {
  35. c.throw(e)
  36. },
  37. complete: func() {
  38. c.parent_complete()
  39. },
  40. })
  41. } }
  42. }