Merge pull request #25651 from holiman/fix_snapqueue
eth/protocols/snap: fix problems due to idle-but-busy peers
This commit is contained in:
commit
511bf8f188
@ -2248,14 +2248,18 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
|
|||||||
// Whether or not the response is valid, we can mark the peer as idle and
|
// Whether or not the response is valid, we can mark the peer as idle and
|
||||||
// notify the scheduler to assign a new task. If the response is invalid,
|
// notify the scheduler to assign a new task. If the response is invalid,
|
||||||
// we'll drop the peer in a bit.
|
// we'll drop the peer in a bit.
|
||||||
|
defer func() {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
if _, ok := s.peers[peer.ID()]; ok {
|
||||||
|
s.accountIdlers[peer.ID()] = struct{}{}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case s.update <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
if _, ok := s.peers[peer.ID()]; ok {
|
|
||||||
s.accountIdlers[peer.ID()] = struct{}{}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case s.update <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// Ensure the response is for a valid request
|
// Ensure the response is for a valid request
|
||||||
req, ok := s.accountReqs[id]
|
req, ok := s.accountReqs[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -2360,14 +2364,18 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
|
|||||||
// Whether or not the response is valid, we can mark the peer as idle and
|
// Whether or not the response is valid, we can mark the peer as idle and
|
||||||
// notify the scheduler to assign a new task. If the response is invalid,
|
// notify the scheduler to assign a new task. If the response is invalid,
|
||||||
// we'll drop the peer in a bit.
|
// we'll drop the peer in a bit.
|
||||||
|
defer func() {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
if _, ok := s.peers[peer.ID()]; ok {
|
||||||
|
s.bytecodeIdlers[peer.ID()] = struct{}{}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case s.update <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
if _, ok := s.peers[peer.ID()]; ok {
|
|
||||||
s.bytecodeIdlers[peer.ID()] = struct{}{}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case s.update <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// Ensure the response is for a valid request
|
// Ensure the response is for a valid request
|
||||||
req, ok := s.bytecodeReqs[id]
|
req, ok := s.bytecodeReqs[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -2469,14 +2477,18 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
|
|||||||
// Whether or not the response is valid, we can mark the peer as idle and
|
// Whether or not the response is valid, we can mark the peer as idle and
|
||||||
// notify the scheduler to assign a new task. If the response is invalid,
|
// notify the scheduler to assign a new task. If the response is invalid,
|
||||||
// we'll drop the peer in a bit.
|
// we'll drop the peer in a bit.
|
||||||
|
defer func() {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
if _, ok := s.peers[peer.ID()]; ok {
|
||||||
|
s.storageIdlers[peer.ID()] = struct{}{}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case s.update <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
if _, ok := s.peers[peer.ID()]; ok {
|
|
||||||
s.storageIdlers[peer.ID()] = struct{}{}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case s.update <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// Ensure the response is for a valid request
|
// Ensure the response is for a valid request
|
||||||
req, ok := s.storageReqs[id]
|
req, ok := s.storageReqs[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -2596,14 +2608,18 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
|
|||||||
// Whether or not the response is valid, we can mark the peer as idle and
|
// Whether or not the response is valid, we can mark the peer as idle and
|
||||||
// notify the scheduler to assign a new task. If the response is invalid,
|
// notify the scheduler to assign a new task. If the response is invalid,
|
||||||
// we'll drop the peer in a bit.
|
// we'll drop the peer in a bit.
|
||||||
|
defer func() {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
if _, ok := s.peers[peer.ID()]; ok {
|
||||||
|
s.trienodeHealIdlers[peer.ID()] = struct{}{}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case s.update <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
if _, ok := s.peers[peer.ID()]; ok {
|
|
||||||
s.trienodeHealIdlers[peer.ID()] = struct{}{}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case s.update <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// Ensure the response is for a valid request
|
// Ensure the response is for a valid request
|
||||||
req, ok := s.trienodeHealReqs[id]
|
req, ok := s.trienodeHealReqs[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -2691,14 +2707,18 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
|
|||||||
// Whether or not the response is valid, we can mark the peer as idle and
|
// Whether or not the response is valid, we can mark the peer as idle and
|
||||||
// notify the scheduler to assign a new task. If the response is invalid,
|
// notify the scheduler to assign a new task. If the response is invalid,
|
||||||
// we'll drop the peer in a bit.
|
// we'll drop the peer in a bit.
|
||||||
|
defer func() {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
if _, ok := s.peers[peer.ID()]; ok {
|
||||||
|
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case s.update <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
if _, ok := s.peers[peer.ID()]; ok {
|
|
||||||
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case s.update <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// Ensure the response is for a valid request
|
// Ensure the response is for a valid request
|
||||||
req, ok := s.bytecodeHealReqs[id]
|
req, ok := s.bytecodeHealReqs[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
Loading…
Reference in New Issue
Block a user