Introduction
A very simple HTMX + Web3 example:
- Go's HTML template engine.
- HTMX for server-side rendering (SSR).
- Ethereum as the verifier.
- (CSRF protection on every request.)
- SQLite3 with sqlc and golang-migrate.
Github: https://github.com/Darkness4/auth-web3-htmx
Demo: https://auth-web3-htmx.mnguyen.fr/
https://github.com/umbracle/fastrlp
FastRlp avoids reflection to sidestep its performance bottleneck. It provides single-value primitives that can be encoded to, or decoded from, any specific type.
Encoding:
a := &fastrlp.Arena{}
// Encode a uint
v := a.NewUint(300)
buf := v.MarshalTo(nil)
// Encode an array
v = a.NewArray()
v.Set(a.NewUint(300))
buf = v.MarshalTo(nil)
You can find more examples in the repository.
Decoding:
p := &fastrlp.Parser{}
v, err := p.Parse([]byte{0x01})
if err != nil {
panic(err)
}
num, err := v.GetUint64()
if err != nil {
panic(err)
}
fmt.Println(num)
Parameter | Type | Default | Description
---|---|---|---
enable | bool | false | Whether the Sequencer is active, i.e. able to process transactions and produce blocks
max-block-speed | time | 250 Millisecond | Maximum rate at which the Sequencer produces blocks; guards against overload, helps manage resources, and keeps transaction latency down
max-revert-gas-reject | uint64 | 0 | Gas-usage limit applied when processing transactions, mainly to prevent excessive gas consumption by failing or reverting transactions
max-acceptable-timestamp-delta | time | 1 Hour | Timestamp tolerance the Sequencer allows when processing blocks and transactions
sender-whitelist | string | | Restricts which addresses may send transactions to the Sequencer
forwarder.connection-timeout | time | 30 Second | Timeout for the Forwarder's connections to other nodes (including the Sequencer or Layer 1 nodes)
forwarder.idle-connection-timeout | time | 60 Second | Maximum time a Forwarder connection may sit idle, with no data moving, before it is closed
forwarder.max-idle-connections | int | 100 | Maximum number of idle connections the Forwarder keeps open
forwarder.redis-url | string | "" | Redis connection URL used by the Forwarder
forwarder.update-interval | time | 1 Second | Interval at which the Forwarder refreshes its internal state
forwarder.retry-interval | time | 100 Millisecond | Interval at which the Forwarder retries after a failure or error
queue-size | int | 1024 | Size of the queue through which the Sequencer receives transactions, i.e. the maximum number of queued transactions held in memory
queue-timeout | time | 12 Second | Maximum time a transaction may wait in the queue before being processed by the Sequencer
nonce-cache-size | int | 1024 | Number of nonce values the Sequencer caches
max-tx-data-size | int | 95000 | Maximum data size of a single transaction the Sequencer will process; bounds resource usage and protects stability
nonce-failure-cache-size | int | 1024 | Maximum number of transactions that failed on nonce errors that the Sequencer caches
nonce-failure-cache-expiry | time | 1 Second | Expiry time for entries in the nonce-failure cache
expected-surplus-soft-threshold | string | default | Soft threshold on the Sequencer's expected surplus; above it transactions are processed freely, while near or below it the Sequencer may throttle intake to avoid congestion
expected-surplus-hard-threshold | string | default | Hard threshold on the expected surplus; when the surplus falls below it the Sequencer stops accepting transactions. Unlike the soft threshold, which allows flexible traffic management, this is a strict limit
enable-profiling | bool | false | When true, the Sequencer collects performance data about its operation (transaction processing time, resource usage, network latency). Useful for debugging and optimization, but usually disabled in production because of the overhead
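For context, these options are typically supplied as node flags. The invocation below is illustrative only: the flag names follow Nitro's `--execution.sequencer.*` convention, but check `nitro --help` for the exact spelling in your version.

```shell
# Illustrative sequencer configuration (verify flag names against your nitro build)
nitro \
  --execution.sequencer.enable \
  --execution.sequencer.max-block-speed=250ms \
  --execution.sequencer.max-tx-data-size=95000 \
  --execution.sequencer.queue-size=1024 \
  --execution.sequencer.queue-timeout=12s
```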
queue-size is a key SequencerConfig parameter in Arbitrum Nitro that controls the size of the Sequencer's transaction queue. Sizing it well improves transaction throughput, bounds memory usage, absorbs peak load, and reduces the risk of dropped transactions.
queue-timeout tunes processing efficiency: it bounds how long transactions may sit in the queue, keeps the queue from backing up, avoids stale transactions, and improves overall responsiveness.
nonce-cache-size is another important SequencerConfig parameter, controlling how many nonce values the Sequencer caches. A well-chosen value speeds up transaction processing, manages memory use, and supports stable operation in high-throughput environments.
nonce-failure-cache-size bounds the cache of transactions that failed on nonce errors, cutting the time spent re-processing them.
go-ethereum/internal/ethapi/api.go
func (s *TransactionAPI) SendTransaction(ctx context.Context, args TransactionArgs) (common.Hash, error) {
// ... (body elided: fills in defaults, signs the transaction, then calls SubmitTransaction below)
// SubmitTransaction is a helper function that submits tx to txPool and logs a message.
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
// If the transaction fee cap is already specified, ensure the
// fee of the given transaction is _reasonable_.
if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
return common.Hash{}, err
}
if !b.UnprotectedAllowed() && !tx.Protected() {
// Ensure only eip155 signed transactions are submitted if EIP155Required is set.
return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
}
if err := b.SendTx(ctx, tx); err != nil {
return common.Hash{}, err
}
// Print a log with full tx details for manual investigations and interventions
head := b.CurrentBlock()
signer := types.MakeSigner(b.ChainConfig(), head.Number, head.Time)
from, err := types.Sender(signer, tx)
if err != nil {
return common.Hash{}, err
}
if tx.To() == nil {
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
} else {
log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
}
return tx.Hash(), nil
}
// Transaction pool API
func (a *APIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return a.b.EnqueueL2Message(ctx, signedTx, nil)
}
go-ethereum/arbitrum/backend.go
func (b *Backend) EnqueueL2Message(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
return b.arb.PublishTransaction(ctx, tx, options)
}
execution/gethexec/arb_interface.go
func (a *ArbInterface) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
return a.txPublisher.PublishTransaction(ctx, tx, options)
}
execution/gethexec/tx_pre_checker.go
func (c *TxPreChecker) PublishTransaction(ctx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
block := c.bc.CurrentBlock()
statedb, err := c.bc.StateAt(block.Root)
if err != nil {
return err
}
arbos, err := arbosState.OpenSystemArbosState(statedb, nil, true)
if err != nil {
return err
}
err = PreCheckTx(c.bc, c.bc.Config(), block, statedb, arbos, tx, options, c.config())
if err != nil {
return err
}
return c.TransactionPublisher.PublishTransaction(ctx, tx, options)
}
execution/gethexec/sequencer.go
func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
config := s.config()
// Only try to acquire Rlock and check for hard threshold if l1reader is not nil
// And hard threshold was enabled, this prevents spamming of read locks when not needed
if s.l1Reader != nil && config.ExpectedSurplusHardThreshold != "default" {
s.expectedSurplusMutex.RLock()
if s.expectedSurplusUpdated && s.expectedSurplus < int64(config.expectedSurplusHardThreshold) {
s.expectedSurplusMutex.RUnlock() // release the read lock before the early return
return errors.New("currently not accepting transactions due to expected surplus being below threshold")
}
s.expectedSurplusMutex.RUnlock()
}
sequencerBacklogGauge.Inc(1)
defer sequencerBacklogGauge.Dec(1)
_, forwarder := s.GetPauseAndForwarder()
if forwarder != nil {
err := forwarder.PublishTransaction(parentCtx, tx, options)
if !errors.Is(err, ErrNoSequencer) {
return err
}
}
if len(s.senderWhitelist) > 0 {
signer := types.LatestSigner(s.execEngine.bc.Config())
sender, err := types.Sender(signer, tx)
if err != nil {
return err
}
_, authorized := s.senderWhitelist[sender]
if !authorized {
return errors.New("transaction sender is not on the whitelist")
}
}
if tx.Type() >= types.ArbitrumDepositTxType || tx.Type() == types.BlobTxType {
// Should be unreachable for Arbitrum types due to UnmarshalBinary not accepting Arbitrum internal txs
// and we want to disallow BlobTxType since Arbitrum doesn't support EIP-4844 txs yet.
return types.ErrTxTypeNotSupported
}
txBytes, err := tx.MarshalBinary()
if err != nil {
return err
}
queueTimeout := config.QueueTimeout
queueCtx, cancelFunc := ctxWithTimeout(parentCtx, queueTimeout)
defer cancelFunc()
// Just to be safe, make sure we don't run over twice the queue timeout
abortCtx, cancel := ctxWithTimeout(parentCtx, queueTimeout*2)
defer cancel()
resultChan := make(chan error, 1)
queueItem := txQueueItem{
tx,
len(txBytes),
options,
resultChan,
&atomic.Bool{},
queueCtx,
time.Now(),
}
select {
case s.txQueue <- queueItem: // the tx enters the transaction queue
case <-queueCtx.Done():
return queueCtx.Err()
}
select {
case res := <-resultChan:
return res
case <-abortCtx.Done():
// We use abortCtx here and not queueCtx, because the QueueTimeout only applies to the background queue.
// We want to give the background queue as much time as possible to make a response.
err := abortCtx.Err()
if parentCtx.Err() == nil {
// If we've hit the abort deadline (as opposed to parentCtx being canceled), something went wrong.
log.Warn("Transaction sequencing hit abort deadline", "err", err, "submittedAt", queueItem.firstAppearance, "queueTimeout", queueTimeout, "txHash", tx.Hash())
}
return err
}
}
func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
var queueItems []txQueueItem
var totalBlockSize int
defer func() {
panicErr := recover()
if panicErr != nil {
log.Error("sequencer block creation panicked", "panic", panicErr, "backtrace", string(debug.Stack()))
// Return an internal error to any queue items we were trying to process
for _, item := range queueItems {
// This can race, but that's alright, worst case is a log line in returnResult
if !item.returnedResult.Load() {
item.returnResult(sequencerInternalError)
}
}
// Wait for the MaxBlockSpeed until attempting to create a block again
returnValue = true
}
}()
defer nonceFailureCacheSizeGauge.Update(int64(s.nonceFailures.Len()))
config := s.config()
// Clear out old nonceFailures
s.nonceFailures.Resize(config.NonceFailureCacheSize)
nextNonceExpiryTimer := s.expireNonceFailures()
defer func() {
// We wrap this in a closure as to not cache the current value of nextNonceExpiryTimer
if nextNonceExpiryTimer != nil {
nextNonceExpiryTimer.Stop()
}
}()
for {
var queueItem txQueueItem
if s.txRetryQueue.Len() > 0 { // first drain transactions that need to be retried
queueItem = s.txRetryQueue.Pop()
} else if len(queueItems) == 0 { // nothing batched yet: block until a queue item arrives
var nextNonceExpiryChan <-chan time.Time
if nextNonceExpiryTimer != nil {
nextNonceExpiryChan = nextNonceExpiryTimer.C
}
select {
case queueItem = <-s.txQueue:
case <-nextNonceExpiryChan:
// No need to stop the previous timer since it already elapsed
nextNonceExpiryTimer = s.expireNonceFailures()
continue
case <-s.onForwarderSet:
// Make sure this notification isn't outdated
_, forwarder := s.GetPauseAndForwarder()
if forwarder != nil {
s.nonceFailures.Clear()
}
continue
case <-ctx.Done():
return false
}
} else {
done := false
select {
case queueItem = <-s.txQueue:
default:
done = true
}
if done {
break
}
}
err := queueItem.ctx.Err()
if err != nil {
queueItem.returnResult(err)
continue
}
if queueItem.txSize > config.MaxTxDataSize { // txs over the configured limit are dropped
// This tx is too large
queueItem.returnResult(txpool.ErrOversizedData)
continue
}
if totalBlockSize+queueItem.txSize > config.MaxTxDataSize {
// This tx is too large to add to this batch
s.txRetryQueue.Push(queueItem)
// End the batch here, and put this tx into the next one
break
}
totalBlockSize += queueItem.txSize
queueItems = append(queueItems, queueItem) // add the tx to the current batch
}
s.nonceCache.Resize(config.NonceCacheSize) // Would probably be better in a config hook but this is basically free
s.nonceCache.BeginNewBlock()
queueItems = s.precheckNonces(queueItems, totalBlockSize)
txes := make([]*types.Transaction, len(queueItems))
hooks := s.makeSequencingHooks()
hooks.ConditionalOptionsForTx = make([]*arbitrum_types.ConditionalOptions, len(queueItems))
totalBlockSize = 0 // recompute the total block size as a second check
for i, queueItem := range queueItems {
txes[i] = queueItem.tx
totalBlockSize = arbmath.SaturatingAdd(totalBlockSize, queueItem.txSize)
hooks.ConditionalOptionsForTx[i] = queueItem.options
}
if totalBlockSize > config.MaxTxDataSize { // if still over the limit, push the whole batch back for the next round
for _, queueItem := range queueItems {
s.txRetryQueue.Push(queueItem)
}
log.Error(
"put too many transactions in a block",
"numTxes", len(queueItems),
"totalBlockSize", totalBlockSize,
"maxTxDataSize", config.MaxTxDataSize,
)
return false
}
if s.handleInactive(ctx, queueItems) {
return false
}
timestamp := time.Now().Unix()
s.L1BlockAndTimeMutex.Lock()
l1Block := s.l1BlockNumber
l1Timestamp := s.l1Timestamp
s.L1BlockAndTimeMutex.Unlock()
if s.l1Reader != nil && (l1Block == 0 || math.Abs(float64(l1Timestamp)-float64(timestamp)) > config.MaxAcceptableTimestampDelta.Seconds()) {
for _, queueItem := range queueItems {
s.txRetryQueue.Push(queueItem)
}
log.Error(
"cannot sequence: unknown L1 block or L1 timestamp too far from local clock time",
"l1Block", l1Block,
"l1Timestamp", time.Unix(int64(l1Timestamp), 0),
"localTimestamp", time.Unix(int64(timestamp), 0),
)
return true
}
header := &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_L2Message,
Poster: l1pricing.BatchPosterAddress,
BlockNumber: l1Block,
Timestamp: uint64(timestamp),
RequestId: nil,
L1BaseFee: nil,
}
start := time.Now()
var (
block *types.Block
err error
)
if config.EnableProfiling { // with enable-profiling=true, the Sequencer records performance data about its operation
block, err = s.execEngine.SequenceTransactionsWithProfiling(header, txes, hooks)
} else {
block, err = s.execEngine.SequenceTransactions(header, txes, hooks) // the usual production path
}
}
elapsed := time.Since(start)
blockCreationTimer.Update(elapsed)
if elapsed >= time.Second*5 {
var blockNum *big.Int
if block != nil {
blockNum = block.Number()
}
log.Warn("took over 5 seconds to sequence a block", "elapsed", elapsed, "numTxes", len(txes), "success", block != nil, "l2Block", blockNum)
}
if err == nil && len(hooks.TxErrors) != len(txes) {
err = fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
}
if errors.Is(err, execution.ErrRetrySequencer) {
log.Warn("error sequencing transactions", "err", err)
// we changed roles
// forward if we have where to
if s.handleInactive(ctx, queueItems) {
return false
}
// try to add back to queue otherwise
for _, item := range queueItems {
s.txRetryQueue.Push(item)
}
return false
}
if err != nil {
if errors.Is(err, context.Canceled) {
// thread closed. We'll later try to forward these messages.
for _, item := range queueItems {
s.txRetryQueue.Push(item)
}
return true // don't return failure to avoid retrying immediately
}
log.Error("error sequencing transactions", "err", err)
for _, queueItem := range queueItems {
queueItem.returnResult(err)
}
return false
}
if block != nil {
successfulBlocksCounter.Inc(1)
s.nonceCache.Finalize(block)
}
madeBlock := false
for i, err := range hooks.TxErrors {
if err == nil {
madeBlock = true
}
queueItem := queueItems[i]
if errors.Is(err, core.ErrGasLimitReached) {
// There is not enough gas left in the block for this tx.
if madeBlock {
// An earlier tx already made it into this block; retry this one in a new block.
s.txRetryQueue.Push(queueItem)
continue
}
}
if errors.Is(err, core.ErrIntrinsicGas) {
// Strip the additional error detail, which is misleading here because of L1 data gas.
err = core.ErrIntrinsicGas
}
var nonceError NonceError
if errors.As(err, &nonceError) && nonceError.txNonce > nonceError.stateNonce {
s.nonceFailures.Add(nonceError, queueItem)
continue
}
queueItem.returnResult(err)
}
return madeBlock
}
execution/gethexec/executionengine.go
func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
return s.sequencerWrapper(func() (*types.Block, error) {
hooks.TxErrors = nil
return s.sequenceTransactionsWithBlockMutex(header, txes, hooks)
})
}
func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) {
lastBlockHeader, err := s.getCurrentHeader()
if err != nil {
return nil, err
}
statedb, err := s.bc.StateAt(lastBlockHeader.Root)
if err != nil {
return nil, err
}
delayedMessagesRead := lastBlockHeader.Nonce.Uint64()
startTime := time.Now()
block, receipts, err := arbos.ProduceBlockAdvanced(
header,
txes,
delayedMessagesRead,
lastBlockHeader,
statedb,
s.bc,
s.bc.Config(),
hooks,
false,
)
if err != nil {
return nil, err
}
blockCalcTime := time.Since(startTime)
if len(hooks.TxErrors) != len(txes) {
return nil, fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
}
if len(receipts) == 0 {
return nil, nil
}
allTxsErrored := true
for _, err := range hooks.TxErrors {
if err == nil {
allTxsErrored = false
break
}
}
if allTxsErrored {
return nil, nil
}
msg, err := MessageFromTxes(header, txes, hooks.TxErrors)
if err != nil {
return nil, err
}
pos, err := s.BlockNumberToMessageIndex(lastBlockHeader.Number.Uint64() + 1)
if err != nil {
return nil, err
}
msgWithMeta := arbostypes.MessageWithMetadata{
Message: msg,
DelayedMessagesRead: delayedMessagesRead,
}
msgResult, err := s.resultFromHeader(block.Header())
if err != nil {
return nil, err
}
err = s.consensus.WriteMessageFromSequencer(pos, msgWithMeta, *msgResult)
if err != nil {
return nil, err
}
// Only write the block after we have written the message, so that if the node
// dies in between, it will recover on startup by regenerating the missing block.
err = s.appendBlock(block, statedb, receipts, blockCalcTime)
if err != nil {
return nil, err
}
s.cacheL1PriceDataOfMsg(pos, receipts, block, false)
return block, nil
}
Arbitrum handles transactions differently from a conventional Ethereum chain. It sequences transactions first come, first served, so its architecture has no mempool in the traditional sense.
Because transactions are processed in the order the Sequencer receives them, Arbitrum transactions do not need a priority fee; if a transaction does include one, the fee is refunded to the transaction's origin address at the end of execution.
Note: since Arbitrum relies on efficient transaction propagation rather than fee bidding, production deployments should keep the network path between external nodes and the sequencer node as short, stable, and low-latency as possible, so that the sequencer receives and processes transactions quickly.
With no mempool and first-come-first-served ordering, gas-price bidding parameters do not affect processing order, and since gas price cannot influence execution priority, users have no incentive to bid it up.
The total fee charged for a transaction is the L2 base fee multiplied by the L2 gas used, plus the L1 calldata fee.
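As a worked example of that formula, with purely illustrative numbers (all values in wei):

```go
package main

import "fmt"

// totalFee applies the formula above: L2 base fee times L2 gas used, plus the
// L1 calldata fee. All values are in wei; the numbers in main are made up.
func totalFee(l2BaseFee, l2GasUsed, l1CalldataFee uint64) uint64 {
	return l2BaseFee*l2GasUsed + l1CalldataFee
}

func main() {
	// 0.1 gwei L2 base fee, a plain 21000-gas transfer,
	// and an assumed 50 gwei-scale L1 calldata charge.
	fmt.Println(totalFee(100_000_000, 21_000, 50_000_000_000)) // 2150000000000
}
```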