您正在查看: Layer2 分类下的文章

一文了解 TON 的技术特点与智能合约开发范式

简而言之,TON 的核心设计理念是以一种“自下而上”的方式重构传统的区块链协议,并以舍弃互操作性为代价,实现对高并发和高可扩展性的极致追求。

TON 的核心设计思想——高并发与高可扩展性

可以这么说,TON 中所有复杂的技术选型的目的都来自于对高并发与高可扩展性的追求,当然从其诞生的背景我们也不难理解这一点。TON,即 The Open Network,是一个去中心化的计算网络,包含一个 L1 区块链和多个组件。TON 最初由 Telegram 的创始人 Nikolai Durov 及其团队共同开发,而发展到现在则由全球独立贡献者的社区支持并维护。其诞生要追溯到 2017 年,Telegram 团队开始为自己探索区块链解决方案。由于当时没有现有的 L1 区块链能够支持 Telegram 的九位数用户基础,他们决定设计自己的区块链,当时称为 Telegram Open Network。时间来到了 2018 年,为了获得实现 TON 所需的资源,Telegram 在 2018 年第一季度发起了 Gram 代币(后来改名为 Toncoin)的销售。2020 年由于监管问题,Telegram 团队退出了 TON 项目。随后,一小部分开源开发者和 Telegram 比赛获胜者接手了 TON 的代码库,将项目名称更名为 The Open Network,并继续积极地开发区块链至今,且遵循原始 TON 白皮书中概述的原则。

那么既然是以作为 Telegram 的去中心化执行环境作为设计目标,自然要面对两个问题,高并发请求与海量数据,我们知道随着技术发展到现在,号称 TPS 最高的 Solana 实测最高 TPS 也只有 65000 ,这显然不足以支撑百万级 TPS 要求的 Telegram 生态。与此同时随着 Telegram 的大规模应用,其产生的数据量早已突破天际,而区块链作为一个极度冗余的分布式系统,若要求网络中每个节点都保存一份完整的数据,这也是不现实的。

因此为了解决上述两个问题,TON 对主流的区块链协议做出了两个方面的优化:

  • 通过采用“无限分片范式”(Infinite Sharding Paradigm)设计系统,解决数据冗余问题,使其可以承载大数据,同时缓解性能瓶颈问题;
  • 通过引入基于 Actor 模型的完全并行执行环境,极大的提升网络 TPS;

做区块链的链——通过无限分片能力让每个账户都有一条专属的账户链

当下我们知道,分片(sharding)已经成为了大部分区块链协议提升性能降低成本的主流方案,而 TON 则将这点做到了极致,并提出了无限分片范式,所谓无限分片范式,指的是允许区块链根据网络负载动态地增加或减少分片数量。这种范式使得 TON 能够在保持高性能的同时,处理大规模的交易和智能合约操作,理论上 TON 可以为每个账户都建立一条专属的账户链,并通过一定的规则保证这些链之间的一致性,

抽象的来理解,在 TON 中一共存在四层链结构:

  • 账户链(AccountChain):该层链表示与某个账户相关的一系列交易所组成的链,之所以交易可以组成链式结构,是因为对于一个状态机来说,只要执行规则一致,状态机在接收到相同顺序的指令后得到的结果是一致的,因此所有区块链分布式系统中都需要对交易进行链式排序,TON 也不例外。账户链是 TON 网络中最基本的组成单元,通常情况下账户链是一个虚拟的概念,不太可能真正存在一个独立的账户链。

  • 分片链(ShardChain):在大部分的语境下,分片链才是 TON 中实际的组成单元,所谓分片链,即为一组账户链的集合。

  • 工作链(WorkChain):也可以叫做一组有自定义规则的分片链,例如创建一个基于 EVM 的工作链,在其上运行 Solidity 智能合约。理论上,社区中的每个人都可以创建自己的工作链。事实上,构建它是一个相当复杂的任务,在此之前还要支付创建它的(昂贵)费用,并获得验证者的 2/3 的票数来批准创建你的工作链。

  • 主链(MasterChain):最后在 TON 中有一条特殊的链被称为主链,该链负责为所有分片链带来最终性。一旦分片链的区块的哈希值被合并到主链的区块中,该分片链区块及其所有父区块被认为具有最终性,这意味着它们可以被认为是固定且不可变的内容,而被所有分片链的后续区块引用。

通过采用这样的范式,使 TON 网络具备以下三个特点:

  • 动态分片: TON 可以自动拆分和合并分片链以适应负载的变化。这意味着新块总是快速生成,而交易不会产生很长的等待时间。

  • 高度可扩展: 通过无限分片范式,TON 能够支持几乎无限数量的分片,理论上可以达到 2 的 60 次方个工作链。

  • 自适应性: 当网络中的某个部分负载增加时,该部分可以被细分成更多的分片来处理增加的交易量。相反,当负载减少时,分片可以合并以提高效率。

那么这样一个多链系统,首先需要面临的就是跨链通信问题,尤其是由于具有无限分片的能力,当网络中的分片数量达到一定量级后,链与链之间的信息路由将成为一件困难的事情。试想一下网络中共有 4 个节点,每个节点负责维护 1 条独立的工作链,其中链接关系表示该节点除了负责自身的工作链中交易排序工作之外,还需要监听并处理目标链中状态变化,在 TON 中具体通过监听输出队列的消息实现,

假设工作链 1 中的账户 A 希望向工作链 3 中的账户 C 发送一个消息。则需要设计到消息路由问题,在这个例子中有两条路由路径,工作链 1 -> 工作链 2-> 工作链 3 ,工作链 1 -> 工作链 4 -> 工作链 3 。

当面临更复杂的情况时,就需要一个高效且低成本的路由算法快速完成消息通信,TON 选择了所谓“超立方体路由算法”来实现跨链消息通信路由发现。所谓超立方体结构指的是一种特殊的网络拓扑结构,一个 n 维超立方体是由 2 ^n 个顶点组成的,每个顶点都可以通过一个 n 位的二进制数来唯一标识。在这个结构中,任意两个顶点如果在二进制表示中只有一位不同,那么它们就是相邻的。例如,在一个 3 维超立方体中,顶点 000 和顶点 001 是相邻的,因为它们只在最后一位上不同。而上述例子即是一个 2 维超立方体。

在超立方体路由协议中,消息将从源工作链到目标工作链的路由过程是通过比较源工作链和目标工作链地址的二进制表示来进行的。路由算法会找到这两个地址之间的最小距离(即二进制表示中不同位的数量),并通过相邻工作链逐步转发信息,直到达到目标工作链。这种方法能够确保数据包沿着最短路径传输,从而提高了网络的通信效率。

当然为了简化这个过程,TON 也提出了一个乐观技术方案,当用户可以提供对某个路由路径的有效证明,这通常是某个 merkle trie root,节点即可直接承认该用户提交的消息的可信性,这也被称为即时超立方体路由。

因此我们可以看到 TON 中的地址和其他区块链协议有着明显的区别,其他主流区块链协议大都采用椭圆加密算法生成的公私钥中公钥对应的哈希作为地址,因为地址只是做唯一性区分,而不需要承载路由寻址的功能,而 TON 中的地址有两部分组成,(workchain_id, account_id),其中 workchain_id 即按照超立方体路由算法地址进行编码,在这里就不详细展开了。

还有一个容易产生疑问的点,你可能已经发觉到主链和每个工作链均有链接关系,那么所有跨链信息均通过主链做中继不就可以了么,就像是 cosmos 那样。在 TON 的设计理念中,主链仅用于处理最关键的任务,即维护众多工作链的最终性,将消息通过主链做路由也不是不行,只是由此产生的手续费用将十分昂贵。

最后简单提一下其共识算法,TON 采用了 BFT+PoS 的方式,即任意 staker 均有机会参与区块打包,TON 的选举治理合约会每隔一段时间,从所有 Stakers 中随机选择一个打包的验证者集群,被选中称为验证者的节点将通过 BFT 算法打包出块,若打包错误信息或作恶,其 stake 的 token 将会被罚没,反之将得到出块奖励。这基本上已经是一个比较常见的选择了,因此不在这里展开介绍。

基于 Actor 模型的智能合约和完全并行执行环境

TON 中另一个与主流区块链协议不同的点是其智能合约执行环境。为了突破主流区块链协议 TPS 的限制,TON 采用了自下而上的设计思路,采用 Actor 模型重构了智能合约及其执行方式,使其具备了完全并行执行的能力。

