diff --git a/README.adoc b/README.adoc
index b88887508..d8930156f 100644
--- a/README.adoc
+++ b/README.adoc
@@ -141,6 +141,7 @@ without operational burden or harming the clusters performance
 ** Efficient individual message acknowledgement system (without local or third system state) to massively reduce message replay upon failure
 * Per `key` concurrent processing, per partition and unordered message processing
 * Offsets committed correctly, in order, of only processed messages, regardless of concurrency level or retries
+* <<auto-scaling,Automatic concurrency scaling>> (self scaling thread pool)
 * Vert.x non-blocking library integration (HTTP currently)
 * Fair partition traversal
 * Zero~ dependencies (`Slf4j` and `Lombok`) for the core module
@@ -345,8 +346,9 @@ NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disa
 
 After this setup, one then has the choice of interfaces:
 
-* `ParallelStreamProcessor`
-* `VertxParallelStreamProcessor`
+* `ParallelStreamProcessor` - <<core>>
+* `AutoScalingProcessor` - <<auto-scaling>>
+* `VertxParallelStreamProcessor` - <<http-with-vertx>>
 * `JStreamParallelStreamProcessor`
 * `JStreamVertxParallelStreamProcessor`
@@ -368,6 +370,7 @@ See {issues_link}/12[issue #12], and the `ParallelConsumer` JavaDoc:
 */
 ----
 
+[[core]]
 === Core
 
 ==== Simple Message Process
@@ -410,6 +413,48 @@ You have the option to either use callbacks to be notified of events, or use the
 
 In future versions, we plan to look at supporting other streaming systems like https://github.com/ReactiveX/RxJava[RxJava] via modules.
 
+[[auto-scaling]]
+=== Automatic Concurrency Scaling (self configuring)
+
+`AutoScalingProcessor`
+
+This processor removes the need to choose a concurrency level for either the core engine or the Vert.x engine.
+Selecting the correct concurrency level for a system can be extremely complicated, especially as the system evolves or as the systems it integrates with change.
+A self-tuning system removes this burden: it continuously adjusts itself to the conditions it is running in.
+
+Netflix's https://medium.com/@NetflixTechBlog/performance-under-load-3e6fa9a60581[blog post] on their project has a great explanation of the concepts.
+To summarise, the system executes jobs on a special thread pool which measures how long they take to run.
+The thread pool is gradually expanded until the measured execution time starts to degrade, or jobs actually fail, at which point the pool is contracted and measuring continues.
+
+WARNING: This approach may cause more duplicates in integrated systems when jobs time out.
+If minimising failure or duplication is a concern, either stick to the <<core>> engine or use a https://github.com/Netflix/concurrency-limits/tree/18692b09e55a0574bea94d92e95a03c3e89012d2/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit[limit] that has a https://github.com/Netflix/concurrency-limits/blob/18692b09e55a0574bea94d92e95a03c3e89012d2/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java#L114[maximum setting] available.
+
+This uses concepts from https://en.wikipedia.org/wiki/TCP_congestion_control#Congestion_window[TCP congestion control] to set the thread pool size by measuring the performance of execution.
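+
+A minimal sketch of that pattern, wired with the same `concurrency-limits` calls the module uses (`handleRecord` and `record` are hypothetical stand-ins for the user's job):
+
+[source,java,indent=0]
+----
+// a limiter whose concurrency window grows and shrinks with observed job latency
+SimpleLimiter<Void> limiter = SimpleLimiter.newBuilder()
+        .limit(Gradient2Limit.newDefault())
+        .build();
+
+// an Executor that blocks new submissions once the current limit is reached
+Executor pool = BlockingAdaptiveExecutor.newBuilder()
+        .limiter(limiter)
+        .build();
+
+pool.execute(() -> handleRecord(record));
+----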
+
+This was originally planned to be built nearly from scratch, using concepts from https://en.wikipedia.org/wiki/Control_theory#PID_feedback_control[Engineering Control Theory], but Netflix has developed a library for exactly this purpose with its https://github.com/Netflix/concurrency-limits[concurrency-limits library], which uses very similar theory from http://intronetworks.cs.luc.edu/1/html/newtcps.html#tcp-vegas[TCP congestion] http://pages.cs.wisc.edu/~akella/CS740/F08/740-Papers/BOP94.pdf[control theory].
+
+==== Dependency
+
+This functionality is in a separate module, in keeping with the policy that, within reason, the `core` module should have no dependencies.
+
+.Dependency ArtifactId
+[source,xml,indent=0]
+----
+<artifactId>parallel-consumer-core-auto-scale</artifactId>
+----
+
+==== Usage
+
+Simply construct the `AutoScalingProcessor`.
+The changes are all under the hood; the system otherwise operates as normal.
+
+.Auto Scaling Module
+[source,java,indent=0]
+----
+parallelConsumer = new AutoScalingProcessor<>(consumerSpy, producerSpy, parallelConsumerOptions);
+----
+
+If you wish to see the `concurrency-limits` engine doing its work, turn on logging:
+
+.Limit logging
+[source,xml,indent=0]
+----
+<logger name="com.netflix.concurrency.limits" level="debug"/>
+----
+
 [[http-with-vertx]]
 === HTTP with the Vert.x Module
diff --git a/parallel-consumer-core-auto-scale/pom.xml b/parallel-consumer-core-auto-scale/pom.xml
new file mode 100644
index 000000000..5dc591d02
--- /dev/null
+++ b/parallel-consumer-core-auto-scale/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>io.confluent.parallelconsumer</groupId>
+        <artifactId>parallel-consumer-parent</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <!-- tag::artifact[] -->
+    <artifactId>parallel-consumer-core-auto-scale</artifactId>
+    <!-- end::artifact[] -->
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.confluent.parallelconsumer</groupId>
+            <artifactId>parallel-consumer-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent.parallelconsumer</groupId>
+            <artifactId>parallel-consumer-core</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- the Netflix library providing the adaptive limit algorithms and executor -->
+        <dependency>
+            <groupId>com.netflix.concurrency-limits</groupId>
+            <artifactId>concurrency-limits-core</artifactId>
+            <version>0.3.6</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/parallel-consumer-core-auto-scale/src/main/java/io/confluent/parallelconsumer/AutoScalingProcessor.java b/parallel-consumer-core-auto-scale/src/main/java/io/confluent/parallelconsumer/AutoScalingProcessor.java
new file mode 100644
index 000000000..ae9e5dca9
--- /dev/null
+++ b/parallel-consumer-core-auto-scale/src/main/java/io/confluent/parallelconsumer/AutoScalingProcessor.java
@@ -0,0 +1,43 @@
+package io.confluent.parallelconsumer;
+
+import com.netflix.concurrency.limits.executors.BlockingAdaptiveExecutor;
+import com.netflix.concurrency.limits.limit.Gradient2Limit;
+import com.netflix.concurrency.limits.limiter.SimpleLimiter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+@Slf4j
+public class AutoScalingProcessor<K, V> extends ParallelEoSStreamProcessor<K, V> {
+
+    private SimpleLimiter<Void> executionLimiter;
+
+    public AutoScalingProcessor(org.apache.kafka.clients.consumer.Consumer<K, V> consumer,
+                                org.apache.kafka.clients.producer.Producer<K, V> producer,
+                                ParallelConsumerOptions options) {
+        super(consumer, producer, options);
+    }
+
+    /**
+     * Could move this into the builder pattern to keep the final modifier on the fields.
+     */
+    @Override
+    protected void constructExecutor(final ParallelConsumerOptions options) {
+        // Gradient2 derives the concurrency window from observed latency gradients
+        executionLimiter = SimpleLimiter.newBuilder().limit(Gradient2Limit.newDefault()).build();
+        super.workerPool = BlockingAdaptiveExecutor.newBuilder().limiter(executionLimiter).build();
+    }
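+
+    /**
+     * Only submits as much work as the limiter currently has spare capacity for,
+     * so the number of in-flight jobs tracks the adaptive limit.
+     */
+    @Override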
+    protected <R> void getWorkAndRegister(final Function<ConsumerRecord<K, V>, List<R>> userFunction, final Consumer<R> callback) {
+        // spare capacity = the current adaptive limit minus the jobs already in flight
+        int capacity = executionLimiter.getLimit() - executionLimiter.getInflight();
+        boolean spareCapacity = capacity > 0;
+        if (spareCapacity) {
+            var records = wm.maybeGetWork(capacity);
+            log.trace("Loop: Submit to pool");
+            submitWorkToPool(userFunction, callback, records);
+        }
+    }
+}
diff --git a/parallel-consumer-core-auto-scale/src/test/java/io/confluent/parallelconsumer/vertx/RegressionTest.java b/parallel-consumer-core-auto-scale/src/test/java/io/confluent/parallelconsumer/vertx/RegressionTest.java
new file mode 100644
index 000000000..0d25fc5d6
--- /dev/null
+++ b/parallel-consumer-core-auto-scale/src/test/java/io/confluent/parallelconsumer/vertx/RegressionTest.java
@@ -0,0 +1,25 @@
+package io.confluent.parallelconsumer.vertx;
+
+/*-
+ * Copyright (C) 2020 Confluent, Inc.
+ */
+
+import io.confluent.parallelconsumer.AutoScalingProcessor;
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
+import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTest;
+
+/**
+ * Ensure all plain operations still work with the extended auto-scaling consumer
+ */
+public class RegressionTest extends ParallelEoSStreamProcessorTest {
+
+    @Override
+    protected ParallelEoSStreamProcessor initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) {
+        // tag::construct[]
+        parallelConsumer = new AutoScalingProcessor<>(consumerSpy, producerSpy, parallelConsumerOptions);
+        // end::construct[]
+        return parallelConsumer;
+    }
+
+}
diff --git a/parallel-consumer-core/pom.xml b/parallel-consumer-core/pom.xml
index 052fac053..c9fbd9ff5 100644
--- a/parallel-consumer-core/pom.xml
+++ b/parallel-consumer-core/pom.xml
@@ -27,6 +27,11 @@
             <artifactId>kafka-clients</artifactId>
             <version>${kafka.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.netflix.concurrency-limits</groupId>
+            <artifactId>concurrency-limits-core</artifactId>
+            <version>0.3.6</version>
+        </dependency>
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
index 98ffa39e6..5eb96bb5c 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
@@ -4,6 +4,10 @@
  * Copyright (C) 2020 Confluent, Inc.
  */
 
+import com.netflix.concurrency.limits.Limit;
+import com.netflix.concurrency.limits.executors.BlockingAdaptiveExecutor;
+import com.netflix.concurrency.limits.limit.Gradient2Limit;
+import com.netflix.concurrency.limits.limiter.SimpleLimiter;
 import io.confluent.csid.utils.WallClock;
 import lombok.*;
 import lombok.extern.slf4j.Slf4j;
@@ -62,7 +66,7 @@ public class ParallelEoSStreamProcessor<K, V> implements ParallelStreamProcessor<K, V>
     /**
      * The pool which is used for running the users's supplied function
      */
-    private final ExecutorService workerPool;
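+    // relaxed from ExecutorService to Executor so that adaptive pools such as
+    // BlockingAdaptiveExecutor (which only implement Executor) can be substituted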
+    protected Executor workerPool;
 
     private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();
@@ -138,7 +142,7 @@ public ParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.Consumer
         wm = new WorkManager<>(options, consumer);
@@ -156,6 +160,10 @@ public ParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.Consumer consumer) {
         if (consumer instanceof MockConsumer)
             // disabled for unit tests which don't test rebalancing
@@ -325,7 +333,7 @@ public void close(Duration timeout, DrainingMode drainMode) {
         } else {
             log.info("Signaling to close...");
 
-            switch (drainMode){
+            switch (drainMode) {
                 case DRAIN:
                     log.info("Will wait for all in flight to complete before");
                     transitionToDraining();
@@ -378,29 +386,29 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
         }
         producer.close(timeout);
 
-        log.debug("Shutting down execution pool...");
-        List<Runnable> unfinished = workerPool.shutdownNow();
-        if (!unfinished.isEmpty()) {
-            log.warn("Threads not done: {}", unfinished);
-        }
-
-        log.trace("Awaiting worker pool termination...");
-        boolean interrupted = true;
-        while (interrupted) {
-            log.warn("Still interrupted");
-            try {
-                boolean terminationFinishedWithoutTimeout = workerPool.awaitTermination(toSeconds(DrainingCloseable.DEFAULT_TIMEOUT), SECONDS);
-                interrupted = false;
-                if (!terminationFinishedWithoutTimeout) {
-                    log.warn("workerPool await timeout!");
-                    boolean shutdown = workerPool.isShutdown();
-                    boolean terminated = workerPool.isTerminated();
-                }
-            } catch (InterruptedException e) {
-                log.error("InterruptedException", e);
-                interrupted = true;
-            }
-        }
+        // a plain Executor exposes no shutdown API, hence the pool shutdown sequence is disabled
+//        log.debug("Shutting down execution pool...");
+//        List<Runnable> unfinished = workerPool.shutdownNow();
+//        if (!unfinished.isEmpty()) {
+//            log.warn("Threads not done: {}", unfinished);
+//        }
+
+//        log.trace("Awaiting worker pool termination...");
+//        boolean interrupted = true;
+//        while (interrupted) {
+//            log.warn("Still interrupted");
+//            try {
+//                boolean terminationFinishedWithoutTimeout = workerPool.awaitTermination(toSeconds(DrainingCloseable.DEFAULT_TIMEOUT), SECONDS);
+//                interrupted = false;
+//                if (!terminationFinishedWithoutTimeout) {
+//                    log.warn("workerPool await timeout!");
+//                    boolean shutdown = workerPool.isShutdown();
+//                    boolean terminated = workerPool.isTerminated();
+//                }
+//            } catch (InterruptedException e) {
+//                log.error("InterruptedException", e);
+//                interrupted = true;
+//            }
+//        }
 
         log.debug("Close complete.");
         this.state = closed;
@@ -506,10 +514,7 @@ private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> userFunctio
                                  Consumer<R> callback) throws TimeoutException, ExecutionException {
         if (state == running) {
             log.trace("Loop: Get work");
-            var records = wm.maybeGetWork();
-
-            log.trace("Loop: Submit to pool");
-            submitWorkToPool(userFunction, callback, records);
+            getWorkAndRegister(userFunction, callback);
         }
 
         log.trace("Loop: Process mailbox");
@@ -532,6 +537,12 @@ private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> userFunctio
         log.trace("End of control loop, {} remaining in work manager. In state: {}", wm.getPartitionWorkRemainingCount(), state);
     }
 
+    protected <R> void getWorkAndRegister(final Function<ConsumerRecord<K, V>, List<R>> userFunction, final Consumer<R> callback) {
+        var records = wm.maybeGetWork();
+        log.trace("Loop: Submit to pool");
+        submitWorkToPool(userFunction, callback, records);
+    }
+
     private void drain() {
         log.debug("Signaling to drain...");
         brokerPollSubsystem.drain();
@@ -734,26 +745,27 @@ protected void onSuccess(WorkContainer<K, V> wc) {
      *
      * @param workToProcess the polled records to process
      */
-    private <R> void submitWorkToPool(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
-                                      Consumer<R> callback,
-                                      List<WorkContainer<K, V>> workToProcess) {
+    protected <R> void submitWorkToPool(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
+                                        Consumer<R> callback,
+                                        List<WorkContainer<K, V>> workToProcess) {
         for (var work : workToProcess) {
             // for each record, construct dispatch to the executor and capture a Future
             log.trace("Sending work ({}) to pool", work);
-            Future<List<Tuple<ConsumerRecord<K, V>, R>>> outputRecordFuture = workerPool.submit(() -> {
-                return userFunctionRunner(usersFunction, callback, work);
-            });
-            work.setFuture(outputRecordFuture);
+            Runnable job = () -> {
+                runUserFunction(usersFunction, callback, work);
+            };
+
+            workerPool.execute(job);
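+            // note: unlike ExecutorService.submit(), Executor.execute() returns no Future -
+            // completion is signalled from inside runUserFunction rather than tracked here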
         }
     }
 
     /**
      * Run the supplied function.
      */
-    protected <R> List<Tuple<ConsumerRecord<K, V>, R>> userFunctionRunner(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
-                                                                          Consumer<R> callback,
-                                                                          WorkContainer<K, V> wc) {
+    protected <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
+                                                                       Consumer<R> callback,
+                                                                       WorkContainer<K, V> wc) {
         // call the user's function
         List<R> resultsFromUserFunction;
         try {
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkContainer.java
index 5aa71f92b..38e4b3e86 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkContainer.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkContainer.java
@@ -45,8 +45,8 @@ public class WorkContainer<K, V> implements Comparable<WorkContainer> {
     @Getter
     private static Duration retryDelay = Duration.ofSeconds(10);
 
-    @Getter
-    @Setter(AccessLevel.PACKAGE)
+//    @Getter
+//    @Setter(AccessLevel.PACKAGE)
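+    // no longer populated - work is now dispatched to a plain Executor, which returns no Future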
     private Future<List<?>> future;
 
     public WorkContainer(ConsumerRecord<K, V> cr) {
diff --git a/parallel-consumer-core/src/test/resources/logback-test.xml b/parallel-consumer-core/src/test/resources/logback-test.xml
index 0c8e39099..e5389058f 100644
--- a/parallel-consumer-core/src/test/resources/logback-test.xml
+++ b/parallel-consumer-core/src/test/resources/logback-test.xml
@@ -51,6 +51,7 @@
 
+    <logger name="com.netflix.concurrency.limits" level="debug"/>
 
diff --git a/pom.xml b/pom.xml
index 0acd8d4d6..a7837ee58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
 
         <module>parallel-consumer-core</module>
+        <module>parallel-consumer-core-auto-scale</module>
         <module>parallel-consumer-vertx</module>
         <module>parallel-consumer-examples</module>
 
diff --git a/src/docs/README.adoc b/src/docs/README.adoc
index 1611410de..50336c89b 100644
--- a/src/docs/README.adoc
+++ b/src/docs/README.adoc
@@ -139,6 +139,7 @@ without operational burden or harming the clusters performance
 ** Efficient individual message acknowledgement system (without local or third system state) to massively reduce message replay upon failure
 * Per `key` concurrent processing, per partition and unordered message processing
 * Offsets committed correctly, in order, of only processed messages, regardless of concurrency level or retries
+* <<auto-scaling,Automatic concurrency scaling>> (self scaling thread pool)
 * Vert.x non-blocking library integration (HTTP currently)
 * Fair partition traversal
 * Zero~ dependencies (`Slf4j` and `Lombok`) for the core module
@@ -324,8 +325,9 @@ NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disa
 
 After this setup, one then has the choice of interfaces:
 
-* `ParallelStreamProcessor`
-* `VertxParallelStreamProcessor`
+* `ParallelStreamProcessor` - <<core>>
+* `AutoScalingProcessor` - <<auto-scaling>>
+* `VertxParallelStreamProcessor` - <<http-with-vertx>>
 * `JStreamParallelStreamProcessor`
 * `JStreamVertxParallelStreamProcessor`
@@ -337,6 +339,7 @@ See {issues_link}/12[issue #12], and the `ParallelConsumer` JavaDoc:
 include::{project_root}/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java[tag=javadoc]
 ----
 
+[[core]]
 === Core
 
 ==== Simple Message Process
@@ -368,6 +371,48 @@ You have the option to either use callbacks to be notified of events, or use the
 
 In future versions, we plan to look at supporting other streaming systems like https://github.com/ReactiveX/RxJava[RxJava] via modules.
 
+[[auto-scaling]]
+=== Automatic Concurrency Scaling (self configuring)
+
+`AutoScalingProcessor`
+
+This processor removes the need to choose a concurrency level for either the core engine or the Vert.x engine.
+Selecting the correct concurrency level for a system can be extremely complicated, especially as the system evolves or as the systems it integrates with change.
+A self-tuning system removes this burden: it continuously adjusts itself to the conditions it is running in.
+
+Netflix's https://medium.com/@NetflixTechBlog/performance-under-load-3e6fa9a60581[blog post] on their project has a great explanation of the concepts.
+To summarise, the system executes jobs on a special thread pool which measures how long they take to run.
+The thread pool is gradually expanded until the measured execution time starts to degrade, or jobs actually fail, at which point the pool is contracted and measuring continues.
+
+WARNING: This approach may cause more duplicates in integrated systems when jobs time out.
+If minimising failure or duplication is a concern, either stick to the <<core>> engine or use a https://github.com/Netflix/concurrency-limits/tree/18692b09e55a0574bea94d92e95a03c3e89012d2/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit[limit] that has a https://github.com/Netflix/concurrency-limits/blob/18692b09e55a0574bea94d92e95a03c3e89012d2/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java#L114[maximum setting] available.
+
+This uses concepts from https://en.wikipedia.org/wiki/TCP_congestion_control#Congestion_window[TCP congestion control] to set the thread pool size by measuring the performance of execution.
+This was originally planned to be built nearly from scratch, using concepts from https://en.wikipedia.org/wiki/Control_theory#PID_feedback_control[Engineering Control Theory], but Netflix has developed a library for exactly this purpose with its https://github.com/Netflix/concurrency-limits[concurrency-limits library], which uses very similar theory from http://intronetworks.cs.luc.edu/1/html/newtcps.html#tcp-vegas[TCP congestion] http://pages.cs.wisc.edu/~akella/CS740/F08/740-Papers/BOP94.pdf[control theory].
+
+==== Dependency
+
+This functionality is in a separate module, in keeping with the policy that, within reason, the `core` module should have no dependencies.
+
+.Dependency ArtifactId
+[source,xml,indent=0]
+----
+include::{project_root}/parallel-consumer-core-auto-scale/pom.xml[tag=artifact]
+----
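+
+For reference, the module's full Maven coordinates would look as follows (a sketch assuming the module inherits the parent's `groupId` and uses the current snapshot version from its `pom.xml`):
+
+[source,xml,indent=0]
+----
+<dependency>
+    <groupId>io.confluent.parallelconsumer</groupId>
+    <artifactId>parallel-consumer-core-auto-scale</artifactId>
+    <version>0.1-SNAPSHOT</version>
+</dependency>
+----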
+
+==== Usage
+
+Simply construct the `AutoScalingProcessor`.
+The changes are all under the hood; the system otherwise operates as normal.
+
+.Auto Scaling Module
+[source,java,indent=0]
+----
+include::{project_root}/parallel-consumer-core-auto-scale/src/test/java/io/confluent/parallelconsumer/vertx/RegressionTest.java[tag=construct]
+----
+
+If you wish to see the `concurrency-limits` engine doing its work, turn on logging:
+
+.Limit logging
+[source,xml,indent=0]
+----
+<logger name="com.netflix.concurrency.limits" level="debug"/>
+----
+
 [[http-with-vertx]]
 === HTTP with the Vert.x Module