12345678910111213141516171819202122232425262728 |
- package rx
- func (e Observable) CompleteWhen(p func(Object)(bool)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var ctx, ctx_dispose = ob.context.create_disposable_child()
- sched.run(e, &observer {
- context: ctx,
- next: func(obj Object) {
- if p(obj) {
- ctx_dispose(behaviour_cancel)
- ob.complete()
- } else {
- ob.next(obj)
- }
- },
- error: func(err Object) {
- ctx_dispose(behaviour_terminate)
- ob.error(err)
- },
- complete: func() {
- ctx_dispose(behaviour_terminate)
- ob.complete()
- },
- })
- } }
- }
|