Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug in getProcessingTimesByStepCopy #30270

Merged
merged 6 commits into from
Feb 13, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StepContext;
Expand All @@ -52,9 +54,11 @@
import org.joda.time.DateTimeUtils.MillisProvider;
import org.joda.time.Instant;

/** Execution context for the Dataflow worker. */
/**
* Execution context for the Dataflow worker.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public abstract class DataflowExecutionContext<T extends DataflowStepContext> {

Expand Down Expand Up @@ -84,8 +88,8 @@ public DataflowExecutionContext(
private Map<String, T> cachedStepContexts = new LinkedHashMap<>();

/**
* Returns a {@link SideInputReader} based on {@link SideInputInfo} descriptors and {@link
* PCollectionView PCollectionViews}.
* Returns a {@link SideInputReader} based on {@link SideInputInfo} descriptors and
* {@link PCollectionView PCollectionViews}.
*
* <p>If side input source metadata is provided by the service in {@link SideInputInfo
* sideInputInfos}, we request a {@link SideInputReader} from the {@code executionContext} using
Expand Down Expand Up @@ -116,7 +120,9 @@ public CounterFactory getCounterFactory() {
return counterFactory;
}

/** Returns a collection view of all of the {@link StepContext}s. */
/**
* Returns a collection view of all of the {@link StepContext}s.
*/
public Collection<? extends T> getAllStepContexts() {
  // Expose the cached step contexts through a read-only wrapper so callers
  // can iterate over them but cannot add to or remove from the cache.
  Collection<T> cachedContexts = cachedStepContexts.values();
  return Collections.unmodifiableCollection(cachedContexts);
}
Expand Down Expand Up @@ -148,8 +154,8 @@ protected void clearSinkFullHint() {
}

/**
* Returns a {@link SideInputReader} for all the side inputs described in the given {@link
* SideInputInfo} descriptors.
* Returns a {@link SideInputReader} for all the side inputs described in the given
* {@link SideInputInfo} descriptors.
*/
protected abstract SideInputReader getSideInputReader(
Iterable<? extends SideInputInfo> sideInputInfos, DataflowOperationContext operationContext)
Expand All @@ -175,8 +181,11 @@ public T getStepContext(DataflowOperationContext operationContext) {
protected abstract SideInputReader getSideInputReaderForViews(
Iterable<? extends PCollectionView<?>> views) throws Exception;

/** Dataflow specific {@link StepContext}. */
/**
* Dataflow specific {@link StepContext}.
*/
public abstract static class DataflowStepContext implements StepContext {

private final NameContext nameContext;

public DataflowStepContext(NameContext nameContext) {
Expand Down Expand Up @@ -253,10 +262,13 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
* Metadata on the message whose processing is currently being managed by this tracker. If no
* message is actively being processed, activeMessageMetadata will be null.
*/
@Nullable private ActiveMessageMetadata activeMessageMetadata = null;
@GuardedBy("this")
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;

private final MillisProvider clock = System::currentTimeMillis;

@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();

public DataflowExecutionStateTracker(
Expand Down Expand Up @@ -313,19 +325,24 @@ public Closeable enterState(ExecutionState newState) {
if (isDataflowProcessElementState) {
DataflowExecutionState newDFState = (DataflowExecutionState) newState;
if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) {
if (this.activeMessageMetadata != null) {
recordActiveMessageInProcessingTimesMap();
synchronized (this) {
if (this.activeMessageMetadata != null) {
clmccart marked this conversation as resolved.
Show resolved Hide resolved
recordActiveMessageInProcessingTimesMap();
}
this.activeMessageMetadata =
ActiveMessageMetadata.create(
newDFState.getStepName().userName(), clock.getMillis());
}
this.activeMessageMetadata =
ActiveMessageMetadata.create(newDFState.getStepName().userName(), clock.getMillis());
}
elementExecutionTracker.enter(newDFState.getStepName());
}

return () -> {
if (isDataflowProcessElementState) {
if (this.activeMessageMetadata != null) {
recordActiveMessageInProcessingTimesMap();
synchronized (this) {
if (this.activeMessageMetadata != null) {
clmccart marked this conversation as resolved.
Show resolved Hide resolved
recordActiveMessageInProcessingTimesMap();
}
}
elementExecutionTracker.exit();
}
Expand All @@ -337,12 +354,21 @@ public String getWorkItemId() {
return this.workItemId;
}

public Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
public synchronized Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
  // A null field means no message is currently being processed; report that
  // as an empty Optional rather than leaking the null to callers.
  if (activeMessageMetadata == null) {
    return Optional.empty();
  }
  return Optional.of(activeMessageMetadata);
}

public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
Map<String, IntSummaryStatistics> processingTimesCopy = processingTimesByStep;
public synchronized Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
  // Build a deep copy: IntSummaryStatistics is mutable, so each value is
  // duplicated by combining it into a fresh instance. The returned map shares
  // neither the map nor the statistics objects with processingTimesByStep.
  Map<String, IntSummaryStatistics> snapshot = new HashMap<>();
  for (Map.Entry<String, IntSummaryStatistics> entry : processingTimesByStep.entrySet()) {
    IntSummaryStatistics statsCopy = new IntSummaryStatistics();
    statsCopy.combine(entry.getValue());
    snapshot.put(entry.getKey(), statsCopy);
  }
  return snapshot;
}

Expand All @@ -351,7 +377,7 @@ public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
* processing times map. Sets the activeMessageMetadata to null after the entry has been
* recorded.
*/
private void recordActiveMessageInProcessingTimesMap() {
private synchronized void recordActiveMessageInProcessingTimesMap() {
if (this.activeMessageMetadata == null) {
return;
}
Expand All @@ -361,7 +387,9 @@ private void recordActiveMessageInProcessingTimesMap() {
if (v == null) {
v = new IntSummaryStatistics();
}
v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime()));
synchronized (this) {
clmccart marked this conversation as resolved.
Show resolved Hide resolved
v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime()));
}
return v;
});
this.activeMessageMetadata = null;
Expand Down
Loading