update to use new schema; fix pgx driver

This commit is contained in:
i-norden 2021-11-17 10:25:11 -06:00
parent 9a67034f29
commit b36b3f83cb
13 changed files with 263 additions and 240 deletions

View File

@ -23,16 +23,14 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/statediff/trie_helpers"
types2 "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/trie_helpers"
types2 "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
) )

View File

@ -136,7 +136,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now() t = time.Now()
// Publish and index header, collect headerID // Publish and index header, collect headerID
var headerID int64 var headerID string
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty) headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
if err != nil { if err != nil {
return nil, err return nil, err
@ -181,7 +181,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader publishes and indexes a header IPLD in Postgres // processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID // it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) { func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode) tx.cacheIPLD(headerNode)
var baseFee *int64 var baseFee *int64
@ -190,12 +190,13 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
*baseFee = header.BaseFee.Int64() *baseFee = header.BaseFee.Int64()
} }
headerID := header.Hash().String()
mod := models.HeaderModel{ mod := models.HeaderModel{
CID: headerNode.Cid().String(), CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(), ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(), BlockNumber: header.Number.String(),
BlockHash: header.Hash().String(), BlockHash: headerID,
TotalDifficulty: td.String(), TotalDifficulty: td.String(),
Reward: reward.String(), Reward: reward.String(),
Bloom: header.Bloom.Bytes(), Bloom: header.Bloom.Bytes(),
@ -207,11 +208,11 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
BaseFee: baseFee, BaseFee: baseFee,
} }
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod) _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return 0, err return headerID, err
} }
// processUncles publishes and indexes uncle IPLDs in Postgres // processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles // publish and index uncles
for _, uncleNode := range uncleNodes { for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode) tx.cacheIPLD(uncleNode)
@ -223,6 +224,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
} }
uncle := models.UncleModel{ uncle := models.UncleModel{
HeaderID: headerID,
CID: uncleNode.Cid().String(), CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(), ParentHash: uncleNode.ParentHash.String(),
@ -238,7 +240,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
// processArgs bundles arguments to processReceiptsAndTxs // processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct { type processArgs struct {
headerID int64 headerID string
blockNumber *big.Int blockNumber *big.Int
receipts types.Receipts receipts types.Receipts
txs types.Transactions txs types.Transactions
@ -263,54 +265,19 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
tx.cacheIPLD(txNode) tx.cacheIPLD(txNode)
// Indexing // Indexing
// extract topic and contract data from the receipt for indexing // index tx
mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
for idx, l := range receipt.Logs {
topicSet := make([]string, 4)
for ti, topic := range l.Topics {
topicSet[ti] = topic.Hex()
}
if !args.logLeafNodeCIDs[i][idx].Defined() {
return fmt.Errorf("invalid log cid")
}
mappedContracts[l.Address.String()] = true
logDataSet[idx] = &models.LogsModel{
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
}
}
// these are the contracts seen in the logs
logContracts := make([]string, 0, len(mappedContracts))
for addr := range mappedContracts {
logContracts = append(logContracts, addr)
}
// this is the contract address if this receipt is for a contract creation tx
contract := shared.HandleZeroAddr(receipt.ContractAddress)
var contractHash string
if contract != "" {
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
}
// index tx first so that the receipt can reference it by FK
trx := args.txs[i] trx := args.txs[i]
trxID := trx.Hash().String()
// derive sender for the tx that corresponds with this receipt // derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx) from, err := types.Sender(signer, trx)
if err != nil { if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err) return fmt.Errorf("error deriving tx sender: %v", err)
} }
txModel := models.TxModel{ txModel := models.TxModel{
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()), Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from), Src: shared.HandleZeroAddr(from),
TxHash: trx.Hash().String(), TxHash: trxID,
Index: int64(i), Index: int64(i),
Data: trx.Data(), Data: trx.Data(),
CID: txNode.Cid().String(), CID: txNode.Cid().String(),
@ -328,6 +295,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex() storageKeys[k] = storageKey.Hex()
} }
accessListElementModel := models.AccessListElementModel{ accessListElementModel := models.AccessListElementModel{
TxID: trxID,
Index: int64(j), Index: int64(j),
Address: accessListElement.Address.Hex(), Address: accessListElement.Address.Hex(),
StorageKeys: storageKeys, StorageKeys: storageKeys,
@ -337,12 +305,20 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
} }
// this is the contract address if this receipt is for a contract creation tx
contract := shared.HandleZeroAddr(receipt.ContractAddress)
var contractHash string
if contract != "" {
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
}
// index the receipt // index the receipt
if !args.rctLeafNodeCIDs[i].Defined() { if !args.rctLeafNodeCIDs[i].Defined() {
return fmt.Errorf("invalid receipt leaf node cid") return fmt.Errorf("invalid receipt leaf node cid")
} }
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
TxID: trxID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(), LeafCID: args.rctLeafNodeCIDs[i].String(),
@ -359,6 +335,31 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return err return err
} }
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
for idx, l := range receipt.Logs {
topicSet := make([]string, 4)
for ti, topic := range l.Topics {
topicSet[ti] = topic.Hex()
}
if !args.logLeafNodeCIDs[i][idx].Defined() {
return fmt.Errorf("invalid log cid")
}
logDataSet[idx] = &models.LogsModel{
ReceiptID: trxID,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
}
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", logDataSet); err != nil { if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", logDataSet); err != nil {
return err return err
} }
@ -374,7 +375,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
@ -384,6 +385,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel := models.StateNodeModel{ stateModel := models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID, CID: shared.RemovedNodeStateCID,
@ -398,6 +400,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
} }
stateModel := models.StateNodeModel{ stateModel := models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr, CID: stateCIDStr,
@ -422,6 +425,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error()) return fmt.Errorf("error decoding state account rlp: %s", err.Error())
} }
accountModel := models.StateAccountModel{ accountModel := models.StateAccountModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Balance: account.Balance.String(), Balance: account.Balance.String(),
Nonce: account.Nonce, Nonce: account.Nonce,
CodeHash: account.CodeHash, CodeHash: account.CodeHash,
@ -437,6 +442,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID, CID: shared.RemovedNodeStorageCID,
@ -453,6 +460,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr, CID: storageCIDStr,
@ -482,7 +491,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
return nil return nil
} }
// Close satisfied io.Closer // Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error { func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close() return sdi.dump.Close()
} }

