-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlatest.go
47 lines (46 loc) · 1.02 KB
/
latest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package rxgo
// WithLatestFrom returns an operator that pairs each value emitted by the
// source observable with the most recent value emitted by other.
//
// A pair []Value{sourceValue, latestOther} is sent downstream only when the
// source emits, and only after other has produced at least one non-nil value.
// Emissions of other merely refresh the remembered value; they never trigger
// an emission on their own. The resulting observable completes when the source
// completes and fails as soon as either observable reports an error.
// Completion of other does not end the stream: its last value keeps being
// combined with subsequent source values.
//
// nil is treated as "no value": a nil source value is skipped, and a nil value
// from other clears the remembered value.
func WithLatestFrom(other Observable) OperatorFunc {
	return func(obs Observable) Observable {
		return Create(func(v ValueChan, e ErrChan, c CompleteChan) TeardownFunc {
			baseObs := NewChanObserver()
			baseSub := obs.Subscribe(baseObs)
			otherObs := NewChanObserver()
			otherSub := other.Subscribe(otherObs)

			// NOTE(review): this goroutine only returns on a completion or
			// error signal. Whether unsubscribing produces one of those is not
			// visible here — confirm the goroutine cannot outlive teardown.
			go func() {
				var latest Value

				// Local copies so the cases can be disabled by setting the
				// channel to nil (a nil channel is never ready in a select).
				otherValues := otherObs.ValueChan
				otherComplete := otherObs.CompleteChan

				for {
					select {
					case val := <-baseObs.ValueChan:
						// Only the source drives emissions.
						if val != nil && latest != nil {
							v.Next([]Value{val, latest})
						}
					case otherVal, ok := <-otherValues:
						if !ok {
							// Channel closed: keep the last value and stop
							// polling so the loop does not spin on zero values.
							otherValues = nil
							continue
						}
						latest = otherVal
					case <-otherComplete:
						// other is finished; keep combining its last value and
						// stop listening for further completion signals.
						otherComplete = nil
					case <-baseObs.CompleteChan:
						c.Complete()
						return
					case err := <-baseObs.ErrChan:
						e.Error(err)
						return
					case err := <-otherObs.ErrChan:
						e.Error(err)
						return
					}
				}
			}()

			return call(
				baseSub.Unsubscribe,
				otherSub.Unsubscribe,
			)
		})
	}
}