lotus/lotus-soup/baseline.go

254 lines
6.8 KiB
Go
Raw Normal View History

package main
import (
2020-06-24 13:10:01 +00:00
"bytes"
"context"
"fmt"
2020-06-24 13:16:10 +00:00
"io/ioutil"
2020-06-24 13:10:01 +00:00
"math/rand"
2020-06-24 13:16:10 +00:00
"os"
"path/filepath"
2020-06-24 13:10:01 +00:00
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
2020-06-24 13:10:01 +00:00
"github.com/ipfs/go-cid"
2020-06-24 13:16:10 +00:00
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car"
)
// This is the basline test; Filecoin 101.
//
// A network with a bootstrapper, a number of miners, and a number of clients/full nodes
// is constructed and connected through the bootstrapper.
// Some funds are allocated to each node and a number of sectors are presealed in the genesis block.
//
// The test plan:
// One or more clients store content to one or more miners, testing storage deals.
// The plan ensures that the storage deals hit the blockchain and measure the time it took.
// Verification: one or more clients retrieve and verify the hashes of stored content.
// The plan ensures that all (previously) published content can be correctly retrieved
// and measures the time it took.
//
// Preparation of the genesis block: this is the responsibility of the bootstrapper.
// In order to compute the genesis block, we need to collect identities and presealed
// sectors from each node.
// The we create a genesis block that allocates some funds to each node and collects
// the presealed sectors.
var baselineRoles = map[string]func(*TestEnvironment) error{
2020-06-26 09:04:20 +00:00
"bootstrapper": runBootstrapper,
"miner": runMiner,
"client": runBaselineClient,
"drand": runDrandNode,
"pubsub-tracer": runPubsubTracer,
}
func runBaselineClient(t *TestEnvironment) error {
t.RecordMessage("running client")
2020-06-24 13:10:01 +00:00
cl, err := prepareClient(t)
if err != nil {
return err
}
ctx := context.Background()
addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners"))
if err != nil {
return err
}
t.RecordMessage("got %v miner addrs", len(addrs))
client := cl.fullApi
// select a random miner
minerAddr := addrs[rand.Intn(len(addrs))]
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
2020-06-24 12:40:29 +00:00
return err
}
2020-06-26 16:56:09 +00:00
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1)
2020-06-24 12:40:29 +00:00
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
2020-06-24 12:40:29 +00:00
time.Sleep(2 * time.Second)
2020-06-24 14:46:12 +00:00
2020-06-24 13:29:27 +00:00
// generate random data
2020-06-24 13:10:01 +00:00
data := make([]byte, 1600)
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
file, err := ioutil.TempFile("/tmp", "data")
if err != nil {
return err
}
defer os.Remove(file.Name())
_, err = file.Write(data)
if err != nil {
return err
}
fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false})
2020-06-24 13:10:01 +00:00
if err != nil {
return err
}
t.RecordMessage("file cid: %s", fcid)
// start deal
2020-06-26 12:09:58 +00:00
t1 := time.Now()
deal := startDeal(ctx, minerAddr.ActorAddr, client, fcid)
2020-06-24 13:29:27 +00:00
t.RecordMessage("started deal: %s", deal)
2020-06-24 13:10:01 +00:00
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(2 * time.Second)
2020-06-24 13:10:01 +00:00
t.RecordMessage("waiting for deal to be sealed")
waitDealSealed(t, ctx, client, deal)
2020-06-26 12:09:58 +00:00
t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1)))
2020-06-24 13:16:10 +00:00
carExport := true
2020-06-24 13:29:27 +00:00
t.RecordMessage("trying to retrieve %s", fcid)
retrieveData(t, ctx, err, client, fcid, carExport, data)
2020-06-26 12:09:58 +00:00
t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
t.SyncClient.MustSignalEntry(ctx, stateStopMining)
2020-06-24 13:10:01 +00:00
2020-06-26 13:24:01 +00:00
time.Sleep(10 * time.Second) // wait for metrics to be emitted
// TODO broadcast published content CIDs to other clients
2020-06-24 13:10:01 +00:00
// TODO select a random piece of content published by some other client and retrieve it
2020-06-24 12:29:04 +00:00
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
return nil
}
func startDeal(ctx context.Context, minerActorAddr address.Address, client api.FullNode, fcid cid.Cid) *cid.Cid {
2020-06-24 13:40:30 +00:00
addr, err := client.WalletDefaultAddress(ctx)
if err != nil {
panic(err)
}
2020-06-24 13:10:01 +00:00
deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{
Data: &storagemarket.DataRef{Root: fcid},
Wallet: addr,
Miner: minerActorAddr,
EpochPrice: types.NewInt(1000000),
2020-06-24 15:39:37 +00:00
MinBlocksDuration: 1000,
2020-06-24 13:10:01 +00:00
})
if err != nil {
panic(err)
}
return deal
}
2020-06-24 13:16:10 +00:00
func waitDealSealed(t *TestEnvironment, ctx context.Context, client api.FullNode, deal *cid.Cid) {
2020-06-24 13:16:10 +00:00
loop:
for {
di, err := client.ClientGetDealInfo(ctx, *deal)
if err != nil {
panic(err)
}
switch di.State {
case storagemarket.StorageDealProposalRejected:
panic("deal rejected")
case storagemarket.StorageDealFailing:
panic("deal failed")
case storagemarket.StorageDealError:
panic(fmt.Sprintf("deal errored %s", di.Message))
case storagemarket.StorageDealActive:
t.RecordMessage("completed deal: %s", di)
2020-06-24 13:16:10 +00:00
break loop
}
t.RecordMessage("deal state: %s", storagemarket.DealStates[di.State])
2020-06-24 14:12:10 +00:00
time.Sleep(2 * time.Second)
2020-06-24 13:16:10 +00:00
}
}
func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) {
2020-06-26 12:09:58 +00:00
t1 := time.Now()
2020-06-24 13:16:10 +00:00
offers, err := client.ClientFindData(ctx, fcid)
if err != nil {
panic(err)
}
2020-06-26 12:09:58 +00:00
for _, o := range offers {
2020-06-26 16:56:09 +00:00
t.D().Counter(fmt.Sprintf("find-data.offer,miner=%s", o.Miner)).Inc(1)
2020-06-26 12:09:58 +00:00
}
t.D().ResettingHistogram("find-data").Update(int64(time.Since(t1)))
2020-06-24 13:16:10 +00:00
if len(offers) < 1 {
panic("no offers")
}
rpath, err := ioutil.TempDir("", "lotus-retrieve-test-")
if err != nil {
panic(err)
}
defer os.RemoveAll(rpath)
caddr, err := client.WalletDefaultAddress(ctx)
if err != nil {
panic(err)
}
ref := &api.FileRef{
Path: filepath.Join(rpath, "ret"),
IsCAR: carExport,
}
2020-06-26 12:09:58 +00:00
t1 = time.Now()
2020-06-24 13:16:10 +00:00
err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref)
if err != nil {
panic(err)
}
2020-06-26 12:09:58 +00:00
t.D().ResettingHistogram("retrieve-data").Update(int64(time.Since(t1)))
2020-06-24 13:16:10 +00:00
rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret"))
if err != nil {
panic(err)
}
if carExport {
rdata = extractCarData(ctx, rdata, rpath)
}
if !bytes.Equal(rdata, data) {
panic("wrong data retrieved")
}
t.RecordMessage("retrieved successfully")
2020-06-24 13:16:10 +00:00
}
func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte {
bserv := dstest.Bserv()
ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata))
if err != nil {
panic(err)
}
b, err := bserv.GetBlock(ctx, ch.Roots[0])
if err != nil {
panic(err)
}
nd, err := ipld.Decode(b)
if err != nil {
panic(err)
}
dserv := dag.NewDAGService(bserv)
fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd)
if err != nil {
panic(err)
}
outPath := filepath.Join(rpath, "retLoadedCAR")
if err := files.WriteTo(fil, outPath); err != nil {
panic(err)
}
rdata, err = ioutil.ReadFile(outPath)
if err != nil {
panic(err)
}
return rdata
}