Merge pull request redpanda-data#8409 from mmaslankaprv/fix-8091
Fixed removal of in-flight write operations when processing stable offset updates
mmaslankaprv authored Jan 31, 2023
2 parents e728165 + 0a2815d commit 5661a28
Showing 5 changed files with 173 additions and 27 deletions.
9 changes: 9 additions & 0 deletions src/v/storage/log_reader.cc
@@ -12,6 +12,7 @@
#include "bytes/iobuf.h"
#include "model/record.h"
#include "storage/logger.h"
#include "storage/parser_errc.h"
#include "vassert.h"
#include "vlog.h"

@@ -312,6 +313,14 @@ log_reader::do_load_slice(model::timeout_clock::time_point timeout) {
       stlog.info,
       "stopped reading stream: {}",
       recs.error().message());
+
+    auto const batch_parse_err
+      = recs.error() == parser_errc::header_only_crc_missmatch
+        || recs.error() == parser_errc::input_stream_not_enough_bytes;
+
+    if (batch_parse_err) {
+        _probe.batch_parse_error();
+    }
     return _iterator.close().then(
       [] { return ss::make_ready_future<storage_t>(); });
 }
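The new branch counts only the two parser error codes that indicate a torn or truncated batch; other outcomes, such as a clean end of stream, leave the probe untouched, which is what lets the new test below assert that the counter stayed at zero. A stripped-down sketch of the classification, with abbreviated stand-ins for storage::parser_errc and the probe (not the real definitions):

#include <cstdint>

// abbreviated stand-in for storage::parser_errc; only the two codes
// checked in the hunk above matter here
enum class parser_errc {
    none,
    end_of_stream,
    header_only_crc_missmatch,
    input_stream_not_enough_bytes,
};

// stand-in for the storage probe's parse-error counter
struct probe {
    void batch_parse_error() { ++_batch_parse_errors; }
    int64_t get_batch_parse_errors() const { return _batch_parse_errors; }

private:
    int64_t _batch_parse_errors = 0;
};

// count only genuine parse failures; benign stream termination is ignored
void record_read_error(probe& p, parser_errc ec) {
    auto const batch_parse_err
      = ec == parser_errc::header_only_crc_missmatch
        || ec == parser_errc::input_stream_not_enough_bytes;
    if (batch_parse_err) {
        p.batch_parse_error();
    }
}

int main() {
    probe p;
    record_read_error(p, parser_errc::end_of_stream);                 // not counted
    record_read_error(p, parser_errc::input_stream_not_enough_bytes); // counted
    return p.get_batch_parse_errors() == 1 ? 0 : 1;
}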
52 changes: 26 additions & 26 deletions src/v/storage/parser.cc
@@ -78,36 +78,36 @@ static model::record_batch_header header_from_iobuf(iobuf b) {
     hdr.ctx.owner_shard = ss::this_shard_id();
     return hdr;
 }
 
+// make sure that `msg` parameter is a static string or it is not removed
+// before this function finishes
 static ss::future<result<iobuf>> verify_read_iobuf(
   ss::input_stream<char>& in,
   size_t expected,
-  ss::sstring msg,
+  const char* msg,
   bool recover = false) {
-    return read_iobuf_exactly(in, expected)
-      .then([msg = std::move(msg), expected, recover](iobuf b) {
-          if (likely(b.size_bytes() == expected)) {
-              return ss::make_ready_future<result<iobuf>>(std::move(b));
-          }
-          if (!recover) {
-              stlog.error(
-                "cannot continue parsing. recived size:{} bytes, expected:{} "
-                "bytes. context:{}",
-                b.size_bytes(),
-                expected,
-                msg);
-          } else {
-              stlog.debug(
-                "recovery ended with short read. recived size:{} bytes, "
-                "expected:{} "
-                "bytes. context:{}",
-                b.size_bytes(),
-                expected,
-                msg);
-          }
-          return ss::make_ready_future<result<iobuf>>(
-            parser_errc::input_stream_not_enough_bytes);
-      });
+    auto b = co_await read_iobuf_exactly(in, expected);
+
+    if (likely(b.size_bytes() == expected)) {
+        co_return b;
+    }
+    if (!recover) {
+        vlog(
+          stlog.error,
+          "Stopping parser, short read. Expected to read {} bytes, but read {} "
+          "bytes. context: {}",
+          expected,
+          b.size_bytes(),
+          msg);
+    } else {
+        vlog(
+          stlog.debug,
+          "Recovery ended with short read. Expected to read {} bytes, but read "
+          "{} bytes. context: {}",
+          expected,
+          b.size_bytes(),
+          msg);
+    }
+    co_return parser_errc::input_stream_not_enough_bytes;
 }
 
 ss::future<result<stop_parser>> continuous_batch_parser::consume_header() {
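Switching msg from ss::sstring to const char* avoids a copy per call, but it introduces the lifetime requirement spelled out in the new comment: only the pointer survives across the coroutine's suspension points, so the characters it refers to must stay alive until the function finishes. A minimal sketch of the hazard, using a deferred callback in place of a Seastar coroutine (the names are illustrative, not from the codebase):

#include <functional>
#include <iostream>
#include <string>

// deferred work standing in for a coroutine body that resumes after a
// co_await and then logs `msg`
std::function<void()> make_deferred_log(const char* msg) {
    // only the pointer is captured; nothing keeps the pointed-to
    // characters alive until the callback actually runs
    return [msg] { std::cout << msg << '\n'; };
}

int main() {
    // fine: a string literal has static storage duration
    auto ok = make_deferred_log("reading batch body");

    // dangling: the temporary std::string dies at the end of the full
    // expression, long before the deferred work would run
    // auto bad = make_deferred_log(std::string("context").c_str());

    ok(); // prints "reading batch body"
    return 0;
}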
2 changes: 2 additions & 0 deletions src/v/storage/probe.h
@@ -93,6 +93,8 @@ class probe {
     void remove_partition_bytes(size_t remove) { _partition_bytes -= remove; }
     void set_compaction_ratio(double r) { _compaction_ratio = r; }
 
+    int64_t get_batch_parse_errors() const { return _batch_parse_errors; }
+
 private:
     uint64_t _partition_bytes = 0;
     uint64_t _bytes_written = 0;
2 changes: 1 addition & 1 deletion src/v/storage/segment.cc
@@ -539,7 +539,7 @@ void segment::advance_stable_offset(size_t offset) {

     _reader.set_file_size(it->first);
     _tracker.stable_offset = it->second;
-    _inflight.erase(_inflight.begin(), it);
+    _inflight.erase(_inflight.begin(), std::next(it));
 }
 
 std::ostream& operator<<(std::ostream& o, const segment::offset_tracker& t) {
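The one-character fix above hinges on the half-open semantics of erase(first, last): the range [first, last) excludes last itself, so passing it as the end iterator left the entry matching the stable-offset update behind in _inflight, and that in-flight write was never removed. A minimal sketch of the difference, with a plain std::map standing in for the _inflight container (the real container type may differ):

#include <cstddef>
#include <iostream>
#include <iterator>
#include <map>

int main() {
    // stand-in for _inflight: file position -> offset of an in-flight write
    std::map<std::size_t, int> inflight{{100, 1}, {200, 2}, {300, 3}};

    // suppose the stable-offset update matched the entry with key 200
    auto it = inflight.find(200);

    // before the fix: erase(begin, it) removes [begin, it) only, so the
    // matched entry itself stayed behind
    // inflight.erase(inflight.begin(), it);            // leaves 200 and 300

    // after the fix: [begin, next(it)) also removes the matched entry
    inflight.erase(inflight.begin(), std::next(it));    // leaves only 300

    for (auto const& [pos, offset] : inflight) {
        std::cout << pos << " -> " << offset << '\n';   // prints "300 -> 3"
    }
    return 0;
}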
135 changes: 135 additions & 0 deletions src/v/storage/tests/storage_e2e_test.cc
@@ -13,6 +13,7 @@
#include "model/namespace.h"
#include "model/record.h"
#include "model/record_batch_reader.h"
#include "model/record_batch_types.h"
#include "model/tests/random_batch.h"
#include "model/timeout_clock.h"
#include "model/timestamp.h"
@@ -45,6 +46,7 @@
 #include <iterator>
 #include <numeric>
 #include <optional>
+#include <vector>
 
 storage::disk_log_impl* get_disk_log(storage::log log) {
     return dynamic_cast<storage::disk_log_impl*>(log.get_impl());
@@ -3294,3 +3296,136 @@ FIXTURE_TEST(test_bytes_eviction_overrides, storage_test_fixture) {
       disk_log->size_bytes(), tc.expected_bytes_left - segment_size);
     }
 }
+
+FIXTURE_TEST(issue_8091, storage_test_fixture) {
+    /**
+     * Test validating concurrent reads, writes and truncations
+     */
+    auto cfg = default_log_config(test_dir);
+    cfg.stype = storage::log_config::storage_type::disk;
+    cfg.cache = storage::with_cache::no;
+    cfg.max_segment_size = config::mock_binding<size_t>(100_MiB);
+    storage::ntp_config::default_overrides overrides;
+    ss::abort_source as;
+    storage::log_manager mgr = make_log_manager(cfg);
+    info("config: {}", mgr.config());
+    auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
+
+    auto ntp = model::ntp(model::kafka_namespace, "test", 0);
+    auto log
+      = mgr.manage(storage::ntp_config(ntp, mgr.config().base_dir)).get0();
+
+    int cnt = 0;
+    int max = 500;
+    mutex log_mutex;
+    model::offset last_truncate;
+
+    auto produce = ss::do_until(
+      [&] { return cnt > max; },
+      [&log, &cnt, &log_mutex] {
+          ss::circular_buffer<model::record_batch> batches;
+          auto bt = random_generators::random_choice(
+            std::vector<model::record_batch_type>{
+              model::record_batch_type::raft_data,
+              model::record_batch_type::raft_configuration});
+
+          // single batch
+          storage::record_batch_builder builder(bt, model::offset(0));
+          if (bt == model::record_batch_type::raft_data) {
+              builder.add_raw_kv(
+                reflection::to_iobuf("key"),
+                bytes_to_iobuf(random_generators::get_bytes(16 * 1024)));
+          } else {
+              builder.add_raw_kv(
+                std::nullopt,
+                bytes_to_iobuf(random_generators::get_bytes(128)));
+          }
+          batches.push_back(std::move(builder).build());
+
+          auto reader = model::make_memory_record_batch_reader(
+            std::move(batches));
+
+          storage::log_append_config cfg{
+            .should_fsync = storage::log_append_config::fsync::no,
+            .io_priority = ss::default_priority_class(),
+            .timeout = model::no_timeout,
+          };
+          info("append");
+          return log_mutex
+            .with([reader = std::move(reader), cfg, &log]() mutable {
+                info("append_lock");
+                return std::move(reader)
+                  .for_each_ref(log.make_appender(cfg), cfg.timeout)
+                  .then([](storage::append_result res) {
+                      info("append_result: {}", res.last_offset);
+                  })
+                  .then([&log] { return log.flush(); });
+            })
+            .finally([&cnt] { cnt++; });
+      });
+
+    auto read = ss::do_until(
+      [&] { return cnt > max; },
+      [&log, &last_truncate] {
+          auto offset = log.offsets();
+          storage::log_reader_config cfg(
+            last_truncate - model::offset(1),
+            offset.dirty_offset,
+            ss::default_priority_class());
+          cfg.type_filter = model::record_batch_type::raft_data;
+
+          auto start = ss::steady_clock_type::now();
+          return log.make_reader(cfg)
+            .then([start](model::record_batch_reader rdr) {
+                // assert that creating a reader took less than 5 seconds
+                BOOST_REQUIRE_LT(
+                  (ss::steady_clock_type::now() - start) / 1ms, 5000);
+                return model::consume_reader_to_memory(
+                  std::move(rdr), model::no_timeout);
+            })
+            .then([](ss::circular_buffer<model::record_batch> batches) {
+                if (batches.empty()) {
+                    info("read empty range");
+                    return;
+                }
+                info(
+                  "read range: {}, {}",
+                  batches.front().base_offset(),
+                  batches.back().last_offset());
+            });
+      });
+
+    auto truncate = ss::do_until(
+      [&] { return cnt > max; },
+      [&log, &log_mutex, &last_truncate] {
+          auto offset = log.offsets();
+          if (offset.dirty_offset <= model::offset(0)) {
+              return ss::now();
+          }
+          return log_mutex
+            .with([&log, &last_truncate] {
+                auto offset = log.offsets();
+                info("truncate offsets: {}", offset);
+                auto start = ss::steady_clock_type::now();
+                last_truncate = offset.dirty_offset;
+                return log
+                  .truncate(storage::truncate_config(
+                    offset.dirty_offset, ss::default_priority_class()))
+                  .finally([start] {
+                      // assert that truncation took less than 5 seconds
+                      BOOST_REQUIRE_LT(
+                        (ss::steady_clock_type::now() - start) / 1ms, 5000);
+                      info("truncate_done");
+                  });
+            })
+            .then([] { return ss::sleep(10ms); });
+      });
+
+    produce.get();
+    read.get();
+    truncate.get();
+    auto disk_log = get_disk_log(log);
+
+    // at the end of this test there must be no batch parse errors
+    BOOST_REQUIRE_EQUAL(disk_log->get_probe().get_batch_parse_errors(), 0);
+}
