Skip to content

Commit

Permalink
Merge pull request #408 from smallrye/select
Browse files Browse the repository at this point in the history
Implement the Multi select and skip groups
  • Loading branch information
jponge authored Jan 4, 2021
2 parents 0998863 + c3db8c5 commit eccd9a9
Show file tree
Hide file tree
Showing 73 changed files with 2,093 additions and 476 deletions.
4 changes: 2 additions & 2 deletions documentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ public class RepetitionsTest {
Multi<Integer> multi = Multi.createFrom().items(1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4);
// tag::distinct[]
List<Integer> list = multi
.transform().byDroppingDuplicates()
.select().distinct()
.collect().asList()
.await().indefinitely();
// end::distinct[]

// tag::repetition[]
List<Integer> list2 = multi
.transform().byDroppingRepetitions()
.skip().repetitions()
.collect().asList()
.await().indefinitely();
// end::repetition[]
Expand Down
13 changes: 7 additions & 6 deletions documentation/src/main/jekyll/guides/filter.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@
:include_dir: ../../../../src/test/java/guides/operators

When observing a `Multi`, you may want to not forward all the received items to your downstream.
To _filter_ or _select_ items, you can use `multi.transform().byFilteringItemsWith(predicate)`:
Use the `multi.select()` group to select items.
To _select_ items passing a given predicate, use `multi.select().where(predicate)`:

[source,java,indent=0]
----
include::{include_dir}/FilterTest.java[tag=filter]
----

`byFilteringItemsWith` accepts a predicate called for each item.
`where` accepts a predicate called for each item.
If the predicate returns `true`, the item propagated downstream.
Otherwise, it drops the item.

The predicated passed to `byFilteringItemsWith` is synchronous.
The `byTestingItemsWith` method provides an asynchronous version:
The predicated passed to `where` is synchronous.
The `when` method provides an asynchronous version:

[source,java,indent=0]
----
include::{include_dir}/FilterTest.java[tag=test]
----

`byFilteringItemsWith` accepts a function called for each item.
Unlike `byFilteringItemsWith` where the predicate returns a boolean synchronously, the function returns a `Uni<Boolean>`.
`when` accepts a function called for each item.
Unlike `when` where the predicate returns a boolean synchronously, the function returns a `Uni<Boolean>`.
It forwards the item downstream if the `uni` produced by the function emits `true`.
Otherwise, it drops the item.
24 changes: 15 additions & 9 deletions documentation/src/main/jekyll/guides/repetitions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
:include_dir: ../../../../src/test/java/guides/operators

When observing a `Multi`, you may see duplicated items or repetitions.
Mutiny has operators to removes these duplicates.
The `multi.select()` and `multi.skip()` groups provide methods to only select distinct items or drop repetitions.

== Removing duplicates
== Selecting distinct

The `.transform().byDroppingDuplicates()` operator removes all the duplicates.
The `.select().distinct()` operator removes all the duplicates.
As a result, the downstream only contains distinct items:

[source,java,indent=0]
Expand All @@ -18,23 +18,29 @@ include::{include_dir}/RepetitionsTest.java[tag=distinct]
----

If you have a stream emitting the {1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4} items.
Applying `.transform().byDroppingDuplicates()` on such stream produces:
Applying `.select().distinct()` on such stream produces:
{1, 2, 3, 4, 5, 6}.

IMPORTANT: Do not use `.transform().byDroppingDuplicates()` on large or infinite streams.
IMPORTANT: Do not use `.select().distinct()` on large or infinite streams.
The operator keeps a reference on all the emitted items, and so, it could lead to memory issues if the stream contains too many distinct items.

== Removing repetitions
TIP: By default, `select().distinct()` uses the `hashCode` method from the item's class.
You can pass a custom comparator for more advanced checks.

The `.transform().byDroppingRepetitions()` operator removes subsequent repetition of an item:
== Skipping repetitions

The `.skip().repetitions()` operator removes subsequent repetitions of an item:

[source,java,indent=0]
----
include::{include_dir}/RepetitionsTest.java[tag=repetition]
----

If you have a stream emitting the {1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4} items.
Applying `.transform().byDroppingRepetitions()` on such stream produces:
Applying `.skip().repetitions()` on such stream produces:
{1, 2, 3, 4, 5, 6, 1, 4}.

Unlike `.transform().byDroppingDuplicates()`, you can use this operator on large or infinite streams.
Unlike `.skip().repetitions())`, you can use this operator on large or infinite streams.

TIP: By default, `skip().repetitions()` uses the `equals` method from the item's class.
You can pass a custom comparator for more advanced checks.
45 changes: 27 additions & 18 deletions documentation/src/main/jekyll/guides/take-skip.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,84 +11,93 @@ Multi provides the ability to:
* skip items from the beginning of the multi,
* skip the last items.
== Taking items
These actions are available from the `multi.select()` and `multi.skip()` groups, allowing to, respectively, select and skip
items from upstream.

The `multi.transform().byTakingFirstItems` method forwards on the _n_ **first** items from the multi.
== Selecting items

The `multi.select().first` method forwards on the _n_ **first** items from the multi.
It forwards that amount of items and then sends the completion signal.
It also cancels the upstream subscription.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=take-first]
include::{include_dir}/SelectAndSkipTest.java[tag=take-first]
----

