[WIP] Add a service to fill indexing gap for watched addresses #135

Closed
prathamesh0 wants to merge 16 commits from pm-watched-addresses into master
2 changed files with 48 additions and 40 deletions
Showing only changes of commit 6c90c366bd - Show all commits

View File

@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2" "github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -309,8 +308,8 @@ type WatchedAddress struct {
WatchedAt uint64 `db:"watched_at"` WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"` LastFilledAt uint64 `db:"last_filled_at"`
start uint64 startBlock uint64
end uint64 endBlock uint64
} }
func startWatchedAddressGapFiller(config *s.Config) { func startWatchedAddressGapFiller(config *s.Config) {
@ -322,10 +321,10 @@ func startWatchedAddressGapFiller(config *s.Config) {
// Get watched addresses from the db // Get watched addresses from the db
// Get the block number to start fill at // Get the block number to start fill at
// Get the block number to end fill at // Get the block number to end fill at
fillWatchedAddresses, minStart, maxEnd := getFillAddresses(config) fillWatchedAddresses, minStartBlock, maxEndBlock := getFillAddresses(config)
// Fill the missing diffs // Fill the missing diffs
for blockNumber := minStart; blockNumber <= maxEnd; blockNumber++ { for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ {
params := statediff.Params{ params := statediff.Params{
IntermediateStateNodes: true, IntermediateStateNodes: true,
IntermediateStorageNodes: true, IntermediateStorageNodes: true,
@ -335,9 +334,9 @@ func startWatchedAddressGapFiller(config *s.Config) {
IncludeCode: true, IncludeCode: true,
} }
fillAddresses := []string{} fillAddresses := []interface{}{}
for _, fillWatchedAddress := range fillWatchedAddresses { for _, fillWatchedAddress := range fillWatchedAddresses {
if blockNumber >= fillWatchedAddress.start && blockNumber <= fillWatchedAddress.end { if blockNumber >= fillWatchedAddress.startBlock && blockNumber <= fillWatchedAddress.endBlock {
params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address)) params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address))
fillAddresses = append(fillAddresses, fillWatchedAddress.Address) fillAddresses = append(fillAddresses, fillWatchedAddress.Address)
} }
@ -348,70 +347,71 @@ func startWatchedAddressGapFiller(config *s.Config) {
} }
} }
// getFillAddresses gets the addresses and finds the encompassing range to perform the fill
// it also sets the address specific fill range
func getFillAddresses(config *s.Config) ([]WatchedAddress, uint64, uint64) { func getFillAddresses(config *s.Config) ([]WatchedAddress, uint64, uint64) {
rows := []WatchedAddress{} rows := []WatchedAddress{}
err := config.DB.Select(&rows, "SELECT * FROM eth.watched_addresses") pgStr := "SELECT * FROM eth.watched_addresses"
err := config.DB.Select(&rows, pgStr)
if err != nil { if err != nil {
log.Fatalf(err.Error()) log.Fatalf("Error fetching watched addreesses:", err.Error())
} }
fillWatchedAddresses := []WatchedAddress{} fillWatchedAddresses := []WatchedAddress{}
minStart := uint64(math.MaxUint64) minStartBlock := uint64(math.MaxUint64)
maxEnd := uint64(0) maxEndBlock := uint64(0)
for _, row := range rows { for _, row := range rows {
// Check for a gap between created_at and // Check for a gap between created_at and watched_at
if row.CreatedAt > row.WatchedAt { if row.CreatedAt > row.WatchedAt {
continue continue
} }
var start uint64 = 0 var startBlock uint64 = 0
var end uint64 = 0 var endBlock uint64 = 0
// Check if some of the gap was filled earlier // Check if some of the gap was filled earlier
if row.LastFilledAt > 0 { if row.LastFilledAt > 0 {
if row.LastFilledAt < row.WatchedAt { if row.LastFilledAt < row.WatchedAt {
start = row.LastFilledAt + 1 startBlock = row.LastFilledAt + 1
} }
} else { } else {
start = row.CreatedAt startBlock = row.CreatedAt
} }
// Add the address for filling // Add the address for filling
if start > 0 { if startBlock > 0 {
row.start = start row.startBlock = startBlock
if start < minStart { if startBlock < minStartBlock {
minStart = start minStartBlock = startBlock
} }
end = row.WatchedAt endBlock = row.WatchedAt
row.end = end row.endBlock = endBlock
if end > maxEnd { if endBlock > maxEndBlock {
maxEnd = end maxEndBlock = endBlock
} }
fillWatchedAddresses = append(fillWatchedAddresses, row) fillWatchedAddresses = append(fillWatchedAddresses, row)
} }
} }
return fillWatchedAddresses, minStart, maxEnd return fillWatchedAddresses, minStartBlock, maxEndBlock
} }
func fillWatchedAddressGap(config *s.Config, blockNumber uint64, params statediff.Params, fillAddresses []string) { func fillWatchedAddressGap(config *s.Config, blockNumber uint64, params statediff.Params, fillAddresses []interface{}) {
// Make a RPC call to write the statediffs // Make a RPC call to write the statediffs
var data json.RawMessage var data json.RawMessage
err := config.Client.Call(&data, "statediff_writeStateDiffAt", blockNumber, params) err := config.Client.Call(&data, "statediff_writeStateDiffAt", blockNumber, params)
if err != nil { if err != nil {
log.Fatalf(err.Error()) log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error())
} }
// Update the db // Update the db
query, args, err := sqlx.In("UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?)", blockNumber, fillAddresses) query := "UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")"
if err != nil {
log.Fatalf(err.Error())
}
query = config.DB.Rebind(query) query = config.DB.Rebind(query)
args := []interface{}{blockNumber}
args = append(args, fillAddresses...)
_, err = config.DB.Exec(query, args...) _, err = config.DB.Exec(query, args...)
if err != nil { if err != nil {
log.Fatalf(err.Error()) log.Fatalf(err.Error())
@ -487,6 +487,10 @@ func init() {
serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator") serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator")
serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block") serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block")
// watched address gap filler flags
serveCmd.PersistentFlags().Bool("watched-address-gap-filler-enabled", false, "turn on the watched address gap filler")
serveCmd.PersistentFlags().Int("watched-address-gap-filler-interval", 60, "watched address gap fill interval in secs")
// and their bindings // and their bindings
// eth graphql server // eth graphql server
viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql")) viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql"))
@ -535,4 +539,8 @@ func init() {
// state validator flags // state validator flags
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled")) viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block")) viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
// watched address gap filler flags
viper.BindPFlag("watch.fill.enabled", serveCmd.PersistentFlags().Lookup("watch-fill-enabled"))
viper.BindPFlag("watch.fill.interval", serveCmd.PersistentFlags().Lookup("watch-fill-interval"))
} }

