package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net/http"
	_ "net/http/pprof"
	"os"
	"runtime"
	"runtime/pprof"
	"sort"
	"time"

	ocprom "contrib.go.opencensus.io/exporter/prometheus"
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/bloom"
	"github.com/ipfs/go-cid"
	metricsi "github.com/ipfs/go-metrics-interface"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/stmgr"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/chain/vm"
	lcli "github.com/filecoin-project/lotus/cli"
	"github.com/filecoin-project/lotus/lib/blockstore"
	badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger"
	_ "github.com/filecoin-project/lotus/lib/sigs/bls"
	_ "github.com/filecoin-project/lotus/lib/sigs/secp"
	"github.com/filecoin-project/lotus/node/repo"

	"github.com/filecoin-project/go-state-types/abi"
	metricsprometheus "github.com/ipfs/go-metrics-prometheus"
	"github.com/ipld/go-car"

	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"

	bdg "github.com/dgraph-io/badger/v2"
	"github.com/ipfs/go-datastore"
	badger "github.com/ipfs/go-ds-badger2"
	measure "github.com/ipfs/go-ds-measure"
	pebbleds "github.com/ipfs/go-ds-pebble"

	"github.com/urfave/cli/v2"
	"golang.org/x/xerrors"
)

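// TipSetExec records the execution trace and wall-clock duration of
// validating a single tipset; when --export-traces is enabled, one of
// these is written to bench.json per tipset.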
type TipSetExec struct {
	TipSet   types.TipSetKey
	Trace    []*api.InvocResult
	Duration time.Duration
}

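// importBenchCmd benchmarks the import of a chain from a CAR file and/or
// the validation (state re-execution) of an already imported chain,
// optionally exporting execution traces, metrics and pprof profiles.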
var importBenchCmd = &cli.Command{
	Name:  "import",
	Usage: "benchmark chain import and validation",
	Subcommands: []*cli.Command{
		importAnalyzeCmd,
	},
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "start-tipset",
			Usage: "start validation at the given tipset key; in format cid1,cid2,cid3...",
		},
		&cli.StringFlag{
			Name:  "end-tipset",
			Usage: "halt validation at the given tipset key; in format cid1,cid2,cid3...",
		},
		&cli.StringFlag{
			Name:  "genesis-tipset",
			Usage: "genesis tipset key; in format cid1,cid2,cid3...",
		},
		&cli.Int64Flag{
			Name:  "start-height",
			Usage: "start validation at the given height; beware that chain traversal by height is very slow",
		},
		&cli.Int64Flag{
			Name:  "end-height",
			Usage: "halt validation after the given height; beware that chain traversal by height is very slow",
		},
		&cli.IntFlag{
			Name:  "batch-seal-verify-threads",
			Usage: "set the parallelism factor for batch seal verification",
			Value: runtime.NumCPU(),
		},
		&cli.StringFlag{
			Name:  "repodir",
			Usage: "set the repo directory for the lotus bench run (defaults to /tmp)",
		},
		&cli.StringFlag{
			Name:  "syscall-cache",
			Usage: "path to a datastore for reading and writing cached syscall results",
		},
		&cli.BoolFlag{
			Name:  "export-traces",
			Usage: "whether to export execution traces",
			Value: true,
		},
		&cli.BoolFlag{
			Name:  "no-import",
			Usage: "skip the chain import; the chain must have been imported previously",
		},
		&cli.BoolFlag{
			Name:  "global-profile",
			Value: true,
		},
		&cli.BoolFlag{
			Name: "only-import",
		},
		&cli.BoolFlag{
			Name: "use-pebble",
		},
		&cli.BoolFlag{
			Name: "use-native-badger",
		},
		&cli.StringFlag{
			Name: "car",
			Usage: "path to CAR file; required for import; on validation, either " +
				"a CAR path or the --head flag is required",
		},
		&cli.StringFlag{
			Name: "head",
			Usage: "tipset key of the head, useful when benchmarking validation " +
				"on an existing chain store, where a CAR is not available; " +
				"if both --car and --head are provided, --head takes precedence " +
				"over the CAR root; the format is cid1,cid2,cid3...",
		},
	},
	Action: func(cctx *cli.Context) error {
		metricsprometheus.Inject() //nolint:errcheck
		vm.BatchSealVerifyParallelism = cctx.Int("batch-seal-verify-threads")

		go func() {
			// Prometheus globals are exposed as interfaces, but the prometheus
			// OpenCensus exporter expects a concrete *Registry. The concrete type
			// of the globals is actually *Registry, so we downcast it, staying
			// defensive in case things change under the hood.
			registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry)
			if !ok {
				log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", prometheus.DefaultRegisterer)
				return
			}
			exporter, err := ocprom.NewExporter(ocprom.Options{
				Registry:  registry,
				Namespace: "lotus",
			})
			if err != nil {
				log.Fatalf("could not create the prometheus stats exporter: %v", err)
			}

			http.Handle("/debug/metrics", exporter)

			http.ListenAndServe("localhost:6060", nil) //nolint:errcheck
		}()

		var tdir string
		if rdir := cctx.String("repodir"); rdir != "" {
			tdir = rdir
		} else {
			tmp, err := ioutil.TempDir("", "lotus-import-bench")
			if err != nil {
				return err
			}
			tdir = tmp
		}

		var (
			ds  datastore.Batching
			bs  blockstore.Blockstore
			err error
		)

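		// Select the backing store for chain data: pebble, the native badger
		// blockstore, or the legacy badger datastore (the default).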
		switch {
		case cctx.Bool("use-pebble"):
			log.Info("using pebble")
			cache := 512
			ds, err = pebbleds.NewDatastore(tdir, &pebble.Options{
				// Pebble has a single combined cache area and the write
				// buffers are taken from this too. Assign all available
				// memory allowance for cache.
				Cache: pebble.NewCache(int64(cache * 1024 * 1024)),
				// The size of each memory table (which also serves as the write buffer).
				// Note that there may be more than two memory tables in the system;
				// MemTableStopWritesThreshold can be configured to limit memory use.
				MemTableSize: cache * 1024 * 1024 / 4,
				// The default compaction concurrency is 1; use all available
				// CPUs for faster compaction.
				MaxConcurrentCompactions: runtime.NumCPU(),
				// Per-level options. Options for at least one level must be specified. The
				// options for the last level are used for all subsequent levels.
				Levels: []pebble.LevelOptions{
					{TargetFileSize: 16 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10), Compression: pebble.NoCompression},
				},
				Logger: log,
			})

		case cctx.Bool("use-native-badger"):
			log.Info("using native badger")
			var opts badgerbs.Options
			if opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, tdir, false); err != nil {
				return err
			}
			opts.SyncWrites = false
			bs, err = badgerbs.Open(opts)

		default: // legacy badger via datastore.
			log.Info("using legacy badger")
			bdgOpt := badger.DefaultOptions
			bdgOpt.GcInterval = 0
			bdgOpt.Options = bdg.DefaultOptions("")
			bdgOpt.Options.SyncWrites = false
			bdgOpt.Options.Truncate = true
			bdgOpt.Options.DetectConflicts = false

			ds, err = badger.NewDatastore(tdir, &bdgOpt)
		}

		if err != nil {
			return err
		}

		if ds != nil {
			ds = measure.New("dsbench", ds)
			defer ds.Close() //nolint:errcheck
			bs = blockstore.NewBlockstore(ds)
		}

		if c, ok := bs.(io.Closer); ok {
			defer c.Close() //nolint:errcheck
		}

		ctx := metricsi.CtxScope(context.Background(), "lotus")
		cacheOpts := blockstore.DefaultCacheOpts()
		cacheOpts.HasBloomFilterSize = 0
		bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts)
		if err != nil {
			return err
		}

		var verifier ffiwrapper.Verifier = ffiwrapper.ProofVerifier
		if cctx.IsSet("syscall-cache") {
			scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions)
			if err != nil {
				return xerrors.Errorf("opening syscall-cache datastore: %w", err)
			}
			defer scds.Close() //nolint:errcheck

			verifier = &cachingVerifier{
				ds:      scds,
				backend: verifier,
			}
		}
		if cctx.Bool("only-gc") {
			return nil
		}

		metadataDs := datastore.NewMapDatastore()
		cs := store.NewChainStore(bs, bs, metadataDs, vm.Syscalls(verifier), nil)
		defer cs.Close() //nolint:errcheck

		stm := stmgr.NewStateManager(cs)

		startTime := time.Now()

		// register a gauge that reports how long since the measurable
		// operation began.
		promauto.NewGaugeFunc(prometheus.GaugeOpts{
			Name: "lotus_bench_time_taken_secs",
		}, func() float64 {
			return time.Since(startTime).Seconds()
		})

		defer func() {
			end := time.Now().Format(time.RFC3339)

			resp, err := http.Get("http://localhost:6060/debug/metrics")
			if err != nil {
				log.Warnf("failed to scrape prometheus: %s", err)
			}

			metricsfi, err := os.Create("bench.metrics")
			if err != nil {
				log.Warnf("failed to write prometheus data: %s", err)
			}

			_, _ = io.Copy(metricsfi, resp.Body) //nolint:errcheck
			_ = metricsfi.Close()                //nolint:errcheck

			writeProfile := func(name string) {
				if file, err := os.Create(fmt.Sprintf("%s.%s.%s.pprof", name, startTime.Format(time.RFC3339), end)); err == nil {
					if err := pprof.Lookup(name).WriteTo(file, 0); err != nil {
						log.Warnf("failed to write %s pprof: %s", name, err)
					}
					_ = file.Close()
				} else {
					log.Warnf("failed to create %s pprof file: %s", name, err)
				}
			}

			writeProfile("heap")
			writeProfile("allocs")
		}()

		var carFile *os.File

		// open the CAR file if one is provided.
		if path := cctx.String("car"); path != "" {
			var err error
			if carFile, err = os.Open(path); err != nil {
				return xerrors.Errorf("failed to open provided CAR file: %w", err)
			}
		}

		var head *types.TipSet

		// --- IMPORT ---
		if !cctx.Bool("no-import") {
			if cctx.Bool("global-profile") {
				prof, err := os.Create("bench.import.pprof")
				if err != nil {
					return err
				}
				defer prof.Close() //nolint:errcheck

				if err := pprof.StartCPUProfile(prof); err != nil {
					return err
				}
			}

			// import is NOT suppressed; do it.
			if carFile == nil { // a CAR is compulsory for the import.
				return fmt.Errorf("no CAR file provided for import")
			}

			head, err = cs.Import(carFile)
			if err != nil {
				return err
			}

			pprof.StopCPUProfile()
		}

		if cctx.Bool("only-import") {
			return nil
		}

		// --- VALIDATION ---
		//
		// we are now preparing for the validation benchmark.
		// a HEAD needs to be set; --head takes precedence over the root
		// of the CAR, if both are provided.
		if h := cctx.String("head"); h != "" {
			cids, err := lcli.ParseTipSetString(h)
			if err != nil {
				return xerrors.Errorf("failed to parse head tipset key: %w", err)
			}

			head, err = cs.LoadTipSet(types.NewTipSetKey(cids...))
			if err != nil {
				return err
			}
		} else if carFile != nil && head == nil {
			cr, err := car.NewCarReader(carFile)
			if err != nil {
				return err
			}
			head, err = cs.LoadTipSet(types.NewTipSetKey(cr.Header.Roots...))
			if err != nil {
				return err
			}
		} else if h == "" && carFile == nil {
			return xerrors.Errorf("neither --car nor --head was supplied")
		}

		log.Infof("chain head is tipset: %s", head.Key())

		var genesis *types.TipSet
		log.Infof("getting genesis block")
		if tsk := cctx.String("genesis-tipset"); tsk != "" {
			var cids []cid.Cid
			if cids, err = lcli.ParseTipSetString(tsk); err != nil {
				return xerrors.Errorf("failed to parse genesis tipset key: %w", err)
			}
			genesis, err = cs.LoadTipSet(types.NewTipSetKey(cids...))
		} else {
			log.Warnf("getting genesis by height; this will be slow; pass in the genesis tipset through --genesis-tipset")
			// fall back to the slow path of walking the chain.
			genesis, err = cs.GetTipsetByHeight(context.TODO(), 0, head, true)
		}

		if err != nil {
			return err
		}

		if err = cs.SetGenesis(genesis.Blocks()[0]); err != nil {
			return err
		}

		// Resolve the end tipset, falling back to head if not provided.
		end := head
		if tsk := cctx.String("end-tipset"); tsk != "" {
			var cids []cid.Cid
			if cids, err = lcli.ParseTipSetString(tsk); err != nil {
				return xerrors.Errorf("failed to parse end tipset key: %w", err)
			}
			end, err = cs.LoadTipSet(types.NewTipSetKey(cids...))
		} else if h := cctx.Int64("end-height"); h != 0 {
			log.Infof("getting end tipset at height %d...", h)
			end, err = cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), head, true)
		}

		if err != nil {
			return err
		}

		// Resolve the start tipset, if provided; otherwise, fall back to
		// height 1 as the start point.
		var (
			startEpoch = abi.ChainEpoch(1)
			start      *types.TipSet
		)

		if tsk := cctx.String("start-tipset"); tsk != "" {
			var cids []cid.Cid
			if cids, err = lcli.ParseTipSetString(tsk); err != nil {
				return xerrors.Errorf("failed to parse start tipset key: %w", err)
			}
			start, err = cs.LoadTipSet(types.NewTipSetKey(cids...))
		} else if h := cctx.Int64("start-height"); h != 0 {
			log.Infof("getting start tipset at height %d...", h)
			// lookback from the end tipset (which falls back to head if not supplied).
			start, err = cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), end, true)
		}

		if err != nil {
			return err
		}

		if start != nil {
			startEpoch = start.Height()
			if err := cs.ForceHeadSilent(context.Background(), start); err != nil {
				// if err := cs.SetHead(start); err != nil {
				return err
			}
		}

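		// Build the list of tipsets to validate by walking parents back from
		// the end tipset to startEpoch; the slice is ordered newest-first.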
		inverseChain := append(make([]*types.TipSet, 0, end.Height()), end)
		for ts := end; ts.Height() > startEpoch; {
			if h := ts.Height(); h%100 == 0 {
				log.Infof("walking back the chain; loaded tipset at height %d...", h)
			}
			next, err := cs.LoadTipSet(ts.Parents())
			if err != nil {
				return err
			}

			inverseChain = append(inverseChain, next)
			ts = next
		}

		var enc *json.Encoder
		if cctx.Bool("export-traces") {
			ibj, err := os.Create("bench.json")
			if err != nil {
				return err
			}
			defer ibj.Close() //nolint:errcheck

			enc = json.NewEncoder(ibj)
		}

		if cctx.Bool("global-profile") {
			prof, err := os.Create("bench.validation.pprof")
			if err != nil {
				return err
			}
			defer prof.Close() //nolint:errcheck

			if err := pprof.StartCPUProfile(prof); err != nil {
				return err
			}
		}

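		// Re-execute each tipset from oldest to newest, checking that the
		// computed state root matches the ParentState recorded in the next
		// tipset; traces are streamed to bench.json when enabled.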
		for i := len(inverseChain) - 1; i >= 1; i-- {
			cur := inverseChain[i]
			start := time.Now()
			log.Infof("computing state (height: %d, ts=%s)", cur.Height(), cur.Cids())
			st, trace, err := stm.ExecutionTrace(context.TODO(), cur)
			if err != nil {
				return err
			}
			tse := &TipSetExec{
				TipSet:   cur.Key(),
				Trace:    trace,
				Duration: time.Since(start),
			}
			if enc != nil {
				stripCallers(tse.Trace)

				if err := enc.Encode(tse); err != nil {
					return xerrors.Errorf("failed to write out tipsetexec: %w", err)
				}
			}
			if inverseChain[i-1].ParentState() != st {
				stripCallers(tse.Trace)
				lastTrace := tse.Trace
				d, err := json.MarshalIndent(lastTrace, "", " ")
				if err != nil {
					panic(err)
				}
				fmt.Println("TRACE")
				fmt.Println(string(d))
				//fmt.Println(statediff.Diff(context.Background(), bs, tschain[i-1].ParentState(), st, statediff.ExpandActors))
				return xerrors.Errorf("tipset chain had state mismatch at height %d (%s != %s)", cur.Height(), inverseChain[i-1].ParentState(), st)
			}
		}

		pprof.StopCPUProfile()

		return nil
	},
}

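// walkExecutionTrace clears the captured caller stacks from every gas charge
// in the trace, recursing into subcalls; stripCallers applies it to a full
// invocation trace so that exported JSON stays small.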
func walkExecutionTrace(et *types.ExecutionTrace) {
	for _, gc := range et.GasCharges {
		gc.Callers = nil
	}
	for _, sub := range et.Subcalls {
		walkExecutionTrace(&sub) //nolint:scopelint,gosec
	}
}

func stripCallers(trace []*api.InvocResult) {
	for _, t := range trace {
		walkExecutionTrace(&t.ExecutionTrace)
	}
}

type Invocation struct {
	TipSet types.TipSetKey
	Invoc  *api.InvocResult
}

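// GasPerNs is the assumed gas cost per nanosecond of execution time; the
// analysis uses it to convert measured wall-clock time into an equivalent
// gas amount when computing gas ratios.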
const GasPerNs = 10

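// countGasCosts returns the total compute gas and virtual compute gas charged
// across et and all of its subcalls.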
func countGasCosts(et *types.ExecutionTrace) (int64, int64) {
	var cgas, vgas int64

	for _, gc := range et.GasCharges {
		cgas += gc.ComputeGas
		vgas += gc.VirtualComputeGas
	}

	for _, sub := range et.Subcalls {
		c, v := countGasCosts(&sub) //nolint
		cgas += c
		vgas += v
	}

	return cgas, vgas
}

type stats struct {
	timeTaken meanVar
	gasRatio  meanVar

	extraCovar *covar
}

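// covar is an online accumulator of bivariate statistics (means, variances
// and the co-moment of x and y), updated Welford-style one point at a time;
// it is used to fit time taken as a linear function of the "extra" value
// attached to a gas charge (such as the size reported by OnIpldGetEnd).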
type covar struct {
	meanX float64
	meanY float64
	c     float64
	n     float64
	m2x   float64
	m2y   float64
}

func (cov1 *covar) Covariance() float64 {
	return cov1.c / (cov1.n - 1)
}

func (cov1 *covar) VarianceX() float64 {
	return cov1.m2x / (cov1.n - 1)
}

func (cov1 *covar) StddevX() float64 {
	return math.Sqrt(cov1.VarianceX())
}

func (cov1 *covar) VarianceY() float64 {
	return cov1.m2y / (cov1.n - 1)
}

func (cov1 *covar) StddevY() float64 {
	return math.Sqrt(cov1.VarianceY())
}

func (cov1 *covar) AddPoint(x, y float64) {
	cov1.n++

	dx := x - cov1.meanX
	cov1.meanX += dx / cov1.n
	dx2 := x - cov1.meanX
	cov1.m2x += dx * dx2

	dy := y - cov1.meanY
	cov1.meanY += dy / cov1.n
	dy2 := y - cov1.meanY
	cov1.m2y += dy * dy2

	// co-moment update: old x-delta times new y-delta, matching the m2
	// updates above.
	cov1.c += dx * dy2
}

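// Combine merges the accumulator cov2 into cov1 using the standard parallel
// update for running means, variances and co-moments; accumulators holding a
// single point are folded in point-by-point.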
func (cov1 *covar) Combine(cov2 *covar) {
	if cov1.n == 0 {
		*cov1 = *cov2
		return
	}
	if cov2.n == 0 {
		return
	}

	if cov1.n == 1 {
		cpy := *cov2
		cpy.AddPoint(cov1.meanX, cov1.meanY)
		*cov1 = cpy
		return
	}
	if cov2.n == 1 {
		cov1.AddPoint(cov2.meanX, cov2.meanY)
		return
	}

	out := covar{}
	out.n = cov1.n + cov2.n

	dx := cov1.meanX - cov2.meanX
	out.meanX = cov1.meanX - dx*cov2.n/out.n
	out.m2x = cov1.m2x + cov2.m2x + dx*dx*cov1.n*cov2.n/out.n

	dy := cov1.meanY - cov2.meanY
	out.meanY = cov1.meanY - dy*cov2.n/out.n
	out.m2y = cov1.m2y + cov2.m2y + dy*dy*cov1.n*cov2.n/out.n

	out.c = cov1.c + cov2.c + dx*dy*cov1.n*cov2.n/out.n
	*cov1 = out
}

func (cov1 *covar) A() float64 {
	return cov1.Covariance() / cov1.VarianceX()
}
func (cov1 *covar) B() float64 {
	return cov1.meanY - cov1.meanX*cov1.A()
}
func (cov1 *covar) Correl() float64 {
	return cov1.Covariance() / cov1.StddevX() / cov1.StddevY()
}

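// meanVar is a Welford online accumulator for the mean and variance of a
// single variable; Combine merges two partial accumulators using the
// parallel variance formula.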
type meanVar struct {
	n    float64
	mean float64
	m2   float64
}

func (v1 *meanVar) AddPoint(value float64) {
	// based on https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
	v1.n++
	delta := value - v1.mean
	v1.mean += delta / v1.n
	delta2 := value - v1.mean
	v1.m2 += delta * delta2
}

func (v1 *meanVar) Variance() float64 {
	return v1.m2 / (v1.n - 1)
}
func (v1 *meanVar) Mean() float64 {
	return v1.mean
}
func (v1 *meanVar) Stddev() float64 {
	return math.Sqrt(v1.Variance())
}

func (v1 *meanVar) Combine(v2 *meanVar) {
	if v1.n == 0 {
		*v1 = *v2
		return
	}
	if v2.n == 0 {
		return
	}
	if v1.n == 1 {
		cpy := *v2
		cpy.AddPoint(v1.mean)
		*v1 = cpy
		return
	}
	if v2.n == 1 {
		v1.AddPoint(v2.mean)
		return
	}

	newCount := v1.n + v2.n
	delta := v2.mean - v1.mean
	meanDelta := delta * v2.n / newCount
	m2 := v1.m2 + v2.m2 + delta*meanDelta*v1.n
	v1.n = newCount
	v1.mean += meanDelta
	v1.m2 = m2
}

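// getExtras interprets the Extra field attached to a gas trace, which may be
// a bare type string, a bare size, or a map carrying both.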
func getExtras(ex interface{}) (*string, *float64) {
	if t, ok := ex.(string); ok {
		return &t, nil
	}
	if size, ok := ex.(float64); ok {
		return nil, &size
	}
	if exMap, ok := ex.(map[string]interface{}); ok {
		t, tok := exMap["type"].(string)
		size, sok := exMap["size"].(float64)
		if tok && sok {
			return &t, &size
		}
		if tok {
			return &t, nil
		}
		if sok {
			return nil, &size
		}
		return nil, nil
	}
	return nil, nil
}

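// tallyGasCharges walks an execution trace (and, recursively, its subcalls)
// and accumulates per-charge-name statistics: time taken, the ratio of
// time-implied gas to gas actually charged, and, when a size is available,
// the covariance of size against time taken. OnIpldGet charges are paired
// with the following OnIpldGetEnd to recover the size of the block fetched.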
func tallyGasCharges(charges map[string]*stats, et types.ExecutionTrace) {
	for i, gc := range et.GasCharges {
		name := gc.Name
		if name == "OnIpldGetEnd" {
			continue
		}
		tt := float64(gc.TimeTaken.Nanoseconds())
		if name == "OnVerifyPost" && tt > 2e9 {
			log.Warnf("Skipping abnormally long OnVerifyPost: %fs", tt/1e9)
			// discard initial very long OnVerifyPost
			continue
		}
		eType, eSize := getExtras(gc.Extra)

		if name == "OnIpldGet" {
			next := &types.GasTrace{}
			if i+1 < len(et.GasCharges) {
				next = et.GasCharges[i+1]
			}
			if next.Name != "OnIpldGetEnd" {
				log.Warn("OnIpldGet without OnIpldGetEnd")
			} else {
				_, size := getExtras(next.Extra)
				eSize = size
			}
		}
		if eType != nil {
			name += "-" + *eType
		}
		compGas := gc.ComputeGas
		if compGas == 0 {
			compGas = gc.VirtualComputeGas
		}
		if compGas == 0 {
			compGas = 1
		}
		s := charges[name]
		if s == nil {
			s = new(stats)
			charges[name] = s
		}

		if eSize != nil {
			if s.extraCovar == nil {
				s.extraCovar = &covar{}
			}
			s.extraCovar.AddPoint(*eSize, tt)
		}

		s.timeTaken.AddPoint(tt)

		ratio := tt / float64(compGas) * GasPerNs
		s.gasRatio.AddPoint(ratio)
	}
	for _, sub := range et.Subcalls {
		tallyGasCharges(charges, sub)
	}
}

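// importAnalyzeCmd reads a bench.json file produced by the import benchmark,
// fans the per-tipset records out to a pool of workers, and reports aggregate
// gas/time statistics plus the most expensive invocations.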
var importAnalyzeCmd = &cli.Command{
	Name: "analyze",
	Action: func(cctx *cli.Context) error {
		if !cctx.Args().Present() {
			fmt.Println("must pass bench file to analyze")
			return nil
		}

		go func() {
			http.ListenAndServe("localhost:6060", nil) //nolint:errcheck
		}()

		fi, err := os.Open(cctx.Args().First())
		if err != nil {
			return err
		}
		defer fi.Close() //nolint:errcheck

		const nWorkers = 16
		jsonIn := make(chan []byte, 2*nWorkers)
		type result struct {
			totalTime       time.Duration
			chargeStats     map[string]*stats
			expensiveInvocs []Invocation
		}

		results := make(chan result, nWorkers)

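		// Spin up nWorkers goroutines; each decodes TipSetExec records from
		// jsonIn, tallies gas-charge statistics, and keeps a rolling list of
		// the most expensive invocations it has seen.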
		for i := 0; i < nWorkers; i++ {
			go func() {
				chargeStats := make(map[string]*stats)
				var totalTime time.Duration
				const invocsKeep = 32
				var expensiveInvocs = make([]Invocation, 0, 8*invocsKeep)
				var leastExpensiveInvoc = time.Duration(0)

				for {
					b, ok := <-jsonIn
					if !ok {
						results <- result{
							totalTime:       totalTime,
							chargeStats:     chargeStats,
							expensiveInvocs: expensiveInvocs,
						}
						return
					}

					var tse TipSetExec
					err := json.Unmarshal(b, &tse)
					if err != nil {
						log.Warnf("error unmarshaling tipset: %+v", err)
						continue
					}

					totalTime += tse.Duration
					for _, inv := range tse.Trace {
						if inv.Duration > leastExpensiveInvoc {
							expensiveInvocs = append(expensiveInvocs, Invocation{
								TipSet: tse.TipSet,
								Invoc:  inv,
							})
						}

						tallyGasCharges(chargeStats, inv.ExecutionTrace)
					}
					if len(expensiveInvocs) > 4*invocsKeep {
						sort.Slice(expensiveInvocs, func(i, j int) bool {
							return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration
						})
						leastExpensiveInvoc = expensiveInvocs[len(expensiveInvocs)-1].Invoc.Duration
						n := 30
						if len(expensiveInvocs) < n {
							n = len(expensiveInvocs)
						}
						expensiveInvocs = expensiveInvocs[:n]
					}
				}
			}()
		}

		var totalTipsets int64
		reader := bufio.NewReader(fi)
		for {
			b, err := reader.ReadBytes('\n')
			if err != nil && err != io.EOF {
				if e, ok := err.(*json.SyntaxError); ok {
					log.Warnf("syntax error at byte offset %d", e.Offset)
				}
				return err
			}
			totalTipsets++
			jsonIn <- b
			fmt.Fprintf(os.Stderr, "\rProcessed %d tipsets", totalTipsets)
			if err == io.EOF {
				break
			}
		}
		close(jsonIn)
		fmt.Fprintf(os.Stderr, "\n")
		fmt.Fprintf(os.Stderr, "Collecting results\n")

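		// Merge the per-worker results: expensive invocations are concatenated
		// and per-charge statistics are combined with the parallel merge
		// methods on meanVar and covar.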
		var invocs []Invocation
		var totalTime time.Duration
		var keys []string
		var charges = make(map[string]*stats)
		for i := 0; i < nWorkers; i++ {
			fmt.Fprintf(os.Stderr, "\rProcessing results from worker %d/%d", i+1, nWorkers)
			res := <-results
			invocs = append(invocs, res.expensiveInvocs...)
			for k, v := range res.chargeStats {
				s := charges[k]
				if s == nil {
					s = new(stats)
					charges[k] = s
				}
				s.timeTaken.Combine(&v.timeTaken)
				s.gasRatio.Combine(&v.gasRatio)

				if v.extraCovar != nil {
					if s.extraCovar == nil {
						s.extraCovar = &covar{}
					}
					s.extraCovar.Combine(v.extraCovar)
				}
			}
			totalTime += res.totalTime
		}

		fmt.Fprintf(os.Stderr, "\nCollecting gas keys\n")
		for k := range charges {
			keys = append(keys, k)
		}

		fmt.Println("Gas Price Deltas")
		sort.Strings(keys)
		for _, k := range keys {
			s := charges[k]
			fmt.Printf("%s: incr by %.4f~%.4f; tt %.4f~%.4f\n", k, s.gasRatio.Mean(), s.gasRatio.Stddev(),
				s.timeTaken.Mean(), s.timeTaken.Stddev())
			if s.extraCovar != nil {
				fmt.Printf("\t correl: %.2f, tt = %.2f * extra + %.2f\n", s.extraCovar.Correl(),
					s.extraCovar.A(), s.extraCovar.B())
				fmt.Printf("\t covar: %.2f, extra: %.2f~%.2f, tt2: %.2f~%.2f, count %.0f\n",
					s.extraCovar.Covariance(), s.extraCovar.meanX, s.extraCovar.StddevX(),
					s.extraCovar.meanY, s.extraCovar.StddevY(), s.extraCovar.n)
			}
		}

		sort.Slice(invocs, func(i, j int) bool {
			return invocs[i].Invoc.Duration > invocs[j].Invoc.Duration
		})

		fmt.Println("Total time: ", totalTime)
		fmt.Println("Average time per epoch: ", totalTime/time.Duration(totalTipsets))
		if actorExec, ok := charges["OnActorExec"]; ok {
			timeInActors := actorExec.timeTaken.Mean() * actorExec.timeTaken.n
			fmt.Printf("Average time per epoch in actors: %s (%.1f%%)\n", time.Duration(timeInActors)/time.Duration(totalTipsets), timeInActors/float64(totalTime)*100)
		}
		if actorExecDone, ok := charges["OnMethodInvocationDone"]; ok {
			timeInActors := actorExecDone.timeTaken.Mean() * actorExecDone.timeTaken.n
			fmt.Printf("Average time per epoch in OnActorExecDone: %s (%.1f%%)\n", time.Duration(timeInActors)/time.Duration(totalTipsets), timeInActors/float64(totalTime)*100)
		}

		n := 30
		if len(invocs) < n {
			n = len(invocs)
		}
		fmt.Printf("Top %d most expensive calls:\n", n)
		for i := 0; i < n; i++ {
			inv := invocs[i].Invoc
			fmt.Printf("%s: %s %s %d %s\n", inv.Duration, inv.Msg.From, inv.Msg.To, inv.Msg.Method, invocs[i].TipSet)
		}
		return nil
	},
}