Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the Multi select and skip groups #408

Merged
merged 3 commits into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions documentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ public class RepetitionsTest {
Multi<Integer> multi = Multi.createFrom().items(1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4);
// tag::distinct[]
List<Integer> list = multi
.transform().byDroppingDuplicates()
.select().distinct()
.collect().asList()
.await().indefinitely();
// end::distinct[]

// tag::repetition[]
List<Integer> list2 = multi
.transform().byDroppingRepetitions()
.skip().repetitions()
.collect().asList()
.await().indefinitely();
// end::repetition[]
Expand Down
13 changes: 7 additions & 6 deletions documentation/src/main/jekyll/guides/filter.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@
:include_dir: ../../../../src/test/java/guides/operators

When observing a `Multi`, you may want to not forward all the received items to your downstream.
To _filter_ or _select_ items, you can use `multi.transform().byFilteringItemsWith(predicate)`:
Use the `multi.select()` group to select items.
To _select_ items passing a given predicate, use `multi.select().where(predicate)`:

[source,java,indent=0]
----
include::{include_dir}/FilterTest.java[tag=filter]
----

`byFilteringItemsWith` accepts a predicate called for each item.
`where` accepts a predicate called for each item.
If the predicate returns `true`, the item propagated downstream.
Otherwise, it drops the item.

The predicated passed to `byFilteringItemsWith` is synchronous.
The `byTestingItemsWith` method provides an asynchronous version:
The predicated passed to `where` is synchronous.
The `when` method provides an asynchronous version:

[source,java,indent=0]
----
include::{include_dir}/FilterTest.java[tag=test]
----

`byFilteringItemsWith` accepts a function called for each item.
Unlike `byFilteringItemsWith` where the predicate returns a boolean synchronously, the function returns a `Uni<Boolean>`.
`when` accepts a function called for each item.
Unlike `when` where the predicate returns a boolean synchronously, the function returns a `Uni<Boolean>`.
It forwards the item downstream if the `uni` produced by the function emits `true`.
Otherwise, it drops the item.
24 changes: 15 additions & 9 deletions documentation/src/main/jekyll/guides/repetitions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
:include_dir: ../../../../src/test/java/guides/operators

When observing a `Multi`, you may see duplicated items or repetitions.
Mutiny has operators to removes these duplicates.
The `multi.select()` and `multi.skip()` groups provide methods to only select distinct items or drop repetitions.

== Removing duplicates
== Selecting distinct

The `.transform().byDroppingDuplicates()` operator removes all the duplicates.
The `.select().distinct()` operator removes all the duplicates.
As a result, the downstream only contains distinct items:

[source,java,indent=0]
Expand All @@ -18,23 +18,29 @@ include::{include_dir}/RepetitionsTest.java[tag=distinct]
----

If you have a stream emitting the {1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4} items.
Applying `.transform().byDroppingDuplicates()` on such stream produces:
Applying `.select().distinct()` on such stream produces:
{1, 2, 3, 4, 5, 6}.

IMPORTANT: Do not use `.transform().byDroppingDuplicates()` on large or infinite streams.
IMPORTANT: Do not use `.select().distinct()` on large or infinite streams.
cescoffier marked this conversation as resolved.
Show resolved Hide resolved
The operator keeps a reference on all the emitted items, and so, it could lead to memory issues if the stream contains too many distinct items.

== Removing repetitions
TIP: By default, `select().distinct()` uses the `hashCode` method from the item's class.
You can pass a custom comparator for more advanced checks.

The `.transform().byDroppingRepetitions()` operator removes subsequent repetition of an item:
== Skipping repetitions

The `.skip().repetitions()` operator removes subsequent repetitions of an item:

[source,java,indent=0]
----
include::{include_dir}/RepetitionsTest.java[tag=repetition]
----

If you have a stream emitting the {1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4} items.
Applying `.transform().byDroppingRepetitions()` on such stream produces:
Applying `.skip().repetitions()` on such stream produces:
{1, 2, 3, 4, 5, 6, 1, 4}.

Unlike `.transform().byDroppingDuplicates()`, you can use this operator on large or infinite streams.
Unlike `.skip().repetitions())`, you can use this operator on large or infinite streams.

TIP: By default, `skip().repetitions()` uses the `equals` method from the item's class.
You can pass a custom comparator for more advanced checks.
45 changes: 27 additions & 18 deletions documentation/src/main/jekyll/guides/take-skip.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,84 +11,93 @@ Multi provides the ability to:
* skip items from the beginning of the multi,
* skip the last items.
== Taking items
These actions are available from the `multi.select()` and `multi.skip()` groups, allowing to, respectively, select and skip
items from upstream.

The `multi.transform().byTakingFirstItems` method forwards on the _n_ **first** items from the multi.
== Selecting items

The `multi.select().first` method forwards on the _n_ **first** items from the multi.
It forwards that amount of items and then sends the completion signal.
It also cancels the upstream subscription.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=take-first]
include::{include_dir}/SelectAndSkipTest.java[tag=take-first]
----

