From 867323a1ab37f24bc130c04122cdc2008710bc06 Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Mon, 25 Apr 2022 17:07:11 +0530 Subject: [PATCH] Update queries to get data from IPLD blocks table --- pkg/eth/ipld_fetcher.go | 38 +++++++-- pkg/eth/ipld_retriever.go | 167 ++++++++++++++++++++++++++++++-------- pkg/shared/functions.go | 7 ++ 3 files changed, 173 insertions(+), 39 deletions(-) diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go index 8396d368..47bd34ac 100644 --- a/pkg/eth/ipld_fetcher.go +++ b/pkg/eth/ipld_fetcher.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math/big" + "strconv" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -102,7 +103,12 @@ func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) { // FetchHeaders fetches headers func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPLDModel, error) { log.Debug("fetching header ipld") - headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) + blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) + if err != nil { + return models.IPLDModel{}, err + } + + headerBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) if err != nil { return models.IPLDModel{}, err } @@ -117,7 +123,11 @@ func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]mode log.Debug("fetching uncle iplds") uncleIPLDs := make([]models.IPLDModel, len(cids)) for i, c := range cids { - uncleBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) + blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) + if err != nil { + return nil, err + } + uncleBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) if err != nil { return nil, err } @@ -134,7 +144,11 @@ func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []models.TxModel) ([]models.IP log.Debug("fetching transaction iplds") trxIPLDs := make([]models.IPLDModel, len(cids)) for i, c := range cids { - txBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) + blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) + if err != nil { + return nil, err + } + txBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) if err != nil { return nil, err } @@ -151,7 +165,11 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]mode log.Debug("fetching receipt iplds") rctIPLDs := make([]models.IPLDModel, len(cids)) for i, c := range cids { - rctBytes, err := shared.FetchIPLDByMhKey(tx, c.LeafMhKey) + blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) + if err != nil { + return nil, err + } + rctBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.LeafMhKey, blockNumber) if err != nil { return nil, err } @@ -172,7 +190,11 @@ func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]S if stateNode.CID == "" { continue } - stateBytes, err := shared.FetchIPLDByMhKey(tx, stateNode.MhKey) + blockNumber, err := strconv.ParseUint(stateNode.BlockNumber, 10, 64) + if err != nil { + return nil, err + } + stateBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, stateNode.MhKey, blockNumber) if err != nil { return nil, err } @@ -197,7 +219,11 @@ func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []models.StorageNodeWithSta if storageNode.CID == "" || storageNode.StateKey == "" { continue } - storageBytes, err := shared.FetchIPLDByMhKey(tx, storageNode.MhKey) + blockNumber, err := strconv.ParseUint(storageNode.BlockNumber, 10, 64) + if err != nil { + return nil, err + } + storageBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, storageNode.MhKey, blockNumber) if err != nil { return nil, err } diff --git a/pkg/eth/ipld_retriever.go b/pkg/eth/ipld_retriever.go index 3b6d4073..9caa763d 100644 --- a/pkg/eth/ipld_retriever.go +++ b/pkg/eth/ipld_retriever.go @@ -36,82 +36,163 @@ const ( RetrieveHeadersByHashesPgStr = `SELECT cid, data FROM eth.header_cids - INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key) + INNER JOIN public.blocks ON ( + header_cids.mh_key = blocks.key + AND header_cids.block_number = blocks.block_number + ) WHERE block_hash = ANY($1::VARCHAR(66)[])` RetrieveHeadersByBlockNumberPgStr = `SELECT cid, data FROM eth.header_cids - INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key) + INNER JOIN public.blocks ON ( + header_cids.mh_key = blocks.key + AND header_cids.block_number = blocks.block_number + ) WHERE block_number = $1` RetrieveHeaderByHashPgStr = `SELECT cid, data FROM eth.header_cids - INNER JOIN public.blocks ON (header_cids.mh_key = blocks.key) + INNER JOIN public.blocks ON ( + header_cids.mh_key = blocks.key + AND header_cids.block_number = blocks.block_number + ) WHERE block_hash = $1` RetrieveUnclesByHashesPgStr = `SELECT cid, data FROM eth.uncle_cids - INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key) + INNER JOIN public.blocks ON ( + uncle_cids.mh_key = blocks.key + AND uncle_cids.block_number = blocks.block_number + ) WHERE block_hash = ANY($1::VARCHAR(66)[])` RetrieveUnclesByBlockHashPgStr = `SELECT uncle_cids.cid, data FROM eth.uncle_cids - INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key) + INNER JOIN eth.header_cids ON ( + uncle_cids.header_id = header_cids.block_hash + AND uncle_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + uncle_cids.mh_key = blocks.key + AND uncle_cids.block_number = blocks.block_number + ) WHERE block_hash = $1` RetrieveUnclesByBlockNumberPgStr = `SELECT uncle_cids.cid, data FROM eth.uncle_cids - INNER JOIN eth.header_cids ON (uncle_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key) + INNER JOIN eth.header_cids ON ( + uncle_cids.header_id = header_cids.block_hash + AND uncle_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + uncle_cids.mh_key = blocks.key + AND uncle_cids.block_number = blocks.block_number + ) WHERE block_number = $1` RetrieveUncleByHashPgStr = `SELECT cid, data FROM eth.uncle_cids - INNER JOIN public.blocks ON (uncle_cids.mh_key = blocks.key) + INNER JOIN public.blocks ON ( + uncle_cids.mh_key = blocks.key + AND uncle_cids.block_number = blocks.block_number + ) WHERE block_hash = $1` RetrieveTransactionsByHashesPgStr = `SELECT cid, data FROM eth.transaction_cids - INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key) + INNER JOIN public.blocks ON ( + transaction_cids.mh_key = blocks.key + AND transaction_cids.block_number = blocks.block_number + ) WHERE tx_hash = ANY($1::VARCHAR(66)[])` RetrieveTransactionsByBlockHashPgStr = `SELECT transaction_cids.cid, data FROM eth.transaction_cids - INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key) + INNER JOIN eth.header_cids ON ( + transaction_cids.header_id = header_cids.block_hash + AND transaction_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + transaction_cids.mh_key = blocks.key + AND transaction_cids.block_number = blocks.block_number + ) WHERE block_hash = $1 ORDER BY eth.transaction_cids.index ASC` RetrieveTransactionsByBlockNumberPgStr = `SELECT transaction_cids.cid, data FROM eth.transaction_cids - INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key) + INNER JOIN eth.header_cids ON ( + transaction_cids.header_id = header_cids.block_hash + AND transaction_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + transaction_cids.mh_key = blocks.key + AND transaction_cids.block_number = blocks.block_number + ) WHERE block_number = $1 ORDER BY eth.transaction_cids.index ASC` RetrieveTransactionByHashPgStr = `SELECT cid, data FROM eth.transaction_cids - INNER JOIN public.blocks ON (transaction_cids.mh_key = blocks.key) + INNER JOIN public.blocks ON ( + transaction_cids.mh_key = blocks.key + AND transaction_cids.block_number = blocks.block_number + ) WHERE tx_hash = $1` RetrieveReceiptsByTxHashesPgStr = `SELECT receipt_cids.leaf_cid, data FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash) - INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key) + INNER JOIN eth.transaction_cids ON ( + receipt_cids.tx_id = transaction_cids.tx_hash + AND receipt_cids.block_number = transaction_cids.block_number + ) + INNER JOIN public.blocks ON ( + receipt_cids.leaf_mh_key = blocks.key + AND receipt_cids.block_number = blocks.block_number + ) WHERE tx_hash = ANY($1::VARCHAR(66)[])` RetrieveReceiptsByBlockHashPgStr = `SELECT receipt_cids.leaf_cid, data, eth.transaction_cids.tx_hash FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash) - INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key) + INNER JOIN eth.transaction_cids ON ( + receipt_cids.tx_id = transaction_cids.tx_hash + AND receipt_cids.block_number = transaction_cids.block_number + ) + INNER JOIN eth.header_cids ON ( + transaction_cids.header_id = header_cids.block_hash + AND transaction_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + receipt_cids.leaf_mh_key = blocks.key + AND receipt_cids.block_number = blocks.block_number + ) WHERE block_hash = $1 ORDER BY eth.transaction_cids.index ASC` RetrieveReceiptsByBlockNumberPgStr = `SELECT receipt_cids.leaf_cid, data FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash) - INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key) + INNER JOIN eth.transaction_cids ON ( + receipt_cids.tx_id = transaction_cids.tx_hash + AND receipt_cids.block_number = transaction_cids.block_number + ) + INNER JOIN eth.header_cids ON ( + transaction_cids.header_id = header_cids.block_hash + AND transaction_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + receipt_cids.leaf_mh_key = blocks.key + AND receipt_cids.block_number = blocks.block_number + ) WHERE block_number = $1 ORDER BY eth.transaction_cids.index ASC` RetrieveReceiptByTxHashPgStr = `SELECT receipt_cids.leaf_cid, data FROM eth.receipt_cids - INNER JOIN eth.transaction_cids ON (receipt_cids.tx_id = transaction_cids.tx_hash) - INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = blocks.key) + INNER JOIN eth.transaction_cids ON ( + receipt_cids.tx_id = transaction_cids.tx_hash + AND receipt_cids.block_number = transaction_cids.block_number + ) + INNER JOIN public.blocks ON ( + receipt_cids.leaf_mh_key = blocks.key + AND receipt_cids.block_number = blocks.block_number + ) WHERE tx_hash = $1` RetrieveAccountByLeafKeyAndBlockHashPgStr = `SELECT state_cids.cid, data, state_cids.node_type FROM eth.state_cids - INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key) + INNER JOIN eth.header_cids ON ( + state_cids.header_id = header_cids.block_hash + AND state_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + state_cids.mh_key = blocks.key + AND state_cids.block_number = blocks.block_number + ) WHERE state_leaf_key = $1 AND block_number <= (SELECT block_number FROM eth.header_cids @@ -121,8 +202,14 @@ const ( LIMIT 1` RetrieveAccountByLeafKeyAndBlockNumberPgStr = `SELECT state_cids.cid, data, state_cids.node_type FROM eth.state_cids - INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key) + INNER JOIN eth.header_cids ON ( + state_cids.header_id = header_cids.block_hash + AND state_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + state_cids.mh_key = blocks.key + AND state_cids.block_number = blocks.block_number + ) WHERE state_leaf_key = $1 AND block_number <= $2 ORDER BY block_number DESC @@ -132,9 +219,16 @@ const ( INNER JOIN eth.state_cids ON ( storage_cids.header_id = state_cids.header_id AND storage_cids.state_path = state_cids.state_path + AND storage_cids.block_number = state_cids.block_number + ) + INNER JOIN eth.header_cids ON ( + state_cids.header_id = header_cids.block_hash + AND state_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + storage_cids.mh_key = blocks.key + AND storage_cids.block_number = blocks.block_number ) - INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key) WHERE state_leaf_key = $1 AND storage_leaf_key = $2 AND block_number <= $3 @@ -145,9 +239,16 @@ const ( INNER JOIN eth.state_cids ON ( storage_cids.header_id = state_cids.header_id AND storage_cids.state_path = state_cids.state_path + AND storage_cids.block_number = state_cids.block_number + ) + INNER JOIN eth.header_cids ON ( + state_cids.header_id = header_cids.block_hash + AND state_cids.block_number = header_cids.block_number + ) + INNER JOIN public.blocks ON ( + storage_cids.mh_key = blocks.key + AND storage_cids.block_number = blocks.block_number ) - INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) - INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key) WHERE state_leaf_key = $1 AND storage_leaf_key = $2 AND block_number <= (SELECT block_number diff --git a/pkg/shared/functions.go b/pkg/shared/functions.go index b1ac4550..8989cfb2 100644 --- a/pkg/shared/functions.go +++ b/pkg/shared/functions.go @@ -77,6 +77,13 @@ func FetchIPLDByMhKey(tx *sqlx.Tx, mhKey string) ([]byte, error) { return block, tx.Get(&block, pgStr, mhKey) } +// FetchIPLDByMhKeyAndBlockNumber is used to retrieve an ipld from Postgres blockstore with the provided tx, mhkey string and blockNumber +func FetchIPLDByMhKeyAndBlockNumber(tx *sqlx.Tx, mhKey string, blockNumber uint64) ([]byte, error) { + pgStr := `SELECT data FROM public.blocks WHERE key = $1 AND block_number = $2` + var block []byte + return block, tx.Get(&block, pgStr, mhKey, blockNumber) +} + // MultihashKeyFromCID converts a cid into a blockstore-prefixed multihash db key string func MultihashKeyFromCID(c cid.Cid) string { dbKey := dshelp.MultihashToDsKey(c.Hash())