From b8b586441caf52d7a31801b674b6ebfe02a78a7e Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 18 Oct 2022 12:22:09 +0200 Subject: [PATCH] Addressed PR comments --- tests/0105-transactions_mock.c | 70 +++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/tests/0105-transactions_mock.c b/tests/0105-transactions_mock.c index 740d2e24f..78d998d9e 100644 --- a/tests/0105-transactions_mock.c +++ b/tests/0105-transactions_mock.c @@ -2876,7 +2876,6 @@ static void do_test_txn_coordinator_null_not_fatal(void) { rd_kafka_t *rk; rd_kafka_mock_cluster_t *mcluster; rd_kafka_error_t *error; - rd_kafka_topic_t *rkt; rd_kafka_resp_err_t err; int32_t coord_id = 1; const char *topic = "test"; @@ -2884,22 +2883,18 @@ static void do_test_txn_coordinator_null_not_fatal(void) { int msgcnt = 1; int remains = 0; - SUB_TEST_QUICK("Test coordinator NULL shouldn't cause " - "a fatal assertion " - "when transactional producer reconnects"); - - test_curr->is_fatal_cb = error_is_fatal_cb; - test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT; - allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT; - - rk = create_txn_producer(&mcluster, transactional_id, 1, "debug", "all", NULL); - rkt = test_create_topic_object(rk, topic, "message.timeout.ms", "100", NULL); + SUB_TEST_QUICK(); /* Broker down is not a test-failing error */ allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT; test_curr->is_fatal_cb = error_is_fatal_cb; + test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT; - err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3); + /* One second is the minimum transaction timeout */ + rk = create_txn_producer(&mcluster, transactional_id, 1, + "transaction.timeout.ms", "1000", NULL); + + err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1); TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err)); rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id, @@ -2911,42 +2906,55 @@ static void do_test_txn_coordinator_null_not_fatal(void) { TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000)); TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); - test_produce_msgs_nowait(rk, rkt, 0, RD_KAFKA_PARTITION_UA, 0, - msgcnt, NULL, 0, 0, &remains); + /* Makes the produce request timeout. */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, coord_id, RD_KAFKAP_Produce, 1, + RD_KAFKA_RESP_ERR_NO_ERROR, 3000); + test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, + msgcnt, NULL, 0, &remains); + + /* This value is linked to transaction.timeout.ms, needs enough time + * so the message times out and a DrainBump sequence is started. */ + rd_kafka_flush(rk, 1000); + + /* To trigger the error the COORDINATOR_NOT_AVAILABLE response + * must come AFTER idempotent state has changed to WaitTransport + * but BEFORE it changes to WaitPID. To make it more likely + * rd_kafka_txn_coord_timer_start timeout can be changed to 5 ms + * in rd_kafka_txn_coord_query, when unable to query for + * transaction coordinator. + */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10); + /* Coordinator down starts the FindCoordinatorRequest loop. */ TEST_SAY("Bringing down coordinator %" PRId32 "\n", coord_id); rd_kafka_mock_broker_set_down(mcluster, coord_id); - rd_kafka_mock_broker_push_request_error_rtts( - mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1, - RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10); - - /* This value is linked to message.timeout.ms, needs enough time - * so message time out and aren't purged */ - rd_kafka_flush(rk, 500); - /* Needs a specific timeout so FindCoordinatorRequest is the first thing that - * is done when the coordinator comes up */ - rd_usleep(605 * 1000, NULL); + /* Coordinator down for some time. */ + rd_usleep(100 * 1000, NULL); + /* When it comes up, the error is triggered, if the preconditions + * happen. */ TEST_SAY("Bringing up coordinator %" PRId32 "\n", coord_id); rd_kafka_mock_broker_set_up(mcluster, coord_id); + /* Make sure DRs are received */ + rd_kafka_flush(rk, 1000); + error = rd_kafka_commit_transaction(rk, -1); TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains); TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail"); - TEST_SAY("commit_transaction() failed (expectedly): %s\n", rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); /* Needs to wait some time before closing to make sure it doesn't go - * into TERMINATING state before error is triggered */ - rd_usleep(1400 * 1000, NULL); - - rd_kafka_topic_destroy(rkt); - rd_kafka_error_destroy(error); - TEST_SAY("destroying rk"); + * into TERMINATING state before error is triggered. */ + rd_usleep(1000 * 1000, NULL); rd_kafka_destroy(rk); allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;