WIP
parent 517c0a53b1 · commit d599833b64
cmd/lotus-shed/snapshot.go (new file, 39 lines)
@@ -0,0 +1,39 @@
package main

import (
	"github.com/urfave/cli/v2"
)

var snapshotCmd = &cli.Command{
	Name:        "snapshot",
	Description: "interact with filecoin chain snapshots",
	Subcommands: []*cli.Command{
		snapshotStatCmd,
	},
}

var snapshotStatCmd = &cli.Command{
	Name:      "stat-actor",
	Usage:     "display number of snapshot bytes held by provided actor",
	ArgsUsage: "[datastore path] [head] [actor address]",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name: "field", // specify top level actor field to stat
		},
	},
	Action: func(cctx *cli.Context) error {
		// Initialize in memory graph counter
		// Get root tskey
		// Walk headers back

		// Count header bytes
		// open state,
		// if no field set graph count actor HEAD
		// if field is set, parse actor head, look for field
		// if field not found or not a cid error, otherwise do graph count on the cid

		// Print out stats

		return nil
	},
}
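The Action above is still a stub; its comments sketch a plan for an in-memory graph counter. For reference, a minimal sketch of such a counter, assuming the data is read through a go-ipld-format NodeGetter; the countGraph name and signature are illustrative and not part of this commit.

package main

import (
	"context"

	"github.com/ipfs/go-cid"
	format "github.com/ipfs/go-ipld-format"
)

// countGraph walks the DAG rooted at root through the provided NodeGetter,
// visiting each CID at most once, and tallies total bytes and link count.
func countGraph(ctx context.Context, ng format.NodeGetter, root cid.Cid) (uint64, uint64, error) {
	var size, links uint64
	seen := cid.NewSet()
	queue := []cid.Cid{root}
	for len(queue) > 0 {
		c := queue[0]
		queue = queue[1:]
		if !seen.Visit(c) {
			continue // already counted
		}
		nd, err := ng.Get(ctx, c)
		if err != nil {
			return 0, 0, err
		}
		size += uint64(len(nd.RawData()))
		for _, l := range nd.Links() {
			links++
			queue = append(queue, l.Cid)
		}
	}
	return size, links, nil
}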
@@ -6,6 +6,7 @@ import (
	"fmt"
	"io"
	"reflect"
	"sort"
	"sync"

	"github.com/docker/go-units"

@@ -23,6 +24,7 @@ import (
	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/blockstore"
	"github.com/filecoin-project/lotus/chain/actors/builtin"
	"github.com/filecoin-project/lotus/chain/consensus"
	"github.com/filecoin-project/lotus/chain/consensus/filcns"
@@ -166,39 +168,13 @@ var statObjCmd = &cli.Command{
			return err
		}

		r, err := repo.NewFS(cctx.String("repo"))
		if err != nil {
			return xerrors.Errorf("opening fs repo: %w", err)
		}

		exists, err := r.Exists()
		h, err := loadChainStore(ctx, cctx.String("repo"))
		if err != nil {
			return err
		}
		if !exists {
			return xerrors.Errorf("lotus repo doesn't exist")
		}
		defer h.closer()

		lr, err := r.Lock(repo.FullNode)
		if err != nil {
			return err
		}
		defer lr.Close() //nolint:errcheck

		bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
		if err != nil {
			return fmt.Errorf("failed to open blockstore: %w", err)
		}

		defer func() {
			if c, ok := bs.(io.Closer); ok {
				if err := c.Close(); err != nil {
					log.Warnf("failed to close blockstore: %s", err)
				}
			}
		}()

		dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
		dag := merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
		dsc := &dagStatCollector{
			ds:   dag,
			walk: carWalkFunc,
@@ -212,6 +188,258 @@ var statObjCmd = &cli.Command{
		},
	},
}

type StoreHandle struct {
	bs     blockstore.Blockstore
	cs     *store.ChainStore
	sm     *stmgr.StateManager
	closer func()
}

func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) {
	r, err := repo.NewFS(repoPath)
	if err != nil {
		return nil, xerrors.Errorf("opening fs repo: %w", err)
	}

	exists, err := r.Exists()
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, xerrors.Errorf("lotus repo doesn't exist")
	}

	lr, err := r.Lock(repo.FullNode)
	if err != nil {
		return nil, err
	}
	defer lr.Close() //nolint:errcheck

	bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
	if err != nil {
		return nil, fmt.Errorf("failed to open blockstore: %w", err)
	}

	closer := func() {
		if c, ok := bs.(io.Closer); ok {
			if err := c.Close(); err != nil {
				log.Warnf("failed to close blockstore: %s", err)
			}
		}
	}

	mds, err := lr.Datastore(context.Background(), "/metadata")
	if err != nil {
		return nil, err
	}

	cs := store.NewChainStore(bs, bs, mds, nil, nil)
	if err := cs.Load(ctx); err != nil {
		return nil, fmt.Errorf("failed to load chain store: %w", err)
	}

	tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
	sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
	if err != nil {
		return nil, fmt.Errorf("failed to open state manager: %w", err)
	}
	handle := StoreHandle{
		bs:     bs,
		sm:     sm,
		cs:     cs,
		closer: closer,
	}

	return &handle, nil
}

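loadChainStore consolidates the repo-opening boilerplate that the stat commands previously carried inline. The intended call pattern, as used by the commands in this diff (a sketch with error handling trimmed):

	h, err := loadChainStore(ctx, cctx.String("repo"))
	if err != nil {
		return err
	}
	defer h.closer() // closes the underlying blockstore

	// h.bs, h.cs and h.sm then stand in for the locally constructed
	// blockstore, chain store and state manager.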
var statSnapshotCmd = &cli.Command{
	Name:  "stat-snapshot",
	Usage: "calculates the space usage of a snapshot taken from the given tipset",
	Description: `Walk the chain back to lightweight snapshot depth and break down space usage into high level
categories: headers, messages, receipts, latest state root, and churn from earlier state roots.
State root and churn space is further broken down by actor type and immediate top level fields
`,
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "tipset",
			Usage: "specify tipset to call method on (pass comma separated array of cids)",
		},
		&cli.IntFlag{
			Name:  "workers",
			Usage: "number of workers to use when processing",
			Value: 10,
		},
		&cli.IntFlag{
			Name:  "dag-cache-size",
			Usage: "cache size per worker (setting to 0 disables)",
			Value: 8092,
		},
		&cli.BoolFlag{
			Name:  "pretty",
			Usage: "print formatted output instead of ldjson",
			Value: false,
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := lcli.ReqContext(cctx)
		h, err := loadChainStore(ctx, cctx.String("repo"))
		if err != nil {
			return err
		}
		defer h.closer()
		tsr := &ChainStoreTipSetResolver{
			Chain: h.cs,
		}

		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
		if err != nil {
			return err
		}

		type job struct {
			c   cid.Cid
			key string // prefix path for the region being recorded i.e. "/state/mineractor"
		}
		type cidCall struct {
			c    cid.Cid
			resp chan bool
		}
		type result struct {
			key   string
			stats api.ObjStat
		}
		// Stage 1 walk the latest state root
		// A) walk all actors
		// B) when done walk the actor HAMT

		st, err := h.sm.StateTree(ts.ParentState())
		if err != nil {
			return err
		}
		numWorkers := cctx.Int("workers")
		dagCacheSize := cctx.Int("dag-cache-size")

		eg, egctx := errgroup.WithContext(ctx)

		jobs := make(chan job, numWorkers)
		results := make(chan result)
		cidCh := make(chan cidCall, numWorkers)

		// Serve CID deduplication requests until the context is cancelled.
		// (Wrapped in a for loop; as originally written the goroutine served a
		// single request and exited.)
		go func() {
			seen := cid.NewSet()

			for {
				select {
				case call := <-cidCh:
					call.resp <- seen.Visit(call.c)
				case <-ctx.Done():
					log.Infof("shutting down cid set goroutine")
					close(cidCh)
					return
				}
			}
		}()

		// worker drains the jobs channel, walks each job's graph, and emits
		// per-key ObjStat results.
		worker := func(ctx context.Context, id int) error {
			defer func() {
				//log.Infow("worker done", "id", id, "completed", completed)
			}()

			for {
				select {
				case job, ok := <-jobs:
					if !ok {
						return nil
					}

					var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
					if dagCacheSize != 0 {
						var err error
						dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
						if err != nil {
							return err
						}
					}

					stats := snapshotJobStats(job, dag, cidCh)
					for _, stat := range stats {
						select {
						case results <- stat:
						case <-ctx.Done():
							return ctx.Err()
						}
					}
				case <-ctx.Done():
					return ctx.Err()
				}

			}
		}

		// Enqueue one job per actor in the latest state tree, keyed by actor type.
		go func() {
			st.ForEach(func(_ address.Address, act *types.Actor) error {
				actType := builtin.ActorNameByCode(act.Code)
				if actType == "<unknown>" {
					actType = act.Code.String()
				}

				jobs <- job{c: act.Head, key: fmt.Sprintf("/state/%s", actType)}

				return nil
			})

		}()

		for w := 0; w < numWorkers; w++ {
			id := w
			eg.Go(func() error {
				return worker(egctx, id)
			})
		}

		summary := make(map[string]api.ObjStat)
		combine := func(statsA, statsB api.ObjStat) api.ObjStat {
			return api.ObjStat{
				Size:  statsA.Size + statsB.Size,
				Links: statsA.Links + statsB.Links,
			}
		}
	LOOP:
		for {
			select {
			case result, ok := <-results:
				if !ok {
					return eg.Wait()
				}
				if stat, ok := summary[result.key]; ok {
					summary[result.key] = combine(stat, result.stats)
				} else {
					summary[result.key] = result.stats
				}

			case <-ctx.Done():
				break LOOP
			}
		}
		// XXX walk the top of the state tree

		// Stage 2 walk the rest of the chain: headers, messages, churn
		// ordering:
		// A) for each header send jobs for messages, receipts, state tree churn
		// don't walk header directly as it would just walk everything including parent tipsets
		// B) when done walk all actor HAMTs for churn

		if cctx.Bool("pretty") {
			DumpSnapshotStats(summary)
		} else {
			if err := DumpJSON(summary); err != nil {
				return err
			}
		}

		return nil
	},
}

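The workers above call snapshotJobStats, which is not included in this WIP diff. A minimal sketch of one way such a helper could behave, assuming it walks the graph under job.c, deduplicates CIDs through the cidCall channel, and reports a single api.ObjStat keyed by job.key. The name, signature (adding a context and an error return, unlike the call site above) and behavior are assumptions, and it relies on the job, cidCall and result types defined inside the Action:

// Hypothetical sketch, not part of this commit: walk the graph under j.c,
// skip CIDs another job has already counted (checked via cidCh), and
// accumulate size and block counts under j.key.
func snapshotJobStatsSketch(ctx context.Context, j job, ng format.NodeGetter, cidCh chan cidCall) ([]result, error) {
	var stat api.ObjStat
	queue := []cid.Cid{j.c}
	for len(queue) > 0 {
		c := queue[0]
		queue = queue[1:]

		// ask the dedup goroutine whether this CID has been seen before
		resp := make(chan bool)
		select {
		case cidCh <- cidCall{c: c, resp: resp}:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
		if fresh := <-resp; !fresh {
			continue // already counted elsewhere
		}

		nd, err := ng.Get(ctx, c)
		if err != nil {
			return nil, err
		}
		stat.Size += uint64(len(nd.RawData()))
		stat.Links++
		for _, l := range nd.Links() {
			queue = append(queue, l.Cid)
		}
	}
	return []result{{key: j.key, stats: stat}}, nil
}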
var statActorCmd = &cli.Command{
	Name:  "stat-actor",
	Usage: "calculates the size of actors and their immediate structures",
@@ -265,57 +493,14 @@ to reduce the number of decode operations performed by caching the decoded objec
				addrs = append(addrs, addr)
			}
		}

		r, err := repo.NewFS(cctx.String("repo"))
		if err != nil {
			return xerrors.Errorf("opening fs repo: %w", err)
		}

		exists, err := r.Exists()
		if err != nil {
			return err
		}
		if !exists {
			return xerrors.Errorf("lotus repo doesn't exist")
		}

		lr, err := r.Lock(repo.FullNode)
		if err != nil {
			return err
		}
		defer lr.Close() //nolint:errcheck

		bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
		if err != nil {
			return fmt.Errorf("failed to open blockstore: %w", err)
		}

		defer func() {
			if c, ok := bs.(io.Closer); ok {
				if err := c.Close(); err != nil {
					log.Warnf("failed to close blockstore: %s", err)
				}
			}
		}()

		mds, err := lr.Datastore(context.Background(), "/metadata")
		if err != nil {
			return err
		}

		cs := store.NewChainStore(bs, bs, mds, nil, nil)
		if err := cs.Load(ctx); err != nil {
			return nil
		}

		tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
		sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
		h, err := loadChainStore(ctx, cctx.String("repo"))
		if err != nil {
			return err
		}
		defer h.closer()

		tsr := &ChainStoreTipSetResolver{
			Chain: cs,
			Chain: h.cs,
		}

		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
@@ -323,11 +508,11 @@ to reduce the number of decode operations performed by caching the decoded objec
			return err
		}

		log.Infow("tipset", "parentstate", ts.ParentState())
		// log.Infow("tipset", "parentstate", ts.ParentState())

		if len(addrs) == 0 && cctx.Bool("all") {
			var err error
			addrs, err = sm.ListAllActors(ctx, ts)
			addrs, err = h.sm.ListAllActors(ctx, ts)
			if err != nil {
				return err
			}
@@ -344,7 +529,7 @@ to reduce the number of decode operations performed by caching the decoded objec
		worker := func(ctx context.Context, id int) error {
			completed := 0
			defer func() {
				log.Infow("worker done", "id", id, "completed", completed)
				//log.Infow("worker done", "id", id, "completed", completed)
			}()

			for {
@@ -354,15 +539,15 @@ to reduce the number of decode operations performed by caching the decoded objec
						return nil
					}

					actor, err := sm.LoadActor(ctx, addr, ts)
					actor, err := h.sm.LoadActor(ctx, addr, ts)
					if err != nil {
						return err
					}

					var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
					var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
					if dagCacheSize != 0 {
						var err error
						dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))), dagCacheSize)
						dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))), dagCacheSize)
						if err != nil {
							return err
						}
@@ -428,7 +613,7 @@ to reduce the number of decode operations performed by caching the decoded objec
}

func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
	log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))
	// log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))

	nd, err := dag.Get(ctx, actor.Head)
	if err != nil {
@@ -532,3 +717,19 @@ func DumpStats(actStats actorStats) {

	fmt.Println("--------------------------------------------------------------------------")
}

func DumpSnapshotStats(stats map[string]api.ObjStat) {
	// sort keys so we get subkey locality
	keys := make([]string, 0, len(stats))
	for k := range stats {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	fmt.Printf("%-*s%-*s%-*s\n", 32, "Path", 24, "Size", 24, "\"Blocks\"")
	for _, k := range keys {
		stat := stats[k]
		sizeStr := units.BytesSize(float64(stat.Size))
		fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, k, 10, sizeStr, 14, fmt.Sprintf("(%d)", stat.Size), 24, stat.Links)
	}
}

@@ -104,6 +104,34 @@ type statItem struct {
	Stat api.ObjStat
}

// var snapShotStatCmd = &cli.Command{
// 	Name:  "snapshot-stat",
// 	Usage: "print statistics of the snapshot rooted at given block",
// 	Flags: []cli.Flag{
// 		&cli.StringFlag{
// 			Name:  "tipset",
// 			Usage: "specify tipset to start from",
// 		},
// 	},
// 	Action: func(cctx *cli.Context) error {
// 		api, closer, err := lcli.GetFullNodeAPI(cctx)
// 		if err != nil {
// 			return err
// 		}

// 		defer closer()
// 		ctx := lcli.ReqContext(cctx)

// 		ts, err := lcli.LoadTipSet(ctx, cctx, api)
// 		if err != nil {
// 			return err
// 		}

// 		api.ChainStatSnapshot(ctx, ts.Key())

// 	},
// }

var staterootStatCmd = &cli.Command{
	Name:  "stat",
	Usage: "print statistics for the stateroot of a given block",
@@ -197,7 +225,7 @@ var staterootStatCmd = &cli.Command{
				return err
			}

			fmt.Printf("%s\t%s\t%d\n", inf.Addr, string(cmh.Digest), inf.Stat.Size)
			fmt.Printf("%s\t%x\t%d\n", inf.Addr, cmh.Digest, inf.Stat.Size)
		}
		return nil
	},