Arbitrum Sequencer: from transaction receipt to block packing
Code walkthrough
go-ethereum/internal/ethapi/api.go
func (s *TransactionAPI) SendTransaction(ctx context.Context, args TransactionArgs) (common.Hash, error) {
// (body elided: the RPC entry point resolves the sending account, fills in defaults, signs the tx, and calls SubmitTransaction below)

// SubmitTransaction is a helper function that submits tx to txPool and logs a message.
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
// If the transaction fee cap is already specified, ensure the
// fee of the given transaction is _reasonable_.
if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
return common.Hash{}, err
}
if !b.UnprotectedAllowed() && !tx.Protected() {
// Ensure only eip155 signed transactions are submitted if EIP155Required is set.
return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
}
if err := b.SendTx(ctx, tx); err != nil {
return common.Hash{}, err
}
// Print a log with full tx details for manual investigations and interventions
head := b.CurrentBlock()
signer := types.MakeSigner(b.ChainConfig(), head.Number, head.Time)
from, err := types.Sender(signer, tx)
if err != nil {
return common.Hash{}, err
}
if tx.To() == nil {
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
} else {
log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
}
return tx.Hash(), nil
}
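The checkTxFee call above caps the worst-case fee an RPC-submitted tx may spend. A minimal sketch of that helper, closely following upstream go-ethereum (assumes the usual geth imports: fmt, math/big, params):

// Sketch: reject a tx whose worst-case fee (gasPrice * gas, converted to
// ether) exceeds the node's configured RPC fee cap (--rpc.txfeecap).
func checkTxFee(gasPrice *big.Int, gas uint64, cap float64) error {
	if cap == 0 {
		return nil // cap disabled
	}
	feeWei := new(big.Int).Mul(gasPrice, new(big.Int).SetUint64(gas))
	feeEth := new(big.Float).Quo(new(big.Float).SetInt(feeWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
	feeFloat, _ := feeEth.Float64()
	if feeFloat > cap {
		return fmt.Errorf("tx fee (%.2f ether) exceeds the configured cap (%.2f ether)", feeFloat, cap)
	}
	return nil
}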
go-ethereum/arbitrum/apibackend.go
// Transaction pool API
func (a *APIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return a.b.EnqueueL2Message(ctx, signedTx, nil)
}
go-ethereum/arbitrum/backend.go
func (b *Backend) EnqueueL2Message(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
return b.arb.PublishTransaction(ctx, tx, options)
}
execution/gethexec/arb_interface.go
func (a *ArbInterface) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
return a.txPublisher.PublishTransaction(ctx, tx, options)
}
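Each of these layers implements the same publishing interface and wraps the next one, so the TxPreChecker below validates a transaction and then delegates to the Sequencer (or to a forwarder when this node is not the active sequencer). A sketch of the assumed shape of that interface, simplified from execution/gethexec/arb_interface.go:

// Simplified sketch; the real interface also carries lifecycle methods
// (Initialize/Start/StopAndWait and so on).
type TransactionPublisher interface {
	PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error
}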
execution/gethexec/tx_pre_checker.go
func (c *TxPreChecker) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
block := c.bc.CurrentBlock()
statedb, err := c.bc.StateAt(block.Root)
if err != nil {
return err
}
arbos, err := arbosState.OpenSystemArbosState(statedb, nil, true)
if err != nil {
return err
}
err = PreCheckTx(c.bc, c.bc.Config(), block, statedb, arbos, tx, options, c.config())
if err != nil {
return err
}
return c.TransactionPublisher.PublishTransaction(ctx, tx, options)
}
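PreCheckTx itself is omitted here. In simplified form it performs stateless and stateful sanity checks before the tx may enter the sequencer queue; a hypothetical sketch under assumed names (the full nitro implementation also enforces conditional options and configurable strictness levels):

// Hypothetical, simplified version of the pre-checks, for illustration only.
func preCheckTxSketch(bc *core.BlockChain, header *types.Header, statedb *state.StateDB, tx *types.Transaction) error {
	if tx.Gas() < params.TxGas {
		return core.ErrIntrinsicGas // cannot even pay the 21000 base cost
	}
	if tx.GasFeeCap().Cmp(header.BaseFee) < 0 {
		return core.ErrFeeCapTooLow // max fee per gas below the current base fee
	}
	sender, err := types.Sender(types.LatestSigner(bc.Config()), tx)
	if err != nil {
		return err // unsigned or wrongly-signed tx
	}
	if tx.Nonce() < statedb.GetNonce(sender) {
		return core.ErrNonceTooLow // nonce already used on-chain
	}
	return nil
}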
execution/gethexec/sequencer.go
func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
config := s.config()
// Only try to acquire the RLock and check the hard threshold if l1Reader is not nil
// and the hard threshold is enabled; this prevents spamming read locks when not needed.
if s.l1Reader != nil && config.ExpectedSurplusHardThreshold != "default" {
s.expectedSurplusMutex.RLock()
if s.expectedSurplusUpdated && s.expectedSurplus < int64(config.expectedSurplusHardThreshold) {
s.expectedSurplusMutex.RUnlock()
return errors.New("currently not accepting transactions due to expected surplus being below threshold")
}
s.expectedSurplusMutex.RUnlock()
}
sequencerBacklogGauge.Inc(1)
defer sequencerBacklogGauge.Dec(1)
_, forwarder := s.GetPauseAndForwarder()
if forwarder != nil {
err := forwarder.PublishTransaction(parentCtx, tx, options)
if !errors.Is(err, ErrNoSequencer) {
return err
}
}
if len(s.senderWhitelist) > 0 {
signer := types.LatestSigner(s.execEngine.bc.Config())
sender, err := types.Sender(signer, tx)
if err != nil {
return err
}
_, authorized := s.senderWhitelist[sender]
if !authorized {
return errors.New("transaction sender is not on the whitelist")
}
}
if tx.Type() >= types.ArbitrumDepositTxType || tx.Type() == types.BlobTxType {
// Should be unreachable for Arbitrum types due to UnmarshalBinary not accepting Arbitrum internal txs
// and we want to disallow BlobTxType since Arbitrum doesn't support EIP-4844 txs yet.
return types.ErrTxTypeNotSupported
}
txBytes, err := tx.MarshalBinary()
if err != nil {
return err
}
queueTimeout := config.QueueTimeout
queueCtx, cancelFunc := ctxWithTimeout(parentCtx, queueTimeout)
defer cancelFunc()
// Just to be safe, make sure we don't run over twice the queue timeout
abortCtx, cancel := ctxWithTimeout(parentCtx, queueTimeout*2)
defer cancel()
resultChan := make(chan error, 1)
queueItem := txQueueItem{
tx,
len(txBytes),
options,
resultChan,
&atomic.Bool{},
queueCtx,
time.Now(),
}
select {
case s.txQueue <- queueItem: // the tx enters the sequencer's queue
case <-queueCtx.Done():
return queueCtx.Err()
}
select {
case res := <-resultChan:
return res
case <-abortCtx.Done():
// We use abortCtx here and not queueCtx, because the QueueTimeout only applies to the background queue.
// We want to give the background queue as much time as possible to make a response.
err := abortCtx.Err()
if parentCtx.Err() == nil {
// If we've hit the abort deadline (as opposed to parentCtx being canceled), something went wrong.
log.Warn("Transaction sequencing hit abort deadline", "err", err, "submittedAt", queueItem.firstAppearance, "queueTimeout", queueTimeout, "txHash", tx.Hash())
}
return err
}
}
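The positional literal above maps onto a queue-item struct along the following lines, and ctxWithTimeout presumably treats a zero timeout as "no timeout". Both are sketches of assumed shapes, with field names taken from how the item is used elsewhere in this file (queueItem.txSize, queueItem.ctx, queueItem.firstAppearance, ...):

// Assumed shape of txQueueItem, matching the positional literal above.
type txQueueItem struct {
	tx              *types.Transaction
	txSize          int // length of the marshalled tx in bytes
	options         *arbitrum_types.ConditionalOptions
	resultChan      chan error
	returnedResult  *atomic.Bool
	ctx             context.Context
	firstAppearance time.Time
}

// returnResult hands err back to the blocked PublishTransaction call, at most
// once; the atomic swap is what makes the racy double-return in createBlock
// benign ("worst case is a log line").
func (i *txQueueItem) returnResult(err error) {
	if i.returnedResult.Swap(true) {
		log.Error("attempting to return result to already returned queue item", "err", err)
		return
	}
	i.resultChan <- err
	close(i.resultChan)
}

// Sketch of ctxWithTimeout: a zero timeout means "never time out".
func ctxWithTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if timeout == 0 {
		return context.WithCancel(ctx)
	}
	return context.WithTimeout(ctx, timeout)
}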
func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
var queueItems []txQueueItem
var totalBlockSize int
defer func() {
panicErr := recover()
if panicErr != nil {
log.Error("sequencer block creation panicked", "panic", panicErr, "backtrace", string(debug.Stack()))
// Return an internal error to any queue items we were trying to process
for _, item := range queueItems {
// This can race, but that's alright, worst case is a log line in returnResult
if !item.returnedResult.Load() {
item.returnResult(sequencerInternalError)
}
}
// Wait for MaxBlockSpeed before attempting to create a block again
returnValue = true
}
}()
defer nonceFailureCacheSizeGauge.Update(int64(s.nonceFailures.Len()))
config := s.config()
// Clear out old nonceFailures
s.nonceFailures.Resize(config.NonceFailureCacheSize)
nextNonceExpiryTimer := s.expireNonceFailures()
defer func() {
// We wrap this in a closure so as not to cache the current value of nextNonceExpiryTimer
if nextNonceExpiryTimer != nil {
nextNonceExpiryTimer.Stop()
}
}()
for {
var queueItem txQueueItem
if s.txRetryQueue.Len() > 0 { // first drain any txs that need to be retried
queueItem = s.txRetryQueue.Pop()
} else if len(queueItems) == 0 { // nothing batched yet: block until the first tx (or a timer/ctx event) arrives
var nextNonceExpiryChan <-chan time.Time
if nextNonceExpiryTimer != nil {
nextNonceExpiryChan = nextNonceExpiryTimer.C
}
select {
case queueItem = <-s.txQueue:
case <-nextNonceExpiryChan:
// No need to stop the previous timer since it already elapsed
nextNonceExpiryTimer = s.expireNonceFailures()
continue
case <-s.onForwarderSet:
// Make sure this notification isn't outdated
_, forwarder := s.GetPauseAndForwarder()
if forwarder != nil {
s.nonceFailures.Clear()
}
continue
case <-ctx.Done():
return false
}
} else {
done := false
select {
case queueItem = <-s.txQueue:
default:
done = true
}
if done {
break
}
}
err := queueItem.ctx.Err()
if err != nil {
queueItem.returnResult(err)
continue
}
if queueItem.txSize > config.MaxTxDataSize { // txs over the configured size limit are dropped
// This tx is too large
queueItem.returnResult(txpool.ErrOversizedData)
continue
}
if totalBlockSize+queueItem.txSize > config.MaxTxDataSize {
// this tx is too large to add to the current batch
s.txRetryQueue.Push(queueItem)
// end the batch here and leave this tx for the next block
break
}
totalBlockSize += queueItem.txSize
queueItems = append(queueItems, queueItem) // add the tx to the current batch
}
s.nonceCache.Resize(config.NonceCacheSize) // Would probably be better in a config hook but this is basically free
s.nonceCache.BeginNewBlock()
queueItems = s.precheckNonces(queueItems, totalBlockSize)
txes := make([]*types.Transaction, len(queueItems))
hooks := s.makeSequencingHooks()
hooks.ConditionalOptionsForTx = make([]*arbitrum_types.ConditionalOptions, len(queueItems))
totalBlockSize = 0 // recompute the total block size as a second check
for i, queueItem := range queueItems {
txes[i] = queueItem.tx
totalBlockSize = arbmath.SaturatingAdd(totalBlockSize, queueItem.txSize)
hooks.ConditionalOptionsForTx[i] = queueItem.options
}
if totalBlockSize > config.MaxTxDataSize { // if the recount exceeds the limit, push the whole batch back for the next round
for _, queueItem := range queueItems {
s.txRetryQueue.Push(queueItem)
}
log.Error(
"put too many transactions in a block",
"numTxes", len(queueItems),
"totalBlockSize", totalBlockSize,
"maxTxDataSize", config.MaxTxDataSize,
)
return false
}
if s.handleInactive(ctx, queueItems) {
return false
}
timestamp := time.Now().Unix()
s.L1BlockAndTimeMutex.Lock()
l1Block := s.l1BlockNumber
l1Timestamp := s.l1Timestamp
s.L1BlockAndTimeMutex.Unlock()
if s.l1Reader != nil && (l1Block == 0 || math.Abs(float64(l1Timestamp)-float64(timestamp)) > config.MaxAcceptableTimestampDelta.Seconds()) {
for _, queueItem := range queueItems {
s.txRetryQueue.Push(queueItem)
}
log.Error(
"cannot sequence: unknown L1 block or L1 timestamp too far from local clock time",
"l1Block", l1Block,
"l1Timestamp", time.Unix(int64(l1Timestamp), 0),
"localTimestamp", time.Unix(int64(timestamp), 0),
)
return true
}
header := &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_L2Message,
Poster: l1pricing.BatchPosterAddress,
BlockNumber: l1Block,
Timestamp: uint64(timestamp),
RequestId: nil,
L1BaseFee: nil,
}
start := time.Now()
var (
block *types.Block
err error
)
if config.EnableProfiling { // with enable-profiling set to true, the Sequencer collects and logs performance data about its operation
block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks)
} else {
block, err = s.execEngine.SequenceTransactions(header, txes, hooks) // the production path
}
elapsed := time.Since(start)
blockCreationTimer.Update(elapsed)
if elapsed >= time.Second*5 {
var blockNum *big.Int
if block != nil {
blockNum = block.Number()
}
log.Warn("took over 5 seconds to sequence a block", "elapsed", elapsed, "numTxes", len(txes), "success", block != nil, "l2Block", blockNum)
}
if err == nil && len(hooks.TxErrors) != len(txes) {
err = fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
}
if errors.Is(err, execution.ErrRetrySequencer) {
log.Warn("error sequencing transactions", "err", err)
// we changed roles;
// forward if we have somewhere to forward to
if s.handleInactive(ctx, queueItems) {
return false
}
// try to add back to queue otherwise
for _, item := range queueItems {
s.txRetryQueue.Push(item)
}
return false
}
if err != nil {
if errors.Is(err, context.Canceled) {
// thread closed. We'll later try to forward these messages.
for _, item := range queueItems {
s.txRetryQueue.Push(item)
}
return true // don't return failure to avoid retrying immediately
}
log.Error("error sequencing transactions", "err", err)
for _, queueItem := range queueItems {
queueItem.returnResult(err)
}
return false
}
if block != nil {
successfulBlocksCounter.Inc(1)
s.nonceCache.Finalize(block)
}
madeBlock := false
for i, err := range hooks.TxErrors {
if err == nil {
madeBlock = true
}
queueItem := queueItems[i]
if errors.Is(err, core.ErrGasLimitReached) {
// There isn't enough gas left in the block for this tx.
if madeBlock {
// An earlier tx already made it into this block; retry this one in a new block.
s.txRetryQueue.Push(queueItem)
continue
}
}
if errors.Is(err, core.ErrIntrinsicGas) {
// Strip additional information, as it's incorrect due to L1 data gas.
err = core.ErrIntrinsicGas
}
var nonceError NonceError
if errors.As(err, &nonceError) && nonceError.txNonce > nonceError.stateNonce {
s.nonceFailures.Add(nonceError, queueItem)
continue
}
queueItem.returnResult(err)
}
return madeBlock
}
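createBlock hands the batch to the execution engine through a hooks object; block production records one entry per attempted tx in TxErrors (nil on success), which is why both createBlock above and the engine below insist that len(hooks.TxErrors) == len(txes). A simplified sketch of its assumed shape:

// Simplified sketch of arbos.SequencingHooks; the pre-/post-tx filter
// callbacks are elided here.
type SequencingHooks struct {
	TxErrors                []error // one entry per attempted tx; nil = included in the block
	DiscardInvalidTxsEarly  bool
	ConditionalOptionsForTx []*arbitrum_types.ConditionalOptions
	// ...
}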
execution/gethexec/executionengine.go
func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
return s.sequencerWrapper(func() (*types.Block, error) {
hooks.TxErrors = nil
return s.sequenceTransactionsWithBlockMutex(header, txes, hooks)
})
}
func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
lastBlockHeader, err := s.getCurrentHeader()
if err != nil {
return nil, err
}
statedb, err := s.bc.StateAt(lastBlockHeader.Root)
if err != nil {
return nil, err
}
delayedMessagesRead := lastBlockHeader.Nonce.Uint64() // Arbitrum repurposes the header nonce field to record how many delayed messages have been read
startTime := time.Now()
block, receipts, err := arbos.ProduceBlockAdvanced(
header,
txes,
delayedMessagesRead,
lastBlockHeader,
statedb,
s.bc,
s.bc.Config(),
hooks,
false,
)
if err != nil {
return nil, err
}
blockCalcTime := time.Since(startTime)
if len(hooks.TxErrors) != len(txes) {
return nil, fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
}
if len(receipts) == 0 {
return nil, nil
}
allTxsErrored := true
for _, err := range hooks.TxErrors {
if err == nil {
allTxsErrored = false
break
}
}
if allTxsErrored {
return nil, nil
}
msg, err := MessageFromTxes(header, txes, hooks.TxErrors)
if err != nil {
return nil, err
}
pos, err := s.BlockNumberToMessageIndex(lastBlockHeader.Number.Uint64() + 1)
if err != nil {
return nil, err
}
msgWithMeta := arbostypes.MessageWithMetadata{
Message: msg,
DelayedMessagesRead: delayedMessagesRead,
}
msgResult, err := s.resultFromHeader(block.Header())
if err != nil {
return nil, err
}
err = s.consensus.WriteMessageFromSequencer(pos, msgWithMeta, *msgResult)
if err != nil {
return nil, err
}
// Only write the block after we've written the message, so that if the node dies
// mid-process, it will naturally recover on startup by regenerating the missing block.
err = s.appendBlock(block, statedb, receipts, blockCalcTime)
if err != nil {
return nil, err
}
s.cacheL1PriceDataOfMsg(pos, receipts, block, false)
return block, nil
}
Transaction ordering
Arbitrum handles transactions differently from a traditional Ethereum chain: it sequences them on a first-come, first-served basis, so there is no mempool in the traditional sense anywhere in its architecture.
Because transactions are processed in the order the Sequencer receives them, Arbitrum transactions need no priority fee; if a transaction does include one, the fee is refunded to the transaction's origin address at the end of execution.
Note: since Arbitrum relies on fast transaction propagation rather than fee bidding, a production deployment should keep the network path between external nodes and the main (sequencer) node as short as possible, with stable, low-latency links, so that the sequencer receives and processes transactions as quickly as possible.
Transaction pricing
There is no mempool on Arbitrum; the Sequencer handles transactions first come, first served, so gas-price bidding parameters have no effect on processing order.
Because gas price cannot influence execution priority on Arbitrum, users have no incentive to bid it up. The total fee charged for a transaction is the L2 base fee multiplied by the L2 gas used, plus the L1 calldata fee.
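A worked example of that formula with hypothetical numbers (illustration only, not nitro code):

package main

import (
	"fmt"
	"math/big"
)

// totalFee = l2BaseFee * l2GasUsed + l1CalldataFee, all in wei.
func totalFee(l2BaseFee *big.Int, l2GasUsed uint64, l1CalldataFee *big.Int) *big.Int {
	fee := new(big.Int).Mul(l2BaseFee, new(big.Int).SetUint64(l2GasUsed))
	return fee.Add(fee, l1CalldataFee)
}

func main() {
	// 0.01 gwei L2 base fee, 100k L2 gas used, 50 gwei worth of L1 calldata:
	// 10_000_000 * 100_000 + 50_000_000_000 = 1_050_000_000_000 wei
	fmt.Println(totalFee(big.NewInt(10_000_000), 100_000, big.NewInt(50_000_000_000)), "wei")
}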