您正在查看: Surou 发布的文章

batcher工作原理

batcher工作原理

在这一章节中,我们将探讨到底什么是batcher ⚙️
官方specs中有batcher的介绍(source)

在进行之前,我们先提出几个问题,通过这两个问题来真正理解batcher的作用以及工作原理

  • batcher是什么?它为什么叫做batcher
  • batcher在代码中到底是怎么运行的?

前置知识

  • 在rollup机制中,要想做到的去中心化特性,例如抗审查等。我们必须要把layer2上发生的数据(transactions)全部发送到layer1当中。这样就可以在利用layer1的安全性的同时,又可以完全从layer1中构建出来整个layer2的数据,使得layer2才真正的具有有效性。
  • Epochs and the Sequencing Window:Epoch可以简单理解为L1新的一个区块(N+1)生成的这段时间。epoch的编号等于L1区块N的编号,在L1区块N -> N+1 这段时间内产生的所有L2区块都属于epoch N。在上个概念中我们提到必须上传L2的数据到L1中,那么我们应该在什么范围内上传数据才是有效的呢,Sequencing Window的size给了我们答案,即区块N/epoch N的相关数据,必须在L1的第N + size之前已经上传到L1了。
  • Batch/Batcher Transaction: Batch可以简单理解为每一个L2区块构建所需要的交易。Batcher Transaction为多个batch组合起来经过加工后发送到L1的那笔交易
  • Channe: channel可以简单理解为是batch的组合,组合是为了获得更好的压缩率,从而降低数据可用性成本,以使batcher上传的成本进一步降低。
  • Frame: frame可以理解为,有时候为了更好的压缩率,可能会导致channel数据过大而不能直接被batcher将整个channel发送给L1,因此需要对channel进行切割,分多次进行发送。

什么是batcher

在rollup中,需要一个角色来传递L2信息到L1当中,同时每当有新的交易就马上发送是昂贵且不方便管理的。这时候我们将需要制定一种合理的批量上传策略。因此,为了解决这个问题,batcher出现了。batcher是唯一存在(sequencer当前掌管私钥),且和特定地址发送Batcher Transaction来传递L2信息的组件。

batcher通过对unsafe区块数据进行收集,来获取多个batch,在这里每个区块都对应一个batch。当收集足够的batch进行高效压缩后生成channel,并以frame的形式发送到L1来完成L2的信息上传。

代码实现

在这部分我们会从代码层来进行深度的机制和实现原理的讲解

程序起点

op-batcher/batcher/driver.go

通过调用Start函数来启动loop循环,在loop的循环中,主要处理三件事

  • 当定时器触发时,将所有新的还未加载的L2block加载进来,然后触发publishStateToL1函数向L1进行state发布
  • 处理receipts,记录成功或者失败状态
  • 处理关闭请求
    func (l *BatchSubmitter) Start() error {
        l.log.Info("Starting Batch Submitter")

        l.mutex.Lock()
        defer l.mutex.Unlock()

        if l.running {
            return errors.New("batcher is already running")
        }
        l.running = true

        l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
        l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
        l.state.Clear()
        l.lastStoredBlock = eth.BlockID{}

        l.wg.Add(1)
        go l.loop()

        l.log.Info("Batch Submitter started")

        return nil
    }
    func (l *BatchSubmitter) loop() {
        defer l.wg.Done()

        ticker := time.NewTicker(l.PollInterval)
        defer ticker.Stop()

        receiptsCh := make(chan txmgr.TxReceipt[txData])
        queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)

        for {
            select {
            case <-ticker.C:
                if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
                    err := l.state.Close()
                    if err != nil {
                        l.log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
                    }
                    l.publishStateToL1(queue, receiptsCh, true)
                    l.state.Clear()
                    continue
                }
                l.publishStateToL1(queue, receiptsCh, false)
            case r := <-receiptsCh:
                l.handleReceipt(r)
            case <-l.shutdownCtx.Done():
                err := l.state.Close()
                if err != nil {
                    l.log.Error("error closing the channel manager", "err", err)
                }
                l.publishStateToL1(queue, receiptsCh, true)
                return
            }
        }
    }

加载最新区块数据

op-batcher/batcher/driver.go

loadBlocksIntoState函数调用calculateL2BlockRangeToStore来获取自上次发送batch transaction而派生的最新safeblock后新生成的unsafeblock范围。然后循环将这个范围中的每一个unsafe块调用loadBlockIntoState函数从L2里获取并通过AddL2Block函数加载到内部的block队列里。等待进一步处理。

    func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
        start, end, err := l.calculateL2BlockRangeToStore(ctx)
        ……
        var latestBlock *types.Block
        // Add all blocks to "state"
        for i := start.Number + 1; i < end.Number+1; i++ {
            block, err := l.loadBlockIntoState(ctx, i)
            if errors.Is(err, ErrReorg) {
                l.log.Warn("Found L2 reorg", "block_number", i)
                l.lastStoredBlock = eth.BlockID{}
                return err
            } else if err != nil {
                l.log.Warn("failed to load block into state", "err", err)
                return err
            }
            l.lastStoredBlock = eth.ToBlockID(block)
            latestBlock = block
        }
        ……
    }
    func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
        ……
        block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
        ……
        if err := l.state.AddL2Block(block); err != nil {
            return nil, fmt.Errorf("adding L2 block to state: %w", err)
        }
        ……
        return block, nil
    }

将加载的block数据处理,并发送到layer1

op-batcher/batcher/driver.go

publishTxToL1函数使用TxData函数对之前加载到数据进行处理,并调用sendTransaction函数发送到L1

    func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
        // send all available transactions
        l1tip, err := l.l1Tip(ctx)
        if err != nil {
            l.log.Error("Failed to query L1 tip", "error", err)
            return err
        }
        l.recordL1Tip(l1tip)

        // Collect next transaction data
        txdata, err := l.state.TxData(l1tip.ID())
        if err == io.EOF {
            l.log.Trace("no transaction data available")
            return err
        } else if err != nil {
            l.log.Error("unable to get tx data", "err", err)
            return err
        }

        l.sendTransaction(txdata, queue, receiptsCh)
        return nil
    }

TxData详解

op-batcher/batcher/channel_manager.go

TxData函数主要负责两件事务

  • 查找第一个含有frame的的channel,如果存在且通过检查后使用nextTxData获取数据并返回
  • 如果没有这样的channel,我们需要现调用ensureChannelWithSpace检查channel还有剩余的空间,再使用processBlocks将之前加载到block队列中的数据构造到 outchannel的composer当中压缩
  • outputFramesoutchannel composer当中的数据切割成适合大小的frame
  • 最后再把刚构造到数据通过nextTxData函数返回出去。

EnsureChannelWithSpace 确保 currentChannel 填充有可容纳更多数据的空间的channel(即,channel.IsFull 返回 false)。 如果 currentChannel 为零或已满,则会创建一个新channel

    func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
        s.mu.Lock()
        defer s.mu.Unlock()
        var firstWithFrame *channel
        for _, ch := range s.channelQueue {
            if ch.HasFrame() {
                firstWithFrame = ch
                break
            }
        }

        dataPending := firstWithFrame != nil && firstWithFrame.HasFrame()
        s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))

        // Short circuit if there is a pending frame or the channel manager is closed.
        if dataPending || s.closed {
            return s.nextTxData(firstWithFrame)
        }

        // No pending frame, so we have to add new blocks to the channel

        // If we have no saved blocks, we will not be able to create valid frames
        if len(s.blocks) == 0 {
            return txData{}, io.EOF
        }

        if err := s.ensureChannelWithSpace(l1Head); err != nil {
            return txData{}, err
        }

        if err := s.processBlocks(); err != nil {
            return txData{}, err
        }

        // Register current L1 head only after all pending blocks have been
        // processed. Even if a timeout will be triggered now, it is better to have
        // all pending blocks be included in this channel for submission.
        s.registerL1Block(l1Head)

        if err := s.outputFrames(); err != nil {
            return txData{}, err
        }

        return s.nextTxData(s.currentChannel)
    }

processBlocks函数在内部通过AddBlockblock队列里的block加入到当前的channel当中

    func (s *channelManager) processBlocks() error {
        var (
            blocksAdded int
            _chFullErr  *ChannelFullError // throw away, just for type checking
            latestL2ref eth.L2BlockRef
        )
        for i, block := range s.blocks {
            l1info, err := s.currentChannel.AddBlock(block)
            if errors.As(err, &_chFullErr) {
                // current block didn't get added because channel is already full
                break
            } else if err != nil {
                return fmt.Errorf("adding block[%d] to channel builder: %w", i, err)
            }
            s.log.Debug("Added block to channel", "channel", s.currentChannel.ID(), "block", block)

            blocksAdded += 1
            latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
            s.metr.RecordL2BlockInChannel(block)
            // current block got added but channel is now full
            if s.currentChannel.IsFull() {
                break
            }
        }

AddBlock 首先通过BlockToBatchbatchblcok中获取出来,再通过AddBatch函数对数据进行压缩并存储。

    func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error) {
        if c.IsFull() {
            return derive.L1BlockInfo{}, c.FullErr()
        }

        batch, l1info, err := derive.BlockToBatch(block)
        if err != nil {
            return l1info, fmt.Errorf("converting block to batch: %w", err)
        }

        if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
            c.setFullErr(err)
            return l1info, c.FullErr()
        } else if err != nil {
            return l1info, fmt.Errorf("adding block to channel out: %w", err)
        }
        c.blocks = append(c.blocks, block)
        c.updateSwTimeout(batch)

        if err = c.co.FullErr(); err != nil {
            c.setFullErr(err)
            // Adding this block still worked, so don't return error, just mark as full
        }

        return l1info, nil
    }

txdata获取后,使用sendTransaction将整个数据发送到L1当中。

总结

在这一章节中,我们了解了什么是batcher并且了解了batcher的运行原理,你可以在这个 address中查看当前batcher的行为。

转载自:https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN

optimism中的libp2p应用

optimism中的libp2p应用

在本节中,主要用于讲解optimism是如何使用libp2p来完成op-node中的p2p网络建立的。
p2p网络主要是用于在不同的node中传递信息,例如sequencer完成unsafe的区块构建后,通过p2p的gossiphub的pub/sub进行传播。libp2p还处理了其他,例如网络,寻址等在p2p网络中的基础件层。

了解libp2p

libp2p(简称来自“库对等”或“library peer-to-peer”)是一个面向对等(P2P)网络的框架,能够帮助开发P2P应用程序。它包含了一套协议、规范和库,使网络参与者(也称为“对等体”或“peers”)之间的P2P通信变得更为简便 (source)。libp2p最初是作为IPFS(InterPlanetary File System,星际文件系统)项目的一部分,后来它演变成了一个独立的项目,成为了分布式网络的模块化网络堆栈 (source)。

