diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go
index 7355dad25..aab3d1ef3 100644
--- a/cmd/lotus-shed/main.go
+++ b/cmd/lotus-shed/main.go
@@ -23,6 +23,7 @@ func main() {
 	local := []*cli.Command{
 		addressCmd,
 		statActorCmd,
+		statSnapshotCmd,
 		statObjCmd,
 		base64Cmd,
 		base32Cmd,
diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go
index 4aec02091..4eb00f981 100644
--- a/cmd/lotus-shed/state-stats.go
+++ b/cmd/lotus-shed/state-stats.go
@@ -1,11 +1,14 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
 	"io"
+	"path"
 	"reflect"
+	"sort"
 	"sync"
 
 	"github.com/docker/go-units"
@@ -21,8 +24,12 @@ import (
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
+	gstactors "github.com/filecoin-project/go-state-types/actors"
+	"github.com/filecoin-project/go-state-types/network"
 
 	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/blockstore"
+	"github.com/filecoin-project/lotus/chain/actors"
 	"github.com/filecoin-project/lotus/chain/actors/builtin"
 	"github.com/filecoin-project/lotus/chain/consensus"
 	"github.com/filecoin-project/lotus/chain/consensus/filcns"
@@ -49,6 +56,19 @@ type fieldItem struct {
 	Stats api.ObjStat
 }
 
+type job struct {
+	c   cid.Cid
+	key string // prefix path for the region being recorded i.e. "/state/mineractor"
+}
+type cidCall struct {
+	c    cid.Cid
+	resp chan bool
+}
+type result struct {
+	key   string
+	stats api.ObjStat
+}
+
 type cacheNodeGetter struct {
 	ds    format.NodeGetter
 	cache *lru.TwoQueueCache[cid.Cid, format.Node]
@@ -166,39 +186,13 @@ var statObjCmd = &cli.Command{
 			return err
 		}
 
-		r, err := repo.NewFS(cctx.String("repo"))
-		if err != nil {
-			return xerrors.Errorf("opening fs repo: %w", err)
-		}
-
-		exists, err := r.Exists()
+		h, err := loadChainStore(ctx, cctx.String("repo"))
 		if err != nil {
 			return err
 		}
-		if !exists {
-			return xerrors.Errorf("lotus repo doesn't exist")
-		}
+		defer h.closer()
 
-		lr, err := r.Lock(repo.FullNode)
-		if err != nil {
-			return err
-		}
-		defer lr.Close() //nolint:errcheck
-
-		bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
-		if err != nil {
-			return fmt.Errorf("failed to open blockstore: %w", err)
-		}
-
-		defer func() {
-			if c, ok := bs.(io.Closer); ok {
-				if err := c.Close(); err != nil {
-					log.Warnf("failed to close blockstore: %s", err)
-				}
-			}
-		}()
-
-		dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
+		dag := merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
 		dsc := &dagStatCollector{
 			ds:   dag,
 			walk: carWalkFunc,
@@ -212,6 +206,376 @@ var statObjCmd = &cli.Command{
 	},
 }
 
+type StoreHandle struct {
+	bs     blockstore.Blockstore
+	cs     *store.ChainStore
+	sm     *stmgr.StateManager
+	closer func()
+}
+
+func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) {
+	r, err := repo.NewFS(repoPath)
+	if err != nil {
+		return nil, xerrors.Errorf("opening fs repo: %w", err)
+	}
+
+	exists, err := r.Exists()
+	if err != nil {
+		return nil, err
+	}
+	if !exists {
+		return nil, xerrors.Errorf("lotus repo doesn't exist")
+	}
+
+	lr, err := r.Lock(repo.FullNode)
+	if err != nil {
+		return nil, err
+	}
+
+	bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open blockstore: %w", err)
+	}
+
+	closer := func() {
+		if err := lr.Close(); err != nil {
+			log.Warnf("failed to close locked repo: %s", err)
+		}
+		if c, ok := bs.(io.Closer); ok {
+			if err := c.Close(); err != nil {
+				log.Warnf("failed to close blockstore: %s", err)
+			}
+		}
+	}
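+	// callers must invoke closer() to release the repo lock and close the blockstore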
+
+	mds, err := lr.Datastore(context.Background(), "/metadata")
+	if err != nil {
+		return nil, err
+	}
+
+	cs := store.NewChainStore(bs, bs, mds, nil, nil)
+	if err := cs.Load(ctx); err != nil {
+		return nil, fmt.Errorf("failed to load chain store: %w", err)
+	}
+
+	tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
+	sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open state manager: %w", err)
+	}
+	handle := StoreHandle{
+		bs:     bs,
+		sm:     sm,
+		cs:     cs,
+		closer: closer,
+	}
+
+	return &handle, nil
+}
+
+func pipeline(ctx context.Context, name string, numWorkers int, createJobs func(ctx context.Context, jobCh chan job, resultCh chan result) error,
+	worker func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error,
+	processResults func(ctx context.Context, resultCh chan result) error) error {
+
+	eg, egctx := errgroup.WithContext(ctx)
+	jobCh := make(chan job, numWorkers)
+	resultCh := make(chan result)
+	var resultWriterWg sync.WaitGroup
+
+	resultWriterWg.Add(1)
+	eg.Go(func() error {
+		defer resultWriterWg.Done()
+		defer func() {
+			close(jobCh)
+		}()
+		return createJobs(ctx, jobCh, resultCh)
+	})
+
+	for w := 0; w < numWorkers; w++ {
+		id := w
+
+		resultWriterWg.Add(1)
+		eg.Go(func() error {
+			defer resultWriterWg.Done()
+			return worker(egctx, id, jobCh, resultCh)
+		})
+	}
+
+	eg.Go(func() error {
+		return processResults(ctx, resultCh)
+	})
+
+	// close result channel when workers are done sending to it.
+	eg.Go(func() error {
+		resultWriterWg.Wait()
+		close(resultCh)
+		return nil
+	})
+
+	if err := eg.Wait(); err != nil {
+		return fmt.Errorf("failed pipeline %s: %w", name, err)
+	}
+	return nil
+}
+
+var statSnapshotCmd = &cli.Command{
+	Name:  "stat-snapshot",
+	Usage: "calculates the space usage of a snapshot taken from the given tipset",
+	Description: `Walk the chain back to the lightweight snapshot depth and break down space usage into high level
+categories: headers, messages, receipts, latest state root, and churn from earlier state roots.
+State root and churn space are further broken down by actor type and immediate top level fields.
+`,
+	Flags: []cli.Flag{
+		&cli.StringFlag{
+			Name:  "tipset",
+			Usage: "specify tipset to take the snapshot from (pass comma separated array of cids)",
+		},
+		&cli.IntFlag{
+			Name:  "workers",
+			Usage: "number of workers to use when processing",
+			Value: 10,
+		},
+		&cli.IntFlag{
+			Name:  "dag-cache-size",
+			Usage: "cache size per worker (setting to 0 disables)",
+			Value: 8092,
+		},
+		&cli.BoolFlag{
+			Name:  "pretty",
+			Usage: "print formatted output instead of ldjson",
+			Value: false,
+		},
+	},
+	Action: func(cctx *cli.Context) error {
+		ctx := lcli.ReqContext(cctx)
+		h, err := loadChainStore(ctx, cctx.String("repo"))
+		if err != nil {
+			return err
+		}
+		defer h.closer()
+		tsr := &ChainStoreTipSetResolver{
+			Chain: h.cs,
+		}
+
+		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
+		if err != nil {
+			return err
+		}
+
+		numWorkers := cctx.Int("workers")
+		dagCacheSize := cctx.Int("dag-cache-size")
+
+		cidCh := make(chan cidCall, numWorkers)
+		summary := make(map[string]api.ObjStat)
+		// snapshot root objects with no additional bytes or links
+		summary["/"] = api.ObjStat{Size: 0, Links: 0}
+		summary["/statetree"] = api.ObjStat{Size: 0, Links: 0}
+
+		combine := func(statsA, statsB api.ObjStat) api.ObjStat {
+			return api.ObjStat{
+				Size:  statsA.Size + statsB.Size,
+				Links: statsA.Links + statsB.Links,
+			}
+		}
+
+		// Thread-safe cid set lives across different pipelines so it is not part of the error group
+		go func() {
+			seen := cid.NewSet()
+			for {
+				select {
+				case call := <-cidCh:
+					call.resp <- seen.Visit(call.c)
+				case <-ctx.Done():
+					log.Infof("shutting down cid set goroutine: %s", ctx.Err())
+					return
+				}
+			}
+		}()
+		visit := func(c cid.Cid) bool {
+			ch := make(chan bool)
+			cidCh <- cidCall{c: c, resp: ch}
+			out := <-ch
+			return out
+		}
+		// Stage 1: walk all actors in the latest state root
+		createJobsStage1 := func(ctx context.Context, jobCh chan job, _ chan result) error {
+			st, err := h.sm.StateTree(ts.ParentState())
+			if err != nil {
+				return err
+			}
+
+			return st.ForEach(func(_ address.Address, act *types.Actor) error {
+				actType := builtin.ActorNameByCode(act.Code)
+				actType = path.Base(actType) // strip away fil/
+				if actType == "" {
+					actType = act.Code.String()
+				}
+				jobCh <- job{c: act.Head, key: fmt.Sprintf("/statetree/latest/%s", actType)}
+
+				return nil
+			})
+		}
+
+		worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error {
+			var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
+			if dagCacheSize != 0 {
+				var err error
+				dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
+				if err != nil {
+					return err
+				}
+			}
+
+			for job := range jobCh {
+				stats, err := collectSnapshotJobStats(ctx, job, dag, visit)
+				if err != nil {
+					return err
+				}
+				for _, stat := range stats {
+					select {
+					case resultCh <- stat:
+					case <-ctx.Done():
+						return ctx.Err()
+					}
+				}
+			}
+			return nil
+		}
+
+		processResults := func(ctx context.Context, resultCh chan result) error {
+			for result := range resultCh {
+				if stat, ok := summary[result.key]; ok {
+					summary[result.key] = combine(stat, result.stats)
+				} else {
+					summary[result.key] = result.stats
+				}
+			}
+			return nil
+		}
+
+		if err := pipeline(ctx, "Latest State Actors", numWorkers, createJobsStage1, worker, processResults); err != nil {
+			return err
+		}
+
+		// Stage 2: walk the top of the latest state root
+		createJobsStage2 := func(ctx context.Context, jobCh chan job, _ chan result) error {
+			jobCh <- job{c: ts.ParentState(), key: "/statetree/latest"}
+			return nil
+		}
+
+		if err := pipeline(ctx, "Latest State HAMT", numWorkers, createJobsStage2, worker, processResults); err != nil {
+			return err
+		}
+
+		// Stage 3: walk the rest of the chain: headers, messages, churn
+		// ordering:
+		// for each header send jobs for messages, receipts, state tree churn
+		// don't walk header directly as it would just walk everything including parent tipsets
+
+		churnStateRoots := cid.NewSet()
+		createJobsStage3 := func(ctx context.Context, jobCh chan job, resultCh chan result) error {
+			// walk chain
+			var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
+			if dagCacheSize != 0 {
+				var err error
+				dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
+				if err != nil {
+					return err
+				}
+			}
+
+			blocksToWalk := ts.Cids()
+			startHeight := ts.Height()
+			snapshotStateLimit := abi.ChainEpoch(2000)
+
+			churnActorCache := cid.NewSet()
+			blocksTracked := cid.NewSet()
+			for len(blocksToWalk) > 0 {
+				blkCid := blocksToWalk[0]
+				blocksToWalk = blocksToWalk[1:]
+				nd, err := dag.Get(ctx, blkCid)
+				if err != nil {
+					return xerrors.Errorf("getting block: %w", err)
+				}
+
+				var b types.BlockHeader
+				if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil {
+					return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blkCid, err)
+				}
+
+				// header directly to result channel
+				resultCh <- result{key: "/headers", stats: api.ObjStat{Size: uint64(len(nd.RawData())), Links: uint64(len(nd.Links()))}}
+				// message job
+				if b.Height > startHeight-snapshotStateLimit {
+					jobCh <- job{key: "/messages", c: b.Messages}
+				}
+
+				// state churn job
+				if b.Height > startHeight-snapshotStateLimit {
+					if churnStateRoots.Visit(b.ParentStateRoot) {
+						st, err := h.sm.StateTree(b.ParentStateRoot)
+						if err != nil {
+							return err
+						}
+
+						err = st.ForEach(func(_ address.Address, act *types.Actor) error {
+							if churnActorCache.Visit(act.Head) {
+								actType := builtin.ActorNameByCode(act.Code)
+								actType = path.Base(actType) // strip away fil/
+								if actType == "" {
+									actType = act.Code.String()
+								}
+								jobCh <- job{c: act.Head, key: fmt.Sprintf("/statetree/churn/%s", actType)}
+							}
+
+							return nil
+						})
+						if err != nil {
+							return err
+						}
+					}
+				}
+				for _, blkCid := range b.Parents {
+					if blocksTracked.Visit(blkCid) && b.Height != 0 {
+						blocksToWalk = append(blocksToWalk, blkCid)
+					}
+				}
+			}
+			return nil
+		}
+
+		if err := pipeline(ctx, "Churn, Headers, Messages", numWorkers, createJobsStage3, worker, processResults); err != nil {
+			return err
+		}
+
+		// TODO: clean things up and get a nice abstraction to reuse
+		// Stage 4: walk all actor HAMTs for churn
+
+		createJobsStage4 := func(ctx context.Context, jobCh chan job, _ chan result) error {
+			return churnStateRoots.ForEach(func(c cid.Cid) error {
+				jobCh <- job{c: c, key: "/statetree/churn"}
+				return nil
+			})
+		}
+
+		if err := pipeline(ctx, "Churn HAMT", numWorkers, createJobsStage4, worker, processResults); err != nil {
+			return err
+		}
+
+		if cctx.Bool("pretty") {
+			DumpSnapshotStats(summary)
+		} else {
+			if err := DumpJSON(summary); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	},
+}
+
 var statActorCmd = &cli.Command{
 	Name:  "stat-actor",
 	Usage: "calculates the size of actors and their immeidate structures",
@@ -265,57 +629,14 @@ to reduce the number of decode operations performed by caching the decoded objec
 				addrs = append(addrs, addr)
 			}
 		}
-
-		r, err := repo.NewFS(cctx.String("repo"))
-		if err != nil {
-			return xerrors.Errorf("opening fs repo: %w", err)
-		}
-
-		exists, err := r.Exists()
-		if err != nil {
-			return err
-		}
-		if !exists {
-			return xerrors.Errorf("lotus repo doesn't exist")
-		}
-
-		lr, err := r.Lock(repo.FullNode)
-		if err != nil {
-			return err
-		}
-		defer lr.Close() //nolint:errcheck
-
-		bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
-		if err != nil {
-			return fmt.Errorf("failed to open blockstore: %w", err)
-		}
-
-		defer func() {
-			if c, ok := bs.(io.Closer); ok {
-				if err := c.Close(); err != nil {
-					log.Warnf("failed to close blockstore: %s", err)
-				}
-			}
-		}()
-
-		mds, err := lr.Datastore(context.Background(), "/metadata")
-		if err != nil {
-			return err
-		}
-
-		cs := store.NewChainStore(bs, bs, mds, nil, nil)
-		if err := cs.Load(ctx); err != nil {
-			return nil
-		}
-
-		tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
-		sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
+		h, err := loadChainStore(ctx, cctx.String("repo"))
 		if err != nil {
 			return err
 		}
+		defer h.closer()
 
 		tsr := &ChainStoreTipSetResolver{
-			Chain: cs,
+			Chain: h.cs,
 		}
 
 		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
@@ -327,7 +648,7 @@ to reduce the number of decode operations performed by caching the decoded objec
 		if len(addrs) == 0 && cctx.Bool("all") {
 			var err error
-			addrs, err = sm.ListAllActors(ctx, ts)
+			addrs, err = h.sm.ListAllActors(ctx, ts)
 			if err != nil {
 				return err
 			}
@@ -354,15 +675,15 @@ to reduce the number of decode operations performed by caching the decoded objec
 				return nil
 			}
 
-			actor, err := sm.LoadActor(ctx, addr, ts)
+			actor, err := h.sm.LoadActor(ctx, addr, ts)
 			if err != nil {
 				return err
 			}
 
-			var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
+			var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
 			if dagCacheSize != 0 {
 				var err error
-				dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))), dagCacheSize)
+				dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
 				if err != nil {
 					return err
 				}
@@ -427,6 +748,93 @@ to reduce the number of decode operations performed by caching the decoded objec
 	},
 }
 
+func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, visit func(c cid.Cid) bool) ([]result, error) {
+	// keys under "/statetree/latest" and "/statetree/churn" get a further breakdown by actor state field below
+	if !(path.Dir(in.key) == "/statetree/latest") && !(path.Dir(in.key) == "/statetree/churn") {
+		dsc := &dagStatCollector{
+			ds:   dag,
+			walk: carWalkFunc,
+		}
+
+		if err := merkledag.Walk(ctx, dsc.walkLinks, in.c, visit, merkledag.Concurrent()); err != nil {
+			return nil, err
+		}
+		return []result{{key: in.key, stats: dsc.stats}}, nil
+	}
+
+	// in.c is an actor head cid, try to unmarshal and create sub keys for different regions of state
+	nd, err := dag.Get(ctx, in.c)
+	if err != nil {
+		return nil, err
+	}
+	subjobs := make([]job, 0)
+	results := make([]result, 0)
+
+	// reconstruct actor for state parsing from key
+	av, err := gstactors.VersionForNetwork(network.Version20)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get actors version for network: %w", err)
+	}
+	code, ok := actors.GetActorCodeID(av, path.Base(in.key))
+	if !ok { // try parsing key directly
+		code, err = cid.Parse(path.Base(in.key))
+		if err != nil {
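+			// neither a known actor name nor a parseable actor code CID; the state can't be
+			// interpreted, so only the raw object graph gets recorded under this key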
+			log.Debugf("failed to parse actor string: %s", path.Base(in.key))
+		}
+	}
+
+	actor := types.ActorV5{Head: in.c, Code: code}
+	oif, err := vm.DumpActorState(consensus.NewTipSetExecutor(filcns.RewardFunc).NewActorRegistry(), &actor, nd.RawData())
+	if err != nil {
+		oif = nil
+	}
+	// Account actors return nil from DumpActorState as they have no state
+	if oif != nil {
+		v := reflect.Indirect(reflect.ValueOf(oif))
+		for i := 0; i < v.NumField(); i++ {
+			varName := v.Type().Field(i).Name
+			varType := v.Type().Field(i).Type
+			varValue := v.Field(i).Interface()
+
+			if varType == reflect.TypeOf(cid.Cid{}) {
+				subjobs = append(subjobs, job{
+					key: fmt.Sprintf("%s/%s", in.key, varName),
+					c:   varValue.(cid.Cid),
+				})
+			}
+		}
+	}
+
+	// Walk subfields
+	for _, job := range subjobs {
+		dsc := &dagStatCollector{
+			ds:   dag,
+			walk: carWalkFunc,
+		}
+
+		if err := merkledag.Walk(ctx, dsc.walkLinks, job.c, visit, merkledag.Concurrent()); err != nil {
+			return nil, err
+		}
+		var res result
+		res.key = job.key
+		res.stats = dsc.stats
+
+		results = append(results, res)
+	}
+
+	// now walk the top level object of actor state
+	dsc := &dagStatCollector{
+		ds:   dag,
+		walk: carWalkFunc,
+	}
+
+	if err := merkledag.Walk(ctx, dsc.walkLinks, in.c, visit, merkledag.Concurrent()); err != nil {
+		return nil, err
+	}
+	results = append(results, result{key: in.key, stats: dsc.stats})
+	return results, nil
+}
+
 func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
 	log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))
 
@@ -532,3 +940,19 @@ func DumpStats(actStats actorStats) {
 
 	fmt.Println("--------------------------------------------------------------------------")
 }
+
+func DumpSnapshotStats(stats map[string]api.ObjStat) {
+	// sort keys so we get subkey locality
+	keys := make([]string, 0, len(stats))
+	for k := range stats {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	fmt.Printf("%-*s%-*s%-*s\n", 32, "Path", 24, "Size", 24, "\"Blocks\"")
+	for _, k := range keys {
+		stat := stats[k]
+		sizeStr := units.BytesSize(float64(stat.Size))
+		fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, k, 10, sizeStr, 14, fmt.Sprintf("(%d)", stat.Size), 24, stat.Links)
+	}
+}
diff --git a/cmd/lotus-shed/stateroot-stats.go b/cmd/lotus-shed/stateroot-stats.go
index f429c4e64..16dfc5935 100644
--- a/cmd/lotus-shed/stateroot-stats.go
+++ b/cmd/lotus-shed/stateroot-stats.go
@@ -197,7 +197,7 @@ var staterootStatCmd = &cli.Command{
 				return err
 			}
 
-			fmt.Printf("%s\t%s\t%d\n", inf.Addr, string(cmh.Digest), inf.Stat.Size)
+			fmt.Printf("%s\t%x\t%d\n", inf.Addr, cmh.Digest, inf.Stat.Size)
 		}
 
 		return nil
 	},
diff --git a/scripts/snapshot-summary.py b/scripts/snapshot-summary.py
new file mode 100644
index 000000000..f37623cd2
--- /dev/null
+++ b/scripts/snapshot-summary.py
@@ -0,0 +1,30 @@
+import plotly.express as px
+import sys, json
+import pathlib
+
+snapshot_data = json.load(sys.stdin)
+
+# Possible extensions:
+# 1. parameterize to use block count as value instead of byte size
+# 2. parameterize on different px chart types
+# 3. parameterize on output port so we can serve this from infra
+
+parents = []
+names = []
+values = []
+
+for key in snapshot_data:
+    path = pathlib.Path(key)
+    name = key
+    parent = str(path.parent)
+    if key == '/':
+        parent = ''
+    stats = snapshot_data[key]
+    parents.append(parent)
+    names.append(name)
+    values.append(stats['Size'])
+
+data = dict(names=names, parents=parents, values=values)
+fig = px.treemap(data, names='names', parents='parents', values='values')
+fig.show()
+
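
A minimal usage sketch, not part of the patch: loadChainStore locks the repo directly, so the command is meant to run against a stopped node, and the visualizer script assumes a Python environment with plotly installed. An invocation could look roughly like:

    # human-readable table of space usage by category
    lotus-shed stat-snapshot --pretty

    # ldjson output fed into the treemap script added in scripts/snapshot-summary.py
    lotus-shed stat-snapshot --workers 16 | python scripts/snapshot-summary.py

Without --tipset the walk should start from the current head in the chain store; per the flag help above, --tipset accepts a comma separated array of block cids.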