Merge pull request #65 from vulcanize/v1.10.2-statediff-0.0.19

improve error logging; handle PushBlock internal err
This commit is contained in:
Ian Norden 2021-04-15 13:49:46 -05:00 committed by GitHub
commit d067d2d007
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 54 deletions

View File

@ -79,6 +79,7 @@ This service introduces a CLI flag namespace `statediff`
`--statediff` flag is used to turn on the service
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
`--statediff.db` is the connection string for the Postgres database to write to
`--statediff.dbnodeid` is the node id to use in the Postgres database
`--statediff.dbclientname` is the client name to use in the Postgres database

View File

@ -111,7 +111,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// Generate the block iplds
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, err
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
if len(txNodes) != len(txTrieNodes) && len(rctNodes) != len(rctTrieNodes) && len(txNodes) != len(rctNodes) {
return nil, fmt.Errorf("expected number of transactions (%d), transaction trie nodes (%d), receipts (%d), and receipt trie nodes (%d)to be equal", len(txNodes), len(txTrieNodes), len(rctNodes), len(rctTrieNodes))
@ -124,7 +124,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil {
return nil, err
}
blocktx := BlockTx{
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
}
}()
blockTx := &BlockTx{
dbtx: tx,
// handle transaction commit or rollback for any return case
Close: func(err error) error {
@ -164,7 +172,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
if err := sdi.processUncles(tx, headerID, height, uncleNodes); err != nil {
err = sdi.processUncles(tx, headerID, height, uncleNodes)
if err != nil {
return nil, err
}
tDiff = time.Since(t)
@ -172,7 +181,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
if err := sdi.processReceiptsAndTxs(tx, processArgs{
err = sdi.processReceiptsAndTxs(tx, processArgs{
headerID: headerID,
blockNumber: block.Number(),
receipts: receipts,
@ -181,7 +190,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
rctTrieNodes: rctTrieNodes,
txNodes: txNodes,
txTrieNodes: txTrieNodes,
}); err != nil {
})
if err != nil {
return nil, err
}
tDiff = time.Since(t)
@ -189,9 +199,9 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()
blocktx.BlockNumber = height
blocktx.headerID = headerID
return &blocktx, err
blockTx.BlockNumber = height
blockTx.headerID = headerID
return blockTx, err
}
// processHeader publishes and indexes a header IPLD in Postgres
@ -199,7 +209,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
// publish header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
return 0, err
return 0, fmt.Errorf("error publishing header IPLD: %v", err)
}
// index header
return sdi.dbWriter.upsertHeaderCID(tx, models.HeaderModel{
@ -223,7 +233,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNum
// publish and index uncles
for _, uncleNode := range uncleNodes {
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
return err
return fmt.Errorf("error publishing uncle IPLD: %v", err)
}
uncleReward := CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
uncle := models.UncleModel{
@ -261,24 +271,24 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
trx := args.txs[i]
from, err := types.Sender(signer, trx)
if err != nil {
return err
return fmt.Errorf("error deriving tx sender: %v", err)
}
// Publishing
// publish trie nodes, these aren't indexed directly
if err := shared.PublishIPLD(tx, args.txTrieNodes[i]); err != nil {
return err
return fmt.Errorf("error publishing tx trie node IPLD: %v", err)
}
if err := shared.PublishIPLD(tx, args.rctTrieNodes[i]); err != nil {
return err
return fmt.Errorf("error publishing rct trie node IPLD: %v", err)
}
// publish the txs and receipts
txNode, rctNode := args.txNodes[i], args.rctNodes[i]
if err := shared.PublishIPLD(tx, txNode); err != nil {
return err
return fmt.Errorf("error publishing tx IPLD: %v", err)
}
if err := shared.PublishIPLD(tx, rctNode); err != nil {
return err
return fmt.Errorf("error publishing rct IPLD: %v", err)
}
// Indexing
@ -344,7 +354,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
// publish the state node
stateCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return err
return fmt.Errorf("error publishing state node IPLD: %v", err)
}
mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
stateModel := models.StateNodeModel{
@ -386,7 +396,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
for _, storageNode := range stateNode.StorageNodes {
storageCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if err != nil {
return err
return fmt.Errorf("error publishing storage node IPLD: %v", err)
}
mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
storageModel := models.StorageNodeModel{
@ -409,10 +419,10 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd
// codec doesn't matter since db key is multihash-based
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
if err != nil {
return err
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
}
if err := shared.PublishDirect(tx.dbtx, mhKey, codeAndCodeHash.Code); err != nil {
return err
return fmt.Errorf("error publishing code IPLD: %v", err)
}
return nil
}

View File

@ -17,12 +17,13 @@
package indexer
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
var (
@ -49,36 +50,19 @@ func (in *PostgresCIDWriter) upsertHeaderCID(tx *sqlx.Tx, header models.HeaderMo
RETURNING id`,
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID)
if err == nil {
indexerMetrics.blocks.Inc(1)
if err != nil {
return 0, fmt.Errorf("error upserting header_cids entry: %v", err)
}
return headerID, err
indexerMetrics.blocks.Inc(1)
return headerID, nil
}
func (in *PostgresCIDWriter) upsertUncleCID(tx *sqlx.Tx, uncle models.UncleModel, headerID int64) error {
_, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)`,
uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
return err
}
func (in *PostgresCIDWriter) upsertTransactionAndReceiptCIDs(tx *sqlx.Tx, payload shared.CIDPayload, headerID int64) error {
for _, trxCidMeta := range payload.TransactionCIDs {
var txID int64
err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data) = ($3, $4, $5, $6, $7, $8)
RETURNING id`,
headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey, trxCidMeta.Data).Scan(&txID)
if err != nil {
return err
}
indexerMetrics.transactions.Inc(1)
receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)]
if ok {
if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil {
return err
}
}
if err != nil {
return fmt.Errorf("error upserting uncle_cids entry: %v", err)
}
return nil
}
@ -89,20 +73,22 @@ func (in *PostgresCIDWriter) upsertTransactionCID(tx *sqlx.Tx, transaction model
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data) = ($3, $4, $5, $6, $7, $8)
RETURNING id`,
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID)
if err == nil {
indexerMetrics.transactions.Inc(1)
if err != nil {
return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err)
}
return txID, err
indexerMetrics.transactions.Inc(1)
return txID, nil
}
func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct models.ReceiptModel, txID int64) error {
_, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key, post_state, post_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key, post_state, post_status) = ($2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey, rct.PostState, rct.PostStatus)
if err == nil {
indexerMetrics.receipts.Inc(1)
if err != nil {
return fmt.Errorf("error upserting receipt_cids entry: %v", err)
}
return err
indexerMetrics.receipts.Inc(1)
return nil
}
func (in *PostgresCIDWriter) upsertStateCID(tx *sqlx.Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) {
@ -115,14 +101,20 @@ func (in *PostgresCIDWriter) upsertStateCID(tx *sqlx.Tx, stateNode models.StateN
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
RETURNING id`,
headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID)
return stateID, err
if err != nil {
return 0, fmt.Errorf("error upserting state_cids entry: %v", err)
}
return stateID, nil
}
func (in *PostgresCIDWriter) upsertStateAccount(tx *sqlx.Tx, stateAccount models.StateAccountModel, stateID int64) error {
_, err := tx.Exec(`INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)`,
stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
return err
if err != nil {
return fmt.Errorf("error upserting state_accounts entry: %v", err)
}
return nil
}
func (in *PostgresCIDWriter) upsertStorageCID(tx *sqlx.Tx, storageCID models.StorageNodeModel, stateID int64) error {
@ -133,5 +125,8 @@ func (in *PostgresCIDWriter) upsertStorageCID(tx *sqlx.Tx, storageCID models.Sto
_, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
return err
if err != nil {
return fmt.Errorf("error upserting storage_cids entry: %v", err)
}
return nil
}

View File

@ -634,11 +634,11 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
receipts = sds.BlockChain.GetReceiptsByHash(block.Hash())
}
tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty)
// defer handling of commit/rollback for any return case
defer tx.Close(err)
if err != nil {
return err
}
// defer handling of commit/rollback for any return case
defer tx.Close(err)
output := func(node StateNode) error {
return sds.indexer.PushStateNode(tx, node)
}