您正在查看: Surou 发布的文章

使用OpenZeppelin编写可升级合约

为了方便社区新手入门,本文尽可能的详细

准备合约编译环境

测试系统:WSL ubuntu 20.04
node: v10.19.0
npm: 6.14.4

创建环境工程

mkdir myContract
cd myContract
sudo npm i -g truffle
truffle init

此时目录下会生成truffle工程相关文件

contracts  migrations  test  truffle-config.js

编写测试合约

需要部署三个合约,分别是

  1. Parms(逻辑合约)
  2. ProxyAdmin(管理合约)
  3. TransparentUpgradeProxy(代理合约,DAPP直接交互的合约地址)

Params合约代码

myContract/contract/Parms.sol文件,并输入

// SPDX-License-Identifier: MIT

pragma solidity ^0.8.0;

import "@openzeppelin/contracts-upgradeable/proxy/utils/Initializable.sol";
import "@openzeppelin/contracts-upgradeable/access/OwnableUpgradeable.sol";

contract Params is Initializable,OwnableUpgradeable {
    function initialize()public initializer{
        __Context_init_unchained();
        __Ownable_init_unchained();
    }
    mapping(string => uint256) private uint256Params;

    event Uint256ParamSetted(string indexed _key,uint256 _value);

    function SetUint256Param(string memory _key,uint256 _value) external onlyOwner{
        uint256Params[_key] = _value;
        emit Uint256ParamSetted(_key,_value);
    }


    function GetUint256Param(string memory _key)public view returns(uint256){
        return uint256Params[_key];
    }
}

ProxyAdmin合约代码

myContract/contract/ProxyAdmin.sol文件,并输入

// SPDX-License-Identifier: MIT

pragma solidity ^0.8.0;

import "@openzeppelin/contracts/proxy/transparent/TransparentUpgradeableProxy.sol";
import "@openzeppelin/contracts/access/Ownable.sol";

/**
 * @dev This is an auxiliary contract meant to be assigned as the admin of a {TransparentUpgradeableProxy}. For an
 * explanation of why you would want to use this see the documentation for {TransparentUpgradeableProxy}.
 */
contract ProxyAdmin is Ownable {
    /**
     * @dev Returns the current implementation of `proxy`.
     *
     * Requirements:
     *
     * - This contract must be the admin of `proxy`.
     */
    function getProxyImplementation(TransparentUpgradeableProxy proxy) public view virtual returns (address) {
        // We need to manually run the static call since the getter cannot be flagged as view
        // bytes4(keccak256("implementation()")) == 0x5c60da1b
        (bool success, bytes memory returndata) = address(proxy).staticcall(hex"5c60da1b");
        require(success);
        return abi.decode(returndata, (address));
    }

    /**
     * @dev Returns the current admin of `proxy`.
     *
     * Requirements:
     *
     * - This contract must be the admin of `proxy`.
     */
    function getProxyAdmin(TransparentUpgradeableProxy proxy) public view virtual returns (address) {
        // We need to manually run the static call since the getter cannot be flagged as view
        // bytes4(keccak256("admin()")) == 0xf851a440
        (bool success, bytes memory returndata) = address(proxy).staticcall(hex"f851a440");
        require(success);
        return abi.decode(returndata, (address));
    }

    /**
     * @dev Changes the admin of `proxy` to `newAdmin`.
     *
     * Requirements:
     *
     * - This contract must be the current admin of `proxy`.
     */
    function changeProxyAdmin(TransparentUpgradeableProxy proxy, address newAdmin) public virtual onlyOwner {
        proxy.changeAdmin(newAdmin);
    }

    /**
     * @dev Upgrades `proxy` to `implementation`. See {TransparentUpgradeableProxy-upgradeTo}.
     *
     * Requirements:
     *
     * - This contract must be the admin of `proxy`.
     */
    function upgrade(TransparentUpgradeableProxy proxy, address implementation) public virtual onlyOwner {
        proxy.upgradeTo(implementation);
    }

    /**
     * @dev Upgrades `proxy` to `implementation` and calls a function on the new implementation. See
     * {TransparentUpgradeableProxy-upgradeToAndCall}.
     *
     * Requirements:
     *
     * - This contract must be the admin of `proxy`.
     */
    function upgradeAndCall(
        TransparentUpgradeableProxy proxy,
        address implementation,
        bytes memory data
    ) public payable virtual onlyOwner {
        proxy.upgradeToAndCall{value: msg.value}(implementation, data);
    }
}

TransparentUpgradeableProxy 代理合约代码

myContract/contract/TransparentUpgradeProxy.sol文件,并输入

// SPDX-License-Identifier: MIT

pragma solidity ^0.8.0;

import "@openzeppelin/contracts/proxy/ERC1967/ERC1967Proxy.sol";

/**
 * @dev This contract implements a proxy that is upgradeable by an admin.
 *
 * To avoid https://medium.com/nomic-labs-blog/malicious-backdoors-in-ethereum-proxies-62629adf3357[proxy selector
 * clashing], which can potentially be used in an attack, this contract uses the
 * https://blog.openzeppelin.com/the-transparent-proxy-pattern/[transparent proxy pattern]. This pattern implies two
 * things that go hand in hand:
 *
 * 1. If any account other than the admin calls the proxy, the call will be forwarded to the implementation, even if
 * that call matches one of the admin functions exposed by the proxy itself.
 * 2. If the admin calls the proxy, it can access the admin functions, but its calls will never be forwarded to the
 * implementation. If the admin tries to call a function on the implementation it will fail with an error that says
 * "admin cannot fallback to proxy target".
 *
 * These properties mean that the admin account can only be used for admin actions like upgrading the proxy or changing
 * the admin, so it's best if it's a dedicated account that is not used for anything else. This will avoid headaches due
 * to sudden errors when trying to call a function from the proxy implementation.
 *
 * Our recommendation is for the dedicated account to be an instance of the {ProxyAdmin} contract. If set up this way,
 * you should think of the `ProxyAdmin` instance as the real administrative interface of your proxy.
 */
contract TransparentUpgradeableProxy is ERC1967Proxy {
    /**
     * @dev Initializes an upgradeable proxy managed by `_admin`, backed by the implementation at `_logic`, and
     * optionally initialized with `_data` as explained in {ERC1967Proxy-constructor}.
     */
    constructor(
        address _logic,
        address admin_,
        bytes memory _data
    ) payable ERC1967Proxy(_logic, _data) {
        assert(_ADMIN_SLOT == bytes32(uint256(keccak256("eip1967.proxy.admin")) - 1));
        _changeAdmin(admin_);
    }

    /**
     * @dev Modifier used internally that will delegate the call to the implementation unless the sender is the admin.
     */
    modifier ifAdmin() {
        if (msg.sender == _getAdmin()) {
            _;
        } else {
            _fallback();
        }
    }

    /**
     * @dev Returns the current admin.
     *
     * NOTE: Only the admin can call this function. See {ProxyAdmin-getProxyAdmin}.
     *
     * TIP: To get this value clients can read directly from the storage slot shown below (specified by EIP1967) using the
     * https://eth.wiki/json-rpc/API#eth_getstorageat[`eth_getStorageAt`] RPC call.
     * `0xb53127684a568b3173ae13b9f8a6016e243e63b6e8ee1178d6a717850b5d6103`
     */
    function admin() external ifAdmin returns (address admin_) {
        admin_ = _getAdmin();
    }

    /**
     * @dev Returns the current implementation.
     *
     * NOTE: Only the admin can call this function. See {ProxyAdmin-getProxyImplementation}.
     *
     * TIP: To get this value clients can read directly from the storage slot shown below (specified by EIP1967) using the
     * https://eth.wiki/json-rpc/API#eth_getstorageat[`eth_getStorageAt`] RPC call.
     * `0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc`
     */
    function implementation() external ifAdmin returns (address implementation_) {
        implementation_ = _implementation();
    }

    /**
     * @dev Changes the admin of the proxy.
     *
     * Emits an {AdminChanged} event.
     *
     * NOTE: Only the admin can call this function. See {ProxyAdmin-changeProxyAdmin}.
     */
    function changeAdmin(address newAdmin) external virtual ifAdmin {
        _changeAdmin(newAdmin);
    }

    /**
     * @dev Upgrade the implementation of the proxy.
     *
     * NOTE: Only the admin can call this function. See {ProxyAdmin-upgrade}.
     */
    function upgradeTo(address newImplementation) external ifAdmin {
        _upgradeToAndCall(newImplementation, bytes(""), false);
    }

    /**
     * @dev Upgrade the implementation of the proxy, and then call a function from the new implementation as specified
     * by `data`, which should be an encoded function call. This is useful to initialize new storage variables in the
     * proxied contract.
     *
     * NOTE: Only the admin can call this function. See {ProxyAdmin-upgradeAndCall}.
     */
    function upgradeToAndCall(address newImplementation, bytes calldata data) external payable ifAdmin {
        _upgradeToAndCall(newImplementation, data, true);
    }

    /**
     * @dev Returns the current admin.
     */
    function _admin() internal view virtual returns (address) {
        return _getAdmin();
    }

    /**
     * @dev Makes sure the admin cannot access the fallback function. See {Proxy-_beforeFallback}.
     */
    function _beforeFallback() internal virtual override {
        require(msg.sender != _getAdmin(), "TransparentUpgradeableProxy: admin cannot fallback to proxy target");
        super._beforeFallback();
    }
}

修改truffle solc版本为0.8.0

打开truffle-config.js,修改solc版本

