eth/filters, ethclient/gethclient: add fullTx option to pending tx filter (#25186) (#1626)

This PR adds a way to subscribe to the _full_ pending transactions, as opposed to just being notified about hashes.

In use cases where client subscribes to newPendingTransactions and gets txhashes only to then request the actual transaction, the caller can now shortcut that flow and obtain the transactions directly.

Co-authored-by: lx <92799281+brilliant-lx@users.noreply.github.com>
Co-authored-by: lmittmann <lmittmann@users.noreply.github.com>
Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
lmittmann 2023-06-27 05:52:31 +02:00 committed by GitHub
parent 598c36bcea
commit 291cb8ab51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 45 deletions

@ -39,6 +39,7 @@ type filter struct {
typ Type typ Type
deadline *time.Timer // filter is inactiv when deadline triggers deadline *time.Timer // filter is inactiv when deadline triggers
hashes []common.Hash hashes []common.Hash
txs []*types.Transaction
crit FilterCriteria crit FilterCriteria
logs []*types.Log logs []*types.Log
s *Subscription // associated subscription in event system s *Subscription // associated subscription in event system
@ -99,7 +100,7 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
} }
} }
// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes // NewPendingTransactionFilter creates a filter that fetches pending transactions
// as transactions enter the pending state. // as transactions enter the pending state.
// //
// It is part of the filter package because this filter can be used through the // It is part of the filter package because this filter can be used through the
@ -108,20 +109,20 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
// https://eth.wiki/json-rpc/API#eth_newpendingtransactionfilter // https://eth.wiki/json-rpc/API#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
var ( var (
pendingTxs = make(chan []common.Hash) pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
) )
api.filtersMu.Lock() api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filtersMu.Unlock() api.filtersMu.Unlock()
gopool.Submit(func() { gopool.Submit(func() {
for { for {
select { select {
case ph := <-pendingTxs: case pTx := <-pendingTxs:
api.filtersMu.Lock() api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found { if f, found := api.filters[pendingTxSub.ID]; found {
f.hashes = append(f.hashes, ph...) f.txs = append(f.txs, pTx...)
} }
api.filtersMu.Unlock() api.filtersMu.Unlock()
case <-pendingTxSub.Err(): case <-pendingTxSub.Err():
@ -136,9 +137,10 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
return pendingTxSub.ID return pendingTxSub.ID
} }
// NewPendingTransactions creates a subscription that is triggered each time a transaction // NewPendingTransactions creates a subscription that is triggered each time a
// enters the transaction pool and was signed from one of the transactions this nodes manages. // transaction enters the transaction pool. If fullTx is true the full tx is
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { // sent to the client, otherwise the hash is sent.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx) notifier, supported := rpc.NotifierFromContext(ctx)
if !supported { if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
@ -147,16 +149,20 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
rpcSub := notifier.CreateSubscription() rpcSub := notifier.CreateSubscription()
gopool.Submit(func() { gopool.Submit(func() {
txHashes := make(chan []common.Hash, 128) txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes) pendingTxSub := api.events.SubscribePendingTxs(txs)
for { for {
select { select {
case hashes := <-txHashes: case txs := <-txs:
// To keep the original behaviour, send a single tx hash in one notification. // To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification // TODO(rjl493456442) Send a batch of tx hashes in one notification
for _, h := range hashes { for _, tx := range txs {
notifier.Notify(rpcSub.ID, h) if fullTx != nil && *fullTx {
notifier.Notify(rpcSub.ID, tx)
} else {
notifier.Notify(rpcSub.ID, tx.Hash())
}
} }
case <-rpcSub.Err(): case <-rpcSub.Err():
pendingTxSub.Unsubscribe() pendingTxSub.Unsubscribe()
@ -551,10 +557,14 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.deadline.Reset(api.timeout) f.deadline.Reset(api.timeout)
switch f.typ { switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription, FinalizedHeadersSubscription, VotesSubscription: case BlocksSubscription, FinalizedHeadersSubscription, VotesSubscription:
hashes := f.hashes hashes := f.hashes
f.hashes = nil f.hashes = nil
return returnHashes(hashes), nil return returnHashes(hashes), nil
case PendingTransactionsSubscription:
txs := f.txs
f.txs = nil
return txs, nil
case LogsSubscription, MinedAndPendingLogsSubscription: case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs logs := f.logs
f.logs = nil f.logs = nil

@ -47,8 +47,8 @@ const (
PendingLogsSubscription PendingLogsSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending // PendingTransactionsSubscription queries for pending transactions entering
// transactions entering the pending state // the pending state
PendingTransactionsSubscription PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported // BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription BlocksSubscription
@ -83,7 +83,7 @@ type subscription struct {
created time.Time created time.Time
logsCrit ethereum.FilterQuery logsCrit ethereum.FilterQuery
logs chan []*types.Log logs chan []*types.Log
hashes chan []common.Hash txs chan []*types.Transaction
headers chan *types.Header headers chan *types.Header
finalizedHeaders chan *types.Header finalizedHeaders chan *types.Header
votes chan *types.VoteEnvelope votes chan *types.VoteEnvelope
@ -187,7 +187,7 @@ func (sub *Subscription) Unsubscribe() {
case sub.es.uninstall <- sub.f: case sub.es.uninstall <- sub.f:
break uninstallLoop break uninstallLoop
case <-sub.f.logs: case <-sub.f.logs:
case <-sub.f.hashes: case <-sub.f.txs:
case <-sub.f.headers: case <-sub.f.headers:
case <-sub.f.votes: case <-sub.f.votes:
} }
@ -255,7 +255,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
logsCrit: crit, logsCrit: crit,
created: time.Now(), created: time.Now(),
logs: logs, logs: logs,
hashes: make(chan []common.Hash), txs: make(chan []*types.Transaction),
headers: make(chan *types.Header), headers: make(chan *types.Header),
votes: make(chan *types.VoteEnvelope), votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}), installed: make(chan struct{}),
@ -273,7 +273,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
logsCrit: crit, logsCrit: crit,
created: time.Now(), created: time.Now(),
logs: logs, logs: logs,
hashes: make(chan []common.Hash), txs: make(chan []*types.Transaction),
headers: make(chan *types.Header), headers: make(chan *types.Header),
votes: make(chan *types.VoteEnvelope), votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}), installed: make(chan struct{}),
@ -291,7 +291,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
logsCrit: crit, logsCrit: crit,
created: time.Now(), created: time.Now(),
logs: logs, logs: logs,
hashes: make(chan []common.Hash), txs: make(chan []*types.Transaction),
headers: make(chan *types.Header), headers: make(chan *types.Header),
votes: make(chan *types.VoteEnvelope), votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}), installed: make(chan struct{}),
@ -308,7 +308,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
typ: BlocksSubscription, typ: BlocksSubscription,
created: time.Now(), created: time.Now(),
logs: make(chan []*types.Log), logs: make(chan []*types.Log),
hashes: make(chan []common.Hash), txs: make(chan []*types.Transaction),
headers: headers, headers: headers,
votes: make(chan *types.VoteEnvelope), votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}), installed: make(chan struct{}),
@ -325,7 +325,7 @@ func (es *EventSystem) SubscribeNewFinalizedHeaders(headers chan *types.Header)
typ: FinalizedHeadersSubscription, typ: FinalizedHeadersSubscription,
created: time.Now(), created: time.Now(),
logs: make(chan []*types.Log), logs: make(chan []*types.Log),
hashes: make(chan []common.Hash), txs: make(chan []*types.Transaction),
headers: headers, headers: headers,
votes: make(chan *types.VoteEnvelope), votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}), installed: make(chan struct{}),
@ -334,15 +334,15 @@ func (es *EventSystem) SubscribeNewFinalizedHeaders(headers chan *types.Header)
return es.subscribe(sub) return es.subscribe(sub)
} }
// SubscribePendingTxs creates a subscription that writes transaction hashes for // SubscribePendingTxs creates a subscription that writes transactions for
// transactions that enter the transaction pool. // transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription {
sub := &subscription{ sub := &subscription{
id: rpc.NewID(), id: rpc.NewID(),
typ: PendingTransactionsSubscription, typ: PendingTransactionsSubscription,
created: time.Now(), created: time.Now(),
logs: make(chan []*types.Log), logs: make(chan []*types.Log),
hashes: hashes, txs: txs,
headers: make(chan *types.Header), headers: make(chan *types.Header),
votes: make(chan *types.VoteEnvelope), votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}), installed: make(chan struct{}),
@ -359,7 +359,7 @@ func (es *EventSystem) SubscribeNewVotes(votes chan *types.VoteEnvelope) *Subscr
typ: VotesSubscription, typ: VotesSubscription,
created: time.Now(), created: time.Now(),
logs: make(chan []*types.Log), logs: make(chan []*types.Log),
hashes: make(chan []common.Hash), txs: make(chan []*types.Transaction),
headers: make(chan *types.Header), headers: make(chan *types.Header),
votes: votes, votes: votes,
installed: make(chan struct{}), installed: make(chan struct{}),
@ -404,12 +404,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog
} }
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
hashes := make([]common.Hash, 0, len(ev.Txs))
for _, tx := range ev.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] { for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes f.txs <- ev.Txs
} }
} }

