-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PubSub Publisher #3463
PubSub Publisher #3463
Conversation
def __init__(self, client, topic, settings, autocommit=True): | ||
self._client = client | ||
|
||
# Create a namespace that is owned by the client manager; this |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
raise Exception('Empty queue') | ||
response = self._client.api.publish(self._.topic, self._.messages) | ||
|
||
# FIXME (lukesneeringer): Check for failures; retry. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -1,4 +1,4 @@ | |||
# Copyright 2016 Google Inc. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Args: | ||
message_id (str): The message ID, as a string. | ||
""" | ||
for callback in self._callbacks: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
def retry(func, delay=0, count=0, err=None, **kwargs): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
__VERSION__ = pkg_resources.get_distribution('google-cloud-pubsub').version | ||
|
||
|
||
class SubscriberClient(subscriber_client.SubscriberClient): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
__all__ = ( | ||
'PublisherClient', | ||
'SubscriberClient', |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
:class:`google.cloud.pubsub_v1.PublisherClient`. | ||
|
||
Args: | ||
client (:class:`google.cloud.pubsub_v1.PublisherClient`): The |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
inferred from this. | ||
topic (str): The topic. The format for this is | ||
``projects/{project}/topics/{topic}``. | ||
settings (:class:`google.cloud.pubsub_v1.types.Batching`): The |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# Create a namespace that is owned by the client manager; this | ||
# is necessary to be able to have these values be communicable between | ||
# processes. | ||
self._ = self.manager.Namespace() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
self._.messages = self.manager.list() | ||
self._.message_ids = self.manager.dict() | ||
self._.settings = settings | ||
self._.status = 'accepting messages' |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Future: An object conforming to the ``concurrent.futures.Future`` | ||
interface. | ||
""" | ||
# Sanity check: Is the data being sent as a bytestring? |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
return f | ||
|
||
|
||
# Make a fake batch. This is used by the client to do single-op checks |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
def commit(self): | ||
"""Actually publish all of the messages on the active batch. | ||
|
||
This moves the batch out from being the active batch to an in-flight |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Args: | ||
message_id (str): The message ID, as a string. | ||
""" | ||
for callback in self._callbacks: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
import functools | ||
|
||
|
||
def add_methods(SourceClass, blacklist=()): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
:class:`google.cloud.pubsub_v1.PublisherClient`. | ||
|
||
Args: | ||
client (:class:`google.cloud.pubsub_v1.PublisherClient`): The |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
message to be published is received; subsequent messages are added to | ||
that batch until the process of actual publishing _starts_. | ||
|
||
Once this occurs, any new messages sent to ``publish`` open a new batch. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Future: An object conforming to the ``concurrent.futures.Future`` | ||
interface. | ||
""" | ||
# Sanity check: Is the data being sent as a bytestring? |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
:class:`~gapic.pubsub.v1.publisher_client.PublisherClient`. | ||
Generally, you should not need to set additional keyword arguments. | ||
""" | ||
_gapic_class = publisher_client.PublisherClient |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
""" | ||
# If there is no matching batch yet, then potentially create one | ||
# and place it on the batches dictionary. | ||
if self._batches.get(topic, FAKE).status != 'accepting messages': |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
import queue | ||
import uuid | ||
import time |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Also rename base.BaseBatch to base.Batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks mostly good, some small things and nits.
""" | ||
raise NotImplementedError | ||
|
||
class Status(object): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -0,0 +1,130 @@ | |||
# Copyright 2017, Google Inc. All rights reserved. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
""" | ||
raise NotImplementedError | ||
|
||
@property |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# commit when the max latency is reached. | ||
self._thread = None | ||
if autocommit and self._settings.max_latency < float('inf'): | ||
self._thread = threading.Thread(target=self.monitor) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
version, which calls this one. | ||
""" | ||
# Update the status. | ||
self._status = 'in-flight' |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
if isinstance(v, six.binary_type): | ||
attrs[k] = v.decode('utf-8') | ||
continue | ||
raise TypeError('All attributes being published to Pub/Sub must ' |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pass | ||
|
||
|
||
class TimeoutError(GoogleAPIError): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
return False | ||
|
||
def running(self): | ||
"""Publishes in Pub/Sub currently may not be canceled. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
# Wait a little while and try again. | ||
time.sleep(_wait) | ||
return self.exception( |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
) | ||
|
||
|
||
names = ['BatchSettings', 'FlowControl'] |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@jonparrott I think this is good other than the inner Status class that I asked about. |
|
Got it. GitHub obscured that and I missed it. |
Also, I think this knocks out everything on this PR. Is that accurate? |
Intentionally bypassing CircleCI because:
|
def add_methods(source_class, blacklist=()): | ||
"""Add wrapped versions of the `api` member's methods to the class. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lukesneeringer @theacodes @dhermes
Hey folks, happy 2022!
This is a long shot, as the PR is more than 4 years old, but does any of you remember what problem this decorator tried to solve? Why the generated client classes were not directly subclassed by the hand-written classes and instead kept an instance of the generated class in the .api
attribute?
The thing is that type checkers issue a lot of complaints about non-existing methods, because they simply cannot know that a lot of these methods are dynamically injected at import time.
We would thus like to subclass the generated classes directly, already have a POC working, but we would like to know if there are any significant downsides to it, if anyone remembers?
Thanks!
@jonparrott @dhermes
This is what I assert to be a mostly complete publisher for Pub/Sub (minus error checking, which is a pretty big caveat, and tests, which is also a pretty big caveat). I would like feedback on this much before I move forward, since (as we all know) multiprocessing is tricky.
I am about to add some notes inline at specific places, but obviously feel free to comment on any of it.