compilers: {
    solc: {
      version: "^0.8.0"
    ...

编译合约

首次需要安装依赖

npm i --save @openzeppelin/contracts-upgradeable
npm i --save @openzeppelin/contracts

编译合约

truffle complite

部署合约

等待完成后,会在build\contracts找到对应的编译后的合约
如何使用编译后的文件进行合约部署,参考《MyEtherWallet使用》

Params合约部署完的合约地址:0x808189EB5932Cfe710A849752CDb54F5fb1b85DA
ProxyAdmin合约部署完的合约地址:0x9b05f1F378B52011215464f4Ad4666BC80B7bcA5

部署TransparentUpgradeableProxy时,
LOGIC:逻辑合约地址,这里为 0x808189EB5932Cfe710A849752CDb54F5fb1b85DA
ADMIN
:管理合约地址,这里为 0x9b05f1F378B52011215464f4Ad4666BC80B7bcA5
_DATA:逻辑合约初始化方法调用数据,这里为0x8129fc1c(只调用initialize方法,initialize方法没有入参,如果有参数也是支持的)

0x8129fc1c由以下获得

web3js.eth.abi.encodeFunctionCall({
    name: 'initialize',
    type: 'function',
    inputs: []
}, []);
>0x8129fc1c

部署后的地址是:0x95e19C3609DE02291840CE9093c75e233cF2Cf08

此时用MyEtherWallet执行合约地址:0x95e19C3609DE02291840CE9093c75e233cF2Cf08,使用Params合约的abi执行
调用SetUint256Param,设置_key=K,_VALUE=1,并通过GetUint256Param进行验证
此时完成了基本的通过TransparentUpgradeableProxy代理合约调用Params逻辑合约的过程。

下面开始演示通过ProxyAdmin合约将Params逻辑合约升级

升级Params逻辑合约

修改Params合约,并部署

function GetUint256Param(string memory _key)public view returns(uint256){
    uint256 v = uint256Params[_key];
    return v+1;
}

新部署后的地址为0xBf72f7C1e39B76D7A5b702E16251Fdd132Cd6618

调用ProxyAdmin进行升级

ProxyAdmin提供两个方法进行升级

  1. upgrade,需要传入proxy地址,新的逻辑实现地址
  2. upgradeAndCall,需要传入roxy地址,新的逻辑实现地址,初始化调用数据

本例中,由于数据是保存在代理合约中,这份数据已经初始化过了,不需要再初始化,所以调用upgrade方法即可,参数如下:

  • proxy: 0x95e19C3609DE02291840CE9093c75e233cF2Cf08
  • implementation: 0xBf72f7C1e39B76D7A5b702E16251Fdd132Cd6618

至此,合约升级完毕。调用GetUint256Param方法进行验证,获得_key=K的value为2,合约升级成功。

参考

https://docs.openzeppelin.com/contracts/3.x/api/proxy
https://www.cnblogs.com/cqvoip/p/15033402.html
https://docs.openzeppelin.com/upgrades-plugins/1.x/

https://medium.com/@kenschiller/making-ethereum-smart-contracts-upgradable-aa16a4256d32

以太坊交易池架构设计

当前以太坊公链的平均每秒能处理30到40笔交易,因此以太坊一旦出现火热的DAPP时,极易出现交易拥堵。

偏底的交易处理速度永远无法同现有的中心化服务相比。当网络中出现大量交易排队时,矿工是如何选择并管理这些交易的呢?答案在本篇所介绍的以太坊交易池中,如果你对交易还不特别熟悉,则请先阅读 [以太坊交易]({{< ref "part1/transaction.md" >}})。

交易处理流程

当你通过以太坊钱包,发送一笔转账交易给张三时。这笔交易是如何进入网络,最终被矿工打包到区块中呢?

下图是一笔交易从出生到交易进入区块的关键流程。

transaction-life

首先,用户可通过以太坊钱包或者其他调用以太坊节点API (eth_sendRawTransaction等)发送交易到一个运行中的以太坊 geth 节点。

此时,因为交易时通过节点的API接收,因此此交易被视为一笔来自本地(local)(图中用红球表示),在经过一系列校验和处理后。交易成功进入交易池,随后向已连接的邻近节点发送此交易。

当邻近节点,如矿工节点从邻近节点接收到此交易时,在进入交易池之前。会将交易标记为来自远方(remote)的交易(图中用绿球表示)。也需要经过校验和处理后,进入矿工节点的交易池,等待矿工打包到区块中。

如果邻近节点,不是矿工,也无妨。因为任何节点会默认将接受到得合法交易及时发送给邻近节点。得益于P2P网络,一笔交易平均在6s内扩散到整个以太坊公链网络的各个节点中。

A-Distributed-P2P-Network-with-Elements-of-Blockchain-and-Cryptocurrency

进入以太坊交易池的交易被区分本地还是远方的目的是因为,节点对待local的交易和remote的交易有所差异。简单地说是 local 交易优先级高于 remote 交易。

以太坊交易池设计

前面并未交易池处理细节,这里将详细讲解以太坊交易池处理一笔交易时的完整过程。在讲解前,你还还有先了解以太坊交易池的设计模型。 从2014年到现在,以太坊的交易池一直在不断优化中,从未停止。从这里也说明,交易池不仅仅重要,还需要高性能。

下图是以太坊交易池的主要设计模块,分别是交易池配置、实时的区块链状态、交易管理容器、本地交易存储和新交易事件。

ethereum-tx-pool-desgin

各个模块相互影响,其中最重要的的交易管理。这也是需要我们重点介绍的部分。

交易池配置

交易池配置不多,但每项配置均直接影响交易池对交易的处理行为。配置信息由 TxPoolConfig 所定义,各项信息如下:

// core/tx_pool.go:125
type TxPoolConfig struct {
   Locals    []common.Address
   NoLocals  bool
   Journal   string
   Rejournal time.Duration
   PriceLimit uint64
   PriceBump  uint64
   AccountSlots uint64
   GlobalSlots  uint64
   AccountQueue uint64
   GlobalQueue  uint64
   Lifetime time.Duration
}
  • Locals: 定义了一组视为local交易的账户地址。任何来自此清单的交易均被视为 local 交易。
  • NoLocals: 是否禁止local交易处理。默认为 fasle,允许 local 交易。如果禁止,则来自 local 的交易均视为 remote 交易处理。
  • Journal: 存储local交易记录的文件名,默认是 ./transactions.rlp
  • Rejournal:定期将local交易存储文件中的时间间隔。默认为每小时一次。
  • PriceLimit: remote交易进入交易池的最低 Price 要求。此设置对 local 交易无效。默认值1。
  • PriceBump:替换交易时所要求的价格上调涨幅比例最低要求。任何低于要求的替换交易均被拒绝。
  • AccountSlots: 当交易池中可执行交易(是已在等待矿工打包的交易)量超标时,允许每个账户可以保留在交易池最低交易数。默认值是 16 笔。
  • GlobalSlots: 交易池中所允许的可执行交易量上限,高于上限时将释放部分交易。默认是 4096 笔交易。
  • AccountQueue:交易池中单个账户非可执行交易上限,默认是64笔。
  • GlobalQueue: 交易池中所有非可执行交易上限,默认1024 笔。
  • Lifetime: 允许 remote 的非可执行交易可在交易池存活的最长时间。交易池每分钟检查一次,一旦发现有超期的remote 账户,则移除该账户下的所有非可执行交易。默认为3小时。

上面配置中,包含两个重要概念可执行交易非可执行交易。可执行交易是指从交易池中择优选出的一部分交易可以被执行,打包到区块中。非可执行交易则相反,任何刚进入交易池的交易均属于非可执行状态,在某一个时刻才会提升为可执行状态。

一个节点如何自定义上述交易配置呢?以太坊 geth 节点允许在启动节点时,通过参数修改配置。可修改的交易池配置参数如下(通过 geth -h 查看):

TRANSACTION POOL OPTIONS:
  --txpool.locals value        Comma separated accounts to treat as locals (no flush, priority inclusion)
  --txpool.nolocals            Disables price exemptions for locally submitted transactions
  --txpool.journal value       Disk journal for local transaction to survive node restarts (default: "transactions.rlp")
  --txpool.rejournal value     Time interval to regenerate the local transaction journal (default: 1h0m0s)
  --txpool.pricelimit value    Minimum gas price limit to enforce for acceptance into the pool (default: 1)
  --txpool.pricebump value     Price bump percentage to replace an already existing transaction (default: 10)
  --txpool.accountslots value  Minimum number of executable transaction slots guaranteed per account (default: 16)
  --txpool.globalslots value   Maximum number of executable transaction slots for all accounts (default: 4096)
  --txpool.accountqueue value  Maximum number of non-executable transaction slots permitted per account (default: 64)
  --txpool.globalqueue value   Maximum number of non-executable transaction slots for all accounts (default: 1024)
  --txpool.lifetime value      Maximum amount of time non-executable transaction are queued (default: 3h0m0s)

链状态

所有进入交易池的交易均需要被校验,最基本的是校验账户余额是否足够支付交易执行。或者 交易 nonce 是否合法。在交易池中维护的最新的区块StateDB。当交易池接收到新区块信号时,将立即重置 statedb。

在交易池启动后,将订阅链的区块头事件:

//core/tx_pool.go:274
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)

并开始监听新事件:

//core/tx_pool.go:305
for {
   select {
   // Handle ChainHeadEvent
   case ev := <-pool.chainHeadCh:
      if ev.Block != nil {
         pool.mu.Lock()
         if pool.chainconfig.IsHomestead(ev.Block.Number()) {
            pool.homestead = true
         }
         pool.reset(head.Header(), ev.Block.Header())
         head = ev.Block

         pool.mu.Unlock()
      }
  //...
  }
}

接收到事件后,将执行 func (pool *TxPool) reset(oldHead, newHead *types.Header)方法更新 state和处理交易。核心是将交易池中已经不符合要求的交易删除并更新整理交易,这里不展开描述,有兴趣的话,可以到微信群中交流。

本地交易

在交易池中将交易标记为 local 的有多种用途:

  1. 在本地磁盘存储已发送的交易。这样,本地交易不会丢失,重启节点时可以重新加载到交易池,实时广播出去。
  2. 可以作为外部程序和以太坊沟通的一个渠道。外部程序只需要监听文件内容变化,则可以获得交易清单。
  3. local交易可优先于 remote 交易。对交易量的限制等操作,不影响 local 下的账户和交易。

对应本地交易存储,在启动交易池时根据配置开启本地交易存储能力:

//core/tx_pool.go:264
if !config.NoLocals && config.Journal != "" {
        pool.journal = newTxJournal(config.Journal)
        if err := pool.journal.load(pool.AddLocals); err != nil {
            log.Warn("Failed to load transaction journal", "err", err)
        }
    //...
}

并从磁盘中加载已有交易到交易池。在新的local 交易进入交易池时,将被实时写入 journal 文件。

// core/tx_pool.go:757
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
   if pool.journal == nil || !pool.locals.contains(from) {
      return
   }
   if err := pool.journal.insert(tx); err != nil {
      log.Warn("Failed to journal local transaction", "err", err)
   }
}

