From 577b73041577b034598c51ffb596ff2b38a0345f Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Fri, 18 Jun 2021 18:40:33 +0200 Subject: [PATCH] wip --- itests/batch_deal_test.go | 2 +- itests/ccupgrade_test.go | 2 +- itests/deals_test.go | 29 ++-- itests/kit2/deals.go | 30 ++-- itests/kit2/ensemble.go | 161 ++++++++++++++------- itests/kit2/ensemble_presets.go | 28 +++- itests/kit2/node_miner.go | 32 ++++ itests/kit2/node_opts.go | 28 ++++ itests/{wdpost_test.go => wdpost_test.god} | 0 node/rpc.go | 2 +- 10 files changed, 228 insertions(+), 86 deletions(-) rename itests/{wdpost_test.go => wdpost_test.god} (100%) diff --git a/itests/batch_deal_test.go b/itests/batch_deal_test.go index 9cc4d7ac1..6dc04c65d 100644 --- a/itests/batch_deal_test.go +++ b/itests/batch_deal_test.go @@ -58,7 +58,7 @@ func TestBatchDealInput(t *testing.T) { )) client, miner, ens := kit2.EnsembleMinimal(t, kit2.MockProofs(), opts) ens.InterconnectAll().BeginMining(blockTime) - dh := kit2.NewDealHarness(t, client, miner) + dh := kit2.NewDealHarness(t, client, miner, miner) err := miner.MarketSetAsk(ctx, big.Zero(), big.Zero(), 200, 128, 32<<30) require.NoError(t, err) diff --git a/itests/ccupgrade_test.go b/itests/ccupgrade_test.go index 2c35b425d..14b44291b 100644 --- a/itests/ccupgrade_test.go +++ b/itests/ccupgrade_test.go @@ -62,7 +62,7 @@ func runTestCCUpgrade(t *testing.T, upgradeHeight abi.ChainEpoch) { err = miner.SectorMarkForUpgrade(ctx, sl[0]) require.NoError(t, err) - dh := kit2.NewDealHarness(t, client, miner) + dh := kit2.NewDealHarness(t, client, miner, miner) dh.MakeOnlineDeal(context.Background(), 6, false, 0) diff --git a/itests/deals_test.go b/itests/deals_test.go index af0ef68c4..ded4ec136 100644 --- a/itests/deals_test.go +++ b/itests/deals_test.go @@ -29,7 +29,7 @@ func TestDealCyclesConcurrent(t *testing.T) { kit2.QuietMiningLogs() - blockTime := 10 * time.Millisecond + //blockTime := 10 * time.Millisecond // For these tests where the block time is artificially short, just use // a deal start epoch that is guaranteed to be far enough in the future @@ -37,9 +37,11 @@ func TestDealCyclesConcurrent(t *testing.T) { startEpoch := abi.ChainEpoch(2 << 12) runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) { - client, miner, ens := kit2.EnsembleMinimal(t, kit2.MockProofs()) - ens.InterconnectAll().BeginMining(blockTime) - dh := kit2.NewDealHarness(t, client, miner) + api.RunningNodeType = api.NodeMiner // TODO(anteva): fix me + + client, main, market, _ := kit2.EnsembleWithMarket(t, kit2.MockProofs(), kit2.ThroughRPC()) + + dh := kit2.NewDealHarness(t, client, main, market) runConcurrentDeals(t, dh, fullDealCyclesOpts{ n: n, @@ -49,13 +51,14 @@ func TestDealCyclesConcurrent(t *testing.T) { }) } - cycles := []int{1, 2, 4, 8} + //cycles := []int{1, 2, 4, 8} + cycles := []int{1} for _, n := range cycles { n := n ns := fmt.Sprintf("%d", n) - t.Run(ns+"-fastretrieval-CAR", func(t *testing.T) { runTest(t, n, true, true) }) - t.Run(ns+"-fastretrieval-NoCAR", func(t *testing.T) { runTest(t, n, true, false) }) - t.Run(ns+"-stdretrieval-CAR", func(t *testing.T) { runTest(t, n, true, false) }) + //t.Run(ns+"-fastretrieval-CAR", func(t *testing.T) { runTest(t, n, true, true) }) + //t.Run(ns+"-fastretrieval-NoCAR", func(t *testing.T) { runTest(t, n, true, false) }) + //t.Run(ns+"-stdretrieval-CAR", func(t *testing.T) { runTest(t, n, true, false) }) t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) }) } } @@ -99,7 +102,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) { client, miner, ens := kit2.EnsembleMinimal(t, kit2.ThroughRPC()) // no mock proofs. ens.InterconnectAll().BeginMining(blockTime) - dh := kit2.NewDealHarness(t, client, miner) + dh := kit2.NewDealHarness(t, client, miner, miner) t.Run("stdretrieval", func(t *testing.T) { runConcurrentDeals(t, dh, fullDealCyclesOpts{n: 1}) @@ -135,7 +138,7 @@ func TestPublishDealsBatching(t *testing.T) { client, miner, ens := kit2.EnsembleMinimal(t, kit2.MockProofs(), kit2.ConstructorOpts(opts)) ens.InterconnectAll().BeginMining(10 * time.Millisecond) - dh := kit2.NewDealHarness(t, client, miner) + dh := kit2.NewDealHarness(t, client, miner, miner) // Starts a deal and waits until it's published runDealTillPublish := func(rseed int) { @@ -226,7 +229,7 @@ func TestFirstDealEnablesMining(t *testing.T) { ctx := context.Background() - dh := kit2.NewDealHarness(t, &client, &provider) + dh := kit2.NewDealHarness(t, &client, &provider, &provider) ref, _ := client.CreateImportFile(ctx, 5, 0) @@ -269,7 +272,7 @@ func TestOfflineDealFlow(t *testing.T) { client, miner, ens := kit2.EnsembleMinimal(t, kit2.MockProofs()) ens.InterconnectAll().BeginMining(blocktime) - dh := kit2.NewDealHarness(t, client, miner) + dh := kit2.NewDealHarness(t, client, miner, miner) // Create a random file and import on the client. res, inFile := client.CreateImportFile(ctx, 1, 0) @@ -364,7 +367,7 @@ func TestZeroPricePerByteRetrieval(t *testing.T) { err = miner.MarketSetRetrievalAsk(ctx, ask) require.NoError(t, err) - dh := kit2.NewDealHarness(t, client, miner) + dh := kit2.NewDealHarness(t, client, miner, miner) runConcurrentDeals(t, dh, fullDealCyclesOpts{ n: 1, startEpoch: startEpoch, diff --git a/itests/kit2/deals.go b/itests/kit2/deals.go index 2e015a9c7..87fcca28a 100644 --- a/itests/kit2/deals.go +++ b/itests/kit2/deals.go @@ -25,17 +25,19 @@ import ( ) type DealHarness struct { - t *testing.T - client *TestFullNode - miner *TestMiner + t *testing.T + client *TestFullNode + storageMiner *TestMiner + marketMiner *TestMiner } // NewDealHarness creates a test harness that contains testing utilities for deals. -func NewDealHarness(t *testing.T, client *TestFullNode, miner *TestMiner) *DealHarness { +func NewDealHarness(t *testing.T, client *TestFullNode, storageMiner *TestMiner, marketMiner *TestMiner) *DealHarness { return &DealHarness{ - t: t, - client: client, - miner: miner, + t: t, + client: client, + storageMiner: storageMiner, + marketMiner: marketMiner, } } @@ -60,7 +62,7 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, rseed int, fastRet bo // StartDeal starts a storage deal between the client and the miner. func (dh *DealHarness) StartDeal(ctx context.Context, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid { - maddr, err := dh.miner.ActorAddress(ctx) + maddr, err := dh.storageMiner.ActorAddress(ctx) require.NoError(dh.t, err) addr, err := dh.client.WalletDefaultAddress(ctx) @@ -109,7 +111,7 @@ loop: break loop } - mds, err := dh.miner.MarketListIncompleteDeals(ctx) + mds, err := dh.marketMiner.MarketListIncompleteDeals(ctx) require.NoError(dh.t, err) var minerState storagemarket.StorageDealStatus @@ -133,7 +135,7 @@ func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) { subCtx, cancel := context.WithCancel(ctx) defer cancel() - updates, err := dh.miner.MarketGetDealUpdates(subCtx) + updates, err := dh.storageMiner.MarketGetDealUpdates(subCtx) require.NoError(dh.t, err) for { @@ -160,19 +162,19 @@ func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) { } func (dh *DealHarness) StartSealingWaiting(ctx context.Context) { - snums, err := dh.miner.SectorsList(ctx) + snums, err := dh.storageMiner.SectorsList(ctx) require.NoError(dh.t, err) for _, snum := range snums { - si, err := dh.miner.SectorsStatus(ctx, snum, false) + si, err := dh.storageMiner.SectorsStatus(ctx, snum, false) require.NoError(dh.t, err) dh.t.Logf("Sector state: %s", si.State) if si.State == api.SectorState(sealing.WaitDeals) { - require.NoError(dh.t, dh.miner.SectorStartSealing(ctx, snum)) + require.NoError(dh.t, dh.storageMiner.SectorStartSealing(ctx, snum)) } - dh.miner.FlushSealingBatches(ctx) + dh.storageMiner.FlushSealingBatches(ctx) } } diff --git a/itests/kit2/ensemble.go b/itests/kit2/ensemble.go index ad3b0107f..6e31aa742 100644 --- a/itests/kit2/ensemble.go +++ b/itests/kit2/ensemble.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "crypto/rand" + "fmt" "io/ioutil" + "net" "sync" "testing" "time" @@ -28,12 +30,12 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" - sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/mock" "github.com/filecoin-project/lotus/genesis" lotusminer "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" testing2 "github.com/filecoin-project/lotus/node/modules/testing" @@ -191,6 +193,10 @@ func (n *Ensemble) Miner(miner *TestMiner, full *TestFullNode, opts ...NodeOpt) actorAddr, err := address.NewIDAddress(genesis2.MinerStart + uint64(minerCnt)) require.NoError(n.t, err) + if options.mainMiner != nil { + actorAddr = options.mainMiner.ActorAddr + } + ownerKey := options.ownerKey if !n.bootstrapped { var ( @@ -225,13 +231,17 @@ func (n *Ensemble) Miner(miner *TestMiner, full *TestFullNode, opts ...NodeOpt) require.NotNil(n.t, ownerKey, "worker key can't be null if initializing a miner after genesis") } + rl, err := net.Listen("tcp", "127.0.0.1:") + require.NoError(n.t, err) + *miner = TestMiner{ - t: n.t, - ActorAddr: actorAddr, - OwnerKey: ownerKey, - FullNode: full, - PresealDir: tdir, - options: options, + t: n.t, + ActorAddr: actorAddr, + OwnerKey: ownerKey, + FullNode: full, + PresealDir: tdir, + options: options, + RemoteListener: rl, } miner.Libp2p.PeerID = peerId @@ -328,39 +338,59 @@ func (n *Ensemble) Start() *Ensemble { // Create all inactive miners. for i, m := range n.inactive.miners { if n.bootstrapped { - // this is a miner created after genesis, so it won't have a preseal. - // we need to create it on chain. - params, aerr := actors.SerializeParams(&power2.CreateMinerParams{ - Owner: m.OwnerKey.Address, - Worker: m.OwnerKey.Address, - SealProofType: n.options.proofType, - Peer: abi.PeerID(m.Libp2p.PeerID), - }) - require.NoError(n.t, aerr) + if m.options.mainMiner == nil { + // this is a miner created after genesis, so it won't have a preseal. + // we need to create it on chain. + params, aerr := actors.SerializeParams(&power2.CreateMinerParams{ + Owner: m.OwnerKey.Address, + Worker: m.OwnerKey.Address, + SealProofType: n.options.proofType, + Peer: abi.PeerID(m.Libp2p.PeerID), + }) + require.NoError(n.t, aerr) - createStorageMinerMsg := &types.Message{ - From: m.OwnerKey.Address, - To: power.Address, - Value: big.Zero(), + createStorageMinerMsg := &types.Message{ + From: m.OwnerKey.Address, + To: power.Address, + Value: big.Zero(), - Method: power.Methods.CreateMiner, - Params: params, + Method: power.Methods.CreateMiner, + Params: params, - GasLimit: 0, - GasPremium: big.NewInt(5252), + GasLimit: 0, + GasPremium: big.NewInt(5252), + } + signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, nil) + require.NoError(n.t, err) + + mw, err := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) + require.NoError(n.t, err) + require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode) + + var retval power2.CreateMinerReturn + err = retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return)) + require.NoError(n.t, err, "failed to create miner") + + m.ActorAddr = retval.IDAddress + } else { + params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)}) + require.NoError(n.t, err) + + msg := &types.Message{ + To: m.options.mainMiner.ActorAddr, + From: m.options.mainMiner.OwnerKey.Address, + Method: miner.Methods.ChangePeerID, + Params: params, + Value: types.NewInt(0), + } + + signed, err2 := m.FullNode.FullNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(n.t, err2) + + mw, err2 := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) + require.NoError(n.t, err2) + require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode) } - signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, nil) - require.NoError(n.t, err) - - mw, err := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) - require.NoError(n.t, err) - require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode) - - var retval power2.CreateMinerReturn - err = retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return)) - require.NoError(n.t, err, "failed to create miner") - - m.ActorAddr = retval.IDAddress } has, err := m.FullNode.WalletHas(ctx, m.OwnerKey.Address) @@ -382,6 +412,33 @@ func (n *Ensemble) Start() *Ensemble { lr, err := r.Lock(repo.StorageMiner) require.NoError(n.t, err) + c, err := lr.Config() + require.NoError(n.t, err) + + cfg, ok := c.(*config.StorageMiner) + if !ok { + n.t.Fatalf("invalid config from repo, got: %T", c) + } + cfg.Common.API.RemoteListenAddress = m.RemoteListener.Addr().String() + cfg.Subsystems.EnableStorageMarket = m.options.subsystems.Has(SStorageMarket) + cfg.Subsystems.EnableMining = m.options.subsystems.Has(SMining) + cfg.Subsystems.EnableSealing = m.options.subsystems.Has(SSealing) + cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage) + + if m.options.mainMiner != nil { + token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions[:3]) + require.NoError(n.t, err) + + cfg.Subsystems.SectorIndexApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr) + cfg.Subsystems.SealerApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr) + } + + err = lr.SetConfig(func(raw interface{}) { + rcfg := raw.(*config.StorageMiner) + *rcfg = *cfg + }) + require.NoError(n.t, err) + ks, err := lr.KeyStore() require.NoError(n.t, err) @@ -411,20 +468,22 @@ func (n *Ensemble) Start() *Ensemble { err = lr.Close() require.NoError(n.t, err) - enc, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)}) - require.NoError(n.t, err) + if m.options.mainMiner == nil { + enc, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)}) + require.NoError(n.t, err) - msg := &types.Message{ - From: m.OwnerKey.Address, - To: m.ActorAddr, - Method: miner.Methods.ChangePeerID, - Params: enc, - Value: types.NewInt(0), + msg := &types.Message{ + From: m.OwnerKey.Address, + To: m.ActorAddr, + Method: miner.Methods.ChangePeerID, + Params: enc, + Value: types.NewInt(0), + } + + _, err2 := m.FullNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(n.t, err2) } - _, err = m.FullNode.MpoolPushMessage(ctx, msg, nil) - require.NoError(n.t, err) - var mineBlock = make(chan lotusminer.MineReq) opts := []node.Option{ node.StorageMiner(&m.StorageMiner), @@ -458,16 +517,8 @@ func (n *Ensemble) Start() *Ensemble { if n.options.mockProofs { opts = append(opts, - node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) { - return mock.NewMockSectorMgr(presealSectors), nil - }), - node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))), - node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))), - node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))), - node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), node.Override(new(ffiwrapper.Prover), mock.MockProver), - node.Unset(new(*sectorstorage.Manager)), ) } diff --git a/itests/kit2/ensemble_presets.go b/itests/kit2/ensemble_presets.go index 28a4b5d92..910374878 100644 --- a/itests/kit2/ensemble_presets.go +++ b/itests/kit2/ensemble_presets.go @@ -1,6 +1,9 @@ package kit2 -import "testing" +import ( + "testing" + "time" +) // EnsembleMinimal creates and starts an Ensemble with a single full node and a single miner. // It does not interconnect nodes nor does it begin mining. @@ -18,6 +21,29 @@ func EnsembleMinimal(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMin return &full, &miner, ens } +func EnsembleWithMarket(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMiner, *TestMiner, *Ensemble) { + eopts, nopts := siftOptions(t, opts) + + var ( + fullnode TestFullNode + main, market TestMiner + ) + + mainNodeOpts := []NodeOpt{WithSubsystem(SSealing), WithSubsystem(SSectorStorage), WithSubsystem(SMining)} + mainNodeOpts = append(mainNodeOpts, nopts...) + + blockTime := 100 * time.Millisecond + ens := NewEnsemble(t, eopts...).FullNode(&fullnode, nopts...).Miner(&main, &fullnode, mainNodeOpts...).Start() + ens.InterconnectAll().BeginMining(blockTime) + + marketNodeOpts := []NodeOpt{OwnerAddr(fullnode.DefaultKey), MainMiner(&main), WithSubsystem(SStorageMarket)} + marketNodeOpts = append(marketNodeOpts, nopts...) + + ens.Miner(&market, &fullnode, marketNodeOpts...).Start().InterconnectAll() + + return &fullnode, &main, &market, ens +} + // EnsembleTwoOne creates and starts an Ensemble with two full nodes and one miner. // It does not interconnect nodes nor does it begin mining. // diff --git a/itests/kit2/node_miner.go b/itests/kit2/node_miner.go index 1cd65e20e..c4812711d 100644 --- a/itests/kit2/node_miner.go +++ b/itests/kit2/node_miner.go @@ -3,6 +3,7 @@ package kit2 import ( "context" "fmt" + "net" "strings" "testing" "time" @@ -20,6 +21,35 @@ import ( "github.com/stretchr/testify/require" ) +type MinerSubsystem int + +const ( + SStorageMarket MinerSubsystem = 1 << iota + SMining + SSealing + SSectorStorage + + MinerSubsystems = iota +) + +func (ms MinerSubsystem) Add(single MinerSubsystem) MinerSubsystem { + return ms | single +} + +func (ms MinerSubsystem) Has(single MinerSubsystem) bool { + return ms&single == single +} + +func (ms MinerSubsystem) All() [MinerSubsystems]bool { + var out [MinerSubsystems]bool + + for i := range out { + out[i] = ms&(1< 0 + } + + return out +} + // TestMiner represents a miner enrolled in an Ensemble. type TestMiner struct { api.StorageMiner @@ -43,6 +73,8 @@ type TestMiner struct { PrivKey libp2pcrypto.PrivKey } + RemoteListener net.Listener + options nodeOpts } diff --git a/itests/kit2/node_opts.go b/itests/kit2/node_opts.go index 59d5454df..c165d9517 100644 --- a/itests/kit2/node_opts.go +++ b/itests/kit2/node_opts.go @@ -23,6 +23,9 @@ type nodeOpts struct { rpc bool ownerKey *wallet.Key extraNodeOpts []node.Option + + subsystems MinerSubsystem + mainMiner *TestMiner } // DefaultNodeOpts are the default options that will be applied to test nodes. @@ -34,6 +37,31 @@ var DefaultNodeOpts = nodeOpts{ // NodeOpt is a functional option for test nodes. type NodeOpt func(opts *nodeOpts) error +func WithAllSubsystems() NodeOpt { + return func(opts *nodeOpts) error { + opts.subsystems = opts.subsystems.Add(SStorageMarket) + opts.subsystems = opts.subsystems.Add(SMining) + opts.subsystems = opts.subsystems.Add(SSealing) + opts.subsystems = opts.subsystems.Add(SSectorStorage) + + return nil + } +} + +func WithSubsystem(single MinerSubsystem) NodeOpt { + return func(opts *nodeOpts) error { + opts.subsystems = opts.subsystems.Add(single) + return nil + } +} + +func MainMiner(m *TestMiner) NodeOpt { + return func(opts *nodeOpts) error { + opts.mainMiner = m + return nil + } +} + // OwnerBalance specifies the balance to be attributed to a miner's owner // account. Only relevant when creating a miner. func OwnerBalance(balance abi.TokenAmount) NodeOpt { diff --git a/itests/wdpost_test.go b/itests/wdpost_test.god similarity index 100% rename from itests/wdpost_test.go rename to itests/wdpost_test.god diff --git a/node/rpc.go b/node/rpc.go index 06d3628ee..f90dfaed6 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -131,7 +131,7 @@ func MinerHandler(a api.StorageMiner, permissioned bool) (http.Handler, error) { m.PathPrefix("/").Handler(http.DefaultServeMux) // pprof if !permissioned { - return rpcServer, nil + return m, nil } ah := &auth.Handler{