Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Fix unknown exception caused by thread safety issue (#2033)
Browse files Browse the repository at this point in the history
### Motivation
Both the response thread and timeout detection thread will access
PendingProduceCallback
This problem will easily occur when the sending latency is close to the
timeout threshold.

Step-1: response thread and timeout detection thread access 1 at the
same time, none of them meet the conditions for return

Step-2: timeout detection thread execute to 2, and set responseMap =
null
Setp-3: response thread execute to 3,  cause NPE

<img width="864" alt="image"
src="https://github.com/streamnative/kop/assets/9758905/96a1a8f5-cf09-4bcd-8f4a-24f9c8205474">


### Modifications
Add synchronized
  • Loading branch information
315157973 authored Jan 4, 2024
1 parent aa2f171 commit 99eeca5
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,18 @@ private static class PendingProduceCallback implements Runnable {
final CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>> completableFuture;
Map<TopicPartition, MemoryRecords> entriesPerPartition;
@Override
public void run() {
public synchronized void run() {
topicPartitionNum.set(0);
if (completableFuture.isDone()) {
// It may be triggered again in DelayedProduceAndFetch
return;
}
// add the topicPartition with timeout error if it's not existed in responseMap
entriesPerPartition.keySet().forEach(topicPartition -> {
if (!responseMap.containsKey(topicPartition)) {
ProduceResponse.PartitionResponse response = responseMap.putIfAbsent(topicPartition,
new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT));
if (response == null) {
log.error("Adding dummy REQUEST_TIMED_OUT to produce response for {}", topicPartition);
responseMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT));
}
});
if (log.isDebugEnabled()) {
Expand Down

0 comments on commit 99eeca5

Please sign in to comment.