从上可看到,只有属于 local 账户的交易才会被记录。你又没有注意到,如果仅仅是这样的话,journal 文件是否会跟随本地交易而无限增长?答案是否定的,虽然无法实时从journal中移除交易。但是支持定期更新journal文件。

journal 并不是保存所有的本地交易以及历史,他仅仅是存储当前交易池中存在的本地交易。因此交易池会定期对 journal 文件执行 rotate,将交易池中的本地交易写入journal文件,并丢弃旧数据。

journal := time.NewTicker(pool.config.Rejournal)
//...
//core/tx_pool.go:353
case <-journal.C:
            if pool.journal != nil {
                pool.mu.Lock()
                if err := pool.journal.rotate(pool.local()); err != nil {
                    log.Warn("Failed to rotate local tx journal", "err", err)
                }
                pool.mu.Unlock()
            }
}

新交易信号

文章开头,有提到进入交易池的交易将被广播到网络中。这是依赖于交易池支持外部订阅新交易事件信号。任何订阅此事件的子模块,在交易池出现新的可执行交易时,均可实时接受到此事件通知,并获得新交易信息。

需要注意的是并非所有进入交易池的交易均被通知外部,而是只有交易从非可执行状态变成可执行状态后才会发送信号。

//core/tx_pool.go:705
go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
//core/tx_pool.go:1022
go pool.txFeed.Send(NewTxsEvent{promoted})

