WIP: Refactor and update to use new schema. #118

Closed
arijitAD wants to merge 4 commits from postgres_refactor into master
14 changed files with 140 additions and 104 deletions
Showing only changes of commit 15184202f5 - Show all commits

View File

@ -18,7 +18,7 @@ services:
ipld-eth-db:
restart: always
image: vulcanize/ipld-eth-db:v0.2.0
image: vulcanize/ipld-eth-db:latest
environment:
POSTGRES_USER: "vdbm"
POSTGRES_DB: "vulcanize_testing"

1
go.mod
View File

@ -11,6 +11,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipld-format v0.2.0
github.com/jackc/pgx/v4 v4.13.0
github.com/jmoiron/sqlx v1.2.0
github.com/lib/pq v1.10.2
github.com/machinebox/graphql v0.2.2

View File

@ -196,9 +196,14 @@ var _ = Describe("API", func() {
tx interfaces.Batch
)
goodInfo := node.Info{GenesisBlock: "GENESIS1", NetworkID: "1", ID: "1", ClientName: "geth5", ChainID: 1}
// DefaultConfig are default parameters for connecting to a Postgres sql
db, err = eth.Setup(ctx, goodInfo)
testInfo := node.Info{
GenesisBlock: test_helpers.Genesis.Hash().String(),
NetworkID: "1",
ID: "1",
ClientName: "geth",
ChainID: params.TestChainConfig.ChainID.Uint64(),
}
db, err = eth.Setup(ctx, testInfo)
Expect(err).ToNot(HaveOccurred())
indexAndPublisher, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
Expect(err).ToNot(HaveOccurred())
@ -220,11 +225,6 @@ var _ = Describe("API", func() {
tx, err = indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
Expect(err).ToNot(HaveOccurred())
for _, node := range test_helpers.MockStateNodes {
err = indexAndPublisher.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String())
Expect(err).ToNot(HaveOccurred())
}
ccHash := sdtypes.CodeAndCodeHash{
Hash: test_helpers.ContractCodeHash,
Code: test_helpers.ContractCode,
@ -233,6 +233,11 @@ var _ = Describe("API", func() {
err = indexAndPublisher.PushCodeAndCodeHash(tx, ccHash)
Expect(err).ToNot(HaveOccurred())
for _, node := range test_helpers.MockStateNodes {
err = indexAndPublisher.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String())
Expect(err).ToNot(HaveOccurred())
}
err = tx.Submit(err)
Expect(err).ToNot(HaveOccurred())
@ -272,7 +277,7 @@ var _ = Describe("API", func() {
It("Throws an error if a header cannot be found", func() {
header, err := api.GetHeaderByNumber(ctx, wrongNumber)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("no rows in result set"))
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
Expect(header).To(BeNil())
})
})

View File

@ -18,6 +18,7 @@ package eth
import (
"context"
dbsql "database/sql"
"errors"
"fmt"
"math/big"
@ -60,21 +61,20 @@ var (
// errMissingSignature is returned if a block's extra-data section doesn't seem
// to contain a 65 byte secp256k1 signature.
errMissingSignature = errors.New("extra-data 65 byte signature suffix missing")
errNoRows = errors.New("sql: no rows in result set")
)
const (
RetrieveCanonicalBlockHashByNumber = `SELECT block_hash FROM eth.header_cids
INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key)
WHERE id = (SELECT canonical_header_id($1))`
WHERE block_hash = (SELECT canonical_header_id($1))`
RetrieveCanonicalHeaderByNumber = `SELECT cid, data FROM eth.header_cids
INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key)
WHERE id = (SELECT canonical_header_id($1))`
WHERE block_hash = (SELECT canonical_header_id($1))`
RetrieveTD = `SELECT td FROM eth.header_cids
WHERE header_cids.block_hash = $1`
RetrieveRPCTransaction = `SELECT blocks.data, block_hash, block_number, index FROM public.blocks, eth.transaction_cids, eth.header_cids
WHERE blocks.key = transaction_cids.mh_key
AND transaction_cids.header_id = header_cids.id
AND transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.tx_hash = $1`
RetrieveCodeHashByLeafKeyAndBlockHash = `SELECT code_hash FROM eth.state_accounts, eth.state_cids, eth.header_cids
WHERE state_accounts.header_id = state_cids.header_id AND state_accounts.state_path = state_cids.state_path
@ -83,7 +83,7 @@ const (
AND block_number <= (SELECT block_number
FROM eth.header_cids
WHERE block_hash = $2)
AND header_cids.id = (SELECT canonical_header_id(block_number))
AND header_cids.block_hash = (SELECT canonical_header_id(block_number))
ORDER BY block_number DESC
LIMIT 1`
RetrieveCodeByMhKey = `SELECT data FROM public.blocks WHERE key = $1`
@ -293,7 +293,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
// Get the canonical hash
canonicalHash, err := b.GetCanonicalHash(ctx, uint64(number))
if err != nil {
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, nil
}
return nil, err
@ -302,7 +302,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
// TODO: optimize this by retrieving iplds directly rather than the cids first (this is remanent from when we fetched iplds through ipfs blockservice interface)
headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.Retriever.RetrieveBlockByHash(ctx, canonicalHash)
if err != nil {
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, nil
}
return nil, err
@ -328,7 +328,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
var headerIPLD models.IPLDModel
headerIPLD, err = b.Fetcher.FetchHeader(ctx, tx, headerCID)
if err != nil {
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, nil
}
return nil, err
@ -398,7 +398,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
// Retrieve all the CIDs for the block
headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.Retriever.RetrieveBlockByHash(ctx, hash)
if err != nil {
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, nil
}
return nil, err
@ -424,7 +424,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
var headerIPLD models.IPLDModel
headerIPLD, err = b.Fetcher.FetchHeader(ctx, tx, headerCID)
if err != nil {
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, nil
}
return nil, err
@ -438,7 +438,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
var uncleIPLDs []models.IPLDModel
uncleIPLDs, err = b.Fetcher.FetchUncles(ctx, tx, uncleCIDs)
if err != nil {
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, nil
}
return nil, err
@ -456,7 +456,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
var txIPLDs []models.IPLDModel
txIPLDs, err = b.Fetcher.FetchTrxs(ctx, tx, txCIDs)
if err != nil {
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, nil
}
return nil, err
@ -474,7 +474,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
var rctIPLDs []models.IPLDModel
rctIPLDs, err = b.Fetcher.FetchRcts(ctx, tx, rctCIDs)
if err != nil {
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, nil
}
return nil, err
@ -659,7 +659,7 @@ func (b *Backend) GetAccountByNumber(ctx context.Context, address common.Address
return nil, errPendingBlockNumber
}
hash, err := b.GetCanonicalHash(ctx, uint64(number))
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, errHeaderNotFound
} else if err != nil {
return nil, err
@ -671,7 +671,7 @@ func (b *Backend) GetAccountByNumber(ctx context.Context, address common.Address
// GetAccountByHash returns the account object for the provided address at the block with the provided hash
func (b *Backend) GetAccountByHash(ctx context.Context, address common.Address, hash common.Hash) (*types.StateAccount, error) {
_, err := b.HeaderByHash(context.Background(), hash)
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, errHeaderHashNotFound
} else if err != nil {
return nil, err
@ -745,17 +745,19 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has
err = tx.Commit(ctx)
}
}()
err = tx.QueryRow(ctx, RetrieveCodeHashByLeafKeyAndBlockHash, leafKey.Hex(), hash.Hex()).Scan(&codeHash)
err = tx.Get(ctx, &codeHash, RetrieveCodeHashByLeafKeyAndBlockHash, leafKey.Hex(), hash.Hex())
if err != nil {
return nil, err
}
var mhKey string
mhKey, err = ethServerShared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash))
if err != nil {
return nil, err
}
code := make([]byte, 0)
err = tx.QueryRow(ctx, RetrieveCodeByMhKey, mhKey).Scan(&code)
err = tx.Get(ctx, &code, RetrieveCodeByMhKey, mhKey)
return code, err
}
@ -790,7 +792,7 @@ func (b *Backend) GetStorageByNumber(ctx context.Context, address common.Address
return nil, errPendingBlockNumber
}
hash, err := b.GetCanonicalHash(ctx, uint64(number))
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, errHeaderNotFound
} else if err != nil {
return nil, err
@ -802,7 +804,7 @@ func (b *Backend) GetStorageByNumber(ctx context.Context, address common.Address
// GetStorageByHash returns the storage value for the provided contract address an storage key at the block corresponding to the provided hash
func (b *Backend) GetStorageByHash(ctx context.Context, address common.Address, key, hash common.Hash) (hexutil.Bytes, error) {
_, err := b.HeaderByHash(context.Background(), hash)
if err == errNoRows {
if err == dbsql.ErrNoRows {
return nil, errHeaderHashNotFound
} else if err != nil {
return nil, err

View File

@ -171,7 +171,8 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(ctx context.Context, tx sql.Tx, bloc
headers := make([]models.HeaderModel, 0)
pgStr := `SELECT * FROM eth.header_cids
WHERE block_number = $1`
return headers, tx.QueryRow(ctx, pgStr, blockNumber).Scan(&headers)
return headers, tx.Select(ctx, &headers, pgStr, blockNumber)
}
// RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header
@ -181,7 +182,7 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(ctx context.Context, tx sql
pgStr := `SELECT * FROM eth.uncle_cids
WHERE header_id = $1`
return headers, tx.QueryRow(ctx, pgStr, headerID).Scan(headers)
return headers, tx.Select(ctx, &headers, pgStr, headerID)
}
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
@ -191,11 +192,11 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(ctx context.Context, tx sql.Tx, txFilter
args := make([]interface{}, 0, 3)
results := make([]models.TxModel, 0)
id := 1
pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id,
pgStr := fmt.Sprintf(`SELECT transaction_cids.tx_hash, transaction_cids.header_id,
transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key,
transaction_cids.dst, transaction_cids.src, transaction_cids.index, transaction_cids.tx_data
FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
WHERE header_cids.id = $%d`, id)
FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_hash = $%d`, id)
args = append(args, headerID)
id++
if len(txFilter.Dst) > 0 {
@ -208,7 +209,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(ctx context.Context, tx sql.Tx, txFilter
args = append(args, pq.Array(txFilter.Src))
}
pgStr += ` ORDER BY transaction_cids.index`
return results, tx.QueryRow(ctx, pgStr, args...).Scan(results)
return results, tx.Select(ctx, &results, pgStr, args...)
}
func topicFilterCondition(id *int, topics [][]string, args []interface{}, pgStr string, first bool) (string, []interface{}) {
@ -245,8 +246,8 @@ func logFilterCondition(id *int, pgStr string, args []interface{}, rctFilter Rec
}
func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilter ReceiptFilter, txHashes []string) (string, []interface{}) {
rctCond := " AND (receipt_cids.id = ANY ( "
logQuery := "SELECT receipt_id FROM eth.log_cids WHERE"
rctCond := " AND (receipt_cids.tx_id = ANY ( "
logQuery := "SELECT rct_id FROM eth.log_cids WHERE"
if len(rctFilter.LogAddresses) > 0 {
// Filter on log contract addresses if there are any
pgStr += fmt.Sprintf(`%s %s eth.log_cids.address = ANY ($%d)`, rctCond, logQuery, *id)
@ -262,7 +263,7 @@ func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilte
// Filter on txHashes if there are any, and we are matching txs
if rctFilter.MatchTxs && len(txHashes) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::STRING[])`, *id)
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d)`, *id)
args = append(args, pq.Array(txHashes))
}
pgStr += ")"
@ -274,14 +275,14 @@ func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilte
pgStr += ")"
// Filter on txHashes if there are any, and we are matching txs
if rctFilter.MatchTxs && len(txHashes) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::STRING[])`, *id)
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d)`, *id)
args = append(args, pq.Array(txHashes))
}
pgStr += ")"
} else if rctFilter.MatchTxs && len(txHashes) > 0 {
// If there are no contract addresses or topics to filter on,
// Filter on txHashes if there are any, and we are matching txs
pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::STRING[])`, *id)
pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d)`, *id)
args = append(args, pq.Array(txHashes))
}
}
@ -294,12 +295,12 @@ func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilte
func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(ctx context.Context, tx sql.Tx, rctFilter ReceiptFilter, headerID string, trxHashes []string) ([]models.ReceiptModel, error) {
log.Debug("retrieving receipt cids for header id ", headerID)
args := make([]interface{}, 0, 4)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key,
pgStr := `SELECT receipt_cids.tx_id, receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key,
receipt_cids.contract, receipt_cids.contract_hash
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
AND header_cids.id = $1`
WHERE receipt_cids.tx_id = transaction_cids.tx_hash
AND transaction_cids.header_id = header_cids.block_hash
AND header_cids.block_hash = $1`
id := 2
args = append(args, headerID)
@ -307,7 +308,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(ctx context.Context, tx sql.T
pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]models.ReceiptModel, 0)
return receiptCids, tx.QueryRow(ctx, pgStr, args...).Scan(receiptCids)
return receiptCids, tx.Select(ctx, &receiptCids, pgStr, args...)
}
// RetrieveFilteredGQLLogs retrieves and returns all the log cIDs provided blockHash that conform to the provided
@ -320,9 +321,9 @@ func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(ctx context.Context, tx sql.Tx,
eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3,
eth.log_cids.log_data, eth.transaction_cids.tx_hash, data, eth.receipt_cids.leaf_cid as cid, eth.receipt_cids.post_status
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids, public.blocks
WHERE eth.log_cids.receipt_id = receipt_cids.id
AND receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
WHERE eth.log_cids.rct_id = receipt_cids.tx_id
AND receipt_cids.tx_id = transaction_cids.tx_hash
AND transaction_cids.header_id = header_cids.block_hash
AND log_cids.leaf_mh_key = blocks.key AND header_cids.block_hash = $1`
args = append(args, blockHash.String())
@ -332,7 +333,7 @@ func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(ctx context.Context, tx sql.Tx,
pgStr += ` ORDER BY log_cids.index`
logCIDs := make([]LogResult, 0)
return logCIDs, tx.QueryRow(ctx, pgStr, args...).Scan(&logCIDs)
return logCIDs, tx.Select(ctx, &logCIDs, pgStr, args...)
}
// RetrieveFilteredLog retrieves and returns all the log cIDs provided blockHeight or blockHash that conform to the provided
@ -345,9 +346,9 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(ctx context.Context, tx sql.Tx, rct
eth.log_cids.log_data, eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index,
header_cids.block_hash, header_cids.block_number
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE eth.log_cids.receipt_id = receipt_cids.id
AND receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id`
WHERE eth.log_cids.rct_id = receipt_cids.tx_id
AND receipt_cids.tx_id = transaction_cids.tx_hash
AND transaction_cids.header_id = header_cids.block_hash`
id := 1
if blockNumber > 0 {
pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id)
@ -364,7 +365,7 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(ctx context.Context, tx sql.Tx, rct
pgStr += ` ORDER BY log_cids.index`
logCIDs := make([]LogResult, 0)
return logCIDs, tx.QueryRow(ctx, pgStr, args...).Scan(&logCIDs)
return logCIDs, tx.Select(ctx, &logCIDs, pgStr, args...)
}
// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided
@ -372,10 +373,10 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(ctx context.Context, tx sql.Tx, rct
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, txHashes []string) ([]models.ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 5)
pgStr := `SELECT receipt_cids.id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, receipt_cids.tx_id
pgStr := `SELECT receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, receipt_cids.tx_id
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id`
WHERE receipt_cids.tx_id = transaction_cids.tx_hash
AND transaction_cids.header_id = header_cids.block_hash`
id := 1
if blockNumber > 0 {
pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id)
@ -408,10 +409,10 @@ func hasTopics(topics [][]string) bool {
func (ecr *CIDRetriever) RetrieveStateCIDs(ctx context.Context, tx sql.Tx, stateFilter StateFilter, headerID string) ([]models.StateNodeModel, error) {
log.Debug("retrieving state cids for header id ", headerID)
args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.id, state_cids.header_id,
pgStr := `SELECT state_cids.header_id,
state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.mh_key, state_cids.state_path
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.id = $1`
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_hash = $1`
args = append(args, headerID)
addrLen := len(stateFilter.Addresses)
if addrLen > 0 {
@ -426,15 +427,15 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(ctx context.Context, tx sql.Tx, state
pgStr += ` AND state_cids.node_type = 2`
}
stateNodeCIDs := make([]models.StateNodeModel, 0)
return stateNodeCIDs, tx.QueryRow(ctx, pgStr, args...).Scan(&stateNodeCIDs)
return stateNodeCIDs, tx.Select(ctx, &stateNodeCIDs, pgStr, args...)
}
// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStorageCIDs(ctx context.Context, tx sql.Tx, storageFilter StorageFilter, headerID string) ([]models.StorageNodeWithStateKeyModel, error) {
log.Debug("retrieving storage cids for header id ", headerID)
args := make([]interface{}, 0, 3)
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type,
storage_cids.cid, storage_cids.mh_key, storage_cids.storage_path, state_cids.state_leaf_key
pgStr := `SELECT storage_cids.header_id, storage_cids.storage_leaf_key, storage_cids.node_type,
storage_cids.cid, storage_cids.mh_key, storage_cids.storage_path, storage_cids.state_path, state_cids.state_leaf_key
FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.header_id = state_cids.header_id AND storage_cids.state_path = state_cids.state_path
AND state_cids.header_id = header_cids.block_hash
@ -459,7 +460,7 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(ctx context.Context, tx sql.Tx, sto
pgStr += ` AND storage_cids.node_type = 2`
}
storageNodeCIDs := make([]models.StorageNodeWithStateKeyModel, 0)
return storageNodeCIDs, tx.QueryRow(ctx, pgStr, args...).Scan(&storageNodeCIDs)
return storageNodeCIDs, tx.Select(ctx, &storageNodeCIDs, pgStr, args...)
}
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
@ -571,9 +572,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(ctx context.Context, tx sql.Tx,
pgStr := `SELECT * FROM eth.header_cids
WHERE block_hash = $1`
var headerCID models.HeaderModel
return headerCID, tx.QueryRow(ctx, pgStr, blockHash.String()).Scan(&headerCID.CID, &headerCID.NodeID, &headerCID.BlockHash, &headerCID.BlockNumber,
&headerCID.BaseFee, &headerCID.Bloom, &headerCID.MhKey, &headerCID.ParentHash, &headerCID.RctRoot, &headerCID.Reward, &headerCID.StateRoot,
&headerCID.Timestamp, &headerCID.TimesValidated, &headerCID.TotalDifficulty, &headerCID.TxRoot, &headerCID.UncleRoot)
return headerCID, tx.Get(ctx, &headerCID, pgStr, blockHash.String())
}
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
@ -583,18 +582,18 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(ctx context.Context, tx sql.Tx
WHERE header_id = $1
ORDER BY index`
var txCIDs []models.TxModel
return txCIDs, tx.QueryRow(ctx, pgStr, headerID).(*sqlx.Row).Scan(&txCIDs)
return txCIDs, tx.Select(ctx, &txCIDs, pgStr, headerID)
}
// RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(ctx context.Context, tx sql.Tx, txHashes []string) ([]models.ReceiptModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txHashes)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key,
log.Debugf("retrieving receipt cids for tx hashes %v", txHashes)
pgStr := `SELECT receipt_cids.tx_id, receipt_cids.leaf_cid, receipt_cids.leaf_mh_key,
receipt_cids.contract, receipt_cids.contract_hash
FROM eth.receipt_cids, eth.transaction_cids
WHERE tx_id = ANY($1::INTEGER[])
AND receipt_cids.tx_id = transaction_cids.id
WHERE tx_id = ANY($1)
AND receipt_cids.tx_id = transaction_cids.tx_hash
ORDER BY transaction_cids.index`
var rctCIDs []models.ReceiptModel
return rctCIDs, tx.QueryRow(ctx, pgStr, pq.Array(txHashes)).Scan(rctCIDs)
return rctCIDs, tx.Select(ctx, &rctCIDs, pgStr, pq.Array(txHashes))
}

