lotus/cmd/lotus-bench/import.go

577 lines
12 KiB
Go
Raw Normal View History

package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
2020-06-23 03:08:19 +00:00
"io"
"io/ioutil"
"math"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"runtime/pprof"
"sort"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
badger "github.com/ipfs/go-ds-badger2"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/urfave/cli/v2"
)
// TipSetExec is the per-tipset record the import benchmark writes, one JSON
// document per line, to import-bench.json and that the analyze subcommand
// reads back.
type TipSetExec struct {
// TipSet is the key of the tipset that was executed.
TipSet types.TipSetKey
// Trace holds the invocation results returned by the state manager's
// ExecutionTrace call for this tipset (with gas-charge callers stripped).
Trace []*api.InvocResult
// Duration is the wall-clock time measured around the execution of the tipset.
Duration time.Duration
}
var importBenchCmd = &cli.Command{
Name: "import",
Usage: "benchmark chain import and validation",
Subcommands: []*cli.Command{
importAnalyzeCmd,
},
Flags: []cli.Flag{
&cli.Int64Flag{
Name: "height",
Usage: "halt validation after given height",
},
&cli.IntFlag{
Name: "batch-seal-verify-threads",
Usage: "set the parallelism factor for batch seal verification",
Value: runtime.NumCPU(),
},
},
Action: func(cctx *cli.Context) error {
vm.BatchSealVerifyParallelism = cctx.Int("batch-seal-verify-threads")
if !cctx.Args().Present() {
fmt.Println("must pass car file of chain to benchmark importing")
return nil
}
cfi, err := os.Open(cctx.Args().First())
if err != nil {
return err
}
defer cfi.Close() //nolint:errcheck // read only file
tdir, err := ioutil.TempDir("", "lotus-import-bench")
if err != nil {
return err
}
bds, err := badger.NewDatastore(tdir, nil)
if err != nil {
return err
}
bs := blockstore.NewBlockstore(bds)
2020-05-16 18:31:14 +00:00
cbs, err := blockstore.CachedBlockstore(context.TODO(), bs, blockstore.DefaultCacheOpts())
if err != nil {
return err
}
bs = cbs
ds := datastore.NewMapDatastore()
cs := store.NewChainStore(bs, ds, vm.Syscalls(ffiwrapper.ProofVerifier))
stm := stmgr.NewStateManager(cs)
prof, err := os.Create("import-bench.prof")
if err != nil {
return err
}
defer prof.Close() //nolint:errcheck
if err := pprof.StartCPUProfile(prof); err != nil {
return err
}
head, err := cs.Import(cfi)
if err != nil {
return err
}
if h := cctx.Int64("height"); h != 0 {
tsh, err := cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), head, true)
if err != nil {
return err
}
head = tsh
}
ts := head
tschain := []*types.TipSet{ts}
for ts.Height() != 0 {
next, err := cs.LoadTipSet(ts.Parents())
if err != nil {
return err
}
tschain = append(tschain, next)
ts = next
}
2020-06-23 03:08:19 +00:00
ibj, err := os.Create("import-bench.json")
if err != nil {
return err
}
defer ibj.Close() //nolint:errcheck
enc := json.NewEncoder(ibj)
var lastTse *TipSetExec
lastState := tschain[len(tschain)-1].ParentState()
for i := len(tschain) - 2; i >= 0; i-- {
cur := tschain[i]
log.Infof("computing state (height: %d, ts=%s)", cur.Height(), cur.Cids())
if cur.ParentState() != lastState {
2020-06-23 03:08:19 +00:00
lastTrace := lastTse.Trace
d, err := json.MarshalIndent(lastTrace, "", " ")
if err != nil {
panic(err)
}
fmt.Println("TRACE")
fmt.Println(string(d))
return xerrors.Errorf("tipset chain had state mismatch at height %d (%s != %s)", cur.Height(), cur.ParentState(), lastState)
}
start := time.Now()
st, trace, err := stm.ExecutionTrace(context.TODO(), cur)
if err != nil {
return err
}
stripCallers(trace)
2020-06-23 03:08:19 +00:00
lastTse = &TipSetExec{
TipSet: cur.Key(),
Trace: trace,
Duration: time.Since(start),
2020-06-23 03:08:19 +00:00
}
lastState = st
2020-06-23 03:08:19 +00:00
if err := enc.Encode(lastTse); err != nil {
return xerrors.Errorf("failed to write out tipsetexec: %w", err)
}
}
pprof.StopCPUProfile()
return nil
},
}
// walkExecutionTrace clears the Callers field of every gas charge in et and,
// recursively, in all of its subcalls.
func walkExecutionTrace(et *types.ExecutionTrace) {
	for _, charge := range et.GasCharges {
		charge.Callers = nil
	}

	// Recurse on the elements in place rather than on a loop-variable copy.
	for idx := range et.Subcalls {
		walkExecutionTrace(&et.Subcalls[idx])
	}
}
// stripCallers removes the caller information from the gas charges of every
// invocation result in trace, including nested subcalls.
func stripCallers(trace []*api.InvocResult) {
	for idx := range trace {
		walkExecutionTrace(&trace[idx].ExecutionTrace)
	}
}
// Invocation pairs a single invocation result with the key of the tipset whose
// execution trace it was taken from. The analyze subcommand uses it to report
// the most expensive calls.
type Invocation struct {
// TipSet is the key of the tipset the invocation belongs to.
TipSet types.TipSetKey
// Invoc is the invocation result itself.
Invoc *api.InvocResult
}
// GasPerNs is the factor tallyGasCharges applies to the measured
// nanoseconds-per-gas value of a charge when computing its gas ratio.
// NOTE(review): presumably the intended number of gas units per nanosecond of
// execution time — confirm against the gas pricing model.
const GasPerNs = 10
// countGasCosts sums the ComputeGas and VirtualComputeGas of every gas charge
// in et and, recursively, in all of its subcalls. It returns the compute gas
// total first and the virtual compute gas total second.
func countGasCosts(et *types.ExecutionTrace) (int64, int64) {
	var computeTotal, virtualTotal int64

	for _, charge := range et.GasCharges {
		computeTotal += charge.ComputeGas
		virtualTotal += charge.VirtualComputeGas
	}

	for idx := range et.Subcalls {
		subCompute, subVirtual := countGasCosts(&et.Subcalls[idx])
		computeTotal += subCompute
		virtualTotal += subVirtual
	}

	return computeTotal, virtualTotal
}
// compStats returns the arithmetic mean and the population standard deviation
// (dividing by len(vals), not len(vals)-1) of vals. For an empty slice both
// results are NaN.
func compStats(vals []float64) (float64, float64) {
	count := float64(len(vals))

	total := 0.0
	for idx := range vals {
		total += vals[idx]
	}
	mean := total / count

	sqDev := 0.0
	for idx := range vals {
		diff := mean - vals[idx]
		sqDev += diff * diff
	}

	return mean, math.Sqrt(sqDev / count)
}
// stats aggregates the measurements collected for one gas charge name by
// tallyGasCharges.
type stats struct {
// timeTaken accumulates the time taken by each charge, in nanoseconds.
timeTaken meanVar
// gasRatio accumulates, per charge, time taken divided by virtual compute
// gas (with a gas of 0 treated as 1), multiplied by GasPerNs.
gasRatio meanVar
// extra accumulates the charge's Extra value; it is only allocated once a
// charge with a float64 Extra has been seen, and is nil otherwise.
extra *meanVar
// extraCovar accumulates (Extra, time taken) pairs; allocated together
// with extra.
extraCovar *covar
}
// covar is a streaming accumulator for the covariance of (x, y) pairs and the
// variance of x. It is updated one point at a time with AddPoint, and two
// accumulators built over disjoint data can be merged with Combine. From the
// accumulated values it derives the least-squares line y = A()*x + B().
//
// The zero value is an empty accumulator ready for use.
type covar struct {
	meanX float64 // running mean of x
	meanY float64 // running mean of y
	c     float64 // co-moment: sum of (x - meanX) * (y - meanY)
	n     float64 // number of points, kept as float64 to avoid conversions
	m2    float64 // sum of squared deviations of x from meanX
}

