Skip to content

Commit

Permalink
Some Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mahajanadhitya committed Apr 3, 2023
1 parent c789847 commit 4ee6965
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 531 deletions.
46 changes: 0 additions & 46 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -894,20 +894,6 @@ typedef struct rd_kafka_scram_credential_list_s {
int size;
rd_kafka_scram_credential_t *credentials;
} rd_kafka_scram_credential_list_t;
/* NOTE(review): the two "ignore" comments below suggest these types are
 * dead; if so they should be removed from the public header rather than
 * shipped with a note to ignore them. */
/* ignore this one below*/
/**
 * @brief Per-user result of a DescribeUserScramCredentials request:
 *        the user name, an error code/string, and the user's credentials.
 */
typedef struct rd_kafka_describe_user_scram_result_s {
char *user;
int16_t errorcode;
char *err;
rd_kafka_scram_credential_list_t *credentiallist;
} rd_kafka_describe_user_scram_result_t;

/* Ignore the one below*/
/**
 * @brief List (cnt used of size allocated) of per-user SCRAM results.
 *
 * NOTE(review): the element type rd_kafka_user_scram_result_t is not
 * declared anywhere visible — the struct above is named
 * rd_kafka_describe_user_scram_result_t. Confirm which name is intended.
 */
typedef struct rd_kafka_user_scram_result_list_s {
int cnt;
int size;
rd_kafka_user_scram_result_t *userscramresults;
} rd_kafka_user_scram_result_list_t;

/**
* @brief Topic+Partition place holder
Expand Down Expand Up @@ -956,32 +942,6 @@ typedef struct rd_kafka_topic_partition_list_s {
rd_kafka_topic_partition_t *elems; /**< Element array[] */
} rd_kafka_topic_partition_list_t;

/**
 * @brief Which offset to look up in a ListOffsets request.
 */
typedef enum rd_kafka_offset_spec_s{
RD_KAFKA_LATEST_OFFSET_SPEC = 0,
RD_KAFKA_EARLIEST_OFFSET_SPEC = 1,
RD_KAFKA_TIMESTAMP_OFFSET_SPEC = 2
} rd_kafka_offset_spec_t;

/**
 * @brief One ListOffsets element: a topic+partition plus the offset spec
 *        and the timestamp to look up (used with
 *        RD_KAFKA_TIMESTAMP_OFFSET_SPEC — confirm).
 */
typedef struct rd_kafka_list_offset_s{
rd_kafka_topic_partition_t topicPartition;
rd_kafka_offset_spec_t offsetSpec;
int64_t timestamp;
} rd_kafka_list_offset_t;

/**
 * @brief Array of rd_kafka_list_offset_t: @c count elements used of
 *        @c size allocated.
 */
typedef struct rd_kafka_list_offset_list_s{
int count;
int size;
rd_kafka_list_offset_t *offsets;
} rd_kafka_list_offset_list_t;

/**
 * @brief Reserve and return the next free element of a list.
 *
 * NOTE(review): this prototype takes no arguments, but the definitions in
 * rdkafka_partition.c take a rd_kafka_list_offset_list_t * — they do not
 * match. Also, empty parentheses are an old-style unprototyped declaration
 * in C; use explicit parameters (or (void)).
 */
RD_EXPORT
rd_kafka_list_offset_t *rd_kafka_list_offset_list_add();

/**
 * @brief Allocate a new zeroed ListOffsets element.
 * NOTE(review): use (void) rather than empty parentheses.
 */
RD_EXPORT
rd_kafka_list_offset_t *rd_kafka_list_offset_new();

/**
 * @brief Allocate a new list with room for @p size elements.
 */
RD_EXPORT
rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_new(int size);
/**
* @brief Create a new list/vector Topic+Partition container.
*
Expand All @@ -999,12 +959,6 @@ rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_new(int size);
RD_EXPORT
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new(int size);


/* NOTE(review): RD_EXPORT combined with `static` is contradictory — a
 * static function has internal linkage and cannot be exported. Declaring
 * a static function in a public header is also wrong: every translation
 * unit that includes it would need its own definition. This declaration
 * belongs in the .c file that defines it (without RD_EXPORT), or the
 * function should be made non-static if it is truly public. */
RD_EXPORT
static void
rd_kafka_topic_partition_destroy0(rd_kafka_topic_partition_t *rktpar,
int do_free);

/**
* @brief Free all resources used by the list and the list itself.
*/
Expand Down
172 changes: 2 additions & 170 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3598,172 +3598,6 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req,
return reply->rkbuf_err;
}

rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_new(int size){

rd_kafka_list_offset_list_t *listoffsets;
listoffsets = rd_calloc(1,sizeof(*listoffsets));
rd_kafka_list_offset_t *offset;
listoffsets->offsets = rd_calloc(size,sizeof(*offset));
listoffsets->size = size;
listoffsets->cnt = 0;
return listoffsets;
}

/**
 * @brief Deep-copy @p offset into the next free slot of @p offsetlist.
 *
 * Copies the offset spec, timestamp and the embedded topic+partition,
 * duplicating the topic string and (when present) the metadata.
 *
 * Fix: the list member is @c count (per the typedef in rdkafka.h), not
 * @c cnt.
 *
 * @remark Assumes the list has free room (count < size) — no bounds check
 *         is performed; TODO confirm all callers guarantee this.
 */
void rd_kafka_list_offset_list_add_copy(rd_kafka_list_offset_list_t *offsetlist,
                                        rd_kafka_list_offset_t *offset) {
        rd_kafka_list_offset_t *copiedoffset;

        copiedoffset             = &offsetlist->offsets[offsetlist->count++];
        copiedoffset->offsetSpec = offset->offsetSpec;
        copiedoffset->timestamp  = offset->timestamp;
        copiedoffset->topicPartition.topic =
            rd_strdup(offset->topicPartition.topic);
        copiedoffset->topicPartition.partition =
            offset->topicPartition.partition;
        copiedoffset->topicPartition.offset = offset->topicPartition.offset;
        copiedoffset->topicPartition.err    = offset->topicPartition.err;
        copiedoffset->topicPartition.metadata_size =
            offset->topicPartition.metadata_size;
        copiedoffset->topicPartition.metadata = NULL;
        if (copiedoffset->topicPartition.metadata_size > 0) {
                /* NOTE(review): rd_strdup() assumes NUL-terminated text;
                 * if metadata can be binary of metadata_size bytes, an
                 * rd_malloc+memcpy of metadata_size would be required —
                 * confirm against the producers of this field. */
                copiedoffset->topicPartition.metadata =
                    rd_strdup(offset->topicPartition.metadata);
        }
}
/**
 * @brief Deep-copy an entire ListOffsets list.
 *
 * Fix: the element-count member is @c count (per the typedef in
 * rdkafka.h), not @c cnt.
 *
 * @returns A newly allocated copy; free with
 *          rd_kafka_list_offset_list_destroy().
 */
rd_kafka_list_offset_list_t *
rd_kafka_list_offset_list_copy(rd_kafka_list_offset_list_t *offsetlist) {
        rd_kafka_list_offset_list_t *offsets;
        int i;

        offsets = rd_kafka_list_offset_list_new(offsetlist->size);
        for (i = 0; i < offsetlist->count; i++)
                rd_kafka_list_offset_list_add_copy(offsets,
                                                   &offsetlist->offsets[i]);
        return offsets;
}
/**
 * @brief Destroy the contents of @p offset.
 *
 * @param offset  Element whose embedded topic+partition is destroyed.
 * @param do_free If non-zero, also free the @p offset struct itself
 *                (pass 0 for elements embedded in a list's array).
 *
 * NOTE(review): rd_kafka_topic_partition_destroy() is librdkafka's public
 * destructor that frees the rd_kafka_topic_partition_t pointer it is
 * given; calling it on an embedded member (&offset->topicPartition) would
 * free a non-heap pointer. Confirm whether the internal
 * rd_kafka_topic_partition_destroy0() with do_free=0 should be used here
 * instead, freeing only the contained fields.
 */
void rd_kafka_list_offset_destroy(rd_kafka_list_offset_t *offset,int8_t do_free){
rd_kafka_topic_partition_destroy(&offset->topicPartition);
if(do_free)
rd_free(offset);

}
/**
 * @brief Free all resources used by @p offsetlist and the list itself.
 *
 * Fix: the element-count member is @c count (per the typedef in
 * rdkafka.h), not @c cnt. Elements are destroyed in place (do_free=0)
 * because they live in the list's contiguous array.
 */
void rd_kafka_list_offset_list_destroy(rd_kafka_list_offset_list_t *offsetlist) {
        int i;

        for (i = 0; i < offsetlist->count; i++)
                rd_kafka_list_offset_destroy(&offsetlist->offsets[i], 0);

        /* NOTE(review): guard is redundant if rd_free(NULL) is a no-op
         * like free(NULL) — kept to preserve behavior. */
        if (offsetlist->offsets)
                rd_free(offsetlist->offsets);

        rd_free(offsetlist);
}

/**
 * @brief Merge a partial (per-broker) ListOffsets result into the fanout
 *        op's accumulated result list.
 *
 * NOTE(review): rko_u.admin_result.results is an rd_list_t, but
 * rd_kafka_list_offset_list_add() takes a rd_kafka_list_offset_list_t * —
 * both uses below (rd_list_elem element type, and the add() call passing
 * the rd_list_t directly) need to be reconciled; the add() call also
 * omits the & even for the wrong type.
 * NOTE(review): uses libc strdup() while the rest of this code uses
 * rd_strdup(); and the copy drops err, offsetSpec and metadata — confirm
 * that is intended.
 */
static void
rd_kafka_ListOffsetsResponse_merge(rd_kafka_op_t *rko_fanout,
const rd_kafka_op_t *rko_partial){
/* First (only) element of the partial result is the per-broker list. */
rd_kafka_list_offset_list_t *offsets = rd_list_elem(&rko_partial->rko_u.admin_result.results,0);
int i;

for(i=0;i<offsets->count;i++){
rd_kafka_list_offset_t *offset = rd_kafka_list_offset_list_add(rko_fanout->rko_u.admin_result.results);
offset->timestamp = offsets->offsets[i].timestamp;
offset->topicPartition.partition = offsets->offsets[i].topicPartition.partition;
offset->topicPartition.topic = strdup(offsets->offsets[i].topicPartition.topic);
offset->topicPartition.offset = offsets->offsets[i].topicPartition.offset;
}


}
/**
 * @brief Parse a ListOffsetsResponse into a list of offsets.
 *
 * @returns the buffer's error code, mirroring
 *          rd_kafka_DeleteRecordsResponse_parse().
 *
 * Fix: the function is declared to return rd_kafka_resp_err_t but
 * previously fell off the end without a return statement — undefined
 * behavior when the caller uses the result.
 *
 * NOTE(review): still incomplete — the parsed @c offsets list is never
 * attached to a result op (*rko_resultp is left unset) and is leaked, and
 * errstr/errstr_size are unused. TODO: build the ADMIN_RESULT op here
 * before wiring this parser up.
 */
static rd_kafka_resp_err_t
rd_kafka_ListOffsetsResponse_parse(rd_kafka_op_t *rko_req,
                                   rd_kafka_op_t **rko_resultp,
                                   rd_kafka_buf_t *reply,
                                   char *errstr,
                                   size_t errstr_size) {
        rd_kafka_topic_partition_list_t *requested_offsets =
            rd_list_elem(&rko_req->rko_u.admin_request.args, 0);
        rd_kafka_list_offset_list_t *offsets =
            rd_kafka_list_offset_list_new(requested_offsets->count);

        rd_kafka_parse_ListOffsets(reply, offsets);

        return reply->rkbuf_err;
}


