From 78d7ccd3913c351a64b7d3a0dd1d8299e5818ab6 Mon Sep 17 00:00:00 2001
From: Fridrik Asmundsson <fridrik01@gmail.com>
Date: Tue, 25 Apr 2023 15:42:18 +0000
Subject: [PATCH] Add new RPC stress testing tool (lotus-bench rpc) with rich
 reporting

This benchmark is designed to stress test the RPC methods of a Lotus node so that we can simulate real-world usage and measure the performance of each method on the node.

This benchmark has the following features:
* Can query each method both sequentially and concurrently
* Supports rate limiting
* Can query multiple methods at once (supporting a different concurrency level and rate limit for each method)
* Gives a reporting summary of the stress testing of each method (including latency distribution, a histogram, status codes and top errors)
* Easy to use

To use this benchmark you must specify the RPC methods you want to test using the --method flag; its format is:

  --method=NAME[:CONCURRENCY][:QPS][:PARAMS] where only NAME is required.

Here are some real examples:
  lotus-bench rpc --method='eth_chainId' // run eth_chainId with default concurrency and qps
  lotus-bench rpc --method='eth_chainId:3'  // override concurrency to 3
  lotus-bench rpc --method='eth_chainId::100' // override to 100 qps while using default concurrency
  lotus-bench rpc --method='eth_chainId:3:100' // run using 3 workers but limit to 100 qps
  lotus-bench rpc --method='eth_getTransactionCount:::["0xd4c70007F3F502f212c7e6794b94C06F36173B36", "latest"]' // run using optional params while using default concurrency and qps
  lotus-bench rpc --method='eth_chainId' --method='eth_getTransactionCount:10:0:["0xd4c70007F3F502f212c7e6794b94C06F36173B36", "latest"]' // run multiple methods at once
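
Each worker POSTs a standard JSON-RPC 2.0 payload to the endpoint (see buildRequest in rpc.go), so for example --method='eth_chainId' produces a request body like:

  {"jsonrpc":"2.0","id":0,"method":"eth_chainId","params":[]}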

Fixes: https://github.com/filecoin-project/lotus/issues/10752
---
 cmd/lotus-bench/main.go |   2 +
 cmd/lotus-bench/rpc.go  | 485 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 487 insertions(+)
 create mode 100644 cmd/lotus-bench/rpc.go

diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go
index 12d310b65..74d30115e 100644
--- a/cmd/lotus-bench/main.go
+++ b/cmd/lotus-bench/main.go
@@ -106,7 +106,9 @@ func main() {
 			sealBenchCmd,
 			simpleCmd,
 			importBenchCmd,
+			rpcCmd,
 		},
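+		// keep --method values intact: without this, the cli slice separator would split params containing commas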
+		DisableSliceFlagSeparator: true,
 	}
 
 	if err := app.Run(os.Args); err != nil {
diff --git a/cmd/lotus-bench/rpc.go b/cmd/lotus-bench/rpc.go
new file mode 100644
index 000000000..5c7147356
--- /dev/null
+++ b/cmd/lotus-bench/rpc.go
@@ -0,0 +1,485 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"os/signal"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"text/tabwriter"
+	"time"
+
+	"github.com/urfave/cli/v2"
+)
+
+var rpcCmd = &cli.Command{
+	Name:  "rpc",
+	Usage: "Runs a concurrent stress test on one or more rpc methods and prints the performance metrics including latency distribution and histogram",
+	Description: `This benchmark is designed to stress test the RPC methods of a Lotus node so that we can simulate real-world usage and measure the performance of each method on the node.
+
+This benchmark has the following features:
+* Can query each method both sequentially and concurrently
+* Supports rate limiting
+* Can query multiple methods at once (supporting a different concurrency level and rate limit for each method)
+* Gives a reporting summary of the stress testing of each method (including latency distribution, a histogram, status codes and top errors)
+* Easy to use
+
+To use this benchmark you must specify the RPC methods you want to test using the --method flag; its format is:
+
+  --method=NAME[:CONCURRENCY][:QPS][:PARAMS] where only NAME is required.
+
+Here are some real examples:
+  lotus-bench rpc --method='eth_chainId' // run eth_chainId with default concurrency and qps
+  lotus-bench rpc --method='eth_chainId:3'  // override concurrency to 3
+  lotus-bench rpc --method='eth_chainId::100' // override to 100 qps while using default concurrency
+  lotus-bench rpc --method='eth_chainId:3:100' // run using 3 workers but limit to 100 qps
+  lotus-bench rpc --method='eth_getTransactionCount:::["0xd4c70007F3F502f212c7e6794b94C06F36173B36", "latest"]' // run using optional params while using default concurrency and qps
+  lotus-bench rpc --method='eth_chainId' --method='eth_getTransactionCount:10:0:["0xd4c70007F3F502f212c7e6794b94C06F36173B36", "latest"]' // run multiple methods at once`,
+	Flags: []cli.Flag{
+		&cli.StringFlag{
+			Name:  "endpoint",
+			Value: "http://127.0.0.1:1234/rpc/v1",
+			Usage: "The rpc endpoint to benchmark",
+		},
+		&cli.DurationFlag{
+			Name:  "duration",
+			Value: 60 * time.Second,
+			Usage: "Duration of benchmark in seconds",
+		},
+		&cli.IntFlag{
+			Name:  "concurrency",
+			Value: 10,
+			Usage: "How many workers should be used per rpc method (can be overridden per method)",
+		},
+		&cli.IntFlag{
+			Name:  "qps",
+			Value: 0,
+			Usage: "How many requests per second should be sent per rpc method (can be overridden per method), a value of 0 means no limit",
+		},
+		&cli.StringSliceFlag{
+			Name: "method",
+			Usage: "Method to benchmark; you can specify multiple methods by repeating this flag. You can also set method-specific concurrency, qps and params for each method (see the command description)",
+		},
+		&cli.BoolFlag{
+			Name:  "print-response",
+			Value: false,
+			Usage: "print the response of each request",
+		},
+	},
+	Action: func(cctx *cli.Context) error {
+		if len(cctx.StringSlice("method")) == 0 {
+			return errors.New("you must specify and least one method to benchmark")
+		}
+
+		var rpcMethods []*RPCMethod
+		for _, str := range cctx.StringSlice("method") {
+			entries := strings.Split(str, ":")
+			if len(entries) == 0 || len(entries[0]) == 0 {
+				return errors.New("invalid method format")
+			}
+
+			// check if concurrency was specified
+			concurrency := cctx.Int("concurrency")
+			if len(entries) > 1 {
+				if len(entries[1]) > 0 {
+					var err error
+					concurrency, err = strconv.Atoi(entries[1])
+					if err != nil {
+						return fmt.Errorf("could not parse concurrency value from method %s: %v", entries[0], err)
+					}
+				}
+			}
+
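+			// check if qps was specified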
+			qps := cctx.Int("qps")
+			if len(entries) > 2 {
+				if len(entries[2]) > 0 {
+					var err error
+					qps, err = strconv.Atoi(entries[2])
+					if err != nil {
+						return fmt.Errorf("could not parse qps value from method %s: %v", entries[0], err)
+					}
+				}
+			}
+
+			// check if params was specified
+			params := "[]"
+			if len(entries) > 3 {
+				params = entries[3]
+			}
+
+			rpcMethods = append(rpcMethods, &RPCMethod{
+				uri:         cctx.String("endpoint"),
+				method:      entries[0],
+				concurrency: concurrency,
+				qps:         qps,
+				params:      params,
+				printResp:   cctx.Bool("print-response"),
+			})
+		}
+
+		// terminate early on ctrl+c
+		c := make(chan os.Signal, 1)
+		signal.Notify(c, os.Interrupt)
+		go func() {
+			<-c
+			fmt.Println("Received interrupt, stopping...")
+			for _, method := range rpcMethods {
+				fmt.Println("Stopping method")
+				method.Stop()
+				fmt.Println("Stopping method DONE")
+			}
+		}()
+
+		// stop all threads after duration
+		go func() {
+			time.Sleep(cctx.Duration("duration"))
+			for _, e := range rpcMethods {
+				e.Stop()
+			}
+		}()
+
+		// start all threads
+		var wg sync.WaitGroup
+		wg.Add(len(rpcMethods))
+
+		for _, e := range rpcMethods {
+			go func(e *RPCMethod) {
+				err := e.Run()
+				if err != nil {
+					fmt.Printf("error running rpc method: %v\n", err)
+				}
+				wg.Done()
+			}(e)
+		}
+
+		wg.Wait()
+
+		// print the report for each endpoint
+		for i, e := range rpcMethods {
+			e.Report()
+			if i < len(rpcMethods)-1 {
+				fmt.Println()
+			}
+		}
+
+		return nil
+	},
+}
+
+// RPCMethod handles the benchmarking of a single endpoint method.
+type RPCMethod struct {
+	// the endpoint uri
+	uri string
+	// the rpc method we want to benchmark
+	method string
+	// the number of concurrent requests to make to this endpoint
+	concurrency int
+	// if >0, qps limits the maximum number of requests per second to this endpoint (0 = no limit)
+	qps int
+	// many endpoints require specific parameters to be passed
+	params string
+	// whether or not to print the response of each request (useful for debugging)
+	printResp bool
+	// instruct the worker go routines to stop
+	stopCh chan struct{}
+	// when the endpoint benchmarking started
+	start time.Time
+	// results channel is used by the workers to send results to the reporter
+	results chan *result
+	// report handles reading the results from workers and printing the report statistics
+	report *Report
+}
+
+// result is the result of a single endpoint request.
+type result struct {
+	err        error
+	statusCode *int
+	duration   time.Duration
+}
+
+func (rpc *RPCMethod) Run() error {
+	client := &http.Client{
+		Timeout: 0,
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(rpc.concurrency)
+
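+	// the results channel is buffered generously so workers are never blocked waiting on the reporter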
+	rpc.results = make(chan *result, rpc.concurrency*10_000_000)
+	rpc.stopCh = make(chan struct{}, rpc.concurrency)
+
+	go func() {
+		rpc.report = NewReport(rpc.results)
+		rpc.report.Run()
+	}()
+
+	rpc.start = time.Now()
+
+	// throttle the number of requests per second
+	var qpsTicker *time.Ticker
+	if rpc.qps > 0 {
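+		// e.g. qps=100 gives one tick every 10ms; the ticker is shared by all workers for this method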
+		qpsTicker = time.NewTicker(time.Second / time.Duration(rpc.qps))
+	}
+
+	for i := 0; i < rpc.concurrency; i++ {
+		go func() {
+			rpc.startWorker(client, qpsTicker)
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+
+	// close the results channel so reporter will stop
+	close(rpc.results)
+
+	// wait until the reporter is done
+	<-rpc.report.doneCh
+
+	return nil
+}
+
+func (rpc *RPCMethod) startWorker(client *http.Client, qpsTicker *time.Ticker) {
+	for {
+		// check if we should stop
+		select {
+		case <-rpc.stopCh:
+			return
+		default:
+		}
+
+		// wait for the next tick if we are rate limiting this endpoint
+		if qpsTicker != nil {
+			<-qpsTicker.C
+		}
+
+		req, err := rpc.buildRequest()
+		if err != nil {
+			log.Fatalln(err)
+		}
+
+		start := time.Now()
+
+		var statusCode *int
+
+		// send the request to the endpoint
+		resp, err := client.Do(req)
+		if err == nil {
+			statusCode = &resp.StatusCode
+			if rpc.printResp {
+				b, err := io.ReadAll(resp.Body)
+				if err != nil {
+					log.Fatalln(err)
+				}
+				fmt.Printf("[%s] %s", rpc.method, string(b))
+			} else {
+				io.Copy(io.Discard, resp.Body) //nolint:errcheck
+			}
+			resp.Body.Close() //nolint:errcheck
+		}
+
+		rpc.results <- &result{
+			statusCode: statusCode,
+			err:        err,
+			duration:   time.Since(start),
+		}
+	}
+}
+
+func (rpc *RPCMethod) buildRequest() (*http.Request, error) {
+	jreq, err := json.Marshal(struct {
+		Jsonrpc string          `json:"jsonrpc"`
+		ID      int             `json:"id"`
+		Method  string          `json:"method"`
+		Params  json.RawMessage `json:"params"`
+	}{
+		Jsonrpc: "2.0",
+		Method:  rpc.method,
+		Params:  json.RawMessage(rpc.params),
+		ID:      0,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	req, err := http.NewRequest("POST", rpc.uri, bytes.NewReader(jreq))
+	if err != nil {
+		return nil, err
+	}
+
+	req.Header.Set("Accept", "application/json")
+
+	return req, nil
+}
+
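+// Stop signals all workers for this method to exit; each worker finishes its in-flight request first.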
+func (rpc *RPCMethod) Stop() {
+	for i := 0; i < rpc.concurrency; i++ {
+		rpc.stopCh <- struct{}{}
+	}
+}
+
+func (rpc *RPCMethod) Report() {
+	total := time.Since(rpc.start)
+	fmt.Printf("[%s]:\n", rpc.method)
+	fmt.Printf("- Options:\n")
+	fmt.Printf("  - concurrency: %d\n", rpc.concurrency)
+	fmt.Printf("  - params: %s\n", rpc.params)
+	fmt.Printf("  - qps: %d\n", rpc.qps)
+	rpc.report.Print(total)
+}
+
+type Report struct {
+	// the reporter reads the results from this channel
+	results chan *result
+	// doneCh is used to signal that the reporter has finished reading the results (channel has closed)
+	doneCh chan bool
+	// the latencies of all requests
+	latencies []int64
+	// the number of requests that returned each status code
+	statusCodes map[int]int
+	// the number of errors that occurred
+	errors map[string]int
+}
+
+func NewReport(results chan *result) *Report {
+	return &Report{
+		results:     results,
+		doneCh:      make(chan bool, 1),
+		statusCodes: make(map[int]int),
+		errors:      make(map[string]int),
+	}
+}
+
+func (r *Report) Run() {
+	for res := range r.results {
+		r.latencies = append(r.latencies, res.duration.Milliseconds())
+
+		if res.statusCode != nil {
+			r.statusCodes[*res.statusCode]++
+		}
+
+		if res.err != nil {
+			if len(r.errors) < 1_000_000 {
+				r.errors[res.err.Error()]++
+			} else {
+				// we don't want to store too many errors in memory
+				r.errors["hidden"]++
+			}
+		} else {
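+			// successful requests are tallied under the "nil" key so the error summary also shows how many calls succeeded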
+			r.errors["nil"]++
+		}
+	}
+
+	r.doneCh <- true
+}
+
+func (r *Report) Print(elapsed time.Duration) {
+	nrReq := int64(len(r.latencies))
+	if nrReq == 0 {
+		fmt.Println("No requests were made")
+		return
+	}
+
+	// we need to sort the latencies slice to calculate the percentiles
+	sort.Slice(r.latencies, func(i, j int) bool {
+		return r.latencies[i] < r.latencies[j]
+	})
+
+	var totalLatency int64 = 0
+	for _, latency := range r.latencies {
+		totalLatency += latency
+	}
+
+	fmt.Printf("- Total Requests: %d\n", nrReq)
+	fmt.Printf("- Total Duration: %dms\n", elapsed.Milliseconds())
+	fmt.Printf("- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
+	fmt.Printf("- Avg latency: %dms\n", totalLatency/nrReq)
+	fmt.Printf("- Median latency: %dms\n", r.latencies[nrReq/2])
+	fmt.Printf("- Latency distribution:\n")
+	percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999}
+	for _, p := range percentiles {
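+		// index of the p-th percentile in the ascending latencies slice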
+		idx := int64(p * float64(nrReq))
+		fmt.Printf("    %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
+	}
+
+	// create a simple histogram with 10 buckets dividing the latency range
+	// into equal intervals
+	nrBucket := 10
+	buckets := make([]Bucket, nrBucket)
+	latencyRange := r.latencies[len(r.latencies)-1]
+	bucketRange := latencyRange / int64(nrBucket)
+
+	// mark the end of each bucket
+	for i := 0; i < nrBucket; i++ {
+		buckets[i].start = int64(i) * bucketRange
+		buckets[i].end = buckets[i].start + bucketRange
+		// extend the last bucket by any remaining range caused by the integer division
+		if i == nrBucket-1 {
+			buckets[i].end = latencyRange
+		}
+	}
+
+	// count the number of requests in each bucket
+	currBucket := 0
+	for i := 0; i < len(r.latencies); {
+		if r.latencies[i] <= buckets[currBucket].end {
+			buckets[currBucket].cnt++
+			i++
+		} else {
+			currBucket++
+		}
+	}
+
+	// print the histogram using a tabwriter which will align the columns nicely
+	fmt.Printf("- Histogram:\n")
+	const padding = 2
+	w := tabwriter.NewWriter(os.Stdout, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
+	for i := 0; i < nrBucket; i++ {
+		ratio := float64(buckets[i].cnt) / float64(nrReq)
+		bars := strings.Repeat("#", int(ratio*100))
+		fmt.Fprintf(w, "  %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
+	}
+	w.Flush() //nolint:errcheck
+
+	fmt.Printf("- Status codes:\n")
+	for code, cnt := range r.statusCodes {
+		fmt.Printf("    [%d]: %d\n", code, cnt)
+	}
+
+	// print the 10 most frequent errors (aggregated by error message)
+	type kv struct {
+		err string
+		cnt int
+	}
+	var sortedErrors []kv
+	for err, cnt := range r.errors {
+		sortedErrors = append(sortedErrors, kv{err, cnt})
+	}
+	sort.Slice(sortedErrors, func(i, j int) bool {
+		return sortedErrors[i].cnt > sortedErrors[j].cnt
+	})
+	fmt.Printf("- Errors (top 10):\n")
+	for i, se := range sortedErrors {
+		if i >= 10 {
+			break
+		}
+		fmt.Printf("    [%s]: %d\n", se.err, se.cnt)
+	}
+}
+
+type Bucket struct {
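+	// the start value of the bucket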
+	start int64
+	// the end value of the bucket
+	end int64
+	// how many entries are in the bucket
+	cnt int
+}