unify service RPC api and commands

This commit is contained in:
i-norden 2021-10-25 14:06:05 -05:00
parent b49e30d8e9
commit aace7793ba
13 changed files with 250 additions and 317 deletions

View File

@ -33,8 +33,12 @@ const (
TRIE_CACHE_SIZE_MB = "TRIE_CACHE_SIZE_MB" TRIE_CACHE_SIZE_MB = "TRIE_CACHE_SIZE_MB"
LVLDB_PATH = "LVLDB_PATH" LVLDB_PATH = "LVLDB_PATH"
LVLDB_ANCIENT = "LVLDB_ANCIENT" LVLDB_ANCIENT = "LVLDB_ANCIENT"
STATEDIFF_WORKERS = "STATEDIFF_WORKERS" STATEDIFF_TRIE_WORKERS = "STATEDIFF_TRIE_WORKERS"
WRITE_SERVER = "WRITE_SERVER" STATEDIFF_SERVICE_WORKERS = "STATEDIFF_SERVICE_WORKERS"
STATEDIFF_WORKER_QUEUE_SIZE = "STATEDIFF_WORKER_QUEUE_SIZE"
SERVICE_IPC_PATH = "SERVICE_IPC_PATH"
SERVICE_HTTP_PATH = "SERVICE_HTTP_PATH"
PROM_METRICS = "PROM_METRICS" PROM_METRICS = "PROM_METRICS"
PROM_HTTP = "PROM_HTTP" PROM_HTTP = "PROM_HTTP"
@ -44,6 +48,9 @@ const (
// Bind env vars for eth node and DB configuration // Bind env vars for eth node and DB configuration
func init() { func init() {
viper.BindEnv("server.ipcPath", SERVICE_IPC_PATH)
viper.BindEnv("server.httpPath", SERVICE_HTTP_PATH)
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK)
@ -70,5 +77,7 @@ func init() {
viper.BindEnv("prom.httpAddr", PROM_HTTP_ADDR) viper.BindEnv("prom.httpAddr", PROM_HTTP_ADDR)
viper.BindEnv("prom.httpPort", PROM_HTTP_PORT) viper.BindEnv("prom.httpPort", PROM_HTTP_PORT)
viper.BindEnv("statediff.workers", STATEDIFF_WORKERS) viper.BindEnv("statediff.serviceWorkers", STATEDIFF_SERVICE_WORKERS)
viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS)
viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE)
} }

View File

