Skip to content

Commit

Permalink
Do not use partition cache for unknown topics.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Mar 19, 2015
1 parent 4da772a commit 5d4f0f8
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 13 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Bug Fixes:
([#369](https://github.com/Shopify/sarama/pull/369)).
- Fix a condition where the producer's internal control messages could have
gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
- Fix an issue where invalid partition lists would be cached when asking for
metadata for a non-existing topic ([#372](https://github.com/Shopify/sarama/pull/372)).


#### Version 1.0.0 (2015-03-17)

Expand Down
23 changes: 18 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
switch err.(type) {
case nil:
// valid response, use it
retry, err := client.update(response)
retry, err := client.updateMetadata(response)

if len(retry) > 0 {
if retriesRemaining <= 0 {
Expand Down Expand Up @@ -531,7 +531,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) update(data *MetadataResponse) ([]string, error) {
func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
defer client.lock.Unlock()

Expand All @@ -554,23 +554,36 @@ func (client *client) update(data *MetadataResponse) ([]string, error) {

var err error
for _, topic := range data.Topics {

delete(client.metadata, topic.Name)
delete(client.cachedPartitionsResults, topic.Name)

switch topic.Err {
case ErrNoError:
break
case ErrLeaderNotAvailable, ErrUnknownTopicOrPartition:
case ErrInvalidTopic: // don't retry, don't store partial results
err = topic.Err
continue
case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
toRetry[topic.Name] = true
default:
continue
case ErrLeaderNotAvailable: // retry, but store partiial partition results
toRetry[topic.Name] = true
break
default: // don't retry, don't store partial results
Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
err = topic.Err
continue
}

client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
delete(client.cachedPartitionsResults, topic.Name)
for _, partition := range topic.Partitions {
client.metadata[topic.Name][partition.ID] = partition
if partition.Err == ErrLeaderNotAvailable {
toRetry[topic.Name] = true
}
}

var partitionCache [maxPartitionIndex][]int32
partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
Expand Down
54 changes: 54 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,60 @@ func TestCachedPartitions(t *testing.T) {
safeClose(t, client)
}

func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
seedBroker := newMockBroker(t, 1)

replicas := []int32{seedBroker.BrokerID()}

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
config.Metadata.Retry.Max = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

metadataResponse = new(MetadataResponse)
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse)

partitions, err := client.Partitions("unknown")

if err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found", err)
}
if partitions != nil {
t.Errorf("Should return nil as partition list, found %v", partitions)
}

// Should still use the cache of a known topic
partitions, err = client.Partitions("my_topic")
if err != nil {
t.Errorf("Expected no error, found %v", err)
}

metadataResponse = new(MetadataResponse)
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse)

// Should not use cache for unknown topic
partitions, err = client.Partitions("unknown")
if err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found", err)
}
if partitions != nil {
t.Errorf("Should return nil as partition list, found %v", partitions)
}

seedBroker.Close()
safeClose(t, client)
}

func TestClientSeedBrokers(t *testing.T) {
seedBroker := newMockBroker(t, 1)

Expand Down
19 changes: 19 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,25 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
}
}

func TestProducingToInvalidTopic(t *testing.T) {
checkKafkaAvailability(t)

producer, err := NewSyncProducer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
t.Log("Expected ErrInvalidTopic, found", err)
}

if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
t.Log("Expected ErrInvalidTopic, found", err)
}

safeClose(t, producer)
}

func testProducingMessages(t *testing.T, config *Config) {
checkKafkaAvailability(t)

Expand Down
22 changes: 14 additions & 8 deletions metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,31 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
}

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
var match *TopicMetadata
func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
var tmatch *TopicMetadata

for _, tm := range m.Topics {
if tm.Name == topic {
match = tm
tmatch = tm
goto foundTopic
}
}

match = new(TopicMetadata)
match.Name = topic
m.Topics = append(m.Topics, match)
tmatch = new(TopicMetadata)
tmatch.Name = topic
m.Topics = append(m.Topics, tmatch)

foundTopic:

tmatch.Err = err
return tmatch
}

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
tmatch := m.AddTopic(topic, ErrNoError)
var pmatch *PartitionMetadata

for _, pm := range match.Partitions {
for _, pm := range tmatch.Partitions {
if pm.ID == partition {
pmatch = pm
goto foundPartition
Expand All @@ -209,7 +215,7 @@ foundTopic:

pmatch = new(PartitionMetadata)
pmatch.ID = partition
match.Partitions = append(match.Partitions, pmatch)
tmatch.Partitions = append(tmatch.Partitions, pmatch)

foundPartition:

Expand Down
30 changes: 30 additions & 0 deletions sync_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,36 @@ func TestConcurrentSyncProducer(t *testing.T) {
seedBroker.Close()
}

func TestSyncProducerToNonExistingTopic(t *testing.T) {
broker := newMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
broker.Returns(metadataResponse)

config := NewConfig()
config.Metadata.Retry.Max = 0
config.Producer.Retry.Max = 0

producer, err := NewSyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

metadataResponse = new(MetadataResponse)
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
broker.Returns(metadataResponse)

_, _, err = producer.SendMessage(&ProducerMessage{Topic: "unknown"})
if err != ErrUnknownTopicOrPartition {
t.Error("Uxpected ErrUnknownTopicOrPartition, found:", err)
}

safeClose(t, producer)
broker.Close()
}

// This example shows the basic usage pattern of the SyncProducer.
func ExampleSyncProducer() {
producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
Expand Down

0 comments on commit 5d4f0f8

Please sign in to comment.