docs(samples): Add Dataflow to Pub/Sub snippet #11104

Merged · 8 commits · Jan 14, 2024
9 changes: 7 additions & 2 deletions dataflow/snippets/batch_write_storage.py
@@ -14,17 +14,22 @@
# limitations under the License.

# [START dataflow_batch_write_to_storage]
import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


-def write_to_cloud_storage(argv=None):
+def write_to_cloud_storage(argv: List[str] = None) -> None:
# Parse the pipeline options passed into the application.
class MyOptions(PipelineOptions):
@classmethod
        # Define a custom pipeline option that specifies the Cloud Storage bucket.
-        def _add_argparse_args(cls, parser):
+        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
parser.add_argument("--output", required=True)

wordsList = ["1", "2", "3", "4"]
42 changes: 42 additions & 0 deletions dataflow/snippets/noxfile_config.py
@@ -0,0 +1,42 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Default TEST_CONFIG_OVERRIDE for python repos.

# You can copy this file into your directory, then it will be imported from
# the noxfile.py.

# The source of truth:
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py

TEST_CONFIG_OVERRIDE = {
# You can opt out from the test for specific Python versions.
"ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.12"],
    # Old samples are opted out of enforcing Python type hints.
    # All new samples should feature them.
    "enforce_type_hints": True,
    # An envvar key for determining the project ID to use. Change it
    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in to using a
    # build-specific Cloud project. You can also use your own string
    # to use your own Cloud project.
"gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
# 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
# If you need to use a specific version of pip,
# change pip_version_override to the string representation
# of the version number, for example, "20.2.4"
"pip_version_override": None,
# A dictionary you want to inject into your test. Don't put any
# secrets here. These values will override predefined values.
"envs": {},
}
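
A note on how this override file is consumed: the shared noxfile.py imports TEST_CONFIG_OVERRIDE and merges it over its defaults. The sketch below approximates that mechanism; it is not the verbatim upstream code, and the authoritative version is the "source of truth" noxfile linked above.

import os

# Defaults mirror the keys documented above; the values here are illustrative.
DEFAULT_CONFIG = {
    "ignored_versions": [],
    "enforce_type_hints": False,
    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
    "pip_version_override": None,
    "envs": {},
}

try:
    # A noxfile_config.py sitting next to the noxfile overrides the defaults.
    from noxfile_config import TEST_CONFIG_OVERRIDE
except ImportError:
    TEST_CONFIG_OVERRIDE = {}

config = {**DEFAULT_CONFIG, **TEST_CONFIG_OVERRIDE}
project_id = os.environ.get(config["gcloud_project_env"], "")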
4 changes: 2 additions & 2 deletions dataflow/snippets/tests/test_batch_write_storage.py
@@ -26,15 +26,15 @@


@pytest.fixture(scope="function")
-def setup_and_teardown():
+def setup_and_teardown() -> None:
try:
bucket = storage_client.create_bucket(bucket_name)
yield
finally:
bucket.delete(force=True)


-def test_write_to_cloud_storage(setup_and_teardown):
+def test_write_to_cloud_storage(setup_and_teardown: None) -> None:
sys.argv = ['', f'--output=gs://{bucket_name}/output/out-']
write_to_cloud_storage()

96 changes: 96 additions & 0 deletions dataflow/snippets/tests/test_write_pubsub.py
@@ -0,0 +1,96 @@
#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time
from unittest.mock import patch
import uuid

from google.cloud import pubsub_v1

import pytest

from ..write_pubsub import write_to_pubsub


topic_id = f'test-topic-{uuid.uuid4()}'
subscription_id = f'{topic_id}-sub'
project_id = os.environ["GOOGLE_CLOUD_PROJECT"]

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

NUM_MESSAGES = 4
TIMEOUT = 60 * 5


@pytest.fixture(scope="function")
def setup_and_teardown() -> None:
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

try:
publisher.create_topic(request={"name": topic_path})
subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)
yield
finally:
subscriber.delete_subscription(
request={"subscription": subscription_path})
publisher.delete_topic(request={"topic": topic_path})


def read_messages() -> list:
received_messages = []
ack_ids = []

    # Read messages from Pub/Sub. It might be necessary to read multiple
    # batches. Use a timeout value to avoid potentially looping forever.
start_time = time.time()
while time.time() - start_time <= TIMEOUT:
# Pull messages from Pub/Sub.
subscription_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
)
        received_messages.extend(response.received_messages)

for received_message in response.received_messages:
ack_ids.append(received_message.ack_id)

# Acknowledge the received messages so they will not be sent again.
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)

        if len(received_messages) >= NUM_MESSAGES:
break

time.sleep(5)

return received_messages


def test_write_to_pubsub(setup_and_teardown: None) -> None:
with patch("sys.argv", [
"", '--streaming', f'--project={project_id}', f'--topic={topic_id}'
]):
write_to_pubsub()

# Read from Pub/Sub to verify the pipeline successfully wrote messages.
# Duplicate reads are possible.
messages = read_messages()
    assert len(messages) >= NUM_MESSAGES
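
Because Pub/Sub delivery is at-least-once, redelivered duplicates can inflate the count that read_messages() returns, which is why the assertion above uses >= rather than ==. If an exact count were ever needed, a hypothetical helper (not part of this PR) could dedupe on the server-assigned message_id before asserting:

def unique_messages(received: list) -> list:
    # Keep one entry per message_id; redeliveries share the same id.
    seen = {}
    for msg in received:
        seen[msg.message.message_id] = msg
    return list(seen.values())

# e.g. assert len(unique_messages(messages)) == NUM_MESSAGES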
75 changes: 75 additions & 0 deletions dataflow/snippets/write_pubsub.py
@@ -0,0 +1,75 @@
#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataflow_pubsub_write_with_attributes]
import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
attributes = {
'buyer': item['name'],
'timestamp': str(item['ts'])
}
data = bytes(item['product'], 'utf-8')

return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:

# Parse the pipeline options passed into the application. Example:
# --project=$PROJECT_ID --topic=$TOPIC_NAME --streaming
# For more information, see
# https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
class MyOptions(PipelineOptions):
@classmethod
# Define custom pipeline options that specify the project ID and Pub/Sub
# topic.
def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
parser.add_argument("--project", required=True)
parser.add_argument("--topic", required=True)

example_data = [
{'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
{'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
{'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
{'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
]
options = MyOptions()

with beam.Pipeline(options=options) as pipeline:
(
pipeline
| "Create elements" >> beam.Create(example_data)
| "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
| WriteToPubSub(
topic=f'projects/{options.project}/topics/{options.topic}',
with_attributes=True)
)

print('Pipeline ran successfully.')
# [END dataflow_pubsub_write_with_attributes]


if __name__ == "__main__":
write_to_pubsub()
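
For a quick local run of this snippet outside the test, the same sys.argv pattern the test uses works. This is a sketch: "my-project" and "my-topic" are placeholders, and the topic must already exist.

import sys

from write_pubsub import write_to_pubsub

# Mirrors the invocation shown in the options comment above.
sys.argv = ["write_pubsub.py", "--streaming",
            "--project=my-project", "--topic=my-topic"]
write_to_pubsub()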