@ -68,6 +68,7 @@ func initFuncs(cmd *cobra.Command, args []string) {
} }
if viper.GetBool("prom.metrics") { if viper.GetBool("prom.metrics") {
log.Info("initializing prometheus metrics")
prom.Init() prom.Init()
} }
@ -77,6 +78,7 @@ func initFuncs(cmd *cobra.Command, args []string) {
viper.GetString("prom.httpAddr"), viper.GetString("prom.httpAddr"),
viper.GetString("prom.httpPort"), viper.GetString("prom.httpPort"),
) )
log.Info("starting prometheus server")
prom.Listen(addr) prom.Listen(addr)
} }
} }
@ -100,6 +102,8 @@ func init() {
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv() viper.AutomaticEnv()
rootCmd.PersistentFlags().String("http-path", "", "vdb server http path")
rootCmd.PersistentFlags().String("ipc-path", "", "vdb server ipc path")
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location") rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location")
rootCmd.PersistentFlags().String("log-file", "", "file path for logging") rootCmd.PersistentFlags().String("log-file", "", "file path for logging")
rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(),
@ -107,7 +111,11 @@ func init() {
rootCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore") rootCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore")
rootCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore") rootCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
rootCmd.PersistentFlags().Int("workers", 0, "number of concurrent workers to use")
rootCmd.PersistentFlags().Bool("prerun", false, "turn on prerun of toml configured ranges")
rootCmd.PersistentFlags().Int("service-workers", 0, "number of range requests to process concurrently")
rootCmd.PersistentFlags().Int("trie-workers", 0, "number of workers to use for trie traversal and processing")
rootCmd.PersistentFlags().Int("worker-queue-size", 0, "size of the range request queue for service workers")
rootCmd.PersistentFlags().String("database-name", "vulcanize_public", "database name") rootCmd.PersistentFlags().String("database-name", "vulcanize_public", "database name")
rootCmd.PersistentFlags().Int("database-port", 5432, "database port") rootCmd.PersistentFlags().Int("database-port", 5432, "database port")
@ -131,9 +139,14 @@ func init() {
rootCmd.PersistentFlags().Bool("metrics", false, "enable metrics") rootCmd.PersistentFlags().Bool("metrics", false, "enable metrics")
viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path"))
viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path"))
viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file")) viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file"))
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
viper.BindPFlag("statediff.workers", rootCmd.PersistentFlags().Lookup("workers")) viper.BindPFlag("statediff.prerun", rootCmd.PersistentFlags().Lookup("prerun"))
viper.BindPFlag("statediff.serviceWorkers", rootCmd.PersistentFlags().Lookup("service-workers"))
viper.BindPFlag("statediff.trieWorkers", rootCmd.PersistentFlags().Lookup("trie-workers"))
viper.BindPFlag("statediff.workerQueueSize", rootCmd.PersistentFlags().Lookup("worker-queue-size"))
viper.BindPFlag("leveldb.path", rootCmd.PersistentFlags().Lookup("leveldb-path")) viper.BindPFlag("leveldb.path", rootCmd.PersistentFlags().Lookup("leveldb-path"))
viper.BindPFlag("leveldb.ancient", rootCmd.PersistentFlags().Lookup("ancient-path")) viper.BindPFlag("leveldb.ancient", rootCmd.PersistentFlags().Lookup("ancient-path"))
viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name")) viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name"))

View File

@ -16,13 +16,11 @@
package cmd package cmd
import ( import (
"fmt"
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -34,7 +32,7 @@ import (
// serveCmd represents the serve command // serveCmd represents the serve command
var serveCmd = &cobra.Command{ var serveCmd = &cobra.Command{
Use: "serve", Use: "serve",
Short: "Standup a standalone statediffing RPC service on top of leveldb", Short: "Stand up a standalone statediffing RPC service on top of leveldb",
Long: `Usage Long: `Usage
./eth-statediff-service serve --config={path to toml config file}`, ./eth-statediff-service serve --config={path to toml config file}`,
@ -45,6 +43,10 @@ var serveCmd = &cobra.Command{
}, },
} }
func init() {
rootCmd.AddCommand(serveCmd)
}
func serve() { func serve() {
logWithCommand.Info("Running eth-statediff-service serve command") logWithCommand.Info("Running eth-statediff-service serve command")
@ -72,19 +74,7 @@ func serve() {
wg.Wait() wg.Wait()
} }
func init() { func startServers(serv sd.StateDiffService) error {
rootCmd.AddCommand(serveCmd)
serveCmd.PersistentFlags().String("http-path", "", "vdb server http path")
serveCmd.PersistentFlags().String("ipc-path", "", "vdb server ipc path")
viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("http-path"))
viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("ipc-path"))
}
func startServers(serv sd.IService) error {
viper.BindEnv("server.ipcPath", "SERVER_IPC_PATH")
viper.BindEnv("server.httpPath", "SERVER_HTTP_PATH")
ipcPath := viper.GetString("server.ipcPath") ipcPath := viper.GetString("server.ipcPath")
httpPath := viper.GetString("server.httpPath") httpPath := viper.GetString("server.httpPath")
if ipcPath == "" && httpPath == "" { if ipcPath == "" && httpPath == "" {
@ -106,18 +96,3 @@ func startServers(serv sd.IService) error {
} }
return nil return nil
} }
func chainConfig(chainID uint64) (*params.ChainConfig, error) {
switch chainID {
case 1:
return params.MainnetChainConfig, nil
case 3:
return params.RopstenChainConfig, nil // Ropsten
case 4:
return params.RinkebyChainConfig, nil
case 5:
return params.GoerliChainConfig, nil
default:
return nil, fmt.Errorf("chain config for chainid %d not available", chainID)
}
}

View File

@ -1,6 +1,11 @@
package cmd package cmd
import ( import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
gethsd "github.com/ethereum/go-ethereum/statediff"
ind "github.com/ethereum/go-ethereum/statediff/indexer" ind "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
@ -11,7 +16,10 @@ import (
"github.com/vulcanize/eth-statediff-service/pkg/prom" "github.com/vulcanize/eth-statediff-service/pkg/prom"
) )
func createStateDiffService() (sd.IService, error) { type blockRange [2]uint64
func createStateDiffService() (sd.StateDiffService, error) {
// load some necessary params
logWithCommand.Info("Loading statediff service parameters") logWithCommand.Info("Loading statediff service parameters")
path := viper.GetString("leveldb.path") path := viper.GetString("leveldb.path")
ancientPath := viper.GetString("leveldb.ancient") ancientPath := viper.GetString("leveldb.ancient")
@ -20,25 +28,25 @@ func createStateDiffService() (sd.IService, error) {
} }
nodeInfo := GetEthNodeInfo() nodeInfo := GetEthNodeInfo()
config, err := chainConfig(nodeInfo.ChainID) chainConf, err := chainConfig(nodeInfo.ChainID)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
// create leveldb reader // create leveldb reader
logWithCommand.Info("Creating leveldb reader") logWithCommand.Info("Creating leveldb reader")
conf := sd.ReaderConfig{ readerConf := sd.LvLDBReaderConfig{
TrieConfig: &trie.Config{ TrieConfig: &trie.Config{
Cache: viper.GetInt("cache.trie"), Cache: viper.GetInt("cache.trie"),
Journal: "", Journal: "",
Preimages: false, Preimages: false,
}, },
ChainConfig: config, ChainConfig: chainConf,
Path: path, Path: path,
AncientPath: ancientPath, AncientPath: ancientPath,
DBCacheSize: viper.GetInt("cache.database"), DBCacheSize: viper.GetInt("cache.database"),
} }
lvlDBReader, err := sd.NewLvlDBReader(conf) lvlDBReader, err := sd.NewLvlDBReader(readerConf)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
@ -49,11 +57,17 @@ func createStateDiffService() (sd.IService, error) {
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
indexer, err := ind.NewStateDiffIndexer(config, db) indexer, err := ind.NewStateDiffIndexer(chainConf, db)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
return sd.NewStateDiffService(lvlDBReader, indexer, viper.GetUint("statediff.workers")) sdConf := sd.Config{
ServiceWorkers: viper.GetUint("statediff.serviceWorkers"),
TrieWorkers: viper.GetUint("statediff.trieWorkers"),
WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"),
PreRuns: setupPreRunRanges(),
}
return sd.NewStateDiffService(lvlDBReader, indexer, sdConf)
} }
func setupPostgres(nodeInfo node.Info) (*postgres.DB, error) { func setupPostgres(nodeInfo node.Info) (*postgres.DB, error) {
@ -65,3 +79,57 @@ func setupPostgres(nodeInfo node.Info) (*postgres.DB, error) {
prom.RegisterDBCollector(params.Name, db.DB) prom.RegisterDBCollector(params.Name, db.DB)
return db, nil return db, nil
} }
func setupPreRunRanges() []sd.RangeRequest {
if !viper.GetBool("statediff.prerun") {
return nil
}
preRunParams := gethsd.Params{
IntermediateStateNodes: viper.GetBool("prerun.params.intermediateStateNodes"),
IntermediateStorageNodes: viper.GetBool("prerun.params.intermediateStorageNodes"),
IncludeBlock: viper.GetBool("prerun.params.includeBlock"),
IncludeReceipts: viper.GetBool("prerun.params.includeReceipts"),
IncludeTD: viper.GetBool("prerun.params.includeTD"),
IncludeCode: viper.GetBool("prerun.params.includeCode"),
}
var addrStrs []string
viper.UnmarshalKey("prerun.params.watchedAddresses", &addrStrs)
addrs := make([]common.Address, len(addrStrs))
for i, addrStr := range addrStrs {
addrs[i] = common.HexToAddress(addrStr)
}
preRunParams.WatchedAddresses = addrs
var storageKeyStrs []string
viper.UnmarshalKey("prerun.params.watchedStorageKeys", &storageKeyStrs)
keys := make([]common.Hash, len(storageKeyStrs))
for i, keyStr := range storageKeyStrs {
keys[i] = common.HexToHash(keyStr)
}
preRunParams.WatchedStorageSlots = keys
var rawRanges []blockRange
blockRanges := make([]sd.RangeRequest, len(rawRanges))
viper.UnmarshalKey("prerun.ranges", &rawRanges)
for i, rawRange := range rawRanges {
blockRanges[i] = sd.RangeRequest{
Start: rawRange[0],
Stop: rawRange[1],
Params: preRunParams,
}
}
return blockRanges
}
func chainConfig(chainID uint64) (*params.ChainConfig, error) {
switch chainID {
case 1:
return params.MainnetChainConfig, nil
case 3:
return params.RopstenChainConfig, nil // Ropsten
case 4:
return params.RinkebyChainConfig, nil
case 5:
return params.GoerliChainConfig, nil
default:
return nil, fmt.Errorf("chain config for chainid %d not available", chainID)
}
}

View File

@ -1,142 +0,0 @@
// Copyright © 2019 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"fmt"
"net/http"
"strconv"
"sync"
"time"
gethsd "github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var writeCmd = &cobra.Command{
Use: "write",
Short: "Write statediffs directly to DB for preconfigured block ranges",
Long: `Usage
./eth-statediff-service write --config={path to toml config file}`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *logrus.WithField("SubCommand", subCommand)
write()
},
}
type blockRange [2]uint64
func init() {
rootCmd.AddCommand(writeCmd)
writeCmd.PersistentFlags().String("write-api", "", "starts a server which handles write request through endpoints")
viper.BindPFlag("write.serve", writeCmd.PersistentFlags().Lookup("write-api"))
}
func write() {
logWithCommand.Info("Running eth-statediff-service write command")
viper.BindEnv("write.serve", WRITE_SERVER)
addr := viper.GetString("write.serve")
statediffService, err := createStateDiffService()
if err != nil {
logWithCommand.Fatal(err)
}
// Read all defined block ranges, write statediffs to database
var blockRanges []blockRange
diffParams := gethsd.Params{ // todo: configurable?
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
}
viper.UnmarshalKey("write.ranges", &blockRanges)
viper.UnmarshalKey("write.params", &diffParams)
blockRangesCh := make(chan blockRange, 100)
go func() {
for _, r := range blockRanges {
blockRangesCh <- r
}
if addr == "" {
close(blockRangesCh)
return
}
startServer(addr, blockRangesCh)
}()
processRanges(statediffService, diffParams, blockRangesCh)
}
func startServer(addr string, blockRangesCh chan<- blockRange) {
handler := func(w http.ResponseWriter, req *http.Request) {
start, err := strconv.Atoi(req.URL.Query().Get("start"))
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse start value: %v", err), http.StatusInternalServerError)
return
}
end, err := strconv.Atoi(req.URL.Query().Get("end"))
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse end value: %v", err), http.StatusInternalServerError)
return
}
select {
case blockRangesCh <- blockRange{uint64(start), uint64(end)}:
case <-time.After(time.Millisecond * 200):
http.Error(w, "server is busy", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "added block range to the queue\n")
}
http.HandleFunc("/writeDiff", handler)
logrus.Fatal(http.ListenAndServe(addr, nil))
}
type diffService interface {
WriteStateDiffAt(blockNumber uint64, params gethsd.Params) error
}
func processRanges(sds diffService, param gethsd.Params, blockRangesCh chan blockRange) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for rng := range blockRangesCh {
if rng[1] < rng[0] {
logWithCommand.Fatal("range ending block number needs to be greater than starting block number")
}
logrus.Infof("Writing statediffs from block %d to %d", rng[0], rng[1])
for height := rng[0]; height <= rng[1]; height++ {
err := sds.WriteStateDiffAt(height, param)
if err != nil {
logrus.Errorf("failed to write state diff for range: %v %v", rng, err)
}
}
}
}()
wg.Wait()
}

View File

@ -1,76 +0,0 @@
package cmd
import (
"fmt"
"net/http"
"testing"
"time"
gethsd "github.com/ethereum/go-ethereum/statediff"
"github.com/stretchr/testify/require"
)
type mockService struct {
reqCount int
}
func (ms *mockService) WriteStateDiffAt(_ uint64, _ gethsd.Params) error {
ms.reqCount++
return nil
}
func TestProcessRanges(t *testing.T) {
blockRangesCh := make(chan blockRange)
srv := new(mockService)
go func() {
blockRangesCh <- blockRange{uint64(1), uint64(5)}
blockRangesCh <- blockRange{uint64(8), uint64(10)}
blockRangesCh <- blockRange{uint64(6), uint64(7)}
blockRangesCh <- blockRange{uint64(50), uint64(100)}
blockRangesCh <- blockRange{uint64(5), uint64(8)}
close(blockRangesCh)
}()
processRanges(srv, gethsd.Params{}, blockRangesCh)
require.Equal(t, 65, srv.reqCount)
}
func TestHttpEndpoint(t *testing.T) {
addr := ":11111"
queueSize := 5
blockRangesCh := make(chan blockRange, queueSize)
srv := new(mockService)
go startServer(addr, blockRangesCh)
go func() {
br := []blockRange{
{uint64(1), uint64(5)},
{uint64(8), uint64(10)},
{uint64(6), uint64(7)},
{uint64(50), uint64(100)},
{uint64(5), uint64(8)},
// Below request should fail since server has queue size of 5
{uint64(52), uint64(328)},
{uint64(35), uint64(428)},
{uint64(45), uint64(844)},
}
for idx, r := range br {
res, err := http.Get(fmt.Sprintf("http://localhost:11111/writeDiff?start=%d&end=%d", r[0], r[1]))
require.NoError(t, err)
require.NotNil(t, res)
if idx < queueSize {
require.Equal(t, res.StatusCode, 200)
} else {
require.Equal(t, res.StatusCode, 500)
}
require.NoError(t, res.Body.Close())
}
processRanges(srv, gethsd.Params{}, blockRangesCh)
}()
time.Sleep(1 * time.Second)
require.Equal(t, 65, srv.reqCount)
}

