Skip to content

Commit

Permalink
[BEAM-14038] Auto-startup for Python expansion service. (#17035)
Browse files Browse the repository at this point in the history
]
  • Loading branch information
robertwb authored Mar 17, 2022
1 parent 6da5fa6 commit 0510cec
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 0 deletions.
34 changes: 34 additions & 0 deletions sdks/java/extensions/python/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }
applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.extensions.python')

description = "Apache Beam :: SDKs :: Java :: Extensions :: Python"

dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation library.java.vendored_grpc_1_43_2
implementation library.java.slf4j_api
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":runners:core-construction-java")
implementation project(path: ":sdks:java:core", configuration: "shadow")
testImplementation library.java.junit
testImplementation library.java.hamcrest
testRuntimeOnly library.java.slf4j_simple
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.python;

import java.util.Set;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.runners.core.construction.External;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;

/** Wrapper for invoking external Python transforms. */
public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
private final String fullyQualifiedName;
private final Row args;
private final Row kwargs;

public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
this.fullyQualifiedName = fullyQualifiedName;
this.args = args;
this.kwargs = kwargs;
}

@Override
public OutputT expand(InputT input) {
int port;
try {
port = PythonService.findAvailablePort();
PythonService service =
new PythonService(
"apache_beam.runners.portability.expansion_service_main",
"--port",
"" + port,
"--fully_qualified_name_glob",
"*");
Schema payloadSchema =
Schema.of(
Schema.Field.of("constructor", Schema.FieldType.STRING),
Schema.Field.of("args", Schema.FieldType.row(args.getSchema())),
Schema.Field.of("kwargs", Schema.FieldType.row(kwargs.getSchema())));
payloadSchema.setUUID(UUID.randomUUID());
Row payloadRow =
Row.withSchema(payloadSchema).addValues(fullyQualifiedName, args, kwargs).build();
ExternalTransforms.ExternalConfigurationPayload payload =
ExternalTransforms.ExternalConfigurationPayload.newBuilder()
.setSchema(SchemaTranslation.schemaToProto(payloadSchema, true))
.setPayload(
ByteString.copyFrom(
CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
.build();
try (AutoCloseable p = service.start()) {
PythonService.waitForPort("localhost", port, 15000);
PTransform<PInput, PCollectionTuple> transform =
External.<PInput, Object>of(
"beam:transforms:python:fully_qualified_named",
payload.toByteArray(),
"localhost:" + port)
.withMultiOutputs();
PCollectionTuple outputs;
if (input instanceof PCollection) {
outputs = ((PCollection<?>) input).apply(transform);
} else if (input instanceof PCollectionTuple) {
outputs = ((PCollectionTuple) input).apply(transform);
} else if (input instanceof PBegin) {
outputs = ((PBegin) input).apply(transform);
} else {
throw new RuntimeException("Unhandled input type " + input.getClass());
}
Set<TupleTag<?>> tags = outputs.getAll().keySet();
if (tags.size() == 1) {
return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
} else {
return (OutputT) outputs;
}
}
} catch (RuntimeException exn) {
throw exn;
} catch (Exception exn) {
throw new RuntimeException(exn);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.python;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility to bootstrap and start a Beam Python service. */
public class PythonService {
private static final Logger LOG = LoggerFactory.getLogger(PythonService.class);

private final String module;
private final List<String> args;

public PythonService(String module, List<String> args) {
this.module = module;
this.args = args;
}

public PythonService(String module, String... args) {
this(module, Arrays.asList(args));
}

@SuppressWarnings("argument.type.incompatible")
public AutoCloseable start() throws IOException, InterruptedException {
File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py");
bootstrapScript.deleteOnExit();
try (FileOutputStream fout = new FileOutputStream(bootstrapScript.getAbsolutePath())) {
ByteStreams.copy(getClass().getResourceAsStream("bootstrap_beam_venv.py"), fout);
}
List<String> bootstrapCommand = ImmutableList.of("python", bootstrapScript.getAbsolutePath());
LOG.info("Running bootstrap command " + bootstrapCommand);
Process bootstrap =
new ProcessBuilder("python", bootstrapScript.getAbsolutePath())
.redirectError(ProcessBuilder.Redirect.INHERIT)
.start();
bootstrap.getOutputStream().close();
BufferedReader reader =
new BufferedReader(new InputStreamReader(bootstrap.getInputStream(), Charsets.UTF_8));
String lastLine = reader.readLine();
String lastNonEmptyLine = lastLine;
while (lastLine != null) {
LOG.info(lastLine);
if (lastLine.length() > 0) {
lastNonEmptyLine = lastLine;
}
lastLine = reader.readLine();
}
reader.close(); // Make SpotBugs happy.
int result = bootstrap.waitFor();
if (result != 0) {
throw new RuntimeException(
"Python boostrap failed with error " + result + ", " + lastNonEmptyLine);
}
String pythonExecutable = lastNonEmptyLine;
List<String> command = new ArrayList<>();
command.add(pythonExecutable);
command.add("-m");
command.add(module);
command.addAll(args);
LOG.info("Starting python service with arguments " + command);
Process p =
new ProcessBuilder(command)
.redirectError(ProcessBuilder.Redirect.INHERIT)
.redirectOutput(ProcessBuilder.Redirect.INHERIT)
.start();
return p::destroy;
}

public static int findAvailablePort() throws IOException {
ServerSocket s = new ServerSocket(0);
try {
return s.getLocalPort();
} finally {
s.close();
try {
// Some systems don't free the port for future use immediately.
Thread.sleep(100);
} catch (InterruptedException exn) {
// ignore
}
}
}

public static void waitForPort(String host, int port, int timeoutMs)
throws TimeoutException, InterruptedException {
long start = System.currentTimeMillis();
long duration = 10;
while (System.currentTimeMillis() - start < timeoutMs) {
try {
new Socket(host, port).close();
return;
} catch (IOException exn) {
Thread.sleep(duration);
duration = (long) (duration * 1.2);
}
}
throw new TimeoutException(
"Timeout waiting for Python service startup after "
+ (System.currentTimeMillis() - start)
+ " seconds.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Extensions for invoking Python transforms from the Beam Java SDK. */
package org.apache.beam.sdk.extensions.python;
Loading

0 comments on commit 0510cec

Please sign in to comment.