update database/sql indexer
This commit is contained in:
parent
4d53681a27
commit
288408a2b9
@ -23,7 +23,6 @@ import (
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
"github.com/lib/pq"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
@ -100,11 +99,11 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
func (tx *BatchTx) cacheIPLD(i node.Node) {
|
||||
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
|
||||
tx.cacheWg.Add(1)
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
|
||||
Key: i.Cid().String(),
|
||||
Data: i.RawData(),
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,6 @@ import (
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
"github.com/multiformats/go-multihash"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@ -40,7 +39,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
@ -103,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
}
|
||||
|
||||
// Generate the block iplds
|
||||
headerNode, txNodes, rctNodes, logNodes, err := ipld2.FromBlockAndReceipts(block, receipts)
|
||||
headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
|
||||
}
|
||||
@ -229,7 +228,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
|
||||
// processHeader publishes and indexes a header IPLD in Postgres
|
||||
// it returns the headerID
|
||||
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
|
||||
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
|
||||
tx.cacheIPLD(headerNode)
|
||||
|
||||
var baseFee *string
|
||||
@ -267,7 +266,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
|
||||
if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) {
|
||||
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.Hex(), unclesHash.Hex())
|
||||
}
|
||||
unclesCID, err := ipld2.RawdataToCid(ipld2.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
|
||||
unclesCID, err := ipld.RawdataToCid(ipld.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -303,9 +302,9 @@ type processArgs struct {
|
||||
blockNumber *big.Int
|
||||
receipts types.Receipts
|
||||
txs types.Transactions
|
||||
rctNodes []*ipld2.EthReceipt
|
||||
txNodes []*ipld2.EthTx
|
||||
logNodes [][]*ipld2.EthLog
|
||||
rctNodes []*ipld.EthReceipt
|
||||
txNodes []*ipld.EthTx
|
||||
logNodes [][]*ipld.EthLog
|
||||
}
|
||||
|
||||
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
|
||||
@ -422,7 +421,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
|
||||
}
|
||||
|
||||
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
|
||||
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
|
||||
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
|
||||
tx, ok := batch.(*BatchTx)
|
||||
if !ok {
|
||||
return fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
|
||||
@ -430,25 +429,25 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
// publish the state node
|
||||
var stateModel models.StateNodeModel
|
||||
if stateNode.Removed {
|
||||
tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{})
|
||||
tx.cacheRemoved(shared.RemovedNodeStateCID, []byte{})
|
||||
stateModel = models.StateNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
CID: shared.RemovedNodeStateCID,
|
||||
Removed: true,
|
||||
}
|
||||
} else {
|
||||
stateCIDStr, _, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
|
||||
}
|
||||
stateModel = models.StateNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
|
||||
CID: stateCIDStr,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
CID: stateNode.AccountWrapper.CID,
|
||||
Removed: false,
|
||||
Balance: stateNode.AccountWrapper.Account.Balance.String(),
|
||||
Nonce: stateNode.AccountWrapper.Account.Nonce,
|
||||
CodeHash: stateNode.AccountWrapper.Account.CodeHash,
|
||||
StorageRoot: stateNode.AccountWrapper.Account.Root.String(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -457,42 +456,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
return err
|
||||
}
|
||||
|
||||
// if we have a leaf, decode and index the account data
|
||||
if stateNode.NodeType == sdtypes.Leaf {
|
||||
var i []interface{}
|
||||
if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil {
|
||||
return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
|
||||
}
|
||||
if len(i) != 2 {
|
||||
return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
|
||||
}
|
||||
var account types.StateAccount
|
||||
if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
|
||||
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
|
||||
}
|
||||
accountModel := models.StateAccountModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
HeaderID: headerID,
|
||||
StatePath: stateNode.Path,
|
||||
Balance: account.Balance.String(),
|
||||
Nonce: account.Nonce,
|
||||
CodeHash: account.CodeHash,
|
||||
StorageRoot: account.Root.String(),
|
||||
}
|
||||
if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// if there are any storage nodes associated with this node, publish and index them
|
||||
for _, storageNode := range stateNode.StorageNodes {
|
||||
if storageNode.NodeType == sdtypes.Removed {
|
||||
tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{})
|
||||
for _, storageNode := range stateNode.StorageDiff {
|
||||
if storageNode.Removed {
|
||||
tx.cacheRemoved(shared.RemovedNodeStorageCID, []byte{})
|
||||
storageModel := models.StorageNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
HeaderID: headerID,
|
||||
StatePath: stateNode.Path,
|
||||
Path: storageNode.Path,
|
||||
StateKey: stateNode.AccountWrapper.LeafKey,
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
CID: shared.RemovedNodeStorageCID,
|
||||
Removed: true,
|
||||
@ -502,18 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
}
|
||||
continue
|
||||
}
|
||||
storageCIDStr, _, err := tx.cacheRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
|
||||
}
|
||||
storageModel := models.StorageNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
HeaderID: headerID,
|
||||
StatePath: stateNode.Path,
|
||||
Path: storageNode.Path,
|
||||
StateKey: stateNode.AccountWrapper.LeafKey,
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
CID: storageCIDStr,
|
||||
CID: storageNode.CID,
|
||||
Removed: true,
|
||||
Value: storageNode.Value,
|
||||
}
|
||||
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil {
|
||||
return err
|
||||
@ -523,18 +490,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
return nil
|
||||
}
|
||||
|
||||
// PushCodeAndCodeHash publishes code and codehash pairs to the ipld sql
|
||||
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
|
||||
// PushIPLD publishes iplds to ipld.blocks
|
||||
func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error {
|
||||
tx, ok := batch.(*BatchTx)
|
||||
if !ok {
|
||||
return fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
|
||||
}
|
||||
// codec doesn't matter since db key is multihash-based
|
||||
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
|
||||
}
|
||||
tx.cacheDirect(mhKey, codeAndCodeHash.Code)
|
||||
tx.cacheDirect(ipld.CID, ipld.Content)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user