@ -250,7 +250,7 @@ func TestPendingTxFilter(t *testing.T) {
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
} }
hashes []common.Hash txs []*types.Transaction
) )
fid0 := api.NewPendingTransactionFilter() fid0 := api.NewPendingTransactionFilter()
@ -265,9 +265,9 @@ func TestPendingTxFilter(t *testing.T) {
t.Fatalf("Unable to retrieve logs: %v", err) t.Fatalf("Unable to retrieve logs: %v", err)
} }
h := results.([]common.Hash) tx := results.([]*types.Transaction)
hashes = append(hashes, h...) txs = append(txs, tx...)
if len(hashes) >= len(transactions) { if len(txs) >= len(transactions) {
break break
} }
// check timeout // check timeout
@ -278,13 +278,13 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
if len(hashes) != len(transactions) { if len(txs) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs))
return return
} }
for i := range hashes { for i := range txs {
if hashes[i] != transactions[i].Hash() { if txs[i].Hash() != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
} }
} }
} }
@ -715,11 +715,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
fids[i] = fid fids[i] = fid
// Wait for at least one tx to arrive in filter // Wait for at least one tx to arrive in filter
for { for {
hashes, err := api.GetFilterChanges(fid) txs, err := api.GetFilterChanges(fid)
if err != nil { if err != nil {
t.Fatalf("Filter should exist: %v\n", err) t.Fatalf("Filter should exist: %v\n", err)
} }
if len(hashes.([]common.Hash)) > 0 { if len(txs.([]*types.Transaction)) > 0 {
break break
} }
runtime.Gosched() runtime.Gosched()

@ -175,7 +175,12 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) {
return &result, err return &result, err
} }
// SubscribePendingTransactions subscribes to new pending transactions. // SubscribeFullPendingTransactions subscribes to new pending transactions.
func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true)
}
// SubscribePendingTransactions subscribes to new pending transaction hashes.
func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions")
} }

