您正在查看: EOS 分类下的文章

同步区块--p2p通信--sync_request_message与signed_block

这篇笔记写同步区块过程中,发送的最后两种消息类型--sync_request_message与signed_block。上篇笔记写到远程节点将链的信息(不可逆区块数、最新区块数、同步方式等)作为notice_message方式发送给本地节点,本地节点接收到notice_message消息,向远程节点发送sync_request_message消息同步区块。

1、同步区块过程图示

本地节点从远程节点同步不可逆区块

2、sync_request_message消息接收处理

继续上篇笔记中的内容,远程节点接收到本地节点发送过来的sync_request_message消息之后,处理函数:

   void net_plugin_impl::handle_message( connection_ptr c, const sync_request_message &msg) {
      if( msg.end_block == 0) {   //结束区块数是否为0
         c->peer_requested.reset();
         c->flush_queues();
      } else {
         c->peer_requested = sync_state( msg.start_block,msg.end_block,msg.start_block-1);
         c->enqueue_sync_block();
      }
   }

本地节点发送的同步消息,默认为1-100的区块数,则处理函数进入enqueue_sync_block

   bool connection::enqueue_sync_block() {
      controller& cc = app().find_plugin<chain_plugin>()->chain();
      if (!peer_requested)
         return false;
      uint32_t num = ++peer_requested->last;
      bool trigger_send = num == peer_requested->start_block;
      if(peer_requested->last == peer_requested->end_block) {
         peer_requested.reset();
      }
      try {
         signed_block_ptr sb = cc.fetch_block_by_number(num);
         if(sb) {
            enqueue( *sb, trigger_send);
            return true;
         }
      } catch ( ... ) {
         wlog( "write loop exception" );
      }
      return false;
   }

num变量为peer_requested成员的last字段的值,表示当前需要同步的区块数。每次调用enqueue_sync_block函数,则该字段加一,即发送下一个区块给本地节点。然后调用fetch_block_by_number函数获取自己的第num个区块,并构造signed_block消息,然后放入到消息队列,这样就把本地节点请求的一个区块发送给了本地节点。

  boost::asio::async_write(*socket, bufs, [c](boost::system::error_code ec, std::size_t w) {
    try {
      ········//省略代码
        while (conn->out_queue.size() > 0) {
            conn->out_queue.pop_front();
        }
        conn->enqueue_sync_block();    ///同步下一个区块
        conn->do_queue_write();
      }
      ······ //省略代码
   }

在net_plugin插件处理消息队列的时候,会在异步发送消息的回调函数里,发送下一个区块给本地节点。

3、接收signed_block消息

本地节点接收远程节点发送过来的signed_block消息,消息内容为区块详细数据

   void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) {
      controller &cc = chain_plug->chain();
      block_id_type blk_id = msg.id();
      uint32_t blk_num = msg.block_num();
      fc_dlog(logger, "canceling wait on ${p}", ("p",c->peer_name()));
      c->cancel_wait();

      try {
         //查看本地是否存在该id的区块
         if( cc.fetch_block_by_id(blk_id)) {
            sync_master->recv_block(c, blk_id, blk_num);
            return;
         }
      } catch( ...) {
         // should this even be caught?
         elog("Caught an unknown exception trying to recall blockID");
      }

      dispatcher->recv_block(c, blk_id, blk_num);
      fc::microseconds age( fc::time_point::now() - msg.timestamp);
      peer_ilog(c, "received signed_block : #${n} block age in secs = ${age}",
              ("n",blk_num)("age",age.to_seconds()));

      go_away_reason reason = fatal_other;
      try {
        //写入区块数据,accept_block函数用到了boost库里面的信号插槽signal2库,这里没有分析清楚。
         signed_block_ptr sbp = std::make_shared<signed_block>(msg);
         chain_plug->accept_block(sbp); //, sync_master->is_active(c));
         reason = no_reason;
      } catch( const unlinkable_block_exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         reason = unlinkable;
      } catch( const block_validate_exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "block_validate_exception accept block #${n} syncing from ${p}",("n",blk_num)("p",c->peer_name()));
         reason = validation;
      } catch( const assert_exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "unable to accept block on assert exception ${n} from ${p}",("n",ex.to_string())("p",c->peer_name()));
      } catch( const fc::exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "accept_block threw a non-assert exception ${x} from ${p}",( "x",ex.to_string())("p",c->peer_name()));
         reason = no_reason;
      } catch( ...) {
         peer_elog(c, "bad signed_block : unknown exception");
         elog( "handle sync block caught something else from ${p}",("num",blk_num)("p",c->peer_name()));
      }

      update_block_num ubn(blk_num);
      if( reason == no_reason ) {
         for (const auto &recpt : msg.transactions) {
            auto id = (recpt.trx.which() == 0) ? recpt.trx.get<transaction_id_type>() : recpt.trx.get<packed_transaction>().id();
            auto ltx = local_txns.get<by_id>().find(id);
            if( ltx != local_txns.end()) {
               local_txns.modify( ltx, ubn );
            }
            auto ctx = c->trx_state.get<by_id>().find(id);
            if( ctx != c->trx_state.end()) {
               c->trx_state.modify( ctx, ubn );
            }
         }
         sync_master->recv_block(c, blk_id, blk_num);
      }
      else {
         sync_master->rejected_block(c, blk_num);
      }
   }

signed_block 消息内容
signed_block 区块报文内容

