Merge pull request #46 from vulcanize/prerun
enable configuration of a prerun range by env variable; prerun only mode
This commit is contained in:
commit
a3857bffe4
72
cmd/client.go
Normal file
72
cmd/client.go
Normal file
@ -0,0 +1,72 @@
|
||||
// Copyright © 2019 Vulcanize, Inc
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// clientCmd represents the serve command
|
||||
var clientCmd = &cobra.Command{
|
||||
Use: "client",
|
||||
Short: "Client for queuing range requests",
|
||||
Long: `Usage
|
||||
|
||||
./eth-statediff-service client --config={path to toml config file}`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
subCommand = cmd.CalledAs()
|
||||
logWithCommand = *logrus.WithField("SubCommand", subCommand)
|
||||
client()
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(clientCmd)
|
||||
}
|
||||
|
||||
func client() {
|
||||
logWithCommand.Info("Running eth-statediff-service client command")
|
||||
|
||||
statediffService, err := createStateDiffService()
|
||||
if err != nil {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
|
||||
// start service and clientrs
|
||||
logWithCommand.Info("Starting statediff service")
|
||||
wg := new(sync.WaitGroup)
|
||||
if err := statediffService.Loop(wg); err != nil {
|
||||
logWithCommand.Fatalf("unable to start statediff service: %v", err)
|
||||
}
|
||||
logWithCommand.Info("Starting RPC clientrs")
|
||||
if err := startServers(statediffService); err != nil {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
logWithCommand.Info("RPC clientrs successfully spun up; awaiting requests")
|
||||
|
||||
// clean shutdown
|
||||
shutdown := make(chan os.Signal)
|
||||
signal.Notify(shutdown, os.Interrupt)
|
||||
<-shutdown
|
||||
logWithCommand.Info("Received interrupt signal, shutting down")
|
||||
statediffService.Stop()
|
||||
wg.Wait()
|
||||
}
|
17
cmd/env.go
17
cmd/env.go
@ -33,6 +33,8 @@ const (
|
||||
TRIE_CACHE_SIZE_MB = "TRIE_CACHE_SIZE_MB"
|
||||
LVLDB_PATH = "LVLDB_PATH"
|
||||
LVLDB_ANCIENT = "LVLDB_ANCIENT"
|
||||
|
||||
STATEDIFF_PRERUN = "STATEDIFF_PRERUN"
|
||||
STATEDIFF_TRIE_WORKERS = "STATEDIFF_TRIE_WORKERS"
|
||||
STATEDIFF_SERVICE_WORKERS = "STATEDIFF_SERVICE_WORKERS"
|
||||
STATEDIFF_WORKER_QUEUE_SIZE = "STATEDIFF_WORKER_QUEUE_SIZE"
|
||||
@ -45,6 +47,13 @@ const (
|
||||
PROM_HTTP_ADDR = "PROM_HTTP_ADDR"
|
||||
PROM_HTTP_PORT = "PROM_HTTP_PORT"
|
||||
PROM_DB_STATS = "PROM_DB_STATS"
|
||||
|
||||
PRERUN_ONLY = "PRERUN_ONLY"
|
||||
PRERUN_RANGE_START = "PRERUN_RANGE_START"
|
||||
PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP"
|
||||
|
||||
LOG_LEVEL = "LOG_LEVEL"
|
||||
LOG_FILE_PATH = "LOG_FILE_PATH"
|
||||
)
|
||||
|
||||
// Bind env vars for eth node and DB configuration
|
||||
@ -82,4 +91,12 @@ func init() {
|
||||
viper.BindEnv("statediff.serviceWorkers", STATEDIFF_SERVICE_WORKERS)
|
||||
viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS)
|
||||
viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE)
|
||||
|
||||
viper.BindEnv("statediff.prerun", STATEDIFF_PRERUN)
|
||||
viper.BindEnv("prerun.only", PRERUN_ONLY)
|
||||
viper.BindEnv("prerun.start", PRERUN_RANGE_START)
|
||||
viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP)
|
||||
|
||||
viper.BindEnv("log.level", LOG_LEVEL)
|
||||
viper.BindEnv("log.file", LOG_FILE_PATH)
|
||||
}
|
||||
|
@ -84,7 +84,6 @@ func initFuncs(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
|
||||
func logLevel() error {
|
||||
viper.BindEnv("log.level", "LOGRUS_LEVEL")
|
||||
lvl, err := log.ParseLevel(viper.GetString("log.level"))
|
||||
if err != nil {
|
||||
return err
|
||||
@ -139,6 +138,10 @@ func init() {
|
||||
rootCmd.PersistentFlags().Bool("prom-db-stats", false, "enables prometheus db stats")
|
||||
rootCmd.PersistentFlags().Bool("prom-metrics", false, "enable prometheus metrics")
|
||||
|
||||
rootCmd.PersistentFlags().Bool("prerun-only", false, "only process pre-configured ranges; exit afterwards")
|
||||
rootCmd.PersistentFlags().Int("prerun-start", 0, "start height for a prerun range")
|
||||
rootCmd.PersistentFlags().Int("prerun-stop", 0, "stop height for a prerun range")
|
||||
|
||||
viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path"))
|
||||
viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path"))
|
||||
viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file"))
|
||||
@ -166,6 +169,9 @@ func init() {
|
||||
viper.BindPFlag("prom.httpPort", rootCmd.PersistentFlags().Lookup("prom-http-port"))
|
||||
viper.BindPFlag("prom.dbStats", rootCmd.PersistentFlags().Lookup("prom-db-stats"))
|
||||
viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("prom-metrics"))
|
||||
viper.BindPFlag("prerun.only", rootCmd.PersistentFlags().Lookup("prerun-only"))
|
||||
viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start"))
|
||||
viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop"))
|
||||
}
|
||||
|
||||
func initConfig() {
|
||||
|
@ -55,6 +55,14 @@ func serve() {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
|
||||
// short circuit if we only want to perform prerun
|
||||
if viper.GetBool("prerun.only") {
|
||||
if err := statediffService.Run(nil); err != nil {
|
||||
logWithCommand.Fatal("unable to perform prerun: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// start service and servers
|
||||
logWithCommand.Info("Starting statediff service")
|
||||
wg := new(sync.WaitGroup)
|
||||
|
10
cmd/util.go
10
cmd/util.go
@ -122,6 +122,16 @@ func setupPreRunRanges() []sd.RangeRequest {
|
||||
Params: preRunParams,
|
||||
}
|
||||
}
|
||||
if viper.IsSet("prerun.start") && viper.IsSet("prerun.stop") {
|
||||
hardStart := viper.GetInt("prerun.start")
|
||||
hardStop := viper.GetInt("prerun.stop")
|
||||
blockRanges = append(blockRanges, sd.RangeRequest{
|
||||
Start: uint64(hardStart),
|
||||
Stop: uint64(hardStop),
|
||||
Params: preRunParams,
|
||||
})
|
||||
}
|
||||
|
||||
return blockRanges
|
||||
}
|
||||
|
||||
|
51
environments/config.toml
Normal file
51
environments/config.toml
Normal file
@ -0,0 +1,51 @@
|
||||
[leveldb]
|
||||
path = "/app/geth-rw/chaindata"
|
||||
ancient = "/app/geth-rw/chaindata/ancient"
|
||||
|
||||
[server]
|
||||
ipcPath = ""
|
||||
httpPath = "0.0.0.0:8545"
|
||||
|
||||
[statediff]
|
||||
prerun = true
|
||||
serviceWorkers = 1
|
||||
workerQueueSize = 1024
|
||||
trieWorkers = 16
|
||||
|
||||
[prerun]
|
||||
only = true
|
||||
ranges = []
|
||||
[prerun.params]
|
||||
intermediateStateNodes = true
|
||||
intermediateStorageNodes = true
|
||||
includeBlock = true
|
||||
includeReceipts = true
|
||||
includeTD = true
|
||||
includeCode = true
|
||||
watchedAddresses = []
|
||||
watchedStorageKeys = []
|
||||
|
||||
[log]
|
||||
file = ""
|
||||
level = "info"
|
||||
|
||||
[eth]
|
||||
chainID = 1
|
||||
|
||||
[database]
|
||||
name = ""
|
||||
hostname = ""
|
||||
port = 5432
|
||||
user = ""
|
||||
password = ""
|
||||
|
||||
[cache]
|
||||
database = 1024
|
||||
trie = 4096
|
||||
|
||||
[prom]
|
||||
dbStats = false
|
||||
metrics = true
|
||||
http = true
|
||||
httpAddr = "0.0.0.0"
|
||||
httpPort = 9100
|
@ -13,6 +13,7 @@
|
||||
trieWorkers = 4
|
||||
|
||||
[prerun]
|
||||
only = false
|
||||
ranges = [
|
||||
[0, 1000]
|
||||
]
|
||||
|
@ -47,6 +47,8 @@ type StateDiffService interface {
|
||||
Protocols() []p2p.Protocol
|
||||
// Loop is the main event loop for processing state diffs
|
||||
Loop(wg *sync.WaitGroup) error
|
||||
// Run is a one-off command to run on a predefined set of ranges
|
||||
Run(ranges []RangeRequest) error
|
||||
// StateDiffAt method to get state diff object at specific block
|
||||
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
|
||||
// StateDiffFor method to get state diff object at specific block
|
||||
@ -115,11 +117,34 @@ func (sds *Service) APIs() []rpc.API {
|
||||
}
|
||||
}
|
||||
|
||||
// Run does a one-off processing run on the provided RangeRequests + any pre-runs, exiting afterwards
|
||||
func (sds *Service) Run(rngs []RangeRequest) error {
|
||||
for _, preRun := range sds.preruns {
|
||||
logrus.Infof("processing prerun range (%d, %d)", preRun.Start, preRun.Stop)
|
||||
for i := preRun.Start; i <= preRun.Stop; i++ {
|
||||
if err := sds.WriteStateDiffAt(i, preRun.Params); err != nil {
|
||||
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, preRun.Start, preRun.Stop, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
sds.preruns = nil
|
||||
for _, rng := range rngs {
|
||||
logrus.Infof("processing prerun range (%d, %d)", rng.Start, rng.Stop)
|
||||
for i := rng.Start; i <= rng.Stop; i++ {
|
||||
if err := sds.WriteStateDiffAt(i, rng.Params); err != nil {
|
||||
return fmt.Errorf("error writing statediff at height %d in range (%d, %d) : %v", i, rng.Start, rng.Stop, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Loop is an empty service loop for awaiting rpc requests
|
||||
func (sds *Service) Loop(wg *sync.WaitGroup) error {
|
||||
if sds.quitChan != nil {
|
||||
return fmt.Errorf("service loop is already running")
|
||||
}
|
||||
|
||||
sds.quitChan = make(chan struct{})
|
||||
for i := 0; i < int(sds.workers); i++ {
|
||||
wg.Add(1)
|
||||
@ -128,6 +153,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error {
|
||||
for {
|
||||
select {
|
||||
case blockRange := <-sds.queue:
|
||||
logrus.Infof("service worker %d received range (%d, %d) off of work queue, beginning processing", id, blockRange.Start, blockRange.Stop)
|
||||
prom.DecQueuedRanges()
|
||||
for j := blockRange.Start; j <= blockRange.Stop; j++ {
|
||||
if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user