[v22.3.x] kafka: return lowest offset if requested epoch is lower than local log #9297

Merged 2 commits on Mar 8, 2023
2 changes: 2 additions & 0 deletions src/v/cluster/partition.h
@@ -126,6 +126,8 @@ class partition {

const model::ntp& ntp() const { return _raft->ntp(); }

storage::log log() const { return _raft->log(); }

ss::future<std::optional<storage::timequery_result>>
timequery(storage::timequery_config);

13 changes: 9 additions & 4 deletions src/v/kafka/server/replicated_partition.cc
@@ -282,22 +282,27 @@ raft::replicate_stages replicated_partition::replicate(
std::optional<model::offset> replicated_partition::get_leader_epoch_last_offset(
kafka::leader_epoch epoch) const {
const model::term_id term(epoch);
const auto first_local_term = _partition->get_term(
_partition->start_offset());
// found in local state
const auto first_local_offset = _partition->start_offset();
const auto first_local_term = _partition->get_term(first_local_offset);
// Look for the highest offset in the requested term, or the first offset
// in the next term. This mirrors behavior in Kafka, see
// https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
if (term >= first_local_term) {
auto last_offset = _partition->get_term_last_offset(term);
if (last_offset) {
return _translator->from_log_offset(*last_offset);
}
}
// The requested term falls below our earliest local segment.

// Check cloud storage for a viable offset.
if (
_partition->is_remote_fetch_enabled()
&& _partition->cloud_data_available()) {
return _partition->get_cloud_term_last_offset(term);
}
return std::nullopt;
// Return the offset of this next-highest term.
return _translator->from_log_offset(first_local_offset);
}

ss::future<error_code> replicated_partition::validate_fetch_offset(
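The patched lookup order is easier to see stripped of the seastar and raft plumbing. The following is a minimal, self-contained C++ sketch, not part of the patch; all type and member names are illustrative stand-ins for `cluster::partition` and the offset translator. It tries the local log for the requested term first, then tiered storage if remote fetch is possible, and otherwise falls back to the first locally available offset instead of reporting that no offset exists.

#include <cstdint>
#include <functional>
#include <optional>

// Hypothetical stand-ins for the partition state consulted by
// get_leader_epoch_last_offset(); names are illustrative only.
struct partition_model {
    int64_t start_offset;     // first offset still in the local log
    int64_t first_local_term; // term of that offset
    bool remote_fetch_enabled;
    bool cloud_data_available;
    // Highest offset in `term` if the local log still covers it.
    std::function<std::optional<int64_t>(int64_t)> local_term_last_offset;
    // Same lookup against tiered-storage metadata.
    std::function<std::optional<int64_t>(int64_t)> cloud_term_last_offset;
    // Raft/log offset -> Kafka offset translation.
    std::function<int64_t(int64_t)> from_log_offset;
};

// Sketch of the decision order introduced by this patch.
std::optional<int64_t>
leader_epoch_last_offset(const partition_model& p, int64_t requested_term) {
    if (requested_term >= p.first_local_term) {
        // The term may still be covered by the local log.
        if (auto last = p.local_term_last_offset(requested_term)) {
            return p.from_log_offset(*last);
        }
    }
    // The requested term falls below the local start of the log:
    // prefer tiered storage if it can be consulted...
    if (p.remote_fetch_enabled && p.cloud_data_available) {
        return p.cloud_term_last_offset(requested_term);
    }
    // ...otherwise return the first locally available offset instead of
    // "not found", so clients get a usable lower bound rather than -1.
    return p.from_log_offset(p.start_offset);
}

int main() {
    partition_model p{
      .start_offset = 100,
      .first_local_term = 5,
      .remote_fetch_enabled = false,
      .cloud_data_available = false,
      .local_term_last_offset = [](int64_t) { return std::nullopt; },
      .cloud_term_last_offset = [](int64_t) { return std::nullopt; },
      .from_log_offset = [](int64_t o) { return o - 2; }};
    // Term 3 is older than anything local and cloud reads are off, so the
    // fallback returns the translated start offset (98) instead of nothing.
    return leader_epoch_last_offset(p, 3) == 98 ? 0 : 1;
}

The last branch is the behavioral change of this backport: a client asking about an epoch older than the local log now gets a usable lower bound instead of an empty result.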
102 changes: 102 additions & 0 deletions src/v/kafka/server/tests/produce_consume_test.cc
@@ -10,6 +10,7 @@
#include "kafka/client/transport.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
#include "kafka/protocol/offset_for_leader_epoch.h"
#include "kafka/protocol/produce.h"
#include "kafka/protocol/request_reader.h"
#include "kafka/server/handlers/produce.h"
@@ -178,3 +179,104 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
.get(),
kafka::client::kafka_request_disconnected_exception);
}

// TODO: move producer utilities somewhere else and give this test a proper
// home.
FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) {
producer = std::make_unique<kafka::client::transport>(
make_kafka_client().get0());
producer->connect().get0();
model::topic_namespace tp_ns(model::ns("kafka"), test_topic);
add_topic(tp_ns).get0();
model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
tests::cooperative_spin_wait_with_timeout(10s, [ntp, this] {
auto shard = app.shard_table.local().shard_for(ntp);
if (!shard) {
return ss::make_ready_future<bool>(false);
}
return app.partition_manager.invoke_on(
*shard, [ntp](cluster::partition_manager& pm) {
return pm.get(ntp)->is_leader();
});
}).get0();
auto shard = app.shard_table.local().shard_for(ntp);
for (int i = 0; i < 3; i++) {
// Refresh leadership.
app.partition_manager
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto raft = mgr.get(ntp)->raft();
raft->step_down("force_step_down").get();
tests::cooperative_spin_wait_with_timeout(10s, [raft] {
return raft->is_leader();
}).get0();
})
.get();
app.partition_manager
.invoke_on(
*shard,
[this, ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
produce([this](size_t cnt) {
return small_batches(cnt);
}).get0();
})
.get();
}
// Prefix truncate the log so the beginning of the log moves forward.
app.partition_manager
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
storage::truncate_prefix_config cfg(
model::offset(1), ss::default_priority_class());
partition->log().truncate_prefix(cfg).get();
})
.get();

// Make a request getting the offset from a term below the start of the
// log.
auto client = make_kafka_client().get0();
client.connect().get();
auto current_term = app.partition_manager
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
return mgr.get(ntp)->raft()->term();
})
.get();
kafka::offset_for_leader_epoch_request req;
kafka::offset_for_leader_topic t{
test_topic,
{{model::partition_id(0),
kafka::leader_epoch(current_term()),
kafka::leader_epoch(0)}},
{},
};
req.data.topics.emplace_back(std::move(t));
auto resp = client.dispatch(req, kafka::api_version(2)).get0();
client.stop().then([&client] { client.shutdown(); }).get();
BOOST_REQUIRE_EQUAL(1, resp.data.topics.size());
const auto& topic_resp = resp.data.topics[0];
BOOST_REQUIRE_EQUAL(1, topic_resp.partitions.size());
const auto& partition_resp = topic_resp.partitions[0];

BOOST_REQUIRE_NE(partition_resp.end_offset, model::offset(-1));

// Check that the returned offset is the start of the log, since the
// requested term has been truncated.
auto earliest_kafka_offset
= app.partition_manager
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
auto start_offset = partition->log().offsets().start_offset;
return partition->get_offset_translator_state()
->from_log_offset(start_offset);
})
.get();
BOOST_REQUIRE_EQUAL(earliest_kafka_offset, partition_resp.end_offset);
}
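The final comparison in the test leans on offset translation: Redpanda's underlying log interleaves non-data batches (raft configuration and similar) with user data, so a raw log offset must be shifted before it can be compared with a Kafka offset. Below is a deliberately simplified C++ model of that idea; the real `offset_translator_state` keeps richer state, and the "list of non-data offsets" representation here is an assumption made purely for illustration.

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical model: log offsets occupied by non-data batches that are
// invisible to Kafka clients.
struct offset_translator_model {
    std::vector<int64_t> non_data_offsets; // sorted ascending

    // Kafka offset = log offset minus the number of non-data records
    // at or before that log offset.
    int64_t from_log_offset(int64_t log_offset) const {
        auto it = std::upper_bound(
          non_data_offsets.begin(), non_data_offsets.end(), log_offset);
        return log_offset
               - static_cast<int64_t>(it - non_data_offsets.begin());
    }
};

int main() {
    offset_translator_model t{{0, 5}}; // non-data batches at log offsets 0, 5
    // Log offset 6 maps to Kafka offset 4 -- the kind of value the test
    // compares against partition_resp.end_offset.
    return t.from_log_offset(6) == 4 ? 0 : 1;
}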
16 changes: 13 additions & 3 deletions tests/rptest/tests/offset_for_leader_epoch_archival_test.py
@@ -9,6 +9,7 @@

from math import fabs
from rptest.services.cluster import cluster
from ducktape.mark import parametrize
from ducktape.utils.util import wait_until

from rptest.clients.kcl import KCL
@@ -65,13 +66,15 @@ def alter_and_verify():
wait_until(alter_and_verify, 15, 0.5)

@cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_querying_remote_partitions(self):
@parametrize(remote_reads=[False, True])
def test_querying_remote_partitions(self, remote_reads):
topic = TopicSpec(redpanda_remote_read=True,
redpanda_remote_write=True)
epoch_offsets = {}
rpk = RpkTool(self.redpanda)
self.client().create_topic(topic)
rpk.alter_topic_config(topic.name, "redpanda.remote.read", 'true')
rpk.alter_topic_config(topic.name, "redpanda.remote.read",
str(remote_reads))
rpk.alter_topic_config(topic.name, "redpanda.remote.write", 'true')

def wait_for_topic():
@@ -111,4 +114,11 @@ def wait_for_topic():
self.logger.info(
f"epoch {epoch} end_offset: {epoch_end_offset}, expected offset: {offset}"
)
assert epoch_end_offset == offset
if remote_reads:
assert epoch_end_offset == offset, f"{epoch_end_offset} vs {offset}"
else:
# Check that the returned offset isn't an invalid (-1) value,
# even if we read from an epoch that has been truncated locally
# and we can't read from cloud storage.
assert epoch_end_offset != -1, f"{epoch_end_offset} vs -1"
assert epoch_end_offset >= offset, f"{epoch_end_offset} vs {offset}"
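Seen from the client side, the two parametrizations differ only in how the response's end_offset may be interpreted once the requested epoch has been truncated locally. The sketch below restates the test's assertions as a single predicate; it is standalone C++ to stay consistent with the earlier sketches, the predicate name is made up for illustration, and the -1 sentinel matches Kafka's undefined epoch offset.

#include <cassert>
#include <cstdint>

// -1 is the sentinel Kafka uses for "no offset known for this epoch".
constexpr int64_t undefined_epoch_offset = -1;

// Expected behaviour checked by the ducktape test. `expected` is the end
// offset recorded for the epoch before local truncation.
bool response_acceptable(
  bool remote_reads, int64_t end_offset, int64_t expected) {
    if (remote_reads) {
        // Tiered storage can still resolve the epoch exactly.
        return end_offset == expected;
    }
    // Without remote reads the broker now answers with the first locally
    // available offset: never the -1 sentinel, and never below what was
    // recorded for that epoch.
    return end_offset != undefined_epoch_offset && end_offset >= expected;
}

int main() {
    assert(response_acceptable(true, 42, 42));
    assert(response_acceptable(false, 50, 42));
    assert(!response_acceptable(false, undefined_epoch_offset, 42));
    return 0;
}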