Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unneeded Dataflow Runner v1 code. #27196

Merged
merged 3 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 0 additions & 133 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,29 +280,6 @@ def _get_component_coders(self):
# refined in user-defined Coders.
return []

def as_cloud_object(self, coders_context=None):
"""For internal use only; no backwards-compatibility guarantees.

Returns Google Cloud Dataflow API description of this coder."""
# This is an internal detail of the Coder API and does not need to be
# refined in user-defined Coders.

value = {
# We pass coders in the form "<coder_name>$<pickled_data>" to make the
# job description JSON more readable. Data before the $ is ignored by
# the worker.
'@type': serialize_coder(self),
'component_encodings': [
component.as_cloud_object(coders_context)
for component in self._get_component_coders()
],
}

if coders_context:
value['pipeline_proto_coder_id'] = coders_context.get_id(self)

return value

def __repr__(self):
return self.__class__.__name__

Expand Down Expand Up @@ -493,11 +470,6 @@ def is_deterministic(self):
def to_type_hint(self):
return bytes

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:bytes',
}

def __eq__(self, other):
return type(self) == type(other)

Expand Down Expand Up @@ -667,11 +639,6 @@ def is_deterministic(self):
def to_type_hint(self):
return int

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:varint',
}

def __eq__(self, other):
return type(self) == type(other)

Expand Down Expand Up @@ -846,21 +813,6 @@ def is_deterministic(self):
# GroupByKey operations.
return False

def as_cloud_object(self, coders_context=None, is_pair_like=True):
value = super().as_cloud_object(coders_context)
# We currently use this coder in places where we cannot infer the coder to
# use for the value type in a more granular way. In places where the
# service expects a pair, it checks for the "is_pair_like" key, in which
# case we would fail without the hack below.
if is_pair_like:
value['is_pair_like'] = True
value['component_encodings'] = [
self.as_cloud_object(coders_context, is_pair_like=False),
self.as_cloud_object(coders_context, is_pair_like=False)
]

return value

# We allow .key_coder() and .value_coder() to be called on PickleCoder since
# we can't always infer the return values of lambdas in ParDo operations, the
# result of which may be used in a GroupBykey.
Expand Down Expand Up @@ -983,21 +935,6 @@ def as_deterministic_coder(self, step_label, error_message=None):
def to_type_hint(self):
return Any

def as_cloud_object(self, coders_context=None, is_pair_like=True):
value = super().as_cloud_object(coders_context)
# We currently use this coder in places where we cannot infer the coder to
# use for the value type in a more granular way. In places where the
# service expects a pair, it checks for the "is_pair_like" key, in which
# case we would fail without the hack below.
if is_pair_like:
value['is_pair_like'] = True
value['component_encodings'] = [
self.as_cloud_object(coders_context, is_pair_like=False),
self.as_cloud_object(coders_context, is_pair_like=False)
]

return value

# We allow .key_coder() and .value_coder() to be called on FastPrimitivesCoder
# since we can't always infer the return values of lambdas in ParDo
# operations, the result of which may be used in a GroupBykey.
Expand Down Expand Up @@ -1231,19 +1168,6 @@ def from_type_hint(cls, typehint, registry):
# type: (typehints.TupleConstraint, CoderRegistry) -> TupleCoder
return cls([registry.get_coder(t) for t in typehint.tuple_types])

def as_cloud_object(self, coders_context=None):
if self.is_kv_coder():
return {
'@type': 'kind:pair',
'is_pair_like': True,
'component_encodings': [
component.as_cloud_object(coders_context)
for component in self._get_component_coders()
],
}

return super().as_cloud_object(coders_context)

def _get_component_coders(self):
# type: () -> Tuple[Coder, ...]
return self.coders()
Expand Down Expand Up @@ -1353,15 +1277,6 @@ def as_deterministic_coder(self, step_label, error_message=None):
return type(self)(
self._elem_coder.as_deterministic_coder(step_label, error_message))

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:stream',
'is_stream_like': True,
'component_encodings': [
self._elem_coder.as_cloud_object(coders_context)
],
}

def value_coder(self):
return self._elem_coder

Expand Down Expand Up @@ -1409,11 +1324,6 @@ def __init__(self):
from apache_beam.transforms import window
super().__init__(window.GlobalWindow())

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:global_window',
}


Coder.register_structured_urn(
common_urns.coders.GLOBAL_WINDOW.urn, GlobalWindowCoder)
Expand All @@ -1428,11 +1338,6 @@ def is_deterministic(self):
# type: () -> bool
return True

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:interval_window',
}

def __eq__(self, other):
return type(self) == type(other)

Expand Down Expand Up @@ -1466,16 +1371,6 @@ def is_deterministic(self):
c.is_deterministic() for c in
[self.wrapped_value_coder, self.timestamp_coder, self.window_coder])

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:windowed_value',
'is_wrapper': True,
'component_encodings': [
component.as_cloud_object(coders_context)
for component in self._get_component_coders()
],
}

def _get_component_coders(self):
# type: () -> List[Coder]
return [self.wrapped_value_coder, self.window_coder]
Expand Down Expand Up @@ -1527,10 +1422,6 @@ def is_deterministic(self):
# type: () -> bool
return self.wrapped_value_coder.is_deterministic()

def as_cloud_object(self, coders_context=None):
raise NotImplementedError(
"as_cloud_object not supported for ParamWindowedValueCoder")

def __repr__(self):
return 'ParamWindowedValueCoder[%s]' % self.wrapped_value_coder

