Skip to content

Commit

Permalink
Remove unused StreamingDataflowWorker parameter (#30256)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu authored Feb 12, 2024
1 parent 68873c8 commit 371576a
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ private void process(

StageInfo stageInfo =
stageInfoMap.computeIfAbsent(
mapTask.getStageName(), s -> StageInfo.create(s, mapTask.getSystemName(), this));
mapTask.getStageName(), s -> StageInfo.create(s, mapTask.getSystemName()));

ExecutionState executionState = null;
String counterName = "dataflow_source_bytes_processed-" + mapTask.getSystemName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StepContext;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
Expand Down Expand Up @@ -440,21 +441,18 @@ <T, W extends BoundedWindow> void writePCollectionViewData(
* needs to be thread safe for multiple writers. A single stage could have multiple executors
* running concurrently.
*/
public static class StreamingModeExecutionState
extends DataflowOperationContext.DataflowExecutionState {
public static class StreamingModeExecutionState extends DataflowExecutionState {

// AtomicLong is used because this value is written in two places:
// 1. The sampling thread calls takeSample to increment the time spent in this state
// 2. The reporting thread calls extractUpdate which reads the current sum *AND* sets it to 0.
private final AtomicLong totalMillisInState = new AtomicLong();

@SuppressWarnings("unused")
public StreamingModeExecutionState(
NameContext nameContext,
String stateName,
MetricsContainer metricsContainer,
ProfileScope profileScope,
StreamingDataflowWorker worker) {
ProfileScope profileScope) {
// TODO: Take in the requesting step name and side input index for streaming.
super(nameContext, stateName, null, null, metricsContainer, profileScope);
}
Expand Down Expand Up @@ -492,37 +490,30 @@ public void takeSample(long millisSinceLastSample) {
*/
public static class StreamingModeExecutionStateRegistry extends DataflowExecutionStateRegistry {

private final StreamingDataflowWorker worker;

public StreamingModeExecutionStateRegistry(StreamingDataflowWorker worker) {
this.worker = worker;
}

@Override
protected DataflowOperationContext.DataflowExecutionState createState(
protected DataflowExecutionState createState(
NameContext nameContext,
String stateName,
String requestingStepName,
Integer inputIndex,
MetricsContainer container,
ProfileScope profileScope) {
return new StreamingModeExecutionState(
nameContext, stateName, container, profileScope, worker);
return new StreamingModeExecutionState(nameContext, stateName, container, profileScope);
}
}

private static class ScopedReadStateSupplier implements Supplier<Closeable> {
private final ExecutionState readState;
private final @Nullable ExecutionStateTracker stateTracker;

ScopedReadStateSupplier(
private ScopedReadStateSupplier(
DataflowOperationContext operationContext, ExecutionStateTracker stateTracker) {
this.readState = operationContext.newExecutionState("windmill-read");
this.stateTracker = stateTracker;
}

@Override
public Closeable get() {
public @Nullable Closeable get() {
if (stateTracker == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,27 @@ public static NameContext create(
@Nullable String originalName,
String systemName,
@Nullable String userName) {
return new AutoValue_NameContext(stageName, originalName, systemName, userName);
return NameContext.newBuilder(stageName)
.setOriginalName(originalName)
.setSystemName(systemName)
.setUserName(userName)
.build();
}

/**
* Create a {@link NameContext} with only a {@code stageName} for representing time spent outside
* specific steps..
*/
public static NameContext forStage(String stageName) {
return new AutoValue_NameContext(stageName, null, null, null);
return newBuilder(stageName).build();
}

public static Builder newBuilder(String stageName) {
return new AutoValue_NameContext.Builder()
.setStageName(stageName)
.setUserName(null)
.setSystemName(null)
.setOriginalName(null);
}

/** Returns the name of the stage this instruction is executing in. */
Expand Down Expand Up @@ -86,4 +98,17 @@ public static NameContext forStage(String stageName) {
* <p>Examples: "MapElements/Map", "BigShuffle.GroupByFirstNBytes/GroupByKey/Reify"
*/
public abstract @Nullable String userName();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setStageName(String value);

public abstract Builder setOriginalName(@Nullable String value);

public abstract Builder setSystemName(@Nullable String value);

public abstract Builder setUserName(@Nullable String value);

public abstract NameContext build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@
/** Contains a few of the stage specific fields. E.g. metrics container registry, counters etc. */
@AutoValue
public abstract class StageInfo {
public static StageInfo create(
String stageName, String systemName, StreamingDataflowWorker worker) {
NameContext nameContext = NameContext.create(stageName, null, systemName, null);
public static StageInfo create(String stageName, String systemName) {
NameContext nameContext = NameContext.newBuilder(stageName).setSystemName(systemName).build();
CounterSet deltaCounters = new CounterSet();
return new AutoValue_StageInfo(
stageName,
systemName,
StreamingStepMetricsContainer.createRegistry(),
new StreamingModeExecutionStateRegistry(worker),
new StreamingModeExecutionStateRegistry(),
deltaCounters,
deltaCounters.longSum(
DataflowSystemMetrics.StreamingPerStageSystemCounterNames.THROTTLED_MSECS.counterName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ public void testDataflowExecutionStateTrackerRecordsActiveMessageMetadata() thro
NameContextsForTests.nameContextForTest(),
PROCESS_STATE_NAME,
null,
NoopProfileScope.NOOP,
null);
NoopProfileScope.NOOP);

Closeable closure = tracker.enterState(state);

Expand Down Expand Up @@ -162,17 +161,15 @@ public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes()
NameContextsForTests.nameContextForTest(),
PROCESS_STATE_NAME,
null,
NoopProfileScope.NOOP,
null);
NoopProfileScope.NOOP);
tracker.enterState(state);
// Enter a new processing state
StreamingModeExecutionState newState =
new StreamingModeExecutionState(
NameContextsForTests.nameContextForTest(),
PROCESS_STATE_NAME,
null,
NoopProfileScope.NOOP,
null);
NoopProfileScope.NOOP);
tracker.enterState(newState);

// The first completed state should be recorded and the new state should be active.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class StreamingModeExecutionContextTest {
@Mock private WindmillStateReader stateReader;

private final StreamingModeExecutionStateRegistry executionStateRegistry =
new StreamingModeExecutionStateRegistry(null);
new StreamingModeExecutionStateRegistry();
private StreamingModeExecutionContext executionContext;
DataflowWorkerHarnessOptions options;

Expand Down Expand Up @@ -316,11 +316,7 @@ public void testAtomicExtractUpdate() throws InterruptedException, ExecutionExce

StreamingModeExecutionState state =
new StreamingModeExecutionState(
NameContextsForTests.nameContextForTest(),
"testState",
null,
NoopProfileScope.NOOP,
null);
NameContextsForTests.nameContextForTest(), "testState", null, NoopProfileScope.NOOP);
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicBoolean doneWriting = new AtomicBoolean(false);

Expand Down Expand Up @@ -359,11 +355,7 @@ public void stateSamplingInStreaming() {
// reach the reading thread.
StreamingModeExecutionState state =
new StreamingModeExecutionState(
NameContextsForTests.nameContextForTest(),
"testState",
null,
NoopProfileScope.NOOP,
null);
NameContextsForTests.nameContextForTest(), "testState", null, NoopProfileScope.NOOP);
ExecutionStateSampler sampler = ExecutionStateSampler.newForTest();
try {
sampler.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ public void testUnboundedSplits() throws Exception {
public void testReadUnboundedReader() throws Exception {
CounterSet counterSet = new CounterSet();
StreamingModeExecutionStateRegistry executionStateRegistry =
new StreamingModeExecutionStateRegistry(null);
new StreamingModeExecutionStateRegistry();
ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1), Runnable::run);
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
Expand Down Expand Up @@ -941,7 +941,7 @@ public void testGetReaderProgressThrowing() {
public void testFailedWorkItemsAbort() throws Exception {
CounterSet counterSet = new CounterSet();
StreamingModeExecutionStateRegistry executionStateRegistry =
new StreamingModeExecutionStateRegistry(null);
new StreamingModeExecutionStateRegistry();
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
counterSet,
Expand Down

0 comments on commit 371576a

Please sign in to comment.