Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Fix ship truncate problem with stride - 2.1 #9829

Merged
merged 3 commits into from
Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions libraries/chain/include/eosio/chain/log_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ struct log_catalog {
static bfs::path make_abosolute_dir(const bfs::path& base_dir, bfs::path new_dir) {
if (new_dir.is_relative())
new_dir = base_dir / new_dir;

if (!bfs::is_directory(new_dir))
bfs::create_directories(new_dir);

return new_dir;
}

Expand All @@ -78,7 +78,7 @@ struct log_catalog {

this->retained_dir = make_abosolute_dir(log_dir, retained_dir.empty() ? log_dir : retained_dir);
if (!archive_dir.empty()) {
this->archive_dir = make_abosolute_dir(log_dir, archive_dir);
this->archive_dir = make_abosolute_dir(log_dir, archive_dir);
}

for_each_file_in_dir_matches(this->retained_dir, std::string(name) + suffix_pattern, [this](bfs::path path) {
Expand Down Expand Up @@ -114,7 +114,8 @@ struct log_catalog {
}

bool index_matches_data(const bfs::path& index_path, const LogData& log) const {
if (!bfs::exists(index_path)) return false;
if (!bfs::exists(index_path))
return false;

auto num_blocks_in_index = bfs::file_size(index_path) / sizeof(uint64_t);
if (num_blocks_in_index != log.num_blocks())
Expand Down Expand Up @@ -185,10 +186,10 @@ struct log_catalog {
static void rename_if_not_exists(bfs::path old_name, bfs::path new_name) {
if (!bfs::exists(new_name)) {
bfs::rename(old_name, new_name);
}
else {
} else {
bfs::remove(old_name);
wlog("${new_name} already exists, just removing ${old_name}", ("old_name", old_name.string())("new_name", new_name.string()));
wlog("${new_name} already exists, just removing ${old_name}",
("old_name", old_name.string())("new_name", new_name.string()));
}
}

Expand Down Expand Up @@ -223,7 +224,7 @@ struct log_catalog {
bfs::remove(orig_name.replace_extension("index"));
} else {
// move the the archive dir
rename_bundle(orig_name, archive_dir/orig_name.filename() );
rename_bundle(orig_name, archive_dir / orig_name.filename());
}
}
this->collection.erase(this->collection.begin(), this->collection.begin() + items_to_erase);
Expand All @@ -234,6 +235,40 @@ struct log_catalog {
if (max_retained_files > 0)
this->collection.emplace(start_block_num, mapped_type{end_block_num, new_path});
}

/// Truncate the catalog so that the log/index bundle containing the block with \c block_num
/// would be rename to \c new_name; the log/index bundles with blocks strictly higher
/// than \c block_num would be deleted, and all the renamed/removed entries would be erased
/// from the catalog.
///
/// \return if nonzero, it's the starting block number for the log/index bundle being renamed.
uint32_t truncate(uint32_t block_num, bfs::path new_name) {
if (collection.empty())
return 0;

auto remove_files = [](typename collection_t::const_reference v) {
auto name = v.second.filename_base;
bfs::remove(name.replace_extension("log"));
bfs::remove(name.replace_extension("index"));
};

auto it = collection.upper_bound(block_num);

if (it == collection.begin() || block_num > (it - 1)->second.last_block_num) {
std::for_each(it, collection.end(), remove_files);
collection.erase(it, collection.end());
return 0;
} else {
auto truncate_it = --it;
auto name = truncate_it->second.filename_base;
bfs::rename(name.replace_extension("log"), new_name.replace_extension("log"));
bfs::rename(name.replace_extension("index"), new_name.replace_extension("index"));
std::for_each(truncate_it + 1, collection.end(), remove_files);
auto result = truncate_it->first;
collection.erase(truncate_it, collection.end());
return result;
}
}
};

} // namespace chain
Expand Down
8 changes: 4 additions & 4 deletions libraries/state_history/include/eosio/state_history/log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ class state_history_log {

template <typename F>
void write_entry(state_history_log_header& header, const chain::block_id_type& prev_id, F write_payload) {
auto start_pos = write_log.tellp();

auto [block_num, start_pos] = write_entry_header(header, prev_id);
try {
auto block_num = write_entry_header(header, prev_id);
write_payload(write_log);
write_entry_position(header, start_pos, block_num);
} catch (...) {
Expand Down Expand Up @@ -176,9 +176,9 @@ class state_history_log {
void split_log();

/**
* @returns the block num
* @returns the block num and the file position
**/
block_num_type write_entry_header(const state_history_log_header& header, const chain::block_id_type& prev_id);
std::pair<block_num_type,file_position_type> write_entry_header(const state_history_log_header& header, const chain::block_id_type& prev_id);
void write_entry_position(const state_history_log_header& header, file_position_type pos, block_num_type block_num);
}; // state_history_log

Expand Down
48 changes: 31 additions & 17 deletions libraries/state_history/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void state_history_log::get_entry_header(state_history_log::block_num_type block

std::optional<chain::block_id_type> state_history_log::get_block_id(state_history_log::block_num_type block_num) {
auto result = catalog.id_for_block(block_num);
if (!result && block_num >= _begin_block && block_num < _end_block){
if (!result && block_num >= _begin_block && block_num < _end_block) {
state_history_log_header header;
get_entry_header(block_num, header);
return header.block_id;
Expand Down Expand Up @@ -187,31 +187,44 @@ state_history_log::file_position_type state_history_log::get_pos(state_history_l
void state_history_log::truncate(state_history_log::block_num_type block_num) {
write_log.flush();
index.flush();
uint64_t num_removed = 0;
auto first_block_num = catalog.empty() ? _begin_block : catalog.first_block_num();
auto new_begin_block_num = catalog.truncate(block_num, read_log.get_file_path());

if (new_begin_block_num > 0) {
// in this case, the original index/log file has been replaced from some files from the catalog, we need to
// reopen read_log, write_log and index.
index.close();
index.open("rb");
_begin_block = new_begin_block_num;
}

uint64_t num_removed;
if (block_num <= _begin_block) {
num_removed = _end_block - _begin_block;
write_log.seek(0);
index.seek(0);
num_removed = _end_block - first_block_num;
boost::filesystem::resize_file(read_log.get_file_path(), 0);
boost::filesystem::resize_file(index.get_file_path(), 0);
_begin_block = _end_block = 0;
_begin_block = _end_block = block_num;
} else {
num_removed = _end_block - block_num;
uint64_t pos = get_pos(block_num);
write_log.seek(0);
index.seek(0);
boost::filesystem::resize_file(read_log.get_file_path(), pos);
boost::filesystem::resize_file(index.get_file_path(), (block_num - _begin_block) * sizeof(uint64_t));
_end_block = block_num;
}
write_log.flush();
index.flush();

read_log.close();
read_log.open("rb");
write_log.close();
write_log.open("rb+");
index.close();
index.open("a+b");

ilog("fork or replay: removed ${n} blocks from ${name}.log", ("n", num_removed)("name", name));
}

state_history_log::block_num_type state_history_log::write_entry_header(const state_history_log_header& header,
const chain::block_id_type& prev_id) {
auto block_num = chain::block_header::num_from_id(header.block_id);
std::pair<state_history_log::block_num_type, state_history_log::file_position_type>
state_history_log::write_entry_header(const state_history_log_header& header, const chain::block_id_type& prev_id) {
block_num_type block_num = chain::block_header::num_from_id(header.block_id);
EOS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::state_history_exception,
"missed a block in ${name}.log", ("name", name));

Expand All @@ -231,8 +244,9 @@ state_history_log::block_num_type state_history_log::write_entry_header(const st
truncate(block_num);
}
write_log.seek_end(0);
file_position_type pos = write_log.tellp();
write_header(header);
return block_num;
return std::make_pair(block_num, pos);
}

void state_history_log::write_entry_position(const state_history_log_header& header,
Expand Down Expand Up @@ -268,8 +282,7 @@ void state_history_log::split_log() {

catalog.add(_begin_block, _end_block - 1, read_log.get_file_path().parent_path(), name);

_begin_block = 0;
_end_block = 0;
_begin_block = _end_block;

write_log.open("w+b");
read_log.open("rb");
Expand Down Expand Up @@ -346,7 +359,8 @@ chain::bytes state_history_chain_state_log::get_log_entry(block_num_type block_n
return state_history::zlib_decompress(read_log);
}

void state_history_chain_state_log::store(const chain::combined_database& db, const chain::block_state_ptr& block_state) {
void state_history_chain_state_log::store(const chain::combined_database& db,
const chain::block_state_ptr& block_state) {
bool fresh = this->begin_block() == this->end_block();
if (fresh)
ilog("Placing initial state in block ${n}", ("n", block_state->block->block_num()));
Expand Down
73 changes: 73 additions & 0 deletions unittests/state_history_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,79 @@ BOOST_AUTO_TEST_CASE(test_splitted_log) {
BOOST_CHECK(std::holds_alternative<prunable_data_type::none>(get_prunable_data_from_traces(pruned_traces, cfd_trace->id)));
}

void push_blocks( tester& from, tester& to ) {
while( to.control->fork_db_pending_head_block_num()
< from.control->fork_db_pending_head_block_num() )
{
auto fb = from.control->fetch_block_by_number( to.control->fork_db_pending_head_block_num()+1 );
to.push_block( fb );
}
}

bool test_fork(uint32_t stride, uint32_t max_retained_files) {
namespace bfs = boost::filesystem;

scoped_temp_path state_history_dir;
fc::create_directories(state_history_dir.path);

eosio::state_history_config config{
.log_dir = state_history_dir.path,
.retained_dir = "retained",
.archive_dir = "archive",
.stride = stride,
.max_retained_files = max_retained_files
};

state_history_tester chain1(config);
chain1.produce_blocks(2);

chain1.create_accounts( {"dan"_n,"sam"_n,"pam"_n} );
chain1.produce_block();
chain1.set_producers( {"dan"_n,"sam"_n,"pam"_n} );
chain1.produce_blocks(30);

tester chain2(setup_policy::none);
push_blocks(chain1, chain2);

auto fork_block_num = chain1.control->head_block_num();

chain1.produce_blocks(12);
auto create_account_traces = chain2.create_accounts( {"adam"_n} );
auto create_account_trace_id = create_account_traces[0]->id;

auto b = chain2.produce_block();
chain2.produce_blocks(11+12);

for( uint32_t start = fork_block_num + 1, end = chain2.control->head_block_num(); start <= end; ++start ) {
auto fb = chain2.control->fetch_block_by_number( start );
chain1.push_block( fb );
}
auto traces = chain1.traces_log.get_traces(b->block_num());

bool trace_found = std::find_if(traces.begin(), traces.end(), [create_account_trace_id](const auto& v) {
return std::get<eosio::state_history::transaction_trace_v0>(v).id == create_account_trace_id;
}) != traces.end();

return trace_found;
}

BOOST_AUTO_TEST_CASE(test_fork_no_stride) {
// In this case, the chain fork would NOT trunk the trace log across the stride boundary.
BOOST_CHECK(test_fork(UINT32_MAX, 10));
}
BOOST_AUTO_TEST_CASE(test_fork_with_stride1) {
// In this case, the chain fork would trunk the trace log across the stride boundary.
// However, there are still some traces remains after the truncation.
BOOST_CHECK(test_fork(10, 10));
}
BOOST_AUTO_TEST_CASE(test_fork_with_stride2) {
// In this case, the chain fork would trunk the trace log across the stride boundary.
// However, no existing trace remain after the truncation. Because we only keep a very
// short history, the create_account_trace is not available to be found. We just need
// to make sure no exception is throw.
BOOST_CHECK_NO_THROW(test_fork(5, 1));
}

BOOST_AUTO_TEST_CASE(test_corrupted_log_recovery) {
namespace bfs = boost::filesystem;

Expand Down