|
@@ -3,9 +3,37 @@ package effect
|
|
|
import (
|
|
|
"context"
|
|
|
. "kumachan/runtime/common"
|
|
|
+ "kumachan/runtime/lib/container"
|
|
|
)
|
|
|
|
|
|
+func Concat (seq container.Seq, concurrent uint) Effect {
|
|
|
+ if concurrent == 0 { panic("invalid concurrent amount") }
|
|
|
+ return Effect { Action: func(r EffectRunner, ob *Observer) {
|
|
|
+ var ctx, dispose = context.WithCancel(ob.Context)
|
|
|
+ var c = CollectorFrom(ob, ctx, dispose)
|
|
|
+ var q = QueueRunnerFrom(r, concurrent)
|
|
|
+ for v,rest,exists := seq.Next(); exists; v,rest,exists = rest.Next() {
|
|
|
+ var item = EffectFrom(v)
|
|
|
+ c.NewChild()
|
|
|
+ q.Run(item, &Observer {
|
|
|
+ Context: ctx,
|
|
|
+ Next: func(v Value) {
|
|
|
+ c.Pass(v)
|
|
|
+ },
|
|
|
+ Error: func(e Value) {
|
|
|
+ c.Throw(e)
|
|
|
+ },
|
|
|
+ Complete: func() {
|
|
|
+ c.DeleteChild()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ c.ParentComplete()
|
|
|
+ } }
|
|
|
+}
|
|
|
+
|
|
|
func (e Effect) ConcatMap(f func(Value)Value, concurrent uint) Effect {
|
|
|
+ if concurrent == 0 { panic("invalid concurrent amount") }
|
|
|
return Effect { Action: func(r EffectRunner, ob *Observer) {
|
|
|
var ctx, dispose = context.WithCancel(ob.Context)
|
|
|
var c = CollectorFrom(ob, ctx, dispose)
|