lotus/itests/kit/ensemble.go

718 lines
19 KiB
Go

package kit
import (
"bytes"
"context"
"crypto/rand"
"io/ioutil"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/go-storedcounter"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/gen"
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/mock"
"github.com/filecoin-project/lotus/genesis"
lotusminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/mockstorage"
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
power2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/power"
"github.com/ipfs/go-datastore"
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"
)
func init() {
chain.BootstrapPeerThreshold = 1
messagepool.HeadChangeCoalesceMinDelay = time.Microsecond
messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond
messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond
}
type BuilderOpt func(opts *BuilderOpts) error
type BuilderOpts struct {
pastOffset time.Duration
spt abi.RegisteredSealProof
}
var DefaultBuilderOpts = BuilderOpts{
pastOffset: 10000 * time.Second,
spt: abi.RegisteredSealProof_StackedDrg2KiBV1,
}
func ProofType(proofType abi.RegisteredSealProof) BuilderOpt {
return func(opts *BuilderOpts) error {
opts.spt = proofType
return nil
}
}
// Ensemble is a collection of nodes instantiated within a test. Ensemble
// supports building full nodes and miners.
type Ensemble struct {
t *testing.T
bootstrapped bool
genesisBlock bytes.Buffer
mn mocknet.Mocknet
options *BuilderOpts
inactive struct {
fullnodes []*TestFullNode
miners []*TestMiner
}
active struct {
fullnodes []*TestFullNode
miners []*TestMiner
}
genesis struct {
miners []genesis.Miner
accounts []genesis.Actor
}
}
// NewEnsemble
func NewEnsemble(t *testing.T, opts ...BuilderOpt) *Ensemble {
options := DefaultBuilderOpts
for _, o := range opts {
err := o(&options)
require.NoError(t, err)
}
return &Ensemble{t: t, options: &options}
}
type NodeOpts struct {
balance abi.TokenAmount
lite bool
sectors int
mockProofs bool
rpc bool
ownerKey *wallet.Key
}
var DefaultNodeOpts = NodeOpts{
balance: big.Mul(big.NewInt(100000000), types.NewInt(build.FilecoinPrecision)),
sectors: 2,
}
type NodeOpt func(opts *NodeOpts) error
// OwnerBalance specifies the balance to be attributed to a miner's owner account.
//
// Only used when creating a miner.
func OwnerBalance(balance abi.TokenAmount) NodeOpt {
return func(opts *NodeOpts) error {
opts.balance = balance
return nil
}
}
// LiteNode specifies that this node will be a lite node.
//
// Only used when creating a fullnode.
func LiteNode() NodeOpt {
return func(opts *NodeOpts) error {
opts.lite = true
return nil
}
}
// PresealSectors specifies the amount of preseal sectors to give to a miner
// at genesis.
//
// Only used when creating a miner.
func PresealSectors(sectors int) NodeOpt {
return func(opts *NodeOpts) error {
opts.sectors = sectors
return nil
}
}
// MockProofs activates mock proofs for the entire ensemble.
func MockProofs() NodeOpt {
return func(opts *NodeOpts) error {
opts.mockProofs = true
return nil
}
}
func ThroughRPC() NodeOpt {
return func(opts *NodeOpts) error {
opts.rpc = true
return nil
}
}
func OwnerAddr(wk *wallet.Key) NodeOpt {
return func(opts *NodeOpts) error {
opts.ownerKey = wk
return nil
}
}
// FullNode enrolls a new full node.
func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble {
options := DefaultNodeOpts
for _, o := range opts {
err := o(&options)
require.NoError(n.t, err)
}
var key *wallet.Key
if !n.bootstrapped && !options.balance.IsZero() {
// create a key+ddress, and assign it some FIL.
// this will be set as the default wallet.
var err error
key, err = wallet.GenerateKey(types.KTBLS)
require.NoError(n.t, err)
genacc := genesis.Actor{
Type: genesis.TAccount,
Balance: options.balance,
Meta: (&genesis.AccountMeta{Owner: key.Address}).ActorMeta(),
}
n.genesis.accounts = append(n.genesis.accounts, genacc)
}
*full = TestFullNode{options: options, DefaultKey: key}
n.inactive.fullnodes = append(n.inactive.fullnodes, full)
return n
}
// Miner enrolls a new miner, using the provided full node for chain
// interactions.
func (n *Ensemble) Miner(miner *TestMiner, full *TestFullNode, opts ...NodeOpt) *Ensemble {
require.NotNil(n.t, full, "full node required when instantiating miner")
options := DefaultNodeOpts
for _, o := range opts {
err := o(&options)
require.NoError(n.t, err)
}
privkey, _, err := libp2pcrypto.GenerateEd25519Key(rand.Reader)
require.NoError(n.t, err)
peerId, err := peer.IDFromPrivateKey(privkey)
require.NoError(n.t, err)
tdir, err := ioutil.TempDir("", "preseal-memgen")
require.NoError(n.t, err)
minerCnt := len(n.inactive.miners) + len(n.active.miners)
actorAddr, err := address.NewIDAddress(genesis2.MinerStart + uint64(minerCnt))
require.NoError(n.t, err)
ownerKey := options.ownerKey
if !n.bootstrapped {
var (
sectors = options.sectors
k *types.KeyInfo
genm *genesis.Miner
)
// create the preseal commitment.
if options.mockProofs {
genm, k, err = mockstorage.PreSeal(abi.RegisteredSealProof_StackedDrg2KiBV1, actorAddr, sectors)
} else {
genm, k, err = seed.PreSeal(actorAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, 0, sectors, tdir, []byte("make genesis mem random"), nil, true)
}
require.NoError(n.t, err)
genm.PeerId = peerId
// create an owner key, and assign it some FIL.
ownerKey, err = wallet.NewKey(*k)
require.NoError(n.t, err)
genacc := genesis.Actor{
Type: genesis.TAccount,
Balance: options.balance,
Meta: (&genesis.AccountMeta{Owner: ownerKey.Address}).ActorMeta(),
}
n.genesis.miners = append(n.genesis.miners, *genm)
n.genesis.accounts = append(n.genesis.accounts, genacc)
} else {
require.NotNil(n.t, ownerKey, "worker key can't be null if initializing a miner after genesis")
}
*miner = TestMiner{
ActorAddr: actorAddr,
OwnerKey: ownerKey,
FullNode: full,
PresealDir: tdir,
options: options,
}
miner.Libp2p.PeerID = peerId
miner.Libp2p.PrivKey = privkey
n.inactive.miners = append(n.inactive.miners, miner)
return n
}
// Start starts all enrolled nodes.
func (n *Ensemble) Start() *Ensemble {
ctx, cancel := context.WithCancel(context.Background())
n.t.Cleanup(cancel)
var gtempl *genesis.Template
if !n.bootstrapped {
// We haven't been bootstrapped yet, we need to generate genesis and
// create the networking backbone.
gtempl = n.generateGenesis()
n.mn = mocknet.New(ctx)
}
// Create all inactive full nodes.
for i, full := range n.inactive.fullnodes {
opts := []node.Option{
node.FullAPI(&full.FullNode, node.Lite(full.options.lite)),
node.Online(),
node.Repo(repo.NewMemory(nil)),
node.MockHost(n.mn),
node.Test(),
// so that we subscribe to pubsub topics immediately
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),
}
// Either generate the genesis or inject it.
if i == 0 && !n.bootstrapped {
opts = append(opts, node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&n.genesisBlock, *gtempl)))
} else {
opts = append(opts, node.Override(new(modules.Genesis), modules.LoadGenesis(n.genesisBlock.Bytes())))
}
// Are we mocking proofs?
if full.options.mockProofs {
opts = append(opts, node.Override(new(ffiwrapper.Verifier), mock.MockVerifier))
}
// Construct the full node.
stop, err := node.New(ctx, opts...)
// fullOpts[i].Opts(fulls),
require.NoError(n.t, err)
addr, err := full.WalletImport(context.Background(), &full.DefaultKey.KeyInfo)
require.NoError(n.t, err)
err = full.WalletSetDefault(context.Background(), addr)
require.NoError(n.t, err)
// Are we hitting this node through its RPC?
if full.options.rpc {
withRPC := fullRpc(n.t, full)
n.inactive.fullnodes[i] = withRPC
}
n.t.Cleanup(func() { _ = stop(context.Background()) })
n.active.fullnodes = append(n.active.fullnodes, full)
}
// If we are here, we have processed all inactive fullnodes and moved them
// to active, so clear the slice.
n.inactive.fullnodes = n.inactive.fullnodes[:0]
// Link all the nodes.
err := n.mn.LinkAll()
require.NoError(n.t, err)
// Create all inactive miners.
for i, m := range n.inactive.miners {
if n.bootstrapped {
// this is a miner created after genesis, so it won't have a preseal.
// we need to create it on chain.
params, aerr := actors.SerializeParams(&power2.CreateMinerParams{
Owner: m.OwnerKey.Address,
Worker: m.OwnerKey.Address,
SealProofType: n.options.spt,
Peer: abi.PeerID(m.Libp2p.PeerID),
})
require.NoError(n.t, aerr)
createStorageMinerMsg := &types.Message{
From: m.OwnerKey.Address,
To: power.Address,
Value: big.Zero(),
Method: power.Methods.CreateMiner,
Params: params,
GasLimit: 0,
GasPremium: big.NewInt(5252),
}
signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, nil)
require.NoError(n.t, err)
mw, err := m.FullNode.FullNode.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
require.NoError(n.t, err)
require.Equal(n.t, exitcode.Ok, mw.Receipt.ExitCode)
var retval power2.CreateMinerReturn
err = retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return))
require.NoError(n.t, err, "failed to create miner")
m.ActorAddr = retval.IDAddress
}
has, err := m.FullNode.WalletHas(ctx, m.OwnerKey.Address)
require.NoError(n.t, err)
// Only import the owner's full key into our companion full node, if we
// don't have it still.
if !has {
_, err = m.FullNode.WalletImport(ctx, &m.OwnerKey.KeyInfo)
require.NoError(n.t, err)
}
// // Set it as the default address.
// err = m.FullNode.WalletSetDefault(ctx, m.OwnerAddr.Address)
// require.NoError(n.t, err)
r := repo.NewMemory(nil)
lr, err := r.Lock(repo.StorageMiner)
require.NoError(n.t, err)
ks, err := lr.KeyStore()
require.NoError(n.t, err)
pk, err := m.Libp2p.PrivKey.Bytes()
require.NoError(n.t, err)
err = ks.Put("libp2p-host", types.KeyInfo{
Type: "libp2p-host",
PrivateKey: pk,
})
require.NoError(n.t, err)
ds, err := lr.Datastore(context.TODO(), "/metadata")
require.NoError(n.t, err)
err = ds.Put(datastore.NewKey("miner-address"), m.ActorAddr.Bytes())
require.NoError(n.t, err)
nic := storedcounter.New(ds, datastore.NewKey(modules.StorageCounterDSPrefix))
for i := 0; i < m.options.sectors; i++ {
_, err := nic.Next()
require.NoError(n.t, err)
}
_, err = nic.Next()
require.NoError(n.t, err)
err = lr.Close()
require.NoError(n.t, err)
enc, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(m.Libp2p.PeerID)})
require.NoError(n.t, err)
msg := &types.Message{
From: m.OwnerKey.Address,
To: m.ActorAddr,
Method: miner.Methods.ChangePeerID,
Params: enc,
Value: types.NewInt(0),
}
_, err = m.FullNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(n.t, err)
var mineBlock = make(chan lotusminer.MineReq)
opts := []node.Option{
node.StorageMiner(&m.StorageMiner),
node.Online(),
node.Repo(r),
node.Test(),
node.MockHost(n.mn),
node.Override(new(v1api.FullNode), m.FullNode),
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)),
}
idAddr, err := address.IDFromAddress(m.ActorAddr)
require.NoError(n.t, err)
if !n.bootstrapped && m.options.mockProofs {
s := n.genesis.miners[i].Sectors
sectors := make([]abi.SectorID, len(s))
for i, sector := range s {
sectors[i] = abi.SectorID{
Miner: abi.ActorID(idAddr),
Number: sector.SectorID,
}
}
opts = append(opts,
node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) {
return mock.NewMockSectorMgr(sectors), nil
}),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Unset(new(*sectorstorage.Manager)),
)
}
// start node
stop, err := node.New(ctx, opts...)
require.NoError(n.t, err)
// using real proofs, therefore need real sectors.
if !n.bootstrapped && !m.options.mockProofs {
err := m.StorageAddLocal(ctx, m.PresealDir)
require.NoError(n.t, err)
}
n.t.Cleanup(func() { _ = stop(context.Background()) })
// Are we hitting this node through its RPC?
if m.options.rpc {
withRPC := minerRpc(n.t, m)
n.inactive.miners[i] = withRPC
}
mineOne := func(ctx context.Context, req lotusminer.MineReq) error {
select {
case mineBlock <- req:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
m.MineOne = mineOne
m.Stop = stop
n.active.miners = append(n.active.miners, m)
}
// If we are here, we have processed all inactive miners and moved them
// to active, so clear the slice.
n.inactive.miners = n.inactive.miners[:0]
// Link all the nodes.
err = n.mn.LinkAll()
require.NoError(n.t, err)
if !n.bootstrapped && len(n.active.miners) > 0 {
// We have *just* bootstrapped, so
// mine 2 blocks to setup some CE stuff
// in some actors
var wait sync.Mutex
wait.Lock()
observer := n.active.fullnodes[0]
bm := NewBlockMiner(n.t, n.active.miners[0])
n.t.Cleanup(bm.Stop)
bm.MineUntilBlock(ctx, observer, func(epoch abi.ChainEpoch) {
wait.Unlock()
})
wait.Lock()
bm.MineUntilBlock(ctx, observer, func(epoch abi.ChainEpoch) {
wait.Unlock()
})
wait.Lock()
}
n.bootstrapped = true
return n
}
// InterconnectAll connects all full nodes one to another. We do not need to
// take action with miners, because miners only stay connected to their full
// nodes over JSON-RPC.
func (n *Ensemble) InterconnectAll() *Ensemble {
last := len(n.active.fullnodes) - 1
for i, from := range n.active.fullnodes {
if i == last {
continue
}
n.Connect(from, n.active.fullnodes[i+1:]...)
}
return n
}
// Connect connects one full node to the provided full nodes.
func (n *Ensemble) Connect(from *TestFullNode, to ...*TestFullNode) *Ensemble {
addr, err := from.NetAddrsListen(context.Background())
require.NoError(n.t, err)
for _, other := range to {
err = other.NetConnect(context.Background(), addr)
require.NoError(n.t, err)
}
return n
}
// BeginMining kicks off mining for the specified miners. If nil or 0-length,
// it will kick off mining for all enrolled and active miners.
func (n *Ensemble) BeginMining(blocktime time.Duration, miners ...*TestMiner) []*BlockMiner {
ctx := context.Background()
// wait one second to make sure that nodes are connected and have handshaken.
// TODO make this deterministic by listening to identify events on the
// libp2p eventbus instead (or something else).
time.Sleep(1 * time.Second)
var bms []*BlockMiner
if len(miners) == 0 {
miners = n.active.miners
}
for _, m := range miners {
bm := NewBlockMiner(n.t, m)
bm.MineBlocks(ctx, blocktime)
bms = append(bms, bm)
}
return bms
}
func (n *Ensemble) generateGenesis() *genesis.Template {
templ := &genesis.Template{
Accounts: n.genesis.accounts,
Miners: n.genesis.miners,
NetworkName: "test",
Timestamp: uint64(time.Now().Unix() - 10000), // some time sufficiently far in the past
VerifregRootKey: gen.DefaultVerifregRootkeyActor,
RemainderAccount: gen.DefaultRemainderAccountActor,
}
return templ
}
func CreateRPCServer(t *testing.T, handler http.Handler) (*httptest.Server, multiaddr.Multiaddr) {
testServ := httptest.NewServer(handler)
t.Cleanup(testServ.Close)
t.Cleanup(testServ.CloseClientConnections)
addr := testServ.Listener.Addr()
maddr, err := manet.FromNetAddr(addr)
require.NoError(t, err)
return testServ, maddr
}
func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
handler, err := node.FullNodeHandler(f.FullNode, false)
require.NoError(t, err)
srv, maddr := CreateRPCServer(t, handler)
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
require.NoError(t, err)
t.Cleanup(stop)
f.ListenAddr, f.FullNode = maddr, cl
return f
}
func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
handler, err := node.MinerHandler(m.StorageMiner, false)
require.NoError(t, err)
srv, maddr := CreateRPCServer(t, handler)
cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v0", nil)
require.NoError(t, err)
t.Cleanup(stop)
m.ListenAddr, m.StorageMiner = maddr, cl
return m
}
func LatestActorsAt(upgradeHeight abi.ChainEpoch) node.Option {
// Attention: Update this when introducing new actor versions or your tests will be sad
return NetworkUpgradeAt(network.Version13, upgradeHeight)
}
func NetworkUpgradeAt(version network.Version, upgradeHeight abi.ChainEpoch) node.Option {
fullSchedule := stmgr.UpgradeSchedule{{
// prepare for upgrade.
Network: network.Version9,
Height: 1,
Migration: stmgr.UpgradeActorsV2,
}, {
Network: network.Version10,
Height: 2,
Migration: stmgr.UpgradeActorsV3,
}, {
Network: network.Version12,
Height: 3,
Migration: stmgr.UpgradeActorsV4,
}, {
Network: network.Version13,
Height: 4,
Migration: stmgr.UpgradeActorsV5,
}}
schedule := stmgr.UpgradeSchedule{}
for _, upgrade := range fullSchedule {
if upgrade.Network > version {
break
}
schedule = append(schedule, upgrade)
}
if upgradeHeight > 0 {
schedule[len(schedule)-1].Height = upgradeHeight
}
return node.Override(new(stmgr.UpgradeSchedule), schedule)
}
func SDRUpgradeAt(calico, persian abi.ChainEpoch) node.Option {
return node.Override(new(stmgr.UpgradeSchedule), stmgr.UpgradeSchedule{{
Network: network.Version6,
Height: 1,
Migration: stmgr.UpgradeActorsV2,
}, {
Network: network.Version7,
Height: calico,
Migration: stmgr.UpgradeCalico,
}, {
Network: network.Version8,
Height: persian,
}})
}