Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix flaky testDeleteClosedTopics #427

Merged

Conversation

BewareMyPower
Copy link
Collaborator

@BewareMyPower BewareMyPower commented Apr 7, 2021

Motivation

testDeleteClosedTopics is very easy to fail in CI environment because there's a race condition. #425 closes a group's offset consumers when the channel becomes inactive. However, before Kafka consumer closes, it sends a COMMIT_OFFSET request to commit offsets, which is the behavior of the default enable.auto.commit=true config. Currently, KoP acknowledges the offset's associated message id in the callback of GroupMetadataManager#storeGroup, see GroupCoordinator#handleCommitOffsets:

        result.whenCompleteAsync((ignore, e) ->{
            if (e == null){
                offsetAcker.ackOffsets(groupId, offsetMetadata);
            }
        });

So it's asynchronous with handleCommitOffsets itself. There's a possibility that after the channel was closed and OffsetAcker's consumer was removed and closed, OffsetAcker#ackOffsets would be invoked again, and getConsumer would be invoked so that a new offset consumer would be created to the topic.

Here're some related logs in CI tests as the evidence.

15:49:07.884 [pulsar-client-io-40-1:org.apache.pulsar.client.impl.ConsumerImpl@698] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/test-delete-closed-topics-partition-0][sub-2] Subscribing to topic on cnx [id: 0xb7 fd0eb7, L:/127.0.0.1:50468 - R:localhost/127.0.0.1:15002], consumerId 2
15:49:07.980 [TestNG-method=testDeleteClosedTopics-1:org.apache.kafka.clients.consumer.KafkaConsumer@2152] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-2, groupId=sub-2] Kafka consumer has been closed
15:49:07.986 [pulsar-client-io-40-1:org.apache.pulsar.client.impl.ConsumerImpl@698] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/test-delete-closed-topics-partition-0][sub-2] Subscribing to topic on cnx [id: 0xb7fd0eb7, L:/127.0.0.1:50468 - R:localhost/127.0.0.1:15002], consumerId 3
...
org.apache.pulsar.broker.service.BrokerServiceException$TopicBusyException: Topic has 1 connected producers/consumers

We can see after the Kafka consumer was closed, a new offset consumer was created again.

Modifications

ackOffsets does the acknowledgement asynchronously but may triggers the creation of an offset consumer, so this PR makes OffsetAcker#ackOffsets be called before GroupMetadataManager#storeGroup and it prevents an offset consumer being created after channel is inactive.

Besides, it adds some condition checks to the test. In CI environment, sometimes topics cannot be deleted by 404 (Partitioned topic does not exist).

@BewareMyPower BewareMyPower requested a review from jiazhai as a code owner April 7, 2021 06:06
@BewareMyPower BewareMyPower changed the title Fix flaky testDeleteClosedTopics [WIP] Fix flaky testDeleteClosedTopics Apr 7, 2021
@BewareMyPower BewareMyPower changed the title [WIP] Fix flaky testDeleteClosedTopics Fix flaky testDeleteClosedTopics Apr 7, 2021
@jiazhai jiazhai merged commit 643f573 into streamnative:master Apr 8, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-ack-offset branch May 7, 2021 06:54
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants