diff --git a/cmd/env.go b/cmd/env.go index 934d318..1525602 100644 --- a/cmd/env.go +++ b/cmd/env.go @@ -29,12 +29,16 @@ const ( ETH_NETWORK_ID = "ETH_NETWORK_ID" ETH_CHAIN_ID = "ETH_CHAIN_ID" - DB_CACHE_SIZE_MB = "DB_CACHE_SIZE_MB" - TRIE_CACHE_SIZE_MB = "TRIE_CACHE_SIZE_MB" - LVLDB_PATH = "LVLDB_PATH" - LVLDB_ANCIENT = "LVLDB_ANCIENT" - STATEDIFF_WORKERS = "STATEDIFF_WORKERS" - WRITE_SERVER = "WRITE_SERVER" + DB_CACHE_SIZE_MB = "DB_CACHE_SIZE_MB" + TRIE_CACHE_SIZE_MB = "TRIE_CACHE_SIZE_MB" + LVLDB_PATH = "LVLDB_PATH" + LVLDB_ANCIENT = "LVLDB_ANCIENT" + STATEDIFF_TRIE_WORKERS = "STATEDIFF_TRIE_WORKERS" + STATEDIFF_SERVICE_WORKERS = "STATEDIFF_SERVICE_WORKERS" + STATEDIFF_WORKER_QUEUE_SIZE = "STATEDIFF_WORKER_QUEUE_SIZE" + + SERVICE_IPC_PATH = "SERVICE_IPC_PATH" + SERVICE_HTTP_PATH = "SERVICE_HTTP_PATH" PROM_METRICS = "PROM_METRICS" PROM_HTTP = "PROM_HTTP" @@ -44,6 +48,9 @@ const ( // Bind env vars for eth node and DB configuration func init() { + viper.BindEnv("server.ipcPath", SERVICE_IPC_PATH) + viper.BindEnv("server.httpPath", SERVICE_HTTP_PATH) + viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) @@ -70,5 +77,7 @@ func init() { viper.BindEnv("prom.httpAddr", PROM_HTTP_ADDR) viper.BindEnv("prom.httpPort", PROM_HTTP_PORT) - viper.BindEnv("statediff.workers", STATEDIFF_WORKERS) + viper.BindEnv("statediff.serviceWorkers", STATEDIFF_SERVICE_WORKERS) + viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS) + viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE) } diff --git a/cmd/root.go b/cmd/root.go index 34e322a..56739a8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -68,6 +68,7 @@ func initFuncs(cmd *cobra.Command, args []string) { } if viper.GetBool("prom.metrics") { + log.Info("initializing prometheus metrics") prom.Init() } @@ -77,6 +78,7 @@ func initFuncs(cmd *cobra.Command, args []string) { viper.GetString("prom.httpAddr"), 
viper.GetString("prom.httpPort"), ) + log.Info("starting prometheus server") prom.Listen(addr) } } @@ -100,6 +102,8 @@ func init() { viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) viper.AutomaticEnv() + rootCmd.PersistentFlags().String("http-path", "", "vdb server http path") + rootCmd.PersistentFlags().String("ipc-path", "", "vdb server ipc path") rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location") rootCmd.PersistentFlags().String("log-file", "", "file path for logging") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), @@ -107,7 +111,11 @@ func init() { rootCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore") rootCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore") - rootCmd.PersistentFlags().Int("workers", 0, "number of concurrent workers to use") + + rootCmd.PersistentFlags().Bool("prerun", false, "turn on prerun of toml configured ranges") + rootCmd.PersistentFlags().Int("service-workers", 0, "number of range requests to process concurrently") + rootCmd.PersistentFlags().Int("trie-workers", 0, "number of workers to use for trie traversal and processing") + rootCmd.PersistentFlags().Int("worker-queue-size", 0, "size of the range request queue for service workers") rootCmd.PersistentFlags().String("database-name", "vulcanize_public", "database name") rootCmd.PersistentFlags().Int("database-port", 5432, "database port") @@ -131,9 +139,14 @@ func init() { rootCmd.PersistentFlags().Bool("metrics", false, "enable metrics") + viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path")) + viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path")) viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) - viper.BindPFlag("statediff.workers", rootCmd.PersistentFlags().Lookup("workers")) + viper.BindPFlag("statediff.prerun", 
rootCmd.PersistentFlags().Lookup("prerun")) + viper.BindPFlag("statediff.serviceWorkers", rootCmd.PersistentFlags().Lookup("service-workers")) + viper.BindPFlag("statediff.trieWorkers", rootCmd.PersistentFlags().Lookup("trie-workers")) + viper.BindPFlag("statediff.workerQueueSize", rootCmd.PersistentFlags().Lookup("worker-queue-size")) viper.BindPFlag("leveldb.path", rootCmd.PersistentFlags().Lookup("leveldb-path")) viper.BindPFlag("leveldb.ancient", rootCmd.PersistentFlags().Lookup("ancient-path")) viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name")) diff --git a/cmd/serve.go b/cmd/serve.go index c096f1e..7c69de5 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -16,13 +16,11 @@ package cmd import ( - "fmt" "os" "os/signal" "sync" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -34,7 +32,7 @@ import ( // serveCmd represents the serve command var serveCmd = &cobra.Command{ Use: "serve", - Short: "Standup a standalone statediffing RPC service on top of leveldb", + Short: "Stand up a standalone statediffing RPC service on top of leveldb", Long: `Usage ./eth-statediff-service serve --config={path to toml config file}`, @@ -45,6 +43,10 @@ var serveCmd = &cobra.Command{ }, } +func init() { + rootCmd.AddCommand(serveCmd) +} + func serve() { logWithCommand.Info("Running eth-statediff-service serve command") @@ -56,7 +58,9 @@ func serve() { // start service and servers logWithCommand.Info("Starting statediff service") wg := new(sync.WaitGroup) - go statediffService.Loop(wg) + if err := statediffService.Loop(wg); err != nil { + logWithCommand.Fatalf("unable to start statediff service: %v", err) + } logWithCommand.Info("Starting RPC servers") if err := startServers(statediffService); err != nil { logWithCommand.Fatal(err) @@ -72,19 +76,7 @@ func serve() { wg.Wait() } -func init() { - rootCmd.AddCommand(serveCmd) - - 
serveCmd.PersistentFlags().String("http-path", "", "vdb server http path") - serveCmd.PersistentFlags().String("ipc-path", "", "vdb server ipc path") - - viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("http-path")) - viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("ipc-path")) -} - -func startServers(serv sd.IService) error { - viper.BindEnv("server.ipcPath", "SERVER_IPC_PATH") - viper.BindEnv("server.httpPath", "SERVER_HTTP_PATH") +func startServers(serv sd.StateDiffService) error { ipcPath := viper.GetString("server.ipcPath") httpPath := viper.GetString("server.httpPath") if ipcPath == "" && httpPath == "" { @@ -106,18 +98,3 @@ func startServers(serv sd.IService) error { } return nil } - -func chainConfig(chainID uint64) (*params.ChainConfig, error) { - switch chainID { - case 1: - return params.MainnetChainConfig, nil - case 3: - return params.RopstenChainConfig, nil // Ropsten - case 4: - return params.RinkebyChainConfig, nil - case 5: - return params.GoerliChainConfig, nil - default: - return nil, fmt.Errorf("chain config for chainid %d not available", chainID) - } -} diff --git a/cmd/util.go b/cmd/util.go index c3e9d4d..b55e5cd 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -1,6 +1,11 @@ package cmd import ( + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/params" + gethsd "github.com/ethereum/go-ethereum/statediff" ind "github.com/ethereum/go-ethereum/statediff/indexer" "github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" @@ -11,7 +16,10 @@ import ( "github.com/vulcanize/eth-statediff-service/pkg/prom" ) -func createStateDiffService() (sd.IService, error) { +type blockRange [2]uint64 + +func createStateDiffService() (sd.StateDiffService, error) { + // load some necessary params logWithCommand.Info("Loading statediff service parameters") path := viper.GetString("leveldb.path") ancientPath := 
viper.GetString("leveldb.ancient") @@ -20,25 +28,25 @@ func createStateDiffService() (sd.IService, error) { } nodeInfo := GetEthNodeInfo() - config, err := chainConfig(nodeInfo.ChainID) + chainConf, err := chainConfig(nodeInfo.ChainID) if err != nil { logWithCommand.Fatal(err) } // create leveldb reader logWithCommand.Info("Creating leveldb reader") - conf := sd.ReaderConfig{ + readerConf := sd.LvLDBReaderConfig{ TrieConfig: &trie.Config{ Cache: viper.GetInt("cache.trie"), Journal: "", Preimages: false, }, - ChainConfig: config, + ChainConfig: chainConf, Path: path, AncientPath: ancientPath, DBCacheSize: viper.GetInt("cache.database"), } - lvlDBReader, err := sd.NewLvlDBReader(conf) + lvlDBReader, err := sd.NewLvlDBReader(readerConf) if err != nil { logWithCommand.Fatal(err) } @@ -49,11 +57,17 @@ func createStateDiffService() (sd.IService, error) { if err != nil { logWithCommand.Fatal(err) } - indexer, err := ind.NewStateDiffIndexer(config, db) + indexer, err := ind.NewStateDiffIndexer(chainConf, db) if err != nil { logWithCommand.Fatal(err) } - return sd.NewStateDiffService(lvlDBReader, indexer, viper.GetUint("statediff.workers")) + sdConf := sd.Config{ + ServiceWorkers: viper.GetUint("statediff.serviceWorkers"), + TrieWorkers: viper.GetUint("statediff.trieWorkers"), + WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"), + PreRuns: setupPreRunRanges(), + } + return sd.NewStateDiffService(lvlDBReader, indexer, sdConf) } func setupPostgres(nodeInfo node.Info) (*postgres.DB, error) { @@ -65,3 +79,57 @@ func setupPostgres(nodeInfo node.Info) (*postgres.DB, error) { prom.RegisterDBCollector(params.Name, db.DB) return db, nil } + +func setupPreRunRanges() []sd.RangeRequest { + if !viper.GetBool("statediff.prerun") { + return nil + } + preRunParams := gethsd.Params{ + IntermediateStateNodes: viper.GetBool("prerun.params.intermediateStateNodes"), + IntermediateStorageNodes: viper.GetBool("prerun.params.intermediateStorageNodes"), + IncludeBlock: 
viper.GetBool("prerun.params.includeBlock"), + IncludeReceipts: viper.GetBool("prerun.params.includeReceipts"), + IncludeTD: viper.GetBool("prerun.params.includeTD"), + IncludeCode: viper.GetBool("prerun.params.includeCode"), + } + var addrStrs []string + viper.UnmarshalKey("prerun.params.watchedAddresses", &addrStrs) + addrs := make([]common.Address, len(addrStrs)) + for i, addrStr := range addrStrs { + addrs[i] = common.HexToAddress(addrStr) + } + preRunParams.WatchedAddresses = addrs + var storageKeyStrs []string + viper.UnmarshalKey("prerun.params.watchedStorageKeys", &storageKeyStrs) + keys := make([]common.Hash, len(storageKeyStrs)) + for i, keyStr := range storageKeyStrs { + keys[i] = common.HexToHash(keyStr) + } + preRunParams.WatchedStorageSlots = keys + var rawRanges []blockRange + viper.UnmarshalKey("prerun.ranges", &rawRanges) + blockRanges := make([]sd.RangeRequest, len(rawRanges)) + for i, rawRange := range rawRanges { + blockRanges[i] = sd.RangeRequest{ + Start: rawRange[0], + Stop: rawRange[1], + Params: preRunParams, + } + } + return blockRanges +} + +func chainConfig(chainID uint64) (*params.ChainConfig, error) { + switch chainID { + case 1: + return params.MainnetChainConfig, nil + case 3: + return params.RopstenChainConfig, nil // Ropsten + case 4: + return params.RinkebyChainConfig, nil + case 5: + return params.GoerliChainConfig, nil + default: + return nil, fmt.Errorf("chain config for chainid %d not available", chainID) + } +} diff --git a/cmd/write.go b/cmd/write.go deleted file mode 100644 index 47bee0c..0000000 --- a/cmd/write.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright © 2019 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. 
-// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "fmt" - "net/http" - "strconv" - "sync" - "time" - - gethsd "github.com/ethereum/go-ethereum/statediff" - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var writeCmd = &cobra.Command{ - Use: "write", - Short: "Write statediffs directly to DB for preconfigured block ranges", - Long: `Usage - -./eth-statediff-service write --config={path to toml config file}`, - Run: func(cmd *cobra.Command, args []string) { - subCommand = cmd.CalledAs() - logWithCommand = *logrus.WithField("SubCommand", subCommand) - write() - }, -} - -type blockRange [2]uint64 - -func init() { - rootCmd.AddCommand(writeCmd) - writeCmd.PersistentFlags().String("write-api", "", "starts a server which handles write request through endpoints") - viper.BindPFlag("write.serve", writeCmd.PersistentFlags().Lookup("write-api")) -} - -func write() { - logWithCommand.Info("Running eth-statediff-service write command") - viper.BindEnv("write.serve", WRITE_SERVER) - addr := viper.GetString("write.serve") - - statediffService, err := createStateDiffService() - if err != nil { - logWithCommand.Fatal(err) - } - - // Read all defined block ranges, write statediffs to database - var blockRanges []blockRange - diffParams := gethsd.Params{ // todo: configurable? 
- IntermediateStateNodes: true, - IntermediateStorageNodes: true, - IncludeBlock: true, - IncludeReceipts: true, - IncludeTD: true, - IncludeCode: true, - } - viper.UnmarshalKey("write.ranges", &blockRanges) - viper.UnmarshalKey("write.params", &diffParams) - - blockRangesCh := make(chan blockRange, 100) - go func() { - for _, r := range blockRanges { - blockRangesCh <- r - } - if addr == "" { - close(blockRangesCh) - return - } - startServer(addr, blockRangesCh) - }() - - processRanges(statediffService, diffParams, blockRangesCh) -} - -func startServer(addr string, blockRangesCh chan<- blockRange) { - handler := func(w http.ResponseWriter, req *http.Request) { - start, err := strconv.Atoi(req.URL.Query().Get("start")) - if err != nil { - http.Error(w, fmt.Sprintf("failed to parse start value: %v", err), http.StatusInternalServerError) - return - } - - end, err := strconv.Atoi(req.URL.Query().Get("end")) - if err != nil { - http.Error(w, fmt.Sprintf("failed to parse end value: %v", err), http.StatusInternalServerError) - return - } - - select { - case blockRangesCh <- blockRange{uint64(start), uint64(end)}: - case <-time.After(time.Millisecond * 200): - http.Error(w, "server is busy", http.StatusInternalServerError) - return - } - - fmt.Fprintf(w, "added block range to the queue\n") - } - - http.HandleFunc("/writeDiff", handler) - logrus.Fatal(http.ListenAndServe(addr, nil)) -} - -type diffService interface { - WriteStateDiffAt(blockNumber uint64, params gethsd.Params) error -} - -func processRanges(sds diffService, param gethsd.Params, blockRangesCh chan blockRange) { - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for rng := range blockRangesCh { - if rng[1] < rng[0] { - logWithCommand.Fatal("range ending block number needs to be greater than starting block number") - } - logrus.Infof("Writing statediffs from block %d to %d", rng[0], rng[1]) - for height := rng[0]; height <= rng[1]; height++ { - err := sds.WriteStateDiffAt(height, param) - 
if err != nil { - logrus.Errorf("failed to write state diff for range: %v %v", rng, err) - } - } - } - }() - - wg.Wait() -} diff --git a/cmd/write_test.go b/cmd/write_test.go deleted file mode 100644 index bcbfadc..0000000 --- a/cmd/write_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package cmd - -import ( - "fmt" - "net/http" - "testing" - "time" - - gethsd "github.com/ethereum/go-ethereum/statediff" - "github.com/stretchr/testify/require" -) - -type mockService struct { - reqCount int -} - -func (ms *mockService) WriteStateDiffAt(_ uint64, _ gethsd.Params) error { - ms.reqCount++ - return nil -} - -func TestProcessRanges(t *testing.T) { - blockRangesCh := make(chan blockRange) - srv := new(mockService) - - go func() { - blockRangesCh <- blockRange{uint64(1), uint64(5)} - blockRangesCh <- blockRange{uint64(8), uint64(10)} - blockRangesCh <- blockRange{uint64(6), uint64(7)} - blockRangesCh <- blockRange{uint64(50), uint64(100)} - blockRangesCh <- blockRange{uint64(5), uint64(8)} - close(blockRangesCh) - }() - - processRanges(srv, gethsd.Params{}, blockRangesCh) - require.Equal(t, 65, srv.reqCount) -} - -func TestHttpEndpoint(t *testing.T) { - addr := ":11111" - queueSize := 5 - blockRangesCh := make(chan blockRange, queueSize) - srv := new(mockService) - - go startServer(addr, blockRangesCh) - - go func() { - br := []blockRange{ - {uint64(1), uint64(5)}, - {uint64(8), uint64(10)}, - {uint64(6), uint64(7)}, - {uint64(50), uint64(100)}, - {uint64(5), uint64(8)}, - // Below request should fail since server has queue size of 5 - {uint64(52), uint64(328)}, - {uint64(35), uint64(428)}, - {uint64(45), uint64(844)}, - } - - for idx, r := range br { - res, err := http.Get(fmt.Sprintf("http://localhost:11111/writeDiff?start=%d&end=%d", r[0], r[1])) - require.NoError(t, err) - require.NotNil(t, res) - if idx < queueSize { - require.Equal(t, res.StatusCode, 200) - } else { - require.Equal(t, res.StatusCode, 500) - } - require.NoError(t, res.Body.Close()) - } - processRanges(srv, 
gethsd.Params{}, blockRangesCh) - }() - - time.Sleep(1 * time.Second) - require.Equal(t, 65, srv.reqCount) -} diff --git a/environments/example.toml b/environments/example.toml index 995cdd5..0b1caa5 100644 --- a/environments/example.toml +++ b/environments/example.toml @@ -7,13 +7,24 @@ httpPath = "127.0.0.1:8545" [statediff] - workers = 4 + prerun = true + serviceWorkers = 1 + workerQueueSize = 1024 + trieWorkers = 4 -[write] +[prerun] ranges = [ - [0, 0] + [0, 1000] ] - serve = ":8888" + [prerun.params] + intermediateStateNodes = true + intermediateStorageNodes = true + includeBlock = true + includeReceipts = true + includeTD = true + includeCode = true + watchedAddresses = [] + watchedStorageKeys = [] [log] file = "" @@ -36,5 +47,5 @@ [prom] metrics = true http = true - addr = "localhost" - port = "8889" + httpAddr = "localhost" + httpPort = "8889" diff --git a/pkg/api.go b/pkg/api.go index 9a0311c..2d011ef 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -30,11 +30,11 @@ const APIVersion = "0.0.1" // PublicStateDiffAPI provides an RPC interface // that can be used to fetch historical diffs from leveldb directly type PublicStateDiffAPI struct { - sds IService + sds StateDiffService } // NewPublicStateDiffAPI creates an rpc interface for the underlying statediff service -func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI { +func NewPublicStateDiffAPI(sds StateDiffService) *PublicStateDiffAPI { return &PublicStateDiffAPI{ sds: sds, } @@ -54,3 +54,8 @@ func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) error { return api.sds.WriteStateDiffAt(blockNumber, params) } + +// WriteStateDiffsInRange writes the state diff objects for the provided block range, with the provided params +func (api *PublicStateDiffAPI) WriteStateDiffsInRange(ctx context.Context, start, stop uint64, params sd.Params) error { + return 
api.sds.WriteStateDiffsInRange(start, stop, params) +} diff --git a/pkg/builder.go b/pkg/builder.go index 5dbd2ae..3e52d19 100644 --- a/pkg/builder.go +++ b/pkg/builder.go @@ -35,6 +35,7 @@ import ( sdtrie "github.com/ethereum/go-ethereum/statediff/trie" sdtypes "github.com/ethereum/go-ethereum/statediff/types" "github.com/ethereum/go-ethereum/trie" + "github.com/sirupsen/logrus" iter "github.com/vulcanize/go-eth-state-node-iterator" ) @@ -206,7 +207,7 @@ func (sdb *builder) BuildStateDiffObject(args sd.Args, params sd.Params) (sd.Sta }, nil } -// Writes a statediff object to output callback +// WriteStateDiffObject writes a statediff object to output callback func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error { if len(params.WatchedAddresses) > 0 { // if we are watching only specific accounts then we are only diffing leaf nodes @@ -234,8 +235,8 @@ func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, o var iterPairs [][]iterPair for i := uint(0); i < sdb.numWorkers; i++ { iterPairs = append(iterPairs, []iterPair{ - iterPair{older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)}, - iterPair{older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)}, + {older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)}, + {older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)}, }) } @@ -252,7 +253,9 @@ func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, o wg.Add(1) go func(worker uint) { defer wg.Done() - sdb.buildStateDiff(iterPairs[worker], params, nodeSender, codeSender) + if err := sdb.buildStateDiff(iterPairs[worker], params, nodeSender, codeSender); err != nil { + logrus.Errorf("buildStateDiff error for worker %d, params %+v: %v", worker, params, err) + } }(w) } wg.Wait() diff --git a/pkg/config.go b/pkg/config.go new file mode 100644 index 0000000..318ffc1 --- /dev/null +++ b/pkg/config.go @@ -0,0
+1,9 @@ +package statediff + +// Config holds config params for the statediffing service +type Config struct { + ServiceWorkers uint + TrieWorkers uint + WorkerQueueSize uint + PreRuns []RangeRequest +} diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go index 61f5a8a..5ad7199 100644 --- a/pkg/prom/prom.go +++ b/pkg/prom/prom.go @@ -29,6 +29,7 @@ const statsSubsystem = "stats" var ( metrics bool + queuedRanges prometheus.Gauge lastLoadedHeight prometheus.Gauge lastProcessedHeight prometheus.Gauge @@ -39,6 +40,7 @@ var ( ) const ( + RANGES_QUEUED = "ranges_queued" LOADED_HEIGHT = "loaded_height" PROCESSED_HEIGHT = "processed_height" T_BLOCK_LOAD = "t_block_load" @@ -51,6 +53,11 @@ const ( func Init() { metrics = true + queuedRanges = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: RANGES_QUEUED, + Help: "Number of range requests currently queued", + }) lastLoadedHeight = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Name: LOADED_HEIGHT, @@ -88,13 +95,27 @@ func Init() { }) } -// RegisterDBCollector create metric colletor for given connection +// RegisterDBCollector create metric collector for given connection func RegisterDBCollector(name string, db *sqlx.DB) { if metrics { prometheus.Register(NewDBStatsCollector(name, db)) } } +// IncQueuedRanges increments the number of queued range requests +func IncQueuedRanges() { + if metrics { + queuedRanges.Inc() + } +} + +// DecQueuedRanges decrements the number of queued range requests +func DecQueuedRanges() { + if metrics { + queuedRanges.Dec() + } +} + // SetLastLoadedHeight sets last loaded height func SetLastLoadedHeight(height int64) { if metrics { diff --git a/pkg/reader.go b/pkg/reader.go index bc88660..bacf7b6 100644 --- a/pkg/reader.go +++ b/pkg/reader.go @@ -28,23 +28,32 @@ import ( "github.com/ethereum/go-ethereum/trie" ) -// LvlDBReader exposes the necessary read functions on lvldb +// Reader interface required by the statediffing service +type Reader interface { + 
GetBlockByHash(hash common.Hash) (*types.Block, error) + GetBlockByNumber(number uint64) (*types.Block, error) + GetReceiptsByHash(hash common.Hash) (types.Receipts, error) + GetTdByHash(hash common.Hash) (*big.Int, error) + StateDB() state.Database +} + +// LvlDBReader exposes the necessary Reader methods on lvldb type LvlDBReader struct { ethDB ethdb.Database stateDB state.Database chainConfig *params.ChainConfig } -// ReaderConfig struct for initializing a LvlDBReader -type ReaderConfig struct { +// LvLDBReaderConfig struct for initializing a LvlDBReader +type LvLDBReaderConfig struct { TrieConfig *trie.Config ChainConfig *params.ChainConfig Path, AncientPath string DBCacheSize int } -// NewLvlDBReader creates a new LvlDBReader -func NewLvlDBReader(conf ReaderConfig) (*LvlDBReader, error) { +// NewLvlDBReader creates a new Reader using LevelDB +func NewLvlDBReader(conf LvLDBReaderConfig) (*LvlDBReader, error) { edb, err := rawdb.NewLevelDBDatabaseWithFreezer(conf.Path, conf.DBCacheSize, 256, conf.AncientPath, "eth-statediff-service", true) if err != nil { return nil, err diff --git a/pkg/service.go b/pkg/service.go index 360b936..b176a2f 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -23,7 +23,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" @@ -37,34 +36,29 @@ import ( ind "github.com/ethereum/go-ethereum/statediff/indexer" ) -// lvlDBReader are the db interfaces required by the statediffing service -type lvlDBReader interface { - GetBlockByHash(hash common.Hash) (*types.Block, error) - GetBlockByNumber(number uint64) (*types.Block, error) - GetReceiptsByHash(hash common.Hash) (types.Receipts, error) - GetTdByHash(hash common.Hash) (*big.Int, error) - StateDB() state.Database -} +const defaultQueueSize = 1024 -// IService is the state-diffing service interface -type IService
interface { - // Start() and Stop() +// StateDiffService is the state-diffing service interface +type StateDiffService interface { + // Lifecycle Start() and Stop() node.Lifecycle - // For node service registration + // APIs() and Protocols() for node service registration APIs() []rpc.API Protocols() []p2p.Protocol - // Main event loop for processing state diffs - Loop(wg *sync.WaitGroup) - // Method to get state diff object at specific block + // Loop is the main event loop for processing state diffs + Loop(wg *sync.WaitGroup) error + // StateDiffAt method to get state diff object at specific block StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) - // Method to get state diff object at specific block + // StateDiffFor method to get state diff object for a specific block hash StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.Payload, error) - // Method to get state trie object at specific block + // StateTrieAt method to get state trie object at specific block StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) - // Method to write state diff object directly to DB + // WriteStateDiffAt method to write state diff object directly to DB WriteStateDiffAt(blockNumber uint64, params sd.Params) error - // Method to get state trie object at specific block + // WriteStateDiffFor method to write state diff object for a specific block hash directly to DB WriteStateDiffFor(blockHash common.Hash, params sd.Params) error + // WriteStateDiffsInRange method to write state diff objects within the range directly to the DB + WriteStateDiffsInRange(start, stop uint64, params sd.Params) error } // Service is the underlying struct for the state diffing service @@ -72,24 +66,35 @@ type Service struct { // Used to build the state diff objects Builder Builder // Used to read data from leveldb - lvlDBReader lvlDBReader + lvlDBReader Reader // Used to signal shutdown of the service - QuitChan chan bool + quitChan chan struct{} // Interface for publishing statediffs
as PG-IPLD objects indexer ind.Indexer + // range queue + queue chan RangeRequest + // number of ranges we can work over concurrently + workers uint + // ranges configured locally + preruns []RangeRequest } // NewStateDiffService creates a new Service -func NewStateDiffService(lvlDBReader lvlDBReader, indexer ind.Indexer, workers uint) (*Service, error) { - builder, err := NewBuilder(lvlDBReader.StateDB(), workers) +func NewStateDiffService(lvlDBReader Reader, indexer ind.Indexer, conf Config) (*Service, error) { + builder, err := NewBuilder(lvlDBReader.StateDB(), conf.TrieWorkers) if err != nil { return nil, err } + if conf.WorkerQueueSize == 0 { + conf.WorkerQueueSize = defaultQueueSize + } return &Service{ lvlDBReader: lvlDBReader, Builder: builder, - QuitChan: make(chan bool), indexer: indexer, + workers: conf.ServiceWorkers, + queue: make(chan RangeRequest, conf.WorkerQueueSize), + preruns: conf.PreRuns, }, nil } @@ -111,16 +116,48 @@ func (sds *Service) APIs() []rpc.API { } // Loop is an empty service loop for awaiting rpc requests -func (sds *Service) Loop(wg *sync.WaitGroup) { - wg.Add(1) - for { - select { - case <-sds.QuitChan: - logrus.Info("closing the statediff service loop") - wg.Done() - return +func (sds *Service) Loop(wg *sync.WaitGroup) error { + if sds.quitChan != nil { + return fmt.Errorf("service loop is already running") + } + sds.quitChan = make(chan struct{}) + for i := 0; i < int(sds.workers); i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for { + select { + case blockRange := <-sds.queue: + prom.DecQueuedRanges() + for j := blockRange.Start; j <= blockRange.Stop; j++ { + if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil { + logrus.Errorf("service worker %d error writing statediff at height %d in range (%d, %d) : %v", id, j, blockRange.Start, blockRange.Stop, err) + } + select { + case <-sds.quitChan: + logrus.Infof("closing service worker %d\n"+ + "working in range (%d, %d)\n"+ + "last processed height: %d", id, 
blockRange.Start, blockRange.Stop, j) + return + default: + logrus.Infof("service worker %d finished processing statediff height %d in range (%d, %d)", id, j, blockRange.Start, blockRange.Stop) + } + } + logrus.Infof("service worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop) + case <-sds.quitChan: + logrus.Infof("closing the statediff service loop worker %d", id) + return + } + } + }(i) + } + for _, preRun := range sds.preruns { + if err := sds.WriteStateDiffsInRange(preRun.Start, preRun.Stop, preRun.Params); err != nil { + close(sds.quitChan) + return err } } + return nil } // StateDiffAt returns a state diff object payload at the specific blockheight @@ -237,14 +274,13 @@ func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd. // Start is used to begin the service func (sds *Service) Start() error { logrus.Info("starting statediff service") - go sds.Loop(new(sync.WaitGroup)) - return nil + return sds.Loop(new(sync.WaitGroup)) } // Stop is used to close down the service func (sds *Service) Stop() error { logrus.Info("stopping statediff service") - close(sds.QuitChan) + close(sds.quitChan) return nil } @@ -335,3 +371,20 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p prom.SetTimeMetric(prom.T_POSTGRES_TX_COMMIT, time.Now().Sub(t)) return err } + +// WriteStateDiffsInRange validates the range and adds a RangeRequest to the work queue, returning an error if the queue stays full for 30 seconds +func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params sd.Params) error { + if stop < start { + return fmt.Errorf("invalid block range (%d, %d): stop height must be greater or equal to start height", start, stop) + } + blocked := time.NewTimer(30 * time.Second) + defer blocked.Stop() // release the timer as soon as the request is queued instead of letting it run out + select { + case sds.queue <- RangeRequest{Start: start, Stop: stop, Params: params}: + prom.IncQueuedRanges() + logrus.Infof("added range (%d, %d) to the worker queue", start, stop) + return nil + case <-blocked.C: + return fmt.Errorf("unable to add range (%d, %d) to the worker queue",
start, stop) + } +} diff --git a/pkg/types.go b/pkg/types.go index c81b3e4..3aad530 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -21,7 +21,8 @@ package statediff import ( "github.com/ethereum/go-ethereum/core/types" - sd "github.com/ethereum/go-ethereum/statediff/types" + sd "github.com/ethereum/go-ethereum/statediff" + sdTypes "github.com/ethereum/go-ethereum/statediff/types" ) // AccountMap is a mapping of hex encoded path => account wrapper @@ -30,8 +31,14 @@ type AccountMap map[string]accountWrapper // accountWrapper is used to temporary associate the unpacked node with its raw values type accountWrapper struct { Account *types.StateAccount - NodeType sd.NodeType + NodeType sdTypes.NodeType Path []byte NodeValue []byte LeafKey []byte } + +// RangeRequest holds range request work params +type RangeRequest struct { + Start, Stop uint64 + Params sd.Params +}