您正在查看: EOS-优秀转载 分类下的文章

同步区块--p2p通信--sync_request_message与signed_block

这篇笔记写同步区块过程中,发送的最后两种消息类型--sync_request_message与signed_block。上篇笔记写到远程节点将链的信息(不可逆区块数、最新区块数、同步方式等)作为notice_message方式发送给本地节点,本地节点接收到notice_message消息,向远程节点发送sync_request_message消息同步区块。

1、同步区块过程图示

本地节点从远程节点同步不可逆区块

2、sync_request_message消息接收处理

继续上篇笔记中的内容,远程节点接收到本地节点发送过来的sync_request_message消息之后,处理函数:

   void net_plugin_impl::handle_message( connection_ptr c, const sync_request_message &msg) {
      if( msg.end_block == 0) {   //结束区块数是否为0
         c->peer_requested.reset();
         c->flush_queues();
      } else {
         c->peer_requested = sync_state( msg.start_block,msg.end_block,msg.start_block-1);
         c->enqueue_sync_block();
      }
   }

本地节点发送的同步消息,默认为1-100的区块数,则处理函数进入enqueue_sync_block

   bool connection::enqueue_sync_block() {
      controller& cc = app().find_plugin<chain_plugin>()->chain();
      if (!peer_requested)
         return false;
      uint32_t num = ++peer_requested->last;
      bool trigger_send = num == peer_requested->start_block;
      if(peer_requested->last == peer_requested->end_block) {
         peer_requested.reset();
      }
      try {
         signed_block_ptr sb = cc.fetch_block_by_number(num);
         if(sb) {
            enqueue( *sb, trigger_send);
            return true;
         }
      } catch ( ... ) {
         wlog( "write loop exception" );
      }
      return false;
   }

num变量为peer_requested成员的last字段的值,表示当前需要同步的区块数。每次调用enqueue_sync_block函数,则该字段加一,即发送下一个区块给本地节点。然后调用fetch_block_by_number函数获取自己的第num个区块,并构造signed_block消息,然后放入到消息队列,这样就把本地节点请求的一个区块发送给了本地节点。

  boost::asio::async_write(*socket, bufs, [c](boost::system::error_code ec, std::size_t w) {
    try {
      ········//省略代码
        while (conn->out_queue.size() > 0) {
            conn->out_queue.pop_front();
        }
        conn->enqueue_sync_block();    ///同步下一个区块
        conn->do_queue_write();
      }
      ······ //省略代码
   }

在net_plugin插件处理消息队列的时候,会在异步发送消息的回调函数里,发送下一个区块给本地节点。

3、接收signed_block消息

本地节点接收远程节点发送过来的signed_block消息,消息内容为区块详细数据

   void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) {
      controller &cc = chain_plug->chain();
      block_id_type blk_id = msg.id();
      uint32_t blk_num = msg.block_num();
      fc_dlog(logger, "canceling wait on ${p}", ("p",c->peer_name()));
      c->cancel_wait();

      try {
         //查看本地是否存在该id的区块
         if( cc.fetch_block_by_id(blk_id)) {
            sync_master->recv_block(c, blk_id, blk_num);
            return;
         }
      } catch( ...) {
         // should this even be caught?
         elog("Caught an unknown exception trying to recall blockID");
      }

      dispatcher->recv_block(c, blk_id, blk_num);
      fc::microseconds age( fc::time_point::now() - msg.timestamp);
      peer_ilog(c, "received signed_block : #${n} block age in secs = ${age}",
              ("n",blk_num)("age",age.to_seconds()));

      go_away_reason reason = fatal_other;
      try {
        //写入区块数据,accept_block函数用到了boost库里面的信号插槽signal2库,这里没有分析清楚。
         signed_block_ptr sbp = std::make_shared<signed_block>(msg);
         chain_plug->accept_block(sbp); //, sync_master->is_active(c));
         reason = no_reason;
      } catch( const unlinkable_block_exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         reason = unlinkable;
      } catch( const block_validate_exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "block_validate_exception accept block #${n} syncing from ${p}",("n",blk_num)("p",c->peer_name()));
         reason = validation;
      } catch( const assert_exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "unable to accept block on assert exception ${n} from ${p}",("n",ex.to_string())("p",c->peer_name()));
      } catch( const fc::exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "accept_block threw a non-assert exception ${x} from ${p}",( "x",ex.to_string())("p",c->peer_name()));
         reason = no_reason;
      } catch( ...) {
         peer_elog(c, "bad signed_block : unknown exception");
         elog( "handle sync block caught something else from ${p}",("num",blk_num)("p",c->peer_name()));
      }

      update_block_num ubn(blk_num);
      if( reason == no_reason ) {
         for (const auto &recpt : msg.transactions) {
            auto id = (recpt.trx.which() == 0) ? recpt.trx.get<transaction_id_type>() : recpt.trx.get<packed_transaction>().id();
            auto ltx = local_txns.get<by_id>().find(id);
            if( ltx != local_txns.end()) {
               local_txns.modify( ltx, ubn );
            }
            auto ctx = c->trx_state.get<by_id>().find(id);
            if( ctx != c->trx_state.end()) {
               c->trx_state.modify( ctx, ubn );
            }
         }
         sync_master->recv_block(c, blk_id, blk_num);
      }
      else {
         sync_master->rejected_block(c, blk_num);
      }
   }