我们知道主流的区块链协议大都采用的是单线程串行的执行环境,以 Ethereum 为例,其执行环境 EVM 是一个以交易作为输入的状态机,当出块节点通过打包区块完成对交易的排序后,将以该顺序通过 EVM 执行交易,整个过程是完全串行并单线程的,即某个时刻只能有一笔被执行,这样做的好处是只要确认了交易顺序,执行的结果在广泛的分布式集群中就具有一致性,与此同时由于同时只有一笔交易被串行执行,这就意味着在执行过程中,不可能存在其他交易对某待访问状态数据进行修改,这样就实现了智能合约之间的互操作性。例如我们通过 Uniswap 使用 USDT 购买 ETH,当该交易被执行时,该交易对中 LP 的分布情况即为一个确定值,这样就可以通过某些数学模型得出对应的结果,但假设情况不是这样的,在执行某 bonding curve 的计算时,有其他 LP 添加了新的流动性,那么计算结果将会是一个过时的结果,这显然是不可接受的。

但是这种架构也有明显的局限性,那就是 TPS 的瓶颈,而这个瓶颈在当前多核处理器下显得很老旧,就像你用一个最新的 PC 去玩一些老的电脑游戏,比如红警,当作战单位多到一定数量后,依然会发现卡的不行,这就是软件架构的问题。

你可能会听到一些协议已经在关注这个问题,并提出了自己的并行方案,以当前号称 TPS 最高的 Solana 为例,也具备并行执行的能力。只不过其设计思路与 TON 不同,在 Solana 中,其核心思想是将所有交易按照执行依赖关系分为几组,不同组之间不共享任何状态数据。即不存在相同的依赖,这样不同组内的交易就可以并行执行而不用担心出现冲突的情况,而对于同组内的交易,则还是沿用传统的串行方式执行。

而在 TON 中,其完全舍弃了串行执行的架构,转而采用了一个专为并行而生的开发范式,Actor 模型来重构执行环境。所谓 Actor 模型是由 Carl Hewitt 在 1973 年首次提出,目的是通过消息传递来解决传统并发程序中共享状态的复杂性问题。每个 Actor 都有自己的私有状态和行为,且与其他 Actor 之间不共享任何状态信息。Actor 模型是一种并发计算的计算模型,它通过消息传递来实现并行计算。在这个模型中,"Actor"是基本的工作单元,它能够处理接收的消息、创建新的 Actor、发送更多消息、决定如何响应接下来的消息。Actor 模型需要具备以下几个特性:

  • 封装和独立性:每个 Actor 在处理消息时都是完全独立的,可以并行处理消息而不会互相干扰。

  • 消息传递:Actor 之间仅通过发送和接收消息进行交互,消息传递是异步的。

  • 动态结构:Actor 可以在运行时创建更多的 Actor,这种动态性使得 Actor 模型能够根据需要扩展系统。

TON 采用了这个架构,来设计智能合约模型,这就意味着在 TON 中,每个智能合约都是一个 Actor 模型,其具备完全独立的存储空间。因为不依赖任何外部数据。除此之外,对同一个智能合约的调用还是按照接收队列中消息的排序进行执行,因此 TON 中的交易将可以被高效的并行执行,而不需要担心冲突问题。

然而这样的设计方案也带来了一些全新的影响,对于 DApp 开发者来说,其习惯的开发范式将被打破,具体如下:

  1. 智能合约之间的异步调用
    在 TON 的智能合约内部是无法原子性的调用外部合约或访问外部合约数据的,我们知道在 Solidity 中,合约 A 的 function 1 中调用合约 B 的 function 2 ,或者通过合约 C 的只读 functio n3 访问某状态数据,整个过程是原子性的,在一笔交易中被执行,这是一件非常容易的事情,然而在 TON 中,这将不可能实现,任何与外部智能合约的交互都将通过打包新的交易异步执行,这种由智能合约发起的交易也被称为内部消息。且执行过程中无法阻塞以获得执行结果。

例如我们开发一个 DEX,如果采用 EVM 中常见的范式,通常会有一个统一的 router 合约用于管理交易路由,而每个 Pool 都单独管理某个交易对相关的 LP 数据,那么假设当前有两个池子 USDT-DAI 和 DAI-ETH。当用户希望通过 USDT 直接购买 ETH,就可以通过 router 合约在一笔交易中顺序请求这两个池子,完成原子性交易。然而在 TON 中就没有这么容易实现了,需要思考新的开发范式,若仍然复用该该范式的话,那信息流可能是这样的,这个请求将伴随一个由用户发起的 external message 和三个 internal messages 完成(注意这是用于说明差异性的,真实的开发中甚至连 ERC 20 的范式也要重新设计)。

  1. 需要仔细考虑跨合约调用时出现执行错误情况的处理流程,为每个合约间调用设计相应的弹回(bounce)函数。
    我们知道在主流的 EVM 中,当交易执行时遇到问题时,整个交易将会被回滚,即被重置到执行最初时的状态。这在串行单线程模型中是容易理解的。然而在 TON 中,由于合约间调用采用了异步的方式执行,即使后续某环节出错,由于前面已经被成功执行的交易已经被执行并确认,这就有可能造成问题。因此 TON 中设置了一种特殊的消息类型,叫做弹回消息,即当某内部消息触发的后续执行过程出现错误时,被触发合约可以通过触发合约预留的弹回函数将触发合约中的某些状态重置。

  2. 在某些复杂情况下,先被接收的交易不一定先被执行完毕,因此不可以预设这种时序关系。
    在这样一个异步和并行智能合约调用的系统中,定义处理操作顺序可能很难。这就是为什么 TON 中的每个消息都有它的逻辑时间 Lamport time(后面简称 lt)。它用于理解哪个事件引发了另一个以及验证者首先需要处理什么。对于一个简单的模型,先被接收的交易一定先被执行完成。


在这个模型中,A 和 B 分别表示两个智能合约,则有如果 msg 1 _lt < msg 2 _lt,则 tx 1 _lt < tx 2 _lt 的时序关系。

然而在较为复杂的情况下,这个规则就会被打破。在官方文档中有这样的例子,假设我们有三个合约 A、B 和 C。在一笔交易中,A 发送两个内部消息 msg 1 和 msg 2 :一个给 B,另一个给 C。尽管它们是按确切顺序创建的(先 msg 1 ,然后是 msg 2),但我们无法确定 msg 1 将在 msg 2 之前被处理。这是因为从 A 到 B 和从 A 到 C 的路由可能在长度和验证者集中有所不同。如果这些合约位于不同的分片链中,其中一条消息可能需要几个区块才能到达目标合约。即我们有两种可能的交易路径,如图所示。

  1. 在 TON 中,其智能合约的持久化存储采用了一个以 Cell 为单元的有向无环图作为数据结构,
    数据将按照编码规则紧凑的压缩为一个 Cell,同时按照有向无环图的方式向下延伸,这与 EVM 中状态数据基于 hashmap 的结构组织不同,由于数据请求算法的不同,TON 中为不同深度的数据处理设置了不同的 Gas 价格,越深的 Cell 数据处理所需要的 Gas 越高,因此在 TON 中存在一种 DOS 攻击的范式,即某些恶意用户通过发送大量垃圾消息占用某个智能合约中所有的浅层 Cell,这就意味着诚实用户的存储成本将越来越高。而在 EVM 中,由于 hashmap 的查询复杂度为 o( 1),因此有着相同的 Gas,不会有类似问题。所以 TON Dapp 开发者应该尽量避免智能合约中出现无界数据类型。当出现无界数据类型时,应通过分片的方式将其打散。

  2. 还有一些特征则不那么特殊了,例如智能合约需要为存储支付租金,在 TON 中智能合约天然是可升级的,以及原生的抽象账户功能,即在 TON 中所有钱包地址均为智能合约,只是未被初始化等,这些需要开发者小心留意。

转载:https://www.chaincatcher.com/article/2128150

其它:https://github.com/UnsignedInt8/TON/tree/master/TON

Arbitrum Sequencer配置参数分析

