adjust indexers for tx, rct, and log trie changes

This commit is contained in:
i-norden 2023-02-13 15:04:08 -06:00
parent f05d6b6baa
commit 530fed2614
3 changed files with 37 additions and 91 deletions

View File

@ -30,7 +30,6 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format" node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
@ -153,16 +152,13 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
// Generate the block iplds // Generate the block iplds
headerNode, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld2.FromBlockAndReceipts(block, receipts) headerNode, txNodes, rctNodes, logNodes, err := ipld2.FromBlockAndReceipts(block, receipts)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
} }
if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) { if len(txNodes) != len(rctNodes) {
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d) to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs)) return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d)", len(txNodes), len(rctNodes))
}
if len(txTrieNodes) != len(rctTrieNodes) {
return nil, fmt.Errorf("expected number of tx trie (%d) and rct trie (%d) nodes to be equal", len(txTrieNodes), len(rctTrieNodes))
} }
// Calculate reward // Calculate reward
@ -212,17 +208,13 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// write receipts and txs // write receipts and txs
err = sdi.processReceiptsAndTxs(processArgs{ err = sdi.processReceiptsAndTxs(processArgs{
headerID: headerID, headerID: headerID,
blockNumber: block.Number(), blockNumber: block.Number(),
receipts: receipts, receipts: receipts,
txs: transactions, txs: transactions,
rctNodes: rctNodes, rctNodes: rctNodes,
rctTrieNodes: rctTrieNodes, txNodes: txNodes,
txNodes: txNodes, logNodes: logNodes,
txTrieNodes: txTrieNodes,
logTrieNodes: logTrieNodes,
logLeafNodeCIDs: logLeafNodeCIDs,
rctLeafNodeCIDs: rctLeafNodeCIDs,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -305,17 +297,13 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber *big.Int
// processArgs bundles arguments to processReceiptsAndTxs // processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct { type processArgs struct {
headerID string headerID string
blockNumber *big.Int blockNumber *big.Int
receipts types.Receipts receipts types.Receipts
txs types.Transactions txs types.Transactions
rctNodes []*ipld2.EthReceipt rctNodes []*ipld2.EthReceipt
rctTrieNodes []*ipld2.EthRctTrie txNodes []*ipld2.EthTx
txNodes []*ipld2.EthTx logNodes [][]*ipld2.EthLog
txTrieNodes []*ipld2.EthTxTrie
logTrieNodes [][]node.Node
logLeafNodeCIDs [][]cid.Cid
rctLeafNodeCIDs []cid.Cid
} }
// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file // processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file
@ -323,9 +311,6 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
// Process receipts and txs // Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber) signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
for i, receipt := range args.receipts { for i, receipt := range args.receipts {
for _, logTrieNode := range args.logTrieNodes[i] {
sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), logTrieNode)
}
txNode := args.txNodes[i] txNode := args.txNodes[i]
sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), txNode) sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), txNode)
@ -380,17 +365,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
} }
// index receipt // index receipt
if !args.rctLeafNodeCIDs[i].Defined() {
return fmt.Errorf("invalid receipt leaf node cid")
}
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
TxID: txID, TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(), CID: args.rctNodes[i].Cid().String(),
} }
if len(receipt.PostState) == 0 { if len(receipt.PostState) == 0 {
rctModel.PostStatus = receipt.Status rctModel.PostStatus = receipt.Status
@ -407,17 +388,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
topicSet[ti] = topic.Hex() topicSet[ti] = topic.Hex()
} }
if !args.logLeafNodeCIDs[i][idx].Defined() {
return fmt.Errorf("invalid log cid")
}
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
ReceiptID: txID, ReceiptID: txID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),
LeafCID: args.logLeafNodeCIDs[i][idx].String(), CID: args.logNodes[i][idx].Cid().String(),
Topic0: topicSet[0], Topic0: topicSet[0],
Topic1: topicSet[1], Topic1: topicSet[1],
Topic2: topicSet[2], Topic2: topicSet[2],
@ -427,12 +404,6 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
sdi.fileWriter.upsertLogCID(logDataSet) sdi.fileWriter.upsertLogCID(logDataSet)
} }
// publish trie nodes, these aren't indexed directly
for i, n := range args.txTrieNodes {
sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), n)
sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), args.rctTrieNodes[i])
}
return nil return nil
} }

