Skip to content

Commit

Permalink
* Revert changes in the DefaultKafkaSender
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Feb 23, 2023
1 parent c392973 commit 6681cf6
Showing 1 changed file with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

Expand Down Expand Up @@ -78,10 +79,13 @@ public class DefaultKafkaSender<K, V> implements KafkaSender<K, V>, EmitFailureH
* producer properties are supported. The underlying Kafka producer is created lazily when required.
*/
public DefaultKafkaSender(ProducerFactory producerFactory, SenderOptions<K, V> options) {
this.scheduler = Schedulers.newSingle(r -> {
Thread thread = new Thread(r);
thread.setName(options.clientId());
return thread;
this.scheduler = Schedulers.newSingle(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("reactor-kafka-sender-" + System.identityHashCode(this));
return thread;
}
});
this.hasProducer = new AtomicBoolean();
this.senderOptions = options.scheduler(options.isTransactional()
Expand Down Expand Up @@ -133,8 +137,7 @@ public void subscribe(CoreSubscriber<? super SenderResult<T>> s) {
});
})
.doOnError(e -> log.trace("Send failed with exception", e))
.publishOn(senderOptions.scheduler(), senderOptions.maxInFlight())
.contextCapture();
.publishOn(senderOptions.scheduler(), senderOptions.maxInFlight());
}

@Override
Expand Down

0 comments on commit 6681cf6

Please sign in to comment.