[BEAM-10702, BEAM-10757] Cherrypick #12571: Do not implicitly decompress artifacts (#12619)

* [BEAM-10702] Do not implicitly decompress artifacts

Also adds a Julia set test on portable local runner, which uses a
setup.py and hence exercises the artifact staging codepath.

This is a squashed cherrypick of #12571

* Add enum34 to manual_licenses
jkff authored Aug 19, 2020
1 parent e65a6a7 commit 33d2437
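
For context: Beam's `FileSystems.open` defaults to `CompressionTypes.AUTO`, which infers compression from the file extension. An artifact staged as a gzipped file could therefore be read back already decompressed, so the receiver would get different bytes than were staged. The following is a minimal stdlib-only sketch of that failure mode, not Beam code: `open_auto`, `open_uncompressed`, and the `workflow.tar.gz` file name are hypothetical stand-ins chosen for illustration.

```python
import gzip
import os
import tempfile


def open_auto(path):
    """Hypothetical stand-in: infer compression from the file extension."""
    if path.endswith(".gz"):
        return gzip.open(path, "rb")
    return open(path, "rb")


def open_uncompressed(path):
    """Hypothetical stand-in: always return the raw bytes on disk."""
    return open(path, "rb")


def demo():
    """Stage a gzipped artifact and read it back with both openers."""
    payload = b"contents of a staged dependency\n" * 100
    with tempfile.TemporaryDirectory() as tmp:
        # Illustrative file name; any path ending in .gz behaves the same.
        artifact = os.path.join(tmp, "workflow.tar.gz")
        with gzip.open(artifact, "wb") as f:
            f.write(payload)
        with open(artifact, "rb") as f:
            on_disk = f.read()
        with open_auto(artifact) as f:
            auto_bytes = f.read()
        with open_uncompressed(artifact) as f:
            raw_bytes = f.read()
    return payload, on_disk, auto_bytes, raw_bytes


if __name__ == "__main__":
    payload, on_disk, auto_bytes, raw_bytes = demo()
    # The extension-sniffing opener silently changes the artifact's bytes.
    print("auto matches staged file:", auto_bytes == on_disk)
    print("uncompressed matches staged file:", raw_bytes == on_disk)
```

In this sketch only the uncompressed reader returns the artifact byte-for-byte. That is the property the diff below appears to restore by passing `compression_type=CompressionTypes.UNCOMPRESSED` at both artifact read sites.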
Showing 6 changed files with 70 additions and 5 deletions.
3 changes: 1 addition & 2 deletions CHANGES.md
@@ -63,16 +63,15 @@
you may use `--experiments=use_legacy_bq_sink`.
* Add cross-language support to Java's JdbcIO, now available in the Python module `apache_beam.io.external.jdbc` ([BEAM-10135](https://issues.apache.org/jira/browse/BEAM-10135), [BEAM-10136](https://issues.apache.org/jira/browse/BEAM-10136)).
* Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)).
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Add streaming support to SnowflakeIO in Java SDK ([BEAM-9896](https://issues.apache.org/jira/browse/BEAM-9896 ))
* Support reading and writing to Google Healthcare DICOM APIs in Python SDK ([BEAM-10601](https://issues.apache.org/jira/browse/BEAM-10601))

## New Features / Improvements

* Shared library for simplifying management of large shared objects added to Python SDK. Example use case is sharing a large TF model object across threads ([BEAM-10417](https://issues.apache.org/jira/browse/BEAM-10417)).
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).
* OnTimerContext should not create a new one when processing each element/timer in FnApiDoFnRunner ([BEAM-9839](https://issues.apache.org/jira/browse/BEAM-9839))
* Fixed BEAM-10702 (Python) - Python portable runner with embedded job endpoint was unable to run pipelines which had dependencies in setup.py or requirements.txt ([BEAM-10702](https://issues.apache.org/jira/browse/BEAM-10702)).

## Breaking Changes

@@ -41,6 +41,7 @@
 from future.moves.urllib.request import urlopen

 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ def __init__(self, root):
     self._root = root

   def file_reader(self, path):
-    return filesystems.FileSystems.open(path)
+    return filesystems.FileSystems.open(
+        path, compression_type=CompressionTypes.UNCOMPRESSED)

   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
@@ -45,6 +45,7 @@
 import grpc

 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ def __init__(self,
             self.provision_info.provision_info, worker_manager),
         self.control_server)

+    def open_uncompressed(f):
+      return filesystems.FileSystems.open(
+          f, compression_type=CompressionTypes.UNCOMPRESSED)
+
     beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
         artifact_service.ArtifactRetrievalService(
-            file_reader=filesystems.FileSystems.open),
+            file_reader=open_uncompressed),
         self.control_server)

     self.data_plane_handler = data_plane.BeamFnDataServicer(
4 changes: 3 additions & 1 deletion sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -48,7 +48,9 @@ pip_dependencies:
   cython:
     license: "https://raw.githubusercontent.com/cython/cython/master/LICENSE.txt"
   enum34:
-    license: "https://bitbucket.org/stoneleaf/enum34/raw/c208549a93b71b948ff7bbdfd29dce8f85527916/enum/LICENSE"
+    # The original repo is down. This license taken from somebody's clone:
+    # https://github.com/jamespharaoh/python-enum34/blob/master/enum/LICENSE
+    license: "file:///tmp/license_scripts/manual_licenses/enum34/LICENSE"
   fastavro:
     license: "https://raw.githubusercontent.com/fastavro/fastavro/master/LICENSE"
     notice: "https://raw.githubusercontent.com/fastavro/fastavro/master/NOTICE.txt"
@@ -0,0 +1,32 @@
Copyright (c) 2013, Ethan Furman.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

Redistributions of source code must retain the above
copyright notice, this list of conditions and the
following disclaimer.

Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials
provided with the distribution.

Neither the name Ethan Furman nor the names of any
contributors may be used to endorse or promote products
derived from this software without specific prior written
permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
25 changes: 25 additions & 0 deletions sdks/python/test-suites/portable/common.gradle
@@ -101,6 +101,30 @@ task flinkTriggerTranscript() {
   }
 }

+// Verifies BEAM-10702.
+task portableLocalRunnerJuliaSetWithSetupPy {
+  dependsOn 'setupVirtualenv'
+  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"
+
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', """
+          . ${envdir}/bin/activate \\
+          && cd ${pythonRootDir} \\
+          && pip install -e . \\
+          && cd apache_beam/examples/complete/juliaset \\
+          && python juliaset_main.py \\
+              --runner=PortableRunner \\
+              --job_endpoint=embed \\
+              --setup_file=./setup.py \\
+              --coordinate_output=/tmp/juliaset \\
+              --grid_size=1
+      """
+    }
+  }
+}
+
 task createProcessWorker {
   dependsOn ':sdks:python:container:build'
   dependsOn 'setupVirtualenv'
@@ -181,6 +205,7 @@ project.task("postCommitPy${pythonVersionSuffix}") {
   dependsOn = ['setupVirtualenv',
                "postCommitPy${pythonVersionSuffix}IT",
                ':runners:spark:job-server:shadowJar',
+               'portableLocalRunnerJuliaSetWithSetupPy',
                'portableWordCountSparkRunnerBatch']
 }