NOTE: The `select().first()` method selects only the first item.

If the observed multi emits fewer items, it sends the completion event when the upstream completes.

Similarly, The `multi.transform().byTakingLastItems` operator forwards on the _n_ **last** items from the multi.
Similarly, The `multi.select().last` operator forwards on the _n_ **last** items from the multi.
It discards all the items emitted beforehand.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=take-last]
include::{include_dir}/SelectAndSkipTest.java[tag=take-last]
----

The `multi.transform().byTakingItemsWhile` operator forwards the items while the passed predicate returns `true`:
NOTE: The `select().last()` method selects only the last item.

The `multi.select().first(Predicate)` operator forwards the items while the passed predicate returns `true`:

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=take-while]
include::{include_dir}/SelectAndSkipTest.java[tag=take-while]
----

It calls the predicates for each item.
Once the predicate returns `false`, it stops forwarding the items downstream.
It also sends the completion event and cancels the upstream subscription.

Finally, `multi.transform().byTakingItemsFor` operator picks the first items for a given period.
Finally, `multi.select().first(Duration)` operator picks the first items emitted during a given period.
Once the passed duration expires, it sends the completion event and cancels the upstream subscription.
If the observes multi completes before the passed duration, it sends the completion event.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=take-for]
include::{include_dir}/SelectAndSkipTest.java[tag=take-for]
----

== Skipping items

You can also skip items.
You can also skip items using `multi.skip()`.

The `multi.transform().bySkippingFirstItems` method skips the _n_ **first** items from the multi.
The `multi.skip().first(n)` method skips the _n_ **first** items from the multi.
It forwards all the remaining items and sends the completion event when the upstream multi completes.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=skip-first]
include::{include_dir}/SelectAndSkipTest.java[tag=skip-first]
----

If the observed multi emits fewer items, it sends the completion event without emitting any items.

Similarly, The `multi.transform().bySkippingLastItems` operator skips on the _n_ **last** items from the multi:
NOTE: `skip().last()` drops the very last item only.

Similarly, The `multi.skip().last(n)` operator skips on the _n_ **last** items from the multi:

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=skip-last]
include::{include_dir}/SelectAndSkipTest.java[tag=skip-last]
----

The `multi.transform().bySkippingItemsWhile` operator skips the items while the passed predicate returns `true`:
The `multi.skip().first(Predicate)` operator skips the items while the passed predicate returns `true`:

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=skip-while]
include::{include_dir}/SelectAndSkipTest.java[tag=skip-while]
----

It calls the predicates for each item.
Once the predicate returns `false`, it stops discarding the items and starts forwarding downstream.

Finally, `multi.transform().bySkippingItemsFor` operator skips the first items for a given period.
Finally, `multi.skip().first(Duration)` operator skips the first items for a given period.
Once the passed duration expires, it sends the items emitted after the deadline downstream.
If the observes multi completes before the passed duration, it sends the completion event.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=skip-for]
include::{include_dir}/SelectAndSkipTest.java[tag=skip-for]
----