NOTE: The `select().first()` method selects only the first item.

If the observed multi emits fewer items, it sends the completion event when the upstream completes.

Similarly, The `multi.transform().byTakingLastItems` operator forwards on the _n_ **last** items from the multi.
Similarly, The `multi.select().last` operator forwards on the _n_ **last** items from the multi.
It discards all the items emitted beforehand.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=take-last]
include::{include_dir}/SelectAndSkipTest.java[tag=take-last]
----

The `multi.transform().byTakingItemsWhile` operator forwards the items while the passed predicate returns `true`:
NOTE: The `select().last()` method selects only the last item.

The `multi.select().first(Predicate)` operator forwards the items while the passed predicate returns `true`:

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=take-while]
include::{include_dir}/SelectAndSkipTest.java[tag=take-while]
----

It calls the predicates for each item.
Once the predicate returns `false`, it stops forwarding the items downstream.
It also sends the completion event and cancels the upstream subscription.

Finally, `multi.transform().byTakingItemsFor` operator picks the first items for a given period.
Finally, `multi.select().first(Duration)` operator picks the first items emitted during a given period.
Once the passed duration expires, it sends the completion event and cancels the upstream subscription.
If the observes multi completes before the passed duration, it sends the completion event.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=take-for]
include::{include_dir}/SelectAndSkipTest.java[tag=take-for]
----

== Skipping items

You can also skip items.
You can also skip items using `multi.skip()`.

The `multi.transform().bySkippingFirstItems` method skips the _n_ **first** items from the multi.
The `multi.skip().first(n)` method skips the _n_ **first** items from the multi.
It forwards all the remaining items and sends the completion event when the upstream multi completes.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=skip-first]
include::{include_dir}/SelectAndSkipTest.java[tag=skip-first]
----

If the observed multi emits fewer items, it sends the completion event without emitting any items.

Similarly, The `multi.transform().bySkippingLastItems` operator skips on the _n_ **last** items from the multi:
NOTE: `skip().last()` drops the very last item only.

Similarly, The `multi.skip().last(n)` operator skips on the _n_ **last** items from the multi:

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=skip-last]
include::{include_dir}/SelectAndSkipTest.java[tag=skip-last]
----

The `multi.transform().bySkippingItemsWhile` operator skips the items while the passed predicate returns `true`:
The `multi.skip().first(Predicate)` operator skips the items while the passed predicate returns `true`:

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=skip-while]
include::{include_dir}/SelectAndSkipTest.java[tag=skip-while]
----

It calls the predicates for each item.
Once the predicate returns `false`, it stops discarding the items and starts forwarding downstream.

Finally, `multi.transform().bySkippingItemsFor` operator skips the first items for a given period.
Finally, `multi.skip().first(Duration)` operator skips the first items for a given period.
Once the passed duration expires, it sends the items emitted after the deadline downstream.
If the observes multi completes before the passed duration, it sends the completion event.

[source,java,indent=0]
----
include::{include_dir}/TakeTest.java[tag=skip-for]
include::{include_dir}/SelectAndSkipTest.java[tag=skip-for]
----

