update database/file/csv and database/file/sql indexers

i-norden 2023-03-15 18:08:03 -05:00
parent 6e8ee099e3
commit 4d53681a27
4 changed files with 32 additions and 70 deletions

View File

@@ -27,7 +27,6 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
@@ -226,10 +225,10 @@ func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
})
}
- func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i node.Node) {
+ func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
csw.upsertIPLD(models.IPLDModel{
BlockNumber: blockNumber,
- Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
+ Key: i.Cid().String(),
Data: i.RawData(),
})
}
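
For context on the Key change above: the writer previously stored each IPLD block under a blockstore-prefixed datastore key derived from the node's multihash, and now keys it directly on the CID string. Below is a minimal sketch contrasting the two formats, using the go-ipfs-blockstore and go-ipfs-ds-help packages from the import hunk above plus go-cid and go-multihash; the helper name and sample payload are hypothetical, not part of this commit.

package main

import (
	"fmt"

	cid "github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	dshelp "github.com/ipfs/go-ipfs-ds-help"
	mh "github.com/multiformats/go-multihash"
)

// keyFormats contrasts the pre- and post-change ipld.blocks keys for one CID.
func keyFormats(c cid.Cid) (oldKey, newKey string) {
	// before: "/blocks" prefix + datastore key derived from the multihash,
	// i.e. blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String()
	oldKey = blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
	// after: the CID string itself, i.e. i.Cid().String()
	newKey = c.String()
	return
}

func main() {
	// build an example CID the way the eth IPLD codecs do: keccak-256 multihash
	h, _ := mh.Sum([]byte("example header RLP"), mh.KECCAK_256, -1)
	c := cid.NewCidV1(cid.EthBlock, h)
	oldKey, newKey := keyFormats(c)
	fmt.Println("old:", oldKey)
	fmt.Println("new:", newKey)
}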

View File

@@ -30,7 +30,6 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/common"
@@ -41,7 +40,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
@@ -152,7 +151,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
- headerNode, txNodes, rctNodes, logNodes, err := ipld2.FromBlockAndReceipts(block, receipts)
+ headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@@ -229,7 +228,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader write a header IPLD insert SQL stmt to a file
// it returns the headerID
- func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
+ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string {
sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode)
var baseFee *string
@@ -268,7 +267,7 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber *big.Int
if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) {
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.Hex(), unclesHash.Hex())
}
- unclesCID, err := ipld2.RawdataToCid(ipld2.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
+ unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
if err != nil {
return err
}
@@ -301,9 +300,9 @@ type processArgs struct {
blockNumber *big.Int
receipts types.Receipts
txs types.Transactions
- rctNodes []*ipld2.EthReceipt
- txNodes []*ipld2.EthTx
- logNodes [][]*ipld2.EthLog
+ rctNodes []*ipld.EthReceipt
+ txNodes []*ipld.EthTx
+ logNodes [][]*ipld.EthLog
}
// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file
@@ -408,7 +407,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
}
// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
- func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
+ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
@@ -418,69 +417,43 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if stateNode.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1)
- sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeMhKey, []byte{})
+ sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
}
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
- StateKey: common.BytesToHash(stateNode.LeafKey).String(),
+ StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
Removed: true,
}
} else {
- stateCIDStr, _, err := sdi.fileWriter.upsertIPLDRaw(tx.BlockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
- if err != nil {
- return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
- }
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
- StateKey: common.BytesToHash(stateNode.LeafKey).String(),
- CID: stateCIDStr,
+ StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
+ CID: stateNode.AccountWrapper.CID,
Removed: false,
+ Balance: stateNode.AccountWrapper.Account.Balance.String(),
+ Nonce: stateNode.AccountWrapper.Account.Nonce,
+ CodeHash: stateNode.AccountWrapper.Account.CodeHash,
+ StorageRoot: stateNode.AccountWrapper.Account.Root.String(),
}
}
// index the state node
sdi.fileWriter.upsertStateCID(stateModel)
- // if we have a leaf, decode and index the account data
- if stateNode.NodeType == sdtypes.Leaf {
- var i []interface{}
- if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil {
- return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
- }
- if len(i) != 2 {
- return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
- }
- var account types.StateAccount
- if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
- return fmt.Errorf("error decoding state account rlp: %s", err.Error())
- }
- accountModel := models.StateAccountModel{
- BlockNumber: tx.BlockNumber,
- HeaderID: headerID,
- StatePath: stateNode.Path,
- Balance: account.Balance.String(),
- Nonce: account.Nonce,
- CodeHash: account.CodeHash,
- StorageRoot: account.Root.String(),
- }
- sdi.fileWriter.upsertStateAccount(accountModel)
- }
// if there are any storage nodes associated with this node, publish and index them
- for _, storageNode := range stateNode.StorageNodes {
- if storageNode.NodeType == sdtypes.Removed {
+ for _, storageNode := range stateNode.StorageDiff {
+ if storageNode.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1)
- sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeMhKey, []byte{})
+ sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
- StatePath: stateNode.Path,
- Path: storageNode.Path,
+ StateKey: stateNode.AccountWrapper.LeafKey,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
Removed: true,
@@ -488,18 +461,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
sdi.fileWriter.upsertStorageCID(storageModel)
continue
}
- storageCIDStr, _, err := sdi.fileWriter.upsertIPLDRaw(tx.BlockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
- if err != nil {
- return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
- }
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
- StatePath: stateNode.Path,
- Path: storageNode.Path,
+ StateKey: stateNode.AccountWrapper.LeafKey,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
- CID: storageCIDStr,
+ CID: storageNode.CID,
Removed: false,
+ Value: storageNode.Value,
}
sdi.fileWriter.upsertStorageCID(storageModel)
}
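
The rewrite above works because the new sdtypes.StateLeafNode hands the indexer an already-built leaf: the account fields and the leaf CID arrive on AccountWrapper, and storage diffs arrive as StorageDiff, which is why the RLP-decode branch and the upsertIPLDRaw calls disappear. A rough sketch of the shape implied by the field accesses in this hunk; the field types are assumptions inferred from usage, not the actual statediff/types definitions.

package sketch

import "github.com/ethereum/go-ethereum/core/types"

// StateLeafNode as implied by stateNode.Removed, stateNode.AccountWrapper and
// stateNode.StorageDiff above (assumed shape, not the real source).
type StateLeafNode struct {
	Removed        bool
	AccountWrapper AccountWrapper
	StorageDiff    []StorageLeafNode
}

// AccountWrapper as implied by .LeafKey, .CID and .Account.* above (assumed shape).
type AccountWrapper struct {
	Account *types.StateAccount // balance, nonce, code hash, storage root
	LeafKey []byte              // hashed key of the state leaf
	CID     string              // CID of the already-published state leaf IPLD
}

// StorageLeafNode as implied by storageNode.Removed, .Value, .LeafKey and .CID above (assumed shape).
type StorageLeafNode struct {
	Removed bool
	Value   []byte
	LeafKey []byte
	CID     string
}
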
@@ -507,18 +476,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return nil
}
- // PushCodeAndCodeHash writes code and codehash pairs insert SQL stmts to a file
- func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
+ // PushIPLD writes iplds to ipld.blocks
+ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
- // codec doesn't matter since db key is multihash-based
- mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
- if err != nil {
- return fmt.Errorf("error deriving multihash key from codehash: %v", err)
- }
- sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, mhKey, codeAndCodeHash.Code)
+ sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, ipld.CID, ipld.Content)
return nil
}
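
With PushCodeAndCodeHash replaced, callers now hand the indexer a ready-made sdtypes.IPLD instead of a code/codehash pair, and the file writer keys it by CID rather than by a derived multihash key. A minimal sketch of what a caller-side wrapper for contract code might look like; the helper, the local interface, and the cid.Raw + keccak-256 codec choice are assumptions of this sketch, only the PushIPLD signature and the RawdataToCid call come from the diff above.

package sketch

import (
	cid "github.com/ipfs/go-cid"
	"github.com/multiformats/go-multihash"

	"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
	"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
	sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)

// ipldPusher is a minimal local interface matching the PushIPLD method added above.
type ipldPusher interface {
	PushIPLD(interfaces.Batch, sdtypes.IPLD) error
}

// pushContractCode shows how contract code, formerly sent through
// PushCodeAndCodeHash, can be wrapped as an sdtypes.IPLD for PushIPLD.
func pushContractCode(indexer ipldPusher, batch interfaces.Batch, code []byte) error {
	// assumption of this sketch: raw binary codec + keccak-256 multihash for the code CID
	c, err := ipld.RawdataToCid(cid.Raw, code, multihash.KECCAK_256)
	if err != nil {
		return err
	}
	return indexer.PushIPLD(batch, sdtypes.IPLD{
		CID:     c.String(),
		Content: code,
	})
}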

View File

@@ -19,7 +19,7 @@ package file
import (
"math/big"
node "github.com/ipfs/go-ipld-format"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@@ -48,7 +48,7 @@ type FileWriter interface {
// Methods to upsert IPLD in different ways
upsertIPLDDirect(blockNumber, key string, value []byte)
- upsertIPLDNode(blockNumber string, i node.Node)
+ upsertIPLDNode(blockNumber string, i ipld.IPLD)
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
// Methods to read and write watched addresses

View File

@@ -26,7 +26,6 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
pg_query "github.com/pganalyze/pg_query_go/v2"
"github.com/thoas/go-funk"
@@ -184,10 +183,10 @@ func (sqw *SQLWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
})
}
- func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i node.Node) {
+ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
sqw.upsertIPLD(models.IPLDModel{
BlockNumber: blockNumber,
- Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
+ Key: i.Cid().String(),
Data: i.RawData(),
})
}