lotus/cmd/lotus-bench/import.go
2023-11-15 13:06:51 +01:00

840 lines
22 KiB
Go

package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"runtime/pprof"
"sort"
"time"
ocprom "contrib.go.opencensus.io/exporter/prometheus"
bdg "github.com/dgraph-io/badger/v2"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
badger "github.com/ipfs/go-ds-badger2"
measure "github.com/ipfs/go-ds-measure"
metricsprometheus "github.com/ipfs/go-metrics-prometheus"
"github.com/ipld/go-car"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/blockstore"
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
lcli "github.com/filecoin-project/lotus/cli"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/delegated"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// TipSetExec is the per-tipset record produced by the validation benchmark.
// When --export-traces is enabled, one TipSetExec is JSON-encoded per line
// into bench.json; the "analyze" subcommand later decodes those lines.
type TipSetExec struct {
// TipSet is the key of the tipset whose state was computed.
TipSet types.TipSetKey
// Trace holds the invocation results returned by the state manager's
// ExecutionTrace call for this tipset.
Trace []*api.InvocResult
// Duration is the wall-clock time measured around the ExecutionTrace call.
Duration time.Duration
}
// importBenchCmd implements `lotus-bench import`. It runs in up to two phases:
//
//  1. IMPORT: loads a chain from a CAR file into a fresh (or supplied)
//     blockstore, optionally under a CPU profile (bench.import.pprof).
//  2. VALIDATION: walks back from the end tipset to the start tipset and
//     re-executes every tipset in chain order, checking that the computed
//     state root matches the parent state recorded by the following tipset.
//     Traces are optionally written to bench.json and a CPU profile to
//     bench.validation.pprof.
//
// On exit it additionally dumps the Prometheus metrics (bench.metrics) and
// heap/allocs profiles on a best-effort basis.
var importBenchCmd = &cli.Command{
	Name:  "import",
	Usage: "Benchmark chain import and validation",
	Subcommands: []*cli.Command{
		importAnalyzeCmd,
	},
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "start-tipset",
			Usage: "start validation at the given tipset key; in format cid1,cid2,cid3...",
		},
		&cli.StringFlag{
			Name:  "end-tipset",
			Usage: "halt validation at the given tipset key; in format cid1,cid2,cid3...",
		},
		&cli.StringFlag{
			Name:  "genesis-tipset",
			Usage: "genesis tipset key; in format cid1,cid2,cid3...",
		},
		&cli.Int64Flag{
			Name:  "start-height",
			Usage: "start validation at given height; beware that chain traversal by height is very slow",
		},
		&cli.Int64Flag{
			Name:  "end-height",
			Usage: "halt validation after given height; beware that chain traversal by height is very slow",
		},
		&cli.IntFlag{
			Name:  "batch-seal-verify-threads",
			Usage: "set the parallelism factor for batch seal verification",
			Value: runtime.NumCPU(),
		},
		&cli.StringFlag{
			Name:  "repodir",
			Usage: "set the repo directory for the lotus bench run (defaults to /tmp)",
		},
		&cli.StringFlag{
			Name:  "syscall-cache",
			Usage: "read and write syscall results from datastore",
		},
		&cli.BoolFlag{
			Name:  "export-traces",
			Usage: "should we export execution traces",
			Value: true,
		},
		&cli.BoolFlag{
			Name:  "no-import",
			Usage: "should we import the chain? if set to true chain has to be previously imported",
		},
		&cli.BoolFlag{
			Name:  "global-profile",
			Value: true,
		},
		&cli.BoolFlag{
			Name: "only-import",
		},
		&cli.BoolFlag{
			Name: "use-native-badger",
		},
		&cli.StringFlag{
			Name: "car",
			Usage: "path to CAR file; required for import; on validation, either " +
				"a CAR path or the --head flag are required",
		},
		&cli.StringFlag{
			Name: "head",
			Usage: "tipset key of the head, useful when benchmarking validation " +
				"on an existing chain store, where a CAR is not available; " +
				"if both --car and --head are provided, --head takes precedence " +
				"over the CAR root; the format is cid1,cid2,cid3...",
		},
	},
	Action: func(cctx *cli.Context) error {
		metricsprometheus.Inject() //nolint:errcheck
		vm.BatchSealVerifyParallelism = cctx.Int("batch-seal-verify-threads")

		// Serve metrics (and, via the net/http/pprof import, pprof) on
		// localhost:6060 for the lifetime of the process.
		go func() {
			// Prometheus globals are exposed as interfaces, but the prometheus
			// OpenCensus exporter expects a concrete *Registry. The concrete type of
			// the globals are actually *Registry, so we downcast them, staying
			// defensive in case things change under the hood.
			registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry)
			if !ok {
				log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", prometheus.DefaultRegisterer)
				return
			}
			exporter, err := ocprom.NewExporter(ocprom.Options{
				Registry:  registry,
				Namespace: "lotus",
			})
			if err != nil {
				log.Fatalf("could not create the prometheus stats exporter: %v", err)
			}

			http.Handle("/debug/metrics", exporter)

			_ = http.ListenAndServe("localhost:6060", nil)
		}()

		// Pick the repo directory: the user-supplied one, or a fresh temp dir.
		var tdir string
		if rdir := cctx.String("repodir"); rdir != "" {
			tdir = rdir
		} else {
			tmp, err := os.MkdirTemp("", "lotus-import-bench")
			if err != nil {
				return err
			}
			tdir = tmp
		}

		var (
			ds  datastore.Batching
			bs  blockstore.Blockstore
			err error
		)

		switch {
		case cctx.Bool("use-native-badger"):
			log.Info("using native badger")
			var opts badgerbs.Options
			if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false); err != nil {
				return err
			}
			opts.SyncWrites = false
			bs, err = badgerbs.Open(opts)

		default: // legacy badger via datastore.
			log.Info("using legacy badger")
			bdgOpt := badger.DefaultOptions
			bdgOpt.GcInterval = 0
			bdgOpt.Options = bdg.DefaultOptions("")
			bdgOpt.Options.SyncWrites = false
			bdgOpt.Options.Truncate = true
			bdgOpt.Options.DetectConflicts = false

			ds, err = badger.NewDatastore(tdir, &bdgOpt)
		}

		if err != nil {
			return err
		}

		// In the legacy path the blockstore is layered on top of an
		// instrumented datastore.
		if ds != nil {
			ds = measure.New("dsbench", ds)
			defer ds.Close() //nolint:errcheck
			bs = blockstore.FromDatastore(ds)
		}

		if c, ok := bs.(io.Closer); ok {
			defer c.Close() //nolint:errcheck
		}

		var verifier storiface.Verifier = ffiwrapper.ProofVerifier
		if cctx.IsSet("syscall-cache") {
			scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions)
			if err != nil {
				return xerrors.Errorf("opening syscall-cache datastore: %w", err)
			}
			defer scds.Close() //nolint:errcheck

			verifier = &cachingVerifier{
				ds:      scds,
				backend: verifier,
			}
		}

		// NOTE(review): no "only-gc" flag is declared on this command, so this
		// lookup is expected to always yield false — confirm and remove.
		if cctx.Bool("only-gc") {
			return nil
		}

		metadataDs := datastore.NewMapDatastore()
		cs := store.NewChainStore(bs, bs, metadataDs, filcns.Weight, nil)
		defer cs.Close() //nolint:errcheck

		// TODO: We need to supply the actual beacon after v14
		stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil, metadataDs, index.DummyMsgIndex)
		if err != nil {
			return err
		}

		var carFile *os.File
		// open the CAR file if one is provided.
		if path := cctx.String("car"); path != "" {
			var err error
			if carFile, err = os.Open(path); err != nil {
				return xerrors.Errorf("failed to open provided CAR file: %w", err)
			}
			// Release the file handle once the command finishes.
			defer carFile.Close() //nolint:errcheck
		}

		startTime := time.Now()

		// register a gauge that reports how long since the measurable
		// operation began.
		promauto.NewGaugeFunc(prometheus.GaugeOpts{
			Name: "lotus_bench_time_taken_secs",
		}, func() float64 {
			return time.Since(startTime).Seconds()
		})

		// On exit, persist the metrics and memory profiles. Everything in
		// here is best effort: failures are logged and never abort the run.
		defer func() {
			end := time.Now().Format(time.RFC3339)

			// Scrape our own metrics endpoint. The response is only touched
			// when the request succeeded; a failed scrape must not panic.
			if resp, err := http.Get("http://localhost:6060/debug/metrics"); err != nil {
				log.Warnf("failed to scape prometheus: %s", err)
			} else {
				defer resp.Body.Close() //nolint:errcheck

				if metricsfi, err := os.Create("bench.metrics"); err != nil {
					log.Warnf("failed to write prometheus data: %s", err)
				} else {
					if _, err := io.Copy(metricsfi, resp.Body); err != nil {
						log.Warnf("failed to write prometheus data: %s", err)
					}
					_ = metricsfi.Close() //nolint:errcheck
				}
			}

			writeProfile := func(name string) {
				if file, err := os.Create(fmt.Sprintf("%s.%s.%s.pprof", name, startTime.Format(time.RFC3339), end)); err == nil {
					if err := pprof.Lookup(name).WriteTo(file, 0); err != nil {
						log.Warnf("failed to write %s pprof: %s", name, err)
					}
					_ = file.Close()
				} else {
					log.Warnf("failed to create %s pprof file: %s", name, err)
				}
			}

			writeProfile("heap")
			writeProfile("allocs")
		}()

		var head *types.TipSet
		// --- IMPORT ---
		if !cctx.Bool("no-import") {
			// import is NOT suppressed; do it.
			// Validate the precondition before any profiling output is created.
			if carFile == nil { // a CAR is compulsory for the import.
				return fmt.Errorf("no CAR file provided for import")
			}

			if cctx.Bool("global-profile") {
				prof, err := os.Create("bench.import.pprof")
				if err != nil {
					return err
				}
				defer prof.Close() //nolint:errcheck

				if err := pprof.StartCPUProfile(prof); err != nil {
					return err
				}
				// Guarantee the profile is flushed even if the import fails;
				// stopping an already-stopped profile is a no-op.
				defer pprof.StopCPUProfile()
			}

			head, err = cs.Import(cctx.Context, carFile)
			if err != nil {
				return err
			}

			// Stop now so the validation phase can start its own profile.
			pprof.StopCPUProfile()
		}

		if cctx.Bool("only-import") {
			return nil
		}

		// --- VALIDATION ---
		//
		// we are now preparing for the validation benchmark.
		// a HEAD needs to be set; --head takes precedence over the root
		// of the CAR, if both are provided.
		if h := cctx.String("head"); h != "" {
			cids, err := lcli.ParseTipSetString(h)
			if err != nil {
				return xerrors.Errorf("failed to parse head tipset key: %w", err)
			}
			head, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
			if err != nil {
				return err
			}
		} else if carFile != nil && head == nil {
			cr, err := car.NewCarReader(carFile)
			if err != nil {
				return err
			}
			head, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cr.Header.Roots...))
			if err != nil {
				return err
			}
		} else if h == "" && carFile == nil {
			return xerrors.Errorf("neither --car nor --head flags supplied")
		}

		log.Infof("chain head is tipset: %s", head.Key())

		var genesis *types.TipSet
		log.Infof("getting genesis block")
		if tsk := cctx.String("genesis-tipset"); tsk != "" {
			var cids []cid.Cid
			if cids, err = lcli.ParseTipSetString(tsk); err != nil {
				return xerrors.Errorf("failed to parse genesis tipset key: %w", err)
			}
			genesis, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
		} else {
			log.Warnf("getting genesis by height; this will be slow; pass in the genesis tipset through --genesis-tipset")
			// fallback to the slow path of walking the chain.
			genesis, err = cs.GetTipsetByHeight(context.TODO(), 0, head, true)
		}

		if err != nil {
			return err
		}

		if err = cs.SetGenesis(cctx.Context, genesis.Blocks()[0]); err != nil {
			return err
		}

		// Resolve the end tipset, falling back to head if not provided.
		end := head
		if tsk := cctx.String("end-tipset"); tsk != "" {
			var cids []cid.Cid
			if cids, err = lcli.ParseTipSetString(tsk); err != nil {
				return xerrors.Errorf("failed to parse end tipset key: %w", err)
			}
			end, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
		} else if h := cctx.Int64("end-height"); h != 0 {
			log.Infof("getting end tipset at height %d...", h)
			end, err = cs.GetTipsetByHeight(cctx.Context, abi.ChainEpoch(h), head, true)
		}

		if err != nil {
			return err
		}

		// Resolve the start tipset, if provided; otherwise, fallback to
		// height 1 for a start point.
		var (
			startEpoch = abi.ChainEpoch(1)
			start      *types.TipSet
		)

		if tsk := cctx.String("start-tipset"); tsk != "" {
			var cids []cid.Cid
			if cids, err = lcli.ParseTipSetString(tsk); err != nil {
				return xerrors.Errorf("failed to parse start tipset key: %w", err)
			}
			start, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
		} else if h := cctx.Int64("start-height"); h != 0 {
			log.Infof("getting start tipset at height %d...", h)
			// lookback from the end tipset (which falls back to head if not supplied).
			start, err = cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), end, true)
		}

		if err != nil {
			return err
		}

		if start != nil {
			startEpoch = start.Height()
			if err := cs.ForceHeadSilent(cctx.Context, start); err != nil {
				// if err := cs.SetHead(start); err != nil {
				return err
			}
		}

		// Collect the tipsets from end back to startEpoch; index 0 is the
		// end tipset and the last element is the earliest one.
		inverseChain := append(make([]*types.TipSet, 0, end.Height()), end)
		for ts := end; ts.Height() > startEpoch; {
			if h := ts.Height(); h%100 == 0 {
				log.Infof("walking back the chain; loaded tipset at height %d...", h)
			}
			next, err := cs.LoadTipSet(cctx.Context, ts.Parents())
			if err != nil {
				return err
			}

			inverseChain = append(inverseChain, next)
			ts = next
		}

		var enc *json.Encoder
		if cctx.Bool("export-traces") {
			ibj, err := os.Create("bench.json")
			if err != nil {
				return err
			}
			defer ibj.Close() //nolint:errcheck

			enc = json.NewEncoder(ibj)
		}

		if cctx.Bool("global-profile") {
			prof, err := os.Create("bench.validation.pprof")
			if err != nil {
				return err
			}
			defer prof.Close() //nolint:errcheck

			if err := pprof.StartCPUProfile(prof); err != nil {
				return err
			}
			// Flush the profile on every exit path, including validation
			// errors; runs before prof.Close because defers are LIFO.
			defer pprof.StopCPUProfile()
		}

		// Execute in chain order (oldest first). The end tipset itself
		// (index 0) is only used to check the result of its parent.
		for i := len(inverseChain) - 1; i >= 1; i-- {
			cur := inverseChain[i]
			start := time.Now()
			log.Infof("computing state (height: %d, ts=%s)", cur.Height(), cur.Cids())
			st, trace, err := stm.ExecutionTrace(context.TODO(), cur)
			if err != nil {
				return err
			}
			tse := &TipSetExec{
				TipSet:   cur.Key(),
				Trace:    trace,
				Duration: time.Since(start),
			}
			if enc != nil {
				if err := enc.Encode(tse); err != nil {
					return xerrors.Errorf("failed to write out tipsetexec: %w", err)
				}
			}
			// The state produced by executing cur must equal the parent
			// state recorded by the tipset that follows it.
			if expected := inverseChain[i-1].ParentState(); expected != st {
				lastTrace := tse.Trace
				d, err := json.MarshalIndent(lastTrace, "", "  ")
				if err != nil {
					panic(err)
				}
				fmt.Println("TRACE")
				fmt.Println(string(d))
				//fmt.Println(statediff.Diff(context.Background(), bs, tschain[i-1].ParentState(), st, statediff.ExpandActors))
				return xerrors.Errorf("tipset chain had state mismatch at height %d (%s != %s)", cur.Height(), expected, st)
			}
		}

		pprof.StopCPUProfile()

		return nil
	},
}
// Invocation pairs a single invocation result with the key of the tipset it
// was read from. The analyze subcommand uses it to rank and print the most
// expensive invocations by Duration.
type Invocation struct {
// TipSet is the key of the tipset whose trace contained the invocation.
TipSet types.TipSetKey
// Invoc is the invocation result taken from that tipset's trace.
Invoc *api.InvocResult
}
// GasPerNs is the scaling factor tallyGasCharges multiplies into the
// measured nanoseconds-per-compute-gas ratio of each gas charge.
// NOTE(review): presumably the target number of gas units that one
// nanosecond of execution should cost — confirm against the gas model.
const GasPerNs = 10
// stats holds the running statistics kept per gas-charge name by
// tallyGasCharges.
type stats struct {
// timeTaken accumulates the time taken by each charge, in nanoseconds.
timeTaken meanVar
// gasRatio accumulates timeTaken / ComputeGas * GasPerNs for each charge
// (a ComputeGas of 0 is treated as 1).
gasRatio meanVar
}
// covar accumulates streaming statistics over paired samples (x, y) using
// Welford-style online updates: running means, the sums of squared deviations
// of x and y, and their co-moment. From these it derives the sample
// variances, covariance, correlation and a least-squares line y = A*x + B.
//
// The zero value is an empty accumulator ready for use. Derived statistics
// divide by n-1 and are therefore not meaningful with fewer than two samples.
type covar struct {
	meanX float64 // running mean of x
	meanY float64 // running mean of y
	c     float64 // co-moment: sum of (x-meanX)*(y-meanY)
	n     float64 // number of samples
	m2x   float64 // sum of squared deviations of x from meanX
	m2y   float64 // sum of squared deviations of y from meanY
}

