diff --git a/cmd/evm/blockrunner.go b/cmd/evm/blockrunner.go index c5d836e0ea..1eef34dd04 100644 --- a/cmd/evm/blockrunner.go +++ b/cmd/evm/blockrunner.go @@ -86,7 +86,7 @@ func blockTestCmd(ctx *cli.Context) error { continue } test := tests[name] - if err := test.Run(false, rawdb.HashScheme, tracer, func(res error, chain *core.BlockChain) { + if err := test.Run(false, rawdb.HashScheme, tracer, false, func(res error, chain *core.BlockChain) { if ctx.Bool(DumpFlag.Name) { if state, _ := chain.State(); state != nil { fmt.Println(string(state.Dump(nil))) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 2e55edcaf3..784be6d7f3 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -170,6 +170,8 @@ var ( utils.RollupComputePendingBlock, utils.RollupHaltOnIncompatibleProtocolVersionFlag, utils.RollupSuperchainUpgradesFlag, + utils.ParallelTxDAGFlag, + utils.ParallelTxDAGSenderPrivFlag, configFileFlag, utils.LogDebugFlag, utils.LogBacktraceAtFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e3a4a42f21..ac46716bc6 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1099,11 +1099,24 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Category: flags.MetricsCategory, } + ParallelTxDAGFlag = &cli.BoolFlag{ + Name: "parallel.txdag", + Usage: "Enable the experimental parallel TxDAG generation (default = false)", + Category: flags.VMCategory, + } + VMOpcodeOptimizeFlag = &cli.BoolFlag{ Name: "vm.opcode.optimize", Usage: "enable opcode optimization", Category: flags.VMCategory, } + + ParallelTxDAGSenderPrivFlag = &cli.StringFlag{ + Name: "parallel.txdagsenderpriv", + Usage: "private key of the sender who sends the TxDAG transactions", + Value: "", + Category: flags.VMCategory, + } ) var ( @@ -1989,6 +2002,17 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.EnablePreimageRecording = ctx.Bool(VMEnableDebugFlag.Name) } + if ctx.IsSet(ParallelTxDAGFlag.Name) { + cfg.EnableParallelTxDAG = ctx.Bool(ParallelTxDAGFlag.Name) + } + + if ctx.IsSet(ParallelTxDAGSenderPrivFlag.Name) { + priHex := ctx.String(ParallelTxDAGSenderPrivFlag.Name) + if cfg.Miner.ParallelTxDAGSenderPriv, err = crypto.HexToECDSA(priHex); err != nil { + Fatalf("Failed to parse txdag private key of %s, err: %v", ParallelTxDAGSenderPrivFlag.Name, err) + } + } + if ctx.IsSet(VMOpcodeOptimizeFlag.Name) { cfg.EnableOpcodeOptimizing = ctx.Bool(VMOpcodeOptimizeFlag.Name) if cfg.EnableOpcodeOptimizing { diff --git a/core/blockchain.go b/core/blockchain.go index 511d4db8a9..516fe23747 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -92,6 +92,8 @@ var ( triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil) innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil) + txDAGGenerateTimer = metrics.NewRegisteredTimer("chain/block/txdag/gen", nil) + blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil) mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil) @@ -298,6 +300,9 @@ type BlockChain struct { processor Processor // Block transaction processor interface forker *ForkChoice vmConfig vm.Config + + // parallel EVM related + enableTxDAG bool } // NewBlockChain returns a fully initialised block chain using information @@ -1987,8 +1992,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation) - blockExecutionTimer.Update(ptime) // The time spent on block execution - blockValidationTimer.Update(vtime) // The time spent on block validation + txDAGGenerateTimer.Update(statedb.TxDAGGenerate) + blockExecutionTimer.Update(ptime) // The time spent on block execution + blockValidationTimer.Update(vtime) // The time spent on block validation innerExecutionTimer.Update(DebugInnerExecutionDuration) @@ -2758,3 +2764,12 @@ func createDelFn(bc *BlockChain) func(db ethdb.KeyValueWriter, hash common.Hash, func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) { bc.hc.SetHead(headNumber, nil, createDelFn(bc)) } + +func (bc *BlockChain) TxDAGEnabledWhenMine() bool { + return bc.enableTxDAG +} + +func (bc *BlockChain) SetupTxDAGGeneration() { + log.Info("node enable TxDAG feature") + bc.enableTxDAG = true +} diff --git a/core/state/journal.go b/core/state/journal.go index 6cdc1fc868..a0e4b2dd53 100644 --- a/core/state/journal.go +++ b/core/state/journal.go @@ -163,7 +163,7 @@ func (ch createObjectChange) dirtied() *common.Address { func (ch resetObjectChange) revert(s *StateDB) { s.setStateObject(ch.prev) if !ch.prevdestruct { - delete(s.stateObjectsDestruct, ch.prev.address) + s.removeStateObjectsDestruct(ch.prev.address) } if ch.prevAccount != nil { s.accounts[ch.prev.addrHash] = ch.prevAccount diff --git a/core/state/state_object.go b/core/state/state_object.go index 8696557845..ae534ed215 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "io" + "slices" "sync" "time" @@ -70,6 +71,11 @@ type stateObject struct { origin *types.StateAccount // Account original data without any change applied, nil means it was not existent data types.StateAccount // Account data with all mutations applied in the scope of block + // dirty account state + dirtyBalance *uint256.Int + dirtyNonce *uint64 + dirtyCodeHash []byte + // Write caches. trie Trie // storage trie, which becomes non-nil on first access code Code // contract bytecode, which gets set when code is loaded @@ -96,7 +102,7 @@ type stateObject struct { // empty returns whether the account is considered empty. func (s *stateObject) empty() bool { - return s.data.Nonce == 0 && s.data.Balance.IsZero() && bytes.Equal(s.data.CodeHash, types.EmptyCodeHash.Bytes()) + return s.Nonce() == 0 && s.Balance().IsZero() && bytes.Equal(s.CodeHash(), types.EmptyCodeHash.Bytes()) } // newObject creates a state object. @@ -108,7 +114,7 @@ func newObject(db *StateDB, address common.Address, acct *types.StateAccount) *s if acct == nil { acct = types.NewEmptyStateAccount() } - return &stateObject{ + s := &stateObject{ db: db, address: address, addrHash: crypto.Keccak256Hash(address[:]), @@ -119,6 +125,15 @@ func newObject(db *StateDB, address common.Address, acct *types.StateAccount) *s dirtyStorage: make(Storage), created: created, } + + // dirty data when create a new account + if created { + s.dirtyBalance = new(uint256.Int).Set(acct.Balance) + s.dirtyNonce = new(uint64) + *s.dirtyNonce = acct.Nonce + s.dirtyCodeHash = acct.CodeHash + } + return s } // EncodeRLP implements rlp.Encoder. @@ -188,7 +203,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash { // 1) resurrect happened, and new slot values were set -- those should // have been handles via pendingStorage above. // 2) we don't have new values, and can deliver empty response back - if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed { + if _, destructed := s.db.getStateObjectsDestruct(s.address); destructed { return common.Hash{} } // If no live objects are available, attempt to use snapshots @@ -263,6 +278,19 @@ func (s *stateObject) finalise(prefetch bool) { slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure } } + + if s.dirtyNonce != nil { + s.data.Nonce = *s.dirtyNonce + s.dirtyNonce = nil + } + if s.dirtyBalance != nil { + s.data.Balance = s.dirtyBalance + s.dirtyBalance = nil + } + if s.dirtyCodeHash != nil { + s.data.CodeHash = s.dirtyCodeHash + s.dirtyCodeHash = nil + } if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash { s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch) } @@ -271,6 +299,31 @@ func (s *stateObject) finalise(prefetch bool) { } } +func (s *stateObject) finaliseRWSet() { + if s.db.mvStates == nil { + return + } + ms := s.db.mvStates + for key, value := range s.dirtyStorage { + // three are some unclean dirtyStorage from previous reverted txs, it will skip finalise + // so add a new rule, if val has no change, then skip it + if value == s.GetCommittedState(key) { + continue + } + ms.RecordStorageWrite(s.address, key) + } + + if s.dirtyNonce != nil && *s.dirtyNonce != s.data.Nonce { + ms.RecordAccountWrite(s.address, types.AccountNonce) + } + if s.dirtyBalance != nil && s.dirtyBalance.Cmp(s.data.Balance) != 0 { + ms.RecordAccountWrite(s.address, types.AccountBalance) + } + if s.dirtyCodeHash != nil && !slices.Equal(s.dirtyCodeHash, s.data.CodeHash) { + ms.RecordAccountWrite(s.address, types.AccountCodeHash) + } +} + // updateTrie is responsible for persisting cached storage changes into the // object's storage trie. In case the storage trie is not yet loaded, this // function will load the trie automatically. If any issues arise during the @@ -463,13 +516,13 @@ func (s *stateObject) SubBalance(amount *uint256.Int) { func (s *stateObject) SetBalance(amount *uint256.Int) { s.db.journal.append(balanceChange{ account: &s.address, - prev: new(uint256.Int).Set(s.data.Balance), + prev: new(uint256.Int).Set(s.Balance()), }) s.setBalance(amount) } func (s *stateObject) setBalance(amount *uint256.Int) { - s.data.Balance = amount + s.dirtyBalance = amount } func (s *stateObject) deepCopy(db *StateDB) *stateObject { @@ -490,6 +543,16 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject { obj.selfDestructed = s.selfDestructed obj.dirtyCode = s.dirtyCode obj.deleted = s.deleted + if s.dirtyBalance != nil { + obj.dirtyBalance = new(uint256.Int).Set(s.dirtyBalance) + } + if s.dirtyNonce != nil { + obj.dirtyNonce = new(uint64) + *obj.dirtyNonce = *s.dirtyNonce + } + if s.dirtyCodeHash != nil { + obj.dirtyCodeHash = s.dirtyCodeHash + } return obj } @@ -547,7 +610,7 @@ func (s *stateObject) SetCode(codeHash common.Hash, code []byte) { func (s *stateObject) setCode(codeHash common.Hash, code []byte) { s.code = code - s.data.CodeHash = codeHash[:] + s.dirtyCodeHash = codeHash[:] s.dirtyCode = true compiler.GenOrLoadOptimizedCode(codeHash, s.code) } @@ -555,24 +618,33 @@ func (s *stateObject) setCode(codeHash common.Hash, code []byte) { func (s *stateObject) SetNonce(nonce uint64) { s.db.journal.append(nonceChange{ account: &s.address, - prev: s.data.Nonce, + prev: s.Nonce(), }) s.setNonce(nonce) } func (s *stateObject) setNonce(nonce uint64) { - s.data.Nonce = nonce + s.dirtyNonce = &nonce } func (s *stateObject) CodeHash() []byte { + if len(s.dirtyCodeHash) > 0 { + return s.dirtyCodeHash + } return s.data.CodeHash } func (s *stateObject) Balance() *uint256.Int { + if s.dirtyBalance != nil { + return s.dirtyBalance + } return s.data.Balance } func (s *stateObject) Nonce() uint64 { + if s.dirtyNonce != nil { + return *s.dirtyNonce + } return s.data.Nonce } diff --git a/core/state/statedb.go b/core/state/statedb.go index f5464eb23c..0d5e7a7eef 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "golang.org/x/exp/slices" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/rawdb" @@ -90,10 +92,11 @@ type StateDB struct { // This map holds 'live' objects, which will get modified while processing // a state transition. - stateObjects map[common.Address]*stateObject - stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie - stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution - stateObjectsDestruct map[common.Address]*types.StateAccount // State objects destructed in the block along with its previous value + stateObjects map[common.Address]*stateObject + stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie + stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution + stateObjectsDestruct map[common.Address]*types.StateAccount // State objects destructed in the block along with its previous value + stateObjectsDestructDirty map[common.Address]*types.StateAccount // DB error. // State objects are used by the consensus core and VM which are @@ -113,6 +116,9 @@ type StateDB struct { logs map[common.Hash][]*types.Log logSize uint + // parallel EVM related + mvStates *types.MVStates + // Preimages occurred seen by VM in the scope of block. preimages map[common.Hash][]byte @@ -143,6 +149,7 @@ type StateDB struct { TrieDBCommits time.Duration TrieCommits time.Duration CodeCommits time.Duration + TxDAGGenerate time.Duration AccountUpdated int StorageUpdated int @@ -160,24 +167,25 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return nil, err } sdb := &StateDB{ - db: db, - trie: tr, - originalRoot: root, - snaps: snaps, - accounts: make(map[common.Hash][]byte), - storages: make(map[common.Hash]map[common.Hash][]byte), - accountsOrigin: make(map[common.Address][]byte), - storagesOrigin: make(map[common.Address]map[common.Hash][]byte), - stateObjects: make(map[common.Address]*stateObject), - stateObjectsPending: make(map[common.Address]struct{}), - stateObjectsDirty: make(map[common.Address]struct{}), - stateObjectsDestruct: make(map[common.Address]*types.StateAccount), - logs: make(map[common.Hash][]*types.Log), - preimages: make(map[common.Hash][]byte), - journal: newJournal(), - accessList: newAccessList(), - transientStorage: newTransientStorage(), - hasher: crypto.NewKeccakState(), + db: db, + trie: tr, + originalRoot: root, + snaps: snaps, + accounts: make(map[common.Hash][]byte), + storages: make(map[common.Hash]map[common.Hash][]byte), + accountsOrigin: make(map[common.Address][]byte), + storagesOrigin: make(map[common.Address]map[common.Hash][]byte), + stateObjects: make(map[common.Address]*stateObject), + stateObjectsPending: make(map[common.Address]struct{}), + stateObjectsDirty: make(map[common.Address]struct{}), + stateObjectsDestruct: make(map[common.Address]*types.StateAccount), + stateObjectsDestructDirty: make(map[common.Address]*types.StateAccount), + logs: make(map[common.Hash][]*types.Log), + preimages: make(map[common.Hash][]byte), + journal: newJournal(), + accessList: newAccessList(), + transientStorage: newTransientStorage(), + hasher: crypto.NewKeccakState(), } if sdb.snaps != nil { sdb.snap = sdb.snaps.Snapshot(root) @@ -189,24 +197,25 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) // NewStateDBByTrie creates a new state db by a given trie. func NewStateDBByTrie(tr Trie, db Database, snaps *snapshot.Tree) (*StateDB, error) { sdb := &StateDB{ - db: db, - trie: tr, - originalRoot: tr.Hash(), - snaps: snaps, - accounts: make(map[common.Hash][]byte), - storages: make(map[common.Hash]map[common.Hash][]byte), - accountsOrigin: make(map[common.Address][]byte), - storagesOrigin: make(map[common.Address]map[common.Hash][]byte), - stateObjects: make(map[common.Address]*stateObject), - stateObjectsPending: make(map[common.Address]struct{}), - stateObjectsDirty: make(map[common.Address]struct{}), - stateObjectsDestruct: make(map[common.Address]*types.StateAccount), - logs: make(map[common.Hash][]*types.Log), - preimages: make(map[common.Hash][]byte), - journal: newJournal(), - accessList: newAccessList(), - transientStorage: newTransientStorage(), - hasher: crypto.NewKeccakState(), + db: db, + trie: tr, + originalRoot: tr.Hash(), + snaps: snaps, + accounts: make(map[common.Hash][]byte), + storages: make(map[common.Hash]map[common.Hash][]byte), + accountsOrigin: make(map[common.Address][]byte), + storagesOrigin: make(map[common.Address]map[common.Hash][]byte), + stateObjects: make(map[common.Address]*stateObject), + stateObjectsPending: make(map[common.Address]struct{}), + stateObjectsDirty: make(map[common.Address]struct{}), + stateObjectsDestruct: make(map[common.Address]*types.StateAccount), + stateObjectsDestructDirty: make(map[common.Address]*types.StateAccount), + logs: make(map[common.Hash][]*types.Log), + preimages: make(map[common.Hash][]byte), + journal: newJournal(), + accessList: newAccessList(), + transientStorage: newTransientStorage(), + hasher: crypto.NewKeccakState(), } if sdb.snaps != nil { sdb.snap = sdb.snaps.Snapshot(tr.Hash()) @@ -337,6 +346,10 @@ func (s *StateDB) Empty(addr common.Address) bool { // GetBalance retrieves the balance from the given address or 0 if object not found func (s *StateDB) GetBalance(addr common.Address) *uint256.Int { + if s.mvStates != nil { + s.mvStates.RecordAccountRead(addr, types.AccountBalance) + } + stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.Balance() @@ -346,6 +359,9 @@ func (s *StateDB) GetBalance(addr common.Address) *uint256.Int { // GetNonce retrieves the nonce from the given address or 0 if object not found func (s *StateDB) GetNonce(addr common.Address) uint64 { + if s.mvStates != nil { + s.mvStates.RecordAccountRead(addr, types.AccountNonce) + } stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.Nonce() @@ -370,6 +386,9 @@ func (s *StateDB) TxIndex() int { } func (s *StateDB) GetCode(addr common.Address) []byte { + if s.mvStates != nil { + s.mvStates.RecordAccountRead(addr, types.AccountCodeHash) + } stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.Code() @@ -378,6 +397,9 @@ func (s *StateDB) GetCode(addr common.Address) []byte { } func (s *StateDB) GetCodeSize(addr common.Address) int { + if s.mvStates != nil { + s.mvStates.RecordAccountRead(addr, types.AccountCodeHash) + } stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.CodeSize() @@ -386,6 +408,9 @@ func (s *StateDB) GetCodeSize(addr common.Address) int { } func (s *StateDB) GetCodeHash(addr common.Address) common.Hash { + if s.mvStates != nil { + s.mvStates.RecordAccountRead(addr, types.AccountCodeHash) + } stateObject := s.getStateObject(addr) if stateObject != nil { return common.BytesToHash(stateObject.CodeHash()) @@ -395,6 +420,9 @@ func (s *StateDB) GetCodeHash(addr common.Address) common.Hash { // GetState retrieves a value from the given account's storage trie. func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash { + if s.mvStates != nil { + s.mvStates.RecordStorageRead(addr, hash) + } stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.GetState(hash) @@ -404,6 +432,9 @@ func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash { // GetCommittedState retrieves a value from the given account's committed storage trie. func (s *StateDB) GetCommittedState(addr common.Address, hash common.Hash) common.Hash { + if s.mvStates != nil { + s.mvStates.RecordStorageRead(addr, hash) + } stateObject := s.getStateObject(addr) if stateObject != nil { return stateObject.GetCommittedState(hash) @@ -430,17 +461,25 @@ func (s *StateDB) HasSelfDestructed(addr common.Address) bool { // AddBalance adds amount to the account associated with addr. func (s *StateDB) AddBalance(addr common.Address, amount *uint256.Int) { + if s.mvStates != nil { + s.mvStates.RecordAccountRead(addr, types.AccountBalance) + } stateObject := s.getOrNewStateObject(addr) if stateObject != nil { stateObject.AddBalance(amount) + return } } // SubBalance subtracts amount from the account associated with addr. func (s *StateDB) SubBalance(addr common.Address, amount *uint256.Int) { + if s.mvStates != nil { + s.mvStates.RecordAccountRead(addr, types.AccountBalance) + } stateObject := s.getOrNewStateObject(addr) if stateObject != nil { stateObject.SubBalance(amount) + return } } @@ -484,8 +523,8 @@ func (s *StateDB) SetStorage(addr common.Address, storage map[common.Hash]common // // TODO(rjl493456442) this function should only be supported by 'unwritable' // state and all mutations made should all be discarded afterwards. - if _, ok := s.stateObjectsDestruct[addr]; !ok { - s.stateObjectsDestruct[addr] = nil + if _, ok := s.getStateObjectsDestruct(addr); !ok { + s.setStateObjectsDestruct(addr, nil) } stateObject := s.getOrNewStateObject(addr) for k, v := range storage { @@ -509,7 +548,7 @@ func (s *StateDB) SelfDestruct(addr common.Address) { prevbalance: new(uint256.Int).Set(stateObject.Balance()), }) stateObject.markSelfdestructed() - stateObject.data.Balance = new(uint256.Int) + stateObject.setBalance(new(uint256.Int)) } func (s *StateDB) Selfdestruct6780(addr common.Address) { @@ -698,9 +737,9 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *stateObject) // account and storage data should be cleared as well. Note, it must // be done here, otherwise the destruction event of "original account" // will be lost. - _, prevdestruct := s.stateObjectsDestruct[prev.address] + _, prevdestruct := s.getStateObjectsDestruct(prev.address) if !prevdestruct { - s.stateObjectsDestruct[prev.address] = prev.origin + s.setStateObjectsDestruct(prev.address, prev.origin) } // There may be some cached account/storage data already since IntermediateRoot // will be called for each transaction before byzantium fork which will always @@ -741,32 +780,42 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *stateObject) func (s *StateDB) CreateAccount(addr common.Address) { newObj, prev := s.createObject(addr) if prev != nil { - newObj.setBalance(prev.data.Balance) + newObj.setBalance(prev.Balance()) } } +// CopyWithMvStates will copy state with MVStates +func (s *StateDB) CopyWithMvStates() *StateDB { + state := s.Copy() + if s.mvStates != nil { + state.mvStates = s.mvStates.Copy() + } + return state +} + // Copy creates a deep, independent copy of the state. // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB { // Copy all the basic fields, initialize the memory ones state := &StateDB{ - db: s.db, - trie: s.db.CopyTrie(s.trie), - originalRoot: s.originalRoot, - accounts: make(map[common.Hash][]byte), - storages: make(map[common.Hash]map[common.Hash][]byte), - accountsOrigin: make(map[common.Address][]byte), - storagesOrigin: make(map[common.Address]map[common.Hash][]byte), - stateObjects: make(map[common.Address]*stateObject, len(s.journal.dirties)), - stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), - stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), - stateObjectsDestruct: make(map[common.Address]*types.StateAccount, len(s.stateObjectsDestruct)), - refund: s.refund, - logs: make(map[common.Hash][]*types.Log, len(s.logs)), - logSize: s.logSize, - preimages: make(map[common.Hash][]byte, len(s.preimages)), - journal: newJournal(), - hasher: crypto.NewKeccakState(), + db: s.db, + trie: s.db.CopyTrie(s.trie), + originalRoot: s.originalRoot, + accounts: make(map[common.Hash][]byte), + storages: make(map[common.Hash]map[common.Hash][]byte), + accountsOrigin: make(map[common.Address][]byte), + storagesOrigin: make(map[common.Address]map[common.Hash][]byte), + stateObjects: make(map[common.Address]*stateObject, len(s.journal.dirties)), + stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), + stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), + stateObjectsDestruct: make(map[common.Address]*types.StateAccount, len(s.stateObjectsDestruct)), + stateObjectsDestructDirty: make(map[common.Address]*types.StateAccount, len(s.stateObjectsDestructDirty)), + refund: s.refund, + logs: make(map[common.Hash][]*types.Log, len(s.logs)), + logSize: s.logSize, + preimages: make(map[common.Hash][]byte, len(s.preimages)), + journal: newJournal(), + hasher: crypto.NewKeccakState(), // In order for the block producer to be able to use and make additions // to the snapshot tree, we need to copy that as well. Otherwise, any @@ -811,6 +860,9 @@ func (s *StateDB) Copy() *StateDB { for addr, value := range s.stateObjectsDestruct { state.stateObjectsDestruct[addr] = value } + for addr, value := range s.stateObjectsDestructDirty { + state.stateObjectsDestructDirty[addr] = value + } // Deep copy the state changes made in the scope of block // along with their original values. state.accounts = copySet(s.accounts) @@ -882,7 +934,20 @@ func (s *StateDB) GetRefund() uint64 { // the journal as well as the refunds. Finalise, however, will not push any updates // into the tries just yet. Only IntermediateRoot or Commit will do that. func (s *StateDB) Finalise(deleteEmptyObjects bool) { + var feeReceivers []common.Address + if s.mvStates != nil { + feeReceivers = s.mvStates.FeeReceivers() + } addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties)) + + // finalise stateObjectsDestruct + for addr, acc := range s.stateObjectsDestructDirty { + s.stateObjectsDestruct[addr] = acc + if s.mvStates != nil && !slices.Contains(feeReceivers, addr) { + s.mvStates.RecordAccountWrite(addr, types.AccountSuicide) + } + } + s.stateObjectsDestructDirty = make(map[common.Address]*types.StateAccount) for addr := range s.journal.dirties { obj, exist := s.stateObjects[addr] if !exist { @@ -902,6 +967,9 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { // event is tracked. if _, ok := s.stateObjectsDestruct[obj.address]; !ok { s.stateObjectsDestruct[obj.address] = obj.origin + if s.mvStates != nil && !slices.Contains(feeReceivers, addr) { + s.mvStates.RecordAccountWrite(addr, types.AccountSuicide) + } } // Note, we can't do this only at the end of a block because multiple // transactions within the same block might self destruct and then @@ -911,6 +979,9 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { delete(s.accountsOrigin, obj.address) // Clear out any previously updated account data (may be recreated via a resurrect) delete(s.storagesOrigin, obj.address) // Clear out any previously updated storage data (may be recreated via a resurrect) } else { + if s.mvStates != nil && !slices.Contains(feeReceivers, addr) { + obj.finaliseRWSet() + } obj.finalise(true) // Prefetch slots in the background } obj.created = false @@ -1632,6 +1703,96 @@ func (s *StateDB) GetSnap() snapshot.Snapshot { return s.snap } +func (s *StateDB) StartTxRecorder(isExcludeTx bool) { + if s.mvStates == nil { + return + } + if isExcludeTx { + rwSet := types.NewEmptyRWSet(s.txIndex).WithExcludedTxFlag() + if err := s.mvStates.FinaliseWithRWSet(rwSet); err != nil { + log.Error("MVStates SystemTx Finalise err", "err", err) + } + s.mvStates.RecordReadDone() + s.mvStates.RecordWriteDone() + return + } + s.mvStates.RecordNewTx(s.txIndex) +} + +func (s *StateDB) StopTxRecorder() { + if s.mvStates == nil { + return + } + s.mvStates.RecordReadDone() + s.mvStates.RecordWriteDone() +} + +func (s *StateDB) ResetMVStates(txCount int, feeReceivers []common.Address) *types.MVStates { + s.mvStates = types.NewMVStates(txCount, feeReceivers) + return s.mvStates +} + +func (s *StateDB) CheckFeeReceiversRWSet() { + if s.mvStates == nil { + return + } + if metrics.EnabledExpensive { + defer func(start time.Time) { + s.TxDAGGenerate += time.Since(start) + }(time.Now()) + } + s.mvStates.RecordReadDone() + feeReceivers := s.mvStates.FeeReceivers() + for _, addr := range feeReceivers { + if _, ok := s.stateObjectsDestructDirty[addr]; !ok { + continue + } + s.mvStates.RecordCannotDelayGasFee() + return + } + + for _, addr := range feeReceivers { + if _, ok := s.journal.dirties[addr]; !ok { + continue + } + s.mvStates.RecordCannotDelayGasFee() + return + } +} + +func (s *StateDB) getStateObjectsDestruct(addr common.Address) (*types.StateAccount, bool) { + if acc, ok := s.stateObjectsDestructDirty[addr]; ok { + return acc, ok + } + acc, ok := s.stateObjectsDestruct[addr] + return acc, ok +} + +func (s *StateDB) setStateObjectsDestruct(addr common.Address, acc *types.StateAccount) { + s.stateObjectsDestructDirty[addr] = acc +} + +func (s *StateDB) removeStateObjectsDestruct(addr common.Address) { + delete(s.stateObjectsDestructDirty, addr) +} + +func (s *StateDB) ResolveTxDAG(txCnt int, extraTxDeps ...types.TxDep) (types.TxDAG, error) { + if s.mvStates == nil { + return types.NewEmptyTxDAG(), nil + } + if metrics.EnabledExpensive { + defer func(start time.Time) { + s.TxDAGGenerate += time.Since(start) + }(time.Now()) + } + + return s.mvStates.ResolveTxDAG(txCnt, extraTxDeps...) +} + +func (s *StateDB) MVStates() *types.MVStates { + return s.mvStates +} + // copySet returns a deep-copied set. func copySet[k comparable](set map[k][]byte) map[k][]byte { copied := make(map[k][]byte, len(set)) diff --git a/core/state/statedb_fuzz_test.go b/core/state/statedb_fuzz_test.go index b416bcf1f3..fd02f4c0cc 100644 --- a/core/state/statedb_fuzz_test.go +++ b/core/state/statedb_fuzz_test.go @@ -265,7 +265,7 @@ func (test *stateTest) verifyAccountCreation(next common.Hash, db *triedb.Databa return err } if len(oBlob) != 0 { - return fmt.Errorf("unexpected account in old trie, %x", addrHash) + return fmt.Errorf("unexpected account in old trie, %v", addr) } if len(nBlob) == 0 { return fmt.Errorf("missing account in new trie, %x", addrHash) diff --git a/core/state_processor.go b/core/state_processor.go index c9df98536c..b4be7bb882 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -22,6 +22,8 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc" @@ -90,6 +92,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg ProcessBeaconBlockRoot(*beaconRoot, vmenv, statedb) } statedb.MarkFullProcessed() + if p.bc.enableTxDAG { + feeReceivers := []common.Address{context.Coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient} + statedb.ResetMVStates(len(block.Transactions()), feeReceivers).EnableAsyncGen() + } // Iterate over and process the individual transactions for i, tx := range block.Transactions() { start := time.Now() @@ -102,13 +108,15 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg if err != nil { return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } - receipts = append(receipts, receipt) allLogs = append(allLogs, receipt.Logs...) if metrics.EnabledExpensive { processTxTimer.UpdateSince(start) } } + if statedb.MVStates() != nil { + statedb.MVStates().BatchRecordHandle() + } // Fail if Shanghai not enabled and len(withdrawals) is non-zero. withdrawals := block.Withdrawals() if len(withdrawals) > 0 && !p.config.IsShanghai(block.Number(), block.Time()) { @@ -116,7 +124,22 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), withdrawals) - + if p.bc.enableTxDAG { + defer func() { + statedb.MVStates().Stop() + }() + // compare input TxDAG when it enable in consensus + dag, err := statedb.ResolveTxDAG(len(block.Transactions())) + if err == nil { + // TODO(galaio): check TxDAG correctness? + log.Debug("Process TxDAG result", "block", block.NumberU64(), "tx", len(block.Transactions()), "txDAG", dag.TxCount()) + if metrics.EnabledExpensive { + go types.EvaluateTxDAGPerformance(dag) + } + } else { + log.Error("ResolveTxDAG err", "block", block.NumberU64(), "tx", len(block.Transactions()), "err", err) + } + } return receipts, allLogs, *usedGas, nil } @@ -124,6 +147,8 @@ func applyTransaction(msg *Message, config *params.ChainConfig, gp *GasPool, sta // Create a new context to be used in the EVM environment. txContext := NewEVMTxContext(msg) evm.Reset(txContext, statedb) + statedb.StartTxRecorder(tx.IsSystemTx() || tx.IsDepositTx()) + defer statedb.StopTxRecorder() nonce := tx.Nonce() if msg.IsDepositTx && config.IsOptimismRegolith(evm.Context.Time) { diff --git a/core/state_transition.go b/core/state_transition.go index a23a26468e..a5a8f184ae 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -534,6 +534,7 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { ReturnData: ret, }, nil } + // Note for deposit tx there is no ETH refunded for unused gas, but that's taken care of by the fact that gasPrice // is always 0 for deposit tx. So calling refundGas will ensure the gasUsed accounting is correct without actually // changing the sender's balance @@ -554,6 +555,10 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { ReturnData: ret, }, nil } + + // check fee receiver rwSet here + st.state.CheckFeeReceiversRWSet() + effectiveTip := msg.GasPrice if rules.IsLondon { effectiveTip = cmath.BigMin(msg.GasTipCap, new(big.Int).Sub(msg.GasFeeCap, st.evm.Context.BaseFee)) diff --git a/core/types/dag.go b/core/types/dag.go new file mode 100644 index 0000000000..1841332467 --- /dev/null +++ b/core/types/dag.go @@ -0,0 +1,403 @@ +package types + +import ( + "bytes" + "errors" + "fmt" + "strings" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rlp" +) + +const TxDAGAbiJson = ` +[ + { + "type": "function", + "name": "setTxDAG", + "inputs": [ + { + "name": "data", + "type": "bytes", + "internalType": "bytes" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + } +] +` + +var TxDAGABI abi.ABI + +func init() { + var err error + // must be able to register the TxDAGABI + TxDAGABI, err = abi.JSON(strings.NewReader(TxDAGAbiJson)) + if err != nil { + panic(err) + } +} + +// TxDAGType Used to extend TxDAG and customize a new DAG structure +const ( + EmptyTxDAGType byte = iota + PlainTxDAGType +) + +var ( + // NonDependentRelFlag indicates that the txs described is non-dependent + // and is used to reduce storage when there are a large number of dependencies. + NonDependentRelFlag uint8 = 0x01 + // ExcludedTxFlag indicates that the tx is excluded from TxDAG, user should execute them in sequence. + // These excluded transactions should be consecutive in the head or tail. + ExcludedTxFlag uint8 = 0x02 + TxDepFlagMask = NonDependentRelFlag | ExcludedTxFlag +) + +type TxDAG interface { + // Type return TxDAG type + Type() byte + + // Inner return inner instance + Inner() interface{} + + // DelayGasFeeDistribution check if delay the distribution of GasFee + DelayGasFeeDistribution() bool + + // TxDep query TxDeps from TxDAG + TxDep(int) *TxDep + + // TxCount return tx count + TxCount() int + + // SetTxDep at the last one + SetTxDep(int, TxDep) error +} + +func DecodeTxDAGCalldata(data []byte) (TxDAG, error) { + // trim the method id before unpack + if len(data) < 4 { + return nil, fmt.Errorf("invalid txDAG calldata, len(data)=%d", len(data)) + } + calldata, err := TxDAGABI.Methods["setTxDAG"].Inputs.Unpack(data[4:]) + if err != nil { + return nil, fmt.Errorf("failed to call abi unpack, err: %v", err) + } + if len(calldata) <= 0 { + return nil, fmt.Errorf("invalid txDAG calldata, len(calldata)=%d", len(calldata)) + } + data, ok := calldata[0].([]byte) + if !ok { + return nil, fmt.Errorf("invalid txDAG calldata parameter") + } + return DecodeTxDAG(data) +} + +func EncodeTxDAGCalldata(dag TxDAG) ([]byte, error) { + data, err := EncodeTxDAG(dag) + if err != nil { + return nil, fmt.Errorf("failed to encode txDAG, err: %v", err) + } + data, err = TxDAGABI.Pack("setTxDAG", data) + if err != nil { + return nil, fmt.Errorf("failed to call abi pack, err: %v", err) + } + return data, nil +} + +func EncodeTxDAG(dag TxDAG) ([]byte, error) { + if dag == nil { + return nil, errors.New("input nil TxDAG") + } + var buf bytes.Buffer + buf.WriteByte(dag.Type()) + if err := rlp.Encode(&buf, dag.Inner()); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func DecodeTxDAG(enc []byte) (TxDAG, error) { + if len(enc) <= 1 { + return nil, errors.New("too short TxDAG bytes") + } + + switch enc[0] { + case EmptyTxDAGType: + return NewEmptyTxDAG(), nil + case PlainTxDAGType: + dag := new(PlainTxDAG) + if err := rlp.DecodeBytes(enc[1:], dag); err != nil { + return nil, err + } + return dag, nil + default: + return nil, errors.New("unsupported TxDAG bytes") + } +} + +func ValidateTxDAG(d TxDAG, txCnt int) error { + if d == nil { + return nil + } + + switch d.Type() { + case EmptyTxDAGType: + return nil + case PlainTxDAGType: + return ValidatePlainTxDAG(d, txCnt) + default: + return fmt.Errorf("unsupported TxDAG type: %v", d.Type()) + } +} + +func ValidatePlainTxDAG(d TxDAG, txCnt int) error { + if d.TxCount() != txCnt { + return fmt.Errorf("PlainTxDAG contains wrong txs count, expect: %v, actual: %v", txCnt, d.TxCount()) + } + for i := 0; i < txCnt; i++ { + dep := d.TxDep(i) + if dep == nil { + return fmt.Errorf("PlainTxDAG contains nil txdep, tx: %v", i) + } + for j, tx := range dep.TxIndexes { + if tx >= uint64(i) || tx >= uint64(txCnt) { + return fmt.Errorf("PlainTxDAG contains the exceed range dependency, tx: %v", i) + } + if j > 0 && dep.TxIndexes[j] <= dep.TxIndexes[j-1] { + return fmt.Errorf("PlainTxDAG contains unordered dependency, tx: %v", i) + } + } + if dep.Flags != nil && *dep.Flags & ^TxDepFlagMask > 0 { + return fmt.Errorf("PlainTxDAG contains unknown flags, flags: %v", *dep.Flags) + } + } + return nil +} + +// GetTxDAG return TxDAG bytes from block if there is any, or return nil if not exist +// the txDAG is stored in the calldata of the last transaction of the block +func GetTxDAG(block *Block) (TxDAG, error) { + txs := block.Transactions() + if txs.Len() <= 0 { + return nil, fmt.Errorf("no txdag found") + } + // get data from the last tx + return DecodeTxDAGCalldata(txs[txs.Len()-1].Data()) +} + +func TxDependency(d TxDAG, i int) []uint64 { + if d == nil || i < 0 || i >= d.TxCount() { + return []uint64{} + } + dep := d.TxDep(i) + if dep.CheckFlag(ExcludedTxFlag) { + return []uint64{} + } + if dep.CheckFlag(NonDependentRelFlag) { + txs := make([]uint64, 0, d.TxCount()-dep.Count()) + for j := 0; j < i; j++ { + if !dep.Exist(j) && j != i { + txs = append(txs, uint64(j)) + } + } + return txs + } + return dep.TxIndexes +} + +// EmptyTxDAG indicate that execute txs in sequence +// It means no transactions or need timely distribute transaction fees +// it only keep partial serial execution when tx cannot delay the distribution or just execute txs in sequence +type EmptyTxDAG struct { +} + +func NewEmptyTxDAG() TxDAG { + return &EmptyTxDAG{} +} + +func (d *EmptyTxDAG) Type() byte { + return EmptyTxDAGType +} + +func (d *EmptyTxDAG) Inner() interface{} { + return d +} + +func (d *EmptyTxDAG) DelayGasFeeDistribution() bool { + return false +} + +func (d *EmptyTxDAG) TxDep(int) *TxDep { + dep := TxDep{ + TxIndexes: nil, + Flags: new(uint8), + } + dep.SetFlag(NonDependentRelFlag) + return &dep +} + +func (d *EmptyTxDAG) TxCount() int { + return 0 +} + +func (d *EmptyTxDAG) SetTxDep(int, TxDep) error { + return nil +} + +func (d *EmptyTxDAG) String() string { + return "EmptyTxDAG" +} + +// PlainTxDAG indicate how to use the dependency of txs, and delay the distribution of GasFee +// +//go:generate go run ../../rlp/rlpgen -type PlainTxDAG -out gen_plaintxdag_rlp.go +type PlainTxDAG struct { + // Tx Dependency List, the list index is equal to TxIndex + TxDeps []TxDep +} + +func (d *PlainTxDAG) Type() byte { + return PlainTxDAGType +} + +func (d *PlainTxDAG) Inner() interface{} { + return d +} + +func (d *PlainTxDAG) DelayGasFeeDistribution() bool { + return true +} + +func (d *PlainTxDAG) TxDep(i int) *TxDep { + return &d.TxDeps[i] +} + +func (d *PlainTxDAG) TxCount() int { + return len(d.TxDeps) +} + +func (d *PlainTxDAG) SetTxDep(i int, dep TxDep) error { + if i < 0 || i > len(d.TxDeps) { + return fmt.Errorf("SetTxDep with wrong index: %d", i) + } + if i < len(d.TxDeps) { + d.TxDeps[i] = dep + return nil + } + d.TxDeps = append(d.TxDeps, dep) + return nil +} + +func NewPlainTxDAG(txLen int) *PlainTxDAG { + return &PlainTxDAG{ + TxDeps: make([]TxDep, txLen), + } +} + +func (d *PlainTxDAG) String() string { + builder := strings.Builder{} + for _, txDep := range d.TxDeps { + if txDep.Flags != nil { + builder.WriteString(fmt.Sprintf("%v|%v\n", txDep.TxIndexes, *txDep.Flags)) + continue + } + builder.WriteString(fmt.Sprintf("%v\n", txDep.TxIndexes)) + } + return builder.String() +} + +func (d *PlainTxDAG) Size() int { + enc, err := EncodeTxDAG(d) + if err != nil { + return 0 + } + return len(enc) +} + +// TxDep store the current tx dependency relation with other txs +type TxDep struct { + TxIndexes []uint64 + // Flags may has multi flag meaning, ref NonDependentRelFlag, ExcludedTxFlag. + Flags *uint8 `rlp:"optional"` +} + +func NewTxDep(indexes []uint64, flags ...uint8) TxDep { + dep := TxDep{ + TxIndexes: indexes, + } + if len(flags) == 0 { + return dep + } + dep.Flags = new(uint8) + for _, flag := range flags { + dep.SetFlag(flag) + } + return dep +} + +func (d *TxDep) AppendDep(i int) { + d.TxIndexes = append(d.TxIndexes, uint64(i)) +} + +func (d *TxDep) Exist(i int) bool { + for _, index := range d.TxIndexes { + if index == uint64(i) { + return true + } + } + + return false +} + +func (d *TxDep) Count() int { + return len(d.TxIndexes) +} + +func (d *TxDep) Last() int { + if d.Count() == 0 { + return -1 + } + return int(d.TxIndexes[len(d.TxIndexes)-1]) +} + +func (d *TxDep) CheckFlag(flag uint8) bool { + var flags uint8 + if d.Flags != nil { + flags = *d.Flags + } + return flags&flag == flag +} + +func (d *TxDep) SetFlag(flag uint8) { + if d.Flags == nil { + d.Flags = new(uint8) + } + *d.Flags |= flag +} + +func (d *TxDep) ClearFlag(flag uint8) { + if d.Flags == nil { + return + } + *d.Flags &= ^flag +} + +var ( + totalTxMeter = metrics.NewRegisteredMeter("dag/txcnt", nil) + totalNoDepMeter = metrics.NewRegisteredMeter("dag/nodepcnt", nil) +) + +func EvaluateTxDAGPerformance(dag TxDAG) { + if dag.TxCount() == 0 { + return + } + totalTxMeter.Mark(int64(dag.TxCount())) + for i := 0; i < dag.TxCount(); i++ { + if len(TxDependency(dag, i)) == 0 { + totalNoDepMeter.Mark(1) + } + } +} diff --git a/core/types/dag_test.go b/core/types/dag_test.go new file mode 100644 index 0000000000..b4acf1a1e3 --- /dev/null +++ b/core/types/dag_test.go @@ -0,0 +1,240 @@ +package types + +import ( + "encoding/hex" + "testing" + + "github.com/golang/snappy" + + "github.com/cometbft/cometbft/libs/rand" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEncodeTxDAGCalldata(t *testing.T) { + tg := mockSimpleDAG() + data, err := EncodeTxDAGCalldata(tg) + assert.Equal(t, nil, err) + tg, err = DecodeTxDAGCalldata(data) + assert.Equal(t, nil, err) + assert.Equal(t, true, tg.TxCount() > 0) + + _, err = DecodeTxDAGCalldata(nil) + assert.NotEqual(t, nil, err) +} + +func TestTxDAG_SetTxDep(t *testing.T) { + dag := mockSimpleDAG() + require.NoError(t, dag.SetTxDep(9, NewTxDep(nil, NonDependentRelFlag))) + require.NoError(t, dag.SetTxDep(10, NewTxDep(nil, NonDependentRelFlag))) + require.Error(t, dag.SetTxDep(12, NewTxDep(nil, NonDependentRelFlag))) + dag = NewEmptyTxDAG() + require.NoError(t, dag.SetTxDep(0, NewTxDep(nil, NonDependentRelFlag))) + require.NoError(t, dag.SetTxDep(11, NewTxDep(nil, NonDependentRelFlag))) +} + +func TestTxDAG(t *testing.T) { + dag := mockSimpleDAG() + t.Log(dag) + dag = mockSystemTxDAG() + t.Log(dag) +} + +func TestEvaluateTxDAG(t *testing.T) { + dag := mockSystemTxDAG() + EvaluateTxDAGPerformance(dag) +} + +func TestTxDAG_Compression(t *testing.T) { + dag := mockRandomDAG(10000) + enc, err := EncodeTxDAG(dag) + require.NoError(t, err) + encoded := snappy.Encode(nil, enc) + t.Log("enc", len(enc), "compressed", len(encoded), "ratio", 1-(float64(len(encoded))/float64(len(enc)))) +} + +func BenchmarkTxDAG_Encode(b *testing.B) { + dag := mockRandomDAG(10000) + for i := 0; i < b.N; i++ { + EncodeTxDAG(dag) + } +} + +func BenchmarkTxDAG_Decode(b *testing.B) { + dag := mockRandomDAG(10000) + enc, _ := EncodeTxDAG(dag) + for i := 0; i < b.N; i++ { + DecodeTxDAG(enc) + } +} + +func mockSimpleDAG() TxDAG { + dag := NewPlainTxDAG(10) + dag.TxDeps[0].TxIndexes = []uint64{} + dag.TxDeps[1].TxIndexes = []uint64{} + dag.TxDeps[2].TxIndexes = []uint64{} + dag.TxDeps[3].TxIndexes = []uint64{0} + dag.TxDeps[4].TxIndexes = []uint64{0} + dag.TxDeps[5].TxIndexes = []uint64{1, 2} + dag.TxDeps[6].TxIndexes = []uint64{5} + dag.TxDeps[7].TxIndexes = []uint64{6} + dag.TxDeps[8].TxIndexes = []uint64{} + dag.TxDeps[9].TxIndexes = []uint64{8} + return dag +} + +func mockRandomDAG(txLen int) TxDAG { + dag := NewPlainTxDAG(txLen) + for i := 0; i < txLen; i++ { + deps := make([]uint64, 0) + if i == 0 || rand.Bool() { + dag.TxDeps[i].TxIndexes = deps + continue + } + depCnt := rand.Int()%i + 1 + for j := 0; j < depCnt; j++ { + var dep uint64 + if j > 0 && deps[j-1]+1 == uint64(i) { + break + } + if j > 0 { + dep = uint64(rand.Int())%(uint64(i)-deps[j-1]-1) + deps[j-1] + 1 + } else { + dep = uint64(rand.Int() % i) + } + deps = append(deps, dep) + } + dag.TxDeps[i].TxIndexes = deps + } + return dag +} + +func mockSystemTxDAG() TxDAG { + dag := NewPlainTxDAG(12) + dag.TxDeps[0].TxIndexes = []uint64{} + dag.TxDeps[1].TxIndexes = []uint64{} + dag.TxDeps[2].TxIndexes = []uint64{} + dag.TxDeps[3].TxIndexes = []uint64{0} + dag.TxDeps[4].TxIndexes = []uint64{0} + dag.TxDeps[5].TxIndexes = []uint64{1, 2} + dag.TxDeps[6].TxIndexes = []uint64{5} + dag.TxDeps[7].TxIndexes = []uint64{6} + dag.TxDeps[8].TxIndexes = []uint64{} + dag.TxDeps[9].TxIndexes = []uint64{8} + dag.TxDeps[10] = NewTxDep([]uint64{}, ExcludedTxFlag) + dag.TxDeps[11] = NewTxDep([]uint64{}, ExcludedTxFlag) + return dag +} + +func mockSystemTxDAG2() TxDAG { + dag := NewPlainTxDAG(12) + dag.TxDeps[0] = NewTxDep([]uint64{}) + dag.TxDeps[1] = NewTxDep([]uint64{}) + dag.TxDeps[2] = NewTxDep([]uint64{}) + dag.TxDeps[3] = NewTxDep([]uint64{0}) + dag.TxDeps[4] = NewTxDep([]uint64{0}) + dag.TxDeps[5] = NewTxDep([]uint64{1, 2}) + dag.TxDeps[6] = NewTxDep([]uint64{5}) + dag.TxDeps[7] = NewTxDep([]uint64{6}) + dag.TxDeps[8] = NewTxDep([]uint64{}) + dag.TxDeps[9] = NewTxDep([]uint64{8}) + dag.TxDeps[10] = NewTxDep([]uint64{}, NonDependentRelFlag) + dag.TxDeps[11] = NewTxDep([]uint64{}, NonDependentRelFlag) + return dag +} + +func mockSystemTxDAGWithLargeDeps() TxDAG { + dag := NewPlainTxDAG(12) + dag.TxDeps[0].TxIndexes = []uint64{} + dag.TxDeps[1].TxIndexes = []uint64{} + dag.TxDeps[2].TxIndexes = []uint64{} + dag.TxDeps[3].TxIndexes = []uint64{0} + dag.TxDeps[4].TxIndexes = []uint64{0} + dag.TxDeps[5].TxIndexes = []uint64{1, 2} + dag.TxDeps[6].TxIndexes = []uint64{5} + dag.TxDeps[7].TxIndexes = []uint64{3} + dag.TxDeps[8].TxIndexes = []uint64{} + //dag.TxDeps[9].TxIndexes = []uint64{0, 1, 2, 6, 7, 8} + dag.TxDeps[9] = NewTxDep([]uint64{3, 4, 5}, NonDependentRelFlag) + dag.TxDeps[10] = NewTxDep([]uint64{}, ExcludedTxFlag) + dag.TxDeps[11] = NewTxDep([]uint64{}, ExcludedTxFlag) + return dag +} + +func TestTxDAG_Encode_Decode(t *testing.T) { + tests := []struct { + expect TxDAG + }{ + { + expect: TxDAG(&EmptyTxDAG{}), + }, + { + expect: mockSimpleDAG(), + }, + { + expect: mockRandomDAG(100), + }, + { + expect: mockSystemTxDAG(), + }, + { + expect: mockSystemTxDAG2(), + }, + { + expect: mockSystemTxDAGWithLargeDeps(), + }, + } + for i, item := range tests { + enc, err := EncodeTxDAG(item.expect) + t.Log(hex.EncodeToString(enc)) + require.NoError(t, err, i) + actual, err := DecodeTxDAG(enc) + require.NoError(t, err, i) + require.Equal(t, item.expect, actual, i) + if i%2 == 0 { + enc[0] = 2 + _, err = DecodeTxDAG(enc) + require.Error(t, err) + } + } +} + +func TestDecodeTxDAG(t *testing.T) { + tests := []struct { + enc string + err bool + }{ + {"00c0", false}, + {"01dddcc1c0c1c0c1c0c2c180c2c180c3c20102c3c20205c2c106c1c0c2c108", false}, + {"01e3e2c1c0c1c0c1c0c2c180c2c180c3c20102c3c20205c2c106c1c0c2c108c2c001c2c001", false}, + {"0132e212", true}, + {"01dfdec280c0c280c0c380c101c380c102c380c103c380c104c380c105c380c106", true}, + {"01cdccc280c0c280c0c280c0c280c0", true}, + } + for i, item := range tests { + enc, err := hex.DecodeString(item.enc) + require.NoError(t, err, i) + txDAG, err := DecodeTxDAG(enc) + if item.err { + require.Error(t, err, i) + continue + } + require.NoError(t, err, i) + t.Log(txDAG) + } +} + +func TestTxDep_Flags(t *testing.T) { + dep := NewTxDep(nil) + dep.ClearFlag(NonDependentRelFlag) + dep.SetFlag(NonDependentRelFlag) + dep.SetFlag(ExcludedTxFlag) + compared := NewTxDep(nil, NonDependentRelFlag, ExcludedTxFlag) + require.Equal(t, dep, compared) + require.Equal(t, NonDependentRelFlag|ExcludedTxFlag, *dep.Flags) + dep.ClearFlag(ExcludedTxFlag) + require.Equal(t, NonDependentRelFlag, *dep.Flags) + require.True(t, dep.CheckFlag(NonDependentRelFlag)) + require.False(t, dep.CheckFlag(ExcludedTxFlag)) +} diff --git a/core/types/gen_plaintxdag_rlp.go b/core/types/gen_plaintxdag_rlp.go new file mode 100644 index 0000000000..9e3ea46683 --- /dev/null +++ b/core/types/gen_plaintxdag_rlp.go @@ -0,0 +1,32 @@ +// Code generated by rlpgen. DO NOT EDIT. + +package types + +import "github.com/ethereum/go-ethereum/rlp" +import "io" + +func (obj *PlainTxDAG) EncodeRLP(_w io.Writer) error { + w := rlp.NewEncoderBuffer(_w) + _tmp0 := w.List() + _tmp1 := w.List() + for _, _tmp2 := range obj.TxDeps { + _tmp3 := w.List() + _tmp4 := w.List() + for _, _tmp5 := range _tmp2.TxIndexes { + w.WriteUint64(_tmp5) + } + w.ListEnd(_tmp4) + _tmp6 := _tmp2.Flags != nil + if _tmp6 { + if _tmp2.Flags == nil { + w.Write([]byte{0x80}) + } else { + w.WriteUint64(uint64((*_tmp2.Flags))) + } + } + w.ListEnd(_tmp3) + } + w.ListEnd(_tmp1) + w.ListEnd(_tmp0) + return w.Flush() +} diff --git a/core/types/mvstates.go b/core/types/mvstates.go new file mode 100644 index 0000000000..e37e53cc3f --- /dev/null +++ b/core/types/mvstates.go @@ -0,0 +1,1098 @@ +package types + +import ( + "fmt" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "golang.org/x/exp/slices" +) + +type AccountState byte + +var ( + AccountSelf AccountState = 0x01 + AccountNonce AccountState = 0x02 + AccountBalance AccountState = 0x04 + AccountCodeHash AccountState = 0x08 + AccountSuicide AccountState = 0x10 +) + +const ( + initSyncPoolSize = 4 + asyncSendInterval = 20 +) + +func init() { + for i := 0; i < initSyncPoolSize*4; i++ { + cache := make([]RWEventItem, 400) + rwEventCachePool.Put(&cache) + } +} + +type ChanPool struct { + ch chan any + new func() any +} + +func NewChanPool(size int, f func() any) *ChanPool { + return &ChanPool{ + ch: make(chan any, size), + new: f, + } +} + +func (p ChanPool) Get() any { + select { + case item := <-p.ch: + return item + default: + } + return p.new() +} + +func (p ChanPool) Put(item any) { + select { + case p.ch <- item: + default: + } +} + +// RWSet record all read & write set in txs +// Attention: this is not a concurrent safety structure +type RWSet struct { + index int + accReadSet map[common.Address]map[AccountState]struct{} + accWriteSet map[common.Address]map[AccountState]struct{} + slotReadSet map[common.Address]map[common.Hash]struct{} + slotWriteSet map[common.Address]map[common.Hash]struct{} + + // some flags + excludedTx bool + cannotGasFeeDelay bool +} + +func NewRWSet(index int) *RWSet { + return &RWSet{ + index: index, + accReadSet: make(map[common.Address]map[AccountState]struct{}), + accWriteSet: make(map[common.Address]map[AccountState]struct{}), + slotReadSet: make(map[common.Address]map[common.Hash]struct{}), + slotWriteSet: make(map[common.Address]map[common.Hash]struct{}), + } +} + +func NewEmptyRWSet(index int) *RWSet { + return &RWSet{ + index: index, + } +} + +func (s *RWSet) RecordAccountRead(addr common.Address, state AccountState) { + // only record the first read version + sub, ok := s.accReadSet[addr] + if !ok { + s.accReadSet[addr] = make(map[AccountState]struct{}) + s.accReadSet[addr][AccountSelf] = struct{}{} + s.accReadSet[addr][state] = struct{}{} + return + } + if _, ok = sub[state]; ok { + return + } + s.accReadSet[addr][state] = struct{}{} +} + +func (s *RWSet) RecordStorageRead(addr common.Address, slot common.Hash) { + // only record the first read version + sub, ok := s.slotReadSet[addr] + if !ok { + s.slotReadSet[addr] = make(map[common.Hash]struct{}) + s.slotReadSet[addr][slot] = struct{}{} + return + } + if _, ok = sub[slot]; ok { + return + } + s.slotReadSet[addr][slot] = struct{}{} +} + +func (s *RWSet) RecordAccountWrite(addr common.Address, state AccountState) { + _, ok := s.accWriteSet[addr] + if !ok { + s.accWriteSet[addr] = make(map[AccountState]struct{}) + } + s.accWriteSet[addr][state] = struct{}{} +} + +func (s *RWSet) RecordStorageWrite(addr common.Address, slot common.Hash) { + _, ok := s.slotWriteSet[addr] + if !ok { + s.slotWriteSet[addr] = make(map[common.Hash]struct{}) + } + s.slotWriteSet[addr][slot] = struct{}{} +} + +func (s *RWSet) ReadSet() (map[common.Address]map[AccountState]struct{}, map[common.Address]map[common.Hash]struct{}) { + return s.accReadSet, s.slotReadSet +} + +func (s *RWSet) WriteSet() (map[common.Address]map[AccountState]struct{}, map[common.Address]map[common.Hash]struct{}) { + return s.accWriteSet, s.slotWriteSet +} + +func (s *RWSet) WithExcludedTxFlag() *RWSet { + s.excludedTx = true + return s +} + +func (s *RWSet) String() string { + builder := strings.Builder{} + builder.WriteString(fmt.Sprintf("{tx: %v", s.index)) + builder.WriteString(", accReadSet: [") + i := 0 + for addr, sub := range s.accReadSet { + if i > 0 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("{addr: \"%v\", states: [", addr)) + j := 0 + for key := range sub { + if j > 0 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("%v", key)) + j++ + } + i++ + builder.WriteString("]}") + } + builder.WriteString("], slotReadSet: [") + i = 0 + for addr, sub := range s.slotReadSet { + if i > 0 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("{addr: \"%v\", slots: [", addr)) + j := 0 + for key := range sub { + if j > 0 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("\"%v\"", key.String())) + j++ + } + i++ + builder.WriteString("]}") + } + builder.WriteString("], accWriteSet: [") + i = 0 + for addr, sub := range s.accWriteSet { + if i > 0 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("{addr: \"%v\", states: [", addr)) + j := 0 + for key := range sub { + if j > 0 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("%v", key)) + j++ + } + i++ + builder.WriteString("]}") + } + builder.WriteString("], slotWriteSet: [") + i = 0 + for addr, sub := range s.slotWriteSet { + if i > 0 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("{addr: \"%v\", slots: [", addr)) + j := 0 + for key := range sub { + if j > 0 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("\"%v\"", key.String())) + j++ + } + i++ + builder.WriteString("]}") + } + builder.WriteString("]}") + return builder.String() +} + +const ( + NewTxRWEvent byte = iota + ReadAccRWEvent + WriteAccRWEvent + ReadSlotRWEvent + WriteSlotRWEvent + CannotGasFeeDelayRWEvent +) + +type RWEventItem struct { + Event byte + Index int + Addr common.Address + State AccountState + Slot common.Hash +} + +func (e RWEventItem) String() string { + switch e.Event { + case NewTxRWEvent: + return fmt.Sprintf("(%v)%v", e.Event, e.Index) + case ReadAccRWEvent: + return fmt.Sprintf("(%v)%v|%v", e.Event, e.Addr, e.State) + case WriteAccRWEvent: + return fmt.Sprintf("(%v)%v|%v", e.Event, e.Addr, e.State) + case ReadSlotRWEvent: + return fmt.Sprintf("(%v)%v|%v", e.Event, e.Addr, e.Slot) + case WriteSlotRWEvent: + return fmt.Sprintf("(%v)%v|%v", e.Event, e.Addr, e.Slot) + case CannotGasFeeDelayRWEvent: + return fmt.Sprintf("(%v)", e.Event) + } + return "Unknown" +} + +type RWTxList struct { + list []int +} + +func NewRWTxList() *RWTxList { + return &RWTxList{ + list: make([]int, 0), + } +} + +func (w *RWTxList) Append(pw int) { + if i, found := w.SearchTxIndex(pw); found { + w.list[i] = pw + return + } + + w.list = append(w.list, pw) + for i := len(w.list) - 1; i > 0; i-- { + if w.list[i] > w.list[i-1] { + break + } + w.list[i-1], w.list[i] = w.list[i], w.list[i-1] + } +} + +func (w *RWTxList) SearchTxIndex(txIndex int) (int, bool) { + n := len(w.list) + i, j := 0, n + for i < j { + h := int(uint(i+j) >> 1) + // i ≤ h < j + if w.list[h] < txIndex { + i = h + 1 + } else { + j = h + } + } + return i, i < n && w.list[i] == txIndex +} + +func (w *RWTxList) FindLastTx(txIndex int) int { + var i, _ = w.SearchTxIndex(txIndex) + for j := i - 1; j >= 0; j-- { + if w.list[j] < txIndex { + return w.list[j] + } + } + + return -1 +} + +func (w *RWTxList) FindPrevTxs(txIndex int) []int { + var i, _ = w.SearchTxIndex(txIndex) + for j := i - 1; j >= 0; j-- { + if w.list[j] < txIndex { + return w.list[:j+1] + } + } + + return nil +} + +func (w *RWTxList) Copy() *RWTxList { + np := &RWTxList{} + for i, item := range w.list { + np.list[i] = item + } + return np +} + +var ( + rwEventsAllocMeter = metrics.GetOrRegisterMeter("mvstate/alloc/rwevents/cnt", nil) + rwEventsAllocGauge = metrics.GetOrRegisterGauge("mvstate/alloc/rwevents/gauge", nil) +) + +var ( + rwEventCachePool = NewChanPool(initSyncPoolSize*4, func() any { + rwEventsAllocMeter.Mark(1) + buf := make([]RWEventItem, 0) + return &buf + }) +) + +type MVStates struct { + rwSets []RWSet + accWriteSet map[common.Address]map[AccountState]*RWTxList + slotWriteSet map[common.Address]map[common.Hash]*RWTxList + accReadSet map[common.Address]map[AccountState]*RWTxList + slotReadSet map[common.Address]map[common.Hash]*RWTxList + nextFinaliseIndex int + gasFeeReceivers []common.Address + // dependency map cache for generating TxDAG + // depMapCache[i].exist(j) means j->i, and i > j + txDepCache []TxDep + lock sync.RWMutex + + // async rw event recorder + // these fields are only used in one routine + asyncRWSet RWSet + rwEventCh chan []RWEventItem + rwEventCache []RWEventItem + rwEventCacheIndex int + recordingRead bool + recordingWrite bool + asyncRunning bool + asyncWG sync.WaitGroup +} + +func NewMVStates(txCount int, gasFeeReceivers []common.Address) *MVStates { + s := &MVStates{ + accWriteSet: make(map[common.Address]map[AccountState]*RWTxList, txCount), + slotWriteSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount), + accReadSet: make(map[common.Address]map[AccountState]*RWTxList, txCount), + slotReadSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount), + rwEventCh: make(chan []RWEventItem, 100), + gasFeeReceivers: gasFeeReceivers, + } + return s +} + +func (s *MVStates) EnableAsyncGen() *MVStates { + s.asyncWG.Add(1) + s.asyncRunning = true + s.rwEventCache = *rwEventCachePool.Get().(*[]RWEventItem) + s.rwEventCache = s.rwEventCache[:cap(s.rwEventCache)] + s.rwEventCacheIndex = 0 + s.asyncRWSet.index = -1 + go s.asyncRWEventLoop() + return s +} + +func (s *MVStates) Stop() { + s.stopAsyncRecorder() +} + +func (s *MVStates) Copy() *MVStates { + s.lock.Lock() + defer s.lock.Unlock() + ns := NewMVStates(len(s.rwSets), s.gasFeeReceivers) + ns.nextFinaliseIndex = s.nextFinaliseIndex + ns.txDepCache = append(ns.txDepCache, s.txDepCache...) + ns.rwSets = append(ns.rwSets, s.rwSets...) + for addr, sub := range s.accWriteSet { + for state, writes := range sub { + if _, ok := ns.accWriteSet[addr]; !ok { + ns.accWriteSet[addr] = make(map[AccountState]*RWTxList) + } + ns.accWriteSet[addr][state] = writes.Copy() + } + } + for addr, sub := range s.accReadSet { + for state, reads := range sub { + if _, ok := ns.accReadSet[addr]; !ok { + ns.accReadSet[addr] = make(map[AccountState]*RWTxList) + } + ns.accReadSet[addr][state] = reads.Copy() + } + } + for addr, sub := range s.slotWriteSet { + for slot, writes := range sub { + if _, ok := ns.slotWriteSet[addr]; !ok { + ns.slotWriteSet[addr] = make(map[common.Hash]*RWTxList) + } + ns.slotWriteSet[addr][slot] = writes.Copy() + } + } + for addr, sub := range s.slotReadSet { + for slot, reads := range sub { + if _, ok := ns.slotReadSet[addr]; !ok { + ns.slotReadSet[addr] = make(map[common.Hash]*RWTxList) + } + ns.slotReadSet[addr][slot] = reads.Copy() + } + } + return ns +} + +func (s *MVStates) asyncRWEventLoop() { + defer s.asyncWG.Done() + for { + select { + case item, ok := <-s.rwEventCh: + if !ok { + return + } + s.handleRWEvents(item) + rwEventCachePool.Put(&item) + } + } +} + +func (s *MVStates) handleRWEvents(items []RWEventItem) { + readFrom, readTo := -1, -1 + writeFrom, writeTo := -1, -1 + recordNewTx := false + for i, item := range items { + // init next RWSet, and finalise previous RWSet + if item.Event == NewTxRWEvent { + // handle previous rw set + if recordNewTx { + var prevReadItems []RWEventItem + if readFrom >= 0 && readTo > readFrom { + prevReadItems = items[readFrom:readTo] + } + var prevWriteItems []RWEventItem + if writeFrom >= 0 && writeTo > writeFrom { + prevWriteItems = items[writeFrom:writeTo] + } + s.finalisePreviousRWSet(prevReadItems, prevWriteItems) + readFrom, readTo = -1, -1 + writeFrom, writeTo = -1, -1 + } + recordNewTx = true + s.asyncRWSet = RWSet{ + index: item.Index, + } + continue + } + if s.asyncRWSet.index < 0 { + continue + } + switch item.Event { + // recorde current read/write event + case ReadAccRWEvent: + if readFrom < 0 { + readFrom = i + } + readTo = i + 1 + case ReadSlotRWEvent: + if readFrom < 0 { + readFrom = i + } + readTo = i + 1 + case WriteAccRWEvent: + if writeFrom < 0 { + writeFrom = i + } + writeTo = i + 1 + case WriteSlotRWEvent: + if writeFrom < 0 { + writeFrom = i + } + writeTo = i + 1 + // recorde current as cannot gas fee delay + case CannotGasFeeDelayRWEvent: + s.asyncRWSet.cannotGasFeeDelay = true + } + } + // handle last tx rw set + if recordNewTx { + var prevReadItems []RWEventItem + if readFrom >= 0 && readTo > readFrom { + prevReadItems = items[readFrom:readTo] + } + var prevWriteItems []RWEventItem + if writeFrom >= 0 && writeTo > writeFrom { + prevWriteItems = items[writeFrom:writeTo] + } + s.finalisePreviousRWSet(prevReadItems, prevWriteItems) + } +} + +func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem, writes []RWEventItem) { + if s.asyncRWSet.index < 0 { + return + } + index := s.asyncRWSet.index + for index >= len(s.rwSets) { + s.rwSets = append(s.rwSets, RWSet{index: -1}) + } + s.rwSets[index] = s.asyncRWSet + + for _, item := range writes { + if item.Event == WriteAccRWEvent { + s.finaliseAccWrite(index, item.Addr, item.State) + } else if item.Event == WriteSlotRWEvent { + s.finaliseSlotWrite(index, item.Addr, item.Slot) + } + } + + for _, item := range reads { + if item.Event == ReadAccRWEvent { + accWrites := s.queryAccWrites(item.Addr, item.State) + if accWrites != nil { + if _, ok := accWrites.SearchTxIndex(index); ok { + continue + } + } + s.finaliseAccRead(index, item.Addr, item.State) + } else if item.Event == ReadSlotRWEvent { + slotWrites := s.querySlotWrites(item.Addr, item.Slot) + if slotWrites != nil { + if _, ok := slotWrites.SearchTxIndex(index); ok { + continue + } + } + s.finaliseSlotRead(index, item.Addr, item.Slot) + } + } + + if index > s.nextFinaliseIndex { + log.Error("finalise in wrong order", "next", s.nextFinaliseIndex, "input", index) + return + } + // reset nextFinaliseIndex to index+1, it may revert to previous txs + s.nextFinaliseIndex = index + 1 + s.resolveDepsMapCacheByWrites(index, reads, writes) +} + +func (s *MVStates) RecordNewTx(index int) { + if !s.asyncRunning { + return + } + if index%2000 == 0 { + rwEventsAllocGauge.Update(int64(len(rwEventCachePool.ch))) + } + if index%asyncSendInterval == 0 { + s.BatchRecordHandle() + } + if s.rwEventCacheIndex < len(s.rwEventCache) { + s.rwEventCache[s.rwEventCacheIndex].Event = NewTxRWEvent + s.rwEventCache[s.rwEventCacheIndex].Index = index + } else { + s.rwEventCache = append(s.rwEventCache, RWEventItem{ + Event: NewTxRWEvent, + Index: index, + }) + } + s.rwEventCacheIndex++ + s.recordingRead = true + s.recordingWrite = true +} + +func (s *MVStates) RecordReadDone() { + s.recordingRead = false +} + +func (s *MVStates) RecordWriteDone() { + s.recordingWrite = false +} + +func (s *MVStates) RecordAccountRead(addr common.Address, state AccountState) { + if !s.asyncRunning || !s.recordingRead { + return + } + if s.rwEventCacheIndex < len(s.rwEventCache) { + s.rwEventCache[s.rwEventCacheIndex].Event = ReadAccRWEvent + s.rwEventCache[s.rwEventCacheIndex].Addr = addr + s.rwEventCache[s.rwEventCacheIndex].State = state + s.rwEventCacheIndex++ + return + } + s.rwEventCache = append(s.rwEventCache, RWEventItem{ + Event: ReadAccRWEvent, + Addr: addr, + State: state, + }) + s.rwEventCacheIndex++ +} + +func (s *MVStates) RecordStorageRead(addr common.Address, slot common.Hash) { + if !s.asyncRunning || !s.recordingRead { + return + } + if s.rwEventCacheIndex < len(s.rwEventCache) { + s.rwEventCache[s.rwEventCacheIndex].Event = ReadSlotRWEvent + s.rwEventCache[s.rwEventCacheIndex].Addr = addr + s.rwEventCache[s.rwEventCacheIndex].Slot = slot + s.rwEventCacheIndex++ + return + } + s.rwEventCache = append(s.rwEventCache, RWEventItem{ + Event: ReadSlotRWEvent, + Addr: addr, + Slot: slot, + }) + s.rwEventCacheIndex++ +} + +func (s *MVStates) RecordAccountWrite(addr common.Address, state AccountState) { + if !s.asyncRunning || !s.recordingWrite { + return + } + if s.rwEventCacheIndex < len(s.rwEventCache) { + s.rwEventCache[s.rwEventCacheIndex].Event = WriteAccRWEvent + s.rwEventCache[s.rwEventCacheIndex].Addr = addr + s.rwEventCache[s.rwEventCacheIndex].State = state + s.rwEventCacheIndex++ + return + } + s.rwEventCache = append(s.rwEventCache, RWEventItem{ + Event: WriteAccRWEvent, + Addr: addr, + State: state, + }) + s.rwEventCacheIndex++ +} + +func (s *MVStates) RecordStorageWrite(addr common.Address, slot common.Hash) { + if !s.asyncRunning || !s.recordingWrite { + return + } + if s.rwEventCacheIndex < len(s.rwEventCache) { + s.rwEventCache[s.rwEventCacheIndex].Event = WriteSlotRWEvent + s.rwEventCache[s.rwEventCacheIndex].Addr = addr + s.rwEventCache[s.rwEventCacheIndex].Slot = slot + s.rwEventCacheIndex++ + return + } + s.rwEventCache = append(s.rwEventCache, RWEventItem{ + Event: WriteSlotRWEvent, + Addr: addr, + Slot: slot, + }) + s.rwEventCacheIndex++ +} + +func (s *MVStates) RecordCannotDelayGasFee() { + if !s.asyncRunning || !s.recordingWrite { + return + } + if s.rwEventCacheIndex < len(s.rwEventCache) { + s.rwEventCache[s.rwEventCacheIndex].Event = CannotGasFeeDelayRWEvent + s.rwEventCacheIndex++ + return + } + s.rwEventCache = append(s.rwEventCache, RWEventItem{ + Event: CannotGasFeeDelayRWEvent, + }) + s.rwEventCacheIndex++ +} + +func (s *MVStates) BatchRecordHandle() { + if !s.asyncRunning || s.rwEventCacheIndex == 0 { + return + } + s.rwEventCh <- s.rwEventCache[:s.rwEventCacheIndex] + s.rwEventCache = *rwEventCachePool.Get().(*[]RWEventItem) + s.rwEventCache = s.rwEventCache[:cap(s.rwEventCache)] + s.rwEventCacheIndex = 0 +} + +func (s *MVStates) stopAsyncRecorder() { + if s.asyncRunning { + s.BatchRecordHandle() + s.asyncRunning = false + close(s.rwEventCh) + rwEventCachePool.Put(&s.rwEventCache) + s.asyncWG.Wait() + } +} + +// FinaliseWithRWSet it will put target write set into pending writes. +func (s *MVStates) FinaliseWithRWSet(rwSet *RWSet) error { + s.lock.Lock() + defer s.lock.Unlock() + index := rwSet.index + for index >= len(s.rwSets) { + s.rwSets = append(s.rwSets, RWSet{index: -1}) + } + s.rwSets[index] = *rwSet + // just finalise all previous txs + start := s.nextFinaliseIndex + if start > index { + start = index + } + for i := start; i <= index; i++ { + if err := s.innerFinalise(i, true); err != nil { + return err + } + reads := make([]RWEventItem, 0, len(s.rwSets[i].accReadSet)+len(s.rwSets[i].slotReadSet)) + for addr, sub := range s.rwSets[i].accReadSet { + for state := range sub { + reads = append(reads, RWEventItem{ + Event: ReadAccRWEvent, + Addr: addr, + State: state, + }) + } + } + for addr, sub := range s.rwSets[i].slotReadSet { + for slot := range sub { + reads = append(reads, RWEventItem{ + Event: ReadSlotRWEvent, + Addr: addr, + Slot: slot, + }) + } + } + writes := make([]RWEventItem, 0, len(s.rwSets[i].accWriteSet)+len(s.rwSets[i].slotWriteSet)) + for addr, sub := range s.rwSets[i].accWriteSet { + for state := range sub { + writes = append(writes, RWEventItem{ + Event: WriteAccRWEvent, + Addr: addr, + State: state, + }) + } + } + for addr, sub := range s.rwSets[i].slotWriteSet { + for slot := range sub { + writes = append(writes, RWEventItem{ + Event: WriteSlotRWEvent, + Addr: addr, + Slot: slot, + }) + } + } + s.resolveDepsMapCacheByWrites(i, reads, writes) + } + + return nil +} + +func (s *MVStates) innerFinalise(index int, applyWriteSet bool) error { + if index >= len(s.rwSets) { + return fmt.Errorf("finalise a non-exist RWSet, index: %d", index) + } + + rwSet := s.rwSets[index] + if index > s.nextFinaliseIndex { + return fmt.Errorf("finalise in wrong order, next: %d, input: %d", s.nextFinaliseIndex, index) + } + + // reset nextFinaliseIndex to index+1, it may revert to previous txs + s.nextFinaliseIndex = index + 1 + if !applyWriteSet { + return nil + } + + // append to pending write set + for addr, sub := range rwSet.accWriteSet { + if _, exist := s.accWriteSet[addr]; !exist { + s.accWriteSet[addr] = make(map[AccountState]*RWTxList) + } + for state := range sub { + if _, exist := s.accWriteSet[addr][state]; !exist { + s.accWriteSet[addr][state] = NewRWTxList() + } + s.accWriteSet[addr][state].Append(index) + } + } + for addr, sub := range rwSet.accReadSet { + if _, exist := s.accReadSet[addr]; !exist { + s.accReadSet[addr] = make(map[AccountState]*RWTxList) + } + for state := range sub { + if _, exist := s.accReadSet[addr][state]; !exist { + s.accReadSet[addr][state] = NewRWTxList() + } + s.accReadSet[addr][state].Append(index) + } + } + for addr, sub := range rwSet.slotWriteSet { + if _, exist := s.slotWriteSet[addr]; !exist { + s.slotWriteSet[addr] = make(map[common.Hash]*RWTxList) + } + for slot := range sub { + if _, exist := s.slotWriteSet[addr][slot]; !exist { + s.slotWriteSet[addr][slot] = NewRWTxList() + } + s.slotWriteSet[addr][slot].Append(index) + } + } + for addr, sub := range rwSet.slotReadSet { + if _, exist := s.slotReadSet[addr]; !exist { + s.slotReadSet[addr] = make(map[common.Hash]*RWTxList) + } + for slot := range sub { + if _, exist := s.slotReadSet[addr][slot]; !exist { + s.slotReadSet[addr][slot] = NewRWTxList() + } + s.slotReadSet[addr][slot].Append(index) + } + } + return nil +} + +func (s *MVStates) finaliseSlotWrite(index int, addr common.Address, slot common.Hash) { + // append to pending write set + if _, exist := s.slotWriteSet[addr]; !exist { + s.slotWriteSet[addr] = make(map[common.Hash]*RWTxList) + } + if _, exist := s.slotWriteSet[addr][slot]; !exist { + s.slotWriteSet[addr][slot] = NewRWTxList() + } + s.slotWriteSet[addr][slot].Append(index) +} + +func (s *MVStates) finaliseSlotRead(index int, addr common.Address, slot common.Hash) { + // append to pending read set + if _, exist := s.slotReadSet[addr]; !exist { + s.slotReadSet[addr] = make(map[common.Hash]*RWTxList) + } + if _, exist := s.slotReadSet[addr][slot]; !exist { + s.slotReadSet[addr][slot] = NewRWTxList() + } + s.slotReadSet[addr][slot].Append(index) +} + +func (s *MVStates) finaliseAccWrite(index int, addr common.Address, state AccountState) { + // append to pending write set + if _, exist := s.accWriteSet[addr]; !exist { + s.accWriteSet[addr] = make(map[AccountState]*RWTxList) + } + if _, exist := s.accWriteSet[addr][state]; !exist { + s.accWriteSet[addr][state] = NewRWTxList() + } + s.accWriteSet[addr][state].Append(index) +} + +func (s *MVStates) finaliseAccRead(index int, addr common.Address, state AccountState) { + // append to pending read set + if _, exist := s.accReadSet[addr]; !exist { + s.accReadSet[addr] = make(map[AccountState]*RWTxList) + } + if _, exist := s.accReadSet[addr][state]; !exist { + s.accReadSet[addr][state] = NewRWTxList() + } + s.accReadSet[addr][state].Append(index) +} + +func (s *MVStates) queryAccWrites(addr common.Address, state AccountState) *RWTxList { + if _, exist := s.accWriteSet[addr]; !exist { + return nil + } + return s.accWriteSet[addr][state] +} + +func (s *MVStates) queryAccReads(addr common.Address, state AccountState) *RWTxList { + if _, exist := s.accReadSet[addr]; !exist { + return nil + } + return s.accReadSet[addr][state] +} + +func (s *MVStates) querySlotWrites(addr common.Address, slot common.Hash) *RWTxList { + if _, exist := s.slotWriteSet[addr]; !exist { + return nil + } + return s.slotWriteSet[addr][slot] +} + +func (s *MVStates) querySlotReads(addr common.Address, slot common.Hash) *RWTxList { + if _, exist := s.slotReadSet[addr]; !exist { + return nil + } + return s.slotReadSet[addr][slot] +} + +// resolveDepsMapCacheByWrites must be executed in order +func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem, writes []RWEventItem) { + for index >= len(s.txDepCache) { + s.txDepCache = append(s.txDepCache, TxDep{}) + } + rwSet := s.rwSets[index] + // analysis dep, if the previous transaction is not executed/validated, re-analysis is required + if rwSet.excludedTx { + s.txDepCache[index] = NewTxDep([]uint64{}, ExcludedTxFlag) + return + } + depSlice := NewTxDepSlice(1) + addrMap := make(map[common.Address]struct{}) + // check tx dependency, only check key + for _, item := range reads { + // check account states & slots + var depWrites *RWTxList + if item.Event == ReadAccRWEvent { + depWrites = s.queryAccWrites(item.Addr, item.State) + } else { + depWrites = s.querySlotWrites(item.Addr, item.Slot) + } + if depWrites != nil { + if find := depWrites.FindLastTx(index); find >= 0 { + if tx := uint64(find); !depSlice.exist(tx) { + depSlice.add(tx) + } + } + } + + // check again account self with Suicide + if _, ok := addrMap[item.Addr]; ok { + continue + } + addrMap[item.Addr] = struct{}{} + depWrites = s.queryAccWrites(item.Addr, AccountSuicide) + if depWrites != nil { + if find := depWrites.FindLastTx(index); find >= 0 { + if tx := uint64(find); !depSlice.exist(tx) { + depSlice.add(tx) + } + } + } + // append AccountSelf event + s.finaliseAccRead(index, item.Addr, AccountSelf) + } + // Looking for read operations before write operations, e.g: read->read->read/write execution sequence, + // we need the write transaction to occur after the read transactions. + for _, item := range writes { + var depReads *RWTxList + if item.Event == WriteAccRWEvent { + // if here is AccountSuicide write, check AccountSelf read + state := item.State + if state == AccountSuicide { + state = AccountSelf + } + depReads = s.queryAccReads(item.Addr, state) + } else { + depReads = s.querySlotReads(item.Addr, item.Slot) + } + if depReads != nil { + if finds := depReads.FindPrevTxs(index); len(finds) >= 0 { + for _, tx := range finds { + tx := uint64(tx) + if !depSlice.exist(tx) { + depSlice.add(tx) + } + } + } + } + } + + for _, addr := range s.gasFeeReceivers { + if _, ok := addrMap[addr]; ok { + s.rwSets[index].cannotGasFeeDelay = true + break + } + } + // clear redundancy deps compared with prev + preDeps := depSlice.deps() + var removed []uint64 + for _, prev := range preDeps { + for _, tx := range s.txDepCache[int(prev)].TxIndexes { + if depSlice.exist(tx) { + removed = append(removed, tx) + } + } + } + for _, tx := range removed { + depSlice.remove(tx) + } + s.txDepCache[index] = NewTxDep(depSlice.deps()) +} + +// ResolveTxDAG generate TxDAG from RWSets +func (s *MVStates) ResolveTxDAG(txCnt int, extraTxDeps ...TxDep) (TxDAG, error) { + s.stopAsyncRecorder() + + s.lock.Lock() + defer s.lock.Unlock() + if s.nextFinaliseIndex != txCnt { + return nil, fmt.Errorf("cannot resolve with wrong FinaliseIndex, expect: %v, now: %v", txCnt, s.nextFinaliseIndex) + } + + totalCnt := txCnt + len(extraTxDeps) + for i := 0; i < txCnt; i++ { + if s.rwSets[i].cannotGasFeeDelay { + return NewEmptyTxDAG(), nil + } + } + txDAG := &PlainTxDAG{ + TxDeps: s.txDepCache, + } + if len(extraTxDeps) > 0 { + txDAG.TxDeps = append(txDAG.TxDeps, extraTxDeps...) + } + for i := 0; i < len(txDAG.TxDeps); i++ { + if len(txDAG.TxDeps[i].TxIndexes) <= (totalCnt-1)/2 { + continue + } + // if tx deps larger than half of txs, then convert with NonDependentRelFlag + txDAG.TxDeps[i].SetFlag(NonDependentRelFlag) + nd := make([]uint64, 0, totalCnt-1-len(txDAG.TxDeps[i].TxIndexes)) + for j := uint64(0); j < uint64(i); j++ { + if !slices.Contains(txDAG.TxDeps[i].TxIndexes, j) { + nd = append(nd, j) + } + } + txDAG.TxDeps[i].TxIndexes = nd + } + s.txDepCache = txDAG.TxDeps + return txDAG, nil +} + +func (s *MVStates) FeeReceivers() []common.Address { + return s.gasFeeReceivers +} + +type TxDepSlice struct { + indexes []uint64 +} + +func NewTxDepSlice(cap int) *TxDepSlice { + return &TxDepSlice{ + indexes: make([]uint64, 0, cap), + } +} + +func (m *TxDepSlice) add(index uint64) { + if m.exist(index) { + return + } + m.indexes = append(m.indexes, index) + for i := len(m.indexes) - 1; i > 0; i-- { + if m.indexes[i] < m.indexes[i-1] { + m.indexes[i-1], m.indexes[i] = m.indexes[i], m.indexes[i-1] + } + } +} + +func (m *TxDepSlice) exist(index uint64) bool { + _, ok := slices.BinarySearch(m.indexes, index) + return ok +} + +func (m *TxDepSlice) deps() []uint64 { + return m.indexes +} + +func (m *TxDepSlice) remove(index uint64) { + pos, ok := slices.BinarySearch(m.indexes, index) + if !ok { + return + } + for i := pos; i < len(m.indexes)-1; i++ { + m.indexes[i] = m.indexes[i+1] + } + m.indexes = m.indexes[:len(m.indexes)-1] +} + +func (m *TxDepSlice) len() int { + return len(m.indexes) +} diff --git a/core/types/mvstates_test.go b/core/types/mvstates_test.go new file mode 100644 index 0000000000..9ede0e5ad4 --- /dev/null +++ b/core/types/mvstates_test.go @@ -0,0 +1,458 @@ +package types + +import ( + "bytes" + "compress/gzip" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + + "github.com/cometbft/cometbft/libs/rand" + "github.com/golang/snappy" + + "github.com/stretchr/testify/require" +) + +const ( + mockRWSetSize = 10000 +) + +func TestMVStates_SimpleResolveTxDAG(t *testing.T) { + ms := NewMVStates(10, nil).EnableAsyncGen() + finaliseRWSets(t, ms, []*RWSet{ + mockRWSet(0, []interface{}{"0x00"}, []interface{}{"0x00"}), + mockRWSet(1, []interface{}{"0x01"}, []interface{}{"0x01"}), + mockRWSet(2, []interface{}{"0x02"}, []interface{}{"0x02"}), + mockRWSet(3, []interface{}{"0x03"}, []interface{}{"0x03"}), + mockRWSet(3, []interface{}{"0x03"}, []interface{}{"0x03"}), + mockRWSet(3, []interface{}{"0x00", "0x03"}, []interface{}{"0x03"}), + }) + finaliseRWSets(t, ms, []*RWSet{ + mockRWSet(4, []interface{}{"0x00", "0x04"}, []interface{}{"0x04"}), + mockRWSet(5, []interface{}{"0x01", "0x02", "0x05"}, []interface{}{"0x05"}), + mockRWSet(6, []interface{}{"0x02", "0x05", "0x06"}, []interface{}{"0x06"}), + mockRWSet(7, []interface{}{"0x06", "0x07"}, []interface{}{"0x07"}), + mockRWSet(8, []interface{}{"0x08"}, []interface{}{"0x08"}), + mockRWSet(9, []interface{}{"0x08", "0x09"}, []interface{}{"0x09"}), + }) + + dag, err := ms.ResolveTxDAG(10) + require.NoError(t, err) + time.Sleep(10 * time.Millisecond) + ms.Stop() + require.Equal(t, mockSimpleDAG(), dag) + t.Log(dag) +} + +func TestMVStates_ResolveTxDAG_Async(t *testing.T) { + txCnt := 10000 + rwSets := mockRandomRWSet(txCnt) + ms1 := NewMVStates(txCnt, nil).EnableAsyncGen() + for i := 0; i < txCnt; i++ { + require.NoError(t, ms1.FinaliseWithRWSet(rwSets[i])) + } + time.Sleep(100 * time.Millisecond) + _, err := ms1.ResolveTxDAG(txCnt) + require.NoError(t, err) +} + +func TestMVStates_TxDAG_Compression(t *testing.T) { + txCnt := 10000 + rwSets := mockRandomRWSet(txCnt) + ms1 := NewMVStates(txCnt, nil).EnableAsyncGen() + for _, rwSet := range rwSets { + ms1.FinaliseWithRWSet(rwSet) + } + dag := resolveDepsMapCacheByWritesInMVStates(ms1) + enc, err := EncodeTxDAG(dag) + require.NoError(t, err) + + // snappy compression + start := time.Now() + encoded := snappy.Encode(nil, enc) + t.Log("snappy", "enc", len(enc), "compressed", len(encoded), + "ratio", 1-(float64(len(encoded))/float64(len(enc))), + "time", float64(time.Since(start).Microseconds())/1000) + + // gzip compression + start = time.Now() + var buf bytes.Buffer + zw := gzip.NewWriter(&buf) + _, err = zw.Write(enc) + require.NoError(t, err) + err = zw.Close() + require.NoError(t, err) + encoded = buf.Bytes() + t.Log("gzip", "enc", len(enc), "compressed", len(encoded), + "ratio", 1-(float64(len(encoded))/float64(len(enc))), + "time", float64(time.Since(start).Microseconds())/1000) +} + +var ( + mockRandRWSets []*RWSet + mockSameRWSets []*RWSet + mockDiffRWSets []*RWSet + mockRWEventItems [][]RWEventItem + mockSameRWEventItems [][]RWEventItem + mockDiffRWEventItems [][]RWEventItem +) + +func init() { + mockRandRWSets = mockRandomRWSet(mockRWSetSize) + mockSameRWSets = mockSameRWSet(mockRWSetSize) + mockDiffRWSets = mockDifferentRWSet(mockRWSetSize) + mockRWEventItems = make([][]RWEventItem, mockRWSetSize) + for i, rwSet := range mockRandRWSets { + mockRWEventItems[i] = mockRWEventItemsFromRWSet(i, rwSet) + } + mockSameRWEventItems = make([][]RWEventItem, mockRWSetSize) + for i, rwSet := range mockSameRWSets { + mockSameRWEventItems[i] = mockRWEventItemsFromRWSet(i, rwSet) + } + mockDiffRWEventItems = make([][]RWEventItem, mockRWSetSize) + for i, rwSet := range mockDiffRWSets { + mockDiffRWEventItems[i] = mockRWEventItemsFromRWSet(i, rwSet) + } +} + +func BenchmarkResolveTxDAGByWritesInMVStates(b *testing.B) { + ms1 := NewMVStates(mockRWSetSize, nil).EnableAsyncGen() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, item := range mockRWEventItems { + ms1.handleRWEvents(item) + } + resolveDepsMapCacheByWritesInMVStates(ms1) + } +} + +func BenchmarkResolveTxDAG_RWEvent_RandRWSet(b *testing.B) { + benchmarkResolveTxDAGRWEvent(b, mockRWEventItems) +} + +func BenchmarkResolveTxDAG_RWEvent_SameRWSet(b *testing.B) { + benchmarkResolveTxDAGRWEvent(b, mockSameRWEventItems) +} + +func BenchmarkResolveTxDAG_RWEvent_DiffRWSet(b *testing.B) { + benchmarkResolveTxDAGRWEvent(b, mockDiffRWEventItems) +} + +func benchmarkResolveTxDAGRWEvent(b *testing.B, eventItems [][]RWEventItem) { + for i := 0; i < b.N; i++ { + s := NewMVStates(0, nil).EnableAsyncGen() + for _, items := range eventItems { + for _, item := range items { + switch item.Event { + case NewTxRWEvent: + s.RecordNewTx(item.Index) + case ReadAccRWEvent: + s.RecordAccountRead(item.Addr, item.State) + case ReadSlotRWEvent: + s.RecordStorageRead(item.Addr, item.Slot) + case WriteAccRWEvent: + s.RecordAccountWrite(item.Addr, item.State) + case WriteSlotRWEvent: + s.RecordStorageWrite(item.Addr, item.Slot) + } + } + } + s.ResolveTxDAG(mockRWSetSize) + s.Stop() + } +} + +func BenchmarkMVStates_Finalise(b *testing.B) { + rwSets := mockRandomRWSet(mockRWSetSize) + ms1 := NewMVStates(mockRWSetSize, nil).EnableAsyncGen() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, rwSet := range rwSets { + ms1.FinaliseWithRWSet(rwSet) + } + } +} + +func resolveDepsMapCacheByWritesInMVStates(s *MVStates) TxDAG { + txCnt := s.nextFinaliseIndex + txDAG := NewPlainTxDAG(txCnt) + for i := 0; i < txCnt; i++ { + txDAG.TxDeps[i] = s.txDepCache[i] + } + return txDAG +} + +func TestMVStates_SystemTxResolveTxDAG(t *testing.T) { + ms := NewMVStates(12, nil).EnableAsyncGen() + finaliseRWSets(t, ms, []*RWSet{ + mockRWSet(0, []interface{}{"0x00"}, []interface{}{"0x00"}), + mockRWSet(1, []interface{}{"0x01"}, []interface{}{"0x01"}), + mockRWSet(2, []interface{}{"0x02"}, []interface{}{"0x02"}), + mockRWSet(3, []interface{}{"0x00", "0x03"}, []interface{}{"0x03"}), + mockRWSet(4, []interface{}{"0x00", "0x04"}, []interface{}{"0x04"}), + mockRWSet(5, []interface{}{"0x01", "0x02", "0x05"}, []interface{}{"0x05"}), + mockRWSet(6, []interface{}{"0x02", "0x05", "0x06"}, []interface{}{"0x06"}), + mockRWSet(7, []interface{}{"0x06", "0x07"}, []interface{}{"0x07"}), + mockRWSet(8, []interface{}{"0x08"}, []interface{}{"0x08"}), + mockRWSet(9, []interface{}{"0x08", "0x09"}, []interface{}{"0x09"}), + mockRWSet(10, []interface{}{"0x10"}, []interface{}{"0x10"}).WithExcludedTxFlag(), + mockRWSet(11, []interface{}{"0x11"}, []interface{}{"0x11"}).WithExcludedTxFlag(), + }) + + dag, err := ms.ResolveTxDAG(12) + require.NoError(t, err) + require.Equal(t, mockSystemTxDAG(), dag) + t.Log(dag) +} + +func TestMVStates_SystemTxWithLargeDepsResolveTxDAG(t *testing.T) { + ms := NewMVStates(12, nil).EnableAsyncGen() + finaliseRWSets(t, ms, []*RWSet{ + mockRWSet(0, []interface{}{"0x00"}, []interface{}{"0x00"}), + mockRWSet(1, []interface{}{"0x01"}, []interface{}{"0x01"}), + mockRWSet(2, []interface{}{"0x02"}, []interface{}{"0x02"}), + mockRWSet(3, []interface{}{"0x00", "0x03"}, []interface{}{"0x03"}), + mockRWSet(4, []interface{}{"0x00", "0x04"}, []interface{}{"0x04"}), + mockRWSet(5, []interface{}{"0x01", "0x02", "0x05"}, []interface{}{"0x05"}), + mockRWSet(6, []interface{}{"0x02", "0x05", "0x06"}, []interface{}{"0x06"}), + mockRWSet(7, []interface{}{"0x00", "0x03", "0x07"}, []interface{}{"0x07"}), + mockRWSet(8, []interface{}{"0x08"}, []interface{}{"0x08"}), + mockRWSet(9, []interface{}{"0x00", "0x01", "0x02", "0x06", "0x07", "0x08", "0x09"}, []interface{}{"0x09"}), + mockRWSet(10, []interface{}{"0x10"}, []interface{}{"0x10"}).WithExcludedTxFlag(), + mockRWSet(11, []interface{}{"0x11"}, []interface{}{"0x11"}).WithExcludedTxFlag(), + }) + dag, err := ms.ResolveTxDAG(12) + require.NoError(t, err) + require.Equal(t, mockSystemTxDAGWithLargeDeps(), dag) + t.Log(dag) +} + +func TestTxRecorder_Basic(t *testing.T) { + sets := []*RWSet{ + mockRWSet(0, []interface{}{AccountSelf, AccountBalance, "0x00"}, + []interface{}{AccountBalance, AccountCodeHash, "0x00"}), + mockRWSet(1, []interface{}{AccountSelf, AccountBalance, "0x01"}, + []interface{}{AccountBalance, AccountCodeHash, "0x01"}), + mockRWSet(2, []interface{}{AccountSelf, AccountBalance, "0x01", "0x01"}, + []interface{}{AccountBalance, AccountCodeHash, "0x01"}), + } + ms := NewMVStates(0, nil).EnableAsyncGen() + for _, item := range sets { + ms.RecordNewTx(item.index) + for addr, sub := range item.accReadSet { + for state := range sub { + ms.RecordAccountRead(addr, state) + } + } + for addr, sub := range item.slotReadSet { + for slot := range sub { + ms.RecordStorageRead(addr, slot) + } + } + for addr, sub := range item.accWriteSet { + for state := range sub { + ms.RecordAccountWrite(addr, state) + } + } + for addr, sub := range item.slotWriteSet { + for slot := range sub { + ms.RecordStorageWrite(addr, slot) + } + } + } + dag, err := ms.ResolveTxDAG(3) + require.NoError(t, err) + require.Equal(t, "[]\n[0]\n[1]\n", dag.(*PlainTxDAG).String()) +} + +func TestRWSet(t *testing.T) { + set := NewRWSet(0) + mockRWSetWithAddr(set, common.Address{1}, []interface{}{AccountSelf, AccountBalance, "0x00"}, + []interface{}{AccountBalance, AccountCodeHash, "0x00"}) + mockRWSetWithAddr(set, common.Address{2}, []interface{}{AccountSelf, AccountBalance, "0x01"}, + []interface{}{AccountBalance, AccountCodeHash, "0x01"}) + mockRWSetWithAddr(set, common.Address{3}, []interface{}{AccountSelf, AccountBalance, "0x01", "0x01"}, + []interface{}{AccountBalance, AccountCodeHash, "0x01"}) + t.Log(set) +} + +func TestTxRecorder_CannotDelayGasFee(t *testing.T) { + ms := NewMVStates(0, nil).EnableAsyncGen() + ms.RecordNewTx(0) + ms.RecordNewTx(1) + ms.RecordCannotDelayGasFee() + ms.RecordNewTx(2) + dag, err := ms.ResolveTxDAG(3) + require.NoError(t, err) + require.Equal(t, NewEmptyTxDAG(), dag) +} + +func mockRWSet(index int, read []interface{}, write []interface{}) *RWSet { + set := NewRWSet(index) + set.accReadSet[common.Address{}] = map[AccountState]struct{}{} + set.accWriteSet[common.Address{}] = map[AccountState]struct{}{} + set.slotReadSet[common.Address{}] = map[common.Hash]struct{}{} + set.slotWriteSet[common.Address{}] = map[common.Hash]struct{}{} + for _, k := range read { + state, ok := k.(AccountState) + if ok { + set.accReadSet[common.Address{}][state] = struct{}{} + } else { + set.slotReadSet[common.Address{}][str2Slot(k.(string))] = struct{}{} + } + } + for _, k := range write { + state, ok := k.(AccountState) + if ok { + set.accWriteSet[common.Address{}][state] = struct{}{} + } else { + set.slotWriteSet[common.Address{}][str2Slot(k.(string))] = struct{}{} + } + } + + return set +} + +func mockRWSetWithAddr(set *RWSet, addr common.Address, read []interface{}, write []interface{}) *RWSet { + set.accReadSet[addr] = map[AccountState]struct{}{} + set.accWriteSet[addr] = map[AccountState]struct{}{} + set.slotReadSet[addr] = map[common.Hash]struct{}{} + set.slotWriteSet[addr] = map[common.Hash]struct{}{} + for _, k := range read { + state, ok := k.(AccountState) + if ok { + set.accReadSet[addr][state] = struct{}{} + } else { + set.slotReadSet[addr][str2Slot(k.(string))] = struct{}{} + } + } + for _, k := range write { + state, ok := k.(AccountState) + if ok { + set.accWriteSet[addr][state] = struct{}{} + } else { + set.slotWriteSet[addr][str2Slot(k.(string))] = struct{}{} + } + } + + return set +} + +func str2Slot(str string) common.Hash { + return common.BytesToHash([]byte(str)) +} + +func mockRandomRWSet(count int) []*RWSet { + var ret []*RWSet + for i := 0; i < count; i++ { + read := []interface{}{fmt.Sprintf("0x%d", i)} + write := []interface{}{fmt.Sprintf("0x%d", i)} + if i != 0 && rand.Bool() { + depCnt := rand.Int()%i + 1 + last := 0 + for j := 0; j < depCnt; j++ { + num, ok := randInRange(last, i) + if !ok { + break + } + read = append(read, fmt.Sprintf("0x%d", num)) + last = num + } + } + // random read + for j := 0; j < 20; j++ { + read = append(read, fmt.Sprintf("rr-%d-%d", j, rand.Int())) + } + for j := 0; j < 5; j++ { + read = append(read, fmt.Sprintf("rw-%d-%d", j, rand.Int())) + } + // random write + s := mockRWSet(i, read, write) + ret = append(ret, s) + } + return ret +} + +func mockSameRWSet(count int) []*RWSet { + var ret []*RWSet + for i := 0; i < count; i++ { + read := []interface{}{"0xa0", "0xa1", fmt.Sprintf("0x%d", i), fmt.Sprintf("0x%d", i)} + write := []interface{}{"0xa0", fmt.Sprintf("0x%d", i)} + // random write + s := mockRWSet(i, read, write) + ret = append(ret, s) + } + return ret +} + +func mockDifferentRWSet(count int) []*RWSet { + var ret []*RWSet + for i := 0; i < count; i++ { + read := []interface{}{fmt.Sprintf("0x%d", i), fmt.Sprintf("0x%d", i)} + write := []interface{}{fmt.Sprintf("0x%d", i)} + // random write + s := mockRWSet(i, read, write) + ret = append(ret, s) + } + return ret +} + +func finaliseRWSets(t *testing.T, mv *MVStates, rwSets []*RWSet) { + for _, rwSet := range rwSets { + require.NoError(t, mv.FinaliseWithRWSet(rwSet)) + } +} + +func randInRange(i, j int) (int, bool) { + if i >= j { + return 0, false + } + return rand.Int()%(j-i) + i, true +} + +func mockRWEventItemsFromRWSet(index int, rwSet *RWSet) []RWEventItem { + items := make([]RWEventItem, 0) + items = append(items, RWEventItem{ + Event: NewTxRWEvent, + Index: index, + }) + for addr, sub := range rwSet.accReadSet { + for state := range sub { + items = append(items, RWEventItem{ + Event: ReadAccRWEvent, + Addr: addr, + State: state, + }) + } + } + for addr, sub := range rwSet.slotReadSet { + for slot := range sub { + items = append(items, RWEventItem{ + Event: ReadSlotRWEvent, + Addr: addr, + Slot: slot, + }) + } + } + for addr, sub := range rwSet.accWriteSet { + for state := range sub { + items = append(items, RWEventItem{ + Event: WriteAccRWEvent, + Addr: addr, + State: state, + }) + } + } + for addr, sub := range rwSet.slotWriteSet { + for slot := range sub { + items = append(items, RWEventItem{ + Event: WriteSlotRWEvent, + Addr: addr, + Slot: slot, + }) + } + } + return items +} diff --git a/core/vm/interface.go b/core/vm/interface.go index 25bfa06720..c8636ef643 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -79,6 +79,11 @@ type StateDB interface { AddLog(*types.Log) AddPreimage(common.Hash, []byte) + + TxIndex() int + + // parallel DAG related + CheckFeeReceiversRWSet() } // CallContext provides a basic interface for the EVM calling conventions. The EVM diff --git a/eth/backend.go b/eth/backend.go index d96c20ce0f..ff5926a187 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -283,6 +283,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } + if config.EnableParallelTxDAG { + eth.blockchain.SetupTxDAGGeneration() + } if chainConfig := eth.blockchain.Config(); chainConfig.Optimism != nil { // config.Genesis.Config.ChainID cannot be used because it's based on CLI flags only, thus default to mainnet L1 config.NetworkId = chainConfig.ChainID.Uint64() // optimism defaults eth network ID to chain ID eth.networkID = config.NetworkId diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 383641ffc3..3ce1fa3df5 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -219,6 +219,7 @@ type Config struct { RollupHaltOnIncompatibleProtocolVersion string EnableOpcodeOptimizing bool + EnableParallelTxDAG bool } // CreateConsensusEngine creates a consensus engine for the given chain config. diff --git a/go.mod b/go.mod index 73768e2685..b46d2b8255 100644 --- a/go.mod +++ b/go.mod @@ -110,6 +110,7 @@ require ( github.com/dgraph-io/ristretto v0.0.4-0.20210318174700-74754f61e018 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/ferranbt/fastssz v0.0.0-20210905181407-59cf6761a7d5 // indirect github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect diff --git a/go.sum b/go.sum index 0671fa8e41..c7854ffc84 100644 --- a/go.sum +++ b/go.sum @@ -350,6 +350,8 @@ github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZi github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/dot v0.11.0/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= diff --git a/miner/miner.go b/miner/miner.go index 53755ad632..08c60c186e 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -19,6 +19,7 @@ package miner import ( "context" + "crypto/ecdsa" "errors" "fmt" "math/big" @@ -63,7 +64,8 @@ var ( snapshotAccountReadTimer = metrics.NewRegisteredTimer("miner/snapshot/account/reads", nil) snapshotStorageReadTimer = metrics.NewRegisteredTimer("miner/snapshot/storage/reads", nil) - waitPayloadTimer = metrics.NewRegisteredTimer("miner/wait/payload", nil) + waitPayloadTimer = metrics.NewRegisteredTimer("miner/wait/payload", nil) + txDAGGenerateTimer = metrics.NewRegisteredTimer("miner/txdag/gen", nil) isBuildBlockInterruptCounter = metrics.NewRegisteredCounter("miner/build/interrupt", nil) ) @@ -108,6 +110,8 @@ type Config struct { EffectiveGasCeil uint64 // if non-zero, a gas ceiling to apply independent of the header's gaslimit value Mev MevConfig // Mev configuration + + ParallelTxDAGSenderPriv *ecdsa.PrivateKey // The private key for the parallel tx DAG sender } // DefaultConfig contains default settings for miner. diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index cb1220fbfa..89ffd4d63f 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -44,7 +44,7 @@ func testBuildPayload(t *testing.T, noTxPool, interrupt bool) { db = rawdb.NewMemoryDatabase() recipient = common.HexToAddress("0xdeadbeef") ) - w, b := newTestWorker(t, params.TestChainConfig, ethash.NewFaker(), db, 0) + w, b := newTestWorker(t, params.TestChainConfig, ethash.NewFaker(), db, 0, nil, nil) defer w.close() const numInterruptTxs = 256 diff --git a/miner/worker.go b/miner/worker.go index b8df413238..5cf742bcd5 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -18,6 +18,7 @@ package miner import ( "context" + "crypto/ecdsa" "errors" "fmt" "math/big" @@ -40,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -102,6 +104,10 @@ var ( txErrReplayMeter = metrics.NewRegisteredMeter("miner/tx/replay", nil) ) +var ( + DefaultTxDAGAddress = common.HexToAddress("0xda90000000000000000000000000000000000000") +) + // environment is the worker's current environment and holds all // information of the sealing block generation. type environment struct { @@ -118,17 +124,20 @@ type environment struct { blobs int UnRevertible mapset.Set[common.Hash] + + gasForTxDAG uint64 // gas reserved for the txdag } // copy creates a deep copy of environment. func (env *environment) copy() *environment { cpy := &environment{ - signer: env.signer, - state: env.state.Copy(), - tcount: env.tcount, - coinbase: env.coinbase, - header: types.CopyHeader(env.header), - receipts: copyReceipts(env.receipts), + signer: env.signer, + state: env.state.CopyWithMvStates(), + tcount: env.tcount, + coinbase: env.coinbase, + header: types.CopyHeader(env.header), + receipts: copyReceipts(env.receipts), + gasForTxDAG: env.gasForTxDAG, } if env.gasPool != nil { gasPool := *env.gasPool @@ -924,7 +933,7 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac } } // If we don't have enough gas for any further transactions then we're done. - if env.gasPool.Gas() < params.TxGas { + if env.gasPool.Gas()-env.gasForTxDAG < params.TxGas { log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) break } @@ -960,7 +969,7 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac } txTotalMeter.Mark(1) // If we don't have enough space for the next transaction, skip the account. - if env.gasPool.Gas() < ltx.Gas { + if env.gasPool.Gas()-env.gasForTxDAG < ltx.Gas { log.Trace("Not enough gas left for transaction", "hash", ltx.Hash, "left", env.gasPool.Gas(), "needed", ltx.Gas) txs.Pop() txErrNotenoughgasMeter.Mark(1) @@ -1036,6 +1045,86 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac return nil } +// generate and append DAG tx +func (w *worker) appendTxDAG(env *environment) { + // whether enable TxDAG + if !w.chain.TxDAGEnabledWhenMine() { + return + } + // TODO this is a placeholder for the tx DAG data that will be generated by the stateDB + txForDAG, err := w.generateDAGTx(env.state, env.signer, env.tcount, env.gasForTxDAG) + if err != nil { + log.Warn("failed to generate DAG tx", "err", err) + return + } + env.state.SetTxContext(txForDAG.Hash(), env.tcount) + _, err = w.commitTransaction(env, txForDAG) + if err != nil { + log.Warn("failed to commit DAG tx", "err", err) + return + } + env.tcount++ +} + +// generateDAGTx generates a DAG transaction for the block +func (w *worker) generateDAGTx(statedb *state.StateDB, signer types.Signer, txIndex int, gasLimitForDag uint64) (*types.Transaction, error) { + if statedb == nil { + return nil, fmt.Errorf("failed to get state db, env.state=nil") + } + + if signer == nil { + return nil, fmt.Errorf("current signer is nil") + } + + sender := w.config.ParallelTxDAGSenderPriv + if sender == nil { + return nil, fmt.Errorf("missing sender private key") + } + + // get txDAG data from the stateDB + // txIndex is the index of this txDAG transaction + defer func() { + statedb.MVStates().Stop() + }() + txDAG, err := statedb.ResolveTxDAG(txIndex, types.TxDep{Flags: &types.NonDependentRelFlag}) + if txDAG == nil { + return nil, err + } + + publicKey := sender.Public() + publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) + if !ok { + return nil, fmt.Errorf("error casting public key to ECDSA") + } + fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA) + + // get nonce from the + nonce := statedb.GetNonce(fromAddress) + + data, err := types.EncodeTxDAGCalldata(txDAG) + if err != nil { + return nil, fmt.Errorf("failed to encode txDAG, err: %v", err) + } + + // Create the transaction + tx := types.NewTx(&types.LegacyTx{ + Nonce: nonce, + To: &DefaultTxDAGAddress, + Value: big.NewInt(0), + Gas: gasLimitForDag, + GasPrice: big.NewInt(0), + Data: data, + }) + + // Sign the transaction with the private key + signedTx, err := types.SignTx(tx, signer, sender) + if err != nil { + return nil, fmt.Errorf("failed to sign transaction, err: %v", err) + } + + return signedTx, nil +} + // generateParams wraps various of settings for generating sealing task. type generateParams struct { timestamp uint64 // The timestamp for sealing task @@ -1223,6 +1312,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err localBlobTxs[account] = txs } } + // Fill the block with all available pending transactions. start = time.Now() if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 { @@ -1246,6 +1336,27 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err return nil } +func (w *worker) estimateGasForTxDAG(env *environment) uint64 { + var gas uint64 = 0 + if w.chain.TxDAGEnabledWhenMine() { + // 1. a 10k-transactions block need at most 64kB to store its transaction, and its data size grows linearly with the number of transactions + // 2. 100M gaslimit block can include at most 4761 = (100M/21000) transactions + // + // the total gas for TxDAG is calculated as follows: + // + // MaxBytesPerTx = 64 * 1024 / 10000 = 6.5 bytes ~ 7 bytes + // MaxTxsCanInclude uint64 = GasLimit / 21000 + // total = MaxBytesPerTx * NoZeroGas * MaxTxsCanInclude + params.TxGas + // + if w.chainConfig.IsIstanbul(env.header.Number) { + gas = 7*params.TxDataNonZeroGasEIP2028*(env.header.GasLimit/21000) + params.TxGas + } else { + gas = 7*params.TxDataNonZeroGasFrontier*(env.header.GasLimit/21000) + params.TxGas + } + } + return gas +} + // generateWork generates a sealing block based on the given parameters. func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { // TODO delete after debug performance metrics @@ -1274,6 +1385,11 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { misc.EnsureCreate2Deployer(w.chainConfig, work.header.Time, work.state) start := time.Now() + if w.chain.TxDAGEnabledWhenMine() { + feeReceivers := []common.Address{work.coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient} + work.state.ResetMVStates(0, feeReceivers) + log.Debug("ResetMVStates", "block", work.header.Number.Uint64()) + } for _, tx := range genParams.txs { from, _ := types.Sender(work.signer, tx) work.state.SetTxContext(tx.Hash(), work.tcount) @@ -1288,6 +1404,8 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { // forced transactions done, fill rest of block with transactions if !genParams.noTxs { + // reserve gas for TxDAG + work.gasForTxDAG = w.estimateGasForTxDAG(work) // use shared interrupt if present interrupt := genParams.interrupt if interrupt == nil { @@ -1302,6 +1420,9 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { wg.Add(1) go func() { defer wg.Done() + if w.chain.TxDAGEnabledWhenMine() { + newWork.state.MVStates().EnableAsyncGen() + } err := w.fillTransactions(interrupt, newWork) if errors.Is(err, errBlockInterruptedByTimeout) { log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout), "parentHash", genParams.parentHash) @@ -1311,14 +1432,23 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { isBuildBlockInterruptCounter.Inc(1) } }() + if w.chain.TxDAGEnabledWhenMine() { + work.state.MVStates().EnableAsyncGen() + } err := w.fillTransactionsAndBundles(interrupt, work) wg.Wait() timer.Stop() // don't need timeout interruption any more if errors.Is(err, errFillBundleInterrupted) { log.Warn("fill bundles is interrupted, discard", "err", err) - work = newWork + work, newWork = newWork, work + } + if w.chain.TxDAGEnabledWhenMine() { + newWork.state.MVStates().Stop() } } else { + if w.chain.TxDAGEnabledWhenMine() { + work.state.MVStates().EnableAsyncGen() + } err := w.fillTransactions(interrupt, work) timer.Stop() // don't need timeout interruption any more if errors.Is(err, errBlockInterruptedByTimeout) { @@ -1329,6 +1459,10 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { isBuildBlockInterruptCounter.Inc(1) } } + if w.chain.TxDAGEnabledWhenMine() { + // append a DAG tx at the end of the block + w.appendTxDAG(work) + } } if intr := genParams.interrupt; intr != nil && genParams.isUpdate && intr.Load() != commitInterruptNone { @@ -1355,6 +1489,7 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { storageUpdateTimer.Update(work.state.StorageUpdates) // Storage updates are complete(in FinalizeAndAssemble) accountHashTimer.Update(work.state.AccountHashes) // Account hashes are complete(in FinalizeAndAssemble) storageHashTimer.Update(work.state.StorageHashes) // Storage hashes are complete(in FinalizeAndAssemble) + txDAGGenerateTimer.Update(work.state.TxDAGGenerate) innerExecutionTimer.Update(core.DebugInnerExecutionDuration) diff --git a/miner/worker_test.go b/miner/worker_test.go index 1c19e60de9..495d52d6e7 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -115,7 +115,8 @@ type testWorkerBackend struct { genesis *core.Genesis } -func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, n int) *testWorkerBackend { +func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, + db ethdb.Database, n int, overrideVMConfig *vm.Config) *testWorkerBackend { var gspec = &core.Genesis{ Config: chainConfig, Alloc: types.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, @@ -131,7 +132,11 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine default: t.Fatalf("unexpected consensus engine type: %T", engine) } - chain, err := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec, nil, engine, vm.Config{}, nil, nil) + vmConfig := vm.Config{} + if overrideVMConfig != nil { + vmConfig = *overrideVMConfig + } + chain, err := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec, nil, engine, vmConfig, nil, nil) if err != nil { t.Fatalf("core.NewBlockChain failed: %v", err) } @@ -160,11 +165,16 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { return tx } -func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) { - backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) +func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, + blocks int, overrideConfig *Config, overrideVMConfig *vm.Config) (*worker, *testWorkerBackend) { + backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks, overrideVMConfig) backend.txPool.Add(pendingTxs, true, false) time.Sleep(500 * time.Millisecond) // Wait for txs to be promoted - w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) + cfg := testConfig + if overrideConfig != nil { + cfg = overrideConfig + } + w := newWorker(cfg, chainConfig, engine, backend, new(event.TypeMux), nil, false) w.setEtherbase(testBankAddress) return w, backend } @@ -178,7 +188,7 @@ func TestGenerateAndImportBlock(t *testing.T) { config.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} engine := clique.New(config.Clique, db) - w, b := newTestWorker(t, &config, engine, db, 0) + w, b := newTestWorker(t, &config, engine, db, 0, nil, nil) defer w.close() // This test chain imports the mined blocks. @@ -214,6 +224,77 @@ func TestGenerateAndImportBlock(t *testing.T) { } } +func TestGenerateTxDAGGaslessBlock(t *testing.T) { + generateTxDAGGaslessBlock(t, true, true) + generateTxDAGGaslessBlock(t, true, false) + generateTxDAGGaslessBlock(t, false, true) + generateTxDAGGaslessBlock(t, false, false) +} + +func generateTxDAGGaslessBlock(t *testing.T, enableMev, enableTxDAG bool) { + t.Log("generateTxDAGGaslessBlock", enableMev, enableTxDAG) + var ( + db = rawdb.NewMemoryDatabase() + config = *params.AllCliqueProtocolChanges + ) + config.Optimism = ¶ms.OptimismConfig{ + EIP1559Elasticity: 2, + EIP1559Denominator: 8, + EIP1559DenominatorCanyon: 8, + } + cfg := Config{} + cfg = *testConfig + if enableMev { + cfg.Mev.MevEnabled = true + } + cfg.NewPayloadTimeout = 3 * time.Second + cfg.ParallelTxDAGSenderPriv, _ = crypto.ToECDSA(crypto.Keccak256([]byte{1})) + vmConfig := vm.Config{NoBaseFee: true} + engine := clique.New(config.Clique, db) + + w, b := newTestWorker(t, &config, engine, db, 0, &cfg, &vmConfig) + defer w.close() + if enableTxDAG { + w.chain.SetupTxDAGGeneration() + } + + // Ignore empty commit here for less noise. + w.skipSealHook = func(task *task) bool { + return len(task.receipts) == 0 + } + + // Start mining! + w.start() + + for i := 0; i < 5; i++ { + b.txPool.Add([]*types.Transaction{b.newRandomTx(true)}, true, false) + b.txPool.Add([]*types.Transaction{b.newRandomTx(false)}, true, false) + time.Sleep(1 * time.Second) // Wait for txs to be promoted + + block := w.getSealingBlock(&generateParams{ + timestamp: uint64(time.Now().Unix()), + forceTime: false, + parentHash: common.Hash{}, + coinbase: common.Address{}, + random: common.Hash{}, + withdrawals: nil, + beaconRoot: nil, + noTxs: false, + txs: types.Transactions{ + types.NewTx(&types.DepositTx{ + To: nil, // contract creation + Value: big.NewInt(6), + Gas: 50, + })}, + gasLimit: nil, + interrupt: nil, + isUpdate: false, + }) + txDAG, _ := types.GetTxDAG(block.block) + t.Log("block", block.block.NumberU64(), "txs", len(block.block.Transactions()), "txdag", txDAG) + } +} + func TestEmptyWorkEthash(t *testing.T) { t.Parallel() testEmptyWork(t, ethashChainConfig, ethash.NewFaker()) @@ -226,7 +307,7 @@ func TestEmptyWorkClique(t *testing.T) { func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, nil, nil) defer w.close() taskCh := make(chan struct{}, 2) @@ -271,7 +352,7 @@ func TestAdjustIntervalClique(t *testing.T) { func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, nil, nil) defer w.close() w.skipSealHook = func(task *task) bool { @@ -376,7 +457,7 @@ func TestGetSealingWorkPostMerge(t *testing.T) { func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, nil, nil) defer w.close() w.setExtra([]byte{0x01, 0x02}) diff --git a/tests/block_test.go b/tests/block_test.go index fb355085fd..0457510821 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -73,20 +73,56 @@ func TestExecutionSpecBlocktests(t *testing.T) { }) } +func TestBlockchainWithTxDAG(t *testing.T) { + bt := new(testMatcher) + // General state tests are 'exported' as blockchain tests, but we can run them natively. + // For speedier CI-runs, the line below can be uncommented, so those are skipped. + // For now, in hardfork-times (Berlin), we run the tests both as StateTests and + // as blockchain tests, since the latter also covers things like receipt root + bt.skipLoad(`^GeneralStateTests/`) + + // Skip random failures due to selfish mining test + bt.skipLoad(`.*bcForgedTest/bcForkUncle\.json`) + + // Slow tests + bt.slow(`.*bcExploitTest/DelegateCallSpam.json`) + bt.slow(`.*bcExploitTest/ShanghaiLove.json`) + bt.slow(`.*bcExploitTest/SuicideIssue.json`) + bt.slow(`.*/bcForkStressTest/`) + bt.slow(`.*/bcGasPricerTest/RPC_API_Test.json`) + bt.slow(`.*/bcWalletTest/`) + + // Very slow test + bt.skipLoad(`.*/stTimeConsuming/.*`) + // test takes a lot for time and goes easily OOM because of sha3 calculation on a huge range, + // using 4.6 TGas + bt.skipLoad(`.*randomStatetest94.json.*`) + + bt.walk(t, blockTestDir, func(t *testing.T, name string, test *BlockTest) { + if runtime.GOARCH == "386" && runtime.GOOS == "windows" && rand.Int63()%2 == 0 { + t.Skip("test (randomly) skipped on 32-bit windows") + } + if err := bt.checkFailure(t, test.Run(true, rawdb.PathScheme, nil, true, nil)); err != nil { + t.Errorf("test in path mode with snapshotter failed: %v", err) + return + } + }) +} + func execBlockTest(t *testing.T, bt *testMatcher, test *BlockTest) { - if err := bt.checkFailure(t, test.Run(false, rawdb.HashScheme, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(false, rawdb.HashScheme, nil, false, nil)); err != nil { t.Errorf("test in hash mode without snapshotter failed: %v", err) return } - if err := bt.checkFailure(t, test.Run(true, rawdb.HashScheme, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(true, rawdb.HashScheme, nil, false, nil)); err != nil { t.Errorf("test in hash mode with snapshotter failed: %v", err) return } - if err := bt.checkFailure(t, test.Run(false, rawdb.PathScheme, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(false, rawdb.PathScheme, nil, false, nil)); err != nil { t.Errorf("test in path mode without snapshotter failed: %v", err) return } - if err := bt.checkFailure(t, test.Run(true, rawdb.PathScheme, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(true, rawdb.PathScheme, nil, false, nil)); err != nil { t.Errorf("test in path mode with snapshotter failed: %v", err) return } diff --git a/tests/block_test_util.go b/tests/block_test_util.go index e8ca68dee0..252a2cdd09 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -109,7 +109,7 @@ type btHeaderMarshaling struct { ExcessBlobGas *math.HexOrDecimal64 } -func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, postCheck func(error, *core.BlockChain)) (result error) { +func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, enableTxDAG bool, postCheck func(error, *core.BlockChain)) (result error) { config, ok := Forks[t.json.Network] if !ok { return UnsupportedForkError{t.json.Network} @@ -159,6 +159,9 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, po return err } defer chain.Stop() + if enableTxDAG { + chain.SetupTxDAGGeneration() + } validBlocks, err := t.insertBlocks(chain) if err != nil {