Polygon CDK 跨链桥交易处理流程分析

2024-11-16 17:25:00

数据产生

synchronizer/synchronizer.go

func (s *ClientSynchronizer) syncBlocks(lastBlockSynced *etherman.Block) (*etherman.Block, error) {
...
blocks, order, err := s.etherMan.GetRollupInfoByBlockRange(s.ctx, fromBlock, &toBlock)

etherMan.readEvents

// GetRollupInfoByBlockRange builds a log filter for the configured contract
// addresses, starting at fromBlock, and hands it to readEvents.
//
// When toBlock is nil the query's upper bound is left unset; otherwise it is
// set to *toBlock. The blocks and the per-block event order produced by
// readEvents are returned unchanged; on error both results are nil.
func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]Block, map[common.Hash][]Order, error) {
    // Only logs whose first topic matches one of these event signatures are requested.
    eventSignatures := []common.Hash{
        updateGlobalExitRootSignatureHash,
        updateL1InfoTreeSignatureHash,
        depositEventSignatureHash,
        claimEventSignatureHash,
        oldClaimEventSignatureHash,
        newWrappedTokenEventSignatureHash,
        verifyBatchesTrustedAggregatorSignatureHash,
        rollupManagerVerifyBatchesSignatureHash,
    }

    // A nil upper bound is equivalent to never assigning ToBlock.
    var upperBound *big.Int
    if toBlock != nil {
        upperBound = new(big.Int).SetUint64(*toBlock)
    }

    filter := ethereum.FilterQuery{
        FromBlock: new(big.Int).SetUint64(fromBlock),
        ToBlock:   upperBound,
        Addresses: etherMan.SCAddresses,
        Topics:    [][]common.Hash{eventSignatures},
    }

    blocks, blocksOrder, err := etherMan.readEvents(ctx, filter)
    if err != nil {
        return nil, nil, err
    }
    return blocks, blocksOrder, nil
}
func (etherMan *Client) readEvents(ctx context.Context, query ethereum.FilterQuery) ([]Block, map[common.Hash][]Order, error) {
    logs, err := etherMan.EtherClient.FilterLogs(ctx, query)
    if err != nil {
        return nil, nil, err
    }
    var blocks []Block
    blocksOrder := make(map[common.Hash][]Order)
    for _, vLog := range logs {
        err := etherMan.processEvent(ctx, vLog, &blocks, &blocksOrder)
func (etherMan *Client) processEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
    switch vLog.Topics[0] {
    case updateGlobalExitRootSignatureHash:
        return etherMan.updateGlobalExitRootEvent(ctx, vLog, blocks, blocksOrder)
updateGlobalExitRootSignatureHash              = crypto.Keccak256Hash([]byte("UpdateGlobalExitRoot(bytes32,bytes32)"))
// updateGlobalExitRootEvent handles an UpdateGlobalExitRoot log. The two exit
// roots are read from the log's indexed topics (index 1 and 2) and passed to
// processUpdateGlobalExitRootEvent as the mainnet and rollup exit root
// respectively.
func (etherMan *Client) updateGlobalExitRootEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
    etherMan.logger.Debug("UpdateGlobalExitRoot event detected. Processing...")
    mainnetExitRoot, rollupExitRoot := vLog.Topics[1], vLog.Topics[2]
    return etherMan.processUpdateGlobalExitRootEvent(ctx, mainnetExitRoot, rollupExitRoot, vLog, blocks, blocksOrder)
}
// processUpdateGlobalExitRootEvent records a global exit root update carried
// by vLog.
//
// It builds a GlobalExitRoot from mainnetExitRoot and rollupExitRoot (stored
// in that order in ExitRoots) and attaches it to the block the log belongs to.
// If the last entry of *blocks is not that block (different hash or number, or
// *blocks is empty), the block header is fetched by hash and a new block is
// appended. Otherwise the exit root is appended to the existing last block.
// Finally an Order entry pointing at the new exit root's position is appended
// to (*blocksOrder)[blockHash].
//
// An error is returned only when the block header cannot be fetched; it wraps
// the underlying error so callers can inspect it with errors.Is / errors.As.
func (etherMan *Client) processUpdateGlobalExitRootEvent(ctx context.Context, mainnetExitRoot, rollupExitRoot common.Hash, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
    gExitRoot := GlobalExitRoot{
        ExitRoots:      []common.Hash{mainnetExitRoot, rollupExitRoot},
        GlobalExitRoot: hash(mainnetExitRoot, rollupExitRoot),
        BlockNumber:    vLog.BlockNumber,
    }

    last := len(*blocks) - 1
    if last < 0 || (*blocks)[last].BlockHash != vLog.BlockHash || (*blocks)[last].BlockNumber != vLog.BlockNumber {
        // First event seen for this block: fetch its header to build the block entry.
        header, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
        if err != nil {
            return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %w", vLog.BlockNumber, err)
        }
        block := prepareBlock(vLog, time.Unix(int64(header.Time), 0), header)
        block.GlobalExitRoots = append(block.GlobalExitRoots, gExitRoot)
        *blocks = append(*blocks, block)
    } else {
        // The last block already matches the log's hash and number. This is the
        // exact negation of the condition above, so no third case exists.
        (*blocks)[last].GlobalExitRoots = append((*blocks)[last].GlobalExitRoots, gExitRoot)
    }

    current := &(*blocks)[len(*blocks)-1]
    or := Order{
        Name: GlobalExitRootsOrder,
        Pos:  len(current.GlobalExitRoots) - 1,
    }
    (*blocksOrder)[current.BlockHash] = append((*blocksOrder)[current.BlockHash], or)
    return nil
}
GlobalExitRootsOrder EventOrder = "GlobalExitRoot"
func (s *ClientSynchronizer) syncBlocks(lastBlockSynced *etherman.Block) (*etherman.Block, error) {
...
err = s.processBlockRange(blocks, order)
func (s *ClientSynchronizer) processBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error {
    ...
    for _, element := range order[blocks[i].BlockHash] {
            switch element.Name {
            case etherman.GlobalExitRootsOrder:
                isNewGer = true
                err = s.processGlobalExitRoot(blocks[i].GlobalExitRoots[element.Pos], blockID, dbTx)
func (s *ClientSynchronizer) processGlobalExitRoot(globalExitRoot etherman.GlobalExitRoot, blockID uint64, dbTx pgx.Tx) error {
    // Store GlobalExitRoot
    globalExitRoot.BlockID = blockID
    err := s.storage.AddGlobalExitRoot(s.ctx, &globalExitRoot, dbTx)
// AddGlobalExitRoot inserts one row into sync.exit_root holding the block id,
// the global exit root and the first two entries of exitRoot.ExitRoots as a
// byte-array column. The statement runs on the querier obtained from dbTx via
// getExecQuerier. The error from Exec is returned as is.
func (p *PostgresStorage) AddGlobalExitRoot(ctx context.Context, exitRoot *etherman.GlobalExitRoot, dbTx pgx.Tx) error {
    const addExitRootSQL = "INSERT INTO sync.exit_root (block_id, global_exit_root, exit_roots) VALUES ($1, $2, $3)"

    querier := p.getExecQuerier(dbTx)
    firstRoot, secondRoot := exitRoot.ExitRoots[0], exitRoot.ExitRoots[1]
    roots := pq.Array([][]byte{firstRoot[:], secondRoot[:]})

    _, err := querier.Exec(ctx, addExitRootSQL, exitRoot.BlockID, exitRoot.GlobalExitRoot, roots)
    return err
}

数据消费

var chsExitRootEvent []chan *etherman.GlobalExitRoot
    var chsSyncedL2 []chan uint
    for i, l2EthermanClient := range l2Ethermans {
        log.Debug("trusted sequencer URL ", c.Etherman.L2URLs[i])
        zkEVMClient := client.NewClient(c.Etherman.L2URLs[i])
        chExitRootEventL2 := make(chan *etherman.GlobalExitRoot)
        chSyncedL2 := make(chan uint)
        chsExitRootEvent = append(chsExitRootEvent, chExitRootEventL2)
        chsSyncedL2 = append(chsSyncedL2, chSyncedL2)
        go runSynchronizer(ctx.Context, 0, bridgeController, l2EthermanClient, c.Synchronizer, storage, zkEVMClient, chExitRootEventL2, nil, chSyncedL2, []uint{})
    }
func runSynchronizer(ctx context.Context, genBlockNumber uint64, brdigeCtrl *bridgectrl.BridgeController, etherman *etherman.Client, cfg synchronizer.Config, storage db.Storage, zkEVMClient *client.Client, chExitRootEventL2 chan *etherman.GlobalExitRoot, chsExitRootEvent []chan *etherman.GlobalExitRoot, chSynced chan uint, allNetworkIDs []uint) {
    sy, err := synchronizer.NewSynchronizer(ctx, storage, brdigeCtrl, etherman, zkEVMClient, genBlockNumber, chExitRootEventL2, chsExitRootEvent, chSynced, cfg, allNetworkIDs)
func (s *ClientSynchronizer) processBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error {
    var isNewGer bool
    for i := range blocks {
    ...
    for _, element := range order[blocks[i].BlockHash] {
            switch element.Name {
            case etherman.GlobalExitRootsOrder:
                isNewGer = true
                err = s.processGlobalExitRoot(blocks[i].GlobalExitRoots[element.Pos], blockID, dbTx)
    ...
    if isNewGer {
        // Send latest GER stored to claimTxManager
        ger, err := s.storage.GetLatestL1SyncedExitRoot(s.ctx, nil)
        if err != nil {
            log.Errorf("networkID: %d, error getting latest GER stored on database. Error: %v", s.networkID, err)
            return err
        }
        if s.l1RollupExitRoot != ger.ExitRoots[1] {
            log.Debugf("Updating ger: %+v", ger)
            s.l1RollupExitRoot = ger.ExitRoots[1]
            for _, ch := range s.chsExitRootEvent {
                ch <- ger
            }
        }
    }
func (s *ClientSynchronizer) processGlobalExitRoot(globalExitRoot etherman.GlobalExitRoot, blockID uint64, dbTx pgx.Tx) error {
    // Store GlobalExitRoot
    globalExitRoot.BlockID = blockID
    err := s.storage.AddGlobalExitRoot(s.ctx, &globalExitRoot, dbTx)
// GetLatestL1SyncedExitRoot returns the most recently inserted row of
// sync.exit_root with block_id > 0 and network_id = 0 (highest id first).
//
// When no such row exists it returns a zero-valued GlobalExitRoot together
// with gerror.ErrStorageNotFound. Any other query error is returned with a nil
// result. A row whose exit_roots column holds fewer than two entries is
// reported as an error instead of panicking on the index access.
func (p *PostgresStorage) GetLatestL1SyncedExitRoot(ctx context.Context, dbTx pgx.Tx) (*etherman.GlobalExitRoot, error) {
    var (
        ger       etherman.GlobalExitRoot
        exitRoots [][]byte
    )
    const getLatestL1SyncedExitRootSQL = "SELECT block_id, global_exit_root, exit_roots FROM sync.exit_root WHERE block_id > 0 AND network_id = 0 ORDER BY id DESC LIMIT 1"
    err := p.getExecQuerier(dbTx).QueryRow(ctx, getLatestL1SyncedExitRootSQL).Scan(&ger.BlockID, &ger.GlobalExitRoot, pq.Array(&exitRoots))
    if err != nil {
        if errors.Is(err, pgx.ErrNoRows) {
            // The zero-valued GER is still returned alongside the error, as before.
            return &ger, gerror.ErrStorageNotFound
        }
        return nil, err
    }
    // Both exitRoots[0] and exitRoots[1] are read below; guard the access.
    if len(exitRoots) < 2 {
        return nil, errors.New("latest L1 synced exit root has fewer than two exit roots stored")
    }
    ger.ExitRoots = []common.Hash{common.BytesToHash(exitRoots[0]), common.BytesToHash(exitRoots[1])}
    return &ger, nil
}
func start(ctx *cli.Context) error {
    if c.ClaimTxManager.Enabled {
        for i := 0; i < len(c.Etherman.L2URLs); i++ {
            ...
            claimTxManager, err := claimtxman.NewClaimTxManager(ctx, c.ClaimTxManager, chsExitRootEvent[i], chsSyncedL2[i],
                c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, rollupID, l2Ethermans[i], nonceCache, auth)
func (tm *ClaimTxManager) Start() {
        ...
        case ger = <-tm.chExitRootEvent:
            if tm.synced {
                log.Debugf("RollupID: %d UpdateDepositsStatus for ger: %s", tm.rollupID, ger.GlobalExitRoot.String())
                if tm.cfg.GroupingClaims.Enabled {
                    log.Debugf("rollupID: %d, Ger value updated and ready to be processed...", tm.rollupID)
                    continue
                }
                go func() {
                    err := tm.updateDepositsStatus(ger)
                    if err != nil {
                        log.Errorf("rollupID: %d, failed to update deposits status: %v", tm.rollupID, err)
                    }
                }()
func (tm *ClaimTxManager) updateDepositsStatus(ger *etherman.GlobalExitRoot) error {
    dbTx, err := tm.storage.BeginDBTransaction(tm.ctx)
    if err != nil {
        return err
    }
    err = tm.processDepositStatus(ger, dbTx)
func (tm *ClaimTxManager) processDepositStatus(ger *etherman.GlobalExitRoot, dbTx pgx.Tx) error {
    if ger.BlockID != 0 { // L2 exit root is updated
...
    } else { // L1 exit root is updated in the trusted state
        log.Infof("RollupID: %d, Mainnet exitroot %v is updated", tm.rollupID, ger.ExitRoots[0])
        deposits, err := tm.storage.UpdateL1DepositsStatus(tm.ctx, ger.ExitRoots[0][:], tm.l2NetworkID, dbTx)
func (p *PostgresStorage) UpdateL1DepositsStatus(ctx context.Context, exitRoot []byte, destinationNetwork uint, dbTx pgx.Tx) ([]*etherman.Deposit, error) {
    const updateDepositsStatusSQL = `UPDATE sync.deposit SET ready_for_claim = true 

合约分析

PolygonZkEVMGlobalExitRoot.sol

function updateExitRoot(bytes32 newRoot) external {
        ...
        // If it already exists, do not modify the timestamp
        if (globalExitRootMap[newGlobalExitRoot] == 0) {
            globalExitRootMap[newGlobalExitRoot] = block.timestamp;
            emit UpdateGlobalExitRoot(
                cacheLastMainnetExitRoot,
                cacheLastRollupExitRoot
            );
        }
    }

PolygonZkEVMGlobalExitRootV2.sol

function updateExitRoot(bytes32 newRoot) external {
        ...
            emit UpdateL1InfoTree(
                cacheLastMainnetExitRoot,
                cacheLastRollupExitRoot
            );

            emit UpdateL1InfoTreeV2(
                currentL1InfoRoot,
                uint32(depositCount),
                lastBlockHash,
                currentTimestmap
            );
        }
    }
func (etherMan *Client) processEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
    switch vLog.Topics[0] {
...
    case updateL1InfoTreeSignatureHash:
        return etherMan.updateL1InfoTreeEvent(ctx, vLog, blocks, blocksOrder)
// updateL1InfoTreeEvent handles an UpdateL1InfoTree log. The log is decoded
// with ParseUpdateL1InfoTree and its mainnet and rollup exit roots are passed
// to processUpdateGlobalExitRootEvent, the same routine used for
// UpdateGlobalExitRoot logs. A decoding error is returned unchanged.
func (etherMan *Client) updateL1InfoTreeEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
    etherMan.logger.Debug("UpdateL1InfoTree event detected")

    parsed, err := etherMan.PolygonZkEVMGlobalExitRoot.ParseUpdateL1InfoTree(vLog)
    if err != nil {
        return err
    }

    mainnetExitRoot, rollupExitRoot := parsed.MainnetExitRoot, parsed.RollupExitRoot
    return etherMan.processUpdateGlobalExitRootEvent(ctx, mainnetExitRoot, rollupExitRoot, vLog, blocks, blocksOrder)
}
if isNewGer {
        // Send latest GER stored to claimTxManager
        ger, err := s.storage.GetLatestL1SyncedExitRoot(s.ctx, nil)
        if err != nil {
            log.Errorf("networkID: %d, error getting latest GER stored on database. Error: %v", s.networkID, err)
            return err
        }
        if s.l1RollupExitRoot != ger.ExitRoots[1] {
            log.Debugf("Updating ger: %+v", ger)
            s.l1RollupExitRoot = ger.ExitRoots[1]
            for _, ch := range s.chsExitRootEvent {
                ch <- ger
            }
        }
    }
// GetLatestL1SyncedExitRoot returns the most recently inserted row of
// sync.exit_root with block_id > 0 and network_id = 0 (highest id first).
//
// When no such row exists it returns a zero-valued GlobalExitRoot together
// with gerror.ErrStorageNotFound. Any other query error is returned with a nil
// result. A row whose exit_roots column holds fewer than two entries is
// reported as an error instead of panicking on the index access.
func (p *PostgresStorage) GetLatestL1SyncedExitRoot(ctx context.Context, dbTx pgx.Tx) (*etherman.GlobalExitRoot, error) {
    var (
        ger       etherman.GlobalExitRoot
        exitRoots [][]byte
    )
    const getLatestL1SyncedExitRootSQL = "SELECT block_id, global_exit_root, exit_roots FROM sync.exit_root WHERE block_id > 0 AND network_id = 0 ORDER BY id DESC LIMIT 1"
    err := p.getExecQuerier(dbTx).QueryRow(ctx, getLatestL1SyncedExitRootSQL).Scan(&ger.BlockID, &ger.GlobalExitRoot, pq.Array(&exitRoots))
    if err != nil {
        if errors.Is(err, pgx.ErrNoRows) {
            // The zero-valued GER is still returned alongside the error, as before.
            return &ger, gerror.ErrStorageNotFound
        }
        return nil, err
    }
    // Both exitRoots[0] and exitRoots[1] are read below; guard the access.
    if len(exitRoots) < 2 {
        return nil, errors.New("latest L1 synced exit root has fewer than two exit roots stored")
    }
    ger.ExitRoots = []common.Hash{common.BytesToHash(exitRoots[0]), common.BytesToHash(exitRoots[1])}
    return &ger, nil
}

// TODO 继续分析

综合分析

  1. PolygonZkEVMGlobalExitRoot 合约升级到 V2 后,UpdateGlobalExitRoot 事件由 UpdateL1InfoTree 事件代替,但是桥服务代码内部对这两种事件是统一处理的(最终都调用 processUpdateGlobalExitRootEvent),不存在兼容性问题
  2. 数据库查看对应数据
SELECT block_id, global_exit_root, exit_roots FROM sync.exit_root;

block_id | global_exit_root| exit_roots

1 | \x304fb89608cfb05e5eac3b9e7c70df5821e03b6f8166baf4192f4d921c44f651 | {"\\x76ab2578418da24f3754ecfdef1de5ff8631ca577f998799287eee91b4aace2c","\\x0000000000000000000000000000000000000000000000000000000000000000"}
2 | \x8c9a2fea84f58dc994dac422ab7ee94238643f976de700a25ec3c5701b5acf32 | {"\\xcc7402af30b35a4ee3d5b793cfed1678ae8529e7ffa79e75ae2e4baaa890df21","\\x0000000000000000000000000000000000000000000000000000000000000000"}
0 | \x8c9a2fea84f58dc994dac422ab7ee94238643f976de700a25ec3c5701b5acf32 | {"\\xcc7402af30b35a4ee3d5b793cfed1678ae8529e7ffa79e75ae2e4baaa890df21","\\x0000000000000000000000000000000000000000000000000000000000000000"}
当前页面是本站的「Baidu MIP」版。发表评论请点击:完整版 »