-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18060: new coordinator does not handle TxnOffsetCommitRequest with empty member id when using CONSUMER group #17914
Conversation
…th empty member id when using CONSUMER group
@brandboat Thanks for the quick patch. Could you please add a description to the pull request? We usually also use it for the commit message. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat Thanks for the patch. I left some comments for consideration.
// TxnOffsetCommitRequest versions v0-v2 do not include a member ID. | ||
// And we can still send UNKNOWN_GENERATION_ID and UNKNOWN_MEMBER_ID through Producer#sendOffsetsToTransaction. | ||
// Therefore, we should not throw exception in these cases. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would simplify it a bit. What about The TxnOffsetCommit API does not require the member id, the generation id and the group instance id fields. Hence the are only validated if any of them is provided.
?
@@ -26,6 +26,7 @@ | |||
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please extend unit tests in ConsumerGroupTest too?
adminClient = cluster.admin() | ||
adminClient.createTopics(singleton(new NewTopic(topic, 1, 1.toShort))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could actually use createTopic
method from the parent class.
// add this line to avoid error INVALID_TXN_STATE | ||
producer.sendOffsetsToTransaction( | ||
Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(5)), | ||
new ConsumerGroupMetadata(groupId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose that you must do this in order to add the __consumer_offsets partition to the transaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I prefer to build the AddOffsetsToTxnRequest
explicitly. for example:
protected def addOffsetsToTxn(groupId: String,
producerId: Int,
producerEpoch: Short,
transactionalId: String,
version: Short): Unit = {
val request = new AddOffsetsToTxnRequest.Builder(new AddOffsetsToTxnRequestData()
.setTransactionalId(transactionalId)
.setProducerId(producerId).setProducerEpoch(producerEpoch).setGroupId(groupId)
).build(version)
val expectedResponse = new AddOffsetsToTxnResponseData()
val response = connectAndReceive[AddOffsetsToTxnResponse](request)
assertEquals(expectedResponse, response.data)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you both! I believe all comments have been addressed. Please take a look when you have a chance.
Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(5)), | ||
new ConsumerGroupMetadata(groupId)) | ||
|
||
// verify that the TXN_OFFSET_COMMIT request is processed correctly when member id is UNKNOWN_MEMBER_ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Verify....
producer.close() | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While we are here, do you mind adding a test which verifies that the fields are correctly validated when provided?
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), | ||
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), | ||
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"), | ||
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can define all the common configs at the class level in ClusterTestDefaults
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat Thanks for the update. I left a few more comments for consideration.
protected def addOffsetsToTxn(groupId: String, | ||
producerId: Int, | ||
producerEpoch: Short, | ||
transactionalId: String, | ||
version: Short): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Let's follow the format of the other methods please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, that comes from my auto format... I forgot to set the scala code style, thanks
version: Short): Unit = { | ||
val request = new AddOffsetsToTxnRequest.Builder(new AddOffsetsToTxnRequestData() | ||
.setTransactionalId(transactionalId) | ||
.setProducerId(producerId).setProducerEpoch(producerEpoch).setGroupId(groupId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Let's put setProducerEpoch
and setGroupId
on new lines to be consistent.
.setProducerId(producerId).setProducerEpoch(producerEpoch).setGroupId(groupId) | ||
).build(version) | ||
|
||
val expectedResponse = new AddOffsetsToTxnResponseData() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We could inline it.
producer = cluster.producer(Collections.singletonMap(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)) | ||
producer.initTransactions() | ||
producer.beginTransaction() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we have addOffsetsToTxn
, I wonder if we should also add one to init the transaction so we could remove the producer. It is a bit weird to use both because you actually don't know whether the producer id/epoch passes to addOffsetsToTxn
are actually correct.
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, | ||
generationId = JoinGroupRequest.UNKNOWN_GENERATION_ID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add a similar test but which sets member id/generation id too?
TestUtils.waitUntilTrue(() => { | ||
consumer.poll(Duration.ofSeconds(5)).asScala | ||
.filter(record => record.key() != null && record.value() != null) | ||
.map(record => new GroupCoordinatorRecordSerde() | ||
.deserialize(ByteBuffer.wrap(record.key().get()), ByteBuffer.wrap(record.value().get()))) | ||
.filter(coordinatorRecord => | ||
coordinatorRecord.key().message().isInstanceOf[OffsetCommitKey] && | ||
coordinatorRecord.value().message().isInstanceOf[OffsetCommitValue]) | ||
.exists(coordinatorRecord => { | ||
val offsetCommitKey = coordinatorRecord.key().message().asInstanceOf[OffsetCommitKey] | ||
val offsetCommitValue = coordinatorRecord.value().message().asInstanceOf[OffsetCommitValue] | ||
offsetCommitKey.group() == groupId && | ||
offsetCommitKey.topic() == topic && | ||
offsetCommitKey.partition == partition && | ||
offsetCommitValue.offset() == 100 + version | ||
}) | ||
}, "Txn offset commit not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit extreme. I would rather prefer to use fetchOffset
to validate the committed offsets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review, I have addressed all comments in the latest commits. Also loop the integration test 247 times and all passed.
I=0; while ./gradlew cleanTest core:test --tests kafka.server.TxnOffsetCommitRequestTest --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat Thanks for the update and your patience. I left a few more minor suggestions.
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
Outdated
Show resolved
Hide resolved
// Verify that the TXN_OFFSET_COMMIT request is processed correctly when the member ID | ||
// and generation ID are known. This validation starts from version 3, as the member ID | ||
// must not be empty from version 3 onwards. | ||
if (version >= 3) { | ||
verifyTxnCommitAndFetch( | ||
groupId = groupId, | ||
memberId = memberId, | ||
generationId = memberEpoch, | ||
offset = 200 + version, | ||
version = version.toShort | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to add a negative cases too? We could add them in this test or in another one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we can put the negative case to TxnOffsetCommitRequestTest.java ? I thought the TxnOffsetCommitRequestTest should take care the scenario that GroupMetadata is defined while version < 3.
Let me know if you have any concern. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By negative cases, I meant a request with invalid memberId and/or generationId to ensure that the request is rejected with the correct error. Is it worth it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, I added those test in the latest commit, thanks for the suggestion !
core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat Thanks for the update and your patience. We're almost there! I replied to one of the open comments and left a small nit.
// Verify that the TXN_OFFSET_COMMIT request is processed correctly when the member ID | ||
// and generation ID are known. This validation starts from version 3, as the member ID | ||
// must not be empty from version 3 onwards. | ||
if (version >= 3) { | ||
verifyTxnCommitAndFetch( | ||
groupId = groupId, | ||
memberId = memberId, | ||
generationId = memberEpoch, | ||
offset = 200 + version, | ||
version = version.toShort | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By negative cases, I meant a request with invalid memberId and/or generationId to ensure that the request is rejected with the correct error. Is it worth it?
|
||
createTopic(topic, 1) | ||
|
||
def verifyTxnCommitAndFetch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This method is pretty long to be inlined now. Would it make sense to extract it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat thanks for this great patch. some trivial comments are left. PTAL
expectedTxnCommitError: Errors | ||
): Unit = { | ||
var producerIdAndEpoch: ProducerIdAndEpoch = null | ||
// Wait until ALLOCATE_PRODUCER_ID request finished |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that a more accurate comment would be "Wait until the coordinator finishes loading."
val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head | ||
val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head | ||
assertEquals(offset, partitionRecord.committedOffset) | ||
true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offset == partitionRecord.committedOffset
expectedError = Errors.NONE | ||
) | ||
|
||
if (expectedTxnCommitError != Errors.NONE) return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please confirm that committed offsets remain unchanged after a failed TXN_OFFSET_COMMIT
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in the latest commit, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat Thanks for the update. Overall, LGTM. I left a few nits.
|
||
for (version <- 0 to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) { | ||
// Verify that the TXN_OFFSET_COMMIT request is processed correctly when member id is UNKNOWN_MEMBER_ID | ||
// and generation id is UNKNOWN_GENERATION_ID under all api versions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: .
at the end of the comment.
expectedTxnCommitError = Errors.NONE | ||
) | ||
|
||
// Verify TXN_OFFSET_COMMIT request failed with incorrect memberId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
expectedTxnCommitError = Errors.UNKNOWN_MEMBER_ID | ||
) | ||
|
||
// Verify TXN_OFFSET_COMMIT request failed with incorrect generationId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks.
@chia7712 Do you want to take another look at it? Otherwise, I can merge it tomorrow. |
…th empty member id when using CONSUMER group (apache#17914) There are two issues in KAFKA-18060: 1) New coordinator can't handle the TxnOffsetCommitRequest with empty member id, and TxnOffsetCommitRequest v0-v2 do definitely has an empty member ID, causing ConsumerGroup#validateOffsetCommit to throw an UnknownMemberIdException. This prevents the old producer from calling sendOffsetsToTransaction. Note that TxnOffsetCommitRequest versions v0-v2 are included in KIP-896, so it seems the new coordinator should support that operations 2) The deprecated API Producer#sendOffsetsToTransaction does not use v0-v2 to send TxnOffsetCommitRequest with an empty member ID. Unfortunately, it has been released for a while. Therefore, the new coordinator needs to handle TxnOffsetCommitRequest with an empty member ID for all versions. Taken from the two issues above, we need to handle empty member id in all API versions when new coordinator are dealing with TxnOffsetCommitRequest. Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai <[email protected]>
There are two issues in KAFKA-18060:
New coordinator can't handle the TxnOffsetCommitRequest with empty member id, and TxnOffsetCommitRequest v0-v2 do definitely has an empty member ID, causing ConsumerGroup#validateOffsetCommit to throw an UnknownMemberIdException. This prevents the old producer from calling sendOffsetsToTransaction. Note that TxnOffsetCommitRequest versions v0-v2 are included in KIP-896, so it seems the new coordinator should support that operations
The deprecated API Producer#sendOffsetsToTransaction does not use v0-v2 to send TxnOffsetCommitRequest with an empty member ID. Unfortunately, it has been released for a while. Therefore, the new coordinator needs to handle TxnOffsetCommitRequest with an empty member ID for all versions.
Taken from the two issues above, we need to handle empty member id in all API versions when new coordinator are dealing with TxnOffsetCommitRequest.
Committer Checklist (excluded from commit message)