0040             b9 00 00 00 07 dc f9 5c 45 00 00 00 00 00   .F¹....Üù\E.....
0050   ea 30 55 00 00 00 00 00 01 40 51 47 47 7a b2 f5   ê0U......@QGGz²õ
0060   f5 1c da 42 7b 63 81 91 c6 6d 2c 59 aa 39 2d 5c   õ.ÚB{c..Æm,Yª9-\
0070   2c 98 07 6c b0 00 00 00 00 00 00 00 00 00 00 00   ,..l°...........
0080   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
0090   00 00 00 00 00 e0 24 4d b4 c0 2d 68 ae 64 de c1   .....à$M´À-h®dÞÁ
00a0   60 31 0e 24 7b b0 4e 5c b5 99 af b7 c1 47 10 fb   `1.${°N\µ.¯·ÁG.û
00b0   f3 f4 57 6c 0e 00 00 00 00 00 00 00 20 60 15 f0   óôWl........ `.ð
00c0   39 e2 fd d0 df b2 31 6f ea 28 67 90 c6 b1 55 4f   9âýÐß²1oê(g.ƱUO
00d0   28 5a 54 e7 d2 5d b3 ea 91 ef 2e 8c 11 0a 57 e2   (ZTçÒ]³ê.ï....Wâ
00e0   8e fb ff e0 92 c0 e0 2d 92 f2 88 5e 72 48 43 b4   .ûÿà.Àà-.ò.^rHC´
00f0   a5 5f 29 0e 20 9f 87 1f 41 bb 39 3c 84 00 00      ¥_). ...A»9<...:

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E5%85%AD%EF%BC%89%E5%90%8C%E6%AD%A5%E5%8C%BA%E5%9D%97--p2p%E9%80%9A%E4%BF%A1--sync_request_message%E4%B8%8Esigned_block.md

分析nodeos的流程

本篇笔记主要分析nodeos程序的流程

1、代码所在路径(yourpath换成你的路径)
yourpath/eos/programs/nodeos/main.cpp
2、首先main函数中的代码并不长,可以通过五个类来学习这段代码。

      app().set_version(eosio::nodeos::config::version);
      app().register_plugin<history_plugin>();

      auto root = fc::app_path();
      app().set_default_data_dir(root / "eosio/nodeos/data" );
      app().set_default_config_dir(root / "eosio/nodeos/config" );
      http_plugin::set_defaults({
         .address_config_prefix = "",
         .default_unix_socket_path = "",
         .default_http_port = 8888
      });
      if(!app().initialize<chain_plugin, http_plugin, net_plugin, producer_plugin>(argc, argv))
         return INITIALIZE_FAIL;
      initialize_logging();
      ilog("nodeos version ${ver}", ("ver", app().version_string()));
      ilog("eosio root is ${root}", ("root", root.string()));
      app().startup();
      app().exec();

上面的代码,均是通过调用app()的成员函数来实现的。我们很明显的看到main函数的生命周期和app()返回的对象是完全相同的。所以,第一个分析的就是app()返回的类对象。

1、application类(单例模式)

查看app()函数,发现其返回的是一个application类对象的一个引用,对象是通过application类的静态方法instance创建的。instance方法创建一个application类对象,并返回其引用。由此实现一个单例模式,每次调用app()仅创建一个对象。
application& app() { return application::instance(); }
application& application::instance() { static application _app; return _app; }
main函数中出现的set_versionset_default_data_dirset_default_config_dir均为简单的成员函数,这里略过不写。
主要分析一下其他几个函数:register_plugininitializestartupexec
register_plugin函数是一个模板函数,功能是对插件进行注册。注册即创建一个插件对象,并保存在application对象的plugins变量里面。注册插件的时候是需要注册其依赖插件的。比如p2p插件依赖于chain插件,那么在注册p2p插件时,也需要注册chain插件。其中获取请求插件,采用了一个宏实现了递归获取的方式,后续在plugins类中详细说明。

 template<typename Plugin>
    auto& register_plugin() {
    auto existing = find_plugin<Plugin>();   
    if(existing)
         return *existing;
    auto plug = new Plugin();
    plugins[plug->name()].reset(plug);
    plug->register_dependencies(); // 比较重要的地方,调用的是plugins类中的函数,后续分析。
    return *plug;
 }

initialize函数是一个变参模板函数,功能是对插件进行初始化(调用每个插件的initialize方法)。内部将变参参数初始化为一个vector向量,并调用initialize_impl函数来具体实现。

  //application.hpp
  template<typename... Plugin>
     bool                 initialize(int argc, char** argv) {
     return initialize_impl(argc, argv, {find_plugin<Plugin>()...});
  }
 //application.cpp  调用插件的initialize方法。
  for (auto plugin : autostart_plugins)
       if (plugin != nullptr && plugin->get_state() == abstract_plugin::registered)
           plugin->initialize(options);

startup函数很简单,功能用来启动初始化过的插件,内部调用每个插件的startup函数来启动。
for (auto plugin : initialized_plugins) plugin->startup();
exec函数使用了boost::asio::io_service io服务,在每个启动的插件线程里使用异步io的地方,均使用的是application对象的io服务,exec函数创建了异步IO异常终止的信号,并对其做了资源释放处理。

void application::exec() {
   std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));
   sigint_set->async_wait([sigint_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigint_set->cancel();
   });

   std::shared_ptr<boost::asio::signal_set> sigterm_set(new boost::asio::signal_set(*io_serv, SIGTERM));
   sigterm_set->async_wait([sigterm_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigterm_set->cancel();
   });

   std::shared_ptr<boost::asio::signal_set> sigpipe_set(new boost::asio::signal_set(*io_serv, SIGPIPE));
   sigpipe_set->async_wait([sigpipe_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigpipe_set->cancel();
   });

   io_serv->run();

   shutdown(); /// perform synchronous shutdown
}

2、abstract_plugin类

通过对application类的几个函数的分析,发现初始化启动插件真正的实现,是通过插件自身的initialize方法和startup方法来实现的,下面主要分析其他四个类--插件类。这四个类可以用一个图来表示:
插件类示意图
其中abstract_plugin是虚基类,plugin是它的子类,http_plugin是plugin的子类,http_plugin_impl包含在http_plugin中。即前三个类是继承关系,后面一个类是包含关系。
回到上面遗留的问题--“插件是如何注册其依赖插件的”。
plug->register_dependencies();
父类指针指向的是子类对象,该方法在plugin类中实现。

  virtual void register_dependencies() {
       static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});
 }

plugin_requires函数并非Plugin类的成员函数,是其子类的成员函数,所以父类指针需要转换为子类指针,才可以调用。该函数由一个宏来实现,并实现了递归调用。(BOOST_PP_SEQ_FOR_EACH宏的作用,将第三个参数序列分别与第二个参数进行按照第一个参数的方式进行拼接,)
APPBASE_PLUGIN_REQUIRES((chain_plugin))

//宏代码:
#define APPBASE_PLUGIN_REQUIRES_VISIT( r, visitor, elem ) \
  visitor( appbase::app().register_plugin<elem>() ); 

#define APPBASE_PLUGIN_REQUIRES( PLUGINS )                               \
   template<typename Lambda>                                           \
   void plugin_requires( Lambda&& l ) {                                \
      BOOST_PP_SEQ_FOR_EACH( APPBASE_PLUGIN_REQUIRES_VISIT, l, PLUGINS ) \
   }
//-----------------------------------------展开结果-------------------------------------------
//宏展开:
template<typename Lambda>                                           
   void plugin_requires( Lambda&& l ) {                                
      BOOST_PP_SEQ_FOR_EACH( APPBASE_PLUGIN_REQUIRES_VISIT, l, (chain_plugin) ) 
   }
//继续展开
template<typename Lambda>                                           
   void plugin_requires( Lambda&& l ) {                                
      l( appbase::app().register_plugin<chain_plugin>() );
   }

所以plugin_requires函数为http_plugin类的成员函数,参数为一个lambda表达式。调用过程为static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});,传入的表达式为[&](auto& plug){}。
所以调用此表达式,参数为appbase::app().register_plugin()。使用宏的方式实现成员函数的递归调用,之前没有遇到过这种写法,很奇怪。
再来看abstract_plugin类,此类很简单,代码很少,实现了插件的初始化、启动、停止等几个虚方法,是一个虚基类。具体代码均有其派生类的多态实现,所以下面分析另外三个类。

     virtual void set_program_options( options_description& cli, options_description& cfg ) = 0;
     virtual void initialize(const variables_map& options) = 0;
     virtual void startup() = 0;
     virtual void shutdown() = 0;

3、plugin类

