Skip to content

Commit

Permalink
Merge pull request #2623 from bitshares/pr-2464-es-obj-del
Browse files Browse the repository at this point in the history
Delete objects from ES before loading from object database
  • Loading branch information
abitmore authored Aug 8, 2022
2 parents e2f2790 + d0ead78 commit 8e98c4c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 17 deletions.
56 changes: 42 additions & 14 deletions libraries/plugins/es_objects/es_objects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ class es_objects_plugin_impl
{ index_database( ids, action_type::deletion ); }

void index_database(const vector<object_id_type>& ids, action_type action);
void sync_db();
void remove_from_database( const object_id_type& id, const plugin_options::object_options& opt );
/// Load all data from the object database into ES
void sync_db( bool delete_before_load = false );
/// Delete one object from ES
void delete_from_database( const object_id_type& id, const plugin_options::object_options& opt );
/// Delete all objects of the specified type from ES
void delete_all_from_database( const plugin_options::object_options& opt ) const;

es_objects_plugin& _self;
plugin_options _options;
Expand Down Expand Up @@ -145,19 +149,28 @@ struct data_loader
}

template<typename ObjType>
void load( const es_objects_plugin_impl::plugin_options::object_options& opt )
void load( const es_objects_plugin_impl::plugin_options::object_options& opt,
bool force_delete = false )
{
if( !opt.enabled )
return;

// If no_delete or store_updates is true, do not delete
if( force_delete || !( opt.no_delete || opt.store_updates ) )
{
ilog( "Deleting all data in index " + my->_options.index_prefix + opt.index_name );
my->delete_all_from_database( opt );
}

ilog( "Loading data into index " + my->_options.index_prefix + opt.index_name );
db.get_index( ObjType::space_id, ObjType::type_id ).inspect_all_objects(
[this, &opt](const graphene::db::object &o) {
my->prepareTemplate( static_cast<const ObjType&>(o), opt );
});
}
};

void es_objects_plugin_impl::sync_db()
void es_objects_plugin_impl::sync_db( bool delete_before_load )
{
ilog("elasticsearch OBJECTS: loading data from the object database (chain state)");

Expand All @@ -168,13 +181,15 @@ void es_objects_plugin_impl::sync_db()

data_loader loader( this );

loader.load<account_object >( _options.accounts );
loader.load<asset_object >( _options.assets );
loader.load<asset_bitasset_data_object >( _options.asset_bitasset );
loader.load<account_balance_object >( _options.balances );
loader.load<proposal_object >( _options.proposals );
loader.load<limit_order_object >( _options.limit_orders );
loader.load<budget_record_object >( _options.budget );
loader.load<account_object >( _options.accounts, delete_before_load );
loader.load<asset_object >( _options.assets, delete_before_load );
loader.load<asset_bitasset_data_object >( _options.asset_bitasset, delete_before_load );
loader.load<account_balance_object >( _options.balances, delete_before_load );
loader.load<proposal_object >( _options.proposals, delete_before_load );
loader.load<limit_order_object >( _options.limit_orders, delete_before_load );
loader.load<budget_record_object >( _options.budget, delete_before_load );

ilog("elasticsearch OBJECTS: done loading data from the object database (chain state)");
}

void es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, action_type action)
Expand Down Expand Up @@ -213,7 +228,7 @@ void es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, a
continue;
const auto& opt = itr->second;
if( action_type::deletion == action )
remove_from_database( value, opt );
delete_from_database( value, opt );
else
{
switch( itr->first )
Expand Down Expand Up @@ -247,7 +262,7 @@ void es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, a

}

