From 3914889d5304dbcb5411cd08a486cf7719bee6d1 Mon Sep 17 00:00:00 2001 From: nabarun Date: Wed, 16 Mar 2022 18:08:16 +0530 Subject: [PATCH] Add a service to fill indexing gap for watched addresses --- cmd/serve.go | 19 ++++- pkg/fill/service.go | 185 ++++++++++++++++++++++++++++++++++++++++++++ pkg/serve/config.go | 16 ++++ 3 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 pkg/fill/service.go diff --git a/cmd/serve.go b/cmd/serve.go index 11330ad6..913f7703 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -33,6 +33,7 @@ import ( "github.com/vulcanize/gap-filler/pkg/mux" "github.com/vulcanize/ipld-eth-server/pkg/eth" + fill "github.com/vulcanize/ipld-eth-server/pkg/fill" "github.com/vulcanize/ipld-eth-server/pkg/graphql" srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc" s "github.com/vulcanize/ipld-eth-server/pkg/serve" @@ -100,6 +101,14 @@ func serve() { logWithCommand.Info("state validator disabled") } + if serverConfig.WatchedAddressGapFillerEnabled { + service := fill.New(serverConfig) + go service.Start() + logWithCommand.Info("watched address gap filler enabled") + } else { + logWithCommand.Info("watched address gap filler disabled") + } + shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt) <-shutdown @@ -133,7 +142,7 @@ func startServers(server s.Server, settings *s.Config) error { if settings.HTTPEnabled { logWithCommand.Info("starting up HTTP server") - _, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) + _, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) if err != nil { return err } @@ -360,6 +369,10 @@ func init() { serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator") serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block") + // watched address gap filler flags + serveCmd.PersistentFlags().Bool("watched-address-gap-filler-enabled", false, "turn on the watched address gap filler") + serveCmd.PersistentFlags().Int("watched-address-gap-filler-interval", 60, "watched address gap fill interval in secs") + // and their bindings // eth graphql server viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql")) @@ -408,4 +421,8 @@ func init() { // state validator flags viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled")) viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block")) + + // watched address gap filler flags + viper.BindPFlag("watch.fill.enabled", serveCmd.PersistentFlags().Lookup("watched-address-gap-filler-enabled")) + viper.BindPFlag("watch.fill.interval", serveCmd.PersistentFlags().Lookup("watched-address-gap-filler-interval")) } diff --git a/pkg/fill/service.go b/pkg/fill/service.go new file mode 100644 index 00000000..807b2acb --- /dev/null +++ b/pkg/fill/service.go @@ -0,0 +1,185 @@ +// VulcanizeDB +// Copyright © 2022 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package fill + +import ( + "math" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" + "github.com/jmoiron/sqlx" + log "github.com/sirupsen/logrus" + + "github.com/vulcanize/ipld-eth-server/pkg/serve" +) + +// WatchedAddress type is used to process currently watched addresses +type WatchedAddress struct { + Address string `db:"address"` + CreatedAt uint64 `db:"created_at"` + WatchedAt uint64 `db:"watched_at"` + LastFilledAt uint64 `db:"last_filled_at"` + + StartBlock uint64 + EndBlock uint64 +} + +// Service is the underlying struct for the watched address gap filling service +type Service struct { + db *sqlx.DB + client *rpc.Client + interval int +} + +// NewServer creates a new Service +func New(config *serve.Config) *Service { + return &Service{ + db: config.DB, + client: config.Client, + interval: config.WatchedAddressGapFillInterval, + } +} + +// Start is used to begin the service +func (s *Service) Start() { + for { + // Wait for specified interval duration + time.Sleep(time.Duration(s.interval) * time.Second) + + // Get watched addresses from the db + rows := s.fetchWatchedAddresses() + + // Get the block number to start fill at + // Get the block number to end fill at + fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows) + + if len(fillWatchedAddresses) > 0 { + log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock) + } + + // Fill the missing diffs + for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ { + params := statediff.Params{ + IntermediateStateNodes: true, + IntermediateStorageNodes: true, + IncludeBlock: true, + IncludeReceipts: true, + IncludeTD: true, + IncludeCode: true, + } + + fillAddresses := []interface{}{} + for _, fillWatchedAddress := range fillWatchedAddresses { + if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock { + params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address)) + fillAddresses = append(fillAddresses, fillWatchedAddress.Address) + } + } + + if len(fillAddresses) > 0 { + s.writeStateDiffAt(blockNumber, params) + s.UpdateLastFilledAt(blockNumber, fillAddresses) + } + } + } +} + +// GetFillAddresses finds the encompassing range to perform fill for the given watched addresses +// it also sets the address specific fill range +func (s *Service) GetFillAddresses(rows []WatchedAddress) ([]WatchedAddress, uint64, uint64) { + fillWatchedAddresses := []WatchedAddress{} + minStartBlock := uint64(math.MaxUint64) + maxEndBlock := uint64(0) + + for _, row := range rows { + // Check for a gap between created_at and watched_at + // CreatedAt and WatchedAt being equal is considered a gap of one block + if row.CreatedAt > row.WatchedAt { + continue + } + + startBlock := uint64(0) + endBlock := uint64(0) + + // Check if some of the gap was filled earlier + if row.LastFilledAt > 0 { + if row.LastFilledAt < row.WatchedAt { + startBlock = row.LastFilledAt + 1 + } + } else { + startBlock = row.CreatedAt + } + + // Add the address for filling + if startBlock > 0 { + row.StartBlock = startBlock + if startBlock < minStartBlock { + minStartBlock = startBlock + } + + endBlock = row.WatchedAt + row.EndBlock = endBlock + if endBlock > maxEndBlock { + maxEndBlock = endBlock + } + + fillWatchedAddresses = append(fillWatchedAddresses, row) + } + } + + return fillWatchedAddresses, minStartBlock, maxEndBlock +} + +// UpdateLastFilledAt updates the fill status for the provided addresses in the db +func (s *Service) UpdateLastFilledAt(blockNumber uint64, fillAddresses []interface{}) { + // Prepare the query + query := "UPDATE eth_meta.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")" + query = s.db.Rebind(query) + + args := []interface{}{blockNumber} + args = append(args, fillAddresses...) + + // Execute the update query + _, err := s.db.Exec(query, args...) + if err != nil { + log.Fatalf(err.Error()) + } +} + +// fetchWatchedAddresses fetches watched addresses from the db +func (s *Service) fetchWatchedAddresses() []WatchedAddress { + rows := []WatchedAddress{} + pgStr := "SELECT * FROM eth_meta.watched_addresses" + + err := s.db.Select(&rows, pgStr) + if err != nil { + log.Fatalf("Error fetching watched addreesses: %s", err.Error()) + } + + return rows +} + +// writeStateDiffAt makes a RPC call to writeout statediffs at a blocknumber with the given params +func (s *Service) writeStateDiffAt(blockNumber uint64, params statediff.Params) { + err := s.client.Call(nil, "statediff_writeStateDiffAt", blockNumber, params) + if err != nil { + log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error()) + } +} diff --git a/pkg/serve/config.go b/pkg/serve/config.go index c971691c..7f61b27e 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -56,6 +56,9 @@ const ( VALIDATOR_ENABLED = "VALIDATOR_ENABLED" VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK" + + WATCHED_ADDRESS_GAP_FILLER_ENABLED = "WATCHED_ADDRESS_GAP_FILLER_ENABLED" + WATCHED_ADDRESS_GAP_FILLER_INTERVAL = "WATCHED_ADDRESS_GAP_FILLER_INTERVAL" ) // Config struct @@ -96,6 +99,9 @@ type Config struct { StateValidationEnabled bool StateValidationEveryNthBlock uint64 + + WatchedAddressGapFillerEnabled bool + WatchedAddressGapFillInterval int } // NewConfig is used to initialize a watcher config from a .toml file @@ -232,6 +238,8 @@ func NewConfig() (*Config, error) { c.loadValidatorConfig() + c.loadWatchedAddressGapFillerConfig() + return c, err } @@ -294,3 +302,11 @@ func (c *Config) loadValidatorConfig() { c.StateValidationEnabled = viper.GetBool("validator.enabled") c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock") } + +func (c *Config) loadWatchedAddressGapFillerConfig() { + viper.BindEnv("watch.fill.enabled", WATCHED_ADDRESS_GAP_FILLER_ENABLED) + viper.BindEnv("watch.fill.interval", WATCHED_ADDRESS_GAP_FILLER_INTERVAL) + + c.WatchedAddressGapFillerEnabled = viper.GetBool("watch.fill.enabled") + c.WatchedAddressGapFillInterval = viper.GetInt("watch.fill.interval") +}