deal stress test
This commit is contained in:
parent
62f63a29b5
commit
36f82ab9fe
143
lotus-soup/deal_stresstest.go
Normal file
143
lotus-soup/deal_stresstest.go
Normal file
@ -0,0 +1,143 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
var dealStressTestRoles = map[string]func(*TestEnvironment) error{
|
||||
"bootstrapper": runBootstrapper,
|
||||
"miner": runMiner,
|
||||
"client": runDealStressTestClient,
|
||||
"drand": runDrandNode,
|
||||
"pubsub-tracer": runPubsubTracer,
|
||||
}
|
||||
|
||||
func runDealStressTestClient(t *TestEnvironment) error {
|
||||
t.RecordMessage("running client")
|
||||
cl, err := prepareClient(t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.RecordMessage("got %v miner addrs", len(addrs))
|
||||
|
||||
client := cl.fullApi
|
||||
|
||||
// select a random miner
|
||||
minerAddr := addrs[rand.Intn(len(addrs))]
|
||||
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// prepare a number of concurrent data points
|
||||
deals := t.IntParam("deals")
|
||||
data := make([][]byte, 0, deals)
|
||||
files := make([]*os.File, 0, deals)
|
||||
cids := make([]cid.Cid, 0, deals)
|
||||
rng := rand.NewSource(time.Now().UnixNano())
|
||||
|
||||
for i := 0; i < deals; i++ {
|
||||
dealData := make([]byte, 1600)
|
||||
rand.New(rng).Read(dealData)
|
||||
|
||||
dealFile, err := ioutil.TempFile("/tmp", "data")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.Remove(dealFile.Name())
|
||||
|
||||
_, err = dealFile.Write(dealData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dealCid, err := client.ClientImport(ctx, api.FileRef{Path: dealFile.Name(), IsCAR: false})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.RecordMessage("deal %d file cid: %s", i, dealCid)
|
||||
|
||||
data = append(data, dealData)
|
||||
files = append(files, dealFile)
|
||||
cids = append(cids, dealCid)
|
||||
}
|
||||
|
||||
concurrentDeals := true
|
||||
if t.StringParam("deal_mode") == "serial" {
|
||||
concurrentDeals = false
|
||||
}
|
||||
|
||||
t.RecordMessage("starting storage deals")
|
||||
if concurrentDeals {
|
||||
|
||||
var wg1 sync.WaitGroup
|
||||
for i := 0; i < deals; i++ {
|
||||
wg1.Add(1)
|
||||
go func(i int) {
|
||||
defer wg1.Done()
|
||||
deal := startDeal(ctx, minerAddr.ActorAddr, client, cids[i])
|
||||
t.RecordMessage("started storage deal %d -> %s", i, deal)
|
||||
time.Sleep(2 * time.Second)
|
||||
t.RecordMessage("waiting for deal %d to be sealed", i)
|
||||
waitDealSealed(t, ctx, client, deal)
|
||||
}(i)
|
||||
}
|
||||
t.RecordMessage("waiting for all deals to be sealed")
|
||||
wg1.Wait()
|
||||
t.RecordMessage("all deals sealed; starting retrieval")
|
||||
|
||||
var wg2 sync.WaitGroup
|
||||
for i := 0; i < deals; i++ {
|
||||
wg2.Add(1)
|
||||
go func(i int) {
|
||||
defer wg2.Done()
|
||||
t.RecordMessage("retrieving data for deal %d", i)
|
||||
retrieveData(t, ctx, client, cids[i], true, data[i])
|
||||
t.RecordMessage("retrieved data for deal %d", i)
|
||||
}(i)
|
||||
}
|
||||
t.RecordMessage("waiting for all retrieval deals to complete")
|
||||
wg2.Wait()
|
||||
t.RecordMessage("all retrieval deals successful")
|
||||
|
||||
} else {
|
||||
|
||||
for i := 0; i < deals; i++ {
|
||||
deal := startDeal(ctx, minerAddr.ActorAddr, client, cids[i])
|
||||
t.RecordMessage("started storage deal %s", deal)
|
||||
time.Sleep(2 * time.Second)
|
||||
t.RecordMessage("waiting for deal to be sealed")
|
||||
waitDealSealed(t, ctx, client, deal)
|
||||
}
|
||||
|
||||
for i := 0; i < deals; i++ {
|
||||
t.RecordMessage("retrieving data for deal %d", i)
|
||||
retrieveData(t, ctx, client, cids[i], true, data[i])
|
||||
t.RecordMessage("retrieved data for deal %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
t.SyncClient.MustSignalEntry(ctx, stateStopMining)
|
||||
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -8,7 +8,8 @@ import (
|
||||
)
|
||||
|
||||
var testplans = map[string]interface{}{
|
||||
"lotus-baseline": doRun(baselineRoles),
|
||||
"lotus-baseline": doRun(baselineRoles),
|
||||
"lotus-deal-stress-test": doRun(dealStressTestRoles),
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user