From f922f12eb99991623a19bbae822dc77a27dbefa0 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 25 Mar 2015 14:10:37 +0000 Subject: [PATCH] Add retry logic to GetOffset() just like Leader() Which is: refresh the metadata and retry once, in case e.g. our connection or metadata is stale. Also add a test for it. --- client.go | 57 +++++++++++++++++++++++++++++++------------------- client_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index b2e4772a4..6aa1ecdab 100644 --- a/client.go +++ b/client.go @@ -292,31 +292,16 @@ func (client *client) RefreshMetadata(topics ...string) error { } func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) { - broker, err := client.Leader(topic, partitionID) - if err != nil { - return -1, err - } - - request := &OffsetRequest{} - request.AddBlock(topic, partitionID, time, 1) + offset, err := client.getOffset(topic, partitionID, time) - response, err := broker.GetAvailableOffsets(request) if err != nil { - return -1, err - } - - block := response.GetBlock(topic, partitionID) - if block == nil { - return -1, ErrIncompleteResponse - } - if block.Err != ErrNoError { - return -1, block.Err - } - if len(block.Offsets) != 1 { - return -1, ErrOffsetOutOfRange + if err := client.RefreshMetadata(topic); err != nil { + return -1, err + } + return client.getOffset(topic, partitionID, time) } - return block.Offsets[0], nil + return offset, err } // private broker management helpers @@ -442,6 +427,36 @@ func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, er return nil, ErrUnknownTopicOrPartition } +func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) { + broker, err := client.Leader(topic, partitionID) + if err != nil { + return -1, err + } + + request := &OffsetRequest{} + request.AddBlock(topic, partitionID, time, 1) + + response, err := broker.GetAvailableOffsets(request) + if err != nil { + _ = broker.Close() + return -1, err + } + + block := response.GetBlock(topic, partitionID) + if block == nil { + _ = broker.Close() + return -1, ErrIncompleteResponse + } + if block.Err != ErrNoError { + return -1, block.Err + } + if len(block.Offsets) != 1 { + return -1, ErrOffsetOutOfRange + } + + return block.Offsets[0], nil +} + // core metadata update logic func (client *client) backgroundMetadataUpdater() { diff --git a/client_test.go b/client_test.go index 34015994f..b72040d97 100644 --- a/client_test.go +++ b/client_test.go @@ -200,6 +200,54 @@ func TestClientMetadata(t *testing.T) { safeClose(t, client) } +func TestClientGetOffset(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + leaderAddr := leader.Addr() + + metadata := new(MetadataResponse) + metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, ErrNoError) + metadata.AddBroker(leaderAddr, leader.BrokerID()) + seedBroker.Returns(metadata) + + client, err := NewClient([]string{seedBroker.Addr()}, nil) + if err != nil { + t.Fatal(err) + } + + offsetResponse := new(OffsetResponse) + offsetResponse.AddTopicPartition("foo", 0, 123) + leader.Returns(offsetResponse) + + offset, err := client.GetOffset("foo", 0, OffsetNewest) + if err != nil { + t.Error(err) + } + if offset != 123 { + t.Error("Unexpected offset, got ", offset) + } + + leader.Close() + seedBroker.Returns(metadata) + + leader = newMockBrokerAddr(t, 2, leaderAddr) + offsetResponse = new(OffsetResponse) + offsetResponse.AddTopicPartition("foo", 0, 456) + leader.Returns(offsetResponse) + + offset, err = client.GetOffset("foo", 0, OffsetNewest) + if err != nil { + t.Error(err) + } + if offset != 456 { + t.Error("Unexpected offset, got ", offset) + } + + seedBroker.Close() + leader.Close() + safeClose(t, client) +} + func TestClientReceivingUnknownTopic(t *testing.T) { seedBroker := newMockBroker(t, 1)