From d7ab0e0863a5e6f35c9f315b57c7f8e113f12f62 Mon Sep 17 00:00:00 2001 From: nabarun Date: Fri, 27 May 2022 19:16:10 +0530 Subject: [PATCH] Implement single query for transactions and blockByMhKey --- go.mod | 1 + pkg/eth/cid_retriever.go | 42 +++++++++++++++++++---- pkg/eth/ipld_fetcher.go | 35 ++++++++++++++++++- pkg/graphql/graphql.go | 73 ++++++++++++++++++++++++++++++++++++++++ pkg/graphql/schema.go | 17 ++++++++++ pkg/shared/functions.go | 14 ++++++++ 6 files changed, 174 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index ef9eae11..72b836bb 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.4.0 github.com/spf13/viper v1.11.0 + github.com/thoas/go-funk v0.9.2 github.com/vulcanize/eth-ipfs-state-validator/v4 v4.0.0-alpha github.com/vulcanize/gap-filler v0.3.1 github.com/vulcanize/ipfs-ethdb/v4 v4.0.0-alpha diff --git a/pkg/eth/cid_retriever.go b/pkg/eth/cid_retriever.go index b804d462..53d147e3 100644 --- a/pkg/eth/cid_retriever.go +++ b/pkg/eth/cid_retriever.go @@ -27,6 +27,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" log "github.com/sirupsen/logrus" + "github.com/thoas/go-funk" "github.com/vulcanize/ipld-eth-server/v4/pkg/shared" ) @@ -626,6 +627,17 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID string, return txCIDs, tx.Select(&txCIDs, pgStr, headerID, blockNumber) } +func (ecr *CIDRetriever) RetrieveTxCIDsByBlockNumber(tx *sqlx.Tx, blockNumber int64) ([]models.TxModel, error) { + log.Debug("retrieving tx cids for block number ", blockNumber) + pgStr := `SELECT CAST(block_number as Text), header_id, index, tx_hash, cid, mh_key, + dst, src, tx_data, tx_type, value + FROM eth.transaction_cids + WHERE block_number = $1 + ORDER BY index` + var txCIDs []models.TxModel + return txCIDs, tx.Select(&txCIDs, pgStr, blockNumber) +} + // RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txHashes []string) ([]models.ReceiptModel, error) { log.Debugf("retrieving receipt cids for tx hashes %v", txHashes) @@ -670,13 +682,30 @@ func (ecr *CIDRetriever) RetrieveHeaderAndTxCIDsByBlockNumber(blockNumber int64) } var allTxCIDs [][]models.TxModel + txCIDs, err := ecr.RetrieveTxCIDsByBlockNumber(tx, blockNumber) + if err != nil { + log.Error("tx cid retrieval error") + return nil, nil, err + } + + txCIDsByHeaderID := funk.Reduce( + txCIDs, + func(acc map[string][]models.TxModel, txCID models.TxModel) map[string][]models.TxModel { + if _, ok := acc[txCID.HeaderID]; !ok { + acc[txCID.HeaderID] = []models.TxModel{} + } + + txCIDs = append(acc[txCID.HeaderID], txCID) + acc[txCID.HeaderID] = txCIDs + return acc + }, + make(map[string][]models.TxModel), + ) + + txCIDsByHeaderIDMap := txCIDsByHeaderID.(map[string][]models.TxModel) + for _, headerCID := range headerCIDs { - var txCIDs []models.TxModel - txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.BlockHash, blockNumber) - if err != nil { - log.Error("tx cid retrieval error") - return nil, nil, err - } + txCIDs := txCIDsByHeaderIDMap[headerCID.BlockHash] allTxCIDs = append(allTxCIDs, txCIDs) } @@ -712,7 +741,6 @@ func (ecr *CIDRetriever) RetrieveHeaderAndTxCIDsByBlockHash(blockHash common.Has if err != nil { return models.HeaderModel{}, nil, err } - fmt.Println("RetrieveHeaderAndTxCIDsByBlockHash", headerCID.ParentHash, headerCID.Timestamp) var txCIDs []models.TxModel txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.BlockHash, blockNumber) diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go index e8b459d5..45185b17 100644 --- a/pkg/eth/ipld_fetcher.go +++ b/pkg/eth/ipld_fetcher.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" + "github.com/thoas/go-funk" "github.com/vulcanize/ipld-eth-server/v4/pkg/shared" ) @@ -100,7 +101,7 @@ func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) { return iplds, err } -// FetchHeaders fetches headers +// FetchHeader fetches header func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPLDModel, error) { log.Debug("fetching header ipld") blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) @@ -119,6 +120,38 @@ func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPL }, nil } +// FetchHeaders fetches headers +func (f *IPLDFetcher) FetchHeaders(tx *sqlx.Tx, cids []models.HeaderModel) ([]models.IPLDModel, error) { + log.Debug("fetching header iplds") + headerIPLDs := make([]models.IPLDModel, len(cids)) + + blockNumbers := make([]uint64, len(cids)) + mhKeys := make([]string, len(cids)) + for i, c := range cids { + var err error + mhKeys[i] = c.MhKey + blockNumbers[i], err = strconv.ParseUint(c.BlockNumber, 10, 64) + if err != nil { + return nil, err + } + } + + fetchedIPLDs, err := shared.FetchIPLDsByMhKeysAndBlockNumbers(tx, mhKeys, blockNumbers) + if err != nil { + return nil, err + } + + for i, c := range cids { + headerIPLD := funk.Find(fetchedIPLDs, func(ipld models.IPLDModel) bool { + return ipld.Key == c.MhKey + }).(models.IPLDModel) + + headerIPLDs[i] = headerIPLD + } + + return headerIPLDs, nil +} + // FetchUncles fetches uncles func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]models.IPLDModel, error) { log.Debug("fetching uncle iplds") diff --git a/pkg/graphql/graphql.go b/pkg/graphql/graphql.go index 1d1fae8c..6dd6baf8 100644 --- a/pkg/graphql/graphql.go +++ b/pkg/graphql/graphql.go @@ -1161,13 +1161,33 @@ func (transactionCIDResult EthTransactionCidsConnection) Nodes(ctx context.Conte return transactionCIDResult.nodes } +type IPFSBlock struct { + key string + data string +} + +func (b IPFSBlock) Key(ctx context.Context) string { + return b.key +} + +func (b IPFSBlock) Data(ctx context.Context) string { + return b.data +} + type EthHeaderCid struct { cid string blockNumber BigInt blockHash string parentHash string timestamp BigInt + stateRoot string + td BigInt + txRoot string + receiptRoot string + uncleRoot string + bloom string transactions []*EthTransactionCid + ipfsBlock IPFSBlock } func (h EthHeaderCid) Cid(ctx context.Context) string { @@ -1190,10 +1210,38 @@ func (h EthHeaderCid) Timestamp(ctx context.Context) BigInt { return h.timestamp } +func (h EthHeaderCid) StateRoot(ctx context.Context) string { + return h.stateRoot +} + +func (h EthHeaderCid) Td(ctx context.Context) BigInt { + return h.td +} + +func (h EthHeaderCid) TxRoot(ctx context.Context) string { + return h.txRoot +} + +func (h EthHeaderCid) ReceiptRoot(ctx context.Context) string { + return h.receiptRoot +} + +func (h EthHeaderCid) UncleRoot(ctx context.Context) string { + return h.uncleRoot +} + +func (h EthHeaderCid) Bloom(ctx context.Context) string { + return h.bloom +} + func (h EthHeaderCid) EthTransactionCidsByHeaderId(ctx context.Context) EthTransactionCidsConnection { return EthTransactionCidsConnection{nodes: h.transactions} } +func (h EthHeaderCid) BlockByMhKey(ctx context.Context) IPFSBlock { + return h.ipfsBlock +} + type EthHeaderCidsConnection struct { nodes []*EthHeaderCid } @@ -1230,6 +1278,17 @@ func (r *Resolver) AllEthHeaderCids(ctx context.Context, args struct { return nil, fmt.Errorf("provide block number or block hash") } + // Begin tx + tx, err := r.backend.DB.Beginx() + if err != nil { + return nil, err + } + + headerIPLDs, err := r.backend.Fetcher.FetchHeaders(tx, headerCIDs) + if err != nil { + return nil, err + } + var resultNodes []*EthHeaderCid for idx, headerCID := range headerCIDs { var blockNumber BigInt @@ -1238,12 +1297,21 @@ func (r *Resolver) AllEthHeaderCids(ctx context.Context, args struct { var timestamp BigInt timestamp.SetUint64(headerCID.Timestamp) + var td BigInt + td.UnmarshalText([]byte(headerCID.TotalDifficulty)) + ethHeaderCidNode := EthHeaderCid{ cid: headerCID.CID, blockNumber: blockNumber, blockHash: headerCID.BlockHash, parentHash: headerCID.ParentHash, timestamp: timestamp, + stateRoot: headerCID.StateRoot, + td: td, + txRoot: headerCID.TxRoot, + receiptRoot: headerCID.RctRoot, + uncleRoot: headerCID.UncleRoot, + bloom: hexutil.Bytes(headerCID.Bloom).String(), } txCIDs := allTxCIDs[idx] @@ -1257,6 +1325,11 @@ func (r *Resolver) AllEthHeaderCids(ctx context.Context, args struct { }) } + ethHeaderCidNode.ipfsBlock = IPFSBlock{ + key: headerIPLDs[idx].Key, + data: hexutil.Bytes(headerIPLDs[idx].Data).String(), + } + resultNodes = append(resultNodes, ðHeaderCidNode) } diff --git a/pkg/graphql/schema.go b/pkg/graphql/schema.go index b8f5ef6d..2a25043e 100644 --- a/pkg/graphql/schema.go +++ b/pkg/graphql/schema.go @@ -29,6 +29,8 @@ const schema string = ` scalar BigInt # Long is a 64 bit unsigned integer. scalar Long + # BigFloat is a floating point number. + scalar BigFloat schema { query: Query @@ -298,13 +300,28 @@ const schema string = ` nodes: [EthTransactionCid]! } + type IPFSBlock { + key: String! + data: String! + } + type EthHeaderCid { cid: String! blockNumber: BigInt! blockHash: String! parentHash: String! timestamp: BigInt! + stateRoot: String! + + # TODO: Use BigFloat + td: BigInt! + + txRoot: String! + receiptRoot: String! + uncleRoot: String! + bloom: String! ethTransactionCidsByHeaderId: EthTransactionCidsConnection! + blockByMhKey: IPFSBlock! } type EthHeaderCidsConnection { diff --git a/pkg/shared/functions.go b/pkg/shared/functions.go index 8c78d0d4..f4d3f73a 100644 --- a/pkg/shared/functions.go +++ b/pkg/shared/functions.go @@ -18,6 +18,7 @@ package shared import ( "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" @@ -55,6 +56,19 @@ func FetchIPLDByMhKeyAndBlockNumber(tx *sqlx.Tx, mhKey string, blockNumber uint6 return block, tx.Get(&block, pgStr, mhKey, blockNumber) } +// FetchIPLDByMhKeysAndBlockNumbers is used to retrieve iplds from Postgres blockstore with the provided tx, mhkey strings and blockNumbers +func FetchIPLDsByMhKeysAndBlockNumbers(tx *sqlx.Tx, mhKeys []string, blockNumbers []uint64) ([]models.IPLDModel, error) { + var blocks []models.IPLDModel + pgStr := `SELECT key, data, block_number FROM public.blocks WHERE key IN (?) AND block_number IN (?)` + query, args, err := sqlx.In(pgStr, mhKeys, blockNumbers) + if err != nil { + return blocks, err + } + query = tx.Rebind(query) + + return blocks, tx.Select(&blocks, query, args...) +} + // MultihashKeyFromCID converts a cid into a blockstore-prefixed multihash db key string func MultihashKeyFromCID(c cid.Cid) string { dbKey := dshelp.MultihashToDsKey(c.Hash())