From 6e8ee099e30dc42c4efb24febaa74edfcbe2d014 Mon Sep 17 00:00:00 2001
From: i-norden
Date: Wed, 15 Mar 2023 18:07:41 -0500
Subject: [PATCH] updating database/dump indexer

Adapt the dump indexer to the reworked IPLD and statediff types:

- cacheIPLD accepts an ipld.IPLD and keys the cached block by its CID
  string instead of the blockstore-prefixed multihash datastore key.
- ipld.FromBlockAndReceipts is consumed as header, tx, receipt and log
  nodes only; trie nodes and leaf-CID slices are no longer carried in
  processArgs, validated, or cached.
- Receipt and log models are filled through CID (taken from the node)
  rather than LeafCID.
- PushStateNode takes an sdtypes.StateLeafNode. Balance, nonce, code
  hash and storage root are set on the state node model, the separate
  StateAccountModel dump is removed, CIDs are read from the diff
  objects instead of tx.cacheRaw, and storage models refer to their
  account by state leaf key instead of by path.
- PushCodeAndCodeHash is replaced by PushIPLD.

---
 statediff/indexer/database/dump/batch_tx.go |   5 +-
 statediff/indexer/database/dump/indexer.go  | 150 ++++++--------------
 2 files changed, 42 insertions(+), 113 deletions(-)

diff --git a/statediff/indexer/database/dump/batch_tx.go b/statediff/indexer/database/dump/batch_tx.go
index ee195a558..06820b8ac 100644
--- a/statediff/indexer/database/dump/batch_tx.go
+++ b/statediff/indexer/database/dump/batch_tx.go
@@ -25,7 +25,6 @@ import (
 	"github.com/ethereum/go-ethereum/statediff/indexer/models"
 	blockstore "github.com/ipfs/go-ipfs-blockstore"
 	dshelp "github.com/ipfs/go-ipfs-ds-help"
-	node "github.com/ipfs/go-ipld-format"
 )
 
 // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
@@ -74,10 +73,10 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
 	}
 }
 
-func (tx *BatchTx) cacheIPLD(i node.Node) {
+func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
 	tx.iplds <- models.IPLDModel{
 		BlockNumber: tx.BlockNumber,
-		Key:         blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
+		Key:         i.Cid().String(),
 		Data:        i.RawData(),
 	}
 }
diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go
index 1764d5512..bfbc5c169 100644
--- a/statediff/indexer/database/dump/indexer.go
+++ b/statediff/indexer/database/dump/indexer.go
@@ -26,10 +26,6 @@ import (
 	blockstore "github.com/ipfs/go-ipfs-blockstore"
 	dshelp "github.com/ipfs/go-ipfs-ds-help"
 
-	ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
-
-	"github.com/ipfs/go-cid"
-	node "github.com/ipfs/go-ipld-format"
 	"github.com/multiformats/go-multihash"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -40,6 +36,7 @@ import (
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
+	"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
 	"github.com/ethereum/go-ethereum/statediff/indexer/models"
 	"github.com/ethereum/go-ethereum/statediff/indexer/shared"
 	sdtypes "github.com/ethereum/go-ethereum/statediff/types"
@@ -83,16 +80,13 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	}
 
 	// Generate the block iplds
-	headerNode, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld2.FromBlockAndReceipts(block, receipts)
+	headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
 	if err != nil {
 		return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
 	}
 
-	if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) {
-		return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d) to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs))
-	}
-	if len(txTrieNodes) != len(rctTrieNodes) {
-		return nil, fmt.Errorf("expected number of tx trie (%d) and rct trie (%d) nodes to be equal", len(txTrieNodes), len(rctTrieNodes))
+	if len(txNodes) != len(rctNodes) {
+		return nil, fmt.Errorf("expected number of transactions (%d) and receipts (%d) to be equal", len(txNodes), len(rctNodes))
 	}
 
 	// Calculate reward
@@ -160,17 +154,13 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	t = time.Now()
 	// Publish and index receipts and txs
 	err = sdi.processReceiptsAndTxs(blockTx, processArgs{
-		headerID:        headerID,
-		blockNumber:     block.Number(),
-		receipts:        receipts,
-		txs:             transactions,
-		rctNodes:        rctNodes,
-		rctTrieNodes:    rctTrieNodes,
-		txNodes:         txNodes,
-		txTrieNodes:     txTrieNodes,
-		logTrieNodes:    logTrieNodes,
-		logLeafNodeCIDs: logLeafNodeCIDs,
-		rctLeafNodeCIDs: rctLeafNodeCIDs,
+		headerID:    headerID,
+		blockNumber: block.Number(),
+		receipts:    receipts,
+		txs:         transactions,
+		rctNodes:    rctNodes,
+		txNodes:     txNodes,
+		logNodes:    logNodes,
 	})
 	if err != nil {
 		return nil, err
@@ -185,7 +175,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 
 // processHeader publishes and indexes a header IPLD in Postgres
 // it returns the headerID
-func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
+func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
 	tx.cacheIPLD(headerNode)
 
 	headerID := header.Hash().String()
@@ -219,7 +209,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
 	if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) {
 		return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.Hex(), unclesHash.Hex())
 	}
