cmd/swarm, metrics, swarm/api/client, swarm/storage, swarm/metrics, swarm/api/http: add instrumentation (#18274)

This commit is contained in:
Elad 2018-12-11 13:51:58 +05:30 committed by Anton Evangelatov
parent b2aac658b0
commit bb724080ca
8 changed files with 368 additions and 82 deletions

@ -2,11 +2,13 @@ package main
import ( import (
"bytes" "bytes"
"context"
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptrace"
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
@ -16,8 +18,13 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api/client"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable" colorable "github.com/mattn/go-colorable"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pborman/uuid" "github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1" cli "gopkg.in/urfave/cli.v1"
) )
@ -26,11 +33,29 @@ const (
feedRandomDataLength = 8 feedRandomDataLength = 8
) )
// TODO: retrieve with manifest + extract repeating code
func cliFeedUploadAndSync(c *cli.Context) error { func cliFeedUploadAndSync(c *cli.Context) error {
metrics.GetOrRegisterCounter("feed-and-sync", nil).Inc(1)
log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))) log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))))
errc := make(chan error)
go func() {
errc <- feedUploadAndSync(c)
}()
select {
case err := <-errc:
if err != nil {
metrics.GetOrRegisterCounter("feed-and-sync.fail", nil).Inc(1)
}
return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter("feed-and-sync.timeout", nil).Inc(1)
return fmt.Errorf("timeout after %v sec", timeout)
}
}
// TODO: retrieve with manifest + extract repeating code
func feedUploadAndSync(c *cli.Context) error {
defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now()) defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now())
generateEndpoints(scheme, cluster, appName, from, to) generateEndpoints(scheme, cluster, appName, from, to)
@ -204,12 +229,12 @@ func cliFeedUploadAndSync(c *cli.Context) error {
log.Info("all endpoints synced random data successfully") log.Info("all endpoints synced random data successfully")
// upload test file // upload test file
log.Info("uploading to " + endpoints[0] + " and syncing") seed := int(time.Now().UnixNano() / 1e6)
log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed)
f, cleanup := generateRandomFile(filesize * 1000) randomBytes := testutil.RandomBytes(seed, filesize*1000)
defer cleanup()
hash, err := upload(f, endpoints[0]) hash, err := upload(&randomBytes, endpoints[0])
if err != nil { if err != nil {
return err return err
} }
@ -218,7 +243,7 @@ func cliFeedUploadAndSync(c *cli.Context) error {
return err return err
} }
multihashHex := hexutil.Encode(hashBytes) multihashHex := hexutil.Encode(hashBytes)
fileHash, err := digest(f) fileHash, err := digest(bytes.NewReader(randomBytes))
if err != nil { if err != nil {
return err return err
} }
@ -284,14 +309,37 @@ func cliFeedUploadAndSync(c *cli.Context) error {
} }
func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error { func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error {
ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch")
defer sp.Finish()
log.Trace("sleeping", "ruid", ruid) log.Trace("sleeping", "ruid", ruid)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user)
res, err := http.Get(endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user)
var tn time.Time
reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user
req, _ := http.NewRequest("GET", reqUri, nil)
opentracing.GlobalTracer().Inject(
sp.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header))
trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn)
req = req.WithContext(httptrace.WithClientTrace(ctx, trace))
transport := http.DefaultTransport
//transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
tn = time.Now()
res, err := transport.RoundTrip(req)
if err != nil { if err != nil {
log.Error(err.Error(), "ruid", ruid)
return err return err
} }
log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength) log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength)
if res.StatusCode != 200 { if res.StatusCode != 200 {

@ -17,14 +17,25 @@
package main package main
import ( import (
"fmt"
"os" "os"
"sort" "sort"
"github.com/ethereum/go-ethereum/cmd/utils"
gethmetrics "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics/influxdb"
swarmmetrics "github.com/ethereum/go-ethereum/swarm/metrics"
"github.com/ethereum/go-ethereum/swarm/tracing"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
cli "gopkg.in/urfave/cli.v1" cli "gopkg.in/urfave/cli.v1"
) )
var (
gitCommit string // Git SHA1 commit hash of the release (set via linker flags)
)
var ( var (
endpoints []string endpoints []string
includeLocalhost bool includeLocalhost bool
@ -32,9 +43,12 @@ var (
appName string appName string
scheme string scheme string
filesize int filesize int
syncDelay int
from int from int
to int to int
verbosity int verbosity int
timeout int
single bool
) )
func main() { func main() {
@ -85,14 +99,42 @@ func main() {
Usage: "file size for generated random file in KB", Usage: "file size for generated random file in KB",
Destination: &filesize, Destination: &filesize,
}, },
cli.IntFlag{
Name: "sync-delay",
Value: 5,
Usage: "duration of delay in seconds to wait for content to be synced",
Destination: &syncDelay,
},
cli.IntFlag{ cli.IntFlag{
Name: "verbosity", Name: "verbosity",
Value: 1, Value: 1,
Usage: "verbosity", Usage: "verbosity",
Destination: &verbosity, Destination: &verbosity,
}, },
cli.IntFlag{
Name: "timeout",
Value: 120,
Usage: "timeout in seconds after which kill the process",
Destination: &timeout,
},
cli.BoolFlag{
Name: "single",
Usage: "whether to fetch content from a single node or from all nodes",
Destination: &single,
},
} }
app.Flags = append(app.Flags, []cli.Flag{
utils.MetricsEnabledFlag,
swarmmetrics.MetricsInfluxDBEndpointFlag,
swarmmetrics.MetricsInfluxDBDatabaseFlag,
swarmmetrics.MetricsInfluxDBUsernameFlag,
swarmmetrics.MetricsInfluxDBPasswordFlag,
swarmmetrics.MetricsInfluxDBHostTagFlag,
}...)
app.Flags = append(app.Flags, tracing.Flags...)
app.Commands = []cli.Command{ app.Commands = []cli.Command{
{ {
Name: "upload_and_sync", Name: "upload_and_sync",
@ -111,9 +153,38 @@ func main() {
sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.FlagsByName(app.Flags))
sort.Sort(cli.CommandsByName(app.Commands)) sort.Sort(cli.CommandsByName(app.Commands))
app.Before = func(ctx *cli.Context) error {
tracing.Setup(ctx)
return nil
}
app.After = func(ctx *cli.Context) error {
return emitMetrics(ctx)
}
err := app.Run(os.Args) err := app.Run(os.Args)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
os.Exit(1) os.Exit(1)
} }
} }
func emitMetrics(ctx *cli.Context) error {
if gethmetrics.Enabled {
var (
endpoint = ctx.GlobalString(swarmmetrics.MetricsInfluxDBEndpointFlag.Name)
database = ctx.GlobalString(swarmmetrics.MetricsInfluxDBDatabaseFlag.Name)
username = ctx.GlobalString(swarmmetrics.MetricsInfluxDBUsernameFlag.Name)
password = ctx.GlobalString(swarmmetrics.MetricsInfluxDBPasswordFlag.Name)
hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name)
)
return influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{
"host": hosttag,
"version": gitCommit,
"filesize": fmt.Sprintf("%v", filesize),
})
}
return nil
}

@ -18,21 +18,27 @@ package main
import ( import (
"bytes" "bytes"
"context"
"crypto/md5" "crypto/md5"
crand "crypto/rand" crand "crypto/rand"
"crypto/tls"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand"
"net/http" "net/http"
"net/http/httptrace"
"os" "os"
"os/exec"
"strings"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
"github.com/ethereum/go-ethereum/swarm/api/client"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/testutil"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pborman/uuid" "github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1" cli "gopkg.in/urfave/cli.v1"
@ -40,11 +46,11 @@ import (
func generateEndpoints(scheme string, cluster string, app string, from int, to int) { func generateEndpoints(scheme string, cluster string, app string, from int, to int) {
if cluster == "prod" { if cluster == "prod" {
for port := from; port <= to; port++ { for port := from; port < to; port++ {
endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port))
} }
} else { } else {
for port := from; port <= to; port++ { for port := from; port < to; port++ {
endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster))
} }
} }
@ -58,22 +64,48 @@ func cliUploadAndSync(c *cli.Context) error {
log.PrintOrigins(true) log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true))))
defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "kb", filesize) }(time.Now()) metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1)
errc := make(chan error)
go func() {
errc <- uploadAndSync(c)
}()
select {
case err := <-errc:
if err != nil {
metrics.GetOrRegisterCounter("upload-and-sync.fail", nil).Inc(1)
}
return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter("upload-and-sync.timeout", nil).Inc(1)
return fmt.Errorf("timeout after %v sec", timeout)
}
}
func uploadAndSync(c *cli.Context) error {
defer func(now time.Time) {
totalTime := time.Since(now)
log.Info("total time", "time", totalTime, "kb", filesize)
metrics.GetOrRegisterCounter("upload-and-sync.total-time", nil).Inc(int64(totalTime))
}(time.Now())
generateEndpoints(scheme, cluster, appName, from, to) generateEndpoints(scheme, cluster, appName, from, to)
seed := int(time.Now().UnixNano() / 1e6)
log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed)
log.Info("uploading to " + endpoints[0] + " and syncing") randomBytes := testutil.RandomBytes(seed, filesize*1000)
f, cleanup := generateRandomFile(filesize * 1000) t1 := time.Now()
defer cleanup() hash, err := upload(&randomBytes, endpoints[0])
hash, err := upload(f, endpoints[0])
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
return err return err
} }
metrics.GetOrRegisterCounter("upload-and-sync.upload-time", nil).Inc(int64(time.Since(t1)))
fhash, err := digest(f) fhash, err := digest(bytes.NewReader(randomBytes))
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
return err return err
@ -81,24 +113,48 @@ func cliUploadAndSync(c *cli.Context) error {
log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash))
time.Sleep(3 * time.Second) time.Sleep(time.Duration(syncDelay) * time.Second)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
if single {
rand.Seed(time.Now().UTC().UnixNano())
randIndex := 1 + rand.Intn(len(endpoints)-1)
ruid := uuid.New()[:8]
wg.Add(1)
go func(endpoint string, ruid string) {
for {
start := time.Now()
err := fetch(hash, endpoint, fhash, ruid)
fetchTime := time.Since(start)
if err != nil {
continue
}
metrics.GetOrRegisterMeter("upload-and-sync.single.fetch-time", nil).Mark(int64(fetchTime))
wg.Done()
return
}
}(endpoints[randIndex], ruid)
} else {
for _, endpoint := range endpoints { for _, endpoint := range endpoints {
ruid := uuid.New()[:8] ruid := uuid.New()[:8]
wg.Add(1) wg.Add(1)
go func(endpoint string, ruid string) { go func(endpoint string, ruid string) {
for { for {
start := time.Now()
err := fetch(hash, endpoint, fhash, ruid) err := fetch(hash, endpoint, fhash, ruid)
fetchTime := time.Since(start)
if err != nil { if err != nil {
continue continue
} }
metrics.GetOrRegisterMeter("upload-and-sync.each.fetch-time", nil).Mark(int64(fetchTime))
wg.Done() wg.Done()
return return
} }
}(endpoint, ruid) }(endpoint, ruid)
} }
}
wg.Wait() wg.Wait()
log.Info("all endpoints synced random file successfully") log.Info("all endpoints synced random file successfully")
@ -107,16 +163,33 @@ func cliUploadAndSync(c *cli.Context) error {
// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file // fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file
func fetch(hash string, endpoint string, original []byte, ruid string) error { func fetch(hash string, endpoint string, original []byte, ruid string) error {
ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch")
defer sp.Finish()
log.Trace("sleeping", "ruid", ruid) log.Trace("sleeping", "ruid", ruid)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash)
client := &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, var tn time.Time
}} reqUri := endpoint + "/bzz:/" + hash + "/"
res, err := client.Get(endpoint + "/bzz:/" + hash + "/") req, _ := http.NewRequest("GET", reqUri, nil)
opentracing.GlobalTracer().Inject(
sp.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header))
trace := client.GetClientTrace("upload-and-sync - http get", "upload-and-sync", ruid, &tn)
req = req.WithContext(httptrace.WithClientTrace(ctx, trace))
transport := http.DefaultTransport
//transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
tn = time.Now()
res, err := transport.RoundTrip(req)
if err != nil { if err != nil {
log.Warn(err.Error(), "ruid", ruid) log.Error(err.Error(), "ruid", ruid)
return err return err
} }
log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength)
@ -147,16 +220,19 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error {
} }
// upload is uploading a file `f` to `endpoint` via the `swarm up` cmd // upload is uploading a file `f` to `endpoint` via the `swarm up` cmd
func upload(f *os.File, endpoint string) (string, error) { func upload(dataBytes *[]byte, endpoint string) (string, error) {
var out bytes.Buffer swarm := client.NewClient(endpoint)
cmd := exec.Command("swarm", "--bzzapi", endpoint, "up", f.Name()) f := &client.File{
cmd.Stdout = &out ReadCloser: ioutil.NopCloser(bytes.NewReader(*dataBytes)),
err := cmd.Run() ManifestEntry: api.ManifestEntry{
if err != nil { ContentType: "text/plain",
return "", err Mode: 0660,
Size: int64(len(*dataBytes)),
},
} }
hash := strings.TrimRight(out.String(), "\r\n")
return hash, nil // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded.
return swarm.Upload(f, "", false)
} }
func digest(r io.Reader) ([]byte, error) { func digest(r io.Reader) ([]byte, error) {
@ -179,27 +255,3 @@ func generateRandomData(datasize int) ([]byte, error) {
} }
return b, nil return b, nil
} }
// generateRandomFile is creating a temporary file with the requested byte size
func generateRandomFile(size int) (f *os.File, teardown func()) {
// create a tmp file
tmp, err := ioutil.TempFile("", "swarm-test")
if err != nil {
panic(err)
}
// callback for tmp file cleanup
teardown = func() {
tmp.Close()
os.Remove(tmp.Name())
}
buf := make([]byte, size)
_, err = crand.Read(buf)
if err != nil {
panic(err)
}
ioutil.WriteFile(tmp.Name(), buf, 0755)
return tmp, teardown
}

