diff --git a/cmd/devp2p/crawl.go b/cmd/devp2p/crawl.go index 0d8127684..8c9755ac1 100644 --- a/cmd/devp2p/crawl.go +++ b/cmd/devp2p/crawl.go @@ -17,6 +17,8 @@ package main import ( + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" @@ -34,6 +36,7 @@ type crawler struct { // settings revalidateInterval time.Duration + mu sync.RWMutex } const ( @@ -67,7 +70,7 @@ func newCrawler(input nodeSet, disc resolver, iters ...enode.Iterator) *crawler return c } -func (c *crawler) run(timeout time.Duration) nodeSet { +func (c *crawler) run(timeout time.Duration, nthreads int) nodeSet { var ( timeoutTimer = time.NewTimer(timeout) timeoutCh <-chan time.Time @@ -75,35 +78,51 @@ func (c *crawler) run(timeout time.Duration) nodeSet { doneCh = make(chan enode.Iterator, len(c.iters)) liveIters = len(c.iters) ) + if nthreads < 1 { + nthreads = 1 + } defer timeoutTimer.Stop() defer statusTicker.Stop() for _, it := range c.iters { go c.runIterator(doneCh, it) } - var ( - added int - updated int - skipped int - recent int - removed int + added uint64 + updated uint64 + skipped uint64 + recent uint64 + removed uint64 + wg sync.WaitGroup ) + wg.Add(nthreads) + for i := 0; i < nthreads; i++ { + go func() { + defer wg.Done() + for { + select { + case n := <-c.ch: + switch c.updateNode(n) { + case nodeSkipIncompat: + atomic.AddUint64(&skipped, 1) + case nodeSkipRecent: + atomic.AddUint64(&recent, 1) + case nodeRemoved: + atomic.AddUint64(&removed, 1) + case nodeAdded: + atomic.AddUint64(&added, 1) + default: + atomic.AddUint64(&updated, 1) + } + case <-c.closed: + return + } + } + }() + } + loop: for { select { - case n := <-c.ch: - switch c.updateNode(n) { - case nodeSkipIncompat: - skipped++ - case nodeSkipRecent: - recent++ - case nodeRemoved: - removed++ - case nodeAdded: - added++ - default: - updated++ - } case it := <-doneCh: if it == c.inputIter { // Enable timeout when we're done revalidating the input nodes. @@ -119,8 +138,11 @@ loop: break loop case <-statusTicker.C: log.Info("Crawling in progress", - "added", added, "updated", updated, "removed", removed, - "ignored(recent)", recent, "ignored(incompatible)", skipped) + "added", atomic.LoadUint64(&added), + "updated", atomic.LoadUint64(&updated), + "removed", atomic.LoadUint64(&removed), + "ignored(recent)", atomic.LoadUint64(&removed), + "ignored(incompatible)", atomic.LoadUint64(&skipped)) } } @@ -131,6 +153,7 @@ loop: for ; liveIters > 0; liveIters-- { <-doneCh } + wg.Wait() return c.output } @@ -148,7 +171,9 @@ func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) { // updateNode updates the info about the given node, and returns a status // about what changed func (c *crawler) updateNode(n *enode.Node) int { + c.mu.RLock() node, ok := c.output[n.ID()] + c.mu.RUnlock() // Skip validation of recently-seen nodes. if ok && time.Since(node.LastCheck) < c.revalidateInterval { @@ -156,10 +181,9 @@ func (c *crawler) updateNode(n *enode.Node) int { } // Request the node record. - nn, err := c.disc.RequestENR(n) - node.LastCheck = truncNow() status := nodeUpdated - if err != nil { + node.LastCheck = truncNow() + if nn, err := c.disc.RequestENR(n); err != nil { if node.Score == 0 { // Node doesn't implement EIP-868. log.Debug("Skipping node", "id", n.ID()) @@ -176,8 +200,9 @@ func (c *crawler) updateNode(n *enode.Node) int { } node.LastResponse = node.LastCheck } - // Store/update node in output set. + c.mu.Lock() + defer c.mu.Unlock() if node.Score <= 0 { log.Debug("Removing node", "id", n.ID()) delete(c.output, n.ID()) diff --git a/cmd/devp2p/discv4cmd.go b/cmd/devp2p/discv4cmd.go index 94e61c36f..631226347 100644 --- a/cmd/devp2p/discv4cmd.go +++ b/cmd/devp2p/discv4cmd.go @@ -78,7 +78,7 @@ var ( Name: "crawl", Usage: "Updates a nodes.json file with random nodes found in the DHT", Action: discv4Crawl, - Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag}), + Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag}), } discv4TestCommand = &cli.Command{ Name: "test", @@ -120,6 +120,11 @@ var ( Usage: "Time limit for the crawl.", Value: 30 * time.Minute, } + crawlParallelismFlag = &cli.IntFlag{ + Name: "parallel", + Usage: "How many parallel discoveries to attempt.", + Value: 16, + } remoteEnodeFlag = &cli.StringFlag{ Name: "remote", Usage: "Enode of the remote node under test", @@ -195,7 +200,7 @@ func discv4ResolveJSON(ctx *cli.Context) error { defer disc.Close() c := newCrawler(inputSet, disc, enode.IterNodes(nodeargs)) c.revalidateInterval = 0 - output := c.run(0) + output := c.run(0, 1) writeNodesJSON(nodesFile, output) return nil } @@ -214,7 +219,7 @@ func discv4Crawl(ctx *cli.Context) error { defer disc.Close() c := newCrawler(inputSet, disc, disc.RandomNodes()) c.revalidateInterval = 10 * time.Minute - output := c.run(ctx.Duration(crawlTimeoutFlag.Name)) + output := c.run(ctx.Duration(crawlTimeoutFlag.Name), ctx.Int(crawlParallelismFlag.Name)) writeNodesJSON(nodesFile, output) return nil } diff --git a/cmd/devp2p/discv5cmd.go b/cmd/devp2p/discv5cmd.go index 343e2a0d5..832a4bc1f 100644 --- a/cmd/devp2p/discv5cmd.go +++ b/cmd/devp2p/discv5cmd.go @@ -110,7 +110,7 @@ func discv5Crawl(ctx *cli.Context) error { defer disc.Close() c := newCrawler(inputSet, disc, disc.RandomNodes()) c.revalidateInterval = 10 * time.Minute - output := c.run(ctx.Duration(crawlTimeoutFlag.Name)) + output := c.run(ctx.Duration(crawlTimeoutFlag.Name), ctx.Int(crawlParallelismFlag.Name)) writeNodesJSON(nodesFile, output) return nil } diff --git a/cmd/devp2p/dns_cloudflare.go b/cmd/devp2p/dns_cloudflare.go index 92c6faf27..bfe92257e 100644 --- a/cmd/devp2p/dns_cloudflare.go +++ b/cmd/devp2p/dns_cloudflare.go @@ -126,11 +126,16 @@ func (c *cloudflareClient) uploadRecords(name string, records map[string]string) } // Iterate over the new records and inject anything missing. + log.Info("Updating DNS entries") + created := 0 + updated := 0 + skipped := 0 for path, val := range records { old, exists := existing[path] if !exists { // Entry is unknown, push a new one to Cloudflare. - log.Info(fmt.Sprintf("Creating %s = %q", path, val)) + log.Debug(fmt.Sprintf("Creating %s = %q", path, val)) + created++ ttl := rootTTL if path != name { ttl = treeNodeTTLCloudflare // Max TTL permitted by Cloudflare @@ -139,27 +144,33 @@ func (c *cloudflareClient) uploadRecords(name string, records map[string]string) _, err = c.CreateDNSRecord(context.Background(), c.zoneID, record) } else if old.Content != val { // Entry already exists, only change its content. - log.Info(fmt.Sprintf("Updating %s from %q to %q", path, old.Content, val)) + log.Debug(fmt.Sprintf("Updating %s from %q to %q", path, old.Content, val)) + updated++ old.Content = val err = c.UpdateDNSRecord(context.Background(), c.zoneID, old.ID, old) } else { + skipped++ log.Debug(fmt.Sprintf("Skipping %s = %q", path, val)) } if err != nil { return fmt.Errorf("failed to publish %s: %v", path, err) } } - + log.Info("Updated DNS entries", "new", created, "updated", updated, "untouched", skipped) // Iterate over the old records and delete anything stale. + deleted := 0 + log.Info("Deleting stale DNS entries") for path, entry := range existing { if _, ok := records[path]; ok { continue } // Stale entry, nuke it. - log.Info(fmt.Sprintf("Deleting %s = %q", path, entry.Content)) + log.Debug(fmt.Sprintf("Deleting %s = %q", path, entry.Content)) + deleted++ if err := c.DeleteDNSRecord(context.Background(), c.zoneID, entry.ID); err != nil { return fmt.Errorf("failed to delete %s: %v", path, err) } } + log.Info("Deleted stale DNS entries", "count", deleted) return nil } diff --git a/cmd/devp2p/dns_route53.go b/cmd/devp2p/dns_route53.go index 4aab0856f..81734eb2a 100644 --- a/cmd/devp2p/dns_route53.go +++ b/cmd/devp2p/dns_route53.go @@ -329,8 +329,9 @@ func (c *route53Client) collectRecords(name string) (map[string]recordSet, error var req route53.ListResourceRecordSetsInput req.HostedZoneId = &c.zoneID existing := make(map[string]recordSet) + log.Info("Loading existing TXT records", "name", name, "zone", c.zoneID) for page := 0; ; page++ { - log.Info("Loading existing TXT records", "name", name, "zone", c.zoneID, "page", page) + log.Debug("Loading existing TXT records", "name", name, "zone", c.zoneID, "page", page) resp, err := c.api.ListResourceRecordSets(context.TODO(), &req) if err != nil { return existing, err @@ -360,7 +361,7 @@ func (c *route53Client) collectRecords(name string) (map[string]recordSet, error req.StartRecordName = resp.NextRecordName req.StartRecordType = resp.NextRecordType } - + log.Info("Loaded existing TXT records", "name", name, "zone", c.zoneID, "records", len(existing)) return existing, nil }