-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
partition consumer offset to last offset delta #1006
Conversation
Tests are passing locally (and on Travis), not quite sure what Travis is complaining about. |
Travis seems to be experiencing some issues (travis-ci/travis-ci#8898) - I've played with a few of the workarounds listed in that issue but none of them have worked for me so far. |
return nil, ErrIncompleteResponse | ||
} | ||
|
||
child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be guarded by a check that this actually advances the offset? If it doesn't and/or if messages were present which don't line up, we should probably still return an error of some sort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, I added a check for that now. The check is after the assignment rather than guarding it as L554 should not advance the offset when processing a normal non-compacted topic.
@wladh I think this makes sense, but curious for your opinion here; looks like another subtle difference between the old and new record-set formats. |
I think this fix makes sense. It seems to be what the Java consumer is doing as well ( |
consumer.go
Outdated
return nil, ErrIncompleteResponse | ||
} | ||
|
||
child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1 | ||
if child.offset == originalOffset { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<=
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this would happen but it is definitely better to check this case. I've the operator to <=
now.
@@ -523,6 +523,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes | |||
var messages []*ConsumerMessage | |||
var incomplete bool | |||
prelude := true | |||
originalOffset := child.offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we save this value around L550 after we've already processed the messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should do that. If we are processing a non-compacted stream, the final message would set the offset correctly in the loop. In this case, the offset advancing check would fail incorrectly.
Looks like |
* Increases a partition consumer's offset to match a RecordBatch' LastOffsetDelta. Fixes an issue where consuming a log compacted topic would get stuck on a compacted offset. (closes #1005)
I've fixed the failing tests now and rebased on master. |
Thanks! |
Hey,
This seems to fix #1005. As far as I understood, we should advance partition consumer's offset with the LastOffsetDelta as this might be higher than the last record's offset in the batch if some of the offsets were compacted away.