
Commit

Update windmill proto definition (apache#30046)
* make external and internal windmill proto defs identical
* override authority of grpc channel for direct path
* Do not inject WindmillServer from deps, create in StreamingDataflowWorker
m-trieu authored Feb 15, 2024
1 parent 03929cb commit 690a5a4
Showing 21 changed files with 806 additions and 465 deletions.
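Taken together, the StreamingDataflowWorker changes below replace the options-injected Windmill stub with one the worker builds itself. A condensed sketch of the new flow, assembled from the hunks that follow (names are as they appear in the diff; this is an outline, not the full method body):

    // Condensed from StreamingDataflowWorker.fromOptions() below; illustrative only.
    ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>();
    long clientId = clientIdGenerator.nextLong(); // random client id, as in the diff
    WindmillServerStub windmillServer =
        createWindmillServerStub(
            options,
            clientId,
            // Heartbeat handling moves out of StreamingDataflowWorker into the new
            // WorkHeartbeatResponseProcessor (added at the end of this diff).
            new WorkHeartbeatResponseProcessor(
                computationId -> Optional.ofNullable(computationMap.get(computationId))));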
org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -57,6 +57,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.servlet.http.HttpServletRequest;
@@ -96,6 +97,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
@@ -104,13 +106,16 @@
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.sdk.coders.Coder;
@@ -208,7 +213,7 @@ public class StreamingDataflowWorker {
private static final Random clientIdGenerator = new Random();
final WindmillStateCache stateCache;
// Maps from computation ids to per-computation state.
private final ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ComputationState> computationMap;
private final WeightedBoundedQueue<Commit> commitQueue =
WeightedBoundedQueue.create(
MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
@@ -280,8 +285,7 @@ public class StreamingDataflowWorker {
// Periodic sender of debug information to the debug capture service.
private final DebugCapture.@Nullable Manager debugCaptureManager;
// Collection of ScheduledExecutorServices that are running periodic functions.
private ArrayList<ScheduledExecutorService> scheduledExecutors =
new ArrayList<ScheduledExecutorService>();
private final ArrayList<ScheduledExecutorService> scheduledExecutors = new ArrayList<>();
private int retryLocallyDelayMs = 10000;
// Periodically fires a global config request to dataflow service. Only used when windmill service
// is enabled.
@@ -292,20 +296,23 @@

@VisibleForTesting
StreamingDataflowWorker(
WindmillServerStub windmillServer,
long clientId,
ConcurrentMap<String, ComputationState> computationMap,
List<MapTask> mapTasks,
DataflowMapTaskExecutorFactory mapTaskExecutorFactory,
WorkUnitClient workUnitClient,
StreamingDataflowWorkerOptions options,
boolean publishCounters,
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier)
throws IOException {
Function<String, ScheduledExecutorService> executorSupplier) {
this.stateCache = new WindmillStateCache(options.getWorkerCacheMb());
this.readerCache =
new ReaderCache(
Duration.standardSeconds(options.getReaderCacheTimeoutSec()),
Executors.newCachedThreadPool());
this.computationMap = computationMap;
this.mapTaskExecutorFactory = mapTaskExecutorFactory;
this.workUnitClient = workUnitClient;
this.options = options;
@@ -429,16 +436,15 @@ public void run() {
commitThreads = commitThreadsBuilder.build();

this.publishCounters = publishCounters;
this.windmillServer = options.getWindmillServerStub();
this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
this.clientId = clientId;
this.windmillServer = windmillServer;
this.metricTrackingWindmillServer =
MetricTrackingWindmillServerStub.builder(windmillServer, memoryMonitor)
.setUseStreamingRequests(windmillServiceEnabled)
.setUseSeparateHeartbeatStreams(options.getUseSeparateWindmillHeartbeatStreams())
.setNumGetDataStreams(options.getWindmillGetDataStreamCount())
.build();
this.sideInputStateFetcher = new SideInputStateFetcher(metricTrackingWindmillServer, options);
this.clientId = clientIdGenerator.nextLong();

for (MapTask mapTask : mapTasks) {
addComputation(mapTask.getSystemName(), mapTask, ImmutableMap.of());
@@ -456,6 +462,44 @@ public void run() {
LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
}

private static WindmillServerStub createWindmillServerStub(
StreamingDataflowWorkerOptions options,
long clientId,
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
if (options.getWindmillServiceEndpoint() != null
|| options.isEnableStreamingEngine()
|| options.getLocalWindmillHostport().startsWith("grpc:")) {
try {
Duration maxBackoff =
!options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null
? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF
: GrpcWindmillServer.MAX_BACKOFF;
GrpcWindmillStreamFactory windmillStreamFactory =
GrpcWindmillStreamFactory.of(
JobHeader.newBuilder()
.setJobId(options.getJobId())
.setProjectId(options.getProject())
.setWorkerId(options.getWorkerId())
.setClientId(clientId)
.build())
.setWindmillMessagesBetweenIsReadyChecks(
options.getWindmillMessagesBetweenIsReadyChecks())
.setMaxBackOffSupplier(() -> maxBackoff)
.setLogEveryNStreamFailures(
options.getWindmillServiceStreamingLogEveryNStreamFailures())
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
.build();
windmillStreamFactory.scheduleHealthChecks(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
return GrpcWindmillServer.create(options, windmillStreamFactory, processHeartbeatResponses);
} catch (IOException e) {
throw new RuntimeException("Failed to create GrpcWindmillServer: ", e);
}
} else {
return new JniWindmillApplianceServer(options.getLocalWindmillHostport());
}
}

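The "override authority of grpc channel for direct path" bullet from the commit message lands in channel-construction code that is not among the hunks excerpted here. Generically, overriding a channel's authority in grpc-java looks like the sketch below; the endpoint address and authority string are hypothetical placeholders, not values from this commit:

    // Generic grpc-java sketch (not code from this diff): overrideAuthority makes
    // the channel present the service hostname for TLS/virtual-hosting purposes
    // even though it dials a direct-path endpoint address.
    ManagedChannel channel =
        ManagedChannelBuilder.forAddress("10.0.0.5", 443) // hypothetical direct endpoint
            .overrideAuthority("windmill.example.com")    // hypothetical authority
            .build();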
/** Returns whether an exception was caused by a {@link OutOfMemoryError}. */
private static boolean isOutOfMemoryError(Throwable t) {
while (t != null) {
@@ -509,10 +553,17 @@ public static void main(String[] args) throws Exception {
worker.start();
}

public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options)
throws IOException {

public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>();
long clientId = clientIdGenerator.nextLong();
return new StreamingDataflowWorker(
createWindmillServerStub(
options,
clientId,
new WorkHeartbeatResponseProcessor(
computationId -> Optional.ofNullable(computationMap.get(computationId)))),
clientId,
computationMap,
Collections.emptyList(),
IntrinsicMapTaskExecutorFactory.defaultFactory(),
new DataflowWorkUnitClient(options, LOG),
@@ -1626,7 +1677,6 @@ private void getConfigFromDataflowService(@Nullable String computation) throws I
@SuppressWarnings("FutureReturnValueIgnored")
private void schedulePeriodicGlobalConfigRequests() {
Preconditions.checkState(windmillServiceEnabled);

if (!windmillServer.isReady()) {
// Get the initial global configuration. This will initialize the windmillServer stub.
while (true) {
@@ -1975,26 +2025,6 @@ private void sendWorkerUpdatesToDataflowService(
}
}

public void handleHeartbeatResponses(List<ComputationHeartbeatResponse> responses) {
for (ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
// Maps sharding key to (work token, cache token) for work that should be marked failed.
Multimap<Long, WorkId> failedWork = ArrayListMultimap.create();
for (Windmill.HeartbeatResponse heartbeatResponse :
computationHeartbeatResponse.getHeartbeatResponsesList()) {
if (heartbeatResponse.getFailed()) {
failedWork.put(
heartbeatResponse.getShardingKey(),
WorkId.builder()
.setWorkToken(heartbeatResponse.getWorkToken())
.setCacheToken(heartbeatResponse.getCacheToken())
.build());
}
}
ComputationState state = computationMap.get(computationHeartbeatResponse.getComputationId());
if (state != null) state.failWork(failedWork);
}
}

/**
* Sends a GetData request to Windmill for all sufficiently old active work.
*
org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
@@ -17,11 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.options;

import java.io.IOException;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
@@ -36,12 +32,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOptions {
@Description("Stub for communicating with Windmill.")
@Default.InstanceFactory(WindmillServerStubFactory.class)
WindmillServerStub getWindmillServerStub();

void setWindmillServerStub(WindmillServerStub value);

@Description("Hostport of a co-located Windmill server.")
@Default.InstanceFactory(LocalWindmillHostportFactory.class)
String getLocalWindmillHostport();
@@ -168,29 +158,6 @@ public String create(PipelineOptions options) {
}
}

/**
* Factory for creating {@link WindmillServerStub} instances. If {@link setLocalWindmillHostport}
* is set, returns a stub to a local Windmill server, otherwise returns a remote gRPC stub.
*/
public static class WindmillServerStubFactory implements DefaultValueFactory<WindmillServerStub> {
@Override
public WindmillServerStub create(PipelineOptions options) {
StreamingDataflowWorkerOptions streamingOptions =
options.as(StreamingDataflowWorkerOptions.class);
if (streamingOptions.getWindmillServiceEndpoint() != null
|| streamingOptions.isEnableStreamingEngine()
|| streamingOptions.getLocalWindmillHostport().startsWith("grpc:")) {
try {
return GrpcWindmillServer.create(streamingOptions);
} catch (IOException e) {
throw new RuntimeException("Failed to create GrpcWindmillServer: ", e);
}
} else {
return new JniWindmillApplianceServer(streamingOptions.getLocalWindmillHostport());
}
}
}

/** Factory for setting value of WindmillServiceStreamingRpcBatchLimit based on environment. */
public static class WindmillServiceStreamingRpcBatchLimitFactory
implements DefaultValueFactory<Integer> {
org/apache/beam/runners/dataflow/worker/streaming/WorkHeartbeatResponseProcessor.java
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatResponse;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;

/**
* Processes {@link ComputationHeartbeatResponse}(s). Marks {@link Work} that is invalid from
* Streaming Engine backend so that it gets dropped from streaming worker harness processing.
*/
@Internal
public final class WorkHeartbeatResponseProcessor
implements Consumer<List<ComputationHeartbeatResponse>> {
/** Fetches a {@link ComputationState} for a computationId. */
private final Function<String, Optional<ComputationState>> computationStateFetcher;

public WorkHeartbeatResponseProcessor(
/* Fetches a {@link ComputationState} for a String computationId. */
Function<String, Optional<ComputationState>> computationStateFetcher) {
this.computationStateFetcher = computationStateFetcher;
}

@Override
public void accept(List<ComputationHeartbeatResponse> responses) {
for (ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
// Maps sharding key to (work token, cache token) for work that should be marked failed.
Multimap<Long, WorkId> failedWork = ArrayListMultimap.create();
for (HeartbeatResponse heartbeatResponse :
computationHeartbeatResponse.getHeartbeatResponsesList()) {
if (heartbeatResponse.getFailed()) {
failedWork.put(
heartbeatResponse.getShardingKey(),
WorkId.builder()
.setWorkToken(heartbeatResponse.getWorkToken())
.setCacheToken(heartbeatResponse.getCacheToken())
.build());
}
}

computationStateFetcher
.apply(computationHeartbeatResponse.getComputationId())
.ifPresent(state -> state.failWork(failedWork));
}
}
}
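The processor is wired up in StreamingDataflowWorker.fromOptions earlier in this diff; a minimal standalone sketch of the same wiring (the empty map is assumed here for illustration):

    ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>();
    Consumer<List<ComputationHeartbeatResponse>> processor =
        new WorkHeartbeatResponseProcessor(
            computationId -> Optional.ofNullable(computationMap.get(computationId)));
    // The Windmill server invokes the consumer as heartbeat responses arrive; any
    // work reported failed is dropped via ComputationState.failWork.
    processor.accept(responses); // responses: List<ComputationHeartbeatResponse>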
