Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests.system.TestStreamingPull: test_streaming_pull_max_messages[rest-rest] failed #953

Closed
flaky-bot bot opened this issue Jul 14, 2023 · 2 comments
Closed
Assignees
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. flakybot: flaky Tells the Flaky Bot not to close or comment on this issue. flakybot: issue An issue filed by the Flaky Bot. Should not be added manually. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@flaky-bot
Copy link

flaky-bot bot commented Jul 14, 2023

This test failed!

To configure my behavior, see the Flaky Bot documentation.

If I'm commenting on this issue too often, add the flakybot: quiet label and
I will stop commenting.


commit: 26d2b39
buildURL: Build Status, Sponge
status: failed

Test output
self = 
publisher = 
topic_path_base = 'projects/precise-truck-742/topics/t-1689361995945'
subscription_path_base = 'projects/precise-truck-742/subscriptions/s-1689361995947'
cleanup = [(>, ()...39ab0>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1689361995947-streaming-pull-max-messages'})]
def test_streaming_pull_max_messages(
    self, publisher, topic_path_base, subscription_path_base, cleanup
):
    subscriber = pubsub_v1.SubscriberClient(transport="grpc")
    custom_str = "-streaming-pull-max-messages"
    topic_path = topic_path_base + custom_str
    subscription_path = subscription_path_base + custom_str
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # create a topic and subscribe to it
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)

    batch_sizes = (7, 4, 8, 2, 10, 1, 3, 8, 6, 1)  # total: 50
    _publish_messages(publisher, topic_path, batch_sizes=batch_sizes)

    # now subscribe and do the main part, check for max pending messages
    total_messages = sum(batch_sizes)
    flow_control = types.FlowControl(max_messages=5)
    callback = StreamingPullCallback(
        processing_time=1, resolve_at_msg_count=total_messages
    )

    subscription_future = subscriber.subscribe(
        subscription_path, callback, flow_control=flow_control
    )

    # Expected time to process all messages in ideal case:
    #     (total_messages / FlowControl.max_messages) * processing_time
    #
    # With total=50, max messages=5, and processing_time=1 this amounts to
    # 10 seconds (+ overhead), thus a full minute should be more than enough
    # for the processing to complete. If not, fail the test with a timeout.
    try:
      callback.done_future.result(timeout=60)

tests/system.py:654:


self = None, timeout = 60

def result(self, timeout=None):
    """Return the result of the call that the future represents.

    Args:
        timeout: The number of seconds to wait for the result if the future
            isn't done. If None, then there is no limit on the wait time.

    Returns:
        The result of the call that the future represents.

    Raises:
        CancelledError: If the future was cancelled.
        TimeoutError: If the future didn't finish executing before the given
            timeout.
        Exception: If the call raised then that exception will be raised.
    """
    try:
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
              raise TimeoutError()

E concurrent.futures._base.TimeoutError

/usr/local/lib/python3.10/concurrent/futures/_base.py:448: TimeoutError

During handling of the above exception, another exception occurred:

self = <tests.system.TestStreamingPull object at 0x7f55475b28f0>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7f5544354a60>
topic_path_base = 'projects/precise-truck-742/topics/t-1689361995945'
subscription_path_base = 'projects/precise-truck-742/subscriptions/s-1689361995947'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7f5544354a60>>, ()...39ab0>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1689361995947-streaming-pull-max-messages'})]

def test_streaming_pull_max_messages(
    self, publisher, topic_path_base, subscription_path_base, cleanup
):
    subscriber = pubsub_v1.SubscriberClient(transport="grpc")
    custom_str = "-streaming-pull-max-messages"
    topic_path = topic_path_base + custom_str
    subscription_path = subscription_path_base + custom_str
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # create a topic and subscribe to it
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)

    batch_sizes = (7, 4, 8, 2, 10, 1, 3, 8, 6, 1)  # total: 50
    _publish_messages(publisher, topic_path, batch_sizes=batch_sizes)

    # now subscribe and do the main part, check for max pending messages
    total_messages = sum(batch_sizes)
    flow_control = types.FlowControl(max_messages=5)
    callback = StreamingPullCallback(
        processing_time=1, resolve_at_msg_count=total_messages
    )

    subscription_future = subscriber.subscribe(
        subscription_path, callback, flow_control=flow_control
    )

    # Expected time to process all messages in ideal case:
    #     (total_messages / FlowControl.max_messages) * processing_time
    #
    # With total=50, max messages=5, and processing_time=1 this amounts to
    # 10 seconds (+ overhead), thus a full minute should be more than enough
    # for the processing to complete. If not, fail the test with a timeout.
    try:
        callback.done_future.result(timeout=60)
    except exceptions.TimeoutError:
      pytest.fail(
            "Timeout: receiving/processing streamed messages took too long."
        )

E Failed: Timeout: receiving/processing streamed messages took too long.

tests/system.py:656: Failed

@flaky-bot flaky-bot bot added flakybot: issue An issue filed by the Flaky Bot. Should not be added manually. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Jul 14, 2023
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Jul 14, 2023
@flaky-bot flaky-bot bot added the flakybot: flaky Tells the Flaky Bot not to close or comment on this issue. label Jul 14, 2023
@flaky-bot
Copy link
Author

flaky-bot bot commented Jul 14, 2023

Looks like this issue is flaky. 😟

I'm going to leave this open and stop commenting.

A human should fix and close this.


When run at the same commit (26d2b39), this test passed in one build (Build Status, Sponge) and failed in another build (Build Status, Sponge).

@acocuzzo acocuzzo added priority: p2 Moderately-important priority. Fix may not be included in next release. and removed priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. labels Jul 24, 2023
@liuyunnnn liuyunnnn assigned pradn and unassigned liuyunnnn Oct 24, 2023
@parthea
Copy link
Contributor

parthea commented Dec 11, 2023

Closing as obsolete. Flaky bot will re-open if the test is still flaky

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. flakybot: flaky Tells the Flaky Bot not to close or comment on this issue. flakybot: issue An issue filed by the Flaky Bot. Should not be added manually. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
Projects
None yet
Development

No branches or pull requests

4 participants