Skip to content

Commit

Permalink
Implement throttle for Source(Flow)WithContext (#29107)
Browse files Browse the repository at this point in the history
  • Loading branch information
xoyo24 authored Jun 8, 2020
1 parent 34b9a26 commit 6328e0a
Show file tree
Hide file tree
Showing 6 changed files with 449 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.javadsl;

import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.StreamTest;
import akka.stream.ThrottleMode;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class FlowWithContextThrottleTest extends StreamTest {

public FlowWithContextThrottleTest() {
super(actorSystemResource);
}

@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ThrottleTest", AkkaSpec.testConf());

@Test
public void mustWorksForTwoStreams() throws Exception {
final FlowWithContext<Integer, String, Integer, String, NotUsed> sharedThrottle =
FlowWithContext.<Integer, String>create()
.throttle(1, java.time.Duration.ofDays(1), 1, (a) -> 1, ThrottleMode.enforcing());

CompletionStage<List<Pair<Integer, String>>> result1 =
Source.single(new Pair<>(1, "context-a"))
.via(sharedThrottle.asFlow())
.via(sharedThrottle.asFlow())
.runWith(Sink.seq(), system);

// If there is accidental shared state then we would not be able to pass through the single
// element
List<Pair<Integer, String>> pairs1 = result1.toCompletableFuture().get(3, TimeUnit.SECONDS);

assertEquals(1, pairs1.size());
assertEquals(Integer.valueOf(1), pairs1.get(0).first());
assertEquals("context-a", pairs1.get(0).second());

// It works with a new stream, too
CompletionStage<List<Pair<Integer, String>>> result2 =
Source.single(new Pair<>(2, "context-b"))
.via(sharedThrottle.asFlow())
.via(sharedThrottle.asFlow())
.runWith(Sink.seq(), system);

List<Pair<Integer, String>> pairs2 = result2.toCompletableFuture().get(3, TimeUnit.SECONDS);

assertEquals(1, pairs2.size());
assertEquals(Integer.valueOf(2), pairs2.get(0).first());
assertEquals("context-b", pairs2.get(0).second());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.javadsl;

import akka.japi.Pair;
import akka.stream.StreamTest;
import akka.stream.ThrottleMode;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import org.junit.ClassRule;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class SourceWithContextThrottleTest extends StreamTest {

public SourceWithContextThrottleTest() {
super(actorSystemResource);
}

@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ThrottleTest", AkkaSpec.testConf());

@Test
public void mustBeAbleToUseThrottle() throws Exception {
List<Pair<Integer, String>> list =
Arrays.asList(
new Pair<>(0, "context-a"), new Pair<>(1, "context-b"), new Pair<>(2, "context-c"));
Pair<Integer, String> result =
SourceWithContext.fromPairs(Source.from(list))
.throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.shaping())
.throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.enforcing())
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);

assertEquals(list.get(0), result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.scaladsl

import akka.stream.ThrottleMode.Shaping
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink

import scala.concurrent.duration._

class FlowWithContextThrottleSpec extends StreamSpec("""
akka.stream.materializer.initial-input-buffer-size = 2
akka.stream.materializer.max-input-buffer-size = 2
""") {

private def toMessage(i: Int) = Message(s"data-$i", i.toLong)

private def genMessage(length: Int, i: Int) = Message("a" * length, i.toLong)

"throttle() on FlowWithContextOps" must {
"on FlowWithContext" must {
"work for the happy case" in assertAllStagesStopped {
val throttle = FlowWithContext[Message, Long].throttle(19, 1000.millis, -1, Shaping)
val input = (1 to 5).map(toMessage)
val expected = input.map(message => (message, message.offset))

Source(input)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNextN(expected)
.expectComplete()
}

"accept very high rates" in assertAllStagesStopped {
val throttle = FlowWithContext[Message, Long].throttle(1, 1.nanos, 0, Shaping)
val input = (1 to 5).map(toMessage)
val expected = input.map(message => (message, message.offset))

Source(input)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNextN(expected)
.expectComplete()
}

"accept very low rates" in assertAllStagesStopped {
val throttle = FlowWithContext[Message, Long].throttle(1, 100.days, 1, Shaping)
val input = (1 to 5).map(toMessage)
val expected = (input.head, input.head.offset)

Source(input)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNext(expected)
.expectNoMessage(100.millis)
.cancel() // We won't wait 100 days, sorry
}

"emit single element per tick" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Message]()
val downstream = TestSubscriber.probe[(Message, Long)]()
val throttle = FlowWithContext[Message, Long].throttle(1, 300.millis, 0, Shaping)

Source
.fromPublisher(upstream)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.runWith(Sink.fromSubscriber(downstream))

downstream.request(20)
upstream.sendNext(Message("a", 1L))
downstream.expectNoMessage(150.millis)
downstream.expectNext((Message("a", 1L), 1L))

upstream.sendNext(Message("b", 2L))
downstream.expectNoMessage(150.millis)
downstream.expectNext((Message("b", 2L), 2L))

upstream.sendComplete()
downstream.expectComplete()
}

"emit elements according to cost" in assertAllStagesStopped {
val list = (1 to 4).map(i => genMessage(i * 2, i))
val throttle = FlowWithContext[Message, Long].throttle(2, 200.millis, 0, _.data.length, Shaping)

Source(list)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.map(_._1)
.runWith(TestSink.probe[Message])
.request(4)
.expectNext(list(0))
.expectNoMessage(300.millis)
.expectNext(list(1))
.expectNoMessage(500.millis)
.expectNext(list(2))
.expectNoMessage(700.millis)
.expectNext(list(3))
.expectComplete()
}
}

"on SourceWithContext" must {
"work for the happy case" in assertAllStagesStopped {
val input = (1 to 5).map(toMessage)
val expected = input.map(message => (message, message.offset))

Source(input)
.asSourceWithContext(m => m.offset)
.throttle(19, 1000.millis, -1, Shaping)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNextN(expected)
.expectComplete()
}

"accept very high rates" in assertAllStagesStopped {
val input = (1 to 5).map(toMessage)
val expected = input.map(message => (message, message.offset))

Source(input)
.asSourceWithContext(m => m.offset)
.throttle(1, 1.nanos, 0, Shaping)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNextN(expected)
.expectComplete()
}

"accept very low rates" in assertAllStagesStopped {
val input = (1 to 5).map(toMessage)
val expected = (input.head, input.head.offset)

Source(input)
.asSourceWithContext(m => m.offset)
.throttle(1, 100.days, 1, Shaping)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNext(expected)
.expectNoMessage(100.millis)
.cancel() // We won't wait 100 days, sorry
}

"emit single element per tick" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Message]()
val downstream = TestSubscriber.probe[(Message, Long)]()

Source
.fromPublisher(upstream)
.asSourceWithContext(m => m.offset)
.throttle(1, 300.millis, 0, Shaping)
.asSource
.runWith(Sink.fromSubscriber(downstream))

downstream.request(20)
upstream.sendNext(Message("a", 1L))
downstream.expectNoMessage(150.millis)
downstream.expectNext((Message("a", 1L), 1L))

upstream.sendNext(Message("b", 2L))
downstream.expectNoMessage(150.millis)
downstream.expectNext((Message("b", 2L), 2L))

upstream.sendComplete()
downstream.expectComplete()
}

"emit elements according to cost" in assertAllStagesStopped {
val list = (1 to 4).map(i => genMessage(i * 2, i))

Source(list)
.asSourceWithContext(m => m.offset)
.throttle(2, 200.millis, 0, _.data.length, Shaping)
.asSource
.map(_._1)
.runWith(TestSink.probe[Message])
.request(4)
.expectNext(list(0))
.expectNoMessage(300.millis)
.expectNext(list(1))
.expectNoMessage(500.millis)
.expectNext(list(2))
.expectNoMessage(700.millis)
.expectNext(list(3))
.expectComplete()
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import akka.japi.{ function, Pair, Util }
import akka.stream._
import akka.util.ConstantFun
import akka.util.ccompat.JavaConverters._
import akka.util.JavaDurationConverters._

object FlowWithContext {

Expand Down Expand Up @@ -286,6 +287,50 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
marker: function.Function2[Out, CtxOut, LogMarker]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null)

/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]].
*
* @see [[akka.stream.javadsl.Flow.throttle]]
*/
def throttle(elements: Int, per: java.time.Duration): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.throttle(elements, per.asScala))

/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]].
*
* @see [[akka.stream.javadsl.Flow.throttle]]
*/
def throttle(
elements: Int,
per: java.time.Duration,
maximumBurst: Int,
mode: ThrottleMode): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.throttle(elements, per.asScala, maximumBurst, mode))

/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]].
*
* @see [[akka.stream.javadsl.Flow.throttle]]
*/
def throttle(
cost: Int,
per: java.time.Duration,
costCalculation: function.Function[Out, Integer]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.throttle(cost, per.asScala, costCalculation.apply))

/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]].
*
* @see [[akka.stream.javadsl.Flow.throttle]]
*/
def throttle(
cost: Int,
per: java.time.Duration,
maximumBurst: Int,
costCalculation: function.Function[Out, Integer],
mode: ThrottleMode): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode))

def asScala: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
scaladsl.FlowWithContext.fromTuples(
scaladsl
Expand Down
Loading

0 comments on commit 6328e0a

Please sign in to comment.