参数 类型 默认值
enable bool false 这个参数决定了 Sequencer 是否处于激活状态,从而能够处理交易和生成区块
max-block-speed time 250 Millisecond 控制 Sequencer 生成区块的最大速度。这有助于防止系统过载、优化性能、管理资源,并减少交易处理的延迟。
max-revert-gas-reject uint64 0 控制在处理交易时的 gas 使用限制。这个参数的主要作用是防止过度消耗 gas 的情况,特别是在交易失败或回滚的情况下
max-acceptable-timestamp-delta time 1 Hour 控制 Sequencer 处理区块和交易时的时间戳容忍度
sender-whitelist string 用于控制哪些地址可以向 Sequencer 发送交易
forwarder connection-timeout time 30 Second 用于设置 Forwarder 连接到其他节点(包括 Sequencer 或 Layer 1 节点)的超时时间
idle-connection-timeout time 60 Second 设置 Forwarder 在连接保持空闲状态时的超时时间。这一配置参数控制了连接在没有活动数据传输时可以保持的最大时间长度
max-idle-connections int 100 Forwarder 能够保持的最大空闲连接数。这个参数控制了 Forwarder 在没有活跃数据传输的情况下允许保留的空闲连接的数量
redis-url string “” 用于配置 Forwarder 与 Redis 数据库的连接
update-interval time 1 Second 用于设置 Forwarder 更新其内部状态或数据的时间间隔
retry-interval time 100 Millisecond 设置 Forwarder 在遇到失败或错误时进行重试的时间间隔
queue-size int 1024 用于设置 Sequencer 接收和处理交易的队列大小。这个参数定义了 Sequencer 在内存中能够存储的交易队列的最大数量
queue-timeout time 12 Second 用于设置交易在队列中等待处理的最大时间。这个参数定义了交易在被 Sequencer 处理之前可以在队列中停留的最长时间
nonce-cache-size int 1024 用于设置 Sequencer 缓存的 nonce 值的数量
max-tx-data-size int 95000 用于控制 Sequencer 处理的单笔交易数据的最大大小。优化资源使用、提高系统稳定性、保护系统性能,并防止系统资源滥用
nonce-failure-cache-size int 1024 用于设置 Sequencer 缓存因 nonce 问题而失败的交易数量的最大值
nonce-failure-cache-expiry time 1 Second 用于设置缓存因 nonce 问题而失败的交易的过期时间
expected-surplus-soft-threshold string default 指定了Sequencer在处理交易时的软阈值,表示在网络中预期的剩余交易量的一个阈值,如果交易池中的交易数量低于这个阈值,Sequencer可以更自由地处理和确认交易;而当交易池中的交易量高于这个阈值时,Sequencer可能会采取一些措施来限制处理交易的速度,避免网络拥堵
expected-surplus-hard-threshold string default 指定了Sequencer在交易池中的硬性阈值。当交易池中的交易数量超过这个阈值时,Sequencer会采取更严格的措施来控制交易处理速度,以防止网络过载。这与expected-surplus-soft-threshold不同,后者是一个软性阈值,通常用于更灵活的流量管理,而硬性阈值则是更严格的限制。
enable-profiling bool false enable-profiling设置为true时,Sequencer会收集和记录关于其操作的性能数据。这些数据包括处理交易的时间、资源使用情况、网络延迟等信息。启用性能分析可以帮助开发者和运维团队了解Sequencer的运行状况、识别性能瓶颈,并优化系统以提高效率和稳定性。启用性能分析通常在调试和优化阶段是非常有用的,但在生产环境中,可能会因为性能开销而选择禁用它,除非需要深入的性能数据。

性能提升

  1. Forwarder 可以利用 Redis 的高性能缓存和数据存储功能,优化数据访问速度、支持异步处理和队列管理,从而提高系统的整体性能和效率
  2. queue-size 是 Arbitrum Nitro 中 SequencerConfig 的一个关键参数,用于控制 Sequencer 的交易队列大小。合理配置 queue-size 可以帮助优化交易处理能力、管理内存使用、应对高峰负载,并减少交易丢失的风险
  3. 合理配置 queue-timeout 可以优化交易处理效率、管理队列负载、避免交易过期,并提高系统的整体性能和响应速度。
  4. nonce-cache-size 是 Arbitrum Nitro 中 SequencerConfig 的一个重要参数,用于控制 Sequencer 缓存的 nonce 值的数量。合理配置此参数可以优化交易处理、提高系统性能、管理内存使用,并支持高吞吐量环境中的稳定运行
  5. 设置合理的 nonce-failure-cache-size 可以提高交易处理的效率,减少因 nonce 问题导致的交易重复处理时间

Arbitrum Sequencer 交易接收->区块打包逻辑

代码跟进

go-ethereum/internal/ethapi/api.go

func (s *TransactionAPI) SendTransaction(ctx context.Context, args TransactionArgs) (common.Hash, error) {
// SubmitTransaction is a helper function that submits tx to txPool and logs a message.
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
    // If the transaction fee cap is already specified, ensure the
    // fee of the given transaction is _reasonable_.
    if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
       return common.Hash{}, err
    }
    if !b.UnprotectedAllowed() && !tx.Protected() {
       // Ensure only eip155 signed transactions are submitted if EIP155Required is set.
       return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
    }
    if err := b.SendTx(ctx, tx); err != nil {
       return common.Hash{}, err
    }
    // Print a log with full tx details for manual investigations and interventions
    head := b.CurrentBlock()
    signer := types.MakeSigner(b.ChainConfig(), head.Number, head.Time)
    from, err := types.Sender(signer, tx)
    if err != nil {
       return common.Hash{}, err
    }

    if tx.To() == nil {
       addr := crypto.CreateAddress(from, tx.Nonce())
       log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
    } else {
       log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
    }
    return tx.Hash(), nil
}
// Transaction pool API
func (a *APIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
    return a.b.EnqueueL2Message(ctx, signedTx, nil)
}

go-ethereum/arbitrum/backend.go

func (b *Backend) EnqueueL2Message(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    return b.arb.PublishTransaction(ctx, tx, options)
}

execution/gethexec/arb_interface.go

func (a *ArbInterface) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    return a.txPublisher.PublishTransaction(ctx, tx, options)
}

execution/gethexec/tx_pre_checker.go

func (c *TxPreChecker) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    block := c.bc.CurrentBlock()
    statedb, err := c.bc.StateAt(block.Root)
    if err != nil {
       return err
    }
    arbos, err := arbosState.OpenSystemArbosState(statedb, nil, true)
    if err != nil {
       return err
    }
    err = PreCheckTx(c.bc, c.bc.Config(), block, statedb, arbos, tx, options, c.config())
    if err != nil {
       return err
    }
    return c.TransactionPublisher.PublishTransaction(ctx, tx, options)
}

execution/gethexec/sequencer.go

