diff --git a/statediff/doc.md b/statediff/doc.md index 678d13172..3e4689d6b 100644 --- a/statediff/doc.md +++ b/statediff/doc.md @@ -79,6 +79,7 @@ This service introduces a CLI flag namespace `statediff` `--statediff` flag is used to turn on the service `--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database +`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database `--statediff.db` is the connection string for the Postgres database to write to `--statediff.dbnodeid` is the node id to use in the Postgres database `--statediff.dbclientname` is the client name to use in the Postgres database diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 2cc09fe98..f0a24c730 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -111,7 +111,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // Generate the block iplds headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { - return nil, err + return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) } if len(txNodes) != len(txTrieNodes) && len(rctNodes) != len(rctTrieNodes) && len(txNodes) != len(rctNodes) { return nil, fmt.Errorf("expected number of transactions (%d), transaction trie nodes (%d), receipts (%d), and receipt trie nodes (%d)to be equal", len(txNodes), len(txTrieNodes), len(rctNodes), len(rctTrieNodes)) @@ -124,7 +124,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if err != nil { return nil, err } - blocktx := BlockTx{ + defer func() { + if p := recover(); p != nil { + shared.Rollback(tx) + panic(p) + } else if err != nil { + shared.Rollback(tx) + } + }() + blockTx := &BlockTx{ dbtx: tx, // handle transaction commit or rollback for any return case Close: func(err error) error { @@ -164,7 +172,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles - if err := sdi.processUncles(tx, headerID, height, uncleNodes); err != nil { + err = sdi.processUncles(tx, headerID, height, uncleNodes) + if err != nil { return nil, err } tDiff = time.Since(t) @@ -172,7 +181,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs - if err := sdi.processReceiptsAndTxs(tx, processArgs{ + err = sdi.processReceiptsAndTxs(tx, processArgs{ headerID: headerID, blockNumber: block.Number(), receipts: receipts, @@ -181,7 +190,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip rctTrieNodes: rctTrieNodes, txNodes: txNodes, txTrieNodes: txTrieNodes, - }); err != nil { + }) + if err != nil { return nil, err } tDiff = time.Since(t) @@ -189,9 +199,9 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() - blocktx.BlockNumber = height - blocktx.headerID = headerID - return &blocktx, err + blockTx.BlockNumber = height + blockTx.headerID = headerID + return blockTx, err } // processHeader publishes and indexes a header IPLD in Postgres @@ -199,7 +209,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) { // publish header if err := shared.PublishIPLD(tx, headerNode); err != nil { - return 0, err + return 0, fmt.Errorf("error publishing header IPLD: %v", err) } // index header return sdi.dbWriter.upsertHeaderCID(tx, models.HeaderModel{ @@ -223,7 +233,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNum // publish and index uncles for _, uncleNode := range uncleNodes { if err := shared.PublishIPLD(tx, uncleNode); err != nil { - return err + return fmt.Errorf("error publishing uncle IPLD: %v", err) } uncleReward := CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) uncle := models.UncleModel{ @@ -261,24 +271,24 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs trx := args.txs[i] from, err := types.Sender(signer, trx) if err != nil { - return err + return fmt.Errorf("error deriving tx sender: %v", err) } // Publishing // publish trie nodes, these aren't indexed directly if err := shared.PublishIPLD(tx, args.txTrieNodes[i]); err != nil { - return err + return fmt.Errorf("error publishing tx trie node IPLD: %v", err) } if err := shared.PublishIPLD(tx, args.rctTrieNodes[i]); err != nil { - return err + return fmt.Errorf("error publishing rct trie node IPLD: %v", err) } // publish the txs and receipts txNode, rctNode := args.txNodes[i], args.rctNodes[i] if err := shared.PublishIPLD(tx, txNode); err != nil { - return err + return fmt.Errorf("error publishing tx IPLD: %v", err) } if err := shared.PublishIPLD(tx, rctNode); err != nil { - return err + return fmt.Errorf("error publishing rct IPLD: %v", err) } // Indexing @@ -344,7 +354,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN // publish the state node stateCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) if err != nil { - return err + return fmt.Errorf("error publishing state node IPLD: %v", err) } mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr) stateModel := models.StateNodeModel{ @@ -386,7 +396,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN for _, storageNode := range stateNode.StorageNodes { storageCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) if err != nil { - return err + return fmt.Errorf("error publishing storage node IPLD: %v", err) } mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr) storageModel := models.StorageNodeModel{ @@ -409,10 +419,10 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd // codec doesn't matter since db key is multihash-based mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash) if err != nil { - return err + return fmt.Errorf("error deriving multihash key from codehash: %v", err) } if err := shared.PublishDirect(tx.dbtx, mhKey, codeAndCodeHash.Code); err != nil { - return err + return fmt.Errorf("error publishing code IPLD: %v", err) } return nil } diff --git a/statediff/indexer/writer.go b/statediff/indexer/writer.go index ecd21297d..db71538f4 100644 --- a/statediff/indexer/writer.go +++ b/statediff/indexer/writer.go @@ -17,12 +17,13 @@ package indexer import ( + "fmt" + "github.com/jmoiron/sqlx" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" - "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) var ( @@ -49,36 +50,19 @@ func (in *PostgresCIDWriter) upsertHeaderCID(tx *sqlx.Tx, header models.HeaderMo RETURNING id`, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID) - if err == nil { - indexerMetrics.blocks.Inc(1) + if err != nil { + return 0, fmt.Errorf("error upserting header_cids entry: %v", err) } - return headerID, err + indexerMetrics.blocks.Inc(1) + return headerID, nil } func (in *PostgresCIDWriter) upsertUncleCID(tx *sqlx.Tx, uncle models.UncleModel, headerID int64) error { _, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)`, uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) - return err -} - -func (in *PostgresCIDWriter) upsertTransactionAndReceiptCIDs(tx *sqlx.Tx, payload shared.CIDPayload, headerID int64) error { - for _, trxCidMeta := range payload.TransactionCIDs { - var txID int64 - err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data) = ($3, $4, $5, $6, $7, $8) - RETURNING id`, - headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey, trxCidMeta.Data).Scan(&txID) - if err != nil { - return err - } - indexerMetrics.transactions.Inc(1) - receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)] - if ok { - if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil { - return err - } - } + if err != nil { + return fmt.Errorf("error upserting uncle_cids entry: %v", err) } return nil } @@ -89,20 +73,22 @@ func (in *PostgresCIDWriter) upsertTransactionCID(tx *sqlx.Tx, transaction model ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data) = ($3, $4, $5, $6, $7, $8) RETURNING id`, headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID) - if err == nil { - indexerMetrics.transactions.Inc(1) + if err != nil { + return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err) } - return txID, err + indexerMetrics.transactions.Inc(1) + return txID, nil } func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct models.ReceiptModel, txID int64) error { _, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key, post_state, post_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key, post_state, post_status) = ($2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey, rct.PostState, rct.PostStatus) - if err == nil { - indexerMetrics.receipts.Inc(1) + if err != nil { + return fmt.Errorf("error upserting receipt_cids entry: %v", err) } - return err + indexerMetrics.receipts.Inc(1) + return nil } func (in *PostgresCIDWriter) upsertStateCID(tx *sqlx.Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) { @@ -115,14 +101,20 @@ func (in *PostgresCIDWriter) upsertStateCID(tx *sqlx.Tx, stateNode models.StateN ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) RETURNING id`, headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID) - return stateID, err + if err != nil { + return 0, fmt.Errorf("error upserting state_cids entry: %v", err) + } + return stateID, nil } func (in *PostgresCIDWriter) upsertStateAccount(tx *sqlx.Tx, stateAccount models.StateAccountModel, stateID int64) error { _, err := tx.Exec(`INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)`, stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) - return err + if err != nil { + return fmt.Errorf("error upserting state_accounts entry: %v", err) + } + return nil } func (in *PostgresCIDWriter) upsertStorageCID(tx *sqlx.Tx, storageCID models.StorageNodeModel, stateID int64) error { @@ -133,5 +125,8 @@ func (in *PostgresCIDWriter) upsertStorageCID(tx *sqlx.Tx, storageCID models.Sto _, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`, stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) - return err + if err != nil { + return fmt.Errorf("error upserting storage_cids entry: %v", err) + } + return nil } diff --git a/statediff/service.go b/statediff/service.go index 9784306e5..12e6bf9e6 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -634,11 +634,11 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p receipts = sds.BlockChain.GetReceiptsByHash(block.Hash()) } tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty) - // defer handling of commit/rollback for any return case - defer tx.Close(err) if err != nil { return err } + // defer handling of commit/rollback for any return case + defer tx.Close(err) output := func(node StateNode) error { return sds.indexer.PushStateNode(tx, node) }