您正在查看: 2024年10月

Experiment with Go + HTMX and Ethereum authentication

介绍

一个非常简单的 HTMX 与 Web3 示例:

  • 转到 HTML 模板引擎。
  • 用于 SSR 的 HTMX 解决方案。
  • 以太坊作为验证者。
  • (所有请求都采取+CSRF保护措施)。
  • SQLite3 与 sqlc 和 golang-migrate。

Github: https://github.com/Darkness4/auth-web3-htmx
Demo: https://auth-web3-htmx.mnguyen.fr/

Fast RLP encoder/decoder

https://github.com/umbracle/fastrlp

FastRlp 不使用反射来避免瓶颈。它提供可以编码或解码为任何特定类型的单值原语
编码:

a := &fastrlp.Arena{}

// Encode a uint
v := a.NewUint(300)
buf := v.MarshalTo(nil)

// Encode an array
v = a.NewArray()
v.Set(a.NewUint(300))
buf = v.MarshalTo(nil)

您可以在此处找到更多示例。

解码:

p := &fastrlp.Parser{}
v, err := p.Parse([]byte{0x01})
if err != nil {
    panic(err)
}

num, err := v.GetUint64()
if err != nil {
    panic(err)
}
fmt.Println(num)

Arbitrum Sequencer配置参数分析

参数 类型 默认值
enable bool false 这个参数决定了 Sequencer 是否处于激活状态,从而能够处理交易和生成区块
max-block-speed time 250 Millisecond 控制 Sequencer 生成区块的最大速度。这有助于防止系统过载、优化性能、管理资源,并减少交易处理的延迟。
max-revert-gas-reject uint64 0 控制在处理交易时的 gas 使用限制。这个参数的主要作用是防止过度消耗 gas 的情况,特别是在交易失败或回滚的情况下
max-acceptable-timestamp-delta time 1 Hour 控制 Sequencer 处理区块和交易时的时间戳容忍度
sender-whitelist string 用于控制哪些地址可以向 Sequencer 发送交易
forwarder connection-timeout time 30 Second 用于设置 Forwarder 连接到其他节点(包括 Sequencer 或 Layer 1 节点)的超时时间
idle-connection-timeout time 60 Second 设置 Forwarder 在连接保持空闲状态时的超时时间。这一配置参数控制了连接在没有活动数据传输时可以保持的最大时间长度
max-idle-connections int 100 Forwarder 能够保持的最大空闲连接数。这个参数控制了 Forwarder 在没有活跃数据传输的情况下允许保留的空闲连接的数量
redis-url string “” 用于配置 Forwarder 与 Redis 数据库的连接
update-interval time 1 Second 用于设置 Forwarder 更新其内部状态或数据的时间间隔
retry-interval time 100 Millisecond 设置 Forwarder 在遇到失败或错误时进行重试的时间间隔
queue-size int 1024 用于设置 Sequencer 接收和处理交易的队列大小。这个参数定义了 Sequencer 在内存中能够存储的交易队列的最大数量
queue-timeout time 12 Second 用于设置交易在队列中等待处理的最大时间。这个参数定义了交易在被 Sequencer 处理之前可以在队列中停留的最长时间
nonce-cache-size int 1024 用于设置 Sequencer 缓存的 nonce 值的数量
max-tx-data-size int 95000 用于控制 Sequencer 处理的单笔交易数据的最大大小。优化资源使用、提高系统稳定性、保护系统性能,并防止系统资源滥用
nonce-failure-cache-size int 1024 用于设置 Sequencer 缓存因 nonce 问题而失败的交易数量的最大值
nonce-failure-cache-expiry time 1 Second 用于设置缓存因 nonce 问题而失败的交易的过期时间
expected-surplus-soft-threshold string default 指定了Sequencer在处理交易时的软阈值,表示在网络中预期的剩余交易量的一个阈值,如果交易池中的交易数量低于这个阈值,Sequencer可以更自由地处理和确认交易;而当交易池中的交易量高于这个阈值时,Sequencer可能会采取一些措施来限制处理交易的速度,避免网络拥堵
expected-surplus-hard-threshold string default 指定了Sequencer在交易池中的硬性阈值。当交易池中的交易数量超过这个阈值时,Sequencer会采取更严格的措施来控制交易处理速度,以防止网络过载。这与expected-surplus-soft-threshold不同,后者是一个软性阈值,通常用于更灵活的流量管理,而硬性阈值则是更严格的限制。
enable-profiling bool false enable-profiling设置为true时,Sequencer会收集和记录关于其操作的性能数据。这些数据包括处理交易的时间、资源使用情况、网络延迟等信息。启用性能分析可以帮助开发者和运维团队了解Sequencer的运行状况、识别性能瓶颈,并优化系统以提高效率和稳定性。启用性能分析通常在调试和优化阶段是非常有用的,但在生产环境中,可能会因为性能开销而选择禁用它,除非需要深入的性能数据。

