diff --git a/eth/filters/api.go b/eth/filters/api.go index a7a1c16d9..31de09183 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -39,6 +39,7 @@ type filter struct { typ Type deadline *time.Timer // filter is inactiv when deadline triggers hashes []common.Hash + txs []*types.Transaction crit FilterCriteria logs []*types.Log s *Subscription // associated subscription in event system @@ -99,7 +100,7 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) { } } -// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// NewPendingTransactionFilter creates a filter that fetches pending transactions // as transactions enter the pending state. // // It is part of the filter package because this filter can be used through the @@ -108,20 +109,20 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) { // https://eth.wiki/json-rpc/API#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan []common.Hash) + pendingTxs = make(chan []*types.Transaction) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub} api.filtersMu.Unlock() gopool.Submit(func() { for { select { - case ph := <-pendingTxs: + case pTx := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph...) + f.txs = append(f.txs, pTx...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -136,9 +137,10 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { return pendingTxSub.ID } -// NewPendingTransactions creates a subscription that is triggered each time a transaction -// enters the transaction pool and was signed from one of the transactions this nodes manages. -func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { +// NewPendingTransactions creates a subscription that is triggered each time a +// transaction enters the transaction pool. If fullTx is true the full tx is +// sent to the client, otherwise the hash is sent. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported @@ -147,16 +149,20 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su rpcSub := notifier.CreateSubscription() gopool.Submit(func() { - txHashes := make(chan []common.Hash, 128) - pendingTxSub := api.events.SubscribePendingTxs(txHashes) + txs := make(chan []*types.Transaction, 128) + pendingTxSub := api.events.SubscribePendingTxs(txs) for { select { - case hashes := <-txHashes: + case txs := <-txs: // To keep the original behaviour, send a single tx hash in one notification. // TODO(rjl493456442) Send a batch of tx hashes in one notification - for _, h := range hashes { - notifier.Notify(rpcSub.ID, h) + for _, tx := range txs { + if fullTx != nil && *fullTx { + notifier.Notify(rpcSub.ID, tx) + } else { + notifier.Notify(rpcSub.ID, tx.Hash()) + } } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() @@ -551,10 +557,14 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.deadline.Reset(api.timeout) switch f.typ { - case PendingTransactionsSubscription, BlocksSubscription, FinalizedHeadersSubscription, VotesSubscription: + case BlocksSubscription, FinalizedHeadersSubscription, VotesSubscription: hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil + case PendingTransactionsSubscription: + txs := f.txs + f.txs = nil + return txs, nil case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index bf461a1c5..7186e7f7d 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -47,8 +47,8 @@ const ( PendingLogsSubscription // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. MinedAndPendingLogsSubscription - // PendingTransactionsSubscription queries tx hashes for pending - // transactions entering the pending state + // PendingTransactionsSubscription queries for pending transactions entering + // the pending state PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription @@ -83,7 +83,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan []common.Hash + txs chan []*types.Transaction headers chan *types.Header finalizedHeaders chan *types.Header votes chan *types.VoteEnvelope @@ -187,7 +187,7 @@ func (sub *Subscription) Unsubscribe() { case sub.es.uninstall <- sub.f: break uninstallLoop case <-sub.f.logs: - case <-sub.f.hashes: + case <-sub.f.txs: case <-sub.f.headers: case <-sub.f.votes: } @@ -255,7 +255,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -273,7 +273,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -291,7 +291,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -308,7 +308,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: headers, votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -325,7 +325,7 @@ func (es *EventSystem) SubscribeNewFinalizedHeaders(headers chan *types.Header) typ: FinalizedHeadersSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: headers, votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -334,15 +334,15 @@ func (es *EventSystem) SubscribeNewFinalizedHeaders(headers chan *types.Header) return es.subscribe(sub) } -// SubscribePendingTxs creates a subscription that writes transaction hashes for +// SubscribePendingTxs creates a subscription that writes transactions for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: hashes, + txs: txs, headers: make(chan *types.Header), votes: make(chan *types.VoteEnvelope), installed: make(chan struct{}), @@ -359,7 +359,7 @@ func (es *EventSystem) SubscribeNewVotes(votes chan *types.VoteEnvelope) *Subscr typ: VotesSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), votes: votes, installed: make(chan struct{}), @@ -404,12 +404,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog } func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { - hashes := make([]common.Hash, 0, len(ev.Txs)) - for _, tx := range ev.Txs { - hashes = append(hashes, tx.Hash()) - } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- hashes + f.txs <- ev.Txs } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 737eaa17c..3acc0b6da 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -250,7 +250,7 @@ func TestPendingTxFilter(t *testing.T) { types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), } - hashes []common.Hash + txs []*types.Transaction ) fid0 := api.NewPendingTransactionFilter() @@ -265,9 +265,9 @@ func TestPendingTxFilter(t *testing.T) { t.Fatalf("Unable to retrieve logs: %v", err) } - h := results.([]common.Hash) - hashes = append(hashes, h...) - if len(hashes) >= len(transactions) { + tx := results.([]*types.Transaction) + txs = append(txs, tx...) + if len(txs) >= len(transactions) { break } // check timeout @@ -278,13 +278,13 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(100 * time.Millisecond) } - if len(hashes) != len(transactions) { - t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + if len(txs) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs)) return } - for i := range hashes { - if hashes[i] != transactions[i].Hash() { - t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) + for i := range txs { + if txs[i].Hash() != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash()) } } } @@ -715,11 +715,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) { fids[i] = fid // Wait for at least one tx to arrive in filter for { - hashes, err := api.GetFilterChanges(fid) + txs, err := api.GetFilterChanges(fid) if err != nil { t.Fatalf("Filter should exist: %v\n", err) } - if len(hashes.([]common.Hash)) > 0 { + if len(txs.([]*types.Transaction)) > 0 { break } runtime.Gosched() diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index 3795b182e..72dc14d7c 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -175,7 +175,12 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) { return &result, err } -// SubscribePendingTransactions subscribes to new pending transactions. +// SubscribeFullPendingTransactions subscribes to new pending transactions. +func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true) +} + +// SubscribePendingTransactions subscribes to new pending transaction hashes. func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") } diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index d8f938569..1fb2683e3 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -123,8 +123,11 @@ func TestGethClient(t *testing.T) { "TestSetHead", func(t *testing.T) { testSetHead(t, client) }, }, { - "TestSubscribePendingTxs", + "TestSubscribePendingTxHashes", func(t *testing.T) { testSubscribePendingTransactions(t, client) }, + }, { + "TestSubscribePendingTxs", + func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) }, }, { "TestCallContract", func(t *testing.T) { testCallContract(t, client) }, @@ -298,6 +301,40 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { } } +func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) { + ec := New(client) + ethcl := ethclient.NewClient(client) + // Subscribe to Transactions + ch := make(chan *types.Transaction) + ec.SubscribeFullPendingTransactions(context.Background(), ch) + // Send a transaction + chainID, err := ethcl.ChainID(context.Background()) + if err != nil { + t.Fatal(err) + } + // Create transaction + tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil) + signer := types.LatestSignerForChainID(chainID) + signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey) + if err != nil { + t.Fatal(err) + } + signedTx, err := tx.WithSignature(signer, signature) + if err != nil { + t.Fatal(err) + } + // Send transaction + err = ethcl.SendTransaction(context.Background(), signedTx) + if err != nil { + t.Fatal(err) + } + // Check that the transaction was send over the channel + tx = <-ch + if tx.Hash() != signedTx.Hash() { + t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash()) + } +} + func testCallContract(t *testing.T, client *rpc.Client) { ec := New(client) msg := ethereum.CallMsg{