wip; add concurrency; make it work.

Raúl Kripalani 2020-09-11 22:36:38 +01:00
parent 6563a68ae2
commit 77000db747
5 changed files with 188 additions and 143 deletions

View File

@@ -9,9 +9,10 @@
runner = "local:docker"
[global.run.test_params]
size = "104857600"
latencies = '["50ms", "100ms", "150ms", "200ms"]'
bandwidth = '["16M", "8M", "4M", "1M"]'
size = "100MB"
latencies = '["50ms", "100ms", "200ms"]'
bandwidth = '["32MiB", "16MiB", "8MiB", "4MiB", "1MiB"]'
concurrency = "1"
[[groups]]
id = "providers"

View File

@@ -3,7 +3,6 @@ module github.com/libp2p/test-plans/ping
go 1.14
require (
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
@@ -22,4 +21,5 @@ require (
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/testground/sdk-go v0.2.3
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
)
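golang.org/x/sync is pulled in for errgroup, which the requestor now uses to fan out the concurrent graphsync requests. A minimal sketch of that pattern, with fetch as a hypothetical stand-in for a single request:

// Sketch (not part of this commit): the errgroup fan-out pattern used by the requestor.
package main

import (
	"context"

	"golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context, concurrency int, fetch func(context.Context, int) error) error {
	errgrp, grpctx := errgroup.WithContext(ctx)
	for i := 0; i < concurrency; i++ {
		i := i // capture the loop variable for the closure
		errgrp.Go(func() error { return fetch(grpctx, i) })
	}
	return errgrp.Wait() // returns the first non-nil error, if any
}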

View File

@@ -8,6 +8,7 @@ dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
@@ -73,7 +74,9 @@ github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f/go.mod h1:rQY
github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=
github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger v1.6.1 h1:w9pSFNSdq/JPM1N12Fz/F/bzo993Is1W+Q7HjPzi7yg=
github.com/dgraph-io/badger v1.6.1/go.mod h1:FRmFw3uxvcpa8zG3Rxs0th+hCLIuaQg8HlNV5bjgnuU=
github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po=
github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
@@ -208,6 +211,7 @@ github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46U
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaHzfGTzuE3s=
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-badger v0.2.3 h1:J27YvAcpuA5IvZUbeBxOcQgqnYHUPxoygc6QxxkodZ4=
github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBRn4FS6UHUk=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=

View File

@@ -1,19 +1,18 @@
package main
import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
goruntime "runtime"
"time"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-chunker"
"github.com/ipfs/go-ipfs-exchange-offline"
@@ -22,12 +21,13 @@ import (
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/testground/sdk-go/network"
"golang.org/x/sync/errgroup"
gs "github.com/ipfs/go-graphsync"
gsi "github.com/ipfs/go-graphsync/impl"
@@ -53,41 +53,41 @@ func main() {
run.InvokeMap(testcases)
}
type networkParams struct {
latency time.Duration
bandwidth uint64
}
func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
var (
size = runenv.IntParam("size")
bandwidths = runenv.SizeArrayParam("bandwidths")
latencies []time.Duration
size = runenv.SizeParam("size")
concurrency = runenv.IntParam("concurrency")
networkParams = parseNetworkConfig(runenv)
)
lats := runenv.StringArrayParam("latencies")
for _, l := range lats {
d, err := time.ParseDuration(l)
if err != nil {
return err
}
latencies = append(latencies, d)
}
runenv.RecordMessage("started test instance")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
initCtx.MustWaitAllInstancesInitialized(ctx)
defer initCtx.SyncClient.MustSignalAndWait(ctx, "done", runenv.TestInstanceCount)
host, peers := makeHost(ctx, runenv, initCtx)
host, peers, _ := makeHost(ctx, runenv, initCtx)
defer host.Close()
var (
// make datastore, blockstore, dag service, graphsync
ds = dss.MutexWrap(datastore.NewMapDatastore())
bs = blockstore.NewBlockstore(ds)
bs = blockstore.NewBlockstore(dss.MutexWrap(ds.NewMapDatastore()))
dagsrv = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
gsync = gsi.New(ctx, gsnet.NewFromLibp2pHost(host), makeLoader(bs), makeStorer(bs))
gsync = gsi.New(ctx,
gsnet.NewFromLibp2pHost(host),
storeutil.LoaderForBlockstore(bs),
storeutil.StorerForBlockstore(bs),
)
)
defer initCtx.SyncClient.MustSignalAndWait(ctx, "done", runenv.TestInstanceCount)
switch runenv.TestGroupID {
case "providers":
if runenv.TestGroupInstanceCount > 1 {
@@ -97,7 +97,11 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
runenv.RecordMessage("we are the provider")
defer runenv.RecordMessage("done provider")
return runProvider(ctx, runenv, initCtx, dagsrv, size, latencies, bandwidths)
gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
return runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency)
case "requestors":
runenv.RecordMessage("we are the requestor")
@@ -108,88 +112,152 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
return err
}
runenv.RecordMessage("done dialling provider")
return runRequestor(ctx, runenv, initCtx, gsync, p, bs, latencies, bandwidths)
return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency)
default:
panic("unsupported group ID")
}
}
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, bs blockstore.Blockstore, latencies []time.Duration, bandwidths []uint64) error {
// create a selector for the whole UnixFS dag
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
sel := ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreAll(
ssb.ExploreRecursiveEdge()),
).Node()
func parseNetworkConfig(runenv *runtime.RunEnv) []networkParams {
var (
bandwidths = runenv.SizeArrayParam("bandwidths")
latencies []time.Duration
)
for i, latency := range latencies {
for j, bandwidth := range bandwidths {
round := i*len(latencies) + j
lats := runenv.StringArrayParam("latencies")
for _, l := range lats {
d, err := time.ParseDuration(l)
if err != nil {
panic(err)
}
latencies = append(latencies, d)
}
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), new(cid.Cid))
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
// prepend bandwidth=0 and latency=0 zero values; the first iteration will
// be a control iteration. The sidecar interprets zero values as no
// limitation on that attribute.
bandwidths = append([]uint64{0}, bandwidths...)
latencies = append([]time.Duration{0}, latencies...)
runenv.RecordMessage("waiting to start round %d", round)
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
var ret []networkParams
for _, bandwidth := range bandwidths {
for _, latency := range latencies {
ret = append(ret, networkParams{
latency: latency,
bandwidth: bandwidth,
})
}
}
return ret
}
sctx, scancel := context.WithCancel(ctx)
cidCh := make(chan *cid.Cid, 1)
initCtx.SyncClient.MustSubscribe(sctx, topicCid, cidCh)
cid := <-cidCh
scancel()
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int) error {
var (
cids []cid.Cid
// create a selector for the whole UnixFS dag
ssb = builder.NewSelectorSpecBuilder(basicnode.Style.Any)
sel = ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
)
// make a go-ipld-prime link for the root UnixFS node
clink := cidlink.Link{Cid: *cid}
for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
runenv.RecordMessage("ROUND %d: latency=%s, bandwidth=%d", round, latency, bandwidth)
runenv.RecordMessage("CID: %s", cid)
// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
runenv.RecordMessage("waiting for provider's network to be configured %d", round)
<-initCtx.SyncClient.MustBarrier(ctx, stateNet, 1).C
runenv.RecordMessage("network configured for round %d", round)
// clean up previous CIDs to attempt to free memory
// TODO does this work?
_ = dagsrv.RemoveMany(ctx, cids)
// execute the traversal.
runenv.RecordMessage(">>>>> requesting")
progressCh, errCh := gsync.Request(ctx, p.ID, clink, sel)
for r := range progressCh {
runenv.RecordMessage("******* progress: %+v", r)
}
runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth)
runenv.RecordMessage("<<<<< request complete")
if len(errCh) > 0 {
return <-errCh
}
sctx, scancel := context.WithCancel(ctx)
cidCh := make(chan []cid.Cid, 1)
initCtx.SyncClient.MustSubscribe(sctx, topicCid, cidCh)
cids = <-cidCh
scancel()
// run GC to get accurate-ish stats.
goruntime.GC()
goruntime.GC()
<-initCtx.SyncClient.MustBarrier(ctx, stateNet, 1).C
errgrp, grpctx := errgroup.WithContext(ctx)
for _, c := range cids {
c := c // capture
np := np // capture
errgrp.Go(func() error {
// make a go-ipld-prime link for the root UnixFS node
clink := cidlink.Link{Cid: c}
// execute the traversal.
runenv.RecordMessage("\t>>> requesting CID %s", c)
start := time.Now()
_, errCh := gsync.Request(grpctx, p.ID, clink, sel)
for err := range errCh {
return err
}
runenv.RecordMessage("\t<<< request complete with no errors")
runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, time.Since(start))
// verify that we have the CID now.
if node, err := dagsrv.Get(grpctx, c); err != nil {
return err
} else if node == nil {
return fmt.Errorf("finished graphsync request, but CID not in store")
}
return nil
})
}
if err := errgrp.Wait(); err != nil {
return err
}
}
return nil
}
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size int, latencies []time.Duration, bandwidths []uint64) error {
for i, latency := range latencies {
for j, bandwidth := range bandwidths {
round := i*len(latencies) + j
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int) error {
var (
cids []cid.Cid
bufferedDS = format.NewBufferedDAG(ctx, dagsrv)
)
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), new(cid.Cid))
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
runenv.RecordMessage("waiting to start round %d", round)
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
// remove the previous CIDs from the dag service; hopefully this
// will delete them from the store and free up memory.
for _, c := range cids {
_ = dagsrv.Remove(ctx, c)
}
cids = cids[:0]
runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth)
// generate as many random files as the concurrency level.
for i := 0; i < concurrency; i++ {
// file with random data
file := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size)))
// import to UnixFS
bufferedDS := format.NewBufferedDAG(ctx, dagsrv)
const unixfsChunkSize uint64 = 1 << 10
const unixfsLinksPerLevel = 1024
@@ -210,36 +278,38 @@ func runProvider(runenv *runtime.RunEnv, initCtx *run.InitC
return fmt.Errorf("unable to create unix fs node: %w", err)
}
err = bufferedDS.Commit()
if err != nil {
return fmt.Errorf("unable to commit unix fs node: %w", err)
}
runenv.RecordMessage("CID is: %s", node.Cid())
initCtx.SyncClient.MustPublish(ctx, topicCid, node.Cid())
runenv.RecordMessage("ROUND %d: latency=%s, bandwidth=%d", round, latency, bandwidth)
runenv.RecordMessage("configuring network for round %d", round)
initCtx.NetClient.MustConfigureNetwork(ctx, &network.Config{
Network: "default",
Enable: true,
Default: network.LinkShape{
Latency: latency,
Bandwidth: bandwidth,
},
CallbackState: stateNet,
CallbackTarget: 1,
})
runenv.RecordMessage("network configured for round %d", round)
cids = append(cids, node.Cid())
}
if err := bufferedDS.Commit(); err != nil {
return fmt.Errorf("unable to commit unix fs node: %w", err)
}
// run GC to get accurate-ish stats.
goruntime.GC()
goruntime.GC()
runenv.RecordMessage("\tCIDs are: %v", cids)
initCtx.SyncClient.MustPublish(ctx, topicCid, cids)
runenv.RecordMessage("\tconfiguring network for round %d", round)
initCtx.NetClient.MustConfigureNetwork(ctx, &network.Config{
Network: "default",
Enable: true,
Default: network.LinkShape{
Latency: np.latency,
Bandwidth: np.bandwidth * 8, // bps
},
CallbackState: stateNet,
CallbackTarget: 1,
})
runenv.RecordMessage("\tnetwork configured for round %d", round)
}
return nil
}
func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, []*peer.AddrInfo) {
func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, []*peer.AddrInfo, *metrics.BandwidthCounter) {
secureChannel := runenv.StringParam("secure_channel")
var security libp2p.Option
@@ -255,9 +325,11 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
// ☎️ Let's construct the libp2p node.
ip := initCtx.NetClient.MustGetDataNetworkIP()
listenAddr := fmt.Sprintf("/ip4/%s/tcp/0", ip)
bwcounter := metrics.NewBandwidthCounter()
host, err := libp2p.New(ctx,
security,
libp2p.ListenAddrStrings(listenAddr),
libp2p.BandwidthReporter(bwcounter),
)
if err != nil {
panic(fmt.Sprintf("failed to instantiate libp2p instance: %s", err))
@@ -303,38 +375,5 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
}
}
return host, peers
}
func makeLoader(bs blockstore.Blockstore) ipld.Loader {
return func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) {
c, ok := lnk.(cidlink.Link)
if !ok {
return nil, errors.New("incorrect link type")
}
// read block from one store
block, err := bs.Get(c.Cid)
if err != nil {
return nil, err
}
return bytes.NewReader(block.RawData()), nil
}
}
func makeStorer(bs blockstore.Blockstore) ipld.Storer {
return func(lnkCtx ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) {
var buf bytes.Buffer
var committer ipld.StoreCommitter = func(lnk ipld.Link) error {
c, ok := lnk.(cidlink.Link)
if !ok {
return errors.New("incorrect link type")
}
block, err := blocks.NewBlockWithCid(buf.Bytes(), c.Cid)
if err != nil {
return err
}
return bs.Put(block)
}
return &buf, committer, nil
}
return host, peers, bwcounter
}
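makeHost now also returns a *metrics.BandwidthCounter (wired in via libp2p.BandwidthReporter), although runStress still discards it. A minimal sketch, assuming the go-libp2p-core metrics API, of how per-round totals could be reported:

// Sketch (not part of this commit): logging libp2p bandwidth totals for a round.
package main

import (
	"github.com/libp2p/go-libp2p-core/metrics"
	"github.com/testground/sdk-go/runtime"
)

func reportBandwidth(runenv *runtime.RunEnv, bwcounter *metrics.BandwidthCounter, round int) {
	stats := bwcounter.GetBandwidthTotals() // cumulative bytes in/out and current rates
	runenv.RecordMessage("round %d: total_in=%d, total_out=%d", round, stats.TotalIn, stats.TotalOut)
}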

View File

@@ -17,7 +17,8 @@ name = "stress"
instances = { min = 2, max = 10000, default = 2 }
[testcases.params]
size = { type = "int", desc = "size of file to transfer", default = "524288" }
size = { type = "int", desc = "size of file to transfer, in human-friendly form", default = "1MiB" }
secure_channel = { type = "enum", desc = "secure channel used", values = ["secio", "noise", "tls"], default = "noise" }
latencies = { type = "string", desc = "latencies to try with; comma-separated list of durations", default = '["100ms", "200ms", "300ms"]' }
bandwidths = { type = "string", desc = "bandwidths (egress bytes/s) to try with; comma-separated list of humanized sizes", default = '["10M", "1M", "512kb"]' }
concurrency = { type = "int", desc = "concurrency level", default = "1" }
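Taken together with parseNetworkConfig in main.go, which prepends an unlimited (zero) latency and bandwidth as a control round, the defaults above would expand to (3+1) latencies × (3+1) bandwidths = 16 rounds, each transferring concurrency × size = 1 × 1MiB per requestor, i.e. about 16 MiB over a default run; the composition at the top of this diff scales that up to 100MB files.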