@ -123,8 +123,11 @@ func TestGethClient(t *testing.T) {
"TestSetHead", "TestSetHead",
func(t *testing.T) { testSetHead(t, client) }, func(t *testing.T) { testSetHead(t, client) },
}, { }, {
"TestSubscribePendingTxs", "TestSubscribePendingTxHashes",
func(t *testing.T) { testSubscribePendingTransactions(t, client) }, func(t *testing.T) { testSubscribePendingTransactions(t, client) },
}, {
"TestSubscribePendingTxs",
func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) },
}, { }, {
"TestCallContract", "TestCallContract",
func(t *testing.T) { testCallContract(t, client) }, func(t *testing.T) { testCallContract(t, client) },
@ -298,6 +301,40 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) {
} }
} }
func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) {
ec := New(client)
ethcl := ethclient.NewClient(client)
// Subscribe to Transactions
ch := make(chan *types.Transaction)
ec.SubscribeFullPendingTransactions(context.Background(), ch)
// Send a transaction
chainID, err := ethcl.ChainID(context.Background())
if err != nil {
t.Fatal(err)
}
// Create transaction
tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil)
signer := types.LatestSignerForChainID(chainID)
signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey)
if err != nil {
t.Fatal(err)
}
signedTx, err := tx.WithSignature(signer, signature)
if err != nil {
t.Fatal(err)
}
// Send transaction
err = ethcl.SendTransaction(context.Background(), signedTx)
if err != nil {
t.Fatal(err)
}
// Check that the transaction was send over the channel
tx = <-ch
if tx.Hash() != signedTx.Hash() {
t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash())
}
}
func testCallContract(t *testing.T, client *rpc.Client) { func testCallContract(t *testing.T, client *rpc.Client) {
ec := New(client) ec := New(client)
msg := ethereum.CallMsg{ msg := ethereum.CallMsg{