在交易池中,有两处地方才会执行发送信号。一是交易时用于替换已经存在的可执行交易时。二是有新的一批交易从非可执行状态提升到可执行状态后。

外部只需要订阅SubscribeNewTxsEvent(ch chan<- NewTxsEvent)新可执行交易事件,则可实时接受交易。在 geth 中网络层将订阅交易事件,以便实时广播。

//eth/handler.go:213
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
//eth/handler.go:781
func (pm *ProtocolManager) txBroadcastLoop() {
   for {
      select {
      case event := <-pm.txsCh:
         pm.BroadcastTxs(event.Txs)
      //...
   }
}

另外是矿工实时订阅交易,以便将交易打包到区块中。

//miner/worker.go:207
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
//miner/worker.go:462
txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs {
        acc, _ := types.Sender(w.current.signer, tx)
    txs[acc] = append(txs[acc], tx)
}
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs)
w.commitTransactions(txset, coinbase, nil)

交易管理

最核心的部分则是交易池对交易的管理机制。以太坊将交易按状态分为两部分:可执行交易和非可执行交易。分别记录在pending容器中和 queue 容器中。

ethereum-tx-pool-txManager

如上图所示,交易池先采用一个 txLookup (内部为map)跟踪所有交易。同时将交易根据本地优先,价格优先原则将交易划分为两部分 queue 和 pending。而这两部交易则按账户分别跟踪。

那么在交易在进入交易池进行管理的细节有是如何的呢?等我下一篇文章详细介绍以太坊交易池交易管理。

转载自:https://learnblockchain.cn/books/geth/part2/txpool/txpool.html

以太坊本地待处理交易存储

上篇在介绍交易池时有讲到对于本地交易的特殊处理。为了不丢失未完成的本地交易,以太坊交易池通过 journal 文件存储和管理当前交易池中的本地交易,并定期更新存储。

下图是交易池对本地待处理交易的磁盘存储管理流程,涉及加载、实时写入和定期更新维护。
以太坊交易池本地待处理交易存储管理

加载已存储交易

在交易池首次启动 journal 时,将主动将该文件已存储的交易加载到交易池。

