From d599833b64edac46cd9621804a4be80fef3e45c3 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Sun, 25 Jun 2023 15:55:41 -0600 Subject: [PATCH] WIP --- cmd/lotus-shed/snapshot.go | 39 ++++ cmd/lotus-shed/state-stats.go | 365 +++++++++++++++++++++++------- cmd/lotus-shed/stateroot-stats.go | 30 ++- 3 files changed, 351 insertions(+), 83 deletions(-) create mode 100644 cmd/lotus-shed/snapshot.go diff --git a/cmd/lotus-shed/snapshot.go b/cmd/lotus-shed/snapshot.go new file mode 100644 index 000000000..3ac3319d8 --- /dev/null +++ b/cmd/lotus-shed/snapshot.go @@ -0,0 +1,39 @@ +package main + +import ( + "github.com/urfave/cli/v2" +) + +var snapshotCmd = &cli.Command{ + Name: "snapshot", + Description: "interact with filecoin chain snapshots", + Subcommands: []*cli.Command{ + snapshotStatCmd, + }, +} + +var snapshotStatCmd = &cli.Command{ + Name: "stat-actor", + Usage: "display number of snapshot bytes held by provided actor", + ArgsUsage: "[datastore path] [head] [actor address]", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "field", // specify top level actor field to stat + }, + }, + Action: func(cctx *cli.Context) error { + // Initialize in memory graph counter + // Get root tskey + // Walk headers back + + // Count header bytes + // open state, + // if no field set graph count actor HEAD + // if field is set, parse actor head, look for field + // if field not found or not a cid error, otherwise do graph count on the cid + + // Print out stats + + return nil + }, +} diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index e07c63589..846c5ff2b 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "reflect" + "sort" "sync" "github.com/docker/go-units" @@ -23,6 +24,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/consensus/filcns" @@ -166,39 +168,13 @@ var statObjCmd = &cli.Command{ return err } - r, err := repo.NewFS(cctx.String("repo")) - if err != nil { - return xerrors.Errorf("opening fs repo: %w", err) - } - - exists, err := r.Exists() + h, err := loadChainStore(ctx, cctx.String("repo")) if err != nil { return err } - if !exists { - return xerrors.Errorf("lotus repo doesn't exist") - } + defer h.closer() - lr, err := r.Lock(repo.FullNode) - if err != nil { - return err - } - defer lr.Close() //nolint:errcheck - - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) - if err != nil { - return fmt.Errorf("failed to open blockstore: %w", err) - } - - defer func() { - if c, ok := bs.(io.Closer); ok { - if err := c.Close(); err != nil { - log.Warnf("failed to close blockstore: %s", err) - } - } - }() - - dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + dag := merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) dsc := &dagStatCollector{ ds: dag, walk: carWalkFunc, @@ -212,6 +188,258 @@ var statObjCmd = &cli.Command{ }, } +type StoreHandle struct { + bs blockstore.Blockstore + cs *store.ChainStore + sm *stmgr.StateManager + closer func() +} + +func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) { + r, err := repo.NewFS(repoPath) + if err != nil { + return nil, xerrors.Errorf("opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return nil, err + } + if 
!exists {
+		return nil, xerrors.Errorf("lotus repo doesn't exist")
+	}
+
+	lr, err := r.Lock(repo.FullNode)
+	if err != nil {
+		return nil, err
+	}
+
+	bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open blockstore: %w", err)
+	}
+
+	// the repo lock must stay held for as long as the caller uses the handle,
+	// so it is released in closer rather than deferred here
+	closer := func() {
+		if c, ok := bs.(io.Closer); ok {
+			if err := c.Close(); err != nil {
+				log.Warnf("failed to close blockstore: %s", err)
+			}
+		}
+		if err := lr.Close(); err != nil {
+			log.Warnf("failed to close locked repo: %s", err)
+		}
+	}
+
+	mds, err := lr.Datastore(context.Background(), "/metadata")
+	if err != nil {
+		return nil, err
+	}
+
+	cs := store.NewChainStore(bs, bs, mds, nil, nil)
+	if err := cs.Load(ctx); err != nil {
+		return nil, fmt.Errorf("failed to load chain store: %w", err)
+	}
+
+	tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
+	sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open state manager: %w", err)
+	}
+	handle := StoreHandle{
+		bs:     bs,
+		sm:     sm,
+		cs:     cs,
+		closer: closer,
+	}
+
+	return &handle, nil
+}
+
+var statSnapshotCmd = &cli.Command{
+	Name:  "stat-snapshot",
+	Usage: "calculates the space usage of a snapshot taken from the given tipset",
+	Description: `Walk the chain back to the lightweight snapshot depth and break space usage down into high level
+	categories: headers, messages, receipts, the latest state root, and churn from earlier state roots.
+	State root and churn space is further broken down by actor type and immediate top level fields.
+	`,
+	Flags: []cli.Flag{
+		&cli.StringFlag{
+			Name:  "tipset",
+			Usage: "specify tipset to take the snapshot from (pass comma separated array of cids)",
+		},
+		&cli.IntFlag{
+			Name:  "workers",
+			Usage: "number of workers to use when processing",
+			Value: 10,
+		},
+		&cli.IntFlag{
+			Name:  "dag-cache-size",
+			Usage: "cache size per worker (setting to 0 disables)",
+			Value: 8092,
+		},
+		&cli.BoolFlag{
+			Name:  "pretty",
+			Usage: "print formatted output instead of ldjson",
+			Value: false,
+		},
+	},
+	Action: func(cctx *cli.Context) error {
+		ctx := lcli.ReqContext(cctx)
+		h, err := loadChainStore(ctx, cctx.String("repo"))
+		if err != nil {
+			return err
+		}
+		defer h.closer()
+		tsr := &ChainStoreTipSetResolver{
+			Chain: h.cs,
+		}
+
+		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
+		if err != nil {
+			return err
+		}
+
+		type job struct {
+			c   cid.Cid
+			key string // prefix path for the region being recorded, e.g. "/state/mineractor"
+		}
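+		// The walk below is a small pipeline: jobs fan actor head CIDs out to the
+		// workers, cidCall requests let workers ask a single owner goroutine
+		// whether a CID has already been counted (so the seen-set needs no
+		// locking), and result carries per-prefix ObjStat totals back to the
+		// collection loop, which folds them into a summary keyed by path prefix.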
"/state/mineractor" + } + type cidCall struct { + c cid.Cid + resp chan bool + } + type result struct { + key string + stats api.ObjStat + } + // Stage 1 walk the latest state root + // A) walk all actors + // B) when done walk the actor HAMT + + st, err := h.sm.StateTree(ts.ParentState()) + if err != nil { + return err + } + numWorkers := cctx.Int("workers") + dagCacheSize := cctx.Int("dag-cache-size") + + eg, egctx := errgroup.WithContext(ctx) + + jobs := make(chan job, numWorkers) + results := make(chan result) + cidCh := make(chan cidCall, numWorkers) + + go func() { + seen := cid.NewSet() + + select { + case call := <-cidCh: + call.resp <- seen.Visit(call.c) + case <-ctx.Done(): + log.Infof("shutting down cid set goroutine") + close(cidCh) + } + + }() + + worker := func(ctx context.Context, id int) error { + defer func() { + //log.Infow("worker done", "id", id, "completed", completed) + }() + + for { + select { + case job, ok := <-jobs: + if !ok { + return nil + } + + var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) + if dagCacheSize != 0 { + var err error + dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize) + if err != nil { + return err + } + } + + stats := snapshotJobStats(job, dag, cidCh) + for _, stat := range stats { + select { + case results <- stat: + case <-ctx.Done(): + return ctx.Err() + } + } + case <-ctx.Done(): + return ctx.Err() + } + + } + } + + go func() { + st.ForEach(func(_ address.Address, act *types.Actor) error { + actType := builtin.ActorNameByCode(act.Code) + if actType == "" { + actType = act.Code.String() + } + + jobs <- job{c: act.Head, key: fmt.Sprintf("/state/%s", actType)} + + return nil + }) + + }() + + for w := 0; w < numWorkers; w++ { + id := w + eg.Go(func() error { + return worker(egctx, id) + }) + } + + summary := make(map[string]api.ObjStat) + combine := func(statsA, statsB api.ObjStat) api.ObjStat { + return api.ObjStat{ + Size: statsA.Size + statsB.Size, + Links: statsA.Links + statsB.Links, + } + } + LOOP: + for { + select { + case result, ok := <-results: + if !ok { + return eg.Wait() + } + if stat, ok := summary[result.key]; ok { + summary[result.key] = combine(stat, summary[result.key]) + } else { + summary[result.key] = result.stats + } + + case <-ctx.Done(): + break LOOP + } + } + // XXX walk the top of the state tree + + // Stage 2 walk the rest of the chain: headers, messages, churn + // ordering: + // A) for each header send jobs for messages, receipts, state tree churn + // don't walk header directly as it would just walk everything including parent tipsets + // B) when done walk all actor HAMTs for churn + + if cctx.Bool("pretty") { + DumpSnapshotStats(summary) + } else { + if err := DumpJSON(summary); err != nil { + return err + } + } + + return nil + }, +} + var statActorCmd = &cli.Command{ Name: "stat-actor", Usage: "calculates the size of actors and their immeidate structures", @@ -265,57 +493,14 @@ to reduce the number of decode operations performed by caching the decoded objec addrs = append(addrs, addr) } } - - r, err := repo.NewFS(cctx.String("repo")) - if err != nil { - return xerrors.Errorf("opening fs repo: %w", err) - } - - exists, err := r.Exists() - if err != nil { - return err - } - if !exists { - return xerrors.Errorf("lotus repo doesn't exist") - } - - lr, err := r.Lock(repo.FullNode) - if err != nil { - return err - } - defer lr.Close() //nolint:errcheck - - bs, err := lr.Blockstore(ctx, 
-		if err != nil {
-			return fmt.Errorf("failed to open blockstore: %w", err)
-		}
-
-		defer func() {
-			if c, ok := bs.(io.Closer); ok {
-				if err := c.Close(); err != nil {
-					log.Warnf("failed to close blockstore: %s", err)
-				}
-			}
-		}()
-
-		mds, err := lr.Datastore(context.Background(), "/metadata")
-		if err != nil {
-			return err
-		}
-
-		cs := store.NewChainStore(bs, bs, mds, nil, nil)
-		if err := cs.Load(ctx); err != nil {
-			return nil
-		}
-
-		tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
-		sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
+		h, err := loadChainStore(ctx, cctx.String("repo"))
 		if err != nil {
 			return err
 		}
+		defer h.closer()
 
 		tsr := &ChainStoreTipSetResolver{
-			Chain: cs,
+			Chain: h.cs,
 		}
 
 		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
@@ -323,11 +508,11 @@ to reduce the number of decode operations performed by caching the decoded objec
 			return err
 		}
 
-		log.Infow("tipset", "parentstate", ts.ParentState())
+		// log.Infow("tipset", "parentstate", ts.ParentState())
 
 		if len(addrs) == 0 && cctx.Bool("all") {
 			var err error
-			addrs, err = sm.ListAllActors(ctx, ts)
+			addrs, err = h.sm.ListAllActors(ctx, ts)
 			if err != nil {
 				return err
 			}
@@ -344,7 +529,7 @@ to reduce the number of decode operations performed by caching the decoded objec
 		worker := func(ctx context.Context, id int) error {
 			completed := 0
 			defer func() {
-				log.Infow("worker done", "id", id, "completed", completed)
+				//log.Infow("worker done", "id", id, "completed", completed)
 			}()
 
 			for {
@@ -354,15 +539,15 @@ to reduce the number of decode operations performed by caching the decoded objec
 						return nil
 					}
 
-					actor, err := sm.LoadActor(ctx, addr, ts)
+					actor, err := h.sm.LoadActor(ctx, addr, ts)
 					if err != nil {
 						return err
 					}
 
-					var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
+					var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
 					if dagCacheSize != 0 {
 						var err error
-						dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))), dagCacheSize)
+						dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
 						if err != nil {
 							return err
 						}
@@ -428,7 +613,7 @@ to reduce the number of decode operations performed by caching the decoded objec
 }
 
 func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
-	log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))
+	// log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))
 
 	nd, err := dag.Get(ctx, actor.Head)
 	if err != nil {
@@ -532,3 +717,19 @@ func DumpStats(actStats actorStats) {
 
 	fmt.Println("--------------------------------------------------------------------------")
 }
+
+// DumpSnapshotStats pretty-prints the per-path size summary produced by the
+// stat-snapshot command.
+func DumpSnapshotStats(stats map[string]api.ObjStat) {
+	// sort keys so we get subkey locality
+	keys := make([]string, 0, len(stats))
+	for k := range stats {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	fmt.Printf("%-*s%-*s%-*s\n", 32, "Path", 24, "Size", 24, "\"Blocks\"")
+	for _, k := range keys {
+		stat := stats[k]
+		sizeStr := units.BytesSize(float64(stat.Size))
+		fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, k, 10, sizeStr, 14, fmt.Sprintf("(%d)", stat.Size), 24, stat.Links)
+	}
+}
diff --git a/cmd/lotus-shed/stateroot-stats.go 
b/cmd/lotus-shed/stateroot-stats.go index f429c4e64..bec37d71f 100644 --- a/cmd/lotus-shed/stateroot-stats.go +++ b/cmd/lotus-shed/stateroot-stats.go @@ -104,6 +104,34 @@ type statItem struct { Stat api.ObjStat } +// var snapShotStatCmd = &cli.Command{ +// Name: "snapshot-stat", +// Usage: "print statistics of the snapshot rooted at given block", +// Flags: []cli.Flag{ +// &cli.StringFlag{ +// Name: "tipset", +// Usage: "specify tipset to start from", +// }, +// }, +// Action: func(cctx *cli.Context) error { +// api, closer, err := lcli.GetFullNodeAPI(cctx) +// if err != nil { +// return err +// } + +// defer closer() +// ctx := lcli.ReqContext(cctx) + +// ts, err := lcli.LoadTipSet(ctx, cctx, api) +// if err != nil { +// return err +// } + +// api.ChainStatSnapshot(ctx, ts.Key()) + +// }, +// } + var staterootStatCmd = &cli.Command{ Name: "stat", Usage: "print statistics for the stateroot of a given block", @@ -197,7 +225,7 @@ var staterootStatCmd = &cli.Command{ return err } - fmt.Printf("%s\t%s\t%d\n", inf.Addr, string(cmh.Digest), inf.Stat.Size) + fmt.Printf("%s\t%x\t%d\n", inf.Addr, cmh.Digest, inf.Stat.Size) } return nil },
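Note on the dedup design in stat-snapshot: every candidate CID is funnelled through a single goroutine that owns a cid.NewSet(), so the counting workers stay lock-free. Below is a minimal, self-contained sketch of that pattern; the names dedupRequest and runDeduper are illustrative only and are not part of the patch.

// Sketch of the channel-owned CID dedup pattern used by stat-snapshot.
package main

import (
	"context"
	"fmt"

	"github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

type dedupRequest struct {
	c    cid.Cid
	resp chan bool // true if the CID had not been seen before
}

// runDeduper owns the seen-set and serves requests until the context is
// cancelled; keeping the select in a loop is what lets it answer more than
// one request.
func runDeduper(ctx context.Context, reqs <-chan dedupRequest) {
	seen := cid.NewSet()
	for {
		select {
		case req := <-reqs:
			req.resp <- seen.Visit(req.c)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	reqs := make(chan dedupRequest)
	go runDeduper(ctx, reqs)

	// Build one demo CID; in the real command these come from walking actor state.
	h, err := mh.Sum([]byte("example block"), mh.SHA2_256, -1)
	if err != nil {
		panic(err)
	}
	c := cid.NewCidV1(cid.Raw, h)

	for i := 0; i < 2; i++ {
		resp := make(chan bool)
		reqs <- dedupRequest{c: c, resp: resp}
		fmt.Println("first visit:", <-resp) // prints true, then false
	}
}

Serialising the set behind a channel keeps the workers free of shared state; a mutex-guarded set would be an equally valid choice if channel traffic ever became a bottleneck.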