您正在查看: Surou 发布的文章

EOS源码分析之六共识

eos源码分析之六共识

一、EOS使用的共识

EOS使用的是与传统的共识方法不同的DPOS共识机制,而且在最新的版本中已经更改为了BFT-DPOS机制,在网上看到BM说他又找到了一种更新的共识机制,可以解决被超级节点控制的问题,不知道最终会是什么样子,在比特币和以太坊都使用POW的共识的前提下,EOS使用DPOS机制,可以说是解决高并发的一个比较好的方法。但是,DPOS机制很容易由于节点太少被攻击,事实上也是如此。那么什么是DPOS呢?EOS是怎么使用其进行块之间的共识的呢?


提到dpos,就不得不提到pos,PoS全称Proof of Stake,意为权益证明。说得直白一些就是谁存款多,存款时间长,谁就有权出块(记帐)。这个解决了POW一个痛点,即它不用挖矿,所以也不用耗费老多的电能。但是这个算法有个致命问题,资本决定了一切,所以很容易被有钱人垄断。


DPOS比POS多了一个D,它的意义是授权,委托。二者的区别是,DPOS需要POS的持有者来通过选举代表,由代表实现出块。而在EOS中则有21个出块者(BP,BlockProducer),或者叫超级节点。还有101个备份节点。当21个BP的15个确认交易后,交易即不可逆转。

二、共识的过程



1、初始化的共识


EOS初始启动是外在选举的21个超级节点,所以不涉及代码部分。但是一旦启动后会开始新的节点选举,选举成功后,将进行BFT-DPOS共识。


2、选举


主要的代码在contracts/social和eosio.system/voting.cpp中。在cleos的main.cpp中会发现几个数据结构体和相关的应用:


 // Producer registration / unregistration subcommands, attached to the `system` command.
 auto registerProducer = register_producer_subcommand(system);
 auto unregisterProducer = unregister_producer_subcommand(system);

 // `voteproducer` is a parent command: one of the subcommands attached below must be given.
 auto voteProducer = system->add_subcommand("voteproducer", localized("Vote for a producer"));
 voteProducer->require_subcommand();
 auto voteProxy = vote_producer_proxy_subcommand(voteProducer);
 auto voteProducers = vote_producers_subcommand(voteProducer);
 auto approveProducer = approve_producer_subcommand(voteProducer);
 auto unapproveProducer = unapprove_producer_subcommand(voteProducer);

 // Producer listing subcommand.
 auto listProducers = list_producers_subcommand(system);

 // Bandwidth delegation / undelegation / listing subcommands.
 auto delegateBandWidth = delegate_bandwidth_subcommand(system);
 auto undelegateBandWidth = undelegate_bandwidth_subcommand(system);
 auto listBandWidth = list_bw_subcommand(system);



这些代码会驱动程序在启动后进行相应的动作。选举在EOS中其实也分成两类,即每人独自发起选举,也可以通过代理人代替自己选举,但结果就是本人就无法再投票了。相应的代码如下:

 /**
 *  @pre producers must be sorted from lowest to highest and must be registered and active
 *  @pre if proxy is set then no producers can be voted for
 *  @pre if proxy is set then proxy account must exist and be registered as a proxy
 *  @pre every listed producer or proxy must have been previously registered
 *  @pre voter must authorize this action
 *  @pre voter must have previously staked some EOS for voting
 *  @pre voter->staked must be up to date
 *
 *  @post every producer previously voted for will have vote reduced by previous vote weight
 *  @post every producer newly voted for will have vote increased by new vote amount
 *  @post prior proxy will proxied_vote_weight decremented by previous vote weight
 *  @post new proxy will proxied_vote_weight incremented by new vote weight
 *
 *  If voting for a proxy, the producer votes will not change until the proxy updates their own vote.
 */
 // The contract comment above already describes the voting rules in detail.
 void system_contract::voteproducer( const account_name voter_name, const account_name proxy, const std::vector<account_name>& producers ) {
    require_auth( voter_name );// the voter must have authorized this action
    update_votes( voter_name, proxy, producers, true );
 }
 void system_contract::update_votes( const account_name voter_name, const account_name proxy, const std::vector<account_name>& producers, bool voting ) {
   //validate input
   if ( proxy ) {//判断是否为代理
      eosio_assert( producers.size() == 0, "cannot vote for producers and proxy at same time" );
      eosio_assert( voter_name != proxy, "cannot proxy to self" );
      require_recipient( proxy );//添加代理帐户
   } else {
      eosio_assert( producers.size() <= 30, "attempt to vote for too many producers" );
      for( size_t i = 1; i < producers.size(); ++i ) { //验证英文注释中的排序
         eosio_assert( producers[i-1] < producers[i], "producer votes must be unique and sorted" );
      }
   }

   //验证资格
   auto voter = \_voters.find(voter_name);
   eosio_assert( voter \!= \_voters.end(), "user must stake before they can vote" ); /// staking creates voter object
   eosio_assert( !proxy || !voter->is_proxy, "account registered as a proxy is not allowed to use a proxy" );

   /*
    * The first time someone votes we calculate and set last_vote_weight, since they cannot unstake until
    * after total_activated_stake hits threshold, we can use last_vote_weight to determine that this is
    * their first vote and should consider their stake activated.
    \*/
  //计算权重,用来控制其抵押股权状态,并确定其是否为第一次投票
   if( voter->last_vote_weight <= 0.0 ) {
    \_gstate.total_activated_stake += voter->staked;
      if( \_gstate.total_activated_stake >= min_activated_stake ) {
         \_gstate.thresh_activated_stake_time = current_time();
      }
   }

   //计算权重
   auto new_vote_weight = stake2vote( voter->staked );
   if( voter->is_proxy ) {//是否代理
      new_vote_weight += voter->proxied_vote_weight;
   }

  //处理投票
   boost::container::flat_map<account_name, pair<double, bool /*new*/> > producer_deltas;
   if ( voter->last_vote_weight > 0 ) {
      if( voter->proxy ) {
         auto old_proxy = \_voters.find( voter->proxy );
         eosio_assert( old_proxy != \_voters.end(), "old proxy not found" ); //data corruption
         \_voters.modify( old_proxy, 0, [&]( auto& vp ) {//投票后减去相应权重,对应英文注释
               vp.proxied_vote_weight -= voter->last_vote_weight;
            });
         propagate_weight_change( *old_proxy ); //继续更新相关权重
      } else {
        //非代理直接操作,一票三十投
         for( const auto& p : voter->producers ) {
            auto& d = producer_deltas[p];
            d.first -= voter->last_vote_weight;
            d.second = false;
         }
      }
   }

   //处理得票
   if( proxy ) {//处理代理
      auto new_proxy = \_voters.find( proxy );
      eosio_assert( new_proxy != \_voters.end(), "invalid proxy specified" ); //if ( !voting ) { data corruption } else { wrong vote }
      eosio_assert( !voting || new_proxy->is_proxy, "proxy not found" );
      if ( new_vote_weight >= 0 ) {
         \_voters.modify( new_proxy, 0, [&]( auto& vp ) {
               vp.proxied_vote_weight += new_vote_weight;
            });
         propagate_weight_change( *new_proxy );
      }
   } else {
      if( new_vote_weight >= 0 ) {
         for( const auto& p : producers ) {
            auto& d = producer_deltas[p];
            d.first += new_vote_weight;
            d.second = true;
         }
      }
   }

  //  投票资格验证
   for( const auto& pd : producer_deltas ) {
      auto pitr = \_producers.find( pd.first );
      if( pitr != \_producers.end() ) {
         eosio_assert( !voting || pitr->active() || !pd.second.second /* not from new set */, "producer is not currently registered" );
         \_producers.modify( pitr, 0, [&]( auto& p ) {
            p.total_votes += pd.second.first;
            if ( p.total_votes < 0 ) { // floating point arithmetics can give small negative numbers
               p.total_votes = 0;
            }
            \_gstate.total_producer_vote_weight += pd.second.first;
            //eosio_assert( p.total_votes >= 0, "something bad happened" );
         });
      } else {
         eosio_assert( !pd.second.second /* not from new set */, "producer is not registered" ); //data corruption
      }
   }

  //更新选举状态
   \_voters.modify( voter, 0, [&]( auto& av ) {
      av.last_vote_weight = new_vote_weight;
      av.producers = producers;
      av.proxy     = proxy;
   });
}



在前面的投票过程中发现,其实要想选举和成为出块者,都需要先行去注册,在最初的Main函数里也提到相应的子命令,那么看一下对应的代码:

 /**
  *  This method will create a producer_config and producer_info object for 'producer'
  *
  *  @pre producer is not already registered
  *  @pre producer to register is an account
  *  @pre authority of producer to register
  *
  *  @param producer - account registering as a producer
  *  @param producer_key - producer's key; must not be the default-constructed key
  *  @param url - producer URL; must be shorter than 512 characters
  *  @param location - location code stored on the producer row
  */
 void system_contract::regproducer( const account_name producer, const eosio::public_key& producer_key, const std::string& url, uint16_t location ) {
    eosio_assert( url.size() < 512, "url too long" );
    eosio_assert( producer_key != eosio::public_key(), "public key should not be the default value" );
    require_auth( producer );

    // look for an existing registration
    auto prod = _producers.find( producer );

    if ( prod != _producers.end() ) { // already registered
       // the row is only rewritten (and re-activated) when the supplied key differs from the stored one
       if( producer_key != prod->producer_key ) {
           _producers.modify( prod, producer, [&]( producer_info& info ){
                info.producer_key = producer_key;
                info.is_active    = true;
                info.url          = url;
                info.location     = location;
           });
       }
    } else { // first registration: create the row with zero votes
       _producers.emplace( producer, [&]( producer_info& info ){
             info.owner         = producer;
             info.total_votes   = 0;
             info.producer_key  = producer_key;
             info.is_active     = true;
             info.url           = url;
             info.location      = location;
       });
    }
 }
// Deactivates the caller's producer registration. The row is looked up (asserting
// "producer not found" if absent) and modified via deactivate(); it is not erased.
 void system_contract::unregprod( const account_name producer ) {
    require_auth( producer );

    const auto& prod = _producers.get( producer, "producer not found" );

    _producers.modify( prod, 0, [&]( producer_info& info ){
          info.deactivate();
    });
 }
// Rebuilds the proposed producer schedule from the `prototalvote` index of the producers table.
// Records `block_time` as the time of the last schedule update. Does nothing further when fewer
// producers qualify than the size of the last proposed schedule.
 void system_contract::update_elected_producers( block_timestamp block_time ) {
    _gstate.last_producer_schedule_update = block_time;

    auto idx = _producers.get_index<N(prototalvote)>();

    std::vector< std::pair<eosio::producer_key,uint16_t> > top_producers;
    top_producers.reserve(21); // a voter may select up to 30 producers, but at most 21 are scheduled here

    // walk the index until 21 are collected or the first producer without votes / inactive is reached
    for ( auto it = idx.cbegin(); it != idx.cend() && top_producers.size() < 21 && 0 < it->total_votes && it->active(); ++it ) {
       top_producers.emplace_back( std::pair<eosio::producer_key,uint16_t>({{it->owner, it->producer_key}, it->location}) );
    }

    // never propose a schedule smaller than the previous one
    if ( top_producers.size() < _gstate.last_producer_schedule_size ) {
       return;
    }

    /// sort by producer name
    std::sort( top_producers.begin(), top_producers.end() );

    std::vector<eosio::producer_key> producers;

    producers.reserve(top_producers.size());
    for( const auto& item : top_producers )
       producers.push_back(item.first);

    bytes packed_schedule = pack(producers);

    // remember the schedule size only if the proposal was accepted (non-negative return value)
    if( set_proposed_producers( packed_schedule.data(),  packed_schedule.size() ) >= 0 ) {
       _gstate.last_producer_schedule_size = static_cast<decltype(_gstate.last_producer_schedule_size)>( top_producers.size() );
    }
 }
 /**
 *  An account marked as a proxy can vote with the weight of other accounts which
 *  have selected it as a proxy. Other accounts must refresh their voteproducer to
 *  update the proxy's weight.
 *
 *  @param proxy - account whose proxy flag is being changed; must authorize the action
 *  @param isproxy - true if proxy wishes to vote on behalf of others, false otherwise
 *  @pre proxy must have something staked (existing row in voters table)
 *  @pre new state must be different than current state
 */
 // Registers (or unregisters) an account as a voting proxy.
void system_contract::regproxy( const account_name proxy, bool isproxy ) {
   require_auth( proxy );

   auto pitr = _voters.find(proxy);
   if ( pitr != _voters.end() ) {
      // existing voter: the flag must actually change, and an account that delegates to a proxy cannot become one
      eosio_assert( isproxy != pitr->is_proxy, "action has no effect" );
      eosio_assert( !isproxy || !pitr->proxy, "account that uses a proxy is not allowed to become a proxy" );
      _voters.modify( pitr, 0, [&]( auto& p ) {
            p.is_proxy = isproxy;
         });
      propagate_weight_change( *pitr );
   } else {
      // no voter row yet: create one carrying only the owner and the proxy flag
      _voters.emplace( proxy, [&]( auto& p ) {
            p.owner  = proxy;
            p.is_proxy = isproxy;
         });
   }
}



分析投票及相关方法后,开始处理投票的交易动作:

 /**
  * When a user posts we create a record that tracks the total votes and the time it
  * was created. A user can submit this action multiple times, but subsequent calls do
  * nothing.
  *
  * This method only does something when called in the context of the author; it asserts
  * if it is executed in any other context.
  */
 void apply_social_post() {
    const auto& post   = current_action<post_action>();
    require_auth( post.author );

    eosio_assert( current_context() == post.author, "cannot call from any other context" );

    // Scratch record for the lookup. It has to be an object: a reference declared
    // without an initializer is ill-formed.
    static post_record existing;
    if( !Db::get( post.postid, existing ) )
       Db::store( post.postid, post_record( now() ) ); // first submission: store a fresh record stamped with now()
 }

 /**
  * This action is called when a user casts a vote, it requires that this code is executed
  * in the context of both the voter and the author. When executed in the author's context it
  * updates the vote total.  When executed in the voter's context it regenerates the voter's
  * vote power, checks that enough is available and deducts the power spent by this vote.
  */
 void apply_social_vote() {
    const auto& vote  = current_action<vote_action>();
    require_recipient( vote.voter, vote.author );
    disable_context_code( vote.author ); /// prevent the author's code from rejecting the potentially negative vote

    auto context = current_context();

    if( context == vote.author ) {
       // author side: add the vote to the post, as long as the post is less than 7 days old
       static post_record post;
       eosio_assert( Db::get( vote.postid, post ) > 0, "unable to find post" );
       eosio_assert( now() - post.created < days(7), "cannot vote after 7 days" );
       post.votes += vote.vote_power;
       Db::store( vote.postid, post );
    }
    else if( context == vote.voter ) {
       // voter side: only the voter's own account record is touched here; the post record
       // is updated in the author's context above
       static account vote_account;
       Db::get( "account", vote_account );
       auto abs_vote = abs(vote.vote_power);
       // vote power regenerates linearly up to social_power over 7 days since the last vote
       vote_account.vote_power = min( vote_account.social_power,
                                      vote_account.vote_power + (vote_account.social_power * (now()-vote_account.last_vote)) / days(7));
       eosio_assert( abs_vote <= vote_account.vote_power, "insufficient vote power" );
       vote_account.vote_power -= abs_vote;
       vote_account.last_vote  = now();
       Db::store( "account", vote_account );
    } else {
       eosio_assert( false, "invalid context for execution of this vote" );
    }
 }



3、共识


前面的选举过程其实就是DPOS的过程,只不过,没有出块,体现不出来它的价值,在EOS的最新版本中采用了BFT-DPOS,所以看下面的数据结构:


 // Excerpt of block_header_state showing only its irreversible-block-number fields;
 // the remaining members are elided.
 struct block_header_state {
......
    uint32_t                          dpos_proposed_irreversible_blocknum = 0;
    uint32_t                          dpos_irreversible_blocknum = 0;
    uint32_t                          bft_irreversible_blocknum = 0;  // BFT counterpart of the DPOS fields above
......
  };



