Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an assert being triggered when no metrics matched on the client side during send push telemetry call #4826

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
# librdkafka v2.5.3

librdkafka v2.5.3 is a feature release.

* Fix an assert being triggered during push telemetry call when no metrics matched on the client side. (#4826)

## Fixes

### Telemetry fixes

* Issue: #4833
Fix a regression introduced with [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) support in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
Happening since 2.5.0 (#4826).

*Note: there were no v2.5.1 and v2.5.2 librdkafka releases*


# librdkafka v2.5.0

> [!WARNING]
This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section could be more tightly worded. I don't think it's a regression. It's a critical defect which is only exposed when a long list of conditions are met.

I wonder if the list of conditions could be worded the other way around. Essentially, the problem only occurs if all of these are true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section could be more tightly worded. I don't think it's a regression. It's a critical defect which is only exposed when a long list of conditions are met.

I didn't understand what exactly needs to be done here. Are we talking about the regression part here?

I wonder if the list of conditions could be worded the other way around. Essentially, the problem only occurs if all of these are true.

Do you mean that we specify the scenario in which the problem occurs instead of specifying the scenario in which the problem doesn't occur?

Is something like this better:

This version has introduced a critical defect which crashes the client by hitting an assert during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription. This happens in a specific scenario in which all the following conditions are met:
1) Broker supports KIP-714
2) KIP-714 feature is enabled on the broker side
3) KIP-714 feature is enabled on the client side. Enabled by default.
4) Broker has some subscription configured the client
5) None of the subscription matched on the client side.
If any of the above condition doesn't match for your case, you are good to use this version.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change these phrases:
4) Broker has a subscription assigned to the client
5) None of the subscription metrics matched on the client side.
any of the above conditions

>
> You won't face any problem if:
> * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability).
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the broker side.
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. This is enabled by default. Set configuration `enable.metrics.push` to `false`.
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there.
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client.
>
> Having said this, we strongly recommend using `v2.5.3` and above to not face this regression at all.

librdkafka v2.5.0 is a feature release.

* [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client)
Expand Down
2 changes: 1 addition & 1 deletion src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ namespace RdKafka {
* @remark This value should only be used during compile time,
* for runtime checks of version use RdKafka::version()
*/
#define RD_KAFKA_VERSION 0x020500ff
#define RD_KAFKA_VERSION 0x020503ff

/**
* @brief Returns the librdkafka version as integer.
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ typedef SSIZE_T ssize_t;
* @remark This value should only be used during compile time,
* for runtime checks of version use rd_kafka_version()
*/
#define RD_KAFKA_VERSION 0x020500ff
#define RD_KAFKA_VERSION 0x020503ff

/**
* @brief Returns the librdkafka version as integer.
Expand Down
36 changes: 21 additions & 15 deletions src/rdkafka_telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,20 +343,25 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk,
rd_bool_t terminating) {

rd_buf_t *metrics_payload = rd_kafka_telemetry_encode_metrics(rk);
size_t compressed_metrics_payload_size = 0;
void *compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used =
rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, &compressed_metrics_payload,
&compressed_metrics_payload_size);
if (compressed_metrics_payload_size >
(size_t)rk->rk_telemetry.telemetry_max_bytes) {
rd_kafka_log(rk, LOG_WARNING, "TELEMETRY",
"Metrics payload size %" PRIusz
" exceeds telemetry_max_bytes %" PRId32
"specified by the broker.",
compressed_metrics_payload_size,
rk->rk_telemetry.telemetry_max_bytes);
size_t compressed_metrics_payload_size = 0;
void *compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE;
if (metrics_payload) {
compression_used = rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, &compressed_metrics_payload,
&compressed_metrics_payload_size);
if (compressed_metrics_payload_size >
(size_t)rk->rk_telemetry.telemetry_max_bytes) {
rd_kafka_log(rk, LOG_WARNING, "TELEMETRY",
"Metrics payload size %" PRIusz
" exceeds telemetry_max_bytes %" PRId32
"specified by the broker.",
compressed_metrics_payload_size,
rk->rk_telemetry.telemetry_max_bytes);
}
} else {
rd_kafka_dbg(rk, TELEMETRY, "PUSH",
"No metrics to push. Sending empty payload.");
}

rd_kafka_dbg(rk, TELEMETRY, "PUSH",
Expand All @@ -369,7 +374,8 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk,
0, RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_handle_PushTelemetry,
NULL);

rd_buf_destroy_free(metrics_payload);
if (metrics_payload)
rd_buf_destroy_free(metrics_payload);
if (compression_used != RD_KAFKA_COMPRESSION_NONE)
rd_free(compressed_metrics_payload);

Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_telemetry_encode.c
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) {
RD_KAFKA_TELEMETRY_METRIC_INFO(rk);
size_t total_metrics_count = metrics_to_encode_count;
size_t i, metric_idx = 0;

if (!metrics_to_encode_count)
return NULL;

opentelemetry_proto_metrics_v1_MetricsData metrics_data =
opentelemetry_proto_metrics_v1_MetricsData_init_zero;

Expand Down
12 changes: 8 additions & 4 deletions tests/0150-telemetry_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ void do_test_telemetry_get_subscription_push_telemetry(void) {
* resent after the push interval until there are subscriptions.
* See `requests_expected` for detailed expected flow.
*/
void do_test_telemetry_empty_subscriptions_list(void) {
void do_test_telemetry_empty_subscriptions_list(char *subscription_regex) {
emasab marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_conf_t *conf;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
char *expected_metrics[] = {"*"};
char *expected_metrics[] = {subscription_regex};
rd_kafka_t *producer = NULL;
rd_kafka_mock_request_t **requests = NULL;
size_t request_cnt;
Expand Down Expand Up @@ -234,7 +234,7 @@ void do_test_telemetry_empty_subscriptions_list(void) {
};


SUB_TEST();
SUB_TEST("Test with subscription regex: %s", subscription_regex);

mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_telemetry_set_requested_metrics(mcluster, NULL, 0);
Expand Down Expand Up @@ -534,7 +534,11 @@ int main_0150_telemetry_mock(int argc, char **argv) {

do_test_telemetry_get_subscription_push_telemetry();

do_test_telemetry_empty_subscriptions_list();
// All metrics are subscribed
do_test_telemetry_empty_subscriptions_list("*");

// No metrics are subscribed
do_test_telemetry_empty_subscriptions_list("non-existent-metric");

do_test_telemetry_terminating_push();

Expand Down
2 changes: 1 addition & 1 deletion vcpkg.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "librdkafka",
"version": "2.5.0",
"version": "2.5.3",
"dependencies": [
{
"name": "zstd",
Expand Down