diff --git a/cmd/client.go b/cmd/client.go new file mode 100644 index 0000000..359f5ac --- /dev/null +++ b/cmd/client.go @@ -0,0 +1,72 @@ +// Copyright © 2019 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "os" + "os/signal" + "sync" + + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +// clientCmd represents the serve command +var clientCmd = &cobra.Command{ + Use: "client", + Short: "Client for queuing range requests", + Long: `Usage + +./eth-statediff-service client --config={path to toml config file}`, + Run: func(cmd *cobra.Command, args []string) { + subCommand = cmd.CalledAs() + logWithCommand = *logrus.WithField("SubCommand", subCommand) + client() + }, +} + +func init() { + rootCmd.AddCommand(clientCmd) +} + +func client() { + logWithCommand.Info("Running eth-statediff-service client command") + + statediffService, err := createStateDiffService() + if err != nil { + logWithCommand.Fatal(err) + } + + // start service and clientrs + logWithCommand.Info("Starting statediff service") + wg := new(sync.WaitGroup) + if err := statediffService.Loop(wg); err != nil { + logWithCommand.Fatalf("unable to start statediff service: %v", err) + } + logWithCommand.Info("Starting RPC clientrs") + if err := startServers(statediffService); err != nil { + logWithCommand.Fatal(err) + } + logWithCommand.Info("RPC clientrs successfully spun up; awaiting requests") + + // clean shutdown + shutdown := make(chan os.Signal) + signal.Notify(shutdown, os.Interrupt) + <-shutdown + logWithCommand.Info("Received interrupt signal, shutting down") + statediffService.Stop() + wg.Wait() +} diff --git a/cmd/env.go b/cmd/env.go index 4f81f65..ab1cd1e 100644 --- a/cmd/env.go +++ b/cmd/env.go @@ -33,6 +33,8 @@ const ( TRIE_CACHE_SIZE_MB = "TRIE_CACHE_SIZE_MB" LVLDB_PATH = "LVLDB_PATH" LVLDB_ANCIENT = "LVLDB_ANCIENT" + + STATEDIFF_PRERUN = "STATEDIFF_PRERUN" STATEDIFF_TRIE_WORKERS = "STATEDIFF_TRIE_WORKERS" STATEDIFF_SERVICE_WORKERS = "STATEDIFF_SERVICE_WORKERS" STATEDIFF_WORKER_QUEUE_SIZE = "STATEDIFF_WORKER_QUEUE_SIZE" @@ -45,6 +47,13 @@ const ( PROM_HTTP_ADDR = "PROM_HTTP_ADDR" PROM_HTTP_PORT = "PROM_HTTP_PORT" PROM_DB_STATS = "PROM_DB_STATS" + + PRERUN_ONLY = "PRERUN_ONLY" + PRERUN_RANGE_START = "PRERUN_RANGE_START" + PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP" + + LOG_LEVEL = "LOG_LEVEL" + LOG_FILE_PATH = "LOG_FILE_PATH" ) // Bind env vars for eth node and DB configuration @@ -82,4 +91,12 @@ func init() { viper.BindEnv("statediff.serviceWorkers", STATEDIFF_SERVICE_WORKERS) viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS) viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE) + + viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN) + viper.BindEnv("prerun.only", PRERUN_ONLY) + viper.BindEnv("prerun.start", PRERUN_RANGE_START) + viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP) + + viper.BindEnv("log.level", LOG_LEVEL) + viper.BindEnv("log.file", LOG_FILE_PATH) } diff --git a/cmd/root.go b/cmd/root.go index 6356e0b..0a54421 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -84,7 +84,6 @@ func initFuncs(cmd *cobra.Command, args []string) { } func logLevel() error { - viper.BindEnv("log.level", "LOGRUS_LEVEL") lvl, err := log.ParseLevel(viper.GetString("log.level")) if err != nil { return err @@ -139,6 +138,10 @@ func init() { rootCmd.PersistentFlags().Bool("prom-db-stats", false, "enables prometheus db stats") rootCmd.PersistentFlags().Bool("prom-metrics", false, "enable prometheus metrics") + rootCmd.PersistentFlags().Bool("prerun-only", false, "only process pre-configured ranges; exit afterwards") + rootCmd.PersistentFlags().Int("prerun-start", 0, "start height for a prerun range") + rootCmd.PersistentFlags().Int("prerun-stop", 0, "stop height for a prerun range") + viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path")) viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path")) viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file")) @@ -166,6 +169,9 @@ func init() { viper.BindPFlag("prom.httpPort", rootCmd.PersistentFlags().Lookup("prom-http-port")) viper.BindPFlag("prom.dbStats", rootCmd.PersistentFlags().Lookup("prom-db-stats")) viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics")) + viper.BindPFlag("prerun.only", rootCmd.PersistentFlags().Lookup("prerun-only")) + viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start")) + viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop")) } func initConfig() { diff --git a/cmd/serve.go b/cmd/serve.go index 586b383..947014e 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -55,6 +55,14 @@ func serve() { logWithCommand.Fatal(err) } + // short circuit if we only want to perform prerun + if viper.GetBool("prerun.only") { + if err := statediffService.Run(nil); err != nil { + logWithCommand.Fatal("unable to perform prerun: %v", err) + } + return + } + // start service and servers logWithCommand.Info("Starting statediff service") wg := new(sync.WaitGroup) diff --git a/cmd/util.go b/cmd/util.go index 7d38bd9..08d8a9b 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -122,6 +122,16 @@ func setupPreRunRanges() []sd.RangeRequest { Params: preRunParams, } } + if viper.IsSet("prerun.start") && viper.IsSet("prerun.stop") { + hardStart := viper.GetInt("prerun.start") + hardStop := viper.GetInt("prerun.stop") + blockRanges = append(blockRanges, sd.RangeRequest{ + Start: uint64(hardStart), + Stop: uint64(hardStop), + Params: preRunParams, + }) + } + return blockRanges } diff --git a/environments/config.toml b/environments/config.toml new file mode 100644 index 0000000..677af33 --- /dev/null +++ b/environments/config.toml @@ -0,0 +1,51 @@ +[leveldb] + path = "/app/geth-rw/chaindata" + ancient = "/app/geth-rw/chaindata/ancient" + +[server] + ipcPath = "" + httpPath = "0.0.0.0:8545" + +[statediff] + prerun = true + serviceWorkers = 1 + workerQueueSize = 1024 + trieWorkers = 16 + +[prerun] + only = true + ranges = [] + [prerun.params] + intermediateStateNodes = true + intermediateStorageNodes = true + includeBlock = true + includeReceipts = true + includeTD = true + includeCode = true + watchedAddresses = [] + watchedStorageKeys = [] + +[log] + file = "" + level = "info" + +[eth] + chainID = 1 + +[database] + name = "" + hostname = "" + port = 5432 + user = "" + password = "" + +[cache] + database = 1024 + trie = 4096 + +[prom] + dbStats = false + metrics = true + http = true + httpAddr = "0.0.0.0" + httpPort = 9100 diff --git a/environments/example.toml b/environments/example.toml index 0ee5b6f..dbd7282 100644 --- a/environments/example.toml +++ b/environments/example.toml @@ -13,6 +13,7 @@ trieWorkers = 4 [prerun] + only = false ranges = [ [0, 1000] ] diff --git a/pkg/service.go b/pkg/service.go index b176a2f..cecf6c0 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -47,6 +47,8 @@ type StateDiffService interface { Protocols() []p2p.Protocol // Loop is the main event loop for processing state diffs Loop(wg *sync.WaitGroup) error + // Run is a one-off command to run on a predefined set of ranges + Run(ranges []RangeRequest) error // StateDiffAt method to get state diff object at specific block StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) // StateDiffFor method to get state diff object at specific block @@ -115,11 +117,34 @@ func (sds *Service) APIs() []rpc.API { } } +// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards +func (sds *Service) Run(rngs []RangeRequest) error { + for _, preRun := range sds.preruns { + logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop) + for i := preRun.Start; i <= preRun.Stop; i++ { + if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil { + return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err) + } + } + } + sds.preruns = nil + for _, rng := range rngs { + logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop) + for i := rng.Start; i <= rng.Stop; i++ { + if err := sds.WriteStateDiffAt(i, rng.Params); err != nil { + return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err) + } + } + } + return nil +} + // Loop is an empty service loop for awaiting rpc requests func (sds *Service) Loop(wg *sync.WaitGroup) error { if sds.quitChan != nil { return fmt.Errorf("service loop is already running") } + sds.quitChan = make(chan struct{}) for i := 0; i < int(sds.workers); i++ { wg.Add(1) @@ -128,6 +153,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error { for { select { case blockRange := <-sds.queue: + logrus.Infof("service worker %d received range (%d, %d) off of work queue, beginning processing", id, blockRange.Start, blockRange.Stop) prom.DecQueuedRanges() for j := blockRange.Start; j <= blockRange.Stop; j++ { if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {