lotus/itests/kit/ensemble.go
2023-10-17 10:19:51 -05:00

1070 lines
30 KiB
Go

package kit
import (
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"fmt"
"net"
"net/http"
"os"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/go-statestore"
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
power3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/power"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/gen"
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet/key"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
"github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker"
"github.com/filecoin-project/lotus/gateway"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/markets/idxprov"
"github.com/filecoin-project/lotus/markets/idxprov/idxprov_test"
lotusminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
pipeline "github.com/filecoin-project/lotus/storage/pipeline"
sectorstorage "github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/mock"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func init() {
chain.BootstrapPeerThreshold = 1
messagepool.HeadChangeCoalesceMinDelay = time.Microsecond
messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond
messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond
}
// Ensemble is a collection of nodes instantiated within a test.
//
// Create a new ensemble with:
//
// ens := kit.NewEnsemble()
//
// Create full nodes and miners:
//
// var full TestFullNode
// var miner TestMiner
// ens.FullNode(&full, opts...) // populates a full node
// ens.Miner(&miner, &full, opts...) // populates a miner, using the full node as its chain daemon
//
// It is possible to pass functional options to set initial balances,
// presealed sectors, owner keys, etc.
//
// After the initial nodes are added, call `ens.Start()` to forge genesis
// and start the network. Mining will NOT be started automatically. It needs
// to be started explicitly by calling `BeginMining`.
//
// Nodes also need to be connected with one another, either via `ens.Connect()`
// or `ens.InterconnectAll()`. A common inchantation for simple tests is to do:
//
// ens.InterconnectAll().BeginMining(blocktime)
//
// You can continue to add more nodes, but you must always follow with
// `ens.Start()` to activate the new nodes.
//
// The API is chainable, so it's possible to do a lot in a very succinct way:
//
// kit.NewEnsemble().FullNode(&full).Miner(&miner, &full).Start().InterconnectAll().BeginMining()
//
// You can also find convenient fullnode:miner presets, such as 1:1, 1:2,
// and 2:1, e.g.:
//
// kit.EnsembleMinimal()
// kit.EnsembleOneTwo()
// kit.EnsembleTwoOne()
type Ensemble struct {
t *testing.T
bootstrapped bool
genesisBlock bytes.Buffer
mn mocknet.Mocknet
options *ensembleOpts
inactive struct {
fullnodes []*TestFullNode
miners []*TestMiner
workers []*TestWorker
}
active struct {
fullnodes []*TestFullNode
miners []*TestMiner
workers []*TestWorker
bms map[*TestMiner]*BlockMiner
}
genesis struct {
version network.Version
miners []genesis.Miner
accounts []genesis.Actor
}
}
// NewEnsemble instantiates a new blank Ensemble.
func NewEnsemble(t *testing.T, opts ...EnsembleOpt) *Ensemble {
options := DefaultEnsembleOpts
for _, o := range opts {
err := o(&options)
require.NoError(t, err)
}
n := &Ensemble{t: t, options: &options}
n.active.bms = make(map[*TestMiner]*BlockMiner)
for _, up := range options.upgradeSchedule {
if up.Height < 0 {
n.genesis.version = up.Network
}
}
// add accounts from ensemble options to genesis.
for _, acc := range options.accounts {
n.genesis.accounts = append(n.genesis.accounts, genesis.Actor{
Type: genesis.TAccount,
Balance: acc.initialBalance,
Meta: (&genesis.AccountMeta{Owner: acc.key.Address}).ActorMeta(),
})
}
// Ensure we're using the right actors. This really shouldn't be some global thing, but it's
// the best we can do for now.
if n.options.mockProofs {
require.NoError(t, build.UseNetworkBundle("testing-fake-proofs"))
} else {
require.NoError(t, build.UseNetworkBundle("testing"))
}
build.EquivocationDelaySecs = 0
return n
}
// Mocknet returns the underlying mocknet.
func (n *Ensemble) Mocknet() mocknet.Mocknet {
return n.mn
}
func (n *Ensemble) NewPrivKey() (libp2pcrypto.PrivKey, peer.ID) {
privkey, _, err := libp2pcrypto.GenerateEd25519Key(rand.Reader)
require.NoError(n.t, err)
peerId, err := peer.IDFromPrivateKey(privkey)
require.NoError(n.t, err)
return privkey, peerId
}
// FullNode enrolls a new full node.
func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble {
options := DefaultNodeOpts
for _, o := range opts {
err := o(&options)
require.NoError(n.t, err)
}
key, err := key.GenerateKey(types.KTBLS)
require.NoError(n.t, err)
if !n.bootstrapped && !options.balance.IsZero() {
// if we still haven't forged genesis, create a key+address, and assign
// it some FIL; this will be set as the default wallet when the node is
// started.
genacc := genesis.Actor{
Type: genesis.TAccount,
Balance: options.balance,
Meta: (&genesis.AccountMeta{Owner: key.Address}).ActorMeta(),
}
n.genesis.accounts = append(n.genesis.accounts, genacc)
}
*full = TestFullNode{t: n.t, options: options, DefaultKey: key, EthSubRouter: gateway.NewEthSubHandler()}
n.inactive.fullnodes = append(n.inactive.fullnodes, full)
return n
}
// Miner enrolls a new miner, using the provided full node for chain
// interactions.
func (n *Ensemble) MinerEnroll(minerNode *TestMiner, full *TestFullNode, opts ...NodeOpt) *Ensemble {
require.NotNil(n.t, full, "full node required when instantiating miner")
options := DefaultNodeOpts
for _, o := range opts {
err := o(&options)
require.NoError(n.t, err)
}
privkey, _, err := libp2pcrypto.GenerateEd25519Key(rand.Reader)
require.NoError(n.t, err)
peerId, err := peer.IDFromPrivateKey(privkey)
require.NoError(n.t, err)
tdir, err := os.MkdirTemp("", "preseal-memgen")
require.NoError(n.t, err)
minerCnt := len(n.inactive.miners) + len(n.active.miners)
actorAddr, err := address.NewIDAddress(genesis2.MinerStart + uint64(minerCnt))
require.NoError(n.t, err)
if options.mainMiner != nil {
actorAddr = options.mainMiner.ActorAddr
}
ownerKey := options.ownerKey
var presealSectors int
if !n.bootstrapped {
presealSectors = options.sectors
var (
k *types.KeyInfo
genm *genesis.Miner
)
// Will use 2KiB sectors by default (default value of sectorSize).
proofType, err := miner.SealProofTypeFromSectorSize(options.sectorSize, n.genesis.version, false)
require.NoError(n.t, err)
// Create the preseal commitment.
if n.options.mockProofs {
genm, k, err = mock.PreSeal(proofType, actorAddr, presealSectors)
} else {
genm, k, err = seed.PreSeal(actorAddr, proofType, 0, presealSectors, tdir, []byte("make genesis mem random"), nil, true)
}
require.NoError(n.t, err)
genm.PeerId = peerId
// create an owner key, and assign it some FIL.
ownerKey, err = key.NewKey(*k)
require.NoError(n.t, err)
genacc := genesis.Actor{
Type: genesis.TAccount,
Balance: options.balance,
Meta: (&genesis.AccountMeta{Owner: ownerKey.Address}).ActorMeta(),
}
n.genesis.miners = append(n.genesis.miners, *genm)
n.genesis.accounts = append(n.genesis.accounts, genacc)
} else {
require.NotNil(n.t, ownerKey, "worker key can't be null if initializing a miner after genesis")
}
rl, err := net.Listen("tcp", "127.0.0.1:")
require.NoError(n.t, err)
*minerNode = TestMiner{
t: n.t,
ActorAddr: actorAddr,
OwnerKey: ownerKey,
FullNode: full,
PresealDir: tdir,
PresealSectors: presealSectors,
options: options,
RemoteListener: rl,
}
minerNode.Libp2p.PeerID = peerId
minerNode.Libp2p.PrivKey = privkey
return n
}
func (n *Ensemble) AddInactiveMiner(m *TestMiner) {
n.inactive.miners = append(n.inactive.miners, m)
}
func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeOpt) *Ensemble {
n.MinerEnroll(minerNode, full, opts...)
n.AddInactiveMiner(minerNode)
return n
}
// Worker enrolls a new worker, using the provided full node for chain
// interactions.
func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...NodeOpt) *Ensemble {
require.NotNil(n.t, minerNode, "miner node required when instantiating worker")
options := DefaultNodeOpts
for _, o := range opts {
err := o(&options)
require.NoError(n.t, err)
}
rl, err := net.Listen("tcp", "127.0.0.1:")
require.NoError(n.t, err)
*worker = TestWorker{
t: n.t,
MinerNode: minerNode,
RemoteListener: rl,
options: options,
Stop: func(ctx context.Context) error { return nil },
}
n.inactive.workers = append(n.inactive.workers, worker)
return n
}
// Start starts all enrolled nodes.
func (n *Ensemble) Start() *Ensemble {
ctx := context.Background()
var gtempl *genesis.Template
if !n.bootstrapped {
// We haven't been bootstrapped yet, we need to generate genesis and
// create the networking backbone.
gtempl = n.generateGenesis()
n.mn = mocknet.New()
}
// ---------------------
// FULL NODES
// ---------------------
// Create all inactive full nodes.
for i, full := range n.inactive.fullnodes {
var r repo.Repo
if !full.options.fsrepo {
rmem := repo.NewMemory(nil)
n.t.Cleanup(rmem.Cleanup)
r = rmem
} else {
repoPath := n.t.TempDir()
rfs, err := repo.NewFS(repoPath)
require.NoError(n.t, err)
require.NoError(n.t, rfs.Init(repo.FullNode))
r = rfs
}
// setup config with options
lr, err := r.Lock(repo.FullNode)
require.NoError(n.t, err)
ks, err := lr.KeyStore()
require.NoError(n.t, err)
if full.Pkey != nil {
pk, err := libp2pcrypto.MarshalPrivateKey(full.Pkey.PrivKey)
require.NoError(n.t, err)
err = ks.Put("libp2p-host", types.KeyInfo{
Type: "libp2p-host",
PrivateKey: pk,
})
require.NoError(n.t, err)
}
c, err := lr.Config()
require.NoError(n.t, err)
cfg, ok := c.(*config.FullNode)
if !ok {
n.t.Fatalf("invalid config from repo, got: %T", c)
}
for _, opt := range full.options.cfgOpts {
require.NoError(n.t, opt(cfg))
}
err = lr.SetConfig(func(raw interface{}) {
rcfg := raw.(*config.FullNode)
*rcfg = *cfg
})
require.NoError(n.t, err)
err = lr.Close()
require.NoError(n.t, err)
opts := []node.Option{
node.FullAPI(&full.FullNode, node.Lite(full.options.lite)),
node.Base(),
node.Repo(r),
node.If(full.options.disableLibp2p, node.MockHost(n.mn)),
node.Test(),
// so that we subscribe to pubsub topics immediately
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),
// upgrades
node.Override(new(stmgr.UpgradeSchedule), n.options.upgradeSchedule),
}
// append any node builder options.
opts = append(opts, full.options.extraNodeOpts...)
// Either generate the genesis or inject it.
if i == 0 && !n.bootstrapped {
opts = append(opts, node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&n.genesisBlock, *gtempl)))
} else {
opts = append(opts, node.Override(new(modules.Genesis), modules.LoadGenesis(n.genesisBlock.Bytes())))
}
// Are we mocking proofs?
if n.options.mockProofs {
opts = append(opts,
node.Override(new(storiface.Verifier), mock.MockVerifier),
node.Override(new(storiface.Prover), mock.MockProver),
)
}
// Call option builders, passing active nodes as the parameter
for _, bopt := range full.options.optBuilders {
opts = append(opts, bopt(n.active.fullnodes))
}
// Construct the full node.
stop, err := node.New(ctx, opts...)
full.Stop = stop
require.NoError(n.t, err)
addr, err := full.WalletImport(context.Background(), &full.DefaultKey.KeyInfo)
require.NoError(n.t, err)
err = full.WalletSetDefault(context.Background(), addr)
require.NoError(n.t, err)
var rpcShutdownOnce sync.Once
var stopOnce sync.Once
var stopErr error
stopFunc := stop
stop = func(ctx context.Context) error {
stopOnce.Do(func() {
stopErr = stopFunc(ctx)
})
return stopErr
}
// Are we hitting this node through its RPC?
if full.options.rpc {
withRPC, rpcCloser := fullRpc(n.t, full)
n.inactive.fullnodes[i] = withRPC
full.Stop = func(ctx2 context.Context) error {
rpcShutdownOnce.Do(rpcCloser)
return stop(ctx)
}
n.t.Cleanup(func() { rpcShutdownOnce.Do(rpcCloser) })
}
n.t.Cleanup(func() {
_ = stop(context.Background())
})
n.active.fullnodes = append(n.active.fullnodes, full)
}
// If we are here, we have processed all inactive fullnodes and moved them
// to active, so clear the slice.
n.inactive.fullnodes = n.inactive.fullnodes[:0]
// Link all the nodes.
err := n.mn.LinkAll()
require.NoError(n.t, err)
// ---------------------
// MINERS
// ---------------------
// Create all inactive miners.
for i, m := range n.inactive.miners {
if n.bootstrapped {
if m.options.mainMiner == nil {
// this is a miner created after genesis, so it won't have a preseal.
// we need to create it on chain.
proofType, err := miner.WindowPoStProofTypeFromSectorSize(m.options.sectorSize, n.genesis.version)
require.NoError(n.t, err)
params, aerr := actors.SerializeParams(&power3.CreateMinerParams{
Owner: m.OwnerKey.Address,
Worker: m.OwnerKey.Address,
WindowPoStProofType: proofType,
Peer: abi.PeerID(m.Libp2p.PeerID),
})
require.NoError(n.t, aerr)
createStorageMinerMsg := &types.Message{
From: m.OwnerKey.Address,
To: power.Address,
Value: big.Zero(),
Method: power.Methods.CreateMiner,
Params: params,
}
signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, &api.MessageSendSpec{
MsgUuid: uuid.New(),
})
require.NoError(n.t, err)
mw, err := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
require.NoError(n.t, err)
require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode)
var retval power3.CreateMinerReturn
err = retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return))
require.NoError(n.t, err, "failed to create miner")
m.ActorAddr = retval.IDAddress
} else {
params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)})
require.NoError(n.t, err)
msg := &types.Message{
To: m.options.mainMiner.ActorAddr,
From: m.options.mainMiner.OwnerKey.Address,
Method: builtin.MethodsMiner.ChangePeerID,
Params: params,
Value: types.NewInt(0),
}
signed, err2 := m.FullNode.FullNode.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{
MsgUuid: uuid.New(),
})
require.NoError(n.t, err2)
mw, err2 := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
require.NoError(n.t, err2)
require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode)
}
}
has, err := m.FullNode.WalletHas(ctx, m.OwnerKey.Address)
require.NoError(n.t, err)
// Only import the owner's full key into our companion full node, if we
// don't have it still.
if !has {
_, err = m.FullNode.WalletImport(ctx, &m.OwnerKey.KeyInfo)
require.NoError(n.t, err)
}
// // Set it as the default address.
// err = m.FullNode.WalletSetDefault(ctx, m.OwnerAddr.Address)
// require.NoError(n.t, err)
r := repo.NewMemory(nil)
n.t.Cleanup(r.Cleanup)
lr, err := r.Lock(repo.StorageMiner)
require.NoError(n.t, err)
c, err := lr.Config()
require.NoError(n.t, err)
cfg, ok := c.(*config.StorageMiner)
if !ok {
n.t.Fatalf("invalid config from repo, got: %T", c)
}
cfg.Common.API.RemoteListenAddress = m.RemoteListener.Addr().String()
cfg.Subsystems.EnableMarkets = m.options.subsystems.Has(SMarkets)
cfg.Subsystems.EnableMining = m.options.subsystems.Has(SMining)
cfg.Subsystems.EnableSealing = m.options.subsystems.Has(SSealing)
cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage)
cfg.Dealmaking.MaxStagingDealsBytes = m.options.maxStagingDealsBytes
if m.options.mainMiner != nil {
token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions)
require.NoError(n.t, err)
cfg.Subsystems.SectorIndexApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr)
cfg.Subsystems.SealerApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr)
}
err = lr.SetConfig(func(raw interface{}) {
rcfg := raw.(*config.StorageMiner)
*rcfg = *cfg
})
require.NoError(n.t, err)
ks, err := lr.KeyStore()
require.NoError(n.t, err)
pk, err := libp2pcrypto.MarshalPrivateKey(m.Libp2p.PrivKey)
require.NoError(n.t, err)
err = ks.Put("libp2p-host", types.KeyInfo{
Type: "libp2p-host",
PrivateKey: pk,
})
require.NoError(n.t, err)
ds, err := lr.Datastore(context.TODO(), "/metadata")
require.NoError(n.t, err)
err = ds.Put(ctx, datastore.NewKey("miner-address"), m.ActorAddr.Bytes())
require.NoError(n.t, err)
if i < len(n.genesis.miners) && !n.bootstrapped {
// if this is a genesis miner, import preseal metadata
require.NoError(n.t, importPreSealMeta(ctx, n.genesis.miners[i], ds))
}
// using real proofs, therefore need real sectors.
if !n.bootstrapped && !n.options.mockProofs {
psd := m.PresealDir
noPaths := m.options.noStorage
err := lr.SetStorage(func(sc *storiface.StorageConfig) {
if noPaths {
sc.StoragePaths = []storiface.LocalPath{}
}
sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{Path: psd})
})
require.NoError(n.t, err)
}
err = lr.Close()
require.NoError(n.t, err)
if m.options.mainMiner == nil {
enc, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)})
require.NoError(n.t, err)
msg := &types.Message{
From: m.OwnerKey.Address,
To: m.ActorAddr,
Method: builtin.MethodsMiner.ChangePeerID,
Params: enc,
Value: types.NewInt(0),
}
_, err2 := m.FullNode.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{
MsgUuid: uuid.New(),
})
require.NoError(n.t, err2)
}
noLocal := m.options.minerNoLocalSealing
assigner := m.options.minerAssigner
disallowRemoteFinalize := m.options.disallowRemoteFinalize
var mineBlock = make(chan lotusminer.MineReq)
copy := *m.FullNode
copy.FullNode = modules.MakeUuidWrapper(copy.FullNode)
m.FullNode = &copy
//m.FullNode.FullNode = modules.MakeUuidWrapper(fn.FullNode)
opts := []node.Option{
node.StorageMiner(&m.StorageMiner, cfg.Subsystems),
node.Base(),
node.Repo(r),
node.Test(),
node.If(m.options.disableLibp2p, node.MockHost(n.mn)),
//node.Override(new(v1api.RawFullNodeAPI), func() api.FullNode { return modules.MakeUuidWrapper(m.FullNode) }),
//node.Override(new(v1api.RawFullNodeAPI), modules.MakeUuidWrapper),
node.Override(new(v1api.RawFullNodeAPI), m.FullNode),
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)),
// disable resource filtering so that local worker gets assigned tasks
// regardless of system pressure.
node.Override(new(config.SealerConfig), func() config.SealerConfig {
scfg := config.DefaultStorageMiner()
if noLocal {
scfg.Storage.AllowSectorDownload = false
scfg.Storage.AllowAddPiece = false
scfg.Storage.AllowPreCommit1 = false
scfg.Storage.AllowPreCommit2 = false
scfg.Storage.AllowCommit = false
scfg.Storage.AllowUnseal = false
}
scfg.Storage.Assigner = assigner
scfg.Storage.DisallowRemoteFinalize = disallowRemoteFinalize
scfg.Storage.ResourceFiltering = config.ResourceFilteringDisabled
return scfg.Storage
}),
// upgrades
node.Override(new(stmgr.UpgradeSchedule), n.options.upgradeSchedule),
}
if m.options.subsystems.Has(SMarkets) {
opts = append(opts,
node.Override(new(idxprov.MeshCreator), idxprov_test.NewNoopMeshCreator),
)
}
// append any node builder options.
opts = append(opts, m.options.extraNodeOpts...)
idAddr, err := address.IDFromAddress(m.ActorAddr)
require.NoError(n.t, err)
// preload preseals if the network still hasn't bootstrapped.
var presealSectors []abi.SectorID
if !n.bootstrapped {
sectors := n.genesis.miners[i].Sectors
for _, sector := range sectors {
presealSectors = append(presealSectors, abi.SectorID{
Miner: abi.ActorID(idAddr),
Number: sector.SectorID,
})
}
}
if n.options.mockProofs {
opts = append(opts,
node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) {
return mock.NewMockSectorMgr(presealSectors), nil
}),
node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))),
node.Override(new(storiface.Verifier), mock.MockVerifier),
node.Override(new(storiface.Prover), mock.MockProver),
node.Unset(new(*sectorstorage.Manager)),
)
}
// start node
stop, err := node.New(ctx, opts...)
require.NoError(n.t, err)
n.t.Cleanup(func() { _ = stop(context.Background()) })
m.BaseAPI = m.StorageMiner
// Are we hitting this node through its RPC?
if m.options.rpc {
withRPC := minerRpc(n.t, m)
n.inactive.miners[i] = withRPC
}
mineOne := func(ctx context.Context, req lotusminer.MineReq) error {
select {
case mineBlock <- req:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
m.MineOne = mineOne
m.Stop = stop
n.active.miners = append(n.active.miners, m)
}
// If we are here, we have processed all inactive miners and moved them
// to active, so clear the slice.
n.inactive.miners = n.inactive.miners[:0]
// ---------------------
// WORKERS
// ---------------------
// Create all inactive workers.
for i, m := range n.inactive.workers {
r := repo.NewMemory(nil)
lr, err := r.Lock(repo.Worker)
require.NoError(n.t, err)
if m.options.noStorage {
err := lr.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = []storiface.LocalPath{}
})
require.NoError(n.t, err)
}
ds, err := lr.Datastore(context.Background(), "/metadata")
require.NoError(n.t, err)
addr := m.RemoteListener.Addr().String()
localStore, err := paths.NewLocal(ctx, lr, m.MinerNode, []string{"http://" + addr + "/remote"})
require.NoError(n.t, err)
auth := http.Header(nil)
remote := paths.NewRemote(localStore, m.MinerNode, auth, 20, &paths.DefaultPartialFileHandler{})
store := m.options.workerStorageOpt(remote)
fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}}
m.FetchHandler = fh.ServeHTTP
wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))
workerApi := &sealworker.Worker{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
TaskTypes: m.options.workerTasks,
NoSwap: false,
Name: m.options.workerName,
}, store, localStore, m.MinerNode, m.MinerNode, wsts),
LocalStore: localStore,
Storage: lr,
}
m.Worker = workerApi
require.True(n.t, m.options.rpc)
withRPC := workerRpc(n.t, m)
n.inactive.workers[i] = withRPC
err = m.MinerNode.WorkerConnect(ctx, "http://"+addr+"/rpc/v0")
require.NoError(n.t, err)
n.active.workers = append(n.active.workers, m)
}
// If we are here, we have processed all inactive workers and moved them
// to active, so clear the slice.
n.inactive.workers = n.inactive.workers[:0]
// ---------------------
// MISC
// ---------------------
// Link all the nodes.
err = n.mn.LinkAll()
require.NoError(n.t, err)
if !n.bootstrapped && len(n.active.miners) > 0 {
// We have *just* bootstrapped, so mine 2 blocks to setup some CE stuff in some actors
var wait sync.Mutex
wait.Lock()
observer := n.active.fullnodes[0]
bm := NewBlockMiner(n.t, n.active.miners[0])
n.t.Cleanup(bm.Stop)
bm.MineUntilBlock(ctx, observer, func(epoch abi.ChainEpoch) {
wait.Unlock()
})
wait.Lock()
bm.MineUntilBlock(ctx, observer, func(epoch abi.ChainEpoch) {
wait.Unlock()
})
wait.Lock()
n.bootstrapped = true
}
return n
}
// InterconnectAll connects all miners and full nodes to one another.
func (n *Ensemble) InterconnectAll() *Ensemble {
// connect full nodes to miners.
for _, from := range n.active.fullnodes {
for _, to := range n.active.miners {
// []*TestMiner to []api.CommonAPI type coercion not possible
// so cannot use variadic form.
n.Connect(from, to)
}
}
// connect full nodes between each other, skipping ourselves.
last := len(n.active.fullnodes) - 1
for i, from := range n.active.fullnodes {
if i == last {
continue
}
for _, to := range n.active.fullnodes[i+1:] {
n.Connect(from, to)
}
}
return n
}
// Connect connects one full node to the provided full nodes.
func (n *Ensemble) Connect(from api.Net, to ...api.Net) *Ensemble {
addr, err := from.NetAddrsListen(context.Background())
require.NoError(n.t, err)
for _, other := range to {
err = other.NetConnect(context.Background(), addr)
require.NoError(n.t, err)
}
return n
}
func (n *Ensemble) BeginMiningMustPost(blocktime time.Duration, miners ...*TestMiner) []*BlockMiner {
ctx := context.Background()
// wait one second to make sure that nodes are connected and have handshaken.
// TODO make this deterministic by listening to identify events on the
// libp2p eventbus instead (or something else).
time.Sleep(1 * time.Second)
var bms []*BlockMiner
if len(miners) == 0 {
// no miners have been provided explicitly, instantiate block miners
// for all active miners that aren't still mining.
for _, m := range n.active.miners {
if _, ok := n.active.bms[m]; ok {
continue // skip, already have a block miner
}
miners = append(miners, m)
}
}
if len(miners) > 1 {
n.t.Fatalf("Only one active miner for MustPost, but have %d", len(miners))
}
for _, m := range miners {
bm := NewBlockMiner(n.t, m)
bm.MineBlocksMustPost(ctx, blocktime)
n.t.Cleanup(bm.Stop)
bms = append(bms, bm)
n.active.bms[m] = bm
}
return bms
}
// BeginMining kicks off mining for the specified miners. If nil or 0-length,
// it will kick off mining for all enrolled and active miners. It also adds a
// cleanup function to stop all mining operations on test teardown.
func (n *Ensemble) BeginMining(blocktime time.Duration, miners ...*TestMiner) []*BlockMiner {
ctx := context.Background()
// wait one second to make sure that nodes are connected and have handshaken.
// TODO make this deterministic by listening to identify events on the
// libp2p eventbus instead (or something else).
time.Sleep(1 * time.Second)
var bms []*BlockMiner
if len(miners) == 0 {
// no miners have been provided explicitly, instantiate block miners
// for all active miners that aren't still mining.
for _, m := range n.active.miners {
if _, ok := n.active.bms[m]; ok {
continue // skip, already have a block miner
}
miners = append(miners, m)
}
}
for _, m := range miners {
bm := NewBlockMiner(n.t, m)
bm.MineBlocks(ctx, blocktime)
n.t.Cleanup(bm.Stop)
bms = append(bms, bm)
n.active.bms[m] = bm
}
return bms
}
func (n *Ensemble) generateGenesis() *genesis.Template {
var verifRoot = gen.DefaultVerifregRootkeyActor
if k := n.options.verifiedRoot.key; k != nil {
verifRoot = genesis.Actor{
Type: genesis.TAccount,
Balance: n.options.verifiedRoot.initialBalance,
Meta: (&genesis.AccountMeta{Owner: k.Address}).ActorMeta(),
}
}
templ := &genesis.Template{
NetworkVersion: n.genesis.version,
Accounts: n.genesis.accounts,
Miners: n.genesis.miners,
NetworkName: "test",
Timestamp: uint64(time.Now().Unix() - int64(n.options.pastOffset.Seconds())),
VerifregRootKey: verifRoot,
RemainderAccount: gen.DefaultRemainderAccountActor,
}
return templ
}
func importPreSealMeta(ctx context.Context, meta genesis.Miner, mds dtypes.MetadataDS) error {
maxSectorID := abi.SectorNumber(0)
for _, sector := range meta.Sectors {
sectorKey := datastore.NewKey(pipeline.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID))
commD := sector.CommD
commR := sector.CommR
info := &pipeline.SectorInfo{
State: pipeline.Proving,
SectorNumber: sector.SectorID,
Pieces: []api.SectorPiece{
{
Piece: abi.PieceInfo{
Size: abi.PaddedPieceSize(meta.SectorSize),
PieceCID: commD,
},
DealInfo: nil, // todo: likely possible to get, but not really that useful
},
},
CommD: &commD,
CommR: &commR,
}
b, err := cborutil.Dump(info)
if err != nil {
return err
}
if err := mds.Put(ctx, sectorKey, b); err != nil {
return err
}
if sector.SectorID > maxSectorID {
maxSectorID = sector.SectorID
}
}
buf := make([]byte, binary.MaxVarintLen64)
size := binary.PutUvarint(buf, uint64(maxSectorID))
return mds.Put(ctx, datastore.NewKey(pipeline.StorageCounterDSPrefix), buf[:size])
}