diff --git a/samples/snippets/schema_test.py b/samples/snippets/schema_test.py index f0a4470f7..1e0dc8f1a 100644 --- a/samples/snippets/schema_test.py +++ b/samples/snippets/schema_test.py @@ -17,6 +17,7 @@ import os import uuid +from flaky import flaky from google.api_core.exceptions import NotFound from google.cloud.pubsub import PublisherClient, SchemaServiceClient, SubscriberClient from google.pubsub_v1.types import Encoding @@ -251,6 +252,7 @@ def test_subscribe_with_proto_schema( assert "Received a binary-encoded message" in out +@flaky(max_runs=3, min_passes=1) def test_delete_schema(proto_schema, capsys): schema.delete_schema(PROJECT_ID, PROTO_SCHEMA_ID) out, _ = capsys.readouterr() diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index d01860cf8..011414296 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -499,7 +499,7 @@ def receive_messages_with_blocking_shutdown(project_id, subscription_id, timeout def callback(message): print(f"Received {message.data}.") - time.sleep(timeout + 5.0) # Pocess longer than streaming pull future timeout. + time.sleep(timeout + 3.0) # Pocess longer than streaming pull future timeout. message.ack() print(f"Done processing the message {message.data}.") diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 8d034949d..20355fe2b 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -425,21 +425,26 @@ def test_receive_with_blocking_shutdown( if re.search(r".*done waiting.*stream shutdown.*", line, flags=re.IGNORECASE) ] - assert "Listening" in out - assert subscription_async in out + try: + assert "Listening" in out + assert subscription_async in out - assert len(stream_canceled_lines) == 1 - assert len(shutdown_done_waiting_lines) == 1 - assert len(msg_received_lines) == 3 - assert len(msg_done_lines) == 3 + assert len(stream_canceled_lines) == 1 + assert len(shutdown_done_waiting_lines) == 1 + assert len(msg_received_lines) == 3 + assert len(msg_done_lines) == 3 - # The stream should have been canceled *after* receiving messages, but before - # message processing was done. - assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0] + # The stream should have been canceled *after* receiving messages, but before + # message processing was done. + assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0] - # Yet, waiting on the stream shutdown should have completed *after* the processing - # of received messages has ended. - assert msg_done_lines[-1] < shutdown_done_waiting_lines[0] + # Yet, waiting on the stream shutdown should have completed *after* + # the processing of received messages has ended. + assert msg_done_lines[-1] < shutdown_done_waiting_lines[0] + except AssertionError: # pragma: NO COVER + from pprint import pprint + pprint(out_lines) # To make possible flakiness debugging easier. + raise def test_listen_for_errors(publisher_client, topic, subscription_async, capsys): @@ -464,6 +469,7 @@ def test_receive_synchronously(publisher_client, topic, subscription_sync, capsy assert f"{subscription_sync}" in out +@flaky(max_runs=3, min_passes=1) def test_receive_synchronously_with_lease( publisher_client, topic, subscription_sync, capsys ):