Refactor to use plugeth-statediff #1

Merged
roysc merged 19 commits from refactor-use-plugin into v5 2023-09-28 04:04:01 +00:00
7 changed files with 51 additions and 50 deletions
Showing only changes of commit f15ec1cd07 - Show all commits

View File

@ -299,7 +299,7 @@ func getConfig(nodeInfo node.Info) (interfaces.Config, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
logWithCommand.Infof("Configuring service for database type: %s", dbType) logWithCommand.Debugf("Configuring service for database type: %s", dbType)
var indexerConfig interfaces.Config var indexerConfig interfaces.Config
switch dbType { switch dbType {
case shared.FILE: case shared.FILE:

View File

@ -28,7 +28,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
sd "github.com/cerc-io/eth-statediff-service/pkg" pkg "github.com/cerc-io/eth-statediff-service/pkg"
srpc "github.com/cerc-io/eth-statediff-service/pkg/rpc" srpc "github.com/cerc-io/eth-statediff-service/pkg/rpc"
) )
@ -60,20 +60,12 @@ func maxParallelism() int {
} }
func serve() { func serve() {
logWithCommand.Info("Running eth-statediff-service serve command") logWithCommand.Debug("Running eth-statediff-service serve command")
logWithCommand.Infof("Parallelism: %d", maxParallelism()) logWithCommand.Debugf("Parallelism: %d", maxParallelism())
reader, chainConf, nodeInfo := instantiateLevelDBReader() reader, chainConf, nodeInfo := instantiateLevelDBReader()
// report latest block info reportLatestBlock(reader)
header, err := reader.GetLatestHeader()
if err != nil {
logWithCommand.Fatalf("Unable to determine latest header height and hash: %s", err.Error())
}
if header.Number == nil {
logWithCommand.Fatal("Latest header found in levelDB has a nil block height")
}
logWithCommand.Infof("Latest block found in the levelDB\r\nheight: %s, hash: %s", header.Number.String(), header.Hash().Hex())
service, err := createStateDiffService(reader, chainConf, nodeInfo) service, err := createStateDiffService(reader, chainConf, nodeInfo)
if err != nil { if err != nil {
@ -100,16 +92,15 @@ func serve() {
} }
// start service and servers // start service and servers
logWithCommand.Info("Starting statediff service")
var wg sync.WaitGroup var wg sync.WaitGroup
if err := service.Loop(&wg); err != nil { if err := service.Loop(&wg); err != nil {
logWithCommand.Fatalf("unable to start statediff service: %v", err) logWithCommand.Fatalf("unable to start statediff service: %v", err)
} }
logWithCommand.Info("Starting RPC servers")
if err := startServers(service); err != nil { if err := startServers(service); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
logWithCommand.Info("RPC servers successfully spun up; awaiting requests") logWithCommand.Debug("RPC servers successfully spun up; awaiting requests")
// clean shutdown // clean shutdown
shutdown := make(chan os.Signal) shutdown := make(chan os.Signal)
@ -120,21 +111,19 @@ func serve() {
wg.Wait() wg.Wait()
} }
func startServers(serv *sd.Service) error { func startServers(serv *pkg.Service) error {
ipcPath := viper.GetString("server.ipcPath") ipcPath := viper.GetString("server.ipcPath")
httpPath := viper.GetString("server.httpPath") httpPath := viper.GetString("server.httpPath")
if ipcPath == "" && httpPath == "" { if ipcPath == "" && httpPath == "" {
logWithCommand.Fatal("Need an ipc path and/or an http path") logWithCommand.Fatal("Need an IPC path and/or an HTTP path")
} }
if ipcPath != "" { if ipcPath != "" {
logWithCommand.Info("Starting up IPC server")
_, _, err := srpc.StartIPCEndpoint(ipcPath, serv.APIs()) _, _, err := srpc.StartIPCEndpoint(ipcPath, serv.APIs())
if err != nil { if err != nil {
return err return err
} }
} }
if httpPath != "" { if httpPath != "" {
logWithCommand.Info("Starting up HTTP server")
_, err := srpc.StartHTTPEndpoint(httpPath, serv.APIs(), []string{"statediff"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) _, err := srpc.StartHTTPEndpoint(httpPath, serv.APIs(), []string{"statediff"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
if err != nil { if err != nil {
return err return err

View File

@ -42,13 +42,5 @@ func stats() {
logWithCommand.Info("Running eth-statediff-service stats command") logWithCommand.Info("Running eth-statediff-service stats command")
reader, _, _ := instantiateLevelDBReader() reader, _, _ := instantiateLevelDBReader()
reportLatestBlock(reader)
header, err := reader.GetLatestHeader()
if err != nil {
logWithCommand.Fatalf("Unable to determine latest header height and hash: %s", err.Error())
}
if header.Number == nil {
logWithCommand.Fatal("Latest header found in levelDB has a nil block height")
}
logWithCommand.Infof("Latest block found in the levelDB\r\nheight: %s, hash: %s", header.Number.String(), header.Hash().Hex())
} }

View File

@ -6,7 +6,7 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/indexer" "github.com/cerc-io/plugeth-statediff/indexer"
"github.com/cerc-io/plugeth-statediff/indexer/node" "github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
@ -24,13 +24,13 @@ type blockRange [2]uint64
func createStateDiffService(lvlDBReader pkg.Reader, chainConf *params.ChainConfig, nodeInfo node.Info) (*pkg.Service, error) { func createStateDiffService(lvlDBReader pkg.Reader, chainConf *params.ChainConfig, nodeInfo node.Info) (*pkg.Service, error) {
// create statediff service // create statediff service
logWithCommand.Info("Setting up database") logWithCommand.Debug("Setting up database")
conf, err := getConfig(nodeInfo) conf, err := getConfig(nodeInfo)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
logWithCommand.Info("Creating statediff indexer") logWithCommand.Debug("Creating statediff indexer")
db, indexer, err := indexer.NewStateDiffIndexer(context.Background(), chainConf, nodeInfo, conf) db, indexer, err := indexer.NewStateDiffIndexer(context.Background(), chainConf, nodeInfo, conf)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
@ -39,7 +39,7 @@ func createStateDiffService(lvlDBReader pkg.Reader, chainConf *params.ChainConfi
prom.RegisterDBCollector(viper.GetString("database.name"), db) prom.RegisterDBCollector(viper.GetString("database.name"), db)
} }
logWithCommand.Info("Creating statediff service") logWithCommand.Debug("Creating statediff service")
sdConf := pkg.ServiceConfig{ sdConf := pkg.ServiceConfig{
ServiceWorkers: viper.GetUint("statediff.serviceWorkers"), ServiceWorkers: viper.GetUint("statediff.serviceWorkers"),
TrieWorkers: viper.GetUint("statediff.trieWorkers"), TrieWorkers: viper.GetUint("statediff.trieWorkers"),
@ -93,7 +93,7 @@ func setupPreRunRanges() []pkg.RangeRequest {
func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) { func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) {
file, err := os.Open(chainConfigPath) file, err := os.Open(chainConfigPath)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("Failed to read chain config file: %v", err)) log.Error("Failed to read chain config file", "error", err)
return nil, err return nil, err
} }
@ -101,19 +101,19 @@ func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) {
chainConfig := new(params.ChainConfig) chainConfig := new(params.ChainConfig)
if err := json.NewDecoder(file).Decode(chainConfig); err != nil { if err := json.NewDecoder(file).Decode(chainConfig); err != nil {
log.Error(fmt.Sprintf("invalid chain config file: %v", err)) log.Error("invalid chain config file", "error", err)
return nil, err return nil, err
} }
log.Info(fmt.Sprintf("Using chain config from %s file. Content %+v", chainConfigPath, chainConfig)) log.Debug(fmt.Sprintf("Using chain config from '%s'. Content: %+v", chainConfigPath, chainConfig))
return chainConfig, nil return chainConfig, nil
} }
func instantiateLevelDBReader() (pkg.Reader, *params.ChainConfig, node.Info) { func instantiateLevelDBReader() (pkg.Reader, *params.ChainConfig, node.Info) {
// load some necessary params // load some necessary params
logWithCommand.Info("Loading statediff service parameters") logWithCommand.Debug("Loading statediff service parameters")
mode := viper.GetString("leveldb.mode") mode := viper.GetString("leveldb.mode")
path := viper.GetString("leveldb.path") path := viper.GetString("leveldb.path")
ancientPath := viper.GetString("leveldb.ancient") ancientPath := viper.GetString("leveldb.ancient")
@ -140,7 +140,7 @@ func instantiateLevelDBReader() (pkg.Reader, *params.ChainConfig, node.Info) {
} }
// create LevelDB reader // create LevelDB reader
logWithCommand.Info("Creating LevelDB reader") logWithCommand.Debug("Creating LevelDB reader")
readerConf := pkg.LvLDBReaderConfig{ readerConf := pkg.LvLDBReaderConfig{
TrieConfig: &trie.Config{ TrieConfig: &trie.Config{
Cache: viper.GetInt("cache.trie"), Cache: viper.GetInt("cache.trie"),
@ -160,3 +160,18 @@ func instantiateLevelDBReader() (pkg.Reader, *params.ChainConfig, node.Info) {
} }
return reader, chainConf, nodeInfo return reader, chainConf, nodeInfo
} }
// report latest block info
func reportLatestBlock(reader pkg.Reader) {
header, err := reader.GetLatestHeader()
if err != nil {
logWithCommand.Fatalf("Unable to determine latest header height and hash: %s", err.Error())
}
if header.Number == nil {
logWithCommand.Fatal("Latest header found in levelDB has a nil block height")
}
logWithCommand.
WithField("height", header.Number).
WithField("hash", header.Hash()).
Info("Latest block found in levelDB")
}

View File

@ -79,7 +79,7 @@ func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Se
if err := handler.RegisterName(api.Namespace, api.Service); err != nil { if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err return nil, nil, err
} }
log.Debug("IPC registered", "namespace", api.Namespace) log.WithField("namespace", api.Namespace).Debug("IPC server registered")
} }
// All APIs registered, start the IPC listener. // All APIs registered, start the IPC listener.
listener, err := ipcListen(ipcEndpoint) listener, err := ipcListen(ipcEndpoint)

View File

@ -189,25 +189,24 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error {
for { for {
select { select {
case blockRange := <-sds.queue: case blockRange := <-sds.queue:
logrus.Infof("service worker %d received range (%d, %d) off of work queue, beginning processing", id, blockRange.Start, blockRange.Stop) log := logrus.WithField("range", blockRange).WithField("worker", id)
log.Debug("processing range")
prom.DecQueuedRanges() prom.DecQueuedRanges()
for j := blockRange.Start; j <= blockRange.Stop; j++ { for j := blockRange.Start; j <= blockRange.Stop; j++ {
if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil { if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
logrus.Errorf("service worker %d error writing statediff at height %d in range (%d, %d) : %v", id, j, blockRange.Start, blockRange.Stop, err) log.Errorf("error writing statediff at block %d: %v", j, err)
} }
select { select {
case <-sds.quitChan: case <-sds.quitChan:
logrus.Infof("closing service worker %d\n"+ log.Infof("closing service worker (last processed block: %d)", j)
"working in range (%d, %d)\n"+
"last processed height: %d", id, blockRange.Start, blockRange.Stop, j)
return return
default: default:
logrus.Infof("service worker %d finished processing statediff height %d in range (%d, %d)", id, j, blockRange.Start, blockRange.Stop) log.Infof("Finished processing block %d", j)
} }
} }
logrus.Infof("service worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop) log.Debugf("Finished processing range")
case <-sds.quitChan: case <-sds.quitChan:
logrus.Infof("closing the statediff service loop worker %d", id) logrus.Debugf("closing the statediff service loop worker %d", id)
return return
} }
} }
@ -251,7 +250,7 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params statediff.Params)
if err != nil { if err != nil {
return nil, err return nil, err
} }
logrus.Infof("sending state diff at block %s", blockHash.Hex()) logrus.Infof("sending state diff at block %s", blockHash)
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -359,7 +358,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params statediff.Params
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params statediff.Params) error { func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params statediff.Params) error {
logrus.Infof("Writing state diff for block %s", blockHash.Hex()) logrus.Infof("Writing state diff for block %s", blockHash)
t := time.Now() t := time.Now()
currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash) currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash)
if err != nil { if err != nil {
@ -437,7 +436,7 @@ func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params statediff.
select { select {
case sds.queue <- RangeRequest{Start: start, Stop: stop, Params: params}: case sds.queue <- RangeRequest{Start: start, Stop: stop, Params: params}:
prom.IncQueuedRanges() prom.IncQueuedRanges()
logrus.Infof("added range (%d, %d) to the worker queue", start, stop) logrus.Infof("Added range (%d, %d) to the worker queue", start, stop)
return nil return nil
case <-blocked.C: case <-blocked.C:
return fmt.Errorf("unable to add range (%d, %d) to the worker queue", start, stop) return fmt.Errorf("unable to add range (%d, %d) to the worker queue", start, stop)

View File

@ -20,6 +20,8 @@
package statediff package statediff
import ( import (
"fmt"
sd "github.com/cerc-io/plugeth-statediff" sd "github.com/cerc-io/plugeth-statediff"
) )
@ -28,3 +30,7 @@ type RangeRequest struct {
Start, Stop uint64 Start, Stop uint64
Params sd.Params Params sd.Params
} }
func (r RangeRequest) String() string {
return fmt.Sprintf("[%d,%d]", r.Start, r.Stop)
}