wired.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package rx
  2. /**
  3. * FRP with wired components
  4. * Sink :> Bus(aka Subject) >: Reactive(aka BehaviorSubject)
  5. * transformations:
  6. * Sink[A] --> adapt[B->A] --> Sink[B]
  7. * Reactive[A] --> adapt[A->B->A] --> Sink[B]
  8. * Reactive[A] --> morph[A->B->A,A->B] --> Reactive[B]
  9. * Reactive[(A,B)] --> project[A] --> Reactive[A]
  10. * Reactive[(A,B)] --> project[B] --> Reactive[B]
  11. * Reactive[A|B] --> branch[A] --> Reactive[A] (restricted read/update operation)
  12. * Reactive[A|B] --> branch[B] --> Reactive[B] (restricted read/update operation)
  13. */
  14. // Sink accepts values
  15. type Sink interface {
  16. Emit(obj Object) Observable
  17. }
  18. // Bus accepts and provides values
  19. type Bus interface {
  20. Sink
  21. Watch() Observable
  22. }
  23. func DistinctWatch(b Bus, eq func(Object,Object)(bool)) Observable {
  24. return b.Watch().DistinctUntilChanged(eq)
  25. }
  26. // Reactive accepts and provides values, while holding a current value
  27. type Reactive interface {
  28. Bus
  29. Read() Observable
  30. Update(f func(old_state Object)(Object)) Observable
  31. }
  32. // ReactiveEntity is a Reactive that is NOT derived from another Reactive
  33. type ReactiveEntity = *ReactiveImpl
  34. // Operators
  35. func Connect(source Observable, sink Sink) Observable {
  36. return source.ConcatMap(func(value Object) Observable {
  37. return sink.Emit(value)
  38. }).WaitComplete()
  39. }
  40. func SinkAdapt(sink Sink, adapter (func(Object) Object)) Sink {
  41. return &AdaptedSink {
  42. base: sink,
  43. adapter: adapter,
  44. }
  45. }
  46. func ReactiveAdapt(r Reactive, in (func(Object) (func(Object) Object))) Sink {
  47. return &AdaptedReactive {
  48. base: r,
  49. in: in,
  50. }
  51. }
  52. func ReactiveMorph (
  53. r Reactive,
  54. in (func(Object) (func(Object) Object)),
  55. out (func(Object) Object),
  56. ) Reactive {
  57. return &MorphedReactive {
  58. AdaptedReactive: &AdaptedReactive {
  59. base: r,
  60. in: in,
  61. },
  62. out: out,
  63. }
  64. }
  65. func ReactiveBranch (
  66. r Reactive,
  67. in (func(Object) Object),
  68. out (func(Object) (Object,bool)),
  69. ) Reactive {
  70. return &FilterMappedReactive {
  71. AdaptedReactive: &AdaptedReactive {
  72. base: r,
  73. in: func(_ Object) func(Object) Object {
  74. return in
  75. },
  76. },
  77. out: out,
  78. }
  79. }
  80. // Operators Implementations
  81. type AdaptedSink struct {
  82. base Sink
  83. adapter func(Object) Object
  84. }
  85. func (a *AdaptedSink) Emit(obj Object) Observable {
  86. return a.base.Emit(a.adapter(obj))
  87. }
  88. type AdaptedReactive struct {
  89. base Reactive
  90. in func(Object) (func(Object) Object)
  91. }
  92. func (a *AdaptedReactive) Emit(obj Object) Observable {
  93. return a.base.Update(func(old_state Object) Object {
  94. return a.in(old_state)(obj)
  95. })
  96. }
  97. type MorphedReactive struct {
  98. *AdaptedReactive
  99. out func(Object) Object
  100. }
  101. func (m *MorphedReactive) Watch() Observable {
  102. return m.base.Watch().Map(m.out)
  103. }
  104. func (m *MorphedReactive) Read() Observable {
  105. return m.base.Read().Map(m.out)
  106. }
  107. func (m *MorphedReactive) Update(f (func(Object) Object)) Observable {
  108. return m.base.Update(func(obj Object) Object {
  109. return m.in(obj)(f(m.out(obj)))
  110. })
  111. }
  112. type FilterMappedReactive struct {
  113. *AdaptedReactive
  114. out func(Object) (Object, bool)
  115. }
  116. func (m *FilterMappedReactive) Watch() Observable {
  117. return m.base.Watch().FilterMap(m.out)
  118. }
  119. func (m *FilterMappedReactive) Read() Observable {
  120. return m.base.Read().Map(func(current Object) Object {
  121. var current_out, ok = m.out(current)
  122. if ok {
  123. return current_out
  124. } else {
  125. panic("FilterMappedReactive: invalid read operation")
  126. }
  127. })
  128. }
  129. func (m *FilterMappedReactive) Update(f (func(Object) Object)) Observable {
  130. return m.base.Update(func(current Object) Object {
  131. var current_out, ok = m.out(current)
  132. if ok {
  133. return m.in(current)(f(current_out))
  134. } else {
  135. panic("FilterMappedReactive: invalid update operation")
  136. }
  137. })
  138. }
  139. // Trivial Sink: BlackHole and Callback
  140. type BlackHole struct{}
  141. func (_ BlackHole) Emit(_ Object) Observable {
  142. return NewConstant(nil)
  143. }
  144. type Callback func(Object) Observable
  145. func (cb Callback) Emit(obj Object) Observable {
  146. return cb(obj)
  147. }
  148. // Basic Implementations of Bus[T] and Reactive[T]
  149. type BusImpl struct {
  150. next_id uint64
  151. watchers [] Watcher // first in, first notified
  152. index map[uint64] uint // id --> position in watchers
  153. }
  154. type Watcher struct {
  155. Notify func(Object)
  156. }
  157. func CreateBus() *BusImpl {
  158. return &BusImpl {
  159. next_id: 0,
  160. watchers: make([] Watcher, 0),
  161. index: make(map[uint64] uint),
  162. }
  163. }
  164. func (b *BusImpl) Watch() Observable {
  165. return NewSubscription(func(next func(Object)) func() {
  166. var id = b.addWatcher(Watcher {
  167. Notify: next,
  168. })
  169. return func() {
  170. b.removeWatcher(id)
  171. }
  172. })
  173. }
  174. func (b *BusImpl) Emit(obj Object) Observable {
  175. return NewSync(func() (Object, bool) {
  176. b.notify(obj)
  177. return nil, true
  178. })
  179. }
  180. func (b *BusImpl) notify(obj Object) {
  181. for _, w := range b.copyWatcher() {
  182. w.Notify(obj)
  183. }
  184. }
  185. func (b *BusImpl) copyWatcher() ([] Watcher) {
  186. var the_copy = make([] Watcher, len(b.watchers))
  187. copy(the_copy, b.watchers)
  188. return the_copy
  189. }
  190. func (b *BusImpl) addWatcher(w Watcher) uint64 {
  191. var id = b.next_id
  192. var pos = uint(len(b.watchers))
  193. b.watchers = append(b.watchers, w)
  194. b.index[id] = pos
  195. b.next_id = (id + 1)
  196. return id
  197. }
  198. func (b *BusImpl) removeWatcher(id uint64) {
  199. var pos, exists = b.index[id]
  200. if !exists { panic("cannot remove absent listener") }
  201. // update index
  202. delete(b.index, id)
  203. for current, _ := range b.index {
  204. if current > id {
  205. // position left shifted
  206. b.index[current] -= 1
  207. }
  208. }
  209. // update queue
  210. b.watchers[pos] = Watcher {}
  211. var L = uint(len(b.watchers))
  212. if !(L >= 1) { panic("something went wrong") }
  213. for i := pos; i < (L-1); i += 1 {
  214. b.watchers[i] = b.watchers[i + 1]
  215. }
  216. b.watchers[L-1] = Watcher {}
  217. b.watchers = b.watchers[:L-1]
  218. }
  219. type ReactiveImpl struct {
  220. bus *BusImpl
  221. value Object
  222. }
  223. func CreateReactive(init Object) ReactiveEntity {
  224. return &ReactiveImpl {
  225. bus: CreateBus(),
  226. value: init,
  227. }
  228. }
  229. func (r *ReactiveImpl) commit(new_value Object) {
  230. r.value = new_value
  231. r.bus.notify(new_value)
  232. }
  233. func (r *ReactiveImpl) Watch() Observable {
  234. return NewSubscription(func(next func(Object)) func() {
  235. next(r.value)
  236. var w = r.bus.addWatcher(Watcher {
  237. Notify: func(obj Object) {
  238. next(obj)
  239. },
  240. })
  241. return func() {
  242. r.bus.removeWatcher(w)
  243. }
  244. })
  245. }
  246. func (r *ReactiveImpl) Read() Observable {
  247. return NewSync(func() (Object, bool) {
  248. return r.value, true
  249. })
  250. }
  251. func (r *ReactiveImpl) Emit(new_value Object) Observable {
  252. return NewSync(func() (Object, bool) {
  253. r.commit(new_value)
  254. return nil, true
  255. })
  256. }
  257. func (r *ReactiveImpl) Update(f (func(Object) Object)) Observable {
  258. return NewSync(func() (Object, bool) {
  259. var old_value = r.value
  260. var new_value = f(old_value)
  261. r.commit(new_value)
  262. return nil, true
  263. })
  264. }