replace prerun options with run command

This commit is contained in:
Roy Crihfield 2023-08-25 16:21:17 +08:00
parent b9cba4c605
commit e49839f859
7 changed files with 109 additions and 72 deletions

View File

@ -35,7 +35,6 @@ const (
LVLDB_ANCIENT = "LVLDB_ANCIENT" LVLDB_ANCIENT = "LVLDB_ANCIENT"
LVLDB_URL = "LVLDB_URL" LVLDB_URL = "LVLDB_URL"
STATEDIFF_PRERUN = "STATEDIFF_PRERUN"
STATEDIFF_TRIE_WORKERS = "STATEDIFF_TRIE_WORKERS" STATEDIFF_TRIE_WORKERS = "STATEDIFF_TRIE_WORKERS"
STATEDIFF_SERVICE_WORKERS = "STATEDIFF_SERVICE_WORKERS" STATEDIFF_SERVICE_WORKERS = "STATEDIFF_SERVICE_WORKERS"
STATEDIFF_WORKER_QUEUE_SIZE = "STATEDIFF_WORKER_QUEUE_SIZE" STATEDIFF_WORKER_QUEUE_SIZE = "STATEDIFF_WORKER_QUEUE_SIZE"
@ -49,15 +48,10 @@ const (
PROM_HTTP_PORT = "PROM_HTTP_PORT" PROM_HTTP_PORT = "PROM_HTTP_PORT"
PROM_DB_STATS = "PROM_DB_STATS" PROM_DB_STATS = "PROM_DB_STATS"
PRERUN_ONLY = "PRERUN_ONLY" RUN_INCLUDE_BLOCK = "RUN_INCLUDE_BLOCK"
PRERUN_RANGE_START = "PRERUN_RANGE_START" RUN_INCLUDE_RECEIPTS = "RUN_INCLUDE_RECEIPTS"
PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP" RUN_INCLUDE_TD = "RUN_INCLUDE_TD"
PRERUN_INTERMEDIATE_STATE_NODES = "PRERUN_INTERMEDIATE_STATE_NODES" RUN_INCLUDE_CODE = "RUN_INCLUDE_CODE"
PRERUN_INTERMEDIATE_STORAGE_NODES = "PRERUN_INTERMEDIATE_STORAGE_NODES"
PRERUN_INCLUDE_BLOCK = "PRERUN_INCLUDE_BLOCK"
PRERUN_INCLUDE_RECEIPTS = "PRERUN_INCLUDE_RECEIPTS"
PRERUN_INCLUDE_TD = "PRERUN_INCLUDE_TD"
PRERUN_INCLUDE_CODE = "PRERUN_INCLUDE_CODE"
LOG_LEVEL = "LOG_LEVEL" LOG_LEVEL = "LOG_LEVEL"
LOG_FILE_PATH = "LOG_FILE_PATH" LOG_FILE_PATH = "LOG_FILE_PATH"
@ -129,16 +123,10 @@ func init() {
viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS) viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS)
viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE) viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE)
viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN) viper.BindEnv("run.params.includeBlock", RUN_INCLUDE_BLOCK)
viper.BindEnv("prerun.only", PRERUN_ONLY) viper.BindEnv("run.params.includeReceipts", RUN_INCLUDE_RECEIPTS)
viper.BindEnv("prerun.start", PRERUN_RANGE_START) viper.BindEnv("run.params.includeTD", RUN_INCLUDE_TD)
viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP) viper.BindEnv("run.params.includeCode", RUN_INCLUDE_CODE)
viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES)
viper.BindEnv("prerun.params.intermediateStorageNodes", PRERUN_INTERMEDIATE_STORAGE_NODES)
viper.BindEnv("prerun.params.includeBlock", PRERUN_INCLUDE_BLOCK)
viper.BindEnv("prerun.params.includeReceipts", PRERUN_INCLUDE_RECEIPTS)
viper.BindEnv("prerun.params.includeTD", PRERUN_INCLUDE_TD)
viper.BindEnv("prerun.params.includeCode", PRERUN_INCLUDE_CODE)
viper.BindEnv("log.level", LOG_LEVEL) viper.BindEnv("log.level", LOG_LEVEL)
viper.BindEnv("log.file", LOG_FILE_PATH) viper.BindEnv("log.file", LOG_FILE_PATH)

