From 4b0ca30daf1733a05018c12e65089610d4344eb7 Mon Sep 17 00:00:00 2001
From: Fridrik Asmundsson <fridrik01@gmail.com>
Date: Wed, 26 Apr 2023 18:51:23 +0000
Subject: [PATCH] Add --watch option to see progress while benchmark is running

---
 cmd/lotus-bench/rpc.go | 105 +++++++++++++++++++++++++++++++----------
 1 file changed, 81 insertions(+), 24 deletions(-)

diff --git a/cmd/lotus-bench/rpc.go b/cmd/lotus-bench/rpc.go
index 65fcb822e..bce68ccf9 100644
--- a/cmd/lotus-bench/rpc.go
+++ b/cmd/lotus-bench/rpc.go
@@ -68,6 +68,11 @@ Here are some real examples:
 			Usage: `Method to benchmark, you can specify multiple methods by repeating this flag. You can also specify method specific options to set the concurrency and qps for each method (see usage).
 `,
 		},
+		&cli.DurationFlag{
+			Name:  "watch",
+			Value: 0 * time.Second,
+			Usage: "If >0 then generates reports every N seconds (only supports linux/unix)",
+		},
 		&cli.BoolFlag{
 			Name:  "print-response",
 			Value: false,
@@ -135,6 +140,7 @@ Here are some real examples:
 			}
 
 			rpcMethods = append(rpcMethods, &RPCMethod{
+				w:           os.Stdout,
 				uri:         cctx.String("endpoint"),
 				method:      entries[0],
 				concurrency: concurrency,
@@ -177,8 +183,45 @@ Here are some real examples:
 			}(e)
 		}
 
+		// if watch is set then print a report every N seconds
+		var progressCh chan bool
+		if cctx.Duration("watch") > 0 {
+			progressCh = make(chan bool, 1)
+			go func(progressCh chan bool) {
+				ticker := time.NewTicker(cctx.Duration("watch"))
+				for {
+					clearAndPrintReport := func() {
+						// clear the screen move the curser to the top left
+						fmt.Print("\033[2J")
+						fmt.Printf("\033[%d;%dH", 1, 1)
+						for i, e := range rpcMethods {
+							e.Report()
+							if i < len(rpcMethods)-1 {
+								fmt.Println()
+							}
+						}
+					}
+					select {
+					case <-ticker.C:
+						clearAndPrintReport()
+					case <-progressCh:
+						clearAndPrintReport()
+						return
+					}
+				}
+			}(progressCh)
+		}
+
 		wg.Wait()
 
