Skip to content

Commit

Permalink
Merge pull request #1725 from oxarbitrage/issue1682
Browse files Browse the repository at this point in the history
elasticsearch history api #1682
  • Loading branch information
oxarbitrage authored Aug 14, 2019
2 parents 947f0b1 + f1bedc7 commit cea7b74
Show file tree
Hide file tree
Showing 9 changed files with 534 additions and 23 deletions.
2 changes: 1 addition & 1 deletion libraries/app/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_library( graphene_app

# need to link graphene_debug_witness because plugins aren't sufficiently isolated #246
target_link_libraries( graphene_app
graphene_market_history graphene_account_history graphene_grouped_orders
graphene_market_history graphene_account_history graphene_elasticsearch graphene_grouped_orders
graphene_api_helper_indexes
graphene_chain fc graphene_db graphene_net graphene_utilities graphene_debug_witness )
target_include_directories( graphene_app
Expand Down
12 changes: 12 additions & 0 deletions libraries/app/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,18 @@ namespace graphene { namespace app {
start = node.operation_id;
} catch(...) { return result; }

if(_app.is_plugin_enabled("elasticsearch")) {
auto es = _app.get_plugin<elasticsearch::elasticsearch_plugin>("elasticsearch");
if(es.get()->get_running_mode() != elasticsearch::mode::only_save) {
if(!_app.elasticsearch_thread)
_app.elasticsearch_thread= std::make_shared<fc::thread>("elasticsearch");

return _app.elasticsearch_thread->async([&es, &account, &stop, &limit, &start]() {
return es->get_account_history(account, stop, limit, start);
}, "thread invoke for method " BOOST_PP_STRINGIZE(method_name)).wait();
}
}

const auto& hist_idx = db.get_index_type<account_transaction_history_index>();
const auto& by_op_idx = hist_idx.indices().get<by_op>();
auto index_start = by_op_idx.begin();
Expand Down
5 changes: 5 additions & 0 deletions libraries/app/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,11 @@ std::shared_ptr<abstract_plugin> application::get_plugin(const string& name) con
return my->_active_plugins[name];
}

bool application::is_plugin_enabled(const string& name) const
{
return !(my->_active_plugins.find(name) == my->_active_plugins.end());
}

net::node_ptr application::p2p_node()
{
return my->_p2p_network;
Expand Down
2 changes: 2 additions & 0 deletions libraries/app/include/graphene/app/api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

#include <graphene/grouped_orders/grouped_orders_plugin.hpp>

#include <graphene/elasticsearch/elasticsearch_plugin.hpp>

#include <graphene/debug_witness/debug_api.hpp>

#include <graphene/net/node.hpp>
Expand Down
6 changes: 5 additions & 1 deletion libraries/app/include/graphene/app/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ namespace graphene { namespace app {

void enable_plugin( const string& name );

private:
bool is_plugin_enabled(const string& name) const;

std::shared_ptr<fc::thread> elasticsearch_thread;

private:
void add_available_plugin( std::shared_ptr<abstract_plugin> p );
std::shared_ptr<detail::application_impl> my;

Expand Down
182 changes: 165 additions & 17 deletions libraries/plugins/elasticsearch/elasticsearch_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <graphene/chain/impacted.hpp>
#include <graphene/chain/account_evaluator.hpp>
#include <curl/curl.h>
#include <graphene/utilities/elasticsearch.hpp>

namespace graphene { namespace elasticsearch {

Expand Down Expand Up @@ -59,6 +58,8 @@ class elasticsearch_plugin_impl
std::string _elasticsearch_index_prefix = "bitshares-";
bool _elasticsearch_operation_object = false;
uint32_t _elasticsearch_start_es_after_block = 0;
bool _elasticsearch_operation_string = true;
mode _elasticsearch_mode = mode::only_save;
CURL *curl; // curl handler
vector <string> bulk_lines; // vector of op lines
vector<std::string> prepare;
Expand Down Expand Up @@ -223,9 +224,8 @@ void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_his
adaptor_struct adaptor;
os.op_object = adaptor.adapt(os.op_object.get_object());
}
else
if(_elasticsearch_operation_string)
os.op = fc::json::to_string(oho->op);

}

void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b)
Expand Down Expand Up @@ -428,25 +428,32 @@ void elasticsearch_plugin::plugin_set_program_options(
)
{
cli.add_options()
("elasticsearch-node-url", boost::program_options::value<std::string>(), "Elastic Search database node url(http://localhost:9200/)")
("elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on replay(10000)")
("elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on a syncronied chain(100)")
("elasticsearch-visitor", boost::program_options::value<bool>(), "Use visitor to index additional data(slows down the replay(false))")
("elasticsearch-basic-auth", boost::program_options::value<std::string>(), "Pass basic auth to elasticsearch database('')")
("elasticsearch-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(bitshares-)")
("elasticsearch-operation-object", boost::program_options::value<bool>(), "Save operation as object(false)")
("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(), "Start doing ES job after block(0)")
("elasticsearch-node-url", boost::program_options::value<std::string>(),
"Elastic Search database node url(http://localhost:9200/)")
("elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(),
"Number of bulk documents to index on replay(10000)")
("elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(),
"Number of bulk documents to index on a syncronied chain(100)")
("elasticsearch-visitor", boost::program_options::value<bool>(),
"Use visitor to index additional data(slows down the replay(false))")
("elasticsearch-basic-auth", boost::program_options::value<std::string>(),
"Pass basic auth to elasticsearch database('')")
("elasticsearch-index-prefix", boost::program_options::value<std::string>(),
"Add a prefix to the index(bitshares-)")
("elasticsearch-operation-object", boost::program_options::value<bool>(),
"Save operation as object(false)")
("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
"Start doing ES job after block(0)")
("elasticsearch-operation-string", boost::program_options::value<bool>(),
"Save operation as string. Needed to serve history api calls(true)")
("elasticsearch-mode", boost::program_options::value<uint16_t>(),
"Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
;
cfg.add(cli);
}

void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
database().applied_block.connect( [&]( const signed_block& b) {
if (!my->update_account_histories(b))
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying.");
} );

my->_oho_index = database().add_index< primary_index< operation_history_index > >();
database().add_index< primary_index< account_transaction_history_index > >();

Expand All @@ -473,7 +480,28 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
}
if (options.count("elasticsearch-start-es-after-block")) {
my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
}
if (options.count("elasticsearch-operation-string")) {
my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
}
if (options.count("elasticsearch-mode")) {
const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
if(option_number > mode::all)
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid");
my->_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
}

if(my->_elasticsearch_mode != mode::only_query) {
if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string)
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
"If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");

database().applied_block.connect([this](const signed_block &b) {
if (!my->update_account_histories(b))
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
"Error populating ES database, we are going to keep trying.");
});
}
}

void elasticsearch_plugin::plugin_startup()
Expand All @@ -488,4 +516,124 @@ void elasticsearch_plugin::plugin_startup()
ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin");
}

operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id)
{
const string operation_id_string = std::string(object_id_type(id));

const string query = R"(
{
"query": {
"match":
{
"account_history.operation_id": )" + operation_id_string + R"("
}
}
}
)";

auto es = prepareHistoryQuery(query);
const auto response = graphene::utilities::simpleQuery(es);
variant variant_response = fc::json::from_string(response);
const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"];
return fromEStoOperation(source);
}

vector<operation_history_object> elasticsearch_plugin::get_account_history(
const account_id_type account_id,
operation_history_id_type stop = operation_history_id_type(),
unsigned limit = 100,
operation_history_id_type start = operation_history_id_type())
{
const string account_id_string = std::string(object_id_type(account_id));

const auto stop_number = stop.instance.value;
const auto start_number = start.instance.value;

string range = "";
if(stop_number == 0)
range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
else if(stop_number > 0)
range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";

const string query = R"(
{
"size": )" + fc::to_string(limit) + R"(,
"sort" : [{ "operation_id_num" : {"order" : "desc"}}],
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "account_history.account: )" + account_id_string + range + R"("
}
}
]
}
}
}
)";

auto es = prepareHistoryQuery(query);

vector<operation_history_object> result;

if(!graphene::utilities::checkES(es))
return result;

const auto response = graphene::utilities::simpleQuery(es);
variant variant_response = fc::json::from_string(response);

const auto hits = variant_response["hits"]["total"];
const auto size = std::min(static_cast<uint32_t>(hits.as_uint64()), limit);

for(unsigned i=0; i<size; i++)
{
const auto source = variant_response["hits"]["hits"][size_t(i)]["_source"];
result.push_back(fromEStoOperation(source));
}
return result;
}

operation_history_object elasticsearch_plugin::fromEStoOperation(variant source)
{
operation_history_object result;

const auto operation_id = source["account_history"]["operation_id"];
fc::from_variant( operation_id, result.id, GRAPHENE_MAX_NESTED_OBJECTS );

const auto op = fc::json::from_string(source["operation_history"]["op"].as_string());
fc::from_variant( op, result.op, GRAPHENE_MAX_NESTED_OBJECTS );

const auto operation_result = fc::json::from_string(source["operation_history"]["operation_result"].as_string());
fc::from_variant( operation_result, result.result, GRAPHENE_MAX_NESTED_OBJECTS );

result.block_num = source["block_data"]["block_num"].as_uint64();
result.trx_in_block = source["operation_history"]["trx_in_block"].as_uint64();
result.op_in_trx = source["operation_history"]["op_in_trx"].as_uint64();
result.trx_in_block = source["operation_history"]["virtual_op"].as_uint64();

return result;
}

graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
{
CURL *curl;
curl = curl_easy_init();

graphene::utilities::ES es;
es.curl = curl;
es.elasticsearch_url = my->_elasticsearch_node_url;
es.index_prefix = my->_elasticsearch_index_prefix;
es.endpoint = es.index_prefix + "*/data/_search";
es.query = query;

return es;
}

mode elasticsearch_plugin::get_running_mode()
{
return my->_elasticsearch_mode;
}


} }
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <graphene/app/plugin.hpp>
#include <graphene/chain/database.hpp>
#include <graphene/chain/operation_history_object.hpp>
#include <graphene/utilities/elasticsearch.hpp>

namespace graphene { namespace elasticsearch {
using namespace chain;
Expand All @@ -49,6 +50,8 @@ namespace detail
class elasticsearch_plugin_impl;
}

enum mode { only_save = 0 , only_query = 1, all = 2 };

class elasticsearch_plugin : public graphene::app::plugin
{
public:
Expand All @@ -63,10 +66,20 @@ class elasticsearch_plugin : public graphene::app::plugin
virtual void plugin_initialize(const boost::program_options::variables_map& options) override;
virtual void plugin_startup() override;

operation_history_object get_operation_by_id(operation_history_id_type id);
vector<operation_history_object> get_account_history(const account_id_type account_id,
operation_history_id_type stop, unsigned limit, operation_history_id_type start);
mode get_running_mode();

friend class detail::elasticsearch_plugin_impl;
std::unique_ptr<detail::elasticsearch_plugin_impl> my;

private:
operation_history_object fromEStoOperation(variant source);
graphene::utilities::ES prepareHistoryQuery(string query);
};


struct operation_visitor
{
typedef void result_type;
Expand Down Expand Up @@ -293,6 +306,7 @@ struct adaptor_struct {

} } //graphene::elasticsearch

FC_REFLECT_ENUM( graphene::elasticsearch::mode, (only_save)(only_query)(all) )
FC_REFLECT( graphene::elasticsearch::operation_history_struct, (trx_in_block)(op_in_trx)(operation_result)(virtual_op)(op)(op_object) )
FC_REFLECT( graphene::elasticsearch::block_struct, (block_num)(block_time)(trx_id) )
FC_REFLECT( graphene::elasticsearch::fee_struct, (asset)(asset_name)(amount)(amount_units) )
Expand Down
10 changes: 7 additions & 3 deletions tests/common/database_fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,19 @@ database_fixture::database_fixture(const fc::time_point_sec &initial_timestamp)
boost::unit_test::framework::current_test_case().p_name.value == "track_votes_committee_disabled") {
app.chain_database()->enable_standby_votes_tracking( false );
}
if(current_test_name == "elasticsearch_account_history" || current_test_name == "elasticsearch_suite") {
if(current_test_name == "elasticsearch_account_history" || current_test_name == "elasticsearch_suite" ||
current_test_name == "elasticsearch_history_api") {
auto esplugin = app.register_plugin<graphene::elasticsearch::elasticsearch_plugin>();
esplugin->plugin_set_app(&app);

options.insert(std::make_pair("elasticsearch-node-url", boost::program_options::variable_value(string("http://localhost:9200/"), false)));
options.insert(std::make_pair("elasticsearch-bulk-replay", boost::program_options::variable_value(uint32_t(2), false)));
options.insert(std::make_pair("elasticsearch-bulk-sync", boost::program_options::variable_value(uint32_t(2), false)));
options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(true, false)));
//options.insert(std::make_pair("elasticsearch-basic-auth", boost::program_options::variable_value(string("elastic:changeme"), false)));
options.insert(std::make_pair("elasticsearch-start-es-after-block", boost::program_options::variable_value(uint32_t(0), false)));
options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(false, false)));
options.insert(std::make_pair("elasticsearch-operation-object", boost::program_options::variable_value(true, false)));
options.insert(std::make_pair("elasticsearch-operation-string", boost::program_options::variable_value(true, false)));
options.insert(std::make_pair("elasticsearch-mode", boost::program_options::variable_value(uint16_t(2), false)));

esplugin->plugin_initialize(options);
esplugin->plugin_startup();
Expand Down
Loading

0 comments on commit cea7b74

Please sign in to comment.