observable.go 32 KB


  1. package core
  2. import ( gctx "context"; "kumachan/standalone/ctn" )
  3. type Observable func(DataPublisher)
  4. type DataPublisher struct {
  5. eventloop *EventLoop
  6. context *context
  7. observer *observer
  8. }
  9. type DataSubscriber struct {
  10. Values chan <- Object
  11. Error chan <- error
  12. Terminate chan <- bool
  13. }
  14. type observer struct {
  15. value func(Object)
  16. error func(error)
  17. complete func()
  18. }
  19. func run(run Observable, eventloop *EventLoop, ctx *context, ob *observer) {
  20. if ctx.isDisposed() {
  21. return
  22. }
  23. var terminated = false
  24. var terminate = func() { terminated = true }
  25. var active = func() bool { return !(terminated || ctx.isDisposed()) }
  26. var proxy = &observer {
  27. value: func(v Object) { if active() { ob.value(v) } },
  28. error: func(e error) { if active() { terminate(); ob.error(e) } },
  29. complete: func() { if active() { terminate(); ob.complete() } },
  30. }
  31. run(DataPublisher {
  32. eventloop: eventloop,
  33. context: ctx,
  34. observer: proxy,
  35. })
  36. }
  37. func Run(o Observable, eventloop *EventLoop, sub DataSubscriber) {
  38. var V, E, T = sub.Values, sub.Error, sub.Terminate
  39. eventloop.addTask(func() {
  40. var ob = &observer {
  41. value: func(v Object) {
  42. if V != nil { V <- v }
  43. },
  44. error: func(e error) {
  45. if E != nil { E <- e; close(E) }
  46. if T != nil { T <- false }
  47. },
  48. complete: func() {
  49. if V != nil { close(V) }
  50. if T != nil { T <- true }
  51. },
  52. }
  53. run(o, eventloop, nil, ob)
  54. })
  55. }
  56. func (pub DataPublisher) useInheritedContext() (*context, *observer) {
  57. return pub.context, pub.observer
  58. }
  59. func (pub DataPublisher) useNewChildContext() (*context, func(), *observer) {
  60. var ctx, dispose = pub.context.createChild()
  61. return ctx, dispose, pub.observer
  62. }
  63. func (pub DataPublisher) run(o Observable, ctx *context, ob *observer) {
  64. run(o, pub.eventloop, ctx, ob)
  65. }
  66. func (pub DataPublisher) addTask(k func()) {
  67. pub.eventloop.addTask(k)
  68. }
  69. func (pub DataPublisher) addTimer(ms int, n int, ctx *context, k func()) {
  70. pub.eventloop.addTimer(ms, n, ctx, k)
  71. }
  72. func (pub DataPublisher) SyncReturn(k func()(Object,error)) {
  73. var v, e = k()
  74. if e == nil {
  75. pub.observer.value(v)
  76. pub.observer.complete()
  77. } else {
  78. pub.observer.error(e)
  79. }
  80. }
  81. func (pub DataPublisher) SyncGenerate(k func(yield func(Object))(error)) {
  82. var e = k(pub.observer.value)
  83. if e == nil {
  84. pub.observer.complete()
  85. } else {
  86. pub.observer.error(e)
  87. }
  88. }
  89. func (pub DataPublisher) AsyncContext() gctx.Context {
  90. return pub.context.goContext()
  91. }
  92. func (pub DataPublisher) AsyncThrow(e error) {
  93. pub.eventloop.addTask(func() {
  94. pub.observer.error(e)
  95. })
  96. }
  97. func (pub DataPublisher) AsyncReturn(v Object) {
  98. pub.eventloop.addTask(func() {
  99. pub.observer.value(v)
  100. pub.observer.complete()
  101. })
  102. }
  103. func (pub DataPublisher) AsyncGenerate() (func(Object),func()) {
  104. var yield = func(v Object) {
  105. pub.eventloop.addTask(func() {
  106. pub.observer.value(v)
  107. })
  108. }
  109. var complete = func() {
  110. pub.eventloop.addTask(func() {
  111. pub.observer.complete()
  112. })
  113. }
  114. return yield, complete
  115. }
  116. func ObservableSyncValue(v Object) Observable {
  117. return Observable(func(pub DataPublisher) {
  118. pub.observer.value(v)
  119. pub.observer.complete()
  120. })
  121. }
  122. func ObservableFlattenLast(o Observable) Observable {
  123. return o.Await(func(obj Object) Observable {
  124. return GetObservable(obj)
  125. })
  126. }
  127. type context struct {
  128. parent *context
  129. children [] *context
  130. disposed bool
  131. cleaners [] cleaner
  132. number uint64
  133. g_context gctx.Context
  134. g_dispose func()
  135. }
  // cleaner is a release operation registered on a context.
  type cleaner struct {
      // clean performs the release.
      clean func()
      // number comes from getNumber and is used to order the cleaners.
      number uint64
  }
  140. func (ctx *context) createChild() (*context, func()) {
  141. var g_ctx, g_dispose = gctx.WithCancel(gctx.Background())
  142. var parent = ctx
  143. var child = &context {
  144. parent: parent,
  145. children: make([] *context, 0),
  146. disposed: false,
  147. cleaners: make([] cleaner, 0),
  148. number: getNumber(),
  149. g_context: g_ctx,
  150. g_dispose: g_dispose,
  151. }
  152. if parent != nil {
  153. if parent.disposed { panic("something went wrong") }
  154. parent.children = append(parent.children, child)
  155. }
  156. return child, child.__dispose
  157. }
  158. func (ctx *context) __dispose() {
  159. var parent = ctx.parent
  160. var self = ctx
  161. if !(self.disposed) {
  162. if parent != nil {
  163. self.parent = nil
  164. parent.children = ctn.RemoveFrom(parent.children, self)
  165. }
  166. var contexts = make([] *context, 0)
  167. var cleaners = make([] cleaner, 0)
  168. var q = [] *context { self }
  169. for len(q) > 0 {
  170. var c = q[0]; q = q[1:]
  171. contexts = append(contexts, c)
  172. cleaners = append(cleaners, c.cleaners...)
  173. q = append(q, c.children...)
  174. }
  175. contexts, _ = ctn.StableSorted(contexts, contextSorter)
  176. cleaners, _ = ctn.StableSorted(cleaners, cleanerSorter)
  177. for _, c := range contexts {
  178. c.disposed = true
  179. }
  180. for _, c := range contexts {
  181. c.g_dispose()
  182. }
  183. for _, c := range cleaners {
  184. c.clean()
  185. }
  186. }
  187. }
  188. func contextSorter(a *context, b *context) bool { return (a.number > b.number) }
  189. func cleanerSorter(a cleaner, b cleaner) bool { return (a.number > b.number) }
  // numberCounter is the next number getNumber will hand out.
  var numberCounter uint64

  // getNumber returns the current counter value and advances the counter by
  // one.
  func getNumber() uint64 {
      numberCounter++
      return numberCounter - 1
  }
  192. // registerCleaner registers release operations of persistent resources
  193. func (ctx *context) registerCleaner(c func()) {
  194. if c == nil {
  195. panic("invalid argument")
  196. }
  197. if ctx != nil {
  198. if !(ctx.disposed) {
  199. ctx.cleaners = append(ctx.cleaners, cleaner {
  200. clean: c,
  201. number: getNumber(),
  202. })
  203. }
  204. }
  205. }
  206. func (ctx *context) isDisposed() bool {
  207. if ctx != nil {
  208. return ctx.disposed
  209. } else {
  210. return false
  211. }
  212. }
  213. func (ctx *context) goContext() gctx.Context {
  214. if ctx != nil {
  215. return ctx.g_context
  216. } else {
  217. return gctx.Background()
  218. }
  219. }
  220. func WithChildContext(o Observable) Observable {
  221. return Observable(func(pub DataPublisher) {
  222. var ctx, dispose, ob = pub.useNewChildContext()
  223. pub.run(o, ctx, &observer {
  224. value: ob.value,
  225. error: func(err error) {
  226. dispose()
  227. ob.error(err)
  228. },
  229. complete: func() {
  230. dispose()
  231. ob.complete()
  232. },
  233. })
  234. })
  235. }
  // CancelError is the error reported by WithCancelTrigger when the trigger
  // fires.
  type CancelError struct{}

  // Error returns the fixed message "cancelled".
  func (CancelError) Error() string {
      const message = "cancelled"
      return message
  }
  238. func WithCancelTrigger(sig Observable, o Observable) Observable {
  239. return Observable(func(pub DataPublisher) {
  240. var ctx, dispose, ob = pub.useNewChildContext()
  241. pub.run(o, ctx, &observer {
  242. value: ob.value,
  243. error: func(e error) {
  244. dispose()
  245. ob.error(e)
  246. },
  247. complete: func() {
  248. dispose()
  249. ob.complete()
  250. },
  251. })
  252. pub.run(sig, ctx, &observer {
  253. value: func(_ Object) {
  254. dispose()
  255. ob.error(CancelError {})
  256. },
  257. error: func(e error) {
  258. dispose()
  259. ob.error(e)
  260. },
  261. complete: func() {
  262. dispose()
  263. ob.error(CancelError {})
  264. },
  265. })
  266. })
  267. }
  268. func WithCancelTimeout(ms int, o Observable) Observable {
  269. return WithCancelTrigger(SetTimeout(ms), o)
  270. }
  271. type Subject struct { *subjectImpl }
  272. type subjectImpl struct {
  273. observerNextId uint64
  274. observerList [] *observer
  275. observerIndex map[uint64] uint
  276. terminated bool
  277. maybeError error
  278. notifyingFlag bool
  279. recentValues [] Object
  280. runtimeHandle RuntimeHandle
  281. }
  282. func CreateSubject(h RuntimeHandle, replay int, items ...Object) Subject {
  283. if replay < 0 { replay = 0 }
  284. var b = Subject { &subjectImpl {
  285. observerNextId: 0,
  286. observerList: make([] *observer, 0),
  287. observerIndex: make(map[uint64] uint),
  288. terminated: false,
  289. maybeError: nil,
  290. notifyingFlag: false,
  291. recentValues: make([] Object, 0, replay),
  292. runtimeHandle: h,
  293. } }
  294. for _, item := range items { b.appendRecentValue(item) }
  295. return b
  296. }
  297. func (b Subject) Observe() Observable {
  298. return Observable(func(pub DataPublisher) {
  299. if b.terminated {
  300. if b.maybeError != nil {
  301. var err = b.maybeError
  302. pub.observer.error(err)
  303. } else {
  304. b.iterateRecentValues(pub.observer.value)
  305. pub.observer.complete()
  306. }
  307. return
  308. }
  309. var ctx, ob = pub.useInheritedContext()
  310. var id = b.appendObserver(ob)
  311. ctx.registerCleaner(func() { b.deleteObserver(id) })
  312. b.iterateRecentValues(pub.observer.value)
  313. })
  314. }
  315. func (b Subject) Plug(o Observable) Observable {
  316. return Observable(func(pub DataPublisher) {
  317. var ctx, ob = pub.useInheritedContext()
  318. pub.run(o, ctx, &observer {
  319. value: b.value,
  320. error: ob.error,
  321. complete: ob.complete,
  322. })
  323. })
  324. }
  325. func (b Subject) value(v Object) {
  326. if b.terminated {
  327. return
  328. }
  329. b.appendRecentValue(v)
  330. b.iterateObservers(func(ob *observer) {
  331. ob.value(v)
  332. })
  333. }
  334. func (b Subject) error(err error) {
  335. if b.terminated {
  336. return
  337. }
  338. b.terminated, b.maybeError = true, err
  339. b.iterateObservers(func(ob *observer) {
  340. ob.error(err)
  341. })
  342. }
  343. func (b Subject) complete() {
  344. if b.terminated {
  345. return
  346. }
  347. b.terminated, b.maybeError = true, nil
  348. b.iterateObservers(func(ob *observer) {
  349. ob.complete()
  350. })
  351. }
  352. func (b Subject) multicastInput() *observer {
  353. return &observer {
  354. value: b.value,
  355. error: b.error,
  356. complete: b.complete,
  357. }
  358. }
  359. func (b Subject) iterateObservers(k func(*observer)) {
  360. var L = len(b.observerList)
  361. if L > 0 {
  362. if b.notifyingFlag {
  363. var h = b.runtimeHandle
  364. Crash(h, InvariantViolation, "synchronous feedback")
  365. }
  366. b.notifyingFlag = true
  367. var snapshot = make([] *observer, L)
  368. copy(snapshot, b.observerList)
  369. for _, ob := range snapshot {
  370. k(ob)
  371. }
  372. b.notifyingFlag = false
  373. }
  374. }
  375. func (b Subject) appendObserver(ob *observer) uint64 {
  376. var id = b.observerNextId
  377. var pos = uint(len(b.observerList))
  378. b.observerList = append(b.observerList, ob)
  379. b.observerIndex[id] = pos
  380. b.observerNextId = (id + 1)
  381. return id
  382. }
  383. func (b Subject) deleteObserver(id uint64) {
  384. var pos, exists = b.observerIndex[id]
  385. if !(exists) { panic("invalid argument") }
  386. // update index
  387. delete(b.observerIndex, id)
  388. for current, _ := range b.observerIndex {
  389. if current > id {
  390. // position left shifted
  391. b.observerIndex[current] -= 1
  392. }
  393. }
  394. // update queue
  395. b.observerList[pos] = nil
  396. var L = uint(len(b.observerList))
  397. if !(L >= 1) { panic("something went wrong") }
  398. for i := pos; i < (L-1); i += 1 {
  399. b.observerList[i] = b.observerList[i + 1]
  400. }
  401. b.observerList[L-1] = nil
  402. b.observerList = b.observerList[:L-1]
  403. }
  404. func (b Subject) iterateRecentValues(f func(Object)) {
  405. var L = len(b.recentValues)
  406. if L > 0 {
  407. var snapshot = make([] Object, L)
  408. copy(snapshot, b.recentValues)
  409. for _, item := range snapshot {
  410. f(item)
  411. }
  412. }
  413. }
  414. func (b Subject) appendRecentValue(v Object) {
  415. var L = len(b.recentValues)
  416. if L < cap(b.recentValues) {
  417. b.recentValues = append(b.recentValues, v)
  418. } else if L > 0 {
  419. for i := 0; i < (L - 1); i += 1 {
  420. b.recentValues[i] = b.recentValues[i+1]
  421. }
  422. b.recentValues[L-1] = v
  423. }
  424. }
  425. func Multicast(o Observable, h RuntimeHandle) Observable {
  426. return Observable(func(pub DataPublisher) {
  427. var ctx, ob = pub.useInheritedContext()
  428. var bus = CreateSubject(h, 0)
  429. ob.value(Obj(bus.Observe()))
  430. ob.complete()
  431. pub.run(o, ctx, bus.multicastInput())
  432. })
  433. }
  434. func Loopback(k func(Observable)(Observable), h RuntimeHandle) Observable {
  435. return Observable(func(pub DataPublisher) {
  436. var ctx, ob = pub.useInheritedContext()
  437. var bus = CreateSubject(h, 0)
  438. pub.run(bus.Observe(), ctx, ob)
  439. pub.run(k(bus.Observe()), ctx, bus.multicastInput())
  440. })
  441. }
  442. func SkipSync(o Observable) Observable {
  443. return Observable(func(pub DataPublisher) {
  444. var ctx, ob = pub.useInheritedContext()
  445. var sync = true
  446. pub.run(o, ctx, &observer {
  447. value: func(v Object) {
  448. if sync {
  449. return
  450. }
  451. ob.value(v)
  452. },
  453. error: ob.error,
  454. complete: ob.complete,
  455. })
  456. sync = false
  457. })
  458. }
  459. func Now() Observable {
  460. return Observable(func(pub DataPublisher) {
  461. pub.observer.value(ObjTimeNow())
  462. pub.observer.complete()
  463. })
  464. }
  465. func SetTimeout(ms int) Observable {
  466. return Observable(func(pub DataPublisher) {
  467. var ctx, ob = pub.useInheritedContext()
  468. pub.addTimer(ms, 1, ctx, func() {
  469. ob.value(nil)
  470. ob.complete()
  471. })
  472. })
  473. }
  474. func SetInterval(ms int, n int) Observable {
  475. return Observable(func(pub DataPublisher) {
  476. var ctx, ob = pub.useInheritedContext()
  477. if n == 0 {
  478. ob.complete()
  479. return
  480. }
  481. var current = 0
  482. pub.addTimer(ms, n, ctx, func() {
  483. if (n < 0) || (current < n) {
  484. ob.value(ObjInt(current + 1))
  485. current += 1
  486. }
  487. if (n >= 0) && (current == n) {
  488. ob.complete()
  489. }
  490. })
  491. })
  492. }
  493. func ObservableSequence(forEach func(func(Observable))) Observable {
  494. return Observable(func(pub DataPublisher) {
  495. forEach(func(item Observable) {
  496. pub.observer.value(Obj(item))
  497. })
  498. pub.observer.complete()
  499. })
  500. }
  501. func (o Observable) Catch(f func(error,Observable)(Observable)) Observable {
  502. return Observable(func(pub DataPublisher) {
  503. var ctx, ob = pub.useInheritedContext()
  504. pub.run(o, ctx, &observer {
  505. error: func(err error) {
  506. pub.run(f(err,o.Catch(f)), ctx, ob)
  507. },
  508. value: ob.value,
  509. complete: ob.complete,
  510. })
  511. })
  512. }
  513. func (o Observable) Retry(limit int) Observable {
  514. return Observable(func(pub DataPublisher) {
  515. var ctx, ob = pub.useInheritedContext()
  516. var retrial = 0
  517. var proxy *observer
  518. proxy = &observer {
  519. error: func(err error) {
  520. if retrial == limit {
  521. ob.error(err)
  522. return
  523. }
  524. retrial++
  525. pub.run(o, ctx, proxy)
  526. },
  527. value: ob.value,
  528. complete: ob.complete,
  529. }
  530. pub.run(o, ctx, proxy)
  531. })
  532. }
  533. func (o Observable) LogError(h RuntimeHandle) Observable {
  534. return Observable(func(pub DataPublisher) {
  535. var ctx, ob = pub.useInheritedContext()
  536. pub.run(o, ctx, &observer {
  537. error: func(err error) {
  538. MakeLogger(h).LogError(err)
  539. ob.complete()
  540. },
  541. value: ob.value,
  542. complete: ob.complete,
  543. })
  544. })
  545. }
  546. func (o Observable) DistinctUntilChanged(equal func(Object,Object)(bool)) Observable {
  547. return Observable(func(pub DataPublisher) {
  548. var ctx, ob = pub.useInheritedContext()
  549. var previous Object
  550. var available = false
  551. pub.run(o, ctx, &observer {
  552. value: func(v Object) {
  553. if available {
  554. if equal(v, previous) {
  555. return
  556. }
  557. }
  558. previous = v
  559. available = true
  560. ob.value(v)
  561. },
  562. error: ob.error,
  563. complete: ob.complete,
  564. })
  565. })
  566. }
  567. func (o Observable) DistinctUntilObjectChanged() Observable {
  568. return o.DistinctUntilChanged(func(a Object, b Object) bool {
  569. return (a == b)
  570. })
  571. }
  572. func (o Observable) ObjectPairEqualities() Observable {
  573. return o.Map(func(obj Object) Object {
  574. var a, b = GetPair(obj)
  575. return ObjBool(a == b)
  576. }).DistinctUntilChanged(func(p Object, q Object) bool {
  577. return (GetBool(p) == GetBool(q))
  578. })
  579. }
  580. func (o Observable) WithLatestFrom(another Observable) Observable {
  581. return Observable(func(pub DataPublisher) {
  582. var ctx, dispose, ob = pub.useNewChildContext()
  583. var attached Object
  584. var available = false
  585. pub.run(another, ctx, &observer {
  586. value: func(v Object) {
  587. attached = v
  588. available = true
  589. },
  590. error: func(err error) {
  591. dispose()
  592. ob.error(err)
  593. },
  594. complete: func() {},
  595. })
  596. pub.run(o, ctx, &observer {
  597. value: func(v Object) {
  598. if available {
  599. ob.value(ObjPair(v, attached))
  600. }
  601. },
  602. error: func(err error) {
  603. dispose()
  604. ob.error(err)
  605. },
  606. complete: func() {
  607. dispose()
  608. ob.complete()
  609. },
  610. })
  611. })
  612. }
  613. func (o Observable) MapToLatestFrom(another Observable) Observable {
  614. return o.WithLatestFrom(another).Map(func(obj Object) Object {
  615. var _, latest = GetPair(obj)
  616. return latest
  617. })
  618. }
  619. func (o Observable) WithCycle(l List) Observable {
  620. return Observable(func(pub DataPublisher) {
  621. if l.Empty() {
  622. pub.observer.complete()
  623. return
  624. }
  625. var ctx, ob = pub.useInheritedContext()
  626. var node = l.Head
  627. pub.run(o, ctx, &observer {
  628. value: func(v Object) {
  629. ob.value(ObjPair(v, node.Value))
  630. node = node.Next
  631. if node == nil { node = l.Head }
  632. },
  633. error: ob.error,
  634. complete: ob.complete,
  635. })
  636. })
  637. }
  638. func (o Observable) WithIndex() Observable {
  639. return Observable(func(pub DataPublisher) {
  640. var ctx, ob = pub.useInheritedContext()
  641. var i = 0
  642. pub.run(o, ctx, &observer {
  643. value: func(v Object) {
  644. ob.value(ObjPair(v, ObjInt(i)))
  645. i += 1
  646. },
  647. error: ob.error,
  648. complete: ob.complete,
  649. })
  650. })
  651. }
  652. func (o Observable) WithTime() Observable {
  653. return Observable(func(pub DataPublisher) {
  654. var ctx, ob = pub.useInheritedContext()
  655. pub.run(o, ctx, &observer {
  656. value: func(v Object) {
  657. ob.value(ObjPair(v, ObjTimeNow()))
  658. },
  659. error: ob.error,
  660. complete: ob.complete,
  661. })
  662. })
  663. }
  664. func (o Observable) DelayRun(ms int) Observable {
  665. return SetTimeout(ms).Then(o)
  666. }
  667. func (o Observable) DelayValues(ms int) Observable {
  668. return o.MergeMap(func(v Object) Observable {
  669. return SetTimeout(ms).MapTo(v)
  670. })
  671. }
  672. func (o Observable) StartWith(first Object) Observable {
  673. return Observable(func(pub DataPublisher) {
  674. var ctx, ob = pub.useInheritedContext()
  675. ob.value(first)
  676. pub.run(o, ctx, ob)
  677. })
  678. }
  679. func (o Observable) EndWith(last Object) Observable {
  680. return Observable(func(pub DataPublisher) {
  681. var ctx, ob = pub.useInheritedContext()
  682. pub.run(o, ctx, &observer {
  683. value: ob.value,
  684. error: ob.error,
  685. complete: func() {
  686. ob.value(last)
  687. ob.complete()
  688. },
  689. })
  690. })
  691. }
  692. func (o Observable) Throttle(f func(Object)(Observable)) Observable {
  693. return o.ExhaustMap(func(v Object) Observable {
  694. return f(v).CompleteOnEmit().StartWith(v)
  695. })
  696. }
  697. func (o Observable) Debounce(f func(Object)(Observable)) Observable {
  698. return o.SwitchMap(func(v Object) Observable {
  699. return f(v).CompleteOnEmit().EndWith(v)
  700. })
  701. }
  702. func (o Observable) ThrottleTime(ms int) Observable {
  703. return o.Throttle(func(_ Object) Observable {
  704. return SetTimeout(ms)
  705. })
  706. }
  707. func (o Observable) DebounceTime(ms int) Observable {
  708. return o.Debounce(func(_ Object) Observable {
  709. return SetTimeout(ms)
  710. })
  711. }
  712. func (o Observable) CompleteOnEmit() Observable {
  713. return Observable(func(pub DataPublisher) {
  714. var ctx, dispose, ob = pub.useNewChildContext()
  715. pub.run(o, ctx, &observer {
  716. value: func(v Object) {
  717. dispose()
  718. ob.complete()
  719. },
  720. error: func(e error) {
  721. dispose()
  722. ob.error(e)
  723. },
  724. complete: func() {
  725. dispose()
  726. ob.complete()
  727. },
  728. })
  729. })
  730. }
  731. func (o Observable) Skip(n int) Observable {
  732. if n <= 0 {
  733. return o
  734. }
  735. return Observable(func(pub DataPublisher) {
  736. var ctx, dispose, ob = pub.useNewChildContext()
  737. var i = 0
  738. pub.run(o, ctx, &observer {
  739. value: func(v Object) {
  740. if i >= n {
  741. ob.value(v)
  742. }
  743. i += 1
  744. },
  745. error: func(err error) {
  746. dispose()
  747. ob.error(err)
  748. },
  749. complete: func() {
  750. dispose()
  751. ob.complete()
  752. },
  753. })
  754. })
  755. }
  756. func (o Observable) Take(limit int) Observable {
  757. if limit <= 0 {
  758. return Observable(func(pub DataPublisher) {
  759. pub.observer.complete()
  760. })
  761. }
  762. return Observable(func(pub DataPublisher) {
  763. var ctx, dispose, ob = pub.useNewChildContext()
  764. var i = 0
  765. pub.run(o, ctx, &observer {
  766. value: func(v Object) {
  767. ob.value(v)
  768. i += 1
  769. if i == limit {
  770. dispose()
  771. ob.complete()
  772. }
  773. },
  774. error: func(err error) {
  775. dispose()
  776. ob.error(err)
  777. },
  778. complete: func() {
  779. dispose()
  780. ob.complete()
  781. },
  782. })
  783. })
  784. }
  785. func (o Observable) TakeLast() Observable {
  786. return Observable(func(pub DataPublisher) {
  787. var ctx, ob = pub.useInheritedContext()
  788. var last Object
  789. var available = false
  790. pub.run(o, ctx, &observer {
  791. error: ob.error,
  792. value: func(v Object) {
  793. last = v
  794. available = true
  795. },
  796. complete: func() {
  797. if available {
  798. ob.value(last)
  799. }
  800. ob.complete()
  801. },
  802. })
  803. })
  804. }
  805. func (o Observable) TakeWhile(f func(Object)(bool)) Observable {
  806. return Observable(func(pub DataPublisher) {
  807. var ctx, dispose, ob = pub.useNewChildContext()
  808. pub.run(o, ctx, &observer {
  809. value: func(v Object) {
  810. if f(v) {
  811. ob.value(v)
  812. } else {
  813. dispose()
  814. ob.complete()
  815. }
  816. },
  817. error: func(err error) {
  818. dispose()
  819. ob.error(err)
  820. },
  821. complete: func() {
  822. dispose()
  823. ob.complete()
  824. },
  825. })
  826. })
  827. }
  828. func (o Observable) TakeUntil(stop Observable) Observable {
  829. return Observable(func(pub DataPublisher) {
  830. var ctx, dispose, ob = pub.useNewChildContext()
  831. pub.run(o, ctx, &observer {
  832. value: ob.value,
  833. error: func(err error) {
  834. dispose()
  835. ob.error(err)
  836. },
  837. complete: func() {
  838. dispose()
  839. ob.complete()
  840. },
  841. })
  842. var stop_ctx, stop_dispose = ctx.createChild()
  843. pub.run(stop, stop_ctx, &observer {
  844. value: func(_ Object) {
  845. dispose()
  846. ob.complete()
  847. },
  848. error: func(err error) {
  849. dispose()
  850. ob.error(err)
  851. },
  852. complete: func() {
  853. stop_dispose()
  854. },
  855. })
  856. })
  857. }
  858. func (o Observable) Count() Observable {
  859. return Observable(func(pub DataPublisher) {
  860. var ctx, ob = pub.useInheritedContext()
  861. var count = 0
  862. pub.run(o, ctx, &observer {
  863. error: ob.error,
  864. value: func(_ Object) {
  865. count++
  866. },
  867. complete: func() {
  868. ob.value(ObjInt(count))
  869. ob.complete()
  870. },
  871. })
  872. })
  873. }
  874. func (o Observable) Collect() Observable {
  875. return Observable(func(pub DataPublisher) {
  876. var ctx, ob = pub.useInheritedContext()
  877. var buf ListBuilder
  878. pub.run(o, ctx, &observer {
  879. error: ob.error,
  880. value: func(v Object) {
  881. buf.Append(v)
  882. },
  883. complete: func() {
  884. ob.value(Obj(buf.Collect()))
  885. ob.complete()
  886. },
  887. })
  888. })
  889. }
  890. func (o Observable) BufferTime(ms int) Observable {
  891. return Observable(func(pub DataPublisher) {
  892. var ctx, dispose, ob = pub.useNewChildContext()
  893. var buf ListBuilder
  894. var renew = func() Object {
  895. var l = buf.Collect()
  896. buf = ListBuilder {}
  897. return Obj(l)
  898. }
  899. pub.run(o, ctx, &observer {
  900. value: func(v Object) {
  901. buf.Append(v)
  902. },
  903. error: func(err error) {
  904. dispose()
  905. ob.error(err)
  906. },
  907. complete: func() {
  908. dispose()
  909. ob.value(renew())
  910. ob.complete()
  911. },
  912. })
  913. pub.run(SetInterval(ms, -1), ctx, &observer {
  914. value: func(_ Object) {
  915. ob.value(renew())
  916. },
  917. error: func(_ error) { panic("something went wrong") },
  918. complete: func() { panic("something went wrong") },
  919. })
  920. })
  921. }
  922. func (o Observable) Pairwise() Observable {
  923. return o.BufferCount(2).Map(QueueToPair)
  924. }
  925. func (o Observable) BufferCount(n int) Observable {
  926. return Observable(func(pub DataPublisher) {
  927. var ctx, ob = pub.useInheritedContext()
  928. var sb = createSlidingBuffer(n)
  929. pub.run(o, ctx, &observer {
  930. value: func(v Object) {
  931. var buf, ok = sb.append(v)
  932. if ok {
  933. ob.value(buf)
  934. }
  935. },
  936. error: ob.error,
  937. complete: ob.complete,
  938. })
  939. })
  940. }
  941. type slidingBuffer struct {
  942. size int
  943. mq ctn.MutQueue[Object]
  944. }
  945. func createSlidingBuffer(size int) *slidingBuffer {
  946. if size < 1 {
  947. size = 1
  948. }
  949. return &slidingBuffer {
  950. size: size,
  951. mq: ctn.MakeMutQueue[Object](),
  952. }
  953. }
  954. func (sb *slidingBuffer) append(v Object) (Object, bool) {
  955. sb.mq.Append(v)
  956. var diff = (sb.mq.Size() - sb.size)
  957. if diff < 0 {
  958. return nil, false
  959. } else if diff == 0 {
  960. var buf = ObjQueue(sb.mq.Queue())
  961. return buf, true
  962. } else {
  963. sb.mq.Shift()
  964. var buf = ObjQueue(sb.mq.Queue())
  965. return buf, true
  966. }
  967. }
  968. func (o Observable) Map(f func(Object)(Object)) Observable {
  969. return Observable(func(pub DataPublisher) {
  970. var ctx, ob = pub.useInheritedContext()
  971. pub.run(o, ctx, &observer {
  972. value: func(v Object) { ob.value(f(v)) },
  973. error: ob.error,
  974. complete: ob.complete,
  975. })
  976. })
  977. }
  978. func (o Observable) MapTo(v Object) Observable {
  979. return o.Map(func(_ Object)(Object) { return v })
  980. }
  981. func (o Observable) Filter(f func(Object)(bool)) Observable {
  982. return Observable(func(pub DataPublisher) {
  983. var ctx, ob = pub.useInheritedContext()
  984. pub.run(o, ctx, &observer {
  985. value: func(v Object) { if f(v) { ob.value(v) } },
  986. error: ob.error,
  987. complete: ob.complete,
  988. })
  989. })
  990. }
  991. func (o Observable) Scan(seed Object, f func(Object,Object)(Object)) Observable {
  992. return Observable(func(pub DataPublisher) {
  993. var ctx, ob = pub.useInheritedContext()
  994. var current = seed
  995. pub.run(o, ctx, &observer {
  996. value: func(v Object) {
  997. current = f(current, v)
  998. ob.value(current)
  999. },
  1000. error: ob.error,
  1001. complete: ob.complete,
  1002. })
  1003. })
  1004. }
  1005. func (o Observable) Reduce(initial Object, f func(Object,Object)(Object)) Observable {
  1006. return o.Scan(initial, f).StartWith(initial)
  1007. }
  1008. func CombineLatest(all ...Observable) Observable {
  1009. return Observable(func(pub DataPublisher) {
  1010. if len(all) == 0 {
  1011. pub.observer.complete()
  1012. return
  1013. }
  1014. var ctx, dispose, ob = pub.useNewChildContext()
  1015. var vv = createValueVector(len(all))
  1016. var completed = 0
  1017. for i_, o := range all {
  1018. var index = i_
  1019. pub.run(o, ctx, &observer {
  1020. value: func(v Object) {
  1021. vv.SetItem(index, v)
  1022. if l, ok := vv.GetList(); ok {
  1023. ob.value(l)
  1024. }
  1025. },
  1026. error: func(err error) {
  1027. dispose()
  1028. ob.error(err)
  1029. },
  1030. complete: func() {
  1031. completed++
  1032. if completed == len(all) {
  1033. dispose()
  1034. ob.complete()
  1035. }
  1036. },
  1037. })
  1038. }
  1039. })
  1040. }
  1041. func (o Observable) CombineLatest(another Observable) Observable {
  1042. return CombineLatest(o, another).Map(ListToPair)
  1043. }
  1044. type valueVector struct {
  1045. values [] Object
  1046. available [] bool
  1047. }
  1048. func createValueVector(size int) *valueVector {
  1049. return &valueVector {
  1050. values: make([] Object, size),
  1051. available: make([] bool, size),
  1052. }
  1053. }
  1054. func (vv *valueVector) SetItem(index int, value Object) {
  1055. vv.values[index] = value
  1056. if vv.available != nil {
  1057. vv.available[index] = true
  1058. }
  1059. }
  1060. func (vv *valueVector) GetList() (Object, bool) {
  1061. var ok = true
  1062. if vv.available != nil {
  1063. for _, item_ok := range vv.available {
  1064. if !(item_ok) {
  1065. ok = false
  1066. }}
  1067. if ok { vv.available = nil }
  1068. }
  1069. if ok {
  1070. return ObjList(vv.values), true
  1071. } else {
  1072. return nil, false
  1073. }
  1074. }
  1075. func (o Observable) Await(k func(Object)(Observable)) Observable {
  1076. return Observable(func(pub DataPublisher) {
  1077. var ctx, ob = pub.useInheritedContext()
  1078. var current Object
  1079. var ok = false
  1080. pub.run(o, ctx, &observer {
  1081. error: ob.error,
  1082. value: func(v Object) {
  1083. current = v
  1084. ok = true
  1085. },
  1086. complete: func() {
  1087. if ok {
  1088. var last = current
  1089. pub.run(k(last), ctx, ob)
  1090. } else {
  1091. ob.complete()
  1092. }
  1093. },
  1094. })
  1095. })
  1096. }
  1097. func (o Observable) AwaitNoexcept(h RuntimeHandle, k func(Object)(Observable)) Observable {
  1098. return Observable(func(pub DataPublisher) {
  1099. var ctx, ob = pub.useInheritedContext()
  1100. pub.run(o, ctx, awaitNoexceptObserver(ob, h, func(obj Object) {
  1101. pub.run(k(obj), ctx, ob)
  1102. }))
  1103. })
  1104. }
  1105. func awaitNoexceptObserver(ob *observer, h RuntimeHandle, k func(Object)) *observer {
  1106. var value, ok = Object(nil), false
  1107. return &observer {
  1108. value: func(v Object) {
  1109. value = v
  1110. ok = true
  1111. },
  1112. error: func(err error) {
  1113. Crash(h, InvariantViolation, ("unexpected error: " + err.Error()))
  1114. },
  1115. complete: func() {
  1116. if ok {
  1117. k(value)
  1118. } else {
  1119. ob.complete()
  1120. }
  1121. },
  1122. }
  1123. }
  1124. func (o Observable) Then(k Observable) Observable {
  1125. return Observable(func(pub DataPublisher) {
  1126. var ctx, ob = pub.useInheritedContext()
  1127. pub.run(o, ctx, &observer {
  1128. error: ob.error,
  1129. value: func(_ Object) {},
  1130. complete: func() { pub.run(k, ctx, ob) },
  1131. })
  1132. })
  1133. }
  1134. func (o Observable) With(bg Observable, log func(error)) Observable {
  1135. return Observable(func(pub DataPublisher) {
  1136. var ctx, ob = pub.useInheritedContext()
  1137. pub.run(bg, ctx, &observer {
  1138. error: log,
  1139. value: func(_ Object) {},
  1140. complete: func() {},
  1141. })
  1142. pub.run(o, ctx, ob)
  1143. })
  1144. }
  1145. func (o Observable) And(bg Observable, log func(error)) Observable {
  1146. return Observable(func(pub DataPublisher) {
  1147. var ctx, ob = pub.useInheritedContext()
  1148. pub.run(o, ctx, ob)
  1149. pub.run(bg, ctx, &observer {
  1150. error: log,
  1151. value: func(_ Object) {},
  1152. complete: func() {},
  1153. })
  1154. })
  1155. }
  1156. func Merge(forEach func(func(Observable))) Observable {
  1157. return ObservableSequence(forEach).MergeMap(GetObservable)
  1158. }
  1159. func (o Observable) MergeMap(f func(Object)(Observable)) Observable {
  1160. return o.MergeMap2(f, false)
  1161. }
  1162. func (o Observable) MergeMap2(f func(Object)(Observable), auto_stop bool) Observable {
  1163. return Observable(func(pub DataPublisher) {
  1164. var ctx, dispose, ob = pub.useNewChildContext()
  1165. var proxy = createMergeProxy(ob, dispose)
  1166. if auto_stop {
  1167. proxy.outerClosed = true
  1168. }
  1169. pub.run(o, ctx, &observer {
  1170. value: func(v Object) {
  1171. var inner = f(v)
  1172. proxy.innerStart()
  1173. pub.run(inner, ctx, &observer {
  1174. value: proxy.pass,
  1175. error: proxy.abort,
  1176. complete: proxy.innerExit,
  1177. })
  1178. },
  1179. error: proxy.abort,
  1180. complete: proxy.outerClose,
  1181. })
  1182. })
  1183. }
  1184. type mergeProxy struct {
  1185. observer *observer
  1186. ctxDispose func()
  1187. innerRunning uint
  1188. outerClosed bool
  1189. }
  1190. func createMergeProxy(ob *observer, dispose func()) *mergeProxy {
  1191. return &mergeProxy {
  1192. observer: ob,
  1193. ctxDispose: dispose,
  1194. innerRunning: 0,
  1195. outerClosed: false,
  1196. }
  1197. }
  1198. func (p *mergeProxy) pass(x Object) {
  1199. p.observer.value(x)
  1200. }
  1201. func (p *mergeProxy) abort(e error) {
  1202. p.observer.error(e)
  1203. p.ctxDispose()
  1204. }
  1205. func (p *mergeProxy) innerStart() {
  1206. p.innerRunning++
  1207. }
  1208. func (p *mergeProxy) innerExit() {
  1209. if p.innerRunning == 0 { panic("something went wrong") }
  1210. p.innerRunning--
  1211. if p.innerRunning == 0 && p.outerClosed {
  1212. p.observer.complete()
  1213. p.ctxDispose()
  1214. }
  1215. }
  1216. func (p *mergeProxy) outerClose() {
  1217. p.outerClosed = true
  1218. if p.innerRunning == 0 {
  1219. p.observer.complete()
  1220. p.ctxDispose()
  1221. }
  1222. }
  1223. func Concat(forEach func(func(Observable))) Observable {
  1224. return ObservableSequence(forEach).ConcatMap(GetObservable)
  1225. }
  1226. func Concurrent(limit int, forEach func(func(Observable))) Observable {
  1227. return ObservableSequence(forEach).ConcurrentMap(limit, GetObservable)
  1228. }
  1229. func (o Observable) ConcatMap(f func(Object)(Observable)) Observable {
  1230. return o.ConcurrentMap(1, f)
  1231. }
  1232. func (o Observable) ConcurrentMap(limit int, f func(Object)(Observable)) Observable {
  1233. return Observable(func(pub DataPublisher) {
  1234. var ctx, dispose, ob = pub.useNewChildContext()
  1235. var proxy = createMergeProxy(ob, dispose)
  1236. var queue = createRunQueue(limit, pub, ctx, &observer {
  1237. value: proxy.pass,
  1238. error: proxy.abort,
  1239. complete: proxy.innerExit,
  1240. })
  1241. pub.run(o, ctx, &observer {
  1242. value: func(v Object) {
  1243. var inner = f(v)
  1244. proxy.innerStart()
  1245. queue.push(inner)
  1246. },
  1247. error: proxy.abort,
  1248. complete: proxy.outerClose,
  1249. })
  1250. })
  1251. }
  1252. func ForkJoin(limit int, all ...Observable) Observable {
  1253. return Observable(func(pub DataPublisher) {
  1254. if len(all) == 0 {
  1255. pub.observer.complete()
  1256. return
  1257. }
  1258. var concurrent = Concurrent(limit, func(yield func(Observable)) {
  1259. for i_, o := range all {
  1260. var index = i_
  1261. yield(o.Map(func(obj Object) Object {
  1262. return ObjPair(obj, ObjInt(index))
  1263. }))
  1264. }
  1265. })
  1266. var ctx, ob = pub.useInheritedContext()
  1267. var vv = createValueVector(len(all))
  1268. pub.run(concurrent, ctx, &observer {
  1269. error: ob.error,
  1270. value: func(obj Object) {
  1271. var v, i_ = GetPair(obj)
  1272. var index = GetInt(i_)
  1273. vv.SetItem(index, v)
  1274. },
  1275. complete: func() {
  1276. if l, ok := vv.GetList(); ok {
  1277. ob.value(l)
  1278. }
  1279. ob.complete()
  1280. },
  1281. })
  1282. })
  1283. }
  1284. func (o Observable) ForkJoin(limit int, another Observable) Observable {
  1285. return ForkJoin(limit, o, another).Map(ListToPair)
  1286. }
  1287. type runQueue struct {
  1288. waiting [] Observable
  1289. running int
  1290. limit int
  1291. publisher DataPublisher
  1292. context *context
  1293. observer *observer
  1294. }
  1295. func createRunQueue(limit int, pub DataPublisher, ctx *context, ob *observer) *runQueue {
  1296. if limit < 1 {
  1297. limit = 1
  1298. }
  1299. var q runQueue
  1300. var proxy = &observer {
  1301. value: ob.value,
  1302. error: ob.error,
  1303. complete: func() {
  1304. ob.complete()
  1305. q.running--
  1306. if len(q.waiting) > 0 {
  1307. var next = q.waiting[0]
  1308. q.waiting[0] = nil
  1309. q.waiting = q.waiting[1:]
  1310. q.publisher.run(next, q.context, q.observer)
  1311. }
  1312. },
  1313. }
  1314. q = runQueue {
  1315. waiting: make([] Observable, 0),
  1316. running: 0,
  1317. limit: limit,
  1318. publisher: pub,
  1319. context: ctx,
  1320. observer: proxy,
  1321. }
  1322. return &q
  1323. }
  1324. func (q *runQueue) push(o Observable) {
  1325. if q.running < q.limit {
  1326. q.running++
  1327. q.publisher.run(o, q.context, q.observer)
  1328. } else {
  1329. q.waiting = append(q.waiting, o)
  1330. }
  1331. }
  1332. func (o Observable) SwitchMap(f func(Object)(Observable)) Observable {
  1333. return Observable(func(pub DataPublisher) {
  1334. var disposeInner func()
  1335. var wrap = func(o Observable) Observable {
  1336. return Observable(func(pub DataPublisher) {
  1337. if disposeInner != nil {
  1338. disposeInner()
  1339. }
  1340. var ctx, dispose, ob = pub.useNewChildContext()
  1341. disposeInner = func() {
  1342. dispose()
  1343. ob.complete()
  1344. }
  1345. pub.run(o, ctx, ob)
  1346. })
  1347. }
  1348. var ctx, dispose, ob = pub.useNewChildContext()
  1349. var proxy = createMergeProxy(ob, dispose)
  1350. pub.run(o, ctx, &observer {
  1351. value: func(v Object) {
  1352. var inner = f(v)
  1353. proxy.innerStart()
  1354. pub.run(wrap(inner), ctx, &observer {
  1355. value: proxy.pass,
  1356. error: proxy.abort,
  1357. complete: proxy.innerExit,
  1358. })
  1359. },
  1360. error: proxy.abort,
  1361. complete: proxy.outerClose,
  1362. })
  1363. })
  1364. }
  1365. func (o Observable) ExhaustMap(f func(Object)(Observable)) Observable {
  1366. return Observable(func(pub DataPublisher) {
  1367. var ctx, dispose, ob = pub.useNewChildContext()
  1368. var proxy = createMergeProxy(ob, dispose)
  1369. pub.run(o, ctx, &observer {
  1370. value: func(v Object) {
  1371. if proxy.innerRunning == 0 {
  1372. var inner = f(v)
  1373. proxy.innerStart()
  1374. pub.run(inner, ctx, &observer {
  1375. value: proxy.pass,
  1376. error: proxy.abort,
  1377. complete: proxy.innerExit,
  1378. })
  1379. }
  1380. },
  1381. error: proxy.abort,
  1382. complete: proxy.outerClose,
  1383. })
  1384. })
  1385. }