diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index cb00a4b91ecd..fd7473e1e53b 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -17,6 +17,7 @@ import datetime import itertools import operator as op +import os import threading import time @@ -488,6 +489,45 @@ def test_streaming_pull_max_messages( finally: subscription_future.cancel() # trigger clean shutdown + @pytest.mark.skipif( + "KOKORO_GFILE_DIR" not in os.environ, + reason="Requires Kokoro environment with a limited subscriber service account.", + ) + def test_streaming_pull_subscriber_permissions_sufficient( + self, publisher, topic_path, subscriber, subscription_path, cleanup + ): + + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # create a topic and subscribe to it + publisher.create_topic(topic_path) + subscriber.create_subscription(subscription_path, topic_path) + + # A service account granting only the pubsub.subscriber role must be used. + filename = os.path.join( + os.environ["KOKORO_GFILE_DIR"], "pubsub-subscriber-service-account.json" + ) + streaming_pull_subscriber = type(subscriber).from_service_account_file(filename) + + # Subscribe to the topic, publish a message, and verify that subscriber + # successfully pulls and processes it. + callback = StreamingPullCallback(processing_time=0.01, resolve_at_msg_count=1) + future = streaming_pull_subscriber.subscribe(subscription_path, callback) + self._publish_messages(publisher, topic_path, batch_sizes=[1]) + + try: + callback.done_future.result(timeout=10) + except exceptions.TimeoutError: + pytest.fail( + "Timeout: receiving/processing streamed messages took too long." + ) + else: + assert 1 in callback.seen_message_ids + finally: + future.cancel() + def _publish_messages(self, publisher, topic_path, batch_sizes): """Publish ``count`` messages in batches and wait until completion.""" publish_futures = []