-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PubSub Publisher #3463
PubSub Publisher #3463
Changes from 23 commits
f784121
c8a9fd6
9b09b8f
8f0832e
02d7658
3a3ebe9
503d11f
3b21b93
1e879a1
4bf0552
c6dc098
13821a7
c2d7af8
4c0bf84
783e9dd
7dd719f
c1042ac
ccaa865
f2ee4d4
12d5546
e99d959
4774f2a
1a53c37
9a6b7cb
b0f03a8
dfe52ef
f553fd6
356749a
8242c9d
db87dab
58072b8
8f67488
a86d9b7
df18615
5f0549b
101d9ca
e6e58bb
9fd490c
469400e
31c0c81
04ac278
37ee908
12d44be
6fe1309
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Copyright 2017, Google Inc. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from __future__ import absolute_import | ||
|
||
from google.cloud.pubsub_v1 import PublisherClient | ||
from google.cloud.pubsub_v1 import SubscriberClient | ||
from google.cloud.pubsub_v1 import types | ||
|
||
|
||
__all__ = ( | ||
'PublisherClient', | ||
'SubscriberClient', | ||
'types', | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# Copyright 2017, Google Inc. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from __future__ import absolute_import | ||
|
||
from google.cloud.pubsub_v1 import types | ||
from google.cloud.pubsub_v1.publisher import PublisherClient | ||
from google.cloud.pubsub_v1.subscriber import SubscriberClient | ||
|
||
__all__ = ( | ||
'PublisherClient', | ||
'SubscriberClient', | ||
'types', | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# Copyright 2017, Google Inc. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from __future__ import absolute_import | ||
|
||
import functools | ||
|
||
|
||
def add_methods(source_class, blacklist=()): | ||
"""Add wrapped versions of the `api` member's methods to the class. | ||
Comment on lines
+20
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lukesneeringer @theacodes @dhermes Hey folks, happy 2022! This is a long shot, as the PR is more than 4 years old, but does any of you remember what problem this decorator tried to solve? Why the generated client classes were not directly subclassed by the hand-written classes and instead kept an instance of the generated class in the The thing is that type checkers issue a lot of complaints about non-existing methods, because they simply cannot know that a lot of these methods are dynamically injected at import time. We would thus like to subclass the generated classes directly, already have a POC working, but we would like to know if there are any significant downsides to it, if anyone remembers? Thanks! |
||
|
||
Any methods passed in `blacklist` are not added. | ||
Additionally, any methods explicitly defined on the wrapped class are | ||
not added. | ||
""" | ||
def wrap(wrapped_fx): | ||
"""Wrap a GAPIC method; preserve its name and docstring.""" | ||
# If this is a static or class method, then we need to *not* | ||
# send self as the first argument. | ||
# | ||
# Similarly, for instance methods, we need to send self.api rather | ||
# than self, since that is where the actual methods were declared. | ||
instance_method = True | ||
self = getattr(wrapped_fx, '__self__', None) | ||
if issubclass(type(self), type): | ||
instance_method = False | ||
|
||
# Okay, we have figured out what kind of method this is; send | ||
# down the correct wrapper function. | ||
if instance_method: | ||
fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw) | ||
return functools.wraps(wrapped_fx)(fx) | ||
fx = lambda self, *a, **kw: wrapped_fx(*a, **kw) | ||
return functools.wraps(wrapped_fx)(fx) | ||
|
||
def actual_decorator(cls): | ||
# Reflectively iterate over most of the methods on the source class | ||
# (the GAPIC) and make wrapped versions available on this client. | ||
for name in dir(source_class): | ||
# Ignore all private and magic methods. | ||
if name.startswith('_'): | ||
continue | ||
|
||
# Ignore anything on our blacklist. | ||
if name in blacklist: | ||
continue | ||
|
||
# Retrieve the attribute, and ignore it if it is not callable. | ||
attr = getattr(source_class, name) | ||
if not callable(attr): | ||
continue | ||
|
||
# Add a wrapper method to this object. | ||
fx = wrap(getattr(source_class, name)) | ||
setattr(cls, name, fx) | ||
|
||
# Return the augmented class. | ||
return cls | ||
|
||
# Simply return the actual decorator; this is returned from this method | ||
# and actually used to decorate the class. | ||
return actual_decorator |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# Copyright 2017, Google Inc. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from google.cloud.pubsub_v1.publisher.client import PublisherClient | ||
|
||
|
||
__all__ = ( | ||
'PublisherClient', | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
# Copyright 2017, Google Inc. All rights reserved. | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from __future__ import absolute_import | ||
|
||
import abc | ||
import collections | ||
|
||
import six | ||
|
||
|
||
@six.add_metaclass(abc.ABCMeta) | ||
class BaseBatch(object): | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
"""The base batching class for Pub/Sub publishing. | ||
|
||
Although the :class:`~.pubsub_v1.publisher.batch.mp.Batch` class, based | ||
on :class:`multiprocessing.Process`, is fine for most cases, advanced | ||
users may need to implement something based on a different concurrency | ||
model. | ||
|
||
This class defines the interface for the Batch implementation; | ||
subclasses may be passed as the ``batch_class`` argument to | ||
:class:`~.pubsub_v1.client.PublisherClient`. | ||
""" | ||
@property | ||
@abc.abstractmethod | ||
def client(self): | ||
"""Return the client used to create this batch. | ||
|
||
Returns: | ||
~.pubsub_v1.client.PublisherClient: A publisher client. | ||
""" | ||
raise NotImplementedError | ||
|
||
@property | ||
@abc.abstractmethod | ||
def settings(self): | ||
"""Return the settings for this batch. | ||
|
||
Returns: | ||
~.pubsub_v1.types.Batching: The settings for batch | ||
publishing. These should be considered immutable once the batch | ||
has been opened. | ||
""" | ||
raise NotImplementedError | ||
|
||
@property | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
@abc.abstractmethod | ||
def status(self): | ||
"""Return the status of this batch. | ||
|
||
Returns: | ||
str: The status of this batch. All statuses are human-readable, | ||
all-lowercase strings, and represented in the | ||
:class:`BaseBatch.Status` enum. | ||
""" | ||
raise NotImplementedError | ||
|
||
@abc.abstractmethod | ||
def commit(self): | ||
"""Asychronously commit everything in this batch. | ||
|
||
Subclasses must define this as an asychronous method; it may be called | ||
from the primary process by :meth:`check_limits`. | ||
""" | ||
raise NotImplementedError | ||
|
||
@abc.abstractmethod | ||
def publish(self, data, **attrs): | ||
"""Publish a single message. | ||
|
||
.. note:: | ||
Messages in Pub/Sub are blobs of bytes. They are *binary* data, | ||
not text. You must send data as a bytestring | ||
(``bytes`` in Python 3; ``str`` in Python 2), and this library | ||
will raise an exception if you send a text string. | ||
|
||
The reason that this is so important (and why we do not try to | ||
coerce for you) is because Pub/Sub is also platform independent | ||
and there is no way to know how to decode messages properly on | ||
the other side; therefore, encoding and decoding is a required | ||
exercise for the developer. | ||
|
||
Add the given message to this object; this will cause it to be | ||
published once the batch either has enough messages or a sufficient | ||
period of time has elapsed. | ||
|
||
Args: | ||
data (bytes): A bytestring representing the message body. This | ||
must be a bytestring (a text string will raise TypeError). | ||
attrs (Mapping[str, str]): A dictionary of attributes to be | ||
sent as metadata. (These may be text strings or byte strings.) | ||
|
||
Raises: | ||
TypeError: If the ``data`` sent is not a bytestring, or if the | ||
``attrs`` are not either a ``str`` or ``bytes``. | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
Returns: | ||
~.pubsub_v1.publisher.future.Future: An object conforming to the | ||
:class:`concurrent.futures.Future` interface. | ||
""" | ||
raise NotImplementedError | ||
|
||
class Status(object): | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
"""An enum class representing valid statuses for a batch. | ||
|
||
It is acceptable for a class to use a status that is not on this | ||
class; this represents the list of statuses where the existing | ||
library hooks in functionality. | ||
""" | ||
ACCEPTING_MESSAGES = 'accepting messages' | ||
ERROR = 'error' | ||
SUCCESS = 'success' | ||
|
||
|
||
# Make a fake batch. This is used by the client to do single-op checks | ||
# for batch existence. | ||
FakeBatch = collections.namedtuple('FakeBatch', ['status']) | ||
FAKE = FakeBatch(status='fake') |
This comment was marked as spam.
Sorry, something went wrong.