libp2p是IPFS社区的一个开源项目,欢迎广泛社区的贡献,包括帮助编写规范、编码实现以及创建示例和教程 (source)。libp2p是由多个构建模块组成的,每个模块都有非常明确、有文档记录且经过测试的接口,使得它们可组合、可替换,因此可升级 (source)。libp2p的模块化特性使得开发人员可以选择并使用仅对他们的应用程序必要的组件,从而在构建P2P网络应用程序时促进了灵活性和效率。

相关资源

libp2p的模块化架构和开源特性为开发强大、可扩展和灵活的P2P应用程序提供了良好的环境,使其成为分布式网络和网络应用程序开发领域的重要参与者。

libp2p实现方式

在使用libp2p时,你会需要实现和配置一些核心组件以构建你的P2P网络。以下是libp2p在应用中的一些主要实现方面:

1. 节点创建与配置:

  • 创建和配置libp2p节点是最基本的步骤,这包括设置节点的网络地址、身份和其他基本参数。
    关键使用代码:
    libp2p.New()

2. 传输协议:

  • 选择和配置你的传输协议(例如TCP、WebSockets等)以确保节点之间的通信。
    关键使用代码:
    tcpTransport := tcp.NewTCPTransport()

3. 多路复用和流控制:

  • 实现多路复用来允许在单一的连接上处理多个并发的数据流。
  • 实现流量控制来管理数据的传输速率和处理速率。
    关键使用代码:
    yamuxTransport := yamux.New()

4. 安全和加密:

  • 配置安全传输层以确保通信的安全性和隐私。
  • 实现加密和身份验证机制以保护数据和验证通信方。
    关键使用代码:
    tlsTransport := tls.New()

5. 协议和消息处理:

  • 定义和实现自定义协议来处理特定的网络操作和消息交换。
  • 处理接收到的消息并根据需要发送响应。
    关键使用代码:
    host.SetStreamHandler("/my-protocol/1.0.0", myProtocolHandler)

6. 发现和路由:

  • 实现节点发现机制来找到网络中的其他节点。
  • 实现路由逻辑以确定如何将消息路由到网络中的正确节点。
    关键使用代码:
    dht := kaddht.NewDHT(ctx, host, datastore.NewMapDatastore())

7. 网络行为和策略:

  • 定义和实现网络的行为和策略,例如连接管理、错误处理和重试逻辑。
    关键使用代码:
    connManager := connmgr.NewConnManager(lowWater, highWater, gracePeriod)

8. 状态管理和存储:

  • 管理节点和网络的状态,包括连接状态、节点列表和数据存储。
    关键使用代码:
    peerstore := pstoremem.NewPeerstore()

9. 测试和调试:

  • 为你的libp2p应用编写测试以确保其正确性和可靠性。
  • 使用调试工具和日志来诊断和解决网络问题。
    关键使用代码:
    logging.SetLogLevel("libp2p", "DEBUG")

10. 文档和社区支持:

- 查阅libp2p的文档以了解其各种组件和API。
- 与libp2p社区交流以获取支持和解决问题。

}

以上是使用libp2p时需要考虑和实现的一些主要方面。每个项目的具体实现可能会有所不同,但这些基本方面是构建和运行libp2p应用所必需的。在实现这些功能时,可以参考libp2p的官方文档GitHub仓库中的示例代码和教程。

在OP-node中libp2p的使用

为了弄清楚op-node和libp2p的关系,我们必须弄清楚几个问题

- 为什么选择libp2p?为什么不选择devp2p(geth使用devp2p)
- OP-node有哪些数据或者流程和p2p网络紧密相关
- 这些功能是如何在代码层实现的 

op-node需要libp2p网络的原因

首先我们要了解为什么optimism需要p2p网络
libp2p是一个模块化的网络协议,允许开发人员构建去中心化的点对点应用,适用于多种用例 (source)(source)。而devp2p主要用于以太坊生态系统,专为以太坊应用定制 (source)。libp2p的灵活性和广泛适用性可能使其成为开发人员的首选。

op-node主要使用libp2p的功能点

- 用于sequencer将产生的unsafe的block传递到其他非sequencer节点
- 用于非sequencer模式下的其他节点当出现gap时进行快速同步(反向链同步)
- 用于采用积分声誉系统来规范整体节点的良好环境

代码实现

host自定义初始化

host可以理解为是p2p的节点,当开启这个节点的时候,需要针对自己的项目进行一些特殊的初始化配置

现在让我们看一下 op-node/p2p/host.go文件中的Host方法,

该函数主要用于设置 libp2p 主机并进行各种配置。以下是该函数的关键部分以及各部分的简单中文描述:

  1. 检查是否禁用 P2P
    如果 P2P 被禁用,函数会直接返回。

  2. 从公钥获取 Peer ID
    使用配置中的公钥来生成 Peer ID。

  3. 初始化 Peerstore
    创建一个基础的 Peerstore 存储。

  4. 初始化扩展 Peerstore
    在基础 Peerstore 的基础上,创建一个扩展的 Peerstore。

  5. 将私钥和公钥添加到 Peerstore
    在 Peerstore 中存储 Peer 的私钥和公钥。

  6. 初始化连接控制器(Connection Gater)
    用于控制网络连接。

  7. 初始化连接管理器(Connection Manager)
    用于管理网络连接。

  8. 设置传输和监听地址
    设置网络传输协议和主机的监听地址。

  9. 创建 libp2p 主机
    使用前面的所有设置来创建一个新的 libp2p 主机。

  10. 初始化静态 Peer
    如果有配置静态 Peer,进行初始化。

  11. 返回主机
    最后,函数返回创建好的 libp2p 主机。

这些关键部分负责 libp2p 主机的初始化和设置,每个部分都负责主机配置的一个特定方面。

    func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
        if conf.DisableP2P {
            return nil, nil
        }
        pub := conf.Priv.GetPublic()
        pid, err := peer.IDFromPublicKey(pub)
        if err != nil {
            return nil, fmt.Errorf("failed to derive pubkey from network priv key: %w", err)
        }

        basePs, err := pstoreds.NewPeerstore(context.Background(), conf.Store, pstoreds.DefaultOpts())
        if err != nil {
            return nil, fmt.Errorf("failed to open peerstore: %w", err)
        }

        peerScoreParams := conf.PeerScoringParams()
        var scoreRetention time.Duration
        if peerScoreParams != nil {
            // Use the same retention period as gossip will if available
            scoreRetention = peerScoreParams.PeerScoring.RetainScore
        } else {
            // Disable score GC if peer scoring is disabled
            scoreRetention = 0
        }
        ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, scoreRetention)
        if err != nil {
            return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
        }

        if err := ps.AddPrivKey(pid, conf.Priv); err != nil {
            return nil, fmt.Errorf("failed to set up peerstore with priv key: %w", err)
        }
        if err := ps.AddPubKey(pid, pub); err != nil {
            return nil, fmt.Errorf("failed to set up peerstore with pub key: %w", err)
        }

        var connGtr gating.BlockingConnectionGater
        connGtr, err = gating.NewBlockingConnectionGater(conf.Store)
        if err != nil {
            return nil, fmt.Errorf("failed to open connection gater: %w", err)
        }
        connGtr = gating.AddBanExpiry(connGtr, ps, log, clock.SystemClock, metrics)
        connGtr = gating.AddMetering(connGtr, metrics)

        connMngr, err := DefaultConnManager(conf)
        if err != nil {
            return nil, fmt.Errorf("failed to open connection manager: %w", err)
        }

        listenAddr, err := addrFromIPAndPort(conf.ListenIP, conf.ListenTCPPort)
        if err != nil {
            return nil, fmt.Errorf("failed to make listen addr: %w", err)
        }
        tcpTransport := libp2p.Transport(
            tcp.NewTCPTransport,
            tcp.WithConnectionTimeout(time.Minute*60)) // break unused connections
        // TODO: technically we can also run the node on websocket and QUIC transports. Maybe in the future?

        var nat lconf.NATManagerC // disabled if nil
        if conf.NAT {
            nat = basichost.NewNATManager
        }

        opts := []libp2p.Option{
            libp2p.Identity(conf.Priv),
            // Explicitly set the user-agent, so we can differentiate from other Go libp2p users.
            libp2p.UserAgent(conf.UserAgent),
            tcpTransport,
            libp2p.WithDialTimeout(conf.TimeoutDial),
            // No relay services, direct connections between peers only.
            libp2p.DisableRelay(),
            // host will start and listen to network directly after construction from config.
            libp2p.ListenAddrs(listenAddr),
            libp2p.ConnectionGater(connGtr),
            libp2p.ConnectionManager(connMngr),
            //libp2p.ResourceManager(nil), // TODO use resource manager interface to manage resources per peer better.
            libp2p.NATManager(nat),
            libp2p.Peerstore(ps),
            libp2p.BandwidthReporter(reporter), // may be nil if disabled
            libp2p.MultiaddrResolver(madns.DefaultResolver),
            // Ping is a small built-in libp2p protocol that helps us check/debug latency between peers.
            libp2p.Ping(true),
            // Help peers with their NAT reachability status, but throttle to avoid too much work.
            libp2p.EnableNATService(),
            libp2p.AutoNATServiceRateLimit(10, 5, time.Second*60),
        }
        opts = append(opts, conf.HostMux...)
        if conf.NoTransportSecurity {
            opts = append(opts, libp2p.Security(insecure.ID, insecure.NewWithIdentity))
        } else {
            opts = append(opts, conf.HostSecurity...)
        }
        h, err := libp2p.New(opts...)
        if err != nil {
            return nil, err
        }

        staticPeers := make([]*peer.AddrInfo, len(conf.StaticPeers))
        for i, peerAddr := range conf.StaticPeers {
            addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
            if err != nil {
                return nil, fmt.Errorf("bad peer address: %w", err)
            }
            staticPeers[i] = addr
        }

        out := &extraHost{
            Host:        h,
            connMgr:     connMngr,
            log:         log,
            staticPeers: staticPeers,
            quitC:       make(chan struct{}),
        }
        out.initStaticPeers()
        if len(conf.StaticPeers) > 0 {
            go out.monitorStaticPeers()
        }

        out.gater = connGtr
        return out, nil
    }

gossip下的区块传播

gossip在分布式系统中用于确保数据一致性,并修复由多播引起的问题。它是一种通信协议,其中信息从一个或多个节点发送到网络中的其他节点集,当网络中的一组客户端同时需要相同的数据时,这会很有用。当sequencer产生出unsafe状态的区块的时候,就是通过gossip网络传递给其他节点的。

首先让我们来看看节点是在哪里加入gossip网络的,
op-node/p2p/node.go中的init方法,在节点初始化的时候,调用JoinGossip方法加入了gossip网络

    func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
            …
            // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
            n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
            if err != nil {
                return fmt.Errorf("failed to start gossipsub router: %w", err)
            }
            n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
            …
    }

