Merge pull request #112 from cerc-io/ian_test_3

Avoid quantization problems during segmentation and use worker chan to spread work across set # of workers
This commit is contained in:
Ian Norden 2023-03-20 07:53:09 -05:00 committed by GitHub
commit 253b1087bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -117,42 +117,68 @@ func (sds *Service) APIs() []rpc.API {
} }
} }
func segmentRange(workers, start, stop uint64, params sd.Params) []RangeRequest {
segmentSize := ((stop - start) + 1) / workers
remainder := ((stop - start) + 1) % workers
numOfSegments := workers
if remainder > 0 {
numOfSegments++
}
segments := make([]RangeRequest, numOfSegments)
for i := range segments {
end := start + segmentSize - 1
if end > stop {
end = stop
}
segments[i] = RangeRequest{start, end, params}
start = end + 1
}
return segments
}
// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards // Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
func (sds *Service) Run(rngs []RangeRequest, parallel bool) error { func (sds *Service) Run(rngs []RangeRequest, parallel bool) error {
for _, preRun := range sds.preruns { for _, preRun := range sds.preruns {
// if the rangeSize is smaller than the number of workers
// make sure we do synchronous processing to avoid quantization issues
rangeSize := (preRun.Stop - preRun.Start) + 1
numWorkers := uint64(sds.workers)
if rangeSize < numWorkers {
parallel = false
}
if parallel { if parallel {
// Chunk overall range into N subranges for workers
chunkSize := (preRun.Stop - preRun.Start) / uint64(sds.workers)
logrus.Infof("parallel processing prerun range (%d, %d) (%d blocks) divided into %d sized chunks with %d workers", preRun.Start, preRun.Stop, logrus.Infof("parallel processing prerun range (%d, %d) (%d blocks) divided into %d sized chunks with %d workers", preRun.Start, preRun.Stop,
preRun.Stop-preRun.Start+1, chunkSize, sds.workers) rangeSize, rangeSize/numWorkers, numWorkers)
// Sanity floor the chunk size workChan := make(chan RangeRequest)
if chunkSize < 100 { quitChan := make(chan struct{})
chunkSize = 100 // spin up numWorkers number of worker goroutines
logrus.Infof("Computed range chunk size for each worker is too small, defaulting to 100")
}
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
for i := 0; i < int(sds.workers); i++ { for i := 0; i < int(numWorkers); i++ {
blockRange := RangeRequest{
Start: preRun.Start + uint64(i)*chunkSize,
Stop: preRun.Start + uint64(i)*chunkSize + chunkSize - 1,
Params: preRun.Params,
}
// TODO(hack) this fixes quantization
if blockRange.Stop < preRun.Stop && preRun.Stop-blockRange.Stop < chunkSize {
blockRange.Stop = preRun.Stop
}
wg.Add(1) wg.Add(1)
go func(id int) { go func(id int) {
defer wg.Done() defer wg.Done()
logrus.Infof("prerun worker %d processing range (%d, %d)", id, blockRange.Start, blockRange.Stop) for {
for j := blockRange.Start; j <= blockRange.Stop; j++ { select {
if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil { case workerSegment := <-workChan:
logrus.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, blockRange.Start, blockRange.Stop, err) for j := workerSegment.Start; j <= workerSegment.Stop; j++ {
if err := sds.WriteStateDiffAt(j, workerSegment.Params); err != nil {
logrus.Errorf("error writing statediff at height %d in range (%d, %d) : %v", id, workerSegment.Start, workerSegment.Stop, err)
}
}
logrus.Infof("prerun worker %d finished processing range (%d, %d)", id, workerSegment.Start, workerSegment.Stop)
case <-quitChan:
return
} }
} }
logrus.Infof("prerun worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop)
}(i) }(i)
} }
// break range up into segments
segments := segmentRange(numWorkers, preRun.Start, preRun.Stop, preRun.Params)
// send the segments to the work channel
for _, segment := range segments {
workChan <- segment
}
close(quitChan)
wg.Wait() wg.Wait()
} else { } else {
logrus.Infof("sequential processing prerun range (%d, %d)", preRun.Start, preRun.Stop) logrus.Infof("sequential processing prerun range (%d, %d)", preRun.Start, preRun.Stop)