Simplify getLogs for performance and readability.

This commit is contained in:
Thomas E Lackey 2023-06-15 20:56:45 -05:00
parent 691d35a5df
commit 1e438e49e0
6 changed files with 81 additions and 59 deletions

View File

@@ -325,6 +325,7 @@ func init() {
viper.BindPFlag("ethereum.forwardEthCalls", serveCmd.PersistentFlags().Lookup("eth-forward-eth-calls")) viper.BindPFlag("ethereum.forwardEthCalls", serveCmd.PersistentFlags().Lookup("eth-forward-eth-calls"))
viper.BindPFlag("ethereum.forwardGetStorageAt", serveCmd.PersistentFlags().Lookup("eth-forward-get-storage-at")) viper.BindPFlag("ethereum.forwardGetStorageAt", serveCmd.PersistentFlags().Lookup("eth-forward-get-storage-at"))
viper.BindPFlag("ethereum.proxyOnError", serveCmd.PersistentFlags().Lookup("eth-proxy-on-error")) viper.BindPFlag("ethereum.proxyOnError", serveCmd.PersistentFlags().Lookup("eth-proxy-on-error"))
viper.BindPFlag("ethereum.getLogsBlockLimit", serveCmd.PersistentFlags().Lookup("eth-getlogs-block-limit"))
// groupcache flags // groupcache flags
viper.BindPFlag("groupcache.pool.enabled", serveCmd.PersistentFlags().Lookup("gcache-pool-enabled")) viper.BindPFlag("groupcache.pool.enabled", serveCmd.PersistentFlags().Lookup("gcache-pool-enabled"))

View File

@@ -41,7 +41,6 @@ import (
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/cerc-io/ipld-eth-server/v5/pkg/log" "github.com/cerc-io/ipld-eth-server/v5/pkg/log"
"github.com/cerc-io/ipld-eth-server/v5/pkg/shared"
ipld_direct_state "github.com/cerc-io/ipld-eth-statedb/direct_by_leaf" ipld_direct_state "github.com/cerc-io/ipld-eth-statedb/direct_by_leaf"
) )
@@ -57,10 +56,11 @@ const APIVersion = "0.0.1"
type APIConfig struct { type APIConfig struct {
// Proxy node for forwarding cache misses // Proxy node for forwarding cache misses
SupportsStateDiff bool // Whether the remote node supports the statediff_writeStateDiffAt endpoint, if it does we can fill the local cache when we hit a miss SupportsStateDiff bool // Whether the remote node supports the statediff_writeStateDiffAt endpoint, if it does we can fill the local cache when we hit a miss
ForwardEthCalls bool // if true, forward eth_call calls directly to the configured proxy node ForwardEthCalls bool // if true, forward eth_call calls directly to the configured proxy node
ForwardGetStorageAt bool // if true, forward eth_getStorageAt calls directly to the configured proxy node ForwardGetStorageAt bool // if true, forward eth_getStorageAt calls directly to the configured proxy node
ProxyOnError bool // turn on regular proxy fall-through on errors; needed to test difference between direct and indirect fall-through ProxyOnError bool // turn on regular proxy fall-through on errors; needed to test difference between direct and indirect fall-through
GetLogsBlockLimit int64 // the maximum size of the block range to use in GetLogs
StateDiffTimeout time.Duration StateDiffTimeout time.Duration
} }
@@ -677,8 +677,6 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit filters.FilterCriteri
} }
func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log, error) { func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log, error) {
// TODO: WIP, add to config.
limit := int64(100)
// TODO: this can be optimized away from using the old cid retriever and ipld fetcher interfaces // TODO: this can be optimized away from using the old cid retriever and ipld fetcher interfaces
// Convert FilterQuery into ReceiptFilter // Convert FilterQuery into ReceiptFilter
addrStrs := make([]string, len(crit.Addresses)) addrStrs := make([]string, len(crit.Addresses))
@@ -702,27 +700,10 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
Topics: topicStrSets, Topics: topicStrSets,
} }
// Begin tx
tx, err := pea.B.DB.Beginx()
if err != nil {
return nil, err
}
// we must avoid overshadowing `err` so that we update the value of the variable inside the defer
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// If we have a blockHash to filter on, fire off single retrieval query // If we have a blockHash to filter on, fire off single retrieval query
if crit.BlockHash != nil { if crit.BlockHash != nil {
var filteredLogs []LogResult var filteredLogs []LogResult
filteredLogs, err = pea.B.Retriever.RetrieveFilteredLogs(tx, filter, -1, -1, crit.BlockHash, limit) filteredLogs, err := pea.B.Retriever.RetrieveFilteredLogsForBlock(pea.B.DB, filter, crit.BlockHash)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -756,7 +737,15 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
} }
} }
filteredLogs, err := pea.B.Retriever.RetrieveFilteredLogs(tx, filter, start, end, nil, limit) if pea.config.GetLogsBlockLimit > 0 && (end-start) > pea.config.GetLogsBlockLimit {
return nil, errors.New(
fmt.Sprintf(
"Invalid eth_getLogs request. 'fromBlock'-'toBlock' range too large. Max range: %d",
pea.config.GetLogsBlockLimit,
))
}
filteredLogs, err := pea.B.Retriever.RetrieveFilteredLogsForBlockRange(pea.B.DB, filter, start, end)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -223,7 +223,7 @@ func (r *Retriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptFilter
} }
pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter) pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index` pgStr += ` ORDER BY log_cids.block_number, log_cids.index`
logs := make([]LogResult, 0) logs := make([]LogResult, 0)
err := tx.Select(&logs, pgStr, args...) err := tx.Select(&logs, pgStr, args...)
@@ -234,37 +234,41 @@ func (r *Retriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptFilter
return logs, nil return logs, nil
} }
// RetrieveFilteredLogs retrieves and returns all the log CIDs provided blockHeight or blockHash that conform to the provided // RetrieveFilteredLogsForBlock retrieves and returns all the log CIDs for the block that conform to the provided filter parameters.
// filter parameters. func (r *Retriever) RetrieveFilteredLogsForBlock(db *sqlx.DB, rctFilter ReceiptFilter, blockHash *common.Hash) ([]LogResult, error) {
func (r *Retriever) RetrieveFilteredLogs(tx *sqlx.Tx, rctFilter ReceiptFilter, startBlockNumber int64, stopBlockNumber int64, blockHash *common.Hash, limit int64) ([]LogResult, error) { return r.retrieveFilteredLogs(db, rctFilter, -1, -1, blockHash)
}
// RetrieveFilteredLogsForBlockRange retrieves and returns all the log CIDs for the blocks in the range that conform
// to the provided filter parameters.
func (r *Retriever) RetrieveFilteredLogsForBlockRange(db *sqlx.DB, rctFilter ReceiptFilter, startBlockNumber int64, stopBlockNumber int64) ([]LogResult, error) {
return r.retrieveFilteredLogs(db, rctFilter, startBlockNumber, stopBlockNumber, nil)
}
// retrieveFilteredLogs retrieves all the log CIDs either for a single block (by hash) or range of blocks (by number) which
// conform to the provided filter parameters.
func (r *Retriever) retrieveFilteredLogs(db *sqlx.DB, rctFilter ReceiptFilter, startBlockNumber int64, stopBlockNumber int64, blockHash *common.Hash) ([]LogResult, error) {
log.Debug("retrieving log cids for receipt ids") log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4) args := make([]interface{}, 0, 4)
pgStr := RetrieveFilteredLogs var pgStr string
id := 1 id := 1
if startBlockNumber >= 0 { if blockHash != nil {
pgStr += fmt.Sprintf(` AND log_cids.block_number >= $%d`, id) pgStr = RetrieveFilteredLogsSingle
args = append(args, blockHash.String())
id++
} else {
pgStr = RetrieveFilteredLogsRange
args = append(args, startBlockNumber) args = append(args, startBlockNumber)
id++ id++
}
if stopBlockNumber >= 0 {
pgStr += fmt.Sprintf(` AND log_cids.block_number <= $%d`, id)
args = append(args, stopBlockNumber) args = append(args, stopBlockNumber)
id++ id++
} }
if blockHash != nil {
pgStr += fmt.Sprintf(` AND log_cids.header_id = $%d`, id)
args = append(args, blockHash.String())
id++
}
pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter) pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index` pgStr += ` ORDER BY eth.log_cids.block_number, eth.log_cids.index`
if limit > 0 {
pgStr += fmt.Sprintf(` LIMIT %d`, limit)
}
logs := make([]LogResult, 0) logs := make([]LogResult, 0)
err := tx.Select(&logs, pgStr, args...) err := db.Select(&logs, pgStr, args...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -119,21 +119,36 @@ const (
AND log_cids.cid = blocks.key AND log_cids.cid = blocks.key
AND log_cids.block_number = blocks.block_number AND log_cids.block_number = blocks.block_number
AND receipt_cids.header_id = $1` AND receipt_cids.header_id = $1`
RetrieveFilteredLogs = `SELECT CAST(eth.log_cids.block_number as TEXT), eth.log_cids.cid, eth.log_cids.index, eth.log_cids.rct_id, RetrieveFilteredLogsRange = `SELECT CAST(eth.log_cids.block_number as TEXT), eth.log_cids.cid, eth.log_cids.index, eth.log_cids.rct_id,
eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3, eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3,
eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index, eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index,
blocks.data, eth.receipt_cids.cid AS rct_cid, eth.receipt_cids.post_status, header_cids.block_hash ipld.blocks.data, eth.receipt_cids.cid AS rct_cid, eth.receipt_cids.post_status, log_cids.header_id AS block_hash
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids, ipld.blocks FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, ipld.blocks
WHERE eth.log_cids.rct_id = receipt_cids.tx_id WHERE eth.log_cids.block_number >= $1 AND eth.log_cids.block_number <= $2
AND eth.log_cids.header_id = eth.receipt_cids.header_id AND eth.log_cids.header_id IN (SELECT canonical_header_hash(block_number) from eth.header_cids where eth.header_cids.block_number >= $1 AND header_cids.block_number <= $2)
AND eth.log_cids.block_number = eth.receipt_cids.block_number AND eth.transaction_cids.block_number = eth.log_cids.block_number
AND log_cids.cid = blocks.key AND eth.transaction_cids.header_id = eth.log_cids.header_id
AND log_cids.block_number = blocks.block_number AND eth.receipt_cids.block_number = eth.log_cids.block_number
AND receipt_cids.tx_id = transaction_cids.tx_hash AND eth.receipt_cids.header_id = eth.log_cids.header_id
AND receipt_cids.header_id = transaction_cids.header_id AND eth.receipt_cids.tx_id = eth.log_cids.rct_id
AND receipt_cids.block_number = transaction_cids.block_number AND eth.receipt_cids.tx_id = eth.transaction_cids.tx_hash
AND transaction_cids.header_id = header_cids.block_hash AND ipld.blocks.block_number = eth.log_cids.block_number
AND transaction_cids.block_number = header_cids.block_number` AND ipld.blocks.key = eth.log_cids.cid`
RetrieveFilteredLogsSingle = `SELECT CAST(eth.log_cids.block_number as TEXT), eth.log_cids.cid, eth.log_cids.index, eth.log_cids.rct_id,
eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3,
eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index,
ipld.blocks.data, eth.receipt_cids.cid AS rct_cid, eth.receipt_cids.post_status, log_cids.header_id AS block_hash
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, ipld.blocks
WHERE eth.log_cids.header_id = $1
AND eth.transaction_cids.block_number = eth.log_cids.block_number
AND eth.transaction_cids.header_id = eth.log_cids.header_id
AND eth.receipt_cids.block_number = eth.log_cids.block_number
AND eth.receipt_cids.header_id = eth.log_cids.header_id
AND eth.receipt_cids.tx_id = eth.log_cids.rct_id
AND eth.receipt_cids.tx_id = eth.transaction_cids.tx_hash
AND ipld.blocks.block_number = eth.log_cids.block_number
AND ipld.blocks.key = eth.log_cids.cid`
RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT cid, val, block_number, removed, state_leaf_removed FROM get_storage_at_by_hash($1, $2, $3)` RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT cid, val, block_number, removed, state_leaf_removed FROM get_storage_at_by_hash($1, $2, $3)`
RetrieveStorageAndRLPByAddressHashAndLeafKeyAndBlockHashPgStr = ` RetrieveStorageAndRLPByAddressHashAndLeafKeyAndBlockHashPgStr = `
SELECT cid, val, storage.block_number, removed, state_leaf_removed, data SELECT cid, val, storage.block_number, removed, state_leaf_removed, data

View File

@@ -56,6 +56,7 @@ const (
ETH_FORWARD_ETH_CALLS = "ETH_FORWARD_ETH_CALLS" ETH_FORWARD_ETH_CALLS = "ETH_FORWARD_ETH_CALLS"
ETH_FORWARD_GET_STORAGE_AT = "ETH_FORWARD_GET_STORAGE_AT" ETH_FORWARD_GET_STORAGE_AT = "ETH_FORWARD_GET_STORAGE_AT"
ETH_PROXY_ON_ERROR = "ETH_PROXY_ON_ERROR" ETH_PROXY_ON_ERROR = "ETH_PROXY_ON_ERROR"
ETH_GETLOGS_BLOCK_LIMIT = "ETH_GETLOGS_BLOCK_LIMIT"
VALIDATOR_ENABLED = "VALIDATOR_ENABLED" VALIDATOR_ENABLED = "VALIDATOR_ENABLED"
VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK" VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK"
@@ -107,6 +108,7 @@ type Config struct {
ForwardEthCalls bool ForwardEthCalls bool
ForwardGetStorageAt bool ForwardGetStorageAt bool
ProxyOnError bool ProxyOnError bool
GetLogsBlockLimit int64
NodeNetworkID string NodeNetworkID string
// Cache configuration. // Cache configuration.
@@ -134,6 +136,7 @@ func NewConfig() (*Config, error) {
viper.BindEnv("ethereum.forwardEthCalls", ETH_FORWARD_ETH_CALLS) viper.BindEnv("ethereum.forwardEthCalls", ETH_FORWARD_ETH_CALLS)
viper.BindEnv("ethereum.forwardGetStorageAt", ETH_FORWARD_GET_STORAGE_AT) viper.BindEnv("ethereum.forwardGetStorageAt", ETH_FORWARD_GET_STORAGE_AT)
viper.BindEnv("ethereum.proxyOnError", ETH_PROXY_ON_ERROR) viper.BindEnv("ethereum.proxyOnError", ETH_PROXY_ON_ERROR)
viper.BindEnv("ethereum.getLogsBlockLimit", ETH_GETLOGS_BLOCK_LIMIT)
viper.BindEnv("log.file", "LOG_FILE") viper.BindEnv("log.file", "LOG_FILE")
viper.BindEnv("log.level", "LOG_LEVEL") viper.BindEnv("log.level", "LOG_LEVEL")
@@ -152,6 +155,12 @@ func NewConfig() (*Config, error) {
c.ProxyOnError = viper.GetBool("ethereum.proxyOnError") c.ProxyOnError = viper.GetBool("ethereum.proxyOnError")
c.EthHttpEndpoint = ethHTTPEndpoint c.EthHttpEndpoint = ethHTTPEndpoint
if viper.IsSet("ethereum.getLogsBlockLimit") {
c.GetLogsBlockLimit = viper.GetInt64("ethereum.getLogsBlockLimit")
} else {
c.GetLogsBlockLimit = 100
}
// websocket server // websocket server
wsEnabled := viper.GetBool("server.ws") wsEnabled := viper.GetBool("server.ws")
if wsEnabled { if wsEnabled {

View File

@@ -72,6 +72,8 @@ type Service struct {
forwardEthCalls bool forwardEthCalls bool
// whether to forward eth_getStorageAt directly to proxy node // whether to forward eth_getStorageAt directly to proxy node
forwardGetStorageAt bool forwardGetStorageAt bool
// the maximum size of the block range to use in GetLogs
getLogsBlockLimit int64
// whether to forward all calls to proxy node if they throw an error locally // whether to forward all calls to proxy node if they throw an error locally
proxyOnError bool proxyOnError bool
// eth node network id // eth node network id
@@ -88,6 +90,7 @@ func NewServer(settings *Config) (Server, error) {
sap.stateDiffTimeout = settings.StateDiffTimeout sap.stateDiffTimeout = settings.StateDiffTimeout
sap.forwardEthCalls = settings.ForwardEthCalls sap.forwardEthCalls = settings.ForwardEthCalls
sap.forwardGetStorageAt = settings.ForwardGetStorageAt sap.forwardGetStorageAt = settings.ForwardGetStorageAt
sap.getLogsBlockLimit = settings.GetLogsBlockLimit
sap.proxyOnError = settings.ProxyOnError sap.proxyOnError = settings.ProxyOnError
sap.nodeNetworkId = settings.NodeNetworkID sap.nodeNetworkId = settings.NodeNetworkID
var err error var err error
@@ -128,6 +131,7 @@ func (sap *Service) APIs() []rpc.API {
ForwardGetStorageAt: sap.forwardGetStorageAt, ForwardGetStorageAt: sap.forwardGetStorageAt,
ProxyOnError: sap.proxyOnError, ProxyOnError: sap.proxyOnError,
StateDiffTimeout: sap.stateDiffTimeout, StateDiffTimeout: sap.stateDiffTimeout,
GetLogsBlockLimit: sap.getLogsBlockLimit,
} }
ethAPI, err := eth.NewPublicEthAPI(sap.backend, sap.client, conf) ethAPI, err := eth.NewPublicEthAPI(sap.backend, sap.client, conf)
if err != nil { if err != nil {