Merge pull request #11012 from filecoin-project/feat/stat-snapshot

feat:profiling:state summary and visualization
This commit is contained in:
ZenGround0 2023-07-06 16:28:05 -04:00 committed by GitHub
commit 1358d70128
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 535 additions and 80 deletions

View File

@ -23,6 +23,7 @@ func main() {
local := []*cli.Command{ local := []*cli.Command{
addressCmd, addressCmd,
statActorCmd, statActorCmd,
statSnapshotCmd,
statObjCmd, statObjCmd,
base64Cmd, base64Cmd,
base32Cmd, base32Cmd,

View File

@ -1,11 +1,14 @@
package main package main
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"path"
"reflect" "reflect"
"sort"
"sync" "sync"
"github.com/docker/go-units" "github.com/docker/go-units"
@ -21,8 +24,12 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
gstactors "github.com/filecoin-project/go-state-types/actors"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns" "github.com/filecoin-project/lotus/chain/consensus/filcns"
@ -49,6 +56,19 @@ type fieldItem struct {
Stats api.ObjStat Stats api.ObjStat
} }
// Message types exchanged between the goroutines of the snapshot stat pipelines.
type (
	// job asks a worker to measure the DAG rooted at c and to report the
	// outcome under key.
	job struct {
		c   cid.Cid
		key string // prefix path for the region being recorded i.e. "/state/mineractor"
	}

	// cidCall is a single request to the goroutine owning the shared cid set:
	// c is the cid to record and resp receives the outcome of visiting it.
	cidCall struct {
		c    cid.Cid
		resp chan bool
	}

	// result carries the stats measured for one job; stats reported under the
	// same key are summed by the consumer.
	result struct {
		key   string
		stats api.ObjStat
	}
)
type cacheNodeGetter struct { type cacheNodeGetter struct {
ds format.NodeGetter ds format.NodeGetter
cache *lru.TwoQueueCache[cid.Cid, format.Node] cache *lru.TwoQueueCache[cid.Cid, format.Node]
@ -166,39 +186,13 @@ var statObjCmd = &cli.Command{
return err return err
} }
r, err := repo.NewFS(cctx.String("repo")) h, err := loadChainStore(ctx, cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil { if err != nil {
return err return err
} }
if !exists { defer h.closer()
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode) dag := merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
dsc := &dagStatCollector{ dsc := &dagStatCollector{
ds: dag, ds: dag,
walk: carWalkFunc, walk: carWalkFunc,
@ -212,6 +206,376 @@ var statObjCmd = &cli.Command{
}, },
} }
// StoreHandle bundles the stores opened from a lotus repo by loadChainStore.
// closer releases the repo lock and the blockstore and must be called once the
// handle is no longer needed.
type StoreHandle struct {
	bs     blockstore.Blockstore
	cs     *store.ChainStore
	sm     *stmgr.StateManager
	closer func()
}

// loadChainStore locks the full-node repo at repoPath and opens its universal
// blockstore, metadata datastore, chain store and state manager.
//
// On success the caller owns the returned handle and must call handle.closer().
// On failure nothing stays open: the repo lock (and the blockstore, if it was
// already opened) is released before the error is returned.
func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) {
	r, err := repo.NewFS(repoPath)
	if err != nil {
		return nil, xerrors.Errorf("opening fs repo: %w", err)
	}

	exists, err := r.Exists()
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, xerrors.Errorf("lotus repo doesn't exist")
	}

	lr, err := r.Lock(repo.FullNode)
	if err != nil {
		return nil, err
	}

	// bs is declared before closer so the same closer works both for early
	// failures (bs still nil, the type assertion simply fails) and for the
	// handle returned on success.
	var bs blockstore.Blockstore
	closer := func() {
		if err := lr.Close(); err != nil {
			log.Warnf("failed to close locked repo: %s", err)
		}
		if c, ok := bs.(io.Closer); ok {
			if err := c.Close(); err != nil {
				log.Warnf("failed to close blockstore: %s", err)
			}
		}
	}

	// Until the handle is handed to the caller we are responsible for cleanup;
	// without this every error path below would leak the repo lock.
	loaded := false
	defer func() {
		if !loaded {
			closer()
		}
	}()

	bs, err = lr.Blockstore(ctx, repo.UniversalBlockstore)
	if err != nil {
		return nil, fmt.Errorf("failed to open blockstore: %w", err)
	}

	mds, err := lr.Datastore(context.Background(), "/metadata")
	if err != nil {
		return nil, err
	}

	cs := store.NewChainStore(bs, bs, mds, nil, nil)
	if err := cs.Load(ctx); err != nil {
		return nil, fmt.Errorf("failed to load chain store: %w", err)
	}

	tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
	sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
	if err != nil {
		return nil, fmt.Errorf("failed to open state manager: %w", err)
	}

	loaded = true
	return &StoreHandle{
		bs:     bs,
		sm:     sm,
		cs:     cs,
		closer: closer,
	}, nil
}
// pipeline runs one producer / worker-pool / consumer stage and waits for it.
//
//   - createJobs produces jobs on jobCh (it may also write results directly to
//     resultCh). jobCh is closed when createJobs returns.
//   - numWorkers copies of worker consume jobCh and write to resultCh. Each
//     worker receives its own index in [0, numWorkers) as id.
//   - processResults consumes resultCh, which is closed once createJobs and all
//     workers have returned. It must keep reading until the channel is closed.
//
// createJobs and the workers receive a context that is cancelled as soon as any
// part of the pipeline fails. The first error, wrapped with name, is returned.
// An error is also returned if ctx was cancelled before the stage finished.
func pipeline(ctx context.Context, name string, numWorkers int, createJobs func(ctx context.Context, jobCh chan job, resultCh chan result) error,
	worker func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error,
	processResults func(ctx context.Context, resultCh chan result) error) error {

	eg, egctx := errgroup.WithContext(ctx)
	jobCh := make(chan job, numWorkers)
	resultCh := make(chan result)
	var resultWriterWg sync.WaitGroup

	resultWriterWg.Add(1)
	eg.Go(func() error {
		defer resultWriterWg.Done()
		defer close(jobCh)
		// egctx (not ctx) so job creation stops when a worker fails.
		return createJobs(egctx, jobCh, resultCh)
	})

	// Once the group is cancelled the workers may all be gone, which would
	// leave createJobs blocked on a full jobCh and eg.Wait hanging forever.
	// Discard the remaining jobs so the producer can always finish. This
	// goroutine is deliberately outside the errgroup: egctx is only guaranteed
	// to be done after Wait returns, at which point jobCh is already closed and
	// the loop exits immediately.
	go func() {
		<-egctx.Done()
		for range jobCh {
		}
	}()

	for w := 0; w < numWorkers; w++ {
		id := w // per-iteration copy; a shared variable would race with the loop
		resultWriterWg.Add(1)
		eg.Go(func() error {
			defer resultWriterWg.Done()
			return worker(egctx, id, jobCh, resultCh)
		})
	}

	// The consumer keeps the caller's context: it has to drain resultCh until
	// it is closed, even while the rest of the group is shutting down.
	eg.Go(func() error {
		return processResults(ctx, resultCh)
	})

	// close result channel when workers are done sending to it.
	eg.Go(func() error {
		resultWriterWg.Wait()
		close(resultCh)
		return nil
	})

	if err := eg.Wait(); err != nil {
		return fmt.Errorf("failed pipeline %s: %w", name, err)
	}
	// If the caller cancelled ctx, jobs may have been discarded above without
	// any goroutine reporting an error; do not present that as success.
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("failed pipeline %s: %w", name, err)
	}
	return nil
}
// statSnapshotCmd walks the chain data reachable from a tipset and reports how
// many bytes and blocks each region contributes. Results are accumulated in a
// map keyed by slash separated paths ("/headers", "/messages",
// "/statetree/latest/<actor>/<field>", "/statetree/churn/...").
//
// Every block is attributed to exactly one key: all stages share one "seen"
// cid set, so the order of the stages decides who gets credited for a block.
//
// NOTE(review): the description mentions receipts, but no stage below walks
// receipts; confirm whether that is intentional.
var statSnapshotCmd = &cli.Command{
	Name:  "stat-snapshot",
	Usage: "calculates the space usage of a snapshot taken from the given tipset",
	Description: `Walk the chain back to lightweight snapshot size and break down space usage into high level
categories: headers, messages, receipts, latest state root, and churn from earlier state roots.
State root and churn space is further broken down by actor type and immediate top level fields
`,
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "tipset",
			Usage: "specify tipset to call method on (pass comma separated array of cids)",
		},
		&cli.IntFlag{
			Name:  "workers",
			Usage: "number of workers to use when processing",
			Value: 10,
		},
		&cli.IntFlag{
			Name:  "dag-cache-size",
			Usage: "cache size per worker (setting to 0 disables)",
			Value: 8092,
		},
		&cli.BoolFlag{
			Name:  "pretty",
			Usage: "print formatted output instead of ldjson",
			Value: false,
		},
	},
	Action: func(cctx *cli.Context) error {
		// Cancel on return so the cid set goroutine below never outlives the command.
		ctx, cancel := context.WithCancel(lcli.ReqContext(cctx))
		defer cancel()

		h, err := loadChainStore(ctx, cctx.String("repo"))
		if err != nil {
			return err
		}
		defer h.closer()
		tsr := &ChainStoreTipSetResolver{
			Chain: h.cs,
		}

		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
		if err != nil {
			return err
		}

		numWorkers := cctx.Int("workers")
		if numWorkers < 1 {
			// With no workers nothing consumes the job channel and the
			// pipelines below would never finish.
			return xerrors.Errorf("workers must be at least 1, got %d", numWorkers)
		}
		dagCacheSize := cctx.Int("dag-cache-size")

		cidCh := make(chan cidCall, numWorkers)
		summary := make(map[string]api.ObjStat)
		// snapshot root objects with no additional bytes or links
		summary["/"] = api.ObjStat{Size: 0, Links: 0}
		summary["/statetree"] = api.ObjStat{Size: 0, Links: 0}

		combine := func(statsA, statsB api.ObjStat) api.ObjStat {
			return api.ObjStat{
				Size:  statsA.Size + statsB.Size,
				Links: statsA.Links + statsB.Links,
			}
		}

		// Threadsafe cid set lives across different pipelines so not part of error group
		go func() {
			seen := cid.NewSet()
			for {
				select {
				case call := <-cidCh:
					// resp is buffered by visit, so this never blocks even
					// if the caller already gave up.
					call.resp <- seen.Visit(call.c)
				case <-ctx.Done():
					log.Infof("shutting down cid set goroutine: %s", ctx.Err())
					return
				}
			}
		}()

		// visit reports whether c is seen for the first time. After
		// cancellation the set goroutine is gone, so give up (and report
		// "already seen") instead of blocking forever.
		visit := func(c cid.Cid) bool {
			resp := make(chan bool, 1)
			select {
			case cidCh <- cidCall{c: c, resp: resp}:
			case <-ctx.Done():
				return false
			}
			select {
			case first := <-resp:
				return first
			case <-ctx.Done():
				return false
			}
		}

		// newDAG builds an offline node getter over the blockstore, wrapped in
		// a cache unless caching is disabled.
		newDAG := func() (format.NodeGetter, error) {
			ds := merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
			if dagCacheSize == 0 {
				return ds, nil
			}
			cached, err := newCacheNodeGetter(ds, dagCacheSize)
			if err != nil {
				return nil, err
			}
			return cached, nil
		}

		// actorKey returns prefix/<actor type>, falling back to the code cid
		// for actor codes without a known name.
		actorKey := func(prefix string, code cid.Cid) string {
			actType := builtin.ActorNameByCode(code)
			actType = path.Base(actType) // strip away fil/<nv>
			if actType == "<unknown>" {
				actType = code.String()
			}
			return fmt.Sprintf("%s/%s", prefix, actType)
		}

		// sendJob and sendResult queue an item unless the pipeline context is
		// cancelled first, so producers cannot block forever.
		sendJob := func(ctx context.Context, jobCh chan job, j job) error {
			select {
			case jobCh <- j:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		sendResult := func(ctx context.Context, resultCh chan result, r result) error {
			select {
			case resultCh <- r:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		// Stage 1 walk all actors in latest state root
		createJobsStage1 := func(ctx context.Context, jobCh chan job, _ chan result) error {
			st, err := h.sm.StateTree(ts.ParentState())
			if err != nil {
				return err
			}

			return st.ForEach(func(_ address.Address, act *types.Actor) error {
				return sendJob(ctx, jobCh, job{c: act.Head, key: actorKey("/statetree/latest", act.Code)})
			})
		}

		worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error {
			dag, err := newDAG()
			if err != nil {
				return err
			}

			for j := range jobCh {
				stats, err := collectSnapshotJobStats(ctx, j, dag, visit)
				if err != nil {
					return err
				}
				for _, stat := range stats {
					if err := sendResult(ctx, resultCh, stat); err != nil {
						return err
					}
				}
			}
			return nil
		}

		processResults := func(ctx context.Context, resultCh chan result) error {
			for res := range resultCh {
				// A missing key yields the zero ObjStat, so combine covers
				// both the first and the repeated occurrence of a key.
				summary[res.key] = combine(summary[res.key], res.stats)
			}
			return nil
		}

		if err := pipeline(ctx, "Latest State Actors", numWorkers, createJobsStage1, worker, processResults); err != nil {
			return err
		}

		// Stage 2: walk the top of the latest state root
		createJobsStage2 := func(ctx context.Context, jobCh chan job, _ chan result) error {
			return sendJob(ctx, jobCh, job{c: ts.ParentState(), key: "/statetree/latest"})
		}

		if err := pipeline(ctx, "Latest State HAMT", numWorkers, createJobsStage2, worker, processResults); err != nil {
			return err
		}

		// Stage 3 walk the rest of the chain: headers, messages, churn
		// ordering:
		// for each header send jobs for messages, receipts, state tree churn
		// don't walk header directly as it would just walk everything including parent tipsets

		churnStateRoots := cid.NewSet()
		createJobsStage3 := func(ctx context.Context, jobCh chan job, resultCh chan result) error {
			// walk chain
			dag, err := newDAG()
			if err != nil {
				return err
			}

			blocksToWalk := ts.Cids()
			startHeight := ts.Height()
			snapshotStateLimit := abi.ChainEpoch(2000)

			churnActorCache := cid.NewSet()
			blocksTracked := cid.NewSet()
			for len(blocksToWalk) > 0 {
				blkCid := blocksToWalk[0]
				blocksToWalk = blocksToWalk[1:]
				nd, err := dag.Get(ctx, blkCid)
				if err != nil {
					return xerrors.Errorf("getting block: %w", err)
				}

				var b types.BlockHeader
				if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil {
					return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blkCid, err)
				}

				// header directly to result channel
				header := result{key: "/headers", stats: api.ObjStat{Size: uint64(len(nd.RawData())), Links: uint64(len(nd.Links()))}}
				if err := sendResult(ctx, resultCh, header); err != nil {
					return err
				}

				// messages and state churn are only recorded for the most
				// recent snapshotStateLimit epochs
				if b.Height > startHeight-snapshotStateLimit {
					// message job
					if err := sendJob(ctx, jobCh, job{key: "/messages", c: b.Messages}); err != nil {
						return err
					}

					// state churn job, once per distinct parent state root
					if churnStateRoots.Visit(b.ParentStateRoot) {
						st, err := h.sm.StateTree(b.ParentStateRoot)
						if err != nil {
							return err
						}

						err = st.ForEach(func(_ address.Address, act *types.Actor) error {
							if !churnActorCache.Visit(act.Head) {
								return nil // this actor head was already queued
							}
							return sendJob(ctx, jobCh, job{c: act.Head, key: actorKey("/statetree/churn", act.Code)})
						})
						if err != nil {
							return err
						}
					}
				}

				for _, parent := range b.Parents {
					if blocksTracked.Visit(parent) && b.Height != 0 {
						blocksToWalk = append(blocksToWalk, parent)
					}
				}
			}
			return nil
		}

		if err := pipeline(ctx, "Churn, Headers, Messages", numWorkers, createJobsStage3, worker, processResults); err != nil {
			return err
		}

		// Stage 4 walk all actor HAMTs for churn
		createJobsStage4 := func(ctx context.Context, jobCh chan job, _ chan result) error {
			return churnStateRoots.ForEach(func(c cid.Cid) error {
				return sendJob(ctx, jobCh, job{c: c, key: "/statetree/churn"})
			})
		}

		if err := pipeline(ctx, "Churn HAMT", numWorkers, createJobsStage4, worker, processResults); err != nil {
			return err
		}

		if cctx.Bool("pretty") {
			DumpSnapshotStats(summary)
		} else {
			if err := DumpJSON(summary); err != nil {
				return err
			}
		}

		return nil
	},
}
var statActorCmd = &cli.Command{ var statActorCmd = &cli.Command{
Name: "stat-actor", Name: "stat-actor",
Usage: "calculates the size of actors and their immeidate structures", Usage: "calculates the size of actors and their immeidate structures",
@ -265,57 +629,14 @@ to reduce the number of decode operations performed by caching the decoded objec
addrs = append(addrs, addr) addrs = append(addrs, addr)
} }
} }
h, err := loadChainStore(ctx, cctx.String("repo"))
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
mds, err := lr.Datastore(context.Background(), "/metadata")
if err != nil {
return err
}
cs := store.NewChainStore(bs, bs, mds, nil, nil)
if err := cs.Load(ctx); err != nil {
return nil
}
tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
if err != nil { if err != nil {
return err return err
} }
defer h.closer()
tsr := &ChainStoreTipSetResolver{ tsr := &ChainStoreTipSetResolver{
Chain: cs, Chain: h.cs,
} }
ts, err := lcli.LoadTipSet(ctx, cctx, tsr) ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
@ -327,7 +648,7 @@ to reduce the number of decode operations performed by caching the decoded objec
if len(addrs) == 0 && cctx.Bool("all") { if len(addrs) == 0 && cctx.Bool("all") {
var err error var err error
addrs, err = sm.ListAllActors(ctx, ts) addrs, err = h.sm.ListAllActors(ctx, ts)
if err != nil { if err != nil {
return err return err
} }
@ -354,15 +675,15 @@ to reduce the number of decode operations performed by caching the decoded objec
return nil return nil
} }
actor, err := sm.LoadActor(ctx, addr, ts) actor, err := h.sm.LoadActor(ctx, addr, ts)
if err != nil { if err != nil {
return err return err
} }
var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
if dagCacheSize != 0 { if dagCacheSize != 0 {
var err error var err error
dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))), dagCacheSize) dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
if err != nil { if err != nil {
return err return err
} }
@ -427,6 +748,93 @@ to reduce the number of decode operations performed by caching the decoded objec
}, },
} }
// collectSnapshotJobStats measures the DAG rooted at in.c and returns the
// stats to record, keyed by region.
//
// For keys directly below "/statetree/latest" or "/statetree/churn" in.c is
// treated as an actor head: every top level field of the decoded actor state
// that is a cid is walked first and reported under "<in.key>/<FieldName>";
// whatever remains reachable from the head is then reported under in.key. Any
// other key is walked as a single DAG and yields exactly one result.
//
// visit decides whether a block is counted; blocks it rejects are skipped.
// Decoding problems are not errors: if the actor state cannot be decoded the
// whole actor is simply reported under in.key.
func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, visit func(c cid.Cid) bool) ([]result, error) {
	// walkStats totals the blocks reachable from root that visit accepts.
	walkStats := func(root cid.Cid) (api.ObjStat, error) {
		dsc := &dagStatCollector{
			ds:   dag,
			walk: carWalkFunc,
		}
		if err := merkledag.Walk(ctx, dsc.walkLinks, root, visit, merkledag.Concurrent()); err != nil {
			return api.ObjStat{}, err
		}
		return dsc.stats, nil
	}

	// "state" and "churn" attempt further breakdown by actor type
	parent := path.Dir(in.key)
	if parent != "/statetree/latest" && parent != "/statetree/churn" {
		stats, err := walkStats(in.c)
		if err != nil {
			return nil, err
		}
		return []result{{key: in.key, stats: stats}}, nil
	}

	// in.c is an actor head cid, try to unmarshal and create sub keys for different regions of state
	nd, err := dag.Get(ctx, in.c)
	if err != nil {
		return nil, err
	}

	// reconstruct actor for state parsing from key
	av, err := gstactors.VersionForNetwork(network.Version20)
	if err != nil {
		return nil, fmt.Errorf("failed to get actors version for network: %w", err)
	}
	actorName := path.Base(in.key)
	code, ok := actors.GetActorCodeID(av, actorName)
	if !ok { // try parsing key directly
		code, err = cid.Parse(actorName)
		if err != nil {
			log.Debugf("failing to parse actor string: %s", actorName)
		}
	}

	actor := types.ActorV5{Head: in.c, Code: code}
	oif, err := vm.DumpActorState(consensus.NewTipSetExecutor(filcns.RewardFunc).NewActorRegistry(), &actor, nd.RawData())
	if err != nil {
		// best effort: without decoded state there is no per-field breakdown
		oif = nil
	}

	// Account actors return nil from DumpActorState as they have no state
	var subjobs []job
	if oif != nil {
		v := reflect.Indirect(reflect.ValueOf(oif))
		// NumField panics on anything but a struct (including the invalid
		// Value produced by a nil pointer), so only structs are inspected.
		if v.Kind() == reflect.Struct {
			cidType := reflect.TypeOf(cid.Cid{})
			for i := 0; i < v.NumField(); i++ {
				field := v.Field(i)
				// Check the type before calling Interface, and skip
				// unexported fields, on which Interface would panic.
				if field.Type() != cidType || !field.CanInterface() {
					continue
				}
				subjobs = append(subjobs, job{
					key: fmt.Sprintf("%s/%s", in.key, v.Type().Field(i).Name),
					c:   field.Interface().(cid.Cid),
				})
			}
		}
	}

	// Walk subfields. This has to happen before the top level walk so that
	// shared visit bookkeeping credits these blocks to the sub keys.
	results := make([]result, 0, len(subjobs)+1)
	for _, sub := range subjobs {
		stats, err := walkStats(sub.c)
		if err != nil {
			return nil, err
		}
		results = append(results, result{key: sub.key, stats: stats})
	}

	// now walk the top level object of actor state
	stats, err := walkStats(in.c)
	if err != nil {
		return nil, err
	}
	results = append(results, result{key: in.key, stats: stats})
	return results, nil
}
func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) { func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code)) log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))
@ -532,3 +940,19 @@ func DumpStats(actStats actorStats) {
fmt.Println("--------------------------------------------------------------------------") fmt.Println("--------------------------------------------------------------------------")
} }
// DumpSnapshotStats prints stats to stdout as a table with one row per key:
// the key, its size in human readable form, the exact size in parentheses and
// the link count. Rows are ordered by key.
func DumpSnapshotStats(stats map[string]api.ObjStat) {
	// Sorting places every sub key right below its parent path.
	paths := make([]string, 0, len(stats))
	for p := range stats {
		paths = append(paths, p)
	}
	sort.Strings(paths)

	fmt.Printf("%-*s%-*s%-*s\n", 32, "Path", 24, "Size", 24, "\"Blocks\"")
	for i := range paths {
		p := paths[i]
		entry := stats[p]
		humanSize := units.BytesSize(float64(entry.Size))
		exactSize := fmt.Sprintf("(%d)", entry.Size)
		fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, p, 10, humanSize, 14, exactSize, 24, entry.Links)
	}
}

