From 621ae081dae7da9e0abeaad492271c65edd27b5f Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 25 Jan 2023 09:57:21 +0100
Subject: [PATCH 1/7] s/parser: migrated verify_read_iobuf to coroutines

Signed-off-by: Michal Maslanka
---
 src/v/storage/parser.cc | 46 ++++++++++++++++++++---------------------
 1 file changed, 22 insertions(+), 24 deletions(-)

diff --git a/src/v/storage/parser.cc b/src/v/storage/parser.cc
index 808b848c568cb..9bcd84e579b50 100644
--- a/src/v/storage/parser.cc
+++ b/src/v/storage/parser.cc
@@ -84,30 +84,28 @@ static ss::future<result<iobuf>> verify_read_iobuf(
   size_t expected,
   ss::sstring msg,
   bool recover = false) {
-    return read_iobuf_exactly(in, expected)
-      .then([msg = std::move(msg), expected, recover](iobuf b) {
-          if (likely(b.size_bytes() == expected)) {
-              return ss::make_ready_future<result<iobuf>>(std::move(b));
-          }
-          if (!recover) {
-              stlog.error(
-                "cannot continue parsing. recived size:{} bytes, expected:{} "
-                "bytes. context:{}",
-                b.size_bytes(),
-                expected,
-                msg);
-          } else {
-              stlog.debug(
-                "recovery ended with short read. recived size:{} bytes, "
-                "expected:{} "
-                "bytes. context:{}",
-                b.size_bytes(),
-                expected,
-                msg);
-          }
-          return ss::make_ready_future<result<iobuf>>(
-            parser_errc::input_stream_not_enough_bytes);
-      });
+    auto b = co_await read_iobuf_exactly(in, expected);
+
+    if (likely(b.size_bytes() == expected)) {
+        co_return b;
+    }
+    if (!recover) {
+        stlog.error(
+          "cannot continue parsing. recived size:{} bytes, expected:{} "
+          "bytes. context:{}",
+          b.size_bytes(),
+          expected,
+          msg);
+    } else {
+        stlog.debug(
+          "recovery ended with short read. recived size:{} bytes, "
+          "expected:{} "
+          "bytes. context:{}",
+          b.size_bytes(),
+          expected,
+          msg);
+    }
+    co_return parser_errc::input_stream_not_enough_bytes;
 }
 
 ss::future<result<stop_parser>> continuous_batch_parser::consume_header() {

From a1cb5791613c79e1a5c8f63fa0eee82e90c51a6f Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 25 Jan 2023 10:05:48 +0100
Subject: [PATCH 2/7] s/parser: do not copy context message

Signed-off-by: Michal Maslanka
---
 src/v/storage/parser.cc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/v/storage/parser.cc b/src/v/storage/parser.cc
index 9bcd84e579b50..9115a0c22c932 100644
--- a/src/v/storage/parser.cc
+++ b/src/v/storage/parser.cc
@@ -78,11 +78,12 @@ static model::record_batch_header header_from_iobuf(iobuf b) {
     hdr.ctx.owner_shard = ss::this_shard_id();
     return hdr;
 }
-
+// make sure that the `msg` parameter is a static string or that it is not
+// destroyed before this function finishes
 static ss::future<result<iobuf>> verify_read_iobuf(
   ss::input_stream<char>& in,
   size_t expected,
-  ss::sstring msg,
+  const char* msg,
   bool recover = false) {
     auto b = co_await read_iobuf_exactly(in, expected);
 

From 17f6a8782ab5de6990768d551753066b9a7a7eb8 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 25 Jan 2023 10:00:34 +0100
Subject: [PATCH 3/7] s/parser: refactored short read log messages

Signed-off-by: Michal Maslanka
---
 src/v/storage/parser.cc | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/src/v/storage/parser.cc b/src/v/storage/parser.cc
index 9115a0c22c932..07a993139bc29 100644
--- a/src/v/storage/parser.cc
+++ b/src/v/storage/parser.cc
@@ -91,19 +91,20 @@ static ss::future<result<iobuf>> verify_read_iobuf(
         co_return b;
     }
     if (!recover) {
-        stlog.error(
-          "cannot continue parsing. recived size:{} bytes, expected:{} "
-          "bytes. context:{}",
-          b.size_bytes(),
+        vlog(
+          stlog.error,
+          "Stopping parser, short read. Expected to read {} bytes, but read {} "
+          "bytes. context: {}",
           expected,
+          b.size_bytes(),
           msg);
     } else {
-        stlog.debug(
-          "recovery ended with short read. recived size:{} bytes, "
-          "expected:{} "
-          "bytes. context:{}",
-          b.size_bytes(),
+        vlog(
+          stlog.debug,
+          "Recovery ended with short read. Expected to read {} bytes, but read "
+          "{} bytes. context: {}",
           expected,
+          b.size_bytes(),
           msg);
     }
     co_return parser_errc::input_stream_not_enough_bytes;
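
The first three patches are the standard Seastar continuation-to-coroutine rewrite: the `.then()` chain and its lambda captures disappear, intermediate values become ordinary locals, and `co_return` replaces `ss::make_ready_future`. A minimal sketch of the same transformation on a made-up helper (the `read_size_*` names are illustrative only, not part of the storage API):

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/iostream.hh>
    #include <seastar/core/temporary_buffer.hh>

    namespace ss = seastar;

    // Continuation style: everything needed after the read must be captured.
    ss::future<size_t>
    read_size_then(ss::input_stream<char>& in, size_t expected) {
        return in.read_exactly(expected).then(
          [expected](ss::temporary_buffer<char> buf) {
              // a short read would be handled here
              return ss::make_ready_future<size_t>(buf.size());
          });
    }

    // Coroutine style: plain control flow, no captures, co_return instead of
    // make_ready_future.
    ss::future<size_t>
    read_size_coro(ss::input_stream<char>& in, size_t expected) {
        auto buf = co_await in.read_exactly(expected);
        // a short read would be handled here
        co_return buf.size();
    }

The flip side, which the comment added in patch 2 warns about, is that a coroutine's reference and pointer parameters (such as the new `const char* msg`) are not copied into any capture list, so they must stay alive across every suspension point.
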
context: {}", expected, + b.size_bytes(), msg); } else { - stlog.debug( - "recovery ended with short read. recived size:{} bytes, " - "expected:{} " - "bytes. context:{}", - b.size_bytes(), + vlog( + stlog.debug, + "Recovery ended with short read. Expected to read {} bytes, but read " + "{} bytes. context: {}", expected, + b.size_bytes(), msg); } co_return parser_errc::input_stream_not_enough_bytes; From 60093dfaa86f0df33d4fbb7b3a00df18a4f4f53c Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 25 Jan 2023 17:07:27 +0100 Subject: [PATCH 4/7] s/probe: added batch parse errors getter Signed-off-by: Michal Maslanka --- src/v/storage/probe.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/v/storage/probe.h b/src/v/storage/probe.h index 50a3c95ad2530..c22034d6bf3fe 100644 --- a/src/v/storage/probe.h +++ b/src/v/storage/probe.h @@ -93,6 +93,8 @@ class probe { void remove_partition_bytes(size_t remove) { _partition_bytes -= remove; } void set_compaction_ratio(double r) { _compaction_ratio = r; } + int64_t get_batch_parse_errors() const { return _batch_parse_errors; } + private: uint64_t _partition_bytes = 0; uint64_t _bytes_written = 0; From 93c5e7b8eadc0849f2ffaf5b1bebafd4214c6b71 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 25 Jan 2023 17:08:03 +0100 Subject: [PATCH 5/7] s/log_reader: update error probe when parser returns an error When batch parser returns an error indicating that batch read was unsuccessful the probe should be updated. Signed-off-by: Michal Maslanka --- src/v/storage/log_reader.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/v/storage/log_reader.cc b/src/v/storage/log_reader.cc index 4afe08986fb89..6468dffd8e79d 100644 --- a/src/v/storage/log_reader.cc +++ b/src/v/storage/log_reader.cc @@ -12,6 +12,7 @@ #include "bytes/iobuf.h" #include "model/record.h" #include "storage/logger.h" +#include "storage/parser_errc.h" #include "vassert.h" #include "vlog.h" @@ -312,6 +313,14 @@ log_reader::do_load_slice(model::timeout_clock::time_point timeout) { stlog.info, "stopped reading stream: {}", recs.error().message()); + + auto const batch_parse_err + = recs.error() == parser_errc::header_only_crc_missmatch + || recs.error() == parser_errc::input_stream_not_enough_bytes; + + if (batch_parse_err) { + _probe.batch_parse_error(); + } return _iterator.close().then( [] { return ss::make_ready_future(); }); } From 7a2f975be069cd214076f61128dae71424bf781e Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 25 Jan 2023 17:32:43 +0100 Subject: [PATCH 6/7] s/segment: actually remove inflight op that was processed The `absl::btree_map::erase` function erases all elements in range [start, end) (end is exclusive). In current implementation the iterator used to update the stable offset wasn't removed as only all the elements preceding it were removed. Fixed the problem by removing the iterator related with currently processed in-flight update. 
From 0a2815df81d7c031e092379d108f46d2bbaa84ed Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 25 Jan 2023 17:31:58 +0100
Subject: [PATCH 7/7] s/e2e_tests: added test validating correct interop between read/truncate/write

Signed-off-by: Michal Maslanka
---
 src/v/storage/tests/storage_e2e_test.cc | 135 ++++++++++++++++++++++++
 1 file changed, 135 insertions(+)

diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc
index e0a0d42628928..45f14bdc364bb 100644
--- a/src/v/storage/tests/storage_e2e_test.cc
+++ b/src/v/storage/tests/storage_e2e_test.cc
@@ -13,6 +13,7 @@
 #include "model/namespace.h"
 #include "model/record.h"
 #include "model/record_batch_reader.h"
+#include "model/record_batch_types.h"
 #include "model/tests/random_batch.h"
 #include "model/timeout_clock.h"
 #include "model/timestamp.h"
@@ -45,6 +46,7 @@
 #include 
 #include 
 #include 
+#include 
 
 storage::disk_log_impl* get_disk_log(storage::log log) {
     return dynamic_cast<storage::disk_log_impl*>(log.get_impl());
@@ -3294,3 +3296,136 @@ FIXTURE_TEST(test_bytes_eviction_overrides, storage_test_fixture) {
       disk_log->size_bytes(), tc.expected_bytes_left - segment_size);
     }
 }
+
+FIXTURE_TEST(issue_8091, storage_test_fixture) {
+    /**
+     * Test validating concurrent reads, writes and truncations
+     */
+    auto cfg = default_log_config(test_dir);
+    cfg.stype = storage::log_config::storage_type::disk;
+    cfg.cache = storage::with_cache::no;
+    cfg.max_segment_size = config::mock_binding<size_t>(100_MiB);
+    storage::ntp_config::default_overrides overrides;
+    ss::abort_source as;
+    storage::log_manager mgr = make_log_manager(cfg);
+    info("config: {}", mgr.config());
+    auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
+
+    auto ntp = model::ntp(model::kafka_namespace, "test", 0);
+    auto log
+      = mgr.manage(storage::ntp_config(ntp, mgr.config().base_dir)).get0();
+
+    int cnt = 0;
+    int max = 500;
+    mutex log_mutex;
+    model::offset last_truncate;
+
+    auto produce = ss::do_until(
+      [&] { return cnt > max; },
+      [&log, &cnt, &log_mutex] {
+          ss::circular_buffer<model::record_batch> batches;
+          auto bt = random_generators::random_choice(
+            std::vector<model::record_batch_type>{
+              model::record_batch_type::raft_data,
+              model::record_batch_type::raft_configuration});
+
+          // single batch
+          storage::record_batch_builder builder(bt, model::offset(0));
+          if (bt == model::record_batch_type::raft_data) {
+              builder.add_raw_kv(
+                reflection::to_iobuf("key"),
+                bytes_to_iobuf(random_generators::get_bytes(16 * 1024)));
+          } else {
+              builder.add_raw_kv(
+                std::nullopt,
+                bytes_to_iobuf(random_generators::get_bytes(128)));
+          }
+          batches.push_back(std::move(builder).build());
+
+          auto reader = model::make_memory_record_batch_reader(
+            std::move(batches));
+
+          storage::log_append_config cfg{
+            .should_fsync = storage::log_append_config::fsync::no,
+            .io_priority = ss::default_priority_class(),
+            .timeout = model::no_timeout,
+          };
+          info("append");
+          return log_mutex
+            .with([reader = std::move(reader), cfg, &log]() mutable {
+                info("append_lock");
+                return std::move(reader)
+                  .for_each_ref(log.make_appender(cfg), cfg.timeout)
+                  .then([](storage::append_result res) {
+                      info("append_result: {}", res.last_offset);
+                  })
+                  .then([&log] { return log.flush(); });
+            })
+            .finally([&cnt] { cnt++; });
+      });
+
+    auto read = ss::do_until(
+      [&] { return cnt > max; },
+      [&log, &last_truncate] {
+          auto offset = log.offsets();
+          storage::log_reader_config cfg(
+            last_truncate - model::offset(1),
+            offset.dirty_offset,
+            ss::default_priority_class());
+          cfg.type_filter = model::record_batch_type::raft_data;
+
+          auto start = ss::steady_clock_type::now();
+          return log.make_reader(cfg)
+            .then([start](model::record_batch_reader rdr) {
+                // assert that creating a reader took less than 5 seconds
+                BOOST_REQUIRE_LT(
+                  (ss::steady_clock_type::now() - start) / 1ms, 5000);
+                return model::consume_reader_to_memory(
+                  std::move(rdr), model::no_timeout);
+            })
+            .then([](ss::circular_buffer<model::record_batch> batches) {
+                if (batches.empty()) {
+                    info("read empty range");
+                    return;
+                }
+                info(
+                  "read range: {}, {}",
+                  batches.front().base_offset(),
+                  batches.back().last_offset());
+            });
+      });
+
+    auto truncate = ss::do_until(
+      [&] { return cnt > max; },
+      [&log, &log_mutex, &last_truncate] {
+          auto offset = log.offsets();
+          if (offset.dirty_offset <= model::offset(0)) {
+              return ss::now();
+          }
+          return log_mutex
+            .with([&log, &last_truncate] {
+                auto offset = log.offsets();
+                info("truncate offsets: {}", offset);
+                auto start = ss::steady_clock_type::now();
+                last_truncate = offset.dirty_offset;
+                return log
+                  .truncate(storage::truncate_config(
+                    offset.dirty_offset, ss::default_priority_class()))
+                  .finally([start] {
+                      // assert that truncation took less than 5 seconds
+                      BOOST_REQUIRE_LT(
+                        (ss::steady_clock_type::now() - start) / 1ms, 5000);
+                      info("truncate_done");
+                  });
+            })
+            .then([] { return ss::sleep(10ms); });
+      });
+
+    produce.get();
+    read.get();
+    truncate.get();
+    auto disk_log = get_disk_log(log);
+
+    // at the end of this test there must be no batch parse errors
+    BOOST_REQUIRE_EQUAL(disk_log->get_probe().get_batch_parse_errors(), 0);
+}
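
The assertion that ends the new test is what ties the series together: patch 5 makes the reader bump the probe whenever the parser reports a header CRC mismatch or a short read, and patch 4 exposes that counter so the test can require it to stay at zero while appends, reads and truncations race with each other. The increment side is not part of this series, so the sketch below only assumes it is a plain counter; the real `storage::probe` in src/v/storage/probe.h carries many more metrics:

    #include <cstdint>

    // Simplified sketch of the two probe members the new test relies on.
    class probe {
    public:
        // assumed to exist already: called from log_reader::do_load_slice
        // when the parser reports a batch parse failure
        void batch_parse_error() { ++_batch_parse_errors; }

        // added in patch 4 so tests can read the counter back
        int64_t get_batch_parse_errors() const { return _batch_parse_errors; }

    private:
        int64_t _batch_parse_errors = 0;
    };

With that in place, `disk_log->get_probe().get_batch_parse_errors()` staying at zero after the produce/read/truncate loops finish is the signal that no reader ever observed a torn or truncated batch.
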