WIP: Fixes #21 - Dynamic concurrency control #22

Closed · wants to merge 3 commits
49 changes: 47 additions & 2 deletions README.adoc
@@ -141,6 +141,7 @@ without operational burden or harming the cluster's performance
** Efficient individual message acknowledgement system (without local or third system state) to massively reduce message replay upon failure
* Per `key` concurrent processing, per partition and unordered message processing
* Offsets committed correctly, in order, of only processed messages, regardless of concurrency level or retries
* <<auto-scaling,Automatic concurrency scaling>> (self-scaling thread pool)
* Vert.x non-blocking library integration (HTTP currently)
* Fair partition traversal
* Zero~ dependencies (`Slf4j` and `Lombok`) for the core module
@@ -345,8 +346,9 @@ NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disabled

After this setup, one then has the choice of interfaces:

* `ParallelStreamProcessor`
* `VertxParallelStreamProcessor`
* `ParallelStreamProcessor` - <<core>>
* `AutoScalingProcessor` - <<auto-scaling>>
* `VertxParallelStreamProcessor` - <<http-with-vertx>>
* `JStreamParallelStreamProcessor`
* `JStreamVertxParallelStreamProcessor`

@@ -368,6 +370,7 @@ See {issues_link}/12[issue #12], and the `ParallelConsumer` JavaDoc:
*/
----

[[core]]
=== Core

==== Simple Message Process
@@ -410,6 +413,48 @@ You have the option to either use callbacks to be notified of events, or use the

In future versions, we plan to look at supporting other streaming systems like https://github.com/ReactiveX/RxJava[RxJava] via modules.

[[auto-scaling]]
=== Automatic Concurrency Scaling (self-configuring)

`AutoScalingProcessor`

This module removes the need for the user to choose a level of concurrency for either the core engine or the Vert.x engine.
Selecting the correct concurrency level for a system can be extremely complicated, especially as the system evolves or the systems it integrates with change.
A self-tuning implementation removes this burden: the system continuously tunes itself to adapt to the conditions it is running in.

Netflix's https://medium.com/@NetflixTechBlog/performance-under-load-3e6fa9a60581[blog post] on their project has a great explanation of the concepts.
To summarise: the system executes jobs on a special thread pool, which measures the time the jobs take to run.
The thread pool is then gradually expanded until the measured execution time starts to degrade, or jobs actually fail, at which point the pool is contracted and measurement continues.
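
A sketch of how these pieces wire together follows; it mirrors `AutoScalingProcessor#constructExecutor` from this PR, and `process(record)` is an illustrative placeholder.

.Adaptive executor wiring (sketch)
[source,java,indent=0]
----
// a minimal sketch of the adaptive pool, mirroring AutoScalingProcessor#constructExecutor
SimpleLimiter<Void> limiter = SimpleLimiter.newBuilder()
        .limit(Gradient2Limit.newDefault()) // latency-gradient based limit
        .build();
Executor pool = BlockingAdaptiveExecutor.newBuilder()
        .limiter(limiter) // submissions block once the current limit is reached
        .build();

// jobs are timed as they run: the limit grows while latency holds steady,
// and shrinks when latency degrades or jobs fail
pool.execute(() -> process(record)); // process(record) is an illustrative placeholder
----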

WARNING: This approach may cause more duplicates in integrated systems when jobs time out. If minimising failures or duplication is a concern, either stick to <<core>> or use a https://github.com/Netflix/concurrency-limits/tree/18692b09e55a0574bea94d92e95a03c3e89012d2/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit[limit] that has a https://github.com/Netflix/concurrency-limits/blob/18692b09e55a0574bea94d92e95a03c3e89012d2/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java#L114[maximum setting] available.
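
For illustration, a bounded limit might be configured as below. This is a sketch only; the `maxConcurrency` builder option is assumed from the linked `Gradient2Limit` source and should be verified against it. Note that `AutoScalingProcessor` currently hard-codes `Gradient2Limit.newDefault()`, so using a custom limit would mean overriding `constructExecutor`.

.Bounded limit (sketch)
[source,java,indent=0]
----
// maxConcurrency is assumed from the linked Gradient2Limit source
Limit bounded = Gradient2Limit.newBuilder()
        .maxConcurrency(64) // hard ceiling, regardless of measured latency
        .build();
SimpleLimiter<Void> limiter = SimpleLimiter.newBuilder().limit(bounded).build();
----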

This uses concepts from https://en.wikipedia.org/wiki/TCP_congestion_control#Congestion_window[TCP congestion control] to set the thread pool size by measuring the performance of execution.
This was originally planned to be built almost from scratch using concepts from https://en.wikipedia.org/wiki/Control_theory#PID_feedback_control[Engineering Control Theory], but Netflix has already developed a library for exactly this with its https://github.com/Netflix/concurrency-limits[concurrency-limits library], which uses very similar theory from http://intronetworks.cs.luc.edu/1/html/newtcps.html#tcp-vegas[TCP congestion] http://pages.cs.wisc.edu/~akella/CS740/F08/740-Papers/BOP94.pdf[control theory].

==== Dependency

This functionality is in a separate module, in order to keep to the policy that `core` should, within reason, have no dependencies.

.Dependency ArtifactId
[source,xml,indent=0]
<artifactId>parallel-consumer-core-auto-scale</artifactId>
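
The full coordinates would look like the following; the `groupId` and `version` here are taken from this PR's parent pom.

.Maven dependency (sketch)
[source,xml,indent=0]
----
<dependency>
    <groupId>io.confluent.parallelconsumer</groupId>
    <artifactId>parallel-consumer-core-auto-scale</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>
----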

==== Usage

Simply construct the `AutoScalingProcessor`. The changes are all under the hood; the system otherwise operates as normal.

.Auto Scaling Module
[source,java,indent=0]
parallelConsumer = new AutoScalingProcessor<>(consumerSpy, producerSpy, parallelConsumerOptions);
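
An end-to-end sketch follows, assuming the same `subscribe` and `poll` interface as the core processor and a Lombok-style builder on the options class; the consumer, producer, and topic name are illustrative.

.Auto scaling end to end (sketch)
[source,java,indent=0]
----
var options = ParallelConsumerOptions.builder().build(); // no concurrency level required
var parallelConsumer = new AutoScalingProcessor<String, String>(kafkaConsumer, kafkaProducer, options);
parallelConsumer.subscribe(List.of("my-input-topic")); // illustrative topic
// the self-tuning pool decides how many records are processed concurrently
parallelConsumer.poll(record -> log.info("Concurrently processing: {}", record));
----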

If you wish to see the `concurrency-limits` engine doing its work, turn on logging:

.Limit logging
[source,xml,indent=0]
<logger name="com.netflix.concurrency.limits.limit" level="debug"/>

[[http-with-vertx]]
=== HTTP with the Vert.x Module

51 changes: 51 additions & 0 deletions parallel-consumer-core-auto-scale/pom.xml
@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright (C) 2020 Confluent, Inc.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.confluent.parallelconsumer</groupId>
<artifactId>parallel-consumer-parent</artifactId>
<version>0.1-SNAPSHOT</version>
</parent>

<!-- tag::artifact[] -->
<artifactId>parallel-consumer-core-auto-scale</artifactId>
<!-- end::artifact[] -->

<modelVersion>4.0.0</modelVersion>

<dependencies>
<dependency>
<groupId>io.confluent.parallelconsumer</groupId>
<artifactId>parallel-consumer-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent.parallelconsumer</groupId>
<artifactId>parallel-consumer-core</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!-- External-->
<dependency>
<groupId>com.netflix.concurrency-limits</groupId>
<artifactId>concurrency-limits-core</artifactId>
<version>0.3.6</version>
</dependency>

<!-- Testing -->
</dependencies>

</project>
@@ -0,0 +1,43 @@
package io.confluent.parallelconsumer;

import com.netflix.concurrency.limits.executors.BlockingAdaptiveExecutor;
import com.netflix.concurrency.limits.limit.Gradient2Limit;
import com.netflix.concurrency.limits.limiter.SimpleLimiter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

@Slf4j
public class AutoScalingProcessor<K, V> extends ParallelEoSStreamProcessor<K, V> {

    private SimpleLimiter<Void> executionLimiter;
    private BlockingAdaptiveExecutor congestionControlledExecutor;

    public AutoScalingProcessor(org.apache.kafka.clients.consumer.Consumer<K, V> consumer,
                                org.apache.kafka.clients.producer.Producer<K, V> producer,
                                ParallelConsumerOptions options) {
        super(consumer, producer, options);
    }

    /**
     * Could move this into the builder pattern to keep the final modifier on the fields
     */
    @Override
    protected void constructExecutor(final ParallelConsumerOptions options) {
        // the limiter measures job latency and adapts the concurrency limit accordingly
        executionLimiter = SimpleLimiter.newBuilder().limit(Gradient2Limit.newDefault()).build();
        super.workerPool = BlockingAdaptiveExecutor.newBuilder().limiter(executionLimiter).build();
    }

    @Override
    protected <R> void getWorkAndRegister(final Function<ConsumerRecord<K, V>, List<R>> userFunction, final Consumer<R> callback) {
        // only take on as much work as the limiter currently has headroom for
        int capacity = executionLimiter.getLimit() - executionLimiter.getInflight();
        boolean spareCapacity = capacity > 0;
        if (spareCapacity) {
            var records = wm.<R>maybeGetWork(capacity);
            log.trace("Loop: Submit to pool");
            submitWorkToPool(userFunction, callback, records);
        }
    }
}
@@ -0,0 +1,25 @@
package io.confluent.parallelconsumer.vertx;

/*-
* Copyright (C) 2020 Confluent, Inc.
*/

import io.confluent.parallelconsumer.AutoScalingProcessor;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTest;

/**
* Ensure all plain operations still work with the extended auto-scaling consumer
*/
public class RegressionTest extends ParallelEoSStreamProcessorTest {

@Override
protected ParallelEoSStreamProcessor initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) {
// tag::construct[]
parallelConsumer = new AutoScalingProcessor<>(consumerSpy, producerSpy, parallelConsumerOptions);
// end::construct[]
return parallelConsumer;
}

}
5 changes: 5 additions & 0 deletions parallel-consumer-core/pom.xml
@@ -27,6 +27,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.concurrency-limits</groupId>
<artifactId>concurrency-limits-core</artifactId>
<version>0.3.6</version>
</dependency>

<!-- Testing -->
<dependency>
@@ -4,6 +4,10 @@
* Copyright (C) 2020 Confluent, Inc.
*/

import com.netflix.concurrency.limits.Limit;
import com.netflix.concurrency.limits.executors.BlockingAdaptiveExecutor;
import com.netflix.concurrency.limits.limit.Gradient2Limit;
import com.netflix.concurrency.limits.limiter.SimpleLimiter;
import io.confluent.csid.utils.WallClock;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
@@ -62,7 +66,7 @@ public class ParallelEoSStreamProcessor<K, V> implements ParallelStreamProcessor
/**
* The pool which is used for running the user's supplied function
*/
private final ExecutorService workerPool;
protected Executor workerPool;

private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();

@@ -138,7 +142,7 @@ public ParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.Consumer<K,
this.producer = producer;
this.consumer = consumer;

workerPool = Executors.newFixedThreadPool(options.getNumberOfThreads());
constructExecutor(options);

//
this.wm = new WorkManager<>(options, consumer);
@@ -156,6 +160,10 @@ public ParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.Consumer<K,
}
}

/**
 * Default executor: a fixed size pool, sized by the configured number of threads.
 * Subclasses can override this to install an adaptive executor instead.
 */
protected void constructExecutor(final ParallelConsumerOptions options) {
workerPool = Executors.newFixedThreadPool(options.getNumberOfThreads());
}

private void checkNotSubscribed(org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
if (consumer instanceof MockConsumer)
// disabled for unit tests which don't test rebalancing
@@ -325,7 +333,7 @@ public void close(Duration timeout, DrainingMode drainMode) {
} else {
log.info("Signaling to close...");

switch (drainMode){
switch (drainMode) {
case DRAIN:
log.info("Will wait for all in flight to complete before");
transitionToDraining();
@@ -378,29 +386,29 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionException {
}
producer.close(timeout);

log.debug("Shutting down execution pool...");
List<Runnable> unfinished = workerPool.shutdownNow();
if (!unfinished.isEmpty()) {
log.warn("Threads not done: {}", unfinished);
}

log.trace("Awaiting worker pool termination...");
boolean interrupted = true;
while (interrupted) {
log.warn("Still interrupted");
try {
boolean terminationFinishedWithoutTimeout = workerPool.awaitTermination(toSeconds(DrainingCloseable.DEFAULT_TIMEOUT), SECONDS);
interrupted = false;
if (!terminationFinishedWithoutTimeout) {
log.warn("workerPool await timeout!");
boolean shutdown = workerPool.isShutdown();
boolean terminated = workerPool.isTerminated();
}
} catch (InterruptedException e) {
log.error("InterruptedException", e);
interrupted = true;
}
}
// log.debug("Shutting down execution pool...");
// List<Runnable> unfinished = workerPool.shutdownNow();
// if (!unfinished.isEmpty()) {
// log.warn("Threads not done: {}", unfinished);
// }

// log.trace("Awaiting worker pool termination...");
// boolean interrupted = true;
// while (interrupted) {
// log.warn("Still interrupted");
// try {
// boolean terminationFinishedWithoutTimeout = workerPool.awaitTermination(toSeconds(DrainingCloseable.DEFAULT_TIMEOUT), SECONDS);
// interrupted = false;
// if (!terminationFinishedWithoutTimeout) {
// log.warn("workerPool await timeout!");
// boolean shutdown = workerPool.isShutdown();
// boolean terminated = workerPool.isTerminated();
// }
// } catch (InterruptedException e) {
// log.error("InterruptedException", e);
// interrupted = true;
// }
// }

log.debug("Close complete.");
this.state = closed;
@@ -506,10 +514,7 @@ private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> userFunction,
Consumer<R> callback) throws TimeoutException, ExecutionException {
if (state == running) {
log.trace("Loop: Get work");
var records = wm.<R>maybeGetWork();

log.trace("Loop: Submit to pool");
submitWorkToPool(userFunction, callback, records);
getWorkAndRegister(userFunction, callback);
}

log.trace("Loop: Process mailbox");
@@ -532,6 +537,12 @@ private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> userFunction,
log.trace("End of control loop, {} remaining in work manager. In state: {}", wm.getPartitionWorkRemainingCount(), state);
}

/**
 * Default work acquisition: take whatever work is available and submit it all to the pool.
 * Subclasses (e.g. AutoScalingProcessor) override this to throttle by measured spare capacity.
 */
protected <R> void getWorkAndRegister(final Function<ConsumerRecord<K, V>, List<R>> userFunction, final Consumer<R> callback) {
var records = wm.<R>maybeGetWork();
log.trace("Loop: Submit to pool");
submitWorkToPool(userFunction, callback, records);
}

private void drain() {
log.debug("Signaling to drain...");
brokerPollSubsystem.drain();
@@ -734,26 +745,27 @@ protected void onSuccess(WorkContainer<K, V> wc) {
*
* @param workToProcess the polled records to process
*/
private <R> void submitWorkToPool(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
Consumer<R> callback,
List<WorkContainer<K, V>> workToProcess) {
protected <R> void submitWorkToPool(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
Consumer<R> callback,
List<WorkContainer<K, V>> workToProcess) {

for (var work : workToProcess) {
// for each record, dispatch the user's function to the executor
log.trace("Sending work ({}) to pool", work);
Future outputRecordFuture = workerPool.submit(() -> {
return userFunctionRunner(usersFunction, callback, work);
});
work.setFuture(outputRecordFuture);
Runnable job = () -> {
runUserFunction(usersFunction, callback, work);
};

workerPool.execute(job);
}
}

/**
* Run the supplied function.
*/
protected <R> List<Tuple<ConsumerRecord<K, V>, R>> userFunctionRunner(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
Consumer<R> callback,
WorkContainer<K, V> wc) {
protected <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
Consumer<R> callback,
WorkContainer<K, V> wc) {
// call the user's function
List<R> resultsFromUserFunction;
try {
@@ -45,8 +45,8 @@ public class WorkContainer<K, V> implements Comparable<WorkContainer> {
@Getter
private static Duration retryDelay = Duration.ofSeconds(10);

@Getter
@Setter(AccessLevel.PACKAGE)
// @Getter
// @Setter(AccessLevel.PACKAGE)
private Future<List<Object>> future;

public WorkContainer(ConsumerRecord<K, V> cr) {
@@ -51,6 +51,7 @@
<!-- external -->
<!-- <logger name="io.vertx" level="debug"/>-->
<logger name="io.netty" level="info"/>
<logger name="com.netflix" level="debug"/>

<!-- kafka internal -->
<!-- <logger name="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator" level="debug"/>-->