lotus/lotus-soup/node.go
2020-06-29 18:38:28 +02:00

749 lines
19 KiB
Go

package main
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io/ioutil"
"os"
"sort"
"strings"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-storedcounter"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/beacon"
genesis_chain "github.com/filecoin-project/lotus/chain/gen/genesis"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/lp2p"
modtest "github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
"github.com/testground/sdk-go/sync"
)
// init applies process-wide settings for the test plan before any node is
// constructed: log verbosity, the BELLMAN_NO_GPU environment variable, and a
// handful of package-level parameters in lotus and specs-actors.
func init() {
// "*" presumably selects every logging subsystem — confirm against go-log.
// The values returned by SetLogLevel and os.Setenv are discarded.
logging.SetLogLevel("*", "ERROR")
os.Setenv("BELLMAN_NO_GPU", "1")
build.InsecurePoStValidation = true
build.DisableBuiltinAssets = true
// NOTE(review): presumably lowered so miners with a few 2KiB sectors can
// take part in consensus — confirm.
power.ConsensusMinerMinPower = big.NewInt(2048)
// Only the 2KiB stacked-DRG seal proof is registered as supported; it is
// the same proof type prepareMiner passes to seed.PreSeal.
saminer.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{
abi.RegisteredSealProof_StackedDrg2KiBV1: {},
}
verifreg.MinVerifiedDealSize = big.NewInt(256)
}
// Package-level coordination primitives: the setup timeout, the sync topics
// used to exchange setup data between instances, and the sync states used as
// barriers.
var (
// PrepareNodeTimeout bounds the context created by each prepare* function.
PrepareNodeTimeout = time.Minute
// Topics; the second argument is a sample value of the payload type.
genesisTopic = sync.NewTopic("genesis", &GenesisMsg{})
balanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{})
presealTopic = sync.NewTopic("preseal", &PresealMsg{})
clientsAddrsTopic = sync.NewTopic("clientsAddrsTopic", &peer.AddrInfo{})
minersAddrsTopic = sync.NewTopic("minersAddrsTopic", &MinerAddresses{})
// States; stateReady is signalled by every prepare* function once its node
// is up, stateMinerPickSeqNum hands each miner a unique sequence number.
stateReady = sync.State("ready")
stateDone = sync.State("done")
stateStopMining = sync.State("stop-mining")
stateMinerPickSeqNum = sync.State("miner-pick-seq-num")
)
// TestEnvironment bundles the testground run environment and init context by
// embedding both, so their methods and fields (IntParam, RecordMessage,
// SyncClient, NetClient, ...) are reachable directly on the value.
type TestEnvironment struct {
*runtime.RunEnv
*run.InitContext
}
// StringParam returns the named test parameter with any leading and trailing
// double-quote characters removed. This works around default parameter values
// arriving wrapped in quote characters.
func (t *TestEnvironment) StringParam(name string) string {
	raw := t.RunEnv.StringParam(name)
	return strings.Trim(raw, `"`)
}
// DurationParam reads the named test parameter through StringParam and parses
// it with time.ParseDuration. It panics if the value is not a valid duration.
func (t *TestEnvironment) DurationParam(name string) time.Duration {
	raw := t.StringParam(name)
	parsed, err := time.ParseDuration(raw)
	if err == nil {
		return parsed
	}
	panic(fmt.Errorf("invalid duration value for param '%s': %w", name, err))
}
// Node holds the handles to a running lotus node created by one of the
// prepare* functions.
type Node struct {
// fullApi is the full node API; populated by every prepare* function.
fullApi api.FullNode
// minerApi is the storage miner API; only prepareMiner populates it.
minerApi api.StorageMiner
// stop shuts down everything the prepare* function started.
stop node.StopFunc
// MineOne is only set by prepareMiner: it hands the callback to the test
// miner's channel, or returns ctx.Err() if ctx is done first.
MineOne func(context.Context, func(bool)) error
}
// InitialBalanceMsg is published on balanceTopic by miners and clients so the
// bootstrapper can fund their wallets in the genesis block.
type InitialBalanceMsg struct {
// Addr is the wallet address to fund.
Addr address.Address
// Balance is multiplied by build.FilecoinPrecision when the bootstrapper
// creates the genesis account.
Balance int
}
// PresealMsg is published on presealTopic by each miner and carries its
// presealed genesis miner description.
type PresealMsg struct {
Miner genesis.Miner
// Seqno is the miner's sequence number; collectPreseals sorts by it.
Seqno int64
}
// GenesisMsg is published on genesisTopic by the bootstrapper once the
// genesis block has been built.
type GenesisMsg struct {
// Genesis holds the serialized genesis data passed to modules.LoadGenesis.
Genesis []byte
// Bootstrapper holds the bootstrapper's multiaddr in binary form.
Bootstrapper []byte
}
// MinerAddresses is published on minersAddrsTopic by each miner.
// Field order matters: prepareMiner builds it with a positional literal.
type MinerAddresses struct {
// PeerAddr is the listen address info of the storage miner node.
PeerAddr peer.AddrInfo
// ActorAddr is the miner's actor address.
ActorAddr address.Address
}
// prepareBootstrapper builds the genesis block from the balances and preseals
// published by the other instances, starts the bootstrapper full node, and
// publishes the genesis bytes together with the bootstrapper's multiaddr on
// genesisTopic. It then signals stateReady and waits for every test instance.
//
// All sync operations share one context bounded by PrepareNodeTimeout. If an
// error is returned after the node has been started, the node is stopped
// first. The Must* sync calls still panic on failure.
func prepareBootstrapper(t *TestEnvironment) (_ *Node, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
	defer cancel()

	n := &Node{}
	// Stop the node on every error return so a failed setup does not leak it.
	defer func() {
		if err != nil && n.stop != nil {
			_ = n.stop(context.TODO()) // best-effort; the setup error is the one reported
		}
	}()

	pubsubTracer, err := getPubsubTracerConfig(ctx, t)
	if err != nil {
		return nil, err
	}

	clients := t.IntParam("clients")
	miners := t.IntParam("miners")
	nodes := clients + miners

	drandOpt, err := getDrandConfig(ctx, t)
	if err != nil {
		return nil, err
	}

	// the first duty of the bootstrapper is to construct the genesis block
	// first collect all client and miner balances to assign initial funds
	balances, err := waitForBalances(t, ctx, nodes)
	if err != nil {
		return nil, err
	}

	// then collect all preseals from miners
	preseals, err := collectPreseals(t, ctx, miners)
	if err != nil {
		return nil, err
	}

	// now construct the genesis block
	var genesisActors []genesis.Actor
	var genesisMiners []genesis.Miner

	for _, bm := range balances {
		genesisActors = append(genesisActors,
			genesis.Actor{
				Type:    genesis.TAccount,
				Balance: big.Mul(big.NewInt(int64(bm.Balance)), types.NewInt(build.FilecoinPrecision)),
				Meta:    (&genesis.AccountMeta{Owner: bm.Addr}).ActorMeta(),
			})
	}

	for _, pm := range preseals {
		genesisMiners = append(genesisMiners, pm.Miner)
	}

	genesisTemplate := genesis.Template{
		Accounts:  genesisActors,
		Miners:    genesisMiners,
		Timestamp: uint64(time.Now().Unix()) - uint64(t.IntParam("genesis_timestamp_offset")), // this needs to be in the past
	}

	// dump the genesis block
	// var jsonBuf bytes.Buffer
	// jsonEnc := json.NewEncoder(&jsonBuf)
	// err := jsonEnc.Encode(genesisTemplate)
	// if err != nil {
	// 	panic(err)
	// }
	// runenv.RecordMessage(fmt.Sprintf("Genesis template: %s", string(jsonBuf.Bytes())))

	// The genesis block is produced as a side effect of dependency injection:
	// MakeGenesisMem writes it into genesisBuffer while the node is constructed.
	var genesisBuffer bytes.Buffer

	bootstrapperIP := t.NetClient.MustGetDataNetworkIP().String()

	stop, err := node.New(context.Background(),
		node.FullAPI(&n.fullApi),
		node.Online(),
		node.Repo(repo.NewMemory(nil)),
		node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)),
		withListenAddress(bootstrapperIP),
		withBootstrapper(nil),
		withPubsubConfig(true, pubsubTracer),
		drandOpt,
	)
	if err != nil {
		return nil, err
	}
	n.stop = stop

	listenInfo, err := n.fullApi.NetAddrsListen(ctx)
	if err != nil {
		return nil, err
	}

	bootstrapperAddr, err := bootstrapperP2pAddr(listenInfo, bootstrapperIP)
	if err != nil {
		return nil, err
	}

	genesisMsg := &GenesisMsg{
		Genesis:      genesisBuffer.Bytes(),
		Bootstrapper: bootstrapperAddr.Bytes(),
	}
	t.SyncClient.MustPublish(ctx, genesisTopic, genesisMsg)

	t.RecordMessage("waiting for all nodes to be ready")
	t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)

	return n, nil
}