4 changes: 2 additions & 2 deletions documentation/src/test/java/guides/CreatingMultiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void pipeline(SystemOut out) {
// tag::pipeline[]
Multi.createFrom().items(1, 2, 3, 4, 5)
.onItem().transform(i -> i * 2)
.transform().byTakingFirstItems(3)
.select().first(3)
.onFailure().recoverWithItem(0)
.subscribe().with(System.out::println);
// end::pipeline[]
Expand Down Expand Up @@ -109,7 +109,7 @@ public void creation() {
Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofMillis(100));
// end::ticks[]
BlockingIterable<Long> longs = ticks
.transform().byTakingFirstItems(3)
.select().first(3)
.subscribe().asIterable();
await().until(() -> longs.stream().count() == 3);
}
Expand Down
20 changes: 10 additions & 10 deletions documentation/src/test/java/guides/operators/FilterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ public void filter() {
Multi<Integer> multi = Multi.createFrom().range(1, 11);
// tag::filter[]
List<Integer> list = multi
.transform().byFilteringItemsWith(i -> i > 6)
.select().where(i -> i > 6)
.collect().asList()
.await().indefinitely();
// end::filter[]

// tag::test[]
List<Integer> list2 = multi
.transform().byTestingItemsWith(i -> Uni.createFrom().item(i > 6))
.select().when(i -> Uni.createFrom().item(i > 6))
.collect().asList()
.await().indefinitely();
// end::test[]
Expand All @@ -36,17 +36,17 @@ public void take() {
Multi<Integer> multi = Multi.createFrom().range(1, 11);
// tag::take[]
List<Integer> list = multi
.transform().byTakingFirstItems(2)
.select().first(2)
.collect().asList()
.await().indefinitely();

List<Integer> list2 = multi
.transform().byTakingItemsWhile(i -> i < 3)
.select().first(i -> i < 3)
.collect().asList()
.await().indefinitely();

List<Integer> list3 = multi
.transform().byTakingLastItems(2)
.select().last(2)
.collect().asList()
.await().indefinitely();
// end::take[]
Expand All @@ -60,17 +60,17 @@ public void skip() {
Multi<Integer> multi = Multi.createFrom().range(1, 11);
// tag::skip[]
List<Integer> list = multi
.transform().bySkippingFirstItems(8)
.skip().first(8)
.collect().asList()
.await().indefinitely();

List<Integer> list2 = multi
.transform().bySkippingItemsWhile(i -> i < 9)
.skip().first(i -> i < 9)
.collect().asList()
.await().indefinitely();

List<Integer> list3 = multi
.transform().bySkippingLastItems(8)
.skip().last(8)
.collect().asList()
.await().indefinitely();
// end::skip[]
Expand All @@ -84,14 +84,14 @@ public void distinct() {
Multi<Integer> multi = Multi.createFrom().items(1, 1, 2, 3, 4, 5, 5, 6);
// tag::distinct[]
List<Integer> list = multi
.transform().byDroppingDuplicates()
.select().distinct()
.collect().asList()
.await().indefinitely();
// end::distinct[]

// tag::repetition[]
List<Integer> list2 = multi
.transform().byDroppingRepetitions()
.skip().repetitions()
.collect().asList()
.await().indefinitely();
// end::repetition[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ public void distinct() {
Multi<Integer> multi = Multi.createFrom().items(1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4);
// tag::distinct[]
List<Integer> list = multi
.transform().byDroppingDuplicates()
.select().distinct()
.collect().asList()
.await().indefinitely();
// end::distinct[]

// tag::repetition[]
List<Integer> list2 = multi
.transform().byDroppingRepetitions()
.skip().repetitions()
.collect().asList()
.await().indefinitely();
// end::repetition[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@

import static org.assertj.core.api.Assertions.assertThat;

public class TakeTest {
public class SelectAndSkipTest {

@Test
public void testTake() {
public void testSelect() {
Multi<Integer> multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9);

// tag::take-first[]
Multi<Integer> firstThreeItems = multi.transform().byTakingFirstItems(3);
Multi<Integer> firstThreeItems = multi.select().first(3);
// end::take-first[]

// tag::take-last[]
Multi<Integer> lastThreeItems = multi.transform().byTakingLastItems(3);
Multi<Integer> lastThreeItems = multi.select().last(3);
// end::take-last[]

// tag::take-while[]
Multi<Integer> takeWhile = multi.transform().byTakingItemsWhile(i -> i < 4);
Multi<Integer> takeWhile = multi.select().first(i -> i < 4);
// end::take-while[]

// tag::take-for[]
Multi<Integer> takeForDuration = multi.transform().byTakingItemsFor(Duration.ofSeconds(1));
Multi<Integer> takeForDuration = multi.select().first(Duration.ofSeconds(1));
// end::take-for[]

assertThat(firstThreeItems.collect().asList().await().indefinitely()).containsExactly(1, 2, 3);
Expand All @@ -41,19 +41,19 @@ public void testSkip() {
Multi<Integer> multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9);

// tag::skip-first[]
Multi<Integer> skipThreeItems = multi.transform().bySkippingFirstItems(3);
Multi<Integer> skipThreeItems = multi.skip().first(3);
// end::skip-first[]

// tag::skip-last[]
Multi<Integer> skipLastThreeItems = multi.transform().bySkippingLastItems(3);
Multi<Integer> skipLastThreeItems = multi.skip().last(3);
// end::skip-last[]

// tag::skip-while[]
Multi<Integer> skipWhile = multi.transform().bySkippingItemsWhile(i -> i < 4);
Multi<Integer> skipWhile = multi.skip().first(i -> i < 4);
// end::skip-while[]

// tag::skip-for[]
Multi<Integer> skipForDuration = multi.transform().bySkippingItemsFor(Duration.ofSeconds(1));
Multi<Integer> skipForDuration = multi.skip().first(Duration.ofSeconds(1));
// end::skip-for[]

assertThat(skipThreeItems.collect().asList().await().indefinitely()).containsExactly(4, 5, 6, 7, 8, 9);
Expand Down
Loading

0 comments on commit eccd9a9

Please sign in to comment.