-
Notifications
You must be signed in to change notification settings - Fork 1
/
reduce.go
50 lines (48 loc) · 1.18 KB
/
reduce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package rxgo
// Reduce returns an operator that folds every value emitted by the source
// observable into a single result using accumulator, and emits that result
// once when the source completes.
//
// A non-nil initialValue seeds the accumulation. A nil initialValue is treated
// as "no seed": the first source value then becomes the initial accumulator
// and accumulator is first invoked on the second value.
//
// If the source completes without emitting and no seed was supplied, the
// resulting observable completes without emitting anything.
//
// NOTE(review): this operator registers no error handler on the source
// subscription and does not use e; confirm how source errors are meant to
// reach downstream subscribers.
func Reduce(accumulator func(acc, value Value) Value, initialValue Value) OperatorFunc {
	return func(o Observable) Observable {
		return Create(func(v ValueChan, e ErrChan, c CompleteChan) TeardownFunc {
			// A 1-buffered channel is used as a slot holding at most one
			// value: the current accumulator. An empty slot means nothing has
			// been accumulated yet.
			accValue := make(chan Value, 1)
			if initialValue != nil {
				accValue <- initialValue
			}

			return o.Subscribe(
				OnNext(func(val Value) {
					select {
					case acc := <-accValue:
						accValue <- accumulator(acc, val)
					default:
						// Slot is empty: no seed and this is the first value,
						// so it becomes the accumulator unchanged.
						accValue <- val
					}
				}).OnComplete(func() {
					// Non-blocking receive: with no seed and an empty source
					// the slot is empty, and a plain receive would block
					// forever and prevent completion from being signalled.
					select {
					case result := <-accValue:
						v.Next(result)
					default:
					}
					c.Complete()
				}),
			).Unsubscribe
		})
	}
}
// Scan returns an operator that applies accumulator to each value emitted by
// the source observable together with the running accumulator, and emits
// every intermediate result as it is produced.
//
// A non-nil initialValue seeds the accumulation; the seed itself is not
// emitted. A nil initialValue is treated as "no seed": the first source value
// becomes the initial accumulator and is emitted as-is, so a source of N
// values always produces N emissions.
//
// The resulting observable completes when the source completes.
//
// NOTE(review): this operator registers no error handler on the source
// subscription and does not use e; confirm how source errors are meant to
// reach downstream subscribers.
func Scan(accumulator func(acc, value Value) Value, initialValue Value) OperatorFunc {
	return func(o Observable) Observable {
		return Create(func(v ValueChan, e ErrChan, c CompleteChan) TeardownFunc {
			// A 1-buffered channel is used as a slot holding at most one
			// value: the current accumulator. An empty slot means nothing has
			// been accumulated yet.
			accValue := make(chan Value, 1)
			if initialValue != nil {
				accValue <- initialValue
			}

			return o.Subscribe(
				OnNext(func(val Value) {
					// With an empty slot (no seed, first value) the value
					// itself is the running result; otherwise fold it into
					// the stored accumulator.
					nextValue := val
					select {
					case acc := <-accValue:
						nextValue = accumulator(acc, val)
					default:
					}
					// Emit before storing, then keep the result as the
					// accumulator for the next value.
					v.Next(nextValue)
					accValue <- nextValue
				}).OnComplete(func() {
					c.Complete()
				}),
			).Unsubscribe
		})
	}
}