diff --git a/cmd/lotus-bench/rpc.go b/cmd/lotus-bench/rpc.go index 5c7147356..4648aee10 100644 --- a/cmd/lotus-bench/rpc.go +++ b/cmd/lotus-bench/rpc.go @@ -98,6 +98,7 @@ Here are some real examples: } } + // check if qps was specified qps := cctx.Int("qps") if len(entries) > 2 { if len(entries[2]) > 0 { @@ -112,7 +113,25 @@ Here are some real examples: // check if params was specified params := "[]" if len(entries) > 3 { - params = entries[3] + // the params are everything after the 3rd ':' character in str. Since params can itself + // contain the ':' characters we can't just split on ':', so instead we need to locate the + // index of the 3rd ':' character and then take everything after that + occur := 0 + idx := -1 + for i := 0; i < len(str); i++ { + if str[i] == ':' { + occur++ + if occur == 3 { + idx = i + break + } + } + } + if idx == -1 { + log.Fatalf("could not parse the params from method %s", entries[0]) + } + + params = str[idx+1:] } rpcMethods = append(rpcMethods, &RPCMethod{ @@ -132,9 +151,7 @@ Here are some real examples: <-c fmt.Println("Received interrupt, stopping...") for _, method := range rpcMethods { - fmt.Println("Stopping method") method.Stop() - fmt.Println("Stopping method DONE") } }() @@ -194,11 +211,11 @@ type RPCMethod struct { start time.Time // results channel is used by the workers to send results to the reporter results chan *result - // report handles reading the results from workers and printing the report statistics - report *Report + // reporter handles reading the results from workers and printing the report statistics + reporter *Reporter } -// result is the result of a single endpoint request. +// result is the result of a single rpc method request. type result struct { err error statusCode *int @@ -213,12 +230,12 @@ func (rpc *RPCMethod) Run() error { var wg sync.WaitGroup wg.Add(rpc.concurrency) - rpc.results = make(chan *result, rpc.concurrency*10_000_000) + rpc.results = make(chan *result, rpc.concurrency*1_000) rpc.stopCh = make(chan struct{}, rpc.concurrency) go func() { - rpc.report = NewReport(rpc.results) - rpc.report.Run() + rpc.reporter = NewReporter(rpc.results) + rpc.reporter.Run() }() rpc.start = time.Now() @@ -241,7 +258,7 @@ func (rpc *RPCMethod) Run() error { close(rpc.results) // wait until the reporter is done - <-rpc.report.doneCh + <-rpc.reporter.doneCh return nil } @@ -332,10 +349,11 @@ func (rpc *RPCMethod) Report() { fmt.Printf(" - concurrency: %d\n", rpc.concurrency) fmt.Printf(" - params: %s\n", rpc.params) fmt.Printf(" - qps: %d\n", rpc.qps) - rpc.report.Print(total) + rpc.reporter.Print(total) } -type Report struct { +// Reporter reads the results from the workers through the results channel and aggregates the results. +type Reporter struct { // the reporter read the results from this channel results chan *result // doneCh is used to signal that the reporter has finished reading the results (channel has closed) @@ -348,8 +366,8 @@ type Report struct { errors map[string]int } -func NewReport(results chan *result) *Report { - return &Report{ +func NewReporter(results chan *result) *Reporter { + return &Reporter{ results: results, doneCh: make(chan bool, 1), statusCodes: make(map[int]int), @@ -357,7 +375,7 @@ func NewReport(results chan *result) *Report { } } -func (r *Report) Run() { +func (r *Reporter) Run() { for res := range r.results { r.latencies = append(r.latencies, res.duration.Milliseconds()) @@ -380,7 +398,7 @@ func (r *Report) Run() { r.doneCh <- true } -func (r *Report) Print(elapsed time.Duration) { +func (r *Reporter) Print(elapsed time.Duration) { nrReq := int64(len(r.latencies)) if nrReq == 0 { fmt.Println("No requests were made")