From 62c1b69ae3ebcf8cc0de8ea60adbe7fff693bfe5 Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Thu, 8 Feb 2024 17:52:18 -0800 Subject: [PATCH 1/6] fix bug in getProcessingTimesByStepCopy --- .../runners/dataflow/worker/DataflowExecutionContext.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 7d45295b2d8c..28ae6cdd4d11 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -177,6 +177,7 @@ protected abstract SideInputReader getSideInputReaderForViews( /** Dataflow specific {@link StepContext}. 
*/ public abstract static class DataflowStepContext implements StepContext { + private final NameContext nameContext; public DataflowStepContext(NameContext nameContext) { @@ -341,8 +342,8 @@ public Optional getActiveMessageMetadata() { return Optional.ofNullable(activeMessageMetadata); } - public Map getProcessingTimesByStepCopy() { - Map processingTimesCopy = processingTimesByStep; + public synchronized Map getProcessingTimesByStepCopy() { + Map processingTimesCopy = new HashMap<>(processingTimesByStep); return processingTimesCopy; } From e2ca6c753bb8bec2b7f584f0c1b59646ee07f892 Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Fri, 9 Feb 2024 08:38:58 -0800 Subject: [PATCH 2/6] synchronize access to activeMessageMetadata and copy IntSummaryStatistics --- .../dataflow/worker/DataflowExecutionContext.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 28ae6cdd4d11..2d738aec20c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -29,6 +29,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; @@ -338,12 +339,14 @@ public String getWorkItemId() { return this.workItemId; } - public Optional getActiveMessageMetadata() { + public synchronized Optional getActiveMessageMetadata() { return Optional.ofNullable(activeMessageMetadata); } public 
synchronized Map getProcessingTimesByStepCopy() { - Map processingTimesCopy = new HashMap<>(processingTimesByStep); + Map processingTimesCopy = + processingTimesByStep.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); return processingTimesCopy; } @@ -352,7 +355,7 @@ public synchronized Map getProcessingTimesByStepCo * processing times map. Sets the activeMessageMetadata to null after the entry has been * recorded. */ - private void recordActiveMessageInProcessingTimesMap() { + private synchronized void recordActiveMessageInProcessingTimesMap() { if (this.activeMessageMetadata == null) { return; } From de203f9b6a2eae0174214670f4bdd211d4fd336d Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Fri, 9 Feb 2024 10:21:53 -0800 Subject: [PATCH 3/6] add guardedby annotations --- .../worker/DataflowExecutionContext.java | 56 +++++++++++++------ 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 2d738aec20c5..2afb4daf7413 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; @@ -53,9 +54,11 @@ import org.joda.time.DateTimeUtils.MillisProvider; import org.joda.time.Instant; -/** Execution context for the Dataflow worker. 
*/ +/** + * Execution context for the Dataflow worker. + */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public abstract class DataflowExecutionContext { @@ -85,8 +88,8 @@ public DataflowExecutionContext( private Map cachedStepContexts = new LinkedHashMap<>(); /** - * Returns a {@link SideInputReader} based on {@link SideInputInfo} descriptors and {@link - * PCollectionView PCollectionViews}. + * Returns a {@link SideInputReader} based on {@link SideInputInfo} descriptors and + * {@link PCollectionView PCollectionViews}. * *

If side input source metadata is provided by the service in {@link SideInputInfo * sideInputInfos}, we request a {@link SideInputReader} from the {@code executionContext} using @@ -117,7 +120,9 @@ public CounterFactory getCounterFactory() { return counterFactory; } - /** Returns a collection view of all of the {@link StepContext}s. */ + /** + * Returns a collection view of all of the {@link StepContext}s. + */ public Collection getAllStepContexts() { return Collections.unmodifiableCollection(cachedStepContexts.values()); } @@ -149,8 +154,8 @@ protected void clearSinkFullHint() { } /** - * Returns a {@link SideInputReader} for all the side inputs described in the given {@link - * SideInputInfo} descriptors. + * Returns a {@link SideInputReader} for all the side inputs described in the given + * {@link SideInputInfo} descriptors. */ protected abstract SideInputReader getSideInputReader( Iterable sideInputInfos, DataflowOperationContext operationContext) @@ -176,7 +181,9 @@ public T getStepContext(DataflowOperationContext operationContext) { protected abstract SideInputReader getSideInputReaderForViews( Iterable> views) throws Exception; - /** Dataflow specific {@link StepContext}. */ + /** + * Dataflow specific {@link StepContext}. + */ public abstract static class DataflowStepContext implements StepContext { private final NameContext nameContext; @@ -255,10 +262,13 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker * Metadata on the message whose processing is currently being managed by this tracker. If no * message is actively being processed, activeMessageMetadata will be null. 
*/ - @Nullable private ActiveMessageMetadata activeMessageMetadata = null; + @GuardedBy("this") + @Nullable + private ActiveMessageMetadata activeMessageMetadata = null; private final MillisProvider clock = System::currentTimeMillis; + @GuardedBy("this") private final Map processingTimesByStep = new HashMap<>(); public DataflowExecutionStateTracker( @@ -315,19 +325,24 @@ public Closeable enterState(ExecutionState newState) { if (isDataflowProcessElementState) { DataflowExecutionState newDFState = (DataflowExecutionState) newState; if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) { - if (this.activeMessageMetadata != null) { - recordActiveMessageInProcessingTimesMap(); + synchronized (this) { + if (this.activeMessageMetadata != null) { + recordActiveMessageInProcessingTimesMap(); + } + this.activeMessageMetadata = + ActiveMessageMetadata.create(newDFState.getStepName().userName(), + clock.getMillis()); } - this.activeMessageMetadata = - ActiveMessageMetadata.create(newDFState.getStepName().userName(), clock.getMillis()); } elementExecutionTracker.enter(newDFState.getStepName()); } return () -> { if (isDataflowProcessElementState) { - if (this.activeMessageMetadata != null) { - recordActiveMessageInProcessingTimesMap(); + synchronized (this) { + if (this.activeMessageMetadata != null) { + recordActiveMessageInProcessingTimesMap(); + } } elementExecutionTracker.exit(); } @@ -346,7 +361,11 @@ public synchronized Optional getActiveMessageMetadata() { public synchronized Map getProcessingTimesByStepCopy() { Map processingTimesCopy = processingTimesByStep.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + .collect(Collectors.toMap(e -> e.getKey(), e -> { + IntSummaryStatistics clone = new IntSummaryStatistics(); + clone.combine(e.getValue()); + return clone; + })); return processingTimesCopy; } @@ -365,7 +384,10 @@ private synchronized void recordActiveMessageInProcessingTimesMap() { if (v == null) { 
v = new IntSummaryStatistics(); } - v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime())); + synchronized (this) { + v.accept( + (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime())); + } return v; }); this.activeMessageMetadata = null; From 8a47404847559362b9a99550de1433b97a215a0c Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Fri, 9 Feb 2024 10:28:38 -0800 Subject: [PATCH 4/6] spotless apply --- .../worker/DataflowExecutionContext.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 2afb4daf7413..e827151476e4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -330,8 +330,8 @@ public Closeable enterState(ExecutionState newState) { recordActiveMessageInProcessingTimesMap(); } this.activeMessageMetadata = - ActiveMessageMetadata.create(newDFState.getStepName().userName(), - clock.getMillis()); + ActiveMessageMetadata.create( + newDFState.getStepName().userName(), clock.getMillis()); } } elementExecutionTracker.enter(newDFState.getStepName()); @@ -361,11 +361,14 @@ public synchronized Optional getActiveMessageMetadata() { public synchronized Map getProcessingTimesByStepCopy() { Map processingTimesCopy = processingTimesByStep.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> { - IntSummaryStatistics clone = new IntSummaryStatistics(); - clone.combine(e.getValue()); - return clone; - })); + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> { + IntSummaryStatistics clone = 
new IntSummaryStatistics(); + clone.combine(e.getValue()); + return clone; + })); return processingTimesCopy; } @@ -385,8 +388,7 @@ private synchronized void recordActiveMessageInProcessingTimesMap() { v = new IntSummaryStatistics(); } synchronized (this) { - v.accept( - (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime())); + v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime())); } return v; }); From 5a43f987c9fee21b3fcd2f591e34b60e36c73560 Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Fri, 9 Feb 2024 12:46:08 -0800 Subject: [PATCH 5/6] remove unnecessary nullness checks --- .../worker/DataflowExecutionContext.java | 32 ++++++------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index e827151476e4..76417dae7844 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -54,11 +54,9 @@ import org.joda.time.DateTimeUtils.MillisProvider; import org.joda.time.Instant; -/** - * Execution context for the Dataflow worker. - */ +/** Execution context for the Dataflow worker. */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public abstract class DataflowExecutionContext { @@ -88,8 +86,8 @@ public DataflowExecutionContext( private Map cachedStepContexts = new LinkedHashMap<>(); /** - * Returns a {@link SideInputReader} based on {@link SideInputInfo} descriptors and - * {@link PCollectionView PCollectionViews}. 
+ * Returns a {@link SideInputReader} based on {@link SideInputInfo} descriptors and {@link + * PCollectionView PCollectionViews}. * *

If side input source metadata is provided by the service in {@link SideInputInfo * sideInputInfos}, we request a {@link SideInputReader} from the {@code executionContext} using @@ -120,9 +118,7 @@ public CounterFactory getCounterFactory() { return counterFactory; } - /** - * Returns a collection view of all of the {@link StepContext}s. - */ + /** Returns a collection view of all of the {@link StepContext}s. */ public Collection getAllStepContexts() { return Collections.unmodifiableCollection(cachedStepContexts.values()); } @@ -154,8 +150,8 @@ protected void clearSinkFullHint() { } /** - * Returns a {@link SideInputReader} for all the side inputs described in the given - * {@link SideInputInfo} descriptors. + * Returns a {@link SideInputReader} for all the side inputs described in the given {@link + * SideInputInfo} descriptors. */ protected abstract SideInputReader getSideInputReader( Iterable sideInputInfos, DataflowOperationContext operationContext) @@ -181,9 +177,7 @@ public T getStepContext(DataflowOperationContext operationContext) { protected abstract SideInputReader getSideInputReaderForViews( Iterable> views) throws Exception; - /** - * Dataflow specific {@link StepContext}. - */ + /** Dataflow specific {@link StepContext}. 
*/ public abstract static class DataflowStepContext implements StepContext { private final NameContext nameContext; @@ -325,10 +319,8 @@ public Closeable enterState(ExecutionState newState) { if (isDataflowProcessElementState) { DataflowExecutionState newDFState = (DataflowExecutionState) newState; if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) { + recordActiveMessageInProcessingTimesMap(); synchronized (this) { - if (this.activeMessageMetadata != null) { - recordActiveMessageInProcessingTimesMap(); - } this.activeMessageMetadata = ActiveMessageMetadata.create( newDFState.getStepName().userName(), clock.getMillis()); @@ -339,11 +331,7 @@ public Closeable enterState(ExecutionState newState) { return () -> { if (isDataflowProcessElementState) { - synchronized (this) { - if (this.activeMessageMetadata != null) { - recordActiveMessageInProcessingTimesMap(); - } - } + recordActiveMessageInProcessingTimesMap(); elementExecutionTracker.exit(); } baseCloseable.close(); From 0b8151f4a0408046f629019e5d67e88a9ca14267 Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Mon, 12 Feb 2024 14:50:32 -0800 Subject: [PATCH 6/6] fix guardedby complaints --- .../runners/dataflow/worker/DataflowExecutionContext.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 76417dae7844..080fa7c9dac4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -369,15 +369,15 @@ private synchronized void recordActiveMessageInProcessingTimesMap() { if 
(this.activeMessageMetadata == null) { return; } + int processingTime = + (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime()); this.processingTimesByStep.compute( this.activeMessageMetadata.userStepName(), (k, v) -> { if (v == null) { v = new IntSummaryStatistics(); } - synchronized (this) { - v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime())); - } + v.accept(processingTime); return v; }); this.activeMessageMetadata = null;