Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix BufferWithTimeOrCount buffer emission #289

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,10 @@ func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Obser
return customObservableOperator(o.parent, f, opts...)
}

// BufferWithTimeOrCount returns an Observable that emits buffers of items it collects from the source
// Observable either from a given count or at a given time interval.
// BufferWithTimeOrCount returns an Observable that emits buffers, of max size
// count, of items it collects from the source Observable, or if the timespan
// has elapsed, whatever was collected in the buffer since the last emitted
// buffer.
func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable {
if timespan == nil {
return Thrown(IllegalInputError{error: "timespan must no be nil"})
Expand All @@ -528,16 +530,37 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
send := make(chan struct{})
mutex := sync.Mutex{}

checkBuffer := func() {
// checkBuffer will send buffered units of at most size count, unless
// flush is true.
checkBuffer := func(flush bool) {
mutex.Lock()
if len(buffer) != 0 {
if !Of(buffer).SendContext(ctx, next) {
mutex.Unlock()
return
defer mutex.Unlock()

length := len(buffer)
if length != 0 {
var last int
defer func() {
// create a copy of buffer, less whatever was already sent
t := make([]interface{}, length-(last+1))
copy(t, buffer[last+1:length])
teivah marked this conversation as resolved.
Show resolved Hide resolved
buffer = t
}()

for i := 0; i < length; i += count {
teivah marked this conversation as resolved.
Show resolved Hide resolved
high := i + count
if high > length {
if flush {
high = length
} else {
return
}
}
if !Of(buffer[i:high]).SendContext(ctx, next) {
return
}
last = high - 1
}
buffer = make([]interface{}, 0)
}
mutex.Unlock()
}

go func() {
Expand All @@ -546,14 +569,14 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
for {
select {
case <-send:
checkBuffer()
checkBuffer(false)
case <-stop:
checkBuffer()
checkBuffer(true)
return
case <-ctx.Done():
return
case <-time.After(duration):
checkBuffer()
checkBuffer(true)
}
}
}()
Expand Down
188 changes: 184 additions & 4 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,186 @@ func Test_Observable_BufferWithTimeOrCount(t *testing.T) {
}))
}

func Test_Observable_BufferWithTimeOrCount_DoesNotExceedBufferSize(t *testing.T) {
observable := Range(1, 5)
buffers := observable.BufferWithTimeOrCount(WithDuration(time.Millisecond*500), 2)
var seen int
Assert(context.Background(), t, buffers, CustomPredicate(func(items []interface{}) error {
for _, item := range items {
buffer := item.([]interface{})
if len(buffer) > 2 {
return errors.New("items should not be greater than two")
}
for _, entry := range buffer {
value := entry.(int)
if value-1 != seen {
return fmt.Errorf("items should consecutive, %d does not follow %d", value, seen)
}
seen = value
}
}
return nil
}))
}

func delayItem(ctx context.Context, item interface{}) (interface{}, error) {
entry := item.(struct {
name string
msDelay int
})
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(time.Duration(entry.msDelay) * time.Millisecond):
return entry.name, nil
}
}

func Test_Observable_BufferWithTimeOrCount_ElapsedCapturesPartial(t *testing.T) {
defer goleak.VerifyNone(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()

observable := Just([]struct {
name string
msDelay int
}{
{"a", 5},
{"b", 1000},
})().Map(delayItem, WithContext(ctx))

buffers := observable.BufferWithTimeOrCount(WithDuration(time.Millisecond*10), 2, WithContext(ctx))

Assert(ctx, t, buffers, CustomPredicate(func(items []interface{}) error {
partialEncountered := false
for _, item := range items {
buffer := item.([]interface{})
if len(buffer) == 1 {
partialEncountered = true
}
}
if !partialEncountered {
return errors.New("at least one partial observation should have occurred")
}
return nil
}))
}

func Test_Observable_BufferWithTimeOrCount_EmitsCompleteBuffers(t *testing.T) {
defer goleak.VerifyNone(t)

ctx, cancel1 := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel1()

// straggler not emitted before context timeout
observable := Just([]struct {
name string
msDelay int
}{
{"a", 1},
{"b", 1},
{"c", 1},
{"d", 1},
{"e", 1000},
})().Map(delayItem, WithContext(ctx))

buffers := observable.BufferWithTimeOrCount(WithDuration(time.Millisecond*20), 2, WithContext(ctx))

ctx, cancel2 := context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel2()

Assert(ctx, t, buffers, CustomPredicate(func(items []interface{}) error {
for _, item := range items {
buffer := item.([]interface{})
if len(buffer) != 2 {
return errors.New("items should be bundles of two")
}
}
return nil
}))
}

func Test_Observable_BufferWithTimeOrCount_CapturesStraggler(t *testing.T) {
defer goleak.VerifyNone(t)

ctx, cancel1 := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel1()

// straggler will stay in buffer until buffer timeout
observable := Just([]struct {
name string
msDelay int
}{
{"a", 1},
{"b", 1},
{"c", 1},
{"d", 1},
{"straggler", 1},
})().Map(delayItem, WithContext(ctx))

buffers := observable.BufferWithTimeOrCount(WithDuration(time.Millisecond*10), 2, WithContext(ctx))

ctx, cancel2 := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel2()

Assert(ctx, t, buffers, CustomPredicate(func(items []interface{}) error {
seen := false
for _, item := range items {
buffer := item.([]interface{})
for _, b := range buffer {
if b.(string) == "straggler" {
seen = true
}
}
}
if !seen {
return errors.New("straggler item not emitted")
}
return nil
}))
}

func Test_Observable_BufferWithTimeOrCount_DoneEmitsStraggler(t *testing.T) {
maguro marked this conversation as resolved.
Show resolved Hide resolved
defer goleak.VerifyNone(t)

ctx, cancel1 := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel1()

// straggler will stay in buffer until context times out
observable := Just([]struct {
name string
msDelay int
}{
{"a", 1},
{"b", 1},
{"c", 1},
{"d", 1},
{"straggler", 1},
})().Map(delayItem, WithContext(ctx))

buffers := observable.BufferWithTimeOrCount(WithDuration(time.Second*10), 2, WithContext(ctx))

ctx, cancel2 := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel2()

Assert(ctx, t, buffers, CustomPredicate(func(items []interface{}) error {
seen := false
for _, item := range items {
buffer := item.([]interface{})
for _, b := range buffer {
if b.(string) == "straggler" {
seen = true
}
}
}
if !seen {
return errors.New("straggler item not emitted")
}
return nil
}))
}

func Test_Observable_Contain(t *testing.T) {
defer goleak.VerifyNone(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -382,12 +562,12 @@ func Test_Observable_Count_Parallel(t *testing.T) {
}

// FIXME
//func Test_Observable_Debounce(t *testing.T) {
// func Test_Observable_Debounce(t *testing.T) {
// defer goleak.VerifyNone(t)
// ctx, obs, d := timeCausality(1, tick, 2, tick, 3, 4, 5, tick, 6, tick)
// Assert(ctx, t, obs.Debounce(d, WithBufferedChannel(10), WithContext(ctx)),
// HasItems(1, 2, 5, 6))
//}
// }

func Test_Observable_Debounce_Error(t *testing.T) {
defer goleak.VerifyNone(t)
Expand Down Expand Up @@ -2300,7 +2480,7 @@ func Test_Observable_WindowWithCount_InputError(t *testing.T) {
}

// FIXME
//func Test_Observable_WindowWithTime(t *testing.T) {
// func Test_Observable_WindowWithTime(t *testing.T) {
// defer goleak.VerifyNone(t)
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
Expand All @@ -2317,7 +2497,7 @@ func Test_Observable_WindowWithCount_InputError(t *testing.T) {
// observe := obs.WindowWithTime(WithDuration(10*time.Millisecond), WithBufferedChannel(10)).Observe()
// Assert(ctx, t, (<-observe).V.(Observable), HasItems(1, 2))
// Assert(ctx, t, (<-observe).V.(Observable), HasItems(3))
//}
// }

func Test_Observable_WindowWithTimeOrCount(t *testing.T) {
defer goleak.VerifyNone(t)
Expand Down