Skip to content

Commit

Permalink
eh/filters: send rpctransactions in pending-subscription (#2252)
Browse files Browse the repository at this point in the history
* eh/filters: send rpctransactions in pending-subscription
* Remove unused parameter in `NewRPCPendingTransaction`

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
Co-authored-by: Paul Lange <paul.lange@clabs.co>
  • Loading branch information
3 people authored Feb 16, 2024
1 parent 1fc3473 commit 5068265
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 23 deletions.
8 changes: 8 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,14 @@ func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.Matche
panic("not supported")
}

func (fb *filterBackend) ChainConfig() *params.ChainConfig {
panic("not supported")
}

func (fb *filterBackend) CurrentHeader() *types.Header {
panic("not supported")
}

func (fb *filterBackend) RealGasPriceMinimumForHeader(ctx context.Context, currencyAddress *common.Address, header *types.Header) (*big.Int, error) {
return nil, fmt.Errorf("filterBackend does not implement RealGasPriceMinimumForHeader")
}
Expand Down
5 changes: 4 additions & 1 deletion eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,18 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx *
go func() {
txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs)
chainConfig := api.backend.ChainConfig()

for {
select {
case txs := <-txs:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
latest := api.backend.CurrentHeader()
for _, tx := range txs {
if fullTx != nil && *fullTx {
notifier.Notify(rpcSub.ID, tx)
rpcTx := ethapi.NewRPCPendingTransaction(tx, latest, chainConfig)
notifier.Notify(rpcSub.ID, rpcTx)
} else {
notifier.Notify(rpcSub.ID, tx.Hash())
}
Expand Down
3 changes: 3 additions & 0 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/ethdb"
"github.com/celo-org/celo-blockchain/event"
"github.com/celo-org/celo-blockchain/params"
"github.com/celo-org/celo-blockchain/rpc"
)

Expand All @@ -37,6 +38,8 @@ type Backend interface {
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)

CurrentHeader() *types.Header
ChainConfig() *params.ChainConfig
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
Expand Down
2 changes: 1 addition & 1 deletion eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// LastSubscription keeps track of the last index
// LastIndexSubscription keeps track of the last index
LastIndexSubscription
)

Expand Down
8 changes: 8 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ type testBackend struct {
chainFeed event.Feed
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
panic("implement me")
}

func (b *testBackend) CurrentHeader() *types.Header {
panic("implement me")
}

func (b *testBackend) ChainDb() ethdb.Database {
return b.db
}
Expand Down
31 changes: 10 additions & 21 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,22 +146,19 @@ func (s *PublicTxPoolAPI) Content() map[string]map[string]map[string]*RPCTransac
}
pending, queue := s.b.TxPoolContent()
curHeader := s.b.CurrentHeader()
baseFeeFn := func(feeCurrency *common.Address) (*big.Int, error) {
return s.b.CurrentGasPriceMinimum(context.Background(), feeCurrency)
}
// Flatten the pending transactions
for account, txs := range pending {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig(), baseFeeFn)
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["pending"][account.Hex()] = dump
}
// Flatten the queued transactions
for account, txs := range queue {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig(), baseFeeFn)
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["queued"][account.Hex()] = dump
}
Expand All @@ -176,18 +173,15 @@ func (s *PublicTxPoolAPI) ContentFrom(addr common.Address) map[string]map[string

// Build the pending transactions
dump := make(map[string]*RPCTransaction, len(pending))
baseFeeFn := func(feeCurrency *common.Address) (*big.Int, error) {
return s.b.CurrentGasPriceMinimum(context.Background(), feeCurrency)
}
for _, tx := range pending {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig(), baseFeeFn)
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["pending"] = dump

// Build the queued transactions
dump = make(map[string]*RPCTransaction, len(queue))
for _, tx := range queue {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig(), baseFeeFn)
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())

}
content["queued"] = dump
Expand Down Expand Up @@ -1356,9 +1350,10 @@ func newRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber
return result
}

// newRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation
func newRPCPendingTransaction(tx *types.Transaction, current *types.Header, config *params.ChainConfig, baseFeeFn func(*common.Address) (*big.Int, error)) *RPCTransaction {
return newRPCTransaction(tx, common.Hash{}, 0, 0, baseFeeFn, false)
// NewRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation
func NewRPCPendingTransaction(tx *types.Transaction, current *types.Header, config *params.ChainConfig) *RPCTransaction {
// With `inABlock` set to `false, `baseFeeFn` is never called
return newRPCTransaction(tx, common.Hash{}, 0, 0, nil, false)
}

// newRPCTransactionFromBlockIndex returns a transaction that will serialize to the RPC representation.
Expand Down Expand Up @@ -1594,10 +1589,7 @@ func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, has
}
// No finalized transaction, try to retrieve it from the pool
if tx := s.b.GetPoolTransaction(hash); tx != nil {
baseFeeFn := func(feeCurrency *common.Address) (*big.Int, error) {
return s.b.CurrentGasPriceMinimum(context.Background(), tx.FeeCurrency())
}
return newRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig(), baseFeeFn), nil
return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil
}

// Transaction unknown, return as such
Expand Down Expand Up @@ -1898,13 +1890,10 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, err
}
curHeader := s.b.CurrentHeader()
transactions := make([]*RPCTransaction, 0, len(pending))
baseFeeFn := func(feeCurrency *common.Address) (*big.Int, error) {
return s.b.CurrentGasPriceMinimum(context.Background(), feeCurrency)
}
for _, tx := range pending {
from, _ := types.Sender(s.signer, tx)
if _, exists := accounts[from]; exists {
transactions = append(transactions, newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig(), baseFeeFn))
transactions = append(transactions, NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()))
}
}
return transactions, nil
Expand Down

0 comments on commit 5068265

Please sign in to comment.