您正在查看: 2024年6月

OPStack在EIP-4844中的升级

OPStack在EIP-4844中的升级

Ethereum的EIP-4844对layer2来说是一次巨大的变革,它显著降低了layer2在使用L1作为DA(数据可用性)的费用。本文不详细解析EIP-4844的具体内容,只简要介绍,作为我们了解OP-Stack更新的背景。

EIP-4844

TL;DR:
EIP-4844引入了一种称为“blob”的数据格式,这种格式的数据不参与EVM执行,而是存储在共识层,每个数据块的生命周期为4096个epoch(约18天)。blob存在于l1主网上,由新的type3 transaction携带,每个区块最多能容纳6个blob,每个transaction最多可以携带6个blob。

欲了解更多详情,请参考以下文章:

Ethereum Evolved: Dencun Upgrade Part 5, EIP-4844

EIP-4844, Blobs, and Blob Gas: What you need to know

Proto-Danksharding

OP-Stack的应用

OP-Stack在采用BLOB替换之前的CALLDATA作为rollup的数据存储方式后,费率直线下降
image

在OP-Stack的此次更新中,主要的业务逻辑变更涉及将原先通过calldata发送的数据转换为blob格式,并通过blob类型的交易发送到L1。此外,还涉及到从L1获取发送到rollup的数据时对blob的解析,以下是参与此次升级的主要组件:

  1. submitter —— 负责将rollup数据发送到L1的组件
  2. fetcher —— 将L1的数据(旧rollup数据/deposit交易等)同步到L2中
  3. blob相关定义与实现 —— 如何获取和结构blob数据等内容等
  4. 其他相关设计部分 —— 如客户端支持blob类型交易的签名、与fault proof相关的设计等

⚠️⚠️⚠️请注意,本文中所有涉及的代码均基于最初的PR设计,可能与实际生产环境中运行的代码存在差异。


Blob相关定义与编解码实现

Pull Request(8131) blob 定义

Pull Request(8767) encoding & decoding

定义blob

BlobSize        = 4096 * 32

type Blob [BlobSize]byte

blob encoding

official specs about blob encoding

需要注意的是,此specs对应的是最新版本的代码,而下方PR截取的代码则为最初的简化版本。主要区别在于:Blob类型被分为4096个字段元素,每个字段元素的最大大小受限于特定模的大小,即math.log2(BLS_MODULUS) = 254.8570894...,这意味着每个字段元素的大小不会超过254位,即31.75字节。最初的演示代码只使用了31字节,放弃了0.75字节的空间。而在最新版本的代码中,通过四个字段元素的联合作用,充分利用了每个字段元素的0.75字节空间,从而提高了数据的使用效率。
以下为Pull Request(8767)的部分截取代码
通过4096次循环,它读取总共31*4096字节的数据,这些数据随后被加入到blob中。

func (b *Blob) FromData(data Data) error {
    if len(data) > MaxBlobDataSize {
        return fmt.Errorf("data is too large for blob. len=%v", len(data))
    }
    b.Clear()
    // encode 4-byte little-endian length value into topmost 4 bytes (out of 31) of first field
    // element
    binary.LittleEndian.PutUint32(b[1:5], uint32(len(data)))
    // encode first 27 bytes of input data into remaining bytes of first field element
    offset := copy(b[5:32], data)
    // encode (up to) 31 bytes of remaining input data at a time into the subsequent field element
    for i := 1; i < 4096; i++ {
        offset += copy(b[i*32+1:i*32+32], data[offset:])
        if offset == len(data) {
            break
        }
    }
    if offset < len(data) {
        return fmt.Errorf("failed to fit all data into blob. bytes remaining: %v", len(data)-offset)
    }
    return nil
}

blob decoding

blob数据的解码,原理同上述的数据编码

func (b *Blob) ToData() (Data, error) {
    data := make(Data, 4096*32)
    for i := 0; i < 4096; i++ {
        if b[i*32] != 0 {
            return nil, fmt.Errorf("invalid blob, found non-zero high order byte %x of field element %d", b[i*32], i)
        }
        copy(data[i*31:i*31+31], b[i*32+1:i*32+32])
    }
    // extract the length prefix & trim the output accordingly
    dataLen := binary.LittleEndian.Uint32(data[:4])
    data = data[4:]
    if dataLen > uint32(len(data)) {
        return nil, fmt.Errorf("invalid blob, length prefix out of range: %d", dataLen)
    }
    data = data[:dataLen]
    return data, nil
}

Submiter

Pull Request(8769)

flag配置

switch c.DataAvailabilityType {
case flags.CalldataType:
case flags.BlobsType:
default:
    return fmt.Errorf("unknown data availability type: %v", c.DataAvailabilityType)
}

BatchSubmitter

BatchSubmitter的功能从之前仅发送calldata数据扩展为根据情况发送calldata或blob类型的数据。Blob类型的数据通过之前提到的FromData(blob-encode)函数在blobTxCandidate内部进行编码

func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
    // Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
    data := txdata.Bytes()

    var candidate *txmgr.TxCandidate
    if l.Config.UseBlobs {
        var err error
        if candidate, err = l.blobTxCandidate(data); err != nil {
            // We could potentially fall through and try a calldata tx instead, but this would
            // likely result in the chain spending more in gas fees than it is tuned for, so best
            // to just fail. We do not expect this error to trigger unless there is a serious bug
            // or configuration issue.
            return fmt.Errorf("could not create blob tx candidate: %w", err)
        }
    } else {
        candidate = l.calldataTxCandidate(data)
    }

    intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
    if err != nil {
        // we log instead of return an error here because txmgr can do its own gas estimation
        l.Log.Error("Failed to calculate intrinsic gas", "err", err)
    } else {
        candidate.GasLimit = intrinsicGas
    }

    queue.Send(txdata, *candidate, receiptsCh)
    return nil
}

func (l *BatchSubmitter) blobTxCandidate(data []byte) (*txmgr.TxCandidate, error) {
    var b eth.Blob
    if err := b.FromData(data); err != nil {
        return nil, fmt.Errorf("data could not be converted to blob: %w", err)
    }
    return &txmgr.TxCandidate{
        To:    &l.RollupConfig.BatchInboxAddress,
        Blobs: []*eth.Blob{&b},
    }, nil
}

Fetcher

Pull Request(9098)

GetBlob

GetBlob负责获取blob数据,其主要逻辑包括利用4096个字段元素构建完整的blob,并通过commitment验证构建的blob的正确性。
同时,GetBlob也参与了上层L1Retrieval中的逻辑流程

func (p *PreimageOracle) GetBlob(ref eth.L1BlockRef, blobHash eth.IndexedBlobHash) *eth.Blob {
    // Send a hint for the blob commitment & blob field elements.
    blobReqMeta := make([]byte, 16)
    binary.BigEndian.PutUint64(blobReqMeta[0:8], blobHash.Index)
    binary.BigEndian.PutUint64(blobReqMeta[8:16], ref.Time)
    p.hint.Hint(BlobHint(append(blobHash.Hash[:], blobReqMeta...)))

    commitment := p.oracle.Get(preimage.Sha256Key(blobHash.Hash))

    // Reconstruct the full blob from the 4096 field elements.
    blob := eth.Blob{}
    fieldElemKey := make([]byte, 80)
    copy(fieldElemKey[:48], commitment)
    for i := 0; i < params.BlobTxFieldElementsPerBlob; i++ {
        binary.BigEndian.PutUint64(fieldElemKey[72:], uint64(i))
        fieldElement := p.oracle.Get(preimage.BlobKey(crypto.Keccak256(fieldElemKey)))

        copy(blob[i<<5:(i+1)<<5], fieldElement[:])
    }

    blobCommitment, err := blob.ComputeKZGCommitment()
    if err != nil || !bytes.Equal(blobCommitment[:], commitment[:]) {
        panic(fmt.Errorf("invalid blob commitment: %w", err))
    }

    return &blob
}

