enable configuration of a prerun range by env variable; prerun only mode
This commit is contained in:
parent
84fff0aa03
commit
3d00e3ed05
@ -45,6 +45,10 @@ const (
|
|||||||
PROM_HTTP_ADDR = "PROM_HTTP_ADDR"
|
PROM_HTTP_ADDR = "PROM_HTTP_ADDR"
|
||||||
PROM_HTTP_PORT = "PROM_HTTP_PORT"
|
PROM_HTTP_PORT = "PROM_HTTP_PORT"
|
||||||
PROM_DB_STATS = "PROM_DB_STATS"
|
PROM_DB_STATS = "PROM_DB_STATS"
|
||||||
|
|
||||||
|
PRERUN_ONLY = "PRERUN_ONLY"
|
||||||
|
PRERUN_RANGE_START = "PRERUN_RANGE_START"
|
||||||
|
PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bind env vars for eth node and DB configuration
|
// Bind env vars for eth node and DB configuration
|
||||||
@ -82,4 +86,8 @@ func init() {
|
|||||||
viper.BindEnv("statediff.serviceWorkers", STATEDIFF_SERVICE_WORKERS)
|
viper.BindEnv("statediff.serviceWorkers", STATEDIFF_SERVICE_WORKERS)
|
||||||
viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS)
|
viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS)
|
||||||
viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE)
|
viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE)
|
||||||
|
|
||||||
|
viper.BindEnv("prerun.only", PRERUN_ONLY)
|
||||||
|
viper.BindEnv("prerun.start", PRERUN_RANGE_START)
|
||||||
|
viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP)
|
||||||
}
|
}
|
||||||
|
@ -139,6 +139,10 @@ func init() {
|
|||||||
rootCmd.PersistentFlags().Bool("prom-db-stats", false, "enables prometheus db stats")
|
rootCmd.PersistentFlags().Bool("prom-db-stats", false, "enables prometheus db stats")
|
||||||
rootCmd.PersistentFlags().Bool("prom-metrics", false, "enable prometheus metrics")
|
rootCmd.PersistentFlags().Bool("prom-metrics", false, "enable prometheus metrics")
|
||||||
|
|
||||||
|
rootCmd.PersistentFlags().Bool("prerun-only", false, "only process pre-configured ranges; exit afterwards")
|
||||||
|
rootCmd.PersistentFlags().Int("prerun-start", 0, "start height for a prerun range")
|
||||||
|
rootCmd.PersistentFlags().Int("prerun-stop", 0, "stop height for a prerun range")
|
||||||
|
|
||||||
viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path"))
|
viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path"))
|
||||||
viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path"))
|
viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path"))
|
||||||
viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file"))
|
viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file"))
|
||||||
@ -166,6 +170,9 @@ func init() {
|
|||||||
viper.BindPFlag("prom.httpPort", rootCmd.PersistentFlags().Lookup("prom-http-port"))
|
viper.BindPFlag("prom.httpPort", rootCmd.PersistentFlags().Lookup("prom-http-port"))
|
||||||
viper.BindPFlag("prom.dbStats", rootCmd.PersistentFlags().Lookup("prom-db-stats"))
|
viper.BindPFlag("prom.dbStats", rootCmd.PersistentFlags().Lookup("prom-db-stats"))
|
||||||
viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics"))
|
viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics"))
|
||||||
|
viper.BindPFlag("prerun.only", rootCmd.PersistentFlags().Lookup("prerun-only"))
|
||||||
|
viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start"))
|
||||||
|
viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func initConfig() {
|
func initConfig() {
|
||||||
|
@ -55,6 +55,14 @@ func serve() {
|
|||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// short circuit if we only want to perform prerun
|
||||||
|
if viper.GetBool("prerun.only") {
|
||||||
|
if err := statediffService.Run(nil); err != nil {
|
||||||
|
logWithCommand.Fatal("unable to perform prerun: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// start service and servers
|
// start service and servers
|
||||||
logWithCommand.Info("Starting statediff service")
|
logWithCommand.Info("Starting statediff service")
|
||||||
wg := new(sync.WaitGroup)
|
wg := new(sync.WaitGroup)
|
||||||
|
10
cmd/util.go
10
cmd/util.go
@ -122,6 +122,16 @@ func setupPreRunRanges() []sd.RangeRequest {
|
|||||||
Params: preRunParams,
|
Params: preRunParams,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if viper.IsSet("prerun.start") && viper.IsSet("prerun.stop") {
|
||||||
|
hardStart := viper.GetInt("prerun.start")
|
||||||
|
hardStop := viper.GetInt("prerun.stop")
|
||||||
|
blockRanges = append(blockRanges, sd.RangeRequest{
|
||||||
|
Start: uint64(hardStart),
|
||||||
|
Stop: uint64(hardStop),
|
||||||
|
Params: preRunParams,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return blockRanges
|
return blockRanges
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
trieWorkers = 4
|
trieWorkers = 4
|
||||||
|
|
||||||
[prerun]
|
[prerun]
|
||||||
|
only = false
|
||||||
ranges = [
|
ranges = [
|
||||||
[0, 1000]
|
[0, 1000]
|
||||||
]
|
]
|
||||||
|
@ -47,6 +47,8 @@ type StateDiffService interface {
|
|||||||
Protocols() []p2p.Protocol
|
Protocols() []p2p.Protocol
|
||||||
// Loop is the main event loop for processing state diffs
|
// Loop is the main event loop for processing state diffs
|
||||||
Loop(wg *sync.WaitGroup) error
|
Loop(wg *sync.WaitGroup) error
|
||||||
|
// Run is a one-off command to run on a predefined set of ranges
|
||||||
|
Run(ranges []RangeRequest) error
|
||||||
// StateDiffAt method to get state diff object at specific block
|
// StateDiffAt method to get state diff object at specific block
|
||||||
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
|
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
|
||||||
// StateDiffFor method to get state diff object at specific block
|
// StateDiffFor method to get state diff object at specific block
|
||||||
@ -115,11 +117,34 @@ func (sds *Service) APIs() []rpc.API {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
|
||||||
|
func (sds *Service) Run(rngs []RangeRequest) error {
|
||||||
|
for _, preRun := range sds.preruns {
|
||||||
|
logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
|
||||||
|
for i := preRun.Start; i <= preRun.Stop; i++ {
|
||||||
|
if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
|
||||||
|
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sds.preruns = nil
|
||||||
|
for _, rng := range rngs {
|
||||||
|
logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop)
|
||||||
|
for i := rng.Start; i <= rng.Stop; i++ {
|
||||||
|
if err := sds.WriteStateDiffAt(i, rng.Params); err != nil {
|
||||||
|
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Loop is an empty service loop for awaiting rpc requests
|
// Loop is an empty service loop for awaiting rpc requests
|
||||||
func (sds *Service) Loop(wg *sync.WaitGroup) error {
|
func (sds *Service) Loop(wg *sync.WaitGroup) error {
|
||||||
if sds.quitChan != nil {
|
if sds.quitChan != nil {
|
||||||
return fmt.Errorf("service loop is already running")
|
return fmt.Errorf("service loop is already running")
|
||||||
}
|
}
|
||||||
|
|
||||||
sds.quitChan = make(chan struct{})
|
sds.quitChan = make(chan struct{})
|
||||||
for i := 0; i < int(sds.workers); i++ {
|
for i := 0; i < int(sds.workers); i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -128,6 +153,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case blockRange := <-sds.queue:
|
case blockRange := <-sds.queue:
|
||||||
|
logrus.Infof("service worker %d received range (%d, %d) off of work queue, beginning processing", id, blockRange.Start, blockRange.Stop)
|
||||||
prom.DecQueuedRanges()
|
prom.DecQueuedRanges()
|
||||||
for j := blockRange.Start; j <= blockRange.Stop; j++ {
|
for j := blockRange.Start; j <= blockRange.Stop; j++ {
|
||||||
if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
|
if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user