View File

@ -7,13 +7,24 @@
httpPath = "127.0.0.1:8545" httpPath = "127.0.0.1:8545"
[statediff] [statediff]
workers = 4 prerun = true
serviceWorkers = 1
workerQueueSize = 1024
trieWorkers = 4
[write] [prerun]
ranges = [ ranges = [
[0, 0] [0, 1000]
] ]
serve = ":8888" [prerun.params]
intermediateStateNodes = true
intermediateStorageNodes = true
includeBlock = true
includeReceipts = true
includeTD = true
includeCode = true
watchedAddresses = []
watchedStorageKeys = []
[log] [log]
file = "" file = ""
@ -36,5 +47,5 @@
[prom] [prom]
metrics = true metrics = true
http = true http = true
addr = "localhost" httpAddr = "localhost"
port = "8889" httpPort = "8889"

View File

@ -30,11 +30,11 @@ const APIVersion = "0.0.1"
// PublicStateDiffAPI provides an RPC interface // PublicStateDiffAPI provides an RPC interface
// that can be used to fetch historical diffs from leveldb directly // that can be used to fetch historical diffs from leveldb directly
type PublicStateDiffAPI struct { type PublicStateDiffAPI struct {
sds IService sds StateDiffService
} }
// NewPublicStateDiffAPI creates an rpc interface for the underlying statediff service // NewPublicStateDiffAPI creates an rpc interface for the underlying statediff service
func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI { func NewPublicStateDiffAPI(sds StateDiffService) *PublicStateDiffAPI {
return &PublicStateDiffAPI{ return &PublicStateDiffAPI{
sds: sds, sds: sds,
} }
@ -54,3 +54,8 @@ func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) error { func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) error {
return api.sds.WriteStateDiffAt(blockNumber, params) return api.sds.WriteStateDiffAt(blockNumber, params)
} }
// WriteStateDiffsInRange writes the state diff objects for the provided block range, with the provided params
func (api *PublicStateDiffAPI) WriteStateDiffsInRange(ctx context.Context, start, stop uint64, params sd.Params) error {
return api.sds.WriteStateDiffsInRange(start, stop, params)
}

