Fix get logs API.

This commit is contained in:
Arijit Das 2021-08-14 19:20:22 +05:30
parent d8a5358a70
commit b3e4fbfa39
8 changed files with 318 additions and 273 deletions

View File

@ -2,31 +2,25 @@
CREATE TABLE eth.log_cids ( CREATE TABLE eth.log_cids (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
receipt_id INTEGER NOT NULL REFERENCES eth.receipt_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, receipt_id INTEGER NOT NULL REFERENCES eth.receipt_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
address TEXT NOT NULL, address VARCHAR(66),
cid TEXT NOT NULL, cid TEXT NOT NULL,
data BYTEA,
mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
block_number BIGINT NOT NULL,
block_hash VARCHAR(66) NOT NULL,
tx_hash VARCHAR(66) NOT NULL,
tx_index INTEGER NOT NULL,
index INTEGER NOT NULL, index INTEGER NOT NULL,
topic0s VARCHAR(66)[], topic0 VARCHAR(66),
topic1s VARCHAR(66)[], topic1 VARCHAR(66),
topic2s VARCHAR(66)[], topic2 VARCHAR(66),
topic3s VARCHAR(66)[], topic3 VARCHAR(66),
UNIQUE (block_hash, tx_hash, index) UNIQUE (receipt_id, index)
); );
-- TODO: Remove topics from receipts to avoid redundancy.
-- ALTER TABLE eth.receipt_cids
-- DROP COLUMN topic0s,
-- DROP COLUMN topic1s,
-- DROP COLUMN topic2s,
-- DROP COLUMN topic3s,
ALTER TABLE eth.receipt_cids ALTER TABLE eth.receipt_cids
ADD COLUMN log_root VARCHAR(66); DROP COLUMN topic0s,
DROP COLUMN topic1s,
CREATE INDEX log_rct_id_index ON eth.log_cids USING btree (receipt_id); DROP COLUMN topic2s,
DROP COLUMN topic3s,
DROP COLUMN log_contracts,
ADD COLUMN log_root VARCHAR(66);
CREATE INDEX log_mh_index ON eth.log_cids USING btree (mh_key); CREATE INDEX log_mh_index ON eth.log_cids USING btree (mh_key);
@ -36,28 +30,28 @@ CREATE INDEX log_cid_index ON eth.log_cids USING btree (cid);
-- Name: log_topic0_index; Type: INDEX; Schema: eth; Owner: - -- Name: log_topic0_index; Type: INDEX; Schema: eth; Owner: -
-- --
CREATE INDEX log_topic0_index ON eth.log_cids USING gin (topic0s); CREATE INDEX log_topic0_index ON eth.log_cids USING btree (topic0);
-- --
-- Name: log_topic1_index; Type: INDEX; Schema: eth; Owner: - -- Name: log_topic1_index; Type: INDEX; Schema: eth; Owner: -
-- --
CREATE INDEX log_topic1_index ON eth.log_cids USING gin (topic1s); CREATE INDEX log_topic1_index ON eth.log_cids USING btree (topic1);
-- --
-- Name: log_topic2_index; Type: INDEX; Schema: eth; Owner: - -- Name: log_topic2_index; Type: INDEX; Schema: eth; Owner: -
-- --
CREATE INDEX log_topic2_index ON eth.log_cids USING gin (topic2s); CREATE INDEX log_topic2_index ON eth.log_cids USING btree (topic2);
-- --
-- Name: log_topic3_index; Type: INDEX; Schema: eth; Owner: - -- Name: log_topic3_index; Type: INDEX; Schema: eth; Owner: -
-- --
CREATE INDEX log_topic3_index ON eth.log_cids USING gin (topic3s); CREATE INDEX log_topic3_index ON eth.log_cids USING btree (topic3);
-- +goose Down -- +goose Down

View File

