diff --git a/README.md b/README.md index 6d5dd521..2213a14d 100644 --- a/README.md +++ b/README.md @@ -438,6 +438,7 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo. * [Buffer](doc/buffer.md) — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time * [FlatMap](doc/flatmap.md) — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable * [GroupBy](doc/groupby.md) — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key +* [GroupByDynamic](doc/groupbydynamic.md) — divide an Observable into a dynamic set of Observables that each emit GroupedObservables from the original Observable, organized by key * [Map](doc/map.md) — transform the items emitted by an Observable by applying a function to each item * [Marshal](doc/marshal.md) — transform the items emitted by an Observable by applying a marshalling function to each item * [Scan](doc/scan.md) — apply a function to each item emitted by an Observable, sequentially, and emit each successive value diff --git a/doc/groupby.md b/doc/groupby.md index 13ca680c..8d126912 100644 --- a/doc/groupby.md +++ b/doc/groupby.md @@ -4,6 +4,8 @@ Divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key. +It requires to pass the length of the set. If we require a dynamic set, we need to use [GroupByDynamic](groupbydynamic.md) instead. + ![](http://reactivex.io/documentation/operators/images/groupBy.c.png) ## Example diff --git a/doc/groupbydynamic.md b/doc/groupbydynamic.md new file mode 100644 index 00000000..e6e3c56d --- /dev/null +++ b/doc/groupbydynamic.md @@ -0,0 +1,58 @@ +# GroupBy Operator + +## Overview + +Divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key. + +`GroupByDyDynamic` differs from [GroupBy](groupby.md) in the sense that it does not require to pass the set length. + +![](http://reactivex.io/documentation/operators/images/groupBy.c.png) + +## Example + +```go +count := 3 +observable := rxgo.Range(0, 10).GroupByDynamic(func(item rxgo.Item) int { + return item.V.(int) % count +}, rxgo.WithBufferedChannel(10)) + +for i := range observable.Observe() { + groupedObservable := i.V.(rxgo.GroupedObservable) + fmt.Printf("New observable: %d\n", groupedObservable.Key) + + for i := range groupedObservable.Observe() { + fmt.Printf("item: %v\n", i.V) + } +} +``` + +Output: + +``` +New observable: 0 +item: 0 +item: 3 +item: 6 +item: 9 +New observable: 1 +item: 1 +item: 4 +item: 7 +item: 10 +New observable: 2 +item: 2 +item: 5 +item: 8 +``` + +## Options + +* [WithBufferedChannel](options.md#withbufferedchannel) + +* [WithContext](options.md#withcontext) + +* [WithObservationStrategy](options.md#withobservationstrategy) + +* [WithErrorStrategy](options.md#witherrorstrategy) + +* [WithPublishStrategy](options.md#withpublishstrategy) \ No newline at end of file diff --git a/go.sum b/go.sum index 3b94ef9d..408a5fe6 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,10 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -31,6 +33,7 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/observable.go b/observable.go index 247fb65f..aba2f3f5 100644 --- a/observable.go +++ b/observable.go @@ -1,3 +1,4 @@ +// Package rxgo is the main RxGo package. package rxgo import ( @@ -44,6 +45,7 @@ type Observable interface { FlatMap(apply ItemToObservable, opts ...Option) Observable ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed GroupBy(length int, distribution func(Item) int, opts ...Option) Observable + GroupByDynamic(distribution func(Item) int, opts ...Option) Observable IgnoreElements(opts ...Option) Observable Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable Last(opts ...Option) OptionalSingle @@ -179,7 +181,7 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by runParallel(ctx, next, observe, operatorFactory, bypassGather, option, mergedOptions...) } }() - runFirstItem(ctx, f, firstItemIDCh, observe, next, operatorFactory, bypassGather, option, mergedOptions...) + runFirstItem(ctx, f, firstItemIDCh, observe, next, operatorFactory, option, mergedOptions...) return next }), } @@ -383,7 +385,7 @@ func runParallel(ctx context.Context, next chan Item, observe <-chan Item, opera }() } -func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan Item, observe <-chan Item, next chan Item, operatorFactory func() operator, bypassGather bool, option Option, opts ...Option) { +func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan Item, observe <-chan Item, next chan Item, operatorFactory func() operator, option Option, opts ...Option) { go func() { op := operatorFactory() stopped := false diff --git a/observable_operator.go b/observable_operator.go index 01235ea1..7f1cc340 100644 --- a/observable_operator.go +++ b/observable_operator.go @@ -1320,6 +1320,57 @@ func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts . } } +// GroupedObservable is the observable type emitted by the GroupByDynamic operator. +type GroupedObservable struct { + Observable + // Key is the distribution key + Key int +} + +// GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key. +func (o *ObservableImpl) GroupByDynamic(distribution func(Item) int, opts ...Option) Observable { + option := parseOptions(opts...) + next := option.buildChannel() + ctx := option.buildContext() + chs := make(map[int]chan Item) + + go func() { + observe := o.Observe(opts...) + loop: + for { + select { + case <-ctx.Done(): + break loop + case i, ok := <-observe: + if !ok { + break loop + } + idx := distribution(i) + ch, contains := chs[idx] + if !contains { + ch = option.buildChannel() + chs[idx] = ch + Of(GroupedObservable{ + Observable: &ObservableImpl{ + iterable: newChannelIterable(ch), + }, + Key: idx, + }).SendContext(ctx, next) + } + i.SendContext(ctx, ch) + } + } + for _, ch := range chs { + close(ch) + } + close(next) + }() + + return &ObservableImpl{ + iterable: newChannelIterable(next), + } +} + // Last returns a new Observable which emit only last item. // Cannot be run in parallel. func (o *ObservableImpl) Last(opts ...Option) OptionalSingle { @@ -1682,7 +1733,6 @@ func (op *repeatOperator) end(ctx context.Context, dst chan<- Item) { for { select { default: - break case <-ctx.Done(): return } @@ -2241,7 +2291,7 @@ func (o *ObservableImpl) StartWith(iterable Iterable, opts ...Option) Observable // SumFloat32 calculates the average of float32 emitted by an Observable and emits a float32. func (o *ObservableImpl) SumFloat32(opts ...Option) OptionalSingle { - return o.Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + return o.Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { if acc == nil { acc = float32(0) } @@ -2267,7 +2317,7 @@ func (o *ObservableImpl) SumFloat32(opts ...Option) OptionalSingle { // SumFloat64 calculates the average of float64 emitted by an Observable and emits a float64. func (o *ObservableImpl) SumFloat64(opts ...Option) OptionalSingle { - return o.Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + return o.Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { if acc == nil { acc = float64(0) } @@ -2295,7 +2345,7 @@ func (o *ObservableImpl) SumFloat64(opts ...Option) OptionalSingle { // SumInt64 calculates the average of integers emitted by an Observable and emits an int64. func (o *ObservableImpl) SumInt64(opts ...Option) OptionalSingle { - return o.Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + return o.Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { if acc == nil { acc = int64(0) } diff --git a/observable_operator_bench_test.go b/observable_operator_bench_test.go index d3f16e89..f459f9df 100644 --- a/observable_operator_bench_test.go +++ b/observable_operator_bench_test.go @@ -63,7 +63,7 @@ func Benchmark_Reduce_Sequential(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() obs := Range(0, benchNumberOfElementsSmall, WithBufferedChannel(benchChannelCap)). - Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { // Simulate a blocking IO call time.Sleep(5 * time.Millisecond) if a, ok := acc.(int); ok { @@ -84,7 +84,7 @@ func Benchmark_Reduce_Parallel(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() obs := Range(0, benchNumberOfElementsSmall, WithBufferedChannel(benchChannelCap)). - Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { // Simulate a blocking IO call time.Sleep(5 * time.Millisecond) if a, ok := acc.(int); ok { diff --git a/observable_operator_test.go b/observable_operator_test.go index 7bdc9314..c636a25d 100644 --- a/observable_operator_test.go +++ b/observable_operator_test.go @@ -933,11 +933,42 @@ func Test_Observable_GroupBy_Error(t *testing.T) { Assert(ctx, t, s[2].(Observable), HasAnError()) } +func Test_Observable_GroupByDynamic(t *testing.T) { + defer goleak.VerifyNone(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + count := 3 + max := 10 + + obs := Range(0, max).GroupByDynamic(func(item Item) int { + if item.V == 10 { + return 10 + } + return item.V.(int) % count + }, WithBufferedChannel(max)) + s, err := obs.ToSlice(0) + if err != nil { + assert.FailNow(t, err.Error()) + } + if len(s) != 4 { + assert.FailNow(t, "length", "got=%d, expected=%d", len(s), 4) + } + + Assert(ctx, t, s[0].(GroupedObservable), HasItems(0, 3, 6, 9), HasNoError()) + assert.Equal(t, 0, s[0].(GroupedObservable).Key) + Assert(ctx, t, s[1].(GroupedObservable), HasItems(1, 4, 7), HasNoError()) + assert.Equal(t, 1, s[1].(GroupedObservable).Key) + Assert(ctx, t, s[2].(GroupedObservable), HasItems(2, 5, 8), HasNoError()) + assert.Equal(t, 2, s[2].(GroupedObservable).Key) + Assert(ctx, t, s[3].(GroupedObservable), HasItems(10), HasNoError()) + assert.Equal(t, 10, s[3].(GroupedObservable).Key) +} + func joinTest(ctx context.Context, t *testing.T, left, right []interface{}, window Duration, expected []int64) { leftObs := testObservable(ctx, left...) rightObs := testObservable(ctx, right...) - obs := leftObs.Join(func(ctx context.Context, l interface{}, r interface{}) (interface{}, error) { + obs := leftObs.Join(func(ctx context.Context, l, r interface{}) (interface{}, error) { return map[string]interface{}{ "l": l, "r": r, @@ -1272,7 +1303,7 @@ func Test_Observable_Max(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(0, 10000).Max(func(e1 interface{}, e2 interface{}) int { + obs := Range(0, 10000).Max(func(e1, e2 interface{}) int { i1 := e1.(int) i2 := e2.(int) if i1 > i2 { @@ -1290,7 +1321,7 @@ func Test_Observable_Max_Parallel(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(0, 10000).Max(func(e1 interface{}, e2 interface{}) int { + obs := Range(0, 10000).Max(func(e1, e2 interface{}) int { var i1 int if e1 == nil { i1 = 0 @@ -1320,7 +1351,7 @@ func Test_Observable_Min(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(0, 10000).Min(func(e1 interface{}, e2 interface{}) int { + obs := Range(0, 10000).Min(func(e1, e2 interface{}) int { i1 := e1.(int) i2 := e2.(int) if i1 > i2 { @@ -1338,7 +1369,7 @@ func Test_Observable_Min_Parallel(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(0, 10000).Min(func(e1 interface{}, e2 interface{}) int { + obs := Range(0, 10000).Min(func(e1, e2 interface{}) int { i1 := e1.(int) i2 := e2.(int) if i1 > i2 { @@ -1396,7 +1427,7 @@ func Test_Observable_Reduce(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(1, 10000).Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + obs := Range(1, 10000).Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { if a, ok := acc.(int); ok { if b, ok := elem.(int); ok { return a + b, nil @@ -1413,7 +1444,7 @@ func Test_Observable_Reduce_Empty(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Empty().Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + obs := Empty().Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { return 0, nil }) Assert(ctx, t, obs, IsEmpty(), HasNoError()) @@ -1423,7 +1454,7 @@ func Test_Observable_Reduce_Error(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := testObservable(ctx, 1, 2, errFoo, 4, 5).Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + obs := testObservable(ctx, 1, 2, errFoo, 4, 5).Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { return 0, nil }) Assert(ctx, t, obs, IsEmpty(), HasError(errFoo)) @@ -1433,7 +1464,7 @@ func Test_Observable_Reduce_ReturnError(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := testObservable(ctx, 1, 2, 3).Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + obs := testObservable(ctx, 1, 2, 3).Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { if elem == 2 { return 0, errFoo } @@ -1446,7 +1477,7 @@ func Test_Observable_Reduce_Parallel(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(1, 10000).Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + obs := Range(1, 10000).Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { if a, ok := acc.(int); ok { if b, ok := elem.(int); ok { return a + b, nil @@ -1463,7 +1494,7 @@ func Test_Observable_Reduce_Parallel_Error(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(1, 10000).Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + obs := Range(1, 10000).Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { if elem == 1000 { return nil, errFoo } @@ -1483,7 +1514,7 @@ func Test_Observable_Reduce_Parallel_WithErrorStrategy(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := Range(1, 10000).Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { + obs := Range(1, 10000).Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) { if elem == 1 { return nil, errFoo } @@ -1636,7 +1667,7 @@ func Test_Observable_Scan(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := testObservable(ctx, 1, 2, 3, 4, 5).Scan(func(_ context.Context, x interface{}, y interface{}) (interface{}, error) { + obs := testObservable(ctx, 1, 2, 3, 4, 5).Scan(func(_ context.Context, x, y interface{}) (interface{}, error) { if x == nil { return y, nil } @@ -1649,7 +1680,7 @@ func Test_Observable_Scan_Parallel(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obs := testObservable(ctx, 1, 2, 3, 4, 5).Scan(func(_ context.Context, x interface{}, y interface{}) (interface{}, error) { + obs := testObservable(ctx, 1, 2, 3, 4, 5).Scan(func(_ context.Context, x, y interface{}) (interface{}, error) { if x == nil { return y, nil } @@ -2278,7 +2309,7 @@ func Test_Observable_ZipFromObservable(t *testing.T) { defer cancel() obs1 := testObservable(ctx, 1, 2, 3) obs2 := testObservable(ctx, 10, 20, 30) - zipper := func(_ context.Context, elem1 interface{}, elem2 interface{}) (interface{}, error) { + zipper := func(_ context.Context, elem1, elem2 interface{}) (interface{}, error) { switch v1 := elem1.(type) { case int: switch v2 := elem2.(type) { @@ -2298,7 +2329,7 @@ func Test_Observable_ZipFromObservable_DifferentLength1(t *testing.T) { defer cancel() obs1 := testObservable(ctx, 1, 2, 3) obs2 := testObservable(ctx, 10, 20) - zipper := func(_ context.Context, elem1 interface{}, elem2 interface{}) (interface{}, error) { + zipper := func(_ context.Context, elem1, elem2 interface{}) (interface{}, error) { switch v1 := elem1.(type) { case int: switch v2 := elem2.(type) { @@ -2318,7 +2349,7 @@ func Test_Observable_ZipFromObservable_DifferentLength2(t *testing.T) { defer cancel() obs1 := testObservable(ctx, 1, 2) obs2 := testObservable(ctx, 10, 20, 30) - zipper := func(_ context.Context, elem1 interface{}, elem2 interface{}) (interface{}, error) { + zipper := func(_ context.Context, elem1, elem2 interface{}) (interface{}, error) { switch v1 := elem1.(type) { case int: switch v2 := elem2.(type) { diff --git a/optionalsingle_test.go b/optionalsingle_test.go index b5309471..99ac834a 100644 --- a/optionalsingle_test.go +++ b/optionalsingle_test.go @@ -43,7 +43,7 @@ func Test_OptionalSingle_Get_ContextCanceled(t *testing.T) { func Test_OptionalSingle_Map(t *testing.T) { defer goleak.VerifyNone(t) - single := Just(1)().Max(func(_ interface{}, _ interface{}) int { + single := Just(1)().Max(func(_, _ interface{}) int { return 1 }).Map(func(_ context.Context, i interface{}) (interface{}, error) { return i.(int) + 1, nil