/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Build definition for the Java -> Python extension module.
plugins {
  id 'org.apache.beam.module'
}

applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.python')

description = "Apache Beam :: SDKs :: Java :: Extensions :: Python"

dependencies {
  // Libraries used by the main sources.
  implementation(library.java.vendored_guava_26_0_jre)
  implementation(library.java.vendored_grpc_1_43_2)
  implementation(library.java.slf4j_api)

  // Other Beam modules used by the main sources.
  implementation(project(path: ":model:pipeline", configuration: "shadow"))
  implementation(project(path: ":runners:core-construction-java"))
  implementation(project(path: ":sdks:java:core", configuration: "shadow"))

  // Test-only dependencies.
  testImplementation(library.java.junit)
  testImplementation(library.java.hamcrest)
  testRuntimeOnly(library.java.slf4j_simple)
}
+ */ +package org.apache.beam.sdk.extensions.python; + +import java.util.Set; +import java.util.UUID; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; +import org.apache.beam.runners.core.construction.External; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; + +/** Wrapper for invoking external Python transforms. */ +public class ExternalPythonTransform + extends PTransform { + private final String fullyQualifiedName; + private final Row args; + private final Row kwargs; + + public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) { + this.fullyQualifiedName = fullyQualifiedName; + this.args = args; + this.kwargs = kwargs; + } + + @Override + public OutputT expand(InputT input) { + int port; + try { + port = PythonService.findAvailablePort(); + PythonService service = + new PythonService( + "apache_beam.runners.portability.expansion_service_main", + "--port", + "" + port, + "--fully_qualified_name_glob", + "*"); + Schema payloadSchema = + Schema.of( + Schema.Field.of("constructor", Schema.FieldType.STRING), + Schema.Field.of("args", Schema.FieldType.row(args.getSchema())), + Schema.Field.of("kwargs", Schema.FieldType.row(kwargs.getSchema()))); + payloadSchema.setUUID(UUID.randomUUID()); + Row payloadRow = + Row.withSchema(payloadSchema).addValues(fullyQualifiedName, args, 
kwargs).build(); + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(SchemaTranslation.schemaToProto(payloadSchema, true)) + .setPayload( + ByteString.copyFrom( + CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow))) + .build(); + try (AutoCloseable p = service.start()) { + PythonService.waitForPort("localhost", port, 15000); + PTransform transform = + External.of( + "beam:transforms:python:fully_qualified_named", + payload.toByteArray(), + "localhost:" + port) + .withMultiOutputs(); + PCollectionTuple outputs; + if (input instanceof PCollection) { + outputs = ((PCollection) input).apply(transform); + } else if (input instanceof PCollectionTuple) { + outputs = ((PCollectionTuple) input).apply(transform); + } else if (input instanceof PBegin) { + outputs = ((PBegin) input).apply(transform); + } else { + throw new RuntimeException("Unhandled input type " + input.getClass()); + } + Set> tags = outputs.getAll().keySet(); + if (tags.size() == 1) { + return (OutputT) outputs.get(Iterables.getOnlyElement(tags)); + } else { + return (OutputT) outputs; + } + } + } catch (RuntimeException exn) { + throw exn; + } catch (Exception exn) { + throw new RuntimeException(exn); + } + } +} diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java new file mode 100644 index 000000000000..e4a59939d913 --- /dev/null +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.python; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility to bootstrap and start a Beam Python service. */ +public class PythonService { + private static final Logger LOG = LoggerFactory.getLogger(PythonService.class); + + private final String module; + private final List args; + + public PythonService(String module, List args) { + this.module = module; + this.args = args; + } + + public PythonService(String module, String... 
args) { + this(module, Arrays.asList(args)); + } + + @SuppressWarnings("argument.type.incompatible") + public AutoCloseable start() throws IOException, InterruptedException { + File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py"); + bootstrapScript.deleteOnExit(); + try (FileOutputStream fout = new FileOutputStream(bootstrapScript.getAbsolutePath())) { + ByteStreams.copy(getClass().getResourceAsStream("bootstrap_beam_venv.py"), fout); + } + List bootstrapCommand = ImmutableList.of("python", bootstrapScript.getAbsolutePath()); + LOG.info("Running bootstrap command " + bootstrapCommand); + Process bootstrap = + new ProcessBuilder("python", bootstrapScript.getAbsolutePath()) + .redirectError(ProcessBuilder.Redirect.INHERIT) + .start(); + bootstrap.getOutputStream().close(); + BufferedReader reader = + new BufferedReader(new InputStreamReader(bootstrap.getInputStream(), Charsets.UTF_8)); + String lastLine = reader.readLine(); + String lastNonEmptyLine = lastLine; + while (lastLine != null) { + LOG.info(lastLine); + if (lastLine.length() > 0) { + lastNonEmptyLine = lastLine; + } + lastLine = reader.readLine(); + } + reader.close(); // Make SpotBugs happy. 
+ int result = bootstrap.waitFor(); + if (result != 0) { + throw new RuntimeException( + "Python boostrap failed with error " + result + ", " + lastNonEmptyLine); + } + String pythonExecutable = lastNonEmptyLine; + List command = new ArrayList<>(); + command.add(pythonExecutable); + command.add("-m"); + command.add(module); + command.addAll(args); + LOG.info("Starting python service with arguments " + command); + Process p = + new ProcessBuilder(command) + .redirectError(ProcessBuilder.Redirect.INHERIT) + .redirectOutput(ProcessBuilder.Redirect.INHERIT) + .start(); + return p::destroy; + } + + public static int findAvailablePort() throws IOException { + ServerSocket s = new ServerSocket(0); + try { + return s.getLocalPort(); + } finally { + s.close(); + try { + // Some systems don't free the port for future use immediately. + Thread.sleep(100); + } catch (InterruptedException exn) { + // ignore + } + } + } + + public static void waitForPort(String host, int port, int timeoutMs) + throws TimeoutException, InterruptedException { + long start = System.currentTimeMillis(); + long duration = 10; + while (System.currentTimeMillis() - start < timeoutMs) { + try { + new Socket(host, port).close(); + return; + } catch (IOException exn) { + Thread.sleep(duration); + duration = (long) (duration * 1.2); + } + } + throw new TimeoutException( + "Timeout waiting for Python service startup after " + + (System.currentTimeMillis() - start) + + " seconds."); + } +} diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java new file mode 100644 index 000000000000..d57dab3b2efd --- /dev/null +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Extensions for invoking Python transforms from the Beam Java SDK. */ +package org.apache.beam.sdk.extensions.python; diff --git a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py new file mode 100644 index 000000000000..08120b84adba --- /dev/null +++ b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py @@ -0,0 +1,115 @@ +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""A utility for bootstrapping a BeamPython install. 
def main():
  """Creates (or re-uses) a cached Beam virtual environment.

  Command line flags select the Python version, the Beam version (a released
  version, ``latest``, or a path/URL of a tarball), extra packages to install
  and the cache directory.  On success the path of the virtual environment's
  python executable is printed as the last line of output.

  When started under Python 2 the script re-executes itself with ``python3``,
  so everything in this function must stay parsable by Python 2.

  Raises:
    subprocess.CalledProcessError: if creating the environment, installing a
      package or importing apache_beam fails; the partially created
      environment is removed first.
  """
  if sys.version_info < (3, ):
    # Run this script with Python 3.
    os.execlp('python3', 'python3', *sys.argv)
    return  # In case windows returns...
  else:
    import urllib.request

  # Requirement prefix used whenever Beam is installed from PyPI.
  beam_requirement = 'apache_beam[gcp,aws,azure,dataframe]=='

  parser = argparse.ArgumentParser()
  parser.add_argument('--python_version', help="Python major version.")
  parser.add_argument(
      '--beam_version', help="Beam version.", default="latest")
  parser.add_argument(
      '--extra_packages',
      help="Semi-colon delimited set of python dependencies.")
  parser.add_argument(
      '--cache_dir', default=os.path.expanduser("~/.apache_beam/cache"))

  options = parser.parse_args()
  if options.python_version:
    py_version = options.python_version
    executable = 'python' + py_version
  else:
    py_version = '%s.%s' % sys.version_info[:2]
    executable = sys.executable

  if options.beam_version == 'latest':
    with urllib.request.urlopen(
        "https://pypi.org/pypi/apache_beam/json") as response:
      info = json.load(response)

    def maybe_strict_version(s):
      # Releases whose name is not a strict version (e.g. release candidates)
      # sort below every real release.
      try:
        return distutils.version.StrictVersion(s)
      except Exception:
        return distutils.version.StrictVersion('0.0')

    beam_version = max(info['releases'], key=maybe_strict_version)
    beam_package = beam_requirement + beam_version
  elif (os.path.exists(options.beam_version) or
        options.beam_version.startswith('http://') or
        options.beam_version.startswith('https://')):
    # It's a path to a tarball.
    beam_version = os.path.basename(options.beam_version)
    beam_package = options.beam_version
  else:
    beam_version = options.beam_version
    beam_package = beam_requirement + beam_version

  # Drop empty entries (e.g. from a trailing ';'); pip rejects '' as a
  # requirement.
  if options.extra_packages:
    deps = [dep for dep in options.extra_packages.split(';') if dep]
  else:
    deps = []

  # One environment per (python version, beam version, set of extra packages).
  venv_dir = os.path.join(
      options.cache_dir,
      'venvs',
      'py-%s-beam-%s-%s' % (
          py_version,
          beam_version,
          hashlib.sha1(';'.join(sorted(deps)).encode('utf-8')).hexdigest()))
  venv_python = os.path.join(venv_dir, 'bin', 'python')

  if not os.path.exists(venv_python):
    try:
      subprocess.run([executable, '-m', 'venv', venv_dir], check=True)
      # See https://issues.apache.org/jira/browse/BEAM-14092
      subprocess.run(
          [venv_python, '-m', 'pip', 'install', beam_package,
           'pyparsing==2.4.2'],
          check=True)
      if deps:
        subprocess.run(
            [venv_python, '-m', 'pip', 'install'] + deps, check=True)
      # Sanity check the installation.
      subprocess.run([venv_python, '-c', 'import apache_beam'], check=True)
    except BaseException:
      # Never leave a half-built environment behind (also on Ctrl-C), and do
      # not let a cleanup problem hide the error that got us here.
      shutil.rmtree(venv_dir, ignore_errors=True)
      raise

  print(venv_python)


if __name__ == '__main__':
  main()
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.python; + +import java.io.Serializable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ExternalPythonTransformTest implements Serializable { + @Test + public void trivialPythonTransform() { + Pipeline p = Pipeline.create(); + PCollection output = + p.apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z"))) + .apply( + new ExternalPythonTransform< + PCollection>, PCollection>>>( + "apache_beam.GroupByKey", Row.nullRow(Schema.of()), Row.nullRow(Schema.of()))) + .apply(MapElements.into(TypeDescriptors.strings()).via(kv -> kv.getKey())); + PAssert.that(output).containsInAnyOrder("A", "B"); + // TODO: Run this on a multi-language supporting runner. 
+ } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 55a0c85ba914..7d3401a06e47 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -129,6 +129,7 @@ include(":sdks:java:extensions:jackson") include(":sdks:java:extensions:join-library") include(":sdks:java:extensions:ml") include(":sdks:java:extensions:protobuf") +include(":sdks:java:extensions:python") include("sdks:java:extensions:sbe") include(":sdks:java:extensions:schemaio-expansion-service") include(":sdks:java:extensions:sketching")