接下来来到op-node/p2p/gossip.go

以下是 JoinGossip 函数中执行的主要操作的简单概述:

  1. 验证器创建

    • val 被赋予 guardGossipValidator 函数调用的结果,目的是为八卦消息创建验证器,该验证器检查网络中传播的区块的有效性。
  2. 区块主题名称生成

    • 使用 blocksTopicV1 函数生成 blocksTopicName,该函数根据配置(cfg)中的 L2ChainID 格式化字符串。格式化的字符串遵循特定的结构:/optimism/{L2ChainID}/0/blocks
  3. 主题验证器注册

    • 调用 psRegisterTopicValidator 方法,以将 val 注册为区块主题的验证器。还指定了一些验证器的配置选项,例如3秒的超时和4的并发级别。
  4. 加入主题

    • 函数通过调用 ps.Join(blocksTopicName) 尝试加入区块八卦主题。如果出现错误,它将返回一个错误消息,指示无法加入主题。
  5. 事件处理器创建

    • 通过调用 blocksTopic.EventHandler() 尝试为区块主题创建事件处理器。如果出现错误,它将返回一个错误消息,指示无法创建处理器。
  6. 记录主题事件

    • 生成了一个新的goroutine来使用 LogTopicEvents 函数记录主题事件。
  7. 主题订阅

    • 函数通过调用 blocksTopic.Subscribe() 尝试订阅区块八卦主题。如果出现错误,它将返回一个错误消息,指示无法订阅。
  8. 订阅者创建

    • 使用 MakeSubscriber 函数创建了一个 subscriber,该函数封装了一个 BlocksHandler,该处理器处理来自 gossipInOnUnsafeL2Payload 事件。生成了一个新的goroutine来运行提供的 subscription
  9. 创建并返回发布者

    • 创建了一个 publisher 实例并返回,该实例配置为使用提供的配置和区块主题。
    func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
        val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
        blocksTopicName := blocksTopicV1(cfg) // return fmt.Sprintf("/optimism/%s/0/blocks", cfg.L2ChainID.String())
        err := ps.RegisterTopicValidator(blocksTopicName,
            val,
            pubsub.WithValidatorTimeout(3*time.Second),
            pubsub.WithValidatorConcurrency(4))
        if err != nil { 
            return nil, fmt.Errorf("failed to register blocks gossip topic: %w", err)
        }
        blocksTopic, err := ps.Join(blocksTopicName)
        if err != nil {
            return nil, fmt.Errorf("failed to join blocks gossip topic: %w", err)
        }
        blocksTopicEvents, err := blocksTopic.EventHandler()
        if err != nil {
            return nil, fmt.Errorf("failed to create blocks gossip topic handler: %w", err)
        }
        go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)

        subscription, err := blocksTopic.Subscribe()
        if err != nil {
            return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err)
        }

        subscriber := MakeSubscriber(log, BlocksHandler(gossipIn.OnUnsafeL2Payload))
        go subscriber(p2pCtx, subscription)

        return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil
    }

这样,一个非sequencer节点的订阅就已经建立了,接下来让我们把目光移到sequencer模式的节点当中,然后看看他是如果将区块广播出去的。

op-node/rollup/driver/state.go

在eventloop中通过循环来等待sequencer模式中新的payload的产生(unsafe区块),然后将这个payload通过PublishL2Payload传播到gossip网络中

func (s *Driver) eventLoop() {
    …
    for(){
        …
        select {
        case <-sequencerCh:
            payload, err := s.sequencer.RunNextSequencerAction(ctx)
            if err != nil {
                s.log.Error("Sequencer critical error", "err", err)
                return
            }
            if s.network != nil && payload != nil {
                // Publishing of unsafe data via p2p is optional.
                // Errors are not severe enough to change/halt sequencing but should be logged and metered.
                if err := s.network.PublishL2Payload(ctx, payload); err != nil {
                    s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
                    s.metrics.RecordPublishingError()
                }
            }
            planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
            …
            }
    …
    }
    …
}

这样,一个新的payload的就进入到gossip网络中了。

在libp2p的pubsub系统中,节点首先从其他节点接收消息,然后检查消息的有效性。如果消息有效并且符合节点的订阅标准,节点会考虑将其转发给其他节点。基于某些策略,如网络拓扑和节点的订阅情况,节点会决定是否将消息转发给其它节点。如果决定转发,节点会将消息发送给与其连接并订阅了相同主题的所有节点。在转发过程中,为防止消息在网络中无限循环,通常会有机制来跟踪已转发的消息,并确保不会多次转发同一消息。同时,消息可能具有“生存时间”(TTL)属性,定义了消息可以在网络中转发的次数或时间,每当消息被转发时,TTL值都会递减,直到消息不再被转发为止。在验证方面,消息通常会通过一些验证过程,例如检查消息的签名和格式,以确保消息的完整性和真实性。在libp2p的pubsub模型中,这个过程确保了消息能够广泛传播到网络中的许多节点,同时避免了无限循环和网络拥塞,实现了有效的消息传递和处理。

当存在缺失区块,通过p2p快速同步

当节点因为特殊情况,比如宕机后重新链接,可能会产生一些没有同步上的区块(gaps),当遇到这种情况时,可以通过p2p网络的反向链的方式快速同步。

我们来看一下op-node/rollup/driver/state.go中的checkForGapInUnsafeQueue函数

该代码段定义了一个名为 checkForGapInUnsafeQueue 的方法,属于 Driver 结构体。它的目的是检查一个名为 "unsafe queue" 的队列中是否存在数据缺口,并尝试通过一个名为 altSync 的备用同步方法来检索缺失的负载。这里的关键点是,该方法是为了确保数据的连续性,并在检测到数据缺失时尝试从其他同步方法中检索缺失的数据。以下是函数的主要步骤:

  1. 函数首先从 s.derivation 中获取 UnsafeL2HeadUnsafeL2SyncTarget 作为检查范围的起始和结束点。
  2. 函数检查在 startend 之间是否存在缺失的数据块,这是通过比较 endstartNumber 值来完成的。
  3. 如果检测到数据缺口,函数会通过调用 s.altSync.RequestL2Range(ctx, start, end) 来请求缺失的数据范围。如果 end 是一个空引用(即 eth.L2BlockRef{}),函数将请求一个开放结束范围的同步,从 start 开始。
  4. 在请求数据时,函数会记录一个调试日志,说明它正在请求哪个范围的数据。
  5. 函数最终返回一个错误值。如果没有错误,它会返回 nil
    // checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from an alt-sync method.
    // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
    // Results are received through OnUnsafeL2Payload.
    func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
        start := s.derivation.UnsafeL2Head()
        end := s.derivation.UnsafeL2SyncTarget()
        // Check if we have missing blocks between the start and end. Request them if we do.
        if end == (eth.L2BlockRef{}) {
            s.log.Debug("requesting sync with open-end range", "start", start)
            return s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{})
        } else if end.Number > start.Number+1 {
            s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end.Number-start.Number)
            return s.altSync.RequestL2Range(ctx, start, end)
        }
        return nil
    }

RequestL2Range函数向requests通道里传递请求区块的开始和结束信号。

然后通过onRangeRequest方法来对请求向peerRequests通道分发,peerRequests通道会被多个peer开启的loop所等待,即每一次分发都只有一个peer会去处理这个request。

    func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
            …
            for i := uint64(0); ; i++ {
            num := req.end.Number - 1 - i
            if num <= req.start {
                return
            }
            // check if we have something in quarantine already
            if h, ok := s.quarantineByNum[num]; ok {
                if s.trusted.Contains(h) { // if we trust it, try to promote it.
                    s.tryPromote(h)
                }
                // Don't fetch things that we have a candidate for already.
                // We'll evict it from quarantine by finding a conflict, or if we sync enough other blocks
                continue
            }

            if _, ok := s.inFlight[num]; ok {
                log.Debug("request still in-flight, not rescheduling sync request", "num", num)
                continue // request still in flight
            }
            pr := peerRequest{num: num, complete: new(atomic.Bool)}

            log.Debug("Scheduling P2P block request", "num", num)
            // schedule number
            select {
            case s.peerRequests <- pr:
                s.inFlight[num] = pr.complete
            case <-ctx.Done():
                log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err())
                return
            default: // peers may all be busy processing requests already
                log.Info("no peers ready to handle block requests for more P2P requests for L2 block history", "current", num)
                return
            }
        }
    }

接下来我们看看,当peer收到这个request的时候会怎么处理。

首先我们要知道的是,peer和请求节点之间的链接,或者消息传递是通过libp2p的stream来传递的。stream的处理方法由接收peer节点实现,stream的创建由发送节点来开启。

我们可以在之前的init函数中看到这样的代码,这里MakeStreamHandler返回了一个处理函数,SetStreamHandler将协议id和这个处理函数绑定,因此,每当发送节点创建并使用这个stream的时候,都会触发返回的处理函数。

    n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
    // register the sync protocol with libp2p host
    payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
    n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)

接下来让我们看看处理函数里面是如何处理的
函数首先进行全局和个人的速率限制检查,以控制处理请求的速度。然后,它读取并验证了请求的区块号,确保它在合理的范围内。之后,函数从 L2 层获取请求的区块负载,并将其写入到响应流中。在写入响应数据时,它设置了写入截止时间,以避免在写入过程中被慢速的 peer 连接阻塞。最终,函数返回请求的区块号和可能的错误。

    func (srv *ReqRespServer) handleSyncRequest(ctx context.Context, stream network.Stream) (uint64, error) {
        peerId := stream.Conn().RemotePeer()

        // take a token from the global rate-limiter,
        // to make sure there's not too much concurrent server work between different peers.
        if err := srv.globalRequestsRL.Wait(ctx); err != nil {
            return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
        }

        // find rate limiting data of peer, or add otherwise
        srv.peerStatsLock.Lock()
        ps, _ := srv.peerRateLimits.Get(peerId)
        if ps == nil {
            ps = &peerStat{
                Requests: rate.NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst),
            }
            srv.peerRateLimits.Add(peerId, ps)
            ps.Requests.Reserve() // count the hit, but make it delay the next request rather than immediately waiting
        } else {
            // Only wait if it's an existing peer, otherwise the instant rate-limit Wait call always errors.

            // If the requester thinks we're taking too long, then it's their problem and they can disconnect.
            // We'll disconnect ourselves only when failing to read/write,
            // if the work is invalid (range validation), or when individual sub tasks timeout.
            if err := ps.Requests.Wait(ctx); err != nil {
                return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
            }
        }
        srv.peerStatsLock.Unlock()

        // Set read deadline, if available
        _ = stream.SetReadDeadline(time.Now().Add(serverReadRequestTimeout))

        // Read the request
        var req uint64
        if err := binary.Read(stream, binary.LittleEndian, &req); err != nil {
            return 0, fmt.Errorf("failed to read requested block number: %w", err)
        }
        if err := stream.CloseRead(); err != nil {
            return req, fmt.Errorf("failed to close reading-side of a P2P sync request call: %w", err)
        }

        // Check the request is within the expected range of blocks
        if req < srv.cfg.Genesis.L2.Number {
            return req, fmt.Errorf("cannot serve request for L2 block %d before genesis %d: %w", req, srv.cfg.Genesis.L2.Number, invalidRequestErr)
        }
        max, err := srv.cfg.TargetBlockNumber(uint64(time.Now().Unix()))
        if err != nil {
            return req, fmt.Errorf("cannot determine max target block number to verify request: %w", invalidRequestErr)
        }
        if req > max {
            return req, fmt.Errorf("cannot serve request for L2 block %d after max expected block (%v): %w", req, max, invalidRequestErr)
        }

        payload, err := srv.l2.PayloadByNumber(ctx, req)
        if err != nil {
            if errors.Is(err, ethereum.NotFound) {
                return req, fmt.Errorf("peer requested unknown block by number: %w", err)
            } else {
                return req, fmt.Errorf("failed to retrieve payload to serve to peer: %w", err)
            }
        }

        // We set write deadline, if available, to safely write without blocking on a throttling peer connection
        _ = stream.SetWriteDeadline(time.Now().Add(serverWriteChunkTimeout))

        // 0 - resultCode: success = 0
        // 1:5 - version: 0
        var tmp [5]byte
        if _, err := stream.Write(tmp[:]); err != nil {
            return req, fmt.Errorf("failed to write response header data: %w", err)
        }
        w := snappy.NewBufferedWriter(stream)
        if _, err := payload.MarshalSSZ(w); err != nil {
            return req, fmt.Errorf("failed to write payload to sync response: %w", err)
        }
        if err := w.Close(); err != nil {
            return req, fmt.Errorf("failed to finishing writing payload to sync response: %w", err)
        }
        return req, nil
    }

