Updates to use v4 db schema #152

Merged
ashwinphatak merged 7 commits from pm-v4-single-node into sharding 2022-05-20 12:00:36 +00:00
3 changed files with 173 additions and 39 deletions
Showing only changes of commit 867323a1ab - Show all commits

View File

@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/big"
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@ -102,7 +103,12 @@ func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) {
// FetchHeaders fetches headers
func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPLDModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64)
if err != nil {
return models.IPLDModel{}, err
}
headerBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber)
if err != nil {
return models.IPLDModel{}, err
}
@ -117,7 +123,11 @@ func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]mode
log.Debug("fetching uncle iplds")
uncleIPLDs := make([]models.IPLDModel, len(cids))
for i, c := range cids {
uncleBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64)
if err != nil {
return nil, err
}
uncleBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber)
if err != nil {
return nil, err
}
@ -134,7 +144,11 @@ func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []models.TxModel) ([]models.IP
log.Debug("fetching transaction iplds")
trxIPLDs := make([]models.IPLDModel, len(cids))
for i, c := range cids {
txBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64)
if err != nil {
return nil, err
}
txBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber)
if err != nil {
return nil, err
}
@ -151,7 +165,11 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]mode
log.Debug("fetching receipt iplds")
rctIPLDs := make([]models.IPLDModel, len(cids))
for i, c := range cids {
rctBytes, err := shared.FetchIPLDByMhKey(tx, c.LeafMhKey)
blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64)
if err != nil {
return nil, err
}
rctBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.LeafMhKey, blockNumber)
if err != nil {
return nil, err
}
@ -172,7 +190,11 @@ func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]S
if stateNode.CID == "" {
continue
}
stateBytes, err := shared.FetchIPLDByMhKey(tx, stateNode.MhKey)
blockNumber, err := strconv.ParseUint(stateNode.BlockNumber, 10, 64)
if err != nil {
return nil, err
}
stateBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, stateNode.MhKey, blockNumber)
if err != nil {
return nil, err
}
@ -197,7 +219,11 @@ func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []models.StorageNodeWithSta
if storageNode.CID == "" || storageNode.StateKey == "" {
continue
}
storageBytes, err := shared.FetchIPLDByMhKey(tx, storageNode.MhKey)
blockNumber, err := strconv.ParseUint(storageNode.BlockNumber, 10, 64)
if err != nil {
return nil, err
}
storageBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, storageNode.MhKey, blockNumber)
if err != nil {
return nil, err
}

View File

