Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Log dependencies installed in submission environment #28564

Merged
merged 26 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e441b52
log runtime dependencies
riteshghorse Sep 19, 2023
1497427
log submission env dependencies
riteshghorse Sep 20, 2023
b696f6f
rm spare line
riteshghorse Sep 20, 2023
8cbeb61
update tests for staged files
riteshghorse Sep 20, 2023
a2761c9
Merge branch 'master' of https://github.com/apache/beam into sdeps
riteshghorse Sep 20, 2023
1e68428
update equals test to ignore artifacts
riteshghorse Sep 21, 2023
284f772
handle unit tests, refactor
riteshghorse Sep 22, 2023
8f03251
unit test sub env staging, convert to string
riteshghorse Sep 22, 2023
7e6e62a
change Log to Printf
riteshghorse Sep 26, 2023
c03378f
Merge branch 'master' of https://github.com/apache/beam into sdeps
riteshghorse Sep 27, 2023
cc24a1d
change log level to warning
riteshghorse Oct 3, 2023
6b6b222
try env
riteshghorse Dec 6, 2023
b16d23d
Merge remote-tracking branch 'origin' into sdeps
riteshghorse Dec 6, 2023
649c8a0
merge master
riteshghorse Dec 18, 2023
9b65385
add artifact_service method
riteshghorse Dec 20, 2023
f2e821d
merge master
riteshghorse Dec 20, 2023
789c764
correct urn
riteshghorse Dec 20, 2023
df68c1d
fix artifact comparison, file reader
riteshghorse Dec 20, 2023
4dfe7e0
add mock for python sdk dependencies and update artifact service method
riteshghorse Feb 5, 2024
b7d62bc
fix lint
riteshghorse Feb 5, 2024
404286b
use magic mock instead of mocking entire function
riteshghorse Feb 9, 2024
da2b954
update dataflow runner test
riteshghorse Feb 15, 2024
eeedc44
Update sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
riteshghorse Feb 15, 2024
c527114
use debug option to disable
riteshghorse Mar 7, 2024
279e01d
pull upstream
riteshghorse Mar 7, 2024
80d6d18
remove tmp directory mock
riteshghorse Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,13 @@ def test_environment_override_translation_legacy_worker_harness_image(self):
p | ptransform.Create([1, 2, 3])
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())
self.assertEqual(
list(remote_runner.proto_pipeline.components.environments.values()),
[
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='LEGACY').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
])
first = remote_runner.proto_pipeline.components.environments.values()
second = beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='LEGACY').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
self.assertTrue(first.__eq__(second))
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved

def test_environment_override_translation_sdk_container_image(self):
self.default_properties.append('--experiments=beam_fn_api')
Expand All @@ -231,15 +229,13 @@ def test_environment_override_translation_sdk_container_image(self):
p | ptransform.Create([1, 2, 3])
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())
self.assertEqual(
list(remote_runner.proto_pipeline.components.environments.values()),
[
beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='FOO').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
])
first = remote_runner.proto_pipeline.components.environments.values()
second = beam_runner_api_pb2.Environment(
urn=common_urns.environments.DOCKER.urn,
payload=beam_runner_api_pb2.DockerPayload(
container_image='FOO').SerializeToString(),
capabilities=environments.python_sdk_docker_capabilities())
self.assertTrue(first.__eq__(second))

def test_remote_runner_translation(self):
remote_runner = DataflowRunner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from apache_beam.portability.api import beam_expansion_api_pb2
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.runners import pipeline_context
from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability.artifact_service import BeamFilesystemHandler
from apache_beam.transforms import environments
from apache_beam.transforms import external
from apache_beam.transforms import ptransform
Expand Down Expand Up @@ -128,3 +130,8 @@ def with_pipeline(component, pcoll_id=None):
except Exception: # pylint: disable=broad-except
return beam_expansion_api_pb2.ExpansionResponse(
error=traceback.format_exc())

def artifact_service(self):
"""Returns a service to retrieve artifacts for use in a job."""
return artifact_service.ArtifactRetrievalService(
BeamFilesystemHandler(None).file_reader)
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.testing.util import equal_to
from apache_beam.transforms import environments
from apache_beam.transforms import userstate
from apache_beam.transforms.environments_test import mock_python_sdk_dependencies

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -308,12 +309,22 @@ def create_options(self):


class PortableRunnerInternalTest(unittest.TestCase):
def setUp(self) -> None:
self.actual_python_sdk_dependencies = environments.python_sdk_dependencies
environments.python_sdk_dependencies = mock_python_sdk_dependencies

def tearDown(self) -> None:
environments.python_sdk_dependencies = self.actual_python_sdk_dependencies

