diff --git a/factory.go b/factory.go index 1c10b980..e9104c96 100644 --- a/factory.go +++ b/factory.go @@ -269,11 +269,9 @@ func Merge(observables []Observable, opts ...Option) Observable { } } - go func() { - for _, o := range observables { - f(o) - } - }() + for _, o := range observables { + go f(o) + } go func() { wg.Wait() diff --git a/factory_test.go b/factory_test.go index 9f1a756d..61941a3e 100644 --- a/factory_test.go +++ b/factory_test.go @@ -379,6 +379,27 @@ func Test_Merge_Error(t *testing.T) { Assert(context.Background(), t, obs, IsNotEmpty(), HasError(errFoo)) } +func Test_Merge_Interval(t *testing.T) { + var obs []Observable + ctx, cancel := context.WithCancel(context.Background()) + obs = append(obs, Interval(WithDuration(3*time.Millisecond), WithContext(ctx)). + Take(3). + Map(func(_ context.Context, v interface{}) (interface{}, error) { + return 10 + v.(int), nil + })) + obs = append(obs, Interval(WithDuration(5*time.Millisecond), WithContext(ctx)). + Take(3). + Map(func(_ context.Context, v interface{}) (interface{}, error) { + return 20 + v.(int), nil + })) + + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + Assert(ctx, t, Merge(obs), HasNoError(), HasItemsNoOrder(10, 11, 12, 20, 21, 22)) +} + func Test_Range(t *testing.T) { obs := Range(5, 3) Assert(context.Background(), t, obs, HasItems(5, 6, 7, 8))