func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    config := s.config()
    // Only try to acquire Rlock and check for hard threshold if l1reader is not nil
    // And hard threshold was enabled, this prevents spamming of read locks when not needed
    if s.l1Reader != nil && config.ExpectedSurplusHardThreshold != "default" {
       s.expectedSurplusMutex.RLock()
       if s.expectedSurplusUpdated && s.expectedSurplus < int64(config.expectedSurplusHardThreshold) {
          return errors.New("currently not accepting transactions due to expected surplus being below threshold")
       }
       s.expectedSurplusMutex.RUnlock()
    }

    sequencerBacklogGauge.Inc(1)
    defer sequencerBacklogGauge.Dec(1)

    _, forwarder := s.GetPauseAndForwarder()
    if forwarder != nil {
       err := forwarder.PublishTransaction(parentCtx, tx, options)
       if !errors.Is(err, ErrNoSequencer) {
          return err
       }
    }

    if len(s.senderWhitelist) > 0 {
       signer := types.LatestSigner(s.execEngine.bc.Config())
       sender, err := types.Sender(signer, tx)
       if err != nil {
          return err
       }
       _, authorized := s.senderWhitelist[sender]
       if !authorized {
          return errors.New("transaction sender is not on the whitelist")
       }
    }
    if tx.Type() >= types.ArbitrumDepositTxType || tx.Type() == types.BlobTxType {
       // Should be unreachable for Arbitrum types due to UnmarshalBinary not accepting Arbitrum internal txs
       // and we want to disallow BlobTxType since Arbitrum doesn't support EIP-4844 txs yet.
       return types.ErrTxTypeNotSupported
    }

    txBytes, err := tx.MarshalBinary()
    if err != nil {
       return err
    }

    queueTimeout := config.QueueTimeout
    queueCtx, cancelFunc := ctxWithTimeout(parentCtx, queueTimeout)
    defer cancelFunc()

    // Just to be safe, make sure we don't run over twice the queue timeout
    abortCtx, cancel := ctxWithTimeout(parentCtx, queueTimeout*2)
    defer cancel()

    resultChan := make(chan error, 1)
    queueItem := txQueueItem{
       tx,
       len(txBytes),
       options,
       resultChan,
       &atomic.Bool{},
       queueCtx,
       time.Now(),
    }
    select {
    case s.txQueue <- queueItem: // 交易进入交易队列
    case <-queueCtx.Done():
       return queueCtx.Err()
    }

    select {
    case res := <-resultChan:
       return res
    case <-abortCtx.Done():
       // We use abortCtx here and not queueCtx, because the QueueTimeout only applies to the background queue.
       // We want to give the background queue as much time as possible to make a response.
       err := abortCtx.Err()
       if parentCtx.Err() == nil {
          // If we've hit the abort deadline (as opposed to parentCtx being canceled), something went wrong.
          log.Warn("Transaction sequencing hit abort deadline", "err", err, "submittedAt", queueItem.firstAppearance, "queueTimeout", queueTimeout, "txHash", tx.Hash())
       }
       return err
    }
}
func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
    var queueItems []txQueueItem
    var totalBlockSize int

    defer func() {
       panicErr := recover()
       if panicErr != nil {
          log.Error("sequencer block creation panicked", "panic", panicErr, "backtrace", string(debug.Stack()))
          // Return an internal error to any queue items we were trying to process
          for _, item := range queueItems {
             // This can race, but that's alright, worst case is a log line in returnResult
             if !item.returnedResult.Load() {
                item.returnResult(sequencerInternalError)
             }
          }
          // Wait for the MaxBlockSpeed until attempting to create a block again
          returnValue = true
       }
    }()
    defer nonceFailureCacheSizeGauge.Update(int64(s.nonceFailures.Len()))

    config := s.config()

    // Clear out old nonceFailures
    s.nonceFailures.Resize(config.NonceFailureCacheSize)
    nextNonceExpiryTimer := s.expireNonceFailures()
    defer func() {
       // We wrap this in a closure as to not cache the current value of nextNonceExpiryTimer
       if nextNonceExpiryTimer != nil {
          nextNonceExpiryTimer.Stop()
       }
    }()

    for {
       var queueItem txQueueItem
       if s.txRetryQueue.Len() > 0 { // 优先把队列需要重试的交易拿出来
          queueItem = s.txRetryQueue.Pop()
       } else if len(queueItems) == 0 { // 当交易池
          var nextNonceExpiryChan <-chan time.Time
          if nextNonceExpiryTimer != nil {
             nextNonceExpiryChan = nextNonceExpiryTimer.C
          }
          select {
          case queueItem = <-s.txQueue:
          case <-nextNonceExpiryChan:
             // No need to stop the previous timer since it already elapsed
             nextNonceExpiryTimer = s.expireNonceFailures()
             continue
          case <-s.onForwarderSet:
             // Make sure this notification isn't outdated
             _, forwarder := s.GetPauseAndForwarder()
             if forwarder != nil {
                s.nonceFailures.Clear()
             }
             continue
          case <-ctx.Done():
             return false
          }
       } else {
          done := false
          select {
          case queueItem = <-s.txQueue:
          default:
             done = true
          }
          if done {
             break
          }
       }
       err := queueItem.ctx.Err()
       if err != nil {
          queueItem.returnResult(err)
          continue
       }
       if queueItem.txSize > config.MaxTxDataSize { // 超过设置限制的将被丢弃
          // This tx is too large
          queueItem.returnResult(txpool.ErrOversizedData)
          continue
       }
       if totalBlockSize+queueItem.txSize > config.MaxTxDataSize {
          // 此交易太大,无法添加到此批次
          s.txRetryQueue.Push(queueItem)
          // 在这里结束批处理,将此交易放入下一个交易中
          break
       }
       totalBlockSize += queueItem.txSize
       queueItems = append(queueItems, queueItem) // 交易加入当前批次队列
    }

    s.nonceCache.Resize(config.NonceCacheSize) // Would probably be better in a config hook but this is basically free
    s.nonceCache.BeginNewBlock()
    queueItems = s.precheckNonces(queueItems, totalBlockSize)
    txes := make([]*types.Transaction, len(queueItems))
    hooks := s.makeSequencingHooks()
    hooks.ConditionalOptionsForTx = make([]*arbitrum_types.ConditionalOptions, len(queueItems))
    totalBlockSize = 0 // 重新计算总块大小以进行二次检查
    for i, queueItem := range queueItems {
       txes[i] = queueItem.tx
       totalBlockSize = arbmath.SaturatingAdd(totalBlockSize, queueItem.txSize)
       hooks.ConditionalOptionsForTx[i] = queueItem.options
    }

    if totalBlockSize > config.MaxTxDataSize {// 如果超过,则当前批次整体进入下一轮重新计算
       for _, queueItem := range queueItems {
          s.txRetryQueue.Push(queueItem)
       }
       log.Error(
          "put too many transactions in a block",
          "numTxes", len(queueItems),
          "totalBlockSize", totalBlockSize,
          "maxTxDataSize", config.MaxTxDataSize,
       )
       return false
    }

    if s.handleInactive(ctx, queueItems) {
       return false
    }

    timestamp := time.Now().Unix()
    s.L1BlockAndTimeMutex.Lock()
    l1Block := s.l1BlockNumber
    l1Timestamp := s.l1Timestamp
    s.L1BlockAndTimeMutex.Unlock()

    if s.l1Reader != nil && (l1Block == 0 || math.Abs(float64(l1Timestamp)-float64(timestamp)) > config.MaxAcceptableTimestampDelta.Seconds()) {
       for _, queueItem := range queueItems {
          s.txRetryQueue.Push(queueItem)
       }
       log.Error(
          "cannot sequence: unknown L1 block or L1 timestamp too far from local clock time",
          "l1Block", l1Block,
          "l1Timestamp", time.Unix(int64(l1Timestamp), 0),
          "localTimestamp", time.Unix(int64(timestamp), 0),
       )
       return true
    }

    header := &arbostypes.L1IncomingMessageHeader{
       Kind:        arbostypes.L1MessageType_L2Message,
       Poster:      l1pricing.BatchPosterAddress,
       BlockNumber: l1Block,
       Timestamp:   uint64(timestamp),
       RequestId:   nil,
       L1BaseFee:   nil,
    }

    start := time.Now()
    var (
       block *types.Block
       err   error
    )
    if config.EnableProfiling {// 当enable-profiling设置为true时,Sequencer会收集和记录关于其操作的性能数据
       block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks)
    } else {
       block, err = s.execEngine.SequenceTransactions(header, txes, hooks) // 生产环境
    }
    elapsed := time.Since(start)
    blockCreationTimer.Update(elapsed)
    if elapsed >= time.Second*5 {
       var blockNum *big.Int
       if block != nil {
          blockNum = block.Number()
       }
       log.Warn("took over 5 seconds to sequence a block", "elapsed", elapsed, "numTxes", len(txes), "success", block != nil, "l2Block", blockNum)
    }
    if err == nil && len(hooks.TxErrors) != len(txes) {
       err = fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
    }
    if errors.Is(err, execution.ErrRetrySequencer) {
       log.Warn("error sequencing transactions", "err", err)
       // we changed roles
       // forward if we have where to
       if s.handleInactive(ctx, queueItems) {
          return false
       }
       // try to add back to queue otherwise
       for _, item := range queueItems {
          s.txRetryQueue.Push(item)
       }
       return false
    }
    if err != nil {
       if errors.Is(err, context.Canceled) {
          // thread closed. We'll later try to forward these messages.
          for _, item := range queueItems {
             s.txRetryQueue.Push(item)
          }
          return true // don't return failure to avoid retrying immediately
       }
       log.Error("error sequencing transactions", "err", err)
       for _, queueItem := range queueItems {
          queueItem.returnResult(err)
       }
       return false
    }

    if block != nil {
       successfulBlocksCounter.Inc(1)
       s.nonceCache.Finalize(block)
    }

    madeBlock := false
    for i, err := range hooks.TxErrors {
       if err == nil {
          madeBlock = true
       }
       queueItem := queueItems[i]
       if errors.Is(err, core.ErrGasLimitReached) {
          // 该区块中剩余的 Gas 不足以完成此项交易。
          if madeBlock {
             // 该块中已经有一个较早的交易;请在新的块中重试。
             s.txRetryQueue.Push(queueItem)
             continue
          }
       }
       if errors.Is(err, core.ErrIntrinsicGas) {
          // 删除附加信息,因为由于 L1 数据气体正确。
          err = core.ErrIntrinsicGas
       }
       var nonceError NonceError
       if errors.As(err, &nonceError) && nonceError.txNonce > nonceError.stateNonce {
          s.nonceFailures.Add(nonceError, queueItem)
          continue
       }
       queueItem.returnResult(err)
    }
    return madeBlock
}

execution/gethexec/executionengine.go

