Skip to content

Commit

Permalink
Delay the start of the "ticks" after the first request.
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Mar 25, 2021
1 parent 23a3c74 commit 7844dea
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public AssertSubscriber<T> await() {
* Awaits for the next item.
* If no item have been received before the default timeout, an {@link AssertionError} is thrown.
* <p>
Note that it requests one item from the upstream.
* Note that it requests one item from the upstream.
*
* @return this {@link AssertSubscriber}
* @see #awaitNextItems(int, int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ public IntervalMulti(
@Override
public void subscribe(MultiSubscriber<? super Long> actual) {
IntervalRunnable runnable = new IntervalRunnable(actual, period, initialDelay, executor);

actual.onSubscribe(runnable);
runnable.start();

// Only start the ticks when we get the first request.
}

static final class IntervalRunnable implements Runnable, Subscription {
Expand All @@ -54,6 +52,7 @@ static final class IntervalRunnable implements Runnable, Subscription {
private final Duration initialDelay;
private final ScheduledExecutorService executor;
private volatile boolean cancelled;
private volatile boolean once = true;

private final AtomicLong count = new AtomicLong();
private ScheduledFuture<?> future;
Expand Down Expand Up @@ -105,6 +104,10 @@ public void request(long n) {
if (n > 0) {
Subscriptions.add(requested, n);
}
if (once) {
start();
once = false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.mutiny.operators;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -48,6 +49,24 @@ public void testIntervalOfAFewMillis() {
}
}

@Test
public void testThatTicksStartAfterRequest() {
AssertSubscriber<Long> subscriber = AssertSubscriber.create();

Multi.createFrom().ticks()
.every(Duration.ofMillis(100))
.onItem().transform(l -> System.currentTimeMillis())
.subscribe().withSubscriber(subscriber);

await().pollDelay(Duration.ofMillis(500)).untilAsserted(subscriber::assertNotTerminated);

subscriber.request(100);
subscriber.awaitNextItems(10)
.cancel();

subscriber.assertNotTerminated();
}

@Test
public void testWithInfraExecutorAndNoDelay() throws InterruptedException {
AssertSubscriber<Long> subscriber = AssertSubscriber.create(Long.MAX_VALUE);
Expand Down

0 comments on commit 7844dea

Please sign in to comment.