Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

elasticsearch history api #1682 #1725

Merged
merged 16 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/app/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_library( graphene_app

# need to link graphene_debug_witness because plugins aren't sufficiently isolated #246
target_link_libraries( graphene_app
graphene_market_history graphene_account_history graphene_grouped_orders
graphene_market_history graphene_account_history graphene_elasticsearch graphene_grouped_orders
graphene_api_helper_indexes
graphene_chain fc graphene_db graphene_net graphene_utilities graphene_debug_witness )
target_include_directories( graphene_app
Expand Down
12 changes: 12 additions & 0 deletions libraries/app/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,18 @@ namespace graphene { namespace app {
start = node.operation_id;
} catch(...) { return result; }

if(_app.is_plugin_enabled("elasticsearch")) {
auto es = _app.get_plugin<elasticsearch::elasticsearch_plugin>("elasticsearch");
if(es.get()->get_running_mode() != elasticsearch::mode::only_save) {
if(!_app.elasticsearch_thread)
_app.elasticsearch_thread= std::make_shared<fc::thread>("elasticsearch");

return _app.elasticsearch_thread->async([&es, &account, &stop, &limit, &start]() {
return es->get_account_history(account, stop, limit, start);
}, "thread invoke for method " BOOST_PP_STRINGIZE(method_name)).wait();
}
}

const auto& hist_idx = db.get_index_type<account_transaction_history_index>();
const auto& by_op_idx = hist_idx.indices().get<by_op>();
auto index_start = by_op_idx.begin();
Expand Down
5 changes: 5 additions & 0 deletions libraries/app/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,11 @@ std::shared_ptr<abstract_plugin> application::get_plugin(const string& name) con
return my->_active_plugins[name];
}

bool application::is_plugin_enabled(const string& name) const
{
return !(my->_active_plugins.find(name) == my->_active_plugins.end());
}

net::node_ptr application::p2p_node()
{
return my->_p2p_network;
Expand Down
2 changes: 2 additions & 0 deletions libraries/app/include/graphene/app/api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

#include <graphene/grouped_orders/grouped_orders_plugin.hpp>

#include <graphene/elasticsearch/elasticsearch_plugin.hpp>

#include <graphene/debug_witness/debug_api.hpp>

#include <graphene/net/node.hpp>
Expand Down
6 changes: 5 additions & 1 deletion libraries/app/include/graphene/app/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ namespace graphene { namespace app {

void enable_plugin( const string& name );

private:
bool is_plugin_enabled(const string& name) const;

std::shared_ptr<fc::thread> elasticsearch_thread;

private:
void add_available_plugin( std::shared_ptr<abstract_plugin> p );
std::shared_ptr<detail::application_impl> my;

Expand Down
182 changes: 165 additions & 17 deletions libraries/plugins/elasticsearch/elasticsearch_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <graphene/chain/impacted.hpp>
#include <graphene/chain/account_evaluator.hpp>
#include <curl/curl.h>
#include <graphene/utilities/elasticsearch.hpp>

namespace graphene { namespace elasticsearch {

Expand Down Expand Up @@ -59,6 +58,8 @@ class elasticsearch_plugin_impl
std::string _elasticsearch_index_prefix = "bitshares-";
bool _elasticsearch_operation_object = false;
uint32_t _elasticsearch_start_es_after_block = 0;
bool _elasticsearch_operation_string = true;
mode _elasticsearch_mode = mode::only_save;
CURL *curl; // curl handler
vector <string> bulk_lines; // vector of op lines
vector<std::string> prepare;
Expand Down Expand Up @@ -223,9 +224,8 @@ void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_his
adaptor_struct adaptor;
os.op_object = adaptor.adapt(os.op_object.get_object());
}
else
if(_elasticsearch_operation_string)
os.op = fc::json::to_string(oho->op);

}

void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b)
Expand Down Expand Up @@ -428,25 +428,32 @@ void elasticsearch_plugin::plugin_set_program_options(
)
{
cli.add_options()
("elasticsearch-node-url", boost::program_options::value<std::string>(), "Elastic Search database node url(http://localhost:9200/)")
("elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on replay(10000)")
("elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on a syncronied chain(100)")
("elasticsearch-visitor", boost::program_options::value<bool>(), "Use visitor to index additional data(slows down the replay(false))")
("elasticsearch-basic-auth", boost::program_options::value<std::string>(), "Pass basic auth to elasticsearch database('')")
("elasticsearch-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(bitshares-)")
("elasticsearch-operation-object", boost::program_options::value<bool>(), "Save operation as object(false)")
("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(), "Start doing ES job after block(0)")
("elasticsearch-node-url", boost::program_options::value<std::string>(),
"Elastic Search database node url(http://localhost:9200/)")
("elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(),
"Number of bulk documents to index on replay(10000)")
("elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(),
"Number of bulk documents to index on a syncronied chain(100)")
("elasticsearch-visitor", boost::program_options::value<bool>(),
"Use visitor to index additional data(slows down the replay(false))")
("elasticsearch-basic-auth", boost::program_options::value<std::string>(),
"Pass basic auth to elasticsearch database('')")
("elasticsearch-index-prefix", boost::program_options::value<std::string>(),
"Add a prefix to the index(bitshares-)")
("elasticsearch-operation-object", boost::program_options::value<bool>(),
"Save operation as object(false)")
("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
"Start doing ES job after block(0)")
("elasticsearch-operation-string", boost::program_options::value<bool>(),
"Save operation as string. Needed to serve history api calls(true)")
("elasticsearch-mode", boost::program_options::value<uint16_t>(),
"Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use values 1,2,3 so the option behaves like a bitset. But that's just me.

;
cfg.add(cli);
}

void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
database().applied_block.connect( [&]( const signed_block& b) {
if (!my->update_account_histories(b))
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying.");
} );

my->_oho_index = database().add_index< primary_index< operation_history_index > >();
database().add_index< primary_index< account_transaction_history_index > >();

Expand All @@ -473,7 +480,28 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
}
if (options.count("elasticsearch-start-es-after-block")) {
my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
}
if (options.count("elasticsearch-operation-string")) {
my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
}
if (options.count("elasticsearch-mode")) {
const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
if(option_number > mode::all)
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid");
my->_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
}

if(my->_elasticsearch_mode != mode::only_query) {
if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string)
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
"If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");

database().applied_block.connect([this](const signed_block &b) {
if (!my->update_account_histories(b))
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
"Error populating ES database, we are going to keep trying.");
});
}
}

void elasticsearch_plugin::plugin_startup()
Expand All @@ -488,4 +516,124 @@ void elasticsearch_plugin::plugin_startup()
ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin");
}

operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id)
{
const string operation_id_string = std::string(object_id_type(id));

const string query = R"(
{
"query": {
"match":
{
"account_history.operation_id": )" + operation_id_string + R"("
Copy link
Member

@abitmore abitmore Dec 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A " is missing before )" + operation_id_string.

It seems this code is not used anywhere though.

@sschiessl-bcp I found that you were in the conversation, do you remember anything about this?

Update: fixed in #2573 via 8cf4144.

}
}
}
)";

auto es = prepareHistoryQuery(query);
const auto response = graphene::utilities::simpleQuery(es);
variant variant_response = fc::json::from_string(response);
const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"];
return fromEStoOperation(source);
}

vector<operation_history_object> elasticsearch_plugin::get_account_history(
const account_id_type account_id,
operation_history_id_type stop = operation_history_id_type(),
unsigned limit = 100,
operation_history_id_type start = operation_history_id_type())
{
const string account_id_string = std::string(object_id_type(account_id));

const auto stop_number = stop.instance.value;
const auto start_number = start.instance.value;

string range = "";
if(stop_number == 0)
range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
else if(stop_number > 0)
range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";

const string query = R"(
{
"size": )" + fc::to_string(limit) + R"(,
"sort" : [{ "operation_id_num" : {"order" : "desc"}}],
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "account_history.account: )" + account_id_string + range + R"("
}
}
]
}
}
}
)";

auto es = prepareHistoryQuery(query);

vector<operation_history_object> result;

if(!graphene::utilities::checkES(es))
return result;

const auto response = graphene::utilities::simpleQuery(es);
variant variant_response = fc::json::from_string(response);

const auto hits = variant_response["hits"]["total"];
const auto size = std::min(static_cast<uint32_t>(hits.as_uint64()), limit);

for(unsigned i=0; i<size; i++)
{
const auto source = variant_response["hits"]["hits"][size_t(i)]["_source"];
result.push_back(fromEStoOperation(source));
}
return result;
}

operation_history_object elasticsearch_plugin::fromEStoOperation(variant source)
{
operation_history_object result;

const auto operation_id = source["account_history"]["operation_id"];
fc::from_variant( operation_id, result.id, GRAPHENE_MAX_NESTED_OBJECTS );

const auto op = fc::json::from_string(source["operation_history"]["op"].as_string());
fc::from_variant( op, result.op, GRAPHENE_MAX_NESTED_OBJECTS );

const auto operation_result = fc::json::from_string(source["operation_history"]["operation_result"].as_string());
fc::from_variant( operation_result, result.result, GRAPHENE_MAX_NESTED_OBJECTS );

result.block_num = source["block_data"]["block_num"].as_uint64();
result.trx_in_block = source["operation_history"]["trx_in_block"].as_uint64();
result.op_in_trx = source["operation_history"]["op_in_trx"].as_uint64();
result.trx_in_block = source["operation_history"]["virtual_op"].as_uint64();

return result;
}

graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
{
CURL *curl;
curl = curl_easy_init();

graphene::utilities::ES es;
es.curl = curl;
es.elasticsearch_url = my->_elasticsearch_node_url;
es.index_prefix = my->_elasticsearch_index_prefix;
es.endpoint = es.index_prefix + "*/data/_search";
es.query = query;

return es;
}

mode elasticsearch_plugin::get_running_mode()
{
return my->_elasticsearch_mode;
}


} }
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <graphene/app/plugin.hpp>
#include <graphene/chain/database.hpp>
#include <graphene/chain/operation_history_object.hpp>
#include <graphene/utilities/elasticsearch.hpp>

namespace graphene { namespace elasticsearch {
using namespace chain;
Expand All @@ -49,6 +50,8 @@ namespace detail
class elasticsearch_plugin_impl;
}

enum mode { only_save = 0 , only_query = 1, all = 2 };

class elasticsearch_plugin : public graphene::app::plugin
{
public:
Expand All @@ -63,10 +66,20 @@ class elasticsearch_plugin : public graphene::app::plugin
virtual void plugin_initialize(const boost::program_options::variables_map& options) override;
virtual void plugin_startup() override;

operation_history_object get_operation_by_id(operation_history_id_type id);
vector<operation_history_object> get_account_history(const account_id_type account_id,
operation_history_id_type stop, unsigned limit, operation_history_id_type start);
mode get_running_mode();

friend class detail::elasticsearch_plugin_impl;
std::unique_ptr<detail::elasticsearch_plugin_impl> my;

private:
operation_history_object fromEStoOperation(variant source);
graphene::utilities::ES prepareHistoryQuery(string query);
};


struct operation_visitor
{
typedef void result_type;
Expand Down Expand Up @@ -293,6 +306,7 @@ struct adaptor_struct {

} } //graphene::elasticsearch

FC_REFLECT_ENUM( graphene::elasticsearch::mode, (only_save)(only_query)(all) )
FC_REFLECT( graphene::elasticsearch::operation_history_struct, (trx_in_block)(op_in_trx)(operation_result)(virtual_op)(op)(op_object) )
FC_REFLECT( graphene::elasticsearch::block_struct, (block_num)(block_time)(trx_id) )
FC_REFLECT( graphene::elasticsearch::fee_struct, (asset)(asset_name)(amount)(amount_units) )
Expand Down
10 changes: 7 additions & 3 deletions tests/common/database_fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,19 @@ database_fixture::database_fixture(const fc::time_point_sec &initial_timestamp)
boost::unit_test::framework::current_test_case().p_name.value == "track_votes_committee_disabled") {
app.chain_database()->enable_standby_votes_tracking( false );
}
if(current_test_name == "elasticsearch_account_history" || current_test_name == "elasticsearch_suite") {
if(current_test_name == "elasticsearch_account_history" || current_test_name == "elasticsearch_suite" ||
current_test_name == "elasticsearch_history_api") {
auto esplugin = app.register_plugin<graphene::elasticsearch::elasticsearch_plugin>();
esplugin->plugin_set_app(&app);

options.insert(std::make_pair("elasticsearch-node-url", boost::program_options::variable_value(string("http://localhost:9200/"), false)));
options.insert(std::make_pair("elasticsearch-bulk-replay", boost::program_options::variable_value(uint32_t(2), false)));
options.insert(std::make_pair("elasticsearch-bulk-sync", boost::program_options::variable_value(uint32_t(2), false)));
options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(true, false)));
//options.insert(std::make_pair("elasticsearch-basic-auth", boost::program_options::variable_value(string("elastic:changeme"), false)));
options.insert(std::make_pair("elasticsearch-start-es-after-block", boost::program_options::variable_value(uint32_t(0), false)));
options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(false, false)));
options.insert(std::make_pair("elasticsearch-operation-object", boost::program_options::variable_value(true, false)));
options.insert(std::make_pair("elasticsearch-operation-string", boost::program_options::variable_value(true, false)));
options.insert(std::make_pair("elasticsearch-mode", boost::program_options::variable_value(uint16_t(2), false)));

esplugin->plugin_initialize(options);
esplugin->plugin_startup();
Expand Down
Loading