diff --git a/client.go b/client.go index e30e8d355..7ebbce8f2 100644 --- a/client.go +++ b/client.go @@ -497,7 +497,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int) if len(retry) > 0 { if retriesRemaining <= 0 { Logger.Println("Some partitions are leaderless, but we're out of retries") - return nil + return err } Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining) @@ -591,13 +591,9 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) { client.cachedPartitionsResults[topic.Name] = partitionCache } - if err != nil { - return nil, err - } - ret := make([]string, 0, len(toRetry)) for topic := range toRetry { ret = append(ret, topic) } - return ret, nil + return ret, err } diff --git a/client_test.go b/client_test.go index e778842df..a430e988b 100644 --- a/client_test.go +++ b/client_test.go @@ -199,6 +199,95 @@ func TestClientMetadata(t *testing.T) { safeClose(t, client) } +func TestClientReceivingUnknownTopic(t *testing.T) { + seedBroker := newMockBroker(t, 1) + + metadataResponse1 := new(MetadataResponse) + seedBroker.Returns(metadataResponse1) + + config := NewConfig() + config.Metadata.Retry.Max = 1 + config.Metadata.Retry.Backoff = 0 + client, err := NewClient([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + metadataUnknownTopic := new(MetadataResponse) + metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) + seedBroker.Returns(metadataUnknownTopic) + seedBroker.Returns(metadataUnknownTopic) + + if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition { + t.Error("ErrUnknownTopicOrPartition expected, got", err) + } + + // If we are asking for the leader of a partition of the non-existing topic. + // we will request metadata again. + seedBroker.Returns(metadataUnknownTopic) + seedBroker.Returns(metadataUnknownTopic) + + if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition { + t.Error("Expected ErrUnknownTopicOrPartition, got", err) + } + + safeClose(t, client) + seedBroker.Close() +} + +func TestClientReceivingPartialMetadata(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 5) + + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) + seedBroker.Returns(metadataResponse1) + + config := NewConfig() + config.Metadata.Retry.Max = 0 + client, err := NewClient([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()} + + metadataPartial := new(MetadataResponse) + metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable) + metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError) + metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable) + seedBroker.Returns(metadataPartial) + + if err := client.RefreshMetadata("new_topic"); err != nil { + t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error") + } + + // Even though the metadata was incomplete, we should be able to get the leader of a partition + // for which we did get a useful response, without doing additional requests. + + partition0Leader, err := client.Leader("new_topic", 0) + if err != nil { + t.Error(err) + } else if partition0Leader.Addr() != leader.Addr() { + t.Error("Unexpected leader returned", partition0Leader.Addr()) + } + + // If we are asking for the leader of a partition that didn't have a leader before, + // we will do another metadata request. + + seedBroker.Returns(metadataPartial) + + // Still no leader for the partition, so asking for it should return an error. + _, err = client.Leader("new_topic", 1) + if err != ErrLeaderNotAvailable { + t.Error("Expected ErrLeaderNotAvailable, got", err) + } + + safeClose(t, client) + seedBroker.Close() + leader.Close() +} + func TestClientRefreshBehaviour(t *testing.T) { seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 5) diff --git a/config.go b/config.go index 8ff16dd0a..5150b62e4 100644 --- a/config.go +++ b/config.go @@ -192,9 +192,9 @@ func (c *Config) Validate() error { switch { case c.Metadata.Retry.Max < 0: return ConfigurationError("Invalid Metadata.Retry.Max, must be >= 0") - case c.Metadata.Retry.Backoff <= time.Duration(0): - return ConfigurationError("Invalid Metadata.Retry.Backoff, must be > 0") - case c.Metadata.RefreshFrequency < time.Duration(0): + case c.Metadata.Retry.Backoff < 0: + return ConfigurationError("Invalid Metadata.Retry.Backoff, must be >= 0") + case c.Metadata.RefreshFrequency < 0: return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0") } diff --git a/functional_test.go b/functional_test.go index 590564323..1471a4614 100644 --- a/functional_test.go +++ b/functional_test.go @@ -2,6 +2,7 @@ package sarama import ( "fmt" + "log" "net" "os" "strings" @@ -36,6 +37,10 @@ func init() { } kafkaShouldBeAvailable = os.Getenv("CI") != "" + + if os.Getenv("DEBUG") == "true" { + Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) + } } func checkKafkaAvailability(t *testing.T) {