func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
    return s.sequencerWrapper(func() (*types.Block, error) {
       hooks.TxErrors = nil
       return s.sequenceTransactionsWithBlockMutex(header, txes, hooks)
    })
}
func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
    lastBlockHeader, err := s.getCurrentHeader()
    if err != nil {
       return nil, err
    }

    statedb, err := s.bc.StateAt(lastBlockHeader.Root)
    if err != nil {
       return nil, err
    }

    delayedMessagesRead := lastBlockHeader.Nonce.Uint64()

    startTime := time.Now()
    block, receipts, err := arbos.ProduceBlockAdvanced(
       header,
       txes,
       delayedMessagesRead,
       lastBlockHeader,
       statedb,
       s.bc,
       s.bc.Config(),
       hooks,
       false,
    )
    if err != nil {
       return nil, err
    }
    blockCalcTime := time.Since(startTime)
    if len(hooks.TxErrors) != len(txes) {
       return nil, fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
    }

    if len(receipts) == 0 {
       return nil, nil
    }

    allTxsErrored := true
    for _, err := range hooks.TxErrors {
       if err == nil {
          allTxsErrored = false
          break
       }
    }
    if allTxsErrored {
       return nil, nil
    }

    msg, err := MessageFromTxes(header, txes, hooks.TxErrors)
    if err != nil {
       return nil, err
    }

    pos, err := s.BlockNumberToMessageIndex(lastBlockHeader.Number.Uint64() + 1)
    if err != nil {
       return nil, err
    }

    msgWithMeta := arbostypes.MessageWithMetadata{
       Message:             msg,
       DelayedMessagesRead: delayedMessagesRead,
    }
    msgResult, err := s.resultFromHeader(block.Header())
    if err != nil {
       return nil, err
    }

    err = s.consensus.WriteMessageFromSequencer(pos, msgWithMeta, *msgResult)
    if err != nil {
       return nil, err
    }

    // 仅在我们写入消息后才写入块,因此如果节点在此过程中死亡,
    // 它将通过重新生成丢失的块在启动时自然恢复。
    err = s.appendBlock(block, statedb, receipts, blockCalcTime) 
    if err != nil {
       return nil, err
    }
    s.cacheL1PriceDataOfMsg(pos, receipts, block, false)

    return block, nil
}

交易执行顺序

Arbitrum 的交易处理方式与传统的以太坊链有所不同。Arbitrum 采用了“先到先得”的交易处理顺序,因此在其体系结构中并没有传统意义上的内存池(mempool)。

由于交易是按照序列器接收的顺序进行处理的,因此 Arbitrum 交易不需要优先费;如果交易确实包含优先费,则该费用将在执行结束时退还到交易的原始地址。

注:因为Arbitrum 需要更加有效的交易传播机制,所以生产部署配置和网络,要确保所有外部节点与主节点,尽量保证最短路径,之间的网络要稳定并且延迟尽量低,保证主节点尽快收到并处理交易。

交易价格

Arbitrum 上没有内存池的概念,交易由 Sequencer 按照先到先得的原则处理。因此,gas 价格竞标参数不会影响交易的处理顺序。

因为Arbitrum链上 gasprice 无法影响执行优先级,所以用户没有主动提价的动机,

交易收取的总费用是 L2 基础费用乘以所用 L2 gas 加上 L1 调用数据费用之和

Arbitrum ForwardingTarget 配置参数分析

介绍

对于ForwardingTarget有两个相关参数

参数 类型 介绍
forwarding-target string 交易转发目标 URL,或“null”以禁用转发(当且仅当不是序列器)
secondary-forwarding-target []string 次要交易转发目标 URL

参数验证规则

func (c *Config) Validate() error {
    if err := c.Sequencer.Validate(); err != nil {
       return err
    }
    if !c.Sequencer.Enable && c.ForwardingTarget == "" {
       return errors.New("ForwardingTarget not set and not sequencer (can use \"null\")")
    }
    if c.ForwardingTarget == "null" {
       c.forwardingTarget = ""
    } else {
       c.forwardingTarget = c.ForwardingTarget
    }
    if c.forwardingTarget != "" && c.Sequencer.Enable {
       return errors.New("ForwardingTarget set and sequencer enabled")
    }
    return nil
}

使用场景

  1. Sequencer.Enable == true时,forwardingTarget 必须为空,即不转发交易
  2. Sequencer.Enable != true 时,ForwardingTarget 可以设置为某个接收转发的RPC, 或者设置为null 即不转发交易只查询,可用于 ReadOnly节点

逻辑分析

func CreateExecutionNode(
    ctx context.Context,
    stack *node.Node,
    chainDB ethdb.Database,
    l2BlockChain *core.BlockChain,
    l1client arbutil.L1Interface,
    configFetcher ConfigFetcher,
) (*ExecutionNode, error) {
    ...
    if config.Sequencer.Enable {
        seqConfigFetcher := func() *SequencerConfig { return &configFetcher().Sequencer }
        sequencer, err = NewSequencer(execEngine, parentChainReader, seqConfigFetcher)
        if err != nil {
           return nil, err
        }
        txPublisher = sequencer
    } else {
        if config.Forwarder.RedisUrl != "" {
           txPublisher = NewRedisTxForwarder(config.forwardingTarget, &config.Forwarder)
        } else if config.forwardingTarget == "" {
           txPublisher = NewTxDropper()
        } else {
           targets := append([]string{config.forwardingTarget}, config.SecondaryForwardingTarget...)
           txPublisher = NewForwarder(targets, &config.Forwarder)
        }
    }
    ...
}

Sequencer.Enable == false时

  1. Forwarder.RedisUrl不为空,则使用NewRedisTxForwarder,并仅使用forwardingTarget
  2. 当config.forwardingTarget为空时,即不转发交易,使用NewTxDropper
  3. Else, 同时使用forwardingTarget,SecondaryForwardingTarget
    1. 两者
func (f *TxForwarder) PublishTransaction(inctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    if !f.enabled.Load() {
       return ErrNoSequencer
    }
    ctx, cancelFunc := f.ctxWithTimeout()
    defer cancelFunc()
    for pos, rpcClient := range f.rpcClients {
       var err error
       if options == nil {
          err = f.ethClients[pos].SendTransaction(ctx, tx)
       } else {
          err = arbitrum.SendConditionalTransactionRPC(ctx, rpcClient, tx, options)
       }
       if err == nil || !f.tryNewForwarderErrors.MatchString(err.Error()) {
          return err
       }
       log.Warn("error forwarding transaction to a backup target", "target", f.targets[pos], "err", err)
    }
    return errors.New("failed to publish transaction to any of the forwarding targets")
}
// CheckHealth returns health of the highest priority forwarding target
func (f *TxForwarder) CheckHealth(inctx context.Context) error {
    // If f.enabled is true, len(f.rpcClients) should always be greater than zero,
    // but better safe than sorry.
    if !f.enabled.Load() || len(f.rpcClients) == 0 {
       return ErrNoSequencer
    }
    f.healthMutex.Lock()
    defer f.healthMutex.Unlock()
    if time.Since(f.healthChecked) > cacheUpstreamHealth {
       timeout := f.timeout
       if timeout == time.Duration(0) || timeout >= maxHealthTimeout {
          timeout = maxHealthTimeout
       }
       ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
       defer cancelFunc()
       f.healthErr = f.rpcClients[0].CallContext(ctx, nil, "arb_checkPublisherHealth")
       f.healthChecked = time.Now()
    }
    return f.healthErr
}

初始化

func (f *TxForwarder) Initialize(inctx context.Context) error {
    if f.ctx == nil {
       f.ctx = inctx
    }
    ctx, cancelFunc := f.ctxWithTimeout()
    defer cancelFunc()
    var targets []string
    var lastError error
    for _, target := range f.targets {
       if target == "" {
          continue
       }
       rpcClient, err := rpc.DialTransport(ctx, target, f.transport)
       if err != nil {
          log.Warn("error initializing a forwarding client in txForwarder", "forwarding url", target, "err", err)
          lastError = err
          continue
       }
       targets = append(targets, target)
       ethClient := ethclient.NewClient(rpcClient)
       f.rpcClients = append(f.rpcClients, rpcClient)
       f.ethClients = append(f.ethClients, ethClient)
    }
    f.targets = targets
    if len(f.rpcClients) > 0 {
       f.enabled.Store(true)
    } else {
       return lastError
    }
    return nil
}

会遍历所有的targets

区别

根据代码分析,

  • 启用Forwarder.RedisUrl时,仅使用forwardingTarget
  • 当config.forwardingTarget不为空时,forwarding-targetsecondary-forwarding-target同时叠加使用

部署优化

  • 将节点拓扑树形化,减少子叶节点与Sequencer传输距离
  • 尽可能多的覆盖同级子叶节点
  • 防止子叶节点不同层级内循环传播

TODO

继续跟进Forwarder.RedisUrl

if config.Forwarder.RedisUrl != "" {
    txPublisher = NewRedisTxForwarder(config.forwardingTarget, &config.Forwarder)
} else if config.forwardingTarget == "" {
    txPublisher = NewTxDropper()
} else {
    targets := append([]string{config.forwardingTarget}, config.SecondaryForwardingTarget...)
    txPublisher = NewForwarder(targets, &config.Forwarder)
}

NewRedisTxForwarder看位置,应该是推荐方式?相比NewForwarder性能区别是什么?Redis 共享数据加速?

// TODO 空闲再继续

深入OpStack,提现,储值的处理逻辑

