Skip to content

Commit

Permalink
tests: test for offset_for_leader_epoch_test
Browse files Browse the repository at this point in the history
kafka: return lowest offset if requested epoch is lower than local log

If reading from tiered storage is disabled, we would previously return
an offset of -1 if the epoch queried fell below the start of our local log.

This doesn't match what happens in Kafka, which returns the next
available offset in a higher epoch. This behavior is matched by our
lookup when reading from cloud storage.

This patch updates the Kafka handler to return the start offset of our
local log if the requested term falls below the local log's beginning.
  • Loading branch information
andrwng committed Mar 7, 2023
1 parent 7f1bcfd commit 350c8ec
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 7 deletions.
13 changes: 9 additions & 4 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,22 +282,27 @@ raft::replicate_stages replicated_partition::replicate(
/// Resolves the offset to report for a leader-epoch lookup.
///
/// The requested epoch is treated as a raft term. Lookup order:
///   1. If the term is at or above the term of the local log's start offset
///      and the partition reports a last offset for it, return that offset
///      translated through the offset translator.
///   2. Otherwise, if remote fetch is enabled and cloud data is available,
///      return whatever the cloud storage lookup yields for the term.
///   3. Otherwise, return the translated start offset of the local log, i.e.
///      the first available offset in a higher term, rather than no value.
///
/// @param epoch leader epoch being queried.
/// @return the offset selected by the lookup order above.
std::optional<model::offset> replicated_partition::get_leader_epoch_last_offset(
  kafka::leader_epoch epoch) const {
    const model::term_id term(epoch);
    const auto first_local_offset = _partition->start_offset();
    const auto first_local_term = _partition->get_term(first_local_offset);
    // Look for the highest offset in the requested term, or the first offset
    // in the next term. This mirrors behavior in Kafka, see
    // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
    if (term >= first_local_term) {
        auto last_offset = _partition->get_term_last_offset(term);
        if (last_offset) {
            return _translator->from_log_offset(*last_offset);
        }
    }
    // Either the requested term falls below our earliest local segment, or
    // no local offset was found for it.

    // Check cloud storage for a viable offset.
    if (
      _partition->is_remote_fetch_enabled()
      && _partition->cloud_data_available()) {
        return _partition->get_cloud_term_last_offset(term);
    }
    // Return the offset of this next-highest term: the start of the local
    // log, instead of reporting that no offset exists.
    return _translator->from_log_offset(first_local_offset);
}

ss::future<error_code> replicated_partition::validate_fetch_offset(
Expand Down
104 changes: 104 additions & 0 deletions src/v/kafka/server/tests/produce_consume_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "kafka/client/transport.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
#include "kafka/protocol/offset_for_leader_epoch.h"
#include "kafka/protocol/produce.h"
#include "kafka/protocol/request_reader.h"
#include "kafka/server/handlers/produce.h"
Expand Down Expand Up @@ -529,3 +530,106 @@ FIXTURE_TEST(test_quota_balancer_config_balancer_period, prod_consume_fixture) {
abs(br - br_last - 7) <= 1,
"Expected 7±1 balancer runs, got " << br - br_last);
}

// TODO: move producer utilities somewhere else and give this test a proper
// home.
// Verifies that an OffsetForLeaderEpoch request for an epoch that lies below
// the start of the local log returns the start of the log rather than -1.
//
// Outline:
//   1. Create the topic and wait until this node leads partition 0.
//   2. Force three step-downs, producing after each leader election.
//   3. Prefix-truncate the log so its beginning moves forward.
//   4. Query epoch 0 and expect the translated start offset of the log.
FIXTURE_TEST(test_offset_for_leader_epoch_test, prod_consume_fixture) {
    producer = std::make_unique<kafka::client::transport>(
      make_kafka_client().get0());
    producer->connect().get0();
    model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
    add_topic(tp_ns).get0();
    model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
    tests::cooperative_spin_wait_with_timeout(10s, [ntp, this] {
        auto shard = app.shard_table.local().shard_for(ntp);
        if (!shard) {
            return ss::make_ready_future<bool>(false);
        }
        return app.partition_manager.invoke_on(
          *shard, [ntp](cluster::partition_manager& pm) {
              // Keep polling instead of dereferencing a null handle if the
              // partition manager does not have the partition yet.
              auto partition = pm.get(ntp);
              return partition && partition->is_leader();
          });
    }).get0();
    auto shard = app.shard_table.local().shard_for(ntp);
    // Every invoke_on() below dereferences the shard, so fail loudly here if
    // it is missing.
    BOOST_REQUIRE(shard.has_value());
    for (int i = 0; i < 3; i++) {
        // Step down.
        app.partition_manager
          .invoke_on(
            *shard,
            [ntp](cluster::partition_manager& mgr) {
                mgr.get(ntp)->raft()->step_down("force_step_down").get();
            })
          .get();
        auto tout = ss::lowres_clock::now() + 10s;
        app.controller->get_partition_leaders()
          .local()
          .wait_for_leader(ntp, tout, {})
          .get();
        // Become leader and write.
        app.partition_manager
          .invoke_on(
            *shard,
            [this](cluster::partition_manager&) {
                produce([this](size_t cnt) {
                    return small_batches(cnt);
                }).get0();
            })
          .get();
    }
    // Prefix truncate the log so the beginning of the log moves forward.
    app.partition_manager
      .invoke_on(
        *shard,
        [ntp](cluster::partition_manager& mgr) {
            auto partition = mgr.get(ntp);
            storage::truncate_prefix_config cfg(
              model::offset(1), ss::default_priority_class());
            partition->log().truncate_prefix(cfg).get();
        })
      .get();

    // Make a request getting the offset from a term below the start of the
    // log.
    auto client = make_kafka_client().get0();
    client.connect().get();
    auto current_term = app.partition_manager
                          .invoke_on(
                            *shard,
                            [ntp](cluster::partition_manager& mgr) {
                                return mgr.get(ntp)->raft()->term();
                            })
                          .get();
    kafka::offset_for_leader_epoch_request req;
    // Partition entry: partition 0, the current term as the current leader
    // epoch, and epoch 0 as the epoch being queried.
    kafka::offset_for_leader_topic t{
      test_topic,
      {{model::partition_id(0),
        kafka::leader_epoch(current_term()),
        kafka::leader_epoch(0)}},
      {},
    };
    req.data.topics.emplace_back(std::move(t));
    auto resp = client.dispatch(req, kafka::api_version(2)).get0();
    client.stop().then([&client] { client.shutdown(); }).get();
    BOOST_REQUIRE_EQUAL(1, resp.data.topics.size());
    const auto& topic_resp = resp.data.topics[0];
    BOOST_REQUIRE_EQUAL(1, topic_resp.partitions.size());
    const auto& partition_resp = topic_resp.partitions[0];

    BOOST_REQUIRE_NE(partition_resp.end_offset, model::offset(-1));

    // Check that the returned offset is the start of the log, since the
    // requested term has been truncated.
    auto earliest_kafka_offset
      = app.partition_manager
          .invoke_on(
            *shard,
            [ntp](cluster::partition_manager& mgr) {
                auto partition = mgr.get(ntp);
                auto start_offset = partition->log().offsets().start_offset;
                return partition->get_offset_translator_state()
                  ->from_log_offset(start_offset);
            })
          .get();
    BOOST_REQUIRE_EQUAL(earliest_kafka_offset, partition_resp.end_offset);
}
16 changes: 13 additions & 3 deletions tests/rptest/tests/offset_for_leader_epoch_archival_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from math import fabs
from rptest.services.cluster import cluster
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until

from rptest.clients.kcl import KCL
Expand Down Expand Up @@ -66,13 +67,15 @@ def alter_and_verify():
wait_until(alter_and_verify, 15, 0.5)

@cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_querying_remote_partitions(self):
@parametrize(remote_reads=[False, True])
def test_querying_remote_partitions(self, remote_reads):
topic = TopicSpec(redpanda_remote_read=True,
redpanda_remote_write=True)
epoch_offsets = {}
rpk = RpkTool(self.redpanda)
self.client().create_topic(topic)
rpk.alter_topic_config(topic.name, "redpanda.remote.read", 'true')
rpk.alter_topic_config(topic.name, "redpanda.remote.read",
str(remote_reads))
rpk.alter_topic_config(topic.name, "redpanda.remote.write", 'true')

def wait_for_topic():
Expand Down Expand Up @@ -111,4 +114,11 @@ def wait_for_topic():
self.logger.info(
f"epoch {epoch} end_offset: {epoch_end_offset}, expected offset: {offset}"
)
assert epoch_end_offset == offset
if remote_reads:
assert epoch_end_offset == offset, f"{epoch_end_offset} vs {offset}"
else:
# Check that the returned offset isn't an invalid (-1) value,
# even if we read from an epoch that has been truncated locally
# and we can't read from cloud storage.
assert epoch_end_offset != -1, f"{epoch_end_offset} vs -1"
assert epoch_end_offset >= offset, f"{epoch_end_offset} vs {offset}"

0 comments on commit 350c8ec

Please sign in to comment.