eth: better active protocol handler tracking (#27665)
Fixes an issue where waitgroups were used erroneously, which could lead to waitgroup being added to while wait was already invoked.
This commit is contained in:
parent
5c9cbc218a
commit
e1fd3d67e5
@ -124,7 +124,9 @@ type handler struct {
|
|||||||
|
|
||||||
chainSync *chainSyncer
|
chainSync *chainSyncer
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
peerWG sync.WaitGroup
|
|
||||||
|
handlerStartCh chan struct{}
|
||||||
|
handlerDoneCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newHandler returns a handler for all Ethereum chain management protocol.
|
// newHandler returns a handler for all Ethereum chain management protocol.
|
||||||
@ -144,6 +146,8 @@ func newHandler(config *handlerConfig) (*handler, error) {
|
|||||||
merger: config.Merger,
|
merger: config.Merger,
|
||||||
requiredBlocks: config.RequiredBlocks,
|
requiredBlocks: config.RequiredBlocks,
|
||||||
quitSync: make(chan struct{}),
|
quitSync: make(chan struct{}),
|
||||||
|
handlerDoneCh: make(chan struct{}),
|
||||||
|
handlerStartCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
if config.Sync == downloader.FullSync {
|
if config.Sync == downloader.FullSync {
|
||||||
// The database seems empty as the current block is the genesis. Yet the snap
|
// The database seems empty as the current block is the genesis. Yet the snap
|
||||||
@ -289,9 +293,50 @@ func newHandler(config *handlerConfig) (*handler, error) {
|
|||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// protoTracker tracks the number of active protocol handlers.
|
||||||
|
func (h *handler) protoTracker() {
|
||||||
|
defer h.wg.Done()
|
||||||
|
var active int
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-h.handlerStartCh:
|
||||||
|
active++
|
||||||
|
case <-h.handlerDoneCh:
|
||||||
|
active--
|
||||||
|
case <-h.quitSync:
|
||||||
|
// Wait for all active handlers to finish.
|
||||||
|
for ; active > 0; active-- {
|
||||||
|
<-h.handlerDoneCh
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// incHandlers signals to increment the number of active handlers if not
|
||||||
|
// quitting.
|
||||||
|
func (h *handler) incHandlers() bool {
|
||||||
|
select {
|
||||||
|
case h.handlerStartCh <- struct{}{}:
|
||||||
|
return true
|
||||||
|
case <-h.quitSync:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// decHandlers signals to decrement the number of active handlers.
|
||||||
|
func (h *handler) decHandlers() {
|
||||||
|
h.handlerDoneCh <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
|
// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
|
||||||
// various subsystems and starts handling messages.
|
// various subsystems and starts handling messages.
|
||||||
func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
||||||
|
if !h.incHandlers() {
|
||||||
|
return p2p.DiscQuitting
|
||||||
|
}
|
||||||
|
defer h.decHandlers()
|
||||||
|
|
||||||
// If the peer has a `snap` extension, wait for it to connect so we can have
|
// If the peer has a `snap` extension, wait for it to connect so we can have
|
||||||
// a uniform initialization/teardown mechanism
|
// a uniform initialization/teardown mechanism
|
||||||
snap, err := h.peers.waitSnapExtension(peer)
|
snap, err := h.peers.waitSnapExtension(peer)
|
||||||
@ -299,12 +344,6 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
|||||||
peer.Log().Error("Snapshot extension barrier failed", "err", err)
|
peer.Log().Error("Snapshot extension barrier failed", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// TODO(karalabe): Not sure why this is needed
|
|
||||||
if !h.chainSync.handlePeerEvent(peer) {
|
|
||||||
return p2p.DiscQuitting
|
|
||||||
}
|
|
||||||
h.peerWG.Add(1)
|
|
||||||
defer h.peerWG.Done()
|
|
||||||
|
|
||||||
// Execute the Ethereum handshake
|
// Execute the Ethereum handshake
|
||||||
var (
|
var (
|
||||||
@ -360,7 +399,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
h.chainSync.handlePeerEvent(peer)
|
h.chainSync.handlePeerEvent()
|
||||||
|
|
||||||
// Propagate existing transactions. new transactions appearing
|
// Propagate existing transactions. new transactions appearing
|
||||||
// after this will be sent via broadcasts.
|
// after this will be sent via broadcasts.
|
||||||
@ -421,8 +460,10 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
|||||||
// `eth`, all subsystem registrations and lifecycle management will be done by
|
// `eth`, all subsystem registrations and lifecycle management will be done by
|
||||||
// the main `eth` handler to prevent strange races.
|
// the main `eth` handler to prevent strange races.
|
||||||
func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error {
|
func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error {
|
||||||
h.peerWG.Add(1)
|
if !h.incHandlers() {
|
||||||
defer h.peerWG.Done()
|
return p2p.DiscQuitting
|
||||||
|
}
|
||||||
|
defer h.decHandlers()
|
||||||
|
|
||||||
if err := h.peers.registerSnapExtension(peer); err != nil {
|
if err := h.peers.registerSnapExtension(peer); err != nil {
|
||||||
if metrics.Enabled {
|
if metrics.Enabled {
|
||||||
@ -494,6 +535,10 @@ func (h *handler) Start(maxPeers int) {
|
|||||||
// start sync handlers
|
// start sync handlers
|
||||||
h.wg.Add(1)
|
h.wg.Add(1)
|
||||||
go h.chainSync.loop()
|
go h.chainSync.loop()
|
||||||
|
|
||||||
|
// start peer handler tracker
|
||||||
|
h.wg.Add(1)
|
||||||
|
go h.protoTracker()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) Stop() {
|
func (h *handler) Stop() {
|
||||||
@ -503,14 +548,13 @@ func (h *handler) Stop() {
|
|||||||
// Quit chainSync and txsync64.
|
// Quit chainSync and txsync64.
|
||||||
// After this is done, no new peers will be accepted.
|
// After this is done, no new peers will be accepted.
|
||||||
close(h.quitSync)
|
close(h.quitSync)
|
||||||
h.wg.Wait()
|
|
||||||
|
|
||||||
// Disconnect existing sessions.
|
// Disconnect existing sessions.
|
||||||
// This also closes the gate for any new registrations on the peer set.
|
// This also closes the gate for any new registrations on the peer set.
|
||||||
// sessions which are already established but not added to h.peers yet
|
// sessions which are already established but not added to h.peers yet
|
||||||
// will exit when they try to register.
|
// will exit when they try to register.
|
||||||
h.peers.close()
|
h.peers.close()
|
||||||
h.peerWG.Wait()
|
h.wg.Wait()
|
||||||
|
|
||||||
log.Info("Ethereum protocol stopped")
|
log.Info("Ethereum protocol stopped")
|
||||||
}
|
}
|
||||||
|
@ -134,7 +134,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
|
|||||||
// Update the peer's total difficulty if better than the previous
|
// Update the peer's total difficulty if better than the previous
|
||||||
if _, td := peer.Head(); trueTD.Cmp(td) > 0 {
|
if _, td := peer.Head(); trueTD.Cmp(td) > 0 {
|
||||||
peer.SetHead(trueHead, trueTD)
|
peer.SetHead(trueHead, trueTD)
|
||||||
h.chainSync.handlePeerEvent(peer)
|
h.chainSync.handlePeerEvent()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -89,7 +89,7 @@ func newChainSyncer(handler *handler) *chainSyncer {
|
|||||||
// handlePeerEvent notifies the syncer about a change in the peer set.
|
// handlePeerEvent notifies the syncer about a change in the peer set.
|
||||||
// This is called for new peers and every time a peer announces a new
|
// This is called for new peers and every time a peer announces a new
|
||||||
// chain head.
|
// chain head.
|
||||||
func (cs *chainSyncer) handlePeerEvent(peer *eth.Peer) bool {
|
func (cs *chainSyncer) handlePeerEvent() bool {
|
||||||
select {
|
select {
|
||||||
case cs.peerEventCh <- struct{}{}:
|
case cs.peerEventCh <- struct{}{}:
|
||||||
return true
|
return true
|
||||||
|
Loading…
Reference in New Issue
Block a user