View File

@ -35,6 +35,7 @@ import (
sdtrie "github.com/ethereum/go-ethereum/statediff/trie" sdtrie "github.com/ethereum/go-ethereum/statediff/trie"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/sirupsen/logrus"
iter "github.com/vulcanize/go-eth-state-node-iterator" iter "github.com/vulcanize/go-eth-state-node-iterator"
) )
@ -206,7 +207,7 @@ func (sdb *builder) BuildStateDiffObject(args sd.Args, params sd.Params) (sd.Sta
}, nil }, nil
} }
// Writes a statediff object to output callback // WriteStateDiffObject writes a statediff object to output callback
func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error { func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error {
if len(params.WatchedAddresses) > 0 { if len(params.WatchedAddresses) > 0 {
// if we are watching only specific accounts then we are only diffing leaf nodes // if we are watching only specific accounts then we are only diffing leaf nodes
@ -234,8 +235,8 @@ func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, o
var iterPairs [][]iterPair var iterPairs [][]iterPair
for i := uint(0); i < sdb.numWorkers; i++ { for i := uint(0); i < sdb.numWorkers; i++ {
iterPairs = append(iterPairs, []iterPair{ iterPairs = append(iterPairs, []iterPair{
iterPair{older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)}, {older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)},
iterPair{older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)}, {older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)},
}) })
} }
@ -252,7 +253,9 @@ func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, o
wg.Add(1) wg.Add(1)
go func(worker uint) { go func(worker uint) {
defer wg.Done() defer wg.Done()
sdb.buildStateDiff(iterPairs[worker], params, nodeSender, codeSender) if err := sdb.buildStateDiff(iterPairs[worker], params, nodeSender, codeSender); err != nil {
logrus.Errorf("buildStateDiff error for worker %d, pparams %+v", worker, params)
}
}(w) }(w)
} }
wg.Wait() wg.Wait()

9
pkg/config.go Normal file
View File

@ -0,0 +1,9 @@
package statediff
// Config holds config params for the statediffing service
type Config struct {
ServiceWorkers uint
TrieWorkers uint
WorkerQueueSize uint
PreRuns []RangeRequest
}

View File

