Skip to content

Commit

Permalink
docs: Add ingestion from GCS sample (#1273)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelpri10 authored Oct 10, 2024
1 parent c240fde commit b59cc8d
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 1 deletion.
105 changes: 105 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,88 @@ def create_topic_with_kinesis_ingestion(
# [END pubsub_create_topic_with_kinesis_ingestion]


def create_topic_with_cloud_storage_ingestion(
project_id: str,
topic_id: str,
bucket: str,
input_format: str,
text_delimiter: str,
match_glob: str,
minimum_object_create_time: str,
) -> None:
"""Create a new Pub/Sub topic with Cloud Storage Ingestion Settings."""
# [START pubsub_create_topic_with_cloud_storage_ingestion]
from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text" (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
bucket=bucket,
)
if input_format == "text":
cloud_storage_settings.text_format = (
IngestionDataSourceSettings.CloudStorage.TextFormat(
delimiter=text_delimiter
)
)
elif input_format == "avro":
cloud_storage_settings.avro_format = (
IngestionDataSourceSettings.CloudStorage.AvroFormat()
)
elif input_format == "pubsub_avro":
cloud_storage_settings.pubsub_avro_format = (
IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
)
else:
print(
"Invalid input_format: "
+ input_format
+ "; must be in ('text', 'avro', 'pubsub_avro')"
)
return

if match_glob:
cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
try:
minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
minimum_object_create_time_timestamp.FromJsonString(
minimum_object_create_time
)
cloud_storage_settings.minimum_object_create_time = (
minimum_object_create_time_timestamp
)
except ValueError:
print("Invalid minimum_object_create_time: " + minimum_object_create_time)
return

request = Topic(
name=topic_path,
ingestion_data_source_settings=IngestionDataSourceSettings(
cloud_storage=cloud_storage_settings,
),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")
# [END pubsub_create_topic_with_cloud_storage_ingestion]


def update_topic_type(
project_id: str,
topic_id: str,
Expand Down Expand Up @@ -615,6 +697,19 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
create_topic_with_kinesis_ingestion_parser.add_argument("aws_role_arn")
create_topic_with_kinesis_ingestion_parser.add_argument("gcp_service_account")

create_topic_with_cloud_storage_ingestion_parser = subparsers.add_parser(
"create_cloud_storage_ingestion",
help=create_topic_with_cloud_storage_ingestion.__doc__,
)
create_topic_with_cloud_storage_ingestion_parser.add_argument("topic_id")
create_topic_with_cloud_storage_ingestion_parser.add_argument("bucket")
create_topic_with_cloud_storage_ingestion_parser.add_argument("input_format")
create_topic_with_cloud_storage_ingestion_parser.add_argument("text_delimiter")
create_topic_with_cloud_storage_ingestion_parser.add_argument("match_glob")
create_topic_with_cloud_storage_ingestion_parser.add_argument(
"minimum_object_create_time"
)

update_topic_type_parser = subparsers.add_parser(
"update_kinesis_ingestion", help=update_topic_type.__doc__
)
Expand Down Expand Up @@ -693,6 +788,16 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
args.aws_role_arn,
args.gcp_service_account,
)
elif args.command == "create_cloud_storage_ingestion":
create_topic_with_cloud_storage_ingestion(
args.project_id,
args.topic_id,
args.bucket,
args.input_format,
args.text_delimiter,
args.match_glob,
args.minimum_object_create_time,
)
elif args.command == "update_kinesis_ingestion":
update_topic_type(
args.project_id,
Expand Down
34 changes: 34 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,40 @@ def test_create_topic_with_kinesis_ingestion(
publisher_client.delete_topic(request={"topic": topic_path})


def test_create_topic_with_cloud_storage_ingestion(
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
# The scope of `topic_path` is limited to this function.
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)

bucket = "pubsub-cloud-storage-bucket"
input_format = "text"
text_delimiter = ","
match_glob = "**.txt"
minimum_object_create_time = "1970-01-01T00:00:01Z"

try:
publisher_client.delete_topic(request={"topic": topic_path})
except NotFound:
pass

publisher.create_topic_with_cloud_storage_ingestion(
PROJECT_ID,
TOPIC_ID,
bucket,
input_format,
text_delimiter,
match_glob,
minimum_object_create_time,
)

out, _ = capsys.readouterr()
assert f"Created topic: {topic_path} with Cloud Storage Ingestion Settings" in out

# Clean up resource created for the test.
publisher_client.delete_topic(request={"topic": topic_path})


def test_update_topic_type(
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
google-cloud-pubsub==2.25.0
google-cloud-pubsub==2.26.0
avro==1.12.0
protobuf===4.24.4; python_version == '3.7'
protobuf==5.28.0; python_version >= '3.8'
Expand Down

0 comments on commit b59cc8d

Please sign in to comment.