From baded64d8819ece2bb715bf707882017dca03ae4 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 20 Mar 2019 21:30:34 +0100 Subject: [PATCH] swarm/network: measure time of messages in priority queue (#19250) --- cmd/swarm/swarm-smoke/main.go | 7 ++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 35 ++++++++----------- metrics/influxdb/influxdb.go | 1 + swarm/network/fetcher.go | 7 ++-- swarm/network/priorityqueue/priorityqueue.go | 23 ++++++++---- swarm/network/stream/delivery.go | 12 +++++++ swarm/network/stream/stream.go | 2 +- swarm/storage/chunker.go | 5 ++- swarm/storage/netstore.go | 3 ++ swarm/swarm.go | 2 ++ .../opentracing/opentracing-go/CHANGELOG.md | 2 +- .../opentracing/opentracing-go/Makefile | 18 ++-------- .../opentracing/opentracing-go/README.md | 4 +-- .../opentracing-go/globaltracer.go | 18 +++++++--- .../opentracing/opentracing-go/propagation.go | 2 +- vendor/vendor.json | 6 ++-- 16 files changed, 87 insertions(+), 60 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 43d2c1ff5..860fbcc1d 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -40,6 +40,7 @@ var ( allhosts string hosts []string filesize int + inputSeed int syncDelay int httpPort int wsPort int @@ -74,6 +75,12 @@ func main() { Usage: "ws port", Destination: &wsPort, }, + cli.IntFlag{ + Name: "seed", + Value: 0, + Usage: "input seed in case we need deterministic upload", + Destination: &inputSeed, + }, cli.IntFlag{ Name: "filesize", Value: 1024, diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index d1032f821..6c20a4fa6 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -39,6 +39,11 @@ import ( ) func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { + // use input seed if it has been set + if inputSeed != 0 { + seed = inputSeed + } + randomBytes := testutil.RandomBytes(seed, filesize*1000) errc := make(chan error) @@ -47,37 +52,28 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { errc <- uploadAndSync(ctx, randomBytes, tuid) }() + var err error select { - case err := <-errc: + case err = <-errc: if err != nil { metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1) } - return err case <-time.After(time.Duration(timeout) * time.Second): metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1) - e := fmt.Errorf("timeout after %v sec", timeout) - // trigger debug functionality on randomBytes - err := trackChunks(randomBytes[:]) - if err != nil { - e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err) - } - - return e + err = fmt.Errorf("timeout after %v sec", timeout) } - // trigger debug functionality on randomBytes even on successful runs - err := trackChunks(randomBytes[:]) - if err != nil { - log.Error(err.Error()) + // trigger debug functionality on randomBytes + e := trackChunks(randomBytes[:]) + if e != nil { + log.Error(e.Error()) } - return nil + return err } func trackChunks(testData []byte) error { - log.Warn("Test timed out, running chunk debug sequence") - addrs, err := getAllRefs(testData) if err != nil { return err @@ -94,14 +90,14 @@ func trackChunks(testData []byte) error { rpcClient, err := rpc.Dial(httpHost) if err != nil { - log.Error("Error dialing host", "err", err) + log.Error("error dialing host", "err", err, "host", httpHost) continue } var hasInfo []api.HasInfo err = rpcClient.Call(&hasInfo, "bzz_has", addrs) if err != nil { - log.Error("Error calling host", "err", err) + log.Error("error calling rpc client", "err", err, "host", httpHost) continue } @@ -125,7 +121,6 @@ func trackChunks(testData []byte) error { } func getAllRefs(testData []byte) (storage.AddressCollection, error) { - log.Trace("Getting all references for given root hash") datadir, err := ioutil.TempDir("", "chunk-debug") if err != nil { return nil, fmt.Errorf("unable to create temp dir: %v", err) diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index c4ef92723..6619915fd 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -91,6 +91,7 @@ func (r *reporter) makeClient() (err error) { URL: r.url, Username: r.username, Password: r.password, + Timeout: 10 * time.Second, }) return diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go index f7deead3d..5c0dfefce 100644 --- a/swarm/network/fetcher.go +++ b/swarm/network/fetcher.go @@ -204,24 +204,24 @@ func (f *Fetcher) run(peers *sync.Map) { // incoming request case hopCount = <-f.requestC: - log.Trace("new request", "request addr", f.addr) // 2) chunk is requested, set requested flag // launch a request iff none been launched yet doRequest = !requested + log.Trace("new request", "request addr", f.addr, "doRequest", doRequest) requested = true // peer we requested from is gone. fall back to another // and remove the peer from the peers map case id := <-gone: - log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr) peers.Delete(id.String()) doRequest = requested + log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr, "doRequest", doRequest) // search timeout: too much time passed since the last request, // extend the search to a new peer if we can find one case <-waitC: - log.Trace("search timed out: requesting", "request addr", f.addr) doRequest = requested + log.Trace("search timed out: requesting", "request addr", f.addr, "doRequest", doRequest) // all Fetcher context closed, can quit case <-f.ctx.Done(): @@ -288,6 +288,7 @@ func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources for i = 0; i < len(sources); i++ { req.Source = sources[i] var err error + log.Trace("fetcher.doRequest", "request addr", f.addr, "peer", req.Source.String()) sourceID, quit, err = f.protoRequestFunc(f.ctx, req) if err == nil { // remove the peer from known sources diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go index 538502605..056e85ec1 100644 --- a/swarm/network/priorityqueue/priorityqueue.go +++ b/swarm/network/priorityqueue/priorityqueue.go @@ -28,8 +28,9 @@ package priorityqueue import ( "context" "errors" + "time" - "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" ) var ( @@ -69,13 +70,16 @@ READ: case <-ctx.Done(): return case x := <-q: - log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p])) - f(x) + val := x.(struct { + v interface{} + t time.Time + }) + f(val.v) + metrics.GetOrRegisterResettingTimer("pq.run", nil).UpdateSince(val.t) p = top default: if p > 0 { p-- - log.Trace("priority.queue p > 0", "p", p) continue READ } p = top @@ -83,7 +87,6 @@ READ: case <-ctx.Done(): return case <-pq.wakeup: - log.Trace("priority.queue wakeup", "p", p) } } } @@ -95,9 +98,15 @@ func (pq *PriorityQueue) Push(x interface{}, p int) error { if p < 0 || p >= len(pq.Queues) { return errBadPriority } - log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p])) + val := struct { + v interface{} + t time.Time + }{ + x, + time.Now(), + } select { - case pq.Queues[p] <- x: + case pq.Queues[p] <- val: default: return ErrContention } diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 01ae7f943..bc4f1f665 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -185,6 +185,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * if err != nil { log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } + osp.LogFields(olog.Bool("delivered", true)) return } osp.LogFields(olog.Bool("skipCheck", false)) @@ -216,6 +217,10 @@ type ChunkDeliveryMsgSyncing ChunkDeliveryMsg // chunk delivery msg is response to retrieverequest msg func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "handle.chunk.delivery") processReceivedChunksCount.Inc(1) @@ -223,13 +228,18 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr) span := tracing.ShiftSpanByKey(spanId) + log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID()) + go func() { + defer osp.Finish() + if span != nil { span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg")) defer span.Finish() } req.peer = sp + log.Trace("handle.chunk.delivery", "put", req.Addr) err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) if err != nil { if err == storage.ErrChunkInvalid { @@ -239,6 +249,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch req.peer.Drop(err) } } + log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err) }() return nil } @@ -284,6 +295,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( // this span will finish only when delivery is handled (or times out) ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request") ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr)) + log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr) err := sp.SendPriority(ctx, &RetrieveRequestMsg{ Addr: req.Addr, SkipCheck: req.SkipCheck, diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index c7c489152..1038e52d0 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -910,7 +910,7 @@ func (r *Registry) APIs() []rpc.API { Namespace: "stream", Version: "3.0", Service: r.api, - Public: true, + Public: false, }, } } diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 5b36b477e..b2f0f5633 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -536,7 +536,6 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in chunkData, err := r.getter.Get(ctx, Reference(childAddress)) if err != nil { metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) - log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) select { case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)): case <-quitC: @@ -561,12 +560,12 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in // Read keeps a cursor so cannot be called simulateously, see ReadAt func (r *LazyChunkReader) Read(b []byte) (read int, err error) { - log.Debug("lazychunkreader.read", "key", r.addr) + log.Trace("lazychunkreader.read", "key", r.addr) metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1) read, err = r.ReadAt(b, r.off) if err != nil && err != io.EOF { - log.Debug("lazychunkreader.readat", "read", read, "err", err) + log.Trace("lazychunkreader.readat", "read", read, "err", err) metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1) } diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index e3845489e..7741b8f7b 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -87,7 +87,9 @@ func (n *NetStore) Put(ctx context.Context, ch Chunk) error { // if chunk is now put in the store, check if there was an active fetcher and call deliver on it // (this delivers the chunk to requestors via the fetcher) + log.Trace("n.getFetcher", "ref", ch.Address()) if f := n.getFetcher(ch.Address()); f != nil { + log.Trace("n.getFetcher deliver", "ref", ch.Address()) f.deliver(ctx, ch) } return nil @@ -341,5 +343,6 @@ func (f *fetcher) deliver(ctx context.Context, ch Chunk) { f.chunk = ch // closing the deliveredC channel will terminate ongoing requests close(f.deliveredC) + log.Trace("n.getFetcher close deliveredC", "ref", ch.Address()) }) } diff --git a/swarm/swarm.go b/swarm/swarm.go index b4b08c5c5..ae78ccd48 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -522,6 +522,8 @@ func (s *Swarm) APIs() []rpc.API { apis = append(apis, s.bzz.APIs()...) + apis = append(apis, s.streamer.APIs()...) + if s.ps != nil { apis = append(apis, s.ps.APIs()...) } diff --git a/vendor/github.com/opentracing/opentracing-go/CHANGELOG.md b/vendor/github.com/opentracing/opentracing-go/CHANGELOG.md index 1fc9fdf7f..ecfb7e3b7 100644 --- a/vendor/github.com/opentracing/opentracing-go/CHANGELOG.md +++ b/vendor/github.com/opentracing/opentracing-go/CHANGELOG.md @@ -10,5 +10,5 @@ Changes by Version 1.0.0 (2016-09-26) ------------------- -- This release implements OpenTracing Specification 1.0 (http://opentracing.io/spec) +- This release implements OpenTracing Specification 1.0 (https://opentracing.io/spec) diff --git a/vendor/github.com/opentracing/opentracing-go/Makefile b/vendor/github.com/opentracing/opentracing-go/Makefile index d49a5c0d4..62abb63f5 100644 --- a/vendor/github.com/opentracing/opentracing-go/Makefile +++ b/vendor/github.com/opentracing/opentracing-go/Makefile @@ -1,26 +1,15 @@ -PACKAGES := . ./mocktracer/... ./ext/... - .DEFAULT_GOAL := test-and-lint -.PHONE: test-and-lint - +.PHONY: test-and-lint test-and-lint: test lint .PHONY: test test: go test -v -cover -race ./... +.PHONY: cover cover: - @rm -rf cover-all.out - $(foreach pkg, $(PACKAGES), $(MAKE) cover-pkg PKG=$(pkg) || true;) - @grep mode: cover.out > coverage.out - @cat cover-all.out >> coverage.out - go tool cover -html=coverage.out -o cover.html - @rm -rf cover.out cover-all.out coverage.out - -cover-pkg: - go test -coverprofile cover.out $(PKG) - @grep -v mode: cover.out >> cover-all.out + go test -v -coverprofile=coverage.txt -covermode=atomic -race ./... .PHONY: lint lint: @@ -29,4 +18,3 @@ lint: @# Run again with magic to exit non-zero if golint outputs anything. @! (golint ./... | read dummy) go vet ./... - diff --git a/vendor/github.com/opentracing/opentracing-go/README.md b/vendor/github.com/opentracing/opentracing-go/README.md index 007ee237c..6ef1d7c9d 100644 --- a/vendor/github.com/opentracing/opentracing-go/README.md +++ b/vendor/github.com/opentracing/opentracing-go/README.md @@ -8,8 +8,8 @@ This package is a Go platform API for OpenTracing. ## Required Reading In order to understand the Go platform API, one must first be familiar with the -[OpenTracing project](http://opentracing.io) and -[terminology](http://opentracing.io/documentation/pages/spec.html) more specifically. +[OpenTracing project](https://opentracing.io) and +[terminology](https://opentracing.io/specification/) more specifically. ## API overview for those adding instrumentation diff --git a/vendor/github.com/opentracing/opentracing-go/globaltracer.go b/vendor/github.com/opentracing/opentracing-go/globaltracer.go index 8c8e793ff..4f7066a92 100644 --- a/vendor/github.com/opentracing/opentracing-go/globaltracer.go +++ b/vendor/github.com/opentracing/opentracing-go/globaltracer.go @@ -1,7 +1,12 @@ package opentracing +type registeredTracer struct { + tracer Tracer + isRegistered bool +} + var ( - globalTracer Tracer = NoopTracer{} + globalTracer = registeredTracer{NoopTracer{}, false} ) // SetGlobalTracer sets the [singleton] opentracing.Tracer returned by @@ -11,22 +16,27 @@ var ( // Prior to calling `SetGlobalTracer`, any Spans started via the `StartSpan` // (etc) globals are noops. func SetGlobalTracer(tracer Tracer) { - globalTracer = tracer + globalTracer = registeredTracer{tracer, true} } // GlobalTracer returns the global singleton `Tracer` implementation. // Before `SetGlobalTracer()` is called, the `GlobalTracer()` is a noop // implementation that drops all data handed to it. func GlobalTracer() Tracer { - return globalTracer + return globalTracer.tracer } // StartSpan defers to `Tracer.StartSpan`. See `GlobalTracer()`. func StartSpan(operationName string, opts ...StartSpanOption) Span { - return globalTracer.StartSpan(operationName, opts...) + return globalTracer.tracer.StartSpan(operationName, opts...) } // InitGlobalTracer is deprecated. Please use SetGlobalTracer. func InitGlobalTracer(tracer Tracer) { SetGlobalTracer(tracer) } + +// IsGlobalTracerRegistered returns a `bool` to indicate if a tracer has been globally registered +func IsGlobalTracerRegistered() bool { + return globalTracer.isRegistered +} diff --git a/vendor/github.com/opentracing/opentracing-go/propagation.go b/vendor/github.com/opentracing/opentracing-go/propagation.go index 0dd466a37..b0c275eb0 100644 --- a/vendor/github.com/opentracing/opentracing-go/propagation.go +++ b/vendor/github.com/opentracing/opentracing-go/propagation.go @@ -160,7 +160,7 @@ type HTTPHeadersCarrier http.Header // Set conforms to the TextMapWriter interface. func (c HTTPHeadersCarrier) Set(key, val string) { h := http.Header(c) - h.Add(key, val) + h.Set(key, val) } // ForeachKey conforms to the TextMapReader interface. diff --git a/vendor/vendor.json b/vendor/vendor.json index c2c7d7c3d..e965b4d7f 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -352,10 +352,10 @@ "revisionTime": "2017-01-28T05:05:32Z" }, { - "checksumSHA1": "wIcN7tZiF441h08RHAm4NV8cYO4=", + "checksumSHA1": "a/DHmc9bdsYlZZcwp6i3xhvV7Pk=", "path": "github.com/opentracing/opentracing-go", - "revision": "bd9c3193394760d98b2fa6ebb2291f0cd1d06a7d", - "revisionTime": "2018-06-06T20:41:48Z" + "revision": "25a84ff92183e2f8ac018ba1db54f8a07b3c0e04", + "revisionTime": "2019-02-18T02:30:34Z" }, { "checksumSHA1": "uhDxBvLEqRAMZKgpTZ8MFuLIIM8=",