View File

@ -45,17 +45,16 @@ var (
var rootCmd = &cobra.Command{ var rootCmd = &cobra.Command{
Use: "eth-statediff-service", Use: "eth-statediff-service",
PersistentPreRun: initFuncs, PersistentPreRun: setupLoggingAndMetrics,
} }
func Execute() { func Execute() {
log.Info("----- Starting vDB -----")
if err := rootCmd.Execute(); err != nil { if err := rootCmd.Execute(); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
func initFuncs(cmd *cobra.Command, args []string) { func setupLoggingAndMetrics(cmd *cobra.Command, args []string) {
logfile := viper.GetString("log.file") logfile := viper.GetString("log.file")
if logfile != "" { if logfile != "" {
file, err := os.OpenFile(logfile, file, err := os.OpenFile(logfile,
@ -121,7 +120,6 @@ func init() {
rootCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore") rootCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
rootCmd.PersistentFlags().String("leveldb-url", "", "url to primary leveldb-ethdb-rpc server") rootCmd.PersistentFlags().String("leveldb-url", "", "url to primary leveldb-ethdb-rpc server")
rootCmd.PersistentFlags().Bool("prerun", false, "turn on prerun of toml configured ranges")
rootCmd.PersistentFlags().Int("service-workers", 0, "number of range requests to process concurrently") rootCmd.PersistentFlags().Int("service-workers", 0, "number of range requests to process concurrently")
rootCmd.PersistentFlags().Int("trie-workers", 0, "number of workers to use for trie traversal and processing") rootCmd.PersistentFlags().Int("trie-workers", 0, "number of workers to use for trie traversal and processing")
rootCmd.PersistentFlags().Int("worker-queue-size", 0, "size of the range request queue for service workers") rootCmd.PersistentFlags().Int("worker-queue-size", 0, "size of the range request queue for service workers")
@ -158,15 +156,10 @@ func init() {
rootCmd.PersistentFlags().Bool("prom-db-stats", false, "enables prometheus db stats") rootCmd.PersistentFlags().Bool("prom-db-stats", false, "enables prometheus db stats")
rootCmd.PersistentFlags().Bool("prom-metrics", false, "enable prometheus metrics") rootCmd.PersistentFlags().Bool("prom-metrics", false, "enable prometheus metrics")
rootCmd.PersistentFlags().Bool("prerun-only", false, "only process pre-configured ranges; exit afterwards") rootCmd.PersistentFlags().Bool("run-include-block", true, "include block data in the statediff payload")
rootCmd.PersistentFlags().Int("prerun-start", 0, "start height for a prerun range") rootCmd.PersistentFlags().Bool("run-include-receipts", true, "include receipts in the statediff payload")
rootCmd.PersistentFlags().Int("prerun-stop", 0, "stop height for a prerun range") rootCmd.PersistentFlags().Bool("run-include-td", true, "include td in the statediff payload")
rootCmd.PersistentFlags().Bool("prerun-intermediate-state-nodes", true, "include intermediate state nodes in state diff") rootCmd.PersistentFlags().Bool("run-include-code", true, "include code and codehash mappings in statediff payload")
rootCmd.PersistentFlags().Bool("prerun-intermediate-storage-nodes", true, "include intermediate storage nodes in state diff")
rootCmd.PersistentFlags().Bool("prerun-include-block", true, "include block data in the statediff payload")
rootCmd.PersistentFlags().Bool("prerun-include-receipts", true, "include receipts in the statediff payload")
rootCmd.PersistentFlags().Bool("prerun-include-td", true, "include td in the statediff payload")
rootCmd.PersistentFlags().Bool("prerun-include-code", true, "include code and codehash mappings in statediff payload")
viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path")) viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path"))
viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path")) viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path"))
@ -174,7 +167,6 @@ func init() {
viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file")) viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file"))
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
viper.BindPFlag("statediff.prerun", rootCmd.PersistentFlags().Lookup("prerun"))
viper.BindPFlag("statediff.serviceWorkers", rootCmd.PersistentFlags().Lookup("service-workers")) viper.BindPFlag("statediff.serviceWorkers", rootCmd.PersistentFlags().Lookup("service-workers"))
viper.BindPFlag("statediff.trieWorkers", rootCmd.PersistentFlags().Lookup("trie-workers")) viper.BindPFlag("statediff.trieWorkers", rootCmd.PersistentFlags().Lookup("trie-workers"))
viper.BindPFlag("statediff.workerQueueSize", rootCmd.PersistentFlags().Lookup("worker-queue-size")) viper.BindPFlag("statediff.workerQueueSize", rootCmd.PersistentFlags().Lookup("worker-queue-size"))
@ -216,15 +208,10 @@ func init() {
viper.BindPFlag("prom.dbStats", rootCmd.PersistentFlags().Lookup("prom-db-stats")) viper.BindPFlag("prom.dbStats", rootCmd.PersistentFlags().Lookup("prom-db-stats"))
viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics")) viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics"))
viper.BindPFlag("prerun.only", rootCmd.PersistentFlags().Lookup("prerun-only")) viper.BindPFlag("run.params.includeBlock", rootCmd.PersistentFlags().Lookup("run-include-block"))
viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start")) viper.BindPFlag("run.params.includeReceipts", rootCmd.PersistentFlags().Lookup("run-include-receipts"))
viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop")) viper.BindPFlag("run.params.includeTD", rootCmd.PersistentFlags().Lookup("run-include-td"))
viper.BindPFlag("prerun.params.intermediateStateNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-state-nodes")) viper.BindPFlag("run.params.includeCode", rootCmd.PersistentFlags().Lookup("run-include-code"))
viper.BindPFlag("prerun.params.intermediateStorageNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-storage-nodes"))
viper.BindPFlag("prerun.params.includeBlock", rootCmd.PersistentFlags().Lookup("prerun-include-block"))
viper.BindPFlag("prerun.params.includeReceipts", rootCmd.PersistentFlags().Lookup("prerun-include-receipts"))
viper.BindPFlag("prerun.params.includeTD", rootCmd.PersistentFlags().Lookup("prerun-include-td"))
viper.BindPFlag("prerun.params.includeCode", rootCmd.PersistentFlags().Lookup("prerun-include-code"))
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
} }

91
cmd/run.go Normal file
View File

@ -0,0 +1,91 @@
// Copyright © 2023 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"os"
"os/signal"
"sync"
statediff "github.com/cerc-io/plugeth-statediff"
"github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
pkg "github.com/vulcanize/eth-statediff-service/pkg"
)
// serveCmd represents the serve command
var runCmd = &cobra.Command{
Use: "run",
Short: "Produce diffs for a specific block range",
Long: `Usage
./eth-statediff-service run --config={path to toml config file}`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *logrus.WithField("SubCommand", subCommand)
runRanges()
},
}
func init() {
rootCmd.AddCommand(runCmd)
}
func runRanges() {
service := createStateDiffService()
// start service and servers
var wg sync.WaitGroup
ranges := getConfiguredRanges()
service.Run(ranges)
// clean shutdown
shutdown := make(chan os.Signal)
signal.Notify(shutdown, os.Interrupt)
<-shutdown
logWithCommand.Info("Received interrupt signal, shutting down")
service.Stop()
wg.Wait()
}
func getConfiguredRanges() []pkg.RangeRequest {
params := statediff.Params{
IncludeBlock: viper.GetBool("run.params.includeBlock"),
IncludeReceipts: viper.GetBool("run.params.includeReceipts"),
IncludeTD: viper.GetBool("run.params.includeTD"),
IncludeCode: viper.GetBool("run.params.includeCode"),
}
var addrStrs []string
viper.UnmarshalKey("run.params.watchedAddresses", &addrStrs)
addrs := make([]common.Address, len(addrStrs))
for i, addrStr := range addrStrs {
addrs[i] = common.HexToAddress(addrStr)
}
params.WatchedAddresses = addrs
var rawRanges []blockRange
viper.UnmarshalKey("run.ranges", &rawRanges)
blockRanges := make([]pkg.RangeRequest, len(rawRanges))
for i, rawRange := range rawRanges {
blockRanges[i] = pkg.RangeRequest{
Start: rawRange[0],
Stop: rawRange[1],
Params: params,
}
}
return blockRanges
}

