As a consumer, how can I get partition count for a topic #94

Closed
winbatch opened this issue Mar 21, 2014 · 10 comments

@winbatch

Let's say I have a known algorithm for choosing the partition in my publisher. However, that algorithm takes the partition count into consideration via the partitioner_callback. How can I get the partition count in a consumer so I can apply a similar algorithm?
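
(For context, a minimal sketch of the producer-side partitioner callback being referred to; the hash below is only an illustrative stand-in for the actual algorithm, not something from this thread:)

/* Producer side: librdkafka hands the current partition_cnt to the
 * partitioner registered via rd_kafka_topic_conf_set_partitioner_cb(),
 * so the key-to-partition mapping can depend on it. */
#include <stdint.h>
#include <stddef.h>
#include <librdkafka/rdkafka.h>

static int32_t my_partitioner (const rd_kafka_topic_t *rkt,
                               const void *keydata, size_t keylen,
                               int32_t partition_cnt,
                               void *rkt_opaque, void *msg_opaque) {
        const unsigned char *key = keydata;
        uint32_t hash = 5381;
        size_t i;

        for (i = 0; i < keylen; i++)
                hash = (hash << 5) + hash + key[i];   /* djb2, illustrative only */

        return (int32_t)(hash % (uint32_t)partition_cnt);
}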

@edenhill
Contributor

At what time do you want the partition count?

  • before calling consume_start()
  • updated periodically (e.g., every 10s)
  • for each consumed message

@winbatch
Author

In this particular case, before calling consume_start().

@edenhill
Contributor

Ok, your program would have to wait for this information to be available (rdkafka needs to connect to brokers, query metadata, etc). I see two alternatives here:

  • async approach: you register a callback for topic metadata updates which will be called when information regarding a topic changes (partition count, leader change, ..)
  • sync approach: will block until the metadata is available, possibly with a timeout argument - this is sketchy since timeout values are arbitrary.

The async approach is preferred and the application can provide its own sync version on top of it if necessary.

Comment?
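
(A minimal sketch of such an application-side sync wrapper over the async callback, assuming pthreads; the helper and variable names are illustrative and not an existing librdkafka API:)

#include <pthread.h>

/* Illustrative sync-on-top-of-async wrapper: the (proposed) metadata callback
 * publishes the partition count, and the application blocks until it arrives. */
static pthread_mutex_t part_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  part_cond = PTHREAD_COND_INITIALIZER;
static int part_cnt = -1;

/* Called from the async topic-metadata callback */
static void on_partition_count (int cnt) {
        pthread_mutex_lock(&part_lock);
        part_cnt = cnt;
        pthread_cond_signal(&part_cond);
        pthread_mutex_unlock(&part_lock);
}

/* Called from the application before consume_start() */
static int wait_for_partition_count (void) {
        pthread_mutex_lock(&part_lock);
        while (part_cnt == -1)
                pthread_cond_wait(&part_cond, &part_lock);
        pthread_mutex_unlock(&part_lock);
        return part_cnt;
}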

@edenhill
Contributor

This is related to issue #90

@winbatch
Author

That's fine. In this particular use case, I just want it at program startup. Basically, I'm implementing a simple Kafka 'grep': someone gives me a topic and I find their message. One of the things they pass in is the key, and I'll call my algorithm to determine which of the partitions to look on. (And since my algorithm requires the # of partitions, I need this info.)

@edenhill
Contributor

Okay, so then your program would look something like:

int the_part_cnt = -1;

topic_metadata_callback(..) {
    the_part_cnt = good_intel.part_cnt;
}

main() {
    rd_kafka_new();
    ... register callbacks bla bla ...

    wait_for(the_part_cnt != -1);

    part = partitioner(key, the_part_cnt);

    consume_start(.., part);

    ... consume messages ...

    ... decommission ...
}


Right?

@winbatch
Author

Looks reasonable.

@edenhill
Contributor

The topic metadata is a bit complex; it will be formatted something like this:

void your_topic_info_callback(rd_kafka_topic_t *rkt,
                              const struct TopicInfo *info,
                              void *rkt_opaque);

struct TopicInfo {
    int partition_cnt;
    struct PartitionInfo *partitions;   /* partition_cnt entries */
};

struct PartitionInfo {
    int32_t partition_id;
    int32_t leader;
    int replica_cnt;
    int32_t *replicas;                  /* replica_cnt entries */
    int isr_cnt;
    int32_t *isrs;                      /* isr_cnt entries */
};

edenhill added this to the 0.8.4 milestone Mar 22, 2014
edenhill self-assigned this Mar 22, 2014
edenhill added a commit that referenced this issue Mar 26, 2014
@edenhill
Contributor

Have a look in rdkafka_example.c (look for metadata) for how to use the new Metadata API.

This also adds a handy -L switch to rdkafka_example:

$ examples/rdkafka_example -L -t onepart
Metadata for onepart (from broker -1):
 1 brokers:
  broker 3 at bib.local:9093
 1 topics:
  topic "onepart" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 3, replicas: 3, isrs: 3
$ examples/rdkafka_example -L | head -30
Metadata for all topics (from broker -1):
 3 brokers:
  broker 1 at bib.local:9091
  broker 2 at bib.local:9092
  broker 3 at bib.local:9093
 14 topics:
  topic "z7" with 2 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 2, replicas: 2, isrs: 2
  topic "rdkafkatest1_auto_49f744a4327b1b1e" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  topic "rdkafkatest1_auto_e02f58f2c581cba" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  topic "aba" with 2 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
    partition 1, leader 1, replicas: 1, isrs: 1
  topic "aba3" with 2 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 2, replicas: 2, isrs: 2
  topic "z" with 2 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 2, replicas: 2, isrs: 2
  topic "onepartx" with 2 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 2, replicas: 2, isrs: 2
  topic "rdkafkatest2" with 2 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 2, replicas: 2, isrs: 2
...
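
(For completeness, a minimal sketch of fetching the partition count with this Metadata API before starting to consume; the broker address and topic name are placeholders and error handling is trimmed:)

#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                      errstr, sizeof(errstr));
        rd_kafka_topic_t *rkt;
        const struct rd_kafka_metadata *metadata;
        rd_kafka_resp_err_t err;
        int part_cnt = -1;

        rd_kafka_brokers_add(rk, "localhost:9092");     /* placeholder broker */
        rkt = rd_kafka_topic_new(rk, "onepart", NULL);  /* placeholder topic */

        /* Blocking metadata request for this one topic, 5s timeout */
        err = rd_kafka_metadata(rk, 0 /* only this topic */, rkt,
                                &metadata, 5000);
        if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
                if (metadata->topic_cnt == 1 && !metadata->topics[0].err)
                        part_cnt = metadata->topics[0].partition_cnt;
                rd_kafka_metadata_destroy(metadata);
        }

        printf("topic has %d partitions\n", part_cnt);

        /* part_cnt can now be fed to the same partitioning algorithm the
         * producer uses, and the result passed to rd_kafka_consume_start(). */

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return 0;
}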

@edenhill
Contributor

Please verify this on your end as well.
