From 9775355d2b8ca5f009c6b21229d253541f38bc58 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 17 Mar 2022 07:17:04 -0500 Subject: [PATCH 1/3] update sql indexer to use new v4 schema that denormalizes by block_number for the purposes of partitioning & sharding --- statediff/indexer/database/sql/batch_tx.go | 23 +-- statediff/indexer/database/sql/indexer.go | 132 ++++++++++-------- .../indexer/database/sql/postgres/database.go | 24 ++-- statediff/indexer/database/sql/writer.go | 47 ++++--- statediff/indexer/interfaces/interfaces.go | 2 +- statediff/indexer/models/batch.go | 81 ++++++----- statediff/indexer/models/models.go | 114 ++++++++------- 7 files changed, 233 insertions(+), 190 deletions(-) diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index fb1b289a1..a7c9a474a 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -29,9 +29,11 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/models" ) +const startingCacheCapacity = 1024 * 24 + // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber uint64 + BlockNumber string ctx context.Context dbtx Tx stm string @@ -48,7 +50,8 @@ func (tx *BatchTx) Submit(err error) error { } func (tx *BatchTx) flush() error { - _, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) + _, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys), + pq.Array(tx.ipldCache.Values)) if err != nil { return err } @@ -61,6 +64,7 @@ func (tx *BatchTx) cache() { for { select { case i := <-tx.iplds: + tx.ipldCache.BlockNumbers = append(tx.ipldCache.BlockNumbers, i.BlockNumber) tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key) tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data) case <-tx.quit: @@ -72,15 +76,17 @@ func (tx *BatchTx) cache() 
{ func (tx *BatchTx) cacheDirect(key string, value []byte) { tx.iplds <- models.IPLDModel{ - Key: key, - Data: value, + BlockNumber: tx.BlockNumber, + Key: key, + Data: value, } } func (tx *BatchTx) cacheIPLD(i node.Node) { tx.iplds <- models.IPLDModel{ - Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), - Data: i.RawData(), + BlockNumber: tx.BlockNumber, + Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), + Data: i.RawData(), } } @@ -91,8 +97,9 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error } prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() tx.iplds <- models.IPLDModel{ - Key: prefixedKey, - Data: raw, + BlockNumber: tx.BlockNumber, + Key: prefixedKey, + Data: raw, } return c.String(), prefixedKey, err } diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index e2667cecd..4db268dc7 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -141,12 +141,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip }() blockTx := &BatchTx{ ctx: sdi.ctx, - BlockNumber: height, + BlockNumber: block.Number().String(), stm: sdi.dbWriter.db.InsertIPLDsStm(), iplds: make(chan models.IPLDModel), quit: make(chan struct{}), - ipldCache: models.IPLDBatch{}, - dbtx: tx, + ipldCache: models.IPLDBatch{ + BlockNumbers: make([]string, 0, startingCacheCapacity), + Keys: make([]string, 0, startingCacheCapacity), + Values: make([][]byte, 0, startingCacheCapacity), + }, + dbtx: tx, // handle transaction commit or rollback for any return case submit: func(self *BatchTx, err error) error { defer func() { @@ -200,7 +204,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index 
uncles - err = sdi.processUncles(blockTx, headerID, height, uncleNodes) + err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes) if err != nil { return nil, err } @@ -264,7 +268,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { tx.cacheIPLD(uncleNode) @@ -273,15 +277,16 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu if sdi.chainConfig.Clique != nil { uncleReward = big.NewInt(0) } else { - uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) + uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64()) } uncle := models.UncleModel{ - HeaderID: headerID, - CID: uncleNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), - ParentHash: uncleNode.ParentHash.String(), - BlockHash: uncleNode.Hash().String(), - Reward: uncleReward.String(), + BlockNumber: blockNumber.String(), + HeaderID: headerID, + CID: uncleNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), + ParentHash: uncleNode.ParentHash.String(), + BlockHash: uncleNode.Hash().String(), + Reward: uncleReward.String(), } if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle); err != nil { return err @@ -331,16 +336,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - HeaderID: args.headerID, - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: txID, - Index: int64(i), - Data: 
trx.Data(), - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), - Type: trx.Type(), - Value: val, + BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: txID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), + Value: val, } if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil { return err @@ -353,6 +359,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ + BlockNumber: args.blockNumber.String(), TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), @@ -376,6 +383,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } rctModel := &models.ReceiptModel{ + BlockNumber: args.blockNumber.String(), TxID: txID, Contract: contract, ContractHash: contractHash, @@ -406,16 +414,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } logDataSet[idx] = &models.LogsModel{ - ReceiptID: txID, - Address: l.Address.String(), - Index: int64(l.Index), - Data: l.Data, - LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), - Topic0: topicSet[0], - Topic1: topicSet[1], - Topic2: topicSet[2], - Topic3: topicSet[3], + BlockNumber: args.blockNumber.String(), + ReceiptID: txID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], } } @@ -434,7 +443,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode 
publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) @@ -444,12 +453,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present stateModel := models.StateNodeModel{ - HeaderID: headerID, - Path: stateNode.Path, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: shared.RemovedNodeStateCID, - MhKey: shared.RemovedNodeMhKey, - NodeType: stateNode.NodeType.Int(), + BlockNumber: blockNumber, + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: shared.RemovedNodeStateCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: stateNode.NodeType.Int(), } return sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel) } @@ -458,12 +468,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ - HeaderID: headerID, - Path: stateNode.Path, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: stateCIDStr, - MhKey: stateMhKey, - NodeType: stateNode.NodeType.Int(), + BlockNumber: blockNumber, + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: stateCIDStr, + MhKey: stateMhKey, + NodeType: stateNode.NodeType.Int(), } // index the state node if err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel); err != nil { @@ 
-483,6 +494,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ + BlockNumber: blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Balance: account.Balance.String(), @@ -500,13 +512,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ - HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, - StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: shared.RemovedNodeStorageCID, - MhKey: shared.RemovedNodeMhKey, - NodeType: storageNode.NodeType.Int(), + BlockNumber: blockNumber, + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: shared.RemovedNodeStorageCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: storageNode.NodeType.Int(), } if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil { return err @@ -518,13 +531,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ - HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, - StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: storageCIDStr, - MhKey: storageMhKey, - NodeType: storageNode.NodeType.Int(), + BlockNumber: blockNumber, + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: storageCIDStr, + MhKey: storageMhKey, + NodeType: storageNode.NodeType.Int(), } if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, 
storageModel); err != nil { return err diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 4cff518a0..b0aa49515 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -44,58 +44,58 @@ func (db *DB) InsertHeaderStm() string { // InsertUncleStm satisfies the sql.Statements interface func (db *DB) InsertUncleStm() string { - return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) + return `INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (block_hash) DO NOTHING` } // InsertTxStm satisfies the sql.Statements interface func (db *DB) InsertTxStm() string { - return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + return `INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (tx_hash) DO NOTHING` } // InsertAccessListElementStm satisfies the sql.Statements interface func (db *DB) InsertAccessListElementStm() string { - return `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) + return `INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (tx_id, index) DO NOTHING` } // InsertRctStm satisfies the sql.Statements interface func (db *DB) InsertRctStm() string { - return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + return `INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, 
contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (tx_id) DO NOTHING` } // InsertLogStm satisfies the sql.Statements interface func (db *DB) InsertLogStm() string { - return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + return `INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (rct_id, index) DO NOTHING` } // InsertStateStm satisfies the sql.Statements interface func (db *DB) InsertStateStm() string { - return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)` + return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (header_id, state_path) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)` } // InsertAccountStm satisfies the sql.Statements interface func (db *DB) InsertAccountStm() string { - return `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) + return `INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (header_id, state_path) DO NOTHING` } // InsertStorageStm satisfies the sql.Statements interface func (db *DB) InsertStorageStm() string { - return `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES 
($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)` + return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)` } // InsertIPLDStm satisfies the sql.Statements interface func (db *DB) InsertIPLDStm() string { - return `INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` + return `INSERT INTO public.blocks (block_number, key, data) VALUES ($1, $2, $3) ON CONFLICT (block_number, key) DO NOTHING` } // InsertIPLDsStm satisfies the sql.Statements interface func (db *DB) InsertIPLDsStm() string { - return `INSERT INTO public.blocks (key, data) VALUES (unnest($1::TEXT[]), unnest($2::BYTEA[])) ON CONFLICT (key) DO NOTHING` + return `INSERT INTO public.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT (block_number, key) DO NOTHING` } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 3f1dfc0b5..bc543fa3f 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -62,12 +62,12 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { } /* -INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) +INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (block_hash) DO NOTHING */ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(), - uncle.BlockHash, 
uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) + uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) if err != nil { return fmt.Errorf("error upserting uncle_cids entry: %v", err) } @@ -75,13 +75,13 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { } /* -INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (tx_hash) DO NOTHING */ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), - transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, - transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) + transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, + transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) if err != nil { return fmt.Errorf("error upserting transaction_cids entry: %v", err) } @@ -90,12 +90,13 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { } /* -INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) +INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (tx_id, index) DO NOTHING */ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertAccessListElementStm(), - accessListElement.TxID, accessListElement.Index, accessListElement.Address, 
accessListElement.StorageKeys) + accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, + accessListElement.StorageKeys) if err != nil { return fmt.Errorf("error upserting access_list_element entry: %v", err) } @@ -104,12 +105,13 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL } /* -INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (tx_id) DO NOTHING */ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), - rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) + rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, + rct.PostStatus, rct.LogRoot) if err != nil { return fmt.Errorf("error upserting receipt_cids entry: %w", err) } @@ -118,14 +120,14 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { } /* -INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (rct_id, index) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { for _, log := range logs { _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), - log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, - log.Topic3, log.Data) + 
log.BlockNumber, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, + log.Topic2, log.Topic3, log.Data) if err != nil { return fmt.Errorf("error upserting logs entry: %w", err) } } @@ -135,8 +137,8 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { } /* -INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) -ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) +INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (header_id, state_path) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8) */ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { var stateKey string @@ -144,7 +146,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { stateKey = stateNode.StateKey } _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), - stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) + stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, + stateNode.MhKey) if err != nil { return fmt.Errorf("error upserting state_cids entry: %v", err) } @@ -152,13 +155,13 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { } /* -INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) +INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (header_id, state_path) DO NOTHING */ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { _, err
:= tx.Exec(w.db.Context(), w.db.InsertAccountStm(), - stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, - stateAccount.StorageRoot) + stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, + stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) if err != nil { return fmt.Errorf("error upserting state_accounts entry: %v", err) } @@ -166,8 +169,8 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel } /* -INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) -ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8) +INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) +ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9) */ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { var storageKey string @@ -175,8 +178,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err storageKey = storageCID.StorageKey } _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), - storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, - true, storageCID.MhKey) + storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, + storageCID.NodeType, true, storageCID.MhKey) if err != nil { return fmt.Errorf("error upserting storage_cids entry: %v", err) } diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index 8f951230d..4ad9175a5 100644 
--- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -29,7 +29,7 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) - PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error + PushStateNode(tx Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error ReportDBMetrics(delay time.Duration, quit <-chan bool) io.Closer diff --git a/statediff/indexer/models/batch.go b/statediff/indexer/models/batch.go index 16096f292..94e9b4e96 100644 --- a/statediff/indexer/models/batch.go +++ b/statediff/indexer/models/batch.go @@ -20,12 +20,14 @@ import "github.com/lib/pq" // IPLDBatch holds the arguments for a batch insert of IPLD data type IPLDBatch struct { - Keys []string - Values [][]byte + BlockNumbers []string + Keys []string + Values [][]byte } // UncleBatch holds the arguments for a batch insert of uncle data type UncleBatch struct { + BlockNumbers []string HeaderID []string BlockHashes []string ParentHashes []string @@ -36,19 +38,21 @@ type UncleBatch struct { // TxBatch holds the arguments for a batch insert of tx data type TxBatch struct { - HeaderID string - Indexes []int64 - TxHashes []string - CIDs []string - MhKeys []string - Dsts []string - Srcs []string - Datas [][]byte - Types []uint8 + BlockNumbers []string + HeaderID string + Indexes []int64 + TxHashes []string + CIDs []string + MhKeys []string + Dsts []string + Srcs []string + Datas [][]byte + Types []uint8 } // AccessListBatch holds the arguments for a batch insert of access list data type AccessListBatch struct { + BlockNumbers []string Indexes []int64 TxIDs []string Addresses []string @@ -57,6 +61,7 @@ type AccessListBatch struct { // ReceiptBatch holds the arguments for a batch insert of receipt data 
type ReceiptBatch struct { + BlockNumbers []string TxIDs []string LeafCIDs []string LeafMhKeys []string @@ -69,31 +74,34 @@ type ReceiptBatch struct { // LogBatch holds the arguments for a batch insert of log data type LogBatch struct { - LeafCIDs []string - LeafMhKeys []string - ReceiptIDs []string - Addresses []string - Indexes []int64 - Datas [][]byte - Topic0s []string - Topic1s []string - Topic2s []string - Topic3s []string + BlockNumbers []string + LeafCIDs []string + LeafMhKeys []string + ReceiptIDs []string + Addresses []string + Indexes []int64 + Datas [][]byte + Topic0s []string + Topic1s []string + Topic2s []string + Topic3s []string } // StateBatch holds the arguments for a batch insert of state data type StateBatch struct { - HeaderID string - Paths [][]byte - StateKeys []string - NodeTypes []int - CIDs []string - MhKeys []string - Diff bool + BlockNumbers []string + HeaderID string + Paths [][]byte + StateKeys []string + NodeTypes []int + CIDs []string + MhKeys []string + Diff bool } // AccountBatch holds the arguments for a batch insert of account data type AccountBatch struct { + BlockNumbers []string HeaderID string StatePaths [][]byte Balances []string @@ -104,12 +112,13 @@ type AccountBatch struct { // StorageBatch holds the arguments for a batch insert of storage data type StorageBatch struct { - HeaderID string - StatePaths [][]string - Paths [][]byte - StorageKeys []string - NodeTypes []int - CIDs []string - MhKeys []string - Diff bool + BlockNumbers []string + HeaderID string + StatePaths [][]string + Paths [][]byte + StorageKeys []string + NodeTypes []int + CIDs []string + MhKeys []string + Diff bool } diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index 2caed1bcb..4e1cfa888 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -20,8 +20,9 @@ import "github.com/lib/pq" // IPLDModel is the db model for public.blocks type IPLDModel struct { - Key string `db:"key"` - 
Data []byte `db:"data"` + BlockNumber string `db:"block_number"` + Key string `db:"key"` + Data []byte `db:"data"` } // HeaderModel is the db model for eth.header_cids @@ -46,30 +47,33 @@ type HeaderModel struct { // UncleModel is the db model for eth.uncle_cids type UncleModel struct { - HeaderID string `db:"header_id"` - BlockHash string `db:"block_hash"` - ParentHash string `db:"parent_hash"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Reward string `db:"reward"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + BlockHash string `db:"block_hash"` + ParentHash string `db:"parent_hash"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Reward string `db:"reward"` } // TxModel is the db model for eth.transaction_cids type TxModel struct { - HeaderID string `db:"header_id"` - Index int64 `db:"index"` - TxHash string `db:"tx_hash"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Dst string `db:"dst"` - Src string `db:"src"` - Data []byte `db:"tx_data"` - Type uint8 `db:"tx_type"` - Value string `db:"value"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + Index int64 `db:"index"` + TxHash string `db:"tx_hash"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Dst string `db:"dst"` + Src string `db:"src"` + Data []byte `db:"tx_data"` + Type uint8 `db:"tx_type"` + Value string `db:"value"` } // AccessListElementModel is the db model for eth.access_list_entry type AccessListElementModel struct { + BlockNumber string `db:"block_number"` Index int64 `db:"index"` TxID string `db:"tx_id"` Address string `db:"address"` @@ -78,6 +82,7 @@ type AccessListElementModel struct { // ReceiptModel is the db model for eth.receipt_cids type ReceiptModel struct { + BlockNumber string `db:"block_number"` TxID string `db:"tx_id"` LeafCID string `db:"leaf_cid"` LeafMhKey string `db:"leaf_mh_key"` @@ -90,42 +95,46 @@ type ReceiptModel struct { // StateNodeModel is the db model for eth.state_cids type 
StateNodeModel struct { - HeaderID string `db:"header_id"` - Path []byte `db:"state_path"` - StateKey string `db:"state_leaf_key"` - NodeType int `db:"node_type"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Diff bool `db:"diff"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + Path []byte `db:"state_path"` + StateKey string `db:"state_leaf_key"` + NodeType int `db:"node_type"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Diff bool `db:"diff"` } // StorageNodeModel is the db model for eth.storage_cids type StorageNodeModel struct { - HeaderID string `db:"header_id"` - StatePath []byte `db:"state_path"` - Path []byte `db:"storage_path"` - StorageKey string `db:"storage_leaf_key"` - NodeType int `db:"node_type"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Diff bool `db:"diff"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + StatePath []byte `db:"state_path"` + Path []byte `db:"storage_path"` + StorageKey string `db:"storage_leaf_key"` + NodeType int `db:"node_type"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Diff bool `db:"diff"` } // StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key type StorageNodeWithStateKeyModel struct { - HeaderID string `db:"header_id"` - StatePath []byte `db:"state_path"` - Path []byte `db:"storage_path"` - StateKey string `db:"state_leaf_key"` - StorageKey string `db:"storage_leaf_key"` - NodeType int `db:"node_type"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Diff bool `db:"diff"` + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + StatePath []byte `db:"state_path"` + Path []byte `db:"storage_path"` + StateKey string `db:"state_leaf_key"` + StorageKey string `db:"storage_leaf_key"` + NodeType int `db:"node_type"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Diff bool `db:"diff"` } // StateAccountModel is a db model for an eth state account (decoded 
value of state leaf node) type StateAccountModel struct { + BlockNumber string `db:"block_number"` HeaderID string `db:"header_id"` StatePath []byte `db:"state_path"` Balance string `db:"balance"` @@ -136,14 +145,15 @@ type StateAccountModel struct { // LogsModel is the db model for eth.logs type LogsModel struct { - ReceiptID string `db:"rct_id"` - LeafCID string `db:"leaf_cid"` - LeafMhKey string `db:"leaf_mh_key"` - Address string `db:"address"` - Index int64 `db:"index"` - Data []byte `db:"log_data"` - Topic0 string `db:"topic0"` - Topic1 string `db:"topic1"` - Topic2 string `db:"topic2"` - Topic3 string `db:"topic3"` + BlockNumber string `db:"block_number"` + ReceiptID string `db:"rct_id"` + LeafCID string `db:"leaf_cid"` + LeafMhKey string `db:"leaf_mh_key"` + Address string `db:"address"` + Index int64 `db:"index"` + Data []byte `db:"log_data"` + Topic0 string `db:"topic0"` + Topic1 string `db:"topic1"` + Topic2 string `db:"topic2"` + Topic3 string `db:"topic3"` } From 7bc4c7520c3d963086698f10a1181f98e467b69d Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 24 Mar 2022 08:01:26 -0500 Subject: [PATCH 2/3] update file writing mode --- statediff/indexer/database/file/batch_tx.go | 2 +- statediff/indexer/database/file/indexer.go | 162 +++++++++++--------- statediff/indexer/database/file/writer.go | 72 +++++---- statediff/indexer/database/sql/batch_tx.go | 27 +++- statediff/indexer/database/sql/indexer.go | 53 +++---- statediff/indexer/interfaces/interfaces.go | 2 +- 6 files changed, 175 insertions(+), 143 deletions(-) diff --git a/statediff/indexer/database/file/batch_tx.go b/statediff/indexer/database/file/batch_tx.go index 39e5d3713..d38bd1211 100644 --- a/statediff/indexer/database/file/batch_tx.go +++ b/statediff/indexer/database/file/batch_tx.go @@ -18,7 +18,7 @@ package file // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber uint64 + BlockNumber string 
submit func(blockTx *BatchTx, err error) error } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 870c1f259..045ac238a 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -23,6 +23,7 @@ import ( "math/big" "os" "sync" + "sync/atomic" "time" "github.com/ipfs/go-cid" @@ -53,10 +54,12 @@ var ( // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { - fileWriter *SQLWriter - chainConfig *params.ChainConfig - nodeID string - wg *sync.WaitGroup + fileWriter *SQLWriter + chainConfig *params.ChainConfig + nodeID string + wg *sync.WaitGroup + blockNumber string + removedCacheFlag *uint32 } // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer @@ -77,7 +80,6 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c wg := new(sync.WaitGroup) w.Loop() w.upsertNode(config.NodeInfo) - w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{}) return &StateDiffIndexer{ fileWriter: w, chainConfig: chainConfig, @@ -92,6 +94,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {} // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { + sdi.removedCacheFlag = new(uint32) + sdi.blockNumber = block.Number().String() start, t := time.Now(), time.Now() blockHash := block.Hash() blockHashStr := blockHash.String() @@ -127,7 +131,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip t = time.Now() blockTx := &BatchTx{ - BlockNumber: height, + BlockNumber: sdi.blockNumber, submit: 
func(self *BatchTx, err error) error { tDiff := time.Since(t) indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) @@ -189,7 +193,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // processHeader write a header IPLD insert SQL stmt to a file // it returns the headerID func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string { - sdi.fileWriter.upsertIPLDNode(headerNode) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, headerNode) var baseFee *string if header.BaseFee != nil { @@ -202,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), - BlockNumber: header.Number.String(), + BlockNumber: sdi.blockNumber, BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), @@ -221,7 +225,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) { // publish and index uncles for _, uncleNode := range uncleNodes { - sdi.fileWriter.upsertIPLDNode(uncleNode) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, uncleNode) var uncleReward *big.Int // in PoA networks uncle reward is 0 if sdi.chainConfig.Clique != nil { @@ -230,12 +234,13 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } sdi.fileWriter.upsertUncleCID(models.UncleModel{ - HeaderID: headerID, - CID: uncleNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), - ParentHash: uncleNode.ParentHash.String(), - BlockHash: uncleNode.Hash().String(), - Reward: uncleReward.String(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + CID: uncleNode.Cid().String(), + MhKey: 
shared.MultihashKeyFromCID(uncleNode.Cid()), + ParentHash: uncleNode.ParentHash.String(), + BlockHash: uncleNode.Hash().String(), + Reward: uncleReward.String(), }) } } @@ -261,10 +266,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { signer := types.MakeSigner(sdi.chainConfig, args.blockNumber) for i, receipt := range args.receipts { for _, logTrieNode := range args.logTrieNodes[i] { - sdi.fileWriter.upsertIPLDNode(logTrieNode) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, logTrieNode) } txNode := args.txNodes[i] - sdi.fileWriter.upsertIPLDNode(txNode) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, txNode) // index tx trx := args.txs[i] @@ -281,16 +286,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - HeaderID: args.headerID, - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: txID, - Index: int64(i), - Data: trx.Data(), - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), - Type: trx.Type(), - Value: val, + BlockNumber: sdi.blockNumber, + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: txID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), + Value: val, } sdi.fileWriter.upsertTransactionCID(txModel) @@ -301,6 +307,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ + BlockNumber: sdi.blockNumber, TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), @@ -322,6 +329,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } rctModel := &models.ReceiptModel{ + BlockNumber: sdi.blockNumber, TxID: txID, Contract: contract, 
ContractHash: contractHash, @@ -349,16 +357,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } logDataSet[idx] = &models.LogsModel{ - ReceiptID: txID, - Address: l.Address.String(), - Index: int64(l.Index), - Data: l.Data, - LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), - Topic0: topicSet[0], - Topic1: topicSet[1], - Topic2: topicSet[2], - Topic3: topicSet[3], + BlockNumber: sdi.blockNumber, + ReceiptID: txID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], } } sdi.fileWriter.upsertLogCID(logDataSet) @@ -366,8 +375,8 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { // publish trie nodes, these aren't indexed directly for i, n := range args.txTrieNodes { - sdi.fileWriter.upsertIPLDNode(n) - sdi.fileWriter.upsertIPLDNode(args.rctTrieNodes[i]) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, n) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, args.rctTrieNodes[i]) } return nil @@ -377,30 +386,34 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { // publish the state node if stateNode.NodeType == sdtypes.Removed { - // short circuit if it is a Removed node - // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { + atomic.StoreUint32(sdi.removedCacheFlag, 1) + sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{}) + } stateModel := models.StateNodeModel{ - HeaderID: headerID, - Path: stateNode.Path, - StateKey: 
common.BytesToHash(stateNode.LeafKey).String(), - CID: shared.RemovedNodeStateCID, - MhKey: shared.RemovedNodeMhKey, - NodeType: stateNode.NodeType.Int(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: shared.RemovedNodeStateCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: stateNode.NodeType.Int(), } sdi.fileWriter.upsertStateCID(stateModel) return nil } - stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) + stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) if err != nil { return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ - HeaderID: headerID, - Path: stateNode.Path, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: stateCIDStr, - MhKey: stateMhKey, - NodeType: stateNode.NodeType.Int(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: stateCIDStr, + MhKey: stateMhKey, + NodeType: stateNode.NodeType.Int(), } // index the state node sdi.fileWriter.upsertStateCID(stateModel) @@ -418,6 +431,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ + BlockNumber: sdi.blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Balance: account.Balance.String(), @@ -430,32 +444,36 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // if there are any storage nodes associated with this node, publish and index them for _, storageNode := range stateNode.StorageNodes { if storageNode.NodeType == sdtypes.Removed { - // short circuit if it is a Removed node - // this 
assumes the db has been initialized and a public.blocks entry for the Removed node is present + if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { + atomic.StoreUint32(sdi.removedCacheFlag, 1) + sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{}) + } storageModel := models.StorageNodeModel{ - HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, - StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: shared.RemovedNodeStorageCID, - MhKey: shared.RemovedNodeMhKey, - NodeType: storageNode.NodeType.Int(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: shared.RemovedNodeStorageCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: storageNode.NodeType.Int(), } sdi.fileWriter.upsertStorageCID(storageModel) continue } - storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) + storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) if err != nil { return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ - HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, - StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: storageCIDStr, - MhKey: storageMhKey, - NodeType: storageNode.NodeType.Int(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: storageCIDStr, + MhKey: storageMhKey, + NodeType: storageNode.NodeType.Int(), } sdi.fileWriter.upsertStorageCID(storageModel) } @@ -470,7 +488,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd if err != nil { return 
fmt.Errorf("error deriving multihash key from codehash: %v", err) } - sdi.fileWriter.upsertIPLDDirect(mhKey, codeAndCodeHash.Code) + sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, mhKey, codeAndCodeHash.Code) return nil } diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go index 48de0853d..3c11b5eea 100644 --- a/statediff/indexer/database/file/writer.go +++ b/statediff/indexer/database/file/writer.go @@ -127,34 +127,35 @@ const ( nodeInsert = "INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " + "('%s', '%s', '%s', '%s', %d);\n" - ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '\\x%x');\n" + ipldInsert = "INSERT INTO public.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n" headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " + "state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES " + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, '%s');\n" - uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " + - "('%s', '%s', '%s', '%s', '%s', '%s');\n" + uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s', '%s');\n" - txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " + - "value) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n" + txInsert = "INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " + + "value) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n" - alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', 
'%s');\n" + alInsert = "INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES " + + "('%s', '%s', %d, '%s', '%s');\n" - rctInsert = "INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " + - "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" + rctInsert = "INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " + + "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" - logInsert = "INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + - "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n" + logInsert = "INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + + "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n" - stateInsert = "INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " + - "VALUES ('%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n" + stateInsert = "INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " + + "VALUES ('%s', '%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n" - accountInsert = "INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " + - "VALUES ('%s', '\\x%x', '%s', %d, '\\x%x', '%s');\n" + accountInsert = "INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) " + + "VALUES ('%s', '%s', '\\x%x', '%s', %d, '\\x%x', '%s');\n" - storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " + - "node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n" + storageInsert = "INSERT INTO 
eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, " + + "node_type, diff, mh_key) VALUES ('%s', '%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n" ) func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { @@ -162,32 +163,35 @@ func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { } func (sqw *SQLWriter) upsertIPLD(ipld models.IPLDModel) { - sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.Key, ipld.Data)) + sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.BlockNumber, ipld.Key, ipld.Data)) } -func (sqw *SQLWriter) upsertIPLDDirect(key string, value []byte) { +func (sqw *SQLWriter) upsertIPLDDirect(blockNumber, key string, value []byte) { sqw.upsertIPLD(models.IPLDModel{ - Key: key, - Data: value, + BlockNumber: blockNumber, + Key: key, + Data: value, }) } -func (sqw *SQLWriter) upsertIPLDNode(i node.Node) { +func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i node.Node) { sqw.upsertIPLD(models.IPLDModel{ - Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), - Data: i.RawData(), + BlockNumber: blockNumber, + Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), + Data: i.RawData(), }) } -func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, string, error) { +func (sqw *SQLWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) { c, err := ipld.RawdataToCid(codec, raw, mh) if err != nil { return "", "", err } prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() sqw.upsertIPLD(models.IPLDModel{ - Key: prefixedKey, - Data: raw, + BlockNumber: blockNumber, + Key: prefixedKey, + Data: raw, }) return c.String(), prefixedKey, err } @@ -201,31 +205,31 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { } func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { - sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, 
uncle.HeaderID, uncle.ParentHash, uncle.CID, + sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)) } func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) { - sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, + sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)) indexerMetrics.transactions.Inc(1) } func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) { - sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address, + sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys))) indexerMetrics.accessListEntries.Inc(1) } func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) { - sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, + sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)) indexerMetrics.receipts.Inc(1) } func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { for _, l := range logs { - sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, + sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data)) indexerMetrics.logs.Inc(1) } @@ -236,12 +240,12 @@ func (sqw *SQLWriter) 
upsertStateCID(stateNode models.StateNodeModel) { if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, + sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey)) } func (sqw *SQLWriter) upsertStateAccount(stateAccount models.StateAccountModel) { - sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, + sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)) } @@ -250,6 +254,6 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) { if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, + sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)) } diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index a7c9a474a..06bb49c9e 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -18,6 +18,7 @@ package sql import ( "context" + "sync/atomic" blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" @@ -33,13 +34,14 @@ const startingCacheCapacity = 1024 * 24 // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber string - ctx context.Context - dbtx Tx - stm string - quit 
chan struct{} - iplds chan models.IPLDModel - ipldCache models.IPLDBatch + BlockNumber string + ctx context.Context + dbtx Tx + stm string + quit chan struct{} + iplds chan models.IPLDModel + ipldCache models.IPLDBatch + removedCacheFlag *uint32 submit func(blockTx *BatchTx, err error) error } @@ -104,6 +106,17 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error return c.String(), prefixedKey, err } +func (tx *BatchTx) cacheRemoved(key string, value []byte) { + if atomic.LoadUint32(tx.removedCacheFlag) == 0 { + atomic.StoreUint32(tx.removedCacheFlag, 1) + tx.iplds <- models.IPLDModel{ + BlockNumber: tx.BlockNumber, + Key: key, + Data: value, + } + } +} + // rollback sql transaction and log any error func rollback(ctx context.Context, tx Tx) { if err := tx.Rollback(ctx); err != nil { diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 4db268dc7..15fcd60b2 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -55,14 +55,11 @@ type StateDiffIndexer struct { ctx context.Context chainConfig *params.ChainConfig dbWriter *Writer + blockNumber string } // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, db Database) (*StateDiffIndexer, error) { - // Write the removed node to the db on init - if _, err := db.Exec(ctx, db.InsertIPLDStm(), shared.RemovedNodeMhKey, []byte{}); err != nil { - return nil, err - } return &StateDiffIndexer{ ctx: ctx, chainConfig: chainConfig, @@ -93,6 +90,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { start, t := time.Now(), time.Now() + 
sdi.blockNumber = block.Number().String() blockHash := block.Hash() blockHashStr := blockHash.String() height := block.NumberU64() @@ -140,11 +138,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } }() blockTx := &BatchTx{ - ctx: sdi.ctx, - BlockNumber: block.Number().String(), - stm: sdi.dbWriter.db.InsertIPLDsStm(), - iplds: make(chan models.IPLDModel), - quit: make(chan struct{}), + removedCacheFlag: new(uint32), + ctx: sdi.ctx, + BlockNumber: sdi.blockNumber, + stm: sdi.dbWriter.db.InsertIPLDsStm(), + iplds: make(chan models.IPLDModel), + quit: make(chan struct{}), ipldCache: models.IPLDBatch{ BlockNumbers: make([]string, 0, startingCacheCapacity), Keys: make([]string, 0, startingCacheCapacity), @@ -204,7 +203,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles - err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes) + err = sdi.processUncles(blockTx, headerID, block.NumberU64(), uncleNodes) if err != nil { return nil, err } @@ -253,7 +252,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), - BlockNumber: header.Number.String(), + BlockNumber: sdi.blockNumber, BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), @@ -268,7 +267,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode 
:= range uncleNodes { tx.cacheIPLD(uncleNode) @@ -277,10 +276,10 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu if sdi.chainConfig.Clique != nil { uncleReward = big.NewInt(0) } else { - uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64()) + uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } uncle := models.UncleModel{ - BlockNumber: blockNumber.String(), + BlockNumber: sdi.blockNumber, HeaderID: headerID, CID: uncleNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), @@ -336,7 +335,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, HeaderID: args.headerID, Dst: shared.HandleZeroAddrPointer(trx.To()), Src: shared.HandleZeroAddr(from), @@ -359,7 +358,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), @@ -383,7 +382,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } rctModel := &models.ReceiptModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, TxID: txID, Contract: contract, ContractHash: contractHash, @@ -414,7 +413,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } logDataSet[idx] = &models.LogsModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, ReceiptID: txID, Address: l.Address.String(), Index: int64(l.Index), @@ -443,17 +442,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode publishes and indexes 
a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) } // publish the state node if stateNode.NodeType == sdtypes.Removed { - // short circuit if it is a Removed node - // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{}) stateModel := models.StateNodeModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), @@ -468,7 +466,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), @@ -494,7 +492,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Balance: account.Balance.String(), @@ -509,10 +507,9 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // if there are any storage nodes associated with this node, publish and index them for _, storageNode := range stateNode.StorageNodes { if storageNode.NodeType == sdtypes.Removed { - // short circuit if it is a Removed node - // this 
assumes the db has been initialized and a public.blocks entry for the Removed node is present + tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{}) storageModel := models.StorageNodeModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Path: storageNode.Path, @@ -531,7 +528,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Path: storageNode.Path, diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index 4ad9175a5..8f951230d 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -29,7 +29,7 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) - PushStateNode(tx Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error + PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error ReportDBMetrics(delay time.Duration, quit <-chan bool) io.Closer From 1e64f486d8e0671da87caa84013f595c120a9118 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 24 Mar 2022 10:09:48 -0500 Subject: [PATCH 3/3] update unit tests --- .../database/file/indexer_legacy_test.go | 11 +- .../indexer/database/file/indexer_test.go | 152 ++++++++------- .../database/sql/indexer_shared_test.go | 2 +- .../database/sql/pgx_indexer_legacy_test.go | 11 +- .../indexer/database/sql/pgx_indexer_test.go | 176 ++++++++++-------- .../indexer/database/sql/postgres/config.go | 6 +- .../database/sql/sqlx_indexer_legacy_test.go | 11 +- 
.../indexer/database/sql/sqlx_indexer_test.go | 164 ++++++++-------- statediff/indexer/mocks/test_data.go | 6 +- .../indexer/test_helpers/test_helpers.go | 12 -- 10 files changed, 286 insertions(+), 265 deletions(-) diff --git a/statediff/indexer/database/file/indexer_legacy_test.go b/statediff/indexer/database/file/indexer_legacy_test.go index 56bca2683..ed4bcf000 100644 --- a/statediff/indexer/database/file/indexer_legacy_test.go +++ b/statediff/indexer/database/file/indexer_legacy_test.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/mocks" - "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" ) var ( @@ -71,7 +70,7 @@ func setupLegacy(t *testing.T) { require.NoError(t, err) } - test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64()) + require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber) connStr := postgres.DefaultConfig.DbConnectionString() @@ -123,10 +122,10 @@ func TestFileIndexerLegacy(t *testing.T) { err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header) require.NoError(t, err) - test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String()) - test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String()) - test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250") - test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockBlock.Coinbase().String()) + require.Equal(t, legacyHeaderCID.String(), header.CID) + require.Equal(t, legacyData.MockBlock.Difficulty().String(), header.TD) + require.Equal(t, "5000000000000011250", header.Reward) + require.Equal(t, legacyData.MockBlock.Coinbase().String(), header.Coinbase) require.Nil(t, legacyData.MockHeader.BaseFee) }) } diff --git a/statediff/indexer/database/file/indexer_test.go b/statediff/indexer/database/file/indexer_test.go 
index ef849e8e8..bb03c1ec3 100644 --- a/statediff/indexer/database/file/indexer_test.go +++ b/statediff/indexer/database/file/indexer_test.go @@ -191,7 +191,7 @@ func setup(t *testing.T) { require.NoError(t, err) } - test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) + require.Equal(t, mocks.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber) connStr := postgres.DefaultConfig.DbConnectionString() @@ -223,10 +223,10 @@ func TestFileIndexer(t *testing.T) { t.Fatal(err) } - test_helpers.ExpectEqual(t, header.CID, headerCID.String()) - test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String()) - test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250") - test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String()) + require.Equal(t, headerCID.String(), header.CID) + require.Equal(t, mocks.MockBlock.Difficulty().String(), header.TD) + require.Equal(t, "2000000000000021250", header.Reward) + require.Equal(t, mocks.MockHeader.Coinbase.String(), header.Coinbase) dc, err := cid.Decode(header.CID) if err != nil { t.Fatal(err) @@ -238,7 +238,7 @@ func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, data, mocks.MockHeaderRlp) + require.Equal(t, mocks.MockHeaderRlp, data) }) t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { setup(t) @@ -253,7 +253,7 @@ func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(trxs), 5) + require.Equal(t, 5, len(trxs)) expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String())) expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String())) expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String())) @@ -280,7 +280,7 @@ func TestFileIndexer(t *testing.T) { txTypeAndValueStr := `SELECT tx_type, value FROM eth.transaction_cids WHERE cid = $1` switch c { case trx1CID.String(): - test_helpers.ExpectEqual(t, data, tx1) + 
require.Equal(t, tx1, data) txRes := new(txResult) err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes) if err != nil { @@ -293,7 +293,7 @@ func TestFileIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[0].Value().String(), txRes.Value) } case trx2CID.String(): - test_helpers.ExpectEqual(t, data, tx2) + require.Equal(t, tx2, data) txRes := new(txResult) err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes) if err != nil { @@ -306,7 +306,7 @@ func TestFileIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[1].Value().String(), txRes.Value) } case trx3CID.String(): - test_helpers.ExpectEqual(t, data, tx3) + require.Equal(t, tx3, data) txRes := new(txResult) err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes) if err != nil { @@ -319,7 +319,7 @@ func TestFileIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[2].Value().String(), txRes.Value) } case trx4CID.String(): - test_helpers.ExpectEqual(t, data, tx4) + require.Equal(t, tx4, data) txRes := new(txResult) err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes) if err != nil { @@ -332,7 +332,9 @@ func TestFileIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[3].Value().String(), txRes.Value) } accessListElementModels := make([]models.AccessListElementModel, 0) - pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC` + pgStr = "SELECT cast(access_list_elements.block_number AS TEXT), access_list_elements.index, access_list_elements.tx_id, " + + "access_list_elements.address, access_list_elements.storage_keys FROM eth.access_list_elements " + + "INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC" err = sqlxdb.Select(&accessListElementModels, pgStr, c) if err != nil { 
t.Fatal(err) @@ -341,18 +343,20 @@ func TestFileIndexer(t *testing.T) { t.Fatalf("expected two access list entries, got %d", len(accessListElementModels)) } model1 := models.AccessListElementModel{ - Index: accessListElementModels[0].Index, - Address: accessListElementModels[0].Address, + BlockNumber: mocks.BlockNumber.String(), + Index: accessListElementModels[0].Index, + Address: accessListElementModels[0].Address, } model2 := models.AccessListElementModel{ + BlockNumber: mocks.BlockNumber.String(), Index: accessListElementModels[1].Index, Address: accessListElementModels[1].Address, StorageKeys: accessListElementModels[1].StorageKeys, } - test_helpers.ExpectEqual(t, model1, mocks.AccessListEntry1Model) - test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model) + require.Equal(t, mocks.AccessListEntry1Model, model1) + require.Equal(t, mocks.AccessListEntry2Model, model2) case trx5CID.String(): - test_helpers.ExpectEqual(t, data, tx5) + require.Equal(t, tx5, data) txRes := new(txResult) err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes) if err != nil { @@ -402,7 +406,7 @@ func TestFileIndexer(t *testing.T) { // expecting MockLog1 and MockLog2 for mockReceipt4 expectedLogs := mocks.MockReceipts[i].Logs - test_helpers.ExpectEqual(t, len(results), len(expectedLogs)) + require.Equal(t, len(expectedLogs), len(results)) var nodeElements []interface{} for idx, r := range results { @@ -413,12 +417,12 @@ func TestFileIndexer(t *testing.T) { logRaw, err := rlp.EncodeToBytes(expectedLogs[idx]) require.NoError(t, err) // 2nd element of the leaf node contains the encoded log data. 
- test_helpers.ExpectEqual(t, logRaw, nodeElements[1].([]byte)) + require.Equal(t, nodeElements[1].([]byte), logRaw) } else { logRaw, err := rlp.EncodeToBytes(expectedLogs[idx]) require.NoError(t, err) // raw log was IPLDized - test_helpers.ExpectEqual(t, logRaw, r.Data) + require.Equal(t, r.Data, logRaw) } } } @@ -439,7 +443,7 @@ func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(rcts), 5) + require.Equal(t, 5, len(rcts)) expectTrue(t, test_helpers.ListContainsString(rcts, rct1CID.String())) expectTrue(t, test_helpers.ListContainsString(rcts, rct2CID.String())) expectTrue(t, test_helpers.ListContainsString(rcts, rct3CID.String())) @@ -465,7 +469,7 @@ func TestFileIndexer(t *testing.T) { expectedRct, err := mocks.MockReceipts[idx].MarshalBinary() require.NoError(t, err) - test_helpers.ExpectEqual(t, expectedRct, nodeElements[1].([]byte)) + require.Equal(t, expectedRct, nodeElements[1].([]byte)) dc, err := cid.Decode(c) if err != nil { @@ -481,46 +485,46 @@ func TestFileIndexer(t *testing.T) { postStatePgStr := `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1` switch c { case rct1CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf1) + require.Equal(t, rctLeaf1, data) var postStatus uint64 pgStr = `SELECT post_status FROM eth.receipt_cids WHERE leaf_cid = $1` err = sqlxdb.Get(&postStatus, pgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postStatus, mocks.ExpectedPostStatus) + require.Equal(t, mocks.ExpectedPostStatus, postStatus) case rct2CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf2) + require.Equal(t, rctLeaf2, data) var postState string err = sqlxdb.Get(&postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState1) + require.Equal(t, mocks.ExpectedPostState1, postState) case rct3CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf3) + require.Equal(t, rctLeaf3, data) var postState string err 
= sqlxdb.Get(&postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState2) + require.Equal(t, mocks.ExpectedPostState2, postState) case rct4CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf4) + require.Equal(t, rctLeaf4, data) var postState string err = sqlxdb.Get(&postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + require.Equal(t, mocks.ExpectedPostState3, postState) case rct5CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf5) + require.Equal(t, rctLeaf5, data) var postState string err = sqlxdb.Get(&postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + require.Equal(t, mocks.ExpectedPostState3, postState) } } }) @@ -539,7 +543,7 @@ func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(stateNodes), 2) + require.Equal(t, 2, len(stateNodes)) for _, stateNode := range stateNodes { var data []byte dc, err := cid.Decode(stateNode.CID) @@ -552,39 +556,41 @@ func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2` + pgStr = `SELECT cast(block_number AS TEXT), header_id, state_path, cast(balance AS TEXT), nonce, code_hash, storage_root from eth.state_accounts WHERE header_id = $1 AND state_path = $2` var account models.StateAccountModel err = sqlxdb.Get(&account, pgStr, stateNode.HeaderID, stateNode.Path) if err != nil { t.Fatal(err) } if stateNode.CID == state1CID.String() { - test_helpers.ExpectEqual(t, stateNode.NodeType, 2) - test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.ContractLeafKey).Hex()) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) - test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) - test_helpers.ExpectEqual(t, account, 
models.StateAccountModel{ + require.Equal(t, 2, stateNode.NodeType) + require.Equal(t, common.BytesToHash(mocks.ContractLeafKey).Hex(), stateNode.StateKey) + require.Equal(t, []byte{'\x06'}, stateNode.Path) + require.Equal(t, mocks.ContractLeafNode, data) + require.Equal(t, models.StateAccountModel{ + BlockNumber: mocks.BlockNumber.String(), HeaderID: account.HeaderID, StatePath: stateNode.Path, Balance: "0", CodeHash: mocks.ContractCodeHash.Bytes(), StorageRoot: mocks.ContractRoot, Nonce: 1, - }) + }, account) } if stateNode.CID == state2CID.String() { - test_helpers.ExpectEqual(t, stateNode.NodeType, 2) - test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.AccountLeafKey).Hex()) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) - test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) - test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + require.Equal(t, 2, stateNode.NodeType) + require.Equal(t, common.BytesToHash(mocks.AccountLeafKey).Hex(), stateNode.StateKey) + require.Equal(t, []byte{'\x0c'}, stateNode.Path) + require.Equal(t, mocks.AccountLeafNode, data) + require.Equal(t, models.StateAccountModel{ + BlockNumber: mocks.BlockNumber.String(), HeaderID: account.HeaderID, StatePath: stateNode.Path, Balance: "1000", CodeHash: mocks.AccountCodeHash.Bytes(), StorageRoot: mocks.AccountRoot, Nonce: 0, - }) + }, account) } } @@ -597,7 +603,7 @@ func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(stateNodes), 1) + require.Equal(t, 1, len(stateNodes)) stateNode := stateNodes[0] var data []byte dc, err := cid.Decode(stateNode.CID) @@ -606,14 +612,14 @@ func TestFileIndexer(t *testing.T) { } mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) + require.Equal(t, shared.RemovedNodeMhKey, prefixedKey) err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) if err != 
nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'}) - test_helpers.ExpectEqual(t, data, []byte{}) + require.Equal(t, shared.RemovedNodeStateCID, stateNode.CID) + require.Equal(t, []byte{'\x02'}, stateNode.Path) + require.Equal(t, []byte{}, data) }) t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { @@ -623,7 +629,7 @@ func TestFileIndexer(t *testing.T) { // check that storage nodes were properly indexed storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) - pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + pgStr := `SELECT cast(storage_cids.block_number AS TEXT), storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) AND state_cids.header_id = header_cids.block_hash @@ -633,14 +639,15 @@ func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(storageNodes), 1) - test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ - CID: storageCID.String(), - NodeType: 2, - StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), - StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), - Path: []byte{}, - }) + require.Equal(t, 1, len(storageNodes)) + require.Equal(t, models.StorageNodeWithStateKeyModel{ + BlockNumber: mocks.BlockNumber.String(), + CID: storageCID.String(), + NodeType: 2, + StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{}, + }, storageNodes[0]) var data []byte dc, err := cid.Decode(storageNodes[0].CID) if err != nil { @@ -652,11 +659,11 @@ 
func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, data, mocks.StorageLeafNode) + require.Equal(t, mocks.StorageLeafNode, data) // check that Removed storage nodes were properly indexed storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) - pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + pgStr = `SELECT cast(storage_cids.block_number AS TEXT), storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) AND state_cids.header_id = header_cids.block_hash @@ -666,25 +673,26 @@ func TestFileIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(storageNodes), 1) - test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ - CID: shared.RemovedNodeStorageCID, - NodeType: 3, - StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), - StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), - Path: []byte{'\x03'}, - }) + require.Equal(t, 1, len(storageNodes)) + require.Equal(t, models.StorageNodeWithStateKeyModel{ + BlockNumber: mocks.BlockNumber.String(), + CID: shared.RemovedNodeStorageCID, + NodeType: 3, + StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{'\x03'}, + }, storageNodes[0]) dc, err = cid.Decode(storageNodes[0].CID) if err != nil { t.Fatal(err) } mhKey = dshelp.MultihashToDsKey(dc.Hash()) prefixedKey = blockstore.BlockPrefix.String() + mhKey.String() - test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) + require.Equal(t, shared.RemovedNodeMhKey, prefixedKey) err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) if err != nil { 
t.Fatal(err) } - test_helpers.ExpectEqual(t, data, []byte{}) + require.Equal(t, []byte{}, data) }) } diff --git a/statediff/indexer/database/sql/indexer_shared_test.go b/statediff/indexer/database/sql/indexer_shared_test.go index 0351fb134..394ae0731 100644 --- a/statediff/indexer/database/sql/indexer_shared_test.go +++ b/statediff/indexer/database/sql/indexer_shared_test.go @@ -23,7 +23,7 @@ var ( err error ind interfaces.StateDiffIndexer ipfsPgGet = `SELECT data FROM public.blocks - WHERE key = $1` + WHERE key = $1 AND block_number = $2` tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte mockBlock *types.Block headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go index 22e695c1d..bb3b36446 100644 --- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" - "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" ) func setupLegacyPGX(t *testing.T) { @@ -56,7 +55,7 @@ func setupLegacyPGX(t *testing.T) { require.NoError(t, err) } - test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64()) + require.Equal(t, legacyData.BlockNumber.String(), tx.(*sql.BatchTx).BlockNumber) } func TestLegacyPGXIndexer(t *testing.T) { @@ -81,10 +80,10 @@ func TestLegacyPGXIndexer(t *testing.T) { &header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.Coinbase) require.NoError(t, err) - test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String()) - test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String()) - test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250") - 
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockHeader.Coinbase.String()) + require.Equal(t, legacyHeaderCID.String(), header.CID) + require.Equal(t, legacyData.MockBlock.Difficulty().String(), header.TD) + require.Equal(t, "5000000000000011250", header.Reward) + require.Equal(t, legacyData.MockHeader.Coinbase.String(), header.Coinbase) require.Nil(t, legacyData.MockHeader.BaseFee) }) } diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index ec5b94fd5..197dda04c 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -20,6 +20,8 @@ import ( "context" "testing" + "github.com/lib/pq" + "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" @@ -62,7 +64,7 @@ func setupPGX(t *testing.T) { require.NoError(t, err) } - test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) + require.Equal(t, mocks.BlockNumber.String(), tx.(*sql.BatchTx).BlockNumber) } func TestPGXIndexer(t *testing.T) { @@ -91,10 +93,10 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, header.CID, headerCID.String()) - test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String()) - test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250") - test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String()) + require.Equal(t, headerCID.String(), header.CID) + require.Equal(t, mocks.MockBlock.Difficulty().String(), header.TD) + require.Equal(t, "2000000000000021250", header.Reward) + require.Equal(t, mocks.MockHeader.Coinbase.String(), header.Coinbase) dc, err := cid.Decode(header.CID) if err != nil { t.Fatal(err) @@ -102,11 +104,11 @@ func TestPGXIndexer(t *testing.T) { mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() var data 
[]byte - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, data, mocks.MockHeaderRlp) + require.Equal(t, mocks.MockHeaderRlp, data) }) t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { @@ -121,7 +123,7 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(trxs), 5) + require.Equal(t, 5, len(trxs)) expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String())) expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String())) expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String())) @@ -141,14 +143,14 @@ func TestPGXIndexer(t *testing.T) { mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() var data []byte - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } txTypeAndValueStr := `SELECT tx_type, CAST(value as TEXT) FROM eth.transaction_cids WHERE cid = $1` switch c { case trx1CID.String(): - test_helpers.ExpectEqual(t, data, tx1) + require.Equal(t, tx1, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value) if err != nil { @@ -161,7 +163,7 @@ func TestPGXIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[0].Value().String(), txRes.Value) } case trx2CID.String(): - test_helpers.ExpectEqual(t, data, tx2) + require.Equal(t, tx2, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value) if err != nil { @@ -174,7 +176,7 @@ func TestPGXIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[1].Value().String(), txRes.Value) 
} case trx3CID.String(): - test_helpers.ExpectEqual(t, data, tx3) + require.Equal(t, tx3, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value) if err != nil { @@ -187,7 +189,7 @@ func TestPGXIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[2].Value().String(), txRes.Value) } case trx4CID.String(): - test_helpers.ExpectEqual(t, data, tx4) + require.Equal(t, tx4, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value) if err != nil { @@ -199,8 +201,18 @@ func TestPGXIndexer(t *testing.T) { if txRes.Value != transactions[3].Value().String() { t.Fatalf("expected tx value %s got %s", transactions[3].Value().String(), txRes.Value) } + // AccessListElementModel is the db model for eth.access_list_entry + type AccessListElementModel struct { + BlockNumber string `db:"block_number"` + Index int64 `db:"index"` + TxID string `db:"tx_id"` + Address string `db:"address"` + StorageKeys pq.StringArray `db:"storage_keys"` + } accessListElementModels := make([]models.AccessListElementModel, 0) - pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC` + pgStr = "SELECT cast(access_list_elements.block_number AS TEXT), access_list_elements.index, access_list_elements.tx_id, " + + "access_list_elements.address, access_list_elements.storage_keys FROM eth.access_list_elements " + + "INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC" err = db.Select(context.Background(), &accessListElementModels, pgStr, c) if err != nil { t.Fatal(err) @@ -209,18 +221,20 @@ func TestPGXIndexer(t *testing.T) { t.Fatalf("expected two access list entries, got %d", len(accessListElementModels)) } model1 := 
models.AccessListElementModel{ - Index: accessListElementModels[0].Index, - Address: accessListElementModels[0].Address, + BlockNumber: mocks.BlockNumber.String(), + Index: accessListElementModels[0].Index, + Address: accessListElementModels[0].Address, } model2 := models.AccessListElementModel{ + BlockNumber: mocks.BlockNumber.String(), Index: accessListElementModels[1].Index, Address: accessListElementModels[1].Address, StorageKeys: accessListElementModels[1].StorageKeys, } - test_helpers.ExpectEqual(t, model1, mocks.AccessListEntry1Model) - test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model) + require.Equal(t, mocks.AccessListEntry1Model, model1) + require.Equal(t, mocks.AccessListEntry2Model, model2) case trx5CID.String(): - test_helpers.ExpectEqual(t, data, tx5) + require.Equal(t, tx5, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value) if err != nil { @@ -272,7 +286,7 @@ func TestPGXIndexer(t *testing.T) { require.NoError(t, err) expectedLogs := mocks.MockReceipts[i].Logs - test_helpers.ExpectEqual(t, len(results), len(expectedLogs)) + require.Equal(t, len(expectedLogs), len(results)) var nodeElements []interface{} for idx, r := range results { @@ -283,12 +297,12 @@ func TestPGXIndexer(t *testing.T) { logRaw, err := rlp.EncodeToBytes(expectedLogs[idx]) require.NoError(t, err) // 2nd element of the leaf node contains the encoded log data. 
- test_helpers.ExpectEqual(t, logRaw, nodeElements[1].([]byte)) + require.Equal(t, nodeElements[1].([]byte), logRaw) } else { logRaw, err := rlp.EncodeToBytes(expectedLogs[idx]) require.NoError(t, err) // raw log was IPLDized - test_helpers.ExpectEqual(t, logRaw, r.Data) + require.Equal(t, r.Data, logRaw) } } } @@ -309,7 +323,7 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(rcts), 5) + require.Equal(t, 5, len(rcts)) expectTrue(t, test_helpers.ListContainsString(rcts, rct1CID.String())) expectTrue(t, test_helpers.ListContainsString(rcts, rct2CID.String())) expectTrue(t, test_helpers.ListContainsString(rcts, rct3CID.String())) @@ -335,7 +349,7 @@ func TestPGXIndexer(t *testing.T) { expectedRct, err := mocks.MockReceipts[idx].MarshalBinary() require.NoError(t, err) - test_helpers.ExpectEqual(t, expectedRct, nodeElements[1].([]byte)) + require.Equal(t, nodeElements[1].([]byte), expectedRct) dc, err := cid.Decode(c) if err != nil { @@ -344,7 +358,7 @@ func TestPGXIndexer(t *testing.T) { mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() var data []byte - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } @@ -352,46 +366,46 @@ func TestPGXIndexer(t *testing.T) { postStatePgStr := `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1` switch c { case rct1CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf1) + require.Equal(t, rctLeaf1, data) var postStatus uint64 pgStr = `SELECT post_status FROM eth.receipt_cids WHERE leaf_cid = $1` err = db.Get(context.Background(), &postStatus, pgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postStatus, mocks.ExpectedPostStatus) + require.Equal(t, mocks.ExpectedPostStatus, postStatus) case rct2CID.String(): - test_helpers.ExpectEqual(t, data, 
rctLeaf2) + require.Equal(t, rctLeaf2, data) var postState string err = db.Get(context.Background(), &postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState1) + require.Equal(t, mocks.ExpectedPostState1, postState) case rct3CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf3) + require.Equal(t, rctLeaf3, data) var postState string err = db.Get(context.Background(), &postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState2) + require.Equal(t, mocks.ExpectedPostState2, postState) case rct4CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf4) + require.Equal(t, rctLeaf4, data) var postState string err = db.Get(context.Background(), &postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + require.Equal(t, mocks.ExpectedPostState3, postState) case rct5CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf5) + require.Equal(t, rctLeaf5, data) var postState string err = db.Get(context.Background(), &postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + require.Equal(t, mocks.ExpectedPostState3, postState) } } }) @@ -409,7 +423,7 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(stateNodes), 2) + require.Equal(t, 2, len(stateNodes)) for _, stateNode := range stateNodes { var data []byte dc, err := cid.Decode(stateNode.CID) @@ -418,43 +432,45 @@ func TestPGXIndexer(t *testing.T) { } mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - pgStr = `SELECT header_id, state_path, 
cast(balance AS TEXT), nonce, code_hash, storage_root from eth.state_accounts WHERE header_id = $1 AND state_path = $2` + pgStr = `SELECT cast(block_number AS TEXT), header_id, state_path, cast(balance AS TEXT), nonce, code_hash, storage_root from eth.state_accounts WHERE header_id = $1 AND state_path = $2` var account models.StateAccountModel err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path) if err != nil { t.Fatal(err) } if stateNode.CID == state1CID.String() { - test_helpers.ExpectEqual(t, stateNode.NodeType, 2) - test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.ContractLeafKey).Hex()) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) - test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) - test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + require.Equal(t, 2, stateNode.NodeType) + require.Equal(t, common.BytesToHash(mocks.ContractLeafKey).Hex(), stateNode.StateKey) + require.Equal(t, []byte{'\x06'}, stateNode.Path) + require.Equal(t, mocks.ContractLeafNode, data) + require.Equal(t, models.StateAccountModel{ + BlockNumber: mocks.BlockNumber.String(), HeaderID: account.HeaderID, StatePath: stateNode.Path, Balance: "0", CodeHash: mocks.ContractCodeHash.Bytes(), StorageRoot: mocks.ContractRoot, Nonce: 1, - }) + }, account) } if stateNode.CID == state2CID.String() { - test_helpers.ExpectEqual(t, stateNode.NodeType, 2) - test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.AccountLeafKey).Hex()) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) - test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) - test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + require.Equal(t, 2, stateNode.NodeType) + require.Equal(t, common.BytesToHash(mocks.AccountLeafKey).Hex(), stateNode.StateKey) + require.Equal(t, []byte{'\x0c'}, stateNode.Path) + require.Equal(t, mocks.AccountLeafNode, data) + require.Equal(t, models.StateAccountModel{ + 
BlockNumber: mocks.BlockNumber.String(), HeaderID: account.HeaderID, StatePath: stateNode.Path, Balance: "1000", CodeHash: mocks.AccountCodeHash.Bytes(), StorageRoot: mocks.AccountRoot, Nonce: 0, - }) + }, account) } } @@ -467,7 +483,7 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(stateNodes), 1) + require.Equal(t, 1, len(stateNodes)) stateNode := stateNodes[0] var data []byte dc, err := cid.Decode(stateNode.CID) @@ -476,14 +492,14 @@ func TestPGXIndexer(t *testing.T) { } mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + require.Equal(t, shared.RemovedNodeMhKey, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'}) - test_helpers.ExpectEqual(t, data, []byte{}) + require.Equal(t, shared.RemovedNodeStateCID, stateNode.CID) + require.Equal(t, []byte{'\x02'}, stateNode.Path) + require.Equal(t, []byte{}, data) }) t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { @@ -492,7 +508,7 @@ func TestPGXIndexer(t *testing.T) { defer checkTxClosure(t, 1, 0, 1) // check that storage nodes were properly indexed storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) - pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + pgStr := `SELECT cast(storage_cids.block_number AS TEXT), storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE (storage_cids.state_path, 
storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) AND state_cids.header_id = header_cids.block_hash @@ -502,14 +518,15 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(storageNodes), 1) - test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ - CID: storageCID.String(), - NodeType: 2, - StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), - StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), - Path: []byte{}, - }) + require.Equal(t, 1, len(storageNodes)) + require.Equal(t, models.StorageNodeWithStateKeyModel{ + BlockNumber: mocks.BlockNumber.String(), + CID: storageCID.String(), + NodeType: 2, + StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{}, + }, storageNodes[0]) var data []byte dc, err := cid.Decode(storageNodes[0].CID) if err != nil { @@ -517,15 +534,15 @@ func TestPGXIndexer(t *testing.T) { } mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, data, mocks.StorageLeafNode) + require.Equal(t, mocks.StorageLeafNode, data) // check that Removed storage nodes were properly indexed storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) - pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + pgStr = `SELECT cast(storage_cids.block_number AS TEXT), storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE (storage_cids.state_path, storage_cids.header_id) = 
(state_cids.state_path, state_cids.header_id) AND state_cids.header_id = header_cids.block_hash @@ -535,25 +552,26 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(storageNodes), 1) - test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ - CID: shared.RemovedNodeStorageCID, - NodeType: 3, - StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), - StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), - Path: []byte{'\x03'}, - }) + require.Equal(t, 1, len(storageNodes)) + require.Equal(t, models.StorageNodeWithStateKeyModel{ + BlockNumber: mocks.BlockNumber.String(), + CID: shared.RemovedNodeStorageCID, + NodeType: 3, + StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{'\x03'}, + }, storageNodes[0]) dc, err = cid.Decode(storageNodes[0].CID) if err != nil { t.Fatal(err) } mhKey = dshelp.MultihashToDsKey(dc.Hash()) prefixedKey = blockstore.BlockPrefix.String() + mhKey.String() - test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + require.Equal(t, shared.RemovedNodeMhKey, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, data, []byte{}) + require.Equal(t, []byte{}, data) }) } diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 095b4dd24..4f4fd304d 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) { var DefaultConfig = Config{ Hostname: "localhost", Port: 5432, - DatabaseName: "vulcanize_testing_v3", - Username: "vdbm", - Password: "password", + DatabaseName: "vulcanize_testing_v4", + 
Username: "postgres", + Password: "", } // Config holds params for a Postgres db diff --git a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go index 4a6594eb3..daaa1550c 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go @@ -31,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/mocks" - "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" ) var ( @@ -66,7 +65,7 @@ func setupLegacySQLX(t *testing.T) { require.NoError(t, err) } - test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64()) + require.Equal(t, legacyData.BlockNumber.String(), tx.(*sql.BatchTx).BlockNumber) } func TestLegacySQLXIndexer(t *testing.T) { @@ -89,10 +88,10 @@ func TestLegacySQLXIndexer(t *testing.T) { err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) require.NoError(t, err) - test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String()) - test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String()) - test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250") - test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockHeader.Coinbase.String()) + require.Equal(t, legacyHeaderCID.String(), header.CID) + require.Equal(t, legacyData.MockBlock.Difficulty().String(), header.TD) + require.Equal(t, "5000000000000011250", header.Reward) + require.Equal(t, legacyData.MockHeader.Coinbase.String(), header.Coinbase) require.Nil(t, legacyData.MockHeader.BaseFee) }) } diff --git a/statediff/indexer/database/sql/sqlx_indexer_test.go b/statediff/indexer/database/sql/sqlx_indexer_test.go index 65c0a7615..d64cb90b7 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_test.go +++ 
b/statediff/indexer/database/sql/sqlx_indexer_test.go @@ -63,7 +63,7 @@ func setupSQLX(t *testing.T) { require.NoError(t, err) } - test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) + require.Equal(t, mocks.BlockNumber.String(), tx.(*sql.BatchTx).BlockNumber) } func TestSQLXIndexer(t *testing.T) { @@ -87,10 +87,10 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, header.CID, headerCID.String()) - test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String()) - test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250") - test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String()) + require.Equal(t, headerCID.String(), header.CID) + require.Equal(t, mocks.MockBlock.Difficulty().String(), header.TD) + require.Equal(t, "2000000000000021250", header.Reward) + require.Equal(t, mocks.MockHeader.Coinbase.String(), header.Coinbase) dc, err := cid.Decode(header.CID) if err != nil { t.Fatal(err) @@ -98,11 +98,11 @@ func TestSQLXIndexer(t *testing.T) { mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() var data []byte - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, data, mocks.MockHeaderRlp) + require.Equal(t, mocks.MockHeaderRlp, data) }) t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { @@ -117,7 +117,7 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(trxs), 5) + require.Equal(t, 5, len(trxs)) expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String())) expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String())) expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String())) @@ -137,14 +137,14 @@ func 
TestSQLXIndexer(t *testing.T) { mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() var data []byte - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } txTypeAndValueStr := `SELECT tx_type, value FROM eth.transaction_cids WHERE cid = $1` switch c { case trx1CID.String(): - test_helpers.ExpectEqual(t, data, tx1) + require.Equal(t, tx1, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes) if err != nil { @@ -157,7 +157,7 @@ func TestSQLXIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[0].Value().String(), txRes.Value) } case trx2CID.String(): - test_helpers.ExpectEqual(t, data, tx2) + require.Equal(t, tx2, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes) if err != nil { @@ -170,7 +170,7 @@ func TestSQLXIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[1].Value().String(), txRes.Value) } case trx3CID.String(): - test_helpers.ExpectEqual(t, data, tx3) + require.Equal(t, tx3, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes) if err != nil { @@ -183,7 +183,7 @@ func TestSQLXIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[2].Value().String(), txRes.Value) } case trx4CID.String(): - test_helpers.ExpectEqual(t, data, tx4) + require.Equal(t, tx4, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes) if err != nil { @@ -196,7 +196,9 @@ func TestSQLXIndexer(t *testing.T) { t.Fatalf("expected tx value %s got %s", transactions[3].Value().String(), txRes.Value) } accessListElementModels := make([]models.AccessListElementModel, 0) 
- pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC` + pgStr = "SELECT cast(access_list_elements.block_number AS TEXT), access_list_elements.index, access_list_elements.tx_id, " + + "access_list_elements.address, access_list_elements.storage_keys FROM eth.access_list_elements " + + "INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC" err = db.Select(context.Background(), &accessListElementModels, pgStr, c) if err != nil { t.Fatal(err) @@ -205,18 +207,20 @@ func TestSQLXIndexer(t *testing.T) { t.Fatalf("expected two access list entries, got %d", len(accessListElementModels)) } model1 := models.AccessListElementModel{ - Index: accessListElementModels[0].Index, - Address: accessListElementModels[0].Address, + BlockNumber: mocks.BlockNumber.String(), + Index: accessListElementModels[0].Index, + Address: accessListElementModels[0].Address, } model2 := models.AccessListElementModel{ + BlockNumber: mocks.BlockNumber.String(), Index: accessListElementModels[1].Index, Address: accessListElementModels[1].Address, StorageKeys: accessListElementModels[1].StorageKeys, } - test_helpers.ExpectEqual(t, model1, mocks.AccessListEntry1Model) - test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model) + require.Equal(t, mocks.AccessListEntry1Model, model1) + require.Equal(t, mocks.AccessListEntry2Model, model2) case trx5CID.String(): - test_helpers.ExpectEqual(t, data, tx5) + require.Equal(t, tx5, data) txRes := new(txResult) err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes) if err != nil { @@ -266,7 +270,7 @@ func TestSQLXIndexer(t *testing.T) { require.NoError(t, err) expectedLogs := mocks.MockReceipts[i].Logs - test_helpers.ExpectEqual(t, len(results), len(expectedLogs)) + require.Equal(t, len(expectedLogs), 
len(results)) var nodeElements []interface{} for idx, r := range results { @@ -277,12 +281,12 @@ func TestSQLXIndexer(t *testing.T) { logRaw, err := rlp.EncodeToBytes(expectedLogs[idx]) require.NoError(t, err) // 2nd element of the leaf node contains the encoded log data. - test_helpers.ExpectEqual(t, logRaw, nodeElements[1].([]byte)) + require.Equal(t, nodeElements[1].([]byte), logRaw) } else { logRaw, err := rlp.EncodeToBytes(expectedLogs[idx]) require.NoError(t, err) // raw log was IPLDized - test_helpers.ExpectEqual(t, logRaw, r.Data) + require.Equal(t, r.Data, logRaw) } } } @@ -303,7 +307,7 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(rcts), 5) + require.Equal(t, 5, len(rcts)) expectTrue(t, test_helpers.ListContainsString(rcts, rct1CID.String())) expectTrue(t, test_helpers.ListContainsString(rcts, rct2CID.String())) expectTrue(t, test_helpers.ListContainsString(rcts, rct3CID.String())) @@ -329,7 +333,7 @@ func TestSQLXIndexer(t *testing.T) { expectedRct, err := mocks.MockReceipts[idx].MarshalBinary() require.NoError(t, err) - test_helpers.ExpectEqual(t, expectedRct, nodeElements[1].([]byte)) + require.Equal(t, nodeElements[1].([]byte), expectedRct) dc, err := cid.Decode(c) if err != nil { @@ -338,53 +342,53 @@ func TestSQLXIndexer(t *testing.T) { mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() var data []byte - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } postStatePgStr := `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1` switch c { case rct1CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf1) + require.Equal(t, rctLeaf1, data) var postStatus uint64 pgStr = `SELECT post_status FROM eth.receipt_cids WHERE leaf_cid = $1` err = db.Get(context.Background(), &postStatus, pgStr, c) if 
err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postStatus, mocks.ExpectedPostStatus) + require.Equal(t, mocks.ExpectedPostStatus, postStatus) case rct2CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf2) + require.Equal(t, rctLeaf2, data) var postState string err = db.Get(context.Background(), &postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState1) + require.Equal(t, mocks.ExpectedPostState1, postState) case rct3CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf3) + require.Equal(t, rctLeaf3, data) var postState string err = db.Get(context.Background(), &postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState2) + require.Equal(t, mocks.ExpectedPostState2, postState) case rct4CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf4) + require.Equal(t, rctLeaf4, data) var postState string err = db.Get(context.Background(), &postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + require.Equal(t, mocks.ExpectedPostState3, postState) case rct5CID.String(): - test_helpers.ExpectEqual(t, data, rctLeaf5) + require.Equal(t, rctLeaf5, data) var postState string err = db.Get(context.Background(), &postState, postStatePgStr, c) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + require.Equal(t, mocks.ExpectedPostState3, postState) } } }) @@ -402,7 +406,7 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(stateNodes), 2) + require.Equal(t, 2, len(stateNodes)) for _, stateNode := range stateNodes { var data []byte dc, err := cid.Decode(stateNode.CID) @@ -411,7 +415,7 @@ func TestSQLXIndexer(t *testing.T) { } mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - err = 
db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } @@ -422,32 +426,34 @@ func TestSQLXIndexer(t *testing.T) { t.Fatal(err) } if stateNode.CID == state1CID.String() { - test_helpers.ExpectEqual(t, stateNode.NodeType, 2) - test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.ContractLeafKey).Hex()) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) - test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) - test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + require.Equal(t, 2, stateNode.NodeType) + require.Equal(t, common.BytesToHash(mocks.ContractLeafKey).Hex(), stateNode.StateKey) + require.Equal(t, []byte{'\x06'}, stateNode.Path) + require.Equal(t, mocks.ContractLeafNode, data) + require.Equal(t, models.StateAccountModel{ + BlockNumber: mocks.BlockNumber.String(), HeaderID: account.HeaderID, StatePath: stateNode.Path, Balance: "0", CodeHash: mocks.ContractCodeHash.Bytes(), StorageRoot: mocks.ContractRoot, Nonce: 1, - }) + }, account) } if stateNode.CID == state2CID.String() { - test_helpers.ExpectEqual(t, stateNode.NodeType, 2) - test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.AccountLeafKey).Hex()) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) - test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) - test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + require.Equal(t, 2, stateNode.NodeType) + require.Equal(t, common.BytesToHash(mocks.AccountLeafKey).Hex(), stateNode.StateKey) + require.Equal(t, []byte{'\x0c'}, stateNode.Path) + require.Equal(t, mocks.AccountLeafNode, data) + require.Equal(t, models.StateAccountModel{ + BlockNumber: mocks.BlockNumber.String(), HeaderID: account.HeaderID, StatePath: stateNode.Path, Balance: "1000", CodeHash: mocks.AccountCodeHash.Bytes(), StorageRoot: mocks.AccountRoot, Nonce: 0, - }) + }, 
account) } } @@ -460,7 +466,7 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, len(stateNodes), 1) + require.Equal(t, 1, len(stateNodes)) stateNode := stateNodes[0] var data []byte dc, err := cid.Decode(stateNode.CID) @@ -469,14 +475,14 @@ func TestSQLXIndexer(t *testing.T) { } mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + require.Equal(t, shared.RemovedNodeMhKey, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID) - test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'}) - test_helpers.ExpectEqual(t, data, []byte{}) + require.Equal(t, shared.RemovedNodeStateCID, stateNode.CID) + require.Equal(t, []byte{'\x02'}, stateNode.Path) + require.Equal(t, []byte{}, data) }) t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { @@ -485,7 +491,7 @@ func TestSQLXIndexer(t *testing.T) { defer checkTxClosure(t, 0, 0, 0) // check that storage nodes were properly indexed storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) - pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + pgStr := `SELECT cast(storage_cids.block_number AS TEXT), storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) AND state_cids.header_id = header_cids.block_hash @@ -495,14 +501,15 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) 
} - test_helpers.ExpectEqual(t, len(storageNodes), 1) - test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ - CID: storageCID.String(), - NodeType: 2, - StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), - StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), - Path: []byte{}, - }) + require.Equal(t, 1, len(storageNodes)) + require.Equal(t, models.StorageNodeWithStateKeyModel{ + BlockNumber: mocks.BlockNumber.String(), + CID: storageCID.String(), + NodeType: 2, + StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{}, + }, storageNodes[0]) var data []byte dc, err := cid.Decode(storageNodes[0].CID) if err != nil { @@ -510,15 +517,15 @@ func TestSQLXIndexer(t *testing.T) { } mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, data, mocks.StorageLeafNode) + require.Equal(t, mocks.StorageLeafNode, data) // check that Removed storage nodes were properly indexed storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) - pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + pgStr = `SELECT cast(storage_cids.block_number AS TEXT), storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) AND state_cids.header_id = header_cids.block_hash @@ -528,25 +535,26 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, 
len(storageNodes), 1) - test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ - CID: shared.RemovedNodeStorageCID, - NodeType: 3, - StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), - StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), - Path: []byte{'\x03'}, - }) + require.Equal(t, 1, len(storageNodes)) + require.Equal(t, models.StorageNodeWithStateKeyModel{ + BlockNumber: mocks.BlockNumber.String(), + CID: shared.RemovedNodeStorageCID, + NodeType: 3, + StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{'\x03'}, + }, storageNodes[0]) dc, err = cid.Decode(storageNodes[0].CID) if err != nil { t.Fatal(err) } mhKey = dshelp.MultihashToDsKey(dc.Hash()) prefixedKey = blockstore.BlockPrefix.String() + mhKey.String() - test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) - err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + require.Equal(t, shared.RemovedNodeMhKey, prefixedKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, data, []byte{}) + require.Equal(t, []byte{}, data) }) } diff --git a/statediff/indexer/mocks/test_data.go b/statediff/indexer/mocks/test_data.go index e5d72e5ba..dc7e7638f 100644 --- a/statediff/indexer/mocks/test_data.go +++ b/statediff/indexer/mocks/test_data.go @@ -106,10 +106,12 @@ var ( StorageKeys: []common.Hash{common.BytesToHash(StorageLeafKey), common.BytesToHash(MockStorageLeafKey)}, } AccessListEntry1Model = models.AccessListElementModel{ - Index: 0, - Address: Address.Hex(), + BlockNumber: BlockNumber.String(), + Index: 0, + Address: Address.Hex(), } AccessListEntry2Model = models.AccessListElementModel{ + BlockNumber: BlockNumber.String(), Index: 1, Address: AnotherAddress.Hex(), StorageKeys: []string{common.BytesToHash(StorageLeafKey).Hex(), 
common.BytesToHash(MockStorageLeafKey).Hex()}, diff --git a/statediff/indexer/test_helpers/test_helpers.go b/statediff/indexer/test_helpers/test_helpers.go index b519d80b5..6073db434 100644 --- a/statediff/indexer/test_helpers/test_helpers.go +++ b/statediff/indexer/test_helpers/test_helpers.go @@ -16,18 +16,6 @@ package test_helpers -import ( - "reflect" - "testing" -) - -// ExpectEqual asserts the provided interfaces are deep equal -func ExpectEqual(t *testing.T, got interface{}, want interface{}) { - if !reflect.DeepEqual(got, want) { - t.Fatalf("Expected: %v\nActual: %v", want, got) - } -} - // ListContainsString used to check if a list of strings contains a particular string func ListContainsString(sss []string, s string) bool { for _, str := range sss {