This commit is contained in:
Anton Evangelatov 2021-06-18 18:40:33 +02:00
parent 89df3cc207
commit 577b730415
10 changed files with 228 additions and 86 deletions

View File

@ -58,7 +58,7 @@ func TestBatchDealInput(t *testing.T) {
))
client, miner, ens := kit2.EnsembleMinimal(t, kit2.MockProofs(), opts)
ens.InterconnectAll().BeginMining(blockTime)
dh := kit2.NewDealHarness(t, client, miner)
dh := kit2.NewDealHarness(t, client, miner, miner)
err := miner.MarketSetAsk(ctx, big.Zero(), big.Zero(), 200, 128, 32<<30)
require.NoError(t, err)

View File

@ -62,7 +62,7 @@ func runTestCCUpgrade(t *testing.T, upgradeHeight abi.ChainEpoch) {
err = miner.SectorMarkForUpgrade(ctx, sl[0])
require.NoError(t, err)
dh := kit2.NewDealHarness(t, client, miner)
dh := kit2.NewDealHarness(t, client, miner, miner)
dh.MakeOnlineDeal(context.Background(), 6, false, 0)

View File

@ -29,7 +29,7 @@ func TestDealCyclesConcurrent(t *testing.T) {
kit2.QuietMiningLogs()
blockTime := 10 * time.Millisecond
//blockTime := 10 * time.Millisecond
// For these tests where the block time is artificially short, just use
// a deal start epoch that is guaranteed to be far enough in the future
@ -37,9 +37,11 @@ func TestDealCyclesConcurrent(t *testing.T) {
startEpoch := abi.ChainEpoch(2 << 12)
runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) {
client, miner, ens := kit2.EnsembleMinimal(t, kit2.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
dh := kit2.NewDealHarness(t, client, miner)
api.RunningNodeType = api.NodeMiner // TODO(anteva): fix me
client, main, market, _ := kit2.EnsembleWithMarket(t, kit2.MockProofs(), kit2.ThroughRPC())
dh := kit2.NewDealHarness(t, client, main, market)
runConcurrentDeals(t, dh, fullDealCyclesOpts{
n: n,
@ -49,13 +51,14 @@ func TestDealCyclesConcurrent(t *testing.T) {
})
}
cycles := []int{1, 2, 4, 8}
//cycles := []int{1, 2, 4, 8}
cycles := []int{1}
for _, n := range cycles {
n := n
ns := fmt.Sprintf("%d", n)
t.Run(ns+"-fastretrieval-CAR", func(t *testing.T) { runTest(t, n, true, true) })
t.Run(ns+"-fastretrieval-NoCAR", func(t *testing.T) { runTest(t, n, true, false) })
t.Run(ns+"-stdretrieval-CAR", func(t *testing.T) { runTest(t, n, true, false) })
//t.Run(ns+"-fastretrieval-CAR", func(t *testing.T) { runTest(t, n, true, true) })
//t.Run(ns+"-fastretrieval-NoCAR", func(t *testing.T) { runTest(t, n, true, false) })
//t.Run(ns+"-stdretrieval-CAR", func(t *testing.T) { runTest(t, n, true, false) })
t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) })
}
}
@ -99,7 +102,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {
client, miner, ens := kit2.EnsembleMinimal(t, kit2.ThroughRPC()) // no mock proofs.
ens.InterconnectAll().BeginMining(blockTime)
dh := kit2.NewDealHarness(t, client, miner)
dh := kit2.NewDealHarness(t, client, miner, miner)
t.Run("stdretrieval", func(t *testing.T) {
runConcurrentDeals(t, dh, fullDealCyclesOpts{n: 1})
@ -135,7 +138,7 @@ func TestPublishDealsBatching(t *testing.T) {
client, miner, ens := kit2.EnsembleMinimal(t, kit2.MockProofs(), kit2.ConstructorOpts(opts))
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
dh := kit2.NewDealHarness(t, client, miner)
dh := kit2.NewDealHarness(t, client, miner, miner)
// Starts a deal and waits until it's published
runDealTillPublish := func(rseed int) {
@ -226,7 +229,7 @@ func TestFirstDealEnablesMining(t *testing.T) {
ctx := context.Background()
dh := kit2.NewDealHarness(t, &client, &provider)
dh := kit2.NewDealHarness(t, &client, &provider, &provider)
ref, _ := client.CreateImportFile(ctx, 5, 0)
@ -269,7 +272,7 @@ func TestOfflineDealFlow(t *testing.T) {
client, miner, ens := kit2.EnsembleMinimal(t, kit2.MockProofs())
ens.InterconnectAll().BeginMining(blocktime)
dh := kit2.NewDealHarness(t, client, miner)
dh := kit2.NewDealHarness(t, client, miner, miner)
// Create a random file and import on the client.
res, inFile := client.CreateImportFile(ctx, 1, 0)
@ -364,7 +367,7 @@ func TestZeroPricePerByteRetrieval(t *testing.T) {
err = miner.MarketSetRetrievalAsk(ctx, ask)
require.NoError(t, err)
dh := kit2.NewDealHarness(t, client, miner)
dh := kit2.NewDealHarness(t, client, miner, miner)
runConcurrentDeals(t, dh, fullDealCyclesOpts{
n: 1,
startEpoch: startEpoch,

View File

@ -25,17 +25,19 @@ import (
)
type DealHarness struct {
t *testing.T
client *TestFullNode
miner *TestMiner
t *testing.T
client *TestFullNode
storageMiner *TestMiner
marketMiner *TestMiner
}
// NewDealHarness creates a test harness that contains testing utilities for deals.
func NewDealHarness(t *testing.T, client *TestFullNode, miner *TestMiner) *DealHarness {
func NewDealHarness(t *testing.T, client *TestFullNode, storageMiner *TestMiner, marketMiner *TestMiner) *DealHarness {
return &DealHarness{
t: t,
client: client,
miner: miner,
t: t,
client: client,
storageMiner: storageMiner,
marketMiner: marketMiner,
}
}
@ -60,7 +62,7 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, rseed int, fastRet bo
// StartDeal starts a storage deal between the client and the miner.
func (dh *DealHarness) StartDeal(ctx context.Context, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid {
maddr, err := dh.miner.ActorAddress(ctx)
maddr, err := dh.storageMiner.ActorAddress(ctx)
require.NoError(dh.t, err)
addr, err := dh.client.WalletDefaultAddress(ctx)
@ -109,7 +111,7 @@ loop:
break loop
}
mds, err := dh.miner.MarketListIncompleteDeals(ctx)
mds, err := dh.marketMiner.MarketListIncompleteDeals(ctx)
require.NoError(dh.t, err)
var minerState storagemarket.StorageDealStatus
@ -133,7 +135,7 @@ func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
updates, err := dh.miner.MarketGetDealUpdates(subCtx)
updates, err := dh.storageMiner.MarketGetDealUpdates(subCtx)
require.NoError(dh.t, err)
for {
@ -160,19 +162,19 @@ func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) {
}
func (dh *DealHarness) StartSealingWaiting(ctx context.Context) {
snums, err := dh.miner.SectorsList(ctx)
snums, err := dh.storageMiner.SectorsList(ctx)
require.NoError(dh.t, err)
for _, snum := range snums {
si, err := dh.miner.SectorsStatus(ctx, snum, false)
si, err := dh.storageMiner.SectorsStatus(ctx, snum, false)
require.NoError(dh.t, err)
dh.t.Logf("Sector state: %s", si.State)
if si.State == api.SectorState(sealing.WaitDeals) {
require.NoError(dh.t, dh.miner.SectorStartSealing(ctx, snum))
require.NoError(dh.t, dh.storageMiner.SectorStartSealing(ctx, snum))
}
dh.miner.FlushSealingBatches(ctx)
dh.storageMiner.FlushSealingBatches(ctx)
}
}

View File

@ -4,7 +4,9 @@ import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io/ioutil"
"net"
"sync"
"testing"
"time"
@ -28,12 +30,12 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/mock"
"github.com/filecoin-project/lotus/genesis"
lotusminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
@ -191,6 +193,10 @@ func (n *Ensemble) Miner(miner *TestMiner, full *TestFullNode, opts ...NodeOpt)
actorAddr, err := address.NewIDAddress(genesis2.MinerStart + uint64(minerCnt))
require.NoError(n.t, err)
if options.mainMiner != nil {
actorAddr = options.mainMiner.ActorAddr
}
ownerKey := options.ownerKey
if !n.bootstrapped {
var (
@ -225,13 +231,17 @@ func (n *Ensemble) Miner(miner *TestMiner, full *TestFullNode, opts ...NodeOpt)
require.NotNil(n.t, ownerKey, "worker key can't be null if initializing a miner after genesis")
}
rl, err := net.Listen("tcp", "127.0.0.1:")
require.NoError(n.t, err)
*miner = TestMiner{
t: n.t,
ActorAddr: actorAddr,
OwnerKey: ownerKey,
FullNode: full,
PresealDir: tdir,
options: options,
t: n.t,
ActorAddr: actorAddr,
OwnerKey: ownerKey,
FullNode: full,
PresealDir: tdir,
options: options,
RemoteListener: rl,
}
miner.Libp2p.PeerID = peerId
@ -328,39 +338,59 @@ func (n *Ensemble) Start() *Ensemble {
// Create all inactive miners.
for i, m := range n.inactive.miners {
if n.bootstrapped {
// this is a miner created after genesis, so it won't have a preseal.
// we need to create it on chain.
params, aerr := actors.SerializeParams(&power2.CreateMinerParams{
Owner: m.OwnerKey.Address,
Worker: m.OwnerKey.Address,
SealProofType: n.options.proofType,
Peer: abi.PeerID(m.Libp2p.PeerID),
})
require.NoError(n.t, aerr)
if m.options.mainMiner == nil {
// this is a miner created after genesis, so it won't have a preseal.
// we need to create it on chain.
params, aerr := actors.SerializeParams(&power2.CreateMinerParams{
Owner: m.OwnerKey.Address,
Worker: m.OwnerKey.Address,
SealProofType: n.options.proofType,
Peer: abi.PeerID(m.Libp2p.PeerID),
})
require.NoError(n.t, aerr)
createStorageMinerMsg := &types.Message{
From: m.OwnerKey.Address,
To: power.Address,
Value: big.Zero(),
createStorageMinerMsg := &types.Message{
From: m.OwnerKey.Address,
To: power.Address,
Value: big.Zero(),
Method: power.Methods.CreateMiner,
Params: params,
Method: power.Methods.CreateMiner,
Params: params,
GasLimit: 0,
GasPremium: big.NewInt(5252),
GasLimit: 0,
GasPremium: big.NewInt(5252),
}
signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, nil)
require.NoError(n.t, err)
mw, err := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
require.NoError(n.t, err)
require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode)
var retval power2.CreateMinerReturn
err = retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return))
require.NoError(n.t, err, "failed to create miner")
m.ActorAddr = retval.IDAddress
} else {
params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)})
require.NoError(n.t, err)
msg := &types.Message{
To: m.options.mainMiner.ActorAddr,
From: m.options.mainMiner.OwnerKey.Address,
Method: miner.Methods.ChangePeerID,
Params: params,
Value: types.NewInt(0),
}
signed, err2 := m.FullNode.FullNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(n.t, err2)
mw, err2 := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
require.NoError(n.t, err2)
require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode)
}
signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, nil)
require.NoError(n.t, err)
mw, err := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
require.NoError(n.t, err)
require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode)
var retval power2.CreateMinerReturn
err = retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return))
require.NoError(n.t, err, "failed to create miner")
m.ActorAddr = retval.IDAddress
}
has, err := m.FullNode.WalletHas(ctx, m.OwnerKey.Address)
@ -382,6 +412,33 @@ func (n *Ensemble) Start() *Ensemble {
lr, err := r.Lock(repo.StorageMiner)
require.NoError(n.t, err)
c, err := lr.Config()
require.NoError(n.t, err)
cfg, ok := c.(*config.StorageMiner)
if !ok {
n.t.Fatalf("invalid config from repo, got: %T", c)
}
cfg.Common.API.RemoteListenAddress = m.RemoteListener.Addr().String()
cfg.Subsystems.EnableStorageMarket = m.options.subsystems.Has(SStorageMarket)
cfg.Subsystems.EnableMining = m.options.subsystems.Has(SMining)
cfg.Subsystems.EnableSealing = m.options.subsystems.Has(SSealing)
cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage)
if m.options.mainMiner != nil {
token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions[:3])
require.NoError(n.t, err)
cfg.Subsystems.SectorIndexApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr)
cfg.Subsystems.SealerApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr)
}
err = lr.SetConfig(func(raw interface{}) {
rcfg := raw.(*config.StorageMiner)
*rcfg = *cfg
})
require.NoError(n.t, err)
ks, err := lr.KeyStore()
require.NoError(n.t, err)
@ -411,20 +468,22 @@ func (n *Ensemble) Start() *Ensemble {
err = lr.Close()
require.NoError(n.t, err)
enc, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)})
require.NoError(n.t, err)
if m.options.mainMiner == nil {
enc, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)})
require.NoError(n.t, err)
msg := &types.Message{
From: m.OwnerKey.Address,
To: m.ActorAddr,
Method: miner.Methods.ChangePeerID,
Params: enc,
Value: types.NewInt(0),
msg := &types.Message{
From: m.OwnerKey.Address,
To: m.ActorAddr,
Method: miner.Methods.ChangePeerID,
Params: enc,
Value: types.NewInt(0),
}
_, err2 := m.FullNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(n.t, err2)
}
_, err = m.FullNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(n.t, err)
var mineBlock = make(chan lotusminer.MineReq)
opts := []node.Option{
node.StorageMiner(&m.StorageMiner),
@ -458,16 +517,8 @@ func (n *Ensemble) Start() *Ensemble {
if n.options.mockProofs {
opts = append(opts,
node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) {
return mock.NewMockSectorMgr(presealSectors), nil
}),
node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Override(new(ffiwrapper.Prover), mock.MockProver),
node.Unset(new(*sectorstorage.Manager)),
)
}

