Fix getLogs query (avoid repeated execution, separate tx, and improve performance) #250

Merged
telackey merged 5 commits from telackey/249 into v5 2023-06-16 16:49:57 +00:00
7 changed files with 107 additions and 76 deletions

View File

@@ -325,6 +325,7 @@ func init() {
viper.BindPFlag("ethereum.forwardEthCalls", serveCmd.PersistentFlags().Lookup("eth-forward-eth-calls"))
viper.BindPFlag("ethereum.forwardGetStorageAt", serveCmd.PersistentFlags().Lookup("eth-forward-get-storage-at"))
viper.BindPFlag("ethereum.proxyOnError", serveCmd.PersistentFlags().Lookup("eth-proxy-on-error"))
viper.BindPFlag("ethereum.getLogsBlockLimit", serveCmd.PersistentFlags().Lookup("eth-getlogs-block-limit"))
// groupcache flags
viper.BindPFlag("groupcache.pool.enabled", serveCmd.PersistentFlags().Lookup("gcache-pool-enabled"))

View File

@@ -41,7 +41,6 @@ import (
"github.com/ethereum/go-ethereum/statediff"
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
"github.com/cerc-io/ipld-eth-server/v5/pkg/shared"
ipld_direct_state "github.com/cerc-io/ipld-eth-statedb/direct_by_leaf"
)
@@ -57,10 +56,11 @@ const APIVersion = "0.0.1"
type APIConfig struct {
// Proxy node for forwarding cache misses
SupportsStateDiff bool // Whether the remote node supports the statediff_writeStateDiffAt endpoint, if it does we can fill the local cache when we hit a miss
ForwardEthCalls bool // if true, forward eth_call calls directly to the configured proxy node
ForwardGetStorageAt bool // if true, forward eth_getStorageAt calls directly to the configured proxy node
ProxyOnError bool // turn on regular proxy fall-through on errors; needed to test difference between direct and indirect fall-through
SupportsStateDiff bool // Whether the remote node supports the statediff_writeStateDiffAt endpoint, if it does we can fill the local cache when we hit a miss
ForwardEthCalls bool // if true, forward eth_call calls directly to the configured proxy node
ForwardGetStorageAt bool // if true, forward eth_getStorageAt calls directly to the configured proxy node
ProxyOnError bool // turn on regular proxy fall-through on errors; needed to test difference between direct and indirect fall-through
GetLogsBlockLimit int64 // the maximum size of the block range to use in GetLogs
StateDiffTimeout time.Duration
}
@@ -700,27 +700,10 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
Topics: topicStrSets,
}
// Begin tx
tx, err := pea.B.DB.Beginx()
if err != nil {
return nil, err
}
// we must avoid overshadowing `err` so that we update the value of the variable inside the defer
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// If we have a blockHash to filter on, fire off single retrieval query
if crit.BlockHash != nil {
var filteredLogs []LogResult
filteredLogs, err = pea.B.Retriever.RetrieveFilteredLogs(tx, filter, 0, crit.BlockHash)
filteredLogs, err := pea.B.Retriever.RetrieveFilteredLogsForBlock(pea.B.DB, filter, crit.BlockHash)
if err != nil {
return nil, err
}
@@ -732,41 +715,48 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
// Otherwise, create block range from criteria
// nil values are filled in; to request a single block have both ToBlock and FromBlock equal that number
startingBlock := crit.FromBlock
endingBlock := crit.ToBlock
if startingBlock == nil {
startingBlock = common.Big0
// geth uses LatestBlockNumber as the default value for both begin and end, so we do the same
lastBlockNumber, err := pea.B.Retriever.RetrieveLastBlockNumber()
if err != nil {
return nil, err
}
if endingBlock == nil {
var endingBlockInt int64
endingBlockInt, err = pea.B.Retriever.RetrieveLastBlockNumber()
start := lastBlockNumber
if crit.FromBlock != nil {
start, err = pea.B.NormalizeBlockNumber(rpc.BlockNumber(crit.FromBlock.Int64()))
if err != nil {
return nil, err
}
endingBlock = big.NewInt(endingBlockInt)
}
start := startingBlock.Int64()
end := endingBlock.Int64()
var logs []*types.Log
for i := start; i <= end; i++ {
var filteredLogs []LogResult
filteredLogs, err = pea.B.Retriever.RetrieveFilteredLogs(tx, filter, i, nil)
end := lastBlockNumber
if crit.ToBlock != nil {
end, err = pea.B.NormalizeBlockNumber(rpc.BlockNumber(crit.ToBlock.Int64()))
if err != nil {
return nil, err
}
var logCIDs []*types.Log
logCIDs, err = decomposeLogs(filteredLogs)
if err != nil {
return nil, err
}
logs = append(logs, logCIDs...)
}
return logs, err // need to return err variable so that we return the err = tx.Commit() assignment in the defer
if pea.config.GetLogsBlockLimit > 0 && (end-start) > pea.config.GetLogsBlockLimit {
return nil, errors.New(
fmt.Sprintf(
"Invalid eth_getLogs request. 'fromBlock'-'toBlock' range too large. Max range: %d",
pea.config.GetLogsBlockLimit,
))
}
filteredLogs, err := pea.B.Retriever.RetrieveFilteredLogsForBlockRange(pea.B.DB, filter, start, end)
if err != nil {
return nil, err
}
var logCIDs []*types.Log
logCIDs, err = decomposeLogs(filteredLogs)
if err != nil {
return nil, err
}
return logCIDs, err
}
/*
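
With the rewrite above, a blockHash criterion goes straight to RetrieveFilteredLogsForBlock and a from/to range becomes a single RetrieveFilteredLogsForBlockRange call, capped by GetLogsBlockLimit. A client-side sketch of how the cap surfaces to callers, using go-ethereum's ethclient; the endpoint URL, the placeholder block hash, and the 500 limit are assumptions:

	package main

	import (
		"context"
		"fmt"
		"log"
		"math/big"

		"github.com/ethereum/go-ethereum"
		"github.com/ethereum/go-ethereum/common"
		"github.com/ethereum/go-ethereum/ethclient"
	)

	func main() {
		// Assumes ipld-eth-server is serving HTTP RPC locally with
		// getLogsBlockLimit left at its 500 default.
		client, err := ethclient.Dial("http://localhost:8081")
		if err != nil {
			log.Fatal(err)
		}

		// Single-block lookup by hash: handled by RetrieveFilteredLogsForBlock.
		hash := common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") // placeholder
		_, err = client.FilterLogs(context.Background(), ethereum.FilterQuery{BlockHash: &hash})
		fmt.Println("by hash:", err)

		// A 501-block range exceeds a limit of 500, so the server is expected to
		// answer with the "'fromBlock'-'toBlock' range too large" error added above.
		_, err = client.FilterLogs(context.Background(), ethereum.FilterQuery{
			FromBlock: big.NewInt(1_000_000),
			ToBlock:   big.NewInt(1_000_501),
		})
		fmt.Println("by range:", err)
	}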

View File

@@ -125,7 +125,7 @@ func (b *Backend) ChainDb() ethdb.Database {
return b.EthDB
}
func (b *Backend) normalizeBlockNumber(blockNumber rpc.BlockNumber) (int64, error) {
func (b *Backend) NormalizeBlockNumber(blockNumber rpc.BlockNumber) (int64, error) {
var err error
number := blockNumber.Int64()
if blockNumber == rpc.LatestBlockNumber {
@@ -151,7 +151,7 @@ func (b *Backend) normalizeBlockNumber(blockNumber rpc.BlockNumber) (int64, erro
// HeaderByNumber gets the canonical header for the provided block number
func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Header, error) {
number, err := b.normalizeBlockNumber(blockNumber)
number, err := b.NormalizeBlockNumber(blockNumber)
if err != nil {
return nil, err
}
@@ -266,7 +266,7 @@ func (b *Backend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.Blo
// BlockByNumber returns the requested canonical block
func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Block, error) {
number, err := b.normalizeBlockNumber(blockNumber)
number, err := b.NormalizeBlockNumber(blockNumber)
if err != nil {
return nil, err
}

View File

@@ -223,7 +223,7 @@ func (r *Retriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptFilter
}
pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index`
pgStr += ` ORDER BY log_cids.block_number, log_cids.index`
logs := make([]LogResult, 0)
err := tx.Select(&logs, pgStr, args...)
@@ -234,29 +234,41 @@ func (r *Retriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptFilter
return logs, nil
}
// RetrieveFilteredLogs retrieves and returns all the log CIDs provided blockHeight or blockHash that conform to the provided
// filter parameters.
func (r *Retriever) RetrieveFilteredLogs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash) ([]LogResult, error) {
// RetrieveFilteredLogsForBlock retrieves and returns all the log CIDs for the block that conform to the provided filter parameters.
func (r *Retriever) RetrieveFilteredLogsForBlock(db *sqlx.DB, rctFilter ReceiptFilter, blockHash *common.Hash) ([]LogResult, error) {
return r.retrieveFilteredLogs(db, rctFilter, -1, -1, blockHash)
}
// RetrieveFilteredLogsForBlockRange retrieves and returns all the log CIDs for the blocks in the range that conform
// to the provided filter parameters.
func (r *Retriever) RetrieveFilteredLogsForBlockRange(db *sqlx.DB, rctFilter ReceiptFilter, startBlockNumber int64, stopBlockNumber int64) ([]LogResult, error) {
return r.retrieveFilteredLogs(db, rctFilter, startBlockNumber, stopBlockNumber, nil)
}
// retrieveFilteredLogs retrieves all the log CIDs either for a single block (by hash) or range of blocks (by number) which
// conform to the provided filter parameters.
func (r *Retriever) retrieveFilteredLogs(db *sqlx.DB, rctFilter ReceiptFilter, startBlockNumber int64, stopBlockNumber int64, blockHash *common.Hash) ([]LogResult, error) {
log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4)
pgStr := RetrieveFilteredLogs
var pgStr string
id := 1
if blockNumber > 0 {
pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id)
args = append(args, blockNumber)
id++
}
if blockHash != nil {
pgStr += fmt.Sprintf(` AND header_cids.block_hash = $%d`, id)
pgStr = RetrieveFilteredLogsSingle
args = append(args, blockHash.String())
id++
} else {
pgStr = RetrieveFilteredLogsRange
args = append(args, startBlockNumber)
id++
args = append(args, stopBlockNumber)
id++
}
pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index`
pgStr += ` ORDER BY eth.log_cids.block_number, eth.log_cids.index`
logs := make([]LogResult, 0)
err := tx.Select(&logs, pgStr, args...)
err := db.Select(&logs, pgStr, args...)
if err != nil {
return nil, err
}
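
Since the retriever entry points now take the shared *sqlx.DB handle instead of a per-call transaction, they can be driven directly from the same package. A small sketch, assuming the Retriever value and DB handle already exist, that Topics is a [][]string of topic hex strings (as the topicStrSets construction in the API layer suggests), and using the ERC-20 Transfer topic purely as an example filter:

	func exampleFilteredLogs(db *sqlx.DB, r *Retriever, blockHash common.Hash) ([]LogResult, error) {
		filter := ReceiptFilter{
			Topics: [][]string{{"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"}},
		}
		// Single block selected by hash; no explicit transaction is opened any more.
		byHash, err := r.RetrieveFilteredLogsForBlock(db, filter, &blockHash)
		if err != nil {
			return nil, err
		}
		// Contiguous range selected by number; the API layer bounds end-start by
		// GetLogsBlockLimit before calling this.
		byRange, err := r.RetrieveFilteredLogsForBlockRange(db, filter, 1_000_000, 1_000_100)
		if err != nil {
			return nil, err
		}
		return append(byHash, byRange...), nil
	}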

View File

@@ -119,21 +119,36 @@ const (
AND log_cids.cid = blocks.key
AND log_cids.block_number = blocks.block_number
AND receipt_cids.header_id = $1`
RetrieveFilteredLogs = `SELECT CAST(eth.log_cids.block_number as TEXT), eth.log_cids.cid, eth.log_cids.index, eth.log_cids.rct_id,
RetrieveFilteredLogsRange = `SELECT CAST(eth.log_cids.block_number as TEXT), eth.log_cids.cid, eth.log_cids.index, eth.log_cids.rct_id,
eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3,
eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index,
blocks.data, eth.receipt_cids.cid AS rct_cid, eth.receipt_cids.post_status, header_cids.block_hash
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids, ipld.blocks
WHERE eth.log_cids.rct_id = receipt_cids.tx_id
AND eth.log_cids.header_id = eth.receipt_cids.header_id
AND eth.log_cids.block_number = eth.receipt_cids.block_number
AND log_cids.cid = blocks.key
AND log_cids.block_number = blocks.block_number
AND receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.header_id = transaction_cids.header_id
AND receipt_cids.block_number = transaction_cids.block_number
AND transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number`
ipld.blocks.data, eth.receipt_cids.cid AS rct_cid, eth.receipt_cids.post_status, log_cids.header_id AS block_hash
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, ipld.blocks
WHERE eth.log_cids.block_number >= $1 AND eth.log_cids.block_number <= $2
AND eth.log_cids.header_id IN (SELECT canonical_header_hash(block_number) from eth.header_cids where eth.header_cids.block_number >= $1 AND header_cids.block_number <= $2)
AND eth.transaction_cids.block_number = eth.log_cids.block_number
AND eth.transaction_cids.header_id = eth.log_cids.header_id
AND eth.receipt_cids.block_number = eth.log_cids.block_number
AND eth.receipt_cids.header_id = eth.log_cids.header_id
AND eth.receipt_cids.tx_id = eth.log_cids.rct_id
AND eth.receipt_cids.tx_id = eth.transaction_cids.tx_hash
AND ipld.blocks.block_number = eth.log_cids.block_number
AND ipld.blocks.key = eth.log_cids.cid`
RetrieveFilteredLogsSingle = `SELECT CAST(eth.log_cids.block_number as TEXT), eth.log_cids.cid, eth.log_cids.index, eth.log_cids.rct_id,
eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3,
eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index,
ipld.blocks.data, eth.receipt_cids.cid AS rct_cid, eth.receipt_cids.post_status, log_cids.header_id AS block_hash
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, ipld.blocks
WHERE eth.log_cids.header_id = $1
AND eth.transaction_cids.block_number = eth.log_cids.block_number
AND eth.transaction_cids.header_id = eth.log_cids.header_id
AND eth.receipt_cids.block_number = eth.log_cids.block_number
AND eth.receipt_cids.header_id = eth.log_cids.header_id
AND eth.receipt_cids.tx_id = eth.log_cids.rct_id
AND eth.receipt_cids.tx_id = eth.transaction_cids.tx_hash
Review

We should also add an `AND transaction_cids.header_id = (SELECT canonical_header_hash(transaction_cids.block_number))` statement to ensure only canonical results (based on internal weighing) are returned.
Review

Hmm, I need to examine that quite carefully, as it may have the side-effect of defeating the indexing.
Review

Yes, in a quick test against carrion using a (fairly) narrow 500-block range, it moved it from an already too slow 4s to a whopping 67s.

It's not apples to apples because I had to tweak the query for v4, but the idea is similar.

This could be quite tricky. Also, is there any particular reason we are opening an explicit TX to do this query? Unless we are dead certain we need one, we should remove it.

Review

We were originally opening a TX because the query was a hack that iterated over the range, repeating the same single-block-height query for each height. Now that that is fixed, getting rid of the tx sounds good to me. If `canonical_header_hash` is having that impact, we need to return to the higher-level story of removing/preventing reorg/non-canonical data in the database.
Review
Curious what the performance impact is on smaller queries like https://github.com/cerc-io/ipld-eth-server/blob/telackey/249/pkg/eth/sql.go#L98 and https://github.com/cerc-io/ipld-eth-server/blob/telackey/249/pkg/eth/sql.go#L162.
AND ipld.blocks.block_number = eth.log_cids.block_number
AND ipld.blocks.key = eth.log_cids.cid`
RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT cid, val, block_number, removed, state_leaf_removed FROM get_storage_at_by_hash($1, $2, $3)`
RetrieveStorageAndRLPByAddressHashAndLeafKeyAndBlockHashPgStr = `
SELECT cid, val, storage.block_number, removed, state_leaf_removed, data

View File

@@ -56,6 +56,7 @@ const (
ETH_FORWARD_ETH_CALLS = "ETH_FORWARD_ETH_CALLS"
ETH_FORWARD_GET_STORAGE_AT = "ETH_FORWARD_GET_STORAGE_AT"
ETH_PROXY_ON_ERROR = "ETH_PROXY_ON_ERROR"
ETH_GETLOGS_BLOCK_LIMIT = "ETH_GETLOGS_BLOCK_LIMIT"
VALIDATOR_ENABLED = "VALIDATOR_ENABLED"
VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK"
@@ -107,6 +108,7 @@ type Config struct {
ForwardEthCalls bool
ForwardGetStorageAt bool
ProxyOnError bool
GetLogsBlockLimit int64
NodeNetworkID string
// Cache configuration.
@@ -134,6 +136,7 @@ func NewConfig() (*Config, error) {
viper.BindEnv("ethereum.forwardEthCalls", ETH_FORWARD_ETH_CALLS)
viper.BindEnv("ethereum.forwardGetStorageAt", ETH_FORWARD_GET_STORAGE_AT)
viper.BindEnv("ethereum.proxyOnError", ETH_PROXY_ON_ERROR)
viper.BindEnv("ethereum.getLogsBlockLimit", ETH_GETLOGS_BLOCK_LIMIT)
viper.BindEnv("log.file", "LOG_FILE")
viper.BindEnv("log.level", "LOG_LEVEL")
@@ -152,6 +155,12 @@ func NewConfig() (*Config, error) {
c.ProxyOnError = viper.GetBool("ethereum.proxyOnError")
c.EthHttpEndpoint = ethHTTPEndpoint
if viper.IsSet("ethereum.getLogsBlockLimit") {
c.GetLogsBlockLimit = viper.GetInt64("ethereum.getLogsBlockLimit")
} else {
c.GetLogsBlockLimit = 500
}
// websocket server
wsEnabled := viper.GetBool("server.ws")
if wsEnabled {
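
With the default handling above, the limit is 500 unless a value comes from the config file or the ETH_GETLOGS_BLOCK_LIMIT environment variable. A small sketch of overriding it through the environment before the config is read; the serve import path and the value 1000 are illustrative assumptions:

	// Raise (or disable, with 0) the eth_getLogs range cap via the bound env var;
	// viper parses the string into the int64 field when NewConfig reads it.
	os.Setenv("ETH_GETLOGS_BLOCK_LIMIT", "1000")
	cfg, err := serve.NewConfig()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.GetLogsBlockLimit) // 1000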

View File

@@ -72,6 +72,8 @@ type Service struct {
forwardEthCalls bool
// whether to forward eth_getStorageAt directly to proxy node
forwardGetStorageAt bool
// the maximum size of the block range to use in GetLogs
getLogsBlockLimit int64
// whether to forward all calls to proxy node if they throw an error locally
proxyOnError bool
// eth node network id
@@ -88,6 +90,7 @@ func NewServer(settings *Config) (Server, error) {
sap.stateDiffTimeout = settings.StateDiffTimeout
sap.forwardEthCalls = settings.ForwardEthCalls
sap.forwardGetStorageAt = settings.ForwardGetStorageAt
sap.getLogsBlockLimit = settings.GetLogsBlockLimit
sap.proxyOnError = settings.ProxyOnError
sap.nodeNetworkId = settings.NodeNetworkID
var err error
@@ -128,6 +131,7 @@ func (sap *Service) APIs() []rpc.API {
ForwardGetStorageAt: sap.forwardGetStorageAt,
ProxyOnError: sap.proxyOnError,
StateDiffTimeout: sap.stateDiffTimeout,
GetLogsBlockLimit: sap.getLogsBlockLimit,
}
ethAPI, err := eth.NewPublicEthAPI(sap.backend, sap.client, conf)
if err != nil {