From 34c85def3eec09ee50ea2499824aeaa48705eae2 Mon Sep 17 00:00:00 2001
From: Elad
Date: Tue, 5 Mar 2019 23:43:05 +0700
Subject: [PATCH] cmd/swarm/swarm-smoke: sliding window test should not time
 out (#19152)

---
 cmd/swarm/flags.go                      |  3 +-
 cmd/swarm/swarm-smoke/sliding_window.go | 72 +++++++++++++++----------
 2 files changed, 45 insertions(+), 30 deletions(-)

diff --git a/cmd/swarm/flags.go b/cmd/swarm/flags.go
index 39a273d87..5e1ada632 100644
--- a/cmd/swarm/flags.go
+++ b/cmd/swarm/flags.go
@@ -149,8 +149,9 @@ var (
 	}
 	SwarmStoreCacheCapacity = cli.UintFlag{
 		Name:   "store.cache.size",
-		Usage:  "Number of recent chunks cached in memory (default 5000)",
+		Usage:  "Number of recent chunks cached in memory",
 		EnvVar: SwarmEnvStoreCacheCapacity,
+		Value:  10000,
 	}
 	SwarmCompressedFlag = cli.BoolFlag{
 		Name:  "compressed",
diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go
index 750d96939..d589124bd 100644
--- a/cmd/swarm/swarm-smoke/sliding_window.go
+++ b/cmd/swarm/swarm-smoke/sliding_window.go
@@ -42,23 +42,16 @@ func slidingWindowCmd(ctx *cli.Context, tuid string) error {
 		errc <- slidingWindow(ctx, tuid)
 	}()
 
-	select {
-	case err := <-errc:
-		if err != nil {
-			metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
-		}
-		return err
-	case <-time.After(time.Duration(timeout) * time.Second):
-		metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
-
-		return fmt.Errorf("timeout after %v sec", timeout)
+	err := <-errc
+	if err != nil {
+		metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
 	}
+	return err
 }
 
 func slidingWindow(ctx *cli.Context, tuid string) error {
 	var hashes []uploadResult //swarm hashes of the uploads
 	nodes := len(hosts)
-	const iterationTimeout = 30 * time.Second
 	log.Info("sliding window test started", "tuid", tuid, "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout)
 	uploadedBytes := 0
 	networkDepth := 0
@@ -66,6 +59,7 @@ func slidingWindow(ctx *cli.Context, tuid string) error {
 
 outer:
 	for {
+		seed = int(time.Now().UTC().UnixNano())
 		log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)
 
 		t1 := time.Now()
@@ -79,6 +73,7 @@ outer:
 		}
 
 		metrics.GetOrRegisterResettingTimer("sliding-window.upload-time", nil).UpdateSince(t1)
+		metrics.GetOrRegisterGauge("sliding-window.upload-depth", nil).Update(int64(len(hashes)))
 
 		fhash, err := digest(bytes.NewReader(randomBytes))
 		if err != nil {
@@ -90,37 +85,56 @@ outer:
 		hashes = append(hashes, uploadResult{hash: hash, digest: fhash})
 		time.Sleep(time.Duration(syncDelay) * time.Second)
 		uploadedBytes += filesize * 1000
-
+		q := make(chan struct{}, 1)
+		d := make(chan struct{})
+		defer close(q)
+		defer close(d)
 		for i, v := range hashes {
-			timeout := time.After(time.Duration(timeout) * time.Second)
+			timeoutC := time.After(time.Duration(timeout) * time.Second)
 			errored = false
 
-		inner:
+		task:
 			for {
 				select {
-				case <-timeout:
+				case q <- struct{}{}:
+					go func() {
+						var start time.Time
+						done := false
+						for !done {
+							log.Info("trying to retrieve hash", "hash", v.hash)
+							idx := 1 + rand.Intn(len(hosts)-1)
+							ruid := uuid.New()[:8]
+							start = time.Now()
+							// fetch hangs when swarm dies out, so we have to jump through a bit more hoops to actually
+							// catch the timeout, but also allow this retry logic
+							err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "")
+							if err != nil {
+								log.Error("error fetching hash", "err", err)
+								continue
+							}
+							done = true
+						}
+						metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start)
+						d <- struct{}{}
+					}()
+				case <-d:
+					<-q
+					break task
+				case <-timeoutC:
 					errored = true
-					log.Error("error retrieving hash. timeout", "hash idx", i, "err", err)
+					log.Error("error retrieving hash. timeout", "hash idx", i)
 					metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1)
-					break inner
+					break outer
 				default:
-					idx := 1 + rand.Intn(len(hosts)-1)
-					ruid := uuid.New()[:8]
-					start := time.Now()
-					err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "")
-					if err != nil {
-						continue inner
-					}
-					metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start)
-					break inner
 				}
 			}
 
-			if errored {
-				break outer
-			}
 			networkDepth = i
			metrics.GetOrRegisterGauge("sliding-window.network-depth", nil).Update(int64(networkDepth))
+			log.Info("sliding window test successfully fetched file", "currentDepth", networkDepth)
+			// this test might take a long time to finish - but we'd like to see metrics while they accumulate and not just when
+			// the test finishes. therefore emit the metrics on each iteration
+			emitMetrics(ctx)
 		}
 	}
 }
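
The heart of the sliding_window.go change is the task/select loop: fetch() can hang indefinitely when the swarm dies out, so the retry loop now runs in a worker goroutine whose completion signal (d) is raced against a per-hash deadline, while the q channel admits at most one worker at a time. Below is a minimal, self-contained sketch of the same pattern; blockingFetch and fetchWithTimeout are illustrative names, not part of the patch.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// blockingFetch stands in for the smoke test's fetch(), which may hang or
// fail transiently while the network is still syncing.
func blockingFetch() error {
	time.Sleep(100 * time.Millisecond)
	if rand.Intn(3) != 0 {
		return errors.New("transient fetch error")
	}
	return nil
}

// fetchWithTimeout retries blockingFetch in a worker goroutine and races
// its completion against a deadline, so a hung call can no longer wedge
// the caller.
func fetchWithTimeout(timeout time.Duration) error {
	done := make(chan struct{}, 1) // buffered: a worker that loses the race can still exit
	go func() {
		for blockingFetch() != nil {
			// keep retrying; the select below bounds the total time spent
		}
		done <- struct{}{}
	}()

	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("timeout after %v", timeout)
	}
}

func main() {
	fmt.Println(fetchWithTimeout(5 * time.Second))
}

The sketch collapses the patch's q admission channel and busy default case into a single blocking select; buffering done means a worker that finishes after the deadline simply drops its signal into the buffer and exits, instead of blocking on the send forever.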
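On the flags.go hunk: the old Usage string hard-coded the default in prose ("(default 5000)"), which can go stale; setting Value on the flag makes 10000 the machine-readable default, and the cli package renders it in help output automatically. A sketch of that behavior, assuming the gopkg.in/urfave/cli.v1 package that cmd/swarm uses (the standalone app here is illustrative):

package main

import (
	"fmt"
	"os"

	cli "gopkg.in/urfave/cli.v1"
)

func main() {
	app := cli.NewApp()
	app.Flags = []cli.Flag{
		cli.UintFlag{
			Name:  "store.cache.size",
			Usage: "Number of recent chunks cached in memory",
			Value: 10000, // shown in --help as the default; no need to restate it in Usage
		},
	}
	app.Action = func(ctx *cli.Context) error {
		fmt.Println("cache size:", ctx.Uint("store.cache.size"))
		return nil
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}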
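The patch also calls emitMetrics(ctx) on every iteration, so gauges and timers become visible while the (potentially very long) test is still running rather than only at exit. The instruments are exactly the ones named in the diff; a tiny standalone sketch of recording to them follows. Note that metrics.Enabled must be set, and actually exporting the registry (e.g. to InfluxDB) is configured elsewhere in the smoke test, not shown here.

package main

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enabled = true // collection is a no-op unless enabled

	start := time.Now()
	time.Sleep(50 * time.Millisecond) // stand-in for an upload or fetch

	// The same calls the patch makes: a resetting timer for per-phase
	// latency, a gauge for the current window depth, a counter for errors.
	metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start)
	metrics.GetOrRegisterGauge("sliding-window.network-depth", nil).Update(3)
	metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1)
}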