def test__create_default_environment(self):
docker_image = environments.DockerEnvironment.default_docker_image()
self.assertEqual(
PortableRunner._create_environment(
PipelineOptions.from_dictionary({'sdk_location': 'container'})),
environments.DockerEnvironment(container_image=docker_image))
options=PipelineOptions.from_dictionary(
{'sdk_location': 'container'})),
environments.DockerEnvironment(
container_image=docker_image,
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_docker_environment(self):
docker_image = 'py-docker'
Expand All @@ -324,7 +335,9 @@ def test__create_docker_environment(self):
'environment_config': docker_image,
'sdk_location': 'container',
})),
environments.DockerEnvironment(container_image=docker_image))
environments.DockerEnvironment(
container_image=docker_image,
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_process_environment(self):
self.assertEqual(
Expand All @@ -337,15 +350,21 @@ def test__create_process_environment(self):
'sdk_location': 'container',
})),
environments.ProcessEnvironment(
'run.sh', os='linux', arch='amd64', env={'k1': 'v1'}))
'run.sh',
os='linux',
arch='amd64',
env={'k1': 'v1'},
artifacts=environments.python_sdk_dependencies(PipelineOptions())))
self.assertEqual(
PortableRunner._create_environment(
PipelineOptions.from_dictionary({
'environment_type': 'PROCESS',
'environment_config': '{"command": "run.sh"}',
'sdk_location': 'container',
})),
environments.ProcessEnvironment('run.sh'))
environments.ProcessEnvironment(
'run.sh',
artifacts=environments.python_sdk_dependencies(PipelineOptions())))

def test__create_external_environment(self):
self.assertEqual(
Expand All @@ -355,7 +374,9 @@ def test__create_external_environment(self):
'environment_config': 'localhost:50000',
'sdk_location': 'container',
})),
environments.ExternalEnvironment('localhost:50000'))
environments.ExternalEnvironment(
'localhost:50000',
artifacts=environments.python_sdk_dependencies(PipelineOptions())))
raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} '
for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()):
self.assertEqual(
Expand All @@ -366,7 +387,10 @@ def test__create_external_environment(self):
'sdk_location': 'container',
})),
environments.ExternalEnvironment(
'localhost:50000', params={"k1": "v1"}))
'localhost:50000',
params={"k1": "v1"},
artifacts=environments.python_sdk_dependencies(
PipelineOptions())))
with self.assertRaises(ValueError):
PortableRunner._create_environment(
PipelineOptions.from_dictionary({
Expand Down
52 changes: 50 additions & 2 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import logging
import os
import shutil
import subprocess
import sys
import tempfile
from importlib.metadata import distribution
Expand Down Expand Up @@ -84,6 +85,8 @@
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
# Filename that stores the submission environment dependencies.
SUBMISSION_ENV_DEPENDENCIES_FILE = 'submission_environment_dependencies.txt'
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
# One of the choices for user to use for requirements cache during staging
SKIP_REQUIREMENTS_CACHE = 'skip'

Expand Down Expand Up @@ -159,9 +162,10 @@ def extract_staging_tuple_iter(
def create_job_resources(options, # type: PipelineOptions
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
pypi_requirements=None, # type: Optional[List[str]]
pypi_requirements=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]]
skip_prestaged_dependencies=False, # type: Optional[bool]
skip_prestaged_dependencies=False, # type: Optional[bool]
log_submission_env_dependencies=True, # type: Optional[bool]
):
"""For internal use only; no backwards-compatibility guarantees.

Expand All @@ -183,6 +187,8 @@ def create_job_resources(options, # type: PipelineOptions
cache. Used only for testing.
skip_prestaged_dependencies: Skip staging dependencies that can be
added into SDK containers during prebuilding.
log_submission_env_dependencies: (Optional) param to stage and log
submission environment dependencies. Defaults to True.

Returns:
A list of ArtifactInformation to be used for staging resources.
Expand Down Expand Up @@ -365,6 +371,11 @@ def create_job_resources(options, # type: PipelineOptions
Stager._create_file_stage_to_artifact(
pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))

# stage the submission environment dependencies, if enabled.
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
if log_submission_env_dependencies:
resources.extend(
Stager._create_stage_submission_env_dependencies(temp_dir))

return resources

def stage_job_resources(self,
Expand Down Expand Up @@ -850,3 +861,40 @@ def _create_beam_sdk(sdk_remote_location, temp_dir):
return [
Stager._create_file_stage_to_artifact(local_download_file, staged_name)
]

@staticmethod
def _create_stage_submission_env_dependencies(temp_dir):
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
"""Create and stage a file with list of dependencies installed in the
submission environment.

This list can be used at runtime to compare against the dependencies in the
runtime environment. This allows runners to warn users about any potential
dependency mismatches and help debug issues related to
environment mismatches.

Args:
temp_dir: path to temporary location where the file should be
downloaded.

Returns:
A list of ArtifactInformation of local file path that will be staged to
the staging location.
"""
try:
local_dependency_file_path = os.path.join(
temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILE)
dependencies = subprocess.check_output(
[sys.executable, '-m', 'pip', 'freeze'])
local_python_path = f"Python Path: {sys.executable}\n"
with open(local_dependency_file_path, 'w') as f:
f.write(local_python_path + str(dependencies))
return [
Stager._create_file_stage_to_artifact(
local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILE),
]
except Exception as e:
_LOGGER.warning(
"Couldn't stage a list of installed dependencies in "
"submission environment. Got exception: %s",
e)
return []
Loading
Loading