// Covariance returns the sample covariance of x and y (n-1 denominator).
// The result is not meaningful for fewer than two points.
func (cv *covar) Covariance() float64 {
	return cv.c / (cv.n - 1)
}

// Variance returns the sample variance of x (n-1 denominator).
// The result is not meaningful for fewer than two points.
func (cv *covar) Variance() float64 {
	return cv.m2 / (cv.n - 1)
}

// Stddev returns the sample standard deviation of x.
func (cv *covar) Stddev() float64 {
	return math.Sqrt(cv.Variance())
}

// AddPoint folds the pair (x, y) into the accumulator using Welford-style
// single-pass updates.
func (cv *covar) AddPoint(x, y float64) {
	cv.n++

	dx := x - cv.meanX // deviation from the mean *before* the update
	cv.meanX += dx / cv.n
	dx2 := x - cv.meanX // deviation from the mean *after* the update
	cv.m2 += dx * dx2

	cv.meanY += (y - cv.meanY) / cv.n
	// The co-moment update pairs the old-mean x deviation with the new-mean
	// y deviation.
	cv.c += dx * (y - cv.meanY)
}

// Combine merges the points accumulated in cov2 into the receiver, leaving
// cov2 untouched. Afterwards the receiver describes the union of both data
// sets.
func (cv *covar) Combine(cov2 *covar) {
	if cv.n == 0 {
		*cv = *cov2
		return
	}
	if cov2.n == 0 {
		return
	}
	if cv.n == 1 {
		// With a single point the means are the point itself, so add the
		// receiver's point to a copy of cov2.
		merged := *cov2
		merged.AddPoint(cv.meanX, cv.meanY)
		*cv = merged
		return
	}
	if cov2.n == 1 {
		cv.AddPoint(cov2.meanX, cov2.meanY)
		// Return here: falling through would merge cov2 a second time.
		return
	}

	out := covar{}
	out.n = cv.n + cov2.n

	dx := cv.meanX - cov2.meanX
	out.meanX = cv.meanX - dx*cov2.n/out.n
	out.m2 = cv.m2 + cov2.m2 + dx*dx*cv.n*cov2.n/out.n

	dy := cv.meanY - cov2.meanY
	// Same form as meanX: the combined mean moves from the receiver's mean
	// towards cov2's mean, hence the minus sign.
	out.meanY = cv.meanY - dy*cov2.n/out.n

	out.c = cv.c + cov2.c + dx*dy*cv.n*cov2.n/out.n
	*cv = out
}

