Skip to content

Commit

Permalink
Merge pull request #33645 Use local loopback worker from Python expan…
Browse files Browse the repository at this point in the history
…sion services.
  • Loading branch information
robertwb authored Jan 17, 2025
2 parents dd16104 + 51b5872 commit f199cf0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
12 changes: 12 additions & 0 deletions sdks/python/apache_beam/transforms/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,18 @@ def create_proto(
payload=beam_runner_api_pb2.AnyOfEnvironmentPayload(
environments=environments).SerializeToString())

def __hash__(self):
return sum(hash(env) for env in self._environments)

def __eq__(self, other):
return (
isinstance(other, AnyOfEnvironment) and
set(self._environments) == set(other._environments))

def to_runner_api(self, context):
return self.create_proto(
[env.to_runner_api(context) for env in self._environments])


class PyPIArtifactRegistry(object):
_registered_artifacts = set() # type: set[tuple[str, str]]
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,8 @@ def __enter__(self):
'{{PORT}}',
'--fully_qualified_name_glob=*',
'--pickle_library=cloudpickle',
'--requirements_file=' + os.path.join(venv + '-requirements.txt')
'--requirements_file=' + os.path.join(venv + '-requirements.txt'),
'--serve_loopback_worker',
])
self._service = self._service_provider.__enter__()
return self._service
Expand Down

0 comments on commit f199cf0

Please sign in to comment.