@ -36,82 +36,163 @@ const (
RetrieveHeadersByHashesPgStr = `SELECT cid, data
FROM eth.header_cids
INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key)
INNER JOIN public.blocks ON (
header_cids.mh_key = blocks.key
AND header_cids.block_number = blocks.block_number
)
WHERE block_hash = ANY($1::VARCHAR(66)[])`
RetrieveHeadersByBlockNumberPgStr = `SELECT cid, data
FROM eth.header_cids
INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key)
INNER JOIN public.blocks ON (
header_cids.mh_key = blocks.key
AND header_cids.block_number = blocks.block_number
)
WHERE block_number = $1`
RetrieveHeaderByHashPgStr = `SELECT cid, data
FROM eth.header_cids
INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key)
INNER JOIN public.blocks ON (
header_cids.mh_key = blocks.key
AND header_cids.block_number = blocks.block_number
)
WHERE block_hash = $1`
RetrieveUnclesByHashesPgStr = `SELECT cid, data
FROM eth.uncle_cids
INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key)
INNER JOIN public.blocks ON (
uncle_cids.mh_key = blocks.key
AND uncle_cids.block_number = blocks.block_number
)
WHERE block_hash = ANY($1::VARCHAR(66)[])`
RetrieveUnclesByBlockHashPgStr = `SELECT uncle_cids.cid, data
FROM eth.uncle_cids
INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key)
INNER JOIN eth.header_cids ON (
uncle_cids.header_id = header_cids.block_hash
AND uncle_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
uncle_cids.mh_key = blocks.key
AND uncle_cids.block_number = blocks.block_number
)
WHERE block_hash = $1`
RetrieveUnclesByBlockNumberPgStr = `SELECT uncle_cids.cid, data
FROM eth.uncle_cids
INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key)
INNER JOIN eth.header_cids ON (
uncle_cids.header_id = header_cids.block_hash
AND uncle_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
uncle_cids.mh_key = blocks.key
AND uncle_cids.block_number = blocks.block_number
)
WHERE block_number = $1`
RetrieveUncleByHashPgStr = `SELECT cid, data
FROM eth.uncle_cids
INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key)
INNER JOIN public.blocks ON (
uncle_cids.mh_key = blocks.key
AND uncle_cids.block_number = blocks.block_number
)
WHERE block_hash = $1`
RetrieveTransactionsByHashesPgStr = `SELECT cid, data
FROM eth.transaction_cids
INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key)
INNER JOIN public.blocks ON (
transaction_cids.mh_key = blocks.key
AND transaction_cids.block_number = blocks.block_number
)
WHERE tx_hash = ANY($1::VARCHAR(66)[])`
RetrieveTransactionsByBlockHashPgStr = `SELECT transaction_cids.cid, data
FROM eth.transaction_cids
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key)
INNER JOIN eth.header_cids ON (
transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
transaction_cids.mh_key = blocks.key
AND transaction_cids.block_number = blocks.block_number
)
WHERE block_hash = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveTransactionsByBlockNumberPgStr = `SELECT transaction_cids.cid, data
FROM eth.transaction_cids
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key)
INNER JOIN eth.header_cids ON (
transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
transaction_cids.mh_key = blocks.key
AND transaction_cids.block_number = blocks.block_number
)
WHERE block_number = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveTransactionByHashPgStr = `SELECT cid, data
FROM eth.transaction_cids
INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key)
INNER JOIN public.blocks ON (
transaction_cids.mh_key = blocks.key
AND transaction_cids.block_number = blocks.block_number
)
WHERE tx_hash = $1`
RetrieveReceiptsByTxHashesPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash)
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key)
INNER JOIN eth.transaction_cids ON (
receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.block_number = transaction_cids.block_number
)
INNER JOIN public.blocks ON (
receipt_cids.leaf_mh_key = blocks.key
AND receipt_cids.block_number = blocks.block_number
)
WHERE tx_hash = ANY($1::VARCHAR(66)[])`
RetrieveReceiptsByBlockHashPgStr = `SELECT receipt_cids.leaf_cid, data, eth.transaction_cids.tx_hash
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash)
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key)
INNER JOIN eth.transaction_cids ON (
receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.block_number = transaction_cids.block_number
)
INNER JOIN eth.header_cids ON (
transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
receipt_cids.leaf_mh_key = blocks.key
AND receipt_cids.block_number = blocks.block_number
)
WHERE block_hash = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveReceiptsByBlockNumberPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash)
INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key)
INNER JOIN eth.transaction_cids ON (
receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.block_number = transaction_cids.block_number
)
INNER JOIN eth.header_cids ON (
transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
receipt_cids.leaf_mh_key = blocks.key
AND receipt_cids.block_number = blocks.block_number
)
WHERE block_number = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveReceiptByTxHashPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash)
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key)
INNER JOIN eth.transaction_cids ON (
receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.block_number = transaction_cids.block_number
)
INNER JOIN public.blocks ON (
receipt_cids.leaf_mh_key = blocks.key
AND receipt_cids.block_number = blocks.block_number
)
WHERE tx_hash = $1`
RetrieveAccountByLeafKeyAndBlockHashPgStr = `SELECT state_cids.cid, data, state_cids.node_type
FROM eth.state_cids
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key)
INNER JOIN eth.header_cids ON (
state_cids.header_id = header_cids.block_hash
AND state_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
state_cids.mh_key = blocks.key
AND state_cids.block_number = blocks.block_number
)
WHERE state_leaf_key = $1
AND block_number <= (SELECT block_number
FROM eth.header_cids
@ -121,8 +202,14 @@ const (
LIMIT 1`
RetrieveAccountByLeafKeyAndBlockNumberPgStr = `SELECT state_cids.cid, data, state_cids.node_type
FROM eth.state_cids
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key)
INNER JOIN eth.header_cids ON (
state_cids.header_id = header_cids.block_hash
AND state_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
state_cids.mh_key = blocks.key
AND state_cids.block_number = blocks.block_number
)
WHERE state_leaf_key = $1
AND block_number <= $2
ORDER BY block_number DESC
@ -132,9 +219,16 @@ const (
INNER JOIN eth.state_cids ON (
storage_cids.header_id = state_cids.header_id
AND storage_cids.state_path = state_cids.state_path
AND storage_cids.block_number = state_cids.block_number
)
INNER JOIN eth.header_cids ON (
state_cids.header_id = header_cids.block_hash
AND state_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
storage_cids.mh_key = blocks.key
AND storage_cids.block_number = blocks.block_number
)
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key)
WHERE state_leaf_key = $1
AND storage_leaf_key = $2
AND block_number <= $3
@ -145,9 +239,16 @@ const (
INNER JOIN eth.state_cids ON (
storage_cids.header_id = state_cids.header_id
AND storage_cids.state_path = state_cids.state_path
AND storage_cids.block_number = state_cids.block_number
)
INNER JOIN eth.header_cids ON (
state_cids.header_id = header_cids.block_hash
AND state_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
storage_cids.mh_key = blocks.key
AND storage_cids.block_number = blocks.block_number
)
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key)
WHERE state_leaf_key = $1
AND storage_leaf_key = $2
AND block_number <= (SELECT block_number

View File

@ -77,6 +77,13 @@ func FetchIPLDByMhKey(tx *sqlx.Tx, mhKey string) ([]byte, error) {
return block, tx.Get(&block, pgStr, mhKey)
}
// FetchIPLDByMhKeyAndBlockNumber is used to retrieve an ipld from Postgres blockstore with the provided tx, mhkey string and blockNumber
func FetchIPLDByMhKeyAndBlockNumber(tx *sqlx.Tx, mhKey string, blockNumber uint64) ([]byte, error) {
pgStr := `SELECT data FROM public.blocks WHERE key = $1 AND block_number = $2`
var block []byte
return block, tx.Get(&block, pgStr, mhKey, blockNumber)
}
// MultihashKeyFromCID converts a cid into a blockstore-prefixed multihash db key string
func MultihashKeyFromCID(c cid.Cid) string {
dbKey := dshelp.MultihashToDsKey(c.Hash())