From 9cff79b6c3263a8f11205dec2ae8d93254f8178e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Klemen=20Ko=C5=A1ir?= Date: Sat, 24 Jun 2023 22:37:33 +0900 Subject: [PATCH] Make `max.pending.records` property reloadable --- .../CentralDogmaPropertySupplierTest.java | 6 +- .../runtime/ProcessorProperties.java | 82 +++++++++++-------- .../runtime/internal/PartitionContext.java | 12 +-- .../runtime/internal/PartitionContexts.java | 50 ++++++----- .../runtime/ProcessorSubscriptionTest.java | 8 +- .../runtime/internal/CommitManagerTest.java | 29 +++---- .../runtime/internal/ConsumeManagerTest.java | 4 +- .../internal/PartitionContextTest.java | 27 +++--- .../internal/PartitionContextsTest.java | 24 ++++-- 9 files changed, 136 insertions(+), 106 deletions(-) diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java index c303325c..07619dfd 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java @@ -167,7 +167,7 @@ public void testRegisterWithDefaultSettings() { CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); verify(centralDogmaRepository).commit( - any(String.class), + anyString(), eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode())) ); } @@ -212,13 +212,13 @@ public void testRegisterWithCustomizedSettings() { when(filesRequest.list(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(Collections.emptyMap())); final CommitRequest commitRequest = mock(CommitRequest.class); - when(centralDogmaRepository.commit(any(String.class), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest); + when(centralDogmaRepository.commit(anyString(), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest); when(commitRequest.push(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(new PushResult(Revision.HEAD, whenCentralDogmaPushed))); CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME, supplier); verify(centralDogmaRepository).commit( - any(String.class), + anyString(), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)) ); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java index 7cc7b85f..db816d5b 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java @@ -33,20 +33,22 @@ /** * Collection of properties that can be configured to adjust {@link DecatonProcessor}'s behavior. - * - * Description of each attributes: - * - Reloadable : Whether update on the property can be applied to running instance without restarting it. - * Note that for properties enabled for this attribute, updating its value may take certain - * latency. + *

+ * Description of each attribute: + *

*/ public class ProcessorProperties extends AbstractDecatonProperties { /** * List of keys of task to skip processing. - * + *

* Note that this property accepts only String keys, while Decaton consumer supports consuming * keys of arbitrary type. This means that records with non-String keys may just pass through * this filter. - * + *

* Reloadable: yes */ public static final PropertyDefinition> CONFIG_IGNORE_KEYS = @@ -54,16 +56,18 @@ public class ProcessorProperties extends AbstractDecatonProperties { PropertyDefinition.checkListElement(String.class)); /** * Maximum rate of processing tasks per-partition in second. - * - * If the value N is - * - (0, 1,000,000]: Do the best to process tasks as much as N per second. - * N may not be kept well if a task takes over a second to process or N is greater than - * actual throughput per second. - * - 0: Stop all processing but the task currently being processed isn`t interrupted - * - -1: Unlimited - * + *

+ * If the value N is: + *

+ *

* See also {@link RateLimiter}. - * + *

* Reloadable: yes */ public static final PropertyDefinition CONFIG_PROCESSING_RATE = @@ -76,7 +80,7 @@ public class ProcessorProperties extends AbstractDecatonProperties { * Concurrency used to process tasks coming from single partition. * Reloading this property will be performed for each assigned partition as soon as * the current pending tasks of the assigned partition have done. - * + *

* Reloadable: yes */ public static final PropertyDefinition CONFIG_PARTITION_CONCURRENCY = @@ -84,17 +88,18 @@ public class ProcessorProperties extends AbstractDecatonProperties { v -> v instanceof Integer && (Integer) v > 0); /** * Number of records to pause source partition if pending count exceeds this number. - * - * Reloadable: no + *

+ * Reloadable: yes */ public static final PropertyDefinition CONFIG_MAX_PENDING_RECORDS = PropertyDefinition.define("decaton.max.pending.records", Integer.class, 10_000, v -> v instanceof Integer && (Integer) v > 0); /** * Interval in milliseconds to put in between offset commits. - * Too frequent offset commit would cause high load on brokers while it doesn't essentially prevents + *

+ * Too frequent offset commit would cause high load on brokers while it doesn't essentially prevent * duplicate processing. - * + *

* Reloadable: yes */ public static final PropertyDefinition CONFIG_COMMIT_INTERVAL_MS = @@ -102,16 +107,19 @@ public class ProcessorProperties extends AbstractDecatonProperties { v -> v instanceof Long && (Long) v >= 0); /** * Timeout for consumer group rebalance. + *

* Decaton waits up to this time for tasks currently pending or in-progress to finish before allowing a * rebalance to proceed. + *

* Any tasks that do not complete within this timeout will not have their offsets committed before the * rebalance, meaning they may be processed multiple times (as they will be processed again after the * rebalance). If {@link #CONFIG_PARTITION_CONCURRENCY} is greater than 1, this situation might also cause * other records from the same partition to be processed multiple times (see * {@link OutOfOrderCommitControl}). + *

* Generally, this should be set such that {@link #CONFIG_MAX_PENDING_RECORDS} can be comfortably processed * within this timeout. - * + *

* Reloadable: yes */ public static final PropertyDefinition CONFIG_GROUP_REBALANCE_TIMEOUT_MS = @@ -124,9 +132,10 @@ public class ProcessorProperties extends AbstractDecatonProperties { * still be running even after {@link ProcessorSubscription#close()} returns, which might lead to errors * from e.g. shutting down dependencies of this {@link ProcessorSubscription} that are still in use from * async tasks. + *

* Generally, this should be set such that {@link #CONFIG_MAX_PENDING_RECORDS} can be comfortably processed * within this timeout. - * + *

* Reloadable: yes */ public static final PropertyDefinition CONFIG_SHUTDOWN_TIMEOUT_MS = @@ -134,11 +143,11 @@ public class ProcessorProperties extends AbstractDecatonProperties { v -> v instanceof Long && (Long) v >= 0); /** - * Control whether to enable or disable decaton specific information store in slf4j's {@link MDC}. + * Control whether to enable or disable decaton specific information store in SLF4J's {@link MDC}. * This option is enabled by default, but it is known to cause some object allocations which could become * a problem in massive scale traffic. This option intend to provide an option for users to disable MDC * properties where not necessary to reduce GC pressure. - * + *

* Reloadable: yes */ public static final PropertyDefinition CONFIG_LOGGING_MDC_ENABLED = @@ -146,9 +155,9 @@ public class ProcessorProperties extends AbstractDecatonProperties { v -> v instanceof Boolean); /** * Controls whether to enable or disable binding Micrometer's KafkaClientMetrics to decaton consumers. - * This is disabled for backwards compatiblity, but recommended if you rely on Micrometer + * This is disabled for backwards compatibility, but recommended if you rely on Micrometer * since JMX metrics are deprecated. The downside is a possible increase in metrics count. - * + *

* Reloadable: no */ public static final PropertyDefinition CONFIG_BIND_CLIENT_METRICS = @@ -156,24 +165,26 @@ public class ProcessorProperties extends AbstractDecatonProperties { v -> v instanceof Boolean); /** * Control time to "timeout" a deferred completion. + *

* Decaton allows {@link DecatonProcessor}s to defer completion of a task by calling * {@link ProcessingContext#deferCompletion()}, which is useful for processors which integrates with * asynchronous processing frameworks that sends the processing context to somewhere else and get back * later. + *

* However, since leaking {@link Completion} returned by {@link ProcessingContext#deferCompletion()} means * to create a never-completed task, that causes consumption to suck completely after * {@link #CONFIG_MAX_PENDING_RECORDS} records stacked up, which is not desirable for some use cases. - * + *

* By setting this timeout, Decaton will try to "timeout" a deferred completion after the specified period. * By setting the timeout to sufficiently large value, which you can be sure that none of normal processing * to take, some potentially leaked completion will be forcefully completed and decaton can continue to * consume the following tasks. - * - * Be very careful when using this feature since forcefully completing a timed out completion might leads - * you some data loss if the corresponding processing hasn't yet complete. - * + *

+ * Be very careful when using this feature since forcefully completing a timed out completion might lead + * to some data loss if the corresponding processing hasn't yet complete. + *

* This timeout can be disabled by setting -1, and it is the default. - * + *

* Reloadable: yes */ public static final PropertyDefinition CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS = @@ -194,14 +205,15 @@ public class ProcessorProperties extends AbstractDecatonProperties { /** * Timeout for processor threads termination. + *

* When a partition is revoked for rebalance or a subscription is about to be shutdown, * all processors will be destroyed. * At this time, Decaton waits synchronously for the running tasks to finish until this timeout. - * + *

* Even if timeout occurs, Decaton will continue other clean-up tasks. * Therefore, you can set this timeout only if unexpected behavior is acceptable in the middle of the last * {@link DecatonProcessor#process(ProcessingContext, Object)} which timed out. - * + *

* Reloadable: yes */ public static final PropertyDefinition CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS = diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java index 12738609..8d6ebd02 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java @@ -69,20 +69,20 @@ public class PartitionContext implements AutoCloseable { @Setter private volatile boolean reloadRequested; - public PartitionContext(PartitionScope scope, Processors processors, int maxPendingRecords) { - this(scope, processors, new PartitionProcessor(scope, processors), maxPendingRecords); + public PartitionContext(PartitionScope scope, Processors processors) { + this(scope, processors, new PartitionProcessor(scope, processors)); } // visible for testing PartitionContext(PartitionScope scope, Processors processors, - PartitionProcessor partitionProcessor, - int maxPendingRecords) { + PartitionProcessor partitionProcessor) { this.scope = scope; this.processors = processors; this.partitionProcessor = partitionProcessor; - int capacity = maxPendingRecords + scope.maxPollRecords(); + int capacity = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value() + + scope.maxPollRecords(); TopicPartition tp = scope.topicPartition(); Metrics metricsCtor = Metrics.withTags("subscription", scope.subscriptionId(), "topic", tp.topic(), @@ -107,7 +107,7 @@ public PartitionContext(PartitionScope scope, Processors processors, int maxP /** * Returns the largest offset waiting to be committed, if exists. - * + *

* It returns non-empty value with offset which is larger than the offset * reported last by {@link #updateCommittedOffset(long)}. * diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContexts.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContexts.java index 737acfa6..209db2c1 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContexts.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContexts.java @@ -36,6 +36,7 @@ import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.Property; +import com.linecorp.decaton.processor.runtime.PropertyDefinition; import com.linecorp.decaton.processor.runtime.internal.AssignmentManager.AssignmentConfig; import com.linecorp.decaton.processor.runtime.internal.AssignmentManager.AssignmentStore; import com.linecorp.decaton.processor.runtime.internal.CommitManager.OffsetsStore; @@ -48,7 +49,7 @@ public class PartitionContexts implements OffsetsStore, AssignmentStore, Partiti private final SubscriptionScope scope; private final Processors processors; private final Property processingRateProp; - private final int maxPendingRecords; + private final Property maxPendingRecordsProp; private final Map contexts; private final ReentrantLock propertyReloadLock; @@ -58,26 +59,12 @@ public PartitionContexts(SubscriptionScope scope, Processors processors) { this.processors = processors; processingRateProp = scope.props().get(ProcessorProperties.CONFIG_PROCESSING_RATE); - // We don't support dynamic reload of this value so fix at the time of boot-up. - maxPendingRecords = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value(); + maxPendingRecordsProp = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS); contexts = new HashMap<>(); propertyReloadLock = new ReentrantLock(); - scope.props().get(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY).listen((oldVal, newVal) -> { - // This listener will be called at listener registration. - // It's not necessary to reload contexts at listener registration because PartitionContexts hasn't been instantiated at that time. - if (oldVal == null) { - return; - } - - propertyReloadLock.lock(); - try { - contexts.values().forEach(context -> context.reloadRequested(true)); - logger.info("Requested reload partition.concurrency oldValue={}, newValue={}", oldVal, newVal); - } finally { - propertyReloadLock.unlock(); - } - }); + registerReloadListener(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY); + registerReloadListener(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS); } public PartitionContext get(TopicPartition tp) { @@ -212,13 +199,13 @@ public void updateHighWatermarks() { } private boolean shouldPartitionPaused(int pendingRecords) { - return pendingRecords >= maxPendingRecords; + return pendingRecords >= maxPendingRecordsProp.value(); } // visible for testing PartitionContext instantiateContext(TopicPartition tp) { PartitionScope partitionScope = new PartitionScope(scope, tp); - return new PartitionContext(partitionScope, processors, maxPendingRecords); + return new PartitionContext(partitionScope, processors); } // visible for testing @@ -301,6 +288,26 @@ public void maybeHandlePropertyReload() { } } + private void registerReloadListener(PropertyDefinition property) { + scope.props().get(property).listen((oldVal, newVal) -> { + // This listener will also be called at listener registration. + // It's not necessary to reload contexts at listener registration, + // because PartitionContexts hasn't been instantiated at that time. + if (oldVal == null) { + return; + } + + propertyReloadLock.lock(); + try { + contexts.values().forEach(context -> context.reloadRequested(true)); + logger.info("Requested reload of `{}`: oldValue={}, newValue={}", + property.name(), oldVal, newVal); + } finally { + propertyReloadLock.unlock(); + } + }); + } + private void reloadContexts(Collection topicPartitions) { logger.info("Start dropping partition contexts({})", topicPartitions); removePartition(topicPartitions); @@ -317,7 +324,8 @@ private void destroyProcessors(Collection partitions) { try { context.destroyProcessors(); } catch (Exception e) { - logger.error("Failed to close partition context for {}", context.topicPartition(), e); + logger.error("Failed to close partition context for {}", + context.topicPartition(), e); } }).collect(toList())) .join(); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java index 93f08087..0df3ecdd 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; @@ -183,7 +185,7 @@ public void testOffsetRegression() throws Exception { doAnswer(invocation -> { listener.set(invocation.getArgument(1)); return null; - }).when(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + }).when(consumer).subscribe(anyCollection(), any(ConsumerRebalanceListener.class)); BlockingQueue feedOffsets = new ArrayBlockingQueue<>(4); feedOffsets.add(100L); @@ -203,8 +205,8 @@ public void testOffsetRegression() throws Exception { committedOffsets.putAll(invocation.getArgument(0)); return null; }; - doAnswer(storeCommitOffsets).when(consumer).commitSync(any(Map.class)); - doAnswer(storeCommitOffsets).when(consumer).commitAsync(any(Map.class), any()); + doAnswer(storeCommitOffsets).when(consumer).commitSync(anyMap()); + doAnswer(storeCommitOffsets).when(consumer).commitAsync(anyMap(), any()); AtomicBoolean first = new AtomicBoolean(); doAnswer(invocation -> { diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/CommitManagerTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/CommitManagerTest.java index a3a84d09..aa688bef 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/CommitManagerTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/CommitManagerTest.java @@ -19,6 +19,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -68,7 +69,6 @@ public class CommitManagerTest { public void setUp() { commitManager = spy(new CommitManager(consumer, commitIntervalMillis, store, clock)); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testCommitCompletedOffsetsSync() { // When committed ended up successfully update committed offsets @@ -77,29 +77,27 @@ public void testCommitCompletedOffsetsSync() { doReturn(offsets).when(store).commitReadyOffsets(); commitManager.commitSync(); - verify(consumer, times(1)).commitSync(any(Map.class)); + verify(consumer, times(1)).commitSync(anyMap()); verify(store, times(1)).storeCommittedOffsets(offsets); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testCommitCompletedOffsetsSync_NO_COMMIT() { // When target offsets is empty do not attempt any commit doReturn(emptyMap()).when(store).commitReadyOffsets(); commitManager.commitSync(); - verify(consumer, never()).commitSync(any(Map.class)); - verify(consumer, never()).commitAsync(any(Map.class), any()); + verify(consumer, never()).commitSync(anyMap()); + verify(consumer, never()).commitAsync(anyMap(), any()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testCommitCompletedOffsetsSync_FAIL() { // When commit raised an exception do not update committed offsets Map offsets = singletonMap( new TopicPartition("topic", 0), new OffsetAndMetadata(1234, null)); doReturn(offsets).when(store).commitReadyOffsets(); - doThrow(new RuntimeException("error")).when(consumer).commitSync(any(Map.class)); + doThrow(new RuntimeException("error")).when(consumer).commitSync(anyMap()); try { commitManager.commitSync(); } catch (RuntimeException ignored) { @@ -108,9 +106,8 @@ public void testCommitCompletedOffsetsSync_FAIL() { verify(store, never()).storeCommittedOffsets(any()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testCommitCompletedOffsetsAsync() throws InterruptedException { + public void testCommitCompletedOffsetsAsync() { Map offsets = singletonMap( new TopicPartition("topic", 0), new OffsetAndMetadata(1234, null)); doReturn(offsets).when(store).commitReadyOffsets(); @@ -119,17 +116,17 @@ public void testCommitCompletedOffsetsAsync() throws InterruptedException { OffsetCommitCallback cb = invocation.getArgument(1); cbRef.set(cb); return null; - }).when(consumer).commitAsync(any(Map.class), any()); + }).when(consumer).commitAsync(anyMap(), any()); commitManager.commitAsync(); // Committed offsets should not be updated yet here - verify(consumer, times(1)).commitAsync(any(Map.class), any()); + verify(consumer, times(1)).commitAsync(anyMap(), any()); verify(store, never()).storeCommittedOffsets(any()); // Subsequent async commit attempt should be ignored until the in-flight one completes commitManager.commitAsync(); - verify(consumer, times(1)).commitAsync(any(Map.class), any()); + verify(consumer, times(1)).commitAsync(anyMap(), any()); verify(store, never()).storeCommittedOffsets(any()); // Committed offset should be updated once the in-flight request completes @@ -137,9 +134,8 @@ public void testCommitCompletedOffsetsAsync() throws InterruptedException { verify(store, times(1)).storeCommittedOffsets(offsets); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testCommitCompletedOffsetsAsync_FAIL() throws InterruptedException { + public void testCommitCompletedOffsetsAsync_FAIL() { Map offsets = singletonMap( new TopicPartition("topic", 0), new OffsetAndMetadata(1234, null)); doReturn(offsets).when(store).commitReadyOffsets(); @@ -148,7 +144,7 @@ public void testCommitCompletedOffsetsAsync_FAIL() throws InterruptedException { OffsetCommitCallback cb = invocation.getArgument(1); cbRef.set(cb); return null; - }).when(consumer).commitAsync(any(Map.class), any()); + }).when(consumer).commitAsync(anyMap(), any()); commitManager.commitAsync(); // If async commit fails it should never update committed offset @@ -156,7 +152,6 @@ public void testCommitCompletedOffsetsAsync_FAIL() throws InterruptedException { verify(store, never()).storeCommittedOffsets(any()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testCommitCompletedOffsetsAsync_SUBSEQUENT_SYNC() { Map offsets = singletonMap( @@ -168,7 +163,7 @@ public void testCommitCompletedOffsetsAsync_SUBSEQUENT_SYNC() { // Subsequent sync commit can proceed regardless of in-flight async commit commitManager.commitSync(); - verify(consumer, times(1)).commitSync(any(Map.class)); + verify(consumer, times(1)).commitSync(anyMap()); } @Test diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java index 63973f25..2ab7dd8d 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java @@ -22,6 +22,7 @@ import static java.util.stream.Collectors.toSet; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; @@ -29,7 +30,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -85,7 +85,7 @@ public void setUp() { doAnswer(invocation -> { rebalanceListener = invocation.getArgument(1); return null; - }).when(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + }).when(consumer).subscribe(anyCollection(), any(ConsumerRebalanceListener.class)); consumeManager.init(singletonList(TOPIC)); } diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextTest.java index 569ddadb..634942f2 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextTest.java @@ -38,6 +38,7 @@ import com.linecorp.decaton.processor.runtime.DefaultSubPartitioner; import com.linecorp.decaton.processor.runtime.PerKeyQuotaConfig; import com.linecorp.decaton.processor.runtime.ProcessorProperties; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.internal.PerKeyQuotaManager.UsageType; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider.NoopTrace; @@ -46,10 +47,14 @@ public class PartitionContextTest { @Rule public MockitoRule rule = MockitoJUnit.rule(); + private static final Property MAX_PENDING_RECORDS = + Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, 100); + private static PartitionScope scope(String topic, Optional perKeyQuotaConfig) { return new PartitionScope( new SubscriptionScope("subscription", "topic", - Optional.empty(), perKeyQuotaConfig, ProcessorProperties.builder().build(), + Optional.empty(), perKeyQuotaConfig, + ProcessorProperties.builder().set(MAX_PENDING_RECORDS).build(), NoopTracingProvider.INSTANCE, ConsumerSupplier.DEFAULT_MAX_POLL_RECORDS, DefaultSubPartitioner::new), @@ -61,13 +66,11 @@ private static PartitionScope scope(String topic, Optional pe @Mock private PartitionProcessor partitionProcessor; @Mock - ConsumerRecord record; - - private static final int MAX_PENDING_RECORDS = 100; + private ConsumerRecord record; @Test public void testOffsetWaitingCommit() { - PartitionContext context = new PartitionContext(scope("topic", Optional.empty()), processors, MAX_PENDING_RECORDS); + PartitionContext context = new PartitionContext(scope("topic", Optional.empty()), processors); assertFalse(context.offsetWaitingCommit().isPresent()); OffsetState state = context.registerOffset(100); @@ -84,25 +87,25 @@ public void testOffsetWaitingCommit() { @Test public void testQuotaUsage() { PartitionContext context = new PartitionContext( - scope("topic", Optional.of(PerKeyQuotaConfig.shape())), processors, MAX_PENDING_RECORDS); + scope("topic", Optional.of(PerKeyQuotaConfig.shape())), processors); assertEquals(UsageType.COMPLY, context.maybeRecordQuotaUsage(new byte[0]).type()); } @Test public void testQuotaUsageWhenDisabled() { PartitionContext context = new PartitionContext( - scope("topic", Optional.empty()), processors, MAX_PENDING_RECORDS); + scope("topic", Optional.empty()), processors); assertNull(context.maybeRecordQuotaUsage(new byte[0])); } @Test public void testQuotaUsageNonTargetTopic() { PartitionContext context = new PartitionContext( - scope("topic-shaping", Optional.of(PerKeyQuotaConfig.shape())), processors, MAX_PENDING_RECORDS); + scope("topic-shaping", Optional.of(PerKeyQuotaConfig.shape())), processors); assertNull(context.maybeRecordQuotaUsage(new byte[0])); context = new PartitionContext( - scope("topic-retry", Optional.of(PerKeyQuotaConfig.shape())), processors, MAX_PENDING_RECORDS); + scope("topic-retry", Optional.of(PerKeyQuotaConfig.shape())), processors); assertNull(context.maybeRecordQuotaUsage(new byte[0])); } @@ -111,8 +114,7 @@ public void testQuotaApplied() { PartitionContext context = new PartitionContext( scope("topic-shaping", Optional.of(PerKeyQuotaConfig.shape())), processors, - partitionProcessor, - MAX_PENDING_RECORDS); + partitionProcessor); context.addRecord(record, new OffsetState(42L), NoopTrace.INSTANCE, (r, o, q) -> true); verify(partitionProcessor, never()).addTask(any()); @@ -123,8 +125,7 @@ public void testQuotaNotApplied() { PartitionContext context = new PartitionContext( scope("topic-shaping", Optional.of(PerKeyQuotaConfig.shape())), processors, - partitionProcessor, - MAX_PENDING_RECORDS); + partitionProcessor); context.addRecord(record, new OffsetState(42L), NoopTrace.INSTANCE, (r, o, q) -> false); verify(partitionProcessor, times(1)).addTask(any()); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextsTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextsTest.java index c9f7f426..254ff2df 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextsTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextsTest.java @@ -16,6 +16,7 @@ package com.linecorp.decaton.processor.runtime.internal; +import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_MAX_PENDING_RECORDS; import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PARTITION_CONCURRENCY; import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PROCESSING_RATE; import static java.util.Arrays.asList; @@ -56,7 +57,6 @@ import com.linecorp.decaton.processor.runtime.DefaultSubPartitioner; import com.linecorp.decaton.processor.runtime.DynamicProperty; import com.linecorp.decaton.processor.runtime.ProcessorProperties; -import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider; public class PartitionContextsTest { @@ -68,12 +68,15 @@ public class PartitionContextsTest { private final DynamicProperty partitionConcurrencyProperty = new DynamicProperty<>(CONFIG_PARTITION_CONCURRENCY); + private final DynamicProperty maxPendingRecordsProperty = + new DynamicProperty<>(CONFIG_MAX_PENDING_RECORDS); + private final DynamicProperty processingRateProp = new DynamicProperty<>(CONFIG_PROCESSING_RATE); private final ProcessorProperties props = ProcessorProperties .builder() - .set(Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, PENDING_RECORDS_TO_PAUSE)) .set(partitionConcurrencyProperty) + .set(maxPendingRecordsProperty) .set(processingRateProp) .build(); @@ -108,6 +111,7 @@ private static TopicPartition tp(int partition) { @Before public void setup() { partitionConcurrencyProperty.set(1); + maxPendingRecordsProperty.set(PENDING_RECORDS_TO_PAUSE); contexts = spy(new PartitionContexts(scope, processors)); } @@ -193,7 +197,7 @@ public void testPartitionsNeedsPause() { assertEquals(1, needPause.size()); assertEquals(cts.get(0).topicPartition(), needPause.iterator().next()); - // Mark as paused so it should disappear from the response. + // Mark as paused, so it should disappear from the response. doReturn(true).when(cts.get(0)).paused(); needPause = contexts.partitionsNeedsPause(); assertTrue(needPause.isEmpty()); @@ -237,20 +241,26 @@ public void testPartitionsNeedsResume() { needsResume = contexts.partitionsNeedsResume(); assertTrue(needsResume.isEmpty()); - // Task count is now lower than threshold. Should appear in resume response. + // Task count is now lower, but so is the threshold. Still shouldn't appear in resume list. + maxPendingRecordsProperty.set(PENDING_RECORDS_TO_PAUSE - 1); doReturn(PENDING_RECORDS_TO_PAUSE - 1).when(cts.get(0)).pendingTasksCount(); needsResume = contexts.partitionsNeedsResume(); + assertTrue(needsResume.isEmpty()); + + // Task count is now lower than the threshold. Should appear in resume response. + doReturn(PENDING_RECORDS_TO_PAUSE - 2).when(cts.get(0)).pendingTasksCount(); + needsResume = contexts.partitionsNeedsResume(); assertEquals(1, needsResume.size()); assertEquals(cts.get(0).topicPartition(), needsResume.iterator().next()); // All processing paused by rate limiting. - // All partitions should be disappear from resume response. + // All partitions should disappear from resume response. doReturn(true).when(contexts).pausingAllProcessing(); doReturn(true).when(cts.get(1)).paused(); needsResume = contexts.partitionsNeedsResume(); assertTrue(needsResume.isEmpty()); - // All pause finished. Now only partitions should be appear in resume response. + // All pause finished. Now all partitions should appear in resume response. doReturn(false).when(contexts).pausingAllProcessing(); needsResume = contexts.partitionsNeedsResume(); assertEquals(cts.stream().map(PartitionContext::topicPartition).collect(Collectors.toSet()), @@ -282,6 +292,8 @@ public void testShouldNotBePausingAllProcessingByPropertyReload() { assertFalse(contexts.pausingAllProcessing()); partitionConcurrencyProperty.set(42); assertFalse(contexts.pausingAllProcessing()); + maxPendingRecordsProperty.set(42); + assertFalse(contexts.pausingAllProcessing()); } @Test