diff --git a/src/rdkafka.h b/src/rdkafka.h index 2677ea251..07556cfdf 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -894,20 +894,6 @@ typedef struct rd_kafka_scram_credential_list_s { int size; rd_kafka_scram_credential_t *credentials; } rd_kafka_scram_credential_list_t; -/* ignore this one below*/ -typedef struct rd_kafka_describe_user_scram_result_s { - char *user; - int16_t errorcode; - char *err; - rd_kafka_scram_credential_list_t *credentiallist; -} rd_kafka_describe_user_scram_result_t; - -/* Ignore the one below*/ -typedef struct rd_kafka_user_scram_result_list_s { - int cnt; - int size; - rd_kafka_user_scram_result_t *userscramresults; -} rd_kafka_user_scram_result_list_t; /** * @brief Topic+Partition place holder @@ -956,32 +942,6 @@ typedef struct rd_kafka_topic_partition_list_s { rd_kafka_topic_partition_t *elems; /**< Element array[] */ } rd_kafka_topic_partition_list_t; -typedef enum rd_kafka_offset_spec_s{ - RD_KAFKA_LATEST_OFFSET_SPEC = 0, - RD_KAFKA_EARLIEST_OFFSET_SPEC = 1, - RD_KAFKA_TIMESTAMP_OFFSET_SPEC = 2 -} rd_kafka_offset_spec_t; - -typedef struct rd_kafka_list_offset_s{ - rd_kafka_topic_partition_t topicPartition; - rd_kafka_offset_spec_t offsetSpec; - int64_t timestamp; -} rd_kafka_list_offset_t; - -typedef struct rd_kafka_list_offset_list_s{ - int count; - int size; - rd_kafka_list_offset_t *offsets; -} rd_kafka_list_offset_list_t; - -RD_EXPORT -rd_kafka_list_offset_t *rd_kafka_list_offset_list_add(); - -RD_EXPORT -rd_kafka_list_offset_t *rd_kafka_list_offset_new(); - -RD_EXPORT -rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_new(int size); /** * @brief Create a new list/vector Topic+Partition container. * @@ -999,12 +959,6 @@ rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_new(int size); RD_EXPORT rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new(int size); - -RD_EXPORT -static void -rd_kafka_topic_partition_destroy0(rd_kafka_topic_partition_t *rktpar, - int do_free); - /** * @brief Free all resources used by the list and the list itself. */ diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 28ababd93..de3eed9df 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -3598,172 +3598,6 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req, return reply->rkbuf_err; } -rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_new(int size){ - - rd_kafka_list_offset_list_t *listoffsets; - listoffsets = rd_calloc(1,sizeof(*listoffsets)); - rd_kafka_list_offset_t *offset; - listoffsets->offsets = rd_calloc(size,sizeof(*offset)); - listoffsets->size = size; - listoffsets->cnt = 0; - return listoffsets; -} - -void rd_kafka_list_offset_list_add_copy(rd_kafka_list_offset_list_t *offsetlist,rd_kafka_list_offset_t *offset){ - - rd_kafka_list_offset_t *copiedoffset; - copiedoffset = &offsetlist->offsets[offsetlist->cnt++]; - copiedoffset->offsetSpec = offset->offsetSpec; - copiedoffset->timestamp = offset->timestamp; - copiedoffset->topicPartition.topic = rd_strdup(offset->topicPartition.topic); - copiedoffset->topicPartition.partition = offset->topicPartition.partition; - copiedoffset->topicPartition.offset = offset->topicPartition.offset; - copiedoffset->topicPartition.err = offset->topicPartition.err; - copiedoffset->topicPartition.metadata_size = offset->topicPartition.metadata_size; - copiedoffset->topicPartition.metadata = NULL; - if(copiedoffset->topicPartition.metadata_size > 0){ - copiedoffset->topicPartition.metadata = rd_strdup(offset->topicPartition.metadata); - } - -} -rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_copy(rd_kafka_list_offset_list_t *offsetlist){ - rd_kafka_list_offset_list_t *offsets; - offsets = rd_kafka_list_offset_list_new(offsetlist->size); - int i; - for(i=0;icnt;i++){ - rd_kafka_list_offset_list_add_copy(offsets,&offsetlist->offsets[i]); - } - return offsets; -} -void rd_kafka_list_offset_destroy(rd_kafka_list_offset_t *offset,int8_t do_free){ - rd_kafka_topic_partition_destroy(&offset->topicPartition); - if(do_free) - rd_free(offset); - -} -void rd_kafka_list_offset_list_destroy(rd_kafka_list_offset_list_t *offsetlist){ - int i; - for(i=0;icnt;i++){ - rd_kafka_list_offset_destroy(&offsetlist->offsets[i],0); - } - if(offsetlist->offsets) - rd_free(offsetlist->offsets); - - rd_free(offsetlist); -} - -static void -rd_kafka_ListOffsetsResponse_merge(rd_kafka_op_t *rko_fanout, - const rd_kafka_op_t *rko_partial){ - rd_kafka_list_offset_list_t *offsets = rd_list_elem(&rko_partial->rko_u.admin_result.results,0); - int i; - - for(i=0;icount;i++){ - rd_kafka_list_offset_t *offset = rd_kafka_list_offset_list_add(rko_fanout->rko_u.admin_result.results); - offset->timestamp = offsets->offsets[i].timestamp; - offset->topicPartition.partition = offsets->offsets[i].topicPartition.partition; - offset->topicPartition.topic = strdup(offsets->offsets[i].topicPartition.topic); - offset->topicPartition.offset = offsets->offsets[i].topicPartition.offset; - } - - -} -static rd_kafka_resp_err_t -rd_kafka_ListOffsetsResponse_parse(rd_kafka_op_t *rko_req, - rd_kafka_op_t **rko_resultp, - rd_kafka_buf_t *reply, - char *errstr, - size_t errstr_size){ - - rd_kafka_topic_partition_list_t *requested_offsets = rd_list_elem(&rko_req->rko_u.admin_request.args,0); - rd_kafka_list_offset_list_t *offsets = rd_kafka_list_offset_list_new(requested_offsets->count); - - rd_kafka_parse_ListOffsets(reply,offsets); - - - - - -} - - -static rd_kafka_op_res_t -rd_kafka_ListOffsets_leaders_queried_cb(rd_kafka_t *rk, - rd_kafka_q_t *rkq, - rd_kafka_op_t *reply){ - - rd_kafka_resp_err_t err = reply->rko_err; - const rd_list_t *leaders = - reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */ - rd_kafka_topic_partition_list_t *partitions = - reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */ - rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque; - - static const struct rd_kafka_admin_worker_cbs cbs = { - rd_kafka_ListOffsetsRequest, - rd_kafka_ListOffsetsResponse_parse, - }; - - rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) == - RD_KAFKA_OP_ADMIN_FANOUT); - - if (err == RD_KAFKA_RESP_ERR__DESTROY) - goto err; - - rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 1, - rd_kafka_list_offset_result_t_destroy_free); - rd_kafka_list_offset_list_t *response_list = rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results,0); - - RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) { - rd_kafka_topic_partition_t *rktpar2; - - if (!rktpar->err) - continue; - - rd_kafka_list_offset_t *response = rd_kafka_list_offset_list_add(response_list); - response->topicPartition.topic = strdup(rktpar->topic); - response->topicPartititon.partition = rktpar->partition; - response->topicPartition.err = strdup(rktpar2->err); - } - if (err) { - err: - rd_kafka_admin_result_fail( - rko_fanout, err, "Failed to query partition leaders: %s", - err == RD_KAFKA_RESP_ERR__NOENT ? "No leaders found" - : rd_kafka_err2str(err)); - rd_kafka_admin_common_worker_destroy(rk, rko_fanout, - rd_true /*destroy*/); - return RD_KAFKA_OP_RES_HANDLED; - } - rko_fanout->rko_u.admin_request.fanout.outstanding = - rd_list_cnt(leaders); - - rd_assert(rd_list_cnt(leaders) > 0); - - /* For each leader send a request for its partitions */ - RD_LIST_FOREACH(leader, leaders, i) { - rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( - rk, RD_KAFKA_OP_LISTOFFSETS, - RD_KAFKA_EVENT_LISTOFFSETS_RESULT, &cbs, - &rko_fanout->rko_u.admin_request.options, rk->rk_ops); - rko->rko_u.admin_request.fanout_parent = rko_fanout; - rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid; - - rd_kafka_topic_partition_list_sort_by_topic(leader->partitions); - - rd_list_init(&rko->rko_u.admin_request.args, 1, - rd_kafka_topic_partition_list_destroy_free); - rd_list_add( - &rko->rko_u.admin_request.args, - rd_kafka_topic_partition_list_copy(leader->partitions)); - - /* Enqueue op for admin_worker() to transition to next state */ - rd_kafka_q_enq(rk->rk_ops, rko); - } - - return RD_KAFKA_OP_RES_HANDLED; -} - /** * @brief Call when leaders have been queried to progress the DeleteRecords * admin op to its next phase, sending DeleteRecords to partition @@ -4987,8 +4821,7 @@ rd_kafka_resp_err_t rd_kafka_DescribeUserScramCredentialsRequest(rd_kafka_broker } rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - + return RD_KAFKA_RESP_ERR_NO_ERROR; } void rd_kafka_scram_credential_destroy(rd_kafka_scram_credential_t *scram_credential){ @@ -5155,8 +4988,7 @@ rd_kafka_resp_err_t rd_kafka_AlterUserScramCredentialsRequest(rd_kafka_broker_t } rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - + return RD_KAFKA_RESP_ERR_NO_ERROR; } rd_kafka_resp_err_t rd_kafka_AlterUserScramCredentialsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c index 6fa51518b..846203c2c 100644 --- a/src/rdkafka_event.c +++ b/src/rdkafka_event.c @@ -86,6 +86,8 @@ const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) { return "SaslOAuthBearerTokenRefresh"; case RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT: return "DescribeUserScramCredentials"; + case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: + return "AlterUserScramCredentials"; default: return "?unknown?"; } diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 306164936..38acdde77 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -2477,29 +2477,6 @@ void rd_kafka_topic_partition_get(const rd_kafka_topic_partition_t *rktpar, *partition = rktp->rktp_partition; } -rd_kafka_list_offset_t *rd_kafka_list_offset_list_add(rd_kafka_list_offset_list_t *offsetlist){ - int count = offsetlist->count; - offsetlist->count++; - return &offsetlist->offsets[count]; -} - -rd_kafka_list_offset_t *rd_kafka_list_offset_new(){ - rd_kafka_list_offset_t *offset ; - offset = rd_calloc(1,sizeof(*offset)); - - return offset; -} - - -rd_kafka_list_offset_list_t *rd_kafka_list_offset_list_new(int size){ - rd_kafka_list_offset_list_t *list_offset_list; - list_offset_list = rd_calloc(1,sizeof(*list_offset_list)); - list_offset_list->size = size; - list_offset_list->count = 0; - list_offset_list->offsets = rd_calloc(size,sizeof(list_offset_list->offsets)); - return list_offset_list; -} - /** * @@ -2652,12 +2629,6 @@ void rd_kafka_topic_partition_list_destroy_free(void *ptr) { (rd_kafka_topic_partition_list_t *)ptr); } -rd_kafka_list_offset_t * -rd_kafka_list_offset_list_add(rd_kafka_list_offset_list_t *offsets){ - int count = offsets->count; - offsets->count++; - return &(offsets->offsets[count-1]); -} /** * Add a partition to an rktpar list. * The list must have enough room to fit it. diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 2ec215b7d..366441967 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -412,289 +412,6 @@ rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR_NO_ERROR; } - - -/** - * @brief Parses a ListOffsets reply. - * - * Returns the parsed offsets (and errors) in \p offsets which must have been - * initialized by caller. - * - * @returns 0 on success, else an error (\p offsets may be completely or - * partially updated, depending on the nature of the error, and per - * partition error codes should be checked by the caller). - */ -static rd_kafka_resp_err_t -rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf, - rd_kafka_list_offset_list_t *offsets) { - const int log_decode_errors = LOG_ERR; - int32_t TopicArrayCnt; - int16_t api_version; - rd_kafka_resp_err_t all_err = RD_KAFKA_RESP_ERR_NO_ERROR; - - api_version = rkbuf->rkbuf_reqhdr.ApiVersion; - - if (api_version >= 2) - rd_kafka_buf_read_throttle_time(rkbuf); - - /* NOTE: - * Broker may return offsets in a different constellation than - * in the original request .*/ - - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); - while (TopicArrayCnt-- > 0) { - rd_kafkap_str_t ktopic; - int32_t PartArrayCnt; - char *topic_name; - - rd_kafka_buf_read_str(rkbuf, &ktopic); - rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); - - RD_KAFKAP_STR_DUPA(&topic_name, &ktopic); - - while (PartArrayCnt-- > 0) { - int32_t kpartition; - int16_t ErrorCode; - int32_t OffsetArrayCnt; - int64_t Offset = -1; - int64_t Timestamp = -1; - rd_kafka_list_offset_t *list_offset; - - rd_kafka_buf_read_i32(rkbuf, &kpartition); - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - - if (api_version >= 1) { - rd_kafka_buf_read_i64(rkbuf, &Timestamp); - rd_kafka_buf_read_i64(rkbuf, &Offset); - } else if (api_version == 0) { - rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt); - /* We only request one offset so just grab - * the first one. */ - while (OffsetArrayCnt-- > 0) - rd_kafka_buf_read_i64(rkbuf, &Offset); - } else { - rd_kafka_assert(NULL, !*"NOTREACHED"); - } - - list_offset = rd_kafka_list_offset_list_add( - offsets); - list_offset->timestamp = Timestamp; - list_offset->topicPartition.partition = kpartition; - list_offset->topicPartition.topic = strdup(topic_name); - list_offset->topicPartition.err = ErrorCode; - list_offset->topicPartition.offset = Offset; - - if (ErrorCode && !all_err) - all_err = ErrorCode; - } - } - - return all_err; - -err_parse: - return rkbuf->rkbuf_err; -} - - - -/** - * @brief Parses and handles ListOffsets replies. - * - * Returns the parsed offsets (and errors) in \p offsets. - * \p offsets must be initialized by the caller. - * - * @returns 0 on success, else an error. \p offsets may be populated on error, - * depending on the nature of the error. - * On error \p actionsp (unless NULL) is updated with the recommended - * error actions. - */ -rd_kafka_resp_err_t -rd_kafka_handle_ListOffsets(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - rd_kafka_list_offset_list_t *offsets, - int *actionsp) { - - int actions; - - if (!err) - err = rd_kafka_parse_ListOffsets(rkbuf, offsets); - if (!err) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - actions = rd_kafka_err_action( - rkb, err, request, RD_KAFKA_ERR_ACTION_PERMANENT, - RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, - - RD_KAFKA_ERR_ACTION_REFRESH, - RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, - - RD_KAFKA_ERR_ACTION_REFRESH, - RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, - - RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH, - - RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT, - - RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, - - RD_KAFKA_ERR_ACTION_END); - - if (actionsp) - *actionsp = actions; - - if (rkb) - rd_rkb_dbg( - rkb, TOPIC, "OFFSET", "OffsetRequest failed: %s (%s)", - rd_kafka_err2str(err), rd_kafka_actions2str(actions)); - - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - char tmp[256]; - /* Re-query for leader */ - rd_snprintf(tmp, sizeof(tmp), "ListOffsetsRequest failed: %s", - rd_kafka_err2str(err)); - rd_kafka_metadata_refresh_known_topics(rk, NULL, - rd_true /*force*/, tmp); - } - - if ((actions & RD_KAFKA_ERR_ACTION_RETRY) && - rd_kafka_buf_retry(rkb, request)) - return RD_KAFKA_RESP_ERR__IN_PROGRESS; - - return err; -} - - - -/** - * @brief Async maker for ListOffsetsRequest. - */ -static rd_kafka_resp_err_t -rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, - rd_kafka_buf_t *rkbuf, - void *make_opaque) { - const rd_kafka_topic_partition_list_t *partitions = - (const rd_kafka_topic_partition_list_t *)make_opaque; - int i; - size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0; - const char *last_topic = ""; - int32_t topic_cnt = 0, part_cnt = 0; - int16_t ApiVersion; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_ListOffsets, 0, 2, NULL); - if (ApiVersion == -1) - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - - /* ReplicaId */ - rd_kafka_buf_write_i32(rkbuf, -1); - - /* IsolationLevel */ - if (ApiVersion >= 2) - rd_kafka_buf_write_i8(rkbuf, - rkb->rkb_rk->rk_conf.isolation_level); - - /* TopicArrayCnt */ - of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */ - - for (i = 0; i < partitions->cnt; i++) { - const rd_kafka_topic_partition_t *rktpar = - &partitions->elems[i]; - - if (strcmp(rktpar->topic, last_topic)) { - /* Finish last topic, if any. */ - if (of_PartArrayCnt > 0) - rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, - part_cnt); - - /* Topic */ - rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); - topic_cnt++; - last_topic = rktpar->topic; - /* New topic so reset partition count */ - part_cnt = 0; - - /* PartitionArrayCnt: updated later */ - of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); - } - - /* Partition */ - rd_kafka_buf_write_i32(rkbuf, rktpar->partition); - part_cnt++; - - /* Time/Offset */ - rd_kafka_buf_write_i64(rkbuf, rktpar->offset); - - if (ApiVersion == 0) { - /* MaxNumberOfOffsets */ - rd_kafka_buf_write_i32(rkbuf, 1); - } - } - - if (of_PartArrayCnt > 0) { - rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt); - rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt); - } - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_rkb_dbg(rkb, TOPIC, "OFFSET", - "ListOffsetsRequest (v%hd, opv %d) " - "for %" PRId32 " topic(s) and %" PRId32 " partition(s)", - ApiVersion, rkbuf->rkbuf_replyq.version, topic_cnt, - partitions->cnt); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Send ListOffsetsRequest for partitions in \p partitions. - */ -void rd_kafka_ListOffsetsRequest(rd_kafka_broker_t *rkb, - rd_kafka_topic_partition_list_t *partitions, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - rd_kafka_topic_partition_list_t *make_parts; - - make_parts = rd_kafka_topic_partition_list_copy(partitions); - rd_kafka_topic_partition_list_sort_by_topic(make_parts); - - rkbuf = rd_kafka_buf_new_request( - rkb, RD_KAFKAP_ListOffsets, 1, - /* ReplicaId + IsolationLevel + topicArrayCount + topicArrayCount*(topicName + partition_count*(partitionIndex + Timestamp + MaxnumOffsets))*/ - /*topicArrayCount*partition_count evaluates to total topicPartitions*/ - /*ReplicaId + IsolationLevel + topicArrayCount + topicArrayCount*topic + total_partitions*(partitionIndex+Timestamp+maxnumOffsets)*/ - - /* ReplicaId+IsolationLevel+TopicArrayCnt+Topic */ - 4 + 1 + 4 + 100 + - /* PartArrayCnt */ - 4 + - /* partition_cnt * Partition+Time+MaxNumOffs */ - (make_parts->cnt * (4 + 8 + 4))); - - /* Postpone creating the request contents until time to send, - * at which time the ApiVersion is known. */ - rd_kafka_buf_set_maker(rkbuf, rd_kafka_make_ListOffsetsRequest, - make_parts, - rd_kafka_topic_partition_list_destroy_free); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); -} - - /** * Generic handler for OffsetFetch responses. * Offsets for included partitions will be propagated through the passed diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index b7ffdaebe..7bfc97a5d 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -384,9 +384,6 @@ rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb, int unittest_request(void); -rd_kafka_resp_err_t -rd_kafka_parse_ListOffsets(rd_kafka_buf_t *reply,rd_kafka_list_offset_list_t *offsets); - rd_kafka_resp_err_t rd_kafka_DeleteRecordsRequest(rd_kafka_broker_t *rkb, /*(rd_topic_partition_list_t*)*/