@ -58,6 +58,34 @@ func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, userna
rep.run() rep.run()
} }
// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
u, err := uurl.Parse(url)
if err != nil {
return fmt.Errorf("Unable to parse InfluxDB. url: %s, err: %v", url, err)
}
rep := &reporter{
reg: r,
url: *u,
database: database,
username: username,
password: password,
namespace: namespace,
tags: tags,
cache: make(map[string]int64),
}
if err := rep.makeClient(); err != nil {
return fmt.Errorf("Unable to make InfluxDB client. err: %v", err)
}
if err := rep.send(); err != nil {
return fmt.Errorf("Unable to send to InfluxDB. err: %v", err)
}
return nil
}
func (r *reporter) makeClient() (err error) { func (r *reporter) makeClient() (err error) {
r.client, err = client.NewClient(client.Config{ r.client, err = client.NewClient(client.Config{
URL: r.url, URL: r.url,

@ -19,6 +19,7 @@ package client
import ( import (
"archive/tar" "archive/tar"
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -26,6 +27,7 @@ import (
"io/ioutil" "io/ioutil"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"net/http/httptrace"
"net/textproto" "net/textproto"
"net/url" "net/url"
"os" "os"
@ -33,9 +35,14 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/api"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/pborman/uuid"
) )
var ( var (
@ -474,6 +481,11 @@ type UploadFn func(file *File) error
// TarUpload uses the given Uploader to upload files to swarm as a tar stream, // TarUpload uses the given Uploader to upload files to swarm as a tar stream,
// returning the resulting manifest hash // returning the resulting manifest hash
func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool) (string, error) { func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool) (string, error) {
ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload")
defer sp.Finish()
var tn time.Time
reqR, reqW := io.Pipe() reqR, reqW := io.Pipe()
defer reqR.Close() defer reqR.Close()
addr := hash addr := hash
@ -489,6 +501,12 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
if err != nil { if err != nil {
return "", err return "", err
} }
trace := GetClientTrace("swarm api client - upload tar", "api.client.uploadtar", uuid.New()[:8], &tn)
req = req.WithContext(httptrace.WithClientTrace(ctx, trace))
transport := http.DefaultTransport
req.Header.Set("Content-Type", "application/x-tar") req.Header.Set("Content-Type", "application/x-tar")
if defaultPath != "" { if defaultPath != "" {
q := req.URL.Query() q := req.URL.Query()
@ -529,8 +547,8 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
} }
reqW.CloseWithError(err) reqW.CloseWithError(err)
}() }()
tn = time.Now()
res, err := http.DefaultClient.Do(req) res, err := transport.RoundTrip(req)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -728,3 +746,57 @@ func (c *Client) GetFeedRequest(query *feed.Query, manifestAddressOrDomain strin
} }
return &metadata, nil return &metadata, nil
} }
func GetClientTrace(traceMsg, metricPrefix, ruid string, tn *time.Time) *httptrace.ClientTrace {
trace := &httptrace.ClientTrace{
GetConn: func(_ string) {
log.Trace(traceMsg+" - http get", "event", "GetConn", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".getconn", nil).Update(time.Since(*tn))
},
GotConn: func(_ httptrace.GotConnInfo) {
log.Trace(traceMsg+" - http get", "event", "GotConn", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".gotconn", nil).Update(time.Since(*tn))
},
PutIdleConn: func(err error) {
log.Trace(traceMsg+" - http get", "event", "PutIdleConn", "ruid", ruid, "err", err)
metrics.GetOrRegisterResettingTimer(metricPrefix+".putidle", nil).Update(time.Since(*tn))
},
GotFirstResponseByte: func() {
log.Trace(traceMsg+" - http get", "event", "GotFirstResponseByte", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".firstbyte", nil).Update(time.Since(*tn))
},
Got100Continue: func() {
log.Trace(traceMsg, "event", "Got100Continue", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".got100continue", nil).Update(time.Since(*tn))
},
DNSStart: func(_ httptrace.DNSStartInfo) {
log.Trace(traceMsg, "event", "DNSStart", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsstart", nil).Update(time.Since(*tn))
},
DNSDone: func(_ httptrace.DNSDoneInfo) {
log.Trace(traceMsg, "event", "DNSDone", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsdone", nil).Update(time.Since(*tn))
},
ConnectStart: func(network, addr string) {
log.Trace(traceMsg, "event", "ConnectStart", "ruid", ruid, "network", network, "addr", addr)
metrics.GetOrRegisterResettingTimer(metricPrefix+".connectstart", nil).Update(time.Since(*tn))
},
ConnectDone: func(network, addr string, err error) {
log.Trace(traceMsg, "event", "ConnectDone", "ruid", ruid, "network", network, "addr", addr, "err", err)
metrics.GetOrRegisterResettingTimer(metricPrefix+".connectdone", nil).Update(time.Since(*tn))
},
WroteHeaders: func() {
log.Trace(traceMsg, "event", "WroteHeaders(request)", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".wroteheaders", nil).Update(time.Since(*tn))
},
Wait100Continue: func() {
log.Trace(traceMsg, "event", "Wait100Continue", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".wait100continue", nil).Update(time.Since(*tn))
},
WroteRequest: func(_ httptrace.WroteRequestInfo) {
log.Trace(traceMsg, "event", "WroteRequest", "ruid", ruid)
metrics.GetOrRegisterResettingTimer(metricPrefix+".wroterequest", nil).Update(time.Since(*tn))
},
}
return trace
}

@ -74,13 +74,15 @@ func ParseURI(h http.Handler) http.Handler {
func InitLoggingResponseWriter(h http.Handler) http.Handler { func InitLoggingResponseWriter(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
startTime := time.Now() tn := time.Now()
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).UpdateSince(startTime)
writer := newLoggingResponseWriter(w) writer := newLoggingResponseWriter(w)
h.ServeHTTP(writer, r) h.ServeHTTP(writer, r)
log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode)
metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).UpdateSince(startTime) ts := time.Since(tn)
log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode, "time", ts*time.Millisecond)
metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).Update(ts)
metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).Update(ts)
}) })
} }
@ -93,6 +95,7 @@ func InstrumentOpenTracing(h http.Handler) http.Handler {
} }
spanName := fmt.Sprintf("http.%s.%s", r.Method, uri.Scheme) spanName := fmt.Sprintf("http.%s.%s", r.Method, uri.Scheme)
ctx, sp := spancontext.StartSpan(r.Context(), spanName) ctx, sp := spancontext.StartSpan(r.Context(), spanName)
defer sp.Finish() defer sp.Finish()
h.ServeHTTP(w, r.WithContext(ctx)) h.ServeHTTP(w, r.WithContext(ctx))
}) })

