Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge SHiP stability fixes & log splitting & memory usage improvements #628

Merged
merged 60 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
95d99fb
reduce ship memory usage
huangminghuang Dec 15, 2022
f7c341b
Suport hole punching with lock
huangminghuang Dec 16, 2022
2fb428f
Merge branch 'main' into huangminghuang/ship-memory
huangminghuang Dec 16, 2022
ed08c5e
use regular mutex for prune mutex
huangminghuang Dec 16, 2022
2b6f6bb
address PR comments
huangminghuang Dec 20, 2022
e0aa788
fix counter overflow problem
huangminghuang Dec 20, 2022
914c0cb
add 4GB test case
huangminghuang Dec 21, 2022
e370adf
address PR comments
huangminghuang Dec 22, 2022
b90e937
Merge branch 'main' into huangminghuang/ship-memory
huangminghuang Dec 22, 2022
fb4b5d5
remove unused code
huangminghuang Dec 22, 2022
d9fd83b
Merge branch 'huangminghuang/ship-memory' of https://github.com/antel…
huangminghuang Dec 22, 2022
afbb418
move session.hpp
huangminghuang Dec 22, 2022
a33f610
add log catalog files
huangminghuang Dec 23, 2022
ec3c5ed
add ro_stream_for_block overload
huangminghuang Dec 23, 2022
1d3bc44
add ship log partition options
huangminghuang Dec 22, 2022
068ac71
fix truncate
huangminghuang Dec 26, 2022
9329fa3
fix issue #592
huangminghuang Dec 25, 2022
65163a9
fix ship threaded write
huangminghuang Dec 28, 2022
7b2efb8
remove ff
huangminghuang Dec 29, 2022
a9cea07
revert commit for threaded write
huangminghuang Dec 29, 2022
e75b2dd
GH-592 Merge GH-624 changes into split-ship-log branch.
heifner Jan 9, 2023
2970357
GH-592 Debug log for request messages.
heifner Jan 9, 2023
912cc20
Merge branch 'named-tp-except' into GH-592-ship-crash-split-ship-log
heifner Jan 11, 2023
f164461
GH-592 Start thread after listen
heifner Jan 12, 2023
baac068
GH-592 Remove unused code
heifner Jan 12, 2023
ba0f168
Merge remote-tracking branch 'origin/main' into GH-592-ship-crash-spl…
heifner Jan 16, 2023
1381a0f
GH-592 Remove unused methods
heifner Jan 16, 2023
b69a03f
GH-592 WIP Fixing threading issues with state_history_log and sessions
heifner Jan 18, 2023
9aa3ffc
GH-592 Fix bad merge issue which was causing core dump
heifner Jan 18, 2023
21bdaed
GH-592 truncate/prune only called from other public methods that alre…
heifner Jan 18, 2023
fe5e1f7
Merge branch 'main' into GH-592-ship-crash-split-ship-log
heifner Jan 19, 2023
3ed8ac7
Merge branch 'main' into GH-592-ship-crash-split-ship-log
heifner Jan 20, 2023
93173ce
Merge remote-tracking branch 'origin/main' into GH-592-ship-crash-spl…
heifner Jan 25, 2023
6fda644
GH-592 Fix spelling
heifner Jan 26, 2023
0271a56
Merge remote-tracking branch 'origin/main' into GH-592-ship-crash-spl…
heifner Jan 26, 2023
23089d6
GH-592 Move log_* files only used by state_history into state_history…
heifner Jan 26, 2023
34b8e11
GH-592 Fix shutdown issue with session_test
heifner Jan 27, 2023
f149ce6
GH-592 Remove explicit close and just let websocket destructor take c…
heifner Jan 27, 2023
33656e3
GH-592 Remove unneeded use of this->
heifner Jan 27, 2023
dc542bd
GH-592 Increase to have more than one connection to ship
heifner Jan 30, 2023
337b811
GH-592 Add missing return
heifner Jan 30, 2023
0e5cd16
Merge branch 'main' into GH-592-ship-crash-split-ship-log
heifner Jan 30, 2023
ed420c2
GH-592 Minor cleanup from peer review comments
heifner Jan 30, 2023
c137951
Merge branch 'GH-592-ship-crash-split-ship-log' of https://github.com…
heifner Jan 30, 2023
24b5333
GH-592 Move files back as they are used by block log splitting as well
heifner Jan 31, 2023
7e1ef84
Merge remote-tracking branch 'origin/main' into GH-592-ship-crash-spl…
heifner Feb 3, 2023
89963f6
address some PR comments
huangminghuang Feb 6, 2023
94cb1d6
use block_num_t consistently
huangminghuang Feb 6, 2023
ffc8716
update based on PR comment
huangminghuang Feb 6, 2023
54ff3fe
Merge branch 'main' into GH-592-ship-crash-split-ship-log
huangminghuang Feb 7, 2023
273a4a2
Merge branch 'huangminghuang/block_log' into GH-592-ship-crash-split-…
huangminghuang Feb 10, 2023
4fc996d
remove memory mapped file
huangminghuang Feb 10, 2023
86ba65b
make split log safer
huangminghuang Feb 10, 2023
bd66489
Merge branch 'huangminghuang/block_log' into GH-592-ship-crash-split-…
heifner Feb 20, 2023
d12726c
Merge branch 'huangminghuang/block_log' into GH-592-ship-crash-split-…
heifner Feb 21, 2023
63de610
remove unnecessary return
huangminghuang Feb 27, 2023
248ed21
Merge branch 'main' into GH-592-ship-crash-split-ship-log
heifner Mar 1, 2023
6bc3df8
address PR comments
huangminghuang Mar 2, 2023
6080511
GH-592 Add comment
heifner Mar 2, 2023
ef96d95
remove unnecessary using
huangminghuang Mar 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions libraries/chain/include/eosio/chain/log_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ struct log_catalog {
return nullptr;
}

