cleanup, fix metrics
This commit is contained in:
parent
ddfc3e6509
commit
83b0c11b25
30
builder.go
30
builder.go
@ -340,7 +340,7 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
|
|||||||
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.AccountMap, updatedKeys []string,
|
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.AccountMap, updatedKeys []string,
|
||||||
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
|
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
|
||||||
logger.Debug("statediff BEGIN buildAccountUpdates",
|
logger.Debug("statediff BEGIN buildAccountUpdates",
|
||||||
"creations", len(creations), "deletions", len(deletions), "updatedKeys", len(updatedKeys))
|
"creations", len(creations), "deletions", len(deletions), "updated", len(updatedKeys))
|
||||||
defer metrics.ReportAndUpdateDuration("statediff END buildAccountUpdates ",
|
defer metrics.ReportAndUpdateDuration("statediff END buildAccountUpdates ",
|
||||||
time.Now(), logger, metrics.IndexerMetrics.BuildAccountUpdatesTimer)
|
time.Now(), logger, metrics.IndexerMetrics.BuildAccountUpdatesTimer)
|
||||||
var err error
|
var err error
|
||||||
@ -417,7 +417,7 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts sdtypes.AccountMap,
|
|||||||
func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output sdtypes.StorageNodeSink,
|
func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output sdtypes.StorageNodeSink,
|
||||||
ipldOutput sdtypes.IPLDSink) error {
|
ipldOutput sdtypes.IPLDSink) error {
|
||||||
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesEventualTimer)
|
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesEventualTimer)
|
||||||
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
|
if sr == emptyContractRoot {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Debug("Storage root for eventual diff", "root", sr.String())
|
log.Debug("Storage root for eventual diff", "root", sr.String())
|
||||||
@ -427,11 +427,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output sd
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
it := sTrie.NodeIterator(make([]byte, 0))
|
it := sTrie.NodeIterator(make([]byte, 0))
|
||||||
err = sdb.buildStorageNodesFromTrie(it, output, ipldOutput)
|
return sdb.buildStorageNodesFromTrie(it, output, ipldOutput)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node iterator
|
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node iterator
|
||||||
@ -480,21 +476,17 @@ func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator, paren
|
|||||||
// buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account
|
// buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account
|
||||||
func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output sdtypes.StorageNodeSink) error {
|
func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output sdtypes.StorageNodeSink) error {
|
||||||
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedAccountStorageNodesTimer)
|
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedAccountStorageNodesTimer)
|
||||||
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
|
if sr == emptyContractRoot {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Debug("Storage Root For Removed Diffs", "root", sr.String())
|
log.Debug("Storage root for removed diffs", "root", sr.String())
|
||||||
sTrie, err := sdb.StateCache.OpenTrie(sr)
|
sTrie, err := sdb.StateCache.OpenTrie(sr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Info("error in build removed account storage diffs", "error", err)
|
log.Info("error in build removed account storage diffs", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
it := sTrie.NodeIterator(make([]byte, 0))
|
it := sTrie.NodeIterator(make([]byte, 0))
|
||||||
err = sdb.buildRemovedStorageNodesFromTrie(it, output)
|
return sdb.buildRemovedStorageNodesFromTrie(it, output)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator
|
// buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator
|
||||||
@ -521,10 +513,10 @@ func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterat
|
|||||||
func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldroot common.Hash, newroot common.Hash, output sdtypes.StorageNodeSink,
|
func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldroot common.Hash, newroot common.Hash, output sdtypes.StorageNodeSink,
|
||||||
ipldOutput sdtypes.IPLDSink) error {
|
ipldOutput sdtypes.IPLDSink) error {
|
||||||
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesIncrementalTimer)
|
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesIncrementalTimer)
|
||||||
if bytes.Equal(newroot.Bytes(), oldroot.Bytes()) {
|
if newroot == oldroot {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Trace("Storage Roots for Incremental Diff", "old", oldroot.String(), "new", newroot.String())
|
log.Trace("Storage roots for incremental diff", "old", oldroot.String(), "new", newroot.String())
|
||||||
oldTrie, err := sdb.StateCache.OpenTrie(oldroot)
|
oldTrie, err := sdb.StateCache.OpenTrie(oldroot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -539,12 +531,8 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldroot common.Hash, n
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
|
return sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
|
||||||
diffSlotsAtB, output)
|
diffSlotsAtB, output)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output sdtypes.StorageNodeSink,
|
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output sdtypes.StorageNodeSink,
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
package dump
|
package dump
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -201,7 +200,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
preparedHash := crypto.Keccak256Hash(uncleEncoding)
|
preparedHash := crypto.Keccak256Hash(uncleEncoding)
|
||||||
if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) {
|
if preparedHash != unclesHash {
|
||||||
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.String(), unclesHash.String())
|
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.String(), unclesHash.String())
|
||||||
}
|
}
|
||||||
unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
|
unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
package file
|
package file
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
@ -257,7 +256,7 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber *big.Int
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
preparedHash := crypto.Keccak256Hash(uncleEncoding)
|
preparedHash := crypto.Keccak256Hash(uncleEncoding)
|
||||||
if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) {
|
if preparedHash != unclesHash {
|
||||||
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.String(), unclesHash.String())
|
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.String(), unclesHash.String())
|
||||||
}
|
}
|
||||||
unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
|
unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
|
||||||
|
@ -241,7 +241,8 @@ func (met *dbMetricsHandles) Update(stats DbStats) {
|
|||||||
|
|
||||||
func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) {
|
func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) {
|
||||||
since := UpdateDuration(start, timer)
|
since := UpdateDuration(start, timer)
|
||||||
logger.Debug(msg, "duration", since)
|
// This is very noisy so we log at Trace.
|
||||||
|
logger.Trace(msg, "duration", since)
|
||||||
}
|
}
|
||||||
|
|
||||||
func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration {
|
func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration {
|
||||||
|
@ -121,6 +121,6 @@ func (tx *BatchTx) cacheRemoved(key string, value []byte) {
|
|||||||
// rollback sql transaction and log any error
|
// rollback sql transaction and log any error
|
||||||
func rollback(ctx context.Context, tx Tx) {
|
func rollback(ctx context.Context, tx Tx) {
|
||||||
if err := tx.Rollback(ctx); err != nil {
|
if err := tx.Rollback(ctx); err != nil {
|
||||||
log.Error(err.Error())
|
log.Error("error during rollback", "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
package sql
|
package sql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
@ -256,7 +255,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
preparedHash := crypto.Keccak256Hash(uncleEncoding)
|
preparedHash := crypto.Keccak256Hash(uncleEncoding)
|
||||||
if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) {
|
if preparedHash != unclesHash {
|
||||||
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.String(), unclesHash.String())
|
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.String(), unclesHash.String())
|
||||||
}
|
}
|
||||||
unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
|
unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
|
||||||
|
18
service.go
18
service.go
@ -260,7 +260,7 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Log
|
|||||||
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.Params)
|
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.Params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to write state diff", "number",
|
log.Error("failed to write state diff", "number",
|
||||||
genesisBlockNumber, "error", err.Error())
|
genesisBlockNumber, "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defaultStatediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
|
defaultStatediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
|
||||||
@ -273,8 +273,6 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
|||||||
select {
|
select {
|
||||||
case event := <-params.chainEventCh:
|
case event := <-params.chainEventCh:
|
||||||
block := event.Block
|
block := event.Block
|
||||||
log.Debug("Chain event received", "number", block.Number(), "hash", event.Hash)
|
|
||||||
|
|
||||||
parent := sds.BlockCache.getParentBlock(block, sds.BlockChain)
|
parent := sds.BlockCache.getParentBlock(block, sds.BlockChain)
|
||||||
if parent == nil {
|
if parent == nil {
|
||||||
log.Error("Parent block is nil, skipping this block", "number", block.Number())
|
log.Error("Parent block is nil, skipping this block", "number", block.Number())
|
||||||
@ -294,7 +292,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
|||||||
log.Error("failed to write state diff",
|
log.Error("failed to write state diff",
|
||||||
"number", block.Number(),
|
"number", block.Number(),
|
||||||
"hash", block.Hash(),
|
"hash", block.Hash(),
|
||||||
"error", err.Error())
|
"error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -371,7 +369,7 @@ func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common
|
|||||||
payload, err := sds.processStateDiff(currentBlock, parentRoot, params)
|
payload, err := sds.processStateDiff(currentBlock, parentRoot, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("statediff processing error",
|
log.Error("statediff processing error",
|
||||||
"number", currentBlock.Number(), "parameters", params, "error", err.Error())
|
"number", currentBlock.Number(), "parameters", params, "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for id, sub := range subs {
|
for id, sub := range subs {
|
||||||
@ -700,15 +698,13 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
}
|
}
|
||||||
|
|
||||||
output := func(node types2.StateLeafNode) error {
|
output := func(node types2.StateLeafNode) error {
|
||||||
defer func() {
|
defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), logger,
|
||||||
// This is very noisy so we log at Trace.
|
metrics.IndexerMetrics.OutputTimer)
|
||||||
since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
|
|
||||||
logger.Trace("statediff output", "duration", since)
|
|
||||||
}()
|
|
||||||
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
||||||
}
|
}
|
||||||
ipldOutput := func(c types2.IPLD) error {
|
ipldOutput := func(c types2.IPLD) error {
|
||||||
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, metrics.IndexerMetrics.IPLDOutputTimer)
|
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger,
|
||||||
|
metrics.IndexerMetrics.IPLDOutputTimer)
|
||||||
return sds.indexer.PushIPLD(tx, c)
|
return sds.indexer.PushIPLD(tx, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user