diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index f5cbdeadc..5f9d09b25 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -59,7 +59,17 @@ func (tx *BatchTx) flush() error { _, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) if err != nil { - return err + log.Debug(insertError{"public.blocks", err, tx.stm, + struct { + blockNumbers []string + keys []string + values [][]byte + }{ + tx.ipldCache.BlockNumbers, + tx.ipldCache.Keys, + tx.ipldCache.Values, + }}.Error()) + return insertError{"public.blocks", err, tx.stm, "too many arguments; use debug mode for full list"} } tx.ipldCache = models.IPLDBatch{} return nil diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 70d8ba45f..c1a67f2f8 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -55,7 +55,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase) if err != nil { - return fmt.Errorf("error upserting header_cids entry: %v", err) + return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} } indexerMetrics.blocks.Inc(1) return nil @@ -69,7 +69,7 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(), uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) if err != nil { - return fmt.Errorf("error upserting uncle_cids entry: %v", err) + return insertError{"eth.uncle_cids", err, w.db.InsertUncleStm(), uncle} } return nil } @@ -83,7 +83,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) if err != nil { - return fmt.Errorf("error upserting transaction_cids entry: %v", err) + return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} } indexerMetrics.transactions.Inc(1) return nil @@ -98,7 +98,7 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) if err != nil { - return fmt.Errorf("error upserting access_list_element entry: %v", err) + return insertError{"eth.access_list_elements", err, w.db.InsertAccessListElementStm(), accessListElement} } indexerMetrics.accessListEntries.Inc(1) return nil @@ -113,7 +113,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) if err != nil { - return fmt.Errorf("error upserting receipt_cids entry: %w", err) + return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} } indexerMetrics.receipts.Inc(1) return nil @@ -129,7 +129,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) if err != nil { - return fmt.Errorf("error upserting logs entry: %w", err) + return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} } indexerMetrics.logs.Inc(1) } @@ -149,7 +149,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) if err != nil { - return fmt.Errorf("error upserting state_cids entry: %v", err) + return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} } return nil } @@ -163,7 +163,7 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) if err != nil { - return fmt.Errorf("error upserting state_accounts entry: %v", err) + return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount} } return nil } @@ -181,7 +181,21 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) if err != nil { - return fmt.Errorf("error upserting storage_cids entry: %v", err) + return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} } return nil } + +type insertError struct { + table string + err error + stmt string + arguments interface{} +} + +var _ error = insertError{} + +func (dbe insertError) Error() string { + return fmt.Sprintf("error inserting %s entry: %v\r\nstatement: %s\r\narguments: %+v", + dbe.table, dbe.err, dbe.stmt, dbe.arguments) +} diff --git a/statediff/service.go b/statediff/service.go index 6001f8813..0165a1baf 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -267,8 +267,13 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { for { select { case chainEvent := <-chainEventCh: - defaultStatediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) - defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) + lastHeight := statediffMetrics.lastEventHeight.Value() + nextHeight := int64(chainEvent.Block.Number().Uint64()) + if nextHeight-lastHeight != 1 { + log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight) + } + statediffMetrics.lastEventHeight.Update(nextHeight) + statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) chainEventFwd <- chainEvent case err := <-errCh: log.Error("Error from chain event subscription", "error", err) @@ -333,8 +338,11 @@ func (sds *Service) writeLoopWorker(params workerParams) { err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params) writeLoopParams.RUnlock() if err != nil { - // This is where the Postgres errors bubbles up to, so this is where we want to emit a comprehensie error trace/report - log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) + log.Error("statediff.Service.WriteLoop: processing error", + "block height", currentBlock.Number().Uint64(), + "block hash", currentBlock.Hash().Hex(), + "error", err.Error(), + "worker", params.id) continue } @@ -789,12 +797,12 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { - start, logger := countStateDiffBegin(block) - // log.Info("Writing state diff", "block height", block.Number().Uint64()) var totalDifficulty *big.Int var receipts types.Receipts var err error var tx interfaces.Batch + start, logger := countStateDiffBegin(block) + defer countStateDiffEnd(start, logger, err) if params.IncludeTD { totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64()) } @@ -805,29 +813,25 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p if err != nil { return err } - // defer handling of commit/rollback for any return case - defer func() { - if err := tx.Submit(err); err != nil { - logger.Error("batch transaction submission failed", "err", err) - } - countStateDiffEnd(start, logger, err) - }() + output := func(node types2.StateNode) error { return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } codeOutput := func(c types2.CodeAndCodeHash) error { return sds.indexer.PushCodeAndCodeHash(tx, c) } + err = sds.Builder.WriteStateDiffObject(types2.StateRoots{ NewStateRoot: block.Root(), OldStateRoot: parentRoot, }, params, output, codeOutput) + // TODO this anti-pattern needs to be sorted out eventually + if err := tx.Submit(err); err != nil { + return fmt.Errorf("batch transaction submission failed: %s", err.Error()) + } // allow dereferencing of parent, keep current locked as it should be the next parent sds.BlockChain.UnlockTrie(parentRoot) - if err != nil { - return err - } return nil } @@ -838,8 +842,8 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo err = sds.writeStateDiff(block, parentRoot, params) if err != nil && strings.Contains(err.Error(), deadlockDetected) { // Retry only when the deadlock is detected. - if i != sds.maxRetry { - log.Info("dead lock detected while writing statediff", "err", err, "retry number", i) + if i+1 < sds.maxRetry { + log.Warn("dead lock detected while writing statediff", "err", err, "retry number", i) } continue }