diff --git a/eth/filters/api.go b/eth/filters/api.go index 91477a917..ef360dd70 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -117,16 +117,19 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { pendingTxs = make(chan []common.Hash) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) - f := &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = f + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Unlock() gopool.Submit(func() { for { select { case ph := <-pendingTxs: - f.hashes = append(f.hashes, ph...) + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID]; found { + f.hashes = append(f.hashes, ph...) + } + api.filtersMu.Unlock() case <-pendingTxSub.Err(): api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 4be7ceb44..12f037d0f 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -174,17 +174,6 @@ func (sub *Subscription) Unsubscribe() { // this ensures that the manager won't use the event channel which // will probably be closed by the client asap after this method returns. <-sub.Err() - - drainLoop: - for { - select { - case <-sub.f.logs: - case <-sub.f.hashes: - case <-sub.f.headers: - default: - break drainLoop - } - } }) }