其他杂项

除了以上几个主要模块外,还包含例如负责签署type3类型transaction的client sign模块,和fault proof相关涉及的模块,fault proof会在下一章节进行详细描述,这里就不过多赘述了

Pull Request(5452), fault proof相关

Pull Request(9182), client sign相关

转载自:https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN

op-proposer介绍

op-proposer介绍

在这一章节中,我们将探讨到底什么是op-proposer

首先分享下来自官方specs中的资源(source)

一句话概括性的描述proposer的作用:定期的将来自layer2上的状态根(state root)发送到layer1上,以便可以无需信任地直接在layer1层面上执行一些来自layer2的交易,如提款或者message通信。

在本章节中,将会以处理来自layer2的一笔layer1的提现交易为例子,讲解op-proposer在整个流程中的作用。

提现流程

在Optimism中,提现是从L2(如 OP Mainnet, OP Goerli)到L1(如 Ethereum mainnet, Goerli)的交易,可能附带或不附带资产。可以粗略的分为四笔交易:

  • 用户在L2提交的提现发起交易;
  • proposer将L2中的state root 通过发送交易的方式上传到L1当中,以供接下来步骤中用户中L1中使用
  • 用户在L1提交的提现证明交易,基于Merkle Patricia Trie,证明提现的合法性;
  • 错误挑战期过后,用户在L1提交的提现最终交易,实际运行L1交易,认领任何附加资产等;

具体详情可以查看官方对于这部分的描述(source)

什么是proposer

proposer是服务于在L1中需要用到L2部分数据时的连通器,通过proposer将这一部分L2的数据(state root)发送到L1的合约当中。L1的合约就可以通过合约调用的方式直接使用了。

注意⚠️:很多人认为proposer发送的state root代表这些设计的区块是finalized。这种理解是错误的。safe的区块在L1中经过两个epoch(64个区块)后即可认定为finalized
proposer是将finalized的区块数据上传,而不是上传后才finalized

proposer和batcher的区别

在之前我们讲解了batcher部分,batcher也是负责把L2的数据发送到L1中。你可能会有疑问,batcher不都已经把数据搬运到L1当中了,为什么还需要一个proposer再进行一次搬运呢?

区块状态不一致

batcher发送数据时,区块的状态还是unsafe状态,不能直接使用,且无法根据batcher的交易来判断区块的状态何时变成finalized状态。
proposer发送数据时,代表了相关区块已经达到了finalized阶段,可以最大程度的去相信并使用相关数据。

传递的数据格式和大小不同

batcher是将几乎完整的交易信息,包括gasprice,data等详细信息存储在layer1当中。
proposer只是将区块的state root发送到l1当中。state root后续配合merkle-tree的设计使用
batcher传递的数据是巨量,proposer是少量的。因此batcher的数据更适合放置在calldate中,便宜,但是不能直接被合约使用。proposer的数据存储在合约的storage当中,数据量少,成本不会很高,并且可以在合约交互中使用。

在以太坊中,数据存储calldata当中和存储在合约的storage当中的区别

在以太坊中,calldatastorage的区别主要有三方面:

  1. 持久性

    • storage:持久存储,数据永久保存。
    • calldata:临时存储,函数执行完毕后数据消失。
  2. 成本

    • storage:较贵,需永久存储数据。
    • calldata:较便宜,临时存储。
  3. 可访问性

    • storage:多个函数或事务中可访问。
    • calldata:仅当前函数执行期间可访问。

代码实现

在这部分我们会从代码层来进行深度的机制和实现原理的讲解

程序起点

op-proposer/proposer/l2_output_submitter.go

通过调用Start函数来启动loop循环,在loop的循环中,主要通过函数FetchNextOutputInfo负责查看下一个区块是否该发送proposal交易,如果需要发送,则直接调用sendTransaction函数发送到L1当作,如不需要发送,则进行下一次循环。

    func (l *L2OutputSubmitter) loop() {
        defer l.wg.Done()

        ctx := l.ctx

        ticker := time.NewTicker(l.pollInterval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                output, shouldPropose, err := l.FetchNextOutputInfo(ctx)
                if err != nil {
                    break
                }
                if !shouldPropose {
                    break
                }
                cCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
                if err := l.sendTransaction(cCtx, output); err != nil {
                    l.log.Error("Failed to send proposal transaction",
                        "err", err,
                        "l1blocknum", output.Status.CurrentL1.Number,
                        "l1blockhash", output.Status.CurrentL1.Hash,
                        "l1head", output.Status.HeadL1.Number)
                    cancel()
                    break
                }
                l.metr.RecordL2BlocksProposed(output.BlockRef)
                cancel()

            case <-l.done:
                return
            }
        }
    }

获取output

op-proposer/proposer/l2_output_submitter.go

FetchNextOutputInfo函数通过调用l2ooContract合约来获取下一次该发送proposal的区块数,再将该区块块号和当前L2度区块块号进行比较,来判断是否应该发送proposal交易。如果需要发送,则调用fetchOutput函数来生成output

    func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.OutputResponse, bool, error) {
        cCtx, cancel := context.WithTimeout(ctx, l.networkTimeout)
        defer cancel()
        callOpts := &bind.CallOpts{
            From:    l.txMgr.From(),
            Context: cCtx,
        }
        nextCheckpointBlock, err := l.l2ooContract.NextBlockNumber(callOpts)
        if err != nil {
            l.log.Error("proposer unable to get next block number", "err", err)
            return nil, false, err
        }
        // Fetch the current L2 heads
        cCtx, cancel = context.WithTimeout(ctx, l.networkTimeout)
        defer cancel()
        status, err := l.rollupClient.SyncStatus(cCtx)
        if err != nil {
            l.log.Error("proposer unable to get sync status", "err", err)
            return nil, false, err
        }

        // Use either the finalized or safe head depending on the config. Finalized head is default & safer.
        var currentBlockNumber *big.Int
        if l.allowNonFinalized {
            currentBlockNumber = new(big.Int).SetUint64(status.SafeL2.Number)
        } else {
            currentBlockNumber = new(big.Int).SetUint64(status.FinalizedL2.Number)
        }
        // Ensure that we do not submit a block in the future
        if currentBlockNumber.Cmp(nextCheckpointBlock) < 0 {
            l.log.Debug("proposer submission interval has not elapsed", "currentBlockNumber", currentBlockNumber, "nextBlockNumber", nextCheckpointBlock)
            return nil, false, nil
        }

        return l.fetchOutput(ctx, nextCheckpointBlock)
    }

fetchOutput函数在内部间接通过OutputV0AtBlock函数来获取并处理output返回体

op-service/sources/l2_client.go

