Skip to content

Commit

Permalink
Merge pull request #245 from iwaltgen/merge-operator-each-observable-…
Browse files Browse the repository at this point in the history
…goroutine

Fix Merge Operator always emit values sequentially
  • Loading branch information
teivah authored Apr 28, 2020
2 parents 8e8d5ba + be630be commit 37fcd00
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
8 changes: 3 additions & 5 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,9 @@ func Merge(observables []Observable, opts ...Option) Observable {
}
}

go func() {
for _, o := range observables {
f(o)
}
}()
for _, o := range observables {
go f(o)
}

go func() {
wg.Wait()
Expand Down
21 changes: 21 additions & 0 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,27 @@ func Test_Merge_Error(t *testing.T) {
Assert(context.Background(), t, obs, IsNotEmpty(), HasError(errFoo))
}

func Test_Merge_Interval(t *testing.T) {
var obs []Observable
ctx, cancel := context.WithCancel(context.Background())
obs = append(obs, Interval(WithDuration(3*time.Millisecond), WithContext(ctx)).
Take(3).
Map(func(_ context.Context, v interface{}) (interface{}, error) {
return 10 + v.(int), nil
}))
obs = append(obs, Interval(WithDuration(5*time.Millisecond), WithContext(ctx)).
Take(3).
Map(func(_ context.Context, v interface{}) (interface{}, error) {
return 20 + v.(int), nil
}))

go func() {
time.Sleep(50 * time.Millisecond)
cancel()
}()
Assert(ctx, t, Merge(obs), HasNoError(), HasItemsNoOrder(10, 11, 12, 20, 21, 22))
}

func Test_Range(t *testing.T) {
obs := Range(5, 3)
Assert(context.Background(), t, obs, HasItems(5, 6, 7, 8))
Expand Down

0 comments on commit 37fcd00

Please sign in to comment.