Skip to content

Commit

Permalink
Add support for custom offset retention durations to offset manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Matvey Arye committed Feb 10, 2016
1 parent 4ba9bba commit cc3d30f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 4 deletions.
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ type Config struct {
// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Initial int64

// The retention duration for committed offsets.
// A value of 0 cause the retention time to default to the offsets.retention.minutes broker option.
Retention time.Duration
}
}

Expand Down
19 changes: 15 additions & 4 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,21 @@ func (bom *brokerOffsetManager) flushToBroker() {
}

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
var r *OffsetCommitRequest
if bom.parent.conf.Consumer.Offsets.Retention == 0 {
r = &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
}
} else {
r = &OffsetCommitRequest{
Version: 2,
RetentionTime: int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond),
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
}

}

for s := range bom.subscriptions {
Expand Down
27 changes: 27 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,33 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
testClient.Config().Consumer.Offsets.Retention = time.Hour

pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
coordinator.Returns(ocResponse)

pom.MarkOffset(100, "modified_meta")
offset, meta := pom.NextOffset()

if offset != 101 {
t.Errorf("Expected offset 100. Actual: %v", offset)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerCommitErr(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
Expand Down

0 comments on commit cc3d30f

Please sign in to comment.