From bad954c3b27b9e5236b93c6b0e2be219337fa179 Mon Sep 17 00:00:00 2001 From: novisfff <62633257+novisfff@users.noreply.github.com> Date: Tue, 16 Aug 2022 10:40:09 +0800 Subject: [PATCH] [HUDI-4574] Fixed timeline based marker thread safety issue (#6383) * fixed timeline based markers thread safety issue * add document for TimelineBasedMarkers thread safety issues --- .../hudi/timeline/service/handlers/MarkerHandler.java | 5 +++-- .../handlers/marker/MarkerCreationDispatchingRunnable.java | 6 ++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index e793c20432f92..32b85bdfc35b6 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -34,7 +34,7 @@ import org.apache.log4j.Logger; import java.io.IOException; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -74,7 +74,8 @@ public class MarkerHandler extends Handler { // Parallelism for reading and deleting marker files private final int parallelism; // Marker directory states, {markerDirPath -> MarkerDirState instance} - private final Map<String, MarkerDirState> markerDirStateMap = new HashMap<>(); + // Use ConcurrentHashMap to ensure thread safety in dispatchingExecutorService + private final Map<String, MarkerDirState> markerDirStateMap = new ConcurrentHashMap<>(); // A thread to dispatch marker creation requests to batch processing threads private final MarkerCreationDispatchingRunnable markerCreationDispatchingRunnable; private final Object firstCreationRequestSeenLock = new Object(); diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationDispatchingRunnable.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationDispatchingRunnable.java index 93093a4b4c4b7..5f647e9b3fe08 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationDispatchingRunnable.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationDispatchingRunnable.java @@ -66,8 +66,10 @@ public void run() { // Only fetch pending marker creation requests that can be processed, // i.e., that markers can be written to a underlying file - for (String markerDir : markerDirStateMap.keySet()) { - MarkerDirState markerDirState = markerDirStateMap.get(markerDir); + // markerDirStateMap is also accessed by other threads, so we need to ensure thread safety + for (Map.Entry<String, MarkerDirState> entry : markerDirStateMap.entrySet()) { + String markerDir = entry.getKey(); + MarkerDirState markerDirState = entry.getValue(); Option fileIndex = markerDirState.getNextFileIndexToUse(); if (!fileIndex.isPresent()) { LOG.debug("All marker files are busy, skip batch processing of create marker requests in " + markerDir);