diff --git a/README.md b/README.md
index 26088a3..96fd600 100644
--- a/README.md
+++ b/README.md
@@ -47,7 +47,8 @@ An example config file:
     trieWorkers = 4 # STATEDIFF_TRIE_WORKERS
 
 [prerun]
-    only = false # PRERUN_ONLY
+    only     = false # PRERUN_ONLY
+    parallel = false # PRERUN_PARALLEL
 
     # to perform prerun in a specific range (optional)
     start = 0 # PRERUN_RANGE_START
diff --git a/cmd/env.go b/cmd/env.go
index f9792c0..1e20582 100644
--- a/cmd/env.go
+++ b/cmd/env.go
@@ -50,6 +50,7 @@ const (
 	PROM_DB_STATS = "PROM_DB_STATS"
 
 	PRERUN_ONLY                     = "PRERUN_ONLY"
+	PRERUN_PARALLEL                 = "PRERUN_PARALLEL"
 	PRERUN_RANGE_START              = "PRERUN_RANGE_START"
 	PRERUN_RANGE_STOP               = "PRERUN_RANGE_STOP"
 	PRERUN_INTERMEDIATE_STATE_NODES = "PRERUN_INTERMEDIATE_STATE_NODES"
@@ -135,6 +136,7 @@ func init() {
 	viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN)
 
 	viper.BindEnv("prerun.only", PRERUN_ONLY)
+	viper.BindEnv("prerun.parallel", PRERUN_PARALLEL)
 	viper.BindEnv("prerun.start", PRERUN_RANGE_START)
 	viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP)
 	viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES)
diff --git a/cmd/serve.go b/cmd/serve.go
index f5959be..d8fc0dd 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -57,7 +57,8 @@ func serve() {
 
 	// short circuit if we only want to perform prerun
 	if viper.GetBool("prerun.only") {
-		if err := statediffService.Run(nil); err != nil {
-			logWithCommand.Fatal("unable to perform prerun: %v", err)
+		parallel := viper.GetBool("prerun.parallel")
+		if err := statediffService.Run(nil, parallel); err != nil {
+			logWithCommand.Fatalf("unable to perform prerun: %v", err)
 		}
 		return
diff --git a/pkg/service.go b/pkg/service.go
index 875d52b..129ac2b 100644
--- a/pkg/service.go
+++ b/pkg/service.go
@@ -48,7 +48,7 @@ type StateDiffService interface {
 	// Loop is the main event loop for processing state diffs
 	Loop(wg *sync.WaitGroup) error
 	// Run is a one-off command to run on a predefined set of ranges
-	Run(ranges []RangeRequest) error
+	Run(ranges []RangeRequest, parallel bool) error
 	// StateDiffAt method to get state diff object at specific block
 	StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
 	// StateDiffFor method to get state diff object at specific block
@@ -118,18 +118,67 @@ func (sds *Service) APIs() []rpc.API {
 }
 
 // Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
+// When parallel is true each prerun range is split into chunks that are written by concurrent workers;
+// per-block errors in that mode are logged and the worker carries on, so they do not abort the run
-func (sds *Service) Run(rngs []RangeRequest) error {
+func (sds *Service) Run(rngs []RangeRequest, parallel bool) error {
 	for _, preRun := range sds.preruns {
-		logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
-		for i := preRun.Start; i <= preRun.Stop; i++ {
-			if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
-				return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
+		if parallel {
+			// Never divide by zero if the service was configured without workers
+			workers := int(sds.workers)
+			if workers < 1 {
+				workers = 1
+			}
+			// Chunk overall range into N subranges for workers
+			chunkSize := (preRun.Stop - preRun.Start) / uint64(workers)
+			logrus.Infof("parallel processing prerun range (%d, %d) (%d blocks) divided into %d sized chunks with %d workers", preRun.Start, preRun.Stop,
+				preRun.Stop-preRun.Start, chunkSize, workers)
+			// Sanity floor the chunk size
+			if chunkSize < 100 {
+				chunkSize = 100
+				logrus.Infof("Computed range chunk size for each worker is too small, defaulting to 100")
+			}
+			wg := new(sync.WaitGroup)
+			for i := 0; i < workers; i++ {
+				blockRange := RangeRequest{
+					Start:  preRun.Start + uint64(i)*chunkSize,
+					Stop:   preRun.Start + uint64(i)*chunkSize + chunkSize - 1,
+					Params: preRun.Params,
+				}
+				// The chunk size floor can place later chunks entirely past the range; no worker is needed for them
+				if blockRange.Start > preRun.Stop {
+					break
+				}
+				// The last worker absorbs the integer-division remainder, and no chunk may run past preRun.Stop
+				if i == workers-1 || blockRange.Stop > preRun.Stop {
+					blockRange.Stop = preRun.Stop
+				}
+				wg.Add(1)
+				go func(id int) {
+					defer wg.Done()
+					logrus.Infof("prerun worker %d processing range (%d, %d)", id, blockRange.Start, blockRange.Stop)
+					for j := blockRange.Start; j <= blockRange.Stop; j++ {
+						if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
+							// Best effort: log the failing height and keep going with the rest of the chunk
+							logrus.Errorf("error writing statediff at height %d in range (%d, %d) : %v", j, blockRange.Start, blockRange.Stop, err)
+						}
+					}
+					logrus.Infof("prerun worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop)
+				}(i)
+			}
+			wg.Wait()
+		} else {
+			logrus.Infof("sequential processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
+			for i := preRun.Start; i <= preRun.Stop; i++ {
+				if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
+					return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
+				}
 			}
 		}
 	}
 	sds.preruns = nil
+	// TODO(dboreham): seems like this code is never called so we have not written the parallel version
 	for _, rng := range rngs {
-		logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop)
+		logrus.Infof("processing requested range (%d, %d)", rng.Start, rng.Stop)
 		for i := rng.Start; i <= rng.Stop; i++ {
 			if err := sds.WriteStateDiffAt(i, rng.Params); err != nil {
 				return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err)