Churn json output is working
This commit is contained in:
parent
36a88f45fc
commit
8f3123d75b
@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -323,6 +324,9 @@ var statSnapshotCmd = &cli.Command{
|
||||
resultCh := make(chan result)
|
||||
cidCh := make(chan cidCall, numWorkers)
|
||||
summary := make(map[string]api.ObjStat)
|
||||
summary["/snapshot"] = api.ObjStat{Size: 0, Links: 0} // snapshot root object has no additional bytes or links
|
||||
summary["/snapshot/churn"] = api.ObjStat{Size: 0, Links: 0} // snapshot root object has no additional bytes or links
|
||||
|
||||
combine := func(statsA, statsB api.ObjStat) api.ObjStat {
|
||||
return api.ObjStat{
|
||||
Size: statsA.Size + statsB.Size,
|
||||
@ -356,21 +360,22 @@ var statSnapshotCmd = &cli.Command{
|
||||
defer func() {
|
||||
close(jobCh)
|
||||
}()
|
||||
|
||||
st, err := h.sm.StateTree(ts.ParentState())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return st.ForEach(func(_ address.Address, act *types.Actor) error {
|
||||
actType := builtin.ActorNameByCode(act.Code)
|
||||
actType = path.Base(actType) // strip away fil/<nv>
|
||||
if actType == "<unknown>" {
|
||||
actType = act.Code.String()
|
||||
}
|
||||
|
||||
jobCh <- job{c: act.Head, key: fmt.Sprintf("/state/%s", actType)}
|
||||
jobCh <- job{c: act.Head, key: fmt.Sprintf("/snapshot/state/%s", actType)}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error {
|
||||
@ -418,7 +423,8 @@ var statSnapshotCmd = &cli.Command{
|
||||
eg.Go(func() error {
|
||||
for result := range resultCh {
|
||||
if stat, ok := summary[result.key]; ok {
|
||||
summary[result.key] = combine(stat, summary[result.key])
|
||||
summary[result.key] = combine(stat, result.stats)
|
||||
|
||||
} else {
|
||||
summary[result.key] = result.stats
|
||||
}
|
||||
@ -431,13 +437,12 @@ var statSnapshotCmd = &cli.Command{
|
||||
}
|
||||
|
||||
// Stage 2: walk the top of the latest state root
|
||||
|
||||
id += 1
|
||||
resultCh = make(chan result)
|
||||
jobCh = make(chan job)
|
||||
go func() {
|
||||
defer close(jobCh)
|
||||
jobCh <- job{c: ts.ParentState(), key: "statetop"}
|
||||
jobCh <- job{c: ts.ParentState(), key: "/snapshot/state"}
|
||||
}()
|
||||
go func() {
|
||||
defer close(resultCh)
|
||||
@ -450,6 +455,119 @@ var statSnapshotCmd = &cli.Command{
|
||||
// ordering:
|
||||
// for each header send jobs for messages, receipts, state tree churn
|
||||
// don't walk header directly as it would just walk everything including parent tipsets
|
||||
eg, egctx = errgroup.WithContext(ctx)
|
||||
jobCh = make(chan job, numWorkers)
|
||||
resultCh = make(chan result)
|
||||
var workerWg2 sync.WaitGroup
|
||||
|
||||
churnStateRoots := cid.NewSet()
|
||||
workerWg2.Add(1) // goroutine creating jobs also sends to result channel
|
||||
eg.Go(func() error {
|
||||
defer func() {
|
||||
close(jobCh)
|
||||
workerWg2.Done()
|
||||
}()
|
||||
// walk chain
|
||||
|
||||
var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
|
||||
if dagCacheSize != 0 {
|
||||
var err error
|
||||
dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
blocksToWalk := ts.Cids()
|
||||
startHeight := ts.Height()
|
||||
fmt.Printf("starting height %d\n", startHeight)
|
||||
snapshotStateLimit := abi.ChainEpoch(2000)
|
||||
|
||||
churnActorCache := cid.NewSet()
|
||||
blocksTracked := cid.NewSet()
|
||||
for len(blocksToWalk) > 0 {
|
||||
blkCid := blocksToWalk[0]
|
||||
blocksToWalk = blocksToWalk[1:]
|
||||
nd, err := dag.Get(ctx, blkCid)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting block: %w", err)
|
||||
}
|
||||
|
||||
var b types.BlockHeader
|
||||
if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil {
|
||||
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blkCid, err)
|
||||
}
|
||||
|
||||
// header directly to result channel
|
||||
resultCh <- result{key: "snapshot/headers", stats: api.ObjStat{Size: uint64(len(nd.RawData())), Links: uint64(len(nd.Links()))}}
|
||||
// message job
|
||||
if b.Height > startHeight-snapshotStateLimit {
|
||||
jobCh <- job{key: "snapshot/messages", c: b.Messages}
|
||||
}
|
||||
|
||||
// state churn job
|
||||
if b.Height > startHeight-snapshotStateLimit {
|
||||
if churnStateRoots.Visit(b.ParentStateRoot) {
|
||||
st, err := h.sm.StateTree(b.ParentStateRoot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = st.ForEach(func(_ address.Address, act *types.Actor) error {
|
||||
if churnActorCache.Visit(act.Head) {
|
||||
actType := builtin.ActorNameByCode(act.Code)
|
||||
actType = path.Base(actType) // strip away fil/<nv>
|
||||
if actType == "<unknown>" {
|
||||
actType = act.Code.String()
|
||||
}
|
||||
jobCh <- job{c: act.Head, key: fmt.Sprintf("/snapshot/churn/%s", actType)}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, blkCid := range b.Parents {
|
||||
if blocksTracked.Visit(blkCid) && b.Height != 0 {
|
||||
blocksToWalk = append(blocksToWalk, blkCid)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
for w := 0; w < numWorkers; w++ {
|
||||
id += w
|
||||
workerWg2.Add(1)
|
||||
eg.Go(func() error {
|
||||
defer workerWg2.Done()
|
||||
return worker(egctx, id, jobCh, resultCh)
|
||||
})
|
||||
}
|
||||
|
||||
// Close result channel when workers are done sending to it.
|
||||
eg.Go(func() error {
|
||||
workerWg2.Wait()
|
||||
close(resultCh)
|
||||
return nil
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
for result := range resultCh {
|
||||
if stat, ok := summary[result.key]; ok {
|
||||
summary[result.key] = combine(stat, result.stats)
|
||||
} else {
|
||||
summary[result.key] = result.stats
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err := eg.Wait(); err != nil {
|
||||
return fmt.Errorf("failed to measure space in snapshot churn: %w", err)
|
||||
}
|
||||
|
||||
// Stage 4 walk all actor HAMTs for churn
|
||||
|
||||
@ -638,8 +756,8 @@ to reduce the number of decode operations performed by caching the decoded objec
|
||||
}
|
||||
|
||||
func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, visit func(c cid.Cid) bool) ([]result, error) {
|
||||
// "state" and "churn" require further breakdown by actor type
|
||||
if path.Dir(in.key) != "state" && path.Dir(in.key) != "churn" {
|
||||
// "state" and "churn" attempt further breakdown by actor type
|
||||
if !(path.Dir(in.key) == "/snapshot/state") && !(path.Dir(in.key) == "/snapshot/churn") {
|
||||
dsc := &dagStatCollector{
|
||||
ds: dag,
|
||||
walk: carWalkFunc,
|
||||
|
28
scripts/snapshot-summary.py
Normal file
28
scripts/snapshot-summary.py
Normal file
@ -0,0 +1,28 @@
|
||||
import plotly.express as px
|
||||
import sys, json
|
||||
|
||||
snapshot_data = json.load(sys.stdin)
|
||||
|
||||
# XXX: parameterize to use block count as value instead of byte size
|
||||
# XXX: parameterize on different types of px chart types
|
||||
# XXX: parameterize on output port so we can serve this from infra
|
||||
|
||||
parents = []
|
||||
names = []
|
||||
values = []
|
||||
|
||||
for key in snapshot_data:
|
||||
path = key.split('/')
|
||||
name = path[len(path) - 1]
|
||||
parent = path[len(path) - 2]
|
||||
stats = snapshot_data[key]
|
||||
parents.append(parent)
|
||||
names.append(name)
|
||||
values.append(stats['Size'])
|
||||
|
||||
data = dict(names=names, parents=parents, values=values)
|
||||
print(data)
|
||||
fig = px.treemap(data, names='names', parents='parents', values='values')
|
||||
print(fig)
|
||||
fig.show()
|
||||
|
Loading…
Reference in New Issue
Block a user