package kit import ( "bytes" "context" "crypto/rand" "encoding/binary" "fmt" "net" "net/http" "os" "sync" "testing" "time" "github.com/google/uuid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-statestore" miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" power3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/power" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/power" "github.com/filecoin-project/lotus/chain/gen" genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet/key" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" "github.com/filecoin-project/lotus/gateway" "github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/markets/idxprov" "github.com/filecoin-project/lotus/markets/idxprov/idxprov_test" lotusminer "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" testing2 "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/paths" pipeline "github.com/filecoin-project/lotus/storage/pipeline" sectorstorage "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/mock" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) func init() { chain.BootstrapPeerThreshold = 1 messagepool.HeadChangeCoalesceMinDelay = time.Microsecond messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond } // Ensemble is a collection of nodes instantiated within a test. // // Create a new ensemble with: // // ens := kit.NewEnsemble() // // Create full nodes and miners: // // var full TestFullNode // var miner TestMiner // ens.FullNode(&full, opts...) // populates a full node // ens.Miner(&miner, &full, opts...) // populates a miner, using the full node as its chain daemon // // It is possible to pass functional options to set initial balances, // presealed sectors, owner keys, etc. // // After the initial nodes are added, call `ens.Start()` to forge genesis // and start the network. Mining will NOT be started automatically. It needs // to be started explicitly by calling `BeginMining`. // // Nodes also need to be connected with one another, either via `ens.Connect()` // or `ens.InterconnectAll()`. A common inchantation for simple tests is to do: // // ens.InterconnectAll().BeginMining(blocktime) // // You can continue to add more nodes, but you must always follow with // `ens.Start()` to activate the new nodes. // // The API is chainable, so it's possible to do a lot in a very succinct way: // // kit.NewEnsemble().FullNode(&full).Miner(&miner, &full).Start().InterconnectAll().BeginMining() // // You can also find convenient fullnode:miner presets, such as 1:1, 1:2, // and 2:1, e.g.: // // kit.EnsembleMinimal() // kit.EnsembleOneTwo() // kit.EnsembleTwoOne() type Ensemble struct { t *testing.T bootstrapped bool genesisBlock bytes.Buffer mn mocknet.Mocknet options *ensembleOpts inactive struct { fullnodes []*TestFullNode miners []*TestMiner workers []*TestWorker } active struct { fullnodes []*TestFullNode miners []*TestMiner workers []*TestWorker bms map[*TestMiner]*BlockMiner } genesis struct { version network.Version miners []genesis.Miner accounts []genesis.Actor } } // NewEnsemble instantiates a new blank Ensemble. func NewEnsemble(t *testing.T, opts ...EnsembleOpt) *Ensemble { options := DefaultEnsembleOpts for _, o := range opts { err := o(&options) require.NoError(t, err) } n := &Ensemble{t: t, options: &options} n.active.bms = make(map[*TestMiner]*BlockMiner) for _, up := range options.upgradeSchedule { if up.Height < 0 { n.genesis.version = up.Network } } // add accounts from ensemble options to genesis. for _, acc := range options.accounts { n.genesis.accounts = append(n.genesis.accounts, genesis.Actor{ Type: genesis.TAccount, Balance: acc.initialBalance, Meta: (&genesis.AccountMeta{Owner: acc.key.Address}).ActorMeta(), }) } // Ensure we're using the right actors. This really shouldn't be some global thing, but it's // the best we can do for now. if n.options.mockProofs { require.NoError(t, build.UseNetworkBundle("testing-fake-proofs")) } else { require.NoError(t, build.UseNetworkBundle("testing")) } return n } // Mocknet returns the underlying mocknet. func (n *Ensemble) Mocknet() mocknet.Mocknet { return n.mn } func (n *Ensemble) NewPrivKey() (libp2pcrypto.PrivKey, peer.ID) { privkey, _, err := libp2pcrypto.GenerateEd25519Key(rand.Reader) require.NoError(n.t, err) peerId, err := peer.IDFromPrivateKey(privkey) require.NoError(n.t, err) return privkey, peerId } // FullNode enrolls a new full node. func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble { options := DefaultNodeOpts for _, o := range opts { err := o(&options) require.NoError(n.t, err) } key, err := key.GenerateKey(types.KTBLS) require.NoError(n.t, err) if !n.bootstrapped && !options.balance.IsZero() { // if we still haven't forged genesis, create a key+address, and assign // it some FIL; this will be set as the default wallet when the node is // started. genacc := genesis.Actor{ Type: genesis.TAccount, Balance: options.balance, Meta: (&genesis.AccountMeta{Owner: key.Address}).ActorMeta(), } n.genesis.accounts = append(n.genesis.accounts, genacc) } *full = TestFullNode{t: n.t, options: options, DefaultKey: key, EthSubRouter: gateway.NewEthSubHandler()} n.inactive.fullnodes = append(n.inactive.fullnodes, full) return n } // Miner enrolls a new miner, using the provided full node for chain // interactions. func (n *Ensemble) MinerEnroll(minerNode *TestMiner, full *TestFullNode, opts ...NodeOpt) *Ensemble { require.NotNil(n.t, full, "full node required when instantiating miner") options := DefaultNodeOpts for _, o := range opts { err := o(&options) require.NoError(n.t, err) } privkey, _, err := libp2pcrypto.GenerateEd25519Key(rand.Reader) require.NoError(n.t, err) peerId, err := peer.IDFromPrivateKey(privkey) require.NoError(n.t, err) tdir, err := os.MkdirTemp("", "preseal-memgen") require.NoError(n.t, err) minerCnt := len(n.inactive.miners) + len(n.active.miners) actorAddr, err := address.NewIDAddress(genesis2.MinerStart + uint64(minerCnt)) require.NoError(n.t, err) if options.mainMiner != nil { actorAddr = options.mainMiner.ActorAddr } ownerKey := options.ownerKey var presealSectors int if !n.bootstrapped { presealSectors = options.sectors var ( k *types.KeyInfo genm *genesis.Miner ) // Will use 2KiB sectors by default (default value of sectorSize). proofType, err := miner.SealProofTypeFromSectorSize(options.sectorSize, n.genesis.version) require.NoError(n.t, err) // Create the preseal commitment. if n.options.mockProofs { genm, k, err = mock.PreSeal(proofType, actorAddr, presealSectors) } else { genm, k, err = seed.PreSeal(actorAddr, proofType, 0, presealSectors, tdir, []byte("make genesis mem random"), nil, true) } require.NoError(n.t, err) genm.PeerId = peerId // create an owner key, and assign it some FIL. ownerKey, err = key.NewKey(*k) require.NoError(n.t, err) genacc := genesis.Actor{ Type: genesis.TAccount, Balance: options.balance, Meta: (&genesis.AccountMeta{Owner: ownerKey.Address}).ActorMeta(), } n.genesis.miners = append(n.genesis.miners, *genm) n.genesis.accounts = append(n.genesis.accounts, genacc) } else { require.NotNil(n.t, ownerKey, "worker key can't be null if initializing a miner after genesis") } rl, err := net.Listen("tcp", "127.0.0.1:") require.NoError(n.t, err) *minerNode = TestMiner{ t: n.t, ActorAddr: actorAddr, OwnerKey: ownerKey, FullNode: full, PresealDir: tdir, PresealSectors: presealSectors, options: options, RemoteListener: rl, } minerNode.Libp2p.PeerID = peerId minerNode.Libp2p.PrivKey = privkey return n } func (n *Ensemble) AddInactiveMiner(m *TestMiner) { n.inactive.miners = append(n.inactive.miners, m) } func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeOpt) *Ensemble { n.MinerEnroll(minerNode, full, opts...) n.AddInactiveMiner(minerNode) return n } // Worker enrolls a new worker, using the provided full node for chain // interactions. func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...NodeOpt) *Ensemble { require.NotNil(n.t, minerNode, "miner node required when instantiating worker") options := DefaultNodeOpts for _, o := range opts { err := o(&options) require.NoError(n.t, err) } rl, err := net.Listen("tcp", "127.0.0.1:") require.NoError(n.t, err) *worker = TestWorker{ t: n.t, MinerNode: minerNode, RemoteListener: rl, options: options, Stop: func(ctx context.Context) error { return nil }, } n.inactive.workers = append(n.inactive.workers, worker) return n } // Start starts all enrolled nodes. func (n *Ensemble) Start() *Ensemble { ctx := context.Background() var gtempl *genesis.Template if !n.bootstrapped { // We haven't been bootstrapped yet, we need to generate genesis and // create the networking backbone. gtempl = n.generateGenesis() n.mn = mocknet.New() } // --------------------- // FULL NODES // --------------------- // Create all inactive full nodes. for i, full := range n.inactive.fullnodes { var r repo.Repo if !full.options.fsrepo { rmem := repo.NewMemory(nil) n.t.Cleanup(rmem.Cleanup) r = rmem } else { repoPath := n.t.TempDir() rfs, err := repo.NewFS(repoPath) require.NoError(n.t, err) require.NoError(n.t, rfs.Init(repo.FullNode)) r = rfs } // setup config with options lr, err := r.Lock(repo.FullNode) require.NoError(n.t, err) ks, err := lr.KeyStore() require.NoError(n.t, err) if full.Pkey != nil { pk, err := libp2pcrypto.MarshalPrivateKey(full.Pkey.PrivKey) require.NoError(n.t, err) err = ks.Put("libp2p-host", types.KeyInfo{ Type: "libp2p-host", PrivateKey: pk, }) require.NoError(n.t, err) } c, err := lr.Config() require.NoError(n.t, err) cfg, ok := c.(*config.FullNode) if !ok { n.t.Fatalf("invalid config from repo, got: %T", c) } for _, opt := range full.options.cfgOpts { require.NoError(n.t, opt(cfg)) } err = lr.SetConfig(func(raw interface{}) { rcfg := raw.(*config.FullNode) *rcfg = *cfg }) require.NoError(n.t, err) err = lr.Close() require.NoError(n.t, err) opts := []node.Option{ node.FullAPI(&full.FullNode, node.Lite(full.options.lite)), node.Base(), node.Repo(r), node.If(full.options.disableLibp2p, node.MockHost(n.mn)), node.Test(), // so that we subscribe to pubsub topics immediately node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)), // upgrades node.Override(new(stmgr.UpgradeSchedule), n.options.upgradeSchedule), } // append any node builder options. opts = append(opts, full.options.extraNodeOpts...) // Either generate the genesis or inject it. if i == 0 && !n.bootstrapped { opts = append(opts, node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&n.genesisBlock, *gtempl))) } else { opts = append(opts, node.Override(new(modules.Genesis), modules.LoadGenesis(n.genesisBlock.Bytes()))) } // Are we mocking proofs? if n.options.mockProofs { opts = append(opts, node.Override(new(storiface.Verifier), mock.MockVerifier), node.Override(new(storiface.Prover), mock.MockProver), ) } // Call option builders, passing active nodes as the parameter for _, bopt := range full.options.optBuilders { opts = append(opts, bopt(n.active.fullnodes)) } // Construct the full node. stop, err := node.New(ctx, opts...) full.Stop = stop require.NoError(n.t, err) addr, err := full.WalletImport(context.Background(), &full.DefaultKey.KeyInfo) require.NoError(n.t, err) err = full.WalletSetDefault(context.Background(), addr) require.NoError(n.t, err) var rpcShutdownOnce sync.Once var stopOnce sync.Once var stopErr error stopFunc := stop stop = func(ctx context.Context) error { stopOnce.Do(func() { stopErr = stopFunc(ctx) }) return stopErr } // Are we hitting this node through its RPC? if full.options.rpc { withRPC, rpcCloser := fullRpc(n.t, full) n.inactive.fullnodes[i] = withRPC full.Stop = func(ctx2 context.Context) error { rpcShutdownOnce.Do(rpcCloser) return stop(ctx) } n.t.Cleanup(func() { rpcShutdownOnce.Do(rpcCloser) }) } n.t.Cleanup(func() { _ = stop(context.Background()) }) n.active.fullnodes = append(n.active.fullnodes, full) } // If we are here, we have processed all inactive fullnodes and moved them // to active, so clear the slice. n.inactive.fullnodes = n.inactive.fullnodes[:0] // Link all the nodes. err := n.mn.LinkAll() require.NoError(n.t, err) // --------------------- // MINERS // --------------------- // Create all inactive miners. for i, m := range n.inactive.miners { if n.bootstrapped { if m.options.mainMiner == nil { // this is a miner created after genesis, so it won't have a preseal. // we need to create it on chain. proofType, err := miner.WindowPoStProofTypeFromSectorSize(m.options.sectorSize, n.genesis.version) require.NoError(n.t, err) params, aerr := actors.SerializeParams(&power3.CreateMinerParams{ Owner: m.OwnerKey.Address, Worker: m.OwnerKey.Address, WindowPoStProofType: proofType, Peer: abi.PeerID(m.Libp2p.PeerID), }) require.NoError(n.t, aerr) createStorageMinerMsg := &types.Message{ From: m.OwnerKey.Address, To: power.Address, Value: big.Zero(), Method: power.Methods.CreateMiner, Params: params, } signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, &api.MessageSendSpec{ MsgUuid: uuid.New(), }) require.NoError(n.t, err) mw, err := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) require.NoError(n.t, err) require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode) var retval power3.CreateMinerReturn err = retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return)) require.NoError(n.t, err, "failed to create miner") m.ActorAddr = retval.IDAddress } else { params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)}) require.NoError(n.t, err) msg := &types.Message{ To: m.options.mainMiner.ActorAddr, From: m.options.mainMiner.OwnerKey.Address, Method: builtin.MethodsMiner.ChangePeerID, Params: params, Value: types.NewInt(0), } signed, err2 := m.FullNode.FullNode.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{ MsgUuid: uuid.New(), }) require.NoError(n.t, err2) mw, err2 := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) require.NoError(n.t, err2) require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode) } } has, err := m.FullNode.WalletHas(ctx, m.OwnerKey.Address) require.NoError(n.t, err) // Only import the owner's full key into our companion full node, if we // don't have it still. if !has { _, err = m.FullNode.WalletImport(ctx, &m.OwnerKey.KeyInfo) require.NoError(n.t, err) } // // Set it as the default address. // err = m.FullNode.WalletSetDefault(ctx, m.OwnerAddr.Address) // require.NoError(n.t, err) r := repo.NewMemory(nil) n.t.Cleanup(r.Cleanup) lr, err := r.Lock(repo.StorageMiner) require.NoError(n.t, err) c, err := lr.Config() require.NoError(n.t, err) cfg, ok := c.(*config.StorageMiner) if !ok { n.t.Fatalf("invalid config from repo, got: %T", c) } cfg.Common.API.RemoteListenAddress = m.RemoteListener.Addr().String() cfg.Subsystems.EnableMarkets = m.options.subsystems.Has(SMarkets) cfg.Subsystems.EnableMining = m.options.subsystems.Has(SMining) cfg.Subsystems.EnableSealing = m.options.subsystems.Has(SSealing) cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage) cfg.Dealmaking.MaxStagingDealsBytes = m.options.maxStagingDealsBytes if m.options.mainMiner != nil { token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions) require.NoError(n.t, err) cfg.Subsystems.SectorIndexApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr) cfg.Subsystems.SealerApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr) } err = lr.SetConfig(func(raw interface{}) { rcfg := raw.(*config.StorageMiner) *rcfg = *cfg }) require.NoError(n.t, err) ks, err := lr.KeyStore() require.NoError(n.t, err) pk, err := libp2pcrypto.MarshalPrivateKey(m.Libp2p.PrivKey) require.NoError(n.t, err) err = ks.Put("libp2p-host", types.KeyInfo{ Type: "libp2p-host", PrivateKey: pk, }) require.NoError(n.t, err) ds, err := lr.Datastore(context.TODO(), "/metadata") require.NoError(n.t, err) err = ds.Put(ctx, datastore.NewKey("miner-address"), m.ActorAddr.Bytes()) require.NoError(n.t, err) if i < len(n.genesis.miners) && !n.bootstrapped { // if this is a genesis miner, import preseal metadata require.NoError(n.t, importPreSealMeta(ctx, n.genesis.miners[i], ds)) } // using real proofs, therefore need real sectors. if !n.bootstrapped && !n.options.mockProofs { psd := m.PresealDir noPaths := m.options.noStorage err := lr.SetStorage(func(sc *storiface.StorageConfig) { if noPaths { sc.StoragePaths = []storiface.LocalPath{} } sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{Path: psd}) }) require.NoError(n.t, err) } err = lr.Close() require.NoError(n.t, err) if m.options.mainMiner == nil { enc, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)}) require.NoError(n.t, err) msg := &types.Message{ From: m.OwnerKey.Address, To: m.ActorAddr, Method: builtin.MethodsMiner.ChangePeerID, Params: enc, Value: types.NewInt(0), } _, err2 := m.FullNode.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{ MsgUuid: uuid.New(), }) require.NoError(n.t, err2) } noLocal := m.options.minerNoLocalSealing assigner := m.options.minerAssigner disallowRemoteFinalize := m.options.disallowRemoteFinalize var mineBlock = make(chan lotusminer.MineReq) copy := *m.FullNode copy.FullNode = modules.MakeUuidWrapper(copy.FullNode) m.FullNode = © //m.FullNode.FullNode = modules.MakeUuidWrapper(fn.FullNode) opts := []node.Option{ node.StorageMiner(&m.StorageMiner, cfg.Subsystems), node.Base(), node.Repo(r), node.Test(), node.If(m.options.disableLibp2p, node.MockHost(n.mn)), //node.Override(new(v1api.RawFullNodeAPI), func() api.FullNode { return modules.MakeUuidWrapper(m.FullNode) }), //node.Override(new(v1api.RawFullNodeAPI), modules.MakeUuidWrapper), node.Override(new(v1api.RawFullNodeAPI), m.FullNode), node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)), // disable resource filtering so that local worker gets assigned tasks // regardless of system pressure. node.Override(new(config.SealerConfig), func() config.SealerConfig { scfg := config.DefaultStorageMiner() if noLocal { scfg.Storage.AllowSectorDownload = false scfg.Storage.AllowAddPiece = false scfg.Storage.AllowPreCommit1 = false scfg.Storage.AllowPreCommit2 = false scfg.Storage.AllowCommit = false scfg.Storage.AllowUnseal = false } scfg.Storage.Assigner = assigner scfg.Storage.DisallowRemoteFinalize = disallowRemoteFinalize scfg.Storage.ResourceFiltering = config.ResourceFilteringDisabled return scfg.Storage }), // upgrades node.Override(new(stmgr.UpgradeSchedule), n.options.upgradeSchedule), } if m.options.subsystems.Has(SMarkets) { opts = append(opts, node.Override(new(idxprov.MeshCreator), idxprov_test.NewNoopMeshCreator), ) } // append any node builder options. opts = append(opts, m.options.extraNodeOpts...) idAddr, err := address.IDFromAddress(m.ActorAddr) require.NoError(n.t, err) // preload preseals if the network still hasn't bootstrapped. var presealSectors []abi.SectorID if !n.bootstrapped { sectors := n.genesis.miners[i].Sectors for _, sector := range sectors { presealSectors = append(presealSectors, abi.SectorID{ Miner: abi.ActorID(idAddr), Number: sector.SectorID, }) } } if n.options.mockProofs { opts = append(opts, node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) { return mock.NewMockSectorMgr(presealSectors), nil }), node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))), node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))), node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))), node.Override(new(storiface.Verifier), mock.MockVerifier), node.Override(new(storiface.Prover), mock.MockProver), node.Unset(new(*sectorstorage.Manager)), ) } // start node stop, err := node.New(ctx, opts...) require.NoError(n.t, err) n.t.Cleanup(func() { _ = stop(context.Background()) }) m.BaseAPI = m.StorageMiner // Are we hitting this node through its RPC? if m.options.rpc { withRPC := minerRpc(n.t, m) n.inactive.miners[i] = withRPC } mineOne := func(ctx context.Context, req lotusminer.MineReq) error { select { case mineBlock <- req: return nil case <-ctx.Done(): return ctx.Err() } } m.MineOne = mineOne m.Stop = stop n.active.miners = append(n.active.miners, m) } // If we are here, we have processed all inactive miners and moved them // to active, so clear the slice. n.inactive.miners = n.inactive.miners[:0] // --------------------- // WORKERS // --------------------- // Create all inactive workers. for i, m := range n.inactive.workers { r := repo.NewMemory(nil) lr, err := r.Lock(repo.Worker) require.NoError(n.t, err) if m.options.noStorage { err := lr.SetStorage(func(sc *storiface.StorageConfig) { sc.StoragePaths = []storiface.LocalPath{} }) require.NoError(n.t, err) } ds, err := lr.Datastore(context.Background(), "/metadata") require.NoError(n.t, err) addr := m.RemoteListener.Addr().String() localStore, err := paths.NewLocal(ctx, lr, m.MinerNode, []string{"http://" + addr + "/remote"}) require.NoError(n.t, err) auth := http.Header(nil) remote := paths.NewRemote(localStore, m.MinerNode, auth, 20, &paths.DefaultPartialFileHandler{}) store := m.options.workerStorageOpt(remote) fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}} m.FetchHandler = fh.ServeHTTP wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix)) workerApi := &sealworker.Worker{ LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{ TaskTypes: m.options.workerTasks, NoSwap: false, Name: m.options.workerName, }, store, localStore, m.MinerNode, m.MinerNode, wsts), LocalStore: localStore, Storage: lr, } m.Worker = workerApi require.True(n.t, m.options.rpc) withRPC := workerRpc(n.t, m) n.inactive.workers[i] = withRPC err = m.MinerNode.WorkerConnect(ctx, "http://"+addr+"/rpc/v0") require.NoError(n.t, err) n.active.workers = append(n.active.workers, m) } // If we are here, we have processed all inactive workers and moved them // to active, so clear the slice. n.inactive.workers = n.inactive.workers[:0] // --------------------- // MISC // --------------------- // Link all the nodes. err = n.mn.LinkAll() require.NoError(n.t, err) if !n.bootstrapped && len(n.active.miners) > 0 { // We have *just* bootstrapped, so mine 2 blocks to setup some CE stuff in some actors var wait sync.Mutex wait.Lock() observer := n.active.fullnodes[0] bm := NewBlockMiner(n.t, n.active.miners[0]) n.t.Cleanup(bm.Stop) bm.MineUntilBlock(ctx, observer, func(epoch abi.ChainEpoch) { wait.Unlock() }) wait.Lock() bm.MineUntilBlock(ctx, observer, func(epoch abi.ChainEpoch) { wait.Unlock() }) wait.Lock() n.bootstrapped = true } return n } // InterconnectAll connects all miners and full nodes to one another. func (n *Ensemble) InterconnectAll() *Ensemble { // connect full nodes to miners. for _, from := range n.active.fullnodes { for _, to := range n.active.miners { // []*TestMiner to []api.CommonAPI type coercion not possible // so cannot use variadic form. n.Connect(from, to) } } // connect full nodes between each other, skipping ourselves. last := len(n.active.fullnodes) - 1 for i, from := range n.active.fullnodes { if i == last { continue } for _, to := range n.active.fullnodes[i+1:] { n.Connect(from, to) } } return n } // Connect connects one full node to the provided full nodes. func (n *Ensemble) Connect(from api.Net, to ...api.Net) *Ensemble { addr, err := from.NetAddrsListen(context.Background()) require.NoError(n.t, err) for _, other := range to { err = other.NetConnect(context.Background(), addr) require.NoError(n.t, err) } return n } func (n *Ensemble) BeginMiningMustPost(blocktime time.Duration, miners ...*TestMiner) []*BlockMiner { ctx := context.Background() // wait one second to make sure that nodes are connected and have handshaken. // TODO make this deterministic by listening to identify events on the // libp2p eventbus instead (or something else). time.Sleep(1 * time.Second) var bms []*BlockMiner if len(miners) == 0 { // no miners have been provided explicitly, instantiate block miners // for all active miners that aren't still mining. for _, m := range n.active.miners { if _, ok := n.active.bms[m]; ok { continue // skip, already have a block miner } miners = append(miners, m) } } if len(miners) > 1 { n.t.Fatalf("Only one active miner for MustPost, but have %d", len(miners)) } for _, m := range miners { bm := NewBlockMiner(n.t, m) bm.MineBlocksMustPost(ctx, blocktime) n.t.Cleanup(bm.Stop) bms = append(bms, bm) n.active.bms[m] = bm } return bms } // BeginMining kicks off mining for the specified miners. If nil or 0-length, // it will kick off mining for all enrolled and active miners. It also adds a // cleanup function to stop all mining operations on test teardown. func (n *Ensemble) BeginMining(blocktime time.Duration, miners ...*TestMiner) []*BlockMiner { ctx := context.Background() // wait one second to make sure that nodes are connected and have handshaken. // TODO make this deterministic by listening to identify events on the // libp2p eventbus instead (or something else). time.Sleep(1 * time.Second) var bms []*BlockMiner if len(miners) == 0 { // no miners have been provided explicitly, instantiate block miners // for all active miners that aren't still mining. for _, m := range n.active.miners { if _, ok := n.active.bms[m]; ok { continue // skip, already have a block miner } miners = append(miners, m) } } for _, m := range miners { bm := NewBlockMiner(n.t, m) bm.MineBlocks(ctx, blocktime) n.t.Cleanup(bm.Stop) bms = append(bms, bm) n.active.bms[m] = bm } return bms } func (n *Ensemble) generateGenesis() *genesis.Template { var verifRoot = gen.DefaultVerifregRootkeyActor if k := n.options.verifiedRoot.key; k != nil { verifRoot = genesis.Actor{ Type: genesis.TAccount, Balance: n.options.verifiedRoot.initialBalance, Meta: (&genesis.AccountMeta{Owner: k.Address}).ActorMeta(), } } templ := &genesis.Template{ NetworkVersion: n.genesis.version, Accounts: n.genesis.accounts, Miners: n.genesis.miners, NetworkName: "test", Timestamp: uint64(time.Now().Unix() - int64(n.options.pastOffset.Seconds())), VerifregRootKey: verifRoot, RemainderAccount: gen.DefaultRemainderAccountActor, } return templ } func importPreSealMeta(ctx context.Context, meta genesis.Miner, mds dtypes.MetadataDS) error { maxSectorID := abi.SectorNumber(0) for _, sector := range meta.Sectors { sectorKey := datastore.NewKey(pipeline.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID)) commD := sector.CommD commR := sector.CommR info := &pipeline.SectorInfo{ State: pipeline.Proving, SectorNumber: sector.SectorID, Pieces: []api.SectorPiece{ { Piece: abi.PieceInfo{ Size: abi.PaddedPieceSize(meta.SectorSize), PieceCID: commD, }, DealInfo: nil, // todo: likely possible to get, but not really that useful }, }, CommD: &commD, CommR: &commR, } b, err := cborutil.Dump(info) if err != nil { return err } if err := mds.Put(ctx, sectorKey, b); err != nil { return err } if sector.SectorID > maxSectorID { maxSectorID = sector.SectorID } } buf := make([]byte, binary.MaxVarintLen64) size := binary.PutUvarint(buf, uint64(maxSectorID)) return mds.Put(ctx, datastore.NewKey(pipeline.StorageCounterDSPrefix), buf[:size]) }