// Covariance returns the sample covariance of x and y.
func (cov1 *covar) Covariance() float64 {
	return cov1.c / (cov1.n - 1)
}

// VarianceX returns the sample variance of x.
func (cov1 *covar) VarianceX() float64 {
	return cov1.m2x / (cov1.n - 1)
}

// StddevX returns the sample standard deviation of x.
func (cov1 *covar) StddevX() float64 {
	return math.Sqrt(cov1.VarianceX())
}

// VarianceY returns the sample variance of y.
func (cov1 *covar) VarianceY() float64 {
	return cov1.m2y / (cov1.n - 1)
}

// StddevY returns the sample standard deviation of y.
func (cov1 *covar) StddevY() float64 {
	return math.Sqrt(cov1.VarianceY())
}

// AddPoint folds a single (x, y) sample into the accumulator.
func (cov1 *covar) AddPoint(x, y float64) {
	cov1.n++

	dx := x - cov1.meanX // deviation from the mean BEFORE the update
	cov1.meanX += dx / cov1.n
	dx2 := x - cov1.meanX // deviation from the mean AFTER the update
	cov1.m2x += dx * dx2

	dy := y - cov1.meanY
	cov1.meanY += dy / cov1.n
	dy2 := y - cov1.meanY
	cov1.m2y += dy * dy2

	// The co-moment update mirrors the m2 updates: one pre-update delta
	// times one post-update delta. Using two pre-update deltas would
	// overestimate each increment by a factor of n/(n-1).
	cov1.c += dx * dy2
}