// bootstrapperP2pAddr returns the first listen address in info whose IPv4
// component equals ip, as a full p2p multiaddr. It returns an error when no
// listen address matches.
func bootstrapperP2pAddr(info peer.AddrInfo, ip string) (ma.Multiaddr, error) {
	for _, a := range info.Addrs {
		// Addresses without an IPv4 component, or on another interface, are skipped.
		v, err := a.ValueForProtocol(ma.P_IP4)
		if err != nil || v != ip {
			continue
		}

		addrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{
			ID:    info.ID,
			Addrs: []ma.Multiaddr{a},
		})
		if err != nil {
			return nil, err
		}
		if len(addrs) == 0 {
			continue
		}
		return addrs[0], nil
	}
	return nil, fmt.Errorf("failed to determine bootstrapper address")
}
// prepareMiner sets up a miner instance: it creates a wallet and publishes
// its initial balance, preseals sectors and publishes the preseal, waits for
// the genesis message, then starts a full node and a storage miner node that
// share one Node value. Finally it pushes a ChangePeerID message, publishes
// the miner's addresses on minersAddrsTopic, signals stateReady and waits for
// every test instance.
//
// All sync operations share one context bounded by PrepareNodeTimeout. On any
// error return, whatever nodes were started are stopped and the preseal
// directory is removed. The Must* sync calls still panic on failure.
func prepareMiner(t *TestEnvironment) (_ *Node, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
	defer cancel()

	var (
		n          = &Node{}
		presealDir string
	)
	// Release everything acquired so far on every error return.
	defer func() {
		if err == nil {
			return
		}
		if n.stop != nil {
			_ = n.stop(context.TODO()) // best-effort; the setup error is the one reported
		}
		if presealDir != "" {
			_ = os.RemoveAll(presealDir) // best-effort removal of the temp dir
		}
	}()

	pubsubTracer, err := getPubsubTracerConfig(ctx, t)
	if err != nil {
		return nil, err
	}

	drandOpt, err := getDrandConfig(ctx, t)
	if err != nil {
		return nil, err
	}

	// first create a wallet
	walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
	if err != nil {
		return nil, err
	}

	// publish the account ID/balance; the bootstrapper waits for it, so a
	// failed publish must not go unnoticed
	balance := t.IntParam("balance")
	balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
	if _, err = t.SyncClient.Publish(ctx, balanceTopic, balanceMsg); err != nil {
		return nil, fmt.Errorf("publishing initial balance: %w", err)
	}

	// create and publish the preseal commitment
	priv, _, err := libp2p_crypto.GenerateEd25519Key(rand.Reader)
	if err != nil {
		return nil, err
	}

	minerID, err := peer.IDFromPrivateKey(priv)
	if err != nil {
		return nil, err
	}

	// pick unique sequence number for each miner, no matter in which group they are
	seq := t.SyncClient.MustSignalAndWait(ctx, stateMinerPickSeqNum, t.IntParam("miners"))

	minerAddr, err := address.NewIDAddress(genesis_chain.MinerStart + uint64(seq-1))
	if err != nil {
		return nil, err
	}

	presealDir, err = ioutil.TempDir("", "preseal")
	if err != nil {
		return nil, err
	}

	sectors := t.IntParam("sectors")
	genMiner, _, err := seed.PreSeal(minerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, 0, sectors, presealDir, []byte("TODO: randomize this"), &walletKey.KeyInfo)
	if err != nil {
		return nil, err
	}
	genMiner.PeerId = minerID

	t.RecordMessage("Miner Info: Owner: %s Worker: %s", genMiner.Owner, genMiner.Worker)

	presealMsg := &PresealMsg{Miner: *genMiner, Seqno: seq}
	if _, err = t.SyncClient.Publish(ctx, presealTopic, presealMsg); err != nil {
		return nil, fmt.Errorf("publishing preseal: %w", err)
	}

	// then collect the genesis block and bootstrapper address
	genesisMsg, err := waitForGenesis(t, ctx)
	if err != nil {
		return nil, err
	}

	// prepare the repo
	minerRepo, err := newPresealedMinerRepo(priv, minerAddr, sectors)
	if err != nil {
		return nil, err
	}

	minerIP := t.NetClient.MustGetDataNetworkIP().String()

	// create the node
	// we need both a full node _and_ a storage miner node
	stop1, err := node.New(context.Background(),
		node.FullAPI(&n.fullApi),
		node.Online(),
		node.Repo(repo.NewMemory(nil)),
		withGenesis(genesisMsg.Genesis),
		withListenAddress(minerIP),
		withBootstrapper(genesisMsg.Bootstrapper),
		withPubsubConfig(false, pubsubTracer),
		drandOpt,
	)
	if err != nil {
		return nil, err
	}
	// Until the storage miner is up, stopping the Node means stopping the full node.
	n.stop = stop1

	// set the wallet
	if err = n.setWallet(ctx, walletKey); err != nil {
		return nil, err
	}

	mineBlock := make(chan func(bool))
	stop2, err := node.New(context.Background(),
		node.StorageMiner(&n.minerApi),
		node.Online(),
		node.Repo(minerRepo),
		node.Override(new(api.FullNode), n.fullApi),
		node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock, minerAddr)),
		withMinerListenAddress(minerIP),
	)
	if err != nil {
		return nil, err
	}

	n.stop = func(stopCtx context.Context) error {
		// TODO use a multierror for this
		// The storage miner depends on the full node, so it is stopped first.
		err2 := stop2(stopCtx)
		err1 := stop1(stopCtx)
		if err2 != nil {
			return err2
		}
		return err1
	}

	// connect the storage miner node to its own full node
	remoteAddrs, err := n.fullApi.NetAddrsListen(ctx)
	if err != nil {
		return nil, err
	}
	if err = n.minerApi.NetConnect(ctx, remoteAddrs); err != nil {
		return nil, err
	}

	n.MineOne = func(ctx context.Context, cb func(bool)) error {
		select {
		case mineBlock <- cb:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// add local storage for presealed sectors
	if err = n.minerApi.StorageAddLocal(ctx, presealDir); err != nil {
		return nil, err
	}

	// set the miner PeerID
	minerIDEncoded, err := actors.SerializeParams(&saminer.ChangePeerIDParams{NewID: abi.PeerID(minerID)})
	if err != nil {
		return nil, err
	}

	changeMinerID := &types.Message{
		To:       minerAddr,
		From:     genMiner.Worker,
		Method:   builtin.MethodsMiner.ChangePeerID,
		Params:   minerIDEncoded,
		Value:    types.NewInt(0),
		GasPrice: types.NewInt(0),
		GasLimit: 1000000,
	}
	if _, err = n.fullApi.MpoolPushMessage(ctx, changeMinerID); err != nil {
		return nil, err
	}

	t.RecordMessage("publish our address to the miners addr topic")
	actoraddress, err := n.minerApi.ActorAddress(ctx)
	if err != nil {
		return nil, err
	}
	addrinfo, err := n.minerApi.NetAddrsListen(ctx)
	if err != nil {
		return nil, err
	}
	t.SyncClient.MustPublish(ctx, minersAddrsTopic, MinerAddresses{addrinfo, actoraddress})

	t.RecordMessage("waiting for all nodes to be ready")
	t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)

	return n, nil
}

