Merge pull request #305 from cerc-io/v1.10.26-statediff-4.3.3-iannorden-0.0.1
logging tweaks
This commit is contained in:
commit
60a54c4639
@ -59,7 +59,17 @@ func (tx *BatchTx) flush() error {
|
|||||||
_, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys),
|
_, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys),
|
||||||
pq.Array(tx.ipldCache.Values))
|
pq.Array(tx.ipldCache.Values))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
log.Debug(insertError{"public.blocks", err, tx.stm,
|
||||||
|
struct {
|
||||||
|
blockNumbers []string
|
||||||
|
keys []string
|
||||||
|
values [][]byte
|
||||||
|
}{
|
||||||
|
tx.ipldCache.BlockNumbers,
|
||||||
|
tx.ipldCache.Keys,
|
||||||
|
tx.ipldCache.Values,
|
||||||
|
}}.Error())
|
||||||
|
return insertError{"public.blocks", err, tx.stm, "too many arguments; use debug mode for full list"}
|
||||||
}
|
}
|
||||||
tx.ipldCache = models.IPLDBatch{}
|
tx.ipldCache = models.IPLDBatch{}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -55,7 +55,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
|
|||||||
header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom,
|
header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom,
|
||||||
header.Timestamp, header.MhKey, 1, header.Coinbase)
|
header.Timestamp, header.MhKey, 1, header.Coinbase)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting header_cids entry: %v", err)
|
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
|
||||||
}
|
}
|
||||||
indexerMetrics.blocks.Inc(1)
|
indexerMetrics.blocks.Inc(1)
|
||||||
return nil
|
return nil
|
||||||
@ -69,7 +69,7 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
|
|||||||
_, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(),
|
||||||
uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
|
uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting uncle_cids entry: %v", err)
|
return insertError{"eth.uncle_cids", err, w.db.InsertUncleStm(), uncle}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -83,7 +83,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
|
|||||||
transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
|
transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
|
||||||
transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
|
transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting transaction_cids entry: %v", err)
|
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
|
||||||
}
|
}
|
||||||
indexerMetrics.transactions.Inc(1)
|
indexerMetrics.transactions.Inc(1)
|
||||||
return nil
|
return nil
|
||||||
@ -98,7 +98,7 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL
|
|||||||
accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
|
accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
|
||||||
accessListElement.StorageKeys)
|
accessListElement.StorageKeys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting access_list_element entry: %v", err)
|
return insertError{"eth.access_list_elements", err, w.db.InsertAccessListElementStm(), accessListElement}
|
||||||
}
|
}
|
||||||
indexerMetrics.accessListEntries.Inc(1)
|
indexerMetrics.accessListEntries.Inc(1)
|
||||||
return nil
|
return nil
|
||||||
@ -113,7 +113,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
|
|||||||
rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
|
rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
|
||||||
rct.PostStatus, rct.LogRoot)
|
rct.PostStatus, rct.LogRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting receipt_cids entry: %w", err)
|
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
|
||||||
}
|
}
|
||||||
indexerMetrics.receipts.Inc(1)
|
indexerMetrics.receipts.Inc(1)
|
||||||
return nil
|
return nil
|
||||||
@ -129,7 +129,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
|
|||||||
log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
|
log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
|
||||||
log.Topic2, log.Topic3, log.Data)
|
log.Topic2, log.Topic3, log.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting logs entry: %w", err)
|
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
|
||||||
}
|
}
|
||||||
indexerMetrics.logs.Inc(1)
|
indexerMetrics.logs.Inc(1)
|
||||||
}
|
}
|
||||||
@ -149,7 +149,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
|||||||
stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true,
|
stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true,
|
||||||
stateNode.MhKey)
|
stateNode.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting state_cids entry: %v", err)
|
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -163,7 +163,7 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel
|
|||||||
stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
|
stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
|
||||||
stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
|
stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting state_accounts entry: %v", err)
|
return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -181,7 +181,21 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
|
|||||||
storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path,
|
storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path,
|
||||||
storageCID.NodeType, true, storageCID.MhKey)
|
storageCID.NodeType, true, storageCID.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting storage_cids entry: %v", err)
|
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type insertError struct {
|
||||||
|
table string
|
||||||
|
err error
|
||||||
|
stmt string
|
||||||
|
arguments interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ error = insertError{}
|
||||||
|
|
||||||
|
func (dbe insertError) Error() string {
|
||||||
|
return fmt.Sprintf("error inserting %s entry: %v\r\nstatement: %s\r\narguments: %+v",
|
||||||
|
dbe.table, dbe.err, dbe.stmt, dbe.arguments)
|
||||||
|
}
|
||||||
|
|||||||
@ -270,7 +270,12 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case chainEvent := <-chainEventCh:
|
case chainEvent := <-chainEventCh:
|
||||||
statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64()))
|
lastHeight := statediffMetrics.lastEventHeight.Value()
|
||||||
|
nextHeight := int64(chainEvent.Block.Number().Uint64())
|
||||||
|
if nextHeight-lastHeight != 1 {
|
||||||
|
log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight)
|
||||||
|
}
|
||||||
|
statediffMetrics.lastEventHeight.Update(nextHeight)
|
||||||
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||||
chainEventFwd <- chainEvent
|
chainEventFwd <- chainEvent
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
@ -336,8 +341,11 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
|||||||
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
|
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
|
||||||
writeLoopParams.RUnlock()
|
writeLoopParams.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This is where the Postgres errors bubbles up to, so this is where we want to emit a comprehensie error trace/report
|
log.Error("statediff.Service.WriteLoop: processing error",
|
||||||
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
"block height", currentBlock.Number().Uint64(),
|
||||||
|
"block hash", currentBlock.Hash().Hex(),
|
||||||
|
"error", err.Error(),
|
||||||
|
"worker", params.id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -807,28 +815,25 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// defer handling of commit/rollback for any return case
|
|
||||||
defer func() {
|
|
||||||
if err := tx.Submit(err); err != nil {
|
|
||||||
log.Error("batch transaction submission failed", "err", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
output := func(node types2.StateNode) error {
|
output := func(node types2.StateNode) error {
|
||||||
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
||||||
}
|
}
|
||||||
codeOutput := func(c types2.CodeAndCodeHash) error {
|
codeOutput := func(c types2.CodeAndCodeHash) error {
|
||||||
return sds.indexer.PushCodeAndCodeHash(tx, c)
|
return sds.indexer.PushCodeAndCodeHash(tx, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
|
err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
|
||||||
NewStateRoot: block.Root(),
|
NewStateRoot: block.Root(),
|
||||||
OldStateRoot: parentRoot,
|
OldStateRoot: parentRoot,
|
||||||
}, params, output, codeOutput)
|
}, params, output, codeOutput)
|
||||||
|
// TODO this anti-pattern needs to be sorted out eventually
|
||||||
|
if err := tx.Submit(err); err != nil {
|
||||||
|
return fmt.Errorf("batch transaction submission failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
// allow dereferencing of parent, keep current locked as it should be the next parent
|
// allow dereferencing of parent, keep current locked as it should be the next parent
|
||||||
sds.BlockChain.UnlockTrie(parentRoot)
|
sds.BlockChain.UnlockTrie(parentRoot)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -839,8 +844,8 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
|
|||||||
err = sds.writeStateDiff(block, parentRoot, params)
|
err = sds.writeStateDiff(block, parentRoot, params)
|
||||||
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
|
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
|
||||||
// Retry only when the deadlock is detected.
|
// Retry only when the deadlock is detected.
|
||||||
if i != sds.maxRetry {
|
if i+1 < sds.maxRetry {
|
||||||
log.Info("dead lock detected while writing statediff", "err", err, "retry number", i)
|
log.Warn("dead lock detected while writing statediff", "err", err, "retry number", i)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user