This is now "FVM" native. Changes include: 1. Don't treat "trace" messages like off-chain messages (e.g., don't include CIDs, versions, etc.). 2. Include IPLD codecs where applicable. 3. Remove fields that aren't filled in by the FVM (timing, some errors, code locations, etc.).
854 lines
22 KiB
854 lines
22 KiB
package main
import (
_ "net/http/pprof"
ocprom ""
bdg ""
badger ""
measure ""
metricsprometheus ""
badgerbs ""
lcli ""
_ ""
_ ""
_ ""
type TipSetExec struct {
TipSet types.TipSetKey
Trace []*api.InvocResult
Duration time.Duration
// importBenchCmd benchmarks importing a chain from a CAR file and/or
// re-executing ("validating") a range of tipsets from a chain store.
// Outputs written to the working directory: bench.json (execution traces,
// with --export-traces), bench.import.pprof / bench.validation.pprof (CPU
// profiles, with --global-profile) and bench.metrics (a scrape of the local
// metrics endpoint taken on exit).
var importBenchCmd = &cli.Command{
Name: "import",
Usage: "Benchmark chain import and validation",
// NOTE(review): no subcommand entries are visible here; importAnalyzeCmd is
// presumably meant to be listed — confirm.
Subcommands: []*cli.Command{
Flags: []cli.Flag{
Name: "start-tipset",
Usage: "start validation at the given tipset key; in format cid1,cid2,cid3...",
Name: "end-tipset",
Usage: "halt validation at the given tipset key; in format cid1,cid2,cid3...",
Name: "genesis-tipset",
Usage: "genesis tipset key; in format cid1,cid2,cid3...",
Name: "start-height",
Usage: "start validation at given height; beware that chain traversal by height is very slow",
Name: "end-height",
Usage: "halt validation after given height; beware that chain traversal by height is very slow",
Name: "batch-seal-verify-threads",
Usage: "set the parallelism factor for batch seal verification",
Value: runtime.NumCPU(),
Name: "repodir",
Usage: "set the repo directory for the lotus bench run (defaults to /tmp)",
Name: "syscall-cache",
Usage: "read and write syscall results from datastore",
Name: "export-traces",
Usage: "should we export execution traces",
Value: true,
Name: "no-import",
Usage: "should we import the chain? if set to true chain has to be previously imported",
Name: "global-profile",
Value: true,
Name: "only-import",
Name: "use-native-badger",
Name: "car",
Usage: "path to CAR file; required for import; on validation, either " +
"a CAR path or the --head flag are required",
Name: "head",
Usage: "tipset key of the head, useful when benchmarking validation " +
"on an existing chain store, where a CAR is not available; " +
"if both --car and --head are provided, --head takes precedence " +
"over the CAR root; the format is cid1,cid2,cid3...",
// Action runs, in order: metrics/pprof server setup, blockstore and chain
// store setup, optional CAR import, head/genesis/start/end resolution, then
// re-execution of every tipset in [start, end) while checking each computed
// state against the child's ParentState.
Action: func(cctx *cli.Context) error {
metricsprometheus.Inject() //nolint:errcheck
vm.BatchSealVerifyParallelism = cctx.Int("batch-seal-verify-threads")
// Serve Prometheus metrics at /debug/metrics on localhost:6060 in the
// background; the blank net/http/pprof import also registers its handlers
// on the same default mux.
go func() {
// Prometheus globals are exposed as interfaces, but the prometheus
// OpenCensus exporter expects a concrete *Registry. The concrete type of
// the globals are actually *Registry, so we downcast them, staying
// defensive in case things change under the hood.
registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry)
if !ok {
log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", prometheus.DefaultRegisterer)
exporter, err := ocprom.NewExporter(ocprom.Options{
Registry: registry,
Namespace: "lotus",
if err != nil {
log.Fatalf("could not create the prometheus stats exporter: %v", err)
http.Handle("/debug/metrics", exporter)
_ = http.ListenAndServe("localhost:6060", nil)
// Repo directory: --repodir if given, otherwise a fresh temporary directory.
var tdir string
if rdir := cctx.String("repodir"); rdir != "" {
tdir = rdir
} else {
tmp, err := ioutil.TempDir("", "lotus-import-bench")
if err != nil {
return err
tdir = tmp
// Open the blockstore: the native badger blockstore with
// --use-native-badger, otherwise a legacy badger datastore wrapped with
// measure.New("dsbench", ...) and exposed as a blockstore. Sync writes are
// disabled in both cases.
var (
ds datastore.Batching
bs blockstore.Blockstore
err error
switch {
case cctx.Bool("use-native-badger"):
log.Info("using native badger")
var opts badgerbs.Options
if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false); err != nil {
return err
opts.SyncWrites = false
bs, err = badgerbs.Open(opts)
default: // legacy badger via datastore.
log.Info("using legacy badger")
bdgOpt := badger.DefaultOptions
bdgOpt.GcInterval = 0
bdgOpt.Options = bdg.DefaultOptions("")
bdgOpt.Options.SyncWrites = false
bdgOpt.Options.Truncate = true
bdgOpt.Options.DetectConflicts = false
ds, err = badger.NewDatastore(tdir, &bdgOpt)
if err != nil {
return err
if ds != nil {
ds = measure.New("dsbench", ds)
defer ds.Close() //nolint:errcheck
bs = blockstore.FromDatastore(ds)
if c, ok := bs.(io.Closer); ok {
defer c.Close() //nolint:errcheck
// Proof verifier; with --syscall-cache it is wrapped in a cachingVerifier
// backed by a badger datastore at the given path.
var verifier storiface.Verifier = ffiwrapper.ProofVerifier
if cctx.IsSet("syscall-cache") {
scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions)
if err != nil {
return xerrors.Errorf("opening syscall-cache datastore: %w", err)
defer scds.Close() //nolint:errcheck
verifier = &cachingVerifier{
ds: scds,
backend: verifier,
// NOTE(review): no "only-gc" flag is declared in Flags above; confirm it
// exists, otherwise this check can never be true.
if cctx.Bool("only-gc") {
return nil
// The chain store keeps its metadata in an in-memory map datastore, so
// metadata (e.g. the head) is not persisted across runs.
metadataDs := datastore.NewMapDatastore()
cs := store.NewChainStore(bs, bs, metadataDs, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
// TODO: We need to supply the actual beacon after v14
stm, err := stmgr.NewStateManager(cs, filcns.NewTipSetExecutor(), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil)
if err != nil {
return err
var carFile *os.File
// open the CAR file if one is provided.
if path := cctx.String("car"); path != "" {
var err error
if carFile, err = os.Open(path); err != nil {
return xerrors.Errorf("failed to open provided CAR file: %w", err)
startTime := time.Now()
// register a gauge that reports how long since the measurable
// operation began.
Name: "lotus_bench_time_taken_secs",
}, func() float64 {
return time.Since(startTime).Seconds()
// On exit, scrape the local metrics endpoint into bench.metrics.
// NOTE(review): resp.Body is never closed, and "scape" in the warning
// below should read "scrape".
defer func() {
end := time.Now().Format(time.RFC3339)
resp, err := http.Get("http://localhost:6060/debug/metrics")
if err != nil {
log.Warnf("failed to scape prometheus: %s", err)
metricsfi, err := os.Create("bench.metrics")
if err != nil {
log.Warnf("failed to write prometheus data: %s", err)
_, _ = io.Copy(metricsfi, resp.Body) //nolint:errcheck
_ = metricsfi.Close() //nolint:errcheck
// writeProfile dumps the named runtime/pprof profile to
// "<name>.<start>.<end>.pprof"; failures are only logged.
writeProfile := func(name string) {
if file, err := os.Create(fmt.Sprintf("%s.%s.%s.pprof", name, startTime.Format(time.RFC3339), end)); err == nil {
if err := pprof.Lookup(name).WriteTo(file, 0); err != nil {
log.Warnf("failed to write %s pprof: %s", name, err)
_ = file.Close()
} else {
log.Warnf("failed to create %s pprof file: %s", name, err)
var head *types.TipSet
// --- IMPORT ---
// Unless --no-import, import the CAR into the chain store; with
// --global-profile the import is CPU-profiled into bench.import.pprof.
if !cctx.Bool("no-import") {
if cctx.Bool("global-profile") {
prof, err := os.Create("bench.import.pprof")
if err != nil {
return err
defer prof.Close() //nolint:errcheck
if err := pprof.StartCPUProfile(prof); err != nil {
return err
// import is NOT suppressed; do it.
if carFile == nil { // a CAR is compulsory for the import.
return fmt.Errorf("no CAR file provided for import")
head, err = cs.Import(cctx.Context, carFile)
if err != nil {
return err
if cctx.Bool("only-import") {
return nil
// --- VALIDATION ---
// we are now preparing for the validation benchmark.
// a HEAD needs to be set; --head takes precedence over the root
// of the CAR, if both are provided.
if h := cctx.String("head"); h != "" {
cids, err := lcli.ParseTipSetString(h)
if err != nil {
return xerrors.Errorf("failed to parse head tipset key: %w", err)
head, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
if err != nil {
return err
} else if carFile != nil && head == nil {
cr, err := car.NewCarReader(carFile)
if err != nil {
return err
head, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cr.Header.Roots...))
if err != nil {
return err
} else if h == "" && carFile == nil {
return xerrors.Errorf("neither --car nor --head flags supplied")
log.Infof("chain head is tipset: %s", head.Key())
// Resolve genesis from --genesis-tipset if given; otherwise walk back from
// head to height 0 (slow).
var genesis *types.TipSet
log.Infof("getting genesis block")
if tsk := cctx.String("genesis-tipset"); tsk != "" {
var cids []cid.Cid
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
return xerrors.Errorf("failed to parse genesis tipset key: %w", err)
genesis, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
} else {
log.Warnf("getting genesis by height; this will be slow; pass in the genesis tipset through --genesis-tipset")
// fallback to the slow path of walking the chain.
genesis, err = cs.GetTipsetByHeight(context.TODO(), 0, head, true)
if err != nil {
return err
if err = cs.SetGenesis(cctx.Context, genesis.Blocks()[0]); err != nil {
return err
// Resolve the end tipset, falling back to head if not provided.
end := head
if tsk := cctx.String("end-tipset"); tsk != "" {
var cids []cid.Cid
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
// NOTE(review): this message presumably should read
// "failed to parse end tipset key".
return xerrors.Errorf("failed to end genesis tipset key: %w", err)
end, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
} else if h := cctx.Int64("end-height"); h != 0 {
log.Infof("getting end tipset at height %d...", h)
end, err = cs.GetTipsetByHeight(cctx.Context, abi.ChainEpoch(h), head, true)
if err != nil {
return err
// Resolve the start tipset, if provided; otherwise, fallback to
// height 1 for a start point.
var (
startEpoch = abi.ChainEpoch(1)
start *types.TipSet
if tsk := cctx.String("start-tipset"); tsk != "" {
var cids []cid.Cid
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
// NOTE(review): this message presumably should read
// "failed to parse start tipset key".
return xerrors.Errorf("failed to start genesis tipset key: %w", err)
start, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
} else if h := cctx.Int64("start-height"); h != 0 {
log.Infof("getting start tipset at height %d...", h)
// lookback from the end tipset (which falls back to head if not supplied).
start, err = cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), end, true)
if err != nil {
return err
// If a start tipset was resolved, validation begins at its height and it
// is forced as the chain store head.
if start != nil {
startEpoch = start.Height()
if err := cs.ForceHeadSilent(cctx.Context, start); err != nil {
// if err := cs.SetHead(start); err != nil {
return err
// Walk parents from end back down to startEpoch, collecting tipsets
// newest-first: inverseChain[0] is end, the last element is the oldest.
inverseChain := append(make([]*types.TipSet, 0, end.Height()), end)
for ts := end; ts.Height() > startEpoch; {
if h := ts.Height(); h%100 == 0 {
log.Infof("walking back the chain; loaded tipset at height %d...", h)
next, err := cs.LoadTipSet(cctx.Context, ts.Parents())
if err != nil {
return err
inverseChain = append(inverseChain, next)
ts = next
// With --export-traces, each executed tipset is written as one JSON line
// to bench.json.
var enc *json.Encoder
if cctx.Bool("export-traces") {
ibj, err := os.Create("bench.json")
if err != nil {
return err
defer ibj.Close() //nolint:errcheck
enc = json.NewEncoder(ibj)
if cctx.Bool("global-profile") {
prof, err := os.Create("bench.validation.pprof")
if err != nil {
return err
defer prof.Close() //nolint:errcheck
if err := pprof.StartCPUProfile(prof); err != nil {
return err
// Execute tipsets oldest to newest. inverseChain[0] (end) is never
// executed itself; it only supplies the expected ParentState for the
// tipset before it. The state computed for cur must equal the ParentState
// recorded in its child inverseChain[i-1].
for i := len(inverseChain) - 1; i >= 1; i-- {
cur := inverseChain[i]
start := time.Now()
log.Infof("computing state (height: %d, ts=%s)", cur.Height(), cur.Cids())
// NOTE(review): context.TODO() is never cancelled; cctx.Context would
// propagate cancellation into execution.
st, trace, err := stm.ExecutionTrace(context.TODO(), cur)
if err != nil {
return err
tse := &TipSetExec{
TipSet: cur.Key(),
Trace: trace,
Duration: time.Since(start),
if enc != nil {
if err := enc.Encode(tse); err != nil {
return xerrors.Errorf("failed to write out tipsetexec: %w", err)
// NOTE(review): the error below prints cur.ParentState() rather than the
// compared value inverseChain[i-1].ParentState(); also `d` is not used in
// the lines shown.
if inverseChain[i-1].ParentState() != st {
lastTrace := tse.Trace
d, err := json.MarshalIndent(lastTrace, "", " ")
if err != nil {
//fmt.Println(statediff.Diff(context.Background(), bs, tschain[i-1].ParentState(), st, statediff.ExpandActors))
return xerrors.Errorf("tipset chain had state mismatch at height %d (%s != %s)", cur.Height(), cur.ParentState(), st)
return nil
type Invocation struct {
TipSet types.TipSetKey
Invoc *api.InvocResult
// GasPerNs is the multiplier applied to a gas charge's measured nanoseconds
// per unit of compute gas when tallying gas ratios; presumably the intended
// number of gas units per nanosecond of execution — confirm.
const (
	GasPerNs = 10
)
func countGasCosts(et *types.ExecutionTrace) int64 {
var cgas int64
for _, gc := range et.GasCharges {
cgas += gc.ComputeGas
for _, sub := range et.Subcalls {
c := countGasCosts(&sub) //nolint
cgas += c
return cgas
type stats struct {
timeTaken meanVar
gasRatio meanVar
// covar accumulates running statistics over a stream of (x, y) points using
// Welford-style online updates: the means of x and y, their sums of squared
// deviations, and the co-moment. From these it derives the sample
// covariance and variances, the least-squares line y = A*x + B, and the
// Pearson correlation. Partial accumulators can be merged with Combine.
type covar struct {
	meanX float64 // running mean of x
	meanY float64 // running mean of y
	c     float64 // co-moment: sum of (x - meanX)*(y - meanY)
	n     float64 // number of points added
	m2x   float64 // sum of squared deviations of x from meanX
	m2y   float64 // sum of squared deviations of y from meanY
}

// Covariance returns the sample covariance (divides by n-1).
func (cov1 *covar) Covariance() float64 {
	return cov1.c / (cov1.n - 1)
}

// VarianceX returns the sample variance of x.
func (cov1 *covar) VarianceX() float64 {
	return cov1.m2x / (cov1.n - 1)
}

// StddevX returns the sample standard deviation of x.
func (cov1 *covar) StddevX() float64 {
	return math.Sqrt(cov1.VarianceX())
}

// VarianceY returns the sample variance of y.
func (cov1 *covar) VarianceY() float64 {
	return cov1.m2y / (cov1.n - 1)
}

// StddevY returns the sample standard deviation of y.
func (cov1 *covar) StddevY() float64 {
	return math.Sqrt(cov1.VarianceY())
}

// AddPoint adds a single (x, y) observation.
func (cov1 *covar) AddPoint(x, y float64) {
	cov1.n++

	dx := x - cov1.meanX
	cov1.meanX += dx / cov1.n
	dx2 := x - cov1.meanX
	cov1.m2x += dx * dx2

	dy := y - cov1.meanY
	cov1.meanY += dy / cov1.n
	dy2 := y - cov1.meanY
	cov1.m2y += dy * dy2

	// Pair the deviation from the old x mean with the deviation from the new
	// y mean; using both old deviations (dx*dy) would overstate the
	// co-moment by a factor of n/(n-1).
	cov1.c += dx * dy2
}

// Combine merges the points accumulated in cov2 into cov1 using the pairwise
// merge formulas of Chan et al. cov2 is left unchanged. The general formula
// is exact for any non-empty inputs (including single points), so only the
// empty cases need special handling.
func (cov1 *covar) Combine(cov2 *covar) {
	if cov2.n == 0 {
		return
	}
	if cov1.n == 0 {
		*cov1 = *cov2
		return
	}

	var out covar
	out.n = cov1.n + cov2.n

	dx := cov1.meanX - cov2.meanX
	out.meanX = cov1.meanX - dx*cov2.n/out.n
	out.m2x = cov1.m2x + cov2.m2x + dx*dx*cov1.n*cov2.n/out.n

	dy := cov1.meanY - cov2.meanY
	out.meanY = cov1.meanY - dy*cov2.n/out.n
	out.m2y = cov1.m2y + cov2.m2y + dy*dy*cov1.n*cov2.n/out.n

	out.c = cov1.c + cov2.c + dx*dy*cov1.n*cov2.n/out.n

	*cov1 = out
}

// A returns the slope of the least-squares regression of y on x.
func (cov1 *covar) A() float64 {
	return cov1.Covariance() / cov1.VarianceX()
}

// B returns the intercept of the least-squares regression of y on x.
func (cov1 *covar) B() float64 {
	return cov1.meanY - cov1.meanX*cov1.A()
}

// Correl returns the Pearson correlation coefficient of x and y.
func (cov1 *covar) Correl() float64 {
	return cov1.Covariance() / cov1.StddevX() / cov1.StddevY()
}
// meanVar tracks the running count, mean and sum of squared deviations (M2)
// of a stream of values using Welford's online algorithm, so the mean and
// sample variance can be read at any time. Partial accumulators can be
// merged with Combine.
type meanVar struct {
	n    float64 // number of values added
	mean float64 // running mean
	m2   float64 // sum of squared deviations from the mean
}

// AddPoint adds a single observation.
func (v1 *meanVar) AddPoint(value float64) {
	// based on https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
	v1.n++
	delta := value - v1.mean
	v1.mean += delta / v1.n
	delta2 := value - v1.mean
	v1.m2 += delta * delta2
}

// Variance returns the sample variance (divides by n-1).
func (v1 *meanVar) Variance() float64 {
	return v1.m2 / (v1.n - 1)
}

// Mean returns the running mean.
func (v1 *meanVar) Mean() float64 {
	return v1.mean
}

// Stddev returns the sample standard deviation.
func (v1 *meanVar) Stddev() float64 {
	return math.Sqrt(v1.Variance())
}

// Combine merges the values accumulated in v2 into v1 (parallel variance
// formula of Chan et al.). v2 is left unchanged. The general formula is exact
// for any non-empty inputs, including single values, so only the empty
// cases need special handling.
func (v1 *meanVar) Combine(v2 *meanVar) {
	if v2.n == 0 {
		return
	}
	if v1.n == 0 {
		*v1 = *v2
		return
	}

	newCount := v1.n + v2.n
	delta := v2.mean - v1.mean
	meanDelta := delta * v2.n / newCount
	m2 := v1.m2 + v2.m2 + delta*meanDelta*v1.n
	v1.n = newCount
	v1.mean += meanDelta
	v1.m2 = m2
}
func tallyGasCharges(charges map[string]*stats, et types.ExecutionTrace) {
for _, gc := range et.GasCharges {
name := gc.Name
if name == "OnIpldGetEnd" {
tt := float64(gc.TimeTaken.Nanoseconds())
if name == "OnVerifyPost" && tt > 2e9 {
log.Warnf("Skipping abnormally long OnVerifyPost: %fs", tt/1e9)
// discard initial very long OnVerifyPost
compGas := gc.ComputeGas
s := charges[name]
if s == nil {
s = new(stats)
charges[name] = s
if compGas == 0 {
compGas = 1
ratio := tt / float64(compGas) * GasPerNs
for _, sub := range et.Subcalls {
tallyGasCharges(charges, sub)
var importAnalyzeCmd = &cli.Command{
Name: "analyze",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
fmt.Println("must pass bench file to analyze")
return nil
go func() {
_ = http.ListenAndServe("localhost:6060", nil)
fi, err := os.Open(cctx.Args().First())
if err != nil {
return err
defer fi.Close() //nolint:errcheck
const nWorkers = 16
jsonIn := make(chan []byte, 2*nWorkers)
type result struct {
totalTime time.Duration
chargeStats map[string]*stats
expensiveInvocs []Invocation
results := make(chan result, nWorkers)
for i := 0; i < nWorkers; i++ {
go func() {
chargeStats := make(map[string]*stats)
var totalTime time.Duration
const invocsKeep = 32
var expensiveInvocs = make([]Invocation, 0, 8*invocsKeep)
var leastExpensiveInvoc = time.Duration(0)
for {
b, ok := <-jsonIn
if !ok {
results <- result{
totalTime: totalTime,
chargeStats: chargeStats,
expensiveInvocs: expensiveInvocs,
var tse TipSetExec
err := json.Unmarshal(b, &tse)
if err != nil {
log.Warnf("error unmarshaling tipset: %+v", err)
totalTime += tse.Duration
for _, inv := range tse.Trace {
if inv.Duration > leastExpensiveInvoc {
expensiveInvocs = append(expensiveInvocs, Invocation{
TipSet: tse.TipSet,
Invoc: inv,
tallyGasCharges(chargeStats, inv.ExecutionTrace)
if len(expensiveInvocs) > 4*invocsKeep {
sort.Slice(expensiveInvocs, func(i, j int) bool {
return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration
leastExpensiveInvoc = expensiveInvocs[len(expensiveInvocs)-1].Invoc.Duration
n := 30
if len(expensiveInvocs) < n {
n = len(expensiveInvocs)
expensiveInvocs = expensiveInvocs[:n]
var totalTipsets int64
reader := bufio.NewReader(fi)
for {
b, err := reader.ReadBytes('\n')
if err != nil && err != io.EOF {
if e, ok := err.(*json.SyntaxError); ok {
log.Warnf("syntax error at byte offset %d", e.Offset)
return err
jsonIn <- b
fmt.Fprintf(os.Stderr, "\rProcessed %d tipsets", totalTipsets)
if err == io.EOF {
fmt.Fprintf(os.Stderr, "\n")
fmt.Fprintf(os.Stderr, "Collecting results\n")
var invocs []Invocation
var totalTime time.Duration
var keys []string
var charges = make(map[string]*stats)
for i := 0; i < nWorkers; i++ {
fmt.Fprintf(os.Stderr, "\rProcessing results from worker %d/%d", i+1, nWorkers)
res := <-results
invocs = append(invocs, res.expensiveInvocs...)
for k, v := range res.chargeStats {
s := charges[k]
if s == nil {
s = new(stats)
charges[k] = s
totalTime += res.totalTime
fmt.Fprintf(os.Stderr, "\nCollecting gas keys\n")
for k := range charges {
keys = append(keys, k)
fmt.Println("Gas Price Deltas")
for _, k := range keys {
s := charges[k]
fmt.Printf("%s: incr by %.4f~%.4f; tt %.4f~%.4f\n", k, s.gasRatio.Mean(), s.gasRatio.Stddev(),
s.timeTaken.Mean(), s.timeTaken.Stddev())
sort.Slice(invocs, func(i, j int) bool {
return invocs[i].Invoc.Duration > invocs[j].Invoc.Duration
fmt.Println("Total time: ", totalTime)
fmt.Println("Average time per epoch: ", totalTime/time.Duration(totalTipsets))
if actorExec, ok := charges["OnActorExec"]; ok {
timeInActors := actorExec.timeTaken.Mean() * actorExec.timeTaken.n
fmt.Printf("Avarage time per epoch in actors: %s (%.1f%%)\n", time.Duration(timeInActors)/time.Duration(totalTipsets), timeInActors/float64(totalTime)*100)
if actorExecDone, ok := charges["OnMethodInvocationDone"]; ok {
timeInActors := actorExecDone.timeTaken.Mean() * actorExecDone.timeTaken.n
fmt.Printf("Avarage time per epoch in OnActorExecDone %s (%.1f%%)\n", time.Duration(timeInActors)/time.Duration(totalTipsets), timeInActors/float64(totalTime)*100)
n := 30
if len(invocs) < n {
n = len(invocs)
fmt.Printf("Top %d most expensive calls:\n", n)
for i := 0; i < n; i++ {
inv := invocs[i].Invoc
fmt.Printf("%s: %s %s %d %s\n", inv.Duration, inv.Msg.From, inv.Msg.To, inv.Msg.Method, invocs[i].TipSet)
return nil