Add --watch option to see progress while benchmark is running

This commit is contained in:
Fridrik Asmundsson 2023-04-26 18:51:23 +00:00
parent dcc72a43ee
commit 4b0ca30daf

View File

@ -68,6 +68,11 @@ Here are some real examples:
Usage: `Method to benchmark, you can specify multiple methods by repeating this flag. You can also specify method specific options to set the concurrency and qps for each method (see usage).
`,
},
&cli.DurationFlag{
Name: "watch",
Value: 0 * time.Second,
Usage: "If >0 then generates reports every N seconds (only supports linux/unix)",
},
&cli.BoolFlag{
Name: "print-response",
Value: false,
@ -135,6 +140,7 @@ Here are some real examples:
}
rpcMethods = append(rpcMethods, &RPCMethod{
w: os.Stdout,
uri: cctx.String("endpoint"),
method: entries[0],
concurrency: concurrency,
@ -177,8 +183,45 @@ Here are some real examples:
}(e)
}
// if watch is set then print a report every N seconds
var progressCh chan bool
if cctx.Duration("watch") > 0 {
progressCh = make(chan bool, 1)
go func(progressCh chan bool) {
ticker := time.NewTicker(cctx.Duration("watch"))
for {
clearAndPrintReport := func() {
// clear the screen move the curser to the top left
fmt.Print("\033[2J")
fmt.Printf("\033[%d;%dH", 1, 1)
for i, e := range rpcMethods {
e.Report()
if i < len(rpcMethods)-1 {
fmt.Println()
}
}
}
select {
case <-ticker.C:
clearAndPrintReport()
case <-progressCh:
clearAndPrintReport()
return
}
}
}(progressCh)
}
wg.Wait()
if progressCh != nil {
// wait for the watch go routine to return
progressCh <- true
// no need to print the report again
return nil
}
// print the report for each endpoint
for i, e := range rpcMethods {
e.Report()
@ -193,6 +236,7 @@ Here are some real examples:
// RPCMethod handles the benchmarking of a single endpoint method.
type RPCMethod struct {
w io.Writer
// the endpoint uri
uri string
// the rpc method we want to benchmark
@ -234,7 +278,7 @@ func (rpc *RPCMethod) Run() error {
rpc.stopCh = make(chan struct{}, rpc.concurrency)
go func() {
rpc.reporter = NewReporter(rpc.results)
rpc.reporter = NewReporter(rpc.results, rpc.w)
rpc.reporter.Run()
}()
@ -376,20 +420,25 @@ func (rpc *RPCMethod) Stop() {
func (rpc *RPCMethod) Report() {
total := time.Since(rpc.start)
fmt.Printf("[%s]:\n", rpc.method)
fmt.Printf("- Options:\n")
fmt.Printf(" - concurrency: %d\n", rpc.concurrency)
fmt.Printf(" - params: %s\n", rpc.params)
fmt.Printf(" - qps: %d\n", rpc.qps)
rpc.reporter.Print(total)
fmt.Fprintf(rpc.w, "[%s]:\n", rpc.method)
fmt.Fprintf(rpc.w, "- Options:\n")
fmt.Fprintf(rpc.w, " - concurrency: %d\n", rpc.concurrency)
fmt.Fprintf(rpc.w, " - params: %s\n", rpc.params)
fmt.Fprintf(rpc.w, " - qps: %d\n", rpc.qps)
rpc.reporter.Print(total, rpc.w)
}
// Reporter reads the results from the workers through the results channel and aggregates the results.
type Reporter struct {
// write the report to this writer
w io.Writer
// the reporter read the results from this channel
results chan *result
// doneCh is used to signal that the reporter has finished reading the results (channel has closed)
doneCh chan bool
// lock protect the following fields during critical sections (if --watch was specified)
lock sync.Mutex
// the latencies of all requests
latencies []int64
// the number of requests that returned each status code
@ -398,8 +447,9 @@ type Reporter struct {
errors map[string]int
}
func NewReporter(results chan *result) *Reporter {
func NewReporter(results chan *result, w io.Writer) *Reporter {
return &Reporter{
w: w,
results: results,
doneCh: make(chan bool, 1),
statusCodes: make(map[int]int),
@ -409,6 +459,8 @@ func NewReporter(results chan *result) *Reporter {
func (r *Reporter) Run() {
for res := range r.results {
r.lock.Lock()
r.latencies = append(r.latencies, res.duration.Milliseconds())
if res.statusCode != nil {
@ -425,12 +477,17 @@ func (r *Reporter) Run() {
} else {
r.errors["nil"]++
}
r.lock.Unlock()
}
r.doneCh <- true
}
func (r *Reporter) Print(elapsed time.Duration) {
func (r *Reporter) Print(elapsed time.Duration, w io.Writer) {
r.lock.Lock()
defer r.lock.Unlock()
nrReq := int64(len(r.latencies))
if nrReq == 0 {
fmt.Println("No requests were made")
@ -447,16 +504,16 @@ func (r *Reporter) Print(elapsed time.Duration) {
totalLatency += latency
}
fmt.Printf("- Total Requests: %d\n", nrReq)
fmt.Printf("- Total Duration: %dms\n", elapsed.Milliseconds())
fmt.Printf("- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
fmt.Printf("- Avg latency: %dms\n", totalLatency/nrReq)
fmt.Printf("- Median latency: %dms\n", r.latencies[nrReq/2])
fmt.Printf("- Latency distribution:\n")
fmt.Fprintf(w, "- Total Requests: %d\n", nrReq)
fmt.Fprintf(w, "- Total Duration: %dms\n", elapsed.Milliseconds())
fmt.Fprintf(w, "- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
fmt.Fprintf(w, "- Avg latency: %dms\n", totalLatency/nrReq)
fmt.Fprintf(w, "- Median latency: %dms\n", r.latencies[nrReq/2])
fmt.Fprintf(w, "- Latency distribution:\n")
percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999}
for _, p := range percentiles {
idx := int64(p * float64(nrReq))
fmt.Printf(" %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
fmt.Fprintf(w, " %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
}
// create a simple histogram with 10 buckets spanning the range of latency
@ -489,19 +546,19 @@ func (r *Reporter) Print(elapsed time.Duration) {
}
// print the histogram using a tabwriter which will align the columns nicely
fmt.Printf("- Histogram:\n")
fmt.Fprintf(w, "- Histogram:\n")
const padding = 2
w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
tabWriter := tabwriter.NewWriter(w, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
for i := 0; i < nrBucket; i++ {
ratio := float64(buckets[i].cnt) / float64(nrReq)
bars := strings.Repeat("#", int(ratio*100))
fmt.Fprintf(w, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
fmt.Fprintf(tabWriter, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
}
w.Flush() //nolint:errcheck
tabWriter.Flush() //nolint:errcheck
fmt.Printf("- Status codes:\n")
fmt.Fprintf(w, "- Status codes:\n")
for code, cnt := range r.statusCodes {
fmt.Printf(" [%d]: %d\n", code, cnt)
fmt.Fprintf(w, " [%d]: %d\n", code, cnt)
}
// print the 10 most occurring errors (in case error values are not unique)
@ -517,12 +574,12 @@ func (r *Reporter) Print(elapsed time.Duration) {
sort.Slice(sortedErrors, func(i, j int) bool {
return sortedErrors[i].cnt > sortedErrors[j].cnt
})
fmt.Printf("- Errors (top 10):\n")
fmt.Fprintf(w, "- Errors (top 10):\n")
for i, se := range sortedErrors {
if i > 10 {
break
}
fmt.Printf(" [%s]: %d\n", se.err, se.cnt)
fmt.Fprintf(w, " [%s]: %d\n", se.err, se.cnt)
}
}