From 288408a2b9e57a492aeafdc4e4d8992ec373b83f Mon Sep 17 00:00:00 2001 From: i-norden Date: Wed, 15 Mar 2023 18:08:19 -0500 Subject: [PATCH] update database/sql indexer --- statediff/indexer/database/sql/batch_tx.go | 5 +- statediff/indexer/database/sql/indexer.go | 90 +++++++--------------- 2 files changed, 28 insertions(+), 67 deletions(-) diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index 5f9d09b25..4c3941e76 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -23,7 +23,6 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" - node "github.com/ipfs/go-ipld-format" "github.com/lib/pq" "github.com/ethereum/go-ethereum/log" @@ -100,11 +99,11 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) { } } -func (tx *BatchTx) cacheIPLD(i node.Node) { +func (tx *BatchTx) cacheIPLD(i ipld.IPLD) { tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ BlockNumber: tx.BlockNumber, - Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), + Key: i.Cid().String(), Data: i.RawData(), } } diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 770231db8..2133a6bf3 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -29,7 +29,6 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" - node "github.com/ipfs/go-ipld-format" "github.com/multiformats/go-multihash" "github.com/ethereum/go-ethereum/common" @@ -40,7 +39,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" - ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/shared" sdtypes "github.com/ethereum/go-ethereum/statediff/types" @@ -103,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - headerNode, txNodes, rctNodes, logNodes, err := ipld2.FromBlockAndReceipts(block, receipts) + headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) } @@ -229,7 +228,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // processHeader publishes and indexes a header IPLD in Postgres // it returns the headerID -func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { +func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { tx.cacheIPLD(headerNode) var baseFee *string @@ -267,7 +266,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) { return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.Hex(), unclesHash.Hex()) } - unclesCID, err := ipld2.RawdataToCid(ipld2.MEthHeaderList, uncleEncoding, multihash.KECCAK_256) + unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256) if err != nil { return err } @@ -303,9 +302,9 @@ type processArgs struct { blockNumber *big.Int receipts types.Receipts txs types.Transactions - rctNodes []*ipld2.EthReceipt - txNodes []*ipld2.EthTx - logNodes [][]*ipld2.EthLog + rctNodes []*ipld.EthReceipt + txNodes []*ipld.EthTx + logNodes [][]*ipld.EthLog } // processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres @@ -422,7 +421,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) @@ -430,25 +429,25 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // publish the state node var stateModel models.StateNodeModel if stateNode.Removed { - tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{}) + tx.cacheRemoved(shared.RemovedNodeStateCID, []byte{}) stateModel = models.StateNodeModel{ BlockNumber: tx.BlockNumber, HeaderID: headerID, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), + StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: shared.RemovedNodeStateCID, Removed: true, } } else { - stateCIDStr, _, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) - if err != nil { - return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) - } stateModel = models.StateNodeModel{ BlockNumber: tx.BlockNumber, HeaderID: headerID, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: stateCIDStr, + StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), + CID: stateNode.AccountWrapper.CID, Removed: false, + Balance: stateNode.AccountWrapper.Account.Balance.String(), + Nonce: stateNode.AccountWrapper.Account.Nonce, + CodeHash: stateNode.AccountWrapper.Account.CodeHash, + StorageRoot: stateNode.AccountWrapper.Account.Root.String(), } } @@ -457,42 +456,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return err } - // if we have a leaf, decode and index the account data - if stateNode.NodeType == sdtypes.Leaf { - var i []interface{} - if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil { - return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error()) - } - if len(i) != 2 { - return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements") - } - var account types.StateAccount - if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil { - return fmt.Errorf("error decoding state account rlp: %s", err.Error()) - } - accountModel := models.StateAccountModel{ - BlockNumber: tx.BlockNumber, - HeaderID: headerID, - StatePath: stateNode.Path, - Balance: account.Balance.String(), - Nonce: account.Nonce, - CodeHash: account.CodeHash, - StorageRoot: account.Root.String(), - } - if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel); err != nil { - return err - } - } - // if there are any storage nodes associated with this node, publish and index them - for _, storageNode := range stateNode.StorageNodes { - if storageNode.NodeType == sdtypes.Removed { - tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{}) + for _, storageNode := range stateNode.StorageDiff { + if storageNode.Removed { + tx.cacheRemoved(shared.RemovedNodeStorageCID, []byte{}) storageModel := models.StorageNodeModel{ BlockNumber: tx.BlockNumber, HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, + StateKey: stateNode.AccountWrapper.LeafKey, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: shared.RemovedNodeStorageCID, Removed: true, @@ -502,18 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } continue } - storageCIDStr, _, err := tx.cacheRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) - if err != nil { - return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) - } storageModel := models.StorageNodeModel{ BlockNumber: tx.BlockNumber, HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, + StateKey: stateNode.AccountWrapper.LeafKey, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: storageCIDStr, + CID: storageNode.CID, Removed: true, + Value: storageNode.Value, } if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil { return err @@ -523,18 +490,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return nil } -// PushCodeAndCodeHash publishes code and codehash pairs to the ipld sql -func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error { +// PushIPLD publishes iplds to ipld.blocks +func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) } - // codec doesn't matter since db key is multihash-based - mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash) - if err != nil { - return fmt.Errorf("error deriving multihash key from codehash: %v", err) - } - tx.cacheDirect(mhKey, codeAndCodeHash.Code) + tx.cacheDirect(ipld.CID, ipld.Content) return nil }