View File

@ -197,7 +197,7 @@ var staterootStatCmd = &cli.Command{
return err return err
} }
fmt.Printf("%s\t%s\t%d\n", inf.Addr, string(cmh.Digest), inf.Stat.Size) fmt.Printf("%s\t%x\t%d\n", inf.Addr, cmh.Digest, inf.Stat.Size)
} }
return nil return nil
}, },

View File

@ -0,0 +1,30 @@
"""Render snapshot stats as a treemap.

Reads a JSON object from stdin that maps slash separated keys (e.g.
"/statetree/latest") to stat objects with a "Size" entry, and shows a plotly
treemap in which every key is nested under its parent path.
"""
import json
import pathlib
import sys

# Possible extensions:
# 1. parameterize to use block count as value instead of byte size
# 2. parameterize on different types of px chart types
# 3. parameterize on output port so we can serve this from infra


def build_treemap_data(snapshot_data):
    """Convert the stats mapping into the column layout expected by px.treemap.

    Returns a dict with three parallel lists, in the iteration order of
    snapshot_data: 'names' (the keys themselves), 'parents' (each key's parent
    path, '' for the root key '/') and 'values' (each entry's 'Size').
    """
    parents = []
    names = []
    values = []
    for key, stats in snapshot_data.items():
        if key == '/':
            # the root has no parent; plotly expects an empty string here
            parent = ''
        else:
            # Keys always use '/' separators, so they must be parsed as POSIX
            # paths. The OS-dependent pathlib.Path would render parents with
            # backslashes on Windows and they would never match a key.
            parent = str(pathlib.PurePosixPath(key).parent)
        parents.append(parent)
        names.append(key)
        values.append(stats['Size'])
    return dict(names=names, parents=parents, values=values)


def main():
    # Imported here so the data shaping above stays usable without plotly.
    import plotly.express as px

    snapshot_data = json.load(sys.stdin)
    data = build_treemap_data(snapshot_data)
    fig = px.treemap(data, names='names', parents='parents', values='values')
    fig.show()


if __name__ == '__main__':
    main()