switch.go 767 B

1234567891011121314151617181920212223242526272829303132333435363738
  1. package rx
  2. func (e Effect) SwitchMap(f func(Object)Effect) Effect {
  3. return Effect { func(sched Scheduler, ob *observer) {
  4. var ctx, dispose = ob.context.CreateChild()
  5. var c = new_collector(ob, dispose)
  6. var cur_ctx, cur_dispose = ctx.CreateChild()
  7. sched.run(e, &observer {
  8. context: ctx,
  9. next: func(x Object) {
  10. var item = f(x)
  11. c.new_child()
  12. cur_dispose()
  13. cur_ctx, cur_dispose = ctx.CreateChild()
  14. sched.run(item, &observer {
  15. context: cur_ctx,
  16. next: func(x Object) {
  17. c.pass(x)
  18. },
  19. error: func(e Object) {
  20. c.throw(e)
  21. },
  22. complete: func() {
  23. c.delete_child()
  24. },
  25. })
  26. },
  27. error: func(e Object) {
  28. c.throw(e)
  29. },
  30. complete: func() {
  31. c.parent_complete()
  32. },
  33. })
  34. } }
  35. }