diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 9b742da64..1e9f75717 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -1,6 +1,7 @@ package core import ( + "context" "errors" "fmt" "github.com/ethereum/go-ethereum/metrics" @@ -201,7 +202,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) { p.inConfirmStage2 = false statedb.PrepareForParallel() - p.allTxReqs = make([]*ParallelTxRequest, 0, txNum) + p.allTxReqs = make([]*ParallelTxRequest, txNum) for _, slot := range p.slotState { slot.pendingTxReqList = make([]*ParallelTxRequest, 0) @@ -872,48 +873,110 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat p.commonTxs = make([]*types.Transaction, 0, txNum) p.receipts = make([]*types.Receipt, 0, txNum) - for i, tx := range allTxs { - // can be moved it into slot for efficiency, but signer is not concurrent safe - // Parallel Execution 1.0&2.0 is for full sync mode, Nonce PreCheck is not necessary - // And since we will do out-of-order execution, the Nonce PreCheck could fail. - // We will disable it and leave it to Parallel 3.0 which is for validator mode - msg, err := TransactionToMessage(tx, signer, header.BaseFee) - if err != nil { - return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) - } + parallelNum := p.parallelNum + + if txNum > parallelNum*2 && txNum >= 4 { + var wg sync.WaitGroup + errChan := make(chan error) - // find the latestDepTx from TxDAG or latestExcludedTx - latestDepTx := -1 - if dep := types.TxDependency(txDAG, i); len(dep) > 0 { - latestDepTx = int(dep[len(dep)-1]) + begin := 0 + // first try to find latestExcludeTx, as for opBNB, they are the first consecutive txs. + for idx := 0; idx < len(allTxs); idx++ { + if txDAG != nil && txDAG.TxDep(idx).CheckFlag(types.ExcludedTxFlag) { + if err := p.transferTxs(allTxs, idx, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil { + return nil, nil, 0, err + } + latestExcludedTx = idx + } else { + begin = idx + break + } } - if latestDepTx < latestExcludedTx { - latestDepTx = latestExcludedTx + + // Create a cancelable context + ctx, cancel := context.WithCancel(context.Background()) + + // Create a pool of workers + transactionsPerWorker := (len(allTxs) - begin) / parallelNum + + // Create a pool of workers + for i := 0; i < parallelNum; i++ { + wg.Add(1) + go func(start, end int, signer types.Signer, blk *types.Block, sdb *state.StateDB, cfg vm.Config, usedGas *uint64) { + defer wg.Done() + for j := start; j < end; j++ { + select { + case <-ctx.Done(): + return // Exit the goroutine if the context is canceled + default: + if err := p.transferTxs(allTxs, j, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil { + errChan <- err + cancel() // Cancel the context to stop other goroutines + return + } + } + } + }(begin+i*transactionsPerWorker, begin+(i+1)*transactionsPerWorker, signer, block, statedb, cfg, usedGas) } - // parallel start, wrap an exec message, which will be dispatched to a slot - txReq := &ParallelTxRequest{ - txIndex: i, - baseStateDB: statedb, - staticSlotIndex: -1, - tx: tx, - gasLimit: block.GasLimit(), // gp.Gas(). - msg: msg, - block: block, - vmConfig: cfg, - usedGas: usedGas, - curTxChan: make(chan int, 1), - runnable: 1, // 0: not runnable, 1: runnable - useDAG: txDAG != nil, + // Distribute any remaining transactions + for i := begin + parallelNum*transactionsPerWorker; i < len(allTxs); i++ { + if err := p.transferTxs(allTxs, i, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil { + errChan <- err + cancel() // Cancel the context to stop other goroutines + } } - txReq.executedNum.Store(0) - txReq.conflictIndex.Store(-2) - if latestDepTx >= 0 { - txReq.conflictIndex.Store(int32(latestDepTx)) + + // Wait for all workers to finish and handle errors + go func() { + wg.Wait() + close(errChan) + }() + + for err := range errChan { + return nil, nil, 0, err } - p.allTxReqs = append(p.allTxReqs, txReq) - if txDAG != nil && txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) { - latestExcludedTx = i + // + } else { + for i, tx := range allTxs { + msg, err := TransactionToMessage(tx, signer, header.BaseFee) + if err != nil { + return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) + } + + // find the latestDepTx from TxDAG or latestExcludedTx + latestDepTx := -1 + if dep := types.TxDependency(txDAG, i); len(dep) > 0 { + latestDepTx = int(dep[len(dep)-1]) + } + if latestDepTx < latestExcludedTx { + latestDepTx = latestExcludedTx + } + + // parallel start, wrap an exec message, which will be dispatched to a slot + txReq := &ParallelTxRequest{ + txIndex: i, + baseStateDB: statedb, + staticSlotIndex: -1, + tx: tx, + gasLimit: block.GasLimit(), // gp.Gas(). + msg: msg, + block: block, + vmConfig: cfg, + usedGas: usedGas, + curTxChan: make(chan int, 1), + runnable: 1, // 0: not runnable, 1: runnable + useDAG: txDAG != nil, + } + txReq.executedNum.Store(0) + txReq.conflictIndex.Store(-2) + if latestDepTx >= 0 { + txReq.conflictIndex.Store(int32(latestDepTx)) + } + p.allTxReqs[i] = txReq + if txDAG != nil && txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) { + latestExcludedTx = i + } } } allTxCount := len(p.allTxReqs) @@ -1064,6 +1127,50 @@ func (p *ParallelStateProcessor) handlePendingResultLoop() { } } +func (p *ParallelStateProcessor) transferTxs(txs types.Transactions, i int, signer types.Signer, block *types.Block, statedb *state.StateDB, cfg vm.Config, usedGas *uint64, latestExcludedTx int) error { + if p.allTxReqs[i] != nil { + return nil + } + tx := txs[i] + txDAG := cfg.TxDAG + msg, err := TransactionToMessage(tx, signer, block.Header().BaseFee) + if err != nil { + return fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) + } + + // find the latestDepTx from TxDAG or latestExcludedTx + latestDepTx := -1 + if dep := types.TxDependency(txDAG, i); len(dep) > 0 { + latestDepTx = int(dep[len(dep)-1]) + } + if latestDepTx < latestExcludedTx { + latestDepTx = latestExcludedTx + } + + // parallel start, wrap an exec message, which will be dispatched to a slot + txReq := &ParallelTxRequest{ + txIndex: i, + baseStateDB: statedb, + staticSlotIndex: -1, + tx: tx, + gasLimit: block.GasLimit(), // gp.Gas(). + msg: msg, + block: block, + vmConfig: cfg, + usedGas: usedGas, + curTxChan: make(chan int, 1), + runnable: 1, // 0: not runnable, 1: runnable + useDAG: txDAG != nil, + } + txReq.executedNum.Store(0) + txReq.conflictIndex.Store(-2) + if latestDepTx >= 0 { + txReq.conflictIndex.Store(int32(latestDepTx)) + } + p.allTxReqs[i] = txReq + return nil +} + func applyTransactionStageExecution(msg *Message, gp *GasPool, statedb *state.ParallelStateDB, evm *vm.EVM, delayGasFee bool) (*vm.EVM, *ExecutionResult, error) { // Create a new context to be used in the EVM environment. txContext := NewEVMTxContext(msg)