这个变量bft_irreversible_blocknum是在push_confirmation中被赋值。connection::blk_send中广播。

三、出块



出块的代码主要在producer_plugin中:

 // Excerpt: decides whether this node may produce on top of the current head and starts the
 // pending block through the controller. Elided parts are marked with dots.
 producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
   ......
   // (the preliminary checks of the production conditions are omitted here)
   .......
   if (_pending_block_mode == pending_block_mode::producing) {
   // determine if our watermark excludes us from producing at this point
   if (currrent_watermark_itr != _producer_watermarks.end()) {
      if (currrent_watermark_itr->second >= hbs->block_num + 1) {
         elog("Not producing block because \"${producer}\" signed a BFT confirmation OR block at a higher block number (${watermark}) than the current fork's head (${head_block_num})",
             ("producer", scheduled_producer.producer_name)
             ("watermark", currrent_watermark_itr->second)
             ("head_block_num", hbs->block_num));
         _pending_block_mode = pending_block_mode::speculating;
      }
   }
}

try {
   uint16_t blocks_to_confirm = 0;

   if (_pending_block_mode == pending_block_mode::producing) {
      // determine how many blocks this producer can confirm
      // 1) if it is not a producer from this node, assume no confirmations (we will discard this block anyway)
      // 2) if it is a producer on this node that has never produced, the conservative approach is to assume no
      //    confirmations to make sure we don't double sign after a crash TODO: make these watermarks durable?
      // 3) if it is a producer on this node where this node knows the last block it produced, safely set it -UNLESS-
      // 4) the producer on this node's last watermark is higher (meaning on a different fork)
      if (currrent_watermark_itr != _producer_watermarks.end()) {
         auto watermark = currrent_watermark_itr->second;
         if (watermark < hbs->block_num) {
            // Clamp in 32 bits first and narrow afterwards; narrowing before std::min would
            // wrap large distances instead of saturating at the uint16_t maximum.
            blocks_to_confirm = static_cast<uint16_t>(std::min<uint32_t>(std::numeric_limits<uint16_t>::max(), static_cast<uint32_t>(hbs->block_num - watermark)));
         }
      }
   }

   chain.abort_block();
   chain.start_block(block_time, blocks_to_confirm);// hands over to the controller, which actually starts the block
} FC_LOG_AND_DROP();
......
}       
 //时间调度不断循环出块
 void producer_plugin_impl::schedule_production_loop() {
    chain::controller& chain = app().get_plugin<chain_plugin>().chain();
    \_timer.cancel();
    std::weak_ptr<producer_plugin_impl> weak_this = shared_from_this();

    auto result = start_block();//出块

    if (result == start_block_result::failed) {
       elog("Failed to start a pending block, will try again later");
       \_timer.expires_from_now( boost::posix_time::microseconds( config::block_interval_us  / 10 ));

       // we failed to start a block, so try again later?
       \_timer.async_wait([weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
          auto self = weak_this.lock();
          if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
             self->schedule_production_loop();
          }
       });
    } else if (\_pending_block_mode == pending_block_mode::producing) {
      \_timer.async_wait([&chain,weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
        auto self = weak_this.lock();
        if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
           auto res = self->maybe_produce_block();//完成出块
           fc_dlog(\_log, "Producing Block #${num} returned: ${res}", ("num", chain.pending_block_state()->block_num)("res", res) );
        }
     });
......
} else if (\_pending_block_mode == pending_block_mode::speculating && !\_producers.empty() && !production_disabled_by_policy()){
       // if we have any producers then we should at least set a timer for our next available slot
       optional<fc::time_point> wake_up_time;
       for (const auto&p: \_producers) {
          auto next_producer_block_time = calculate_next_block_time(p);
          if (next_producer_block_time) {
             auto producer_wake_up_time = \*next_producer_block_time - fc::microseconds(config::block_interval_us);
             if (wake_up_time) {
                // wake up with a full block interval to the deadline
                wake_up_time = std::min<fc::time_point>(\*wake_up_time, producer_wake_up_time);
             } else {
                wake_up_time = producer_wake_up_time;
             }
          }
       }

       if (wake_up_time) {
.......
       } else {
          ......
       }
    } else {
       fc_dlog(\_log, "Speculative Block Created");
    }
 }

 //在操作中断时启动异步出块
 bool producer_plugin_impl::maybe_produce_block() {
    auto reschedule = fc::make_scoped_exit([this]{
       //退出本范围重新启动正常出块
       schedule_production_loop();
    });

    try {
       produce_block();//出块
       return true;
    } FC_LOG_AND_DROP();

    //处理异常时的出块
    fc_dlog(\_log, "Aborting block due to produce_block error");
    chain::controller& chain = app().get_plugin<chain_plugin>().chain();
    chain.abort_block();
    return false;
 }
 // Excerpt: finalizes, signs and commits the pending block through the controller.
 // The parts before and after are elided.
 void producer_plugin_impl::produce_block() {
......

   //idump( (fc::time_point::now() - chain.pending_block_time()) );
   chain.finalize_block();// finalize the block; signing and committing follow
   chain.sign_block( [&]( const digest_type& d ) {
      auto debug_logger = maybe_make_debug_time_logger();
      return signature_provider_itr->second(d);
   } );
   chain.commit_block();
......
 }



真正的出块是在controller.hpp和controller.cpp中,需要注意的是按照EOS一贯的风格,真正的代码在controller_impl类中:

 // Opens a new pending block on top of `head`.
 //  when                - timestamp of the new block
 //  confirm_block_count - number of previous blocks this block confirms
 //  s                   - block status stored on the pending state
 // If anything throws before the end, the scoped guard resets `pending`.
 void start_block( block_timestamp_type when, uint16_t confirm_block_count, controller::block_status s ) {
    FC_ASSERT( !pending );

    FC_ASSERT( db.revision() == head->block_num, "",
              ("db.revision()", db.revision())("controller_head_block", head->block_num)("fork_db_head_block", fork_db.head()->block_num) );

    auto guard_pending = fc::make_scoped_exit([this](){
       pending.reset();
    });
    // create the pending state (an undo session); the block under construction lives in it
    pending = db.start_undo_session(true);

    pending->_block_status = s;

    pending->_pending_block_state = std::make_shared<block_state>( *head, when ); // promotes pending schedule (if any) to active
    pending->_pending_block_state->in_current_chain = true;

    pending->_pending_block_state->set_confirmed(confirm_block_count);

    auto was_pending_promoted = pending->_pending_block_state->maybe_promote_pending();


    // decide whether the proposed producer schedule can become the pending schedule
    const auto& gpo = db.get<global_property_object>();
    if( gpo.proposed_schedule_block_num.valid() && // if there is a proposed schedule that was proposed in a block ...
        ( *gpo.proposed_schedule_block_num <= pending->_pending_block_state->dpos_irreversible_blocknum ) && // ... that has now become irreversible ...
        pending->_pending_block_state->pending_schedule.producers.size() == 0 && // ... and there is room for a new pending schedule ...
        !was_pending_promoted // ... and not just because it was promoted to active at the start of this block, then:
      )
    {
       // Promote proposed schedule to pending schedule.
       if( !replaying ) {
          ilog( "promoting proposed schedule (set in block ${proposed_num}) to pending; current block: ${n} lib: ${lib} schedule: ${schedule} ",
                ("proposed_num", *gpo.proposed_schedule_block_num)("n", pending->_pending_block_state->block_num)
                ("lib", pending->_pending_block_state->dpos_irreversible_blocknum)
                ("schedule", static_cast<producer_schedule_type>(gpo.proposed_schedule) ) );
       }
       pending->_pending_block_state->set_new_producers( gpo.proposed_schedule );
       db.modify( gpo, [&]( auto& gp ) {
          gp.proposed_schedule_block_num = optional<block_num_type>();
          gp.proposed_schedule.clear();
       });
    }

    try {
       // push the on-block transaction as the first transaction of the new block
       auto onbtrx = std::make_shared<transaction_metadata>( get_on_block_transaction() );
       push_transaction( onbtrx, fc::time_point::maximum(), true, self.get_global_properties().configuration.min_transaction_cpu_usage );
    } catch( const boost::interprocess::bad_alloc& e  ) {
       elog( "on block transaction failed due to a bad allocation" );
       throw;
    } catch( const fc::exception& e ) {
       wlog( "on block transaction failed, but shouldn't impact block generation, system contract needs update" );
       edump((e.to_detail_string()));
    } catch( ... ) {
       wlog( "on block transaction failed, but shouldn't impact block generation, system contract needs update" );
    }

    clear_expired_input_transactions();// drop expired input transactions
    update_producers_authority();// refresh the producers' authority
    guard_pending.cancel();// success: disarm the guard so `pending` is kept (this is not a lock)
 }



finalize_block 、sign_block、 commit_block 、abort_block等与签名和提交部分的代码都在这个模块中,就不再赘述,看代码就可以了。

转载自:https://github.com/XChainLab/documentation/edit/master/eos/eos%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B%E5%85%AD%E5%85%B1%E8%AF%86.md

EOS源码分析之五虚拟机

eos源码分析之五虚拟机

因为6月2日,blockone团队发布了上线的源码,所以从这里开始基于最新的1.01版本来分析。

一、虚拟机的模块

虚拟机的代码主要分散在了以下几个目录, 主要在智能合约目录contracts,一些辅助的ABI的源码,区块链目录library/chain,是一些编译的接口,library/wasm-jit目录下,是主要的文件部分,然后在externals/src下也有相当一部分的二进制编译代码。其它一些目录下也有相关的一些文件,但比较分散代码也很少。重点分析编译过程。


虚拟机的模块分成两部分,也就是编译部分和执行部分。智能合约在编译过程中会产生两个文件,一个是.wast,一个是.abi文件。

二、编译过程

1、wast文件的生成



eosiocpp是编译智能合约的命令,在tools目录下,eosiocpp.in中:

# Compiles the given C++ contract sources into a .wast file ($outname) and a matching .wasm file.
#   $@       - contract source files
#   $outname - output .wast path (set by the caller)
#   VERBOSE  - when "1", every command is echoed via `set -x`
function build_contract {
    set -e
    workdir=`mktemp -d`

    if [[ ${VERBOSE} == "1" ]]; then
       PRINT_CMDS="set -x"
    fi

    ($PRINT_CMDS; mkdir $workdir/built)

    # 1) compile every source file to LLVM bitcode
    for file in $@; do
        name=`basename $file`
        filePath=`dirname $file`

        ($PRINT_CMDS; @WASM_CLANG@ -emit-llvm -O3 --std=c++14 --target=wasm32 -nostdinc \
                                   -nostdlib -nostdlibinc -ffreestanding -nostdlib -fno-threadsafe-statics -fno-rtti \
                                   -fno-exceptions -I ${EOSIO_INSTALL_DIR}/include \
                                   -I${EOSIO_INSTALL_DIR}/include/libc++/upstream/include \
                                   -I${EOSIO_INSTALL_DIR}/include/musl/upstream/include \
                                   -I${BOOST_INCLUDE_DIR} \
                                   -I $filePath \
                                   -c $file -o $workdir/built/$name
        )

    done

    # 2) link the bitcode with the contract SDK libraries
    ($PRINT_CMDS; @WASM_LLVM_LINK@ -only-needed -o $workdir/linked.bc $workdir/built/* \
                                   ${EOSIO_INSTALL_DIR}/usr/share/eosio/contractsdk/lib/eosiolib.bc \
                                   ${EOSIO_INSTALL_DIR}/usr/share/eosio/contractsdk/lib/libc++.bc \
                                   ${EOSIO_INSTALL_DIR}/usr/share/eosio/contractsdk/lib/libc.bc


    )
    # 3) bitcode -> assembly -> .wast -> .wasm
    ($PRINT_CMDS; @WASM_LLC@ -thread-model=single --asm-verbose=false -o $workdir/assembly.s $workdir/linked.bc)
    ($PRINT_CMDS; ${EOSIO_INSTALL_DIR}/bin/eosio-s2wasm -o $outname -s 16384 $workdir/assembly.s)
    # ${outname%.*} strips the extension, so foo.wast yields foo.wasm
    ($PRINT_CMDS; ${EOSIO_INSTALL_DIR}/bin/eosio-wast2wasm $outname ${outname%.*}.wasm -n)

    ($PRINT_CMDS; rm -rf $workdir)
    set +e
}



首先调用了 @WASM_CLANG@ -emit-llvm -O3的编译,这和安装LLVM和CLANG有必然的关系。然后它会调用相关的链接库,关键还是最后几行代码:

bin/eosio-s2wasm和bin/eosio-wast2wasm。


从这里基本已经看出LLVM还是要和EOS内部的一些代码一起工作,才能搞定所有的流程。主要的编译工作由LLVM及其相关的模块构成,在这个过程中使用了一种叫做C++ without Emscripten的过程即:直接用 clang 的前端编译到 LLVM 的 bc,然后 llc 编译到汇编文件 s,再用 Binaryen 的工具 s2wasm 从汇编文件编译到 wasm 的 ast 文件 wast,最后用 wasm-as 编译到 wasm。

可能为了数据的通用性和更好的适配性,编译过程中的许多文件都提供了相关工具命令可以来回转换,比如a.ll和a.bc之间可以通过llvm-as和llvm-dis命令相互转换。


LLVM IR主要有三种格式:一种是在内存中的编译中间语言;一种是硬盘上存储的二进制中间语言(以.bc结尾),最后一种是可读的中间格式(以.ll结尾)。这三种中间格式是完全相等的。

主要编译的流程基本如下面这样:


cpp -(CLANG+LLVM工具)-> *.bc -(LLVM llc)-> *.s -(Binaryen s2wasm)-> *.wast -(wast2wasm)-> *.wasm




abi文件在WIKI中可以找到,但是在WIKI中没有wast的相关格式,下面的wast文件的内容是从EMCC的官网上扒下来的:

;; tests/hello_world.c:4
(drop
  (call $_printf
    (i32.const 1144)
    (get_local $$vararg_buffer)
  )
)
;; tests/hello_world.c:5
(return
  (i32.const 0)
)



明白了编译流程再看源码就清楚很多了,为了保证多种数据的加载,就得写一些相关的加载的代码,举一个例子:

// Base interface shared by the runtime classes below (body elided in this excerpt).
class wasm_runtime_interface {
......
};
// Runtime deriving from wasm_runtime_interface; presumably Binaryen-backed, judging by the name - TODO confirm.
class binaryen_runtime : public eosio::chain::wasm_runtime_interface
{......};
// Runtime deriving from wasm_runtime_interface; presumably WAVM-backed, judging by the name - TODO confirm.
class wavm_runtime : public eosio::chain::wasm_runtime_interface
 {.....};



也就是说,要保证前面说过的相关文件的正确加载,特别是好多可以互相转换的文件的加载。下面以编译一个Assembly(.wast--->.wasm)为例分析一下: libraries/wasm-jit/Source/Programs中的Assemble.cpp

// Entry point of the assembler tool (excerpt): loads a WAST text module into an IR::Module and
// writes it back out as a binary module. Returns EXIT_FAILURE if either step fails.
int commandMain(int argc,char** argv)
{
......

    // Load the WAST module.
    IR::Module module;
    if(!loadTextModule(inputFilename,module)) { return EXIT_FAILURE; }

......

    // Write the binary module.
    if(!saveBinaryModule(outputFilename,module)) { return EXIT_FAILURE; }

    return EXIT_SUCCESS;
}



工作其实非常简单,加载WAST的模块到中间IR,然后保存成二进制的文件。保存的那个函数非常简单没啥可说的,分析下加载:

// Loads the file `filename` and parses its WAST text into `outModule` by delegating to the
// string overload; returns that overload's result. (Part of the body is elided in this excerpt.)
inline bool loadTextModule(const char* filename,IR::Module& outModule)
{
    // Read the file into a string.
    auto wastBytes = loadFile(filename);
.....

    return loadTextModule(filename,wastString,outModule);
}
// Parses the WAST text `wastString` into `outModule`. Returns true when the parser reported no
// errors; the error-handling branch is elided in this excerpt.
inline bool loadTextModule(const char* filename,const std::string& wastString,IR::Module& outModule)
{
    std::vector<WAST::Error> parseErrors;
  // Parse the module from the text; WebAssembly code is organized as modules (see the WebAssembly documentation).
    WAST::parseModule(wastString.c_str(),wastString.size(),outModule,parseErrors);
    if(!parseErrors.size()) { return true; }
    else
    {
......
    }
}
// Lexes `string` into tokens and parses a single parenthesized `(module ...)` form followed by
// end-of-file into `outModule`. The error handling after the try block is elided in this excerpt.
bool parseModule(const char* string,Uptr stringLength,IR::Module& outModule,std::vector<Error>& outErrors)
{
  Timing::Timer timer;

  // Lex the string.
  LineInfo* lineInfo = nullptr;
  std::vector<UnresolvedError> unresolvedErrors;
  Token* tokens = lex(string,stringLength,lineInfo);
  ModuleParseState state(string,lineInfo,unresolvedErrors,tokens,outModule);

  try
  {
    // Parse (module ...)<eof>
    parseParenthesized(state,[&]
    {
      require(state,t_module);
      parseModuleBody(state);
    });
    require(state,t_eof);
  }
......
}
}
// Parses declarations until the closing parenthesis of the module, then stores the collected
// disassembly names on the module. (Part of the body is elided in this excerpt.)
void parseModuleBody(ModuleParseState& state)
{
  const Token* firstToken = state.nextToken;

  // Parse the module's declarations.
  while(state.nextToken->type != t_rightParenthesis)
  {
    parseDeclaration(state);// parse one declaration; parseDeclaration dispatches on its keyword
  };

......
  IR::setDisassemblyNames(state.module,state.disassemblyNames);
}
// Parses one parenthesized module-level declaration by dispatching on its leading keyword token.
// An unknown keyword is reported as a parse error and RecoverParseException is thrown.
static void parseDeclaration(ModuleParseState& state)
{
    parseParenthesized(state,[&]
    {
        switch(state.nextToken->type)
        {
      // each declaration keyword is handed to its matching parse function
        case t_import: parseImport(state); return true;
        case t_export: parseExport(state); return true;
        case t_global: parseGlobal(state); return true;
        case t_memory: parseMemory(state); return true;
        case t_table: parseTable(state); return true;
        case t_type: parseType(state); return true;
        case t_data: parseData(state); return true;
        case t_elem: parseElem(state); return true;
        case t_func: parseFunc(state); return true;
        case t_start: parseStart(state); return true;
        default:
            parseErrorf(state,state.nextToken,"unrecognized definition in module");
            throw RecoverParseException();
        };
    });
}
// Only one of the declaration parsers is shown here: the `start` declaration.
// It reads a function name or index and queues a post-declaration callback that resolves the
// reference into module.startFunctionIndex.
static void parseStart(ModuleParseState& state)
{
    require(state,t_start);

    Reference functionRef;
    if(!tryParseNameOrIndexRef(state,functionRef))
    {
        parseErrorf(state,state.nextToken,"expected function name or index");
    }

    // resolution is deferred to a callback; functionRef is captured by value
    state.postDeclarationCallbacks.push_back([functionRef](ModuleParseState& state)
    {
        state.module.startFunctionIndex = resolveRef(state,state.functionNameToIndexMap,state.module.functions.size(),functionRef);
    });
}
// Final step: serialize the disassembly names into the module's "name" user section.
// The section is reused if the module already has one, otherwise an empty one is appended.
// Layout written: function count, then for each function its name, its local count and the
// name of every local.
void setDisassemblyNames(Module& module,const DisassemblyNames& names)
{
  // Locate the "name" section, creating it at the end when it does not exist yet.
  Uptr nameSectionIndex = 0;
  const bool hasNameSection = findUserSection(module,"name",nameSectionIndex);
  if(!hasNameSection)
  {
    nameSectionIndex = module.userSections.size();
    module.userSections.push_back({"name",{}});
  }

  ArrayOutputStream out;

  Uptr functionCount = names.functions.size();
  serializeVarUInt32(out,functionCount);

  for(const auto& fn : names.functions)
  {
    // serialize() is handed a non-const lvalue, hence the local copies
    std::string fnName = fn.name;
    serialize(out,fnName);

    Uptr localCount = fn.locals.size();
    serializeVarUInt32(out,localCount);
    for(const auto& local : fn.locals)
    {
      std::string localName = local;
      serialize(out,localName);
    }
  }

  module.userSections[nameSectionIndex].data = out.getBytes();
}



这里分析的比较浅,并没有深入到内部去分析,其实到内部后就是真正的词法、语义之类的分析了,有兴趣可以去LLVM的官网或者EMCC的官网去看相关的资料。

2、abi文件的生成



abi文件是一个JSON文件,主要是解释如何将用户动作在JSON和二进制表达之间转换。ABI还解释了如何将数据库状态转换为JSON或从JSON转换数据库状态。通过ABI描述了智能合约,开发人员和用户就可以通过JSON无缝地与相关的合约进行交互。下面是从EOS的WIKI上找的ABI的文件:

{
  "____comment": "This file was generated by eosio-abigen. DO NOT EDIT - 2018-05-07T21:16:48",
  "types": [],
  "structs": [{
      "name": "hi",
      "base": "",
      "fields": [{
          "name": "user",
          "type": "account_name"
        }
      ]  
    }
  ],
  "actions": [{
      "name": "hi",
      "type": "hi",
      "ricardian_contract": ""
    }
  ],
  "tables": [],
  "ricardian_clauses": []
}



在eosiocpp.in中可以看到下面的代码:

# Generates the ABI file ${outname} for the contract source given as $1 by running ${ABIGEN}.
# Exits the script with status 1 when the source file does not exist or the generator fails.
function generate_abi {

    if [[ ! -e "$1" ]]; then
        echo "You must specify a file"
        exit 1
    fi

    # absolute, symlink-resolved directory of the source file
    context_folder=$(cd "$(dirname "$1")" ; pwd -P)

    # path arguments are quoted so that paths containing spaces reach the generator intact
    ${ABIGEN} -extra-arg=-c -extra-arg=--std=c++14 -extra-arg=--target=wasm32 \
        -extra-arg=-nostdinc -extra-arg=-nostdinc++ -extra-arg=-DABIGEN \
        -extra-arg=-I${EOSIO_INSTALL_DIR}/include/libc++/upstream/include \
        -extra-arg=-I${EOSIO_INSTALL_DIR}/include/musl/upstream/include \
        -extra-arg=-I${BOOST_INCLUDE_DIR} \
        -extra-arg=-I${EOSIO_INSTALL_DIR}/include -extra-arg=-I"$context_folder" \
        -extra-arg=-fparse-all-comments -destination-file="${outname}" -verbose=0 \
        -context="$context_folder" "$1" --

    if [ "$?" -ne 0 ]; then
        exit 1
    fi    

    echo "Generated ${outname} ..."
}



abi文件的生成的main程序在programs/eosio-abigen下,主要内容如下:

using mvo = fc::mutable_variant_object;
// Builds the FrontendActionFactory used for the ABI-generation pass.
// FrontendActionFactory is the abstract Clang interface that produces
// FrontendAction objects (the abstract base class for front-end actions);
// every create() call on the returned factory yields a generate_abi_action.
//
// `output`, `contract` and `actions` are stored by reference inside the
// factory, so they must outlive it.
std::unique_ptr<FrontendActionFactory> create_factory(bool verbose, bool opt_sfs, string abi_context, abi_def& output, const string& contract, const vector<string>& actions) {

  struct abi_frontend_action_factory : public FrontendActionFactory {

    bool                   verbose;
    bool                   opt_sfs;
    string                 abi_context;
    abi_def&               output;
    const string&          contract;
    const vector<string>&  actions;

    // Every member is initialized here, in declaration order. opt_sfs must be
    // set too: create() forwards it, and leaving it out would pass an
    // indeterminate bool to generate_abi_action.
    abi_frontend_action_factory(bool verbose, bool opt_sfs, string abi_context,
      abi_def& output, const string& contract, const vector<string>& actions) : verbose(verbose),
      opt_sfs(opt_sfs), abi_context(abi_context), output(output), contract(contract), actions(actions) {}

    clang::FrontendAction *create() override {
      // generate_abi_action is the object that actually produces the ABI.
      return new generate_abi_action(verbose, opt_sfs, abi_context, output, contract, actions);
    }

  };

  return std::unique_ptr<FrontendActionFactory>(
      new abi_frontend_action_factory(verbose, opt_sfs, abi_context, output, contract, actions)
  );
}
// Builds the FrontendActionFactory for the macro-discovery pass: every
// create() call yields a find_eosio_abi_macro_action.
//
// `contract` and `actions` are stored by reference so the action can write its
// findings back to the caller's variables; they must outlive the factory.
// `abi_context` is copied into the factory.
std::unique_ptr<FrontendActionFactory> create_find_macro_factory(string& contract, vector<string>& actions, string abi_context) {

  struct abi_frontend_macro_action_factory : public FrontendActionFactory {

    string&          contract;
    vector<string>&  actions;
    string           abi_context;

    abi_frontend_macro_action_factory (string& contract, vector<string>& actions,
      string abi_context ) : contract(contract), actions(actions), abi_context(abi_context) {}

    clang::FrontendAction *create() override {
      return new find_eosio_abi_macro_action(contract, actions, abi_context);
    }

  };

  return std::unique_ptr<FrontendActionFactory>(
    new abi_frontend_macro_action_factory(contract, actions, abi_context)
  );
}
// LLVM command-line option category that groups the ABI generator's options.
static cl::OptionCategory abi_generator_category("ABI generator options");

// Additional command-line options, all registered under the category above.
// "context": string option described as the ABI context.
static cl::opt<std::string> abi_context(
    "context",
    cl::desc("ABI context"),
    cl::cat(abi_generator_category));

// "destination-file": path of the JSON file to write.
static cl::opt<std::string> abi_destination(
    "destination-file",
    cl::desc("destination json file"),
    cl::cat(abi_generator_category));

// "verbose": enables debug output.
static cl::opt<bool> abi_verbose(
    "verbose",
    cl::desc("show debug info"),
    cl::cat(abi_generator_category));

// "optimize-sfs": enables the single-field-struct optimization.
static cl::opt<bool> abi_opt_sfs(
    "optimize-sfs",
    cl::desc("Optimize single field struct"),
    cl::cat(abi_generator_category));

// Entry point of the ABI generator.
// Runs two Clang tool passes over the input sources: the first locates the
// contract name and action list via the macro-finding action, the second
// generates the ABI into `output`. On success the ABI is validated, wrapped
// together with a "____comment" entry carrying a timestamp, and saved to the
// destination file.
// Returns the result of the last tool run that was executed; returns -1 when
// the statement after FC_CAPTURE_AND_LOG is reached.
int main(int argc, const char **argv) { abi_def output; try {
   CommonOptionsParser op(argc, argv, abi_generator_category);
   ClangTool Tool(op.getCompilations(), op.getSourcePathList());

   string contract;
   vector<string> actions;
   // Pass 1: fill `contract` and `actions`.
   int result = Tool.run(create_find_macro_factory(contract, actions, abi_context).get());
   if(!result) {
      // Pass 2: build the ABI definition in `output`.
      result = Tool.run(create_factory(abi_verbose, abi_opt_sfs, abi_context, output, contract, actions).get());
      if(!result) {
         abi_serializer(output).validate();
         fc::variant vabi;
         to_variant(output, vabi);

         auto comment = fc::format_string(
           "This file was generated by eosio-abigen. DO NOT EDIT - ${ts}",
           mvo("ts",fc::time_point_sec(fc::time_point::now()).to_iso_string()));

        // Put the "____comment" entry in front of the ABI content; compare with
        // the layout of a generated .abi file.
         auto abi_with_comment = mvo("____comment", comment)(mvo(vabi));
         fc::json::save_to_file(abi_with_comment, abi_destination, true);
      }
   }
   return result;
} FC_CAPTURE_AND_LOG((output)); return -1; }



从上面的Main函数可以看出,先要查找相关的ABI宏,再根据这个宏,用工厂类创建ABI的创建对象。当然,在前面要使用CLANG的一些分析工具对象。find_eosio_abi_macro_action这个类主要是对整个智能合约的宏进行解析:

struct find_eosio_abi_macro_action : public PreprocessOnlyAction {

      string& contract;
      vector<string>& actions;
      const string& abi_context;

      find_eosio_abi_macro_action(string& contract, vector<string>& actions, const string& abi_context
         ): contract(contract),
         actions(actions), abi_context(abi_context) {
      }

      struct callback_handler : public PPCallbacks {

         CompilerInstance& compiler_instance;
         find_eosio_abi_macro_action& act;

         callback_handler(CompilerInstance& compiler_instance, find_eosio_abi_macro_action& act)
         : compiler_instance(compiler_instance), act(act) {}

         void MacroExpands (const Token &token, const MacroDefinition &md, SourceRange range, const MacroArgs *args) override {

            auto* id = token.getIdentifierInfo();
            if( id == nullptr ) return;
            if( id->getName() != "EOSIO_ABI" ) return;//看到这个宏没有,这是智能合约里动态创建的标志

            const auto& sm = compiler_instance.getSourceManager();
            auto file_name = sm.getFilename(range.getBegin());
            if ( !act.abi_context.empty() && !file_name.startswith(act.abi_context) ) {
               return;
            }

            ABI_ASSERT( md.getMacroInfo()->getNumArgs() == 2 );

            clang::SourceLocation b(range.getBegin()), _e(range.getEnd());
            clang::SourceLocation e(clang::Lexer::getLocForEndOfToken(\_e, 0, sm, compiler_instance.getLangOpts()));
            auto macrostr = string(sm.getCharacterData(b), sm.getCharacterData(e)-sm.getCharacterData(b));

            //正则匹配,编译器的标配
            //regex r(R"(EOSIO_ABI\s*\(\s*(.+?)\s*,((?:.+?)*)\s*\))");//注释掉是因为格式的问题 fjf 6.7
            smatch smatch;
            auto res = regex_search(macrostr, smatch, r);
            ABI_ASSERT( res );

            act.contract = smatch[1].str();

            auto actions_str = smatch[2].str();
            boost::trim(actions_str);
            actions_str = actions_str.substr(1);
            actions_str.pop_back();
            boost::remove_erase_if(actions_str, boost::is_any_of(" ("));

            boost::split(act.actions, actions_str, boost::is_any_of(")"));
         }
      };

      void ExecuteAction() override {
         getCompilerInstance().getPreprocessor().addPPCallbacks(
            llvm::make_unique<callback_handler>(getCompilerInstance(), *this)
         );
         PreprocessOnlyAction::ExecuteAction();
      };

};



这些完成后,在Main函数中调用abi_serializer进行校验,最后保存到文件,ABI就这样产生了。当然,这背后的细节LLVM和CLANG做了好多,感兴趣的可以多在其官网上看看,最近看虚拟机和JAVA的对比,再和c++编译器编译对比,收益还是颇大。


最后看一下这个类: class generate_abi_action : public ASTFrontendAction{......},这个类在前边的工厂里进行了创建,但是其中有一个主要的函数

// Supplies the AST consumer for this action: an abi_generator_astconsumer
// constructed from the compiler instance and abi_gen. The second argument is
// not used.
std::unique_ptr<ASTConsumer> CreateASTConsumer(CompilerInstance& compiler_instance,
                                               llvm::StringRef) override {
   std::unique_ptr<ASTConsumer> consumer =
      llvm::make_unique<abi_generator_astconsumer>(compiler_instance, abi_gen);
   return consumer;
}



这个函数是内部调用的,因为,它是protected的类型。在Compile之前,创建ASTConsumer。在建立AST(抽象语法树)的过程中,ASTConsumer提供了众多的Hooks。被FrontendAction的公共接口BeginSourceFile调用。


这里最终会调用abi_generator对象,其中void abi_generator::handle_decl(const Decl* decl)这个函数,用来处理具体的细节。











三、执行过程



加载到虚拟机的过程其实就是JIT做的事儿了,有兴趣可以分析一下wasm-jit这个目录下的部分,特别是Runtime内部的一些代码,这里主要分析一下加载过程,在programs/cleos中的主函数中:

// Excerpt of the cleos entry point: registration of the "set" subcommand and
// its "code", "abi" and "contract" children. Lines consisting of "......" mark
// code omitted from this excerpt.
int main(int argc,char**argv)
{
  ......
  // set subcommand
    auto setSubcommand = app.add_subcommand("set", localized("Set or update blockchain state"));
    setSubcommand->require_subcommand();

    // set contract subcommand
    // These variables are filled in by option parsing and captured by reference
    // in the callbacks below.
    string account;
    string contractPath;
    string wastPath;
    string abiPath;
    bool shouldSend = true;
    auto codeSubcommand = setSubcommand->add_subcommand("code", localized("Create or update the code on an account"));
    codeSubcommand->add_option("account", account, localized("The account to set code for"))->required();
    codeSubcommand->add_option("code-file", wastPath, localized("The fullpath containing the contract WAST or WASM"))->required();

    auto abiSubcommand = setSubcommand->add_subcommand("abi", localized("Create or update the abi on an account"));
    abiSubcommand->add_option("account", account, localized("The account to set the ABI for"))->required();
    // This option is the ABI file, not the WAST/WASM code file.
    abiSubcommand->add_option("abi-file", abiPath, localized("The fullpath containing the contract ABI"))->required();

    auto contractSubcommand = setSubcommand->add_subcommand("contract", localized("Create or update the contract on an account"));
    contractSubcommand->add_option("account", account, localized("The account to publish a contract for"))
                      ->required();
    contractSubcommand->add_option("contract-dir", contractPath, localized("The path containing the .wast and .abi"))
                      ->required();
    contractSubcommand->add_option("wast-file", wastPath, localized("The file containing the contract WAST or WASM relative to contract-dir"));
 //                     ->check(CLI::ExistingFile);
    auto abi = contractSubcommand->add_option("abi-file,-a,--abi", abiPath, localized("The ABI for the contract relative to contract-dir"));
 //                                ->check(CLI::ExistingFile);

    // Actions accumulated by the callbacks; sent as a single transaction.
    std::vector<chain::action> actions;

    // Loads the contract code and queues a setcode action.
    auto set_code_callback = [&]() {
       std::string wast;
       fc::path cpath(contractPath);

       // A trailing "." component is dropped so filename() is the directory name.
       if( cpath.filename().generic_string() == "." ) cpath = cpath.parent_path();

       // Default file name: <dir>/<dirname>.wasm, falling back to <dirname>.wast.
       if( wastPath.empty() )
       {
          wastPath = (cpath / (cpath.filename().generic_string()+".wasm")).generic_string();
          if (!fc::exists(wastPath))
             wastPath = (cpath / (cpath.filename().generic_string()+".wast")).generic_string();
       }

       std::cout << localized(("Reading WAST/WASM from " + wastPath + "...").c_str()) << std::endl;
       fc::read_file_contents(wastPath, wast);
       FC_ASSERT( !wast.empty(), "no wast file found ${f}", ("f", wastPath) );
       vector<uint8_t> wasm;
       // A binary module starts with the 4 bytes "\0asm"; anything else is
       // treated as WAST text and assembled.
       const string binary_wasm_header("\x00\x61\x73\x6d", 4);
       if(wast.compare(0, 4, binary_wasm_header) == 0) {
          std::cout << localized("Using already assembled WASM...") << std::endl;
          wasm = vector<uint8_t>(wast.begin(), wast.end());
       }
       else {
          std::cout << localized("Assembling WASM...") << std::endl;
          wasm = wast_to_wasm(wast); // text form -> binary form
       }

       actions.emplace_back( create_setcode(account, bytes(wasm.begin(), wasm.end()) ) );
       if ( shouldSend ) {
          std::cout << localized("Setting Code...") << std::endl;
          send_actions(std::move(actions), 10000, packed_transaction::zlib);
       }
    };

    // Loads the ABI file and queues a setabi action.
    auto set_abi_callback = [&]() {
       fc::path cpath(contractPath);
       if( cpath.filename().generic_string() == "." ) cpath = cpath.parent_path();

       // Default file name: <dir>/<dirname>.abi.
       if( abiPath.empty() )
       {
          abiPath = (cpath / (cpath.filename().generic_string()+".abi")).generic_string();
       }

       FC_ASSERT( fc::exists( abiPath ), "no abi file found ${f}", ("f", abiPath)  );

       try {
          actions.emplace_back( create_setabi(account, fc::json::from_file(abiPath).as<abi_def>()) );
       } EOS_RETHROW_EXCEPTIONS(abi_type_exception,  "Fail to parse ABI JSON")
       if ( shouldSend ) {
          std::cout << localized("Setting ABI...") << std::endl;
          send_actions(std::move(actions), 10000, packed_transaction::zlib);
       }
    };

    add_standard_transaction_options(contractSubcommand, "account@active");
    add_standard_transaction_options(codeSubcommand, "account@active");
    add_standard_transaction_options(abiSubcommand, "account@active");
    // "set contract" queues both actions first (shouldSend = false suppresses
    // the individual sends) and then submits them together.
    contractSubcommand->set_callback([&] {
       shouldSend = false;
       set_code_callback();
       set_abi_callback();
       std::cout << localized("Publishing contract...") << std::endl;
       send_actions(std::move(actions), 10000, packed_transaction::zlib);
    });
    codeSubcommand->set_callback(set_code_callback);
    abiSubcommand->set_callback(set_abi_callback);
  ......
}



这里只分析一下wast->wasm的转换:

// Converts WAST text into a binary WebAssembly module.
// The text is parsed into an IR module, the first user section named "name"
// (if any) is removed, and the module is serialized to bytes.
// Serialization and validation failures are reported through FC_ASSERT with the
// accumulated error text; the outer macro rethrows with `wast` captured.
// The "......" line marks code omitted from this excerpt.
std::vector<uint8_t> wast_to_wasm( const std::string& wast )
{
   std::stringstream ss;

   try {
   IR::Module module; // intermediate representation of the module
   std::vector<WAST::Error> parse_errors;
   // Parse the whole text as one module; problems are collected in parse_errors.
   WAST::parseModule(wast.c_str(),wast.size(),module,parse_errors);
......
   // Drop the "name" user section. Erasing invalidates the iterator, hence the
   // immediate break.
   for(auto sectionIt = module.userSections.begin();sectionIt != module.userSections.end();++sectionIt)
   {
      if(sectionIt->name == "name") { module.userSections.erase(sectionIt); break; }
   }

   try
   {
      // Serialize the WebAssembly module to its binary form.
      Serialization::ArrayOutputStream stream;
      WASM::serialize(stream,module);
      return stream.getBytes();
   }
   catch(const Serialization::FatalSerializationException& exception)
   {
      ss << "Error serializing WebAssembly binary file:" << std::endl;
      ss << exception.message << std::endl;
      // ss.str() is the accumulated text; ss.get() would extract one character.
      FC_ASSERT( !"error converting to wasm", "${msg}", ("msg",ss.str()) );
   } catch(const IR::ValidationException& e) {
      ss << "Error validating WebAssembly binary file:" << std::endl;
      ss << e.message << std::endl;
      FC_ASSERT( !"error converting to wasm", "${msg}", ("msg",ss.str()) );
   }

} FC_CAPTURE_AND_RETHROW( (wast) ) }  /// wast_to_wasm
//其下两个是分别处理不同类型的文件来源
std::string     wasm_to_wast( const std::vector<uint8_t>& wasm ) {
   return wasm_to_wast( wasm.data(), wasm.size() );
} /// wasm_to_wast

std::string     wasm_to_wast( const uint8_t* data, uint64_t size )
{ try {
    IR::Module module;
    Serialization::MemoryInputStream stream((const U8*)data,size);
    WASM::serialize(stream,module);
     // Print the module to WAST.
    return WAST::print(module);
} FC_CAPTURE_AND_RETHROW() }



基本到现在就把虚拟机简要的分析了一下,其中有好多关于CLANG,LLVM和Webassembly的知识,需要在看这篇文章前了解一下。

转载自:https://github.com/XChainLab/documentation/edit/master/eos/eos%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B%E4%BA%94%E8%99%9A%E6%8B%9F%E6%9C%BA.md

EOS源码分析之四智能合约

eos源码分析之四智能合约

智能合约和虚拟机部分会混合在一起讲,然后在各自的范围内偏向于哪个部分。

一、一个简单智能合约



智能合约的编译使用WASM来编译,也使用了一些自定义的代码用来固定智能合约的格式和入口等。智能合约产生二进制后会放到虚拟机中执行。首先看一个入门的智能合约,helloworld.

hello.cpp:

#include <eosiolib/eosio.hpp>
#include <eosiolib/print.hpp>
using namespace eosio;

// Minimal example contract with a single action that prints a greeting.
class hello : public eosio::contract
{
  public:
    using contract::contract;

    /// @abi action
    void helloworld( account_name user )
    {
      print( "Hello, ", name{user} );
    }
};

// The action list names the member function to dispatch to, so it has to match
// the method declared above.
EOSIO_ABI( hello, (helloworld) )



在EOS的源码中EOSIO_ABI被定义成:

// Defines the contract's extern "C" apply() entry point.
// When `code` equals `receiver`, a TYPE instance named `thiscontract` is
// constructed and EOSIO_API( TYPE, MEMBERS ) supplies the case labels of the
// switch on `action`; eosio_exit(0) is called after the switch.
// Note: the local variable `thiscontract` is referenced by name from the
// expansion of EOSIO_API_CALL.
// No comment may follow a line-continuation backslash inside the macro: the
// backslash must be the last character on the line.
#define EOSIO_ABI( TYPE, MEMBERS ) \
extern "C" { \
   void apply( uint64_t receiver, uint64_t code, uint64_t action ) { \
      auto self = receiver; \
      if( code == self ) { \
         TYPE thiscontract( self ); \
         switch( action ) { \
            EOSIO_API( TYPE, MEMBERS ) \
         } \
         eosio_exit(0); \
      } \
   } \
}



这时候再对照一下EOS自带的一个空的智能合约的例子:

//noop.hpp
#pragma once

#include <eosiolib/eosio.hpp>
#include <eosiolib/dispatcher.hpp>

namespace noop {
   using std::string;
   /**
      noop contract
      All it does is require sender authorization.
      Actions: anyaction*/
   class noop {
      public:

         // Payload of the single action; declared through the ACTION macro
         // with N(noop) and the name `anyaction`.
         ACTION(N(noop), anyaction) {
            anyaction() { }
            anyaction(account_name f, const string& t, const string& d): from(f), type(t), data(d) { }

            account_name from;  // account whose authorization is required by on()
            string type;
            string data;

            // Lists the fields, in order, for serialization.
            EOSLIB_SERIALIZE(anyaction, (from)(type)(data))
         };

         // Handler for anyaction: only requires the authorization of act.from.
         static void on(const anyaction& act)
         {
            require_auth(act.from);
         }
   };
} /// noop

//noop.cpp
#include <noop/noop.hpp>

namespace noop {
   extern "C" {
      /// The apply method implements the dispatch of events to this contract
      /// It forwards `code` and `action` to eosio::dispatch, instantiated for
      /// the noop contract and its anyaction type; `receiver` is not used.
      void apply( uint64_t receiver, uint64_t code, uint64_t action ) {
         eosio::dispatch<noop, noop::anyaction>(code, action);
      }
   }
}



通过二者的对比可以发现,其实宏EOSIO_ABI自动完成了对action的映射分发。而EOS自带的则手动实现了静态分发,结果是一样的。它们的核心其实都是apply这个函数,如果有std::bind的使用经验,发现他们还是有些类似的。


继续接着分析EOSIO_ABI的内部代码,里面调用了一个宏:

// Emits one `case` of the action switch: the label is the name value of the
// stringized element, and the body calls execute_action with the address of
// `thiscontract` (a variable that must exist at the expansion site) and the
// member function OP::elem, then returns.
#define EOSIO_API_CALL( r, OP, elem ) \
   case ::eosio::string_to_name( BOOST_PP_STRINGIZE(elem) ): \
      eosio::execute_action( &thiscontract, &OP::elem ); \
      return;

// Applies EOSIO_API_CALL to every element of the MEMBERS sequence, passing
// TYPE as the shared data argument.
#define EOSIO_API( TYPE,  MEMBERS ) \
   BOOST_PP_SEQ_FOR_EACH( EOSIO_API_CALL, TYPE, MEMBERS )



BOOST_PP_SEQ_FOR_EACH这个宏前面讲过,是按最后一个参数展开第一个宏。再看一下执行的代码:

// Reads the serialized data of the current action, unpacks it into a tuple
// whose element types are the decayed parameter types of `func`, and calls
// `func` on `obj` with those values.
//
// obj  - object the member function is invoked on
// func - pointer to a member function of Q returning void
// Always returns true.
template<typename T, typename Q, typename... Args>
bool execute_action( T* obj, void (Q::*func)(Args...)  ) {
   size_t size = action_data_size();

   //using malloc/free here potentially is not exception-safe, although WASM doesn't support exceptions
   // Payloads of up to 512 bytes are placed on the stack, larger ones on the heap.
   constexpr size_t max_stack_buffer_size = 512;
   void* buffer = max_stack_buffer_size < size ? malloc(size) : alloca(size);
   read_action_data( buffer, size );

   auto args = unpack<std::tuple<std::decay_t<Args>...>>( (char*)buffer, size );

   // The buffer is released here, after unpack has produced `args`; only the
   // malloc'ed case needs freeing.
   if ( max_stack_buffer_size < size ) {
      free(buffer);
   }

   // Calls the given member function on the given object; for the hello
   // example this is hello::helloworld.
   auto f2 = [&]( auto... a ){
      (obj->*func)( a... );
   };

   // Expands the tuple elements into the arguments of f2.
   boost::mp11::tuple_apply( f2, args );
   return true;
}



在bancor、currency的目录下,主要是货币转换相关的部分,dice是一个掷骰子的游戏的合约。eosio.msig,eosio.token,eosio.bios 都是相关的智能合约的程序,可认为是EOS自带的智能合约或者说自带的软件。

二、智能合约


1、智能合约的内容

看完了上面的代码分析,回到智能合约本身来。智能合约是什么?有几部分?怎么执行?


EOS智能合约通过messages 及 共享内存数据库(比如只要一个合约以异步的方式被包含在transaction的读取域中,它就可以读取另一个合约的数据库)相互通信。异步通信导致的spam问题将由资源限制算法来解决。下面是两个在合约里可定义的通信模型:


1、Inline:Inline保证执行当前的transaction或unwind;无论成功或失败都不会有通知。Inline 操作的scopes和authorities和原来的transaction一样。


2、Deferred: Defer将稍后由区块生产者来安排;结果可能是传递通信结果或者只是超时。Deferred可以触及不同的scopes,可以携带发送它的合约的authority*此特性在STAT不可用


message 和Transaction的关系:


一个message代表一个操作,一个Transaction中可以包含一个或者多个message,合约和帐户通过其来通信。Message既可以单独发送也可以批量发送。

//单MESSAGE的Transaction
{
  "ref_block_num": "100",
  "ref_block_prefix": "137469861",
  "expiration": "2017-09-25T06:28:49",
  "scope": ["initb","initc"],
  "messages": [
  {
    "code": "eos",
    "type": "transfer",
    "authorization": [
    {
      "account": "initb",
      "permission": "active"
      }
      ],
      "data": "000000000041934b000000008041934be803000000000000" }
      ],
  "signatures": [],
  "authorizations": []
}

//多Message的Transaction
{
  "ref_block_num": "100",
  "ref_block_prefix": "137469861",
  "expiration": "2017-09-25T06:28:49",
  "scope": [...],
  "messages":
  [
  {
    "code": "...",
    "type": "...",
    "authorization": [...],
  "data": "..."
  },
  {
    "code": "...",
    "type": "...",
  "authorization": [...],
  "data": "..."
  }, ...
  ],
  "signatures": [],
  "authorizations": []
}


2、Message名的限定和技术限制

Message的类型实际上是base32编码的64位整数。所以Message名的前12个字符需限制在字母a-z, 1-5, 以及'.' 。第13个以后的字符限制在前16个字符('.' and a-p)。


另外需要注意的是,在合约中不得存在浮点数,所有的Transaction必须在1ms内执行完成,否则失败。从目前来看每个帐户每秒最多发出30个Transactions。

3、智能合约的模块



在前面的例程里可以看到在智能合约中有apply这个函数,也知道这个函数是非常重要的,其实还有别的几个函数也挺重要:

init

init仅在被初次部署的时候执行一次。它是用于初始化合约变量的,例如货币合约中提供token的数量。

apply

apply是message处理器,它监听所有输入的messages并根据函数中的规定进行反馈。apply函数需要两个输入参数,code和 action。

code filter

为了响应特定message,您可以如下构建您的apply函数。您也可以忽略code filter来构建一个响应通用messages的函数。

if (code == N(${contract_name})) {
    // handler for messages addressed to this specific contract
}



在其中您可以定义对不同actions的响应。

action filter

为了响应特定action,您可以如下构建您的apply函数。常和code filter一起使用。

if (action == N(${action_name})) {
    // handler for this specific action
}


三、智能合约的编译



EOS的智能合约必须使用EOSCPP这个命令来编译,任何需要布置在EOS上的智能合约必须编译成wasm(.wast)文件,并且有一个abi的文件。wasm-jit提供了这个编译的过程,在虚拟机的部分详细的介绍一下编译和执行的过程。

四、智能合约的执行

在加载自定义的智能合约前,一般会加载上面提到的三个智能合约,用来测试权限和相关的配置。这里看一看最基础的BIOS这个智能合约:


$ cleos set contract hello hello.wast hello.abi



$ cleos push action hello helloworld '["fred" ]' -p hello

然后就可以在本地的nodeos节点的日志中查阅到上面的信息。

五、智能合约的调试

参考EOS的github上的wiki的智能合约部分,其实上面有相当一部分就是从上面摘抄下来的。





转载自:https://github.com/XChainLab/documentation/edit/master/eos/eos%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B%E5%9B%9B%E6%99%BA%E8%83%BD%E5%90%88%E7%BA%A6.md

EOS源码分析之三交易

eos源码分析之三交易

一、交易的介绍


说明:最新的1.0及以上代码中已经移除了相关的cycle和shared部分



基本上区块链都是这几板斧,又轮到交易了。一般交易都是最复杂的部分,因为它涉及到网络,帐户,数据库,共识,内存池等等好多部分。EOS的交易数据结构主要有两种:

1、signed_transaction:用户发起的交易


2、deferred_transaction:延期交易,注释的说明这种交易可以返回错误给当事人。


EOS为了应对海量的交易,引入分片(shard)技术,即在区块中维护了一条私有区块链,将一个block分割成多个cycle(循环),每个cycle的生成时间很短,而且不用等待完整的block确认完成(3秒),生成后直接异步广播发送,这样,交易很快就被确认了。在一个cycle中,如果存在大量互不相干的交易,那么多核多线程技术将极大地提高交易的处理速度。
这次仍然从客户端发起一笔交易开始来查看整个代码的流程,基本上是cleos发出Transaction到nodeos的HTTP接口,接口接收到消息,分析打包。

二、交易的具体过程

1、客户端发起交易:


无论是send_actions 还是 send_transaction最终都落到push_transaction

// Completes, signs and submits a transaction.
//
// trx         - transaction to send; expiration, reference block, resource
//               limits and (unless signing is skipped) signatures are filled in
// extra_kcpu  - not used in this body
// compression - compression applied when the transaction is packed
// Returns the response of the push call, or the transaction itself as a
// variant when broadcasting is disabled.
fc::variant push_transaction( signed_transaction& trx, int32_t extra_kcpu = 1000, packed_transaction::compression_type compression = packed_transaction::none ) {
   auto info = get_info();
   trx.expiration = info.head_block_time + tx_expiration;
   trx.set_reference_block(info.head_block_id);

   if (tx_force_unique) {
      trx.context_free_actions.emplace_back( generate_nonce() );
   }

   auto required_keys = determine_required_keys(trx);

   // Round the limits up to whole units of 1024 (CPU) and of 8 bytes (net).
   trx.max_kcpu_usage = (tx_max_cpu_usage + 1023)/1024;
   trx.max_net_usage_words = (tx_max_net_usage + 7)/8;

   if (!tx_skip_sign) {
      // Have the wallet sign the transaction with the required keys.
      sign_transaction(trx, required_keys);
   }

   if (!tx_dont_broadcast) {
      // Pack the transaction and post it to the node over HTTP.
      return call(push_txn_func, packed_transaction(trx, compression));
   } else {
      return fc::variant(trx);
   }
}
// Sends the transaction, the required keys and a default-constructed chain id
// to the wallet's signing endpoint and replaces `trx` with the signed
// transaction from the response.
void sign_transaction(signed_transaction& trx, fc::variant& required_keys) {
   // TODO determine chain id
   const fc::variants wallet_request{fc::variant(trx), required_keys, fc::variant(chain_id_type{})};
   trx = call(wallet_host, wallet_port, wallet_sign_trx, wallet_request).as<signed_transaction>();
}



看到调用call,那么进去:

// Converts `v` to a variant and posts it to `path` on the given server/port.
template<typename T>
fc::variant call( const std::string& server, uint16_t port,
                  const std::string& path,
                  const T& v ) {
   const fc::variant payload(v);
   return eosio::client::http::call( server, port, path, payload );
}

// Same as the four-argument form, using the `host` and `port` visible at this
// scope as the destination.
template<typename T>
fc::variant call( const std::string& path,
                  const T& v ) {
   const fc::variant payload(v);
   return eosio::client::http::call( host, port, path, payload );
}
// The overload that the wrappers above finally reach: it performs the HTTP
// POST over a TCP socket. Lines consisting of "......" mark code omitted from
// this excerpt (including the definitions of endpoint_iterator, end,
// io_service and postjson, and the handling of the response body).
// NOTE(review): postjson is presumably the JSON text of `postdata` — confirm
// against the full source.
fc::variant call( const std::string& server, uint16_t port,
                  const std::string& path,
                  const fc::variant& postdata ) {
try {
......

   while( endpoint_iterator != end ) {
      // Try each endpoint until we successfully establish a connection.
      tcp::socket socket(io_service);
      try {
         boost::asio::connect(socket, endpoint_iterator);
         endpoint_iterator = end;
      }
      ......

      // Form the request. We specify the "Connection: close" header so that the
      // server will close the socket after transmitting the response. This will
      // allow us to treat all data up until the EOF as the content.
      // Build the request stream.
      boost::asio::streambuf request;
      std::ostream request_stream(&request);
      request_stream << "POST " << path << " HTTP/1.0\r\n";
      request_stream << "Host: " << server << "\r\n";
      request_stream << "content-length: " << postjson.size() << "\r\n";
      request_stream << "Accept: */*\r\n";
      request_stream << "Connection: close\r\n\r\n";
      request_stream << postjson;

      // Send the request.
      boost::asio::write(socket, request);

      // Read the response status line. The response streambuf will automatically
      // grow to accommodate the entire line. The growth may be limited by passing
      // a maximum size to the streambuf constructor.
      // Read and process the response.
      boost::asio::streambuf response;
      boost::asio::read_until(socket, response, "\r\n");

      // Check that response is OK (the status line must start with "HTTP/").
      std::istream response_stream(&response);
      std::string http_version;
      response_stream >> http_version;
      unsigned int status_code;
      response_stream >> status_code;
      std::string status_message;
      std::getline(response_stream, status_message);
      FC_ASSERT( !(!response_stream || http_version.substr(0, 5) != "HTTP/"), "Invalid Response" );

      // Read the response headers, which are terminated by a blank line.
      boost::asio::read_until(socket, response, "\r\n\r\n");

      // Process the response headers: each header line is consumed and discarded.
      std::string header;
      while (std::getline(response_stream, header) && header != "\r")
      {
      //      std::cout << header << "\n";
      }
      //      std::cout << "\n";

      std::stringstream re;
      // Write whatever content we already have to output.
      if (response.size() > 0)
         //   std::cout << &response;
         re << &response;

      // Read until EOF, writing data to output as we go.
      boost::system::error_code error;
      while (boost::asio::read(socket, response,
                               boost::asio::transfer_at_least(1), error))
         re << &response;

......
   }

   FC_ASSERT( !"unable to connect" );
} FC_CAPTURE_AND_RETHROW()  // error, "Request Path: ${server}:${port}${path}\nRequest Post Data: ${postdata}" ,
                        // ("server", server)("port", port)("path", path)("postdata", postdata) )
}



在HTTP的名空间里定义了一大堆的API接口,这里用的是:


const string chain_func_base = "/v1/chain";


const string push_txn_func = chain_func_base + "/push_transaction";(httpc.hpp中)


这里基本就相当于调用HTTP的接口了,也就是说cleos把相关的API调用及内容发到了服务端,也就是nodeos端。那就去服务端看看nodeos接收到请求后干了些什么,有什么具体的动作。上面使用的路径是chain, 所以到chain_api_plugin.cpp中去看。


在前面的整体NODEOS启动时,会有插件启动这个函数执行:

// Everything is ultimately registered through this macro. Its expansion is
// mostly name pasting and parameter conversion: it produces a
// {url, handler} pair where the url is "/v1/<api_name>/<call_name>" and the
// handler parses the request body as JSON into
// <api_namespace>::<call_name>_params (similar to
// struct get_account_params {name account_name;};), invokes
// api_handle.<call_name> with it and passes the JSON-encoded result to the
// callback together with http_response_code. An empty body is treated as "{}".
#define CALL(api_name, api_handle, api_namespace, call_name, http_response_code) \
{std::string("/v1/" #api_name "/" #call_name), \
   [this, api_handle](string, string body, url_response_callback cb) mutable { \
          try { \
             if (body.empty()) body = "{}"; \
             auto result = api_handle.call_name(fc::json::from_string(body).as<api_namespace::call_name ## _params>()); \
             cb(http_response_code, fc::json::to_string(result)); \
          } \
          ......// exception handlers omitted from this excerpt
       }}
// The two macros below expand each API entry through CALL: both use the
// "chain" url segment; the read-only form binds ro_api and
// chain_apis::read_only, the read-write form binds rw_api and
// chain_apis::read_write.
#define CHAIN_RO_CALL(call_name, http_response_code) CALL(chain, ro_api, chain_apis::read_only, call_name, http_response_code)
#define CHAIN_RW_CALL(call_name, http_response_code) CALL(chain, rw_api, chain_apis::read_write, call_name, http_response_code)
// Plugin start-up: creates the implementation object from the chain plugin's
// chain, obtains the read-only and read-write API objects, and registers the
// chain endpoints with the HTTP plugin.
void chain_api_plugin::plugin_startup() {
   ilog( "starting chain_api_plugin" );
   my.reset(new chain_api_plugin_impl(app().get_plugin<chain_plugin>().chain()));
   // The names ro_api / rw_api are referenced by the CHAIN_RO_CALL /
   // CHAIN_RW_CALL macros and must not be renamed.
   auto ro_api = app().get_plugin<chain_plugin>().get_read_only_api();
   auto rw_api = app().get_plugin<chain_plugin>().get_read_write_api();
// Register the APIs; the second macro argument is the HTTP response code.
   app().get_plugin<http_plugin>().add_api({
      CHAIN_RO_CALL(get_info, 200),
      CHAIN_RO_CALL(get_block, 200),
      CHAIN_RO_CALL(get_account, 200),
      CHAIN_RO_CALL(get_code, 200),
      CHAIN_RO_CALL(get_table_rows, 200),
      CHAIN_RO_CALL(get_currency_balance, 200),
      CHAIN_RO_CALL(get_currency_stats, 200),
      CHAIN_RO_CALL(abi_json_to_bin, 200),
      CHAIN_RO_CALL(abi_bin_to_json, 200),
      CHAIN_RO_CALL(get_required_keys, 200),
      CHAIN_RW_CALL(push_block, 202),
      CHAIN_RW_CALL(push_transaction, 202),
      CHAIN_RW_CALL(push_transactions, 202)
   });
}



上面的api_handle.call_name展开后是:read_write::push_transaction_results read_write::push_transaction(),在chain_plugin.cpp文件中。

// Handler behind the push_transaction endpoint: converts the request into a
// packed_transaction, pushes it through `db`, and returns the transaction id
// together with the result converted to a variant.
// A conversion failure is rethrown as packed_transaction_type_exception with
// the message "Invalid packed transaction".
read_write::push_transaction_results read_write::push_transaction(const read_write::push_transaction_params& params) {
   packed_transaction pretty_input;
   auto resolver = make_resolver(this);
   try {
      abi_serializer::from_variant(params, pretty_input, resolver);
   } EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction")

   auto result = db.push_transaction(pretty_input, skip_flags);// the essential call; per the article, db is a chain_controller
#warning TODO: get transaction results asynchronously
   fc::variant pretty_output;
   abi_serializer::to_variant(result, pretty_output, resolver);
   return read_write::push_transaction_results{ result.id, pretty_output };
}



因为这个函数其实是调用的chain_controller的同名函数:

/**
 * Attempts to push the transaction into the pending queue
 *
 * When called to push a locally generated transaction, set the skip_block_size_check bit on the skip argument. This
 * will allow the transaction to be pushed even if it causes the pending block size to exceed the maximum block size.
 * Although the transaction will probably not propagate further now, as the peers are likely to have their pending
 * queues full as well, it will be kept in the queue to be propagated later when a new block flushes out the pending
 * queues.
 *
 * The actual work is done by _push_transaction, executed with the given skip
 * flags and while holding the database write lock. Exceptions are rethrown
 * through EOS_CAPTURE_AND_RETHROW( transaction_exception ).
 */
transaction_trace chain_controller::push_transaction(const packed_transaction& trx, uint32_t skip)
{ try {
   // If this is the first transaction pushed after applying a block, start a new undo session.
   // This allows us to quickly rewind to the clean state of the head block, in case a new block arrives.
   if( !_pending_block ) {
      _start_pending_block();
   }

   return with_skip_flags(skip, [&]() {
      return _db.with_write_lock([&]() {
         return _push_transaction(trx);
      });
   });
} EOS_CAPTURE_AND_RETHROW( transaction_exception ) }



看注释说得很清楚了,


如果是交易写入块后的第一个交易,是启动一个可撤销的Session,保证在新块到来时可以进行回滚:

// Sets up a fresh pending block. Must not be called while one already exists.
// Creates the block, its trace and an undo session, sizes both to one region,
// runs the on-block transaction in a cycle of its own, and opens the cycle
// that later transactions go into. Unless skip_deferred is set, deferred
// transactions are pushed as well, and another cycle is opened if that
// produced any transaction traces.
void chain_controller::_start_pending_block( bool skip_deferred )
{
   // Set up the pending block state.
   FC_ASSERT( !_pending_block );
   _pending_block         = signed_block();
   _pending_block_trace   = block_trace(*_pending_block);
   _pending_block_session = _db.start_undo_session(true);
   _pending_block->regions.resize(1);
   _pending_block_trace->region_traces.resize(1);

   // First cycle: holds only the on-block transaction.
   _start_pending_cycle();
   _apply_on_block_transaction();
   _finalize_pending_cycle();

   _start_pending_cycle();

   if ( !skip_deferred ) {
      _push_deferred_transactions( false );
      // Close the cycle only if the last shard actually recorded transactions.
      if (_pending_cycle_trace && _pending_cycle_trace->shard_traces.size() > 0 && _pending_cycle_trace->shard_traces.back().transaction_traces.size() > 0) {
         _finalize_pending_cycle();
         _start_pending_cycle();
      }
   }
}
// This function handles cycles; the one that follows it adds the on-block
// transaction.
/**
 *  Wraps up all work for current shards, starts a new cycle, and
 *  executes any pending transactions
 */
void chain_controller::_start_pending_cycle() {
   // only add a new cycle if there are no cycles or if the previous cycle isn't empty
   if (_pending_block->regions.back().cycles_summary.empty() ||
       (!_pending_block->regions.back().cycles_summary.back().empty() &&
        !_pending_block->regions.back().cycles_summary.back().back().empty()))
      _pending_block->regions.back().cycles_summary.resize( _pending_block->regions[0].cycles_summary.size() + 1 );


   // Start a new cycle trace containing a single shard trace (the current shard).
   _pending_cycle_trace = cycle_trace();

   _pending_cycle_trace->shard_traces.resize(_pending_cycle_trace->shard_traces.size() + 1 );

   // Make sure the last cycle summary ends with an empty shard entry.
   auto& bcycle = _pending_block->regions.back().cycles_summary.back();
   if(bcycle.empty() || !bcycle.back().empty())
      bcycle.resize( bcycle.size()+1 );
}
// Appends the on-block transaction to the pending block trace's implicit
// transactions, wraps it in metadata flagged as implicit, and pushes it.
void chain_controller::_apply_on_block_transaction()
{
   _pending_block_trace->implicit_transactions.emplace_back(_get_on_block_transaction());
   transaction_metadata mtrx(packed_transaction(_pending_block_trace->implicit_transactions.back()), get_chain_id(), head_block_time(), optional<time_point>(), true /*is implicit*/);
   _push_transaction(std::move(mtrx));
}
// Handles deferred transactions for the pending block.
//
// As excerpted here, only the "flush" pre-step is shown: when `flush` is set
// and the pending cycle trace has at least one shard trace, the traces are
// scanned for a deferred transaction whose execute_after time is not later
// than the head block time; the first hit finalizes the current cycle and
// starts a new one.
//
// NOTE(review): the declared return type is vector<transaction_trace>, but no
// return statement appears in this excerpt — presumably the remainder of the
// function was omitted by the article; confirm against the full source.
vector<transaction_trace> chain_controller::_push_deferred_transactions( bool flush )
{
   // A pending block must already exist.
   FC_ASSERT( \_pending_block, " block not started" );

   if (flush && \_pending_cycle_trace && \_pending_cycle_trace->shard_traces.size() > 0) {
      // TODO: when we go multithreaded this will need a better way to see if there are flushable
      // deferred transactions in the shards
      auto maybe_start_new_cycle = [&]() {
         // Walk shard traces -> transaction traces -> deferred requests.
         for (const auto &st: \_pending_cycle_trace->shard_traces) {
            for (const auto &tr: st.transaction_traces) {
               for (const auto &req: tr.deferred_transaction_requests) {
                  // Only requests holding a deferred_transaction are considered.
                  if ( req.contains<deferred_transaction>() ) {
                     const auto& dt = req.get<deferred_transaction>();
                     if ( fc::time_point(dt.execute_after) <= head_block_time() ) {
                        // force a new cycle and break out
                        _finalize_pending_cycle();
                        _start_pending_cycle();
                        return;
                     }
                  }
               }
            }
         }
      };

      maybe_start_new_cycle();
   }
 }



这里得重点看看下面这个函数:

// Closes the cycle currently being built in the pending block.
//
// Steps, in order:
//  1. drop a trailing empty shard (and its matching shard trace),
//  2. if that leaves the cycle empty, drop the cycle, reset the pending cycle
//     trace and return,
//  3. otherwise finalize every shard trace, copy its read/write locks into the
//     matching shard summary, apply the cycle trace, and move it into the
//     last region trace of the pending block trace.
void chain_controller::_finalize_pending_cycle()
{
   // prune empty shard
   if (!\_pending_block->regions.back().cycles_summary.empty() &&
       !\_pending_block->regions.back().cycles_summary.back().empty() &&
       \_pending_block->regions.back().cycles_summary.back().back().empty()) {
      \_pending_block->regions.back().cycles_summary.back().resize( \_pending_block->regions.back().cycles_summary.back().size() - 1 );
      \_pending_cycle_trace->shard_traces.resize(\_pending_cycle_trace->shard_traces.size() - 1 );
   }
   // prune empty cycle
   if (!\_pending_block->regions.back().cycles_summary.empty() &&
       \_pending_block->regions.back().cycles_summary.back().empty()) {
      \_pending_block->regions.back().cycles_summary.resize( \_pending_block->regions.back().cycles_summary.size() - 1 );
      \_pending_cycle_trace.reset();
      return;
   }

   // Use an unsigned index so the comparison against size() is not a
   // signed/unsigned mix.
   for( std::size_t idx = 0; idx < \_pending_cycle_trace->shard_traces.size(); ++idx ) {
      auto& trace = \_pending_cycle_trace->shard_traces.at(idx);
      auto& shard = \_pending_block->regions.back().cycles_summary.back().at(idx);

      trace.finalize_shard();

      // Reserve room for the elements already present plus the ones appended.
      shard.read_locks.reserve(shard.read_locks.size() + trace.read_locks.size());
      shard.read_locks.insert(shard.read_locks.end(), trace.read_locks.begin(), trace.read_locks.end());

      shard.write_locks.reserve(shard.write_locks.size() + trace.write_locks.size());
      shard.write_locks.insert(shard.write_locks.end(), trace.write_locks.begin(), trace.write_locks.end());
   }

   _apply_cycle_trace(*\_pending_cycle_trace);
   \_pending_block_trace->region_traces.back().cycle_traces.emplace_back(std::move(*\_pending_cycle_trace));
   \_pending_cycle_trace.reset();
}


这里遇到的问题是,没有找到Cycle的周期性增加,对块内的分片也因此不是非常清楚。


现在接着回到交易,看前面调用了_push_transaction, 它有两个重载,前面的重载会在函数内调用后面的重载函数,即:


// Entry point for pushing a packed transaction (excerpt; parts are elided
// with "......" by the article).
//
// The visible part dispatches on the transaction's delay: a zero delay goes to
// the transaction_metadata overload of _push_transaction, anything else is
// routed through delayed_transaction_processing. Afterwards the
// on_pending_transaction signal is raised and the packed transaction is
// appended to the pending block's input_transactions.
transaction_trace chain_controller::_push_transaction(const packed_transaction& packed_trx)
{ try {
......

   // Pick the processing path: immediate transactions vs. delayed transactions.
   if( mtrx.delay.count() == 0 ) {
      result = _push_transaction( std::move(mtrx) );
   } else {

      result = wrap_transaction_processing( std::move(mtrx),
                                            [this](transaction_metadata& meta) { return delayed_transaction_processing(meta); } );
   }

   // notify anyone listening to pending transactions
   // (per the article author: this eventually reaches the connections'
   //  enqueue -> queue_write -> do_queue_write path and broadcasts the message)
   on_pending_transaction(\_pending_transaction_metas.back(), packed_trx);

   \_pending_block->input_transactions.emplace_back(packed_trx);// append to the pending block
......

} FC_CAPTURE_AND_RETHROW( (transaction_header(packed_trx.get_transaction())) ) }

// Overload taking ready-made transaction metadata (excerpt; parts are elided
// with "......" by the article).
//
// Builds a lambda that stamps region/cycle/shard indices onto the metadata and
// applies the transaction, then runs that lambda through
// wrap_transaction_processing.
transaction_trace chain_controller::_push_transaction( transaction_metadata&& data )
{ try {
   auto process_apply_transaction = [this](transaction_metadata& meta) {
......
      /// TODO: move \_pending_cycle into db so that it can be undone if transation fails, for now we will apply
      /// the transaction first so that there is nothing to undo... this only works because things are currently
      /// single threaded
      // set cycle, shard, region etc
      // (region and shard are fixed to 0 here; cyclenum is defined in the elided part)
      meta.region_id = 0;
      meta.cycle_index = cyclenum;
      meta.shard_index = 0;
      return _apply_transaction( meta );// apply the transaction into the block
   };
 //  wdump((transaction_header(data.trx())));
   return wrap_transaction_processing( move(data), process_apply_transaction );
} FC_CAPTURE_AND_RETHROW( ) }



经过上面的处理之后,最后通过_apply_transaction把交易最终打入块中:

// Executes every action of a transaction and records the transaction.
//
// Context-free actions run first, then the regular actions; the traces of the
// applied actions are gathered into the returned transaction_trace. Regular
// actions may additionally contribute deferred transaction requests. Finally
// resource usage and permission usage are updated and the transaction is
// recorded.
transaction_trace chain_controller::__apply_transaction( transaction_metadata& meta )
{ try {
   transaction_trace result(meta.id);

   // Phase 1: context-free actions.
   for (const auto& cf_action : meta.trx().context_free_actions) {
      apply_context ctx(\*this, \_db, cf_action, meta);
      ctx.context_free = true;
      ctx.exec();
      fc::move_append(result.action_traces, std::move(ctx.results.applied_actions));
      // Nothing has been appended to the deferred requests at this point.
      FC_ASSERT( result.deferred_transaction_requests.size() == 0 );
   }

   // Phase 2: regular actions.
   for (const auto& action : meta.trx().actions) {
      apply_context ctx(\*this, \_db, action, meta);
      ctx.exec();

      // auths_used = declared authorizations minus the ones left unused.
      auto& last_applied = ctx.results.applied_actions.back();
      last_applied.auths_used = action.authorization.size() - ctx.unused_authorizations().size();

      fc::move_append(result.action_traces, std::move(ctx.results.applied_actions));
      fc::move_append(result.deferred_transaction_requests, std::move(ctx.results.deferred_transaction_requests));
   }

   update_resource_usage(result, meta);

   update_permission_usage(meta);
   record_transaction(meta.trx());
   return result;
} FC_CAPTURE_AND_RETHROW() }

// Wrapper around __apply_transaction (excerpt; parts are elided with "......"
// by the article).
//
// The visible part defines an `execute` lambda that opens an undo session on
// the database and then calls __apply_transaction inside a try/catch; what
// happens with the session and in the catch branch is not shown here.
transaction_trace chain_controller::_apply_transaction( transaction_metadata& meta ) { try {
   auto execute = [this](transaction_metadata& meta) -> transaction_trace {
      try {
         // Undo session opened before applying the transaction.
         auto temp_session = \_db.start_undo_session(true);
         auto result =  __apply_transaction(meta);
......
      } catch (...) {
......
      }
   };

......
} FC_CAPTURE_AND_RETHROW( (transaction_header(meta.trx())) ) }



交易完成后,就需要打包到块并进行广播了。这里只简单说一下,在介绍区块和共识时再详细说明:


在producer_plugin插件中:

// Startup hook of the producer plugin (excerpt; parts are elided with "......"
// by the article, which is why the braces below do not balance).
//
// Logs the start, fetches the chain controller from the chain plugin, and —
// when at least one producer is configured — kicks off the production loop.
void producer_plugin::plugin_startup()
{ try {
   ilog("producer plugin:  plugin_startup() begin");
   chain::chain_controller& chain = app().get_plugin<chain_plugin>().chain();

   // Production is only scheduled when the producer set is non-empty.
   if (!my->_producers.empty())
   {
......
      my->schedule_production_loop();
   } else
......
   } FC_CAPTURE_AND_RETHROW() }


// Arms the timer so that block_production_loop() runs after
// time_to_next_block_time microseconds (the computation of that value is
// elided with "......" by the article).
void producer_plugin_impl::schedule_production_loop() {
   //Schedule for the next second's tick regardless of chain state
   // If we would wait less than 50ms (1/10 of block_interval), wait for the whole block interval.
......

   //\_timer.expires_from_now(boost::posix_time::microseconds(time_to_next_block_time));
  \_timer.expires_from_now( boost::posix_time::microseconds(time_to_next_block_time) );
   //\_timer.async_wait(boost::bind(&producer_plugin_impl::block_production_loop, this));
   // NOTE(review): the handler ignores its error_code argument, so
   // block_production_loop() is invoked even when the wait completes with an
   // error — confirm that this is intended.
   \_timer.async_wait( [&](const boost::system::error_code&){ block_production_loop(); } );
}

// One iteration of the production loop (excerpt; parts are elided with
// "......" by the article).
//
// Tries to produce a block, tracks how many times in a row the same
// non-"produced" result was seen, handles the result in a switch when it
// changed (or when a block was produced), re-arms the loop and returns the
// result.
block_production_condition::block_production_condition_enum producer_plugin_impl::block_production_loop() {
   // NOTE(review): `result` has no initializer; it is only assigned inside the
   // try block. Whether the elided catch branch assigns or rethrows cannot be
   // seen here — confirm it is never read uninitialized.
   block_production_condition::block_production_condition_enum result;
   fc::mutable_variant_object capture;
   try
   {
      result = maybe_produce_block(capture);// attempt to produce a block
   }
   catch( const fc::canceled_exception& )
   {
    ......
   }

   // Same non-"produced" result as last time: only bump the repeat counter.
   if(result != block_production_condition::produced && result == \_prev_result) {
      \_prev_result_count++;
   }
   else {
      // Result changed (or a block was produced): restart the counter and
      // handle the result.
      \_prev_result_count = 1;
      \_prev_result = result;
      switch(result)
         {
         case block_production_condition::produced: {
            const auto& db = app().get_plugin<chain_plugin>().chain();
            auto producer  = db.head_block_producer();
......
            break;
         }
......
         }
   }
   schedule_production_loop();// re-arm the timer so the loop keeps running
   return result;
}

// Decides whether this node should produce a block right now and, if so,
// produces and broadcasts it (excerpt; parts are elided with "......" by the
// article).
//
// @param capture  receives diagnostic key/value pairs describing the outcome
// @return not_synced, not_time_yet, not_my_turn or produced in the visible
//         code; the elided part may return further conditions
block_production_condition::block_production_condition_enum producer_plugin_impl::maybe_produce_block(fc::mutable_variant_object& capture) {
   chain::chain_controller& chain = app().get_plugin<chain_plugin>().chain();
   fc::time_point now = fc::time_point::now();

   // Propagate the chain plugin's "skip transaction signatures" setting.
   if (app().get_plugin<chain_plugin>().is_skipping_transaction_signatures()) {
      \_production_skip_flags |= skip_transaction_signatures;
   }
   // If the next block production opportunity is in the present or future, we're synced.
   if( \!\_production_enabled )
   {
      if( chain.get_slot_time(1) >= now )
         \_production_enabled = true;
      else
         return block_production_condition::not_synced;
   }

   // is anyone scheduled to produce now or one second in the future?
   uint32_t slot = chain.get_slot_at_time( now );
   if( slot == 0 )
   {
      capture("next_time", chain.get_slot_time(1));
      return block_production_condition::not_time_yet;
   }

   //
   // this assert should not fail, because now <= db.head_block_time()
   // should have resulted in slot == 0.
   //
   // if this assert triggers, there is a serious bug in get_slot_at_time()
   // which would result in allowing a later block to have a timestamp
   // less than or equal to the previous block
   //
   assert( now > chain.head_block_time() );

   auto scheduled_producer = chain.get_scheduled_producer( slot );
   // we must control the producer scheduled to produce the next block.
   if( \_producers.find( scheduled_producer ) == \_producers.end() )
   {
      capture("scheduled_producer", scheduled_producer);
      return block_production_condition::not_my_turn;
   }

   // Look up the private key matching the scheduled producer's signing key.
   auto scheduled_time = chain.get_slot_time( slot );
   eosio::chain::public_key_type scheduled_key = chain.get_producer(scheduled_producer).signing_key;
   auto private_key_itr = \_private_keys.find( scheduled_key );

......

   // The block is generated here.
   // NOTE(review): private_key_itr is dereferenced below; the check against
   // \_private_keys.end() is presumably in the elided part — confirm.
   auto block = chain.generate_block(
      scheduled_time,
      scheduled_producer,
      private_key_itr->second,
      \_production_skip_flags
      );

   capture("n", block.block_num())("t", block.timestamp)("c", now)("count",block.input_transactions.size())("id",string(block.id()).substr(8,8));

   app().get_plugin<net_plugin>().broadcast_block(block);// broadcast the new block
   return block_production_condition::produced;
}



产生块的代码也比较简单,在chain_controller.cpp中:

// Public entry point for block generation.
//
// Runs _generate_block() with the `created_block` flag added to `skip`, while
// holding the database write lock, and returns the generated block.
signed_block chain_controller::generate_block(
   block_timestamp_type when,
   account_name producer,
   const private_key_type& block_signing_private_key,
   uint32_t skip /* = 0 */
   )
{ try {
   // Innermost step: delegate to the private implementation.
   auto produce = [&]() {
      return _generate_block( when, producer, block_signing_private_key );
   };

   // Middle step: run the production under the database write lock.
   auto produce_locked = [&]() {
      return \_db.with_write_lock( produce );
   };

   return with_skip_flags( skip | created_block, produce_locked );
} FC_CAPTURE_AND_RETHROW( (when) ) }

// Turns the pending block into a signed block and returns it.
//
// @param when               timestamp of the block; must be after the head block time
// @param producer           account expected to produce at `when`
// @param block_signing_key  private key used to sign the block
// @return the finished block
//
// On any exception the pending state is cleared, a new pending block is
// started, and the exception is rethrown.
signed_block chain_controller::_generate_block( block_timestamp_type when,
                                              account_name producer,
                                              const private_key_type& block_signing_key )
{ try {

   try {
     // Validate the inputs and look up the values needed below.
      FC_ASSERT( head_block_time() < (fc::time_point)when, "block must be generated at a timestamp after the head block time" );
      uint32_t skip     = \_skip_flags;
      uint32_t slot_num = get_slot_at_time( when );// slot corresponding to `when`
      FC_ASSERT( slot_num > 0 );
      account_name scheduled_producer = get_scheduled_producer( slot_num );// producer scheduled for that slot
      FC_ASSERT( scheduled_producer == producer );

      const auto& producer_obj = get_producer(scheduled_producer);

      // Create a pending block if none exists yet.
      if( !\_pending_block ) {
         _start_pending_block();
      }
         // Close the cycle that is currently being built.
      _finalize_pending_cycle();

      // Unless producer signatures are skipped, the supplied key must match
      // the producer's registered signing key.
      if( !(skip & skip_producer_signature) )
         FC_ASSERT( producer_obj.signing_key == block_signing_key.get_public_key(),
                    "producer key ${pk}, block key ${bk}", ("pk", producer_obj.signing_key)("bk", block_signing_key.get_public_key()) );

      // Fill in the header fields of the pending block.
      \_pending_block->timestamp   = when;
      \_pending_block->producer    = producer_obj.owner;
      \_pending_block->previous    = head_block_id();
      \_pending_block->block_mroot = get_dynamic_global_properties().block_merkle_root.get_root();
      \_pending_block->transaction_mroot = \_pending_block_trace->calculate_transaction_merkle_root();
      \_pending_block->action_mroot = \_pending_block_trace->calculate_action_merkle_root();

      // At the start of a round, publish a new producer schedule if it
      // differs from the head schedule.
      if( is_start_of_round( \_pending_block->block_num() ) ) {
         auto latest_producer_schedule = _calculate_producer_schedule();
         if( latest_producer_schedule != _head_producer_schedule() )
            \_pending_block->new_producers = latest_producer_schedule;
      }
      \_pending_block->schedule_version = get_global_properties().active_producers.version;

      if( !(skip & skip_producer_signature) )
         \_pending_block->sign( block_signing_key );

     // Finalize the block, then push the pending undo session.
      _finalize_block( *\_pending_block_trace, producer_obj );

      \_pending_block_session->push();

      // Move the block out before the pending state is cleared.
      auto result = move( *\_pending_block );

      clear_pending();

      if (!(skip&skip_fork_db)) {
         \_fork_db.push_block(result);// hand the block to the fork database
      }
      return result;
   } catch ( ... ) {
      clear_pending();

      elog( "error while producing block" );
      _start_pending_block();
      throw;
   }

} FC_CAPTURE_AND_RETHROW( (producer) ) }



交易基本就完成了,这块比较麻烦,而且有些细节其实资料和代码有些不匹配,留待后面继续解决。





转载自:https://github.com/XChainLab/documentation/edit/master/eos/eos%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B%E4%B8%89%E4%BA%A4%E6%98%93.md

EOS源码分析之二网络

# eos源码分析之二网络

一、网络的初始化和启动



P2P网络是区块链运行的基础模块,在EOS中,主要就是net_plugin,http_plugin,net_api_plugin,当然在这个过程中网络也会引用到其它的一些模块的接口,但为了清晰,重点介绍网络相关部分,其它略过。


首先看一下网络插件生成的时候的代码:

// Creates the implementation object and publishes a raw pointer to it.
net_plugin::net_plugin()
   : my( new net_plugin_impl )
{
   // my_impl is a plain (non-owning) pointer to the object owned by `my`.
   my_impl = my.get();
}



可以看到,它生成了一个net_plugin_impl的实例,真正的网络操作相关的代码其实在这个类中,看名字也可以明白,JAVA接口经常这么干。然后接着按Main函数中的初始化来看:

// Initializes the net plugin from the parsed program options (excerpt; parts
// are elided with "......" by the article).
//
// Visible steps: copy basic settings into the implementation object, create
// the sync and big-message managers, create the resolver, resolve the listen
// endpoint and create the acceptor, work out the advertised p2p address,
// parse the allowed-connection policy, look up the chain plugin, generate a
// random node id, and start the keepalive timer.
void net_plugin::plugin_initialize( const variables_map& options ) {
......// logging setup omitted

   // Basic parameters: network version, whether whole blocks are sent, etc.
   my->network_version = static_cast<uint16_t>(app().version());
   my->network_version_match = options.at("network-version-match").as<bool>();
   my->send_whole_blocks = def_send_whole_blocks;

   my->sync_master.reset( new sync_manager(options.at("sync-fetch-span").as<uint32_t>() ) );
   my->big_msg_master.reset( new big_msg_manager );

   my->connector_period = std::chrono::seconds(options.at("connection-cleanup-period").as<int>());
   my->txn_exp_period = def_txn_expire_wait;
   my->resp_expected_period = def_resp_expected_wait;
   my->big_msg_master->just_send_it_max = def_max_just_send;
   my->max_client_count = options.at("max-clients").as<int>();

   my->num_clients = 0;
   my->started_sessions = 0;

   // Boost resolver used to turn host/port strings into endpoints.
   my->resolver = std::make_shared<tcp::resolver>( std::ref( app().get_io_service() ) );

   // Apply the listen-endpoint option, if given.
   if(options.count("p2p-listen-endpoint")) {
      my->p2p_address = options.at("p2p-listen-endpoint").as< string >();
      // NOTE(review): if the option value contains no ':', find() returns npos,
      // `host` becomes the whole string and the substr() on the next line is
      // called with a start position past the end, which throws
      // std::out_of_range.
      auto host = my->p2p_address.substr( 0, my->p2p_address.find(':') );
      auto port = my->p2p_address.substr( host.size()+1, my->p2p_address.size() );
      idump((host)(port));
      tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() );
      // Note: need to add support for IPv6 too?
      // Resolve the address to listen on (first result is used).
      my->listen_endpoint = \*my->resolver->resolve( query);
      // Create the acceptor; it is opened later.
      my->acceptor.reset( new tcp::acceptor( app().get_io_service() ) );
   }
   if(options.count("p2p-server-address")) {
      my->p2p_address = options.at("p2p-server-address").as< string >();
   }
   else {
      // No explicit server address: when listening on the IPv4 "any" address,
      // advertise the local host name combined with the listen port.
      if(my->listen_endpoint.address().to_v4() == address_v4::any()) {
         boost::system::error_code ec;
         auto host = host_name(ec);
         if( ec.value() != boost::system::errc::success) {

            FC_THROW_EXCEPTION( fc::invalid_arg_exception,
                                "Unable to retrieve host_name. ${msg}",( "msg",ec.message()));

         }
         // NOTE(review): substr(find(':')) throws std::out_of_range when
         // p2p_address contains no ':' (e.g. when p2p-listen-endpoint was not
         // given and the address is empty) — confirm this cannot happen.
         auto port = my->p2p_address.substr( my->p2p_address.find(':'), my->p2p_address.size());
         my->p2p_address = host + port;
      }
   }
   ......
   // Connection policy: "any", "producers" and "specified" are OR-ed together,
   // whereas "none" overwrites whatever was accumulated so far.
   if(options.count("allowed-connection")) {
      const std::vector<std::string> allowed_remotes = options["allowed-connection"].as<std::vector<std::string>>();
      for(const std::string& allowed_remote : allowed_remotes)
         {
            if(allowed_remote == "any")
               my->allowed_connections |= net_plugin_impl::Any;
            else if(allowed_remote == "producers")
               my->allowed_connections |= net_plugin_impl::Producers;
            else if(allowed_remote == "specified")
               my->allowed_connections |= net_plugin_impl::Specified;
            else if(allowed_remote == "none")
               my->allowed_connections = net_plugin_impl::None;
         }
   }
......
   // Look up the chain plugin this plugin depends on.
   // NOTE(review): the result of find_plugin is dereferenced without a null
   // check — confirm it cannot be null at this point.
   my->chain_plug = app().find_plugin<chain_plugin>();
   my->chain_plug->get_chain_id(my->chain_id);
   // Random node id for this process.
   fc::rand_pseudo_bytes(my->node_id.data(), my->node_id.data_size());
   ilog("my node_id is ${id}",("id",my->node_id));
   // Create the keepalive timer and start ticking.
   my->keepalive_timer.reset(new boost::asio::steady_timer(app().get_io_service()));
   my->ticker();
}



初始化完成后,看一下启动的代码

// Starts the network plugin.
//
// If an acceptor was created during initialization, it is opened, bound to
// the listen endpoint and put into listening mode, and the accept loop is
// started. Afterwards the plugin subscribes to the chain's pending-transaction
// signal, starts the monitor timers, and connects to every supplied peer.
void net_plugin::plugin_startup() {
   if( my->acceptor ) {
      // Usual server-socket sequence: open, set options, bind, listen.
      my->acceptor->open(my->listen_endpoint.protocol());
      my->acceptor->set_option(tcp::acceptor::reuse_address(true));
      my->acceptor->bind(my->listen_endpoint);
      my->acceptor->listen();
      ilog("starting listener, max clients is ${mc}",("mc",my->max_client_count));
      my->start_listen_loop();// keep accepting incoming connections
   }

   // Get notified about pending transactions from the chain.
   my->chain_plug->chain().on_pending_transaction.connect( &net_plugin_impl::transaction_ready);
   my->start_monitors();// connection and transaction-expiry monitors

   // Connect to the configured seed nodes to join the P2P network.
   // Iterating by const reference avoids copying each address string.
   for( const auto& seed_node : my->supplied_peers ) {
      connect( seed_node );
   }
}



代码看上去很少,其实信息量真的不小。下面分别来说明。

二、网络的监听和接收



先看一下循环监听,写法和常见的实现不太一样,但达到的目的是一样的。

// Accepts one incoming connection asynchronously and, on success, re-arms
// itself to accept the next one.
//
// On an accept error the error is logged and the loop is not re-armed.
void net_plugin_impl::start_listen_loop( ) {
   auto socket = std::make_shared<tcp::socket>( std::ref( app().get_io_service() ) );

   acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) {
         if( ec ) {
            elog( "Error accepting connection: ${m}",( "m", ec.message() ) );
            return;
         }

         // Recount the connections that are current and have no peer address.
         uint32_t inbound = 0;
         for (auto &peer : connections) {
            if(peer->current() && peer->peer_addr.empty()) {
               ++inbound;
            }
         }

         // Resynchronize the cached client counter when it drifted.
         if (inbound != num_clients) {
            ilog ("checking max client, visitors = ${v} num clients ${n}",("v",inbound)("n",num_clients));
            num_clients = inbound;
         }

         // A limit of 0 means "unlimited".
         const bool has_capacity = max_client_count == 0 || num_clients < max_client_count;
         if( has_capacity ) {
            ++num_clients;
            connection_ptr c = std::make_shared<connection>( socket );
            connections.insert( c );
            start_session( c );
         } else {
            elog( "Error max_client_count ${m} exceeded",
                  ( "m", max_client_count) );
            socket->close( );
         }

         start_listen_loop();
      });
}
// Prepares a connected socket for use: enables TCP_NODELAY, starts the
// asynchronous read loop and bumps the started-session counter.
void net_plugin_impl::start_session( connection_ptr con ) {
   con->socket->set_option( boost::asio::ip::tcp::no_delay( true ) );

   start_read_message( con );
   ++started_sessions;

   // for now, we can just use the application main loop.
   //     con->readloop_complete  = bf::async( [=](){ read_loop( con ); } );
   //     con->writeloop_complete = bf::async( [=](){ write_loop con ); } );
}

