diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index bc300f80f5..3afa9cce24 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -108,7 +108,7 @@ private static class PendingProduceCallback implements Runnable { final CompletableFuture> completableFuture; Map entriesPerPartition; @Override - public void run() { + public synchronized void run() { topicPartitionNum.set(0); if (completableFuture.isDone()) { // It may be triggered again in DelayedProduceAndFetch @@ -116,9 +116,10 @@ public void run() { } // add the topicPartition with timeout error if it's not existed in responseMap entriesPerPartition.keySet().forEach(topicPartition -> { - if (!responseMap.containsKey(topicPartition)) { + ProduceResponse.PartitionResponse response = responseMap.putIfAbsent(topicPartition, + new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT)); + if (response == null) { log.error("Adding dummy REQUEST_TIMED_OUT to produce response for {}", topicPartition); - responseMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT)); } }); if (log.isDebugEnabled()) {