Add a service to fill indexing gap for watched addresses
This commit is contained in:
parent
1a6ff273ba
commit
3914889d53
19
cmd/serve.go
19
cmd/serve.go
@ -33,6 +33,7 @@ import (
|
|||||||
"github.com/vulcanize/gap-filler/pkg/mux"
|
"github.com/vulcanize/gap-filler/pkg/mux"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||||
|
fill "github.com/vulcanize/ipld-eth-server/pkg/fill"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/graphql"
|
"github.com/vulcanize/ipld-eth-server/pkg/graphql"
|
||||||
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
|
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
|
||||||
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
|
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
|
||||||
@ -100,6 +101,14 @@ func serve() {
|
|||||||
logWithCommand.Info("state validator disabled")
|
logWithCommand.Info("state validator disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if serverConfig.WatchedAddressGapFillerEnabled {
|
||||||
|
service := fill.New(serverConfig)
|
||||||
|
go service.Start()
|
||||||
|
logWithCommand.Info("watched address gap filler enabled")
|
||||||
|
} else {
|
||||||
|
logWithCommand.Info("watched address gap filler disabled")
|
||||||
|
}
|
||||||
|
|
||||||
shutdown := make(chan os.Signal, 1)
|
shutdown := make(chan os.Signal, 1)
|
||||||
signal.Notify(shutdown, os.Interrupt)
|
signal.Notify(shutdown, os.Interrupt)
|
||||||
<-shutdown
|
<-shutdown
|
||||||
@ -133,7 +142,7 @@ func startServers(server s.Server, settings *s.Config) error {
|
|||||||
|
|
||||||
if settings.HTTPEnabled {
|
if settings.HTTPEnabled {
|
||||||
logWithCommand.Info("starting up HTTP server")
|
logWithCommand.Info("starting up HTTP server")
|
||||||
_, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
|
_, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -360,6 +369,10 @@ func init() {
|
|||||||
serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator")
|
serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator")
|
||||||
serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block")
|
serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block")
|
||||||
|
|
||||||
|
// watched address gap filler flags
|
||||||
|
serveCmd.PersistentFlags().Bool("watched-address-gap-filler-enabled", false, "turn on the watched address gap filler")
|
||||||
|
serveCmd.PersistentFlags().Int("watched-address-gap-filler-interval", 60, "watched address gap fill interval in secs")
|
||||||
|
|
||||||
// and their bindings
|
// and their bindings
|
||||||
// eth graphql server
|
// eth graphql server
|
||||||
viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql"))
|
viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql"))
|
||||||
@ -408,4 +421,8 @@ func init() {
|
|||||||
// state validator flags
|
// state validator flags
|
||||||
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
|
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
|
||||||
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
|
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
|
||||||
|
|
||||||
|
// watched address gap filler flags
|
||||||
|
viper.BindPFlag("watch.fill.enabled", serveCmd.PersistentFlags().Lookup("watched-address-gap-filler-enabled"))
|
||||||
|
viper.BindPFlag("watch.fill.interval", serveCmd.PersistentFlags().Lookup("watched-address-gap-filler-interval"))
|
||||||
}
|
}
|
||||||
|
185
pkg/fill/service.go
Normal file
185
pkg/fill/service.go
Normal file
@ -0,0 +1,185 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2022 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package fill
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/vulcanize/ipld-eth-server/pkg/serve"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WatchedAddress type is used to process currently watched addresses
|
||||||
|
type WatchedAddress struct {
|
||||||
|
Address string `db:"address"`
|
||||||
|
CreatedAt uint64 `db:"created_at"`
|
||||||
|
WatchedAt uint64 `db:"watched_at"`
|
||||||
|
LastFilledAt uint64 `db:"last_filled_at"`
|
||||||
|
|
||||||
|
StartBlock uint64
|
||||||
|
EndBlock uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Service is the underlying struct for the watched address gap filling service
|
||||||
|
type Service struct {
|
||||||
|
db *sqlx.DB
|
||||||
|
client *rpc.Client
|
||||||
|
interval int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewServer creates a new Service
|
||||||
|
func New(config *serve.Config) *Service {
|
||||||
|
return &Service{
|
||||||
|
db: config.DB,
|
||||||
|
client: config.Client,
|
||||||
|
interval: config.WatchedAddressGapFillInterval,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start is used to begin the service
|
||||||
|
func (s *Service) Start() {
|
||||||
|
for {
|
||||||
|
// Wait for specified interval duration
|
||||||
|
time.Sleep(time.Duration(s.interval) * time.Second)
|
||||||
|
|
||||||
|
// Get watched addresses from the db
|
||||||
|
rows := s.fetchWatchedAddresses()
|
||||||
|
|
||||||
|
// Get the block number to start fill at
|
||||||
|
// Get the block number to end fill at
|
||||||
|
fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows)
|
||||||
|
|
||||||
|
if len(fillWatchedAddresses) > 0 {
|
||||||
|
log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fill the missing diffs
|
||||||
|
for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ {
|
||||||
|
params := statediff.Params{
|
||||||
|
IntermediateStateNodes: true,
|
||||||
|
IntermediateStorageNodes: true,
|
||||||
|
IncludeBlock: true,
|
||||||
|
IncludeReceipts: true,
|
||||||
|
IncludeTD: true,
|
||||||
|
IncludeCode: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
fillAddresses := []interface{}{}
|
||||||
|
for _, fillWatchedAddress := range fillWatchedAddresses {
|
||||||
|
if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock {
|
||||||
|
params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address))
|
||||||
|
fillAddresses = append(fillAddresses, fillWatchedAddress.Address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(fillAddresses) > 0 {
|
||||||
|
s.writeStateDiffAt(blockNumber, params)
|
||||||
|
s.UpdateLastFilledAt(blockNumber, fillAddresses)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFillAddresses finds the encompassing range to perform fill for the given watched addresses
|
||||||
|
// it also sets the address specific fill range
|
||||||
|
func (s *Service) GetFillAddresses(rows []WatchedAddress) ([]WatchedAddress, uint64, uint64) {
|
||||||
|
fillWatchedAddresses := []WatchedAddress{}
|
||||||
|
minStartBlock := uint64(math.MaxUint64)
|
||||||
|
maxEndBlock := uint64(0)
|
||||||
|
|
||||||
|
for _, row := range rows {
|
||||||
|
// Check for a gap between created_at and watched_at
|
||||||
|
// CreatedAt and WatchedAt being equal is considered a gap of one block
|
||||||
|
if row.CreatedAt > row.WatchedAt {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
startBlock := uint64(0)
|
||||||
|
endBlock := uint64(0)
|
||||||
|
|
||||||
|
// Check if some of the gap was filled earlier
|
||||||
|
if row.LastFilledAt > 0 {
|
||||||
|
if row.LastFilledAt < row.WatchedAt {
|
||||||
|
startBlock = row.LastFilledAt + 1
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
startBlock = row.CreatedAt
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the address for filling
|
||||||
|
if startBlock > 0 {
|
||||||
|
row.StartBlock = startBlock
|
||||||
|
if startBlock < minStartBlock {
|
||||||
|
minStartBlock = startBlock
|
||||||
|
}
|
||||||
|
|
||||||
|
endBlock = row.WatchedAt
|
||||||
|
row.EndBlock = endBlock
|
||||||
|
if endBlock > maxEndBlock {
|
||||||
|
maxEndBlock = endBlock
|
||||||
|
}
|
||||||
|
|
||||||
|
fillWatchedAddresses = append(fillWatchedAddresses, row)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fillWatchedAddresses, minStartBlock, maxEndBlock
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateLastFilledAt updates the fill status for the provided addresses in the db
|
||||||
|
func (s *Service) UpdateLastFilledAt(blockNumber uint64, fillAddresses []interface{}) {
|
||||||
|
// Prepare the query
|
||||||
|
query := "UPDATE eth_meta.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")"
|
||||||
|
query = s.db.Rebind(query)
|
||||||
|
|
||||||
|
args := []interface{}{blockNumber}
|
||||||
|
args = append(args, fillAddresses...)
|
||||||
|
|
||||||
|
// Execute the update query
|
||||||
|
_, err := s.db.Exec(query, args...)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchWatchedAddresses fetches watched addresses from the db
|
||||||
|
func (s *Service) fetchWatchedAddresses() []WatchedAddress {
|
||||||
|
rows := []WatchedAddress{}
|
||||||
|
pgStr := "SELECT * FROM eth_meta.watched_addresses"
|
||||||
|
|
||||||
|
err := s.db.Select(&rows, pgStr)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error fetching watched addreesses: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return rows
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeStateDiffAt makes a RPC call to writeout statediffs at a blocknumber with the given params
|
||||||
|
func (s *Service) writeStateDiffAt(blockNumber uint64, params statediff.Params) {
|
||||||
|
err := s.client.Call(nil, "statediff_writeStateDiffAt", blockNumber, params)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error())
|
||||||
|
}
|
||||||
|
}
|
@ -56,6 +56,9 @@ const (
|
|||||||
|
|
||||||
VALIDATOR_ENABLED = "VALIDATOR_ENABLED"
|
VALIDATOR_ENABLED = "VALIDATOR_ENABLED"
|
||||||
VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK"
|
VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK"
|
||||||
|
|
||||||
|
WATCHED_ADDRESS_GAP_FILLER_ENABLED = "WATCHED_ADDRESS_GAP_FILLER_ENABLED"
|
||||||
|
WATCHED_ADDRESS_GAP_FILLER_INTERVAL = "WATCHED_ADDRESS_GAP_FILLER_INTERVAL"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config struct
|
// Config struct
|
||||||
@ -96,6 +99,9 @@ type Config struct {
|
|||||||
|
|
||||||
StateValidationEnabled bool
|
StateValidationEnabled bool
|
||||||
StateValidationEveryNthBlock uint64
|
StateValidationEveryNthBlock uint64
|
||||||
|
|
||||||
|
WatchedAddressGapFillerEnabled bool
|
||||||
|
WatchedAddressGapFillInterval int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig is used to initialize a watcher config from a .toml file
|
// NewConfig is used to initialize a watcher config from a .toml file
|
||||||
@ -232,6 +238,8 @@ func NewConfig() (*Config, error) {
|
|||||||
|
|
||||||
c.loadValidatorConfig()
|
c.loadValidatorConfig()
|
||||||
|
|
||||||
|
c.loadWatchedAddressGapFillerConfig()
|
||||||
|
|
||||||
return c, err
|
return c, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -294,3 +302,11 @@ func (c *Config) loadValidatorConfig() {
|
|||||||
c.StateValidationEnabled = viper.GetBool("validator.enabled")
|
c.StateValidationEnabled = viper.GetBool("validator.enabled")
|
||||||
c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock")
|
c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Config) loadWatchedAddressGapFillerConfig() {
|
||||||
|
viper.BindEnv("watch.fill.enabled", WATCHED_ADDRESS_GAP_FILLER_ENABLED)
|
||||||
|
viper.BindEnv("watch.fill.interval", WATCHED_ADDRESS_GAP_FILLER_INTERVAL)
|
||||||
|
|
||||||
|
c.WatchedAddressGapFillerEnabled = viper.GetBool("watch.fill.enabled")
|
||||||
|
c.WatchedAddressGapFillInterval = viper.GetInt("watch.fill.interval")
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user