promise.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package rx
  2. type Promise struct {
  3. state ReactiveEntity
  4. }
  5. type PromiseState struct {
  6. status PromiseStatus
  7. value Object
  8. }
  // PromiseStatus enumerates the lifecycle stages of a Promise.
  type PromiseStatus int

  // Lifecycle stages, numbered consecutively from zero.
  const (
  	// pending is the initial status set by CreatePromise.
  	pending = PromiseStatus(iota)
  	// resolved is the status written by Resolve.
  	resolved
  	// rejected is the status written by Reject.
  	rejected
  )
  15. func CreatePromise() Promise {
  16. var init = PromiseState {
  17. status: pending,
  18. }
  19. var state = CreateReactive(init)
  20. return Promise { state }
  21. }
  22. func (p *Promise) Resolve(value Object, sched Scheduler) {
  23. _, _ = ScheduleSingle(p.state.Update(func(state_ Object) Object {
  24. var draft = state_.(PromiseState)
  25. if draft.status != pending {
  26. panic("invalid operation")
  27. }
  28. draft.status = resolved
  29. draft.value = value
  30. return draft
  31. }), sched, Background())
  32. }
  33. func (p *Promise) Reject(err Object, sched Scheduler) {
  34. _, _ = ScheduleSingle(p.state.Update(func(state_ Object) Object {
  35. var draft = state_.(PromiseState)
  36. if draft.status != pending {
  37. panic("invalid operation")
  38. }
  39. draft.status = rejected
  40. draft.value = err
  41. return draft
  42. }), sched, Background())
  43. }
  44. func (p *Promise) Outcome() Observable {
  45. return p.state.Watch().
  46. Filter(func(state_ Object) bool {
  47. var state = state_.(PromiseState)
  48. return state.status != pending
  49. }).
  50. Take(1).
  51. MergeMap(func(state_ Object) Observable {
  52. var state = state_.(PromiseState)
  53. return NewSync(func() (Object, bool) {
  54. if state.status == rejected {
  55. return state.value, false
  56. }
  57. return state.value, true
  58. })
  59. })
  60. }