plugin类是eos各个插件的父类,保存了插件的状态(注册、启动等),实现了初始化、启动等接口,内部逻辑是通过子类来实现的。

  virtual void register_dependencies() {
            static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});
         }

         virtual void initialize(const variables_map& options) override {
            if(_state == registered) {
               _state = initialized;
               static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.initialize(options); });
               static_cast<Impl*>(this)->plugin_initialize(options);
               //ilog( "initializing plugin ${name}", ("name",name()) );
               app().plugin_initialized(*this);
            }
            assert(_state == initialized); /// if initial state was not registered, final state cannot be initiaized
         }

         virtual void startup() override {
            if(_state == initialized) {
               _state = started;
               static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.startup(); });
               static_cast<Impl*>(this)->plugin_startup();
               app().plugin_started(*this);
            }
            assert(_state == started); // if initial state was not initialized, final state cannot be started
         }

4、net_plugin类(同级插件:http_plugin、chain_plugin等)

net_plugin类是plugin类的子类,该类主要实现了p2p插件的插件配置、初始化、启动、停止、广播区块的方法,但其内部实现均是通过的net_plugin_impl类的方法来完成的,net_plugin类包含一个net_plugin_impl类的实例(每个插件都是类似的结构)。

   class net_plugin : public appbase::plugin<net_plugin>
   {
      public:
        net_plugin();
        virtual ~net_plugin();

        APPBASE_PLUGIN_REQUIRES((chain_plugin))
        virtual void set_program_options(options_description& cli, options_description& cfg) override;

        void plugin_initialize(const variables_map& options);
        void plugin_startup();
        void plugin_shutdown();

        void   broadcast_block(const chain::signed_block &sb);

        string                       connect( const string& endpoint );
        string                       disconnect( const string& endpoint );
        optional<connection_status>  status( const string& endpoint )const;
        vector<connection_status>    connections()const;

        size_t num_peers() const;
      private:
        std::unique_ptr<class net_plugin_impl> my; 
   };

plugin_initialize 初始化插件,其本质是实例化net_plugin_impl。my为net_plugin_impl类对象的指针。

  void net_plugin::plugin_initialize( const variables_map& options ) {
      ilog("Initialize net plugin");
      try {
         // 读取配置信息,初始化net_plugin_imul 对象的成员变量   
         peer_log_format = options.at( "peer-log-format" ).as<string>();

         my->network_version_match = options.at( "network-version-match" ).as<bool>();

         my->sync_master.reset( new sync_manager( options.at( "sync-fetch-span" ).as<uint32_t>()));
         my->dispatcher.reset( new dispatch_manager );

plugin_startup函数启动了一个p2p节点网络。包括1、设置监听循环,对其他节点发送过来的消息进行响应。2、根据配置文件中的seed节点信息,连接到其他节点,并发送消息请求,同步区块等消息(详细笔记写在下一篇)。通信用到的消息类型共分为如下几种:

  using net_message = static_variant<handshake_message,
                 chain_size_message,
                 go_away_message,
                 time_message,
                 notice_message,
                 request_message,
                 sync_request_message,
                 signed_block,
                 packed_transaction>;

5、net_plugin_impl类

该类涉及到的是核心业务的具体实现。也是最复杂的一个类,下一篇笔记重点写下net插件的学习过程。
net_plugin_impl类通信使用的是boost::asio异步通信的库。该类成员变量包括了当前连接的p2p节点对象、区块同步管理对象、链id、节点id、网络通信用的相关变量。

   class net_plugin_impl {
   public:
      unique_ptr<tcp::acceptor>        acceptor;
      tcp::endpoint                    listen_endpoint;
      string                           p2p_address;
      uint32_t                         max_client_count = 0;
      uint32_t                         max_nodes_per_host = 1;
      uint32_t                         num_clients = 0;

      vector<string>                   supplied_peers;
      vector<chain::public_key_type>   allowed_peers; ///< peer keys allowed to connect
      std::map<chain::public_key_type,
               chain::private_key_type> private_keys; ///< overlapping with producer keys, also authenticating non-producing nodes

      enum possible_connections : char {
         None = 0,
            Producers = 1 << 0,
            Specified = 1 << 1,
            Any = 1 << 2
            };
      possible_connections             allowed_connections{None};

      connection_ptr find_connection( string host )const;

      std::set< connection_ptr >       connections;               // 已连接的p2p seed节点 指针集合
      bool                             done = false;
      unique_ptr< sync_manager >       sync_master;               // 区块同步管理类指针
      unique_ptr< dispatch_manager >   dispatcher;
      .......
}

net_plugin插件的详细内容,下一篇笔记详细写。

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%BA%8C%EF%BC%89%E5%88%86%E6%9E%90nodeos%E7%9A%84%E6%B5%81%E7%A8%8B.md

net_plugin插件代码分析

上一篇笔记中,写了nodeos的执行流程。nodeos使用的是一种插件体系,业务代码分布在一个个的插件类中,然后分析了插件类共的继承关系。本篇笔记,就从其中的net插件入手来分析p2p模块的功能,也就是分析学习net_plugin_impl类。同之前的笔记一样,从net插件的生命周期,初始化、启动、停止来分析,并重点分析p2p模块的运行状态。
一个区块链系统的p2p模块,应该包括以下几个职能:
(1)、从对等的节点那里,同步区块数据。
(2)、发送交易给其他节点进行验证。
(3)、验证其他节点发送过来的交易。
(3)、如果自己生产的区块,要发送区块给其他节点。
(4)、验证其他节点发送过来的区块。

1、net_plugin类的plugin_initialize方法(初始化)

此方法主要是用来使用命令行参数或者配置文件中的参数来配置net_plugin_impl类,该插件的业务主要在net_plugin_impl类中实现。下图为执行完plugin_initialize方法后的net_plugin_impl类对象
net_plugin_impl初始化之后

 // 读取配置信息,初始化net_plugin_imul 对象的成员变量   
         peer_log_format = options.at( "peer-log-format" ).as<string>();

         my->network_version_match = options.at( "network-version-match" ).as<bool>();

         my->sync_master.reset( new sync_manager( options.at( "sync-fetch-span" ).as<uint32_t>()));
         my->dispatcher.reset( new dispatch_manager );

         my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as<int>());
         my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as<int>();
         my->txn_exp_period = def_txn_expire_wait;
         my->resp_expected_period = def_resp_expected_wait;
         my->dispatcher->just_send_it_max = options.at( "max-implicit-request" ).as<uint32_t>();
         my->max_client_count = options.at( "max-clients" ).as<int>();
         my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as<int>();
         my->num_clients = 0;
         my->started_sessions = 0;
        ······
         my->keepalive_timer.reset( new boost::asio::steady_timer( app().get_io_service()));
         my->ticker();        //定时器,给每个连接发送时间戳`

plugin_initialize函数主要是初始化net_plugin_impl对象。并每隔32s给连接的节点发送心跳数据(时间戳数据),其中send_time发送是消息类型为该模块下定义的几种类型之一。

   void net_plugin_impl::ticker() {
      keepalive_timer->expires_from_now (keepalive_interval);
      keepalive_timer->async_wait ([this](boost::system::error_code ec) {
            ticker ();
            if (ec) {
               wlog ("Peer keepalive ticked sooner than expected: ${m}", ("m", ec.message()));
            }
            for (auto &c : connections ) {
               if (c->socket->is_open()) {
                  c->send_time();  //遍历所有的连接,给每个连接定时发送时间戳message
               }
            }
         });
   }

2、net_plugin类的plugin_startup方法(启动运行)

plugin_startup方法是核心方法,包含了网络监听循环、接收数据处理、发送数据等内容。
等待连接部分:绑定、监听,在start_listen_loop函数里,等待其他节点的连接。通过boost::asio实现异步IO,不会阻塞。

      if( my->acceptor ) {
         //使用tcp:v4的协议 打开acceptor接收器   
         my->acceptor->open(my->listen_endpoint.protocol());
         //设置地址复用 Address already in use
         my->acceptor->set_option(tcp::acceptor::reuse_address(true));
         try {
           //绑定 
           my->acceptor->bind(my->listen_endpoint);
         } catch (const std::exception& e) {
           ilog("net_plugin::plugin_startup failed to bind to port ${port}",
             ("port", my->listen_endpoint.port()));
           throw e;
         }
         //监听
         my->acceptor->listen();
         ilog("starting listener, max clients is ${mc}",("mc",my->max_client_count));
         //接受连接 并处理发送过来的消息
         my->start_listen_loop();
      }

等待其他节点的连接

void net_plugin_impl::start_listen_loop( ) {
      //获取单例模式中的io服务,并用其创建一个通信套接字。为什么不重新创建一个io服务?   
      auto socket = std::make_shared<tcp::socket>( std::ref( app().get_io_service() ) );
      acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) {
            if( !ec ) {
               uint32_t visitors = 0;     //统计共有多少个peer_addr变量为非空的连接
               uint32_t from_addr = 0;    //统计所有的连接里面,有几个是当前监听到的连接
               auto paddr = socket->remote_endpoint(ec).address();
               if (ec) {
                  fc_elog(logger,"Error getting remote endpoint: ${m}",("m", ec.message()));
               }
               else {
                  for (auto &conn : connections) {  //遍历当前节点的所有连接
                     if(conn->socket->is_open()) {
                        if (conn->peer_addr.empty()) {
                           visitors++;
                           boost::system::error_code ec;
                           if (paddr == conn->socket->remote_endpoint(ec).address()) {
                              from_addr++;
                           }
                        }
                     }
                  }
                  //修改当前有效连接数
                  if (num_clients != visitors) {
                     ilog ("checking max client, visitors = ${v} num clients ${n}",("v",visitors)("n",num_clients));
                     num_clients = visitors;
                  }
                  //当前有效连接中不包含 新监听到的连接,则加入到有效连接里面,并启动一个会话
                  if( from_addr < max_nodes_per_host && (max_client_count == 0 || num_clients < max_client_count )) {
                     ++num_clients;
                     connection_ptr c = std::make_shared<connection>( socket );
                     connections.insert( c );
                     start_session( c ); //重要 启动一个会话

                  }
                  else {
                     if (from_addr >= max_nodes_per_host) {
                        fc_elog(logger, "Number of connections (${n}) from ${ra} exceeds limit",
                                ("n", from_addr+1)("ra",paddr.to_string()));
                     }
                     else {
                        fc_elog(logger, "Error max_client_count ${m} exceeded",
                                ( "m", max_client_count) );
                     }
                     socket->close( );
                  }
               }
            } else {
               elog( "Error accepting connection: ${m}",( "m", ec.message() ) );
               // For the listed error codes below, recall start_listen_loop()
               switch (ec.value()) {
                  case ECONNABORTED:
                  case EMFILE:
                  case ENFILE:
                  case ENOBUFS:
                  case ENOMEM:
                  case EPROTO:
                     break;
                  default:
                     return;
               }
            }
            //继续等待下一个连接
            start_listen_loop();
         });
   }

