Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Oct 19, 2022
1 parent 1b8ead0 commit b8b5864
Showing 1 changed file with 39 additions and 31 deletions.
70 changes: 39 additions & 31 deletions tests/0105-transactions_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2876,30 +2876,25 @@ static void do_test_txn_coordinator_null_not_fatal(void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
rd_kafka_topic_t *rkt;
rd_kafka_resp_err_t err;
int32_t coord_id = 1;
const char *topic = "test";
const char *transactional_id = "txnid";
int msgcnt = 1;
int remains = 0;

SUB_TEST_QUICK("Test coordinator NULL shouldn't cause "
"a fatal assertion "
"when transactional producer reconnects");

test_curr->is_fatal_cb = error_is_fatal_cb;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;

rk = create_txn_producer(&mcluster, transactional_id, 1, "debug", "all", NULL);
rkt = test_create_topic_object(rk, topic, "message.timeout.ms", "100", NULL);
SUB_TEST_QUICK();

/* Broker down is not a test-failing error */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
test_curr->is_fatal_cb = error_is_fatal_cb;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
/* One second is the minimum transaction timeout */
rk = create_txn_producer(&mcluster, transactional_id, 1,
"transaction.timeout.ms", "1000", NULL);

err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
Expand All @@ -2911,42 +2906,55 @@ static void do_test_txn_coordinator_null_not_fatal(void) {
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

test_produce_msgs_nowait(rk, rkt, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt, NULL, 0, 0, &remains);
/* Makes the produce request timeout. */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 3000);

test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
msgcnt, NULL, 0, &remains);

/* This value is linked to transaction.timeout.ms, needs enough time
* so the message times out and a DrainBump sequence is started. */
rd_kafka_flush(rk, 1000);

/* To trigger the error the COORDINATOR_NOT_AVAILABLE response
* must come AFTER idempotent state has changed to WaitTransport
* but BEFORE it changes to WaitPID. To make it more likely
* rd_kafka_txn_coord_timer_start timeout can be changed to 5 ms
* in rd_kafka_txn_coord_query, when unable to query for
* transaction coordinator.
*/
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10);

/* Coordinator down starts the FindCoordinatorRequest loop. */
TEST_SAY("Bringing down coordinator %" PRId32 "\n", coord_id);
rd_kafka_mock_broker_set_down(mcluster, coord_id);

rd_kafka_mock_broker_push_request_error_rtts(
mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1,
RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10);

/* This value is linked to message.timeout.ms, needs enough time
* so message time out and aren't purged */
rd_kafka_flush(rk, 500);
/* Needs a specific timeout so FindCoordinatorRequest is the first thing that
* is done when the coordinator comes up */
rd_usleep(605 * 1000, NULL);
/* Coordinator down for some time. */
rd_usleep(100 * 1000, NULL);

/* When it comes up, the error is triggered, if the preconditions
* happen. */
TEST_SAY("Bringing up coordinator %" PRId32 "\n", coord_id);
rd_kafka_mock_broker_set_up(mcluster, coord_id);

/* Make sure DRs are received */
rd_kafka_flush(rk, 1000);

error = rd_kafka_commit_transaction(rk, -1);

TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);

/* Needs to wait some time before closing to make sure it doesn't go
* into TERMINATING state before error is triggered */
rd_usleep(1400 * 1000, NULL);

rd_kafka_topic_destroy(rkt);
rd_kafka_error_destroy(error);
TEST_SAY("destroying rk");
* into TERMINATING state before error is triggered. */
rd_usleep(1000 * 1000, NULL);
rd_kafka_destroy(rk);

allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
Expand Down

0 comments on commit b8b5864

Please sign in to comment.