OpStack 各个角色

  • op-node 负责和op-geth交易打包落块,交易状态推导,数据传输同步的客户端
  • batcher 将数据同步到L1的EOA账户
  • op-processer 提交区块状态到 L1 的 L2OutputOracle 合约
  • crossDomainMessagener 跨链信使合约,负责L1->L2,L2->L1的通信
  • OptimismPortal 是 op-stack 的充值提现纽带合约
  • Bridges 桥合约,主要功能是承载充值提现
  • L2OutputOracle 在L1层接收L2过来的状态根的合约

L2->L1 提现逻辑

提现的核心步骤

  1. 第一步 用户在L2层调用withdraw给自己地址提币
  2. 第二步 业务逻辑在L2合约层进行处理,中间会经过以下几个合约和步骤
    1.首先会在L2StandradBridge上面执行call_initiateWithdrawal。根据ETH/ERC20
    2.如果提现的是ETH,则会调用CrossDomainMessenger的sendMessage方法,将msgNonce+1,并在方法体内部调用L2CrossDomainMessenger的_sendMessage方法
    3.L2CrossDomainMessenger的_sendMessage 会调用L2ToL1MessagePasser的initateWithdrawal。构造出withdrawalHash,并维护msgNonce自增为1。完事发送事件
  3. 第三步 sequencer中的op-node 监听到交易事件,将事件打包成交易 (此步在链下处理)
  4. 第四步 Op-batch负责发打包好的交易rollup到L1里面,Op-proposer负责将这批次的状态根stateroot提交到L1
  5. 第五步 用户在L1提取资金(但是要注意的是,需要在挑战期过后才能提取),可以使用op-stack-SDK。它内部的逻辑会调用L1层的OptimismPortal来提取资金。

L2链层源码

function _initiateWithdrawal(
    address _l2Token,
    address _from,
    address _to,
    uint256 _amount,
    uint32 _minGasLimit,
    bytes memory _extraData
)
    internal
{
    if (_l2Token == Predeploys.LEGACY_ERC20_ETH) {  // 判断是否是ETH
        _initiateBridgeETH(_from, _to, _amount, _minGasLimit, _extraData);
    } else {
        address l1Token = OptimismMintableERC20(_l2Token).l1Token();  //属于ERC20
        _initiateBridgeERC20(_l2Token, l1Token, _from, _to, _amount, _minGasLimit, _extraData);
    }
}

执行父类的方法,_initiateBridgeETH

function _initiateBridgeETH(
    address _from,
    address _to,
    uint256 _amount,
    uint32 _minGasLimit,
    bytes memory _extraData
)
    internal
{
    require(isCustomGasToken() == false, "StandardBridge: cannot bridge ETH with custom gas token");
    require(msg.value == _amount, "StandardBridge: bridging ETH must include sufficient ETH value");

    _emitETHBridgeInitiated(_from, _to, _amount, _extraData);

    messenger.sendMessage{ value: _amount }({
        _target: address(otherBridge),
        _message: abi.encodeWithSelector(this.finalizeBridgeETH.selector, _from, _to, _amount, _extraData),
        _minGasLimit: _minGasLimit
    });
}

此时,方法进入到CrossDomainMessenger

function sendMessage(address _target, bytes calldata _message, uint32 _minGasLimit) external payable {
    if (isCustomGasToken()) {
        require(msg.value == 0, "CrossDomainMessenger: cannot send value with custom gas token");
    }

    _sendMessage({
        _to: address(otherMessenger),
        _gasLimit: baseGas(_message, _minGasLimit),
        _value: msg.value,
        _data: abi.encodeWithSelector(
            this.relayMessage.selector, messageNonce(), msg.sender, _target, msg.value, _minGasLimit, _message
        )
    });

    emit SentMessage(_target, msg.sender, _message, messageNonce(), _minGasLimit);
    emit SentMessageExtension1(msg.sender, msg.value);

    unchecked {
        ++msgNonce;
    }
}

在调用_sendMessage 的时候,执行的是子类的_sendMessage

function _sendMessage(address _to, uint64 _gasLimit, uint256 _value, bytes memory _data) internal override {
    IL2ToL1MessagePasser(payable(Predeploys.L2_TO_L1_MESSAGE_PASSER)).initiateWithdrawal{ value: _value }(
        _to, _gasLimit, _data
    );
}

最终执行的是L2ToL1MessagePasser,逻辑是将执行交易的参数打包成hash,并发送事件,到这里,L2层的逻辑已经执行完毕。

function initiateWithdrawal(address _target, uint256 _gasLimit, bytes memory _data) public payable {
    bytes32 withdrawalHash = Hashing.hashWithdrawal(
        Types.WithdrawalTransaction({
            nonce: messageNonce(),
            sender: msg.sender,
            target: _target,
            value: msg.value,
            gasLimit: _gasLimit,
            data: _data
        })
    );

    sentMessages[withdrawalHash] = true;

    emit MessagePassed(messageNonce(), msg.sender, _target, msg.value, _gasLimit, _data, withdrawalHash);

    unchecked {
        ++msgNonce;
    }
}

链下执行逻辑

func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
    // 获取并判断需要提交的最新 L2 的 start 和 end 块号
    start, end, err := l.calculateL2BlockRangeToStore(ctx)
    if err != nil {
       l.Log.Warn("Error calculating L2 block range", "err", err)
       return err
    } else if start.Number >= end.Number {
       return errors.New("start number is >= end number")
    }

    var latestBlock *types.Block
    // 从起始区块开始获取区块信息,并将区块加到 channelManager 的 blocks
    for i := start.Number + 1; i < end.Number+1; i++ {
       //核心逻辑就是 l.loadBlockIntoState
       block, err := l.loadBlockIntoState(ctx, i)
       if errors.Is(err, ErrReorg) {
          l.Log.Warn("Found L2 reorg", "block_number", i)
          l.lastStoredBlock = eth.BlockID{}
          return err
       } else if err != nil {
          l.Log.Warn("Failed to load block into state", "err", err)
          return err
       }
       l.lastStoredBlock = eth.ToBlockID(block)
       latestBlock = block
    }
    // 提取基本的 L2BlockRef 信息
    l2ref, err := derive.L2BlockToBlockRef(l.RollupConfig, latestBlock)
    if err != nil {
       l.Log.Warn("Invalid L2 block loaded into state", "err", err)
       return err
    }
    // 将L2BlockRef 加载到当前状态根中
    l.Metr.RecordL2BlocksLoaded(l2ref)
    return nil
}
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
    // 获取当前 Layer 1 的最新区块(tip)
    l1tip, err := l.l1Tip(ctx)
    if err != nil {
       l.Log.Error("Failed to query L1 tip", "err", err)
       return err
    }
    // 记录当前的 l1tip
    l.recordL1Tip(l1tip)

    // 从状态中获取与当前 L1 tip 相关的交易数据。这一步比较关键,来看一下逻辑
    txdata, err := l.state.TxData(l1tip.ID())

    if err == io.EOF {
       l.Log.Trace("No transaction data available")
       return err
    } else if err != nil {
       l.Log.Error("Unable to get tx data", "err", err)
       return err
    }

    // 发送交易数据到L1  
    if err = l.sendTransaction(txdata, queue, receiptsCh, daGroup); err != nil {
       return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
    }
    return nil
}

获取与当前L1 tip 相关的交易数据

func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    // 上面的代码逻辑是设置互斥锁
    var firstWithTxData *channel
    // 寻找第一个包含交易数据的通道
    for _, ch := range s.channelQueue {
       if ch.HasTxData() {
          firstWithTxData = ch
          break
       }
    }

    dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
    s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))

    // 存在待处理数据或达成短路条件,则调用 nextTxData(firstWithTxData) 返回该通道的交易数据
    if dataPending || s.closed {
       return s.nextTxData(firstWithTxData)
    }

    // 没有待处理数据,我们可以添加一个新块到channel,同时返回一个EOF
    if len(s.blocks) == 0 {
       return txData{}, io.EOF
    }

    // 确保当前有足够的空间处理新块
    if err := s.ensureChannelWithSpace(l1Head); err != nil {
       return txData{}, err
    }

    // 处理待处理的块
    if err := s.processBlocks(); err != nil {
       return txData{}, err
    }

    // 处理完所有待处理的块后,注册当前的 L1 头
    s.registerL1Block(l1Head)

    // 将处理后的数据输出
    if err := s.outputFrames(); err != nil {
       return txData{}, err
    }
    // 返回当前通道的交易数据
    return s.nextTxData(s.currentChannel)
}

op-proposer 逻辑,发送状态根