-	unclesCID, err := ipld2.RawdataToCid(ipld2.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
+	unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
 	if err != nil {
 		return err
 	}
@@ -251,17 +241,13 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
 
 // processArgs bundles arguments to processReceiptsAndTxs
 type processArgs struct {
-	headerID        string
-	blockNumber     *big.Int
-	receipts        types.Receipts
-	txs             types.Transactions
-	rctNodes        []*ipld2.EthReceipt
-	rctTrieNodes    []*ipld2.EthRctTrie
-	txNodes         []*ipld2.EthTx
-	txTrieNodes     []*ipld2.EthTxTrie
-	logTrieNodes    [][]node.Node
-	logLeafNodeCIDs [][]cid.Cid
-	rctLeafNodeCIDs []cid.Cid
+	headerID    string
+	blockNumber *big.Int
+	receipts    types.Receipts
+	txs         types.Transactions
+	rctNodes    []*ipld.EthReceipt
+	txNodes     []*ipld.EthTx
+	logNodes    [][]*ipld.EthLog
 }
 
 // processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
@@ -269,9 +255,6 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 	// Process receipts and txs
 	signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
 	for i, receipt := range args.receipts {
-		for _, logTrieNode := range args.logTrieNodes[i] {
-			tx.cacheIPLD(logTrieNode)
-		}
 		txNode := args.txNodes[i]
 		tx.cacheIPLD(txNode)
 
@@ -331,17 +314,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 		}
 
 		// index the receipt
-		if !args.rctLeafNodeCIDs[i].Defined() {
-			return fmt.Errorf("invalid receipt leaf node cid")
-		}
-
 		rctModel := &models.ReceiptModel{
 			BlockNumber:  args.blockNumber.String(),
 			HeaderID:     args.headerID,
 			TxID:         trxID,
 			Contract:     contract,
 			ContractHash: contractHash,
-			LeafCID:      args.rctLeafNodeCIDs[i].String(),
+			CID:          args.rctNodes[i].Cid().String(),
 		}
 		if len(receipt.PostState) == 0 {
 			rctModel.PostStatus = receipt.Status
@@ -360,17 +339,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 				topicSet[ti] = topic.Hex()
 			}
 
-			if !args.logLeafNodeCIDs[i][idx].Defined() {
-				return fmt.Errorf("invalid log cid")
-			}
-
 			logDataSet[idx] = &models.LogsModel{
 				BlockNumber: args.blockNumber.String(),
 				HeaderID:    args.headerID,
 				ReceiptID:   trxID,
 				Address:     l.Address.String(),
 				Index:       int64(l.Index),
-				LeafCID:     args.logLeafNodeCIDs[i][idx].String(),
+				CID:         args.logNodes[i][idx].Cid().String(),
 				Topic0:      topicSet[0],
 				Topic1:      topicSet[1],
 				Topic2:      topicSet[2],
@@ -383,46 +358,38 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 		}
 	}
 
-	// publish trie nodes, these aren't indexed directly
-	for i, n := range args.txTrieNodes {
-		tx.cacheIPLD(n)
-		tx.cacheIPLD(args.rctTrieNodes[i])
-	}
-
 	return nil
 }
 
 // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