@ -28,23 +28,32 @@ import (
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
) )
// LvlDBReader exposes the necessary read functions on lvldb // Reader interface required by the statediffing service
type Reader interface {
GetBlockByHash(hash common.Hash) (*types.Block, error)
GetBlockByNumber(number uint64) (*types.Block, error)
GetReceiptsByHash(hash common.Hash) (types.Receipts, error)
GetTdByHash(hash common.Hash) (*big.Int, error)
StateDB() state.Database
}
// LvlDBReader exposes the necessary Reader methods on lvldb
type LvlDBReader struct { type LvlDBReader struct {
ethDB ethdb.Database ethDB ethdb.Database
stateDB state.Database stateDB state.Database
chainConfig *params.ChainConfig chainConfig *params.ChainConfig
} }
// ReaderConfig struct for initializing a LvlDBReader // LvLDBReaderConfig struct for initializing a LvlDBReader
type ReaderConfig struct { type LvLDBReaderConfig struct {
TrieConfig *trie.Config TrieConfig *trie.Config
ChainConfig *params.ChainConfig ChainConfig *params.ChainConfig
Path, AncientPath string Path, AncientPath string
DBCacheSize int DBCacheSize int
} }
// NewLvlDBReader creates a new LvlDBReader // NewLvlDBReader creates a new Read using LevelDB
func NewLvlDBReader(conf ReaderConfig) (*LvlDBReader, error) { func NewLvlDBReader(conf LvLDBReaderConfig) (*LvlDBReader, error) {
edb, err := rawdb.NewLevelDBDatabaseWithFreezer(conf.Path, conf.DBCacheSize, 256, conf.AncientPath, "eth-statediff-service", true) edb, err := rawdb.NewLevelDBDatabaseWithFreezer(conf.Path, conf.DBCacheSize, 256, conf.AncientPath, "eth-statediff-service", true)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -23,7 +23,6 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
@ -37,34 +36,29 @@ import (
ind "github.com/ethereum/go-ethereum/statediff/indexer" ind "github.com/ethereum/go-ethereum/statediff/indexer"
) )
// lvlDBReader are the db interfaces required by the statediffing service const defaultQueueSize = 1024
type lvlDBReader interface {
GetBlockByHash(hash common.Hash) (*types.Block, error)
GetBlockByNumber(number uint64) (*types.Block, error)
GetReceiptsByHash(hash common.Hash) (types.Receipts, error)
GetTdByHash(hash common.Hash) (*big.Int, error)
StateDB() state.Database
}
// IService is the state-diffing service interface // StateDiffService is the state-diffing service interface
type IService interface { type StateDiffService interface {
// Start() and Stop() // Lifecycle Start() and Stop()
node.Lifecycle node.Lifecycle
// For node service registration // APIs and Protocols() interface for node service registration
APIs() []rpc.API APIs() []rpc.API
Protocols() []p2p.Protocol Protocols() []p2p.Protocol
// Main event loop for processing state diffs // Loop is the main event loop for processing state diffs
Loop(wg *sync.WaitGroup) Loop(wg *sync.WaitGroup)
// Method to get state diff object at specific block // StateDiffAt method to get state diff object at specific block
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// Method to get state diff object at specific block // StateDiffFor method to get state diff object at specific block
StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.Payload, error) StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.Payload, error)
// Method to get state trie object at specific block // StateTrieAt method to get state trie object at specific block
StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// Method to write state diff object directly to DB // WriteStateDiffAt method to write state diff object directly to DB
WriteStateDiffAt(blockNumber uint64, params sd.Params) error WriteStateDiffAt(blockNumber uint64, params sd.Params) error
// Method to get state trie object at specific block // WriteStateDiffFor method to get state trie object at specific block
WriteStateDiffFor(blockHash common.Hash, params sd.Params) error WriteStateDiffFor(blockHash common.Hash, params sd.Params) error
// WriteStateDiffsInRange method to wrtie state diff objects within the range directly to the DB
WriteStateDiffsInRange(start, stop uint64, params sd.Params) error
} }
// Service is the underlying struct for the state diffing service // Service is the underlying struct for the state diffing service
@ -72,24 +66,36 @@ type Service struct {
// Used to build the state diff objects // Used to build the state diff objects
Builder Builder Builder Builder
// Used to read data from leveldb // Used to read data from leveldb
lvlDBReader lvlDBReader lvlDBReader Reader
// Used to signal shutdown of the service // Used to signal shutdown of the service
QuitChan chan bool quitChan chan bool
// Interface for publishing statediffs as PG-IPLD objects // Interface for publishing statediffs as PG-IPLD objects
indexer ind.Indexer indexer ind.Indexer
// range queue
queue chan RangeRequest
// number of ranges we can work over concurrently
workers uint
// ranges configured locally
preruns []RangeRequest
} }
// NewStateDiffService creates a new Service // NewStateDiffService creates a new Service
func NewStateDiffService(lvlDBReader lvlDBReader, indexer ind.Indexer, workers uint) (*Service, error) { func NewStateDiffService(lvlDBReader Reader, indexer ind.Indexer, conf Config) (*Service, error) {
builder, err := NewBuilder(lvlDBReader.StateDB(), workers) builder, err := NewBuilder(lvlDBReader.StateDB(), conf.TrieWorkers)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if conf.WorkerQueueSize == 0 {
conf.WorkerQueueSize = defaultQueueSize
}
return &Service{ return &Service{
lvlDBReader: lvlDBReader, lvlDBReader: lvlDBReader,
Builder: builder, Builder: builder,
QuitChan: make(chan bool), quitChan: make(chan bool),
indexer: indexer, indexer: indexer,
workers: conf.ServiceWorkers,
queue: make(chan RangeRequest, conf.WorkerQueueSize),
preruns: conf.PreRuns,
}, nil }, nil
} }
@ -112,15 +118,37 @@ func (sds *Service) APIs() []rpc.API {
// Loop is an empty service loop for awaiting rpc requests // Loop is an empty service loop for awaiting rpc requests
func (sds *Service) Loop(wg *sync.WaitGroup) { func (sds *Service) Loop(wg *sync.WaitGroup) {
for i := 0; i < int(sds.workers); i++ {
wg.Add(1) wg.Add(1)
go func(id int) {
defer wg.Done()
for { for {
select { select {
case <-sds.QuitChan: case blockRange := <-sds.queue:
logrus.Info("closing the statediff service loop") for j := blockRange.Start; j <= blockRange.Start; j++ {
wg.Done() if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
logrus.Errorf("service worker %d error writing statediff at height %d in range (%d, %d) : %v", id, j, blockRange.Start, blockRange.Stop, err)
}
select {
case <-sds.quitChan:
logrus.Infof("closing service worker %d\n"+
"working in range (%d, %d)\n"+
"last processed height: %d", id, blockRange.Start, blockRange.Stop, j)
return
default:
logrus.Infof("service worker %d finished processing statediff height %d in range (%d, %d)", id, j, blockRange.Start, blockRange.Stop)
}
}
case <-sds.quitChan:
logrus.Infof("closing the statediff service loop worker %d", id)
return return
} }
} }
}(i)
}
for _, preRun := range sds.preruns {
sds.queue <- preRun
}
} }
// StateDiffAt returns a state diff object payload at the specific blockheight // StateDiffAt returns a state diff object payload at the specific blockheight
@ -244,7 +272,7 @@ func (sds *Service) Start() error {
// Stop is used to close down the service // Stop is used to close down the service
func (sds *Service) Stop() error { func (sds *Service) Stop() error {
logrus.Info("stopping statediff service") logrus.Info("stopping statediff service")
close(sds.QuitChan) close(sds.quitChan)
return nil return nil
} }
@ -335,3 +363,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
prom.SetTimeMetric(prom.T_POSTGRES_TX_COMMIT, time.Now().Sub(t)) prom.SetTimeMetric(prom.T_POSTGRES_TX_COMMIT, time.Now().Sub(t))
return err return err
} }
// WriteStateDiffsInRange adds a RangeRequest to the work queue
func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params sd.Params) error {
if stop < start {
return fmt.Errorf("invalid block range (%d, %d): stop height must be greater or equal to start height", start, stop)
}
blocked := time.NewTimer(30 * time.Second)
select {
case sds.queue <- RangeRequest{Start: start, Stop: stop, Params: params}:
return nil
case <-blocked.C:
return fmt.Errorf("unable to add range (%d, %d) to the worker queue", start, stop)
}
}

View File

@ -21,7 +21,8 @@ package statediff
import ( import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
sd "github.com/ethereum/go-ethereum/statediff/types" sd "github.com/ethereum/go-ethereum/statediff"
sdTypes "github.com/ethereum/go-ethereum/statediff/types"
) )
// AccountMap is a mapping of hex encoded path => account wrapper // AccountMap is a mapping of hex encoded path => account wrapper
@ -30,8 +31,14 @@ type AccountMap map[string]accountWrapper
// accountWrapper is used to temporary associate the unpacked node with its raw values // accountWrapper is used to temporary associate the unpacked node with its raw values
type accountWrapper struct { type accountWrapper struct {
Account *types.StateAccount Account *types.StateAccount
NodeType sd.NodeType NodeType sdTypes.NodeType
Path []byte Path []byte
NodeValue []byte NodeValue []byte
LeafKey []byte LeafKey []byte
} }
// RangeRequest holds range quest work params
type RangeRequest struct {
Start, Stop uint64
Params sd.Params
}