signed_block 消息内容
signed_block 区块报文内容

0040             b9 00 00 00 07 dc f9 5c 45 00 00 00 00 00   .F¹....Üù\E.....
0050   ea 30 55 00 00 00 00 00 01 40 51 47 47 7a b2 f5   ê0U......@QGGz²õ
0060   f5 1c da 42 7b 63 81 91 c6 6d 2c 59 aa 39 2d 5c   õ.ÚB{c..Æm,Yª9-\
0070   2c 98 07 6c b0 00 00 00 00 00 00 00 00 00 00 00   ,..l°...........
0080   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
0090   00 00 00 00 00 e0 24 4d b4 c0 2d 68 ae 64 de c1   .....à$M´À-h®dÞÁ
00a0   60 31 0e 24 7b b0 4e 5c b5 99 af b7 c1 47 10 fb   `1.${°N\µ.¯·ÁG.û
00b0   f3 f4 57 6c 0e 00 00 00 00 00 00 00 20 60 15 f0   óôWl........ `.ð
00c0   39 e2 fd d0 df b2 31 6f ea 28 67 90 c6 b1 55 4f   9âýÐß²1oê(g.ƱUO
00d0   28 5a 54 e7 d2 5d b3 ea 91 ef 2e 8c 11 0a 57 e2   (ZTçÒ]³ê.ï....Wâ
00e0   8e fb ff e0 92 c0 e0 2d 92 f2 88 5e 72 48 43 b4   .ûÿà.Àà-.ò.^rHC´
00f0   a5 5f 29 0e 20 9f 87 1f 41 bb 39 3c 84 00 00      ¥_). ...A»9<...:

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E5%85%AD%EF%BC%89%E5%90%8C%E6%AD%A5%E5%8C%BA%E5%9D%97--p2p%E9%80%9A%E4%BF%A1--sync_request_message%E4%B8%8Esigned_block.md

分析nodeos的流程

本篇笔记主要分析nodeos程序的流程

1、代码所在路径(yourpath换成你的路径)
yourpath/eos/programs/nodeos/main.cpp
2、首先main函数中的代码并不长,可以通过五个类来学习这段代码。

      app().set_version(eosio::nodeos::config::version);
      app().register_plugin<history_plugin>();

      auto root = fc::app_path();
      app().set_default_data_dir(root / "eosio/nodeos/data" );
      app().set_default_config_dir(root / "eosio/nodeos/config" );
      http_plugin::set_defaults({
         .address_config_prefix = "",
         .default_unix_socket_path = "",
         .default_http_port = 8888
      });
      if(!app().initialize<chain_plugin, http_plugin, net_plugin, producer_plugin>(argc, argv))
         return INITIALIZE_FAIL;
      initialize_logging();
      ilog("nodeos version ${ver}", ("ver", app().version_string()));
      ilog("eosio root is ${root}", ("root", root.string()));
      app().startup();
      app().exec();

上面的代码,均是通过调用app()的成员函数来实现的。我们很明显的看到main函数的生命周期和app()返回的对象是完全相同的。所以,第一个分析的就是app()返回的类对象。

1、application类(单例模式)

查看app()函数,发现其返回的是一个application类对象的一个引用,对象是通过application类的静态方法instance创建的。instance方法创建一个application类对象,并返回其引用。由此实现一个单例模式,每次调用app()仅创建一个对象。
application& app() { return application::instance(); }
application& application::instance() { static application _app; return _app; }
main函数中出现的set_versionset_default_data_dirset_default_config_dir均为简单的成员函数,这里略过不写。
主要分析一下其他几个函数:register_plugininitializestartupexec
register_plugin函数是一个模板函数,功能是对插件进行注册。注册即创建一个插件对象,并保存在application对象的plugins变量里面。注册插件的时候是需要注册其依赖插件的。比如p2p插件依赖于chain插件,那么在注册p2p插件时,也需要注册chain插件。其中获取请求插件,采用了一个宏实现了递归获取的方式,后续在plugins类中详细说明。

 template<typename Plugin>
    auto& register_plugin() {
    auto existing = find_plugin<Plugin>();   
    if(existing)
         return *existing;
    auto plug = new Plugin();
    plugins[plug->name()].reset(plug);
    plug->register_dependencies(); // 比较重要的地方,调用的是plugins类中的函数,后续分析。
    return *plug;
 }

initialize函数是一个变参模板函数,功能是对插件进行初始化(调用每个插件的initialize方法)。内部将变参参数初始化为一个vector向量,并调用initialize_impl函数来具体实现。

  //application.hpp
  template<typename... Plugin>
     bool                 initialize(int argc, char** argv) {
     return initialize_impl(argc, argv, {find_plugin<Plugin>()...});
  }
 //application.cpp  调用插件的initialize方法。
  for (auto plugin : autostart_plugins)
       if (plugin != nullptr && plugin->get_state() == abstract_plugin::registered)
           plugin->initialize(options);

startup函数很简单,功能用来启动初始化过的插件,内部调用每个插件的startup函数来启动。
for (auto plugin : initialized_plugins) plugin->startup();
exec函数使用了boost::asio::io_service io服务,在每个启动的插件线程里使用异步io的地方,均使用的是application对象的io服务,exec函数创建了异步IO异常终止的信号,并对其做了资源释放处理。

void application::exec() {
   std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));
   sigint_set->async_wait([sigint_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigint_set->cancel();
   });

   std::shared_ptr<boost::asio::signal_set> sigterm_set(new boost::asio::signal_set(*io_serv, SIGTERM));
   sigterm_set->async_wait([sigterm_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigterm_set->cancel();
   });

   std::shared_ptr<boost::asio::signal_set> sigpipe_set(new boost::asio::signal_set(*io_serv, SIGPIPE));
   sigpipe_set->async_wait([sigpipe_set,this](const boost::system::error_code& err, int num) {
     quit();
     sigpipe_set->cancel();
   });

   io_serv->run();

   shutdown(); /// perform synchronous shutdown
}

