Propagate errors when subscribing to unavailable topics (#1540)
edenhill committed Apr 15, 2020
1 parent 70d712d commit b50a123
Showing 12 changed files with 217 additions and 21 deletions.
1 change: 1 addition & 0 deletions CONFIGURATION.md
@@ -114,6 +114,7 @@ rebalance_cb | C | |
offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()) <br>*Type: see dedicated API*
enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
allow.auto.create.topics | C | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuration to take effect. Note: The default value (false) is different from the Java consumer (true). Requires broker version >= 0.11.0.0, for older broker versions only the broker configuration applies. <br>*Type: boolean*
client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0. <br>*Type: string*
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods. <br>*Type: integer*
34 changes: 31 additions & 3 deletions INTRODUCTION.md
@@ -70,6 +70,7 @@ librdkafka also provides a native C++ interface.
- [Consumer groups](#consumer-groups)
- [Static consumer groups](#static-consumer-groups)
- [Topics](#topics)
- [Unknown or unauthorized topics](#unknown-or-unauthorized-topics)
- [Topic auto creation](#topic-auto-creation)
- [Metadata](#metadata)
- [< 0.9.3](#-093)
@@ -1434,10 +1435,37 @@ To read more about static group membership, see [KIP-345](https://cwiki.apache.o

### Topics

#### Unknown or unauthorized topics

If a consumer application subscribes to non-existent or unauthorized topics,
a consumer error is propagated for each unavailable topic, with the
error code set to either `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC` or a
broker-specific error code, such as
`RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED`.

Since the topic metadata is refreshed every `topic.metadata.refresh.interval.ms`,
the unavailable topics are re-checked for availability, but the same error
will not be raised again for the same topic.
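
For illustration, the sketch below shows one way a consumer poll loop might
report these errors. It is not part of this change and assumes an
`rd_kafka_t *rk` consumer that has already been configured and subscribed
with `rd_kafka_subscribe()`:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch only: poll once and report errors for unavailable topics.
 * Assumes rk is an already-subscribed consumer instance. */
static void poll_once (rd_kafka_t *rk) {
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 1000);
        if (!rkm)
                return; /* Timed out: no message or event within 1000ms */

        if (rkm->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC ||
            rkm->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
                fprintf(stderr, "Subscribed topic not available: %s\n",
                        rd_kafka_message_errstr(rkm));
        else if (rkm->err)
                fprintf(stderr, "Consumer error: %s\n",
                        rd_kafka_message_errstr(rkm));
        else {
                /* Regular message: process rkm->payload / rkm->len here */
        }

        rd_kafka_message_destroy(rkm);
}
```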


#### Topic auto creation

Topic auto creation is supported by librdkafka.
The broker needs to be configured with `auto.create.topics.enable=true`.
Topic auto creation is supported by librdkafka: if a non-existent topic is
referenced by the client (by producing to, or consuming from, the topic, etc.)
the broker will automatically create the topic (with default partition count
and replication factor) if the broker configuration property
`auto.create.topics.enable=true` is set.

While topic auto creation is useful for producer applications, it is not
particularly valuable for consumer applications: even if the topic
to consume is auto-created, there is nothing writing messages to the topic.
To avoid consumers automatically creating topics, the
`allow.auto.create.topics` consumer configuration property is set to
`false` by default, preventing the consumer from triggering automatic topic
creation on the broker. This requires broker version v0.11.0.0 or later.
The `allow.auto.create.topics` property may be set to `true` to allow
auto topic creation, which also requires `auto.create.topics.enable=true` to
be configured on the broker.
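
For illustration only (not part of this change), a consumer that does want
broker-side auto creation could opt in roughly as follows; the `make_conf()`
helper name is made up for the sketch, and the broker still needs
`auto.create.topics.enable=true`:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: opt a consumer in to broker-side topic auto creation. */
static rd_kafka_conf_t *make_conf (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "allow.auto.create.topics", "true",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "%s\n", errstr);

        return conf;
}
```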



@@ -1718,7 +1746,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-357 - AdminAPI: list ACLs per principal | 2.1.0 | Not supported |
| KIP-359 - Producer: use EpochLeaderId | 2.4.0 | Not supported |
| KIP-360 - Improve handling of unknown Idempotent Producer | 2.4.0 | Not supported |
| KIP-361 - Consumer: add config to disable auto topic creation | 2.3.0 | Not supported |
| KIP-361 - Consumer: add config to disable auto topic creation | 2.3.0 | Supported |
| KIP-368 - SASL period reauth | 2.2.0 | Not supported |
| KIP-369 - Always roundRobin partitioner | 2.4.0 | Not supported |
| KIP-389 - Consumer group max size | 2.2.0 | Supported (error is propagated to application, but the consumer does not raise a fatal error) |
69 changes: 67 additions & 2 deletions src/rdkafka_cgrp.c
@@ -189,6 +189,7 @@ void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
        rd_list_destroy(&rkcg->rkcg_toppars);
        rd_list_destroy(rkcg->rkcg_subscribed_topics);
        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
        rd_free(rkcg);
}

@@ -246,6 +247,8 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
        rd_interval_init(&rkcg->rkcg_join_intvl);
        rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);

        rkcg->rkcg_errored_topics = rd_kafka_topic_partition_list_new(0);

        /* Create a logical group coordinator broker to provide
         * a dedicated connection for group coordination.
         * This is needed since JoinGroup may block for up to
@@ -3471,6 +3474,53 @@ void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){



/**
 * @brief Generate consumer errors for each topic in the list.
 *
 * Also replaces the list of last reported topic errors so that repeated
 * errors are silenced.
 *
 * @param errored Errored topics.
 * @param error_prefix Error message prefix.
 *
 * @remark Assumes ownership of \p errored.
 */
static void
rd_kafka_propagate_consumer_topic_errors (
        rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *errored,
        const char *error_prefix) {
        int i;

        for (i = 0 ; i < errored->cnt ; i++) {
                rd_kafka_topic_partition_t *topic = &errored->elems[i];
                rd_kafka_topic_partition_t *prev;

                rd_assert(topic->err);

                /* Check if this topic errored previously */
                prev = rd_kafka_topic_partition_list_find(
                        rkcg->rkcg_errored_topics, topic->topic,
                        RD_KAFKA_PARTITION_UA);

                if (prev && prev->err == topic->err)
                        continue; /* This topic already reported same error */

                /* Send consumer error to application */
                rd_kafka_q_op_err(
                        rkcg->rkcg_q, RD_KAFKA_OP_CONSUMER_ERR,
                        topic->err, 0,
                        rd_kafka_topic_partition_get_toppar(rkcg->rkcg_rk,
                                                            topic),
                        RD_KAFKA_OFFSET_INVALID,
                        "%s: %s: %s",
                        error_prefix, topic->topic,
                        rd_kafka_err2str(topic->err));
        }

        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
        rkcg->rkcg_errored_topics = errored;
}


/**
 * @brief Check if the latest metadata affects the current subscription:
@@ -3483,12 +3533,18 @@ void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){
 */
void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join) {
        rd_list_t *tinfos;
        rd_kafka_topic_partition_list_t *errored;

        rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

        if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
                return;

        /*
         * Unmatched topics will be added to the errored list.
         */
        errored = rd_kafka_topic_partition_list_new(0);

        /*
         * Create a list of the topics in metadata that matches our subscription
         */
@@ -3497,12 +3553,21 @@ void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join) {

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
                rd_kafka_metadata_topic_match(rkcg->rkcg_rk,
                                              tinfos, rkcg->rkcg_subscription);
                                              tinfos, rkcg->rkcg_subscription,
                                              errored);
        else
                rd_kafka_metadata_topic_filter(rkcg->rkcg_rk,
                                               tinfos,
                                               rkcg->rkcg_subscription);
                                               rkcg->rkcg_subscription,
                                               errored);


        /*
         * Propagate consumer errors for any non-existent or errored topics.
         * The function takes ownership of errored.
         */
        rd_kafka_propagate_consumer_topic_errors(
                rkcg, errored, "Subscribed topic not available");

        /*
         * Update (takes ownership of \c tinfos)
6 changes: 4 additions & 2 deletions src/rdkafka_cgrp.h
@@ -200,10 +200,12 @@ typedef struct rd_kafka_cgrp_s {
                                          * Will be updated when the
                                          * coordinator changes. */

        /* Current subscription */
        /** Current subscription */
        rd_kafka_topic_partition_list_t *rkcg_subscription;
        /* The actual topics subscribed (after metadata+wildcard matching) */
        /** The actual topics subscribed (after metadata+wildcard matching) */
        rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */
        /** Subscribed topics that are errored/not available. */
        rd_kafka_topic_partition_list_t *rkcg_errored_topics;

        /* Current assignment */
        rd_kafka_topic_partition_list_t *rkcg_assignment;
16 changes: 14 additions & 2 deletions src/rdkafka_conf.c
@@ -1080,6 +1080,18 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"on-disk corruption to the messages occurred. This check comes "
"at slightly increased CPU usage.",
0, 1, 0 },
{ _RK_GLOBAL|_RK_CONSUMER, "allow.auto.create.topics", _RK_C_BOOL,
_RK(allow_auto_create_topics),
"Allow automatic topic creation on the broker when subscribing to "
"or assigning non-existent topics. "
"The broker must also be configured with "
"`auto.create.topics.enable=true` for this configuraiton to "
"take effect. "
"Note: The default value (false) is different from the "
"Java consumer (true). "
"Requires broker version >= 0.11.0.0, for older broker versions "
"only the broker configuration applies.",
0, 1, 0 },
{ _RK_GLOBAL, "client.rack", _RK_C_KSTR,
_RK(client_rack),
"A rack identifier for this client. This can be any string value "
@@ -1473,8 +1485,8 @@ rd_kafka_conf_prop_find (int scope, const char *name) {
 * @returns rd_true if property has been set/modified, else rd_false.
 *          If \p name is unknown 0 is returned.
 */
static rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf,
                                            const char *name) {
rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf,
                                     const char *name) {
        const struct rd_kafka_property *prop;

        if (!(prop = rd_kafka_conf_prop_find(_RK_GLOBAL, name)))
4 changes: 4 additions & 0 deletions src/rdkafka_conf.h
@@ -329,6 +329,7 @@ struct rd_kafka_conf_s {
        int fetch_error_backoff_ms;
        char *group_id_str;
        char *group_instance_id;
        int allow_auto_create_topics;

        rd_kafka_pattern_list_t *topic_blacklist;
        struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
@@ -553,6 +554,9 @@ struct rd_kafka_topic_conf_s {

void rd_kafka_anyconf_destroy (int scope, void *conf);

rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf,
                                     const char *name);

void rd_kafka_desensitize_str (char *str);

void rd_kafka_conf_desensitize (rd_kafka_conf_t *conf);
4 changes: 4 additions & 0 deletions src/rdkafka_int.h
@@ -566,6 +566,10 @@ struct rd_kafka_s {
                 * but no more often than every 10s.
                 * No locks: only accessed by rdkafka main thread. */
                rd_interval_t broker_metadata_refresh;

                /**< Suppression for allow.auto.create.topics=false not being
                 * supported by the broker. */
                rd_interval_t allow_auto_create_topics;
        } rk_suppress;

        struct {
Expand Down