其实上面的代码没什么特殊的,只是引用了BOOST的库,可能得熟悉一下,接着看如何读取消息,真正的数据交互在这里:

// Starts one asynchronous read on the connection's socket and processes every
// complete message that the read makes available (excerpt; parts are elided
// with "......" by the article).
//
// Framing as visible here: each message is a length header of
// message_header_size bytes followed by message_length bytes of payload.
void net_plugin_impl::start_read_message( connection_ptr conn ) {

   try {
      // Nothing to do for a connection without a socket.
      if(!conn->socket) {
         return;
      }
      // The actual asynchronous read.
      conn->socket->async_read_some
         (conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(),
          [this,conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {
            try {
               if( !ec ) {
                 // Log when more bytes arrived than the buffer had room for.
                  if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
                     elog("async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",
                          ("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()));
                  }
                  // ...and treat that situation as a hard error.
                  FC_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write());
                  conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
                  // Consume as many complete messages as the buffer holds.
                  while (conn->pending_message_buffer.bytes_to_read() > 0) {
                     uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();

                     // Not even a full header yet: wait for more data.
                     if (bytes_in_buffer < message_header_size) {
                        break;
                     } else {
                        // Peek at the length header without consuming it.
                        uint32_t message_length;
                        auto index = conn->pending_message_buffer.read_index();
                        conn->pending_message_buffer.peek(&message_length, sizeof(message_length), index);
                        // Oversized message: close the connection.
                        if(message_length > def_send_buffer_size*2) {
                           elog("incoming message length unexpected (${i})", ("i", message_length));
                           close(conn);
                           return;
                        }
                        if (bytes_in_buffer >= message_length + message_header_size) {
                           // Whole message available: skip the header and dispatch.
                           conn->pending_message_buffer.advance_read_ptr(message_header_size);
                           if (!conn->process_next_message(*this, message_length)) {
                              return;
                           }
                        } else {
                           // Partial message: make room for the missing bytes
                           // and wait for the next read.
                           conn->pending_message_buffer.add_space(message_length + message_header_size - bytes_in_buffer);
                           break;
                        }
                     }
                  }
                  start_read_message(conn);// schedule the next read
               } else {
                  // Read failed: EOF is logged as a normal close, anything
                  // else as an error; the connection is closed either way.
                  auto pname = conn->peer_name();
                  if (ec.value() != boost::asio::error::eof) {
                     elog( "Error reading message from ${p}: ${m}",("p",pname)( "m", ec.message() ) );
                  } else {
                     ilog( "Peer ${p} closed connection",("p",pname) );
                  }
                  close( conn );
               }
            }
            catch(const std::exception &ex) {
......
            }
......
         } );
   } catch (...) {
......
   }
}

/*
 *  Builds the list of writable regions to hand to boost async_read() /
 *  async_read_some().
 *  The first entry starts at the current write position inside the buffer
 *  that holds it; every buffer after that one is offered in full.
 *  After the read completes, the caller is expected to advance the write
 *  pointer by the number of bytes read.
 */
std::vector<boost::asio::mutable_buffer> get_buffer_sequence_for_boost_async_read() {
  FC_ASSERT(write_ind.first < buffers.size());

  std::vector<boost::asio::mutable_buffer> seq;

  // Remainder of the buffer that currently contains the write position.
  seq.push_back(boost::asio::buffer(&buffers[write_ind.first]->at(write_ind.second),
                                    buffer_len - write_ind.second));

  // All following buffers, each from its first byte.
  std::size_t next = write_ind.first + 1;
  while (next < buffers.size()) {
    seq.push_back(boost::asio::buffer(&buffers[next]->at(0), buffer_len));
    ++next;
  }
  return seq;
}


三、网络的连接



处理完成监听和接收,来看一下主动连接:

void net_plugin_impl::start_monitors() {
   connector_check.reset(new boost::asio::steady_timer( app().get_io_service()));
   transaction_check.reset(new boost::asio::steady_timer( app().get_io_service()));
   start_conn_timer();//调用两个函数
   start_txn_timer();
}
//调用的start_conn_timer
void net_plugin_impl::start_conn_timer( ) {
   connector_check->expires_from_now( connector_period);// 设置定时器
   connector_check->async_wait( [&](boost::system::error_code ec) {
         if( !ec) {
            connection_monitor( );//调用连接监控
         }
         else {
            elog( "Error from connection check monitor: ${m}",( "m", ec.message()));
            start_conn_timer( );
         }
      });
}
// Periodic connection housekeeping.
//
// Re-arms the connection timer, then walks all connections: closed ones that
// are not currently connecting are either reconnected (when they have a peer
// address) or collected for removal; the remaining ones without a peer
// address are counted into num_clients. Collected connections are erased
// afterwards.
void net_plugin_impl::connection_monitor( ) {
   start_conn_timer();

   vector <connection_ptr> discards;
   num_clients = 0;

   for( auto &conn : connections ) {
      const bool is_down = !conn->socket->is_open() && !conn->connecting;
      if( !is_down ) {
         if( conn->peer_addr.empty() ) {
            num_clients++;
         }
         continue;
      }

      if( conn->peer_addr.length() > 0 ) {
         connect( conn );
      } else {
         discards.push_back( conn );
      }
   }

   // Removal happens after the walk so the container is not modified while
   // it is being iterated.
   for( auto &dead : discards ) {
      connections.erase( dead );
      dead.reset();
   }
}
//交易的定时器监视
void net_plugin_impl::start_txn_timer() {
   transaction_check->expires_from_now( txn_exp_period);
   transaction_check->async_wait( [&](boost::system::error_code ec) {
         if( !ec) {
            expire_txns( );//处理到期交易的情况
         }
         else {
            elog( "Error from transaction check monitor: ${m}",( "m", ec.message()));
            start_txn_timer( );
         }
      });
}
// Drops stale entries from local_txns and re-arms the transaction timer.
//
// Two ranges are erased: entries in the by_expiry index between time 0 and
// now, and entries in the by_block_num index between block 1 and the chain's
// last irreversible block number.
void net_plugin_impl::expire_txns() {
   start_txn_timer( );

   // Range in the expiry index: [time 0, now].
   auto &expiry_idx = local_txns.get<by_expiry>();
   auto expired_end   = expiry_idx.upper_bound( time_point::now() );
   auto expired_begin = expiry_idx.lower_bound( fc::time_point_sec( 0 ) );
   expiry_idx.erase( expired_begin, expired_end );

   // Range in the block-number index: [1, last irreversible block].
   auto &block_idx = local_txns.get<by_block_num>();
   const uint32_t lib_num = chain_plug->chain().last_irreversible_block_num();
   auto settled_end   = block_idx.upper_bound( lib_num );
   auto settled_begin = block_idx.lower_bound( 1 );
   block_idx.erase( settled_begin, settled_end );
}



最后看一看连接的代码:


/**
 *  Used to trigger a new connection from RPC API.
 *
 *  Returns "already connected" when a connection for `host` is already
 *  known; otherwise registers a new connection, starts connecting and
 *  returns "added connection".
 */
string net_plugin::connect( const string& host ) {
   const bool already_known = static_cast<bool>( my->find_connection( host ) );
   if( already_known ) {
      return "already connected";
   }

   connection_ptr fresh = std::make_shared<connection>(host);

   fc_dlog(my->logger,"adding new connection to the list");
   my->connections.insert( fresh );

   fc_dlog(my->logger,"calling active connector");
   my->connect( fresh );

   return "added connection";
}
// First of the two connect overloads: splits the peer address into host and
// port and resolves it asynchronously; the second overload performs the
// actual connect once resolution succeeds.
void net_plugin_impl::connect( connection_ptr c ) {
   // A connection that was told to go away is not retried.
   if( c->no_retry != go_away_reason::no_reason) {
      fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( c->no_retry )));
      return;
   }

   // The address must look like "host:port" with a non-empty host.
   const auto sep = c->peer_addr.find(':');
   const bool malformed = (sep == std::string::npos) || (sep == 0);
   if (malformed) {
      elog ("Invalid peer address. must be \"host:port\": ${p}", ("p",c->peer_addr));
      return;
   }

   auto host = c->peer_addr.substr( 0, sep );
   auto port = c->peer_addr.substr( sep + 1);
   idump((host)(port));
   tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() );
   // Note: need to add support for IPv6 too

   auto on_resolved = [c, this]( const boost::system::error_code& err,
                                 tcp::resolver::iterator endpoint_itr ) {
      if( err ) {
         elog( "Unable to resolve ${peer_addr}: ${error}",
               (  "peer_addr", c->peer_name() )("error", err.message() ) );
         return;
      }
      connect( c, endpoint_itr );
   };
   resolver->async_resolve( query, on_resolved );
}

