
producer returns Queue full #117

Closed
ghost opened this issue May 14, 2014 · 12 comments

@ghost commented May 14, 2014

If I set

    queue.buffering.max.messages=10
    queue.buffering.max.ms=60000

the producer returns "Queue full" after sending 10 messages. It seems that the producer doesn't send the messages to Kafka at all. I would expect that when the queue is full (or maybe 3/4 full) the messages are sent to Kafka even if queue.buffering.max.ms has not expired yet.
Maybe it would be better to define a max and a min value: when the queue size exceeds buffering.min, the producer tries to send the data to Kafka.
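
A minimal sketch that reproduces this behavior, using the confluent-kafka-python binding (the original client isn't named above, but these settings map straight through to librdkafka; the broker address and topic name are placeholders):

    from confluent_kafka import Producer

    p = Producer({
        'bootstrap.servers': 'localhost:9092',   # placeholder broker
        'queue.buffering.max.messages': 10,      # tiny queue, for demonstration
        'queue.buffering.max.ms': 60000,         # buffer locally for up to 60 s
    })

    for i in range(20):
        try:
            p.produce('test', value=('message %d' % i).encode())
        except BufferError as e:
            # Raised once 10 undelivered messages sit in the local queue:
            # "Local: Queue full"
            print('produce() failed at message %d: %s' % (i, e))
            break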

@edenhill (Contributor)

Did you set batch.num.messages accordingly too? (<= queue.buffering.max.messages)

librdkafka is async on the inside; the produce() API cannot easily trigger the broker thread to send off the currently buffered messages.
I guess your queue.buffering.max.messages value is fabricated for testing, though? It should typically be set quite high.

@ghost (Author) commented May 14, 2014

I set batch.num.messages <= queue.buffering.max.messages.
So there is no way to trigger the broker thread when the queues are running full? Then I need to set queue.buffering.max.ms very low so that the queues don't run full.


@edenhill (Contributor)

Currently there is no way, no.

May I ask what is the reason for decreasing the internal queue size? (queue.buffering.max.messages)

If you need to decrease the internal queues, for whatever reason, you need to calculate proper values for batch.num.messages, queue.buffering.max.messages and queue.buffering.max.ms according to your expected produce() rate.

E.g. if you think you will produce() 1000 messages/s, then set:

    batch.num.messages=100
    queue.buffering.max.ms=100
    queue.buffering.max.messages=1000

This will make sure one batch of messages is sent to the broker every 100 ms (containing 100 messages or fewer).
But you will also need some headroom in ..max.messages to absorb broker latency: say it takes 200 ms to ack 100 messages; at 1000 messages/s another 200 messages arrive in that window, so set queue.buffering.max.messages to 1200.
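
Spelled out as a back-of-the-envelope calculation (the 200 ms ack latency is the assumption from above, and using one second of production as the base is a rule of thumb rather than a hard formula):

    produce_rate = 1000       # expected produce() rate, messages/s
    buffer_window_ms = 100    # queue.buffering.max.ms
    ack_latency_ms = 200      # assumed broker latency to ack one batch

    batch_num_messages = produce_rate * buffer_window_ms // 1000  # 100
    headroom = produce_rate * ack_latency_ms // 1000              # 200
    queue_max_messages = produce_rate + headroom                  # 1200

    print(batch_num_messages, queue_max_messages)  # -> 100 1200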

@ghost (Author) commented May 14, 2014

I was playing with these parameters to see what happens with the producer.
I need to send a lot of messages, and the rate is not fixed.


@edenhill (Contributor)

Okay, then I suggest you keep max.messages at a high value.

Think of it as your safety harness if the broker or the broker connection starts acting up: the more messages you are allowed to queue, the lower the likelihood of undeliverable messages during shorter broker or network failures.
This comes at the expense of memory, of course.

@edenhill (Contributor)

Let me know if this solved it for you so I can close the issue. Thanks

@ghost (Author) commented May 15, 2014

You can close the issue.


@edenhill (Contributor)

Thanks

@mikesparr

@edenhill I got through the consumer issues and noticed the topic message count was greater than the Elasticsearch index count. Grepping the logs, I found these errors, and the error count almost matches the missing record count.

I notice you recommend some settings above, but it's not clear exactly where to apply them for librdkafka when using the confluent-kafka-python client library. When I try some of these in the conf dictionary, the app won't run. I would appreciate an example Python client config for adjusting the settings to prevent the Queue full issues we're seeing:

    conf = {
        'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
        'client.id': socket.gethostname(),
        'default.topic.config': {'acks': 'all'}
    }

Errors in logs:
[screenshot: "Queue full" errors in the application logs]

@mikesparr

If I edit the confluent-kafka-python conf object to the following, increasing the buffering milliseconds and max messages but decreasing the batch size, is that the recommended way to make sure I have plenty of buffer to avoid this?

    conf = {
        'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
        'queue.buffering.max.messages': 500000,
        'queue.buffering.max.ms': 60000,
        'batch.num.messages': 100,
        'log.connection.close': False,
        'client.id': socket.gethostname(),
        'default.topic.config': {'acks': 'all'}
    }
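
One caveat worth noting: with queue.buffering.max.ms=60000, messages can sit in the local queue for up to a minute, so it helps to serve delivery callbacks regularly and to drain the queue on shutdown. A sketch under those assumptions (the topic name and message_source() are placeholders, not from this thread):

    from confluent_kafka import Producer

    p = Producer(conf)
    for msg in message_source():          # placeholder message stream
        p.produce('mytopic', value=msg)   # placeholder topic
        p.poll(0)  # serve delivery callbacks so completed messages are freed
    p.flush()      # block until everything queued is delivered (or fails)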

@mikesparr

With the revised settings above, 0 errors in the logs for the diff/transform apps now!

[screenshot: clean application logs, no errors]

@edenhill (Contributor) commented Jul 7, 2016

I'm guessing this is related to the confluent-kafka-python issue (not calling poll() frequently enough).
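
The usual remedy for that, sketched here assuming the confluent-kafka-python client (function and argument names are illustrative), is to serve poll() when produce() raises BufferError and then retry:

    def produce_with_retry(producer, topic, value):
        while True:
            try:
                producer.produce(topic, value=value)
                return
            except BufferError:
                # Local queue is full: serve delivery callbacks to free
                # slots, then retry the produce() call.
                producer.poll(1)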
