Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub Publisher #3463

Merged
merged 44 commits into from
Aug 24, 2017
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f784121
[WIP] Getting started on Pub/Sub.
May 25, 2017
c8a9fd6
stuff
May 25, 2017
9b09b8f
WIP
May 25, 2017
8f0832e
wip
lukesneeringer May 25, 2017
02d7658
WIP
May 30, 2017
3a3ebe9
Merge branch 'pubsub' of github.com:GoogleCloudPlatform/google-cloud-…
May 30, 2017
503d11f
WIP, fixing bugs.
May 30, 2017
3b21b93
WIP
May 30, 2017
1e879a1
wip
May 31, 2017
4bf0552
wip
May 31, 2017
c6dc098
wip
May 31, 2017
13821a7
wip
May 31, 2017
c2d7af8
wip
Jun 1, 2017
4c0bf84
Merge branch 'public-master' into pubsub
Jun 1, 2017
783e9dd
Merge branch 'pubsub' into pubsub-publisher
Jun 2, 2017
7dd719f
Clean up a couple small things.
Jun 2, 2017
c1042ac
A couple more small fixes.
Jun 2, 2017
ccaa865
WIP
Jun 2, 2017
f2ee4d4
Rework based on @jonparrott concurrency ideas.
Jun 2, 2017
12d5546
Refactor the batching implementation.
Jun 2, 2017
e99d959
Remove unrelated files.
Jun 2, 2017
4774f2a
wip
lukesneeringer Jun 3, 2017
1a53c37
Honor size and message count limits.
lukesneeringer Jun 15, 2017
9a6b7cb
Update publisher to be thread-based.
Jul 20, 2017
b0f03a8
Merge branch 'public-master' into pubsub-publisher
Aug 21, 2017
dfe52ef
Merge branch 'pubsub' into pubsub-publisher
Aug 21, 2017
f553fd6
Add better Batch docstring.
Aug 21, 2017
356749a
Improve the max latency thread comments.
Aug 21, 2017
8242c9d
Collapse property docstrings.
Aug 21, 2017
db87dab
More @jonparrott feedback.
Aug 21, 2017
58072b8
Remove the client as a public item in the base batch.
Aug 21, 2017
8f67488
Remove the rejection batch.
Aug 21, 2017
a86d9b7
Lock batch acquisition.
Aug 21, 2017
df18615
Alter exception superclass.
Aug 21, 2017
5f0549b
Inherit from google.api.core.future.Future.
Aug 21, 2017
101d9ca
Move to @jonparrott's Future interface.
Aug 21, 2017
e6e58bb
Move Future off into its own module.
Aug 21, 2017
9fd490c
Add is not None.
Aug 21, 2017
469400e
Feedback.
Aug 22, 2017
31c0c81
Use concurrent.futures.TimeoutError where possible.
Aug 22, 2017
04ac278
Add a lock on commit to ensure no duplication.
Aug 22, 2017
37ee908
Move to an Event.
Aug 22, 2017
12d44be
Make _names private.
Aug 22, 2017
6fe1309
Pubsub subscriber (#3637)
lukesneeringer Aug 24, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pubsub/google/cloud/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2017, Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1 import types


__all__ = (
'PublisherClient',
'SubscriberClient',
'types',
)
25 changes: 25 additions & 0 deletions pubsub/google/cloud/pubsub_v1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2017, Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import PublisherClient
from google.cloud.pubsub_v1.subscriber import SubscriberClient

__all__ = (
'PublisherClient',
'SubscriberClient',

This comment was marked as spam.

'types',
)
73 changes: 73 additions & 0 deletions pubsub/google/cloud/pubsub_v1/_gapic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2017, Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import functools


def add_methods(source_class, blacklist=()):
"""Add wrapped versions of the `api` member's methods to the class.
Comment on lines +20 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lukesneeringer @theacodes @dhermes

Hey folks, happy 2022!

This is a long shot, as the PR is more than 4 years old, but does any of you remember what problem this decorator tried to solve? Why the generated client classes were not directly subclassed by the hand-written classes and instead kept an instance of the generated class in the .api attribute?

The thing is that type checkers issue a lot of complaints about non-existing methods, because they simply cannot know that a lot of these methods are dynamically injected at import time.

We would thus like to subclass the generated classes directly, already have a POC working, but we would like to know if there are any significant downsides to it, if anyone remembers?

Thanks!


Any methods passed in `blacklist` are not added.
Additionally, any methods explicitly defined on the wrapped class are
not added.
"""
def wrap(wrapped_fx):
"""Wrap a GAPIC method; preserve its name and docstring."""
# If this is a static or class method, then we need to *not*
# send self as the first argument.
#
# Similarly, for instance methods, we need to send self.api rather
# than self, since that is where the actual methods were declared.
instance_method = True
self = getattr(wrapped_fx, '__self__', None)
if issubclass(type(self), type):
instance_method = False

# Okay, we have figured out what kind of method this is; send
# down the correct wrapper function.
if instance_method:
fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw)
return functools.wraps(wrapped_fx)(fx)
fx = lambda self, *a, **kw: wrapped_fx(*a, **kw)
return functools.wraps(wrapped_fx)(fx)

