diff --git a/lotus-soup/manifest.toml b/lotus-soup/manifest.toml index 14531c6b5..65de6d569 100644 --- a/lotus-soup/manifest.toml +++ b/lotus-soup/manifest.toml @@ -7,7 +7,7 @@ runner = "local:docker" [builders."docker:go"] enabled = true -build_base_image = "iptestground/oni-buildbase:v3" +build_base_image = "iptestground/oni-buildbase:v4" runtime_image = "iptestground/oni-runtime:v2-debug" [runners."local:docker"] diff --git a/lotus-soup/stats/head_buffer.go b/lotus-soup/stats/head_buffer.go deleted file mode 100644 index 1c3bf9777..000000000 --- a/lotus-soup/stats/head_buffer.go +++ /dev/null @@ -1,47 +0,0 @@ -package stats - -import ( - "container/list" - - "github.com/filecoin-project/lotus/api" -) - -type headBuffer struct { - buffer *list.List - size int -} - -func NewHeadBuffer(size int) *headBuffer { - buffer := list.New() - buffer.Init() - - return &headBuffer{ - buffer: buffer, - size: size, - } -} - -func (h *headBuffer) Push(hc *api.HeadChange) (rethc *api.HeadChange) { - if h.buffer.Len() == h.size { - var ok bool - - el := h.buffer.Front() - rethc, ok = el.Value.(*api.HeadChange) - if !ok { - panic("Value from list is not the correct type") - } - - h.buffer.Remove(el) - } - - h.buffer.PushBack(hc) - - return -} - -func (h *headBuffer) Pop() { - el := h.buffer.Back() - if el != nil { - h.buffer.Remove(el) - } -} diff --git a/lotus-soup/stats/metrics.go b/lotus-soup/stats/metrics.go deleted file mode 100644 index e34ca3b80..000000000 --- a/lotus-soup/stats/metrics.go +++ /dev/null @@ -1,360 +0,0 @@ -package stats - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "math/big" - "strings" - "time" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/filecoin-project/specs-actors/actors/builtin/power" - "github.com/filecoin-project/specs-actors/actors/util/adt" - - "github.com/ipfs/go-cid" - "github.com/multiformats/go-multihash" - - cbg "github.com/whyrusleeping/cbor-gen" - - _ "github.com/influxdata/influxdb1-client" - models "github.com/influxdata/influxdb1-client/models" - client "github.com/influxdata/influxdb1-client/v2" -) - -type PointList struct { - points []models.Point -} - -func NewPointList() *PointList { - return &PointList{} -} - -func (pl *PointList) AddPoint(p models.Point) { - pl.points = append(pl.points, p) -} - -func (pl *PointList) Points() []models.Point { - return pl.points -} - -type InfluxWriteQueue struct { - ch chan client.BatchPoints -} - -func NewInfluxWriteQueue(ctx context.Context, influx client.Client) *InfluxWriteQueue { - ch := make(chan client.BatchPoints, 128) - - maxRetries := 10 - - go func() { - main: - for { - select { - case <-ctx.Done(): - return - case batch := <-ch: - for i := 0; i < maxRetries; i++ { - if err := influx.Write(batch); err != nil { - log.Warnw("Failed to write batch", "error", err) - time.Sleep(time.Second * 15) - continue - } - - continue main - } - - log.Error("Dropping batch due to failure to write") - } - } - }() - - return &InfluxWriteQueue{ - ch: ch, - } -} - -func (i *InfluxWriteQueue) AddBatch(bp client.BatchPoints) { - i.ch <- bp -} - -func (i *InfluxWriteQueue) Close() { - close(i.ch) -} - -func InfluxClient(addr, user, pass string) (client.Client, error) { - return client.NewHTTPClient(client.HTTPConfig{ - Addr: addr, - Username: user, - Password: pass, - }) -} - -func InfluxNewBatch() (client.BatchPoints, error) { - return client.NewBatchPoints(client.BatchPointsConfig{}) -} - -func NewPoint(name string, value interface{}) models.Point { - pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, time.Now()) - return pt -} - -func NewPointFrom(p models.Point) *client.Point { - return client.NewPointFrom(p) -} - -func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { - cids := []string{} - for _, cid := range tipset.Cids() { - cids = append(cids, cid.String()) - } - - p := NewPoint("chain.height", int64(tipset.Height())) - p.AddTag("tipset", strings.Join(cids, " ")) - pl.AddPoint(p) - - p = NewPoint("chain.block_count", len(cids)) - pl.AddPoint(p) - - tsTime := time.Unix(int64(tipset.MinTimestamp()), int64(0)) - p = NewPoint("chain.blocktime", tsTime.Unix()) - pl.AddPoint(p) - - for _, blockheader := range tipset.Blocks() { - bs, err := blockheader.Serialize() - if err != nil { - return err - } - p := NewPoint("chain.election", 1) - p.AddTag("miner", blockheader.Miner.String()) - pl.AddPoint(p) - - p = NewPoint("chain.blockheader_size", len(bs)) - pl.AddPoint(p) - } - - return nil -} - -type apiIpldStore struct { - ctx context.Context - api api.FullNode -} - -func (ht *apiIpldStore) Context() context.Context { - return ht.ctx -} - -func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { - raw, err := ht.api.ChainReadObj(ctx, c) - if err != nil { - return err - } - - cu, ok := out.(cbg.CBORUnmarshaler) - if ok { - if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil { - return err - } - return nil - } - - return fmt.Errorf("Object does not implement CBORUnmarshaler") -} - -func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { - return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore") -} - -func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { - //pc, err := api.StatePledgeCollateral(ctx, tipset.Key()) - //if err != nil { - //return err - //} - - attoFil := types.NewInt(build.FilecoinPrecision).Int - - //pcFil := new(big.Rat).SetFrac(pc.Int, attoFil) - //pcFilFloat, _ := pcFil.Float64() - //p := NewPoint("chain.pledge_collateral", pcFilFloat) - //pl.AddPoint(p) - - netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr) - if err != nil { - return err - } - - netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil) - netBalFilFloat, _ := netBalFil.Float64() - p := NewPoint("network.balance", netBalFilFloat) - pl.AddPoint(p) - - totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key()) - if err != nil { - return err - } - - p = NewPoint("chain.power", totalPower.TotalPower.QualityAdjPower.Int64()) - pl.AddPoint(p) - - powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, tipset.Key()) - if err != nil { - return err - } - - powerRaw, err := api.ChainReadObj(ctx, powerActor.Head) - if err != nil { - return err - } - - var powerActorState power.State - - if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil { - return fmt.Errorf("failed to unmarshal power actor state: %w", err) - } - - s := &apiIpldStore{ctx, api} - mp, err := adt.AsMap(s, powerActorState.Claims) - if err != nil { - return err - } - - err = mp.ForEach(nil, func(key string) error { - addr, err := address.NewFromBytes([]byte(key)) - if err != nil { - return err - } - - var claim power.Claim - keyerAddr := adt.AddrKey(addr) - mp.Get(keyerAddr, &claim) - - if claim.QualityAdjPower.Int64() == 0 { - return nil - } - - p = NewPoint("chain.miner_power", claim.QualityAdjPower.Int64()) - p.AddTag("miner", addr.String()) - pl.AddPoint(p) - - return nil - }) - if err != nil { - return err - } - - return nil -} - -type msgTag struct { - actor string - method uint64 - exitcode uint8 -} - -func RecordTipsetMessagesPoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { - cids := tipset.Cids() - if len(cids) == 0 { - return fmt.Errorf("no cids in tipset") - } - - msgs, err := api.ChainGetParentMessages(ctx, cids[0]) - if err != nil { - return err - } - - recp, err := api.ChainGetParentReceipts(ctx, cids[0]) - if err != nil { - return err - } - - msgn := make(map[msgTag][]cid.Cid) - - for i, msg := range msgs { - p := NewPoint("chain.message_gasprice", msg.Message.GasPrice.Int64()) - pl.AddPoint(p) - - bs, err := msg.Message.Serialize() - if err != nil { - return err - } - - p = NewPoint("chain.message_size", len(bs)) - pl.AddPoint(p) - - actor, err := api.StateGetActor(ctx, msg.Message.To, tipset.Key()) - if err != nil { - return err - } - - dm, err := multihash.Decode(actor.Code.Hash()) - if err != nil { - continue - } - tag := msgTag{ - actor: string(dm.Digest), - method: uint64(msg.Message.Method), - exitcode: uint8(recp[i].ExitCode), - } - - found := false - for _, c := range msgn[tag] { - if c.Equals(msg.Cid) { - found = true - break - } - } - if !found { - msgn[tag] = append(msgn[tag], msg.Cid) - } - } - - for t, m := range msgn { - p := NewPoint("chain.message_count", len(m)) - p.AddTag("actor", t.actor) - p.AddTag("method", fmt.Sprintf("%d", t.method)) - p.AddTag("exitcode", fmt.Sprintf("%d", t.exitcode)) - pl.AddPoint(p) - - } - - return nil -} - -func ResetDatabase(influx client.Client, database string) error { - log.Info("Resetting database") - q := client.NewQuery(fmt.Sprintf(`DROP DATABASE "%s"; CREATE DATABASE "%s";`, database, database), "", "") - _, err := influx.Query(q) - return err -} - -func GetLastRecordedHeight(influx client.Client, database string) (int64, error) { - log.Info("Retrieving last record height") - q := client.NewQuery(`SELECT "value" FROM "chain.height" ORDER BY time DESC LIMIT 1`, database, "") - res, err := influx.Query(q) - if err != nil { - return 0, err - } - - if len(res.Results) == 0 { - return 0, fmt.Errorf("No results found for last recorded height") - } - - if len(res.Results[0].Series) == 0 { - return 0, fmt.Errorf("No results found for last recorded height") - } - - height, err := (res.Results[0].Series[0].Values[0][1].(json.Number)).Int64() - if err != nil { - return 0, err - } - - log.Infow("Last record height", "height", height) - - return height, nil -} diff --git a/lotus-soup/stats/rpc.go b/lotus-soup/stats/rpc.go deleted file mode 100644 index b95843c0f..000000000 --- a/lotus-soup/stats/rpc.go +++ /dev/null @@ -1,105 +0,0 @@ -package stats - -import ( - "context" - "time" - - "github.com/filecoin-project/specs-actors/actors/abi" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" -) - -func GetTips(ctx context.Context, api api.FullNode, lastHeight abi.ChainEpoch, headlag int) (<-chan *types.TipSet, error) { - chmain := make(chan *types.TipSet) - - hb := NewHeadBuffer(headlag) - - notif, err := api.ChainNotify(ctx) - if err != nil { - return nil, err - } - - go func() { - defer close(chmain) - - ping := time.Tick(30 * time.Second) - - for { - select { - case changes := <-notif: - for _, change := range changes { - log.Infow("Head event", "height", change.Val.Height(), "type", change.Type) - - switch change.Type { - case store.HCCurrent: - tipsets, err := loadTipsets(ctx, api, change.Val, lastHeight) - if err != nil { - log.Info(err) - return - } - - for _, tipset := range tipsets { - chmain <- tipset - } - case store.HCApply: - if out := hb.Push(change); out != nil { - chmain <- out.Val - } - case store.HCRevert: - hb.Pop() - } - } - case <-ping: - log.Info("Running health check") - - cctx, cancel := context.WithTimeout(ctx, 5*time.Second) - - if _, err := api.ID(cctx); err != nil { - log.Error("Health check failed") - cancel() - return - } - - cancel() - - log.Info("Node online") - case <-ctx.Done(): - return - } - } - }() - - return chmain, nil -} - -func loadTipsets(ctx context.Context, api api.FullNode, curr *types.TipSet, lowestHeight abi.ChainEpoch) ([]*types.TipSet, error) { - tipsets := []*types.TipSet{} - for { - if curr.Height() == 0 { - break - } - - if curr.Height() <= lowestHeight { - break - } - - log.Infow("Walking back", "height", curr.Height()) - tipsets = append(tipsets, curr) - - tsk := curr.Parents() - prev, err := api.ChainGetTipSet(ctx, tsk) - if err != nil { - return tipsets, err - } - - curr = prev - } - - for i, j := 0, len(tipsets)-1; i < j; i, j = i+1, j-1 { - tipsets[i], tipsets[j] = tipsets[j], tipsets[i] - } - - return tipsets, nil -} diff --git a/lotus-soup/stats/stats.go b/lotus-soup/stats/stats.go deleted file mode 100644 index 69119ccb6..000000000 --- a/lotus-soup/stats/stats.go +++ /dev/null @@ -1,92 +0,0 @@ -package stats - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/specs-actors/actors/abi" - logging "github.com/ipfs/go-log/v2" -) - -var log = logging.Logger("stats") - -func Collect(api api.FullNode) { - time.Sleep(15 * time.Second) - fmt.Println("collect stats") - - var database string = "testground" - var height int64 = 0 - var headlag int = 3 - - influxAddr := os.Getenv("INFLUXDB_URL") - influxUser := "" - influxPass := "" - - ctx := context.Background() - - influx, err := InfluxClient(influxAddr, influxUser, influxPass) - if err != nil { - log.Fatal(err) - } - - h, err := GetLastRecordedHeight(influx, database) - if err != nil { - log.Info(err) - } - - height = h - - tipsetsCh, err := GetTips(ctx, api, abi.ChainEpoch(height), headlag) - if err != nil { - log.Fatal(err) - } - - wq := NewInfluxWriteQueue(ctx, influx) - defer wq.Close() - - for tipset := range tipsetsCh { - log.Infow("Collect stats", "height", tipset.Height()) - pl := NewPointList() - height := tipset.Height() - - if err := RecordTipsetPoints(ctx, api, pl, tipset); err != nil { - log.Warnw("Failed to record tipset", "height", height, "error", err) - continue - } - - if err := RecordTipsetMessagesPoints(ctx, api, pl, tipset); err != nil { - log.Warnw("Failed to record messages", "height", height, "error", err) - continue - } - - if err := RecordTipsetStatePoints(ctx, api, pl, tipset); err != nil { - log.Warnw("Failed to record state", "height", height, "error", err) - continue - } - - // Instead of having to pass around a bunch of generic stuff we want for each point - // we will just add them at the end. - - tsTimestamp := time.Unix(int64(tipset.MinTimestamp()), int64(0)) - - nb, err := InfluxNewBatch() - if err != nil { - log.Fatal(err) - } - - for _, pt := range pl.Points() { - pt.SetTime(tsTimestamp) - - nb.AddPoint(NewPointFrom(pt)) - } - - nb.SetDatabase(database) - - log.Infow("Adding points", "count", len(nb.Points()), "height", tipset.Height()) - - wq.AddBatch(nb) - } -} diff --git a/lotus-soup/testkit/node.go b/lotus-soup/testkit/node.go index de57e0444..83f17bb73 100644 --- a/lotus-soup/testkit/node.go +++ b/lotus-soup/testkit/node.go @@ -25,7 +25,7 @@ import ( logging "github.com/ipfs/go-log/v2" influxdb "github.com/kpacha/opencensus-influxdb" - stats "github.com/filecoin-project/lotus/tools/stats" + tstats "github.com/filecoin-project/lotus/tools/stats" "github.com/libp2p/go-libp2p-core/peer" manet "github.com/multiformats/go-multiaddr-net" @@ -260,17 +260,20 @@ func collectStats(ctx context.Context, api api.FullNode) error { influxUser := "" influxPass := "" - influx, err := stats.InfluxClient(influxAddr, influxUser, influxPass) + influx, err := tstats.InfluxClient(influxAddr, influxUser, influxPass) if err != nil { return err } - height, err := stats.GetLastRecordedHeight(influx, database) + height, err := tstats.GetLastRecordedHeight(influx, database) if err != nil { return err } - go stats.Collect(ctx, api, influx, database, height, headlag) + go func() { + time.Sleep(15 * time.Second) + tstats.Collect(ctx, api, influx, database, height, headlag) + }() return nil } diff --git a/lotus-soup/testkit/role_miner.go b/lotus-soup/testkit/role_miner.go index 066e159f4..f1469ed28 100644 --- a/lotus-soup/testkit/role_miner.go +++ b/lotus-soup/testkit/role_miner.go @@ -92,7 +92,7 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) { } sectors := t.IntParam("sectors") - genMiner, _, err := seed.PreSeal(minerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, 0, sectors, presealDir, []byte("TODO: randomize this"), &walletKey.KeyInfo) + genMiner, _, err := seed.PreSeal(minerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, 0, sectors, presealDir, []byte("TODO: randomize this"), &walletKey.KeyInfo, false) if err != nil { return nil, err } @@ -230,7 +230,7 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) { // collect stats based on Travis' scripts if t.InitContext.GroupSeq == 1 { - go collectStats(n.FullApi) + go collectStats(ctx, n.FullApi) } // Bootstrap with full node