p2p/discover: fix race involving the seed node iterator

nodeDB.querySeeds was not safe for concurrent use but could be called
concurrenty on multiple goroutines in the following case:

- the table was empty
- a timed refresh started
- a lookup was started and initiated refresh

These conditions are unlikely to coincide during normal use, but are
much more likely to occur all at once when the user's machine just woke
from sleep. The root cause of the issue is that querySeeds reused the
same leveldb iterator until it was exhausted.

This commit moves the refresh scheduling logic into its own goroutine
(so only one refresh is ever active) and changes querySeeds to not use
a persistent iterator. The seed node selection is now more random and
ignores nodes that have not been contacted in the last 5 days.
This commit is contained in:
Felix Lange 2015-09-30 05:01:49 +02:00
parent 7977e87ce1
commit b4374436f3
5 changed files with 204 additions and 178 deletions

@ -21,6 +21,7 @@ package discover
import ( import (
"bytes" "bytes"
"crypto/rand"
"encoding/binary" "encoding/binary"
"os" "os"
"sync" "sync"
@ -46,11 +47,8 @@ var (
// nodeDB stores all nodes we know about. // nodeDB stores all nodes we know about.
type nodeDB struct { type nodeDB struct {
lvl *leveldb.DB // Interface to the database itself lvl *leveldb.DB // Interface to the database itself
seeder iterator.Iterator // Iterator for fetching possible seed nodes self NodeID // Own node id to prevent adding it into the database
self NodeID // Own node id to prevent adding it into the database
runner sync.Once // Ensures we can start at most one expirer runner sync.Once // Ensures we can start at most one expirer
quit chan struct{} // Channel to signal the expiring thread to stop quit chan struct{} // Channel to signal the expiring thread to stop
} }
@ -302,52 +300,70 @@ func (db *nodeDB) updateFindFails(id NodeID, fails int) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails)) return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))
} }
// querySeeds retrieves a batch of nodes to be used as potential seed servers // querySeeds retrieves random nodes to be used as potential seed nodes
// during bootstrapping the node into the network. // for bootstrapping.
// func (db *nodeDB) querySeeds(n int, maxAge time.Duration) []*Node {
// Ideal seeds are the most recently seen nodes (highest probability to be still var (
// alive), but yet untried. However, since leveldb only supports dumb iteration now = time.Now()
// we will instead start pulling in potential seeds that haven't been yet pinged nodes = make([]*Node, 0, n)
// since the start of the boot procedure. it = db.lvl.NewIterator(nil, nil)
// id NodeID
// If the database runs out of potential seeds, we restart the startup counter )
// and start iterating over the peers again. defer it.Release()
func (db *nodeDB) querySeeds(n int) []*Node {
// Create a new seed iterator if none exists seek:
if db.seeder == nil { for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
db.seeder = db.lvl.NewIterator(nil, nil) // Seek to a random entry. The first byte is incremented by a
} // random amount each time in order to increase the likelihood
// Iterate over the nodes and find suitable seeds // of hitting all existing nodes in very small databases.
nodes := make([]*Node, 0, n) ctr := id[0]
for len(nodes) < n && db.seeder.Next() { rand.Read(id[:])
// Iterate until a discovery node is found id[0] = ctr + id[0]%16
id, field := splitKey(db.seeder.Key()) it.Seek(makeKey(id, nodeDBDiscoverRoot))
if field != nodeDBDiscoverRoot {
continue n := nextNode(it)
if n == nil {
id[0] = 0
continue seek // iterator exhausted
} }
// Dump it if its a self reference if n.ID == db.self {
if bytes.Compare(id[:], db.self[:]) == 0 { continue seek
db.deleteNode(id)
continue
} }
// Load it as a potential seed if now.Sub(db.lastPong(n.ID)) > maxAge {
if node := db.node(id); node != nil { continue seek
nodes = append(nodes, node)
} }
} for i := range nodes {
// Release the iterator if we reached the end if nodes[i].ID == n.ID {
if len(nodes) == 0 { continue seek // duplicate
db.seeder.Release() }
db.seeder = nil }
nodes = append(nodes, n)
} }
return nodes return nodes
} }
// reads the next node record from the iterator, skipping over other
// database entries.
func nextNode(it iterator.Iterator) *Node {
for end := false; !end; end = !it.Next() {
id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
continue
}
var n Node
if err := rlp.DecodeBytes(it.Value(), &n); err != nil {
if glog.V(logger.Warn) {
glog.Errorf("invalid node %x: %v", id, err)
}
continue
}
return &n
}
return nil
}
// close flushes and closes the database files. // close flushes and closes the database files.
func (db *nodeDB) close() { func (db *nodeDB) close() {
if db.seeder != nil {
db.seeder.Release()
}
close(db.quit) close(db.quit)
db.lvl.Close() db.lvl.Close()
} }

