eth: fix #2710 filter races

and locking bugs found in its wake.
This commit is contained in:
Henning Diedrich 2016-06-17 09:53:54 +02:00
parent 4f3f6e28d5
commit 51f8ce26cf
2 changed files with 54 additions and 28 deletions

@ -68,8 +68,6 @@ type PublicFilterAPI struct {
transactionMu sync.RWMutex transactionMu sync.RWMutex
transactionQueue map[int]*hashQueue transactionQueue map[int]*hashQueue
transactMu sync.Mutex
} }
// NewPublicFilterAPI returns a new PublicFilterAPI instance. // NewPublicFilterAPI returns a new PublicFilterAPI instance.
@ -100,6 +98,7 @@ done:
for { for {
select { select {
case <-timer.C: case <-timer.C:
s.filterManager.Lock() // lock order like filterLoop()
s.logMu.Lock() s.logMu.Lock()
for id, filter := range s.logQueue { for id, filter := range s.logQueue {
if time.Since(filter.timeout) > filterTickerTime { if time.Since(filter.timeout) > filterTickerTime {
@ -126,6 +125,7 @@ done:
} }
} }
s.transactionMu.Unlock() s.transactionMu.Unlock()
s.filterManager.Unlock()
case <-s.quit: case <-s.quit:
break done break done
} }
@ -135,19 +135,24 @@ done:
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain. // NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
func (s *PublicFilterAPI) NewBlockFilter() (string, error) { func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
// protect filterManager.Add() and setting of filter fields
s.filterManager.Lock()
defer s.filterManager.Unlock()
externalId, err := newFilterId() externalId, err := newFilterId()
if err != nil { if err != nil {
return "", err return "", err
} }
s.blockMu.Lock()
filter := New(s.chainDb) filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, ChainFilter) id, err := s.filterManager.Add(filter, ChainFilter)
if err != nil { if err != nil {
return "", err return "", err
} }
s.blockMu.Lock()
s.blockQueue[id] = &hashQueue{timeout: time.Now()} s.blockQueue[id] = &hashQueue{timeout: time.Now()}
s.blockMu.Unlock()
filter.BlockCallback = func(block *types.Block, logs vm.Logs) { filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
s.blockMu.Lock() s.blockMu.Lock()
@ -158,8 +163,6 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
} }
} }
defer s.blockMu.Unlock()
s.filterMapMu.Lock() s.filterMapMu.Lock()
s.filterMapping[externalId] = id s.filterMapping[externalId] = id
s.filterMapMu.Unlock() s.filterMapMu.Unlock()
@ -169,21 +172,24 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
// NewPendingTransactionFilter creates a filter that returns new pending transactions. // NewPendingTransactionFilter creates a filter that returns new pending transactions.
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
// protect filterManager.Add() and setting of filter fields
s.filterManager.Lock()
defer s.filterManager.Unlock()
externalId, err := newFilterId() externalId, err := newFilterId()
if err != nil { if err != nil {
return "", err return "", err
} }
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
filter := New(s.chainDb) filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, PendingTxFilter) id, err := s.filterManager.Add(filter, PendingTxFilter)
if err != nil { if err != nil {
return "", err return "", err
} }
s.transactionMu.Lock()
s.transactionQueue[id] = &hashQueue{timeout: time.Now()} s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
s.transactionMu.Unlock()
filter.TransactionCallback = func(tx *types.Transaction) { filter.TransactionCallback = func(tx *types.Transaction) {
s.transactionMu.Lock() s.transactionMu.Lock()
@ -203,8 +209,9 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
// newLogFilter creates a new log filter. // newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) { func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
s.logMu.Lock() // protect filterManager.Add() and setting of filter fields
defer s.logMu.Unlock() s.filterManager.Lock()
defer s.filterManager.Unlock()
filter := New(s.chainDb) filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, LogFilter) id, err := s.filterManager.Add(filter, LogFilter)
@ -212,7 +219,9 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
return 0, err return 0, err
} }
s.logMu.Lock()
s.logQueue[id] = &logQueue{timeout: time.Now()} s.logQueue[id] = &logQueue{timeout: time.Now()}
s.logMu.Unlock()
filter.SetBeginBlock(earliest) filter.SetBeginBlock(earliest)
filter.SetEndBlock(latest) filter.SetEndBlock(latest)
@ -443,35 +452,43 @@ func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
// UninstallFilter removes the filter with the given filter id. // UninstallFilter removes the filter with the given filter id.
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool { func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
s.filterMapMu.Lock() s.filterManager.Lock()
defer s.filterMapMu.Unlock() defer s.filterManager.Unlock()
s.filterMapMu.Lock()
id, ok := s.filterMapping[filterId] id, ok := s.filterMapping[filterId]
if !ok { if !ok {
s.filterMapMu.Unlock()
return false return false
} }
defer s.filterManager.Remove(id)
delete(s.filterMapping, filterId) delete(s.filterMapping, filterId)
s.filterMapMu.Unlock()
s.filterManager.Remove(id)
if _, ok := s.logQueue[id]; ok {
s.logMu.Lock() s.logMu.Lock()
defer s.logMu.Unlock() if _, ok := s.logQueue[id]; ok {
delete(s.logQueue, id) delete(s.logQueue, id)
s.logMu.Unlock()
return true return true
} }
if _, ok := s.blockQueue[id]; ok { s.logMu.Unlock()
s.blockMu.Lock() s.blockMu.Lock()
defer s.blockMu.Unlock() if _, ok := s.blockQueue[id]; ok {
delete(s.blockQueue, id) delete(s.blockQueue, id)
s.blockMu.Unlock()
return true return true
} }
if _, ok := s.transactionQueue[id]; ok { s.blockMu.Unlock()
s.transactionMu.Lock() s.transactionMu.Lock()
defer s.transactionMu.Unlock() if _, ok := s.transactionQueue[id]; ok {
delete(s.transactionQueue, id) delete(s.transactionQueue, id)
s.transactionMu.Unlock()
return true return true
} }
s.transactionMu.Unlock()
return false return false
} }
@ -525,7 +542,9 @@ func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
// GetFilterLogs returns the logs for the filter with the given id. // GetFilterLogs returns the logs for the filter with the given id.
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog { func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
s.filterMapMu.RLock()
id, ok := s.filterMapping[filterId] id, ok := s.filterMapping[filterId]
s.filterMapMu.RUnlock()
if !ok { if !ok {
return toRPCLogs(nil, false) return toRPCLogs(nil, false)
} }
@ -540,9 +559,9 @@ func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
// GetFilterChanges returns the logs for the filter with the given id since last time is was called. // GetFilterChanges returns the logs for the filter with the given id since last time is was called.
// This can be used for polling. // This can be used for polling.
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} { func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
s.filterMapMu.Lock() s.filterMapMu.RLock()
id, ok := s.filterMapping[filterId] id, ok := s.filterMapping[filterId]
s.filterMapMu.Unlock() s.filterMapMu.RUnlock()
if !ok { // filter not found if !ok { // filter not found
return []interface{}{} return []interface{}{}

@ -82,11 +82,20 @@ func (fs *FilterSystem) Stop() {
fs.sub.Unsubscribe() fs.sub.Unsubscribe()
} }
// Add adds a filter to the filter manager // Acquire filter system maps lock, required to force lock acquisition
func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) { // sequence with filterMu acquired first to avoid deadlocks by callbacks
func (fs *FilterSystem) Lock() {
fs.filterMu.Lock() fs.filterMu.Lock()
defer fs.filterMu.Unlock() }
// Release filter system maps lock
func (fs *FilterSystem) Unlock() {
fs.filterMu.Unlock()
}
// Add adds a filter to the filter manager
// Expects filterMu to be locked.
func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
id := fs.filterId id := fs.filterId
filter.created = time.Now() filter.created = time.Now()
@ -110,10 +119,8 @@ func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error)
} }
// Remove removes a filter by filter id // Remove removes a filter by filter id
// Expects filterMu to be locked.
func (fs *FilterSystem) Remove(id int) { func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
delete(fs.chainFilters, id) delete(fs.chainFilters, id)
delete(fs.pendingTxFilters, id) delete(fs.pendingTxFilters, id)
delete(fs.logFilters, id) delete(fs.logFilters, id)