diff --git a/config.go b/config.go index 7cc91e7617..c0387fde1c 100644 --- a/config.go +++ b/config.go @@ -183,6 +183,10 @@ type Config struct { // The initial offset to use if no offset was previously committed. // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest. Initial int64 + + // The retention duration for committed offsets. + // A value of 0 cause the retention time to default to the offsets.retention.minutes broker option. + Retention time.Duration } } diff --git a/offset_manager.go b/offset_manager.go index 0c90f7c217..16fa1dacf3 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -475,10 +475,21 @@ func (bom *brokerOffsetManager) flushToBroker() { } func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest { - r := &OffsetCommitRequest{ - Version: 1, - ConsumerGroup: bom.parent.group, - ConsumerGroupGeneration: GroupGenerationUndefined, + var r *OffsetCommitRequest + if bom.parent.conf.Consumer.Offsets.Retention == 0 { + r = &OffsetCommitRequest{ + Version: 1, + ConsumerGroup: bom.parent.group, + ConsumerGroupGeneration: GroupGenerationUndefined, + } + } else { + r = &OffsetCommitRequest{ + Version: 2, + RetentionTime: int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond), + ConsumerGroup: bom.parent.group, + ConsumerGroupGeneration: GroupGenerationUndefined, + } + } for s := range bom.subscriptions { diff --git a/offset_manager_test.go b/offset_manager_test.go index 00d5fba9ed..65a351a300 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -228,6 +228,33 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) { coordinator.Close() } +func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { + om, testClient, broker, coordinator := initOffsetManager(t) + testClient.Config().Consumer.Offsets.Retention = time.Hour + + pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") + + ocResponse := new(OffsetCommitResponse) + ocResponse.AddError("my_topic", 0, ErrNoError) + coordinator.Returns(ocResponse) + + pom.MarkOffset(100, "modified_meta") + offset, meta := pom.NextOffset() + + if offset != 101 { + t.Errorf("Expected offset 100. Actual: %v", offset) + } + if meta != "modified_meta" { + t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta) + } + + safeClose(t, pom) + safeClose(t, om) + safeClose(t, testClient) + broker.Close() + coordinator.Close() +} + func TestPartitionOffsetManagerCommitErr(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t) pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")