
Exception in subscribe - google cloud pubsub 0.35.4 #5792

Closed
letusfly85 opened this issue Aug 13, 2018 · 4 comments
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@letusfly85

Summary:

Hi, thank you for the awesome Google Cloud Pub/Sub Python library. 😃
I have a question about this library.

  • I have been running a publisher process with Google Cloud Pub/Sub since last week.
  • I am also running a subscriber process with Google Cloud Pub/Sub.
  • About 8 to 9 hours after the subscriber started, I got the output below on stdout.
  • I use a logger and try/except, but I couldn't catch or log these errors.
  • I want to keep subscribing after these errors occur. How should I do that? 👀
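The warnings and tracebacks below come from the library's own loggers (for example `google.cloud.pubsub_v1.subscriber._protocol.leaser`) and, judging by the `ERROR:root:` prefix, from gRPC logging on the root logger. A handler only receives records from its own logger and that logger's descendants, so the file handler attached to the `subscribe_thread` logger in the program below never sees them. The exceptions are also raised on the library's background threads, so a try/except around `subscribe()` cannot catch them. A minimal sketch, assuming you just want those records in a file as well, is to attach a handler to the root logger; `route_library_logs` and the log path are hypothetical names.

```python
import logging


def route_library_logs(path, level=logging.WARNING):
    """Write records from the Pub/Sub and gRPC libraries to `path`.

    Library loggers such as google.cloud.pubsub_v1.subscriber._protocol.leaser
    propagate to the root logger by default, and gRPC's
    "Exception iterating requests!" is logged on the root logger itself,
    so a handler attached to the root logger receives both.
    """
    handler = logging.FileHandler(path)
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logging.getLogger().addHandler(handler)  # attach to the root logger
    return handler
```

Call it once at startup, for example `route_library_logs('log/library.log')`. The root logger's default level is WARNING, so passing a lower `level` would also require `logging.getLogger().setLevel(...)`.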

Environment:

  • Ubuntu 18.04 (Docker)
  • Python 3.6.5 :: Anaconda, Inc.
  • google-cloud-pubsub 0.35.4

Warning and Error Stdout:

WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
WARNING:google.cloud.pubsub_v1.subscriber._protocol.leaser:Dropping 1 items because they were leased too long.
ERROR:root:Exception iterating requests!
Traceback (most recent call last):
  File "/root/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/grpc/_channel.py", line 165, in consume_request_iterator
    request = next(request_iterator)
  File "/root/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 100, in __iter__
    yield self._initial_request()
  File "/root/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 380, in _get_initial_request
    lease_ids = self._leaser.ack_ids
AttributeError: 'NoneType' object has no attribute 'ack_ids'
ERROR:root:Exception iterating requests!
Traceback (most recent call last):
  File "/root/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/grpc/_channel.py", line 165, in consume_request_iterator
    request = next(request_iterator)
  File "/root/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 100, in __iter__
    yield self._initial_request()
  File "/root/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 380, in _get_initial_request
    lease_ids = self._leaser.ack_ids

requirements.txt:

google-cloud-translate==1.3.1
google-cloud-videointelligence==1.0.1
google-cloud-vision==0.29.0
google-gax==0.15.16
google-resumable-media==0.3.1
googleapis-common-protos==1.5.3
GPy==1.9.2
grpc-google-iam-v1==0.11.4
grpcio==1.12.0
h5py==2.8.0
html5lib==0.9999999
httplib2==0.11.3
idna==2.6
ipykernel==4.8.2
ipython==6.4.0
ipython-genutils==0.2.0
jedi==0.12.0
jsonlines==1.2.0
jsonpickle==0.9.6
jsonref==0.1
jsonschema==2.6.0
google-api-core==1.3.0
google-auth==1.5.0
google-cloud==0.33.1
google-cloud-bigquery==1.5.0
google-cloud-bigquery-datatransfer==0.1.1
google-cloud-bigtable==0.28.1
google-cloud-container==0.1.1
google-cloud-core==0.28.1
google-cloud-datastore==1.4.0
google-cloud-dns==0.28.0
google-cloud-error-reporting==0.28.0
google-cloud-firestore==0.28.0
google-cloud-language==1.0.2
google-cloud-logging==1.4.0
google-cloud-monitoring==0.28.1
google-cloud-pubsub==0.35.4
google-cloud-resource-manager==0.28.1
google-cloud-runtimeconfig==0.28.1
google-cloud-spanner==0.29.0
google-cloud-speech==0.30.0
google-cloud-storage==1.6.0
google-cloud-trace==0.17.0

Program:

from threading import Thread, Event
from google.cloud import pubsub
import os
import json
from logging import ERROR, INFO
import logging

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/my_credential.json"


class MyLogger:

    @staticmethod
    def generate_logger(clazz_name):
        logger = logging.getLogger(clazz_name)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s')

        file_handler = logging.FileHandler('log/subscriber.log')
        file_handler.setLevel(INFO)
        file_handler.setFormatter(formatter)
        file_handler.addFilter(LoggingFilter(ERROR))
        logger.addHandler(file_handler)

        logger.setLevel(INFO)
        logger.propagate = False

        return logger


class LoggingFilter(object):
    def __init__(self, level):
        self.__level = level

    def filter(self, log_record):
        return log_record.levelno <= self.__level


class SubscribeThread(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.logger = MyLogger.generate_logger('subscribe_thread')
        self.latest_data = {}
        self.event = Event()

        self.subscriber = pubsub.SubscriberClient()
        project_name = '************'
        self.topic_name = 'projects/{}/topics/***********'.format(project_name)
        subscription_path = '***********'

        try:
            self.logger.info('deleting exists subscription')
            subscription = self.subscriber.subscription_path(project_name, subscription_path)
            self.subscriber.delete_subscription(subscription)
        except Exception as e:
            self.logger.error(e)

        self.subscription_name = 'projects/{}/subscriptions/{}'.format(project_name, subscription_path)
        try:
            self.logger.info('creating subscription')
            self.subscriber.create_subscription(name=self.subscription_name, topic=self.topic_name)

            self.logger.info('start subscription')
            self.subscriber.subscribe(self.subscription_name, self.callback)
        except Exception as e:
            self.logger.error(e)

    def callback(self, message):
        try:
            data = json.loads(message.data.decode())
            self.latest_data = data
            self.event.set()
            # message.ack()
        except Exception as e:
            self.logger.error(e)

    def get_latest_data(self):
        return self.latest_data

    def run(self):
        while True:
            try:
                while not self.event.wait(3):
                    self.logger.info('waiting subscriber timed out..')
                    self.event.clear()
                    raise RuntimeError("waiting subscriber timed out..")

                latest_data = self.get_latest_data()
                # NOTE: do something with latest_data

                self.event.clear()

            except Exception as e:
                self.logger.error(e)
                self.subscriber = pubsub.SubscriberClient()
                self.subscriber.subscribe(self.subscription_name, self.callback)


subscriber_thread = SubscribeThread()
subscriber_thread.start()

If you want more info, please let me know.

@JustinBeckwith JustinBeckwith added the triage me I really want to be triaged. label Aug 14, 2018
@letusfly85
Author

Sorry, this may be my mistake: I wasn't calling the ack() method for each message.
As a result, many messages kept coming into the callback, and CPU usage and network input increased.

I'm now running the program above with ack().
If I don't see the same error by tomorrow, I'll close this issue 🙇
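In the program above, `message.ack()` is commented out in `callback`, so messages are never settled. A minimal sketch of a callback that always settles each message, acking after the work succeeds and nacking if decoding or the work fails, could look like this. `make_callback` and `on_data` are hypothetical names, and the sketch assumes the installed `Message` exposes `nack()` alongside `ack()`, as the maintainer's "acking or nacking" reply implies.

```python
import json
import logging

logger = logging.getLogger('subscribe_thread')


def make_callback(on_data):
    """Build a Pub/Sub callback that settles every message it receives.

    on_data (hypothetical) receives the decoded JSON payload. The message
    is acked only if decoding and on_data() both succeed; otherwise it is
    nacked so Pub/Sub can redeliver it instead of it staying leased.
    """
    def callback(message):
        try:
            data = json.loads(message.data.decode('utf-8'))
            on_data(data)
        except Exception:
            logger.exception('failed to handle message; nacking it')
            message.nack()
        else:
            message.ack()
    return callback
```

In `SubscribeThread`, the current body of `callback` (storing `latest_data` and setting the event) would become the `on_data` function.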

@letusfly85
Author

However, if possible, I'd like to handle errors like this in my own code 💦

@theacodes
Contributor

Yeah, we should fix that codepath so it doesn't totally choke, but the biggest issue here is definitely that you're not acking or nacking your messages.

If you want to be able to handle errors, you need to keep track of the future returned by subscribe and call result() on it, for example:

future = self.subscriber.subscribe(...)
future.result() # blocks forever or until an exception occurs and then raises that exception
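Building on that snippet, a minimal sketch of the "keep subscribing after errors" loop from the original question is to block on the future, log whatever it raises, and subscribe again after a short delay. It assumes only what the reply states: `subscribe(subscription, callback)` returns a future whose `result()` blocks and re-raises the error that stopped the stream. `subscribe_forever`, `retry_delay` and `max_restarts` are hypothetical names.

```python
import logging
import time

logger = logging.getLogger('subscribe_thread')


def subscribe_forever(subscriber, subscription_name, callback,
                      retry_delay=5.0, max_restarts=None):
    """Subscribe, block on the returned future, and resubscribe on failure.

    Returns whatever future.result() returns if the stream ends without
    an error. Re-raises the last error once max_restarts restarts have
    been used up (None means restart forever).
    """
    restarts = 0
    while True:
        future = subscriber.subscribe(subscription_name, callback)
        try:
            # Blocks until the streaming pull stops, re-raising its error.
            return future.result()
        except Exception:
            if max_restarts is not None and restarts >= max_restarts:
                raise
            restarts += 1
            logger.exception('streaming pull failed; resubscribing (restart %d)',
                             restarts)
            time.sleep(retry_delay)
```

This could replace the `Event`-polling loop in `SubscribeThread.run()`, e.g. `subscribe_forever(self.subscriber, self.subscription_name, self.callback)`, with the `subscribe()` call in `__init__` removed. Restarting only keeps the process alive; it does not fix a callback that never acks.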

theacodes added a commit that referenced this issue Aug 14, 2018
There's a rare case where the stream can be restarted while the streaming pull
manager is shutting down. This causes get_initial_request to be called while
the manager is in a bad state, which will trigger an AttributeError when
attempting to read the list of outstanding Ack IDs from the leaser.

Closes #5792
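The guard the commit message implies can be sketched as follows: if a stream restart races with shutdown and the leaser has already been torn down, report no outstanding ack IDs instead of dereferencing `None`. This illustrates the idea with a hypothetical stand-in class (`StreamingPullManagerSketch`, `initial_ack_ids`); it is not the actual patch in google-cloud-pubsub.

```python
class StreamingPullManagerSketch:
    """Hypothetical stand-in for the streaming pull manager (illustration only)."""

    def __init__(self, leaser):
        # In this sketch, shutdown is modelled by the leaser being None.
        self._leaser = leaser

    def initial_ack_ids(self):
        """Ack IDs to include in the initial request when a stream starts."""
        # A restart can race with shutdown; if the leaser is already gone,
        # return an empty list instead of raising AttributeError.
        if self._leaser is None:
            return []
        return list(self._leaser.ack_ids)
```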
@theacodes theacodes added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. and removed triage me I really want to be triaged. labels Aug 14, 2018
@theacodes theacodes self-assigned this Aug 14, 2018
@letusfly85
Author

@theacodes

Thank you for replying.
Now, I don't get any errors.

And thank you for the snippets!

I'll also try using it when I want to block while receiving messages.

AVaksman pushed a commit to AVaksman/google-cloud-python that referenced this issue Aug 16, 2018
…is#5803)

There's a rare case where the stream can be restarted while the streaming pull
manager is shutting down. This causes get_initial_request to be called while
the manager is in a bad state, which will trigger an AttributeError when
attempting to read the list of outstanding Ack IDs from the leaser.

Closes googleapis#5792