View File

@ -54,8 +54,8 @@ const (
VALIDATOR_ENABLED = "VALIDATOR_ENABLED" VALIDATOR_ENABLED = "VALIDATOR_ENABLED"
VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK" VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK"
GAP_FILLER_ENABLED = "GAP_FILLER_ENABLED" WATCHED_ADDRESS_GAP_FILLER_ENABLED = "WATCHED_ADDRESS_GAP_FILLER_ENABLED"
GAP_FILLER_INTERVAL = "GAP_FILLER_INTERVAL" WATCHED_ADDRESS_GAP_FILLER_INTERVAL = "WATCHED_ADDRESS_GAP_FILLER_INTERVAL"
) )
// Config struct // Config struct
@ -234,7 +234,7 @@ func NewConfig() (*Config, error) {
c.loadValidatorConfig() c.loadValidatorConfig()
c.loadGapFillerConfig() c.loadWatchedAddressGapFillerConfig()
return c, err return c, err
} }
@ -299,10 +299,10 @@ func (c *Config) loadValidatorConfig() {
c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock") c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock")
} }
func (c *Config) loadGapFillerConfig() { func (c *Config) loadWatchedAddressGapFillerConfig() {
viper.BindEnv("gapfiller.enabled", GAP_FILLER_ENABLED) viper.BindEnv("watch.fill.enabled", WATCHED_ADDRESS_GAP_FILLER_ENABLED)
viper.BindEnv("gapfiller.interval", GAP_FILLER_INTERVAL) viper.BindEnv("watch.fill.interval", WATCHED_ADDRESS_GAP_FILLER_INTERVAL)
c.WatchedAddressGapFillerEnabled = viper.GetBool("gapfiller.enabled") c.WatchedAddressGapFillerEnabled = viper.GetBool("watch.fill.enabled")
c.WatchedAddressGapFillInterval = viper.GetInt("gapfiller.interval") c.WatchedAddressGapFillInterval = viper.GetInt("watch.fill.interval")
} }