同步区块--p2p通信--notice_message

2019-07-02 09:38:06

上篇笔记写到远程节点给本地节点发送notice消息,通知本地节点同步区块。本篇笔记继续学习本地节点如何从远程节点同步区块,这次重点写notice消息的发送与处理过程。

1、notice_message消息结构

//消息结构 定义部分
struct notice_message {
  notice_message () : known_trx(), known_blocks() {}
  ordered_txn_ids known_trx;
  ordered_blk_ids known_blocks;
};
//选择查看定义,跳转到
using ordered_txn_ids = select_ids<transaction_id_type>;
using ordered_blk_ids = select_ids<block_id_type>;
//选择查看定义,跳转到
template<typename T>
struct select_ids {
  select_ids () : mode(none),pending(0),ids() {}
  id_list_modes  mode;
  uint32_t       pending;
  vector<T>      ids;        //模板类
  bool           empty () const { return (mode == none || ids.empty()); }
};

notice_message消息类型包括id_list_modes类型的变量,该类型为一个枚举类型,包括` enum id_list_modes {

none,
catch_up,
last_irr_catch_up,
normal

};`,pending表示区块数目。

2、发送notice_message消息

远程节点给本地节点发送notice_message,通知本地节点同步到不可逆区块。

//远程节点给本地节点发送的通知消息内容 消息调用部分
if (lib_num > msg.head_num ) {
   fc_dlog(logger, "sync check state 2");
   if (msg.generation > 1 || c->protocol_version > proto_base) {
      notice_message note;
      note.known_trx.pending = lib_num;
      note.known_trx.mode = last_irr_catch_up;   //类型为不可逆区块类型
      note.known_blocks.mode = last_irr_catch_up;
      note.known_blocks.pending = head;
      c->enqueue( note );
   }
   c->syncing = true;
   return;
}

远程节点发送的notice_message消息内容比较简单,包括不可逆区块数、最新区块数、同步方式(同步到不可逆区块或者同步到最新区块),然后放入到消息队列等待发送出去。

3、接收notice_message消息

同上篇笔记所写的一样,处理notice_message消息同样在一个handle_message的重载函数里进行,参数为当前通信的远程节点的connect对象指针和消息。

void net_plugin_impl::handle_message( connection_ptr c, const notice_message &msg) {
      // peer tells us about one or more blocks or txns. When done syncing, forward on
      // notices of previously unknown blocks or txns,
      //
      peer_ilog(c, "received notice_message");
      c->connecting = false;
      request_message req;
      bool send_req = false;
      if (msg.known_trx.mode != none) {
         fc_dlog(logger,"this is a ${m} notice with ${n} blocks", ("m",modes_str(msg.known_trx.mode))("n",msg.known_trx.pending));
      }
      switch (msg.known_trx.mode) {
      case none:
         break;
      case last_irr_catch_up: {
         c->last_handshake_recv.head_num = msg.known_trx.pending;
         req.req_trx.mode = none;
         break;
      }
      case catch_up : {
         if( msg.known_trx.pending > 0) {
            // plan to get all except what we already know about.
            req.req_trx.mode = catch_up;
            send_req = true;
            size_t known_sum = local_txns.size();
            if( known_sum ) {
               for( const auto& t : local_txns.get<by_id>( ) ) {
                  req.req_trx.ids.push_back( t.id );
               }
            }
         }
         break;
      }
      case normal: {
         dispatcher->recv_notice (c, msg, false);
      }
      }

      if (msg.known_blocks.mode != none) {
         fc_dlog(logger,"this is a ${m} notice with ${n} blocks", ("m",modes_str(msg.known_blocks.mode))("n",msg.known_blocks.pending));
      }
      switch (msg.known_blocks.mode) {
      case none : {
         if (msg.known_trx.mode != normal) {
            return;
         }
         break;
      }
      case last_irr_catch_up: //同步到最新区块和同步到不可逆区块 调用的函数相同
      case catch_up: {
         sync_master->recv_notice(c,msg); //通过sync_master进行区块同步
         break;
      }
      case normal : {
         dispatcher->recv_notice (c, msg, false);
         break;
      }
      default: {
         peer_elog(c, "bad notice_message : invalid known_blocks.mode ${m}",("m",static_cast<uint32_t>(msg.known_blocks.mode)));
      }
      }
      fc_dlog(logger, "send req = ${sr}", ("sr",send_req));
      if( send_req) {
         c->enqueue(req);
      }
   }

