Fix getLogs query (avoid repeated execution, separate tx, and improve performance) #250

Merged
telackey merged 5 commits from telackey/249 into v5 2023-06-16 16:49:57 +00:00
4 changed files with 46 additions and 37 deletions
Showing only changes of commit c153fe6249 - Show all commits

View File

@@ -677,6 +677,8 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit filters.FilterCriteri
}
func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log, error) {
// TODO: WIP, add to config.
limit := int64(100)
// TODO: this can be optimized away from using the old cid retriever and ipld fetcher interfaces
// Convert FilterQuery into ReceiptFilter
addrStrs := make([]string, len(crit.Addresses))
@@ -720,7 +722,7 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
// If we have a blockHash to filter on, fire off single retrieval query
if crit.BlockHash != nil {
var filteredLogs []LogResult
filteredLogs, err = pea.B.Retriever.RetrieveFilteredLogs(tx, filter, 0, crit.BlockHash)
filteredLogs, err = pea.B.Retriever.RetrieveFilteredLogs(tx, filter, -1, -1, crit.BlockHash, limit)
if err != nil {
return nil, err
}
@@ -732,41 +734,40 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
// Otherwise, create block range from criteria
// nil values are filled in; to request a single block have both ToBlock and FromBlock equal that number
startingBlock := crit.FromBlock
endingBlock := crit.ToBlock
if startingBlock == nil {
startingBlock = common.Big0
// geth uses LatestBlockNumber as the default value for both begin and end, so we do the same
lastBlockNumber, err := pea.B.Retriever.RetrieveLastBlockNumber()
if err != nil {
return nil, err
}
if endingBlock == nil {
var endingBlockInt int64
endingBlockInt, err = pea.B.Retriever.RetrieveLastBlockNumber()
start := lastBlockNumber
if crit.FromBlock != nil {
start, err = pea.B.NormalizeBlockNumber(rpc.BlockNumber(crit.FromBlock.Int64()))
if err != nil {
return nil, err
}
endingBlock = big.NewInt(endingBlockInt)
}
start := startingBlock.Int64()
end := endingBlock.Int64()
var logs []*types.Log
for i := start; i <= end; i++ {
var filteredLogs []LogResult
filteredLogs, err = pea.B.Retriever.RetrieveFilteredLogs(tx, filter, i, nil)
end := lastBlockNumber
if crit.ToBlock != nil {
end, err = pea.B.NormalizeBlockNumber(rpc.BlockNumber(crit.ToBlock.Int64()))
if err != nil {
return nil, err
}
var logCIDs []*types.Log
logCIDs, err = decomposeLogs(filteredLogs)
if err != nil {
return nil, err
}
logs = append(logs, logCIDs...)
}
return logs, err // need to return err variable so that we return the err = tx.Commit() assignment in the defer
filteredLogs, err := pea.B.Retriever.RetrieveFilteredLogs(tx, filter, start, end, nil, limit)
if err != nil {
return nil, err
}
var logCIDs []*types.Log
logCIDs, err = decomposeLogs(filteredLogs)
if err != nil {
return nil, err
}
return logCIDs, err // need to return err variable so that we return the err = tx.Commit() assignment in the defer
}
/*

View File

@@ -125,7 +125,7 @@ func (b *Backend) ChainDb() ethdb.Database {
return b.EthDB
}
func (b *Backend) normalizeBlockNumber(blockNumber rpc.BlockNumber) (int64, error) {
func (b *Backend) NormalizeBlockNumber(blockNumber rpc.BlockNumber) (int64, error) {
var err error
number := blockNumber.Int64()
if blockNumber == rpc.LatestBlockNumber {
@@ -151,7 +151,7 @@ func (b *Backend) normalizeBlockNumber(blockNumber rpc.BlockNumber) (int64, erro
// HeaderByNumber gets the canonical header for the provided block number
func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Header, error) {
number, err := b.normalizeBlockNumber(blockNumber)
number, err := b.NormalizeBlockNumber(blockNumber)
if err != nil {
return nil, err
}
@@ -266,7 +266,7 @@ func (b *Backend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.Blo
// BlockByNumber returns the requested canonical block
func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Block, error) {
number, err := b.normalizeBlockNumber(blockNumber)
number, err := b.NormalizeBlockNumber(blockNumber)
if err != nil {
return nil, err
}

View File

@@ -236,24 +236,32 @@ func (r *Retriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptFilter
// RetrieveFilteredLogs retrieves and returns all the log CIDs provided blockHeight or blockHash that conform to the provided
// filter parameters.
func (r *Retriever) RetrieveFilteredLogs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash) ([]LogResult, error) {
func (r *Retriever) RetrieveFilteredLogs(tx *sqlx.Tx, rctFilter ReceiptFilter, startBlockNumber int64, stopBlockNumber int64, blockHash *common.Hash, limit int64) ([]LogResult, error) {
log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4)
pgStr := RetrieveFilteredLogs
id := 1
if blockNumber > 0 {
pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id)
args = append(args, blockNumber)
if startBlockNumber >= 0 {
pgStr += fmt.Sprintf(` AND log_cids.block_number >= $%d`, id)
args = append(args, startBlockNumber)
id++
}
if stopBlockNumber >= 0 {
pgStr += fmt.Sprintf(` AND log_cids.block_number <= $%d`, id)
args = append(args, stopBlockNumber)
id++
}
if blockHash != nil {
pgStr += fmt.Sprintf(` AND header_cids.block_hash = $%d`, id)
pgStr += fmt.Sprintf(` AND log_cids.header_id = $%d`, id)
args = append(args, blockHash.String())
id++
}
pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index`
if limit > 0 {
pgStr += fmt.Sprintf(` LIMIT %d`, limit)
}
logs := make([]LogResult, 0)
err := tx.Select(&logs, pgStr, args...)

View File

@@ -122,8 +122,8 @@ const (
RetrieveFilteredLogs = `SELECT CAST(eth.log_cids.block_number as TEXT), eth.log_cids.cid, eth.log_cids.index, eth.log_cids.rct_id,
eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3,
eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index,
blocks.data, eth.receipt_cids.cid AS rct_cid, eth.receipt_cids.post_status, header_cids.block_hash
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids, ipld.blocks
blocks.data, eth.receipt_cids.cid AS rct_cid, eth.receipt_cids.post_status, eth.log_cids.header_id AS block_hash
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, ipld.blocks
WHERE eth.log_cids.rct_id = receipt_cids.tx_id
AND eth.log_cids.header_id = eth.receipt_cids.header_id
AND eth.log_cids.block_number = eth.receipt_cids.block_number
@@ -132,8 +132,8 @@ const (
AND receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.header_id = transaction_cids.header_id
AND receipt_cids.block_number = transaction_cids.block_number
AND transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number`
AND transaction_cids.header_id = log_cids.header_id
AND transaction_cids.block_number = log_cids.block_number`
RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT cid, val, block_number, removed, state_leaf_removed FROM get_storage_at_by_hash($1, $2, $3)`
RetrieveStorageAndRLPByAddressHashAndLeafKeyAndBlockHashPgStr = `
SELECT cid, val, storage.block_number, removed, state_leaf_removed, data