介绍

对于ForwardingTarget有两个相关参数

参数 类型 介绍
forwarding-target string 交易转发目标 URL,或“null”以禁用转发(当且仅当不是序列器)
secondary-forwarding-target []string 次要交易转发目标 URL

参数验证规则

// Validate checks that the sequencer and forwarding settings are mutually
// consistent and records the effective forwarding target on the config.
//
// A node must either be a sequencer or name a ForwardingTarget; the literal
// "null" is the explicit way to run without forwarding. The resolved value
// (with "null" mapped to the empty string) is stored in c.forwardingTarget.
//
// It returns the error from the sequencer's own validation, an error when a
// non-sequencer leaves ForwardingTarget unset, or an error when a sequencer
// is also given a real forwarding target.
func (c *Config) Validate() error {
	if err := c.Sequencer.Validate(); err != nil {
		return err
	}

	isSequencer := c.Sequencer.Enable
	if c.ForwardingTarget == "" && !isSequencer {
		return errors.New("ForwardingTarget not set and not sequencer (can use \"null\")")
	}

	// "null" is a sentinel meaning "do not forward"; any other value is kept verbatim.
	resolved := c.ForwardingTarget
	if resolved == "null" {
		resolved = ""
	}
	c.forwardingTarget = resolved

	if isSequencer && resolved != "" {
		return errors.New("ForwardingTarget set and sequencer enabled")
	}
	return nil
}

使用场景

  1. Sequencer.Enable == true时,forwardingTarget 必须为空,即不转发交易
  2. Sequencer.Enable != true 时,ForwardingTarget 可以设置为某个接收转发的RPC, 或者设置为null 即不转发交易只查询,可用于 ReadOnly节点

逻辑分析

// CreateExecutionNode (excerpt: the parts replaced by "..." are not shown)
// picks the transaction publisher for the node from its configuration.
// Only the publisher selection is visible here; the rest of the construction
// is elided.
func CreateExecutionNode(
    ctx context.Context,
    stack *node.Node,
    chainDB ethdb.Database,
    l2BlockChain *core.BlockChain,
    l1client arbutil.L1Interface,
    configFetcher ConfigFetcher,
) (*ExecutionNode, error) {
    ...
    if config.Sequencer.Enable {
        // Sequencer mode: the sequencer itself becomes the transaction publisher.
        // The fetcher re-reads the sequencer section from configFetcher on every call.
        seqConfigFetcher := func() *SequencerConfig { return &configFetcher().Sequencer }
        sequencer, err = NewSequencer(execEngine, parentChainReader, seqConfigFetcher)
        if err != nil {
           return nil, err
        }
        txPublisher = sequencer
    } else {
        // Non-sequencer mode: the branches are checked in order, so a configured
        // Redis URL takes precedence over the other two options.
        if config.Forwarder.RedisUrl != "" {
           // Only the primary forwardingTarget is passed here;
           // SecondaryForwardingTarget is not handed to the Redis forwarder.
           txPublisher = NewRedisTxForwarder(config.forwardingTarget, &config.Forwarder)
        } else if config.forwardingTarget == "" {
           // No forwarding target; presumably NewTxDropper discards submitted
           // transactions (its implementation is not shown in this excerpt).
           txPublisher = NewTxDropper()
        } else {
           // The primary target goes first, followed by every secondary target.
           targets := append([]string{config.forwardingTarget}, config.SecondaryForwardingTarget...)
           txPublisher = NewForwarder(targets, &config.Forwarder)
        }
    }
    ...
}

Sequencer.Enable == false时

  1. Forwarder.RedisUrl不为空,则使用NewRedisTxForwarder,并仅使用forwardingTarget
  2. 当config.forwardingTarget为空时,即不转发交易,使用NewTxDropper
  3. Else, 同时使用forwardingTarget,SecondaryForwardingTarget
    1. 两者合并为同一个 targets 列表(forwardingTarget 排在首位),发布交易时按顺序依次尝试;仅当返回的错误匹配 tryNewForwarderErrors 时才切换到下一个目标
