Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate messages when resuming partitions not paused #4636

Merged
merged 4 commits into from
Apr 10, 2024

Conversation

emasab
Copy link
Contributor

@emasab emasab commented Mar 6, 2024

Fixed a bug causing duplicate message consumption from a stale fetch start offset in some particular cases.

In case of subscription change with a consumer using the cooperative assignor it could resume fetching from a previous position.
That could also happen if resuming a partition that wasn't paused.

Fixed by ensuring that a resume operation is completely a no-op when the partition isn't paused.

Fixes #4637

@emasab emasab changed the title Fix duplicate messages when resuming Fix duplicate messages when resuming partitions not paused Mar 6, 2024
@emasab emasab force-pushed the dev_fix_duplicate_messages_cooperative_assignor branch from b228618 to 94fd544 Compare March 6, 2024 16:03
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine. Just few minor changes.

/* If partitions isn't paused, avoid bumping its version,
* as it'll result in resuming fetches from a stale
* next_fetch_start */
rd_bool_t paused = rd_false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are 2 variables pause and paused. It feels confusing. Please change the name of this variable to is_already_paused

@@ -2299,7 +2299,22 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp,
int flag,
rd_kafka_replyq_t replyq) {
int32_t version;
rd_kafka_op_t *rko;
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is checked before the operation is sent to the queue. What about the rd_kafka_toppar_pause_resume in which this op is handled. Is this case handled there?

*
* @param partition_assignment_strategy Assignment strategy to test.
*/
static void test_no_duplicate_messages_unnecessary_resume(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name of the test should be better. It can be pause_resume_mock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to define a better numbering for mock tests. Integration tests starts with '0', unit test with '8', so we can use mock tests with '4' maybe?

@@ -511,7 +512,7 @@ struct test tests[] = {
_TEST(0142_reauthentication, 0, TEST_BRKVER(2, 2, 0, 0)),
_TEST(0143_exponential_backoff_mock, TEST_F_LOCAL),
_TEST(0144_idempotence_mock, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)),

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep extra line.

test_msgver_init(&mv, testid);
test_consumer_poll("consume", rk, testid, -1, 0, msgcnt, &mv);

TEST_SAY("Unnecessary resume\n");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this test failing before the change?

@emasab emasab force-pushed the dev_fix_duplicate_messages_cooperative_assignor branch from f11be5a to 58e9166 Compare March 26, 2024 15:29
@emasab emasab force-pushed the dev_fix_duplicate_messages_cooperative_assignor branch from 62583dd to 20bcfd1 Compare April 8, 2024 20:53
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@emasab emasab merged commit 5b798cd into master Apr 10, 2024
2 checks passed
@emasab emasab deleted the dev_fix_duplicate_messages_cooperative_assignor branch April 10, 2024 08:46
anchitj pushed a commit that referenced this pull request Jun 10, 2024
In case of subscription change with a consumer using the cooperative assignor
it could resume fetching from a previous position.
That could also happen if resuming a partition that wasn't paused.
Fixed by ensuring that a resume operation is completely a no-op when
the partition isn't paused
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Duplicate messages could be received when resuming partitions that weren't paused
2 participants