Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YAML] Add PubSub reading and writing transforms. #28595

Merged
merged 9 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 87 additions & 2 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,55 @@ def read_from_pubsub(
schema: Optional[Any] = None,
attributes: Optional[Iterable[str]] = None,
attributes_map: Optional[str] = None,
id_attribute: Optional[str] = None,
timestamp_attribute: Optional[str] = None):
robertwb marked this conversation as resolved.
Show resolved Hide resolved
"""Reads messages from Cloud Pub/Sub.

Args:
topic: Cloud Pub/Sub topic in the form
"projects/<project>/topics/<topic>". If provided, subscription must be
None.
subscription: Existing Cloud Pub/Sub subscription to use in the
form "projects/<project>/subscriptions/<subscription>". If not
specified, a temporary subscription will be created from the specified
topic. If provided, topic must be None.
format: The expected format of the message payload. Currently supported
formats are

- raw: Produces records with a single `payload` field whose contents
are the raw bytes of the pubsub message.

schema: Schema specification for the given format.
attributes: List of attribute keys whose values will be flattened into the
output message as additional fields. For example, if the format is `raw`
and attributes is `["a", "b"]` then this read will produce elements of
the form `Row(payload=..., a=..., b=...)`.
attributes_map: Name of a field in which to store the full set of attributes
associated with this message. For example, if the format is `raw` and
`attributes_map` is set to `"attrs"` then this read will produce elements
of the form `Row(payload=..., attrs=...)` where `attrs` is a Map type
of string to string.
If both `attributes` and `attributes_map` are set, the overlapping
attribute values will be present in both the flattened structure and the
attribute map.
id_attribute: The attribute on incoming Pub/Sub messages to use as a unique
record identifier. When specified, the value of this attribute (which
can be any string that uniquely identifies the record) will be used for
deduplication of messages. If not provided, we cannot guarantee
that no duplicate data will be delivered on the Pub/Sub stream. In this
case, deduplication of the stream will be strictly best effort.
timestamp_attribute: Name of the message attribute whose value is used as
the element timestamp. If None, uses message publishing time as the
timestamp.

Timestamp values should be in one of two formats:

- A numerical value representing the number of milliseconds since the
Unix epoch.
- A string in RFC 3339 format, UTC timezone. Example:
``2015-10-29T23:41:41.123Z``. The sub-second component of the
timestamp is optional, and digits beyond the first three (i.e., time
units smaller than milliseconds) may be ignored.
"""
if topic and subscription:
raise TypeError('Only one of topic and subscription may be specified.')
elif not topic and not subscription:
Expand Down Expand Up @@ -200,6 +248,7 @@ def mapper(msg):
topic=topic,
robertwb marked this conversation as resolved.
Show resolved Hide resolved
subscription=subscription,
with_attributes=bool(attributes or attributes_map),
id_label=id_attribute,
timestamp_attribute=timestamp_attribute)
| 'ParseMessage' >> beam.Map(mapper))
robertwb marked this conversation as resolved.
Show resolved Hide resolved
output.element_type = schemas.named_tuple_from_schema(
Expand All @@ -217,8 +266,41 @@ def write_to_pubsub(
schema: Optional[Any] = None,
attributes: Optional[Iterable[str]] = None,
attributes_map: Optional[str] = None,
id_attribute: Optional[str] = None,
timestamp_attribute: Optional[str] = None):

"""Writes messages to Cloud Pub/Sub.

Args:
topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
format: How to format the message payload. Currently supported
formats are

- raw: Expects a message with a single field (excluding
attribute-related fields) whose contents are used as the raw bytes
of the pubsub message.

schema: Schema specification for the given format.
attributes: List of attribute keys whose values will be pulled out as
PubSub message attributes. For example, if the format is `raw`
and attributes is `["a", "b"]` then elements of the form
`Row(any_field=..., a=..., b=...)` will result in PubSub messages whose
payload has the contents of any_field and whose attribute will be
populated with the values of `a` and `b`.
attributes_map: Name of a string-to-string map field in which to pull a set
of attributes associated with this message. For example, if the format
is `raw` and `attributes_map` is set to `"attrs"` then elements of the form
`Row(any_field=..., attrs=...)` will result in PubSub messages whose
payload has the contents of any_field and whose attribute will be
populated with the values from attrs.
If both `attributes` and `attributes_map` are set, the union of attributes
from these two sources will be used to populate the PubSub message
attributes.
id_attribute: If set, will set an attribute for each Cloud Pub/Sub message
with the given name and a unique value. This attribute can then be used
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the value.
"""
input_schema = schemas.schema_from_element_type(pcoll.element_type)

extra_fields = []
Expand Down Expand Up @@ -256,7 +338,10 @@ def attributes_extractor(row):
lambda row: beam.io.gcp.pubsub.PubsubMessage(
formatter(row), attributes_extractor(row)))
| beam.io.WriteToPubSub(
topic, with_attributes=True, timestamp_attribute=timestamp_attribute))
topic,
with_attributes=True,
id_label=id_attribute,
timestamp_attribute=timestamp_attribute))


def io_providers():
Expand Down
65 changes: 61 additions & 4 deletions sdks/python/apache_beam/yaml/yaml_io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,27 @@

class FakeReadFromPubSub:
robertwb marked this conversation as resolved.
Show resolved Hide resolved
def __init__(
self, topic, messages, subscription=None, timestamp_attribute=None):
self,
topic,
messages,
subscription=None,
id_attribute=None,
timestamp_attribute=None):
self._topic = topic
self._messages = messages
self._id_attribute = id_attribute
self._timestamp_attribute = timestamp_attribute

def __call__(
self, *, topic, subscription, with_attributes, timestamp_attribute):
self,
*,
topic,
subscription,
with_attributes,
id_label,
timestamp_attribute):
assert topic == self._topic
assert id_label == self._id_attribute
assert timestamp_attribute == self._timestamp_attribute
assert subscription == subscription
if with_attributes:
Expand All @@ -50,14 +63,17 @@ def __call__(


class FakeWriteToPubSub:
def __init__(self, topic, messages, timestamp_attribute=None):
def __init__(
self, topic, messages, id_attribute=None, timestamp_attribute=None):
self._topic = topic
self._messages = messages
self._id_attribute = id_attribute
self._timestamp_attribute = timestamp_attribute

def __call__(self, topic, *, with_attributes, timestamp_attribute):
def __call__(self, topic, *, with_attributes, id_label, timestamp_attribute):
robertwb marked this conversation as resolved.
Show resolved Hide resolved
assert topic == self._topic
assert with_attributes == True
assert id_label == self._id_attribute
assert timestamp_attribute == self._timestamp_attribute
return AssertThat(equal_to(self._messages))

Expand Down Expand Up @@ -131,6 +147,27 @@ def test_read_with_attribute_map(self):
beam.Row(payload=b'msg2', attrMap={'attr': 'value2'})
]))

def test_read_with_id_attribute(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
with mock.patch('apache_beam.io.ReadFromPubSub',
FakeReadFromPubSub(
topic='my_topic',
messages=[PubsubMessage(b'msg1', {'attr': 'value1'}),
PubsubMessage(b'msg2', {'attr': 'value2'})],
id_attribute='some_attr')):
result = p | YamlTransform(
'''
type: ReadFromPubSub
config:
topic: my_topic
format: raw
id_attribute: some_attr
''')
assert_that(
result,
equal_to([beam.Row(payload=b'msg1'), beam.Row(payload=b'msg2')]))

def test_simple_write(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
Expand Down Expand Up @@ -195,6 +232,26 @@ def test_write_with_attribute_map(self):
attributes_map: attrMap
'''))

def test_write_with_id_attribute(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
with mock.patch('apache_beam.io.WriteToPubSub',
FakeWriteToPubSub(topic='my_topic',
messages=[PubsubMessage(b'msg1', {}),
PubsubMessage(b'msg2', {})],
id_attribute='some_attr')):
_ = (
p | beam.Create([beam.Row(a=b'msg1'), beam.Row(a=b'msg2')])
| YamlTransform(
'''
type: WriteToPubSub
input: input
config:
topic: my_topic
format: raw
id_attribute: some_attr
'''))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down