//core/tx_journal.go:61
if _, err := os.Stat(journal.path); os.IsNotExist(err) { //❶
   return nil
}
// Open the journal for loading any past transactions
input, err := os.Open(journal.path) //❷
if err != nil {
   return err
}
defer input.Close()

处理时,如果文件不存在则退出 ❶,否则 Open 文件,获得 input 文件流 ❷。

//core/tx_journal.go:76
stream := rlp.NewStream(input, 0)//❸
total, dropped := 0, 0

因为存储的内容格式是 rlp 编码内容,因此可以直接初始化 rlp 内容流 ❸,为连续解码做准备。

var (
   failure error
   batch   types.Transactions
)
for {
   tx := new(types.Transaction)
   if err = stream.Decode(tx); err != nil { //❹
      if err != io.EOF {
         failure = err
      }
      if batch.Len() > 0 {//❼
         loadBatch(batch)
      }
      break
   }
   total++

   if batch = append(batch, tx); batch.Len() > 1024 {//❺
      loadBatch(batch)//❻
      batch = batch[:0]
   }
}

直接进入 for 循环遍历,不断从 stream 中一笔笔地解码出交易❹。但交易并非单笔直接载入交易池,而是采用批量提交模式,每 1024 笔交易提交一次 ❺。 批量写入,有利于降低交易池在每次写入交易后的更新。一个批次只需要更新(排序与超限处理等)一次。当然在遍历结束时(err==io.EOF),也需要将当前批次中的交易载入❼。

loadBatch := func(txs types.Transactions) {
   for _, err := range add(txs) {
      if err != nil {
         log.Debug("Failed to add journaled transaction", "err", err)
         dropped++ //❽
      }
   }
}

loadBatch 就是将交易一批次加入到交易池,并获得交易池的每笔交易的处理情况。如果交易加入失败,则进行计数 ❽。最终在 load 方法执行完毕时,显示交易载入情况。

log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)

存储交易

以太坊存储本地交易

当交易池新交易来自于本地账户时❶,如果已开启记录本地交易,则将此交易加入journal ❷。到交易池时,将实时存储到 journal 文件中。

//core/tx_pool.go:757
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
   // Only journal if it's enabled and the transaction is local
   if pool.journal == nil || !pool.locals.contains(from) {//❶
      return
   }
   if err := pool.journal.insert(tx); err != nil { //❷
      log.Warn("Failed to journal local transaction", "err", err)
   }
}

journal.insert则将交易实时写入文件流中❸,相当于实时存储到磁盘。而在写入时,是将交易进行RLP编码。

//core/tx_journal.go:120
func (journal *txJournal) insert(tx *types.Transaction) error {
   if journal.writer == nil {
      return errNoActiveJournal
   }
   if err := rlp.Encode(journal.writer, tx); err != nil {//❸
      return err
   }
   return nil
}

这里引发了在上面载入已存储交易时将交易重复写入文件。因此在加载交易时,使用一个 空 writer 替代 ❹。

//core/tx_journal.go:72
journal.writer = new(devNull) //❹
defer func() { journal.writer = nil }() //❺

并且在加载结束时清理❺。

定期更新 journal

image-20190622234757114

journal 的目的是长期存储本地尚未完成的交易,以便交易不丢失。而文件内容属于交易的RLP编码内容,不便于实时清空已完成或已无效的交易。因此以太坊采取的是定期将交易池在途交易更新到 journal 文件中。

首先,在首次加载文件中的交易到交易池后,利用交易池的检查功能,将已完成或者已完成的交易拒绝在交易池外。在加载完成后,交易池中的交易仅仅是本地账户待处理的交易,因此在加载完成后❶,立即将交易池中的所有本地交易覆盖journal文件❷。

//core/tx_pool.go:264
pool.journal = newTxJournal(config.Journal)

if err := pool.journal.load(pool.AddLocals); err != nil {//❶
   log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {//❷
   log.Warn("Failed to rotate transaction journal", "err", err)
}

在 rotate 中,并非直接覆盖。而是先创建另一个新文件❸,将所有交易RLP编码写入此文件❹ 。

replacement, err := os.OpenFile(journal.path+".new",  //❸
                                os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
   return err
}
journaled := 0
for _, txs := range all {
   for _, tx := range txs {
      if err = rlp.Encode(replacement, tx); err != nil {//❹
         replacement.Close()
         return err
      }
}
   journaled += len(txs)
}
replacement.Close()

写入完毕,将此文件直接移动(重命名),已覆盖原 journal 文件。

if err = os.Rename(journal.path+".new", journal.path); err != nil {
   return err
}

其次,是交易池根据参数 txpool.rejournal 所设置的更新间隔定期更新❺。将交易池中的本地交易存储到磁盘❻。

//core/tx_pool.go:298
journal := time.NewTicker(pool.config.Rejournal)//❺
//...
for {
  select {
    //...
    case <-journal.C:
            if pool.journal != nil {
                pool.mu.Lock()
                if err := pool.journal.rotate(pool.local()); err != nil { //❻
                    log.Warn("Failed to rotate local tx journal", "err", err)
                }
                pool.mu.Unlock()
            }
        }
  }
}

