net_plugin multithreading rework (net_plugin part) #17

Open
45 of 49 tasks
crazy-dog-zheng opened this issue Aug 29, 2019 · 4 comments
crazy-dog-zheng commented Aug 29, 2019

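Currently net_plugin and the other plugins all run in the same thread. The goal is to pull net_plugin out into its own dedicated net_plugin thread.


After analysis, the main difficulty lies in changing how net_plugin communicates with the other plugins. The main communication mechanisms are:

  1. appbase::channel (implemented on top of boost::signals2::signal)
  2. appbase::method (also implemented on top of boost::signals2::signal)
  3. Raw boost::signals2::signal
  4. Direct calls made by pulling in another plugin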

Cases where net_plugin calls (notifies) other plugins

  1. Channels on which net_plugin publishes messages (the child thread newly started by net_plugin uses post to send the data to function A on net_plugin's parent thread, and function A on the parent thread then publishes it to the other plugins):
    • pbft_incoming_prepare_channel
      Publish site: net_plugin
      pbft_incoming_prepare_channel.publish(std::make_shared<pbft_message_metadata<pbft_prepare>>(std::move(pmm)));
      Subscribe site: chain_plugin
      my->pbft_incoming_prepare_channel.subscribe
    • pbft_incoming_commit_channel
      Publish site: net_plugin
      pbft_incoming_commit_channel.publish(std::make_shared<pbft_message_metadata<pbft_commit>>(std::move(pmm)));
      Subscribe site: chain_plugin
      my->pbft_incoming_commit_channel.subscribe
    • pbft_incoming_view_change_channel
      Publish site: net_plugin
      pbft_incoming_view_change_channel.publish(std::make_shared<pbft_message_metadata<pbft_view_change>>(std::move(pmm)));
      Subscribe site: chain_plugin
      my->pbft_incoming_view_change_channel.subscribe
    • pbft_incoming_new_view_channel
      Publish site: net_plugin
      pbft_incoming_new_view_channel.publish(std::make_shared<pbft_message_metadata<pbft_new_view>>(std::move(pmm)));
      Subscribe site: chain_plugin
      my->pbft_incoming_new_view_channel.subscribe
    • pbft_incoming_checkpoint_channel
      Publish site: net_plugin
      pbft_incoming_checkpoint_channel.publish(std::make_shared<pbft_message_metadata<pbft_checkpoint>>(std::move(pmm)));
      Subscribe site: chain_plugin
      my->pbft_incoming_checkpoint_channel.subscribe
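  2. method-related items in net_plugin:
  3. boost::signals2::signal-related items in net_plugin:
  4. Functions in net_plugin that indirectly call other plugins' channels/methods, etc.
    • fc::logger: is extern std::unordered_map<std::string,logger>& get_logger_map(); thread-safe?
      • Two changes are needed: 1. log output must include the thread name; 2. log output from different threads must not interleave (locking is required).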
    • sync_manager
    • dispatch_manager
    • struct by_expiry;
    • chain_plug
      • uint32_t head_num = cc.fork_db_head_block_num();
      • lib_id = cc.last_irreversible_block_id();
      • sync_known_lib_num = chain_plug->chain().last_irreversible_block_num();
      • head_id = cc.fork_db_head_block_id();
      • signed_block_ptr sb = cc.fetch_block_by_number(num)
      • signed_block_ptr b = cc.fetch_block_by_id(blkid);
      • uint32_t lscb_num = cc.last_stable_checkpoint_block_num();
      • auto head_num = cc.head_block_num();
      • chain_plug->pbft_ctrl().pbft_db.get_checkpoint_interval();
      • block_id_type peer_lib_id = cc.get_block_id_for_num( peer_lib);
      • auto scp = pcc.pbft_db.get_stable_checkpoint_by_id(bid);
      • cc.get_read_mode()
      • chain_plug->accept_transaction(ptrx,
      • auto scp = pcc.pbft_db.fetch_stable_checkpoint_from_blk_extn(msg);
      • pcc.pbft_db.checkpoint_local();
      • chain_plug->accept_block(msg);
      • pcc.pbft_db.is_valid_prepare(pmm.msg, pmm.sender_key)
      • pcc.pbft_db.is_valid_commit(pmm.msg, pmm.sender_key)
      • pcc.pbft_db.is_valid_view_change(pmm.msg, pmm.sender_key)
      • pcc.state_machine.get_current_view()
      • pcc.pbft_db.get_new_view_primary_key(pmm.msg.new_view)
      • pcc.pbft_db.is_valid_checkpoint(pmm.msg, pmm.sender_key)
      • pcc.pbft_db.is_valid_stable_checkpoint(msg, true)
      • my->chain_id = my->chain_plug->get_chain_id();
      • cc.accepted_block.connect( boost::bind(&net_plugin_impl::accepted_block, my.get(), _1));
      • cc.is_pbft_enabled()
    • producer_plug
      • producer_plug->is_producer_key(msg.key)
      • producer_plug->get_state()
      • producer_plug->sign_compact(signer, digest)
    • local_txns
    • net_plugin depends on chain_plugin, so net_plugin may call chain_plugin directly
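
The forwarding scheme described in item 1 (the net thread posts to function A on the parent thread, and function A does the publish) can be sketched roughly as follows. This is only an illustration under stated assumptions: `task_queue` is a hypothetical standard-library stand-in for the parent thread's event loop (the issue itself relies on post), `prepare_msg` is a simplified placeholder for `pbft_message_metadata<pbft_prepare>`, and `publish_on_main` plays the role of function A.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for the parent (main) thread's event loop.
// Not the real appbase or boost::asio API.
class task_queue {
public:
    // May be called from any thread.
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }
    // Blocks until a task is available, then runs it on the calling thread.
    void run_one() {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mtx_);
            cv_.wait(lock, [this] { return !tasks_.empty(); });
            task = std::move(tasks_.front());
            tasks_.pop_front();
        }
        task();
    }
private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
};

// Simplified placeholder for pbft_message_metadata<pbft_prepare>.
struct prepare_msg { std::string payload; };

// Returns true when the "publish" ran on the main thread with the right data.
bool publish_runs_on_main_thread() {
    task_queue main_queue;
    std::thread::id publish_thread;
    std::string received;

    // "Function A": runs on the main thread and does the actual publish.
    auto publish_on_main = [&](std::shared_ptr<prepare_msg> msg) {
        publish_thread = std::this_thread::get_id();
        received = msg->payload;
    };

    // Net thread: hands the message over instead of publishing directly.
    std::thread net_thread([&] {
        auto msg = std::make_shared<prepare_msg>(prepare_msg{"prepare#1"});
        main_queue.post([&publish_on_main, msg] { publish_on_main(msg); });
    });

    main_queue.run_one();  // main thread picks up the task and publishes
    net_thread.join();
    return publish_thread == std::this_thread::get_id() && received == "prepare#1";
}
```

With this shape, the subscribers in chain_plugin keep running on the thread they run on today, and only the hand-off point in net_plugin changes.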

Cases where other plugins call net_plugin

  1. Channels that net_plugin subscribes to:
    • app().get_channel<channels::transaction_ack>().subscribe
      • Published in two places, both in producer_plugin
        1. _transaction_ack_channel.publish(std::pair<fc::exception_ptr, transaction_metadata_ptr>(response.get<fc::exception_ptr>(), trx));
        2. _transaction_ack_channel.publish(std::pair<fc::exception_ptr, transaction_metadata_ptr>(nullptr, trx));
    • app().get_channel<eosio::chain::plugin_interface::pbft::outgoing::prepare_channel>().subscribe
      • chain_plugin
        • my->pbft_outgoing_prepare_channel.publish( prepare );
    • app().get_channel<eosio::chain::plugin_interface::pbft::outgoing::commit_channel>().subscribe
      • chain_plugin
        • my->pbft_outgoing_commit_channel.publish( commit );
    • app().get_channel<eosio::chain::plugin_interface::pbft::outgoing::view_change_channel>().subscribe
      • chain_plugin
        • my->pbft_outgoing_view_change_channel.publish( view_change );
    • app().get_channel<eosio::chain::plugin_interface::pbft::outgoing::new_view_channel>().subscribe
      • chain_plugin
        • my->pbft_outgoing_new_view_channel.publish( new_view );
    • app().get_channel<eosio::chain::plugin_interface::pbft::outgoing::checkpoint_channel>().subscribe
      • chain_plugin
        • my->pbft_outgoing_checkpoint_channel.publish( checkpoint );
  2. Methods that net_plugin subscribes to:
  3. Direct use of boost::signals2::signal
  4. Direct or indirect calls from other plugins
    1. txn_test_gen_plugin.cpp
      • auto peers_conn = app().get_plugin<net_plugin>().connections();
      • app().get_plugin<net_plugin>().disconnect(c.peer);
      • app().get_plugin<net_plugin>().connect(c.peer);
    2. net_api_plugin.cpp
      • CALL(net, net_mgr, connect,
        INVOKE_R_R(net_mgr, connect, std::string), 201),
      • CALL(net, net_mgr, disconnect,
        INVOKE_R_R(net_mgr, disconnect, std::string), 201),
      • CALL(net, net_mgr, status,
        INVOKE_R_R(net_mgr, status, std::string), 201),
      • CALL(net, net_mgr, connections,
        INVOKE_R_V(net_mgr, connections), 201),
    3. pbft_plugin.cpp
      • app().get_plugin<net_plugin>().maybe_sync_stable_checkpoints();
      • app().get_plugin<net_plugin>().is_syncing();
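
For the direct calls listed in item 4 (for example `connections()` or `connect()` invoked from another plugin's thread), one possible approach is to run the call on the net thread and wait for the result through a future. The issue does not say this is the approach that was chosen, so treat the following purely as a sketch under assumptions: `net_worker` is a hypothetical standard-library stand-in for the net_plugin thread, and its `connect`/`connections` members only mimic the names above.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical worker standing in for the net_plugin thread.
class net_worker {
public:
    net_worker() : thread_([this] { run(); }) {}
    ~net_worker() {
        post([this] { stop_ = true; });  // stop flag is set on the worker itself
        thread_.join();
    }
    // Fire-and-forget call: the state change happens on the net thread.
    void connect(const std::string& peer) {
        post([this, peer] { peers_.push_back(peer); });
    }
    // Query from another thread: run on the net thread, block for the result.
    std::vector<std::string> connections() {
        auto task = std::make_shared<std::packaged_task<std::vector<std::string>()>>(
            [this] { return peers_; });
        std::future<std::vector<std::string>> result = task->get_future();
        post([task] { (*task)(); });
        return result.get();
    }
private:
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }
    void run() {
        while (!stop_) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mtx_);
                cv_.wait(lock, [this] { return !tasks_.empty(); });
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task();
        }
    }
    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool stop_ = false;               // only touched on the worker thread
    std::vector<std::string> peers_;  // only touched on the worker thread
    std::thread thread_;              // declared last: starts after the rest exists
};

// Two connects followed by a query, all issued from the calling thread.
std::size_t demo_connection_count() {
    net_worker net;
    net.connect("peer-a:9876");
    net.connect("peer-b:9876");
    return net.connections().size();
}
```

A blocking query like `connections()` must never be issued from the net thread itself, since it would wait on a task that the same thread is supposed to run.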
crazy-dog-zheng self-assigned this Aug 29, 2019
@crazy-dog-zheng

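  1. Solution for channel/method/boost::signals2::signal after the move to multiple threads
    (image: image_from_ios)
  2. Solution for direct function calls, both outgoing and incoming
    • Still being worked through; different call sites need somewhat different handling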

@crazy-dog-zheng

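channel (boost::signals2::signal)
How to make channel communication work across threads:

Taking communication between plugin A and plugin B as an example:

  1. Before the change
    1. Create a channel x (either A or B can create it)
    2. B subscribes to x; subscribing registers a callback function func
    3. Once A calls publish on x, func is invoked (that is, the logic B placed in the callback runs at that point)
  2. After the change (A and B denote two different plugins, 1 and 2 denote two threads; for example, A1 is the part of plugin A's code that runs in thread 1)
    1. The following shows A1 publishing a task and B2 handling it
    2. Create a channel x (created by either A1 or B1)
    3. Subscribe on x,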
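
The "before the change" behaviour in steps 1.1 to 1.3 can be illustrated with a minimal synchronous channel. `simple_channel` below is a hypothetical simplification, not the appbase::channel or boost::signals2 API; it only shows that a subscriber's callback runs on whichever thread calls publish, which is why moving the publisher to another thread also moves the subscriber's logic there.

```cpp
#include <functional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal channel: publish invokes every callback synchronously.
template <typename Data>
class simple_channel {
public:
    // Step 2: the subscriber registers its callback func.
    void subscribe(std::function<void(const Data&)> func) {
        subscribers_.push_back(std::move(func));
    }
    // Step 3: publish calls each registered func on the caller's thread.
    void publish(const Data& data) {
        for (auto& func : subscribers_) func(data);
    }
private:
    std::vector<std::function<void(const Data&)>> subscribers_;
};

// Returns true when B's callback ran on A's (the publisher's) thread
// rather than on the thread where B subscribed.
bool callback_runs_on_publisher_thread() {
    simple_channel<std::string> x;  // step 1: create channel x
    std::thread::id callback_thread;
    std::thread::id publisher_thread;

    // Plugin B subscribes from the current thread.
    x.subscribe([&](const std::string&) {
        callback_thread = std::this_thread::get_id();
    });

    // Plugin A publishes from a different thread.
    std::thread a([&] {
        publisher_thread = std::this_thread::get_id();
        x.publish("block");
    });
    a.join();

    return callback_thread == publisher_thread &&
           callback_thread != std::this_thread::get_id();
}
```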

@crazy-dog-zheng

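Solution for pointers, or references to pointers, passed through a channel (signals2):

In the callback that net_plugin registers when subscribing, the data the pointer refers to must be deep-copied, and the copy is then handed to the child thread with boost::asio::post, which avoids race conditions.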
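
A minimal sketch of the deep-copy step, under stated assumptions: `outgoing_prepare` is a hypothetical simplified message type, and a plain vector of tasks stands in for the work that would be handed to the child thread through boost::asio::post. The copy is made inside the subscription callback, so later changes by the publisher do not reach the net thread.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Hypothetical simplified message; it holds no nested pointers, so the
// copy constructor already produces a deep copy.
struct outgoing_prepare {
    int view;
    std::string digest;
};

// Returns the digest that the net thread ends up seeing.
std::string demo_deep_copy() {
    auto shared = std::make_shared<outgoing_prepare>(outgoing_prepare{1, "abc"});
    std::string sent;

    // Stand-in for the tasks handed to the child thread via post.
    std::vector<std::function<void()>> net_tasks;

    // Subscription callback: runs on the publisher's thread.
    auto on_prepare = [&](const std::shared_ptr<outgoing_prepare>& msg) {
        auto copy = std::make_shared<outgoing_prepare>(*msg);  // deep copy
        net_tasks.push_back([copy, &sent] { sent = copy->digest; });
    };

    on_prepare(shared);
    shared->digest = "mutated";  // the publisher keeps changing its own object

    // Child (net) thread processes only its private copy.
    std::thread net_thread([&] {
        for (auto& task : net_tasks) task();
    });
    net_thread.join();
    return sent;
}
```

Types that hold raw or shared pointers internally would need a custom copy, since copying the outer object alone would still share the inner data across threads.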

crazy-dog-zheng changed the title from "net_plugin multithreading rework" to "net_plugin multithreading rework (net_plugin networking part)" Sep 9, 2019
crazy-dog-zheng changed the title from "net_plugin multithreading rework (net_plugin networking part)" to "net_plugin multithreading rework (net_plugin part)" Sep 9, 2019

crazy-dog-zheng commented Sep 16, 2019


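Problems encountered during testing

  • last_irreversible_block_num stays stuck at 1202 and never advances;
  • Sending signed blocks is very slow, one every 5-10 seconds;
  • In the normal block-production scenario, even light load (10~20 TPS) causes a core dump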
info  2019-09-17T03:56:55.876 thread-1  net_plugin.cpp:1760           verify_catchup       ] got a catch_up notice while in in sync, fork head num = 1974 target LIB = 1973 next_expected = 1974
terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >'
  what():  close: Bad file descriptor
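  • During testing, some handle_message calls were found to run in the main thread, which makes async_write fail (the socket is created in the child thread)
  • CPU usage is at 100%
  • After the run finishes, Ctrl + C causes a core dump;
  • view-change and new-view do not take effect after all nodes have been hard-replayed.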
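
One way to guard against the handle_message problem above is to check which thread a handler is running on and re-queue it when it is not the net thread. The sketch below is an assumption-laden illustration, not the fix applied in this issue: `net_context`, `dispatch` and `drain` are hypothetical standard-library stand-ins for the child thread's event loop.

```cpp
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the net thread's event loop.
class net_context {
public:
    // Called once by the net thread to claim ownership.
    void bind_to_current_thread() {
        std::lock_guard<std::mutex> lock(mtx_);
        owner_ = std::this_thread::get_id();
    }
    bool running_in_net_thread() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return std::this_thread::get_id() == owner_;
    }
    // Runs fn inline on the net thread; queues it when called from elsewhere.
    void dispatch(std::function<void()> fn) {
        if (running_in_net_thread()) { fn(); return; }
        std::lock_guard<std::mutex> lock(mtx_);
        pending_.push_back(std::move(fn));
    }
    // Called on the net thread to run everything queued by other threads.
    void drain() {
        std::vector<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> lock(mtx_);
            batch.swap(pending_);
        }
        for (auto& fn : batch) fn();
    }
private:
    mutable std::mutex mtx_;
    std::thread::id owner_;
    std::vector<std::function<void()>> pending_;
};

// Returns true when every handler invocation happened on the net thread.
bool handlers_stay_on_net_thread() {
    net_context ctx;
    int on_net = 0, elsewhere = 0;
    auto handle_message = [&] {
        if (ctx.running_in_net_thread()) ++on_net; else ++elsewhere;
    };

    std::promise<void> bound, queued;
    auto bound_f = bound.get_future();
    auto queued_f = queued.get_future();

    std::thread net([&] {
        ctx.bind_to_current_thread();
        ctx.dispatch(handle_message);  // already on the net thread: runs inline
        bound.set_value();
        queued_f.wait();               // wait for the main thread's request
        ctx.drain();                   // run it here, next to the socket
    });

    bound_f.wait();
    ctx.dispatch(handle_message);      // main thread: queued, not run inline
    queued.set_value();
    net.join();
    return on_net == 2 && elsewhere == 0;
}
```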
