123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package rx
- func Merge(actions ([] Observable)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var ctx, dispose = ob.context.create_disposable_child()
- var c = new_collector(ob, dispose)
- for _, item := range actions {
- c.new_child()
- sched.run(item, &observer {
- context: ctx,
- next: func(x Object) {
- c.pass(x)
- },
- error: func(e Object) {
- c.throw(e)
- },
- complete: func() {
- c.delete_child()
- },
- })
- }
- c.parent_complete()
- } }
- }
- func (e Observable) With(side Observable) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- sched.run(side, &observer {
- context: ob.context,
- next: func(_ Object) {},
- error: func(_ Object) {},
- complete: func() {},
- })
- sched.run(e, &observer {
- context: ob.context,
- next: ob.next,
- error: ob.error,
- complete: ob.complete,
- })
- } }
- }
- func (e Observable) MergeMap(f func(Object) Observable) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var ctx, ctx_dispose = ob.context.create_disposable_child()
- var c = new_collector(ob, ctx_dispose)
- sched.run(e, &observer {
- context: ctx,
- next: func(x Object) {
- var item = f(x)
- c.new_child()
- sched.run(item, &observer {
- context: ctx,
- next: func(x Object) {
- c.pass(x)
- },
- error: func(e Object) {
- c.throw(e)
- },
- complete: func() {
- c.delete_child()
- },
- })
- },
- error: func(e Object) {
- c.throw(e)
- },
- complete: func() {
- c.parent_complete()
- },
- })
- } }
- }
- type collector struct {
- observer *observer
- dispose disposeFunc
- num_children uint
- no_more_children bool
- }
- func new_collector(ob *observer, dispose disposeFunc) *collector {
- return &collector {
- observer: ob,
- dispose: dispose,
- num_children: 0,
- no_more_children: false,
- }
- }
- func (c *collector) pass(x Object) {
- c.observer.next(x)
- }
- func (c *collector) throw(e Object) {
- c.observer.error(e)
- c.dispose(behaviour_cancel)
- }
- func (c *collector) new_child() {
- c.num_children += 1
- }
- func (c *collector) delete_child() {
- if c.num_children == 0 { panic("something went wrong") }
- c.num_children -= 1
- if c.num_children == 0 && c.no_more_children {
- c.observer.complete()
- c.dispose(behaviour_terminate)
- }
- }
- func (c *collector) parent_complete() {
- c.no_more_children = true
- if c.num_children == 0 {
- c.observer.complete()
- c.dispose(behaviour_terminate)
- }
- }
|