package testkit import ( "context" "fmt" "net/http" "os" "sort" "time" influxdb "github.com/kpacha/opencensus-influxdb" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules/dtypes" modtest "github.com/filecoin-project/lotus/node/modules/testing" tinflux "github.com/filecoin-project/lotus/tools/stats/influx" tipldstore "github.com/filecoin-project/lotus/tools/stats/ipldstore" tpoints "github.com/filecoin-project/lotus/tools/stats/points" tsync "github.com/filecoin-project/lotus/tools/stats/sync" ) var PrepareNodeTimeout = 3 * time.Minute type LotusNode struct { FullApi api.FullNode MinerApi api.StorageMiner StopFn node.StopFunc Wallet *wallet.Key MineOne func(context.Context, miner.MineReq) error } func (n *LotusNode) setWallet(ctx context.Context, walletKey *wallet.Key) error { _, err := n.FullApi.WalletImport(ctx, &walletKey.KeyInfo) if err != nil { return err } err = n.FullApi.WalletSetDefault(ctx, walletKey.Address) if err != nil { return err } n.Wallet = walletKey return nil } func WaitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) { ch := make(chan *InitialBalanceMsg) sub := t.SyncClient.MustSubscribe(ctx, BalanceTopic, ch) balances := make([]*InitialBalanceMsg, 0, nodes) for i := 0; i < nodes; i++ { select { case m := <-ch: balances = append(balances, m) case err := <-sub.Done(): return nil, fmt.Errorf("got error while waiting for balances: %w", err) } } return balances, nil } func CollectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*PresealMsg, error) { ch := make(chan *PresealMsg) sub := t.SyncClient.MustSubscribe(ctx, PresealTopic, ch) preseals := make([]*PresealMsg, 0, miners) for i := 0; i < miners; i++ { select { case m := <-ch: preseals = append(preseals, m) case err := <-sub.Done(): return nil, fmt.Errorf("got error while waiting for preseals: %w", err) } } sort.Slice(preseals, func(i, j int) bool { return preseals[i].Seqno < preseals[j].Seqno }) return preseals, nil } func WaitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error) { genesisCh := make(chan *GenesisMsg) sub := t.SyncClient.MustSubscribe(ctx, GenesisTopic, genesisCh) select { case genesisMsg := <-genesisCh: return genesisMsg, nil case err := <-sub.Done(): return nil, fmt.Errorf("error while waiting for genesis msg: %w", err) } } func CollectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddressesMsg, error) { ch := make(chan MinerAddressesMsg) sub := t.SyncClient.MustSubscribe(ctx, MinersAddrsTopic, ch) addrs := make([]MinerAddressesMsg, 0, miners) for i := 0; i < miners; i++ { select { case a := <-ch: addrs = append(addrs, a) case err := <-sub.Done(): return nil, fmt.Errorf("got error while waiting for miners addrs: %w", err) } } return addrs, nil } func CollectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]*ClientAddressesMsg, error) { ch := make(chan *ClientAddressesMsg) sub := t.SyncClient.MustSubscribe(ctx, ClientsAddrsTopic, ch) addrs := make([]*ClientAddressesMsg, 0, clients) for i := 0; i < clients; i++ { select { case a := <-ch: addrs = append(addrs, a) case err := <-sub.Done(): return nil, fmt.Errorf("got error while waiting for clients addrs: %w", err) } } return addrs, nil } func GetPubsubTracerMaddr(ctx context.Context, t *TestEnvironment) (string, error) { if !t.BooleanParam("enable_pubsub_tracer") { return "", nil } ch := make(chan *PubsubTracerMsg) sub := t.SyncClient.MustSubscribe(ctx, PubsubTracerTopic, ch) select { case m := <-ch: return m.Multiaddr, nil case err := <-sub.Done(): return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err) } } func GetRandomBeaconOpts(ctx context.Context, t *TestEnvironment) (node.Option, error) { beaconType := t.StringParam("random_beacon_type") switch beaconType { case "external-drand": noop := func(settings *node.Settings) error { return nil } return noop, nil case "local-drand": cfg, err := waitForDrandConfig(ctx, t.SyncClient) if err != nil { t.RecordMessage("error getting drand config: %w", err) return nil, err } t.RecordMessage("setting drand config: %v", cfg) return node.Options( node.Override(new(dtypes.DrandConfig), cfg.Config), node.Override(new(dtypes.DrandBootstrap), cfg.GossipBootstrap), ), nil case "mock": return node.Options( node.Override(new(beacon.RandomBeacon), modtest.RandomBeacon), node.Override(new(dtypes.DrandConfig), dtypes.DrandConfig{ ChainInfoJSON: "{\"Hash\":\"wtf\"}", }), node.Override(new(dtypes.DrandBootstrap), dtypes.DrandBootstrap{}), ), nil default: return nil, fmt.Errorf("unknown random_beacon_type: %s", beaconType) } } func startServer(endpoint ma.Multiaddr, srv *http.Server) (listenAddr string, err error) { lst, err := manet.Listen(endpoint) if err != nil { return "", fmt.Errorf("could not listen: %w", err) } go func() { _ = srv.Serve(manet.NetListener(lst)) }() return lst.Addr().String(), nil } func registerAndExportMetrics(instanceName string) { // Register all Lotus metric views err := view.Register(metrics.DefaultViews...) if err != nil { panic(err) } // Set the metric to one so it is published to the exporter stats.Record(context.Background(), metrics.LotusInfo.M(1)) // Register our custom exporter to opencensus e, err := influxdb.NewExporter(context.Background(), influxdb.Options{ Database: "testground", Address: os.Getenv("INFLUXDB_URL"), Username: "", Password: "", InstanceName: instanceName, }) if err != nil { panic(err) } view.RegisterExporter(e) view.SetReportingPeriod(5 * time.Second) } func collectStats(t *TestEnvironment, ctx context.Context, api api.FullNode) error { t.RecordMessage("collecting blockchain stats") influxAddr := os.Getenv("INFLUXDB_URL") influxUser := "" influxPass := "" influxDb := "testground" influxClient, err := tinflux.NewClient(influxAddr, influxUser, influxPass) if err != nil { t.RecordMessage(err.Error()) return err } height := int64(0) headlag := 1 go func() { time.Sleep(15 * time.Second) t.RecordMessage("calling tstats.Collect") store, err := tipldstore.NewApiIpldStore(ctx, api, 1024) if err != nil { t.RecordMessage(err.Error()) return } collector, err := tpoints.NewChainPointCollector(ctx, store, api) if err != nil { t.RecordMessage(err.Error()) return } tipsets, err := tsync.BufferedTipsetChannel(ctx, api, abi.ChainEpoch(height), headlag) if err != nil { t.RecordMessage(err.Error()) return } wq := tinflux.NewWriteQueue(ctx, influxClient) defer wq.Close() for tipset := range tipsets { if nb, err := collector.Collect(ctx, tipset); err != nil { t.RecordMessage(err.Error()) return } else { nb.SetDatabase(influxDb) wq.AddBatch(nb) } } }() return nil }