Refactor for cleanup + measure top level HAMT churn

This commit is contained in:
zenground0 2023-06-27 17:57:00 -06:00
parent 77ea7ef90d
commit 0cfdc9b5b6

View File

@ -6,11 +6,15 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"net/http"
"path" "path"
"reflect" "reflect"
"sort" "sort"
"sync" "sync"
_ "net/http"
_ "net/http/pprof"
"github.com/docker/go-units" "github.com/docker/go-units"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
@ -271,6 +275,52 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error)
return &handle, nil return &handle, nil
} }
func pipeline(ctx context.Context, name string, numWorkers int, createJobs func(ctx context.Context, jobCh chan job, resultCh chan result) error,
worker func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error,
processResults func(ctx context.Context, resultCh chan result) error) error {
eg, egctx := errgroup.WithContext(ctx)
jobCh := make(chan job, numWorkers)
resultCh := make(chan result)
var resultWriterWg sync.WaitGroup
resultWriterWg.Add(1)
eg.Go(func() error {
defer resultWriterWg.Done()
defer func() {
close(jobCh)
}()
return createJobs(ctx, jobCh, resultCh)
})
var id int
for w := 0; w < numWorkers; w++ {
id = w
resultWriterWg.Add(1)
eg.Go(func() error {
defer resultWriterWg.Done()
return worker(egctx, id, jobCh, resultCh)
})
}
eg.Go(func() error {
return processResults(ctx, resultCh)
})
// close result channel when workers are done sending to it.
eg.Go(func() error {
resultWriterWg.Wait()
close(resultCh)
return nil
})
if err := eg.Wait(); err != nil {
return fmt.Errorf("failed pipeline %s: %w", name, err)
}
return nil
}
var statSnapshotCmd = &cli.Command{ var statSnapshotCmd = &cli.Command{
Name: "stat-snapshot", Name: "stat-snapshot",
Usage: "calculates the space usage of a snapshot taken from the given tipset", Usage: "calculates the space usage of a snapshot taken from the given tipset",
@ -314,20 +364,18 @@ var statSnapshotCmd = &cli.Command{
if err != nil { if err != nil {
return err return err
} }
go func() {
fmt.Println(http.ListenAndServe("localhost:6060", nil))
}()
numWorkers := cctx.Int("workers") numWorkers := cctx.Int("workers")
dagCacheSize := cctx.Int("dag-cache-size") dagCacheSize := cctx.Int("dag-cache-size")
eg, egctx := errgroup.WithContext(ctx)
jobCh := make(chan job, numWorkers)
resultCh := make(chan result)
cidCh := make(chan cidCall, numWorkers) cidCh := make(chan cidCall, numWorkers)
summary := make(map[string]api.ObjStat) summary := make(map[string]api.ObjStat)
// snapshot root objects with no additional bytes or links // snapshot root objects with no additional bytes or links
summary["/"] = api.ObjStat{Size: 0, Links: 0} summary["/"] = api.ObjStat{Size: 0, Links: 0}
summary["/statetree"] = api.ObjStat{Size: 0, Links: 0} summary["/statetree"] = api.ObjStat{Size: 0, Links: 0}
summary["/statetree/churn"] = api.ObjStat{Size: 0, Links: 0} // XXX this will be calculated
combine := func(statsA, statsB api.ObjStat) api.ObjStat { combine := func(statsA, statsB api.ObjStat) api.ObjStat {
return api.ObjStat{ return api.ObjStat{
@ -335,7 +383,6 @@ var statSnapshotCmd = &cli.Command{
Links: statsA.Links + statsB.Links, Links: statsA.Links + statsB.Links,
} }
} }
var workerWg sync.WaitGroup
// Threadsafe cid set lives across different pipelines so not part of error group // Threadsafe cid set lives across different pipelines so not part of error group
go func() error { go func() error {
@ -356,13 +403,8 @@ var statSnapshotCmd = &cli.Command{
out := <-ch out := <-ch
return out return out
} }
// Stage 1 walk all actors in latest state root // Stage 1 walk all actors in latest state root
eg.Go(func() error { createJobsStage1 := func(ctx context.Context, jobCh chan job, _ chan result) error {
defer func() {
close(jobCh)
}()
st, err := h.sm.StateTree(ts.ParentState()) st, err := h.sm.StateTree(ts.ParentState())
if err != nil { if err != nil {
return err return err
@ -378,7 +420,7 @@ var statSnapshotCmd = &cli.Command{
return nil return nil
}) })
}) }
worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error { worker := func(ctx context.Context, id int, jobCh chan job, resultCh chan result) error {
var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
@ -405,24 +447,8 @@ var statSnapshotCmd = &cli.Command{
} }
return nil return nil
} }
var id int
for w := 0; w < numWorkers; w++ {
id = w
workerWg.Add(1)
eg.Go(func() error {
defer workerWg.Done()
return worker(egctx, id, jobCh, resultCh)
})
}
// Close result channel when workers are done sending to it. processResults := func(ctx context.Context, resultCh chan result) error {
eg.Go(func() error {
workerWg.Wait()
close(resultCh)
return nil
})
eg.Go(func() error {
for result := range resultCh { for result := range resultCh {
if stat, ok := summary[result.key]; ok { if stat, ok := summary[result.key]; ok {
summary[result.key] = combine(stat, result.stats) summary[result.key] = combine(stat, result.stats)
@ -432,45 +458,30 @@ var statSnapshotCmd = &cli.Command{
} }
} }
return nil return nil
}) }
if err := eg.Wait(); err != nil { if err := pipeline(ctx, "Latest State Actors", numWorkers, createJobsStage1, worker, processResults); err != nil {
return fmt.Errorf("failed to measure space in latest state root: %w", err) return err
} }
// Stage 2: walk the top of the latest state root // Stage 2: walk the top of the latest state root
id += 1 createJobsStage2 := func(ctx context.Context, jobCh chan job, _ chan result) error {
resultCh = make(chan result)
jobCh = make(chan job)
go func() {
defer close(jobCh)
jobCh <- job{c: ts.ParentState(), key: "/statetree/latest"} jobCh <- job{c: ts.ParentState(), key: "/statetree/latest"}
}() return nil
go func() { }
defer close(resultCh)
worker(ctx, id, jobCh, resultCh) if err := pipeline(ctx, "Latest State HAMT", numWorkers, createJobsStage2, worker, processResults); err != nil {
}() return err
res := <-resultCh }
summary[res.key] = res.stats
// Stage 3 walk the rest of the chain: headers, messages, churn // Stage 3 walk the rest of the chain: headers, messages, churn
// ordering: // ordering:
// for each header send jobs for messages, receipts, state tree churn // for each header send jobs for messages, receipts, state tree churn
// don't walk header directly as it would just walk everything including parent tipsets // don't walk header directly as it would just walk everything including parent tipsets
eg, egctx = errgroup.WithContext(ctx)
jobCh = make(chan job, numWorkers)
resultCh = make(chan result)
var workerWg2 sync.WaitGroup
churnStateRoots := cid.NewSet() churnStateRoots := cid.NewSet()
workerWg2.Add(1) // goroutine creating jobs also sends to result channel createJobsStage3 := func(ctx context.Context, jobCh chan job, resultCh chan result) error {
eg.Go(func() error {
defer func() {
close(jobCh)
workerWg2.Done()
}()
// walk chain // walk chain
var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs))) var dag format.NodeGetter = merkledag.NewDAGService(blockservice.New(h.bs, offline.Exchange(h.bs)))
if dagCacheSize != 0 { if dagCacheSize != 0 {
var err error var err error
@ -482,7 +493,6 @@ var statSnapshotCmd = &cli.Command{
blocksToWalk := ts.Cids() blocksToWalk := ts.Cids()
startHeight := ts.Height() startHeight := ts.Height()
fmt.Printf("starting height %d\n", startHeight)
snapshotStateLimit := abi.ChainEpoch(2000) snapshotStateLimit := abi.ChainEpoch(2000)
churnActorCache := cid.NewSet() churnActorCache := cid.NewSet()
@ -539,40 +549,26 @@ var statSnapshotCmd = &cli.Command{
} }
} }
return nil return nil
}) }
for w := 0; w < numWorkers; w++ { if err := pipeline(ctx, "Churn, Headers, Messages", numWorkers, createJobsStage3, worker, processResults); err != nil {
id += w return err
workerWg2.Add(1) }
eg.Go(func() error {
defer workerWg2.Done() // step 1 clean things up and get a nice abstraction to reuse
return worker(egctx, id, jobCh, resultCh) // Stage 4 walk all actor HAMTs for churn
createJobsStage4 := func(ctx context.Context, jobCh chan job, _ chan result) error {
return churnStateRoots.ForEach(func(c cid.Cid) error {
jobCh <- job{c: c, key: "/statetree/churn"}
return nil
}) })
} }
// Close result channel when workers are done sending to it. if err := pipeline(ctx, "Churn HAMT", numWorkers, createJobsStage4, worker, processResults); err != nil {
eg.Go(func() error { return err
workerWg2.Wait()
close(resultCh)
return nil
})
eg.Go(func() error {
for result := range resultCh {
if stat, ok := summary[result.key]; ok {
summary[result.key] = combine(stat, result.stats)
} else {
summary[result.key] = result.stats
}
}
return nil
})
if err := eg.Wait(); err != nil {
return fmt.Errorf("failed to measure space in snapshot churn: %w", err)
} }
// Stage 4 walk all actor HAMTs for churn
if cctx.Bool("pretty") { if cctx.Bool("pretty") {
DumpSnapshotStats(summary) DumpSnapshotStats(summary)
} else { } else {