Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Improve nodeos make-index speeds #7607

Merged
305 changes: 277 additions & 28 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
#include <eosio/chain/block_log.hpp>
#include <eosio/chain/exceptions.hpp>
#include <fstream>
#include <fc/bitutil.hpp>
#include <fc/io/raw.hpp>

#define LOG_READ (std::ios::in | std::ios::binary)
#define LOG_WRITE (std::ios::out | std::ios::binary | std::ios::app)
#define LOG_WRITE_NEW (std::ios::out | std::ios::binary)
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
#define LOG_RW ( std::ios::in | std::ios::out | std::ios::binary )

namespace eosio { namespace chain {
Expand Down Expand Up @@ -68,6 +70,63 @@ namespace eosio { namespace chain {

open_files = true;
}

class reverse_iterator {
public:
reverse_iterator();
~reverse_iterator();
// open a block log file and return the total number of blocks in it
uint32_t open(const fc::path& block_file_name);
uint64_t previous();
uint32_t version() const { return _version; }
uint32_t first_block_num() const { return _first_block_num; }
private:
void update_buffer();

FILE* _file = nullptr;
uint32_t _version = 0;
uint32_t _first_block_num = 0;
uint32_t _last_block_num = 0;
uint32_t _blocks_found = 0;
uint32_t _blocks_expected = 0;
uint64_t _current_position_in_file = 0;
uint64_t _eof_position_in_file = 0;
uint64_t _end_of_buffer_position = _unset_position;
uint64_t _start_of_buffer_position = 0;
std::unique_ptr<char[]> _buffer_ptr;
std::string _block_file_name;
constexpr static int64_t _unset_position = -1;
constexpr static uint32_t _buf_len = 1U << 24;
constexpr static uint64_t _position_size = sizeof(_current_position_in_file);
constexpr static int _blknum_offset_from_pos = 14; //offset from start of block to 4 byte block number, valid for the only allowed versions (1 & 2)
};

constexpr uint64_t buffer_location_to_file_location(uint32_t buffer_location) { return buffer_location << 3; }
constexpr uint32_t file_location_to_buffer_location(uint32_t file_location) { return file_location >> 3; }

class index_writer {
public:
index_writer(const fc::path& block_index_name, uint32_t blocks_expected, bool report_status);
~index_writer();
void write(uint64_t pos);
void complete();
void calculate_buffer_position();
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
private:
void prepare_buffer();
bool shift_buffer();

FILE* _file = nullptr;
const std::string _block_index_name;
const uint32_t _blocks_expected;
uint32_t _block_written;
bool _report_status;
std::unique_ptr<uint64_t[]> _buffer_ptr;
int64_t _current_position;
int64_t _start_of_buffer_position;
int64_t _end_of_buffer_position;
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
constexpr static uint64_t _buffer_bytes = 1U << 22;
constexpr static uint64_t _max_buffer_length = file_location_to_buffer_location(_buffer_bytes);
};
}

block_log::block_log(const fc::path& data_dir)
Expand Down Expand Up @@ -317,44 +376,37 @@ namespace eosio { namespace chain {

my->reopen();

uint64_t end_pos;

my->block_stream.seekg(-sizeof( uint64_t), std::ios::end);
my->block_stream.read((char*)&end_pos, sizeof(end_pos));
my->close();

if( end_pos == npos ) {
ilog( "Block log contains no blocks. No need to construct index." );
return;
}
block_log::construct_index(my->block_file, my->index_file);

signed_block tmp;
my->reopen();
} // construct_index

uint64_t pos = 0;
if (my->version == 1) {
pos = 4; // Skip version which should have already been checked.
} else {
pos = 8; // Skip version and first block offset which should have already been checked
void block_log::construct_index(const fc::path& block_file_name, const fc::path& index_file_name, bool provide_index_log_status) {
detail::reverse_iterator block_log_iter;

const uint32_t num_blocks = block_log_iter.open(block_file_name);
if (provide_index_log_status) {
std::cout << "\nblock log version= " << block_log_iter.version() << '\n';
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
}
my->block_stream.seekg(pos);

genesis_state gs;
fc::raw::unpack(my->block_stream, gs);
if (num_blocks == 0) {
return;
}

// skip the totem
if (my->version > 1) {
uint64_t totem;
my->block_stream.read((char*) &totem, sizeof(totem));
if (provide_index_log_status) {
std::cout << "\nfirst block= " << block_log_iter.first_block_num() << ", last block= " << (block_log_iter.first_block_num() + num_blocks) << "\n\n";
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
}

my->index_stream.seekp(0, std::ios::end);
while( pos < end_pos ) {
fc::raw::unpack(my->block_stream, tmp);
my->block_stream.read((char*)&pos, sizeof(pos));
if(tmp.block_num() % 1000 == 0)
ilog( "Block log index reconstructed for block ${n}", ("n", tmp.block_num()));
my->index_stream.write((char*)&pos, sizeof(pos));
detail::index_writer index(index_file_name, num_blocks, provide_index_log_status);
uint64_t position;
while ((position = block_log_iter.previous()) != npos) {
index.write(position);
}
} // construct_index
index.complete();
}

fc::path block_log::repair_log( const fc::path& data_dir, uint32_t truncate_at_block ) {
ilog("Recovering Block Log...");
Expand Down Expand Up @@ -543,4 +595,201 @@ namespace eosio { namespace chain {
return gs;
}

detail::reverse_iterator::reverse_iterator()
: _buffer_ptr(std::make_unique<char[]>(_buf_len)) {
}

detail::reverse_iterator::~reverse_iterator() {
if (_file != nullptr) {
fclose(_file);
}
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
}

uint32_t detail::reverse_iterator::open(const fc::path& block_file_name) {
_block_file_name = block_file_name.generic_string();
_file = fopen(_block_file_name.c_str(), "r");
EOS_ASSERT( _file != nullptr, block_log_exception, "Could not open Block log file at '${blocks_log}'", ("blocks_log", _block_file_name) );
_end_of_buffer_position = _unset_position;

//read block log to see if version 1 or 2 and get first blocknum (implicit 1 if version 1)
_version = 0;
auto size = fread((char*)&_version, sizeof(_version), 1, _file);
EOS_ASSERT( size == 1, block_log_exception, "Block log file at '${blocks_log}' could not be read.", ("file", _block_file_name) );
EOS_ASSERT( _version == 1 || _version == 2, block_log_unsupported_version, "block log version ${v} is not supported", ("v", _version));
if (_version == 1) {
_first_block_num = 1;
}
else {
size = fread((char*)&_first_block_num, sizeof(_first_block_num), 1, _file);
EOS_ASSERT( size == 1, block_log_exception, "Block log file at '${blocks_log}' not formatted consistently with version ${v}.", ("file", _block_file_name)("v", _version) );
}

auto status = fseek(_file, 0, SEEK_END);
EOS_ASSERT( status == 0, block_log_exception, "Could not open Block log file at '${blocks_log}'", ("blocks_log", _block_file_name) );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include status value in assert message.


_eof_position_in_file = ftell(_file);
EOS_ASSERT( _eof_position_in_file > 0, block_log_exception, "Block log file at '${blocks_log}' could not be read.", ("blocks_log", _block_file_name) );
_current_position_in_file = _eof_position_in_file - _position_size;

update_buffer();

_blocks_found = 0;
char* buf = _buffer_ptr.get();
const uint32_t index_of_pos = _current_position_in_file - _start_of_buffer_position;
const uint64_t block_pos = *reinterpret_cast<uint64_t*>(buf + index_of_pos);
uint32_t bnum;
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved

if (block_pos == block_log::npos) {
return 0;
}

if (block_pos >= _start_of_buffer_position) {
const uint32_t index_of_block = block_pos - _start_of_buffer_position;
bnum = *reinterpret_cast<uint32_t*>(buf + index_of_block + _blknum_offset_from_pos); //block number of previous block (is big endian)
}
else {
const auto blknum_offset_pos = block_pos + _blknum_offset_from_pos;
auto status = fseek(_file, blknum_offset_pos, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not seek in '${blocks_log}' to position: ${pos}", ("blocks_log", _block_file_name)("pos", blknum_offset_pos) );
auto size = fread((void*)&bnum, sizeof(bnum), 1, _file);
EOS_ASSERT( size == 1, block_log_exception, "Could not read in '${blocks_log}' at position: ${pos}", ("blocks_log", _block_file_name)("pos", blknum_offset_pos) );
}
_last_block_num = fc::endian_reverse_u32(bnum) + 1; //convert from big endian to little endian and add 1
_blocks_expected = _last_block_num - _first_block_num + 1;
return _blocks_expected;
}

uint64_t detail::reverse_iterator::previous() {
EOS_ASSERT( _current_position_in_file != block_log::npos,
block_log_exception,
"Block log file at '${blocks_log}' first block already returned by former call to previous(), it is no longer valid to call this function.", ("blocks_log", _block_file_name) );

if (_start_of_buffer_position > _current_position_in_file) {
update_buffer();
}

char* buf = _buffer_ptr.get();
auto offset = _current_position_in_file - _start_of_buffer_position;
uint64_t block_location_in_file = *reinterpret_cast<uint64_t*>(buf + offset);

++_blocks_found;
if (block_location_in_file == block_log::npos) {
_current_position_in_file = block_location_in_file;
EOS_ASSERT( _blocks_found != _blocks_expected,
block_log_exception,
"Block log file at '${blocks_log}' formatting indicated last block: ${last_block_num}, first block: ${first_block_num}, but found ${num} blocks",
("blocks_log", _block_file_name)("last_block_num", _last_block_num)("first_block_num", _first_block_num)("num", _blocks_found) );
}
else {
const uint64_t previous_position_in_file = _current_position_in_file;
_current_position_in_file = block_location_in_file - _position_size;
EOS_ASSERT( _current_position_in_file < previous_position_in_file,
block_log_exception,
"Block log file at '${blocks_log}' formatting is incorrect, indicates position later location in file: ${pos}, which was retrieved at: ${orig_pos}.",
("blocks_log", _block_file_name)("pos", _current_position_in_file)("orig_pos", previous_position_in_file) );
if (_version == 1 && _blocks_found != _blocks_expected) {
_current_position_in_file = block_location_in_file = block_log::npos;
}
}

return block_location_in_file;
}

void detail::reverse_iterator::update_buffer() {
EOS_ASSERT( _current_position_in_file != block_log::npos, block_log_exception, "Block log file not setup properly" );

// since we need to read in a new section, just need to ensure the next position is at the very end of the buffer
_end_of_buffer_position = _current_position_in_file + _position_size;
if (_end_of_buffer_position < _buf_len) {
_start_of_buffer_position = 0;
}
else {
_start_of_buffer_position = _end_of_buffer_position - _buf_len;
}

auto status = fseek(_file, _start_of_buffer_position, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not seek in '${blocks_log}' to position: ${pos}", ("blocks_log", _block_file_name)("pos", _start_of_buffer_position) );
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
char* buf = _buffer_ptr.get();
auto size = fread((void*)buf, (_end_of_buffer_position - _start_of_buffer_position), 1, _file);//read tail of blocks.log file into buf
EOS_ASSERT( size == 1, block_log_exception, "blocks.log read fails" );
}

detail::index_writer::index_writer(const fc::path& block_index_name, uint32_t blocks_expected, bool report_status)
: _block_index_name(block_index_name.generic_string())
, _blocks_expected(blocks_expected)
, _block_written(blocks_expected)
, _report_status(report_status)
, _buffer_ptr(std::make_unique<uint64_t[]>(_max_buffer_length)) {
}

detail::index_writer::~index_writer() {
if (_file != nullptr) {
fclose(_file);
}
}

void detail::index_writer::write(uint64_t pos) {
prepare_buffer();
uint64_t* buffer = _buffer_ptr.get();
buffer[_current_position - _start_of_buffer_position] = pos;
--_current_position;
if (_report_status && (_block_written & 0xfffff) == 0) { //periodically print a progress indicator
std::cout << "block: " << std::setw(10) << _block_written << " position in file " << std::setw(14) << pos << '\n';
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
}
--_block_written;
}

void detail::index_writer::prepare_buffer() {
if (_file == nullptr) {
_file = fopen(_block_index_name.c_str(), "w");
EOS_ASSERT( _file != nullptr, block_log_exception, "Could not open Block index file at '${blocks_index}'", ("blocks_index", _block_index_name) );
// allocate 8 bytes for each block position to store
const auto full_file_size = buffer_location_to_file_location(_blocks_expected);
auto status = fseek(_file, full_file_size, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not allocate in '${blocks_index}' storage for all the blocks, size: ${size}", ("blocks_index", _block_index_name)("size", full_file_size) );
const auto block_end = file_location_to_buffer_location(full_file_size);
_current_position = block_end - 1;
calculate_buffer_position();
}

shift_buffer();
}

bool detail::index_writer::shift_buffer() {
if (_current_position >= _start_of_buffer_position) {
return false;
}

const auto file_location_start = buffer_location_to_file_location(_start_of_buffer_position);

auto status = fseek(_file, file_location_start, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not navigate in '${blocks_index}' file_location_start: ${loc}, _start_of_buffer_position: ${_start_of_buffer_position}", ("blocks_index", _block_index_name)("loc", file_location_start)("_start_of_buffer_position",_start_of_buffer_position) );
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved

const auto buffer_size = _end_of_buffer_position - _start_of_buffer_position;
const auto file_size = buffer_location_to_file_location(buffer_size);
uint64_t* buf = _buffer_ptr.get();
auto size = fwrite((void*)buf, file_size, 1, _file);
EOS_ASSERT( size == 1, block_log_exception, "Writing Block Index file '${file}' failed at location: ${loc}", ("file", _block_index_name)("loc", file_location_start) );
calculate_buffer_position();
return true;
}

void detail::index_writer::complete() {
const bool shifted = shift_buffer();
EOS_ASSERT(shifted, block_log_exception, "Failed to write buffer to '${blocks_index}'", ("blocks_index", _block_index_name) );
EOS_ASSERT(_current_position == -1,
block_log_exception,
"Should have written buffer, starting at the 0 index block position, to '${blocks_index}' but instead writing ${pos} position",
("blocks_index", _block_index_name)("pos", _current_position) );
}

void detail::index_writer::calculate_buffer_position() {
_end_of_buffer_position = _current_position + 1;
if (_end_of_buffer_position < _max_buffer_length) {
_start_of_buffer_position = 0;
}
else {
_start_of_buffer_position = _end_of_buffer_position - _max_buffer_length;
}
}
} } /// eosio::chain
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/block_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ namespace eosio { namespace chain {

static genesis_state extract_genesis_state( const fc::path& data_dir );

static void construct_index(const fc::path& block_file_name, const fc::path& index_file_name, bool provide_index_log_status = false);

private:
void open(const fc::path& data_dir);
void construct_index();
Expand Down
13 changes: 11 additions & 2 deletions programs/eosio-blocklog/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,10 +707,19 @@ int main(int argc, char** argv) {
return 0;
}
if (blog.make_index) {
bfs::path out_file = "blocks.index";
const bfs::path blocks_dir = vmap.at("blocks-dir").as<bfs::path>();
bfs::path out_file = blocks_dir / "blocks.index";
const bfs::path block_file = blocks_dir / "blocks.log";

if (vmap.count("output-file") > 0)
out_file = vmap.at("output-file").as<bfs::path>();
return make_index(vmap.at("blocks-dir").as<bfs::path>(), out_file);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The make_index function can be removed, right?


report_time rt("making index");
std::cout << "\nWill read existing blocks.log file " << block_file << '\n';
std::cout << "Will write new blocks.index file " << out_file << '\n';
block_log::construct_index(block_file.generic_string(), out_file.generic_string(), true);
rt.report();
return 0;
}
//else print blocks.log as JSON
blog.initialize(vmap);
Expand Down