
A deep dive into OP Stack: how withdrawals and deposits are processed

The roles in OP Stack

  • op-node: the client that works with op-geth to pack transactions into blocks, derives the chain state, and keeps data in sync
  • batcher: publishes L2 data to L1, sending it to an EOA (batch inbox) address
  • op-proposer: submits the block state (output roots) to the L2OutputOracle contract on L1
  • CrossDomainMessenger: the cross-chain messenger contract, responsible for L1->L2 and L2->L1 communication
  • OptimismPortal: the contract that ties deposits and withdrawals together in the OP Stack
  • Bridges: the bridge contracts whose main job is to carry deposits and withdrawals
  • L2OutputOracle: the contract on L1 that receives the state roots coming from L2

L2 -> L1 withdrawal logic

Core steps of a withdrawal

  1. Step 1: on L2, the user calls withdraw to send funds to their own address
  2. Step 2: the business logic is processed at the L2 contract layer, passing through the following contracts and steps
    1. First, _initiateWithdrawal is executed on the L2StandardBridge, which branches on whether the asset is ETH or an ERC20
    2. If ETH is being withdrawn, CrossDomainMessenger's sendMessage is called; it increments msgNonce and, inside the method body, calls L2CrossDomainMessenger's _sendMessage
    3. L2CrossDomainMessenger's _sendMessage calls L2ToL1MessagePasser's initiateWithdrawal, which constructs the withdrawalHash, increments its own msgNonce by 1, and finally emits an event
  3. Step 3: the op-node inside the sequencer observes the transaction event and packs it into a block (this step happens off-chain)
  4. Step 4: op-batcher rolls the packed transactions up to L1, and op-proposer submits this batch's state root to L1
  5. Step 5: the user withdraws the funds on L1 (note that this is only possible once the challenge period has passed), for example via the op-stack SDK, whose internal logic calls the OptimismPortal on L1 to pull the funds out.

L2 on-chain source code

function _initiateWithdrawal(
    address _l2Token,
    address _from,
    address _to,
    uint256 _amount,
    uint32 _minGasLimit,
    bytes memory _extraData
)
    internal
{
    if (_l2Token == Predeploys.LEGACY_ERC20_ETH) {  // check whether the token is ETH
        _initiateBridgeETH(_from, _to, _amount, _minGasLimit, _extraData);
    } else {
        address l1Token = OptimismMintableERC20(_l2Token).l1Token();  // otherwise it's an ERC20
        _initiateBridgeERC20(_l2Token, l1Token, _from, _to, _amount, _minGasLimit, _extraData);
    }
}

This executes the parent contract's method, _initiateBridgeETH

function _initiateBridgeETH(
    address _from,
    address _to,
    uint256 _amount,
    uint32 _minGasLimit,
    bytes memory _extraData
)
    internal
{
    require(isCustomGasToken() == false, "StandardBridge: cannot bridge ETH with custom gas token");
    require(msg.value == _amount, "StandardBridge: bridging ETH must include sufficient ETH value");

    _emitETHBridgeInitiated(_from, _to, _amount, _extraData);

    messenger.sendMessage{ value: _amount }({
        _target: address(otherBridge),
        _message: abi.encodeWithSelector(this.finalizeBridgeETH.selector, _from, _to, _amount, _extraData),
        _minGasLimit: _minGasLimit
    });
}

At this point, execution moves into CrossDomainMessenger

function sendMessage(address _target, bytes calldata _message, uint32 _minGasLimit) external payable {
    if (isCustomGasToken()) {
        require(msg.value == 0, "CrossDomainMessenger: cannot send value with custom gas token");
    }

    _sendMessage({
        _to: address(otherMessenger),
        _gasLimit: baseGas(_message, _minGasLimit),
        _value: msg.value,
        _data: abi.encodeWithSelector(
            this.relayMessage.selector, messageNonce(), msg.sender, _target, msg.value, _minGasLimit, _message
        )
    });

    emit SentMessage(_target, msg.sender, _message, messageNonce(), _minGasLimit);
    emit SentMessageExtension1(msg.sender, msg.value);

    unchecked {
        ++msgNonce;
    }
}

When _sendMessage is called, the child contract's override of _sendMessage is what actually runs

function _sendMessage(address _to, uint64 _gasLimit, uint256 _value, bytes memory _data) internal override {
    IL2ToL1MessagePasser(payable(Predeploys.L2_TO_L1_MESSAGE_PASSER)).initiateWithdrawal{ value: _value }(
        _to, _gasLimit, _data
    );
}

Finally, L2ToL1MessagePasser runs: it packs the withdrawal transaction parameters into a hash and emits an event. At this point, the L2-side logic is complete.

function initiateWithdrawal(address _target, uint256 _gasLimit, bytes memory _data) public payable {
    bytes32 withdrawalHash = Hashing.hashWithdrawal(
        Types.WithdrawalTransaction({
            nonce: messageNonce(),
            sender: msg.sender,
            target: _target,
            value: msg.value,
            gasLimit: _gasLimit,
            data: _data
        })
    );

    sentMessages[withdrawalHash] = true;

    emit MessagePassed(messageNonce(), msg.sender, _target, msg.value, _gasLimit, _data, withdrawalHash);

    unchecked {
        ++msgNonce;
    }
}
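
To make the hashing step concrete, here is a minimal Go sketch (not code from the repository) that reproduces the withdrawalHash built above, assuming Hashing.hashWithdrawal is keccak256 over the ABI-encoded tuple (nonce, sender, target, value, gasLimit, data); the helper name hashWithdrawal is ours.

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// hashWithdrawal mirrors (as an assumption) Hashing.hashWithdrawal:
// keccak256(abi.encode(nonce, sender, target, value, gasLimit, data)).
func hashWithdrawal(nonce *big.Int, sender, target common.Address, value, gasLimit *big.Int, data []byte) (common.Hash, error) {
	uint256Ty, _ := abi.NewType("uint256", "", nil)
	addressTy, _ := abi.NewType("address", "", nil)
	bytesTy, _ := abi.NewType("bytes", "", nil)
	args := abi.Arguments{
		{Type: uint256Ty}, {Type: addressTy}, {Type: addressTy},
		{Type: uint256Ty}, {Type: uint256Ty}, {Type: bytesTy},
	}
	packed, err := args.Pack(nonce, sender, target, value, gasLimit, data)
	if err != nil {
		return common.Hash{}, err
	}
	return crypto.Keccak256Hash(packed), nil
}

func main() {
	h, _ := hashWithdrawal(big.NewInt(0), common.Address{}, common.Address{}, big.NewInt(1), big.NewInt(100000), []byte{})
	fmt.Println(h.Hex())
}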

Off-chain execution logic

func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
    // Determine the start and end L2 block numbers that need to be submitted
    start, end, err := l.calculateL2BlockRangeToStore(ctx)
    if err != nil {
       l.Log.Warn("Error calculating L2 block range", "err", err)
       return err
    } else if start.Number >= end.Number {
       return errors.New("start number is >= end number")
    }

    var latestBlock *types.Block
    // Starting from the first block, fetch block data and add each block to the channelManager's blocks
    for i := start.Number + 1; i < end.Number+1; i++ {
       // The core logic is l.loadBlockIntoState
       block, err := l.loadBlockIntoState(ctx, i)
       if errors.Is(err, ErrReorg) {
          l.Log.Warn("Found L2 reorg", "block_number", i)
          l.lastStoredBlock = eth.BlockID{}
          return err
       } else if err != nil {
          l.Log.Warn("Failed to load block into state", "err", err)
          return err
       }
       l.lastStoredBlock = eth.ToBlockID(block)
       latestBlock = block
    }
    // Extract the basic L2BlockRef information
    l2ref, err := derive.L2BlockToBlockRef(l.RollupConfig, latestBlock)
    if err != nil {
       l.Log.Warn("Invalid L2 block loaded into state", "err", err)
       return err
    }
    // Record the L2BlockRef that was loaded (metrics)
    l.Metr.RecordL2BlocksLoaded(l2ref)
    return nil
}
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
    // Get the current L1 tip (latest block)
    l1tip, err := l.l1Tip(ctx)
    if err != nil {
       l.Log.Error("Failed to query L1 tip", "err", err)
       return err
    }
    // Record the current L1 tip
    l.recordL1Tip(l1tip)

    // Fetch the transaction data associated with the current L1 tip from state. This step is key; its logic is shown below
    txdata, err := l.state.TxData(l1tip.ID())

    if err == io.EOF {
       l.Log.Trace("No transaction data available")
       return err
    } else if err != nil {
       l.Log.Error("Unable to get tx data", "err", err)
       return err
    }

    // Send the transaction data to L1
    if err = l.sendTransaction(txdata, queue, receiptsCh, daGroup); err != nil {
       return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
    }
    return nil
}

Fetching the transaction data associated with the current L1 tip

func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    // The two lines above acquire the mutex
    var firstWithTxData *channel
    // Find the first channel that has transaction data
    for _, ch := range s.channelQueue {
       if ch.HasTxData() {
          firstWithTxData = ch
          break
       }
    }

    dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
    s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))

    // If data is pending, or the short-circuit condition (manager closed) holds, return that channel's data via nextTxData(firstWithTxData)
    if dataPending || s.closed {
       return s.nextTxData(firstWithTxData)
    }

    // No pending data and no blocks to add, so return io.EOF
    if len(s.blocks) == 0 {
       return txData{}, io.EOF
    }

    // Make sure there is a channel with enough space for the new blocks
    if err := s.ensureChannelWithSpace(l1Head); err != nil {
       return txData{}, err
    }

    // Process the pending blocks
    if err := s.processBlocks(); err != nil {
       return txData{}, err
    }

    // After all pending blocks have been processed, register the current L1 head
    s.registerL1Block(l1Head)

    // Output the processed data as frames
    if err := s.outputFrames(); err != nil {
       return txData{}, err
    }
    // Return the current channel's transaction data
    return s.nextTxData(s.currentChannel)
}

op-proposer logic: sending the state root

func (l *L2OutputSubmitter) FetchL2OOOutput(ctx context.Context) (*eth.OutputResponse, bool, error) {
    if l.l2ooContract == nil {
       return nil, false, fmt.Errorf("L2OutputOracle contract not set, cannot fetch next output info")
    }

    cCtx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
    defer cancel()
    callOpts := &bind.CallOpts{
       From:    l.Txmgr.From(),
       Context: cCtx,
    }
    // Get the block number of the next checkpoint
    nextCheckpointBlockBig, err := l.l2ooContract.NextBlockNumber(callOpts)
    if err != nil {
       return nil, false, fmt.Errorf("querying next block number: %w", err)
    }
    nextCheckpointBlock := nextCheckpointBlockBig.Uint64()
    // Get the current block number
    currentBlockNumber, err := l.FetchCurrentBlockNumber(ctx)
    if err != nil {
       return nil, false, err
    }

    // Compare the current block number with the next checkpoint's, to make sure we never propose a block from the future
    if currentBlockNumber < nextCheckpointBlock {
       l.Log.Debug("Proposer submission interval has not elapsed", "currentBlockNumber", currentBlockNumber, "nextBlockNumber", nextCheckpointBlock)
       return nil, false, nil
    }
    // Use the next checkpoint's block number to fetch the output
    output, err := l.FetchOutput(ctx, nextCheckpointBlock)
    if err != nil {
       return nil, false, fmt.Errorf("fetching output: %w", err)
    }

    // Check whether the output's block reference is beyond the finalized L2 block, and whether non-finalized outputs are allowed
    if output.BlockRef.Number > output.Status.FinalizedL2.Number && (!l.Cfg.AllowNonFinalized || output.BlockRef.Number > output.Status.SafeL2.Number) {
       l.Log.Debug("Not proposing yet, L2 block is not ready for proposal",
          "l2_proposal", output.BlockRef,
          "l2_safe", output.Status.SafeL2,
          "l2_finalized", output.Status.FinalizedL2,
          "allow_non_finalized", l.Cfg.AllowNonFinalized)
       return output, false, nil
    }
    return output, true, nil
}
func (l *L2OutputSubmitter) proposeOutput(ctx context.Context, output *eth.OutputResponse) {
    cCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
    defer cancel()
    // If the checks above passed, submit the state-root (proposal) transaction directly
    if err := l.sendTransaction(cCtx, output); err != nil {
       l.Log.Error("Failed to send proposal transaction",
          "err", err,
          "l1blocknum", output.Status.CurrentL1.Number,
          "l1blockhash", output.Status.CurrentL1.Hash,
          "l1head", output.Status.HeadL1.Number)
       return
    }
    l.Metr.RecordL2BlocksProposed(output.BlockRef)
}

At this point, the off-chain part is complete as well

Processing on L1

In the L2OutputOracle.proposeL2Output method

function proposeL2Output(
    bytes32 _outputRoot,
    uint256 _l2BlockNumber,
    bytes32 _l1BlockHash,
    uint256 _l1BlockNumber
)
    external
    payable
{
    // Validate the sender
    require(msg.sender == proposer, "L2OutputOracle: only the proposer address can propose new outputs");
    // Validate the next block number
    require(
        _l2BlockNumber == nextBlockNumber(),
        "L2OutputOracle: block number must be equal to next expected block number"
    );
    // Validate the block timestamp
    require(
        computeL2Timestamp(_l2BlockNumber) < block.timestamp,
        "L2OutputOracle: cannot propose L2 output in the future"
    );

    require(_outputRoot != bytes32(0), "L2OutputOracle: L2 output proposal cannot be the zero hash");

    if (_l1BlockHash != bytes32(0)) {
        require(
            blockhash(_l1BlockNumber) == _l1BlockHash,
            "L2OutputOracle: block hash does not match the hash at the expected height"
        );
    }

    emit OutputProposed(_outputRoot, nextOutputIndex(), _l2BlockNumber, block.timestamp);
    // Push the corresponding output root into l2Outputs
    l2Outputs.push(
        Types.OutputProposal({
            outputRoot: _outputRoot,
            timestamp: uint128(block.timestamp),
            l2BlockNumber: uint128(_l2BlockNumber)
        })
    );
}
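
To make the timestamp check concrete, below is a small Go sketch of the arithmetic behind computeL2Timestamp as we read it from the L2OutputOracle source: the L2 block's timestamp is extrapolated from the oracle's starting block, starting timestamp, and fixed L2 block time. Treat the formula and parameter names as assumptions rather than copied code.

package main

import "fmt"

// computeL2Timestamp extrapolates an L2 block's timestamp from the oracle's deployment parameters
// (assumed formula: startingTimestamp + (l2BlockNumber - startingBlockNumber) * l2BlockTime).
func computeL2Timestamp(startingTimestamp, startingBlockNumber, l2BlockTime, l2BlockNumber uint64) uint64 {
	return startingTimestamp + (l2BlockNumber-startingBlockNumber)*l2BlockTime
}

func main() {
	// Example: oracle starts at L2 block 0, timestamp 1_700_000_000, 2-second L2 blocks.
	ts := computeL2Timestamp(1_700_000_000, 0, 2, 1_000)
	fmt.Println(ts) // 1700002000: a proposal for block 1000 is rejected until L1 time has passed this value
}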

A sequence diagram ties these steps together (the figure is not reproduced here).

L1 -> L2 deposit logic

Core steps of a deposit

Step 1: the user initiates a deposit on L1
Step 2: the deposit goes through several core steps on the L1 chain
1. It first enters L1StandardBridge and executes _initiateETHDeposit
2. That calls sendMessage on the CrossDomainMessenger contract
3. Inside CrossDomainMessenger.sendMessage, L1CrossDomainMessenger's _sendMessage is invoked and msgNonce is incremented
4. L1CrossDomainMessenger._sendMessage leads to the TransactionDeposited event being emitted; with that, execution on L1 is complete
Step 3: off-chain, op-node observes TransactionDeposited, builds the transaction parameters, and has op-geth call finalizeDeposit on the L2StandardBridge
Step 4: once finalizeDeposit has executed, the whole deposit flow is complete.

Deposit source code on L1

function depositETH(uint32 _minGasLimit, bytes calldata _extraData) external payable onlyEOA {
    _initiateETHDeposit(msg.sender, msg.sender, _minGasLimit, _extraData);
}

This calls into the parent contract StandardBridge

function _initiateBridgeETH(
    address _from,
    address _to,
    uint256 _amount,
    uint32 _minGasLimit,
    bytes memory _extraData
)
    internal
{
    require(isCustomGasToken() == false, "StandardBridge: cannot bridge ETH with custom gas token");
    require(msg.value == _amount, "StandardBridge: bridging ETH must include sufficient ETH value");

    // Emit the correct events. By default this will be _amount, but child
    // contracts may override this function in order to emit legacy events as well.
    _emitETHBridgeInitiated(_from, _to, _amount, _extraData);
    // Send the message; this enters the CrossDomainMessenger contract
    messenger.sendMessage{ value: _amount }({
        _target: address(otherBridge),
        _message: abi.encodeWithSelector(this.finalizeBridgeETH.selector, _from, _to, _amount, _extraData),
        _minGasLimit: _minGasLimit
    });
}

The logic mirrors the withdrawal path: sendMessage calls _sendMessage, which executes the override in the child contract L1CrossDomainMessenger

function sendMessage(address _target, bytes calldata _message, uint32 _minGasLimit) external payable {
    if (isCustomGasToken()) {
        require(msg.value == 0, "CrossDomainMessenger: cannot send value with custom gas token");
    }

    // Triggers a message to the other messenger. Note that the amount of gas provided to the
    // message is the amount of gas requested by the user PLUS the base gas value. We want to
    // guarantee the property that the call to the target contract will always have at least
    // the minimum gas limit specified by the user.
    _sendMessage({
        _to: address(otherMessenger),
        _gasLimit: baseGas(_message, _minGasLimit),
        _value: msg.value,
        _data: abi.encodeWithSelector(
            this.relayMessage.selector, messageNonce(), msg.sender, _target, msg.value, _minGasLimit, _message
        )
    });

    emit SentMessage(_target, msg.sender, _message, messageNonce(), _minGasLimit);
    emit SentMessageExtension1(msg.sender, msg.value);

    unchecked {
        ++msgNonce;
    }
}
function _sendMessage(address _to, uint64 _gasLimit, uint256 _value, bytes memory _data) internal override {
    portal.depositTransaction{ value: _value }({
        _to: _to,
        _value: _value,
        _gasLimit: _gasLimit,
        _isCreation: false,
        _data: _data
    });
}

Execution then enters OptimismPortal._depositTransaction

function _depositTransaction(
    address _to,
    uint256 _mint,
    uint256 _value,
    uint64 _gasLimit,
    bool _isCreation,
    bytes memory _data
)
    internal
{
    if (_isCreation && _to != address(0)) revert BadTarget();

    if (_gasLimit < minimumGasLimit(uint64(_data.length))) revert SmallGasLimit();

    if (_data.length > 120_000) revert LargeCalldata();

    // Transform the from-address to its alias if the caller is a contract.
    address from = msg.sender;
    if (msg.sender != tx.origin) {
        from = AddressAliasHelper.applyL1ToL2Alias(msg.sender);
    }

    // Pack the transaction parameters
    bytes memory opaqueData = abi.encodePacked(_mint, _value, _gasLimit, _isCreation, _data);

    // Emit the deposit event
    emit TransactionDeposited(from, _to, DEPOSIT_VERSION, opaqueData);
}
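
One detail worth pausing on is the from-address aliasing above: when the depositor is a contract, its address is shifted by a fixed offset before it becomes the L2 sender. A minimal Go sketch of applyL1ToL2Alias, assuming the standard AddressAliasHelper offset 0x1111000000000000000000000000000000001111:

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
)

// l1ToL2AliasOffset is the constant added to an L1 contract address so that its deposits
// show up on L2 under a distinct "aliased" sender (assumed AddressAliasHelper constant).
var l1ToL2AliasOffset = common.HexToAddress("0x1111000000000000000000000000000000001111")

// applyL1ToL2Alias adds the offset modulo 2^160, mirroring AddressAliasHelper.applyL1ToL2Alias.
func applyL1ToL2Alias(l1 common.Address) common.Address {
	mod := new(big.Int).Lsh(big.NewInt(1), 160)
	sum := new(big.Int).Add(l1.Big(), l1ToL2AliasOffset.Big())
	sum.Mod(sum, mod)
	return common.BigToAddress(sum)
}

func main() {
	l1Contract := common.HexToAddress("0x00000000000000000000000000000000000000aa")
	fmt.Println(applyL1ToL2Alias(l1Contract).Hex())
}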

That concludes the deposit logic on L1

Off-chain processing

PreparePayloadAttributes (shown below) is responsible for integrating information from L1, handling deposit transactions, and keeping all data consistent in time and logic. It ensures the generated L2 blocks correctly reflect the state of L1.

func (ba *FetchingAttributesBuilder) PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) {
    var l1Info eth.BlockInfo
    var depositTxs []hexutil.Bytes
    var seqNumber uint64

    sysConfig, err := ba.l2.SystemConfigByL2Hash(ctx, l2Parent.Hash)
    if err != nil {
       return nil, NewTemporaryError(fmt.Errorf("failed to retrieve L2 parent block: %w", err))
    }

    // If the L1 origin changed in this block, then we are in the first block of the epoch. In this
    // case we need to fetch all transaction receipts from the L1 origin block so we can scan for
    // user deposits.
    if l2Parent.L1Origin.Number != epoch.Number {
       info, receipts, err := ba.l1.FetchReceipts(ctx, epoch.Hash)
       if err != nil {
          return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info and receipts: %w", err))
       }
       if l2Parent.L1Origin.Hash != info.ParentHash() {
          return nil, NewResetError(
             fmt.Errorf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
                epoch, info.ParentHash(), l2Parent.L1Origin))
       }

       deposits, err := DeriveDeposits(receipts, ba.rollupCfg.DepositContractAddress)
       if err != nil {
          // deposits may never be ignored. Failing to process them is a critical error.
          return nil, NewCriticalError(fmt.Errorf("failed to derive some deposits: %w", err))
       }
       // apply sysCfg changes
       if err := UpdateSystemConfigWithL1Receipts(&sysConfig, receipts, ba.rollupCfg, info.Time()); err != nil {
          return nil, NewCriticalError(fmt.Errorf("failed to apply derived L1 sysCfg updates: %w", err))
       }

       l1Info = info
       depositTxs = deposits
       seqNumber = 0
    } else {
       if l2Parent.L1Origin.Hash != epoch.Hash {
          return nil, NewResetError(fmt.Errorf("cannot create new block with L1 origin %s in conflict with L1 origin %s", epoch, l2Parent.L1Origin))
       }
       info, err := ba.l1.InfoByHash(ctx, epoch.Hash)
       if err != nil {
          return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info: %w", err))
       }
       l1Info = info
       depositTxs = nil
       seqNumber = l2Parent.SequenceNumber + 1
    }

    // Sanity check the L1 origin was correctly selected to maintain the time invariant between L1 and L2
    nextL2Time := l2Parent.Time + ba.rollupCfg.BlockTime
    if nextL2Time < l1Info.Time() {
       return nil, NewResetError(fmt.Errorf("cannot build L2 block on top %s for time %d before L1 origin %s at time %d",
          l2Parent, nextL2Time, eth.ToBlockID(l1Info), l1Info.Time()))
    }

    var upgradeTxs []hexutil.Bytes
    if ba.rollupCfg.IsEcotoneActivationBlock(nextL2Time) {
       upgradeTxs, err = EcotoneNetworkUpgradeTransactions()
       if err != nil {
          return nil, NewCriticalError(fmt.Errorf("failed to build ecotone network upgrade txs: %w", err))
       }
    }

    if ba.rollupCfg.IsFjordActivationBlock(nextL2Time) {
       fjord, err := FjordNetworkUpgradeTransactions()
       if err != nil {
          return nil, NewCriticalError(fmt.Errorf("failed to build fjord network upgrade txs: %w", err))
       }
       upgradeTxs = append(upgradeTxs, fjord...)
    }

    l1InfoTx, err := L1InfoDepositBytes(ba.rollupCfg, sysConfig, seqNumber, l1Info, nextL2Time)
    if err != nil {
       return nil, NewCriticalError(fmt.Errorf("failed to create l1InfoTx: %w", err))
    }

    var afterForceIncludeTxs []hexutil.Bytes
    if ba.rollupCfg.IsInterop(nextL2Time) {
       depositsCompleteTx, err := DepositsCompleteBytes(seqNumber, l1Info)
       if err != nil {
          return nil, NewCriticalError(fmt.Errorf("failed to create depositsCompleteTx: %w", err))
       }
       afterForceIncludeTxs = append(afterForceIncludeTxs, depositsCompleteTx)
    }

    txs := make([]hexutil.Bytes, 0, 1+len(depositTxs)+len(afterForceIncludeTxs)+len(upgradeTxs))
    txs = append(txs, l1InfoTx)
    txs = append(txs, depositTxs...)
    txs = append(txs, afterForceIncludeTxs...)
    txs = append(txs, upgradeTxs...)

    var withdrawals *types.Withdrawals
    if ba.rollupCfg.IsCanyon(nextL2Time) {
       withdrawals = &types.Withdrawals{}
    }

    var parentBeaconRoot *common.Hash
    if ba.rollupCfg.IsEcotone(nextL2Time) {
       parentBeaconRoot = l1Info.ParentBeaconRoot()
       if parentBeaconRoot == nil { // default to zero hash if there is no beacon-block-root available
          parentBeaconRoot = new(common.Hash)
       }
    }

    return &eth.PayloadAttributes{
       Timestamp:             hexutil.Uint64(nextL2Time),
       PrevRandao:            eth.Bytes32(l1Info.MixDigest()),
       SuggestedFeeRecipient: predeploys.SequencerFeeVaultAddr,
       Transactions:          txs,
       NoTxPool:              true,
       GasLimit:              (*eth.Uint64Quantity)(&sysConfig.GasLimit),
       Withdrawals:           withdrawals,
       ParentBeaconBlockRoot: parentBeaconRoot,
    }, nil
}
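
DeriveDeposits in the code above ultimately has to parse the opaqueData emitted by OptimismPortal back into deposit-transaction fields. Below is a minimal Go sketch of that unpacking, assuming the abi.encodePacked layout from _depositTransaction shown earlier (32-byte mint, 32-byte value, 8-byte gasLimit, 1-byte isCreation flag, then the raw data); it is an illustration, not the op-node implementation.

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math/big"
)

// unpackOpaqueData reverses abi.encodePacked(_mint, _value, _gasLimit, _isCreation, _data)
// as emitted in the TransactionDeposited event (layout is an assumption based on the contract above).
func unpackOpaqueData(opaque []byte) (mint, value *big.Int, gasLimit uint64, isCreation bool, data []byte, err error) {
	if len(opaque) < 32+32+8+1 {
		return nil, nil, 0, false, nil, errors.New("opaque data too short")
	}
	mint = new(big.Int).SetBytes(opaque[0:32])
	value = new(big.Int).SetBytes(opaque[32:64])
	gasLimit = binary.BigEndian.Uint64(opaque[64:72])
	isCreation = opaque[72] == 1
	data = opaque[73:]
	return mint, value, gasLimit, isCreation, data, nil
}

func main() {
	// Build a tiny example payload by hand: mint=0, value=1 wei, gasLimit=100000, not a creation, data=0xdead.
	opaque := make([]byte, 0, 75)
	opaque = append(opaque, make([]byte, 32)...)                                      // mint
	opaque = append(opaque, new(big.Int).SetUint64(1).FillBytes(make([]byte, 32))...) // value
	gl := make([]byte, 8)
	binary.BigEndian.PutUint64(gl, 100000)
	opaque = append(opaque, gl...)      // gasLimit
	opaque = append(opaque, 0)          // isCreation = false
	opaque = append(opaque, 0xde, 0xad) // data
	_, value, gasLimit, isCreation, data, _ := unpackOpaqueData(opaque)
	fmt.Println(value, gasLimit, isCreation, data)
}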

The final processing on L2: relaying the message

function relayMessage(
    uint256 _nonce,
    address _sender,
    address _target,
    uint256 _value,
    uint256 _minGasLimit,
    bytes calldata _message
)
    external
    payable
{
    // Make sure the contract is not paused
    require(paused() == false, "CrossDomainMessenger: paused");

    // Make sure the message version is supported
    (, uint16 version) = Encoding.decodeVersionedNonce(_nonce);
    require(version < 2, "CrossDomainMessenger: only version 0 or 1 messages are supported at this time");

    // Check whether this (legacy) message has already been relayed, to prevent replays
    if (version == 0) {
        bytes32 oldHash = Hashing.hashCrossDomainMessageV0(_target, _sender, _message, _nonce);
        require(successfulMessages[oldHash] == false, "CrossDomainMessenger: legacy withdrawal already relayed");
    }

    // Use the version-1 hash as the message's unique identifier
    bytes32 versionedHash =
        Hashing.hashCrossDomainMessageV1(_nonce, _sender, _target, _value, _minGasLimit, _message);

    if (_isOtherMessenger()) {
        // Make sure msg.value matches _value
        assert(msg.value == _value);
        assert(!failedMessages[versionedHash]);
    } else {
        require(msg.value == 0, "CrossDomainMessenger: value must be zero unless message is from a system address");

        require(failedMessages[versionedHash], "CrossDomainMessenger: message cannot be replayed");
    }
    // Make sure the target address is safe to call
    require(
        _isUnsafeTarget(_target) == false, "CrossDomainMessenger: cannot send message to blocked system address"
    );

    require(successfulMessages[versionedHash] == false, "CrossDomainMessenger: message has already been relayed");

    // Make sure there is enough gas for the external call and post-processing; if not, mark the message as failed
    if (
        !SafeCall.hasMinGas(_minGasLimit, RELAY_RESERVED_GAS + RELAY_GAS_CHECK_BUFFER)
            || xDomainMsgSender != Constants.DEFAULT_L2_SENDER
    ) {
        failedMessages[versionedHash] = true;
        emit FailedRelayedMessage(versionedHash);

        // Revert in this case if the transaction was triggered by the estimation address. This
        // should only be possible during gas estimation or we have bigger problems. Reverting
        // here will make the behavior of gas estimation change such that the gas limit
        // computed will be the amount required to relay the message, even if that amount is
        // greater than the minimum gas limit specified by the user.
        if (tx.origin == Constants.ESTIMATION_ADDRESS) {
            revert("CrossDomainMessenger: failed to relay message");
        }

        return;
    }
    // The core step: forward the call via SafeCall.call
    xDomainMsgSender = _sender;
    bool success = SafeCall.call(_target, gasleft() - RELAY_RESERVED_GAS, _value, _message);
    xDomainMsgSender = Constants.DEFAULT_L2_SENDER;

    // Handle the final bookkeeping based on the call result
    if (success) {
        assert(successfulMessages[versionedHash] == false);
        successfulMessages[versionedHash] = true;
        emit RelayedMessage(versionedHash);
    } else {
        failedMessages[versionedHash] = true;
        emit FailedRelayedMessage(versionedHash);

        if (tx.origin == Constants.ESTIMATION_ADDRESS) {
            revert("CrossDomainMessenger: failed to relay message");
        }
    }
}
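
The version checked at the top of relayMessage is packed into the nonce itself by the Encoding library. A minimal Go sketch of that convention, assuming the version occupies the top 16 bits of the 256-bit nonce (our reading of encodeVersionedNonce/decodeVersionedNonce):

package main

import (
	"fmt"
	"math/big"
)

// encodeVersionedNonce packs a 16-bit version into the top bits of the 256-bit nonce.
func encodeVersionedNonce(nonce *big.Int, version uint16) *big.Int {
	v := new(big.Int).Lsh(big.NewInt(int64(version)), 240)
	return new(big.Int).Or(v, nonce)
}

// decodeVersionedNonce splits a versioned nonce back into (nonce, version).
func decodeVersionedNonce(versioned *big.Int) (*big.Int, uint16) {
	version := uint16(new(big.Int).Rsh(versioned, 240).Uint64())
	mask := new(big.Int).Sub(new(big.Int).Lsh(big.NewInt(1), 240), big.NewInt(1))
	nonce := new(big.Int).And(versioned, mask)
	return nonce, version
}

func main() {
	vn := encodeVersionedNonce(big.NewInt(42), 1)
	nonce, version := decodeVersionedNonce(vn)
	fmt.Println(nonce, version) // 42 1
}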

Sequence diagram (figure not reproduced here)

References

https://docs.optimism.io/stack/protocol/rollup/withdrawal-flow
https://learnblockchain.cn/article/9207

Reposted from: https://learnblockchain.cn/article/9419

OPStack upgrades for EIP-4844

Ethereum's EIP-4844 is a huge change for layer 2: it dramatically lowers the fees layer 2 pays when using L1 as the DA (data availability) layer. This article does not break down EIP-4844 in detail; it only gives a brief introduction as background for understanding the OP-Stack updates.

EIP-4844

TL;DR:
EIP-4844 introduces a data format called the "blob". Blob data does not take part in EVM execution; it is stored on the consensus layer, and each blob lives for 4096 epochs (about 18 days). Blobs live on L1 mainnet and are carried by the new type-3 transactions; each block can hold at most 6 blobs, and each transaction can carry at most 6 blobs.
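
The "about 18 days" figure follows directly from the retention window. A quick sanity check in Go, assuming mainnet's 32 slots per epoch and 12-second slots:

package main

import "fmt"

func main() {
	const (
		retentionEpochs = 4096
		slotsPerEpoch   = 32
		secondsPerSlot  = 12
	)
	seconds := retentionEpochs * slotsPerEpoch * secondsPerSlot
	fmt.Printf("blob retention: %d seconds ≈ %.1f days\n", seconds, float64(seconds)/86400)
	// prints ≈ 18.2 days
}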

For more details, refer to the following articles:

Ethereum Evolved: Dencun Upgrade Part 5, EIP-4844

EIP-4844, Blobs, and Blob Gas: What you need to know

Proto-Danksharding

How OP-Stack applies it

After OP-Stack switched from CALLDATA to BLOBs as the storage format for rollup data, fees dropped sharply.

In this OP-Stack update, the main business-logic change is converting the data that used to be sent as calldata into the blob format and sending it to L1 in blob-type transactions. It also covers parsing blobs when reading the data sent to the rollup back from L1. The main components involved in this upgrade are:

  1. submitter — the component responsible for sending rollup data to L1
  2. fetcher — syncs L1 data (old rollup data, deposit transactions, etc.) into L2
  3. blob definitions and implementation — how blob data is fetched, structured, and so on
  4. other related pieces — e.g. client support for signing blob-type transactions, fault-proof related design, etc.

⚠️⚠️⚠️ Note that all code referenced in this article is based on the original PR designs and may differ from the code actually running in production.


Blob definitions and encode/decode implementation

Pull Request(8131): blob definition

Pull Request(8767): encoding & decoding

Defining the blob

BlobSize        = 4096 * 32

type Blob [BlobSize]byte

blob encoding

official specs about blob encoding

Note that this spec corresponds to the latest version of the code, while the code excerpted from the PR below is the original, simplified version. The main difference: the Blob type is split into 4096 field elements, and each field element's maximum size is bounded by the modulus, i.e. math.log2(BLS_MODULUS) = 254.8570894..., meaning a field element can never exceed 254 bits, or 31.75 bytes. The original demo code only used 31 bytes per field element, giving up the remaining 0.75 bytes. The latest code combines four field elements so that those 0.75-byte remainders are fully used, improving data efficiency.
Below is an excerpt from Pull Request(8767).
Looping 4096 times, it reads a total of 31*4096 bytes of data, which are then written into the blob.

func (b *Blob) FromData(data Data) error {
    if len(data) > MaxBlobDataSize {
        return fmt.Errorf("data is too large for blob. len=%v", len(data))
    }
    b.Clear()
    // encode 4-byte little-endian length value into topmost 4 bytes (out of 31) of first field
    // element
    binary.LittleEndian.PutUint32(b[1:5], uint32(len(data)))
    // encode first 27 bytes of input data into remaining bytes of first field element
    offset := copy(b[5:32], data)
    // encode (up to) 31 bytes of remaining input data at a time into the subsequent field element
    for i := 1; i < 4096; i++ {
        offset += copy(b[i*32+1:i*32+32], data[offset:])
        if offset == len(data) {
            break
        }
    }
    if offset < len(data) {
        return fmt.Errorf("failed to fit all data into blob. bytes remaining: %v", len(data)-offset)
    }
    return nil
}

blob decoding

Decoding blob data works on the same principle as the encoding above

func (b *Blob) ToData() (Data, error) {
    data := make(Data, 4096*32)
    for i := 0; i < 4096; i++ {
        if b[i*32] != 0 {
            return nil, fmt.Errorf("invalid blob, found non-zero high order byte %x of field element %d", b[i*32], i)
        }
        copy(data[i*31:i*31+31], b[i*32+1:i*32+32])
    }
    // extract the length prefix & trim the output accordingly
    dataLen := binary.LittleEndian.Uint32(data[:4])
    data = data[4:]
    if dataLen > uint32(len(data)) {
        return nil, fmt.Errorf("invalid blob, length prefix out of range: %d", dataLen)
    }
    data = data[:dataLen]
    return data, nil
}
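
A quick roundtrip of the two functions above shows the intended invariant: ToData(FromData(data)) returns the original bytes as long as the input fits within MaxBlobDataSize. The sketch below assumes the Blob and Data types live in the op-service/eth package with exactly the methods excerpted above; the import path is an assumption.

package main

import (
	"bytes"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/eth" // assumed import path for the Blob type shown above
)

func main() {
	payload := []byte("batch frame bytes go here")

	var b eth.Blob
	if err := b.FromData(eth.Data(payload)); err != nil { // pack the payload into 4096 field elements
		panic(err)
	}

	decoded, err := b.ToData() // unpack it again
	if err != nil {
		panic(err)
	}

	fmt.Println(bytes.Equal([]byte(decoded), payload)) // true: the roundtrip preserves the data
}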

Submitter

Pull Request(8769)

Flag configuration

switch c.DataAvailabilityType {
case flags.CalldataType:
case flags.BlobsType:
default:
    return fmt.Errorf("unknown data availability type: %v", c.DataAvailabilityType)
}

BatchSubmitter

BatchSubmitter's role is extended from only sending calldata to sending either calldata or blob-type data as appropriate. Blob data is encoded inside blobTxCandidate using the FromData (blob-encode) function mentioned earlier

func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
    // Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
    data := txdata.Bytes()

    var candidate *txmgr.TxCandidate
    if l.Config.UseBlobs {
        var err error
        if candidate, err = l.blobTxCandidate(data); err != nil {
            // We could potentially fall through and try a calldata tx instead, but this would
            // likely result in the chain spending more in gas fees than it is tuned for, so best
            // to just fail. We do not expect this error to trigger unless there is a serious bug
            // or configuration issue.
            return fmt.Errorf("could not create blob tx candidate: %w", err)
        }
    } else {
        candidate = l.calldataTxCandidate(data)
    }

    intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
    if err != nil {
        // we log instead of return an error here because txmgr can do its own gas estimation
        l.Log.Error("Failed to calculate intrinsic gas", "err", err)
    } else {
        candidate.GasLimit = intrinsicGas
    }

    queue.Send(txdata, *candidate, receiptsCh)
    return nil
}

func (l *BatchSubmitter) blobTxCandidate(data []byte) (*txmgr.TxCandidate, error) {
    var b eth.Blob
    if err := b.FromData(data); err != nil {
        return nil, fmt.Errorf("data could not be converted to blob: %w", err)
    }
    return &txmgr.TxCandidate{
        To:    &l.RollupConfig.BatchInboxAddress,
        Blobs: []*eth.Blob{&b},
    }, nil
}

Fetcher

Pull Request(9098)

GetBlob

GetBlob is responsible for fetching blob data. Its main logic reconstructs the full blob from the 4096 field elements and verifies the result against the commitment.
GetBlob also takes part in the logic of the higher-level L1Retrieval flow

func (p *PreimageOracle) GetBlob(ref eth.L1BlockRef, blobHash eth.IndexedBlobHash) *eth.Blob {
    // Send a hint for the blob commitment & blob field elements.
    blobReqMeta := make([]byte, 16)
    binary.BigEndian.PutUint64(blobReqMeta[0:8], blobHash.Index)
    binary.BigEndian.PutUint64(blobReqMeta[8:16], ref.Time)
    p.hint.Hint(BlobHint(append(blobHash.Hash[:], blobReqMeta...)))

    commitment := p.oracle.Get(preimage.Sha256Key(blobHash.Hash))

    // Reconstruct the full blob from the 4096 field elements.
    blob := eth.Blob{}
    fieldElemKey := make([]byte, 80)
    copy(fieldElemKey[:48], commitment)
    for i := 0; i < params.BlobTxFieldElementsPerBlob; i++ {
        binary.BigEndian.PutUint64(fieldElemKey[72:], uint64(i))
        fieldElement := p.oracle.Get(preimage.BlobKey(crypto.Keccak256(fieldElemKey)))

        copy(blob[i<<5:(i+1)<<5], fieldElement[:])
    }

    blobCommitment, err := blob.ComputeKZGCommitment()
    if err != nil || !bytes.Equal(blobCommitment[:], commitment[:]) {
        panic(fmt.Errorf("invalid blob commitment: %w", err))
    }

    return &blob
}

Other miscellaneous pieces

Besides the main modules above, the upgrade also includes, for example, the client signing module responsible for signing type-3 transactions, and the modules involved in fault proofs. Fault proofs will be described in detail in the next chapter, so we won't go into them here

Pull Request(5452), fault proof related

Pull Request(9182), client signing related

Reposted from: https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN

An introduction to op-proposer

In this chapter, we explore what op-proposer actually is

First, here is the relevant resource from the official specs (source)

In one sentence, the proposer's job is to periodically send the state roots from layer 2 to layer 1, so that certain layer-2 transactions, such as withdrawals or message passing, can be executed trustlessly and directly at the layer-1 level.

In this chapter, we will use the handling of a withdrawal from layer 2 to layer 1 as the example to explain op-proposer's role in the overall flow.

The withdrawal flow

In Optimism, a withdrawal is a transaction from L2 (e.g. OP Mainnet, OP Goerli) to L1 (e.g. Ethereum mainnet, Goerli), with or without assets attached. It can roughly be split into four transactions:

  • the withdrawal-initiating transaction the user submits on L2;
  • a transaction in which the proposer uploads the L2 state root to L1, for the user to use on L1 in the following steps;
  • the withdrawal-proving transaction the user submits on L1, which proves the withdrawal's validity based on a Merkle Patricia Trie;
  • after the fault challenge period, the withdrawal-finalizing transaction the user submits on L1, which actually runs the L1 call, claims any attached assets, and so on.

For more detail, see the official description of this part (source)

What is the proposer

The proposer is the connector for cases where L1 needs some of L2's data: it sends that piece of L2 data (the state root) into a contract on L1, and L1 contracts can then use it directly through contract calls.

Note ⚠️: many people think the state roots sent by the proposer are what make the corresponding blocks finalized. That understanding is wrong. A safe block can be considered finalized after two L1 epochs (64 blocks).
The proposer uploads data for blocks that are already finalized; blocks are not finalized because the data was uploaded

The difference between proposer and batcher

We covered the batcher earlier; the batcher is also responsible for sending L2 data to L1. You might ask: if the batcher already moves the data to L1, why do we need a proposer to move data a second time?

The block safety status differs

When the batcher sends data, the blocks are still in the unsafe state, cannot be used directly, and you cannot tell from the batcher's transactions when the blocks will become finalized.
When the proposer sends data, it means the related blocks have already reached the finalized stage, so the data can be trusted and used to the fullest extent.

The data format and size differ

The batcher stores almost the complete transaction information on layer 1, including gas price, data, and other details.
The proposer only sends the block's state root to L1; the state root is then used together with the Merkle tree design.
The batcher's data is huge, the proposer's is small. That is why the batcher's data is better placed in calldata: it is cheap, but cannot be used directly by contracts. The proposer's data is stored in contract storage: the amount is small, so the cost stays reasonable, and it can be used in contract interactions.

The difference in Ethereum between storing data in calldata and in contract storage

In Ethereum, calldata and storage differ mainly in three ways (a rough gas comparison sketch follows this list):

  1. Persistence

    • storage: persistent; the data is kept permanently.
    • calldata: temporary; the data disappears once the function finishes executing.
  2. Cost

    • storage: more expensive, since the data must be stored permanently.
    • calldata: cheaper, temporary storage.
  3. Accessibility

    • storage: accessible across multiple functions or transactions.
    • calldata: accessible only during the current function execution.
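
To put rough numbers on that cost difference, here is a back-of-the-envelope Go sketch using the protocol's gas constants (16 gas per non-zero calldata byte, 4 per zero byte, roughly 20,000 gas per freshly written 32-byte storage slot); the payload size is purely illustrative:

    package main

    import "fmt"

    // Rough gas cost of publishing n bytes as calldata vs. writing them into fresh storage slots.
    func calldataGas(nonZeroBytes, zeroBytes int) int { return 16*nonZeroBytes + 4*zeroBytes }
    func storageGas(n int) int                        { return ((n + 31) / 32) * 20000 } // ~20k per new 32-byte slot

    func main() {
    	const payload = 100_000 // ~100 KB of batch data, assume mostly non-zero bytes
    	fmt.Println("calldata:", calldataGas(payload, 0), "gas") // 1,600,000 gas
    	fmt.Println("storage: ", storageGas(payload), "gas")     // 62,500,000 gas
    	// A single 32-byte output root in storage, by contrast, costs only ~20,000 gas.
    }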

Code walkthrough

In this part we dig into the mechanism and implementation principles at the code level

Program entry point

op-proposer/proposer/l2_output_submitter.go

Calling the Start function kicks off the loop. In each iteration of the loop, FetchNextOutputInfo checks whether a proposal transaction should be sent for the next block; if so, sendTransaction is called to send it to L1, otherwise the loop simply moves on to the next iteration.

    func (l *L2OutputSubmitter) loop() {
        defer l.wg.Done()

        ctx := l.ctx

        ticker := time.NewTicker(l.pollInterval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                output, shouldPropose, err := l.FetchNextOutputInfo(ctx)
                if err != nil {
                    break
                }
                if !shouldPropose {
                    break
                }
                cCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
                if err := l.sendTransaction(cCtx, output); err != nil {
                    l.log.Error("Failed to send proposal transaction",
                        "err", err,
                        "l1blocknum", output.Status.CurrentL1.Number,
                        "l1blockhash", output.Status.CurrentL1.Hash,
                        "l1head", output.Status.HeadL1.Number)
                    cancel()
                    break
                }
                l.metr.RecordL2BlocksProposed(output.BlockRef)
                cancel()

            case <-l.done:
                return
            }
        }
    }

Fetching the output

op-proposer/proposer/l2_output_submitter.go

FetchNextOutputInfo calls the l2ooContract contract to get the block number for which the next proposal is due, then compares that number with the current L2 block number to decide whether a proposal transaction should be sent. If it should, fetchOutput is called to produce the output

    func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.OutputResponse, bool, error) {
        cCtx, cancel := context.WithTimeout(ctx, l.networkTimeout)
        defer cancel()
        callOpts := &bind.CallOpts{
            From:    l.txMgr.From(),
            Context: cCtx,
        }
        nextCheckpointBlock, err := l.l2ooContract.NextBlockNumber(callOpts)
        if err != nil {
            l.log.Error("proposer unable to get next block number", "err", err)
            return nil, false, err
        }
        // Fetch the current L2 heads
        cCtx, cancel = context.WithTimeout(ctx, l.networkTimeout)
        defer cancel()
        status, err := l.rollupClient.SyncStatus(cCtx)
        if err != nil {
            l.log.Error("proposer unable to get sync status", "err", err)
            return nil, false, err
        }

        // Use either the finalized or safe head depending on the config. Finalized head is default & safer.
        var currentBlockNumber *big.Int
        if l.allowNonFinalized {
            currentBlockNumber = new(big.Int).SetUint64(status.SafeL2.Number)
        } else {
            currentBlockNumber = new(big.Int).SetUint64(status.FinalizedL2.Number)
        }
        // Ensure that we do not submit a block in the future
        if currentBlockNumber.Cmp(nextCheckpointBlock) < 0 {
            l.log.Debug("proposer submission interval has not elapsed", "currentBlockNumber", currentBlockNumber, "nextBlockNumber", nextCheckpointBlock)
            return nil, false, nil
        }

        return l.fetchOutput(ctx, nextCheckpointBlock)
    }

Internally, fetchOutput indirectly uses OutputV0AtBlock to fetch and process the output response

op-service/sources/l2_client.go

OutputV0AtBlock takes the block hash identified earlier as needing a proposal, fetches the block header, and derives the data OutputV0 needs from that header. The StorageHash (withdrawal_storage_root) in the proof obtained via GetProof matters because, if only the state related to L2ToL1MessagePasserAddr is needed, the withdrawal_storage_root can dramatically reduce the size of the whole Merkle proof.

    func (s *L2Client) OutputV0AtBlock(ctx context.Context, blockHash common.Hash) (*eth.OutputV0, error) {
        head, err := s.InfoByHash(ctx, blockHash)
        if err != nil {
            return nil, fmt.Errorf("failed to get L2 block by hash: %w", err)
        }
        if head == nil {
            return nil, ethereum.NotFound
        }

        proof, err := s.GetProof(ctx, predeploys.L2ToL1MessagePasserAddr, []common.Hash{}, blockHash.String())
        if err != nil {
            return nil, fmt.Errorf("failed to get contract proof at block %s: %w", blockHash, err)
        }
        if proof == nil {
            return nil, fmt.Errorf("proof %w", ethereum.NotFound)
        }
        // make sure that the proof (including storage hash) that we retrieved is correct by verifying it against the state-root
        if err := proof.Verify(head.Root()); err != nil {
            return nil, fmt.Errorf("invalid withdrawal root hash, state root was %s: %w", head.Root(), err)
        }
        stateRoot := head.Root()
        return &eth.OutputV0{
            StateRoot:                eth.Bytes32(stateRoot),
            MessagePasserStorageRoot: eth.Bytes32(proof.StorageHash),
            BlockHash:                blockHash,
        }, nil
    }
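
The three fields returned above are exactly what the L2 output root commits to. As a minimal Go sketch (our reading of the version-0 output-root format, not code from the repository): the root is keccak256 over a 32-byte version prefix, the state root, the message-passer storage root, and the block hash, concatenated in that order.

    package main

    import (
    	"fmt"

    	"github.com/ethereum/go-ethereum/common"
    	"github.com/ethereum/go-ethereum/crypto"
    )

    // outputRootV0 hashes version(32) || stateRoot || messagePasserStorageRoot || blockHash.
    func outputRootV0(stateRoot, messagePasserStorageRoot, blockHash common.Hash) common.Hash {
    	var version [32]byte // version 0 is all zero bytes
    	payload := make([]byte, 0, 128)
    	payload = append(payload, version[:]...)
    	payload = append(payload, stateRoot[:]...)
    	payload = append(payload, messagePasserStorageRoot[:]...)
    	payload = append(payload, blockHash[:]...)
    	return crypto.Keccak256Hash(payload)
    }

    func main() {
    	root := outputRootV0(common.Hash{0x01}, common.Hash{0x02}, common.Hash{0x03})
    	fmt.Println(root.Hex())
    }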

Sending the output

op-proposer/proposer/l2_output_submitter.go

sendTransaction indirectly calls proposeL2OutputTxData, which uses the ABI of the contract on L1 to match our output to the contract function's parameter format. sendTransaction then sends the packed data to L1, interacting with the L2OutputOracle contract.

    func proposeL2OutputTxData(abi *abi.ABI, output *eth.OutputResponse) ([]byte, error) {
        return abi.Pack(
            "proposeL2Output",
            output.OutputRoot,
            new(big.Int).SetUint64(output.BlockRef.Number),
            output.Status.CurrentL1.Hash,
            new(big.Int).SetUint64(output.Status.CurrentL1.Number))
    }

packages/contracts-bedrock/src/L1/L2OutputOracle.sol

The L2OutputOracle contract validates this state root coming from the L2 block and stores it in the contract's storage.

    /// @notice Accepts an outputRoot and the timestamp of the corresponding L2 block.
    ///         The timestamp must be equal to the current value returned by `nextTimestamp()` in
    ///         order to be accepted. This function may only be called by the Proposer.
    /// @param _outputRoot    The L2 output of the checkpoint block.
    /// @param _l2BlockNumber The L2 block number that resulted in _outputRoot.
    /// @param _l1BlockHash   A block hash which must be included in the current chain.
    /// @param _l1BlockNumber The block number with the specified block hash.
    function proposeL2Output(
        bytes32 _outputRoot,
        uint256 _l2BlockNumber,
        bytes32 _l1BlockHash,
        uint256 _l1BlockNumber
    )
        external
        payable
    {
        require(msg.sender == proposer, "L2OutputOracle: only the proposer address can propose new outputs");

        require(
            _l2BlockNumber == nextBlockNumber(),
            "L2OutputOracle: block number must be equal to next expected block number"
        );

        require(
            computeL2Timestamp(_l2BlockNumber) < block.timestamp,
            "L2OutputOracle: cannot propose L2 output in the future"
        );

        require(_outputRoot != bytes32(0), "L2OutputOracle: L2 output proposal cannot be the zero hash");

        if (_l1BlockHash != bytes32(0)) {
            // This check allows the proposer to propose an output based on a given L1 block,
            // without fear that it will be reorged out.
            // It will also revert if the blockheight provided is more than 256 blocks behind the
            // chain tip (as the hash will return as zero). This does open the door to a griefing
            // attack in which the proposer's submission is censored until the block is no longer
            // retrievable, if the proposer is experiencing this attack it can simply leave out the
            // blockhash value, and delay submission until it is confident that the L1 block is
            // finalized.
            require(
                blockhash(_l1BlockNumber) == _l1BlockHash,
                "L2OutputOracle: block hash does not match the hash at the expected height"
            );
        }

        emit OutputProposed(_outputRoot, nextOutputIndex(), _l2BlockNumber, block.timestamp);

        l2Outputs.push(
            Types.OutputProposal({
                outputRoot: _outputRoot,
                timestamp: uint128(block.timestamp),
                l2BlockNumber: uint128(_l2BlockNumber)
            })
        );
    }

Summary

The proposer's overall design and logic are relatively simple: on a regular loop it reads from L1 which L2 block the next proposal is due for, compares it with the local L2 chain, and then processes the data and sends it to L1. Most of the other transactions in the withdrawal flow are handled by the SDK; for details, read the official description of the withdrawal process we shared earlier (source).
If you want to see the proposer's actual behaviour on mainnet, you can look at this proposer address

How opstack derives Layer 2 from Layer 1

Before reading this article, I strongly recommend first reading the introduction to derivation in optimism/specs (source)
If you finish that document and feel lost, that is normal. But hold on to that feeling, because after reading the analysis in this article, go back and read it again; you will find that the official document is truly distilled, laying out every key point and detail concisely.

Now let's get to the main topic. We all know that a layer 2 node can fetch data from the DA layer (layer 1) and build complete block data from it. Today we will walk through how this process is implemented in the codebase.

Questions you should have

If you were asked to design such a system right now, how would you design it? What questions would you have? Here I have listed some questions; thinking along with them will help you understand the whole article better

  • When you start a new node, how does the whole system run?
  • Do you need to query every L1 block one by one? What triggers the queries?
  • Once you have an L1 block's data, which parts of it do you need?
  • During derivation, how does a block's status change? How does it go from unsafe to safe and then to finalized?
  • What are the obscure data structures batch/channel/frame in the official specs actually for? (See the previous chapter, 03-how-batcher-works, for details)

What is derivation?

Before understanding derivation, let's first talk about Optimism's basic rollup mechanism, using a simple transfer transaction on L2 as the example.

When you send a transfer transaction on the Optimism network, the transaction is "forwarded" to the sequencer node, which orders it, seals it into a block, and broadcasts that block; you can think of this as block production. Let's call the block containing your transaction block A. At this point block A's status is unsafe. Next, once the sequencer reaches a certain interval (say 4 minutes), the batcher module inside the sequencer sends all the transactions collected during those 4 minutes (including your transfer) to L1 in a single transaction, and L1 produces block X. Block A's status is still unsafe. When any node runs the derivation part of the program, it fetches block X's data from L1 and updates its local unsafe block A accordingly. At that point block A's status becomes safe. After two L1 epochs (64 blocks), the L2 node marks block A as finalized.

Derivation, then, is putting yourself in the role of the L2 node in the example above: by continuously running the derivation program, the unsafe blocks it has received are gradually turned into safe blocks, while blocks that are already safe are gradually turned into finalized ones.
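
As a purely conceptual model (the real op-node tracks this inside its derivation pipeline and forkchoice state; the type and function names below are illustrative, not from the codebase), the progression can be pictured like this:

    package main

    import "fmt"

    // SafetyLevel is an illustrative model of how an L2 block's status is labelled during derivation.
    type SafetyLevel int

    const (
    	Unsafe    SafetyLevel = iota // produced by the sequencer, not yet backed by L1 data
    	Safe                         // reproduced from batch data posted to L1
    	Finalized                    // the backing L1 data can no longer be reorged
    )

    func (s SafetyLevel) String() string {
    	return [...]string{"unsafe", "safe", "finalized"}[s]
    }

    // promote is a toy state transition: batch data landing on L1 makes a block safe,
    // and sufficient L1 finality (e.g. two epochs) makes a safe block finalized.
    func promote(cur SafetyLevel, batchOnL1, l1Finalized bool) SafetyLevel {
    	switch {
    	case cur == Unsafe && batchOnL1:
    		return Safe
    	case cur == Safe && l1Finalized:
    		return Finalized
    	default:
    		return cur
    	}
    }

    func main() {
    	s := Unsafe
    	s = promote(s, true, false) // batcher data lands on L1
    	s = promote(s, true, true)  // the L1 blocks holding that data finalize
    	fmt.Println(s)              // finalized
    }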

Deep dive into the code

Ho ho, captain, let's dive deep

batcher工作原理

batcher工作原理

在这一章节中,我们将探讨到底什么是batcher ⚙️
官方specs中有batcher的介绍(source)

在进行之前,我们先提出几个问题,通过这两个问题来真正理解batcher的作用以及工作原理

  • batcher是什么?它为什么叫做batcher
  • batcher在代码中到底是怎么运行的?

Prerequisites

  • In a rollup, to achieve decentralization properties such as censorship resistance, we must send all the data (transactions) that happens on layer 2 to layer 1. That way we can rely on layer 1's security while also being able to reconstruct the entire layer 2 purely from layer 1 data, which is what makes layer 2 truly valid.
  • Epochs and the Sequencing Window: an epoch can be loosely understood as the time during which a new L1 block (N+1) is produced. The epoch's number equals the number of L1 block N, and all L2 blocks produced during the L1 block N -> N+1 window belong to epoch N. In the previous point we said the L2 data must be uploaded to L1; within what range does the upload count as valid? The size of the Sequencing Window gives the answer: the data related to block N / epoch N must have been uploaded to L1 before L1 block N + size.
  • Batch/Batcher Transaction: a batch can be loosely understood as the transactions needed to build one L2 block. A Batcher Transaction is the transaction that combines and processes multiple batches and sends them to L1
  • Channel: a channel can be loosely understood as a combination of batches; they are combined to get a better compression ratio, which lowers data availability cost and further reduces the batcher's upload cost.
  • Frame: sometimes, in pursuit of a better compression ratio, the channel data grows too large for the batcher to send the whole channel to L1 at once, so the channel has to be cut into frames and sent in several transactions (a rough frame-layout sketch follows this list).
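
As a rough sketch of what a single frame looks like on the wire, assuming the layout described in the derivation specs (a 16-byte channel id, a uint16 frame number, a uint32 frame-data length, the frame data, and a final is_last byte); this is an illustration, not code from the repo:

    package main

    import (
    	"encoding/binary"
    	"fmt"
    )

    // marshalFrame serializes one frame as channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last.
    // The field layout is an assumption based on the derivation specs, not code copied from the repository.
    func marshalFrame(channelID [16]byte, frameNumber uint16, frameData []byte, isLast bool) []byte {
    	out := make([]byte, 0, 16+2+4+len(frameData)+1)
    	out = append(out, channelID[:]...)
    	out = binary.BigEndian.AppendUint16(out, frameNumber)
    	out = binary.BigEndian.AppendUint32(out, uint32(len(frameData)))
    	out = append(out, frameData...)
    	if isLast {
    		out = append(out, 1)
    	} else {
    		out = append(out, 0)
    	}
    	return out
    }

    func main() {
    	var id [16]byte
    	copy(id[:], "example-channel!")
    	frame := marshalFrame(id, 0, []byte("compressed batch bytes"), true)
    	fmt.Println(len(frame), "bytes") // 16 + 2 + 4 + 22 + 1 = 45 bytes
    }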

What is the batcher

In a rollup, some role has to carry L2 information over to L1, and sending a transaction the moment each new L2 transaction arrives would be expensive and hard to manage, so we need a sensible batching strategy for the uploads. The batcher exists to solve this. The batcher is a singleton component (the sequencer currently holds its private key) that sends Batcher Transactions to a specific address to carry the L2 information.

The batcher collects unsafe block data to obtain multiple batches; here every block corresponds to one batch. Once enough batches have been collected to compress efficiently, a channel is produced and sent to L1 in the form of frames, completing the upload of L2 information.

Code walkthrough

In this part we dig into the mechanism and implementation principles at the code level

Program entry point

op-batcher/batcher/driver.go

Calling the Start function kicks off the loop, which mainly handles three things:

  • when the timer fires, load all the new L2 blocks that have not been loaded yet, then trigger publishStateToL1 to publish state to L1
  • handle receipts, recording success or failure
  • handle shutdown requests
    func (l *BatchSubmitter) Start() error {
        l.log.Info("Starting Batch Submitter")

        l.mutex.Lock()
        defer l.mutex.Unlock()

        if l.running {
            return errors.New("batcher is already running")
        }
        l.running = true

        l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
        l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
        l.state.Clear()
        l.lastStoredBlock = eth.BlockID{}

        l.wg.Add(1)
        go l.loop()

        l.log.Info("Batch Submitter started")

        return nil
    }
    func (l *BatchSubmitter) loop() {
        defer l.wg.Done()

        ticker := time.NewTicker(l.PollInterval)
        defer ticker.Stop()

        receiptsCh := make(chan txmgr.TxReceipt[txData])
        queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)

        for {
            select {
            case <-ticker.C:
                if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
                    err := l.state.Close()
                    if err != nil {
                        l.log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
                    }
                    l.publishStateToL1(queue, receiptsCh, true)
                    l.state.Clear()
                    continue
                }
                l.publishStateToL1(queue, receiptsCh, false)
            case r := <-receiptsCh:
                l.handleReceipt(r)
            case <-l.shutdownCtx.Done():
                err := l.state.Close()
                if err != nil {
                    l.log.Error("error closing the channel manager", "err", err)
                }
                l.publishStateToL1(queue, receiptsCh, true)
                return
            }
        }
    }

Loading the latest block data

op-batcher/batcher/driver.go

loadBlocksIntoState calls calculateL2BlockRangeToStore to get the range of unsafe blocks newly produced since the latest safe block derived from the last batch transaction. It then loops over every unsafe block in that range, fetching it from L2 via loadBlockIntoState and loading it into the internal block queue via AddL2Block, where it waits for further processing.

    func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
        start, end, err := l.calculateL2BlockRangeToStore(ctx)
        ……
        var latestBlock *types.Block
        // Add all blocks to "state"
        for i := start.Number + 1; i < end.Number+1; i++ {
            block, err := l.loadBlockIntoState(ctx, i)
            if errors.Is(err, ErrReorg) {
                l.log.Warn("Found L2 reorg", "block_number", i)
                l.lastStoredBlock = eth.BlockID{}
                return err
            } else if err != nil {
                l.log.Warn("failed to load block into state", "err", err)
                return err
            }
            l.lastStoredBlock = eth.ToBlockID(block)
            latestBlock = block
        }
        ……
    }
    func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
        ……
        block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
        ……
        if err := l.state.AddL2Block(block); err != nil {
            return nil, fmt.Errorf("adding L2 block to state: %w", err)
        }
        ……
        return block, nil
    }

Processing the loaded block data and sending it to layer 1

op-batcher/batcher/driver.go

publishTxToL1 uses TxData to process the previously loaded data and calls sendTransaction to send it to L1

    func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
        // send all available transactions
        l1tip, err := l.l1Tip(ctx)
        if err != nil {
            l.log.Error("Failed to query L1 tip", "error", err)
            return err
        }
        l.recordL1Tip(l1tip)

        // Collect next transaction data
        txdata, err := l.state.TxData(l1tip.ID())
        if err == io.EOF {
            l.log.Trace("no transaction data available")
            return err
        } else if err != nil {
            l.log.Error("unable to get tx data", "err", err)
            return err
        }

        l.sendTransaction(txdata, queue, receiptsCh)
        return nil
    }

TxData in detail

op-batcher/batcher/channel_manager.go

TxData is mainly responsible for the following:

  • find the first channel that contains a frame; if one exists and it passes the checks, use nextTxData to fetch its data and return it
  • if there is no such channel, first call ensureChannelWithSpace to make sure the channel still has room, then use processBlocks to build the data previously loaded into the block queue into the out-channel's compressor
  • outputFrames cuts the data in the out-channel compressor into frames of a suitable size
  • finally, the freshly built data is returned via nextTxData.

ensureChannelWithSpace makes sure currentChannel is a channel with room for more data (i.e. channel.IsFull returns false). If currentChannel is nil or full, a new channel is created

    func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
        s.mu.Lock()
        defer s.mu.Unlock()
        var firstWithFrame *channel
        for _, ch := range s.channelQueue {
            if ch.HasFrame() {
                firstWithFrame = ch
                break
            }
        }

        dataPending := firstWithFrame != nil && firstWithFrame.HasFrame()
        s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))

        // Short circuit if there is a pending frame or the channel manager is closed.
        if dataPending || s.closed {
            return s.nextTxData(firstWithFrame)
        }

        // No pending frame, so we have to add new blocks to the channel

        // If we have no saved blocks, we will not be able to create valid frames
        if len(s.blocks) == 0 {
            return txData{}, io.EOF
        }

        if err := s.ensureChannelWithSpace(l1Head); err != nil {
            return txData{}, err
        }

        if err := s.processBlocks(); err != nil {
            return txData{}, err
        }

        // Register current L1 head only after all pending blocks have been
        // processed. Even if a timeout will be triggered now, it is better to have
        // all pending blocks be included in this channel for submission.
        s.registerL1Block(l1Head)

        if err := s.outputFrames(); err != nil {
            return txData{}, err
        }

        return s.nextTxData(s.currentChannel)
    }

processBlocks internally uses AddBlock to add the blocks from the block queue into the current channel

    func (s *channelManager) processBlocks() error {
        var (
            blocksAdded int
            _chFullErr  *ChannelFullError // throw away, just for type checking
            latestL2ref eth.L2BlockRef
        )
        for i, block := range s.blocks {
            l1info, err := s.currentChannel.AddBlock(block)
            if errors.As(err, &_chFullErr) {
                // current block didn't get added because channel is already full
                break
            } else if err != nil {
                return fmt.Errorf("adding block[%d] to channel builder: %w", i, err)
            }
            s.log.Debug("Added block to channel", "channel", s.currentChannel.ID(), "block", block)

            blocksAdded += 1
            latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
            s.metr.RecordL2BlockInChannel(block)
            // current block got added but channel is now full
            if s.currentChannel.IsFull() {
                break
            }
        }

AddBlock first extracts the batch from the block via BlockToBatch, then compresses and stores the data via AddBatch.

    func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error) {
        if c.IsFull() {
            return derive.L1BlockInfo{}, c.FullErr()
        }

        batch, l1info, err := derive.BlockToBatch(block)
        if err != nil {
            return l1info, fmt.Errorf("converting block to batch: %w", err)
        }

        if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
            c.setFullErr(err)
            return l1info, c.FullErr()
        } else if err != nil {
            return l1info, fmt.Errorf("adding block to channel out: %w", err)
        }
        c.blocks = append(c.blocks, block)
        c.updateSwTimeout(batch)

        if err = c.co.FullErr(); err != nil {
            c.setFullErr(err)
            // Adding this block still worked, so don't return error, just mark as full
        }

        return l1info, nil
    }

Once txdata has been obtained, sendTransaction sends the whole payload to L1.

Summary

In this chapter we learned what the batcher is and how it works. You can check the batcher's current behaviour at this address.

Reposted from: https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN