Skip to content

Commit

Permalink
Find operator
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah authored Oct 4, 2020
1 parent f3c50c1 commit a2dd674
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.
* [Distinct](doc/distinct.md)/[DistinctUntilChanged](doc/distinctuntilchanged.md) — suppress duplicate items emitted by an Observable
* [ElementAt](doc/elementat.md) — emit only item n emitted by an Observable
* [Filter](doc/filter.md) — emit only those items from an Observable that pass a predicate test
* [Find](doc/find.md) — emit the first item passing a predicate then complete
* [First](doc/first.md)/[FirstOrDefault](doc/firstordefault.md) — emit only the first item or the first item that meets a condition, from an Observable
* [IgnoreElements](doc/ignoreelements.md) — do not emit any items from an Observable but mirror its termination notification
* [Last](doc/last.md)/[LastOrDefault](doc/lastordefault.md) — emit only the last item emitted by an Observable
Expand Down
35 changes: 35 additions & 0 deletions doc/find.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Find Operator

## Overview

Emit the first item passing a predicate then complete.

## Example

```go
observable := rxgo.Just(1, 2, 3)().Find(func(i interface{}) bool {
return i == 2
})
```

Output:

```
2
```

## Options

* [WithBufferedChannel](options.md#withbufferedchannel)

* [WithContext](options.md#withcontext)

* [WithPool](options.md#withpool)

* [WithCPUPool](options.md#withcpupool)

* [WithObservationStrategy](options.md#withobservationstrategy)

* [WithErrorStrategy](options.md#witherrorstrategy)

* [WithPublishStrategy](options.md#withpublishstrategy)
1 change: 1 addition & 0 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Observable interface {
Error(opts ...Option) error
Errors(opts ...Option) []error
Filter(apply Predicate, opts ...Option) Observable
Find(find Predicate, opts ...Option) OptionalSingle
First(opts ...Option) OptionalSingle
FirstOrDefault(defaultValue interface{}, opts ...Option) Single
FlatMap(apply ItemToObservable, opts ...Option) Observable
Expand Down
30 changes: 30 additions & 0 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,36 @@ func (op *filterOperator) end(_ context.Context, _ chan<- Item) {
func (op *filterOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
}

// Find emits the first item passing a predicate then complete.
func (o *ObservableImpl) Find(find Predicate, opts ...Option) OptionalSingle {
return optionalSingle(o, func() operator {
return &findOperator{
find: find,
}
}, true, true, opts...)
}

type findOperator struct {
find Predicate
}

func (op *findOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
if op.find(item.V) {
item.SendContext(ctx, dst)
operatorOptions.stop()
}
}

func (op *findOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *findOperator) end(_ context.Context, _ chan<- Item) {
}

func (op *findOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
}

// First returns new Observable which emit only first item.
// Cannot be run in parallel.
func (o *ObservableImpl) First(opts ...Option) OptionalSingle {
Expand Down
20 changes: 20 additions & 0 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,26 @@ func Test_Observable_Filter_Parallel(t *testing.T) {
Assert(ctx, t, obs, HasItemsNoOrder(2, 4), HasNoError())
}

func Test_Observable_Find_NotEmpty(t *testing.T) {
defer goleak.VerifyNone(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := testObservable(ctx, 1, 2, 3).Find(func(i interface{}) bool {
return i == 2
})
Assert(ctx, t, obs, HasItem(2))
}

func Test_Observable_Find_Empty(t *testing.T) {
defer goleak.VerifyNone(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := Empty().Find(func(_ interface{}) bool {
return true
})
Assert(ctx, t, obs, IsEmpty())
}

func Test_Observable_First_NotEmpty(t *testing.T) {
defer goleak.VerifyNone(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit a2dd674

Please sign in to comment.