void es_objects_plugin_impl::remove_from_database(
void es_objects_plugin_impl::delete_from_database(
const object_id_type& id, const es_objects_plugin_impl::plugin_options::object_options& opt )
{
if( opt.no_delete )
Expand All @@ -266,6 +281,17 @@ void es_objects_plugin_impl::remove_from_database(
send_bulk_if_ready();
}

void es_objects_plugin_impl::delete_all_from_database( const plugin_options::object_options& opt ) const
{
// Note:
// 1. The _delete_by_query API deletes the data but keeps the index mapping, so the function is OK.
// Simply deleting the index is probably faster, but it requires the "delete_index" permission, and
// may probably mess up the index mapping and other existing settings.
// Don't know if there is a good way to only delete objects that do not exist in the object database.
// 2. We don't check the return value here, it's probably OK
es->query( _options.index_prefix + opt.index_name + "/_delete_by_query", R"({"query":{"match_all":{}}})" );
}

template<typename T>
void es_objects_plugin_impl::prepareTemplate(
const T& blockchain_object, const es_objects_plugin_impl::plugin_options::object_options& opt )
Expand Down Expand Up @@ -463,7 +489,9 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable

void es_objects_plugin::plugin_startup()
{
if( my->_options.sync_db_on_startup || 0 == database().head_block_num() )
if( 0 == database().head_block_num() )
my->sync_db( true );
else if( my->_options.sync_db_on_startup )
my->sync_db();
}

Expand Down
4 changes: 2 additions & 2 deletions tests/common/database_fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ std::shared_ptr<boost::program_options::variables_map> database_fixture_base::in
fixture.app.register_plugin<graphene::es_objects::es_objects_plugin>(true);

fc::set_option( options, "es-objects-elasticsearch-url", GRAPHENE_TESTING_ES_URL );
fc::set_option( options, "es-objects-bulk-replay", uint32_t(2) );
fc::set_option( options, "es-objects-bulk-sync", uint32_t(2) );
fc::set_option( options, "es-objects-bulk-replay", uint32_t(1) );
fc::set_option( options, "es-objects-bulk-sync", uint32_t(1) );
fc::set_option( options, "es-objects-proposals", true );
fc::set_option( options, "es-objects-accounts", true );
fc::set_option( options, "es-objects-assets", true );
Expand Down
36 changes: 35 additions & 1 deletion tests/elasticsearch/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
if(delete_objects) { // all records deleted

// asset and bitasset
create_bitasset("USD", account_id_type());
asset_id_type usd_id = create_bitasset("USD", account_id_type()).id;
generate_block();

string query = "{ \"query\" : { \"bool\" : { \"must\" : [{\"match_all\": {}}] } } }";
Expand Down Expand Up @@ -268,8 +268,28 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
auto bitasset_object_id = j["hits"]["hits"][size_t(0)]["_source"]["object_id"].as_string();
BOOST_CHECK_EQUAL(bitasset_object_id, bitasset_data_id);

// create a limit order that expires at the next maintenance time
create_sell_order( account_id_type(), asset(1), asset(1, usd_id),
db.get_dynamic_global_properties().next_maintenance_time );
generate_block();

es.endpoint = es.index_prefix + "limitorder/_doc/_count";
es.query = "";
fc::wait_for( ES_WAIT_TIME, [&]() {
res = graphene::utilities::getEndPoint(es);
j = fc::json::from_string(res);
if( !j.is_object() )
return false;
const auto& obj = j.get_object();
if( obj.find("count") == obj.end() )
return false;
total = obj["count"].as_string();
return (total == "1");
});

// maintenance, for budget records
generate_blocks( db.get_dynamic_global_properties().next_maintenance_time );
generate_block();

es.endpoint = es.index_prefix + "budget/_doc/_count";
es.query = "";
Expand All @@ -285,6 +305,20 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
return (total == "1"); // new record inserted at the first maintenance block
});

es.endpoint = es.index_prefix + "limitorder/_doc/_count";
es.query = "";
fc::wait_for( ES_WAIT_TIME, [&]() {
res = graphene::utilities::getEndPoint(es);
j = fc::json::from_string(res);
if( !j.is_object() )
return false;
const auto& obj = j.get_object();
if( obj.find("count") == obj.end() )
return false;
total = obj["count"].as_string();
return (total == "0"); // the limit order expired, so the object is removed
});

}
}
catch (fc::exception &e) {
Expand Down

0 comments on commit 8e98c4c

Please sign in to comment.