From d97137711dd94089222c043ed8bf5b25eb2e1782 Mon Sep 17 00:00:00 2001 From: abilan Date: Thu, 23 Feb 2023 18:14:09 -0500 Subject: [PATCH] * Revert changes in the `DefaultKafkaSender` --- .../sender/internals/DefaultKafkaSender.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java b/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java index 0ab5b5de..fc8ae5ed 100644 --- a/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java +++ b/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -79,10 +80,13 @@ public class DefaultKafkaSender implements KafkaSender, EmitFailureH * producer properties are supported. The underlying Kafka producer is created lazily when required. */ public DefaultKafkaSender(ProducerFactory producerFactory, SenderOptions options) { - this.scheduler = Schedulers.newSingle(r -> { - Thread thread = new Thread(r); - thread.setName(options.clientId()); - return thread; + this.scheduler = Schedulers.newSingle(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName("reactor-kafka-sender-" + System.identityHashCode(this)); + return thread; + } }); this.hasProducer = new AtomicBoolean(); this.senderOptions = options.scheduler(options.isTransactional() @@ -139,8 +143,7 @@ public void subscribe(CoreSubscriber> s) { }); }) .doOnError(e -> log.trace("Send failed with exception", e)) - .publishOn(senderOptions.scheduler(), senderOptions.maxInFlight()) - .contextCapture(); + .publishOn(senderOptions.scheduler(), senderOptions.maxInFlight()); } @Override