Merge pull request #39 from vulcanize/metrics

Metrics
This commit is contained in:
Ian Norden 2021-10-25 15:19:13 -05:00 committed by GitHub
commit ee59ff3650
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 288 additions and 322 deletions

View File

@ -29,12 +29,16 @@ const (
ETH_NETWORK_ID = "ETH_NETWORK_ID"
ETH_CHAIN_ID = "ETH_CHAIN_ID"
DB_CACHE_SIZE_MB = "DB_CACHE_SIZE_MB"
TRIE_CACHE_SIZE_MB = "TRIE_CACHE_SIZE_MB"
LVLDB_PATH = "LVLDB_PATH"
LVLDB_ANCIENT = "LVLDB_ANCIENT"
STATEDIFF_WORKERS = "STATEDIFF_WORKERS"
WRITE_SERVER = "WRITE_SERVER"
DB_CACHE_SIZE_MB = "DB_CACHE_SIZE_MB"
TRIE_CACHE_SIZE_MB = "TRIE_CACHE_SIZE_MB"
LVLDB_PATH = "LVLDB_PATH"
LVLDB_ANCIENT = "LVLDB_ANCIENT"
STATEDIFF_TRIE_WORKERS = "STATEDIFF_TRIE_WORKERS"
STATEDIFF_SERVICE_WORKERS = "STATEDIFF_SERVICE_WORKERS"
STATEDIFF_WORKER_QUEUE_SIZE = "STATEDIFF_WORKER_QUEUE_SIZE"
SERVICE_IPC_PATH = "SERVICE_IPC_PATH"
SERVICE_HTTP_PATH = "SERVICE_HTTP_PATH"
PROM_METRICS = "PROM_METRICS"
PROM_HTTP = "PROM_HTTP"
@ -44,6 +48,9 @@ const (
// Bind env vars for eth node and DB configuration
func init() {
viper.BindEnv("server.ipcPath", SERVICE_IPC_PATH)
viper.BindEnv("server.httpPath", SERVICE_HTTP_PATH)
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK)
@ -70,5 +77,7 @@ func init() {
viper.BindEnv("prom.httpAddr", PROM_HTTP_ADDR)
viper.BindEnv("prom.httpPort", PROM_HTTP_PORT)
viper.BindEnv("statediff.workers", STATEDIFF_WORKERS)
viper.BindEnv("statediff.serviceWorkers", STATEDIFF_SERVICE_WORKERS)
viper.BindEnv("statediff.trieWorkers", STATEDIFF_TRIE_WORKERS)
viper.BindEnv("statediff.workerQueueSize", STATEDIFF_WORKER_QUEUE_SIZE)
}

View File

@ -68,6 +68,7 @@ func initFuncs(cmd *cobra.Command, args []string) {
}
if viper.GetBool("prom.metrics") {
log.Info("initializing prometheus metrics")
prom.Init()
}
@ -77,6 +78,7 @@ func initFuncs(cmd *cobra.Command, args []string) {
viper.GetString("prom.httpAddr"),
viper.GetString("prom.httpPort"),
)
log.Info("starting prometheus server")
prom.Listen(addr)
}
}
@ -100,6 +102,8 @@ func init() {
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv()
rootCmd.PersistentFlags().String("http-path", "", "vdb server http path")
rootCmd.PersistentFlags().String("ipc-path", "", "vdb server ipc path")
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location")
rootCmd.PersistentFlags().String("log-file", "", "file path for logging")
rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(),
@ -107,7 +111,11 @@ func init() {
rootCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore")
rootCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
rootCmd.PersistentFlags().Int("workers", 0, "number of concurrent workers to use")
rootCmd.PersistentFlags().Bool("prerun", false, "turn on prerun of toml configured ranges")
rootCmd.PersistentFlags().Int("service-workers", 0, "number of range requests to process concurrently")
rootCmd.PersistentFlags().Int("trie-workers", 0, "number of workers to use for trie traversal and processing")
rootCmd.PersistentFlags().Int("worker-queue-size", 0, "size of the range request queue for service workers")
rootCmd.PersistentFlags().String("database-name", "vulcanize_public", "database name")
rootCmd.PersistentFlags().Int("database-port", 5432, "database port")
@ -131,9 +139,14 @@ func init() {
rootCmd.PersistentFlags().Bool("metrics", false, "enable metrics")
viper.BindPFlag("server.httpPath", rootCmd.PersistentFlags().Lookup("http-path"))
viper.BindPFlag("server.ipcPath", rootCmd.PersistentFlags().Lookup("ipc-path"))
viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file"))
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
viper.BindPFlag("statediff.workers", rootCmd.PersistentFlags().Lookup("workers"))
viper.BindPFlag("statediff.prerun", rootCmd.PersistentFlags().Lookup("prerun"))
viper.BindPFlag("statediff.serviceWorkers", rootCmd.PersistentFlags().Lookup("service-workers"))
viper.BindPFlag("statediff.trieWorkers", rootCmd.PersistentFlags().Lookup("trie-workers"))
viper.BindPFlag("statediff.workerQueueSize", rootCmd.PersistentFlags().Lookup("worker-queue-size"))
viper.BindPFlag("leveldb.path", rootCmd.PersistentFlags().Lookup("leveldb-path"))
viper.BindPFlag("leveldb.ancient", rootCmd.PersistentFlags().Lookup("ancient-path"))
viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name"))

View File

@ -16,13 +16,11 @@
package cmd
import (
"fmt"
"os"
"os/signal"
"sync"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -34,7 +32,7 @@ import (
// serveCmd represents the serve command
var serveCmd = &cobra.Command{
Use: "serve",
Short: "Standup a standalone statediffing RPC service on top of leveldb",
Short: "Stand up a standalone statediffing RPC service on top of leveldb",
Long: `Usage
./eth-statediff-service serve --config={path to toml config file}`,
@ -45,6 +43,10 @@ var serveCmd = &cobra.Command{
},
}
func init() {
rootCmd.AddCommand(serveCmd)
}
func serve() {
logWithCommand.Info("Running eth-statediff-service serve command")
@ -56,7 +58,9 @@ func serve() {
// start service and servers
logWithCommand.Info("Starting statediff service")
wg := new(sync.WaitGroup)
go statediffService.Loop(wg)
if err := statediffService.Loop(wg); err != nil {
logWithCommand.Fatalf("unable to start statediff service: %v", err)
}
logWithCommand.Info("Starting RPC servers")
if err := startServers(statediffService); err != nil {
logWithCommand.Fatal(err)
@ -72,19 +76,7 @@ func serve() {
wg.Wait()
}
func init() {
rootCmd.AddCommand(serveCmd)
serveCmd.PersistentFlags().String("http-path", "", "vdb server http path")
serveCmd.PersistentFlags().String("ipc-path", "", "vdb server ipc path")
viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("http-path"))
viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("ipc-path"))
}
func startServers(serv sd.IService) error {
viper.BindEnv("server.ipcPath", "SERVER_IPC_PATH")
viper.BindEnv("server.httpPath", "SERVER_HTTP_PATH")
func startServers(serv sd.StateDiffService) error {
ipcPath := viper.GetString("server.ipcPath")
httpPath := viper.GetString("server.httpPath")
if ipcPath == "" && httpPath == "" {
@ -106,18 +98,3 @@ func startServers(serv sd.IService) error {
}
return nil
}
func chainConfig(chainID uint64) (*params.ChainConfig, error) {
switch chainID {
case 1:
return params.MainnetChainConfig, nil
case 3:
return params.RopstenChainConfig, nil // Ropsten
case 4:
return params.RinkebyChainConfig, nil
case 5:
return params.GoerliChainConfig, nil
default:
return nil, fmt.Errorf("chain config for chainid %d not available", chainID)
}
}

View File

@ -1,6 +1,11 @@
package cmd
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
gethsd "github.com/ethereum/go-ethereum/statediff"
ind "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
@ -11,7 +16,10 @@ import (
"github.com/vulcanize/eth-statediff-service/pkg/prom"
)
func createStateDiffService() (sd.IService, error) {
type blockRange [2]uint64
func createStateDiffService() (sd.StateDiffService, error) {
// load some necessary params
logWithCommand.Info("Loading statediff service parameters")
path := viper.GetString("leveldb.path")
ancientPath := viper.GetString("leveldb.ancient")
@ -20,25 +28,25 @@ func createStateDiffService() (sd.IService, error) {
}
nodeInfo := GetEthNodeInfo()
config, err := chainConfig(nodeInfo.ChainID)
chainConf, err := chainConfig(nodeInfo.ChainID)
if err != nil {
logWithCommand.Fatal(err)
}
// create leveldb reader
logWithCommand.Info("Creating leveldb reader")
conf := sd.ReaderConfig{
readerConf := sd.LvLDBReaderConfig{
TrieConfig: &trie.Config{
Cache: viper.GetInt("cache.trie"),
Journal: "",
Preimages: false,
},
ChainConfig: config,
ChainConfig: chainConf,
Path: path,
AncientPath: ancientPath,
DBCacheSize: viper.GetInt("cache.database"),
}
lvlDBReader, err := sd.NewLvlDBReader(conf)
lvlDBReader, err := sd.NewLvlDBReader(readerConf)
if err != nil {
logWithCommand.Fatal(err)
}
@ -49,11 +57,17 @@ func createStateDiffService() (sd.IService, error) {
if err != nil {
logWithCommand.Fatal(err)
}
indexer, err := ind.NewStateDiffIndexer(config, db)
indexer, err := ind.NewStateDiffIndexer(chainConf, db)
if err != nil {
logWithCommand.Fatal(err)
}
return sd.NewStateDiffService(lvlDBReader, indexer, viper.GetUint("statediff.workers"))
sdConf := sd.Config{
ServiceWorkers: viper.GetUint("statediff.serviceWorkers"),
TrieWorkers: viper.GetUint("statediff.trieWorkers"),
WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"),
PreRuns: setupPreRunRanges(),
}
return sd.NewStateDiffService(lvlDBReader, indexer, sdConf)
}
func setupPostgres(nodeInfo node.Info) (*postgres.DB, error) {
@ -65,3 +79,57 @@ func setupPostgres(nodeInfo node.Info) (*postgres.DB, error) {
prom.RegisterDBCollector(params.Name, db.DB)
return db, nil
}
func setupPreRunRanges() []sd.RangeRequest {
if !viper.GetBool("statediff.prerun") {
return nil
}
preRunParams := gethsd.Params{
IntermediateStateNodes: viper.GetBool("prerun.params.intermediateStateNodes"),
IntermediateStorageNodes: viper.GetBool("prerun.params.intermediateStorageNodes"),
IncludeBlock: viper.GetBool("prerun.params.includeBlock"),
IncludeReceipts: viper.GetBool("prerun.params.includeReceipts"),
IncludeTD: viper.GetBool("prerun.params.includeTD"),
IncludeCode: viper.GetBool("prerun.params.includeCode"),
}
var addrStrs []string
viper.UnmarshalKey("prerun.params.watchedAddresses", &addrStrs)
addrs := make([]common.Address, len(addrStrs))
for i, addrStr := range addrStrs {
addrs[i] = common.HexToAddress(addrStr)
}
preRunParams.WatchedAddresses = addrs
var storageKeyStrs []string
viper.UnmarshalKey("prerun.params.watchedStorageKeys", &storageKeyStrs)
keys := make([]common.Hash, len(storageKeyStrs))
for i, keyStr := range storageKeyStrs {
keys[i] = common.HexToHash(keyStr)
}
preRunParams.WatchedStorageSlots = keys
var rawRanges []blockRange
viper.UnmarshalKey("prerun.ranges", &rawRanges)
blockRanges := make([]sd.RangeRequest, len(rawRanges))
for i, rawRange := range rawRanges {
blockRanges[i] = sd.RangeRequest{
Start: rawRange[0],
Stop: rawRange[1],
Params: preRunParams,
}
}
return blockRanges
}
func chainConfig(chainID uint64) (*params.ChainConfig, error) {
switch chainID {
case 1:
return params.MainnetChainConfig, nil
case 3:
return params.RopstenChainConfig, nil // Ropsten
case 4:
return params.RinkebyChainConfig, nil
case 5:
return params.GoerliChainConfig, nil
default:
return nil, fmt.Errorf("chain config for chainid %d not available", chainID)
}
}

View File

@ -1,142 +0,0 @@
// Copyright © 2019 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"fmt"
"net/http"
"strconv"
"sync"
"time"
gethsd "github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var writeCmd = &cobra.Command{
Use: "write",
Short: "Write statediffs directly to DB for preconfigured block ranges",
Long: `Usage
./eth-statediff-service write --config={path to toml config file}`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *logrus.WithField("SubCommand", subCommand)
write()
},
}
type blockRange [2]uint64
func init() {
rootCmd.AddCommand(writeCmd)
writeCmd.PersistentFlags().String("write-api", "", "starts a server which handles write request through endpoints")
viper.BindPFlag("write.serve", writeCmd.PersistentFlags().Lookup("write-api"))
}
func write() {
logWithCommand.Info("Running eth-statediff-service write command")
viper.BindEnv("write.serve", WRITE_SERVER)
addr := viper.GetString("write.serve")
statediffService, err := createStateDiffService()
if err != nil {
logWithCommand.Fatal(err)
}
// Read all defined block ranges, write statediffs to database
var blockRanges []blockRange
diffParams := gethsd.Params{ // todo: configurable?
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
}
viper.UnmarshalKey("write.ranges", &blockRanges)
viper.UnmarshalKey("write.params", &diffParams)
blockRangesCh := make(chan blockRange, 100)
go func() {
for _, r := range blockRanges {
blockRangesCh <- r
}
if addr == "" {
close(blockRangesCh)
return
}
startServer(addr, blockRangesCh)
}()
processRanges(statediffService, diffParams, blockRangesCh)
}
func startServer(addr string, blockRangesCh chan<- blockRange) {
handler := func(w http.ResponseWriter, req *http.Request) {
start, err := strconv.Atoi(req.URL.Query().Get("start"))
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse start value: %v", err), http.StatusInternalServerError)
return
}
end, err := strconv.Atoi(req.URL.Query().Get("end"))
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse end value: %v", err), http.StatusInternalServerError)
return
}
select {
case blockRangesCh <- blockRange{uint64(start), uint64(end)}:
case <-time.After(time.Millisecond * 200):
http.Error(w, "server is busy", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "added block range to the queue\n")
}
http.HandleFunc("/writeDiff", handler)
logrus.Fatal(http.ListenAndServe(addr, nil))
}
type diffService interface {
WriteStateDiffAt(blockNumber uint64, params gethsd.Params) error
}
func processRanges(sds diffService, param gethsd.Params, blockRangesCh chan blockRange) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for rng := range blockRangesCh {
if rng[1] < rng[0] {
logWithCommand.Fatal("range ending block number needs to be greater than starting block number")
}
logrus.Infof("Writing statediffs from block %d to %d", rng[0], rng[1])
for height := rng[0]; height <= rng[1]; height++ {
err := sds.WriteStateDiffAt(height, param)
if err != nil {
logrus.Errorf("failed to write state diff for range: %v %v", rng, err)
}
}
}
}()
wg.Wait()
}

View File

@ -1,76 +0,0 @@
package cmd
import (
"fmt"
"net/http"
"testing"
"time"
gethsd "github.com/ethereum/go-ethereum/statediff"
"github.com/stretchr/testify/require"
)
type mockService struct {
reqCount int
}
func (ms *mockService) WriteStateDiffAt(_ uint64, _ gethsd.Params) error {
ms.reqCount++
return nil
}
func TestProcessRanges(t *testing.T) {
blockRangesCh := make(chan blockRange)
srv := new(mockService)
go func() {
blockRangesCh <- blockRange{uint64(1), uint64(5)}
blockRangesCh <- blockRange{uint64(8), uint64(10)}
blockRangesCh <- blockRange{uint64(6), uint64(7)}
blockRangesCh <- blockRange{uint64(50), uint64(100)}
blockRangesCh <- blockRange{uint64(5), uint64(8)}
close(blockRangesCh)
}()
processRanges(srv, gethsd.Params{}, blockRangesCh)
require.Equal(t, 65, srv.reqCount)
}
func TestHttpEndpoint(t *testing.T) {
addr := ":11111"
queueSize := 5
blockRangesCh := make(chan blockRange, queueSize)
srv := new(mockService)
go startServer(addr, blockRangesCh)
go func() {
br := []blockRange{
{uint64(1), uint64(5)},
{uint64(8), uint64(10)},
{uint64(6), uint64(7)},
{uint64(50), uint64(100)},
{uint64(5), uint64(8)},
// Below request should fail since server has queue size of 5
{uint64(52), uint64(328)},
{uint64(35), uint64(428)},
{uint64(45), uint64(844)},
}
for idx, r := range br {
res, err := http.Get(fmt.Sprintf("http://localhost:11111/writeDiff?start=%d&end=%d", r[0], r[1]))
require.NoError(t, err)
require.NotNil(t, res)
if idx < queueSize {
require.Equal(t, res.StatusCode, 200)
} else {
require.Equal(t, res.StatusCode, 500)
}
require.NoError(t, res.Body.Close())
}
processRanges(srv, gethsd.Params{}, blockRangesCh)
}()
time.Sleep(1 * time.Second)
require.Equal(t, 65, srv.reqCount)
}

View File

@ -7,13 +7,24 @@
httpPath = "127.0.0.1:8545"
[statediff]
workers = 4
prerun = true
serviceWorkers = 1
workerQueueSize = 1024
trieWorkers = 4
[write]
[prerun]
ranges = [
[0, 0]
[0, 1000]
]
serve = ":8888"
[prerun.params]
intermediateStateNodes = true
intermediateStorageNodes = true
includeBlock = true
includeReceipts = true
includeTD = true
includeCode = true
watchedAddresses = []
watchedStorageKeys = []
[log]
file = ""
@ -36,5 +47,5 @@
[prom]
metrics = true
http = true
addr = "localhost"
port = "8889"
httpAddr = "localhost"
httpPort = "8889"

View File

@ -30,11 +30,11 @@ const APIVersion = "0.0.1"
// PublicStateDiffAPI provides an RPC interface
// that can be used to fetch historical diffs from leveldb directly
type PublicStateDiffAPI struct {
sds IService
sds StateDiffService
}
// NewPublicStateDiffAPI creates an rpc interface for the underlying statediff service
func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI {
func NewPublicStateDiffAPI(sds StateDiffService) *PublicStateDiffAPI {
return &PublicStateDiffAPI{
sds: sds,
}
@ -54,3 +54,8 @@ func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) error {
return api.sds.WriteStateDiffAt(blockNumber, params)
}
// WriteStateDiffsInRange writes the state diff objects for the provided block range, with the provided params
func (api *PublicStateDiffAPI) WriteStateDiffsInRange(ctx context.Context, start, stop uint64, params sd.Params) error {
return api.sds.WriteStateDiffsInRange(start, stop, params)
}

View File

@ -35,6 +35,7 @@ import (
sdtrie "github.com/ethereum/go-ethereum/statediff/trie"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie"
"github.com/sirupsen/logrus"
iter "github.com/vulcanize/go-eth-state-node-iterator"
)
@ -206,7 +207,7 @@ func (sdb *builder) BuildStateDiffObject(args sd.Args, params sd.Params) (sd.Sta
}, nil
}
// Writes a statediff object to output callback
// WriteStateDiffObject writes a statediff object to output callback
func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error {
if len(params.WatchedAddresses) > 0 {
// if we are watching only specific accounts then we are only diffing leaf nodes
@ -234,8 +235,8 @@ func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, o
var iterPairs [][]iterPair
for i := uint(0); i < sdb.numWorkers; i++ {
iterPairs = append(iterPairs, []iterPair{
iterPair{older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)},
iterPair{older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)},
{older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)},
{older: oldIterFac.IteratorAt(i), newer: newIterFac.IteratorAt(i)},
})
}
@ -252,7 +253,9 @@ func (sdb *builder) WriteStateDiffObject(args sd.StateRoots, params sd.Params, o
wg.Add(1)
go func(worker uint) {
defer wg.Done()
sdb.buildStateDiff(iterPairs[worker], params, nodeSender, codeSender)
if err := sdb.buildStateDiff(iterPairs[worker], params, nodeSender, codeSender); err != nil {
logrus.Errorf("buildStateDiff error for worker %d, pparams %+v", worker, params)
}
}(w)
}
wg.Wait()

9
pkg/config.go Normal file
View File

@ -0,0 +1,9 @@
package statediff
// Config holds config params for the statediffing service
type Config struct {
ServiceWorkers uint
TrieWorkers uint
WorkerQueueSize uint
PreRuns []RangeRequest
}

View File

@ -29,6 +29,7 @@ const statsSubsystem = "stats"
var (
metrics bool
queuedRanges prometheus.Gauge
lastLoadedHeight prometheus.Gauge
lastProcessedHeight prometheus.Gauge
@ -39,6 +40,7 @@ var (
)
const (
RANGES_QUEUED = "ranges_queued"
LOADED_HEIGHT = "loaded_height"
PROCESSED_HEIGHT = "processed_height"
T_BLOCK_LOAD = "t_block_load"
@ -51,6 +53,11 @@ const (
func Init() {
metrics = true
queuedRanges = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: RANGES_QUEUED,
Help: "Number of range requests currently queued",
})
lastLoadedHeight = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: LOADED_HEIGHT,
@ -88,13 +95,27 @@ func Init() {
})
}
// RegisterDBCollector create metric colletor for given connection
// RegisterDBCollector create metric collector for given connection
func RegisterDBCollector(name string, db *sqlx.DB) {
if metrics {
prometheus.Register(NewDBStatsCollector(name, db))
}
}
// IncQueuedRanges increments the number of queued range requests
func IncQueuedRanges() {
if metrics {
queuedRanges.Inc()
}
}
// DecQueuedRanges decrements the number of queued range requests
func DecQueuedRanges() {
if metrics {
queuedRanges.Dec()
}
}
// SetLastLoadedHeight sets last loaded height
func SetLastLoadedHeight(height int64) {
if metrics {

View File

@ -28,23 +28,32 @@ import (
"github.com/ethereum/go-ethereum/trie"
)
// LvlDBReader exposes the necessary read functions on lvldb
// Reader interface required by the statediffing service
type Reader interface {
GetBlockByHash(hash common.Hash) (*types.Block, error)
GetBlockByNumber(number uint64) (*types.Block, error)
GetReceiptsByHash(hash common.Hash) (types.Receipts, error)
GetTdByHash(hash common.Hash) (*big.Int, error)
StateDB() state.Database
}
// LvlDBReader exposes the necessary Reader methods on lvldb
type LvlDBReader struct {
ethDB ethdb.Database
stateDB state.Database
chainConfig *params.ChainConfig
}
// ReaderConfig struct for initializing a LvlDBReader
type ReaderConfig struct {
// LvLDBReaderConfig struct for initializing a LvlDBReader
type LvLDBReaderConfig struct {
TrieConfig *trie.Config
ChainConfig *params.ChainConfig
Path, AncientPath string
DBCacheSize int
}
// NewLvlDBReader creates a new LvlDBReader
func NewLvlDBReader(conf ReaderConfig) (*LvlDBReader, error) {
// NewLvlDBReader creates a new Read using LevelDB
func NewLvlDBReader(conf LvLDBReaderConfig) (*LvlDBReader, error) {
edb, err := rawdb.NewLevelDBDatabaseWithFreezer(conf.Path, conf.DBCacheSize, 256, conf.AncientPath, "eth-statediff-service", true)
if err != nil {
return nil, err

View File

@ -23,7 +23,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
@ -37,34 +36,29 @@ import (
ind "github.com/ethereum/go-ethereum/statediff/indexer"
)
// lvlDBReader are the db interfaces required by the statediffing service
type lvlDBReader interface {
GetBlockByHash(hash common.Hash) (*types.Block, error)
GetBlockByNumber(number uint64) (*types.Block, error)
GetReceiptsByHash(hash common.Hash) (types.Receipts, error)
GetTdByHash(hash common.Hash) (*big.Int, error)
StateDB() state.Database
}
const defaultQueueSize = 1024
// IService is the state-diffing service interface
type IService interface {
// Start() and Stop()
// StateDiffService is the state-diffing service interface
type StateDiffService interface {
// Lifecycle Start() and Stop()
node.Lifecycle
// For node service registration
// APIs and Protocols() interface for node service registration
APIs() []rpc.API
Protocols() []p2p.Protocol
// Main event loop for processing state diffs
Loop(wg *sync.WaitGroup)
// Method to get state diff object at specific block
// Loop is the main event loop for processing state diffs
Loop(wg *sync.WaitGroup) error
// StateDiffAt method to get state diff object at specific block
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// Method to get state diff object at specific block
// StateDiffFor method to get state diff object at specific block
StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.Payload, error)
// Method to get state trie object at specific block
// StateTrieAt method to get state trie object at specific block
StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// Method to write state diff object directly to DB
// WriteStateDiffAt method to write state diff object directly to DB
WriteStateDiffAt(blockNumber uint64, params sd.Params) error
// Method to get state trie object at specific block
// WriteStateDiffFor method to get state trie object at specific block
WriteStateDiffFor(blockHash common.Hash, params sd.Params) error
// WriteStateDiffsInRange method to wrtie state diff objects within the range directly to the DB
WriteStateDiffsInRange(start, stop uint64, params sd.Params) error
}
// Service is the underlying struct for the state diffing service
@ -72,24 +66,35 @@ type Service struct {
// Used to build the state diff objects
Builder Builder
// Used to read data from leveldb
lvlDBReader lvlDBReader
lvlDBReader Reader
// Used to signal shutdown of the service
QuitChan chan bool
quitChan chan struct{}
// Interface for publishing statediffs as PG-IPLD objects
indexer ind.Indexer
// range queue
queue chan RangeRequest
// number of ranges we can work over concurrently
workers uint
// ranges configured locally
preruns []RangeRequest
}
// NewStateDiffService creates a new Service
func NewStateDiffService(lvlDBReader lvlDBReader, indexer ind.Indexer, workers uint) (*Service, error) {
builder, err := NewBuilder(lvlDBReader.StateDB(), workers)
func NewStateDiffService(lvlDBReader Reader, indexer ind.Indexer, conf Config) (*Service, error) {
builder, err := NewBuilder(lvlDBReader.StateDB(), conf.TrieWorkers)
if err != nil {
return nil, err
}
if conf.WorkerQueueSize == 0 {
conf.WorkerQueueSize = defaultQueueSize
}
return &Service{
lvlDBReader: lvlDBReader,
Builder: builder,
QuitChan: make(chan bool),
indexer: indexer,
workers: conf.ServiceWorkers,
queue: make(chan RangeRequest, conf.WorkerQueueSize),
preruns: conf.PreRuns,
}, nil
}
@ -111,16 +116,48 @@ func (sds *Service) APIs() []rpc.API {
}
// Loop is an empty service loop for awaiting rpc requests
func (sds *Service) Loop(wg *sync.WaitGroup) {
wg.Add(1)
for {
select {
case <-sds.QuitChan:
logrus.Info("closing the statediff service loop")
wg.Done()
return
func (sds *Service) Loop(wg *sync.WaitGroup) error {
if sds.quitChan != nil {
return fmt.Errorf("service loop is already running")
}
sds.quitChan = make(chan struct{})
for i := 0; i < int(sds.workers); i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case blockRange := <-sds.queue:
prom.DecQueuedRanges()
for j := blockRange.Start; j <= blockRange.Stop; j++ {
if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
logrus.Errorf("service worker %d error writing statediff at height %d in range (%d, %d) : %v", id, j, blockRange.Start, blockRange.Stop, err)
}
select {
case <-sds.quitChan:
logrus.Infof("closing service worker %d\n"+
"working in range (%d, %d)\n"+
"last processed height: %d", id, blockRange.Start, blockRange.Stop, j)
return
default:
logrus.Infof("service worker %d finished processing statediff height %d in range (%d, %d)", id, j, blockRange.Start, blockRange.Stop)
}
}
logrus.Infof("service worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop)
case <-sds.quitChan:
logrus.Infof("closing the statediff service loop worker %d", id)
return
}
}
}(i)
}
for _, preRun := range sds.preruns {
if err := sds.WriteStateDiffsInRange(preRun.Start, preRun.Stop, preRun.Params); err != nil {
close(sds.quitChan)
return err
}
}
return nil
}
// StateDiffAt returns a state diff object payload at the specific blockheight
@ -237,14 +274,13 @@ func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd.
// Start is used to begin the service
func (sds *Service) Start() error {
logrus.Info("starting statediff service")
go sds.Loop(new(sync.WaitGroup))
return nil
return sds.Loop(new(sync.WaitGroup))
}
// Stop is used to close down the service
func (sds *Service) Stop() error {
logrus.Info("stopping statediff service")
close(sds.QuitChan)
close(sds.quitChan)
return nil
}
@ -335,3 +371,19 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
prom.SetTimeMetric(prom.T_POSTGRES_TX_COMMIT, time.Now().Sub(t))
return err
}
// WriteStateDiffsInRange adds a RangeRequest to the work queue
func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params sd.Params) error {
if stop < start {
return fmt.Errorf("invalid block range (%d, %d): stop height must be greater or equal to start height", start, stop)
}
blocked := time.NewTimer(30 * time.Second)
select {
case sds.queue <- RangeRequest{Start: start, Stop: stop, Params: params}:
prom.IncQueuedRanges()
logrus.Infof("added range (%d, %d) to the worker queue", start, stop)
return nil
case <-blocked.C:
return fmt.Errorf("unable to add range (%d, %d) to the worker queue", start, stop)
}
}

View File

@ -21,7 +21,8 @@ package statediff
import (
"github.com/ethereum/go-ethereum/core/types"
sd "github.com/ethereum/go-ethereum/statediff/types"
sd "github.com/ethereum/go-ethereum/statediff"
sdTypes "github.com/ethereum/go-ethereum/statediff/types"
)
// AccountMap is a mapping of hex encoded path => account wrapper
@ -30,8 +31,14 @@ type AccountMap map[string]accountWrapper
// accountWrapper is used to temporary associate the unpacked node with its raw values
type accountWrapper struct {
Account *types.StateAccount
NodeType sd.NodeType
NodeType sdTypes.NodeType
Path []byte
NodeValue []byte
LeafKey []byte
}
// RangeRequest holds range quest work params
type RangeRequest struct {
Start, Stop uint64
Params sd.Params
}