From 7f2cdf6cd2c5b4d323e31a74f2dd36461346bba1 Mon Sep 17 00:00:00 2001
From: Willem van Bergen
Date: Thu, 19 Mar 2015 14:50:52 -0400
Subject: [PATCH] Do not use partition cache for unknown topics.

---
 CHANGELOG.md          |  3 +++
 client.go             | 24 +++++++++++++++----
 client_test.go        | 54 +++++++++++++++++++++++++++++++++++++++++++
 functional_test.go    | 19 +++++++++++++++
 metadata_response.go  | 22 +++++++++++-------
 sync_producer_test.go | 30 ++++++++++++++++++++++++
 6 files changed, 139 insertions(+), 13 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 092623e6f..70b76a429 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ Bug Fixes:
    ([#369](https://github.com/Shopify/sarama/pull/369)).
  - Fix a condition where the producer's internal control messages could have
    gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
+ - Fix an issue where invalid partition lists would be cached when asking for
+   metadata for a non-existing topic ([#372](https://github.com/Shopify/sarama/pull/372)).
+
 #### Version 1.0.0 (2015-03-17)
diff --git a/client.go b/client.go
index e46ef81a3..e30e8d355 100644
--- a/client.go
+++ b/client.go
@@ -492,7 +492,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
 		switch err.(type) {
 		case nil:
 			// valid response, use it
-			retry, err := client.update(response)
+			retry, err := client.updateMetadata(response)
 
 			if len(retry) > 0 {
 				if retriesRemaining <= 0 {
@@ -531,7 +531,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
 }
 
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
-func (client *client) update(data *MetadataResponse) ([]string, error) {
+func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
@@ -554,23 +554,37 @@ func (client *client) update(data *MetadataResponse) ([]string, error) {
 	var err error
 	for _, topic := range data.Topics {
+
+		delete(client.metadata, topic.Name)
+		delete(client.cachedPartitionsResults, topic.Name)
+
 		switch topic.Err {
 		case ErrNoError:
 			break
-		case ErrLeaderNotAvailable, ErrUnknownTopicOrPartition:
+		case ErrInvalidTopic: // don't retry, don't store partial results
+			err = topic.Err
+			continue
+		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
+			err = topic.Err
 			toRetry[topic.Name] = true
-		default:
+			continue
+		case ErrLeaderNotAvailable: // retry, but store partial partition results
+			toRetry[topic.Name] = true
+			break
+		default: // don't retry, don't store partial results
+			Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
 			err = topic.Err
+			continue
 		}
 
 		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
-		delete(client.cachedPartitionsResults, topic.Name)
 		for _, partition := range topic.Partitions {
 			client.metadata[topic.Name][partition.ID] = partition
 			if partition.Err == ErrLeaderNotAvailable {
 				toRetry[topic.Name] = true
 			}
 		}
+
 		var partitionCache [maxPartitionIndex][]int32
 		partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
 		partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
diff --git a/client_test.go b/client_test.go
index 3084266e6..e778842df 100644
--- a/client_test.go
+++ b/client_test.go
@@ -65,6 +65,60 @@ func TestCachedPartitions(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+
+	replicas := []int32{seedBroker.BrokerID()}
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 0
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
+	seedBroker.Returns(metadataResponse)
+
+	partitions, err := client.Partitions("unknown")
+
+	if err != ErrUnknownTopicOrPartition {
+		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
+	}
+	if partitions != nil {
+		t.Errorf("Should return nil as partition list, found %v", partitions)
+	}
+
+	// Should still use the cache of a known topic
+	partitions, err = client.Partitions("my_topic")
+	if err != nil {
+		t.Errorf("Expected no error, found %v", err)
+	}
+
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
+	seedBroker.Returns(metadataResponse)
+
+	// Should not use cache for unknown topic
+	partitions, err = client.Partitions("unknown")
+	if err != ErrUnknownTopicOrPartition {
+		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
+	}
+	if partitions != nil {
+		t.Errorf("Should return nil as partition list, found %v", partitions)
+	}
+
+	seedBroker.Close()
+	safeClose(t, client)
+}
+
 func TestClientSeedBrokers(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
diff --git a/functional_test.go b/functional_test.go
index 79b6f2f1a..88a64294f 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -124,6 +124,25 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	}
 }
 
+func TestProducingToInvalidTopic(t *testing.T) {
+	checkKafkaAvailability(t)
+
+	producer, err := NewSyncProducer(kafkaBrokers, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
+		t.Log("Expected ErrInvalidTopic, found", err)
+	}
+
+	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
+		t.Log("Expected ErrInvalidTopic, found", err)
+	}
+
+	safeClose(t, producer)
+}
+
 func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)
diff --git a/metadata_response.go b/metadata_response.go
index f4e8251a6..b82221f7e 100644
--- a/metadata_response.go
+++ b/metadata_response.go
@@ -182,25 +182,31 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
 	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
 }
 
-func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
-	var match *TopicMetadata
+func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
+	var tmatch *TopicMetadata
 
 	for _, tm := range m.Topics {
 		if tm.Name == topic {
-			match = tm
+			tmatch = tm
 			goto foundTopic
 		}
 	}
-	match = new(TopicMetadata)
-	match.Name = topic
-	m.Topics = append(m.Topics, match)
+	tmatch = new(TopicMetadata)
+	tmatch.Name = topic
+	m.Topics = append(m.Topics, tmatch)
 
 foundTopic:
 
+	tmatch.Err = err
+	return tmatch
+}
+
+func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
+	tmatch := m.AddTopic(topic, ErrNoError)
 	var pmatch *PartitionMetadata
-	for _, pm := range match.Partitions {
+	for _, pm := range tmatch.Partitions {
 		if pm.ID == partition {
 			pmatch = pm
 			goto foundPartition
@@ -209,7 +215,7 @@ foundTopic:
 
 	pmatch = new(PartitionMetadata)
 	pmatch.ID = partition
-	match.Partitions = append(match.Partitions, pmatch)
+	tmatch.Partitions = append(tmatch.Partitions, pmatch)
 
 foundPartition:
 
diff --git a/sync_producer_test.go b/sync_producer_test.go
index 6ecb32d0e..d378949b1 100644
--- a/sync_producer_test.go
+++ b/sync_producer_test.go
@@ -97,6 +97,36 @@ func TestConcurrentSyncProducer(t *testing.T) {
 	seedBroker.Close()
 }
 
+func TestSyncProducerToNonExistingTopic(t *testing.T) {
+	broker := newMockBroker(t, 1)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+	broker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 0
+	config.Producer.Retry.Max = 0
+
+	producer, err := NewSyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
+	broker.Returns(metadataResponse)
+
+	_, _, err = producer.SendMessage(&ProducerMessage{Topic: "unknown"})
+	if err != ErrUnknownTopicOrPartition {
+		t.Error("Expected ErrUnknownTopicOrPartition, found:", err)
+	}
+
+	safeClose(t, producer)
+	broker.Close()
+}
+
 // This example shows the basic usage pattern of the SyncProducer.
 func ExampleSyncProducer() {
 	producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
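
For reference, a minimal caller-side sketch (not part of the patch) of the behavior this change produces: asking for the partitions of a topic that does not exist now returns the topic-level error and a nil partition list instead of a previously cached, stale list. The broker address and topic name are placeholders; the import path assumes the Shopify/sarama package.

// Hypothetical usage sketch, not part of the patch above.
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// "localhost:9092" is a placeholder broker address.
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// With this fix, a non-existing topic surfaces the topic-level error and a
	// nil partition list rather than whatever was cached from an earlier refresh.
	partitions, err := client.Partitions("no_such_topic")
	if err == sarama.ErrUnknownTopicOrPartition {
		log.Println("topic does not exist:", err)
		return
	}
	if err != nil {
		log.Fatal(err)
	}
	log.Println("partitions:", partitions)
}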