PubSub Publisher: sometimes cannot publish until publish has been called several times #4607

Closed
FcrbPeter opened this issue Dec 18, 2017 · 3 comments
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. triaged for GA type: question Request for information or clarification. Not an issue.

Comments

@FcrbPeter

Ubuntu 16.04
Python 2.7.12
google-cloud-pubsub (0.29.4)

I am working on an IM message-forwarding server. However, sometimes Pub/Sub does not publish a message to the topic until the server has published it several more times (about 3-4 times). Am I using the function the wrong way, or is this a settings problem?

The flow is:

  1. Receive a message with a user ID.
  2. Save the message, with the user ID and receive time, to Datastore.
  3. Publish a message to the topic with the user ID as data.
  4. A cron job runs once per minute, fetches the unprocessed messages from Datastore, and publishes them again.
  5. The message finally reaches the topic after about 3 minutes (after being published about 3-4 times).
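The retry loop in steps 2-4 can be sketched as follows. This is a minimal model, assuming a plain dict in place of Datastore and a list in place of the topic; `receive_message`, `cron_tick`, `process_message`, and `publish_task` are hypothetical names, not the project's code. It shows why every cron run re-publishes a message until the subscriber marks it as processed, which matches the repeated "start send task" lines in the log.

```python
import json
import time

# Hypothetical stand-in for the Datastore entities in steps 2 and 4.
pending = {}
# Hypothetical stand-in for the topic: records every publish attempt.
published = []


def publish_task(user_id):
    """Stand-in for sendTask: serialize the payload and 'publish' it."""
    payload = json.dumps({"user_id": user_id, "time": time.time()}).encode("utf-8")
    published.append(payload)


def receive_message(msg_key, user_id):
    """Steps 1-3: save the message as unprocessed, then publish a task."""
    pending[msg_key] = {"user_id": user_id, "received": time.time()}
    publish_task(user_id)


def cron_tick():
    """Step 4: re-publish every message that has not been processed yet."""
    for record in list(pending.values()):
        publish_task(record["user_id"])


def process_message(msg_key):
    """Subscriber side: mark the message as processed so the cron job skips it."""
    pending.pop(msg_key, None)
```

Until `process_message` runs for a key, each `cron_tick` adds one more publish attempt for it, so a subscriber that is slow to receive the first publish sees duplicates.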

The log looks like this:

INFO 2017-12-18 09:21:23,547 Log Receive Message A from IM
INFO 2017-12-18 09:21:23,656 Log start send task with user ot0qZ09oyCzWOQR5gy3R1qIb4rGg  <-- Message A 1
INFO 2017-12-18 09:21:32,381 Log Receive Message B from IM
INFO 2017-12-18 09:21:32,445 Log start send task with user ot0qZ09oyCzWOQR5gy3R1qIb4rGg  <-- Message B 1
INFO 2017-12-18 09:22:01,598 Log start send task with user ot0qZ09oyCzWOQR5gy3R1qIb4rGg  <-- From cron job, Message A 2
INFO 2017-12-18 09:22:01,603 Log start send task with user ot0qZ09oyCzWOQR5gy3R1qIb4rGg  <-- From cron job, Message B 2
INFO 2017-12-18 09:23:01,825 Log start send task with user ot0qZ09oyCzWOQR5gy3R1qIb4rGg  <-- From cron job, Message A 3
INFO 2017-12-18 09:23:01,832 Log start send task with user ot0qZ09oyCzWOQR5gy3R1qIb4rGg  <-- From cron job, Message B 3
INFO 2017-12-18 09:23:02,351 Log task sent. 12192324138020  /-- Future returned message id   1
INFO 2017-12-18 09:23:02,351 Log task sent. 12192428992705  \-- Message A & B
INFO 2017-12-18 09:23:02,659 Log processMessage  <-- Received a push subscriber message, process it, Message A
INFO 2017-12-18 09:23:02,827 Log task sent. 12192721150677  /-- Future returned message id   2
INFO 2017-12-18 09:23:02,950 Log task sent. 12192897418694  \-- Message A & B
INFO 2017-12-18 09:23:04,399 Log start send task with user ot0qZ09oyCzWOQR5gy3R1qIb4rGg  <-- Message B
INFO 2017-12-18 09:23:04,498 Log processMessage  <-- Received a push subscriber message, process it, Message B
INFO 2017-12-18 09:23:04,511 Log task sent. 12193182125391  <-- Future returned message id, Message B   3

Here is the code that publishes the message to the topic.
The method sendTask is called both when an IM message is received and from the cron job.

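import json
import logging
import time

from google.cloud import pubsub_v1

Log = logging.getLogger(__name__)  # assumed: the app's logger


def onTaskPushed(future):
    # Called once the batch containing this message has been committed.
    message_id = future.result()
    Log.info('task sent. ' + message_id)


def sendTask(userId):
    Log.info('start send task with user ' + userId)

    project_id = get_project_id()  # app helper (not shown) returning the project ID
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, 'tasks')

    data = json.dumps({
        'user_id': userId,
        'time': time.time(),  # Unix time in seconds
    })

    # publish() adds the message to a batch; the future resolves after the
    # batch is committed in the background.
    future = publisher.publish(topic_path, data=data.encode('utf-8'))
    future.add_done_callback(onTaskPushed)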
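On the receiving side (the processMessage entries in the log), a push subscription delivers each message as an HTTP POST whose JSON body carries the payload base64-encoded under `message.data`. A minimal sketch of decoding the payload that sendTask builds, assuming that request body format; `decode_push_body` and `make_push_body` are hypothetical helpers, not part of the client library, and the project and subscription names are placeholders.

```python
import base64
import json


def decode_push_body(body):
    """Return the dict sendTask serialized, given a push request body (bytes or str)."""
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    envelope = json.loads(body)
    # The message payload arrives base64-encoded under message.data.
    raw = base64.b64decode(envelope["message"]["data"])
    return json.loads(raw.decode("utf-8"))


def make_push_body(payload):
    """Build a body shaped like a push request, for local testing only."""
    data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return json.dumps({
        "message": {"data": data, "messageId": "1"},
        # Placeholder subscription name (hypothetical).
        "subscription": "projects/my-project/subscriptions/tasks-push",
    })
```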
@dhermes dhermes added the api: pubsub Issues related to the Pub/Sub API. label Dec 18, 2017
@dhermes
Contributor

dhermes commented Dec 18, 2017

@FcrbPeter Calling PublisherClient.publish adds to a batch, but doesn't necessarily commit that batch. The batch will be committed in the background after the default max_latency of 0.05 s has elapsed. If you'd like to commit the batch immediately, you could do something like:

from google.cloud.pubsub_v1 import types

message = types.PubsubMessage(data=data)
batch = publisher.batch(topic_path, message)
batch.publish(message)
batch.commit()
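To see why a message can sit unsent, here is a simplified, self-contained model of that batching behaviour (Python 3, because it uses `concurrent.futures`). It is a toy, not the library's implementation: `ToyBatch` and its fake message IDs are hypothetical. A batch queues messages, and a background `threading.Timer` commits it after `max_latency` seconds, so if background threads cannot run, nothing is sent unless `commit()` is called. In the toy, `commit()` runs in the caller's thread; the sketch says nothing about which thread the real client's commit uses.

```python
import itertools
import threading
from concurrent.futures import Future


class ToyBatch(object):
    """Simplified model of a publisher batch; NOT the google-cloud-pubsub code."""

    _ids = itertools.count(1)  # source of fake message IDs

    def __init__(self, max_latency=0.05, autocommit=True):
        self._lock = threading.Lock()
        self._pending = []   # (message, future) pairs not yet committed
        self.committed = []  # messages that have been "sent"
        if autocommit:
            # A background timer commits the batch once max_latency has elapsed.
            # If background threads never get to run, this commit never happens.
            timer = threading.Timer(max_latency, self.commit)
            timer.daemon = True
            timer.start()

    def publish(self, message):
        """Queue a message and return a future for its (fake) message ID."""
        future = Future()
        with self._lock:
            self._pending.append((message, future))
        return future

    def commit(self):
        """Send everything queued so far and resolve the futures."""
        with self._lock:
            pending, self._pending = self._pending, []
        for message, future in pending:
            self.committed.append(message)
            future.set_result(str(next(self._ids)))
```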

Does this answer your question? Do you think there is a particular place in the documentation where we could make this clearer?

@dhermes dhermes added documentation priority: p2 Moderately-important priority. Fix may not be included in next release. type: question Request for information or clarification. Not an issue. labels Dec 18, 2017
@FcrbPeter
Author

FcrbPeter commented Dec 19, 2017

@dhermes, thanks for the reply.

After reading the batch code, I found that batching relies on threading: the batch is committed from a background thread.
The project uses uWSGI to run the Python code behind Nginx, and by default uWSGI does not run threads started by the application.
So the problem was solved by enabling uWSGI's enable-threads option.

I think it would be good to mention in the documentation that publishing uses threads.
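The fix above is the enable-threads option (`enable-threads = true` in the uWSGI ini file, or `--enable-threads` on the command line). As a rough startup self-check, an app could verify that a freshly started thread actually runs and log a warning otherwise. This is a minimal sketch under that assumption, not a documented diagnostic; `background_threads_work` is a hypothetical helper, and it may not catch every way a server can stall background threads.

```python
import logging
import threading

log = logging.getLogger(__name__)


def background_threads_work(timeout=1.0):
    """Return True if a newly started thread runs within `timeout` seconds."""
    ran = threading.Event()
    worker = threading.Thread(target=ran.set, name="thread-self-check")
    worker.daemon = True
    worker.start()
    if ran.wait(timeout):
        return True
    log.warning(
        "Background threads did not run; Pub/Sub batches may never be committed. "
        "Under uWSGI, check that enable-threads is set."
    )
    return False
```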

@dhermes
Contributor

dhermes commented Dec 19, 2017

@FcrbPeter Definitely a good idea! I'm hoping to see how easy it would be to support alternate implementations, i.e. ones that don't use threads.
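One thread-free design, sketched purely as an illustration (it is not an API of google-cloud-pubsub): the batcher queues payloads and commits them in the caller's thread, either when the batch fills up or when the caller flushes explicitly, for example at the end of each request. `SyncBatcher` and the `send` callable are hypothetical; in practice `send` would wrap whatever call actually publishes a list of messages.

```python
class SyncBatcher(object):
    """Hypothetical thread-free batcher: commits only when the caller triggers it."""

    def __init__(self, send, max_messages=100):
        self._send = send  # callable: list of payloads -> list of message IDs
        self._max_messages = max_messages
        self._queue = []

    def publish(self, payload):
        """Queue a payload; commit synchronously once the batch is full."""
        self._queue.append(payload)
        if len(self._queue) >= self._max_messages:
            return self.flush()
        return []

    def flush(self):
        """Send all queued payloads in the caller's thread and return their IDs."""
        if not self._queue:
            return []
        batch, self._queue = self._queue, []
        return self._send(batch)
```

The trade-off is that nothing is sent unless a caller triggers it, so a request handler must call `flush()` before returning; a background thread avoids that bookkeeping at the cost of needing threads to run.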
