diff --git a/CHANGES.md b/CHANGES.md index 1df3cb35b2bf..22cfe3f93d83 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -50,6 +50,15 @@ * ([#X](https://github.com/apache/beam/issues/X)). --> +# [2.44.0] - Unreleased + +## New Features / Improvements + +* Local packages can now be used as dependencies in the requirements.txt file, rather + than requiring them to be passed separately via the `--extra_package` option. + ([#23684](https://github.com/apache/beam/pull/23684)) + + # [2.43.0] - Unreleased ## Highlights diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index e06c71c917d2..abcef4679c20 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -224,9 +224,14 @@ def create_job_resources(options, # type: PipelineOptions 'The file %s cannot be found. It was specified in the ' '--requirements_file command line option.' % setup_options.requirements_file) + extra_packages, thinned_requirements_file = ( + Stager._extract_local_packages(setup_options.requirements_file)) + if extra_packages: + setup_options.extra_packages = ( + setup_options.extra_packages or []) + extra_packages resources.append( Stager._create_file_stage_to_artifact( - setup_options.requirements_file, REQUIREMENTS_FILE)) + thinned_requirements_file, REQUIREMENTS_FILE)) # Populate cache with packages from the requirement file option and # stage the files in the cache. 
if not use_beam_default_container: @@ -683,6 +688,25 @@ def _remove_dependency_from_requirements( return tmp_requirements_filename + @staticmethod + def _extract_local_packages(requirements_file): + local_deps = [] + pypi_deps = [] + with open(requirements_file, 'r') as fin: + for line in fin: + dep = line.strip() + if os.path.exists(dep): + local_deps.append(dep) + else: + pypi_deps.append(dep) + if local_deps: + with tempfile.NamedTemporaryFile(suffix='-requirements.txt', + delete=False) as fout: + fout.write('\n'.join(pypi_deps).encode('utf-8')) + return local_deps, fout.name + else: + return [], requirements_file + @staticmethod def _get_platform_for_default_sdk_container(): """ diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index b221bb1ec6f6..c1806c384941 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -832,6 +832,49 @@ def test_populate_requirements_cache_with_sdist(self): self.assertTrue('.tar.gz' in f) self.assertTrue('.whl' not in f) + def test_populate_requirements_cache_with_local_files(self): + staging_dir = self.make_temp_dir() + requirements_cache_dir = self.make_temp_dir() + source_dir = self.make_temp_dir() + pkg_dir = self.make_temp_dir() + + options = PipelineOptions() + self.update_options(options) + + options.view_as(SetupOptions).requirements_cache = requirements_cache_dir + options.view_as(SetupOptions).requirements_file = os.path.join( + source_dir, stager.REQUIREMENTS_FILE) + local_package = os.path.join(pkg_dir, 'local_package.tar.gz') + self.create_temp_file(local_package, 'local-package-content') + self.create_temp_file( + os.path.join(source_dir, stager.REQUIREMENTS_FILE), + '\n'.join(['fake_pypi', local_package])) + with mock.patch('apache_beam.runners.portability.stager_test' + '.stager.Stager._populate_requirements_cache', + 
staticmethod(self._populate_requitements_cache_fake)): + options.view_as(SetupOptions).requirements_cache_only_sources = True + resources = self.stager.create_and_stage_job_resources( + options, staging_location=staging_dir)[1] + + self.assertEqual( + sorted([ + stager.REQUIREMENTS_FILE, + stager.EXTRA_PACKAGES_FILE, + 'nothing.tar.gz', + 'local_package.tar.gz' + ]), + sorted(resources)) + + with open(os.path.join(staging_dir, stager.REQUIREMENTS_FILE)) as fin: + requirements_contents = fin.read() + self.assertIn('fake_pypi', requirements_contents) + self.assertNotIn('local_package', requirements_contents) + + with open(os.path.join(staging_dir, stager.EXTRA_PACKAGES_FILE)) as fin: + extra_packages_contents = fin.read() + self.assertNotIn('fake_pypi', extra_packages_contents) + self.assertIn('local_package', extra_packages_contents) + class TestStager(stager.Stager): def stage_artifact(self, local_path_to_artifact, artifact_name, sha256): diff --git a/website/www/site/content/en/documentation/sdks/python-pipeline-dependencies.md b/website/www/site/content/en/documentation/sdks/python-pipeline-dependencies.md index bf2e44e55866..330a8af8e449 100644 --- a/website/www/site/content/en/documentation/sdks/python-pipeline-dependencies.md +++ b/website/www/site/content/en/documentation/sdks/python-pipeline-dependencies.md @@ -36,7 +36,7 @@ If your pipeline uses public packages from the [Python Package Index](https://py This command creates a `requirements.txt` file that lists all packages that are installed on your machine, regardless of where they were installed from. -2. Edit the `requirements.txt` file and leave only the packages that were installed from PyPI and are used in the workflow source. Delete all packages that are not relevant to your code. +2. Edit the `requirements.txt` file and delete all packages that are not relevant to your code. 3. 
Run your pipeline with the following command-line option: @@ -44,7 +44,6 @@ If your pipeline uses public packages from the [Python Package Index](https://py The runner will use the `requirements.txt` file to install your additional dependencies onto the remote workers. -**Important:** Remote workers will install all packages listed in the `requirements.txt` file. Because of this, it's very important that you delete non-PyPI packages from the `requirements.txt` file, as stated in step 2. If you don't remove non-PyPI packages, the remote workers will fail when attempting to install packages from sources that are unknown to them. > **NOTE**: An alternative to `pip freeze` is to use a library like [pip-tools](https://github.com/jazzband/pip-tools) to compile all the dependencies required for the pipeline from a `--requirements_file`, where only top-level dependencies are mentioned. ## Custom Containers {#custom-containers}