Skip to content

Commit

Permalink
OffsetManager: match upstream mark/next behaviour
Browse files Browse the repository at this point in the history
Upstream requires you mark last-consumed+1 and then returns that value directly.
We were requiring you mark last-consumed and then adding one to the returned
value.

Match upstream's behaviour so that our offset tracking is interoperable.

Fixes #705.
  • Loading branch information
eapache committed Jul 26, 2016
1 parent 146ec3d commit b096b15
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Bug Fixes:
([#685](https://github.com/Shopify/sarama/pull/685)).
- Fix a possible tight loop in the consumer
([#693](https://github.com/Shopify/sarama/pull/693)).
- Match upstream's offset-tracking behaviour
([#705](https://github.com/Shopify/sarama/pull/705)).
- Report UnknownTopicOrPartition errors from the offset manager
([#706](https://github.com/Shopify/sarama/pull/706)).
- Fix possible negative partition value from the HashPartitioner
Expand Down
4 changes: 2 additions & 2 deletions functional_offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestFuncOffsetManager(t *testing.T) {

offset, metadata := pom2.NextOffset()

if offset != 10+1 {
t.Errorf("Expected the next offset to be 11, found %d.", offset)
if offset != 10 {
t.Errorf("Expected the next offset to be 10, found %d.", offset)
}
if metadata != "test metadata" {
t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
Expand Down
8 changes: 6 additions & 2 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,15 @@ type PartitionOffsetManager interface {
// was committed for this partition yet.
NextOffset() (int64, string)

// MarkOffset marks the provided offset as processed, alongside a metadata string
// MarkOffset marks the provided offset, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// To follow upstream conventions, you are expected to mark the offset of the
// next message to read, not the last message read. Thus, when calling `MarkOffset`
// you should typically add one to the offset of the last consumed message.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
Expand Down Expand Up @@ -340,7 +344,7 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
defer pom.lock.Unlock()

if pom.offset >= 0 {
return pom.offset + 1, pom.metadata
return pom.offset, pom.metadata
}

return pom.parent.conf.Consumer.Offsets.Initial, ""
Expand Down
6 changes: 3 additions & 3 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")

offset, meta := pom.NextOffset()
if offset != 6 {
if offset != 5 {
t.Errorf("Expected offset 5. Actual: %v", offset)
}
if meta != "test_meta" {
Expand All @@ -215,7 +215,7 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
pom.MarkOffset(100, "modified_meta")
offset, meta := pom.NextOffset()

if offset != 101 {
if offset != 100 {
t.Errorf("Expected offset 100. Actual: %v", offset)
}
if meta != "modified_meta" {
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
pom.MarkOffset(100, "modified_meta")
offset, meta := pom.NextOffset()

if offset != 101 {
if offset != 100 {
t.Errorf("Expected offset 100. Actual: %v", offset)
}
if meta != "modified_meta" {
Expand Down

0 comments on commit b096b15

Please sign in to comment.