上述,是以太坊交易池对于本地交易进行持久化存储管理细节。
转载自:https://learnblockchain.cn/books/geth/part2/txpool/txjournal.html

以太坊交易入队列

这是关于以太坊交易池的第三篇文章,第一篇是[整体概况以太坊交易池]({{< ref "/part2/miner">}}),第二篇是讲解[以太坊本地交易存储]({{< ref "txJournal.md" >}})。而第三篇文章详解一笔交易时如何进入交易池,以及影响。内容较多,请坐好板凳。

交易进入交易池分三步走:校验、入队列、容量检查。拿 AddLocalTx举例。核心代码集中在交易池的func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error)方法。

校验交易合法性

任何交易进入交易池之前均需要校验交易数据的合法性。如果交易校验失败则拒绝此交易。

//core/tx_pool.go:662
if err := pool.validateTx(tx, local); err != nil {
   log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
   invalidTxCounter.Inc(1)
   return false, err
}

那么是如何进行校验的呢?代码逻辑集中在func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error方法中。

首先是防止DOS攻击,不允许交易数据超过32KB。

if tx.Size() > 32*1024 {
   return ErrOversizedData
}

接着不允许交易的转账金额为负数,实际上这次判断难以命中,原因是从外部接收的交易数据属RLP编码,是无法处理负数的。当然这里做一次校验,更加保险。

if tx.Value().Sign() < 0 {
   return ErrNegativeValue
}

交易在虚拟机中执行时将消耗GAS,为了防止程序错误,允许用户在交易中携带一个GAS上限,防止意外发生。同样,为了避免区块总消耗异常,和控制区块数据大小。也同样存在区块GAS上限。而区块中的GAS量是每笔交易执行消耗GAS之和,故不可能一笔交易的GAS上限超过区块GAS限制。一旦超过,这笔交易不可能会打包到区块中,则可在交易池中直接拒绝超过限制的交易。

if pool.currentMaxGas < tx.Gas() {
   return ErrGasLimit
}

每笔交易都需要携带[交易签名]({{< ref "part3/sign-and-valid.md" >}})信息,并从签名中解析出签名者地址。只有合法的签名才能成功解析出签名者。一旦解析失败拒绝此交易。

from, err := types.Sender(pool.signer, tx)
if err != nil {
   return ErrInvalidSender
}

既然知道是交易发送者(签名者),那么该发送者也可能是来自于交易池所标记的local账户。因此当交易不是local交易时,还进一步检查是否属于local账户。

local = local || pool.locals.contains(from)

如果不是local交易,那么交易的GasPrice 也必须不小于交易池设定的最低GasPrice。这样的限制检查,允许矿工自行决定GasPrice。有些矿工,可能只愿意处理愿意支付高手续费的交易。当然local交易则忽略,避免将本地产生的交易拦截。

if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
   return ErrUnderpriced
}

以太坊中每个[账户]({{< ref "part1/account.md" >}})都有一个数字类型的 Nonce 字段。是一个有序数字,一次比一次大。虚拟机每执行一次该账户的交易,则新 Nonce 将在此交易的Nonce上加1。如果使用恰当,该 Nonce 可间接表示已打包了 Nonce 笔该账户交易。既然不会变小,那么在交易池中不允许出现交易的Nonce 小于此账户当前Nonce的交易。

if pool.currentState.GetNonce(from) > tx.Nonce() {
   return ErrNonceTooLow
}

如果交易被打包到区块中,应该花费多少手续费呢?虽然无法知道最终花费多少,但至少花费多少手续费是可预知的。手续费加上本次交易转移的以太币数量,将会从该账户上扣除。那么账户至少需要转移多少以太坊是明确的。

因此在交易池中,将检查该账户余额,只有账户资产充足时,才允许交易继续,否则在虚拟机中执行交易,交易也必将失败。

if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
   return ErrInsufficientFunds
}

我们不但知道最低花费,也可以知道将最低花费多少GAS。因此也检查交易所设置的Gas上限是否正确。一旦交易至少需要2万Gas,而交易中设置的Gas上限确是 1万GAS。那么交易必然失败,且剩余了 1万GAS。

intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
   return err
}
if tx.Gas() < intrGas {
   return ErrIntrinsicGas
}

因此,在最后。如果交易GAS上限低于已知的最低GAS开销,则拒绝这笔必将失败的交易。

交易入队列

在交易池中并不是一个队列管理数据,而是由多个数据集一起管理交易。

ethereum-tx-pool-txManager

如上图所示,交易池先采用一个 txLookup (内部为map)跟踪所有交易。同时将交易根据本地优先,价格优先原则将交易划分为两部分 queue 和 pending。而这两部交易则按账户分别跟踪。

