Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Log dependencies installed in submission environment #28564

Merged
merged 26 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e441b52
log runtime dependencies
riteshghorse Sep 19, 2023
1497427
log submission env dependencies
riteshghorse Sep 20, 2023
b696f6f
rm spare line
riteshghorse Sep 20, 2023
8cbeb61
update tests for staged files
riteshghorse Sep 20, 2023
a2761c9
Merge branch 'master' of https://github.com/apache/beam into sdeps
riteshghorse Sep 20, 2023
1e68428
update equals test to ignore artifacts
riteshghorse Sep 21, 2023
284f772
handle unit tests, refactor
riteshghorse Sep 22, 2023
8f03251
unit test sub env staging, convert to string
riteshghorse Sep 22, 2023
7e6e62a
change Log to Printf
riteshghorse Sep 26, 2023
c03378f
Merge branch 'master' of https://github.com/apache/beam into sdeps
riteshghorse Sep 27, 2023
cc24a1d
change log level to warning
riteshghorse Oct 3, 2023
6b6b222
try env
riteshghorse Dec 6, 2023
b16d23d
Merge remote-tracking branch 'origin' into sdeps
riteshghorse Dec 6, 2023
649c8a0
merge master
riteshghorse Dec 18, 2023
9b65385
add artifact_service method
riteshghorse Dec 20, 2023
f2e821d
merge master
riteshghorse Dec 20, 2023
789c764
correct urn
riteshghorse Dec 20, 2023
df68c1d
fix artifact comparison, file reader
riteshghorse Dec 20, 2023
4dfe7e0
add mock for python sdk dependencies and update artifact service method
riteshghorse Feb 5, 2024
b7d62bc
fix lint
riteshghorse Feb 5, 2024
404286b
use magic mock instead of mocking entire function
riteshghorse Feb 9, 2024
da2b954
update dataflow runner test
riteshghorse Feb 15, 2024
eeedc44
Update sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
riteshghorse Feb 15, 2024
c527114
use debug option to disable
riteshghorse Mar 7, 2024
279e01d
pull upstream
riteshghorse Mar 7, 2024
80d6d18
remove tmp directory mock
riteshghorse Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from urllib.parse import urlparse

from packaging import version
from pip._internal.operations import freeze
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved

from apache_beam.internal import pickler
from apache_beam.internal.http_client import get_new_http
Expand All @@ -84,6 +85,8 @@
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
# Filename that stores the submission environment dependencies.
SUBMISSION_ENV_DEPENDENCIES_FILENAME = 'submission_environment_dependencies.txt'
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
# One of the choices for user to use for requirements cache during staging
SKIP_REQUIREMENTS_CACHE = 'skip'

Expand Down Expand Up @@ -365,6 +368,16 @@ def create_job_resources(options, # type: PipelineOptions
Stager._create_file_stage_to_artifact(
pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))

# stage the submission environment dependencies
local_dependency_file_path = os.path.join(
temp_dir, SUBMISSION_ENV_DEPENDENCIES_FILENAME)
dependencies = freeze.freeze()
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
with open(local_dependency_file_path, 'w') as f:
f.write('\n'.join(dependencies))
resources.append(
Stager._create_file_stage_to_artifact(
local_dependency_file_path, SUBMISSION_ENV_DEPENDENCIES_FILENAME))

return resources