// Combine merges the samples accumulated in cov2 into cov1, as if every
// sample added to cov2 had been added to cov1. cov2 is left unmodified.
func (cov1 *covar) Combine(cov2 *covar) {
	if cov1.n == 0 {
		*cov1 = *cov2
		return
	}
	if cov2.n == 0 {
		return
	}
	if cov1.n == 1 {
		// The receiver holds exactly one sample, which equals its means:
		// add that sample to a copy of cov2.
		cpy := *cov2
		cpy.AddPoint(cov1.meanX, cov1.meanY)
		*cov1 = cpy
		return
	}
	if cov2.n == 1 {
		// Likewise, cov2's single sample equals its means. Return right
		// away so the sample is not merged a second time below.
		cov1.AddPoint(cov2.meanX, cov2.meanY)
		return
	}

	// General case: pairwise merge of two accumulators.
	out := covar{}
	out.n = cov1.n + cov2.n

	dx := cov1.meanX - cov2.meanX
	out.meanX = cov1.meanX - dx*cov2.n/out.n
	out.m2x = cov1.m2x + cov2.m2x + dx*dx*cov1.n*cov2.n/out.n

	dy := cov1.meanY - cov2.meanY
	out.meanY = cov1.meanY - dy*cov2.n/out.n
	out.m2y = cov1.m2y + cov2.m2y + dy*dy*cov1.n*cov2.n/out.n

	out.c = cov1.c + cov2.c + dx*dy*cov1.n*cov2.n/out.n
	*cov1 = out
}

