itests: create deal harness.

This commit is contained in:
Raúl Kripalani 2021-05-20 16:12:42 +01:00
parent 1902c4c687
commit 25daa0c8e4
8 changed files with 224 additions and 172 deletions

View File

@ -1,6 +1,7 @@
package itests package itests
import ( import (
"context"
"testing" "testing"
"time" "time"
@ -57,16 +58,20 @@ func TestBatchDealInput(t *testing.T) {
n, sn := kit.MockMinerBuilder(t, kit.OneFull, minerDef) n, sn := kit.MockMinerBuilder(t, kit.OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]
s := kit.ConnectAndStartMining(t, blockTime, client, miner)
defer s.BlockMiner.Stop() blockMiner := kit.ConnectAndStartMining(t, blockTime, miner, client)
t.Cleanup(blockMiner.Stop)
dh := kit.NewDealHarness(t, client, miner)
ctx := context.Background()
// Starts a deal and waits until it's published // Starts a deal and waits until it's published
runDealTillSeal := func(rseed int) { runDealTillSeal := func(rseed int) {
res, _, err := kit.CreateClientFile(s.Ctx, s.Client, rseed) res, _, err := kit.CreateImportFile(ctx, client, rseed)
require.NoError(t, err) require.NoError(t, err)
dc := kit.StartDeal(t, s.Ctx, s.Miner, s.Client, res.Root, false, dealStartEpoch) deal := dh.StartDeal(ctx, res.Root, false, dealStartEpoch)
kit.WaitDealSealed(t, s.Ctx, s.Miner, s.Client, dc, false) dh.WaitDealSealed(ctx, deal, false)
} }
// Run maxDealsPerMsg+1 deals in parallel // Run maxDealsPerMsg+1 deals in parallel
@ -84,7 +89,7 @@ func TestBatchDealInput(t *testing.T) {
<-done <-done
} }
sl, err := sn[0].SectorsList(s.Ctx) sl, err := sn[0].SectorsList(ctx)
require.NoError(t, err) require.NoError(t, err)
require.GreaterOrEqual(t, len(sl), 4) require.GreaterOrEqual(t, len(sl), 4)
require.LessOrEqual(t, len(sl), 5) require.LessOrEqual(t, len(sl), 5)

View File

@ -92,7 +92,9 @@ func runTestCCUpgrade(t *testing.T, b kit.APIBuilder, blocktime time.Duration, u
t.Fatal(err) t.Fatal(err)
} }
kit.MakeDeal(t, ctx, 6, client, miner, false, false, 0) dh := kit.NewDealHarness(t, client, miner)
dh.MakeFullDeal(context.Background(), 6, false, false, 0)
// Validate upgrade // Validate upgrade

View File

@ -79,6 +79,8 @@ func TestAPIDealFlowReal(t *testing.T) {
} }
func TestPublishDealsBatching(t *testing.T) { func TestPublishDealsBatching(t *testing.T) {
ctx := context.Background()
kit.QuietMiningLogs() kit.QuietMiningLogs()
b := kit.MockMinerBuilder b := kit.MockMinerBuilder
@ -104,18 +106,20 @@ func TestPublishDealsBatching(t *testing.T) {
n, sn := b(t, kit.OneFull, minerDef) n, sn := b(t, kit.OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]
s := kit.ConnectAndStartMining(t, blocktime, client, miner)
defer s.BlockMiner.Stop() kit.ConnectAndStartMining(t, blocktime, miner, client)
dh := kit.NewDealHarness(t, client, miner)
// Starts a deal and waits until it's published // Starts a deal and waits until it's published
runDealTillPublish := func(rseed int) { runDealTillPublish := func(rseed int) {
res, _, err := kit.CreateClientFile(s.Ctx, s.Client, rseed) res, _, err := kit.CreateImportFile(ctx, client, rseed)
require.NoError(t, err) require.NoError(t, err)
upds, err := client.ClientGetDealUpdates(s.Ctx) upds, err := client.ClientGetDealUpdates(ctx)
require.NoError(t, err) require.NoError(t, err)
kit.StartDeal(t, s.Ctx, s.Miner, s.Client, res.Root, false, startEpoch) dh.StartDeal(ctx, res.Root, false, startEpoch)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second) time.Sleep(time.Second)
@ -147,11 +151,11 @@ func TestPublishDealsBatching(t *testing.T) {
} }
// Expect a single PublishStorageDeals message that includes the first two deals // Expect a single PublishStorageDeals message that includes the first two deals
msgCids, err := s.Client.StateListMessages(s.Ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1) msgCids, err := client.StateListMessages(ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1)
require.NoError(t, err) require.NoError(t, err)
count := 0 count := 0
for _, msgCid := range msgCids { for _, msgCid := range msgCids {
msg, err := s.Client.ChainGetMessage(s.Ctx, msgCid) msg, err := client.ChainGetMessage(ctx, msgCid)
require.NoError(t, err) require.NoError(t, err)
if msg.Method == market.Methods.PublishStorageDeals { if msg.Method == market.Methods.PublishStorageDeals {
@ -187,13 +191,15 @@ func TestDealMining(t *testing.T) {
blocktime := 50 * time.Millisecond blocktime := 50 * time.Millisecond
ctx := context.Background() ctx := context.Background()
n, sn := b(t, kit.OneFull, []kit.StorageMiner{ fulls, miners := b(t,
{Full: 0, Preseal: kit.PresealGenesis}, kit.OneFull,
{Full: 0, Preseal: 0}, // TODO: Add support for miners on non-first full node []kit.StorageMiner{
}) {Full: 0, Preseal: kit.PresealGenesis},
client := n[0].FullNode.(*impl.FullNodeAPI) {Full: 0, Preseal: 0}, // TODO: Add support for miners on non-first full node
provider := sn[1] })
genesisMiner := sn[0] client := fulls[0].FullNode.(*impl.FullNodeAPI)
genesisMiner := miners[0]
provider := miners[1]
addrinfo, err := client.NetAddrsListen(ctx) addrinfo, err := client.NetAddrsListen(ctx)
if err != nil { if err != nil {
@ -225,7 +231,7 @@ func TestDealMining(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
minedTwo := make(chan struct{}) minedTwo := make(chan struct{})
m2addr, err := sn[1].ActorAddress(context.TODO()) m2addr, err := miners[1].ActorAddress(context.TODO())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -244,11 +250,11 @@ func TestDealMining(t *testing.T) {
wait <- n wait <- n
} }
if err := sn[0].MineOne(ctx, miner.MineReq{Done: mdone}); err != nil { if err := miners[0].MineOne(ctx, miner.MineReq{Done: mdone}); err != nil {
t.Error(err) t.Error(err)
} }
if err := sn[1].MineOne(ctx, miner.MineReq{Done: mdone}); err != nil { if err := miners[1].MineOne(ctx, miner.MineReq{Done: mdone}); err != nil {
t.Error(err) t.Error(err)
} }
@ -262,7 +268,7 @@ func TestDealMining(t *testing.T) {
} }
var nodeOneMined bool var nodeOneMined bool
for _, node := range sn { for _, node := range miners {
mb, err := node.MiningBase(ctx) mb, err := node.MiningBase(ctx)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -286,12 +292,14 @@ func TestDealMining(t *testing.T) {
} }
}() }()
deal := kit.StartDeal(t, ctx, provider, client, fcid, false, 0) dh := kit.NewDealHarness(t, client, provider)
deal := dh.StartDeal(ctx, fcid, false, 0)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second) time.Sleep(time.Second)
kit.WaitDealSealed(t, ctx, provider, client, deal, false) dh.WaitDealSealed(ctx, deal, false)
<-minedTwo <-minedTwo
@ -301,51 +309,68 @@ func TestDealMining(t *testing.T) {
} }
func runFullDealCycles(t *testing.T, n int, b kit.APIBuilder, blocktime time.Duration, carExport, fastRet bool, startEpoch abi.ChainEpoch) { func runFullDealCycles(t *testing.T, n int, b kit.APIBuilder, blocktime time.Duration, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
s := kit.SetupOneClientOneMiner(t, b, blocktime) fulls, miners := b(t, kit.OneFull, kit.OneMiner)
defer s.BlockMiner.Stop() client, miner := fulls[0].FullNode.(*impl.FullNodeAPI), miners[0]
kit.ConnectAndStartMining(t, blocktime, miner, client)
dh := kit.NewDealHarness(t, client, miner)
baseseed := 6 baseseed := 6
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
kit.MakeDeal(t, s.Ctx, baseseed+i, s.Client, s.Miner, carExport, fastRet, startEpoch) dh.MakeFullDeal(context.Background(), baseseed+i, carExport, fastRet, startEpoch)
} }
} }
func runFastRetrievalDealFlowT(t *testing.T, b kit.APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { func runFastRetrievalDealFlowT(t *testing.T, b kit.APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := kit.SetupOneClientOneMiner(t, b, blocktime) ctx := context.Background()
defer s.BlockMiner.Stop()
fulls, miners := b(t, kit.OneFull, kit.OneMiner)
client, miner := fulls[0].FullNode.(*impl.FullNodeAPI), miners[0]
kit.ConnectAndStartMining(t, blocktime, miner, client)
dh := kit.NewDealHarness(t, client, miner)
data := make([]byte, 1600) data := make([]byte, 1600)
rand.New(rand.NewSource(int64(8))).Read(data) rand.New(rand.NewSource(int64(8))).Read(data)
r := bytes.NewReader(data) r := bytes.NewReader(data)
fcid, err := s.Client.ClientImportLocal(s.Ctx, r) fcid, err := client.ClientImportLocal(ctx, r)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
fmt.Println("FILE CID: ", fcid) fmt.Println("FILE CID: ", fcid)
deal := kit.StartDeal(t, s.Ctx, s.Miner, s.Client, fcid, true, startEpoch) deal := dh.StartDeal(ctx, fcid, true, startEpoch)
dh.WaitDealPublished(ctx, deal)
kit.WaitDealPublished(t, s.Ctx, s.Miner, deal)
fmt.Println("deal published, retrieving") fmt.Println("deal published, retrieving")
// Retrieval // Retrieval
info, err := s.Client.ClientGetDealInfo(s.Ctx, *deal) info, err := client.ClientGetDealInfo(ctx, *deal)
require.NoError(t, err) require.NoError(t, err)
kit.TestRetrieval(t, s.Ctx, s.Client, fcid, &info.PieceCID, false, data) dh.TestRetrieval(ctx, fcid, &info.PieceCID, false, data)
} }
func runSecondDealRetrievalTest(t *testing.T, b kit.APIBuilder, blocktime time.Duration) { func runSecondDealRetrievalTest(t *testing.T, b kit.APIBuilder, blocktime time.Duration) {
s := kit.SetupOneClientOneMiner(t, b, blocktime) ctx := context.Background()
defer s.BlockMiner.Stop()
fulls, miners := b(t, kit.OneFull, kit.OneMiner)
client, miner := fulls[0].FullNode.(*impl.FullNodeAPI), miners[0]
kit.ConnectAndStartMining(t, blocktime, miner, client)
dh := kit.NewDealHarness(t, client, miner)
{ {
data1 := make([]byte, 800) data1 := make([]byte, 800)
rand.New(rand.NewSource(int64(3))).Read(data1) rand.New(rand.NewSource(int64(3))).Read(data1)
r := bytes.NewReader(data1) r := bytes.NewReader(data1)
fcid1, err := s.Client.ClientImportLocal(s.Ctx, r) fcid1, err := client.ClientImportLocal(ctx, r)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -354,44 +379,50 @@ func runSecondDealRetrievalTest(t *testing.T, b kit.APIBuilder, blocktime time.D
rand.New(rand.NewSource(int64(9))).Read(data2) rand.New(rand.NewSource(int64(9))).Read(data2)
r2 := bytes.NewReader(data2) r2 := bytes.NewReader(data2)
fcid2, err := s.Client.ClientImportLocal(s.Ctx, r2) fcid2, err := client.ClientImportLocal(ctx, r2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
deal1 := kit.StartDeal(t, s.Ctx, s.Miner, s.Client, fcid1, true, 0) deal1 := dh.StartDeal(ctx, fcid1, true, 0)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second) time.Sleep(time.Second)
kit.WaitDealSealed(t, s.Ctx, s.Miner, s.Client, deal1, true) dh.WaitDealSealed(ctx, deal1, true)
deal2 := kit.StartDeal(t, s.Ctx, s.Miner, s.Client, fcid2, true, 0) deal2 := dh.StartDeal(ctx, fcid2, true, 0)
time.Sleep(time.Second) time.Sleep(time.Second)
kit.WaitDealSealed(t, s.Ctx, s.Miner, s.Client, deal2, false) dh.WaitDealSealed(ctx, deal2, false)
// Retrieval // Retrieval
info, err := s.Client.ClientGetDealInfo(s.Ctx, *deal2) info, err := client.ClientGetDealInfo(ctx, *deal2)
require.NoError(t, err) require.NoError(t, err)
rf, _ := s.Miner.SectorsRefs(s.Ctx) rf, _ := miner.SectorsRefs(ctx)
fmt.Printf("refs: %+v\n", rf) fmt.Printf("refs: %+v\n", rf)
kit.TestRetrieval(t, s.Ctx, s.Client, fcid2, &info.PieceCID, false, data2) dh.TestRetrieval(ctx, fcid2, &info.PieceCID, false, data2)
} }
} }
func runZeroPricePerByteRetrievalDealFlow(t *testing.T, b kit.APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { func runZeroPricePerByteRetrievalDealFlow(t *testing.T, b kit.APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := kit.SetupOneClientOneMiner(t, b, blocktime) ctx := context.Background()
defer s.BlockMiner.Stop()
fulls, miners := b(t, kit.OneFull, kit.OneMiner)
client, miner := fulls[0].FullNode.(*impl.FullNodeAPI), miners[0]
kit.ConnectAndStartMining(t, blocktime, miner, client)
dh := kit.NewDealHarness(t, client, miner)
// Set price-per-byte to zero // Set price-per-byte to zero
ask, err := s.Miner.MarketGetRetrievalAsk(s.Ctx) ask, err := miner.MarketGetRetrievalAsk(ctx)
require.NoError(t, err) require.NoError(t, err)
ask.PricePerByte = abi.NewTokenAmount(0) ask.PricePerByte = abi.NewTokenAmount(0)
err = s.Miner.MarketSetRetrievalAsk(s.Ctx, ask) err = miner.MarketSetRetrievalAsk(ctx, ask)
require.NoError(t, err) require.NoError(t, err)
kit.MakeDeal(t, s.Ctx, 6, s.Client, s.Miner, false, false, startEpoch) dh.MakeFullDeal(ctx, 6, false, false, startEpoch)
} }

View File

@ -206,7 +206,9 @@ func TestDealFlow(t *testing.T) {
// a deal start epoch that is guaranteed to be far enough in the future // a deal start epoch that is guaranteed to be far enough in the future
// so that the deal starts sealing in time // so that the deal starts sealing in time
dealStartEpoch := abi.ChainEpoch(2 << 12) dealStartEpoch := abi.ChainEpoch(2 << 12)
kit.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false, dealStartEpoch)
dh := kit.NewDealHarness(t, nodes.lite, nodes.miner)
dh.MakeFullDeal(ctx, 6, false, false, dealStartEpoch)
} }
func TestCLIDealFlow(t *testing.T) { func TestCLIDealFlow(t *testing.T) {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand"
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
@ -11,6 +12,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/v2/actors/builtin" "github.com/filecoin-project/specs-actors/v2/actors/builtin"
@ -41,7 +43,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode TestFullNode)
// Create a deal (non-interactive) // Create a deal (non-interactive)
// Client deal --start-epoch=<start epoch> <cid> <Miner addr> 1000000attofil <duration> // Client deal --start-epoch=<start epoch> <cid> <Miner addr> 1000000attofil <duration>
res, _, err := CreateClientFile(ctx, clientNode, 1) res, _, err := CreateImportFile(ctx, clientNode, 1)
require.NoError(t, err) require.NoError(t, err)
startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12) startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12)
dataCid := res.Root dataCid := res.Root
@ -57,7 +59,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode TestFullNode)
// <Miner addr> // <Miner addr>
// "no" (verified Client) // "no" (verified Client)
// "yes" (confirm deal) // "yes" (confirm deal)
res, _, err = CreateClientFile(ctx, clientNode, 2) res, _, err = CreateImportFile(ctx, clientNode, 2)
require.NoError(t, err) require.NoError(t, err)
dataCid2 := res.Root dataCid2 := res.Root
duration = fmt.Sprintf("%d", build.MinDealDuration/builtin.EpochsInDay) duration = fmt.Sprintf("%d", build.MinDealDuration/builtin.EpochsInDay)
@ -107,3 +109,25 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode TestFullNode)
fmt.Println("retrieve:\n", out) fmt.Println("retrieve:\n", out)
require.Regexp(t, regexp.MustCompile("Success"), out) require.Regexp(t, regexp.MustCompile("Success"), out)
} }
func CreateImportFile(ctx context.Context, client api.FullNode, rseed int) (*api.ImportRes, []byte, error) {
data := make([]byte, 1600)
rand.New(rand.NewSource(int64(rseed))).Read(data)
dir, err := ioutil.TempDir(os.TempDir(), "test-make-deal-")
if err != nil {
return nil, nil, err
}
path := filepath.Join(dir, "sourcefile.dat")
err = ioutil.WriteFile(path, data, 0644)
if err != nil {
return nil, nil, err
}
res, err := client.ClientImport(ctx, api.FileRef{Path: path})
if err != nil {
return nil, nil, err
}
return res, data, nil
}