当接收到一个有效连接之后,开启一个会话,调用start_session方法,参数c为接受连接的套接字的指针,用来与连接到的节点收发数据。然后不断递归调用,接收下一个连接。其中start_session方法内部,主要是调用start_read_message( con )方法来处理消息的。所以我们需要重点查看start_read_message( con )函数。con和c指向的是同一个套接字。

void net_plugin_impl::start_read_message( connection_ptr conn ) {

      try {
         if(!conn->socket) {        //验证套接字是否有效
            return;
         }
         connection_wptr weak_conn = conn;      //当前connection对象的一个weak_ptr指针
            // 读取会递归调用,第一次读取时 outstanding_read_bytes未初始化,message_header_size初始化为4
         std::size_t minimum_read = conn->outstanding_read_bytes ? *conn->outstanding_read_bytes : message_header_size;

         if (use_socket_read_watermark) {             //一种读取方式,根据node启动时的配置,水印优化读取??? 默认未开启
            const size_t max_socket_read_watermark = 4096;
            std::size_t socket_read_watermark = std::min<std::size_t>(minimum_read, max_socket_read_watermark);
            boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark);
            conn->socket->set_option(read_watermark_opt);
         }

         auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t {
            if (ec || bytes_transferred >= minimum_read ) {
               return 0;
            } else {
               return minimum_read - bytes_transferred;
            }
         };
            //异步读取数据  pending_message_buffer为缓冲区
         boost::asio::async_read(*conn->socket,
            conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler,
            [this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {
               auto conn = weak_conn.lock();    //智能指针是否释放了
               if (!conn) {
                  return;
               }

               conn->outstanding_read_bytes.reset();   //重置outstanding_read_bytes 表示字节数

               try {
                  if( !ec ) {
                        //读取的字节数 大于可写入的字节数 错误
                     if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
                        elog("async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",
                             ("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()));
                     }
                     EOS_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write(), plugin_exception, "");
                     // 根据读取的字节数,扩展buffer
                     conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
                     while (conn->pending_message_buffer.bytes_to_read() > 0) { // buffer里面可读的字节数
                        uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();
                        //如果buffer里面的字节数小于 4个字节
                        if (bytes_in_buffer < message_header_size) {
                           conn->outstanding_read_bytes.emplace(message_header_size - bytes_in_buffer);
                           break;
                        } else {
                           uint32_t message_length;
                           // 返回当前读取的指针位置
                           auto index = conn->pending_message_buffer.read_index();
                           // 返回消息长度
                           conn->pending_message_buffer.peek(&message_length, sizeof(message_length), index);
                           // 消息长度过长或为0
                           if(message_length > def_send_buffer_size*2 || message_length == 0) {
                              boost::system::error_code ec;
                              elog("incoming message length unexpected (${i}), from ${p}", ("i", message_length)("p",boost::lexical_cast<std::string>(conn->socket->remote_endpoint(ec))));
                              close(conn);
                              return;
                           }

                           auto total_message_bytes = message_length + message_header_size;
                              //读取完一条消息
                           if (bytes_in_buffer >= total_message_bytes) {
                              conn->pending_message_buffer.advance_read_ptr(message_header_size);
                              if (!conn->process_next_message(*this, message_length)) {
                                 return;
                              }
                           } else {
                                 //未读取到某个类型消息结尾 循环重新读取
                              auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer;
                              auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write();
                              if (outstanding_message_bytes > available_buffer_bytes) {
                                 conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes );
                              }

                              conn->outstanding_read_bytes.emplace(outstanding_message_bytes);
                              break;
                           }
                        }
                     }
                     start_read_message(conn);
                  } else {
                     auto pname = conn->peer_name();
                     if (ec.value() != boost::asio::error::eof) {
                        elog( "Error reading message from ${p}: ${m}",("p",pname)( "m", ec.message() ) );
                     } else {
                        ilog( "Peer ${p} closed connection",("p",pname) );
                     }
                     close( conn );
                  }
               }
               catch(const std::exception &ex) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  elog("Exception in handling read data from ${p} ${s}",("p",pname)("s",ex.what()));
                  close( conn );
               }
               catch(const fc::exception &ex) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  elog("Exception in handling read data ${s}", ("p",pname)("s",ex.to_string()));
                  close( conn );
               }
               catch (...) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  elog( "Undefined exception hanlding the read data from connection ${p}",( "p",pname));
                  close( conn );
               }
            } );
      } catch (...) {
         string pname = conn ? conn->peer_name() : "no connection name";
         elog( "Undefined exception handling reading ${p}",("p",pname) );
         close( conn );
      }
   }