def actual_decorator(cls):
# Reflectively iterate over most of the methods on the source class
# (the GAPIC) and make wrapped versions available on this client.
for name in dir(source_class):
# Ignore all private and magic methods.
if name.startswith('_'):
continue

# Ignore anything on our blacklist.
if name in blacklist:
continue

# Retrieve the attribute, and ignore it if it is not callable.
attr = getattr(source_class, name)
if not callable(attr):
continue

# Add a wrapper method to this object.
fx = wrap(getattr(source_class, name))
setattr(cls, name, fx)

# Return the augmented class.
return cls

# Simply return the actual decorator; this is returned from this method
# and actually used to decorate the class.
return actual_decorator
22 changes: 22 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2017, Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

from google.cloud.pubsub_v1.publisher.client import Client

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.



__all__ = (
'Client',
)
File renamed without changes.
171 changes: 171 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/batch/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# Copyright 2017, Google Inc. All rights reserved.

This comment was marked as spam.

This comment was marked as spam.

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import abc
import collections

import six


@six.add_metaclass(abc.ABCMeta)
class BaseBatch(object):

This comment was marked as spam.

"""The base batching class for Pub/Sub publishing.

Although the :class:`~.pubsub_v1.publisher.batch.thread.Batch` class, based
on :class:`threading.Thread`, is fine for most cases, advanced
users may need to implement something based on a different concurrency
model.

This class defines the interface for the Batch implementation;
subclasses may be passed as the ``batch_class`` argument to
:class:`~.pubsub_v1.client.PublisherClient`.
"""
def __len__(self):
"""Return the number of messages currently in the batch."""
return len(self.messages)

@property
@abc.abstractmethod
def client(self):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""Return the client used to create this batch.

Returns:
~.pubsub_v1.client.PublisherClient: A publisher client.
"""
raise NotImplementedError

@property
@abc.abstractmethod
def client(self):
"""Return the client used to create this batch.

Returns:
~.pubsub_v1.client.PublisherClient: A publisher client.
"""
raise NotImplementedError

@property
@abc.abstractmethod
def messages(self):
"""Return the messages currently in the batch.

Returns:
Sequence: The messages currently in the batch.
"""
raise NotImplementedError

@property
@abc.abstractmethod
def size(self):
"""Return the total size of all of the messages currently in the batch.

Returns:
int: The total size of all of the messages currently
in the batch, in bytes.
"""
raise NotImplementedError

@property
@abc.abstractmethod
def settings(self):
"""Return the batch settings.

Returns:
~.pubsub_v1.types.BatchSettings: The batch settings. These are
considered immutable once the batch has been opened.
"""
raise NotImplementedError

@property

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@abc.abstractmethod
def status(self):
"""Return the status of this batch.

Returns:
str: The status of this batch. All statuses are human-readable,
all-lowercase strings. The ones represented in the
:class:`BaseBatch.Status` enum are special, but other statuses
are permitted.
"""
raise NotImplementedError

def will_accept(self, message):
"""Return True if the batch is able to accept the message.

Args:
message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.

Returns:
bool: Whether this batch can accept the message.
"""
# If this batch is not accepting messages generally, return False.
if self.status != self.Status.ACCEPTING_MESSAGES:
return False

# If this batch can not hold the message in question, return False.
if self.size + message.ByteSize() > self.settings.max_bytes:
return False

# Okay, everything is good.
return True

@abc.abstractmethod
def publish(self, message):
"""Publish a single message.

Add the given message to this object; this will cause it to be
published once the batch either has enough messages or a sufficient
period of time has elapsed.

This method is called by :meth:`~.PublisherClient.publish`.

Args:
message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.

Returns:
~.pubsub_v1.publisher.batch.mp.Future: An object conforming to the
:class:`concurrent.futures.Future` interface.
"""
raise NotImplementedError

class Status(object):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""An enum class representing valid statuses for a batch.

It is acceptable for a class to use a status that is not on this
class; this represents the list of statuses where the existing
library hooks in functionality.
"""
ACCEPTING_MESSAGES = 'accepting messages'
ERROR = 'error'
SUCCESS = 'success'


class RejectionBatch(object):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""A fake batch-like object that refuses to accept any message.

This is used by the client to do single-op checks for batch
existence.
"""
def will_accept(self, message):
"""Return False.

Args:
message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.

Returns:
bool: Whether this batch can accept the message. It never can.
"""
return False
Loading