View File

@ -5,7 +5,6 @@ import (
"context" "context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand"
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
@ -29,61 +28,54 @@ import (
unixfile "github.com/ipfs/go-unixfs/file" unixfile "github.com/ipfs/go-unixfs/file"
) )
func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestMiner, carExport, fastRet bool, startEpoch abi.ChainEpoch) { type DealHarness struct {
res, data, err := CreateClientFile(ctx, client, rseed) t *testing.T
client api.FullNode
miner TestMiner
}
// NewDealHarness creates a test harness that contains testing utilities for deals.
func NewDealHarness(t *testing.T, client api.FullNode, miner TestMiner) *DealHarness {
return &DealHarness{
t: t,
client: client,
miner: miner,
}
}
func (dh *DealHarness) MakeFullDeal(ctx context.Context, rseed int, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
res, data, err := CreateImportFile(ctx, dh.client, rseed)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
fcid := res.Root fcid := res.Root
fmt.Println("FILE CID: ", fcid) fmt.Println("FILE CID: ", fcid)
deal := StartDeal(t, ctx, miner, client, fcid, fastRet, startEpoch) deal := dh.StartDeal(ctx, fcid, fastRet, startEpoch)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second) time.Sleep(time.Second)
WaitDealSealed(t, ctx, miner, client, deal, false) dh.WaitDealSealed(ctx, deal, false)
// Retrieval // Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal) info, err := dh.client.ClientGetDealInfo(ctx, *deal)
require.NoError(t, err) require.NoError(dh.t, err)
TestRetrieval(t, ctx, client, fcid, &info.PieceCID, carExport, data) dh.TestRetrieval(ctx, fcid, &info.PieceCID, carExport, data)
} }
func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api.ImportRes, []byte, error) { func (dh *DealHarness) StartDeal(ctx context.Context, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid {
data := make([]byte, 1600) maddr, err := dh.miner.ActorAddress(ctx)
rand.New(rand.NewSource(int64(rseed))).Read(data)
dir, err := ioutil.TempDir(os.TempDir(), "test-make-deal-")
if err != nil { if err != nil {
return nil, nil, err dh.t.Fatal(err)
} }
path := filepath.Join(dir, "sourcefile.dat") addr, err := dh.client.WalletDefaultAddress(ctx)
err = ioutil.WriteFile(path, data, 0644)
if err != nil { if err != nil {
return nil, nil, err dh.t.Fatal(err)
} }
deal, err := dh.client.ClientStartDeal(ctx, &api.StartDealParams{
res, err := client.ClientImport(ctx, api.FileRef{Path: path})
if err != nil {
return nil, nil, err
}
return res, data, nil
}
func StartDeal(t *testing.T, ctx context.Context, miner TestMiner, client api.FullNode, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid {
maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
}
addr, err := client.WalletDefaultAddress(ctx)
if err != nil {
t.Fatal(err)
}
deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{
Data: &storagemarket.DataRef{ Data: &storagemarket.DataRef{
TransferType: storagemarket.TTGraphsync, TransferType: storagemarket.TTGraphsync,
Root: fcid, Root: fcid,
@ -96,30 +88,30 @@ func StartDeal(t *testing.T, ctx context.Context, miner TestMiner, client api.Fu
FastRetrieval: fastRet, FastRetrieval: fastRet,
}) })
if err != nil { if err != nil {
t.Fatalf("%+v", err) dh.t.Fatalf("%+v", err)
} }
return deal return deal
} }
func WaitDealSealed(t *testing.T, ctx context.Context, miner TestMiner, client api.FullNode, deal *cid.Cid, noseal bool) { func (dh *DealHarness) WaitDealSealed(ctx context.Context, deal *cid.Cid, noseal bool) {
loop: loop:
for { for {
di, err := client.ClientGetDealInfo(ctx, *deal) di, err := dh.client.ClientGetDealInfo(ctx, *deal)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
switch di.State { switch di.State {
case storagemarket.StorageDealAwaitingPreCommit, storagemarket.StorageDealSealing: case storagemarket.StorageDealAwaitingPreCommit, storagemarket.StorageDealSealing:
if noseal { if noseal {
return return
} }
StartSealingWaiting(t, ctx, miner) dh.StartSealingWaiting(ctx)
case storagemarket.StorageDealProposalRejected: case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected") dh.t.Fatal("deal rejected")
case storagemarket.StorageDealFailing: case storagemarket.StorageDealFailing:
t.Fatal("deal failed") dh.t.Fatal("deal failed")
case storagemarket.StorageDealError: case storagemarket.StorageDealError:
t.Fatal("deal errored", di.Message) dh.t.Fatal("deal errored", di.Message)
case storagemarket.StorageDealActive: case storagemarket.StorageDealActive:
fmt.Println("COMPLETE", di) fmt.Println("COMPLETE", di)
break loop break loop
@ -129,26 +121,26 @@ loop:
} }
} }
func WaitDealPublished(t *testing.T, ctx context.Context, miner TestMiner, deal *cid.Cid) { func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) {
subCtx, cancel := context.WithCancel(ctx) subCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
updates, err := miner.MarketGetDealUpdates(subCtx) updates, err := dh.miner.MarketGetDealUpdates(subCtx)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("context timeout") dh.t.Fatal("context timeout")
case di := <-updates: case di := <-updates:
if deal.Equals(di.ProposalCid) { if deal.Equals(di.ProposalCid) {
switch di.State { switch di.State {
case storagemarket.StorageDealProposalRejected: case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected") dh.t.Fatal("deal rejected")
case storagemarket.StorageDealFailing: case storagemarket.StorageDealFailing:
t.Fatal("deal failed") dh.t.Fatal("deal failed")
case storagemarket.StorageDealError: case storagemarket.StorageDealError:
t.Fatal("deal errored", di.Message) dh.t.Fatal("deal errored", di.Message)
case storagemarket.StorageDealFinalizing, storagemarket.StorageDealAwaitingPreCommit, storagemarket.StorageDealSealing, storagemarket.StorageDealActive: case storagemarket.StorageDealFinalizing, storagemarket.StorageDealAwaitingPreCommit, storagemarket.StorageDealSealing, storagemarket.StorageDealActive:
fmt.Println("COMPLETE", di) fmt.Println("COMPLETE", di)
return return
@ -159,96 +151,96 @@ func WaitDealPublished(t *testing.T, ctx context.Context, miner TestMiner, deal
} }
} }
func StartSealingWaiting(t *testing.T, ctx context.Context, miner TestMiner) { func (dh *DealHarness) StartSealingWaiting(ctx context.Context) {
snums, err := miner.SectorsList(ctx) snums, err := dh.miner.SectorsList(ctx)
require.NoError(t, err) require.NoError(dh.t, err)
for _, snum := range snums { for _, snum := range snums {
si, err := miner.SectorsStatus(ctx, snum, false) si, err := dh.miner.SectorsStatus(ctx, snum, false)
require.NoError(t, err) require.NoError(dh.t, err)
t.Logf("Sector state: %s", si.State) dh.t.Logf("Sector state: %s", si.State)
if si.State == api.SectorState(sealing.WaitDeals) { if si.State == api.SectorState(sealing.WaitDeals) {
require.NoError(t, miner.SectorStartSealing(ctx, snum)) require.NoError(dh.t, dh.miner.SectorStartSealing(ctx, snum))
} }
} }
} }
func TestRetrieval(t *testing.T, ctx context.Context, client api.FullNode, fcid cid.Cid, piece *cid.Cid, carExport bool, data []byte) { func (dh *DealHarness) TestRetrieval(ctx context.Context, fcid cid.Cid, piece *cid.Cid, carExport bool, data []byte) {
offers, err := client.ClientFindData(ctx, fcid, piece) offers, err := dh.client.ClientFindData(ctx, fcid, piece)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
if len(offers) < 1 { if len(offers) < 1 {
t.Fatal("no offers") dh.t.Fatal("no offers")
} }
rpath, err := ioutil.TempDir("", "lotus-retrieve-test-") rpath, err := ioutil.TempDir("", "lotus-retrieve-test-")
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
defer os.RemoveAll(rpath) //nolint:errcheck defer os.RemoveAll(rpath) //nolint:errcheck
caddr, err := client.WalletDefaultAddress(ctx) caddr, err := dh.client.WalletDefaultAddress(ctx)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
ref := &api.FileRef{ ref := &api.FileRef{
Path: filepath.Join(rpath, "ret"), Path: filepath.Join(rpath, "ret"),
IsCAR: carExport, IsCAR: carExport,
} }
updates, err := client.ClientRetrieveWithEvents(ctx, offers[0].Order(caddr), ref) updates, err := dh.client.ClientRetrieveWithEvents(ctx, offers[0].Order(caddr), ref)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
for update := range updates { for update := range updates {
if update.Err != "" { if update.Err != "" {
t.Fatalf("retrieval failed: %s", update.Err) dh.t.Fatalf("retrieval failed: %s", update.Err)
} }
} }
rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret")) rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret"))
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
if carExport { if carExport {
rdata = ExtractCarData(t, ctx, rdata, rpath) rdata = dh.ExtractCarData(ctx, rdata, rpath)
} }
if !bytes.Equal(rdata, data) { if !bytes.Equal(rdata, data) {
t.Fatal("wrong data retrieved") dh.t.Fatal("wrong data retrieved")
} }
} }
func ExtractCarData(t *testing.T, ctx context.Context, rdata []byte, rpath string) []byte { func (dh *DealHarness) ExtractCarData(ctx context.Context, rdata []byte, rpath string) []byte {
bserv := dstest.Bserv() bserv := dstest.Bserv()
ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata)) ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata))
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
b, err := bserv.GetBlock(ctx, ch.Roots[0]) b, err := bserv.GetBlock(ctx, ch.Roots[0])
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
nd, err := ipld.Decode(b) nd, err := ipld.Decode(b)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
dserv := dag.NewDAGService(bserv) dserv := dag.NewDAGService(bserv)
fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd) fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
outPath := filepath.Join(rpath, "retLoadedCAR") outPath := filepath.Join(rpath, "retLoadedCAR")
if err := files.WriteTo(fil, outPath); err != nil { if err := files.WriteTo(fil, outPath); err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
rdata, err = ioutil.ReadFile(outPath) rdata, err = ioutil.ReadFile(outPath)
if err != nil { if err != nil {
t.Fatal(err) dh.t.Fatal(err)
} }
return rdata return rdata
} }
@ -260,34 +252,25 @@ type DealsScaffold struct {
BlockMiner *BlockMiner BlockMiner *BlockMiner
} }
func SetupOneClientOneMiner(t *testing.T, b APIBuilder, blocktime time.Duration) *DealsScaffold { func ConnectAndStartMining(t *testing.T, blocktime time.Duration, miner TestMiner, clients ...*impl.FullNodeAPI) *BlockMiner {
n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
return ConnectAndStartMining(t, blocktime, client, miner)
}
func ConnectAndStartMining(t *testing.T, blocktime time.Duration, client *impl.FullNodeAPI, miner TestMiner) *DealsScaffold {
ctx := context.Background() ctx := context.Background()
addrinfo, err := client.NetAddrsListen(ctx)
if err != nil { for _, c := range clients {
t.Fatal(err) addrinfo, err := c.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
} }
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second) time.Sleep(time.Second)
blockMiner := NewBlockMiner(t, miner) blockMiner := NewBlockMiner(t, miner)
blockMiner.MineBlocks(ctx, blocktime) blockMiner.MineBlocks(ctx, blocktime)
return &DealsScaffold{ return blockMiner
Ctx: ctx,
Client: client,
Miner: miner,
BlockMiner: blockMiner,
}
} }
type TestDealState int type TestDealState int

