diff --git a/accounts/abi/bind/util_test.go b/accounts/abi/bind/util_test.go index 592465f2ac..87917d43fa 100644 --- a/accounts/abi/bind/util_test.go +++ b/accounts/abi/bind/util_test.go @@ -83,6 +83,7 @@ func TestWaitDeployed(t *testing.T) { // Send and mine the transaction. backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() select { @@ -117,6 +118,7 @@ func TestWaitDeployedCornerCases(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() notContractCreation := errors.New("tx is not contract creation") if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() { @@ -135,5 +137,6 @@ func TestWaitDeployedCornerCases(t *testing.T) { }() backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined cancel() } diff --git a/core/txpool/legacypool/cache_for_miner.go b/core/txpool/legacypool/cache_for_miner.go index 4d1ed2628d..9a2e2b0ba4 100644 --- a/core/txpool/legacypool/cache_for_miner.go +++ b/core/txpool/legacypool/cache_for_miner.go @@ -5,8 +5,10 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" + "github.com/holiman/uint256" ) var ( @@ -20,12 +22,18 @@ type cacheForMiner struct { pending map[common.Address]map[*types.Transaction]struct{} locals map[common.Address]bool addrLock sync.Mutex + + allCache map[common.Address][]*txpool.LazyTransaction + filteredCache map[common.Address][]*txpool.LazyTransaction + cacheLock sync.Mutex } func newCacheForMiner() *cacheForMiner { return &cacheForMiner{ - pending: make(map[common.Address]map[*types.Transaction]struct{}), - locals: make(map[common.Address]bool), + pending: make(map[common.Address]map[*types.Transaction]struct{}), + locals: make(map[common.Address]bool), + allCache: make(map[common.Address][]*txpool.LazyTransaction), + filteredCache: make(map[common.Address][]*txpool.LazyTransaction), } } @@ -67,8 +75,9 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) { } } -func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { +func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) { pending := make(map[common.Address]types.Transactions) + pc.txLock.Lock() for addr, txlist := range pc.pending { pending[addr] = make(types.Transactions, 0, len(txlist)) @@ -77,11 +86,71 @@ func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { } } pc.txLock.Unlock() - for _, txs := range pending { + + // convert pending to lazyTransactions + filteredLazy := make(map[common.Address][]*txpool.LazyTransaction) + allLazy := make(map[common.Address][]*txpool.LazyTransaction) + for addr, txs := range pending { // sorted by nonce sort.Sort(types.TxByNonce(txs)) + filterd := filter(txs, addr) + if len(txs) > 0 { + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i, tx := range txs { + lazies[i] = &txpool.LazyTransaction{ + Pool: pool, + Hash: tx.Hash(), + Tx: tx, + Time: tx.Time(), + GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), + GasTipCap: uint256.MustFromBig(tx.GasTipCap()), + Gas: tx.Gas(), + BlobGas: tx.BlobGas(), + } + } + allLazy[addr] = lazies + filteredLazy[addr] = lazies[:len(filterd)] + } } + + pc.cacheLock.Lock() + pc.filteredCache = filteredLazy + pc.allCache = allLazy + pc.cacheLock.Unlock() +} + +func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction { + pc.cacheLock.Lock() + pending := pc.allCache + if filtered { + pending = pc.filteredCache + } + pc.cacheLock.Unlock() return pending + + //pendingLazy := make(map[common.Address][]*txpool.LazyTransaction) + //var txnum = 0 + //for addr, txs := range pending { + // // If the miner requests tip enforcement, cap the lists now + // if enforceTip && !pc.IsLocal(addr) { + // for i, tx := range txs { + // if tx.Tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { + // txs = txs[:i] + // break + // } + // } + // } + // if len(txs) > 0 { + // lazies := make([]*txpool.LazyTransaction, len(txs)) + // for i, tx := range txs { + // lazies[i] = tx + // txnum++ + // } + // pendingLazy[addr] = lazies + // } + //} + //log.Info("cacheForMiner dump", "duration", time.Since(start), "accounts", len(pending), "txs", txnum) + //return pendingLazy } func (pc *cacheForMiner) markLocal(addr common.Address) { @@ -91,7 +160,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) { pc.locals[addr] = true } -func (pc *cacheForMiner) isLocal(addr common.Address) bool { +func (pc *cacheForMiner) IsLocal(addr common.Address) bool { pc.addrLock.Lock() defer pc.addrLock.Unlock() return pc.locals[addr] diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index a539253fa3..7953eda76e 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -349,6 +349,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A // Set the basic pool parameters pool.gasTip.Store(uint256.NewInt(gasTip)) + // set dumper + pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee)) + // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not // fully synced). @@ -383,9 +386,27 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } pool.wg.Add(1) go pool.loop() + go pool.loopOfSync() return nil } +func (pool *LegacyPool) loopOfSync() { + ticker := time.NewTicker(200 * time.Millisecond) + for { + select { + case <-pool.reorgShutdownCh: + return + case <-ticker.C: + gasTip := pool.gasTip.Load() + currHead := pool.currentHead.Load() + if gasTip == nil || currHead == nil { + continue + } + pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee)) + } + } +} + // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -624,57 +645,35 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { - // TODO need to confirm - defer func(t0 time.Time) { - getPendingDurationTimer.Update(time.Since(t0)) - }(time.Now()) + empty := txpool.PendingFilter{} + if filter == empty { + // return all pending transactions, no filtering + return pool.pendingCache.dump(false) + } // If only blob transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if filter.OnlyBlobTxs { return nil } + defer func(t0 time.Time) { + getPendingDurationTimer.Update(time.Since(t0)) + }(time.Now()) + // It is a bit tricky here, we don't do the filtering here. + return pool.pendingCache.dump(true) +} - // Convert the new uint256.Int types to the old big.Int ones used by the legacy pool - var ( - minTipBig *big.Int - baseFeeBig *big.Int - ) - if filter.MinTip != nil { - minTipBig = filter.MinTip.ToBig() - } - if filter.BaseFee != nil { - baseFeeBig = filter.BaseFee.ToBig() - } - pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) - for addr, txs := range pool.pendingCache.dump() { - - // If the miner requests tip enforcement, cap the lists now - if minTipBig != nil && !pool.locals.contains(addr) { +func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions { + return func(txs types.Transactions, addr common.Address) types.Transactions { + if !pool.pendingCache.IsLocal(addr) { for i, tx := range txs { - if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { + if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { txs = txs[:i] break } } } - if len(txs) > 0 { - lazies := make([]*txpool.LazyTransaction, len(txs)) - for i := 0; i < len(txs); i++ { - lazies[i] = &txpool.LazyTransaction{ - Pool: pool, - Hash: txs[i].Hash(), - Tx: txs[i], - Time: txs[i].Time(), - GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()), - GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()), - Gas: txs[i].Gas(), - BlobGas: txs[i].BlobGas(), - } - } - pending[addr] = lazies - } + return txs } - return pending } // Locals retrieves the accounts currently considered local by the pool. @@ -840,6 +839,16 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e } // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { + currHead := pool.currentHead.Load() + if currHead != nil && currHead.BaseFee != nil && pool.priced.NeedReheap(currHead) { + if pool.chainconfig.IsLondon(new(big.Int).Add(currHead.Number, big.NewInt(1))) { + baseFee := eip1559.CalcBaseFee(pool.chainconfig, currHead, currHead.Time+1) + pool.priced.SetBaseFee(baseFee) + } + pool.priced.Reheap() + pool.priced.currHead = currHead + } + // If the new transaction is underpriced, don't accept it if !isLocal && pool.priced.Underpriced(tx) { log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) @@ -1110,7 +1119,9 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { // to the add is finished. Only use this during tests for determinism! func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error { defer func(t0 time.Time) { - addTimer.UpdateSince(t0) + if len(txs) > 0 { + addTimer.Update(time.Since(t0) / time.Duration(len(txs))) + } }(time.Now()) // Do not treat as local if local transactions have been disabled local = local && !pool.config.NoLocals @@ -1147,7 +1158,9 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error pool.mu.Lock() t0 := time.Now() newErrs, dirtyAddrs := pool.addTxsLocked(news, local) - addWithLockTimer.UpdateSince(t0) + if len(news) > 0 { + addWithLockTimer.Update(time.Since(t0) / time.Duration(len(news))) + } pool.mu.Unlock() var nilSlot = 0 @@ -1403,6 +1416,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, reorgDurationTimer.Update(time.Since(t0)) if reset != nil { reorgresetTimer.UpdateSince(t0) + if reset.newHead != nil { + log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64()) + } } }(time.Now()) defer close(done) @@ -1451,10 +1467,12 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1) pool.priced.SetBaseFee(pendingBaseFee) - } else { - pool.priced.Reheap() } } + gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee + go func() { + pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee)) + }() // Update all accounts to the latest known pending nonce nonces := make(map[common.Address]uint64, len(pool.pending)) for addr, list := range pool.pending { diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index df25aff5e5..6b900ed2c0 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -2073,6 +2073,7 @@ func TestDualHeapEviction(t *testing.T) { add(false) for baseFee = 0; baseFee <= 1000; baseFee += 100 { pool.priced.SetBaseFee(big.NewInt(int64(baseFee))) + pool.priced.Reheap() add(true) check(highCap, "fee cap") add(false) diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index 5aee83bb37..6b823a4a73 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -549,6 +549,7 @@ func (h *priceHeap) Pop() interface{} { // better candidates for inclusion while in other cases (at the top of the baseFee peak) // the floating heap is better. When baseFee is decreasing they behave similarly. type pricedList struct { + currHead *types.Header // Current block header for effective tip calculation // Number of stale price points to (re-heap trigger). stales atomic.Int64 @@ -667,6 +668,10 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { return drop, true } +func (l *pricedList) NeedReheap(currHead *types.Header) bool { + return l.currHead == nil || currHead == nil || currHead.Hash().Cmp(l.currHead.Hash()) != 0 +} + // Reheap forcibly rebuilds the heap based on the current remote transaction set. func (l *pricedList) Reheap() { l.reheapMu.Lock() @@ -698,5 +703,4 @@ func (l *pricedList) Reheap() { // necessary to call right before SetBaseFee when processing a new block. func (l *pricedList) SetBaseFee(baseFee *big.Int) { l.urgent.baseFee = baseFee - l.Reheap() } diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 7dc4cd5f37..e24cbaf25e 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -400,6 +400,15 @@ func (f *TxFetcher) Stop() { close(f.quit) } +func (f *TxFetcher) IsWorking() (bool, error) { + select { + case <-f.quit: + return false, errTerminated + default: + return true, nil + } +} + func (f *TxFetcher) loop() { var ( waitTimer = new(mclock.Timer) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 6a11bf3689..7f36ae722d 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -25,10 +25,31 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/p2p/enode" ) +// TxQueueSize is the size of the transaction queue used to enqueue transactions +const ( + TxQueueSize = 16 +) + +// enqueueTx is a channel to enqueue transactions in parallel. +// It is used to improve the performance of transaction enqueued. +var enqueueTx = make(chan func(), TxQueueSize) + +func init() { + // run the transaction enqueuing loop + for i := 0; i < TxQueueSize; i++ { + go func() { + for enqueue := range enqueueTx { + enqueue() + } + }() + } +} + // ethHandler implements the eth.Backend interface to handle the various network // packets that are sent as replies or broadcasts. type ethHandler handler @@ -92,16 +113,28 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return errors.New("disallowed broadcast blob transaction") } } - return h.txFetcher.Enqueue(peer.ID(), *packet, false) + return asyncEnqueueTx(peer, *packet, h.txFetcher, false) case *eth.PooledTransactionsResponse: - return h.txFetcher.Enqueue(peer.ID(), *packet, true) + return asyncEnqueueTx(peer, *packet, h.txFetcher, true) default: return fmt.Errorf("unexpected eth packet type: %T", packet) } } +func asyncEnqueueTx(peer *eth.Peer, txs []*types.Transaction, fetcher *fetcher.TxFetcher, directed bool) error { + if working, err := fetcher.IsWorking(); !working { + return err + } + enqueueTx <- func() { + if err := fetcher.Enqueue(peer.ID(), txs, directed); err != nil { + peer.Log().Warn("Failed to enqueue transaction", "err", err) + } + } + return nil +} + // handleBlockAnnounces is invoked from a peer's message handler when it transmits a // batch of block announcements for the local node to process. func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, numbers []uint64) error { diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 002d222e64..89c848a429 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -34,6 +34,8 @@ const ( var ( txAnnounceAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/abandon", nil) txBroadcastAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/abandon", nil) + txP2PAnnQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/ann/queue", nil) + txP2PBroadQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/broad/queue", nil) ) // safeGetPeerIP @@ -133,6 +135,8 @@ func (p *Peer) broadcastTransactions() { queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])] } + txP2PBroadQueueGauge.Update(int64(len(queue))) + case <-done: done = nil @@ -208,6 +212,8 @@ func (p *Peer) announceTransactions() { queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])] } + txP2PAnnQueueGauge.Update(int64(len(queue))) + case <-done: done = nil diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index ffd78b0594..fc7c2a18ea 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -39,11 +39,13 @@ const ( // maxQueuedTxs is the maximum number of transactions to queue up before dropping // older broadcasts. - maxQueuedTxs = 4096 + // we need a higher limit to support 10k txs in a block + maxQueuedTxs = 98304 // maxQueuedTxAnns is the maximum number of transaction announcements to queue up // before dropping older announcements. - maxQueuedTxAnns = 4096 + // we need a higher limit to support 10k txs in a block + maxQueuedTxAnns = 98304 // maxQueuedBlocks is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index 1df0a73150..5c137d4079 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -178,6 +178,8 @@ func (n *Backend) Close() error { // Commit seals a block and moves the chain forward to a new empty block. func (n *Backend) Commit() common.Hash { + // wait for the transactions to be sync into cache + time.Sleep(350 * time.Millisecond) return n.beacon.Commit() } diff --git a/ethclient/simulated/backend_test.go b/ethclient/simulated/backend_test.go index a8fd7913c3..9307e2105a 100644 --- a/ethclient/simulated/backend_test.go +++ b/ethclient/simulated/backend_test.go @@ -214,6 +214,7 @@ func TestForkResendTx(t *testing.T) { t.Fatalf("could not create transaction: %v", err) } client.SendTransaction(ctx, tx) + time.Sleep(1 * time.Second) sim.Commit() // 3. diff --git a/miner/worker.go b/miner/worker.go index c63fb81c23..2a1f8403e2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1287,7 +1287,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs diff --git a/params/bootnodes.go b/params/bootnodes.go index 62cb287da5..c080114451 100644 --- a/params/bootnodes.go +++ b/params/bootnodes.go @@ -32,6 +32,7 @@ var OpBNBMainnetBootnodes = []string{ var OpBNBTestnetBootnodes = []string{ "enr:-KO4QKFOBDW--pF4pFwv3Al_jiLOITj_Y5mr1Ajyy2yxHpFtNcBfkZEkvWUxAKXQjWALZEFxYHooU88JClyzA00e8YeGAYtBOOZig2V0aMfGhE0ZYGqAgmlkgnY0gmlwhDREiqaJc2VjcDI1NmsxoQM8pC_6wwTr5N2Q-yXQ1KGKsgz9i9EPLk8Ata65pUyYG4RzbmFwwIN0Y3CCdl-DdWRwgnZf", "enr:-KO4QFJc0KR09ye818GT2kyN9y6BAGjhz77sYimxn85jJf2hOrNqg4X0b0EsS-_ssdkzVpavqh6oMX7W5Y81xMRuEayGAYtBSiK9g2V0aMfGhE0ZYGqAgmlkgnY0gmlwhANzx96Jc2VjcDI1NmsxoQPwA1XHfWGd4umIt7j3Fc7hKq_35izIWT_9yiN_tX8lR4RzbmFwwIN0Y3CCdl-DdWRwgnZf", + "enode://c57adc1e28cc11ee9c64b5e70b8f505e890bb0eb5685423781cdf1be7655052fe318ab7562270ae2266cf28cfe982ff9ac9a89546ddc895778acabf306e99b12@3.229.26.240:0?discport=30304", } // MainnetBootnodes are the enode URLs of the P2P bootstrap nodes running on