/**
 * @brief Called when partition leaders have been queried: records failed
 *        partitions in the fanout result, then fans out one ListOffsets
 *        request per leader broker.
 *
 * @returns RD_KAFKA_OP_RES_HANDLED in all paths.
 */
static rd_kafka_op_res_t
rd_kafka_ListOffsets_leaders_queried_cb(rd_kafka_t *rk,
rd_kafka_q_t *rkq,
rd_kafka_op_t *reply){

rd_kafka_resp_err_t err = reply->rko_err;
const rd_list_t *leaders =
reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */
rd_kafka_topic_partition_list_t *partitions =
reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */
rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque;

/* Worker callbacks used by each per-leader request op. */
static const struct rd_kafka_admin_worker_cbs cbs = {
rd_kafka_ListOffsetsRequest,
rd_kafka_ListOffsetsResponse_parse,
};

rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
RD_KAFKA_OP_ADMIN_FANOUT);

if (err == RD_KAFKA_RESP_ERR__DESTROY)
goto err;

/* NOTE(review): rd_list_elem() on this freshly-initialized (empty)
 * list returns NULL, so response_list is NULL and the add() call in
 * the loop below dereferences it. A list object must be allocated and
 * rd_list_add()ed first. */
rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 1,
rd_kafka_list_offset_result_t_destroy_free);
rd_kafka_list_offset_list_t *response_list = rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results,0);

/* Record partitions whose leader lookup failed. */
RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
/* NOTE(review): rktpar2 is never assigned before use below. */
rd_kafka_topic_partition_t *rktpar2;

if (!rktpar->err)
continue;

rd_kafka_list_offset_t *response = rd_kafka_list_offset_list_add(response_list);
response->topicPartition.topic = strdup(rktpar->topic);
/* NOTE(review): "topicPartititon" is a typo (won't compile);
 * should be topicPartition. */
response->topicPartititon.partition = rktpar->partition;
/* NOTE(review): err is an error code, not a string — strdup() on
 * it is a type error; presumably rd_kafka_err2str(rktpar->err)
 * (and rktpar, not the uninitialized rktpar2) was intended. */
response->topicPartition.err = strdup(rktpar2->err);
}
if (err) {
err:
rd_kafka_admin_result_fail(
rko_fanout, err, "Failed to query partition leaders: %s",
err == RD_KAFKA_RESP_ERR__NOENT ? "No leaders found"
: rd_kafka_err2str(err));
rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
rd_true /*destroy*/);
return RD_KAFKA_OP_RES_HANDLED;
}
rko_fanout->rko_u.admin_request.fanout.outstanding =
rd_list_cnt(leaders);

rd_assert(rd_list_cnt(leaders) > 0);

/* For each leader send a request for its partitions */
/* NOTE(review): `leader` and `i` are not declared in this scope —
 * confirm RD_LIST_FOREACH expects caller-declared iteration
 * variables (it does elsewhere in this codebase). */
RD_LIST_FOREACH(leader, leaders, i) {
rd_kafka_op_t *rko = rd_kafka_admin_request_op_new(
rk, RD_KAFKA_OP_LISTOFFSETS,
RD_KAFKA_EVENT_LISTOFFSETS_RESULT, &cbs,
&rko_fanout->rko_u.admin_request.options, rk->rk_ops);
rko->rko_u.admin_request.fanout_parent = rko_fanout;
rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid;

rd_kafka_topic_partition_list_sort_by_topic(leader->partitions);

rd_list_init(&rko->rko_u.admin_request.args, 1,
rd_kafka_topic_partition_list_destroy_free);
rd_list_add(
&rko->rko_u.admin_request.args,
rd_kafka_topic_partition_list_copy(leader->partitions));

/* Enqueue op for admin_worker() to transition to next state */
rd_kafka_q_enq(rk->rk_ops, rko);
}

return RD_KAFKA_OP_RES_HANDLED;
}