至此,反向链同步请求和处理的大致流程已经讲解完毕

p2p节点中的积分声誉系统

为了防止某些节点进行恶意的请求与响应来破坏整个网络的安全性,optimism还使用了一套积分系统。

例如在op-node/p2p/app_scores.go 中存在一系列函数对peer的分数进行设置

    func (s *peerApplicationScorer) onValidResponse(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementValidResponses{Cap: s.params.ValidResponseCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }

    func (s *peerApplicationScorer) onResponseError(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementErrorResponses{Cap: s.params.ErrorResponseCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }

    func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }

然后在添加新的节点前会检查其积分情况

    func AddScoring(gater BlockingConnectionGater, scores Scores, minScore float64) *ScoringConnectionGater {
        return &ScoringConnectionGater{BlockingConnectionGater: gater, scores: scores, minScore: minScore}
    }

    func (g *ScoringConnectionGater) checkScore(p peer.ID) (allow bool) {
        score, err := g.scores.GetPeerScore(p)
        if err != nil {
            return false
        }
        return score >= g.minScore
    }

    func (g *ScoringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
        return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p)
    }

    func (g *ScoringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
        return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id)
    }

    func (g *ScoringConnectionGater) InterceptSecured(dir network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
        return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id)
    }

总结

libp2p的高度可配置性使得整个项目的p2p具有高度的可自定义化和模块话,以上是optimsim对libp2p进行个性化实现的主要逻辑,还有其他细节可以在p2p目录下通过阅读源码的方式来详细学习。

转载自:https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN

optimism中区块的传递

optimism中区块的传递

区块的传递是整个optimism rollup系统中较为重要的概念,在这一章节,我们将从介绍optimism中多种sync方式的原理,来揭开整个系统里区块的传递过程。

区块类型

在进行进一步深入前,让我们了解一些基本的概念。

  • Unsafe L2 Block (不安全的 L2 区块):

    • 这是指 L1 链上最高的 L2 区块,其 L1 起源是规范 L1 链的 可能 扩展(如 op-node 所知)。这意味着,尽管该区块链接到 L1 链,但其完整性和正确性尚未得到充分验证。
  • Safe L2 Block (安全的 L2 区块):

    • 这是指 L1 链上最高的 L2 区块,其 epoch 的序列窗口在规范的 L1 链中是完整的(如 op-node 所知)。这意味着该区块的所有前提条件都已在 L1 链上得到验证,因此它被认为是安全的。
  • Finalized L2 Block (定稿的 L2 区块):

    • 这是指已知完全源自定稿 L1 区块数据的 L2 区块。这意味着该区块不仅安全,而且已根据 L1 链的数据完全确认,不会再发生更改。

sync类型

  1. op-node p2p gossip 同步

    • op-node 通过 p2p gossip 协议接收最新的不安全区块,由 sequencer 推送的。
  2. op-node 基于libp2p的请求-响应的逆向区块头同步

    • 通过此同步方式,op-node 可以填补不安全区块的任何缺口。
  3. 执行层(EL,又名 engine sync)同步

    • 在 op-node 中有两个标志,允许来自 gossip 的不安全区块触发引擎中向这些区块的长范围同步。相关的标志是 --l2.engine-sync--l2.skip-sync-start-check(用于处理非常旧的安全区块)。然后,如果为此设置了 EL,它可以执行任何同步,例如 snap-sync(需要 op-geth p2p 连接等,并且需要从某些节点进行同步)。
  4. op-node RPC 同步

    • 这是一种基于可信 RPC 方法的同步,当 L1 出现问题时,这种同步方式相对简单。

op-node p2p gossip 同步

这种同步的场景处于:当l2的块新产生的时候,即在上一节我们讨论的sequencer模式下是如何产生新的区块的。

当产生新的区块后,sequencer通过基于libp2p的P2P网络的pub/sub(广播/订阅)模块,向’新unsafe区块‘ topic 发出广播。所有订阅了此topic的节点都会直接或间接的收到这一广播消息。详情可以查看

op-node 基于libp2p的请求-响应的逆向区块头同步

这种同步的场景处于:当节点因为特殊情况,比如宕机后重新链接,可能会产生一些没有同步上的区块(gaps)

当这种情况出现的时候,可以通过p2p网络的反向链的方式快速同步,即通过使用libp2p原生的stream流来和其他p2p节点建立链接,同时发送同步请求。详情可以查看

执行层(EL,又名 engine sync)同步

这种同步的场景处于:当有较多区块,一个大范围区块需要同步的时候,从l1慢慢派生比较慢,想要快速同步。

使用--l2.engine-sync--l2.skip-sync-start-check去启动op-node,发送的payload来达到发送长范围同步请求的目的。

代码层讲解

首先我们来看一下这两个标志的定义

op-node/flags/flags.go 中定义并解释了这两个flag的作用

  • L2EngineSyncEnabled Flag (l2.engine-sync):

    • 该标志用于启用或禁用执行引擎的 P2P 同步功能。当设置为 true 时,它允许执行引擎通过 P2P 网络与其他节点同步区块数据。它的默认值是 false,意味着在默认情况下,该 P2P 同步功能是禁用的。
  • SkipSyncStartCheck Flag (l2.skip-sync-start-check):

    • 该标志用于在确定同步起始点时,跳过对不安全 L2 区块的 L1 起源一致性的合理性检查。当设置为 true 时,它会推迟 L1 起源的验证。如果你正在使用 l2.engine-sync,建议启用此标志来跳过初始的一致性检查。它的默认值是 false,意味着在默认情况下,该合理性检查是启用的。
    L2EngineSyncEnabled = &cli.BoolFlag{
        Name:     "l2.engine-sync",
        Usage:    "Enables or disables execution engine P2P sync",
        EnvVars:  prefixEnvVars("L2_ENGINE_SYNC_ENABLED"),
        Required: false,
        Value:    false,
    }
    SkipSyncStartCheck = &cli.BoolFlag{
        Name: "l2.skip-sync-start-check",
        Usage: "Skip sanity check of consistency of L1 origins of the unsafe L2 blocks when determining the sync-starting point. " +
            "This defers the L1-origin verification, and is recommended to use in when utilizing l2.engine-sync",
        EnvVars:  prefixEnvVars("L2_SKIP_SYNC_START_CHECK"),
        Required: false,
        Value:    false,
    }

L2EngineSyncEnabled

L2EngineSyncEnabled标志用于在op-node接收到新的unsafe的payload(区块)后,发送给op-geth进一步验证时,触发op-geth的p2p之间sync,在sync期间所有的unsafe区块都会被视为通过验证,并进行下一个unsafe的流程。op-geth内部的p2p sync比较适用于长范围的unsafe区块的获取。其实在op-geth内部,不管L2EngineSyncEnabled标志有没有启用,在遇到parent区块不存在的时候,都会开启sync去同步数据。

让我们深入代码层面看一下
首先是 op-node/rollup/derive/engine_queue.go

EngineSync为L2EngineSyncEnabled标志的具体表达。在这里嵌套在两个检查函数当中。

   // checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
   // It returns true if the status is acceptable.
   func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
      if eq.syncCfg.EngineSync {
         // Allow SYNCING and ACCEPTED if engine P2P sync is enabled
         return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
      }
      return status == eth.ExecutionValid
   }

   // checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload.
   // It returns true if the status is acceptable.
   func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
      if eq.syncCfg.EngineSync {
         // Allow SYNCING if engine P2P sync is enabled
         return status == eth.ExecutionValid || status == eth.ExecutionSyncing
      }
      return status == eth.ExecutionValid
   }

让我们把视角转到op-geth的 eth/catalyst/api.go当中,当parent区块缺失后,触发sync,并且返回SYNCING Status

   func (api *ConsensusAPI) newPayload(params engine.ExecutableData) (engine.PayloadStatusV1, error) {
      …
      // If the parent is missing, we - in theory - could trigger a sync, but that
      // would also entail a reorg. That is problematic if multiple sibling blocks
      // are being fed to us, and even more so, if some semi-distant uncle shortens
      // our live chain. As such, payload execution will not permit reorgs and thus
      // will not trigger a sync cycle. That is fine though, if we get a fork choice
      // update after legit payload executions.
      parent := api.eth.BlockChain().GetBlock(block.ParentHash(), block.NumberU64()-1)
      if parent == nil {
         return api.delayPayloadImport(block)
      }
      …
   }
   func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (engine.PayloadStatusV1, error) {
      …
      if err := api.eth.Downloader().BeaconExtend(api.eth.SyncMode(), block.Header()); err == nil {
         log.Debug("Payload accepted for sync extension", "number", block.NumberU64(), "hash", block.Hash())
         return engine.PayloadStatusV1{Status: engine.SYNCING}, nil
      }
      …
   }

SkipSyncStartCheck

SkipSyncStartCheck这个标识符主要是帮助在选择sync模式下,优化性能和减少不必要的检查。在已确认找到一个符合条件的L2块后,代码会跳过进一步的健全性检查,以加速同步或其他后续处理。这是一种优化手段,用于在确定性高的情况下快速地进行操作。