由于远程节点发送的notice_message数据包内容为同步到不可逆区块,mode为last_irr_catch_up,所以代码中调用sync_master->recv_notice(c,msg)函数来进行区块同步。此函数同步区块分为两种情况

   void sync_manager::recv_notice (connection_ptr c, const notice_message &msg) {
      fc_ilog (logger, "sync_manager got ${m} block notice",("m",modes_str(msg.known_blocks.mode)));
      if (msg.known_blocks.mode == catch_up) {
         if (msg.known_blocks.ids.size() == 0) {
            elog ("got a catch up with ids size = 0");
         }
         else {
             //同步到最新区块
            verify_catchup(c,  msg.known_blocks.pending, msg.known_blocks.ids.back());
         }
      }
      else {
         c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
         reset_lib_num (c);
         //同步到不可逆区块
         start_sync(c, msg.known_blocks.pending);
      }
   }

我们重点看同步到不可逆区块,函数为start_sync,参数为connection对象指针和消息中包含的最新区块数(不过我觉得应该是不可逆区块数)。

   void sync_manager::start_sync( connection_ptr c, uint32_t target) {
      if( target > sync_known_lib_num) {
         sync_known_lib_num = target;
      }

      if (!sync_required()) {
         uint32_t bnum = chain_plug->chain().last_irreversible_block_num();
         uint32_t hnum = chain_plug->chain().fork_db_head_block_num();
         fc_dlog( logger, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}",
                  ("b",bnum)("h",hnum)("t",target));
         return;
      }

      if (state == in_sync) {
         set_state(lib_catchup);
         sync_next_expected_num = chain_plug->chain().last_irreversible_block_num() + 1;
      }

      fc_ilog(logger, "Catching up with chain, our last req is ${cc}, theirs is ${t} peer ${p}",
              ( "cc",sync_last_requested_num)("t",target)("p",c->peer_name()));

      request_next_chunk(c);
   }

在这里,成员变量sync_known_lib_num赋值为sync_known_lib_num和target的最大值。该成员变量表示已知的当前不可逆区块数,可见上一步同步到不可逆区块的调用应该为start_sync(c, msg.known_trx.pending);,在这个函数里,主要是检查区块同步信息,然后调用request_next_chunk(c)函数进行区块同步。

   void sync_manager::request_next_chunk( connection_ptr conn ) {
      uint32_t head_block = chain_plug->chain().fork_db_head_block_num();
      ······ //省略代码
      if( sync_last_requested_num != sync_known_lib_num ) {
         uint32_t start = sync_next_expected_num;
         uint32_t end = start + sync_req_span - 1; // sync_req_span为配置文件中设置,每次同步多少个区块,默认是100个。
         if( end > sync_known_lib_num )
            end = sync_known_lib_num;
         if( end > 0 && end >= start ) {
            fc_ilog(logger, "requesting range ${s} to ${e}, from ${n}",
                    ("n",source->peer_name())("s",start)("e",end));
            source->request_sync_blocks(start, end);//同样是发送某种消息来同步区块
            sync_last_requested_num = end;
         }
      }
   }

查看request_next_chunk函数的功能,我们发现内部调用request_sync_blocks函数来发送同步消息,具体实现需要进入分析。

void connection::request_sync_blocks (uint32_t start, uint32_t end) {
      sync_request_message srm = {start,end};
      enqueue( net_message(srm));
      sync_wait();
   }

request_sync_blocks函数内部实现很简单,构造了一个sync_request_message类型的消息,然后放入到消息队列中,消息的内容为起始区块和结束区块,每次请求100个(config.ini中默认设置,可以修改)。
此时这篇笔记讨论的消息类型先到这里,下一篇笔记讨论sync_request_message类型的消息,分析本地节点是如何从远程同步区块的。

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%BA%94%EF%BC%89%E5%90%8C%E6%AD%A5%E5%8C%BA%E5%9D%97--p2p%E9%80%9A%E4%BF%A1--notice_message.md

当前页面是本站的「Baidu MIP」版。发表评论请点击:完整版 »