diff --git a/cmd/lotus-bench/rpc.go b/cmd/lotus-bench/rpc.go index 65fcb822e..bce68ccf9 100644 --- a/cmd/lotus-bench/rpc.go +++ b/cmd/lotus-bench/rpc.go @@ -68,6 +68,11 @@ Here are some real examples: Usage: `Method to benchmark, you can specify multiple methods by repeating this flag. You can also specify method specific options to set the concurrency and qps for each method (see usage). `, }, + &cli.DurationFlag{ + Name: "watch", + Value: 0 * time.Second, + Usage: "If >0 then generates reports every N seconds (only supports linux/unix)", + }, &cli.BoolFlag{ Name: "print-response", Value: false, @@ -135,6 +140,7 @@ Here are some real examples: } rpcMethods = append(rpcMethods, &RPCMethod{ + w: os.Stdout, uri: cctx.String("endpoint"), method: entries[0], concurrency: concurrency, @@ -177,8 +183,45 @@ Here are some real examples: }(e) } + // if watch is set then print a report every N seconds + var progressCh chan bool + if cctx.Duration("watch") > 0 { + progressCh = make(chan bool, 1) + go func(progressCh chan bool) { + ticker := time.NewTicker(cctx.Duration("watch")) + for { + clearAndPrintReport := func() { + // clear the screen move the curser to the top left + fmt.Print("\033[2J") + fmt.Printf("\033[%d;%dH", 1, 1) + for i, e := range rpcMethods { + e.Report() + if i < len(rpcMethods)-1 { + fmt.Println() + } + } + } + select { + case <-ticker.C: + clearAndPrintReport() + case <-progressCh: + clearAndPrintReport() + return + } + } + }(progressCh) + } + wg.Wait() + if progressCh != nil { + // wait for the watch go routine to return + progressCh <- true + + // no need to print the report again + return nil + } + // print the report for each endpoint for i, e := range rpcMethods { e.Report() @@ -193,6 +236,7 @@ Here are some real examples: // RPCMethod handles the benchmarking of a single endpoint method. type RPCMethod struct { + w io.Writer // the endpoint uri uri string // the rpc method we want to benchmark @@ -234,7 +278,7 @@ func (rpc *RPCMethod) Run() error { rpc.stopCh = make(chan struct{}, rpc.concurrency) go func() { - rpc.reporter = NewReporter(rpc.results) + rpc.reporter = NewReporter(rpc.results, rpc.w) rpc.reporter.Run() }() @@ -376,20 +420,25 @@ func (rpc *RPCMethod) Stop() { func (rpc *RPCMethod) Report() { total := time.Since(rpc.start) - fmt.Printf("[%s]:\n", rpc.method) - fmt.Printf("- Options:\n") - fmt.Printf(" - concurrency: %d\n", rpc.concurrency) - fmt.Printf(" - params: %s\n", rpc.params) - fmt.Printf(" - qps: %d\n", rpc.qps) - rpc.reporter.Print(total) + fmt.Fprintf(rpc.w, "[%s]:\n", rpc.method) + fmt.Fprintf(rpc.w, "- Options:\n") + fmt.Fprintf(rpc.w, " - concurrency: %d\n", rpc.concurrency) + fmt.Fprintf(rpc.w, " - params: %s\n", rpc.params) + fmt.Fprintf(rpc.w, " - qps: %d\n", rpc.qps) + rpc.reporter.Print(total, rpc.w) } // Reporter reads the results from the workers through the results channel and aggregates the results. type Reporter struct { + // write the report to this writer + w io.Writer // the reporter read the results from this channel results chan *result // doneCh is used to signal that the reporter has finished reading the results (channel has closed) doneCh chan bool + + // lock protect the following fields during critical sections (if --watch was specified) + lock sync.Mutex // the latencies of all requests latencies []int64 // the number of requests that returned each status code @@ -398,8 +447,9 @@ type Reporter struct { errors map[string]int } -func NewReporter(results chan *result) *Reporter { +func NewReporter(results chan *result, w io.Writer) *Reporter { return &Reporter{ + w: w, results: results, doneCh: make(chan bool, 1), statusCodes: make(map[int]int), @@ -409,6 +459,8 @@ func NewReporter(results chan *result) *Reporter { func (r *Reporter) Run() { for res := range r.results { + r.lock.Lock() + r.latencies = append(r.latencies, res.duration.Milliseconds()) if res.statusCode != nil { @@ -425,12 +477,17 @@ func (r *Reporter) Run() { } else { r.errors["nil"]++ } + + r.lock.Unlock() } r.doneCh <- true } -func (r *Reporter) Print(elapsed time.Duration) { +func (r *Reporter) Print(elapsed time.Duration, w io.Writer) { + r.lock.Lock() + defer r.lock.Unlock() + nrReq := int64(len(r.latencies)) if nrReq == 0 { fmt.Println("No requests were made") @@ -447,16 +504,16 @@ func (r *Reporter) Print(elapsed time.Duration) { totalLatency += latency } - fmt.Printf("- Total Requests: %d\n", nrReq) - fmt.Printf("- Total Duration: %dms\n", elapsed.Milliseconds()) - fmt.Printf("- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds()) - fmt.Printf("- Avg latency: %dms\n", totalLatency/nrReq) - fmt.Printf("- Median latency: %dms\n", r.latencies[nrReq/2]) - fmt.Printf("- Latency distribution:\n") + fmt.Fprintf(w, "- Total Requests: %d\n", nrReq) + fmt.Fprintf(w, "- Total Duration: %dms\n", elapsed.Milliseconds()) + fmt.Fprintf(w, "- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds()) + fmt.Fprintf(w, "- Avg latency: %dms\n", totalLatency/nrReq) + fmt.Fprintf(w, "- Median latency: %dms\n", r.latencies[nrReq/2]) + fmt.Fprintf(w, "- Latency distribution:\n") percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999} for _, p := range percentiles { idx := int64(p * float64(nrReq)) - fmt.Printf(" %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx]) + fmt.Fprintf(w, " %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx]) } // create a simple histogram with 10 buckets spanning the range of latency @@ -489,19 +546,19 @@ func (r *Reporter) Print(elapsed time.Duration) { } // print the histogram using a tabwriter which will align the columns nicely - fmt.Printf("- Histogram:\n") + fmt.Fprintf(w, "- Histogram:\n") const padding = 2 - w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug) + tabWriter := tabwriter.NewWriter(w, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug) for i := 0; i < nrBucket; i++ { ratio := float64(buckets[i].cnt) / float64(nrReq) bars := strings.Repeat("#", int(ratio*100)) - fmt.Fprintf(w, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100)) + fmt.Fprintf(tabWriter, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100)) } - w.Flush() //nolint:errcheck + tabWriter.Flush() //nolint:errcheck - fmt.Printf("- Status codes:\n") + fmt.Fprintf(w, "- Status codes:\n") for code, cnt := range r.statusCodes { - fmt.Printf(" [%d]: %d\n", code, cnt) + fmt.Fprintf(w, " [%d]: %d\n", code, cnt) } // print the 10 most occurring errors (in case error values are not unique) @@ -517,12 +574,12 @@ func (r *Reporter) Print(elapsed time.Duration) { sort.Slice(sortedErrors, func(i, j int) bool { return sortedErrors[i].cnt > sortedErrors[j].cnt }) - fmt.Printf("- Errors (top 10):\n") + fmt.Fprintf(w, "- Errors (top 10):\n") for i, se := range sortedErrors { if i > 10 { break } - fmt.Printf(" [%s]: %d\n", se.err, se.cnt) + fmt.Fprintf(w, " [%s]: %d\n", se.err, se.cnt) } }