OutputV0AtBlock函数获取之前检索出来需要传递proposal的区块哈希来拿到区块头,再根据这个区块头派生OutputV0所需要的数据。其中通过GetProof函数获取的的proof中的StorageHash(withdrawal_storage_root)的作用是,如果只需要L2ToL1MessagePasserAddr相关的state的数据的话,withdrawal_storage_root可以大幅度减小整个默克尔树证明过程的大小。

    func (s *L2Client) OutputV0AtBlock(ctx context.Context, blockHash common.Hash) (*eth.OutputV0, error) {
        head, err := s.InfoByHash(ctx, blockHash)
        if err != nil {
            return nil, fmt.Errorf("failed to get L2 block by hash: %w", err)
        }
        if head == nil {
            return nil, ethereum.NotFound
        }

        proof, err := s.GetProof(ctx, predeploys.L2ToL1MessagePasserAddr, []common.Hash{}, blockHash.String())
        if err != nil {
            return nil, fmt.Errorf("failed to get contract proof at block %s: %w", blockHash, err)
        }
        if proof == nil {
            return nil, fmt.Errorf("proof %w", ethereum.NotFound)
        }
        // make sure that the proof (including storage hash) that we retrieved is correct by verifying it against the state-root
        if err := proof.Verify(head.Root()); err != nil {
            return nil, fmt.Errorf("invalid withdrawal root hash, state root was %s: %w", head.Root(), err)
        }
        stateRoot := head.Root()
        return &eth.OutputV0{
            StateRoot:                eth.Bytes32(stateRoot),
            MessagePasserStorageRoot: eth.Bytes32(proof.StorageHash),
            BlockHash:                blockHash,
        }, nil
    }

发送output

op-proposer/proposer/l2_output_submitter.go

sendTransaction函数中会间接调用proposeL2OutputTxData函数去使用L1链上合约的ABI来将我们的output与合约函数的入参格式进行匹配。随后sendTransaction函数将包装好的数据发送到L1上,与L2OutputOracle合约交互。

    func proposeL2OutputTxData(abi *abi.ABI, output *eth.OutputResponse) ([]byte, error) {
        return abi.Pack(
            "proposeL2Output",
            output.OutputRoot,
            new(big.Int).SetUint64(output.BlockRef.Number),
            output.Status.CurrentL1.Hash,
            new(big.Int).SetUint64(output.Status.CurrentL1.Number))
    }

packages/contracts-bedrock/src/L1/L2OutputOracle.sol

L2OutputOracle合约通过将此来自L2区块的state root进行校验,并存入合约的storage当中。

    /// @notice Accepts an outputRoot and the timestamp of the corresponding L2 block.
    ///         The timestamp must be equal to the current value returned by `nextTimestamp()` in
    ///         order to be accepted. This function may only be called by the Proposer.
    /// @param _outputRoot    The L2 output of the checkpoint block.
    /// @param _l2BlockNumber The L2 block number that resulted in _outputRoot.
    /// @param _l1BlockHash   A block hash which must be included in the current chain.
    /// @param _l1BlockNumber The block number with the specified block hash.
    function proposeL2Output(
        bytes32 _outputRoot,
        uint256 _l2BlockNumber,
        bytes32 _l1BlockHash,
        uint256 _l1BlockNumber
    )
        external
        payable
    {
        require(msg.sender == proposer, "L2OutputOracle: only the proposer address can propose new outputs");

        require(
            _l2BlockNumber == nextBlockNumber(),
            "L2OutputOracle: block number must be equal to next expected block number"
        );

        require(
            computeL2Timestamp(_l2BlockNumber) < block.timestamp,
            "L2OutputOracle: cannot propose L2 output in the future"
        );

        require(_outputRoot != bytes32(0), "L2OutputOracle: L2 output proposal cannot be the zero hash");

        if (_l1BlockHash != bytes32(0)) {
            // This check allows the proposer to propose an output based on a given L1 block,
            // without fear that it will be reorged out.
            // It will also revert if the blockheight provided is more than 256 blocks behind the
            // chain tip (as the hash will return as zero). This does open the door to a griefing
            // attack in which the proposer's submission is censored until the block is no longer
            // retrievable, if the proposer is experiencing this attack it can simply leave out the
            // blockhash value, and delay submission until it is confident that the L1 block is
            // finalized.
            require(
                blockhash(_l1BlockNumber) == _l1BlockHash,
                "L2OutputOracle: block hash does not match the hash at the expected height"
            );
        }

        emit OutputProposed(_outputRoot, nextOutputIndex(), _l2BlockNumber, block.timestamp);

        l2Outputs.push(
            Types.OutputProposal({
                outputRoot: _outputRoot,
                timestamp: uint128(block.timestamp),
                l2BlockNumber: uint128(_l2BlockNumber)
            })
        );
    }

总结

proposer的总体实现思路与逻辑相对简单,即定期循环从L1中读取下次需要发送proposal的L2区块并与本地L2区块比较,并负责将数据处理并发送到L1当中。其他在提款过程中的其他交易流程大部分由SDK负责,可以详细阅读我们之前推送的官方对于提款过程部分的描述(source)。
如果想要查看在主网中proposer的实际行为,可以查看此proposer address

opstack是如何从Layer1中派生出来Layer2的

opstack是如何从Layer1中派生出来Layer2的

在阅读本文章之前,我强烈建议你先阅读一下来自optimism/specs中有关派生部分的介绍(source)
如果你看完这篇文章,感到迷茫,这是正常的。但是还是请记住这份感觉,因为在看完我们这篇文章的分析之后,请你回过来头再看一遍,你就会发现这篇官方的文章真的很凝练,把所有要点和细节都精炼的阐述了一遍。

接下来让我们进入文章正题。我们都知道layer2的运行节点,是可以从DA层(layer1)中获取数据,并且构建出完整的区块数据的。今天我们就来讲解一下这个过程中是如何在codebase中实现的。

你需要有的问题

如果现在让你设计这样一套系统,你会怎么设计呢?你会有哪些问题?在这里我列出来了一些问题,带着这些问题去思考会帮助你更好的理解整篇文章

  • 当你启动一个新节点的时候,整个系统是如何运行的?
  • 你需要一个个去查询所有l1的区块数据吗?如何触发查询?
  • 当拿到l1区块的数据后,你需要哪些数据?
  • 派生过程中,区块的状态是怎么变化的?如何从unsafe变成safe再变成finalized
  • 官方specs中晦涩的数据结构 batch/channel/frame 这些到底是干嘛的?(可以在上一章03-how-batcher-works章节中详细理解)

什么是派生(derivation)?

在理解derivation前,我们先来聊一聊optimism的基本rollup机制,这里我们简单以一笔l2上的transfer交易为例。

当你在optimism网络上发出一笔转账交易,这笔交易会被"转发"给sequencer节点,由sequencer进行排序,然后进行区块的封装并进行区块的广播,这里可以理解为出块。我们把这个包含你交易的区块称为区块A。这时的区块A状态为unsafe。接下来等sequencer达到一定的时间间隔了(比如4分钟),会由sequencer中的batcher的模块把这四分钟内所有收集到的交易(包括你这笔转账交易)通过一笔交易发送到l1上,并由l1产出区块X。这时的区块A状态仍然为unsafe。当任何一个节点执行derivation部分的程序后,此节点从l1中获取区块X的数据,并对本地l2的unsafe区块A进行更新。这时的区块A状态为safe。在经过l1两个epoch(64个区块)后,由l2节点将区块A标记为finalized区块。

