From 8f3123d75b440ab84dc2a48aaf3cdc1b9c408aec Mon Sep 17 00:00:00 2001
From: zenground0
Date: Tue, 27 Jun 2023 00:54:02 -0600
Subject: [PATCH] Churn json output is working

---
 cmd/lotus-shed/state-stats.go | 135 ++++++++++++++++++++++++++++++++--
 scripts/snapshot-summary.py   |  34 ++++++++
 2 files changed, 161 insertions(+), 8 deletions(-)
 create mode 100644 scripts/snapshot-summary.py

diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go
index d1e313724..0a7811828 100644
--- a/cmd/lotus-shed/state-stats.go
+++ b/cmd/lotus-shed/state-stats.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -323,6 +324,9 @@ var statSnapshotCmd = &cli.Command{
 		resultCh := make(chan result)
 		cidCh := make(chan cidCall, numWorkers)
 		summary := make(map[string]api.ObjStat)
+		summary["/snapshot"] = api.ObjStat{Size: 0, Links: 0}       // snapshot root object has no additional bytes or links
+		summary["/snapshot/churn"] = api.ObjStat{Size: 0, Links: 0} // churn grouping node starts empty; per-actor churn is recorded under child keys
+
 		combine := func(statsA, statsB api.ObjStat) api.ObjStat {
 			return api.ObjStat{
 				Size:  statsA.Size + statsB.Size,
@@ -356,21 +360,22 @@ var statSnapshotCmd = &cli.Command{
 			defer func() {
 				close(jobCh)
 			}()
+
 			st, err := h.sm.StateTree(ts.ParentState())
 			if err != nil {
 				return err
 			}
+
 			return st.ForEach(func(_ address.Address, act *types.Actor) error {
 				actType := builtin.ActorNameByCode(act.Code)
 				if actType == "" {
 					actType = act.Code.String()
 				}
-
-				jobCh <- job{c: act.Head, key: fmt.Sprintf("/state/%s", actType)}
+				actType = path.Base(actType) // strip away fil/; must follow the fallback because path.Base("") is "."
+				jobCh <- job{c: act.Head, key: fmt.Sprintf("/snapshot/state/%s", actType)}
 
 				return nil
 			})
-
 		})
 
 		worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error {
@@ -418,7 +423,8 @@ var statSnapshotCmd = &cli.Command{
 		eg.Go(func() error {
 			for result := range resultCh {
 				if stat, ok := summary[result.key]; ok {
-					summary[result.key] = combine(stat, summary[result.key])
+					summary[result.key] = combine(stat, result.stats)
+
 				} else {
 					summary[result.key] = result.stats
 				}
@@ -431,13 +437,12 @@ var statSnapshotCmd = &cli.Command{
 		}
 
 		// Stage 2: walk the top of the latest state root
-		id += 1
 		resultCh = make(chan result)
 		jobCh = make(chan job)
 
 		go func() {
 			defer close(jobCh)
-			jobCh <- job{c: ts.ParentState(), key: "statetop"}
+			jobCh <- job{c: ts.ParentState(), key: "/snapshot/state"}
 		}()
 		go func() {
 			defer close(resultCh)
@@ -450,6 +455,120 @@ var statSnapshotCmd = &cli.Command{
 		// ordering:
 		// for each header send jobs for messages, receipts, state tree churn
 		// don't walk header directly as it would just walk everything including parent tipsets
+		eg, egctx = errgroup.WithContext(ctx)
+		jobCh = make(chan job, numWorkers)
+		resultCh = make(chan result)
+		var workerWg2 sync.WaitGroup
+
+		churnStateRoots := cid.NewSet()
+		workerWg2.Add(1) // goroutine creating jobs also sends to result channel
+		eg.Go(func() error {
+			defer func() {
+				close(jobCh)
+				workerWg2.Done()
+			}()
+			// walk the chain from the head tipset, following parent links through a FIFO queue
+
+			var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
+			if dagCacheSize != 0 {
+				var err error
+				dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
+				if err != nil {
+					return err
+				}
+			}
+
+			blocksToWalk := ts.Cids()
+			startHeight := ts.Height()
+			fmt.Printf("starting height %d\n", startHeight)
+			snapshotStateLimit := abi.ChainEpoch(2000) // only blocks above startHeight-2000 get message and state churn jobs
+
+			churnActorCache := cid.NewSet()
+			blocksTracked := cid.NewSet()
+			for len(blocksToWalk) > 0 {
+				blkCid := blocksToWalk[0]
+				blocksToWalk = blocksToWalk[1:]
+				nd, err := dag.Get(egctx, blkCid) // group context, so the walk stops once any goroutine in the group fails
+				if err != nil {
+					return xerrors.Errorf("getting block: %w", err)
+				}
+
+				var b types.BlockHeader
+				if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil {
+					return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blkCid, err)
+				}
+
+				// header directly to result channel
+				resultCh <- result{key: "/snapshot/headers", stats: api.ObjStat{Size: uint64(len(nd.RawData())), Links: uint64(len(nd.Links()))}}
+				// message job
+				if b.Height > startHeight-snapshotStateLimit {
+					jobCh <- job{key: "/snapshot/messages", c: b.Messages}
+				}
+
+				// state churn job
+				if b.Height > startHeight-snapshotStateLimit {
+					if churnStateRoots.Visit(b.ParentStateRoot) {
+						st, err := h.sm.StateTree(b.ParentStateRoot)
+						if err != nil {
+							return err
+						}
+
+						err = st.ForEach(func(_ address.Address, act *types.Actor) error {
+							if churnActorCache.Visit(act.Head) {
+								actType := builtin.ActorNameByCode(act.Code)
+								if actType == "" {
+									actType = act.Code.String()
+								}
+								actType = path.Base(actType) // strip away fil/; must follow the fallback because path.Base("") is "."
+								jobCh <- job{c: act.Head, key: fmt.Sprintf("/snapshot/churn/%s", actType)}
+							}
+
+							return nil
+						})
+						if err != nil {
+							return err
+						}
+					}
+				}
+				for _, blkCid := range b.Parents {
+					if blocksTracked.Visit(blkCid) && b.Height != 0 {
+						blocksToWalk = append(blocksToWalk, blkCid)
+					}
+				}
+			}
+			return nil
+		})
+
+		for w := 0; w < numWorkers; w++ {
+			id += w
+			workerID := id // per-goroutine copy: the closure must not read id while this loop keeps mutating it
+			workerWg2.Add(1)
+			eg.Go(func() error {
+				defer workerWg2.Done()
+				return worker(egctx, workerID, jobCh, resultCh)
+			})
+		}
+
+		// Close result channel when workers are done sending to it.
+		eg.Go(func() error {
+			workerWg2.Wait()
+			close(resultCh)
+			return nil
+		})
+
+		eg.Go(func() error {
+			for result := range resultCh {
+				if stat, ok := summary[result.key]; ok {
+					summary[result.key] = combine(stat, result.stats)
+				} else {
+					summary[result.key] = result.stats
+				}
+			}
+			return nil
+		})
+		if err := eg.Wait(); err != nil {
+			return fmt.Errorf("failed to measure space in snapshot churn: %w", err)
+		}
 
 		// Stage 4 walk all actor HAMTs for churn
 
@@ -638,8 +757,8 @@ to reduce the number of decode operations performed by caching the decoded objec
 }
 
 func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, visit func(c cid.Cid) bool) ([]result, error) {
-	// "state" and "churn" require further breakdown by actor type
-	if path.Dir(in.key) != "state" && path.Dir(in.key) != "churn" {
+	// "state" and "churn" attempt further breakdown by actor type
+	if path.Dir(in.key) != "/snapshot/state" && path.Dir(in.key) != "/snapshot/churn" {
 		dsc := &dagStatCollector{
 			ds:   dag,
 			walk: carWalkFunc,
diff --git a/scripts/snapshot-summary.py b/scripts/snapshot-summary.py
new file mode 100644
index 000000000..da75ee9a7
--- /dev/null
+++ b/scripts/snapshot-summary.py
@@ -0,0 +1,34 @@
+import json
+import sys
+
+import plotly.express as px
+
+# Render the snapshot summary JSON read from stdin as a plotly treemap.
+# Input: a JSON object mapping slash-separated keys to objects with a 'Size' field.
+
+snapshot_data = json.load(sys.stdin)
+
+# XXX: parameterize to use block count as value instead of byte size
+# XXX: parameterize on different types of px chart types
+# XXX: parameterize on output port so we can serve this from infra
+
+ids = []
+parents = []
+names = []
+values = []
+
+for key, stats in snapshot_data.items():
+    # Accept keys with or without a leading '/'.
+    path = key.strip('/').split('/')
+    # Use the full path as the node id: the same leaf name (e.g. an actor type
+    # under both state and churn) is not unique on its own.
+    ids.append('/'.join(path))
+    parents.append('/'.join(path[:-1]))  # '' for a top-level node
+    names.append(path[-1])
+    values.append(stats['Size'])
+
+data = dict(ids=ids, names=names, parents=parents, values=values)
+print(data)
+fig = px.treemap(data, ids='ids', names='names', parents='parents', values='values')
+print(fig)
+fig.show()