Merge pull request #103 from cerc-io/dboreham/prerun-workers
use workers in prerun mode
This commit is contained in:
commit
46cd8b1834
@ -47,7 +47,8 @@ An example config file:
|
|||||||
trieWorkers = 4 # STATEDIFF_TRIE_WORKERS
|
trieWorkers = 4 # STATEDIFF_TRIE_WORKERS
|
||||||
|
|
||||||
[prerun]
|
[prerun]
|
||||||
only = false # PRERUN_ONLY
|
only = false # PRERUN_ONLY
|
||||||
|
parallel = true # PRERUN_PARALLEL
|
||||||
|
|
||||||
# to perform prerun in a specific range (optional)
|
# to perform prerun in a specific range (optional)
|
||||||
start = 0 # PRERUN_RANGE_START
|
start = 0 # PRERUN_RANGE_START
|
||||||
@ -122,6 +123,9 @@ An example config file:
|
|||||||
# path to custom chain config file (optional)
|
# path to custom chain config file (optional)
|
||||||
# keep chainID same as that in chain config file
|
# keep chainID same as that in chain config file
|
||||||
chainConfig = "./chain.json" # ETH_CHAIN_CONFIG
|
chainConfig = "./chain.json" # ETH_CHAIN_CONFIG
|
||||||
|
|
||||||
|
[debug]
|
||||||
|
pprof = false # DEBUG_PPROF
|
||||||
```
|
```
|
||||||
|
|
||||||
### Local Setup
|
### Local Setup
|
||||||
|
@ -50,6 +50,7 @@ const (
|
|||||||
PROM_DB_STATS = "PROM_DB_STATS"
|
PROM_DB_STATS = "PROM_DB_STATS"
|
||||||
|
|
||||||
PRERUN_ONLY = "PRERUN_ONLY"
|
PRERUN_ONLY = "PRERUN_ONLY"
|
||||||
|
PRERUN_PARALLEL = "PRERUN_PARALLEL"
|
||||||
PRERUN_RANGE_START = "PRERUN_RANGE_START"
|
PRERUN_RANGE_START = "PRERUN_RANGE_START"
|
||||||
PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP"
|
PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP"
|
||||||
PRERUN_INTERMEDIATE_STATE_NODES = "PRERUN_INTERMEDIATE_STATE_NODES"
|
PRERUN_INTERMEDIATE_STATE_NODES = "PRERUN_INTERMEDIATE_STATE_NODES"
|
||||||
@ -81,6 +82,8 @@ const (
|
|||||||
DATABASE_MAX_CONN_LIFETIME = "DATABASE_MAX_CONN_LIFETIME"
|
DATABASE_MAX_CONN_LIFETIME = "DATABASE_MAX_CONN_LIFETIME"
|
||||||
DATABASE_CONN_TIMEOUT = "DATABSE_CONN_TIMEOUT"
|
DATABASE_CONN_TIMEOUT = "DATABSE_CONN_TIMEOUT"
|
||||||
DATABASE_MAX_CONN_IDLE_TIME = "DATABASE_MAX_CONN_IDLE_TIME"
|
DATABASE_MAX_CONN_IDLE_TIME = "DATABASE_MAX_CONN_IDLE_TIME"
|
||||||
|
|
||||||
|
DEBUG_PPROF = "DEBUG_PPROF"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bind env vars for eth node and DB configuration
|
// Bind env vars for eth node and DB configuration
|
||||||
@ -135,6 +138,7 @@ func init() {
|
|||||||
|
|
||||||
viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN)
|
viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN)
|
||||||
viper.BindEnv("prerun.only", PRERUN_ONLY)
|
viper.BindEnv("prerun.only", PRERUN_ONLY)
|
||||||
|
viper.BindEnv("prerun.parallel", PRERUN_PARALLEL)
|
||||||
viper.BindEnv("prerun.start", PRERUN_RANGE_START)
|
viper.BindEnv("prerun.start", PRERUN_RANGE_START)
|
||||||
viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP)
|
viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP)
|
||||||
viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES)
|
viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES)
|
||||||
@ -146,4 +150,6 @@ func init() {
|
|||||||
|
|
||||||
viper.BindEnv("log.level", LOG_LEVEL)
|
viper.BindEnv("log.level", LOG_LEVEL)
|
||||||
viper.BindEnv("log.file", LOG_FILE_PATH)
|
viper.BindEnv("log.file", LOG_FILE_PATH)
|
||||||
|
|
||||||
|
viper.BindEnv("debug.pprof", DEBUG_PPROF)
|
||||||
}
|
}
|
||||||
|
@ -221,6 +221,7 @@ func init() {
|
|||||||
viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics"))
|
viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics"))
|
||||||
|
|
||||||
viper.BindPFlag("prerun.only", rootCmd.PersistentFlags().Lookup("prerun-only"))
|
viper.BindPFlag("prerun.only", rootCmd.PersistentFlags().Lookup("prerun-only"))
|
||||||
|
viper.BindPFlag("prerun.parallel", rootCmd.PersistentFlags().Lookup("prerun-parallel"))
|
||||||
viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start"))
|
viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start"))
|
||||||
viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop"))
|
viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop"))
|
||||||
viper.BindPFlag("prerun.params.intermediateStateNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-state-nodes"))
|
viper.BindPFlag("prerun.params.intermediateStateNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-state-nodes"))
|
||||||
@ -230,6 +231,8 @@ func init() {
|
|||||||
viper.BindPFlag("prerun.params.includeTD", rootCmd.PersistentFlags().Lookup("prerun-include-td"))
|
viper.BindPFlag("prerun.params.includeTD", rootCmd.PersistentFlags().Lookup("prerun-include-td"))
|
||||||
viper.BindPFlag("prerun.params.includeCode", rootCmd.PersistentFlags().Lookup("prerun-include-code"))
|
viper.BindPFlag("prerun.params.includeCode", rootCmd.PersistentFlags().Lookup("prerun-include-code"))
|
||||||
|
|
||||||
|
viper.BindPFlag("debug.pprof", rootCmd.PersistentFlags().Lookup("debug-pprof"))
|
||||||
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
26
cmd/serve.go
26
cmd/serve.go
@ -16,8 +16,11 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
@ -47,17 +50,38 @@ func init() {
|
|||||||
rootCmd.AddCommand(serveCmd)
|
rootCmd.AddCommand(serveCmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func maxParallelism() int {
|
||||||
|
maxProcs := runtime.GOMAXPROCS(0)
|
||||||
|
numCPU := runtime.NumCPU()
|
||||||
|
if maxProcs < numCPU {
|
||||||
|
return maxProcs
|
||||||
|
}
|
||||||
|
return numCPU
|
||||||
|
}
|
||||||
|
|
||||||
func serve() {
|
func serve() {
|
||||||
logWithCommand.Info("Running eth-statediff-service serve command")
|
logWithCommand.Info("Running eth-statediff-service serve command")
|
||||||
|
logWithCommand.Infof("Parallelism: %d", maxParallelism())
|
||||||
|
|
||||||
statediffService, err := createStateDiffService()
|
statediffService, err := createStateDiffService()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enable the pprof agent if configured
|
||||||
|
if viper.GetBool("debug.pprof") {
|
||||||
|
// See: https://www.farsightsecurity.com/blog/txt-record/go-remote-profiling-20161028/
|
||||||
|
// For security reasons: do not use the default http multiplexor elsewhere in this process.
|
||||||
|
go func() {
|
||||||
|
logWithCommand.Info("Starting pprof listener on port 6060")
|
||||||
|
logWithCommand.Fatal(http.ListenAndServe("localhost:6060", nil))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// short circuit if we only want to perform prerun
|
// short circuit if we only want to perform prerun
|
||||||
if viper.GetBool("prerun.only") {
|
if viper.GetBool("prerun.only") {
|
||||||
if err := statediffService.Run(nil); err != nil {
|
parallel := viper.GetBool("prerun.parallel")
|
||||||
|
if err := statediffService.Run(nil, parallel); err != nil {
|
||||||
logWithCommand.Fatal("unable to perform prerun: %v", err)
|
logWithCommand.Fatal("unable to perform prerun: %v", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -48,7 +48,7 @@ type StateDiffService interface {
|
|||||||
// Loop is the main event loop for processing state diffs
|
// Loop is the main event loop for processing state diffs
|
||||||
Loop(wg *sync.WaitGroup) error
|
Loop(wg *sync.WaitGroup) error
|
||||||
// Run is a one-off command to run on a predefined set of ranges
|
// Run is a one-off command to run on a predefined set of ranges
|
||||||
Run(ranges []RangeRequest) error
|
Run(ranges []RangeRequest, parallel bool) error
|
||||||
// StateDiffAt method to get state diff object at specific block
|
// StateDiffAt method to get state diff object at specific block
|
||||||
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
|
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
|
||||||
// StateDiffFor method to get state diff object at specific block
|
// StateDiffFor method to get state diff object at specific block
|
||||||
@ -118,18 +118,55 @@ func (sds *Service) APIs() []rpc.API {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
|
// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
|
||||||
func (sds *Service) Run(rngs []RangeRequest) error {
|
func (sds *Service) Run(rngs []RangeRequest, parallel bool) error {
|
||||||
for _, preRun := range sds.preruns {
|
for _, preRun := range sds.preruns {
|
||||||
logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
|
if parallel {
|
||||||
for i := preRun.Start; i <= preRun.Stop; i++ {
|
// Chunk overall range into N subranges for workers
|
||||||
if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
|
chunkSize := (preRun.Stop - preRun.Start) / uint64(sds.workers)
|
||||||
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
|
logrus.Infof("parallel processing prerun range (%d, %d) (%d blocks) divided into %d sized chunks with %d workers", preRun.Start, preRun.Stop,
|
||||||
|
preRun.Stop-preRun.Start+1, chunkSize, sds.workers)
|
||||||
|
// Sanity floor the chunk size
|
||||||
|
if chunkSize < 100 {
|
||||||
|
chunkSize = 100
|
||||||
|
logrus.Infof("Computed range chunk size for each worker is too small, defaulting to 100")
|
||||||
|
}
|
||||||
|
wg := new(sync.WaitGroup)
|
||||||
|
for i := 0; i < int(sds.workers); i++ {
|
||||||
|
blockRange := RangeRequest{
|
||||||
|
Start: preRun.Start + uint64(i)*chunkSize,
|
||||||
|
Stop: preRun.Start + uint64(i)*chunkSize + chunkSize - 1,
|
||||||
|
Params: preRun.Params,
|
||||||
|
}
|
||||||
|
// TODO(hack) this fixes quantization
|
||||||
|
if blockRange.Stop < preRun.Stop && preRun.Stop-blockRange.Stop < chunkSize {
|
||||||
|
blockRange.Stop = preRun.Stop
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id int) {
|
||||||
|
defer wg.Done()
|
||||||
|
logrus.Infof("prerun worker %d processing range (%d, %d)", id, blockRange.Start, blockRange.Stop)
|
||||||
|
for j := blockRange.Start; j <= blockRange.Stop; j++ {
|
||||||
|
if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
|
||||||
|
logrus.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, blockRange.Start, blockRange.Stop, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logrus.Infof("prerun worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
} else {
|
||||||
|
logrus.Infof("sequential processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
|
||||||
|
for i := preRun.Start; i <= preRun.Stop; i++ {
|
||||||
|
if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
|
||||||
|
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sds.preruns = nil
|
sds.preruns = nil
|
||||||
|
// At present this code is never called so we have not written the parallel version:
|
||||||
for _, rng := range rngs {
|
for _, rng := range rngs {
|
||||||
logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop)
|
logrus.Infof("processing requested range (%d, %d)", rng.Start, rng.Stop)
|
||||||
for i := rng.Start; i <= rng.Stop; i++ {
|
for i := rng.Start; i <= rng.Stop; i++ {
|
||||||
if err := sds.WriteStateDiffAt(i, rng.Params); err != nil {
|
if err := sds.WriteStateDiffAt(i, rng.Params); err != nil {
|
||||||
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err)
|
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user