Make lotus-bench import analyze multithreaded and less RAM hungry

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-06-26 21:54:20 +02:00
parent c97720e731
commit e65215f1b5
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA

View File

@ -255,7 +255,55 @@ var importAnalyzeCmd = &cli.Command{
}
dec := json.NewDecoder(fi)
var results []TipSetExec
const nWorkers = 16
tseIn := make(chan TipSetExec, 2*nWorkers)
type result struct {
totalTime time.Duration
chargeDeltas map[string][]float64
expensiveInvocs []Invocation
}
results := make(chan result, nWorkers)
for i := 0; i < nWorkers; i++ {
go func() {
chargeDeltas := make(map[string][]float64)
var totalTime time.Duration
var expensiveInvocs []Invocation
var leastExpensiveInvoc = time.Duration(0)
for {
tse, ok := <-tseIn
if !ok {
results <- result{
totalTime: totalTime,
chargeDeltas: chargeDeltas,
expensiveInvocs: expensiveInvocs,
}
return
}
totalTime += tse.Duration
for _, inv := range tse.Trace {
if inv.Duration > leastExpensiveInvoc {
expensiveInvocs = append(expensiveInvocs, Invocation{
TipSet: tse.TipSet,
Invoc: inv,
})
}
tallyGasCharges(chargeDeltas, &inv.ExecutionTrace)
}
sort.Slice(expensiveInvocs, func(i, j int) bool {
return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration
})
if len(expensiveInvocs) != 0 {
leastExpensiveInvoc = expensiveInvocs[len(expensiveInvocs)-1].Invoc.Duration
}
}
}()
}
var totalTipsets int64
for {
var tse TipSetExec
if err := dec.Decode(&tse); err != nil {
@ -267,32 +315,31 @@ var importAnalyzeCmd = &cli.Command{
}
break
}
results = append(results, tse)
totalTipsets++
tseIn <- tse
if totalTipsets%10 == 0 {
fmt.Printf("\rProcessed %d tipsets", totalTipsets)
}
}
chargeDeltas := make(map[string][]float64)
close(tseIn)
fmt.Printf("\n")
fmt.Printf("Collecting results\n")
var invocs []Invocation
var totalTime time.Duration
for i, r := range results {
_ = i
totalTime += r.Duration
for _, inv := range r.Trace {
invocs = append(invocs, Invocation{
TipSet: r.TipSet,
Invoc: inv,
})
cgas, vgas := countGasCosts(&inv.ExecutionTrace)
fmt.Printf("Invocation: %d %s: %s %d -> %0.2f\n", inv.Msg.Method, inv.Msg.To, inv.Duration, cgas+vgas, float64(GasPerNs*inv.Duration.Nanoseconds())/float64(cgas+vgas))
tallyGasCharges(chargeDeltas, &inv.ExecutionTrace)
var keys []string
var chargeDeltas = make(map[string][]float64)
for i := 0; i < nWorkers; i++ {
fmt.Printf("\rProcessing results from worker %d/%d", i+1, nWorkers)
res := <-results
invocs = append(invocs, res.expensiveInvocs...)
for k, v := range res.chargeDeltas {
chargeDeltas[k] = append(chargeDeltas[k], v...)
}
totalTime += res.totalTime
}
var keys []string
fmt.Printf("\nCollecting gas keys\n")
for k := range chargeDeltas {
keys = append(keys, k)
}
@ -311,7 +358,7 @@ var importAnalyzeCmd = &cli.Command{
})
fmt.Println("Total time: ", totalTime)
fmt.Println("Average time per epoch: ", totalTime/time.Duration(len(results)))
fmt.Println("Average time per epoch: ", totalTime/time.Duration(totalTipsets))
n := 30
if len(invocs) < n {