From 52c66b31e525abed0973a931b996d93fc0884ad9 Mon Sep 17 00:00:00 2001 From: i-norden Date: Wed, 11 Jan 2023 23:18:40 -0600 Subject: [PATCH] logging tweaks --- statediff/indexer/database/sql/batch_tx.go | 12 +++++++- statediff/indexer/database/sql/writer.go | 32 +++++++++++++++------ statediff/service.go | 33 +++++++++++++--------- 3 files changed, 53 insertions(+), 24 deletions(-) diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index f5cbdeadc..5f9d09b25 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -59,7 +59,17 @@ func (tx *BatchTx) flush() error { _, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) if err != nil { - return err + log.Debug(insertError{"public.blocks", err, tx.stm, + struct { + blockNumbers []string + keys []string + values [][]byte + }{ + tx.ipldCache.BlockNumbers, + tx.ipldCache.Keys, + tx.ipldCache.Values, + }}.Error()) + return insertError{"public.blocks", err, tx.stm, "too many arguments; use debug mode for full list"} } tx.ipldCache = models.IPLDBatch{} return nil diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 70d8ba45f..c1a67f2f8 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -55,7 +55,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase) if err != nil { - return fmt.Errorf("error upserting header_cids entry: %v", err) + return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} } indexerMetrics.blocks.Inc(1) return nil @@ -69,7 +69,7 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(), uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) if err != nil { - return fmt.Errorf("error upserting uncle_cids entry: %v", err) + return insertError{"eth.uncle_cids", err, w.db.InsertUncleStm(), uncle} } return nil } @@ -83,7 +83,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) if err != nil { - return fmt.Errorf("error upserting transaction_cids entry: %v", err) + return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} } indexerMetrics.transactions.Inc(1) return nil @@ -98,7 +98,7 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) if err != nil { - return fmt.Errorf("error upserting access_list_element entry: %v", err) + return insertError{"eth.access_list_elements", err, w.db.InsertAccessListElementStm(), accessListElement} } indexerMetrics.accessListEntries.Inc(1) return nil @@ -113,7 +113,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) if err != nil { - return fmt.Errorf("error upserting receipt_cids entry: %w", err) + return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} } indexerMetrics.receipts.Inc(1) return nil @@ -129,7 +129,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) if err != nil { - return fmt.Errorf("error upserting logs entry: %w", err) + return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} } indexerMetrics.logs.Inc(1) } @@ -149,7 +149,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) if err != nil { - return fmt.Errorf("error upserting state_cids entry: %v", err) + return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} } return nil } @@ -163,7 +163,7 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) if err != nil { - return fmt.Errorf("error upserting state_accounts entry: %v", err) + return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount} } return nil } @@ -181,7 +181,21 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) if err != nil { - return fmt.Errorf("error upserting storage_cids entry: %v", err) + return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} } return nil } + +type insertError struct { + table string + err error + stmt string + arguments interface{} +} + +var _ error = insertError{} + +func (dbe insertError) Error() string { + return fmt.Sprintf("error inserting %s entry: %v\r\nstatement: %s\r\narguments: %+v", + dbe.table, dbe.err, dbe.stmt, dbe.arguments) +} diff --git a/statediff/service.go b/statediff/service.go index a7a1b8384..d6f184184 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -270,7 +270,12 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { for { select { case chainEvent := <-chainEventCh: - statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) + lastHeight := statediffMetrics.lastEventHeight.Value() + nextHeight := int64(chainEvent.Block.Number().Uint64()) + if nextHeight-lastHeight != 1 { + log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight) + } + statediffMetrics.lastEventHeight.Update(nextHeight) statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) chainEventFwd <- chainEvent case err := <-errCh: @@ -336,8 +341,11 @@ func (sds *Service) writeLoopWorker(params workerParams) { err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params) writeLoopParams.RUnlock() if err != nil { - // This is where the Postgres errors bubbles up to, so this is where we want to emit a comprehensie error trace/report - log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) + log.Error("statediff.Service.WriteLoop: processing error", + "block height", currentBlock.Number().Uint64(), + "block hash", currentBlock.Hash().Hex(), + "error", err.Error(), + "worker", params.id) continue } @@ -807,28 +815,25 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p if err != nil { return err } - // defer handling of commit/rollback for any return case - defer func() { - if err := tx.Submit(err); err != nil { - log.Error("batch transaction submission failed", "err", err) - } - }() + output := func(node types2.StateNode) error { return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } codeOutput := func(c types2.CodeAndCodeHash) error { return sds.indexer.PushCodeAndCodeHash(tx, c) } + err = sds.Builder.WriteStateDiffObject(types2.StateRoots{ NewStateRoot: block.Root(), OldStateRoot: parentRoot, }, params, output, codeOutput) + // TODO this anti-pattern needs to be sorted out eventually + if err := tx.Submit(err); err != nil { + return fmt.Errorf("batch transaction submission failed: %s", err.Error()) + } // allow dereferencing of parent, keep current locked as it should be the next parent sds.BlockChain.UnlockTrie(parentRoot) - if err != nil { - return err - } return nil } @@ -839,8 +844,8 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo err = sds.writeStateDiff(block, parentRoot, params) if err != nil && strings.Contains(err.Error(), deadlockDetected) { // Retry only when the deadlock is detected. - if i != sds.maxRetry { - log.Info("dead lock detected while writing statediff", "err", err, "retry number", i) + if i+1 < sds.maxRetry { + log.Warn("dead lock detected while writing statediff", "err", err, "retry number", i) } continue }