merge.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package rx
  2. func Merge(actions ([] Observable)) Observable {
  3. return Observable { func(sched Scheduler, ob *observer) {
  4. var ctx, dispose = ob.context.create_disposable_child()
  5. var c = new_collector(ob, dispose)
  6. for _, item := range actions {
  7. c.new_child()
  8. sched.run(item, &observer {
  9. context: ctx,
  10. next: func(x Object) {
  11. c.pass(x)
  12. },
  13. error: func(e Object) {
  14. c.throw(e)
  15. },
  16. complete: func() {
  17. c.delete_child()
  18. },
  19. })
  20. }
  21. c.parent_complete()
  22. } }
  23. }
  24. func (e Observable) With(side Observable) Observable {
  25. return Observable { func(sched Scheduler, ob *observer) {
  26. sched.run(side, &observer {
  27. context: ob.context,
  28. next: func(_ Object) {},
  29. error: func(_ Object) {},
  30. complete: func() {},
  31. })
  32. sched.run(e, &observer {
  33. context: ob.context,
  34. next: ob.next,
  35. error: ob.error,
  36. complete: ob.complete,
  37. })
  38. } }
  39. }
  40. func (e Observable) MergeMap(f func(Object) Observable) Observable {
  41. return Observable { func(sched Scheduler, ob *observer) {
  42. var ctx, ctx_dispose = ob.context.create_disposable_child()
  43. var c = new_collector(ob, ctx_dispose)
  44. sched.run(e, &observer {
  45. context: ctx,
  46. next: func(x Object) {
  47. var item = f(x)
  48. c.new_child()
  49. sched.run(item, &observer {
  50. context: ctx,
  51. next: func(x Object) {
  52. c.pass(x)
  53. },
  54. error: func(e Object) {
  55. c.throw(e)
  56. },
  57. complete: func() {
  58. c.delete_child()
  59. },
  60. })
  61. },
  62. error: func(e Object) {
  63. c.throw(e)
  64. },
  65. complete: func() {
  66. c.parent_complete()
  67. },
  68. })
  69. } }
  70. }
  71. type collector struct {
  72. observer *observer
  73. dispose disposeFunc
  74. num_children uint
  75. no_more_children bool
  76. }
  77. func new_collector(ob *observer, dispose disposeFunc) *collector {
  78. return &collector {
  79. observer: ob,
  80. dispose: dispose,
  81. num_children: 0,
  82. no_more_children: false,
  83. }
  84. }
  85. func (c *collector) pass(x Object) {
  86. c.observer.next(x)
  87. }
  88. func (c *collector) throw(e Object) {
  89. c.observer.error(e)
  90. c.dispose(behaviour_cancel)
  91. }
  92. func (c *collector) new_child() {
  93. c.num_children += 1
  94. }
  95. func (c *collector) delete_child() {
  96. if c.num_children == 0 { panic("something went wrong") }
  97. c.num_children -= 1
  98. if c.num_children == 0 && c.no_more_children {
  99. c.observer.complete()
  100. c.dispose(behaviour_terminate)
  101. }
  102. }
  103. func (c *collector) parent_complete() {
  104. c.no_more_children = true
  105. if c.num_children == 0 {
  106. c.observer.complete()
  107. c.dispose(behaviour_terminate)
  108. }
  109. }