View File

@ -52,14 +52,6 @@ func serve() {
service := createStateDiffService() service := createStateDiffService()
// short circuit if we only want to perform prerun
if viper.GetBool("prerun.only") {
if err := service.Run(nil); err != nil {
logWithCommand.Fatal("unable to perform prerun: %v", err)
}
return
}
// start service and servers // start service and servers
logWithCommand.Info("Starting statediff service") logWithCommand.Info("Starting statediff service")
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -83,7 +83,6 @@ func createStateDiffService() *sd.Service {
ServiceWorkers: viper.GetUint("statediff.serviceWorkers"), ServiceWorkers: viper.GetUint("statediff.serviceWorkers"),
TrieWorkers: viper.GetUint("statediff.trieWorkers"), TrieWorkers: viper.GetUint("statediff.trieWorkers"),
WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"), WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"),
PreRuns: setupPreRunRanges(),
} }
return sd.NewStateDiffService(lvlDBReader, indexer, sdConf) return sd.NewStateDiffService(lvlDBReader, indexer, sdConf)
} }

View File

@ -5,5 +5,4 @@ type ServiceConfig struct {
ServiceWorkers uint ServiceWorkers uint
TrieWorkers uint TrieWorkers uint
WorkerQueueSize uint WorkerQueueSize uint
PreRuns []RangeRequest
} }

