From f5d62dd56b9d541c734ba9a142d24cdecfed784e Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Tue, 22 Mar 2022 13:17:08 -0700 Subject: [PATCH] Revert "[BEAM-14038] Auto-startup for Python expansion service. (#17035)" This reverts commit 0510cecb8a6bf8950699bf7a2253ee82bf37ea39. --- sdks/java/extensions/python/build.gradle | 34 ----- .../python/ExternalPythonTransform.java | 110 --------------- .../sdk/extensions/python/PythonService.java | 132 ------------------ .../sdk/extensions/python/package-info.java | 20 --- .../extensions/python/bootstrap_beam_venv.py | 115 --------------- .../python/ExternalPythonTransformTest.java | 49 ------- settings.gradle.kts | 1 - 7 files changed, 461 deletions(-) delete mode 100644 sdks/java/extensions/python/build.gradle delete mode 100644 sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java delete mode 100644 sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java delete mode 100644 sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java delete mode 100644 sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py delete mode 100644 sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java diff --git a/sdks/java/extensions/python/build.gradle b/sdks/java/extensions/python/build.gradle deleted file mode 100644 index c48518ec0ae2..000000000000 --- a/sdks/java/extensions/python/build.gradle +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -plugins { id 'org.apache.beam.module' } -applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.extensions.python') - -description = "Apache Beam :: SDKs :: Java :: Extensions :: Python" - -dependencies { - implementation library.java.vendored_guava_26_0_jre - implementation library.java.vendored_grpc_1_43_2 - implementation library.java.slf4j_api - implementation project(path: ":model:pipeline", configuration: "shadow") - implementation project(path: ":runners:core-construction-java") - implementation project(path: ":sdks:java:core", configuration: "shadow") - testImplementation library.java.junit - testImplementation library.java.hamcrest - testRuntimeOnly library.java.slf4j_simple -} diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java deleted file mode 100644 index 163f873beb17..000000000000 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.python; - -import java.util.Set; -import java.util.UUID; -import org.apache.beam.model.pipeline.v1.ExternalTransforms; -import org.apache.beam.runners.core.construction.External; -import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaTranslation; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; - -/** Wrapper for invoking external Python transforms. */ -public class ExternalPythonTransform - extends PTransform { - private final String fullyQualifiedName; - private final Row args; - private final Row kwargs; - - public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) { - this.fullyQualifiedName = fullyQualifiedName; - this.args = args; - this.kwargs = kwargs; - } - - @Override - public OutputT expand(InputT input) { - int port; - try { - port = PythonService.findAvailablePort(); - PythonService service = - new PythonService( - "apache_beam.runners.portability.expansion_service_main", - "--port", - "" + port, - "--fully_qualified_name_glob", - "*"); - Schema payloadSchema = - Schema.of( - Schema.Field.of("constructor", Schema.FieldType.STRING), - Schema.Field.of("args", Schema.FieldType.row(args.getSchema())), - Schema.Field.of("kwargs", Schema.FieldType.row(kwargs.getSchema()))); - payloadSchema.setUUID(UUID.randomUUID()); - Row payloadRow = - Row.withSchema(payloadSchema).addValues(fullyQualifiedName, args, kwargs).build(); - ExternalTransforms.ExternalConfigurationPayload payload = - ExternalTransforms.ExternalConfigurationPayload.newBuilder() - .setSchema(SchemaTranslation.schemaToProto(payloadSchema, true)) - .setPayload( - ByteString.copyFrom( - CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow))) - .build(); - try (AutoCloseable p = service.start()) { - PythonService.waitForPort("localhost", port, 15000); - PTransform transform = - External.of( - "beam:transforms:python:fully_qualified_named", - payload.toByteArray(), - "localhost:" + port) - .withMultiOutputs(); - PCollectionTuple outputs; - if (input instanceof PCollection) { - outputs = ((PCollection) input).apply(transform); - } else if (input instanceof PCollectionTuple) { - outputs = ((PCollectionTuple) input).apply(transform); - } else if (input instanceof PBegin) { - outputs = ((PBegin) input).apply(transform); - } else { - throw new RuntimeException("Unhandled input type " + input.getClass()); - } - Set> tags = outputs.getAll().keySet(); - if (tags.size() == 1) { - return (OutputT) outputs.get(Iterables.getOnlyElement(tags)); - } else { - return (OutputT) outputs; - } - } - } catch (RuntimeException exn) { - throw exn; - } catch (Exception exn) { - throw new RuntimeException(exn); - } - } -} diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java deleted file mode 100644 index e4a59939d913..000000000000 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.python; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeoutException; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Utility to bootstrap and start a Beam Python service. */ -public class PythonService { - private static final Logger LOG = LoggerFactory.getLogger(PythonService.class); - - private final String module; - private final List args; - - public PythonService(String module, List args) { - this.module = module; - this.args = args; - } - - public PythonService(String module, String... args) { - this(module, Arrays.asList(args)); - } - - @SuppressWarnings("argument.type.incompatible") - public AutoCloseable start() throws IOException, InterruptedException { - File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py"); - bootstrapScript.deleteOnExit(); - try (FileOutputStream fout = new FileOutputStream(bootstrapScript.getAbsolutePath())) { - ByteStreams.copy(getClass().getResourceAsStream("bootstrap_beam_venv.py"), fout); - } - List bootstrapCommand = ImmutableList.of("python", bootstrapScript.getAbsolutePath()); - LOG.info("Running bootstrap command " + bootstrapCommand); - Process bootstrap = - new ProcessBuilder("python", bootstrapScript.getAbsolutePath()) - .redirectError(ProcessBuilder.Redirect.INHERIT) - .start(); - bootstrap.getOutputStream().close(); - BufferedReader reader = - new BufferedReader(new InputStreamReader(bootstrap.getInputStream(), Charsets.UTF_8)); - String lastLine = reader.readLine(); - String lastNonEmptyLine = lastLine; - while (lastLine != null) { - LOG.info(lastLine); - if (lastLine.length() > 0) { - lastNonEmptyLine = lastLine; - } - lastLine = reader.readLine(); - } - reader.close(); // Make SpotBugs happy. - int result = bootstrap.waitFor(); - if (result != 0) { - throw new RuntimeException( - "Python boostrap failed with error " + result + ", " + lastNonEmptyLine); - } - String pythonExecutable = lastNonEmptyLine; - List command = new ArrayList<>(); - command.add(pythonExecutable); - command.add("-m"); - command.add(module); - command.addAll(args); - LOG.info("Starting python service with arguments " + command); - Process p = - new ProcessBuilder(command) - .redirectError(ProcessBuilder.Redirect.INHERIT) - .redirectOutput(ProcessBuilder.Redirect.INHERIT) - .start(); - return p::destroy; - } - - public static int findAvailablePort() throws IOException { - ServerSocket s = new ServerSocket(0); - try { - return s.getLocalPort(); - } finally { - s.close(); - try { - // Some systems don't free the port for future use immediately. - Thread.sleep(100); - } catch (InterruptedException exn) { - // ignore - } - } - } - - public static void waitForPort(String host, int port, int timeoutMs) - throws TimeoutException, InterruptedException { - long start = System.currentTimeMillis(); - long duration = 10; - while (System.currentTimeMillis() - start < timeoutMs) { - try { - new Socket(host, port).close(); - return; - } catch (IOException exn) { - Thread.sleep(duration); - duration = (long) (duration * 1.2); - } - } - throw new TimeoutException( - "Timeout waiting for Python service startup after " - + (System.currentTimeMillis() - start) - + " seconds."); - } -} diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java deleted file mode 100644 index d57dab3b2efd..000000000000 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Extensions for invoking Python transforms from the Beam Java SDK. */ -package org.apache.beam.sdk.extensions.python; diff --git a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py deleted file mode 100644 index 08120b84adba..000000000000 --- a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py +++ /dev/null @@ -1,115 +0,0 @@ -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -"""A utility for bootstrapping a BeamPython install. - -This utility can be called with any version of Python, and attempts to create -a Python virtual environment with the requested version of Beam, and any -extra dependencies as required, installed. - -The virtual environment will be placed in Apache Beam's cache directory, and -will be re-used if the parameters match. - -If this script exits successfully, the last line will be the full path to a -suitable python executable. -""" - -import argparse -import distutils.version -import hashlib -import json -import os -import shutil -import subprocess -import sys - - -def main(): - if sys.version_info < (3, ): - # Run this script with Python 3. - os.execlp('python3', 'python3', *sys.argv) - return # In case windows returns... - else: - import urllib.request - - parser = argparse.ArgumentParser() - parser.add_argument('--python_version', help="Python major version.") - parser.add_argument('--beam_version', - help="Beam version.", - default="latest") - parser.add_argument( - '--extra_packages', - help="Semi-colon delimited set of python dependencies.") - parser.add_argument('--cache_dir', - default=os.path.expanduser("~/.apache_beam/cache")) - - options = parser.parse_args() - if options.python_version: - py_version = options.python_version - executable = 'python' + py_version - else: - py_version = '%s.%s' % sys.version_info[:2] - executable = sys.executable - - if options.beam_version == 'latest': - info = json.load( - urllib.request.urlopen("https://pypi.org/pypi/apache_beam/json")) - - def maybe_strict_version(s): - try: - return distutils.version.StrictVersion(s) - except: - return distutils.version.StrictVersion('0.0') - - beam_version = max(info['releases'], key=maybe_strict_version) - beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version - elif (os.path.exists(options.beam_version) - or options.beam_version.startswith('http://') - or options.beam_version.startswith('https://')): - # It's a path to a tarball. - beam_version = os.path.basename(options.beam_version) - beam_package = options.beam_version - else: - beam_version = options.beam_version - beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version - - deps = options.extra_packages.split(';') if options.extra_packages else [] - venv_dir = os.path.join( - options.cache_dir, 'venvs', 'py-%s-beam-%s-%s' % - (py_version, beam_version, - hashlib.sha1(';'.join(sorted(deps)).encode('utf-8')).hexdigest())) - venv_python = os.path.join(venv_dir, 'bin', 'python') - - if not os.path.exists(venv_python): - try: - subprocess.run([executable, '-m', 'venv', venv_dir], check=True) - # See https://issues.apache.org/jira/browse/BEAM-14092 - subprocess.run([ - venv_python, '-m', 'pip', 'install', beam_package, - 'pyparsing==2.4.2' - ], - check=True) - if deps: - subprocess.run([venv_python, '-m', 'pip', 'install'] + deps, - check=True) - # Sanity check the installation. - subprocess.run([venv_python, '-c', 'import apache_beam'], - check=True) - except: - shutil.rmtree(venv_dir) - raise - - print(venv_python) - - -if __name__ == '__main__': - main() diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java deleted file mode 100644 index 9bb25add16e9..000000000000 --- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.python; - -import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptors; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class ExternalPythonTransformTest implements Serializable { - @Test - public void trivialPythonTransform() { - Pipeline p = Pipeline.create(); - PCollection output = - p.apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z"))) - .apply( - new ExternalPythonTransform< - PCollection>, PCollection>>>( - "apache_beam.GroupByKey", Row.nullRow(Schema.of()), Row.nullRow(Schema.of()))) - .apply(MapElements.into(TypeDescriptors.strings()).via(kv -> kv.getKey())); - PAssert.that(output).containsInAnyOrder("A", "B"); - // TODO: Run this on a multi-language supporting runner. - } -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 7d3401a06e47..55a0c85ba914 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -129,7 +129,6 @@ include(":sdks:java:extensions:jackson") include(":sdks:java:extensions:join-library") include(":sdks:java:extensions:ml") include(":sdks:java:extensions:protobuf") -include(":sdks:java:extensions:python") include("sdks:java:extensions:sbe") include(":sdks:java:extensions:schemaio-expansion-service") include(":sdks:java:extensions:sketching")