Skip to content

Commit

Permalink
Do not use partition cache for unknown topics.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Mar 19, 2015
1 parent 4da772a commit 1024968
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 14 deletions.
15 changes: 9 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
switch err.(type) {
case nil:
// valid response, use it
retry, err := client.update(response)
retry, err := client.updateMetadata(response)

if len(retry) > 0 {
if retriesRemaining <= 0 {
Expand Down Expand Up @@ -531,7 +531,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) update(data *MetadataResponse) ([]string, error) {
func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
defer client.lock.Unlock()

Expand Down Expand Up @@ -571,10 +571,13 @@ func (client *client) update(data *MetadataResponse) ([]string, error) {
toRetry[topic.Name] = true
}
}
var partitionCache [maxPartitionIndex][]int32
partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
client.cachedPartitionsResults[topic.Name] = partitionCache

if topic.Err != ErrUnknownTopicOrPartition && topic.Err != ErrInvalidTopic {
var partitionCache [maxPartitionIndex][]int32
partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
client.cachedPartitionsResults[topic.Name] = partitionCache
}
}

if err != nil {
Expand Down
56 changes: 56 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,62 @@ func TestCachedPartitions(t *testing.T) {
safeClose(t, client)
}

func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
seedBroker := newMockBroker(t, 1)

replicas := []int32{seedBroker.BrokerID()}

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
config.Metadata.Retry.Max = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

metadataResponse = new(MetadataResponse)
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse)

partitions, err := client.Partitions("unknown")

if err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found", err)
}
if partitions != nil {
t.Errorf("Should return nil as partition list, found %v", partitions)
}

// Should still use the cache of a known topic
partitions, err = client.Partitions("my_topic")
if err != nil {
t.Errorf("Expected no error, found %v", err)
}

metadataResponse = new(MetadataResponse)
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse)

// Should not use cache for unknown topic
partitions, err = client.Partitions("unknown")
if err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found", err)
}
if partitions != nil {
t.Errorf("Should return nil as partition list, found %v", partitions)
}

seedBroker.Close()
safeClose(t, client)
}

func TestClientSeedBrokers(t *testing.T) {
seedBroker := newMockBroker(t, 1)

Expand Down
19 changes: 19 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,25 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
}
}

func TestProducingToInvalidTopic(t *testing.T) {
checkKafkaAvailability(t)

producer, err := NewSyncProducer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
t.Log("Expected ErrInvalidTopic, found", err)
}

if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
t.Log("Expected ErrInvalidTopic, found", err)
}

safeClose(t, producer)
}

func testProducingMessages(t *testing.T, config *Config) {
checkKafkaAvailability(t)

Expand Down
22 changes: 14 additions & 8 deletions metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,31 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
}

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
var match *TopicMetadata
func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
var tmatch *TopicMetadata

for _, tm := range m.Topics {
if tm.Name == topic {
match = tm
tmatch = tm
goto foundTopic
}
}

match = new(TopicMetadata)
match.Name = topic
m.Topics = append(m.Topics, match)
tmatch = new(TopicMetadata)
tmatch.Name = topic
m.Topics = append(m.Topics, tmatch)

foundTopic:

tmatch.Err = err
return tmatch
}

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
tmatch := m.AddTopic(topic, ErrNoError)
var pmatch *PartitionMetadata

for _, pm := range match.Partitions {
for _, pm := range tmatch.Partitions {
if pm.ID == partition {
pmatch = pm
goto foundPartition
Expand All @@ -209,7 +215,7 @@ foundTopic:

pmatch = new(PartitionMetadata)
pmatch.ID = partition
match.Partitions = append(match.Partitions, pmatch)
tmatch.Partitions = append(tmatch.Partitions, pmatch)

foundPartition:

Expand Down
30 changes: 30 additions & 0 deletions sync_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,36 @@ func TestConcurrentSyncProducer(t *testing.T) {
seedBroker.Close()
}

func TestSyncProducerToNonExistingTopic(t *testing.T) {
broker := newMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
broker.Returns(metadataResponse)

config := NewConfig()
config.Metadata.Retry.Max = 0
config.Producer.Retry.Max = 0

producer, err := NewSyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, producer)

metadataResponse = new(MetadataResponse)
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
broker.Returns(metadataResponse)

_, _, err = producer.SendMessage(&ProducerMessage{Topic: "unknown"})
if err != ErrUnknownTopicOrPartition {
t.Error("Uxpected ErrUnknownTopicOrPartition, found:", err)
}

broker.Close()
}

// This example shows the basic usage pattern of the SyncProducer.
func ExampleSyncProducer() {
producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
Expand Down

0 comments on commit 1024968

Please sign in to comment.