package main import ( "context" "fmt" "io/ioutil" "math/rand" "os" "sync" "time" "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/testplans/lotus-soup/testkit" ) func dealsStress(t *testkit.TestEnvironment) error { // Dispatch/forward non-client roles to defaults. if t.Role != "client" { return testkit.HandleDefaultRole(t) } t.RecordMessage("running client") cl, err := testkit.PrepareClient(t) if err != nil { return err } ctx := context.Background() client := cl.FullApi // select a random miner minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))] if err := client.NetConnect(ctx, minerAddr.MinerNetAddrs); err != nil { return err } t.RecordMessage("selected %s as the miner", minerAddr.MinerActorAddr) time.Sleep(12 * time.Second) // prepare a number of concurrent data points deals := t.IntParam("deals") data := make([][]byte, 0, deals) files := make([]*os.File, 0, deals) cids := make([]cid.Cid, 0, deals) rng := rand.NewSource(time.Now().UnixNano()) for i := 0; i < deals; i++ { dealData := make([]byte, 1600) rand.New(rng).Read(dealData) dealFile, err := ioutil.TempFile("/tmp", "data") if err != nil { return err } defer os.Remove(dealFile.Name()) _, err = dealFile.Write(dealData) if err != nil { return err } dealCid, err := client.ClientImport(ctx, api.FileRef{Path: dealFile.Name(), IsCAR: false}) if err != nil { return err } t.RecordMessage("deal %d file cid: %s", i, dealCid) data = append(data, dealData) files = append(files, dealFile) cids = append(cids, dealCid.Root) } concurrentDeals := true if t.StringParam("deal_mode") == "serial" { concurrentDeals = false } // this to avoid failure to get block time.Sleep(2 * time.Second) t.RecordMessage("starting storage deals") if concurrentDeals { var wg1 sync.WaitGroup for i := 0; i < deals; i++ { wg1.Add(1) go func(i int) { defer wg1.Done() t1 := time.Now() deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i], false) t.RecordMessage("started storage deal %d -> %s", i, deal) time.Sleep(2 * time.Second) t.RecordMessage("waiting for deal %d to be sealed", i) testkit.WaitDealSealed(t, ctx, client, deal) t.D().ResettingHistogram(fmt.Sprintf("deal.sealed,miner=%s", minerAddr.MinerActorAddr)).Update(int64(time.Since(t1))) }(i) } t.RecordMessage("waiting for all deals to be sealed") wg1.Wait() t.RecordMessage("all deals sealed; starting retrieval") var wg2 sync.WaitGroup for i := 0; i < deals; i++ { wg2.Add(1) go func(i int) { defer wg2.Done() t.RecordMessage("retrieving data for deal %d", i) t1 := time.Now() _ = testkit.RetrieveData(t, ctx, client, cids[i], nil, true, data[i]) t.RecordMessage("retrieved data for deal %d", i) t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1))) }(i) } t.RecordMessage("waiting for all retrieval deals to complete") wg2.Wait() t.RecordMessage("all retrieval deals successful") } else { for i := 0; i < deals; i++ { deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i], false) t.RecordMessage("started storage deal %d -> %s", i, deal) time.Sleep(2 * time.Second) t.RecordMessage("waiting for deal %d to be sealed", i) testkit.WaitDealSealed(t, ctx, client, deal) } for i := 0; i < deals; i++ { t.RecordMessage("retrieving data for deal %d", i) _ = testkit.RetrieveData(t, ctx, client, cids[i], nil, true, data[i]) t.RecordMessage("retrieved data for deal %d", i) } } t.SyncClient.MustSignalEntry(ctx, testkit.StateStopMining) t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount) time.Sleep(15 * time.Second) // wait for metrics to be emitted return nil }