diff --git a/docker-compose.yml b/docker-compose.yml index cbd0070e..63cb3120 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,7 +18,7 @@ services: ipld-eth-db: restart: always - image: vulcanize/ipld-eth-db:v0.2.0 + image: vulcanize/ipld-eth-db:latest environment: POSTGRES_USER: "vdbm" POSTGRES_DB: "vulcanize_testing" diff --git a/go.mod b/go.mod index 91bab53e..0ffd62fe 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/ipfs/go-ipld-format v0.2.0 + github.com/jackc/pgx/v4 v4.13.0 github.com/jmoiron/sqlx v1.2.0 github.com/lib/pq v1.10.2 github.com/machinebox/graphql v0.2.2 diff --git a/pkg/eth/api_test.go b/pkg/eth/api_test.go index 0dd703f9..f434d83f 100644 --- a/pkg/eth/api_test.go +++ b/pkg/eth/api_test.go @@ -196,9 +196,14 @@ var _ = Describe("API", func() { tx interfaces.Batch ) - goodInfo := node.Info{GenesisBlock: "GENESIS1", NetworkID: "1", ID: "1", ClientName: "geth5", ChainID: 1} - // DefaultConfig are default parameters for connecting to a Postgres sql - db, err = eth.Setup(ctx, goodInfo) + testInfo := node.Info{ + GenesisBlock: test_helpers.Genesis.Hash().String(), + NetworkID: "1", + ID: "1", + ClientName: "geth", + ChainID: params.TestChainConfig.ChainID.Uint64(), + } + db, err = eth.Setup(ctx, testInfo) Expect(err).ToNot(HaveOccurred()) indexAndPublisher, err := sql.NewStateDiffIndexer(ctx, chainConfig, db) Expect(err).ToNot(HaveOccurred()) @@ -220,11 +225,6 @@ var _ = Describe("API", func() { tx, err = indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) Expect(err).ToNot(HaveOccurred()) - for _, node := range test_helpers.MockStateNodes { - err = indexAndPublisher.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String()) - Expect(err).ToNot(HaveOccurred()) - } - ccHash := sdtypes.CodeAndCodeHash{ Hash: test_helpers.ContractCodeHash, Code: test_helpers.ContractCode, @@ -233,6 +233,11 @@ var _ = Describe("API", func() { err = indexAndPublisher.PushCodeAndCodeHash(tx, ccHash) Expect(err).ToNot(HaveOccurred()) + for _, node := range test_helpers.MockStateNodes { + err = indexAndPublisher.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String()) + Expect(err).ToNot(HaveOccurred()) + } + err = tx.Submit(err) Expect(err).ToNot(HaveOccurred()) @@ -272,7 +277,7 @@ var _ = Describe("API", func() { It("Throws an error if a header cannot be found", func() { header, err := api.GetHeaderByNumber(ctx, wrongNumber) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("no rows in result set")) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) Expect(header).To(BeNil()) }) }) diff --git a/pkg/eth/backend.go b/pkg/eth/backend.go index ec7ef407..0f390aaa 100644 --- a/pkg/eth/backend.go +++ b/pkg/eth/backend.go @@ -18,6 +18,7 @@ package eth import ( "context" + dbsql "database/sql" "errors" "fmt" "math/big" @@ -60,21 +61,20 @@ var ( // errMissingSignature is returned if a block's extra-data section doesn't seem // to contain a 65 byte secp256k1 signature. errMissingSignature = errors.New("extra-data 65 byte signature suffix missing") - errNoRows = errors.New("sql: no rows in result set") ) const ( RetrieveCanonicalBlockHashByNumber = `SELECT block_hash FROM eth.header_cids INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key) - WHERE id = (SELECT canonical_header_id($1))` + WHERE block_hash = (SELECT canonical_header_id($1))` RetrieveCanonicalHeaderByNumber = `SELECT cid, data FROM eth.header_cids INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key) - WHERE id = (SELECT canonical_header_id($1))` + WHERE block_hash = (SELECT canonical_header_id($1))` RetrieveTD = `SELECT td FROM eth.header_cids WHERE header_cids.block_hash = $1` RetrieveRPCTransaction = `SELECT blocks.data, block_hash, block_number, index FROM public.blocks, eth.transaction_cids, eth.header_cids WHERE blocks.key = transaction_cids.mh_key - AND transaction_cids.header_id = header_cids.id + AND transaction_cids.header_id = header_cids.block_hash AND transaction_cids.tx_hash = $1` RetrieveCodeHashByLeafKeyAndBlockHash = `SELECT code_hash FROM eth.state_accounts, eth.state_cids, eth.header_cids WHERE state_accounts.header_id = state_cids.header_id AND state_accounts.state_path = state_cids.state_path @@ -83,7 +83,7 @@ const ( AND block_number <= (SELECT block_number FROM eth.header_cids WHERE block_hash = $2) - AND header_cids.id = (SELECT canonical_header_id(block_number)) + AND header_cids.block_hash = (SELECT canonical_header_id(block_number)) ORDER BY block_number DESC LIMIT 1` RetrieveCodeByMhKey = `SELECT data FROM public.blocks WHERE key = $1` @@ -293,7 +293,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber // Get the canonical hash canonicalHash, err := b.GetCanonicalHash(ctx, uint64(number)) if err != nil { - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, nil } return nil, err @@ -302,7 +302,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber // TODO: optimize this by retrieving iplds directly rather than the cids first (this is remanent from when we fetched iplds through ipfs blockservice interface) headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.Retriever.RetrieveBlockByHash(ctx, canonicalHash) if err != nil { - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, nil } return nil, err @@ -328,7 +328,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber var headerIPLD models.IPLDModel headerIPLD, err = b.Fetcher.FetchHeader(ctx, tx, headerCID) if err != nil { - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, nil } return nil, err @@ -398,7 +398,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo // Retrieve all the CIDs for the block headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.Retriever.RetrieveBlockByHash(ctx, hash) if err != nil { - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, nil } return nil, err @@ -424,7 +424,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo var headerIPLD models.IPLDModel headerIPLD, err = b.Fetcher.FetchHeader(ctx, tx, headerCID) if err != nil { - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, nil } return nil, err @@ -438,7 +438,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo var uncleIPLDs []models.IPLDModel uncleIPLDs, err = b.Fetcher.FetchUncles(ctx, tx, uncleCIDs) if err != nil { - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, nil } return nil, err @@ -456,7 +456,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo var txIPLDs []models.IPLDModel txIPLDs, err = b.Fetcher.FetchTrxs(ctx, tx, txCIDs) if err != nil { - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, nil } return nil, err @@ -474,7 +474,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo var rctIPLDs []models.IPLDModel rctIPLDs, err = b.Fetcher.FetchRcts(ctx, tx, rctCIDs) if err != nil { - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, nil } return nil, err @@ -659,7 +659,7 @@ func (b *Backend) GetAccountByNumber(ctx context.Context, address common.Address return nil, errPendingBlockNumber } hash, err := b.GetCanonicalHash(ctx, uint64(number)) - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, errHeaderNotFound } else if err != nil { return nil, err @@ -671,7 +671,7 @@ func (b *Backend) GetAccountByNumber(ctx context.Context, address common.Address // GetAccountByHash returns the account object for the provided address at the block with the provided hash func (b *Backend) GetAccountByHash(ctx context.Context, address common.Address, hash common.Hash) (*types.StateAccount, error) { _, err := b.HeaderByHash(context.Background(), hash) - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, errHeaderHashNotFound } else if err != nil { return nil, err @@ -745,17 +745,19 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has err = tx.Commit(ctx) } }() - err = tx.QueryRow(ctx, RetrieveCodeHashByLeafKeyAndBlockHash, leafKey.Hex(), hash.Hex()).Scan(&codeHash) + err = tx.Get(ctx, &codeHash, RetrieveCodeHashByLeafKeyAndBlockHash, leafKey.Hex(), hash.Hex()) if err != nil { return nil, err } + var mhKey string mhKey, err = ethServerShared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash)) if err != nil { return nil, err } + code := make([]byte, 0) - err = tx.QueryRow(ctx, RetrieveCodeByMhKey, mhKey).Scan(&code) + err = tx.Get(ctx, &code, RetrieveCodeByMhKey, mhKey) return code, err } @@ -790,7 +792,7 @@ func (b *Backend) GetStorageByNumber(ctx context.Context, address common.Address return nil, errPendingBlockNumber } hash, err := b.GetCanonicalHash(ctx, uint64(number)) - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, errHeaderNotFound } else if err != nil { return nil, err @@ -802,7 +804,7 @@ func (b *Backend) GetStorageByNumber(ctx context.Context, address common.Address // GetStorageByHash returns the storage value for the provided contract address an storage key at the block corresponding to the provided hash func (b *Backend) GetStorageByHash(ctx context.Context, address common.Address, key, hash common.Hash) (hexutil.Bytes, error) { _, err := b.HeaderByHash(context.Background(), hash) - if err == errNoRows { + if err == dbsql.ErrNoRows { return nil, errHeaderHashNotFound } else if err != nil { return nil, err diff --git a/pkg/eth/cid_retriever.go b/pkg/eth/cid_retriever.go index f724079a..4072567e 100644 --- a/pkg/eth/cid_retriever.go +++ b/pkg/eth/cid_retriever.go @@ -171,7 +171,8 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(ctx context.Context, tx sql.Tx, bloc headers := make([]models.HeaderModel, 0) pgStr := `SELECT * FROM eth.header_cids WHERE block_number = $1` - return headers, tx.QueryRow(ctx, pgStr, blockNumber).Scan(&headers) + + return headers, tx.Select(ctx, &headers, pgStr, blockNumber) } // RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header @@ -181,7 +182,7 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(ctx context.Context, tx sql pgStr := `SELECT * FROM eth.uncle_cids WHERE header_id = $1` - return headers, tx.QueryRow(ctx, pgStr, headerID).Scan(headers) + return headers, tx.Select(ctx, &headers, pgStr, headerID) } // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters @@ -191,11 +192,11 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(ctx context.Context, tx sql.Tx, txFilter args := make([]interface{}, 0, 3) results := make([]models.TxModel, 0) id := 1 - pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id, + pgStr := fmt.Sprintf(`SELECT transaction_cids.tx_hash, transaction_cids.header_id, transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key, transaction_cids.dst, transaction_cids.src, transaction_cids.index, transaction_cids.tx_data - FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) - WHERE header_cids.id = $%d`, id) + FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) + WHERE header_cids.block_hash = $%d`, id) args = append(args, headerID) id++ if len(txFilter.Dst) > 0 { @@ -208,7 +209,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(ctx context.Context, tx sql.Tx, txFilter args = append(args, pq.Array(txFilter.Src)) } pgStr += ` ORDER BY transaction_cids.index` - return results, tx.QueryRow(ctx, pgStr, args...).Scan(results) + return results, tx.Select(ctx, &results, pgStr, args...) } func topicFilterCondition(id *int, topics [][]string, args []interface{}, pgStr string, first bool) (string, []interface{}) { @@ -245,8 +246,8 @@ func logFilterCondition(id *int, pgStr string, args []interface{}, rctFilter Rec } func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilter ReceiptFilter, txHashes []string) (string, []interface{}) { - rctCond := " AND (receipt_cids.id = ANY ( " - logQuery := "SELECT receipt_id FROM eth.log_cids WHERE" + rctCond := " AND (receipt_cids.tx_id = ANY ( " + logQuery := "SELECT rct_id FROM eth.log_cids WHERE" if len(rctFilter.LogAddresses) > 0 { // Filter on log contract addresses if there are any pgStr += fmt.Sprintf(`%s %s eth.log_cids.address = ANY ($%d)`, rctCond, logQuery, *id) @@ -262,7 +263,7 @@ func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilte // Filter on txHashes if there are any, and we are matching txs if rctFilter.MatchTxs && len(txHashes) > 0 { - pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::STRING[])`, *id) + pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d)`, *id) args = append(args, pq.Array(txHashes)) } pgStr += ")" @@ -274,14 +275,14 @@ func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilte pgStr += ")" // Filter on txHashes if there are any, and we are matching txs if rctFilter.MatchTxs && len(txHashes) > 0 { - pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::STRING[])`, *id) + pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d)`, *id) args = append(args, pq.Array(txHashes)) } pgStr += ")" } else if rctFilter.MatchTxs && len(txHashes) > 0 { // If there are no contract addresses or topics to filter on, // Filter on txHashes if there are any, and we are matching txs - pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::STRING[])`, *id) + pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d)`, *id) args = append(args, pq.Array(txHashes)) } } @@ -294,12 +295,12 @@ func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilte func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(ctx context.Context, tx sql.Tx, rctFilter ReceiptFilter, headerID string, trxHashes []string) ([]models.ReceiptModel, error) { log.Debug("retrieving receipt cids for header id ", headerID) args := make([]interface{}, 0, 4) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, + pgStr := `SELECT receipt_cids.tx_id, receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, receipt_cids.contract, receipt_cids.contract_hash FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id - AND header_cids.id = $1` + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash + AND header_cids.block_hash = $1` id := 2 args = append(args, headerID) @@ -307,7 +308,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(ctx context.Context, tx sql.T pgStr += ` ORDER BY transaction_cids.index` receiptCids := make([]models.ReceiptModel, 0) - return receiptCids, tx.QueryRow(ctx, pgStr, args...).Scan(receiptCids) + return receiptCids, tx.Select(ctx, &receiptCids, pgStr, args...) } // RetrieveFilteredGQLLogs retrieves and returns all the log cIDs provided blockHash that conform to the provided @@ -320,9 +321,9 @@ func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(ctx context.Context, tx sql.Tx, eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3, eth.log_cids.log_data, eth.transaction_cids.tx_hash, data, eth.receipt_cids.leaf_cid as cid, eth.receipt_cids.post_status FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids, public.blocks - WHERE eth.log_cids.receipt_id = receipt_cids.id - AND receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id + WHERE eth.log_cids.rct_id = receipt_cids.tx_id + AND receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash AND log_cids.leaf_mh_key = blocks.key AND header_cids.block_hash = $1` args = append(args, blockHash.String()) @@ -332,7 +333,7 @@ func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(ctx context.Context, tx sql.Tx, pgStr += ` ORDER BY log_cids.index` logCIDs := make([]LogResult, 0) - return logCIDs, tx.QueryRow(ctx, pgStr, args...).Scan(&logCIDs) + return logCIDs, tx.Select(ctx, &logCIDs, pgStr, args...) } // RetrieveFilteredLog retrieves and returns all the log cIDs provided blockHeight or blockHash that conform to the provided @@ -345,9 +346,9 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(ctx context.Context, tx sql.Tx, rct eth.log_cids.log_data, eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index, header_cids.block_hash, header_cids.block_number FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE eth.log_cids.receipt_id = receipt_cids.id - AND receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id` + WHERE eth.log_cids.rct_id = receipt_cids.tx_id + AND receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash` id := 1 if blockNumber > 0 { pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id) @@ -364,7 +365,7 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(ctx context.Context, tx sql.Tx, rct pgStr += ` ORDER BY log_cids.index` logCIDs := make([]LogResult, 0) - return logCIDs, tx.QueryRow(ctx, pgStr, args...).Scan(&logCIDs) + return logCIDs, tx.Select(ctx, &logCIDs, pgStr, args...) } // RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided @@ -372,10 +373,10 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(ctx context.Context, tx sql.Tx, rct func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, txHashes []string) ([]models.ReceiptModel, error) { log.Debug("retrieving receipt cids for block ", blockNumber) args := make([]interface{}, 0, 5) - pgStr := `SELECT receipt_cids.id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, receipt_cids.tx_id + pgStr := `SELECT receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, receipt_cids.tx_id FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id` + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash` id := 1 if blockNumber > 0 { pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id) @@ -408,10 +409,10 @@ func hasTopics(topics [][]string) bool { func (ecr *CIDRetriever) RetrieveStateCIDs(ctx context.Context, tx sql.Tx, stateFilter StateFilter, headerID string) ([]models.StateNodeModel, error) { log.Debug("retrieving state cids for header id ", headerID) args := make([]interface{}, 0, 2) - pgStr := `SELECT state_cids.id, state_cids.header_id, + pgStr := `SELECT state_cids.header_id, state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.mh_key, state_cids.state_path - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) - WHERE header_cids.id = $1` + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) + WHERE header_cids.block_hash = $1` args = append(args, headerID) addrLen := len(stateFilter.Addresses) if addrLen > 0 { @@ -426,15 +427,15 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(ctx context.Context, tx sql.Tx, state pgStr += ` AND state_cids.node_type = 2` } stateNodeCIDs := make([]models.StateNodeModel, 0) - return stateNodeCIDs, tx.QueryRow(ctx, pgStr, args...).Scan(&stateNodeCIDs) + return stateNodeCIDs, tx.Select(ctx, &stateNodeCIDs, pgStr, args...) } // RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters func (ecr *CIDRetriever) RetrieveStorageCIDs(ctx context.Context, tx sql.Tx, storageFilter StorageFilter, headerID string) ([]models.StorageNodeWithStateKeyModel, error) { log.Debug("retrieving storage cids for header id ", headerID) args := make([]interface{}, 0, 3) - pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type, - storage_cids.cid, storage_cids.mh_key, storage_cids.storage_path, state_cids.state_leaf_key + pgStr := `SELECT storage_cids.header_id, storage_cids.storage_leaf_key, storage_cids.node_type, + storage_cids.cid, storage_cids.mh_key, storage_cids.storage_path, storage_cids.state_path, state_cids.state_leaf_key FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE storage_cids.header_id = state_cids.header_id AND storage_cids.state_path = state_cids.state_path AND state_cids.header_id = header_cids.block_hash @@ -459,7 +460,7 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(ctx context.Context, tx sql.Tx, sto pgStr += ` AND storage_cids.node_type = 2` } storageNodeCIDs := make([]models.StorageNodeWithStateKeyModel, 0) - return storageNodeCIDs, tx.QueryRow(ctx, pgStr, args...).Scan(&storageNodeCIDs) + return storageNodeCIDs, tx.Select(ctx, &storageNodeCIDs, pgStr, args...) } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash @@ -571,9 +572,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(ctx context.Context, tx sql.Tx, pgStr := `SELECT * FROM eth.header_cids WHERE block_hash = $1` var headerCID models.HeaderModel - return headerCID, tx.QueryRow(ctx, pgStr, blockHash.String()).Scan(&headerCID.CID, &headerCID.NodeID, &headerCID.BlockHash, &headerCID.BlockNumber, - &headerCID.BaseFee, &headerCID.Bloom, &headerCID.MhKey, &headerCID.ParentHash, &headerCID.RctRoot, &headerCID.Reward, &headerCID.StateRoot, - &headerCID.Timestamp, &headerCID.TimesValidated, &headerCID.TotalDifficulty, &headerCID.TxRoot, &headerCID.UncleRoot) + return headerCID, tx.Get(ctx, &headerCID, pgStr, blockHash.String()) } // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id @@ -583,18 +582,18 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(ctx context.Context, tx sql.Tx WHERE header_id = $1 ORDER BY index` var txCIDs []models.TxModel - return txCIDs, tx.QueryRow(ctx, pgStr, headerID).(*sqlx.Row).Scan(&txCIDs) + return txCIDs, tx.Select(ctx, &txCIDs, pgStr, headerID) } // RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(ctx context.Context, tx sql.Tx, txHashes []string) ([]models.ReceiptModel, error) { - log.Debugf("retrieving receipt cids for tx ids %v", txHashes) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, + log.Debugf("retrieving receipt cids for tx hashes %v", txHashes) + pgStr := `SELECT receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, receipt_cids.contract, receipt_cids.contract_hash FROM eth.receipt_cids, eth.transaction_cids - WHERE tx_id = ANY($1::INTEGER[]) - AND receipt_cids.tx_id = transaction_cids.id + WHERE tx_id = ANY($1) + AND receipt_cids.tx_id = transaction_cids.tx_hash ORDER BY transaction_cids.index` var rctCIDs []models.ReceiptModel - return rctCIDs, tx.QueryRow(ctx, pgStr, pq.Array(txHashes)).Scan(rctCIDs) + return rctCIDs, tx.Select(ctx, &rctCIDs, pgStr, pq.Array(txHashes)) } diff --git a/pkg/eth/cid_retriever_test.go b/pkg/eth/cid_retriever_test.go index 06fffda6..fea6def4 100644 --- a/pkg/eth/cid_retriever_test.go +++ b/pkg/eth/cid_retriever_test.go @@ -218,8 +218,14 @@ var _ = Describe("Retriever", func() { ) BeforeEach(func() { var err error - goodInfo := node.Info{GenesisBlock: "GENESIS2", NetworkID: "2", ID: "2", ClientName: "geth2", ChainID: 2} - db, err = eth.Setup(ctx, goodInfo) + testInfo := node.Info{ + GenesisBlock: test_helpers.Genesis.Hash().String(), + NetworkID: "2", + ID: "2", + ClientName: "geth", + ChainID: params.TestChainConfig.ChainID.Uint64(), + } + db, err = eth.Setup(ctx, testInfo) Expect(err).ToNot(HaveOccurred()) diffIndexer, err = sql.NewStateDiffIndexer(ctx, params.TestChainConfig, db) @@ -301,8 +307,8 @@ var _ = Describe("Retriever", func() { } expectedRctCIDsAndLeafNodes := make([]rctCIDAndMHKeyResult, 0) pgStr := `SELECT receipt_cids.leaf_cid, receipt_cids.leaf_mh_key FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 ORDER BY transaction_cids.index` err := db.Select(ctx, &expectedRctCIDsAndLeafNodes, pgStr, test_helpers.BlockNumber.Uint64()) diff --git a/pkg/eth/eth_state_test.go b/pkg/eth/eth_state_test.go index 3f9f0014..a582e054 100644 --- a/pkg/eth/eth_state_test.go +++ b/pkg/eth/eth_state_test.go @@ -21,6 +21,7 @@ import ( "context" "io/ioutil" "math/big" + "time" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -74,8 +75,15 @@ var _ = Describe("eth state reading tests", func() { It("test init", func() { // db and type initializations var err error - goodInfo := node.Info{GenesisBlock: "GENESIS3", NetworkID: "3", ID: "3", ClientName: "geth3", ChainID: 3} - db, err = eth.Setup(ctx, goodInfo) + testInfo := node.Info{ + GenesisBlock: test_helpers.Genesis.Hash().String(), + NetworkID: "3", + ID: "3", + ClientName: "geth", + ChainID: params.TestChainConfig.ChainID.Uint64(), + } + + db, err = eth.Setup(ctx, testInfo) Expect(err).ToNot(HaveOccurred()) transformer, err := sql.NewStateDiffIndexer(ctx, chainConfig, db) @@ -180,6 +188,8 @@ var _ = Describe("eth state reading tests", func() { err = indexAndPublisher.PushCodeAndCodeHash(tx, hash) Expect(err).ToNot(HaveOccurred()) + // wait for tx batch process to complete. + time.Sleep(600 * time.Millisecond) err = tx.Submit(err) Expect(err).ToNot(HaveOccurred()) }) diff --git a/pkg/eth/ipld_fetcher_test.go b/pkg/eth/ipld_fetcher_test.go index fc211af4..c76615f5 100644 --- a/pkg/eth/ipld_fetcher_test.go +++ b/pkg/eth/ipld_fetcher_test.go @@ -40,8 +40,15 @@ var _ = Describe("IPLDFetcher", func() { err error tx interfaces.Batch ) - goodInfo := node.Info{GenesisBlock: "GENESIS4", NetworkID: "4", ID: "4", ClientName: "geth4", ChainID: 4} - db, err = eth.Setup(ctx, goodInfo) + testInfo := node.Info{ + GenesisBlock: test_helpers.Genesis.Hash().String(), + NetworkID: "4", + ID: "4", + ClientName: "geth", + ChainID: params.TestChainConfig.ChainID.Uint64(), + } + + db, err = eth.Setup(ctx, testInfo) Expect(err).ToNot(HaveOccurred()) pubAndIndexer, err = sql.NewStateDiffIndexer(ctx, params.TestChainConfig, db) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/eth/ipld_retriever.go b/pkg/eth/ipld_retriever.go index 0e07ff5b..30c337df 100644 --- a/pkg/eth/ipld_retriever.go +++ b/pkg/eth/ipld_retriever.go @@ -53,12 +53,12 @@ const ( WHERE block_hash = ANY($1::VARCHAR(66)[])` RetrieveUnclesByBlockHashPgStr = `SELECT uncle_cids.cid, data FROM eth.uncle_cids - INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.id) + INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key) WHERE block_hash = $1` RetrieveUnclesByBlockNumberPgStr = `SELECT uncle_cids.cid, data FROM eth.uncle_cids - INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.id) + INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key) WHERE block_number = $1` RetrieveUncleByHashPgStr = `SELECT cid, data @@ -71,13 +71,13 @@ const ( WHERE tx_hash = ANY($1::VARCHAR(66)[])` RetrieveTransactionsByBlockHashPgStr = `SELECT transaction_cids.cid, data FROM eth.transaction_cids - INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key) WHERE block_hash = $1 ORDER BY eth.transaction_cids.index ASC` RetrieveTransactionsByBlockNumberPgStr = `SELECT transaction_cids.cid, data FROM eth.transaction_cids - INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key) WHERE block_number = $1 ORDER BY eth.transaction_cids.index ASC` @@ -87,42 +87,42 @@ const ( WHERE tx_hash = $1` RetrieveReceiptsByTxHashesPgStr = `SELECT receipt_cids.leaf_cid, data FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.id) + INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash) INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key) WHERE tx_hash = ANY($1::VARCHAR(66)[])` RetrieveReceiptsByBlockHashPgStr = `SELECT receipt_cids.leaf_cid, data, eth.transaction_cids.tx_hash FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.id) - INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash) + INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key) WHERE block_hash = $1 ORDER BY eth.transaction_cids.index ASC` RetrieveReceiptsByBlockNumberPgStr = `SELECT receipt_cids.leaf_cid, data FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.id) - INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash) + INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key) WHERE block_number = $1 ORDER BY eth.transaction_cids.index ASC` RetrieveReceiptByTxHashPgStr = `SELECT receipt_cids.leaf_cid, data FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.id) + INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash) INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key) WHERE tx_hash = $1` RetrieveAccountByLeafKeyAndBlockHashPgStr = `SELECT state_cids.cid, data, state_cids.node_type FROM eth.state_cids - INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key) WHERE state_leaf_key = $1 AND block_number <= (SELECT block_number FROM eth.header_cids WHERE block_hash = $2) - AND header_cids.id = (SELECT canonical_header_id(block_number)) + AND header_cids.block_hash = (SELECT canonical_header_id(block_number)) ORDER BY block_number DESC LIMIT 1` RetrieveAccountByLeafKeyAndBlockNumberPgStr = `SELECT state_cids.cid, data, state_cids.node_type FROM eth.state_cids - INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key) WHERE state_leaf_key = $1 AND block_number <= $2 @@ -130,7 +130,7 @@ const ( LIMIT 1` RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockNumberPgStr = `SELECT storage_cids.cid, data, storage_cids.node_type, was_state_leaf_removed($1, $3) AS state_leaf_removed FROM eth.storage_cids - INNER JOIN eth.state_cids ON (storage_cids.header_id = state_cids.header_id AND storage_cids.state_path = state_cids.state_path) + INNER JOIN eth.state_cids ON (storage_cids.header_id = state_cids.header_id) INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key) WHERE state_leaf_key = $1 @@ -140,7 +140,7 @@ const ( LIMIT 1` RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT storage_cids.cid, data, storage_cids.node_type, was_state_leaf_removed($1, $3) AS state_leaf_removed FROM eth.storage_cids - INNER JOIN eth.state_cids ON (storage_cids.header_id = state_cids.header_id AND storage_cids.state_path = state_cids.state_path) + INNER JOIN eth.state_cids ON (storage_cids.header_id = state_cids.header_id) INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key) WHERE state_leaf_key = $1 @@ -148,7 +148,7 @@ const ( AND block_number <= (SELECT block_number FROM eth.header_cids WHERE block_hash = $3) - AND header_cids.id = (SELECT canonical_header_id(block_number)) + AND header_cids.block_hash = (SELECT canonical_header_id(block_number)) ORDER BY block_number DESC LIMIT 1` ) diff --git a/pkg/eth/test_helpers.go b/pkg/eth/test_helpers.go index 162f1716..a83fd9b7 100644 --- a/pkg/eth/test_helpers.go +++ b/pkg/eth/test_helpers.go @@ -28,7 +28,7 @@ import ( ) func Setup(ctx context.Context, info node.Info) (sql.Database, error) { - driver, err := postgres.NewPGXDriver(ctx, getConfig(), info) + driver, err := postgres.NewSQLXDriver(ctx, getConfig(), info) Expect(err).NotTo(HaveOccurred()) return postgres.NewPostgresDB(driver), nil } diff --git a/pkg/eth/types.go b/pkg/eth/types.go index 9b6b6ca7..5118eda2 100644 --- a/pkg/eth/types.go +++ b/pkg/eth/types.go @@ -248,7 +248,7 @@ type ConvertedPayload struct { // LogResult represent a log. type LogResult struct { LeafCID string `db:"leaf_cid"` - ReceiptID int64 `db:"rct_id"` + ReceiptID string `db:"rct_id"` Address string `db:"address"` Index int64 `db:"index"` Data []byte `db:"log_data"` diff --git a/pkg/graphql/graphql_test.go b/pkg/graphql/graphql_test.go index c370ff46..33aa0203 100644 --- a/pkg/graphql/graphql_test.go +++ b/pkg/graphql/graphql_test.go @@ -66,8 +66,14 @@ var _ = Describe("GraphQL", func() { It("test init", func() { var err error - goodInfo := node.Info{GenesisBlock: "GENESIS5", NetworkID: "5", ID: "5", ClientName: "geth5", ChainID: 5} - db, err = eth.Setup(ctx, goodInfo) + testInfo := node.Info{ + GenesisBlock: test_helpers.Genesis.Hash().String(), + NetworkID: "5", + ID: "5", + ClientName: "geth", + ChainID: params.TestChainConfig.ChainID.Uint64(), + } + db, err = eth.Setup(ctx, testInfo) Expect(err).ToNot(HaveOccurred()) transformer, err := sql.NewStateDiffIndexer(ctx, chainConfig, db) diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go index 5845eb6a..8d820b6c 100644 --- a/pkg/prom/prom.go +++ b/pkg/prom/prom.go @@ -17,7 +17,7 @@ package prom import ( - "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -72,7 +72,7 @@ func Init() { } // RegisterDBCollector create metric colletor for given connection -func RegisterDBCollector(name string, db *postgres.PGXDriver) { +func RegisterDBCollector(name string, db sql.Database) { if metrics { prometheus.Register(NewDBStatsCollector(name, db)) } diff --git a/pkg/serve/config.go b/pkg/serve/config.go index dad0c215..cee7d8b4 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -191,13 +191,13 @@ func NewConfig() (*Config, error) { c.IpldGraphqlEnabled = ipldGraphqlEnabled overrideDBConnConfig(&c.DBConfig) - driver, err := postgres.NewPGXDriver(context.Background(), c.DBConfig, nodeInfo) + driver, err := postgres.NewSQLXDriver(context.Background(), c.DBConfig, nodeInfo) if err != nil { return nil, err } - prom.RegisterDBCollector(c.DBConfig.DatabaseName, driver) c.DB = postgres.NewPostgresDB(driver) + prom.RegisterDBCollector(c.DBConfig.DatabaseName, c.DB) defaultSenderStr := viper.GetString("ethereum.defaultSender") if defaultSenderStr != "" {