multiple.go 577 B

12345678910111213141516171819202122232425262728
  1. package rx
  2. func (e Observable) CompleteWhen(p func(Object)(bool)) Observable {
  3. return Observable { func(sched Scheduler, ob *observer) {
  4. var ctx, ctx_dispose = ob.context.create_disposable_child()
  5. sched.run(e, &observer {
  6. context: ctx,
  7. next: func(obj Object) {
  8. if p(obj) {
  9. ctx_dispose(behaviour_cancel)
  10. ob.complete()
  11. } else {
  12. ob.next(obj)
  13. }
  14. },
  15. error: func(err Object) {
  16. ctx_dispose(behaviour_terminate)
  17. ob.error(err)
  18. },
  19. complete: func() {
  20. ctx_dispose(behaviour_terminate)
  21. ob.complete()
  22. },
  23. })
  24. } }
  25. }