op-succinct 代码分析- proposer
代码分析
1. 启动
proposer/op/proposer/service.go
func (ps *ProposerService) Start(_ context.Context) error {
ps.Log.Info("Starting Proposer")
return ps.driver.StartL2OutputSubmitting()
}
proposer/op/proposer/driver.go
func (l *L2OutputSubmitter) StartL2OutputSubmitting() error {
...
// 当使用缓存数据库重新启动提议者时,我们需要将处于见证生成状态的所有证明标记为失败,然后重试。
witnessGenReqs, err := l.db.GetAllProofsWithStatus(proofrequest.StatusWITNESSGEN)
if err != nil {
return fmt.Errorf("failed to get witness generation pending proofs: %w", err)
}
for _, req := range witnessGenReqs {
err = l.RetryRequest(req, ProofStatusResponse{})
if err != nil {
return fmt.Errorf("failed to retry request: %w", err)
}
}
// 验证合约的聚合和范围验证密钥的配置以及汇总配置哈希。
err = l.ValidateConfig(l.Cfg.L2OutputOracleAddr.Hex()) // 向 l.Cfg.OPSuccinctServerUrl+"/validate_config" 配置的proposer/succinct/bin/server.rs 去请求数据
...
go l.loop() // loop 负责创建和提交下一个输出
2. 循环检查和提交
proposer/op/proposer/driver.go
// loopL2OO 定期轮询 L2OO 以提出下一个区块,如果当前最终确定(或安全)的区块超过了下一个区块,则它会提出该区块。
func (l *L2OutputSubmitter) loopL2OO(ctx context.Context) {
ticker := time.NewTicker(l.Cfg.PollInterval) // 检查间隔
for {
select {
case <-ticker.C:
// 获取提议者的当前指标。
metrics, err := l.GetProposerMetrics(ctx)
if err != nil {
l.Log.Error("failed to get metrics", "err", err)
continue
}
l.Log.Info("Proposer status", "metrics", metrics)
// 1) 将准备好进行证明的范围证明放入队列中。根据最新的 L2 最终区块和当前的 L2 不安全头来确定这些范围证明。
l.Log.Info("Stage 1: Getting Range Proof Boundaries...")
err = l.GetRangeProofBoundaries(ctx)
if err != nil {
l.Log.Error("failed to get range proof boundaries", "err", err)
continue
}
// 2) 检查 PROVING 请求的状态。如果成功返回,我们将验证其是否已保存在磁盘上,并将状态设置为“COMPLETE”。如果失败或超时,我们将状态设置为“FAILED”(如果是跨度证明,则将请求分成两半以重试)。
l.Log.Info("Stage 2: Processing PROVING requests...")
err = l.ProcessProvingRequests()
if err != nil {
l.Log.Error("failed to update PROVING requests", "err", err)
continue
}
// 3) 检查 WITNESSGEN 请求的状态。如果见证生成请求处于 WITNESSGEN 状态的时间超过超时时间,则将状态设置为 FAILED 并重试。
l.Log.Info("Stage 3: Processing WITNESSGEN requests...")
err = l.ProcessWitnessgenRequests()
if err != nil {
l.Log.Error("failed to update WITNESSGEN requests", "err", err)
continue
}
// 4) 确定从 L2OO 合约上的最新区块开始,是否存在连续的跨度证明链。如果有,则为所有跨度证明排队一个聚合证明。
l.Log.Info("Stage 3: Deriving Agg Proofs...")
err = l.DeriveAggProofs(ctx)
if err != nil {
l.Log.Error("failed to generate pending agg proofs", "err", err)
continue
}
// 5) 从证明者网络请求所有未请求的证明。任何状态为“UNREQ”的数据库条目都表示它已排队并准备就绪。我们从证明者网络请求所有这些(span 和 agg)。对于 agg 证明,我们还会提前检查区块哈希。
l.Log.Info("Stage 4: Requesting Queued Proofs...")
err = l.RequestQueuedProofs(ctx)
if err != nil {
l.Log.Error("failed to request unrequested proofs", "err", err)
continue
}
// 6) 在链上提交聚合证明。如果我们在数据库中有一个完整的聚合证明等待处理,我们会将其提交到链上。
l.Log.Info("Stage 5: Submitting Agg Proofs...")
err = l.SubmitAggProofs(ctx)
if err != nil {
l.Log.Error("failed to submit agg proofs", "err", err)
}
2.1 GetRangeProofBoundaries
proposer/op/proposer/range.go
// 将准备好进行证明的范围证明放入队列中。根据最新的 L2 最终区块和当前的 L2 不安全头来确定这些范围证明。
func (l *L2OutputSubmitter) GetRangeProofBoundaries(ctx context.Context) error {
// nextBlock 等于 DB 的 `EndBlock` 列中的最高值加 1。
latestL2EndBlock, err := l.db.GetLatestEndBlock()
if err != nil {
if ent.IsNotFound(err) {
latestEndBlockU256, err := l.l2ooContract.LatestBlockNumber(&bind.CallOpts{Context: ctx}) // 如果本地没有记录,则从合约读取
if err != nil {
return fmt.Errorf("failed to get latest output index: %w", err)
} else {
latestL2EndBlock = latestEndBlockU256.Uint64()
}
} else {
l.Log.Error("failed to get latest end requested", "err", err)
return err
}
}
newL2StartBlock := latestL2EndBlock
rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, l.Log, l.Cfg.RollupRpc) // 连接L2
if err != nil {
return err
}
// 获取最新的最终确定的 L2 区块。
status, err := rollupClient.SyncStatus(ctx) // optimism_syncStatus
if err != nil {
l.Log.Error("proposer unable to get sync status", "err", err)
return err
}
// 注意:最初,这使用的是 L1 最终区块。但为了满足新 API,我们现在使用 L2 最终区块。
newL2EndBlock := status.FinalizedL2.Number
spans := l.SplitRangeBasic(newL2StartBlock, newL2EndBlock)
// 将每个跨度添加到数据库。如果没有跨度,我们将不会创建任何证明。
for _, span := range spans {
err := l.db.NewEntry(proofrequest.TypeSPAN, span.Start, span.End)
l.Log.Info("New range proof request.", "start", span.Start, "end", span.End)
if err != nil {
l.Log.Error("failed to add span to db", "err", err)
return err
}
}
return nil
}
// CreateSpans 创建一个从开始到结束大小为 MaxBlockRangePerSpanProof 的跨度列表。注意:跨度 i 的结束 = 跨度 i+1 的开始。
func (l *L2OutputSubmitter) SplitRangeBasic(start, end uint64) []Span {
spans := []Span{}
// 从开始到结束创建大小为 MaxBlockRangePerSpanProof 的跨度。每个跨度都从前一个跨度结束的地方开始。继续,直到我们在到达终点之前无法再容纳另一个完整的跨度。
for i := start; i+l.Cfg.MaxBlockRangePerSpanProof <= end; i += l.Cfg.MaxBlockRangePerSpanProof {
spans = append(spans, Span{Start: i, End: i + l.Cfg.MaxBlockRangePerSpanProof})
}
return spans
}
总结
GetRangeProofBoundaries 根据初始高度(本地记录优先,否则查找合约记录)和当前L2最终区块高度,根据设置参数MaxBlockRangePerSpanProof拆分跨度区块,创建对应的Proof任务
2.2 ProcessProvingRequests
proposer/op/proposer/prove.go
// 检查 PROVING 请求的状态。如果成功返回,我们将验证其是否已保存在磁盘上,并将状态设置为“COMPLETE”。如果失败或超时,我们将状态设置为“FAILED”(如果是跨度证明,则将请求分成两半,以重试)。
func (l *L2OutputSubmitter) ProcessProvingRequests() error {
reqs, err := l.db.GetAllProofsWithStatus(proofrequest.StatusPROVING) // 获取所有当前处于 PROVING 状态的证明请求
if err != nil {
return err
}
for _, req := range reqs {
proofStatus, err := l.GetProofStatus(req.ProverRequestID)
if err != nil {
l.Log.Error("failed to get proof status for ID", "id", req.ProverRequestID, "err", err)
l.Metr.RecordError("get_proof_status", 1) // 记录获取证明状态调用的错误。
return err
}
if proofStatus.FulfillmentStatus == SP1FulfillmentStatusFulfilled {
l.Log.Info("Fulfilled Proof", "id", req.ProverRequestID)
err = l.db.AddFulfilledProof(req.ID, proofStatus.Proof) // 更新数据库中的证明并将状态更新为完成。
if err != nil {
l.Log.Error("failed to update completed proof status", "err", err)
return err
}
continue
}
if proofStatus.FulfillmentStatus == SP1FulfillmentStatusUnfulfillable {
// 记录失败原因。
l.Log.Info("Proof is unfulfillable", "id", req.ProverRequestID)
l.Metr.RecordProveFailure("unfulfillable")
err = l.RetryRequest(req, proofStatus) // 如果为区间,则拆分成2个
if err != nil {
return fmt.Errorf("failed to retry request: %w", err)
}
}
}
return nil
}
func (l *L2OutputSubmitter) RetryRequest(req *ent.ProofRequest, status ProofStatusResponse) error {
err := l.db.UpdateProofStatus(req.ID, proofrequest.StatusFAILED)
if err != nil {
l.Log.Error("failed to update proof status", "err", err)
return err
}
// // 如果出现执行错误,且请求是 SPAN 证明,且区块范围 > 1,则将请求拆分为两个请求。这可能是由于 SP1 OOM 造成的,因为区块范围较大且交易较多。
// TODO:一旦使用嵌入式分配器,就可以删除此解决方案,因为这样程序就永远不会出现 OOM。
if req.Type == proofrequest.TypeSPAN && status.ExecutionStatus == SP1ExecutionStatusUnexecutable && req.EndBlock-req.StartBlock > 1 {
// 将请求拆分为两个请求。
midBlock := (req.StartBlock + req.EndBlock) / 2
err = l.db.NewEntry(req.Type, req.StartBlock, midBlock)
if err != nil {
l.Log.Error("failed to retry first half of proof request", "err", err)
return err
}
err = l.db.NewEntry(req.Type, midBlock+1, req.EndBlock)
if err != nil {
l.Log.Error("failed to retry second half of proof request", "err", err)
return err
}
} else {
// 重试同一请求。
err = l.db.NewEntry(req.Type, req.StartBlock, req.EndBlock)
if err != nil {
l.Log.Error("failed to retry proof request", "err", err)
return err
}
}
return nil
}
总结
ProcessProvingRequests 根据本地db记录,依次向服务端l.Cfg.OPSuccinctServerUrl+"/status/"+proofId
获取所有当前处于PROVING的任务最新状态,如果SP1已生成完成,则更新本地数据为完成,如果失败则进行重试(如果为区间,则拆分成2个)
2.3 ProcessWitnessgenRequests
proposer/op/proposer/prove.go
func (l *L2OutputSubmitter) ProcessWitnessgenRequests() error {
// 获取当前处于 WITNESSGEN 状态的所有证明请求。
reqs, err := l.db.GetAllProofsWithStatus(proofrequest.StatusWITNESSGEN)
if err != nil {
return err
}
for _, req := range reqs {
// 如果请求处于 WITNESSGEN 状态的时间超过超时时间(20分钟),则将状态设置为 FAILED。
if req.LastUpdatedTime+uint64(WITNESSGEN_TIMEOUT.Seconds()) < uint64(time.Now().Unix()) {
l.RetryRequest(req, ProofStatusResponse{}) // 如果超时,重试请求
}
}
return nil
}
2.4 DeriveAggProofs
proposer/op/proposer/prove.go
// 使用 L2OO 合约查找下一个证明必须覆盖的区块范围。检查数据库以查看我们是否有足够的跨度证明来请求覆盖此范围的聚合证明。如果是,则将聚合证明排队在数据库中以供稍后请求。
func (l *L2OutputSubmitter) DeriveAggProofs(ctx context.Context) error {
latest, err := l.l2ooContract.LatestBlockNumber(&bind.CallOpts{Context: ctx})
if err != nil {
return fmt.Errorf("failed to get latest L2OO output: %w", err)
}
// 这将获取下一个块号,即 currentBlock + submissionInterval。
minTo, err := l.l2ooContract.NextBlockNumber(&bind.CallOpts{Context: ctx})
if err != nil {
return fmt.Errorf("failed to get next L2OO output: %w", err)
}
created, end, err := l.db.TryCreateAggProofFromSpanProofs(latest.Uint64(), minTo.Uint64()) // 尝试从覆盖范围 [from, minTo) 的跨度证明中创建 AGG 证明。 如果创建了新的 AGG 证明,则返回 true,否则返回 false。
if err != nil {
return fmt.Errorf("failed to create agg proof from span proofs: %w", err)
}
if created {
l.Log.Info("created new AGG proof", "from", latest.Uint64(), "to", end)
}
return nil
}
2.5 RequestQueuedProofs
// proposer/op/proposer/driver.go
// 从证明者网络请求所有未请求的证明。任何状态为“UNREQ”的数据库条目都表示它已排队并准备就绪。我们从证明者网络请求所有这些(span 和 agg)。对于 agg 证明,我们还会提前检查区块哈希。
func (l *L2OutputSubmitter) RequestQueuedProofs(ctx context.Context) error {
nextProofToRequest, err := l.db.GetNextUnrequestedProof()
if err != nil {
return fmt.Errorf("failed to get unrequested proofs: %w", err)
}
if nextProofToRequest == nil {
return nil
}
if nextProofToRequest.Type == proofrequest.TypeAGG {
if nextProofToRequest.L1BlockHash == "" {
blockNumber, blockHash, err := l.checkpointBlockHash(ctx) // 获取L1最新区块-1的head, 并把区块高度通过sendCheckpointTransaction方法发送交易,l.Cfg.L2OutputOracleAddr->checkpointBlockHash 写入合约
if err != nil {
l.Log.Error("failed to checkpoint block hash", "err", err)
return err
}
nextProofToRequest, err = l.db.AddL1BlockInfoToAggRequest(nextProofToRequest.StartBlock, nextProofToRequest.EndBlock, blockNumber, blockHash.Hex()) // 创建新Proof任务
if err != nil {
l.Log.Error("failed to add L1 block info to AGG request", "err", err)
}
// 等待下一次循环,这样我们就有了添加了块信息的版本
return nil
} else {
l.Log.Info("found agg proof with already checkpointed l1 block info")
}
} else {
witnessGenProofs, err := l.db.GetNumberOfRequestsWithStatuses(proofrequest.StatusWITNESSGEN)
if err != nil {
return fmt.Errorf("failed to count witnessgen proofs: %w", err)
}
provingProofs, err := l.db.GetNumberOfRequestsWithStatuses(proofrequest.StatusPROVING)
if err != nil {
return fmt.Errorf("failed to count proving proofs: %w", err)
}
// 见证生成请求的数量上限为 MAX_CONCURRENT_WITNESS_GEN。这可以防止见证生成服务器产生的进程使机器过载。一旦 https://github.com/anton-rs/kona/issues/553 修复,我们可能就可以删除此检查。
if witnessGenProofs >= MAX_CONCURRENT_WITNESS_GEN {
l.Log.Info("max witness generation reached, waiting for next cycle")
return nil
}
// 并发证明的总数上限为 MAX_CONCURRENT_PROOF_REQUESTS。
if (witnessGenProofs + provingProofs) >= int(l.Cfg.MaxConcurrentProofRequests) {
l.Log.Info("max concurrent proof requests reached, waiting for next cycle")
return nil
}
}
go func(p ent.ProofRequest) {
l.Log.Info("requesting proof from server", "type", p.Type, "start", p.StartBlock, "end", p.EndBlock, "id", p.ID)
// 将证明状态设置为 WITNESSGEN。
err = l.db.UpdateProofStatus(p.ID, proofrequest.StatusWITNESSGEN)
if err != nil {
l.Log.Error("failed to update proof status", "err", err)
return
}
// 根据模拟配置请求证明类型。
err = l.RequestProof(p, l.Cfg.Mock)
if err != nil {
// 如果证明请求失败,我们应该将其添加到队列中以待重试。
err = l.RetryRequest(nextProofToRequest, ProofStatusResponse{})
if err != nil {
l.Log.Error("failed to retry request", "err", err)
}
}
}(*nextProofToRequest)
return nil
}
// RequestProof 处理模拟和真实证明请求
func (l *L2OutputSubmitter) RequestProof(p ent.ProofRequest, isMock bool) error {
jsonBody, err := l.prepareProofRequest(p)
if err != nil {
return err
}
if isMock { // 开启Mock
proofData, err := l.requestMockProof(p.Type, jsonBody)
if err != nil {
return fmt.Errorf("mock proof request failed: %w", err)
}
// 对于模拟证明,一旦生成了“模拟证明”,就将状态设置为 PROVING。AddFulfilledProof 期望证明处于 PROVING 状态。
err = l.db.UpdateProofStatus(p.ID, proofrequest.StatusPROVING)
if err != nil {
return fmt.Errorf("failed to set proof status to proving: %w", err)
}
return l.db.AddFulfilledProof(p.ID, proofData)
}
// 向见证生成服务器请求真实证明。从网络返回证明 ID。
proofID, err := l.requestRealProof(p.Type, jsonBody)
if err != nil {
return fmt.Errorf("real proof request failed: %w", err)
}
// 检索到证明者 ID 后,将证明状态设置为 PROVING。只有状态为 PROVING、SUCCESS 或 FAILED 的证明才有证明者请求 ID。
err = l.db.UpdateProofStatus(p.ID, proofrequest.StatusPROVING)
if err != nil {
return fmt.Errorf("failed to set proof status to proving: %w", err)
}
return l.db.SetProverRequestID(p.ID, proofID)
}
2.6 SubmitAggProofs
// 在链上提交聚合证明。如果我们在数据库中有一个完整的聚合证明等待处理,我们会将其提交到链上。
func (l *L2OutputSubmitter) SubmitAggProofs(ctx context.Context) error {
// 从 L2OutputOracle 合约获取最新的输出索引
latestBlockNumber, err := l.l2ooContract.LatestBlockNumber(&bind.CallOpts{Context: ctx})
if err != nil {
return fmt.Errorf("failed to get latest output index: %w", err)
}
// 从下一个索引开始检查已完成的 AGG 证明
completedAggProofs, err := l.db.GetAllCompletedAggProofs(latestBlockNumber.Uint64())
if err != nil {
return fmt.Errorf("failed to query for completed AGG proof: %w", err)
}
if len(completedAggProofs) == 0 {
return nil
}
// 选择具有最高 L2 块编号的聚合证明。
sort.Slice(completedAggProofs, func(i, j int) bool {
return completedAggProofs[i].EndBlock > completedAggProofs[j].EndBlock
})
// 提交具有最高 L2 块编号的聚合证明。
aggProof := completedAggProofs[0]
output, err := l.FetchOutput(ctx, aggProof.EndBlock) // 通过optimism_outputAtBlock 健全性检查,例如在出现不良 RPC 缓存的情况下
if err != nil {
return fmt.Errorf("failed to fetch output at block %d: %w", aggProof.EndBlock, err)
}
err = l.proposeOutput(ctx, output, aggProof.Proof, aggProof.L1BlockNumber)// 通过proposeOutput 进行Proof的提交
if err != nil {
return fmt.Errorf("failed to propose output: %w", err)
}
return nil
}
当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »