介绍
对于ForwardingTarget有两个相关参数
参数 | 类型 | 介绍 |
---|---|---|
forwarding-target | string | 交易转发目标 URL,或“null”以禁用转发(当且仅当不是序列器) |
secondary-forwarding-target | []string | 次要交易转发目标 URL |
参数验证规则
// Validate checks the sequencer sub-config and the forwarding-target settings.
//
// It first delegates to c.Sequencer.Validate and returns its error, if any.
// It then resolves the public ForwardingTarget into the private
// c.forwardingTarget field, mapping the literal "null" to the empty string.
//
// It returns an error when the node is not a sequencer and ForwardingTarget is
// empty, or when the node is a sequencer and the resolved target is non-empty.
func (c *Config) Validate() error {
	seqErr := c.Sequencer.Validate()
	if seqErr != nil {
		return seqErr
	}

	isSequencer := c.Sequencer.Enable
	if c.ForwardingTarget == "" && !isSequencer {
		return errors.New("ForwardingTarget not set and not sequencer (can use \"null\")")
	}

	// "null" is the explicit way to say "do not forward".
	resolved := c.ForwardingTarget
	if resolved == "null" {
		resolved = ""
	}
	c.forwardingTarget = resolved

	if isSequencer && resolved != "" {
		return errors.New("ForwardingTarget set and sequencer enabled")
	}
	return nil
}
使用场景
- Sequencer.Enable == true时,forwardingTarget 必须为空,即不转发交易
- Sequencer.Enable != true 时,ForwardingTarget 可以设置为某个接收转发的 RPC,或者设置为 "null",即不转发交易只查询,可用于 ReadOnly 节点
逻辑分析
// CreateExecutionNode, as excerpted here ("..." marks code the excerpt omits),
// chooses which transaction publisher (txPublisher) the node uses, based on
// the sequencer and forwarding settings in the config.
func CreateExecutionNode(
ctx context.Context,
stack *node.Node,
chainDB ethdb.Database,
l2BlockChain *core.BlockChain,
l1client arbutil.L1Interface,
configFetcher ConfigFetcher,
) (*ExecutionNode, error) {
...
// NOTE(review): config, execEngine and parentChainReader are defined in the
// omitted code; config presumably comes from configFetcher() — confirm.
if config.Sequencer.Enable {
// Sequencer mode: the sequencer itself becomes the publisher. The closure
// calls configFetcher each time it is invoked and returns its Sequencer section.
seqConfigFetcher := func() *SequencerConfig { return &configFetcher().Sequencer }
sequencer, err = NewSequencer(execEngine, parentChainReader, seqConfigFetcher)
if err != nil {
return nil, err
}
txPublisher = sequencer
} else {
// Non-sequencer mode: the Redis URL is checked first, so it takes precedence
// over the other two branches.
if config.Forwarder.RedisUrl != "" {
// Only the primary forwardingTarget is passed here; SecondaryForwardingTarget is not.
txPublisher = NewRedisTxForwarder(config.forwardingTarget, &config.Forwarder)
} else if config.forwardingTarget == "" {
// No forwarding target configured: a TxDropper is used as the publisher.
txPublisher = NewTxDropper()
} else {
// Primary target first, followed by the secondary targets, in one list.
targets := append([]string{config.forwardingTarget}, config.SecondaryForwardingTarget...)
txPublisher = NewForwarder(targets, &config.Forwarder)
}
}
...
}
Sequencer.Enable == false时
- Forwarder.RedisUrl不为空,则使用NewRedisTxForwarder,并仅使用forwardingTarget
- 当config.forwardingTarget为空时,即不转发交易,使用NewTxDropper
- 否则,将 forwardingTarget 与 SecondaryForwardingTarget 合并为一个目标列表(forwardingTarget 排在首位)传给 NewForwarder
- 两者
// PublishTransaction forwards tx to the configured targets in order and stops
// at the first target that either succeeds or fails with an error that does
// not match f.tryNewForwarderErrors.
//
// When options is nil the transaction is sent through the eth client of the
// target; otherwise it is sent as a conditional transaction over the raw RPC
// client. inctx is not used by this method: the request context comes from
// f.ctxWithTimeout.
//
// It returns ErrNoSequencer when the forwarder is not enabled, and a generic
// error when every target failed with a matching (retryable) error.
func (f *TxForwarder) PublishTransaction(inctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
	if enabled := f.enabled.Load(); !enabled {
		return ErrNoSequencer
	}

	sendCtx, cancel := f.ctxWithTimeout()
	defer cancel()

	conditional := options != nil
	for idx, client := range f.rpcClients {
		var sendErr error
		if conditional {
			sendErr = arbitrum.SendConditionalTransactionRPC(sendCtx, client, tx, options)
		} else {
			sendErr = f.ethClients[idx].SendTransaction(sendCtx, tx)
		}

		if sendErr == nil {
			return nil
		}
		// Only errors matching tryNewForwarderErrors justify trying the next target.
		if !f.tryNewForwarderErrors.MatchString(sendErr.Error()) {
			return sendErr
		}
		log.Warn("error forwarding transaction to a backup target", "target", f.targets[idx], "err", sendErr)
	}

	return errors.New("failed to publish transaction to any of the forwarding targets")
}
// CheckHealth reports the health of the first (highest priority) forwarding
// target. The result of the upstream call is cached in f.healthErr and is
// refreshed only once more than cacheUpstreamHealth has elapsed since the
// last check. inctx is not used: the upstream call runs on a fresh background
// context with its own timeout.
//
// It returns ErrNoSequencer when the forwarder is disabled or has no clients.
func (f *TxForwarder) CheckHealth(inctx context.Context) error {
	// An enabled forwarder should always have at least one client; the length
	// check is purely defensive.
	if !(f.enabled.Load() && len(f.rpcClients) > 0) {
		return ErrNoSequencer
	}

	f.healthMutex.Lock()
	defer f.healthMutex.Unlock()

	// Cached result is still fresh enough.
	if time.Since(f.healthChecked) <= cacheUpstreamHealth {
		return f.healthErr
	}

	// Use the forwarder timeout, capped at maxHealthTimeout; an unset (zero)
	// timeout also falls back to the cap.
	limit := f.timeout
	if limit == time.Duration(0) || limit >= maxHealthTimeout {
		limit = maxHealthTimeout
	}

	callCtx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()

	f.healthErr = f.rpcClients[0].CallContext(callCtx, nil, "arb_checkPublisherHealth")
	f.healthChecked = time.Now()
	return f.healthErr
}
初始化
// Initialize dials every non-empty forwarding target and keeps the ones that
// connect. For each successful dial it appends an RPC client and a matching
// eth client; f.targets is then replaced by the list of targets that
// connected, in their original order.
//
// inctx is stored as f.ctx only when f.ctx has not been set yet. The dial
// context itself comes from f.ctxWithTimeout.
//
// The forwarder is enabled when at least one client exists. Otherwise the
// error from the last failed dial is returned, which is nil when no dial was
// attempted.
func (f *TxForwarder) Initialize(inctx context.Context) error {
	if f.ctx == nil {
		f.ctx = inctx
	}

	dialCtx, cancel := f.ctxWithTimeout()
	defer cancel()

	var (
		connected []string
		dialErr   error
	)
	for _, endpoint := range f.targets {
		if endpoint == "" {
			continue
		}

		client, err := rpc.DialTransport(dialCtx, endpoint, f.transport)
		if err == nil {
			connected = append(connected, endpoint)
			eth := ethclient.NewClient(client)
			f.rpcClients = append(f.rpcClients, client)
			f.ethClients = append(f.ethClients, eth)
			continue
		}

		// A failed target is skipped; only the most recent failure is remembered.
		log.Warn("error initializing a forwarding client in txForwarder", "forwarding url", endpoint, "err", err)
		dialErr = err
	}
	f.targets = connected

	if len(f.rpcClients) == 0 {
		return dialErr
	}
	f.enabled.Store(true)
	return nil
}
会遍历所有的targets
区别
根据代码分析,
- 启用Forwarder.RedisUrl时,仅使用forwardingTarget
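- 当config.forwardingTarget不为空(且未启用Forwarder.RedisUrl)时,forwarding-target 和 secondary-forwarding-target 会合并为一个目标列表:发送交易时按顺序依次尝试,forwarding-target 优先;只有当前一个目标返回匹配 tryNewForwarderErrors 的错误时,才会继续尝试下一个备用目标(并非同时向所有目标发送)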
部署优化
- 将节点拓扑树形化,减少叶子节点与 Sequencer 之间的传输距离
- 尽可能多地覆盖同级叶子节点
- 防止叶子节点在不同层级之间循环传播
TODO
继续跟进Forwarder.RedisUrl
// Publisher selection for a non-sequencer node. The Redis URL is checked
// first, so it takes precedence over the other two branches.
if config.Forwarder.RedisUrl != "" {
// Only the primary forwardingTarget is passed here; SecondaryForwardingTarget is not.
txPublisher = NewRedisTxForwarder(config.forwardingTarget, &config.Forwarder)
} else if config.forwardingTarget == "" {
// No forwarding target configured: a TxDropper is used as the publisher.
txPublisher = NewTxDropper()
} else {
// Primary target first, followed by the secondary targets, in one list.
targets := append([]string{config.forwardingTarget}, config.SecondaryForwardingTarget...)
txPublisher = NewForwarder(targets, &config.Forwarder)
}
从分支的判断顺序来看(Forwarder.RedisUrl 最先被检查),NewRedisTxForwarder 应该是推荐方式?与 NewForwarder 相比,性能上的区别是什么?是否借助 Redis 共享数据来加速?
// TODO 空闲再继续
版权属于:区块链中文技术社区 / 转载原创者
本文链接:https://bcskill.com/index.php/archives/1944.html
相关技术文章仅限于相关区块链底层技术研究,禁止用于非法用途,后果自负!本站严格遵守一切相关法律政策!