From 9775355d2b8ca5f009c6b21229d253541f38bc58 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 17 Mar 2022 07:17:04 -0500 Subject: [PATCH] update sql indexer to use new v4 schema that denormalizes by block_number for the purposes of partitioning & sharding --- statediff/indexer/database/sql/batch_tx.go | 23 +-- statediff/indexer/database/sql/indexer.go | 132 ++++++++++-------- .../indexer/database/sql/postgres/database.go | 24 ++-- statediff/indexer/database/sql/writer.go | 47 ++++--- statediff/indexer/interfaces/interfaces.go | 2 +- statediff/indexer/models/batch.go | 81 ++++++----- statediff/indexer/models/models.go | 114 ++++++++------- 7 files changed, 233 insertions(+), 190 deletions(-) diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index fb1b289a1..a7c9a474a 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -29,9 +29,11 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/models" ) +const startingCacheCapacity = 1024 * 24 + // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber uint64 + BlockNumber string ctx context.Context dbtx Tx stm string @@ -48,7 +50,8 @@ func (tx *BatchTx) Submit(err error) error { } func (tx *BatchTx) flush() error { - _, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) + _, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys), + pq.Array(tx.ipldCache.Values)) if err != nil { return err } @@ -61,6 +64,7 @@ func (tx *BatchTx) cache() { for { select { case i := <-tx.iplds: + tx.ipldCache.BlockNumbers = append(tx.ipldCache.BlockNumbers, i.BlockNumber) tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key) tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data) case <-tx.quit: @@ -72,15 +76,17 @@ func (tx *BatchTx) cache() { func (tx *BatchTx) cacheDirect(key string, value []byte) { tx.iplds <- models.IPLDModel{ - Key: key, - Data: value, + BlockNumber: tx.BlockNumber, + Key: key, + Data: value, } } func (tx *BatchTx) cacheIPLD(i node.Node) { tx.iplds <- models.IPLDModel{ - Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), - Data: i.RawData(), + BlockNumber: tx.BlockNumber, + Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), + Data: i.RawData(), } } @@ -91,8 +97,9 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error } prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() tx.iplds <- models.IPLDModel{ - Key: prefixedKey, - Data: raw, + BlockNumber: tx.BlockNumber, + Key: prefixedKey, + Data: raw, } return c.String(), prefixedKey, err } diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index e2667cecd..4db268dc7 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -141,12 +141,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip }() blockTx := &BatchTx{ ctx: sdi.ctx, - BlockNumber: height, + BlockNumber: block.Number().String(), stm: sdi.dbWriter.db.InsertIPLDsStm(), iplds: make(chan models.IPLDModel), quit: make(chan struct{}), - ipldCache: models.IPLDBatch{}, - dbtx: tx, + ipldCache: models.IPLDBatch{ + BlockNumbers: make([]string, 0, startingCacheCapacity), + Keys: make([]string, 0, startingCacheCapacity), + Values: make([][]byte, 0, startingCacheCapacity), + }, + dbtx: tx, // handle transaction commit or rollback for any return case submit: func(self *BatchTx, err error) error { defer func() { @@ -200,7 +204,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles - err = sdi.processUncles(blockTx, headerID, height, uncleNodes) + err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes) if err != nil { return nil, err } @@ -264,7 +268,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { tx.cacheIPLD(uncleNode) @@ -273,15 +277,16 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu if sdi.chainConfig.Clique != nil { uncleReward = big.NewInt(0) } else { - uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) + uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64()) } uncle := models.UncleModel{ - HeaderID: headerID, - CID: uncleNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), - ParentHash: uncleNode.ParentHash.String(), - BlockHash: uncleNode.Hash().String(), - Reward: uncleReward.String(), + BlockNumber: blockNumber.String(), + HeaderID: headerID, + CID: uncleNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), + ParentHash: uncleNode.ParentHash.String(), + BlockHash: uncleNode.Hash().String(), + Reward: uncleReward.String(), } if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle); err != nil { return err @@ -331,16 +336,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - HeaderID: args.headerID, - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: txID, - Index: int64(i), - Data: trx.Data(), - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), - Type: trx.Type(), - Value: val, + BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: txID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), + Value: val, } if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil { return err @@ -353,6 +359,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ + BlockNumber: args.blockNumber.String(), TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), @@ -376,6 +383,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } rctModel := &models.ReceiptModel{ + BlockNumber: args.blockNumber.String(), TxID: txID, Contract: contract, ContractHash: contractHash, @@ -406,16 +414,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } logDataSet[idx] = &models.LogsModel{ - ReceiptID: txID, - Address: l.Address.String(), - Index: int64(l.Index), - Data: l.Data, - LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), - Topic0: topicSet[0], - Topic1: topicSet[1], - Topic2: topicSet[2], - Topic3: topicSet[3], + BlockNumber: args.blockNumber.String(), + ReceiptID: txID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], } } @@ -434,7 +443,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) @@ -444,12 +453,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present stateModel := models.StateNodeModel{ - HeaderID: headerID, - Path: stateNode.Path, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: shared.RemovedNodeStateCID, - MhKey: shared.RemovedNodeMhKey, - NodeType: stateNode.NodeType.Int(), + BlockNumber: blockNumber, + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: shared.RemovedNodeStateCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: stateNode.NodeType.Int(), } return sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel) } @@ -458,12 +468,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ - HeaderID: headerID, - Path: stateNode.Path, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: stateCIDStr, - MhKey: stateMhKey, - NodeType: stateNode.NodeType.Int(), + BlockNumber: blockNumber, + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: stateCIDStr, + MhKey: stateMhKey, + NodeType: stateNode.NodeType.Int(), } // index the state node if err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel); err != nil { @@ -483,6 +494,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ + BlockNumber: blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Balance: account.Balance.String(), @@ -500,13 +512,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ - HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, - StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: shared.RemovedNodeStorageCID, - MhKey: shared.RemovedNodeMhKey, - NodeType: storageNode.NodeType.Int(), + BlockNumber: blockNumber, + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: shared.RemovedNodeStorageCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: storageNode.NodeType.Int(), } if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil { return err @@ -518,13 +531,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ - HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, - StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: storageCIDStr, - MhKey: storageMhKey, - NodeType: storageNode.NodeType.Int(), + BlockNumber: blockNumber, + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: storageCIDStr, + MhKey: storageMhKey, + NodeType: storageNode.NodeType.Int(), } if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil { return err diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 4cff518a0..b0aa49515 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -44,58 +44,58 @@ func (db *DB) InsertHeaderStm() string { // InsertUncleStm satisfies the sql.Statements interface func (db *DB) InsertUncleStm() string { - return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) + return `INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (block_hash) DO NOTHING` } // InsertTxStm satisfies the sql.Statements interface func (db *DB) InsertTxStm() string { - return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + return `INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (tx_hash) DO NOTHING` } // InsertAccessListElementStm satisfies the sql.Statements interface func (db *DB) InsertAccessListElementStm() string { - return `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) + return `INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (tx_id, index) DO NOTHING` } // InsertRctStm satisfies the sql.Statements interface func (db *DB) InsertRctStm() string { - return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + return `INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (tx_id) DO NOTHING` } // InsertLogStm satisfies the sql.Statements interface func (db *DB) InsertLogStm() string { - return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + return `INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (rct_id, index) DO NOTHING` } // InsertStateStm satisfies the sql.Statements interface func (db *DB) InsertStateStm() string { - return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)` + return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (header_id, state_path) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)` } // InsertAccountStm satisfies the sql.Statements interface func (db *DB) InsertAccountStm() string { - return `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) + return `INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (header_id, state_path) DO NOTHING` } // InsertStorageStm satisfies the sql.Statements interface func (db *DB) InsertStorageStm() string { - return `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)` + return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)` } // InsertIPLDStm satisfies the sql.Statements interface func (db *DB) InsertIPLDStm() string { - return `INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` + return `INSERT INTO public.blocks (block_number, key, data) VALUES ($1, $2, $3) ON CONFLICT (block_number, key) DO NOTHING` } // InsertIPLDsStm satisfies the sql.Statements interface func (db *DB) InsertIPLDsStm() string { - return `INSERT INTO public.blocks (key, data) VALUES (unnest($1::TEXT[]), unnest($2::BYTEA[])) ON CONFLICT (key) DO NOTHING` + return `INSERT INTO public.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT (block_number, key) DO NOTHING` } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 3f1dfc0b5..bc543fa3f 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -62,12 +62,12 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { } /* -INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) +INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (block_hash) DO NOTHING */ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(), - uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) + uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) if err != nil { return fmt.Errorf("error upserting uncle_cids entry: %v", err) } @@ -75,13 +75,13 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { } /* -INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (tx_hash) DO NOTHING */ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), - transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, - transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) + transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, + transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) if err != nil { return fmt.Errorf("error upserting transaction_cids entry: %v", err) } @@ -90,12 +90,13 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { } /* -INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) +INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (tx_id, index) DO NOTHING */ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertAccessListElementStm(), - accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) + accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, + accessListElement.StorageKeys) if err != nil { return fmt.Errorf("error upserting access_list_element entry: %v", err) } @@ -104,12 +105,13 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL } /* -INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (tx_id) DO NOTHING */ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), - rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) + rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, + rct.PostStatus, rct.LogRoot) if err != nil { return fmt.Errorf("error upserting receipt_cids entry: %w", err) } @@ -118,14 +120,14 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { } /* -INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (rct_id, index) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { for _, log := range logs { _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), - log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, - log.Topic3, log.Data) + log.BlockNumber, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, + log.Topic2, log.Topic3, log.Data) if err != nil { return fmt.Errorf("error upserting logs entry: %w", err) } @@ -135,8 +137,8 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { } /* -INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) -ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) +INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (header_id, state_path) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1 $3, $4, $6, $7, $8) */ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { var stateKey string @@ -144,7 +146,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { stateKey = stateNode.StateKey } _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), - stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) + stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, + stateNode.MhKey) if err != nil { return fmt.Errorf("error upserting state_cids entry: %v", err) } @@ -152,13 +155,13 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { } /* -INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) +INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (header_id, state_path) DO NOTHING */ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(), - stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, - stateAccount.StorageRoot) + stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, + stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) if err != nil { return fmt.Errorf("error upserting state_accounts entry: %v", err) } @@ -166,8 +169,8 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel } /* -INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) -ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8) +INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) +ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9) */ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { var storageKey string @@ -175,8 +178,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err storageKey = storageCID.StorageKey } _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), - storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, - true, storageCID.MhKey) + storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, + storageCID.NodeType, true, storageCID.MhKey) if err != nil { return fmt.Errorf("error upserting storage_cids entry: %v", err) } diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index 8f951230d..4ad9175a5 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -29,7 +29,7 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) - PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error + PushStateNode(tx Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error ReportDBMetrics(delay time.Duration, quit <-chan bool) io.Closer diff --git a/statediff/indexer/models/batch.go b/statediff/indexer/models/batch.go index 16096f292..94e9b4e96 100644 --- a/statediff/indexer/models/batch.go +++ b/statediff/indexer/models/batch.go @@ -20,12 +20,14 @@ import "github.com/lib/pq" // IPLDBatch holds the arguments for a batch insert of IPLD data type IPLDBatch struct { - Keys []string - Values [][]byte + BlockNumbers []string + Keys []string + Values [][]byte } // UncleBatch holds the arguments for a batch insert of uncle data type UncleBatch struct { + BlockNumbers []string HeaderID []string BlockHashes []string ParentHashes []string @@ -36,19 +38,21 @@ type UncleBatch struct { // TxBatch holds the arguments for a batch insert of tx data type TxBatch struct { - HeaderID string - Indexes []int64 - TxHashes []string - CIDs []string - MhKeys []string - Dsts []string - Srcs []string - Datas [][]byte - Types []uint8 + BlockNumbers []string + HeaderID string + Indexes []int64 + TxHashes []string + CIDs []string + MhKeys []string + Dsts []string + Srcs []string + Datas [][]byte + Types []uint8 } // AccessListBatch holds the arguments for a batch insert of access list data type AccessListBatch struct { + BlockNumbers []string Indexes []int64 TxIDs []string Addresses []string @@ -57,6 +61,7 @@ type AccessListBatch struct { // ReceiptBatch holds the arguments for a batch insert of receipt data type ReceiptBatch struct { + BlockNumbers []string TxIDs []string LeafCIDs []string LeafMhKeys []string @@ -69,31 +74,34 @@ type ReceiptBatch struct { // LogBatch holds the arguments for a batch insert of log data type LogBatch struct { - LeafCIDs []string - LeafMhKeys []string - ReceiptIDs []string - Addresses []string - Indexes []int64 - Datas [][]byte - Topic0s []string - Topic1s []string - Topic2s []string - Topic3s []string + BlockNumbers []string + LeafCIDs []string + LeafMhKeys []string + ReceiptIDs []string + Addresses []string + Indexes []int64 + Datas [][]byte + Topic0s []string + Topic1s []string + Topic2s []string + Topic3s []string } // StateBatch holds the arguments for a batch insert of state data type StateBatch struct { - HeaderID string - Paths [][]byte - StateKeys []string - NodeTypes []int - CIDs []string - MhKeys []string - Diff bool + BlockNumbers []string + HeaderID string + Paths [][]byte + StateKeys []string + NodeTypes []int + CIDs []string + MhKeys []string + Diff bool } // AccountBatch holds the arguments for a batch insert of account data type AccountBatch struct { + BlockNumbers []string HeaderID string StatePaths [][]byte Balances []string @@ -104,12 +112,13 @@ type AccountBatch struct { // StorageBatch holds the arguments for a batch insert of storage data type StorageBatch struct { - HeaderID string - StatePaths [][]string - Paths [][]byte - StorageKeys []string - NodeTypes []int - CIDs []string - MhKeys []string - Diff bool + BlockNumbers []string + HeaderID string + StatePaths [][]string + Paths [][]byte + StorageKeys []string + NodeTypes []int + CIDs []string + MhKeys []string + Diff bool } diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index 2caed1bcb..4e1cfa888 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -20,8 +20,9 @@ import "github.com/lib/pq" // IPLDModel is the db model for public.blocks type IPLDModel struct { - Key string `db:"key"` - Data []byte `db:"data"` + BlockNumber string `db:"block_number"` + Key string `db:"key"` + Data []byte `db:"data"` } // HeaderModel is the db model for eth.header_cids @@ -46,30 +47,33 @@ type HeaderModel struct { // UncleModel is the db model for eth.uncle_cids type UncleModel struct { - HeaderID string `db:"header_id"` - BlockHash string `db:"block_hash"` - ParentHash string `db:"parent_hash"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Reward string `db:"reward"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + BlockHash string `db:"block_hash"` + ParentHash string `db:"parent_hash"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Reward string `db:"reward"` } // TxModel is the db model for eth.transaction_cids type TxModel struct { - HeaderID string `db:"header_id"` - Index int64 `db:"index"` - TxHash string `db:"tx_hash"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Dst string `db:"dst"` - Src string `db:"src"` - Data []byte `db:"tx_data"` - Type uint8 `db:"tx_type"` - Value string `db:"value"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + Index int64 `db:"index"` + TxHash string `db:"tx_hash"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Dst string `db:"dst"` + Src string `db:"src"` + Data []byte `db:"tx_data"` + Type uint8 `db:"tx_type"` + Value string `db:"value"` } // AccessListElementModel is the db model for eth.access_list_entry type AccessListElementModel struct { + BlockNumber string `db:"block_number"` Index int64 `db:"index"` TxID string `db:"tx_id"` Address string `db:"address"` @@ -78,6 +82,7 @@ type AccessListElementModel struct { // ReceiptModel is the db model for eth.receipt_cids type ReceiptModel struct { + BlockNumber string `db:"block_number"` TxID string `db:"tx_id"` LeafCID string `db:"leaf_cid"` LeafMhKey string `db:"leaf_mh_key"` @@ -90,42 +95,46 @@ type ReceiptModel struct { // StateNodeModel is the db model for eth.state_cids type StateNodeModel struct { - HeaderID string `db:"header_id"` - Path []byte `db:"state_path"` - StateKey string `db:"state_leaf_key"` - NodeType int `db:"node_type"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Diff bool `db:"diff"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + Path []byte `db:"state_path"` + StateKey string `db:"state_leaf_key"` + NodeType int `db:"node_type"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Diff bool `db:"diff"` } // StorageNodeModel is the db model for eth.storage_cids type StorageNodeModel struct { - HeaderID string `db:"header_id"` - StatePath []byte `db:"state_path"` - Path []byte `db:"storage_path"` - StorageKey string `db:"storage_leaf_key"` - NodeType int `db:"node_type"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Diff bool `db:"diff"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + StatePath []byte `db:"state_path"` + Path []byte `db:"storage_path"` + StorageKey string `db:"storage_leaf_key"` + NodeType int `db:"node_type"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Diff bool `db:"diff"` } // StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key type StorageNodeWithStateKeyModel struct { - HeaderID string `db:"header_id"` - StatePath []byte `db:"state_path"` - Path []byte `db:"storage_path"` - StateKey string `db:"state_leaf_key"` - StorageKey string `db:"storage_leaf_key"` - NodeType int `db:"node_type"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Diff bool `db:"diff"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + StatePath []byte `db:"state_path"` + Path []byte `db:"storage_path"` + StateKey string `db:"state_leaf_key"` + StorageKey string `db:"storage_leaf_key"` + NodeType int `db:"node_type"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Diff bool `db:"diff"` } // StateAccountModel is a db model for an eth state account (decoded value of state leaf node) type StateAccountModel struct { + BlockNumber string `db:"block_number"` HeaderID string `db:"header_id"` StatePath []byte `db:"state_path"` Balance string `db:"balance"` @@ -136,14 +145,15 @@ type StateAccountModel struct { // LogsModel is the db model for eth.logs type LogsModel struct { - ReceiptID string `db:"rct_id"` - LeafCID string `db:"leaf_cid"` - LeafMhKey string `db:"leaf_mh_key"` - Address string `db:"address"` - Index int64 `db:"index"` - Data []byte `db:"log_data"` - Topic0 string `db:"topic0"` - Topic1 string `db:"topic1"` - Topic2 string `db:"topic2"` - Topic3 string `db:"topic3"` + BlockNumber string `db:"block_number"` + ReceiptID string `db:"rct_id"` + LeafCID string `db:"leaf_cid"` + LeafMhKey string `db:"leaf_mh_key"` + Address string `db:"address"` + Index int64 `db:"index"` + Data []byte `db:"log_data"` + Topic0 string `db:"topic0"` + Topic1 string `db:"topic1"` + Topic2 string `db:"topic2"` + Topic3 string `db:"topic3"` }