Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OffsetManager: match upstream mark/next behaviour #713

Merged
merged 1 commit into from
Jul 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ features that may not be compatible with old Kafka versions. If you don't
specify this value it will default to 0.8.2 (the minimum supported), and trying
to use more recent features (like the offset manager) will fail with an error.

_Also:_ The offset-manager's behaviour has been changed to match the upstream
java consumer (see [#705](https://github.com/Shopify/sarama/pull/705) and
[#713](https://github.com/Shopify/sarama/pull/713)). If you use the
offset-manager, please ensure that you are committing one *greater* than the
last consumed message offset or else you may end up consuming duplicate
messages.

New Features:
- Support for Kafka 0.10
([#672](https://github.com/Shopify/sarama/pull/672),
Expand Down Expand Up @@ -35,6 +42,8 @@ Bug Fixes:
([#685](https://github.com/Shopify/sarama/pull/685)).
- Fix a possible tight loop in the consumer
([#693](https://github.com/Shopify/sarama/pull/693)).
- Match upstream's offset-tracking behaviour
([#705](https://github.com/Shopify/sarama/pull/705)).
- Report UnknownTopicOrPartition errors from the offset manager
([#706](https://github.com/Shopify/sarama/pull/706)).
- Fix possible negative partition value from the HashPartitioner
Expand Down
4 changes: 2 additions & 2 deletions functional_offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestFuncOffsetManager(t *testing.T) {

offset, metadata := pom2.NextOffset()

if offset != 10+1 {
t.Errorf("Expected the next offset to be 11, found %d.", offset)
if offset != 10 {
t.Errorf("Expected the next offset to be 10, found %d.", offset)
}
if metadata != "test metadata" {
t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
Expand Down
8 changes: 6 additions & 2 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,15 @@ type PartitionOffsetManager interface {
// was committed for this partition yet.
NextOffset() (int64, string)

// MarkOffset marks the provided offset as processed, alongside a metadata string
// MarkOffset marks the provided offset, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// To follow upstream conventions, you are expected to mark the offset of the
// next message to read, not the last message read. Thus, when calling `MarkOffset`
// you should typically add one to the offset of the last consumed message.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
Expand Down Expand Up @@ -340,7 +344,7 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
defer pom.lock.Unlock()

if pom.offset >= 0 {
return pom.offset + 1, pom.metadata
return pom.offset, pom.metadata
}

return pom.parent.conf.Consumer.Offsets.Initial, ""
Expand Down
6 changes: 3 additions & 3 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")

offset, meta := pom.NextOffset()
if offset != 6 {
if offset != 5 {
t.Errorf("Expected offset 5. Actual: %v", offset)
}
if meta != "test_meta" {
Expand All @@ -215,7 +215,7 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
pom.MarkOffset(100, "modified_meta")
offset, meta := pom.NextOffset()

if offset != 101 {
if offset != 100 {
t.Errorf("Expected offset 100. Actual: %v", offset)
}
if meta != "modified_meta" {
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
pom.MarkOffset(100, "modified_meta")
offset, meta := pom.NextOffset()

if offset != 101 {
if offset != 100 {
t.Errorf("Expected offset 100. Actual: %v", offset)
}
if meta != "modified_meta" {
Expand Down