
[kafka] RdKafkaProducer has no way of error handling #749

Closed
Steveb-p opened this issue Feb 1, 2019 · 30 comments · Fixed by #1102

Comments

@Steveb-p
Contributor

Steveb-p commented Feb 1, 2019

When an error occurs in RdKafkaProducer, it is silently ignored - or, more accurately, since it occurs outside of the main thread of php/php-fpm, it is not reported back.

A simple test demonstrates this issue: trying to send a message to a non-existent Kafka server results in nothing (see point 2, though).

This results in two things:

  1. The process has no way of knowing that a message that was supposed to be delivered will not be. The producer returns immediately, without waiting for the message to be acknowledged.
  2. Due to how it is handled in arnaud-lb/php-rdkafka, the process that was supposed to send the message will be locked and will retry the operation for a long time (from my testing it seems to be around 5 minutes; I wasn't able to find a configuration option to change it). In my case, with the default configuration of the php-fpm Docker image, the fpm pool became locked after 5 requests were made, since that's the default maximum number of spawned php-fpm children.
    While this particular part is not really possible to fix inside enqueue, it's important, since the message might actually be delivered later on.

I'd like to ask for opinions on how RdKafkaProducer should handle this situation. IMO it is worthwhile to add a configuration option that makes sending messages synchronous for this particular producer - or at least waits a specified amount of time for the message to be acknowledged.
This can be done by introducing the following code at the end of the send method:

$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
$start = microtime(true);
while ($this->producer->getOutQLen() > 0) {
    // Serve queued callbacks (dr_msg_cb, error_cb) while waiting for the queue to drain.
    $this->producer->poll(1);

    if (microtime(true) - $start > 10) {
        throw new \RuntimeException("Message sending failed");
    }
}

This has a side effect of actually calling dr_msg_cb & error_cb callbacks, which are otherwise ignored (or at least that's what my testing indicated).

Thoughts?

@makasim
Member

makasim commented Feb 1, 2019

I don't have an immediate answer. Will think about it

@Steveb-p
Contributor Author

Steveb-p commented Feb 1, 2019

When callbacks are set up like so:

echo microtime(true), '<br />';
$this->conf->setErrorCb(function ($kafka, $err, $reason) {
    echo nl2br(sprintf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason));
    echo microtime(true), '<br />';
});

$this->conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        // message permanently failed to be delivered
        echo nl2br(sprintf("Kafka error: %s", $message->err));
        echo microtime(true), '<br />';
    } else {
        // message successfully delivered
    }
});

They are called once and result in:

1549030182.6759 // start
1549030182.7177 // callbacks configure (Conf is built here)
Kafka error: Local: Host resolution failure (reason: kafka:9092/bootstrap: Failed to resolve 'kafka:9092': Name does not resolve)
1549030182.7521 // Error cb was called
Kafka error: Local: All broker connections are down (reason: 1/1 brokers are down)
1549030182.7521 // Error cb was called again
1549030192.7326 // End of polling
1549030192.7373 // end

However, because I set

$this->conf->set('debug', 'all');

when the Conf instance was built, I can see the following in stderr:

docker   | [01-Feb-2019 14:14:17] WARNING: [pool www] child 56 said into stderr: "%7|1549030457.427|BROKERFAIL|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: failed: err: Local: Host resolution failure: (errno: Bad address)"
docker   | [01-Feb-2019 14:14:17] WARNING: [pool www] child 56 said into stderr: "%7|1549030457.427|BUFQ|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Purging bufq with 0 buffers"
docker   | [01-Feb-2019 14:14:17] WARNING: [pool www] child 56 said into stderr: "%7|1549030457.427|BUFQ|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Updating 0 buffers on connection reset"
docker   | [01-Feb-2019 14:14:17] WARNING: [pool www] child 56 said into stderr: "%7|1549030457.754|NOINFO|rdkafka#producer-2| [thrd:main]: Topic view_events partition count is zero: should refresh metadata"
docker   | [01-Feb-2019 14:14:17] WARNING: [pool www] child 56 said into stderr: "%7|1549030457.754|METADATA|rdkafka#producer-2| [thrd:main]: Skipping metadata refresh of 1 topic(s): no usable brokers"
docker   | [01-Feb-2019 14:14:18] WARNING: [pool www] child 56 said into stderr: "%7|1549030458.427|CONNECT|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: broker in state DOWN connecting"
docker   | [01-Feb-2019 14:14:18] WARNING: [pool www] child 56 said into stderr: "%7|1549030458.448|BROKERFAIL|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: failed: err: Local: Host resolution failure: (errno: Bad address)"
docker   | [01-Feb-2019 14:14:18] WARNING: [pool www] child 56 said into stderr: "%7|1549030458.448|BUFQ|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Purging bufq with 0 buffers"
docker   | [01-Feb-2019 14:14:18] WARNING: [pool www] child 56 said into stderr: "%7|1549030458.448|BUFQ|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Updating 0 buffers on connection reset"
docker   | [01-Feb-2019 14:14:18] WARNING: [pool www] child 56 said into stderr: "%7|1549030458.754|NOINFO|rdkafka#producer-2| [thrd:main]: Topic view_events partition count is zero: should refresh metadata"
docker   | [01-Feb-2019 14:14:18] WARNING: [pool www] child 56 said into stderr: "%7|1549030458.754|METADATA|rdkafka#producer-2| [thrd:main]: Skipping metadata refresh of 1 topic(s): no usable brokers"
docker   | [01-Feb-2019 14:14:19] WARNING: [pool www] child 56 said into stderr: "%7|1549030459.447|CONNECT|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: broker in state DOWN connecting"
docker   | [01-Feb-2019 14:14:19] WARNING: [pool www] child 56 said into stderr: "%7|1549030459.475|BROKERFAIL|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: failed: err: Local: Host resolution failure: (errno: Bad address)"
docker   | [01-Feb-2019 14:14:19] WARNING: [pool www] child 56 said into stderr: "%7|1549030459.475|BUFQ|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Purging bufq with 0 buffers"
docker   | [01-Feb-2019 14:14:19] WARNING: [pool www] child 56 said into stderr: "%7|1549030459.475|BUFQ|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Updating 0 buffers on connection reset"

which indicates that librdkafka is still trying to send the message.
Until eventually it reaches:

docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.756|BUFQ|rdkafka#producer-2| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Updating 0 buffers on connection reset"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|TERMINATE|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Handle is terminating: failed 0 request(s) in retry+outbuf"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|BROKERFAIL|rdkafka#producer-2| [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: No error information)"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|STATE|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Broker changed state UP -> DOWN"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|BROADCAST|rdkafka#producer-2| [thrd::0/internal]: Broadcasting state change"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|BUFQ|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|BUFQ|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|TERM|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Received TERMINATE op in state DOWN: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|TERMINATE|rdkafka#producer-2| [thrd:main]: Main background thread exiting"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|TERMINATE|rdkafka#producer-2| [thrd:app]: Destroying op queues"
docker   | [01-Feb-2019 14:14:42] WARNING: [pool www] child 56 said into stderr: "%7|1549030482.834|TERMINATE|rdkafka#producer-2| [thrd:app]: Termination done: freeing resources"
docker   | [01-Feb-2019 14:14:43] WARNING: [pool www] child 34 said into stdout: "Kafka error: -192Kafka error: -192Kafka error: -1921549030430<br />"

If Kafka is brought back up after the www server has sent its response (and the callbacks were called and so on), I can see in Prometheus that the message was successfully delivered:
[screenshot: Prometheus graph showing the delivered message]

Cross-referencing an issue in the php-rdkafka wrapper that probably describes this problem:
arnaud-lb/php-rdkafka#196

@Steveb-p Steveb-p self-assigned this Feb 1, 2019
@Steveb-p
Contributor Author

Steveb-p commented Feb 3, 2019

@makasim I've looked into this further and found two related issues regarding topic configuration - one in the .NET client library for librdkafka, and one in librdkafka itself (confluentinc/librdkafka#2202, confluentinc/confluent-kafka-dotnet#322).
It seems that setDefaultTopicConf creates a separate configuration object which has nothing to do with the global configuration (as one might assume).
This might be why my process remains alive in the background, still trying to send messages while PHP starts destructing all objects - I was using the global configuration to limit how long librdkafka should attempt to send the message.

However, it seems to me that enqueue-kafka should have some way of waiting for a message to be delivered. Especially if the process sending the message is short-lived and would not otherwise wait for potential message errors (by calling poll until the queue is empty).

@makasim
Member

makasim commented Feb 3, 2019

However, it seems to me that enqueue-kafka should have some way of waiting for a message to be delivered. Especially if the process sending the message is short-lived and would not otherwise wait for potential message errors (by calling poll until the queue is empty).

I am not completely sold on it. The message queue is async by its nature and should not wait for anything. Also, waiting for confirmation might drastically decrease the performance of the producer process. The wait operation is blocking, and the script does not do anything valuable while waiting for the confirmation.

@Steveb-p
Contributor Author

Steveb-p commented Feb 4, 2019

I agree to an extent. However, currently our Kafka integration will silently ignore all producer errors unless someone notices that messages are missing and/or looks into syslog. The PHP process itself is unaware of any issues, and everything will look as if it is OK.

Librdkafka itself suggests calling the poll method to check the subthread for message errors & acknowledgements (https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#producer-message-delivery-success, https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#threads-and-callbacks, https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.poll.html).

From librdkafka:

A poll-based API is used to provide signaling back to the application, the application should call rd_kafka_poll() at regular intervals.

From PHP wrapper (php-rdkafka)

An application using a sub-class of RdKafka should make sure to call poll() at regular intervals to serve any queued callbacks waiting to be called.

It's the only way for error callbacks to even be fired. From what I've established, the poll method does not actually query Kafka instances; it just checks whether messages changed their state in the subthread's queue. We can safely call $producer->poll(0) (I assumed a 1ms timeout in my examples, as I often see 0 used as "wait indefinitely", which is not the case here), which will fire registered callbacks and return immediately.

timeout_ms (integer)
Specifies the maximum amount of time (in milliseconds) that the call will block waiting for events. For non-blocking calls, provide 0 as timeout_ms. To wait indefinitely for an event, provide -1.

That's why I've suggested providing a method in the Kafka client to call the poll methods of any created producers, and maybe some method to proxy the queue length ($producer->getOutQLen(), which returns the count of messages that librdkafka is still trying to send). I know that unfortunately it's not part of the PsrProducer interface :(

Also, if you're OK with that, I would make a PR explaining some common configuration options and adding them to the current docs (Kafka part), along with the potential pitfalls if they are not configured - like message.timeout.ms, which should be added to the topic configuration property (not the global one) to prevent php-fpm from becoming stuck for 60s on each request if the Kafka broker is unavailable.
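A bounded version of this polling loop can be factored into a small helper. The sketch below is illustrative only: waitForDelivery and its callable parameters are hypothetical names standing in for RdKafka\Producer::getOutQLen() and RdKafka\Producer::poll(), so the loop can be shown (and exercised) without ext-rdkafka installed:

```php
<?php

// Hypothetical helper (the function name and callable parameters are made up
// for illustration; they stand in for RdKafka\Producer::getOutQLen() and
// RdKafka\Producer::poll()). It drains the out-queue with non-blocking polls
// and fails after a timeout instead of blocking forever.
function waitForDelivery(callable $getOutQLen, callable $poll, float $timeoutSec = 10.0): void
{
    $start = microtime(true);
    while ($getOutQLen() > 0) {
        // Non-blocking poll: fires any queued dr_msg_cb / error_cb callbacks.
        $poll(0);

        if (microtime(true) - $start > $timeoutSec) {
            throw new \RuntimeException('Delivery not confirmed within timeout');
        }
        usleep(1000); // avoid a busy loop between polls
    }
}
```

With the real producer, the two callables would simply close over `$this->producer`; the timeout keeps a dead broker from locking the fpm worker indefinitely.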

@makasim
Member

makasim commented Feb 4, 2019

@Steveb-p thanks for your very well detailed answer. I agree that the issue should be addressed. Would you be able to provide a PR for this issue?

@Steveb-p
Contributor Author

Steveb-p commented Feb 4, 2019

@makasim Yes, will do. Just wanted to clarify what approach I should take before doing anything :)

@Steveb-p
Contributor Author

@makasim I've looked at how the Interop\Queue\Producer interface looks, and there is really no reasonable way other than adding an additional method just for RdKafkaProducer :(
The fact that the poll method should be called when producing is specific to RdKafkaProducer, and probably should not warrant adding a new method to the interface just for that. On the other hand, it should be called periodically to fire any callbacks that were registered in Conf, to empty the queue, and to eventually free the process. I considered using ticks for that, but that seems invasive and out of the user's control.

WDYT?

To reiterate my previous comments:
Not calling poll has the following effects:

  • Callbacks are not fired (errors, stats)
  • Queue is not emptied on connectivity errors (possibly more), causing processes (especially php-fpm) to get stuck.

poll should be called at regular intervals as long as there are items in the queue. Calling it in send is bad, since it will cause applications to slow down / block there for no reason.

@makasim
Member

makasim commented Mar 29, 2019

Can we call poll just after sending a message? Is there a way to wait for callbacks / stats?

poll should be called at regular intervals as long as there are items in the queue. Calling it in send is bad, since it will cause applications to slow down / block there for no reason.

@makasim
Member

makasim commented Mar 29, 2019

@mariusbalcytis
Contributor

mariusbalcytis commented May 21, 2019

I think it's best to have 2 modes available:

  • slow down / block the execution. I don't see this as "for no reason". It's the default for any other remote calls in PHP - HTTP libraries, database connections, etc.
  • add another method that the developer calls explicitly to get all the responses. On top of this, we can add methods to send confirmed batches of messages or similar functionality.

Making this automagical would be hard to control. For example, we need flow control for errors - what do we do if there was an error when sending the message (that's the reason for all of this, isn't it?)? If we got the exception some time later (and we cannot even predict when), we would not be able to handle the exact situation - for example, to save (some) messages to disk or a database for later processing, if they are really important ones.
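One way to keep that flow control in userland is to collect delivery failures from the delivery-report callback and let the caller decide afterwards what to do with them. A minimal sketch, assuming a hypothetical FailedMessageCollector class (the class name is made up; the callback signature mirrors rdkafka's dr_msg_cb, and 0 mirrors RD_KAFKA_RESP_ERR_NO_ERROR):

```php
<?php

// Hypothetical collector: gathers permanently failed payloads so the caller
// can persist them (disk, database, ...) instead of losing them in a subthread.
final class FailedMessageCollector
{
    /** @var array<int, string> payloads that permanently failed delivery */
    private $failed = [];

    // Signature mirrors rdkafka's dr_msg_cb: ($kafka, $message).
    public function onDeliveryReport($kafka, $message): void
    {
        if ($message->err !== 0 /* RD_KAFKA_RESP_ERR_NO_ERROR */) {
            $this->failed[] = $message->payload;
        }
    }

    /** @return array<int, string> failed payloads collected so far (then reset) */
    public function drain(): array
    {
        $failed = $this->failed;
        $this->failed = [];
        return $failed;
    }
}
```

It would be wired up via something like `$conf->setDrMsgCb([$collector, 'onDeliveryReport'])`, with the application calling drain() after polling to handle whatever failed.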

@Steveb-p
Contributor Author

slow down / block the execution. I don't see this as "for no reason". It's the default for any other remote calls in PHP - HTTP libraries, database connections, etc.

Not really. A lot of drivers support asynchronous execution. Producing a message (which is done in a subthread, by the way) can take from a couple of milliseconds to a few seconds at worst, and there are legitimate reasons to allow the program to execute further and - possibly - produce more messages.

In my opinion the choice between waiting for message production results should be left to the user. We've mostly talked about what approach would be best, aiming to contain classes within their interface declaration (queue-interop), but it seems that this limitation is hurting this transport.

My current approach would be to either attach some property to the message instance, causing it to block (which would be transport-specific behavior and will need to be documented), and/or extend Context with new methods like poll, getQueueLength and flush (the first two being named exactly like their counterparts in phprdkafka).

@makasim
Member

makasim commented May 27, 2019

@Steveb-p or it could be an approach similar to what I did for the subscription consumer. A context could create a producer responsible for this kind of message publishing. If the transport does not support it, then a NotSupportedException is thrown. That way we do not pollute the context with extra methods, which is good. WDYT?

That producer could be implemented for kafka and rabbitmq (with publisher confirms mode)

@Steveb-p
Contributor Author

@makasim In other words, you want to introduce a SubscriptionProducer-like interface to queue-interop?

I'm not really familiar with the SubscriptionConsumer concept, so I'll have to actually read the code :)
I've skimmed the tests related to it, but they're not obvious to me at a glance.

@makasim
Member

makasim commented May 27, 2019

I am thinking of an AsyncProducer. Not sure if this is a good idea or not; it needs a PoC.

@Steveb-p
Contributor Author

namespace Interop\Queue;

interface AsyncProducer extends Producer
{
    // Async methods
}

created from $context->createProducer(...), and then rely on user code to check against AsyncProducer if they want to use async-specific methods?

@koderoff

koderoff commented Jun 7, 2019

Any news on this?

@stale

stale bot commented Jul 7, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix label Jul 7, 2019
@Steveb-p
Contributor Author

Be silent, bot! I'll work on it. I promise! :)

@stale stale bot removed the wontfix label Jul 11, 2019
@stale

stale bot commented Aug 10, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix label Aug 10, 2019
@makasim makasim added the pinned label Aug 10, 2019
@stale stale bot removed the wontfix label Aug 10, 2019
@Steveb-p
Contributor Author

Steveb-p commented Sep 2, 2019

Just wanted to mention that phprdkafka 4.0 is being prepared, where producer error handling will be moved to userland code.

See arnaud-lb/php-rdkafka#196 (comment)

@mlively

mlively commented Sep 13, 2019

@Steveb-p is a fix for this dependent on phprdkafka 4? Also, do you have recommendations on how to handle guaranteed delivery through enqueue-kafka in the short term? For now I am basically just making the change you commented about in RdKafkaProducer->send(). It's probably not something that would work as a production change for the library, but as a temporary patch for us it seems to do the trick (once I implemented both error AND delivery report handling).

Lastly, any input or help I can give to move forward?

@Steveb-p
Contributor Author

@mlively definitely, no solution would depend on phprdkafka 4.0. It's just a note for my future self to keep it in mind. If anything, phprdkafka will force error handling into user code, since it will no longer retry sending messages in a thread that keeps the php process alive during shutdown.

@maks-rafalko

I would like to share our experience with the code provided in the description of this issue.

The code with polling actually works and triggers an error callback, which gives us an opportunity to throw a custom exception there.

The only thing I noticed is that sometimes the while ($this->producer->getOutQLen() > 0) { loop never ends, and we really need a condition to break this infinite loop, which is kinda weird.

Regarding the Symfony Messenger integration - we have already integrated this approach via a custom middleware that allows us to send all dispatched messages that failed (due to Kafka's unavailability) to the Failure Transport. For more details and code examples, see symfony/symfony#35521


@Steveb-p what are the current blockers to have this code inside RdKafkaProducer in this lib?

I've also benchmarked the overhead of calling poll() after produce(). For ~1000 calls, here is the overhead:

average: 0.0051571083068848 seconds

which is really not noticeable.

Can I help with something to get this code merged into the master branch, so other developers don't have to spend their time trying to understand where their messages are? :)

@Steveb-p
Contributor Author

Steveb-p commented Jan 30, 2020

what are the current blockers to have this code inside RdKafkaProducer in this lib?

@maks-rafalko Time, really. And some design choices.

Ideally, we would like to have something that will either call poll on its own at set intervals or allow the user to do so.
Originally I didn't want to expand the class with another method, to keep the interchangeability promise that makasim raised (him being the original library author, I kind of have to respect his view on the matter). It's up for discussion.

EDIT: You can find me on both enqueue-dev and phprdkafka gitter channels, as well as Symfony#kafka and phprdkafka Slack channels, if you want to talk it through. I'm working on things unrelated to Kafka atm unfortunately, otherwise it would be on my priority list to finish 😞 .

@makasim
Member

makasim commented Jan 30, 2020

RdKafkaProducer might introduce a transport-specific method. That's fine. So people can do something like this (for example, in an extension):

if ($producer instanceof RdKafkaProducer) {
    $producer->pollOrSomething();
}

@makasim
Member

makasim commented Jan 30, 2020

I would introduce a method to get the errors and leave it up to developers to use it or not.
Or set a flag that enables polling on publishing.

@Steveb-p
Contributor Author

I would introduce a method to get the errors and leave it up to developers to use it or not.
Or set a flag that enables polling on publishing.

Understood.

I'll have to refresh my memory then 😄

@nick-zh
Contributor

nick-zh commented Sep 14, 2020

@Steveb-p my thoughts on this, at least for php-rdkafka:4.x, would be to call poll(0) right after produce. As @maks-rafalko mentioned, it poses very little overhead. Doing this, you fire a non-blocking call and give callbacks the chance to be fired after every message. Calling flush (which you implemented in #959) will also internally trigger poll during shutdown. Imho this should be a reasonable approach without the need to do it on the side in a timed manner. I am not too familiar with enqueue though, so I might be missing something.
By not setting an errorCb, people will still have the option to ignore errors.
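The pattern described above can be sketched roughly as follows. This is illustrative only: sendWithPoll and flushOnShutdown are hypothetical helper names, and $producer is any object exposing poll()/flush() with rdkafka-like semantics rather than the actual enqueue producer:

```php
<?php

// Illustrative sketch of the pattern: a non-blocking poll(0) right after each
// produce, plus a blocking flush at shutdown. The helper names are made up;
// $producer stands in for an object with rdkafka-like poll()/flush().
function sendWithPoll(object $producer, callable $produce): void
{
    $produce();
    // Non-blocking: serves queued delivery-report / error callbacks, if any.
    $producer->poll(0);
}

function flushOnShutdown(object $producer, int $timeoutMs = 10000): void
{
    // flush() polls internally until the out-queue drains or the timeout hits;
    // 0 mirrors RD_KAFKA_RESP_ERR_NO_ERROR.
    if ($producer->flush($timeoutMs) !== 0) {
        throw new \RuntimeException('flush() timed out; messages may be lost');
    }
}
```

The per-message poll keeps callbacks timely without blocking the request, and the shutdown flush is the one place where blocking is acceptable, since the process is exiting anyway.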

@devnix

devnix commented Jul 2, 2022

Hi! I'm struggling to get an error when producing a message to a stopped Kafka instance. I guess it's related to the async nature of the rdkafka extension, as I'm reading. Is this solved or configurable in any way?

8 participants