而派生就是把角色带入到上述例子的l2节点当中,通过不断的并行执行derivation程序将获取的unsafe区块逐步变成safe区块,同时把已经是safe的区块逐步变成finalized状态的一个过程。

代码层深潜

hoho 船长,让我们深潜

batcher工作原理

batcher工作原理

在这一章节中,我们将探讨到底什么是batcher ⚙️
官方specs中有batcher的介绍(source)

在进行之前,我们先提出几个问题,通过这两个问题来真正理解batcher的作用以及工作原理

  • batcher是什么?它为什么叫做batcher
  • batcher在代码中到底是怎么运行的?

前置知识

  • 在rollup机制中,要想做到的去中心化特性,例如抗审查等。我们必须要把layer2上发生的数据(transactions)全部发送到layer1当中。这样就可以在利用layer1的安全性的同时,又可以完全从layer1中构建出来整个layer2的数据,使得layer2才真正的具有有效性。
  • Epochs and the Sequencing Window:Epoch可以简单理解为L1新的一个区块(N+1)生成的这段时间。epoch的编号等于L1区块N的编号,在L1区块N -> N+1 这段时间内产生的所有L2区块都属于epoch N。在上个概念中我们提到必须上传L2的数据到L1中,那么我们应该在什么范围内上传数据才是有效的呢,Sequencing Window的size给了我们答案,即区块N/epoch N的相关数据,必须在L1的第N + size之前已经上传到L1了。
  • Batch/Batcher Transaction: Batch可以简单理解为每一个L2区块构建所需要的交易。Batcher Transaction为多个batch组合起来经过加工后发送到L1的那笔交易
  • Channe: channel可以简单理解为是batch的组合,组合是为了获得更好的压缩率,从而降低数据可用性成本,以使batcher上传的成本进一步降低。
  • Frame: frame可以理解为,有时候为了更好的压缩率,可能会导致channel数据过大而不能直接被batcher将整个channel发送给L1,因此需要对channel进行切割,分多次进行发送。

什么是batcher

在rollup中,需要一个角色来传递L2信息到L1当中,同时每当有新的交易就马上发送是昂贵且不方便管理的。这时候我们将需要制定一种合理的批量上传策略。因此,为了解决这个问题,batcher出现了。batcher是唯一存在(sequencer当前掌管私钥),且和特定地址发送Batcher Transaction来传递L2信息的组件。

batcher通过对unsafe区块数据进行收集,来获取多个batch,在这里每个区块都对应一个batch。当收集足够的batch进行高效压缩后生成channel,并以frame的形式发送到L1来完成L2的信息上传。

代码实现

在这部分我们会从代码层来进行深度的机制和实现原理的讲解

程序起点

op-batcher/batcher/driver.go

通过调用Start函数来启动loop循环,在loop的循环中,主要处理三件事

  • 当定时器触发时,将所有新的还未加载的L2block加载进来,然后触发publishStateToL1函数向L1进行state发布
  • 处理receipts,记录成功或者失败状态
  • 处理关闭请求
    func (l *BatchSubmitter) Start() error {
        l.log.Info("Starting Batch Submitter")

        l.mutex.Lock()
        defer l.mutex.Unlock()

        if l.running {
            return errors.New("batcher is already running")
        }
        l.running = true

        l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
        l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
        l.state.Clear()
        l.lastStoredBlock = eth.BlockID{}

        l.wg.Add(1)
        go l.loop()

        l.log.Info("Batch Submitter started")

        return nil
    }
    func (l *BatchSubmitter) loop() {
        defer l.wg.Done()

        ticker := time.NewTicker(l.PollInterval)
        defer ticker.Stop()

        receiptsCh := make(chan txmgr.TxReceipt[txData])
        queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)

        for {
            select {
            case <-ticker.C:
                if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
                    err := l.state.Close()
                    if err != nil {
                        l.log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
                    }
                    l.publishStateToL1(queue, receiptsCh, true)
                    l.state.Clear()
                    continue
                }
                l.publishStateToL1(queue, receiptsCh, false)
            case r := <-receiptsCh:
                l.handleReceipt(r)
            case <-l.shutdownCtx.Done():
                err := l.state.Close()
                if err != nil {
                    l.log.Error("error closing the channel manager", "err", err)
                }
                l.publishStateToL1(queue, receiptsCh, true)
                return
            }
        }
    }

加载最新区块数据

op-batcher/batcher/driver.go

loadBlocksIntoState函数调用calculateL2BlockRangeToStore来获取自上次发送batch transaction而派生的最新safeblock后新生成的unsafeblock范围。然后循环将这个范围中的每一个unsafe块调用loadBlockIntoState函数从L2里获取并通过AddL2Block函数加载到内部的block队列里。等待进一步处理。

    func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
        start, end, err := l.calculateL2BlockRangeToStore(ctx)
        ……
        var latestBlock *types.Block
        // Add all blocks to "state"
        for i := start.Number + 1; i < end.Number+1; i++ {
            block, err := l.loadBlockIntoState(ctx, i)
            if errors.Is(err, ErrReorg) {
                l.log.Warn("Found L2 reorg", "block_number", i)
                l.lastStoredBlock = eth.BlockID{}
                return err
            } else if err != nil {
                l.log.Warn("failed to load block into state", "err", err)
                return err
            }
            l.lastStoredBlock = eth.ToBlockID(block)
            latestBlock = block
        }
        ……
    }
    func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
        ……
        block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
        ……
        if err := l.state.AddL2Block(block); err != nil {
            return nil, fmt.Errorf("adding L2 block to state: %w", err)
        }
        ……
        return block, nil
    }

将加载的block数据处理,并发送到layer1

op-batcher/batcher/driver.go

publishTxToL1函数使用TxData函数对之前加载到数据进行处理,并调用sendTransaction函数发送到L1

    func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
        // send all available transactions
        l1tip, err := l.l1Tip(ctx)
        if err != nil {
            l.log.Error("Failed to query L1 tip", "error", err)
            return err
        }
        l.recordL1Tip(l1tip)

        // Collect next transaction data
        txdata, err := l.state.TxData(l1tip.ID())
        if err == io.EOF {
            l.log.Trace("no transaction data available")
            return err
        } else if err != nil {
            l.log.Error("unable to get tx data", "err", err)
            return err
        }

        l.sendTransaction(txdata, queue, receiptsCh)
        return nil
    }

TxData详解

op-batcher/batcher/channel_manager.go

TxData函数主要负责两件事务

  • 查找第一个含有frame的的channel,如果存在且通过检查后使用nextTxData获取数据并返回
  • 如果没有这样的channel,我们需要现调用ensureChannelWithSpace检查channel还有剩余的空间,再使用processBlocks将之前加载到block队列中的数据构造到 outchannel的composer当中压缩
  • outputFramesoutchannel composer当中的数据切割成适合大小的frame
  • 最后再把刚构造到数据通过nextTxData函数返回出去。