// newPresealedMinerRepo creates an in-memory storage miner repo holding the
// libp2p host key, the miner address and a sector counter that has been
// advanced sectors+1 times. The repo lock is released before returning, on
// both the success and the error paths.
func newPresealedMinerRepo(priv libp2p_crypto.PrivKey, minerAddr address.Address, sectors int) (r repo.Repo, err error) {
	minerRepo := repo.NewMemory(nil)

	lr, err := minerRepo.Lock(repo.StorageMiner)
	if err != nil {
		return nil, err
	}
	defer func() {
		// A Close failure is only reported when nothing else went wrong.
		if cerr := lr.Close(); err == nil && cerr != nil {
			r, err = nil, cerr
		}
	}()

	ks, err := lr.KeyStore()
	if err != nil {
		return nil, err
	}

	kbytes, err := priv.Bytes()
	if err != nil {
		return nil, err
	}

	err = ks.Put("libp2p-host", types.KeyInfo{
		Type:       "libp2p-host",
		PrivateKey: kbytes,
	})
	if err != nil {
		return nil, err
	}

	ds, err := lr.Datastore("/metadata")
	if err != nil {
		return nil, err
	}

	if err = ds.Put(datastore.NewKey("miner-address"), minerAddr.Bytes()); err != nil {
		return nil, err
	}

	// NOTE(review): presumably advanced so newly sealed sectors do not reuse
	// the numbers of the presealed ones — confirm.
	nic := storedcounter.New(ds, datastore.NewKey(modules.StorageCounterDSPrefix))
	for i := 0; i < (sectors + 1); i++ {
		if _, err = nic.Next(); err != nil {
			return nil, err
		}
	}

	return minerRepo, nil
}
// prepareClient sets up a client instance: it creates a wallet and publishes
// its initial balance, waits for the genesis message, starts a full node
// bootstrapped from it, imports the wallet, publishes the node's listen
// addresses on clientsAddrsTopic, then signals stateReady and waits for every
// test instance.
//
// All sync operations share one context bounded by PrepareNodeTimeout. If an
// error is returned after the node has been started, the node is stopped
// first. The Must* sync calls still panic on failure.
func prepareClient(t *TestEnvironment) (_ *Node, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
	defer cancel()

	n := &Node{}
	// Stop the node on every error return so a failed setup does not leak it.
	defer func() {
		if err != nil && n.stop != nil {
			_ = n.stop(context.TODO()) // best-effort; the setup error is the one reported
		}
	}()

	pubsubTracer, err := getPubsubTracerConfig(ctx, t)
	if err != nil {
		return nil, err
	}

	drandOpt, err := getDrandConfig(ctx, t)
	if err != nil {
		return nil, err
	}

	// first create a wallet
	walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
	if err != nil {
		return nil, err
	}

	// publish the account ID/balance; the bootstrapper waits for it, so a
	// failed publish must not go unnoticed
	balance := t.IntParam("balance")
	balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
	if _, err = t.SyncClient.Publish(ctx, balanceTopic, balanceMsg); err != nil {
		return nil, fmt.Errorf("publishing initial balance: %w", err)
	}

	// then collect the genesis block and bootstrapper address
	genesisMsg, err := waitForGenesis(t, ctx)
	if err != nil {
		return nil, err
	}

	clientIP := t.NetClient.MustGetDataNetworkIP().String()

	// create the node
	stop, err := node.New(context.Background(),
		node.FullAPI(&n.fullApi),
		node.Online(),
		node.Repo(repo.NewMemory(nil)),
		withGenesis(genesisMsg.Genesis),
		withListenAddress(clientIP),
		withBootstrapper(genesisMsg.Bootstrapper),
		withPubsubConfig(false, pubsubTracer),
		drandOpt,
	)
	if err != nil {
		return nil, err
	}
	n.stop = stop

	// set the wallet
	if err = n.setWallet(ctx, walletKey); err != nil {
		return nil, err
	}

	t.RecordMessage("publish our address to the clients addr topic")
	addrinfo, err := n.fullApi.NetAddrsListen(ctx)
	if err != nil {
		return nil, err
	}
	t.SyncClient.MustPublish(ctx, clientsAddrsTopic, addrinfo)

	t.RecordMessage("waiting for all nodes to be ready")
	t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)

	return n, nil
}
// setWallet imports walletKey into the full node's wallet and then makes its
// address the default wallet address. The first error encountered is returned.
func (n *Node) setWallet(ctx context.Context, walletKey *wallet.Key) error {
	if _, err := n.fullApi.WalletImport(ctx, &walletKey.KeyInfo); err != nil {
		return err
	}
	return n.fullApi.WalletSetDefault(ctx, walletKey.Address)
}
// withGenesis returns a node option that overrides modules.Genesis with a
// loader for the given genesis bytes.
func withGenesis(gb []byte) node.Option {
	loader := modules.LoadGenesis(gb)
	return node.Override(new(modules.Genesis), loader)
}
// withBootstrapper returns a node option that overrides the bootstrap peer
// list. ab is the bootstrapper's multiaddr in binary form; a nil ab yields an
// empty peer list. Decoding happens when the provider is invoked, not here.
func withBootstrapper(ab []byte) node.Option {
	provider := func() (dtypes.BootstrapPeers, error) {
		if ab == nil {
			return dtypes.BootstrapPeers{}, nil
		}

		maddr, err := ma.NewMultiaddrBytes(ab)
		if err != nil {
			return nil, err
		}

		info, err := peer.AddrInfoFromP2pAddr(maddr)
		if err != nil {
			return nil, err
		}

		return dtypes.BootstrapPeers{*info}, nil
	}
	return node.Override(new(dtypes.BootstrapPeers), provider)
}
// withPubsubConfig returns a node option that overrides the pubsub config
// with the given bootstrapper flag and remote tracer address.
func withPubsubConfig(bootstrapper bool, pubsubTracer string) node.Option {
	provide := func() *config.Pubsub {
		cfg := new(config.Pubsub)
		cfg.Bootstrapper = bootstrapper
		cfg.RemoteTracer = pubsubTracer
		return cfg
	}
	return node.Override(new(*config.Pubsub), provide)
}
// withListenAddress returns a node option that makes the node listen on TCP
// port 4001 of the given IPv4 address.
func withListenAddress(ip string) node.Option {
	listen := "/ip4/" + ip + "/tcp/4001"
	return node.Override(node.StartListeningKey, lp2p.StartListening([]string{listen}))
}
// withMinerListenAddress returns a node option that makes the node listen on
// TCP port 4002 of the given IPv4 address.
func withMinerListenAddress(ip string) node.Option {
	listen := "/ip4/" + ip + "/tcp/4002"
	return node.Override(node.StartListeningKey, lp2p.StartListening([]string{listen}))
}
// waitForBalances subscribes to balanceTopic and collects messages until
// `nodes` of them have arrived. It returns an error as soon as the
// subscription's Done channel delivers a value.
func waitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) {
	msgs := make(chan *InitialBalanceMsg)
	sub := t.SyncClient.MustSubscribe(ctx, balanceTopic, msgs)

	collected := make([]*InitialBalanceMsg, 0, nodes)
	for len(collected) < nodes {
		select {
		case err := <-sub.Done():
			return nil, fmt.Errorf("got error while waiting for balances: %w", err)
		case msg := <-msgs:
			collected = append(collected, msg)
		}
	}

	return collected, nil
}
// collectPreseals subscribes to presealTopic, collects `miners` messages and
// returns them ordered by ascending Seqno. It returns an error as soon as the
// subscription's Done channel delivers a value.
func collectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*PresealMsg, error) {
	msgs := make(chan *PresealMsg)
	sub := t.SyncClient.MustSubscribe(ctx, presealTopic, msgs)

	collected := make([]*PresealMsg, 0, miners)
	for len(collected) < miners {
		select {
		case err := <-sub.Done():
			return nil, fmt.Errorf("got error while waiting for preseals: %w", err)
		case msg := <-msgs:
			collected = append(collected, msg)
		}
	}

	bySeqno := func(a, b int) bool {
		return collected[a].Seqno < collected[b].Seqno
	}
	sort.Slice(collected, bySeqno)

	return collected, nil
}
// waitForGenesis subscribes to genesisTopic and returns the first message
// received, or an error if the subscription's Done channel delivers first.
func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error) {
	msgs := make(chan *GenesisMsg)
	sub := t.SyncClient.MustSubscribe(ctx, genesisTopic, msgs)

	select {
	case err := <-sub.Done():
		return nil, fmt.Errorf("error while waiting for genesis msg: %w", err)
	case msg := <-msgs:
		return msg, nil
	}
}
// collectMinerAddrs subscribes to minersAddrsTopic and collects messages
// until `miners` of them have arrived. It returns an error as soon as the
// subscription's Done channel delivers a value.
func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddresses, error) {
	incoming := make(chan MinerAddresses)
	sub := t.SyncClient.MustSubscribe(ctx, minersAddrsTopic, incoming)

	collected := make([]MinerAddresses, 0, miners)
	for len(collected) < miners {
		select {
		case err := <-sub.Done():
			return nil, fmt.Errorf("got error while waiting for miners addrs: %w", err)
		case entry := <-incoming:
			collected = append(collected, entry)
		}
	}

	return collected, nil
}
// collectClientAddrs subscribes to clientsAddrsTopic and collects messages
// until `clients` of them have arrived. It returns an error as soon as the
// subscription's Done channel delivers a value.
func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) {
	incoming := make(chan peer.AddrInfo)
	sub := t.SyncClient.MustSubscribe(ctx, clientsAddrsTopic, incoming)

	collected := make([]peer.AddrInfo, 0, clients)
	for len(collected) < clients {
		select {
		case err := <-sub.Done():
			return nil, fmt.Errorf("got error while waiting for clients addrs: %w", err)
		case info := <-incoming:
			collected = append(collected, info)
		}
	}

	return collected, nil
}
// getPubsubTracerConfig returns the pubsub tracer value to configure nodes
// with. When the "enable_pubsub_tracer" parameter is false it returns an
// empty string immediately; otherwise it subscribes to pubsubTracerTopic and
// returns the Tracer field of the first message, or an error if the
// subscription's Done channel delivers first.
func getPubsubTracerConfig(ctx context.Context, t *TestEnvironment) (string, error) {
	if enabled := t.BooleanParam("enable_pubsub_tracer"); !enabled {
		return "", nil
	}

	tracerCh := make(chan *PubsubTracerMsg)
	sub := t.SyncClient.MustSubscribe(ctx, pubsubTracerTopic, tracerCh)

	select {
	case err := <-sub.Done():
		return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err)
	case msg := <-tracerCh:
		return msg.Tracer, nil
	}
}
// getDrandConfig returns the node option that configures the randomness
// beacon selected by the "random_beacon_type" test parameter:
//
//   - "external-drand": a no-op option, leaving the node's defaults in place;
//   - "local-drand": waits for the drand config published through the sync
//     service and overrides the drand config and gossip bootstrap peers;
//   - "mock": overrides the beacon with modtest.RandomBeacon and installs a
//     placeholder drand config.
//
// Any other value yields an error.
func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error) {
	beaconType := t.StringParam("random_beacon_type")
	switch beaconType {
	case "external-drand":
		noop := func(settings *node.Settings) error {
			return nil
		}
		return noop, nil

	case "local-drand":
		cfg, err := waitForDrandConfig(ctx, t.SyncClient)
		if err != nil {
			// %w is only meaningful to fmt.Errorf; in a plain printf-style
			// message it renders as "%!w(...)", so %s is used here.
			t.RecordMessage("error getting drand config: %s", err)
			return nil, err
		}
		t.RecordMessage("setting drand config: %v", cfg)
		return node.Options(
			node.Override(new(dtypes.DrandConfig), cfg.Config),
			node.Override(new(dtypes.DrandBootstrap), cfg.GossipBootstrap),
		), nil

	case "mock":
		return node.Options(
			node.Override(new(beacon.RandomBeacon), modtest.RandomBeacon),
			node.Override(new(dtypes.DrandConfig), dtypes.DrandConfig{
				ChainInfoJSON: "{\"Hash\":\"wtf\"}",
			}),
			node.Override(new(dtypes.DrandBootstrap), dtypes.DrandBootstrap{}),
		), nil

	default:
		return nil, fmt.Errorf("unknown random_beacon_type: %s", beaconType)
	}
}