Decode in workers

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-06-26 23:38:06 +02:00
parent 3134fcb54a
commit 9712bbc854
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA

View File

@ -1,6 +1,7 @@
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
@ -300,10 +301,9 @@ var importAnalyzeCmd = &cli.Command{
if err != nil {
return err
}
dec := json.NewDecoder(fi)
const nWorkers = 16
tseIn := make(chan TipSetExec, 2*nWorkers)
jsonIn := make(chan []byte, 2*nWorkers)
type result struct {
totalTime time.Duration
chargeStats map[string]*stats
@ -316,11 +316,15 @@ var importAnalyzeCmd = &cli.Command{
go func() {
chargeStats := make(map[string]*stats)
var totalTime time.Duration
var expensiveInvocs []Invocation
const invocsKeep = 32
var expensiveInvocs = make([]Invocation, 0, 8*invocsKeep)
var leastExpensiveInvoc = time.Duration(0)
for {
tse, ok := <-tseIn
b, ok := <-jsonIn
var tse TipSetExec
json.Unmarshal(b, &tse)
if !ok {
results <- result{
totalTime: totalTime,
@ -341,10 +345,11 @@ var importAnalyzeCmd = &cli.Command{
tallyGasCharges(chargeStats, &inv.ExecutionTrace)
}
sort.Slice(expensiveInvocs, func(i, j int) bool {
return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration
})
if len(expensiveInvocs) != 0 {
if len(expensiveInvocs) > 4*invocsKeep {
sort.Slice(expensiveInvocs, func(i, j int) bool {
log.Warnf("i: %v, j: %v", expensiveInvocs[i], expensiveInvocs[j])
return expensiveInvocs[i].Invoc.Duration > expensiveInvocs[j].Invoc.Duration
})
leastExpensiveInvoc = expensiveInvocs[len(expensiveInvocs)-1].Invoc.Duration
n := 30
if len(expensiveInvocs) < n {
@ -357,24 +362,23 @@ var importAnalyzeCmd = &cli.Command{
}
var totalTipsets int64
reader := bufio.NewReader(fi)
for {
var tse TipSetExec
if err := dec.Decode(&tse); err != nil {
if err != io.EOF {
if e, ok := err.(*json.SyntaxError); ok {
log.Warnf("syntax error at byte offset %d", e.Offset)
}
return err
b, err := reader.ReadBytes('\n')
if err != nil && err != io.EOF {
if e, ok := err.(*json.SyntaxError); ok {
log.Warnf("syntax error at byte offset %d", e.Offset)
}
break
return err
}
totalTipsets++
tseIn <- tse
if totalTipsets%10 == 0 {
fmt.Printf("\rProcessed %d tipsets", totalTipsets)
jsonIn <- b
fmt.Printf("\rProcessed %d tipsets", totalTipsets)
if err == io.EOF {
break
}
}
close(tseIn)
close(jsonIn)
fmt.Printf("\n")
fmt.Printf("Collecting results\n")