Skip to content

Commit

Permalink
Revert "[YAML] Increase re-use of providers with implicitly overlappi…
Browse files Browse the repository at this point in the history
…ng trans…" (#31169)

This reverts commit dc5841d.
  • Loading branch information
damccorm authored May 3, 2024
1 parent 782b644 commit af1ff2f
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 228 deletions.
61 changes: 1 addition & 60 deletions sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ def config_schema(self, type):
def description(self, type):
return None

def __repr__(self):
ts = ','.join(sorted(self.provided_transforms()))
return f'{self.__class__.__name__}[{ts}]'

def to_json(self):
return {'type': self.__class__.__name__}

def requires_inputs(self, typ: str, args: Mapping[str, Any]) -> bool:
"""Returns whether this transform requires inputs.
Expand Down Expand Up @@ -114,16 +107,6 @@ def underlying_provider(self):
"""
return self

def with_underlying_provider(self, provider):
"""Returns a provider that vends as many transforms of this provider as
possible but uses the given underlying provider.
This is used to try to minimize the number of distinct environments that
are used as many providers implicitly or explicitly link in many common
transforms.
"""
return None

def affinity(self, other: "Provider"):
"""Returns a value approximating how good it would be for this provider
to be used immediately following a transform from the other provider
Expand Down Expand Up @@ -171,17 +154,6 @@ def __init__(self, urns, service):
def provided_transforms(self):
return self._urns.keys()

def with_underlying_provider(self, other_provider):
if not isinstance(other_provider, ExternalProvider):
return

all_urns = set(other_provider.schema_transforms().keys()).union(
set(other_provider._urns.values()))
for name, urn in self._urns.items():
if urn in all_urns and name not in other_provider._urns:
other_provider._urns[name] = urn
return other_provider

def schema_transforms(self):
if callable(self._service):
self._service = self._service()
Expand Down Expand Up @@ -548,15 +520,6 @@ def cache_artifacts(self):
def underlying_provider(self):
return self.sql_provider()

def with_underlying_provider(self, other_provider):
new_sql_provider = self.sql_provider().with_underlying_provider(
other_provider)
if new_sql_provider is None:
new_sql_provider = other_provider
if new_sql_provider and 'Sql' in set(
new_sql_provider.provided_transforms()):
return SqlBackedProvider(self._transforms, new_sql_provider)

def to_json(self):
return {'type': "SqlBackedProvider"}

Expand Down Expand Up @@ -1044,12 +1007,7 @@ def __exit__(self, *args):

@ExternalProvider.register_provider_type('renaming')
class RenamingProvider(Provider):
def __init__(
self,
transforms: Mapping[str, str],
mappings: Mapping[str, Mapping[str, str]],
underlying_provider: Provider,
defaults: Optional[Mapping[str, Mapping[str, Any]]] = None):
def __init__(self, transforms, mappings, underlying_provider, defaults=None):
if isinstance(underlying_provider, dict):
underlying_provider = ExternalProvider.provider_from_spec(
underlying_provider)
Expand Down Expand Up @@ -1151,23 +1109,6 @@ def _affinity(self, other):
def underlying_provider(self):
return self._underlying_provider.underlying_provider()

def with_underlying_provider(self, other_provider):
new_underlying_provider = (
self._underlying_provider.with_underlying_provider(other_provider))
if new_underlying_provider:
provided = set(new_underlying_provider.provided_transforms())
new_transforms = {
alias: actual
for alias,
actual in self._transforms.items() if actual in provided
}
if new_transforms:
return RenamingProvider(
new_transforms,
self._mappings,
new_underlying_provider,
self._defaults)

def cache_artifacts(self):
self._underlying_provider.cache_artifacts()

Expand Down
120 changes: 0 additions & 120 deletions sdks/python/apache_beam/yaml/yaml_provider_test.py

This file was deleted.

50 changes: 5 additions & 45 deletions sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from typing import List
from typing import Mapping
from typing import Set
from typing import Tuple

import yaml
from yaml.loader import SafeLoader
Expand Down Expand Up @@ -179,46 +178,6 @@ def get_line(obj):
return getattr(obj, '_line_', 'unknown')


class ProviderSet:
def __init__(self, providers: Mapping[str, Iterable[yaml_provider.Provider]]):
self._providers = providers
self._original_providers = set(p for ps in providers.values() for p in ps)
self._expanded: Set[yaml_provider.Provider] = set()

def __contains__(self, key: str):
return key in self._providers

def __getitem__(self, key: str) -> Iterable[yaml_provider.Provider]:
return self._providers[key]

def items(self) -> Iterable[Tuple[str, Iterable[yaml_provider.Provider]]]:
return self._providers.items()

def use(self, provider):
underlying_provider = provider.underlying_provider()
if provider not in self._original_providers:
return
if underlying_provider in self._expanded:
return
self._expanded.add(underlying_provider)

# For every original provider, see if it can also be implemented in terms
# of this provider.
# This is useful because providers often vend everything that's been linked
# in, and we'd like to minimize provider boundary crossings.
# We do this lazily rather than when constructing the provider set to avoid
# instantiating every possible provider.
for other_provider in self._original_providers:
new_provider = other_provider.with_underlying_provider(
underlying_provider)
if new_provider is not None:
for t in new_provider.provided_transforms():
existing_providers = set(
p.underlying_provider() for p in self._providers[t])
if new_provider.underlying_provider() not in existing_providers:
self._providers[t].append(new_provider)


class LightweightScope(object):
def __init__(self, transforms):
self._transforms = transforms
Expand Down Expand Up @@ -261,7 +220,7 @@ def __init__(
root,
inputs: Mapping[str, Any],
transforms: Iterable[dict],
providers: ProviderSet,
providers: Mapping[str, Iterable[yaml_provider.Provider]],
input_providers: Iterable[yaml_provider.Provider]):
super().__init__(transforms)
self.root = root
Expand Down Expand Up @@ -412,7 +371,6 @@ def create_ptransform(self, spec, input_pcolls):
if pcoll in providers_by_input
]
provider = self.best_provider(spec, input_providers)
self.providers.use(provider)

config = SafeLineLoader.strip_metadata(spec.get('config', {}))
if not isinstance(config, dict):
Expand Down Expand Up @@ -551,7 +509,9 @@ def expand_composite_transform(spec, scope):
for (key, value) in empty_if_explicitly_empty(spec['input']).items()
},
spec['transforms'],
scope.providers,
yaml_provider.merge_providers(
yaml_provider.parse_providers(spec.get('providers', [])),
scope.providers),
scope.input_providers)

class CompositePTransform(beam.PTransform):
Expand Down Expand Up @@ -1052,7 +1012,7 @@ def expand(self, pcolls):
root,
pcolls,
transforms=[self._spec],
providers=ProviderSet(self._providers),
providers=self._providers,
input_providers={
pcoll: python_provider
for pcoll in pcolls.values()
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/yaml/yaml_transform_scope_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def get_scope_by_spec(self, p, spec):
scope = Scope(
beam.pvalue.PBegin(p), {},
spec['transforms'],
yaml_transform.ProviderSet(yaml_provider.standard_providers()), {})
yaml_provider.standard_providers(), {})
return scope, spec

def test_get_pcollection_input(self):
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from apache_beam.yaml import YamlTransform
from apache_beam.yaml import yaml_provider
from apache_beam.yaml.yaml_provider import InlineProvider
from apache_beam.yaml.yaml_transform import ProviderSet
from apache_beam.yaml.yaml_transform import SafeLineLoader
from apache_beam.yaml.yaml_transform import Scope
from apache_beam.yaml.yaml_transform import chain_as_composite
Expand Down Expand Up @@ -120,7 +119,7 @@ def get_scope_by_spec(self, p, spec, inputs=None):
beam.pvalue.PBegin(p),
inputs,
spec['transforms'],
ProviderSet(yaml_provider.standard_providers()), {})
yaml_provider.standard_providers(), {})
return scope, spec

def test_pipeline_as_composite_with_type_transforms(self):
Expand Down

0 comments on commit af1ff2f

Please sign in to comment.