From d599833b64edac46cd9621804a4be80fef3e45c3 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Sun, 25 Jun 2023 15:55:41 -0600 Subject: [PATCH 01/12] WIP --- cmd/lotus-shed/snapshot.go | 39 ++++ cmd/lotus-shed/state-stats.go | 365 +++++++++++++++++++++++------- cmd/lotus-shed/stateroot-stats.go | 30 ++- 3 files changed, 351 insertions(+), 83 deletions(-) create mode 100644 cmd/lotus-shed/snapshot.go diff --git a/cmd/lotus-shed/snapshot.go b/cmd/lotus-shed/snapshot.go new file mode 100644 index 000000000..3ac3319d8 --- /dev/null +++ b/cmd/lotus-shed/snapshot.go @@ -0,0 +1,39 @@ +package main + +import ( + "github.com/urfave/cli/v2" +) + +var snapshotCmd = &cli.Command{ + Name: "snapshot", + Description: "interact with filecoin chain snapshots", + Subcommands: []*cli.Command{ + snapshotStatCmd, + }, +} + +var snapshotStatCmd = &cli.Command{ + Name: "stat-actor", + Usage: "display number of snapshot bytes held by provided actor", + ArgsUsage: "[datastore path] [head] [actor address]", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "field", // specify top level actor field to stat + }, + }, + Action: func(cctx *cli.Context) error { + // Initialize in memory graph counter + // Get root tskey + // Walk headers back + + // Count header bytes + // open state, + // if no field set graph count actor HEAD + // if field is set, parse actor head, look for field + // if field not found or not a cid error, otherwise do graph count on the cid + + // Print out stats + + return nil + }, +} diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index e07c63589..846c5ff2b 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "reflect" + "sort" "sync" "github.com/docker/go-units" @@ -23,6 +24,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/consensus/filcns" @@ -166,39 +168,13 @@ var statObjCmd = &cli.Command{ return err } - r, err := repo.NewFS(cctx.String("repo")) - if err != nil { - return xerrors.Errorf("opening fs repo: %w", err) - } - - exists, err := r.Exists() + h, err := loadChainStore(ctx, cctx.String("repo")) if err != nil { return err } - if !exists { - return xerrors.Errorf("lotus repo doesn't exist") - } + defer h.closer() - lr, err := r.Lock(repo.FullNode) - if err != nil { - return err - } - defer lr.Close() //nolint:errcheck - - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) - if err != nil { - return fmt.Errorf("failed to open blockstore: %w", err) - } - - defer func() { - if c, ok := bs.(io.Closer); ok { - if err := c.Close(); err != nil { - log.Warnf("failed to close blockstore: %s", err) - } - } - }() - - dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + dag := merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) dsc := &dagStatCollector{ ds: dag, walk: carWalkFunc, @@ -212,6 +188,258 @@ var statObjCmd = &cli.Command{ }, } +type StoreHandle struct { + bs blockstore.Blockstore + cs *store.ChainStore + sm *stmgr.StateManager + closer func() +} + +func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) { + r, err := repo.NewFS(repoPath) + if err != nil { + return nil, xerrors.Errorf("opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return nil, err + } + if 
!exists {
+		return nil, xerrors.Errorf("lotus repo doesn't exist")
+	}
+
+	lr, err := r.Lock(repo.FullNode)
+	if err != nil {
+		return nil, err
+	}
+	defer lr.Close() //nolint:errcheck
+
+	bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open blockstore: %w", err)
+	}
+
+	closer := func() {
+		if c, ok := bs.(io.Closer); ok {
+			if err := c.Close(); err != nil {
+				log.Warnf("failed to close blockstore: %s", err)
+			}
+		}
+	}
+
+	mds, err := lr.Datastore(context.Background(), "/metadata")
+	if err != nil {
+		return nil, err
+	}
+
+	cs := store.NewChainStore(bs, bs, mds, nil, nil)
+	if err := cs.Load(ctx); err != nil {
+		return nil, fmt.Errorf("failed to load chain store: %w", err)
+	}
+
+	tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
+	sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open state manager: %w", err)
+	}
+	handle := StoreHandle{
+		bs:     bs,
+		sm:     sm,
+		cs:     cs,
+		closer: closer,
+	}
+
+	return &handle, nil
+}
+
+var statSnapshotCmd = &cli.Command{
+	Name:  "stat-snapshot",
+	Usage: "calculates the space usage of a snapshot taken from the given tipset",
+	Description: `Walk the chain back to the lightweight snapshot depth and break down space usage into high level
+	categories: headers, messages, receipts, latest state root, and churn from earlier state roots.
+	State root and churn space is further broken down by actor type and immediate top level fields.
+	`,
+	Flags: []cli.Flag{
+		&cli.StringFlag{
+			Name:  "tipset",
+			Usage: "specify tipset to call method on (pass comma separated array of cids)",
+		},
+		&cli.IntFlag{
+			Name:  "workers",
+			Usage: "number of workers to use when processing",
+			Value: 10,
+		},
+		&cli.IntFlag{
+			Name:  "dag-cache-size",
+			Usage: "cache size per worker (setting to 0 disables)",
+			Value: 8092,
+		},
+		&cli.BoolFlag{
+			Name:  "pretty",
+			Usage: "print formatted output instead of ldjson",
+			Value: false,
+		},
+	},
+	Action: func(cctx *cli.Context) error {
+		ctx := lcli.ReqContext(cctx)
+		h, err := loadChainStore(ctx, cctx.String("repo"))
+		if err != nil {
+			return err
+		}
+		defer h.closer()
+		tsr := &ChainStoreTipSetResolver{
+			Chain: h.cs,
+		}
+
+		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
+		if err != nil {
+			return err
+		}
+
+		type job struct {
+			c   cid.Cid
+			key string // prefix path for the region being recorded i.e. 
"/state/mineractor" + } + type cidCall struct { + c cid.Cid + resp chan bool + } + type result struct { + key string + stats api.ObjStat + } + // Stage 1 walk the latest state root + // A) walk all actors + // B) when done walk the actor HAMT + + st, err := h.sm.StateTree(ts.ParentState()) + if err != nil { + return err + } + numWorkers := cctx.Int("workers") + dagCacheSize := cctx.Int("dag-cache-size") + + eg, egctx := errgroup.WithContext(ctx) + + jobs := make(chan job, numWorkers) + results := make(chan result) + cidCh := make(chan cidCall, numWorkers) + + go func() { + seen := cid.NewSet() + + select { + case call := <-cidCh: + call.resp <- seen.Visit(call.c) + case <-ctx.Done(): + log.Infof("shutting down cid set goroutine") + close(cidCh) + } + + }() + + worker := func(ctx context.Context, id int) error { + defer func() { + //log.Infow("worker done", "id", id, "completed", completed) + }() + + for { + select { + case job, ok := <-jobs: + if !ok { + return nil + } + + var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) + if dagCacheSize != 0 { + var err error + dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize) + if err != nil { + return err + } + } + + stats := snapshotJobStats(job, dag, cidCh) + for _, stat := range stats { + select { + case results <- stat: + case <-ctx.Done(): + return ctx.Err() + } + } + case <-ctx.Done(): + return ctx.Err() + } + + } + } + + go func() { + st.ForEach(func(_ address.Address, act *types.Actor) error { + actType := builtin.ActorNameByCode(act.Code) + if actType == "" { + actType = act.Code.String() + } + + jobs <- job{c: act.Head, key: fmt.Sprintf("/state/%s", actType)} + + return nil + }) + + }() + + for w := 0; w < numWorkers; w++ { + id := w + eg.Go(func() error { + return worker(egctx, id) + }) + } + + summary := make(map[string]api.ObjStat) + combine := func(statsA, statsB api.ObjStat) api.ObjStat { + return api.ObjStat{ + Size: statsA.Size + statsB.Size, + Links: statsA.Links + statsB.Links, + } + } + LOOP: + for { + select { + case result, ok := <-results: + if !ok { + return eg.Wait() + } + if stat, ok := summary[result.key]; ok { + summary[result.key] = combine(stat, summary[result.key]) + } else { + summary[result.key] = result.stats + } + + case <-ctx.Done(): + break LOOP + } + } + // XXX walk the top of the state tree + + // Stage 2 walk the rest of the chain: headers, messages, churn + // ordering: + // A) for each header send jobs for messages, receipts, state tree churn + // don't walk header directly as it would just walk everything including parent tipsets + // B) when done walk all actor HAMTs for churn + + if cctx.Bool("pretty") { + DumpSnapshotStats(summary) + } else { + if err := DumpJSON(summary); err != nil { + return err + } + } + + return nil + }, +} + var statActorCmd = &cli.Command{ Name: "stat-actor", Usage: "calculates the size of actors and their immeidate structures", @@ -265,57 +493,14 @@ to reduce the number of decode operations performed by caching the decoded objec addrs = append(addrs, addr) } } - - r, err := repo.NewFS(cctx.String("repo")) - if err != nil { - return xerrors.Errorf("opening fs repo: %w", err) - } - - exists, err := r.Exists() - if err != nil { - return err - } - if !exists { - return xerrors.Errorf("lotus repo doesn't exist") - } - - lr, err := r.Lock(repo.FullNode) - if err != nil { - return err - } - defer lr.Close() //nolint:errcheck - - bs, err := lr.Blockstore(ctx, 
repo.UniversalBlockstore) - if err != nil { - return fmt.Errorf("failed to open blockstore: %w", err) - } - - defer func() { - if c, ok := bs.(io.Closer); ok { - if err := c.Close(); err != nil { - log.Warnf("failed to close blockstore: %s", err) - } - } - }() - - mds, err := lr.Datastore(context.Background(), "/metadata") - if err != nil { - return err - } - - cs := store.NewChainStore(bs, bs, mds, nil, nil) - if err := cs.Load(ctx); err != nil { - return nil - } - - tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc) - sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex) + h, err := loadChainStore(ctx, cctx.String("repo")) if err != nil { return err } + defer h.closer() tsr := &ChainStoreTipSetResolver{ - Chain: cs, + Chain: h.cs, } ts, err := lcli.LoadTipSet(ctx, cctx, tsr) @@ -323,11 +508,11 @@ to reduce the number of decode operations performed by caching the decoded objec return err } - log.Infow("tipset", "parentstate", ts.ParentState()) + // log.Infow("tipset", "parentstate", ts.ParentState()) if len(addrs) == 0 && cctx.Bool("all") { var err error - addrs, err = sm.ListAllActors(ctx, ts) + addrs, err = h.sm.ListAllActors(ctx, ts) if err != nil { return err } @@ -344,7 +529,7 @@ to reduce the number of decode operations performed by caching the decoded objec worker := func(ctx context.Context, id int) error { completed := 0 defer func() { - log.Infow("worker done", "id", id, "completed", completed) + //log.Infow("worker done", "id", id, "completed", completed) }() for { @@ -354,15 +539,15 @@ to reduce the number of decode operations performed by caching the decoded objec return nil } - actor, err := sm.LoadActor(ctx, addr, ts) + actor, err := h.sm.LoadActor(ctx, addr, ts) if err != nil { return err } - var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) if dagCacheSize != 0 { var err error - dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))), dagCacheSize) + dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize) if err != nil { return err } @@ -428,7 +613,7 @@ to reduce the number of decode operations performed by caching the decoded objec } func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) { - log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code)) + // log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code)) nd, err := dag.Get(ctx, actor.Head) if err != nil { @@ -532,3 +717,19 @@ func DumpStats(actStats actorStats) { fmt.Println("--------------------------------------------------------------------------") } + +func DumpSnapshotStats(stats map[string]api.ObjStat) { + // sort keys so we get subkey locality + keys := make([]string, 0, len(stats)) + for k, _ := range stats { + keys = append(keys, k) + } + sort.Strings(keys) + + fmt.Printf("%-*s%-*s%-*s\n", 32, "Path", 24, "Size", 24, "\"Blocks\"") + for _, k := range keys { + stat := stats[k] + sizeStr := units.BytesSize(float64(stat.Size)) + fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, k, 10, sizeStr, 14, fmt.Sprintf("(%d)", stat.Size), 24, stat.Links) + } +} diff --git a/cmd/lotus-shed/stateroot-stats.go 
b/cmd/lotus-shed/stateroot-stats.go index f429c4e64..bec37d71f 100644 --- a/cmd/lotus-shed/stateroot-stats.go +++ b/cmd/lotus-shed/stateroot-stats.go @@ -104,6 +104,34 @@ type statItem struct { Stat api.ObjStat } +// var snapShotStatCmd = &cli.Command{ +// Name: "snapshot-stat", +// Usage: "print statistics of the snapshot rooted at given block", +// Flags: []cli.Flag{ +// &cli.StringFlag{ +// Name: "tipset", +// Usage: "specify tipset to start from", +// }, +// }, +// Action: func(cctx *cli.Context) error { +// api, closer, err := lcli.GetFullNodeAPI(cctx) +// if err != nil { +// return err +// } + +// defer closer() +// ctx := lcli.ReqContext(cctx) + +// ts, err := lcli.LoadTipSet(ctx, cctx, api) +// if err != nil { +// return err +// } + +// api.ChainStatSnapshot(ctx, ts.Key()) + +// }, +// } + var staterootStatCmd = &cli.Command{ Name: "stat", Usage: "print statistics for the stateroot of a given block", @@ -197,7 +225,7 @@ var staterootStatCmd = &cli.Command{ return err } - fmt.Printf("%s\t%s\t%d\n", inf.Addr, string(cmh.Digest), inf.Stat.Size) + fmt.Printf("%s\t%x\t%d\n", inf.Addr, cmh.Digest, inf.Stat.Size) } return nil }, From 36a88f45fcecd7af430c95f189d22c16ee890217 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Sun, 25 Jun 2023 17:36:21 -0600 Subject: [PATCH 02/12] Output is buggy but halfway there --- cmd/lotus-shed/main.go | 1 + cmd/lotus-shed/state-stats.go | 328 +++++++++++++++++++++++----------- 2 files changed, 221 insertions(+), 108 deletions(-) diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index 13ab6af0d..41a2a4d94 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -23,6 +23,7 @@ func main() { local := []*cli.Command{ addressCmd, statActorCmd, + statSnapshotCmd, statObjCmd, base64Cmd, base32Cmd, diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 846c5ff2b..d1e313724 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "path" "reflect" "sort" "sync" @@ -22,9 +23,12 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + gstactors "github.com/filecoin-project/go-state-types/actors" + "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/consensus/filcns" @@ -51,6 +55,19 @@ type fieldItem struct { Stats api.ObjStat } +type job struct { + c cid.Cid + key string // prefix path for the region being recorded i.e. 
"/state/mineractor" +} +type cidCall struct { + c cid.Cid + resp chan bool +} +type result struct { + key string + stats api.ObjStat +} + type cacheNodeGetter struct { ds format.NodeGetter cache *lru.TwoQueueCache[cid.Cid, format.Node] @@ -213,7 +230,6 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) if err != nil { return nil, err } - defer lr.Close() //nolint:errcheck bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { @@ -221,6 +237,7 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) } closer := func() { + lr.Close() if c, ok := bs.(io.Closer); ok { if err := c.Close(); err != nil { log.Warnf("failed to close blockstore: %s", err) @@ -297,105 +314,14 @@ var statSnapshotCmd = &cli.Command{ return err } - type job struct { - c cid.Cid - key string // prefix path for the region being recorded i.e. "/state/mineractor" - } - type cidCall struct { - c cid.Cid - resp chan bool - } - type result struct { - key string - stats api.ObjStat - } - // Stage 1 walk the latest state root - // A) walk all actors - // B) when done walk the actor HAMT - - st, err := h.sm.StateTree(ts.ParentState()) - if err != nil { - return err - } numWorkers := cctx.Int("workers") dagCacheSize := cctx.Int("dag-cache-size") eg, egctx := errgroup.WithContext(ctx) - jobs := make(chan job, numWorkers) - results := make(chan result) + jobCh := make(chan job, numWorkers) + resultCh := make(chan result) cidCh := make(chan cidCall, numWorkers) - - go func() { - seen := cid.NewSet() - - select { - case call := <-cidCh: - call.resp <- seen.Visit(call.c) - case <-ctx.Done(): - log.Infof("shutting down cid set goroutine") - close(cidCh) - } - - }() - - worker := func(ctx context.Context, id int) error { - defer func() { - //log.Infow("worker done", "id", id, "completed", completed) - }() - - for { - select { - case job, ok := <-jobs: - if !ok { - return nil - } - - var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) - if dagCacheSize != 0 { - var err error - dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize) - if err != nil { - return err - } - } - - stats := snapshotJobStats(job, dag, cidCh) - for _, stat := range stats { - select { - case results <- stat: - case <-ctx.Done(): - return ctx.Err() - } - } - case <-ctx.Done(): - return ctx.Err() - } - - } - } - - go func() { - st.ForEach(func(_ address.Address, act *types.Actor) error { - actType := builtin.ActorNameByCode(act.Code) - if actType == "" { - actType = act.Code.String() - } - - jobs <- job{c: act.Head, key: fmt.Sprintf("/state/%s", actType)} - - return nil - }) - - }() - - for w := 0; w < numWorkers; w++ { - id := w - eg.Go(func() error { - return worker(egctx, id) - }) - } - summary := make(map[string]api.ObjStat) combine := func(statsA, statsB api.ObjStat) api.ObjStat { return api.ObjStat{ @@ -403,30 +329,129 @@ var statSnapshotCmd = &cli.Command{ Links: statsA.Links + statsB.Links, } } - LOOP: - for { - select { - case result, ok := <-results: - if !ok { - return eg.Wait() + var workerWg sync.WaitGroup + + // Threadsafe cid set lives across different pipelines so not part of error group + go func() error { + seen := cid.NewSet() + for { + select { + case call := <-cidCh: + call.resp <- seen.Visit(call.c) + case <-ctx.Done(): + log.Infof("shutting down cid set goroutine") + return ctx.Err() } + } + }() + visit := func(c cid.Cid) bool { + ch := make(chan bool) + cidCh <- 
cidCall{c: c, resp: ch} + out := <-ch + return out + } + + // Stage 1 walk all actors in latest state root + eg.Go(func() error { + defer func() { + close(jobCh) + }() + st, err := h.sm.StateTree(ts.ParentState()) + if err != nil { + return err + } + return st.ForEach(func(_ address.Address, act *types.Actor) error { + actType := builtin.ActorNameByCode(act.Code) + if actType == "" { + actType = act.Code.String() + } + + jobCh <- job{c: act.Head, key: fmt.Sprintf("/state/%s", actType)} + + return nil + }) + + }) + + worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error { + var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) + if dagCacheSize != 0 { + var err error + dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize) + if err != nil { + return err + } + } + + for job := range jobCh { + stats, err := collectSnapshotJobStats(ctx, job, dag, visit) + if err != nil { + return err + } + for _, stat := range stats { + select { + case resultCh <- stat: + case <-ctx.Done(): + return ctx.Err() + } + } + } + return nil + } + var id int + for w := 0; w < numWorkers; w++ { + id = w + workerWg.Add(1) + eg.Go(func() error { + defer workerWg.Done() + return worker(egctx, id, jobCh, resultCh) + }) + } + + // Close result channel when workers are done sending to it. + eg.Go(func() error { + workerWg.Wait() + close(resultCh) + return nil + }) + + eg.Go(func() error { + for result := range resultCh { if stat, ok := summary[result.key]; ok { summary[result.key] = combine(stat, summary[result.key]) } else { summary[result.key] = result.stats } - - case <-ctx.Done(): - break LOOP } - } - // XXX walk the top of the state tree + return nil + }) - // Stage 2 walk the rest of the chain: headers, messages, churn + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed to measure space in latest state root: %w", err) + } + + // Stage 2: walk the top of the latest state root + + id += 1 + resultCh = make(chan result) + jobCh = make(chan job) + go func() { + defer close(jobCh) + jobCh <- job{c: ts.ParentState(), key: "statetop"} + }() + go func() { + defer close(resultCh) + worker(ctx, id, jobCh, resultCh) + }() + res := <-resultCh + summary[res.key] = res.stats + + // Stage 3 walk the rest of the chain: headers, messages, churn // ordering: - // A) for each header send jobs for messages, receipts, state tree churn + // for each header send jobs for messages, receipts, state tree churn // don't walk header directly as it would just walk everything including parent tipsets - // B) when done walk all actor HAMTs for churn + + // Stage 4 walk all actor HAMTs for churn if cctx.Bool("pretty") { DumpSnapshotStats(summary) @@ -612,6 +637,93 @@ to reduce the number of decode operations performed by caching the decoded objec }, } +func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, visit func(c cid.Cid) bool) ([]result, error) { + // "state" and "churn" require further breakdown by actor type + if path.Dir(in.key) != "state" && path.Dir(in.key) != "churn" { + dsc := &dagStatCollector{ + ds: dag, + walk: carWalkFunc, + } + + if err := merkledag.Walk(ctx, dsc.walkLinks, in.c, visit, merkledag.Concurrent()); err != nil { + return nil, err + } + return []result{{key: in.key, stats: dsc.stats}}, nil + } + + // in.c is an actor head cid, try to unmarshal and create sub keys for different regions of state + nd, err := dag.Get(ctx, in.c) + if err != nil { + 
return nil, err
+	}
+	subjobs := make([]job, 0)
+	results := make([]result, 0)
+
+	// reconstruct actor for state parsing from key
+	av, err := gstactors.VersionForNetwork(network.Version20)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get actors version for network: %w", err)
+	}
+	code, ok := actors.GetActorCodeID(av, path.Base(in.key))
+	if !ok { // try parsing key directly
+		code, err = cid.Parse(path.Base(in.key))
+		if err != nil {
+			log.Debugf("failed to parse actor string: %s", path.Base(in.key))
+		}
+	}
+
+	actor := types.ActorV5{Head: in.c, Code: code}
+	oif, err := vm.DumpActorState(consensus.NewTipSetExecutor(filcns.RewardFunc).NewActorRegistry(), &actor, nd.RawData())
+	if err != nil {
+		oif = nil
+	}
+	// Account actors return nil from DumpActorState as they have no state
+	if oif != nil {
+		v := reflect.Indirect(reflect.ValueOf(oif))
+		for i := 0; i < v.NumField(); i++ {
+			varName := v.Type().Field(i).Name
+			varType := v.Type().Field(i).Type
+			varValue := v.Field(i).Interface()
+
+			if varType == reflect.TypeOf(cid.Cid{}) {
+				subjobs = append(subjobs, job{
+					key: fmt.Sprintf("%s/%s", in.key, varName),
+					c:   varValue.(cid.Cid),
+				})
+			}
+		}
+	}
+
+	// Walk subfields
+	for _, job := range subjobs {
+		dsc := &dagStatCollector{
+			ds:   dag,
+			walk: carWalkFunc,
+		}
+
+		if err := merkledag.Walk(ctx, dsc.walkLinks, job.c, visit, merkledag.Concurrent()); err != nil {
+			return nil, err
+		}
+		var res result
+		res.key = job.key
+		res.stats = dsc.stats
+
+		results = append(results, res)
+	}
+
+	// now walk the top level object of actor state
+	dsc := &dagStatCollector{
+		ds:   dag,
+		walk: carWalkFunc,
+	}
+
+	if err := merkledag.Walk(ctx, dsc.walkLinks, in.c, visit, merkledag.Concurrent()); err != nil {
+		return nil, err
+	}
+	results = append(results, result{key: in.key, stats: dsc.stats})
+	return results, nil
+}
+
 func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
 	// log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))

From 3897bf14d14ee44a9b5e631049da756af1cf5f2c Mon Sep 17 00:00:00 2001
From: zenground0
Date: Tue, 27 Jun 2023 00:54:02 -0600
Subject: [PATCH 03/12] Churn json output is working

---
 cmd/lotus-shed/state-stats.go | 134 ++++++++++++++++++++++++++++++++--
 scripts/snapshot-summary.py   |  28 +++++++
 2 files changed, 154 insertions(+), 8 deletions(-)
 create mode 100644 scripts/snapshot-summary.py

diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go
index d1e313724..0a7811828 100644
--- a/cmd/lotus-shed/state-stats.go
+++ b/cmd/lotus-shed/state-stats.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -323,6 +324,9 @@ var statSnapshotCmd = &cli.Command{
 		resultCh := make(chan result)
 		cidCh := make(chan cidCall, numWorkers)
 		summary := make(map[string]api.ObjStat)
+		summary["/snapshot"] = api.ObjStat{Size: 0, Links: 0}       // snapshot root object has no additional bytes or links
+		summary["/snapshot/churn"] = api.ObjStat{Size: 0, Links: 0} // snapshot root object has no additional bytes or links
+
 		combine := func(statsA, statsB api.ObjStat) api.ObjStat {
 			return api.ObjStat{
 				Size:  statsA.Size + statsB.Size,
 				Links: statsA.Links + statsB.Links,
 			}
 		}
@@ -356,21 +360,22 @@ var statSnapshotCmd = &cli.Command{
 			defer func() {
 				close(jobCh)
 			}()
+
 			st, err := h.sm.StateTree(ts.ParentState())
 			if err != nil {
 				return err
 			}
+
 			return st.ForEach(func(_ address.Address, act *types.Actor) error {
 				actType := builtin.ActorNameByCode(act.Code)
+				actType = 
path.Base(actType) // strip away fil/ if actType == "" { actType = act.Code.String() } - - jobCh <- job{c: act.Head, key: fmt.Sprintf("/state/%s", actType)} + jobCh <- job{c: act.Head, key: fmt.Sprintf("/snapshot/state/%s", actType)} return nil }) - }) worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error { @@ -418,7 +423,8 @@ var statSnapshotCmd = &cli.Command{ eg.Go(func() error { for result := range resultCh { if stat, ok := summary[result.key]; ok { - summary[result.key] = combine(stat, summary[result.key]) + summary[result.key] = combine(stat, result.stats) + } else { summary[result.key] = result.stats } @@ -431,13 +437,12 @@ var statSnapshotCmd = &cli.Command{ } // Stage 2: walk the top of the latest state root - id += 1 resultCh = make(chan result) jobCh = make(chan job) go func() { defer close(jobCh) - jobCh <- job{c: ts.ParentState(), key: "statetop"} + jobCh <- job{c: ts.ParentState(), key: "/snapshot/state"} }() go func() { defer close(resultCh) @@ -450,6 +455,119 @@ var statSnapshotCmd = &cli.Command{ // ordering: // for each header send jobs for messages, receipts, state tree churn // don't walk header directly as it would just walk everything including parent tipsets + eg, egctx = errgroup.WithContext(ctx) + jobCh = make(chan job, numWorkers) + resultCh = make(chan result) + var workerWg2 sync.WaitGroup + + churnStateRoots := cid.NewSet() + workerWg2.Add(1) // goroutine creating jobs also sends to result channel + eg.Go(func() error { + defer func() { + close(jobCh) + workerWg2.Done() + }() + // walk chain + + var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) + if dagCacheSize != 0 { + var err error + dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize) + if err != nil { + return err + } + } + + blocksToWalk := ts.Cids() + startHeight := ts.Height() + fmt.Printf("starting height %d\n", startHeight) + snapshotStateLimit := abi.ChainEpoch(2000) + + churnActorCache := cid.NewSet() + blocksTracked := cid.NewSet() + for len(blocksToWalk) > 0 { + blkCid := blocksToWalk[0] + blocksToWalk = blocksToWalk[1:] + nd, err := dag.Get(ctx, blkCid) + if err != nil { + return xerrors.Errorf("getting block: %w", err) + } + + var b types.BlockHeader + if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil { + return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blkCid, err) + } + + // header directly to result channel + resultCh <- result{key: "snapshot/headers", stats: api.ObjStat{Size: uint64(len(nd.RawData())), Links: uint64(len(nd.Links()))}} + // message job + if b.Height > startHeight-snapshotStateLimit { + jobCh <- job{key: "snapshot/messages", c: b.Messages} + } + + // state churn job + if b.Height > startHeight-snapshotStateLimit { + if churnStateRoots.Visit(b.ParentStateRoot) { + st, err := h.sm.StateTree(b.ParentStateRoot) + if err != nil { + return err + } + + err = st.ForEach(func(_ address.Address, act *types.Actor) error { + if churnActorCache.Visit(act.Head) { + actType := builtin.ActorNameByCode(act.Code) + actType = path.Base(actType) // strip away fil/ + if actType == "" { + actType = act.Code.String() + } + jobCh <- job{c: act.Head, key: fmt.Sprintf("/snapshot/churn/%s", actType)} + } + + return nil + }) + if err != nil { + return err + } + } + } + for _, blkCid := range b.Parents { + if blocksTracked.Visit(blkCid) && b.Height != 0 { + blocksToWalk = append(blocksToWalk, blkCid) + } + } + } + return nil + 
}) + + for w := 0; w < numWorkers; w++ { + id += w + workerWg2.Add(1) + eg.Go(func() error { + defer workerWg2.Done() + return worker(egctx, id, jobCh, resultCh) + }) + } + + // Close result channel when workers are done sending to it. + eg.Go(func() error { + workerWg2.Wait() + close(resultCh) + return nil + }) + + eg.Go(func() error { + for result := range resultCh { + if stat, ok := summary[result.key]; ok { + summary[result.key] = combine(stat, result.stats) + } else { + summary[result.key] = result.stats + } + } + return nil + }) + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed to measure space in snapshot churn: %w", err) + } // Stage 4 walk all actor HAMTs for churn @@ -638,8 +756,8 @@ to reduce the number of decode operations performed by caching the decoded objec } func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, visit func(c cid.Cid) bool) ([]result, error) { - // "state" and "churn" require further breakdown by actor type - if path.Dir(in.key) != "state" && path.Dir(in.key) != "churn" { + // "state" and "churn" attempt further breakdown by actor type + if !(path.Dir(in.key) == "/snapshot/state") && !(path.Dir(in.key) == "/snapshot/churn") { dsc := &dagStatCollector{ ds: dag, walk: carWalkFunc, diff --git a/scripts/snapshot-summary.py b/scripts/snapshot-summary.py new file mode 100644 index 000000000..da75ee9a7 --- /dev/null +++ b/scripts/snapshot-summary.py @@ -0,0 +1,28 @@ +import plotly.express as px +import sys, json + +snapshot_data = json.load(sys.stdin) + +# XXX: parameterize to use block count as value instead of byte size +# XXX: parameterize on different types of px chart types +# XXX: parameterize on output port so we can serve this from infra + +parents = [] +names = [] +values = [] + +for key in snapshot_data: + path = key.split('/') + name = path[len(path) - 1] + parent = path[len(path) - 2] + stats = snapshot_data[key] + parents.append(parent) + names.append(name) + values.append(stats['Size']) + +data = dict(names=names, parents=parents, values=values) +print(data) +fig = px.treemap(data, names='names', parents='parents', values='values') +print(fig) +fig.show() + From 4aa977f1190a9cddffe231e7a84f6d7be28aa256 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Tue, 27 Jun 2023 11:00:20 -0600 Subject: [PATCH 04/12] Touch up pathing --- cmd/lotus-shed/state-stats.go | 18 ++++++++++-------- scripts/snapshot-summary.py | 9 ++++++--- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 0a7811828..74492cdc3 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -324,8 +324,10 @@ var statSnapshotCmd = &cli.Command{ resultCh := make(chan result) cidCh := make(chan cidCall, numWorkers) summary := make(map[string]api.ObjStat) - summary["/snapshot"] = api.ObjStat{Size: 0, Links: 0} // snapshot root object has no additional bytes or links - summary["/snapshot/churn"] = api.ObjStat{Size: 0, Links: 0} // snapshot root object has no additional bytes or links + // snapshot root objects with no additional bytes or links + summary["/"] = api.ObjStat{Size: 0, Links: 0} + summary["/statetree"] = api.ObjStat{Size: 0, Links: 0} + summary["/statetree/churn"] = api.ObjStat{Size: 0, Links: 0} // XXX this will be calculated combine := func(statsA, statsB api.ObjStat) api.ObjStat { return api.ObjStat{ @@ -372,7 +374,7 @@ var statSnapshotCmd = &cli.Command{ if actType == "" { actType = act.Code.String() } - jobCh <- job{c: act.Head, key: 
fmt.Sprintf("/snapshot/state/%s", actType)} + jobCh <- job{c: act.Head, key: fmt.Sprintf("/statetree/latest/%s", actType)} return nil }) @@ -442,7 +444,7 @@ var statSnapshotCmd = &cli.Command{ jobCh = make(chan job) go func() { defer close(jobCh) - jobCh <- job{c: ts.ParentState(), key: "/snapshot/state"} + jobCh <- job{c: ts.ParentState(), key: "/statetree"} }() go func() { defer close(resultCh) @@ -499,10 +501,10 @@ var statSnapshotCmd = &cli.Command{ } // header directly to result channel - resultCh <- result{key: "snapshot/headers", stats: api.ObjStat{Size: uint64(len(nd.RawData())), Links: uint64(len(nd.Links()))}} + resultCh <- result{key: "/headers", stats: api.ObjStat{Size: uint64(len(nd.RawData())), Links: uint64(len(nd.Links()))}} // message job if b.Height > startHeight-snapshotStateLimit { - jobCh <- job{key: "snapshot/messages", c: b.Messages} + jobCh <- job{key: "/messages", c: b.Messages} } // state churn job @@ -520,7 +522,7 @@ var statSnapshotCmd = &cli.Command{ if actType == "" { actType = act.Code.String() } - jobCh <- job{c: act.Head, key: fmt.Sprintf("/snapshot/churn/%s", actType)} + jobCh <- job{c: act.Head, key: fmt.Sprintf("/statetree/churn/%s", actType)} } return nil @@ -757,7 +759,7 @@ to reduce the number of decode operations performed by caching the decoded objec func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, visit func(c cid.Cid) bool) ([]result, error) { // "state" and "churn" attempt further breakdown by actor type - if !(path.Dir(in.key) == "/snapshot/state") && !(path.Dir(in.key) == "/snapshot/churn") { + if !(path.Dir(in.key) == "/statetree/latest") && !(path.Dir(in.key) == "/statetree/churn") { dsc := &dagStatCollector{ ds: dag, walk: carWalkFunc, diff --git a/scripts/snapshot-summary.py b/scripts/snapshot-summary.py index da75ee9a7..fd64da64f 100644 --- a/scripts/snapshot-summary.py +++ b/scripts/snapshot-summary.py @@ -1,5 +1,6 @@ import plotly.express as px import sys, json +import pathlib snapshot_data = json.load(sys.stdin) @@ -12,9 +13,11 @@ names = [] values = [] for key in snapshot_data: - path = key.split('/') - name = path[len(path) - 1] - parent = path[len(path) - 2] + path = pathlib.Path(key) + name = key + parent = str(path.parent) + if key == '/': + parent = '' stats = snapshot_data[key] parents.append(parent) names.append(name) From 77ea7ef90db4a4e83448466cd29a9fc0466a6d02 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Tue, 27 Jun 2023 11:29:14 -0600 Subject: [PATCH 05/12] Tweak path stuff --- cmd/lotus-shed/state-stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 74492cdc3..11da70dca 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -444,7 +444,7 @@ var statSnapshotCmd = &cli.Command{ jobCh = make(chan job) go func() { defer close(jobCh) - jobCh <- job{c: ts.ParentState(), key: "/statetree"} + jobCh <- job{c: ts.ParentState(), key: "/statetree/latest"} }() go func() { defer close(resultCh) From 0cfdc9b5b6b28136e3cf900fd30511220091d303 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Tue, 27 Jun 2023 17:57:00 -0600 Subject: [PATCH 06/12] Refactor for cleanup + measure top level HAMT churn --- cmd/lotus-shed/state-stats.go | 166 +++++++++++++++++----------------- 1 file changed, 81 insertions(+), 85 deletions(-) diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 11da70dca..6378fdc11 100644 --- a/cmd/lotus-shed/state-stats.go +++ 
b/cmd/lotus-shed/state-stats.go @@ -6,11 +6,15 @@ import ( "encoding/json" "fmt" "io" + "net/http" "path" "reflect" "sort" "sync" + _ "net/http" + _ "net/http/pprof" + "github.com/docker/go-units" lru "github.com/hashicorp/golang-lru/v2" "github.com/ipfs/go-blockservice" @@ -271,6 +275,52 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) return &handle, nil } +func pipeline(ctx context.Context, name string, numWorkers int, createJobs func(ctx context.Context, jobCh chan job, resultCh chan result) error, + worker func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error, + processResults func(ctx context.Context, resultCh chan result) error) error { + + eg, egctx := errgroup.WithContext(ctx) + jobCh := make(chan job, numWorkers) + resultCh := make(chan result) + var resultWriterWg sync.WaitGroup + + resultWriterWg.Add(1) + eg.Go(func() error { + defer resultWriterWg.Done() + defer func() { + close(jobCh) + }() + return createJobs(ctx, jobCh, resultCh) + }) + + var id int + for w := 0; w < numWorkers; w++ { + id = w + + resultWriterWg.Add(1) + eg.Go(func() error { + defer resultWriterWg.Done() + return worker(egctx, id, jobCh, resultCh) + }) + } + + eg.Go(func() error { + return processResults(ctx, resultCh) + }) + + // close result channel when workers are done sending to it. + eg.Go(func() error { + resultWriterWg.Wait() + close(resultCh) + return nil + }) + + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed pipeline %s: %w", name, err) + } + return nil +} + var statSnapshotCmd = &cli.Command{ Name: "stat-snapshot", Usage: "calculates the space usage of a snapshot taken from the given tipset", @@ -314,20 +364,18 @@ var statSnapshotCmd = &cli.Command{ if err != nil { return err } + go func() { + fmt.Println(http.ListenAndServe("localhost:6060", nil)) + }() numWorkers := cctx.Int("workers") dagCacheSize := cctx.Int("dag-cache-size") - eg, egctx := errgroup.WithContext(ctx) - - jobCh := make(chan job, numWorkers) - resultCh := make(chan result) cidCh := make(chan cidCall, numWorkers) summary := make(map[string]api.ObjStat) // snapshot root objects with no additional bytes or links summary["/"] = api.ObjStat{Size: 0, Links: 0} summary["/statetree"] = api.ObjStat{Size: 0, Links: 0} - summary["/statetree/churn"] = api.ObjStat{Size: 0, Links: 0} // XXX this will be calculated combine := func(statsA, statsB api.ObjStat) api.ObjStat { return api.ObjStat{ @@ -335,7 +383,6 @@ var statSnapshotCmd = &cli.Command{ Links: statsA.Links + statsB.Links, } } - var workerWg sync.WaitGroup // Threadsafe cid set lives across different pipelines so not part of error group go func() error { @@ -356,13 +403,8 @@ var statSnapshotCmd = &cli.Command{ out := <-ch return out } - // Stage 1 walk all actors in latest state root - eg.Go(func() error { - defer func() { - close(jobCh) - }() - + createJobsStage1 := func(ctx context.Context, jobCh chan job, _ chan result) error { st, err := h.sm.StateTree(ts.ParentState()) if err != nil { return err @@ -378,7 +420,7 @@ var statSnapshotCmd = &cli.Command{ return nil }) - }) + } worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error { var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) @@ -405,24 +447,8 @@ var statSnapshotCmd = &cli.Command{ } return nil } - var id int - for w := 0; w < numWorkers; w++ { - id = w - workerWg.Add(1) - eg.Go(func() error { - defer workerWg.Done() - return worker(egctx, id, jobCh, resultCh) - }) - } - // 
Close result channel when workers are done sending to it. - eg.Go(func() error { - workerWg.Wait() - close(resultCh) - return nil - }) - - eg.Go(func() error { + processResults := func(ctx context.Context, resultCh chan result) error { for result := range resultCh { if stat, ok := summary[result.key]; ok { summary[result.key] = combine(stat, result.stats) @@ -432,45 +458,30 @@ var statSnapshotCmd = &cli.Command{ } } return nil - }) + } - if err := eg.Wait(); err != nil { - return fmt.Errorf("failed to measure space in latest state root: %w", err) + if err := pipeline(ctx, "Latest State Actors", numWorkers, createJobsStage1, worker, processResults); err != nil { + return err } // Stage 2: walk the top of the latest state root - id += 1 - resultCh = make(chan result) - jobCh = make(chan job) - go func() { - defer close(jobCh) + createJobsStage2 := func(ctx context.Context, jobCh chan job, _ chan result) error { jobCh <- job{c: ts.ParentState(), key: "/statetree/latest"} - }() - go func() { - defer close(resultCh) - worker(ctx, id, jobCh, resultCh) - }() - res := <-resultCh - summary[res.key] = res.stats + return nil + } + + if err := pipeline(ctx, "Latest State HAMT", numWorkers, createJobsStage2, worker, processResults); err != nil { + return err + } // Stage 3 walk the rest of the chain: headers, messages, churn // ordering: // for each header send jobs for messages, receipts, state tree churn // don't walk header directly as it would just walk everything including parent tipsets - eg, egctx = errgroup.WithContext(ctx) - jobCh = make(chan job, numWorkers) - resultCh = make(chan result) - var workerWg2 sync.WaitGroup churnStateRoots := cid.NewSet() - workerWg2.Add(1) // goroutine creating jobs also sends to result channel - eg.Go(func() error { - defer func() { - close(jobCh) - workerWg2.Done() - }() + createJobsStage3 := func(ctx context.Context, jobCh chan job, resultCh chan result) error { // walk chain - var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) if dagCacheSize != 0 { var err error @@ -482,7 +493,6 @@ var statSnapshotCmd = &cli.Command{ blocksToWalk := ts.Cids() startHeight := ts.Height() - fmt.Printf("starting height %d\n", startHeight) snapshotStateLimit := abi.ChainEpoch(2000) churnActorCache := cid.NewSet() @@ -539,40 +549,26 @@ var statSnapshotCmd = &cli.Command{ } } return nil - }) + } - for w := 0; w < numWorkers; w++ { - id += w - workerWg2.Add(1) - eg.Go(func() error { - defer workerWg2.Done() - return worker(egctx, id, jobCh, resultCh) + if err := pipeline(ctx, "Churn, Headers, Messages", numWorkers, createJobsStage3, worker, processResults); err != nil { + return err + } + + // step 1 clean things up and get a nice abstraction to reuse + // Stage 4 walk all actor HAMTs for churn + + createJobsStage4 := func(ctx context.Context, jobCh chan job, _ chan result) error { + return churnStateRoots.ForEach(func(c cid.Cid) error { + jobCh <- job{c: c, key: "/statetree/churn"} + return nil }) } - // Close result channel when workers are done sending to it. 
- eg.Go(func() error { - workerWg2.Wait() - close(resultCh) - return nil - }) - - eg.Go(func() error { - for result := range resultCh { - if stat, ok := summary[result.key]; ok { - summary[result.key] = combine(stat, result.stats) - } else { - summary[result.key] = result.stats - } - } - return nil - }) - if err := eg.Wait(); err != nil { - return fmt.Errorf("failed to measure space in snapshot churn: %w", err) + if err := pipeline(ctx, "Churn HAMT", numWorkers, createJobsStage4, worker, processResults); err != nil { + return err } - // Stage 4 walk all actor HAMTs for churn - if cctx.Bool("pretty") { DumpSnapshotStats(summary) } else { From 3897bf14d14ee44a9b5e631049da756af1cf5f2c Mon Sep 17 00:00:00 2001 From: zenground0 Date: Wed, 28 Jun 2023 09:15:38 -0600 Subject: [PATCH 07/12] Cleanup --- cmd/lotus-shed/snapshot.go | 39 ------------------------------- cmd/lotus-shed/stateroot-stats.go | 28 ---------------------- scripts/snapshot-summary.py | 7 +++--- 3 files changed, 4 insertions(+), 70 deletions(-) delete mode 100644 cmd/lotus-shed/snapshot.go diff --git a/cmd/lotus-shed/snapshot.go b/cmd/lotus-shed/snapshot.go deleted file mode 100644 index 3ac3319d8..000000000 --- a/cmd/lotus-shed/snapshot.go +++ /dev/null @@ -1,39 +0,0 @@ -package main - -import ( - "github.com/urfave/cli/v2" -) - -var snapshotCmd = &cli.Command{ - Name: "snapshot", - Description: "interact with filecoin chain snapshots", - Subcommands: []*cli.Command{ - snapshotStatCmd, - }, -} - -var snapshotStatCmd = &cli.Command{ - Name: "stat-actor", - Usage: "display number of snapshot bytes held by provided actor", - ArgsUsage: "[datastore path] [head] [actor address]", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "field", // specify top level actor field to stat - }, - }, - Action: func(cctx *cli.Context) error { - // Initialize in memory graph counter - // Get root tskey - // Walk headers back - - // Count header bytes - // open state, - // if no field set graph count actor HEAD - // if field is set, parse actor head, look for field - // if field not found or not a cid error, otherwise do graph count on the cid - - // Print out stats - - return nil - }, -} diff --git a/cmd/lotus-shed/stateroot-stats.go b/cmd/lotus-shed/stateroot-stats.go index bec37d71f..16dfc5935 100644 --- a/cmd/lotus-shed/stateroot-stats.go +++ b/cmd/lotus-shed/stateroot-stats.go @@ -104,34 +104,6 @@ type statItem struct { Stat api.ObjStat } -// var snapShotStatCmd = &cli.Command{ -// Name: "snapshot-stat", -// Usage: "print statistics of the snapshot rooted at given block", -// Flags: []cli.Flag{ -// &cli.StringFlag{ -// Name: "tipset", -// Usage: "specify tipset to start from", -// }, -// }, -// Action: func(cctx *cli.Context) error { -// api, closer, err := lcli.GetFullNodeAPI(cctx) -// if err != nil { -// return err -// } - -// defer closer() -// ctx := lcli.ReqContext(cctx) - -// ts, err := lcli.LoadTipSet(ctx, cctx, api) -// if err != nil { -// return err -// } - -// api.ChainStatSnapshot(ctx, ts.Key()) - -// }, -// } - var staterootStatCmd = &cli.Command{ Name: "stat", Usage: "print statistics for the stateroot of a given block", diff --git a/scripts/snapshot-summary.py b/scripts/snapshot-summary.py index fd64da64f..b06e79a01 100644 --- a/scripts/snapshot-summary.py +++ b/scripts/snapshot-summary.py @@ -4,9 +4,10 @@ import pathlib snapshot_data = json.load(sys.stdin) -# XXX: parameterize to use block count as value instead of byte size -# XXX: parameterize on different types of px chart types -# XXX: parameterize on output port so we can 
serve this from infra +# Possible extensions: +# 1. parameterize to use block count as value instead of byte size +# 2. parameterize on different types of px chart types +# 3. parameterize on output port so we can serve this from infra parents = [] names = [] From 016661b657f6f12f09d40afce89e68b5e7846498 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Wed, 28 Jun 2023 09:17:57 -0600 Subject: [PATCH 08/12] More cleanup --- cmd/lotus-shed/state-stats.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 6378fdc11..d28a612c5 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -649,7 +649,7 @@ to reduce the number of decode operations performed by caching the decoded objec return err } - // log.Infow("tipset", "parentstate", ts.ParentState()) + log.Infow("tipset", "parentstate", ts.ParentState()) if len(addrs) == 0 && cctx.Bool("all") { var err error @@ -670,7 +670,7 @@ to reduce the number of decode operations performed by caching the decoded objec worker := func(ctx context.Context, id int) error { completed := 0 defer func() { - //log.Infow("worker done", "id", id, "completed", completed) + log.Infow("worker done", "id", id, "completed", completed) }() for { @@ -841,7 +841,7 @@ func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, } func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) { - // log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code)) + log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code)) nd, err := dag.Get(ctx, actor.Head) if err != nil { From 3cacbdfa1873f7d8695d8f7dfe222f123c963586 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Wed, 28 Jun 2023 09:52:47 -0600 Subject: [PATCH 09/12] Lint fixes --- cmd/lotus-shed/state-stats.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index d28a612c5..49bcb1294 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -7,14 +7,13 @@ import ( "fmt" "io" "net/http" + _ "net/http" + _ "net/http/pprof" "path" "reflect" "sort" "sync" - _ "net/http" - _ "net/http/pprof" - "github.com/docker/go-units" lru "github.com/hashicorp/golang-lru/v2" "github.com/ipfs/go-blockservice" @@ -242,7 +241,9 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) } closer := func() { - lr.Close() + if err := lr.Close(); err != nil { + log.Warnf("failed to close locked repo: %s", err) + } if c, ok := bs.(io.Closer); ok { if err := c.Close(); err != nil { log.Warnf("failed to close blockstore: %s", err) @@ -385,15 +386,15 @@ var statSnapshotCmd = &cli.Command{ } // Threadsafe cid set lives across different pipelines so not part of error group - go func() error { + go func() { seen := cid.NewSet() for { select { case call := <-cidCh: call.resp <- seen.Visit(call.c) case <-ctx.Done(): - log.Infof("shutting down cid set goroutine") - return ctx.Err() + log.Infof("shutting down cid set goroutine: %s", ctx.Err()) + return } } }() From d2b2fba799652ecbdf637139dc913a337e9dbdbf Mon Sep 17 00:00:00 2001 From: zenground0 Date: Wed, 28 Jun 2023 10:11:47 -0600 Subject: [PATCH 10/12] Remove debug pprof serving --- cmd/lotus-shed/state-stats.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cmd/lotus-shed/state-stats.go 
b/cmd/lotus-shed/state-stats.go index 49bcb1294..6b898c4d0 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -6,9 +6,6 @@ import ( "encoding/json" "fmt" "io" - "net/http" - _ "net/http" - _ "net/http/pprof" "path" "reflect" "sort" @@ -365,9 +362,6 @@ var statSnapshotCmd = &cli.Command{ if err != nil { return err } - go func() { - fmt.Println(http.ListenAndServe("localhost:6060", nil)) - }() numWorkers := cctx.Int("workers") dagCacheSize := cctx.Int("dag-cache-size") From ab72f2e230cadd11ba940fe5c4dcec1bf40510a4 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Wed, 28 Jun 2023 22:35:28 -0600 Subject: [PATCH 11/12] Lint --- cmd/lotus-shed/state-stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 6b898c4d0..865af39ca 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -944,7 +944,7 @@ func DumpStats(actStats actorStats) { func DumpSnapshotStats(stats map[string]api.ObjStat) { // sort keys so we get subkey locality keys := make([]string, 0, len(stats)) - for k, _ := range stats { + for k := range stats { keys = append(keys, k) } sort.Strings(keys) From 3f0ddcc553e62bb062da9acc955777494cfe3525 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Thu, 29 Jun 2023 13:22:16 -0600 Subject: [PATCH 12/12] Cleanup plotting script --- scripts/snapshot-summary.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/scripts/snapshot-summary.py b/scripts/snapshot-summary.py index b06e79a01..f37623cd2 100644 --- a/scripts/snapshot-summary.py +++ b/scripts/snapshot-summary.py @@ -25,8 +25,6 @@ for key in snapshot_data: values.append(stats['Size']) data = dict(names=names, parents=parents, values=values) -print(data) fig = px.treemap(data, names='names', parents='parents', values='values') -print(fig) fig.show()
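
Note on the concurrency shape used throughout this series: the `pipeline` helper introduced in PATCH 06 is what the final design hangs on. Each stage wires one job producer, a fixed pool of workers, and a single result consumer together under an errgroup, and a WaitGroup covering every goroutine that may send on the result channel gates `close(resultCh)`, so the consumer can drain it with a plain range loop. Below is a minimal, self-contained sketch of that same pattern; the toy `int` job/result types and the `produce`/`work`/`consume` names are invented for illustration (the real code threads `job`/`result` structs, a DAG service, and a shared cid-visit callback through the same shape, and its workers also select on ctx.Done()):

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// pipeline wires a producer, numWorkers workers, and one consumer together.
// The result channel is closed only after the producer and every worker have
// finished sending, which is what lets the consumer use a bare range loop.
// Toy sketch: jobs and results are plain ints here, not the patch's structs.
func pipeline(ctx context.Context, numWorkers int,
	produce func(ctx context.Context, jobs chan<- int) error,
	work func(ctx context.Context, jobs <-chan int, results chan<- int) error,
	consume func(ctx context.Context, results <-chan int) error) error {

	eg, egctx := errgroup.WithContext(ctx)
	jobs := make(chan int, numWorkers)
	results := make(chan int)
	var senders sync.WaitGroup // counts every goroutine that may send on results

	senders.Add(1)
	eg.Go(func() error {
		defer senders.Done()
		defer close(jobs) // no more jobs once the producer returns
		return produce(egctx, jobs)
	})

	for w := 0; w < numWorkers; w++ {
		senders.Add(1)
		eg.Go(func() error {
			defer senders.Done()
			return work(egctx, jobs, results)
		})
	}

	eg.Go(func() error {
		senders.Wait() // all senders done; now safe to close
		close(results)
		return nil
	})

	eg.Go(func() error { return consume(egctx, results) })

	return eg.Wait()
}

func main() {
	err := pipeline(context.Background(), 4,
		func(ctx context.Context, jobs chan<- int) error {
			for i := 1; i <= 10; i++ {
				jobs <- i
			}
			return nil
		},
		func(ctx context.Context, jobs <-chan int, results chan<- int) error {
			for j := range jobs {
				results <- j * j
			}
			return nil
		},
		func(ctx context.Context, results <-chan int) error {
			sum := 0
			for r := range results {
				sum += r
			}
			fmt.Println("sum of squares:", sum)
			return nil
		})
	if err != nil {
		fmt.Println("pipeline failed:", err)
	}
}

End-to-end, the intended flow (assuming a synced local lotus repo) is to run `lotus-shed stat-snapshot` to emit the JSON summary on stdout, then pipe it into `python3 scripts/snapshot-summary.py` (which requires `plotly`) to render the space-usage treemap.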