EnsureChannelWithSpace 确保 currentChannel 填充有可容纳更多数据的空间的channel(即,channel.IsFull 返回 false)。 如果 currentChannel 为零或已满,则会创建一个新channel

    func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
        s.mu.Lock()
        defer s.mu.Unlock()
        var firstWithFrame *channel
        for _, ch := range s.channelQueue {
            if ch.HasFrame() {
                firstWithFrame = ch
                break
            }
        }

        dataPending := firstWithFrame != nil && firstWithFrame.HasFrame()
        s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))

        // Short circuit if there is a pending frame or the channel manager is closed.
        if dataPending || s.closed {
            return s.nextTxData(firstWithFrame)
        }

        // No pending frame, so we have to add new blocks to the channel

        // If we have no saved blocks, we will not be able to create valid frames
        if len(s.blocks) == 0 {
            return txData{}, io.EOF
        }

        if err := s.ensureChannelWithSpace(l1Head); err != nil {
            return txData{}, err
        }

        if err := s.processBlocks(); err != nil {
            return txData{}, err
        }

        // Register current L1 head only after all pending blocks have been
        // processed. Even if a timeout will be triggered now, it is better to have
        // all pending blocks be included in this channel for submission.
        s.registerL1Block(l1Head)

        if err := s.outputFrames(); err != nil {
            return txData{}, err
        }

        return s.nextTxData(s.currentChannel)
    }

processBlocks函数在内部通过AddBlockblock队列里的block加入到当前的channel当中

    func (s *channelManager) processBlocks() error {
        var (
            blocksAdded int
            _chFullErr  *ChannelFullError // throw away, just for type checking
            latestL2ref eth.L2BlockRef
        )
        for i, block := range s.blocks {
            l1info, err := s.currentChannel.AddBlock(block)
            if errors.As(err, &_chFullErr) {
                // current block didn't get added because channel is already full
                break
            } else if err != nil {
                return fmt.Errorf("adding block[%d] to channel builder: %w", i, err)
            }
            s.log.Debug("Added block to channel", "channel", s.currentChannel.ID(), "block", block)

            blocksAdded += 1
            latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
            s.metr.RecordL2BlockInChannel(block)
            // current block got added but channel is now full
            if s.currentChannel.IsFull() {
                break
            }
        }

AddBlock 首先通过BlockToBatchbatchblcok中获取出来,再通过AddBatch函数对数据进行压缩并存储。

    func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error) {
        if c.IsFull() {
            return derive.L1BlockInfo{}, c.FullErr()
        }

        batch, l1info, err := derive.BlockToBatch(block)
        if err != nil {
            return l1info, fmt.Errorf("converting block to batch: %w", err)
        }

        if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
            c.setFullErr(err)
            return l1info, c.FullErr()
        } else if err != nil {
            return l1info, fmt.Errorf("adding block to channel out: %w", err)
        }
        c.blocks = append(c.blocks, block)
        c.updateSwTimeout(batch)

        if err = c.co.FullErr(); err != nil {
            c.setFullErr(err)
            // Adding this block still worked, so don't return error, just mark as full
        }

        return l1info, nil
    }

txdata获取后,使用sendTransaction将整个数据发送到L1当中。

总结

在这一章节中,我们了解了什么是batcher并且了解了batcher的运行原理,你可以在这个 address中查看当前batcher的行为。

转载自:https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN

optimism中的libp2p应用

optimism中的libp2p应用

在本节中,主要用于讲解optimism是如何使用libp2p来完成op-node中的p2p网络建立的。
p2p网络主要是用于在不同的node中传递信息,例如sequencer完成unsafe的区块构建后,通过p2p的gossiphub的pub/sub进行传播。libp2p还处理了其他,例如网络,寻址等在p2p网络中的基础件层。

了解libp2p

libp2p(简称来自“库对等”或“library peer-to-peer”)是一个面向对等(P2P)网络的框架,能够帮助开发P2P应用程序。它包含了一套协议、规范和库,使网络参与者(也称为“对等体”或“peers”)之间的P2P通信变得更为简便 (source)。libp2p最初是作为IPFS(InterPlanetary File System,星际文件系统)项目的一部分,后来它演变成了一个独立的项目,成为了分布式网络的模块化网络堆栈 (source)。

libp2p是IPFS社区的一个开源项目,欢迎广泛社区的贡献,包括帮助编写规范、编码实现以及创建示例和教程 (source)。libp2p是由多个构建模块组成的,每个模块都有非常明确、有文档记录且经过测试的接口,使得它们可组合、可替换,因此可升级 (source)。libp2p的模块化特性使得开发人员可以选择并使用仅对他们的应用程序必要的组件,从而在构建P2P网络应用程序时促进了灵活性和效率。

相关资源

libp2p的模块化架构和开源特性为开发强大、可扩展和灵活的P2P应用程序提供了良好的环境,使其成为分布式网络和网络应用程序开发领域的重要参与者。

libp2p实现方式

在使用libp2p时,你会需要实现和配置一些核心组件以构建你的P2P网络。以下是libp2p在应用中的一些主要实现方面:

1. 节点创建与配置:

  • 创建和配置libp2p节点是最基本的步骤,这包括设置节点的网络地址、身份和其他基本参数。
    关键使用代码:
    libp2p.New()

2. 传输协议:

  • 选择和配置你的传输协议(例如TCP、WebSockets等)以确保节点之间的通信。
    关键使用代码:
    tcpTransport := tcp.NewTCPTransport()

3. 多路复用和流控制:

  • 实现多路复用来允许在单一的连接上处理多个并发的数据流。
  • 实现流量控制来管理数据的传输速率和处理速率。
    关键使用代码:
    yamuxTransport := yamux.New()

4. 安全和加密:

  • 配置安全传输层以确保通信的安全性和隐私。
  • 实现加密和身份验证机制以保护数据和验证通信方。
    关键使用代码:
    tlsTransport := tls.New()

5. 协议和消息处理:

  • 定义和实现自定义协议来处理特定的网络操作和消息交换。
  • 处理接收到的消息并根据需要发送响应。
    关键使用代码:
    host.SetStreamHandler("/my-protocol/1.0.0", myProtocolHandler)

6. 发现和路由:

  • 实现节点发现机制来找到网络中的其他节点。
  • 实现路由逻辑以确定如何将消息路由到网络中的正确节点。
    关键使用代码:
    dht := kaddht.NewDHT(ctx, host, datastore.NewMapDatastore())

7. 网络行为和策略:

  • 定义和实现网络的行为和策略,例如连接管理、错误处理和重试逻辑。
    关键使用代码:
    connManager := connmgr.NewConnManager(lowWater, highWater, gracePeriod)

8. 状态管理和存储:

  • 管理节点和网络的状态,包括连接状态、节点列表和数据存储。
    关键使用代码:
    peerstore := pstoremem.NewPeerstore()

9. 测试和调试:

  • 为你的libp2p应用编写测试以确保其正确性和可靠性。
  • 使用调试工具和日志来诊断和解决网络问题。
    关键使用代码:
    logging.SetLogLevel("libp2p", "DEBUG")

10. 文档和社区支持:

- 查阅libp2p的文档以了解其各种组件和API。
- 与libp2p社区交流以获取支持和解决问题。

}

以上是使用libp2p时需要考虑和实现的一些主要方面。每个项目的具体实现可能会有所不同,但这些基本方面是构建和运行libp2p应用所必需的。在实现这些功能时,可以参考libp2p的官方文档GitHub仓库中的示例代码和教程。

在OP-node中libp2p的使用

为了弄清楚op-node和libp2p的关系,我们必须弄清楚几个问题