// A returns the slope of the least-squares line y = A*x + B, i.e.
// Covariance() / Variance().
func (cv *covar) A() float64 {
	return cv.Covariance() / cv.Variance()
}

// B returns the intercept of the least-squares line y = A*x + B.
func (cv *covar) B() float64 {
	return cv.meanY - cv.meanX*cv.A()
}
// meanVar is a streaming accumulator for the mean and sample variance of a
// series of values. The zero value is an empty accumulator ready for use.
type meanVar struct {
	n    float64 // number of values seen
	mean float64 // running mean
	m2   float64 // sum of squared deviations from the running mean
}

// AddPoint folds value into the accumulator.
func (mv *meanVar) AddPoint(value float64) {
	// based on https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
	mv.n += 1

	before := value - mv.mean
	mv.mean += before / mv.n
	after := value - mv.mean

	mv.m2 += before * after
}

// Variance returns the sample variance (n-1 denominator) of the values seen.
func (mv *meanVar) Variance() float64 {
	return mv.m2 / (mv.n - 1)
}

// Mean returns the mean of the values seen.
func (mv *meanVar) Mean() float64 {
	return mv.mean
}

// Stddev returns the sample standard deviation of the values seen.
func (mv *meanVar) Stddev() float64 {
	return math.Sqrt(mv.Variance())
}