// A returns the slope of the least-squares line y = A*x + B.
func (cov1 *covar) A() float64 {
	return cov1.Covariance() / cov1.VarianceX()
}

// B returns the intercept of the least-squares line y = A*x + B.
func (cov1 *covar) B() float64 {
	return cov1.meanY - cov1.meanX*cov1.A()
}

// Correl returns the sample correlation coefficient of x and y.
func (cov1 *covar) Correl() float64 {
	return cov1.Covariance() / cov1.StddevX() / cov1.StddevY()
}
// meanVar is a streaming accumulator for the mean and variance of a series
// of float64 samples. The zero value is an empty accumulator.
type meanVar struct {
	n    float64 // number of samples seen
	mean float64 // running mean
	m2   float64 // running sum of squared deviations from the mean
}

// AddPoint folds one sample into the accumulator.
//
// based on https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
func (v1 *meanVar) AddPoint(value float64) {
	v1.n++
	before := value - v1.mean // deviation from the mean prior to this sample
	v1.mean += before / v1.n
	v1.m2 += before * (value - v1.mean)
}

// Variance returns the sample variance (m2 divided by n-1).
func (v1 *meanVar) Variance() float64 {
	degrees := v1.n - 1
	return v1.m2 / degrees
}

// Mean returns the running mean of the samples seen so far.
func (v1 *meanVar) Mean() float64 {
	return v1.mean
}

