Merge pull request #9297 from andrwng/v22.3.x-offset-for-leader
[v22.3.x] kafka: return lowest offset if requested epoch is lower than local log
andrwng authored Mar 8, 2023
2 parents abd8990 + 41022b3 commit afb66bd
Showing 4 changed files with 126 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/v/cluster/partition.h
@@ -126,6 +126,8 @@ class partition {

    const model::ntp& ntp() const { return _raft->ntp(); }

+   storage::log log() const { return _raft->log(); }
+
    ss::future<std::optional<storage::timequery_result>>
      timequery(storage::timequery_config);
13 changes: 9 additions & 4 deletions src/v/kafka/server/replicated_partition.cc
@@ -282,22 +282,27 @@ raft::replicate_stages replicated_partition::replicate(
std::optional<model::offset> replicated_partition::get_leader_epoch_last_offset(
  kafka::leader_epoch epoch) const {
    const model::term_id term(epoch);
-   const auto first_local_term = _partition->get_term(
-     _partition->start_offset());
-   // found in local state
+   const auto first_local_offset = _partition->start_offset();
+   const auto first_local_term = _partition->get_term(first_local_offset);
+   // Look for the highest offset in the requested term, or the first offset
+   // in the next term. This mirrors behavior in Kafka, see
+   // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
    if (term >= first_local_term) {
        auto last_offset = _partition->get_term_last_offset(term);
        if (last_offset) {
            return _translator->from_log_offset(*last_offset);
        }
    }
+   // The requested term falls below our earliest local segment.

+   // Check cloud storage for a viable offset.
    if (
      _partition->is_remote_fetch_enabled()
      && _partition->cloud_data_available()) {
        return _partition->get_cloud_term_last_offset(term);
    }
-   return std::nullopt;
+   // Return the offset of this next-highest term.
+   return _translator->from_log_offset(first_local_offset);
}

ss::future<error_code> replicated_partition::validate_fetch_offset(
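The lookup order above is the heart of the change. Here is a minimal, self-contained sketch of that control flow — an illustration, not code from this commit: `term_table`, `local_last_offset`, and `leader_epoch_last_offset` are hypothetical stand-ins for Redpanda's partition and offset-translator API, offset translation is omitted, and the cloud lookup is stubbed out.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>

using term_t = int64_t;
using offset_t = int64_t;

struct term_table {
    // Last offset of each term still present in the local log.
    std::map<term_t, offset_t> local_last_offset;
    term_t first_local_term{0};
    offset_t first_local_offset{0};
    bool cloud_reads_enabled{false};

    std::optional<offset_t> leader_epoch_last_offset(term_t term) const {
        // Case 1: the requested term is covered by the local log.
        if (term >= first_local_term) {
            if (auto it = local_last_offset.find(term);
                it != local_last_offset.end()) {
                return it->second;
            }
        }
        // Case 2: the term precedes the local log; the real code consults
        // cloud storage here when remote reads are enabled.
        if (cloud_reads_enabled) {
            return std::nullopt; // stand-in for the remote lookup
        }
        // Case 3 (the behavior this commit adds): report the first local
        // offset as a lower bound instead of "no offset" (-1 on the wire).
        return first_local_offset;
    }
};

int main() {
    // Terms 0 and 1 were truncated away; terms 2 and 3 remain, and the
    // local log now starts at offset 5.
    term_table t{{{2, 10}, {3, 42}}, 2, 5, false};
    std::cout << *t.leader_epoch_last_offset(3) << '\n'; // 42
    std::cout << *t.leader_epoch_last_offset(0) << '\n'; // 5 (lower bound)
}
```

Before this change the fallback returned `std::nullopt`, which reaches the wire as -1; the new behavior gives clients a usable lower bound instead.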
102 changes: 102 additions & 0 deletions src/v/kafka/server/tests/produce_consume_test.cc
@@ -10,6 +10,7 @@
#include "kafka/client/transport.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
#include "kafka/protocol/offset_for_leader_epoch.h"
#include "kafka/protocol/produce.h"
#include "kafka/protocol/request_reader.h"
#include "kafka/server/handlers/produce.h"
@@ -178,3 +179,104 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
.get(),
kafka::client::kafka_request_disconnected_exception);
}

// TODO: move producer utilities somewhere else and give this test a proper
// home.
FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
    producer = std::make_unique<kafka::client::transport>(
      make_kafka_client().get0());
    producer->connect().get0();
    model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
    add_topic(tp_ns).get0();
    model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
    tests::cooperative_spin_wait_with_timeout(10s, [ntp, this] {
        auto shard = app.shard_table.local().shard_for(ntp);
        if (!shard) {
            return ss::make_ready_future<bool>(false);
        }
        return app.partition_manager.invoke_on(
          *shard, [ntp](cluster::partition_manager& pm) {
              return pm.get(ntp)->is_leader();
          });
    }).get0();
    auto shard = app.shard_table.local().shard_for(ntp);
    for (int i = 0; i < 3; i++) {
        // Refresh leadership.
        app.partition_manager
          .invoke_on(
            *shard,
            [ntp](cluster::partition_manager& mgr) {
                auto raft = mgr.get(ntp)->raft();
                raft->step_down("force_step_down").get();
                tests::cooperative_spin_wait_with_timeout(10s, [raft] {
                    return raft->is_leader();
                }).get0();
            })
          .get();
        app.partition_manager
          .invoke_on(
            *shard,
            [this, ntp](cluster::partition_manager& mgr) {
                auto partition = mgr.get(ntp);
                produce([this](size_t cnt) {
                    return small_batches(cnt);
                }).get0();
            })
          .get();
    }
    // Prefix truncate the log so the beginning of the log moves forward.
    app.partition_manager
      .invoke_on(
        *shard,
        [ntp](cluster::partition_manager& mgr) {
            auto partition = mgr.get(ntp);
            storage::truncate_prefix_config cfg(
              model::offset(1), ss::default_priority_class());
            partition->log().truncate_prefix(cfg).get();
        })
      .get();

    // Make a request for the end offset of an epoch below the start of the
    // log.
    auto client = make_kafka_client().get0();
    client.connect().get();
    auto current_term = app.partition_manager
                          .invoke_on(
                            *shard,
                            [ntp](cluster::partition_manager& mgr) {
                                return mgr.get(ntp)->raft()->term();
                            })
                          .get();
    kafka::offset_for_leader_epoch_request req;
    kafka::offset_for_leader_topic t{
      test_topic,
      {{model::partition_id(0),
        kafka::leader_epoch(current_term()),
        kafka::leader_epoch(0)}},
      {},
    };
    req.data.topics.emplace_back(std::move(t));
    auto resp = client.dispatch(req, kafka::api_version(2)).get0();
    client.stop().then([&client] { client.shutdown(); }).get();
    BOOST_REQUIRE_EQUAL(1, resp.data.topics.size());
    const auto& topic_resp = resp.data.topics[0];
    BOOST_REQUIRE_EQUAL(1, topic_resp.partitions.size());
    const auto& partition_resp = topic_resp.partitions[0];

    BOOST_REQUIRE_NE(partition_resp.end_offset, model::offset(-1));

    // Check that the returned offset is the start of the log, since the
    // requested term has been truncated.
    auto earliest_kafka_offset
      = app.partition_manager
          .invoke_on(
            *shard,
            [ntp](cluster::partition_manager& mgr) {
                auto partition = mgr.get(ntp);
                auto start_offset = partition->log().offsets().start_offset;
                return partition->get_offset_translator_state()
                  ->from_log_offset(start_offset);
            })
          .get();
    BOOST_REQUIRE_EQUAL(earliest_kafka_offset, partition_resp.end_offset);
}
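A note on the final assertion: the test compares against a translated offset rather than the raw log start offset, because Redpanda's internal (raft) log contains batches Kafka clients never see (configuration batches, for example), and `from_log_offset` subtracts those hidden batches out. Below is a rough sketch of that idea under simplified assumptions — the `hidden` vector and this `from_log_offset` signature are hypothetical, not the actual offset-translator state.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Map a raft/log offset to a Kafka offset by subtracting the number of
// hidden (non-data) batch offsets at or below it. `hidden` must be sorted.
int64_t from_log_offset(int64_t log_offset, const std::vector<int64_t>& hidden) {
    auto n = std::upper_bound(hidden.begin(), hidden.end(), log_offset)
             - hidden.begin();
    return log_offset - n;
}

int main() {
    std::vector<int64_t> hidden{0, 5}; // e.g. raft configuration batches
    assert(from_log_offset(1, hidden) == 0); // one hidden offset below it
    assert(from_log_offset(7, hidden) == 5); // two hidden offsets below it
    return 0;
}
```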
16 changes: 13 additions & 3 deletions tests/rptest/tests/offset_for_leader_epoch_archival_test.py
@@ -9,6 +9,7 @@

from math import fabs
from rptest.services.cluster import cluster
+from ducktape.mark import matrix
from ducktape.utils.util import wait_until

from rptest.clients.kcl import KCL
@@ -65,13 +66,15 @@ def alter_and_verify():
        wait_until(alter_and_verify, 15, 0.5)

    @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
-   def test_querying_remote_partitions(self):
+   @matrix(remote_reads=[False, True])
+   def test_querying_remote_partitions(self, remote_reads):
        topic = TopicSpec(redpanda_remote_read=True,
                          redpanda_remote_write=True)
        epoch_offsets = {}
        rpk = RpkTool(self.redpanda)
        self.client().create_topic(topic)
-       rpk.alter_topic_config(topic.name, "redpanda.remote.read", 'true')
+       rpk.alter_topic_config(topic.name, "redpanda.remote.read",
+                              str(remote_reads))
        rpk.alter_topic_config(topic.name, "redpanda.remote.write", 'true')

        def wait_for_topic():
@@ -111,4 +114,11 @@ def wait_for_topic():
            self.logger.info(
                f"epoch {epoch} end_offset: {epoch_end_offset}, expected offset: {offset}"
            )
-           assert epoch_end_offset == offset
+           if remote_reads:
+               assert epoch_end_offset == offset, f"{epoch_end_offset} vs {offset}"
+           else:
+               # Check that the returned offset isn't an invalid (-1) value,
+               # even if we read from an epoch that has been truncated locally
+               # and we can't read from cloud storage.
+               assert epoch_end_offset != -1, f"{epoch_end_offset} vs -1"
+               assert epoch_end_offset >= offset, f"{epoch_end_offset} vs {offset}"