在进入交易队列前,将判断所有交易队列 all 是否已经达到上限。如果到底上限,则需要从交易池或者当前交易中移除优先级最低交易 。

//core/tx_pool.go:668
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { //❶
   if !local && pool.priced.Underpriced(tx, pool.locals) {//❷
      log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
      underpricedTxCounter.Inc(1)
      return false, ErrUnderpriced
   }
   drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) //❸
   for _, tx := range drop {
      log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
      underpricedTxCounter.Inc(1)
      pool.removeTx(tx.Hash(), false)
   }
}

那么哪些交易的优先级最低呢?首先,本地交易是受保护的,因此如果交易来自remote 时,将检查该交易的价格是否是整个交易池中属于最低价格的。如果是,则拒绝该交易❷。否则在加入此交易前,将从交易队列 all 中删除价格最低的一部分交易❸。为了高效获得不同价格的交易,交易池已经将交易按价格从低到高实施排列存储在 pool.priced中。

解决交易容量问题后,这笔交易过关斩将,立马将驶入交易内存池中。上图中,交易是有根据 from 分组管理,且一个 from 由分非可执行交易队列(queue)和可执行交易队列(pending)。新交易默认是要在非可执行队列中等待指示,但是一种情况时,如果该 from 的可执行队列中存在一个相同 nonce 的交易时,需要进一步识别是否能替换❹。

怎样的交易才能替换掉已在等待执行的交易呢?以太坊早起的默认设计是,只要价格(gasPrice)高于原交易,则允许替换。但是17年7月底在 #15401被改进。人们愿意支付更多手续费的原因有两种情况,一是急于处理交易,但如果真是紧急交易,那么在发送交易之处,会使用高于推荐的gasprice来处理交易。另一种情况时,以太坊价格下跌,人们愿意支付更多手续费。上调多少手续费是合理的呢?以太币下跌10%,那么便可以上调10%的手续费,毕竟对于用户来说,手续费的面值是一样的。交易池的默认配置(pool.config.PriceBump)是10%,只有上调10%手续费的交易才允许替换掉已在等待执行的交易❺。一旦可以替换,则替换掉旧交易❺,移除旧交易❻,并将交易同步存储到 all 交易内存池中。

//core/tx_pool.go:685
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {//❹
   inserted, old := list.Add(tx, pool.config.PriceBump)//❺
   if !inserted {
      pendingDiscardCounter.Inc(1)
      return false, ErrReplaceUnderpriced
   }
   if old != nil { //❻
      pool.all.Remove(old.Hash())
      pool.priced.Removed()
      pendingReplaceCounter.Inc(1)
   }
   pool.all.Add(tx)
   pool.priced.Put(tx)
   pool.journalTx(from, tx)
   //...
   return old != nil, nil
}
replace, err := pool.enqueueTx(hash, tx)//❼
if err != nil {
    return false, err
}

检查完是否需要替换 pending 交易后,则将交易存入非可执行队列❼。同样,在进入非可执行队列之前,也要检查是否需要替换掉相同 nonce 的交易❽。

func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
   //...
   inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump) //❽
   if !inserted {
      queuedDiscardCounter.Inc(1)
      return false, ErrReplaceUnderpriced
   }
   if old != nil {
      pool.all.Remove(old.Hash())
      pool.priced.Removed()
      queuedReplaceCounter.Inc(1)
   }
   if pool.all.Get(hash) == nil {
      pool.all.Add(tx)
      pool.priced.Put(tx)
   }
   return old != nil, nil
}

最后,如果交易属于本地交易还需要额外关照。如果交易属于本地交易,但是本地账户集中不存在此 from 时,更新本地账户集❾,避免交易无法被存储⑩。另外,如果已开启存储本地交易,则实时存储本地交易⑪。

// core/tx_pool.go:715
if local {
   if !pool.locals.contains(from) {
      log.Info("Setting new local account", "address", from)
      pool.locals.add(from)//❾
   }
}
pool.journalTx(from, tx)
//....
//core/tx_pool.go:757
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
    // Only journal if it's enabled and the transaction is local
    if pool.journal == nil || !pool.locals.contains(from) {//⑩
        return
    }
    if err := pool.journal.insert(tx); err != nil {//⑪
        log.Warn("Failed to journal local transaction", "err", err)
    }
}

至此,一笔交易经过千山万水,进入了交易内存池,等待执行。

另外,不难看出,priced 队列是在交易进入队列内存池时便被编排到priced 队列,已让 priced 队列是对 all 交易内存池的同步排序。且交易是在进入pending 队列或者 queue 队列后,才同步更新到 all 交易内存池中。

这里不打算讲解 pending 和 queue 队列的内部实现,请自行研究。因为忽略技术细节不会影响你对以太坊各个技术点,模块的理解。下一讲讲解交易池内存容量处理。
转载自:https://learnblockchain.cn/books/geth/part2/txpool/txaddtx.html