From 082ae794f97e4de7e6a64c71911df7e73b67a646 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 12 Jun 2024 16:54:59 -0700 Subject: [PATCH 01/28] [#31403] Broken initial changes. --- .../apache_beam/options/pipeline_options.py | 11 ++ .../runners/portability/prism_runner.py | 109 ++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 sdks/python/apache_beam/runners/portability/prism_runner.py diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index b204adc7fc5d..e17f0722040a 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1707,6 +1707,17 @@ def _add_argparse_args(cls, parser): help='Spark major version to use.') +class PrismRunnerOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + # TODO(lostluck): Add additional prism configuration options here as they're added to prism. + parser.add_argument( + '--prism_binary_location', + help='Path or URL to a prism binary, or zipped binary. ' + ' Binary must be for the current platform.') + + class TestOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py new file mode 100644 index 000000000000..f6fb7f117ec3 --- /dev/null +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -0,0 +1,109 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A runner for executing portable pipelines on Apache Beam Prism.""" + +# pytype: skip-file + +import logging +import os +import re +import urllib + +from apache_beam.options import pipeline_options +from apache_beam.runners.portability import job_server +from apache_beam.runners.portability import portable_runner +from apache_beam.utils import subprocess_server +from apache_beam.version import __version__ as beam_version + + +MAGIC_HOST_NAMES = ['[local]', '[auto]'] + +_LOGGER = logging.getLogger(__name__) + + +class PrismRunner(portable_runner.PortableRunner): + """A runner for launching jobs on Prism, automatically downloading and starting a + Prism instance if needed. + """ + + # Inherits run_portable_pipeline from PortableRunner. + + def default_environment(self, options): + portable_options = options.view_as(pipeline_options.PortableOptions) + prism_options = options.view_as(pipeline_options.PrismRunnerOptions) + if (not portable_options.environment_type and + not portable_options.output_executable_path): + portable_options.environment_type = 'LOOPBACK' + return super().default_environment(options) + + def default_job_server(self, options): + return job_server.StopOnExitJobServer(PrismJobServer(options)) + + def create_job_service_handle(self, job_service, options): + return portable_runner.JobServiceHandle( + job_service, + options, + retain_unknown_options=True) + + +class PrismJobServer(job_server.SubprocessJobServer): + def __init__(self, options): + super().__init__(options) + options = options.view_as(pipeline_options.PrismRunnerOptions) + + def path_to_jar(self): + if self._jar: + if not os.path.exists(self._jar): + url = urllib.parse.urlparse(self._jar) + if not url.scheme: + raise ValueError( + 'Unable to parse jar URL "%s". If using a full URL, make sure ' + 'the scheme is specified. If using a local file path, make sure ' + 'the file exists; you may have to first build the job server ' + 'using `./gradlew runners:flink:%s:job-server:shadowJar`.' % + (self._jar, self._flink_version)) + return self._jar + else: + return self.path_to_beam_jar( + ':runners:flink:%s:job-server:shadowJar' % self._flink_version) + + def subprocess_cmd_and_endpoint(self): + jar_path = self.local_jar(self.path_to_jar()) + artifacts_dir = ( + self._artifacts_dir if self._artifacts_dir else self.local_temp_dir( + prefix='artifacts')) + job_port, = subprocess_server.pick_port(self._job_port) + subprocess_cmd = [self._java_launcher, '-jar'] + self._jvm_properties + [ + jar_path + ] + list( + self.prism_arguments( + job_port, self._artifact_port, self._expansion_port, artifacts_dir)) + return (subprocess_cmd, 'localhost:%s' % job_port) + + def prism_arguments( + self, job_port, artifact_port, expansion_port, artifacts_dir): + return [ + '--artifacts-dir', + artifacts_dir, + '--job-port', + job_port, + '--artifact-port', + artifact_port, + '--expansion-port', + expansion_port + ] From 8a4c08a13ea9c78567c4a1b3dceb152c5db7609b Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 18 Jun 2024 15:40:02 -0700 Subject: [PATCH 02/28] Basic overridden file, unzip success. --- .../apache_beam/options/pipeline_options.py | 5 + .../runners/portability/prism_runner.py | 114 +++++++++++++----- 2 files changed, 88 insertions(+), 31 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index e17f0722040a..ef680782e7d9 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -515,6 +515,7 @@ class StandardOptions(PipelineOptions): 'apache_beam.runners.interactive.interactive_runner.InteractiveRunner', 'apache_beam.runners.portability.flink_runner.FlinkRunner', 'apache_beam.runners.portability.portable_runner.PortableRunner', + 'apache_beam.runners.portability.prism_runner.PrismRunner', 'apache_beam.runners.portability.spark_runner.SparkRunner', 'apache_beam.runners.test.TestDirectRunner', 'apache_beam.runners.test.TestDataflowRunner', @@ -1716,6 +1717,10 @@ def _add_argparse_args(cls, parser): '--prism_binary_location', help='Path or URL to a prism binary, or zipped binary. ' ' Binary must be for the current platform.') + parser.add_argument( + '--prism_beam_version_override', + help='Override the SDK\'s version for deriving the Github Release URLs for downloading a zipped prism binary,' + ' for the current platform.') class TestOptions(PipelineOptions): diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index f6fb7f117ec3..74859c2dbe6b 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -23,15 +23,21 @@ import os import re import urllib +import shutil +import stat +import zipfile +from apache_beam.io.filesystems import FileSystems from apache_beam.options import pipeline_options from apache_beam.runners.portability import job_server from apache_beam.runners.portability import portable_runner from apache_beam.utils import subprocess_server from apache_beam.version import __version__ as beam_version +from urllib.request import urlopen MAGIC_HOST_NAMES = ['[local]', '[auto]'] +GITHUB_DOWNLOAD_PREFIX = 'https://github.com/apache/beam/releases/download/' _LOGGER = logging.getLogger(__name__) @@ -62,48 +68,94 @@ def create_job_service_handle(self, job_service, options): class PrismJobServer(job_server.SubprocessJobServer): - def __init__(self, options): - super().__init__(options) - options = options.view_as(pipeline_options.PrismRunnerOptions) + PRISM_CACHE = os.path.expanduser("~/.apache_beam/cache/prism") + BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin") - def path_to_jar(self): - if self._jar: - if not os.path.exists(self._jar): - url = urllib.parse.urlparse(self._jar) + def __init__(self, options): + super().__init__() + prism_options = options.view_as(pipeline_options.PrismRunnerOptions) + self._path = prism_options.prism_binary_location + self._version = prism_options.prism_beam_version_override if prism_options.prism_beam_version_override else beam_version + + job_options = options.view_as(pipeline_options.JobServerOptions) + self._job_port = job_options.job_port + + # Finds the bin or zip in the local cache, and if not, fetches it. + @classmethod + def local_bin(cls, url, cache_dir=None): + if cache_dir is None: + cache_dir = cls.BIN_CACHE + if os.path.exists(url): + if zipfile.is_zipfile(url): + z = zipfile.ZipFile(url) + url = z.extract(os.path.splitext(os.path.basename(url))[0], path=cache_dir) + + # Make the binary executable. + st = os.stat(url) + os.chmod(url, st.st_mode | stat.S_IEXEC) + return url + else: + #TODO VALIDATE/REWRITE THE REST OF THIS FUNCTION + cached_jar = os.path.join(cache_dir, os.path.basename(url)) + if os.path.exists(cached_jar): + _LOGGER.info('Using cached job server jar from %s' % url) + else: + _LOGGER.info('Downloading job server jar from %s' % url) + if not os.path.exists(cache_dir): + os.makedirs(cache_dir) + # TODO: Clean up this cache according to some policy. + try: + try: + url_read = FileSystems.open(url) + except ValueError: + url_read = urlopen(url) + with open(cached_jar + '.tmp', 'wb') as jar_write: + shutil.copyfileobj(url_read, jar_write, length=1 << 20) + os.rename(cached_jar + '.tmp', cached_jar) + except URLError as e: + raise RuntimeError( + 'Unable to fetch remote job server jar at %s: %s' % (url, e)) + return cached_jar + + def path_to_binary(self): + if self._path: + if not os.path.exists(self._path): + url = urllib.parse.urlparse(self._path) if not url.scheme: raise ValueError( - 'Unable to parse jar URL "%s". If using a full URL, make sure ' - 'the scheme is specified. If using a local file path, make sure ' - 'the file exists; you may have to first build the job server ' - 'using `./gradlew runners:flink:%s:job-server:shadowJar`.' % - (self._jar, self._flink_version)) - return self._jar + 'Unable to parse binary URL "%s". If using a full URL, make sure ' + 'the scheme is specified. If using a local file xpath, make sure ' + 'the file exists; you may have to first build prism ' + 'using `go build `.' % + (self._path)) + return self._path else: - return self.path_to_beam_jar( - ':runners:flink:%s:job-server:shadowJar' % self._flink_version) + if '.dev' in self._version: + raise ValueError( + 'Unable to derive URL for dev versions "%s". Please provide an alternate ' + 'version to derive the release URL' % + (self._version)) + + # TODO Add figuring out what platform we're using + opsys = 'linux' + arch = 'amd64' + + return 'http://github.com/apache/beam/releases/download/%s/apache_beam-%s-prism-%s-%s.zip' % (self._version,self._version, opsys, arch) def subprocess_cmd_and_endpoint(self): - jar_path = self.local_jar(self.path_to_jar()) - artifacts_dir = ( - self._artifacts_dir if self._artifacts_dir else self.local_temp_dir( - prefix='artifacts')) + bin_path = self.local_bin(self.path_to_binary()) job_port, = subprocess_server.pick_port(self._job_port) - subprocess_cmd = [self._java_launcher, '-jar'] + self._jvm_properties + [ - jar_path + subprocess_cmd = [ + bin_path ] + list( - self.prism_arguments( - job_port, self._artifact_port, self._expansion_port, artifacts_dir)) + self.prism_arguments(job_port)) return (subprocess_cmd, 'localhost:%s' % job_port) def prism_arguments( - self, job_port, artifact_port, expansion_port, artifacts_dir): + self, job_port): return [ - '--artifacts-dir', - artifacts_dir, - '--job-port', + '--job_port', job_port, - '--artifact-port', - artifact_port, - '--expansion-port', - expansion_port + '--serve_http', + False, ] From 21cea72ec3c1ee60c5f039d4ff4fb81f7663af1f Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jun 2024 11:55:47 -0700 Subject: [PATCH 03/28] Add platform selection, and override handling. --- .../apache_beam/options/pipeline_options.py | 10 +- .../runners/portability/prism_runner.py | 104 +++++++++++++----- 2 files changed, 82 insertions(+), 32 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index ef680782e7d9..1ba992290b97 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1714,13 +1714,15 @@ class PrismRunnerOptions(PipelineOptions): def _add_argparse_args(cls, parser): # TODO(lostluck): Add additional prism configuration options here as they're added to prism. parser.add_argument( - '--prism_binary_location', - help='Path or URL to a prism binary, or zipped binary. ' - ' Binary must be for the current platform.') + '--prism_location', + help='Path or URL to a prism binary, or zipped binary for the current platform (Operating System and Architecture).' + ' May also be an Apache Beam Github Release page URL, with a matching beam_version_override set.' + ' This option overrides all others for finding a prism binary.') parser.add_argument( '--prism_beam_version_override', help='Override the SDK\'s version for deriving the Github Release URLs for downloading a zipped prism binary,' - ' for the current platform.') + ' for the current platform. If prism_location is set to a Github Release page URL, them it will use that' + ' release page as a base when constructing the download URL.') class TestOptions(PipelineOptions): diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 74859c2dbe6b..c1d0c9c60386 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -21,23 +21,29 @@ import logging import os +import platform import re import urllib import shutil import stat import zipfile +from urllib.request import Request, urlopen +from urllib.error import URLError + from apache_beam.io.filesystems import FileSystems from apache_beam.options import pipeline_options from apache_beam.runners.portability import job_server from apache_beam.runners.portability import portable_runner from apache_beam.utils import subprocess_server from apache_beam.version import __version__ as beam_version -from urllib.request import urlopen MAGIC_HOST_NAMES = ['[local]', '[auto]'] +# Prefix for constructing a download URL GITHUB_DOWNLOAD_PREFIX = 'https://github.com/apache/beam/releases/download/' +# Prefix for constructing a release URL, so we can derive a download URL +GITHUB_TAG_PREFIX = 'https://github.com/apache/beam/releases/tag/' _LOGGER = logging.getLogger(__name__) @@ -74,48 +80,82 @@ class PrismJobServer(job_server.SubprocessJobServer): def __init__(self, options): super().__init__() prism_options = options.view_as(pipeline_options.PrismRunnerOptions) - self._path = prism_options.prism_binary_location - self._version = prism_options.prism_beam_version_override if prism_options.prism_beam_version_override else beam_version + # Options flow: + # If the path is set, always download and unzip the provided path, even if a binary is cached. + self._path = prism_options.prism_location + # Use the given string as the beam version for downloading prism from github. + self._version = prism_options.prism_beam_version_override if prism_options.prism_beam_version_override else 'v' + beam_version job_options = options.view_as(pipeline_options.JobServerOptions) self._job_port = job_options.job_port - # Finds the bin or zip in the local cache, and if not, fetches it. @classmethod - def local_bin(cls, url, cache_dir=None): - if cache_dir is None: - cache_dir = cls.BIN_CACHE - if os.path.exists(url): + def maybe_unzip_and_make_executable(cls, url, cache_dir): if zipfile.is_zipfile(url): z = zipfile.ZipFile(url) url = z.extract(os.path.splitext(os.path.basename(url))[0], path=cache_dir) - # Make the binary executable. + # Make sure the binary is executable. st = os.stat(url) os.chmod(url, st.st_mode | stat.S_IEXEC) return url + + # Finds the bin or zip in the local cache, and if not, fetches it. + @classmethod + def local_bin(cls, url, cache_dir=None, ignore_cache=False): + # ignore_cache sets whether we should always be downloading and unzipping the file or not, to avoid staleness issues. + if cache_dir is None: + cache_dir = cls.BIN_CACHE + if os.path.exists(url): + _LOGGER.info('Using local prism binary from %s' % url) + return cls.maybe_unzip_and_make_executable(url, cache_dir=cache_dir) else: - #TODO VALIDATE/REWRITE THE REST OF THIS FUNCTION - cached_jar = os.path.join(cache_dir, os.path.basename(url)) - if os.path.exists(cached_jar): - _LOGGER.info('Using cached job server jar from %s' % url) + cached_bin = os.path.join(cache_dir, os.path.basename(url)) + if os.path.exists(cached_bin) and not ignore_cache: + _LOGGER.info('Using cached prism binary from %s' % url) else: - _LOGGER.info('Downloading job server jar from %s' % url) + _LOGGER.info('Downloading prism binary from %s' % url) if not os.path.exists(cache_dir): os.makedirs(cache_dir) - # TODO: Clean up this cache according to some policy. try: try: url_read = FileSystems.open(url) except ValueError: url_read = urlopen(url) - with open(cached_jar + '.tmp', 'wb') as jar_write: - shutil.copyfileobj(url_read, jar_write, length=1 << 20) - os.rename(cached_jar + '.tmp', cached_jar) + with open(cached_bin + '.tmp', 'wb') as zip_write: + shutil.copyfileobj(url_read, zip_write, length=1 << 20) + os.rename(cached_bin + '.tmp', cached_bin) except URLError as e: raise RuntimeError( - 'Unable to fetch remote job server jar at %s: %s' % (url, e)) - return cached_jar + 'Unable to fetch remote prism binary at %s: %s' % (url, e)) + return cls.maybe_unzip_and_make_executable(cached_bin, cache_dir=cache_dir) + + def construct_download_url(self, root_tag, sys, mach): + # Construct the prism download URL with the appropriate release tag. + # This maps operating systems and machine architectures to the compatible + # and canonical names used by the Go build targets. + + # platform.system() provides compatible listings, so we need to filter out + # the unsupported versions. + opsys = sys.lower() + if opsys not in ['linux','windows','darwin']: + raise ValueError( + 'Operating System "%s" unsupported for constructing a Prism release binary URL.' % (opsys) + ) + + # platform.machine() will vary by system, but many names are compatible with each other. + arch = mach.lower() + if arch in ['amd64','x86_64', 'x86-64','x64']: + arch = 'amd64' + if arch in ['arm64','aarch64_be','aarch64','armv8b','armv8l']: + arch = 'arm64' + + if arch not in ['amd64','arm64']: + raise ValueError( + 'Machine archictecture "%s" unsupported for constructing a Prism release binary URL.' % (opsys) + ) + return 'https://github.com/apache/beam/releases/download/%s/apache_beam-%s-prism-%s-%s.zip' % (root_tag, self._version, opsys, arch) + def path_to_binary(self): if self._path: @@ -128,22 +168,30 @@ def path_to_binary(self): 'the file exists; you may have to first build prism ' 'using `go build `.' % (self._path)) + + # We have a URL, see if we need to construct a valid file name. + if self._path.startswith(GITHUB_DOWNLOAD_PREFIX): + # If this URL starts with the download prefix, let it through. + return self._path + # The only other valid option is a github release page. + if not self._path.startswith(GITHUB_TAG_PREFIX): + raise ValueError( + 'Provided --prism_location URL is not an Apache Beam Github Release page URL or download URL: %s' % + (self._path)) + # Get the root tag for this URL + root_tag = os.path.basename(os.path.normpath(self._path)) + return self.construct_download_url(root_tag, platform.system(), platform.machine()) return self._path else: if '.dev' in self._version: raise ValueError( 'Unable to derive URL for dev versions "%s". Please provide an alternate ' - 'version to derive the release URL' % + 'version to derive the release URL with the --prism_beam_version_override flag.' % (self._version)) - - # TODO Add figuring out what platform we're using - opsys = 'linux' - arch = 'amd64' - - return 'http://github.com/apache/beam/releases/download/%s/apache_beam-%s-prism-%s-%s.zip' % (self._version,self._version, opsys, arch) + return self.construct_download_url(self._version, platform.system(), platform.machine()) def subprocess_cmd_and_endpoint(self): - bin_path = self.local_bin(self.path_to_binary()) + bin_path = self.local_bin(self.path_to_binary(), ignore_cache=(self._path != None)) job_port, = subprocess_server.pick_port(self._job_port) subprocess_cmd = [ bin_path From 2d8a8e3daa24ea9e9d8cd8971198176ce67461f3 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jun 2024 13:51:39 -0700 Subject: [PATCH 04/28] formatting --- .../runners/portability/prism_runner.py | 86 +++++++++---------- 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index c1d0c9c60386..792955da2f0c 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -38,8 +38,6 @@ from apache_beam.utils import subprocess_server from apache_beam.version import __version__ as beam_version - -MAGIC_HOST_NAMES = ['[local]', '[auto]'] # Prefix for constructing a download URL GITHUB_DOWNLOAD_PREFIX = 'https://github.com/apache/beam/releases/download/' # Prefix for constructing a release URL, so we can derive a download URL @@ -64,13 +62,11 @@ def default_environment(self, options): return super().default_environment(options) def default_job_server(self, options): - return job_server.StopOnExitJobServer(PrismJobServer(options)) + return job_server.StopOnExitJobServer(PrismJobServer(options)) def create_job_service_handle(self, job_service, options): return portable_runner.JobServiceHandle( - job_service, - options, - retain_unknown_options=True) + job_service, options, retain_unknown_options=True) class PrismJobServer(job_server.SubprocessJobServer): @@ -91,14 +87,15 @@ def __init__(self, options): @classmethod def maybe_unzip_and_make_executable(cls, url, cache_dir): - if zipfile.is_zipfile(url): - z = zipfile.ZipFile(url) - url = z.extract(os.path.splitext(os.path.basename(url))[0], path=cache_dir) - - # Make sure the binary is executable. - st = os.stat(url) - os.chmod(url, st.st_mode | stat.S_IEXEC) - return url + if zipfile.is_zipfile(url): + z = zipfile.ZipFile(url) + url = z.extract( + os.path.splitext(os.path.basename(url))[0], path=cache_dir) + + # Make sure the binary is executable. + st = os.stat(url) + os.chmod(url, st.st_mode | stat.S_IEXEC) + return url # Finds the bin or zip in the local cache, and if not, fetches it. @classmethod @@ -128,8 +125,9 @@ def local_bin(cls, url, cache_dir=None, ignore_cache=False): except URLError as e: raise RuntimeError( 'Unable to fetch remote prism binary at %s: %s' % (url, e)) - return cls.maybe_unzip_and_make_executable(cached_bin, cache_dir=cache_dir) - + return cls.maybe_unzip_and_make_executable( + cached_bin, cache_dir=cache_dir) + def construct_download_url(self, root_tag, sys, mach): # Construct the prism download URL with the appropriate release tag. # This maps operating systems and machine architectures to the compatible @@ -138,24 +136,24 @@ def construct_download_url(self, root_tag, sys, mach): # platform.system() provides compatible listings, so we need to filter out # the unsupported versions. opsys = sys.lower() - if opsys not in ['linux','windows','darwin']: + if opsys not in ['linux', 'windows', 'darwin']: raise ValueError( - 'Operating System "%s" unsupported for constructing a Prism release binary URL.' % (opsys) - ) - + 'Operating System "%s" unsupported for constructing a Prism release binary URL.' + % (opsys)) + # platform.machine() will vary by system, but many names are compatible with each other. arch = mach.lower() - if arch in ['amd64','x86_64', 'x86-64','x64']: + if arch in ['amd64', 'x86_64', 'x86-64', 'x64']: arch = 'amd64' - if arch in ['arm64','aarch64_be','aarch64','armv8b','armv8l']: + if arch in ['arm64', 'aarch64_be', 'aarch64', 'armv8b', 'armv8l']: arch = 'arm64' - if arch not in ['amd64','arm64']: + if arch not in ['amd64', 'arm64']: raise ValueError( - 'Machine archictecture "%s" unsupported for constructing a Prism release binary URL.' % (opsys) - ) - return 'https://github.com/apache/beam/releases/download/%s/apache_beam-%s-prism-%s-%s.zip' % (root_tag, self._version, opsys, arch) - + 'Machine archictecture "%s" unsupported for constructing a Prism release binary URL.' + % (opsys)) + return 'https://github.com/apache/beam/releases/download/%s/apache_beam-%s-prism-%s-%s.zip' % ( + root_tag, self._version, opsys, arch) def path_to_binary(self): if self._path: @@ -166,41 +164,39 @@ def path_to_binary(self): 'Unable to parse binary URL "%s". If using a full URL, make sure ' 'the scheme is specified. If using a local file xpath, make sure ' 'the file exists; you may have to first build prism ' - 'using `go build `.' % - (self._path)) - + 'using `go build `.' % (self._path)) + # We have a URL, see if we need to construct a valid file name. if self._path.startswith(GITHUB_DOWNLOAD_PREFIX): # If this URL starts with the download prefix, let it through. - return self._path + return self._path # The only other valid option is a github release page. if not self._path.startswith(GITHUB_TAG_PREFIX): raise ValueError( - 'Provided --prism_location URL is not an Apache Beam Github Release page URL or download URL: %s' % - (self._path)) + 'Provided --prism_location URL is not an Apache Beam Github Release page URL or download URL: %s' + % (self._path)) # Get the root tag for this URL root_tag = os.path.basename(os.path.normpath(self._path)) - return self.construct_download_url(root_tag, platform.system(), platform.machine()) + return self.construct_download_url( + root_tag, platform.system(), platform.machine()) return self._path else: if '.dev' in self._version: raise ValueError( 'Unable to derive URL for dev versions "%s". Please provide an alternate ' - 'version to derive the release URL with the --prism_beam_version_override flag.' % - (self._version)) - return self.construct_download_url(self._version, platform.system(), platform.machine()) + 'version to derive the release URL with the --prism_beam_version_override flag.' + % (self._version)) + return self.construct_download_url( + self._version, platform.system(), platform.machine()) def subprocess_cmd_and_endpoint(self): - bin_path = self.local_bin(self.path_to_binary(), ignore_cache=(self._path != None)) + bin_path = self.local_bin( + self.path_to_binary(), ignore_cache=(self._path != None)) job_port, = subprocess_server.pick_port(self._job_port) - subprocess_cmd = [ - bin_path - ] + list( - self.prism_arguments(job_port)) + subprocess_cmd = [bin_path] + list(self.prism_arguments(job_port)) return (subprocess_cmd, 'localhost:%s' % job_port) - - def prism_arguments( - self, job_port): + + def prism_arguments(self, job_port): return [ '--job_port', job_port, From 4dfb53a384d8034ab212aade771d97516dda2e73 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jun 2024 14:41:34 -0700 Subject: [PATCH 05/28] formatting --- sdks/python/apache_beam/options/pipeline_options.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 1ba992290b97..4a7507b98c34 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1709,18 +1709,19 @@ def _add_argparse_args(cls, parser): class PrismRunnerOptions(PipelineOptions): - @classmethod def _add_argparse_args(cls, parser): # TODO(lostluck): Add additional prism configuration options here as they're added to prism. parser.add_argument( '--prism_location', - help='Path or URL to a prism binary, or zipped binary for the current platform (Operating System and Architecture).' + help= + 'Path or URL to a prism binary, or zipped binary for the current platform (Operating System and Architecture).' ' May also be an Apache Beam Github Release page URL, with a matching beam_version_override set.' ' This option overrides all others for finding a prism binary.') parser.add_argument( '--prism_beam_version_override', - help='Override the SDK\'s version for deriving the Github Release URLs for downloading a zipped prism binary,' + help= + 'Override the SDK\'s version for deriving the Github Release URLs for downloading a zipped prism binary,' ' for the current platform. If prism_location is set to a Github Release page URL, them it will use that' ' release page as a base when constructing the download URL.') From ac3ad8b0cc6978b79c9120d60fe6c45ba505436a Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jun 2024 14:45:24 -0700 Subject: [PATCH 06/28] line len --- .../apache_beam/options/pipeline_options.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 4a7507b98c34..a91e68eccea2 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1711,19 +1711,20 @@ def _add_argparse_args(cls, parser): class PrismRunnerOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): - # TODO(lostluck): Add additional prism configuration options here as they're added to prism. parser.add_argument( '--prism_location', help= - 'Path or URL to a prism binary, or zipped binary for the current platform (Operating System and Architecture).' - ' May also be an Apache Beam Github Release page URL, with a matching beam_version_override set.' - ' This option overrides all others for finding a prism binary.') + 'Path or URL to a prism binary, or zipped binary for the current ' + 'platform (Operating System and Architecture). May also be an Apache ' + 'Beam Github Release page URL, with a matching beam_version_override ' + 'set. This option overrides all others for finding a prism binary.') parser.add_argument( '--prism_beam_version_override', help= - 'Override the SDK\'s version for deriving the Github Release URLs for downloading a zipped prism binary,' - ' for the current platform. If prism_location is set to a Github Release page URL, them it will use that' - ' release page as a base when constructing the download URL.') + 'Override the SDK\'s version for deriving the Github Release URLs for ' + 'downloading a zipped prism binary, for the current platform. If ' + 'prism_location is set to a Github Release page URL, them it will use ' + 'that release page as a base when constructing the download URL.') class TestOptions(PipelineOptions): From 40e09455b1be4a2dba8133ddfc59f03b32235f9e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jun 2024 14:52:35 -0700 Subject: [PATCH 07/28] delint --- .../runners/portability/prism_runner.py | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 792955da2f0c..32408336a7b3 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -22,13 +22,12 @@ import logging import os import platform -import re import urllib import shutil import stat import zipfile -from urllib.request import Request, urlopen +from urllib.request import urlopen from urllib.error import URLError from apache_beam.io.filesystems import FileSystems @@ -47,15 +46,14 @@ class PrismRunner(portable_runner.PortableRunner): - """A runner for launching jobs on Prism, automatically downloading and starting a - Prism instance if needed. + """A runner for launching jobs on Prism, automatically downloading and + starting a Prism instance if needed. """ # Inherits run_portable_pipeline from PortableRunner. def default_environment(self, options): portable_options = options.view_as(pipeline_options.PortableOptions) - prism_options = options.view_as(pipeline_options.PrismRunnerOptions) if (not portable_options.environment_type and not portable_options.output_executable_path): portable_options.environment_type = 'LOOPBACK' @@ -77,10 +75,14 @@ def __init__(self, options): super().__init__() prism_options = options.view_as(pipeline_options.PrismRunnerOptions) # Options flow: - # If the path is set, always download and unzip the provided path, even if a binary is cached. + # If the path is set, always download and unzip the provided path, + # even if a binary is cached. self._path = prism_options.prism_location - # Use the given string as the beam version for downloading prism from github. - self._version = prism_options.prism_beam_version_override if prism_options.prism_beam_version_override else 'v' + beam_version + # Which version to use when constructing the prism download url. + if prism_options.prism_beam_version_override: + self._version = prism_options.prism_beam_version_override + else: + self._version = 'v' + beam_version job_options = options.view_as(pipeline_options.JobServerOptions) self._job_port = job_options.job_port @@ -100,7 +102,8 @@ def maybe_unzip_and_make_executable(cls, url, cache_dir): # Finds the bin or zip in the local cache, and if not, fetches it. @classmethod def local_bin(cls, url, cache_dir=None, ignore_cache=False): - # ignore_cache sets whether we should always be downloading and unzipping the file or not, to avoid staleness issues. + # ignore_cache sets whether we should always be downloading and unzipping + # the file or not, to avoid staleness issues. if cache_dir is None: cache_dir = cls.BIN_CACHE if os.path.exists(url): @@ -138,10 +141,11 @@ def construct_download_url(self, root_tag, sys, mach): opsys = sys.lower() if opsys not in ['linux', 'windows', 'darwin']: raise ValueError( - 'Operating System "%s" unsupported for constructing a Prism release binary URL.' + 'Operating System "%s" unsupported for constructing a Prism release ' + 'binary URL.' % (opsys)) - # platform.machine() will vary by system, but many names are compatible with each other. + # platform.machine() will vary by system, but many names are compatible. arch = mach.lower() if arch in ['amd64', 'x86_64', 'x86-64', 'x64']: arch = 'amd64' @@ -150,10 +154,12 @@ def construct_download_url(self, root_tag, sys, mach): if arch not in ['amd64', 'arm64']: raise ValueError( - 'Machine archictecture "%s" unsupported for constructing a Prism release binary URL.' + 'Machine archictecture "%s" unsupported for constructing a Prism ' + 'release binary URL.' % (opsys)) - return 'https://github.com/apache/beam/releases/download/%s/apache_beam-%s-prism-%s-%s.zip' % ( - root_tag, self._version, opsys, arch) + return ('https://github.com/apache/beam/releases/download/' + '%s/apache_beam-%s-prism-%s-%s.zip' % ( + root_tag, self._version, opsys, arch)) def path_to_binary(self): if self._path: @@ -161,9 +167,9 @@ def path_to_binary(self): url = urllib.parse.urlparse(self._path) if not url.scheme: raise ValueError( - 'Unable to parse binary URL "%s". If using a full URL, make sure ' - 'the scheme is specified. If using a local file xpath, make sure ' - 'the file exists; you may have to first build prism ' + 'Unable to parse binary URL "%s". If using a full URL, make ' + 'sure the scheme is specified. If using a local file xpath, ' + 'make sure the file exists; you may have to first build prism ' 'using `go build `.' % (self._path)) # We have a URL, see if we need to construct a valid file name. @@ -173,7 +179,8 @@ def path_to_binary(self): # The only other valid option is a github release page. if not self._path.startswith(GITHUB_TAG_PREFIX): raise ValueError( - 'Provided --prism_location URL is not an Apache Beam Github Release page URL or download URL: %s' + 'Provided --prism_location URL is not an Apache Beam Github ' + 'Release page URL or download URL: %s' % (self._path)) # Get the root tag for this URL root_tag = os.path.basename(os.path.normpath(self._path)) @@ -183,15 +190,16 @@ def path_to_binary(self): else: if '.dev' in self._version: raise ValueError( - 'Unable to derive URL for dev versions "%s". Please provide an alternate ' - 'version to derive the release URL with the --prism_beam_version_override flag.' + 'Unable to derive URL for dev versions "%s". Please provide an ' + 'alternate version to derive the release URL with the ' + '--prism_beam_version_override flag.' % (self._version)) return self.construct_download_url( self._version, platform.system(), platform.machine()) def subprocess_cmd_and_endpoint(self): bin_path = self.local_bin( - self.path_to_binary(), ignore_cache=(self._path != None)) + self.path_to_binary(), ignore_cache=(self._path is not None)) job_port, = subprocess_server.pick_port(self._job_port) subprocess_cmd = [bin_path] + list(self.prism_arguments(job_port)) return (subprocess_cmd, 'localhost:%s' % job_port) From a7d08152fba2ac1849a4aa3d46f02160908b6d9c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jun 2024 15:07:05 -0700 Subject: [PATCH 08/28] more formatting opinions --- .../apache_beam/options/pipeline_options.py | 3 +-- .../runners/portability/prism_runner.py | 21 ++++++++----------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index a91e68eccea2..6b1dd8bb48c0 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1713,8 +1713,7 @@ class PrismRunnerOptions(PipelineOptions): def _add_argparse_args(cls, parser): parser.add_argument( '--prism_location', - help= - 'Path or URL to a prism binary, or zipped binary for the current ' + help='Path or URL to a prism binary, or zipped binary for the current ' 'platform (Operating System and Architecture). May also be an Apache ' 'Beam Github Release page URL, with a matching beam_version_override ' 'set. This option overrides all others for finding a prism binary.') diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 32408336a7b3..2af227791196 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -75,7 +75,7 @@ def __init__(self, options): super().__init__() prism_options = options.view_as(pipeline_options.PrismRunnerOptions) # Options flow: - # If the path is set, always download and unzip the provided path, + # If the path is set, always download and unzip the provided path, # even if a binary is cached. self._path = prism_options.prism_location # Which version to use when constructing the prism download url. @@ -142,8 +142,7 @@ def construct_download_url(self, root_tag, sys, mach): if opsys not in ['linux', 'windows', 'darwin']: raise ValueError( 'Operating System "%s" unsupported for constructing a Prism release ' - 'binary URL.' - % (opsys)) + 'binary URL.' % (opsys)) # platform.machine() will vary by system, but many names are compatible. arch = mach.lower() @@ -155,11 +154,11 @@ def construct_download_url(self, root_tag, sys, mach): if arch not in ['amd64', 'arm64']: raise ValueError( 'Machine archictecture "%s" unsupported for constructing a Prism ' - 'release binary URL.' - % (opsys)) - return ('https://github.com/apache/beam/releases/download/' - '%s/apache_beam-%s-prism-%s-%s.zip' % ( - root_tag, self._version, opsys, arch)) + 'release binary URL.' % (opsys)) + return ( + 'https://github.com/apache/beam/releases/download/' + '%s/apache_beam-%s-prism-%s-%s.zip' % + (root_tag, self._version, opsys, arch)) def path_to_binary(self): if self._path: @@ -180,8 +179,7 @@ def path_to_binary(self): if not self._path.startswith(GITHUB_TAG_PREFIX): raise ValueError( 'Provided --prism_location URL is not an Apache Beam Github ' - 'Release page URL or download URL: %s' - % (self._path)) + 'Release page URL or download URL: %s' % (self._path)) # Get the root tag for this URL root_tag = os.path.basename(os.path.normpath(self._path)) return self.construct_download_url( @@ -192,8 +190,7 @@ def path_to_binary(self): raise ValueError( 'Unable to derive URL for dev versions "%s". Please provide an ' 'alternate version to derive the release URL with the ' - '--prism_beam_version_override flag.' - % (self._version)) + '--prism_beam_version_override flag.' % (self._version)) return self.construct_download_url( self._version, platform.system(), platform.machine()) From a1eba51bba9980ee89a7c7d28f669cd9ec9fb7e0 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jun 2024 15:28:04 -0700 Subject: [PATCH 09/28] plus indent --- sdks/python/apache_beam/runners/portability/prism_runner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 2af227791196..a6484c540dd8 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -156,9 +156,9 @@ def construct_download_url(self, root_tag, sys, mach): 'Machine archictecture "%s" unsupported for constructing a Prism ' 'release binary URL.' % (opsys)) return ( - 'https://github.com/apache/beam/releases/download/' - '%s/apache_beam-%s-prism-%s-%s.zip' % - (root_tag, self._version, opsys, arch)) + 'https://github.com/apache/beam/releases/download/' + '%s/apache_beam-%s-prism-%s-%s.zip' % + (root_tag, self._version, opsys, arch)) def path_to_binary(self): if self._path: From 54022fd3c0cc45e54869323966107b76ece93dc8 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jun 2024 15:28:40 -0700 Subject: [PATCH 10/28] rm trailing ws --- sdks/python/apache_beam/runners/portability/prism_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index a6484c540dd8..2626897a1b2e 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -157,7 +157,7 @@ def construct_download_url(self, root_tag, sys, mach): 'release binary URL.' % (opsys)) return ( 'https://github.com/apache/beam/releases/download/' - '%s/apache_beam-%s-prism-%s-%s.zip' % + '%s/apache_beam-%s-prism-%s-%s.zip' % (root_tag, self._version, opsys, arch)) def path_to_binary(self): From 22c05288437e1a328384d2b3221d821f791700f2 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 25 Jun 2024 14:55:21 -0700 Subject: [PATCH 11/28] Lint and types and comments. --- .../runners/portability/prism_runner.py | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 2626897a1b2e..4783151db5b2 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -27,6 +27,11 @@ import stat import zipfile +# this will make using the list parameterized generic happy +# on python 3.8 so we aren't revisiting this code after we +# sunset it +from __future__ import annotations + from urllib.request import urlopen from urllib.error import URLError @@ -50,8 +55,6 @@ class PrismRunner(portable_runner.PortableRunner): starting a Prism instance if needed. """ - # Inherits run_portable_pipeline from PortableRunner. - def default_environment(self, options): portable_options = options.view_as(pipeline_options.PortableOptions) if (not portable_options.environment_type and @@ -88,7 +91,7 @@ def __init__(self, options): self._job_port = job_options.job_port @classmethod - def maybe_unzip_and_make_executable(cls, url, cache_dir): + def maybe_unzip_and_make_executable(cls, url, cache_dir) -> (str): if zipfile.is_zipfile(url): z = zipfile.ZipFile(url) url = z.extract( @@ -101,7 +104,7 @@ def maybe_unzip_and_make_executable(cls, url, cache_dir): # Finds the bin or zip in the local cache, and if not, fetches it. @classmethod - def local_bin(cls, url, cache_dir=None, ignore_cache=False): + def local_bin(cls, url, cache_dir=None, ignore_cache=False) -> (str): # ignore_cache sets whether we should always be downloading and unzipping # the file or not, to avoid staleness issues. if cache_dir is None: @@ -131,13 +134,13 @@ def local_bin(cls, url, cache_dir=None, ignore_cache=False): return cls.maybe_unzip_and_make_executable( cached_bin, cache_dir=cache_dir) - def construct_download_url(self, root_tag, sys, mach): - # Construct the prism download URL with the appropriate release tag. - # This maps operating systems and machine architectures to the compatible - # and canonical names used by the Go build targets. + def construct_download_url(self, root_tag, sys, mach) -> (str): + """Construct the prism download URL with the appropriate release tag. + This maps operating systems and machine architectures to the compatible + and canonical names used by the Go build targets. - # platform.system() provides compatible listings, so we need to filter out - # the unsupported versions. + platform.system() provides compatible listings, so we need to filter out + the unsupported versions.""" opsys = sys.lower() if opsys not in ['linux', 'windows', 'darwin']: raise ValueError( @@ -156,11 +159,10 @@ def construct_download_url(self, root_tag, sys, mach): 'Machine archictecture "%s" unsupported for constructing a Prism ' 'release binary URL.' % (opsys)) return ( - 'https://github.com/apache/beam/releases/download/' - '%s/apache_beam-%s-prism-%s-%s.zip' % - (root_tag, self._version, opsys, arch)) + f"https://github.com/apache/beam/releases/download/{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip" + ) - def path_to_binary(self): + def path_to_binary(self) -> str: if self._path: if not os.path.exists(self._path): url = urllib.parse.urlparse(self._path) @@ -194,14 +196,14 @@ def path_to_binary(self): return self.construct_download_url( self._version, platform.system(), platform.machine()) - def subprocess_cmd_and_endpoint(self): + def subprocess_cmd_and_endpoint(self) -> tuple[list[Any], str]: bin_path = self.local_bin( self.path_to_binary(), ignore_cache=(self._path is not None)) job_port, = subprocess_server.pick_port(self._job_port) - subprocess_cmd = [bin_path] + list(self.prism_arguments(job_port)) - return (subprocess_cmd, 'localhost:%s' % job_port) + subprocess_cmd = [bin_path] + self.prism_arguments(job_port) + return (subprocess_cmd, f"localhost:{job_port}") - def prism_arguments(self, job_port): + def prism_arguments(self, job_port) -> list[Any]: return [ '--job_port', job_port, From 98da84205b2a0983f097eaaed964f36451143d4f Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 25 Jun 2024 14:57:50 -0700 Subject: [PATCH 12/28] import order --- sdks/python/apache_beam/runners/portability/prism_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 4783151db5b2..844c18de77d7 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -22,9 +22,9 @@ import logging import os import platform -import urllib import shutil import stat +import urllib import zipfile # this will make using the list parameterized generic happy @@ -32,8 +32,8 @@ # sunset it from __future__ import annotations -from urllib.request import urlopen from urllib.error import URLError +from urllib.request import urlopen from apache_beam.io.filesystems import FileSystems from apache_beam.options import pipeline_options From f36d03c5fd44097e70f4e77521b317b922796fbd Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 25 Jun 2024 14:59:26 -0700 Subject: [PATCH 13/28] future at top --- .../apache_beam/runners/portability/prism_runner.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 844c18de77d7..cef19af69349 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -17,6 +17,11 @@ """A runner for executing portable pipelines on Apache Beam Prism.""" +# this will make using the list parameterized generic happy +# on python 3.8 so we aren't revisiting this code after we +# sunset it +from __future__ import annotations + # pytype: skip-file import logging @@ -27,10 +32,6 @@ import urllib import zipfile -# this will make using the list parameterized generic happy -# on python 3.8 so we aren't revisiting this code after we -# sunset it -from __future__ import annotations from urllib.error import URLError from urllib.request import urlopen From 51654c21883843e25a4795f1155db5613cfe24e1 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 25 Jun 2024 15:00:02 -0700 Subject: [PATCH 14/28] Initial test pass (all skipped, expected to fail) --- .../runners/portability/prism_runner_test.py | 300 ++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100644 sdks/python/apache_beam/runners/portability/prism_runner_test.py diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py new file mode 100644 index 000000000000..f7190fb3c921 --- /dev/null +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -0,0 +1,300 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file + +import argparse +import logging +import shlex +import typing +import unittest +from os import linesep +from os import path +from os.path import exists +from shutil import rmtree +from tempfile import mkdtemp + +import pytest + +import apache_beam as beam +from apache_beam import Impulse +from apache_beam import Map +from apache_beam.io.external.generate_sequence import GenerateSequence +from apache_beam.io.kafka import ReadFromKafka +from apache_beam.io.kafka import WriteToKafka +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import PrismRunnerOptions +from apache_beam.options.pipeline_options import PortableOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.portability import job_server +from apache_beam.runners.portability import portable_runner +from apache_beam.runners.portability import portable_runner_test +from apache_beam.runners.portability import prism_runner +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms.sql import SqlTransform + +# Run as +# +# pytest prism_runner_test.py[::TestClass::test_case] \ +# --test-pipeline-options="--environment_type=LOOPBACK" + +_LOGGER = logging.getLogger(__name__) + +Row = typing.NamedTuple("Row", [("col1", int), ("col2", str)]) +beam.coders.registry.register_coder(Row, beam.coders.RowCoder) + + +class PrismRunnerTest(portable_runner_test.PortableRunnerTest): + _use_grpc = True + _use_subprocesses = True + + conf_dir = None + expansion_port = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.environment_type = None + self.environment_config = None + self.enable_commit = False + + def setUp(self): + self.enable_commit = False + + @pytest.fixture(autouse=True) + def parse_options(self, request): + if not request.config.option.test_pipeline_options: + raise unittest.SkipTest( + 'Skipping because --test-pipeline-options is not specified.') + test_pipeline_options = request.config.option.test_pipeline_options + parser = argparse.ArgumentParser(add_help=True) + parser.add_argument( + '--prism_bin', help='Prism binary to submit jobs.', action='store') + parser.add_argument( + '--environment_type', + default='LOOPBACK', + choices=['DOCKER', 'PROCESS', 'LOOPBACK'], + help='Set the environment type for running user code. DOCKER runs ' + 'user code in a container. PROCESS runs user code in ' + 'automatically started processes. LOOPBACK runs user code on ' + 'the same process that originally submitted the job.') + parser.add_argument( + '--environment_option', + '--environment_options', + dest='environment_options', + action='append', + default=None, + help=( + 'Environment configuration for running the user code. ' + 'Recognized options depend on --environment_type.\n ' + 'For DOCKER: docker_container_image (optional)\n ' + 'For PROCESS: process_command (required), process_variables ' + '(optional, comma-separated)\n ' + 'For EXTERNAL: external_service_address (required)')) + known_args, unknown_args = parser.parse_known_args( + shlex.split(test_pipeline_options)) + if unknown_args: + _LOGGER.warning('Discarding unrecognized arguments %s' % unknown_args) + self.set_prism_bin(known_args.prism_bin) + self.environment_type = known_args.environment_type + self.environment_options = known_args.environment_options + + @classmethod + def tearDownClass(cls): + if cls.conf_dir and exists(cls.conf_dir): + _LOGGER.info("removing conf dir: %s" % cls.conf_dir) + rmtree(cls.conf_dir) + super().tearDownClass() + + @classmethod + def _create_conf_dir(cls): + """Create (and save a static reference to) a "conf dir", used to provide + metrics configs and verify metrics output + + It gets cleaned up when the suite is done executing""" + + if hasattr(cls, 'conf_dir'): + cls.conf_dir = mkdtemp(prefix='prismtest-conf') + + # path for a FileReporter to write metrics to + cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt') + + # path to write Prism configuration to + conf_path = path.join(cls.conf_dir, 'prism-conf.yaml') + file_reporter = 'org.apache.beam.runners.prism.metrics.FileReporter' + with open(conf_path, 'w') as f: + f.write( + linesep.join([ + 'metrics.reporters: file', + 'metrics.reporter.file.class: %s' % file_reporter, + 'metrics.reporter.file.path: %s' % cls.test_metrics_path, + 'metrics.scope.operator: ', + ])) + + @classmethod + def _subprocess_command(cls, job_port, expansion_port): + # will be cleaned up at the end of this method, and recreated and used by + # the job server + tmp_dir = mkdtemp(prefix='prismtest') + + cls._create_conf_dir() + cls.expansion_port = expansion_port + + try: + return [ + cls.prism_bin, + '--job_port', + str(job_port), + ] + finally: + rmtree(tmp_dir) + + @classmethod + def get_expansion_service(cls): + # TODO Move expansion address resides into PipelineOptions + return 'localhost:%s' % cls.expansion_port + + @classmethod + def set_prism_bin(cls, prism_bin): + cls.prism_bin = prism_bin + + def create_options(self): + options = super().create_options() + options.view_as(DebugOptions).experiments = ['beam_fn_api'] + options.view_as(DebugOptions).experiments = [ + 'pre_optimize=default' + ] + options.view_as(DebugOptions).experiments + options.view_as(PortableOptions).environment_type = self.environment_type + options.view_as( + PortableOptions).environment_options = self.environment_options + + return options + + # Can't read host files from within docker, read a "local" file there. + def test_read(self): + print('name:', __name__) + with self.create_pipeline() as p: + lines = p | beam.io.ReadFromText('/etc/profile') + assert_that(lines, lambda lines: len(lines) > 0) + +# def test_no_subtransform_composite(self): +# raise unittest.SkipTest("BEAM-4781") + + def test_external_transform(self): + with self.create_pipeline() as p: + res = ( + p + | GenerateSequence( + start=1, stop=10, expansion_service=self.get_expansion_service())) + + assert_that(res, equal_to([i for i in range(1, 10)])) + + def test_expand_kafka_read(self): + # We expect to fail here because we do not have a Kafka cluster handy. + # Nevertheless, we check that the transform is expanded by the + # ExpansionService and that the pipeline fails during execution. + with self.assertRaises(Exception) as ctx: + self.enable_commit = True + with self.create_pipeline() as p: + # pylint: disable=expression-not-assigned + ( + p + | ReadFromKafka( + consumer_config={ + 'bootstrap.servers': 'notvalid1:7777, notvalid2:3531', + 'group.id': 'any_group' + }, + topics=['topic1', 'topic2'], + key_deserializer='org.apache.kafka.' + 'common.serialization.' + 'ByteArrayDeserializer', + value_deserializer='org.apache.kafka.' + 'common.serialization.' + 'LongDeserializer', + commit_offset_in_finalize=True, + timestamp_policy=ReadFromKafka.create_time_policy, + expansion_service=self.get_expansion_service())) + self.assertTrue( + 'No resolvable bootstrap urls given in bootstrap.servers' + in str(ctx.exception), + 'Expected to fail due to invalid bootstrap.servers, but ' + 'failed due to:\n%s' % str(ctx.exception)) + + def test_expand_kafka_write(self): + # We just test the expansion but do not execute. + # pylint: disable=expression-not-assigned + ( + self.create_pipeline() + | Impulse() + | Map(lambda input: (1, input)) + | WriteToKafka( + producer_config={ + 'bootstrap.servers': 'localhost:9092, notvalid2:3531' + }, + topic='topic1', + key_serializer='org.apache.kafka.' + 'common.serialization.' + 'LongSerializer', + value_serializer='org.apache.kafka.' + 'common.serialization.' + 'ByteArraySerializer', + expansion_service=self.get_expansion_service())) + + def test_sql(self): + with self.create_pipeline() as p: + output = ( + p + | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)]) + | 'Sql' >> SqlTransform( + """SELECT col1, col2 || '*' || col2 as col2, + power(col1, 2) as col3 + FROM PCOLLECTION + """, + expansion_service=self.get_expansion_service())) + assert_that( + output, + equal_to([(x, '{x}*{x}'.format(x=x), x * x) for x in range(5)])) + + +# def test_flattened_side_input(self): +# # Blocked on support for transcoding +# # https://jira.apache.org/jira/browse/BEAM-6523 +# super().test_flattened_side_input(with_transcoding=False) + +# def test_metrics(self): +# super().test_metrics(check_gauge=False) + +# def test_sdf_with_watermark_tracking(self): +# raise unittest.SkipTest("BEAM-2939") + +# def test_callbacks_with_exception(self): +# raise unittest.SkipTest("https://github.com/apache/beam/issues/19526") + +# def test_register_finalizations(self): +# raise unittest.SkipTest("https://github.com/apache/beam/issues/19526") + +# def test_custom_merging_window(self): +# raise unittest.SkipTest("https://github.com/apache/beam/issues/20641") + +# def test_custom_window_type(self): +# raise unittest.SkipTest("https://github.com/apache/beam/issues/20641") + +# Inherits all other tests. + +if __name__ == '__main__': + # Run the tests. + logging.getLogger().setLevel(logging.INFO) + unittest.main() From 66c779f7996106c70a691a625fc0a8fff0a90c8c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 25 Jun 2024 15:14:06 -0700 Subject: [PATCH 15/28] one more format lint --- sdks/python/apache_beam/runners/portability/prism_runner.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index cef19af69349..bf0239fd0873 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -32,7 +32,6 @@ import urllib import zipfile - from urllib.error import URLError from urllib.request import urlopen @@ -55,7 +54,6 @@ class PrismRunner(portable_runner.PortableRunner): """A runner for launching jobs on Prism, automatically downloading and starting a Prism instance if needed. """ - def default_environment(self, options): portable_options = options.view_as(pipeline_options.PortableOptions) if (not portable_options.environment_type and From d02dc753c88e0e5e7713fd4f36c4a05a3a501173 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 08:27:26 -0700 Subject: [PATCH 16/28] lintfmt --- .../apache_beam/runners/portability/prism_runner.py | 8 +++++--- .../apache_beam/runners/portability/prism_runner_test.py | 9 ++------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index bf0239fd0873..fcd3dff134f6 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -158,7 +158,8 @@ def construct_download_url(self, root_tag, sys, mach) -> (str): 'Machine archictecture "%s" unsupported for constructing a Prism ' 'release binary URL.' % (opsys)) return ( - f"https://github.com/apache/beam/releases/download/{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip" + "https://github.com/apache/beam/releases/download/" + f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip" ) def path_to_binary(self) -> str: @@ -195,14 +196,15 @@ def path_to_binary(self) -> str: return self.construct_download_url( self._version, platform.system(), platform.machine()) - def subprocess_cmd_and_endpoint(self) -> tuple[list[Any], str]: + def subprocess_cmd_and_endpoint( + self) -> typing.Tuple[typing.List[typing.Any], str]: bin_path = self.local_bin( self.path_to_binary(), ignore_cache=(self._path is not None)) job_port, = subprocess_server.pick_port(self._job_port) subprocess_cmd = [bin_path] + self.prism_arguments(job_port) return (subprocess_cmd, f"localhost:{job_port}") - def prism_arguments(self, job_port) -> list[Any]: + def prism_arguments(self, job_port) -> typing.List[Any]: return [ '--job_port', job_port, diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index f7190fb3c921..d96b62ce7526 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -36,13 +36,8 @@ from apache_beam.io.kafka import ReadFromKafka from apache_beam.io.kafka import WriteToKafka from apache_beam.options.pipeline_options import DebugOptions -from apache_beam.options.pipeline_options import PrismRunnerOptions from apache_beam.options.pipeline_options import PortableOptions -from apache_beam.options.pipeline_options import StandardOptions -from apache_beam.runners.portability import job_server -from apache_beam.runners.portability import portable_runner from apache_beam.runners.portability import portable_runner_test -from apache_beam.runners.portability import prism_runner from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms.sql import SqlTransform @@ -228,8 +223,8 @@ def test_expand_kafka_read(self): timestamp_policy=ReadFromKafka.create_time_policy, expansion_service=self.get_expansion_service())) self.assertTrue( - 'No resolvable bootstrap urls given in bootstrap.servers' - in str(ctx.exception), + 'No resolvable bootstrap urls given in bootstrap.servers' in str( + ctx.exception), 'Expected to fail due to invalid bootstrap.servers, but ' 'failed due to:\n%s' % str(ctx.exception)) From e380c8ed164d5313225d74e2ffff64715c634da8 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 09:56:21 -0700 Subject: [PATCH 17/28] import typing, fmt. --- .../apache_beam/runners/portability/prism_runner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index fcd3dff134f6..8d469e86b221 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -20,7 +20,7 @@ # this will make using the list parameterized generic happy # on python 3.8 so we aren't revisiting this code after we # sunset it -from __future__ import annotations +from __future__ import annotations,typing # pytype: skip-file @@ -29,6 +29,7 @@ import platform import shutil import stat +import typing import urllib import zipfile @@ -159,8 +160,7 @@ def construct_download_url(self, root_tag, sys, mach) -> (str): 'release binary URL.' % (opsys)) return ( "https://github.com/apache/beam/releases/download/" - f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip" - ) + f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip") def path_to_binary(self) -> str: if self._path: @@ -204,7 +204,7 @@ def subprocess_cmd_and_endpoint( subprocess_cmd = [bin_path] + self.prism_arguments(job_port) return (subprocess_cmd, f"localhost:{job_port}") - def prism_arguments(self, job_port) -> typing.List[Any]: + def prism_arguments(self, job_port) -> typing.List[typing.Any]: return [ '--job_port', job_port, From 46e05c1d1bf77bb0b7be01f9259995111edb7c7a Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 10:03:17 -0700 Subject: [PATCH 18/28] rm typing typo --- sdks/python/apache_beam/runners/portability/prism_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 8d469e86b221..c66d2ab8e723 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -20,7 +20,7 @@ # this will make using the list parameterized generic happy # on python 3.8 so we aren't revisiting this code after we # sunset it -from __future__ import annotations,typing +from __future__ import annotations # pytype: skip-file From 8b58b07ece35baf0e02aefab575c111b4e043e4d Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 10:57:24 -0700 Subject: [PATCH 19/28] Linting... linting never ends. --- .../apache_beam/runners/portability/prism_runner.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index c66d2ab8e723..87c64930f459 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -22,8 +22,6 @@ # sunset it from __future__ import annotations -# pytype: skip-file - import logging import os import platform @@ -32,7 +30,6 @@ import typing import urllib import zipfile - from urllib.error import URLError from urllib.request import urlopen @@ -43,6 +40,10 @@ from apache_beam.utils import subprocess_server from apache_beam.version import __version__ as beam_version +# pytype: skip-file + + + # Prefix for constructing a download URL GITHUB_DOWNLOAD_PREFIX = 'https://github.com/apache/beam/releases/download/' # Prefix for constructing a release URL, so we can derive a download URL @@ -159,7 +160,7 @@ def construct_download_url(self, root_tag, sys, mach) -> (str): 'Machine archictecture "%s" unsupported for constructing a Prism ' 'release binary URL.' % (opsys)) return ( - "https://github.com/apache/beam/releases/download/" + GITHUB_DOWNLOAD_PREFIX f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip") def path_to_binary(self) -> str: From 0c0c34a931368574c0ec35be5a2fcacd0385e3b5 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 11:33:30 -0700 Subject: [PATCH 20/28] Initial code comments, and some typing. --- .../runners/portability/prism_runner.py | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 87c64930f459..7824a6c6a5ce 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -37,6 +37,7 @@ from apache_beam.options import pipeline_options from apache_beam.runners.portability import job_server from apache_beam.runners.portability import portable_runner +from apache_beam.transforms import environments from apache_beam.utils import subprocess_server from apache_beam.version import __version__ as beam_version @@ -56,7 +57,7 @@ class PrismRunner(portable_runner.PortableRunner): """A runner for launching jobs on Prism, automatically downloading and starting a Prism instance if needed. """ - def default_environment(self, options): + def default_environment(self, options: pipeline_options.PipelineOptions) -> environments.Environment: portable_options = options.view_as(pipeline_options.PortableOptions) if (not portable_options.environment_type and not portable_options.output_executable_path): @@ -92,11 +93,11 @@ def __init__(self, options): self._job_port = job_options.job_port @classmethod - def maybe_unzip_and_make_executable(cls, url, cache_dir) -> (str): + def maybe_unzip_and_make_executable(cls, url, bin_cache) -> str: if zipfile.is_zipfile(url): z = zipfile.ZipFile(url) url = z.extract( - os.path.splitext(os.path.basename(url))[0], path=cache_dir) + os.path.splitext(os.path.basename(url))[0], path=bin_cache) # Make sure the binary is executable. st = os.stat(url) @@ -105,22 +106,22 @@ def maybe_unzip_and_make_executable(cls, url, cache_dir) -> (str): # Finds the bin or zip in the local cache, and if not, fetches it. @classmethod - def local_bin(cls, url, cache_dir=None, ignore_cache=False) -> (str): + def local_bin(cls, url, bin_cache=None, ignore_cache=False) -> str: # ignore_cache sets whether we should always be downloading and unzipping # the file or not, to avoid staleness issues. - if cache_dir is None: - cache_dir = cls.BIN_CACHE + if bin_cache is None: + bin_cache = cls.BIN_CACHE if os.path.exists(url): _LOGGER.info('Using local prism binary from %s' % url) - return cls.maybe_unzip_and_make_executable(url, cache_dir=cache_dir) + return cls.maybe_unzip_and_make_executable(url, bin_cache=bin_cache) else: - cached_bin = os.path.join(cache_dir, os.path.basename(url)) + cached_bin = os.path.join(bin_cache, os.path.basename(url)) if os.path.exists(cached_bin) and not ignore_cache: _LOGGER.info('Using cached prism binary from %s' % url) else: _LOGGER.info('Downloading prism binary from %s' % url) - if not os.path.exists(cache_dir): - os.makedirs(cache_dir) + if not os.path.exists(bin_cache): + os.makedirs(bin_cache) try: try: url_read = FileSystems.open(url) @@ -133,9 +134,9 @@ def local_bin(cls, url, cache_dir=None, ignore_cache=False) -> (str): raise RuntimeError( 'Unable to fetch remote prism binary at %s: %s' % (url, e)) return cls.maybe_unzip_and_make_executable( - cached_bin, cache_dir=cache_dir) + cached_bin, bin_cache=bin_cache) - def construct_download_url(self, root_tag, sys, mach) -> (str): + def construct_download_url(self, root_tag, sys, mach) -> str: """Construct the prism download URL with the appropriate release tag. This maps operating systems and machine architectures to the compatible and canonical names used by the Go build targets. @@ -164,7 +165,7 @@ def construct_download_url(self, root_tag, sys, mach) -> (str): f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip") def path_to_binary(self) -> str: - if self._path: + if self._path is not None: if not os.path.exists(self._path): url = urllib.parse.urlparse(self._path) if not url.scheme: From 063cf3018847749a6535a6696c846019a41ba071 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 11:34:11 -0700 Subject: [PATCH 21/28] Remove commented out test overrides. --- .../runners/portability/prism_runner_test.py | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index d96b62ce7526..f8a52ccadb4f 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -264,29 +264,6 @@ def test_sql(self): equal_to([(x, '{x}*{x}'.format(x=x), x * x) for x in range(5)])) -# def test_flattened_side_input(self): -# # Blocked on support for transcoding -# # https://jira.apache.org/jira/browse/BEAM-6523 -# super().test_flattened_side_input(with_transcoding=False) - -# def test_metrics(self): -# super().test_metrics(check_gauge=False) - -# def test_sdf_with_watermark_tracking(self): -# raise unittest.SkipTest("BEAM-2939") - -# def test_callbacks_with_exception(self): -# raise unittest.SkipTest("https://github.com/apache/beam/issues/19526") - -# def test_register_finalizations(self): -# raise unittest.SkipTest("https://github.com/apache/beam/issues/19526") - -# def test_custom_merging_window(self): -# raise unittest.SkipTest("https://github.com/apache/beam/issues/20641") - -# def test_custom_window_type(self): -# raise unittest.SkipTest("https://github.com/apache/beam/issues/20641") - # Inherits all other tests. if __name__ == '__main__': From c45188111f8f3086fa921e9413ba0a6c57b3fbd4 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 11:37:17 -0700 Subject: [PATCH 22/28] rm additional commented out test --- .../apache_beam/runners/portability/prism_runner_test.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index f8a52ccadb4f..f1ccf66a2289 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -185,9 +185,6 @@ def test_read(self): lines = p | beam.io.ReadFromText('/etc/profile') assert_that(lines, lambda lines: len(lines) > 0) -# def test_no_subtransform_composite(self): -# raise unittest.SkipTest("BEAM-4781") - def test_external_transform(self): with self.create_pipeline() as p: res = ( From c78fb0a0c70787f9c0f111e55bac47beca80088a Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:33:17 -0700 Subject: [PATCH 23/28] Fix invalid syntax. --- sdks/python/apache_beam/runners/portability/prism_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 7824a6c6a5ce..33cad7b79ecc 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -161,7 +161,7 @@ def construct_download_url(self, root_tag, sys, mach) -> str: 'Machine archictecture "%s" unsupported for constructing a Prism ' 'release binary URL.' % (opsys)) return ( - GITHUB_DOWNLOAD_PREFIX + GITHUB_DOWNLOAD_PREFIX + f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip") def path_to_binary(self) -> str: From a43380cd17f00b388ab5c7144a05a526f28f445f Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:49:25 -0700 Subject: [PATCH 24/28] Formatter --- .../python/apache_beam/runners/portability/prism_runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 33cad7b79ecc..f15a0dfb52b1 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -43,8 +43,6 @@ # pytype: skip-file - - # Prefix for constructing a download URL GITHUB_DOWNLOAD_PREFIX = 'https://github.com/apache/beam/releases/download/' # Prefix for constructing a release URL, so we can derive a download URL @@ -57,7 +55,10 @@ class PrismRunner(portable_runner.PortableRunner): """A runner for launching jobs on Prism, automatically downloading and starting a Prism instance if needed. """ - def default_environment(self, options: pipeline_options.PipelineOptions) -> environments.Environment: + + def default_environment( + self, + options: pipeline_options.PipelineOptions) -> environments.Environment: portable_options = options.view_as(pipeline_options.PortableOptions) if (not portable_options.environment_type and not portable_options.output_executable_path): From ebed96fb2ad957ac9096c78c5df4da9c2d7348c2 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 15:55:04 -0700 Subject: [PATCH 25/28] fmt --- sdks/python/apache_beam/runners/portability/prism_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index f15a0dfb52b1..03d3211cd85d 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -55,7 +55,6 @@ class PrismRunner(portable_runner.PortableRunner): """A runner for launching jobs on Prism, automatically downloading and starting a Prism instance if needed. """ - def default_environment( self, options: pipeline_options.PipelineOptions) -> environments.Environment: From e8394f78c2e855e0a2b014f89a4579b89a54a678 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 27 Jun 2024 10:09:47 -0700 Subject: [PATCH 26/28] comments and types --- .../python/apache_beam/runners/portability/prism_runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 03d3211cd85d..dad7b092fe8e 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -93,7 +93,7 @@ def __init__(self, options): self._job_port = job_options.job_port @classmethod - def maybe_unzip_and_make_executable(cls, url, bin_cache) -> str: + def maybe_unzip_and_make_executable(cls, url: str, bin_cache: str) -> str: if zipfile.is_zipfile(url): z = zipfile.ZipFile(url) url = z.extract( @@ -106,7 +106,8 @@ def maybe_unzip_and_make_executable(cls, url, bin_cache) -> str: # Finds the bin or zip in the local cache, and if not, fetches it. @classmethod - def local_bin(cls, url, bin_cache=None, ignore_cache=False) -> str: + def local_bin( + cls, url: str, bin_cache: str = None, ignore_cache: bool = False) -> str: # ignore_cache sets whether we should always be downloading and unzipping # the file or not, to avoid staleness issues. if bin_cache is None: @@ -136,7 +137,7 @@ def local_bin(cls, url, bin_cache=None, ignore_cache=False) -> str: return cls.maybe_unzip_and_make_executable( cached_bin, bin_cache=bin_cache) - def construct_download_url(self, root_tag, sys, mach) -> str: + def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str: """Construct the prism download URL with the appropriate release tag. This maps operating systems and machine architectures to the compatible and canonical names used by the Go build targets. From 444b528fd608a3bed8fb3af4ace86102d6a87c53 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 27 Jun 2024 10:55:08 -0700 Subject: [PATCH 27/28] Fix bin_cache type. --- sdks/python/apache_beam/runners/portability/prism_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index dad7b092fe8e..b86a6b16b3a8 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -107,10 +107,10 @@ def maybe_unzip_and_make_executable(cls, url: str, bin_cache: str) -> str: # Finds the bin or zip in the local cache, and if not, fetches it. @classmethod def local_bin( - cls, url: str, bin_cache: str = None, ignore_cache: bool = False) -> str: + cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str: # ignore_cache sets whether we should always be downloading and unzipping # the file or not, to avoid staleness issues. - if bin_cache is None: + if bin_cache is '': bin_cache = cls.BIN_CACHE if os.path.exists(url): _LOGGER.info('Using local prism binary from %s' % url) From 72ffed5e1fe6430807a8ee16e8e1c02f67c764e8 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 27 Jun 2024 11:32:50 -0700 Subject: [PATCH 28/28] use == for literals. --- sdks/python/apache_beam/runners/portability/prism_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index b86a6b16b3a8..eeccaf5748ce 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -110,7 +110,7 @@ def local_bin( cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str: # ignore_cache sets whether we should always be downloading and unzipping # the file or not, to avoid staleness issues. - if bin_cache is '': + if bin_cache == '': bin_cache = cls.BIN_CACHE if os.path.exists(url): _LOGGER.info('Using local prism binary from %s' % url)