diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java
index c303325c..07619dfd 100644
--- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java
+++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java
@@ -167,7 +167,7 @@ public void testRegisterWithDefaultSettings() {
         CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME);

         verify(centralDogmaRepository).commit(
-                any(String.class),
+                anyString(),
                 eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode()))
         );
     }
@@ -212,13 +212,13 @@ public void testRegisterWithCustomizedSettings() {
         when(filesRequest.list(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(Collections.emptyMap()));

         final CommitRequest commitRequest = mock(CommitRequest.class);
-        when(centralDogmaRepository.commit(any(String.class), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest);
+        when(centralDogmaRepository.commit(anyString(), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest);
         when(commitRequest.push(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(new PushResult(Revision.HEAD, whenCentralDogmaPushed)));

         CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME, supplier);

         verify(centralDogmaRepository).commit(
-                any(String.class),
+                anyString(),
                 eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties))
         );
     }
diff --git a/processor/src/it/java/com/linecorp/decaton/processor/PropertyReloadRequestTest.java b/processor/src/it/java/com/linecorp/decaton/processor/PropertyReloadRequestTest.java
index b2bbd67e..84696373 100644
--- a/processor/src/it/java/com/linecorp/decaton/processor/PropertyReloadRequestTest.java
+++ b/processor/src/it/java/com/linecorp/decaton/processor/PropertyReloadRequestTest.java
@@ -76,6 +76,11 @@ public void testPropertyDynamicSwitch() throws Exception {
         DynamicProperty<Integer> concurrencyProp =
                 new DynamicProperty<>(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY);
         concurrencyProp.set(1);
+
+        DynamicProperty<Integer> recordsProp =
+                new DynamicProperty<>(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS);
+        recordsProp.set(10);
+
         try (ProcessorSubscription subscription = TestUtils.subscription(
                 rule.bootstrapServers(),
                 builder -> builder.processorsBuilder(ProcessorsBuilder
@@ -83,7 +88,7 @@ public void testPropertyDynamicSwitch() throws Exception {
                                                              new ProtocolBuffersDeserializer<>(
                                                                      HelloTask.parser()))
                                                      .thenProcess(processor))
-                                  .addProperties(StaticPropertySupplier.of(concurrencyProp)));
+                                  .addProperties(StaticPropertySupplier.of(concurrencyProp, recordsProp)));
             DecatonClient<HelloTask> client = TestUtils.client(topicName, rule.bootstrapServers())) {

            int count = 0;
@@ -92,12 +97,19 @@ public void testPropertyDynamicSwitch() throws Exception {
                 if (count == 1000) {
                     TimeUnit.SECONDS.sleep(1);
                     concurrencyProp.set(3);
+                } else if (count == 3000) {
+                    TimeUnit.SECONDS.sleep(1);
+                    recordsProp.set(20);
                 } else if (count == 5000) {
                     TimeUnit.SECONDS.sleep(1);
                     concurrencyProp.set(1);
+                    recordsProp.set(5);
                 } else if (count == 7500) {
                     TimeUnit.SECONDS.sleep(1);
                     concurrencyProp.set(5);
+                } else if (count == 9000) {
+                    TimeUnit.SECONDS.sleep(1);
+                    recordsProp.set(15);
                 }
                 client.put(key, HelloTask.getDefaultInstance());
             }
diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java
index 7cc7b85f..a41571a4 100644
--- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java
+++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java
@@ -33,20 +33,22 @@
 /**
  * Collection of properties that can be configured to adjust {@link DecatonProcessor}'s behavior.
- *
- * Description of each attributes:
- * - Reloadable : Whether update on the property can be applied to running instance without restarting it.
- *                Note that for properties enabled for this attribute, updating its value may take certain
- *                latency.
+ * <p>
+ * Description of each attribute:
+ * <ul>
+ *   <li>Reloadable: Whether update on the property can be applied to running instance without restarting it.
+ *       Note that for properties enabled for this attribute, updating its value may take certain
+ *       latency.</li>
+ * </ul>
  */

 public class ProcessorProperties extends AbstractDecatonProperties {
     /**
      * List of keys of task to skip processing.
-     *
+     * <p>
      * Note that this property accepts only String keys, while Decaton consumer supports consuming
      * keys of arbitrary type. This means that records with non-String keys may just pass through
      * this filter.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<List<String>> CONFIG_IGNORE_KEYS =
@@ -54,16 +56,18 @@ public class ProcessorProperties extends AbstractDecatonProperties {
                                       PropertyDefinition.checkListElement(String.class));
     /**
      * Maximum rate of processing tasks per-partition in second.
-     *
-     * If the value N is
-     * - (0, 1,000,000]: Do the best to process tasks as much as N per second.
-     *   N may not be kept well if a task takes over a second to process or N is greater than
-     *   actual throughput per second.
-     * - 0: Stop all processing but the task currently being processed isn`t interrupted
-     * - -1: Unlimited
-     *
+     * <p>
+     * If the value N is:
+     * <ul>
+     *   <li>(0, 1,000,000]: Do the best to process tasks as much as N per second.
+     *       N may not be kept well if a task takes over a second to process or N is greater than
+     *       actual throughput per second.</li>
+     *   <li>0: Stop all processing but the task currently being processed isn't interrupted</li>
+     *   <li>-1: Unlimited</li>
+     * </ul>
+     * <p>
      * See also {@link RateLimiter}.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<Long> CONFIG_PROCESSING_RATE =
@@ -74,9 +78,10 @@
                                            && (long) v <= RateLimiter.MAX_RATE);
     /**
      * Concurrency used to process tasks coming from single partition.
+     * <p>
      * Reloading this property will be performed for each assigned partition as soon as
      * the current pending tasks of the assigned partition have done.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<Integer> CONFIG_PARTITION_CONCURRENCY =
@@ -84,17 +89,21 @@
                                       v -> v instanceof Integer && (Integer) v > 0);
     /**
      * Number of records to pause source partition if pending count exceeds this number.
-     *
-     * Reloadable: no
+     * <p>
+     * Reloading this property will be performed for each assigned partition as soon as
+     * the current pending tasks of the assigned partition have done.
+     * <p>
+     * Reloadable: yes
      */
     public static final PropertyDefinition<Integer> CONFIG_MAX_PENDING_RECORDS =
             PropertyDefinition.define("decaton.max.pending.records", Integer.class, 10_000,
                                       v -> v instanceof Integer && (Integer) v > 0);
     /**
      * Interval in milliseconds to put in between offset commits.
-     * Too frequent offset commit would cause high load on brokers while it doesn't essentially prevents
+     * <p>
+     * Too frequent offset commit would cause high load on brokers while it doesn't essentially prevent
      * duplicate processing.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<Long> CONFIG_COMMIT_INTERVAL_MS =
@@ -102,16 +111,19 @@
                                       v -> v instanceof Long && (Long) v >= 0);
     /**
      * Timeout for consumer group rebalance.
+     * <p>
      * Decaton waits up to this time for tasks currently pending or in-progress to finish before allowing a
      * rebalance to proceed.
+     * <p>
      * Any tasks that do not complete within this timeout will not have their offsets committed before the
      * rebalance, meaning they may be processed multiple times (as they will be processed again after the
      * rebalance). If {@link #CONFIG_PARTITION_CONCURRENCY} is greater than 1, this situation might also cause
      * other records from the same partition to be processed multiple times (see
      * {@link OutOfOrderCommitControl}).
+     * <p>
      * Generally, this should be set such that {@link #CONFIG_MAX_PENDING_RECORDS} can be comfortably processed
      * within this timeout.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<Long> CONFIG_GROUP_REBALANCE_TIMEOUT_MS =
@@ -124,9 +136,10 @@
      * still be running even after {@link ProcessorSubscription#close()} returns, which might lead to errors
      * from e.g. shutting down dependencies of this {@link ProcessorSubscription} that are still in use from
      * async tasks.
+     * <p>
      * Generally, this should be set such that {@link #CONFIG_MAX_PENDING_RECORDS} can be comfortably processed
      * within this timeout.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<Long> CONFIG_SHUTDOWN_TIMEOUT_MS =
@@ -134,11 +147,11 @@
                                       v -> v instanceof Long && (Long) v >= 0);

     /**
-     * Control whether to enable or disable decaton specific information store in slf4j's {@link MDC}.
+     * Control whether to enable or disable decaton-specific information stored in SLF4J's {@link MDC}.
      * This option is enabled by default, but it is known to cause some object allocations which could become
      * a problem in massive scale traffic. This option intend to provide an option for users to disable MDC
      * properties where not necessary to reduce GC pressure.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<Boolean> CONFIG_LOGGING_MDC_ENABLED =
@@ -146,9 +159,9 @@
                                       v -> v instanceof Boolean);
     /**
      * Controls whether to enable or disable binding Micrometer's KafkaClientMetrics to decaton consumers.
-     * This is disabled for backwards compatiblity, but recommended if you rely on Micrometer
+     * This is disabled for backwards compatibility, but recommended if you rely on Micrometer
      * since JMX metrics are deprecated. The downside is a possible increase in metrics count.
-     *
+     * <p>
      * Reloadable: no
      */
     public static final PropertyDefinition<Boolean> CONFIG_BIND_CLIENT_METRICS =
@@ -156,24 +169,26 @@
                                       v -> v instanceof Boolean);
     /**
      * Control time to "timeout" a deferred completion.
+     * <p>
      * Decaton allows {@link DecatonProcessor}s to defer completion of a task by calling
      * {@link ProcessingContext#deferCompletion()}, which is useful for processors which integrates with
      * asynchronous processing frameworks that sends the processing context to somewhere else and get back
      * later.
+     * <p>
      * However, since leaking {@link Completion} returned by {@link ProcessingContext#deferCompletion()} means
      * to create a never-completed task, that causes consumption to suck completely after
      * {@link #CONFIG_MAX_PENDING_RECORDS} records stacked up, which is not desirable for some use cases.
-     *
+     * <p>
      * By setting this timeout, Decaton will try to "timeout" a deferred completion after the specified period.
      * By setting the timeout to sufficiently large value, which you can be sure that none of normal processing
      * to take, some potentially leaked completion will be forcefully completed and decaton can continue to
      * consume the following tasks.
-     *
-     * Be very careful when using this feature since forcefully completing a timed out completion might leads
-     * you some data loss if the corresponding processing hasn't yet complete.
-     *
+     * <p>
+     * Be very careful when using this feature since forcefully completing a timed out completion might lead
+     * to some data loss if the corresponding processing hasn't yet completed.
+     * <p>
      * This timeout can be disabled by setting -1, and it is the default.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<Long> CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS =
@@ -194,14 +209,15 @@

     /**
      * Timeout for processor threads termination.
+     * <p>
      * When a partition is revoked for rebalance or a subscription is about to be shutdown,
      * all processors will be destroyed.
      * At this time, Decaton waits synchronously for the running tasks to finish until this timeout.
-     *
+     * <p>
      * Even if timeout occurs, Decaton will continue other clean-up tasks.
      * Therefore, you can set this timeout only if unexpected behavior is acceptable in the middle of the last
      * {@link DecatonProcessor#process(ProcessingContext, Object)} which timed out.
-     *
+     * <p>
      * Reloadable: yes
      */
     public static final PropertyDefinition<Long> CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS =
diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java
index 12738609..d8b02c0b 100644
--- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java
+++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java
@@ -45,6 +45,7 @@ public class PartitionContext implements AutoCloseable {
     private final PerKeyQuotaManager perKeyQuotaManager;
     private final Processors<?> processors;
     private final PartitionStateMetrics metrics;
+    private final int maxPendingRecords;

     // The offset committed successfully at last commit
     private volatile long lastCommittedOffset;
@@ -69,18 +70,18 @@ public class PartitionContext implements AutoCloseable {
     @Setter
     private volatile boolean reloadRequested;

-    public PartitionContext(PartitionScope scope, Processors<?> processors, int maxPendingRecords) {
-        this(scope, processors, new PartitionProcessor(scope, processors), maxPendingRecords);
+    public PartitionContext(PartitionScope scope, Processors<?> processors) {
+        this(scope, processors, new PartitionProcessor(scope, processors));
     }

     // visible for testing
     PartitionContext(PartitionScope scope,
                      Processors<?> processors,
-                     PartitionProcessor partitionProcessor,
-                     int maxPendingRecords) {
+                     PartitionProcessor partitionProcessor) {
         this.scope = scope;
         this.processors = processors;
         this.partitionProcessor = partitionProcessor;
+        maxPendingRecords = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value();

         int capacity = maxPendingRecords + scope.maxPollRecords();
         TopicPartition tp = scope.topicPartition();
@@ -107,7 +108,7 @@ public PartitionContext(PartitionScope scope, Processors<?> processors, int maxP
     /**
      * Returns the largest offset waiting to be committed, if exists.
-     *
+     * <p>
      * It returns non-empty value with offset which is larger than the offset
      * reported last by {@link #updateCommittedOffset(long)}.
      *
@@ -146,6 +147,10 @@ public int pendingTasksCount() {
         return commitControl.pendingOffsetsCount();
     }

+    public boolean shouldPausePartition() {
+        return pendingTasksCount() >= maxPendingRecords;
+    }
+
     public void destroyProcessors() throws Exception {
         partitionProcessor.close();
         processors.destroyPartitionScope(scope.subscriptionId(), scope.topicPartition());
diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContexts.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContexts.java
index 737acfa6..15a193d8 100644
--- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContexts.java
+++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContexts.java
@@ -36,6 +36,7 @@

 import com.linecorp.decaton.processor.runtime.ProcessorProperties;
 import com.linecorp.decaton.processor.runtime.Property;
+import com.linecorp.decaton.processor.runtime.PropertyDefinition;
 import com.linecorp.decaton.processor.runtime.internal.AssignmentManager.AssignmentConfig;
 import com.linecorp.decaton.processor.runtime.internal.AssignmentManager.AssignmentStore;
 import com.linecorp.decaton.processor.runtime.internal.CommitManager.OffsetsStore;
@@ -48,7 +49,6 @@ public class PartitionContexts implements OffsetsStore, AssignmentStore, Partiti
     private final SubscriptionScope scope;
     private final Processors<?> processors;
     private final Property<Long> processingRateProp;
-    private final int maxPendingRecords;
     private final Map<TopicPartition, PartitionContext> contexts;
     private final ReentrantLock propertyReloadLock;

@@ -58,26 +58,11 @@ public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
         this.processors = processors;
         processingRateProp = scope.props().get(ProcessorProperties.CONFIG_PROCESSING_RATE);
-        // We don't support dynamic reload of this value so fix at the time of boot-up.
-        maxPendingRecords = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value();
         contexts = new HashMap<>();
         propertyReloadLock = new ReentrantLock();

-        scope.props().get(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY).listen((oldVal, newVal) -> {
-            // This listener will be called at listener registration.
-            // It's not necessary to reload contexts at listener registration because PartitionContexts hasn't been instantiated at that time.
-            if (oldVal == null) {
-                return;
-            }
-
-            propertyReloadLock.lock();
-            try {
-                contexts.values().forEach(context -> context.reloadRequested(true));
-                logger.info("Requested reload partition.concurrency oldValue={}, newValue={}", oldVal, newVal);
-            } finally {
-                propertyReloadLock.unlock();
-            }
-        });
+        registerReloadListener(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY);
+        registerReloadListener(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS);
     }

     public PartitionContext get(TopicPartition tp) {
@@ -211,14 +196,10 @@ public void updateHighWatermarks() {
         contexts.values().forEach(PartitionContext::updateHighWatermark);
     }

-    private boolean shouldPartitionPaused(int pendingRecords) {
-        return pendingRecords >= maxPendingRecords;
-    }
-
     // visible for testing
     PartitionContext instantiateContext(TopicPartition tp) {
         PartitionScope partitionScope = new PartitionScope(scope, tp);
-        return new PartitionContext(partitionScope, processors, maxPendingRecords);
+        return new PartitionContext(partitionScope, processors);
     }

     // visible for testing
@@ -239,7 +220,7 @@ public List<TopicPartition> partitionsNeedsPause() {
                        .filter(c -> !c.paused())
                        .filter(c -> pausingAll
                                     || c.reloadRequested()
-                                    || shouldPartitionPaused(c.pendingTasksCount()))
+                                    || c.shouldPausePartition())
                        .map(PartitionContext::topicPartition)
                        .collect(toList());
     }
@@ -252,7 +233,7 @@ public List<TopicPartition> partitionsNeedsResume() {
                        .filter(PartitionContext::paused)
                        .filter(c -> !pausingAll
                                     && !c.reloadRequested()
-                                    && !shouldPartitionPaused(c.pendingTasksCount()))
+                                    && !c.shouldPausePartition())
                        .map(PartitionContext::topicPartition)
                        .collect(toList());
     }
@@ -301,6 +282,26 @@ public void maybeHandlePropertyReload() {
         }
     }

+    private void registerReloadListener(PropertyDefinition<?> property) {
+        scope.props().get(property).listen((oldVal, newVal) -> {
+            // This listener will also be called at listener registration.
+            // It's not necessary to reload contexts at listener registration,
+            // because PartitionContexts hasn't been instantiated at that time.
+            if (oldVal == null) {
+                return;
+            }
+
+            propertyReloadLock.lock();
+            try {
+                contexts.values().forEach(context -> context.reloadRequested(true));
+                logger.info("Requested reload of `{}`: oldValue={}, newValue={}",
+                            property.name(), oldVal, newVal);
+            } finally {
+                propertyReloadLock.unlock();
+            }
+        });
+    }
+
     private void reloadContexts(Collection<TopicPartition> topicPartitions) {
         logger.info("Start dropping partition contexts({})", topicPartitions);
         removePartition(topicPartitions);
@@ -317,7 +318,8 @@ private void destroyProcessors(Collection<TopicPartition> partitions) {
             try {
                 context.destroyProcessors();
             } catch (Exception e) {
-                logger.error("Failed to close partition context for {}", context.topicPartition(), e);
+                logger.error("Failed to close partition context for {}",
+                             context.topicPartition(), e);
             }
         }).collect(toList()))
         .join();
diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java
index 93f08087..0df3ecdd 100644
--- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java
+++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java
@@ -21,6 +21,8 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
@@ -183,7 +185,7 @@ public void testOffsetRegression() throws Exception {
         doAnswer(invocation -> {
             listener.set(invocation.getArgument(1));
             return null;
-        }).when(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
+        }).when(consumer).subscribe(anyCollection(), any(ConsumerRebalanceListener.class));

         BlockingQueue<Long> feedOffsets = new ArrayBlockingQueue<>(4);
         feedOffsets.add(100L);
@@ -203,8 +205,8 @@ public void testOffsetRegression() throws Exception {
             committedOffsets.putAll(invocation.getArgument(0));
             return null;
         };
-        doAnswer(storeCommitOffsets).when(consumer).commitSync(any(Map.class));
-        doAnswer(storeCommitOffsets).when(consumer).commitAsync(any(Map.class), any());
+        doAnswer(storeCommitOffsets).when(consumer).commitSync(anyMap());
+        doAnswer(storeCommitOffsets).when(consumer).commitAsync(anyMap(), any());

         AtomicBoolean first = new AtomicBoolean();
         doAnswer(invocation -> {
diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/CommitManagerTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/CommitManagerTest.java
index a3a84d09..aa688bef 100644
--- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/CommitManagerTest.java
+++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/CommitManagerTest.java
@@ -19,6 +19,7 @@
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -68,7 +69,6 @@ public class CommitManagerTest {
     public void setUp() {
         commitManager = spy(new CommitManager(consumer, commitIntervalMillis, store, clock));
     }
-    @SuppressWarnings("unchecked")
     @Test(timeout = 5000)
     public void testCommitCompletedOffsetsSync() {
         // When committed ended up successfully update committed offsets
@@ -77,29 +77,27 @@ public void testCommitCompletedOffsetsSync() {
         doReturn(offsets).when(store).commitReadyOffsets();
         commitManager.commitSync();

-        verify(consumer, times(1)).commitSync(any(Map.class));
+        verify(consumer, times(1)).commitSync(anyMap());
         verify(store, times(1)).storeCommittedOffsets(offsets);
     }

-    @SuppressWarnings("unchecked")
     @Test(timeout = 5000)
     public void testCommitCompletedOffsetsSync_NO_COMMIT() {
         // When target offsets is empty do not attempt any commit
         doReturn(emptyMap()).when(store).commitReadyOffsets();
         commitManager.commitSync();

-        verify(consumer, never()).commitSync(any(Map.class));
-        verify(consumer, never()).commitAsync(any(Map.class), any());
+        verify(consumer, never()).commitSync(anyMap());
+        verify(consumer, never()).commitAsync(anyMap(), any());
     }

-    @SuppressWarnings("unchecked")
     @Test(timeout = 5000)
     public void testCommitCompletedOffsetsSync_FAIL() {
         // When commit raised an exception do not update committed offsets
         Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
                 new TopicPartition("topic", 0), new OffsetAndMetadata(1234, null));
         doReturn(offsets).when(store).commitReadyOffsets();
-        doThrow(new RuntimeException("error")).when(consumer).commitSync(any(Map.class));
+        doThrow(new RuntimeException("error")).when(consumer).commitSync(anyMap());
         try {
             commitManager.commitSync();
         } catch (RuntimeException ignored) {
@@ -108,9 +106,8 @@ public void testCommitCompletedOffsetsSync_FAIL() {
         verify(store, never()).storeCommittedOffsets(any());
     }

-    @SuppressWarnings("unchecked")
     @Test(timeout = 5000)
-    public void testCommitCompletedOffsetsAsync() throws InterruptedException {
+    public void testCommitCompletedOffsetsAsync() {
         Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
                 new TopicPartition("topic", 0), new OffsetAndMetadata(1234, null));
         doReturn(offsets).when(store).commitReadyOffsets();
@@ -119,17 +116,17 @@ public void testCommitCompletedOffsetsAsync() {
             OffsetCommitCallback cb = invocation.getArgument(1);
             cbRef.set(cb);
             return null;
-        }).when(consumer).commitAsync(any(Map.class), any());
+        }).when(consumer).commitAsync(anyMap(), any());

         commitManager.commitAsync();

         // Committed offsets should not be updated yet here
-        verify(consumer, times(1)).commitAsync(any(Map.class), any());
+        verify(consumer, times(1)).commitAsync(anyMap(), any());
         verify(store, never()).storeCommittedOffsets(any());

         // Subsequent async commit attempt should be ignored until the in-flight one completes
         commitManager.commitAsync();
-        verify(consumer, times(1)).commitAsync(any(Map.class), any());
+        verify(consumer, times(1)).commitAsync(anyMap(), any());
         verify(store, never()).storeCommittedOffsets(any());

         // Committed offset should be updated once the in-flight request completes
@@ -137,9 +134,8 @@ public void testCommitCompletedOffsetsAsync() {
         verify(store, times(1)).storeCommittedOffsets(offsets);
     }

-    @SuppressWarnings("unchecked")
     @Test(timeout = 5000)
-    public void testCommitCompletedOffsetsAsync_FAIL() throws InterruptedException {
+    public void testCommitCompletedOffsetsAsync_FAIL() {
         Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
                 new TopicPartition("topic", 0), new OffsetAndMetadata(1234, null));
         doReturn(offsets).when(store).commitReadyOffsets();
@@ -148,7 +144,7 @@ public void testCommitCompletedOffsetsAsync_FAIL() {
             OffsetCommitCallback cb = invocation.getArgument(1);
             cbRef.set(cb);
             return null;
-        }).when(consumer).commitAsync(any(Map.class), any());
+        }).when(consumer).commitAsync(anyMap(), any());

         commitManager.commitAsync();

         // If async commit fails it should never update committed offset
@@ -156,7 +152,6 @@ public void testCommitCompletedOffsetsAsync_FAIL() {
         verify(store, never()).storeCommittedOffsets(any());
     }

-    @SuppressWarnings("unchecked")
     @Test(timeout = 5000)
     public void testCommitCompletedOffsetsAsync_SUBSEQUENT_SYNC() {
         Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
@@ -168,7 +163,7 @@ public void testCommitCompletedOffsetsAsync_SUBSEQUENT_SYNC() {
         // Subsequent sync commit can proceed regardless of in-flight async commit
         commitManager.commitSync();

-        verify(consumer, times(1)).commitSync(any(Map.class));
+        verify(consumer, times(1)).commitSync(anyMap());
     }

     @Test
diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java
index 63973f25..2ab7dd8d 100644
--- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java
+++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ConsumeManagerTest.java
@@ -22,6 +22,7 @@
 import static java.util.stream.Collectors.toSet;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
@@ -29,7 +30,6 @@

 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -85,7 +85,7 @@ public void setUp() {
         doAnswer(invocation -> {
             rebalanceListener = invocation.getArgument(1);
             return null;
-        }).when(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
+        }).when(consumer).subscribe(anyCollection(), any(ConsumerRebalanceListener.class));

         consumeManager.init(singletonList(TOPIC));
     }
diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextTest.java
index 569ddadb..84f75e9b 100644
--- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextTest.java
+++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextTest.java
@@ -19,11 +19,14 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;

+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalLong;

@@ -38,6 +41,7 @@
 import com.linecorp.decaton.processor.runtime.DefaultSubPartitioner;
 import com.linecorp.decaton.processor.runtime.PerKeyQuotaConfig;
 import com.linecorp.decaton.processor.runtime.ProcessorProperties;
+import com.linecorp.decaton.processor.runtime.Property;
 import com.linecorp.decaton.processor.runtime.internal.PerKeyQuotaManager.UsageType;
 import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider;
 import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider.NoopTrace;
@@ -46,10 +50,14 @@ public class PartitionContextTest {
     @Rule
     public MockitoRule rule = MockitoJUnit.rule();

+    private static final Property<Integer> MAX_PENDING_RECORDS =
+            Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, 10);
+
     private static PartitionScope scope(String topic, Optional<PerKeyQuotaConfig> perKeyQuotaConfig) {
         return new PartitionScope(
                 new SubscriptionScope("subscription", "topic",
-                                      Optional.empty(), perKeyQuotaConfig, ProcessorProperties.builder().build(),
+                                      Optional.empty(), perKeyQuotaConfig,
+                                      ProcessorProperties.builder().set(MAX_PENDING_RECORDS).build(),
                                       NoopTracingProvider.INSTANCE,
                                       ConsumerSupplier.DEFAULT_MAX_POLL_RECORDS,
                                       DefaultSubPartitioner::new),
@@ -61,13 +69,11 @@ public class PartitionContextTest {
     @Mock
     private PartitionProcessor partitionProcessor;
     @Mock
-    ConsumerRecord<byte[], byte[]> record;
-
-    private static final int MAX_PENDING_RECORDS = 100;
+    private ConsumerRecord<byte[], byte[]> record;

     @Test
     public void testOffsetWaitingCommit() {
-        PartitionContext context = new PartitionContext(scope("topic", Optional.empty()), processors, MAX_PENDING_RECORDS);
+        PartitionContext context = new PartitionContext(scope("topic", Optional.empty()), processors);
         assertFalse(context.offsetWaitingCommit().isPresent());

         OffsetState state = context.registerOffset(100);
@@ -81,28 +87,60 @@ public void testOffsetWaitingCommit() {
         assertFalse(context.offsetWaitingCommit().isPresent());
     }

+    @Test
+    public void testShouldPausePartition() {
+        PartitionContext context = new PartitionContext(scope("topic", Optional.empty()), processors);
+        assertFalse(context.shouldPausePartition());
+
+        // Register MAX-1 records, which should not pause the partition.
+        int limit = MAX_PENDING_RECORDS.value();
+        List<OffsetState> states = new ArrayList<>();
+        for (int i = 0; i < limit - 1; i++) {
+            states.add(context.registerOffset(100 + i));
+        }
+        assertEquals(limit - 1, context.pendingTasksCount());
+        assertFalse(context.shouldPausePartition());
+
+        // Register one more record, requiring to pause the partition.
+        states.add(context.registerOffset(200));
+        assertEquals(limit, context.pendingTasksCount());
+        assertTrue(context.shouldPausePartition());
+
+        // Complete the first record, allowing resume.
+        states.get(0).completion().complete();
+        context.updateHighWatermark();
+        assertEquals(limit - 1, context.pendingTasksCount());
+        assertFalse(context.shouldPausePartition());
+
+        // Complete all records.
+        states.forEach(state -> state.completion().complete());
+        context.updateHighWatermark();
+        assertEquals(0, context.pendingTasksCount());
+        assertFalse(context.shouldPausePartition());
+    }
+
     @Test
     public void testQuotaUsage() {
         PartitionContext context = new PartitionContext(
-                scope("topic", Optional.of(PerKeyQuotaConfig.shape())), processors, MAX_PENDING_RECORDS);
+                scope("topic", Optional.of(PerKeyQuotaConfig.shape())), processors);
         assertEquals(UsageType.COMPLY, context.maybeRecordQuotaUsage(new byte[0]).type());
     }

     @Test
     public void testQuotaUsageWhenDisabled() {
         PartitionContext context = new PartitionContext(
-                scope("topic", Optional.empty()), processors, MAX_PENDING_RECORDS);
+                scope("topic", Optional.empty()), processors);
         assertNull(context.maybeRecordQuotaUsage(new byte[0]));
     }

     @Test
     public void testQuotaUsageNonTargetTopic() {
         PartitionContext context = new PartitionContext(
-                scope("topic-shaping", Optional.of(PerKeyQuotaConfig.shape())), processors, MAX_PENDING_RECORDS);
+                scope("topic-shaping", Optional.of(PerKeyQuotaConfig.shape())), processors);
         assertNull(context.maybeRecordQuotaUsage(new byte[0]));

         context = new PartitionContext(
-                scope("topic-retry", Optional.of(PerKeyQuotaConfig.shape())), processors, MAX_PENDING_RECORDS);
+                scope("topic-retry", Optional.of(PerKeyQuotaConfig.shape())), processors);
         assertNull(context.maybeRecordQuotaUsage(new byte[0]));
     }

@@ -111,8 +149,7 @@ public void testQuotaApplied() {
         PartitionContext context = new PartitionContext(
                 scope("topic-shaping", Optional.of(PerKeyQuotaConfig.shape())),
                 processors,
-                partitionProcessor,
-                MAX_PENDING_RECORDS);
+                partitionProcessor);

         context.addRecord(record, new OffsetState(42L), NoopTrace.INSTANCE, (r, o, q) -> true);
         verify(partitionProcessor, never()).addTask(any());
@@ -123,8 +160,7 @@ public void testQuotaNotApplied() {
         PartitionContext context = new PartitionContext(
                 scope("topic-shaping", Optional.of(PerKeyQuotaConfig.shape())),
                 processors,
-                partitionProcessor,
-                MAX_PENDING_RECORDS);
+                partitionProcessor);

         context.addRecord(record, new OffsetState(42L), NoopTrace.INSTANCE, (r, o, q) -> false);
         verify(partitionProcessor, times(1)).addTask(any());
diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextsTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextsTest.java
index c9f7f426..56cf3615 100644
--- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextsTest.java
+++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/PartitionContextsTest.java
@@ -16,6 +16,7 @@

 package com.linecorp.decaton.processor.runtime.internal;

+import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_MAX_PENDING_RECORDS;
 import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PARTITION_CONCURRENCY;
 import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PROCESSING_RATE;
 import static java.util.Arrays.asList;
@@ -56,24 +57,24 @@
 import com.linecorp.decaton.processor.runtime.DefaultSubPartitioner;
 import com.linecorp.decaton.processor.runtime.DynamicProperty;
 import com.linecorp.decaton.processor.runtime.ProcessorProperties;
-import com.linecorp.decaton.processor.runtime.Property;
 import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider;

 public class PartitionContextsTest {
     @Rule
     public MockitoRule rule = MockitoJUnit.rule();

-    private static final int PENDING_RECORDS_TO_PAUSE = 10;
-
     private final DynamicProperty<Integer> partitionConcurrencyProperty =
             new DynamicProperty<>(CONFIG_PARTITION_CONCURRENCY);
+    private final DynamicProperty<Integer> maxPendingRecordsProperty =
+            new DynamicProperty<>(CONFIG_MAX_PENDING_RECORDS);
+
     private final DynamicProperty<Long> processingRateProp =
             new DynamicProperty<>(CONFIG_PROCESSING_RATE);

     private final ProcessorProperties props = ProcessorProperties
             .builder()
-            .set(Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, PENDING_RECORDS_TO_PAUSE))
             .set(partitionConcurrencyProperty)
+            .set(maxPendingRecordsProperty)
             .set(processingRateProp)
             .build();

@@ -108,6 +109,7 @@ private static TopicPartition tp(int partition) {
     @Before
     public void setup() {
         partitionConcurrencyProperty.set(1);
+        maxPendingRecordsProperty.set(10);
         contexts = spy(new PartitionContexts(scope, processors));
     }

@@ -161,8 +163,6 @@ public void testUpdateCommittedOffset() {
     public void testPartitionsNeedsPause() {
         List<PartitionContext> cts = putContexts(2);

-        doReturn(0).when(cts.get(0)).pendingTasksCount();
-        doReturn(0).when(cts.get(1)).pendingTasksCount();
         doReturn(false).when(cts.get(0)).reloadRequested();
         doReturn(false).when(cts.get(1)).reloadRequested();

@@ -188,12 +188,12 @@ public void testPartitionsNeedsPause() {
         assertTrue(needPause.isEmpty());

         // Pause 1 partition.
-        doReturn(PENDING_RECORDS_TO_PAUSE).when(cts.get(0)).pendingTasksCount();
+        doReturn(true).when(cts.get(0)).shouldPausePartition();
         needPause = contexts.partitionsNeedsPause();
         assertEquals(1, needPause.size());
         assertEquals(cts.get(0).topicPartition(), needPause.iterator().next());

-        // Mark as paused so it should disappear from the response.
+        // Mark as paused, so it should disappear from the response.
         doReturn(true).when(cts.get(0)).paused();
         needPause = contexts.partitionsNeedsPause();
         assertTrue(needPause.isEmpty());
@@ -221,14 +221,11 @@ public void testPartitionsNeedsResume() {
         List<PartitionContext> cts = putContexts(2);

-        doReturn(0).when(cts.get(0)).pendingTasksCount();
-        doReturn(0).when(cts.get(1)).pendingTasksCount();
-
         Collection<TopicPartition> needsResume = contexts.partitionsNeedsResume();
         assertTrue(needsResume.isEmpty());

         // Pause 1 partition.
-        doReturn(PENDING_RECORDS_TO_PAUSE).when(cts.get(0)).pendingTasksCount();
+        doReturn(true).when(cts.get(0)).shouldPausePartition();
         needsResume = contexts.partitionsNeedsResume();
         assertTrue(needsResume.isEmpty());

@@ -237,20 +234,20 @@ public void testPartitionsNeedsResume() {
         needsResume = contexts.partitionsNeedsResume();
         assertTrue(needsResume.isEmpty());

-        // Task count is now lower than threshold. Should appear in resume response.
-        doReturn(PENDING_RECORDS_TO_PAUSE - 1).when(cts.get(0)).pendingTasksCount();
+        // Partition no longer needs to be paused. Should appear in resume response.
+        doReturn(false).when(cts.get(0)).shouldPausePartition();
         needsResume = contexts.partitionsNeedsResume();
         assertEquals(1, needsResume.size());
         assertEquals(cts.get(0).topicPartition(), needsResume.iterator().next());

         // All processing paused by rate limiting.
-        // All partitions should be disappear from resume response.
+        // All partitions should disappear from resume response.
         doReturn(true).when(contexts).pausingAllProcessing();
         doReturn(true).when(cts.get(1)).paused();
         needsResume = contexts.partitionsNeedsResume();
         assertTrue(needsResume.isEmpty());

-        // All pause finished. Now only partitions should be appear in resume response.
+        // All pause finished. Now all partitions should appear in resume response.
         doReturn(false).when(contexts).pausingAllProcessing();
         needsResume = contexts.partitionsNeedsResume();
         assertEquals(cts.stream().map(PartitionContext::topicPartition).collect(Collectors.toSet()),
@@ -282,6 +279,8 @@ public void testShouldNotBePausingAllProcessingByPropertyReload() {
         assertFalse(contexts.pausingAllProcessing());
         partitionConcurrencyProperty.set(42);
         assertFalse(contexts.pausingAllProcessing());
+        maxPendingRecordsProperty.set(42);
+        assertFalse(contexts.pausingAllProcessing());
     }

     @Test
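Editor's note — usage sketch, not part of the patch above. With this change, `decaton.max.pending.records` joins `decaton.partition.concurrency` as a reloadable property: updating it marks every assigned partition for reload, and each partition applies the new value once its currently pending tasks have drained. The sketch below shows one way an application could drive that reload at runtime, mirroring what `PropertyReloadRequestTest` does; it assumes decaton's standard `SubscriptionBuilder`/`ProcessorsBuilder` API, and the subscription id, topic, Kafka config, and processor body are placeholders:

```java
import java.util.Properties;

import com.linecorp.decaton.processor.runtime.DynamicProperty;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
import com.linecorp.decaton.protocol.Sample.HelloTask;

public class MaxPendingRecordsReloadSketch {
    public static void main(String[] args) throws Exception {
        // Keep a handle on the DynamicProperty so it can be updated after startup.
        DynamicProperty<Integer> maxPendingRecords =
                new DynamicProperty<>(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS);
        maxPendingRecords.set(10_000);

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        consumerConfig.setProperty("group.id", "my-decaton-group");        // placeholder

        ProcessorSubscription subscription =
                SubscriptionBuilder.newBuilder("my-subscription")
                                   .consumerConfig(consumerConfig)
                                   .processorsBuilder(
                                           ProcessorsBuilder.consuming(
                                                   "my-topic",
                                                   new ProtocolBuffersDeserializer<>(HelloTask.parser()))
                                                            .thenProcess((context, task) -> {
                                                                // process the task...
                                                            }))
                                   .addProperties(StaticPropertySupplier.of(maxPendingRecords))
                                   .buildAndStart();

        // Later, at runtime: each assigned partition picks up the new pause threshold
        // as soon as its currently pending tasks have completed (see the Javadoc above).
        maxPendingRecords.set(500);

        // subscription.close() on shutdown.
    }
}
```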