Make max.pending.records property reloadable
KarboniteKream committed Jun 24, 2023
1 parent 732dbce commit 9cff79b
Showing 9 changed files with 136 additions and 106 deletions.
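For context, "reloadable" means the value can be changed on a running ProcessorSubscription through Decaton's property mechanism, without a restart. A minimal sketch of what this enables, assuming Decaton's DynamicProperty / StaticPropertySupplier / SubscriptionBuilder APIs (the subscription ID and the omitted builder calls are placeholders):

    import com.linecorp.decaton.processor.runtime.DynamicProperty;
    import com.linecorp.decaton.processor.runtime.ProcessorProperties;
    import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
    import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
    import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;

    // A DynamicProperty can be mutated after the subscription has started.
    DynamicProperty<Integer> maxPending =
            new DynamicProperty<>(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS);
    maxPending.set(10_000);

    ProcessorSubscription subscription =
            SubscriptionBuilder.newBuilder("my-subscription")
                               .properties(StaticPropertySupplier.of(maxPending))
                               // ... processorsBuilder(...), consumerConfig(...), etc.
                               .buildAndStart();

    // Before this commit the new value only took effect after a restart; with it,
    // running partition contexts are reloaded to pick it up:
    maxPending.set(50_000);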
@@ -167,7 +167,7 @@ public void testRegisterWithDefaultSettings() {

CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME);
verify(centralDogmaRepository).commit(
any(String.class),
anyString(),
eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode()))
);
}
@@ -212,13 +212,13 @@ public void testRegisterWithCustomizedSettings() {
when(filesRequest.list(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(Collections.emptyMap()));

final CommitRequest commitRequest = mock(CommitRequest.class);
when(centralDogmaRepository.commit(any(String.class), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest);
when(centralDogmaRepository.commit(anyString(), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest);
when(commitRequest.push(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(new PushResult(Revision.HEAD, whenCentralDogmaPushed)));

CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME, supplier);

verify(centralDogmaRepository).commit(
any(String.class),
anyString(),
eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties))
);
}
@@ -33,37 +33,41 @@

/**
* Collection of properties that can be configured to adjust {@link DecatonProcessor}'s behavior.
*
* Description of each attributes:
* - Reloadable : Whether update on the property can be applied to running instance without restarting it.
* Note that for properties enabled for this attribute, updating its value may take certain
* latency.
* <p>
* Description of each attribute:
* <ul>
* <li>Reloadable: Whether an update to the property can be applied to a running instance without restarting it.
* Note that for properties with this attribute enabled, an updated value may be applied with some
* latency.</li>
* </ul>
*/
public class ProcessorProperties extends AbstractDecatonProperties {
/**
* List of task keys to skip processing.
*
* <p>
* Note that this property accepts only String keys, while the Decaton consumer supports consuming
* keys of arbitrary type. This means that records with non-String keys may just pass through
* this filter.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<List<String>> CONFIG_IGNORE_KEYS =
PropertyDefinition.define("decaton.ignore.keys", List.class, Collections.emptyList(),
PropertyDefinition.checkListElement(String.class));
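A sketch of supplying this property (the key values are hypothetical; the imports from the earlier sketch plus java.util.Arrays are assumed). Per the caveat above, records with non-String keys pass through unaffected:

    // Skip processing for records carrying either of these two String keys.
    PropertySupplier ignoreKeys = StaticPropertySupplier.of(
            Property.ofStatic(ProcessorProperties.CONFIG_IGNORE_KEYS,
                              Arrays.asList("user-123", "user-456")));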
/**
* Maximum rate of processing tasks per partition per second.
*
* If the value N is
* - (0, 1,000,000]: Do the best to process tasks as much as N per second.
* N may not be kept well if a task takes over a second to process or N is greater than
* actual throughput per second.
* - 0: Stop all processing but the task currently being processed isn`t interrupted
* - -1: Unlimited
*
* <p>
* If the value N is:
* <ul>
* <li>(0, 1,000,000]: Process tasks at up to N per second on a best-effort basis.
* N may not be achieved if a task takes over a second to process or N is greater than
* the actual throughput per second.</li>
* <li>0: Stop all processing but the task currently being processed isn't interrupted</li>
* <li>-1: Unlimited</li>
* </ul>
* <p>
* See also {@link RateLimiter}.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_PROCESSING_RATE =
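The ranges above map directly onto values that can be set at runtime; a sketch, reusing the DynamicProperty pattern from the first example:

    DynamicProperty<Long> rate =
            new DynamicProperty<>(ProcessorProperties.CONFIG_PROCESSING_RATE);
    rate.set(5L);   // (0, 1_000_000]: best-effort limit of ~5 tasks per second
    rate.set(0L);   // stop processing; the task currently in flight is not interrupted
    rate.set(-1L);  // unlimited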
@@ -76,42 +80,46 @@ public class ProcessorProperties extends AbstractDecatonProperties {
* Concurrency used to process tasks coming from single partition.
* Reloading this property will be performed for each assigned partition as soon as
the current pending tasks of the assigned partition have completed.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Integer> CONFIG_PARTITION_CONCURRENCY =
PropertyDefinition.define("decaton.partition.concurrency", Integer.class, 1,
v -> v instanceof Integer && (Integer) v > 0);
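Because this property is reloadable per partition, concurrency can be raised under a backlog without restarting; a sketch:

    // Takes effect on each partition once its current pending tasks have completed.
    DynamicProperty<Integer> concurrency =
            new DynamicProperty<>(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY);
    concurrency.set(16);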
/**
* Maximum number of pending records per partition; the source partition is paused while the pending
* count exceeds this number.
*
* Reloadable: no
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Integer> CONFIG_MAX_PENDING_RECORDS =
PropertyDefinition.define("decaton.max.pending.records", Integer.class, 10_000,
v -> v instanceof Integer && (Integer) v > 0);
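This is the property this commit makes reloadable. Two consumers of the value appear further down in this diff: queue sizing in PartitionContext and the pause check in PartitionContexts. A condensed sketch of both (the local variable names are illustrative only):

    // Queue capacity leaves headroom for one more poll beyond the pending limit.
    int capacity = maxPendingRecords + maxPollRecords;
    // The source partition is paused while the pending count is at or above the limit.
    boolean pause = pendingRecords >= maxPendingRecords;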
/**
* Interval in milliseconds to put in between offset commits.
* Too frequent offset commit would cause high load on brokers while it doesn't essentially prevents
* <p>
* Too frequent offset commit would cause high load on brokers while it doesn't essentially prevent
* duplicate processing.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_COMMIT_INTERVAL_MS =
PropertyDefinition.define("decaton.commit.interval.ms", Long.class, 1000L,
v -> v instanceof Long && (Long) v >= 0);
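For example, relaxing the interval trades commit frequency against broker load; a sketch using a static property:

    // Commit offsets at most once every 5 seconds (the default is 1000 ms).
    Property<Long> commitInterval =
            Property.ofStatic(ProcessorProperties.CONFIG_COMMIT_INTERVAL_MS, 5000L);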
/**
* Timeout for consumer group rebalance.
* <p>
* Decaton waits up to this time for tasks currently pending or in-progress to finish before allowing a
* rebalance to proceed.
* <p>
* Any tasks that do not complete within this timeout will not have their offsets committed before the
* rebalance, meaning they may be processed multiple times (as they will be processed again after the
* rebalance). If {@link #CONFIG_PARTITION_CONCURRENCY} is greater than 1, this situation might also cause
* other records from the same partition to be processed multiple times (see
* {@link OutOfOrderCommitControl}).
* <p>
* Generally, this should be set such that {@link #CONFIG_MAX_PENDING_RECORDS} can be comfortably processed
* within this timeout.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_GROUP_REBALANCE_TIMEOUT_MS =
@@ -124,56 +132,59 @@ public class ProcessorProperties extends AbstractDecatonProperties {
* still be running even after {@link ProcessorSubscription#close()} returns, which might lead to errors
* from e.g. shutting down dependencies of this {@link ProcessorSubscription} that are still in use from
* async tasks.
* <p>
* Generally, this should be set such that {@link #CONFIG_MAX_PENDING_RECORDS} can be comfortably processed
* within this timeout.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_SHUTDOWN_TIMEOUT_MS =
PropertyDefinition.define("decaton.processing.shutdown.timeout.ms", Long.class, 0L,
v -> v instanceof Long && (Long) v >= 0);

/**
* Control whether to enable or disable decaton specific information store in slf4j's {@link MDC}.
* Control whether to enable or disable Decaton-specific information stored in SLF4J's {@link MDC}.
* This option is enabled by default, but it is known to cause some object allocations which could become
* a problem in massive-scale traffic. This option is intended to let users disable MDC
* properties where not necessary, to reduce GC pressure.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Boolean> CONFIG_LOGGING_MDC_ENABLED =
PropertyDefinition.define("decaton.logging.mdc.enabled", Boolean.class, true,
v -> v instanceof Boolean);
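A sketch of opting out to cut allocations at high traffic:

    // Disable Decaton's MDC properties to reduce per-record object allocations.
    Property<Boolean> mdcEnabled =
            Property.ofStatic(ProcessorProperties.CONFIG_LOGGING_MDC_ENABLED, false);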
/**
* Controls whether to enable or disable binding Micrometer's KafkaClientMetrics to Decaton consumers.
* This is disabled for backwards compatiblity, but recommended if you rely on Micrometer
* This is disabled for backwards compatibility, but recommended if you rely on Micrometer
* since JMX metrics are deprecated. The downside is a possible increase in metrics count.
*
* <p>
* Reloadable: no
*/
public static final PropertyDefinition<Boolean> CONFIG_BIND_CLIENT_METRICS =
PropertyDefinition.define("decaton.client.metrics.micrometer.bound", Boolean.class, false,
v -> v instanceof Boolean);
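A sketch of opting in; since this property is not reloadable, it must be set before the subscription starts:

    // Bind Micrometer's KafkaClientMetrics to the underlying Kafka consumer.
    Property<Boolean> bindClientMetrics =
            Property.ofStatic(ProcessorProperties.CONFIG_BIND_CLIENT_METRICS, true);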
/**
* Control time to "timeout" a deferred completion.
* <p>
* Decaton allows {@link DecatonProcessor}s to defer completion of a task by calling
* {@link ProcessingContext#deferCompletion()}, which is useful for processors which integrate with
* asynchronous processing frameworks that send the processing context somewhere else and get it back
* later.
* <p>
* However, since leaking {@link Completion} returned by {@link ProcessingContext#deferCompletion()} means
* to create a never-completed task, that causes consumption to get stuck completely after
* {@link #CONFIG_MAX_PENDING_RECORDS} records stack up, which is not desirable for some use cases.
*
* <p>
* By setting this timeout, Decaton will try to "timeout" a deferred completion after the specified period.
* By setting the timeout to a sufficiently large value, one that you can be sure no normal processing
* would ever take, any potentially leaked completion will be forcefully completed and Decaton can
* continue to consume the following tasks.
*
* Be very careful when using this feature since forcefully completing a timed out completion might leads
* you some data loss if the corresponding processing hasn't yet complete.
*
* <p>
* Be very careful when using this feature since forcefully completing a timed out completion might lead
* to some data loss if the corresponding processing hasn't yet completed.
* <p>
* This timeout can be disabled by setting -1, and it is the default.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS =
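A sketch of the deferral pattern described above (the task type and async client are hypothetical); the timeout exists to recover from the leak case in the warning:

    public class AsyncHandoffProcessor implements DecatonProcessor<MyTask> {
        @Override
        public void process(ProcessingContext<MyTask> context, MyTask task)
                throws InterruptedException {
            Completion completion = context.deferCompletion();
            asyncClient.send(task).whenComplete((result, error) -> {
                // If this callback never fires, the completion leaks; without the
                // timeout, consumption stalls once max.pending.records stack up.
                completion.complete();
            });
        }
    }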
@@ -194,14 +205,15 @@ public class ProcessorProperties extends AbstractDecatonProperties {

/**
* Timeout for processor threads termination.
* <p>
* When a partition is revoked for rebalance or a subscription is about to be shut down,
* all processors will be destroyed.
* At this time, Decaton waits synchronously for the running tasks to finish until this timeout.
*
* <p>
* Even if a timeout occurs, Decaton will continue other clean-up tasks.
* Therefore, you can set this timeout only if unexpected behavior is acceptable in the middle of the last
* {@link DecatonProcessor#process(ProcessingContext, Object)} which timed out.
*
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Long> CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS =
@@ -69,20 +69,20 @@ public class PartitionContext implements AutoCloseable {
@Setter
private volatile boolean reloadRequested;

public PartitionContext(PartitionScope scope, Processors<?> processors, int maxPendingRecords) {
this(scope, processors, new PartitionProcessor(scope, processors), maxPendingRecords);
public PartitionContext(PartitionScope scope, Processors<?> processors) {
this(scope, processors, new PartitionProcessor(scope, processors));
}

// visible for testing
PartitionContext(PartitionScope scope,
Processors<?> processors,
PartitionProcessor partitionProcessor,
int maxPendingRecords) {
PartitionProcessor partitionProcessor) {
this.scope = scope;
this.processors = processors;
this.partitionProcessor = partitionProcessor;

int capacity = maxPendingRecords + scope.maxPollRecords();
int capacity = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value() +
scope.maxPollRecords();
TopicPartition tp = scope.topicPartition();
Metrics metricsCtor = Metrics.withTags("subscription", scope.subscriptionId(),
"topic", tp.topic(),
@@ -107,7 +107,7 @@ public PartitionContext(PartitionScope scope, Processors<?> processors, int maxP

/**
* Returns the largest offset waiting to be committed, if one exists.
*
* <p>
* It returns a non-empty value with an offset larger than the offset
* last reported by {@link #updateCommittedOffset(long)}.
*
@@ -36,6 +36,7 @@

import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.PropertyDefinition;
import com.linecorp.decaton.processor.runtime.internal.AssignmentManager.AssignmentConfig;
import com.linecorp.decaton.processor.runtime.internal.AssignmentManager.AssignmentStore;
import com.linecorp.decaton.processor.runtime.internal.CommitManager.OffsetsStore;
@@ -48,7 +49,7 @@ public class PartitionContexts implements OffsetsStore, AssignmentStore, Partiti
private final SubscriptionScope scope;
private final Processors<?> processors;
private final Property<Long> processingRateProp;
private final int maxPendingRecords;
private final Property<Integer> maxPendingRecordsProp;
private final Map<TopicPartition, PartitionContext> contexts;

private final ReentrantLock propertyReloadLock;
@@ -58,26 +59,12 @@ public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
this.processors = processors;

processingRateProp = scope.props().get(ProcessorProperties.CONFIG_PROCESSING_RATE);
// We don't support dynamic reload of this value so fix at the time of boot-up.
maxPendingRecords = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value();
maxPendingRecordsProp = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS);
contexts = new HashMap<>();
propertyReloadLock = new ReentrantLock();

scope.props().get(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY).listen((oldVal, newVal) -> {
// This listener will be called at listener registration.
// It's not necessary to reload contexts at listener registration because PartitionContexts hasn't been instantiated at that time.
if (oldVal == null) {
return;
}

propertyReloadLock.lock();
try {
contexts.values().forEach(context -> context.reloadRequested(true));
logger.info("Requested reload partition.concurrency oldValue={}, newValue={}", oldVal, newVal);
} finally {
propertyReloadLock.unlock();
}
});
registerReloadListener(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY);
registerReloadListener(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS);
}

public PartitionContext get(TopicPartition tp) {
@@ -212,13 +199,13 @@ public void updateHighWatermarks() {
}

private boolean shouldPartitionPaused(int pendingRecords) {
return pendingRecords >= maxPendingRecords;
return pendingRecords >= maxPendingRecordsProp.value();
}

// visible for testing
PartitionContext instantiateContext(TopicPartition tp) {
PartitionScope partitionScope = new PartitionScope(scope, tp);
return new PartitionContext(partitionScope, processors, maxPendingRecords);
return new PartitionContext(partitionScope, processors);
}

// visible for testing
@@ -301,6 +288,26 @@ public void maybeHandlePropertyReload() {
}
}

private <T> void registerReloadListener(PropertyDefinition<T> property) {
scope.props().get(property).listen((oldVal, newVal) -> {
// This listener will also be called at listener registration.
// It's not necessary to reload contexts at listener registration,
// because PartitionContexts hasn't been instantiated at that time.
if (oldVal == null) {
return;
}

propertyReloadLock.lock();
try {
contexts.values().forEach(context -> context.reloadRequested(true));
logger.info("Requested reload of `{}`: oldValue={}, newValue={}",
property.name(), oldVal, newVal);
} finally {
propertyReloadLock.unlock();
}
});
}
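For reference, the listener contract relied on here: listen() fires once at registration time with oldVal == null, hence the early return. From the application side, a reload is triggered simply by mutating the property (maxPending being the DynamicProperty from the first sketch):

    // Fires every registered listener, marking each PartitionContext for reload;
    // contexts are then recreated once their pending tasks have completed.
    maxPending.set(20_000);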

private void reloadContexts(Collection<TopicPartition> topicPartitions) {
logger.info("Start dropping partition contexts({})", topicPartitions);
removePartition(topicPartitions);
Expand All @@ -317,7 +324,8 @@ private void destroyProcessors(Collection<TopicPartition> partitions) {
try {
context.destroyProcessors();
} catch (Exception e) {
logger.error("Failed to close partition context for {}", context.topicPartition(), e);
logger.error("Failed to close partition context for {}",
context.topicPartition(), e);
}
}).collect(toList()))
.join();
@@ -21,6 +21,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
@@ -183,7 +185,7 @@ public void testOffsetRegression() throws Exception {
doAnswer(invocation -> {
listener.set(invocation.getArgument(1));
return null;
}).when(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
}).when(consumer).subscribe(anyCollection(), any(ConsumerRebalanceListener.class));

BlockingQueue<Long> feedOffsets = new ArrayBlockingQueue<>(4);
feedOffsets.add(100L);
@@ -203,8 +205,8 @@ committedOffsets.putAll(invocation.getArgument(0));
committedOffsets.putAll(invocation.getArgument(0));
return null;
};
doAnswer(storeCommitOffsets).when(consumer).commitSync(any(Map.class));
doAnswer(storeCommitOffsets).when(consumer).commitAsync(any(Map.class), any());
doAnswer(storeCommitOffsets).when(consumer).commitSync(anyMap());
doAnswer(storeCommitOffsets).when(consumer).commitAsync(anyMap(), any());

AtomicBoolean first = new AtomicBoolean();
doAnswer(invocation -> {