2、abstract_plugin类

通过对application类的几个函数的分析,发现初始化启动插件真正的实现,是通过插件自身的initialize方法和startup方法来实现的,下面主要分析其他四个类--插件类。这四个类可以用一个图来表示:
插件类示意图
其中abstract_plugin是虚基类,plugin是它的子类,http_plugin是plugin的子类,http_plugin_impl包含在http_plugin中。即前三个类是继承关系,后面一个类是包含关系。
回到上面遗留的问题--“插件是如何注册其依赖插件的”。
plug->register_dependencies();
父类指针指向的是子类对象,该方法在plugin类中实现。

  virtual void register_dependencies() {
       static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});
 }

plugin_requires函数并非Plugin类的成员函数,是其子类的成员函数,所以父类指针需要转换为子类指针,才可以调用。该函数由一个宏来实现,并实现了递归调用。(BOOST_PP_SEQ_FOR_EACH宏的作用,将第三个参数序列分别与第二个参数进行按照第一个参数的方式进行拼接,)
APPBASE_PLUGIN_REQUIRES((chain_plugin))

//宏代码:
#define APPBASE_PLUGIN_REQUIRES_VISIT( r, visitor, elem ) \
  visitor( appbase::app().register_plugin<elem>() ); 

#define APPBASE_PLUGIN_REQUIRES( PLUGINS )                               \
   template<typename Lambda>                                           \
   void plugin_requires( Lambda&& l ) {                                \
      BOOST_PP_SEQ_FOR_EACH( APPBASE_PLUGIN_REQUIRES_VISIT, l, PLUGINS ) \
   }
//-----------------------------------------展开结果-------------------------------------------
//宏展开:
template<typename Lambda>                                           
   void plugin_requires( Lambda&& l ) {                                
      BOOST_PP_SEQ_FOR_EACH( APPBASE_PLUGIN_REQUIRES_VISIT, l, (chain_plugin) ) 
   }
//继续展开
template<typename Lambda>                                           
   void plugin_requires( Lambda&& l ) {                                
      l( appbase::app().register_plugin<chain_plugin>() );
   }

所以plugin_requires函数为http_plugin类的成员函数,参数为一个lambda表达式。调用过程为static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});,传入的表达式为[&](auto& plug){}。
所以调用此表达式,参数为appbase::app().register_plugin()。使用宏的方式实现成员函数的递归调用,之前没有遇到过这种写法,很奇怪。
再来看abstract_plugin类,此类很简单,代码很少,实现了插件的初始化、启动、停止等几个虚方法,是一个虚基类。具体代码均有其派生类的多态实现,所以下面分析另外三个类。

     virtual void set_program_options( options_description& cli, options_description& cfg ) = 0;
     virtual void initialize(const variables_map& options) = 0;
     virtual void startup() = 0;
     virtual void shutdown() = 0;

3、plugin类

plugin类是eos各个插件的父类,保存了插件的状态(注册、启动等),实现了初始化、启动等接口,内部逻辑是通过子类来实现的。

  virtual void register_dependencies() {
            static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});
         }

         virtual void initialize(const variables_map& options) override {
            if(_state == registered) {
               _state = initialized;
               static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.initialize(options); });
               static_cast<Impl*>(this)->plugin_initialize(options);
               //ilog( "initializing plugin ${name}", ("name",name()) );
               app().plugin_initialized(*this);
            }
            assert(_state == initialized); /// if initial state was not registered, final state cannot be initiaized
         }

         virtual void startup() override {
            if(_state == initialized) {
               _state = started;
               static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.startup(); });
               static_cast<Impl*>(this)->plugin_startup();
               app().plugin_started(*this);
            }
            assert(_state == started); // if initial state was not initialized, final state cannot be started
         }

4、net_plugin类(同级插件:http_plugin、chain_plugin等)