View File

@ -7,11 +7,13 @@ import (
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
func SendFunds(ctx context.Context, t *testing.T, sender TestFullNode, addr address.Address, amount abi.TokenAmount) { // SendFunds sends funds from the default wallet of the specified sender node
// to the recipient address.
func SendFunds(ctx context.Context, t *testing.T, sender TestFullNode, recipient address.Address, amount abi.TokenAmount) {
senderAddr, err := sender.WalletDefaultAddress(ctx) senderAddr, err := sender.WalletDefaultAddress(ctx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -19,7 +21,7 @@ func SendFunds(ctx context.Context, t *testing.T, sender TestFullNode, addr addr
msg := &types.Message{ msg := &types.Message{
From: senderAddr, From: senderAddr,
To: addr, To: recipient,
Value: amount, Value: amount,
} }
@ -27,7 +29,7 @@ func SendFunds(ctx context.Context, t *testing.T, sender TestFullNode, addr addr
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
res, err := sender.StateWaitMsg(ctx, sm.Cid(), 3, lapi.LookbackNoLimit, true) res, err := sender.StateWaitMsg(ctx, sm.Cid(), 3, api.LookbackNoLimit, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -43,7 +43,6 @@ import (
lotusminer "github.com/filecoin-project/lotus/miner" lotusminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
testing2 "github.com/filecoin-project/lotus/node/modules/testing" testing2 "github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/mockstorage" "github.com/filecoin-project/lotus/storage/mockstorage"
@ -224,6 +223,7 @@ func mockBuilderOpts(t *testing.T, fullOpts []FullNodeOpts, storage []StorageMin
fulls := make([]TestFullNode, len(fullOpts)) fulls := make([]TestFullNode, len(fullOpts))
miners := make([]TestMiner, len(storage)) miners := make([]TestMiner, len(storage))
// *****
pk, _, err := crypto.GenerateEd25519Key(rand.Reader) pk, _, err := crypto.GenerateEd25519Key(rand.Reader)
require.NoError(t, err) require.NoError(t, err)
@ -235,13 +235,17 @@ func mockBuilderOpts(t *testing.T, fullOpts []FullNodeOpts, storage []StorageMin
if len(storage) > 1 { if len(storage) > 1 {
panic("need more peer IDs") panic("need more peer IDs")
} }
// *****
// PRESEAL SECTION, TRY TO REPLACE WITH BETTER IN THE FUTURE // PRESEAL SECTION, TRY TO REPLACE WITH BETTER IN THE FUTURE
// TODO: would be great if there was a better way to fake the preseals // TODO: would be great if there was a better way to fake the preseals
var genms []genesis.Miner var (
var maddrs []address.Address genms []genesis.Miner
var genaccs []genesis.Actor maddrs []address.Address
var keys []*wallet.Key genaccs []genesis.Actor
keys []*wallet.Key
)
var presealDirs []string var presealDirs []string
for i := 0; i < len(storage); i++ { for i := 0; i < len(storage); i++ {
@ -395,11 +399,13 @@ func mockMinerBuilderOpts(t *testing.T, fullOpts []FullNodeOpts, storage []Stora
// PRESEAL SECTION, TRY TO REPLACE WITH BETTER IN THE FUTURE // PRESEAL SECTION, TRY TO REPLACE WITH BETTER IN THE FUTURE
// TODO: would be great if there was a better way to fake the preseals // TODO: would be great if there was a better way to fake the preseals
var genms []genesis.Miner var (
var genaccs []genesis.Actor genms []genesis.Miner
var maddrs []address.Address genaccs []genesis.Actor
var keys []*wallet.Key maddrs []address.Address
var pidKeys []crypto.PrivKey keys []*wallet.Key
pidKeys []crypto.PrivKey
)
for i := 0; i < len(storage); i++ { for i := 0; i < len(storage); i++ {
maddr, err := address.NewIDAddress(genesis2.MinerStart + uint64(i)) maddr, err := address.NewIDAddress(genesis2.MinerStart + uint64(i))
if err != nil { if err != nil {
@ -468,9 +474,6 @@ func mockMinerBuilderOpts(t *testing.T, fullOpts []FullNodeOpts, storage []Stora
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
// so that we subscribe to pubsub topics immediately
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),
genesis, genesis,
fullOpts[i].Opts(fulls), fullOpts[i].Opts(fulls),