Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.x] Multi.forEachCS #5532

Merged
merged 1 commit into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1111,7 +1111,7 @@ default <U> U to(Function<? super Multi<T>, ? extends U> converter) {
// --------------------------------------------------------------------------------------------------------

/**
* Terminal stage, invokes provided consumer for every item in the stream.
* Terminal stage, invokes provided consumer for every item in the stream with no backpressure.
*
* @param consumer consumer to be invoked for each item
* @return Single completed when the stream terminates
Expand All @@ -1132,6 +1132,20 @@ default Single<Void> forEach(Consumer<? super T> consumer) {
return single;
}

/**
* Terminal stage, invokes provided consumer for every item in the stream with strict backpressure.
* Items are requested 1 by 1 with no prefetch always waiting for each completion stage
* to complete before requesting another item.
*
* @param function invoked for each item returning completion stage to signal asynchronous completion
* @return Single completed when the stream terminates
*/
default Single<Void> forEachCompletionStage(Function<? super T, CompletionStage<Void>> function) {
return map(function::apply)
.flatMap(cs -> Single.create(cs, true), 1, false, 1)
.ignoreElements();
}

/**
* Terminal stage, ignore all items and complete returned {@code Single<Void>} successfully or exceptionally.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2021 Oracle and/or its affiliates.
* Copyright (c) 2019, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,12 +15,15 @@
*/
package io.helidon.common.reactive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
Expand All @@ -31,7 +34,9 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.equalTo;
Expand All @@ -42,13 +47,29 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;

/**
* {@link MultiTest} test.
*/
public class MultiTest {

private static ExecutorService exec;

@BeforeAll
static void beforeAll() {
exec = Executors.newFixedThreadPool(4);
}

@AfterAll
static void afterAll() throws InterruptedException {
exec.shutdown();
if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
exec.shutdownNow();
}
}

@Test
public void testJust() {
MultiTestSubscriber<String> subscriber = new MultiTestSubscriber<>();
Expand Down Expand Up @@ -671,6 +692,30 @@ public void testDoubleSubscribe() {
assertThat(subscriber2.getLastError(), is(nullValue()));
}

@Test
void testForEachCompletionStage() {
List<String> flags = Collections.synchronizedList(new ArrayList<>(8));
Multi.just(200, 150, 100, 50)
.log()
.forEachCompletionStage(i -> CompletableFuture.runAsync(() -> {
flags.add("entering " + i);
try {
Thread.sleep(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
flags.add("leaving " + i);
}, exec))
.await();

assertThat(flags, contains(
"entering 200", "leaving 200",
"entering 150", "leaving 150",
"entering 100", "leaving 100",
"entering 50", "leaving 50"
));
}

private static class MultiTestSubscriber<T> extends TestSubscriber<T> {

@Override
Expand Down
3 changes: 2 additions & 1 deletion docs/includes/reactivestreams/engine.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ Single.just("1")
|flatMapOptional|Transform each upstream item with the supplied function and flatten the resulting `Optional` to the downstream as item if present.
|observeOn|Re-emit the upstream's signals to the downstream on the given executor's thread using a default buffer size of 32 and errors skipping ahead of items.
|observeOn|Re-emit the upstream's signals to the downstream on the given executor's thread.
|forEach|Terminal stage, invokes provided consumer for every item in the stream.
|forEach|Terminal stage, invokes provided consumer for every item in the stream with no backpressure.
|forEachCompletionStage|Terminal stage, invokes provided function for every item in the stream with strict backpressure, requests another item only when previous operation is finished.
|collectList|Collect the items of this `Multi` instance into a `Single` of `List`.
|collect|Collect the items of this `Multi` instance into a `Single`.
|collect|Collect the items of this `Multi` into a collection provided via a `Supplier` and mutated by a `BiConsumer` callback.
Expand Down