From d235f3b84c8d7bb98f9e63d629c044d966c9320b Mon Sep 17 00:00:00 2001 From: David Boreham Date: Tue, 30 Aug 2022 22:31:54 -0600 Subject: [PATCH 1/9] Initial version of parallel workers for prerun-only mode --- README.md | 3 ++- cmd/env.go | 2 ++ cmd/serve.go | 3 ++- pkg/service.go | 51 +++++++++++++++++++++++++++++++++++++++++++------- 4 files changed, 50 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 26088a3..96fd600 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,8 @@ An example config file: trieWorkers = 4 # STATEDIFF_TRIE_WORKERS [prerun] - only = false # PRERUN_ONLY + only = false # PRERUN_ONLY + parallel = false # PRERUN_PARALLEL # to perform prerun in a specific range (optional) start = 0 # PRERUN_RANGE_START diff --git a/cmd/env.go b/cmd/env.go index f9792c0..1e20582 100644 --- a/cmd/env.go +++ b/cmd/env.go @@ -50,6 +50,7 @@ const ( PROM_DB_STATS = "PROM_DB_STATS" PRERUN_ONLY = "PRERUN_ONLY" + PRERUN_PARALLEL = "PRERUN_PARALLEL" PRERUN_RANGE_START = "PRERUN_RANGE_START" PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP" PRERUN_INTERMEDIATE_STATE_NODES = "PRERUN_INTERMEDIATE_STATE_NODES" @@ -135,6 +136,7 @@ func init() { viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN) viper.BindEnv("prerun.only", PRERUN_ONLY) + viper.BindEnv("prerun.only", PRERUN_PARALLEL) viper.BindEnv("prerun.start", PRERUN_RANGE_START) viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP) viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES) diff --git a/cmd/serve.go b/cmd/serve.go index f5959be..d8fc0dd 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -57,7 +57,8 @@ func serve() { // short circuit if we only want to perform prerun if viper.GetBool("prerun.only") { - if err := statediffService.Run(nil); err != nil { + parallel := viper.GetBool("prerun.parallel") + if err := statediffService.Run(nil, parallel); err != nil { logWithCommand.Fatal("unable to perform prerun: %v", err) } return diff --git a/pkg/service.go b/pkg/service.go index 875d52b..129ac2b 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -48,7 +48,7 @@ type StateDiffService interface { // Loop is the main event loop for processing state diffs Loop(wg *sync.WaitGroup) error // Run is a one-off command to run on a predefined set of ranges - Run(ranges []RangeRequest) error + Run(ranges []RangeRequest, parallel bool) error // StateDiffAt method to get state diff object at specific block StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) // StateDiffFor method to get state diff object at specific block @@ -118,18 +118,55 @@ func (sds *Service) APIs() []rpc.API { } // Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards -func (sds *Service) Run(rngs []RangeRequest) error { +func (sds *Service) Run(rngs []RangeRequest, parallel bool) error { for _, preRun := range sds.preruns { - logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop) - for i := preRun.Start; i <= preRun.Stop; i++ { - if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil { - return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err) + if parallel { + // Chunk overall range into N subranges for workers + chunkSize := (preRun.Stop - preRun.Start) / uint64(sds.workers) + logrus.Infof("parallel processing prerun range (%d, %d) (%d blocks) divided into %d sized chunks with %d workers", preRun.Start, preRun.Stop, + preRun.Stop-preRun.Start, chunkSize, sds.workers) + // Sanity floor the chunk size + if chunkSize < 100 { + chunkSize = 100 + logrus.Infof("Computed range chunk size for each worker is too small, defaulting to 100") + } + wg := new(sync.WaitGroup) + for i := 0; i < int(sds.workers); i++ { + blockRange := RangeRequest{ + Start: preRun.Start + uint64(i)*chunkSize, + Stop: preRun.Start + uint64(i)*chunkSize + chunkSize - 1, + Params: preRun.Params, + } + // TODO(hack) this fixes quantization + if blockRange.Stop < preRun.Stop && preRun.Stop-blockRange.Stop < chunkSize { + blockRange.Stop = preRun.Stop + } + wg.Add(1) + go func(id int) { + defer wg.Done() + logrus.Infof("prerun worker %d processing range (%d, %d)", id, blockRange.Start, blockRange.Stop) + for j := blockRange.Start; j <= blockRange.Stop; j++ { + if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil { + logrus.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, blockRange.Start, blockRange.Stop, err) + } + } + logrus.Infof("prerun worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop) + }(i) + } + wg.Wait() + } else { + logrus.Infof("sequential processing prerun range (%d, %d)", preRun.Start, preRun.Stop) + for i := preRun.Start; i <= preRun.Stop; i++ { + if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil { + return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err) + } } } } sds.preruns = nil + // TODO(dboreham): seems like this code is never called so we have not written the parallel version for _, rng := range rngs { - logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop) + logrus.Infof("processing requested range (%d, %d)", rng.Start, rng.Stop) for i := rng.Start; i <= rng.Stop; i++ { if err := sds.WriteStateDiffAt(i, rng.Params); err != nil { return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err) From 7f8885f044a64df4d6e537b126af73c29d5beb29 Mon Sep 17 00:00:00 2001 From: David Boreham Date: Wed, 31 Aug 2022 07:54:16 -0600 Subject: [PATCH 2/9] Add comment --- pkg/service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/service.go b/pkg/service.go index 129ac2b..040dc62 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -133,6 +133,7 @@ func (sds *Service) Run(rngs []RangeRequest, parallel bool) error { wg := new(sync.WaitGroup) for i := 0; i < int(sds.workers); i++ { blockRange := RangeRequest{ + // TODO(dboreham): check this math doesn't leave gaps (are start/stop inclusive?) Start: preRun.Start + uint64(i)*chunkSize, Stop: preRun.Start + uint64(i)*chunkSize + chunkSize - 1, Params: preRun.Params, From 5b7f5feb1bc0c22c6821ade298710e53cfc679b2 Mon Sep 17 00:00:00 2001 From: David Boreham Date: Thu, 1 Sep 2022 23:42:55 -0600 Subject: [PATCH 3/9] Enable pprof --- cmd/serve.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cmd/serve.go b/cmd/serve.go index d8fc0dd..7cff741 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -16,6 +16,8 @@ package cmd import ( + "net/http" + _ "net/http/pprof" "os" "os/signal" "sync" @@ -57,6 +59,11 @@ func serve() { // short circuit if we only want to perform prerun if viper.GetBool("prerun.only") { + // See: https://www.farsightsecurity.com/blog/txt-record/go-remote-profiling-20161028/ + // Do not use the default http multiplexor elsewhere in this process. + go func() { + logWithCommand.Fatal(http.ListenAndServe("localhost:6060", nil)) + }() parallel := viper.GetBool("prerun.parallel") if err := statediffService.Run(nil, parallel); err != nil { logWithCommand.Fatal("unable to perform prerun: %v", err) From 99f84b6fe6acdbefad6711efd9c9011b953dcb8d Mon Sep 17 00:00:00 2001 From: David Boreham Date: Thu, 1 Sep 2022 23:43:38 -0600 Subject: [PATCH 4/9] Comment --- cmd/serve.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/serve.go b/cmd/serve.go index 7cff741..eb4438f 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -59,6 +59,7 @@ func serve() { // short circuit if we only want to perform prerun if viper.GetBool("prerun.only") { + // TODO: make pprof optional // See: https://www.farsightsecurity.com/blog/txt-record/go-remote-profiling-20161028/ // Do not use the default http multiplexor elsewhere in this process. go func() { From 6d103cb1f120cd29cc4abc61dc29417703bc79f3 Mon Sep 17 00:00:00 2001 From: David Boreham Date: Fri, 2 Sep 2022 10:51:13 -0600 Subject: [PATCH 5/9] Add some logging --- cmd/serve.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cmd/serve.go b/cmd/serve.go index eb4438f..cd53a60 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -20,6 +20,7 @@ import ( _ "net/http/pprof" "os" "os/signal" + "runtime" "sync" "github.com/ethereum/go-ethereum/rpc" @@ -49,8 +50,18 @@ func init() { rootCmd.AddCommand(serveCmd) } +func maxParallelism() int { + maxProcs := runtime.GOMAXPROCS(0) + numCPU := runtime.NumCPU() + if maxProcs < numCPU { + return maxProcs + } + return numCPU +} + func serve() { logWithCommand.Info("Running eth-statediff-service serve command") + logWithCommand.Infof("Parallelism: %d", maxParallelism()) statediffService, err := createStateDiffService() if err != nil { @@ -63,6 +74,7 @@ func serve() { // See: https://www.farsightsecurity.com/blog/txt-record/go-remote-profiling-20161028/ // Do not use the default http multiplexor elsewhere in this process. go func() { + logWithCommand.Info("Starting pprof listener on port 6060") logWithCommand.Fatal(http.ListenAndServe("localhost:6060", nil)) }() parallel := viper.GetBool("prerun.parallel") From b1440d967382a788aea0c6bef628433321395482 Mon Sep 17 00:00:00 2001 From: David Boreham Date: Wed, 21 Sep 2022 13:06:41 -0600 Subject: [PATCH 6/9] Fix off by one error --- pkg/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/service.go b/pkg/service.go index 040dc62..cefd726 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -124,7 +124,7 @@ func (sds *Service) Run(rngs []RangeRequest, parallel bool) error { // Chunk overall range into N subranges for workers chunkSize := (preRun.Stop - preRun.Start) / uint64(sds.workers) logrus.Infof("parallel processing prerun range (%d, %d) (%d blocks) divided into %d sized chunks with %d workers", preRun.Start, preRun.Stop, - preRun.Stop-preRun.Start, chunkSize, sds.workers) + preRun.Stop-preRun.Start+1, chunkSize, sds.workers) // Sanity floor the chunk size if chunkSize < 100 { chunkSize = 100 From a2772762e1e0a4f3716d647f7946f0e43d6bf0b5 Mon Sep 17 00:00:00 2001 From: David Boreham Date: Wed, 21 Sep 2022 13:33:15 -0600 Subject: [PATCH 7/9] Make pprof configurable, remove todos --- README.md | 3 +++ cmd/env.go | 6 +++++- cmd/serve.go | 11 +++++++---- pkg/service.go | 3 +-- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 96fd600..96fc6fd 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,9 @@ An example config file: # path to custom chain config file (optional) # keep chainID same as that in chain config file chainConfig = "./chain.json" # ETH_CHAIN_CONFIG + +[debug] + pprof = false # Enable pprof agent listener on port 6060 ``` ### Local Setup diff --git a/cmd/env.go b/cmd/env.go index 1e20582..4fe244c 100644 --- a/cmd/env.go +++ b/cmd/env.go @@ -82,6 +82,8 @@ const ( DATABASE_MAX_CONN_LIFETIME = "DATABASE_MAX_CONN_LIFETIME" DATABASE_CONN_TIMEOUT = "DATABSE_CONN_TIMEOUT" DATABASE_MAX_CONN_IDLE_TIME = "DATABASE_MAX_CONN_IDLE_TIME" + + DEBUG_PPROF = "DEBUG_PPROF" ) // Bind env vars for eth node and DB configuration @@ -136,7 +138,7 @@ func init() { viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN) viper.BindEnv("prerun.only", PRERUN_ONLY) - viper.BindEnv("prerun.only", PRERUN_PARALLEL) + viper.BindEnv("prerun.parallel", PRERUN_PARALLEL) viper.BindEnv("prerun.start", PRERUN_RANGE_START) viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP) viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES) @@ -148,4 +150,6 @@ func init() { viper.BindEnv("log.level", LOG_LEVEL) viper.BindEnv("log.file", LOG_FILE_PATH) + + viper.BindEnv("debug.pprof", DEBUG_PPROF) } diff --git a/cmd/serve.go b/cmd/serve.go index cd53a60..b178132 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -68,15 +68,18 @@ func serve() { logWithCommand.Fatal(err) } - // short circuit if we only want to perform prerun - if viper.GetBool("prerun.only") { - // TODO: make pprof optional + // Enable the pprof agent if configured + if viper.GetBool("debug.pprof") { // See: https://www.farsightsecurity.com/blog/txt-record/go-remote-profiling-20161028/ - // Do not use the default http multiplexor elsewhere in this process. + // For security reasons: do not use the default http multiplexor elsewhere in this process. go func() { logWithCommand.Info("Starting pprof listener on port 6060") logWithCommand.Fatal(http.ListenAndServe("localhost:6060", nil)) }() + } + + // short circuit if we only want to perform prerun + if viper.GetBool("prerun.only") { parallel := viper.GetBool("prerun.parallel") if err := statediffService.Run(nil, parallel); err != nil { logWithCommand.Fatal("unable to perform prerun: %v", err) diff --git a/pkg/service.go b/pkg/service.go index cefd726..0307236 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -133,7 +133,6 @@ func (sds *Service) Run(rngs []RangeRequest, parallel bool) error { wg := new(sync.WaitGroup) for i := 0; i < int(sds.workers); i++ { blockRange := RangeRequest{ - // TODO(dboreham): check this math doesn't leave gaps (are start/stop inclusive?) Start: preRun.Start + uint64(i)*chunkSize, Stop: preRun.Start + uint64(i)*chunkSize + chunkSize - 1, Params: preRun.Params, @@ -165,7 +164,7 @@ func (sds *Service) Run(rngs []RangeRequest, parallel bool) error { } } sds.preruns = nil - // TODO(dboreham): seems like this code is never called so we have not written the parallel version + // At present this code is never called so we have not written the parallel version: for _, rng := range rngs { logrus.Infof("processing requested range (%d, %d)", rng.Start, rng.Stop) for i := rng.Start; i <= rng.Stop; i++ { From 14b9c169bc704444f9213623c7128ad7605a6381 Mon Sep 17 00:00:00 2001 From: David Boreham Date: Wed, 21 Sep 2022 14:00:45 -0600 Subject: [PATCH 8/9] Add viper boiler plate --- README.md | 2 +- cmd/root.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 96fc6fd..c9bdba7 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ An example config file: [prerun] only = false # PRERUN_ONLY - parallel = false # PRERUN_PARALLEL + parallel = true # PRERUN_PARALLEL # to perform prerun in a specific range (optional) start = 0 # PRERUN_RANGE_START diff --git a/cmd/root.go b/cmd/root.go index a5e628c..ca1ed45 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -221,6 +221,7 @@ func init() { viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics")) viper.BindPFlag("prerun.only", rootCmd.PersistentFlags().Lookup("prerun-only")) + viper.BindPFlag("prerun.parallel", rootCmd.PersistentFlags().Lookup("prerun-parallel")) viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start")) viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop")) viper.BindPFlag("prerun.params.intermediateStateNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-state-nodes")) @@ -230,6 +231,8 @@ func init() { viper.BindPFlag("prerun.params.includeTD", rootCmd.PersistentFlags().Lookup("prerun-include-td")) viper.BindPFlag("prerun.params.includeCode", rootCmd.PersistentFlags().Lookup("prerun-include-code")) + viper.BindPFlag("debug.pprof", rootCmd.PersistentFlags().Lookup("debug-pprof")) + rand.Seed(time.Now().UnixNano()) } From c0cd87ba6a822eb9268f4c1e06df58fdbcaa3403 Mon Sep 17 00:00:00 2001 From: David Boreham Date: Thu, 22 Sep 2022 07:26:32 -0600 Subject: [PATCH 9/9] Make doc consistent --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7a10638..21f636f 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ An example config file: chainConfig = "./chain.json" # ETH_CHAIN_CONFIG [debug] - pprof = false # Enable pprof agent listener on port 6060 + pprof = false # DEBUG_PPROF ``` ### Local Setup