View File

@ -34,7 +34,6 @@ type BatchTx struct {
BlockNumber uint64 BlockNumber uint64
ctx context.Context ctx context.Context
dbtx Tx dbtx Tx
headerID int64
stm string stm string
quit chan struct{} quit chan struct{}
iplds chan models.IPLDModel iplds chan models.IPLDModel

View File

@ -187,7 +187,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now() t = time.Now()
// Publish and index header, collect headerID // Publish and index header, collect headerID
var headerID int64 var headerID string
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty) headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
if err != nil { if err != nil {
return nil, err return nil, err
@ -227,13 +227,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now() t = time.Now()
blockTx.headerID = headerID
return blockTx, err return blockTx, err
} }
// processHeader publishes and indexes a header IPLD in Postgres // processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID // it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) { func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode) tx.cacheIPLD(headerNode)
var baseFee *int64 var baseFee *int64
@ -241,14 +240,14 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
baseFee = new(int64) baseFee = new(int64)
*baseFee = header.BaseFee.Int64() *baseFee = header.BaseFee.Int64()
} }
headerID := header.Hash().String()
// index header // index header
return sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{ return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{
CID: headerNode.Cid().String(), CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(), ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(), BlockNumber: header.Number.String(),
BlockHash: header.Hash().String(), BlockHash: headerID,
TotalDifficulty: td.String(), TotalDifficulty: td.String(),
Reward: reward.String(), Reward: reward.String(),
Bloom: header.Bloom.Bytes(), Bloom: header.Bloom.Bytes(),
@ -262,7 +261,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
} }
// processUncles publishes and indexes uncle IPLDs in Postgres // processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles // publish and index uncles
for _, uncleNode := range uncleNodes { for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode) tx.cacheIPLD(uncleNode)
@ -274,13 +273,14 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
} }
uncle := models.UncleModel{ uncle := models.UncleModel{
HeaderID: headerID,
CID: uncleNode.Cid().String(), CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(), ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(), BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(), Reward: uncleReward.String(),
} }
if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle, headerID); err != nil { if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle); err != nil {
return err return err
} }
} }
@ -289,7 +289,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
// processArgs bundles arguments to processReceiptsAndTxs // processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct { type processArgs struct {
headerID int64 headerID string
blockNumber *big.Int blockNumber *big.Int
receipts types.Receipts receipts types.Receipts
txs types.Transactions txs types.Transactions
@ -313,63 +313,26 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
txNode := args.txNodes[i] txNode := args.txNodes[i]
tx.cacheIPLD(txNode) tx.cacheIPLD(txNode)
// Indexing // index tx
// extract topic and contract data from the receipt for indexing
mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
for idx, l := range receipt.Logs {
topicSet := make([]string, 4)
for ti, topic := range l.Topics {
topicSet[ti] = topic.Hex()
}
if !args.logLeafNodeCIDs[i][idx].Defined() {
return fmt.Errorf("invalid log cid")
}
mappedContracts[l.Address.String()] = true
logDataSet[idx] = &models.LogsModel{
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
}
}
// these are the contracts seen in the logs
logContracts := make([]string, 0, len(mappedContracts))
for addr := range mappedContracts {
logContracts = append(logContracts, addr)
}
// this is the contract address if this receipt is for a contract creation tx
contract := shared.HandleZeroAddr(receipt.ContractAddress)
var contractHash string
if contract != "" {
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
}
// index tx first so that the receipt can reference it by FK
trx := args.txs[i] trx := args.txs[i]
txID := trx.Hash().String()
// derive sender for the tx that corresponds with this receipt // derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx) from, err := types.Sender(signer, trx)
if err != nil { if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err) return fmt.Errorf("error deriving tx sender: %v", err)
} }
txModel := models.TxModel{ txModel := models.TxModel{
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()), Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from), Src: shared.HandleZeroAddr(from),
TxHash: trx.Hash().String(), TxHash: txID,
Index: int64(i), Index: int64(i),
Data: trx.Data(), Data: trx.Data(),
CID: txNode.Cid().String(), CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()), MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(), Type: trx.Type(),
} }
txID, err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel, args.headerID) if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil {
if err != nil {
return err return err
} }
@ -380,21 +343,30 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex() storageKeys[k] = storageKey.Hex()
} }
accessListElementModel := models.AccessListElementModel{ accessListElementModel := models.AccessListElementModel{
TxID: txID,
Index: int64(j), Index: int64(j),
Address: accessListElement.Address.Hex(), Address: accessListElement.Address.Hex(),
StorageKeys: storageKeys, StorageKeys: storageKeys,
} }
if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel, txID); err != nil { if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel); err != nil {
return err return err
} }
} }
// index the receipt // this is the contract address if this receipt is for a contract creation tx
contract := shared.HandleZeroAddr(receipt.ContractAddress)
var contractHash string
if contract != "" {
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
}
// index receipt
if !args.rctLeafNodeCIDs[i].Defined() { if !args.rctLeafNodeCIDs[i].Defined() {
return fmt.Errorf("invalid receipt leaf node cid") return fmt.Errorf("invalid receipt leaf node cid")
} }
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(), LeafCID: args.rctLeafNodeCIDs[i].String(),
@ -407,12 +379,37 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
rctModel.PostState = common.Bytes2Hex(receipt.PostState) rctModel.PostState = common.Bytes2Hex(receipt.PostState)
} }
receiptID, err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel, txID) if err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel); err != nil {
if err != nil {
return err return err
} }
if err = sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet, receiptID); err != nil { // index logs
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
for idx, l := range receipt.Logs {
topicSet := make([]string, 4)
for ti, topic := range l.Topics {
topicSet[ti] = topic.Hex()
}
if !args.logLeafNodeCIDs[i][idx].Defined() {
return fmt.Errorf("invalid log cid")
}
logDataSet[idx] = &models.LogsModel{
ReceiptID: txID,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
}
}
if err := sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet); err != nil {
return err return err
} }
} }
@ -427,7 +424,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
@ -437,29 +434,29 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel := models.StateNodeModel{ stateModel := models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID, CID: shared.RemovedNodeStateCID,
MhKey: shared.RemovedNodeMhKey, MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(), NodeType: stateNode.NodeType.Int(),
} }
_, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) return sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel)
return err
} }
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil { if err != nil {
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
} }
stateModel := models.StateNodeModel{ stateModel := models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr, CID: stateCIDStr,
MhKey: stateMhKey, MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(), NodeType: stateNode.NodeType.Int(),
} }
// index the state node, collect the stateID to reference by FK // index the state node
stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) if err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel); err != nil {
if err != nil {
return err return err
} }
// if we have a leaf, decode and index the account data // if we have a leaf, decode and index the account data
@ -476,12 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error()) return fmt.Errorf("error decoding state account rlp: %s", err.Error())
} }
accountModel := models.StateAccountModel{ accountModel := models.StateAccountModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Balance: account.Balance.String(), Balance: account.Balance.String(),
Nonce: account.Nonce, Nonce: account.Nonce,
CodeHash: account.CodeHash, CodeHash: account.CodeHash,
StorageRoot: account.Root.String(), StorageRoot: account.Root.String(),
} }
if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel, stateID); err != nil { if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel); err != nil {
return err return err
} }
} }
@ -491,13 +490,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID, CID: shared.RemovedNodeStorageCID,
MhKey: shared.RemovedNodeMhKey, MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(), NodeType: storageNode.NodeType.Int(),
} }
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil {
return err return err
} }
continue continue
@ -507,13 +508,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr, CID: storageCIDStr,
MhKey: storageMhKey, MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(), NodeType: storageNode.NodeType.Int(),
} }
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil {
return err return err
} }
} }

