package testkit import ( "context" "fmt" "net/http" "os" "sort" "time" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules/dtypes" modtest "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/builtin/verifreg" logging "github.com/ipfs/go-log/v2" influxdb "github.com/kpacha/opencensus-influxdb" ma "github.com/multiformats/go-multiaddr" tstats "github.com/filecoin-project/lotus/tools/stats" "github.com/libp2p/go-libp2p-core/peer" manet "github.com/multiformats/go-multiaddr-net" "go.opencensus.io/stats" "go.opencensus.io/stats/view" ) func init() { _ = logging.SetLogLevel("*", "ERROR") _ = os.Setenv("BELLMAN_NO_GPU", "1") build.InsecurePoStValidation = true build.DisableBuiltinAssets = true power.ConsensusMinerMinPower = big.NewInt(2048) saminer.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{ abi.RegisteredSealProof_StackedDrg2KiBV1: {}, } verifreg.MinVerifiedDealSize = big.NewInt(256) } var PrepareNodeTimeout = time.Minute type LotusNode struct { FullApi api.FullNode MinerApi api.StorageMiner StopFn node.StopFunc MineOne func(context.Context, func(bool, error)) error } func (n *LotusNode) setWallet(ctx context.Context, walletKey *wallet.Key) error { _, err := n.FullApi.WalletImport(ctx, &walletKey.KeyInfo) if err != nil { return err } err = n.FullApi.WalletSetDefault(ctx, walletKey.Address) if err != nil { return err } return nil } func WaitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) { ch := make(chan *InitialBalanceMsg) sub := t.SyncClient.MustSubscribe(ctx, BalanceTopic, ch) balances := make([]*InitialBalanceMsg, 0, nodes) for i := 0; i < nodes; i++ { select { case m := <-ch: balances = append(balances, m) case err := <-sub.Done(): return nil, fmt.Errorf("got error while waiting for balances: %w", err) } } return balances, nil } func CollectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*PresealMsg, error) { ch := make(chan *PresealMsg) sub := t.SyncClient.MustSubscribe(ctx, PresealTopic, ch) preseals := make([]*PresealMsg, 0, miners) for i := 0; i < miners; i++ { select { case m := <-ch: preseals = append(preseals, m) case err := <-sub.Done(): return nil, fmt.Errorf("got error while waiting for preseals: %w", err) } } sort.Slice(preseals, func(i, j int) bool { return preseals[i].Seqno < preseals[j].Seqno }) return preseals, nil } func WaitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error) { genesisCh := make(chan *GenesisMsg) sub := t.SyncClient.MustSubscribe(ctx, GenesisTopic, genesisCh) select { case genesisMsg := <-genesisCh: return genesisMsg, nil case err := <-sub.Done(): return nil, fmt.Errorf("error while waiting for genesis msg: %w", err) } } func CollectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddressesMsg, error) { ch := make(chan MinerAddressesMsg) sub := t.SyncClient.MustSubscribe(ctx, MinersAddrsTopic, ch) addrs := make([]MinerAddressesMsg, 0, miners) for i := 0; i < miners; i++ { select { case a := <-ch: addrs = append(addrs, a) case err := <-sub.Done(): return nil, fmt.Errorf("got error while waiting for miners addrs: %w", err) } } return addrs, nil } func CollectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) { ch := make(chan peer.AddrInfo) sub := t.SyncClient.MustSubscribe(ctx, ClientsAddrsTopic, ch) addrs := make([]peer.AddrInfo, 0, clients) for i := 0; i < clients; i++ { select { case a := <-ch: addrs = append(addrs, a) case err := <-sub.Done(): return nil, fmt.Errorf("got error while waiting for clients addrs: %w", err) } } return addrs, nil } func GetPubsubTracerMaddr(ctx context.Context, t *TestEnvironment) (string, error) { if !t.BooleanParam("enable_pubsub_tracer") { return "", nil } ch := make(chan *PubsubTracerMsg) sub := t.SyncClient.MustSubscribe(ctx, PubsubTracerTopic, ch) select { case m := <-ch: return m.Multiaddr, nil case err := <-sub.Done(): return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err) } } func GetRandomBeaconOpts(ctx context.Context, t *TestEnvironment) (node.Option, error) { beaconType := t.StringParam("random_beacon_type") switch beaconType { case "external-drand": noop := func(settings *node.Settings) error { return nil } return noop, nil case "local-drand": cfg, err := waitForDrandConfig(ctx, t.SyncClient) if err != nil { t.RecordMessage("error getting drand config: %w", err) return nil, err } t.RecordMessage("setting drand config: %v", cfg) return node.Options( node.Override(new(dtypes.DrandConfig), cfg.Config), node.Override(new(dtypes.DrandBootstrap), cfg.GossipBootstrap), ), nil case "mock": return node.Options( node.Override(new(beacon.RandomBeacon), modtest.RandomBeacon), node.Override(new(dtypes.DrandConfig), dtypes.DrandConfig{ ChainInfoJSON: "{\"Hash\":\"wtf\"}", }), node.Override(new(dtypes.DrandBootstrap), dtypes.DrandBootstrap{}), ), nil default: return nil, fmt.Errorf("unknown random_beacon_type: %s", beaconType) } } func startServer(endpoint ma.Multiaddr, srv *http.Server) (listenAddr string, err error) { lst, err := manet.Listen(endpoint) if err != nil { return "", fmt.Errorf("could not listen: %w", err) } go func() { _ = srv.Serve(manet.NetListener(lst)) }() return lst.Addr().String(), nil } func registerAndExportMetrics(instanceName string) { // Register all Lotus metric views err := view.Register(metrics.DefaultViews...) if err != nil { panic(err) } // Set the metric to one so it is published to the exporter stats.Record(context.Background(), metrics.LotusInfo.M(1)) // Register our custom exporter to opencensus e, err := influxdb.NewExporter(context.Background(), influxdb.Options{ Database: "testground", Address: os.Getenv("INFLUXDB_URL"), Username: "", Password: "", InstanceName: instanceName, }) if err != nil { panic(err) } view.RegisterExporter(e) view.SetReportingPeriod(5 * time.Second) } func collectStats(t *TestEnvironment, ctx context.Context, api api.FullNode) error { t.RecordMessage("collecting blockchain stats") influxAddr := os.Getenv("INFLUXDB_URL") influxUser := "" influxPass := "" influxDb := "testground" influx, err := tstats.InfluxClient(influxAddr, influxUser, influxPass) if err != nil { t.RecordMessage(err.Error()) return err } height := int64(0) headlag := 3 go func() { time.Sleep(15 * time.Second) t.RecordMessage("calling tstats.Collect") tstats.Collect(context.Background(), api, influx, influxDb, height, headlag) }() return nil }