Merge remote-tracking branch 'origin/v1.10.26-statediff-v4' into telackey/logging_001

# Conflicts:
#	statediff/service.go
This commit is contained in:
Thomas E Lackey 2023-01-12 10:48:17 -06:00
commit 0cb8ff4f64
3 changed files with 56 additions and 28 deletions

View File

@ -59,7 +59,17 @@ func (tx *BatchTx) flush() error {
_, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys),
pq.Array(tx.ipldCache.Values))
if err != nil {
return err
log.Debug(insertError{"public.blocks", err, tx.stm,
struct {
blockNumbers []string
keys []string
values [][]byte
}{
tx.ipldCache.BlockNumbers,
tx.ipldCache.Keys,
tx.ipldCache.Values,
}}.Error())
return insertError{"public.blocks", err, tx.stm, "too many arguments; use debug mode for full list"}
}
tx.ipldCache = models.IPLDBatch{}
return nil

View File

@ -55,7 +55,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom,
header.Timestamp, header.MhKey, 1, header.Coinbase)
if err != nil {
return fmt.Errorf("error upserting header_cids entry: %v", err)
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
}
indexerMetrics.blocks.Inc(1)
return nil
@ -69,7 +69,7 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(),
uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
if err != nil {
return fmt.Errorf("error upserting uncle_cids entry: %v", err)
return insertError{"eth.uncle_cids", err, w.db.InsertUncleStm(), uncle}
}
return nil
}
@ -83,7 +83,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
if err != nil {
return fmt.Errorf("error upserting transaction_cids entry: %v", err)
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
}
indexerMetrics.transactions.Inc(1)
return nil
@ -98,7 +98,7 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL
accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
accessListElement.StorageKeys)
if err != nil {
return fmt.Errorf("error upserting access_list_element entry: %v", err)
return insertError{"eth.access_list_elements", err, w.db.InsertAccessListElementStm(), accessListElement}
}
indexerMetrics.accessListEntries.Inc(1)
return nil
@ -113,7 +113,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
rct.PostStatus, rct.LogRoot)
if err != nil {
return fmt.Errorf("error upserting receipt_cids entry: %w", err)
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
}
indexerMetrics.receipts.Inc(1)
return nil
@ -129,7 +129,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
log.Topic2, log.Topic3, log.Data)
if err != nil {
return fmt.Errorf("error upserting logs entry: %w", err)
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
}
indexerMetrics.logs.Inc(1)
}
@ -149,7 +149,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true,
stateNode.MhKey)
if err != nil {
return fmt.Errorf("error upserting state_cids entry: %v", err)
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
}
return nil
}
@ -163,7 +163,7 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel
stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
if err != nil {
return fmt.Errorf("error upserting state_accounts entry: %v", err)
return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount}
}
return nil
}
@ -181,7 +181,21 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path,
storageCID.NodeType, true, storageCID.MhKey)
if err != nil {
return fmt.Errorf("error upserting storage_cids entry: %v", err)
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
}
return nil
}
type insertError struct {
table string
err error
stmt string
arguments interface{}
}
var _ error = insertError{}
func (dbe insertError) Error() string {
return fmt.Sprintf("error inserting %s entry: %v\r\nstatement: %s\r\narguments: %+v",
dbe.table, dbe.err, dbe.stmt, dbe.arguments)
}

View File

@ -267,8 +267,13 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
for {
select {
case chainEvent := <-chainEventCh:
defaultStatediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64()))
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
lastHeight := statediffMetrics.lastEventHeight.Value()
nextHeight := int64(chainEvent.Block.Number().Uint64())
if nextHeight-lastHeight != 1 {
log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight)
}
statediffMetrics.lastEventHeight.Update(nextHeight)
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
chainEventFwd <- chainEvent
case err := <-errCh:
log.Error("Error from chain event subscription", "error", err)
@ -333,8 +338,11 @@ func (sds *Service) writeLoopWorker(params workerParams) {
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
writeLoopParams.RUnlock()
if err != nil {
// This is where the Postgres errors bubbles up to, so this is where we want to emit a comprehensie error trace/report
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
log.Error("statediff.Service.WriteLoop: processing error",
"block height", currentBlock.Number().Uint64(),
"block hash", currentBlock.Hash().Hex(),
"error", err.Error(),
"worker", params.id)
continue
}
@ -789,12 +797,12 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
// Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
start, logger := countStateDiffBegin(block)
// log.Info("Writing state diff", "block height", block.Number().Uint64())
var totalDifficulty *big.Int
var receipts types.Receipts
var err error
var tx interfaces.Batch
start, logger := countStateDiffBegin(block)
defer countStateDiffEnd(start, logger, err)
if params.IncludeTD {
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
}
@ -805,29 +813,25 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
if err != nil {
return err
}
// defer handling of commit/rollback for any return case
defer func() {
if err := tx.Submit(err); err != nil {
logger.Error("batch transaction submission failed", "err", err)
}
countStateDiffEnd(start, logger, err)
}()
output := func(node types2.StateNode) error {
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
}
codeOutput := func(c types2.CodeAndCodeHash) error {
return sds.indexer.PushCodeAndCodeHash(tx, c)
}
err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
NewStateRoot: block.Root(),
OldStateRoot: parentRoot,
}, params, output, codeOutput)
// TODO this anti-pattern needs to be sorted out eventually
if err := tx.Submit(err); err != nil {
return fmt.Errorf("batch transaction submission failed: %s", err.Error())
}
// allow dereferencing of parent, keep current locked as it should be the next parent
sds.BlockChain.UnlockTrie(parentRoot)
if err != nil {
return err
}
return nil
}
@ -838,8 +842,8 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
err = sds.writeStateDiff(block, parentRoot, params)
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
// Retry only when the deadlock is detected.
if i != sds.maxRetry {
log.Info("dead lock detected while writing statediff", "err", err, "retry number", i)
if i+1 < sds.maxRetry {
log.Warn("dead lock detected while writing statediff", "err", err, "retry number", i)
}
continue
}