// Combine merges the values accumulated in v2 into the receiver; v2 itself is
// not modified.
func (mv *meanVar) Combine(v2 *meanVar) {
	switch {
	case mv.n == 0:
		// Nothing accumulated yet: take over v2 wholesale.
		*mv = *v2
	case v2.n == 0:
		// Nothing to merge.
	case mv.n == 1:
		// A single value equals its own mean, so add it to a copy of v2.
		merged := *v2
		merged.AddPoint(mv.mean)
		*mv = merged
	case v2.n == 1:
		mv.AddPoint(v2.mean)
	default:
		// Pairwise merge of two multi-value accumulators.
		total := mv.n + v2.n
		gap := v2.mean - mv.mean
		shift := gap * v2.n / total
		mergedM2 := mv.m2 + v2.m2 + gap*shift*mv.n

		mv.n = total
		mv.mean += shift
		mv.m2 = mergedM2
	}
}
// tallyGasCharges folds every gas charge of et, and recursively of its
// subcalls, into charges, which maps a charge name to its accumulated stats.
// Missing map entries are created on demand.
//
// Naming and pairing rules:
//   - "OnIpldGetStart" charges are not tallied on their own; their time is
//     added to the "OnIpldGet" charge that immediately follows them.
//   - When a charge's Extra is a string, it is appended to the name as
//     "<name>-<extra>".
//   - When a charge's Extra is a float64, it is additionally recorded in the
//     entry's extra / extraCovar accumulators, paired with the time taken.
func tallyGasCharges(charges map[string]*stats, et types.ExecutionTrace) {
	for i, gc := range et.GasCharges {
		name := gc.Name
		if name == "OnIpldGetStart" {
			continue
		}

		tt := float64(gc.TimeTaken.Nanoseconds())
		if name == "OnIpldGet" {
			// Guard i > 0 so a leading OnIpldGet cannot index out of range,
			// and only add the predecessor's time when it really is the
			// matching start charge: any other charge is tallied under its
			// own name and would otherwise be counted twice.
			if i > 0 && et.GasCharges[i-1].Name == "OnIpldGetStart" {
				tt += float64(et.GasCharges[i-1].TimeTaken.Nanoseconds())
			} else {
				log.Warn("OnIpldGet without OnIpldGetStart")
			}
		}

		if eString, ok := gc.Extra.(string); ok {
			name += "-" + eString
		}

		// Avoid dividing by zero for charges without virtual compute gas.
		compGas := gc.VirtualComputeGas
		if compGas == 0 {
			compGas = 1
		}

		s := charges[name]
		if s == nil {
			s = new(stats)
			charges[name] = s
		}

		if extra, ok := gc.Extra.(float64); ok {
			if s.extraCovar == nil {
				s.extraCovar = &covar{}
				s.extra = &meanVar{}
			}
			s.extraCovar.AddPoint(extra, tt)
			s.extra.AddPoint(extra)
		}

		s.timeTaken.AddPoint(tt)
		ratio := tt / float64(compGas) * GasPerNs
		s.gasRatio.AddPoint(ratio)
	}

	for _, sub := range et.Subcalls {
		tallyGasCharges(charges, sub)
	}
}
var importAnalyzeCmd = &cli.Command{
Name: "analyze",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
fmt.Println("must pass bench file to analyze")
return nil
}
go func() {
http.ListenAndServe("localhost:6060", nil)
}()
fi, err := os.Open(cctx.Args().First())
if err != nil {
return err
}
const nWorkers = 16
jsonIn := make(chan []byte, 2*nWorkers)
type result struct {
totalTime time.Duration
chargeStats map[string]*stats
expensiveInvocs []Invocation
}
results := make(chan result, nWorkers)
for i := 0; i < nWorkers; i++ {
go func() {
chargeStats := make(map[string]*stats)
var totalTime time.Duration
const invocsKeep = 32
var expensiveInvocs = make([]Invocation, 0, 8*invocsKeep)
var leastExpensiveInvoc = time.Duration(0)
for {
b, ok := <-jsonIn
if !ok {
results <- result{
totalTime: totalTime,
chargeStats: chargeStats,
expensiveInvocs: expensiveInvocs,
}
return
}
var tse TipSetExec
err := json.Unmarshal(b, &tse)
if err != nil {
log.Warnf("error unmarshaling tipset: %+v", err)
continue
}
totalTime += tse.Duration
for _, inv := range tse.Trace {
if inv.Duration > leastExpensiveInvoc {
expensiveInvocs = append(expensiveInvocs, Invocation{
TipSet: tse.TipSet,
Invoc: inv,
})
}
tallyGasCharges(chargeStats, inv.ExecutionTrace)
}
if len(expensiveInvocs) > 4*invocsKeep {
sort.Slice(expensiveInvocs, func(i, j int) bool {
return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration
})
leastExpensiveInvoc = expensiveInvocs[len(expensiveInvocs)-1].Invoc.Duration
n := 30
if len(expensiveInvocs) < n {
n = len(expensiveInvocs)
}
expensiveInvocs = expensiveInvocs[:n]
}
}
}()
}
var totalTipsets int64
reader := bufio.NewReader(fi)
2020-06-23 03:08:19 +00:00
for {
b, err := reader.ReadBytes('\n')
if err != nil && err != io.EOF {
if e, ok := err.(*json.SyntaxError); ok {
log.Warnf("syntax error at byte offset %d", e.Offset)
2020-06-23 03:08:19 +00:00
}
return err
2020-06-23 03:08:19 +00:00
}
totalTipsets++
jsonIn <- b
fmt.Printf("\rProcessed %d tipsets", totalTipsets)
if err == io.EOF {
break
}
}
close(jsonIn)
fmt.Printf("\n")
fmt.Printf("Collecting results\n")
var invocs []Invocation
var totalTime time.Duration
var keys []string
var charges = make(map[string]*stats)
for i := 0; i < nWorkers; i++ {
fmt.Printf("\rProcessing results from worker %d/%d", i+1, nWorkers)
res := <-results
invocs = append(invocs, res.expensiveInvocs...)
for k, v := range res.chargeStats {
s := charges[k]
if s == nil {
s = new(stats)
charges[k] = s
}
s.timeTaken.Combine(&v.timeTaken)
s.gasRatio.Combine(&v.gasRatio)
if v.extra != nil {
if s.extra == nil {
s.extra = &meanVar{}
s.extraCovar = &covar{}
}
s.extra.Combine(v.extra)
s.extraCovar.Combine(v.extraCovar)
}
}
totalTime += res.totalTime
}
fmt.Printf("\nCollecting gas keys\n")
for k := range charges {
keys = append(keys, k)
}
fmt.Println("Gas Price Deltas")
sort.Strings(keys)
for _, k := range keys {
s := charges[k]
fmt.Printf("%s: incr by %f~%f; tt %f~%f\n", k, s.gasRatio.Mean(), s.gasRatio.Stddev(),
s.timeTaken.Mean(), s.timeTaken.Stddev())
if s.extra != nil {
fmt.Printf("\t covar: %f, tt = %f * extra + %f\n", s.extraCovar.Covariance(),
s.extraCovar.A(), s.extraCovar.B())
fmt.Printf("\t extra: %f~%f\n", s.extra.Mean(), s.extra.Stddev())
}
}
sort.Slice(invocs, func(i, j int) bool {
return invocs[i].Invoc.Duration > invocs[j].Invoc.Duration
})
fmt.Println("Total time: ", totalTime)
fmt.Println("Average time per epoch: ", totalTime/time.Duration(totalTipsets))
n := 30
if len(invocs) < n {
n = len(invocs)
}
fmt.Printf("Top %d most expensive calls:\n", n)
for i := 0; i < n; i++ {
inv := invocs[i].Invoc
fmt.Printf("%s: %s %s %d %s\n", inv.Duration, inv.Msg.From, inv.Msg.To, inv.Msg.Method, invocs[i].TipSet)
}
return nil
},
}