Add a service to fill indexing gap for watched addresses

This commit is contained in:
Prathamesh Musale 2022-01-18 14:48:35 +05:30
parent f190404ab5
commit bea6525318
2 changed files with 144 additions and 1 deletions

View File

@ -16,7 +16,9 @@
package cmd
import (
"encoding/json"
"errors"
"math"
"net/http"
"net/url"
"os"
@ -25,7 +27,10 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -100,6 +105,13 @@ func serve() {
logWithCommand.Info("state validator disabled")
}
if serverConfig.WatchedAddressGapFillerEnabled {
go startWatchedAddressGapFiller(serverConfig)
logWithCommand.Info("watched address gap filler enabled")
} else {
logWithCommand.Info("watched address gap filler disabled")
}
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt)
<-shutdown
@ -133,7 +145,7 @@ func startServers(server s.Server, settings *s.Config) error {
if settings.HTTPEnabled {
logWithCommand.Info("starting up HTTP server")
_, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
_, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
if err != nil {
return err
}
@ -291,6 +303,121 @@ func startStateTrieValidator(config *s.Config, server s.Server) {
}
}
type WatchedAddress struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
start uint64
end uint64
}
func startWatchedAddressGapFiller(config *s.Config) {
fillInterval := config.WatchedAddressGapFillInterval
for {
time.Sleep(time.Duration(fillInterval) * time.Second)
// Get watched addresses from the db
// Get the block number to start fill at
// Get the block number to end fill at
fillWatchedAddresses, minStart, maxEnd := getFillAddresses(config)
// Fill the missing diffs
for blockNumber := minStart; blockNumber <= maxEnd; blockNumber++ {
params := statediff.Params{
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
}
fillAddresses := []string{}
for _, fillWatchedAddress := range fillWatchedAddresses {
if blockNumber >= fillWatchedAddress.start && blockNumber <= fillWatchedAddress.end {
params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address))
fillAddresses = append(fillAddresses, fillWatchedAddress.Address)
}
}
fillWatchedAddressGap(config, blockNumber, params, fillAddresses)
}
}
}
func getFillAddresses(config *s.Config) ([]WatchedAddress, uint64, uint64) {
rows := []WatchedAddress{}
err := config.DB.Select(&rows, "SELECT * FROM eth.watched_addresses")
if err != nil {
log.Fatalf(err.Error())
}
fillWatchedAddresses := []WatchedAddress{}
minStart := uint64(math.MaxUint64)
maxEnd := uint64(0)
for _, row := range rows {
// Check for a gap between created_at and
if row.CreatedAt > row.WatchedAt {
continue
}
var start uint64 = 0
var end uint64 = 0
// Check if some of the gap was filled earlier
if row.LastFilledAt > 0 {
if row.LastFilledAt < row.WatchedAt {
start = row.LastFilledAt + 1
}
} else {
start = row.CreatedAt
}
// Add the address for filling
if start > 0 {
row.start = start
if start < minStart {
minStart = start
}
end = row.WatchedAt
row.end = end
if end > maxEnd {
maxEnd = end
}
fillWatchedAddresses = append(fillWatchedAddresses, row)
}
}
return fillWatchedAddresses, minStart, maxEnd
}
func fillWatchedAddressGap(config *s.Config, blockNumber uint64, params statediff.Params, fillAddresses []string) {
// Make a RPC call to write the statediffs
var data json.RawMessage
err := config.Client.Call(&data, "statediff_writeStateDiffAt", blockNumber, params)
if err != nil {
log.Fatalf(err.Error())
}
// Update the db
query, args, err := sqlx.In("UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?)", blockNumber, fillAddresses)
if err != nil {
log.Fatalf(err.Error())
}
query = config.DB.Rebind(query)
_, err = config.DB.Exec(query, args...)
if err != nil {
log.Fatalf(err.Error())
}
}
func parseRpcAddresses(value string) ([]*rpc.Client, error) {
rpcAddresses := strings.Split(value, ",")
rpcClients := make([]*rpc.Client, 0, len(rpcAddresses))

View File

@ -53,6 +53,9 @@ const (
VALIDATOR_ENABLED = "VALIDATOR_ENABLED"
VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK"
GAP_FILLER_ENABLED = "GAP_FILLER_ENABLED"
GAP_FILLER_INTERVAL = "GAP_FILLER_INTERVAL"
)
// Config struct
@ -93,6 +96,9 @@ type Config struct {
StateValidationEnabled bool
StateValidationEveryNthBlock uint64
WatchedAddressGapFillerEnabled bool
WatchedAddressGapFillInterval int
}
// NewConfig is used to initialize a watcher config from a .toml file
@ -228,6 +234,8 @@ func NewConfig() (*Config, error) {
c.loadValidatorConfig()
c.loadGapFillerConfig()
return c, err
}
@ -290,3 +298,11 @@ func (c *Config) loadValidatorConfig() {
c.StateValidationEnabled = viper.GetBool("validator.enabled")
c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock")
}
func (c *Config) loadGapFillerConfig() {
viper.BindEnv("gapfiller.enabled", GAP_FILLER_ENABLED)
viper.BindEnv("gapfiller.interval", GAP_FILLER_INTERVAL)
c.WatchedAddressGapFillerEnabled = viper.GetBool("gapfiller.enabled")
c.WatchedAddressGapFillInterval = viper.GetInt("gapfiller.interval")
}