eth, eth/downloader: make synchronize thread safe
This commit is contained in:
parent
43901c9282
commit
9d188f73b5
@ -68,8 +68,7 @@ type Downloader struct {
|
|||||||
getBlock getBlockFn
|
getBlock getBlockFn
|
||||||
|
|
||||||
// Status
|
// Status
|
||||||
fetchingHashes int32
|
synchronizing int32
|
||||||
downloadingBlocks int32
|
|
||||||
|
|
||||||
// Channels
|
// Channels
|
||||||
newPeerCh chan *peer
|
newPeerCh chan *peer
|
||||||
@ -120,43 +119,26 @@ func (d *Downloader) UnregisterPeer(id string) {
|
|||||||
delete(d.peers, id)
|
delete(d.peers, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchroniseWithPeer will select the peer and use it for synchronizing. If an empty string is given
|
// Synchronize will select the peer and use it for synchronizing. If an empty string is given
|
||||||
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
|
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
|
||||||
// checks fail an error will be returned. This method is synchronous
|
// checks fail an error will be returned. This method is synchronous
|
||||||
func (d *Downloader) Synchronise(id string, hash common.Hash) error {
|
func (d *Downloader) Synchronize(id string, hash common.Hash) error {
|
||||||
// Make sure it's doing neither. Once done we can restart the
|
// Make sure only one goroutine is ever allowed past this point at once
|
||||||
// downloading process if the TD is higher. For now just get on
|
if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) {
|
||||||
// with whatever is going on. This prevents unnecessary switching.
|
return nil
|
||||||
if d.isBusy() {
|
|
||||||
return errBusy
|
|
||||||
}
|
}
|
||||||
|
defer atomic.StoreInt32(&d.synchronizing, 0)
|
||||||
|
|
||||||
// When a synchronization attempt is made while the queue still
|
// Abort if the queue still contains some leftover data
|
||||||
// contains items we abort the sync attempt
|
if _, cached := d.queue.Size(); cached > 0 {
|
||||||
if done, pend := d.queue.Size(); done+pend > 0 {
|
|
||||||
return errPendingQueue
|
return errPendingQueue
|
||||||
}
|
}
|
||||||
|
// Retrieve the origin peer and initiate the downloading process
|
||||||
// Fetch the peer using the id or throw an error if the peer couldn't be found
|
|
||||||
p := d.peers[id]
|
p := d.peers[id]
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return errUnknownPeer
|
return errUnknownPeer
|
||||||
}
|
}
|
||||||
|
return d.getFromPeer(p, hash, false)
|
||||||
// Get the hash from the peer and initiate the downloading progress.
|
|
||||||
err := d.getFromPeer(p, hash, false)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Done lets the downloader know that whatever previous hashes were taken
|
|
||||||
// are processed. If the block count reaches zero and done is called
|
|
||||||
// we reset the queue for the next batch of incoming hashes and blocks.
|
|
||||||
func (d *Downloader) Done() {
|
|
||||||
d.queue.Done()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
|
// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
|
||||||
@ -176,6 +158,7 @@ func (d *Downloader) Has(hash common.Hash) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
|
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
|
||||||
|
|
||||||
d.activePeer = p.id
|
d.activePeer = p.id
|
||||||
defer func() {
|
defer func() {
|
||||||
// reset on error
|
// reset on error
|
||||||
@ -184,7 +167,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
|
glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id)
|
||||||
// Start the fetcher. This will block the update entirely
|
// Start the fetcher. This will block the update entirely
|
||||||
// interupts need to be send to the appropriate channels
|
// interupts need to be send to the appropriate channels
|
||||||
// respectively.
|
// respectively.
|
||||||
@ -200,20 +183,13 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(logger.Detail).Infoln("Sync completed")
|
glog.V(logger.Debug).Infoln("Synchronization completed")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX Make synchronous
|
// XXX Make synchronous
|
||||||
func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
|
func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
|
||||||
atomic.StoreInt32(&d.fetchingHashes, 1)
|
|
||||||
defer atomic.StoreInt32(&d.fetchingHashes, 0)
|
|
||||||
|
|
||||||
if d.queue.Has(h) { // TODO: Is this possible? Shouldn't queue be empty for startFetchingHashes to be even called?
|
|
||||||
return errAlreadyInPool
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
|
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
@ -312,10 +288,8 @@ out:
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) startFetchingBlocks(p *peer) error {
|
func (d *Downloader) startFetchingBlocks(p *peer) error {
|
||||||
glog.V(logger.Detail).Infoln("Downloading", d.queue.Pending(), "block(s)")
|
glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
|
||||||
|
|
||||||
atomic.StoreInt32(&d.downloadingBlocks, 1)
|
|
||||||
defer atomic.StoreInt32(&d.downloadingBlocks, 0)
|
|
||||||
// Defer the peer reset. This will empty the peer requested set
|
// Defer the peer reset. This will empty the peer requested set
|
||||||
// and makes sure there are no lingering peers with an incorrect
|
// and makes sure there are no lingering peers with an incorrect
|
||||||
// state
|
// state
|
||||||
@ -439,19 +413,3 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) isFetchingHashes() bool {
|
|
||||||
return atomic.LoadInt32(&d.fetchingHashes) == 1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) isDownloadingBlocks() bool {
|
|
||||||
return atomic.LoadInt32(&d.downloadingBlocks) == 1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) isBusy() bool {
|
|
||||||
return d.isFetchingHashes() || d.isDownloadingBlocks()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) IsBusy() bool {
|
|
||||||
return d.isBusy()
|
|
||||||
}
|
|
||||||
|
@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
|
|||||||
|
|
||||||
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
|
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
|
||||||
dl.activePeerId = peerId
|
dl.activePeerId = peerId
|
||||||
return dl.downloader.Synchronise(peerId, hash)
|
return dl.downloader.Synchronize(peerId, hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
|
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
|
||||||
|
@ -63,16 +63,6 @@ func (q *queue) Reset() {
|
|||||||
q.blockCache = nil
|
q.blockCache = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Done checks if all the downloads have been retrieved, wiping the queue.
|
|
||||||
func (q *queue) Done() {
|
|
||||||
q.lock.Lock()
|
|
||||||
defer q.lock.Unlock()
|
|
||||||
|
|
||||||
if len(q.blockCache) == 0 {
|
|
||||||
q.Reset()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Size retrieves the number of hashes in the queue, returning separately for
|
// Size retrieves the number of hashes in the queue, returning separately for
|
||||||
// pending and already downloaded.
|
// pending and already downloaded.
|
||||||
func (q *queue) Size() (int, int) {
|
func (q *queue) Size() (int, int) {
|
||||||
|
@ -307,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
|
|
||||||
// Attempt to insert the newly received by checking if the parent exists.
|
// Attempt to insert the newly received by checking if the parent exists.
|
||||||
// if the parent exists we process the block and propagate to our peers
|
// if the parent exists we process the block and propagate to our peers
|
||||||
// otherwise synchronise with the peer
|
// otherwise synchronize with the peer
|
||||||
if self.chainman.HasBlock(request.Block.ParentHash()) {
|
if self.chainman.HasBlock(request.Block.ParentHash()) {
|
||||||
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
|
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
|
||||||
glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
|
glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
|
||||||
@ -324,7 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
}
|
}
|
||||||
self.BroadcastBlock(hash, request.Block)
|
self.BroadcastBlock(hash, request.Block)
|
||||||
} else {
|
} else {
|
||||||
go self.synchronise(p)
|
go self.synchronize(p)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
|
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
|
||||||
|
16
eth/sync.go
16
eth/sync.go
@ -32,14 +32,14 @@ func (pm *ProtocolManager) update() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
itimer.Stop()
|
itimer.Stop()
|
||||||
go pm.synchronise(peer)
|
go pm.synchronize(peer)
|
||||||
case <-itimer.C:
|
case <-itimer.C:
|
||||||
// The timer will make sure that the downloader keeps an active state
|
// The timer will make sure that the downloader keeps an active state
|
||||||
// in which it attempts to always check the network for highest td peers
|
// in which it attempts to always check the network for highest td peers
|
||||||
// Either select the peer or restart the timer if no peers could
|
// Either select the peer or restart the timer if no peers could
|
||||||
// be selected.
|
// be selected.
|
||||||
if peer := getBestPeer(pm.peers); peer != nil {
|
if peer := getBestPeer(pm.peers); peer != nil {
|
||||||
go pm.synchronise(peer)
|
go pm.synchronize(peer)
|
||||||
} else {
|
} else {
|
||||||
itimer.Reset(5 * time.Second)
|
itimer.Reset(5 * time.Second)
|
||||||
}
|
}
|
||||||
@ -63,7 +63,6 @@ func (pm *ProtocolManager) processBlocks() error {
|
|||||||
if len(blocks) == 0 {
|
if len(blocks) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
defer pm.downloader.Done()
|
|
||||||
|
|
||||||
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
|
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
|
||||||
|
|
||||||
@ -78,26 +77,19 @@ func (pm *ProtocolManager) processBlocks() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProtocolManager) synchronise(peer *peer) {
|
func (pm *ProtocolManager) synchronize(peer *peer) {
|
||||||
// Make sure the peer's TD is higher than our own. If not drop.
|
// Make sure the peer's TD is higher than our own. If not drop.
|
||||||
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
|
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Check downloader if it's busy so it doesn't show the sync message
|
|
||||||
// for every attempty
|
|
||||||
if pm.downloader.IsBusy() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME if we have the hash in our chain and the TD of the peer is
|
// FIXME if we have the hash in our chain and the TD of the peer is
|
||||||
// much higher than ours, something is wrong with us or the peer.
|
// much higher than ours, something is wrong with us or the peer.
|
||||||
// Check if the hash is on our own chain
|
// Check if the hash is on our own chain
|
||||||
if pm.chainman.HasBlock(peer.recentHash) {
|
if pm.chainman.HasBlock(peer.recentHash) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the hashes from the peer (synchronously)
|
// Get the hashes from the peer (synchronously)
|
||||||
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
|
err := pm.downloader.Synchronize(peer.id, peer.recentHash)
|
||||||
if err != nil && err == downloader.ErrBadPeer {
|
if err != nil && err == downloader.ErrBadPeer {
|
||||||
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
|
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
|
||||||
pm.removePeer(peer)
|
pm.removePeer(peer)
|
||||||
|
Loading…
Reference in New Issue
Block a user