- 为什么选择libp2p?为什么不选择devp2p(geth使用devp2p)
- OP-node有哪些数据或者流程和p2p网络紧密相关
- 这些功能是如何在代码层实现的 

op-node需要libp2p网络的原因

首先我们要了解为什么optimism需要p2p网络
libp2p是一个模块化的网络协议,允许开发人员构建去中心化的点对点应用,适用于多种用例 (source)(source)。而devp2p主要用于以太坊生态系统,专为以太坊应用定制 (source)。libp2p的灵活性和广泛适用性可能使其成为开发人员的首选。

op-node主要使用libp2p的功能点

- 用于sequencer将产生的unsafe的block传递到其他非sequencer节点
- 用于非sequencer模式下的其他节点当出现gap时进行快速同步(反向链同步)
- 用于采用积分声誉系统来规范整体节点的良好环境

代码实现

host自定义初始化

host可以理解为是p2p的节点,当开启这个节点的时候,需要针对自己的项目进行一些特殊的初始化配置

现在让我们看一下 op-node/p2p/host.go文件中的Host方法,

该函数主要用于设置 libp2p 主机并进行各种配置。以下是该函数的关键部分以及各部分的简单中文描述:

  1. 检查是否禁用 P2P
    如果 P2P 被禁用,函数会直接返回。

  2. 从公钥获取 Peer ID
    使用配置中的公钥来生成 Peer ID。

  3. 初始化 Peerstore
    创建一个基础的 Peerstore 存储。

  4. 初始化扩展 Peerstore
    在基础 Peerstore 的基础上,创建一个扩展的 Peerstore。

  5. 将私钥和公钥添加到 Peerstore
    在 Peerstore 中存储 Peer 的私钥和公钥。

  6. 初始化连接控制器(Connection Gater)
    用于控制网络连接。

  7. 初始化连接管理器(Connection Manager)
    用于管理网络连接。

  8. 设置传输和监听地址
    设置网络传输协议和主机的监听地址。

  9. 创建 libp2p 主机
    使用前面的所有设置来创建一个新的 libp2p 主机。

  10. 初始化静态 Peer
    如果有配置静态 Peer,进行初始化。

  11. 返回主机
    最后,函数返回创建好的 libp2p 主机。

这些关键部分负责 libp2p 主机的初始化和设置,每个部分都负责主机配置的一个特定方面。

    func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
        if conf.DisableP2P {
            return nil, nil
        }
        pub := conf.Priv.GetPublic()
        pid, err := peer.IDFromPublicKey(pub)
        if err != nil {
            return nil, fmt.Errorf("failed to derive pubkey from network priv key: %w", err)
        }

        basePs, err := pstoreds.NewPeerstore(context.Background(), conf.Store, pstoreds.DefaultOpts())
        if err != nil {
            return nil, fmt.Errorf("failed to open peerstore: %w", err)
        }

        peerScoreParams := conf.PeerScoringParams()
        var scoreRetention time.Duration
        if peerScoreParams != nil {
            // Use the same retention period as gossip will if available
            scoreRetention = peerScoreParams.PeerScoring.RetainScore
        } else {
            // Disable score GC if peer scoring is disabled
            scoreRetention = 0
        }
        ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, scoreRetention)
        if err != nil {
            return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
        }

        if err := ps.AddPrivKey(pid, conf.Priv); err != nil {
            return nil, fmt.Errorf("failed to set up peerstore with priv key: %w", err)
        }
        if err := ps.AddPubKey(pid, pub); err != nil {
            return nil, fmt.Errorf("failed to set up peerstore with pub key: %w", err)
        }

        var connGtr gating.BlockingConnectionGater
        connGtr, err = gating.NewBlockingConnectionGater(conf.Store)
        if err != nil {
            return nil, fmt.Errorf("failed to open connection gater: %w", err)
        }
        connGtr = gating.AddBanExpiry(connGtr, ps, log, clock.SystemClock, metrics)
        connGtr = gating.AddMetering(connGtr, metrics)

        connMngr, err := DefaultConnManager(conf)
        if err != nil {
            return nil, fmt.Errorf("failed to open connection manager: %w", err)
        }

        listenAddr, err := addrFromIPAndPort(conf.ListenIP, conf.ListenTCPPort)
        if err != nil {
            return nil, fmt.Errorf("failed to make listen addr: %w", err)
        }
        tcpTransport := libp2p.Transport(
            tcp.NewTCPTransport,
            tcp.WithConnectionTimeout(time.Minute*60)) // break unused connections
        // TODO: technically we can also run the node on websocket and QUIC transports. Maybe in the future?

        var nat lconf.NATManagerC // disabled if nil
        if conf.NAT {
            nat = basichost.NewNATManager
        }

        opts := []libp2p.Option{
            libp2p.Identity(conf.Priv),
            // Explicitly set the user-agent, so we can differentiate from other Go libp2p users.
            libp2p.UserAgent(conf.UserAgent),
            tcpTransport,
            libp2p.WithDialTimeout(conf.TimeoutDial),
            // No relay services, direct connections between peers only.
            libp2p.DisableRelay(),
            // host will start and listen to network directly after construction from config.
            libp2p.ListenAddrs(listenAddr),
            libp2p.ConnectionGater(connGtr),
            libp2p.ConnectionManager(connMngr),
            //libp2p.ResourceManager(nil), // TODO use resource manager interface to manage resources per peer better.
            libp2p.NATManager(nat),
            libp2p.Peerstore(ps),
            libp2p.BandwidthReporter(reporter), // may be nil if disabled
            libp2p.MultiaddrResolver(madns.DefaultResolver),
            // Ping is a small built-in libp2p protocol that helps us check/debug latency between peers.
            libp2p.Ping(true),
            // Help peers with their NAT reachability status, but throttle to avoid too much work.
            libp2p.EnableNATService(),
            libp2p.AutoNATServiceRateLimit(10, 5, time.Second*60),
        }
        opts = append(opts, conf.HostMux...)
        if conf.NoTransportSecurity {
            opts = append(opts, libp2p.Security(insecure.ID, insecure.NewWithIdentity))
        } else {
            opts = append(opts, conf.HostSecurity...)
        }
        h, err := libp2p.New(opts...)
        if err != nil {
            return nil, err
        }

        staticPeers := make([]*peer.AddrInfo, len(conf.StaticPeers))
        for i, peerAddr := range conf.StaticPeers {
            addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
            if err != nil {
                return nil, fmt.Errorf("bad peer address: %w", err)
            }
            staticPeers[i] = addr
        }

        out := &extraHost{
            Host:        h,
            connMgr:     connMngr,
            log:         log,
            staticPeers: staticPeers,
            quitC:       make(chan struct{}),
        }
        out.initStaticPeers()
        if len(conf.StaticPeers) > 0 {
            go out.monitorStaticPeers()
        }

        out.gater = connGtr
        return out, nil
    }

gossip下的区块传播

gossip在分布式系统中用于确保数据一致性,并修复由多播引起的问题。它是一种通信协议,其中信息从一个或多个节点发送到网络中的其他节点集,当网络中的一组客户端同时需要相同的数据时,这会很有用。当sequencer产生出unsafe状态的区块的时候,就是通过gossip网络传递给其他节点的。