op-node/rollup/sync/start.go目录中

FindL2Heads函数通过从给定的“开始”(start)点(即之前的不安全L2区块)开始逐步回溯,来查找这三种类型的区块。在回溯过程中,该函数会检查各个L2区块的L1源是否与已知的L1规范链匹配,以及是否符合其他一些条件和检查。这允许函数更快地确定L2的“安全”头部,从而可能加速整个同步过程。

   func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain, lgr log.Logger, syncCfg *Config) (result *FindHeadsResult, err error) {
      …
      for {

         …

         if syncCfg.SkipSyncStartCheck && highestL2WithCanonicalL1Origin.Hash == n.Hash {
            lgr.Info("Found highest L2 block with canonical L1 origin. Skip further sanity check and jump to the safe head")
            n = result.Safe
            continue
         }
         // Pull L2 parent for next iteration
         parent, err := l2.L2BlockRefByHash(ctx, n.ParentHash)
         if err != nil {
            return nil, fmt.Errorf("failed to fetch L2 block by hash %v: %w", n.ParentHash, err)
         }

         // Check the L1 origin relation
         if parent.L1Origin != n.L1Origin {
            // sanity check that the L1 origin block number is coherent
            if parent.L1Origin.Number+1 != n.L1Origin.Number {
               return nil, fmt.Errorf("l2 parent %s of %s has L1 origin %s that is not before %s", parent, n, parent.L1Origin, n.L1Origin)
            }
            // sanity check that the later sequence number is 0, if it changed between the L2 blocks
            if n.SequenceNumber != 0 {
               return nil, fmt.Errorf("l2 block %s has parent %s with different L1 origin %s, but non-zero sequence number %d", n, parent, parent.L1Origin, n.SequenceNumber)
            }
            // if the L1 origin is known to be canonical, then the parent must be too
            if l1Block.Hash == n.L1Origin.Hash && l1Block.ParentHash != parent.L1Origin.Hash {
               return nil, fmt.Errorf("parent L2 block %s has origin %s but expected %s", parent, parent.L1Origin, l1Block.ParentHash)
            }
         } else {
            if parent.SequenceNumber+1 != n.SequenceNumber {
               return nil, fmt.Errorf("sequence number inconsistency %d <> %d between l2 blocks %s and %s", parent.SequenceNumber, n.SequenceNumber, parent, n)
            }
         }

         n = parent

         // once we found the block at seq nr 0 that is more than a full seq window behind the common chain post-reorg, then use the parent block as safe head.
         if ready {
            result.Safe = n
            return result, nil
         }
      }
   }

op-node RPC 同步

这种同步场景处于: 当你有信任的l2 rpc节点的时候,我们可以直接和rpc通信,发送较短范围的同步请求,和2类似。如果设置,在反向链同步中会优先使用RPC而不是P2P同步。

关键代码

op-node/node/node.go

初始化rpcSync,如果rpcSyncClient设置,赋值给rpcSync

   func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error {
      rpcSyncClient, rpcCfg, err := cfg.L2Sync.Setup(ctx, n.log, &cfg.Rollup)
      if err != nil {
         return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
      }
      if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client
         return nil
      }
      syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, rpcCfg)
      if err != nil {
         return fmt.Errorf("failed to create sync client: %w", err)
      }
      n.rpcSync = syncClient
      return nil
   }

启动node,如果rpcSync非空,开启rpcSync eventloop

   func (n *OpNode) Start(ctx context.Context) error {
      n.log.Info("Starting execution engine driver")

      // start driving engine: sync blocks by deriving them from L1 and driving them into the engine
      if err := n.l2Driver.Start(); err != nil {
         n.log.Error("Could not start a rollup node", "err", err)
         return err
      }

      // If the backup unsafe sync client is enabled, start its event loop
      if n.rpcSync != nil {
         if err := n.rpcSync.Start(); err != nil {
            n.log.Error("Could not start the backup sync client", "err", err)
            return err
         }
         n.log.Info("Started L2-RPC sync service")
      }

      return nil
   }

op-node/sources/sync_client.go

一旦接收到s.requests通道里的信号后(区块号),调用fetchUnsafeBlockFromRpc函数从RPC节点中获取相应的区块信息。

   // eventLoop is the main event loop for the sync client.
   func (s *SyncClient) eventLoop() {
      defer s.wg.Done()
      s.log.Info("Starting sync client event loop")

      backoffStrategy := &retry.ExponentialStrategy{
         Min:       1000 * time.Millisecond,
         Max:       20_000 * time.Millisecond,
         MaxJitter: 250 * time.Millisecond,
      }

      for {
         select {
         case <-s.resCtx.Done():
            s.log.Debug("Shutting down RPC sync worker")
            return
         case reqNum := <-s.requests:
            _, err := retry.Do(s.resCtx, 5, backoffStrategy, func() (interface{}, error) {
               // Limit the maximum time for fetching payloads
               ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
               defer cancel()
               // We are only fetching one block at a time here.
               return nil, s.fetchUnsafeBlockFromRpc(ctx, reqNum)
            })
            if err != nil {
               if err == s.resCtx.Err() {
                  return
               }
               s.log.Error("failed syncing L2 block via RPC", "err", err, "num", reqNum)
               // Reschedule at end of queue
               select {
               case s.requests <- reqNum:
               default:
                  // drop syncing job if we are too busy with sync jobs already.
               }
            }
         }
      }
   }

接下来我们来看看从哪里往s.requests通道发送信号的呢?
同文件下的RequestL2Range函数,此函数介绍一个需要同步的区块范围,然后将任务通过for循环,分别发送出去。

   func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
      // Drain previous requests now that we have new information
      for len(s.requests) > 0 {
         select { // in case requests is being read at the same time, don't block on draining it.
         case <-s.requests:
         default:
            break
         }
      }

      endNum := end.Number
      if end == (eth.L2BlockRef{}) {
         n, err := s.rollupCfg.TargetBlockNumber(uint64(time.Now().Unix()))
         if err != nil {
            return err
         }
         if n <= start.Number {
            return nil
         }
         endNum = n
      }

      // TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method.

      s.log.Info("Scheduling to fetch trailing missing payloads from backup RPC", "start", start, "end", endNum, "size", endNum-start.Number-1)

      for i := start.Number + 1; i < endNum; i++ {
         select {
         case s.requests <- i:
         case <-ctx.Done():
            return ctx.Err()
         }
      }
      return nil
   }

在外层的OpNode类型的RequestL2Range实现方法里。可以清楚的看到rpcSync类型的反向链同步是优先选择的。

   func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
      if n.rpcSync != nil {
         return n.rpcSync.RequestL2Range(ctx, start, end)
      }
      if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
         if unixTimeStale(start.Time, 12*time.Hour) {
            n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
            return nil
         }
         return n.p2pNode.RequestL2Range(ctx, start, end)
      }
      n.log.Debug("ignoring request to sync L2 range, no sync method available", "start", start, "end", end)
      return nil
   }

总结

理解了这些同步方式后,我们知道了unsafe的payload(区块)究竟是怎么进行传递的。不同的sync模块对应着在不同场景下的区块数据传递。那么整个网络中如何一步步的将unsafe的区块变成safe区块,然后再进行finalized的呢?这些内容会在其他章节进行讲解。

转载自:https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN

Optimism Sequencer 工作流程

Sequencer 工作流程

Sequencer 在 Layer 2 (L2) 解决方案中起到核心作用,主要负责交易汇总,L1 数据派生,L2 区块生成,L1 batch 数据提交,以及 L1 中 L2 state root 的提议。在本文中,我们将深入探讨 Sequencer 的工作原理和相关代码实现。在这部分我们主要讨论L2区块的产生流程

L2 区块生成

在一个更宏观的层面,sequencer在L2 区块的生成过程中实际上只是创建一个只包含 deposit 的模板块的 payload 。该 payload 随后被发送给 Execution Layer (EL),EL 从 txpool 中提取交易,然后进行 payload 包装以生成实际的区块。

代码流程

当操作节点(opnode)启动后,Driver 会启动一个 eventloop。在这个 eventloop 中,我们定义了 sequencerCh 通道和 planSequencerAction 方法。

sequencerTimer := time.NewTimer(0)
var sequencerCh <-chan time.Time
planSequencerAction := func() {
    delay := s.sequencer.PlanNextSequencerAction()
    sequencerCh = sequencerTimer.C
    if len(sequencerCh) > 0 { // 确保通道在重置前已被清空
        <-sequencerCh
    }
    sequencerTimer.Reset(delay)
}

planSequencerAction 方法中,我们重新设置了通道信号接收计时器的时间。而 PlanNextSequencerAction 方法则用于计算 RunNextSequencerAction 的延迟时间。

延迟时间解释

在这里,“延迟时间”是一个重要的概念。它决定了执行下一个序列化动作之前应该等待的时间。通过动态计算延迟时间,我们可以更灵活地控制序列化的频率和时机,从而优化系统的性能。

Event Loop 的循环结构

在 event loop 的 for 循环中,首先进行了一系列的检查。例如,我们检查是否启用了 sequencer 和 L1 状态是否已准备好,以确定是否可以触发下一个 sequencer 操作。

for {    
    // 主条件:检查 Sequencer 是否启用和 L1 状态是否准备好
    // 在这个 if 语句中,我们检查了几个关键条件来确定是否可以进行 sequencing,包括:
    // - Sequencer 是否启用
    // - Sequencer 是否停止
    // - L1 状态是否准备好
    // - Derivation pipeline 的引擎是否准备好
    if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped &&
        s.l1State.L1Head() != (eth.L1BlockRef{}) && s.derivation.EngineReady() {

        // 检查安全滞后
        // 在这段代码中,我们监视安全和不安全的 L2 头之间的滞后,以确定是否需要暂停新区块的创建。
        if s.driverConfig.SequencerMaxSafeLag > 0 && s.derivation.SafeL2Head().Number+s.driverConfig.SequencerMaxSafeLag <= s.derivation.UnsafeL2Head().Number {
            if sequencerCh != nil {
                s.log.Warn(
                    "Delay creating new block since safe lag exceeds limit",
                    "safe_l2", s.derivation.SafeL2Head(),
                    "unsafe_l2", s.derivation.UnsafeL2Head(),
                )
                sequencerCh = nil
            }
        // 更新 Sequencer 操作
        // 如果 sequencer 正在构建一个新的区块,并且 L1 状态已准备好,我们将更新下一个 sequencer 动作的触发器。
        } else if s.sequencer.BuildingOnto().ID() != s.derivation.UnsafeL2Head().ID() {
            planSequencerAction()
        }
    // 默认条件:在所有其他情况下,我们将 sequencerCh 设置为 nil,这意味着没有计划任何新的 sequencer 动作。
    } else {
        sequencerCh = nil
    }
}

