Narrow down the scope of waiting for pending tasks to per partition
ta7uw committed Mar 2, 2023
1 parent 916403f commit c4bbbc1
Showing 2 changed files with 59 additions and 33 deletions.
PartitionContexts.java
@@ -28,6 +28,8 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -53,6 +55,8 @@ public class PartitionContexts implements OffsetsStore, AssignmentStore, Partiti
     private final Map<TopicPartition, PartitionContext> contexts;
 
     private final AtomicBoolean reloadRequested;
+    private final ReentrantLock lock;
+    private HashSet<TopicPartition> reloadedPartitions;
 
     public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
         this.scope = scope;
@@ -63,6 +67,8 @@ public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
         maxPendingRecords = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value();
         contexts = new HashMap<>();
         reloadRequested = new AtomicBoolean(false);
+        lock = new ReentrantLock();
+        reloadedPartitions = new HashSet<>();
 
         scope.props().get(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY).listen((oldVal, newVal) -> {
             // This listener will be called at listener registration.
@@ -71,8 +77,14 @@ public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
                 return;
             }
 
-            if (!reloadRequested.getAndSet(true)) {
-                logger.info("Requested reload partition.concurrency oldValue={}, newValue={}", oldVal, newVal);
+            lock.lock();
+            try {
+                if (!reloadRequested.getAndSet(true)) {
+                    reloadedPartitions = new HashSet<>();
+                    logger.info("Requested reload partition.concurrency oldValue={}, newValue={}", oldVal, newVal);
+                }
+            } finally {
+                lock.unlock();
             }
         });
     }
@@ -250,28 +262,35 @@ public void partitionsResumed(List<TopicPartition> partitions) {
      */
     public void maybeHandlePropertyReload() {
         if (reloadRequested.get()) {
-            if (totalPendingTasks() > 0) {
-                logger.debug("Waiting pending tasks for property reload.");
-                return;
+            lock.lock();
+            try {
+                Map<TopicPartition, PartitionContext> copiedContexts = new HashMap<>(contexts);
+                for (Entry<TopicPartition, PartitionContext> entry: copiedContexts.entrySet()) {
+                    TopicPartition topicPartition = entry.getKey();
+                    if (!reloadedPartitions.contains(topicPartition)
+                            && entry.getValue().pendingTasksCount() == 0) {
+                        logger.debug("Start reloading partition context({})", topicPartition);
+                        reloadContext(topicPartition);
+                        reloadedPartitions.add(topicPartition);
+                    }
+                }
+                if (reloadedPartitions.size() == contexts.size()) {
+                    reloadRequested.set(false);
+                    logger.info("Completed reloading all partition contexts");
+                }
+            } finally {
+                lock.unlock();
             }
-            // it's ok to check-and-set reloadRequested without synchronization
-            // because this field is set to false only in this method, and this method is called from only subscription thread.
-            reloadRequested.set(false);
-            logger.info("Completed waiting pending tasks. Start reloading partition contexts");
-            reloadContexts();
         }
     }
 
-    private void reloadContexts() {
-        // Save current topicPartitions into copy to update contexts map while iterating over this copy.
-        Set<TopicPartition> topicPartitions = new HashSet<>(contexts.keySet());
-
-        logger.info("Start dropping partition contexts");
-        removePartition(topicPartitions);
-        logger.info("Finished dropping partition contexts. Start recreating partition contexts");
-        Map<TopicPartition, AssignmentConfig> configs = topicPartitions.stream().collect(
-                toMap(Function.identity(), tp -> new AssignmentConfig(true)));
-        addPartitions(configs);
+    private void reloadContext(TopicPartition topicPartition) {
+        logger.info("Start dropping partition context({})", topicPartition);
+        List<TopicPartition> partitions = new ArrayList<>();
+        partitions.add(topicPartition);
+        removePartition(partitions);
+        logger.info("Finished dropping partition context. Start recreating partition context");
+        initContext(topicPartition, true);
+        logger.info("Completed reloading property");
     }
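For readers skimming the diff, here is a minimal, self-contained sketch of the pattern the change above introduces: the reload request stays a single flag, but each partition context is now dropped and recreated as soon as its own pending-task count reaches zero, and the flag is cleared only once every partition has been reloaded. The names used below (PerPartitionReloadSketch, PartitionState, pendingTasks, reloadCount) are hypothetical stand-ins for illustration, not the project's actual classes.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class PerPartitionReloadSketch {
    // Hypothetical stand-in for a per-partition context that tracks pending work.
    static class PartitionState {
        int pendingTasks;
        int reloadCount;
    }

    private final Map<String, PartitionState> partitions = new HashMap<>();
    private final Set<String> reloadedPartitions = new HashSet<>();
    private final AtomicBoolean reloadRequested = new AtomicBoolean(false);
    private final ReentrantLock lock = new ReentrantLock();

    // Property-change listener side: only flag the request and reset the bookkeeping set.
    public void requestReload() {
        lock.lock();
        try {
            if (!reloadRequested.getAndSet(true)) {
                reloadedPartitions.clear();
            }
        } finally {
            lock.unlock();
        }
    }

    // Subscription-loop side: reload each partition as soon as *its own* pending
    // tasks are drained, instead of waiting for the total across all partitions.
    public void maybeHandleReload() {
        if (!reloadRequested.get()) {
            return;
        }
        lock.lock();
        try {
            for (Map.Entry<String, PartitionState> entry : new HashMap<>(partitions).entrySet()) {
                PartitionState state = entry.getValue();
                if (!reloadedPartitions.contains(entry.getKey()) && state.pendingTasks == 0) {
                    state.reloadCount++;            // stand-in for drop-and-recreate
                    reloadedPartitions.add(entry.getKey());
                }
            }
            if (reloadedPartitions.size() == partitions.size()) {
                reloadRequested.set(false);         // every partition reloaded; request done
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        PerPartitionReloadSketch sketch = new PerPartitionReloadSketch();
        PartitionState busy = new PartitionState();
        busy.pendingTasks = 5;
        sketch.partitions.put("topic-0", busy);
        sketch.partitions.put("topic-1", new PartitionState());

        sketch.requestReload();
        sketch.maybeHandleReload();   // only topic-1 (no pending tasks) is reloaded
        busy.pendingTasks = 0;
        sketch.maybeHandleReload();   // topic-0 drained; reload completes and the flag clears
        System.out.println("reload pending? " + sketch.reloadRequested.get()); // false
    }
}

Compared with the previous gate on totalPendingTasks(), a single partition with a long backlog no longer blocks the reload of every other partition.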

PartitionContextsTest.java
@@ -264,30 +264,37 @@ public void testPausingAllProcessingByPropertyReload() {
 
     @Test
     public void testMaybeHandlePropertyReload() {
-        putContexts(12);
-
+        int count = 12;
+        List<PartitionContext> allContexts = putContexts(count);
+        List<PartitionContext> pendingContexts = new ArrayList<>();
+        List<PartitionContext> reloadableContexts = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            PartitionContext context = allContexts.get(i);
+            if (i % 3 == 0) {
+                doReturn(100).when(context).pendingTasksCount();
+                pendingContexts.add(context);
+            } else {
+                doReturn(0).when(context).pendingTasksCount();
+                reloadableContexts.add(context);
+            }
+        }
         clearInvocations(contexts);
 
         PartitionContext context = mock(PartitionContext.class);
         doReturn(context).when(contexts).instantiateContext(any());
 
-        // there are some pending tasks
-        doReturn(100).when(contexts).totalPendingTasks();
-
         contexts.maybeHandlePropertyReload();
         // property reload is not requested yet
         verify(contexts, never()).instantiateContext(any());
 
         partitionConcurrencyProperty.set(42);
         contexts.maybeHandlePropertyReload();
-        // property reload is requested, but there are pending tasks
-        verify(contexts, never()).instantiateContext(any());
+        verify(contexts, times(reloadableContexts.size())).instantiateContext(any());
 
         // pending tasks done
-        doReturn(0).when(contexts).totalPendingTasks();
+        for (PartitionContext partitionContext: pendingContexts) {
+            doReturn(0).when(partitionContext).pendingTasksCount();
+        }
         contexts.maybeHandlePropertyReload();
 
-        verify(contexts, times(12)).instantiateContext(any());
+        // completed reloading request
+        verify(contexts, times(count)).instantiateContext(any());
     }
 
     @Test