diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 2c1dd65a0..03e2cc2c4 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -40,7 +40,7 @@ var ( allhosts string hosts []string filesize int - syncDelay int + syncDelay bool inputSeed int httpPort int wsPort int @@ -87,10 +87,9 @@ func main() { Usage: "file size for generated random file in KB", Destination: &filesize, }, - cli.IntFlag{ + cli.BoolFlag{ Name: "sync-delay", - Value: 5, - Usage: "duration of delay in seconds to wait for content to be synced", + Usage: "wait for content to be synced", Destination: &syncDelay, }, cli.IntFlag{ diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index ab082c543..6ca3d3947 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -81,9 +81,13 @@ outer: return err } - log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay) + log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "wait for sync", syncDelay) hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) - time.Sleep(time.Duration(syncDelay) * time.Second) + + if syncDelay { + waitToSync() + } + uploadedBytes += filesize * 1000 q := make(chan struct{}, 1) d := make(chan struct{}) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index d6eb87ace..7338e3473 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -197,7 +197,8 @@ func getBzzAddrFromHost(client *rpc.Client) (string, error) { // we make an ugly assumption about the output format of the hive.String() method // ideally we should replace this with an API call that returns the bzz addr for a given host, // but this also works for now (provided we don't change the hive.String() method, which we haven't in some time - return strings.Split(strings.Split(hive, "\n")[3], " ")[10], nil + ss := strings.Split(strings.Split(hive, "\n")[3], " ") + return ss[len(ss)-1], nil } // checkChunksVsMostProxHosts is checking: @@ -284,13 +285,16 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error { log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) - waitToSync() + // wait to sync and log chunks before fetch attempt, only if syncDelay is set to true + if syncDelay { + waitToSync() - log.Debug("chunks before fetch attempt", "hash", hash) + log.Debug("chunks before fetch attempt", "hash", hash) - err = trackChunks(randomBytes, false) - if err != nil { - log.Error(err.Error()) + err = trackChunks(randomBytes, false) + if err != nil { + log.Error(err.Error()) + } } if onlyUpload { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 821cdaa9a..b43fdeee2 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -223,6 +223,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return fmt.Errorf("error initiaising bitvector of length %v: %v", lenHashes/HashSize, err) } + var wantDelaySet bool + var wantDelay time.Time + ctr := 0 errC := make(chan error) ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) @@ -234,6 +237,13 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if wait := c.NeedData(ctx, hash); wait != nil { ctr++ want.Set(i/HashSize, true) + + // measure how long it takes before we mark chunks for retrieval, and actually send the request + if !wantDelaySet { + wantDelaySet = true + wantDelay = time.Now() + } + // create request and wait until the chunk data arrives and is stored go func(w func(context.Context) error) { select { @@ -304,6 +314,12 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + + // record want delay + if wantDelaySet { + metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay) + } + err := p.SendPriority(ctx, msg, c.priority) if err != nil { log.Warn("SendPriority error", "err", err) diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 17ce0d798..28fd06e4d 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -415,9 +415,14 @@ func (p *Peer) removeClientParams(s Stream) error { } func (p *Peer) close() { + p.serverMu.Lock() + defer p.serverMu.Unlock() + for _, s := range p.servers { s.Close() } + + p.servers = nil } // runUpdateSyncing is a long running function that creates the initial