Skip to content

Commit

Permalink
[BEAM-8137] Add Main method to ExternalWorkerService (#14942)
Browse files Browse the repository at this point in the history
* [BEAM-8137] Add Main method to ExternalWorkerService

1. Add main method to ExternalWorkerService to support launching worker pool from Java SDK Container.

* Add more javadoc/comments

* Add more javadoc/comments

* Update to sleep for Long.MAX_VALUE

* Update format

* Update LOG.error

* Fix checker error

* Update error message

Co-authored-by: Ke Wu <[email protected]>
  • Loading branch information
kw2542 and Ke Wu authored Jun 9, 2021
1 parent 81a46a9 commit f6b00a2
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,16 @@ public static String createStagingFileName(File path, HashCode hash) {
return String.format("%s-%s%s", fileName, encodedHash, suffix);
}

public static String getExternalServiceAddress(PortablePipelineOptions options) {
String environmentConfig = options.getDefaultEnvironmentConfig();
String environmentOption =
PortablePipelineOptions.getEnvironmentOption(options, externalServiceAddressOption);
if (environmentConfig != null && !environmentConfig.isEmpty()) {
return environmentConfig;
}
return environmentOption;
}

private static File zipDirectory(File directory) throws IOException {
File zipFile = File.createTempFile(directory.getName(), ".zip");
try (FileOutputStream fos = new FileOutputStream(zipFile)) {
Expand Down Expand Up @@ -454,16 +464,6 @@ private static String getDockerContainerImage(PortablePipelineOptions options) {
return environmentOption;
}

private static String getExternalServiceAddress(PortablePipelineOptions options) {
String environmentConfig = options.getDefaultEnvironmentConfig();
String environmentOption =
PortablePipelineOptions.getEnvironmentOption(options, externalServiceAddressOption);
if (environmentConfig != null && !environmentConfig.isEmpty()) {
return environmentConfig;
}
return environmentOption;
}

private static Map<String, String> getProcessVariables(PortablePipelineOptions options) {
ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
String assignments =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@
*/
package org.apache.beam.runners.portability;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.util.Collections;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +45,7 @@
public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase implements FnService {

private static final Logger LOG = LoggerFactory.getLogger(ExternalWorkerService.class);
private static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";

private final PipelineOptions options;
private final ServerFactory serverFactory = ServerFactory.createDefault();
Expand Down Expand Up @@ -90,10 +98,58 @@ public void stopWorker(
public void close() {}

public GrpcFnServer<ExternalWorkerService> start() throws Exception {
GrpcFnServer<ExternalWorkerService> server =
GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
final String externalServiceAddress =
Environments.getExternalServiceAddress(options.as(PortablePipelineOptions.class));
GrpcFnServer<ExternalWorkerService> server;
if (externalServiceAddress.isEmpty()) {
server = GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
} else {
server =
GrpcFnServer.create(
this,
Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(),
serverFactory);
}
LOG.debug(
"Listening for worker start requests at {}.", server.getApiServiceDescriptor().getUrl());
return server;
}

/**
* Worker pool entry point.
*
* <p>The worker pool exposes an RPC service that is used with EXTERNAL environment to start and
* stop the SDK workers.
*
* <p>The worker pool uses threads for parallelism;
*
* <p>This entry point is used by the Java SDK container in worker pool mode and expects the
* following environment variables:
*
* <ul>
* <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. It needs to be known
* up-front and matches the running job. See {@link PipelineOptions} for further details.
* </ul>
*/
public static void main(String[] args) {
LOG.info("Starting external worker service");
final String optionsEnv =
checkArgumentNotNull(
System.getenv(PIPELINE_OPTIONS_ENV_VAR),
"No pipeline options provided in environment variables " + PIPELINE_OPTIONS_ENV_VAR);
LOG.info("Pipeline options {}", optionsEnv);
PipelineOptions options = PipelineOptionsTranslation.fromJson(optionsEnv);

try (GrpcFnServer<ExternalWorkerService> server = new ExternalWorkerService(options).start()) {
LOG.info(
"External worker service started at address: {}",
server.getApiServiceDescriptor().getUrl());
// Wait to keep ExternalWorkerService running
Sleeper.DEFAULT.sleep(Long.MAX_VALUE);
} catch (Exception e) {
LOG.error("Error running worker service", e);
} finally {
LOG.info("External worker service stopped.");
}
}
}

0 comments on commit f6b00a2

Please sign in to comment.