性能提升

  1. Forwarder 可以利用 Redis 的高性能缓存和数据存储功能,优化数据访问速度、支持异步处理和队列管理,从而提高系统的整体性能和效率
  2. queue-size 是 Arbitrum Nitro 中 SequencerConfig 的一个关键参数,用于控制 Sequencer 的交易队列大小。合理配置 queue-size 可以帮助优化交易处理能力、管理内存使用、应对高峰负载,并减少交易丢失的风险
  3. 合理配置 queue-timeout 可以优化交易处理效率、管理队列负载、避免交易过期,并提高系统的整体性能和响应速度。
  4. nonce-cache-size 是 Arbitrum Nitro 中 SequencerConfig 的一个重要参数,用于控制 Sequencer 缓存的 nonce 值的数量。合理配置此参数可以优化交易处理、提高系统性能、管理内存使用,并支持高吞吐量环境中的稳定运行
  5. 设置合理的 nonce-failure-cache-size 可以提高交易处理的效率,减少因 nonce 问题导致的交易重复处理时间

Arbitrum Sequencer 交易接收->区块打包逻辑

代码跟进

go-ethereum/internal/ethapi/api.go

func (s *TransactionAPI) SendTransaction(ctx context.Context, args TransactionArgs) (common.Hash, error) {
// SubmitTransaction is a helper function that submits tx to txPool and logs a message.
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
    // If the transaction fee cap is already specified, ensure the
    // fee of the given transaction is _reasonable_.
    if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
       return common.Hash{}, err
    }
    if !b.UnprotectedAllowed() && !tx.Protected() {
       // Ensure only eip155 signed transactions are submitted if EIP155Required is set.
       return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
    }
    if err := b.SendTx(ctx, tx); err != nil {
       return common.Hash{}, err
    }
    // Print a log with full tx details for manual investigations and interventions
    head := b.CurrentBlock()
    signer := types.MakeSigner(b.ChainConfig(), head.Number, head.Time)
    from, err := types.Sender(signer, tx)
    if err != nil {
       return common.Hash{}, err
    }

    if tx.To() == nil {
       addr := crypto.CreateAddress(from, tx.Nonce())
       log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
    } else {
       log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
    }
    return tx.Hash(), nil
}
// Transaction pool API
func (a *APIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
    return a.b.EnqueueL2Message(ctx, signedTx, nil)
}

go-ethereum/arbitrum/backend.go

func (b *Backend) EnqueueL2Message(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    return b.arb.PublishTransaction(ctx, tx, options)
}

execution/gethexec/arb_interface.go

func (a *ArbInterface) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    return a.txPublisher.PublishTransaction(ctx, tx, options)
}

execution/gethexec/tx_pre_checker.go

func (c *TxPreChecker) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    block := c.bc.CurrentBlock()
    statedb, err := c.bc.StateAt(block.Root)
    if err != nil {
       return err
    }
    arbos, err := arbosState.OpenSystemArbosState(statedb, nil, true)
    if err != nil {
       return err
    }
    err = PreCheckTx(c.bc, c.bc.Config(), block, statedb, arbos, tx, options, c.config())
    if err != nil {
       return err
    }
    return c.TransactionPublisher.PublishTransaction(ctx, tx, options)
}

execution/gethexec/sequencer.go

