From 62d8022b513772666d92c3ba45312b61c403fea7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Tue, 16 Mar 2021 12:53:54 +0100 Subject: [PATCH] les: fix UDP connection query (#22451) This PR fixes multiple issues with the UDP connection pre-negotiation feature: - the enable condition was wrong (it checked the existence of the DiscV5 struct where it wasn't initialized yet, disabling the feature even if discv5 was enabled) - the server pool queried already connected nodes when the discovery iterators returned them again - servers responded positively before they were synced and really willing to accept connections Metrics are also added on the server side that count the positive and negative replies to served connection queries. --- les/client.go | 18 ++++-- les/clientpool.go | 18 +++++- les/clientpool_test.go | 24 ++++---- les/enr_entry.go | 5 +- les/metrics.go | 10 +-- les/server.go | 2 +- les/test_helper.go | 6 +- les/vflux/client/serverpool.go | 107 ++++++++++++++++++--------------- 8 files changed, 112 insertions(+), 78 deletions(-) diff --git a/les/client.go b/les/client.go index a557e7ccb3..99f79bb20c 100644 --- a/les/client.go +++ b/les/client.go @@ -73,8 +73,9 @@ type LightEthereum struct { accountManager *accounts.Manager netRPCService *ethapi.PublicNetAPI - p2pServer *p2p.Server - p2pConfig *p2p.Config + p2pServer *p2p.Server + p2pConfig *p2p.Config + udpEnabled bool } // New creates an instance of the light client. @@ -113,10 +114,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), p2pServer: stack.Server(), p2pConfig: &stack.Config().P2P, + udpEnabled: stack.Config().P2P.DiscoveryV5, } var prenegQuery vfc.QueryFunc - if leth.p2pServer.DiscV5 != nil { + if leth.udpEnabled { prenegQuery = leth.prenegQuery } leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, prenegQuery, &mclock.System{}, config.UltraLightServers, requestList) @@ -198,7 +200,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { // VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies { - if s.p2pServer.DiscV5 == nil { + if !s.udpEnabled { return nil } reqsEnc, _ := rlp.EncodeToBytes(&reqs) @@ -215,7 +217,7 @@ func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.R func (s *LightEthereum) vfxVersion(n *enode.Node) uint { if n.Seq() == 0 { var err error - if s.p2pServer.DiscV5 == nil { + if !s.udpEnabled { return 0 } if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 { @@ -346,7 +348,11 @@ func (s *LightEthereum) Protocols() []p2p.Protocol { func (s *LightEthereum) Start() error { log.Warn("Light client mode is an experimental feature") - discovery, err := s.setupDiscovery(s.p2pConfig) + if s.udpEnabled && s.p2pServer.DiscV5 == nil { + s.udpEnabled = false + log.Error("Discovery v5 is not initialized") + } + discovery, err := s.setupDiscovery() if err != nil { return err } diff --git a/les/clientpool.go b/les/clientpool.go index 1aa63a281e..3965d54508 100644 --- a/les/clientpool.go +++ b/les/clientpool.go @@ -72,6 +72,7 @@ type clientPool struct { clock mclock.Clock closed bool removePeer func(enode.ID) + synced func() bool ns *nodestate.NodeStateMachine pp *vfs.PriorityPool bt *vfs.BalanceTracker @@ -107,7 +108,7 @@ type clientInfo struct { } // newClientPool creates a new client pool -func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool { +func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID), synced func() bool) *clientPool { pool := &clientPool{ ns: ns, BalanceTrackerSetup: balanceTrackerSetup, @@ -116,6 +117,7 @@ func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap minCap: minCap, connectedBias: connectedBias, removePeer: removePeer, + synced: synced, } pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{}) pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4) @@ -396,6 +398,13 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen { return nil } + result := make(vflux.CapacityQueryReply, len(req.AddTokens)) + if !f.synced() { + capacityQueryZeroMeter.Mark(1) + reply, _ := rlp.EncodeToBytes(&result) + return reply + } + node := f.ns.GetNode(id) if node == nil { node = enode.SignNull(&enr.Record{}, id) @@ -416,7 +425,6 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by } // use vfs.CapacityCurve to answer request for multiple newly bought token amounts curve := f.pp.GetCapacityCurve().Exclude(id) - result := make(vflux.CapacityQueryReply, len(req.AddTokens)) bias := time.Second * time.Duration(req.Bias) if f.connectedBias > bias { bias = f.connectedBias @@ -434,6 +442,12 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by result[i] = 0 } } + // add first result to metrics (don't care about priority client multi-queries yet) + if result[0] == 0 { + capacityQueryZeroMeter.Mark(1) + } else { + capacityQueryNonZeroMeter.Mark(1) + } reply, _ := rlp.EncodeToBytes(&result) return reply } diff --git a/les/clientpool_test.go b/les/clientpool_test.go index 345b373b0f..2aee444545 100644 --- a/les/clientpool_test.go +++ b/les/clientpool_test.go @@ -133,7 +133,7 @@ func testClientPool(t *testing.T, activeLimit, clientCount, paidCount int, rando disconnFn = func(id enode.ID) { disconnCh <- int(id[0]) + int(id[1])<<8 } - pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn) + pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn, alwaysTrueFn) ) pool.ns.Start() @@ -239,7 +239,7 @@ func TestConnectPaidClient(t *testing.T) { clock mclock.Simulated db = rawdb.NewMemoryDatabase() ) - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) @@ -255,7 +255,7 @@ func TestConnectPaidClientToSmallPool(t *testing.T) { clock mclock.Simulated db = rawdb.NewMemoryDatabase() ) - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 @@ -274,7 +274,7 @@ func TestConnectPaidClientToFullPool(t *testing.T) { db = rawdb.NewMemoryDatabase() ) removeFn := func(enode.ID) {} // Noop - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 @@ -304,7 +304,7 @@ func TestPaidClientKickedOut(t *testing.T) { removeFn := func(id enode.ID) { kickedCh <- int(id[0]) } - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn) pool.ns.Start() pool.bt.SetExpirationTCs(0, 0) defer pool.stop() @@ -335,7 +335,7 @@ func TestConnectFreeClient(t *testing.T) { clock mclock.Simulated db = rawdb.NewMemoryDatabase() ) - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) @@ -352,7 +352,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) { db = rawdb.NewMemoryDatabase() ) removeFn := func(enode.ID) {} // Noop - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 @@ -382,7 +382,7 @@ func TestFreeClientKickedOut(t *testing.T) { kicked = make(chan int, 100) ) removeFn := func(id enode.ID) { kicked <- int(id[0]) } - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 @@ -424,7 +424,7 @@ func TestPositiveBalanceCalculation(t *testing.T) { kicked = make(chan int, 10) ) removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 @@ -448,7 +448,7 @@ func TestDowngradePriorityClient(t *testing.T) { kicked = make(chan int, 10) ) removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 @@ -483,7 +483,7 @@ func TestNegativeBalanceCalculation(t *testing.T) { clock mclock.Simulated db = rawdb.NewMemoryDatabase() ) - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(10, uint64(10)) // Total capacity limit is 10 @@ -521,7 +521,7 @@ func TestInactiveClient(t *testing.T) { clock mclock.Simulated db = rawdb.NewMemoryDatabase() ) - pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}) + pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn) pool.ns.Start() defer pool.stop() pool.setLimits(2, uint64(2)) diff --git a/les/enr_entry.go b/les/enr_entry.go index 8be4a7a00e..892c3530d3 100644 --- a/les/enr_entry.go +++ b/les/enr_entry.go @@ -18,7 +18,6 @@ package les import ( "github.com/ethereum/go-ethereum/core/forkid" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/dnsdisc" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" @@ -42,7 +41,7 @@ type ethEntry struct { func (ethEntry) ENRKey() string { return "eth" } // setupDiscovery creates the node discovery source for the eth protocol. -func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) { +func (eth *LightEthereum) setupDiscovery() (enode.Iterator, error) { it := enode.NewFairMix(0) // Enable DNS discovery. @@ -56,7 +55,7 @@ func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error } // Enable DHT. - if cfg.DiscoveryV5 && eth.p2pServer.DiscV5 != nil { + if eth.udpEnabled { it.AddSource(eth.p2pServer.DiscV5.RandomNodes()) } diff --git a/les/metrics.go b/les/metrics.go index 9a79fd1bbd..5a8d4bbe02 100644 --- a/les/metrics.go +++ b/les/metrics.go @@ -73,10 +73,12 @@ var ( serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil) clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil) - totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil) - totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil) - totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil) - blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil) + totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil) + totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil) + totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil) + blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil) + capacityQueryZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryZero", nil) + capacityQueryNonZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryNonZero", nil) requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil) requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil) diff --git a/les/server.go b/les/server.go index 9627d65afa..0351bdd801 100644 --- a/les/server.go +++ b/les/server.go @@ -149,7 +149,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les srv.maxCapacity = totalRecharge } srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2) - srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient) + srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient, issync) srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}) checkpoint := srv.latestLocalCheckpoint() diff --git a/les/test_helper.go b/les/test_helper.go index e1d3beb6a1..e49bfc8738 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -307,7 +307,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da } server.costTracker, server.minCapacity = newCostTracker(db, server.config) server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism. - server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {}) + server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {}, alwaysTrueFn) server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true }) if server.oracle != nil { @@ -319,6 +319,10 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da return server.handler, simulation } +func alwaysTrueFn() bool { + return true +} + // testPeer is a simulated peer to allow testing direct network calls. type testPeer struct { cpeer *clientPeer diff --git a/les/vflux/client/serverpool.go b/les/vflux/client/serverpool.go index e73b277ced..cc0254c127 100644 --- a/les/vflux/client/serverpool.go +++ b/les/vflux/client/serverpool.go @@ -47,7 +47,8 @@ const ( nodeWeightThreshold = 100 // minimum weight for keeping a node in the the known (valuable) set minRedialWait = 10 // minimum redial wait time in seconds preNegLimit = 5 // maximum number of simultaneous pre-negotiation queries - maxQueryFails = 100 // number of consecutive UDP query failures before we print a warning + warnQueryFails = 20 // number of consecutive UDP query failures before we print a warning + maxQueryFails = 100 // number of consecutive UDP query failures when then chance of skipping a query reaches 50% ) // ServerPool provides a node iterator for dial candidates. The output is a mix of newly discovered @@ -94,16 +95,16 @@ type nodeHistoryEnc struct { type QueryFunc func(*enode.Node) int var ( - clientSetup = &nodestate.Setup{Version: 2} - sfHasValue = clientSetup.NewPersistentFlag("hasValue") - sfQueried = clientSetup.NewFlag("queried") - sfCanDial = clientSetup.NewFlag("canDial") - sfDialing = clientSetup.NewFlag("dialed") - sfWaitDialTimeout = clientSetup.NewFlag("dialTimeout") - sfConnected = clientSetup.NewFlag("connected") - sfRedialWait = clientSetup.NewFlag("redialWait") - sfAlwaysConnect = clientSetup.NewFlag("alwaysConnect") - sfDisableSelection = nodestate.MergeFlags(sfQueried, sfCanDial, sfDialing, sfConnected, sfRedialWait) + clientSetup = &nodestate.Setup{Version: 2} + sfHasValue = clientSetup.NewPersistentFlag("hasValue") + sfQuery = clientSetup.NewFlag("query") + sfCanDial = clientSetup.NewFlag("canDial") + sfDialing = clientSetup.NewFlag("dialed") + sfWaitDialTimeout = clientSetup.NewFlag("dialTimeout") + sfConnected = clientSetup.NewFlag("connected") + sfRedialWait = clientSetup.NewFlag("redialWait") + sfAlwaysConnect = clientSetup.NewFlag("alwaysConnect") + sfDialProcess = nodestate.MergeFlags(sfQuery, sfCanDial, sfDialing, sfConnected, sfRedialWait) sfiNodeHistory = clientSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}), func(field interface{}) ([]byte, error) { @@ -162,8 +163,8 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio } s.recalTimeout() s.mixer = enode.NewFairMix(mixTimeout) - knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDisableSelection, sfiNodeWeight) - alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDisableSelection, true, nil) + knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDialProcess, sfiNodeWeight) + alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDialProcess, true, nil) s.mixSources = append(s.mixSources, knownSelector) s.mixSources = append(s.mixSources, alwaysConnect) @@ -226,7 +227,7 @@ func (s *ServerPool) AddMetrics( s.totalValueGauge = totalValueGauge s.sessionValueMeter = sessionValueMeter if serverSelectableGauge != nil { - s.ns.AddLogMetrics(sfHasValue, sfDisableSelection, "selectable", nil, nil, serverSelectableGauge) + s.ns.AddLogMetrics(sfHasValue, sfDialProcess, "selectable", nil, nil, serverSelectableGauge) } if serverDialedMeter != nil { s.ns.AddLogMetrics(sfDialing, nodestate.Flags{}, "dialed", serverDialedMeter, nil, nil) @@ -247,43 +248,51 @@ func (s *ServerPool) AddSource(source enode.Iterator) { // Nodes that are filtered out and does not appear on the output iterator are put back // into redialWait state. func (s *ServerPool) addPreNegFilter(input enode.Iterator, query QueryFunc) enode.Iterator { - s.fillSet = NewFillSet(s.ns, input, sfQueried) - s.ns.SubscribeState(sfQueried, func(n *enode.Node, oldState, newState nodestate.Flags) { - if newState.Equals(sfQueried) { - fails := atomic.LoadUint32(&s.queryFails) - if fails == maxQueryFails { - log.Warn("UDP pre-negotiation query does not seem to work") + s.fillSet = NewFillSet(s.ns, input, sfQuery) + s.ns.SubscribeState(sfDialProcess, func(n *enode.Node, oldState, newState nodestate.Flags) { + if !newState.Equals(sfQuery) { + if newState.HasAll(sfQuery) { + // remove query flag if the node is already somewhere in the dial process + s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0) } - if fails > maxQueryFails { - fails = maxQueryFails - } - if rand.Intn(maxQueryFails*2) < int(fails) { - // skip pre-negotiation with increasing chance, max 50% - // this ensures that the client can operate even if UDP is not working at all - s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10) - // set canDial before resetting queried so that FillSet will not read more - // candidates unnecessarily - s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0) - return - } - go func() { - q := query(n) - if q == -1 { - atomic.AddUint32(&s.queryFails, 1) - } else { - atomic.StoreUint32(&s.queryFails, 0) - } - s.ns.Operation(func() { - // we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait - if q == 1 { - s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10) - } else { - s.setRedialWait(n, queryCost, queryWaitStep) - } - s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0) - }) - }() + return } + fails := atomic.LoadUint32(&s.queryFails) + failMax := fails + if failMax > maxQueryFails { + failMax = maxQueryFails + } + if rand.Intn(maxQueryFails*2) < int(failMax) { + // skip pre-negotiation with increasing chance, max 50% + // this ensures that the client can operate even if UDP is not working at all + s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10) + // set canDial before resetting queried so that FillSet will not read more + // candidates unnecessarily + s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0) + return + } + go func() { + q := query(n) + if q == -1 { + atomic.AddUint32(&s.queryFails, 1) + fails++ + if fails%warnQueryFails == 0 { + // warn if a large number of consecutive queries have failed + log.Warn("UDP connection queries failed", "count", fails) + } + } else { + atomic.StoreUint32(&s.queryFails, 0) + } + s.ns.Operation(func() { + // we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait + if q == 1 { + s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10) + } else { + s.setRedialWait(n, queryCost, queryWaitStep) + } + s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0) + }) + }() }) return NewQueueIterator(s.ns, sfCanDial, nodestate.Flags{}, false, func(waiting bool) { if waiting {