Want to make sure I'm using rd_kafka_poll correctly #71

Closed
winbatch opened this issue Feb 3, 2014 · 8 comments

winbatch commented Feb 3, 2014

Hi,

I know we've discussed this previously, but I want to make sure I'm using poll correctly. As I understand it, it returns when a delivery report or an error is available. In my use case, I don't want to publish the next message until I've received an acknowledgement for the prior one. I'm in the process of testing this by bringing down one of the brokers while publishing. I'm fairly sure there are cases where poll returns >=1 simply because it invoked my error callback with a non-fatal error (one of the brokers being unreachable). That means poll can return even though I haven't received an acknowledgement for my last message. This would seem to imply that I need to keep polling in a loop until I specifically get either a fatal error or an acknowledgement, since poll returning at least one event doesn't necessarily mean that what I just produce()d succeeded or failed (the callbacks could all be merely informational). Is this correct?
I'm also seeing that multiple messages are 'outstanding', waiting for delivery callbacks, when in my case there should only ever be one outstanding.

edenhill commented Feb 3, 2014

Oh, you want a sync interface. You do know that is the ultimate performance killer of all times, yes?

Each message produced will take at least the round-trip time to the broker plus the broker disk flush or propagation to other brokers. E.g.: RTT (2 ms) + required.acks > 1 (3 ms) = 5 ms per message = at most 1000/5 = 200 messages/second.

You typically don't need a sync interface for reliability, since rdkafka will make sure the message is either acked by the broker or errored back to the application properly.
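
For reference, the normal async pattern is just to produce and serve callbacks opportunistically; a minimal sketch (produce_one() and its arguments are illustrative, not part of the library):

void produce_one (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                  void *payload, size_t len) {
   /* Enqueue the message; delivery status arrives later via dr_cb. */
   rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                    payload, len, NULL, 0, NULL /* msg_opaque */);

   /* Non-blocking poll: serves any pending delivery/error callbacks. */
   rd_kafka_poll(rk, 0);
}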

Having said all that, this is how you implement a sync interface:


/* Delivery report callback, registered with rd_kafka_conf_set_dr_cb(). */
void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
            rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
     int *produce_statusp = (int *)msg_opaque;

     /* Set sync_produce()'s produce_status to the error code
      * (which may be RD_KAFKA_RESP_ERR_NO_ERROR). */
     *produce_statusp = err;
}

/* Magic value that is not a proper value in rd_kafka_resp_err_t. */
#define PRODUCE_PENDING -100000

/* Returns 0 on successful delivery, -1 on failure. */
int sync_produce (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                  void *payload, size_t len) {
   int produce_status = PRODUCE_PENDING;

   if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                        payload, len, NULL, 0,
                        &produce_status /* msg_opaque */) == -1)
      return -1; /* enqueue failed (e.g. local queue full) */

   do {
      /* Serve delivery report and error callbacks. */
      rd_kafka_poll(rk, 1000);
      /* Loop until dr_cb has run and set produce_status. */
   } while (produce_status == PRODUCE_PENDING);

   return produce_status == RD_KAFKA_RESP_ERR_NO_ERROR ? 0 : -1;
}
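
And for completeness, a sketch of the wiring using the standard librdkafka conf API ("mytopic" and acks=-1 are placeholder choices, not from this issue):

char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();

/* Register the delivery report callback before creating the handle. */
rd_kafka_conf_set_dr_cb(conf, dr_cb);

/* Wait for acks from all in-sync replicas (placeholder choice). */
rd_kafka_topic_conf_set(tconf, "request.required.acks", "-1",
                        errstr, sizeof(errstr));

rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", tconf);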

winbatch commented Feb 3, 2014

Thanks. I've effectively done the above. Note that I'm sending an MB at a time, so the number of messages per second is less important to me than the amount of data transferred per time interval.

edenhill commented Feb 3, 2014

Okay, you mentioned earlier this data was compressed. Are you letting the producer compress it or is it already compressed when you hand it to the producer?

winbatch commented Feb 3, 2014

Letting the producer compress. This is streaming log data, sent as the files are written.

edenhill commented Feb 3, 2014

Okay, you might want to compress the message before handing it over to the producer for performance reasons on the broker.

See, when a Kafka producer compresses a message set (one or more messages), it compresses the message headers and the message payloads. When the broker receives this compressed message set, it uncompresses it, assigns message offsets, and then recompresses it.
This makes sense when there are multiple messages in a message set, but in your case there will only ever be one message, and a pretty large one, in each message set, so the broker will uncompress and recompress it for no real purpose.
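
If you do pre-compress, a minimal sketch using zlib might look like this (produce_precompressed() is an illustrative helper; the topic would be configured with compression.codec=none, and, as noted below, the consumer then has to decompress the zlib-format payload itself):

#include <stdlib.h>
#include <zlib.h>
#include <librdkafka/rdkafka.h>

int produce_precompressed (rd_kafka_topic_t *rkt,
                           const void *buf, size_t len, void *msg_opaque) {
   uLongf clen = compressBound(len);
   Bytef *cbuf = malloc(clen);

   if (!cbuf || compress(cbuf, &clen, (const Bytef *)buf, len) != Z_OK) {
      free(cbuf);
      return -1;
   }

   /* F_FREE: librdkafka takes ownership of cbuf and frees it after the
    * delivery report. */
   if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE,
                        cbuf, clen, NULL, 0, msg_opaque) == -1) {
      free(cbuf); /* ownership is not transferred on enqueue failure */
      return -1;
   }
   return 0;
}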

winbatch commented Feb 3, 2014

If I do that, then the consumer would need to explicitly uncompress it, right? If so, I don't want to impose that.

edenhill commented Feb 3, 2014

Yep, that's correct.

edenhill commented Feb 3, 2014

All good? Reopen if not.
