-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmap.go
146 lines (136 loc) · 3.7 KB
/
map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package rxgo
import (
"sync"
)
// Map returns an operator that transforms every value emitted by the source
// observable with fn and emits the transformed value on the resulting
// observable. Errors and completion of the source are forwarded as-is.
func Map(fn func(v Value) Value) OperatorFunc {
	return func(source Observable) Observable {
		return Create(func(values ValueChan, errs ErrChan, done CompleteChan) TeardownFunc {
			observer := OnNext(func(in Value) {
				values <- fn(in)
			}).OnErr(func(err error) {
				errs <- err
			}).OnComplete(func() {
				done <- true
			})
			// Tearing down the resulting observable unsubscribes from the source.
			subscription := source.Subscribe(observer)
			return subscription.Unsubscribe
		})
	}
}
// MapTo returns an operator that ignores the values emitted by the source
// observable and emits the given constant value once for each of them.
// Errors and completion of the source are forwarded as-is.
func MapTo(value Value) OperatorFunc {
	return func(source Observable) Observable {
		return Create(func(values ValueChan, errs ErrChan, done CompleteChan) TeardownFunc {
			observer := OnNext(func(Value) {
				// The emitted value is discarded; only the constant is sent.
				values <- value
			}).OnErr(func(err error) {
				errs <- err
			}).OnComplete(func() {
				done <- true
			})
			// Tearing down the resulting observable unsubscribes from the source.
			subscription := source.Subscribe(observer)
			return subscription.Unsubscribe
		})
	}
}
// MergeMap maps each value emitted by the source observable to an inner
// observable using fn and subscribes to it immediately. Values from all inner
// observables are forwarded to the resulting observable as they arrive. The
// resulting observable completes once the source has completed and every
// inner observable has completed. Errors from the source or from any inner
// observable are forwarded to the error channel.
func MergeMap(fn func(v Value) Observable) OperatorFunc {
	return func(source Observable) Observable {
		return Create(func(values ValueChan, errs ErrChan, done CompleteChan) TeardownFunc {
			// teardown collects the unsubscribe funcs of the outer and all
			// inner subscriptions.
			teardown := NewSubscription(nil)
			// pending counts inner observables that have not completed yet.
			var pending sync.WaitGroup

			subscribeInner := func(val Value) {
				pending.Add(1)
				inner := fn(val).Subscribe(
					OnNext(values.Next).OnErr(errs.Error).OnComplete(pending.Done),
				)
				teardown.Add(inner.Unsubscribe)
			}

			finish := func() {
				// Hold back completion until every inner observable is done.
				pending.Wait()
				done.Complete()
			}

			outer := source.Subscribe(
				OnNext(subscribeInner).OnErr(errs.Error).OnComplete(finish),
			)
			teardown.Add(outer.Unsubscribe)
			return teardown.Unsubscribe
		})
	}
}
// FlatMap is an alias of MergeMap: it delegates to MergeMap with the same
// mapping func and returns the operator MergeMap produces.
func FlatMap(fn func(v Value) Observable) OperatorFunc {
	merged := MergeMap(fn)
	return merged
}
// ConcatMap maps each value emitted by the source observable to an inner
// observable using fn, subscribes to it, and calls Wait on that subscription
// inside the source's next handler before returning (presumably blocking
// until the inner subscription has finished, so inner observables run one
// after another). Values and errors of the inner observables are forwarded
// to the resulting observable, which completes after the source has
// completed and every inner observable has signalled completion.
func ConcatMap(fn func(v Value) Observable) OperatorFunc {
	return func(source Observable) Observable {
		return Create(func(values ValueChan, errs ErrChan, done CompleteChan) TeardownFunc {
			// pending counts inner observables that have not completed yet.
			var pending sync.WaitGroup

			runInner := func(val Value) {
				pending.Add(1)
				inner := fn(val).Subscribe(
					OnNext(values.Next).OnErr(errs.Error).OnComplete(pending.Done),
				)
				inner.Wait()
			}

			finish := func() {
				pending.Wait()
				done.Complete()
			}

			outer := source.Subscribe(
				OnNext(runInner).OnErr(errs.Error).OnComplete(finish),
			)
			return outer.Unsubscribe
		})
	}
}
// ConcatMapTo is ConcatMap with a constant mapping: every value emitted by
// the source observable is mapped to the same inner observable o.
func ConcatMapTo(o Observable) OperatorFunc {
	constant := func(Value) Observable { return o }
	return ConcatMap(constant)
}
// SwitchMap maps each value emitted by the source observable to an inner
// observable using fn and subscribes to it. Before subscribing to a new inner
// observable it emits on an internal subject that every earlier inner
// observable is piped through TakeUntil with, so that earlier inner
// observables stop being forwarded. Values and errors of the inner
// observables are forwarded to the resulting observable, which completes
// after the source has completed and every inner observable has signalled
// completion.
func SwitchMap(fn func(v Value) Observable) OperatorFunc {
	return func(source Observable) Observable {
		return Create(func(values ValueChan, errs ErrChan, done CompleteChan) TeardownFunc {
			// pending counts inner observables that have not completed yet.
			var pending sync.WaitGroup
			// cancelPrev is the TakeUntil notifier shared by all inner observables.
			cancelPrev := NewSubject()

			switchTo := func(val Value) {
				pending.Add(1)
				// Notify first so the previously active inner observable is
				// cut off before the new one is subscribed.
				cancelPrev.Next(nil)
				inner := fn(val).Pipe(TakeUntil(cancelPrev))
				inner.Subscribe(
					OnNext(values.Next).OnErr(errs.Error).OnComplete(pending.Done),
				)
			}

			outer := source.Subscribe(
				OnNext(switchTo).
					OnErr(errs.Error).
					OnComplete(call(pending.Wait, done.Complete)),
			)
			return outer.Unsubscribe
		})
	}
}
// SwitchMapTo is SwitchMap with a constant mapping: every value emitted by
// the source observable is mapped to the same inner observable o.
func SwitchMapTo(o Observable) OperatorFunc {
	constant := func(Value) Observable { return o }
	return SwitchMap(constant)
}
// ExhaustMap maps a value emitted by the source observable to an inner
// observable using fn and subscribes to it, but only while no other inner
// observable is active; source values that arrive while an inner observable
// is still running are dropped. Values and errors of the inner observables
// are forwarded to the resulting observable, which completes after the
// source has completed and every started inner observable has completed.
func ExhaustMap(fn func(v Value) Observable) OperatorFunc {
	return func(o Observable) Observable {
		return Create(func(v ValueChan, e ErrChan, c CompleteChan) TeardownFunc {
			// busy is a one-slot semaphore marking an active inner observable.
			// It is allocated here, alongside wg, so that this state is not
			// shared between separate invocations of this callback
			// (presumably one per subscription — confirm against Create).
			// Sharing it would let one subscriber's inner observable cause
			// another subscriber's values to be dropped.
			busy := make(chan struct{}, 1)
			// wg counts inner observables that have not completed yet.
			var wg sync.WaitGroup
			return o.Subscribe(
				OnNext(func(val Value) {
					select {
					case busy <- struct{}{}:
						wg.Add(1)
						fn(val).Subscribe(OnNext(v.Next).OnErr(e.Error).OnComplete(func() {
							// Free the slot before signalling completion so a
							// following source value can start a new inner.
							<-busy
							wg.Done()
						}))
					default:
						// An inner observable is still active: drop this value.
					}
				}).OnErr(e.Error).OnComplete(func() {
					wg.Wait()
					c.Complete()
				}),
			).Unsubscribe
		})
	}
}