Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

elasticsearch history api #1682 #1725

Merged
merged 16 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/app/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ add_library( graphene_app
)

# need to link graphene_debug_witness because plugins aren't sufficiently isolated #246
target_link_libraries( graphene_app graphene_market_history graphene_account_history graphene_grouped_orders graphene_chain fc graphene_db graphene_net graphene_utilities graphene_debug_witness )
target_link_libraries( graphene_app graphene_market_history graphene_account_history graphene_elasticsearch graphene_grouped_orders graphene_chain fc graphene_db graphene_net graphene_utilities graphene_debug_witness )
target_include_directories( graphene_app
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include"
"${CMAKE_CURRENT_SOURCE_DIR}/../egenesis/include" )
Expand Down
7 changes: 7 additions & 0 deletions libraries/app/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ namespace graphene { namespace app {
start = node.operation_id;
} catch(...) { return result; }

if(_app.is_plugin_enabled("elasticsearch")) {
auto es = _app.get_plugin<elasticsearch::elasticsearch_plugin>("elasticsearch");
auto _thread = std::make_shared<fc::thread>("elasticsearch");
return _thread->async([&](){ return es->get_account_history(account, stop, limit, start); },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please capture only what's necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, don't use this if mode is only_save.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually work as intended? That would be cool.
We should load-test this before putting it into production.

"thread invoke for method " BOOST_PP_STRINGIZE(method_name)).wait();
}

const auto& hist_idx = db.get_index_type<account_transaction_history_index>();
const auto& by_op_idx = hist_idx.indices().get<by_op>();
auto index_start = by_op_idx.begin();
Expand Down
7 changes: 7 additions & 0 deletions libraries/app/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,13 @@ std::shared_ptr<abstract_plugin> application::get_plugin(const string& name) con
return my->_active_plugins[name];
}

bool application::is_plugin_enabled(const string& name) const
{
if(my->_active_plugins.find(name) == my->_active_plugins.end())
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
return false;
return true;
}

net::node_ptr application::p2p_node()
{
return my->_p2p_network;
Expand Down
2 changes: 2 additions & 0 deletions libraries/app/include/graphene/app/api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

#include <graphene/grouped_orders/grouped_orders_plugin.hpp>

#include <graphene/elasticsearch/elasticsearch_plugin.hpp>

#include <graphene/debug_witness/debug_api.hpp>

#include <graphene/net/node.hpp>
Expand Down
5 changes: 4 additions & 1 deletion libraries/app/include/graphene/app/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ namespace graphene { namespace app {

void enable_plugin( const string& name );

private:
bool is_plugin_enabled(const string& name) const;


private:
void add_available_plugin( std::shared_ptr<abstract_plugin> p );
std::shared_ptr<detail::application_impl> my;

Expand Down
166 changes: 157 additions & 9 deletions libraries/plugins/elasticsearch/elasticsearch_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <graphene/chain/impacted.hpp>
#include <graphene/chain/account_evaluator.hpp>
#include <curl/curl.h>
#include <graphene/utilities/elasticsearch.hpp>

namespace graphene { namespace elasticsearch {

Expand Down Expand Up @@ -59,6 +58,8 @@ class elasticsearch_plugin_impl
std::string _elasticsearch_index_prefix = "bitshares-";
bool _elasticsearch_operation_object = false;
uint32_t _elasticsearch_start_es_after_block = 0;
bool _elasticsearch_operation_string = true;
std::string _elasticsearch_mode = "only_save";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For more efficient comparison, make this an enum type.

Copy link
Member Author

@oxarbitrage oxarbitrage Jul 31, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit tricky to do this because the boost::program_options will not accept the enum but basically just a string, number or boolean.
I picked the number option to do a static cast to the enum by checking the upper boundary to avoid inserting invalid options.

Implemented at de76301

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed the exception introduced here to graphene::chain::plugin_exception in the context of 5838a38

CURL *curl; // curl handler
vector <string> bulk_lines; // vector of op lines
vector<std::string> prepare;
Expand Down Expand Up @@ -223,9 +224,8 @@ void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_his
adaptor_struct adaptor;
os.op_object = adaptor.adapt(os.op_object.get_object());
}
else
if(_elasticsearch_operation_string)
os.op = fc::json::to_string(oho->op);

}

void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b)
Expand Down Expand Up @@ -436,17 +436,14 @@ void elasticsearch_plugin::plugin_set_program_options(
("elasticsearch-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(bitshares-)")
("elasticsearch-operation-object", boost::program_options::value<bool>(), "Save operation as object(false)")
("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(), "Start doing ES job after block(0)")
("elasticsearch-operation-string", boost::program_options::value<bool>(), "Save operation as string. Needed to serve history api calls(true)")
("elasticsearch-mode", boost::program_options::value<std::string>(), "Mode of operation: only_save, only_query, all(only_save)")
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
;
cfg.add(cli);
}

void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
database().applied_block.connect( [&]( const signed_block& b) {
if (!my->update_account_histories(b))
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying.");
} );

my->_oho_index = database().add_index< primary_index< operation_history_index > >();
database().add_index< primary_index< account_transaction_history_index > >();

Expand All @@ -473,7 +470,23 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
}
if (options.count("elasticsearch-start-es-after-block")) {
my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
}
if (options.count("elasticsearch-operation-string")) {
my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
}
if (options.count("elasticsearch-mode")) {
my->_elasticsearch_mode = options["elasticsearch-mode"].as<std::string>();
}

if(my->_elasticsearch_mode != "only_query") {
if (my->_elasticsearch_mode == "all")
my->_elasticsearch_operation_string = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silently overriding this will cause confusion.
IMO add log output if user has configured this to false. Or perhaps fail to make this very explicit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, i am now not allowing the mode to be all if operation_string is false so no silent changes are made.

5838a38


database().applied_block.connect([&](const signed_block &b) {
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
if (!my->update_account_histories(b))
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying.");
});
}
}

void elasticsearch_plugin::plugin_startup()
Expand All @@ -488,4 +501,139 @@ void elasticsearch_plugin::plugin_startup()
ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin");
}

operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id)
{
const string operation_id_string = std::string(object_id_type(id));

const string query = R"(
{
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "account_history.operation_id: )" + operation_id_string + R"("
}
},
{
"range": {
"block_data.block_time": {
"gte": "now-20y",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put a TODO in your calendar for the year 2034: bump this. ;-)

Can't you leave out this part of the query?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the range could be omitted here.

Also I am wondering why you do query_string and not

"query": {
        "match" : {
            "account_history.operation_id" : "operation_id_string"
        }
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my apologies, this was not a production ready query but just a proof of concept probably pasted from kibana. i had changed this at 1f6ac14

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the second query i removed the range but query_string needs to stay as we are making a lucene query there and not just matching a field. 63f7aff

"lte": "now"
}
}
}
]
}
}
}
)";

auto es = prepareHistoryQuery(query);
const auto response = graphene::utilities::simpleQuery(es);
variant variant_response = fc::json::from_string(response);
const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"];
return fromEStoOperation(source);
}

vector<operation_history_object> elasticsearch_plugin::get_account_history(
const account_id_type account_id,
operation_history_id_type stop = operation_history_id_type(),
unsigned limit = 100,
operation_history_id_type start = operation_history_id_type())
{
const string account_id_string = std::string(object_id_type(account_id));

const auto stop_number = stop.instance.value;
const auto start_number = start.instance.value;

string range = "";
if(stop_number == 0)
range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
else if(stop_number > 0)
range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";

const string query = R"(
{
"size": )" + fc::to_string(limit) + R"(,
"sort" : [{ "operation_id_num" : {"order" : "desc"}}],
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "account_history.account: )" + account_id_string + range + R"("
}
},
{
"range": {
"block_data.block_time": {
"gte": "now-20y",
"lte": "now"
}
}
}
]
}
}
}
)";

auto es = prepareHistoryQuery(query);

vector<operation_history_object> result;

if(!graphene::utilities::checkES(es))
return result;

const auto response = graphene::utilities::simpleQuery(es);
variant variant_response = fc::json::from_string(response);

const auto hits = variant_response["hits"]["total"];
const auto size = std::min(static_cast<uint32_t>(hits.as_uint64()), limit);

