single.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package rx
  2. const single_multiple_return = "An action that assumed to be a single-valued action emitted multiple values"
  3. const single_zero_return = "An action that assumed to be a single-valued action completed with zero values emitted"
  4. const single_unexpected_exception = "An action that assumed to be a single-valued action produced an unexpected exception"
  5. func ScheduleSingle(e Observable, sched Scheduler, ctx *Context) (Optional, bool) {
  6. var chan_ret = make(chan Object)
  7. var chan_err = make(chan Object)
  8. sched.commit(func() {
  9. var returned = false
  10. var returned_value Object
  11. sched.run(e, &observer {
  12. context: ctx,
  13. next: func(obj Object) {
  14. if returned {
  15. panic(single_multiple_return)
  16. }
  17. returned = true
  18. returned_value = obj
  19. },
  20. error: func(err Object) {
  21. if returned {
  22. panic(single_unexpected_exception)
  23. }
  24. chan_err <- err
  25. },
  26. complete: func() {
  27. if !returned {
  28. panic(single_zero_return)
  29. }
  30. chan_ret <- returned_value
  31. },
  32. })
  33. })
  34. select {
  35. case ret := <- chan_ret:
  36. return Optional { true, ret }, true
  37. case err := <- chan_err:
  38. return Optional { false, err }, true
  39. case <- ctx.CancelSignal():
  40. return Optional {}, false
  41. }
  42. }
  43. func (e Observable) Then(f func(Object)(Observable)) Observable {
  44. return Observable { func(sched Scheduler, ob *observer) {
  45. var returned = false
  46. var returned_value Object
  47. sched.run(e, &observer {
  48. context: ob.context,
  49. next: func(x Object) {
  50. if returned {
  51. panic(single_multiple_return)
  52. }
  53. returned = true
  54. returned_value = x
  55. },
  56. error: func(e Object) {
  57. if returned {
  58. panic(single_unexpected_exception)
  59. }
  60. ob.error(e)
  61. },
  62. complete: func() {
  63. if !returned {
  64. panic(single_zero_return)
  65. }
  66. var next = f(returned_value)
  67. sched.run(next, ob)
  68. },
  69. })
  70. } }
  71. }
  72. func (e Observable) WaitComplete() Observable {
  73. return Observable { func(sched Scheduler, ob *observer) {
  74. sched.run(e, &observer {
  75. context: ob.context,
  76. next: func(_ Object) {
  77. // do nothing
  78. },
  79. error: ob.error,
  80. complete: func() {
  81. ob.next(nil)
  82. ob.complete()
  83. },
  84. })
  85. } }
  86. }
  87. func (e Observable) TakeOneAsSingle() Observable {
  88. return Observable { func(sched Scheduler, ob *observer) {
  89. var ctx, ctx_dispose = ob.context.create_disposable_child()
  90. sched.run(e, &observer {
  91. context: ctx,
  92. next: func(val Object) {
  93. ctx_dispose(behaviour_cancel)
  94. ob.next(Optional { true, val })
  95. ob.complete()
  96. },
  97. error: func(err Object) {
  98. ctx_dispose(behaviour_terminate)
  99. ob.error(err)
  100. },
  101. complete: func() {
  102. ctx_dispose(behaviour_terminate)
  103. ob.next(Optional {} )
  104. ob.complete()
  105. },
  106. })
  107. } }
  108. }