@ -24,6 +24,7 @@ import (
"fmt" "fmt"
"io" "io"
"math/big" "math/big"
"strconv"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -592,18 +593,54 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
if err != nil { if err != nil {
return nil, err return nil, err
} }
rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs) rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := tx.Commit(); err != nil {
return nil, err rcptIDs := make([]int64, len(rctCIDs))
txnIDs := make([]int64, len(rctCIDs))
for idx, v := range rctCIDs {
rcptIDs[idx] = v.ID
txnIDs[idx] = v.TxID
} }
block, err := pea.B.BlockByHash(context.Background(), *crit.BlockHash)
logCIDs, err := pea.B.Retriever.RetrieveLogCID(tx, rcptIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return extractLogsOfInterest(pea.B.Config.ChainConfig, *crit.BlockHash, block.NumberU64(), block.Transactions(), rctIPLDs, filter)
logIPLDs, err := pea.B.Fetcher.FetchLogs(tx, logCIDs)
if err != nil {
return nil, err
}
txnCIDs, err := pea.B.Retriever.RetrieveTxCIDsByReceipt(tx, txnIDs)
if err != nil {
return nil, err
}
txnIPLDs, err := pea.B.Fetcher.FetchTrxs(tx, txnCIDs)
if err != nil {
return nil, err
}
header, err := pea.B.Retriever.RetrieveHeaderCIDByHash(tx, *crit.BlockHash)
if err != nil {
return nil, err
}
if err = tx.Commit(); err != nil {
return nil, err
}
blockNumber, err := strconv.ParseUint(header.BlockNumber, 10, 64)
if err != nil {
return nil, err
}
return extractLogsOfInterest(pea.B.Config.ChainConfig, *crit.BlockHash, blockNumber, rctCIDs, txnIPLDs, rctIPLDs, logIPLDs, txnCIDs)
} }
// Otherwise, create block range from criteria // Otherwise, create block range from criteria
@ -631,22 +668,44 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
return nil, err return nil, err
} }
block, err := pea.B.BlockByNumber(context.Background(), rpc.BlockNumber(i))
if err != nil {
return nil, err
}
rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs) rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
log, err := extractLogsOfInterest(pea.B.Config.ChainConfig, block.Hash(), uint64(i), block.Transactions(), rctIPLDs, filter) rcptIDs := make([]int64, len(rctCIDs))
txnIDs := make([]int64, len(rctCIDs))
for idx, v := range rctCIDs {
rcptIDs[idx] = v.ID
txnIDs[idx] = v.TxID
}
logCIDs, err := pea.B.Retriever.RetrieveLogCID(tx, rcptIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
logs = append(logs, log...) logIPLDs, err := pea.B.Fetcher.FetchLogs(tx, logCIDs)
if err != nil {
return nil, err
}
txnCIDs, err := pea.B.Retriever.RetrieveTxCIDsByReceipt(tx, txnIDs)
if err != nil {
return nil, err
}
txnIPLDs, err := pea.B.Fetcher.FetchTrxs(tx, txnCIDs)
if err != nil {
return nil, err
}
header, err := pea.B.Retriever.RetrieveHeaderCIDByNumber(tx, i)
if err != nil {
return nil, err
}
return extractLogsOfInterest(pea.B.Config.ChainConfig, common.HexToHash(header.BlockHash), uint64(i), rctCIDs, txnIPLDs, rctIPLDs, logIPLDs, txnCIDs)
} }
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
@ -41,16 +42,15 @@ import (
) )
var ( var (
randomAddr = common.HexToAddress("0x1C3ab14BBaD3D99F4203bd7a11aCB94882050E6f") randomAddr = common.HexToAddress("0x1C3ab14BBaD3D99F4203bd7a11aCB94882050E6f")
randomHash = crypto.Keccak256Hash(randomAddr.Bytes()) randomHash = crypto.Keccak256Hash(randomAddr.Bytes())
number = rpc.BlockNumber(test_helpers.BlockNumber.Int64()) number = rpc.BlockNumber(test_helpers.BlockNumber.Int64())
londonBlockNum = rpc.BlockNumber(test_helpers.LondonBlockNum.Int64()) londonBlockNum = rpc.BlockNumber(test_helpers.LondonBlockNum.Int64())
wrongNumber = rpc.BlockNumber(number + 1) wrongNumber = rpc.BlockNumber(number + 1)
blockHash = test_helpers.MockBlock.Header().Hash() blockHash = test_helpers.MockBlock.Header().Hash()
londonBlockHash = test_helpers.MockLondonBlock.Header().Hash() baseFee = test_helpers.MockLondonBlock.BaseFee()
baseFee = test_helpers.MockLondonBlock.BaseFee() ctx = context.Background()
ctx = context.Background() expectedBlock = map[string]interface{}{
expectedBlock = map[string]interface{}{
"number": (*hexutil.Big)(test_helpers.MockBlock.Number()), "number": (*hexutil.Big)(test_helpers.MockBlock.Number()),
"hash": test_helpers.MockBlock.Hash(), "hash": test_helpers.MockBlock.Hash(),
"parentHash": test_helpers.MockBlock.ParentHash(), "parentHash": test_helpers.MockBlock.ParentHash(),
@ -219,15 +219,21 @@ var _ = Describe("API", func() {
api = eth.NewPublicEthAPI(backend, nil, false) api = eth.NewPublicEthAPI(backend, nil, false)
tx, err = indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) tx, err = indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
for _, node := range test_helpers.MockStateNodes { for _, node := range test_helpers.MockStateNodes {
err = indexAndPublisher.PushStateNode(tx, node) err = indexAndPublisher.PushStateNode(tx, node)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
} }
err = tx.Close(err) ccHash := sdtypes.CodeAndCodeHash{
Hash: test_helpers.ContractCodeHash,
Code: test_helpers.ContractCode,
}
err = indexAndPublisher.PushCodeAndCodeHash(tx, ccHash)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode) err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
uncles := test_helpers.MockBlock.Uncles() uncles := test_helpers.MockBlock.Uncles()
@ -236,6 +242,9 @@ var _ = Describe("API", func() {
uncleHashes[i] = uncle.Hash() uncleHashes[i] = uncle.Hash()
} }
expectedBlock["uncles"] = uncleHashes expectedBlock["uncles"] = uncleHashes
chainConfig.LondonBlock = big.NewInt(2)
indexAndPublisher = indexer.NewStateDiffIndexer(chainConfig, db)
tx, err = indexAndPublisher.PushBlock(test_helpers.MockLondonBlock, test_helpers.MockLondonReceipts, test_helpers.MockLondonBlock.Difficulty()) tx, err = indexAndPublisher.PushBlock(test_helpers.MockLondonBlock, test_helpers.MockLondonReceipts, test_helpers.MockLondonBlock.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -365,8 +374,7 @@ var _ = Describe("API", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
_, ok := block["baseFee"] _, ok := block["baseFee"]
Expect(ok).To(Equal(false)) Expect(ok).To(Equal(false))
block, err = api.GetBlockByHash(ctx, test_helpers.MockLondonBlock.Hash(), false)
block, err = api.GetBlockByHash(ctx, londonBlockHash, false)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(block["baseFee"].(*big.Int)).To(Equal(baseFee)) Expect(block["baseFee"].(*big.Int)).To(Equal(baseFee))
}) })
@ -516,7 +524,7 @@ var _ = Describe("API", func() {
}) })
It("Retrieves the GasFeeCap and GasTipCap for dynamic transaction from the london block hash", func() { It("Retrieves the GasFeeCap and GasTipCap for dynamic transaction from the london block hash", func() {
tx := api.GetTransactionByBlockHashAndIndex(ctx, londonBlockHash, 0) tx := api.GetTransactionByBlockHashAndIndex(ctx, test_helpers.MockLondonBlock.Hash(), 0)
Expect(tx).ToNot(BeNil()) Expect(tx).ToNot(BeNil())
Expect(tx.GasFeeCap).To(Equal((*hexutil.Big)(test_helpers.MockLondonTransactions[0].GasFeeCap()))) Expect(tx.GasFeeCap).To(Equal((*hexutil.Big)(test_helpers.MockLondonTransactions[0].GasFeeCap())))
Expect(tx.GasTipCap).To(Equal((*hexutil.Big)(test_helpers.MockLondonTransactions[0].GasTipCap()))) Expect(tx.GasTipCap).To(Equal((*hexutil.Big)(test_helpers.MockLondonTransactions[0].GasTipCap())))

View File

@ -27,10 +27,12 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
) )
// RPCMarshalHeader converts the given header to the RPC output. // RPCMarshalHeader converts the given header to the RPC output.
@ -272,42 +274,93 @@ func newRPCTransactionFromBlockIndex(b *types.Block, index uint64) *RPCTransacti
// extractLogsOfInterest returns logs from the receipt IPLD // extractLogsOfInterest returns logs from the receipt IPLD
func extractLogsOfInterest(config *params.ChainConfig, blockHash common.Hash, blockNumber uint64, func extractLogsOfInterest(config *params.ChainConfig, blockHash common.Hash, blockNumber uint64,
txs types.Transactions, rctIPLDs []ipfs.BlockModel, filter ReceiptFilter) ([]*types.Log, error) { rctCIDs []models.ReceiptModel, txnIPLDs, rctIPLDs []ipfs.BlockModel, logIPLDs map[int64]map[int64]ipfs.BlockModel,
txnCIDs []models.TxModel) ([]*types.Log, error) {
receipts := make(types.Receipts, len(rctIPLDs)) receipts := make(types.Receipts, len(rctIPLDs))
for i, rctBytes := range rctIPLDs { for k, rctBytes := range rctIPLDs {
rct := new(types.Receipt) rct := new(types.Receipt)
if err := rct.UnmarshalBinary(rctBytes.Data); err != nil { if err := rct.UnmarshalBinary(rctBytes.Data); err != nil {
return nil, err return nil, err
} }
receipts[i] = rct
if logModels, ok := logIPLDs[rctCIDs[k].ID]; ok {
idx := 0
for logIdx, v := range logModels {
l := &types.Log{}
err := rlp.DecodeBytes(v.Data, l)
if err != nil {
return nil, err
}
l.Index = uint(logIdx)
rct.Logs[idx] = l
idx++
}
}
receipts[k] = rct
} }
err := receipts.DeriveFields(config, blockHash, blockNumber, txs) txns := make(types.Transactions, len(txnIPLDs))
for idx, txnBytes := range txnIPLDs {
txn := new(types.Transaction)
if err := txn.UnmarshalBinary(txnBytes.Data); err != nil {
return nil, err
}
txns[idx] = txn
}
receipts, err := deriveFields(receipts, config, blockHash, blockNumber, txns, txnCIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var unfilteredLogs []*types.Log var logs []*types.Log
for _, receipt := range receipts { for _, r := range receipts {
unfilteredLogs = append(unfilteredLogs, receipt.Logs...) logs = append(logs, r.Logs...)
} }
adders := make([]common.Address, len(filter.LogAddresses)) return logs, nil
for i, addr := range filter.LogAddresses { }
adders[i] = common.HexToAddress(addr)
}
topics := make([][]common.Hash, len(filter.Topics)) func deriveFields(rs types.Receipts, config *params.ChainConfig, hash common.Hash, number uint64, txs types.Transactions,
for i, v := range filter.Topics { txnCIDs []models.TxModel) (types.Receipts, error) {
topics[i] = make([]common.Hash, len(v))
for j, topic := range v { signer := types.MakeSigner(config, new(big.Int).SetUint64(number))
topics[i][j] = common.HexToHash(topic) for i := 0; i < len(rs); i++ {
// The transaction type and hash can be retrieved from the transaction itself
rs[i].Type = txs[i].Type()
rs[i].TxHash = txs[i].Hash()
// block location fields
rs[i].BlockHash = hash
rs[i].BlockNumber = new(big.Int).SetUint64(number)
rs[i].TransactionIndex = uint(txnCIDs[i].Index)
// The contract address can be derived from the transaction itself
if txs[i].To() == nil {
// Deriving the signer is expensive, only do if it's actually needed
from, _ := types.Sender(signer, txs[i])
rs[i].ContractAddress = crypto.CreateAddress(from, txs[i].Nonce())
}
// The used gas can be calculated based on previous r
if i == 0 {
rs[i].GasUsed = rs[i].CumulativeGasUsed
} else {
rs[i].GasUsed = rs[i].CumulativeGasUsed - rs[i-1].CumulativeGasUsed
}
for j := 0; j < len(rs[i].Logs); j++ {
}
for j := 0; j < len(rs[i].Logs); j++ {
rs[i].Logs[j].BlockNumber = number
rs[i].Logs[j].BlockHash = hash
rs[i].Logs[j].TxHash = rs[i].TxHash
rs[i].Logs[j].TxIndex = uint(txnCIDs[i].Index)
} }
} }
logs := filterLogs(unfilteredLogs, nil, nil, adders, topics) return rs, nil
return logs, nil
} }
func includes(addresses []common.Address, a common.Address) bool { func includes(addresses []common.Address, a common.Address) bool {

View File

@ -209,44 +209,40 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
return results, tx.Select(&results, pgStr, args...) return results, tx.Select(&results, pgStr, args...)
} }
// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided func topicsQuery(id int, topics [][]string, args []interface{}, pgStr string, first bool) (string, []interface{}, int) {
// filter parameters and correspond to the provided tx ids for i, topicSet := range topics {
func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]models.ReceiptModel, error) { if len(topicSet) == 0 {
log.Debug("retrieving receipt cids for header id ", headerID) continue
args := make([]interface{}, 0, 4) }
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, if !first {
receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts pgStr += " AND"
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids } else {
WHERE receipt_cids.tx_id = transaction_cids.id first = false
AND transaction_cids.header_id = header_cids.id }
AND header_cids.id = $1` pgStr += fmt.Sprintf(` eth.log_cids.topic%d = ANY ($%d)`, i, id)
id := 2 args = append(args, pq.Array(topicSet))
args = append(args, headerID) id++
}
return pgStr, args, id
}
func retrieveFilteredRctQuery(id int, pgStr string, args []interface{}, rctFilter ReceiptFilter, trxIds []int64) (string, []interface{}, int) {
rctCond := " AND (receipt_cids.id = ANY ( "
logQuery := "SELECT receipt_id FROM eth.log_cids WHERE"
if len(rctFilter.LogAddresses) > 0 { if len(rctFilter.LogAddresses) > 0 {
// Filter on log contract addresses if there are any // Filter on log contract addresses if there are any
pgStr += fmt.Sprintf(` AND ((receipt_cids.log_contracts && $%d::VARCHAR(66)[]`, id) pgStr += fmt.Sprintf(`%s %s eth.log_cids.address = ANY ($%d)`, rctCond, logQuery, id)
args = append(args, pq.Array(rctFilter.LogAddresses)) args = append(args, pq.Array(rctFilter.LogAddresses))
id++ id++
// Filter on topics if there are any // Filter on topics if there are any
if hasTopics(rctFilter.Topics) { if hasTopics(rctFilter.Topics) {
pgStr += " AND (" pgStr, args, id = topicsQuery(id, rctFilter.Topics, args, pgStr, false)
first := true
for i, topicSet := range rctFilter.Topics {
if i < 4 && len(topicSet) > 0 {
if first {
pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
first = false
} else {
pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
}
args = append(args, pq.Array(topicSet))
id++
}
}
pgStr += ")"
} }
pgStr += ")" pgStr += ")"
// Filter on txIDs if there are any and we are matching txs // Filter on txIDs if there are any and we are matching txs
if rctFilter.MatchTxs && len(trxIds) > 0 { if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
@ -256,20 +252,8 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip
} else { // If there are no contract addresses to filter on } else { // If there are no contract addresses to filter on
// Filter on topics if there are any // Filter on topics if there are any
if hasTopics(rctFilter.Topics) { if hasTopics(rctFilter.Topics) {
pgStr += " AND ((" pgStr += rctCond + logQuery
first := true pgStr, args, id = topicsQuery(id, rctFilter.Topics, args, pgStr, true)
for i, topicSet := range rctFilter.Topics {
if i < 4 && len(topicSet) > 0 {
if first {
pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
first = false
} else {
pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
}
args = append(args, pq.Array(topicSet))
id++
}
}
pgStr += ")" pgStr += ")"
// Filter on txIDs if there are any and we are matching txs // Filter on txIDs if there are any and we are matching txs
if rctFilter.MatchTxs && len(trxIds) > 0 { if rctFilter.MatchTxs && len(trxIds) > 0 {
@ -284,6 +268,43 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip
args = append(args, pq.Array(trxIds)) args = append(args, pq.Array(trxIds))
} }
} }
return pgStr, args, id
}
// RetrieveLogCID retrieves and returns all of the log cids at the provided receipt IDs
func (ecr *CIDRetriever) RetrieveLogCID(tx *sqlx.Tx, rcptIDs []int64) ([]models.LogsModel, error) {
log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4)
pgStr := `SELECT eth.log_cids.id, eth.log_cids.index, eth.log_cids.receipt_id, eth.log_cids.address, eth.log_cids.cid, eth.log_cids.mh_key,
eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3
FROM eth.log_cids WHERE eth.log_cids.receipt_id = ANY ( $1 )
ORDER BY log_cids.index`
args = append(args, pq.Array(rcptIDs))
logCIDs := make([]models.LogsModel, 0)
err := tx.Select(&logCIDs, pgStr, args...)
if err != nil {
return nil, err
}
return logCIDs, nil
}
// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided
// filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]models.ReceiptModel, error) {
log.Debug("retrieving receipt cids for header id ", headerID)
args := make([]interface{}, 0, 4)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
AND header_cids.id = $1`
id := 2
args = append(args, headerID)
pgStr, args, id = retrieveFilteredRctQuery(id, pgStr, args, rctFilter, trxIds)
pgStr += ` ORDER BY transaction_cids.index` pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]models.ReceiptModel, 0) receiptCids := make([]models.ReceiptModel, 0)
return receiptCids, tx.Select(&receiptCids, pgStr, args...) return receiptCids, tx.Select(&receiptCids, pgStr, args...)
@ -295,8 +316,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b
log.Debug("retrieving receipt cids for block ", blockNumber) log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 5) args := make([]interface{}, 0, 5)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, receipt_cids.contract, receipt_cids.contract_hash,receipt_cids.log_root
receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id` AND transaction_cids.header_id = header_cids.id`
@ -312,74 +332,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b
id++ id++
} }
// TODO: Add the below filters when we have log index in DB. pgStr, args, id = retrieveFilteredRctQuery(id, pgStr, args, rctFilter, trxIds)
if true {
pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]models.ReceiptModel, 0)
return receiptCids, tx.Select(&receiptCids, pgStr, args...)
}
if len(rctFilter.LogAddresses) > 0 {
// Filter on log contract addresses if there are any
pgStr += fmt.Sprintf(` AND ((receipt_cids.log_contracts && $%d::VARCHAR(66)[]`, id)
args = append(args, pq.Array(rctFilter.LogAddresses))
id++
// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr += " AND ("
first := true
for i, topicSet := range rctFilter.Topics {
if i < 4 && len(topicSet) > 0 {
if first {
pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
first = false
} else {
pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
}
args = append(args, pq.Array(topicSet))
id++
}
}
pgStr += ")"
}
pgStr += ")"
// Filter on txIDs if there are any and we are matching txs
if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
args = append(args, pq.Array(trxIds))
}
pgStr += ")"
} else { // If there are no contract addresses to filter on
// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr += " AND (("
first := true
for i, topicSet := range rctFilter.Topics {
if i < 4 && len(topicSet) > 0 {
if first {
pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
first = false
} else {
pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
}
args = append(args, pq.Array(topicSet))
id++
}
}
pgStr += ")"
// Filter on txIDs if there are any and we are matching txs
if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
args = append(args, pq.Array(trxIds))
}
pgStr += ")"
} else if rctFilter.MatchTxs && len(trxIds) > 0 {
// If there are no contract addresses or topics to filter on,
// Filter on txIDs if there are any and we are matching txs
pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
args = append(args, pq.Array(trxIds))
}
}
pgStr += ` ORDER BY transaction_cids.index` pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]models.ReceiptModel, 0) receiptCids := make([]models.ReceiptModel, 0)
@ -565,6 +518,15 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H
return headerCID, tx.Get(&headerCID, pgStr, blockHash.String()) return headerCID, tx.Get(&headerCID, pgStr, blockHash.String())
} }
// RetrieveHeaderCIDByNumber returns the header for the given block number
func (ecr *CIDRetriever) RetrieveHeaderCIDByNumber(tx *sqlx.Tx, blockNumber int64) (models.HeaderModel, error) {
log.Debug("retrieving header cids for block hash ", blockNumber)
pgStr := `SELECT * FROM eth.header_cids
WHERE block_number = $1`
var headerCID models.HeaderModel
return headerCID, tx.Get(&headerCID, pgStr, blockNumber)
}
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]models.TxModel, error) { func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]models.TxModel, error) {
log.Debug("retrieving tx cids for block id ", headerID) log.Debug("retrieving tx cids for block id ", headerID)
@ -579,8 +541,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) (
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]models.ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]models.ReceiptModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txIDs) log.Debugf("retrieving receipt cids for tx ids %v", txIDs)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, receipt_cids.contract, receipt_cids.contract_hash
receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts
FROM eth.receipt_cids, eth.transaction_cids FROM eth.receipt_cids, eth.transaction_cids
WHERE tx_id = ANY($1::INTEGER[]) WHERE tx_id = ANY($1::INTEGER[])
AND receipt_cids.tx_id = transaction_cids.id AND receipt_cids.tx_id = transaction_cids.id
@ -588,3 +549,14 @@ func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64)
var rctCIDs []models.ReceiptModel var rctCIDs []models.ReceiptModel
return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs)) return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs))
} }
func (ecr *CIDRetriever) RetrieveTxCIDsByReceipt(tx *sqlx.Tx, txIDs []int64) ([]models.TxModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txIDs)
pgStr := `SELECT transaction_cids.id,transaction_cids.mh_key,transaction_cids.cid,
transaction_cids.tx_hash,transaction_cids.index,transaction_cids.tx_type
FROM eth.transaction_cids WHERE eth.transaction_cids.id = ANY ( $1 )
ORDER BY transaction_cids.index`
var txnCIDs []models.TxModel
return txnCIDs, tx.Select(&txnCIDs, pgStr, pq.Array(txIDs))
}

