eth/downloader: always reenter processing if not exiting

This commit is contained in:
Péter Szilágyi 2015-06-18 00:26:54 +03:00
parent 2f4cbe22f5
commit 55dd8fd621
2 changed files with 37 additions and 39 deletions

@ -48,7 +48,6 @@ var (
errCrossCheckFailed = errors.New("block cross-check failed") errCrossCheckFailed = errors.New("block cross-check failed")
errCancelHashFetch = errors.New("hash fetching canceled (requested)") errCancelHashFetch = errors.New("hash fetching canceled (requested)")
errCancelBlockFetch = errors.New("block downloading canceled (requested)") errCancelBlockFetch = errors.New("block downloading canceled (requested)")
errCancelChainImport = errors.New("chain importing canceled (requested)")
errNoSyncActive = errors.New("no sync active") errNoSyncActive = errors.New("no sync active")
) )
@ -719,7 +718,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
// between these state changes, a block may have arrived, but a processing // between these state changes, a block may have arrived, but a processing
// attempt denied, so we need to re-enter to ensure the block isn't left // attempt denied, so we need to re-enter to ensure the block isn't left
// to idle in the cache. // to idle in the cache.
func (d *Downloader) process() (err error) { func (d *Downloader) process() {
// Make sure only one goroutine is ever allowed to process blocks at once // Make sure only one goroutine is ever allowed to process blocks at once
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
return return
@ -729,8 +728,8 @@ func (d *Downloader) process() (err error) {
// the fresh blocks might have been rejected entry to to this present thread // the fresh blocks might have been rejected entry to to this present thread
// not yet releasing the `processing` state. // not yet releasing the `processing` state.
defer func() { defer func() {
if err == nil && d.queue.GetHeadBlock() != nil { if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil {
err = d.process() d.process()
} }
}() }()
// Release the lock upon exit (note, before checking for reentry!), and set // Release the lock upon exit (note, before checking for reentry!), and set
@ -748,7 +747,7 @@ func (d *Downloader) process() (err error) {
// Fetch the next batch of blocks // Fetch the next batch of blocks
blocks := d.queue.TakeBlocks() blocks := d.queue.TakeBlocks()
if len(blocks) == 0 { if len(blocks) == 0 {
return nil return
} }
// Reset the import statistics // Reset the import statistics
d.importLock.Lock() d.importLock.Lock()
@ -762,7 +761,7 @@ func (d *Downloader) process() (err error) {
for len(blocks) != 0 { for len(blocks) != 0 {
// Check for any termination requests // Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 { if atomic.LoadInt32(&d.interrupt) == 1 {
return errCancelChainImport return
} }
// Retrieve the first batch of blocks to insert // Retrieve the first batch of blocks to insert
max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
@ -776,7 +775,7 @@ func (d *Downloader) process() (err error) {
glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
d.dropPeer(blocks[index].OriginPeer) d.dropPeer(blocks[index].OriginPeer)
d.cancel() d.cancel()
return errCancelChainImport return
} }
blocks = blocks[max:] blocks = blocks[max:]
} }

@ -764,7 +764,6 @@ func TestHashAttackerDropping(t *testing.T) {
{errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop
{errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop
} }
// Run the tests and check disconnection status // Run the tests and check disconnection status
tester := newTester() tester := newTester()