@ -27,26 +27,26 @@ import (
) )
var ( var (
metricsEnableInfluxDBExportFlag = cli.BoolFlag{ MetricsEnableInfluxDBExportFlag = cli.BoolFlag{
Name: "metrics.influxdb.export", Name: "metrics.influxdb.export",
Usage: "Enable metrics export/push to an external InfluxDB database", Usage: "Enable metrics export/push to an external InfluxDB database",
} }
metricsInfluxDBEndpointFlag = cli.StringFlag{ MetricsInfluxDBEndpointFlag = cli.StringFlag{
Name: "metrics.influxdb.endpoint", Name: "metrics.influxdb.endpoint",
Usage: "Metrics InfluxDB endpoint", Usage: "Metrics InfluxDB endpoint",
Value: "http://127.0.0.1:8086", Value: "http://127.0.0.1:8086",
} }
metricsInfluxDBDatabaseFlag = cli.StringFlag{ MetricsInfluxDBDatabaseFlag = cli.StringFlag{
Name: "metrics.influxdb.database", Name: "metrics.influxdb.database",
Usage: "Metrics InfluxDB database", Usage: "Metrics InfluxDB database",
Value: "metrics", Value: "metrics",
} }
metricsInfluxDBUsernameFlag = cli.StringFlag{ MetricsInfluxDBUsernameFlag = cli.StringFlag{
Name: "metrics.influxdb.username", Name: "metrics.influxdb.username",
Usage: "Metrics InfluxDB username", Usage: "Metrics InfluxDB username",
Value: "", Value: "",
} }
metricsInfluxDBPasswordFlag = cli.StringFlag{ MetricsInfluxDBPasswordFlag = cli.StringFlag{
Name: "metrics.influxdb.password", Name: "metrics.influxdb.password",
Usage: "Metrics InfluxDB password", Usage: "Metrics InfluxDB password",
Value: "", Value: "",
@ -55,7 +55,7 @@ var (
// It is used so that we can group all nodes and average a measurement across all of them, but also so // It is used so that we can group all nodes and average a measurement across all of them, but also so
// that we can select a specific node and inspect its measurements. // that we can select a specific node and inspect its measurements.
// https://docs.influxdata.com/influxdb/v1.4/concepts/key_concepts/#tag-key // https://docs.influxdata.com/influxdb/v1.4/concepts/key_concepts/#tag-key
metricsInfluxDBHostTagFlag = cli.StringFlag{ MetricsInfluxDBHostTagFlag = cli.StringFlag{
Name: "metrics.influxdb.host.tag", Name: "metrics.influxdb.host.tag",
Usage: "Metrics InfluxDB `host` tag attached to all measurements", Usage: "Metrics InfluxDB `host` tag attached to all measurements",
Value: "localhost", Value: "localhost",
@ -65,20 +65,24 @@ var (
// Flags holds all command-line flags required for metrics collection. // Flags holds all command-line flags required for metrics collection.
var Flags = []cli.Flag{ var Flags = []cli.Flag{
utils.MetricsEnabledFlag, utils.MetricsEnabledFlag,
metricsEnableInfluxDBExportFlag, MetricsEnableInfluxDBExportFlag,
metricsInfluxDBEndpointFlag, metricsInfluxDBDatabaseFlag, metricsInfluxDBUsernameFlag, metricsInfluxDBPasswordFlag, metricsInfluxDBHostTagFlag, MetricsInfluxDBEndpointFlag,
MetricsInfluxDBDatabaseFlag,
MetricsInfluxDBUsernameFlag,
MetricsInfluxDBPasswordFlag,
MetricsInfluxDBHostTagFlag,
} }
func Setup(ctx *cli.Context) { func Setup(ctx *cli.Context) {
if gethmetrics.Enabled { if gethmetrics.Enabled {
log.Info("Enabling swarm metrics collection") log.Info("Enabling swarm metrics collection")
var ( var (
enableExport = ctx.GlobalBool(metricsEnableInfluxDBExportFlag.Name) enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name)
endpoint = ctx.GlobalString(metricsInfluxDBEndpointFlag.Name) endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name)
database = ctx.GlobalString(metricsInfluxDBDatabaseFlag.Name) database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name)
username = ctx.GlobalString(metricsInfluxDBUsernameFlag.Name) username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name)
password = ctx.GlobalString(metricsInfluxDBPasswordFlag.Name) password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name)
hosttag = ctx.GlobalString(metricsInfluxDBHostTagFlag.Name) hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name)
) )
// Start system runtime metrics collection // Start system runtime metrics collection

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"io" "io"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
ch "github.com/ethereum/go-ethereum/swarm/chunk" ch "github.com/ethereum/go-ethereum/swarm/chunk"
@ -410,10 +411,14 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e
log.Debug("lazychunkreader.size", "addr", r.addr) log.Debug("lazychunkreader.size", "addr", r.addr)
if r.chunkData == nil { if r.chunkData == nil {
startTime := time.Now()
chunkData, err := r.getter.Get(cctx, Reference(r.addr)) chunkData, err := r.getter.Get(cctx, Reference(r.addr))
if err != nil { if err != nil {
metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
return 0, err return 0, err
} }
metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime)
r.chunkData = chunkData r.chunkData = chunkData
s := r.chunkData.Size() s := r.chunkData.Size()
log.Debug("lazychunkreader.size", "key", r.addr, "size", s) log.Debug("lazychunkreader.size", "key", r.addr, "size", s)
@ -542,8 +547,10 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
wg.Add(1) wg.Add(1)
go func(j int64) { go func(j int64) {
childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
startTime := time.Now()
chunkData, err := r.getter.Get(r.ctx, Reference(childAddress)) chunkData, err := r.getter.Get(r.ctx, Reference(childAddress))
if err != nil { if err != nil {
metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
select { select {
case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)): case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)):
@ -551,6 +558,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
} }
return return
} }
metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime)
if l := len(chunkData); l < 9 { if l := len(chunkData); l < 9 {
select { select {
case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l): case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l):