func (l *L2OutputSubmitter) FetchL2OOOutput(ctx context.Context) (*eth.OutputResponse, bool, error) {
    if l.l2ooContract == nil {
       return nil, false, fmt.Errorf("L2OutputOracle contract not set, cannot fetch next output info")
    }

    cCtx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
    defer cancel()
    callOpts := &bind.CallOpts{
       From:    l.Txmgr.From(),
       Context: cCtx,
    }
    // 获取下一个检查点的区块号
    nextCheckpointBlockBig, err := l.l2ooContract.NextBlockNumber(callOpts)
    if err != nil {
       return nil, false, fmt.Errorf("querying next block number: %w", err)
    }
    nextCheckpointBlock := nextCheckpointBlockBig.Uint64()
    // 方法获取当前区块号
    currentBlockNumber, err := l.FetchCurrentBlockNumber(ctx)
    if err != nil {
       return nil, false, err
    }

    // 对比当前区块号和下一个检查点的区块号,确保不会在未来的时间提交区块
    if currentBlockNumber < nextCheckpointBlock {
       l.Log.Debug("Proposer submission interval has not elapsed", "currentBlockNumber", currentBlockNumber, "nextBlockNumber", nextCheckpointBlock)
       return nil, false, nil
    }
    //使用下一个检查点的区块号来获取输出信息
    output, err := l.FetchOutput(ctx, nextCheckpointBlock)
    if err != nil {
       return nil, false, fmt.Errorf("fetching output: %w", err)
    }

    // 检查输出信息的区块引用是否大于最终化的 L2 状态的区块号,且是否允许非最终化的状态
    if output.BlockRef.Number > output.Status.FinalizedL2.Number && (!l.Cfg.AllowNonFinalized || output.BlockRef.Number > output.Status.SafeL2.Number) {
       l.Log.Debug("Not proposing yet, L2 block is not ready for proposal",
          "l2_proposal", output.BlockRef,
          "l2_safe", output.Status.SafeL2,
          "l2_finalized", output.Status.FinalizedL2,
          "allow_non_finalized", l.Cfg.AllowNonFinalized)
       return output, false, nil
    }
    return output, true, nil
}
func (l *L2OutputSubmitter) proposeOutput(ctx context.Context, output *eth.OutputResponse) {
    cCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
    defer cancel()
    //  如果上述的检查结果为true,则直接提交状态根transaction
    if err := l.sendTransaction(cCtx, output); err != nil {
       l.Log.Error("Failed to send proposal transaction",
          "err", err,
          "l1blocknum", output.Status.CurrentL1.Number,
          "l1blockhash", output.Status.CurrentL1.Hash,
          "l1head", output.Status.HeadL1.Number)
       return
    }
    l.Metr.RecordL2BlocksProposed(output.BlockRef)
}

至此,链下部分也已经处理完成

L1层的处理

在L2OutputOracle.proposeL2Output方法中

function proposeL2Output(
    bytes32 _outputRoot,
    uint256 _l2BlockNumber,
    bytes32 _l1BlockHash,
    uint256 _l1BlockNumber
)
    external
    payable
{
    // 校验发送人
    require(msg.sender == proposer, "L2OutputOracle: only the proposer address can propose new outputs");
    // 校验下一区块号
    require(
        _l2BlockNumber == nextBlockNumber(),
        "L2OutputOracle: block number must be equal to next expected block number"
    );
    // 校验区块时间
    require(
        computeL2Timestamp(_l2BlockNumber) < block.timestamp,
        "L2OutputOracle: cannot propose L2 output in the future"
    );

    require(_outputRoot != bytes32(0), "L2OutputOracle: L2 output proposal cannot be the zero hash");

    if (_l1BlockHash != bytes32(0)) {
        require(
            blockhash(_l1BlockNumber) == _l1BlockHash,
            "L2OutputOracle: block hash does not match the hash at the expected height"
        );
    }

    emit OutputProposed(_outputRoot, nextOutputIndex(), _l2BlockNumber, block.timestamp);
    // 将对应的状态根方入到l2Outputs中
    l2Outputs.push(
        Types.OutputProposal({
            outputRoot: _outputRoot,
            timestamp: uint128(block.timestamp),
            l2BlockNumber: uint128(_l2BlockNumber)
        })
    );
}

结合着时序图来看一下

L1-> L2 储值逻辑

储值的核心步骤

第一步 用户在L1层发起储值
第二步 用户会在L1链上经历几个核心步骤
1.先进入L1StandardBridge,执行_initiateETHDeposit
2.调用 CrossDomainMessenger 合约的 sendMessage
3.在CrossDomainMessenger.sendMessage 方法中,内部调用L1CrossDomainMessenger的_sendMessage方法,同时维护msgNonce
4.L1CrossDomainMessenger._sendMessage 会抛出TransactionDeposited 事件,至此,L1链执行处理完毕
第三步 链下,op-node监听到TransactionDeposited,构建交易的参数,并让op-geth调用L2StandardBridge的finalizeDeposit
第四步 finalizeDeposit执行完成之后,整个充值链路就完成了。

储值在L1层的源码

function depositETH(uint32 _minGasLimit, bytes calldata _extraData) external payable onlyEOA {
    _initiateETHDeposit(msg.sender, msg.sender, _minGasLimit, _extraData);
}

调用父类StandardBridge

function _initiateBridgeETH(
    address _from,
    address _to,
    uint256 _amount,
    uint32 _minGasLimit,
    bytes memory _extraData
)
    internal
{
    require(isCustomGasToken() == false, "StandardBridge: cannot bridge ETH with custom gas token");
    require(msg.value == _amount, "StandardBridge: bridging ETH must include sufficient ETH value");

    // Emit the correct events. By default this will be _amount, but child
    // contracts may override this function in order to emit legacy events as well.
    _emitETHBridgeInitiated(_from, _to, _amount, _extraData);
    // 发送message信息,进入的是CrossDomainMessenger合约中
    messenger.sendMessage{ value: _amount }({
        _target: address(otherBridge),
        _message: abi.encodeWithSelector(this.finalizeBridgeETH.selector, _from, _to, _amount, _extraData),
        _minGasLimit: _minGasLimit
    });
}

逻辑和提现一致,调用_sendMessage方法,此方法是执行子类L1CrossDomainMessenger的重写方法

function sendMessage(address _target, bytes calldata _message, uint32 _minGasLimit) external payable {
    if (isCustomGasToken()) {
        require(msg.value == 0, "CrossDomainMessenger: cannot send value with custom gas token");
    }

    // Triggers a message to the other messenger. Note that the amount of gas provided to the
    // message is the amount of gas requested by the user PLUS the base gas value. We want to
    // guarantee the property that the call to the target contract will always have at least
    // the minimum gas limit specified by the user.
    _sendMessage({
        _to: address(otherMessenger),
        _gasLimit: baseGas(_message, _minGasLimit),
        _value: msg.value,
        _data: abi.encodeWithSelector(
            this.relayMessage.selector, messageNonce(), msg.sender, _target, msg.value, _minGasLimit, _message
        )
    });

    emit SentMessage(_target, msg.sender, _message, messageNonce(), _minGasLimit);
    emit SentMessageExtension1(msg.sender, msg.value);

    unchecked {
        ++msgNonce;
    }
}
function _sendMessage(address _to, uint64 _gasLimit, uint256 _value, bytes memory _data) internal override {
    portal.depositTransaction{ value: _value }({
        _to: _to,
        _value: _value,
        _gasLimit: _gasLimit,
        _isCreation: false,
        _data: _data
    });
}

进入到OptimismPortal._depositTransaction方法

function _depositTransaction(
    address _to,
    uint256 _mint,
    uint256 _value,
    uint64 _gasLimit,
    bool _isCreation,
    bytes memory _data
)
    internal
{
    if (_isCreation && _to != address(0)) revert BadTarget();

    if (_gasLimit < minimumGasLimit(uint64(_data.length))) revert SmallGasLimit();

    if (_data.length > 120_000) revert LargeCalldata();

    // Transform the from-address to its alias if the caller is a contract.
    address from = msg.sender;
    if (msg.sender != tx.origin) {
        from = AddressAliasHelper.applyL1ToL2Alias(msg.sender);
    }

    // 对交易参数进行打包
    bytes memory opaqueData = abi.encodePacked(_mint, _value, _gasLimit, _isCreation, _data);

    // 发送存款事件
    emit TransactionDeposited(from, _to, DEPOSIT_VERSION, opaqueData);
}

以上,在L1层的存款逻辑处理完毕

链下处理

负责整合来自 L1 的信息、处理存款事务以及确保所有数据在时间和逻辑上的一致性。它确保生成的 L2 区块能够正确反映 L1 的状态

