2 Commits 43350c059f ... df116fce02

Author SHA1 Message Date
  mizusato df116fce02 switch-map, concat, merge 4 years ago
  mizusato 22954aa64c fix 4 years ago
4 changed files with 96 additions and 2 deletions
  1. 2 2
      runtime/common/value.go
  2. 28 0
      runtime/lib/effect/concat.go
  3. 25 0
      runtime/lib/effect/merge.go
  4. 41 0
      runtime/lib/effect/switch.go

+ 2 - 2
runtime/common/value.go

@@ -15,8 +15,8 @@ type PlainValue struct {
 
 func (impl SumValue) RuntimeValue() {}
 type SumValue struct {
-	Index Short
-	Value Value
+	Index  Short
+	Value  Value
 }
 
 func (impl ProductValue) RuntimeValue() {}

+ 28 - 0
runtime/lib/effect/concat.go

@@ -3,9 +3,37 @@ package effect
 import (
 	"context"
 	. "kumachan/runtime/common"
+	"kumachan/runtime/lib/container"
 )
 
+func Concat (seq container.Seq, concurrent uint) Effect {
+	if concurrent == 0 { panic("invalid concurrent amount") }
+	return Effect { Action: func(r EffectRunner, ob *Observer) {
+		var ctx, dispose = context.WithCancel(ob.Context)
+		var c = CollectorFrom(ob, ctx, dispose)
+		var q = QueueRunnerFrom(r, concurrent)
+		for v,rest,exists := seq.Next(); exists; v,rest,exists = rest.Next() {
+			var item = EffectFrom(v)
+			c.NewChild()
+			q.Run(item, &Observer {
+				Context:  ctx,
+				Next: func(v Value) {
+					c.Pass(v)
+				},
+				Error: func(e Value) {
+					c.Throw(e)
+				},
+				Complete: func() {
+					c.DeleteChild()
+				},
+			})
+		}
+		c.ParentComplete()
+	} }
+}
+
 func (e Effect) ConcatMap(f func(Value)Value, concurrent uint) Effect {
+	if concurrent == 0 { panic("invalid concurrent amount") }
 	return Effect { Action: func(r EffectRunner, ob *Observer) {
 		var ctx, dispose = context.WithCancel(ob.Context)
 		var c = CollectorFrom(ob, ctx, dispose)

+ 25 - 0
runtime/lib/effect/merge.go

@@ -3,8 +3,33 @@ package effect
 import (
 	"context"
 	. "kumachan/runtime/common"
+	"kumachan/runtime/lib/container"
 )
 
+func Merge (seq container.Seq) Effect {
+	return Effect { Action: func(r EffectRunner, ob *Observer) {
+		var ctx, dispose = context.WithCancel(ob.Context)
+		var c = CollectorFrom(ob, ctx, dispose)
+		for v,rest,exists := seq.Next(); exists; v,rest,exists = rest.Next() {
+			var item = EffectFrom(v)
+			c.NewChild()
+			r.Run(item, &Observer {
+				Context: ctx,
+				Next: func(v Value) {
+					c.Pass(v)
+				},
+				Error: func(e Value) {
+					c.Throw(e)
+				},
+				Complete: func() {
+					c.DeleteChild()
+				},
+			})
+		}
+		c.ParentComplete()
+	} }
+}
+
 func (e Effect) MergeMap(f func(Value)Value) Effect {
 	return Effect { Action: func(r EffectRunner, ob *Observer) {
 		var ctx, dispose = context.WithCancel(ob.Context)

+ 41 - 0
runtime/lib/effect/switch.go

@@ -0,0 +1,41 @@
+package effect
+
+import (
+	"context"
+	. "kumachan/runtime/common"
+)
+
+func (e Effect) SwitchMap(f func(Value)Value) Effect {
+	return Effect { Action: func(r EffectRunner, ob *Observer) {
+		var ctx, dispose = context.WithCancel(ob.Context)
+		var c = CollectorFrom(ob, ctx, dispose)
+		var cur_ctx, cur_dispose = context.WithCancel(ctx)
+		r.Run(e, &Observer {
+			Context:  ctx,
+			Next: func(v Value) {
+				var item = EffectFrom(f(v))
+				c.NewChild()
+				cur_dispose()
+				cur_ctx, cur_dispose = context.WithCancel(ctx)
+				r.Run(item, &Observer {
+					Context:  cur_ctx,
+					Next: func(v Value) {
+						c.Pass(v)
+					},
+					Error: func(e Value) {
+						c.Throw(e)
+					},
+					Complete: func() {
+						c.DeleteChild()
+					},
+				})
+			},
+			Error: func(e Value) {
+				c.Throw(e)
+			},
+			Complete: func() {
+				c.ParentComplete()
+			},
+		})
+	} }
+}