diff --git a/lotus-soup/deal_stresstest.go b/lotus-soup/deal_stresstest.go new file mode 100644 index 000000000..c8b323ddc --- /dev/null +++ b/lotus-soup/deal_stresstest.go @@ -0,0 +1,143 @@ +package main + +import ( + "context" + "io/ioutil" + "math/rand" + "os" + "sync" + "time" + + "github.com/filecoin-project/lotus/api" + + "github.com/ipfs/go-cid" +) + +var dealStressTestRoles = map[string]func(*TestEnvironment) error{ + "bootstrapper": runBootstrapper, + "miner": runMiner, + "client": runDealStressTestClient, + "drand": runDrandNode, + "pubsub-tracer": runPubsubTracer, +} + +func runDealStressTestClient(t *TestEnvironment) error { + t.RecordMessage("running client") + cl, err := prepareClient(t) + if err != nil { + return err + } + + ctx := context.Background() + addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners")) + if err != nil { + return err + } + t.RecordMessage("got %v miner addrs", len(addrs)) + + client := cl.fullApi + + // select a random miner + minerAddr := addrs[rand.Intn(len(addrs))] + if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil { + return err + } + + t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr) + + time.Sleep(2 * time.Second) + + // prepare a number of concurrent data points + deals := t.IntParam("deals") + data := make([][]byte, 0, deals) + files := make([]*os.File, 0, deals) + cids := make([]cid.Cid, 0, deals) + rng := rand.NewSource(time.Now().UnixNano()) + + for i := 0; i < deals; i++ { + dealData := make([]byte, 1600) + rand.New(rng).Read(dealData) + + dealFile, err := ioutil.TempFile("/tmp", "data") + if err != nil { + return err + } + defer os.Remove(dealFile.Name()) + + _, err = dealFile.Write(dealData) + if err != nil { + return err + } + + dealCid, err := client.ClientImport(ctx, api.FileRef{Path: dealFile.Name(), IsCAR: false}) + if err != nil { + return err + } + + t.RecordMessage("deal %d file cid: %s", i, dealCid) + + data = append(data, dealData) + files = append(files, dealFile) + cids = append(cids, dealCid) + } + + concurrentDeals := true + if t.StringParam("deal_mode") == "serial" { + concurrentDeals = false + } + + t.RecordMessage("starting storage deals") + if concurrentDeals { + + var wg1 sync.WaitGroup + for i := 0; i < deals; i++ { + wg1.Add(1) + go func(i int) { + defer wg1.Done() + deal := startDeal(ctx, minerAddr.ActorAddr, client, cids[i]) + t.RecordMessage("started storage deal %d -> %s", i, deal) + time.Sleep(2 * time.Second) + t.RecordMessage("waiting for deal %d to be sealed", i) + waitDealSealed(t, ctx, client, deal) + }(i) + } + t.RecordMessage("waiting for all deals to be sealed") + wg1.Wait() + t.RecordMessage("all deals sealed; starting retrieval") + + var wg2 sync.WaitGroup + for i := 0; i < deals; i++ { + wg2.Add(1) + go func(i int) { + defer wg2.Done() + t.RecordMessage("retrieving data for deal %d", i) + retrieveData(t, ctx, client, cids[i], true, data[i]) + t.RecordMessage("retrieved data for deal %d", i) + }(i) + } + t.RecordMessage("waiting for all retrieval deals to complete") + wg2.Wait() + t.RecordMessage("all retrieval deals successful") + + } else { + + for i := 0; i < deals; i++ { + deal := startDeal(ctx, minerAddr.ActorAddr, client, cids[i]) + t.RecordMessage("started storage deal %s", deal) + time.Sleep(2 * time.Second) + t.RecordMessage("waiting for deal to be sealed") + waitDealSealed(t, ctx, client, deal) + } + + for i := 0; i < deals; i++ { + t.RecordMessage("retrieving data for deal %d", i) + retrieveData(t, ctx, client, cids[i], true, data[i]) + t.RecordMessage("retrieved data for deal %d", i) + } + } + + t.SyncClient.MustSignalEntry(ctx, stateStopMining) + t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) + + return nil +} diff --git a/lotus-soup/main.go b/lotus-soup/main.go index e0906be1e..1d079536a 100644 --- a/lotus-soup/main.go +++ b/lotus-soup/main.go @@ -8,7 +8,8 @@ import ( ) var testplans = map[string]interface{}{ - "lotus-baseline": doRun(baselineRoles), + "lotus-baseline": doRun(baselineRoles), + "lotus-deal-stress-test": doRun(dealStressTestRoles), } func main() {