diff --git a/CMakeLists.txt b/CMakeLists.txt index 2039df3546f..0d1504122e5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,7 +35,7 @@ set( CXX_STANDARD_REQUIRED ON) set(VERSION_MAJOR 3) set(VERSION_MINOR 0) -set(VERSION_PATCH 3) +set(VERSION_PATCH 4) if(VERSION_SUFFIX) set(VERSION_FULL "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH}-${VERSION_SUFFIX}") diff --git a/Docker/README.md b/Docker/README.md index 087105fed7b..b30685382c4 100644 --- a/Docker/README.md +++ b/Docker/README.md @@ -20,10 +20,10 @@ cd bos/Docker docker build . -t boscore/bos -s BOS ``` -The above will build off the most recent commit to the master branch by default. If you would like to target a specific branch/tag, you may use a build argument. For example, if you wished to generate a docker image based off of the v3.0.3 tag, you could do the following: +The above will build off the most recent commit to the master branch by default. If you would like to target a specific branch/tag, you may use a build argument. For example, if you wished to generate a docker image based off of the v3.0.4 tag, you could do the following: ```bash -docker build -t boscore/bos:v3.0.3 --build-arg branch=v3.0.3 . +docker build -t boscore/bos:v3.0.4 --build-arg branch=v3.0.4 . ``` diff --git a/README.md b/README.md index cab40f08390..1e7c08f698b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # BOSCore - Born for DApps. Born for Usability. -## BOSCore Version: v3.0.3 -### Basic EOSIO Version: v1.6.6 (support REX) +## BOSCore Version: v3.0.4 +### Basic EOSIO Version: v1.6.6 (support REX, part 1.8.x) # Background The emergence of EOS has brought new imagination to the blockchain. In just a few months since the main network was launched, the version has undergone dozens of upgrades, not only the stability has been greatly improved, but also the new functions have been gradually realized. The node team is also actively involved in building the EOSIO ecosystem. What is even more exciting is that EOS has attracted more and more development teams. There are already hundreds of DApp running on the EOS main network. The transaction volume and circulation market value far exceed Ethereum, and the space for development is growing broader. diff --git a/README_CN.md b/README_CN.md index 5603854c913..a6465826236 100644 --- a/README_CN.md +++ b/README_CN.md @@ -1,7 +1,7 @@ # BOSCore - 更可用的链,为DApp而生。 -## BOSCore Version: v3.0.3 -### Basic EOSIO Version: v1.6.6 (support REX) +## BOSCore Version: v3.0.4 +### Basic EOSIO Version: v1.6.6 (support REX, part 1.8.x) # 背景 EOS的出现给区块链带来了新的想象力,主网启动短短几个月以来,版本经历了几十次升级,不仅稳定性得到了很大提高,并且新功能也逐步实现,各个节点团队也积极参与建设EOSIO生态。让人更加兴奋的是,EOS已经吸引了越来越多的开发团队,当前已经有数百个DApp在EOS主网上面运行,其交易量和流通市值远超以太坊,可发展的空间愈来愈广阔。 diff --git a/libraries/chain/block_log.cpp b/libraries/chain/block_log.cpp index 1c06422c0a7..5a490bd9432 100644 --- a/libraries/chain/block_log.cpp +++ b/libraries/chain/block_log.cpp @@ -9,7 +9,6 @@ #define LOG_READ (std::ios::in | std::ios::binary) #define LOG_WRITE (std::ios::out | std::ios::binary | std::ios::app) -#define LOG_RW ( std::ios::in | std::ios::out | std::ios::binary ) namespace eosio { namespace chain { @@ -32,42 +31,47 @@ namespace eosio { namespace chain { std::fstream index_stream; fc::path block_file; fc::path index_file; - bool open_files = false; + bool block_write; + bool index_write; bool genesis_written_to_block_log = false; uint32_t version = 0; uint32_t first_block_num = 0; - inline void check_open_files() { - if( !open_files ) { - reopen(); + inline void check_block_read() { + if (block_write) { + block_stream.close(); + block_stream.open(block_file.generic_string().c_str(), LOG_READ); + block_write = false; } } - void reopen(); - void close() { - if( block_stream.is_open() ) + inline void check_block_write() { + if (!block_write) { block_stream.close(); - if( index_stream.is_open() ) - index_stream.close(); - open_files = false; + block_stream.open(block_file.generic_string().c_str(), LOG_WRITE); + block_write = true; + } } - }; - - void block_log_impl::reopen() { - close(); - - // open to create files if they don't exist - //ilog("Opening block log at ${path}", ("path", my->block_file.generic_string())); - block_stream.open(block_file.generic_string().c_str(), LOG_WRITE); - index_stream.open(index_file.generic_string().c_str(), LOG_WRITE); - close(); - - block_stream.open(block_file.generic_string().c_str(), LOG_RW); - index_stream.open(index_file.generic_string().c_str(), LOG_RW); + inline void check_index_read() { + try { + if (index_write) { + index_stream.close(); + index_stream.open(index_file.generic_string().c_str(), LOG_READ); + index_write = false; + } + } + FC_LOG_AND_RETHROW() + } - open_files = true; - } + inline void check_index_write() { + if (!index_write) { + index_stream.close(); + index_stream.open(index_file.generic_string().c_str(), LOG_WRITE); + index_write = true; + } + } + }; } block_log::block_log(const fc::path& data_dir) @@ -84,21 +88,26 @@ namespace eosio { namespace chain { block_log::~block_log() { if (my) { flush(); - my->close(); my.reset(); } } void block_log::open(const fc::path& data_dir) { - my->close(); + if (my->block_stream.is_open()) + my->block_stream.close(); + if (my->index_stream.is_open()) + my->index_stream.close(); if (!fc::is_directory(data_dir)) fc::create_directories(data_dir); - my->block_file = data_dir / "blocks.log"; my->index_file = data_dir / "blocks.index"; - my->reopen(); + //ilog("Opening block log at ${path}", ("path", my->block_file.generic_string())); + my->block_stream.open(my->block_file.generic_string().c_str(), LOG_WRITE); + my->index_stream.open(my->index_file.generic_string().c_str(), LOG_WRITE); + my->block_write = true; + my->index_write = true; /* On startup of the block log, there are several states the log file and the index file can be * in relation to each other. @@ -123,6 +132,7 @@ namespace eosio { namespace chain { if (log_size) { ilog("Log is nonempty"); + my->check_block_read(); my->block_stream.seekg( 0 ); my->version = 0; my->block_stream.read( (char*)&my->version, sizeof(my->version) ); @@ -149,6 +159,9 @@ namespace eosio { namespace chain { } if (index_size) { + my->check_block_read(); + my->check_index_read(); + ilog("Index is nonempty"); uint64_t block_pos; my->block_stream.seekg(-sizeof(uint64_t), std::ios::end); @@ -171,9 +184,10 @@ namespace eosio { namespace chain { } } else if (index_size) { ilog("Index is nonempty, remove and recreate it"); - my->close(); + my->index_stream.close(); fc::remove_all(my->index_file); - my->reopen(); + my->index_stream.open(my->index_file.generic_string().c_str(), LOG_WRITE); + my->index_write = true; } } @@ -181,10 +195,9 @@ namespace eosio { namespace chain { try { EOS_ASSERT( my->genesis_written_to_block_log, block_log_append_fail, "Cannot append to block log until the genesis is first written" ); - my->check_open_files(); + my->check_block_write(); + my->check_index_write(); - my->block_stream.seekp(0, std::ios::end); - my->index_stream.seekp(0, std::ios::end); uint64_t pos = my->block_stream.tellp(); EOS_ASSERT(my->index_stream.tellp() == sizeof(uint64_t) * (b->block_num() - my->first_block_num), block_log_append_fail, @@ -211,17 +224,22 @@ namespace eosio { namespace chain { } void block_log::reset( const genesis_state& gs, const signed_block_ptr& first_block, uint32_t first_block_num ) { - my->close(); + if (my->block_stream.is_open()) + my->block_stream.close(); + if (my->index_stream.is_open()) + my->index_stream.close(); fc::remove_all(my->block_file); fc::remove_all(my->index_file); - my->reopen(); + my->block_stream.open(my->block_file.generic_string().c_str(), LOG_WRITE); + my->index_stream.open(my->index_file.generic_string().c_str(), LOG_WRITE); + my->block_write = true; + my->index_write = true; auto data = fc::raw::pack(gs); my->version = 0; // version of 0 is invalid; it indicates that the genesis was not properly written to the block log my->first_block_num = first_block_num; - my->block_stream.seekp(0, std::ios::end); my->block_stream.write((char*)&my->version, sizeof(my->version)); my->block_stream.write((char*)&my->first_block_num, sizeof(my->first_block_num)); my->block_stream.write(data.data(), data.size()); @@ -233,20 +251,29 @@ namespace eosio { namespace chain { if (first_block) { append(first_block); + } else { + my->head.reset(); + my->head_id = {}; } auto pos = my->block_stream.tellp(); + my->block_stream.close(); + my->block_stream.open(my->block_file.generic_string().c_str(), std::ios::in | std::ios::out | std::ios::binary ); // Bypass append-only writing just once + static_assert( block_log::max_supported_version > 0, "a version number of zero is not supported" ); my->version = block_log::max_supported_version; my->block_stream.seekp( 0 ); my->block_stream.write( (char*)&my->version, sizeof(my->version) ); my->block_stream.seekp( pos ); flush(); + + my->block_write = false; + my->check_block_write(); // Reset to append-only writing. } std::pair block_log::read_block(uint64_t pos)const { - my->check_open_files(); + my->check_block_read(); my->block_stream.seekg(pos); std::pair result; @@ -270,7 +297,7 @@ namespace eosio { namespace chain { } uint64_t block_log::get_block_pos(uint32_t block_num) const { - my->check_open_files(); + my->check_index_read(); if (!(my->head && block_num <= block_header::num_from_id(my->head_id) && block_num >= my->first_block_num)) return npos; my->index_stream.seekg(sizeof(uint64_t) * (block_num - my->first_block_num)); @@ -280,7 +307,7 @@ namespace eosio { namespace chain { } signed_block_ptr block_log::read_head()const { - my->check_open_files(); + my->check_block_read(); uint64_t pos; @@ -308,13 +335,13 @@ namespace eosio { namespace chain { void block_log::construct_index() { ilog("Reconstructing Block Log Index..."); - my->close(); - + my->index_stream.close(); fc::remove_all(my->index_file); - - my->reopen(); + my->index_stream.open(my->index_file.generic_string().c_str(), LOG_WRITE); + my->index_write = true; uint64_t end_pos; + my->check_block_read(); my->block_stream.seekg(-sizeof( uint64_t), std::ios::end); my->block_stream.read((char*)&end_pos, sizeof(end_pos)); @@ -343,10 +370,11 @@ namespace eosio { namespace chain { my->block_stream.read((char*) &totem, sizeof(totem)); } - my->index_stream.seekp(0, std::ios::end); while( pos < end_pos ) { fc::raw::unpack(my->block_stream, tmp); my->block_stream.read((char*)&pos, sizeof(pos)); + if(tmp.block_num() % 1000 == 0) + ilog( "Block log index reconstructed for block ${n}", ("n", tmp.block_num())); my->index_stream.write((char*)&pos, sizeof(pos)); } } // construct_index @@ -468,6 +496,8 @@ namespace eosio { namespace chain { new_block_stream.write( data.data(), data.size() ); new_block_stream.write( reinterpret_cast(&pos), sizeof(pos) ); block_num = tmp.block_num(); + if(block_num % 1000 == 0) + ilog( "Recovered block ${num}", ("num", block_num) ); pos = new_block_stream.tellp(); if( block_num == truncate_at_block ) break; diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 75a799b74fa..c198d037f9c 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -322,15 +322,14 @@ struct controller_impl { auto start = fc::time_point::now(); while( auto next = blog.read_block_by_num( head->block_num + 1 ) ) { replay_push_block( next, controller::block_status::irreversible ); - if( next->block_num() % 100 == 0 ) { - std::cerr << std::setw(10) << next->block_num() << " of " << blog_head->block_num() <<"\r"; + if( next->block_num() % 500 == 0 ) { + ilog( "${n} of ${head}", ("n", next->block_num())("head", blog_head->block_num()) ); if( shutdown() ) break; } } - std::cerr<< "\n"; ilog( "${n} blocks replayed", ("n", head->block_num - start_block_num) ); - // if the irreverible log is played without undo sessions enabled, we need to sync the + // if the irreversible log is played without undo sessions enabled, we need to sync the // revision ordinal to the appropriate expected value here. if( self.skip_db_sessions( controller::block_status::irreversible ) ) db.set_revision(head->block_num); @@ -389,7 +388,7 @@ struct controller_impl { report_integrity_hash = true; } } - + if( shutdown() ) return; const auto& ubi = reversible_blocks.get_index(); @@ -921,17 +920,14 @@ struct controller_impl { void commit_block( bool add_to_fork_db ) { auto reset_pending_on_exit = fc::make_scoped_exit([this]{ pending.reset(); - }); try { - if (add_to_fork_db) { pending->_pending_block_state->validated = true; auto new_bsp = fork_db.add(pending->_pending_block_state, true, pbft_enabled); emit(self.accepted_block_header, pending->_pending_block_state); - head = fork_db.head(); EOS_ASSERT(new_bsp == head, fork_database_exception, "committed block did not become the new head in fork database"); } @@ -1381,7 +1377,6 @@ struct controller_impl { pending->_pending_block_state->set_confirmed(confirm_block_count, pbft_enabled); - auto was_pending_promoted = pending->_pending_block_state->maybe_promote_pending(pbft_enabled); //modify state in speculative block only if we are speculative reads mode (other wise we need clean state for head or irreversible reads) diff --git a/libraries/chain/include/eosio/chain/wasm_eosio_injection.hpp b/libraries/chain/include/eosio/chain/wasm_eosio_injection.hpp index d7a64f809d0..b8bd39823b5 100644 --- a/libraries/chain/include/eosio/chain/wasm_eosio_injection.hpp +++ b/libraries/chain/include/eosio/chain/wasm_eosio_injection.hpp @@ -759,8 +759,8 @@ namespace eosio { namespace chain { namespace wasm_injections { struct post_op_injectors : wasm_ops::op_types { - using loop_t = wasm_ops::loop ; - using call_t = wasm_ops::call ; + using loop_t = wasm_ops::loop ; + using call_t = wasm_ops::call ; using grow_memory_t = wasm_ops::grow_memory ; }; diff --git a/libraries/chain/transaction.cpp b/libraries/chain/transaction.cpp index 2724a31b28d..08cd6a0222c 100644 --- a/libraries/chain/transaction.cpp +++ b/libraries/chain/transaction.cpp @@ -213,7 +213,7 @@ static bytes zlib_decompress(const bytes& data) { bytes out; bio::filtering_ostream decomp; decomp.push(bio::zlib_decompressor()); - decomp.push(read_limiter<1*1024*1024>()); // limit to 10 megs decompressed for zip bomb protections + decomp.push(read_limiter<1*1024*1024>()); // limit to 1 meg decompressed for zip bomb protections decomp.push(bio::back_inserter(out)); bio::write(decomp, data.data(), data.size()); bio::close(decomp); diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index c720b348e10..df43dc312ce 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -1273,7 +1273,7 @@ uint64_t convert_to_type(const string& str, const string& desc) { try { return boost::lexical_cast(str.c_str(), str.size()); } catch( ... ) { } - + try { auto trimmed_str = str; boost::trim(trimmed_str); @@ -1287,7 +1287,7 @@ uint64_t convert_to_type(const string& str, const string& desc) { return symb.value(); } catch( ... ) { } } - + try { return ( eosio::chain::string_to_symbol( 0, str.c_str() ) >> 8 ); } catch( ... ) { @@ -1668,14 +1668,24 @@ read_only::get_scheduled_transactions( const read_only::get_scheduled_transactio fc::variant read_only::get_block(const read_only::get_block_params& params) const { signed_block_ptr block; - EOS_ASSERT(!params.block_num_or_id.empty() && params.block_num_or_id.size() <= 64, chain::block_id_type_exception, "Invalid Block number or ID, must be greater than 0 and less than 64 characters" ); + optional block_num; + + EOS_ASSERT( !params.block_num_or_id.empty() && params.block_num_or_id.size() <= 64, + chain::block_id_type_exception, + "Invalid Block number or ID, must be greater than 0 and less than 64 characters" + ); + try { - block = db.fetch_block_by_id(fc::variant(params.block_num_or_id).as()); - if (!block) { - block = db.fetch_block_by_number(fc::to_uint64(params.block_num_or_id)); - } + block_num = fc::to_uint64(params.block_num_or_id); + } catch( ... ) {} - } EOS_RETHROW_EXCEPTIONS(chain::block_id_type_exception, "Invalid block ID: ${block_num_or_id}", ("block_num_or_id", params.block_num_or_id)) + if( block_num.valid() ) { + block = db.fetch_block_by_number( *block_num ); + } else { + try { + block = db.fetch_block_by_id( fc::variant(params.block_num_or_id).as() ); + } EOS_RETHROW_EXCEPTIONS(chain::block_id_type_exception, "Invalid block ID: ${block_num_or_id}", ("block_num_or_id", params.block_num_or_id)) + } EOS_ASSERT( block, unknown_block_exception, "Could not find block: ${block}", ("block", params.block_num_or_id)); diff --git a/plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp b/plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp index 3de8b9a8d34..a194213537f 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp @@ -30,7 +30,6 @@ namespace eosio { void plugin_startup(); void plugin_shutdown(); - void broadcast_block(const chain::signed_block &sb); string connect( const string& endpoint ); @@ -43,7 +42,7 @@ namespace eosio { size_t num_peers() const; private: - std::unique_ptr my; + std::shared_ptr my; }; } diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 31fdeec6dd1..5ab0ca6b918 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -122,6 +122,7 @@ namespace eosio { unique_ptr acceptor; tcp::endpoint listen_endpoint; string p2p_address; + string p2p_server_address; uint32_t max_client_count = 0; uint32_t max_nodes_per_host = 1; uint32_t num_clients = 0; @@ -173,8 +174,6 @@ namespace eosio { node_transaction_index local_txns; - shared_ptr resolver; - bool use_socket_read_watermark = false; std::unordered_map pbft_message_cache{}; @@ -194,12 +193,22 @@ namespace eosio { eosio::chain::plugin_interface::pbft::incoming::new_view_channel::channel_type& pbft_incoming_new_view_channel; eosio::chain::plugin_interface::pbft::incoming::checkpoint_channel::channel_type& pbft_incoming_checkpoint_channel; - void connect(const connection_ptr& c); - void connect(const connection_ptr& c, tcp::resolver::iterator endpoint_itr); + void connect( const connection_ptr& c ); + void connect( const connection_ptr& c, const std::shared_ptr& resolver, tcp::resolver::results_type endpoints ); bool start_session(const connection_ptr& c); void start_listen_loop(); void start_read_message(const connection_ptr& c); + /** \brief Process the next message from the pending message buffer + * + * Process the next message from the pending_message_buffer. + * message_length is the already determined length of the data + * part of the message that will handle the message. + * Returns true is successful. Returns false if an error was + * encountered unpacking or processing the message. + */ + bool process_next_message(const connection_ptr& conn, uint32_t message_length); + void close(const connection_ptr& c); size_t count_open_sockets() const; @@ -357,7 +366,6 @@ namespace eosio { constexpr auto def_send_buffer_size = 1024*1024*def_send_buffer_size_mb; constexpr auto def_max_write_queue_size = def_send_buffer_size*10; constexpr boost::asio::chrono::milliseconds def_read_delay_for_full_write_queue{100}; - constexpr auto def_max_reads_in_flight = 1000; constexpr auto def_max_trx_in_progress_size = 100*1024*1024; // 100 MB constexpr auto def_max_clients = 25; // 0 for unlimited clients constexpr auto def_max_nodes_per_host = 1; @@ -591,7 +599,6 @@ namespace eosio { queued_buffer buffer_queue; - uint32_t reads_in_flight = 0; uint32_t trx_in_progress_size = 0; fc::sha256 node_id; handshake_message last_handshake_recv; @@ -668,6 +675,9 @@ namespace eosio { const string peer_name(); + void txn_send_pending(const vector& ids); + void txn_send(const vector& txn_lis); + void blk_send_branch(); void blk_send(const block_id_type& blkid); void stop_send(); @@ -701,16 +711,6 @@ namespace eosio { void send_p2p_request(bool discoverable); void send_p2p_response(bool discoverable,string p2p_peer_list); - /** \brief Process the next message from the pending message buffer - * - * Process the next message from the pending_message_buffer. - * message_length is the already determined length of the data - * part of the message and impl in the net plugin implementation - * that will handle the message. - * Returns true is successful. Returns false if an error was - * encountered unpacking or processing the message. - */ - bool process_next_message(net_plugin_impl& impl, uint32_t message_length); bool add_peer_block(const peer_block_state& pbs); @@ -790,7 +790,7 @@ namespace eosio { chain_plugin* chain_plug = nullptr; - constexpr auto stage_str(stages s ); + constexpr static auto stage_str(stages s); public: explicit sync_manager(uint32_t span); @@ -802,13 +802,13 @@ namespace eosio { void request_next_chunk(const connection_ptr& conn = connection_ptr()); void start_sync(const connection_ptr& c, uint32_t target); void reassign_fetch(const connection_ptr& c, go_away_reason reason); - void verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id); + bool verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id); void rejected_block(const connection_ptr& c, uint32_t blk_num); void recv_block(const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num); void recv_handshake(const connection_ptr& c, const handshake_message& msg); void recv_notice(const connection_ptr& c, const notice_message& msg); bool is_syncing(); - void sync_stable_checkpoints(const connection_ptr& c, uint32_t target); + bool sync_stable_checkpoints(const connection_ptr& c, uint32_t target); }; class dispatch_manager { @@ -835,7 +835,7 @@ namespace eosio { : blk_state(), trx_state(), peer_requested(), - socket( std::make_shared( std::ref( app().get_io_service() ))), + socket( std::make_shared( std::ref(app().get_io_service()) )), node_id(), last_handshake_recv(), last_handshake_sent(), @@ -917,6 +917,7 @@ namespace eosio { void connection::close() { if(socket) { socket->close(); + socket.reset( new tcp::socket( std::ref(app().get_io_service())) ); } else { wlog("no socket to close!"); @@ -930,14 +931,37 @@ namespace eosio { } reset(); sent_handshake_count = 0; + trx_in_progress_size = 0; + node_id = fc::sha256(); last_handshake_recv = handshake_message(); last_handshake_sent = handshake_message(); my_impl->sync_master->reset_lib_num(shared_from_this()); + fc_ilog(logger, "closing ${a}, ${p}", ("a",peer_addr)("p",peer_name())); fc_dlog(logger, "canceling wait on ${p}", ("p",peer_name())); cancel_wait(); if( read_delay_timer ) read_delay_timer->cancel(); } + void connection::txn_send_pending(const vector& ids) { + const std::set known_ids(ids.cbegin(), ids.cend()); + my_impl->expire_local_txns(); + for(auto tx = my_impl->local_txns.begin(); tx != my_impl->local_txns.end(); ++tx ){ + const bool found = known_ids.find( tx->id ) != known_ids.cend(); + if( !found ) { + queue_write( tx->serialized_txn, true, []( boost::system::error_code ec, std::size_t ) {} ); + } + } + } + + void connection::txn_send(const vector& ids) { + for(const auto& t : ids) { + auto tx = my_impl->local_txns.get().find(t); + if( tx != my_impl->local_txns.end() ) { + queue_write( tx->serialized_txn, true, []( boost::system::error_code ec, std::size_t ) {} ); + } + } + } + void connection::blk_send_branch() { controller& cc = my_impl->chain_plug->chain(); uint32_t head_num = cc.fork_db_head_block_num(); @@ -960,13 +984,7 @@ namespace eosio { fc_dlog(logger, "maybe truncating branch at = ${h}:${id}",("h",remote_head_num)("id",remote_head_id)); } - // base our branch off of the last handshake we sent the peer instead of our current - // LIB which could have moved forward in time as packets were in flight. - if (last_handshake_sent.generation >= 1) { - lib_id = last_handshake_sent.last_irreversible_block_id; - } else { - lib_id = cc.last_irreversible_block_id(); - } + lib_id = last_handshake_recv.last_irreversible_block_id; head_id = cc.fork_db_head_block_id(); } catch (const assert_exception& ex) { @@ -988,7 +1006,13 @@ namespace eosio { uint32_t end = std::max( peer_requested->end_block, block_header::num_from_id(head_id) ); peer_requested = sync_state( start, end, start - 1 ); } - enqueue_sync_block(); + + if( peer_requested->start_block <= peer_requested->end_block ) { + fc_dlog( logger, "enqueue ${s} - ${e}", ("s", peer_requested->start_block)( "e", peer_requested->end_block ) ); + enqueue_sync_block(); + } else { + peer_requested.reset(); + } // still want to send transactions along during blk branch sync syncing = false; @@ -1104,7 +1128,7 @@ namespace eosio { buffer_queue.fill_out_buffer( bufs ); fill_out_buffer_with_pbft_queue( bufs ); - boost::asio::async_write(*socket, bufs, [c](boost::system::error_code ec, std::size_t w) { + boost::asio::async_write(*socket, bufs, [c, socket=socket](boost::system::error_code ec, std::size_t w) { try { auto conn = c.lock(); if(!conn) @@ -1277,7 +1301,8 @@ namespace eosio { connection_ptr conn = weak_this.lock(); if (conn) { if (close_after_send != no_reason) { - elog ("sent a go away message: ${r}, closing connection to ${p}",("r", reason_str(close_after_send))("p", conn->peer_name())); + fc_elog( logger, "sent a go away message: ${r}, closing connection to ${p}", + ("r", reason_str(close_after_send))("p", conn->peer_name()) ); my_impl->close(conn); return; } @@ -1347,6 +1372,13 @@ namespace eosio { if( !peer_addr.empty() ) { return peer_addr; } + if( socket != nullptr ) { + boost::system::error_code ec; + auto rep = socket->remote_endpoint(ec); + if( !ec ) { + return rep.address().to_string() + ':' + std::to_string( rep.port() ); + } + } return "connecting client"; } @@ -1372,27 +1404,6 @@ namespace eosio { sync_wait(); } - bool connection::process_next_message(net_plugin_impl& impl, uint32_t message_length) { - try { - auto ds = pending_message_buffer.create_datastream(); - net_message msg; - fc::raw::unpack(ds, msg); - msg_handler m(impl, shared_from_this() ); - if( msg.contains() ) { - m( std::move( msg.get() ) ); - } else if( msg.contains() ) { - m( std::move( msg.get() ) ); - } else { - msg.visit( m ); - } - } catch( const fc::exception& e ) { - edump((e.to_detail_string() )); - impl.close( shared_from_this() ); - return false; - } - return true; - } - bool connection::add_peer_block(const peer_block_state& entry) { auto bptr = blk_state.get().find(entry.id); bool added = (bptr == blk_state.end()); @@ -1497,43 +1508,46 @@ namespace eosio { source = conn; } else { - if (my_impl->connections.size() == 1) { + if( my_impl->connections.size() == 0 ) { + source.reset(); + } else if( my_impl->connections.size() == 1 ) { if (!source) { source = *my_impl->connections.begin(); } - } - else { + } else { // init to a linear array search auto cptr = my_impl->connections.begin(); auto cend = my_impl->connections.end(); // do we remember the previous source? - if (source) { + if( source ) { //try to find it in the list - cptr = my_impl->connections.find(source); + cptr = my_impl->connections.find( source ); cend = cptr; - if (cptr == my_impl->connections.end()) { + if( cptr == my_impl->connections.end() ) { //not there - must have been closed! cend is now connections.end, so just flatten the ring. source.reset(); cptr = my_impl->connections.begin(); } else { //was found - advance the start to the next. cend is the old source. - if (++cptr == my_impl->connections.end() && cend != my_impl->connections.end() ) { + if( ++cptr == my_impl->connections.end() && cend != my_impl->connections.end() ) { cptr = my_impl->connections.begin(); } } } //scan the list of peers looking for another able to provide sync blocks. - auto cstart_it = cptr; - do { - //select the first one which is current and break out. - if((*cptr)->current()) { - source = *cptr; - break; - } - if(++cptr == my_impl->connections.end()) + if( cptr != my_impl->connections.end() ) { + auto cstart_it = cptr; + do { + //select the first one which is current and break out. + if( (*cptr)->current() ) { + source = *cptr; + break; + } + if( ++cptr == my_impl->connections.end() ) cptr = my_impl->connections.begin(); - } while(cptr != cstart_it); + } while( cptr != cstart_it ); + } // no need to check the result, either source advanced or the whole list was checked and the old source is reused. } } @@ -1575,8 +1589,8 @@ namespace eosio { sync_known_lib_num = target; } - if (!sync_required()) { - uint32_t bnum = chain_plug->chain().last_irreversible_block_num(); + uint32_t bnum = chain_plug->chain().last_irreversible_block_num(); + if (!sync_required() || target <= bnum) { uint32_t hnum = chain_plug->chain().fork_db_head_block_num(); fc_dlog( logger, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}", ("b",bnum)("h",hnum)("t",target)); @@ -1594,25 +1608,25 @@ namespace eosio { request_next_chunk(c); } - void sync_manager::sync_stable_checkpoints(const connection_ptr& c, uint32_t target) { + bool sync_manager::sync_stable_checkpoints(const connection_ptr& c, uint32_t target) { controller& cc = chain_plug->chain(); uint32_t lscb_num = cc.last_stable_checkpoint_block_num(); - auto head_num = cc.head_block_num(); - if (last_req_scp_num < lscb_num - || last_req_scp_num == 0 - || last_req_scp_num > target) last_req_scp_num = lscb_num; - auto pbft_checkpoint_granularity = chain_plug->pbft_ctrl().pbft_db.get_checkpoint_interval(); - auto end = target; + if (last_req_scp_num < lscb_num || last_req_scp_num == 0) last_req_scp_num = lscb_num; + auto max_target_scp_num = last_req_scp_num + pbft_checkpoint_granularity * 10; - if (target > max_target_scp_num) end = std::min(max_target_scp_num, head_num); + auto end = std::min(max_target_scp_num, target); - if (end - last_req_scp_num < pbft_checkpoint_granularity) return; + if (end - last_req_scp_num < pbft_checkpoint_granularity) { + last_req_scp_num = lscb_num; + return false; + } checkpoint_request_message crm = {last_req_scp_num+1,end}; c->enqueue( net_message(crm)); fc_dlog(logger, "request sync stable checkpoints from ${s} to ${e}", ("s", last_req_scp_num+1)("e", end)); last_req_scp_num = end; + return true; } void sync_manager::reassign_fetch(const connection_ptr& c, go_away_reason reason) { @@ -1640,8 +1654,10 @@ namespace eosio { // 1. my head block num < peer lib - start sync locally // 2. my lib > peer head num - send an last_irr_catch_up notice if not the first generation // - // 3 my head block num <= peer head block num - update sync state and send a catchup request - // 4 my head block num > peer block num send a notice catchup if this is not the first generation + // 3 my head block num < peer head block num - update sync state and send a catchup request + // 4 my head block num >= peer block num send a notice catchup if this is not the first generation + // 4.1 if peer appears to be on a different fork ( our_id_for( msg.head_num ) != msg.head_id ) + // then request peer's blocks // //----------------------------- @@ -1679,7 +1695,7 @@ namespace eosio { return; } - if (head <= msg.head_num ) { + if (head < msg.head_num ) { fc_dlog(logger, "sync check state 3"); verify_catchup(c, msg.head_num, msg.head_id); return; @@ -1695,12 +1711,22 @@ namespace eosio { c->enqueue( note ); } c->syncing = true; + bool on_fork = true; + try { + on_fork = cc.get_block_id_for_num( msg.head_num ) != msg.head_id; + } catch( ... ) {} + if( on_fork ) { + request_message req; + req.req_blocks.mode = catch_up; + req.req_trx.mode = none; + c->enqueue( req ); + } return; } elog("sync check failed to resolve status"); } - void sync_manager::verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id) { + bool sync_manager::verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id) { request_message req; req.req_blocks.mode = catch_up; for (const auto& cc : my_impl->connections) { @@ -1711,13 +1737,13 @@ namespace eosio { } } if( req.req_blocks.mode == catch_up ) { - c->fork_head = id; - c->fork_head_num = num; ilog("got a catch_up notice while in ${s}, fork head num = ${fhn} target LIB = ${lib} next_expected = ${ne}", ("s",stage_str(state))("fhn",num)("lib",sync_known_lib_num)("ne", sync_next_expected_num)); if (state == lib_catchup) - return; + return false; set_state(head_catchup); + c->fork_head = id; + c->fork_head_num = num; } else { c->fork_head = block_id_type(); @@ -1725,6 +1751,7 @@ namespace eosio { } req.req_trx.mode = none; c->enqueue( req ); + return true; } void sync_manager::recv_notice(const connection_ptr& c, const notice_message& msg) { @@ -1738,7 +1765,14 @@ namespace eosio { if (msg.known_blocks.ids.size() == 0) { elog("got a catch up with ids size = 0"); } else { - verify_catchup(c, msg.known_blocks.pending, msg.known_blocks.ids.back()); + const block_id_type& id = msg.known_blocks.ids.back(); + controller& cc = chain_plug->chain(); + if( !cc.fetch_block_by_id( id ) ) { + verify_catchup( c, msg.known_blocks.pending, id ); + } else { + // we already have the block, so update peer with our view of the world + c->send_handshake(); + } } } else { @@ -1750,7 +1784,7 @@ namespace eosio { void sync_manager::rejected_block(const connection_ptr& c, uint32_t blk_num) { if (state != in_sync ) { - fc_ilog(logger, "block ${bn} not accepted from ${p}",("bn",blk_num)("p",c->peer_name())); + fc_wlog( logger, "block ${bn} not accepted from ${p}, closing connection", ("bn",blk_num)("p",c->peer_name()) ); sync_last_requested_num = 0; source.reset(); my_impl->close(c); @@ -1759,11 +1793,11 @@ namespace eosio { } } void sync_manager::recv_block(const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num) { - fc_dlog(logger," got block ${bn} from ${p}",("bn",blk_num)("p",c->peer_name())); + fc_dlog(logger, "got block ${bn} from ${p}",("bn",blk_num)("p",c->peer_name())); if (state == lib_catchup) { if (blk_num != sync_next_expected_num) { - fc_ilog(logger, "expected block ${ne} but got ${bn}",("ne",sync_next_expected_num)("bn",blk_num)); - my_impl->close(c); + fc_wlog( logger, "expected block ${ne} but got ${bn}, from connection: ${p}", + ("ne",sync_next_expected_num)("bn",blk_num)("p",c->peer_name()) ); return; } sync_next_expected_num = blk_num + 1; @@ -1786,6 +1820,10 @@ namespace eosio { set_state(head_catchup); } } + + if (state == in_sync) { + send_handshakes(); + } } else if (state == lib_catchup) { if( blk_num == sync_known_lib_num ) { @@ -2142,50 +2180,43 @@ namespace eosio { connection_wptr weak_conn = c; // Note: need to add support for IPv6 too + auto resolver = std::make_shared( std::ref(app().get_io_service()) ); resolver->async_resolve( query, - [weak_conn, this]( const boost::system::error_code& err, - tcp::resolver::iterator endpoint_itr ){ - auto c = weak_conn.lock(); - if (!c) return; - if( !err ) { - connect( c, endpoint_itr ); - } else { - elog( "Unable to resolve ${peer_addr}: ${error}", - ( "peer_addr", c->peer_name() )("error", err.message() ) ); - } - }); - } - - void net_plugin_impl::connect(const connection_ptr& c, tcp::resolver::iterator endpoint_itr) { + [weak_conn, resolver, this]( const boost::system::error_code& err, tcp::resolver::results_type endpoints ) { + auto c = weak_conn.lock(); + if( !c ) return; + if( !err ) { + connect( c, resolver, endpoints ); + } else { + elog( "Unable to resolve ${peer_addr}: ${error}", + ("peer_addr", c->peer_name())( "error", err.message()) ); + } + } ); + } + + void net_plugin_impl::connect( const connection_ptr& c, const std::shared_ptr& resolver, tcp::resolver::results_type endpoints ) { if( c->no_retry != go_away_reason::no_reason) { string rsn = reason_str(c->no_retry); return; } - auto current_endpoint = *endpoint_itr; - ++endpoint_itr; c->connecting = true; c->pending_message_buffer.reset(); c->connecting_deadline = fc::time_point::now()+fc::seconds(c->connecting_timeout_in_seconds); + c->buffer_queue.clear_out_queue(); connection_wptr weak_conn = c; - c->socket->async_connect( current_endpoint, [weak_conn, endpoint_itr, this] ( const boost::system::error_code& err ) { + boost::asio::async_connect( *c->socket, endpoints, + [weak_conn, resolver, socket=c->socket, this]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) { auto c = weak_conn.lock(); - if (!c) return; - if( !err && c->socket->is_open() ) { - if (start_session( c )) { + if( !c ) return; + if( !err && c->socket->is_open()) { + if( start_session( c )) { c->send_handshake(); send_p2p_request(c); - } - } else { - if( endpoint_itr != tcp::resolver::iterator() ) { - close(c); - connect( c, endpoint_itr ); - } - else { - elog( "connection failed to ${peer}: ${error}", - ( "peer", c->peer_name())("error",err.message())); - c->connecting = false; - my_impl->close(c); } + } else { + elog( "connection failed to ${peer}: ${error}", ("peer", c->peer_name())( "error", err.message()) ); + c->connecting = false; + my_impl->close( c ); } } ); } @@ -2223,7 +2254,6 @@ namespace eosio { } bool net_plugin_impl::start_session(const connection_ptr& con) { - boost::asio::ip::tcp::no_delay nodelay( true ); boost::system::error_code ec; con->socket->set_option( nodelay, ec ); @@ -2251,9 +2281,10 @@ namespace eosio { if( !ec ) { uint32_t visitors = 0; uint32_t from_addr = 0; - auto paddr = socket->remote_endpoint(ec).address(); - if (ec) { - fc_elog(logger,"Error getting remote endpoint: ${m}",("m", ec.message())); + boost::system::error_code rec; + auto paddr = socket->remote_endpoint(rec).address(); + if (rec) { + fc_elog(logger,"Error getting remote endpoint: ${m}",("m", rec.message())); } else { for (auto &conn : connections) { @@ -2268,7 +2299,7 @@ namespace eosio { } } if (num_clients != visitors) { - ilog("checking max client, visitors = ${v} num clients ${n}",("v",visitors)("n",num_clients)); + fc_ilog( logger,"checking max client, visitors = ${v} num clients ${n}",("v",visitors)("n",num_clients) ); num_clients = visitors; } if( from_addr < max_nodes_per_host && (max_client_count == 0 || num_clients < max_client_count )) { @@ -2291,7 +2322,7 @@ namespace eosio { } } } else { - elog( "Error accepting connection: ${m}",( "m", ec.message() ) ); + fc_elog( logger, "Error accepting connection: ${m}",( "m", ec.message() ) ); // For the listed error codes below, recall start_listen_loop() switch (ec.value()) { case ECONNABORTED: @@ -2335,53 +2366,54 @@ namespace eosio { }; if( conn->buffer_queue.write_queue_size() > def_max_write_queue_size || - conn->reads_in_flight > def_max_reads_in_flight || conn->trx_in_progress_size > def_max_trx_in_progress_size ) { // too much queued up, reschedule if( conn->buffer_queue.write_queue_size() > def_max_write_queue_size ) { peer_wlog( conn, "write_queue full ${s} bytes", ("s", conn->buffer_queue.write_queue_size()) ); - } else if( conn->reads_in_flight > def_max_reads_in_flight ) { - peer_wlog( conn, "max reads in flight ${s}", ("s", conn->reads_in_flight) ); } else { peer_wlog( conn, "max trx in progress ${s} bytes", ("s", conn->trx_in_progress_size) ); } if( conn->buffer_queue.write_queue_size() > 2*def_max_write_queue_size || - conn->reads_in_flight > 2*def_max_reads_in_flight || conn->trx_in_progress_size > 2*def_max_trx_in_progress_size ) { - fc_wlog( logger, "queues over full, giving up on connection ${p}", ("p", conn->peer_name()) ); + fc_elog( logger, "queues over full, giving up on connection ${p}", ("p", conn->peer_name()) ); + fc_elog( logger, " write_queue ${s} bytes", ("s", conn->buffer_queue.write_queue_size()) ); + fc_elog( logger, " max trx in progress ${s} bytes", ("s", conn->trx_in_progress_size) ); my_impl->close( conn ); return; } if( !conn->read_delay_timer ) return; conn->read_delay_timer->expires_from_now( def_read_delay_for_full_write_queue ); - conn->read_delay_timer->async_wait([this, weak_conn]( boost::system::error_code ec ) { - if ( ec == boost::asio::error::operation_aborted ) return; - auto conn = weak_conn.lock(); - if( !conn ) return; - start_read_message( conn ); + conn->read_delay_timer->async_wait( [this, weak_conn]( boost::system::error_code ec ) { + auto conn = weak_conn.lock(); + if( !conn ) return; + if( !ec ) { + start_read_message( conn ); + } else { + fc_elog( logger, "Read delay timer error: ${e}, closing connection: ${p}", + ("e", ec.message())("p",conn->peer_name()) ); + close( conn ); + } } ); return; } - ++conn->reads_in_flight; boost::asio::async_read(*conn->socket, - conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler, - [this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) { + conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler, + [this,weak_conn,socket=conn->socket]( boost::system::error_code ec, std::size_t bytes_transferred ) { auto conn = weak_conn.lock(); - if (!conn || !conn->socket || !conn->socket->is_open()) { + if (!conn || !conn->socket || !conn->socket->is_open() || !socket->is_open()) { return; } - --conn->reads_in_flight; conn->outstanding_read_bytes.reset(); try { if( !ec ) { if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) { - elog("async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}", - ("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write())); + fc_elog( logger,"async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}", + ("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()) ); } EOS_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write(), plugin_exception, ""); conn->pending_message_buffer.advance_write_ptr(bytes_transferred); @@ -2407,7 +2439,7 @@ namespace eosio { if (bytes_in_buffer >= total_message_bytes) { conn->pending_message_buffer.advance_read_ptr(message_header_size); - if (!conn->process_next_message(*this, message_length)) { + if (!process_next_message(conn, message_length)) { return; } } else { @@ -2426,34 +2458,55 @@ namespace eosio { } else { auto pname = conn->peer_name(); if (ec.value() != boost::asio::error::eof) { - elog( "Error reading message from ${p}: ${m}",("p",pname)( "m", ec.message() ) ); + fc_elog( logger, "Error reading message from ${p}: ${m}",("p",pname)( "m", ec.message() ) ); } else { - ilog( "Peer ${p} closed connection",("p",pname) ); + fc_ilog( logger, "Peer ${p} closed connection",("p",pname) ); } close( conn ); } } catch(const std::exception &ex) { - string pname = conn ? conn->peer_name() : "no connection name"; - elog("Exception in handling read data from ${p} ${s}",("p",pname)("s",ex.what())); + fc_elog( logger, "Exception in handling read data from ${p}: ${s}", + ("p",conn->peer_name())("s",ex.what()) ); close( conn ); } catch(const fc::exception &ex) { - string pname = conn ? conn->peer_name() : "no connection name"; - elog("Exception in handling read data ${s}", ("p",pname)("s",ex.to_string())); + fc_elog( logger, "Exception in handling read data from ${p}: ${s}", + ("p",conn->peer_name())("s",ex.to_string()) ); close( conn ); } catch (...) { - string pname = conn ? conn->peer_name() : "no connection name"; - elog( "Undefined exception hanlding the read data from connection ${p}",( "p",pname)); + fc_elog( logger, "Undefined exception handling the read data from ${p}",( "p",conn->peer_name()) ); close( conn ); } - } ); + }); } catch (...) { string pname = conn ? conn->peer_name() : "no connection name"; - elog( "Undefined exception handling reading ${p}",("p",pname) ); + fc_elog( logger, "Undefined exception handling reading ${p}",("p",pname) ); + close( conn ); + } + } + + bool net_plugin_impl::process_next_message(const connection_ptr& conn, uint32_t message_length) { + try { + auto ds = conn->pending_message_buffer.create_datastream(); + net_message msg; + fc::raw::unpack( ds, msg ); + msg_handler m( *this, conn ); + if( msg.contains() ) { + m( std::move( msg.get() ) ); + } else if( msg.contains() ) { + m( std::move( msg.get() ) ); + } else { + msg.visit( m ); + } + } catch( const fc::exception& e ) { + fc_elog( logger, "Exception in handling message from ${p}: ${s}", + ("p", conn->peer_name())("s", e.to_detail_string()) ); close( conn ); + return false; } + return true; } size_t net_plugin_impl::count_open_sockets() const @@ -2482,20 +2535,20 @@ namespace eosio { // affecting state. bool valid = true; if (msg.last_irreversible_block_num > msg.head_num) { - wlog("Handshake message validation: last irreversible block (${i}) is greater than head block (${h})", - ("i", msg.last_irreversible_block_num)("h", msg.head_num)); + fc_wlog( logger, "Handshake message validation: last irreversible block (${i}) is greater than head block (${h})", + ("i", msg.last_irreversible_block_num)("h", msg.head_num) ); valid = false; } if (msg.p2p_address.empty()) { - wlog("Handshake message validation: p2p_address is null string"); + fc_wlog( logger, "Handshake message validation: p2p_address is null string" ); valid = false; } if (msg.os.empty()) { - wlog("Handshake message validation: os field is null string"); + fc_wlog( logger, "Handshake message validation: os field is null string" ); valid = false; } if ((msg.sig != chain::signature_type() || msg.token != sha256()) && (msg.token != fc::sha256::hash(msg.time))) { - wlog("Handshake message validation: token field invalid"); + fc_wlog( logger, "Handshake message validation: token field invalid" ); valid = false; } return valid; @@ -2558,7 +2611,6 @@ namespace eosio { void net_plugin_impl::handle_message(const connection_ptr& c, const handshake_message& msg) { - peer_ilog(c, "received handshake_message"); if (!is_valid(msg)) { peer_elog( c, "bad handshake message"); @@ -2573,7 +2625,7 @@ namespace eosio { } if (msg.generation == 1) { if( msg.node_id == node_id) { - elog( "Self connection detected. Closing connection"); + fc_elog( logger, "Self connection detected. Closing connection" ); c->enqueue( go_away_message( self ) ); return; } @@ -2666,10 +2718,7 @@ namespace eosio { } void net_plugin_impl::handle_message(const connection_ptr& c, const go_away_message& msg) { - string rsn = reason_str( msg.reason ); - peer_ilog(c, "received go_away_message"); - ilog( "received a go away message from ${p}, reason = ${r}", - ("p", c->peer_name())("r",rsn)); + peer_wlog(c, "received go_away_message, reason = ${r}", ("r",reason_str( msg.reason )) ); c->no_retry = msg.reason; if(msg.reason == duplicate ) { c->node_id = msg.node_id; @@ -2731,6 +2780,17 @@ namespace eosio { break; } case catch_up : { + if( msg.known_trx.pending > 0) { + // plan to get all except what we already know about. + req.req_trx.mode = catch_up; + send_req = true; + size_t known_sum = local_txns.size(); + if( known_sum ) { + for( const auto& t : local_txns.get() ) { + req.req_trx.ids.push_back( t.id ); + } + } + } break; } case normal: { @@ -2791,17 +2851,14 @@ namespace eosio { switch (msg.req_trx.mode) { case catch_up : + c->txn_send_pending(msg.req_trx.ids); + break; + case normal : + c->txn_send(msg.req_trx.ids); break; case none : if(msg.req_blocks.mode == none) c->stop_send(); - // no break - case normal : - if( !msg.req_trx.ids.empty() ) { - elog( "Invalid request_message, req_trx.ids.size ${s}", ("s", msg.req_trx.ids.size()) ); - close(c); - return; - } break; default:; } @@ -2878,7 +2935,6 @@ namespace eosio { auto ptrx = std::make_shared( trx ); const auto& tid = ptrx->id; - c->cancel_wait(); if(local_txns.get().find(tid) != local_txns.end()) { fc_dlog(logger, "got a duplicate transaction - dropping"); return; @@ -2932,7 +2988,7 @@ namespace eosio { } } catch( ...) { // should this even be caught? - elog("Caught an unknown exception trying to recall blockID"); + fc_elog( logger,"Caught an unknown exception trying to recall blockID" ); } dispatcher->recv_block(c, blk_id, blk_num); @@ -3439,11 +3495,12 @@ namespace eosio { void handshake_initializer::populate( handshake_message &hello) { + namespace sc = std::chrono; hello.network_version = net_version_base + net_version; hello.chain_id = my_impl->chain_id; hello.node_id = my_impl->node_id; hello.key = my_impl->get_authentication_key(); - hello.time = std::chrono::system_clock::now().time_since_epoch().count(); + hello.time = sc::duration_cast(sc::system_clock::now().time_since_epoch()).count(); hello.token = fc::sha256::hash(hello.time); hello.sig = my_impl->sign_compact(hello.key, hello.token); // If we couldn't sign, don't send a token. @@ -3556,37 +3613,14 @@ namespace eosio { my->p2p_discoverable=options.at( "p2p-discoverable" ).as(); - my->resolver = std::make_shared( std::ref( app().get_io_service())); if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as().length()) { my->p2p_address = options.at( "p2p-listen-endpoint" ).as(); - auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' )); - auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size()); - idump((host)( port )); - tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str()); - // Note: need to add support for IPv6 too? - - my->listen_endpoint = *my->resolver->resolve( query ); - - my->acceptor.reset( new tcp::acceptor( app().get_io_service())); - - if( options.count( "p2p-server-address" )) { - my->p2p_address = options.at( "p2p-server-address" ).as(); - } else { - if( my->listen_endpoint.address().to_v4() == address_v4::any()) { - boost::system::error_code ec; - auto host = host_name( ec ); - if( ec.value() != boost::system::errc::success ) { - - FC_THROW_EXCEPTION( fc::invalid_arg_exception, - "Unable to retrieve host_name. ${msg}", ("msg", ec.message())); - - } - auto port = my->p2p_address.substr( my->p2p_address.find( ':' ), my->p2p_address.size()); - my->p2p_address = host + port; - } - } + } + if( options.count( "p2p-server-address" ) ) { + my->p2p_server_address = options.at( "p2p-server-address" ).as(); } + if( options.count( "p2p-peer-address" )) { my->supplied_peers = options.at( "p2p-peer-address" ).as >(); } @@ -3635,84 +3669,130 @@ namespace eosio { fc::rand_pseudo_bytes( my->node_id.data(), my->node_id.data_size()); ilog( "my node_id is ${id}", ("id", my->node_id)); - my->keepalive_timer.reset( new boost::asio::steady_timer( app().get_io_service())); - my->ticker(); - my->pbft_message_cache_timer.reset( new boost::asio::steady_timer( app().get_io_service())); - my->pbft_message_cache_ticker(); } FC_LOG_AND_RETHROW() } void net_plugin::plugin_startup() { - my->producer_plug = app().find_plugin(); - if( my->acceptor ) { - my->acceptor->open(my->listen_endpoint.protocol()); - my->acceptor->set_option(tcp::acceptor::reuse_address(true)); - try { - my->acceptor->bind(my->listen_endpoint); - } catch (const std::exception& e) { - ilog("net_plugin::plugin_startup failed to bind to port ${port}", - ("port", my->listen_endpoint.port())); - throw e; - } - my->acceptor->listen(); - ilog("starting listener, max clients is ${mc}",("mc",my->max_client_count)); - my->start_listen_loop(); - } - chain::controller&cc = my->chain_plug->chain(); - { - cc.accepted_block.connect( boost::bind(&net_plugin_impl::accepted_block, my.get(), _1)); - } + try { + my->producer_plug = app().find_plugin(); - my->incoming_transaction_ack_subscription = app().get_channel().subscribe(boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1)); - my->pbft_outgoing_prepare_subscription = app().get_channel().subscribe( - boost::bind(&net_plugin_impl::pbft_outgoing_prepare, my.get(), _1)); - my->pbft_outgoing_commit_subscription = app().get_channel().subscribe( - boost::bind(&net_plugin_impl::pbft_outgoing_commit, my.get(), _1)); - my->pbft_outgoing_view_change_subscription = app().get_channel().subscribe( - boost::bind(&net_plugin_impl::pbft_outgoing_view_change, my.get(), _1)); - my->pbft_outgoing_new_view_subscription = app().get_channel().subscribe( - boost::bind(&net_plugin_impl::pbft_outgoing_new_view, my.get(), _1)); - my->pbft_outgoing_checkpoint_subscription = app().get_channel().subscribe( - boost::bind(&net_plugin_impl::pbft_outgoing_checkpoint, my.get(), _1)); + auto resolver = std::make_shared( std::ref(app().get_io_service()) ); + if( my->p2p_address.size() > 0 ) { + auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' )); + auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size()); + tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str()); + // Note: need to add support for IPv6 too? - if( cc.get_read_mode() == chain::db_read_mode::READ_ONLY ) { - my->max_nodes_per_host = 0; - ilog( "node in read-only mode setting max_nodes_per_host to 0 to prevent connections" ); - } + my->listen_endpoint = *resolver->resolve( query ); - my->start_monitors(); + my->acceptor.reset( new tcp::acceptor( std::ref(app().get_io_service())) ); - for( auto seed_node : my->supplied_peers ) { - p2p_peer_record p2prcd; - p2prcd.peer_address=seed_node; - p2prcd.discoverable=false; - p2prcd.is_config=true; - p2prcd.connected=false; - p2prcd.expiry=time_point_sec((time_point::now()).sec_since_epoch()+10); - my->p2p_peer_records.insert(pair(seed_node,p2prcd)); + if( !my->p2p_server_address.empty() ) { + my->p2p_address = my->p2p_server_address; + } else { + if( my->listen_endpoint.address().to_v4() == address_v4::any()) { + boost::system::error_code ec; + auto host = host_name( ec ); + if( ec.value() != boost::system::errc::success ) { - connect( seed_node ); - } + FC_THROW_EXCEPTION( fc::invalid_arg_exception, + "Unable to retrieve host_name. ${msg}", ("msg", ec.message())); - if(fc::get_logger_map().find(logger_name) != fc::get_logger_map().end()) - logger = fc::get_logger_map()[logger_name]; + } + auto port = my->p2p_address.substr( my->p2p_address.find( ':' ), my->p2p_address.size()); + my->p2p_address = host + port; + } + } + } + + if (my->acceptor) { + my->acceptor->open(my->listen_endpoint.protocol()); + my->acceptor->set_option(tcp::acceptor::reuse_address(true)); + try { + my->acceptor->bind(my->listen_endpoint); + } catch (const std::exception &e) { + elog("net_plugin::plugin_startup failed to bind to port ${port}", + ("port", my->listen_endpoint.port())); + throw e; + } + my->acceptor->listen(); + ilog("starting listener, max clients is ${mc}", ("mc", my->max_client_count)); + my->start_listen_loop(); + } + + chain::controller &cc = my->chain_plug->chain(); + { + cc.accepted_block.connect(boost::bind(&net_plugin_impl::accepted_block, my.get(), _1)); + } + + my->keepalive_timer.reset(new boost::asio::steady_timer(app().get_io_service())); + my->ticker(); + my->pbft_message_cache_timer.reset(new boost::asio::steady_timer(app().get_io_service())); + my->pbft_message_cache_ticker(); + + my->incoming_transaction_ack_subscription = app().get_channel().subscribe( + boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1)); + my->pbft_outgoing_prepare_subscription = app().get_channel().subscribe( + boost::bind(&net_plugin_impl::pbft_outgoing_prepare, my.get(), _1)); + my->pbft_outgoing_commit_subscription = app().get_channel().subscribe( + boost::bind(&net_plugin_impl::pbft_outgoing_commit, my.get(), _1)); + my->pbft_outgoing_view_change_subscription = app().get_channel().subscribe( + boost::bind(&net_plugin_impl::pbft_outgoing_view_change, my.get(), _1)); + my->pbft_outgoing_new_view_subscription = app().get_channel().subscribe( + boost::bind(&net_plugin_impl::pbft_outgoing_new_view, my.get(), _1)); + my->pbft_outgoing_checkpoint_subscription = app().get_channel().subscribe( + boost::bind(&net_plugin_impl::pbft_outgoing_checkpoint, my.get(), _1)); + + if (cc.get_read_mode() == chain::db_read_mode::READ_ONLY) { + my->max_nodes_per_host = 0; + ilog("node in read-only mode setting max_nodes_per_host to 0 to prevent connections"); + } + + my->start_monitors(); + + for (auto seed_node : my->supplied_peers) { + p2p_peer_record p2prcd; + p2prcd.peer_address = seed_node; + p2prcd.discoverable = false; + p2prcd.is_config = true; + p2prcd.connected = false; + p2prcd.expiry = time_point_sec((time_point::now()).sec_since_epoch() + 10); + my->p2p_peer_records.insert(pair(seed_node, p2prcd)); + + connect(seed_node); + } + + if (fc::get_logger_map().find(logger_name) != fc::get_logger_map().end()) + logger = fc::get_logger_map()[logger_name]; + } catch (...) { + // always want plugin_shutdown even on exception + plugin_shutdown(); + throw; + } } void net_plugin::plugin_shutdown() { try { - ilog( "shutdown.." ); + fc_ilog( logger, "shutdown.." ); + if( my->connector_check ) + my->connector_check->cancel(); + if( my->transaction_check ) + my->transaction_check->cancel(); + if( my->keepalive_timer ) + my->keepalive_timer->cancel(); + my->done = true; if( my->acceptor ) { ilog( "close acceptor" ); + my->acceptor->cancel(); my->acceptor->close(); ilog( "close ${s} connections",( "s",my->connections.size()) ); - auto cons = my->connections; - for( auto con : cons ) { - my->close( con); + for( auto& con : my->connections ) { + fc_dlog( logger, "close: ${p}", ("p",con->peer_name()) ); + my->close( con ); } - - my->acceptor.reset(nullptr); + my->connections.clear(); } ilog( "exit shutdown" ); } @@ -3742,6 +3822,7 @@ namespace eosio { for( auto itr = my->connections.begin(); itr != my->connections.end(); ++itr ) { if( (*itr)->peer_addr == host ) { (*itr)->reset(); + fc_ilog( logger, "disconnecting: ${p}", ("p", (*itr)->peer_name()) ); my->close(*itr); my->connections.erase(itr); return "connection removed"; @@ -3778,7 +3859,8 @@ namespace eosio { for (auto const &c: my->connections) { if (c->current()) { - my->sync_master->sync_stable_checkpoints(c, head); + auto requested = my->sync_master->sync_stable_checkpoints(c, head); + if (!requested) break; } } diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index f9841368840..6232a711221 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -762,6 +762,7 @@ void producer_plugin::plugin_startup() _trx_trace_log = logger_map[trx_trace_logger_name]; } + try { ilog("producer plugin: plugin_startup() begin"); chain::controller& chain = my->chain_plug->chain(); @@ -797,6 +798,11 @@ void producer_plugin::plugin_startup() my->schedule_production_loop(); ilog("producer plugin: plugin_startup() end"); + } catch( ... ) { + // always call plugin_shutdown, even on exception + plugin_shutdown(); + throw; + } } FC_CAPTURE_AND_RETHROW() } void producer_plugin::plugin_shutdown() { diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 67b61587440..946f6e6e91f 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -345,7 +345,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thisreceipt) { + if (p->receipt && trace_log) { if (is_onblock(p)) onblock_trace = p; else if (p->failed_dtrx_trace) diff --git a/programs/cleos/main.cpp b/programs/cleos/main.cpp index 9fe50efa771..8f9f3a97590 100644 --- a/programs/cleos/main.cpp +++ b/programs/cleos/main.cpp @@ -1320,9 +1320,50 @@ struct get_transaction_id_subcommand { get_transaction_id->set_callback([&] { try { - auto trx_var = json_from_file_or_string(trx_to_check); - auto trx = trx_var.as(); - std::cout << string(trx.id()) << std::endl; + fc::variant trx_var = json_from_file_or_string(trx_to_check); + if( trx_var.is_object() ) { + fc::variant_object& vo = trx_var.get_object(); + // if actions.data & actions.hex_data provided, use the hex_data since only currently support unexploded data + if( vo.contains("actions") ) { + if( vo["actions"].is_array() ) { + fc::mutable_variant_object mvo = vo; + fc::variants& action_variants = mvo["actions"].get_array(); + for( auto& action_v : action_variants ) { + if( !action_v.is_object() ) { + std::cerr << "Empty 'action' in transaction" << endl; + return; + } + fc::variant_object& action_vo = action_v.get_object(); + if( action_vo.contains( "data" ) && action_vo.contains( "hex_data" ) ) { + fc::mutable_variant_object maction_vo = action_vo; + maction_vo["data"] = maction_vo["hex_data"]; + action_vo = maction_vo; + vo = mvo; + } else if( action_vo.contains( "data" ) ) { + if( !action_vo["data"].is_string() ) { + std::cerr << "get transaction_id only supports un-exploded 'data' (hex form)" << std::endl; + return; + } + } + } + } else { + std::cerr << "transaction json 'actions' is not an array" << std::endl; + return; + } + } else { + std::cerr << "transaction json does not include 'actions'" << std::endl; + return; + } + auto trx = trx_var.as(); + transaction_id_type id = trx.id(); + if( id == transaction().id() ) { + std::cerr << "file/string does not represent a transaction" << std::endl; + } else { + std::cout << string( id ) << std::endl; + } + } else { + std::cerr << "file/string does not represent a transaction" << std::endl; + } } EOS_RETHROW_EXCEPTIONS(transaction_type_exception, "Fail to parse transaction JSON '${data}'", ("data",trx_to_check)) }); } diff --git a/programs/eosio-launcher/main.cpp b/programs/eosio-launcher/main.cpp index 297adde7296..677d0a56235 100644 --- a/programs/eosio-launcher/main.cpp +++ b/programs/eosio-launcher/main.cpp @@ -1615,20 +1615,35 @@ launcher_def::kill (launch_modes mode, string sig_opt) { case LM_LOCAL: case LM_REMOTE : { bfs::path source = "last_run.json"; - fc::json::from_file(source).as(last_run); - for (auto &info : last_run.running_nodes) { - if (mode == LM_ALL || (info.remote && mode == LM_REMOTE) || - (!info.remote && mode == LM_LOCAL)) { - if (info.pid_file.length()) { - string pid; - fc::json::from_file(info.pid_file).as(pid); - string kill_cmd = "kill " + sig_opt + " " + pid; - boost::process::system (kill_cmd); - } - else { - boost::process::system (info.kill_cmd); - } - } + try { + fc::json::from_file( source ).as( last_run ); + for( auto& info : last_run.running_nodes ) { + if( mode == LM_ALL || (info.remote && mode == LM_REMOTE) || + (!info.remote && mode == LM_LOCAL) ) { + try { + if( info.pid_file.length() ) { + string pid; + fc::json::from_file( info.pid_file ).as( pid ); + string kill_cmd = "kill " + sig_opt + " " + pid; + boost::process::system( kill_cmd ); + } else { + boost::process::system( info.kill_cmd ); + } + } catch( fc::exception& fce ) { + cerr << "unable to kill fc::exception=" << fce.to_detail_string() << endl; + } catch( std::exception& stde ) { + cerr << "unable to kill std::exception=" << stde.what() << endl; + } catch( ... ) { + cerr << "Unable to kill" << endl; + } + } + } + } catch( fc::exception& fce ) { + cerr << "unable to open " << source << " fc::exception=" << fce.to_detail_string() << endl; + } catch( std::exception& stde ) { + cerr << "unable to open " << source << " std::exception=" << stde.what() << endl; + } catch( ... ) { + cerr << "Unable to open " << source << endl; } } }