Narrow down the scope of waiting for pending tasks to per partition #191

Merged · 8 commits · Mar 27, 2023
Changes from 2 commits
PartitionContext.java

@@ -56,6 +56,9 @@ public class PartitionContext implements AutoCloseable {
     @Setter
     private boolean revoking;

+    @Getter
+    private boolean reloading;
+
     public PartitionContext(PartitionScope scope, Processors<?> processors, int maxPendingRecords) {
         this.scope = scope;
         this.processors = processors;

@@ -157,6 +160,13 @@ public void resume() {
         metrics.partitionPausedTime.record(pausedNanos, TimeUnit.NANOSECONDS);
     }

+    public void reloading(boolean reloading) {
+        this.reloading = reloading;
+        if (reloading) {
+            pause();

Contributor:
Pausing here from PartitionContext itself doesn't actually work to pause fetching.

Decaton's fetch-pause works like this: (1) collect the partitions that need to be paused (partitionsNeedsPause), considering only partitions that are not already marked as paused, then (2) pause fetching for those partitions on the consumer.

So if we pause here from PartitionContext itself, the partition will not be included in (1), because it is already "paused".

A correct approach here may be to hold a per-partition reloading flag in PartitionContexts and include those partitions in partitionsNeedsPause.
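
For reference, a minimal sketch of the shape proposed above (the paused() and topicPartition() accessors and the exact pause conditions are assumptions for illustration, not the PR's final code):

    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.kafka.common.TopicPartition;

    final class FetchPauseSketch {
        // Step (1) of fetch-pause: pick the partitions the subscription thread
        // should newly pause. Already-paused partitions are skipped, which is why
        // a partition that paused itself would be missed here; a per-partition
        // reloading flag keeps such partitions visible to this check.
        static List<TopicPartition> partitionsNeedsPause(Collection<PartitionContext> contexts,
                                                         boolean pausingAllProcessing,
                                                         int maxPendingRecords) {
            return contexts.stream()
                           .filter(context -> !context.paused())
                           .filter(context -> pausingAllProcessing
                                              || context.reloading()
                                              || context.pendingTasksCount() >= maxPendingRecords)
                           .map(PartitionContext::topicPartition)
                           .collect(Collectors.toList());
        }
    }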

+        }
+    }
+

     @Override
     public void close() throws Exception {
         resume();
PartitionContexts.java

@@ -28,7 +28,10 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
+import java.util.stream.Collectors;

 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;

@@ -53,6 +56,7 @@ public class PartitionContexts implements OffsetsStore, AssignmentStore, Partiti
     private final Map<TopicPartition, PartitionContext> contexts;

     private final AtomicBoolean reloadRequested;

Contributor:
Do we still need this reloadRequested flag, since we manage the reloading states in the reloadStates hash map?

Member (author):
reloadRequested is useful for quickly determining whether contexts should be reloaded, without having to iterate over them.

Contributor:
Hm, IMO having duplicated information (alongside the per-partition reloading flags) could be error prone as we maintain this code in the future, while iterating over the partitions is unlikely to become a bottleneck (we already iterate over them to update high watermarks and to check which partitions need pause/resume), so the benefit of having reloadRequested separately is small.
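
A sketch of the flag-free alternative described above (illustrative only, not part of the PR; it assumes the same contexts map of PartitionContext values):

    import java.util.Collection;

    final class ReloadStateSketch {
        // Derive "a reload is in progress" by scanning the per-partition flags
        // instead of keeping a separate reloadRequested boolean that duplicates
        // the same information and could drift out of sync.
        static boolean reloadInProgress(Collection<PartitionContext> contexts) {
            return contexts.stream().anyMatch(PartitionContext::reloading);
        }
    }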

+    private final ReentrantLock lock;

Contributor:
Let's rename this to something like propertyReloadLock for understandability.


     public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
         this.scope = scope;

@@ -63,6 +67,7 @@ public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
         maxPendingRecords = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value();
         contexts = new HashMap<>();
         reloadRequested = new AtomicBoolean(false);
+        lock = new ReentrantLock();

         scope.props().get(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY).listen((oldVal, newVal) -> {
             // This listener will be called at listener registration.

@@ -71,8 +76,16 @@ public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
                 return;
             }

-            if (!reloadRequested.getAndSet(true)) {
-                logger.info("Requested reload partition.concurrency oldValue={}, newValue={}", oldVal, newVal);
+            lock.lock();
+            try {
+                if (!reloadRequested.getAndSet(true)) {
+                    for (PartitionContext context : contexts.values()) {
+                        context.reloading(true);
+                    }
+                    logger.info("Requested reload partition.concurrency oldValue={}, newValue={}", oldVal, newVal);
+                }
+            } finally {
+                lock.unlock();
             }
         });
     }

@@ -200,7 +213,7 @@ PartitionContext instantiateContext(TopicPartition tp) {

     // visible for testing
     boolean pausingAllProcessing() {
-        return processingRateProp.value() == RateLimiter.PAUSED || reloadRequested.get();
+        return processingRateProp.value() == RateLimiter.PAUSED;
     }

     @Override

@@ -250,23 +263,30 @@ public void partitionsResumed(List<TopicPartition> partitions) {
      */
     public void maybeHandlePropertyReload() {
         if (reloadRequested.get()) {
-            if (totalPendingTasks() > 0) {
-                logger.debug("Waiting pending tasks for property reload.");
-                return;
+            lock.lock();
+            try {
+                List<TopicPartition> reloadableTopicPartitions = contexts.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue().reloading() && entry.getValue().pendingTasksCount() == 0)
+                        .map(entry -> entry.getKey())
+                        .collect(Collectors.toList());
+                reloadContexts(reloadableTopicPartitions);
+                long reloadingPartitions = contexts.values()
+                        .stream()
+                        .filter(PartitionContext::reloading)
+                        .count();
+                if (reloadingPartitions == 0) {
+                    reloadRequested.set(false);
+                    logger.info("Completed reloading all partition contexts");
+                }
+            } finally {
+                lock.unlock();
             }
-            // it's ok to check-and-set reloadRequested without synchronization
-            // because this field is set to false only in this method, and this method is called from only subscription thread.
-            reloadRequested.set(false);
-            logger.info("Completed waiting pending tasks. Start reloading partition contexts");
-            reloadContexts();
         }
     }

-    private void reloadContexts() {
-        // Save current topicPartitions into copy to update contexts map while iterating over this copy.
-        Set<TopicPartition> topicPartitions = new HashSet<>(contexts.keySet());
-
-        logger.info("Start dropping partition contexts");
+    private void reloadContexts(Collection<TopicPartition> topicPartitions) {
+        logger.info("Start dropping partition context({})", topicPartitions);
         removePartition(topicPartitions);
         logger.info("Finished dropping partition contexts. Start recreating partition contexts");

Contributor:
[nits] This log, and this line, should also include topicPartitions.
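
The two referenced lines aren't visible in this view, but as an illustration, the "Finished dropping" log could carry the partitions the same way the "Start dropping" log above does:

    logger.info("Finished dropping partition contexts({}). Start recreating partition contexts", topicPartitions);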

         Map<TopicPartition, AssignmentConfig> configs = topicPartitions.stream().collect(

PartitionContextsTest.java

@@ -35,6 +35,7 @@

 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;

@@ -256,38 +257,59 @@ public void testPausingAllProcessing() {
     }

     @Test
-    public void testPausingAllProcessingByPropertyReload() {
+    public void testShouldNotBePausingAllProcessingByPropertyReload() {
         assertFalse(contexts.pausingAllProcessing());
         partitionConcurrencyProperty.set(42);
-        assertTrue(contexts.pausingAllProcessing());
+        assertFalse(contexts.pausingAllProcessing());
     }

     @Test
     public void testMaybeHandlePropertyReload() {
-        putContexts(12);
-
+        int count = 12;
+        List<PartitionContext> allContexts = putContexts(count);
+        List<PartitionContext> pendingContexts = new ArrayList<>();
+        List<PartitionContext> reloadableContexts = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            PartitionContext context = allContexts.get(i);
+            if (i % 3 == 0) {
+                doReturn(100).when(context).pendingTasksCount();
+                doReturn(true).when(context).reloading();
+                pendingContexts.add(context);
+            } else {
+                doReturn(0).when(context).pendingTasksCount();
+                doReturn(true).when(context).reloading();
+                reloadableContexts.add(context);
+            }
+        }
+        clearInvocations(contexts);
+
         PartitionContext context = mock(PartitionContext.class);
         doReturn(context).when(contexts).instantiateContext(any());

-        // there are some pending tasks
-        doReturn(100).when(contexts).totalPendingTasks();
-
         contexts.maybeHandlePropertyReload();
         // property reload is not requested yet
         verify(contexts, never()).instantiateContext(any());
+        for (PartitionContext ctx : allContexts) {
+            verify(ctx, never()).reloading(true);
+        }

         partitionConcurrencyProperty.set(42);
         contexts.maybeHandlePropertyReload();

+        for (PartitionContext ctx : allContexts) {
+            verify(ctx).reloading(true);
+        }
+
         // property reload is requested, but there are pending tasks
-        verify(contexts, never()).instantiateContext(any());
+        verify(contexts, times(reloadableContexts.size())).instantiateContext(any());
+        for (PartitionContext ctx : reloadableContexts) {
+            doReturn(false).when(ctx).reloading();
+        }

-        // pending tasks done
-        doReturn(0).when(contexts).totalPendingTasks();
+        for (PartitionContext ctx : pendingContexts) {
+            doReturn(0).when(ctx).pendingTasksCount();
+            doReturn(true).when(ctx).reloading();
+        }
         contexts.maybeHandlePropertyReload();

-        verify(contexts, times(12)).instantiateContext(any());
+        // completed reloading request
+        verify(contexts, times(count)).instantiateContext(any());
     }

     @Test