Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GroupBy dynamic distribution function type change #267

Merged
merged 1 commit into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions doc/groupbydynamic.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@

Divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.

`GroupByDyDynamic` differs from [GroupBy](groupby.md) in the sense that it does not require to pass the set length.
`GroupByDyDynamic` differs from [GroupBy](groupby.md) for two reasons:
* We don't need to pass a fixed set length.
* The distribution function is a `func(rxgo.Item) string` instead of a `func(rxgo.Item) int`. The rationale is because of possible collisions. For example, if our distribution function produces 128-bit UUIDs, there is a collision risk if such a UUID has to be casted into an int.

![](http://reactivex.io/documentation/operators/images/groupBy.c.png)

## Example

```go
count := 3
observable := rxgo.Range(0, 10).GroupByDynamic(func(item rxgo.Item) int {
return item.V.(int) % count
observable := rxgo.Range(0, 10).GroupByDynamic(func(item rxgo.Item) string {
return strconv.Itoa(item.V.(int) % count)
}, rxgo.WithBufferedChannel(10))

for i := range observable.Observe() {
Expand Down
2 changes: 1 addition & 1 deletion observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Observable interface {
FlatMap(apply ItemToObservable, opts ...Option) Observable
ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed
GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
GroupByDynamic(distribution func(Item) int, opts ...Option) Observable
GroupByDynamic(distribution func(Item) string, opts ...Option) Observable
IgnoreElements(opts ...Option) Observable
Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable
Last(opts ...Option) OptionalSingle
Expand Down
6 changes: 3 additions & 3 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,15 +1353,15 @@ func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts .
type GroupedObservable struct {
Observable
// Key is the distribution key
Key int
Key string
}

// GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.
func (o *ObservableImpl) GroupByDynamic(distribution func(Item) int, opts ...Option) Observable {
func (o *ObservableImpl) GroupByDynamic(distribution func(Item) string, opts ...Option) Observable {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext()
chs := make(map[int]chan Item)
chs := make(map[string]chan Item)

go func() {
observe := o.Observe(opts...)
Expand Down
15 changes: 8 additions & 7 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -960,11 +961,11 @@ func Test_Observable_GroupByDynamic(t *testing.T) {
count := 3
max := 10

obs := Range(0, max).GroupByDynamic(func(item Item) int {
obs := Range(0, max).GroupByDynamic(func(item Item) string {
if item.V == 10 {
return 10
return "10"
}
return item.V.(int) % count
return strconv.Itoa(item.V.(int) % count)
}, WithBufferedChannel(max))
s, err := obs.ToSlice(0)
if err != nil {
Expand All @@ -975,13 +976,13 @@ func Test_Observable_GroupByDynamic(t *testing.T) {
}

Assert(ctx, t, s[0].(GroupedObservable), HasItems(0, 3, 6, 9), HasNoError())
assert.Equal(t, 0, s[0].(GroupedObservable).Key)
assert.Equal(t, "0", s[0].(GroupedObservable).Key)
Assert(ctx, t, s[1].(GroupedObservable), HasItems(1, 4, 7), HasNoError())
assert.Equal(t, 1, s[1].(GroupedObservable).Key)
assert.Equal(t, "1", s[1].(GroupedObservable).Key)
Assert(ctx, t, s[2].(GroupedObservable), HasItems(2, 5, 8), HasNoError())
assert.Equal(t, 2, s[2].(GroupedObservable).Key)
assert.Equal(t, "2", s[2].(GroupedObservable).Key)
Assert(ctx, t, s[3].(GroupedObservable), HasItems(10), HasNoError())
assert.Equal(t, 10, s[3].(GroupedObservable).Key)
assert.Equal(t, "10", s[3].(GroupedObservable).Key)
}

func joinTest(ctx context.Context, t *testing.T, left, right []interface{}, window Duration, expected []int64) {
Expand Down