Caching rd_kafka_topic_t #345

Closed

ottomata opened this issue Aug 7, 2015 · 26 comments

@ottomata
Contributor

ottomata commented Aug 7, 2015

It would be handy if librdkafka had the ability to cache initialized rd_kafka_topic_t topics so that (lazy) users wouldn't have to implement this themselves. That is, when producing to multiple topics, each unique rd_kafka_topic_t could be stored by librdkafka.

Something like

rd_kafka_topic_t *rkt = rd_kafka_topic_get(rk, "topic_name");

This could either initialize the topic and store it (if a topic conf is also given?), or it could return something indicating that the topic doesn't exist. I suppose a rd_kafka_topic_store() function of some kind would also be needed.

(This doesn't already exist, does it?)

@edenhill
Contributor

edenhill commented Aug 7, 2015

This is your lucky day: that is the exact behaviour of rd_kafka_topic_new() if the topic object already exists:

  • You will be returned the original rkt object with its refcount increased.
  • The provided configuration object will be ignored.
  • Lookup is O(n), where n is the number of unique topics you have rd_kafka_topic_new():ed.
  • Topic objects are refcounted: each .._new() call increments the refcount by one, and each .._destroy() call decrements it by one. When the refcount reaches 0 the object is destroyed (unless there are internal refcounts, which there usually are, in which case decommissioning takes some time).
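For illustration, a minimal sketch of that refcounting (not from the thread; it assumes rk is an initialized rd_kafka_t producer handle and "my_topic" is a placeholder name):

  rd_kafka_topic_t *rkt1 = rd_kafka_topic_new(rk, "my_topic", NULL); /* created, refcount 1 */
  rd_kafka_topic_t *rkt2 = rd_kafka_topic_new(rk, "my_topic", NULL); /* cached: rkt2 == rkt1, refcount 2 */

  rd_kafka_topic_destroy(rkt2); /* refcount 2 -> 1 */
  rd_kafka_topic_destroy(rkt1); /* refcount 1 -> 0: torn down once internal refcounts drain */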

@edenhill
Contributor

edenhill commented Aug 7, 2015

It might be hard for an application to rely on the refcount to retain a cached copy, since there is no indication from topic_new() whether it returned a new object or a cached one. One way of solving this would be to add a topic.cache.linger.ms property that keeps a topic object alive for the specified amount of time after the last _destroy() call.
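If such a property were added, configuring it might look like this (hypothetical: topic.cache.linger.ms is only proposed in this comment and is not an actual librdkafka property):

  char errstr[512];
  rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
  /* Hypothetical property: keep the topic object cached for 60s
   * after the last rd_kafka_topic_destroy() call. */
  if (rd_kafka_topic_conf_set(tconf, "topic.cache.linger.ms", "60000",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
          fprintf(stderr, "%s\n", errstr);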

@eugpermar
Contributor

This could be my lucky day too if you decide to integrate it; I was about to use an intermediate rkt "cache" to get this functionality 😃

@ottomata
Contributor Author

ottomata commented Aug 7, 2015

I think for my purposes (varnishkafka format.topic) this is perfect! Thank you!

@ottomata ottomata closed this as completed Aug 7, 2015
@edenhill
Contributor

edenhill commented Aug 7, 2015

Will you be needing the topic.cache.linger.ms config property?
You don't need it if you know your topics up front (i.e., you hold an initial ._new() refcount that never goes away).

@edenhill
Contributor

edenhill commented Aug 7, 2015

@eugpermar :)

@eugpermar
Contributor

Not for mine. My application needs to send messages to arbitrary, user-provided topics, so the refcount may drop to 0 at one moment and the very next message may need that topic initialized again.

If it's easy to develop and you give me a few hints, I can do it myself and send you a PR.

@ottomata
Contributor Author

ottomata commented Aug 7, 2015

I'm also trying to support user-provided topic names. Why do you need the refcount? It should be fine to call rd_kafka_topic_new for every message, right? If rdkafka already has the topic initialized (positive refcount) it will return the existing pointer, otherwise it will create a new one, right?

@eugpermar
Contributor

Hi @ottomata, sorry for not introducing myself :).

At first glance, yes. The problem is performance: tearing down and setting up sockets, connections, threads, queues, etc. In that scenario I think I would waste more resources setting up the toppar than sending the actual messages.

With a small timeout I can avoid all that teardown/reconnection churn and let the application get on with processing messages.

@ottomata
Contributor Author

ottomata commented Aug 7, 2015

Hm, I'm worried about this too, but according to @edenhill, if you have already called rd_kafka_topic_new for a given topic, it most likely will not initialize a new topic the next time you call it with the same topic name. It will skip all the connection setup if the topic already exists.

Am I correct?

@edenhill
Contributor

edenhill commented Aug 7, 2015

The cost of setting up a new topic object is a metadata refresh from the brokers, which might not seem like much, but if you start pushing thousands of messages per second it just won't work with a metadata round trip for each message sent.
(To clarify: there are no socket disconnects or similar when adding/deleting a topic.)

@ottomata "most likely it will not" is absolutely correct on development systems, but as soon as you roll that stuff out in production it is guaranteed to happen.
I.e., you must not rely on the topic remaining in the cache until your next new() call.
This is where the cache lingering comes in: it makes sure the object is not destroyed until a certain linger time has elapsed.

@eugpermar
Contributor

Yes, I'm talking about the other side:

Imagine that the application finishes flushing all its messages and I have already deleted my topic handle since I don't need it anymore. librdkafka shuts down all the resources for that topic, and then... another message arrives for that topic! It needs to connect (TCP) and request metadata from the broker again, and this process happens time and time again.

Maybe if I receive messages fast enough this does not happen, but I think it's too risky :) My first (theoretical) approach was to put a layer on top of rd_kafka_topic_new to hold the references long enough to prevent this (see the sketch below).

Magnus was faster this time :) and since he knows a lot more about this kafka stuff, let's trust his judgement hehe
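A minimal sketch of such a layer (hypothetical application code, not librdkafka API; topic_cache_get and the fixed-size table are purely illustrative):

  #include <string.h>
  #include <librdkafka/rdkafka.h>

  /* Hypothetical app-side cache: hold one refcount per topic name so
   * librdkafka never tears the topic down between messages; entries
   * are released with rd_kafka_topic_destroy() at shutdown. */
  #define MAX_CACHED_TOPICS 256
  static struct {
          char name[256];
          rd_kafka_topic_t *rkt;
  } topic_cache[MAX_CACHED_TOPICS];
  static int topic_cache_cnt = 0;

  static rd_kafka_topic_t *topic_cache_get (rd_kafka_t *rk, const char *name) {
          int i;
          for (i = 0; i < topic_cache_cnt; i++)
                  if (!strcmp(topic_cache[i].name, name))
                          return topic_cache[i].rkt;       /* cache hit */
          if (topic_cache_cnt == MAX_CACHED_TOPICS)
                  return NULL;                             /* cache full */
          rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, name, NULL);
          if (!rkt)
                  return NULL;                             /* creation failed */
          strncpy(topic_cache[topic_cache_cnt].name, name,
                  sizeof(topic_cache[0].name) - 1);
          topic_cache[topic_cache_cnt].rkt = rkt;
          return topic_cache[topic_cache_cnt++].rkt;
  }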

@edenhill edenhill changed the title Implement an rd_kafka_topic_t init/lookup function to cache already initialized topics Caching rd_kafka_topic_t Aug 7, 2015
@edenhill edenhill reopened this Aug 7, 2015
@ottomata
Contributor Author

ottomata commented Aug 7, 2015

Hm, @edenhill for varnishkafka, this should be ok though, no? It is very clear when _new() is called, and in fact, _destroy() is never called by it, since it expects to be producing to its topics. Will rdkafka call _destroy() in the background?

@edenhill
Contributor

edenhill commented Aug 7, 2015

_new() and _destroy() calls will need to be symmetric from the application side.

The problem is this:

void my_producer () {
  while (logline = read_a_logline()) {
   // 1: create or look up the topic (takes a refcount)
   rkt = rd_kafka_topic_new(rk, logline->some_topic_name, rd_kafka_topic_conf_dup(topic_conf));
   // 2: produce the message
   rd_kafka_produce(rkt, logline->a_message);
   // 3: drop our refcount; the topic may now be torn down internally
   rd_kafka_topic_destroy(rkt);
  }
}

When the time between 3 and 1 (i.e., the time read_a_logline() takes to return) is longer than it takes librdkafka to decommission and destroy a topic internally (which is an arbitrary number no one really knows), a new topic object will be created for each produced message - something very costly.

On the other hand, if you already know at application start all the topics it might produce to, you can seed the cache and create the topic objects up front, like so:

void app_init () {
  foreach topic in future_topics {
    rd_kafka_topic_new(rk, topic, rd_kafka_topic_conf_dup(topic_conf));
  }
}

void my_producer () {
  app_init();

  while (logline = read_a_logline()) {
   // 1
   rkt = rd_kafka_topic_new(rk, logline->some_topic_name, NULL); // get cached object
   // 2
   rd_kafka_produce(rkt, logline->a_message);
   // 3: destroy our refcnt from 1
   rd_kafka_topic_destroy(rkt);
  }

  app_term();
}

void app_term () {
  foreach topic in future_topics {
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL); // look up the handle
    // destroy the refcnt from the line above
    rd_kafka_topic_destroy(rkt);
    // destroy the refcnt from app_init()
    rd_kafka_topic_destroy(rkt);
  }
}

@ottomata
Contributor Author

ottomata commented Aug 7, 2015

Ah, hm. But varnishkafka has no calls to rd_kafka_topic_destroy(). Perhaps this is fine in the original case, where varnishkafka only ever produces to one topic. I'm trying to implement dynamic topics based on format.topic, like format.key (if I wanted to produce to a topic based on the incoming Host HTTP header, I'd set format.topic = %{Host}i).

In my current WIP I just call rd_kafka_topic_new() for every produce request. I suppose you are saying this is not good, and will cause rdkafka's refcount for a topic to get huge?

@eugpermar
Contributor

Everything that is learned must be forgotten :) If you do not call destroy, memory usage will grow with every new topic created, and it will never be freed.

@edenhill
Contributor

edenhill commented Aug 7, 2015

I'd say not calling topic_destroy() is a termination bug, i.e., varnishkafka will not terminate cleanly, but that is not always a big concern.
So in theory your ever-increasing refcnt with only _new() calls will work, until the refcnt wraps at 2.1 billion (which is perhaps 500 million messages, about seven hours at 20k msgs/s) and then mayhem will ensue.

@edenhill
Contributor

edenhill commented Aug 7, 2015

@eugpermar actually there is no memory allocation when calling topic_new() on an existing topic object. As long as the number of unique topic names is finite that should be fine.
Refcnt wrapping won't be fun, though.

@ottomata
Contributor Author

ottomata commented Aug 7, 2015

Aye, makes sense :)

So the functionality that is really needed is a way to tell rdkafka that _destroy() won't be called, so it shouldn't bother keeping refcounts and should just keep the topic around forever?

@eugpermar
Contributor

I meant creating different topics. As you said, if you do not control which topics can be created and the client keeps sending new topics, you will never be able to free the used memory.

@ottomata
Contributor Author

ottomata commented Aug 7, 2015

Aye.

@edenhill, would everything explode if I did rkt->rkt_refcnt = 1; after every _new() call? :D

@edenhill
Contributor

edenhill commented Aug 7, 2015

@ottomata yes, don't ever mess with refcnts! :)
But I think topic.cache.linger.ms will solve your issue.

@ottomata
Contributor Author

ottomata commented Aug 7, 2015

I think so too.

edenhill added a commit that referenced this issue Nov 16, 2016
…858, #707, #908, #345)

 - adds support for specifying the message timestamp at produce time (#707)
 - adds support for specifying topic by name rather than topic_t object (#908)
@edenhill
Contributor

The new rd_kafka_producev() API can take a topic name rather than a topic_t object, so the application no longer needs to create and hold on to a topic_t.
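For example, a sketch of producing by name (assuming rk is a producer handle and payload/payload_len hold an illustrative message):

  rd_kafka_resp_err_t err = rd_kafka_producev(
          rk,
          RD_KAFKA_V_TOPIC("my_topic"),             /* topic by name, no topic_t needed */
          RD_KAFKA_V_VALUE(payload, payload_len),
          RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), /* let librdkafka copy the payload */
          RD_KAFKA_V_END);
  if (err)
          fprintf(stderr, "produce failed: %s\n", rd_kafka_err2str(err));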

@chienhsingwu

@edenhill, our use case would require a large number of topics, and I am concerned that we might have too many of them in memory. How are the topics created by rd_kafka_producev managed? Are they still refcounted internally?

@edenhill
Contributor

edenhill commented Aug 7, 2018

Yes, they are automatically created under the hood and refcounted.
librdkafka currently never deletes any topic objects (until the client instance is destroyed).
