diff --git a/libraries/chain/apply_context.cpp b/libraries/chain/apply_context.cpp index 0bbfb8e3aec..c89a366b210 100644 --- a/libraries/chain/apply_context.cpp +++ b/libraries/chain/apply_context.cpp @@ -76,7 +76,13 @@ action_trace apply_context::exec_one() action_trace t(r); t.trx_id = trx_context.id; + t.block_num = control.pending_block_state()->block_num; + t.block_time = control.pending_block_time(); + t.producer_block_id = control.pending_producer_block_id(); + t.account_ram_deltas = std::move( _account_ram_deltas ); + _account_ram_deltas.clear(); t.act = act; + t.context_free = context_free; t.console = _pending_console_output.str(); trx_context.executed.emplace_back( move(r) ); @@ -275,7 +281,7 @@ void apply_context::schedule_deferred_transaction( const uint128_t& sender_id, a "Replacing a deferred transaction is temporarily disabled." ); // TODO: The logic of the next line needs to be incorporated into the next hard fork. - // trx_context.add_ram_usage( ptr->payer, -(config::billable_size_v + ptr->packed_trx.size()) ); + // add_ram_usage( ptr->payer, -(config::billable_size_v + ptr->packed_trx.size()) ); d.modify( *ptr, [&]( auto& gtx ) { gtx.sender = receiver; @@ -303,14 +309,14 @@ void apply_context::schedule_deferred_transaction( const uint128_t& sender_id, a EOS_ASSERT( control.is_ram_billing_in_notify_allowed() || (receiver == act.account) || (receiver == payer) || privileged, subjective_block_production_exception, "Cannot charge RAM to other accounts during notify." ); - trx_context.add_ram_usage( payer, (config::billable_size_v + trx_size) ); + add_ram_usage( payer, (config::billable_size_v + trx_size) ); } bool apply_context::cancel_deferred_transaction( const uint128_t& sender_id, account_name sender ) { auto& generated_transaction_idx = db.get_mutable_index(); const auto* gto = db.find(boost::make_tuple(sender, sender_id)); if ( gto ) { - trx_context.add_ram_usage( gto->payer, -(config::billable_size_v + gto->packed_trx.size()) ); + add_ram_usage( gto->payer, -(config::billable_size_v + gto->packed_trx.size()) ); generated_transaction_idx.remove(*gto); } return gto; @@ -369,7 +375,7 @@ void apply_context::update_db_usage( const account_name& payer, int64_t delta ) require_authorization( payer ); } } - trx_context.add_ram_usage(payer, delta); + add_ram_usage(payer, delta); } @@ -634,5 +640,14 @@ uint64_t apply_context::next_auth_sequence( account_name actor ) { return rs.auth_sequence; } +void apply_context::add_ram_usage( account_name account, int64_t ram_delta ) { + trx_context.add_ram_usage( account, ram_delta ); + + auto p = _account_ram_deltas.emplace( account, ram_delta ); + if( !p.second ) { + p.first->delta += ram_delta; + } +} + } } /// eosio::chain diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 26d54d7da70..4e4f3e5f2fc 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -83,6 +83,8 @@ struct pending_state { controller::block_status _block_status = controller::block_status::incomplete; + optional _producer_block_id; + void push() { _db_session.push(); } @@ -629,6 +631,9 @@ struct controller_impl { if( gtrx.expiration < self.pending_block_time() ) { trace = std::make_shared(); trace->id = gtrx.trx_id; + trace->block_num = self.pending_block_state()->block_num; + trace->block_time = self.pending_block_time(); + trace->producer_block_id = self.pending_producer_block_id(); trace->scheduled = true; trace->receipt = push_receipt( gtrx.trx_id, transaction_receipt::expired, billed_cpu_time_us, 0 ); // expire the transaction emit( self.accepted_transaction, trx ); @@ -868,7 +873,9 @@ struct controller_impl { } /// push_transaction - void start_block( block_timestamp_type when, uint16_t confirm_block_count, controller::block_status s ) { + void start_block( block_timestamp_type when, uint16_t confirm_block_count, controller::block_status s, + const optional& producer_block_id ) + { EOS_ASSERT( !pending, block_validate_exception, "pending block already exists" ); auto guard_pending = fc::make_scoped_exit([this](){ @@ -885,6 +892,7 @@ struct controller_impl { } pending->_block_status = s; + pending->_producer_block_id = producer_block_id; pending->_pending_block_state = std::make_shared( *head, when ); // promotes pending schedule (if any) to active pending->_pending_block_state->in_current_chain = true; @@ -953,7 +961,8 @@ struct controller_impl { void apply_block( const signed_block_ptr& b, controller::block_status s ) { try { try { EOS_ASSERT( b->block_extensions.size() == 0, block_validate_exception, "no supported extensions" ); - start_block( b->timestamp, b->confirmed, s ); + auto producer_block_id = b->id(); + start_block( b->timestamp, b->confirmed, s , producer_block_id); transaction_trace_ptr trace; @@ -993,9 +1002,9 @@ struct controller_impl { finalize_block(); // this implicitly asserts that all header fields (less the signature) are identical - EOS_ASSERT(b->id() == pending->_pending_block_state->header.id(), + EOS_ASSERT(producer_block_id == pending->_pending_block_state->header.id(), block_validate_exception, "Block ID does not match", - ("producer_block_id",b->id())("validator_block_id",pending->_pending_block_state->header.id())); + ("producer_block_id",producer_block_id)("validator_block_id",pending->_pending_block_state->header.id())); // We need to fill out the pending block state's block because that gets serialized in the reversible block log // in the future we can optimize this by serializing the original and not the copy @@ -1389,7 +1398,7 @@ fork_database& controller::fork_db()const { return my->fork_db; } void controller::start_block( block_timestamp_type when, uint16_t confirm_block_count) { validate_db_available_size(); - my->start_block(when, confirm_block_count, block_status::incomplete ); + my->start_block(when, confirm_block_count, block_status::incomplete, optional() ); } void controller::finalize_block() { @@ -1521,6 +1530,11 @@ time_point controller::pending_block_time()const { return my->pending->_pending_block_state->header.timestamp; } +optional controller::pending_producer_block_id()const { + EOS_ASSERT( my->pending, block_validate_exception, "no pending block" ); + return my->pending->_producer_block_id; +} + uint32_t controller::last_irreversible_block_num() const { return std::max(my->head->bft_irreversible_blocknum, my->head->dpos_irreversible_blocknum); } diff --git a/libraries/chain/eosio_contract.cpp b/libraries/chain/eosio_contract.cpp index 71d846b38be..33a123981a1 100644 --- a/libraries/chain/eosio_contract.cpp +++ b/libraries/chain/eosio_contract.cpp @@ -121,7 +121,7 @@ void apply_eosio_newaccount(apply_context& context) { ram_delta += owner_permission.auth.get_billable_size(); ram_delta += active_permission.auth.get_billable_size(); - context.trx_context.add_ram_usage(create.name, ram_delta); + context.add_ram_usage(create.name, ram_delta); } FC_CAPTURE_AND_RETHROW( (create) ) } @@ -167,7 +167,7 @@ void apply_eosio_setcode(apply_context& context) { }); if (new_size != old_size) { - context.trx_context.add_ram_usage( act.account, new_size - old_size ); + context.add_ram_usage( act.account, new_size - old_size ); } } @@ -196,7 +196,7 @@ void apply_eosio_setabi(apply_context& context) { }); if (new_size != old_size) { - context.trx_context.add_ram_usage( act.account, new_size - old_size ); + context.add_ram_usage( act.account, new_size - old_size ); } } @@ -254,13 +254,13 @@ void apply_eosio_updateauth(apply_context& context) { int64_t new_size = (int64_t)(config::billable_size_v + permission->auth.get_billable_size()); - context.trx_context.add_ram_usage( permission->owner, new_size - old_size ); + context.add_ram_usage( permission->owner, new_size - old_size ); } else { const auto& p = authorization.create_permission( update.account, update.permission, parent_id, update.auth ); int64_t new_size = (int64_t)(config::billable_size_v + p.auth.get_billable_size()); - context.trx_context.add_ram_usage( update.account, new_size ); + context.add_ram_usage( update.account, new_size ); } } @@ -291,7 +291,7 @@ void apply_eosio_deleteauth(apply_context& context) { authorization.remove_permission( permission ); - context.trx_context.add_ram_usage( remove.account, -old_size ); + context.add_ram_usage( remove.account, -old_size ); } @@ -334,7 +334,7 @@ void apply_eosio_linkauth(apply_context& context) { link.required_permission = requirement.requirement; }); - context.trx_context.add_ram_usage( + context.add_ram_usage( l.account, (int64_t)(config::billable_size_v) ); @@ -354,7 +354,7 @@ void apply_eosio_unlinkauth(apply_context& context) { auto link_key = boost::make_tuple(unlink.account, unlink.code, unlink.type); auto link = db.find(link_key); EOS_ASSERT(link != nullptr, action_validate_exception, "Attempting to unlink authority, but no link found"); - context.trx_context.add_ram_usage( + context.add_ram_usage( link->account, -(int64_t)(config::billable_size_v) ); diff --git a/libraries/chain/include/eosio/chain/apply_context.hpp b/libraries/chain/include/eosio/chain/apply_context.hpp index ef44ca7e0df..8a4f98a7caa 100644 --- a/libraries/chain/include/eosio/chain/apply_context.hpp +++ b/libraries/chain/include/eosio/chain/apply_context.hpp @@ -572,6 +572,8 @@ class apply_context { uint64_t next_recv_sequence( account_name receiver ); uint64_t next_auth_sequence( account_name actor ); + void add_ram_usage( account_name account, int64_t ram_delta ); + private: void validate_referenced_accounts( const transaction& t )const; @@ -607,6 +609,7 @@ class apply_context { vector _inline_actions; ///< queued inline messages vector _cfa_inline_actions; ///< queued inline messages std::ostringstream _pending_console_output; + flat_set _account_ram_deltas; ///< flat_set of account_delta so json is an array of objects //bytes _cached_trx; }; diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index 21b9e5a6f9f..8747d722b20 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -182,8 +182,9 @@ namespace eosio { namespace chain { time_point fork_db_head_block_time()const; account_name fork_db_head_block_producer()const; - time_point pending_block_time()const; - block_state_ptr pending_block_state()const; + time_point pending_block_time()const; + block_state_ptr pending_block_state()const; + optional pending_producer_block_id()const; const producer_schedule_type& active_producers()const; const producer_schedule_type& pending_producers()const; diff --git a/libraries/chain/include/eosio/chain/trace.hpp b/libraries/chain/include/eosio/chain/trace.hpp index 41fb8f079a6..ad02baf5bac 100644 --- a/libraries/chain/include/eosio/chain/trace.hpp +++ b/libraries/chain/include/eosio/chain/trace.hpp @@ -10,18 +10,33 @@ namespace eosio { namespace chain { + struct account_delta { + account_delta( const account_name& n, int64_t d):account(n),delta(d){} + account_delta(){} + + account_name account; + int64_t delta = 0; + + friend bool operator<( const account_delta& lhs, const account_delta& rhs ) { return lhs.account < rhs.account; } + }; + struct base_action_trace { base_action_trace( const action_receipt& r ):receipt(r){} base_action_trace(){} action_receipt receipt; action act; + bool context_free = false; fc::microseconds elapsed; uint64_t cpu_usage = 0; string console; uint64_t total_cpu_usage = 0; /// total of inline_traces[x].cpu_usage + cpu_usage transaction_id_type trx_id; ///< the transaction that generated this action + uint32_t block_num = 0; + block_timestamp_type block_time; + fc::optional producer_block_id; + flat_set account_ram_deltas; }; struct action_trace : public base_action_trace { @@ -35,6 +50,9 @@ namespace eosio { namespace chain { struct transaction_trace { transaction_id_type id; + uint32_t block_num = 0; + block_timestamp_type block_time; + fc::optional producer_block_id; fc::optional receipt; fc::microseconds elapsed; uint64_t net_usage = 0; @@ -48,11 +66,16 @@ namespace eosio { namespace chain { } } /// namespace eosio::chain +FC_REFLECT( eosio::chain::account_delta, + (account)(delta) ) + FC_REFLECT( eosio::chain::base_action_trace, - (receipt)(act)(elapsed)(cpu_usage)(console)(total_cpu_usage)(trx_id) ) + (receipt)(act)(context_free)(elapsed)(cpu_usage)(console)(total_cpu_usage)(trx_id) + (block_num)(block_time)(producer_block_id)(account_ram_deltas) ) FC_REFLECT_DERIVED( eosio::chain::action_trace, (eosio::chain::base_action_trace), (inline_traces) ) -FC_REFLECT( eosio::chain::transaction_trace, (id)(receipt)(elapsed)(net_usage)(scheduled) +FC_REFLECT( eosio::chain::transaction_trace, (id)(block_num)(block_time)(producer_block_id) + (receipt)(elapsed)(net_usage)(scheduled) (action_traces)(failed_dtrx_trace)(except) ) diff --git a/libraries/chain/include/eosio/chain/transaction_context.hpp b/libraries/chain/include/eosio/chain/transaction_context.hpp index 22e8eae36d6..3175994dedd 100644 --- a/libraries/chain/include/eosio/chain/transaction_context.hpp +++ b/libraries/chain/include/eosio/chain/transaction_context.hpp @@ -38,8 +38,6 @@ namespace eosio { namespace chain { void pause_billing_timer(); void resume_billing_timer(); - void add_ram_usage( account_name account, int64_t ram_delta ); - uint32_t update_billed_cpu_time( fc::time_point now ); std::tuple max_bandwidth_billed_accounts_can_pay( bool force_elastic_limits = false )const; @@ -49,6 +47,8 @@ namespace eosio { namespace chain { friend struct controller_impl; friend class apply_context; + void add_ram_usage( account_name account, int64_t ram_delta ); + void dispatch_action( action_trace& trace, const action& a, account_name receiver, bool context_free = false, uint32_t recurse_depth = 0 ); inline void dispatch_action( action_trace& trace, const action& a, bool context_free = false ) { dispatch_action(trace, a, a.account, context_free); diff --git a/libraries/chain/transaction_context.cpp b/libraries/chain/transaction_context.cpp index 64683ebc049..dd58f0364ec 100644 --- a/libraries/chain/transaction_context.cpp +++ b/libraries/chain/transaction_context.cpp @@ -26,6 +26,9 @@ namespace eosio { namespace chain { undo_session = c.db().start_undo_session(true); } trace->id = id; + trace->block_num = c.pending_block_state()->block_num; + trace->block_time = c.pending_block_time(); + trace->producer_block_id = c.pending_producer_block_id(); executed.reserve( trx.total_actions() ); EOS_ASSERT( trx.transaction_extensions.size() == 0, unsupported_feature, "we don't support any extensions yet" ); } diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index c4d11e1bdee..e7f3e38663a 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -29,6 +29,7 @@ #include #include +#include #include #include @@ -91,6 +92,7 @@ class mongo_db_plugin_impl { void purge_abi_cache(); bool add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, + const chain::transaction_trace_ptr& t, bool executed, const std::chrono::milliseconds& now ); void update_account(const chain::action& act); @@ -116,6 +118,7 @@ class mongo_db_plugin_impl { uint32_t start_block_num = 0; std::atomic_bool start_block_reached{false}; + bool is_producer = false; bool filter_on_star = true; std::set filter_on; std::set filter_out; @@ -127,8 +130,17 @@ class mongo_db_plugin_impl { std::string db_name; mongocxx::instance mongo_inst; - mongocxx::client mongo_conn; - mongocxx::collection accounts; + fc::optional mongo_pool; + + // consum thread + mongocxx::collection _accounts; + mongocxx::collection _trans; + mongocxx::collection _trans_traces; + mongocxx::collection _action_traces; + mongocxx::collection _block_states; + mongocxx::collection _blocks; + mongocxx::collection _pub_keys; + mongocxx::collection _account_controls; size_t max_queue_size = 0; int queue_sleep_time = 0; @@ -266,6 +278,28 @@ void mongo_db_plugin_impl::accepted_transaction( const chain::transaction_metada void mongo_db_plugin_impl::applied_transaction( const chain::transaction_trace_ptr& t ) { try { + // Traces emitted from an incomplete block leave the producer_block_id as empty. + // + // Avoid adding the action traces or transaction traces to the database if the producer_block_id is empty. + // This way traces from speculatively executed transactions are not included in the Mongo database which can + // avoid potential confusion for consumers of that database. + // + // Due to forks, it could be possible for multiple incompatible action traces with the same block_num and trx_id + // to exist in the database. And if the producer double produces a block, even the block_time may not + // disambiguate the two action traces. Without a producer_block_id to disambiguate and determine if the action + // trace comes from an orphaned fork branching off of the blockchain, consumers of the Mongo DB database may be + // reacting to a stale action trace that never actually executed in the current blockchain. + // + // It is better to avoid this potential confusion by not logging traces from speculative execution, i.e. emitted + // from an incomplete block. This means that traces will not be recorded in speculative read-mode, but + // users should not be using the mongo_db_plugin in that mode anyway. + // + // Allow logging traces if node is a producer for testing purposes, so a single nodeos can do both for testing. + // + // It is recommended to run mongo_db_plugin in read-mode = read-only. + // + if( !is_producer && !t->producer_block_id.valid() ) + return; // always queue since account information always gathered queue( transaction_trace_queue, t ); } catch (fc::exception& e) { @@ -279,7 +313,7 @@ void mongo_db_plugin_impl::applied_transaction( const chain::transaction_trace_p void mongo_db_plugin_impl::applied_irreversible_block( const chain::block_state_ptr& bs ) { try { - if( store_blocks || store_transactions ) { + if( store_blocks || store_block_states || store_transactions ) { queue( irreversible_block_state_queue, bs ); } } catch (fc::exception& e) { @@ -312,6 +346,18 @@ void mongo_db_plugin_impl::accepted_block( const chain::block_state_ptr& bs ) { void mongo_db_plugin_impl::consume_blocks() { try { + auto mongo_client = mongo_pool->acquire(); + auto& mongo_conn = *mongo_client; + + _accounts = mongo_conn[db_name][accounts_col]; + _trans = mongo_conn[db_name][trans_col]; + _trans_traces = mongo_conn[db_name][trans_traces_col]; + _action_traces = mongo_conn[db_name][action_traces_col]; + _blocks = mongo_conn[db_name][blocks_col]; + _block_states = mongo_conn[db_name][block_states_col]; + _pub_keys = mongo_conn[db_name][pub_keys_col]; + _account_controls = mongo_conn[db_name][account_controls_col]; + while (true) { boost::mutex::scoped_lock lock(mtx); while ( transaction_metadata_queue.empty() && @@ -409,6 +455,7 @@ void mongo_db_plugin_impl::consume_blocks() { break; } } + mongo_pool.reset(); ilog("mongo_db_plugin consume thread shutdown gracefully"); } catch (fc::exception& e) { elog("FC Exception while consuming block ${e}", ("e", e.to_string())); @@ -505,7 +552,7 @@ optional mongo_db_plugin_impl::get_abi_serializer( account_name return itr->serializer; } - auto account = accounts.find_one( make_document( kvp("name", n.to_string())) ); + auto account = _accounts.find_one( make_document( kvp("name", n.to_string())) ); if(account) { auto view = account->view(); abi_def abi; @@ -632,7 +679,6 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti using bsoncxx::builder::basic::make_array; namespace bbb = bsoncxx::builder::basic; - auto trans = mongo_conn[db_name][trans_col]; auto trans_doc = bsoncxx::builder::basic::document{}; auto now = std::chrono::duration_cast( @@ -692,8 +738,8 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti try { mongocxx::options::update update_opts{}; update_opts.upsert( true ); - if( !trans.update_one( make_document( kvp( "trx_id", trx_id_str ) ), - make_document( kvp( "$set", trans_doc.view() ) ), update_opts ) ) { + if( !_trans.update_one( make_document( kvp( "trx_id", trx_id_str ) ), + make_document( kvp( "$set", trans_doc.view() ) ), update_opts ) ) { EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert trans ${id}", ("id", trx_id) ); } } catch( ... ) { @@ -704,6 +750,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti bool mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, + const chain::transaction_trace_ptr& t, bool executed, const std::chrono::milliseconds& now ) { using namespace bsoncxx::types; @@ -734,6 +781,9 @@ mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces elog( " JSON: ${j}", ("j", json) ); } } + if( t->receipt.valid() ) { + action_traces_doc.append( kvp( "trx_status", std::string( t->receipt->status ) ) ); + } action_traces_doc.append( kvp( "createdAt", b_date{now} ) ); mongocxx::model::insert_one insert_op{action_traces_doc.view()}; @@ -742,7 +792,7 @@ mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces } for( const auto& iline_atrace : atrace.inline_traces ) { - added |= add_action_trace( bulk_action_traces, iline_atrace, executed, now ); + added |= add_action_trace( bulk_action_traces, iline_atrace, t, executed, now ); } return added; @@ -753,8 +803,6 @@ void mongo_db_plugin_impl::_process_applied_transaction( const chain::transactio using namespace bsoncxx::types; using bsoncxx::builder::basic::kvp; - auto trans_traces = mongo_conn[db_name][trans_traces_col]; - auto action_traces = mongo_conn[db_name][action_traces_col]; auto trans_traces_doc = bsoncxx::builder::basic::document{}; auto now = std::chrono::duration_cast( @@ -762,58 +810,65 @@ void mongo_db_plugin_impl::_process_applied_transaction( const chain::transactio mongocxx::options::bulk_write bulk_opts; bulk_opts.ordered(false); - mongocxx::bulk_write bulk_action_traces = action_traces.create_bulk_write(bulk_opts); + mongocxx::bulk_write bulk_action_traces = _action_traces.create_bulk_write(bulk_opts); bool write_atraces = false; bool executed = t->receipt.valid() && t->receipt->status == chain::transaction_receipt_header::executed; for( const auto& atrace : t->action_traces ) { try { - write_atraces |= add_action_trace( bulk_action_traces, atrace, executed, now ); + write_atraces |= add_action_trace( bulk_action_traces, atrace, t, executed, now ); } catch(...) { handle_mongo_exception("add action traces", __LINE__); } } - if( write_atraces ) { - try { - if( !bulk_action_traces.execute() ) { - EOS_ASSERT( false, chain::mongo_db_insert_fail, "Bulk action traces insert failed for transaction trace: ${id}", ("id", t->id)); - } - } catch(...) { - handle_mongo_exception("action traces insert", __LINE__); - } - } - - if( !start_block_reached || !store_transaction_traces ) return; + if( !start_block_reached ) return; //< add_action_trace calls update_account which must be called always if( !write_atraces ) return; //< do not insert transaction_trace if all action_traces filtered out // transaction trace insert - auto v = to_variant_with_abi( *t ); - string json = fc::json::to_string( v ); - try { - const auto& value = bsoncxx::from_json( json ); - trans_traces_doc.append( bsoncxx::builder::concatenate_doc{value.view()} ); - } catch( bsoncxx::exception& ) { + if( store_transaction_traces ) { try { - json = fc::prune_invalid_utf8( json ); - const auto& value = bsoncxx::from_json( json ); - trans_traces_doc.append( bsoncxx::builder::concatenate_doc{value.view()} ); - trans_traces_doc.append( kvp( "non-utf8-purged", b_bool{true} )); - } catch( bsoncxx::exception& e ) { - elog( "Unable to convert transaction JSON to MongoDB JSON: ${e}", ("e", e.what())); - elog( " JSON: ${j}", ("j", json)); + auto v = to_variant_with_abi( *t ); + string json = fc::json::to_string( v ); + try { + const auto& value = bsoncxx::from_json( json ); + trans_traces_doc.append( bsoncxx::builder::concatenate_doc{value.view()} ); + } catch( bsoncxx::exception& ) { + try { + json = fc::prune_invalid_utf8( json ); + const auto& value = bsoncxx::from_json( json ); + trans_traces_doc.append( bsoncxx::builder::concatenate_doc{value.view()} ); + trans_traces_doc.append( kvp( "non-utf8-purged", b_bool{true} ) ); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert transaction JSON to MongoDB JSON: ${e}", ("e", e.what()) ); + elog( " JSON: ${j}", ("j", json) ); + } + } + trans_traces_doc.append( kvp( "createdAt", b_date{now} ) ); + + try { + if( !_trans_traces.insert_one( trans_traces_doc.view() ) ) { + EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert trans ${id}", ("id", t->id) ); + } + } catch( ... ) { + handle_mongo_exception( "trans_traces insert: " + json, __LINE__ ); + } + } catch( ... ) { + handle_mongo_exception( "trans_traces serialization: " + t->id.str(), __LINE__ ); } } - trans_traces_doc.append( kvp( "createdAt", b_date{now} )); + // insert action_traces try { - if( !trans_traces.insert_one( trans_traces_doc.view())) { - EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert trans ${id}", ("id", t->id)); + if( !bulk_action_traces.execute() ) { + EOS_ASSERT( false, chain::mongo_db_insert_fail, + "Bulk action traces insert failed for transaction trace: ${id}", ("id", t->id) ); } - } catch(...) { - handle_mongo_exception("trans_traces insert: " + json, __LINE__); + } catch( ... ) { + handle_mongo_exception( "action traces insert", __LINE__ ); } + } void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr& bs ) { @@ -835,12 +890,10 @@ void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); if( store_block_states ) { - auto block_states = mongo_conn[db_name][block_states_col]; auto block_state_doc = bsoncxx::builder::basic::document{}; block_state_doc.append( kvp( "block_num", b_int32{static_cast(block_num)} ), kvp( "block_id", block_id_str ), - kvp( "validated", b_bool{bs->validated} ), - kvp( "in_current_chain", b_bool{bs->in_current_chain} ) ); + kvp( "validated", b_bool{bs->validated} ) ); const chain::block_header_state& bhs = *bs; @@ -862,8 +915,8 @@ void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr block_state_doc.append( kvp( "createdAt", b_date{now} ) ); try { - if( !block_states.update_one( make_document( kvp( "block_id", block_id_str ) ), - make_document( kvp( "$set", block_state_doc.view() ) ), update_opts ) ) { + if( !_block_states.update_one( make_document( kvp( "block_id", block_id_str ) ), + make_document( kvp( "$set", block_state_doc.view() ) ), update_opts ) ) { EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id) ); } } catch( ... ) { @@ -872,7 +925,6 @@ void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr } if( store_blocks ) { - auto blocks = mongo_conn[db_name][blocks_col]; auto block_doc = bsoncxx::builder::basic::document{}; block_doc.append( kvp( "block_num", b_int32{static_cast(block_num)} ), kvp( "block_id", block_id_str ) ); @@ -896,8 +948,8 @@ void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr block_doc.append( kvp( "createdAt", b_date{now} ) ); try { - if( !blocks.update_one( make_document( kvp( "block_id", block_id_str ) ), - make_document( kvp( "$set", block_doc.view() ) ), update_opts ) ) { + if( !_blocks.update_one( make_document( kvp( "block_id", block_id_str ) ), + make_document( kvp( "$set", block_doc.view() ) ), update_opts ) ) { EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block ${bid}", ("bid", block_id) ); } } catch( ... ) { @@ -913,37 +965,49 @@ void mongo_db_plugin_impl::_process_irreversible_block(const chain::block_state_ using bsoncxx::builder::basic::make_document; using bsoncxx::builder::basic::kvp; - auto blocks = mongo_conn[db_name][blocks_col]; - auto trans = mongo_conn[db_name][trans_col]; const auto block_id = bs->block->id(); const auto block_id_str = block_id.str(); - const auto block_num = bs->block->block_num(); auto now = std::chrono::duration_cast( std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); if( store_blocks ) { - auto ir_block = find_block( blocks, block_id_str ); + auto ir_block = find_block( _blocks, block_id_str ); if( !ir_block ) { _process_accepted_block( bs ); - ir_block = find_block( blocks, block_id_str ); + ir_block = find_block( _blocks, block_id_str ); if( !ir_block ) return; // should never happen } auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ), kvp( "validated", b_bool{bs->validated} ), - kvp( "in_current_chain", b_bool{bs->in_current_chain} ), kvp( "updatedAt", b_date{now} ) ) ) ); - blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid() ) ), update_doc.view() ); + _blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid() ) ), update_doc.view() ); + } + + if( store_block_states ) { + auto ir_block = find_block( _block_states, block_id_str ); + if( !ir_block ) { + _process_accepted_block( bs ); + ir_block = find_block( _block_states, block_id_str ); + if( !ir_block ) return; // should never happen + } + + auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ), + kvp( "validated", b_bool{bs->validated} ), + kvp( "updatedAt", b_date{now} ) ) ) ); + + _block_states.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid() ) ), update_doc.view() ); } if( store_transactions ) { + const auto block_num = bs->block->block_num(); bool transactions_in_block = false; mongocxx::options::bulk_write bulk_opts; bulk_opts.ordered( false ); - auto bulk = trans.create_bulk_write( bulk_opts ); + auto bulk = _trans.create_bulk_write( bulk_opts ); for( const auto& receipt : bs->block->transactions ) { string trx_id_str; @@ -990,9 +1054,7 @@ void mongo_db_plugin_impl::add_pub_keys( const vector& keys, if( keys.empty()) return; - auto pub_keys = mongo_conn[db_name][pub_keys_col]; - - mongocxx::bulk_write bulk = pub_keys.create_bulk_write(); + mongocxx::bulk_write bulk = _pub_keys.create_bulk_write(); for( const auto& pub_key_weight : keys ) { auto find_doc = bsoncxx::builder::basic::document(); @@ -1025,10 +1087,8 @@ void mongo_db_plugin_impl::remove_pub_keys( const account_name& name, const perm using bsoncxx::builder::basic::kvp; using bsoncxx::builder::basic::make_document; - auto pub_keys = mongo_conn[db_name][pub_keys_col]; - try { - auto result = pub_keys.delete_many( make_document( kvp( "account", name.to_string()), + auto result = _pub_keys.delete_many( make_document( kvp( "account", name.to_string()), kvp( "permission", permission.to_string()))); if( !result ) { EOS_ASSERT( false, chain::mongo_db_update_fail, @@ -1050,9 +1110,7 @@ void mongo_db_plugin_impl::add_account_control( const vector(); - create_account( accounts, newacc.name, now ); + create_account( _accounts, newacc.name, now ); add_pub_keys( newacc.owner.keys, newacc.name, owner, now ); add_account_control( newacc.owner.accounts, newacc.name, owner, now ); @@ -1169,10 +1225,10 @@ void mongo_db_plugin_impl::update_account(const chain::action& act) abi_cache_index.erase( setabi.account ); - auto account = find_account( accounts, setabi.account ); + auto account = find_account( _accounts, setabi.account ); if( !account ) { - create_account( accounts, setabi.account, now ); - account = find_account( accounts, setabi.account ); + create_account( _accounts, setabi.account, now ); + account = find_account( _accounts, setabi.account ); } if( account ) { abi_def abi_def = fc::raw::unpack( setabi.abi ); @@ -1184,8 +1240,8 @@ void mongo_db_plugin_impl::update_account(const chain::action& act) kvp( "updatedAt", b_date{now} )))); try { - if( !accounts.update_one( make_document( kvp( "_id", account->view()["_id"].get_oid())), - update_from.view())) { + if( !_accounts.update_one( make_document( kvp( "_id", account->view()["_id"].get_oid())), + update_from.view())) { EOS_ASSERT( false, chain::mongo_db_update_fail, "Failed to udpdate account ${n}", ("n", setabi.account)); } } catch( ... ) { @@ -1203,8 +1259,6 @@ void mongo_db_plugin_impl::update_account(const chain::action& act) } mongo_db_plugin_impl::mongo_db_plugin_impl() -: mongo_inst{} -, mongo_conn{} { } @@ -1225,12 +1279,15 @@ mongo_db_plugin_impl::~mongo_db_plugin_impl() { void mongo_db_plugin_impl::wipe_database() { ilog("mongo db wipe_database"); + auto client = mongo_pool->acquire(); + auto& mongo_conn = *client; + auto block_states = mongo_conn[db_name][block_states_col]; auto blocks = mongo_conn[db_name][blocks_col]; auto trans = mongo_conn[db_name][trans_col]; auto trans_traces = mongo_conn[db_name][trans_traces_col]; auto action_traces = mongo_conn[db_name][action_traces_col]; - accounts = mongo_conn[db_name][accounts_col]; + auto accounts = mongo_conn[db_name][accounts_col]; auto pub_keys = mongo_conn[db_name][pub_keys_col]; auto account_controls = mongo_conn[db_name][account_controls_col]; @@ -1242,6 +1299,7 @@ void mongo_db_plugin_impl::wipe_database() { accounts.drop(); pub_keys.drop(); account_controls.drop(); + ilog("done wipe_database"); } void mongo_db_plugin_impl::init() { @@ -1251,60 +1309,69 @@ void mongo_db_plugin_impl::init() { // Create the native contract accounts manually; sadly, we can't run their contracts to make them create themselves // See native_contract_chain_initializer::prepare_database() - accounts = mongo_conn[db_name][accounts_col]; - if (accounts.count(make_document()) == 0) { - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - - auto doc = make_document( kvp( "name", name( chain::config::system_account_name ).to_string()), - kvp( "createdAt", b_date{now} )); - - try { - if( !accounts.insert_one( doc.view())) { - EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert account ${n}", - ("n", name( chain::config::system_account_name ).to_string())); - } - } catch(...) { - handle_mongo_exception("account insert", __LINE__); - } - - try { - // blocks indexes - auto blocks = mongo_conn[db_name][blocks_col]; - blocks.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" )); - blocks.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" )); - - auto block_stats = mongo_conn[db_name][block_states_col]; - block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" )); - block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" )); - - // accounts indexes - accounts.create_index( bsoncxx::from_json( R"xxx({ "name" : 1 })xxx" )); - - // transactions indexes - auto trans = mongo_conn[db_name][trans_col]; - trans.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); - - auto trans_trace = mongo_conn[db_name][trans_traces_col]; - trans_trace.create_index( bsoncxx::from_json( R"xxx({ "id" : 1 })xxx" )); + ilog("init mongo"); + try { + auto client = mongo_pool->acquire(); + auto& mongo_conn = *client; - // action traces indexes - auto action_traces = mongo_conn[db_name][action_traces_col]; - action_traces.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); + auto accounts = mongo_conn[db_name][accounts_col]; + if( accounts.count( make_document()) == 0 ) { + auto now = std::chrono::duration_cast( + std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} ); - // pub_keys indexes - auto pub_keys = mongo_conn[db_name][pub_keys_col]; - pub_keys.create_index( bsoncxx::from_json( R"xxx({ "account" : 1, "permission" : 1 })xxx" )); - pub_keys.create_index( bsoncxx::from_json( R"xxx({ "public_key" : 1 })xxx" )); + auto doc = make_document( kvp( "name", name( chain::config::system_account_name ).to_string()), + kvp( "createdAt", b_date{now} )); - // account_controls indexes - auto account_controls = mongo_conn[db_name][account_controls_col]; - account_controls.create_index( bsoncxx::from_json( R"xxx({ "controlled_account" : 1, "controlled_permission" : 1 })xxx" )); - account_controls.create_index( bsoncxx::from_json( R"xxx({ "controlling_account" : 1 })xxx" )); + try { + if( !accounts.insert_one( doc.view())) { + EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert account ${n}", + ("n", name( chain::config::system_account_name ).to_string())); + } + } catch (...) { + handle_mongo_exception( "account insert", __LINE__ ); + } - } catch(...) { - handle_mongo_exception("create indexes", __LINE__); + try { + // blocks indexes + auto blocks = mongo_conn[db_name][blocks_col]; + blocks.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" )); + blocks.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" )); + + auto block_states = mongo_conn[db_name][block_states_col]; + block_states.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" )); + block_states.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" )); + + // accounts indexes + accounts.create_index( bsoncxx::from_json( R"xxx({ "name" : 1 })xxx" )); + + // transactions indexes + auto trans = mongo_conn[db_name][trans_col]; + trans.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); + + auto trans_trace = mongo_conn[db_name][trans_traces_col]; + trans_trace.create_index( bsoncxx::from_json( R"xxx({ "id" : 1 })xxx" )); + + // action traces indexes + auto action_traces = mongo_conn[db_name][action_traces_col]; + action_traces.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); + + // pub_keys indexes + auto pub_keys = mongo_conn[db_name][pub_keys_col]; + pub_keys.create_index( bsoncxx::from_json( R"xxx({ "account" : 1, "permission" : 1 })xxx" )); + pub_keys.create_index( bsoncxx::from_json( R"xxx({ "public_key" : 1 })xxx" )); + + // account_controls indexes + auto account_controls = mongo_conn[db_name][account_controls_col]; + account_controls.create_index( + bsoncxx::from_json( R"xxx({ "controlled_account" : 1, "controlled_permission" : 1 })xxx" )); + account_controls.create_index( bsoncxx::from_json( R"xxx({ "controlling_account" : 1 })xxx" )); + + } catch (...) { + handle_mongo_exception( "create indexes", __LINE__ ); + } } + } catch (...) { + handle_mongo_exception( "mongo init", __LINE__ ); } ilog("starting db plugin thread"); @@ -1438,6 +1505,10 @@ void mongo_db_plugin::plugin_initialize(const variables_map& options) my->filter_out.insert( fe ); } } + if( options.count( "producer-name") ) { + wlog( "mongodb plugin not recommended on producer node" ); + my->is_producer = true; + } if( my->start_block_num == 0 ) { my->start_block_reached = true; @@ -1449,7 +1520,7 @@ void mongo_db_plugin::plugin_initialize(const variables_map& options) my->db_name = uri.database(); if( my->db_name.empty()) my->db_name = "EOS"; - my->mongo_conn = mongocxx::client{uri}; + my->mongo_pool.emplace(uri); // hook up to signals on controller chain_plugin* chain_plug = app().find_plugin(); diff --git a/tests/Cluster.py b/tests/Cluster.py index 5f3d26d3d51..dac53606bf8 100644 --- a/tests/Cluster.py +++ b/tests/Cluster.py @@ -1256,4 +1256,7 @@ def reportStatus(self): self.biosNode.reportStatus() if hasattr(self, "nodes"): for node in self.nodes: - node.reportStatus() + try: + node.reportStatus() + except: + Utils.Print("No reportStatus") diff --git a/tests/Node.py b/tests/Node.py index 39de900aa71..742fe352c39 100644 --- a/tests/Node.py +++ b/tests/Node.py @@ -459,7 +459,12 @@ def getEosAccountFromDb(self, name, exitOnError=False): subcommand='db.accounts.findOne({"name" : "%s"})' % (name) if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) try: - trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand, exitOnError=exitOnError) + timeout = 3 + for i in range(0,(int(60/timeout) - 1)): + trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand, exitOnError=exitOnError) + if trans is not None: + return trans + time.sleep(timeout) return trans except subprocess.CalledProcessError as ex: msg=ex.output.decode("utf-8") diff --git a/tests/testUtils.py b/tests/testUtils.py index ad77cd20c4c..9629272b17c 100755 --- a/tests/testUtils.py +++ b/tests/testUtils.py @@ -6,6 +6,7 @@ import json import shlex from sys import stdout +from sys import exit import traceback ###########################################################################################