Offset manager: make initial offset configurable #520

Merged: wvanbergen merged 3 commits into offset-manager from offset-manager-initial on Aug 21, 2015

Conversation

wvanbergen
Contributor

This allows you to set the initial offset that Offset() returns if no offset has been committed yet for the partition.

Right now, we depend on the implicit behaviour of the Kafka broker, which returns -1. This patch makes the behaviour explicit, and allows you to change it to either OffsetOldest or OffsetNewest.

I prefer using a config setting instead of an extra argument to ManagePartition, because this value will always be the same for every partition in a consumer.

@Shopify/kafka

@wvanbergen
Contributor Author

Given that this allows you to send the return value directly to Consumer.ConsumePartition, we should probably make sure it always returns the next offset to consume, instead of the last offset that was committed.

pom.offset = block.Offset
pom.metadata = block.Metadata
} else {
pom.offset = pom.parent.conf.Consumer.Offsets.Initial
Contributor

you don't actually want to return OffsetNewest here, you want to ask the broker what the earliest available offset is and return that.

Contributor

Oh, or is this intentional... that's really not clear.

Contributor Author

I prefer returning the constant. This way the user of the API can know there's no offset stored, and do something specific in that case (e.g. log). Moreover, in the end you feed the value to ConsumePartition, which will always do a FetchOffset API call to validate/look up the actual offset to use. So this saves you one roundtrip to the broker.
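
A minimal sketch of the caller side of this argument, not code from this patch. The two constants are local stand-ins mirroring the values of sarama's OffsetNewest (-1) and OffsetOldest (-2), and describeStart is a hypothetical helper name. It only shows that a sentinel return value lets the caller tell "nothing stored" apart from a real offset before handing the same value on to ConsumePartition.

```go
package main

import "fmt"

// Local stand-ins mirroring sarama.OffsetNewest and sarama.OffsetOldest,
// so the sketch compiles without the library.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// describeStart is a hypothetical helper: it reports whether the value handed
// back by the offset manager is a stored offset or one of the sentinels.
func describeStart(offset int64) string {
	switch offset {
	case offsetNewest:
		return "no stored offset, starting at newest"
	case offsetOldest:
		return "no stored offset, starting at oldest"
	default:
		return fmt.Sprintf("resuming at stored offset %d", offset)
	}
}

func main() {
	for _, o := range []int64{offsetNewest, offsetOldest, 42} {
		// In a real consumer the same value would be passed on unchanged
		// to ConsumePartition after logging.
		fmt.Println(describeStart(o))
	}
}
```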

Contributor

Right. I think I would prefer to have this if-check in the actual Offset() method, though; then we can put the +1 there as well and all the processing logic ends up in one place. Thoughts?

Contributor

Also then it should probably be called NextOffset?

Contributor Author

👍, will fix

@eapache
Contributor

eapache commented Aug 20, 2015

I am not convinced this makes sense when the offset manager is just a subcomponent of the higher-level consumer.

> because this value will always be the same for every partition in a consumer.

Is that true? What about the migration case where you want to pass a distinct constant (manual) offset for each partition?

@wvanbergen
Contributor Author

For the migration case, I was thinking either of the following models:

  • Use a separate tool to simply commit the current offsets from the old storage engine to Kafka, and restart the consumer afterwards.
  • Get the Offset(), and if it is negative, get it from the old storage engine, and set it with SetOffset.
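
The second model could look roughly like the sketch below. Everything here is hypothetical: legacyStore stands in for the old storage engine, and partitionOffsets is a toy type that only borrows the Offset and SetOffset method names from this thread, not their real signatures.

```go
package main

import "fmt"

// legacyStore is a hypothetical view of the old offset storage engine,
// keyed by partition number.
type legacyStore map[int32]int64

// partitionOffsets is a toy stand-in for a partition offset manager.
type partitionOffsets struct {
	offset int64 // negative means nothing has been committed to Kafka yet
}

func (p *partitionOffsets) Offset() int64     { return p.offset }
func (p *partitionOffsets) SetOffset(o int64) { p.offset = o }

// migrate seeds the manager from the legacy store, but only when Kafka has
// no committed offset (negative value) and the legacy store knows the
// partition. It reports whether a value was copied over.
func migrate(p *partitionOffsets, partition int32, old legacyStore) bool {
	if p.Offset() >= 0 {
		return false // an offset committed to Kafka takes precedence
	}
	legacy, ok := old[partition]
	if !ok {
		return false
	}
	p.SetOffset(legacy)
	return true
}

func main() {
	old := legacyStore{0: 1200}
	p := &partitionOffsets{offset: -1} // nothing committed to Kafka yet
	if migrate(p, 0, old) {
		fmt.Println("seeded partition 0 from legacy store:", p.Offset())
	}
}
```

Because the check runs per partition, this model also covers the case of a distinct manual offset for each partition without an extra argument to ManagePartition.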

@wvanbergen
Contributor Author

Renamed Offset to NextOffset, and moved the logic to that function. Also renamed SetOffset to MarkOffset: I think it's a little bit more descriptive, but I am open to other suggestions.

Also, the godoc comments probably deserve some 👀 as well to make sure it's clear how these functions should be used.
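
For readers following along, here is a simplified sketch of the behaviour discussed in the review thread. It is not the merged code. It assumes the marked offset is that of the last fully processed message, so the next offset to consume is one higher, and it uses a toy partitionOffsets type with local stand-ins for the OffsetNewest and OffsetOldest constants.

```go
package main

import "fmt"

// Local stand-ins mirroring sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// partitionOffsets is a toy type, not the real partition offset manager.
type partitionOffsets struct {
	initial   int64 // configured fallback: offsetNewest or offsetOldest
	committed int64 // last offset marked as processed; negative means none
}

func newPartitionOffsets(initial int64) *partitionOffsets {
	return &partitionOffsets{initial: initial, committed: -1}
}

// MarkOffset records the offset of a message that was completely processed.
func (p *partitionOffsets) MarkOffset(offset int64) {
	p.committed = offset
}

// NextOffset returns the offset to start consuming from: one past the last
// marked offset, or the configured initial value if nothing was marked.
func (p *partitionOffsets) NextOffset() int64 {
	if p.committed >= 0 {
		return p.committed + 1
	}
	return p.initial
}

func main() {
	p := newPartitionOffsets(offsetOldest)
	fmt.Println(p.NextOffset()) // nothing marked yet: the configured initial value
	p.MarkOffset(41)
	fmt.Println(p.NextOffset()) // one past the last marked offset
}
```

Keeping both the fallback and the +1 inside NextOffset means the return value can go straight into ConsumePartition in either case.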

@wvanbergen
Contributor Author

I added a functional test for the offset manager. I discovered a bug with it: once you have managed a partition, and closed the partition offset manager, you cannot open a new partition offset manager for that partition.

As a consumer this has to work: it's possible to start consuming a partition, then have another instance manage that partition, but later get the partition assigned back to you if the other consumer instance shuts down.

My attempt at fixing: 3d1316a
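
The thread does not spell out the cause of the bug, so the following is only an illustration of the required behaviour, not the change in that commit. It assumes a hypothetical parent registry that tracks managed partitions in a map; re-managing after Close only works if closing removes the entry again. All type and function names are invented for the sketch, and locking is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

var errAlreadyManaged = errors.New("partition is already managed")

// registry is a hypothetical parent that tracks which partitions currently
// have an open handle. A real implementation would need a mutex.
type registry struct {
	managed map[int32]*handle
}

// handle is a hypothetical per-partition manager.
type handle struct {
	parent    *registry
	partition int32
}

// Manage opens a handle for the partition, refusing duplicates.
func (r *registry) Manage(partition int32) (*handle, error) {
	if _, ok := r.managed[partition]; ok {
		return nil, errAlreadyManaged
	}
	h := &handle{parent: r, partition: partition}
	r.managed[partition] = h
	return h, nil
}

// Close releases the partition. Without the delete, a later Manage call for
// the same partition would fail forever.
func (h *handle) Close() {
	delete(h.parent.managed, h.partition)
}

func main() {
	r := &registry{managed: make(map[int32]*handle)}

	h, _ := r.Manage(0)
	_, err := r.Manage(0)
	fmt.Println("second open while managed:", err)

	h.Close()
	_, err = r.Manage(0)
	fmt.Println("reopen after close:", err)
}
```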

// will eventually be flushed to the cluster based on configuration. You should only set the offset of
// messages that have been completely processed.
SetOffset(offset int64, metadata string)
// NextOffset returns the next offset should be consumed for the managed partition, accompanied by the
Contributor

"next offset that should be consumed..."

@wvanbergen wvanbergen force-pushed the offset-manager-initial branch from 3d1316a to cedd9c3 Compare August 21, 2015 18:51
@wvanbergen wvanbergen force-pushed the offset-manager-initial branch from cedd9c3 to 0ae0505 Compare August 21, 2015 21:11
wvanbergen added a commit that referenced this pull request Aug 21, 2015
Offset manager: make initial offset configurable
@wvanbergen wvanbergen merged commit 4c0d6fc into offset-manager Aug 21, 2015
@wvanbergen wvanbergen deleted the offset-manager-initial branch August 21, 2015 21:14
@eapache
Contributor

eapache commented Aug 21, 2015

👍
