Skip to content

Commit

Permalink
Improve reconnect delay handling (#1089)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Mar 16, 2017
1 parent 2af8321 commit ee511d4
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
26 changes: 12 additions & 14 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1301,8 +1301,6 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
"broker in state %s connecting",
rd_kafka_broker_state_names[rkb->rkb_state]);

rkb->rkb_ts_connect = rd_clock();

if (rd_kafka_broker_resolve(rkb) == -1)
return -1;

Expand Down Expand Up @@ -4668,20 +4666,19 @@ static int rd_kafka_broker_thread_main (void *arg) {
* thundering herd of reconnecting clients after
* a broker / network outage. Issue #403 */
if (rkb->rkb_rk->rk_conf.reconnect_jitter_ms &&
rkb->rkb_ts_connect &&
(backoff = rd_clock() - (
rkb->rkb_ts_connect +
(rd_jitter((int)(rkb->rkb_rk->rk_conf.
reconnect_jitter_ms*0.5),
(int)(rkb->rkb_rk->rk_conf.
reconnect_jitter_ms*1.5))
* 1000))) < 0) {
backoff = RD_MAX(-backoff / 1000, 100);
(backoff =
rd_interval_immediate(
&rkb->rkb_connect_intvl,
rd_jitter(rkb->rkb_rk->rk_conf.
reconnect_jitter_ms*500,
rkb->rkb_rk->rk_conf.
reconnect_jitter_ms*1500),
0)) <= 0) {
backoff = -backoff/1000;
rd_rkb_dbg(rkb, BROKER, "RECONNECT",
"Delaying next reconnect by %dms",
(int)backoff);
rd_kafka_broker_ua_idle(rkb, (int)backoff);
rkb->rkb_ts_connect = 0;
continue;
}

Expand All @@ -4692,7 +4689,7 @@ static int rd_kafka_broker_thread_main (void *arg) {
* resolving failed.
* Try the next resolve result until we've
* tried them all, in which case we sleep a
* short while to avoid the busy looping. */
* short while to avoid busy looping. */
if (!rkb->rkb_rsal ||
rkb->rkb_rsal->rsal_cnt == 0 ||
rkb->rkb_rsal->rsal_curr + 1 ==
Expand All @@ -4712,7 +4709,7 @@ static int rd_kafka_broker_thread_main (void *arg) {
/* Connect failure.
* Try the next resolve result until we've
* tried them all, in which case we sleep a
* short while to avoid the busy looping. */
* short while to avoid busy looping. */
if (!rkb->rkb_rsal ||
rkb->rkb_rsal->rsal_cnt == 0 ||
rkb->rkb_rsal->rsal_curr + 1 ==
Expand Down Expand Up @@ -4893,6 +4890,7 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
rd_kafka_bufq_init(&rkb->rkb_waitresps);
rd_kafka_bufq_init(&rkb->rkb_retrybufs);
rkb->rkb_ops = rd_kafka_q_new(rk);
rd_interval_init(&rkb->rkb_connect_intvl);
rd_avg_init(&rkb->rkb_avg_int_latency, RD_AVG_GAUGE);
rd_avg_init(&rkb->rkb_avg_rtt, RD_AVG_GAUGE);
rd_avg_init(&rkb->rkb_avg_throttle, RD_AVG_GAUGE);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
int rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd,
* this is rkb_wakeup_fd[1]
* if enabled. */
rd_ts_t rkb_ts_connect; /* Last connection attempt */
rd_interval_t rkb_connect_intvl; /* Reconnect throttling */

rd_kafka_secproto_t rkb_proto;

Expand Down

0 comments on commit ee511d4

Please sign in to comment.