8abe0ea608
feat:chain: Message Index
535 lines
13 KiB
Go
535 lines
13 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"reflect"
|
|
"sync"
|
|
|
|
"github.com/docker/go-units"
|
|
lru "github.com/hashicorp/golang-lru/v2"
|
|
"github.com/ipfs/go-blockservice"
|
|
"github.com/ipfs/go-cid"
|
|
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
|
format "github.com/ipfs/go-ipld-format"
|
|
"github.com/ipfs/go-merkledag"
|
|
"github.com/urfave/cli/v2"
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
|
"github.com/filecoin-project/lotus/chain/consensus"
|
|
"github.com/filecoin-project/lotus/chain/consensus/filcns"
|
|
"github.com/filecoin-project/lotus/chain/index"
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/chain/vm"
|
|
lcli "github.com/filecoin-project/lotus/cli"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
|
)
|
|
|
|
type actorStats struct {
|
|
Address address.Address
|
|
Actor *types.Actor
|
|
Fields []fieldItem
|
|
Stats api.ObjStat
|
|
}
|
|
|
|
type fieldItem struct {
|
|
Name string
|
|
Cid cid.Cid
|
|
Stats api.ObjStat
|
|
}
|
|
|
|
type cacheNodeGetter struct {
|
|
ds format.NodeGetter
|
|
cache *lru.TwoQueueCache[cid.Cid, format.Node]
|
|
}
|
|
|
|
func newCacheNodeGetter(d format.NodeGetter, size int) (*cacheNodeGetter, error) {
|
|
cng := &cacheNodeGetter{ds: d}
|
|
|
|
cache, err := lru.New2Q[cid.Cid, format.Node](size)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cng.cache = cache
|
|
|
|
return cng, nil
|
|
}
|
|
|
|
func (cng *cacheNodeGetter) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
|
|
if n, ok := cng.cache.Get(c); ok {
|
|
return n, nil
|
|
}
|
|
|
|
n, err := cng.ds.Get(ctx, c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cng.cache.Add(c, n)
|
|
|
|
return n, nil
|
|
}
|
|
|
|
func (cng *cacheNodeGetter) GetMany(ctx context.Context, list []cid.Cid) <-chan *format.NodeOption {
|
|
out := make(chan *format.NodeOption, len(list))
|
|
go func() {
|
|
for _, c := range list {
|
|
n, err := cng.Get(ctx, c)
|
|
if err != nil {
|
|
out <- &format.NodeOption{Err: err}
|
|
continue
|
|
}
|
|
|
|
out <- &format.NodeOption{Node: n}
|
|
}
|
|
}()
|
|
|
|
return out
|
|
}
|
|
|
|
type dagStatCollector struct {
|
|
ds format.NodeGetter
|
|
walk func(format.Node) ([]*format.Link, error)
|
|
|
|
statsLk sync.Mutex
|
|
stats api.ObjStat
|
|
}
|
|
|
|
func (dsc *dagStatCollector) record(ctx context.Context, nd format.Node) error {
|
|
size, err := nd.Size()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dsc.statsLk.Lock()
|
|
defer dsc.statsLk.Unlock()
|
|
|
|
dsc.stats.Size = dsc.stats.Size + size
|
|
dsc.stats.Links = dsc.stats.Links + 1
|
|
|
|
return nil
|
|
}
|
|
|
|
func (dsc *dagStatCollector) walkLinks(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
|
|
nd, err := dsc.ds.Get(ctx, c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := dsc.record(ctx, nd); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return dsc.walk(nd)
|
|
}
|
|
|
|
type ChainStoreTipSetResolver struct {
|
|
Chain *store.ChainStore
|
|
}
|
|
|
|
func (tsr *ChainStoreTipSetResolver) ChainHead(ctx context.Context) (*types.TipSet, error) {
|
|
return tsr.Chain.GetHeaviestTipSet(), nil
|
|
}
|
|
|
|
func (tsr *ChainStoreTipSetResolver) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
|
|
ts, err := tsr.Chain.GetTipSetFromKey(ctx, tsk)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
|
|
}
|
|
return tsr.Chain.GetTipsetByHeight(ctx, h, ts, true)
|
|
}
|
|
func (tsr *ChainStoreTipSetResolver) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
|
|
return tsr.Chain.LoadTipSet(ctx, tsk)
|
|
}
|
|
|
|
var statObjCmd = &cli.Command{
|
|
Name: "stat-obj",
|
|
Usage: "calculates the size of any DAG in the blockstore",
|
|
Flags: []cli.Flag{},
|
|
Action: func(cctx *cli.Context) error {
|
|
ctx := lcli.ReqContext(cctx)
|
|
|
|
c, err := cid.Parse(cctx.Args().First())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r, err := repo.NewFS(cctx.String("repo"))
|
|
if err != nil {
|
|
return xerrors.Errorf("opening fs repo: %w", err)
|
|
}
|
|
|
|
exists, err := r.Exists()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exists {
|
|
return xerrors.Errorf("lotus repo doesn't exist")
|
|
}
|
|
|
|
lr, err := r.Lock(repo.FullNode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer lr.Close() //nolint:errcheck
|
|
|
|
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open blockstore: %w", err)
|
|
}
|
|
|
|
defer func() {
|
|
if c, ok := bs.(io.Closer); ok {
|
|
if err := c.Close(); err != nil {
|
|
log.Warnf("failed to close blockstore: %s", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
|
|
dsc := &dagStatCollector{
|
|
ds: dag,
|
|
walk: carWalkFunc,
|
|
}
|
|
|
|
if err := merkledag.Walk(ctx, dsc.walkLinks, c, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
|
|
return err
|
|
}
|
|
|
|
return DumpJSON(dsc.stats)
|
|
},
|
|
}
|
|
|
|
var statActorCmd = &cli.Command{
|
|
Name: "stat-actor",
|
|
Usage: "calculates the size of actors and their immeidate structures",
|
|
Description: `Any DAG linked by the actor object (field) will have its size calculated independently of all
|
|
other linked DAG. If an actor has two fields containing links to the same DAG the structure size will be counted
|
|
twice, included in each fields size individually.
|
|
|
|
The top level stats reported for an actor is computed independently of all fields and is a more accurate
|
|
accounting of the true size of the actor in the state datastore.
|
|
|
|
The calculation of these stats results in the actor state being traversed twice. The dag-cache-size flag can be used
|
|
to reduce the number of decode operations performed by caching the decoded object after first access.`,
|
|
Flags: []cli.Flag{
|
|
&cli.StringFlag{
|
|
Name: "tipset",
|
|
Usage: "specify tipset to call method on (pass comma separated array of cids)",
|
|
},
|
|
&cli.IntFlag{
|
|
Name: "workers",
|
|
Usage: "number of workers to use when processing",
|
|
Value: 10,
|
|
},
|
|
&cli.IntFlag{
|
|
Name: "dag-cache-size",
|
|
Usage: "cache size per worker (setting to 0 disables)",
|
|
Value: 8092,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "all",
|
|
Usage: "process all actors in stateroot of tipset",
|
|
Value: false,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "pretty",
|
|
Usage: "print formated output instead of ldjson",
|
|
Value: false,
|
|
},
|
|
},
|
|
Action: func(cctx *cli.Context) error {
|
|
ctx := lcli.ReqContext(cctx)
|
|
|
|
var addrs []address.Address
|
|
|
|
if !cctx.Bool("all") {
|
|
for _, a := range cctx.Args().Slice() {
|
|
addr, err := address.NewFromString(a)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
addrs = append(addrs, addr)
|
|
}
|
|
}
|
|
|
|
r, err := repo.NewFS(cctx.String("repo"))
|
|
if err != nil {
|
|
return xerrors.Errorf("opening fs repo: %w", err)
|
|
}
|
|
|
|
exists, err := r.Exists()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exists {
|
|
return xerrors.Errorf("lotus repo doesn't exist")
|
|
}
|
|
|
|
lr, err := r.Lock(repo.FullNode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer lr.Close() //nolint:errcheck
|
|
|
|
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open blockstore: %w", err)
|
|
}
|
|
|
|
defer func() {
|
|
if c, ok := bs.(io.Closer); ok {
|
|
if err := c.Close(); err != nil {
|
|
log.Warnf("failed to close blockstore: %s", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
mds, err := lr.Datastore(context.Background(), "/metadata")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cs := store.NewChainStore(bs, bs, mds, nil, nil)
|
|
if err := cs.Load(ctx); err != nil {
|
|
return nil
|
|
}
|
|
|
|
tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
|
|
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tsr := &ChainStoreTipSetResolver{
|
|
Chain: cs,
|
|
}
|
|
|
|
ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infow("tipset", "parentstate", ts.ParentState())
|
|
|
|
if len(addrs) == 0 && cctx.Bool("all") {
|
|
var err error
|
|
addrs, err = sm.ListAllActors(ctx, ts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
numWorkers := cctx.Int("workers")
|
|
dagCacheSize := cctx.Int("dag-cache-size")
|
|
|
|
eg, egctx := errgroup.WithContext(ctx)
|
|
|
|
jobs := make(chan address.Address, numWorkers)
|
|
results := make(chan actorStats, numWorkers)
|
|
|
|
worker := func(ctx context.Context, id int) error {
|
|
completed := 0
|
|
defer func() {
|
|
log.Infow("worker done", "id", id, "completed", completed)
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case addr, ok := <-jobs:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
actor, err := sm.LoadActor(ctx, addr, ts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
|
|
if dagCacheSize != 0 {
|
|
var err error
|
|
dag, err = newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))), dagCacheSize)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
actStats, err := collectStats(ctx, addr, actor, dag)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case results <- actStats:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
completed = completed + 1
|
|
}
|
|
}
|
|
|
|
for w := 0; w < numWorkers; w++ {
|
|
id := w
|
|
eg.Go(func() error {
|
|
return worker(egctx, id)
|
|
})
|
|
}
|
|
|
|
go func() {
|
|
defer close(jobs)
|
|
for _, addr := range addrs {
|
|
jobs <- addr
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
// error is check later
|
|
eg.Wait() //nolint:errcheck
|
|
close(results)
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case result, ok := <-results:
|
|
if !ok {
|
|
return eg.Wait()
|
|
}
|
|
|
|
if cctx.Bool("pretty") {
|
|
DumpStats(result)
|
|
} else {
|
|
if err := DumpJSON(result); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
},
|
|
}
|
|
|
|
func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
|
|
log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))
|
|
|
|
nd, err := dag.Get(ctx, actor.Head)
|
|
if err != nil {
|
|
return actorStats{}, err
|
|
}
|
|
|
|
// When it comes to fvm / evm actors this method of inspecting fields will probably not work
|
|
// and we may only be able to collect stats for the top level object. We might be able to iterate
|
|
// over the top level fields for the actors and identify field that are CIDs, but unsure if we would
|
|
// be able to identify a field name.
|
|
|
|
oif, err := vm.DumpActorState(consensus.NewTipSetExecutor(filcns.RewardFunc).NewActorRegistry(), actor, nd.RawData())
|
|
if err != nil {
|
|
oif = nil
|
|
}
|
|
|
|
fields := []fieldItem{}
|
|
|
|
// Account actors return nil from DumpActorState as they have no state
|
|
if oif != nil {
|
|
v := reflect.Indirect(reflect.ValueOf(oif))
|
|
for i := 0; i < v.NumField(); i++ {
|
|
varName := v.Type().Field(i).Name
|
|
varType := v.Type().Field(i).Type
|
|
varValue := v.Field(i).Interface()
|
|
|
|
if varType == reflect.TypeOf(cid.Cid{}) {
|
|
fields = append(fields, fieldItem{
|
|
Name: varName,
|
|
Cid: varValue.(cid.Cid),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
actStats := actorStats{
|
|
Address: addr,
|
|
Actor: actor,
|
|
}
|
|
|
|
dsc := &dagStatCollector{
|
|
ds: dag,
|
|
walk: carWalkFunc,
|
|
}
|
|
|
|
if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
|
|
return actorStats{}, err
|
|
}
|
|
|
|
actStats.Stats = dsc.stats
|
|
|
|
for _, field := range fields {
|
|
dsc := &dagStatCollector{
|
|
ds: dag,
|
|
walk: carWalkFunc,
|
|
}
|
|
|
|
if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
|
|
return actorStats{}, err
|
|
}
|
|
|
|
field.Stats = dsc.stats
|
|
|
|
actStats.Fields = append(actStats.Fields, field)
|
|
}
|
|
|
|
return actStats, nil
|
|
}
|
|
|
|
func DumpJSON(i interface{}) error {
|
|
bs, err := json.Marshal(i)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fmt.Println(string(bs))
|
|
|
|
return nil
|
|
}
|
|
|
|
func DumpStats(actStats actorStats) {
|
|
strtype := builtin.ActorNameByCode(actStats.Actor.Code)
|
|
fmt.Printf("Address:\t%s\n", actStats.Address)
|
|
fmt.Printf("Balance:\t%s\n", types.FIL(actStats.Actor.Balance))
|
|
fmt.Printf("Nonce:\t\t%d\n", actStats.Actor.Nonce)
|
|
fmt.Printf("Code:\t\t%s (%s)\n", actStats.Actor.Code, strtype)
|
|
fmt.Printf("Head:\t\t%s\n", actStats.Actor.Head)
|
|
fmt.Println()
|
|
|
|
fmt.Printf("%-*s%-*s%-*s\n", 32, "Field", 24, "Size", 24, "\"Blocks\"")
|
|
|
|
stats := actStats.Stats
|
|
sizeStr := units.BytesSize(float64(stats.Size))
|
|
fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, "<self>", 10, sizeStr, 14, fmt.Sprintf("(%d)", stats.Size), 24, stats.Links)
|
|
|
|
for _, s := range actStats.Fields {
|
|
stats := s.Stats
|
|
sizeStr := units.BytesSize(float64(stats.Size))
|
|
fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, s.Name, 10, sizeStr, 14, fmt.Sprintf("(%d)", stats.Size), 24, stats.Links)
|
|
}
|
|
|
|
fmt.Println("--------------------------------------------------------------------------")
|
|
}
|