diff --git a/cmd/root.go b/cmd/root.go index 2badc52..a755324 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -299,7 +299,7 @@ func getConfig(nodeInfo node.Info) (interfaces.Config, error) { if err != nil { return nil, err } - logWithCommand.Infof("Configuring service for database type: %s", dbType) + logWithCommand.Debugf("Configuring service for database type: %s", dbType) var indexerConfig interfaces.Config switch dbType { case shared.FILE: diff --git a/cmd/serve.go b/cmd/serve.go index 1713eea..faada92 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -28,7 +28,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - sd "github.com/cerc-io/eth-statediff-service/pkg" + pkg "github.com/cerc-io/eth-statediff-service/pkg" srpc "github.com/cerc-io/eth-statediff-service/pkg/rpc" ) @@ -60,20 +60,12 @@ func maxParallelism() int { } func serve() { - logWithCommand.Info("Running eth-statediff-service serve command") - logWithCommand.Infof("Parallelism: %d", maxParallelism()) + logWithCommand.Debug("Running eth-statediff-service serve command") + logWithCommand.Debugf("Parallelism: %d", maxParallelism()) reader, chainConf, nodeInfo := instantiateLevelDBReader() - // report latest block info - header, err := reader.GetLatestHeader() - if err != nil { - logWithCommand.Fatalf("Unable to determine latest header height and hash: %s", err.Error()) - } - if header.Number == nil { - logWithCommand.Fatal("Latest header found in levelDB has a nil block height") - } - logWithCommand.Infof("Latest block found in the levelDB\r\nheight: %s, hash: %s", header.Number.String(), header.Hash().Hex()) + reportLatestBlock(reader) service, err := createStateDiffService(reader, chainConf, nodeInfo) if err != nil { @@ -100,16 +92,15 @@ func serve() { } // start service and servers - logWithCommand.Info("Starting statediff service") var wg sync.WaitGroup if err := service.Loop(&wg); err != nil { logWithCommand.Fatalf("unable to start statediff service: %v", err) } - logWithCommand.Info("Starting RPC servers") + if err := startServers(service); err != nil { logWithCommand.Fatal(err) } - logWithCommand.Info("RPC servers successfully spun up; awaiting requests") + logWithCommand.Debug("RPC servers successfully spun up; awaiting requests") // clean shutdown shutdown := make(chan os.Signal) @@ -120,21 +111,19 @@ func serve() { wg.Wait() } -func startServers(serv *sd.Service) error { +func startServers(serv *pkg.Service) error { ipcPath := viper.GetString("server.ipcPath") httpPath := viper.GetString("server.httpPath") if ipcPath == "" && httpPath == "" { - logWithCommand.Fatal("Need an ipc path and/or an http path") + logWithCommand.Fatal("Need an IPC path and/or an HTTP path") } if ipcPath != "" { - logWithCommand.Info("Starting up IPC server") _, _, err := srpc.StartIPCEndpoint(ipcPath, serv.APIs()) if err != nil { return err } } if httpPath != "" { - logWithCommand.Info("Starting up HTTP server") _, err := srpc.StartHTTPEndpoint(httpPath, serv.APIs(), []string{"statediff"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) if err != nil { return err diff --git a/cmd/stats.go b/cmd/stats.go index abf91f7..3d494f2 100644 --- a/cmd/stats.go +++ b/cmd/stats.go @@ -42,13 +42,5 @@ func stats() { logWithCommand.Info("Running eth-statediff-service stats command") reader, _, _ := instantiateLevelDBReader() - - header, err := reader.GetLatestHeader() - if err != nil { - logWithCommand.Fatalf("Unable to determine latest header height and hash: %s", err.Error()) - } - if header.Number == nil { - logWithCommand.Fatal("Latest header found in levelDB has a nil block height") - } - logWithCommand.Infof("Latest block found in the levelDB\r\nheight: %s, hash: %s", header.Number.String(), header.Hash().Hex()) + reportLatestBlock(reader) } diff --git a/cmd/util.go b/cmd/util.go index bb620d8..78c3a64 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -6,7 +6,7 @@ import ( "fmt" "os" - "github.com/cerc-io/plugeth-statediff" + statediff "github.com/cerc-io/plugeth-statediff" "github.com/cerc-io/plugeth-statediff/indexer" "github.com/cerc-io/plugeth-statediff/indexer/node" "github.com/cerc-io/plugeth-statediff/indexer/shared" @@ -24,13 +24,13 @@ type blockRange [2]uint64 func createStateDiffService(lvlDBReader pkg.Reader, chainConf *params.ChainConfig, nodeInfo node.Info) (*pkg.Service, error) { // create statediff service - logWithCommand.Info("Setting up database") + logWithCommand.Debug("Setting up database") conf, err := getConfig(nodeInfo) if err != nil { logWithCommand.Fatal(err) } - logWithCommand.Info("Creating statediff indexer") + logWithCommand.Debug("Creating statediff indexer") db, indexer, err := indexer.NewStateDiffIndexer(context.Background(), chainConf, nodeInfo, conf) if err != nil { logWithCommand.Fatal(err) @@ -39,7 +39,7 @@ func createStateDiffService(lvlDBReader pkg.Reader, chainConf *params.ChainConfi prom.RegisterDBCollector(viper.GetString("database.name"), db) } - logWithCommand.Info("Creating statediff service") + logWithCommand.Debug("Creating statediff service") sdConf := pkg.ServiceConfig{ ServiceWorkers: viper.GetUint("statediff.serviceWorkers"), TrieWorkers: viper.GetUint("statediff.trieWorkers"), @@ -93,7 +93,7 @@ func setupPreRunRanges() []pkg.RangeRequest { func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) { file, err := os.Open(chainConfigPath) if err != nil { - log.Error(fmt.Sprintf("Failed to read chain config file: %v", err)) + log.Error("Failed to read chain config file", "error", err) return nil, err } @@ -101,19 +101,19 @@ func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) { chainConfig := new(params.ChainConfig) if err := json.NewDecoder(file).Decode(chainConfig); err != nil { - log.Error(fmt.Sprintf("invalid chain config file: %v", err)) + log.Error("invalid chain config file", "error", err) return nil, err } - log.Info(fmt.Sprintf("Using chain config from %s file. Content %+v", chainConfigPath, chainConfig)) + log.Debug(fmt.Sprintf("Using chain config from '%s'. Content: %+v", chainConfigPath, chainConfig)) return chainConfig, nil } func instantiateLevelDBReader() (pkg.Reader, *params.ChainConfig, node.Info) { // load some necessary params - logWithCommand.Info("Loading statediff service parameters") + logWithCommand.Debug("Loading statediff service parameters") mode := viper.GetString("leveldb.mode") path := viper.GetString("leveldb.path") ancientPath := viper.GetString("leveldb.ancient") @@ -140,7 +140,7 @@ func instantiateLevelDBReader() (pkg.Reader, *params.ChainConfig, node.Info) { } // create LevelDB reader - logWithCommand.Info("Creating LevelDB reader") + logWithCommand.Debug("Creating LevelDB reader") readerConf := pkg.LvLDBReaderConfig{ TrieConfig: &trie.Config{ Cache: viper.GetInt("cache.trie"), @@ -160,3 +160,18 @@ func instantiateLevelDBReader() (pkg.Reader, *params.ChainConfig, node.Info) { } return reader, chainConf, nodeInfo } + +// report latest block info +func reportLatestBlock(reader pkg.Reader) { + header, err := reader.GetLatestHeader() + if err != nil { + logWithCommand.Fatalf("Unable to determine latest header height and hash: %s", err.Error()) + } + if header.Number == nil { + logWithCommand.Fatal("Latest header found in levelDB has a nil block height") + } + logWithCommand. + WithField("height", header.Number). + WithField("hash", header.Hash()). + Info("Latest block found in levelDB") +} diff --git a/pkg/reader.go b/pkg/reader.go index e79cdce..21e506b 100644 --- a/pkg/reader.go +++ b/pkg/reader.go @@ -90,11 +90,11 @@ func NewLvlDBReader(conf LvLDBReaderConfig) (*LvlDBReader, error) { func (ldr *LvlDBReader) GetBlockByHash(hash common.Hash) (*types.Block, error) { height := rawdb.ReadHeaderNumber(ldr.ethDB, hash) if height == nil { - return nil, fmt.Errorf("unable to read header height for header hash %s", hash.String()) + return nil, fmt.Errorf("unable to read header height for header hash %s", hash) } block := rawdb.ReadBlock(ldr.ethDB, hash, *height) if block == nil { - return nil, fmt.Errorf("unable to read block at height %d hash %s", *height, hash.String()) + return nil, fmt.Errorf("unable to read block at height %d hash %s", *height, hash) } return block, nil } @@ -103,7 +103,7 @@ func (ldr *LvlDBReader) GetBlockByNumber(number uint64) (*types.Block, error) { hash := rawdb.ReadCanonicalHash(ldr.ethDB, number) block := rawdb.ReadBlock(ldr.ethDB, hash, number) if block == nil { - return nil, fmt.Errorf("unable to read block at height %d hash %s", number, hash.String()) + return nil, fmt.Errorf("unable to read block at height %d hash %s", number, hash) } return block, nil } @@ -112,11 +112,11 @@ func (ldr *LvlDBReader) GetBlockByNumber(number uint64) (*types.Block, error) { func (ldr *LvlDBReader) GetReceiptsByHash(hash common.Hash) (types.Receipts, error) { number := rawdb.ReadHeaderNumber(ldr.ethDB, hash) if number == nil { - return nil, fmt.Errorf("unable to read header height for header hash %s", hash.String()) + return nil, fmt.Errorf("unable to read header height for header hash %s", hash) } receipts := rawdb.ReadReceipts(ldr.ethDB, hash, *number, ldr.chainConfig) if receipts == nil { - return nil, fmt.Errorf("unable to read receipts at height %d hash %s", number, hash.String()) + return nil, fmt.Errorf("unable to read receipts at height %d hash %s", number, hash) } return receipts, nil } @@ -125,11 +125,11 @@ func (ldr *LvlDBReader) GetReceiptsByHash(hash common.Hash) (types.Receipts, err func (ldr *LvlDBReader) GetTdByHash(hash common.Hash) (*big.Int, error) { number := rawdb.ReadHeaderNumber(ldr.ethDB, hash) if number == nil { - return nil, fmt.Errorf("unable to read header height for header hash %s", hash.String()) + return nil, fmt.Errorf("unable to read header height for header hash %s", hash) } td := rawdb.ReadTd(ldr.ethDB, hash, *number) if td == nil { - return nil, fmt.Errorf("unable to read total difficulty at height %d hash %s", number, hash.String()) + return nil, fmt.Errorf("unable to read total difficulty at height %d hash %s", number, hash) } return td, nil } diff --git a/pkg/rpc/ipc.go b/pkg/rpc/ipc.go index 57c29a5..3dd8a1b 100644 --- a/pkg/rpc/ipc.go +++ b/pkg/rpc/ipc.go @@ -79,7 +79,7 @@ func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Se if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return nil, nil, err } - log.Debug("IPC registered", "namespace", api.Namespace) + log.WithField("namespace", api.Namespace).Debug("IPC server registered") } // All APIs registered, start the IPC listener. listener, err := ipcListen(ipcEndpoint) diff --git a/pkg/service.go b/pkg/service.go index 5afb472..69a893a 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -212,25 +212,24 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error { for { select { case blockRange := <-sds.queue: - logrus.Infof("service worker %d received range (%d, %d) off of work queue, beginning processing", id, blockRange.Start, blockRange.Stop) + log := logrus.WithField("range", blockRange).WithField("worker", id) + log.Debug("processing range") prom.DecQueuedRanges() for j := blockRange.Start; j <= blockRange.Stop; j++ { if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil { - logrus.Errorf("service worker %d error writing statediff at height %d in range (%d, %d) : %v", id, j, blockRange.Start, blockRange.Stop, err) + log.Errorf("error writing statediff at block %d: %v", j, err) } select { case <-sds.quitChan: - logrus.Infof("closing service worker %d\n"+ - "working in range (%d, %d)\n"+ - "last processed height: %d", id, blockRange.Start, blockRange.Stop, j) + log.Infof("closing service worker (last processed block: %d)", j) return default: - logrus.Infof("service worker %d finished processing statediff height %d in range (%d, %d)", id, j, blockRange.Start, blockRange.Stop) + log.Infof("Finished processing block %d", j) } } - logrus.Infof("service worker %d finished processing range (%d, %d)", id, blockRange.Start, blockRange.Stop) + log.Debugf("Finished processing range") case <-sds.quitChan: - logrus.Infof("closing the statediff service loop worker %d", id) + logrus.Debugf("closing the statediff service loop worker %d", id) return } } @@ -274,7 +273,7 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params statediff.Params) if err != nil { return nil, err } - logrus.Infof("sending state diff at block %s", blockHash.Hex()) + logrus.Infof("sending state diff at block %s", blockHash) // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() @@ -382,7 +381,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params statediff.Params // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params statediff.Params) error { - logrus.Infof("Writing state diff for block %s", blockHash.Hex()) + logrus.Infof("Writing state diff for block %s", blockHash) t := time.Now() currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash) if err != nil { @@ -460,7 +459,7 @@ func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params statediff. select { case sds.queue <- RangeRequest{Start: start, Stop: stop, Params: params}: prom.IncQueuedRanges() - logrus.Infof("added range (%d, %d) to the worker queue", start, stop) + logrus.Infof("Added range (%d, %d) to the worker queue", start, stop) return nil case <-blocked.C: return fmt.Errorf("unable to add range (%d, %d) to the worker queue", start, stop) diff --git a/pkg/types.go b/pkg/types.go index 1fffcb6..6e6e1fa 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -20,6 +20,8 @@ package statediff import ( + "fmt" + sd "github.com/cerc-io/plugeth-statediff" ) @@ -28,3 +30,7 @@ type RangeRequest struct { Start, Stop uint64 Params sd.Params } + +func (r RangeRequest) String() string { + return fmt.Sprintf("[%d,%d]", r.Start, r.Stop) +}