处理接收到的数据的函数比较长,基本重要的地方都标注了注释。该函数对接收数据的处理,主要是循环接收数据,并识别为要处理的消息类型,在eos系统下,通信的消息类型共分为如下几种,每种消息重载了一个handle_message函数来处理。在process_next_message函数里面进行消息分发。

handshake_message, 握手消息类型
chain_size_message, 未使用
go_away_message, 退出连接消息类型
time_message, 时间戳消息类型
notice_message, 通知消息类型,在区块同步中,该类型包含了区块状态等信息
request_message, 同步区块
sync_request_message, 同步区块
signed_block, 区块详细数据
packed_transaction 打包交易

bool connection:: process_next_message(net_plugin_impl& impl, uint32_t message_length) {
      try {
         // If it is a signed_block, then save the raw message for the cache
         // This must be done before we unpack the message.
         // This code is copied from fc::io::unpack(..., unsigned_int)
         auto index = pending_message_buffer.read_index();
         uint64_t which = 0; char b = 0; uint8_t by = 0;
         do {
            pending_message_buffer.peek(&b, 1, index);
            which |= uint32_t(uint8_t(b) & 0x7f) << by;
            by += 7;
         } while( uint8_t(b) & 0x80 && by < 32);  //如果是block,是签名的,需要先验证签名,再解压,其他消息类型随意。

         if (which == uint64_t(net_message::tag<signed_block>::value)) { // 验证签名 读取下一个消息
            blk_buffer.resize(message_length);
            auto index = pending_message_buffer.read_index();
            pending_message_buffer.peek(blk_buffer.data(), message_length, index);
         }
         auto ds = pending_message_buffer.create_datastream();
         net_message msg;
         fc::raw::unpack(ds, msg);              //解压缩message消息 
         msgHandler m(impl, shared_from_this() );
         msg.visit(m);        //调用的是net_plugin_impl 的成员函数handle_message
      } catch(  const fc::exception& e ) {
         edump((e.to_detail_string() ));
         impl.close( shared_from_this() );
         return false;
      }
      return true;
   }

重载的消息处理函数(具体发送的数据类型,下一篇笔记在详细写,目前还没有调试明白)

      void handle_message( connection_ptr c, const notice_message &msg);
      void handle_message( connection_ptr c, const request_message &msg);
      void handle_message( connection_ptr c, const sync_request_message &msg);
      void handle_message( connection_ptr c, const signed_block &msg);
      void handle_message( connection_ptr c, const packed_transaction &msg);

除了监听等待连接之外,该插件启动后也会向其他节点发送数据,发送数据部分:

      my->start_monitors();

      for( auto seed_node : my->supplied_peers ) {
         connect( seed_node );
      }

      if(fc::get_logger_map().find(logger_name) != fc::get_logger_map().end())
         logger = fc::get_logger_map()[logger_name];

start_monitors启动两个监控,监控新加入的连接,监控过期的交易,并移除(此种方式没有太理解,后面分析到chain插件的时候,再回过头来看)。

void net_plugin_impl::start_monitors() {
      connector_check.reset(new boost::asio::steady_timer( app().get_io_service()));
      transaction_check.reset(new boost::asio::steady_timer( app().get_io_service()));
      start_conn_timer(connector_period, std::weak_ptr<connection>());
      start_txn_timer();
   }

之后for循环,连接到seed节点。connect函数重载了两个,一个接受节点信息为参数,另一个实现具体业务逻辑。

   void net_plugin_impl::connect( connection_ptr c ) {
      if( c->no_retry != go_away_reason::no_reason) {
         fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( c->no_retry )));
         return;
      }

      auto colon = c->peer_addr.find(':');

      if (colon == std::string::npos || colon == 0) {
         elog ("Invalid peer address. must be \"host:port\": ${p}", ("p",c->peer_addr));
         for ( auto itr : connections ) {
            if((*itr).peer_addr == c->peer_addr) {
               (*itr).reset();
               close(itr);
               connections.erase(itr);
               break;
            }
         }
         return;
      }

      auto host = c->peer_addr.substr( 0, colon );
      auto port = c->peer_addr.substr( colon + 1);
      idump((host)(port));
      tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() );
      connection_wptr weak_conn = c;
      // Note: need to add support for IPv6 too
    //异步解析seed节点
      resolver->async_resolve( query,
                               [weak_conn, this]( const boost::system::error_code& err,
                                          tcp::resolver::iterator endpoint_itr ){
                                  auto c = weak_conn.lock();
                                  if (!c) return;
                                  if( !err ) {
                                     connect( c, endpoint_itr );  //调用重载函数,实现内部逻辑
                                  } else {
                                     elog( "Unable to resolve ${peer_addr}: ${error}",
                                           (  "peer_addr", c->peer_name() )("error", err.message() ) );
                                  }
                               });
   }
  //重载之后的connect,实现了异步连接到其他节点
   void net_plugin_impl::connect( connection_ptr c, tcp::resolver::iterator endpoint_itr ) {
      if( c->no_retry != go_away_reason::no_reason) {
         string rsn = reason_str(c->no_retry);
         return;
      }
      auto current_endpoint = *endpoint_itr;
      ++endpoint_itr;
      c->connecting = true;
      connection_wptr weak_conn = c;
      c->socket->async_connect( current_endpoint, [weak_conn, endpoint_itr, this] ( const boost::system::error_code& err ) {
            auto c = weak_conn.lock();
            if (!c) return;
            if( !err && c->socket->is_open() ) {
               if (start_session( c )) {
                  c->send_handshake ();   //连接上之后,给其他节点发送握手消息,可见握手消息是非常重要的,下一步重要调试握手消息的报文内容
               }
            } else {
               if( endpoint_itr != tcp::resolver::iterator() ) {
                  close(c);
                  connect( c, endpoint_itr );
               }
               else {
                  elog( "connection failed to ${peer}: ${error}",
                        ( "peer", c->peer_name())("error",err.message()));
                  c->connecting = false;
                  my_impl->close(c);
               }
            }
         } );
   }