-func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
+func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
 	tx, ok := batch.(*BatchTx)
 	if !ok {
 		return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
 	}
 	// publish the state node
 	var stateModel models.StateNodeModel
-	if stateNode.NodeType == sdtypes.Removed {
+	if stateNode.Removed {
 		// short circuit if it is a Removed node
 		// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
 		stateModel = models.StateNodeModel{
 			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
-			Path:        stateNode.Path,
-			StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
+			StateKey:    common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
 			CID:         shared.RemovedNodeStateCID,
 			Removed:     true,
 		}
 	} else {
-		stateCIDStr, _, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
-		if err != nil {
-			return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
-		}
 		stateModel = models.StateNodeModel{
 			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
-			Path:        stateNode.Path,
-			StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
-			CID:         stateCIDStr,
+			StateKey:    common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
+			CID:         stateNode.AccountWrapper.CID,
 			Removed:     false,
+			Balance:     stateNode.AccountWrapper.Account.Balance.String(),
+			Nonce:       stateNode.AccountWrapper.Account.Nonce,
+			CodeHash:    stateNode.AccountWrapper.Account.CodeHash,
+			StorageRoot: stateNode.AccountWrapper.Account.Root.String(),
 		}
 	}
 
@@ -431,43 +398,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 		return err
 	}
 
-	// if we have a leaf, decode and index the account data
-	if stateNode.NodeType == sdtypes.Leaf {
-		var i []interface{}
-		if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil {
-			return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
-		}
-		if len(i) != 2 {
-			return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
-		}
-		var account types.StateAccount
-		if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
-			return fmt.Errorf("error decoding state account rlp: %s", err.Error())
-		}
-		accountModel := models.StateAccountModel{
-			BlockNumber: tx.BlockNumber,
-			HeaderID:    headerID,
-			StatePath:   stateNode.Path,
-			Balance:     account.Balance.String(),
-			Nonce:       account.Nonce,
-			CodeHash:    account.CodeHash,
-			StorageRoot: account.Root.String(),
-		}
-		if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", accountModel); err != nil {
-			return err
-		}
-	}
-
 	// if there are any storage nodes associated with this node, publish and index them
-	for _, storageNode := range stateNode.StorageNodes {
-		if storageNode.NodeType == sdtypes.Removed {
+	for _, storageNode := range stateNode.StorageDiff {
+		if storageNode.Removed {
 			// short circuit if it is a Removed node
 			// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
 			storageModel := models.StorageNodeModel{
 				BlockNumber: tx.BlockNumber,
 				HeaderID:    headerID,
-				StatePath:   stateNode.Path,
-				Path:        storageNode.Path,
+				StateKey:    common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
 				StorageKey:  common.BytesToHash(storageNode.LeafKey).String(),
 				CID:         shared.RemovedNodeStorageCID,
 				Removed:     true,
@@ -477,18 +416,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 			}
 			continue
 		}
-		storageCIDStr, storageMhKey, err := tx.cacheRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
-		if err != nil {
-			return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
-		}
 		storageModel := models.StorageNodeModel{
 			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
-			StatePath:   stateNode.Path,
-			Path:        storageNode.Path,
+			StateKey:    common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
 			StorageKey:  common.BytesToHash(storageNode.LeafKey).String(),
-			CID:         storageCIDStr,
+			CID:         storageNode.CID,
 			Removed:     false,
+			Value:       storageNode.Value,
 		}
 		if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
 			return err
@@ -498,18 +433,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 	return nil
 }
 
-// PushCodeAndCodeHash publishes code and codehash pairs to the ipld sql
-func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
+// PushIPLD passes the IPLD's CID and content to the batch's direct cache; it returns an error if batch is not a *BatchTx.
+func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error {
 	tx, ok := batch.(*BatchTx)
 	if !ok {
 		return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
 	}
-	// codec doesn't matter since db key is multihash-based
-	mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
-	if err != nil {
-		return fmt.Errorf("error deriving multihash key from codehash: %v", err)
-	}
-	tx.cacheDirect(mhKey, codeAndCodeHash.Code)
+	tx.cacheDirect(ipld.CID, ipld.Content)
 	return nil
 }
 