simplify func names, clean up metrics
cleanup
This commit is contained in:
parent
5c59e09423
commit
ddf9aff945
50
builder.go
50
builder.go
@ -112,11 +112,11 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, outp
|
||||
// Load tries for old and new states
|
||||
oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating trie for oldStateRoot: %v", err)
|
||||
return fmt.Errorf("error creating trie for oldStateRoot: %w", err)
|
||||
}
|
||||
newTrie, err := sdb.StateCache.OpenTrie(args.NewStateRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating trie for newStateRoot: %v", err)
|
||||
return fmt.Errorf("error creating trie for newStateRoot: %w", err)
|
||||
}
|
||||
|
||||
// we do two state trie iterations:
|
||||
@ -135,20 +135,20 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, outp
|
||||
}
|
||||
|
||||
logger := log.New("hash", args.BlockHash.String(), "number", args.BlockNumber)
|
||||
return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput, logger, nil)
|
||||
return sdb.BuildStateDiff(iterPairs, params, output, ipldOutput, logger, nil)
|
||||
}
|
||||
|
||||
func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params,
|
||||
func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params,
|
||||
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger, prefixPath []byte) error {
|
||||
logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes")
|
||||
defer metrics.ReportAndUpdateDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer)
|
||||
logger.Debug("statediff BEGIN BuildStateDiff")
|
||||
defer metrics.ReportAndUpdateDuration("statediff END BuildStateDiff", time.Now(), logger, metrics.IndexerMetrics.BuildStateDiffTimer)
|
||||
// collect a slice of all the nodes that were touched and exist at B (B-A)
|
||||
// a map of their leafkey to all the accounts that were touched and exist at B
|
||||
// and a slice of all the paths for the nodes in both of the above sets
|
||||
diffAccountsAtB, err := sdb.createdAndUpdatedState(
|
||||
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger, prefixPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
|
||||
return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err)
|
||||
}
|
||||
|
||||
// collect a slice of all the nodes that existed at a path in A that doesn't exist in B
|
||||
@ -157,14 +157,14 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
|
||||
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
|
||||
params.watchedAddressesLeafPaths, output, logger, prefixPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
|
||||
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %w", err)
|
||||
}
|
||||
|
||||
// collect and sort the leafkey keys for both account mappings into a slice
|
||||
t := time.Now()
|
||||
createKeys := trie_helpers.SortKeys(diffAccountsAtB)
|
||||
deleteKeys := trie_helpers.SortKeys(diffAccountsAtA)
|
||||
logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes sort duration=%dms", time.Since(t).Milliseconds()))
|
||||
logger.Debug("statediff BuildStateDiff sort", "duration", time.Since(t))
|
||||
|
||||
// and then find the intersection of these keys
|
||||
// these are the leafkeys for the accounts which exist at both A and B but are different
|
||||
@ -172,19 +172,19 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
|
||||
// and leaving the truly created or deleted keys in place
|
||||
t = time.Now()
|
||||
updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys)
|
||||
logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes intersection count=%d duration=%dms",
|
||||
len(updatedKeys),
|
||||
time.Since(t).Milliseconds()))
|
||||
logger.Debug("statediff BuildStateDiff intersection",
|
||||
"count", len(updatedKeys),
|
||||
"duration", time.Since(t))
|
||||
|
||||
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
|
||||
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error building diff for updated accounts: %v", err)
|
||||
return fmt.Errorf("error building diff for updated accounts: %w", err)
|
||||
}
|
||||
// build the diff nodes for created accounts
|
||||
err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error building diff for created accounts: %v", err)
|
||||
return fmt.Errorf("error building diff for created accounts: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -255,7 +255,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
|
||||
prevBlob = nodeVal
|
||||
}
|
||||
}
|
||||
logger.Debug("statediff COUNTS createdAndUpdatedStateWithIntermediateNodes", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB))
|
||||
logger.Debug("statediff COUNTS createdAndUpdatedState", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB))
|
||||
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
|
||||
return diffAccountsAtB, it.Error()
|
||||
}
|
||||
@ -318,7 +318,7 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
|
||||
storageDiff := make([]sdtypes.StorageLeafNode, 0)
|
||||
err := sdb.buildRemovedAccountStorageNodes(accountW.Account.Root, StorageNodeAppender(&storageDiff))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %v", leafKey, err)
|
||||
return nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
|
||||
}
|
||||
diff.StorageDiff = storageDiff
|
||||
if err := output(diff); err != nil {
|
||||
@ -339,8 +339,10 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
|
||||
// those account maps to remove the accounts which were updated
|
||||
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.AccountMap, updatedKeys []string,
|
||||
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
|
||||
logger.Debug("statediff BEGIN buildAccountUpdates", "creations", len(creations), "deletions", len(deletions), "updatedKeys", len(updatedKeys))
|
||||
defer metrics.ReportAndUpdateDuration("statediff END buildAccountUpdates ", time.Now(), logger, metrics.IndexerMetrics.BuildAccountUpdatesTimer)
|
||||
logger.Debug("statediff BEGIN buildAccountUpdates",
|
||||
"creations", len(creations), "deletions", len(deletions), "updatedKeys", len(updatedKeys))
|
||||
defer metrics.ReportAndUpdateDuration("statediff END buildAccountUpdates ",
|
||||
time.Now(), logger, metrics.IndexerMetrics.BuildAccountUpdatesTimer)
|
||||
var err error
|
||||
for _, key := range updatedKeys {
|
||||
createdAcc := creations[key]
|
||||
@ -352,7 +354,7 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.Ac
|
||||
StorageNodeAppender(&storageDiff), ipldOutput,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed building incremental storage diffs for account with leafkey %s\r\nerror: %v", key, err)
|
||||
return fmt.Errorf("failed building incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
|
||||
}
|
||||
}
|
||||
if err = output(sdtypes.StateLeafNode{
|
||||
@ -374,7 +376,8 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.Ac
|
||||
func (sdb *StateDiffBuilder) buildAccountCreations(accounts sdtypes.AccountMap, output sdtypes.StateNodeSink,
|
||||
ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
|
||||
logger.Debug("statediff BEGIN buildAccountCreations")
|
||||
defer metrics.ReportAndUpdateDuration("statediff END buildAccountCreations", time.Now(), logger, metrics.IndexerMetrics.BuildAccountCreationsTimer)
|
||||
defer metrics.ReportAndUpdateDuration("statediff END buildAccountCreations",
|
||||
time.Now(), logger, metrics.IndexerMetrics.BuildAccountCreationsTimer)
|
||||
for _, val := range accounts {
|
||||
diff := sdtypes.StateLeafNode{
|
||||
AccountWrapper: val,
|
||||
@ -385,14 +388,14 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts sdtypes.AccountMap,
|
||||
storageDiff := make([]sdtypes.StorageLeafNode, 0)
|
||||
err := sdb.buildStorageNodesEventual(val.Account.Root, StorageNodeAppender(&storageDiff), ipldOutput)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %v", val.LeafKey, err)
|
||||
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", val.LeafKey, err)
|
||||
}
|
||||
diff.StorageDiff = storageDiff
|
||||
// emit codehash => code mappings for contract
|
||||
codeHash := common.BytesToHash(val.Account.CodeHash)
|
||||
code, err := sdb.StateCache.ContractCode(codeHash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
|
||||
return fmt.Errorf("failed to retrieve code for codehash %x\r\n error: %w", codeHash, err)
|
||||
}
|
||||
if err := ipldOutput(sdtypes.IPLD{
|
||||
CID: ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()).String(),
|
||||
@ -431,8 +434,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output sd
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node interator
|
||||
// including intermediate nodes can be turned on or off
|
||||
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node iterator
|
||||
func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output sdtypes.StorageNodeSink,
|
||||
ipldOutput sdtypes.IPLDSink) error {
|
||||
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesFromTrieTimer)
|
||||
|
@ -175,7 +175,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Debug(traceMsg)
|
||||
log.Trace(traceMsg)
|
||||
return err
|
||||
},
|
||||
}
|
||||
|
@ -72,9 +72,8 @@ type IndexerMetricsHandles struct {
|
||||
StateStoreCodeProcessingTimer metrics.Timer
|
||||
|
||||
// Fine-grained code timers
|
||||
BuildStateDiffWithIntermediateStateNodesTimer metrics.Timer
|
||||
BuildStateDiffWithoutIntermediateStateNodesTimer metrics.Timer
|
||||
CreatedAndUpdatedStateWithIntermediateNodesTimer metrics.Timer
|
||||
BuildStateDiffTimer metrics.Timer
|
||||
CreatedAndUpdatedStateTimer metrics.Timer
|
||||
DeletedOrUpdatedStateTimer metrics.Timer
|
||||
BuildAccountUpdatesTimer metrics.Timer
|
||||
BuildAccountCreationsTimer metrics.Timer
|
||||
@ -88,11 +87,8 @@ type IndexerMetricsHandles struct {
|
||||
DeletedOrUpdatedStorageTimer metrics.Timer
|
||||
CreatedAndUpdatedStorageTimer metrics.Timer
|
||||
BuildStorageNodesIncrementalTimer metrics.Timer
|
||||
BuildStateTrieObjectTimer metrics.Timer
|
||||
BuildStateTrieTimer metrics.Timer
|
||||
BuildStateDiffObjectTimer metrics.Timer
|
||||
WriteStateDiffObjectTimer metrics.Timer
|
||||
CreatedAndUpdatedStateTimer metrics.Timer
|
||||
BuildStorageNodesEventualTimer metrics.Timer
|
||||
BuildStorageNodesFromTrieTimer metrics.Timer
|
||||
BuildRemovedAccountStorageNodesTimer metrics.Timer
|
||||
@ -113,9 +109,8 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
|
||||
UncleProcessingTimer: metrics.NewTimer(),
|
||||
TxAndRecProcessingTimer: metrics.NewTimer(),
|
||||
StateStoreCodeProcessingTimer: metrics.NewTimer(),
|
||||
BuildStateDiffWithIntermediateStateNodesTimer: metrics.NewTimer(),
|
||||
BuildStateDiffWithoutIntermediateStateNodesTimer: metrics.NewTimer(),
|
||||
CreatedAndUpdatedStateWithIntermediateNodesTimer: metrics.NewTimer(),
|
||||
BuildStateDiffTimer: metrics.NewTimer(),
|
||||
CreatedAndUpdatedStateTimer: metrics.NewTimer(),
|
||||
DeletedOrUpdatedStateTimer: metrics.NewTimer(),
|
||||
BuildAccountUpdatesTimer: metrics.NewTimer(),
|
||||
BuildAccountCreationsTimer: metrics.NewTimer(),
|
||||
@ -129,11 +124,8 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
|
||||
DeletedOrUpdatedStorageTimer: metrics.NewTimer(),
|
||||
CreatedAndUpdatedStorageTimer: metrics.NewTimer(),
|
||||
BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
|
||||
BuildStateTrieObjectTimer: metrics.NewTimer(),
|
||||
BuildStateTrieTimer: metrics.NewTimer(),
|
||||
BuildStateDiffObjectTimer: metrics.NewTimer(),
|
||||
WriteStateDiffObjectTimer: metrics.NewTimer(),
|
||||
CreatedAndUpdatedStateTimer: metrics.NewTimer(),
|
||||
BuildStorageNodesEventualTimer: metrics.NewTimer(),
|
||||
BuildStorageNodesFromTrieTimer: metrics.NewTimer(),
|
||||
BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(),
|
||||
@ -152,9 +144,8 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
|
||||
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
|
||||
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
|
||||
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
|
||||
reg.Register(metricName(subsys, "t_build_statediff_with_intermediate_state_nodes"), ctx.BuildStateDiffWithIntermediateStateNodesTimer)
|
||||
reg.Register(metricName(subsys, "t_build_statediff_without_intermediate_state_nodes"), ctx.BuildStateDiffWithoutIntermediateStateNodesTimer)
|
||||
reg.Register(metricName(subsys, "t_created_and_update_state_with_intermediate_nodes"), ctx.CreatedAndUpdatedStateWithIntermediateNodesTimer)
|
||||
reg.Register(metricName(subsys, "t_build_statediff"), ctx.BuildStateDiffTimer)
|
||||
reg.Register(metricName(subsys, "t_created_and_update_state"), ctx.CreatedAndUpdatedStateTimer)
|
||||
reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer)
|
||||
reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer)
|
||||
reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer)
|
||||
@ -168,8 +159,6 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
|
||||
reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer)
|
||||
reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer)
|
||||
reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
|
||||
reg.Register(metricName(subsys, "t_build_state_trie_object"), ctx.BuildStateTrieObjectTimer)
|
||||
reg.Register(metricName(subsys, "t_build_state_trie"), ctx.BuildStateTrieTimer)
|
||||
reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
|
||||
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer)
|
||||
reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer)
|
||||
@ -252,7 +241,7 @@ func (met *dbMetricsHandles) Update(stats DbStats) {
|
||||
|
||||
func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) {
|
||||
since := UpdateDuration(start, timer)
|
||||
logger.Trace(msg, "duration", since.Milliseconds())
|
||||
logger.Debug(msg, "duration", since)
|
||||
}
|
||||
|
||||
func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration {
|
||||
|
@ -17,7 +17,6 @@
|
||||
package statediff
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cerc-io/plugeth-statediff/utils/log"
|
||||
@ -29,12 +28,12 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
|
||||
logger := log.New("hash", block.Hash().String(), "number", block.NumberU64())
|
||||
|
||||
defaultStatediffMetrics.underway.Inc(1)
|
||||
logger.Debug(fmt.Sprintf("writeStateDiff BEGIN [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
|
||||
defaultStatediffMetrics.underway.Count(),
|
||||
defaultStatediffMetrics.succeeded.Count(),
|
||||
defaultStatediffMetrics.failed.Count(),
|
||||
defaultStatediffMetrics.totalProcessingTime.Value(),
|
||||
))
|
||||
logger.Debug("writeStateDiff BEGIN",
|
||||
"underway", defaultStatediffMetrics.underway.Count(),
|
||||
"succeeded", defaultStatediffMetrics.succeeded.Count(),
|
||||
"failed", defaultStatediffMetrics.failed.Count(),
|
||||
"total_time", defaultStatediffMetrics.totalProcessingTime.Value(),
|
||||
)
|
||||
|
||||
return start, logger
|
||||
}
|
||||
@ -49,13 +48,14 @@ func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Durat
|
||||
}
|
||||
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
|
||||
|
||||
logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
|
||||
duration.Milliseconds(), nil != err,
|
||||
defaultStatediffMetrics.underway.Count(),
|
||||
defaultStatediffMetrics.succeeded.Count(),
|
||||
defaultStatediffMetrics.failed.Count(),
|
||||
defaultStatediffMetrics.totalProcessingTime.Value(),
|
||||
))
|
||||
logger.Debug("writeStateDiff END",
|
||||
"duration", duration,
|
||||
"error", err != nil,
|
||||
"underway", defaultStatediffMetrics.underway.Count(),
|
||||
"succeeded", defaultStatediffMetrics.succeeded.Count(),
|
||||
"failed", defaultStatediffMetrics.failed.Count(),
|
||||
"total_time", defaultStatediffMetrics.totalProcessingTime.Value(),
|
||||
)
|
||||
|
||||
return duration
|
||||
}
|
||||
@ -67,10 +67,10 @@ func countApiRequestBegin(methodName string, blockHashOrNumber interface{}) (tim
|
||||
defaultStatediffMetrics.apiRequests.Inc(1)
|
||||
defaultStatediffMetrics.apiRequestsUnderway.Inc(1)
|
||||
|
||||
logger.Debug(fmt.Sprintf("statediff API BEGIN [underway=%d, requests=%d])",
|
||||
defaultStatediffMetrics.apiRequestsUnderway.Count(),
|
||||
defaultStatediffMetrics.apiRequests.Count(),
|
||||
))
|
||||
logger.Debug("statediff API BEGIN",
|
||||
"underway", defaultStatediffMetrics.apiRequestsUnderway.Count(),
|
||||
"requests", defaultStatediffMetrics.apiRequests.Count(),
|
||||
)
|
||||
|
||||
return start, logger
|
||||
}
|
||||
@ -79,11 +79,12 @@ func countApiRequestEnd(start time.Time, logger log.Logger, err error) time.Dura
|
||||
duration := time.Since(start)
|
||||
defaultStatediffMetrics.apiRequestsUnderway.Dec(1)
|
||||
|
||||
logger.Debug(fmt.Sprintf("statediff API END (duration=%dms, err=%t) [underway=%d, requests=%d]",
|
||||
duration.Milliseconds(), nil != err,
|
||||
defaultStatediffMetrics.apiRequestsUnderway.Count(),
|
||||
defaultStatediffMetrics.apiRequests.Count(),
|
||||
))
|
||||
logger.Debug("statediff API END",
|
||||
"duration", duration,
|
||||
"error", err != nil,
|
||||
"underway", defaultStatediffMetrics.apiRequestsUnderway.Count(),
|
||||
"requests", defaultStatediffMetrics.apiRequests.Count(),
|
||||
)
|
||||
|
||||
return duration
|
||||
}
|
||||
|
26
service.go
26
service.go
@ -222,10 +222,10 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||
for {
|
||||
select {
|
||||
case event := <-chainEventCh:
|
||||
log.Debug("Chain event received", "event", event)
|
||||
// First process metrics for chain events, then forward to workers
|
||||
lastHeight := defaultStatediffMetrics.lastEventHeight.Value()
|
||||
block := event.Block
|
||||
log.Debug("Chain event received", "number", block.Number(), "hash", event.Hash)
|
||||
nextHeight := int64(block.Number().Uint64())
|
||||
if nextHeight-lastHeight != 1 {
|
||||
log.Warn("Received block out-of-order", "next", nextHeight, "last", lastHeight)
|
||||
@ -252,27 +252,28 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
|
||||
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Logger) {
|
||||
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
|
||||
log.Info("Writing genesis state diff", "number", genesisBlockNumber, "worker", workerId)
|
||||
log.Info("Writing genesis state diff", "number", genesisBlockNumber)
|
||||
sds.writeLoopParams.RLock()
|
||||
defer sds.writeLoopParams.RUnlock()
|
||||
|
||||
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.Params)
|
||||
if err != nil {
|
||||
log.Error("failed to write state diff", "number",
|
||||
genesisBlockNumber, "error", err.Error(), "worker", workerId)
|
||||
genesisBlockNumber, "error", err.Error())
|
||||
return
|
||||
}
|
||||
defaultStatediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
|
||||
}
|
||||
|
||||
func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
log := log.New("context", "statediff writing", "worker", params.id)
|
||||
defer params.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case chainEvent := <-params.chainEventCh:
|
||||
log.Debug("WriteLoop(): chain event received", "event", chainEvent)
|
||||
log.Debug("Chain event received", "event", chainEvent)
|
||||
|
||||
block := chainEvent.Block
|
||||
parent := sds.BlockCache.getParentBlock(block, sds.BlockChain)
|
||||
@ -283,10 +284,10 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
|
||||
// chainEvent streams block from block 1, but we also need to include data from the genesis block.
|
||||
if parent.Number().Uint64() == genesisBlockNumber {
|
||||
sds.writeGenesisStateDiff(parent, params.id)
|
||||
sds.writeGenesisStateDiff(parent, log)
|
||||
}
|
||||
|
||||
log.Info("Writing state diff", "number", block.Number().Uint64(), "worker", params.id)
|
||||
log.Info("Writing state diff", "number", block.Number().Uint64())
|
||||
sds.writeLoopParams.RLock()
|
||||
err := sds.writeStateDiffWithRetry(block, parent.Root(), sds.writeLoopParams.Params)
|
||||
sds.writeLoopParams.RUnlock()
|
||||
@ -294,15 +295,14 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
log.Error("failed to write state diff",
|
||||
"number", block.Number().Uint64(),
|
||||
"hash", block.Hash().String(),
|
||||
"error", err.Error(),
|
||||
"worker", params.id)
|
||||
"error", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// FIXME: reported height will be non-monotonic with concurrent workers
|
||||
defaultStatediffMetrics.lastStatediffHeight.Update(int64(block.Number().Uint64()))
|
||||
case <-sds.QuitChan:
|
||||
log.Info("Quitting the statediff writing process", "worker", params.id)
|
||||
log.Info("Quitting")
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -325,14 +325,14 @@ func (sds *Service) PublishLoop(chainEventCh chan core.ChainEvent) {
|
||||
//Notify chain event channel of events
|
||||
case event := <-chainEventCh:
|
||||
defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||
log.Debug("Chain event received", "event", event)
|
||||
block := event.Block
|
||||
log.Debug("Chain event received", "number", block.Number(), "hash", event.Hash)
|
||||
// if we don't have any subscribers, do not process a statediff
|
||||
if atomic.LoadInt32(&sds.subscribers) == 0 {
|
||||
log.Debug("Currently no subscribers; processing is halted")
|
||||
continue
|
||||
}
|
||||
|
||||
block := event.Block
|
||||
parent := sds.BlockCache.getParentBlock(block, sds.BlockChain)
|
||||
if parent == nil {
|
||||
log.Error("Parent block is nil, skipping this block", "number", block.Number())
|
||||
@ -703,7 +703,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
||||
defer func() {
|
||||
// This is very noisy so we log at Trace.
|
||||
since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
|
||||
logger.Trace("statediff output", "duration", since.Milliseconds())
|
||||
logger.Trace("statediff output", "duration", since)
|
||||
}()
|
||||
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user