//握手消息报文结构
struct handshake_message {
uint16_t network_version = 0; ///< incremental value above a computed base
chain_id_type chain_id; ///< used to identify chain
fc::sha256 node_id; ///< used to identify peers and prevent self-connect
chain::public_key_type key; ///< authentication key; may be a producer or peer key, or empty
tstamp time;
fc::sha256 token; ///< digest of time to prove we own the private key of the key above
chain::signature_type sig; ///< signature for the digest
string p2p_address;
uint32_t last_irreversible_block_num = 0;
block_id_type last_irreversible_block_id;
uint32_t head_num = 0;
block_id_type head_id;
string os;
string agent;
int16_t generation;
};

3、net_plugin类的plugin_shutdown方法(停止)

plugin_shutdown方法主要功能是关闭监听接收器,循环关闭每个连接,释放资源,代码较少,如下所示:

   void net_plugin::plugin_shutdown() {
      try {
         ilog( "shutdown.." );
         my->done = true;
         if( my->acceptor ) {
            ilog( "close acceptor" );
            my->acceptor->close();

            ilog( "close ${s} connections",( "s",my->connections.size()) );
            auto cons = my->connections;
            for( auto con : cons ) {
               my->close( con);
            }

            my->acceptor.reset(nullptr);
         }
         ilog( "exit shutdown" );
      }
      FC_CAPTURE_AND_RETHROW()
   }

转载自:
https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%B8%89%EF%BC%89net_plugin%E6%8F%92%E4%BB%B6%E8%AF%A6%E7%BB%86%E5%88%86%E6%9E%90.md

EOS区块生产和区块同步

1 概述

本文所述基于EOSv1.2.3。
EOS区块生产和同步主要涉及共识算法DPOS和aBFT,其源码实现主要涉及chain_plugin、producer_plugin、net_plugin和controller4个模块以及eosio.system智能合约等。

2 共识算法

EOS的区块生产,遵循DPoS(Delegated Proof-of-Stake)机制。
简单来说,所有拥有EOS token的人都是EOS区块生产的参与者。
任何人都可以申请出块。
任何人都可以选择不直接完成出块工作,而是将自己所持token抵押给出块申请者(PoS),委托(Delegate)他们完成出块工作。
最终,按照token比例,选出前21名出块者(BP,Block Producer),由他们代理出块。

ps:上述部分,EOS选举主要在eosio.system智能合约中实现,设置生产者队列函数为update_elected_producers()。笔者工作EOS不用于公链而是联盟链,不需要选举,故直接调用eosio.bios中的setprods()函数设置。

21名BP依次轮流出块,不像比特币等,同一时刻所有BP是竞争关系。每个BP轮到自己出块时,连续出块12个,每个块耗时500ms。

ps:上述这一块逻辑在producer_plugin中实现。

至此,我们可以简单的说,EOS的共识机制叫DPoS(More than that)。

EOS中每个区块被生产出来后,需要所有BP的确认,按照BFT共识机制确认后,才会变成不可逆的状态,在此之前都是reversible block。因此,需要进行区块在网络中的同步。
数据一致性是分布式系统数据同步的重要话题,BFT(Byzantine fault tolerance)是其中的一种代表共识机制,或者称它为算法。

如上图所示,BFT共识机制主要有以下步骤:

  • 提案1个block;
  • 进入Pre-commitment阶段,所有BP确认该提案;
  • 进入Commitment阶段,所有BP收到2/3+1或更多Pre-commit后,发送Commit;
  • 当某个BP收到2/3+1或更多Commit后,该block即不可逆。
  • 该算法适用于恶意节点不超过1/3的场景,可以保证达到最终一致性。
至此,我们可以说:EOS的共识机制叫DPoS & BFT。

BFT算法对每个区块需要发送至少2条消息,EOS对此进行了优化。BM将其称为pipelined:管道式、流水线式。
即每次生产一个区块,由于本身就需要将该区块广播到p2p网络中供其他节点同步,因此将Pre-Commit和Commit数据与该区块数据一并广播出去。
如下图所示:

上半部分描述了区块number不可逆的过程,下半部分描述了每个区块由谁Pre-Commit,由谁Commit。
假设有ABCD四个BP,每个BP每次出1个区块。BFT要求2/3+1个节点确认,即2/3*4+1=3。
假设每轮都是按照A、B、C、D的顺序出块。

  • A 出块1,Pre-Commit不可逆块为0,Commit不可逆块为0
  • B 出块2,Pre-Commit不可逆块为0,Commit不可逆块为0
  • C 出块3,ABC3个节点Pre-Commit区块1,故Pre-Commit不可逆块为1,Commit不可逆块为0
  • D 出块4,BCD3个节点Pre-Commit区块2,故Pre-Commit不可逆块为2,Commit不可逆块为0
  • A 出块5,CDA3个节点Pre-Commit区块3,故Pre-Commit不可逆块为3,CDA3个节点Commit区块1,故Commit不可逆块为1
  • B 出块6,DAB3个节点Pre-Commit区块4,故Pre-Commit不可逆块为4,DAB3个节点Commit区块2,故Commit不可逆块为2
  • 以此类推

基于上述优化,p2p网络的带宽压力、区块哈希验证频次等大幅降低。
但带来的问题是,出块、不可逆糅合在了一起,而非并行的。
BM的解释是:虽然如此,相对其它平台(比特币,以太坊)的机制来说,一个区块从生产出来到变为不可逆,其时间不算太久,且实际上每两个不可逆块之间的间隙时间非常短。长远考虑,只有这样,才能实现更好的扩展性。

原文地址:DPOS BFT— Pipelined Byzantine Fault Tolerance

至此,我们应当说:EOS的共识机制叫 DPOS BFT— Pipelined Byzantine Fault Tolerance。

3 chain_plugin

chain_plugin插件主要功能为:

  • 检查启动参数,判断是否需要replay区块链
  • 初始化和拉起Controller模块
  • 提供相关信号、方法,主要用于给controller、net_plugin、producer_plugin、用户输入等架桥
  • 向用户提供链的其它set/get接口,诸如:get_account、get_transaction_id……

下图为chain_plugin核心代码逻辑

3.1 检查启动参数,判断是否需要replay()区块链

