Initial version of parallel workers for prerun-only mode
parent 1fdb8763ac
commit d235f3b84c

@@ -48,6 +48,7 @@ An example config file:

[prerun]
only = false # PRERUN_ONLY
parallel = false # PRERUN_PARALLEL

# to perform prerun in a specific range (optional)
start = 0 # PRERUN_RANGE_START
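For example, a prerun-only run fanned out across parallel workers over a fixed range could be configured as below. The block numbers are illustrative only, and the stop key is inferred from the PRERUN_RANGE_STOP binding added further down:

[prerun]
only = true # PRERUN_ONLY
parallel = true # PRERUN_PARALLEL
start = 1000000 # PRERUN_RANGE_START
stop = 1000999 # PRERUN_RANGE_STOP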
@@ -50,6 +50,7 @@ const (
    PROM_DB_STATS = "PROM_DB_STATS"

    PRERUN_ONLY = "PRERUN_ONLY"
    PRERUN_PARALLEL = "PRERUN_PARALLEL"
    PRERUN_RANGE_START = "PRERUN_RANGE_START"
    PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP"
    PRERUN_INTERMEDIATE_STATE_NODES = "PRERUN_INTERMEDIATE_STATE_NODES"
@@ -135,6 +136,7 @@ func init() {

    viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN)
    viper.BindEnv("prerun.only", PRERUN_ONLY)
viper.BindEnv("prerun.only", PRERUN_PARALLEL)
    viper.BindEnv("prerun.start", PRERUN_RANGE_START)
    viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP)
    viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES)
@@ -57,7 +57,8 @@ func serve() {

    // short circuit if we only want to perform prerun
    if viper.GetBool("prerun.only") {
        if err := statediffService.Run(nil); err != nil {
        parallel := viper.GetBool("prerun.parallel")
        if err := statediffService.Run(nil, parallel); err != nil {
            logWithCommand.Fatal("unable to perform prerun: %v", err)
        }
        return
@@ -48,7 +48,7 @@ type StateDiffService interface {
    // Loop is the main event loop for processing state diffs
    Loop(wg *sync.WaitGroup) error
    // Run is a one-off command to run on a predefined set of ranges
    Run(ranges []RangeRequest) error
    Run(ranges []RangeRequest, parallel bool) error
    // StateDiffAt method to get state diff object at specific block
    StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
    // StateDiffFor method to get state diff object at specific block
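As a usage note, here is a minimal sketch (not part of this diff) of driving the updated signature, assuming it sits alongside the service package; the function name, svc, and the block numbers are made up for illustration. Per the implementation below, the parallel flag currently fans out only the preconfigured prerun ranges, while explicitly passed ranges are still written sequentially.

// illustrative only: one-off run with an explicit range and parallel preruns enabled
func runOnce(svc StateDiffService) error {
    ranges := []RangeRequest{
        {Start: 1000000, Stop: 1000999}, // Params left at its zero value for brevity
    }
    return svc.Run(ranges, true)
}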
@@ -118,18 +118,55 @@ func (sds *Service) APIs() []rpc.API {
}

// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
func (sds *Service) Run(rngs []RangeRequest) error {
func (sds *Service) Run(rngs []RangeRequest, parallel bool) error {
    for _, preRun := range sds.preruns {
        logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
        if parallel {
            // Chunk overall range into N subranges for workers
            chunkSize := (preRun.Stop - preRun.Start) / uint64(sds.workers)
            logrus.Infof("parallel processing prerun range (%d, %d) (%d blocks) divided into %d sized chunks with %d workers", preRun.Start, preRun.Stop,
                preRun.Stop-preRun.Start, chunkSize, sds.workers)
            // Sanity floor the chunk size
            if chunkSize < 100 {
                chunkSize = 100
                logrus.Infof("Computed range chunk size for each worker is too small, defaulting to 100")
            }
            wg := new(sync.WaitGroup)
            for i := 0; i < int(sds.workers); i++ {
                blockRange := RangeRequest{
                    Start: preRun.Start + uint64(i)*chunkSize,
                    Stop: preRun.Start + uint64(i)*chunkSize + chunkSize - 1,
                    Params: preRun.Params,
                }
                // TODO(hack) this fixes quantization
                if blockRange.Stop < preRun.Stop && preRun.Stop-blockRange.Stop < chunkSize {
                    blockRange.Stop = preRun.Stop
                }
                wg.Add(1)
                go func(id int) {
                    defer wg.Done()
                    logrus.Infof("prerun worker %d processing range (%d, %d)", id, blockRange.Start, blockRange.Stop)
                    for j := blockRange.Start; j <= blockRange.Stop; j++ {
                        if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
logrus.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, blockRange.Start, blockRange.Stop, err)
                        }
                    }
                    logrus.Infof("prerun worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop)
                }(i)
            }
            wg.Wait()
        } else {
            logrus.Infof("sequential processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
            for i := preRun.Start; i <= preRun.Stop; i++ {
                if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
                    return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
                }
            }
        }
    }
    sds.preruns = nil
    // TODO(dboreham): seems like this code is never called so we have not written the parallel version
    for _, rng := range rngs {
        logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop)
        logrus.Infof("processing requested range (%d, %d)", rng.Start, rng.Stop)
        for i := rng.Start; i <= rng.Stop; i++ {
            if err := sds.WriteStateDiffAt(i, rng.Params); err != nil {
                return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err)
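To illustrate the chunking and the TODO(hack) remainder handling above, here is a small standalone sketch (not part of the commit). The range bounds and worker count are made up; the arithmetic mirrors the service code, where the real values come from the prerun config and sds.workers.

package main

import "fmt"

func main() {
    // illustrative values; in the service these come from the prerun range and sds.workers
    start, stop := uint64(0), uint64(1049)
    workers := uint64(4)

    chunkSize := (stop - start) / workers
    // sanity floor, mirroring the service code
    if chunkSize < 100 {
        chunkSize = 100
    }
    for i := uint64(0); i < workers; i++ {
        chunkStart := start + i*chunkSize
        chunkStop := chunkStart + chunkSize - 1
        // the "quantization" fix: the final chunk absorbs any remainder so the
        // end of the range is not dropped
        if chunkStop < stop && stop-chunkStop < chunkSize {
            chunkStop = stop
        }
        fmt.Printf("worker %d -> blocks (%d, %d)\n", i, chunkStart, chunkStop)
    }
}

With these numbers chunkSize is 262, so the workers cover (0, 261), (262, 523), (524, 785), and (786, 1049); the last chunk is stretched from 1047 to 1049, which is exactly what the remainder check in Run does for the final worker.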