def stage_job_resources(self,
Expand Down
94 changes: 69 additions & 25 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_no_main_session(self):
options.view_as(SetupOptions).save_main_session = False
self.update_options(options)

self.assertEqual([],
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand All @@ -180,7 +180,10 @@ def test_with_main_session(self):
options.view_as(SetupOptions).pickle_library = pickler.USE_DILL
self.update_options(options)

self.assertEqual([names.PICKLED_MAIN_SESSION_FILE],
self.assertEqual([
names.PICKLED_MAIN_SESSION_FILE,
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertTrue(
Expand All @@ -199,7 +202,7 @@ def test_main_session_not_staged_when_using_cloudpickle(self):
# session is saved when pickle_library==cloudpickle.
options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE
self.update_options(options)
self.assertEqual([],
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand All @@ -208,7 +211,7 @@ def test_default_resources(self):
options = PipelineOptions()
self.update_options(options)

self.assertEqual([],
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand All @@ -225,7 +228,12 @@ def test_with_requirements_file(self):
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
self.assertEqual(
sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']),
sorted([
stager.REQUIREMENTS_FILE,
'abc.txt',
'def.txt',
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
]),
sorted(
self.stager.create_and_stage_job_resources(
options,
Expand All @@ -246,9 +254,12 @@ def test_with_pypi_requirements(self):
pypi_requirements=['nothing>=1.0,<2.0'],
populate_requirements_cache=self.populate_requirements_cache,
staging_location=staging_dir)[1]
self.assertEqual(3, len(resources))
self.assertEqual(4, len(resources))
self.assertTrue({'abc.txt', 'def.txt'} <= set(resources))
generated_requirements = (set(resources) - {'abc.txt', 'def.txt'}).pop()
generated_requirements = (
set(resources) -
{'abc.txt', 'def.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
}).pop()
with open(os.path.join(staging_dir, generated_requirements)) as f:
data = f.read()
self.assertEqual('nothing>=1.0,<2.0', data)
Expand Down Expand Up @@ -282,7 +293,12 @@ def test_with_requirements_file_and_cache(self):
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
self.assertEqual(
sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']),
sorted([
stager.REQUIREMENTS_FILE,
'abc.txt',
'def.txt',
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
]),
sorted(
self.stager.create_and_stage_job_resources(
options,
Expand Down Expand Up @@ -313,7 +329,10 @@ def test_requirements_cache_not_populated_when_cache_disabled(self):
populate_requirements_cache=self.populate_requirements_cache,
staging_location=staging_dir)[1]
assert not populate_requirements_cache.called
self.assertEqual([stager.REQUIREMENTS_FILE], resources)
self.assertEqual([
stager.REQUIREMENTS_FILE, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
],
resources)
self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'def.txt')))

Expand Down Expand Up @@ -378,7 +397,8 @@ def test_sdk_location_default(self):
_, staged_resources = self.stager.create_and_stage_job_resources(
options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)

self.assertEqual([], staged_resources)
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME],
staged_resources)

def test_sdk_location_local_directory(self):
staging_dir = self.make_temp_dir()
Expand All @@ -391,7 +411,10 @@ def test_sdk_location_local_directory(self):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location

self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.assertEqual([
names.STAGED_SDK_SOURCES_FILENAME,
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
Expand All @@ -409,7 +432,10 @@ def test_sdk_location_local_source_file(self):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location

self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.assertEqual([
names.STAGED_SDK_SOURCES_FILENAME,
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
Expand All @@ -427,9 +453,10 @@ def test_sdk_location_local_wheel_file(self):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location

self.assertEqual([sdk_filename],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertEqual(
[sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
tarball_path = os.path.join(staging_dir, sdk_filename)
with open(tarball_path) as f:
self.assertEqual(f.read(), 'Package content.')
Expand Down Expand Up @@ -463,7 +490,10 @@ def test_sdk_location_remote_source_file(self, *unused_mocks):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location

self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.assertEqual([
names.STAGED_SDK_SOURCES_FILENAME,
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand All @@ -485,9 +515,10 @@ def file_download(_, to_path):
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._download_file',
staticmethod(file_download)):
self.assertEqual([sdk_filename],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertEqual(
[sdk_filename, stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

wheel_file_path = os.path.join(staging_dir, sdk_filename)
with open(wheel_file_path) as f:
Expand All @@ -509,7 +540,10 @@ def file_download(_, to_path):
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._download_file',
staticmethod(file_download)):
self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.assertEqual([
names.STAGED_SDK_SOURCES_FILENAME,
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])

Expand Down Expand Up @@ -551,7 +585,8 @@ def test_with_extra_packages(self):
'xyz2.tar',
'whl.whl',
'remote_file.tar.gz',
stager.EXTRA_PACKAGES_FILE
stager.EXTRA_PACKAGES_FILE,
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
Expand Down Expand Up @@ -659,7 +694,13 @@ def test_with_jar_packages(self):
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._is_remote_path',
staticmethod(self.is_remote_path)):
self.assertEqual(['abc.jar', 'xyz.jar', 'ijk.jar', 'remote.jar'],
self.assertEqual([
'abc.jar',
'xyz.jar',
'ijk.jar',
'remote.jar',
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertEqual(['/tmp/remote/remote.jar'], self.remote_copied_files)
Expand Down Expand Up @@ -719,7 +760,8 @@ def test_populate_requirements_cache_with_bdist(self):
resources = self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1]
for f in resources:
if f != stager.REQUIREMENTS_FILE:
if (f != stager.REQUIREMENTS_FILE and
f != stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME):
self.assertTrue(('.tar.gz' in f) or ('.whl' in f))

# requirements cache will be populated only with sdists/sources
Expand All @@ -744,7 +786,8 @@ def test_populate_requirements_cache_with_sdist(self):
options, staging_location=staging_dir)[1]

for f in resources:
if f != stager.REQUIREMENTS_FILE:
if (f != stager.REQUIREMENTS_FILE and
f != stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME):
self.assertTrue('.tar.gz' in f)
self.assertTrue('.whl' not in f)

Expand Down Expand Up @@ -777,7 +820,8 @@ def test_populate_requirements_cache_with_local_files(self):
stager.REQUIREMENTS_FILE,
stager.EXTRA_PACKAGES_FILE,
'nothing.tar.gz',
'local_package.tar.gz'
'local_package.tar.gz',
stager.SUBMISSION_ENV_DEPENDENCIES_FILENAME
]),
sorted(resources))

Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/transforms/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ def __init__(self,
dict(resource_hints) if resource_hints else {})

def __eq__(self, other):
# don't compare artifacts since they have different hashes in their names.
return (
self.__class__ == other.__class__ and
self._artifacts == other._artifacts
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
self.__class__ == other.__class__
# Assuming that we don't have instances of the same Environment subclass
# with different set of capabilities.
and self._resource_hints == other._resource_hints)
Expand Down
34 changes: 34 additions & 0 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
Expand Down Expand Up @@ -409,7 +410,40 @@ func installSetupPackages(ctx context.Context, logger *tools.Logger, files []str
if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil {
return fmt.Errorf("failed to install workflow: %v", err)
}
if err := logRuntimeDependencies(ctx, logger); err != nil {
logger.Warnf(ctx, "couldn't fetch the runtime dependencies: %v", err)
}
if err := logSubmissionEnvDependencies(ctx, logger, workDir); err != nil {
logger.Warnf(ctx, "couldn't fetch the submission environment dependencies: %v", err)
}

return nil
}

func logSubmissionEnvDependencies(ctx context.Context, logger *tools.Logger, dir string) error {
logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, "Logging submission environment dependencies:")
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
filename := filepath.Join(dir, "submission_environment_dependencies.txt")
content, err := os.ReadFile(filename)
if err != nil {
return err
}
logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, string(content))
return nil
}

// logRuntimeDependencies runs `pip freeze` using the Python interpreter
// reported by expansionx.GetPythonVersion and forwards the command's output
// to logger.
//
// The command's stdout and stderr are collected in a buffered logger. On
// success the buffer is flushed with FlushAtDebug; on failure it is flushed
// with FlushAtError so the captured output accompanies the failure.
//
// It returns an error when the Python interpreter cannot be determined or
// when the `pip freeze` command fails, so that callers can report that the
// dependencies could not be fetched.
func logRuntimeDependencies(ctx context.Context, logger *tools.Logger) error {
	logger.Log(ctx, fnexecution_v1.LogEntry_Severity_DEBUG, "Logging runtime dependencies:")

	pythonVersion, err := expansionx.GetPythonVersion()
	if err != nil {
		return err
	}

	args := []string{"-m", "pip", "freeze"}
	bufLogger := tools.NewBufferedLogger(logger)
	if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil {
		bufLogger.FlushAtError(ctx)
		// Surface the failure instead of reporting success: previously the
		// error was dropped here and the function always returned nil.
		return fmt.Errorf("failed to run pip freeze: %v", err)
	}

	bufLogger.FlushAtDebug(ctx)
	return nil
}

Expand Down