@ -162,9 +162,33 @@ var nodeDBSeedQueryNodes = []struct {
node *Node node *Node
pong time.Time pong time.Time
}{ }{
// This one should not be in the result set because its last
// pong time is too far in the past.
{ {
node: newNode( node: newNode(
MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), MustHexID("0x84d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{127, 0, 0, 3},
30303,
30303,
),
pong: time.Now().Add(-3 * time.Hour),
},
// This one shouldn't be in in the result set because its
// nodeID is the local node's ID.
{
node: newNode(
MustHexID("0x57d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{127, 0, 0, 3},
30303,
30303,
),
pong: time.Now().Add(-4 * time.Second),
},
// These should be in the result set.
{
node: newNode(
MustHexID("0x22d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{127, 0, 0, 1}, net.IP{127, 0, 0, 1},
30303, 30303,
30303, 30303,
@ -173,7 +197,7 @@ var nodeDBSeedQueryNodes = []struct {
}, },
{ {
node: newNode( node: newNode(
MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), MustHexID("0x44d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{127, 0, 0, 2}, net.IP{127, 0, 0, 2},
30303, 30303,
30303, 30303,
@ -182,7 +206,7 @@ var nodeDBSeedQueryNodes = []struct {
}, },
{ {
node: newNode( node: newNode(
MustHexID("0x03d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), MustHexID("0xe2d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{127, 0, 0, 3}, net.IP{127, 0, 0, 3},
30303, 30303,
30303, 30303,
@ -192,7 +216,7 @@ var nodeDBSeedQueryNodes = []struct {
} }
func TestNodeDBSeedQuery(t *testing.T) { func TestNodeDBSeedQuery(t *testing.T) {
db, _ := newNodeDB("", Version, NodeID{}) db, _ := newNodeDB("", Version, nodeDBSeedQueryNodes[1].node.ID)
defer db.close() defer db.close()
// Insert a batch of nodes for querying // Insert a batch of nodes for querying
@ -200,20 +224,24 @@ func TestNodeDBSeedQuery(t *testing.T) {
if err := db.updateNode(seed.node); err != nil { if err := db.updateNode(seed.node); err != nil {
t.Fatalf("node %d: failed to insert: %v", i, err) t.Fatalf("node %d: failed to insert: %v", i, err)
} }
if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil {
t.Fatalf("node %d: failed to insert lastPong: %v", i, err)
}
} }
// Retrieve the entire batch and check for duplicates // Retrieve the entire batch and check for duplicates
seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes)) seeds := db.querySeeds(len(nodeDBSeedQueryNodes)*2, time.Hour)
if len(seeds) != len(nodeDBSeedQueryNodes) {
t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes))
}
have := make(map[NodeID]struct{}) have := make(map[NodeID]struct{})
for _, seed := range seeds { for _, seed := range seeds {
have[seed.ID] = struct{}{} have[seed.ID] = struct{}{}
} }
want := make(map[NodeID]struct{}) want := make(map[NodeID]struct{})
for _, seed := range nodeDBSeedQueryNodes { for _, seed := range nodeDBSeedQueryNodes[2:] {
want[seed.node.ID] = struct{}{} want[seed.node.ID] = struct{}{}
} }
if len(seeds) != len(want) {
t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(want))
}
for id, _ := range have { for id, _ := range have {
if _, ok := want[id]; !ok { if _, ok := want[id]; !ok {
t.Errorf("extra seed: %v", id) t.Errorf("extra seed: %v", id)
@ -224,63 +252,6 @@ func TestNodeDBSeedQuery(t *testing.T) {
t.Errorf("missing seed: %v", id) t.Errorf("missing seed: %v", id)
} }
} }
// Make sure the next batch is empty (seed EOF)
seeds = db.querySeeds(2 * len(nodeDBSeedQueryNodes))
if len(seeds) != 0 {
t.Errorf("seed count mismatch: have %v, want %v", len(seeds), 0)
}
}
func TestNodeDBSeedQueryContinuation(t *testing.T) {
db, _ := newNodeDB("", Version, NodeID{})
defer db.close()
// Insert a batch of nodes for querying
for i, seed := range nodeDBSeedQueryNodes {
if err := db.updateNode(seed.node); err != nil {
t.Fatalf("node %d: failed to insert: %v", i, err)
}
}
// Iteratively retrieve the batch, checking for an empty batch on reset
for i := 0; i < len(nodeDBSeedQueryNodes); i++ {
if seeds := db.querySeeds(1); len(seeds) != 1 {
t.Errorf("1st iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1)
}
}
if seeds := db.querySeeds(1); len(seeds) != 0 {
t.Errorf("reset: seed count mismatch: have %v, want %v", len(seeds), 0)
}
for i := 0; i < len(nodeDBSeedQueryNodes); i++ {
if seeds := db.querySeeds(1); len(seeds) != 1 {
t.Errorf("2nd iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1)
}
}
}
func TestNodeDBSelfSeedQuery(t *testing.T) {
// Assign a node as self to verify evacuation
self := nodeDBSeedQueryNodes[0].node.ID
db, _ := newNodeDB("", Version, self)
defer db.close()
// Insert a batch of nodes for querying
for i, seed := range nodeDBSeedQueryNodes {
if err := db.updateNode(seed.node); err != nil {
t.Fatalf("node %d: failed to insert: %v", i, err)
}
}
// Retrieve the entire batch and check that self was evacuated
seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes))
if len(seeds) != len(nodeDBSeedQueryNodes)-1 {
t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)-1)
}
have := make(map[NodeID]struct{})
for _, seed := range seeds {
have[seed.ID] = struct{}{}
}
if _, ok := have[self]; ok {
t.Errorf("self not evacuated")
}
} }
func TestNodeDBPersistency(t *testing.T) { func TestNodeDBPersistency(t *testing.T) {

@ -44,6 +44,10 @@ const (
maxBondingPingPongs = 16 maxBondingPingPongs = 16
maxFindnodeFailures = 5 maxFindnodeFailures = 5
autoRefreshInterval = 1 * time.Hour
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
) )
type Table struct { type Table struct {
@ -52,6 +56,10 @@ type Table struct {
nursery []*Node // bootstrap nodes nursery []*Node // bootstrap nodes
db *nodeDB // database of known nodes db *nodeDB // database of known nodes
refreshReq chan struct{}
closeReq chan struct{}
closed chan struct{}
bondmu sync.Mutex bondmu sync.Mutex
bonding map[NodeID]*bondproc bonding map[NodeID]*bondproc
bondslots chan struct{} // limits total number of active bonding processes bondslots chan struct{} // limits total number of active bonding processes
@ -93,11 +101,14 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
db, _ = newNodeDB("", Version, ourID) db, _ = newNodeDB("", Version, ourID)
} }
tab := &Table{ tab := &Table{
net: t, net: t,
db: db, db: db,
self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
bonding: make(map[NodeID]*bondproc), bonding: make(map[NodeID]*bondproc),
bondslots: make(chan struct{}, maxBondingPingPongs), bondslots: make(chan struct{}, maxBondingPingPongs),
refreshReq: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
} }
for i := 0; i < cap(tab.bondslots); i++ { for i := 0; i < cap(tab.bondslots); i++ {
tab.bondslots <- struct{}{} tab.bondslots <- struct{}{}
@ -105,6 +116,7 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
for i := range tab.buckets { for i := range tab.buckets {
tab.buckets[i] = new(bucket) tab.buckets[i] = new(bucket)
} }
go tab.refreshLoop()
return tab return tab
} }
@ -163,10 +175,12 @@ func randUint(max uint32) uint32 {
// Close terminates the network listener and flushes the node database. // Close terminates the network listener and flushes the node database.
func (tab *Table) Close() { func (tab *Table) Close() {
if tab.net != nil { select {
tab.net.close() case <-tab.closed:
// already closed.
case tab.closeReq <- struct{}{}:
<-tab.closed // wait for refreshLoop to end.
} }
tab.db.close()
} }
// Bootstrap sets the bootstrap nodes. These nodes are used to connect // Bootstrap sets the bootstrap nodes. These nodes are used to connect
@ -183,7 +197,7 @@ func (tab *Table) Bootstrap(nodes []*Node) {
tab.nursery = append(tab.nursery, &cpy) tab.nursery = append(tab.nursery, &cpy)
} }
tab.mutex.Unlock() tab.mutex.Unlock()
tab.refresh() tab.requestRefresh()
} }
// Lookup performs a network search for nodes close // Lookup performs a network search for nodes close
@ -210,9 +224,9 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
result := tab.closest(target, bucketSize) result := tab.closest(target, bucketSize)
tab.mutex.Unlock() tab.mutex.Unlock()
// If the result set is empty, all nodes were dropped, refresh // If the result set is empty, all nodes were dropped, refresh.
if len(result.entries) == 0 { if len(result.entries) == 0 {
tab.refresh() tab.requestRefresh()
return nil return nil
} }
@ -257,56 +271,86 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
return result.entries return result.entries
} }
// refresh performs a lookup for a random target to keep buckets full, or seeds func (tab *Table) requestRefresh() {
// the table if it is empty (initial bootstrap or discarded faulty peers). select {
func (tab *Table) refresh() { case tab.refreshReq <- struct{}{}:
seed := true case <-tab.closed:
}
}
// If the discovery table is empty, seed with previously known nodes func (tab *Table) refreshLoop() {
tab.mutex.Lock() defer func() {
for _, bucket := range tab.buckets { tab.db.close()
if len(bucket.entries) > 0 { if tab.net != nil {
seed = false tab.net.close()
break }
close(tab.closed)
}()
timer := time.NewTicker(autoRefreshInterval)
var done chan struct{}
for {
select {
case <-timer.C:
if done == nil {
done = make(chan struct{})
go tab.doRefresh(done)
}
case <-tab.refreshReq:
if done == nil {
done = make(chan struct{})
go tab.doRefresh(done)
}
case <-done:
done = nil
case <-tab.closeReq:
if done != nil {
<-done
}
return
} }
} }
}
// doRefresh performs a lookup for a random target to keep buckets
// full. seed nodes are inserted if the table is empty (initial
// bootstrap or discarded faulty peers).
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
// The Kademlia paper specifies that the bucket refresh should
// perform a lookup in the least recently used bucket. We cannot
// adhere to this because the findnode target is a 512bit value
// (not hash-sized) and it is not easily possible to generate a
// sha3 preimage that falls into a chosen bucket.
// We perform a lookup with a random target instead.
var target NodeID
rand.Read(target[:])
result := tab.Lookup(target)
if len(result) > 0 {
return
}
// The table is empty. Load nodes from the database and insert
// them. This should yield a few previously seen nodes that are
// (hopefully) still alive.
seeds := tab.db.querySeeds(seedCount, seedMaxAge)
seeds = tab.bondall(append(seeds, tab.nursery...))
if glog.V(logger.Debug) {
if len(seeds) == 0 {
glog.Infof("no seed nodes found")
}
for _, n := range seeds {
age := time.Since(tab.db.lastPong(n.ID))
glog.Infof("seed node (age %v): %v", age, n)
}
}
tab.mutex.Lock()
tab.stuff(seeds)
tab.mutex.Unlock() tab.mutex.Unlock()
// If the table is not empty, try to refresh using the live entries // Finally, do a self lookup to fill up the buckets.
if !seed { tab.Lookup(tab.self.ID)
// The Kademlia paper specifies that the bucket refresh should
// perform a refresh in the least recently used bucket. We cannot
// adhere to this because the findnode target is a 512bit value
// (not hash-sized) and it is not easily possible to generate a
// sha3 preimage that falls into a chosen bucket.
//
// We perform a lookup with a random target instead.
var target NodeID
rand.Read(target[:])
result := tab.Lookup(target)
if len(result) == 0 {
// Lookup failed, seed after all
seed = true
}
}
if seed {
// Pick a batch of previously know seeds to lookup with
seeds := tab.db.querySeeds(10)
for _, seed := range seeds {
glog.V(logger.Debug).Infoln("Seeding network with", seed)
}
nodes := append(tab.nursery, seeds...)
// Bond with all the seed nodes (will pingpong only if failed recently)
bonded := tab.bondall(nodes)
if len(bonded) > 0 {
tab.Lookup(tab.self.ID)
}
// TODO: the Kademlia paper says that we're supposed to perform
// random lookups in all buckets further away than our closest neighbor.
}
} }
// closest returns the n nodes in the table that are closest to the // closest returns the n nodes in the table that are closest to the
@ -373,8 +417,9 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
} }
// If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch // If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch
var result error var result error
if node == nil || fails > 0 { age := time.Since(tab.db.lastPong(id))
glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails) if node == nil || fails > 0 || age > nodeDBNodeExpiration {
glog.V(logger.Detail).Infof("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age)
tab.bondmu.Lock() tab.bondmu.Lock()
w := tab.bonding[id] w := tab.bonding[id]
@ -435,13 +480,17 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
// ping a remote endpoint and wait for a reply, also updating the node // ping a remote endpoint and wait for a reply, also updating the node
// database accordingly. // database accordingly.
func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
// Update the last ping and send the message
tab.db.updateLastPing(id, time.Now()) tab.db.updateLastPing(id, time.Now())
if err := tab.net.ping(id, addr); err != nil { if err := tab.net.ping(id, addr); err != nil {
return err return err
} }
// Pong received, update the database and return
tab.db.updateLastPong(id, time.Now()) tab.db.updateLastPong(id, time.Now())
// Start the background expiration goroutine after the first
// successful communication. Subsequent calls have no effect if it
// is already running. We do this here instead of somewhere else
// so that the search for seed nodes also considers older nodes
// that would otherwise be removed by the expiration.
tab.db.ensureExpirer() tab.db.ensureExpirer()
return nil return nil
} }