View File

@ -30,6 +30,7 @@ type DriverType string
const ( const (
PGX DriverType = "PGX" PGX DriverType = "PGX"
SQLX DriverType = "SQLX" SQLX DriverType = "SQLX"
FILE DriverType = "File"
Unknown DriverType = "Unknown" Unknown DriverType = "Unknown"
) )
@ -40,6 +41,8 @@ func ResolveDriverType(str string) (DriverType, error) {
return PGX, nil return PGX, nil
case "sqlx": case "sqlx":
return SQLX, nil return SQLX, nil
case "file":
return FILE, nil
default: default:
return Unknown, fmt.Errorf("unrecognized driver type string: %s", str) return Unknown, fmt.Errorf("unrecognized driver type string: %s", str)
} }

View File

@ -46,59 +46,55 @@ type DB struct {
func (db *DB) InsertHeaderStm() string { func (db *DB) InsertHeaderStm() string {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
RETURNING id`
} }
// InsertUncleStm satisfies the sql.Statements interface // InsertUncleStm satisfies the sql.Statements interface
func (db *DB) InsertUncleStm() string { func (db *DB) InsertUncleStm() string {
return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)` ON CONFLICT (block_hash) DO NOTHING`
} }
// InsertTxStm satisfies the sql.Statements interface // InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string { func (db *DB) InsertTxStm() string {
return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = ($3, $4, $5, $6, $7, $8, $9) ON CONFLICT (tx_hash) DO NOTHING`
RETURNING id`
} }
// InsertAccessListElementStm satisfies the sql.Statements interface // InsertAccessListElementStm satisfies the sql.Statements interface
func (db *DB) InsertAccessListElementStm() string { func (db *DB) InsertAccessListElementStm() string {
return `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) return `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = ($3, $4)` ON CONFLICT (tx_id, index) DO NOTHING`
} }
// InsertRctStm satisfies the sql.Statements interface // InsertRctStm satisfies the sql.Statements interface
func (db *DB) InsertRctStm() string { func (db *DB) InsertRctStm() string {
return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = ($2, $3, $4, $5, $6, $7, $8) ON CONFLICT (tx_id) DO NOTHING`
RETURNING id`
} }
// InsertLogStm satisfies the sql.Statements interface // InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertLogStm() string { func (db *DB) InsertLogStm() string {
return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = ($1, $2, $4, $6, $7, $8, $9, $10)` ON CONFLICT (rct_id, index) DO NOTHING`
} }
// InsertStateStm satisfies the sql.Statements interface // InsertStateStm satisfies the sql.Statements interface
func (db *DB) InsertStateStm() string { func (db *DB) InsertStateStm() string {
return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`
RETURNING id`
} }
// InsertAccountStm satisfies the sql.Statements interface // InsertAccountStm satisfies the sql.Statements interface
func (db *DB) InsertAccountStm() string { func (db *DB) InsertAccountStm() string {
return `INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5) return `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)` ON CONFLICT (header_id, state_path) DO NOTHING`
} }
// InsertStorageStm satisfies the sql.Statements interface // InsertStorageStm satisfies the sql.Statements interface
func (db *DB) InsertStorageStm() string { func (db *DB) InsertStorageStm() string {
return `INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) return `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)` ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)`
} }
// InsertIPLDStm satisfies the sql.Statements interface // InsertIPLDStm satisfies the sql.Statements interface

View File

@ -138,11 +138,6 @@ func (pgx *PGXDriver) Stats() sql.Stats {
return pgxStatsWrapper{stats: stats} return pgxStatsWrapper{stats: stats}
} }
// NodeInfo satisfies sql.Database
func (pgx *PGXDriver) NodeInfo() node.Info {
return pgx.nodeInfo
}
// NodeID satisfies sql.Database // NodeID satisfies sql.Database
func (pgx *PGXDriver) NodeID() int64 { func (pgx *PGXDriver) NodeID() int64 {
return pgx.nodeID return pgx.nodeID

View File

@ -107,11 +107,6 @@ func (driver *SQLXDriver) Stats() sql.Stats {
return sqlxStatsWrapper{stats: stats} return sqlxStatsWrapper{stats: stats}
} }
// NodeInfo satisfies sql.Database
func (driver *SQLXDriver) NodeInfo() node.Info {
return driver.nodeInfo
}
// NodeID satisfies sql.Database // NodeID satisfies sql.Database
func (driver *SQLXDriver) NodeID() int64 { func (driver *SQLXDriver) NodeID() int64 {
return driver.nodeID return driver.nodeID

View File

@ -39,41 +39,56 @@ func NewWriter(db Database) *Writer {
} }
} }
func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) (int64, error) { /*
var headerID int64 INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
err := tx.QueryRow(in.db.Context(), in.db.InsertHeaderStm(), VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
*/
func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertHeaderStm(),
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee).Scan(&headerID) header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee)
if err != nil { if err != nil {
return 0, fmt.Errorf("error upserting header_cids entry: %v", err) return fmt.Errorf("error upserting header_cids entry: %v", err)
} }
indexerMetrics.blocks.Inc(1) indexerMetrics.blocks.Inc(1)
return headerID, nil return nil
} }
func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel, headerID int64) error { /*
INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (block_hash) DO NOTHING
*/
func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertUncleStm(), _, err := tx.Exec(in.db.Context(), in.db.InsertUncleStm(),
uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
if err != nil { if err != nil {
return fmt.Errorf("error upserting uncle_cids entry: %v", err) return fmt.Errorf("error upserting uncle_cids entry: %v", err)
} }
return nil return nil
} }
func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel, headerID int64) (int64, error) { /*
var txID int64 INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
err := tx.QueryRow(in.db.Context(), in.db.InsertTxStm(), ON CONFLICT (tx_hash) DO NOTHING
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type).Scan(&txID) */
func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertTxStm(),
transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type)
if err != nil { if err != nil {
return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err) return fmt.Errorf("error upserting transaction_cids entry: %v", err)
} }
indexerMetrics.transactions.Inc(1) indexerMetrics.transactions.Inc(1)
return txID, nil return nil
} }
func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel, txID int64) error { /*
INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
ON CONFLICT (tx_id, index) DO NOTHING
*/
func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertAccessListElementStm(), _, err := tx.Exec(in.db.Context(), in.db.InsertAccessListElementStm(),
txID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
if err != nil { if err != nil {
return fmt.Errorf("error upserting access_list_element entry: %v", err) return fmt.Errorf("error upserting access_list_element entry: %v", err)
} }
@ -81,21 +96,28 @@ func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.Access
return nil return nil
} }
func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel, txID int64) (int64, error) { /*
var receiptID int64 INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
err := tx.QueryRow(in.db.Context(), in.db.InsertRctStm(), ON CONFLICT (tx_id) DO NOTHING
txID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot).Scan(&receiptID) */
func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertRctStm(),
rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)
if err != nil { if err != nil {
return 0, fmt.Errorf("error upserting receipt_cids entry: %w", err) return fmt.Errorf("error upserting receipt_cids entry: %w", err)
} }
indexerMetrics.receipts.Inc(1) indexerMetrics.receipts.Inc(1)
return receiptID, nil return nil
} }
func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel, receiptID int64) error { /*
INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (rct_id, index) DO NOTHING
*/
func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
for _, log := range logs { for _, log := range logs {
_, err := tx.Exec(in.db.Context(), in.db.InsertLogStm(), _, err := tx.Exec(in.db.Context(), in.db.InsertLogStm(),
log.LeafCID, log.LeafMhKey, receiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
if err != nil { if err != nil {
return fmt.Errorf("error upserting logs entry: %w", err) return fmt.Errorf("error upserting logs entry: %w", err)
} }
@ -104,36 +126,47 @@ func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel, receiptID int64)
return nil return nil
} }
func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) { /*
var stateID int64 INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
*/
func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
var stateKey string var stateKey string
if stateNode.StateKey != nullHash.String() { if stateNode.StateKey != nullHash.String() {
stateKey = stateNode.StateKey stateKey = stateNode.StateKey
} }
err := tx.QueryRow(in.db.Context(), in.db.InsertStateStm(), _, err := tx.Exec(in.db.Context(), in.db.InsertStateStm(),
headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID) stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey)
if err != nil { if err != nil {
return 0, fmt.Errorf("error upserting state_cids entry: %v", err) return fmt.Errorf("error upserting state_cids entry: %v", err)
} }
return stateID, nil return nil
} }
func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel, stateID int64) error { /*
INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, state_path) DO NOTHING
*/
func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertAccountStm(), _, err := tx.Exec(in.db.Context(), in.db.InsertAccountStm(),
stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
if err != nil { if err != nil {
return fmt.Errorf("error upserting state_accounts entry: %v", err) return fmt.Errorf("error upserting state_accounts entry: %v", err)
} }
return nil return nil
} }
func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel, stateID int64) error { /*
INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)
*/
func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error {
var storageKey string var storageKey string
if storageCID.StorageKey != nullHash.String() { if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey storageKey = storageCID.StorageKey
} }
_, err := tx.Exec(in.db.Context(), in.db.InsertStorageStm(), _, err := tx.Exec(in.db.Context(), in.db.InsertStorageStm(),
stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
if err != nil { if err != nil {
return fmt.Errorf("error upserting storage_cids entry: %v", err) return fmt.Errorf("error upserting storage_cids entry: %v", err)
} }

View File

@ -29,7 +29,7 @@ import (
// StateDiffIndexer interface required to index statediff data // StateDiffIndexer interface required to index statediff data
type StateDiffIndexer interface { type StateDiffIndexer interface {
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
PushStateNode(tx Batch, stateNode sdtypes.StateNode) error PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
ReportDBMetrics(delay time.Duration, quit <-chan bool) ReportDBMetrics(delay time.Duration, quit <-chan bool)
io.Closer io.Closer

View File

@ -26,7 +26,7 @@ type IPLDBatch struct {
// UncleBatch holds the arguments for a batch insert of uncle data // UncleBatch holds the arguments for a batch insert of uncle data
type UncleBatch struct { type UncleBatch struct {
HeaderID []int64 HeaderID []string
BlockHashes []string BlockHashes []string
ParentHashes []string ParentHashes []string
CIDs []string CIDs []string
@ -36,7 +36,7 @@ type UncleBatch struct {
// TxBatch holds the arguments for a batch insert of tx data // TxBatch holds the arguments for a batch insert of tx data
type TxBatch struct { type TxBatch struct {
HeaderID int64 HeaderID string
Indexes []int64 Indexes []int64
TxHashes []string TxHashes []string
CIDs []string CIDs []string
@ -44,20 +44,20 @@ type TxBatch struct {
Dsts []string Dsts []string
Srcs []string Srcs []string
Datas [][]byte Datas [][]byte
Types []*uint8 Types []uint8
} }
// AccessListBatch holds the arguments for a batch insert of access list data // AccessListBatch holds the arguments for a batch insert of access list data
type AccessListBatch struct { type AccessListBatch struct {
Indexes []int64 Indexes []int64
TxIDs []int64 TxIDs []string
Addresses []string Addresses []string
StorageKeysSets []pq.StringArray StorageKeysSets []pq.StringArray
} }
// ReceiptBatch holds the arguments for a batch insert of receipt data // ReceiptBatch holds the arguments for a batch insert of receipt data
type ReceiptBatch struct { type ReceiptBatch struct {
TxIDs []int64 TxIDs []string
LeafCIDs []string LeafCIDs []string
LeafMhKeys []string LeafMhKeys []string
PostStatuses []uint64 PostStatuses []uint64
@ -71,7 +71,7 @@ type ReceiptBatch struct {
type LogBatch struct { type LogBatch struct {
LeafCIDs []string LeafCIDs []string
LeafMhKeys []string LeafMhKeys []string
ReceiptIDs []int64 ReceiptIDs []string
Addresses []string Addresses []string
Indexes []int64 Indexes []int64
Datas [][]byte Datas [][]byte
@ -83,34 +83,33 @@ type LogBatch struct {
// StateBatch holds the arguments for a batch insert of state data // StateBatch holds the arguments for a batch insert of state data
type StateBatch struct { type StateBatch struct {
ID int64 HeaderID string
HeaderID int64 Paths [][]byte
Path []byte StateKeys []string
StateKey string NodeTypes []int
NodeType int CIDs []string
CID string MhKeys []string
MhKey string
Diff bool Diff bool
} }
// AccountBatch holds the arguments for a batch insert of account data // AccountBatch holds the arguments for a batch insert of account data
type AccountBatch struct { type AccountBatch struct {
ID int64 HeaderID string
StateID int64 StatePaths [][]byte
Balance string Balances []string
Nonce uint64 Nonces []uint64
CodeHash []byte CodeHashes [][]byte
StorageRoot string StorageRoots []string
} }
// StorageBatch holds the arguments for a batch insert of storage data // StorageBatch holds the arguments for a batch insert of storage data
type StorageBatch struct { type StorageBatch struct {
ID int64 HeaderID string
StateID int64 StatePaths [][]string
Path []byte Paths [][]byte
StorageKey string StorageKeys []string
NodeType int NodeTypes []int
CID string CIDs []string
MhKey string MhKeys []string
Diff bool Diff bool
} }

View File

@ -26,7 +26,6 @@ type IPLDModel struct {
// HeaderModel is the db model for eth.header_cids // HeaderModel is the db model for eth.header_cids
type HeaderModel struct { type HeaderModel struct {
ID int64 `db:"id"`
BlockNumber string `db:"block_number"` BlockNumber string `db:"block_number"`
BlockHash string `db:"block_hash"` BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"` ParentHash string `db:"parent_hash"`
@ -47,8 +46,7 @@ type HeaderModel struct {
// UncleModel is the db model for eth.uncle_cids // UncleModel is the db model for eth.uncle_cids
type UncleModel struct { type UncleModel struct {
ID int64 `db:"id"` HeaderID string `db:"header_id"`
HeaderID int64 `db:"header_id"`
BlockHash string `db:"block_hash"` BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"` ParentHash string `db:"parent_hash"`
CID string `db:"cid"` CID string `db:"cid"`
@ -58,8 +56,7 @@ type UncleModel struct {
// TxModel is the db model for eth.transaction_cids // TxModel is the db model for eth.transaction_cids
type TxModel struct { type TxModel struct {
ID int64 `db:"id"` HeaderID string `db:"header_id"`
HeaderID int64 `db:"header_id"`
Index int64 `db:"index"` Index int64 `db:"index"`
TxHash string `db:"tx_hash"` TxHash string `db:"tx_hash"`
CID string `db:"cid"` CID string `db:"cid"`
@ -72,17 +69,15 @@ type TxModel struct {
// AccessListElementModel is the db model for eth.access_list_entry // AccessListElementModel is the db model for eth.access_list_entry
type AccessListElementModel struct { type AccessListElementModel struct {
ID int64 `db:"id"`
Index int64 `db:"index"` Index int64 `db:"index"`
TxID int64 `db:"tx_id"` TxID string `db:"tx_id"`
Address string `db:"address"` Address string `db:"address"`
StorageKeys pq.StringArray `db:"storage_keys"` StorageKeys pq.StringArray `db:"storage_keys"`
} }
// ReceiptModel is the db model for eth.receipt_cids // ReceiptModel is the db model for eth.receipt_cids
type ReceiptModel struct { type ReceiptModel struct {
ID int64 `db:"id"` TxID string `db:"tx_id"`
TxID int64 `db:"tx_id"`
LeafCID string `db:"leaf_cid"` LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"` LeafMhKey string `db:"leaf_mh_key"`
PostStatus uint64 `db:"post_status"` PostStatus uint64 `db:"post_status"`
@ -94,8 +89,7 @@ type ReceiptModel struct {
// StateNodeModel is the db model for eth.state_cids // StateNodeModel is the db model for eth.state_cids
type StateNodeModel struct { type StateNodeModel struct {
ID int64 `db:"id"` HeaderID string `db:"header_id"`
HeaderID int64 `db:"header_id"`
Path []byte `db:"state_path"` Path []byte `db:"state_path"`
StateKey string `db:"state_leaf_key"` StateKey string `db:"state_leaf_key"`
NodeType int `db:"node_type"` NodeType int `db:"node_type"`
@ -106,8 +100,8 @@ type StateNodeModel struct {
// StorageNodeModel is the db model for eth.storage_cids // StorageNodeModel is the db model for eth.storage_cids
type StorageNodeModel struct { type StorageNodeModel struct {
ID int64 `db:"id"` HeaderID string `db:"header_id""`
StateID int64 `db:"state_id"` StatePath []byte `db:"state_path"`
Path []byte `db:"storage_path"` Path []byte `db:"storage_path"`
StorageKey string `db:"storage_leaf_key"` StorageKey string `db:"storage_leaf_key"`
NodeType int `db:"node_type"` NodeType int `db:"node_type"`
@ -118,8 +112,8 @@ type StorageNodeModel struct {
// StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key // StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key
type StorageNodeWithStateKeyModel struct { type StorageNodeWithStateKeyModel struct {
ID int64 `db:"id"` HeaderID string `db:"header_id""`
StateID int64 `db:"state_id"` StatePath []byte `db:"state_path"`
Path []byte `db:"storage_path"` Path []byte `db:"storage_path"`
StateKey string `db:"state_leaf_key"` StateKey string `db:"state_leaf_key"`
StorageKey string `db:"storage_leaf_key"` StorageKey string `db:"storage_leaf_key"`
@ -131,8 +125,8 @@ type StorageNodeWithStateKeyModel struct {
// StateAccountModel is a db model for an eth state account (decoded value of state leaf node) // StateAccountModel is a db model for an eth state account (decoded value of state leaf node)
type StateAccountModel struct { type StateAccountModel struct {
ID int64 `db:"id"` HeaderID string `db:"header_id"`
StateID int64 `db:"state_id"` StatePath []byte `db:"state_path"`
Balance string `db:"balance"` Balance string `db:"balance"`
Nonce uint64 `db:"nonce"` Nonce uint64 `db:"nonce"`
CodeHash []byte `db:"code_hash"` CodeHash []byte `db:"code_hash"`
@ -141,10 +135,9 @@ type StateAccountModel struct {
// LogsModel is the db model for eth.logs // LogsModel is the db model for eth.logs
type LogsModel struct { type LogsModel struct {
ID int64 `db:"id"` ReceiptID string `db:"rct_id"`
LeafCID string `db:"leaf_cid"` LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"` LeafMhKey string `db:"leaf_mh_key"`
ReceiptID int64 `db:"receipt_id"`
Address string `db:"address"` Address string `db:"address"`
Index int64 `db:"index"` Index int64 `db:"index"`
Data []byte `db:"log_data"` Data []byte `db:"log_data"`

View File

@ -664,7 +664,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
} }
}() }()
output := func(node types2.StateNode) error { output := func(node types2.StateNode) error {
return sds.indexer.PushStateNode(tx, node) return sds.indexer.PushStateNode(tx, node, block.Hash().String())
} }
codeOutput := func(c types2.CodeAndCodeHash) error { codeOutput := func(c types2.CodeAndCodeHash) error {
return sds.indexer.PushCodeAndCodeHash(tx, c) return sds.indexer.PushCodeAndCodeHash(tx, c)