template <typename ...Rest>
auto ro_stream_for_block(uint32_t block_num, Rest&& ...rest) -> std::optional<decltype( std::declval<LogData>().ro_stream_at(0, std::forward<Rest&&>(rest)...))> {
auto pos = get_block_position(block_num);
if (pos) {
return log_data.ro_stream_at(*pos, std::forward<Rest&&>(rest)...);
}
return {};
}

std::optional<block_id_type> id_for_block(uint32_t block_num) {
auto pos = get_block_position(block_num);
if (pos) {
Expand Down
7 changes: 0 additions & 7 deletions libraries/chain/include/eosio/chain/log_data_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@
namespace eosio {
namespace chain {

template <typename T>
T read_buffer(const char* buf) {
T result;
memcpy(&result, buf, sizeof(T));
return result;
}

template <typename T>
T read_data_at(fc::datastream<fc::cfile>& file, std::size_t offset) {
file.seek(offset);
Expand Down
8 changes: 5 additions & 3 deletions libraries/libfc/include/fc/io/datastream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ class datastream<size_t, void> {
};

template <typename Streambuf>
class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::streambuf, Streambuf>>> {
class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::streambuf, std::remove_reference_t<Streambuf>>>> {
private:
Streambuf buf;

using reference_type = std::add_lvalue_reference_t<Streambuf>;

public:
template <typename... Args>
datastream(Args&&... args)
Expand All @@ -120,8 +122,8 @@ class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::str
}
bool remaining() { return buf.in_avail(); }

Streambuf& storage() { return buf; }
const Streambuf& storage() const { return buf; }
reference_type storage() { return buf; }
const reference_type storage() const { return buf; }
};

template <typename Container>
Expand Down
6 changes: 4 additions & 2 deletions libraries/state_history/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ bytes zlib_compress_bytes(const bytes& in) {
return out;
}

bytes zlib_decompress(const bytes& in) {
bytes zlib_decompress(std::string_view data) {
bytes out;
bio::filtering_ostream decomp;
decomp.push(bio::zlib_decompressor());
decomp.push(bio::back_inserter(out));
bio::write(decomp, in.data(), in.size());
bio::write(decomp, data.data(), data.size());
bio::close(decomp);
return out;
}



} // namespace state_history
} // namespace eosio
150 changes: 99 additions & 51 deletions libraries/state_history/create_deltas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ bool include_delta(const chain::protocol_state_object& old, const chain::protoco
return old.activated_protocol_features != curr.activated_protocol_features;
}

std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_snapshot) {
std::vector<table_delta> deltas;
void pack_deltas(boost::iostreams::filtering_ostreambuf& obuf, const chainbase::database& db, bool full_snapshot) {

fc::datastream<boost::iostreams::filtering_ostreambuf&> ds{obuf};

const auto& table_id_index = db.get_index<chain::table_id_multi_index>();
std::map<uint64_t, const chain::table_id_object*> removed_table_id;
for (auto& rem : table_id_index.last_undo_session().removed_values)
Expand All @@ -73,72 +75,118 @@ std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_
return *it->second;
};

auto pack_row = [&](auto& row) { return fc::raw::pack(make_history_serial_wrapper(db, row)); };
auto pack_contract_row = [&](auto& row) {
return fc::raw::pack(make_history_context_wrapper(db, get_table_id(row.t_id._id), row));
auto pack_row = [&](auto& ds, auto& row) { fc::raw::pack(ds, make_history_serial_wrapper(db, row)); };
auto pack_contract_row = [&](auto& ds, auto& row) {
fc::raw::pack(ds, make_history_context_wrapper(db, get_table_id(row.t_id._id), row));
};

auto process_table = [&](auto* name, auto& index, auto& pack_row) {
auto process_table = [&](auto& ds, auto* name, auto& index, auto& pack_row) {

auto pack_row_v0 = [&](auto& ds, bool present, auto& row) {
fc::raw::pack(ds, present);
fc::datastream<size_t> ps;
pack_row(ps, row);
fc::raw::pack(ds, fc::unsigned_int(ps.tellp()));
pack_row(ds, row);
};

if (full_snapshot) {
if (index.indices().empty())
return;
deltas.push_back({});
auto& delta = deltas.back();
delta.name = name;
for (auto& row : index.indices())
delta.rows.obj.emplace_back(true, pack_row(row));

fc::raw::pack(ds, fc::unsigned_int(0)); // table_delta = std::variant<table_delta_v0> and fc::unsigned_int struct_version
fc::raw::pack(ds, name);
fc::raw::pack(ds, fc::unsigned_int(index.indices().size()));
for (auto& row : index.indices()) {
pack_row_v0(ds, true, row);
}
} else {
auto undo = index.last_undo_session();
if (undo.old_values.empty() && undo.new_values.empty() && undo.removed_values.empty())
return;
deltas.push_back({});
auto& delta = deltas.back();
delta.name = name;
for (auto& old : undo.old_values) {
auto& row = index.get(old.id);
if (include_delta(old, row))
delta.rows.obj.emplace_back(true, pack_row(row));
}
for (auto& old : undo.removed_values)
delta.rows.obj.emplace_back(false, pack_row(old));
for (auto& row : undo.new_values) {
delta.rows.obj.emplace_back(true, pack_row(row));
}

if(delta.rows.obj.empty()) {
deltas.pop_back();
size_t num_entries =
std::count_if(undo.old_values.begin(), undo.old_values.end(),
[&index](const auto& old) { return include_delta(old, index.get(old.id)); }) +
std::distance(undo.removed_values.begin(), undo.removed_values.end()) +
std::distance(undo.new_values.begin(), undo.new_values.end());

if (num_entries) {
fc::raw::pack(ds, fc::unsigned_int(0)); // table_delta = std::variant<table_delta_v0> and fc::unsigned_int struct_version
fc::raw::pack(ds, name);
fc::raw::pack(ds, fc::unsigned_int((uint32_t)num_entries));

for (auto& old : undo.old_values) {
auto& row = index.get(old.id);
if (include_delta(old, row)) {
pack_row_v0(ds, true, row);
}
}

for (auto& old : undo.removed_values) {
pack_row_v0(ds, false, old);
}

for (auto& row : undo.new_values) {
pack_row_v0(ds, true, row);
}
}
}
};

process_table("account", db.get_index<chain::account_index>(), pack_row);
process_table("account_metadata", db.get_index<chain::account_metadata_index>(), pack_row);
process_table("code", db.get_index<chain::code_index>(), pack_row);

process_table("contract_table", db.get_index<chain::table_id_multi_index>(), pack_row);
process_table("contract_row", db.get_index<chain::key_value_index>(), pack_contract_row);
process_table("contract_index64", db.get_index<chain::index64_index>(), pack_contract_row);
process_table("contract_index128", db.get_index<chain::index128_index>(), pack_contract_row);
process_table("contract_index256", db.get_index<chain::index256_index>(), pack_contract_row);
process_table("contract_index_double", db.get_index<chain::index_double_index>(), pack_contract_row);
process_table("contract_index_long_double", db.get_index<chain::index_long_double_index>(), pack_contract_row);

process_table("global_property", db.get_index<chain::global_property_multi_index>(), pack_row);
process_table("generated_transaction", db.get_index<chain::generated_transaction_multi_index>(), pack_row);
process_table("protocol_state", db.get_index<chain::protocol_state_multi_index>(), pack_row);

process_table("permission", db.get_index<chain::permission_index>(), pack_row);
process_table("permission_link", db.get_index<chain::permission_link_index>(), pack_row);
auto has_table = [&](auto x) -> int {
auto& index = db.get_index<std::remove_pointer_t<decltype(x)>>();
if (full_snapshot) {
return !index.indices().empty();
} else {
auto undo = index.last_undo_session();
return std::find_if(undo.old_values.begin(), undo.old_values.end(),
[&index](const auto& old) { return include_delta(old, index.get(old.id)); }) != undo.old_values.end() ||
!undo.removed_values.empty() || !undo.new_values.empty();
}
};

process_table("resource_limits", db.get_index<chain::resource_limits::resource_limits_index>(), pack_row);
process_table("resource_usage", db.get_index<chain::resource_limits::resource_usage_index>(), pack_row);
process_table("resource_limits_state", db.get_index<chain::resource_limits::resource_limits_state_index>(),
int num_tables = std::apply(
[&has_table](auto... args) { return (has_table(args) + ... ); },
std::tuple<chain::account_index*, chain::account_metadata_index*, chain::code_index*,
chain::table_id_multi_index*, chain::key_value_index*, chain::index64_index*, chain::index128_index*,
chain::index256_index*, chain::index_double_index*, chain::index_long_double_index*,
chain::global_property_multi_index*, chain::generated_transaction_multi_index*,
chain::protocol_state_multi_index*, chain::permission_index*, chain::permission_link_index*,
chain::resource_limits::resource_limits_index*, chain::resource_limits::resource_usage_index*,
chain::resource_limits::resource_limits_state_index*,
chain::resource_limits::resource_limits_config_index*>());

fc::raw::pack(ds, fc::unsigned_int(num_tables));

process_table(ds, "account", db.get_index<chain::account_index>(), pack_row);
process_table(ds, "account_metadata", db.get_index<chain::account_metadata_index>(), pack_row);
process_table(ds, "code", db.get_index<chain::code_index>(), pack_row);

process_table(ds, "contract_table", db.get_index<chain::table_id_multi_index>(), pack_row);
process_table(ds, "contract_row", db.get_index<chain::key_value_index>(), pack_contract_row);
process_table(ds, "contract_index64", db.get_index<chain::index64_index>(), pack_contract_row);
process_table(ds, "contract_index128", db.get_index<chain::index128_index>(), pack_contract_row);
process_table(ds, "contract_index256", db.get_index<chain::index256_index>(), pack_contract_row);
process_table(ds, "contract_index_double", db.get_index<chain::index_double_index>(), pack_contract_row);
process_table(ds, "contract_index_long_double", db.get_index<chain::index_long_double_index>(), pack_contract_row);

process_table(ds, "global_property", db.get_index<chain::global_property_multi_index>(), pack_row);
process_table(ds, "generated_transaction", db.get_index<chain::generated_transaction_multi_index>(), pack_row);
process_table(ds, "protocol_state", db.get_index<chain::protocol_state_multi_index>(), pack_row);

process_table(ds, "permission", db.get_index<chain::permission_index>(), pack_row);
process_table(ds, "permission_link", db.get_index<chain::permission_link_index>(), pack_row);

process_table(ds, "resource_limits", db.get_index<chain::resource_limits::resource_limits_index>(), pack_row);
process_table(ds, "resource_usage", db.get_index<chain::resource_limits::resource_usage_index>(), pack_row);
process_table(ds, "resource_limits_state", db.get_index<chain::resource_limits::resource_limits_state_index>(),
pack_row);
process_table("resource_limits_config", db.get_index<chain::resource_limits::resource_limits_config_index>(),
process_table(ds, "resource_limits_config", db.get_index<chain::resource_limits::resource_limits_config_index>(),
pack_row);

return deltas;
obuf.pubsync();

}


} // namespace state_history
} // namespace eosio
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace state_history {
using chain::bytes;

bytes zlib_compress_bytes(const bytes& in);
bytes zlib_decompress(const bytes& in);
bytes zlib_decompress(std::string_view);

} // namespace state_history
} // namespace eosio
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#pragma once

#include <eosio/state_history/types.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>

namespace eosio {
namespace state_history {

std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_snapshot);
void pack_deltas(boost::iostreams::filtering_ostreambuf& ds, const chainbase::database& db, bool full_snapshot);


} // namespace state_history
} // namespace eosio
Loading