sync.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package rx
  2. const sync_did_not_complete = "An action that assumed synchronous did not complete synchronously"
  3. func runSync(action Observable, sched Scheduler, error func(Object)) (Object,bool) {
  4. var returned = Optional {}
  5. var exception = Optional {}
  6. var completed = false
  7. sched.run(action, &observer {
  8. context: Background(), // chained sync action cannot be interrupted
  9. next: func(x Object) {
  10. if returned.HasValue {
  11. panic(single_multiple_return)
  12. }
  13. returned.HasValue = true
  14. returned.Value = x
  15. },
  16. error: func(err Object) {
  17. if returned.HasValue {
  18. panic(single_unexpected_exception)
  19. }
  20. exception.HasValue = true
  21. exception.Value = err
  22. },
  23. complete: func() {
  24. if !(returned.HasValue) {
  25. panic(single_zero_return)
  26. }
  27. completed = true
  28. },
  29. })
  30. if exception.HasValue {
  31. error(exception.Value)
  32. return nil, false
  33. } else if !(completed) {
  34. panic(sync_did_not_complete)
  35. } else if !(returned.HasValue) {
  36. panic("something went wrong")
  37. } else {
  38. return returned.Value, true
  39. }
  40. }
  41. func (e Observable) SyncThen(f func(Object)(Observable)) Observable {
  42. return Observable { func(sched Scheduler, ob *observer) {
  43. var x, ok = runSync(e, sched, ob.error)
  44. if ok {
  45. var next = f(x)
  46. sched.run(next, ob)
  47. }
  48. } }
  49. }
  50. func (e Observable) ChainSync(f func(Object)(Observable)) Observable {
  51. return Observable { func(sched Scheduler, ob *observer) {
  52. var x, ok = runSync(e, sched, ob.error)
  53. if ok {
  54. var next = f(x)
  55. var y, ok = runSync(next, sched, ob.error)
  56. if ok {
  57. ob.next(y)
  58. ob.complete()
  59. }
  60. }
  61. } }
  62. }
  63. func (e Observable) TakeOneAsSingleAssumeSync() Observable {
  64. return Observable { func(sched Scheduler, ob *observer) {
  65. var ctx, ctx_dispose = ob.context.create_disposable_child()
  66. var completed = false
  67. sched.run(e, &observer {
  68. context: ctx,
  69. next: func(val Object) {
  70. ctx_dispose(behaviour_cancel)
  71. ob.next(Optional { true, val })
  72. ob.complete()
  73. },
  74. error: func(err Object) {
  75. ctx_dispose(behaviour_terminate)
  76. ob.error(err)
  77. },
  78. complete: func() {
  79. ctx_dispose(behaviour_terminate)
  80. ob.next(Optional {} )
  81. ob.complete()
  82. completed = true
  83. },
  84. })
  85. if !(completed) {
  86. panic(sync_did_not_complete)
  87. }
  88. } }
  89. }