record point for every fetch (#269)

This commit is contained in:
Anton Evangelatov 2020-10-08 20:43:46 +02:00 committed by GitHub
parent 4cc1fcb37b
commit 1a64caaa4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -34,7 +34,7 @@ import (
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-noise" noise "github.com/libp2p/go-libp2p-noise"
secio "github.com/libp2p/go-libp2p-secio" secio "github.com/libp2p/go-libp2p-secio"
tls "github.com/libp2p/go-libp2p-tls" tls "github.com/libp2p/go-libp2p-tls"
@ -115,7 +115,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
return err return err
} }
runenv.RecordMessage("done dialling provider") runenv.RecordMessage("done dialling provider")
return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency) return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency, size)
default: default:
panic("unsupported group ID") panic("unsupported group ID")
@ -155,7 +155,7 @@ func parseNetworkConfig(runenv *runtime.RunEnv) []networkParams {
return ret return ret
} }
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int) error { func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64) error {
var ( var (
cids []cid.Cid cids []cid.Cid
// create a selector for the whole UnixFS dag // create a selector for the whole UnixFS dag
@ -207,9 +207,11 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
for err := range errCh { for err := range errCh {
return err return err
} }
dur := time.Since(start)
runenv.RecordMessage("\t<<< request complete with no errors") runenv.RecordMessage("\t<<< request complete with no errors")
runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, time.Since(start)) runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)
runenv.R().RecordPoint(fmt.Sprintf("duration,lat=%s,bw=%d,concurrency=%d,size=%d", np.latency, np.bandwidth, concurrency, size), float64(dur))
// verify that we have the CID now. // verify that we have the CID now.
if node, err := dagsrv.Get(grpctx, c); err != nil { if node, err := dagsrv.Get(grpctx, c); err != nil {