4 changes: 2 additions & 2 deletions documentation/src/test/java/guides/CreatingMultiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void pipeline(SystemOut out) {
// tag::pipeline[]
Multi.createFrom().items(1, 2, 3, 4, 5)
.onItem().transform(i -> i * 2)
.transform().byTakingFirstItems(3)
.select().first(3)
.onFailure().recoverWithItem(0)
.subscribe().with(System.out::println);
// end::pipeline[]
Expand Down Expand Up @@ -109,7 +109,7 @@ public void creation() {
Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofMillis(100));
// end::ticks[]
BlockingIterable<Long> longs = ticks
.transform().byTakingFirstItems(3)
.select().first(3)
.subscribe().asIterable();
await().until(() -> longs.stream().count() == 3);
}
Expand Down
20 changes: 10 additions & 10 deletions documentation/src/test/java/guides/operators/FilterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ public void filter() {
Multi<Integer> multi = Multi.createFrom().range(1, 11);
// tag::filter[]
List<Integer> list = multi
.transform().byFilteringItemsWith(i -> i > 6)
.select().where(i -> i > 6)
.collect().asList()
.await().indefinitely();
// end::filter[]

// tag::test[]
List<Integer> list2 = multi
.transform().byTestingItemsWith(i -> Uni.createFrom().item(i > 6))
.select().when(i -> Uni.createFrom().item(i > 6))
.collect().asList()
.await().indefinitely();
// end::test[]
Expand All @@ -36,17 +36,17 @@ public void take() {
Multi<Integer> multi = Multi.createFrom().range(1, 11);
// tag::take[]
List<Integer> list = multi
.transform().byTakingFirstItems(2)
.select().first(2)
.collect().asList()
.await().indefinitely();

List<Integer> list2 = multi
.transform().byTakingItemsWhile(i -> i < 3)
.select().first(i -> i < 3)
.collect().asList()
.await().indefinitely();

List<Integer> list3 = multi
.transform().byTakingLastItems(2)
.select().last(2)
.collect().asList()
.await().indefinitely();
// end::take[]
Expand All @@ -60,17 +60,17 @@ public void skip() {
Multi<Integer> multi = Multi.createFrom().range(1, 11);
// tag::skip[]
List<Integer> list = multi
.transform().bySkippingFirstItems(8)
.skip().first(8)
.collect().asList()
.await().indefinitely();

List<Integer> list2 = multi
.transform().bySkippingItemsWhile(i -> i < 9)
.skip().first(i -> i < 9)
.collect().asList()
.await().indefinitely();

List<Integer> list3 = multi
.transform().bySkippingLastItems(8)
.skip().last(8)
.collect().asList()
.await().indefinitely();
// end::skip[]
Expand All @@ -84,14 +84,14 @@ public void distinct() {
Multi<Integer> multi = Multi.createFrom().items(1, 1, 2, 3, 4, 5, 5, 6);
// tag::distinct[]
List<Integer> list = multi
.transform().byDroppingDuplicates()
.select().distinct()
.collect().asList()
.await().indefinitely();
// end::distinct[]

// tag::repetition[]
List<Integer> list2 = multi
.transform().byDroppingRepetitions()
.skip().repetitions()
.collect().asList()
.await().indefinitely();
// end::repetition[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ public void distinct() {
Multi<Integer> multi = Multi.createFrom().items(1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4);
// tag::distinct[]
List<Integer> list = multi
.transform().byDroppingDuplicates()
.select().distinct()
.collect().asList()
.await().indefinitely();
// end::distinct[]

// tag::repetition[]
List<Integer> list2 = multi
.transform().byDroppingRepetitions()
.skip().repetitions()
.collect().asList()
.await().indefinitely();
// end::repetition[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@

import static org.assertj.core.api.Assertions.assertThat;

public class TakeTest {
public class SelectAndSkipTest {

@Test
public void testTake() {
public void testSelect() {
Multi<Integer> multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9);

// tag::take-first[]
Multi<Integer> firstThreeItems = multi.transform().byTakingFirstItems(3);
Multi<Integer> firstThreeItems = multi.select().first(3);
// end::take-first[]

// tag::take-last[]
Multi<Integer> lastThreeItems = multi.transform().byTakingLastItems(3);
Multi<Integer> lastThreeItems = multi.select().last(3);
// end::take-last[]

// tag::take-while[]
Multi<Integer> takeWhile = multi.transform().byTakingItemsWhile(i -> i < 4);
Multi<Integer> takeWhile = multi.select().first(i -> i < 4);
// end::take-while[]

// tag::take-for[]
Multi<Integer> takeForDuration = multi.transform().byTakingItemsFor(Duration.ofSeconds(1));
Multi<Integer> takeForDuration = multi.select().first(Duration.ofSeconds(1));
// end::take-for[]

assertThat(firstThreeItems.collect().asList().await().indefinitely()).containsExactly(1, 2, 3);
Expand All @@ -41,19 +41,19 @@ public void testSkip() {
Multi<Integer> multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9);

// tag::skip-first[]
Multi<Integer> skipThreeItems = multi.transform().bySkippingFirstItems(3);
Multi<Integer> skipThreeItems = multi.skip().first(3);
// end::skip-first[]

// tag::skip-last[]
Multi<Integer> skipLastThreeItems = multi.transform().bySkippingLastItems(3);
Multi<Integer> skipLastThreeItems = multi.skip().last(3);
// end::skip-last[]

// tag::skip-while[]
Multi<Integer> skipWhile = multi.transform().bySkippingItemsWhile(i -> i < 4);
Multi<Integer> skipWhile = multi.skip().first(i -> i < 4);
// end::skip-while[]

// tag::skip-for[]
Multi<Integer> skipForDuration = multi.transform().bySkippingItemsFor(Duration.ofSeconds(1));
Multi<Integer> skipForDuration = multi.skip().first(Duration.ofSeconds(1));
// end::skip-for[]

assertThat(skipThreeItems.collect().asList().await().indefinitely()).containsExactly(4, 5, 6, 7, 8, 9);
Expand Down
Loading