for(unsigned i=0; i<size; i++)
{
const auto source = variant_response["hits"]["hits"][size_t(i)]["_source"];
result.push_back(fromEStoOperation(source));
}
return result;
}

operation_history_object elasticsearch_plugin::fromEStoOperation(variant source)
{
operation_history_object result;

const auto operation_id = source["account_history"]["operation_id"];
fc::from_variant( operation_id, result.id, GRAPHENE_MAX_NESTED_OBJECTS );

const auto op = fc::json::from_string(source["operation_history"]["op"].as_string());
fc::from_variant( op, result.op, GRAPHENE_MAX_NESTED_OBJECTS );

const auto operation_result = fc::json::from_string(source["operation_history"]["operation_result"].as_string());
fc::from_variant( operation_result, result.result, GRAPHENE_MAX_NESTED_OBJECTS );

result.block_num = source["block_data"]["block_num"].as_uint64();
result.trx_in_block = source["operation_history"]["trx_in_block"].as_uint64();
result.op_in_trx = source["operation_history"]["op_in_trx"].as_uint64();
result.trx_in_block = source["operation_history"]["virtual_op"].as_uint64();

return result;
}

graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
{
CURL *curl;
curl = curl_easy_init();

graphene::utilities::ES es;
es.curl = curl;
es.elasticsearch_url = my->_elasticsearch_node_url;
es.index_prefix = my->_elasticsearch_index_prefix;
es.endpoint = es.index_prefix + "*/data/_search";
es.query = query;

return es;
}

} }
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <graphene/app/plugin.hpp>
#include <graphene/chain/database.hpp>
#include <graphene/chain/operation_history_object.hpp>
#include <graphene/utilities/elasticsearch.hpp>

namespace graphene { namespace elasticsearch {
using namespace chain;
Expand Down Expand Up @@ -63,8 +64,16 @@ class elasticsearch_plugin : public graphene::app::plugin
virtual void plugin_initialize(const boost::program_options::variables_map& options) override;
virtual void plugin_startup() override;

operation_history_object get_operation_by_id(operation_history_id_type id);
vector<operation_history_object> get_account_history(const account_id_type account_id,
operation_history_id_type stop, unsigned limit, operation_history_id_type start);

friend class detail::elasticsearch_plugin_impl;
std::unique_ptr<detail::elasticsearch_plugin_impl> my;

private:
operation_history_object fromEStoOperation(variant source);
graphene::utilities::ES prepareHistoryQuery(string query);
};

struct operation_visitor
Expand Down
9 changes: 6 additions & 3 deletions tests/common/database_fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,18 @@ database_fixture::database_fixture(const fc::time_point_sec &initial_timestamp)
boost::unit_test::framework::current_test_case().p_name.value == "track_votes_committee_disabled") {
app.chain_database()->enable_standby_votes_tracking( false );
}
if(current_test_name == "elasticsearch_account_history" || current_test_name == "elasticsearch_suite") {
if(current_test_name == "elasticsearch_account_history" || current_test_name == "elasticsearch_suite" ||
current_test_name == "elasticsearch_history_api") {
auto esplugin = app.register_plugin<graphene::elasticsearch::elasticsearch_plugin>();
esplugin->plugin_set_app(&app);

options.insert(std::make_pair("elasticsearch-node-url", boost::program_options::variable_value(string("http://localhost:9200/"), false)));
options.insert(std::make_pair("elasticsearch-bulk-replay", boost::program_options::variable_value(uint32_t(2), false)));
options.insert(std::make_pair("elasticsearch-bulk-sync", boost::program_options::variable_value(uint32_t(2), false)));
options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(true, false)));
//options.insert(std::make_pair("elasticsearch-basic-auth", boost::program_options::variable_value(string("elastic:changeme"), false)));
options.insert(std::make_pair("elasticsearch-start-es-after-block", boost::program_options::variable_value(uint32_t(0), false)));
options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(false, false)));
options.insert(std::make_pair("elasticsearch-operation-object", boost::program_options::variable_value(true, false)));
options.insert(std::make_pair("elasticsearch-operation-string", boost::program_options::variable_value(true, false)));

esplugin->plugin_initialize(options);
esplugin->plugin_startup();
Expand Down
Loading