basic.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package rx
  2. func (e Observable) StartWith(obj Object) Observable {
  3. return Observable { func(sched Scheduler, ob *observer) {
  4. ob.next(obj)
  5. sched.run(e, ob)
  6. } }
  7. }
  8. func (e Observable) Map(f func(Object)(Object)) Observable {
  9. return Observable { func(sched Scheduler, ob *observer) {
  10. sched.run(e, &observer {
  11. context: ob.context,
  12. next: func(val Object) {
  13. ob.next(f(val))
  14. },
  15. error: ob.error,
  16. complete: ob.complete,
  17. })
  18. } }
  19. }
  20. func (e Observable) Filter(f func(Object)(bool)) Observable {
  21. return Observable { func(sched Scheduler, ob *observer) {
  22. sched.run(e, &observer {
  23. context: ob.context,
  24. next: func(val Object) {
  25. if (f(val)) {
  26. ob.next(val)
  27. }
  28. },
  29. error: ob.error,
  30. complete: ob.complete,
  31. })
  32. }}
  33. }
  34. func (e Observable) FilterMap(f func(Object)(Object,bool)) Observable {
  35. return Observable { func(sched Scheduler, ob *observer) {
  36. sched.run(e, &observer {
  37. context: ob.context,
  38. next: func(val Object) {
  39. var mapped, ok = f(val)
  40. if ok {
  41. ob.next(mapped)
  42. }
  43. },
  44. error: ob.error,
  45. complete: ob.complete,
  46. })
  47. } }
  48. }
  49. func (e Observable) Reduce(f func(Object,Object)Object, init Object) Observable {
  50. return Observable { func(sched Scheduler, ob *observer) {
  51. var acc = init
  52. sched.run(e, &observer {
  53. context: ob.context,
  54. next: func(val Object) {
  55. acc = f(acc, val)
  56. },
  57. error: ob.error,
  58. complete: func() {
  59. ob.next(acc)
  60. ob.complete()
  61. },
  62. })
  63. } }
  64. }
  65. func (e Observable) Scan(f func(Object,Object)Object, init Object) Observable {
  66. return Observable { func(sched Scheduler, ob *observer) {
  67. var acc = init
  68. sched.run(e, &observer {
  69. context: ob.context,
  70. next: func(val Object) {
  71. acc = f(acc, val)
  72. ob.next(acc)
  73. },
  74. error: ob.error,
  75. complete: ob.complete,
  76. })
  77. } }
  78. }
  79. func (e Observable) Take(amount uint) Observable {
  80. if amount == 0 { panic("take: invalid amount 0") }
  81. return Observable { func(sched Scheduler, ob *observer) {
  82. var ctx, ctx_dispose = ob.context.create_disposable_child()
  83. var taken = uint(0)
  84. sched.run(e, &observer {
  85. context: ctx,
  86. next: func(val Object) {
  87. ob.next(val)
  88. taken += 1
  89. if taken == amount {
  90. ctx_dispose(behaviour_cancel)
  91. ob.complete()
  92. }
  93. },
  94. error: func(err Object) {
  95. ctx_dispose(behaviour_terminate)
  96. ob.error(err)
  97. },
  98. complete: func() {
  99. ctx_dispose(behaviour_terminate)
  100. },
  101. })
  102. } }
  103. }
  104. func (e Observable) DiscardComplete() Observable {
  105. return Observable { func(sched Scheduler, ob *observer) {
  106. sched.run(e, &observer {
  107. context: ob.context,
  108. next: ob.next,
  109. error: ob.error,
  110. complete: func() {
  111. // no-op
  112. },
  113. })
  114. } }
  115. }