Skip to content

Commit

Permalink
Merge pull request #376 from Shopify/retry-over-error
Browse files Browse the repository at this point in the history
client: given both retries and errors, prefer retries
  • Loading branch information
eapache committed Mar 20, 2015
2 parents 58fb767 + b946091 commit 6743438
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 9 deletions.
8 changes: 2 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
if len(retry) > 0 {
if retriesRemaining <= 0 {
Logger.Println("Some partitions are leaderless, but we're out of retries")
return nil
return err
}
Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n",
client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
Expand Down Expand Up @@ -591,13 +591,9 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
client.cachedPartitionsResults[topic.Name] = partitionCache
}

if err != nil {
return nil, err
}

ret := make([]string, 0, len(toRetry))
for topic := range toRetry {
ret = append(ret, topic)
}
return ret, nil
return ret, err
}
89 changes: 89 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,95 @@ func TestClientMetadata(t *testing.T) {
safeClose(t, client)
}

func TestClientReceivingUnknownTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)

config := NewConfig()
config.Metadata.Retry.Max = 1
config.Metadata.Retry.Backoff = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

metadataUnknownTopic := new(MetadataResponse)
metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataUnknownTopic)
seedBroker.Returns(metadataUnknownTopic)

if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
t.Error("ErrUnknownTopicOrPartition expected, got", err)
}

// If we are asking for the leader of a partition of the non-existing topic.
// we will request metadata again.
seedBroker.Returns(metadataUnknownTopic)
seedBroker.Returns(metadataUnknownTopic)

if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, got", err)
}

safeClose(t, client)
seedBroker.Close()
}

func TestClientReceivingPartialMetadata(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
seedBroker.Returns(metadataResponse1)

config := NewConfig()
config.Metadata.Retry.Max = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}

metadataPartial := new(MetadataResponse)
metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError)
metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable)
seedBroker.Returns(metadataPartial)

if err := client.RefreshMetadata("new_topic"); err != nil {
t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
}

// Even though the metadata was incomplete, we should be able to get the leader of a partition
// for which we did get a useful response, without doing additional requests.

partition0Leader, err := client.Leader("new_topic", 0)
if err != nil {
t.Error(err)
} else if partition0Leader.Addr() != leader.Addr() {
t.Error("Unexpected leader returned", partition0Leader.Addr())
}

// If we are asking for the leader of a partition that didn't have a leader before,
// we will do another metadata request.

seedBroker.Returns(metadataPartial)

// Still no leader for the partition, so asking for it should return an error.
_, err = client.Leader("new_topic", 1)
if err != ErrLeaderNotAvailable {
t.Error("Expected ErrLeaderNotAvailable, got", err)
}

safeClose(t, client)
seedBroker.Close()
leader.Close()
}

func TestClientRefreshBehaviour(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)
Expand Down
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ func (c *Config) Validate() error {
switch {
case c.Metadata.Retry.Max < 0:
return ConfigurationError("Invalid Metadata.Retry.Max, must be >= 0")
case c.Metadata.Retry.Backoff <= time.Duration(0):
return ConfigurationError("Invalid Metadata.Retry.Backoff, must be > 0")
case c.Metadata.RefreshFrequency < time.Duration(0):
case c.Metadata.Retry.Backoff < 0:
return ConfigurationError("Invalid Metadata.Retry.Backoff, must be >= 0")
case c.Metadata.RefreshFrequency < 0:
return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
}

Expand Down
5 changes: 5 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"fmt"
"log"
"net"
"os"
"strings"
Expand Down Expand Up @@ -36,6 +37,10 @@ func init() {
}

kafkaShouldBeAvailable = os.Getenv("CI") != ""

if os.Getenv("DEBUG") == "true" {
Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
}

func checkKafkaAvailability(t *testing.T) {
Expand Down

0 comments on commit 6743438

Please sign in to comment.