From ee511d44fa2e1032e181e1093aeef2362fd05da2 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 16 Mar 2017 23:44:23 +0100 Subject: [PATCH] Improve reconnect delay handling (#1089) --- src/rdkafka_broker.c | 26 ++++++++++++-------------- src/rdkafka_broker.h | 2 +- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 9e90afa01e..b4cce7b6bc 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -1301,8 +1301,6 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) { "broker in state %s connecting", rd_kafka_broker_state_names[rkb->rkb_state]); - rkb->rkb_ts_connect = rd_clock(); - if (rd_kafka_broker_resolve(rkb) == -1) return -1; @@ -4668,20 +4666,19 @@ static int rd_kafka_broker_thread_main (void *arg) { * thundering horde of reconnecting clients after * a broker / network outage. Issue #403 */ if (rkb->rkb_rk->rk_conf.reconnect_jitter_ms && - rkb->rkb_ts_connect && - (backoff = rd_clock() - ( - rkb->rkb_ts_connect + - (rd_jitter((int)(rkb->rkb_rk->rk_conf. - reconnect_jitter_ms*0.5), - (int)(rkb->rkb_rk->rk_conf. - reconnect_jitter_ms*1.5)) - * 1000))) < 0) { - backoff = RD_MAX(-backoff / 1000, 100); + (backoff = + rd_interval_immediate( + &rkb->rkb_connect_intvl, + rd_jitter(rkb->rkb_rk->rk_conf. + reconnect_jitter_ms*500, + rkb->rkb_rk->rk_conf. + reconnect_jitter_ms*1500), + 0)) <= 0) { + backoff = -backoff/1000; rd_rkb_dbg(rkb, BROKER, "RECONNECT", "Delaying next reconnect by %dms", (int)backoff); rd_kafka_broker_ua_idle(rkb, (int)backoff); - rkb->rkb_ts_connect = 0; continue; } @@ -4692,7 +4689,7 @@ static int rd_kafka_broker_thread_main (void *arg) { * resolving failed. * Try the next resolve result until we've * tried them all, in which case we sleep a - * short while to avoid the busy looping. */ + * short while to avoid busy looping. */ if (!rkb->rkb_rsal || rkb->rkb_rsal->rsal_cnt == 0 || rkb->rkb_rsal->rsal_curr + 1 == @@ -4712,7 +4709,7 @@ static int rd_kafka_broker_thread_main (void *arg) { /* Connect failure. * Try the next resolve result until we've * tried them all, in which case we sleep a - * short while to avoid the busy looping. */ + * short while to avoid busy looping. */ if (!rkb->rkb_rsal || rkb->rkb_rsal->rsal_cnt == 0 || rkb->rkb_rsal->rsal_curr + 1 == @@ -4893,6 +4890,7 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, rd_kafka_bufq_init(&rkb->rkb_waitresps); rd_kafka_bufq_init(&rkb->rkb_retrybufs); rkb->rkb_ops = rd_kafka_q_new(rk); + rd_interval_init(&rkb->rkb_connect_intvl); rd_avg_init(&rkb->rkb_avg_int_latency, RD_AVG_GAUGE); rd_avg_init(&rkb->rkb_avg_rtt, RD_AVG_GAUGE); rd_avg_init(&rkb->rkb_avg_throttle, RD_AVG_GAUGE); diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index e6da35ca45..a71365735f 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -183,7 +183,7 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */ int rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd, * this is rkb_wakeup_fd[1] * if enabled. */ - rd_ts_t rkb_ts_connect; /* Last connection attempt */ + rd_interval_t rkb_connect_intvl; /* Reconnect throttling */ rd_kafka_secproto_t rkb_proto;