diff --git a/core/chains/evm/client/node.go b/core/chains/evm/client/node.go index 1a9decf4328..ddc86948e48 100644 --- a/core/chains/evm/client/node.go +++ b/core/chains/evm/client/node.go @@ -127,8 +127,6 @@ type rawclient struct { // It must have a ws url and may have a http url type node struct { utils.StartStopOnce - ws rawclient - http *rawclient lfcLog logger.Logger rpcLog logger.Logger name string @@ -136,6 +134,9 @@ type node struct { chainID *big.Int cfg NodeConfig + ws rawclient + http *rawclient + state NodeState stateMu sync.RWMutex @@ -279,9 +280,6 @@ func (n *node) dial(callerCtx context.Context) error { n.http.geth = ethclient.NewClient(httprpc) } - n.lfcLog.Debugw("RPC dial: success") - promEVMPoolRPCNodeDialsSuccess.WithLabelValues(n.chainID.String(), n.name).Inc() - return nil } @@ -343,7 +341,12 @@ func (n *node) verify(callerCtx context.Context) (err error) { func (n *node) Close() error { return n.StopOnce(n.name, func() error { - defer n.wg.Wait() + defer func() { + n.wg.Wait() + if n.ws.rpc != nil { + n.ws.rpc.Close() + } + }() n.stateMu.Lock() defer n.stateMu.Unlock() @@ -351,9 +354,6 @@ func (n *node) Close() error { n.cancelNodeCtx() n.cancelInflightRequests() n.state = NodeStateClosed - if n.ws.rpc != nil { - n.ws.rpc.Close() - } return nil }) } @@ -413,7 +413,7 @@ func (n *node) getRPCDomain() string { // CallContext implementation func (n *node) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return err } @@ -425,10 +425,10 @@ func (n *node) CallContext(ctx context.Context, result interface{}, method strin lggr.Debug("RPC call: evmclient.Client#CallContext") start := time.Now() - if n.http != nil { - err = n.wrapHTTP(n.http.rpc.CallContext(ctx, result, method, args...)) + if http != nil { + err = n.wrapHTTP(http.rpc.CallContext(ctx, result, method, args...)) } else { - err = n.wrapWS(n.ws.rpc.CallContext(ctx, result, method, args...)) + err = n.wrapWS(ws.rpc.CallContext(ctx, result, method, args...)) } duration := time.Since(start) @@ -438,7 +438,7 @@ func (n *node) CallContext(ctx context.Context, result interface{}, method strin } func (n *node) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return err } @@ -447,10 +447,10 @@ func (n *node) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { lggr.Debug("RPC call: evmclient.Client#BatchCallContext") start := time.Now() - if n.http != nil { - err = n.wrapHTTP(n.http.rpc.BatchCallContext(ctx, b)) + if http != nil { + err = n.wrapHTTP(http.rpc.BatchCallContext(ctx, b)) } else { - err = n.wrapWS(n.ws.rpc.BatchCallContext(ctx, b)) + err = n.wrapWS(ws.rpc.BatchCallContext(ctx, b)) } duration := time.Since(start) @@ -460,7 +460,7 @@ func (n *node) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { } func (n *node) EthSubscribe(ctx context.Context, channel chan<- *evmtypes.Head, args ...interface{}) (ethereum.Subscription, error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, _, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -469,7 +469,7 @@ func (n *node) EthSubscribe(ctx context.Context, channel chan<- *evmtypes.Head, lggr.Debug("RPC call: evmclient.Client#EthSubscribe") start := time.Now() - sub, err := n.ws.rpc.EthSubscribe(ctx, channel, args...) + sub, err := ws.rpc.EthSubscribe(ctx, channel, args...) if err == nil { n.registerSub(sub) } @@ -483,7 +483,7 @@ func (n *node) EthSubscribe(ctx context.Context, channel chan<- *evmtypes.Head, // GethClient wrappers func (n *node) TransactionReceipt(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -493,11 +493,11 @@ func (n *node) TransactionReceipt(ctx context.Context, txHash common.Hash) (rece lggr.Debug("RPC call: evmclient.Client#TransactionReceipt") start := time.Now() - if n.http != nil { - receipt, err = n.http.geth.TransactionReceipt(ctx, txHash) + if http != nil { + receipt, err = http.geth.TransactionReceipt(ctx, txHash) err = n.wrapHTTP(err) } else { - receipt, err = n.ws.geth.TransactionReceipt(ctx, txHash) + receipt, err = ws.geth.TransactionReceipt(ctx, txHash) err = n.wrapWS(err) } duration := time.Since(start) @@ -510,7 +510,7 @@ func (n *node) TransactionReceipt(ctx context.Context, txHash common.Hash) (rece } func (n *node) HeaderByNumber(ctx context.Context, number *big.Int) (header *types.Header, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -519,11 +519,11 @@ func (n *node) HeaderByNumber(ctx context.Context, number *big.Int) (header *typ lggr.Debug("RPC call: evmclient.Client#HeaderByNumber") start := time.Now() - if n.http != nil { - header, err = n.http.geth.HeaderByNumber(ctx, number) + if http != nil { + header, err = http.geth.HeaderByNumber(ctx, number) err = n.wrapHTTP(err) } else { - header, err = n.ws.geth.HeaderByNumber(ctx, number) + header, err = ws.geth.HeaderByNumber(ctx, number) err = n.wrapWS(err) } duration := time.Since(start) @@ -534,7 +534,7 @@ func (n *node) HeaderByNumber(ctx context.Context, number *big.Int) (header *typ } func (n *node) HeaderByHash(ctx context.Context, hash common.Hash) (header *types.Header, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -543,11 +543,11 @@ func (n *node) HeaderByHash(ctx context.Context, hash common.Hash) (header *type lggr.Debug("RPC call: evmclient.Client#HeaderByHash") start := time.Now() - if n.http != nil { - header, err = n.http.geth.HeaderByHash(ctx, hash) + if http != nil { + header, err = http.geth.HeaderByHash(ctx, hash) err = n.wrapHTTP(err) } else { - header, err = n.ws.geth.HeaderByHash(ctx, hash) + header, err = ws.geth.HeaderByHash(ctx, hash) err = n.wrapWS(err) } duration := time.Since(start) @@ -560,7 +560,7 @@ func (n *node) HeaderByHash(ctx context.Context, hash common.Hash) (header *type } func (n *node) SendTransaction(ctx context.Context, tx *types.Transaction) error { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return err } @@ -569,10 +569,10 @@ func (n *node) SendTransaction(ctx context.Context, tx *types.Transaction) error lggr.Debug("RPC call: evmclient.Client#SendTransaction") start := time.Now() - if n.http != nil { - err = n.wrapHTTP(n.http.geth.SendTransaction(ctx, tx)) + if http != nil { + err = n.wrapHTTP(http.geth.SendTransaction(ctx, tx)) } else { - err = n.wrapWS(n.ws.geth.SendTransaction(ctx, tx)) + err = n.wrapWS(ws.geth.SendTransaction(ctx, tx)) } duration := time.Since(start) @@ -583,7 +583,7 @@ func (n *node) SendTransaction(ctx context.Context, tx *types.Transaction) error // PendingNonceAt returns one higher than the highest nonce from both mempool and mined transactions func (n *node) PendingNonceAt(ctx context.Context, account common.Address) (nonce uint64, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return 0, err } @@ -592,11 +592,11 @@ func (n *node) PendingNonceAt(ctx context.Context, account common.Address) (nonc lggr.Debug("RPC call: evmclient.Client#PendingNonceAt") start := time.Now() - if n.http != nil { - nonce, err = n.http.geth.PendingNonceAt(ctx, account) + if http != nil { + nonce, err = http.geth.PendingNonceAt(ctx, account) err = n.wrapHTTP(err) } else { - nonce, err = n.ws.geth.PendingNonceAt(ctx, account) + nonce, err = ws.geth.PendingNonceAt(ctx, account) err = n.wrapWS(err) } duration := time.Since(start) @@ -612,7 +612,7 @@ func (n *node) PendingNonceAt(ctx context.Context, account common.Address) (nonc // mined nonce at the given block number, but it actually returns the total // transaction count which is the highest mined nonce + 1 func (n *node) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (nonce uint64, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return 0, err } @@ -621,11 +621,11 @@ func (n *node) NonceAt(ctx context.Context, account common.Address, blockNumber lggr.Debug("RPC call: evmclient.Client#NonceAt") start := time.Now() - if n.http != nil { - nonce, err = n.http.geth.NonceAt(ctx, account, blockNumber) + if http != nil { + nonce, err = http.geth.NonceAt(ctx, account, blockNumber) err = n.wrapHTTP(err) } else { - nonce, err = n.ws.geth.NonceAt(ctx, account, blockNumber) + nonce, err = ws.geth.NonceAt(ctx, account, blockNumber) err = n.wrapWS(err) } duration := time.Since(start) @@ -638,7 +638,7 @@ func (n *node) NonceAt(ctx context.Context, account common.Address, blockNumber } func (n *node) PendingCodeAt(ctx context.Context, account common.Address) (code []byte, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -647,11 +647,11 @@ func (n *node) PendingCodeAt(ctx context.Context, account common.Address) (code lggr.Debug("RPC call: evmclient.Client#PendingCodeAt") start := time.Now() - if n.http != nil { - code, err = n.http.geth.PendingCodeAt(ctx, account) + if http != nil { + code, err = http.geth.PendingCodeAt(ctx, account) err = n.wrapHTTP(err) } else { - code, err = n.ws.geth.PendingCodeAt(ctx, account) + code, err = ws.geth.PendingCodeAt(ctx, account) err = n.wrapWS(err) } duration := time.Since(start) @@ -664,7 +664,7 @@ func (n *node) PendingCodeAt(ctx context.Context, account common.Address) (code } func (n *node) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) (code []byte, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -673,11 +673,11 @@ func (n *node) CodeAt(ctx context.Context, account common.Address, blockNumber * lggr.Debug("RPC call: evmclient.Client#CodeAt") start := time.Now() - if n.http != nil { - code, err = n.http.geth.CodeAt(ctx, account, blockNumber) + if http != nil { + code, err = http.geth.CodeAt(ctx, account, blockNumber) err = n.wrapHTTP(err) } else { - code, err = n.ws.geth.CodeAt(ctx, account, blockNumber) + code, err = ws.geth.CodeAt(ctx, account, blockNumber) err = n.wrapWS(err) } duration := time.Since(start) @@ -690,7 +690,7 @@ func (n *node) CodeAt(ctx context.Context, account common.Address, blockNumber * } func (n *node) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return 0, err } @@ -699,11 +699,11 @@ func (n *node) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint lggr.Debug("RPC call: evmclient.Client#EstimateGas") start := time.Now() - if n.http != nil { - gas, err = n.http.geth.EstimateGas(ctx, call) + if http != nil { + gas, err = http.geth.EstimateGas(ctx, call) err = n.wrapHTTP(err) } else { - gas, err = n.ws.geth.EstimateGas(ctx, call) + gas, err = ws.geth.EstimateGas(ctx, call) err = n.wrapWS(err) } duration := time.Since(start) @@ -716,7 +716,7 @@ func (n *node) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint } func (n *node) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -725,11 +725,11 @@ func (n *node) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) lggr.Debug("RPC call: evmclient.Client#SuggestGasPrice") start := time.Now() - if n.http != nil { - price, err = n.http.geth.SuggestGasPrice(ctx) + if http != nil { + price, err = http.geth.SuggestGasPrice(ctx) err = n.wrapHTTP(err) } else { - price, err = n.ws.geth.SuggestGasPrice(ctx) + price, err = ws.geth.SuggestGasPrice(ctx) err = n.wrapWS(err) } duration := time.Since(start) @@ -742,7 +742,7 @@ func (n *node) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) } func (n *node) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) (val []byte, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -751,11 +751,11 @@ func (n *node) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumb lggr.Debug("RPC call: evmclient.Client#CallContract") start := time.Now() - if n.http != nil { - val, err = n.http.geth.CallContract(ctx, msg, blockNumber) + if http != nil { + val, err = http.geth.CallContract(ctx, msg, blockNumber) err = n.wrapHTTP(err) } else { - val, err = n.ws.geth.CallContract(ctx, msg, blockNumber) + val, err = ws.geth.CallContract(ctx, msg, blockNumber) err = n.wrapWS(err) } duration := time.Since(start) @@ -769,7 +769,7 @@ func (n *node) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumb } func (n *node) BlockByNumber(ctx context.Context, number *big.Int) (b *types.Block, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -778,11 +778,11 @@ func (n *node) BlockByNumber(ctx context.Context, number *big.Int) (b *types.Blo lggr.Debug("RPC call: evmclient.Client#BlockByNumber") start := time.Now() - if n.http != nil { - b, err = n.http.geth.BlockByNumber(ctx, number) + if http != nil { + b, err = http.geth.BlockByNumber(ctx, number) err = n.wrapHTTP(err) } else { - b, err = n.ws.geth.BlockByNumber(ctx, number) + b, err = ws.geth.BlockByNumber(ctx, number) err = n.wrapWS(err) } duration := time.Since(start) @@ -795,7 +795,7 @@ func (n *node) BlockByNumber(ctx context.Context, number *big.Int) (b *types.Blo } func (n *node) BlockByHash(ctx context.Context, hash common.Hash) (b *types.Block, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -804,11 +804,11 @@ func (n *node) BlockByHash(ctx context.Context, hash common.Hash) (b *types.Bloc lggr.Debug("RPC call: evmclient.Client#BlockByHash") start := time.Now() - if n.http != nil { - b, err = n.http.geth.BlockByHash(ctx, hash) + if http != nil { + b, err = http.geth.BlockByHash(ctx, hash) err = n.wrapHTTP(err) } else { - b, err = n.ws.geth.BlockByHash(ctx, hash) + b, err = ws.geth.BlockByHash(ctx, hash) err = n.wrapWS(err) } duration := time.Since(start) @@ -821,7 +821,7 @@ func (n *node) BlockByHash(ctx context.Context, hash common.Hash) (b *types.Bloc } func (n *node) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (balance *big.Int, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -830,11 +830,11 @@ func (n *node) BalanceAt(ctx context.Context, account common.Address, blockNumbe lggr.Debug("RPC call: evmclient.Client#BalanceAt") start := time.Now() - if n.http != nil { - balance, err = n.http.geth.BalanceAt(ctx, account, blockNumber) + if http != nil { + balance, err = http.geth.BalanceAt(ctx, account, blockNumber) err = n.wrapHTTP(err) } else { - balance, err = n.ws.geth.BalanceAt(ctx, account, blockNumber) + balance, err = ws.geth.BalanceAt(ctx, account, blockNumber) err = n.wrapWS(err) } duration := time.Since(start) @@ -847,7 +847,7 @@ func (n *node) BalanceAt(ctx context.Context, account common.Address, blockNumbe } func (n *node) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []types.Log, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -856,11 +856,11 @@ func (n *node) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []type lggr.Debug("RPC call: evmclient.Client#FilterLogs") start := time.Now() - if n.http != nil { - l, err = n.http.geth.FilterLogs(ctx, q) + if http != nil { + l, err = http.geth.FilterLogs(ctx, q) err = n.wrapHTTP(err) } else { - l, err = n.ws.geth.FilterLogs(ctx, q) + l, err = ws.geth.FilterLogs(ctx, q) err = n.wrapWS(err) } duration := time.Since(start) @@ -873,7 +873,7 @@ func (n *node) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []type } func (n *node) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (sub ethereum.Subscription, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, _, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -882,7 +882,7 @@ func (n *node) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, lggr.Debug("RPC call: evmclient.Client#SubscribeFilterLogs") start := time.Now() - sub, err = n.ws.geth.SubscribeFilterLogs(ctx, q, ch) + sub, err = ws.geth.SubscribeFilterLogs(ctx, q, ch) if err == nil { n.registerSub(sub) } @@ -895,7 +895,7 @@ func (n *node) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, } func (n *node) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error) { - ctx, cancel, err := n.makeLiveQueryCtx(ctx) + ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx) if err != nil { return nil, err } @@ -904,11 +904,11 @@ func (n *node) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error lggr.Debug("RPC call: evmclient.Client#SuggestGasTipCap") start := time.Now() - if n.http != nil { - tipCap, err = n.http.geth.SuggestGasTipCap(ctx) + if http != nil { + tipCap, err = http.geth.SuggestGasTipCap(ctx) err = n.wrapHTTP(err) } else { - tipCap, err = n.ws.geth.SuggestGasTipCap(ctx) + tipCap, err = ws.geth.SuggestGasTipCap(ctx) err = n.wrapWS(err) } duration := time.Since(start) @@ -990,9 +990,8 @@ func wrap(err error, tp string) error { return errors.Wrapf(err, "%s call failed", tp) } -// makeLiveQueryCtx wraps makeQueryCtx but returns error if node is not -// "alive". -func (n *node) makeLiveQueryCtx(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, err error) { +// makeLiveQueryCtx wraps makeQueryCtx but returns error if node is not NodeStateAlive. +func (n *node) makeLiveQueryCtx(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient, err error) { // Need to wrap in mutex because state transition can cancel and replace the // context n.stateMu.RLock() @@ -1002,6 +1001,11 @@ func (n *node) makeLiveQueryCtx(parentCtx context.Context) (ctx context.Context, return } cancelCh := n.chStopInFlight + ws = n.ws + if n.http != nil { + cp := *n.http + http = &cp + } n.stateMu.RUnlock() ctx, cancel = makeQueryCtx(parentCtx, cancelCh) return diff --git a/core/chains/evm/client/node_lifecycle_test.go b/core/chains/evm/client/node_lifecycle_test.go index 981c2d410e7..c68c88ee4be 100644 --- a/core/chains/evm/client/node_lifecycle_test.go +++ b/core/chains/evm/client/node_lifecycle_test.go @@ -233,6 +233,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { default: } return `"0x00"`, makeHeadResult(0) + case "eth_unsubscribe": case "web3_clientVersion": return `"test client version 2"`, "" default: @@ -268,6 +269,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { switch method { case "eth_subscribe": return `"0x00"`, makeHeadResult(0) + case "eth_unsubscribe": default: t.Errorf("unexpected RPC method: %s", method) } @@ -432,6 +434,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { default: } return `"0x00"`, makeHeadResult(0) + case "eth_unsubscribe": default: t.Errorf("unexpected RPC method: %s", method) } diff --git a/core/internal/testutils/testutils.go b/core/internal/testutils/testutils.go index 1399b3649ad..2e1bf8fe431 100644 --- a/core/internal/testutils/testutils.go +++ b/core/internal/testutils/testutils.go @@ -313,7 +313,7 @@ func IntToHex(n int) string { // TestInterval is just a sensible poll interval that gives fast tests without // risk of spamming -const TestInterval = 10 * time.Millisecond +const TestInterval = 100 * time.Millisecond // AssertEventually waits for f to return true func AssertEventually(t *testing.T, f func() bool) {