replace prerun options with run command
This commit is contained in:
parent
472b2a7f61
commit
73292c4046
28
cmd/env.go
28
cmd/env.go
@ -35,7 +35,6 @@ const (
|
||||
LVLDB_ANCIENT = "LVLDB_ANCIENT"
|
||||
LVLDB_URL = "LVLDB_URL"
|
||||
|
||||
STATEDIFF_PRERUN = "STATEDIFF_PRERUN"
|
||||
STATEDIFF_TRIE_WORKERS = "STATEDIFF_TRIE_WORKERS"
|
||||
STATEDIFF_SERVICE_WORKERS = "STATEDIFF_SERVICE_WORKERS"
|
||||
STATEDIFF_WORKER_QUEUE_SIZE = "STATEDIFF_WORKER_QUEUE_SIZE"
|
||||
@ -49,15 +48,10 @@ const (
|
||||
PROM_HTTP_PORT = "PROM_HTTP_PORT"
|
||||
PROM_DB_STATS = "PROM_DB_STATS"
|
||||
|
||||
PRERUN_ONLY = "PRERUN_ONLY"
|
||||
PRERUN_RANGE_START = "PRERUN_RANGE_START"
|
||||
PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP"
|
||||
PRERUN_INTERMEDIATE_STATE_NODES = "PRERUN_INTERMEDIATE_STATE_NODES"
|
||||
PRERUN_INTERMEDIATE_STORAGE_NODES = "PRERUN_INTERMEDIATE_STORAGE_NODES"
|
||||
PRERUN_INCLUDE_BLOCK = "PRERUN_INCLUDE_BLOCK"
|
||||
PRERUN_INCLUDE_RECEIPTS = "PRERUN_INCLUDE_RECEIPTS"
|
||||
PRERUN_INCLUDE_TD = "PRERUN_INCLUDE_TD"
|
||||
PRERUN_INCLUDE_CODE = "PRERUN_INCLUDE_CODE"
|
||||
RUN_INCLUDE_BLOCK = "RUN_INCLUDE_BLOCK"
|
||||
RUN_INCLUDE_RECEIPTS = "RUN_INCLUDE_RECEIPTS"
|
||||
RUN_INCLUDE_TD = "RUN_INCLUDE_TD"
|
||||
RUN_INCLUDE_CODE = "RUN_INCLUDE_CODE"
|
||||
|
||||
LOG_LEVEL = "LOG_LEVEL"
|
||||
LOG_FILE_PATH = "LOG_FILE_PATH"
|
||||
@ -129,16 +123,10 @@ func init() {
|
||||
viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS)
|
||||
viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE)
|
||||
|
||||
viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN)
|
||||
viper.BindEnv("prerun.only", PRERUN_ONLY)
|
||||
viper.BindEnv("prerun.start", PRERUN_RANGE_START)
|
||||
viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP)
|
||||
viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES)
|
||||
viper.BindEnv("prerun.params.intermediateStorageNodes", PRERUN_INTERMEDIATE_STORAGE_NODES)
|
||||
viper.BindEnv("prerun.params.includeBlock", PRERUN_INCLUDE_BLOCK)
|
||||
viper.BindEnv("prerun.params.includeReceipts", PRERUN_INCLUDE_RECEIPTS)
|
||||
viper.BindEnv("prerun.params.includeTD", PRERUN_INCLUDE_TD)
|
||||
viper.BindEnv("prerun.params.includeCode", PRERUN_INCLUDE_CODE)
|
||||
viper.BindEnv("run.params.includeBlock", RUN_INCLUDE_BLOCK)
|
||||
viper.BindEnv("run.params.includeReceipts", RUN_INCLUDE_RECEIPTS)
|
||||
viper.BindEnv("run.params.includeTD", RUN_INCLUDE_TD)
|
||||
viper.BindEnv("run.params.includeCode", RUN_INCLUDE_CODE)
|
||||
|
||||
viper.BindEnv("log.level", LOG_LEVEL)
|
||||
viper.BindEnv("log.file", LOG_FILE_PATH)
|
||||
|
17
cmd/root.go
17
cmd/root.go
@ -45,17 +45,16 @@ var (
|
||||
|
||||
var rootCmd = &cobra.Command{
|
||||
Use: "eth-statediff-service",
|
||||
PersistentPreRun: initFuncs,
|
||||
PersistentPreRun: setupLoggingAndMetrics,
|
||||
}
|
||||
|
||||
func Execute() {
|
||||
log.Info("----- Starting vDB -----")
|
||||
if err := rootCmd.Execute(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func initFuncs(cmd *cobra.Command, args []string) {
|
||||
func setupLoggingAndMetrics(cmd *cobra.Command, args []string) {
|
||||
logfile := viper.GetString("log.file")
|
||||
if logfile != "" {
|
||||
file, err := os.OpenFile(logfile,
|
||||
@ -121,7 +120,6 @@ func init() {
|
||||
rootCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
|
||||
rootCmd.PersistentFlags().String("leveldb-url", "", "url to primary leveldb-ethdb-rpc server")
|
||||
|
||||
rootCmd.PersistentFlags().Bool("prerun", false, "turn on prerun of toml configured ranges")
|
||||
rootCmd.PersistentFlags().Int("service-workers", 0, "number of range requests to process concurrently")
|
||||
rootCmd.PersistentFlags().Int("trie-workers", 0, "number of workers to use for trie traversal and processing")
|
||||
rootCmd.PersistentFlags().Int("worker-queue-size", 0, "size of the range request queue for service workers")
|
||||
@ -158,11 +156,6 @@ func init() {
|
||||
rootCmd.PersistentFlags().Bool("prom-db-stats", false, "enables prometheus db stats")
|
||||
rootCmd.PersistentFlags().Bool("prom-metrics", false, "enable prometheus metrics")
|
||||
|
||||
rootCmd.PersistentFlags().Bool("prerun-only", false, "only process pre-configured ranges; exit afterwards")
|
||||
rootCmd.PersistentFlags().Int("prerun-start", 0, "start height for a prerun range")
|
||||
rootCmd.PersistentFlags().Int("prerun-stop", 0, "stop height for a prerun range")
|
||||
rootCmd.PersistentFlags().Bool("prerun-intermediate-state-nodes", true, "include intermediate state nodes in state diff")
|
||||
rootCmd.PersistentFlags().Bool("prerun-intermediate-storage-nodes", true, "include intermediate storage nodes in state diff")
|
||||
rootCmd.PersistentFlags().Bool("prerun-include-block", true, "include block data in the statediff payload")
|
||||
rootCmd.PersistentFlags().Bool("prerun-include-receipts", true, "include receipts in the statediff payload")
|
||||
rootCmd.PersistentFlags().Bool("prerun-include-td", true, "include td in the statediff payload")
|
||||
@ -174,7 +167,6 @@ func init() {
|
||||
viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file"))
|
||||
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
|
||||
|
||||
viper.BindPFlag("statediff.prerun", rootCmd.PersistentFlags().Lookup("prerun"))
|
||||
viper.BindPFlag("statediff.serviceWorkers", rootCmd.PersistentFlags().Lookup("service-workers"))
|
||||
viper.BindPFlag("statediff.trieWorkers", rootCmd.PersistentFlags().Lookup("trie-workers"))
|
||||
viper.BindPFlag("statediff.workerQueueSize", rootCmd.PersistentFlags().Lookup("worker-queue-size"))
|
||||
@ -216,11 +208,6 @@ func init() {
|
||||
viper.BindPFlag("prom.dbStats", rootCmd.PersistentFlags().Lookup("prom-db-stats"))
|
||||
viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics"))
|
||||
|
||||
viper.BindPFlag("prerun.only", rootCmd.PersistentFlags().Lookup("prerun-only"))
|
||||
viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start"))
|
||||
viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop"))
|
||||
viper.BindPFlag("prerun.params.intermediateStateNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-state-nodes"))
|
||||
viper.BindPFlag("prerun.params.intermediateStorageNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-storage-nodes"))
|
||||
viper.BindPFlag("prerun.params.includeBlock", rootCmd.PersistentFlags().Lookup("prerun-include-block"))
|
||||
viper.BindPFlag("prerun.params.includeReceipts", rootCmd.PersistentFlags().Lookup("prerun-include-receipts"))
|
||||
viper.BindPFlag("prerun.params.includeTD", rootCmd.PersistentFlags().Lookup("prerun-include-td"))
|
||||
|
91
cmd/run.go
Normal file
91
cmd/run.go
Normal file
@ -0,0 +1,91 @@
|
||||
// Copyright © 2023 Vulcanize, Inc
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
|
||||
statediff "github.com/cerc-io/plugeth-statediff"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
pkg "github.com/vulcanize/eth-statediff-service/pkg"
|
||||
)
|
||||
|
||||
// serveCmd represents the serve command
|
||||
var runCmd = &cobra.Command{
|
||||
Use: "run",
|
||||
Short: "Produce diffs for a specific block range",
|
||||
Long: `Usage
|
||||
|
||||
./eth-statediff-service run --config={path to toml config file}`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
subCommand = cmd.CalledAs()
|
||||
logWithCommand = *logrus.WithField("SubCommand", subCommand)
|
||||
runRanges()
|
||||
},
|
||||
}
|
||||
|
||||
func runRanges() {
|
||||
service := createStateDiffService()
|
||||
|
||||
// start service and servers
|
||||
var wg sync.WaitGroup
|
||||
if err := service.Loop(&wg); err != nil {
|
||||
logWithCommand.Fatalf("unable to start statediff service: %v", err)
|
||||
}
|
||||
ranges := getConfiguredRanges()
|
||||
service.Run(ranges)
|
||||
|
||||
// clean shutdown
|
||||
shutdown := make(chan os.Signal)
|
||||
signal.Notify(shutdown, os.Interrupt)
|
||||
<-shutdown
|
||||
logWithCommand.Info("Received interrupt signal, shutting down")
|
||||
service.Stop()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func getConfiguredRanges() []pkg.RangeRequest {
|
||||
params := statediff.Params{
|
||||
IncludeBlock: viper.GetBool("run.params.includeBlock"),
|
||||
IncludeReceipts: viper.GetBool("run.params.includeReceipts"),
|
||||
IncludeTD: viper.GetBool("run.params.includeTD"),
|
||||
IncludeCode: viper.GetBool("run.params.includeCode"),
|
||||
}
|
||||
var addrStrs []string
|
||||
viper.UnmarshalKey("run.params.watchedAddresses", &addrStrs)
|
||||
addrs := make([]common.Address, len(addrStrs))
|
||||
for i, addrStr := range addrStrs {
|
||||
addrs[i] = common.HexToAddress(addrStr)
|
||||
}
|
||||
params.WatchedAddresses = addrs
|
||||
var rawRanges []blockRange
|
||||
viper.UnmarshalKey("run.ranges", &rawRanges)
|
||||
blockRanges := make([]pkg.RangeRequest, len(rawRanges))
|
||||
for i, rawRange := range rawRanges {
|
||||
blockRanges[i] = pkg.RangeRequest{
|
||||
Start: rawRange[0],
|
||||
Stop: rawRange[1],
|
||||
Params: params,
|
||||
}
|
||||
}
|
||||
return blockRanges
|
||||
}
|
@ -52,14 +52,6 @@ func serve() {
|
||||
|
||||
service := createStateDiffService()
|
||||
|
||||
// short circuit if we only want to perform prerun
|
||||
if viper.GetBool("prerun.only") {
|
||||
if err := service.Run(nil); err != nil {
|
||||
logWithCommand.Fatal("unable to perform prerun: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// start service and servers
|
||||
logWithCommand.Info("Starting statediff service")
|
||||
var wg sync.WaitGroup
|
||||
|
63
cmd/util.go
63
cmd/util.go
@ -85,69 +85,6 @@ func createStateDiffService() *sd.Service {
|
||||
ServiceWorkers: viper.GetUint("statediff.serviceWorkers"),
|
||||
TrieWorkers: viper.GetUint("statediff.trieWorkers"),
|
||||
WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"),
|
||||
PreRuns: setupPreRunRanges(),
|
||||
}
|
||||
return sd.NewStateDiffService(lvlDBReader, indexer, sdConf)
|
||||
}
|
||||
|
||||
func setupPreRunRanges() []sd.RangeRequest {
|
||||
if !viper.GetBool("statediff.prerun") {
|
||||
return nil
|
||||
}
|
||||
preRunParams := gethsd.Params{
|
||||
IncludeBlock: viper.GetBool("prerun.params.includeBlock"),
|
||||
IncludeReceipts: viper.GetBool("prerun.params.includeReceipts"),
|
||||
IncludeTD: viper.GetBool("prerun.params.includeTD"),
|
||||
IncludeCode: viper.GetBool("prerun.params.includeCode"),
|
||||
}
|
||||
var addrStrs []string
|
||||
viper.UnmarshalKey("prerun.params.watchedAddresses", &addrStrs)
|
||||
addrs := make([]common.Address, len(addrStrs))
|
||||
for i, addrStr := range addrStrs {
|
||||
addrs[i] = common.HexToAddress(addrStr)
|
||||
}
|
||||
preRunParams.WatchedAddresses = addrs
|
||||
var rawRanges []blockRange
|
||||
viper.UnmarshalKey("prerun.ranges", &rawRanges)
|
||||
blockRanges := make([]sd.RangeRequest, len(rawRanges))
|
||||
for i, rawRange := range rawRanges {
|
||||
blockRanges[i] = sd.RangeRequest{
|
||||
Start: rawRange[0],
|
||||
Stop: rawRange[1],
|
||||
Params: preRunParams,
|
||||
}
|
||||
}
|
||||
if viper.IsSet("prerun.start") && viper.IsSet("prerun.stop") {
|
||||
hardStart := viper.GetInt("prerun.start")
|
||||
hardStop := viper.GetInt("prerun.stop")
|
||||
blockRanges = append(blockRanges, sd.RangeRequest{
|
||||
Start: uint64(hardStart),
|
||||
Stop: uint64(hardStop),
|
||||
Params: preRunParams,
|
||||
})
|
||||
}
|
||||
|
||||
return blockRanges
|
||||
}
|
||||
|
||||
// LoadConfig loads chain config from json file
|
||||
func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) {
|
||||
file, err := os.Open(chainConfigPath)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("Failed to read chain config file: %v", err))
|
||||
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
chainConfig := new(params.ChainConfig)
|
||||
if err := json.NewDecoder(file).Decode(chainConfig); err != nil {
|
||||
log.Error(fmt.Sprintf("invalid chain config file: %v", err))
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Info(fmt.Sprintf("Using chain config from %s file. Content %+v", chainConfigPath, chainConfig))
|
||||
|
||||
return chainConfig, nil
|
||||
}
|
||||
|
@ -5,5 +5,4 @@ type ServiceConfig struct {
|
||||
ServiceWorkers uint
|
||||
TrieWorkers uint
|
||||
WorkerQueueSize uint
|
||||
PreRuns []RangeRequest
|
||||
}
|
||||
|
@ -52,8 +52,6 @@ type Service struct {
|
||||
queue chan RangeRequest
|
||||
// number of ranges we can work over concurrently
|
||||
workers uint
|
||||
// ranges configured locally
|
||||
preruns []RangeRequest
|
||||
}
|
||||
|
||||
// NewStateDiffService creates a new Service
|
||||
@ -71,7 +69,6 @@ func NewStateDiffService(lvlDBReader Reader, indexer interfaces.StateDiffIndexer
|
||||
indexer: indexer,
|
||||
workers: conf.ServiceWorkers,
|
||||
queue: make(chan RangeRequest, conf.WorkerQueueSize),
|
||||
preruns: conf.PreRuns,
|
||||
}
|
||||
}
|
||||
|
||||
@ -94,17 +91,7 @@ func (sds *Service) APIs() []rpc.API {
|
||||
|
||||
// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
|
||||
func (sds *Service) Run(rngs []RangeRequest) error {
|
||||
for _, preRun := range sds.preruns {
|
||||
logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
|
||||
for i := preRun.Start; i <= preRun.Stop; i++ {
|
||||
if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
|
||||
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
sds.preruns = nil
|
||||
for _, rng := range rngs {
|
||||
logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop)
|
||||
for i := rng.Start; i <= rng.Stop; i++ {
|
||||
if err := sds.WriteStateDiffAt(i, rng.Params); err != nil {
|
||||
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err)
|
||||
@ -152,12 +139,6 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error {
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
for _, preRun := range sds.preruns {
|
||||
if err := sds.WriteStateDiffsInRange(preRun.Start, preRun.Stop, preRun.Params); err != nil {
|
||||
close(sds.quitChan)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user