+		if progressCh != nil {
+			// wait for the watch go routine to return
+			progressCh <- true
+
+			// no need to print the report again
+			return nil
+		}
+
 		// print the report for each endpoint
 		for i, e := range rpcMethods {
 			e.Report()
@@ -193,6 +236,7 @@ Here are some real examples:
 
 // RPCMethod handles the benchmarking of a single endpoint method.
 type RPCMethod struct {
+	w io.Writer
 	// the endpoint uri
 	uri string
 	// the rpc method we want to benchmark
@@ -234,7 +278,7 @@ func (rpc *RPCMethod) Run() error {
 	rpc.stopCh = make(chan struct{}, rpc.concurrency)
 
 	go func() {
-		rpc.reporter = NewReporter(rpc.results)
+		rpc.reporter = NewReporter(rpc.results, rpc.w)
 		rpc.reporter.Run()
 	}()
 
@@ -376,20 +420,25 @@ func (rpc *RPCMethod) Stop() {
 
 func (rpc *RPCMethod) Report() {
 	total := time.Since(rpc.start)
-	fmt.Printf("[%s]:\n", rpc.method)
-	fmt.Printf("- Options:\n")
-	fmt.Printf("  - concurrency: %d\n", rpc.concurrency)
-	fmt.Printf("  - params: %s\n", rpc.params)
-	fmt.Printf("  - qps: %d\n", rpc.qps)
-	rpc.reporter.Print(total)
+	fmt.Fprintf(rpc.w, "[%s]:\n", rpc.method)
+	fmt.Fprintf(rpc.w, "- Options:\n")
+	fmt.Fprintf(rpc.w, "  - concurrency: %d\n", rpc.concurrency)
+	fmt.Fprintf(rpc.w, "  - params: %s\n", rpc.params)
+	fmt.Fprintf(rpc.w, "  - qps: %d\n", rpc.qps)
+	rpc.reporter.Print(total, rpc.w)
 }
 
 // Reporter reads the results from the workers through the results channel and aggregates the results.
 type Reporter struct {
+	// write the report to this writer
+	w io.Writer
 	// the reporter read the results from this channel
 	results chan *result
 	// doneCh is used to signal that the reporter has finished reading the results (channel has closed)
 	doneCh chan bool
+
+	// lock protect the following fields during critical sections (if --watch was specified)
+	lock sync.Mutex
 	// the latencies of all requests
 	latencies []int64
 	// the number of requests that returned each status code
@@ -398,8 +447,9 @@ type Reporter struct {
 	errors map[string]int
 }
 
-func NewReporter(results chan *result) *Reporter {
+func NewReporter(results chan *result, w io.Writer) *Reporter {
 	return &Reporter{
+		w:           w,
 		results:     results,
 		doneCh:      make(chan bool, 1),
 		statusCodes: make(map[int]int),
@@ -409,6 +459,8 @@ func NewReporter(results chan *result) *Reporter {
 
 func (r *Reporter) Run() {
 	for res := range r.results {
+		r.lock.Lock()
+
 		r.latencies = append(r.latencies, res.duration.Milliseconds())
 
 		if res.statusCode != nil {
@@ -425,12 +477,17 @@ func (r *Reporter) Run() {
 		} else {
 			r.errors["nil"]++
 		}
+
+		r.lock.Unlock()
 	}
 
 	r.doneCh <- true
 }
 
-func (r *Reporter) Print(elapsed time.Duration) {
+func (r *Reporter) Print(elapsed time.Duration, w io.Writer) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
 	nrReq := int64(len(r.latencies))
 	if nrReq == 0 {
 		fmt.Println("No requests were made")
@@ -447,16 +504,16 @@ func (r *Reporter) Print(elapsed time.Duration) {
 		totalLatency += latency
 	}
 
-	fmt.Printf("- Total Requests: %d\n", nrReq)
-	fmt.Printf("- Total Duration: %dms\n", elapsed.Milliseconds())
-	fmt.Printf("- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
-	fmt.Printf("- Avg latency: %dms\n", totalLatency/nrReq)
-	fmt.Printf("- Median latency: %dms\n", r.latencies[nrReq/2])
-	fmt.Printf("- Latency distribution:\n")
+	fmt.Fprintf(w, "- Total Requests: %d\n", nrReq)
+	fmt.Fprintf(w, "- Total Duration: %dms\n", elapsed.Milliseconds())
+	fmt.Fprintf(w, "- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
+	fmt.Fprintf(w, "- Avg latency: %dms\n", totalLatency/nrReq)
+	fmt.Fprintf(w, "- Median latency: %dms\n", r.latencies[nrReq/2])
+	fmt.Fprintf(w, "- Latency distribution:\n")
 	percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999}
 	for _, p := range percentiles {
 		idx := int64(p * float64(nrReq))
-		fmt.Printf("    %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
+		fmt.Fprintf(w, "    %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
 	}
 
 	// create a simple histogram with 10 buckets spanning the range of latency
@@ -489,19 +546,19 @@ func (r *Reporter) Print(elapsed time.Duration) {
 	}
 
 	// print the histogram using a tabwriter which will align the columns nicely
-	fmt.Printf("- Histogram:\n")
+	fmt.Fprintf(w, "- Histogram:\n")
 	const padding = 2
-	w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
+	tabWriter := tabwriter.NewWriter(w, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
 	for i := 0; i < nrBucket; i++ {
 		ratio := float64(buckets[i].cnt) / float64(nrReq)
 		bars := strings.Repeat("#", int(ratio*100))
-		fmt.Fprintf(w, "  %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
+		fmt.Fprintf(tabWriter, "  %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
 	}
-	w.Flush() //nolint:errcheck
+	tabWriter.Flush() //nolint:errcheck
 
-	fmt.Printf("- Status codes:\n")
+	fmt.Fprintf(w, "- Status codes:\n")
 	for code, cnt := range r.statusCodes {
-		fmt.Printf("    [%d]: %d\n", code, cnt)
+		fmt.Fprintf(w, "    [%d]: %d\n", code, cnt)
 	}
 
 	// print the 10 most occurring errors (in case error values are not unique)
@@ -517,12 +574,12 @@ func (r *Reporter) Print(elapsed time.Duration) {
 	sort.Slice(sortedErrors, func(i, j int) bool {
 		return sortedErrors[i].cnt > sortedErrors[j].cnt
 	})
-	fmt.Printf("- Errors (top 10):\n")
+	fmt.Fprintf(w, "- Errors (top 10):\n")
 	for i, se := range sortedErrors {
 		if i > 10 {
 			break
 		}
-		fmt.Printf("    [%s]: %d\n", se.err, se.cnt)
+		fmt.Fprintf(w, "    [%s]: %d\n", se.err, se.cnt)
 	}
 }