chain_plugin启动前,先调用plugin_initialize()函数,该函数检查启动参数。除了一些基本配置外,还检查replay相关参数:

  • export-reversible-blocks:会将原来的reversible_block进行导出备份,该参数单独优先检查
  • delete-all-blocks:删除所有旧的blocks,重头开始replay区块链,包括清空State DB和Block log
  • hard-replay-blockchain:清空State DB,如果有配置truncate-at-block,按照[first,truncate-at-block]区间,否则[first,end]区间重新加载Block log中的区块,即该区间的区块仍然生效不replay,其它区块全部replay。如果reversible db有数据,一并尝试恢复
  • replay-blockchain:清空State DB,如果有配置fix-reversible_blocks,则尝试恢复reversible DB
  • fix-reversible_blocks:尝试恢复reversible DB
  • import-reversible_blocks:旧的不要,用导入的替代

3.2 初始化和拉起Controller模块

当plugin_initialize()上述操作处理完后,根据最终配置参数,对Controller模块进行初始化(emplace()-->构造函数),然后在调用plugin_startup()时调用了Controller::startup,完成了Controller模块的启动

3.3 提供相关信号、方法,主要用于给controller、net_plugin、producer_plugin、用户输入等架桥

  • accepted_block:当net_plugin收到一个区块,或者用户手动push的区块(可能仅仅是一个接口,该场景应当不常见),chain_plugin通过该方法转发区块到producer_plugin中
  • accepted-transactions:同理。需注意,cleos/http的push transaction和push action等,均在此处入链。

3.4 向用户提供链的其它set/get接口

实现了http接口中的一部分接口。

4 producer_plugin

producer_plugin插件主要功能为:

  • 当本节点处于生产阶段时,与chain_plugin::controller交互,调用controller相关接口进行区块生产;
  • 当本节点处于同步阶段时,接收net_plugin插件收到的区块,调用controller相关接口进行区块同步。

producer_plugin核心函数有:

  • schedule_production_loop:递归调用,生产区块
  • start_block:初始化当前区块数据
  • produce_block:打包、签名、提交区块入链
  • on_incoming_block:接收net_plugin收到的区块,入本节点链
  • on_incoming_transaction_async:接收net_plugin收到的交易,入本节点链
  • on_block:本节点区块入链成功后,相关状态更新
  • on_irreversible_block:本节点区块不可逆后,相关状态更新

下图为producer_plugin生产、同步区块的核心代码逻辑:

4.1 生产区块

如上所示,producer_plugin启动后,调用schedule_production_loop()函数,该函数是插件的最核心的函数,是一个递归函数,负责无限循环出块。
EOS500ms出一个块,因此需要启动一个定时器_timer,逻辑如下:

  • 关闭之前的定时器:_timer.cancel()
  • 调用start_block()函数初始化新的区块信息。首先调用controller::abort_block()重置数据,然后调用controller::start_block(),根据上一个区块信息生成一个新的区块,最后将由于controller::abort_block()而未来得及入链的交易(unapplied_transactions)重新push_transaction()进新的区块。
  • 重启定时器_timer(),异步等待新区块截止时间,这期间内,EOS等待并接收用户的交易请求。
  • 如果用户有新交易,调用push_transaction()执行交易并保存到区块中。
  • 如果没有其它情况,定时器到期后,调用produce_block(),对该区块进行打包(finalize_block())、签名(sign_block()),然后提交(commit_block())到本节点区块链(fork_db)上,commit_block()会发送accepted_block信号给订阅者,这其中包括producer_plugin::on_block(),该函数进行相关数据更新。fork_db每次新增一个区块,就会检查是否有新的不可逆区块产生,如果有,发送irrerersible_block信号给订阅者,这其中包括producer_plugin::on_irreversible(),该函数进行相关数据更新。最后,再次递归调用schedule_production_loop()进行下一个区块生产。
  • 如果出现其它原因,如收到网络上发送过来的区块,且定时器未到期,则会转入区块同步逻辑,之前的所有执行会被重置,未来得及执行的交易会被备份到unapplied_transactions中。

4.2 同步区块

producer_plugin的主流程是schedule_production_loop(),其中定时器_timer会根据实际情况设置等待时间。如是轮到该节点生产区块,则每次等待的时间为:总时间(500ms)-已用时间。如果未轮到该节点生产区块,则计算下一次出块时间(more than 500ms),并启动定时器等待。
如果net_plugin插件或bnet_plugin插件收到网络上的区块,则需要同步。调用on_incoming_block()函数,该函数内部逻辑与生产区块逻辑类似,主要调用controller的abort_block(),start_block(),push_transaction(),finalize_block(),commit_block()。
注意,http和cleos提供了一些接口,其中包括push_transaction,push_block接口,其逻辑见上图,比较特殊,不做太多解释。
下图给出了区块同步的更详细的说明:

重点做以下解释:

  • fork_db:区块链,其中的区块尚未不可逆,当新块到达后,可能存在分叉。需要根据最长链原则,选出较长的一条链,较短的链被删除。
  • 每个新区块基于其上一个区块出块,根据该信息,可以对旧链fork_db的head block(HB)和新的区块(new HB)分别进行追溯,将旧链中的区块合并到新链上,然后删除旧链,保存新链。主要代码在fetch_branch_from()中实现,思想请参考:拉链法

5 net_plugin

net_plugin的主要功能如下:

  • 与其他节点建立连接;
  • 向网络中广播本节点区块;
  • 接收其他节点广播的区块;
  • 节点之间区块同步。

net_plugin的代码结构如下:

5.1 与其他节点建立连接

见上图:

  • net_plugin插件启动时,根据p2p-listen-endpoint成员变量(配置文件或命令行参数p2p-listen-endpoint参数(默认值0.0.0.0:9876)),调用tcp::acceptor的listen()启动监听,调用start_listen_loop()递归调用async_accept()异步等待网络连接请求(参考socket通信)。
  • 根据supplied_peers成员变量(配置文件或命令行参数p2p-peer-address(可多个)),逐一调用async_connect()进行连接请求。
  • 其他节点收到请求后回复,并调用start_session()建立会话。
  • 本节点收到回复得知成功后,亦调用start_session()建立会话。
  • start_session()包含两步:
  • 调用start_read_message()异步循环等待其他节点的消息;
  • 调用send_handshake()发送握手消息。
  • net_plugin插件中定义了9个消息类型,其中主要消息为:
  • handshake_message:握手消息
  • notice_message: 通知消息,主要在同步时使用,进行同步状态发送
  • sync_request_message:当本节点区块链的HB number小于对方节点LIB number时发送
  • request_message:当本节点区块链的HB number小于对方节点HB number时发送
  • signed_block_message:每个区块由该消息逐一发送

5.2 向网络中广播本节点区块

net_plugin插件在启动时,订阅了chain_plugin插件的accepted_block信号,该信号在区块被提交到本地待确认不可逆数据库(fork_db)中后发送。
收到该信号后,net_plugin插件向所有连接中的网络节点广播signed_block_message。

5.3 接收其他节点广播的区块

其他节点的net_plugin插件的start_read_message异步循环等待网络消息,收到其他节点signed_block_message后,会进行判断,如果本节点没有该区块且该区块合法,则保存到本地fork_db中,如果存在分叉,则按照最长链的原则,尝试进行合并,再启用该最长链。

