From 350c8ec6fe846f2417acd1e375d19f9e84139865 Mon Sep 17 00:00:00 2001
From: Andrew Wong
Date: Mon, 6 Mar 2023 17:33:28 -0800
Subject: [PATCH] tests: add test for offset_for_leader_epoch

kafka: return lowest offset if requested epoch is lower than local log

If reading from tiered storage is disabled, we would previously return
an offset of -1 when the queried epoch fell below the start of our local
log. This doesn't match Kafka, which returns the first available offset
in the next higher epoch. Our lookup already matches that behavior when
reading from cloud storage.

This patch updates the Kafka handler to return the start offset of the
local log if the requested term falls below the local log's beginning.
---
 src/v/kafka/server/replicated_partition.cc    |  13 ++-
 .../server/tests/produce_consume_test.cc      | 104 ++++++++++++++++++
 .../offset_for_leader_epoch_archival_test.py  |  16 ++-
 3 files changed, 126 insertions(+), 7 deletions(-)

diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc
index 766a6de49efe7..e3ba953a87a6d 100644
--- a/src/v/kafka/server/replicated_partition.cc
+++ b/src/v/kafka/server/replicated_partition.cc
@@ -282,22 +282,27 @@ raft::replicate_stages replicated_partition::replicate(
 std::optional<model::offset>
 replicated_partition::get_leader_epoch_last_offset(
   kafka::leader_epoch epoch) const {
     const model::term_id term(epoch);
-    const auto first_local_term = _partition->get_term(
-      _partition->start_offset());
-    // found in local state
+    const auto first_local_offset = _partition->start_offset();
+    const auto first_local_term = _partition->get_term(first_local_offset);
+    // Look for the highest offset in the requested term, or the first offset
+    // in the next term. This mirrors behavior in Kafka, see
+    // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
     if (term >= first_local_term) {
         auto last_offset = _partition->get_term_last_offset(term);
         if (last_offset) {
             return _translator->from_log_offset(*last_offset);
         }
     }
+    // The requested term falls below our earliest local segment.
+    // Check cloud storage for a viable offset.
     if (
       _partition->is_remote_fetch_enabled()
       && _partition->cloud_data_available()) {
         return _partition->get_cloud_term_last_offset(term);
     }
-    return std::nullopt;
+    // Return the offset of this next-highest term.
+    return _translator->from_log_offset(first_local_offset);
 }
 
 ss::future<error_code> replicated_partition::validate_fetch_offset(
diff --git a/src/v/kafka/server/tests/produce_consume_test.cc b/src/v/kafka/server/tests/produce_consume_test.cc
index f3c043f2288ed..2a2c481acea5d 100644
--- a/src/v/kafka/server/tests/produce_consume_test.cc
+++ b/src/v/kafka/server/tests/produce_consume_test.cc
@@ -10,6 +10,7 @@
 #include "kafka/client/transport.h"
 #include "kafka/protocol/errors.h"
 #include "kafka/protocol/fetch.h"
+#include "kafka/protocol/offset_for_leader_epoch.h"
 #include "kafka/protocol/produce.h"
 #include "kafka/protocol/request_reader.h"
 #include "kafka/server/handlers/produce.h"
@@ -529,3 +530,106 @@ FIXTURE_TEST(test_quota_balancer_config_balancer_period, prod_consume_fixture) {
       abs(br - br_last - 7) <= 1,
       "Expected 7±1 balancer runs, got " << br - br_last);
 }
+
+// TODO: move producer utilities somewhere else and give this test a proper
+// home.
+FIXTURE_TEST(test_offset_for_leader_epoch_test, prod_consume_fixture) {
+    producer = std::make_unique<kafka::client::transport>(
+      make_kafka_client().get0());
+    producer->connect().get0();
+    model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
+    add_topic(tp_ns).get0();
+    model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
+    tests::cooperative_spin_wait_with_timeout(10s, [ntp, this] {
+        auto shard = app.shard_table.local().shard_for(ntp);
+        if (!shard) {
+            return ss::make_ready_future<bool>(false);
+        }
+        return app.partition_manager.invoke_on(
+          *shard, [ntp](cluster::partition_manager& pm) {
+              return pm.get(ntp)->is_leader();
+          });
+    }).get0();
+    auto shard = app.shard_table.local().shard_for(ntp);
+    for (int i = 0; i < 3; i++) {
+        // Step down.
+        app.partition_manager
+          .invoke_on(
+            *shard,
+            [ntp](cluster::partition_manager& mgr) {
+                mgr.get(ntp)->raft()->step_down("force_step_down").get();
+            })
+          .get();
+        auto tout = ss::lowres_clock::now() + std::chrono::seconds(10);
+        app.controller->get_partition_leaders()
+          .local()
+          .wait_for_leader(ntp, tout, {})
+          .get();
+        // Become leader and write.
+        app.partition_manager
+          .invoke_on(
+            *shard,
+            [this, ntp](cluster::partition_manager& mgr) {
+                auto partition = mgr.get(ntp);
+                produce([this](size_t cnt) {
+                    return small_batches(cnt);
+                }).get0();
+            })
+          .get();
+    }
+    // Prefix truncate the log so the beginning of the log moves forward.
+    app.partition_manager
+      .invoke_on(
+        *shard,
+        [ntp](cluster::partition_manager& mgr) {
+            auto partition = mgr.get(ntp);
+            storage::truncate_prefix_config cfg(
+              model::offset(1), ss::default_priority_class());
+            partition->log().truncate_prefix(cfg).get();
+        })
+      .get();
+
+    // Make a request getting the offset from a term below the start of the
+    // log.
+    auto client = make_kafka_client().get0();
+    client.connect().get();
+    auto current_term = app.partition_manager
+                          .invoke_on(
+                            *shard,
+                            [ntp](cluster::partition_manager& mgr) {
+                                return mgr.get(ntp)->raft()->term();
+                            })
+                          .get();
+    kafka::offset_for_leader_epoch_request req;
+    kafka::offset_for_leader_topic t{
+      test_topic,
+      {{model::partition_id(0),
+        kafka::leader_epoch(current_term()),
+        kafka::leader_epoch(0)}},
+      {},
+    };
+    req.data.topics.emplace_back(std::move(t));
+    auto resp = client.dispatch(req, kafka::api_version(2)).get0();
+    client.stop().then([&client] { client.shutdown(); }).get();
+    BOOST_REQUIRE_EQUAL(1, resp.data.topics.size());
+    const auto& topic_resp = resp.data.topics[0];
+    BOOST_REQUIRE_EQUAL(1, topic_resp.partitions.size());
+    const auto& partition_resp = topic_resp.partitions[0];
+
+    BOOST_REQUIRE_NE(partition_resp.end_offset, model::offset(-1));
+
+    // Check that the returned offset is the start of the log, since the
+    // requested term has been truncated.
+    auto earliest_kafka_offset
+      = app.partition_manager
+          .invoke_on(
+            *shard,
+            [ntp](cluster::partition_manager& mgr) {
+                auto partition = mgr.get(ntp);
+                auto start_offset = partition->log().offsets().start_offset;
+                return partition->get_offset_translator_state()
+                  ->from_log_offset(start_offset);
+            })
+          .get();
+    BOOST_REQUIRE_EQUAL(earliest_kafka_offset, partition_resp.end_offset);
+}
diff --git a/tests/rptest/tests/offset_for_leader_epoch_archival_test.py b/tests/rptest/tests/offset_for_leader_epoch_archival_test.py
index 4e69fdc21437a..0989e8414f1d4 100644
--- a/tests/rptest/tests/offset_for_leader_epoch_archival_test.py
+++ b/tests/rptest/tests/offset_for_leader_epoch_archival_test.py
@@ -9,6 +9,7 @@
 from math import fabs
 
 from rptest.services.cluster import cluster
+from ducktape.mark import matrix
 from ducktape.utils.util import wait_until
 
 from rptest.clients.kcl import KCL
@@ -66,13 +67,15 @@ def alter_and_verify():
         wait_until(alter_and_verify, 15, 0.5)
 
     @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
-    def test_querying_remote_partitions(self):
+    @matrix(remote_reads=[False, True])
+    def test_querying_remote_partitions(self, remote_reads):
         topic = TopicSpec(redpanda_remote_read=True,
                           redpanda_remote_write=True)
         epoch_offsets = {}
         rpk = RpkTool(self.redpanda)
         self.client().create_topic(topic)
-        rpk.alter_topic_config(topic.name, "redpanda.remote.read", 'true')
+        rpk.alter_topic_config(topic.name, "redpanda.remote.read",
+                               str(remote_reads))
         rpk.alter_topic_config(topic.name, "redpanda.remote.write", 'true')
 
         def wait_for_topic():
@@ -111,4 +114,11 @@ def wait_for_topic():
             self.logger.info(
                 f"epoch {epoch} end_offset: {epoch_end_offset}, expected offset: {offset}"
             )
-            assert epoch_end_offset == offset
+            if remote_reads:
+                assert epoch_end_offset == offset, f"{epoch_end_offset} vs {offset}"
+            else:
+                # Check that the returned offset isn't an invalid (-1) value,
+                # even if we read from an epoch that has been truncated locally
+                # and we can't read from cloud storage.
+                assert epoch_end_offset != -1, f"{epoch_end_offset} vs -1"
+                assert epoch_end_offset >= offset, f"{epoch_end_offset} vs {offset}"
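
Illustrative sketch (not part of the patch): the lookup semantics described in
the commit message, reduced to a toy Python model. The names
end_offset_for_epoch, epoch_starts, and log_end_offset are invented for
illustration and are not Redpanda or Kafka APIs; this only mirrors the behavior
the referenced LeaderEpochFileCache code implements, under the assumption that
epoch start offsets are known for the local log.

    # epoch_starts: ascending list of (epoch, start_offset) pairs known locally.
    def end_offset_for_epoch(epoch_starts, requested_epoch, log_end_offset):
        if not epoch_starts:
            return -1
        latest_epoch = epoch_starts[-1][0]
        if requested_epoch > latest_epoch:
            return -1  # newer than anything we know about
        if requested_epoch == latest_epoch:
            return log_end_offset  # the latest epoch extends to the log end
        # Otherwise return the start offset of the next-higher known epoch.
        # When the requested epoch falls below the earliest entry, this is the
        # local start offset, which the patch now returns instead of -1.
        for epoch, start_offset in epoch_starts:
            if epoch > requested_epoch:
                return start_offset

    # Local log covers epochs 3..5 after prefix truncation; querying epoch 1
    # yields the local start offset (10) rather than -1.
    assert end_offset_for_epoch([(3, 10), (4, 25), (5, 40)], 1, 57) == 10
    assert end_offset_for_epoch([(3, 10), (4, 25), (5, 40)], 4, 57) == 40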