总结

在 event loop 的循环结构中,我们进行了一系列的检查来确定是否可以触发下一个 sequencer 操作。

在通过检查的过程中,第一次planSequencerAction设置了计时器。

接下来查看

select {
case <-sequencerCh:
    payload, err := s.sequencer.RunNextSequencerAction(ctx)
    if err != nil {
        s.log.Error("Sequencer critical error", "err", err)
        return
    }
    if s.network != nil && payload != nil {
        // Publishing of unsafe data via p2p is optional.
        // Errors are not severe enough to change/halt sequencing but should be logged and metered.
        if err := s.network.PublishL2Payload(ctx, payload); err != nil {
            s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
            s.metrics.RecordPublishingError()
        }
    }
    planSequencerAction() // schedule the next sequencer action to keep the sequencing looping

这部分代码是等待刚才计时器到达设定的时间后,被计时器发出的消息所触发。它首先尝试执行下一个序列化动作。如果这个动作成功了,它会尝试通过网络来发布新创建的负载。无论如何,它最终都会调用 planSequencerAction 函数来计划下一个序列化动作,这样就创建了一个持续的循环来处理序列化动作。

接下来让我们查看被触发的RunNextSequencerAction函数的内容

// RunNextSequencerAction starts new block building work, or seals existing work,
// and is best timed by first awaiting the delay returned by PlanNextSequencerAction.
// If a new block is successfully sealed, it will be returned for publishing, nil otherwise.
//
// Only critical errors are bubbled up, other errors are handled internally.
// Internally starting or sealing of a block may fail with a derivation-like error:
//   - If it is a critical error, the error is bubbled up to the caller.
//   - If it is a reset error, the ResettableEngineControl used to build blocks is requested to reset, and a backoff applies.
//     No attempt is made at completing the block building.
//   - If it is a temporary error, a backoff is applied to reattempt building later.
//   - If it is any other error, a backoff is applied and building is cancelled.
//
// Upon L1 reorgs that are deep enough to affect the L1 origin selection, a reset-error may occur,
// to direct the engine to follow the new L1 chain before continuing to sequence blocks.
// It is up to the EngineControl implementation to handle conflicting build jobs of the derivation
// process (as verifier) and sequencing process.
// Generally it is expected that the latest call interrupts any ongoing work,
// and the derivation process does not interrupt in the happy case,
// since it can consolidate previously sequenced blocks by comparing sequenced inputs with derived inputs.
// If the derivation pipeline does force a conflicting block, then an ongoing sequencer task might still finish,
// but the derivation can continue to reset until the chain is correct.
// If the engine is currently building safe blocks, then that building is not interrupted, and sequencing is delayed.
func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionPayload, error) {
    if onto, buildingID, safe := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) {
        if safe {
            d.log.Warn("avoiding sequencing to not interrupt safe-head changes", "onto", onto, "onto_time", onto.Time)
            // approximates the worst-case time it takes to build a block, to reattempt sequencing after.
            d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.config.BlockTime))
            return nil, nil
        }
        payload, err := d.CompleteBuildingBlock(ctx)
        if err != nil {
            if errors.Is(err, derive.ErrCritical) {
                return nil, err // bubble up critical errors.
            } else if errors.Is(err, derive.ErrReset) {
                d.log.Error("sequencer failed to seal new block, requiring derivation reset", "err", err)
                d.metrics.RecordSequencerReset()
                d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.config.BlockTime)) // hold off from sequencing for a full block
                d.CancelBuildingBlock(ctx)
                d.engine.Reset()
            } else if errors.Is(err, derive.ErrTemporary) {
                d.log.Error("sequencer failed temporarily to seal new block", "err", err)
                d.nextAction = d.timeNow().Add(time.Second)
                // We don't explicitly cancel block building jobs upon temporary errors: we may still finish the block.
                // Any unfinished block building work eventually times out, and will be cleaned up that way.
            } else {
                d.log.Error("sequencer failed to seal block with unclassified error", "err", err)
                d.nextAction = d.timeNow().Add(time.Second)
                d.CancelBuildingBlock(ctx)
            }
            return nil, nil
        } else {
            d.log.Info("sequencer successfully built a new block", "block", payload.ID(), "time", uint64(payload.Timestamp), "txs", len(payload.Transactions))
            return payload, nil
        }
    } else {
        err := d.StartBuildingBlock(ctx)
        if err != nil {
            if errors.Is(err, derive.ErrCritical) {
                return nil, err
            } else if errors.Is(err, derive.ErrReset) {
                d.log.Error("sequencer failed to seal new block, requiring derivation reset", "err", err)
                d.metrics.RecordSequencerReset()
                d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.config.BlockTime)) // hold off from sequencing for a full block
                d.engine.Reset()
            } else if errors.Is(err, derive.ErrTemporary) {
                d.log.Error("sequencer temporarily failed to start building new block", "err", err)
                d.nextAction = d.timeNow().Add(time.Second)
            } else {
                d.log.Error("sequencer failed to start building new block with unclassified error", "err", err)
                d.nextAction = d.timeNow().Add(time.Second)
            }
        } else {
            parent, buildingID, _ := d.engine.BuildingPayload() // we should have a new payload ID now that we're building a block
            d.log.Info("sequencer started building new block", "payload_id", buildingID, "l2_parent_block", parent, "l2_parent_block_time", parent.Time)
        }
        return nil, nil
    }
}

这段代码定义了一个名为 RunNextSequencerAction 的方法,它是 Sequencer 结构的一部分。这个方法的目的是管理区块的创建和封装过程,根据当前的状态和遇到的任何错误来决定下一步的操作。

以下是该方法的主要工作流程和组件:
检查当前的区块创建状态:
使用 d.engine.BuildingPayload() 来检查当前是否有一个正在创建的区块。

处理正在创建的区块:
如果有一个正在创建的区块,它会检查是否安全继续创建。如果是这样,它会稍后重新尝试。如果不是这样,它会尝试完成区块创建。

错误处理:
在尝试完成区块创建时可能会遇到各种错误。这些错误被分类并适当处理:

严重错误:这些错误会被传递给调用者。
重置错误:这会导致排序器重置,并延迟后续的排序尝试。
临时错误:这只会导致短暂的延迟再次尝试。
其他错误:这将取消当前的区块创建任务,并稍后重新尝试。
成功创建区块:
如果区块成功创建,它将记录一个消息,并返回新创建的区块和nil错误。

开始一个新的区块创建任务:
如果当前没有区块正在创建,它将开始一个新的区块创建任务。这包括一个与上面相似的错误处理流程。

日志记录:
在整个方法中,根据不同的情况和结果,会有多个日志消息被记录,以帮助跟踪排序器的状态和行为。

让我们来突出关键的步骤,主要是两部分,一部分是完成完全的构建,一个是开启新的区块的构建。

首先让我们看一下开始新的区块构建的过程

func (d *Sequencer) StartBuildingBlock(ctx context.Context) error {
…
attrs, err := d.attrBuilder.PreparePayloadAttributes(fetchCtx, l2Head, l1Origin.ID())
if err != nil {
    return err
}

…
attrs.NoTxPool = uint64(attrs.Timestamp) > l1Origin.Time+d.config.MaxSequencerDrift

…
// Start a payload building process.
errTyp, err := d.engine.StartPayload(ctx, l2Head, attrs, false)
if err != nil {
    return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err)
}
…

}

在这段代码中, RunNextSequencerAction 方法及其在区块创建和封装过程中的作用如下

方法细节和解释

在这一部分,我们将深入探讨创建新区块的方法及其组成部分。

Optimism 中的重要概念

在我们深入探讨 PreparePayloadAttributes 之前,我们需要先理解 Optimism 网络中的两个重要概念:Sequencing WindowSequencing Epoch

Sequencing Window

在合并后的以太坊网络中,L1 的固定区块时间是 12 秒,而 L2 的区块时间是 2 秒。基于这个设置,我们可以明确“Sequencing Window”的概念,并通过一个示例来阐述它:

  • 示例:如果我们设定一个“Sequencing Window”来包含 5 个 L1 区块,那么这个窗口的总时间将为 60 秒(12 秒/区块 × 5 区块 = 60 秒)。在这 60 秒的时间段里,理论上可以产生 30 个 L2 区块(60 秒/2 秒 = 30)。

Sequencing Epoch

“Sequencing Epoch”是根据特定的“Sequencing Window”派生的一系列 L2 区块。

  • 示例:在一个“Sequencing Window”中包含了 5 个 L1 区块,这意味着它涵盖了 60 秒的时间段(12 秒/区块 × 5 区块)。在这段时间里,理论上可以生成 30 个 L2 区块(60 秒 ÷ 2 秒/区块 = 30 区块)。

适应网络变化

在一些特殊情况下,为了保持网络的活跃性,我们可以通过增加“epoch”的长度来应对 L1 插槽被跳过或临时失去与 L1 的连接的情况。相反,为了防止 L2 时间戳逐渐超前于 L1,我们可能需要缩短“epoch”的时间来进行调整。

通过这样的设计,系统能够灵活而高效地调整区块生成的策略,确保网络的稳定和安全。

函数详解

在下面的函数中,我们可以看到传入的 epoch 参数是 l1Origin.ID()。这符合我们对 epoch 编号的定义。函数负责准备创建新 L2 块的所有必要属性。

    attrs, err := d.attrBuilder.PreparePayloadAttributes(fetchCtx, l2Head, l1Origin.ID())
func (ba *FetchingAttributesBuilder) PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) {
    var l1Info eth.BlockInfo
    var depositTxs []hexutil.Bytes
    var seqNumber uint64

sysConfig, err := ba.l2.SystemConfigByL2Hash(ctx, l2Parent.Hash)
if err != nil {
    return nil, NewTemporaryError(fmt.Errorf("failed to retrieve L2 parent block: %w", err))
}

// If the L1 origin changed this block, then we are in the first block of the epoch. In this
// case we need to fetch all transaction receipts from the L1 origin block so we can scan for
// user deposits.
if l2Parent.L1Origin.Number != epoch.Number {
    info, receipts, err := ba.l1.FetchReceipts(ctx, epoch.Hash)
    if err != nil {
        return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info and receipts: %w", err))
    }
    if l2Parent.L1Origin.Hash != info.ParentHash() {
        return nil, NewResetError(
            fmt.Errorf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
                epoch, info.ParentHash(), l2Parent.L1Origin))
    }

    deposits, err := DeriveDeposits(receipts, ba.cfg.DepositContractAddress)
    if err != nil {
        // deposits may never be ignored. Failing to process them is a critical error.
        return nil, NewCriticalError(fmt.Errorf("failed to derive some deposits: %w", err))
    }
    // apply sysCfg changes
    if err := UpdateSystemConfigWithL1Receipts(&sysConfig, receipts, ba.cfg); err != nil {
        return nil, NewCriticalError(fmt.Errorf("failed to apply derived L1 sysCfg updates: %w", err))
    }

    l1Info = info
    depositTxs = deposits
    seqNumber = 0
} else {
    if l2Parent.L1Origin.Hash != epoch.Hash {
        return nil, NewResetError(fmt.Errorf("cannot create new block with L1 origin %s in conflict with L1 origin %s", epoch, l2Parent.L1Origin))
    }
    info, err := ba.l1.InfoByHash(ctx, epoch.Hash)
    if err != nil {
        return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info: %w", err))
    }
    l1Info = info
    depositTxs = nil
    seqNumber = l2Parent.SequenceNumber + 1
}

// Sanity check the L1 origin was correctly selected to maintain the time invariant between L1 and L2
nextL2Time := l2Parent.Time + ba.cfg.BlockTime
if nextL2Time < l1Info.Time() {
    return nil, NewResetError(fmt.Errorf("cannot build L2 block on top %s for time %d before L1 origin %s at time %d",
        l2Parent, nextL2Time, eth.ToBlockID(l1Info), l1Info.Time()))
}

l1InfoTx, err := L1InfoDepositBytes(seqNumber, l1Info, sysConfig, ba.cfg.IsRegolith(nextL2Time))
if err != nil {
    return nil, NewCriticalError(fmt.Errorf("failed to create l1InfoTx: %w", err))
}

txs := make([]hexutil.Bytes, 0, 1+len(depositTxs))
txs = append(txs, l1InfoTx)
txs = append(txs, depositTxs...)

return &eth.PayloadAttributes{
    Timestamp:             hexutil.Uint64(nextL2Time),
    PrevRandao:            eth.Bytes32(l1Info.MixDigest()),
    SuggestedFeeRecipient: predeploys.SequencerFeeVaultAddr,
    Transactions:          txs,
    NoTxPool:              true,
    GasLimit:              (*eth.Uint64Quantity)(&sysConfig.GasLimit),
}, nil

}

在这个函数中,我们可以看到传入的epoch参数是l1Origin.ID()。符合我们epoch编号的定义。

如代码所示,PreparePayloadAttributes 准备新区块的有效载荷属性,它首先根据L1和L2的父块信息确定是否需要获取新的L1存款和系统配置数据。然后它创建一个特殊的系统交易,其中包含与L1块相关的信息和系统配置。这个特殊的交易和其他可能的L1存款交易一起构成了一个交易集,这将被包含在新的L2块的有效负载中。函数确保了时间的一致性和正确的序列号分配,最后返回一个包含所有这些信息的PayloadAttributes结构,以用于新L2块的创建。但在这里,我们只是准备了一个初步的 payload,它仅包含 L1 中的 deposit 交易。之后,我们调用 StartPayload 来开始 payload 的下一步构建。

在获取Attribute后,我们继续往下看

attrs.NoTxPool = uint64(attrs.Timestamp) > l1Origin.Time+d.config.MaxSequencerDrift

判断是否需要产生空区块,注意这里的空区块也至少包含L1信息存款和任何用户存款。如果需要产生空区块,我们通过设置NoTxPool为true来处理,这将导致排序器不包含来自事务池的任何交易。

接下来会调用StartPayload去开启这个payload的构建

errTyp, err := d.engine.StartPayload(ctx, l2Head, attrs, false)
if err != nil {
    // 如果在启动有效载荷构建过程时出现错误,则返回格式化的错误消息
    return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err)
}

StartPayload 函数

StartPayload 主要是触发了ForkchoiceUpdate和更新了EngineQueue中的building的一些状态,如buildingID等,后续再次RunNextSequencerAction时会根据这个id来找找到正在构建的ID

    func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) {
        if eq.isEngineSyncing() {
            return BlockInsertTemporaryErr, fmt.Errorf("engine is in progess of p2p sync")
        }
        if eq.buildingID != (eth.PayloadID{}) {
            eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent)
            // TODO: maybe worth it to force-cancel the old payload ID here.
        }
        fc := eth.ForkchoiceState{
            HeadBlockHash:      parent.Hash,
            SafeBlockHash:      eq.safeHead.Hash,
            FinalizedBlockHash: eq.finalized.Hash,
        }
        id, errTyp, err := StartPayload(ctx, eq.engine, fc, attrs)
        if err != nil {
            return errTyp, err
        }
        eq.buildingID = id
        eq.buildingSafe = updateSafe
        eq.buildingOnto = parent
        return BlockInsertOK, nil
    }
   func StartPayload(ctx context.Context, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes) (id eth.PayloadID, errType BlockInsertionErrType, err error) {
    …
    fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, attrs)
    …
   }

在这个函数中内部调用ForkchoiceUpdate,我们可以看到一个新的 Payload ID 被创建,并且正在构建的 ID 和其他相关参数也被更新。

ForkchoiceUpdate 函数

紧接着,ForkchoiceUpdate 函数被调用来处理 ForkChoice 的更新。这个函数是一个包装函数,它调用 engine_forkchoiceUpdatedV1 来触发 EL 产生新的区块。

ForkchoiceUpdate函数是对调用的包装方法,其内部处理了对engine层(op-geth)的engineApi的调用,这里调用了engine_forkchoiceUpdatedV1去由EL产生区块

var result eth.ForkchoiceUpdatedResult
err := s.client.CallContext(fcCtx, &result, "engine_forkchoiceUpdatedV1", fc, attributes)

这个函数内部调用了 engine_forkchoiceUpdatedV1 方法来处理 Fork Choice 的更新和新的 Payload 的创建。

op-geth 中的 ForkchoiceUpdated 函数

接下来,我们将视角转到 op-geth 中来看一下 forkchoiceUpdated 函数的实现。

在op-geth中,处理改请求的为forkchoiceUpdated函数,此函数首先获取和验证与提供的 fork choice 状态相关的各种区块,然后基于这些信息和可选的负载属性来创建一个新的负载(即一个新的区块)。如果负载创建成功,它将返回一个包含新负载 ID 的有效响应,否则它将返回一个错误。
关键代码如下

if payloadAttributes != nil {
    if api.eth.BlockChain().Config().Optimism != nil && payloadAttributes.GasLimit == nil {
        return engine.STATUS_INVALID, engine.InvalidPayloadAttributes.With(errors.New("gasLimit parameter is required"))
    }
    transactions := make(types.Transactions, 0, len(payloadAttributes.Transactions))
    for i, otx := range payloadAttributes.Transactions {
        var tx types.Transaction
        if err := tx.UnmarshalBinary(otx); err != nil {
            return engine.STATUS_INVALID, fmt.Errorf("transaction %d is not valid: %v", i, err)
        }
        transactions = append(transactions, &tx)
    }
    args := &miner.BuildPayloadArgs{
        Parent:       update.HeadBlockHash,
        Timestamp:    payloadAttributes.Timestamp,
        FeeRecipient: payloadAttributes.SuggestedFeeRecipient,
        Random:       payloadAttributes.Random,
        Withdrawals:  payloadAttributes.Withdrawals,
        NoTxPool:     payloadAttributes.NoTxPool,
        Transactions: transactions,
        GasLimit:     payloadAttributes.GasLimit,
    }
    id := args.Id()
    // If we already are busy generating this work, then we do not need
    // to start a second process.
    if api.localBlocks.has(id) {
        return valid(&id), nil
    }
    payload, err := api.eth.Miner().BuildPayload(args)
    if err != nil {
        log.Error("Failed to build payload", "err", err)
        return valid(nil), engine.InvalidPayloadAttributes.With(err)
    }
    api.localBlocks.put(id, payload)
    return valid(&id), nil
}

在这里,首先把刚才我们在op-node中创建的payload加载到args当中,再把args传到BuildPayload函数当中

// buildPayload builds the payload according to the provided parameters.
func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
    // Build the initial version with no transaction included. It should be fast
    // enough to run. The empty payload can at least make sure there is something
    // to deliver for not missing slot.
    empty, _, err := w.getSealingBlock(args.Parent, args.Timestamp, args.FeeRecipient, args.Random, args.Withdrawals, true, args.Transactions, args.GasLimit)
    if err != nil {
        return nil, err
    }
    // Construct a payload object for return.
    payload := newPayload(empty, args.Id())
    if args.NoTxPool { // don't start the background payload updating job if there is no tx pool to pull from
        return payload, nil
    }

    // Spin up a routine for updating the payload in background. This strategy
    // can maximum the revenue for including transactions with highest fee.
    go func() {
        // Setup the timer for re-building the payload. The initial clock is kept
        // for triggering process immediately.
        timer := time.NewTimer(0)
        defer timer.Stop()

        // Setup the timer for terminating the process if SECONDS_PER_SLOT (12s in
        // the Mainnet configuration) have passed since the point in time identified
        // by the timestamp parameter.
        endTimer := time.NewTimer(time.Second * 12)

        for {
            select {
            case <-timer.C:
                start := time.Now()
                block, fees, err := w.getSealingBlock(args.Parent, args.Timestamp, args.FeeRecipient, args.Random, args.Withdrawals, false, args.Transactions, args.GasLimit)
                if err == nil {
                    payload.update(block, fees, time.Since(start))
                }
                timer.Reset(w.recommit)
            case <-payload.stop:
                log.Info("Stopping work on payload", "id", payload.id, "reason", "delivery")
                return
            case <-endTimer.C:
                log.Info("Stopping work on payload", "id", payload.id, "reason", "timeout")
                return
            }
        }
    }()
    return payload, nil
}

初始化阶段:

首先,它使用提供的参数(但不包含任何交易)快速构建一个初始版本的空负载(即一个不包含任何交易的区块)。
如果在这一步中遇到错误,它将返回该错误。
构建返回负载对象:

接着,它使用刚创建的空区块来创建一个负载对象,该对象将被返回给调用者。
如果参数 args.NoTxPool 为真,这意味着没有交易池来从中获取交易,函数将结束并返回当前的负载对象。
后台更新负载:

如果 args.NoTxPool 为假,则启动一个后台goroutine来定期更新负载,以包含更多的交易和更新状态。
这个后台进程有两个计时器:
一个用于控制何时重新创建负载来包含新的交易。
另一个用于设置一个超时,超过这个时间后,后台进程将停止工作。
在每次计时器触发时,它都会尝试获取一个新的区块来更新负载,如果成功,它将更新负载对象中的数据。
如果负载被交付或时间超时,后台进程将停止。
通过这种方式,buildPayload 函数确保了一个初始的负载快速可用,同时后台进程尽可能地通过包含更多的交易来优化负载。这种策略旨在最大化通过包含高费用交易来获得的收入。

那么当args.NoTxPool为假时,究竟是怎么运行的呢?
答案藏在 getSealingBlock函数里

func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, withdrawals types.Withdrawals, noTxs bool, transactions types.Transactions, gasLimit *uint64) (*types.Block, *big.Int, error) {
    req := &getWorkReq{
        params: &generateParams{
            timestamp:   timestamp,
            forceTime:   true,
            parentHash:  parent,
            coinbase:    coinbase,
            random:      random,
            withdrawals: withdrawals,
            noUncle:     true,
            noTxs:       noTxs,
            txs:         transactions,
            gasLimit:    gasLimit,
        },
        result: make(chan *newPayloadResult, 1),
    }
    select {
    case w.getWorkCh <- req:
        result := <-req.result
        if result.err != nil {
            return nil, nil, result.err
        }
        return result.block, result.fees, nil
    case <-w.exitCh:
        return nil, nil, errors.New("miner closed")
    }
}

在这个部分,我们看到 mainLoop 函数通过监听 getWorkCh 通道来接收新的 Payload 创建请求。一旦接收到请求,它就会触发 generateWork 函数来开始新 Payload 的创建过程。

case req := <-w.getWorkCh:
    block, fees, err := w.generateWork(req.params)
    req.result <- &newPayloadResult{
        err:   err,
        block: block,
        fees:  fees,
    }

GenerateWork 函数

GenerateWork 函数是新 Payload 创建流程的最后一步。它负责准备工作并创建新的区块。



    // generateWork generates a sealing block based on the given parameters.
    func (w *worker) generateWork(genParams *generateParams) (*types.Block, *big.Int, error) {
        work, err := w.prepareWork(genParams)
        if err != nil {
            return nil, nil, err
        }
        defer work.discard()
        if work.gasPool == nil {
            work.gasPool = new(core.GasPool).AddGas(work.header.GasLimit)
        }

        for _, tx := range genParams.txs {
            from, _ := types.Sender(work.signer, tx)
            work.state.SetTxContext(tx.Hash(), work.tcount)
            _, err := w.commitTransaction(work, tx)
            if err != nil {
                return nil, nil, fmt.Errorf("failed to force-include tx: %s type: %d sender: %s nonce: %d, err: %w", tx.Hash(), tx.Type(), from, tx.Nonce(), err)
            }
            work.tcount++
        }

        // forced transactions done, fill rest of block with transactions
        if !genParams.noTxs {
            interrupt := new(atomic.Int32)
            timer := time.AfterFunc(w.newpayloadTimeout, func() {
                interrupt.Store(commitInterruptTimeout)
            })
            defer timer.Stop()

            err := w.fillTransactions(interrupt, work)
            if errors.Is(err, errBlockInterruptedByTimeout) {
                log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout))
            }
        }
        block, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts, genParams.withdrawals)
        if err != nil {
            return nil, nil, err
        }
        return block, totalFees(block, work.receipts), nil
    }

在这个函数中,我们可以看到详细的区块创建过程,包括交易的处理和区块的最终组装。

区块完成和确认流程

在这一部分,我们将继续探讨 Sequencer 模式下的区块产生流程。这个阶段主要涉及到区块的完成和确认过程。以下我们将详细分析每个步骤和函数的作用。

区块的构建和优化

在开始阶段,我们首先在内存池中构建一个新的区块。这里特别注意到 NoTxPool 参数的应用,它是之前在 Sequencer 中设置的。这一段代码负责区块的初步构建和后续的优化工作。

其中关键步骤在于

if !genParams.noTxs {
    interrupt := new(atomic.Int32)
    timer := time.AfterFunc(w.newpayloadTimeout, func() {
        interrupt.Store(commitInterruptTimeout)
    })
    defer timer.Stop()

    err := w.fillTransactions(interrupt, work)
    if errors.Is(err, errBlockInterruptedByTimeout) {
        log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout))
    }
}

这一部分终于用到了在之前在sequencer中设置的NoTxPool参数,然后开始在内存池中构建新的区块(这里的内存池里的交易来自自身和其他节点,且由于gossip是默认关闭的,其他节点之间是没有内存池交易互通的,因此这就是为什么sequencer的内存池是私有的原因)

至此,block已经在sequencer的节点中产生区块了。但是buildPayload函数创建的是一个初始的、结构上正确的区块,并在一个后台进程中不断优化它以增加其交易内容和潜在的矿工收益,但它的有效性和认可是依赖于后续的网络共识过程的。也就是说他还需要后续的步骤来停止这个更新,而确定一个最终的块。

接下来让我们回到sequencer当中
在当前阶段,我们已经确定了在EngineQueue当中的buildingID已经设置,并且由这个payload派生的区块已经在op-geth中产生。
接下来,sequencer由于在最开始设置的time定时器触发,再次调用RunNextSequencerAction方法。
进入判断 但是在这次,我们的buildingID已经存在,因此进入CompleteBuildingBlock的阶段。

if onto, buildingID, safe := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) {
        …
        payload, err := d.CompleteBuildingBlock(ctx)
        …
    }

CompleteBuildingBlock在内部调用了ConfirmPayload方法

// ConfirmPayload ends an execution payload building process in the provided Engine, and persists the payload as the canonical head.
// If updateSafe is true, then the payload will also be recognized as safe-head at the same time.
// The severity of the error is distinguished to determine whether the payload was valid and can become canonical.
func ConfirmPayload(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, id eth.PayloadID, updateSafe bool) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
    payload, err := eng.GetPayload(ctx, id)
    …
    …
    status, err := eng.NewPayload(ctx, payload)
    …
    …
    fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, nil)
    …
    return payload, BlockInsertOK, nil
}

在这里可以参考这张oplabs给出来的插图

ENGINE

这张导图主要是描述了create blocks的过程,

Rollup 驱动程序实际上并不真正创建区块。相反,它通过 Engine API 指导执行引擎这样做。在上述每次块派生循环的迭代中,rollup 驱动程序将制作一个 payload 属性对象并将其发送到执行引擎。然后执行引擎将 payload 属性对象转换为一个区块,并将其添加到链中。Rollup 驱动程序的基本序列如下:

  1. 使用 payload 属性对象调用 engine_forkChoiceUpdatedV1。我们现在先跳过 fork choice state 参数的详细信息 - 只需知道它的一个字段是 L2 链的 headBlockHash,它被设置为 L2 链尖端的区块哈希。Engine API 返回一个 payload ID。
  2. 使用第1步返回的 payload ID 调用 engine_getPayloadV1。引擎 API 返回一个包含区块哈希作为其字段之一的 payload 对象。
  3. 使用第2步返回的 payload 调用 engine_newPayloadV1。
  4. 使用 fork choice 参数的 headBlockHash 设置为第2步返回的区块哈希来调用 engine_forkChoiceUpdatedV1。现在,L2 链的尖端是在第1步中创建的区块。

第一步的engine_forkChoiceUpdatedV1是我们从一开始开始构建的过程,而第二三四步就在ConfirmPayload方法里。

第二步 GetPayload方法
获取我们第一步构建的块的ExecutionPayload

// Resolve returns the latest built payload and also terminates the background
// thread for updating payload. It's safe to be called multiple times.
func (payload *Payload) Resolve() *engine.ExecutionPayloadEnvelope {
    payload.lock.Lock()
    defer payload.lock.Unlock()

    select {
    case <-payload.stop:
    default:
        close(payload.stop)
    }
    if payload.full != nil {
        return engine.BlockToExecutableData(payload.full, payload.fullFees)
    }
    return engine.BlockToExecutableData(payload.empty, big.NewInt(0))
}

GetPayload方法通过向我们第一步开启的协程中的payload.stop通道发送型号,来停止block的重构。同时将最新的block的数据(ExecutionPayload)发送回sequencer(op-node)

第三步 NewPayload方法

func (api *ConsensusAPI) newPayload(params engine.ExecutableData) (engine.PayloadStatusV1, error) {
    …
    block, err := engine.ExecutableDataToBlock(params)
    if err != nil {
        log.Debug("Invalid NewPayload params", "params", params, "error", err)
        return engine.PayloadStatusV1{Status: engine.INVALID}, nil
    }
    …
    if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
        log.Warn("NewPayloadV1: inserting block failed", "error", err)

        api.invalidLock.Lock()
        api.invalidBlocksHits[block.Hash()] = 1
        api.invalidTipsets[block.Hash()] = block.Header()
        api.invalidLock.Unlock()

        return api.invalid(err, parent.Header()), nil
    }
    …
}

这里先根据我们最终确认的payload相关参数,构建一个区块,再将这个区块插入我们的blockChain当中。

第四步 ForkchoiceUpdate方法

func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
    …
    …
    if update.FinalizedBlockHash != (common.Hash{}) {
        if merger := api.eth.Merger(); !merger.PoSFinalized() {
            merger.FinalizePoS()
        }
        // If the finalized block is not in our canonical tree, somethings wrong
        finalBlock := api.eth.BlockChain().GetBlockByHash(update.FinalizedBlockHash)
        if finalBlock == nil {
            log.Warn("Final block not available in database", "hash", update.FinalizedBlockHash)
            return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("final block not available in database"))
        } else if rawdb.ReadCanonicalHash(api.eth.ChainDb(), finalBlock.NumberU64()) != update.FinalizedBlockHash {
            log.Warn("Final block not in canonical chain", "number", block.NumberU64(), "hash", update.HeadBlockHash)
            return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("final block not in canonical chain"))
        }
        // Set the finalized block
        api.eth.BlockChain().SetFinalized(finalBlock.Header())
    }
    …
}

通过SetFinalized将我们之前几步产生的block进行Finalized
此方法将一个特定的区块标记为已“最终确定(finalized)”。在区块链网络中,当一个区块被标记为“最终确定(finalized)”时,意味着该区块及其所有先前的区块都不可逆转,它们将永远成为区块链的一部分。这是一个非常重要的安全特性,确保一旦一个区块被最终确定,则它不可能被另一个分叉所替代。

这样,一个基础的l2的block的构建就算是完成了,后续的工作就是把这个新的l2的信息记录在sequencer当中,让我们返回到ConfirmPayload函数中

    payload, errTyp, err := ConfirmPayload(ctx, eq.log, eq.engine, fc, eq.buildingID, eq.buildingSafe)
    if err != nil {
        return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", eq.buildingOnto, eq.buildingID, errTyp, err)
    }
    ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
    if err != nil {
        return nil, BlockInsertPayloadErr, NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
    }

    eq.unsafeHead = ref
    eq.engineSyncTarget = ref
    eq.metrics.RecordL2Ref("l2_unsafe", ref)
    eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref)

可以看到payload被解析为PayloadToBlockRe(PayloadToBlockRef extracts the essential L2BlockRef information from an execution payload, falling back to genesis information if necessary.)例如unsafeHead。这些数据会被后续的例如区块传播等步骤使用

这样一个完整sequencer模式下的区块产生的流程将结束了。 下一章讲继续介绍在产生完区块后,sequencer是如何把这个区块传播给其他例如verifier的节点的。

转载自:https://github.com/joohhnnn/Understanding-Optimism-Codebase-CN