func (ba *FetchingAttributesBuilder) PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) {
    var l1Info eth.BlockInfo
    var depositTxs []hexutil.Bytes
    var seqNumber uint64

    sysConfig, err := ba.l2.SystemConfigByL2Hash(ctx, l2Parent.Hash)
    if err != nil {
       return nil, NewTemporaryError(fmt.Errorf("failed to retrieve L2 parent block: %w", err))
    }

    // If the L1 origin changed in this block, then we are in the first block of the epoch. In this
    // case we need to fetch all transaction receipts from the L1 origin block so we can scan for
    // user deposits.
    if l2Parent.L1Origin.Number != epoch.Number {
       info, receipts, err := ba.l1.FetchReceipts(ctx, epoch.Hash)
       if err != nil {
          return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info and receipts: %w", err))
       }
       if l2Parent.L1Origin.Hash != info.ParentHash() {
          return nil, NewResetError(
             fmt.Errorf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
                epoch, info.ParentHash(), l2Parent.L1Origin))
       }

       deposits, err := DeriveDeposits(receipts, ba.rollupCfg.DepositContractAddress)
       if err != nil {
          // deposits may never be ignored. Failing to process them is a critical error.
          return nil, NewCriticalError(fmt.Errorf("failed to derive some deposits: %w", err))
       }
       // apply sysCfg changes
       if err := UpdateSystemConfigWithL1Receipts(&sysConfig, receipts, ba.rollupCfg, info.Time()); err != nil {
          return nil, NewCriticalError(fmt.Errorf("failed to apply derived L1 sysCfg updates: %w", err))
       }

       l1Info = info
       depositTxs = deposits
       seqNumber = 0
    } else {
       if l2Parent.L1Origin.Hash != epoch.Hash {
          return nil, NewResetError(fmt.Errorf("cannot create new block with L1 origin %s in conflict with L1 origin %s", epoch, l2Parent.L1Origin))
       }
       info, err := ba.l1.InfoByHash(ctx, epoch.Hash)
       if err != nil {
          return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info: %w", err))
       }
       l1Info = info
       depositTxs = nil
       seqNumber = l2Parent.SequenceNumber + 1
    }

    // Sanity check the L1 origin was correctly selected to maintain the time invariant between L1 and L2
    nextL2Time := l2Parent.Time + ba.rollupCfg.BlockTime
    if nextL2Time < l1Info.Time() {
       return nil, NewResetError(fmt.Errorf("cannot build L2 block on top %s for time %d before L1 origin %s at time %d",
          l2Parent, nextL2Time, eth.ToBlockID(l1Info), l1Info.Time()))
    }

    var upgradeTxs []hexutil.Bytes
    if ba.rollupCfg.IsEcotoneActivationBlock(nextL2Time) {
       upgradeTxs, err = EcotoneNetworkUpgradeTransactions()
       if err != nil {
          return nil, NewCriticalError(fmt.Errorf("failed to build ecotone network upgrade txs: %w", err))
       }
    }

    if ba.rollupCfg.IsFjordActivationBlock(nextL2Time) {
       fjord, err := FjordNetworkUpgradeTransactions()
       if err != nil {
          return nil, NewCriticalError(fmt.Errorf("failed to build fjord network upgrade txs: %w", err))
       }
       upgradeTxs = append(upgradeTxs, fjord...)
    }

    l1InfoTx, err := L1InfoDepositBytes(ba.rollupCfg, sysConfig, seqNumber, l1Info, nextL2Time)
    if err != nil {
       return nil, NewCriticalError(fmt.Errorf("failed to create l1InfoTx: %w", err))
    }

    var afterForceIncludeTxs []hexutil.Bytes
    if ba.rollupCfg.IsInterop(nextL2Time) {
       depositsCompleteTx, err := DepositsCompleteBytes(seqNumber, l1Info)
       if err != nil {
          return nil, NewCriticalError(fmt.Errorf("failed to create depositsCompleteTx: %w", err))
       }
       afterForceIncludeTxs = append(afterForceIncludeTxs, depositsCompleteTx)
    }

    txs := make([]hexutil.Bytes, 0, 1+len(depositTxs)+len(afterForceIncludeTxs)+len(upgradeTxs))
    txs = append(txs, l1InfoTx)
    txs = append(txs, depositTxs...)
    txs = append(txs, afterForceIncludeTxs...)
    txs = append(txs, upgradeTxs...)

    var withdrawals *types.Withdrawals
    if ba.rollupCfg.IsCanyon(nextL2Time) {
       withdrawals = &types.Withdrawals{}
    }

    var parentBeaconRoot *common.Hash
    if ba.rollupCfg.IsEcotone(nextL2Time) {
       parentBeaconRoot = l1Info.ParentBeaconRoot()
       if parentBeaconRoot == nil { // default to zero hash if there is no beacon-block-root available
          parentBeaconRoot = new(common.Hash)
       }
    }

    return &eth.PayloadAttributes{
       Timestamp:             hexutil.Uint64(nextL2Time),
       PrevRandao:            eth.Bytes32(l1Info.MixDigest()),
       SuggestedFeeRecipient: predeploys.SequencerFeeVaultAddr,
       Transactions:          txs,
       NoTxPool:              true,
       GasLimit:              (*eth.Uint64Quantity)(&sysConfig.GasLimit),
       Withdrawals:           withdrawals,
       ParentBeaconBlockRoot: parentBeaconRoot,
    }, nil
}

L2层的最终处理逻辑,进行消息的转发

function relayMessage(
    uint256 _nonce,
    address _sender,
    address _target,
    uint256 _value,
    uint256 _minGasLimit,
    bytes calldata _message
)
    external
    payable
{
    // 确保状态不是暂停
    require(paused() == false, "CrossDomainMessenger: paused");

    // 确保版本正确
    (, uint16 version) = Encoding.decodeVersionedNonce(_nonce);
    require(version < 2, "CrossDomainMessenger: only version 0 or 1 messages are supported at this time");

    // 检查该消息是否已经被转发,防止重复转发
    if (version == 0) {
        bytes32 oldHash = Hashing.hashCrossDomainMessageV0(_target, _sender, _message, _nonce);
        require(successfulMessages[oldHash] == false, "CrossDomainMessenger: legacy withdrawal already relayed");
    }

    // 使用版本 1 的哈希作为消息的唯一标识符.
    bytes32 versionedHash =
        Hashing.hashCrossDomainMessageV1(_nonce, _sender, _target, _value, _minGasLimit, _message);

    if (_isOtherMessenger()) {
        // 确保 msg.value 与 _value 匹配
        assert(msg.value == _value);
        assert(!failedMessages[versionedHash]);
    } else {
        require(msg.value == 0, "CrossDomainMessenger: value must be zero unless message is from a system address");

        require(failedMessages[versionedHash], "CrossDomainMessenger: message cannot be replayed");
    }
    // 确保地址安全
    require(
        _isUnsafeTarget(_target) == false, "CrossDomainMessenger: cannot send message to blocked system address"
    );

    require(successfulMessages[versionedHash] == false, "CrossDomainMessenger: message has already been relayed");

    //确保有足够的燃气执行外部调用和完成执行,若不够,则将消息标记为失败
    if (
        !SafeCall.hasMinGas(_minGasLimit, RELAY_RESERVED_GAS + RELAY_GAS_CHECK_BUFFER)
            || xDomainMsgSender != Constants.DEFAULT_L2_SENDER
    ) {
        failedMessages[versionedHash] = true;
        emit FailedRelayedMessage(versionedHash);

        // Revert in this case if the transaction was triggered by the estimation address. This
        // should only be possible during gas estimation or we have bigger problems. Reverting
        // here will make the behavior of gas estimation change such that the gas limit
        // computed will be the amount required to relay the message, even if that amount is
        // greater than the minimum gas limit specified by the user.
        if (tx.origin == Constants.ESTIMATION_ADDRESS) {
            revert("CrossDomainMessenger: failed to relay message");
        }

        return;
    }
    // 最核心的逻辑,执行SafeCall.call来转发执行逻辑
    xDomainMsgSender = _sender;
    bool success = SafeCall.call(_target, gasleft() - RELAY_RESERVED_GAS, _value, _message);
    xDomainMsgSender = Constants.DEFAULT_L2_SENDER;

    // 根据执行结果处理最终的逻辑
    if (success) {
        assert(successfulMessages[versionedHash] == false);
        successfulMessages[versionedHash] = true;
        emit RelayedMessage(versionedHash);
    } else {
        failedMessages[versionedHash] = true;
        emit FailedRelayedMessage(versionedHash);

        if (tx.origin == Constants.ESTIMATION_ADDRESS) {
            revert("CrossDomainMessenger: failed to relay message");
        }
    }
}

时序图

参考文献

https://docs.optimism.io/stack/protocol/rollup/withdrawal-flow
https://learnblockchain.cn/article/9207

转载:https://learnblockchain.cn/article/9419