Skip to content

Commit

Permalink
Some lint hygiene for Pub/Sub. (googleapis#4455)
Browse files Browse the repository at this point in the history
- Configured PyLint to ignore generated code
- Addressed all PyLint failures in `subscriber.policy.base`
  • Loading branch information
dhermes authored Nov 27, 2017
1 parent 7a9e4f8 commit 052323d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 38 deletions.
94 changes: 57 additions & 37 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base class for concurrency policy."""

from __future__ import absolute_import, division

import abc
Expand All @@ -25,7 +27,8 @@
from google.cloud.pubsub_v1.subscriber import _consumer
from google.cloud.pubsub_v1.subscriber import _histogram

logger = logging.getLogger(__name__)

_LOGGER = logging.getLogger(__name__)


@six.add_metaclass(abc.ABCMeta)
Expand All @@ -40,30 +43,31 @@ class BasePolicy(object):
This class defines the interface for the policy implementation;
subclasses may be passed as the ``policy_class`` argument to
:class:`~.pubsub_v1.client.SubscriberClient`.
Args:
client (google.cloud.pubsub_v1.subscriber.client.Client): The
subscriber client used to create this instance.
subscription (str): The name of the subscription. The canonical
format for this is
``projects/{project}/subscriptions/{subscription}``.
flow_control (google.cloud.pubsub_v1.types.FlowControl): The flow
control settings.
histogram_data (dict): Optional: A structure to store the histogram
data for predicting appropriate ack times. If set, this should
be a dictionary-like object.
.. note::
Additionally, the histogram relies on the assumption
that the dictionary will properly sort keys provided
that all keys are positive integers. If you are sending
your own dictionary class, ensure this assumption holds
or you will get strange behavior.
"""

_managed_ack_ids = None

def __init__(self, client, subscription,
flow_control=types.FlowControl(), histogram_data=None):
"""Instantiate the policy.
Args:
client (~.pubsub_v1.subscriber.client): The subscriber client used
to create this instance.
subscription (str): The name of the subscription. The canonical
format for this is
``projects/{project}/subscriptions/{subscription}``.
flow_control (~.pubsub_v1.types.FlowControl): The flow control
settings.
histogram_data (dict): Optional: A structure to store the histogram
data for predicting appropriate ack times. If set, this should
be a dictionary-like object.
.. note::
Additionally, the histogram relies on the assumption
that the dictionary will properly sort keys provided
that all keys are positive integers. If you are sending
your own dictionary class, ensure this assumption holds
or you will get strange behavior.
"""
self._client = client
self._subscription = subscription
self._consumer = _consumer.Consumer(self)
Expand Down Expand Up @@ -103,8 +107,8 @@ def future(self):
"""Return the Future in use, if any.
Returns:
~.pubsub_v1.subscriber.future.Future: A Future conforming to the
``~concurrent.futures.Future`` interface.
google.cloud.pubsub_v1.subscriber.futures.Future: A Future
conforming to the :class:`~concurrent.futures.Future` interface.
"""
return self._future

Expand All @@ -115,7 +119,7 @@ def managed_ack_ids(self):
Returns:
set: The set of ack IDs being managed.
"""
if not hasattr(self, '_managed_ack_ids'):
if self._managed_ack_ids is None:
self._managed_ack_ids = set()
return self._managed_ack_ids

Expand Down Expand Up @@ -184,6 +188,10 @@ def call_rpc(self, request_generator):
request_generator (Generator): A generator that yields requests,
and blocks if there are no outstanding requests (until such
time as there are).
Returns:
Iterable[~google.cloud.pubsub_v1.types.StreamingPullResponse]: An
iterable of pull responses.
"""
return self._client.api.streaming_pull(request_generator)

Expand Down Expand Up @@ -222,9 +230,9 @@ def get_initial_request(self, ack_queue=False):
while the connection was paused.
Returns:
~.pubsub_v1.types.StreamingPullRequest: A request suitable
for being the first request on the stream (and not suitable
for any other purpose).
google.cloud.pubsub_v1.types.StreamingPullRequest: A request
suitable for being the first request on the stream (and not
suitable for any other purpose).
.. note::
If ``ack_queue`` is set to True, this includes the ack_ids, but
Expand Down Expand Up @@ -302,14 +310,14 @@ def maintain_leases(self):
# based off of how long previous messages have taken to ack, with
# a sensible default and within the ranges allowed by Pub/Sub.
p99 = self.histogram.percentile(99)
logger.debug('The current p99 value is %d seconds.' % p99)
_LOGGER.debug('The current p99 value is %d seconds.', p99)

# Create a streaming pull request.
# We do not actually call `modify_ack_deadline` over and over
# because it is more efficient to make a single request.
ack_ids = list(self.managed_ack_ids)
logger.debug('Renewing lease for %d ack IDs.' % len(ack_ids))
if len(ack_ids) > 0 and self._consumer.active:
_LOGGER.debug('Renewing lease for %d ack IDs.', len(ack_ids))
if ack_ids and self._consumer.active:
request = types.StreamingPullRequest(
modify_deadline_ack_ids=ack_ids,
modify_deadline_seconds=[p99] * len(ack_ids),
Expand All @@ -323,7 +331,7 @@ def maintain_leases(self):
# jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases
# where there are many clients.
snooze = random.uniform(0.0, p99 * 0.9)
logger.debug('Snoozing lease management for %f seconds.' % snooze)
_LOGGER.debug('Snoozing lease management for %f seconds.', snooze)
time.sleep(snooze)

def modify_ack_deadline(self, ack_id, seconds):
Expand Down Expand Up @@ -351,7 +359,11 @@ def nack(self, ack_id, byte_size=None):

@abc.abstractmethod
def close(self):
"""Close the existing connection."""
"""Close the existing connection.
Raises:
NotImplementedError: Always
"""
raise NotImplementedError

@abc.abstractmethod
Expand All @@ -364,6 +376,9 @@ def on_exception(self, exception):
Args:
exception (Exception): The exception raised by the RPC.
Raises:
NotImplementedError: Always
"""
raise NotImplementedError

Expand All @@ -385,6 +400,9 @@ def on_response(self, response):
Args:
response (Any): The protobuf response from the RPC.
Raises:
NotImplementedError: Always
"""
raise NotImplementedError

Expand All @@ -396,13 +414,15 @@ def open(self, callback):
a :class:`~.pubsub_v1.subscriber.message.Message` as its only
argument.
This method is virtual, but concrete implementations should return
a :class:`~google.api_core.future.Future` that provides an interface
to block on the subscription if desired, and handle errors.
Args:
callback (Callable[Message]): A callable that receives a
Pub/Sub Message.
Returns:
~google.api_core.future.Future: A future that provides
an interface to block on the subscription if desired, and
handle errors.
Raises:
NotImplementedError: Always
"""
raise NotImplementedError
9 changes: 8 additions & 1 deletion pubsub/pylint.config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@

"""This module is used to configure gcp-devrel-py-tools run-pylint."""

import copy

from gcp_devrel.tools import pylint

# Library configuration

# library_additions = {}
# library_replacements = {}
# Ignore generated code
library_replacements = copy.deepcopy(pylint.DEFAULT_LIBRARY_RC_REPLACEMENTS)
library_replacements['MASTER']['ignore'].append('gapic')
library_replacements['MASTER']['ignore'].append('proto')

# Test configuration

Expand Down

0 comments on commit 052323d

Please sign in to comment.