diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java index d7ac42efe58d..473ae4e08197 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java @@ -404,6 +404,16 @@ public static String createStagingFileName(File path, HashCode hash) { return String.format("%s-%s%s", fileName, encodedHash, suffix); } + public static String getExternalServiceAddress(PortablePipelineOptions options) { + String environmentConfig = options.getDefaultEnvironmentConfig(); + String environmentOption = + PortablePipelineOptions.getEnvironmentOption(options, externalServiceAddressOption); + if (environmentConfig != null && !environmentConfig.isEmpty()) { + return environmentConfig; + } + return environmentOption; + } + private static File zipDirectory(File directory) throws IOException { File zipFile = File.createTempFile(directory.getName(), ".zip"); try (FileOutputStream fos = new FileOutputStream(zipFile)) { @@ -454,16 +464,6 @@ private static String getDockerContainerImage(PortablePipelineOptions options) { return environmentOption; } - private static String getExternalServiceAddress(PortablePipelineOptions options) { - String environmentConfig = options.getDefaultEnvironmentConfig(); - String environmentOption = - PortablePipelineOptions.getEnvironmentOption(options, externalServiceAddressOption); - if (environmentConfig != null && !environmentConfig.isEmpty()) { - return environmentConfig; - } - return environmentOption; - } - private static Map getProcessVariables(PortablePipelineOptions options) { ImmutableMap.Builder variables = ImmutableMap.builder(); String assignments = diff --git 
a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java index 9fbfaac0efd3..000ff648f8a8 100644 --- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java +++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.portability; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import java.util.Collections; import org.apache.beam.fn.harness.FnHarness; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerRequest; @@ -24,10 +26,15 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerResponse; import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.fnexecution.FnService; import org.apache.beam.runners.fnexecution.GrpcFnServer; import org.apache.beam.runners.fnexecution.ServerFactory; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +45,7 @@ public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase implements FnService { private static final Logger LOG = LoggerFactory.getLogger(ExternalWorkerService.class); + private static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS"; private final PipelineOptions options; 
private final ServerFactory serverFactory = ServerFactory.createDefault(); @@ -90,10 +98,58 @@ public void stopWorker( public void close() {} public GrpcFnServer start() throws Exception { - GrpcFnServer server = - GrpcFnServer.allocatePortAndCreateFor(this, serverFactory); + final String externalServiceAddress = + Environments.getExternalServiceAddress(options.as(PortablePipelineOptions.class)); + GrpcFnServer server; + if (externalServiceAddress.isEmpty()) { + server = GrpcFnServer.allocatePortAndCreateFor(this, serverFactory); + } else { + server = + GrpcFnServer.create( + this, + Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(), + serverFactory); + } LOG.debug( "Listening for worker start requests at {}.", server.getApiServiceDescriptor().getUrl()); return server; } + + /** + * Worker pool entry point. + * + *

The worker pool exposes an RPC service that is used with EXTERNAL environment to start and + * stop the SDK workers. + * + *

The worker pool uses threads for parallelism. + * + *

This entry point is used by the Java SDK container in worker pool mode and expects the + * following environment variables: + * + *

    + *
  • PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. It needs to be known + * up-front and must match the running job. See {@link PipelineOptions} for further details. + *
+ */ + public static void main(String[] args) { + LOG.info("Starting external worker service"); + final String optionsEnv = + checkArgumentNotNull( + System.getenv(PIPELINE_OPTIONS_ENV_VAR), + "No pipeline options provided in environment variables " + PIPELINE_OPTIONS_ENV_VAR); + LOG.info("Pipeline options {}", optionsEnv); + PipelineOptions options = PipelineOptionsTranslation.fromJson(optionsEnv); + + try (GrpcFnServer server = new ExternalWorkerService(options).start()) { + LOG.info( + "External worker service started at address: {}", + server.getApiServiceDescriptor().getUrl()); + // Wait to keep ExternalWorkerService running + Sleeper.DEFAULT.sleep(Long.MAX_VALUE); + } catch (Exception e) { + LOG.error("Error running worker service", e); + } finally { + LOG.info("External worker service stopped."); + } + } }