View File

@ -43,7 +43,6 @@ type FileWriter interface {
upsertReceiptCID(rct *models.ReceiptModel) upsertReceiptCID(rct *models.ReceiptModel)
upsertLogCID(logs []*models.LogsModel) upsertLogCID(logs []*models.LogsModel)
upsertStateCID(stateNode models.StateNodeModel) upsertStateCID(stateNode models.StateNodeModel)
upsertStateAccount(stateAccount models.StateAccountModel)
upsertStorageCID(storageCID models.StorageNodeModel) upsertStorageCID(storageCID models.StorageNodeModel)
upsertIPLD(ipld models.IPLDModel) upsertIPLD(ipld models.IPLDModel)

View File

@ -29,7 +29,6 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format" node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
@ -209,17 +208,13 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now() t = time.Now()
// Publish and index receipts and txs // Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{ err = sdi.processReceiptsAndTxs(blockTx, processArgs{
headerID: headerID, headerID: headerID,
blockNumber: block.Number(), blockNumber: block.Number(),
receipts: receipts, receipts: receipts,
txs: transactions, txs: transactions,
rctNodes: rctNodes, rctNodes: rctNodes,
rctTrieNodes: rctTrieNodes, txNodes: txNodes,
txNodes: txNodes, logNodes: logNodes,
txTrieNodes: txTrieNodes,
logTrieNodes: logTrieNodes,
logLeafNodeCIDs: logLeafNodeCIDs,
rctLeafNodeCIDs: rctLeafNodeCIDs,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -304,17 +299,13 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
// processArgs bundles arguments to processReceiptsAndTxs // processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct { type processArgs struct {
headerID string headerID string
blockNumber *big.Int blockNumber *big.Int
receipts types.Receipts receipts types.Receipts
txs types.Transactions txs types.Transactions
rctNodes []*ipld2.EthReceipt rctNodes []*ipld2.EthReceipt
rctTrieNodes []*ipld2.EthRctTrie txNodes []*ipld2.EthTx
txNodes []*ipld2.EthTx logNodes [][]*ipld2.EthLog
txTrieNodes []*ipld2.EthTxTrie
logTrieNodes [][]node.Node
logLeafNodeCIDs [][]cid.Cid
rctLeafNodeCIDs []cid.Cid
} }
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres // processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
@ -322,8 +313,8 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
// Process receipts and txs // Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber) signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
for i, receipt := range args.receipts { for i, receipt := range args.receipts {
for _, logTrieNode := range args.logTrieNodes[i] { for _, logNode := range args.logNodes[i] {
tx.cacheIPLD(logTrieNode) tx.cacheIPLD(logNode)
} }
txNode := args.txNodes[i] txNode := args.txNodes[i]
tx.cacheIPLD(txNode) tx.cacheIPLD(txNode)
@ -382,18 +373,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
} }
// index receipt
if !args.rctLeafNodeCIDs[i].Defined() {
return fmt.Errorf("invalid receipt leaf node cid")
}
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
TxID: txID, TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(), CID: args.rctNodes[i].Cid().String(),
} }
if len(receipt.PostState) == 0 { if len(receipt.PostState) == 0 {
rctModel.PostStatus = receipt.Status rctModel.PostStatus = receipt.Status
@ -413,17 +399,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
topicSet[ti] = topic.Hex() topicSet[ti] = topic.Hex()
} }
if !args.logLeafNodeCIDs[i][idx].Defined() {
return fmt.Errorf("invalid log cid")
}
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
ReceiptID: txID, ReceiptID: txID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),
LeafCID: args.logLeafNodeCIDs[i][idx].String(), CID: args.logNodes[i][idx].Cid().String(),
Topic0: topicSet[0], Topic0: topicSet[0],
Topic1: topicSet[1], Topic1: topicSet[1],
Topic2: topicSet[2], Topic2: topicSet[2],
@ -436,12 +418,6 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
} }
// publish trie nodes, these aren't indexed directly
for i, n := range args.txTrieNodes {
tx.cacheIPLD(n)
tx.cacheIPLD(args.rctTrieNodes[i])
}
return nil return nil
} }