diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 867107b970..d916ba5c93 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -114,6 +114,7 @@ rebalance_cb | C | |
 offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()) <br>*Type: see dedicated API*
 enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
 check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
+allow.auto.create.topics | C | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuration to take effect. Note: The default value (false) is different from the Java consumer (true). Requires broker version >= 0.11.0.0; for older broker versions only the broker configuration applies. <br>*Type: boolean*
 client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
 transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0. <br>*Type: string*
 transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods. <br>*Type: integer*
diff --git a/INTRODUCTION.md b/INTRODUCTION.md
index 633f0a2973..f684be5fa1 100644
--- a/INTRODUCTION.md
+++ b/INTRODUCTION.md
@@ -70,6 +70,7 @@ librdkafka also provides a native C++ interface.
   - [Consumer groups](#consumer-groups)
     - [Static consumer groups](#static-consumer-groups)
   - [Topics](#topics)
+    - [Unknown or unauthorized topics](#unknown-or-unauthorized-topics)
     - [Topic auto creation](#topic-auto-creation)
   - [Metadata](#metadata)
     - [< 0.9.3](#-093)
@@ -1434,10 +1435,37 @@ To read more about static group membership, see [KIP-345](https://cwiki.apache.o

 ### Topics

+#### Unknown or unauthorized topics
+
+If a consumer application subscribes to non-existent or unauthorized topics,
+a consumer error will be propagated for each unavailable topic, with the
+error code set to either `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC` or a
+broker-specific error code, such as
+`RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED`.
+
+As the topic metadata is refreshed every `topic.metadata.refresh.interval.ms`,
+the unavailable topics are re-checked for availability, but the same error
+will not be raised again for the same topic.
+
+
 #### Topic auto creation

-Topic auto creation is supported by librdkafka.
-The broker needs to be configured with `auto.create.topics.enable=true`.
+Topic auto creation is supported by librdkafka: if a non-existent topic is
+referenced by the client (by producing to, or consuming from, the topic,
+etc.) the broker will automatically create the topic (with the default
+partition count and replication factor) if the broker configuration property
+`auto.create.topics.enable=true` is set.
+
+While topic auto creation is useful for producer applications, it is not
+particularly valuable for consumer applications, since even if the topic
+to consume is auto-created there is nothing writing messages to the topic.
+To avoid consumers automatically creating topics, the
+`allow.auto.create.topics` consumer configuration property is set to
+`false` by default, preventing the consumer from triggering automatic topic
+creation on the broker. This requires broker version v0.11.0.0 or later.
+The `allow.auto.create.topics` property may be set to `true` to allow
+auto topic creation, which also requires `auto.create.topics.enable=true` to
+be configured on the broker.
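+
+As a rough sketch of how an application might opt back in to auto creation
+and watch for unavailable-topic errors (the consumer handle `rk` is assumed
+to be created from `conf` with `rd_kafka_new()` and subscribed with
+`rd_kafka_subscribe()`; error handling is trimmed):
+
+```c
+#include <stdio.h>
+#include <librdkafka/rdkafka.h>
+
+/* Opt back in to broker-side topic auto creation; the broker must
+ * also have auto.create.topics.enable=true for this to take effect. */
+char errstr[512];
+rd_kafka_conf_t *conf = rd_kafka_conf_new();
+if (rd_kafka_conf_set(conf, "allow.auto.create.topics", "true",
+                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        fprintf(stderr, "%s\n", errstr);
+
+/* Unavailable or unauthorized topics are reported as consumer errors
+ * returned from poll, one event per topic, and are not repeated until
+ * the error for that topic changes. */
+rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 1000);
+if (rkmessage) {
+        if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC ||
+            rkmessage->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
+                fprintf(stderr, "Topic not available: %s\n",
+                        rd_kafka_message_errstr(rkmessage));
+        rd_kafka_message_destroy(rkmessage);
+}
+```
+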
@@ -1718,7 +1746,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
 | KIP-357 - AdminAPI: list ACLs per principal | 2.1.0 | Not supported |
 | KIP-359 - Producer: use EpochLeaderId | 2.4.0 | Not supported |
 | KIP-360 - Improve handling of unknown Idempotent Producer | 2.4.0 | Not supported |
-| KIP-361 - Consumer: add config to disable auto topic creation | 2.3.0 | Not supported |
+| KIP-361 - Consumer: add config to disable auto topic creation | 2.3.0 | Supported |
 | KIP-368 - SASL period reauth | 2.2.0 | Not supported |
 | KIP-369 - Always roundRobin partitioner | 2.4.0 | Not supported |
 | KIP-389 - Consumer group max size | 2.2.0 | Supported (error is propagated to application, but the consumer does not raise a fatal error) |
diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c
index 54aad9360a..54b4010278 100644
--- a/src/rdkafka_cgrp.c
+++ b/src/rdkafka_cgrp.c
@@ -189,6 +189,7 @@ void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
         rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
         rd_list_destroy(&rkcg->rkcg_toppars);
         rd_list_destroy(rkcg->rkcg_subscribed_topics);
+        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
         rd_free(rkcg);
 }

@@ -246,6 +247,8 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
         rd_interval_init(&rkcg->rkcg_join_intvl);
         rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);

+        rkcg->rkcg_errored_topics = rd_kafka_topic_partition_list_new(0);
+
         /* Create a logical group coordinator broker to provide
          * a dedicated connection for group coordination.
          * This is needed since JoinGroup may block for up to
@@ -3471,6 +3474,53 @@ void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){



+/**
+ * @brief Generate consumer errors for each topic in the list.
+ *
+ * Also replaces the list of last reported topic errors so that repeated
+ * errors are silenced.
+ *
+ * @param errored Errored topics.
+ * @param error_prefix Error message prefix.
+ *
+ * @remark Assumes ownership of \p errored.
+ */
+static void
+rd_kafka_propagate_consumer_topic_errors (
+        rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *errored,
+        const char *error_prefix) {
+        int i;
+
+        for (i = 0 ; i < errored->cnt ; i++) {
+                rd_kafka_topic_partition_t *topic = &errored->elems[i];
+                rd_kafka_topic_partition_t *prev;
+
+                rd_assert(topic->err);
+
+                /* Check if this topic errored previously */
+                prev = rd_kafka_topic_partition_list_find(
+                        rkcg->rkcg_errored_topics, topic->topic,
+                        RD_KAFKA_PARTITION_UA);
+
+                if (prev && prev->err == topic->err)
+                        continue; /* This topic already reported same error */
+
+                /* Send consumer error to application */
+                rd_kafka_q_op_err(
+                        rkcg->rkcg_q, RD_KAFKA_OP_CONSUMER_ERR,
+                        topic->err, 0,
+                        rd_kafka_topic_partition_get_toppar(rkcg->rkcg_rk,
+                                                            topic),
+                        RD_KAFKA_OFFSET_INVALID,
+                        "%s: %s: %s",
+                        error_prefix, topic->topic,
+                        rd_kafka_err2str(topic->err));
+        }
+
+        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
+        rkcg->rkcg_errored_topics = errored;
+}
+

 /**
  * @brief Check if the latest metadata affects the current subscription:
@@ -3483,12 +3533,18 @@ void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){
  */
 void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join) {
         rd_list_t *tinfos;
+        rd_kafka_topic_partition_list_t *errored;

         rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

         if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
                 return;

+        /*
+         * Unmatched topics will be added to the errored list.
+         */
+        errored = rd_kafka_topic_partition_list_new(0);
+
         /*
          * Create a list of the topics in metadata that matches our subscription
          */
@@ -3497,12 +3553,21 @@ void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join) {

         if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
                 rd_kafka_metadata_topic_match(rkcg->rkcg_rk,
-                                              tinfos, rkcg->rkcg_subscription);
+                                              tinfos, rkcg->rkcg_subscription,
+                                              errored);
         else
                 rd_kafka_metadata_topic_filter(rkcg->rkcg_rk, tinfos,
-                                               rkcg->rkcg_subscription);
+                                               rkcg->rkcg_subscription,
+                                               errored);
+
+        /*
+         * Propagate consumer errors for any non-existent or errored topics.
+         * The function takes ownership of errored.
+         */
+        rd_kafka_propagate_consumer_topic_errors(
+                rkcg, errored, "Subscribed topic not available");

         /*
          * Update (takes ownership of \c tinfos)
diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h
index a43a49305c..9d2abc8cd7 100644
--- a/src/rdkafka_cgrp.h
+++ b/src/rdkafka_cgrp.h
@@ -200,10 +200,12 @@ typedef struct rd_kafka_cgrp_s {
                                              *   Will be updated when the
                                              *   coordinator changes. */

-        /* Current subscription */
+        /** Current subscription */
         rd_kafka_topic_partition_list_t *rkcg_subscription;
-        /* The actual topics subscribed (after metadata+wildcard matching) */
+        /** The actual topics subscribed (after metadata+wildcard matching) */
         rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */
+        /** Subscribed topics that are errored/not available. */
+        rd_kafka_topic_partition_list_t *rkcg_errored_topics;

         /* Current assignment */
         rd_kafka_topic_partition_list_t *rkcg_assignment;
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 9929e33566..00cbe22c97 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -1080,6 +1080,18 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
           "on-disk corruption to the messages occurred. This check comes "
           "at slightly increased CPU usage.",
           0, 1, 0 },
+        { _RK_GLOBAL|_RK_CONSUMER, "allow.auto.create.topics", _RK_C_BOOL,
+          _RK(allow_auto_create_topics),
+          "Allow automatic topic creation on the broker when subscribing to "
+          "or assigning non-existent topics. "
+          "The broker must also be configured with "
+          "`auto.create.topics.enable=true` for this configuration to "
+          "take effect. "
+          "Note: The default value (false) is different from the "
+          "Java consumer (true). "
+          "Requires broker version >= 0.11.0.0; for older broker versions "
+          "only the broker configuration applies.",
+          0, 1, 0 },
         { _RK_GLOBAL, "client.rack", _RK_C_KSTR,
           _RK(client_rack),
           "A rack identifier for this client. This can be any string value "
@@ -1473,8 +1485,8 @@ rd_kafka_conf_prop_find (int scope, const char *name) {
  * @returns rd_true if property has been set/modified, else rd_false.
  *          If \p name is unknown 0 is returned.
  */
-static rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf,
-                                            const char *name) {
+rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf,
+                                     const char *name) {
         const struct rd_kafka_property *prop;

         if (!(prop = rd_kafka_conf_prop_find(_RK_GLOBAL, name)))
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 531d13be97..bbb2c83fa6 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -329,6 +329,7 @@ struct rd_kafka_conf_s {
         int fetch_error_backoff_ms;
         char *group_id_str;
         char *group_instance_id;
+        int allow_auto_create_topics;

         rd_kafka_pattern_list_t *topic_blacklist;
         struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
@@ -553,6 +554,9 @@ struct rd_kafka_topic_conf_s {

 void rd_kafka_anyconf_destroy (int scope, void *conf);

+rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf,
+                                     const char *name);
+
 void rd_kafka_desensitize_str (char *str);

 void rd_kafka_conf_desensitize (rd_kafka_conf_t *conf);
diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h
index 6fad199d54..364c87a9cb 100644
--- a/src/rdkafka_int.h
+++ b/src/rdkafka_int.h
@@ -566,6 +566,10 @@ struct rd_kafka_s {
                  *   but no more often than every 10s.
                  *   No locks: only accessed by rdkafka main thread. */
                 rd_interval_t broker_metadata_refresh;
+
+                /**< Suppression for allow.auto.create.topics=false not being
+                 *   supported by the broker. */
+                rd_interval_t allow_auto_create_topics;
         } rk_suppress;

         struct {
diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c
index 4ef8a38fb8..52eeda1444 100644
--- a/src/rdkafka_metadata.c
+++ b/src/rdkafka_metadata.c
@@ -257,6 +257,9 @@ rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
                            rkb->rkb_name, rkb_namelen);
         rd_kafka_broker_unlock(rkb);

+        if (ApiVersion >= 3)
+                rd_kafka_buf_read_throttle_time(rkbuf);
+
         /* Read Brokers */
         rd_kafka_buf_read_i32a(rkbuf, md->broker_cnt);
         if (md->broker_cnt > RD_KAFKAP_BROKERS_MAX)
@@ -634,8 +637,12 @@ rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
 /**
  * @brief Add all topics in current cached full metadata
- *        to \p tinfos (rd_kafka_topic_info_t *)
  *        that matches the topics in \p match
+ *        to \p tinfos (rd_kafka_topic_info_t *).
+ *
+ * @param errored Any topic or wildcard pattern that did not match
+ *                an available topic will be added to this list with
+ *                the appropriate error set.
  *
  * @returns the number of topics matched and added to \p list
  *
  *
@@ -644,11 +651,12 @@ rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
  */
 size_t
 rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
-                               const rd_kafka_topic_partition_list_t *match) {
-        int ti;
+                               const rd_kafka_topic_partition_list_t *match,
+                               rd_kafka_topic_partition_list_t *errored) {
+        int ti, i;
         size_t cnt = 0;
         const struct rd_kafka_metadata *metadata;
-
+        rd_kafka_topic_partition_list_t *unmatched;

         rd_kafka_rdlock(rk);
         metadata = rk->rk_full_metadata;
@@ -657,11 +665,18 @@ rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
                 return 0;
         }

+        /* To keep track of which patterns and topics in `match` that
+         * did not match any topic (or matched an errored topic), we
+         * create a set of all topics to match in `unmatched` and then
+         * remove from this set as a match is found.
+         * Whatever remains in `unmatched` after all matching is performed
+         * are the topics and patterns that did not match a topic. */
+        unmatched = rd_kafka_topic_partition_list_copy(match);
+
         /* For each topic in the cluster, scan through the match list
          * to find matching topic. */
         for (ti = 0 ; ti < metadata->topic_cnt ; ti++) {
                 const char *topic = metadata->topics[ti].topic;
-                int i;

                 /* Ignore topics in blacklist */
                 if (rk->rk_conf.topic_blacklist &&
@@ -674,18 +689,42 @@ rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
                             match->elems[i].topic, topic))
                                 continue;

-                        if (metadata->topics[ti].err)
+                        /* Remove from unmatched */
+                        rd_kafka_topic_partition_list_del(
+                                unmatched, match->elems[i].topic,
+                                RD_KAFKA_PARTITION_UA);
+
+                        if (metadata->topics[ti].err) {
+                                rd_kafka_topic_partition_list_add(
+                                        errored, topic,
+                                        RD_KAFKA_PARTITION_UA)->err =
+                                        metadata->topics[ti].err;
                                 continue; /* Skip errored topics */
+                        }

                         rd_list_add(tinfos,
                                     rd_kafka_topic_info_new(
                                             topic, metadata->topics[ti].partition_cnt));
+                        cnt++;
                 }
         }
         rd_kafka_rdunlock(rk);

+        /* Any topics/patterns still in unmatched did not match any
+         * existing topics, add them to `errored`. */
+        for (i = 0 ; i < unmatched->cnt ; i++) {
+                rd_kafka_topic_partition_t *elem = &unmatched->elems[i];
+
+                rd_kafka_topic_partition_list_add(errored,
+                                                  elem->topic,
+                                                  RD_KAFKA_PARTITION_UA)->err =
+                        RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
+        }
+
+        rd_kafka_topic_partition_list_destroy(unmatched);
+
         return cnt;
 }

@@ -695,12 +734,16 @@ rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
  * @remark MUST NOT be used with wildcard topics,
  *         see rd_kafka_metadata_topic_match() for that.
  *
+ * @param errored Non-existent and unauthorized topics are added to this
+ *                list with the appropriate error code.
+ *
  * @returns the number of topics matched and added to \p tinfos
  * @locks none
  */
 size_t
 rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
-                                const rd_kafka_topic_partition_list_t *match) {
+                                const rd_kafka_topic_partition_list_t *match,
+                                rd_kafka_topic_partition_list_t *errored) {
         int i;
         size_t cnt = 0;
@@ -717,7 +760,16 @@ rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,

                 mtopic = rd_kafka_metadata_cache_topic_get(rk, topic,
                                                            1/*valid*/);
-                if (mtopic && !mtopic->err) {
+
+                if (!mtopic)
+                        rd_kafka_topic_partition_list_add(
+                                errored, topic, RD_KAFKA_PARTITION_UA)->err =
+                                RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
+                else if (mtopic->err)
+                        rd_kafka_topic_partition_list_add(
+                                errored, topic, RD_KAFKA_PARTITION_UA)->err =
+                                mtopic->err;
+                else {
                         rd_list_add(tinfos,
                                     rd_kafka_topic_info_new(
                                             topic, mtopic->partition_cnt));
diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h
index a42f37b560..5e2a3435fc 100644
--- a/src/rdkafka_metadata.h
+++ b/src/rdkafka_metadata.h
@@ -41,10 +41,12 @@ rd_kafka_metadata_copy (const struct rd_kafka_metadata *md, size_t size);

 size_t
 rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
-                               const rd_kafka_topic_partition_list_t *match);
+                               const rd_kafka_topic_partition_list_t *match,
+                               rd_kafka_topic_partition_list_t *errored);
 size_t
 rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
-                                const rd_kafka_topic_partition_list_t *match);
+                                const rd_kafka_topic_partition_list_t *match,
+                                rd_kafka_topic_partition_list_t *errored);

 void rd_kafka_metadata_log (rd_kafka_t *rk, const char *fac,
                             const struct rd_kafka_metadata *md);
diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c
index 68cd9b5419..dffa6a0e84 100644
--- a/src/rdkafka_request.c
+++ b/src/rdkafka_request.c
@@ -1768,11 +1768,11 @@ rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,

         ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb,
                                                           RD_KAFKAP_Metadata,
-                                                          0, 2,
+                                                          0, 4,
                                                           &features);

         rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1,
-                                         4 + (50 * topic_cnt));
+                                         4 + (50 * topic_cnt) + 1);

         if (!reason)
                 reason = "";
@@ -1850,6 +1850,29 @@ rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
         }

+        if (ApiVersion >= 4) {
+                /* AllowAutoTopicCreation (only used by consumer) */
+                rd_kafka_buf_write_bool(
+                        rkbuf,
+                        rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER ?
+                        rkb->rkb_rk->rk_conf.allow_auto_create_topics :
+                        rd_true /*producer*/);
+        } else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER &&
+                   !rkb->rkb_rk->rk_conf.allow_auto_create_topics &&
+                   rd_kafka_conf_is_modified(&rkb->rkb_rk->rk_conf,
+                                             "allow.auto.create.topics") &&
+                   rd_interval(&rkb->rkb_rk->rk_suppress.
+                               allow_auto_create_topics,
+                               30 * 60 * 1000 /* every 30 minutes */, 0) >= 0) {
+                /* Let user know we can't obey allow.auto.create.topics */
+                rd_rkb_log(rkb, LOG_WARNING, "AUTOCREATE",
+                           "allow.auto.create.topics=false not supported "
+                           "by broker: requires broker version >= 0.11.0.0: "
+                           "requested topic(s) may be auto created depending "
+                           "on broker auto.create.topics.enable configuration");
+        }
+
+
         rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

         /* Metadata requests are part of the important control plane
diff --git a/tests/0033-regex_subscribe.c b/tests/0033-regex_subscribe.c
index 3bcaaa3fa2..86643ad988 100644
--- a/tests/0033-regex_subscribe.c
+++ b/tests/0033-regex_subscribe.c
@@ -285,6 +285,7 @@ static int do_test (const char *assignor) {
         test_conf_set(conf, "partition.assignment.strategy", assignor);
         /* Speed up propagation of new topics */
         test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
+        test_conf_set(conf, "allow.auto.create.topics", "true");

         /* Create a single consumer to handle all subscriptions.
          * Has the nice side affect of testing multiple subscriptions. */
diff --git a/tests/0067-empty_topic.cpp b/tests/0067-empty_topic.cpp
index 5fdb148b28..d965e299d1 100644
--- a/tests/0067-empty_topic.cpp
+++ b/tests/0067-empty_topic.cpp
@@ -48,6 +48,7 @@ static void do_test_empty_topic_consumer () {

   Test::conf_init(&conf, NULL, 0);
   Test::conf_set(conf, "enable.partition.eof", "true");
+  Test::conf_set(conf, "allow.auto.create.topics", "true");

   /* Create simple consumer */
   RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
@@ -100,6 +101,7 @@ static void do_test_empty_topic_consumer () {

   Test::conf_set(conf, "group.id", topic);
   Test::conf_set(conf, "enable.partition.eof", "true");
+  Test::conf_set(conf, "allow.auto.create.topics", "true");

   RdKafka::KafkaConsumer *kconsumer = RdKafka::KafkaConsumer::create(conf, errstr);
   if (!kconsumer)