View File

@ -218,8 +218,14 @@ var _ = Describe("Retriever", func() {
)
BeforeEach(func() {
var err error
goodInfo := node.Info{GenesisBlock: "GENESIS2", NetworkID: "2", ID: "2", ClientName: "geth2", ChainID: 2}
db, err = eth.Setup(ctx, goodInfo)
testInfo := node.Info{
GenesisBlock: test_helpers.Genesis.Hash().String(),
NetworkID: "2",
ID: "2",
ClientName: "geth",
ChainID: params.TestChainConfig.ChainID.Uint64(),
}
db, err = eth.Setup(ctx, testInfo)
Expect(err).ToNot(HaveOccurred())
diffIndexer, err = sql.NewStateDiffIndexer(ctx, params.TestChainConfig, db)
@ -301,8 +307,8 @@ var _ = Describe("Retriever", func() {
}
expectedRctCIDsAndLeafNodes := make([]rctCIDAndMHKeyResult, 0)
pgStr := `SELECT receipt_cids.leaf_cid, receipt_cids.leaf_mh_key FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
WHERE receipt_cids.tx_id = transaction_cids.tx_hash
AND transaction_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
ORDER BY transaction_cids.index`
err := db.Select(ctx, &expectedRctCIDsAndLeafNodes, pgStr, test_helpers.BlockNumber.Uint64())

View File

@ -21,6 +21,7 @@ import (
"context"
"io/ioutil"
"math/big"
"time"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
@ -74,8 +75,15 @@ var _ = Describe("eth state reading tests", func() {
It("test init", func() {
// db and type initializations
var err error
goodInfo := node.Info{GenesisBlock: "GENESIS3", NetworkID: "3", ID: "3", ClientName: "geth3", ChainID: 3}
db, err = eth.Setup(ctx, goodInfo)
testInfo := node.Info{
GenesisBlock: test_helpers.Genesis.Hash().String(),
NetworkID: "3",
ID: "3",
ClientName: "geth",
ChainID: params.TestChainConfig.ChainID.Uint64(),
}
db, err = eth.Setup(ctx, testInfo)
Expect(err).ToNot(HaveOccurred())
transformer, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
@ -180,6 +188,8 @@ var _ = Describe("eth state reading tests", func() {
err = indexAndPublisher.PushCodeAndCodeHash(tx, hash)
Expect(err).ToNot(HaveOccurred())
// wait for tx batch process to complete.
time.Sleep(600 * time.Millisecond)
err = tx.Submit(err)
Expect(err).ToNot(HaveOccurred())
})

View File

@ -40,8 +40,15 @@ var _ = Describe("IPLDFetcher", func() {
err error
tx interfaces.Batch
)
goodInfo := node.Info{GenesisBlock: "GENESIS4", NetworkID: "4", ID: "4", ClientName: "geth4", ChainID: 4}
db, err = eth.Setup(ctx, goodInfo)
testInfo := node.Info{
GenesisBlock: test_helpers.Genesis.Hash().String(),
NetworkID: "4",
ID: "4",
ClientName: "geth",
ChainID: params.TestChainConfig.ChainID.Uint64(),
}
db, err = eth.Setup(ctx, testInfo)
Expect(err).ToNot(HaveOccurred())
pubAndIndexer, err = sql.NewStateDiffIndexer(ctx, params.TestChainConfig, db)
Expect(err).ToNot(HaveOccurred())

View File

@ -53,12 +53,12 @@ const (
WHERE block_hash = ANY($1::VARCHAR(66)[])`
RetrieveUnclesByBlockHashPgStr = `SELECT uncle_cids.cid, data
FROM eth.uncle_cids
INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.id)
INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key)
WHERE block_hash = $1`
RetrieveUnclesByBlockNumberPgStr = `SELECT uncle_cids.cid, data
FROM eth.uncle_cids
INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.id)
INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key)
WHERE block_number = $1`
RetrieveUncleByHashPgStr = `SELECT cid, data
@ -71,13 +71,13 @@ const (
WHERE tx_hash = ANY($1::VARCHAR(66)[])`
RetrieveTransactionsByBlockHashPgStr = `SELECT transaction_cids.cid, data
FROM eth.transaction_cids
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key)
WHERE block_hash = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveTransactionsByBlockNumberPgStr = `SELECT transaction_cids.cid, data
FROM eth.transaction_cids
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key)
WHERE block_number = $1
ORDER BY eth.transaction_cids.index ASC`
@ -87,42 +87,42 @@ const (
WHERE tx_hash = $1`
RetrieveReceiptsByTxHashesPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.id)
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash)
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key)
WHERE tx_hash = ANY($1::VARCHAR(66)[])`
RetrieveReceiptsByBlockHashPgStr = `SELECT receipt_cids.leaf_cid, data, eth.transaction_cids.tx_hash
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.id)
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash)
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key)
WHERE block_hash = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveReceiptsByBlockNumberPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.id)
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash)
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key)
WHERE block_number = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveReceiptByTxHashPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.id)
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash)
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key)
WHERE tx_hash = $1`
RetrieveAccountByLeafKeyAndBlockHashPgStr = `SELECT state_cids.cid, data, state_cids.node_type
FROM eth.state_cids
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key)
WHERE state_leaf_key = $1
AND block_number <= (SELECT block_number
FROM eth.header_cids
WHERE block_hash = $2)
AND header_cids.id = (SELECT canonical_header_id(block_number))
AND header_cids.block_hash = (SELECT canonical_header_id(block_number))
ORDER BY block_number DESC
LIMIT 1`
RetrieveAccountByLeafKeyAndBlockNumberPgStr = `SELECT state_cids.cid, data, state_cids.node_type
FROM eth.state_cids
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key)
WHERE state_leaf_key = $1
AND block_number <= $2
@ -130,7 +130,7 @@ const (
LIMIT 1`
RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockNumberPgStr = `SELECT storage_cids.cid, data, storage_cids.node_type, was_state_leaf_removed($1, $3) AS state_leaf_removed
FROM eth.storage_cids
INNER JOIN eth.state_cids ON (storage_cids.header_id = state_cids.header_id AND storage_cids.state_path = state_cids.state_path)
INNER JOIN eth.state_cids ON (storage_cids.header_id = state_cids.header_id)
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key)
WHERE state_leaf_key = $1
@ -140,7 +140,7 @@ const (
LIMIT 1`
RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT storage_cids.cid, data, storage_cids.node_type, was_state_leaf_removed($1, $3) AS state_leaf_removed
FROM eth.storage_cids
INNER JOIN eth.state_cids ON (storage_cids.header_id = state_cids.header_id AND storage_cids.state_path = state_cids.state_path)
INNER JOIN eth.state_cids ON (storage_cids.header_id = state_cids.header_id)
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key)
WHERE state_leaf_key = $1
@ -148,7 +148,7 @@ const (
AND block_number <= (SELECT block_number
FROM eth.header_cids
WHERE block_hash = $3)
AND header_cids.id = (SELECT canonical_header_id(block_number))
AND header_cids.block_hash = (SELECT canonical_header_id(block_number))
ORDER BY block_number DESC
LIMIT 1`
)

View File

@ -28,7 +28,7 @@ import (
)
func Setup(ctx context.Context, info node.Info) (sql.Database, error) {
driver, err := postgres.NewPGXDriver(ctx, getConfig(), info)
driver, err := postgres.NewSQLXDriver(ctx, getConfig(), info)
Expect(err).NotTo(HaveOccurred())
return postgres.NewPostgresDB(driver), nil
}

View File

@ -248,7 +248,7 @@ type ConvertedPayload struct {
// LogResult represent a log.
type LogResult struct {
LeafCID string `db:"leaf_cid"`
ReceiptID int64 `db:"rct_id"`
ReceiptID string `db:"rct_id"`
Address string `db:"address"`
Index int64 `db:"index"`
Data []byte `db:"log_data"`

View File

@ -66,8 +66,14 @@ var _ = Describe("GraphQL", func() {
It("test init", func() {
var err error
goodInfo := node.Info{GenesisBlock: "GENESIS5", NetworkID: "5", ID: "5", ClientName: "geth5", ChainID: 5}
db, err = eth.Setup(ctx, goodInfo)
testInfo := node.Info{
GenesisBlock: test_helpers.Genesis.Hash().String(),
NetworkID: "5",
ID: "5",
ClientName: "geth",
ChainID: params.TestChainConfig.ChainID.Uint64(),
}
db, err = eth.Setup(ctx, testInfo)
Expect(err).ToNot(HaveOccurred())
transformer, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)

View File

@ -17,7 +17,7 @@
package prom
import (
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
@ -72,7 +72,7 @@ func Init() {
}
// RegisterDBCollector create metric colletor for given connection
func RegisterDBCollector(name string, db *postgres.PGXDriver) {
func RegisterDBCollector(name string, db sql.Database) {
if metrics {
prometheus.Register(NewDBStatsCollector(name, db))
}

View File

@ -191,13 +191,13 @@ func NewConfig() (*Config, error) {
c.IpldGraphqlEnabled = ipldGraphqlEnabled
overrideDBConnConfig(&c.DBConfig)
driver, err := postgres.NewPGXDriver(context.Background(), c.DBConfig, nodeInfo)
driver, err := postgres.NewSQLXDriver(context.Background(), c.DBConfig, nodeInfo)
if err != nil {
return nil, err
}
prom.RegisterDBCollector(c.DBConfig.DatabaseName, driver)
c.DB = postgres.NewPostgresDB(driver)
prom.RegisterDBCollector(c.DBConfig.DatabaseName, c.DB)
defaultSenderStr := viper.GetString("ethereum.defaultSender")
if defaultSenderStr != "" {