update sql indexer to use new v4 schema that denormalizes by block_number for the purposes of partitioning & sharding

This commit is contained in:
i-norden 2022-03-17 07:17:04 -05:00
parent 6b74310941
commit 9775355d2b
7 changed files with 233 additions and 190 deletions

View File

@ -29,9 +29,11 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/models"
) )
const startingCacheCapacity = 1024 * 24
// BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct { type BatchTx struct {
BlockNumber uint64 BlockNumber string
ctx context.Context ctx context.Context
dbtx Tx dbtx Tx
stm string stm string
@ -48,7 +50,8 @@ func (tx *BatchTx) Submit(err error) error {
} }
func (tx *BatchTx) flush() error { func (tx *BatchTx) flush() error {
_, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) _, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys),
pq.Array(tx.ipldCache.Values))
if err != nil { if err != nil {
return err return err
} }
@ -61,6 +64,7 @@ func (tx *BatchTx) cache() {
for { for {
select { select {
case i := <-tx.iplds: case i := <-tx.iplds:
tx.ipldCache.BlockNumbers = append(tx.ipldCache.BlockNumbers, i.BlockNumber)
tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key) tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key)
tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data) tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data)
case <-tx.quit: case <-tx.quit:
@ -72,6 +76,7 @@ func (tx *BatchTx) cache() {
func (tx *BatchTx) cacheDirect(key string, value []byte) { func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{ tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
Key: key, Key: key,
Data: value, Data: value,
} }
@ -79,6 +84,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
func (tx *BatchTx) cacheIPLD(i node.Node) { func (tx *BatchTx) cacheIPLD(i node.Node) {
tx.iplds <- models.IPLDModel{ tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(), Data: i.RawData(),
} }
@ -91,6 +97,7 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
} }
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
tx.iplds <- models.IPLDModel{ tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
Key: prefixedKey, Key: prefixedKey,
Data: raw, Data: raw,
} }

View File

