mix.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package rx
  2. func Mix(actions ([] Observable), concurrent uint) Observable {
  3. if concurrent == 0 { panic("invalid concurrent amount") }
  4. return Observable { func(sched Scheduler, ob *observer) {
  5. var ctx, dispose = ob.context.create_disposable_child()
  6. var c = new_collector(ob, dispose)
  7. var q_sched = QueueSchedulerFrom(sched, concurrent)
  8. for _, item := range actions {
  9. c.new_child()
  10. q_sched.run(item, &observer {
  11. context: ctx,
  12. next: func(x Object) {
  13. c.pass(x)
  14. },
  15. error: func(e Object) {
  16. c.throw(e)
  17. },
  18. complete: func() {
  19. c.delete_child()
  20. },
  21. })
  22. }
  23. c.parent_complete()
  24. } }
  25. }
  26. func (e Observable) MixMap(f func(Object) Observable, concurrent uint) Observable {
  27. if concurrent == 0 {
  28. panic("invalid concurrent amount")
  29. }
  30. return Observable { func(sched Scheduler, ob *observer) {
  31. var ctx, dispose = ob.context.create_disposable_child()
  32. var c = new_collector(ob, dispose)
  33. var q_sched = QueueSchedulerFrom(sched, concurrent)
  34. sched.run(e, &observer {
  35. context: ctx,
  36. next: func(x Object) {
  37. var item = f(x)
  38. c.new_child()
  39. q_sched.run(item, &observer {
  40. context: ctx,
  41. next: func(x Object) {
  42. c.pass(x)
  43. },
  44. error: func(e Object) {
  45. c.throw(e)
  46. },
  47. complete: func() {
  48. c.delete_child()
  49. },
  50. })
  51. },
  52. error: func(e Object) {
  53. c.throw(e)
  54. },
  55. complete: func() {
  56. c.parent_complete()
  57. },
  58. })
  59. } }
  60. }
  61. func Concat(actions ([] Observable)) Observable {
  62. return Mix(actions, 1)
  63. }
  64. func (e Observable) ConcatMap(f func(Object) Observable) Observable {
  65. return e.MixMap(f, 1)
  66. }
  67. type QueueScheduler struct {
  68. underlying Scheduler
  69. queue *queue
  70. running uint
  71. max_running uint
  72. }
  73. func QueueSchedulerFrom(sched Scheduler, concurrent uint) *QueueScheduler {
  74. if concurrent == 0 { panic("invalid concurrent amount") }
  75. return &QueueScheduler {
  76. underlying: sched,
  77. queue: new_queue(),
  78. running: 0,
  79. max_running: concurrent,
  80. }
  81. }
  82. func (qs *QueueScheduler) run(e Observable, ob *observer) {
  83. if qs.running < qs.max_running {
  84. qs.running += 1
  85. qs.underlying.run(e, &observer {
  86. context: ob.context,
  87. next: ob.next,
  88. error: ob.error,
  89. complete: func() {
  90. ob.complete()
  91. qs.running -= 1
  92. var next_item, exists = qs.queue.pop()
  93. if exists {
  94. qs.run(next_item, ob)
  95. }
  96. },
  97. })
  98. } else {
  99. qs.queue.push(e)
  100. }
  101. }
  102. func (qs *QueueScheduler) dispatch(ev event) {
  103. qs.underlying.dispatch(ev)
  104. }
  105. func (qs *QueueScheduler) commit(t task) {
  106. qs.underlying.commit(t)
  107. }
  108. type queue [] Observable
  109. func new_queue() *queue {
  110. var q = queue(make([] Observable, 0))
  111. return &q
  112. }
  113. func (q *queue) push(e Observable) {
  114. *q = append(*q, e)
  115. }
  116. func (q *queue) pop() (Observable, bool) {
  117. if len(*q) > 0 {
  118. var e = (*q)[0]
  119. (*q)[0] = Observable { nil }
  120. *q = (*q)[1:]
  121. return e, true
  122. } else {
  123. return Observable {}, false
  124. }
  125. }