From bb724080cac9fa36ec6b638cfd5cf0e54bc23362 Mon Sep 17 00:00:00 2001 From: Elad Date: Tue, 11 Dec 2018 13:51:58 +0530 Subject: [PATCH] cmd/swarm, metrics, swarm/api/client, swarm/storage, swarm/metrics, swarm/api/http: add instrumentation (#18274) --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 64 ++++++- cmd/swarm/swarm-smoke/main.go | 71 ++++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 160 ++++++++++++------ metrics/influxdb/influxdb.go | 28 +++ swarm/api/client/client.go | 76 ++++++++- swarm/api/http/middleware.go | 11 +- swarm/metrics/flags.go | 32 ++-- swarm/storage/chunker.go | 8 + 8 files changed, 368 insertions(+), 82 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 7ec152826..2c5e3fd23 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -2,11 +2,13 @@ package main import ( "bytes" + "context" "crypto/md5" "fmt" "io" "io/ioutil" "net/http" + "net/http/httptrace" "os" "os/exec" "strings" @@ -16,8 +18,13 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/api/client" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" + "github.com/ethereum/go-ethereum/swarm/testutil" colorable "github.com/mattn/go-colorable" + opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -26,11 +33,29 @@ const ( feedRandomDataLength = 8 ) -// TODO: retrieve with manifest + extract repeating code func cliFeedUploadAndSync(c *cli.Context) error { - + metrics.GetOrRegisterCounter("feed-and-sync", nil).Inc(1) log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))) + errc := make(chan error) + go func() { + errc <- feedUploadAndSync(c) + }() + + select { + case err := <-errc: + if err != nil { + metrics.GetOrRegisterCounter("feed-and-sync.fail", nil).Inc(1) + } + return err + case <-time.After(time.Duration(timeout) * time.Second): + metrics.GetOrRegisterCounter("feed-and-sync.timeout", nil).Inc(1) + return fmt.Errorf("timeout after %v sec", timeout) + } +} + +// TODO: retrieve with manifest + extract repeating code +func feedUploadAndSync(c *cli.Context) error { defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) @@ -204,12 +229,12 @@ func cliFeedUploadAndSync(c *cli.Context) error { log.Info("all endpoints synced random data successfully") // upload test file - log.Info("uploading to " + endpoints[0] + " and syncing") + seed := int(time.Now().UnixNano() / 1e6) + log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed) - f, cleanup := generateRandomFile(filesize * 1000) - defer cleanup() + randomBytes := testutil.RandomBytes(seed, filesize*1000) - hash, err := upload(f, endpoints[0]) + hash, err := upload(&randomBytes, endpoints[0]) if err != nil { return err } @@ -218,7 +243,7 @@ func cliFeedUploadAndSync(c *cli.Context) error { return err } multihashHex := hexutil.Encode(hashBytes) - fileHash, err := digest(f) + fileHash, err := digest(bytes.NewReader(randomBytes)) if err != nil { return err } @@ -284,14 +309,37 @@ func cliFeedUploadAndSync(c *cli.Context) error { } func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error { + ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch") + defer sp.Finish() + log.Trace("sleeping", "ruid", ruid) time.Sleep(3 * time.Second) log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) - res, err := http.Get(endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user) + + var tn time.Time + reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user + req, _ := http.NewRequest("GET", reqUri, nil) + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + + trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + tn = time.Now() + res, err := transport.RoundTrip(req) if err != nil { + log.Error(err.Error(), "ruid", ruid) return err } + log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength) if res.StatusCode != 200 { diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 845998dc1..66cecdc5c 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -17,14 +17,25 @@ package main import ( + "fmt" "os" "sort" + "github.com/ethereum/go-ethereum/cmd/utils" + gethmetrics "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/metrics/influxdb" + swarmmetrics "github.com/ethereum/go-ethereum/swarm/metrics" + "github.com/ethereum/go-ethereum/swarm/tracing" + "github.com/ethereum/go-ethereum/log" cli "gopkg.in/urfave/cli.v1" ) +var ( + gitCommit string // Git SHA1 commit hash of the release (set via linker flags) +) + var ( endpoints []string includeLocalhost bool @@ -32,9 +43,12 @@ var ( appName string scheme string filesize int + syncDelay int from int to int verbosity int + timeout int + single bool ) func main() { @@ -85,14 +99,42 @@ func main() { Usage: "file size for generated random file in KB", Destination: &filesize, }, + cli.IntFlag{ + Name: "sync-delay", + Value: 5, + Usage: "duration of delay in seconds to wait for content to be synced", + Destination: &syncDelay, + }, cli.IntFlag{ Name: "verbosity", Value: 1, Usage: "verbosity", Destination: &verbosity, }, + cli.IntFlag{ + Name: "timeout", + Value: 120, + Usage: "timeout in seconds after which kill the process", + Destination: &timeout, + }, + cli.BoolFlag{ + Name: "single", + Usage: "whether to fetch content from a single node or from all nodes", + Destination: &single, + }, } + app.Flags = append(app.Flags, []cli.Flag{ + utils.MetricsEnabledFlag, + swarmmetrics.MetricsInfluxDBEndpointFlag, + swarmmetrics.MetricsInfluxDBDatabaseFlag, + swarmmetrics.MetricsInfluxDBUsernameFlag, + swarmmetrics.MetricsInfluxDBPasswordFlag, + swarmmetrics.MetricsInfluxDBHostTagFlag, + }...) + + app.Flags = append(app.Flags, tracing.Flags...) + app.Commands = []cli.Command{ { Name: "upload_and_sync", @@ -111,9 +153,38 @@ func main() { sort.Sort(cli.FlagsByName(app.Flags)) sort.Sort(cli.CommandsByName(app.Commands)) + app.Before = func(ctx *cli.Context) error { + tracing.Setup(ctx) + return nil + } + + app.After = func(ctx *cli.Context) error { + return emitMetrics(ctx) + } + err := app.Run(os.Args) if err != nil { log.Error(err.Error()) + os.Exit(1) } } + +func emitMetrics(ctx *cli.Context) error { + if gethmetrics.Enabled { + var ( + endpoint = ctx.GlobalString(swarmmetrics.MetricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(swarmmetrics.MetricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(swarmmetrics.MetricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(swarmmetrics.MetricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(swarmmetrics.MetricsInfluxDBHostTagFlag.Name) + ) + return influxdb.InfluxDBWithTagsOnce(gethmetrics.DefaultRegistry, endpoint, database, username, password, "swarm-smoke.", map[string]string{ + "host": hosttag, + "version": gitCommit, + "filesize": fmt.Sprintf("%v", filesize), + }) + } + + return nil +} diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 3843457dc..d605f79a3 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -18,21 +18,27 @@ package main import ( "bytes" + "context" "crypto/md5" crand "crypto/rand" - "crypto/tls" "errors" "fmt" "io" "io/ioutil" + "math/rand" "net/http" + "net/http/httptrace" "os" - "os/exec" - "strings" "sync" "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/api/client" + "github.com/ethereum/go-ethereum/swarm/spancontext" + "github.com/ethereum/go-ethereum/swarm/testutil" + opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" @@ -40,11 +46,11 @@ import ( func generateEndpoints(scheme string, cluster string, app string, from int, to int) { if cluster == "prod" { - for port := from; port <= to; port++ { + for port := from; port < to; port++ { endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) } } else { - for port := from; port <= to; port++ { + for port := from; port < to; port++ { endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) } } @@ -58,22 +64,48 @@ func cliUploadAndSync(c *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) - defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "kb", filesize) }(time.Now()) + metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1) + + errc := make(chan error) + go func() { + errc <- uploadAndSync(c) + }() + + select { + case err := <-errc: + if err != nil { + metrics.GetOrRegisterCounter("upload-and-sync.fail", nil).Inc(1) + } + return err + case <-time.After(time.Duration(timeout) * time.Second): + metrics.GetOrRegisterCounter("upload-and-sync.timeout", nil).Inc(1) + return fmt.Errorf("timeout after %v sec", timeout) + } +} + +func uploadAndSync(c *cli.Context) error { + defer func(now time.Time) { + totalTime := time.Since(now) + + log.Info("total time", "time", totalTime, "kb", filesize) + metrics.GetOrRegisterCounter("upload-and-sync.total-time", nil).Inc(int64(totalTime)) + }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) + seed := int(time.Now().UnixNano() / 1e6) + log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) - log.Info("uploading to " + endpoints[0] + " and syncing") + randomBytes := testutil.RandomBytes(seed, filesize*1000) - f, cleanup := generateRandomFile(filesize * 1000) - defer cleanup() - - hash, err := upload(f, endpoints[0]) + t1 := time.Now() + hash, err := upload(&randomBytes, endpoints[0]) if err != nil { log.Error(err.Error()) return err } + metrics.GetOrRegisterCounter("upload-and-sync.upload-time", nil).Inc(int64(time.Since(t1))) - fhash, err := digest(f) + fhash, err := digest(bytes.NewReader(randomBytes)) if err != nil { log.Error(err.Error()) return err @@ -81,23 +113,47 @@ func cliUploadAndSync(c *cli.Context) error { log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) - time.Sleep(3 * time.Second) + time.Sleep(time.Duration(syncDelay) * time.Second) wg := sync.WaitGroup{} - for _, endpoint := range endpoints { + if single { + rand.Seed(time.Now().UTC().UnixNano()) + randIndex := 1 + rand.Intn(len(endpoints)-1) ruid := uuid.New()[:8] wg.Add(1) go func(endpoint string, ruid string) { for { + start := time.Now() err := fetch(hash, endpoint, fhash, ruid) + fetchTime := time.Since(start) if err != nil { continue } + metrics.GetOrRegisterMeter("upload-and-sync.single.fetch-time", nil).Mark(int64(fetchTime)) wg.Done() return } - }(endpoint, ruid) + }(endpoints[randIndex], ruid) + } else { + for _, endpoint := range endpoints { + ruid := uuid.New()[:8] + wg.Add(1) + go func(endpoint string, ruid string) { + for { + start := time.Now() + err := fetch(hash, endpoint, fhash, ruid) + fetchTime := time.Since(start) + if err != nil { + continue + } + + metrics.GetOrRegisterMeter("upload-and-sync.each.fetch-time", nil).Mark(int64(fetchTime)) + wg.Done() + return + } + }(endpoint, ruid) + } } wg.Wait() log.Info("all endpoints synced random file successfully") @@ -107,16 +163,33 @@ func cliUploadAndSync(c *cli.Context) error { // fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file func fetch(hash string, endpoint string, original []byte, ruid string) error { + ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") + defer sp.Finish() + log.Trace("sleeping", "ruid", ruid) time.Sleep(3 * time.Second) - log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) - client := &http.Client{Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }} - res, err := client.Get(endpoint + "/bzz:/" + hash + "/") + + var tn time.Time + reqUri := endpoint + "/bzz:/" + hash + "/" + req, _ := http.NewRequest("GET", reqUri, nil) + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + + trace := client.GetClientTrace("upload-and-sync - http get", "upload-and-sync", ruid, &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + tn = time.Now() + res, err := transport.RoundTrip(req) if err != nil { - log.Warn(err.Error(), "ruid", ruid) + log.Error(err.Error(), "ruid", ruid) return err } log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) @@ -147,16 +220,19 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { } // upload is uploading a file `f` to `endpoint` via the `swarm up` cmd -func upload(f *os.File, endpoint string) (string, error) { - var out bytes.Buffer - cmd := exec.Command("swarm", "--bzzapi", endpoint, "up", f.Name()) - cmd.Stdout = &out - err := cmd.Run() - if err != nil { - return "", err +func upload(dataBytes *[]byte, endpoint string) (string, error) { + swarm := client.NewClient(endpoint) + f := &client.File{ + ReadCloser: ioutil.NopCloser(bytes.NewReader(*dataBytes)), + ManifestEntry: api.ManifestEntry{ + ContentType: "text/plain", + Mode: 0660, + Size: int64(len(*dataBytes)), + }, } - hash := strings.TrimRight(out.String(), "\r\n") - return hash, nil + + // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. + return swarm.Upload(f, "", false) } func digest(r io.Reader) ([]byte, error) { @@ -179,27 +255,3 @@ func generateRandomData(datasize int) ([]byte, error) { } return b, nil } - -// generateRandomFile is creating a temporary file with the requested byte size -func generateRandomFile(size int) (f *os.File, teardown func()) { - // create a tmp file - tmp, err := ioutil.TempFile("", "swarm-test") - if err != nil { - panic(err) - } - - // callback for tmp file cleanup - teardown = func() { - tmp.Close() - os.Remove(tmp.Name()) - } - - buf := make([]byte, size) - _, err = crand.Read(buf) - if err != nil { - panic(err) - } - ioutil.WriteFile(tmp.Name(), buf, 0755) - - return tmp, teardown -} diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 31a5c21b5..c4ef92723 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -58,6 +58,34 @@ func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, userna rep.run() } +// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags +func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error { + u, err := uurl.Parse(url) + if err != nil { + return fmt.Errorf("Unable to parse InfluxDB. url: %s, err: %v", url, err) + } + + rep := &reporter{ + reg: r, + url: *u, + database: database, + username: username, + password: password, + namespace: namespace, + tags: tags, + cache: make(map[string]int64), + } + if err := rep.makeClient(); err != nil { + return fmt.Errorf("Unable to make InfluxDB client. err: %v", err) + } + + if err := rep.send(); err != nil { + return fmt.Errorf("Unable to send to InfluxDB. err: %v", err) + } + + return nil +} + func (r *reporter) makeClient() (err error) { r.client, err = client.NewClient(client.Config{ URL: r.url, diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go index d9837ca73..f793ca8b8 100644 --- a/swarm/api/client/client.go +++ b/swarm/api/client/client.go @@ -19,6 +19,7 @@ package client import ( "archive/tar" "bytes" + "context" "encoding/json" "errors" "fmt" @@ -26,6 +27,7 @@ import ( "io/ioutil" "mime/multipart" "net/http" + "net/http/httptrace" "net/textproto" "net/url" "os" @@ -33,9 +35,14 @@ import ( "regexp" "strconv" "strings" + "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" + "github.com/pborman/uuid" ) var ( @@ -474,6 +481,11 @@ type UploadFn func(file *File) error // TarUpload uses the given Uploader to upload files to swarm as a tar stream, // returning the resulting manifest hash func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool) (string, error) { + ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload") + defer sp.Finish() + + var tn time.Time + reqR, reqW := io.Pipe() defer reqR.Close() addr := hash @@ -489,6 +501,12 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t if err != nil { return "", err } + + trace := GetClientTrace("swarm api client - upload tar", "api.client.uploadtar", uuid.New()[:8], &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + req.Header.Set("Content-Type", "application/x-tar") if defaultPath != "" { q := req.URL.Query() @@ -529,8 +547,8 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t } reqW.CloseWithError(err) }() - - res, err := http.DefaultClient.Do(req) + tn = time.Now() + res, err := transport.RoundTrip(req) if err != nil { return "", err } @@ -728,3 +746,57 @@ func (c *Client) GetFeedRequest(query *feed.Query, manifestAddressOrDomain strin } return &metadata, nil } + +func GetClientTrace(traceMsg, metricPrefix, ruid string, tn *time.Time) *httptrace.ClientTrace { + trace := &httptrace.ClientTrace{ + GetConn: func(_ string) { + log.Trace(traceMsg+" - http get", "event", "GetConn", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".getconn", nil).Update(time.Since(*tn)) + }, + GotConn: func(_ httptrace.GotConnInfo) { + log.Trace(traceMsg+" - http get", "event", "GotConn", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".gotconn", nil).Update(time.Since(*tn)) + }, + PutIdleConn: func(err error) { + log.Trace(traceMsg+" - http get", "event", "PutIdleConn", "ruid", ruid, "err", err) + metrics.GetOrRegisterResettingTimer(metricPrefix+".putidle", nil).Update(time.Since(*tn)) + }, + GotFirstResponseByte: func() { + log.Trace(traceMsg+" - http get", "event", "GotFirstResponseByte", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".firstbyte", nil).Update(time.Since(*tn)) + }, + Got100Continue: func() { + log.Trace(traceMsg, "event", "Got100Continue", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".got100continue", nil).Update(time.Since(*tn)) + }, + DNSStart: func(_ httptrace.DNSStartInfo) { + log.Trace(traceMsg, "event", "DNSStart", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsstart", nil).Update(time.Since(*tn)) + }, + DNSDone: func(_ httptrace.DNSDoneInfo) { + log.Trace(traceMsg, "event", "DNSDone", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsdone", nil).Update(time.Since(*tn)) + }, + ConnectStart: func(network, addr string) { + log.Trace(traceMsg, "event", "ConnectStart", "ruid", ruid, "network", network, "addr", addr) + metrics.GetOrRegisterResettingTimer(metricPrefix+".connectstart", nil).Update(time.Since(*tn)) + }, + ConnectDone: func(network, addr string, err error) { + log.Trace(traceMsg, "event", "ConnectDone", "ruid", ruid, "network", network, "addr", addr, "err", err) + metrics.GetOrRegisterResettingTimer(metricPrefix+".connectdone", nil).Update(time.Since(*tn)) + }, + WroteHeaders: func() { + log.Trace(traceMsg, "event", "WroteHeaders(request)", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wroteheaders", nil).Update(time.Since(*tn)) + }, + Wait100Continue: func() { + log.Trace(traceMsg, "event", "Wait100Continue", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wait100continue", nil).Update(time.Since(*tn)) + }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { + log.Trace(traceMsg, "event", "WroteRequest", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wroterequest", nil).Update(time.Since(*tn)) + }, + } + return trace +} diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 115a00856..f7f819eab 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -74,13 +74,15 @@ func ParseURI(h http.Handler) http.Handler { func InitLoggingResponseWriter(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - startTime := time.Now() - defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).UpdateSince(startTime) + tn := time.Now() writer := newLoggingResponseWriter(w) h.ServeHTTP(writer, r) - log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode) - metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).UpdateSince(startTime) + + ts := time.Since(tn) + log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode, "time", ts*time.Millisecond) + metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).Update(ts) + metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).Update(ts) }) } @@ -93,6 +95,7 @@ func InstrumentOpenTracing(h http.Handler) http.Handler { } spanName := fmt.Sprintf("http.%s.%s", r.Method, uri.Scheme) ctx, sp := spancontext.StartSpan(r.Context(), spanName) + defer sp.Finish() h.ServeHTTP(w, r.WithContext(ctx)) }) diff --git a/swarm/metrics/flags.go b/swarm/metrics/flags.go index 79490fd36..7c12120a6 100644 --- a/swarm/metrics/flags.go +++ b/swarm/metrics/flags.go @@ -27,26 +27,26 @@ import ( ) var ( - metricsEnableInfluxDBExportFlag = cli.BoolFlag{ + MetricsEnableInfluxDBExportFlag = cli.BoolFlag{ Name: "metrics.influxdb.export", Usage: "Enable metrics export/push to an external InfluxDB database", } - metricsInfluxDBEndpointFlag = cli.StringFlag{ + MetricsInfluxDBEndpointFlag = cli.StringFlag{ Name: "metrics.influxdb.endpoint", Usage: "Metrics InfluxDB endpoint", Value: "http://127.0.0.1:8086", } - metricsInfluxDBDatabaseFlag = cli.StringFlag{ + MetricsInfluxDBDatabaseFlag = cli.StringFlag{ Name: "metrics.influxdb.database", Usage: "Metrics InfluxDB database", Value: "metrics", } - metricsInfluxDBUsernameFlag = cli.StringFlag{ + MetricsInfluxDBUsernameFlag = cli.StringFlag{ Name: "metrics.influxdb.username", Usage: "Metrics InfluxDB username", Value: "", } - metricsInfluxDBPasswordFlag = cli.StringFlag{ + MetricsInfluxDBPasswordFlag = cli.StringFlag{ Name: "metrics.influxdb.password", Usage: "Metrics InfluxDB password", Value: "", @@ -55,7 +55,7 @@ var ( // It is used so that we can group all nodes and average a measurement across all of them, but also so // that we can select a specific node and inspect its measurements. // https://docs.influxdata.com/influxdb/v1.4/concepts/key_concepts/#tag-key - metricsInfluxDBHostTagFlag = cli.StringFlag{ + MetricsInfluxDBHostTagFlag = cli.StringFlag{ Name: "metrics.influxdb.host.tag", Usage: "Metrics InfluxDB `host` tag attached to all measurements", Value: "localhost", @@ -65,20 +65,24 @@ var ( // Flags holds all command-line flags required for metrics collection. var Flags = []cli.Flag{ utils.MetricsEnabledFlag, - metricsEnableInfluxDBExportFlag, - metricsInfluxDBEndpointFlag, metricsInfluxDBDatabaseFlag, metricsInfluxDBUsernameFlag, metricsInfluxDBPasswordFlag, metricsInfluxDBHostTagFlag, + MetricsEnableInfluxDBExportFlag, + MetricsInfluxDBEndpointFlag, + MetricsInfluxDBDatabaseFlag, + MetricsInfluxDBUsernameFlag, + MetricsInfluxDBPasswordFlag, + MetricsInfluxDBHostTagFlag, } func Setup(ctx *cli.Context) { if gethmetrics.Enabled { log.Info("Enabling swarm metrics collection") var ( - enableExport = ctx.GlobalBool(metricsEnableInfluxDBExportFlag.Name) - endpoint = ctx.GlobalString(metricsInfluxDBEndpointFlag.Name) - database = ctx.GlobalString(metricsInfluxDBDatabaseFlag.Name) - username = ctx.GlobalString(metricsInfluxDBUsernameFlag.Name) - password = ctx.GlobalString(metricsInfluxDBPasswordFlag.Name) - hosttag = ctx.GlobalString(metricsInfluxDBHostTagFlag.Name) + enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name) + endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name) ) // Start system runtime metrics collection diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 40292e88f..cbe65372a 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "sync" + "time" "github.com/ethereum/go-ethereum/metrics" ch "github.com/ethereum/go-ethereum/swarm/chunk" @@ -410,10 +411,14 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e log.Debug("lazychunkreader.size", "addr", r.addr) if r.chunkData == nil { + + startTime := time.Now() chunkData, err := r.getter.Get(cctx, Reference(r.addr)) if err != nil { + metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) return 0, err } + metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime) r.chunkData = chunkData s := r.chunkData.Size() log.Debug("lazychunkreader.size", "key", r.addr, "size", s) @@ -542,8 +547,10 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS wg.Add(1) go func(j int64) { childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] + startTime := time.Now() chunkData, err := r.getter.Get(r.ctx, Reference(childAddress)) if err != nil { + metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) select { case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)): @@ -551,6 +558,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS } return } + metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime) if l := len(chunkData); l < 9 { select { case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l):