Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache client partition results to save the repeated sort() calls #287

Merged
merged 1 commit
Feb 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ type Client struct {

brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
lock sync.RWMutex // protects access to the maps, only one since they're always written together

// If the number of partitions is large, we can get some churn calling cachedPartitions,
// so the result is cached. It is important to update this value whenever the metadata is changed.
cachedPartitionsResults map[string][maxPartitionIndex][]int32
lock sync.RWMutex // protects access to the maps, only one since they're always written together
}

// NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
Expand All @@ -87,14 +91,15 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
}

client := &Client{
id: id,
config: *config,
closer: make(chan none),
seedBrokerAddrs: addrs,
seedBroker: NewBroker(addrs[0]),
deadBrokerAddrs: make(map[string]none),
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
id: id,
config: *config,
closer: make(chan none),
seedBrokerAddrs: addrs,
seedBroker: NewBroker(addrs[0]),
deadBrokerAddrs: make(map[string]none),
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
}
_ = client.seedBroker.Open(config.DefaultBrokerConf)

Expand Down Expand Up @@ -387,9 +392,15 @@ func (client *Client) any() *Broker {

// private caching/lazy metadata helpers

// partitionType selects which precomputed view of a topic's partitions a
// partition-cache lookup refers to: every known partition, or only those
// that currently have a reachable leader.
type partitionType int

const (
	allPartitions partitionType = iota
	writablePartitions
	// If you add any more types, update the partition cache in update()

	// Ensure this is the last partition type value
	maxPartitionIndex
)

func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMetadata, error) {
Expand Down Expand Up @@ -422,11 +433,21 @@ func (client *Client) cachedMetadata(topic string, partitionID int32) *Partition
return nil
}

func (client *Client) cachedPartitions(topic string, partitionSet int) []int32 {
// cachedPartitions returns the precomputed partition-ID list for the given
// topic and view (allPartitions vs. writablePartitions), or nil when no
// cache entry exists for the topic. The cache is read under the client's
// read lock; entries are rebuilt by update() whenever metadata changes.
func (client *Client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
	client.lock.RLock()
	defer client.lock.RUnlock()

	if cached, ok := client.cachedPartitionsResults[topic]; ok {
		return cached[partitionSet]
	}
	return nil
}

func (client *Client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
partitions := client.metadata[topic]

if partitions == nil {
return nil
}
Expand Down Expand Up @@ -584,12 +605,17 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
}

client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
delete(client.cachedPartitionsResults, topic.Name)
for _, partition := range topic.Partitions {
client.metadata[topic.Name][partition.ID] = partition
if partition.Err == LeaderNotAvailable {
toRetry[topic.Name] = true
}
}
var partitionCache [maxPartitionIndex][]int32
partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
client.cachedPartitionsResults[topic.Name] = partitionCache
}

if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,44 @@ func TestSimpleClient(t *testing.T) {
defer mb.Close()
}

// TestCachedPartitions verifies that a metadata update populates the
// per-topic partition cache, and that cachedPartitions serves results from
// that cache rather than recomputing them from client.metadata.
func TestCachedPartitions(t *testing.T) {
	mb1 := NewMockBroker(t, 1)
	mb5 := NewMockBroker(t, 5)
	replicas := []int32{3, 1, 5}
	isr := []int32{5, 1}

	// Two partitions on the same topic: partition 0 reports NoError while
	// partition 1 reports LeaderNotAvailable, so the "writable" view should
	// end up strictly smaller than the "all" view after the update.
	mdr := new(MetadataResponse)
	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr, NoError)
	mdr.AddTopicPartition("my_topic", 1, mb5.BrokerID(), replicas, isr, LeaderNotAvailable)
	mb1.Returns(mdr)

	config := NewClientConfig()
	config.MetadataRetries = 0
	client, err := NewClient("client_id", []string{mb1.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, client)
	defer mb1.Close()
	defer mb5.Close()

	// Verify they aren't cached the same
	allP := client.cachedPartitionsResults["my_topic"][allPartitions]
	writeP := client.cachedPartitionsResults["my_topic"][writablePartitions]
	if len(allP) == len(writeP) {
		t.Fatal("Invalid lengths!")
	}

	tmp := client.cachedPartitionsResults["my_topic"]
	// Verify we actually use the cache at all!
	// Overwrite the cached "all" slice with a sentinel of length 4; if
	// cachedPartitions recomputed from metadata instead of reading the
	// cache, it would not see this value.
	tmp[allPartitions] = []int32{1, 2, 3, 4}
	client.cachedPartitionsResults["my_topic"] = tmp
	if 4 != len(client.cachedPartitions("my_topic", allPartitions)) {
		t.Fatal("Not using the cache!")
	}
}

func TestClientSeedBrokers(t *testing.T) {

mb1 := NewMockBroker(t, 1)
Expand Down