From 572059111c6f476d2128fce69636aa64e3877e82 Mon Sep 17 00:00:00 2001 From: Willem van Bergen Date: Thu, 9 Apr 2015 11:37:37 -0400 Subject: [PATCH] Improve sleep logic around ErrConsumerCoordinatorNotAvailable --- client.go | 3 ++- functional_client_test.go | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index 084de7fff..f01fce9b4 100644 --- a/client.go +++ b/client.go @@ -661,7 +661,7 @@ func (client *client) refreshCoordinator(consumerGroup string, attemptsRemaining case ErrConsumerCoordinatorNotAvailable: Logger.Printf("client/coordinator Coordinator for consumer group %s not yet available, trying again in %dms...\n", consumerGroup, client.conf.Metadata.Retry.Backoff/time.Millisecond) - time.Sleep(client.conf.Metadata.Retry.Backoff) + time.Sleep(2 * time.Second) continue default: @@ -674,6 +674,7 @@ func (client *client) refreshCoordinator(consumerGroup string, attemptsRemaining if attemptsRemaining > 0 { Logger.Printf("client/coordinator Trying again to find coordinator for consumer group %s... (%d attempts remaining)\n", consumerGroup, attemptsRemaining) + time.Sleep(client.conf.Metadata.Retry.Backoff) client.resurrectDeadBrokers() return client.refreshCoordinator(consumerGroup, attemptsRemaining-1) } diff --git a/functional_client_test.go b/functional_client_test.go index b4a6457ac..83c05205c 100644 --- a/functional_client_test.go +++ b/functional_client_test.go @@ -1,6 +1,7 @@ package sarama import ( + "fmt" "testing" "time" ) @@ -61,21 +62,20 @@ func TestFuncClientCoordinator(t *testing.T) { checkKafkaVersion(t, "0.8.2") checkKafkaAvailability(t) - config := NewConfig() - config.Metadata.Retry.Max = 10 - config.Metadata.Retry.Backoff = 1 * time.Second - client, err := NewClient(kafkaBrokers, config) + client, err := NewClient(kafkaBrokers, nil) if err != nil { t.Fatal(err) } - broker, err := client.Coordinator("new_consumer_group") - if err != nil { - t.Error(err) - } + for i := 0; i < 10; i++ { + broker, err := client.Coordinator(fmt.Sprintf("another_new_consumer_group_%d", i)) + if err != nil { + t.Error(err) + } - if connected, err := broker.Connected(); !connected || err != nil { - t.Errorf("Expected to coordinator %s broker to be properly connected.", broker.Addr()) + if connected, err := broker.Connected(); !connected || err != nil { + t.Errorf("Expected to coordinator %s broker to be properly connected.", broker.Addr()) + } } safeClose(t, client)