diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 11da70dca..6378fdc11 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -6,11 +6,15 @@ import ( "encoding/json" "fmt" "io" + "net/http" "path" "reflect" "sort" "sync" + _ "net/http" + _ "net/http/pprof" + "github.com/docker/go-units" lru "github.com/hashicorp/golang-lru/v2" "github.com/ipfs/go-blockservice" @@ -271,6 +275,52 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) return &handle, nil } +func pipeline(ctx context.Context, name string, numWorkers int, createJobs func(ctx context.Context, jobCh chan job, resultCh chan result) error, + worker func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error, + processResults func(ctx context.Context, resultCh chan result) error) error { + + eg, egctx := errgroup.WithContext(ctx) + jobCh := make(chan job, numWorkers) + resultCh := make(chan result) + var resultWriterWg sync.WaitGroup + + resultWriterWg.Add(1) + eg.Go(func() error { + defer resultWriterWg.Done() + defer func() { + close(jobCh) + }() + return createJobs(ctx, jobCh, resultCh) + }) + + var id int + for w := 0; w < numWorkers; w++ { + id = w + + resultWriterWg.Add(1) + eg.Go(func() error { + defer resultWriterWg.Done() + return worker(egctx, id, jobCh, resultCh) + }) + } + + eg.Go(func() error { + return processResults(ctx, resultCh) + }) + + // close result channel when workers are done sending to it. + eg.Go(func() error { + resultWriterWg.Wait() + close(resultCh) + return nil + }) + + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed pipeline %s: %w", name, err) + } + return nil +} + var statSnapshotCmd = &cli.Command{ Name: "stat-snapshot", Usage: "calculates the space usage of a snapshot taken from the given tipset", @@ -314,20 +364,18 @@ var statSnapshotCmd = &cli.Command{ if err != nil { return err } + go func() { + fmt.Println(http.ListenAndServe("localhost:6060", nil)) + }() numWorkers := cctx.Int("workers") dagCacheSize := cctx.Int("dag-cache-size") - eg, egctx := errgroup.WithContext(ctx) - - jobCh := make(chan job, numWorkers) - resultCh := make(chan result) cidCh := make(chan cidCall, numWorkers) summary := make(map[string]api.ObjStat) // snapshot root objects with no additional bytes or links summary["/"] = api.ObjStat{Size: 0, Links: 0} summary["/statetree"] = api.ObjStat{Size: 0, Links: 0} - summary["/statetree/churn"] = api.ObjStat{Size: 0, Links: 0} // XXX this will be calculated combine := func(statsA, statsB api.ObjStat) api.ObjStat { return api.ObjStat{ @@ -335,7 +383,6 @@ var statSnapshotCmd = &cli.Command{ Links: statsA.Links + statsB.Links, } } - var workerWg sync.WaitGroup // Threadsafe cid set lives across different pipelines so not part of error group go func() error { @@ -356,13 +403,8 @@ var statSnapshotCmd = &cli.Command{ out := <-ch return out } - // Stage 1 walk all actors in latest state root - eg.Go(func() error { - defer func() { - close(jobCh) - }() - + createJobsStage1 := func(ctx context.Context, jobCh chan job, _ chan result) error { st, err := h.sm.StateTree(ts.ParentState()) if err != nil { return err @@ -378,7 +420,7 @@ var statSnapshotCmd = &cli.Command{ return nil }) - }) + } worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error { var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) @@ -405,24 +447,8 @@ var statSnapshotCmd = &cli.Command{ } return nil } - var id int - for w := 0; w < numWorkers; w++ { - id = w - workerWg.Add(1) - eg.Go(func() error { - defer workerWg.Done() - return worker(egctx, id, jobCh, resultCh) - }) - } - // Close result channel when workers are done sending to it. - eg.Go(func() error { - workerWg.Wait() - close(resultCh) - return nil - }) - - eg.Go(func() error { + processResults := func(ctx context.Context, resultCh chan result) error { for result := range resultCh { if stat, ok := summary[result.key]; ok { summary[result.key] = combine(stat, result.stats) @@ -432,45 +458,30 @@ var statSnapshotCmd = &cli.Command{ } } return nil - }) + } - if err := eg.Wait(); err != nil { - return fmt.Errorf("failed to measure space in latest state root: %w", err) + if err := pipeline(ctx, "Latest State Actors", numWorkers, createJobsStage1, worker, processResults); err != nil { + return err } // Stage 2: walk the top of the latest state root - id += 1 - resultCh = make(chan result) - jobCh = make(chan job) - go func() { - defer close(jobCh) + createJobsStage2 := func(ctx context.Context, jobCh chan job, _ chan result) error { jobCh <- job{c: ts.ParentState(), key: "/statetree/latest"} - }() - go func() { - defer close(resultCh) - worker(ctx, id, jobCh, resultCh) - }() - res := <-resultCh - summary[res.key] = res.stats + return nil + } + + if err := pipeline(ctx, "Latest State HAMT", numWorkers, createJobsStage2, worker, processResults); err != nil { + return err + } // Stage 3 walk the rest of the chain: headers, messages, churn // ordering: // for each header send jobs for messages, receipts, state tree churn // don't walk header directly as it would just walk everything including parent tipsets - eg, egctx = errgroup.WithContext(ctx) - jobCh = make(chan job, numWorkers) - resultCh = make(chan result) - var workerWg2 sync.WaitGroup churnStateRoots := cid.NewSet() - workerWg2.Add(1) // goroutine creating jobs also sends to result channel - eg.Go(func() error { - defer func() { - close(jobCh) - workerWg2.Done() - }() + createJobsStage3 := func(ctx context.Context, jobCh chan job, resultCh chan result) error { // walk chain - var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) if dagCacheSize != 0 { var err error @@ -482,7 +493,6 @@ var statSnapshotCmd = &cli.Command{ blocksToWalk := ts.Cids() startHeight := ts.Height() - fmt.Printf("starting height %d\n", startHeight) snapshotStateLimit := abi.ChainEpoch(2000) churnActorCache := cid.NewSet() @@ -539,40 +549,26 @@ var statSnapshotCmd = &cli.Command{ } } return nil - }) + } - for w := 0; w < numWorkers; w++ { - id += w - workerWg2.Add(1) - eg.Go(func() error { - defer workerWg2.Done() - return worker(egctx, id, jobCh, resultCh) + if err := pipeline(ctx, "Churn, Headers, Messages", numWorkers, createJobsStage3, worker, processResults); err != nil { + return err + } + + // step 1 clean things up and get a nice abstraction to reuse + // Stage 4 walk all actor HAMTs for churn + + createJobsStage4 := func(ctx context.Context, jobCh chan job, _ chan result) error { + return churnStateRoots.ForEach(func(c cid.Cid) error { + jobCh <- job{c: c, key: "/statetree/churn"} + return nil }) } - // Close result channel when workers are done sending to it. - eg.Go(func() error { - workerWg2.Wait() - close(resultCh) - return nil - }) - - eg.Go(func() error { - for result := range resultCh { - if stat, ok := summary[result.key]; ok { - summary[result.key] = combine(stat, result.stats) - } else { - summary[result.key] = result.stats - } - } - return nil - }) - if err := eg.Wait(); err != nil { - return fmt.Errorf("failed to measure space in snapshot churn: %w", err) + if err := pipeline(ctx, "Churn HAMT", numWorkers, createJobsStage4, worker, processResults); err != nil { + return err } - // Stage 4 walk all actor HAMTs for churn - if cctx.Bool("pretty") { DumpSnapshotStats(summary) } else {