Skip to content

Commit

Permalink
Remove unneeded Dataflow Runner v1 code. (#27196)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Jul 10, 2023
1 parent c5d6183 commit ca674aa
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 1,692 deletions.
133 changes: 0 additions & 133 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,29 +280,6 @@ def _get_component_coders(self):
# refined in user-defined Coders.
return []

def as_cloud_object(self, coders_context=None):
"""For internal use only; no backwards-compatibility guarantees.
Returns Google Cloud Dataflow API description of this coder."""
# This is an internal detail of the Coder API and does not need to be
# refined in user-defined Coders.

value = {
# We pass coders in the form "<coder_name>$<pickled_data>" to make the
# job description JSON more readable. Data before the $ is ignored by
# the worker.
'@type': serialize_coder(self),
'component_encodings': [
component.as_cloud_object(coders_context)
for component in self._get_component_coders()
],
}

if coders_context:
value['pipeline_proto_coder_id'] = coders_context.get_id(self)

return value

def __repr__(self):
return self.__class__.__name__

Expand Down Expand Up @@ -493,11 +470,6 @@ def is_deterministic(self):
def to_type_hint(self):
return bytes

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:bytes',
}

def __eq__(self, other):
return type(self) == type(other)

Expand Down Expand Up @@ -667,11 +639,6 @@ def is_deterministic(self):
def to_type_hint(self):
return int

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:varint',
}

def __eq__(self, other):
return type(self) == type(other)

Expand Down Expand Up @@ -846,21 +813,6 @@ def is_deterministic(self):
# GroupByKey operations.
return False

def as_cloud_object(self, coders_context=None, is_pair_like=True):
value = super().as_cloud_object(coders_context)
# We currently use this coder in places where we cannot infer the coder to
# use for the value type in a more granular way. In places where the
# service expects a pair, it checks for the "is_pair_like" key, in which
# case we would fail without the hack below.
if is_pair_like:
value['is_pair_like'] = True
value['component_encodings'] = [
self.as_cloud_object(coders_context, is_pair_like=False),
self.as_cloud_object(coders_context, is_pair_like=False)
]

return value

# We allow .key_coder() and .value_coder() to be called on PickleCoder since
# we can't always infer the return values of lambdas in ParDo operations, the
# result of which may be used in a GroupBykey.
Expand Down Expand Up @@ -983,21 +935,6 @@ def as_deterministic_coder(self, step_label, error_message=None):
def to_type_hint(self):
return Any

def as_cloud_object(self, coders_context=None, is_pair_like=True):
value = super().as_cloud_object(coders_context)
# We currently use this coder in places where we cannot infer the coder to
# use for the value type in a more granular way. In places where the
# service expects a pair, it checks for the "is_pair_like" key, in which
# case we would fail without the hack below.
if is_pair_like:
value['is_pair_like'] = True
value['component_encodings'] = [
self.as_cloud_object(coders_context, is_pair_like=False),
self.as_cloud_object(coders_context, is_pair_like=False)
]

return value

# We allow .key_coder() and .value_coder() to be called on FastPrimitivesCoder
# since we can't always infer the return values of lambdas in ParDo
# operations, the result of which may be used in a GroupBykey.
Expand Down Expand Up @@ -1231,19 +1168,6 @@ def from_type_hint(cls, typehint, registry):
# type: (typehints.TupleConstraint, CoderRegistry) -> TupleCoder
return cls([registry.get_coder(t) for t in typehint.tuple_types])

def as_cloud_object(self, coders_context=None):
if self.is_kv_coder():
return {
'@type': 'kind:pair',
'is_pair_like': True,
'component_encodings': [
component.as_cloud_object(coders_context)
for component in self._get_component_coders()
],
}

return super().as_cloud_object(coders_context)

def _get_component_coders(self):
# type: () -> Tuple[Coder, ...]
return self.coders()
Expand Down Expand Up @@ -1353,15 +1277,6 @@ def as_deterministic_coder(self, step_label, error_message=None):
return type(self)(
self._elem_coder.as_deterministic_coder(step_label, error_message))

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:stream',
'is_stream_like': True,
'component_encodings': [
self._elem_coder.as_cloud_object(coders_context)
],
}

def value_coder(self):
return self._elem_coder

Expand Down Expand Up @@ -1409,11 +1324,6 @@ def __init__(self):
from apache_beam.transforms import window
super().__init__(window.GlobalWindow())

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:global_window',
}


Coder.register_structured_urn(
common_urns.coders.GLOBAL_WINDOW.urn, GlobalWindowCoder)
Expand All @@ -1428,11 +1338,6 @@ def is_deterministic(self):
# type: () -> bool
return True

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:interval_window',
}

def __eq__(self, other):
return type(self) == type(other)

Expand Down Expand Up @@ -1466,16 +1371,6 @@ def is_deterministic(self):
c.is_deterministic() for c in
[self.wrapped_value_coder, self.timestamp_coder, self.window_coder])

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:windowed_value',
'is_wrapper': True,
'component_encodings': [
component.as_cloud_object(coders_context)
for component in self._get_component_coders()
],
}

def _get_component_coders(self):
# type: () -> List[Coder]
return [self.wrapped_value_coder, self.window_coder]
Expand Down Expand Up @@ -1527,10 +1422,6 @@ def is_deterministic(self):
# type: () -> bool
return self.wrapped_value_coder.is_deterministic()

def as_cloud_object(self, coders_context=None):
raise NotImplementedError(
"as_cloud_object not supported for ParamWindowedValueCoder")

def __repr__(self):
return 'ParamWindowedValueCoder[%s]' % self.wrapped_value_coder

