147 lines
3.7 KiB
Go
147 lines
3.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/ipfs/go-cid"
|
|
|
|
"github.com/filecoin-project/oni/lotus-soup/testkit"
|
|
)
|
|
|
|
func dealsStress(t *testkit.TestEnvironment) error {
|
|
// Dispatch/forward non-client roles to defaults.
|
|
if t.Role != "client" {
|
|
return testkit.HandleDefaultRole(t)
|
|
}
|
|
|
|
t.RecordMessage("running client")
|
|
|
|
cl, err := testkit.PrepareClient(t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx := context.Background()
|
|
client := cl.FullApi
|
|
|
|
// select a random miner
|
|
minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))]
|
|
if err := client.NetConnect(ctx, minerAddr.MinerNetAddrs); err != nil {
|
|
return err
|
|
}
|
|
|
|
t.RecordMessage("selected %s as the miner", minerAddr.MinerActorAddr)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
|
|
// prepare a number of concurrent data points
|
|
deals := t.IntParam("deals")
|
|
data := make([][]byte, 0, deals)
|
|
files := make([]*os.File, 0, deals)
|
|
cids := make([]cid.Cid, 0, deals)
|
|
rng := rand.NewSource(time.Now().UnixNano())
|
|
|
|
for i := 0; i < deals; i++ {
|
|
dealData := make([]byte, 1600)
|
|
rand.New(rng).Read(dealData)
|
|
|
|
dealFile, err := ioutil.TempFile("/tmp", "data")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer os.Remove(dealFile.Name())
|
|
|
|
_, err = dealFile.Write(dealData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dealCid, err := client.ClientImport(ctx, api.FileRef{Path: dealFile.Name(), IsCAR: false})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.RecordMessage("deal %d file cid: %s", i, dealCid)
|
|
|
|
data = append(data, dealData)
|
|
files = append(files, dealFile)
|
|
cids = append(cids, dealCid.Root)
|
|
}
|
|
|
|
concurrentDeals := true
|
|
if t.StringParam("deal_mode") == "serial" {
|
|
concurrentDeals = false
|
|
}
|
|
|
|
// this to avoid failure to get block
|
|
time.Sleep(2 * time.Second)
|
|
|
|
t.RecordMessage("starting storage deals")
|
|
if concurrentDeals {
|
|
|
|
var wg1 sync.WaitGroup
|
|
for i := 0; i < deals; i++ {
|
|
wg1.Add(1)
|
|
go func(i int) {
|
|
defer wg1.Done()
|
|
t1 := time.Now()
|
|
deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i])
|
|
t.RecordMessage("started storage deal %d -> %s", i, deal)
|
|
time.Sleep(2 * time.Second)
|
|
t.RecordMessage("waiting for deal %d to be sealed", i)
|
|
testkit.WaitDealSealed(t, ctx, client, deal)
|
|
t.D().ResettingHistogram(fmt.Sprintf("deal.sealed,miner=%s", minerAddr.MinerActorAddr)).Update(int64(time.Since(t1)))
|
|
}(i)
|
|
}
|
|
t.RecordMessage("waiting for all deals to be sealed")
|
|
wg1.Wait()
|
|
t.RecordMessage("all deals sealed; starting retrieval")
|
|
|
|
var wg2 sync.WaitGroup
|
|
for i := 0; i < deals; i++ {
|
|
wg2.Add(1)
|
|
go func(i int) {
|
|
defer wg2.Done()
|
|
t.RecordMessage("retrieving data for deal %d", i)
|
|
t1 := time.Now()
|
|
testkit.RetrieveData(t, ctx, client, cids[i], true, data[i])
|
|
t.RecordMessage("retrieved data for deal %d", i)
|
|
t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
|
|
}(i)
|
|
}
|
|
t.RecordMessage("waiting for all retrieval deals to complete")
|
|
wg2.Wait()
|
|
t.RecordMessage("all retrieval deals successful")
|
|
|
|
} else {
|
|
|
|
for i := 0; i < deals; i++ {
|
|
deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i])
|
|
t.RecordMessage("started storage deal %d -> %s", i, deal)
|
|
time.Sleep(2 * time.Second)
|
|
t.RecordMessage("waiting for deal %d to be sealed", i)
|
|
testkit.WaitDealSealed(t, ctx, client, deal)
|
|
}
|
|
|
|
for i := 0; i < deals; i++ {
|
|
t.RecordMessage("retrieving data for deal %d", i)
|
|
testkit.RetrieveData(t, ctx, client, cids[i], true, data[i])
|
|
t.RecordMessage("retrieved data for deal %d", i)
|
|
}
|
|
}
|
|
|
|
t.SyncClient.MustSignalEntry(ctx, testkit.StateStopMining)
|
|
t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount)
|
|
|
|
time.Sleep(15 * time.Second) // wait for metrics to be emitted
|
|
|
|
return nil
|
|
}
|