lotus/lotus-soup/testkit/node.go

253 lines
6.5 KiB
Go
Raw Normal View History

package testkit
import (
"context"
"fmt"
2020-06-29 12:57:55 +00:00
"net/http"
2020-06-26 14:30:44 +00:00
"os"
"sort"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules/dtypes"
modtest "github.com/filecoin-project/lotus/node/modules/testing"
2020-07-03 18:07:46 +00:00
tstats "github.com/filecoin-project/lotus/tools/stats"
2020-07-03 18:06:10 +00:00
influxdb "github.com/kpacha/opencensus-influxdb"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)
// PrepareNodeTimeout is a three-minute duration.
// NOTE(review): it is not referenced in this part of the file; presumably it
// bounds how long node preparation may take — confirm against its callers.
var PrepareNodeTimeout = 3 * time.Minute
// LotusNode bundles the handles a test plan holds for one Lotus node.
type LotusNode struct {
	// FullApi is the node's full-node API; setWallet uses it to import a key
	// and to set the default wallet address.
	FullApi api.FullNode
	// MinerApi is the storage-miner API.
	// NOTE(review): presumably only populated for miner nodes — confirm.
	MinerApi api.StorageMiner
	// StopFn is the node's stop function.
	StopFn node.StopFunc
	// Wallet is the key recorded by setWallet once it has been imported and
	// made the default.
	Wallet *wallet.Key
	// MineOne takes a context and a miner.MineReq and returns an error.
	// NOTE(review): presumably triggers mining of a single block — confirm.
	MineOne func(context.Context, miner.MineReq) error
}
// setWallet imports walletKey into the node through FullApi, makes its address
// the node's default wallet address, and then records the key on n.Wallet.
// The first API error encountered is returned unchanged; n.Wallet is only
// assigned when both calls succeed.
func (n *LotusNode) setWallet(ctx context.Context, walletKey *wallet.Key) error {
	if _, err := n.FullApi.WalletImport(ctx, &walletKey.KeyInfo); err != nil {
		return err
	}

	if err := n.FullApi.WalletSetDefault(ctx, walletKey.Address); err != nil {
		return err
	}

	n.Wallet = walletKey
	return nil
}
func WaitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) {
2020-06-24 11:10:35 +00:00
ch := make(chan *InitialBalanceMsg)
sub := t.SyncClient.MustSubscribe(ctx, BalanceTopic, ch)
2020-06-24 11:10:35 +00:00
balances := make([]*InitialBalanceMsg, 0, nodes)
for i := 0; i < nodes; i++ {
select {
case m := <-ch:
balances = append(balances, m)
case err := <-sub.Done():
return nil, fmt.Errorf("got error while waiting for balances: %w", err)
}
}
return balances, nil
}
func CollectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*PresealMsg, error) {
2020-06-24 11:10:35 +00:00
ch := make(chan *PresealMsg)
sub := t.SyncClient.MustSubscribe(ctx, PresealTopic, ch)
2020-06-24 11:10:35 +00:00
preseals := make([]*PresealMsg, 0, miners)
for i := 0; i < miners; i++ {
select {
case m := <-ch:
preseals = append(preseals, m)
case err := <-sub.Done():
return nil, fmt.Errorf("got error while waiting for preseals: %w", err)
}
}
2020-06-26 14:30:44 +00:00
sort.Slice(preseals, func(i, j int) bool {
return preseals[i].Seqno < preseals[j].Seqno
})
2020-06-24 11:10:35 +00:00
return preseals, nil
}
func WaitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error) {
2020-06-24 11:10:35 +00:00
genesisCh := make(chan *GenesisMsg)
sub := t.SyncClient.MustSubscribe(ctx, GenesisTopic, genesisCh)
2020-06-24 11:10:35 +00:00
select {
case genesisMsg := <-genesisCh:
return genesisMsg, nil
case err := <-sub.Done():
return nil, fmt.Errorf("error while waiting for genesis msg: %w", err)
}
}
// CollectMinerAddrs subscribes to MinersAddrsTopic and blocks until it has
// received `miners` MinerAddressesMsg values, returned in arrival order.
// If the subscription's Done channel delivers first, a wrapped error is
// returned instead.
func CollectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddressesMsg, error) {
	incoming := make(chan MinerAddressesMsg)
	subscription := t.SyncClient.MustSubscribe(ctx, MinersAddrsTopic, incoming)

	collected := make([]MinerAddressesMsg, 0, miners)
	// Every pass either appends exactly one message or returns.
	for len(collected) < miners {
		select {
		case err := <-subscription.Done():
			return nil, fmt.Errorf("got error while waiting for miners addrs: %w", err)
		case msg := <-incoming:
			collected = append(collected, msg)
		}
	}

	return collected, nil
}
// CollectClientAddrs subscribes to ClientsAddrsTopic and blocks until it has
// received `clients` ClientAddressesMsg values, returned in arrival order.
// If the subscription's Done channel delivers first, a wrapped error is
// returned instead.
func CollectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]*ClientAddressesMsg, error) {
	incoming := make(chan *ClientAddressesMsg)
	subscription := t.SyncClient.MustSubscribe(ctx, ClientsAddrsTopic, incoming)

	collected := make([]*ClientAddressesMsg, 0, clients)
	// Every pass either appends exactly one message or returns.
	for len(collected) < clients {
		select {
		case err := <-subscription.Done():
			return nil, fmt.Errorf("got error while waiting for clients addrs: %w", err)
		case msg := <-incoming:
			collected = append(collected, msg)
		}
	}

	return collected, nil
}
2020-06-24 14:41:57 +00:00
// GetPubsubTracerMaddr returns the multiaddr published on PubsubTracerTopic.
// When the "enable_pubsub_tracer" boolean parameter is false it returns an
// empty string and no error without subscribing. Otherwise it blocks until a
// PubsubTracerMsg arrives; if the subscription's Done channel delivers first,
// a wrapped error is returned.
func GetPubsubTracerMaddr(ctx context.Context, t *TestEnvironment) (string, error) {
	tracerEnabled := t.BooleanParam("enable_pubsub_tracer")
	if !tracerEnabled {
		return "", nil
	}

	incoming := make(chan *PubsubTracerMsg)
	subscription := t.SyncClient.MustSubscribe(ctx, PubsubTracerTopic, incoming)

	select {
	case err := <-subscription.Done():
		return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err)
	case msg := <-incoming:
		return msg.Multiaddr, nil
	}
}
func GetRandomBeaconOpts(ctx context.Context, t *TestEnvironment) (node.Option, error) {
beaconType := t.StringParam("random_beacon_type")
switch beaconType {
case "external-drand":
2020-06-24 14:41:57 +00:00
noop := func(settings *node.Settings) error {
return nil
}
return noop, nil
case "local-drand":
cfg, err := waitForDrandConfig(ctx, t.SyncClient)
if err != nil {
t.RecordMessage("error getting drand config: %w", err)
return nil, err
}
t.RecordMessage("setting drand config: %v", cfg)
return node.Options(
node.Override(new(dtypes.DrandConfig), cfg.Config),
node.Override(new(dtypes.DrandBootstrap), cfg.GossipBootstrap),
), nil
case "mock":
return node.Options(
node.Override(new(beacon.RandomBeacon), modtest.RandomBeacon),
node.Override(new(dtypes.DrandConfig), dtypes.DrandConfig{
ChainInfoJSON: "{\"Hash\":\"wtf\"}",
}),
node.Override(new(dtypes.DrandBootstrap), dtypes.DrandBootstrap{}),
), nil
default:
return nil, fmt.Errorf("unknown random_beacon_type: %s", beaconType)
2020-06-24 14:41:57 +00:00
}
}
2020-06-30 11:50:46 +00:00
func startServer(endpoint ma.Multiaddr, srv *http.Server) (listenAddr string, err error) {
2020-06-30 15:53:27 +00:00
lst, err := manet.Listen(endpoint)
if err != nil {
return "", fmt.Errorf("could not listen: %w", err)
2020-06-30 15:53:27 +00:00
}
go func() {
_ = srv.Serve(manet.NetListener(lst))
}()
return lst.Addr().String(), nil
2020-06-30 15:53:27 +00:00
}
// registerAndExportMetrics registers the default Lotus metric views, records
// the LotusInfo measure once, and installs an InfluxDB exporter that writes to
// the "testground" database at the address taken from the INFLUXDB_URL
// environment variable, tagged with instanceName. The reporting period is set
// to five seconds. It panics if view registration or exporter creation fails.
func registerAndExportMetrics(instanceName string) {
	// Make every Lotus metric view known to opencensus.
	if err := view.Register(metrics.DefaultViews...); err != nil {
		panic(err)
	}

	bg := context.Background()

	// Record a value of one so the metric is published to the exporter.
	stats.Record(bg, metrics.LotusInfo.M(1))

	// Hook the InfluxDB exporter into opencensus.
	opts := influxdb.Options{
		Database:     "testground",
		Address:      os.Getenv("INFLUXDB_URL"),
		Username:     "",
		Password:     "",
		InstanceName: instanceName,
	}
	exporter, err := influxdb.NewExporter(bg, opts)
	if err != nil {
		panic(err)
	}

	view.RegisterExporter(exporter)
	view.SetReportingPeriod(5 * time.Second)
}
2020-07-03 18:06:10 +00:00
2020-07-06 16:24:11 +00:00
func collectStats(t *TestEnvironment, ctx context.Context, api api.FullNode) error {
t.RecordMessage("collecting blockchain stats")
2020-07-03 18:06:10 +00:00
influxAddr := os.Getenv("INFLUXDB_URL")
influxUser := ""
influxPass := ""
2020-07-06 16:24:11 +00:00
influxDb := "testground"
2020-07-03 18:06:10 +00:00
2020-07-03 18:07:46 +00:00
influx, err := tstats.InfluxClient(influxAddr, influxUser, influxPass)
2020-07-03 18:06:10 +00:00
if err != nil {
2020-07-06 16:24:11 +00:00
t.RecordMessage(err.Error())
2020-07-03 18:06:10 +00:00
return err
}
2020-07-06 16:24:11 +00:00
height := int64(0)
headlag := 1
2020-07-03 18:06:10 +00:00
2020-07-03 18:07:46 +00:00
go func() {
time.Sleep(15 * time.Second)
2020-07-06 16:24:11 +00:00
t.RecordMessage("calling tstats.Collect")
tstats.Collect(context.Background(), api, influx, influxDb, height, headlag)
2020-07-03 18:07:46 +00:00
}()
2020-07-03 18:06:10 +00:00
return nil
}