View File

@ -1,6 +1,9 @@
package kit2
import "testing"
import (
"testing"
"time"
)
// EnsembleMinimal creates and starts an Ensemble with a single full node and a single miner.
// It does not interconnect nodes nor does it begin mining.
@ -18,6 +21,29 @@ func EnsembleMinimal(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMin
return &full, &miner, ens
}
func EnsembleWithMarket(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMiner, *TestMiner, *Ensemble) {
eopts, nopts := siftOptions(t, opts)
var (
fullnode TestFullNode
main, market TestMiner
)
mainNodeOpts := []NodeOpt{WithSubsystem(SSealing), WithSubsystem(SSectorStorage), WithSubsystem(SMining)}
mainNodeOpts = append(mainNodeOpts, nopts...)
blockTime := 100 * time.Millisecond
ens := NewEnsemble(t, eopts...).FullNode(&fullnode, nopts...).Miner(&main, &fullnode, mainNodeOpts...).Start()
ens.InterconnectAll().BeginMining(blockTime)
marketNodeOpts := []NodeOpt{OwnerAddr(fullnode.DefaultKey), MainMiner(&main), WithSubsystem(SStorageMarket)}
marketNodeOpts = append(marketNodeOpts, nopts...)
ens.Miner(&market, &fullnode, marketNodeOpts...).Start().InterconnectAll()
return &fullnode, &main, &market, ens
}
// EnsembleTwoOne creates and starts an Ensemble with two full nodes and one miner.
// It does not interconnect nodes nor does it begin mining.
//