// PublishTransaction forwards tx to the configured targets in priority order.
//
// A plain transaction is sent through the target's eth client; when options
// is non-nil the conditional-transaction RPC is used instead. The first
// target that succeeds, or that fails with an error not matched by
// f.tryNewForwarderErrors, decides the result. Only a matching error causes
// a fall-through to the next target.
//
// It returns ErrNoSequencer while the forwarder is disabled, and a generic
// error once every target has been tried without a decisive result. The
// per-call context comes from f.ctxWithTimeout; inctx is not consulted.
func (f *TxForwarder) PublishTransaction(inctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
	if !f.enabled.Load() {
		return ErrNoSequencer
	}
	ctx, cancel := f.ctxWithTimeout()
	defer cancel()

	for i, client := range f.rpcClients {
		var sendErr error
		if options != nil {
			sendErr = arbitrum.SendConditionalTransactionRPC(ctx, client, tx, options)
		} else {
			sendErr = f.ethClients[i].SendTransaction(ctx, tx)
		}
		if sendErr == nil {
			return nil
		}
		// An error outside the retry pattern is final and is reported as-is.
		if !f.tryNewForwarderErrors.MatchString(sendErr.Error()) {
			return sendErr
		}
		log.Warn("error forwarding transaction to a backup target", "target", f.targets[i], "err", sendErr)
	}
	return errors.New("failed to publish transaction to any of the forwarding targets")
}
// CheckHealth reports the health of the highest-priority forwarding target.
//
// The outcome of the "arb_checkPublisherHealth" call against the first RPC
// client is cached in f.healthErr and refreshed only once the cached value
// is older than cacheUpstreamHealth. The probe uses f.timeout, replaced by
// maxHealthTimeout when f.timeout is zero or not below that limit. The probe
// context is derived from context.Background(); inctx is not consulted.
//
// It returns ErrNoSequencer when the forwarder is disabled or has no clients.
func (f *TxForwarder) CheckHealth(inctx context.Context) error {
	// An enabled forwarder is expected to always have at least one client;
	// the length check is purely defensive.
	if !f.enabled.Load() || len(f.rpcClients) == 0 {
		return ErrNoSequencer
	}

	f.healthMutex.Lock()
	defer f.healthMutex.Unlock()

	// Serve the cached result while it is still fresh.
	if time.Since(f.healthChecked) <= cacheUpstreamHealth {
		return f.healthErr
	}

	limit := f.timeout
	if limit == time.Duration(0) || limit >= maxHealthTimeout {
		limit = maxHealthTimeout
	}
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()

	f.healthErr = f.rpcClients[0].CallContext(ctx, nil, "arb_checkPublisherHealth")
	f.healthChecked = time.Now()
	return f.healthErr
}

初始化

// Initialize dials every configured forwarding target and enables the
// forwarder when at least one client is available.
//
// inctx is stored as the forwarder's context only if none has been set yet.
// Empty target strings are skipped. A target that fails to dial is logged
// and left out. Afterwards f.targets holds only the targets dialed
// successfully by this call.
//
// When no client is available it returns the last dial error; that error is
// nil if every target was an empty string, in which case the forwarder is
// left disabled without an error being reported.
func (f *TxForwarder) Initialize(inctx context.Context) error {
	if f.ctx == nil {
		f.ctx = inctx
	}
	ctx, cancel := f.ctxWithTimeout()
	defer cancel()

	var (
		reachable []string
		dialErr   error
	)
	for _, endpoint := range f.targets {
		if endpoint == "" {
			continue
		}
		client, err := rpc.DialTransport(ctx, endpoint, f.transport)
		if err == nil {
			reachable = append(reachable, endpoint)
			eth := ethclient.NewClient(client)
			f.rpcClients = append(f.rpcClients, client)
			f.ethClients = append(f.ethClients, eth)
			continue
		}
		log.Warn("error initializing a forwarding client in txForwarder", "forwarding url", endpoint, "err", err)
		dialErr = err
	}
	f.targets = reachable

	if len(f.rpcClients) == 0 {
		return dialErr
	}
	f.enabled.Store(true)
	return nil
}

Initialize 会遍历所有的 targets:空字符串直接跳过,连接失败的目标只记录告警并被剔除;只要有一个目标连接成功,即启用转发

区别

根据代码分析,

  • 启用Forwarder.RedisUrl时,仅使用forwardingTarget
  • 未启用Forwarder.RedisUrl且config.forwardingTarget不为空时,forwarding-target 与 secondary-forwarding-target 同时叠加使用

部署优化

  • 将节点拓扑树形化,减少叶子节点与Sequencer传输距离
  • 尽可能多的覆盖同级叶子节点
  • 防止叶子节点不同层级内循环传播

TODO

继续跟进Forwarder.RedisUrl

if config.Forwarder.RedisUrl != "" {
    txPublisher = NewRedisTxForwarder(config.forwardingTarget, &config.Forwarder)
} else if config.forwardingTarget == "" {
    txPublisher = NewTxDropper()
} else {
    targets := append([]string{config.forwardingTarget}, config.SecondaryForwardingTarget...)
    txPublisher = NewForwarder(targets, &config.Forwarder)
}

NewRedisTxForwarder看位置,应该是推荐方式?相比NewForwarder性能区别是什么?Redis 共享数据加速?

// TODO 空闲再继续