// Second connect overload: tries the endpoint the iterator points at and, on
// failure, falls through to the next resolved endpoint.
//
// On success the session is started and a handshake is sent. When the last
// endpoint has failed, the error is logged, the connecting flag is cleared
// and the connection is closed.
void net_plugin_impl::connect( connection_ptr c, tcp::resolver::iterator endpoint_itr ) {
   // A connection that was told to go away is not retried.
   // (The original computed an unused reason string here; it was dropped.)
   if( c->no_retry != go_away_reason::no_reason) {
      return;
   }
   auto current_endpoint = \*endpoint_itr;
   ++endpoint_itr; // the handler below continues with the next endpoint
   c->connecting = true;
   c->socket->async_connect( current_endpoint, [c, endpoint_itr, this] ( const boost::system::error_code& err ) {
         if( !err ) {
            start_session( c );// begin reading from the peer
            c->send_handshake ();// introduce ourselves
         } else {
            if( endpoint_itr != tcp::resolver::iterator() ) {
               // More endpoints left: close this attempt and try the next one.
               c->close();
               connect( c, endpoint_itr );
            }
            else {
               elog( "connection failed to ${peer}: ${error}",
                     ( "peer", c->peer_name())("error",err.message()));
               c->connecting = false;
               my_impl->close(c);
            }
         }
      } );
}


四、网络的数据同步



在前面看了start_read_message,对内部没有怎么做细节的分析,网络也启动了,节点也发现了,那么P2P的职责开始实现了,首先就是同步数据,和比特币类似,也有一个中心的消息处理系统,名字都有点像。

// Unpacks the next message from the pending buffer and dispatches it to the
// matching net_plugin_impl::handle_message overload.
//
// @param impl            plugin implementation that receives the message
// @param message_length  payload length of the message in bytes
// @return true on success; false when an fc::exception was raised, in which
//         case the connection has been closed
bool connection::process_next_message(net_plugin_impl& impl, uint32_t message_length) {
   try {
      // If it is a signed_block, then save the raw message for the cache
      // This must be done before we unpack the message.
      // This code is copied from fc::io::unpack(..., unsigned_int)
      //
      // The loop decodes the variable-length message tag without consuming it.
      // It is bounded to 32 bits: the data comes from the peer, and without
      // the bound a run of continuation bytes would shift a 32-bit value by
      // 32 or more, which is undefined behavior.
      // presumably peek() advances `tag_index` past the bytes it copies — confirm
      auto tag_index = pending_message_buffer.read_index();
      uint64_t which = 0; char b = 0; uint8_t by = 0;
      do {
         pending_message_buffer.peek(&b, 1, tag_index);
         which |= uint32_t(uint8_t(b) & 0x7f) << by;
         by += 7;
      } while( (uint8_t(b) & 0x80) && by < 32 );

      if (which == uint64_t(net_message::tag<signed_block>::value)) {
         // Keep a raw copy of the whole block message.
         blk_buffer.resize(message_length);
         auto blk_index = pending_message_buffer.read_index();
         pending_message_buffer.peek(blk_buffer.data(), message_length, blk_index);
      }
      auto ds = pending_message_buffer.create_datastream();
      net_message msg;
      fc::raw::unpack(ds, msg);
      msgHandler m(impl, shared_from_this() );
      msg.visit(m);// invokes msgHandler::operator() for the concrete message type
   } catch(  const fc::exception& e ) {
      edump((e.to_detail_string() ));
      impl.close( shared_from_this() );
      return false;
   }
   return true;
}