@ -141,11 +141,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}() }()
blockTx := &BatchTx{ blockTx := &BatchTx{
ctx: sdi.ctx, ctx: sdi.ctx,
BlockNumber: height, BlockNumber: block.Number().String(),
stm: sdi.dbWriter.db.InsertIPLDsStm(), stm: sdi.dbWriter.db.InsertIPLDsStm(),
iplds: make(chan models.IPLDModel), iplds: make(chan models.IPLDModel),
quit: make(chan struct{}), quit: make(chan struct{}),
ipldCache: models.IPLDBatch{}, ipldCache: models.IPLDBatch{
BlockNumbers: make([]string, 0, startingCacheCapacity),
Keys: make([]string, 0, startingCacheCapacity),
Values: make([][]byte, 0, startingCacheCapacity),
},
dbtx: tx, dbtx: tx,
// handle transaction commit or rollback for any return case // handle transaction commit or rollback for any return case
submit: func(self *BatchTx, err error) error { submit: func(self *BatchTx, err error) error {
@ -200,7 +204,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now() t = time.Now()
// Publish and index uncles // Publish and index uncles
err = sdi.processUncles(blockTx, headerID, height, uncleNodes) err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -264,7 +268,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
} }
// processUncles publishes and indexes uncle IPLDs in Postgres // processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles // publish and index uncles
for _, uncleNode := range uncleNodes { for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode) tx.cacheIPLD(uncleNode)
@ -273,9 +277,10 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
if sdi.chainConfig.Clique != nil { if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0) uncleReward = big.NewInt(0)
} else { } else {
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
} }
uncle := models.UncleModel{ uncle := models.UncleModel{
BlockNumber: blockNumber.String(),
HeaderID: headerID, HeaderID: headerID,
CID: uncleNode.Cid().String(), CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
@ -331,6 +336,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return fmt.Errorf("error deriving tx sender: %v", err) return fmt.Errorf("error deriving tx sender: %v", err)
} }
txModel := models.TxModel{ txModel := models.TxModel{
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()), Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from), Src: shared.HandleZeroAddr(from),
@ -353,6 +359,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex() storageKeys[k] = storageKey.Hex()
} }
accessListElementModel := models.AccessListElementModel{ accessListElementModel := models.AccessListElementModel{
BlockNumber: args.blockNumber.String(),
TxID: txID, TxID: txID,
Index: int64(j), Index: int64(j),
Address: accessListElement.Address.Hex(), Address: accessListElement.Address.Hex(),
@ -376,6 +383,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(),
TxID: txID, TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
@ -406,6 +414,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: args.blockNumber.String(),
ReceiptID: txID, ReceiptID: txID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),
@ -434,7 +443,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
@ -444,6 +453,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel := models.StateNodeModel{ stateModel := models.StateNodeModel{
BlockNumber: blockNumber,
HeaderID: headerID, HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
@ -458,6 +468,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
} }
stateModel := models.StateNodeModel{ stateModel := models.StateNodeModel{
BlockNumber: blockNumber,
HeaderID: headerID, HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
@ -483,6 +494,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error()) return fmt.Errorf("error decoding state account rlp: %s", err.Error())
} }
accountModel := models.StateAccountModel{ accountModel := models.StateAccountModel{
BlockNumber: blockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Balance: account.Balance.String(), Balance: account.Balance.String(),
@ -500,6 +512,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
BlockNumber: blockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
@ -518,6 +531,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
BlockNumber: blockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,

View File

@ -44,58 +44,58 @@ func (db *DB) InsertHeaderStm() string {
// InsertUncleStm satisfies the sql.Statements interface // InsertUncleStm satisfies the sql.Statements interface
func (db *DB) InsertUncleStm() string { func (db *DB) InsertUncleStm() string {
return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) return `INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (block_hash) DO NOTHING` ON CONFLICT (block_hash) DO NOTHING`
} }
// InsertTxStm satisfies the sql.Statements interface // InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string { func (db *DB) InsertTxStm() string {
return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) return `INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (tx_hash) DO NOTHING` ON CONFLICT (tx_hash) DO NOTHING`
} }
// InsertAccessListElementStm satisfies the sql.Statements interface // InsertAccessListElementStm satisfies the sql.Statements interface
func (db *DB) InsertAccessListElementStm() string { func (db *DB) InsertAccessListElementStm() string {
return `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) return `INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tx_id, index) DO NOTHING` ON CONFLICT (tx_id, index) DO NOTHING`
} }
// InsertRctStm satisfies the sql.Statements interface // InsertRctStm satisfies the sql.Statements interface
func (db *DB) InsertRctStm() string { func (db *DB) InsertRctStm() string {
return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) return `INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (tx_id) DO NOTHING` ON CONFLICT (tx_id) DO NOTHING`
} }
// InsertLogStm satisfies the sql.Statements interface // InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertLogStm() string { func (db *DB) InsertLogStm() string {
return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) return `INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (rct_id, index) DO NOTHING` ON CONFLICT (rct_id, index) DO NOTHING`
} }
// InsertStateStm satisfies the sql.Statements interface // InsertStateStm satisfies the sql.Statements interface
func (db *DB) InsertStateStm() string { func (db *DB) InsertStateStm() string {
return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)` ON CONFLICT (header_id, state_path) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)`
} }
// InsertAccountStm satisfies the sql.Statements interface // InsertAccountStm satisfies the sql.Statements interface
func (db *DB) InsertAccountStm() string { func (db *DB) InsertAccountStm() string {
return `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) return `INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO NOTHING` ON CONFLICT (header_id, state_path) DO NOTHING`
} }
// InsertStorageStm satisfies the sql.Statements interface // InsertStorageStm satisfies the sql.Statements interface
func (db *DB) InsertStorageStm() string { func (db *DB) InsertStorageStm() string {
return `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)` ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)`
} }
// InsertIPLDStm satisfies the sql.Statements interface // InsertIPLDStm satisfies the sql.Statements interface
func (db *DB) InsertIPLDStm() string { func (db *DB) InsertIPLDStm() string {
return `INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` return `INSERT INTO public.blocks (block_number, key, data) VALUES ($1, $2, $3) ON CONFLICT (block_number, key) DO NOTHING`
} }
// InsertIPLDsStm satisfies the sql.Statements interface // InsertIPLDsStm satisfies the sql.Statements interface
func (db *DB) InsertIPLDsStm() string { func (db *DB) InsertIPLDsStm() string {
return `INSERT INTO public.blocks (key, data) VALUES (unnest($1::TEXT[]), unnest($2::BYTEA[])) ON CONFLICT (key) DO NOTHING` return `INSERT INTO public.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT (block_number, key) DO NOTHING`
} }

View File

@ -62,12 +62,12 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
} }
/* /*
INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (block_hash) DO NOTHING ON CONFLICT (block_hash) DO NOTHING
*/ */
func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(),
uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
if err != nil { if err != nil {
return fmt.Errorf("error upserting uncle_cids entry: %v", err) return fmt.Errorf("error upserting uncle_cids entry: %v", err)
} }
@ -75,13 +75,13 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
} }
/* /*
INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (tx_hash) DO NOTHING ON CONFLICT (tx_hash) DO NOTHING
*/ */
func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
if err != nil { if err != nil {
return fmt.Errorf("error upserting transaction_cids entry: %v", err) return fmt.Errorf("error upserting transaction_cids entry: %v", err)
} }
@ -90,12 +90,13 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
} }
/* /*
INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tx_id, index) DO NOTHING ON CONFLICT (tx_id, index) DO NOTHING
*/ */
func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error { func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertAccessListElementStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertAccessListElementStm(),
accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
accessListElement.StorageKeys)
if err != nil { if err != nil {
return fmt.Errorf("error upserting access_list_element entry: %v", err) return fmt.Errorf("error upserting access_list_element entry: %v", err)
} }
@ -104,12 +105,13 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL
} }
/* /*
INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (tx_id) DO NOTHING ON CONFLICT (tx_id) DO NOTHING
*/ */
func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
rct.PostStatus, rct.LogRoot)
if err != nil { if err != nil {
return fmt.Errorf("error upserting receipt_cids entry: %w", err) return fmt.Errorf("error upserting receipt_cids entry: %w", err)
} }
@ -118,14 +120,14 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
} }
/* /*
INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (rct_id, index) DO NOTHING ON CONFLICT (rct_id, index) DO NOTHING
*/ */
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
for _, log := range logs { for _, log := range logs {
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.BlockNumber, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
log.Topic3, log.Data) log.Topic2, log.Topic3, log.Data)
if err != nil { if err != nil {
return fmt.Errorf("error upserting logs entry: %w", err) return fmt.Errorf("error upserting logs entry: %w", err)
} }
@ -135,8 +137,8 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
} }
/* /*
INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) ON CONFLICT (header_id, state_path) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1 $3, $4, $6, $7, $8)
*/ */
func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
var stateKey string var stateKey string
@ -144,7 +146,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
stateKey = stateNode.StateKey stateKey = stateNode.StateKey
} }
_, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true,
stateNode.MhKey)
if err != nil { if err != nil {
return fmt.Errorf("error upserting state_cids entry: %v", err) return fmt.Errorf("error upserting state_cids entry: %v", err)
} }
@ -152,13 +155,13 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
} }
/* /*
INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO NOTHING ON CONFLICT (header_id, state_path) DO NOTHING
*/ */
func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(),
stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
stateAccount.StorageRoot) stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
if err != nil { if err != nil {
return fmt.Errorf("error upserting state_accounts entry: %v", err) return fmt.Errorf("error upserting state_accounts entry: %v", err)
} }
@ -166,8 +169,8 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel
} }
/* /*
INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8) ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)
*/ */
func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error {
var storageKey string var storageKey string
@ -175,8 +178,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
storageKey = storageCID.StorageKey storageKey = storageCID.StorageKey
} }
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path,
true, storageCID.MhKey) storageCID.NodeType, true, storageCID.MhKey)
if err != nil { if err != nil {
return fmt.Errorf("error upserting storage_cids entry: %v", err) return fmt.Errorf("error upserting storage_cids entry: %v", err)
} }

View File

@ -29,7 +29,7 @@ import (
// StateDiffIndexer interface required to index statediff data // StateDiffIndexer interface required to index statediff data
type StateDiffIndexer interface { type StateDiffIndexer interface {
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error PushStateNode(tx Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
ReportDBMetrics(delay time.Duration, quit <-chan bool) ReportDBMetrics(delay time.Duration, quit <-chan bool)
io.Closer io.Closer

View File

@ -20,12 +20,14 @@ import "github.com/lib/pq"
// IPLDBatch holds the arguments for a batch insert of IPLD data // IPLDBatch holds the arguments for a batch insert of IPLD data
type IPLDBatch struct { type IPLDBatch struct {
BlockNumbers []string
Keys []string Keys []string
Values [][]byte Values [][]byte
} }
// UncleBatch holds the arguments for a batch insert of uncle data // UncleBatch holds the arguments for a batch insert of uncle data
type UncleBatch struct { type UncleBatch struct {
BlockNumbers []string
HeaderID []string HeaderID []string
BlockHashes []string BlockHashes []string
ParentHashes []string ParentHashes []string
@ -36,6 +38,7 @@ type UncleBatch struct {
// TxBatch holds the arguments for a batch insert of tx data // TxBatch holds the arguments for a batch insert of tx data
type TxBatch struct { type TxBatch struct {
BlockNumbers []string
HeaderID string HeaderID string
Indexes []int64 Indexes []int64
TxHashes []string TxHashes []string
@ -49,6 +52,7 @@ type TxBatch struct {
// AccessListBatch holds the arguments for a batch insert of access list data // AccessListBatch holds the arguments for a batch insert of access list data
type AccessListBatch struct { type AccessListBatch struct {
BlockNumbers []string
Indexes []int64 Indexes []int64
TxIDs []string TxIDs []string
Addresses []string Addresses []string
@ -57,6 +61,7 @@ type AccessListBatch struct {
// ReceiptBatch holds the arguments for a batch insert of receipt data // ReceiptBatch holds the arguments for a batch insert of receipt data
type ReceiptBatch struct { type ReceiptBatch struct {
BlockNumbers []string
TxIDs []string TxIDs []string
LeafCIDs []string LeafCIDs []string
LeafMhKeys []string LeafMhKeys []string
@ -69,6 +74,7 @@ type ReceiptBatch struct {
// LogBatch holds the arguments for a batch insert of log data // LogBatch holds the arguments for a batch insert of log data
type LogBatch struct { type LogBatch struct {
BlockNumbers []string
LeafCIDs []string LeafCIDs []string
LeafMhKeys []string LeafMhKeys []string
ReceiptIDs []string ReceiptIDs []string
@ -83,6 +89,7 @@ type LogBatch struct {
// StateBatch holds the arguments for a batch insert of state data // StateBatch holds the arguments for a batch insert of state data
type StateBatch struct { type StateBatch struct {
BlockNumbers []string
HeaderID string HeaderID string
Paths [][]byte Paths [][]byte
StateKeys []string StateKeys []string
@ -94,6 +101,7 @@ type StateBatch struct {
// AccountBatch holds the arguments for a batch insert of account data // AccountBatch holds the arguments for a batch insert of account data
type AccountBatch struct { type AccountBatch struct {
BlockNumbers []string
HeaderID string HeaderID string
StatePaths [][]byte StatePaths [][]byte
Balances []string Balances []string
@ -104,6 +112,7 @@ type AccountBatch struct {
// StorageBatch holds the arguments for a batch insert of storage data // StorageBatch holds the arguments for a batch insert of storage data
type StorageBatch struct { type StorageBatch struct {
BlockNumbers []string
HeaderID string HeaderID string
StatePaths [][]string StatePaths [][]string
Paths [][]byte Paths [][]byte

View File

@ -20,6 +20,7 @@ import "github.com/lib/pq"
// IPLDModel is the db model for public.blocks // IPLDModel is the db model for public.blocks
type IPLDModel struct { type IPLDModel struct {
BlockNumber string `db:"block_number"`
Key string `db:"key"` Key string `db:"key"`
Data []byte `db:"data"` Data []byte `db:"data"`
} }
@ -46,6 +47,7 @@ type HeaderModel struct {
// UncleModel is the db model for eth.uncle_cids // UncleModel is the db model for eth.uncle_cids
type UncleModel struct { type UncleModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"` HeaderID string `db:"header_id"`
BlockHash string `db:"block_hash"` BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"` ParentHash string `db:"parent_hash"`
@ -56,6 +58,7 @@ type UncleModel struct {
// TxModel is the db model for eth.transaction_cids // TxModel is the db model for eth.transaction_cids
type TxModel struct { type TxModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"` HeaderID string `db:"header_id"`
Index int64 `db:"index"` Index int64 `db:"index"`
TxHash string `db:"tx_hash"` TxHash string `db:"tx_hash"`
@ -70,6 +73,7 @@ type TxModel struct {
// AccessListElementModel is the db model for eth.access_list_entry // AccessListElementModel is the db model for eth.access_list_entry
type AccessListElementModel struct { type AccessListElementModel struct {
BlockNumber string `db:"block_number"`
Index int64 `db:"index"` Index int64 `db:"index"`
TxID string `db:"tx_id"` TxID string `db:"tx_id"`
Address string `db:"address"` Address string `db:"address"`
@ -78,6 +82,7 @@ type AccessListElementModel struct {
// ReceiptModel is the db model for eth.receipt_cids // ReceiptModel is the db model for eth.receipt_cids
type ReceiptModel struct { type ReceiptModel struct {
BlockNumber string `db:"block_number"`
TxID string `db:"tx_id"` TxID string `db:"tx_id"`
LeafCID string `db:"leaf_cid"` LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"` LeafMhKey string `db:"leaf_mh_key"`
@ -90,6 +95,7 @@ type ReceiptModel struct {
// StateNodeModel is the db model for eth.state_cids // StateNodeModel is the db model for eth.state_cids
type StateNodeModel struct { type StateNodeModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"` HeaderID string `db:"header_id"`
Path []byte `db:"state_path"` Path []byte `db:"state_path"`
StateKey string `db:"state_leaf_key"` StateKey string `db:"state_leaf_key"`
@ -101,6 +107,7 @@ type StateNodeModel struct {
// StorageNodeModel is the db model for eth.storage_cids // StorageNodeModel is the db model for eth.storage_cids
type StorageNodeModel struct { type StorageNodeModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"` HeaderID string `db:"header_id"`
StatePath []byte `db:"state_path"` StatePath []byte `db:"state_path"`
Path []byte `db:"storage_path"` Path []byte `db:"storage_path"`
@ -113,6 +120,7 @@ type StorageNodeModel struct {
// StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key // StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key
type StorageNodeWithStateKeyModel struct { type StorageNodeWithStateKeyModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"` HeaderID string `db:"header_id"`
StatePath []byte `db:"state_path"` StatePath []byte `db:"state_path"`
Path []byte `db:"storage_path"` Path []byte `db:"storage_path"`
@ -126,6 +134,7 @@ type StorageNodeWithStateKeyModel struct {
// StateAccountModel is a db model for an eth state account (decoded value of state leaf node) // StateAccountModel is a db model for an eth state account (decoded value of state leaf node)
type StateAccountModel struct { type StateAccountModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"` HeaderID string `db:"header_id"`
StatePath []byte `db:"state_path"` StatePath []byte `db:"state_path"`
Balance string `db:"balance"` Balance string `db:"balance"`
@ -136,6 +145,7 @@ type StateAccountModel struct {
// LogsModel is the db model for eth.logs // LogsModel is the db model for eth.logs
type LogsModel struct { type LogsModel struct {
BlockNumber string `db:"block_number"`
ReceiptID string `db:"rct_id"` ReceiptID string `db:"rct_id"`
LeafCID string `db:"leaf_cid"` LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"` LeafMhKey string `db:"leaf_mh_key"`