From e65215f1b54cf45d77f2449a062b8bb8099ca0e0 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Fri, 26 Jun 2020 21:54:20 +0200 Subject: [PATCH] Make lotus-bench import analyze multithreaded and less RAM hungry Signed-off-by: Jakub Sztandera --- cmd/lotus-bench/import.go | 89 ++++++++++++++++++++++++++++++--------- 1 file changed, 68 insertions(+), 21 deletions(-) diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index 71457a708..58de65fb8 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -255,7 +255,55 @@ var importAnalyzeCmd = &cli.Command{ } dec := json.NewDecoder(fi) - var results []TipSetExec + const nWorkers = 16 + tseIn := make(chan TipSetExec, 2*nWorkers) + type result struct { + totalTime time.Duration + chargeDeltas map[string][]float64 + expensiveInvocs []Invocation + } + + results := make(chan result, nWorkers) + + for i := 0; i < nWorkers; i++ { + go func() { + chargeDeltas := make(map[string][]float64) + var totalTime time.Duration + var expensiveInvocs []Invocation + var leastExpensiveInvoc = time.Duration(0) + + for { + tse, ok := <-tseIn + if !ok { + results <- result{ + totalTime: totalTime, + chargeDeltas: chargeDeltas, + expensiveInvocs: expensiveInvocs, + } + return + } + totalTime += tse.Duration + for _, inv := range tse.Trace { + if inv.Duration > leastExpensiveInvoc { + expensiveInvocs = append(expensiveInvocs, Invocation{ + TipSet: tse.TipSet, + Invoc: inv, + }) + } + + tallyGasCharges(chargeDeltas, &inv.ExecutionTrace) + } + sort.Slice(expensiveInvocs, func(i, j int) bool { + return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration + }) + if len(expensiveInvocs) != 0 { + leastExpensiveInvoc = expensiveInvocs[len(expensiveInvocs)-1].Invoc.Duration + } + } + }() + } + + var totalTipsets int64 for { var tse TipSetExec if err := dec.Decode(&tse); err != nil { @@ -267,32 +315,31 @@ var importAnalyzeCmd = &cli.Command{ } break } - results = append(results, tse) + totalTipsets++ + tseIn <- tse + if totalTipsets%10 == 0 { + fmt.Printf("\rProcessed %d tipsets", totalTipsets) + } } - - chargeDeltas := make(map[string][]float64) + close(tseIn) + fmt.Printf("\n") + fmt.Printf("Collecting results\n") var invocs []Invocation var totalTime time.Duration - for i, r := range results { - _ = i - totalTime += r.Duration - - for _, inv := range r.Trace { - invocs = append(invocs, Invocation{ - TipSet: r.TipSet, - Invoc: inv, - }) - - cgas, vgas := countGasCosts(&inv.ExecutionTrace) - fmt.Printf("Invocation: %d %s: %s %d -> %0.2f\n", inv.Msg.Method, inv.Msg.To, inv.Duration, cgas+vgas, float64(GasPerNs*inv.Duration.Nanoseconds())/float64(cgas+vgas)) - - tallyGasCharges(chargeDeltas, &inv.ExecutionTrace) - + var keys []string + var chargeDeltas = make(map[string][]float64) + for i := 0; i < nWorkers; i++ { + fmt.Printf("\rProcessing results from worker %d/%d", i+1, nWorkers) + res := <-results + invocs = append(invocs, res.expensiveInvocs...) + for k, v := range res.chargeDeltas { + chargeDeltas[k] = append(chargeDeltas[k], v...) } + totalTime += res.totalTime } - var keys []string + fmt.Printf("\nCollecting gas keys\n") for k := range chargeDeltas { keys = append(keys, k) } @@ -311,7 +358,7 @@ var importAnalyzeCmd = &cli.Command{ }) fmt.Println("Total time: ", totalTime) - fmt.Println("Average time per epoch: ", totalTime/time.Duration(len(results))) + fmt.Println("Average time per epoch: ", totalTime/time.Duration(totalTipsets)) n := 30 if len(invocs) < n {