5.4 节点之间区块同步

每个BP连续出块12个(12*0.5s=6s),每出一个块便立即广播,理想中各节点的区块链是实时同步的,然而由于网络原因,或者后加入的节点,其往往落后其他节点很多区块。因此涉及到大量区块的同步问题。
各节点每次建立连接时会发送 handshake_message ,该消息主要用于区块同步。每次握手进行一次区块同步状态判断,同步完成后会再次发送 handshake_message,循环进行判断。
同步状态有5中场景:

  • [State 0]双方HB id相同,id为数字摘要,如果相同说明HB完全一致,不需要同步,则Alice向Bob发送notice_message。
  • [State 1]如果Alice的区块链非常短,其HB number 竟然没有对方节点LIB number大,则Alice向Bob发送sync_request_message,附带参数(start,end)表示同步区间。其中start为Alice的LIB number,end为Bob的LIB number。
  • [State 2]与State 1相反,如果Alice发现Bob的HB number < Alice的LIB number,则发送notice_message让Bob主动来请求同步数据。
  • [State 3]如果Alice的HB number >Bob 的LIB number,但Alice的HB number < Bob的HB number,则也需要同步,Alice向Bob发送request_message,Bob收到消息后,从Bob的LIB开始,一直到Bob的HB,逐一发送区块。
  • [State 4]与State 3相反,则Alice发送notice_message让Bob主动来请求同步数据。
5.4.1 同步状态0

同步状态1见总图,由于较简单,不再赘述。

5.4.2 同步状态1


如上图所示:

  • Alice向Bob请求区块同步,总区间为[Alice LIB + 1, Bob LIB]。
  • 调用request_next_chunk()对总区间分组同步,默认大小为100个区块一组(配置文件或命令行参数sync-fetch-span)。
  • net_plugin::sync_manager子模块顺序选择一个网络节点,发送消息,并启动定时器_timer异步等待5秒。如果5秒后未收到对方的signed_block_message,则取消该请求(再次发送sync_request_message,但区间为[0,0]),并重新选择一个网络节点发送消息。
  • 如果5秒内收到对方节点的signed_block_message,取消_timer定时器,判断是否是自己想要的区块,保存后判断是否同步结束。
  • 如果尚未同步结束,重启_timer定时器等待;
  • 如果分组同步结束,再次调用request_next_chunk()请求下一个分组区块;
  • 如果总区间同步结束,向所有网络节点再次发送握手消息,继续进行同步状态判断。
  • 对方节点收到sync_request_message后,按请求区块区间,循环逐一发送区块。
5.4.3 同步状态2


如上图所示,Bob收到Alice的通知后,与Alice的同步状态1类型,调用相关函数进行同步。

5.4.4 同步状态3

如上图所示,Alice向Bob请求区块,Bob收到消息后,以区间[Bob LIB+1,BOB HB]循环逐一发送区块。
如果Bob的HB number 为0,则为异常,需要告知Alice。

5.4.5 同步状态4

如上图所示,该状态下,参考同步状态3。
另附消息发送函数enqueue()逻辑:

5.5 补充

net_plugin插件启动并建立连接后,会调用start_monitors(),一是监听网络连接状态,如果断开会重新尝试建立连接;二是监听交易时间,如果交易超时,则会将其移除入链队列。

6 controller

Controller模块位于/libraries/chain/下,是EOS区块入链的核心控制器,内容非常多,也非常重要。
Controller主要功能为:

  • 被chain_plugin初始化和启动
  • 对上提供区块和交易等相关接口
  • 对上提供区块和交易入链进度相关信号
  • 对下操作相关数据接口,进行数据管理

下图为Controller核心代码逻辑:

6.1 被chain_plugin初始化和启动

Controller模块由chain_plugin负责初始化和启动,chain_plugin根据启动参数启动Controller,从而对底层数据结构进行初始化操作。

6.2 对上提供区块和交易等相关接口

区块相关的接口有:

  • abort_block() :取消上一个正在生产的区块,其中的交易转到本次新区块中处理
  • start_block() :开始一个新区块生产,启动一个异步定时器等待交易被插入,定时器结束后开始打包等后续工作。此处还会进行BFT共识机制的处理。
  • finalize_block():本区块时间已到,打包区块
  • sign_block():对区块进行签名
  • commit_block():提交区块入fork_db,等待不可逆
  • push_block() : 主要用于初始化时或同步时插入区块,内部调用apply_block()函数以及上述几个函数,但没有异步定时器等代码。
  • pop_block():同步时fork_db发现分叉,需要对短链进行pop_block()
  • on_irreversible():区块不可逆时触发

交易相关的接口有:

  • push_transaction() : 插入交易
  • push_scheduled_transaction() : 插入延期的交易

其它接口:

  • set_proposed_producers() :更新BP列表

6.3 对上提供区块和交易入链进度相关信号

提供了一些进度信号,producer_plugin、mongodb_plugin、net_plugin等等会进行订阅,从而完成各自的功能。
相关信号有:

  • pre_accepted_block :调用push_block()(同步、刚启动时从数据库恢复),区块尚未add()到fork_db之前,先发送这个信号
  • accepted_block_header:调用push_block()或commit_block()(生产区块),区块被add()到fork_db后,发送该信号
  • accepted_block:调用commit_block(),区块被add()到fork_db后,发送该信号
  • irreversible_block:调用push_block()恢复数据时或on_irreversible()时,发送该信号
  • accepted_transaction:调用push_transaction()或push_scheduled_transaction()成功后,发送该信号
  • applied_transaction,同上

例如,mongodb_plugin收到accepted_block后会将数据写入mongodb。

6.4 对下操作相关数据接口,进行数据管理

数据结构包括如下:

  • pending :正在生产的区块,abort_block(),start_block()等均对它修改
  • unapplied_transactions:上一个区块未出块成功,则其交易在abort_block()时转存到这里,以便新的区块start_block()时重新push_transaction()
  • fork_db:区块生产完成后插入这里,可能存在分叉。当进行区块同步时,如果检测到分叉,则进行最长链的生成和选择。push_block()和commit_block()时调用add(),不可逆时或分叉时调用erase()
  • head : 指向fork_db的头块,用于生成下一个区块,以及进行快速比较等操作,fork_db变化则同时变化head
  • reversible_blocks:可逆的区块链。commit_block()时调用add(),on_ireeversible(),pop_block()时调用erase()
  • blog:不可逆的区块链,append only,on_irreversile()时调用append()
  • db : 不仅保存了区块,还保存了智能合约数据,账号数据等等其它数据。大部分数据修改的地方也会对其进行更新。

注意:

  • 区块广播时发送的是signed_block结构体,其继承signed_block_header-->block_header。
  • 各节点存储的数据还有更多信息,如DPOS+BFT相关的数据,存储在block_state-->block_header_state中。
  • signed_block中存储了所有交易的摘要:vector

7 eosio.system

主要实现了抵押RAM、申请BP、申请代理,投票、选举BP等功能。

转载自:https://my.oschina.net/u/4069047/blog/3005068