producer message queue persistence on disk #31
Persisting the producer message queue on disk would allow longer broker unavailability times.
Comments
Hi, I am interested in this feature, especially if it can persist any pending messages that are still in the buffer without an ack before destroy (to be re-queued on startup). Do you have any ideas on how you would implement this? Otherwise I will look into it and see if I can put together a pull request with this functionality at some point. |
My idea for this is to simply use mmap()ed files and point the rkbufs there. |
Hi Magnus, would this have to survive graceful shutdowns and startups? My initial thoughts:
- create files equally sized to the max.messages …
- write to this file not on every queued message, but let a separate …
- for as long as there are messages in a file which are not added in the …
- since the Kafka and topic settings can change (if we want to restore …

Regards, B.
|
Good input, B!

Regarding writev: …
Regarding reading incomplete spool files on restart after a non-graceful shutdown: …
Re message metadata: …

Since messages may finish (through delivery or error) in a non-sequential order (mainly because they belong to different partitions), there needs to be a spool file management layer that knows when the last message from a spool has finished and then removes the spool file. I do have an old working branch somewhere with some early work for something like this, I'll see if I can dig it up. |
The main issue for persistence in my use case is producing the messages. I'd rather have the consumer client rely on Kafka for reading, so if you want to re-read data that was not properly read earlier, you do so by restarting from the last known good point (offset). This minimizes the duplication of Kafka functionality in the client. It also reduces the need to store data locally to producers that connect to brokers that are not highly available (safety net). |
Hi, I'm really interested in this thread. We are about to start using Kafka over an inferior (but simpler!) in-house "buffer" store that was based on Facebook's Scribe. We need to continue to support "fire and forget" style publishing from a large PHP web app for high-volume logging purposes, where delaying user requests is not acceptable, and we have several other parts of our infrastructure integrated with Scribe already.

A significant benefit of Scribe is its Buffer store: if it loses the connection to the upstream Scribe server it starts writing to disk until a periodic check re-establishes the connection. It then streams the disk file back upstream and, when done, returns to straightforward in-memory streaming. Trying to figure out how to write a Scribe store that uses librdkafka to write to a Kafka cluster as upstream is pretty complex, given the same desire to tolerate longer outages without memory pressure on the web app machines. Although Kafka is more reliable/highly available than our current solution, we don't want to lose the protection of disk spooling on web servers for best-effort eventual delivery, especially since Kafka might be deployed in a different datacenter from the web servers (low latency, but still a bigger risk of partition/network failure).

In some ways the easiest option would be if librdkafka had a totally synchronous producer API, such that we can try to produce a batch (to a single partition) and get either synchronous success or an error returned. That way we can leave Scribe doing the partitioning and batching, and handle failures by switching to buffering. Even then we would need a reasonable way to periodically check that the partition is writable again in order to start re-delivery of spooled messages.

@edenhill do you have any thoughts on this? I know this thread is about implementing something similar within the library, but would it make sense to you to instead provide additional public API hooks that allow apps to customise this behaviour for themselves? For example, the library could accept a callback that is notified when a partition becomes unavailable and another when it becomes available again (in terms of being successfully connected to a leader broker). Is there some way I can get close to that now without re-implementing a lot of the details of metadata retrieval etc. - the … |
Hi @banks, that is an interesting use case, here are my thoughts:

**Sync API**
While librdkafka is very asynchronous by nature, it would be possible to create a synchronous batch producer API with a bit of work. However, the problem with sync APIs is performance: they are bound to the round-trip time (rtt) to the broker. This can be alleviated to some extent by sending larger batches, but those are in turn limited by the max message size and will increase end-to-end latency. So if possible it is mostly a better idea to use async APIs, but the door isn't closed.

**Topic and partition state**
Propagating topic & partition state to the application in some way has been mentioned a bunch of times in different issues, and maybe it's about time to actually implement it! The thinking goes along these lines: …

**Persistent queue**
The existing plan is to provide a pluggable framework for message queue persistence. There are two alternative approaches here: …

Approach 1 is simpler to implement, and with mmapped files the overhead should be minimal, except for the occasional disk I/O flushes. If reordering is not a (big) concern, a naive implementation on your side could simply persist messages from the … |
@edenhill Thanks for the detailed thoughts. I need a bit of time to digest that.

I understand the performance issue with a sync API, but the rationale is that Scribe already handles many of the concerns such as partitioning and batching, and I'm trying to plug into the part that just sends a batch upstream and either succeeds or fails. If I wrote my own Kafka protocol serializers for the metadata and produce RPCs I could make that work really simply, at the expense of duplicating low-level client code you've already written and tested; I couldn't see an easy way to re-use that code from librdkafka at a quick glance.

The max message size restriction in Kafka limits batch size, and thus throughput, for messages sent this way, but in practice for our uses it is unlikely to be a constraint. Especially since multiple partitions can send batches in parallel (at least waiting on acks in separate threads) over the same broker connection without interfering, and still give a per-partition sync send/ack interface.

I'll let you know what we end up doing once we've had a chance to try a few options. |
Requests for a sync interface come up quite frequently and I try to steer people clear of those ideas, but I acknowledge there are situations where such an interface is warranted, and I'd like to accommodate those use cases in the best manner. To summarize:

**Throughput**
The main problem with a sync interface is throughput: the application must wait for the full server round-trip time (rtt) before being able to send the next message(s).

**Batching**
Batching alleviates this, so the sync interface should also provide a batched version.

**Transactional batching**
Should the produced batches be treated as atomic batches when sent to the broker?

I'd be happy to work out a solution with you that suits your needs; it is not always optimal to conceive new functionality without an existing use case/problem to solve. |
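To make the batched-sync discussion concrete, here is a minimal sketch of how an application could emulate a synchronous batch produce on top of librdkafka's existing async API. It assumes the producer was created with the `dr_msg_cb` below registered via `rd_kafka_conf_set_dr_msg_cb()`; `produce_batch_sync` and `batch_state` are hypothetical names, not an API librdkafka provides.

```c
/* Sketch only: a synchronous "batch produce" built on the async API.
 * Each message carries a pointer to a per-batch counter as its
 * msg_opaque; the delivery report callback decrements the counter and
 * records the first error seen. */
#include <librdkafka/rdkafka.h>

struct batch_state {
        int remaining;                  /* messages still in flight */
        rd_kafka_resp_err_t first_err;  /* first delivery error, if any */
};

/* Must be registered with rd_kafka_conf_set_dr_msg_cb() before rd_kafka_new(). */
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *m, void *opaque) {
        struct batch_state *bs = m->_private;  /* msg_opaque passed to produce() */
        if (m->err && !bs->first_err)
                bs->first_err = m->err;
        bs->remaining--;
}

/* Produce `cnt` payloads to one topic and wait until all are acked or failed. */
rd_kafka_resp_err_t produce_batch_sync(rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                                       char **payloads, size_t *lens, int cnt) {
        struct batch_state bs = { .remaining = cnt,
                                  .first_err = RD_KAFKA_RESP_ERR_NO_ERROR };
        int i;

        for (i = 0; i < cnt; i++)
                if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
                                     RD_KAFKA_MSG_F_COPY,
                                     payloads[i], lens[i],
                                     NULL, 0, &bs) == -1) {
                        bs.remaining--;                 /* never enqueued */
                        if (!bs.first_err)
                                bs.first_err = rd_kafka_last_error();
                }

        while (bs.remaining > 0)
                rd_kafka_poll(rk, 100);  /* serves delivery reports into dr_msg_cb */

        return bs.first_err;
}
```

As the comments above note, the caller is still bound by the broker round-trip time per batch; the batching only amortizes that cost over `cnt` messages.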
In my company we use librdkafka to send events collected from an nginx server. These servers can be in locations that occasionally have connection issues to our Kafka cluster. Because of this we wanted a (somewhat) reliable buffer so we do not lose too much if our producer dies while there are a lot of messages stuck in the output queue. What we do right now is parse the nginx log files and create a separate progress file per log file that stores the sent/unsent status per log line. This way we can make sure all messages are sent at least once. Throughput has not been an issue because so far it has been faster than our nginx server under normal network conditions (20k+ messages/second).

**Pros**
…

**Cons**
…

**Mmap**
The problem I see with mmap (correct me if I am wrong) is that it would still be a fixed-size queue and could potentially block the producer and whatever application it is connected to. In our case it would still help a lot though, since I could simply store a single offset in the progress file (we process nginx log files sequentially in a single thread).

**Transactional batching**
I like this feature and think it could be very useful (it provides a lot of flexibility). Could each batch be treated as a queue and block when it is full? That way the application does not necessarily need to know about the sizes, only what to do when the batch is full.

Thanks! |
Hi @cleaton and thanks for your input.

**Mmap**
It would use fixed-size ledger files: when the first file is full, a second one is created and new messages are appended to it, and so on.

**Transactional batching**
That's a good idea, what would such an interface look like?

There is no point in blocking on an atomic sync batch send with too many messages, since it is limited by a hard configuration value rather than any current queue depths. |
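To illustrate the fixed-size ledger idea above, here is a rough sketch (not librdkafka code, and not necessarily how the library would implement it) of appending length-prefixed records to a pre-sized mmap'ed spool file and signalling the caller to roll over to a new file when it is full. The names `spool_open`/`spool_append` and the 64 MB size are made up for the example.

```c
/* Hypothetical fixed-size mmap'ed spool ("ledger") file: records are
 * appended until the file is full, then the caller rolls over to a new
 * file. Error handling is largely omitted. */
#include <fcntl.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

#define SPOOL_SIZE (64 * 1024 * 1024)   /* fixed ledger size */

struct spool {
        int fd;
        char *base;     /* mmap'ed region */
        size_t wpos;    /* next write offset */
};

static int spool_open(struct spool *sp, const char *path) {
        sp->fd = open(path, O_RDWR | O_CREAT, 0600);
        if (sp->fd == -1 || ftruncate(sp->fd, SPOOL_SIZE) == -1)
                return -1;
        sp->base = mmap(NULL, SPOOL_SIZE, PROT_READ | PROT_WRITE,
                        MAP_SHARED, sp->fd, 0);
        sp->wpos = 0;
        return sp->base == MAP_FAILED ? -1 : 0;
}

/* Returns 0 on success, -1 if the ledger is full (caller opens the next file). */
static int spool_append(struct spool *sp, const void *buf, size_t len) {
        if (sp->wpos + sizeof(len) + len > SPOOL_SIZE)
                return -1;                               /* roll over */
        memcpy(sp->base + sp->wpos, &len, sizeof(len));  /* length prefix */
        memcpy(sp->base + sp->wpos + sizeof(len), buf, len);
        sp->wpos += sizeof(len) + len;
        /* Dirty pages can be flushed periodically; see the msync() note below. */
        return 0;
}
```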
@edenhill, that transactional batching interface suggested would work wonderfully for my case too I think.
I'm not sure it's directly related to this issue (built-in persistence), so would you prefer to have that conversation elsewhere? Another issue? Or an alternate discussion thread? I'm very happy to write up a more thorough overview of Scribe and our specific needs (with suggestions on potential APIs I think would fit them) if that helps? |
Is the sync interface for producer available now? |
@ankurjohri The sync interface is something else, see here: … This issue tracks persisting messages on disk on the producer until they are acked by the broker. |
+1 I am also interested in this. |
mmap or not, file corruption is related to the sync frequency, so it can be controlled at the expense of performance. |
It should optionally encrypt messages before writing to disk. |
@edenhill: At my company, we are looking to replace a rsyslog based sender with librdkafka, and one of the sticking points is the advantage given by rsyslog's "disk-assisted queues" (DAQ), which appears to be the same thing as is being discussed here, with the exception that with disk-assisted queues, the messages are written to disk only when they fail to be received by the rsyslog receiver, which sounds like your second alternative ("messages are only provided to the persistence framework when the destination is down"). I have considered rolling my own disk-persistence triggered from the "dr_cb" callback. I find it quite excellent that the entire message sticks around and is passed to that callback, and isn't deallocated until after the callback returns. Do you see any issues with using that callback for the purposes of writing failed messages to disk? I'd much rather use whatever you eventually come up with for this internal to librdkafka, so here's me hoping this gets implemented soon! Thanks! |
@billygout Unfortunately there are no plans to implement this in librdkafka in the foreseeable future, so you are better off writing your own, and you should be fine using the delivery callback. I do suggest, however, that you write the message to disk prior to produce(), and only mark the on-disk copy as "delivered" from your dr_msg_cb, this way your messages will also survive application crashes. |
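A minimal sketch of the approach suggested above (persist before produce(), mark the on-disk copy delivered from the delivery report callback), assuming a hypothetical on-disk spool API (`spool_store`/`spool_mark_delivered`/`spool_requeue`) that you would implement yourself:

```c
/* Sketch: write each message to local storage before produce(), pass a
 * reference to the stored copy as the message opaque, and resolve the
 * on-disk copy from dr_msg_cb. The spool_* functions are placeholders
 * for whatever storage layer you use. */
#include <stdint.h>
#include <librdkafka/rdkafka.h>

extern long spool_store(const void *payload, size_t len);  /* hypothetical */
extern void spool_mark_delivered(long id);                 /* hypothetical */
extern void spool_requeue(long id);                        /* hypothetical */

/* Register with rd_kafka_conf_set_dr_msg_cb() before creating the producer. */
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *m, void *opaque) {
        long spool_id = (long)(intptr_t)m->_private;  /* set via msg_opaque */

        if (!m->err)
                spool_mark_delivered(spool_id);  /* safe to purge on-disk copy */
        else
                spool_requeue(spool_id);         /* retry later, e.g. on restart */
}

int produce_persistent(rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                       const void *payload, size_t len) {
        long spool_id = spool_store(payload, len);       /* 1. persist first */

        return rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,  /* 2. then produce */
                                RD_KAFKA_MSG_F_COPY,
                                (void *)payload, len, NULL, 0,
                                (void *)(intptr_t)spool_id);
        /* 3. rd_kafka_poll() must be called regularly so dr_msg_cb runs. */
}
```

The point of this ordering is the one made above: a message only leaves the on-disk spool after the broker has acked it, so a crash between produce() and the delivery report does not lose it.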
@edenhill OK, thanks!
|
@billygout Do you plan to release this feature or make a PR? I'm currently looking for something similar. |
@zyrikby nope. i wish i was :) |
I implemented this with SQLite for our librdkafka wrapper. It works at about 350 msg/s on a bare-metal HP server, so I can tell you it works at (for me) acceptable speed. |
@DEvil0000, can you reveal any of the design aspects of it? I'd like to know: 1) whether you store every record or only the ones that fail, 2) whether you use the delivery report callback (or any callback, for that matter), 3) how you keep track of which ones have been sent, 4) whether the same program sends the current messages along with the stored ones or whether you have a separate program to send the stored ones, and 5) assuming you use the same program for producing both current and stored messages, how you switch between these tasks. |
The SQLite DB structure looks like: (id, key, message, state)

on producer startup (after the Kafka connection is OK): …
on produce call: …
on dr_callback: …
|
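A rough, hypothetical sketch of what such a SQLite-backed flow could look like; the table name, state values and SQL below are illustrative, not the poster's actual implementation.

```c
/* Illustrative only: an "outbox" table with (id, key, message, state),
 * where 0 = pending, 1 = in flight, 2 = delivered. */
#include <sqlite3.h>

static const char *SQL_SCHEMA =
        "CREATE TABLE IF NOT EXISTS outbox ("
        "  id      INTEGER PRIMARY KEY AUTOINCREMENT,"
        "  key     BLOB,"
        "  message BLOB,"
        "  state   INTEGER NOT NULL DEFAULT 0);";

/* On producer startup (once the Kafka connection is OK): reset anything
 * that was in flight when we died, then re-produce pending rows. */
static const char *SQL_RECOVER = "UPDATE outbox SET state = 0 WHERE state = 1;";

/* On produce(): insert the row first, hand its rowid to librdkafka as the
 * message opaque, and mark it in flight. */
static const char *SQL_ENQUEUE =
        "INSERT INTO outbox (key, message, state) VALUES (?, ?, 1);";

/* In dr_callback: on success mark delivered (or DELETE); on error set it
 * back to pending so a retry loop picks it up again. */
static const char *SQL_DELIVERED = "UPDATE outbox SET state = 2 WHERE id = ?;";
static const char *SQL_FAILED    = "UPDATE outbox SET state = 0 WHERE id = ?;";

int outbox_open(sqlite3 **db, const char *path) {
        if (sqlite3_open(path, db) != SQLITE_OK)
                return -1;
        return sqlite3_exec(*db, SQL_SCHEMA, NULL, NULL, NULL) == SQLITE_OK ? 0 : -1;
}
```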
@DEvil0000, I thought partition leadership changes when brokers go online/offline, while consumers are rebalanced when consumers go offline/online.

@edenhill, do you think the storage management complexity warrants tools such as SQLite or RocksDB? |
|
I was initially using a conservatively configured SQLite approach for this, but I do not remember the full details. It did the retry based on metadata, but also other things. The maximum message rate at a reasonable message size was about 10-15k msg/s (single threaded). I think mmap is not a good idea for persisting data. I have since moved to a different architecture, Erlang brod with Mnesia (to persist), but have not tested the throughput there. |
@DEvil0000 I'm curious why mmap would be a bad idea for persisting data, care to elaborate? |
I am not an expert on this, but as far as I know the data might get corrupted if the process crashes or similar.
|
Use msync() to flush page cache to disk, just like fsync(): |
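For example, a short sketch of flushing an mmap'ed spool region (such as the one sketched earlier in this thread), assuming `addr` and `len` come from the earlier mmap() call:

```c
#include <stddef.h>
#include <stdio.h>
#include <sys/mman.h>

/* Flush the dirty pages of an mmap'ed region to disk, analogous to fsync()
 * on a regular fd. MS_SYNC blocks until the write-out completes; MS_ASYNC
 * only schedules it. */
static int flush_spool(void *addr, size_t len) {
        if (msync(addr, len, MS_SYNC) == -1) {
                perror("msync");
                return -1;
        }
        return 0;
}
```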
Thanks guys for the tips. @edenhill: another question: for our case, we would like to send messages as soon as possible. I see queue.buffering.max.ms can be set to 1 or 0. What does the value 0 mean? Should we set it to 0 instead of 1? |
Kafka is not meant to deliver messages as fast as possible end-to-end in terms of rtt. You can tune it to some degree with those settings and with related settings in the broker config, but this will lower your bandwidth and overall throughput dramatically. You cannot have all the things. If you need fast end-to-end delivery you may need to use something else or rethink your architecture.
|
@chienhsingwu See CONFIGURATION.md. Basically it sets the maximum amount of time the producer will buffer messages into a message-set/batch before sending it to the broker. A lower buffer time means lower latency, and perhaps smaller batches depending on your produce rate; smaller batches will result in slightly lower throughput. |
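As an illustration, a minimal sketch of setting queue.buffering.max.ms on a producer configuration before creating the handle; the broker list, the value of 0, and the helper name are assumptions for the example (see CONFIGURATION.md for the authoritative property descriptions):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

rd_kafka_t *make_low_latency_producer(const char *brokers) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            /* Max time to buffer messages into a batch; 0 = send as soon as
             * possible, at the cost of smaller batches / lower throughput. */
            rd_kafka_conf_set(conf, "queue.buffering.max.ms", "0",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return NULL;
        }

        return rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
}
```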
@DEvil0000 It all depends on your expectations and requirements; round trips in single-digit milliseconds are indeed possible with Kafka. |
Hi @edenhill, looking into the difference between callbacks such as dr_msg_cb and interceptors, it sounds like interceptors are called independently of poll calls, while dr_msg_cb requires poll calls. Do I need to call poll when using an interceptor? |
on_send() is triggered from produce*() while on_acknowledgement() is called from internal librdkafka threads or produce*(), so there is no need to call poll() from the interceptors point of view. |
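For reference, a sketch of registering on_send/on_acknowledgement interceptors directly on a configuration object using the interceptor registration functions declared in rdkafka.h (rd_kafka_conf_interceptor_add_on_send() / rd_kafka_conf_interceptor_add_on_acknowledgement()); the callback bodies and names are placeholders and error handling is omitted.

```c
/* Sketch: producer interceptors. As noted above, on_acknowledgement may
 * fire from internal librdkafka threads, so keep it fast and thread-safe. */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

static rd_kafka_resp_err_t my_on_send(rd_kafka_t *rk,
                                      rd_kafka_message_t *m, void *opaque) {
        /* Called from produce*(): e.g. persist the message here. */
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

static rd_kafka_resp_err_t my_on_ack(rd_kafka_t *rk,
                                     rd_kafka_message_t *m, void *opaque) {
        /* Called when the broker acks the message or it fails permanently. */
        if (m->err)
                fprintf(stderr, "delivery failed: %s\n",
                        rd_kafka_err2str(m->err));
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

void add_interceptors(rd_kafka_conf_t *conf) {
        rd_kafka_conf_interceptor_add_on_send(conf, "my_ic", my_on_send, NULL);
        rd_kafka_conf_interceptor_add_on_acknowledgement(conf, "my_ic",
                                                         my_on_ack, NULL);
}
```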
Hi @edenhill, we started implementing this based on an interceptor and hit a couple of snags. We tested sending messages to an invalid topic and to an invalid broker list to understand the callback semantics. The on_acknowledgement interceptor reports successful delivery for a few of the 10 messages before reporting that the rest failed; the number of successful ones varies for the invalid topic. For the invalid broker list, all were reported successful. We are using version 0.11.4. Are those reasonable test cases? Any insights to share with us? Below is the on_acknowledgement callback we use: … |
@chienhsingwu Interesting! What was your request.required.acks setting? |
We did not configure request.required.acks, so it's whatever the default is. The code does not use dr_cb/dr_msg_cb, but we did a separate test using them and it seemed fine. |
The default acks is 1, so that should be fine for this (but do consider using acks=all). I think the problem might be the missing dr_msg_cb; could you try setting one up (it doesn't need to do anything) and see if you get the same behaviour? |
I don't need to call poll, right? |
You will need to call poll() eventually to avoid the queue filling up. |
Hmm, in that case there is not much of a difference between an interceptor and dr_msg_cb from our perspective... Independently of that, it is probably desirable to break that dependency between the interceptor and dr_msg_cb, right? |
The idea was to set a dr_msg_cb now to check if the lack of a dr_msg_cb is causing on_ack to receive success even on failure. |
Thanks. I will report back the results. |
OK just did that. The results did not change. |
Did you get the same error code in the dr_msg_cb as on_ack? |
I did not call poll, so dr_msg_cb was never invoked. But a separate test that did call poll and used dr_msg_cb, without the interceptor, showed the correct errors. |
Can you try with dr_msg_cb and your interceptor at the same time? It would be interesting to compare error codes between the two |
@edenhill, we did that. The result remained the same. |
Thank you. We'll fix this on master soon and include it in the next maintenance release. |
Thanks! |