From 6c90c366bd850fe35d0cf53d04ba1ab7d77c3b6c Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Wed, 19 Jan 2022 15:12:46 +0530 Subject: [PATCH] Avoid using sqlx to prepare the update query --- cmd/serve.go | 72 +++++++++++++++++++++++++-------------------- pkg/serve/config.go | 16 +++++----- 2 files changed, 48 insertions(+), 40 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index 4b565580..5a7c3d20 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" - "github.com/jmoiron/sqlx" "github.com/mailgun/groupcache/v2" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -309,8 +308,8 @@ type WatchedAddress struct { WatchedAt uint64 `db:"watched_at"` LastFilledAt uint64 `db:"last_filled_at"` - start uint64 - end uint64 + startBlock uint64 + endBlock uint64 } func startWatchedAddressGapFiller(config *s.Config) { @@ -322,10 +321,10 @@ func startWatchedAddressGapFiller(config *s.Config) { // Get watched addresses from the db // Get the block number to start fill at // Get the block number to end fill at - fillWatchedAddresses, minStart, maxEnd := getFillAddresses(config) + fillWatchedAddresses, minStartBlock, maxEndBlock := getFillAddresses(config) // Fill the missing diffs - for blockNumber := minStart; blockNumber <= maxEnd; blockNumber++ { + for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ { params := statediff.Params{ IntermediateStateNodes: true, IntermediateStorageNodes: true, @@ -335,9 +334,9 @@ func startWatchedAddressGapFiller(config *s.Config) { IncludeCode: true, } - fillAddresses := []string{} + fillAddresses := []interface{}{} for _, fillWatchedAddress := range fillWatchedAddresses { - if blockNumber >= fillWatchedAddress.start && blockNumber <= fillWatchedAddress.end { + if blockNumber >= fillWatchedAddress.startBlock && blockNumber <= fillWatchedAddress.endBlock { params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address)) fillAddresses = append(fillAddresses, fillWatchedAddress.Address) } @@ -348,70 +347,71 @@ func startWatchedAddressGapFiller(config *s.Config) { } } +// getFillAddresses gets the addresses and finds the encompassing range to perform the fill +// it also sets the address specific fill range func getFillAddresses(config *s.Config) ([]WatchedAddress, uint64, uint64) { rows := []WatchedAddress{} - err := config.DB.Select(&rows, "SELECT * FROM eth.watched_addresses") + pgStr := "SELECT * FROM eth.watched_addresses" + err := config.DB.Select(&rows, pgStr) if err != nil { - log.Fatalf(err.Error()) + log.Fatalf("Error fetching watched addreesses:", err.Error()) } fillWatchedAddresses := []WatchedAddress{} - minStart := uint64(math.MaxUint64) - maxEnd := uint64(0) + minStartBlock := uint64(math.MaxUint64) + maxEndBlock := uint64(0) for _, row := range rows { - // Check for a gap between created_at and + // Check for a gap between created_at and watched_at if row.CreatedAt > row.WatchedAt { continue } - var start uint64 = 0 - var end uint64 = 0 + var startBlock uint64 = 0 + var endBlock uint64 = 0 // Check if some of the gap was filled earlier if row.LastFilledAt > 0 { if row.LastFilledAt < row.WatchedAt { - start = row.LastFilledAt + 1 + startBlock = row.LastFilledAt + 1 } } else { - start = row.CreatedAt + startBlock = row.CreatedAt } // Add the address for filling - if start > 0 { - row.start = start - if start < minStart { - minStart = start + if startBlock > 0 { + row.startBlock = startBlock + if startBlock < minStartBlock { + minStartBlock = startBlock } - end = row.WatchedAt - row.end = end - if end > maxEnd { - maxEnd = end + endBlock = row.WatchedAt + row.endBlock = endBlock + if endBlock > maxEndBlock { + maxEndBlock = endBlock } fillWatchedAddresses = append(fillWatchedAddresses, row) } } - return fillWatchedAddresses, minStart, maxEnd + return fillWatchedAddresses, minStartBlock, maxEndBlock } -func fillWatchedAddressGap(config *s.Config, blockNumber uint64, params statediff.Params, fillAddresses []string) { +func fillWatchedAddressGap(config *s.Config, blockNumber uint64, params statediff.Params, fillAddresses []interface{}) { // Make a RPC call to write the statediffs var data json.RawMessage err := config.Client.Call(&data, "statediff_writeStateDiffAt", blockNumber, params) if err != nil { - log.Fatalf(err.Error()) + log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error()) } // Update the db - query, args, err := sqlx.In("UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?)", blockNumber, fillAddresses) - if err != nil { - log.Fatalf(err.Error()) - } - + query := "UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")" query = config.DB.Rebind(query) + args := []interface{}{blockNumber} + args = append(args, fillAddresses...) _, err = config.DB.Exec(query, args...) if err != nil { log.Fatalf(err.Error()) @@ -487,6 +487,10 @@ func init() { serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator") serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block") + // watched address gap filler flags + serveCmd.PersistentFlags().Bool("watched-address-gap-filler-enabled", false, "turn on the watched address gap filler") + serveCmd.PersistentFlags().Int("watched-address-gap-filler-interval", 60, "watched address gap fill interval in secs") + // and their bindings // eth graphql server viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql")) @@ -535,4 +539,8 @@ func init() { // state validator flags viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled")) viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block")) + + // watched address gap filler flags + viper.BindPFlag("watch.fill.enabled", serveCmd.PersistentFlags().Lookup("watch-fill-enabled")) + viper.BindPFlag("watch.fill.interval", serveCmd.PersistentFlags().Lookup("watch-fill-interval")) } diff --git a/pkg/serve/config.go b/pkg/serve/config.go index fbe94ad1..bc45a8dc 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -54,8 +54,8 @@ const ( VALIDATOR_ENABLED = "VALIDATOR_ENABLED" VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK" - GAP_FILLER_ENABLED = "GAP_FILLER_ENABLED" - GAP_FILLER_INTERVAL = "GAP_FILLER_INTERVAL" + WATCHED_ADDRESS_GAP_FILLER_ENABLED = "WATCHED_ADDRESS_GAP_FILLER_ENABLED" + WATCHED_ADDRESS_GAP_FILLER_INTERVAL = "WATCHED_ADDRESS_GAP_FILLER_INTERVAL" ) // Config struct @@ -234,7 +234,7 @@ func NewConfig() (*Config, error) { c.loadValidatorConfig() - c.loadGapFillerConfig() + c.loadWatchedAddressGapFillerConfig() return c, err } @@ -299,10 +299,10 @@ func (c *Config) loadValidatorConfig() { c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock") } -func (c *Config) loadGapFillerConfig() { - viper.BindEnv("gapfiller.enabled", GAP_FILLER_ENABLED) - viper.BindEnv("gapfiller.interval", GAP_FILLER_INTERVAL) +func (c *Config) loadWatchedAddressGapFillerConfig() { + viper.BindEnv("watch.fill.enabled", WATCHED_ADDRESS_GAP_FILLER_ENABLED) + viper.BindEnv("watch.fill.interval", WATCHED_ADDRESS_GAP_FILLER_INTERVAL) - c.WatchedAddressGapFillerEnabled = viper.GetBool("gapfiller.enabled") - c.WatchedAddressGapFillInterval = viper.GetInt("gapfiller.interval") + c.WatchedAddressGapFillerEnabled = viper.GetBool("watch.fill.enabled") + c.WatchedAddressGapFillInterval = viper.GetInt("watch.fill.interval") }