Skip to content

Commit

Permalink
GroupByDynamic operator
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah authored Oct 2, 2020
1 parent b8221f3 commit d6b151a
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 26 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.
* [Buffer](doc/buffer.md) — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
* [FlatMap](doc/flatmap.md) — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
* [GroupBy](doc/groupby.md) — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
* [GroupByDynamic](doc/groupbydynamic.md) — divide an Observable into a dynamic set of Observables that each emit GroupedObservables from the original Observable, organized by key
* [Map](doc/map.md) — transform the items emitted by an Observable by applying a function to each item
* [Marshal](doc/marshal.md) — transform the items emitted by an Observable by applying a marshalling function to each item
* [Scan](doc/scan.md) — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
Expand Down
2 changes: 2 additions & 0 deletions doc/groupby.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

Divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.

It requires to pass the length of the set. If we require a dynamic set, we need to use [GroupByDynamic](groupbydynamic.md) instead.

![](http://reactivex.io/documentation/operators/images/groupBy.c.png)

## Example
Expand Down
58 changes: 58 additions & 0 deletions doc/groupbydynamic.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# GroupBy Operator

## Overview

Divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.

`GroupByDyDynamic` differs from [GroupBy](groupby.md) in the sense that it does not require to pass the set length.

![](http://reactivex.io/documentation/operators/images/groupBy.c.png)

## Example

```go
count := 3
observable := rxgo.Range(0, 10).GroupByDynamic(func(item rxgo.Item) int {
return item.V.(int) % count
}, rxgo.WithBufferedChannel(10))

for i := range observable.Observe() {
groupedObservable := i.V.(rxgo.GroupedObservable)
fmt.Printf("New observable: %d\n", groupedObservable.Key)

for i := range groupedObservable.Observe() {
fmt.Printf("item: %v\n", i.V)
}
}
```

Output:

```
New observable: 0
item: 0
item: 3
item: 6
item: 9
New observable: 1
item: 1
item: 4
item: 7
item: 10
New observable: 2
item: 2
item: 5
item: 8
```

## Options

* [WithBufferedChannel](options.md#withbufferedchannel)

* [WithContext](options.md#withcontext)

* [WithObservationStrategy](options.md#withobservationstrategy)

* [WithErrorStrategy](options.md#witherrorstrategy)

* [WithPublishStrategy](options.md#withpublishstrategy)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -31,6 +33,7 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
6 changes: 4 additions & 2 deletions observable.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package rxgo is the main RxGo package.
package rxgo

import (
Expand Down Expand Up @@ -44,6 +45,7 @@ type Observable interface {
FlatMap(apply ItemToObservable, opts ...Option) Observable
ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed
GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
GroupByDynamic(distribution func(Item) int, opts ...Option) Observable
IgnoreElements(opts ...Option) Observable
Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable
Last(opts ...Option) OptionalSingle
Expand Down Expand Up @@ -179,7 +181,7 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by
runParallel(ctx, next, observe, operatorFactory, bypassGather, option, mergedOptions...)
}
}()
runFirstItem(ctx, f, firstItemIDCh, observe, next, operatorFactory, bypassGather, option, mergedOptions...)
runFirstItem(ctx, f, firstItemIDCh, observe, next, operatorFactory, option, mergedOptions...)
return next
}),
}
Expand Down Expand Up @@ -383,7 +385,7 @@ func runParallel(ctx context.Context, next chan Item, observe <-chan Item, opera
}()
}

func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan Item, observe <-chan Item, next chan Item, operatorFactory func() operator, bypassGather bool, option Option, opts ...Option) {
func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan Item, observe <-chan Item, next chan Item, operatorFactory func() operator, option Option, opts ...Option) {
go func() {
op := operatorFactory()
stopped := false
Expand Down
58 changes: 54 additions & 4 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,57 @@ func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts .
}
}

// GroupedObservable is the observable type emitted by the GroupByDynamic operator.
type GroupedObservable struct {
Observable
// Key is the distribution key
Key int
}

// GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.
func (o *ObservableImpl) GroupByDynamic(distribution func(Item) int, opts ...Option) Observable {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext()
chs := make(map[int]chan Item)

go func() {
observe := o.Observe(opts...)
loop:
for {
select {
case <-ctx.Done():
break loop
case i, ok := <-observe:
if !ok {
break loop
}
idx := distribution(i)
ch, contains := chs[idx]
if !contains {
ch = option.buildChannel()
chs[idx] = ch
Of(GroupedObservable{
Observable: &ObservableImpl{
iterable: newChannelIterable(ch),
},
Key: idx,
}).SendContext(ctx, next)
}
i.SendContext(ctx, ch)
}
}
for _, ch := range chs {
close(ch)
}
close(next)
}()

return &ObservableImpl{
iterable: newChannelIterable(next),
}
}

// Last returns a new Observable which emit only last item.
// Cannot be run in parallel.
func (o *ObservableImpl) Last(opts ...Option) OptionalSingle {
Expand Down Expand Up @@ -1682,7 +1733,6 @@ func (op *repeatOperator) end(ctx context.Context, dst chan<- Item) {
for {
select {
default:
break
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -2241,7 +2291,7 @@ func (o *ObservableImpl) StartWith(iterable Iterable, opts ...Option) Observable

// SumFloat32 calculates the average of float32 emitted by an Observable and emits a float32.
func (o *ObservableImpl) SumFloat32(opts ...Option) OptionalSingle {
return o.Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
return o.Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) {
if acc == nil {
acc = float32(0)
}
Expand All @@ -2267,7 +2317,7 @@ func (o *ObservableImpl) SumFloat32(opts ...Option) OptionalSingle {

// SumFloat64 calculates the average of float64 emitted by an Observable and emits a float64.
func (o *ObservableImpl) SumFloat64(opts ...Option) OptionalSingle {
return o.Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
return o.Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) {
if acc == nil {
acc = float64(0)
}
Expand Down Expand Up @@ -2295,7 +2345,7 @@ func (o *ObservableImpl) SumFloat64(opts ...Option) OptionalSingle {

// SumInt64 calculates the average of integers emitted by an Observable and emits an int64.
func (o *ObservableImpl) SumInt64(opts ...Option) OptionalSingle {
return o.Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
return o.Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) {
if acc == nil {
acc = int64(0)
}
Expand Down
4 changes: 2 additions & 2 deletions observable_operator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Benchmark_Reduce_Sequential(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
obs := Range(0, benchNumberOfElementsSmall, WithBufferedChannel(benchChannelCap)).
Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) {
// Simulate a blocking IO call
time.Sleep(5 * time.Millisecond)
if a, ok := acc.(int); ok {
Expand All @@ -84,7 +84,7 @@ func Benchmark_Reduce_Parallel(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
obs := Range(0, benchNumberOfElementsSmall, WithBufferedChannel(benchChannelCap)).
Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
Reduce(func(_ context.Context, acc, elem interface{}) (interface{}, error) {
// Simulate a blocking IO call
time.Sleep(5 * time.Millisecond)
if a, ok := acc.(int); ok {
Expand Down
Loading

0 comments on commit d6b151a

Please sign in to comment.