Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Improve nodeos make-index speeds #7607

Merged
295 changes: 266 additions & 29 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <eosio/chain/block_log.hpp>
#include <eosio/chain/exceptions.hpp>
#include <fstream>
#include <fc/bitutil.hpp>
#include <fc/io/raw.hpp>

#define LOG_READ (std::ios::in | std::ios::binary)
Expand All @@ -24,6 +25,8 @@ namespace eosio { namespace chain {
const uint32_t block_log::max_supported_version = 2;

namespace detail {
using unique_file = std::unique_ptr<FILE, decltype(&fclose)>;

class block_log_impl {
public:
signed_block_ptr head;
Expand Down Expand Up @@ -68,6 +71,60 @@ namespace eosio { namespace chain {

open_files = true;
}

class reverse_iterator {
public:
reverse_iterator();
// open a block log file and return the total number of blocks in it
uint32_t open(const fc::path& block_file_name);
uint64_t previous();
uint32_t version() const { return _version; }
uint32_t first_block_num() const { return _first_block_num; }
private:
void update_buffer();

unique_file _file;
uint32_t _version = 0;
uint32_t _first_block_num = 0;
uint32_t _last_block_num = 0;
uint32_t _blocks_found = 0;
uint32_t _blocks_expected = 0;
uint64_t _current_position_in_file = 0;
uint64_t _eof_position_in_file = 0;
uint64_t _end_of_buffer_position = _unset_position;
uint64_t _start_of_buffer_position = 0;
std::unique_ptr<char[]> _buffer_ptr;
std::string _block_file_name;
constexpr static int64_t _unset_position = -1;
constexpr static uint32_t _buf_len = 1U << 24;
constexpr static uint64_t _position_size = sizeof(_current_position_in_file);
constexpr static int _blknum_offset_from_pos = 14; //offset from start of block to 4 byte block number, valid for the only allowed versions (1 & 2)
};

constexpr uint64_t buffer_location_to_file_location(uint32_t buffer_location) { return buffer_location << 3; }
constexpr uint32_t file_location_to_buffer_location(uint32_t file_location) { return file_location >> 3; }

class index_writer {
public:
index_writer(const fc::path& block_index_name, uint32_t blocks_expected);
void write(uint64_t pos);
void complete();
void update_buffer_position();
private:
void prepare_buffer();
bool shift_buffer();

unique_file _file;
const std::string _block_index_name;
const uint32_t _blocks_expected;
uint32_t _block_written;
std::unique_ptr<uint64_t[]> _buffer_ptr;
int64_t _current_position = 0;
int64_t _start_of_buffer_position = 0;
int64_t _end_of_buffer_position = 0;
constexpr static uint64_t _buffer_bytes = 1U << 22;
constexpr static uint64_t _max_buffer_length = file_location_to_buffer_location(_buffer_bytes);
};
}

block_log::block_log(const fc::path& data_dir)
Expand Down Expand Up @@ -317,44 +374,38 @@ namespace eosio { namespace chain {

my->reopen();

uint64_t end_pos;

my->block_stream.seekg(-sizeof( uint64_t), std::ios::end);
my->block_stream.read((char*)&end_pos, sizeof(end_pos));
my->close();

if( end_pos == npos ) {
ilog( "Block log contains no blocks. No need to construct index." );
return;
}
block_log::construct_index(my->block_file, my->index_file);

my->reopen();
} // construct_index

signed_block tmp;
void block_log::construct_index(const fc::path& block_file_name, const fc::path& index_file_name) {
detail::reverse_iterator block_log_iter;

uint64_t pos = 0;
if (my->version == 1) {
pos = 4; // Skip version which should have already been checked.
} else {
pos = 8; // Skip version and first block offset which should have already been checked
}
my->block_stream.seekg(pos);
ilog("Will read existing blocks.log file ${file}", ("file", block_file_name.generic_string()));
ilog("Will write new blocks.index file ${file}", ("file", index_file_name.generic_string()));

genesis_state gs;
fc::raw::unpack(my->block_stream, gs);
const uint32_t num_blocks = block_log_iter.open(block_file_name);

ilog("block log version= ${version}", ("version", block_log_iter.version()));

// skip the totem
if (my->version > 1) {
uint64_t totem;
my->block_stream.read((char*) &totem, sizeof(totem));
if (num_blocks == 0) {
return;
}

my->index_stream.seekp(0, std::ios::end);
while( pos < end_pos ) {
fc::raw::unpack(my->block_stream, tmp);
my->block_stream.read((char*)&pos, sizeof(pos));
if(tmp.block_num() % 1000 == 0)
ilog( "Block log index reconstructed for block ${n}", ("n", tmp.block_num()));
my->index_stream.write((char*)&pos, sizeof(pos));
ilog("first block= ${first} last block= ${last}",
("first", block_log_iter.first_block_num())("last", (block_log_iter.first_block_num() + num_blocks)));

detail::index_writer index(index_file_name, num_blocks);
uint64_t position;
while ((position = block_log_iter.previous()) != npos) {
index.write(position);
}
} // construct_index
index.complete();
}

fc::path block_log::repair_log( const fc::path& data_dir, uint32_t truncate_at_block ) {
ilog("Recovering Block Log...");
Expand Down Expand Up @@ -543,4 +594,190 @@ namespace eosio { namespace chain {
return gs;
}

detail::reverse_iterator::reverse_iterator()
: _file(nullptr, &fclose)
, _buffer_ptr(std::make_unique<char[]>(_buf_len)) {
}

uint32_t detail::reverse_iterator::open(const fc::path& block_file_name) {
_block_file_name = block_file_name.generic_string();
_file.reset(fopen(_block_file_name.c_str(), "r"));
EOS_ASSERT( _file, block_log_exception, "Could not open Block log file at '${blocks_log}'", ("blocks_log", _block_file_name) );
_end_of_buffer_position = _unset_position;

//read block log to see if version 1 or 2 and get first blocknum (implicit 1 if version 1)
_version = 0;
auto size = fread((char*)&_version, sizeof(_version), 1, _file.get());
EOS_ASSERT( size == 1, block_log_exception, "Block log file at '${blocks_log}' could not be read.", ("file", _block_file_name) );
EOS_ASSERT( _version == 1 || _version == 2, block_log_unsupported_version, "block log version ${v} is not supported", ("v", _version));
if (_version == 1) {
_first_block_num = 1;
}
else {
size = fread((char*)&_first_block_num, sizeof(_first_block_num), 1, _file.get());
EOS_ASSERT( size == 1, block_log_exception, "Block log file at '${blocks_log}' not formatted consistently with version ${v}.", ("file", _block_file_name)("v", _version) );
}

auto status = fseek(_file.get(), 0, SEEK_END);
EOS_ASSERT( status == 0, block_log_exception, "Could not open Block log file at '${blocks_log}'. Returned status: ${status}", ("blocks_log", _block_file_name)("status", status) );

_eof_position_in_file = ftell(_file.get());
EOS_ASSERT( _eof_position_in_file > 0, block_log_exception, "Block log file at '${blocks_log}' could not be read.", ("blocks_log", _block_file_name) );
_current_position_in_file = _eof_position_in_file - _position_size;

update_buffer();

_blocks_found = 0;
char* buf = _buffer_ptr.get();
const uint32_t index_of_pos = _current_position_in_file - _start_of_buffer_position;
const uint64_t block_pos = *reinterpret_cast<uint64_t*>(buf + index_of_pos);

if (block_pos == block_log::npos) {
return 0;
}

uint32_t bnum = 0;
if (block_pos >= _start_of_buffer_position) {
const uint32_t index_of_block = block_pos - _start_of_buffer_position;
bnum = *reinterpret_cast<uint32_t*>(buf + index_of_block + _blknum_offset_from_pos); //block number of previous block (is big endian)
}
else {
const auto blknum_offset_pos = block_pos + _blknum_offset_from_pos;
auto status = fseek(_file.get(), blknum_offset_pos, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not seek in '${blocks_log}' to position: ${pos}. Returned status: ${status}", ("blocks_log", _block_file_name)("pos", blknum_offset_pos)("status", status) );
auto size = fread((void*)&bnum, sizeof(bnum), 1, _file.get());
EOS_ASSERT( size == 1, block_log_exception, "Could not read in '${blocks_log}' at position: ${pos}", ("blocks_log", _block_file_name)("pos", blknum_offset_pos) );
}
_last_block_num = fc::endian_reverse_u32(bnum) + 1; //convert from big endian to little endian and add 1
_blocks_expected = _last_block_num - _first_block_num + 1;
return _blocks_expected;
}

uint64_t detail::reverse_iterator::previous() {
EOS_ASSERT( _current_position_in_file != block_log::npos,
block_log_exception,
"Block log file at '${blocks_log}' first block already returned by former call to previous(), it is no longer valid to call this function.", ("blocks_log", _block_file_name) );

if (_start_of_buffer_position > _current_position_in_file) {
update_buffer();
}

char* buf = _buffer_ptr.get();
auto offset = _current_position_in_file - _start_of_buffer_position;
uint64_t block_location_in_file = *reinterpret_cast<uint64_t*>(buf + offset);

++_blocks_found;
if (block_location_in_file == block_log::npos) {
_current_position_in_file = block_location_in_file;
EOS_ASSERT( _blocks_found != _blocks_expected,
block_log_exception,
"Block log file at '${blocks_log}' formatting indicated last block: ${last_block_num}, first block: ${first_block_num}, but found ${num} blocks",
("blocks_log", _block_file_name)("last_block_num", _last_block_num)("first_block_num", _first_block_num)("num", _blocks_found) );
}
else {
const uint64_t previous_position_in_file = _current_position_in_file;
_current_position_in_file = block_location_in_file - _position_size;
EOS_ASSERT( _current_position_in_file < previous_position_in_file,
block_log_exception,
"Block log file at '${blocks_log}' formatting is incorrect, indicates position later location in file: ${pos}, which was retrieved at: ${orig_pos}.",
("blocks_log", _block_file_name)("pos", _current_position_in_file)("orig_pos", previous_position_in_file) );
if (_version == 1 && _blocks_found != _blocks_expected) {
_current_position_in_file = block_location_in_file = block_log::npos;
}
}

return block_location_in_file;
}

void detail::reverse_iterator::update_buffer() {
EOS_ASSERT( _current_position_in_file != block_log::npos, block_log_exception, "Block log file not setup properly" );

// since we need to read in a new section, just need to ensure the next position is at the very end of the buffer
_end_of_buffer_position = _current_position_in_file + _position_size;
if (_end_of_buffer_position < _buf_len) {
_start_of_buffer_position = 0;
}
else {
_start_of_buffer_position = _end_of_buffer_position - _buf_len;
}

auto status = fseek(_file.get(), _start_of_buffer_position, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not seek in '${blocks_log}' to position: ${pos}. Returned status: ${status}", ("blocks_log", _block_file_name)("pos", _start_of_buffer_position)("status", status) );
char* buf = _buffer_ptr.get();
auto size = fread((void*)buf, (_end_of_buffer_position - _start_of_buffer_position), 1, _file.get());//read tail of blocks.log file into buf
EOS_ASSERT( size == 1, block_log_exception, "blocks.log read fails" );
}

detail::index_writer::index_writer(const fc::path& block_index_name, uint32_t blocks_expected)
: _file(nullptr, &fclose)
, _block_index_name(block_index_name.generic_string())
, _blocks_expected(blocks_expected)
, _block_written(blocks_expected)
, _buffer_ptr(std::make_unique<uint64_t[]>(_max_buffer_length)) {
}

void detail::index_writer::write(uint64_t pos) {
prepare_buffer();
uint64_t* buffer = _buffer_ptr.get();
buffer[_current_position - _start_of_buffer_position] = pos;
--_current_position;
if ((_block_written & 0xfffff) == 0) { //periodically print a progress indicator
dlog("block: ${block_written} position in file: ${pos}", ("block_written", _block_written)("pos",pos));
}
--_block_written;
}

void detail::index_writer::prepare_buffer() {
if (_file == nullptr) {
_file.reset(fopen(_block_index_name.c_str(), "w"));
EOS_ASSERT( _file, block_log_exception, "Could not open Block index file at '${blocks_index}'", ("blocks_index", _block_index_name) );
// allocate 8 bytes for each block position to store
const auto full_file_size = buffer_location_to_file_location(_blocks_expected);
auto status = fseek(_file.get(), full_file_size, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not allocate in '${blocks_index}' storage for all the blocks, size: ${size}. Returned status: ${status}", ("blocks_index", _block_index_name)("size", full_file_size)("status", status) );
const auto block_end = file_location_to_buffer_location(full_file_size);
_current_position = block_end - 1;
update_buffer_position();
}

shift_buffer();
}

bool detail::index_writer::shift_buffer() {
if (_current_position >= _start_of_buffer_position) {
return false;
}

const auto file_location_start = buffer_location_to_file_location(_start_of_buffer_position);

auto status = fseek(_file.get(), file_location_start, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not navigate in '${blocks_index}' file_location_start: ${loc}, _start_of_buffer_position: ${_start_of_buffer_position}. Returned status: ${status}", ("blocks_index", _block_index_name)("loc", file_location_start)("_start_of_buffer_position",_start_of_buffer_position)("status", status) );

const auto buffer_size = _end_of_buffer_position - _start_of_buffer_position;
const auto file_size = buffer_location_to_file_location(buffer_size);
uint64_t* buf = _buffer_ptr.get();
auto size = fwrite((void*)buf, file_size, 1, _file.get());
EOS_ASSERT( size == 1, block_log_exception, "Writing Block Index file '${file}' failed at location: ${loc}", ("file", _block_index_name)("loc", file_location_start) );
update_buffer_position();
return true;
}

void detail::index_writer::complete() {
const bool shifted = shift_buffer();
EOS_ASSERT(shifted, block_log_exception, "Failed to write buffer to '${blocks_index}'", ("blocks_index", _block_index_name) );
EOS_ASSERT(_current_position == -1,
block_log_exception,
"Should have written buffer, starting at the 0 index block position, to '${blocks_index}' but instead writing ${pos} position",
("blocks_index", _block_index_name)("pos", _current_position) );
}

void detail::index_writer::update_buffer_position() {
_end_of_buffer_position = _current_position + 1;
if (_end_of_buffer_position < _max_buffer_length) {
_start_of_buffer_position = 0;
}
else {
_start_of_buffer_position = _end_of_buffer_position - _max_buffer_length;
}
}
} } /// eosio::chain
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/block_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ namespace eosio { namespace chain {

static genesis_state extract_genesis_state( const fc::path& data_dir );

static void construct_index(const fc::path& block_file_name, const fc::path& index_file_name);

private:
void open(const fc::path& data_dir);
void construct_index();
Expand Down
Loading