/**
* @brief Call when leaders have been queried to progress the DeleteRecords
* admin op to its next phase, sending DeleteRecords to partition
Expand Down Expand Up @@ -4987,8 +4821,7 @@ rd_kafka_resp_err_t rd_kafka_DescribeUserScramCredentialsRequest(rd_kafka_broker
}

rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);


return RD_KAFKA_RESP_ERR_NO_ERROR;
}

void rd_kafka_scram_credential_destroy(rd_kafka_scram_credential_t *scram_credential){
Expand Down Expand Up @@ -5155,8 +4988,7 @@ rd_kafka_resp_err_t rd_kafka_AlterUserScramCredentialsRequest(rd_kafka_broker_t
}

rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);


return RD_KAFKA_RESP_ERR_NO_ERROR;
}
rd_kafka_resp_err_t rd_kafka_AlterUserScramCredentialsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_op_t **rko_resultp,
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) {
return "SaslOAuthBearerTokenRefresh";
case RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT:
return "DescribeUserScramCredentials";
case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT:
return "AlterUserScramCredentials";
default:
return "?unknown?";
}
Expand Down
29 changes: 0 additions & 29 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -2477,29 +2477,6 @@ void rd_kafka_topic_partition_get(const rd_kafka_topic_partition_t *rktpar,
*partition = rktp->rktp_partition;
}

/**
 * @brief Reserve and return the next free element of @p offsetlist.
 *
 * Advances the list's element count by one; the caller fills in the
 * returned element. No bounds check is performed — assumes count < size
 * (TODO confirm callers guarantee this).
 *
 * NOTE(review): a second, conflicting definition of this function appears
 * later in this file; one of the two must be removed.
 */
rd_kafka_list_offset_t *
rd_kafka_list_offset_list_add(rd_kafka_list_offset_list_t *offsetlist) {
        return &offsetlist->offsets[offsetlist->count++];
}

rd_kafka_list_offset_t *rd_kafka_list_offset_new(){
rd_kafka_list_offset_t *offset ;
offset = rd_calloc(1,sizeof(*offset));

return offset;
}


/**
 * @brief Allocate a new ListOffsets list with room for @p size elements.
 *
 * Fix: the elements array was allocated with
 * sizeof(list_offset_list->offsets) — the size of the POINTER field —
 * instead of the size of one element, causing out-of-bounds writes as
 * soon as elements are filled in. Allocate sizeof(*...->offsets) per
 * element.
 *
 * NOTE(review): a second definition of this function exists in
 * rdkafka_admin.c — one of the two must be removed to avoid a
 * duplicate-symbol link error.
 *
 * @returns A newly allocated list; free with
 *          rd_kafka_list_offset_list_destroy().
 */
rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_new(int size) {
        rd_kafka_list_offset_list_t *list_offset_list;

        list_offset_list        = rd_calloc(1, sizeof(*list_offset_list));
        list_offset_list->size  = size;
        list_offset_list->count = 0;
        list_offset_list->offsets =
            rd_calloc(size, sizeof(*list_offset_list->offsets));
        return list_offset_list;
}


/**
*
Expand Down Expand Up @@ -2652,12 +2629,6 @@ void rd_kafka_topic_partition_list_destroy_free(void *ptr) {
(rd_kafka_topic_partition_list_t *)ptr);
}

/**
 * @brief Reserve and return the next free element of @p offsets.
 *
 * Fix: previously returned &offsets->offsets[count-1] where @c count was
 * sampled BEFORE the increment — i.e. the element before the newly
 * reserved slot, and index -1 (out of bounds) on the first add. Return
 * the element at the pre-increment count instead.
 *
 * NOTE(review): this duplicates rd_kafka_list_offset_list_add() defined
 * earlier in this file; one definition must be removed to avoid a
 * redefinition error.
 */
rd_kafka_list_offset_t *
rd_kafka_list_offset_list_add(rd_kafka_list_offset_list_t *offsets){
        int count = offsets->count;
        offsets->count++;
        return &(offsets->offsets[count]);
}
/**
* Add a partition to an rktpar list.
* The list must have enough room to fit it.
Expand Down
Loading

0 comments on commit 4ee6965

Please sign in to comment.