func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
    config := s.config()
    // Only try to acquire Rlock and check for hard threshold if l1reader is not nil
    // And hard threshold was enabled, this prevents spamming of read locks when not needed
    if s.l1Reader != nil && config.ExpectedSurplusHardThreshold != "default" {
       s.expectedSurplusMutex.RLock()
       if s.expectedSurplusUpdated && s.expectedSurplus < int64(config.expectedSurplusHardThreshold) {
          return errors.New("currently not accepting transactions due to expected surplus being below threshold")
       }
       s.expectedSurplusMutex.RUnlock()
    }

    sequencerBacklogGauge.Inc(1)
    defer sequencerBacklogGauge.Dec(1)

    _, forwarder := s.GetPauseAndForwarder()
    if forwarder != nil {
       err := forwarder.PublishTransaction(parentCtx, tx, options)
       if !errors.Is(err, ErrNoSequencer) {
          return err
       }
    }

    if len(s.senderWhitelist) > 0 {
       signer := types.LatestSigner(s.execEngine.bc.Config())
       sender, err := types.Sender(signer, tx)
       if err != nil {
          return err
       }
       _, authorized := s.senderWhitelist[sender]
       if !authorized {
          return errors.New("transaction sender is not on the whitelist")
       }
    }
    if tx.Type() >= types.ArbitrumDepositTxType || tx.Type() == types.BlobTxType {
       // Should be unreachable for Arbitrum types due to UnmarshalBinary not accepting Arbitrum internal txs
       // and we want to disallow BlobTxType since Arbitrum doesn't support EIP-4844 txs yet.
       return types.ErrTxTypeNotSupported
    }

    txBytes, err := tx.MarshalBinary()
    if err != nil {
       return err
    }

    queueTimeout := config.QueueTimeout
    queueCtx, cancelFunc := ctxWithTimeout(parentCtx, queueTimeout)
    defer cancelFunc()

    // Just to be safe, make sure we don't run over twice the queue timeout
    abortCtx, cancel := ctxWithTimeout(parentCtx, queueTimeout*2)
    defer cancel()

    resultChan := make(chan error, 1)
    queueItem := txQueueItem{
       tx,
       len(txBytes),
       options,
       resultChan,
       &atomic.Bool{},
       queueCtx,
       time.Now(),
    }
    select {
    case s.txQueue <- queueItem: // 交易进入交易队列
    case <-queueCtx.Done():
       return queueCtx.Err()
    }

    select {
    case res := <-resultChan:
       return res
    case <-abortCtx.Done():
       // We use abortCtx here and not queueCtx, because the QueueTimeout only applies to the background queue.
       // We want to give the background queue as much time as possible to make a response.
       err := abortCtx.Err()
       if parentCtx.Err() == nil {
          // If we've hit the abort deadline (as opposed to parentCtx being canceled), something went wrong.
          log.Warn("Transaction sequencing hit abort deadline", "err", err, "submittedAt", queueItem.firstAppearance, "queueTimeout", queueTimeout, "txHash", tx.Hash())
       }
       return err
    }
}
func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
    var queueItems []txQueueItem
    var totalBlockSize int

    defer func() {
       panicErr := recover()
       if panicErr != nil {
          log.Error("sequencer block creation panicked", "panic", panicErr, "backtrace", string(debug.Stack()))
          // Return an internal error to any queue items we were trying to process
          for _, item := range queueItems {
             // This can race, but that's alright, worst case is a log line in returnResult
             if !item.returnedResult.Load() {
                item.returnResult(sequencerInternalError)
             }
          }
          // Wait for the MaxBlockSpeed until attempting to create a block again
          returnValue = true
       }
    }()
    defer nonceFailureCacheSizeGauge.Update(int64(s.nonceFailures.Len()))

    config := s.config()

    // Clear out old nonceFailures
    s.nonceFailures.Resize(config.NonceFailureCacheSize)
    nextNonceExpiryTimer := s.expireNonceFailures()
    defer func() {
       // We wrap this in a closure as to not cache the current value of nextNonceExpiryTimer
       if nextNonceExpiryTimer != nil {
          nextNonceExpiryTimer.Stop()
       }
    }()

    for {
       var queueItem txQueueItem
       if s.txRetryQueue.Len() > 0 { // 优先把队列需要重试的交易拿出来
          queueItem = s.txRetryQueue.Pop()
       } else if len(queueItems) == 0 { // 当交易池
          var nextNonceExpiryChan <-chan time.Time
          if nextNonceExpiryTimer != nil {
             nextNonceExpiryChan = nextNonceExpiryTimer.C
          }
          select {
          case queueItem = <-s.txQueue:
          case <-nextNonceExpiryChan:
             // No need to stop the previous timer since it already elapsed
             nextNonceExpiryTimer = s.expireNonceFailures()
             continue
          case <-s.onForwarderSet:
             // Make sure this notification isn't outdated
             _, forwarder := s.GetPauseAndForwarder()
             if forwarder != nil {
                s.nonceFailures.Clear()
             }
             continue
          case <-ctx.Done():
             return false
          }
       } else {
          done := false
          select {
          case queueItem = <-s.txQueue:
          default:
             done = true
          }
          if done {
             break
          }
       }
       err := queueItem.ctx.Err()
       if err != nil {
          queueItem.returnResult(err)
          continue
       }
       if queueItem.txSize > config.MaxTxDataSize { // 超过设置限制的将被丢弃
          // This tx is too large
          queueItem.returnResult(txpool.ErrOversizedData)
          continue
       }
       if totalBlockSize+queueItem.txSize > config.MaxTxDataSize {
          // 此交易太大,无法添加到此批次
          s.txRetryQueue.Push(queueItem)
          // 在这里结束批处理,将此交易放入下一个交易中
          break
       }
       totalBlockSize += queueItem.txSize
       queueItems = append(queueItems, queueItem) // 交易加入当前批次队列
    }

    s.nonceCache.Resize(config.NonceCacheSize) // Would probably be better in a config hook but this is basically free
    s.nonceCache.BeginNewBlock()
    queueItems = s.precheckNonces(queueItems, totalBlockSize)
    txes := make([]*types.Transaction, len(queueItems))
    hooks := s.makeSequencingHooks()
    hooks.ConditionalOptionsForTx = make([]*arbitrum_types.ConditionalOptions, len(queueItems))
    totalBlockSize = 0 // 重新计算总块大小以进行二次检查
    for i, queueItem := range queueItems {
       txes[i] = queueItem.tx
       totalBlockSize = arbmath.SaturatingAdd(totalBlockSize, queueItem.txSize)
       hooks.ConditionalOptionsForTx[i] = queueItem.options
    }

    if totalBlockSize > config.MaxTxDataSize {// 如果超过,则当前批次整体进入下一轮重新计算
       for _, queueItem := range queueItems {
          s.txRetryQueue.Push(queueItem)
       }
       log.Error(
          "put too many transactions in a block",
          "numTxes", len(queueItems),
          "totalBlockSize", totalBlockSize,
          "maxTxDataSize", config.MaxTxDataSize,
       )
       return false
    }

    if s.handleInactive(ctx, queueItems) {
       return false
    }

    timestamp := time.Now().Unix()
    s.L1BlockAndTimeMutex.Lock()
    l1Block := s.l1BlockNumber
    l1Timestamp := s.l1Timestamp
    s.L1BlockAndTimeMutex.Unlock()

    if s.l1Reader != nil && (l1Block == 0 || math.Abs(float64(l1Timestamp)-float64(timestamp)) > config.MaxAcceptableTimestampDelta.Seconds()) {
       for _, queueItem := range queueItems {
          s.txRetryQueue.Push(queueItem)
       }
       log.Error(
          "cannot sequence: unknown L1 block or L1 timestamp too far from local clock time",
          "l1Block", l1Block,
          "l1Timestamp", time.Unix(int64(l1Timestamp), 0),
          "localTimestamp", time.Unix(int64(timestamp), 0),
       )
       return true
    }

    header := &arbostypes.L1IncomingMessageHeader{
       Kind:        arbostypes.L1MessageType_L2Message,
       Poster:      l1pricing.BatchPosterAddress,
       BlockNumber: l1Block,
       Timestamp:   uint64(timestamp),
       RequestId:   nil,
       L1BaseFee:   nil,
    }

    start := time.Now()
    var (
       block *types.Block
       err   error
    )
    if config.EnableProfiling {// 当enable-profiling设置为true时,Sequencer会收集和记录关于其操作的性能数据
       block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks)
    } else {
       block, err = s.execEngine.SequenceTransactions(header, txes, hooks) // 生产环境
    }
    elapsed := time.Since(start)
    blockCreationTimer.Update(elapsed)
    if elapsed >= time.Second*5 {
       var blockNum *big.Int
       if block != nil {
          blockNum = block.Number()
       }
       log.Warn("took over 5 seconds to sequence a block", "elapsed", elapsed, "numTxes", len(txes), "success", block != nil, "l2Block", blockNum)
    }
    if err == nil && len(hooks.TxErrors) != len(txes) {
       err = fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
    }
    if errors.Is(err, execution.ErrRetrySequencer) {
       log.Warn("error sequencing transactions", "err", err)
       // we changed roles
       // forward if we have where to
       if s.handleInactive(ctx, queueItems) {
          return false
       }
       // try to add back to queue otherwise
       for _, item := range queueItems {
          s.txRetryQueue.Push(item)
       }
       return false
    }
    if err != nil {
       if errors.Is(err, context.Canceled) {
          // thread closed. We'll later try to forward these messages.
          for _, item := range queueItems {
             s.txRetryQueue.Push(item)
          }
          return true // don't return failure to avoid retrying immediately
       }
       log.Error("error sequencing transactions", "err", err)
       for _, queueItem := range queueItems {
          queueItem.returnResult(err)
       }
       return false
    }

    if block != nil {
       successfulBlocksCounter.Inc(1)
       s.nonceCache.Finalize(block)
    }

    madeBlock := false
    for i, err := range hooks.TxErrors {
       if err == nil {
          madeBlock = true
       }
       queueItem := queueItems[i]
       if errors.Is(err, core.ErrGasLimitReached) {
          // 该区块中剩余的 Gas 不足以完成此项交易。
          if madeBlock {
             // 该块中已经有一个较早的交易;请在新的块中重试。
             s.txRetryQueue.Push(queueItem)
             continue
          }
       }
       if errors.Is(err, core.ErrIntrinsicGas) {
          // 删除附加信息,因为由于 L1 数据气体正确。
          err = core.ErrIntrinsicGas
       }
       var nonceError NonceError
       if errors.As(err, &nonceError) && nonceError.txNonce > nonceError.stateNonce {
          s.nonceFailures.Add(nonceError, queueItem)
          continue
       }
       queueItem.returnResult(err)
    }
    return madeBlock
}

