diff --git a/cli/state.go b/cli/state.go index 85d8f26e7..a29253dfc 100644 --- a/cli/state.go +++ b/cli/state.go @@ -252,10 +252,16 @@ func ParseTipSetString(ts string) ([]cid.Cid, error) { return cids, nil } +type TipSetResolver interface { + ChainHead(context.Context) (*types.TipSet, error) + ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) + ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) +} + // LoadTipSet gets the tipset from the context, or the head from the API. // // It always gets the head from the API so commands use a consistent tipset even if time pases. -func LoadTipSet(ctx context.Context, cctx *cli.Context, api v0api.FullNode) (*types.TipSet, error) { +func LoadTipSet(ctx context.Context, cctx *cli.Context, api TipSetResolver) (*types.TipSet, error) { tss := cctx.String("tipset") if tss == "" { return api.ChainHead(ctx) @@ -264,7 +270,7 @@ func LoadTipSet(ctx context.Context, cctx *cli.Context, api v0api.FullNode) (*ty return ParseTipSetRef(ctx, api, tss) } -func ParseTipSetRef(ctx context.Context, api v0api.FullNode, tss string) (*types.TipSet, error) { +func ParseTipSetRef(ctx context.Context, api TipSetResolver, tss string) (*types.TipSet, error) { if tss[0] == '@' { if tss == "@head" { return api.ChainHead(ctx) diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index 6f84739fa..a8ced92f8 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -20,6 +20,8 @@ func main() { local := []*cli.Command{ addressCmd, + statActorCmd, + statObjCmd, base64Cmd, base32Cmd, base16Cmd, diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go new file mode 100644 index 000000000..8ec4a0ff4 --- /dev/null +++ b/cmd/lotus-shed/state-stats.go @@ -0,0 +1,533 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "reflect" + "sync" + + "github.com/docker/go-units" + lru "github.com/hashicorp/golang-lru" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + offline "github.com/ipfs/go-ipfs-exchange-offline" + format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin" + "github.com/filecoin-project/lotus/chain/consensus" + "github.com/filecoin-project/lotus/chain/consensus/filcns" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/vm" + lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" +) + +type actorStats struct { + Address address.Address + Actor *types.Actor + Fields []fieldItem + Stats api.ObjStat +} + +type fieldItem struct { + Name string + Cid cid.Cid + Stats api.ObjStat +} + +type cacheNodeGetter struct { + ds format.NodeGetter + cache *lru.TwoQueueCache +} + +func newCacheNodeGetter(d format.NodeGetter, size int) (*cacheNodeGetter, error) { + cng := &cacheNodeGetter{ds: d} + + cache, err := lru.New2Q(size) + if err != nil { + return nil, err + } + + cng.cache = cache + + return cng, nil +} + +func (cng *cacheNodeGetter) Get(ctx context.Context, c cid.Cid) (format.Node, error) { + if n, ok := cng.cache.Get(c); ok { + return n.(format.Node), nil + } + + n, err := cng.ds.Get(ctx, c) + if err != nil { + return nil, err + } + + cng.cache.Add(c, n) + + return n, nil +} + +func (cng *cacheNodeGetter) GetMany(ctx context.Context, list []cid.Cid) <-chan *format.NodeOption { + out := make(chan *format.NodeOption, len(list)) + go func() { + for _, c := range list { + n, err := cng.Get(ctx, c) + if err != nil { + out <- &format.NodeOption{Err: err} + continue + } + + out <- &format.NodeOption{Node: n} + } + }() + + return out +} + +type dagStatCollector struct { + ds format.NodeGetter + walk func(format.Node) ([]*format.Link, error) + + statsLk sync.Mutex + stats api.ObjStat +} + +func (dsc *dagStatCollector) record(ctx context.Context, nd format.Node) error { + size, err := nd.Size() + if err != nil { + return err + } + + dsc.statsLk.Lock() + defer dsc.statsLk.Unlock() + + dsc.stats.Size = dsc.stats.Size + size + dsc.stats.Links = dsc.stats.Links + 1 + + return nil +} + +func (dsc *dagStatCollector) walkLinks(ctx context.Context, c cid.Cid) ([]*format.Link, error) { + nd, err := dsc.ds.Get(ctx, c) + if err != nil { + return nil, err + } + + if err := dsc.record(ctx, nd); err != nil { + return nil, err + } + + return dsc.walk(nd) +} + +type ChainStoreTipSetResolver struct { + Chain *store.ChainStore +} + +func (tsr *ChainStoreTipSetResolver) ChainHead(ctx context.Context) (*types.TipSet, error) { + return tsr.Chain.GetHeaviestTipSet(), nil +} + +func (tsr *ChainStoreTipSetResolver) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { + ts, err := tsr.Chain.GetTipSetFromKey(ctx, tsk) + if err != nil { + return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) + } + return tsr.Chain.GetTipsetByHeight(ctx, h, ts, true) +} +func (tsr *ChainStoreTipSetResolver) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + return tsr.Chain.LoadTipSet(ctx, tsk) +} + +var statObjCmd = &cli.Command{ + Name: "stat-obj", + Usage: "calculates the size of any DAG in the blockstore", + Flags: []cli.Flag{}, + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + c, err := cid.Parse(cctx.Args().First()) + if err != nil { + return err + } + + r, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return xerrors.Errorf("opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return err + } + if !exists { + return xerrors.Errorf("lotus repo doesn't exist") + } + + lr, err := r.Lock(repo.FullNode) + if err != nil { + return err + } + defer lr.Close() //nolint:errcheck + + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + if err != nil { + return fmt.Errorf("failed to open blockstore: %w", err) + } + + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() + + dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + dsc := &dagStatCollector{ + ds: dag, + walk: carWalkFunc, + } + + if err := merkledag.Walk(ctx, dsc.walkLinks, c, cid.NewSet().Visit, merkledag.Concurrent()); err != nil { + return err + } + + return DumpJSON(dsc.stats) + }, +} + +var statActorCmd = &cli.Command{ + Name: "stat-actor", + Usage: "calculates the size of actors and their immeidate structures", + Description: `Any DAG linked by the actor object (field) will have its size calculated independently of all +other linked DAG. If an actor has two fields containing links to the same DAG the structure size will be counted +twice, included in each fields size individually. + +The top level stats reported for an actor is computed independently of all fields and is a more accurate +accounting of the true size of the actor in the state datastore. + +The calculation of these stats results in the actor state being traversed twice. The dag-cache-size flag can be used +to reduce the number of decode operations performed by caching the decoded object after first access.`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "tipset", + Usage: "specify tipset to call method on (pass comma separated array of cids)", + }, + &cli.IntFlag{ + Name: "workers", + Usage: "number of workers to use when processing", + Value: 10, + }, + &cli.IntFlag{ + Name: "dag-cache-size", + Usage: "cache size per worker (setting to 0 disables)", + Value: 8092, + }, + &cli.BoolFlag{ + Name: "all", + Usage: "process all actors in stateroot of tipset", + Value: false, + }, + &cli.BoolFlag{ + Name: "pretty", + Usage: "print formated output instead of ldjson", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + var addrs []address.Address + + if !cctx.Bool("all") { + for _, a := range cctx.Args().Slice() { + addr, err := address.NewFromString(a) + if err != nil { + return err + } + + addrs = append(addrs, addr) + } + } + + r, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return xerrors.Errorf("opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return err + } + if !exists { + return xerrors.Errorf("lotus repo doesn't exist") + } + + lr, err := r.Lock(repo.FullNode) + if err != nil { + return err + } + defer lr.Close() //nolint:errcheck + + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + if err != nil { + return fmt.Errorf("failed to open blockstore: %w", err) + } + + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() + + mds, err := lr.Datastore(context.Background(), "/metadata") + if err != nil { + return err + } + + cs := store.NewChainStore(bs, bs, mds, nil, nil) + if err := cs.Load(ctx); err != nil { + return nil + } + + tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc) + sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil) + if err != nil { + return err + } + + tsr := &ChainStoreTipSetResolver{ + Chain: cs, + } + + ts, err := lcli.LoadTipSet(ctx, cctx, tsr) + if err != nil { + return err + } + + log.Infow("tipset", "parentstate", ts.ParentState()) + + if len(addrs) == 0 && cctx.Bool("all") { + var err error + addrs, err = sm.ListAllActors(ctx, ts) + if err != nil { + return err + } + } + + numWorkers := cctx.Int("workers") + dagCacheSize := cctx.Int("dag-cache-size") + + eg, egctx := errgroup.WithContext(ctx) + + jobs := make(chan address.Address, numWorkers) + results := make(chan actorStats, numWorkers) + + worker := func(ctx context.Context, id int) error { + completed := 0 + defer func() { + log.Infow("worker done", "id", id, "completed", completed) + }() + + for { + select { + case addr, ok := <-jobs: + if !ok { + return nil + } + + actor, err := sm.LoadActor(ctx, addr, ts) + if err != nil { + return err + } + + var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + if dagCacheSize != 0 { + var err error + dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))), dagCacheSize) + if err != nil { + return err + } + } + + actStats, err := collectStats(ctx, addr, actor, dag) + if err != nil { + return err + } + + select { + case results <- actStats: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + + completed = completed + 1 + } + } + + for w := 0; w < numWorkers; w++ { + id := w + eg.Go(func() error { + return worker(egctx, id) + }) + } + + go func() { + defer close(jobs) + for _, addr := range addrs { + jobs <- addr + } + }() + + go func() { + // error is check later + eg.Wait() //nolint:errcheck + close(results) + }() + + for { + select { + case result, ok := <-results: + if !ok { + return eg.Wait() + } + + if cctx.Bool("pretty") { + DumpStats(result) + } else { + if err := DumpJSON(result); err != nil { + return err + } + } + case <-ctx.Done(): + return ctx.Err() + } + } + }, +} + +func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) { + log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code)) + + nd, err := dag.Get(ctx, actor.Head) + if err != nil { + return actorStats{}, err + } + + // When it comes to fvm / evm actors this method of inspecting fields will probably not work + // and we may only be able to collect stats for the top level object. We might be able to iterate + // over the top level fields for the actors and identify field that are CIDs, but unsure if we would + // be able to identify a field name. + + oif, err := vm.DumpActorState(consensus.NewTipSetExecutor(filcns.RewardFunc).NewActorRegistry(), actor, nd.RawData()) + if err != nil { + oif = nil + } + + fields := []fieldItem{} + + // Account actors return nil from DumpActorState as they have no state + if oif != nil { + v := reflect.Indirect(reflect.ValueOf(oif)) + for i := 0; i < v.NumField(); i++ { + varName := v.Type().Field(i).Name + varType := v.Type().Field(i).Type + varValue := v.Field(i).Interface() + + if varType == reflect.TypeOf(cid.Cid{}) { + fields = append(fields, fieldItem{ + Name: varName, + Cid: varValue.(cid.Cid), + }) + } + } + } + + actStats := actorStats{ + Address: addr, + Actor: actor, + } + + dsc := &dagStatCollector{ + ds: dag, + walk: carWalkFunc, + } + + if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, cid.NewSet().Visit, merkledag.Concurrent()); err != nil { + return actorStats{}, err + } + + actStats.Stats = dsc.stats + + for _, field := range fields { + dsc := &dagStatCollector{ + ds: dag, + walk: carWalkFunc, + } + + if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, cid.NewSet().Visit, merkledag.Concurrent()); err != nil { + return actorStats{}, err + } + + field.Stats = dsc.stats + + actStats.Fields = append(actStats.Fields, field) + } + + return actStats, nil +} + +func DumpJSON(i interface{}) error { + bs, err := json.Marshal(i) + if err != nil { + return err + } + + fmt.Println(string(bs)) + + return nil +} + +func DumpStats(actStats actorStats) { + strtype := builtin.ActorNameByCode(actStats.Actor.Code) + fmt.Printf("Address:\t%s\n", actStats.Address) + fmt.Printf("Balance:\t%s\n", types.FIL(actStats.Actor.Balance)) + fmt.Printf("Nonce:\t\t%d\n", actStats.Actor.Nonce) + fmt.Printf("Code:\t\t%s (%s)\n", actStats.Actor.Code, strtype) + fmt.Printf("Head:\t\t%s\n", actStats.Actor.Head) + fmt.Println() + + fmt.Printf("%-*s%-*s%-*s\n", 32, "Field", 24, "Size", 24, "\"Blocks\"") + + stats := actStats.Stats + sizeStr := units.BytesSize(float64(stats.Size)) + fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, "", 10, sizeStr, 14, fmt.Sprintf("(%d)", stats.Size), 24, stats.Links) + + for _, s := range actStats.Fields { + stats := s.Stats + sizeStr := units.BytesSize(float64(stats.Size)) + fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, s.Name, 10, sizeStr, 14, fmt.Sprintf("(%d)", stats.Size), 24, stats.Links) + } + + fmt.Println("--------------------------------------------------------------------------") +}