diff --git a/CHANGES.md b/CHANGES.md index a4882b3c52a8..45e29c97905c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,16 +63,15 @@ you may use `--experiments=use_legacy_bq_sink`. * Add cross-language support to Java's JdbcIO, now available in the Python module `apache_beam.io.external.jdbc` ([BEAM-10135](https://issues.apache.org/jira/browse/BEAM-10135), [BEAM-10136](https://issues.apache.org/jira/browse/BEAM-10136)). * Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)). -* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * Add streaming support to SnowflakeIO in Java SDK ([BEAM-9896](https://issues.apache.org/jira/browse/BEAM-9896 )) * Support reading and writing to Google Healthcare DICOM APIs in Python SDK ([BEAM-10601](https://issues.apache.org/jira/browse/BEAM-10601)) ## New Features / Improvements * Shared library for simplifying management of large shared objects added to Python SDK. Example use case is sharing a large TF model object across threads ([BEAM-10417](https://issues.apache.org/jira/browse/BEAM-10417)). -* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)). * OnTimerContext should not create a new one when processing each element/timer in FnApiDoFnRunner ([BEAM-9839](https://issues.apache.org/jira/browse/BEAM-9839)) +* Fixed BEAM-10702 (Python) - Python portable runner with embedded job endpoint was unable to run pipelines which had dependencies in setup.py or requirements.txt ([BEAM-10702](https://issues.apache.org/jira/browse/BEAM-10702)). 
## Breaking Changes diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py index f2bbf534c3bf..1f3ec1c0b0ea 100644 --- a/sdks/python/apache_beam/runners/portability/artifact_service.py +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py @@ -41,6 +41,7 @@ from future.moves.urllib.request import urlopen from apache_beam.io import filesystems +from apache_beam.io.filesystems import CompressionTypes from apache_beam.portability import common_urns from apache_beam.portability.api import beam_artifact_api_pb2 from apache_beam.portability.api import beam_artifact_api_pb2_grpc @@ -263,7 +264,8 @@ def __init__(self, root): self._root = root def file_reader(self, path): - return filesystems.FileSystems.open(path) + return filesystems.FileSystems.open( + path, compression_type=CompressionTypes.UNCOMPRESSED) def file_writer(self, name=None): full_path = filesystems.FileSystems.join(self._root, name) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py index 9d2754940b61..1f1d483b5a1d 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py @@ -45,6 +45,7 @@ import grpc from apache_beam.io import filesystems +from apache_beam.io.filesystems import CompressionTypes from apache_beam.portability import common_urns from apache_beam.portability import python_urns from apache_beam.portability.api import beam_artifact_api_pb2_grpc @@ -464,9 +465,13 @@ def __init__(self, self.provision_info.provision_info, worker_manager), self.control_server) + def open_uncompressed(f): + return filesystems.FileSystems.open( + f, compression_type=CompressionTypes.UNCOMPRESSED) + beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server( 
artifact_service.ArtifactRetrievalService( - file_reader=filesystems.FileSystems.open), + file_reader=open_uncompressed), self.control_server) self.data_plane_handler = data_plane.BeamFnDataServicer( diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml index d1e194f0add1..13aa4cfac599 100644 --- a/sdks/python/container/license_scripts/dep_urls_py.yaml +++ b/sdks/python/container/license_scripts/dep_urls_py.yaml @@ -48,7 +48,9 @@ pip_dependencies: cython: license: "https://raw.githubusercontent.com/cython/cython/master/LICENSE.txt" enum34: - license: "https://bitbucket.org/stoneleaf/enum34/raw/c208549a93b71b948ff7bbdfd29dce8f85527916/enum/LICENSE" + # The original repo is down. This license was copied from a third-party clone of it: + # https://github.com/jamespharaoh/python-enum34/blob/master/enum/LICENSE + license: "file:///tmp/license_scripts/manual_licenses/enum34/LICENSE" fastavro: license: "https://raw.githubusercontent.com/fastavro/fastavro/master/LICENSE" notice: "https://raw.githubusercontent.com/fastavro/fastavro/master/NOTICE.txt" diff --git a/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE b/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE new file mode 100644 index 000000000000..9003b8850e7e --- /dev/null +++ b/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE @@ -0,0 +1,32 @@ +Copyright (c) 2013, Ethan Furman. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + Redistributions of source code must retain the above + copyright notice, this list of conditions and the + following disclaimer. + + Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials + provided with the distribution. 
+ + Neither the name Ethan Furman nor the names of any + contributors may be used to endorse or promote products + derived from this software without specific prior written + permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 48312a637328..60f825cc65ac 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -101,6 +101,30 @@ task flinkTriggerTranscript() { } } +// Verifies BEAM-10702: PortableRunner with an embedded job endpoint can run a pipeline that uses --setup_file. +task portableLocalRunnerJuliaSetWithSetupPy { + dependsOn 'setupVirtualenv' + dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker" + + doLast { + exec { + executable 'sh' + args '-c', """ + . ${envdir}/bin/activate \\ + && cd ${pythonRootDir} \\ + && pip install -e . 
\\ + && cd apache_beam/examples/complete/juliaset \\ + && python juliaset_main.py \\ + --runner=PortableRunner \\ + --job_endpoint=embed \\ + --setup_file=./setup.py \\ + --coordinate_output=/tmp/juliaset \\ + --grid_size=1 + """ + } + } +} + task createProcessWorker { dependsOn ':sdks:python:container:build' dependsOn 'setupVirtualenv' @@ -181,6 +205,7 @@ project.task("postCommitPy${pythonVersionSuffix}") { dependsOn = ['setupVirtualenv', "postCommitPy${pythonVersionSuffix}IT", ':runners:spark:job-server:shadowJar', + 'portableLocalRunnerJuliaSetWithSetupPy', 'portableWordCountSparkRunnerBatch'] }