View File

@ -165,10 +165,18 @@ func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst s
func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error { func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error {
if !receiptFilter.Off { if !receiptFilter.Off {
response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts)) response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts))
for i, receipt := range payload.Receipts { for _, receipt := range payload.Receipts {
// topics is always length 4 // topics is always length 4
topics := [][]string{payload.ReceiptMetaData[i].Topic0s, payload.ReceiptMetaData[i].Topic1s, payload.ReceiptMetaData[i].Topic2s, payload.ReceiptMetaData[i].Topic3s} topics := make([][]string, 4)
if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, payload.ReceiptMetaData[i].LogContracts, trxHashes) { contracts := make([]string, len(receipt.Logs))
for _, l := range receipt.Logs {
contracts = append(contracts, l.Address.String())
for idx, t := range l.Topics {
topics[idx] = append(topics[idx], t.String())
}
}
if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, contracts, trxHashes) {
receiptBuffer := new(bytes.Buffer) receiptBuffer := new(bytes.Buffer)
if err := receipt.EncodeRLP(receiptBuffer); err != nil { if err := receipt.EncodeRLP(receiptBuffer); err != nil {
return err return err

View File

@ -166,6 +166,33 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]ipfs
return rctIPLDs, nil return rctIPLDs, nil
} }
// FetchLogs fetches logs
func (f *IPLDFetcher) FetchLogs(tx *sqlx.Tx, logCIDs []models.LogsModel) (map[int64]map[int64]ipfs.BlockModel, error) {
log.Debug("fetching log iplds")
// receipt id and log index as key to store log IPLD at log index inside receipt.
logIPLDs := make(map[int64]map[int64]ipfs.BlockModel, len(logCIDs))
for _, l := range logCIDs {
logBytes, err := shared.FetchIPLDByMhKey(tx, l.MhKey)
if err != nil {
return nil, err
}
if v, ok := logIPLDs[l.ReceiptID]; ok {
v[l.Index] = ipfs.BlockModel{
Data: logBytes,
CID: l.CID,
}
continue
}
logIPLDs[l.ReceiptID] = map[int64]ipfs.BlockModel{
l.Index: {Data: logBytes, CID: l.CID},
}
}
return logIPLDs, nil
}
// FetchState fetches state nodes // FetchState fetches state nodes
func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]StateNode, error) { func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]StateNode, error) {
log.Debug("fetching state iplds") log.Debug("fetching state iplds")

View File

@ -208,80 +208,42 @@ var (
} }
MockRctMeta = []models.ReceiptModel{ MockRctMeta = []models.ReceiptModel{
{ {
CID: "", CID: "",
MhKey: "", MhKey: "",
Topic0s: []string{
mockTopic11.String(),
},
Topic1s: []string{
mockTopic12.String(),
},
Contract: "", Contract: "",
ContractHash: "", ContractHash: "",
LogContracts: []string{
Address.String(),
},
}, },
{ {
CID: "", CID: "",
MhKey: "", MhKey: "",
Topic0s: []string{
mockTopic21.String(),
},
Topic1s: []string{
mockTopic22.String(),
},
Contract: "", Contract: "",
ContractHash: "", ContractHash: "",
LogContracts: []string{
AnotherAddress.String(),
},
}, },
{ {
CID: "", CID: "",
MhKey: "", MhKey: "",
Contract: ContractAddress.String(), Contract: ContractAddress.String(),
ContractHash: ContractHash, ContractHash: ContractHash,
LogContracts: []string{},
}, },
} }
MockRctMetaPostPublish = []models.ReceiptModel{ MockRctMetaPostPublish = []models.ReceiptModel{
{ {
CID: Rct1CID.String(), CID: Rct1CID.String(),
MhKey: Rct1MhKey, MhKey: Rct1MhKey,
Topic0s: []string{
mockTopic11.String(),
},
Topic1s: []string{
mockTopic12.String(),
},
Contract: "", Contract: "",
ContractHash: "", ContractHash: "",
LogContracts: []string{
Address.String(),
},
}, },
{ {
CID: Rct2CID.String(), CID: Rct2CID.String(),
MhKey: Rct2MhKey, MhKey: Rct2MhKey,
Topic0s: []string{
mockTopic21.String(),
},
Topic1s: []string{
mockTopic22.String(),
},
Contract: "", Contract: "",
ContractHash: "", ContractHash: "",
LogContracts: []string{
AnotherAddress.String(),
},
}, },
{ {
CID: Rct3CID.String(), CID: Rct3CID.String(),
MhKey: Rct3MhKey, MhKey: Rct3MhKey,
Contract: ContractAddress.String(), Contract: ContractAddress.String(),
ContractHash: ContractHash, ContractHash: ContractHash,
LogContracts: []string{},
}, },
} }
@ -524,46 +486,8 @@ var (
BaseFee: big.NewInt(params.InitialBaseFee), BaseFee: big.NewInt(params.InitialBaseFee),
} }
MockLondonTransactions, MockLondonReceipts, SenderAdd = createDynamicTransactionsAndReceipts(LondonBlockNum) MockLondonTransactions, MockLondonReceipts, _ = createDynamicTransactionsAndReceipts(LondonBlockNum)
MockLondonBlock = createNewBlock(&MockLondonHeader, MockLondonTransactions, nil, MockLondonReceipts, new(trie.Trie)) MockLondonBlock = createNewBlock(&MockLondonHeader, MockLondonTransactions, nil, MockLondonReceipts, new(trie.Trie))
MockLondonTrxMeta = []models.TxModel{
{
CID: "", // This is empty until we go to publish to ipfs
MhKey: "",
Src: SenderAdd.Hex(),
Dst: Address.String(),
Index: 0,
TxHash: MockLondonTransactions[0].Hash().String(),
Data: []byte{},
},
}
MockLondonRctMeta = []models.ReceiptModel{
{
CID: "",
MhKey: "",
Topic0s: []string{
mockTopic11.String(),
},
Topic1s: []string{
mockTopic12.String(),
},
Contract: "",
ContractHash: "",
LogContracts: []string{
Address.String(),
},
},
}
MockConvertedLondonPayload = eth.ConvertedPayload{
TotalDifficulty: MockLondonBlock.Difficulty(),
Block: MockLondonBlock,
Receipts: MockLondonReceipts,
TxMetaData: MockLondonTrxMeta,
ReceiptMetaData: MockLondonRctMeta,
StorageNodes: MockStorageNodes,
StateNodes: MockStateNodes,
}
) )
func createNewBlock(header *types.Header, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt, hasher types.TrieHasher) *types.Block { func createNewBlock(header *types.Header, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt, hasher types.TrieHasher) *types.Block {
@ -614,7 +538,7 @@ func createDynamicTransactionsAndReceipts(blockNumber *big.Int) (types.Transacti
// https://github.com/ethereum/go-ethereum/pull/22806 // https://github.com/ethereum/go-ethereum/pull/22806
mockReceipt1 := &types.Receipt{ mockReceipt1 := &types.Receipt{
Type: types.DynamicFeeTxType, Type: types.DynamicFeeTxType,
PostState: common.HexToHash("0x1").Bytes(), PostState: common.HexToHash("0x0").Bytes(),
Status: types.ReceiptStatusSuccessful, Status: types.ReceiptStatusSuccessful,
CumulativeGasUsed: 50, CumulativeGasUsed: 50,
Logs: []*types.Log{}, Logs: []*types.Log{},