Make max.pending.records property reloadable (#207)
* Make `max.pending.records` property reloadable

* Update `PropertyReloadRequestTest`

* Address comments

* Fix nit
KarboniteKream authored Jul 20, 2023
1 parent 732dbce commit 44108c5
Showing 10 changed files with 188 additions and 121 deletions.
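The gist of the change: `decaton.max.pending.records`, the per-partition cap on records in flight, can now be updated on a running `ProcessorSubscription`. A minimal sketch of how an application might use that, assuming Decaton's standard `SubscriptionBuilder` API and reusing the `HelloTask` protobuf from the tests below; the topic, group, and subscription names are placeholders, and import paths may need adjusting to your Decaton version:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import com.linecorp.decaton.processor.runtime.DynamicProperty;
    import com.linecorp.decaton.processor.runtime.ProcessorProperties;
    import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
    import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
    import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
    import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
    import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
    import com.linecorp.decaton.protocol.Sample.HelloTask;

    public class ReloadableMaxPendingExample {
        public static void main(String[] args) {
            // Keep a reference to the property; calling set() on it later is what
            // triggers the reload on the running subscription.
            DynamicProperty<Integer> maxPendingRecords =
                    new DynamicProperty<>(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS);
            maxPendingRecords.set(10_000);

            Properties consumerConfig = new Properties();
            consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

            ProcessorSubscription subscription =
                    SubscriptionBuilder.newBuilder("my-subscription")
                                       .consumerConfig(consumerConfig)
                                       .processorsBuilder(
                                               ProcessorsBuilder
                                                       .consuming("my-topic",
                                                                  new ProtocolBuffersDeserializer<>(HelloTask.parser()))
                                                       .thenProcess((context, task) -> {
                                                           // process the task
                                                       }))
                                       .addProperties(StaticPropertySupplier.of(maxPendingRecords))
                                       .buildAndStart();

            // Later, without restarting the subscription, e.g. from an admin endpoint:
            maxPendingRecords.set(1_000);
        }
    }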
@@ -167,7 +167,7 @@ public void testRegisterWithDefaultSettings() {

CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME);
verify(centralDogmaRepository).commit(
any(String.class),
anyString(),
eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode()))
);
}
@@ -212,13 +212,13 @@ public void testRegisterWithCustomizedSettings() {
when(filesRequest.list(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(Collections.emptyMap()));

final CommitRequest commitRequest = mock(CommitRequest.class);
when(centralDogmaRepository.commit(any(String.class), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest);
when(centralDogmaRepository.commit(anyString(), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest);
when(commitRequest.push(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(new PushResult(Revision.HEAD, whenCentralDogmaPushed)));

CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME, supplier);

verify(centralDogmaRepository).commit(
any(String.class),
anyString(),
eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties))
);
}
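A note on the matcher swap in the two hunks above: with Mockito 2+, `anyString()` and `any(String.class)` both match any non-null String, so replacing one with the other is a readability cleanup rather than a behavior change. Condensed, with `change` standing in for the `Change.ofJsonUpsert(...)` value:

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.ArgumentMatchers.eq;

    // Stub the commit, then verify it was called with any commit message
    // and exactly this change.
    when(centralDogmaRepository.commit(anyString(), eq(change))).thenReturn(commitRequest);
    verify(centralDogmaRepository).commit(anyString(), eq(change));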
@@ -76,14 +76,19 @@ public void testPropertyDynamicSwitch() throws Exception {
DynamicProperty<Integer> concurrencyProp =
new DynamicProperty<>(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY);
concurrencyProp.set(1);

DynamicProperty<Integer> recordsProp =
new DynamicProperty<>(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS);
recordsProp.set(10);

try (ProcessorSubscription subscription = TestUtils.subscription(
rule.bootstrapServers(),
builder -> builder.processorsBuilder(ProcessorsBuilder
.consuming(topicName,
new ProtocolBuffersDeserializer<>(
HelloTask.parser()))
.thenProcess(processor))
.addProperties(StaticPropertySupplier.of(concurrencyProp)));
.addProperties(StaticPropertySupplier.of(concurrencyProp, recordsProp)));
DecatonClient<HelloTask> client = TestUtils.client(topicName, rule.bootstrapServers())) {

int count = 0;
@@ -92,12 +97,19 @@
if (count == 1000) {
TimeUnit.SECONDS.sleep(1);
concurrencyProp.set(3);
} else if (count == 3000) {
TimeUnit.SECONDS.sleep(1);
recordsProp.set(20);
} else if (count == 5000) {
TimeUnit.SECONDS.sleep(1);
concurrencyProp.set(1);
recordsProp.set(5);
} else if (count == 7500) {
TimeUnit.SECONDS.sleep(1);
concurrencyProp.set(5);
} else if (count == 9000) {
TimeUnit.SECONDS.sleep(1);
recordsProp.set(15);
}
client.put(key, HelloTask.getDefaultInstance());
}
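Per the updated javadoc in `ProcessorProperties` below, a `max.pending.records` reload is applied per assigned partition once that partition's current pending tasks have drained, so each staged `set()` call above takes effect with some latency rather than instantly. In application code the pattern is just a matter of keeping the `DynamicProperty` reference around; a sketch, where the alarm hook is a made-up example:

    // Sketch: shrink the in-flight cap when a backpressure alarm fires.
    // `maxPendingRecords` is the same DynamicProperty instance that was registered
    // via addProperties(StaticPropertySupplier.of(...)) at subscription build time.
    void onBackpressureAlarm(DynamicProperty<Integer> maxPendingRecords) {
        // Applied per partition once its pending tasks have drained; no restart needed.
        maxPendingRecords.set(Math.max(100, maxPendingRecords.value() / 2));
    }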
@@ -33,37 +33,41 @@

/**
* Collection of properties that can be configured to adjust {@link DecatonProcessor}'s behavior.
*
* Description of each attributes:
* - Reloadable : Whether update on the property can be applied to running instance without restarting it.
* Note that for properties enabled for this attribute, updating its value may take certain
* latency.
* <p>
* Description of each attribute:
* <ul>
* <li>Reloadable: Whether update on the property can be applied to running instance without restarting it.
* Note that for properties enabled for this attribute, updating its value may take certain
* latency.</li>
* </ul>
*/
public class ProcessorProperties extends AbstractDecatonProperties {
/**
* List of keys of task to skip processing.
*
* <p>
* Note that this property accepts only String keys, while Decaton consumer supports consuming
* keys of arbitrary type. This means that records with non-String keys may just pass through
* this filter.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<List<String>> CONFIG_IGNORE_KEYS =
PropertyDefinition.define("decaton.ignore.keys", List.class, Collections.emptyList(),
PropertyDefinition.checkListElement(String.class));
/**
* Maximum rate of processing tasks per-partition in second.
*
* If the value N is
* - (0, 1,000,000]: Do the best to process tasks as much as N per second.
* N may not be kept well if a task takes over a second to process or N is greater than
* actual throughput per second.
* - 0: Stop all processing but the task currently being processed isn`t interrupted
* - -1: Unlimited
*
* <p>
* If the value N is:
* <ul>
* <li>(0, 1,000,000]: Do the best to process tasks as much as N per second.
* N may not be kept well if a task takes over a second to process or N is greater than
* actual throughput per second.</li>
* <li>0: Stop all processing but the task currently being processed isn`t interrupted</li>
* <li>-1: Unlimited</li>
* </ul>
* <p>
* See also {@link RateLimiter}.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_PROCESSING_RATE =
@@ -74,44 +78,52 @@ public class ProcessorProperties extends AbstractDecatonProperties {
&& (long) v <= RateLimiter.MAX_RATE);
/**
* Concurrency used to process tasks coming from single partition.
* <p>
* Reloading this property will be performed for each assigned partition as soon as
* the current pending tasks of the assigned partition have done.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Integer> CONFIG_PARTITION_CONCURRENCY =
PropertyDefinition.define("decaton.partition.concurrency", Integer.class, 1,
v -> v instanceof Integer && (Integer) v > 0);
/**
* Number of records to pause source partition if pending count exceeds this number.
*
* Reloadable: no
* <p>
* Reloading this property will be performed for each assigned partition as soon as
* the current pending tasks of the assigned partition have done.
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Integer> CONFIG_MAX_PENDING_RECORDS =
PropertyDefinition.define("decaton.max.pending.records", Integer.class, 10_000,
v -> v instanceof Integer && (Integer) v > 0);
/**
* Interval in milliseconds to put in between offset commits.
* Too frequent offset commit would cause high load on brokers while it doesn't essentially prevents
* <p>
* Too frequent offset commit would cause high load on brokers while it doesn't essentially prevent
* duplicate processing.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_COMMIT_INTERVAL_MS =
PropertyDefinition.define("decaton.commit.interval.ms", Long.class, 1000L,
v -> v instanceof Long && (Long) v >= 0);
/**
* Timeout for consumer group rebalance.
* <p>
* Decaton waits up to this time for tasks currently pending or in-progress to finish before allowing a
* rebalance to proceed.
* <p>
* Any tasks that do not complete within this timeout will not have their offsets committed before the
* rebalance, meaning they may be processed multiple times (as they will be processed again after the
* rebalance). If {@link #CONFIG_PARTITION_CONCURRENCY} is greater than 1, this situation might also cause
* other records from the same partition to be processed multiple times (see
* {@link OutOfOrderCommitControl}).
* <p>
* Generally, this should be set such that {@link #CONFIG_MAX_PENDING_RECORDS} can be comfortably processed
* within this timeout.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_GROUP_REBALANCE_TIMEOUT_MS =
@@ -124,56 +136,59 @@ public class ProcessorProperties extends AbstractDecatonProperties {
* still be running even after {@link ProcessorSubscription#close()} returns, which might lead to errors
* from e.g. shutting down dependencies of this {@link ProcessorSubscription} that are still in use from
* async tasks.
* <p>
* Generally, this should be set such that {@link #CONFIG_MAX_PENDING_RECORDS} can be comfortably processed
* within this timeout.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_SHUTDOWN_TIMEOUT_MS =
PropertyDefinition.define("decaton.processing.shutdown.timeout.ms", Long.class, 0L,
v -> v instanceof Long && (Long) v >= 0);

/**
* Control whether to enable or disable decaton specific information store in slf4j's {@link MDC}.
* Control whether to enable or disable decaton specific information store in SLF4J's {@link MDC}.
* This option is enabled by default, but it is known to cause some object allocations which could become
* a problem in massive scale traffic. This option intend to provide an option for users to disable MDC
* properties where not necessary to reduce GC pressure.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Boolean> CONFIG_LOGGING_MDC_ENABLED =
PropertyDefinition.define("decaton.logging.mdc.enabled", Boolean.class, true,
v -> v instanceof Boolean);
/**
* Controls whether to enable or disable binding Micrometer's KafkaClientMetrics to decaton consumers.
* This is disabled for backwards compatiblity, but recommended if you rely on Micrometer
* This is disabled for backwards compatibility, but recommended if you rely on Micrometer
* since JMX metrics are deprecated. The downside is a possible increase in metrics count.
*
* <p>
* Reloadable: no
*/
public static final PropertyDefinition<Boolean> CONFIG_BIND_CLIENT_METRICS =
PropertyDefinition.define("decaton.client.metrics.micrometer.bound", Boolean.class, false,
v -> v instanceof Boolean);
/**
* Control time to "timeout" a deferred completion.
* <p>
* Decaton allows {@link DecatonProcessor}s to defer completion of a task by calling
* {@link ProcessingContext#deferCompletion()}, which is useful for processors which integrates with
* asynchronous processing frameworks that sends the processing context to somewhere else and get back
* later.
* <p>
* However, since leaking {@link Completion} returned by {@link ProcessingContext#deferCompletion()} means
* to create a never-completed task, that causes consumption to suck completely after
* {@link #CONFIG_MAX_PENDING_RECORDS} records stacked up, which is not desirable for some use cases.
*
* <p>
* By setting this timeout, Decaton will try to "timeout" a deferred completion after the specified period.
* By setting the timeout to sufficiently large value, which you can be sure that none of normal processing
* to take, some potentially leaked completion will be forcefully completed and decaton can continue to
* consume the following tasks.
*
* Be very careful when using this feature since forcefully completing a timed out completion might leads
* you some data loss if the corresponding processing hasn't yet complete.
*
* <p>
* Be very careful when using this feature since forcefully completing a timed out completion might lead
* to some data loss if the corresponding processing hasn't yet complete.
* <p>
* This timeout can be disabled by setting -1, and it is the default.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS =
Expand All @@ -194,14 +209,15 @@ public class ProcessorProperties extends AbstractDecatonProperties {

/**
* Timeout for processor threads termination.
* <p>
* When a partition is revoked for rebalance or a subscription is about to be shutdown,
* all processors will be destroyed.
* At this time, Decaton waits synchronously for the running tasks to finish until this timeout.
*
* <p>
* Even if timeout occurs, Decaton will continue other clean-up tasks.
* Therefore, you can set this timeout only if unexpected behavior is acceptable in the middle of the last
* {@link DecatonProcessor#process(ProcessingContext, Object)} which timed out.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS =
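All definitions in this file share the `define(name, type, default, validator)` shape seen above. For reference, a custom application property would be declared the same way; `my.app.retry.limit` here is an illustrative name, not a Decaton property:

    // Mirrors how CONFIG_MAX_PENDING_RECORDS is defined above: name, type,
    // default value, and a validator run against every supplied value.
    public static final PropertyDefinition<Integer> CONFIG_MY_RETRY_LIMIT =
            PropertyDefinition.define("my.app.retry.limit", Integer.class, 3,
                                      v -> v instanceof Integer && (Integer) v >= 0);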
@@ -45,6 +45,7 @@ public class PartitionContext implements AutoCloseable {
private final PerKeyQuotaManager perKeyQuotaManager;
private final Processors<?> processors;
private final PartitionStateMetrics metrics;
private final int maxPendingRecords;

// The offset committed successfully at last commit
private volatile long lastCommittedOffset;
@@ -69,18 +70,18 @@ public class PartitionContext implements AutoCloseable {
@Setter
private volatile boolean reloadRequested;

public PartitionContext(PartitionScope scope, Processors<?> processors, int maxPendingRecords) {
this(scope, processors, new PartitionProcessor(scope, processors), maxPendingRecords);
public PartitionContext(PartitionScope scope, Processors<?> processors) {
this(scope, processors, new PartitionProcessor(scope, processors));
}

// visible for testing
PartitionContext(PartitionScope scope,
Processors<?> processors,
PartitionProcessor partitionProcessor,
int maxPendingRecords) {
PartitionProcessor partitionProcessor) {
this.scope = scope;
this.processors = processors;
this.partitionProcessor = partitionProcessor;
maxPendingRecords = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value();

int capacity = maxPendingRecords + scope.maxPollRecords();
TopicPartition tp = scope.topicPartition();
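Since the context now reads `maxPendingRecords` from its scope's properties inside the constructor, call sites drop the extra argument; roughly (the surrounding call site is not part of this diff):

    // Before: new PartitionContext(scope, processors, maxPendingRecords)
    // After:  the context pulls CONFIG_MAX_PENDING_RECORDS from scope.props() itself.
    PartitionContext context = new PartitionContext(scope, processors);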
@@ -107,7 +108,7 @@ public PartitionContext(PartitionScope scope, Processors<?> processors, int maxP

/**
* Returns the largest offset waiting to be committed, if exists.
*
* <p>
* It returns non-empty value with offset which is larger than the offset
* reported last by {@link #updateCommittedOffset(long)}.
*
Expand Down Expand Up @@ -146,6 +147,10 @@ public int pendingTasksCount() {
return commitControl.pendingOffsetsCount();
}

public boolean shouldPausePartition() {
return pendingTasksCount() >= maxPendingRecords;
}

public void destroyProcessors() throws Exception {
partitionProcessor.close();
processors.destroyPartitionScope(scope.subscriptionId(), scope.topicPartition());
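The new `shouldPausePartition()` moves the cap comparison next to the data it reads, so the consume loop only has to ask a yes/no question. A sketch of how such a check typically drives Kafka's pause/resume flow, using the plain consumer API rather than Decaton's actual internals:

    // Given: Map<TopicPartition, PartitionContext> contexts; Consumer<?, ?> consumer.
    // Pause partitions whose pending count has hit the (now reloadable) cap,
    // and resume them once processing catches up.
    for (Map.Entry<TopicPartition, PartitionContext> entry : contexts.entrySet()) {
        TopicPartition tp = entry.getKey();
        PartitionContext context = entry.getValue();
        if (context.shouldPausePartition()) {
            consumer.pause(Collections.singleton(tp));
        } else {
            consumer.resume(Collections.singleton(tp));
        }
    }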