// Stddev returns the sample standard deviation.
func (v1 *meanVar) Stddev() float64 {
	return math.Sqrt(v1.m2 / (v1.n - 1))
}

// Combine merges the samples accumulated in v2 into v1; v2 is not modified.
func (v1 *meanVar) Combine(v2 *meanVar) {
	switch {
	case v1.n == 0:
		// Nothing accumulated yet: adopt v2 wholesale.
		*v1 = *v2
	case v2.n == 0:
		// Nothing to merge in.
	case v1.n == 1:
		// A lone sample equals its own mean; add it to a copy of v2.
		merged := *v2
		merged.AddPoint(v1.mean)
		*v1 = merged
	case v2.n == 1:
		v1.AddPoint(v2.mean)
	default:
		// Pairwise merge of two multi-sample accumulators.
		total := v1.n + v2.n
		gap := v2.mean - v1.mean
		shift := gap * v2.n / total
		v1.m2 = v1.m2 + v2.m2 + gap*shift*v1.n
		v1.n = total
		v1.mean += shift
	}
}
// tallyGasCharges folds every gas charge found in et, and recursively in all
// of its subcalls, into charges, keyed by the charge name. For each charge it
// records the time taken (in nanoseconds) and the time-per-compute-gas ratio
// scaled by GasPerNs. Entries are created in charges on first use.
//
// "OnIpldGetEnd" charges are ignored, and so are "OnVerifyPost" charges that
// took longer than two seconds (these are logged as abnormal).
func tallyGasCharges(charges map[string]*stats, et types.ExecutionTrace) {
	for _, charge := range et.GasCharges {
		label := charge.Name
		if label == "OnIpldGetEnd" {
			continue
		}

		elapsedNs := float64(charge.TimeTaken.Nanoseconds())
		if label == "OnVerifyPost" && elapsedNs > 2e9 {
			// discard initial very long OnVerifyPost
			log.Warnf("Skipping abnormally long OnVerifyPost: %fs", elapsedNs/1e9)
			continue
		}

		entry := charges[label]
		if entry == nil {
			entry = &stats{}
			charges[label] = entry
		}
		entry.timeTaken.AddPoint(elapsedNs)

		// Avoid dividing by zero for charges that carry no compute gas.
		gas := charge.ComputeGas
		if gas == 0 {
			gas = 1
		}
		entry.gasRatio.AddPoint(elapsedNs / float64(gas) * GasPerNs)
	}

	for i := range et.Subcalls {
		tallyGasCharges(charges, et.Subcalls[i])
	}
}
// importAnalyzeCmd implements `lotus-bench import analyze <bench.json>`.
//
// It reads a file containing one JSON-encoded TipSetExec per line (as written
// by the import benchmark), fans the lines out to a fixed pool of workers
// that decode them and tally per-gas-charge statistics, then merges the
// workers' partial results and prints:
//   - the mean/stddev of the gas ratio and time taken per gas charge name,
//   - the total and per-epoch average execution time,
//   - the 30 most expensive invocations.
var importAnalyzeCmd = &cli.Command{
	Name: "analyze",
	Action: func(cctx *cli.Context) error {
		if !cctx.Args().Present() {
			fmt.Println("must pass bench file to analyze")
			return nil
		}

		// Expose the default mux (pprof) while the analysis runs.
		go func() {
			_ = http.ListenAndServe("localhost:6060", nil)
		}()

		fi, err := os.Open(cctx.Args().First())
		if err != nil {
			return err
		}
		defer fi.Close() //nolint:errcheck

		const nWorkers = 16
		jsonIn := make(chan []byte, 2*nWorkers)
		type result struct {
			totalTime       time.Duration
			chargeStats     map[string]*stats
			expensiveInvocs []Invocation
		}

		// Buffered so that every worker can deliver its result and exit
		// even if the collector is not receiving yet (or never will).
		results := make(chan result, nWorkers)

		for i := 0; i < nWorkers; i++ {
			go func() {
				chargeStats := make(map[string]*stats)
				var totalTime time.Duration
				const invocsKeep = 32
				var expensiveInvocs = make([]Invocation, 0, 8*invocsKeep)
				// Invocations no longer than this are not worth tracking.
				var leastExpensiveInvoc = time.Duration(0)

				for {
					b, ok := <-jsonIn
					if !ok {
						// Input exhausted: hand over the partial result.
						results <- result{
							totalTime:       totalTime,
							chargeStats:     chargeStats,
							expensiveInvocs: expensiveInvocs,
						}
						return
					}

					var tse TipSetExec
					err := json.Unmarshal(b, &tse)
					if err != nil {
						log.Warnf("error unmarshaling tipset: %+v", err)
						continue
					}

					totalTime += tse.Duration
					for _, inv := range tse.Trace {
						if inv.Duration > leastExpensiveInvoc {
							expensiveInvocs = append(expensiveInvocs, Invocation{
								TipSet: tse.TipSet,
								Invoc:  inv,
							})
						}

						tallyGasCharges(chargeStats, inv.ExecutionTrace)
					}

					// Periodically prune the candidate list down to the
					// most expensive entries to bound memory use.
					if len(expensiveInvocs) > 4*invocsKeep {
						sort.Slice(expensiveInvocs, func(i, j int) bool {
							return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration
						})
						leastExpensiveInvoc = expensiveInvocs[len(expensiveInvocs)-1].Invoc.Duration

						n := 30
						if len(expensiveInvocs) < n {
							n = len(expensiveInvocs)
						}
						expensiveInvocs = expensiveInvocs[:n]
					}
				}
			}()
		}

		var totalTipsets int64
		reader := bufio.NewReader(fi)
		for {
			b, err := reader.ReadBytes('\n')
			if err != nil && err != io.EOF {
				// Let the workers drain and exit instead of blocking forever.
				close(jsonIn)
				return fmt.Errorf("reading bench file: %w", err)
			}

			// The file is newline-terminated, so the final read returns no
			// data together with io.EOF; that is not a tipset and must be
			// neither counted nor handed to a worker.
			if len(b) > 0 {
				totalTipsets++
				jsonIn <- b
				fmt.Fprintf(os.Stderr, "\rProcessed %d tipsets", totalTipsets)
			}

			if err == io.EOF {
				break
			}
		}
		close(jsonIn)
		fmt.Fprintf(os.Stderr, "\n")

		if totalTipsets == 0 {
			// Nothing to analyze; also protects the per-epoch averages
			// below from dividing by zero.
			return fmt.Errorf("no tipsets found in bench file %q", cctx.Args().First())
		}

		fmt.Fprintf(os.Stderr, "Collecting results\n")

		var invocs []Invocation
		var totalTime time.Duration
		var keys []string
		var charges = make(map[string]*stats)
		for i := 0; i < nWorkers; i++ {
			fmt.Fprintf(os.Stderr, "\rProcessing results from worker %d/%d", i+1, nWorkers)
			res := <-results
			invocs = append(invocs, res.expensiveInvocs...)
			for k, v := range res.chargeStats {
				s := charges[k]
				if s == nil {
					s = new(stats)
					charges[k] = s
				}
				s.timeTaken.Combine(&v.timeTaken)
				s.gasRatio.Combine(&v.gasRatio)
			}
			totalTime += res.totalTime
		}

		fmt.Fprintf(os.Stderr, "\nCollecting gas keys\n")
		for k := range charges {
			keys = append(keys, k)
		}

		fmt.Println("Gas Price Deltas")
		// Sort for deterministic output; map iteration order is random.
		sort.Strings(keys)
		for _, k := range keys {
			s := charges[k]
			fmt.Printf("%s: incr by %.4f~%.4f; tt %.4f~%.4f\n", k, s.gasRatio.Mean(), s.gasRatio.Stddev(),
				s.timeTaken.Mean(), s.timeTaken.Stddev())
		}

		sort.Slice(invocs, func(i, j int) bool {
			return invocs[i].Invoc.Duration > invocs[j].Invoc.Duration
		})

		fmt.Println("Total time: ", totalTime)
		fmt.Println("Average time per epoch: ", totalTime/time.Duration(totalTipsets))
		if actorExec, ok := charges["OnActorExec"]; ok {
			// mean * n recovers the summed time for this charge.
			timeInActors := actorExec.timeTaken.Mean() * actorExec.timeTaken.n
			fmt.Printf("Avarage time per epoch in actors: %s (%.1f%%)\n", time.Duration(timeInActors)/time.Duration(totalTipsets), timeInActors/float64(totalTime)*100)
		}
		if actorExecDone, ok := charges["OnMethodInvocationDone"]; ok {
			timeInActors := actorExecDone.timeTaken.Mean() * actorExecDone.timeTaken.n
			fmt.Printf("Avarage time per epoch in OnActorExecDone %s (%.1f%%)\n", time.Duration(timeInActors)/time.Duration(totalTipsets), timeInActors/float64(totalTime)*100)
		}

		n := 30
		if len(invocs) < n {
			n = len(invocs)
		}
		fmt.Printf("Top %d most expensive calls:\n", n)
		for i := 0; i < n; i++ {
			inv := invocs[i].Invoc

			fmt.Printf("%s: %s %s %d %s\n", inv.Duration, inv.Msg.From, inv.Msg.To, inv.Msg.Method, invocs[i].TipSet)
		}
		return nil
	},
}