Skip to content

Commit

Permalink
[HUDI-4574] Fixed timeline based marker thread safety issue (apache#6383
Browse files Browse the repository at this point in the history
)

* fixed timeline based markers thread safety issue
* add document for TimelineBasedMarkers thread safety issues

(cherry picked from commit bad954c)
  • Loading branch information
novisfff authored and neverdizzy committed Dec 1, 2022
1 parent 668f663 commit 8964beb
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -74,7 +74,8 @@ public class MarkerHandler extends Handler {
// Parallelism for reading and deleting marker files
private final int parallelism;
// Marker directory states, {markerDirPath -> MarkerDirState instance}
private final Map<String, MarkerDirState> markerDirStateMap = new HashMap<>();
// Use ConcurrentHashMap to ensure thread safety in dispatchingExecutorService
private final Map<String, MarkerDirState> markerDirStateMap = new ConcurrentHashMap<>();
// A thread to dispatch marker creation requests to batch processing threads
private final MarkerCreationDispatchingRunnable markerCreationDispatchingRunnable;
private final Object firstCreationRequestSeenLock = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public void run() {

// Only fetch pending marker creation requests that can be processed,
// i.e., that markers can be written to a underlying file
for (String markerDir : markerDirStateMap.keySet()) {
MarkerDirState markerDirState = markerDirStateMap.get(markerDir);
// markerDirStateMap is used in other thread, need to ensure thread safety
for (Map.Entry<String, MarkerDirState> entry : markerDirStateMap.entrySet()) {
String markerDir = entry.getKey();
MarkerDirState markerDirState = entry.getValue();
Option<Integer> fileIndex = markerDirState.getNextFileIndexToUse();
if (!fileIndex.isPresent()) {
LOG.debug("All marker files are busy, skip batch processing of create marker requests in " + markerDir);
Expand Down

0 comments on commit 8964beb

Please sign in to comment.