Polygon CDK cross-chain bridge transaction processing flow analysis
Data Production
synchronizer/synchronizer.go
func (s *ClientSynchronizer) syncBlocks(lastBlockSynced *etherman.Block) (*etherman.Block, error) {
...
blocks, order, err := s.etherMan.GetRollupInfoByBlockRange(s.ctx, fromBlock, &toBlock)
syncBlocks calls etherMan.GetRollupInfoByBlockRange, which builds the event filter and then delegates to etherMan.readEvents:
func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]Block, map[common.Hash][]Order, error) {
// Filter query
query := ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(fromBlock),
Addresses: etherMan.SCAddresses,
Topics: [][]common.Hash{{updateGlobalExitRootSignatureHash, updateL1InfoTreeSignatureHash, depositEventSignatureHash, claimEventSignatureHash, oldClaimEventSignatureHash, newWrappedTokenEventSignatureHash, verifyBatchesTrustedAggregatorSignatureHash, rollupManagerVerifyBatchesSignatureHash}},
}
if toBlock != nil {
query.ToBlock = new(big.Int).SetUint64(*toBlock)
}
blocks, blocksOrder, err := etherMan.readEvents(ctx, query)
if err != nil {
return nil, nil, err
}
return blocks, blocksOrder, nil
}
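The Topics list in the filter query holds topic0 values, i.e. Keccak-256 hashes of the event signatures, which is also what processEvent switches on below. As a quick reference, this is how two of those hashes are derived (the event signatures come from the contracts analyzed later in this note; the snippet itself is only illustrative):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// topic0 of an emitted log is the Keccak-256 hash of the event signature
	updateGlobalExitRootSignatureHash := crypto.Keccak256Hash([]byte("UpdateGlobalExitRoot(bytes32,bytes32)"))
	updateL1InfoTreeSignatureHash := crypto.Keccak256Hash([]byte("UpdateL1InfoTree(bytes32,bytes32)"))

	fmt.Println("UpdateGlobalExitRoot topic0:", updateGlobalExitRootSignatureHash.Hex())
	fmt.Println("UpdateL1InfoTree topic0:    ", updateL1InfoTreeSignatureHash.Hex())
}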
func (etherMan *Client) readEvents(ctx context.Context, query ethereum.FilterQuery) ([]Block, map[common.Hash][]Order, error) {
logs, err := etherMan.EtherClient.FilterLogs(ctx, query)
if err != nil {
return nil, nil, err
}
var blocks []Block
blocksOrder := make(map[common.Hash][]Order)
for _, vLog := range logs {
err := etherMan.processEvent(ctx, vLog, &blocks, &blocksOrder)
func (etherMan *Client) processEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
switch vLog.Topics[0] {
case updateGlobalExitRootSignatureHash:
return etherMan.updateGlobalExitRootEvent(ctx, vLog, blocks, blocksOrder)
updateGlobalExitRootSignatureHash = crypto.Keccak256Hash([]byte("UpdateGlobalExitRoot(bytes32,bytes32)"))
func (etherMan *Client) updateGlobalExitRootEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
etherMan.logger.Debug("UpdateGlobalExitRoot event detected. Processing...")
return etherMan.processUpdateGlobalExitRootEvent(ctx, vLog.Topics[1], vLog.Topics[2], vLog, blocks, blocksOrder)
}
func (etherMan *Client) processUpdateGlobalExitRootEvent(ctx context.Context, mainnetExitRoot, rollupExitRoot common.Hash, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
var gExitRoot GlobalExitRoot
gExitRoot.ExitRoots = make([]common.Hash, 0)
gExitRoot.ExitRoots = append(gExitRoot.ExitRoots, mainnetExitRoot)
gExitRoot.ExitRoots = append(gExitRoot.ExitRoots, rollupExitRoot)
gExitRoot.GlobalExitRoot = hash(mainnetExitRoot, rollupExitRoot)
gExitRoot.BlockNumber = vLog.BlockNumber
if len(*blocks) == 0 || ((*blocks)[len(*blocks)-1].BlockHash != vLog.BlockHash || (*blocks)[len(*blocks)-1].BlockNumber != vLog.BlockNumber) {
fullBlock, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
t := time.Unix(int64(fullBlock.Time), 0)
block := prepareBlock(vLog, t, fullBlock)
block.GlobalExitRoots = append(block.GlobalExitRoots, gExitRoot)
*blocks = append(*blocks, block)
} else if (*blocks)[len(*blocks)-1].BlockHash == vLog.BlockHash && (*blocks)[len(*blocks)-1].BlockNumber == vLog.BlockNumber {
(*blocks)[len(*blocks)-1].GlobalExitRoots = append((*blocks)[len(*blocks)-1].GlobalExitRoots, gExitRoot)
} else {
etherMan.logger.Error("Error processing UpdateGlobalExitRoot event. BlockHash:", vLog.BlockHash, ". BlockNumber: ", vLog.BlockNumber)
return fmt.Errorf("error processing UpdateGlobalExitRoot event")
}
or := Order{
Name: GlobalExitRootsOrder,
Pos: len((*blocks)[len(*blocks)-1].GlobalExitRoots) - 1,
}
(*blocksOrder)[(*blocks)[len(*blocks)-1].BlockHash] = append((*blocksOrder)[(*blocks)[len(*blocks)-1].BlockHash], or)
return nil
}
GlobalExitRootsOrder EventOrder = "GlobalExitRoot"
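The hash(mainnetExitRoot, rollupExitRoot) helper is not shown in the excerpt. Assuming it is a plain Keccak-256 over the two concatenated 32-byte roots (the same formula the contracts use in GlobalExitRootLib.calculateGlobalExitRoot), a minimal equivalent looks like this; its output should reproduce the global_exit_root column of the sample rows quoted at the end of this note:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// globalExitRoot is a sketch of the hash(...) helper:
// keccak256(mainnetExitRoot || rollupExitRoot).
func globalExitRoot(mainnetExitRoot, rollupExitRoot common.Hash) common.Hash {
	return crypto.Keccak256Hash(mainnetExitRoot.Bytes(), rollupExitRoot.Bytes())
}

func main() {
	// mainnet exit root taken from the first sync.exit_root row quoted below;
	// the rollup exit root is still all zeros at that point
	mainnetExitRoot := common.HexToHash("0x76ab2578418da24f3754ecfdef1de5ff8631ca577f998799287eee91b4aace2c")
	rollupExitRoot := common.Hash{}
	fmt.Println(globalExitRoot(mainnetExitRoot, rollupExitRoot).Hex())
}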
func (s *ClientSynchronizer) syncBlocks(lastBlockSynced *etherman.Block) (*etherman.Block, error) {
...
err = s.processBlockRange(blocks, order)
func (s *ClientSynchronizer) processBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error {
...
for _, element := range order[blocks[i].BlockHash] {
switch element.Name {
case etherman.GlobalExitRootsOrder:
isNewGer = true
err = s.processGlobalExitRoot(blocks[i].GlobalExitRoots[element.Pos], blockID, dbTx)
func (s *ClientSynchronizer) processGlobalExitRoot(globalExitRoot etherman.GlobalExitRoot, blockID uint64, dbTx pgx.Tx) error {
// Store GlobalExitRoot
globalExitRoot.BlockID = blockID
err := s.storage.AddGlobalExitRoot(s.ctx, &globalExitRoot, dbTx)
func (p *PostgresStorage) AddGlobalExitRoot(ctx context.Context, exitRoot *etherman.GlobalExitRoot, dbTx pgx.Tx) error {
const addExitRootSQL = "INSERT INTO sync.exit_root (block_id, global_exit_root, exit_roots) VALUES ($1, $2, $3)"
e := p.getExecQuerier(dbTx)
_, err := e.Exec(ctx, addExitRootSQL, exitRoot.BlockID, exitRoot.GlobalExitRoot, pq.Array([][]byte{exitRoot.ExitRoots[0][:], exitRoot.ExitRoots[1][:]}))
return err
}
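For orientation, the fields of etherman.GlobalExitRoot touched in this flow imply roughly the following shape (a sketch reconstructed from the accesses above, not the exact definition in the repository):

package sketch

import "github.com/ethereum/go-ethereum/common"

// GlobalExitRoot is a sketch only; the real struct lives in the etherman
// package and may contain additional fields.
type GlobalExitRoot struct {
	BlockID        uint64        // ID of the sync.block row this GER is stored with
	BlockNumber    uint64        // L1 block number of the log that produced it
	GlobalExitRoot common.Hash   // hash(mainnetExitRoot, rollupExitRoot)
	ExitRoots      []common.Hash // ExitRoots[0] = mainnet exit root, ExitRoots[1] = rollup exit root
}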
Data Consumption
var chsExitRootEvent []chan *etherman.GlobalExitRoot
var chsSyncedL2 []chan uint
for i, l2EthermanClient := range l2Ethermans {
log.Debug("trusted sequencer URL ", c.Etherman.L2URLs[i])
zkEVMClient := client.NewClient(c.Etherman.L2URLs[i])
chExitRootEventL2 := make(chan *etherman.GlobalExitRoot)
chSyncedL2 := make(chan uint)
chsExitRootEvent = append(chsExitRootEvent, chExitRootEventL2)
chsSyncedL2 = append(chsSyncedL2, chSyncedL2)
go runSynchronizer(ctx.Context, 0, bridgeController, l2EthermanClient, c.Synchronizer, storage, zkEVMClient, chExitRootEventL2, nil, chSyncedL2, []uint{})
}
func runSynchronizer(ctx context.Context, genBlockNumber uint64, brdigeCtrl *bridgectrl.BridgeController, etherman *etherman.Client, cfg synchronizer.Config, storage db.Storage, zkEVMClient *client.Client, chExitRootEventL2 chan *etherman.GlobalExitRoot, chsExitRootEvent []chan *etherman.GlobalExitRoot, chSynced chan uint, allNetworkIDs []uint) {
sy, err := synchronizer.NewSynchronizer(ctx, storage, brdigeCtrl, etherman, zkEVMClient, genBlockNumber, chExitRootEventL2, chsExitRootEvent, chSynced, cfg, allNetworkIDs)
func (s *ClientSynchronizer) processBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error {
var isNewGer bool
for i := range blocks {
...
for _, element := range order[blocks[i].BlockHash] {
switch element.Name {
case etherman.GlobalExitRootsOrder:
isNewGer = true
err = s.processGlobalExitRoot(blocks[i].GlobalExitRoots[element.Pos], blockID, dbTx)
...
if isNewGer {
// Send latest GER stored to claimTxManager
ger, err := s.storage.GetLatestL1SyncedExitRoot(s.ctx, nil)
if err != nil {
log.Errorf("networkID: %d, error getting latest GER stored on database. Error: %v", s.networkID, err)
return err
}
if s.l1RollupExitRoot != ger.ExitRoots[1] {
log.Debugf("Updating ger: %+v", ger)
s.l1RollupExitRoot = ger.ExitRoots[1]
for _, ch := range s.chsExitRootEvent {
ch <- ger
}
}
}
func (s *ClientSynchronizer) processGlobalExitRoot(globalExitRoot etherman.GlobalExitRoot, blockID uint64, dbTx pgx.Tx) error {
// Store GlobalExitRoot
globalExitRoot.BlockID = blockID
err := s.storage.AddGlobalExitRoot(s.ctx, &globalExitRoot, dbTx)
func (p *PostgresStorage) GetLatestL1SyncedExitRoot(ctx context.Context, dbTx pgx.Tx) (*etherman.GlobalExitRoot, error) {
var (
ger etherman.GlobalExitRoot
exitRoots [][]byte
)
const getLatestL1SyncedExitRootSQL = "SELECT block_id, global_exit_root, exit_roots FROM sync.exit_root WHERE block_id > 0 AND network_id = 0 ORDER BY id DESC LIMIT 1"
err := p.getExecQuerier(dbTx).QueryRow(ctx, getLatestL1SyncedExitRootSQL).Scan(&ger.BlockID, &ger.GlobalExitRoot, pq.Array(&exitRoots))
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return &ger, gerror.ErrStorageNotFound
}
return nil, err
}
ger.ExitRoots = []common.Hash{common.BytesToHash(exitRoots[0]), common.BytesToHash(exitRoots[1])}
return &ger, nil
}
func start(ctx *cli.Context) error {
if c.ClaimTxManager.Enabled {
for i := 0; i < len(c.Etherman.L2URLs); i++ {
...
claimTxManager, err := claimtxman.NewClaimTxManager(ctx, c.ClaimTxManager, chsExitRootEvent[i], chsSyncedL2[i],
c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage, rollupID, l2Ethermans[i], nonceCache, auth)
func (tm *ClaimTxManager) Start() {
...
case ger = <-tm.chExitRootEvent:
if tm.synced {
log.Debugf("RollupID: %d UpdateDepositsStatus for ger: %s", tm.rollupID, ger.GlobalExitRoot.String())
if tm.cfg.GroupingClaims.Enabled {
log.Debugf("rollupID: %d, Ger value updated and ready to be processed...", tm.rollupID)
continue
}
go func() {
err := tm.updateDepositsStatus(ger)
if err != nil {
log.Errorf("rollupID: %d, failed to update deposits status: %v", tm.rollupID, err)
}
}()
func (tm *ClaimTxManager) updateDepositsStatus(ger *etherman.GlobalExitRoot) error {
dbTx, err := tm.storage.BeginDBTransaction(tm.ctx)
if err != nil {
return err
}
err = tm.processDepositStatus(ger, dbTx)
func (tm *ClaimTxManager) processDepositStatus(ger *etherman.GlobalExitRoot, dbTx pgx.Tx) error {
if ger.BlockID != 0 { // L2 exit root is updated
...
} else { // L1 exit root is updated in the trusted state
log.Infof("RollupID: %d, Mainnet exitroot %v is updated", tm.rollupID, ger.ExitRoots[0])
deposits, err := tm.storage.UpdateL1DepositsStatus(tm.ctx, ger.ExitRoots[0][:], tm.l2NetworkID, dbTx)
func (p *PostgresStorage) UpdateL1DepositsStatus(ctx context.Context, exitRoot []byte, destinationNetwork uint, dbTx pgx.Tx) ([]*etherman.Deposit, error) {
const updateDepositsStatusSQL = `UPDATE sync.deposit SET ready_for_claim = true
...
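Putting the consumption side together: the L1 synchronizer acts as the single producer, and each ClaimTxManager (one per configured L2) consumes from its own GER channel. The sketch below strips this down to the bare channel fan-out; the type and names are illustrative stand-ins, not the actual ones:

package main

import (
	"fmt"
	"sync"
)

// gerEvent stands in for *etherman.GlobalExitRoot in this sketch.
type gerEvent struct{ globalExitRoot string }

func main() {
	// one channel per L2 network, mirroring the chsExitRootEvent slice built in start()
	chs := []chan *gerEvent{make(chan *gerEvent), make(chan *gerEvent)}

	var wg sync.WaitGroup
	for i, ch := range chs {
		wg.Add(1)
		// consumer goroutine: plays the role of ClaimTxManager.Start
		go func(rollup int, ch <-chan *gerEvent) {
			defer wg.Done()
			for ger := range ch {
				fmt.Printf("rollup %d: updating deposit status for GER %s\n", rollup, ger.globalExitRoot)
			}
		}(i, ch)
	}

	// producer: plays the role of the L1 synchronizer after it stores a new GER
	ger := &gerEvent{globalExitRoot: "0x8c9a...acf32"}
	for _, ch := range chs {
		ch <- ger
		close(ch)
	}
	wg.Wait()
}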
Contract Analysis
PolygonZkEVMGlobalExitRoot.sol
function updateExitRoot(bytes32 newRoot) external {
...
// If it already exists, do not modify the timestamp
if (globalExitRootMap[newGlobalExitRoot] == 0) {
globalExitRootMap[newGlobalExitRoot] = block.timestamp;
emit UpdateGlobalExitRoot(
cacheLastMainnetExitRoot,
cacheLastRollupExitRoot
);
}
}
PolygonZkEVMGlobalExitRootV2.sol
function updateExitRoot(bytes32 newRoot) external {
...
emit UpdateL1InfoTree(
cacheLastMainnetExitRoot,
cacheLastRollupExitRoot
);
emit UpdateL1InfoTreeV2(
currentL1InfoRoot,
uint32(depositCount),
lastBlockHash,
currentTimestamp
);
}
}
func (etherMan *Client) processEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
switch vLog.Topics[0] {
...
case updateL1InfoTreeSignatureHash:
return etherMan.updateL1InfoTreeEvent(ctx, vLog, blocks, blocksOrder)
func (etherMan *Client) updateL1InfoTreeEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
etherMan.logger.Debug("UpdateL1InfoTree event detected")
globalExitRoot, err := etherMan.PolygonZkEVMGlobalExitRoot.ParseUpdateL1InfoTree(vLog)
if err != nil {
return err
}
return etherMan.processUpdateGlobalExitRootEvent(ctx, globalExitRoot.MainnetExitRoot, globalExitRoot.RollupExitRoot, vLog, blocks, blocksOrder)
}
From this point the flow rejoins the path already shown in the Data Consumption section: the latest L1-synced GER is read back from storage and pushed to the claim tx managers.
if isNewGer {
// Send latest GER stored to claimTxManager
ger, err := s.storage.GetLatestL1SyncedExitRoot(s.ctx, nil)
if err != nil {
log.Errorf("networkID: %d, error getting latest GER stored on database. Error: %v", s.networkID, err)
return err
}
if s.l1RollupExitRoot != ger.ExitRoots[1] {
log.Debugf("Updating ger: %+v", ger)
s.l1RollupExitRoot = ger.ExitRoots[1]
for _, ch := range s.chsExitRootEvent {
ch <- ger
}
}
}
func (p *PostgresStorage) GetLatestL1SyncedExitRoot(ctx context.Context, dbTx pgx.Tx) (*etherman.GlobalExitRoot, error) {
var (
ger etherman.GlobalExitRoot
exitRoots [][]byte
)
const getLatestL1SyncedExitRootSQL = "SELECT block_id, global_exit_root, exit_roots FROM sync.exit_root WHERE block_id > 0 AND network_id = 0 ORDER BY id DESC LIMIT 1"
err := p.getExecQuerier(dbTx).QueryRow(ctx, getLatestL1SyncedExitRootSQL).Scan(&ger.BlockID, &ger.GlobalExitRoot, pq.Array(&exitRoots))
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return &ger, gerror.ErrStorageNotFound
}
return nil, err
}
ger.ExitRoots = []common.Hash{common.BytesToHash(exitRoots[0]), common.BytesToHash(exitRoots[1])}
return &ger, nil
}
// TODO: continue the analysis
Summary
- After the PolygonZkEVMGlobalExitRoot contract was upgraded to V2, the UpdateGlobalExitRoot event was replaced by UpdateL1InfoTree. The bridge service handles both events in etherman.processEvent through the same processUpdateGlobalExitRootEvent path, so there is no compatibility issue.
- Inspecting the corresponding rows in the database (a programmatic version of this query follows the table below):

SELECT block_id, global_exit_root, exit_roots FROM sync.exit_root;

 block_id | global_exit_root                                                   | exit_roots
----------+--------------------------------------------------------------------+------------
        1 | \x304fb89608cfb05e5eac3b9e7c70df5821e03b6f8166baf4192f4d921c44f651 | {"\\x76ab2578418da24f3754ecfdef1de5ff8631ca577f998799287eee91b4aace2c","\\x0000000000000000000000000000000000000000000000000000000000000000"}
        2 | \x8c9a2fea84f58dc994dac422ab7ee94238643f976de700a25ec3c5701b5acf32 | {"\\xcc7402af30b35a4ee3d5b793cfed1678ae8529e7ffa79e75ae2e4baaa890df21","\\x0000000000000000000000000000000000000000000000000000000000000000"}
        0 | \x8c9a2fea84f58dc994dac422ab7ee94238643f976de700a25ec3c5701b5acf32 | {"\\xcc7402af30b35a4ee3d5b793cfed1678ae8529e7ffa79e75ae2e4baaa890df21","\\x0000000000000000000000000000000000000000000000000000000000000000"}

  The exit_roots column stores [mainnet exit root, rollup exit root]. Given the "L1 exit root is updated in the trusted state" branch in processDepositStatus and the block_id > 0 filter in GetLatestL1SyncedExitRoot, the block_id = 0 row is presumably the same GER recorded from the trusted state rather than from a synced L1 block.
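As referenced above, a minimal programmatic version of the same query, using pgx and pq.Array with the same scanning pattern as the storage layer; the connection string is a placeholder:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v4"
	"github.com/lib/pq"
)

func main() {
	ctx := context.Background()
	// placeholder DSN: point it at the bridge service database
	conn, err := pgx.Connect(ctx, "postgres://user:pass@localhost:5432/bridge_db")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "SELECT block_id, global_exit_root, exit_roots FROM sync.exit_root ORDER BY id")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			blockID        uint64
			globalExitRoot []byte
			exitRoots      [][]byte
		)
		// exit_roots is an array column, scanned the same way the storage layer does
		if err := rows.Scan(&blockID, &globalExitRoot, pq.Array(&exitRoots)); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("block_id=%d ger=0x%x mainnet=0x%x rollup=0x%x\n",
			blockID, globalExitRoot, exitRoots[0], exitRoots[1])
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}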