diff --git a/pkg/service.go b/pkg/service.go index 2725e60..11d0264 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -117,42 +117,68 @@ func (sds *Service) APIs() []rpc.API { } } +func segmentRange(workers, start, stop uint64, params sd.Params) []RangeRequest { + segmentSize := ((stop - start) + 1) / workers + remainder := ((stop - start) + 1) % workers + numOfSegments := workers + if remainder > 0 { + numOfSegments++ + } + segments := make([]RangeRequest, numOfSegments) + for i := range segments { + end := start + segmentSize - 1 + if end > stop { + end = stop + } + segments[i] = RangeRequest{start, end, params} + start = end + 1 + } + return segments +} + // Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards func (sds *Service) Run(rngs []RangeRequest, parallel bool) error { for _, preRun := range sds.preruns { + // if the rangeSize is smaller than the number of workers + // make sure we do synchronous processing to avoid quantization issues + rangeSize := (preRun.Stop - preRun.Start) + 1 + numWorkers := uint64(sds.workers) + if rangeSize < numWorkers { + parallel = false + } if parallel { - // Chunk overall range into N subranges for workers - chunkSize := (preRun.Stop - preRun.Start) / uint64(sds.workers) logrus.Infof("parallel processing prerun range (%d, %d) (%d blocks) divided into %d sized chunks with %d workers", preRun.Start, preRun.Stop, - preRun.Stop-preRun.Start+1, chunkSize, sds.workers) - // Sanity floor the chunk size - if chunkSize < 100 { - chunkSize = 100 - logrus.Infof("Computed range chunk size for each worker is too small, defaulting to 100") - } + rangeSize, rangeSize/numWorkers, numWorkers) + workChan := make(chan RangeRequest) + quitChan := make(chan struct{}) + // spin up numWorkers number of worker goroutines wg := new(sync.WaitGroup) - for i := 0; i < int(sds.workers); i++ { - blockRange := RangeRequest{ - Start: preRun.Start + uint64(i)*chunkSize, - Stop: preRun.Start + uint64(i)*chunkSize + chunkSize - 1, - Params: preRun.Params, - } - // TODO(hack) this fixes quantization - if blockRange.Stop < preRun.Stop && preRun.Stop-blockRange.Stop < chunkSize { - blockRange.Stop = preRun.Stop - } + for i := 0; i < int(numWorkers); i++ { wg.Add(1) go func(id int) { defer wg.Done() - logrus.Infof("prerun worker %d processing range (%d, %d)", id, blockRange.Start, blockRange.Stop) - for j := blockRange.Start; j <= blockRange.Stop; j++ { - if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil { - logrus.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, blockRange.Start, blockRange.Stop, err) + for { + select { + case workerSegment := <-workChan: + for j := workerSegment.Start; j <= workerSegment.Stop; j++ { + if err := sds.WriteStateDiffAt(j, workerSegment.Params); err != nil { + logrus.Errorf("error writing statediff at height %d in range (%d, %d) : %v", id, workerSegment.Start, workerSegment.Stop, err) + } + } + logrus.Infof("prerun worker %d finished processing range (%d, %d)", id, workerSegment.Start, workerSegment.Stop) + case <-quitChan: + return } } - logrus.Infof("prerun worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop) }(i) } + // break range up into segments + segments := segmentRange(numWorkers, preRun.Start, preRun.Stop, preRun.Params) + // send the segments to the work channel + for _, segment := range segments { + workChan <- segment + } + close(quitChan) wg.Wait() } else { logrus.Infof("sequential processing prerun range (%d, %d)", preRun.Start, preRun.Stop)