@ -514,9 +514,6 @@ func (tn *preminedTestnet) findnode(toid NodeID, toaddr *net.UDPAddr, target Nod
if toaddr.Port == 0 { if toaddr.Port == 0 {
panic("query to node at distance 0") panic("query to node at distance 0")
} }
if target != tn.target {
panic("findnode with wrong target")
}
next := uint16(toaddr.Port) - 1 next := uint16(toaddr.Port) - 1
var result []*Node var result []*Node
for i, id := range tn.dists[toaddr.Port] { for i, id := range tn.dists[toaddr.Port] {

@ -52,8 +52,6 @@ const (
respTimeout = 500 * time.Millisecond respTimeout = 500 * time.Millisecond
sendTimeout = 500 * time.Millisecond sendTimeout = 500 * time.Millisecond
expiration = 20 * time.Second expiration = 20 * time.Second
refreshInterval = 1 * time.Hour
) )
// RPC packet types // RPC packet types
@ -312,10 +310,8 @@ func (t *udp) loop() {
plist = list.New() plist = list.New()
timeout = time.NewTimer(0) timeout = time.NewTimer(0)
nextTimeout *pending // head of plist when timeout was last reset nextTimeout *pending // head of plist when timeout was last reset
refresh = time.NewTicker(refreshInterval)
) )
<-timeout.C // ignore first timeout <-timeout.C // ignore first timeout
defer refresh.Stop()
defer timeout.Stop() defer timeout.Stop()
resetTimeout := func() { resetTimeout := func() {
@ -344,9 +340,6 @@ func (t *udp) loop() {
resetTimeout() resetTimeout()
select { select {
case <-refresh.C:
go t.refresh()
case <-t.closing: case <-t.closing:
for el := plist.Front(); el != nil; el = el.Next() { for el := plist.Front(); el != nil; el = el.Next() {
el.Value.(*pending).errc <- errClosed el.Value.(*pending).errc <- errClosed