Narrow down the scope of waiting for pending tasks to per partition #191

Merged (8 commits) on Mar 27, 2023
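For context on what "reloading" means here: partition.concurrency is one of Decaton's reloadable ProcessorProperties, and callers switch it at runtime through a DynamicProperty registered on the subscription. The sketch below is illustrative only; it reuses the APIs exercised by the integration test added in this PR (DynamicProperty, StaticPropertySupplier, ProcessorsBuilder, TestUtils), and the broker address, topic name, and processor body are placeholders rather than a recommended production setup. Imports are the same as in the test file below.

// Assumed environment: a reachable Kafka broker and an existing topic (placeholder values).
String bootstrapServers = "localhost:9092";
String topicName = "my-decaton-topic";
DecatonProcessor<HelloTask> processor = (context, task) -> { /* process the task */ };

DynamicProperty<Integer> concurrencyProp =
        new DynamicProperty<>(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY);
concurrencyProp.set(1); // start with one concurrent task per partition

try (ProcessorSubscription subscription = TestUtils.subscription(
        bootstrapServers,
        builder -> builder.processorsBuilder(ProcessorsBuilder
                                  .consuming(topicName,
                                             new ProtocolBuffersDeserializer<>(HelloTask.parser()))
                                  .thenProcess(processor))
                          .addProperties(StaticPropertySupplier.of(concurrencyProp)))) {
    // While tasks are in flight, raise the per-partition concurrency at runtime.
    // Before this PR the subscription paused all partitions until every pending task
    // finished; with this change each partition reloads independently once its own
    // pending tasks drain.
    concurrencyProp.set(3);
}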
PropertyReloadRequestTest.java (new file)
@@ -0,0 +1,109 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor;

import static org.junit.Assert.assertEquals;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.processor.internal.HashableByteArray;
import com.linecorp.decaton.processor.runtime.DynamicProperty;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
import com.linecorp.decaton.protocol.Sample.HelloTask;
import com.linecorp.decaton.testing.KafkaClusterRule;
import com.linecorp.decaton.testing.TestUtils;

public class PropertyReloadRequestTest {

@ClassRule
public static KafkaClusterRule rule = new KafkaClusterRule();

private String topicName;

@Before
public void setUp() {
topicName = rule.admin().createRandomTopic(3, 3);
}

@After
public void tearDown() {
rule.admin().deleteTopics(true, topicName);
}

@Test(timeout = 30000)
public void testPropertyDynamicSwitch() throws Exception {
Set<String> keys = new HashSet<>();

for (int i = 0; i < 10000; i++) {
keys.add("key" + i);
}
Set<HashableByteArray> processedKeys = Collections.synchronizedSet(new HashSet<>());
CountDownLatch processLatch = new CountDownLatch(keys.size());

DecatonProcessor<HelloTask> processor = (context, task) -> {
processedKeys.add(new HashableByteArray(context.key()));
processLatch.countDown();
};

DynamicProperty<Integer> concurrencyProp =
new DynamicProperty<>(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY);
concurrencyProp.set(1);
try (ProcessorSubscription subscription = TestUtils.subscription(
rule.bootstrapServers(),
builder -> builder.processorsBuilder(ProcessorsBuilder
.consuming(topicName,
new ProtocolBuffersDeserializer<>(
HelloTask.parser()))
.thenProcess(processor))
.addProperties(StaticPropertySupplier.of(concurrencyProp)));
DecatonClient<HelloTask> client = TestUtils.client(topicName, rule.bootstrapServers())) {

int count = 0;
for (String key : keys) {
count++;
if (count == 1000) {
TimeUnit.SECONDS.sleep(1);
concurrencyProp.set(3);
} else if (count == 5000) {
TimeUnit.SECONDS.sleep(1);
concurrencyProp.set(1);
} else if (count == 7500) {
TimeUnit.SECONDS.sleep(1);
concurrencyProp.set(5);
}
client.put(key, HelloTask.getDefaultInstance());
}
processLatch.await();
}

assertEquals(10000, processedKeys.size());
}
}
ProcessorProperties.java
@@ -74,7 +74,8 @@ public class ProcessorProperties extends AbstractDecatonProperties {
&& (long) v <= RateLimiter.MAX_RATE);
/**
* Concurrency used to process tasks coming from single partition.
* Reloading this property will pause all assigned partitions until current pending tasks have done.
* Reloading this property is performed per assigned partition, as soon as
* that partition's current pending tasks have completed.
*
* Reloadable: yes
*/
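The updated wording boils down to a change in the reload condition. The following is a minimal, self-contained sketch of the before/after predicate, paraphrasing the PartitionContexts diff further down; ReloadScopeSketch and PartitionCtx are hypothetical stand-ins, not Decaton's real types.

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

final class ReloadScopeSketch {
    interface PartitionCtx {                 // minimal view of what the diff reads per partition
        boolean reloadRequested();
        int pendingTasksCount();
    }

    // Old behavior: a single subscription-wide flag; reloading waited until the
    // pending-task count summed over ALL partitions reached zero.
    static boolean canReloadAll(boolean reloadRequested, int totalPendingTasks) {
        return reloadRequested && totalPendingTasks == 0;
    }

    // New behavior: the flag lives on each partition context, so a partition is
    // dropped and recreated as soon as its own pending tasks have drained.
    static List<PartitionCtx> reloadableNow(Collection<PartitionCtx> contexts) {
        return contexts.stream()
                       .filter(c -> c.reloadRequested() && c.pendingTasksCount() == 0)
                       .collect(Collectors.toList());
    }
}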
PartitionContext.java
@@ -56,6 +56,14 @@ public class PartitionContext implements AutoCloseable {
@Setter
private boolean revoking;

/**
* Indicates, when true, that a property reload has been requested for this partition but has not completed yet.
* Used to perform property reloading on a per-partition basis.
*/
@Getter
@Setter
private volatile boolean reloadRequested;

public PartitionContext(PartitionScope scope, Processors<?> processors, int maxPendingRecords) {
this.scope = scope;
this.processors = processors;
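A note on the volatile modifier: judging from the rest of the diff, reloadRequested is written from the property-listener path in PartitionContexts and read from the subscription thread in partitionsNeedsPause, partitionsNeedsResume, and maybeHandlePropertyReload, so the flag needs cross-thread visibility. Below is a self-contained illustration of why a single-writer flag like this should be volatile; the names are hypothetical and unrelated to Decaton's classes.

import java.util.concurrent.TimeUnit;

final class VolatileFlagDemo {
    // Without volatile, the reader below could spin forever because it might never
    // observe the writer's update; volatile guarantees visibility across threads.
    private static volatile boolean reloadRequested;

    public static void main(String[] args) throws InterruptedException {
        Thread reader = new Thread(() -> {
            while (!reloadRequested) {
                // busy-wait until the flag flip becomes visible
            }
            System.out.println("observed reloadRequested=true");
        });
        reader.start();

        TimeUnit.MILLISECONDS.sleep(100);
        reloadRequested = true;   // analogous to the listener thread flipping the flag
        reader.join();
    }
}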
PartitionContexts.java
@@ -22,12 +22,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -52,7 +51,7 @@ public class PartitionContexts implements OffsetsStore, AssignmentStore, Partiti
private final int maxPendingRecords;
private final Map<TopicPartition, PartitionContext> contexts;

private final AtomicBoolean reloadRequested;
private final ReentrantLock propertyReloadLock;

public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
this.scope = scope;
@@ -62,7 +61,7 @@ public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
// We don't support dynamic reload of this value so fix at the time of boot-up.
maxPendingRecords = scope.props().get(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS).value();
contexts = new HashMap<>();
reloadRequested = new AtomicBoolean(false);
propertyReloadLock = new ReentrantLock();

scope.props().get(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY).listen((oldVal, newVal) -> {
// This listener will be called at listener registration.
@@ -71,8 +70,12 @@ public PartitionContexts(SubscriptionScope scope, Processors<?> processors) {
return;
}

if (!reloadRequested.getAndSet(true)) {
propertyReloadLock.lock();
try {
contexts.values().forEach(context -> context.reloadRequested(true));
logger.info("Requested reload partition.concurrency oldValue={}, newValue={}", oldVal, newVal);
} finally {
propertyReloadLock.unlock();
}
});
}
@@ -88,26 +91,41 @@ public Set<TopicPartition> assignedPartitions() {

@Override
public void addPartitions(Map<TopicPartition, AssignmentConfig> partitions) {
for (Entry<TopicPartition, AssignmentConfig> entry : partitions.entrySet()) {
TopicPartition tp = entry.getKey();
AssignmentConfig conf = entry.getValue();
initContext(tp, conf.paused());
propertyReloadLock.lock();
try {
for (Entry<TopicPartition, AssignmentConfig> entry : partitions.entrySet()) {
TopicPartition tp = entry.getKey();
AssignmentConfig conf = entry.getValue();
initContext(tp, conf.paused());
}
} finally {
propertyReloadLock.unlock();
}
}

@Override
public void removePartition(Collection<TopicPartition> partitions) {
destroyProcessors(partitions);
cleanupPartitions(partitions);
propertyReloadLock.lock();
try {
destroyProcessors(partitions);
cleanupPartitions(partitions);
} finally {
propertyReloadLock.unlock();
}
}

private void cleanupPartitions(Collection<TopicPartition> partitions) {
for (TopicPartition tp : partitions) {
try {
contexts.remove(tp).close();
} catch (Exception e) {
logger.warn("Failed to close partition context {}", tp, e);
propertyReloadLock.lock();
try {
for (TopicPartition tp : partitions) {
try {
contexts.remove(tp).close();
} catch (Exception e) {
logger.warn("Failed to close partition context {}", tp, e);
}
}
} finally {
propertyReloadLock.unlock();
}
}

@@ -120,12 +138,17 @@ private void cleanupPartitions(Collection<TopicPartition> partitions) {
*/
// visible for testing
PartitionContext initContext(TopicPartition tp, boolean paused) {
PartitionContext context = instantiateContext(tp);
if (paused) {
context.pause();
propertyReloadLock.lock();
try {
PartitionContext context = instantiateContext(tp);
if (paused) {
context.pause();
}
contexts.put(tp, context);
return context;
} finally {
propertyReloadLock.unlock();
}
contexts.put(tp, context);
return context;
}

/**
@@ -200,7 +223,7 @@ PartitionContext instantiateContext(TopicPartition tp) {

// visible for testing
boolean pausingAllProcessing() {
return processingRateProp.value() == RateLimiter.PAUSED || reloadRequested.get();
return processingRateProp.value() == RateLimiter.PAUSED;
}

@Override
@@ -214,7 +237,9 @@ public List<TopicPartition> partitionsNeedsPause() {
return contexts.values().stream()
.filter(c -> !c.revoking())
.filter(c -> !c.paused())
.filter(c -> pausingAll || shouldPartitionPaused(c.pendingTasksCount()))
.filter(c -> pausingAll
|| c.reloadRequested()
|| shouldPartitionPaused(c.pendingTasksCount()))
.map(PartitionContext::topicPartition)
.collect(toList());
}
@@ -225,7 +250,9 @@ public List<TopicPartition> partitionsNeedsResume() {
return contexts.values().stream()
.filter(c -> !c.revoking())
.filter(PartitionContext::paused)
.filter(c -> !pausingAll && !shouldPartitionPaused(c.pendingTasksCount()))
.filter(c -> !pausingAll
&& !c.reloadRequested()
&& !shouldPartitionPaused(c.pendingTasksCount()))
.map(PartitionContext::topicPartition)
.collect(toList());
}
@@ -245,34 +272,43 @@ public void partitionsResumed(List<TopicPartition> partitions) {
}

/**
* Waits for all pending tasks if property-reload is requested, then recreate all partition contexts with latest property values.
* Waits for pending tasks if a property reload is requested,
* then recreates the affected partition contexts with the latest property values.
* This method must be called only from the subscription thread.
*/
public void maybeHandlePropertyReload() {
if (reloadRequested.get()) {
if (totalPendingTasks() > 0) {
logger.debug("Waiting pending tasks for property reload.");
propertyReloadLock.lock();
try {
List<TopicPartition> reloadableTopicPartitions = contexts.entrySet()
.stream()
.filter(entry -> entry.getValue().reloadRequested()
&& entry.getValue().pendingTasksCount() == 0)
.map(Entry::getKey)
.collect(toList());
if (reloadableTopicPartitions.isEmpty()) {
return;
}
// it's ok to check-and-set reloadRequested without synchronization
// because this field is set to false only in this method, and this method is called from only subscription thread.
reloadRequested.set(false);
logger.info("Completed waiting pending tasks. Start reloading partition contexts");
reloadContexts();
reloadContexts(reloadableTopicPartitions);
long reloadingPartitions = contexts.values()
.stream()
.filter(PartitionContext::reloadRequested)
.count();
if (reloadingPartitions == 0) {
logger.info("Completed reloading all partition contexts");
}
} finally {
propertyReloadLock.unlock();
}
}

private void reloadContexts() {
// Save current topicPartitions into copy to update contexts map while iterating over this copy.
Set<TopicPartition> topicPartitions = new HashSet<>(contexts.keySet());

logger.info("Start dropping partition contexts");
private void reloadContexts(Collection<TopicPartition> topicPartitions) {
logger.info("Start dropping partition contexts({})", topicPartitions);
removePartition(topicPartitions);
logger.info("Finished dropping partition contexts. Start recreating partition contexts");
Contributor review comment: [nits] This log and this line should also include topicPartitions

Map<TopicPartition, AssignmentConfig> configs = topicPartitions.stream().collect(
toMap(Function.identity(), tp -> new AssignmentConfig(true)));
addPartitions(configs);
logger.info("Completed reloading property");
logger.info("Completed reloading partition contexts({})", topicPartitions);
}

private void destroyProcessors(Collection<TopicPartition> partitions) {