package main import ( "bufio" "context" "encoding/json" "fmt" "io" "io/ioutil" "math" "net/http" _ "net/http/pprof" "os" "runtime" "runtime/pprof" "sort" "time" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" _ "github.com/filecoin-project/lotus/lib/sigs/bls" _ "github.com/filecoin-project/lotus/lib/sigs/secp" "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/specs-actors/actors/abi" "golang.org/x/xerrors" "github.com/ipfs/go-datastore" badger "github.com/ipfs/go-ds-badger2" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/urfave/cli/v2" ) type TipSetExec struct { TipSet types.TipSetKey Trace []*api.InvocResult Duration time.Duration } var importBenchCmd = &cli.Command{ Name: "import", Usage: "benchmark chain import and validation", Subcommands: []*cli.Command{ importAnalyzeCmd, }, Flags: []cli.Flag{ &cli.Int64Flag{ Name: "height", Usage: "halt validation after given height", }, &cli.IntFlag{ Name: "batch-seal-verify-threads", Usage: "set the parallelism factor for batch seal verification", Value: runtime.NumCPU(), }, }, Action: func(cctx *cli.Context) error { vm.BatchSealVerifyParallelism = cctx.Int("batch-seal-verify-threads") if !cctx.Args().Present() { fmt.Println("must pass car file of chain to benchmark importing") return nil } cfi, err := os.Open(cctx.Args().First()) if err != nil { return err } defer cfi.Close() //nolint:errcheck // read only file tdir, err := ioutil.TempDir("", "lotus-import-bench") if err != nil { return err } bds, err := badger.NewDatastore(tdir, nil) if err != nil { return err } bs := blockstore.NewBlockstore(bds) cbs, err := blockstore.CachedBlockstore(context.TODO(), bs, blockstore.DefaultCacheOpts()) if err != nil { return err } bs = cbs ds := datastore.NewMapDatastore() cs := store.NewChainStore(bs, ds, vm.Syscalls(ffiwrapper.ProofVerifier)) stm := stmgr.NewStateManager(cs) prof, err := os.Create("import-bench.prof") if err != nil { return err } defer prof.Close() //nolint:errcheck if err := pprof.StartCPUProfile(prof); err != nil { return err } head, err := cs.Import(cfi) if err != nil { return err } if h := cctx.Int64("height"); h != 0 { tsh, err := cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), head, true) if err != nil { return err } head = tsh } ts := head tschain := []*types.TipSet{ts} for ts.Height() != 0 { next, err := cs.LoadTipSet(ts.Parents()) if err != nil { return err } tschain = append(tschain, next) ts = next } ibj, err := os.Create("import-bench.json") if err != nil { return err } defer ibj.Close() //nolint:errcheck enc := json.NewEncoder(ibj) var lastTse *TipSetExec lastState := tschain[len(tschain)-1].ParentState() for i := len(tschain) - 2; i >= 0; i-- { cur := tschain[i] log.Infof("computing state (height: %d, ts=%s)", cur.Height(), cur.Cids()) if cur.ParentState() != lastState { lastTrace := lastTse.Trace d, err := json.MarshalIndent(lastTrace, "", " ") if err != nil { panic(err) } fmt.Println("TRACE") fmt.Println(string(d)) return xerrors.Errorf("tipset chain had state mismatch at height %d (%s != %s)", cur.Height(), cur.ParentState(), lastState) } start := time.Now() st, trace, err := stm.ExecutionTrace(context.TODO(), cur) if err != nil { return err } stripCallers(trace) lastTse = &TipSetExec{ TipSet: cur.Key(), Trace: trace, Duration: time.Since(start), } lastState = st if err := enc.Encode(lastTse); err != nil { return xerrors.Errorf("failed to write out tipsetexec: %w", err) } } pprof.StopCPUProfile() return nil }, } func walkExecutionTrace(et *types.ExecutionTrace) { for _, gc := range et.GasCharges { gc.Callers = nil } for _, sub := range et.Subcalls { walkExecutionTrace(&sub) //nolint:scopelint,gosec } } func stripCallers(trace []*api.InvocResult) { for _, t := range trace { walkExecutionTrace(&t.ExecutionTrace) } } type Invocation struct { TipSet types.TipSetKey Invoc *api.InvocResult } const GasPerNs = 10 func countGasCosts(et *types.ExecutionTrace) (int64, int64) { var cgas, vgas int64 for _, gc := range et.GasCharges { cgas += gc.ComputeGas vgas += gc.VirtualComputeGas } for _, sub := range et.Subcalls { c, v := countGasCosts(&sub) cgas += c vgas += v } return cgas, vgas } func compStats(vals []float64) (float64, float64) { var sum float64 for _, v := range vals { sum += v } av := sum / float64(len(vals)) var varsum float64 for _, v := range vals { delta := av - v varsum += delta * delta } return av, math.Sqrt(varsum / float64(len(vals))) } type stats struct { timeTaken meanVar gasRatio meanVar extraCovar *covar } type covar struct { meanX float64 meanY float64 c float64 n float64 m2x float64 m2y float64 } func (cov1 *covar) Covariance() float64 { return cov1.c / (cov1.n - 1) } func (cov1 *covar) VarianceX() float64 { return cov1.m2x / (cov1.n - 1) } func (v1 *covar) StddevX() float64 { return math.Sqrt(v1.VarianceX()) } func (cov1 *covar) VarianceY() float64 { return cov1.m2y / (cov1.n - 1) } func (v1 *covar) StddevY() float64 { return math.Sqrt(v1.VarianceY()) } func (cov1 *covar) AddPoint(x, y float64) { cov1.n += 1 dx := x - cov1.meanX cov1.meanX += dx / cov1.n dx2 := x - cov1.meanX cov1.m2x += dx * dx2 dy := y - cov1.meanY cov1.meanY += dy / cov1.n dy2 := y - cov1.meanY cov1.m2y += dy * dy2 cov1.c += dx * dy } func (cov1 *covar) Combine(cov2 *covar) { if cov1.n == 0 { *cov1 = *cov2 return } if cov2.n == 0 { return } if cov1.n == 1 { cpy := *cov2 cpy.AddPoint(cov2.meanX, cov2.meanY) *cov1 = cpy return } if cov2.n == 1 { cov1.AddPoint(cov2.meanX, cov2.meanY) } out := covar{} out.n = cov1.n + cov2.n dx := cov1.meanX - cov2.meanX out.meanX = cov1.meanX - dx*cov2.n/out.n out.m2x = cov1.m2x + cov2.m2x + dx*dx*cov1.n*cov2.n/out.n dy := cov1.meanY - cov2.meanY out.meanY = cov1.meanY - dy*cov2.n/out.n out.m2y = cov1.m2y + cov2.m2y + dy*dy*cov1.n*cov2.n/out.n out.c = cov1.c + cov2.c + dx*dy*cov1.n*cov2.n/out.n *cov1 = out } func (cov1 *covar) A() float64 { return cov1.Covariance() / cov1.VarianceX() } func (cov1 *covar) B() float64 { return cov1.meanY - cov1.meanX*cov1.A() } func (cov1 *covar) Correl() float64 { return cov1.Covariance() / cov1.StddevX() / cov1.StddevY() } type meanVar struct { n float64 mean float64 m2 float64 } func (v1 *meanVar) AddPoint(value float64) { // based on https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm v1.n += 1 delta := value - v1.mean v1.mean += delta / v1.n delta2 := value - v1.mean v1.m2 += delta * delta2 } func (v1 *meanVar) Variance() float64 { return v1.m2 / (v1.n - 1) } func (v1 *meanVar) Mean() float64 { return v1.mean } func (v1 *meanVar) Stddev() float64 { return math.Sqrt(v1.Variance()) } func (v1 *meanVar) Combine(v2 *meanVar) { if v1.n == 0 { *v1 = *v2 return } if v2.n == 0 { return } if v1.n == 1 { cpy := *v2 cpy.AddPoint(v1.mean) *v1 = cpy return } if v2.n == 1 { v1.AddPoint(v2.mean) return } newCount := v1.n + v2.n delta := v2.mean - v1.mean meanDelta := delta * v2.n / newCount m2 := v1.m2 + v2.m2 + delta*meanDelta*v1.n v1.n = newCount v1.mean += meanDelta v1.m2 = m2 } func getExtras(ex interface{}) (*string, *float64) { if t, ok := ex.(string); ok { return &t, nil } if size, ok := ex.(float64); ok { return nil, &size } if exMap, ok := ex.(map[string]interface{}); ok { t, tok := exMap["type"].(string) size, sok := exMap["size"].(float64) if tok && sok { return &t, &size } if tok { return &t, nil } if sok { return nil, &size } return nil, nil } return nil, nil } func tallyGasCharges(charges map[string]*stats, et types.ExecutionTrace) { for i, gc := range et.GasCharges { name := gc.Name if name == "OnIpldGetStart" { continue } tt := float64(gc.TimeTaken.Nanoseconds()) if name == "OnIpldGet" { prev := et.GasCharges[i-1] if prev.Name != "OnIpldGetStart" { log.Warn("OnIpldGet without OnIpldGetStart") } tt += float64(prev.TimeTaken.Nanoseconds()) } eType, eSize := getExtras(gc.Extra) if eType != nil { name += "-" + *eType } compGas := gc.VirtualComputeGas if compGas == 0 { compGas = 1 } s := charges[name] if s == nil { s = new(stats) charges[name] = s } if eSize != nil { if s.extraCovar == nil { s.extraCovar = &covar{} } s.extraCovar.AddPoint(*eSize, tt) } s.timeTaken.AddPoint(tt) ratio := tt / float64(compGas) * GasPerNs s.gasRatio.AddPoint(ratio) } for _, sub := range et.Subcalls { tallyGasCharges(charges, sub) } } var importAnalyzeCmd = &cli.Command{ Name: "analyze", Action: func(cctx *cli.Context) error { if !cctx.Args().Present() { fmt.Println("must pass bench file to analyze") return nil } go func() { http.ListenAndServe("localhost:6060", nil) }() fi, err := os.Open(cctx.Args().First()) if err != nil { return err } const nWorkers = 16 jsonIn := make(chan []byte, 2*nWorkers) type result struct { totalTime time.Duration chargeStats map[string]*stats expensiveInvocs []Invocation } results := make(chan result, nWorkers) for i := 0; i < nWorkers; i++ { go func() { chargeStats := make(map[string]*stats) var totalTime time.Duration const invocsKeep = 32 var expensiveInvocs = make([]Invocation, 0, 8*invocsKeep) var leastExpensiveInvoc = time.Duration(0) for { b, ok := <-jsonIn if !ok { results <- result{ totalTime: totalTime, chargeStats: chargeStats, expensiveInvocs: expensiveInvocs, } return } var tse TipSetExec err := json.Unmarshal(b, &tse) if err != nil { log.Warnf("error unmarshaling tipset: %+v", err) continue } totalTime += tse.Duration for _, inv := range tse.Trace { if inv.Duration > leastExpensiveInvoc { expensiveInvocs = append(expensiveInvocs, Invocation{ TipSet: tse.TipSet, Invoc: inv, }) } tallyGasCharges(chargeStats, inv.ExecutionTrace) } if len(expensiveInvocs) > 4*invocsKeep { sort.Slice(expensiveInvocs, func(i, j int) bool { return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration }) leastExpensiveInvoc = expensiveInvocs[len(expensiveInvocs)-1].Invoc.Duration n := 30 if len(expensiveInvocs) < n { n = len(expensiveInvocs) } expensiveInvocs = expensiveInvocs[:n] } } }() } var totalTipsets int64 reader := bufio.NewReader(fi) for { b, err := reader.ReadBytes('\n') if err != nil && err != io.EOF { if e, ok := err.(*json.SyntaxError); ok { log.Warnf("syntax error at byte offset %d", e.Offset) } return err } totalTipsets++ jsonIn <- b fmt.Fprintf(os.Stderr, "\rProcessed %d tipsets", totalTipsets) if err == io.EOF { break } } close(jsonIn) fmt.Fprintf(os.Stderr, "\n") fmt.Fprintf(os.Stderr, "Collecting results\n") var invocs []Invocation var totalTime time.Duration var keys []string var charges = make(map[string]*stats) for i := 0; i < nWorkers; i++ { fmt.Fprintf(os.Stderr, "\rProcessing results from worker %d/%d", i+1, nWorkers) res := <-results invocs = append(invocs, res.expensiveInvocs...) for k, v := range res.chargeStats { s := charges[k] if s == nil { s = new(stats) charges[k] = s } s.timeTaken.Combine(&v.timeTaken) s.gasRatio.Combine(&v.gasRatio) if v.extraCovar != nil { if s.extraCovar == nil { s.extraCovar = &covar{} } s.extraCovar.Combine(v.extraCovar) } } totalTime += res.totalTime } fmt.Fprintf(os.Stderr, "\nCollecting gas keys\n") for k := range charges { keys = append(keys, k) } fmt.Println("Gas Price Deltas") sort.Strings(keys) for _, k := range keys { s := charges[k] fmt.Printf("%s: incr by %.4f~%.4f; tt %.4f~%.4f\n", k, s.gasRatio.Mean(), s.gasRatio.Stddev(), s.timeTaken.Mean(), s.timeTaken.Stddev()) if s.extraCovar != nil { fmt.Printf("\t correll: %.2f, tt = %.2f * extra + %.2f\n", s.extraCovar.Correl(), s.extraCovar.A(), s.extraCovar.B()) fmt.Printf("\t covar: %.2f, extra: %.2f~%.2f, tt2: %.2f~%.2f, count %.0f\n", s.extraCovar.Covariance(), s.extraCovar.meanX, s.extraCovar.StddevX(), s.extraCovar.meanY, s.extraCovar.StddevY(), s.extraCovar.n) } } sort.Slice(invocs, func(i, j int) bool { return invocs[i].Invoc.Duration > invocs[j].Invoc.Duration }) fmt.Println("Total time: ", totalTime) fmt.Println("Average time per epoch: ", totalTime/time.Duration(totalTipsets)) n := 30 if len(invocs) < n { n = len(invocs) } fmt.Printf("Top %d most expensive calls:\n", n) for i := 0; i < n; i++ { inv := invocs[i].Invoc fmt.Printf("%s: %s %s %d %s\n", inv.Duration, inv.Msg.From, inv.Msg.To, inv.Msg.Method, invocs[i].TipSet) } return nil }, }