Skip to content

Commit

Permalink
Added Metadata API (issues #90 and #94)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Mar 26, 2014
1 parent fb5d311 commit 6968e4b
Show file tree
Hide file tree
Showing 8 changed files with 555 additions and 233 deletions.
138 changes: 135 additions & 3 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,23 +192,33 @@ rd_kafka_op_t *rd_kafka_op_new (rd_kafka_op_type_t type) {


void rd_kafka_op_destroy (rd_kafka_op_t *rko) {

/* Decrease refcount on rkbuf to eventually free the shared buffer */
if (rko->rko_rkbuf)
rd_kafka_buf_destroy(rko->rko_rkbuf);
else if (rko->rko_payload && rko->rko_flags & RD_KAFKA_OP_F_FREE)
free(rko->rko_payload);

if (rko->rko_rkt)
rd_kafka_topic_destroy0(rko->rko_rkt);
if (rko->rko_metadata)
rd_kafka_metadata_destroy(rko->rko_metadata);

free(rko);
}

/**
* Destroy a queue. The queue must be empty.
*/
void rd_kafka_q_destroy (rd_kafka_q_t *rkq) {
rd_kafka_assert(NULL, TAILQ_EMPTY(&rkq->rkq_q));
if (rd_atomic_sub(&rkq->rkq_refcnt, 1) > 0)
return;

rd_kafka_q_purge(rkq);
pthread_mutex_destroy(&rkq->rkq_lock);
pthread_cond_destroy(&rkq->rkq_cond);

if (rkq->rkq_flags & RD_KAFKA_Q_F_ALLOCATED)
free(rkq);
}

/**
Expand All @@ -218,12 +228,26 @@ void rd_kafka_q_init (rd_kafka_q_t *rkq) {
TAILQ_INIT(&rkq->rkq_q);
rkq->rkq_qlen = 0;
rkq->rkq_qsize = 0;
rkq->rkq_refcnt = 1;
rkq->rkq_flags = 0;

pthread_mutex_init(&rkq->rkq_lock, NULL);
pthread_cond_init(&rkq->rkq_cond, NULL);
}


/**
* Allocate a new queue and initialize it.
*/
rd_kafka_q_t *rd_kafka_q_new (void) {
rd_kafka_q_t *rkq = malloc(sizeof(*rkq));
rd_kafka_q_init(rkq);
rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED;
return rkq;
}



/**
* Purge all entries from a queue.
*/
Expand Down Expand Up @@ -469,6 +493,49 @@ void rd_kafka_op_err (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_log_buf(rk, LOG_ERR, "ERROR", buf);
}

/**
* Send request to queue, wait for response.
*/
rd_kafka_op_t *rd_kafka_op_req0 (rd_kafka_q_t *destq,
rd_kafka_q_t *recvq,
rd_kafka_op_t *rko,
int timeout_ms) {
rd_kafka_op_t *reply;

/* Bump refcount for destination, destination will decrease refcount
* after posting reply. */
rd_kafka_q_keep(recvq);

/* Indicate to destination where to send reply. */
rko->rko_replyq = recvq;

/* Wait for reply */
reply = rd_kafka_q_pop(recvq, timeout_ms);

/* May be NULL for timeout */
return reply;
}

/**
* Send request to queue, wait for response.
* Creates a temporary reply queue.
*/
rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
rd_kafka_op_t *rko,
int timeout_ms) {
rd_kafka_q_t *recvq;
rd_kafka_op_t *reply;

recvq = rd_kafka_q_new();

reply = rd_kafka_op_req0(destq, recvq, rko, timeout_ms);

rd_kafka_q_destroy(recvq);

return reply;
}



static const char *rd_kafka_type2str (rd_kafka_type_t type) {
static const char *types[] = {
Expand Down Expand Up @@ -1527,3 +1594,68 @@ rd_kafka_crash (const char *file, int line, const char *function,
rd_kafka_dump0(stderr, rk, 0/*no locks*/);
abort();
}


rd_kafka_resp_err_t
rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
rd_kafka_topic_t *only_rkt,
const struct rd_kafka_metadata **metadatap,
int timeout_ms) {
rd_kafka_q_t *replyq;
rd_kafka_broker_t *rkb;
rd_kafka_op_t *rko;

/* Query any broker that is up, and if none are up pick the first one,
* if we're lucky it will be up before the timeout */
rd_kafka_lock(rk);
if (!(rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP))) {
rkb = TAILQ_FIRST(&rk->rk_brokers);
if (rkb)
rd_kafka_broker_keep(rkb);
}
rd_kafka_unlock(rk);

if (!rkb)
return RD_KAFKA_RESP_ERR__TRANSPORT;

/* Give one refcount to destination, will be decreased when
* reply is enqueued on replyq.
* This ensures the replyq stays alive even after we timeout here. */
replyq = rd_kafka_q_new();
rd_kafka_q_keep(replyq);

/* Async: request metadata */
rd_kafka_broker_metadata_req(rkb, all_topics, only_rkt, replyq,
"application requested");

rd_kafka_broker_destroy(rkb);

/* Wait for reply (or timeout) */
rko = rd_kafka_q_pop(replyq, timeout_ms);

rd_kafka_q_destroy(replyq);

/* Timeout */
if (!rko)
return RD_KAFKA_RESP_ERR__TIMED_OUT;

/* Error */
if (rko->rko_err) {
rd_kafka_resp_err_t err = rko->rko_err;
rd_kafka_op_destroy(rko);
return err;
}

/* Reply: pass metadata pointer to application who now owns it*/
rd_kafka_assert(rk, rko->rko_metadata);
*metadatap = rko->rko_metadata;
rko->rko_metadata = NULL;
rd_kafka_op_destroy(rko);

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

void rd_kafka_metadata_destroy (const struct rd_kafka_metadata *metadata) {
free((void *)metadata);
}

85 changes: 85 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,91 @@ int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partitition,



/*******************************************************************
* *
* Metadata API *
* *
*******************************************************************/


/**
* Metadata: Broker information
*/
struct rd_kafka_metadata_broker {
int32_t id; /* Broker Id */
char *host; /* Broker hostname */
int port; /* Broker listening port */
};

/**
* Metadata: Partition information
*/
struct rd_kafka_metadata_partition {
int32_t id; /* Partition Id */
rd_kafka_resp_err_t err; /* Partition error reported by broker */
int32_t leader; /* Leader broker */
int replica_cnt; /* Number of brokers in 'replicas' */
int32_t *replicas; /* Replica brokers */
int isr_cnt; /* Number of ISR brokers in 'isrs' */
int32_t *isrs; /* In-Sync-Replica brokers */
};

/**
* Metadata: Topic information
*/
struct rd_kafka_metadata_topic {
char *topic; /* Topic name */
int partition_cnt; /* Number of partitions in 'partitions' */
struct rd_kafka_metadata_partition *partitions; /* Partitions */
rd_kafka_resp_err_t err; /* Topic error reported by broker */
};


/**
* Metadata container
*/
struct rd_kafka_metadata {
int broker_cnt; /* Number of brokers in 'brokers' */
struct rd_kafka_metadata_broker *brokers; /* Brokers */

int topic_cnt; /* Number of topics in 'topics' */
struct rd_kafka_metadata_topic *topics; /* Topics */

int32_t orig_broker_id; /* Broker originating this metadata */
};


/**
* Request Metadata from broker.
*
* all_topics - if non-zero: request info about all topics in cluster,
* if zero: only request info about locally known topics.
* only_rkt - only request info about this topic
* metadatap - pointer to hold metadata result.
* The '*metadatap' pointer must be released
* with rd_kafka_metadata_destroy().
* timeout_ms - maximum response time before failing.
*
* Returns RD_KAFKA_RESP_ERR_NO_ERROR on success (in which case *metadatap)
* will be set, else RD_KAFKA_RESP_ERR__TIMED_OUT on timeout or
* other error code on error.
*/
rd_kafka_resp_err_t
rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
rd_kafka_topic_t *only_rkt,
const struct rd_kafka_metadata **metadatap,
int timeout_ms);

/**
* Release metadata memory.
*/
void rd_kafka_metadata_destroy (const struct rd_kafka_metadata *metadata);








/*******************************************************************
Expand Down
Loading

0 comments on commit 6968e4b

Please sign in to comment.