Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay the start of the "ticks" after the first request. #514

Merged
merged 1 commit into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ public <T> Multi<T> optional(Supplier<Optional<T>> supplier) {

/**
* Like {@link #emitter(Consumer, BackPressureStrategy)} with the {@link BackPressureStrategy#BUFFER} strategy.
*
* <p>
* Note that to create hot streams, you should use a
* {@link io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor}.
*
Expand All @@ -383,10 +383,10 @@ public <T> Multi<T> emitter(Consumer<MultiEmitter<? super T>> consumer) {

/**
* Like {@link #emitter(Consumer)} with the {@link BackPressureStrategy#BUFFER} strategy and the given buffer size.
*
* <p>
* Note that to create hot streams, you should use a
* {@link io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor}.
*
* <p>
* If the buffer is full, a {@link java.nio.BufferOverflowException} in propagated downstream.
*
* @param consumer the consumer receiving the emitter, must not be {@code null}
Expand All @@ -412,7 +412,7 @@ public <T> Multi<T> emitter(Consumer<MultiEmitter<? super T>> consumer, int buff
* to unregister the listener on cancellation.
* <p>
* If the consumer throws an exception, a failure event with the exception is fired.
*
* <p>
* Note that to create hot streams, you should use a
* {@link io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor}.
*
Expand Down Expand Up @@ -504,6 +504,9 @@ public <T> Multi<T> empty() {
* Creates a {@link Multi} that emits {@code long} items (ticks) starting with 0 and incrementing at
* specified time intervals.
* <p>
* The <em>timer</em> starts at the first request. Once this request is received, the produced stream is a hot
* stream.
* <p>
* Be aware that if the subscriber does not request enough items in time, a back pressure failure is fired.
* The produced {@link Multi} never completes until cancellation by the subscriber.
* <p>
Expand Down Expand Up @@ -539,7 +542,7 @@ public Multi<Integer> range(int startInclusive, int endExclusive) {
* completes, fails or when the subscriber cancels the subscription, a finalizer is called to <em>close</em> the
* resource. This cleanup process can be either synchronous and asynchronous, as well as distinct for each type of
* event.
*
* <p>
* This method can be seen as a reactive version of the "try/finally" construct.
*
* @param resourceSupplier a supplier called for each subscriber to generate the resource, must not be {@code null}.
Expand Down Expand Up @@ -568,11 +571,11 @@ public <R, I> MultiResource<R, I> resource(Supplier<? extends R> resourceSupplie
* items from this {@link Publisher}. When the stream completes, fails or when the subscriber cancels the
* subscription, a finalizer is called to <em>close</em> the resource. This cleanup process can be either
* synchronous and asynchronous, as well as distinct for each type of event.
*
* <p>
* If the Uni produced by the {@code resourceSupplier} emits a failure, the failure is propagated downstream.
* If the Uni produced by the {@code resourceSupplier} does not emit an item before downstream cancellation, the
* resource creation is cancelled.
*
* <p>
* This method can be seen as a reactive version of the "try/finally" construct.
*
* @param resourceSupplier a supplier called for each subscriber to generate the resource, must not be {@code null}.
Expand Down Expand Up @@ -602,7 +605,7 @@ public <R, I> MultiResourceUni<R, I> resourceFromUni(Supplier<Uni<R>> resourceSu
* Items are being generated based on subscription requests.
* Requesting {@link Long#MAX_VALUE} items can possibly make for an infinite stream unless the generator function calls
* {@link GeneratorEmitter#complete()} at some point.
*
*
* @param initialStateSupplier a supplier for the initial state, must not be {@code null} but can supply {@code null}
* @param generator the generator function, returns the new state for the next item generation
* @param <S> the state type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public AssertSubscriber<T> await() {
* Awaits for the next item.
* If no item have been received before the default timeout, an {@link AssertionError} is thrown.
* <p>
Note that it requests one item from the upstream.
* Note that it requests one item from the upstream.
*
* @return this {@link AssertSubscriber}
* @see #awaitNextItems(int, int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ public IntervalMulti(
@Override
public void subscribe(MultiSubscriber<? super Long> actual) {
IntervalRunnable runnable = new IntervalRunnable(actual, period, initialDelay, executor);

actual.onSubscribe(runnable);
runnable.start();

// Only start the ticks when we get the first request.
}

static final class IntervalRunnable implements Runnable, Subscription {
Expand All @@ -54,6 +52,7 @@ static final class IntervalRunnable implements Runnable, Subscription {
private final Duration initialDelay;
private final ScheduledExecutorService executor;
private volatile boolean cancelled;
private volatile boolean once = true;

private final AtomicLong count = new AtomicLong();
private ScheduledFuture<?> future;
Expand Down Expand Up @@ -105,6 +104,10 @@ public void request(long n) {
if (n > 0) {
Subscriptions.add(requested, n);
}
if (once) {
start();
once = false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,12 +490,12 @@ public void testAwaitNextItems() {
assertThatThrownBy(() -> Multi.createFrom().empty()
.subscribe().withSubscriber(AssertSubscriber.create(1))
.awaitNextItems(2, 1)).isInstanceOf(AssertionError.class)
.hasMessageContaining("completion").hasMessageContaining("item");
.hasMessageContaining("completion").hasMessageContaining("item");

assertThatThrownBy(() -> Multi.createFrom().empty()
.subscribe().withSubscriber(AssertSubscriber.create(1))
.awaitNextItems(2, 1, Duration.ofSeconds(1))).isInstanceOf(AssertionError.class)
.hasMessageContaining("completion").hasMessageContaining("item");
.hasMessageContaining("completion").hasMessageContaining("item");

// Already failed
assertThatThrownBy(() -> Multi.createFrom().failure(new TestException())
Expand All @@ -507,14 +507,14 @@ public void testAwaitNextItems() {
assertThatThrownBy(() -> Multi.createFrom().failure(new TestException())
.subscribe().withSubscriber(AssertSubscriber.create(1))
.awaitNextItems(2, 1)).isInstanceOf(AssertionError.class)
.hasMessageContaining("failure").hasMessageContaining("item")
.hasMessageContaining(TestException.class.getName());
.hasMessageContaining("failure").hasMessageContaining("item")
.hasMessageContaining(TestException.class.getName());

assertThatThrownBy(() -> Multi.createFrom().failure(new TestException())
.subscribe().withSubscriber(AssertSubscriber.create(1))
.awaitNextItems(2, 1, Duration.ofSeconds(1))).isInstanceOf(AssertionError.class)
.hasMessageContaining("failure").hasMessageContaining("item")
.hasMessageContaining(TestException.class.getName());
.hasMessageContaining("failure").hasMessageContaining("item")
.hasMessageContaining(TestException.class.getName());

// Completion instead of item
assertThatThrownBy(() -> Multi.createFrom().emitter(e -> e.emit(1).complete())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.mutiny.operators;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -48,6 +49,24 @@ public void testIntervalOfAFewMillis() {
}
}

@Test
public void testThatTicksStartAfterRequest() {
AssertSubscriber<Long> subscriber = AssertSubscriber.create();

Multi.createFrom().ticks()
.every(Duration.ofMillis(100))
.onItem().transform(l -> System.currentTimeMillis())
.subscribe().withSubscriber(subscriber);

await().pollDelay(Duration.ofMillis(500)).untilAsserted(subscriber::assertNotTerminated);

subscriber.request(100);
subscriber.awaitNextItems(10)
.cancel();

subscriber.assertNotTerminated();
}

@Test
public void testWithInfraExecutorAndNoDelay() throws InterruptedException {
AssertSubscriber<Long> subscriber = AssertSubscriber.create(Long.MAX_VALUE);
Expand Down