Expand Down Expand Up @@ -1577,14 +1468,6 @@ def estimate_size(self, value):
def value_coder(self):
return self._value_coder

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:length_prefix',
'component_encodings': [
self._value_coder.as_cloud_object(coders_context)
],
}

def _get_component_coders(self):
# type: () -> Tuple[Coder, ...]
return (self._value_coder, )
Expand Down Expand Up @@ -1680,14 +1563,6 @@ def is_deterministic(self):
# type: () -> bool
return self._key_coder.is_deterministic()

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:sharded_key',
'component_encodings': [
self._key_coder.as_cloud_object(coders_context)
],
}

def to_type_hint(self):
from apache_beam.typehints import sharded_key_type
return sharded_key_type.ShardedKeyTypeConstraint(
Expand Down Expand Up @@ -1738,14 +1613,6 @@ def _get_component_coders(self) -> List[Coder]:
def is_deterministic(self) -> bool:
return self._window_coder.is_deterministic()

def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:custom_window',
'component_encodings': [
self._window_coder.as_cloud_object(coders_context)
],
}

def __repr__(self):
return 'TimestampPrefixingWindowCoder[%r]' % self._window_coder

Expand Down
41 changes: 0 additions & 41 deletions sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,6 @@ def test_timer_coder(self):

def test_tuple_coder(self):
kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:pair',
'is_pair_like': True,
'component_encodings': [
coders.VarIntCoder().as_cloud_object(),
coders.BytesCoder().as_cloud_object()
],
},
kv_coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'\x04abc', kv_coder.encode((4, b'abc')))
# Test unnested
Expand Down Expand Up @@ -424,13 +414,6 @@ def test_utf8_coder(self):

def test_iterable_coder(self):
iterable_coder = coders.IterableCoder(coders.VarIntCoder())
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:stream',
'is_stream_like': True,
'component_encodings': [coders.VarIntCoder().as_cloud_object()]
},
iterable_coder.as_cloud_object())
# Test unnested
self.check_coder(iterable_coder, [1], [-1, 0, 100])
# Test nested
Expand Down Expand Up @@ -507,16 +490,6 @@ def test_windowedvalue_coder_paneinfo(self):
def test_windowed_value_coder(self):
coder = coders.WindowedValueCoder(
coders.VarIntCoder(), coders.GlobalWindowCoder())
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:windowed_value',
'is_wrapper': True,
'component_encodings': [
coders.VarIntCoder().as_cloud_object(),
coders.GlobalWindowCoder().as_cloud_object(),
],
},
coder.as_cloud_object())
# Test binary representation
self.assertEqual(
b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
Expand Down Expand Up @@ -618,8 +591,6 @@ def test_proto_coder(self):
def test_global_window_coder(self):
coder = coders.GlobalWindowCoder()
value = window.GlobalWindow()
# Verify cloud object representation
self.assertEqual({'@type': 'kind:global_window'}, coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'', coder.encode(value))
self.assertEqual(value, coder.decode(b''))
Expand All @@ -630,12 +601,6 @@ def test_global_window_coder(self):

def test_length_prefix_coder(self):
coder = coders.LengthPrefixCoder(coders.BytesCoder())
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:length_prefix',
'component_encodings': [coders.BytesCoder().as_cloud_object()]
},
coder.as_cloud_object())
# Test binary representation
self.assertEqual(b'\x00', coder.encode(b''))
self.assertEqual(b'\x01a', coder.encode(b'a'))
Expand Down Expand Up @@ -725,12 +690,6 @@ def test_sharded_key_coder(self):

for key, bytes_repr, key_coder in key_and_coders:
coder = coders.ShardedKeyCoder(key_coder)
# Verify cloud object representation
self.assertEqual({
'@type': 'kind:sharded_key',
'component_encodings': [key_coder.as_cloud_object()]
},
coder.as_cloud_object())

# Test str repr
self.assertEqual('%s' % coder, 'ShardedKeyCoder[%s]' % key_coder)
Expand Down
9 changes: 0 additions & 9 deletions sdks/python/apache_beam/coders/row_coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

# pytype: skip-file

from google.protobuf import json_format

from apache_beam.coders import typecoders
from apache_beam.coders.coder_impl import LogicalTypeCoderImpl
from apache_beam.coders.coder_impl import RowCoderImpl
Expand Down Expand Up @@ -91,13 +89,6 @@ def as_deterministic_coder(self, step_label, error_message=None):
def to_type_hint(self):
return self._type_hint

def as_cloud_object(self, coders_context=None):
value = super().as_cloud_object(coders_context)

value['schema'] = json_format.MessageToJson(self.schema).encode('utf-8')

return value

def __hash__(self):
return hash(self.schema.SerializeToString())

Expand Down
10 changes: 0 additions & 10 deletions sdks/python/apache_beam/coders/row_coder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,16 +398,6 @@ def test_row_coder_fail_early_bad_schema(self):
self.assertRaisesRegex(
ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))

def test_row_coder_cloud_object_schema(self):
schema_proto = schema_pb2.Schema(id='some-cloud-object-schema')
schema_proto_json = json_format.MessageToJson(schema_proto).encode('utf-8')

coder = RowCoder(schema_proto)

cloud_object = coder.as_cloud_object()

self.assertEqual(schema_proto_json, cloud_object['schema'])

def test_batch_encode_decode(self):
coder = RowCoder(typing_to_runner_api(Person).row_type.schema).get_impl()
seq_out = coder_impl.create_OutputStream()
Expand Down
Loading