Expand Down Expand Up @@ -1577,14 +1468,6 @@ def estimate_size(self, value):
def value_coder(self):
return self._value_coder

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:length_prefix',
'component_encodings': [
self._value_coder.as_cloud_object(coders_context)
],
}

def _get_component_coders(self):
# type: () -> Tuple[Coder, ...]
return (self._value_coder, )
Expand Down Expand Up @@ -1680,14 +1563,6 @@ def is_deterministic(self):
# type: () -> bool
return self._key_coder.is_deterministic()

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:sharded_key',
'component_encodings': [
self._key_coder.as_cloud_object(coders_context)
],
}

def to_type_hint(self):
from apache_beam.typehints import sharded_key_type
return sharded_key_type.ShardedKeyTypeConstraint(
Expand Down Expand Up @@ -1738,14 +1613,6 @@ def _get_component_coders(self) -> List[Coder]:
def is_deterministic(self) -> bool:
return self._window_coder.is_deterministic()

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:custom_window',
'component_encodings': [
self._window_coder.as_cloud_object(coders_context)
],
}

def __repr__(self):
return 'TimestampPrefixingWindowCoder[%r]' % self._window_coder

Expand Down
41 changes: 0 additions & 41 deletions sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,6 @@ def test_timer_coder(self):

def test_tuple_coder(self):
kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:pair',
'is_pair_like': True,
'component_encodings': [
coders.VarIntCoder().as_cloud_object(),
coders.BytesCoder().as_cloud_object()
],
},
kv_coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'\x04abc', kv_coder.encode((4, b'abc')))
# Test unnested
Expand Down Expand Up @@ -424,13 +414,6 @@ def test_utf8_coder(self):

def test_iterable_coder(self):
iterable_coder = coders.IterableCoder(coders.VarIntCoder())
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:stream',
'is_stream_like': True,
'component_encodings': [coders.VarIntCoder().as_cloud_object()]
},
iterable_coder.as_cloud_object())
# Test unnested
self.check_coder(iterable_coder, [1], [-1, 0, 100])
# Test nested
Expand Down Expand Up @@ -507,16 +490,6 @@ def test_windowedvalue_coder_paneinfo(self):
def test_windowed_value_coder(self):
coder = coders.WindowedValueCoder(
coders.VarIntCoder(), coders.GlobalWindowCoder())
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:windowed_value',
'is_wrapper': True,
'component_encodings': [
coders.VarIntCoder().as_cloud_object(),
coders.GlobalWindowCoder().as_cloud_object(),
],
},
coder.as_cloud_object())
# Test binary representation
self.assertEqual(
b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
Expand Down Expand Up @@ -618,8 +591,6 @@ def test_proto_coder(self):
def test_global_window_coder(self):
coder = coders.GlobalWindowCoder()
value = window.GlobalWindow()
# Verify cloud object representation
self.assertEqual({'@type': 'kind:global_window'}, coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'', coder.encode(value))
self.assertEqual(value, coder.decode(b''))
Expand All @@ -630,12 +601,6 @@ def test_global_window_coder(self):

def test_length_prefix_coder(self):
coder = coders.LengthPrefixCoder(coders.BytesCoder())
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:length_prefix',
'component_encodings': [coders.BytesCoder().as_cloud_object()]
},
coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'\x00', coder.encode(b''))
self.assertEqual(b'\x01a', coder.encode(b'a'))
Expand Down Expand Up @@ -725,12 +690,6 @@ def test_sharded_key_coder(self):

for key, bytes_repr, key_coder in key_and_coders:
coder = coders.ShardedKeyCoder(key_coder)
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:sharded_key',
'component_encodings': [key_coder.as_cloud_object()]
},
coder.as_cloud_object())

# Test str repr
self.assertEqual('%s' % coder, 'ShardedKeyCoder[%s]' % key_coder)
Expand Down
9 changes: 0 additions & 9 deletions sdks/python/apache_beam/coders/row_coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

# pytype: skip-file

from google.protobuf import json_format

from apache_beam.coders import typecoders
from apache_beam.coders.coder_impl import LogicalTypeCoderImpl
from apache_beam.coders.coder_impl import RowCoderImpl
Expand Down Expand Up @@ -91,13 +89,6 @@ def as_deterministic_coder(self, step_label, error_message=None):
def to_type_hint(self):
return self._type_hint

def as_cloud_object(self, coders_context=None):
value = super().as_cloud_object(coders_context)

value['schema'] = json_format.MessageToJson(self.schema).encode('utf-8')

return value

def __hash__(self):
return hash(self.schema.SerializeToString())

Expand Down
11 changes: 0 additions & 11 deletions sdks/python/apache_beam/coders/row_coder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from itertools import chain

import numpy as np
from google.protobuf import json_format
from numpy.testing import assert_array_equal

import apache_beam as beam
Expand Down Expand Up @@ -398,16 +397,6 @@ def test_row_coder_fail_early_bad_schema(self):
self.assertRaisesRegex(
ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))

def test_row_coder_cloud_object_schema(self):
schema_proto = schema_pb2.Schema(id='some-cloud-object-schema')
schema_proto_json = json_format.MessageToJson(schema_proto).encode('utf-8')

coder = RowCoder(schema_proto)

cloud_object = coder.as_cloud_object()

self.assertEqual(schema_proto_json, cloud_object['schema'])

def test_batch_encode_decode(self):
coder = RowCoder(typing_to_runner_api(Person).row_type.schema).get_impl()
seq_out = coder_impl.create_OutputStream()
Expand Down
Loading

0 comments on commit ca674aa

Please sign in to comment.