首先让我们来看看节点是在哪里加入gossip网络的,
op-node/p2p/node.go中的init方法,在节点初始化的时候,调用JoinGossip方法加入了gossip网络

    func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
            …
            // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
            n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
            if err != nil {
                return fmt.Errorf("failed to start gossipsub router: %w", err)
            }
            n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
            …
    }

接下来来到op-node/p2p/gossip.go

以下是 JoinGossip 函数中执行的主要操作的简单概述:

  1. 验证器创建

    • val 被赋予 guardGossipValidator 函数调用的结果,目的是为八卦消息创建验证器,该验证器检查网络中传播的区块的有效性。
  2. 区块主题名称生成

    • 使用 blocksTopicV1 函数生成 blocksTopicName,该函数根据配置(cfg)中的 L2ChainID 格式化字符串。格式化的字符串遵循特定的结构:/optimism/{L2ChainID}/0/blocks
  3. 主题验证器注册

    • 调用 psRegisterTopicValidator 方法,以将 val 注册为区块主题的验证器。还指定了一些验证器的配置选项,例如3秒的超时和4的并发级别。
  4. 加入主题

    • 函数通过调用 ps.Join(blocksTopicName) 尝试加入区块八卦主题。如果出现错误,它将返回一个错误消息,指示无法加入主题。
  5. 事件处理器创建

    • 通过调用 blocksTopic.EventHandler() 尝试为区块主题创建事件处理器。如果出现错误,它将返回一个错误消息,指示无法创建处理器。
  6. 记录主题事件

    • 生成了一个新的goroutine来使用 LogTopicEvents 函数记录主题事件。
  7. 主题订阅

    • 函数通过调用 blocksTopic.Subscribe() 尝试订阅区块八卦主题。如果出现错误,它将返回一个错误消息,指示无法订阅。
  8. 订阅者创建

    • 使用 MakeSubscriber 函数创建了一个 subscriber,该函数封装了一个 BlocksHandler,该处理器处理来自 gossipInOnUnsafeL2Payload 事件。生成了一个新的goroutine来运行提供的 subscription
  9. 创建并返回发布者

    • 创建了一个 publisher 实例并返回,该实例配置为使用提供的配置和区块主题。
    func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
        val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
        blocksTopicName := blocksTopicV1(cfg) // return fmt.Sprintf("/optimism/%s/0/blocks", cfg.L2ChainID.String())
        err := ps.RegisterTopicValidator(blocksTopicName,
            val,
            pubsub.WithValidatorTimeout(3*time.Second),
            pubsub.WithValidatorConcurrency(4))
        if err != nil { 
            return nil, fmt.Errorf("failed to register blocks gossip topic: %w", err)
        }
        blocksTopic, err := ps.Join(blocksTopicName)
        if err != nil {
            return nil, fmt.Errorf("failed to join blocks gossip topic: %w", err)
        }
        blocksTopicEvents, err := blocksTopic.EventHandler()
        if err != nil {
            return nil, fmt.Errorf("failed to create blocks gossip topic handler: %w", err)
        }
        go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)

        subscription, err := blocksTopic.Subscribe()
        if err != nil {
            return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err)
        }

        subscriber := MakeSubscriber(log, BlocksHandler(gossipIn.OnUnsafeL2Payload))
        go subscriber(p2pCtx, subscription)

        return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil
    }

这样,一个非sequencer节点的订阅就已经建立了,接下来让我们把目光移到sequencer模式的节点当中,然后看看他是如果将区块广播出去的。

op-node/rollup/driver/state.go

在eventloop中通过循环来等待sequencer模式中新的payload的产生(unsafe区块),然后将这个payload通过PublishL2Payload传播到gossip网络中

func (s *Driver) eventLoop() {
    …
    for(){
        …
        select {
        case <-sequencerCh:
            payload, err := s.sequencer.RunNextSequencerAction(ctx)
            if err != nil {
                s.log.Error("Sequencer critical error", "err", err)
                return
            }
            if s.network != nil && payload != nil {
                // Publishing of unsafe data via p2p is optional.
                // Errors are not severe enough to change/halt sequencing but should be logged and metered.
                if err := s.network.PublishL2Payload(ctx, payload); err != nil {
                    s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
                    s.metrics.RecordPublishingError()
                }
            }
            planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
            …
            }
    …
    }
    …
}

这样,一个新的payload的就进入到gossip网络中了。

在libp2p的pubsub系统中,节点首先从其他节点接收消息,然后检查消息的有效性。如果消息有效并且符合节点的订阅标准,节点会考虑将其转发给其他节点。基于某些策略,如网络拓扑和节点的订阅情况,节点会决定是否将消息转发给其它节点。如果决定转发,节点会将消息发送给与其连接并订阅了相同主题的所有节点。在转发过程中,为防止消息在网络中无限循环,通常会有机制来跟踪已转发的消息,并确保不会多次转发同一消息。同时,消息可能具有“生存时间”(TTL)属性,定义了消息可以在网络中转发的次数或时间,每当消息被转发时,TTL值都会递减,直到消息不再被转发为止。在验证方面,消息通常会通过一些验证过程,例如检查消息的签名和格式,以确保消息的完整性和真实性。在libp2p的pubsub模型中,这个过程确保了消息能够广泛传播到网络中的许多节点,同时避免了无限循环和网络拥塞,实现了有效的消息传递和处理。

当存在缺失区块,通过p2p快速同步

当节点因为特殊情况,比如宕机后重新链接,可能会产生一些没有同步上的区块(gaps),当遇到这种情况时,可以通过p2p网络的反向链的方式快速同步。

我们来看一下op-node/rollup/driver/state.go中的checkForGapInUnsafeQueue函数

该代码段定义了一个名为 checkForGapInUnsafeQueue 的方法,属于 Driver 结构体。它的目的是检查一个名为 "unsafe queue" 的队列中是否存在数据缺口,并尝试通过一个名为 altSync 的备用同步方法来检索缺失的负载。这里的关键点是,该方法是为了确保数据的连续性,并在检测到数据缺失时尝试从其他同步方法中检索缺失的数据。以下是函数的主要步骤:

  1. 函数首先从 s.derivation 中获取 UnsafeL2HeadUnsafeL2SyncTarget 作为检查范围的起始和结束点。
  2. 函数检查在 startend 之间是否存在缺失的数据块,这是通过比较 endstartNumber 值来完成的。
  3. 如果检测到数据缺口,函数会通过调用 s.altSync.RequestL2Range(ctx, start, end) 来请求缺失的数据范围。如果 end 是一个空引用(即 eth.L2BlockRef{}),函数将请求一个开放结束范围的同步,从 start 开始。
  4. 在请求数据时,函数会记录一个调试日志,说明它正在请求哪个范围的数据。
  5. 函数最终返回一个错误值。如果没有错误,它会返回 nil
    // checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from an alt-sync method.
    // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
    // Results are received through OnUnsafeL2Payload.
    func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
        start := s.derivation.UnsafeL2Head()
        end := s.derivation.UnsafeL2SyncTarget()
        // Check if we have missing blocks between the start and end. Request them if we do.
        if end == (eth.L2BlockRef{}) {
            s.log.Debug("requesting sync with open-end range", "start", start)
            return s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{})
        } else if end.Number > start.Number+1 {
            s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end.Number-start.Number)
            return s.altSync.RequestL2Range(ctx, start, end)
        }
        return nil
    }

RequestL2Range函数向requests通道里传递请求区块的开始和结束信号。

然后通过onRangeRequest方法来对请求向peerRequests通道分发,peerRequests通道会被多个peer开启的loop所等待,即每一次分发都只有一个peer会去处理这个request。

    func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
            …
            for i := uint64(0); ; i++ {
            num := req.end.Number - 1 - i
            if num <= req.start {
                return
            }
            // check if we have something in quarantine already
            if h, ok := s.quarantineByNum[num]; ok {
                if s.trusted.Contains(h) { // if we trust it, try to promote it.
                    s.tryPromote(h)
                }
                // Don't fetch things that we have a candidate for already.
                // We'll evict it from quarantine by finding a conflict, or if we sync enough other blocks
                continue
            }

            if _, ok := s.inFlight[num]; ok {
                log.Debug("request still in-flight, not rescheduling sync request", "num", num)
                continue // request still in flight
            }
            pr := peerRequest{num: num, complete: new(atomic.Bool)}

            log.Debug("Scheduling P2P block request", "num", num)
            // schedule number
            select {
            case s.peerRequests <- pr:
                s.inFlight[num] = pr.complete
            case <-ctx.Done():
                log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err())
                return
            default: // peers may all be busy processing requests already
                log.Info("no peers ready to handle block requests for more P2P requests for L2 block history", "current", num)
                return
            }
        }
    }

接下来我们看看,当peer收到这个request的时候会怎么处理。

首先我们要知道的是,peer和请求节点之间的链接,或者消息传递是通过libp2p的stream来传递的。stream的处理方法由接收peer节点实现,stream的创建由发送节点来开启。

我们可以在之前的init函数中看到这样的代码,这里MakeStreamHandler返回了一个处理函数,SetStreamHandler将协议id和这个处理函数绑定,因此,每当发送节点创建并使用这个stream的时候,都会触发返回的处理函数。

    n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
    // register the sync protocol with libp2p host
    payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
    n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)

接下来让我们看看处理函数里面是如何处理的
函数首先进行全局和个人的速率限制检查,以控制处理请求的速度。然后,它读取并验证了请求的区块号,确保它在合理的范围内。之后,函数从 L2 层获取请求的区块负载,并将其写入到响应流中。在写入响应数据时,它设置了写入截止时间,以避免在写入过程中被慢速的 peer 连接阻塞。最终,函数返回请求的区块号和可能的错误。

    func (srv *ReqRespServer) handleSyncRequest(ctx context.Context, stream network.Stream) (uint64, error) {
        peerId := stream.Conn().RemotePeer()

        // take a token from the global rate-limiter,
        // to make sure there's not too much concurrent server work between different peers.
        if err := srv.globalRequestsRL.Wait(ctx); err != nil {
            return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
        }

        // find rate limiting data of peer, or add otherwise
        srv.peerStatsLock.Lock()
        ps, _ := srv.peerRateLimits.Get(peerId)
        if ps == nil {
            ps = &peerStat{
                Requests: rate.NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst),
            }
            srv.peerRateLimits.Add(peerId, ps)
            ps.Requests.Reserve() // count the hit, but make it delay the next request rather than immediately waiting
        } else {
            // Only wait if it's an existing peer, otherwise the instant rate-limit Wait call always errors.

            // If the requester thinks we're taking too long, then it's their problem and they can disconnect.
            // We'll disconnect ourselves only when failing to read/write,
            // if the work is invalid (range validation), or when individual sub tasks timeout.
            if err := ps.Requests.Wait(ctx); err != nil {
                return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
            }
        }
        srv.peerStatsLock.Unlock()

        // Set read deadline, if available
        _ = stream.SetReadDeadline(time.Now().Add(serverReadRequestTimeout))

        // Read the request
        var req uint64
        if err := binary.Read(stream, binary.LittleEndian, &req); err != nil {
            return 0, fmt.Errorf("failed to read requested block number: %w", err)
        }
        if err := stream.CloseRead(); err != nil {
            return req, fmt.Errorf("failed to close reading-side of a P2P sync request call: %w", err)
        }

        // Check the request is within the expected range of blocks
        if req < srv.cfg.Genesis.L2.Number {
            return req, fmt.Errorf("cannot serve request for L2 block %d before genesis %d: %w", req, srv.cfg.Genesis.L2.Number, invalidRequestErr)
        }
        max, err := srv.cfg.TargetBlockNumber(uint64(time.Now().Unix()))
        if err != nil {
            return req, fmt.Errorf("cannot determine max target block number to verify request: %w", invalidRequestErr)
        }
        if req > max {
            return req, fmt.Errorf("cannot serve request for L2 block %d after max expected block (%v): %w", req, max, invalidRequestErr)
        }

        payload, err := srv.l2.PayloadByNumber(ctx, req)
        if err != nil {
            if errors.Is(err, ethereum.NotFound) {
                return req, fmt.Errorf("peer requested unknown block by number: %w", err)
            } else {
                return req, fmt.Errorf("failed to retrieve payload to serve to peer: %w", err)
            }
        }

        // We set write deadline, if available, to safely write without blocking on a throttling peer connection
        _ = stream.SetWriteDeadline(time.Now().Add(serverWriteChunkTimeout))

        // 0 - resultCode: success = 0
        // 1:5 - version: 0
        var tmp [5]byte
        if _, err := stream.Write(tmp[:]); err != nil {
            return req, fmt.Errorf("failed to write response header data: %w", err)
        }
        w := snappy.NewBufferedWriter(stream)
        if _, err := payload.MarshalSSZ(w); err != nil {
            return req, fmt.Errorf("failed to write payload to sync response: %w", err)
        }
        if err := w.Close(); err != nil {
            return req, fmt.Errorf("failed to finishing writing payload to sync response: %w", err)
        }
        return req, nil
    }

至此,反向链同步请求和处理的大致流程已经讲解完毕

p2p节点中的积分声誉系统

为了防止某些节点进行恶意的请求与响应来破坏整个网络的安全性,optimism还使用了一套积分系统。

例如在op-node/p2p/app_scores.go 中存在一系列函数对peer的分数进行设置

    func (s *peerApplicationScorer) onValidResponse(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementValidResponses{Cap: s.params.ValidResponseCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }

    func (s *peerApplicationScorer) onResponseError(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementErrorResponses{Cap: s.params.ErrorResponseCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }

    func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }

然后在添加新的节点前会检查其积分情况

    func AddScoring(gater BlockingConnectionGater, scores Scores, minScore float64) *ScoringConnectionGater {
        return &ScoringConnectionGater{BlockingConnectionGater: gater, scores: scores, minScore: minScore}
    }

    func (g *ScoringConnectionGater) checkScore(p peer.ID) (allow bool) {
        score, err := g.scores.GetPeerScore(p)
        if err != nil {
            return false
        }
        return score >= g.minScore
    }

    func (g *ScoringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
        return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p)
    }

    func (g *ScoringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
        return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id)
    }

    func (g *ScoringConnectionGater) InterceptSecured(dir network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
        return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id)
    }

总结

libp2p的高度可配置性使得整个项目的p2p具有高度的可自定义化和模块话,以上是optimsim对libp2p进行个性化实现的主要逻辑,还有其他细节可以在p2p目录下通过阅读源码的方式来详细学习。

转载自:https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN