Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #7763 from EOSIO/state-history-cfile
Browse files Browse the repository at this point in the history
Use fc::cfile instead of std::fstream for state_history
  • Loading branch information
heifner authored Aug 15, 2019
2 parents 725d4f0 + 33b93bd commit 8d84a95
Showing 1 changed file with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/types.hpp>
#include <fc/log/logger.hpp>
#include <fc/io/cfile.hpp>

namespace eosio {

Expand Down Expand Up @@ -47,8 +48,8 @@ class state_history_log {
const char* const name = "";
std::string log_filename;
std::string index_filename;
std::fstream log;
std::fstream index;
fc::cfile log;
fc::cfile index;
uint32_t _begin_block = 0;
uint32_t _end_block = 0;
chain::block_id_type last_block_id;
Expand Down Expand Up @@ -104,28 +105,28 @@ class state_history_log {

if (block_num < _end_block)
truncate(block_num);
log.seekg(0, std::ios_base::end);
uint64_t pos = log.tellg();
log.seek_end(0);
uint64_t pos = log.tellp();
write_header(header);
write_payload(log);
uint64_t end = log.tellg();
uint64_t end = log.tellp();
EOS_ASSERT(end == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception,
"wrote payload with incorrect size to ${name}.log", ("name", name));
log.write((char*)&pos, sizeof(pos));

index.seekg(0, std::ios_base::end);
index.seek_end(0);
index.write((char*)&pos, sizeof(pos));
if (_begin_block == _end_block)
_begin_block = block_num;
_end_block = block_num + 1;
last_block_id = header.block_id;
}

// returns stream positioned at payload
std::fstream& get_entry(uint32_t block_num, state_history_log_header& header) {
// returns cfile positioned at payload
fc::cfile& get_entry(uint32_t block_num, state_history_log_header& header) {
EOS_ASSERT(block_num >= _begin_block && block_num < _end_block, chain::plugin_exception,
"read non-existing block in ${name}.log", ("name", name));
log.seekg(get_pos(block_num));
log.seek(get_pos(block_num));
read_header(header);
return log;
}
Expand All @@ -140,13 +141,13 @@ class state_history_log {
bool get_last_block(uint64_t size) {
state_history_log_header header;
uint64_t suffix;
log.seekg(size - sizeof(suffix));
log.seek(size - sizeof(suffix));
log.read((char*)&suffix, sizeof(suffix));
if (suffix > size || suffix + state_history_log_header_serial_size > size) {
elog("corrupt ${name}.log (2)", ("name", name));
return false;
}
log.seekg(suffix);
log.seek(suffix);
read_header(header, false);
if (!is_ship(header.magic) || !is_ship_supported_version(header.magic) ||
suffix + state_history_log_header_serial_size + header.payload_size + sizeof(suffix) != size) {
Expand All @@ -170,7 +171,7 @@ class state_history_log {
state_history_log_header header;
if (pos + state_history_log_header_serial_size > size)
break;
log.seekg(pos);
log.seek(pos);
read_header(header, false);
uint64_t suffix;
if (!is_ship(header.magic) || !is_ship_supported_version(header.magic) || header.payload_size > size ||
Expand All @@ -179,7 +180,7 @@ class state_history_log {
"${name}.log has an unsupported version", ("name", name));
break;
}
log.seekg(pos + state_history_log_header_serial_size + header.payload_size);
log.seek(pos + state_history_log_header_serial_size + header.payload_size);
log.read((char*)&suffix, sizeof(suffix));
if (suffix != pos)
break;
Expand All @@ -191,17 +192,18 @@ class state_history_log {
}
log.flush();
boost::filesystem::resize_file(log_filename, pos);
log.sync();
log.flush();
EOS_ASSERT(get_last_block(pos), chain::plugin_exception, "recover ${name}.log failed", ("name", name));
}

void open_log() {
log.open(log_filename, std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
log.seekg(0, std::ios_base::end);
uint64_t size = log.tellg();
log.set_file_path( log_filename );
log.open( "a+b" ); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app
log.seek_end(0);
uint64_t size = log.tellp();
if (size >= state_history_log_header_serial_size) {
state_history_log_header header;
log.seekg(0);
log.seek(0);
read_header(header, false);
EOS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic) &&
state_history_log_header_serial_size + header.payload_size + sizeof(uint64_t) <= size,
Expand All @@ -218,30 +220,31 @@ class state_history_log {
}

void open_index() {
index.open(index_filename, std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
index.seekg(0, std::ios_base::end);
if (index.tellg() == (static_cast<int>(_end_block) - _begin_block) * sizeof(uint64_t))
index.set_file_path( index_filename );
index.open( "a+b" ); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app
index.seek_end(0);
if (index.tellp() == (static_cast<int>(_end_block) - _begin_block) * sizeof(uint64_t))
return;
ilog("Regenerate ${name}.index", ("name", name));
index.close();
index.open(index_filename, std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::trunc);
index.open( "w+b" ); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::trunc

log.seekg(0, std::ios_base::end);
uint64_t size = log.tellg();
log.seek_end(0);
uint64_t size = log.tellp();
uint64_t pos = 0;
uint32_t num_found = 0;
while (pos < size) {
state_history_log_header header;
EOS_ASSERT(pos + state_history_log_header_serial_size <= size, chain::plugin_exception,
"corrupt ${name}.log (6)", ("name", name));
log.seekg(pos);
log.seek(pos);
read_header(header, false);
uint64_t suffix_pos = pos + state_history_log_header_serial_size + header.payload_size;
uint64_t suffix;
EOS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic) &&
suffix_pos + sizeof(suffix) <= size,
chain::plugin_exception, "corrupt ${name}.log (7)", ("name", name));
log.seekg(suffix_pos);
log.seek(suffix_pos);
log.read((char*)&suffix, sizeof(suffix));
// ilog("block ${b} at ${pos}-${end} suffix=${suffix} file_size=${fs}",
// ("b", header.block_num)("pos", pos)("end", suffix_pos + sizeof(suffix))("suffix", suffix)("fs", size));
Expand All @@ -258,7 +261,7 @@ class state_history_log {

uint64_t get_pos(uint32_t block_num) {
uint64_t pos;
index.seekg((block_num - _begin_block) * sizeof(pos));
index.seek((block_num - _begin_block) * sizeof(pos));
index.read((char*)&pos, sizeof(pos));
return pos;
}
Expand All @@ -269,22 +272,22 @@ class state_history_log {
uint64_t num_removed = 0;
if (block_num <= _begin_block) {
num_removed = _end_block - _begin_block;
log.seekg(0);
index.seekg(0);
log.seek(0);
index.seek(0);
boost::filesystem::resize_file(log_filename, 0);
boost::filesystem::resize_file(index_filename, 0);
_begin_block = _end_block = 0;
} else {
num_removed = _end_block - block_num;
uint64_t pos = get_pos(block_num);
log.seekg(0);
index.seekg(0);
log.seek(0);
index.seek(0);
boost::filesystem::resize_file(log_filename, pos);
boost::filesystem::resize_file(index_filename, (block_num - _begin_block) * sizeof(uint64_t));
_end_block = block_num;
}
log.sync();
index.sync();
log.flush();
index.flush();
ilog("fork or replay: removed ${n} blocks from ${name}.log", ("n", num_removed)("name", name));
}
}; // state_history_log
Expand Down

0 comments on commit 8d84a95

Please sign in to comment.