diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h
index b0bc5fbe7b662..42cc6da1a48df 100644
--- a/src/v/cluster/partition.h
+++ b/src/v/cluster/partition.h
@@ -126,6 +126,8 @@ class partition {
 
     const model::ntp& ntp() const { return _raft->ntp(); }
 
+    storage::log log() const { return _raft->log(); }
+
     ss::future<std::optional<storage::timequery_result>>
       timequery(storage::timequery_config);
 
diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc
index 9d07e7dc5c063..3a5b7fcaf267c 100644
--- a/src/v/kafka/server/replicated_partition.cc
+++ b/src/v/kafka/server/replicated_partition.cc
@@ -282,22 +282,27 @@ raft::replicate_stages replicated_partition::replicate(
 std::optional<model::offset>
 replicated_partition::get_leader_epoch_last_offset(
   kafka::leader_epoch epoch) const {
     const model::term_id term(epoch);
-    const auto first_local_term = _partition->get_term(
-      _partition->start_offset());
-    // found in local state
+    const auto first_local_offset = _partition->start_offset();
+    const auto first_local_term = _partition->get_term(first_local_offset);
+    // Look for the highest offset in the requested term, or the first offset
+    // in the next term. This mirrors behavior in Kafka, see
+    // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
     if (term >= first_local_term) {
         auto last_offset = _partition->get_term_last_offset(term);
         if (last_offset) {
             return _translator->from_log_offset(*last_offset);
         }
     }
+    // The requested term falls below our earliest local segment.
+    // Check cloud storage for a viable offset.
     if (
       _partition->is_remote_fetch_enabled()
       && _partition->cloud_data_available()) {
         return _partition->get_cloud_term_last_offset(term);
     }
-    return std::nullopt;
+    // Return the offset of this next-highest term.
+    return _translator->from_log_offset(first_local_offset);
 }
 
 ss::future<error_code> replicated_partition::validate_fetch_offset(
diff --git a/src/v/kafka/server/tests/produce_consume_test.cc b/src/v/kafka/server/tests/produce_consume_test.cc
index b1171526eddd1..4dcaf3fdae04a 100644
--- a/src/v/kafka/server/tests/produce_consume_test.cc
+++ b/src/v/kafka/server/tests/produce_consume_test.cc
@@ -10,6 +10,7 @@
 #include "kafka/client/transport.h"
 #include "kafka/protocol/errors.h"
 #include "kafka/protocol/fetch.h"
+#include "kafka/protocol/offset_for_leader_epoch.h"
 #include "kafka/protocol/produce.h"
 #include "kafka/protocol/request_reader.h"
 #include "kafka/server/handlers/produce.h"
@@ -178,3 +179,104 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
         .get(),
       kafka::client::kafka_request_disconnected_exception);
 }
+
+// TODO: move producer utilities somewhere else and give this test a proper
+// home.
+FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
+    producer = std::make_unique<kafka::client::transport>(
+      make_kafka_client().get0());
+    producer->connect().get0();
+    model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
+    add_topic(tp_ns).get0();
+    model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
+    tests::cooperative_spin_wait_with_timeout(10s, [ntp, this] {
+        auto shard = app.shard_table.local().shard_for(ntp);
+        if (!shard) {
+            return ss::make_ready_future<bool>(false);
+        }
+        return app.partition_manager.invoke_on(
+          *shard, [ntp](cluster::partition_manager& pm) {
+              return pm.get(ntp)->is_leader();
+          });
+    }).get0();
+    auto shard = app.shard_table.local().shard_for(ntp);
+    for (int i = 0; i < 3; i++) {
+        // Refresh leadership.
+        app.partition_manager
+          .invoke_on(
+            *shard,
+            [ntp](cluster::partition_manager& mgr) {
+                auto raft = mgr.get(ntp)->raft();
+                raft->step_down("force_step_down").get();
+                tests::cooperative_spin_wait_with_timeout(10s, [raft] {
+                    return raft->is_leader();
+                }).get0();
+            })
+          .get();
+        app.partition_manager
+          .invoke_on(
+            *shard,
+            [this, ntp](cluster::partition_manager& mgr) {
+                auto partition = mgr.get(ntp);
+                produce([this](size_t cnt) {
+                    return small_batches(cnt);
+                }).get0();
+            })
+          .get();
+    }
+    // Prefix truncate the log so the beginning of the log moves forward.
+    app.partition_manager
+      .invoke_on(
+        *shard,
+        [ntp](cluster::partition_manager& mgr) {
+            auto partition = mgr.get(ntp);
+            storage::truncate_prefix_config cfg(
+              model::offset(1), ss::default_priority_class());
+            partition->log().truncate_prefix(cfg).get();
+        })
+      .get();
+
+    // Make a request getting the offset from a term below the start of the
+    // log.
+    auto client = make_kafka_client().get0();
+    client.connect().get();
+    auto current_term = app.partition_manager
+                          .invoke_on(
+                            *shard,
+                            [ntp](cluster::partition_manager& mgr) {
+                                return mgr.get(ntp)->raft()->term();
+                            })
+                          .get();
+    kafka::offset_for_leader_epoch_request req;
+    kafka::offset_for_leader_topic t{
+      test_topic,
+      {{model::partition_id(0),
+        kafka::leader_epoch(current_term()),
+        kafka::leader_epoch(0)}},
+      {},
+    };
+    req.data.topics.emplace_back(std::move(t));
+    auto resp = client.dispatch(req, kafka::api_version(2)).get0();
+    client.stop().then([&client] { client.shutdown(); }).get();
+    BOOST_REQUIRE_EQUAL(1, resp.data.topics.size());
+    const auto& topic_resp = resp.data.topics[0];
+    BOOST_REQUIRE_EQUAL(1, topic_resp.partitions.size());
+    const auto& partition_resp = topic_resp.partitions[0];
+
+    BOOST_REQUIRE_NE(partition_resp.end_offset, model::offset(-1));
+
+    // Check that the returned offset is the start of the log, since the
+    // requested term has been truncated.
+    auto earliest_kafka_offset
+      = app.partition_manager
+          .invoke_on(
+            *shard,
+            [ntp](cluster::partition_manager& mgr) {
+                auto partition = mgr.get(ntp);
+                auto start_offset = partition->log().offsets().start_offset;
+                return partition->get_offset_translator_state()
+                  ->from_log_offset(start_offset);
+            })
+          .get();
+    BOOST_REQUIRE_EQUAL(earliest_kafka_offset, partition_resp.end_offset);
+}
diff --git a/tests/rptest/tests/offset_for_leader_epoch_archival_test.py b/tests/rptest/tests/offset_for_leader_epoch_archival_test.py
index 55ee4fc4a7afb..dfec8e7df4e74 100644
--- a/tests/rptest/tests/offset_for_leader_epoch_archival_test.py
+++ b/tests/rptest/tests/offset_for_leader_epoch_archival_test.py
@@ -9,6 +9,7 @@
 from math import fabs
 
 from rptest.services.cluster import cluster
+from ducktape.mark import matrix
 from ducktape.utils.util import wait_until
 from rptest.clients.kcl import KCL
 
@@ -65,13 +66,15 @@ def alter_and_verify():
         wait_until(alter_and_verify, 15, 0.5)
 
     @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
-    def test_querying_remote_partitions(self):
+    @matrix(remote_reads=[False, True])
+    def test_querying_remote_partitions(self, remote_reads):
         topic = TopicSpec(redpanda_remote_read=True,
                           redpanda_remote_write=True)
         epoch_offsets = {}
         rpk = RpkTool(self.redpanda)
         self.client().create_topic(topic)
-        rpk.alter_topic_config(topic.name, "redpanda.remote.read", 'true')
+        rpk.alter_topic_config(topic.name, "redpanda.remote.read",
+                               str(remote_reads))
         rpk.alter_topic_config(topic.name, "redpanda.remote.write", 'true')
 
         def wait_for_topic():
@@ -111,4 +114,11 @@ def wait_for_topic():
             self.logger.info(
                 f"epoch {epoch} end_offset: {epoch_end_offset}, expected offset: {offset}"
             )
-            assert epoch_end_offset == offset
+            if remote_reads:
+                assert epoch_end_offset == offset, f"{epoch_end_offset} vs {offset}"
+            else:
+                # Check that the returned offset isn't an invalid (-1) value,
+                # even if we read from an epoch that has been truncated locally
+                # and we can't read from cloud storage.
+                assert epoch_end_offset != -1, f"{epoch_end_offset} vs -1"
+                assert epoch_end_offset >= offset, f"{epoch_end_offset} vs {offset}"
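
For reviewers skimming the patch, the fallback order that `get_leader_epoch_last_offset` follows after this change can be condensed into a small, self-contained sketch. The `partition_state` struct and `leader_epoch_last_offset` function below are hypothetical stand-ins for illustration only (with the local lookup simplified to an exact term match), not Redpanda types; the point is the order of the three cases: the term's last offset when it is still held locally, then cloud storage when remote reads are enabled, and finally the first locally retained offset (the start of the next-highest term) instead of a "not found" result that reaches Kafka clients as -1.

```cpp
// Minimal sketch of the fallback order described above (assumes C++17).
// partition_state and leader_epoch_last_offset are illustrative stand-ins,
// not Redpanda's actual classes.
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>

using offset_t = int64_t;
using term_t = int64_t;

struct partition_state {
    // Last offset written in each term that is still retained locally.
    std::map<term_t, offset_t> local_term_last_offsets;
    // First offset still retained locally; moves forward on prefix truncation.
    offset_t first_local_offset{0};
    bool cloud_reads_enabled{false};

    std::optional<offset_t> cloud_last_offset_for(term_t) const {
        return std::nullopt; // pretend nothing is available in cloud storage
    }
};

std::optional<offset_t>
leader_epoch_last_offset(const partition_state& p, term_t requested) {
    // 1. The requested term is at or above the first locally retained term:
    //    answer from local state if possible.
    if (!p.local_term_last_offsets.empty()
        && requested >= p.local_term_last_offsets.begin()->first) {
        auto it = p.local_term_last_offsets.find(requested);
        if (it != p.local_term_last_offsets.end()) {
            return it->second;
        }
    }
    // 2. Otherwise consult cloud storage, if remote reads are enabled.
    if (p.cloud_reads_enabled) {
        if (auto o = p.cloud_last_offset_for(requested)) {
            return o;
        }
    }
    // 3. Fall back to the start of the local log (the first offset of the
    //    next-highest term) rather than reporting "no offset".
    return p.first_local_offset;
}

int main() {
    partition_state p{{{3, 10}, {4, 25}}, /*first_local_offset=*/5};
    std::cout << *leader_epoch_last_offset(p, 3) << '\n'; // 10: term 3 is local
    std::cout << *leader_epoch_last_offset(p, 1) << '\n'; // 5: term 1 truncated
    return 0;
}
```

This is also what the weakened ducktape assertions above check: without remote reads, a query for a prefix-truncated epoch must still return a valid end offset (not -1) that is at least the recorded offset, rather than an exact match.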