View File

@ -3,6 +3,7 @@ package kit2
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
@ -20,6 +21,35 @@ import (
"github.com/stretchr/testify/require"
)
type MinerSubsystem int
const (
SStorageMarket MinerSubsystem = 1 << iota
SMining
SSealing
SSectorStorage
MinerSubsystems = iota
)
func (ms MinerSubsystem) Add(single MinerSubsystem) MinerSubsystem {
return ms | single
}
func (ms MinerSubsystem) Has(single MinerSubsystem) bool {
return ms&single == single
}
func (ms MinerSubsystem) All() [MinerSubsystems]bool {
var out [MinerSubsystems]bool
for i := range out {
out[i] = ms&(1<<i) > 0
}
return out
}
// TestMiner represents a miner enrolled in an Ensemble.
type TestMiner struct {
api.StorageMiner
@ -43,6 +73,8 @@ type TestMiner struct {
PrivKey libp2pcrypto.PrivKey
}
RemoteListener net.Listener
options nodeOpts
}

View File

@ -23,6 +23,9 @@ type nodeOpts struct {
rpc bool
ownerKey *wallet.Key
extraNodeOpts []node.Option
subsystems MinerSubsystem
mainMiner *TestMiner
}
// DefaultNodeOpts are the default options that will be applied to test nodes.
@ -34,6 +37,31 @@ var DefaultNodeOpts = nodeOpts{
// NodeOpt is a functional option for test nodes.
type NodeOpt func(opts *nodeOpts) error
func WithAllSubsystems() NodeOpt {
return func(opts *nodeOpts) error {
opts.subsystems = opts.subsystems.Add(SStorageMarket)
opts.subsystems = opts.subsystems.Add(SMining)
opts.subsystems = opts.subsystems.Add(SSealing)
opts.subsystems = opts.subsystems.Add(SSectorStorage)
return nil
}
}
func WithSubsystem(single MinerSubsystem) NodeOpt {
return func(opts *nodeOpts) error {
opts.subsystems = opts.subsystems.Add(single)
return nil
}
}
func MainMiner(m *TestMiner) NodeOpt {
return func(opts *nodeOpts) error {
opts.mainMiner = m
return nil
}
}
// OwnerBalance specifies the balance to be attributed to a miner's owner
// account. Only relevant when creating a miner.
func OwnerBalance(balance abi.TokenAmount) NodeOpt {

View File

@ -131,7 +131,7 @@ func MinerHandler(a api.StorageMiner, permissioned bool) (http.Handler, error) {
m.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
if !permissioned {
return rpcServer, nil
return m, nil
}
ah := &auth.Handler{