// Visitor used to dispatch an unpacked net_message: its call operator
// forwards the concrete message, together with the connection it arrived on,
// to net_plugin_impl::handle_message.
struct msgHandler : public fc::visitor<void> {
   net_plugin_impl &impl;
   connection_ptr c;

   msgHandler( net_plugin_impl &plugin, connection_ptr origin )
      : impl(plugin)
      , c(origin)
   {}

   template <typename T>
   void operator()(const T &message) const
   {
      impl.handle_message( c, message );
   }
};



下面开始调用分发函数:


/**
 * Handle a handshake_message received on connection c.
 *
 * A handshake that fails is_valid() is answered with go_away(fatal_other).
 * For a first-generation handshake (msg.generation == 1) the peer is also
 * checked for: self connection, duplicate connection, chain id, network
 * version, authentication and a forked last irreversible block. Each failed
 * check enqueues a go_away_message on c and returns.
 * When nothing rejects the peer, msg is stored in c->last_handshake_recv and
 * passed on to sync_master->recv_handshake().
 *
 * @param c    connection the handshake arrived on
 * @param msg  the received handshake
 */
void net_plugin_impl::handle_message( connection_ptr c, const handshake_message &msg) {
   fc_dlog( logger, "got a handshake_message from ${p} ${h}", ("p",c->peer_addr)("h",msg.p2p_address));
   if (!is_valid(msg)) {
      elog( "Invalid handshake message received from ${p} ${h}", ("p",c->peer_addr)("h",msg.p2p_address));
      c->enqueue( go_away_message( fatal_other ));
      return;
   }
   chain_controller& cc = chain_plug->chain();
   uint32_t lib_num = cc.last_irreversible_block_num( );
   uint32_t peer_lib = msg.last_irreversible_block_num;
   // A handshake has arrived, so clear the connecting flag.
   if( c->connecting ) {
      c->connecting = false;
   }
   // The checks in this branch run only for a generation-1 handshake.
   if (msg.generation == 1) {
      // Same node id as ours: we connected to ourselves.
      if( msg.node_id == node_id) {
         elog( "Self connection detected. Closing connection");
         c->enqueue( go_away_message( self ) );
         return;
      }

      // Duplicate detection is done only when this connection has no peer address
      // or has not recorded a handshake yet (node_id equals a default fc::sha256).
      if( c->peer_addr.empty() || c->last_handshake_recv.node_id == fc::sha256()) {
         fc_dlog(logger, "checking for duplicate" );
         for(const auto &check : connections) {
            if(check == c)
               continue;
            if(check->connected() && check->peer_name() == msg.p2p_address) {
               // It's possible that both peers could arrive here at relatively the same time, so
               // we need to avoid the case where they would both tell a different connection to go away.
               // Using the sum of the initial handshake times of the two connections, we will
               // arbitrarily (but consistently between the two peers) keep one of them.
               if (msg.time + c->last_handshake_sent.time <= check->last_handshake_sent.time + check->last_handshake_recv.time)
                  continue;

               fc_dlog( logger, "sending go_away duplicate to ${ep}", ("ep",msg.p2p_address) );
               go_away_message gam(duplicate);
               gam.node_id = node_id;
               c->enqueue(gam);
               c->no_retry = duplicate;
               return;
            }
         }
      }
      else {
         fc_dlog(logger, "skipping duplicate check, addr == ${pa}, id = ${ni}",("pa",c->peer_addr)("ni",c->last_handshake_recv.node_id));
      }

      // The peer must be on the same chain.
      if( msg.chain_id != chain_id) {
         elog( "Peer on a different chain. Closing connection");
         c->enqueue( go_away_message(go_away_reason::wrong_chain) );
         return;
      }
      // A version mismatch rejects the peer only when network_version_match is set;
      // otherwise it is just logged as a warning.
      if( msg.network_version != network_version) {
         if (network_version_match) {
            elog("Peer network version does not match expected ${nv} but got ${mnv}",
                 ("nv", network_version)("mnv", msg.network_version));
            c->enqueue(go_away_message(wrong_version));
            return;
         } else {
            wlog("Peer network version does not match expected ${nv} but got ${mnv}",
                 ("nv", network_version)("mnv", msg.network_version));
         }
      }

      // Record the node id announced by the peer on the connection.
      if(  c->node_id != msg.node_id) {
         c->node_id = msg.node_id;
      }

      if(!authenticate_peer(msg)) {
         elog("Peer not authenticated.  Closing connection.");
         c->enqueue(go_away_message(authentication));
         return;
      }

      bool on_fork = false;
      fc_dlog(logger, "lib_num = ${ln} peer_lib = ${pl}",("ln",lib_num)("pl",peer_lib));

      // Fork check: only when the peer's last irreversible block number is non-zero and
      // not ahead of ours. The id we hold for that block number is compared with the id
      // the peer reports; any exception while looking it up also counts as a fork.
      if( peer_lib <= lib_num && peer_lib > 0) {
         try {
            block_id_type peer_lib_id =  cc.get_block_id_for_num( peer_lib);
            on_fork =( msg.last_irreversible_block_id != peer_lib_id);
         }
         catch( const unknown_block_exception &ex) {
            wlog( "peer last irreversible block ${pl} is unknown", ("pl", peer_lib));
            on_fork = true;
         }
         catch( ...) {
            wlog( "caught an exception getting block id for ${pl}",("pl",peer_lib));
            on_fork = true;
         }
         if( on_fork) {
            elog( "Peer chain is forked");
            c->enqueue( go_away_message( forked ));
            return;
         }
      }

      // Send our own handshake if none has been sent on this connection yet.
      if (c->sent_handshake_count == 0) {
         c->send_handshake();
      }
   }

   // Remember the handshake, then hand it to the sync manager.
   c->last_handshake_recv = msg;
   sync_master->recv_handshake(c,msg); // synchronization starts here
}
// Decide, from a peer's handshake, how the local chain and the peer's chain relate.
// Of the visible cases: identical head ids send the peer a notice_message with the
// count of local pending transactions; a local head behind peer_lib starts a sync.
// NOTE(review): the "......" lines mark code elided from this excerpt; peer_lib is
// presumably defined in the elided part - confirm against the full source.
void sync_manager::recv_handshake (connection_ptr c, const handshake_message &msg) {
   chain_controller& cc = chain_plug->chain();
......
   //--------------------------------
   // sync need checks; (lib == last irreversible block)
   //
   // 0. my head block id == peer head id means we are all caught up block wise
   // 1. my head block num < peer lib - start sync locally
   // 2. my lib > peer head num - send a last_irr_catch_up notice if not the first generation
   //
   // 3  my head block num <= peer head block num - update sync state and send a catchup request
   // 4  my head block num > peer block num - send a notice catchup if this is not the first generation
   //
   //-----------------------------

   uint32_t head = cc.head_block_num( );
   block_id_type head_id = cc.head_block_id();
   // Case 0: both sides have the same head block id.
   if (head_id == msg.head_id) {
      fc_dlog(logger, "sync check state 0");
      // notify peer of our pending transactions
      notice_message note;
      note.known_blocks.mode = none;
      note.known_trx.mode = catch_up;
      note.known_trx.pending = my_impl->local_txns.size();
      c->enqueue( note );
      return;
   }
   // Case 1: our head block number is below peer_lib.
   if (head < peer_lib) {
      fc_dlog(logger, "sync check state 1");
      start_sync( c, peer_lib); // synchronize
      return;
   }
......
}




再深入的细节就不再分析了,就是基本的数据交互通信。

转载自:https://github.com/XChainLab/documentation/blob/master/eos/eos%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B%E4%BA%8C%E7%BD%91%E7%BB%9C.md