View File

@ -52,8 +52,6 @@ type Service struct {
queue chan RangeRequest queue chan RangeRequest
// number of ranges we can work over concurrently // number of ranges we can work over concurrently
workers uint workers uint
// ranges configured locally
preruns []RangeRequest
} }
// NewStateDiffService creates a new Service // NewStateDiffService creates a new Service
@ -71,7 +69,6 @@ func NewStateDiffService(lvlDBReader Reader, indexer interfaces.StateDiffIndexer
indexer: indexer, indexer: indexer,
workers: conf.ServiceWorkers, workers: conf.ServiceWorkers,
queue: make(chan RangeRequest, conf.WorkerQueueSize), queue: make(chan RangeRequest, conf.WorkerQueueSize),
preruns: conf.PreRuns,
} }
} }
@ -94,17 +91,7 @@ func (sds *Service) APIs() []rpc.API {
// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards // Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
func (sds *Service) Run(rngs []RangeRequest) error { func (sds *Service) Run(rngs []RangeRequest) error {
for _, preRun := range sds.preruns {
logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
for i := preRun.Start; i <= preRun.Stop; i++ {
if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
}
}
}
sds.preruns = nil
for _, rng := range rngs { for _, rng := range rngs {
logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop)
for i := rng.Start; i <= rng.Stop; i++ { for i := rng.Start; i <= rng.Stop; i++ {
if err := sds.WriteStateDiffAt(i, rng.Params); err != nil { if err := sds.WriteStateDiffAt(i, rng.Params); err != nil {
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err) return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err)
@ -152,12 +139,6 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error {
} }
}(i) }(i)
} }
for _, preRun := range sds.preruns {
if err := sds.WriteStateDiffsInRange(preRun.Start, preRun.Stop, preRun.Params); err != nil {
close(sds.quitChan)
return err
}
}
return nil return nil
} }