execution/gethexec/executionengine.go

func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
    return s.sequencerWrapper(func() (*types.Block, error) {
       hooks.TxErrors = nil
       return s.sequenceTransactionsWithBlockMutex(header, txes, hooks)
    })
}
func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
    lastBlockHeader, err := s.getCurrentHeader()
    if err != nil {
       return nil, err
    }

    statedb, err := s.bc.StateAt(lastBlockHeader.Root)
    if err != nil {
       return nil, err
    }

    delayedMessagesRead := lastBlockHeader.Nonce.Uint64()

    startTime := time.Now()
    block, receipts, err := arbos.ProduceBlockAdvanced(
       header,
       txes,
       delayedMessagesRead,
       lastBlockHeader,
       statedb,
       s.bc,
       s.bc.Config(),
       hooks,
       false,
    )
    if err != nil {
       return nil, err
    }
    blockCalcTime := time.Since(startTime)
    if len(hooks.TxErrors) != len(txes) {
       return nil, fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
    }

    if len(receipts) == 0 {
       return nil, nil
    }

    allTxsErrored := true
    for _, err := range hooks.TxErrors {
       if err == nil {
          allTxsErrored = false
          break
       }
    }
    if allTxsErrored {
       return nil, nil
    }

    msg, err := MessageFromTxes(header, txes, hooks.TxErrors)
    if err != nil {
       return nil, err
    }

    pos, err := s.BlockNumberToMessageIndex(lastBlockHeader.Number.Uint64() + 1)
    if err != nil {
       return nil, err
    }

    msgWithMeta := arbostypes.MessageWithMetadata{
       Message:             msg,
       DelayedMessagesRead: delayedMessagesRead,
    }
    msgResult, err := s.resultFromHeader(block.Header())
    if err != nil {
       return nil, err
    }

    err = s.consensus.WriteMessageFromSequencer(pos, msgWithMeta, *msgResult)
    if err != nil {
       return nil, err
    }

    // 仅在我们写入消息后才写入块,因此如果节点在此过程中死亡,
    // 它将通过重新生成丢失的块在启动时自然恢复。
    err = s.appendBlock(block, statedb, receipts, blockCalcTime) 
    if err != nil {
       return nil, err
    }
    s.cacheL1PriceDataOfMsg(pos, receipts, block, false)

    return block, nil
}

交易执行顺序

Arbitrum 的交易处理方式与传统的以太坊链有所不同。Arbitrum 采用了“先到先得”的交易处理顺序,因此在其体系结构中并没有传统意义上的内存池(mempool)。

由于交易是按照序列器接收的顺序进行处理的,因此 Arbitrum 交易不需要优先费;如果交易确实包含优先费,则该费用将在执行结束时退还到交易的原始地址。

注:因为Arbitrum 需要更加有效的交易传播机制,所以生产部署配置和网络,要确保所有外部节点与主节点,尽量保证最短路径,之间的网络要稳定并且延迟尽量低,保证主节点尽快收到并处理交易。

交易价格

Arbitrum 上没有内存池的概念,交易由 Sequencer 按照先到先得的原则处理。因此,gas 价格竞标参数不会影响交易的处理顺序。

因为Arbitrum链上 gasprice 无法影响执行优先级,所以用户没有主动提价的动机,

交易收取的总费用是 L2 基础费用乘以所用 L2 gas 加上 L1 调用数据费用之和