Skip to content

Commit

Permalink
Replace synchronized blocks with ReentrantLocks for virtual thread su…
Browse files Browse the repository at this point in the history
…pport

Replace synchronized methods and blocks with ReentrantLocks in a few classes in Spring Kafka
to improve compatibility with virtual threads. This changes the synchronization mechanism in:

- KafkaListenerAnnotationBeanPostProcessor
- KafkaListenerEndpointRegistrar
- KafkaAdmin
- DefaultDestinationTopicResolver
- JsonDeserializer/JsonSerializer

The change helps avoid blocking virtual threads when using Spring Kafka in Project Loom
environments while maintaining thread safety.

**Auto-cherry-pick to `3.2.x`**
  • Loading branch information
omercelikceng authored and sobychacko committed Nov 27, 2024
1 parent 1c030ef commit 3c97a3b
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand Down Expand Up @@ -144,6 +146,7 @@
* @author Wang Zhiyang
* @author Sanghyeok An
* @author Soby Chacko
* @author Omer Celik
*
* @see KafkaListener
* @see KafkaListenerErrorHandler
Expand Down Expand Up @@ -207,6 +210,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
@Nullable
private RetryTopicConfigurer retryTopicConfigurer;

private final Lock globalLock = new ReentrantLock();

@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
Expand Down Expand Up @@ -278,14 +283,20 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
* {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
* @param beanFactory the {@link BeanFactory} to be used.
*/
public synchronized void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
if (beanExpressionResolver != null) {
this.resolver = beanExpressionResolver;
public void setBeanFactory(BeanFactory beanFactory) {
try {
this.globalLock.lock();
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
if (beanExpressionResolver != null) {
this.resolver = beanExpressionResolver;
}
this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
}
this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
}
finally {
this.globalLock.unlock();
}
}

Expand Down Expand Up @@ -451,36 +462,48 @@ private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
}
}

private synchronized void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners,
private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners,
List<Method> multiMethods, Class<?> clazz, Object bean, String beanName) {

List<Method> checkedMethods = new ArrayList<>();
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
if (annotation != null && annotation.isDefault()) {
Method toAssert = defaultMethod;
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
+ toAssert.toString() + " and " + method);
defaultMethod = checked;
try {
this.globalLock.lock();
List<Method> checkedMethods = new ArrayList<>();
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
if (annotation != null && annotation.isDefault()) {
Method toAssert = defaultMethod;
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
+ toAssert.toString() + " and " + method);
defaultMethod = checked;
}
checkedMethods.add(checked);
}
for (KafkaListener classLevelListener : classLevelListeners) {
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
}
checkedMethods.add(checked);
}
for (KafkaListener classLevelListener : classLevelListeners) {
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
finally {
this.globalLock.unlock();
}
}

protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
String beanName) {

Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
try {
this.globalLock.lock();
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
}
finally {
this.globalLock.unlock();
}
}

private void processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
Expand All @@ -40,6 +42,7 @@
* @author Gary Russell
* @author Filip Halemba
* @author Wang Zhiyang
* @author Omer Celik
*
* @see org.springframework.kafka.annotation.KafkaListenerConfigurer
*/
Expand All @@ -49,6 +52,8 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial

private List<HandlerMethodArgumentResolver> customMethodArgumentResolvers = new ArrayList<>();

private final Lock endpointsLock = new ReentrantLock();

private KafkaListenerEndpointRegistry endpointRegistry;

private MessageHandlerMethodFactory messageHandlerMethodFactory;
Expand Down Expand Up @@ -188,7 +193,8 @@ public void afterPropertiesSet() {
}

protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
try {
this.endpointsLock.lock();
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> mmkle
&& this.validator != null) {
Expand All @@ -199,6 +205,9 @@ protected void registerAllEndpoints() {
}
this.startImmediately = true; // trigger immediate startup
}
finally {
this.endpointsLock.unlock();
}
}

private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpointDescriptor descriptor) {
Expand Down Expand Up @@ -234,7 +243,8 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaList
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
try {
this.endpointsLock.lock();
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
Expand All @@ -243,6 +253,9 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaList
this.endpointDescriptors.add(descriptor);
}
}
finally {
this.endpointsLock.unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -78,6 +80,7 @@
* @author Sanghyeok An
* @author Valentina Armenise
* @author Anders Swanson
* @author Omer Celik
*
* @since 1.3
*/
Expand All @@ -95,6 +98,8 @@ public class KafkaAdmin extends KafkaResourceFactory

private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger();

private final Lock clusterIdLock = new ReentrantLock();

private final Map<String, Object> configs;

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -267,12 +272,7 @@ public final boolean initialize() {
}
if (adminClient != null) {
try {
synchronized (this) {
if (this.clusterId != null) {
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
TimeUnit.SECONDS);
}
}
updateClusterId(adminClient);
addOrModifyTopicsIfNeeded(adminClient, newTopics);
return true;
}
Expand All @@ -297,6 +297,19 @@ public final boolean initialize() {
return false;
}

private void updateClusterId(Admin adminClient) throws InterruptedException, ExecutionException, TimeoutException {
try {
this.clusterIdLock.lock();
if (this.clusterId != null) {
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
TimeUnit.SECONDS);
}
}
finally {
this.clusterIdLock.unlock();
}
}

/**
* Return a collection of {@link NewTopic}s to create or modify. The default
* implementation retrieves all {@link NewTopic} beans in the application context and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -49,6 +51,7 @@
* @author Gary Russell
* @author Yvette Quinby
* @author Adrian Chlebosz
* @author Omer Celik
* @since 2.7
*
*/
Expand All @@ -62,6 +65,8 @@ public class DefaultDestinationTopicResolver extends ExceptionClassifier

private final Map<String, Map<String, DestinationTopicHolder>> sourceDestinationsHolderMap;

private final Lock sourceDestinationsHolderLock = new ReentrantLock();

private final Clock clock;

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -210,9 +215,13 @@ private DestinationTopicHolder getDestinationHolderFor(String mainListenerId, St
}

private DestinationTopicHolder getDestinationTopicSynchronized(String mainListenerId, String topic) {
synchronized (this.sourceDestinationsHolderMap) {
try {
this.sourceDestinationsHolderLock.lock();
return doGetDestinationFor(mainListenerId, topic);
}
finally {
this.sourceDestinationsHolderLock.unlock();
}
}

private DestinationTopicHolder doGetDestinationFor(String mainListenerId, String topic) {
Expand All @@ -229,11 +238,15 @@ public void addDestinationTopics(String mainListenerId, List<DestinationTopic> d
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed.");
}
validateDestinations(destinationsToAdd);
synchronized (this.sourceDestinationsHolderMap) {
try {
this.sourceDestinationsHolderLock.lock();
Map<String, DestinationTopicHolder> map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId,
id -> new HashMap<>());
map.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd));
}
finally {
this.sourceDestinationsHolderLock.unlock();
}
}

private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
Expand Down
Loading

0 comments on commit 3c97a3b

Please sign in to comment.