net_plugin类是plugin类的子类,该类主要实现了p2p插件的插件配置、初始化、启动、停止、广播区块的方法,但其内部实现均是通过的net_plugin_impl类的方法来完成的,net_plugin类包含一个net_plugin_impl类的实例(每个插件都是类似的结构)。

   class net_plugin : public appbase::plugin<net_plugin>
   {
      public:
        net_plugin();
        virtual ~net_plugin();

        APPBASE_PLUGIN_REQUIRES((chain_plugin))
        virtual void set_program_options(options_description& cli, options_description& cfg) override;

        void plugin_initialize(const variables_map& options);
        void plugin_startup();
        void plugin_shutdown();

        void   broadcast_block(const chain::signed_block &sb);

        string                       connect( const string& endpoint );
        string                       disconnect( const string& endpoint );
        optional<connection_status>  status( const string& endpoint )const;
        vector<connection_status>    connections()const;

        size_t num_peers() const;
      private:
        std::unique_ptr<class net_plugin_impl> my; 
   };

plugin_initialize 初始化插件,其本质是实例化net_plugin_impl。my为net_plugin_impl类对象的指针。

  void net_plugin::plugin_initialize( const variables_map& options ) {
      ilog("Initialize net plugin");
      try {
         // 读取配置信息,初始化net_plugin_imul 对象的成员变量   
         peer_log_format = options.at( "peer-log-format" ).as<string>();

         my->network_version_match = options.at( "network-version-match" ).as<bool>();

         my->sync_master.reset( new sync_manager( options.at( "sync-fetch-span" ).as<uint32_t>()));
         my->dispatcher.reset( new dispatch_manager );

plugin_startup函数启动了一个p2p节点网络。包括1、设置监听循环,对其他节点发送过来的消息进行响应。2、根据配置文件中的seed节点信息,连接到其他节点,并发送消息请求,同步区块等消息(详细笔记写在下一篇)。通信用到的消息类型共分为如下几种:

  using net_message = static_variant<handshake_message,
                 chain_size_message,
                 go_away_message,
                 time_message,
                 notice_message,
                 request_message,
                 sync_request_message,
                 signed_block,
                 packed_transaction>;

5、net_plugin_impl类

该类涉及到的是核心业务的具体实现。也是最复杂的一个类,下一篇笔记重点写下net插件的学习过程。
net_plugin_impl类通信使用的是boost::asio异步通信的库。该类成员变量包括了当前连接的p2p节点对象、区块同步管理对象、链id、节点id、网络通信用的相关变量。

   class net_plugin_impl {
   public:
      unique_ptr<tcp::acceptor>        acceptor;
      tcp::endpoint                    listen_endpoint;
      string                           p2p_address;
      uint32_t                         max_client_count = 0;
      uint32_t                         max_nodes_per_host = 1;
      uint32_t                         num_clients = 0;

      vector<string>                   supplied_peers;
      vector<chain::public_key_type>   allowed_peers; ///< peer keys allowed to connect
      std::map<chain::public_key_type,
               chain::private_key_type> private_keys; ///< overlapping with producer keys, also authenticating non-producing nodes

      enum possible_connections : char {
         None = 0,
            Producers = 1 << 0,
            Specified = 1 << 1,
            Any = 1 << 2
            };
      possible_connections             allowed_connections{None};

      connection_ptr find_connection( string host )const;

      std::set< connection_ptr >       connections;               // 已连接的p2p seed节点 指针集合
      bool                             done = false;
      unique_ptr< sync_manager >       sync_master;               // 区块同步管理类指针
      unique_ptr< dispatch_manager >   dispatcher;
      .......
}

net_plugin插件的详细内容,下一篇笔记详细写。

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%BA%8C%EF%BC%89%E5%88%86%E6%9E%90nodeos%E7%9A%84%E6%B5%81%E7%A8%8B.md

EOS区块生产和区块同步

1 概述

本文所述基于EOSv1.2.3。
EOS区块生产和同步主要涉及共识算法DPOS和aBFT,其源码实现主要涉及chain_plugin、producer_plugin、net_plugin和controller4个模块以及eosio.system智能合约等。

2 共识算法

EOS的区块生产,遵循DPoS(Delegated Proof-of-Stake)机制。
简单来说,所有拥有EOS token的人都是EOS区块生产的参与者。
任何人都可以申请出块。
任何人都可以选择不直接完成出块工作,而是将自己所持token抵押给出块申请者(PoS),委托(Delegate)他们完成出块工作。
最终,按照token比例,选出前21名出块者(BP,Block Producer),由他们代理出块。

ps:上述部分,EOS选举主要在eosio.system智能合约中实现,设置生产者队列函数为update_elected_producers()。笔者工作EOS不用于公链而是联盟链,不需要选举,故直接调用eosio.bios中的setprods()函数设置。

21名BP依次轮流出块,不像比特币等,同一时刻所有BP是竞争关系。每个BP轮到自己出块时,连续出块12个,每个块耗时500ms。

ps:上述这一块逻辑在producer_plugin中实现。

至此,我们可以简单的说,EOS的共识机制叫DPoS(More than that)。

EOS中每个区块被生产出来后,需要所有BP的确认,按照BFT共识机制确认后,才会变成不可逆的状态,在此之前都是reversible block。因此,需要进行区块在网络中的同步。
数据一致性是分布式系统数据同步的重要话题,BFT(Byzantine fault tolerance)是其中的一种代表共识机制,或者称它为算法。

如上图所示,BFT共识机制主要有以下步骤:

  • 提案1个block;
  • 进入Pre-commitment阶段,所有BP确认该提案;
  • 进入Commitment阶段,所有BP收到2/3+1或更多Pre-commit后,发送Commit;
  • 当某个BP收到2/3+1或更多Commit后,该block即不可逆。
  • 该算法适用于恶意节点不超过1/3的场景,可以保证达到最终一致性。
至此,我们可以说:EOS的共识机制叫DPoS & BFT。

BFT算法对每个区块需要发送至少2条消息,EOS对此进行了优化。BM将其称为pipelined:管道式、流水线式。
即每次生产一个区块,由于本身就需要将该区块广播到p2p网络中供其他节点同步,因此将Pre-Commit和Commit数据与该区块数据一并广播出去。
如下图所示:

上半部分描述了区块number不可逆的过程,下半部分描述了每个区块由谁Pre-Commit,由谁Commit。
假设有ABCD四个BP,每个BP每次出1个区块。BFT要求2/3+1个节点确认,即2/3*4+1=3。
假设每轮都是按照A、B、C、D的顺序出块。

  • A 出块1,Pre-Commit不可逆块为0,Commit不可逆块为0
  • B 出块2,Pre-Commit不可逆块为0,Commit不可逆块为0
  • C 出块3,ABC3个节点Pre-Commit区块1,故Pre-Commit不可逆块为1,Commit不可逆块为0
  • D 出块4,BCD3个节点Pre-Commit区块2,故Pre-Commit不可逆块为2,Commit不可逆块为0
  • A 出块5,CDA3个节点Pre-Commit区块3,故Pre-Commit不可逆块为3,CDA3个节点Commit区块1,故Commit不可逆块为1
  • B 出块6,DAB3个节点Pre-Commit区块4,故Pre-Commit不可逆块为4,DAB3个节点Commit区块2,故Commit不可逆块为2
  • 以此类推

基于上述优化,p2p网络的带宽压力、区块哈希验证频次等大幅降低。
但带来的问题是,出块、不可逆糅合在了一起,而非并行的。
BM的解释是:虽然如此,相对其它平台(比特币,以太坊)的机制来说,一个区块从生产出来到变为不可逆,其时间不算太久,且实际上每两个不可逆块之间的间隙时间非常短。长远考虑,只有这样,才能实现更好的扩展性。

原文地址:DPOS BFT— Pipelined Byzantine Fault Tolerance

至此,我们应当说:EOS的共识机制叫 DPOS BFT— Pipelined Byzantine Fault Tolerance。

3 chain_plugin

chain_plugin插件主要功能为:

  • 检查启动参数,判断是否需要replay区块链
  • 初始化和拉起Controller模块
  • 提供相关信号、方法,主要用于给controller、net_plugin、producer_plugin、用户输入等架桥
  • 向用户提供链的其它set/get接口,诸如:get_account、get_transaction_id……

下图为chain_plugin核心代码逻辑

3.1 检查启动参数,判断是否需要replay()区块链

chain_plugin启动前,先调用plugin_initialize()函数,该函数检查启动参数。除了一些基本配置外,还检查replay相关参数:

  • export-reversible-blocks:会将原来的reversible_block进行导出备份,该参数单独优先检查
  • delete-all-blocks:删除所有旧的blocks,重头开始replay区块链,包括清空State DB和Block log
  • hard-replay-blockchain:清空State DB,如果有配置truncate-at-block,按照[first,truncate-at-block]区间,否则[first,end]区间重新加载Block log中的区块,即该区间的区块仍然生效不replay,其它区块全部replay。如果reversible db有数据,一并尝试恢复
  • replay-blockchain:清空State DB,如果有配置fix-reversible_blocks,则尝试恢复reversible DB
  • fix-reversible_blocks:尝试恢复reversible DB
  • import-reversible_blocks:旧的不要,用导入的替代

3.2 初始化和拉起Controller模块

当plugin_initialize()上述操作处理完后,根据最终配置参数,对Controller模块进行初始化(emplace()-->构造函数),然后在调用plugin_startup()时调用了Controller::startup,完成了Controller模块的启动

3.3 提供相关信号、方法,主要用于给controller、net_plugin、producer_plugin、用户输入等架桥

  • accepted_block:当net_plugin收到一个区块,或者用户手动push的区块(可能仅仅是一个接口,该场景应当不常见),chain_plugin通过该方法转发区块到producer_plugin中
  • accepted-transactions:同理。需注意,cleos/http的push transaction和push action等,均在此处入链。

3.4 向用户提供链的其它set/get接口

实现了http接口中的一部分接口。

4 producer_plugin

producer_plugin插件主要功能为:

  • 当本节点处于生产阶段时,与chain_plugin::controller交互,调用controller相关接口进行区块生产;
  • 当本节点处于同步阶段时,接收net_plugin插件收到的区块,调用controller相关接口进行区块同步。

producer_plugin核心函数有:

  • schedule_production_loop:递归调用,生产区块
  • start_block:初始化当前区块数据
  • produce_block:打包、签名、提交区块入链
  • on_incoming_block:接收net_plugin收到的区块,入本节点链
  • on_incoming_transaction_async:接收net_plugin收到的交易,入本节点链
  • on_block:本节点区块入链成功后,相关状态更新
  • on_irreversible_block:本节点区块不可逆后,相关状态更新

下图为producer_plugin生产、同步区块的核心代码逻辑:

4.1 生产区块

如上所示,producer_plugin启动后,调用schedule_production_loop()函数,该函数是插件的最核心的函数,是一个递归函数,负责无限循环出块。
EOS500ms出一个块,因此需要启动一个定时器_timer,逻辑如下:

  • 关闭之前的定时器:_timer.cancel()
  • 调用start_block()函数初始化新的区块信息。首先调用controller::abort_block()重置数据,然后调用controller::start_block(),根据上一个区块信息生成一个新的区块,最后将由于controller::abort_block()而未来得及入链的交易(unapplied_transactions)重新push_transaction()进新的区块。
  • 重启定时器_timer(),异步等待新区块截止时间,这期间内,EOS等待并接收用户的交易请求。
  • 如果用户有新交易,调用push_transaction()执行交易并保存到区块中。
  • 如果没有其它情况,定时器到期后,调用produce_block(),对该区块进行打包(finalize_block())、签名(sign_block()),然后提交(commit_block())到本节点区块链(fork_db)上,commit_block()会发送accepted_block信号给订阅者,这其中包括producer_plugin::on_block(),该函数进行相关数据更新。fork_db每次新增一个区块,就会检查是否有新的不可逆区块产生,如果有,发送irrerersible_block信号给订阅者,这其中包括producer_plugin::on_irreversible(),该函数进行相关数据更新。最后,再次递归调用schedule_production_loop()进行下一个区块生产。
  • 如果出现其它原因,如收到网络上发送过来的区块,且定时器未到期,则会转入区块同步逻辑,之前的所有执行会被重置,未来得及执行的交易会被备份到unapplied_transactions中。

4.2 同步区块

producer_plugin的主流程是schedule_production_loop(),其中定时器_timer会根据实际情况设置等待时间。如是轮到该节点生产区块,则每次等待的时间为:总时间(500ms)-已用时间。如果未轮到该节点生产区块,则计算下一次出块时间(more than 500ms),并启动定时器等待。
如果net_plugin插件或bnet_plugin插件收到网络上的区块,则需要同步。调用on_incoming_block()函数,该函数内部逻辑与生产区块逻辑类似,主要调用controller的abort_block(),start_block(),push_transaction(),finalize_block(),commit_block()。
注意,http和cleos提供了一些接口,其中包括push_transaction,push_block接口,其逻辑见上图,比较特殊,不做太多解释。
下图给出了区块同步的更详细的说明:

重点做以下解释:

  • fork_db:区块链,其中的区块尚未不可逆,当新块到达后,可能存在分叉。需要根据最长链原则,选出较长的一条链,较短的链被删除。
  • 每个新区块基于其上一个区块出块,根据该信息,可以对旧链fork_db的head block(HB)和新的区块(new HB)分别进行追溯,将旧链中的区块合并到新链上,然后删除旧链,保存新链。主要代码在fetch_branch_from()中实现,思想请参考:拉链法

5 net_plugin

net_plugin的主要功能如下:

  • 与其他节点建立连接;
  • 向网络中广播本节点区块;
  • 接收其他节点广播的区块;
  • 节点之间区块同步。

net_plugin的代码结构如下:

5.1 与其他节点建立连接

见上图:

  • net_plugin插件启动时,根据p2p-listen-endpoint成员变量(配置文件或命令行参数p2p-listen-endpoint参数(默认值0.0.0.0:9876)),调用tcp::acceptor的listen()启动监听,调用start_listen_loop()递归调用async_accept()异步等待网络连接请求(参考socket通信)。
  • 根据supplied_peers成员变量(配置文件或命令行参数p2p-peer-address(可多个)),逐一调用async_connect()进行连接请求。
  • 其他节点收到请求后回复,并调用start_session()建立会话。
  • 本节点收到回复得知成功后,亦调用start_session()建立会话。
  • start_session()包含两步:
  • 调用start_read_message()异步循环等待其他节点的消息;
  • 调用send_handshake()发送握手消息。
  • net_plugin插件中定义了9个消息类型,其中主要消息为:
  • handshake_message:握手消息
  • notice_message: 通知消息,主要在同步时使用,进行同步状态发送
  • sync_request_message:当本节点区块链的HB number小于对方节点LIB number时发送
  • request_message:当本节点区块链的HB number小于对方节点HB number时发送
  • signed_block_message:每个区块由该消息逐一发送

5.2 向网络中广播本节点区块

net_plugin插件在启动时,订阅了chain_plugin插件的accepted_block信号,该信号在区块被提交到本地待确认不可逆数据库(fork_db)中后发送。
收到该信号后,net_plugin插件向所有连接中的网络节点广播signed_block_message。

5.3 接收其他节点广播的区块

其他节点的net_plugin插件的start_read_message异步循环等待网络消息,收到其他节点signed_block_message后,会进行判断,如果本节点没有该区块且该区块合法,则保存到本地fork_db中,如果存在分叉,则按照最长链的原则,尝试进行合并,再启用该最长链。

5.4 节点之间区块同步

每个BP连续出块12个(12*0.5s=6s),每出一个块便立即广播,理想中各节点的区块链是实时同步的,然而由于网络原因,或者后加入的节点,其往往落后其他节点很多区块。因此涉及到大量区块的同步问题。
各节点每次建立连接时会发送 handshake_message ,该消息主要用于区块同步。每次握手进行一次区块同步状态判断,同步完成后会再次发送 handshake_message,循环进行判断。
同步状态有5中场景:

  • [State 0]双方HB id相同,id为数字摘要,如果相同说明HB完全一致,不需要同步,则Alice向Bob发送notice_message。
  • [State 1]如果Alice的区块链非常短,其HB number 竟然没有对方节点LIB number大,则Alice向Bob发送sync_request_message,附带参数(start,end)表示同步区间。其中start为Alice的LIB number,end为Bob的LIB number。
  • [State 2]与State 1相反,如果Alice发现Bob的HB number < Alice的LIB number,则发送notice_message让Bob主动来请求同步数据。
  • [State 3]如果Alice的HB number >Bob 的LIB number,但Alice的HB number < Bob的HB number,则也需要同步,Alice向Bob发送request_message,Bob收到消息后,从Bob的LIB开始,一直到Bob的HB,逐一发送区块。
  • [State 4]与State 3相反,则Alice发送notice_message让Bob主动来请求同步数据。
5.4.1 同步状态0

同步状态1见总图,由于较简单,不再赘述。

5.4.2 同步状态1


如上图所示:

  • Alice向Bob请求区块同步,总区间为[Alice LIB + 1, Bob LIB]。
  • 调用request_next_chunk()对总区间分组同步,默认大小为100个区块一组(配置文件或命令行参数sync-fetch-span)。
  • net_plugin::sync_manager子模块顺序选择一个网络节点,发送消息,并启动定时器_timer异步等待5秒。如果5秒后未收到对方的signed_block_message,则取消该请求(再次发送sync_request_message,但区间为[0,0]),并重新选择一个网络节点发送消息。
  • 如果5秒内收到对方节点的signed_block_message,取消_timer定时器,判断是否是自己想要的区块,保存后判断是否同步结束。
  • 如果尚未同步结束,重启_timer定时器等待;
  • 如果分组同步结束,再次调用request_next_chunk()请求下一个分组区块;
  • 如果总区间同步结束,向所有网络节点再次发送握手消息,继续进行同步状态判断。
  • 对方节点收到sync_request_message后,按请求区块区间,循环逐一发送区块。
5.4.3 同步状态2


如上图所示,Bob收到Alice的通知后,与Alice的同步状态1类型,调用相关函数进行同步。

5.4.4 同步状态3

如上图所示,Alice向Bob请求区块,Bob收到消息后,以区间[Bob LIB+1,BOB HB]循环逐一发送区块。
如果Bob的HB number 为0,则为异常,需要告知Alice。

5.4.5 同步状态4

如上图所示,该状态下,参考同步状态3。
另附消息发送函数enqueue()逻辑:

5.5 补充

net_plugin插件启动并建立连接后,会调用start_monitors(),一是监听网络连接状态,如果断开会重新尝试建立连接;二是监听交易时间,如果交易超时,则会将其移除入链队列。

6 controller

Controller模块位于/libraries/chain/下,是EOS区块入链的核心控制器,内容非常多,也非常重要。
Controller主要功能为:

  • 被chain_plugin初始化和启动
  • 对上提供区块和交易等相关接口
  • 对上提供区块和交易入链进度相关信号
  • 对下操作相关数据接口,进行数据管理

下图为Controller核心代码逻辑:

6.1 被chain_plugin初始化和启动

Controller模块由chain_plugin负责初始化和启动,chain_plugin根据启动参数启动Controller,从而对底层数据结构进行初始化操作。

6.2 对上提供区块和交易等相关接口

区块相关的接口有:

  • abort_block() :取消上一个正在生产的区块,其中的交易转到本次新区块中处理
  • start_block() :开始一个新区块生产,启动一个异步定时器等待交易被插入,定时器结束后开始打包等后续工作。此处还会进行BFT共识机制的处理。
  • finalize_block():本区块时间已到,打包区块
  • sign_block():对区块进行签名
  • commit_block():提交区块入fork_db,等待不可逆
  • push_block() : 主要用于初始化时或同步时插入区块,内部调用apply_block()函数以及上述几个函数,但没有异步定时器等代码。
  • pop_block():同步时fork_db发现分叉,需要对短链进行pop_block()
  • on_irreversible():区块不可逆时触发

交易相关的接口有:

  • push_transaction() : 插入交易
  • push_scheduled_transaction() : 插入延期的交易

其它接口:

  • set_proposed_producers() :更新BP列表

6.3 对上提供区块和交易入链进度相关信号

提供了一些进度信号,producer_plugin、mongodb_plugin、net_plugin等等会进行订阅,从而完成各自的功能。
相关信号有:

  • pre_accepted_block :调用push_block()(同步、刚启动时从数据库恢复),区块尚未add()到fork_db之前,先发送这个信号
  • accepted_block_header:调用push_block()或commit_block()(生产区块),区块被add()到fork_db后,发送该信号
  • accepted_block:调用commit_block(),区块被add()到fork_db后,发送该信号
  • irreversible_block:调用push_block()恢复数据时或on_irreversible()时,发送该信号
  • accepted_transaction:调用push_transaction()或push_scheduled_transaction()成功后,发送该信号
  • applied_transaction,同上

例如,mongodb_plugin收到accepted_block后会将数据写入mongodb。

6.4 对下操作相关数据接口,进行数据管理

数据结构包括如下:

  • pending :正在生产的区块,abort_block(),start_block()等均对它修改
  • unapplied_transactions:上一个区块未出块成功,则其交易在abort_block()时转存到这里,以便新的区块start_block()时重新push_transaction()
  • fork_db:区块生产完成后插入这里,可能存在分叉。当进行区块同步时,如果检测到分叉,则进行最长链的生成和选择。push_block()和commit_block()时调用add(),不可逆时或分叉时调用erase()
  • head : 指向fork_db的头块,用于生成下一个区块,以及进行快速比较等操作,fork_db变化则同时变化head
  • reversible_blocks:可逆的区块链。commit_block()时调用add(),on_ireeversible(),pop_block()时调用erase()
  • blog:不可逆的区块链,append only,on_irreversile()时调用append()
  • db : 不仅保存了区块,还保存了智能合约数据,账号数据等等其它数据。大部分数据修改的地方也会对其进行更新。

注意:

  • 区块广播时发送的是signed_block结构体,其继承signed_block_header-->block_header。
  • 各节点存储的数据还有更多信息,如DPOS+BFT相关的数据,存储在block_state-->block_header_state中。
  • signed_block中存储了所有交易的摘要:vector

7 eosio.system

主要实现了抵押RAM、申请BP、申请代理,投票、选举BP等功能。

转载自:https://my.oschina.net/u/4069047/blog/3005068

EOS节点数据备份

简介

最近在做eos数据落盘的项目,遇到了很多场景,都需要备份数据。所以,今天跟大家分享下备份节点数据的小工具。

前言

eos节点非正常退出,甚至节点升级,都会意外导致脏数据的产生,节点数据不得不重新同步。官方给的数据重新同步方法,是用下面的命令重启节点:

nodeos --hard-replay

不过,实际使用起来会发现replay数据执行的非常慢,普通的全节点也要两天时间才能完成重新同步。归根结底,还是eos节点数据的验证机制导致的慢。具体原因,大家可以看看这篇文章:痛苦的EOS数据同步,可能的EOS安全隐患

简而言之,eos之所有不能像以太坊那样做到快速同步,是因为eos没有将状态数据生成merkle root,存储上链。所以,eos重新同步数据时,节点不能信任其他节点,只能重新执行一遍交易,亲自验证交易状态,当然会极大影响同步速度。另外,节点的同步只能从创世块开始,不能按照时间分片,同步相当于重新计算一遍全网交易,速度可想而知。

数据同步

eos的节点数据是存储在data目录的,data目录里有blocks和state目录,分别存储区块信息和状态信息:

mac os:~/Library/Application\ Support/eosio/nodeos/data
Linux:~/.local/share/eosio/nodeos/data

因为eos数据是按文件形式存储的,常用的备份方法是将data目录整个拷贝一份,当数据坏了时,启用备用data目录,则节点会从备用data目录的高度开始,再同步之后的数据。这个方法简单,但是普通的全节点,不开history-plugin时数据30G+,开启history-plugin数据可达500G。简单的cp操作处理备份,显然需要数小时不等,还是太慢。

pitreos工具

pitreos是加拿大超级节点 eos canada 出品的节点数据备份工具,项目地址:pitreos。
它实现了大文件的快速备份和恢复。以下是30G大文件备份和恢复的实测效果,备份和恢复处理时间各约30s,极大的提升了效率。

  • 生成30G大文件
  • 备份30G大文件,时间约30s
  • 查看备份tag
  • 恢复备份,时间约30s

pitreos安装步骤

1. 需要安装golang、设置gopath
2. cd $GOPATH/src/github.com/eoscanada
3. git clone https://github.com/eoscanada/pitreos.git
4. cd $GOPATH/src/github.com/eoscanada/pitreos && go get -v ./... && go install -v

pitreos常用命令

备份:pitreos backup ./mydata -t <备份tag>。备份文件默认存在:$HOME/.pitreos/backups
恢复:pitreos -c restore <备份tag> ./mydata。
查询tag:pitreos list

转载自:https://www.jianshu.com/p/16f193b49a39

一些三方推荐插件

队列插件:

将区块链数据推入队列

kafka_plugin - 卡夫卡
eos_zmq_plugin - ZeroMQ
eos-rabbitmq-plugin - RabbitMQ

数据库插件:

将区块链数据同步到数据库中

eosio_sql_plugin - 基于SOCI的SQL数据库
eos_sql_db_plugin - MySQL数据库
elasticsearch_plugin - ElasticSearch

推送插件:

通过其他协议将区块链数据通知给消费者:

eosio-watcher-plugin - 针对链操作的HTTP POST到端点
eosio_block_subscription_plugin - 用于链操作的推送通知的TCP服务器

调试插件

eosio_all_code_dump_plugin - 将所有合同代码转储到目录

Block Producer插件:

eos-producer-heartbeat-plugin - BP Heartbeat
blacklist_plugin - 与BP黑名单合同整合

转载自:https://github.com/firesWu/awesome-eosio-plugins