updating database/dump indexer

This commit is contained in:
i-norden 2023-03-15 18:07:41 -05:00
parent 6d5a0157b7
commit 6e8ee099e3
2 changed files with 42 additions and 113 deletions

View File

@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/models"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
) )
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
@ -74,10 +73,10 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
} }
} }
func (tx *BatchTx) cacheIPLD(i node.Node) { func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
tx.iplds <- models.IPLDModel{ tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber, BlockNumber: tx.BlockNumber,
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), Key: i.Cid().String(),
Data: i.RawData(), Data: i.RawData(),
} }
} }

View File

@ -26,10 +26,6 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -40,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
@ -83,16 +80,13 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
// Generate the block iplds // Generate the block iplds
headerNode, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld2.FromBlockAndReceipts(block, receipts) headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
} }
if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) { if len(txNodes) != len(rctNodes) {
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d) to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs)) return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d)", len(txNodes), len(rctNodes))
}
if len(txTrieNodes) != len(rctTrieNodes) {
return nil, fmt.Errorf("expected number of tx trie (%d) and rct trie (%d) nodes to be equal", len(txTrieNodes), len(rctTrieNodes))
} }
// Calculate reward // Calculate reward
@ -160,17 +154,13 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now() t = time.Now()
// Publish and index receipts and txs // Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{ err = sdi.processReceiptsAndTxs(blockTx, processArgs{
headerID: headerID, headerID: headerID,
blockNumber: block.Number(), blockNumber: block.Number(),
receipts: receipts, receipts: receipts,
txs: transactions, txs: transactions,
rctNodes: rctNodes, rctNodes: rctNodes,
rctTrieNodes: rctTrieNodes, txNodes: txNodes,
txNodes: txNodes, logNodes: logNodes,
txTrieNodes: txTrieNodes,
logTrieNodes: logTrieNodes,
logLeafNodeCIDs: logLeafNodeCIDs,
rctLeafNodeCIDs: rctLeafNodeCIDs,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -185,7 +175,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader publishes and indexes a header IPLD in Postgres // processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID // it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode) tx.cacheIPLD(headerNode)
headerID := header.Hash().String() headerID := header.Hash().String()
@ -219,7 +209,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) { if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) {
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.Hex(), unclesHash.Hex()) return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.Hex(), unclesHash.Hex())
} }
unclesCID, err := ipld2.RawdataToCid(ipld2.MEthHeaderList, uncleEncoding, multihash.KECCAK_256) unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
if err != nil { if err != nil {
return err return err
} }
@ -251,17 +241,13 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
// processArgs bundles arguments to processReceiptsAndTxs // processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct { type processArgs struct {
headerID string headerID string
blockNumber *big.Int blockNumber *big.Int
receipts types.Receipts receipts types.Receipts
txs types.Transactions txs types.Transactions
rctNodes []*ipld2.EthReceipt rctNodes []*ipld.EthReceipt
rctTrieNodes []*ipld2.EthRctTrie txNodes []*ipld.EthTx
txNodes []*ipld2.EthTx logNodes [][]*ipld.EthLog
txTrieNodes []*ipld2.EthTxTrie
logTrieNodes [][]node.Node
logLeafNodeCIDs [][]cid.Cid
rctLeafNodeCIDs []cid.Cid
} }
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres // processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
@ -269,9 +255,6 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
// Process receipts and txs // Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber) signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
for i, receipt := range args.receipts { for i, receipt := range args.receipts {
for _, logTrieNode := range args.logTrieNodes[i] {
tx.cacheIPLD(logTrieNode)
}
txNode := args.txNodes[i] txNode := args.txNodes[i]
tx.cacheIPLD(txNode) tx.cacheIPLD(txNode)
@ -331,17 +314,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
// index the receipt // index the receipt
if !args.rctLeafNodeCIDs[i].Defined() {
return fmt.Errorf("invalid receipt leaf node cid")
}
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
TxID: trxID, TxID: trxID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(), CID: args.rctNodes[i].Cid().String(),
} }
if len(receipt.PostState) == 0 { if len(receipt.PostState) == 0 {
rctModel.PostStatus = receipt.Status rctModel.PostStatus = receipt.Status
@ -360,17 +339,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
topicSet[ti] = topic.Hex() topicSet[ti] = topic.Hex()
} }
if !args.logLeafNodeCIDs[i][idx].Defined() {
return fmt.Errorf("invalid log cid")
}
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
ReceiptID: trxID, ReceiptID: trxID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),
LeafCID: args.logLeafNodeCIDs[i][idx].String(), CID: args.logNodes[i][idx].Cid().String(),
Topic0: topicSet[0], Topic0: topicSet[0],
Topic1: topicSet[1], Topic1: topicSet[1],
Topic2: topicSet[2], Topic2: topicSet[2],
@ -383,46 +358,38 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
} }
// publish trie nodes, these aren't indexed directly
for i, n := range args.txTrieNodes {
tx.cacheIPLD(n)
tx.cacheIPLD(args.rctTrieNodes[i])
}
return nil return nil
} }
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
} }
// publish the state node // publish the state node
var stateModel models.StateNodeModel var stateModel models.StateNodeModel
if stateNode.NodeType == sdtypes.Removed { if stateNode.Removed {
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID, CID: shared.RemovedNodeStateCID,
Removed: true, Removed: true,
} }
} else { } else {
stateCIDStr, _, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: stateNode.AccountWrapper.CID,
CID: stateCIDStr,
Removed: false, Removed: false,
Balance: stateNode.AccountWrapper.Account.Balance.String(),
Nonce: stateNode.AccountWrapper.Account.Nonce,
CodeHash: stateNode.AccountWrapper.Account.CodeHash,
StorageRoot: stateNode.AccountWrapper.Account.Root.String(),
} }
} }
@ -431,43 +398,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return err return err
} }
// if we have a leaf, decode and index the account data
if stateNode.NodeType == sdtypes.Leaf {
var i []interface{}
if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil {
return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
}
if len(i) != 2 {
return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
}
var account types.StateAccount
if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
}
accountModel := models.StateAccountModel{
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Balance: account.Balance.String(),
Nonce: account.Nonce,
CodeHash: account.CodeHash,
StorageRoot: account.Root.String(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", accountModel); err != nil {
return err
}
}
// if there are any storage nodes associated with this node, publish and index them // if there are any storage nodes associated with this node, publish and index them
for _, storageNode := range stateNode.StorageNodes { for _, storageNode := range stateNode.StorageDiff {
if storageNode.NodeType == sdtypes.Removed { if storageNode.Removed {
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StateKey: stateNode.AccountWrapper.LeafKey,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID, CID: shared.RemovedNodeStorageCID,
Removed: true, Removed: true,
@ -477,18 +416,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
} }
continue continue
} }
storageCIDStr, storageMhKey, err := tx.cacheRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StateKey: stateNode.AccountWrapper.LeafKey,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr, CID: storageNode.CID,
Removed: false, Removed: false,
Value: storageNode.Value,
} }
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil { if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err return err
@ -498,18 +433,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return nil return nil
} }
// PushCodeAndCodeHash publishes code and codehash pairs to the ipld sql // PushIPLD publishes iplds to ipld.blocks
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error { func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
} }
// codec doesn't matter since db key is multihash-based tx.cacheDirect(ipld.CID, ipld.Content)
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
if err != nil {
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
}
tx.cacheDirect(mhKey, codeAndCodeHash.Code)
return nil return nil
} }