From b3e4fbfa3999a536c305a4d20eca0066dd79e679 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Sat, 14 Aug 2021 19:20:22 +0530 Subject: [PATCH] Fix get logs API. --- .../00016_create_eth_log_cids_table.sql | 40 ++-- pkg/eth/api.go | 81 ++++++- pkg/eth/api_test.go | 38 ++-- pkg/eth/backend_utils.go | 89 ++++++-- pkg/eth/cid_retriever.go | 204 ++++++++---------- pkg/eth/filterer.go | 14 +- pkg/eth/ipld_fetcher.go | 27 +++ pkg/eth/test_helpers/test_data.go | 98 +-------- 8 files changed, 318 insertions(+), 273 deletions(-) diff --git a/db/migrations/00016_create_eth_log_cids_table.sql b/db/migrations/00016_create_eth_log_cids_table.sql index fc25a836..776614b9 100644 --- a/db/migrations/00016_create_eth_log_cids_table.sql +++ b/db/migrations/00016_create_eth_log_cids_table.sql @@ -2,31 +2,25 @@ CREATE TABLE eth.log_cids ( id SERIAL PRIMARY KEY, receipt_id INTEGER NOT NULL REFERENCES eth.receipt_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, - address TEXT NOT NULL, + address VARCHAR(66), cid TEXT NOT NULL, + data BYTEA, mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, - block_number BIGINT NOT NULL, - block_hash VARCHAR(66) NOT NULL, - tx_hash VARCHAR(66) NOT NULL, - tx_index INTEGER NOT NULL, index INTEGER NOT NULL, - topic0s VARCHAR(66)[], - topic1s VARCHAR(66)[], - topic2s VARCHAR(66)[], - topic3s VARCHAR(66)[], - UNIQUE (block_hash, tx_hash, index) + topic0 VARCHAR(66), + topic1 VARCHAR(66), + topic2 VARCHAR(66), + topic3 VARCHAR(66), + UNIQUE (receipt_id, index) ); --- TODO: Remove topics from receipts to avoid redundancy. --- ALTER TABLE eth.receipt_cids --- DROP COLUMN topic0s, --- DROP COLUMN topic1s, --- DROP COLUMN topic2s, --- DROP COLUMN topic3s, ALTER TABLE eth.receipt_cids -ADD COLUMN log_root VARCHAR(66); - -CREATE INDEX log_rct_id_index ON eth.log_cids USING btree (receipt_id); +DROP COLUMN topic0s, +DROP COLUMN topic1s, +DROP COLUMN topic2s, +DROP COLUMN topic3s, +DROP COLUMN log_contracts, +ADD COLUMN log_root VARCHAR(66); CREATE INDEX log_mh_index ON eth.log_cids USING btree (mh_key); @@ -36,28 +30,28 @@ CREATE INDEX log_cid_index ON eth.log_cids USING btree (cid); -- Name: log_topic0_index; Type: INDEX; Schema: eth; Owner: - -- -CREATE INDEX log_topic0_index ON eth.log_cids USING gin (topic0s); +CREATE INDEX log_topic0_index ON eth.log_cids USING btree (topic0); -- -- Name: log_topic1_index; Type: INDEX; Schema: eth; Owner: - -- -CREATE INDEX log_topic1_index ON eth.log_cids USING gin (topic1s); +CREATE INDEX log_topic1_index ON eth.log_cids USING btree (topic1); -- -- Name: log_topic2_index; Type: INDEX; Schema: eth; Owner: - -- -CREATE INDEX log_topic2_index ON eth.log_cids USING gin (topic2s); +CREATE INDEX log_topic2_index ON eth.log_cids USING btree (topic2); -- -- Name: log_topic3_index; Type: INDEX; Schema: eth; Owner: - -- -CREATE INDEX log_topic3_index ON eth.log_cids USING gin (topic3s); +CREATE INDEX log_topic3_index ON eth.log_cids USING btree (topic3); -- +goose Down diff --git a/pkg/eth/api.go b/pkg/eth/api.go index 0dfb0f59..73c982fc 100644 --- a/pkg/eth/api.go +++ b/pkg/eth/api.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "math/big" + "strconv" "time" "github.com/ethereum/go-ethereum/common" @@ -592,18 +593,54 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log if err != nil { return nil, err } + rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs) if err != nil { return nil, err } - if err := tx.Commit(); err != nil { - return nil, err + + rcptIDs := make([]int64, len(rctCIDs)) + txnIDs := make([]int64, len(rctCIDs)) + for idx, v := range rctCIDs { + rcptIDs[idx] = v.ID + txnIDs[idx] = v.TxID } - block, err := pea.B.BlockByHash(context.Background(), *crit.BlockHash) + + logCIDs, err := pea.B.Retriever.RetrieveLogCID(tx, rcptIDs) if err != nil { return nil, err } - return extractLogsOfInterest(pea.B.Config.ChainConfig, *crit.BlockHash, block.NumberU64(), block.Transactions(), rctIPLDs, filter) + + logIPLDs, err := pea.B.Fetcher.FetchLogs(tx, logCIDs) + if err != nil { + return nil, err + } + + txnCIDs, err := pea.B.Retriever.RetrieveTxCIDsByReceipt(tx, txnIDs) + if err != nil { + return nil, err + } + + txnIPLDs, err := pea.B.Fetcher.FetchTrxs(tx, txnCIDs) + if err != nil { + return nil, err + } + + header, err := pea.B.Retriever.RetrieveHeaderCIDByHash(tx, *crit.BlockHash) + if err != nil { + return nil, err + } + + if err = tx.Commit(); err != nil { + return nil, err + } + + blockNumber, err := strconv.ParseUint(header.BlockNumber, 10, 64) + if err != nil { + return nil, err + } + + return extractLogsOfInterest(pea.B.Config.ChainConfig, *crit.BlockHash, blockNumber, rctCIDs, txnIPLDs, rctIPLDs, logIPLDs, txnCIDs) } // Otherwise, create block range from criteria @@ -631,22 +668,44 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log return nil, err } - block, err := pea.B.BlockByNumber(context.Background(), rpc.BlockNumber(i)) - if err != nil { - return nil, err - } - rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs) if err != nil { return nil, err } - log, err := extractLogsOfInterest(pea.B.Config.ChainConfig, block.Hash(), uint64(i), block.Transactions(), rctIPLDs, filter) + rcptIDs := make([]int64, len(rctCIDs)) + txnIDs := make([]int64, len(rctCIDs)) + for idx, v := range rctCIDs { + rcptIDs[idx] = v.ID + txnIDs[idx] = v.TxID + } + + logCIDs, err := pea.B.Retriever.RetrieveLogCID(tx, rcptIDs) if err != nil { return nil, err } - logs = append(logs, log...) + logIPLDs, err := pea.B.Fetcher.FetchLogs(tx, logCIDs) + if err != nil { + return nil, err + } + + txnCIDs, err := pea.B.Retriever.RetrieveTxCIDsByReceipt(tx, txnIDs) + if err != nil { + return nil, err + } + + txnIPLDs, err := pea.B.Fetcher.FetchTrxs(tx, txnCIDs) + if err != nil { + return nil, err + } + + header, err := pea.B.Retriever.RetrieveHeaderCIDByNumber(tx, i) + if err != nil { + return nil, err + } + + return extractLogsOfInterest(pea.B.Config.ChainConfig, common.HexToHash(header.BlockHash), uint64(i), rctCIDs, txnIPLDs, rctIPLDs, logIPLDs, txnCIDs) } if err := tx.Commit(); err != nil { diff --git a/pkg/eth/api_test.go b/pkg/eth/api_test.go index 587a0c67..d8056df1 100644 --- a/pkg/eth/api_test.go +++ b/pkg/eth/api_test.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/shared" + sdtypes "github.com/ethereum/go-ethereum/statediff/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/ipld-eth-server/pkg/eth" @@ -41,16 +42,15 @@ import ( ) var ( - randomAddr = common.HexToAddress("0x1C3ab14BBaD3D99F4203bd7a11aCB94882050E6f") - randomHash = crypto.Keccak256Hash(randomAddr.Bytes()) - number = rpc.BlockNumber(test_helpers.BlockNumber.Int64()) - londonBlockNum = rpc.BlockNumber(test_helpers.LondonBlockNum.Int64()) - wrongNumber = rpc.BlockNumber(number + 1) - blockHash = test_helpers.MockBlock.Header().Hash() - londonBlockHash = test_helpers.MockLondonBlock.Header().Hash() - baseFee = test_helpers.MockLondonBlock.BaseFee() - ctx = context.Background() - expectedBlock = map[string]interface{}{ + randomAddr = common.HexToAddress("0x1C3ab14BBaD3D99F4203bd7a11aCB94882050E6f") + randomHash = crypto.Keccak256Hash(randomAddr.Bytes()) + number = rpc.BlockNumber(test_helpers.BlockNumber.Int64()) + londonBlockNum = rpc.BlockNumber(test_helpers.LondonBlockNum.Int64()) + wrongNumber = rpc.BlockNumber(number + 1) + blockHash = test_helpers.MockBlock.Header().Hash() + baseFee = test_helpers.MockLondonBlock.BaseFee() + ctx = context.Background() + expectedBlock = map[string]interface{}{ "number": (*hexutil.Big)(test_helpers.MockBlock.Number()), "hash": test_helpers.MockBlock.Hash(), "parentHash": test_helpers.MockBlock.ParentHash(), @@ -219,15 +219,21 @@ var _ = Describe("API", func() { api = eth.NewPublicEthAPI(backend, nil, false) tx, err = indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) Expect(err).ToNot(HaveOccurred()) + for _, node := range test_helpers.MockStateNodes { err = indexAndPublisher.PushStateNode(tx, node) Expect(err).ToNot(HaveOccurred()) } - err = tx.Close(err) + ccHash := sdtypes.CodeAndCodeHash{ + Hash: test_helpers.ContractCodeHash, + Code: test_helpers.ContractCode, + } + + err = indexAndPublisher.PushCodeAndCodeHash(tx, ccHash) Expect(err).ToNot(HaveOccurred()) - err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode) + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) uncles := test_helpers.MockBlock.Uncles() @@ -236,6 +242,9 @@ var _ = Describe("API", func() { uncleHashes[i] = uncle.Hash() } expectedBlock["uncles"] = uncleHashes + + chainConfig.LondonBlock = big.NewInt(2) + indexAndPublisher = indexer.NewStateDiffIndexer(chainConfig, db) tx, err = indexAndPublisher.PushBlock(test_helpers.MockLondonBlock, test_helpers.MockLondonReceipts, test_helpers.MockLondonBlock.Difficulty()) Expect(err).ToNot(HaveOccurred()) @@ -365,8 +374,7 @@ var _ = Describe("API", func() { Expect(err).ToNot(HaveOccurred()) _, ok := block["baseFee"] Expect(ok).To(Equal(false)) - - block, err = api.GetBlockByHash(ctx, londonBlockHash, false) + block, err = api.GetBlockByHash(ctx, test_helpers.MockLondonBlock.Hash(), false) Expect(err).ToNot(HaveOccurred()) Expect(block["baseFee"].(*big.Int)).To(Equal(baseFee)) }) @@ -516,7 +524,7 @@ var _ = Describe("API", func() { }) It("Retrieves the GasFeeCap and GasTipCap for dynamic transaction from the london block hash", func() { - tx := api.GetTransactionByBlockHashAndIndex(ctx, londonBlockHash, 0) + tx := api.GetTransactionByBlockHashAndIndex(ctx, test_helpers.MockLondonBlock.Hash(), 0) Expect(tx).ToNot(BeNil()) Expect(tx.GasFeeCap).To(Equal((*hexutil.Big)(test_helpers.MockLondonTransactions[0].GasFeeCap()))) Expect(tx.GasTipCap).To(Equal((*hexutil.Big)(test_helpers.MockLondonTransactions[0].GasTipCap()))) diff --git a/pkg/eth/backend_utils.go b/pkg/eth/backend_utils.go index 66e9eac1..26d51e98 100644 --- a/pkg/eth/backend_utils.go +++ b/pkg/eth/backend_utils.go @@ -27,10 +27,12 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" + "github.com/ethereum/go-ethereum/statediff/indexer/models" ) // RPCMarshalHeader converts the given header to the RPC output. @@ -272,42 +274,93 @@ func newRPCTransactionFromBlockIndex(b *types.Block, index uint64) *RPCTransacti // extractLogsOfInterest returns logs from the receipt IPLD func extractLogsOfInterest(config *params.ChainConfig, blockHash common.Hash, blockNumber uint64, - txs types.Transactions, rctIPLDs []ipfs.BlockModel, filter ReceiptFilter) ([]*types.Log, error) { + rctCIDs []models.ReceiptModel, txnIPLDs, rctIPLDs []ipfs.BlockModel, logIPLDs map[int64]map[int64]ipfs.BlockModel, + txnCIDs []models.TxModel) ([]*types.Log, error) { receipts := make(types.Receipts, len(rctIPLDs)) - for i, rctBytes := range rctIPLDs { + for k, rctBytes := range rctIPLDs { rct := new(types.Receipt) if err := rct.UnmarshalBinary(rctBytes.Data); err != nil { return nil, err } - receipts[i] = rct + + if logModels, ok := logIPLDs[rctCIDs[k].ID]; ok { + idx := 0 + for logIdx, v := range logModels { + l := &types.Log{} + err := rlp.DecodeBytes(v.Data, l) + if err != nil { + return nil, err + } + l.Index = uint(logIdx) + rct.Logs[idx] = l + idx++ + } + } + + receipts[k] = rct } - err := receipts.DeriveFields(config, blockHash, blockNumber, txs) + txns := make(types.Transactions, len(txnIPLDs)) + for idx, txnBytes := range txnIPLDs { + txn := new(types.Transaction) + if err := txn.UnmarshalBinary(txnBytes.Data); err != nil { + return nil, err + } + txns[idx] = txn + } + + receipts, err := deriveFields(receipts, config, blockHash, blockNumber, txns, txnCIDs) if err != nil { return nil, err } - var unfilteredLogs []*types.Log - for _, receipt := range receipts { - unfilteredLogs = append(unfilteredLogs, receipt.Logs...) + var logs []*types.Log + for _, r := range receipts { + logs = append(logs, r.Logs...) } - adders := make([]common.Address, len(filter.LogAddresses)) - for i, addr := range filter.LogAddresses { - adders[i] = common.HexToAddress(addr) - } + return logs, nil +} - topics := make([][]common.Hash, len(filter.Topics)) - for i, v := range filter.Topics { - topics[i] = make([]common.Hash, len(v)) - for j, topic := range v { - topics[i][j] = common.HexToHash(topic) +func deriveFields(rs types.Receipts, config *params.ChainConfig, hash common.Hash, number uint64, txs types.Transactions, + txnCIDs []models.TxModel) (types.Receipts, error) { + + signer := types.MakeSigner(config, new(big.Int).SetUint64(number)) + for i := 0; i < len(rs); i++ { + // The transaction type and hash can be retrieved from the transaction itself + rs[i].Type = txs[i].Type() + rs[i].TxHash = txs[i].Hash() + + // block location fields + rs[i].BlockHash = hash + rs[i].BlockNumber = new(big.Int).SetUint64(number) + rs[i].TransactionIndex = uint(txnCIDs[i].Index) + + // The contract address can be derived from the transaction itself + if txs[i].To() == nil { + // Deriving the signer is expensive, only do if it's actually needed + from, _ := types.Sender(signer, txs[i]) + rs[i].ContractAddress = crypto.CreateAddress(from, txs[i].Nonce()) + } + // The used gas can be calculated based on previous r + if i == 0 { + rs[i].GasUsed = rs[i].CumulativeGasUsed + } else { + rs[i].GasUsed = rs[i].CumulativeGasUsed - rs[i-1].CumulativeGasUsed + } + for j := 0; j < len(rs[i].Logs); j++ { + + } + for j := 0; j < len(rs[i].Logs); j++ { + rs[i].Logs[j].BlockNumber = number + rs[i].Logs[j].BlockHash = hash + rs[i].Logs[j].TxHash = rs[i].TxHash + rs[i].Logs[j].TxIndex = uint(txnCIDs[i].Index) } } - logs := filterLogs(unfilteredLogs, nil, nil, adders, topics) - return logs, nil + return rs, nil } func includes(addresses []common.Address, a common.Address) bool { diff --git a/pkg/eth/cid_retriever.go b/pkg/eth/cid_retriever.go index d83ea605..e72d254b 100644 --- a/pkg/eth/cid_retriever.go +++ b/pkg/eth/cid_retriever.go @@ -209,44 +209,40 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID return results, tx.Select(&results, pgStr, args...) } -// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided -// filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]models.ReceiptModel, error) { - log.Debug("retrieving receipt cids for header id ", headerID) - args := make([]interface{}, 0, 4) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, - receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, - receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts - FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id - AND header_cids.id = $1` - id := 2 - args = append(args, headerID) +func topicsQuery(id int, topics [][]string, args []interface{}, pgStr string, first bool) (string, []interface{}, int) { + for i, topicSet := range topics { + if len(topicSet) == 0 { + continue + } + + if !first { + pgStr += " AND" + } else { + first = false + } + pgStr += fmt.Sprintf(` eth.log_cids.topic%d = ANY ($%d)`, i, id) + args = append(args, pq.Array(topicSet)) + id++ + } + return pgStr, args, id +} + +func retrieveFilteredRctQuery(id int, pgStr string, args []interface{}, rctFilter ReceiptFilter, trxIds []int64) (string, []interface{}, int) { + rctCond := " AND (receipt_cids.id = ANY ( " + logQuery := "SELECT receipt_id FROM eth.log_cids WHERE" if len(rctFilter.LogAddresses) > 0 { // Filter on log contract addresses if there are any - pgStr += fmt.Sprintf(` AND ((receipt_cids.log_contracts && $%d::VARCHAR(66)[]`, id) + pgStr += fmt.Sprintf(`%s %s eth.log_cids.address = ANY ($%d)`, rctCond, logQuery, id) args = append(args, pq.Array(rctFilter.LogAddresses)) id++ + // Filter on topics if there are any if hasTopics(rctFilter.Topics) { - pgStr += " AND (" - first := true - for i, topicSet := range rctFilter.Topics { - if i < 4 && len(topicSet) > 0 { - if first { - pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) - first = false - } else { - pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) - } - args = append(args, pq.Array(topicSet)) - id++ - } - } - pgStr += ")" + pgStr, args, id = topicsQuery(id, rctFilter.Topics, args, pgStr, false) } + pgStr += ")" + // Filter on txIDs if there are any and we are matching txs if rctFilter.MatchTxs && len(trxIds) > 0 { pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) @@ -256,20 +252,8 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip } else { // If there are no contract addresses to filter on // Filter on topics if there are any if hasTopics(rctFilter.Topics) { - pgStr += " AND ((" - first := true - for i, topicSet := range rctFilter.Topics { - if i < 4 && len(topicSet) > 0 { - if first { - pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) - first = false - } else { - pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) - } - args = append(args, pq.Array(topicSet)) - id++ - } - } + pgStr += rctCond + logQuery + pgStr, args, id = topicsQuery(id, rctFilter.Topics, args, pgStr, true) pgStr += ")" // Filter on txIDs if there are any and we are matching txs if rctFilter.MatchTxs && len(trxIds) > 0 { @@ -284,6 +268,43 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip args = append(args, pq.Array(trxIds)) } } + + return pgStr, args, id +} + +// RetrieveLogCID retrieves and returns all of the log cids at the provided receipt IDs +func (ecr *CIDRetriever) RetrieveLogCID(tx *sqlx.Tx, rcptIDs []int64) ([]models.LogsModel, error) { + log.Debug("retrieving log cids for receipt ids") + args := make([]interface{}, 0, 4) + pgStr := `SELECT eth.log_cids.id, eth.log_cids.index, eth.log_cids.receipt_id, eth.log_cids.address, eth.log_cids.cid, eth.log_cids.mh_key, + eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3 + FROM eth.log_cids WHERE eth.log_cids.receipt_id = ANY ( $1 ) + ORDER BY log_cids.index` + args = append(args, pq.Array(rcptIDs)) + logCIDs := make([]models.LogsModel, 0) + err := tx.Select(&logCIDs, pgStr, args...) + if err != nil { + return nil, err + } + + return logCIDs, nil +} + +// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided +// filter parameters and correspond to the provided tx ids +func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]models.ReceiptModel, error) { + log.Debug("retrieving receipt cids for header id ", headerID) + args := make([]interface{}, 0, 4) + pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, + receipt_cids.contract, receipt_cids.contract_hash + FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids + WHERE receipt_cids.tx_id = transaction_cids.id + AND transaction_cids.header_id = header_cids.id + AND header_cids.id = $1` + id := 2 + args = append(args, headerID) + pgStr, args, id = retrieveFilteredRctQuery(id, pgStr, args, rctFilter, trxIds) + pgStr += ` ORDER BY transaction_cids.index` receiptCids := make([]models.ReceiptModel, 0) return receiptCids, tx.Select(&receiptCids, pgStr, args...) @@ -295,8 +316,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b log.Debug("retrieving receipt cids for block ", blockNumber) args := make([]interface{}, 0, 5) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, - receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, - receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts + receipt_cids.contract, receipt_cids.contract_hash,receipt_cids.log_root FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids WHERE receipt_cids.tx_id = transaction_cids.id AND transaction_cids.header_id = header_cids.id` @@ -312,74 +332,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b id++ } - // TODO: Add the below filters when we have log index in DB. - if true { - pgStr += ` ORDER BY transaction_cids.index` - receiptCids := make([]models.ReceiptModel, 0) - return receiptCids, tx.Select(&receiptCids, pgStr, args...) - } - - if len(rctFilter.LogAddresses) > 0 { - // Filter on log contract addresses if there are any - pgStr += fmt.Sprintf(` AND ((receipt_cids.log_contracts && $%d::VARCHAR(66)[]`, id) - args = append(args, pq.Array(rctFilter.LogAddresses)) - id++ - // Filter on topics if there are any - if hasTopics(rctFilter.Topics) { - pgStr += " AND (" - first := true - for i, topicSet := range rctFilter.Topics { - if i < 4 && len(topicSet) > 0 { - if first { - pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) - first = false - } else { - pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) - } - args = append(args, pq.Array(topicSet)) - id++ - } - } - pgStr += ")" - } - pgStr += ")" - // Filter on txIDs if there are any and we are matching txs - if rctFilter.MatchTxs && len(trxIds) > 0 { - pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) - args = append(args, pq.Array(trxIds)) - } - pgStr += ")" - } else { // If there are no contract addresses to filter on - // Filter on topics if there are any - if hasTopics(rctFilter.Topics) { - pgStr += " AND ((" - first := true - for i, topicSet := range rctFilter.Topics { - if i < 4 && len(topicSet) > 0 { - if first { - pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) - first = false - } else { - pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) - } - args = append(args, pq.Array(topicSet)) - id++ - } - } - pgStr += ")" - // Filter on txIDs if there are any and we are matching txs - if rctFilter.MatchTxs && len(trxIds) > 0 { - pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) - args = append(args, pq.Array(trxIds)) - } - pgStr += ")" - } else if rctFilter.MatchTxs && len(trxIds) > 0 { - // If there are no contract addresses or topics to filter on, - // Filter on txIDs if there are any and we are matching txs - pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) - args = append(args, pq.Array(trxIds)) - } - } + pgStr, args, id = retrieveFilteredRctQuery(id, pgStr, args, rctFilter, trxIds) pgStr += ` ORDER BY transaction_cids.index` receiptCids := make([]models.ReceiptModel, 0) @@ -565,6 +518,15 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H return headerCID, tx.Get(&headerCID, pgStr, blockHash.String()) } +// RetrieveHeaderCIDByNumber returns the header for the given block number +func (ecr *CIDRetriever) RetrieveHeaderCIDByNumber(tx *sqlx.Tx, blockNumber int64) (models.HeaderModel, error) { + log.Debug("retrieving header cids for block hash ", blockNumber) + pgStr := `SELECT * FROM eth.header_cids + WHERE block_number = $1` + var headerCID models.HeaderModel + return headerCID, tx.Get(&headerCID, pgStr, blockNumber) +} + // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]models.TxModel, error) { log.Debug("retrieving tx cids for block id ", headerID) @@ -579,8 +541,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ( func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]models.ReceiptModel, error) { log.Debugf("retrieving receipt cids for tx ids %v", txIDs) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, - receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, - receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts + receipt_cids.contract, receipt_cids.contract_hash FROM eth.receipt_cids, eth.transaction_cids WHERE tx_id = ANY($1::INTEGER[]) AND receipt_cids.tx_id = transaction_cids.id @@ -588,3 +549,14 @@ func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) var rctCIDs []models.ReceiptModel return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs)) } + +func (ecr *CIDRetriever) RetrieveTxCIDsByReceipt(tx *sqlx.Tx, txIDs []int64) ([]models.TxModel, error) { + log.Debugf("retrieving receipt cids for tx ids %v", txIDs) + pgStr := `SELECT transaction_cids.id,transaction_cids.mh_key,transaction_cids.cid, + transaction_cids.tx_hash,transaction_cids.index,transaction_cids.tx_type + FROM eth.transaction_cids WHERE eth.transaction_cids.id = ANY ( $1 ) + ORDER BY transaction_cids.index` + + var txnCIDs []models.TxModel + return txnCIDs, tx.Select(&txnCIDs, pgStr, pq.Array(txIDs)) +} diff --git a/pkg/eth/filterer.go b/pkg/eth/filterer.go index 6bea92c5..09774c73 100644 --- a/pkg/eth/filterer.go +++ b/pkg/eth/filterer.go @@ -165,10 +165,18 @@ func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst s func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error { if !receiptFilter.Off { response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts)) - for i, receipt := range payload.Receipts { + for _, receipt := range payload.Receipts { // topics is always length 4 - topics := [][]string{payload.ReceiptMetaData[i].Topic0s, payload.ReceiptMetaData[i].Topic1s, payload.ReceiptMetaData[i].Topic2s, payload.ReceiptMetaData[i].Topic3s} - if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, payload.ReceiptMetaData[i].LogContracts, trxHashes) { + topics := make([][]string, 4) + contracts := make([]string, len(receipt.Logs)) + for _, l := range receipt.Logs { + contracts = append(contracts, l.Address.String()) + for idx, t := range l.Topics { + topics[idx] = append(topics[idx], t.String()) + } + } + + if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, contracts, trxHashes) { receiptBuffer := new(bytes.Buffer) if err := receipt.EncodeRLP(receiptBuffer); err != nil { return err diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go index 57e26d6f..bd4c4986 100644 --- a/pkg/eth/ipld_fetcher.go +++ b/pkg/eth/ipld_fetcher.go @@ -166,6 +166,33 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]ipfs return rctIPLDs, nil } +// FetchLogs fetches logs +func (f *IPLDFetcher) FetchLogs(tx *sqlx.Tx, logCIDs []models.LogsModel) (map[int64]map[int64]ipfs.BlockModel, error) { + log.Debug("fetching log iplds") + + // receipt id and log index as key to store log IPLD at log index inside receipt. + logIPLDs := make(map[int64]map[int64]ipfs.BlockModel, len(logCIDs)) + for _, l := range logCIDs { + logBytes, err := shared.FetchIPLDByMhKey(tx, l.MhKey) + if err != nil { + return nil, err + } + + if v, ok := logIPLDs[l.ReceiptID]; ok { + v[l.Index] = ipfs.BlockModel{ + Data: logBytes, + CID: l.CID, + } + continue + } + + logIPLDs[l.ReceiptID] = map[int64]ipfs.BlockModel{ + l.Index: {Data: logBytes, CID: l.CID}, + } + } + return logIPLDs, nil +} + // FetchState fetches state nodes func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]StateNode, error) { log.Debug("fetching state iplds") diff --git a/pkg/eth/test_helpers/test_data.go b/pkg/eth/test_helpers/test_data.go index 167cc204..53cf89a5 100644 --- a/pkg/eth/test_helpers/test_data.go +++ b/pkg/eth/test_helpers/test_data.go @@ -208,80 +208,42 @@ var ( } MockRctMeta = []models.ReceiptModel{ { - CID: "", - MhKey: "", - Topic0s: []string{ - mockTopic11.String(), - }, - Topic1s: []string{ - mockTopic12.String(), - }, + CID: "", + MhKey: "", Contract: "", ContractHash: "", - LogContracts: []string{ - Address.String(), - }, }, { - CID: "", - MhKey: "", - Topic0s: []string{ - mockTopic21.String(), - }, - Topic1s: []string{ - mockTopic22.String(), - }, + CID: "", + MhKey: "", Contract: "", ContractHash: "", - LogContracts: []string{ - AnotherAddress.String(), - }, }, { CID: "", MhKey: "", Contract: ContractAddress.String(), ContractHash: ContractHash, - LogContracts: []string{}, }, } MockRctMetaPostPublish = []models.ReceiptModel{ { - CID: Rct1CID.String(), - MhKey: Rct1MhKey, - Topic0s: []string{ - mockTopic11.String(), - }, - Topic1s: []string{ - mockTopic12.String(), - }, + CID: Rct1CID.String(), + MhKey: Rct1MhKey, Contract: "", ContractHash: "", - LogContracts: []string{ - Address.String(), - }, }, { - CID: Rct2CID.String(), - MhKey: Rct2MhKey, - Topic0s: []string{ - mockTopic21.String(), - }, - Topic1s: []string{ - mockTopic22.String(), - }, + CID: Rct2CID.String(), + MhKey: Rct2MhKey, Contract: "", ContractHash: "", - LogContracts: []string{ - AnotherAddress.String(), - }, }, { CID: Rct3CID.String(), MhKey: Rct3MhKey, Contract: ContractAddress.String(), ContractHash: ContractHash, - LogContracts: []string{}, }, } @@ -524,46 +486,8 @@ var ( BaseFee: big.NewInt(params.InitialBaseFee), } - MockLondonTransactions, MockLondonReceipts, SenderAdd = createDynamicTransactionsAndReceipts(LondonBlockNum) - MockLondonBlock = createNewBlock(&MockLondonHeader, MockLondonTransactions, nil, MockLondonReceipts, new(trie.Trie)) - MockLondonTrxMeta = []models.TxModel{ - { - CID: "", // This is empty until we go to publish to ipfs - MhKey: "", - Src: SenderAdd.Hex(), - Dst: Address.String(), - Index: 0, - TxHash: MockLondonTransactions[0].Hash().String(), - Data: []byte{}, - }, - } - MockLondonRctMeta = []models.ReceiptModel{ - { - CID: "", - MhKey: "", - Topic0s: []string{ - mockTopic11.String(), - }, - Topic1s: []string{ - mockTopic12.String(), - }, - Contract: "", - ContractHash: "", - LogContracts: []string{ - Address.String(), - }, - }, - } - - MockConvertedLondonPayload = eth.ConvertedPayload{ - TotalDifficulty: MockLondonBlock.Difficulty(), - Block: MockLondonBlock, - Receipts: MockLondonReceipts, - TxMetaData: MockLondonTrxMeta, - ReceiptMetaData: MockLondonRctMeta, - StorageNodes: MockStorageNodes, - StateNodes: MockStateNodes, - } + MockLondonTransactions, MockLondonReceipts, _ = createDynamicTransactionsAndReceipts(LondonBlockNum) + MockLondonBlock = createNewBlock(&MockLondonHeader, MockLondonTransactions, nil, MockLondonReceipts, new(trie.Trie)) ) func createNewBlock(header *types.Header, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt, hasher types.TrieHasher) *types.Block { @@ -614,7 +538,7 @@ func createDynamicTransactionsAndReceipts(blockNumber *big.Int) (types.Transacti // https://github.com/ethereum/go-ethereum/pull/22806 mockReceipt1 := &types.Receipt{ Type: types.DynamicFeeTxType, - PostState: common.HexToHash("0x1").Bytes(), + PostState: common.HexToHash("0x0").Bytes(), Status: types.ReceiptStatusSuccessful, CumulativeGasUsed: 50, Logs: []*types.Log{},