From 00031e2b8363cc2ec1924cecf4451a34976666a9 Mon Sep 17 00:00:00 2001
From: Ian Norden
Date: Fri, 3 Apr 2020 14:29:28 -0500
Subject: [PATCH] resync reset validation level

---
 pkg/super_node/btc/cleaner.go      | 18 ++++++++++
 pkg/super_node/btc/cleaner_test.go | 54 ++++++++++++++++++++++++++++++
 pkg/super_node/eth/cleaner.go      | 18 ++++++++++
 pkg/super_node/eth/cleaner_test.go | 54 ++++++++++++++++++++++++++++++
 pkg/super_node/resync/config.go    | 24 +++++++------
 pkg/super_node/resync/service.go   | 35 ++++++++++++-------
 pkg/super_node/shared/intefaces.go |  1 +
 7 files changed, 181 insertions(+), 23 deletions(-)

diff --git a/pkg/super_node/btc/cleaner.go b/pkg/super_node/btc/cleaner.go
index cf39cd5b..95a00b27 100644
--- a/pkg/super_node/btc/cleaner.go
+++ b/pkg/super_node/btc/cleaner.go
@@ -38,6 +38,24 @@ func NewCleaner(db *postgres.DB) *Cleaner {
 	}
 }
 
+// ResetValidation resets the validation level to 0 to enable revalidation
+func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
+	tx, err := c.db.Beginx()
+	if err != nil {
+		return err
+	}
+	for _, rng := range rngs {
+		logrus.Infof("btc db cleaner resetting validation level to 0 for block range %d to %d", rng[0], rng[1])
+		pgStr := `UPDATE btc.header_cids
+			SET times_validated = 0
+			WHERE block_number BETWEEN $1 AND $2`
+		if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
+			return err
+		}
+	}
+	return tx.Commit()
+}
+
 // Clean removes the specified data from the db within the provided block range
 func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error {
 	tx, err := c.db.Beginx()
diff --git a/pkg/super_node/btc/cleaner_test.go b/pkg/super_node/btc/cleaner_test.go
index a245a294..86efd607 100644
--- a/pkg/super_node/btc/cleaner_test.go
+++ b/pkg/super_node/btc/cleaner_test.go
@@ -285,4 +285,58 @@ var _ = Describe("Cleaner", func() {
 			Expect(headerCount).To(Equal(2))
 		})
 	})
+
+	Describe("ResetValidation", func() {
+		BeforeEach(func() {
+			err := repo.Index(mockCIDPayload1)
+			Expect(err).ToNot(HaveOccurred())
+			err = repo.Index(mockCIDPayload2)
+			Expect(err).ToNot(HaveOccurred())
+
+			var validationTimes []int
+			pgStr := `SELECT times_validated FROM btc.header_cids`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(1))
+			Expect(validationTimes[1]).To(Equal(1))
+
+			err = repo.Index(mockCIDPayload1)
+			Expect(err).ToNot(HaveOccurred())
+
+			validationTimes = []int{}
+			pgStr = `SELECT times_validated FROM btc.header_cids ORDER BY block_number`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(2))
+			Expect(validationTimes[1]).To(Equal(1))
+		})
+		AfterEach(func() {
+			btc.TearDownDB(db)
+		})
+		It("Resets the validation level", func() {
+			err := cleaner.ResetValidation(rngs)
+			Expect(err).ToNot(HaveOccurred())
+
+			var validationTimes []int
+			pgStr := `SELECT times_validated FROM btc.header_cids`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(0))
+			Expect(validationTimes[1]).To(Equal(0))
+
+			err = repo.Index(mockCIDPayload2)
+			Expect(err).ToNot(HaveOccurred())
+
+			validationTimes = []int{}
+			pgStr = `SELECT times_validated FROM btc.header_cids ORDER BY block_number`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(0))
+			Expect(validationTimes[1]).To(Equal(1))
+		})
+	})
 })
diff --git a/pkg/super_node/eth/cleaner.go b/pkg/super_node/eth/cleaner.go
index c3496504..c07dd063 100644
--- a/pkg/super_node/eth/cleaner.go
+++ b/pkg/super_node/eth/cleaner.go
@@ -38,6 +38,24 @@ func NewCleaner(db *postgres.DB) *Cleaner {
 	}
 }
 
+// ResetValidation resets the validation level to 0 to enable revalidation
+func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
+	tx, err := c.db.Beginx()
+	if err != nil {
+		return err
+	}
+	for _, rng := range rngs {
+		logrus.Infof("eth db cleaner resetting validation level to 0 for block range %d to %d", rng[0], rng[1])
+		pgStr := `UPDATE eth.header_cids
+			SET times_validated = 0
+			WHERE block_number BETWEEN $1 AND $2`
+		if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
+			return err
+		}
+	}
+	return tx.Commit()
+}
+
 // Clean removes the specified data from the db within the provided block range
 func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error {
 	tx, err := c.db.Beginx()
diff --git a/pkg/super_node/eth/cleaner_test.go b/pkg/super_node/eth/cleaner_test.go
index 351ce77d..0c3c5b88 100644
--- a/pkg/super_node/eth/cleaner_test.go
+++ b/pkg/super_node/eth/cleaner_test.go
@@ -611,4 +611,58 @@ var _ = Describe("Cleaner", func() {
 			Expect(blocksCount).To(Equal(12))
 		})
 	})
+
+	Describe("ResetValidation", func() {
+		BeforeEach(func() {
+			err := repo.Index(mockCIDPayload1)
+			Expect(err).ToNot(HaveOccurred())
+			err = repo.Index(mockCIDPayload2)
+			Expect(err).ToNot(HaveOccurred())
+
+			var validationTimes []int
+			pgStr := `SELECT times_validated FROM eth.header_cids`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(1))
+			Expect(validationTimes[1]).To(Equal(1))
+
+			err = repo.Index(mockCIDPayload1)
+			Expect(err).ToNot(HaveOccurred())
+
+			validationTimes = []int{}
+			pgStr = `SELECT times_validated FROM eth.header_cids ORDER BY block_number`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(2))
+			Expect(validationTimes[1]).To(Equal(1))
+		})
+		AfterEach(func() {
+			eth.TearDownDB(db)
+		})
+		It("Resets the validation level", func() {
+			err := cleaner.ResetValidation(rngs)
+			Expect(err).ToNot(HaveOccurred())
+
+			var validationTimes []int
+			pgStr := `SELECT times_validated FROM eth.header_cids`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(0))
+			Expect(validationTimes[1]).To(Equal(0))
+
+			err = repo.Index(mockCIDPayload2)
+			Expect(err).ToNot(HaveOccurred())
+
+			validationTimes = []int{}
+			pgStr = `SELECT times_validated FROM eth.header_cids ORDER BY block_number`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(0))
+			Expect(validationTimes[1]).To(Equal(1))
+		})
+	})
 })
diff --git a/pkg/super_node/resync/config.go b/pkg/super_node/resync/config.go
index 04c5b118..64c5a266 100644
--- a/pkg/super_node/resync/config.go
+++ b/pkg/super_node/resync/config.go
@@ -30,20 +30,22 @@ import (
 
 // Env variables
 const (
-	RESYNC_CHAIN           = "RESYNC_CHAIN"
-	RESYNC_START           = "RESYNC_START"
-	RESYNC_STOP            = "RESYNC_STOP"
-	RESYNC_BATCH_SIZE      = "RESYNC_BATCH_SIZE"
-	RESYNC_BATCH_NUMBER    = "RESYNC_BATCH_NUMBER"
-	RESYNC_CLEAR_OLD_CACHE = "RESYNC_CLEAR_OLD_CACHE"
-	RESYNC_TYPE            = "RESYNC_TYPE"
+	RESYNC_CHAIN            = "RESYNC_CHAIN"
+	RESYNC_START            = "RESYNC_START"
+	RESYNC_STOP             = "RESYNC_STOP"
+	RESYNC_BATCH_SIZE       = "RESYNC_BATCH_SIZE"
+	RESYNC_BATCH_NUMBER     = "RESYNC_BATCH_NUMBER"
+	RESYNC_CLEAR_OLD_CACHE  = "RESYNC_CLEAR_OLD_CACHE"
+	RESYNC_TYPE             = "RESYNC_TYPE"
+	RESYNC_RESET_VALIDATION = "RESYNC_RESET_VALIDATION"
 )
 
 // Config holds the parameters needed to perform a resync
 type Config struct {
-	Chain         shared.ChainType // The type of resync to perform
-	ResyncType    shared.DataType  // The type of data to resync
-	ClearOldCache bool             // Resync will first clear all the data within the range
+	Chain           shared.ChainType // The type of resync to perform
+	ResyncType      shared.DataType  // The type of data to resync
+	ClearOldCache   bool             // Resync will first clear all the data within the range
+	ResetValidation bool             // If true, resync will reset the validation level to 0 for the given range
 
 	// DB info
 	DB *postgres.DB
@@ -73,11 +75,13 @@ func NewReSyncConfig() (*Config, error) {
 	viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH)
 	viper.BindEnv("resync.batchSize", RESYNC_BATCH_SIZE)
 	viper.BindEnv("resync.batchNumber", RESYNC_BATCH_NUMBER)
+	viper.BindEnv("resync.resetValidation", RESYNC_RESET_VALIDATION)
 
 	start := uint64(viper.GetInt64("resync.start"))
 	stop := uint64(viper.GetInt64("resync.stop"))
 	c.Ranges = [][2]uint64{{start, stop}}
 	c.ClearOldCache = viper.GetBool("resync.clearOldCache")
+	c.ResetValidation = viper.GetBool("resync.resetValidation")
 
 	c.IPFSPath, err = shared.GetIPFSPath()
 	if err != nil {
diff --git a/pkg/super_node/resync/service.go b/pkg/super_node/resync/service.go
index 18dacb21..0b9fd1ec 100644
--- a/pkg/super_node/resync/service.go
+++ b/pkg/super_node/resync/service.go
@@ -58,6 +58,8 @@ type Service struct {
 	ranges [][2]uint64
 	// Flag to turn on or off old cache destruction
 	clearOldCache bool
+	// Flag to turn on or off validation level reset
+	resetValidation bool
 }
 
 // NewResyncService creates and returns a resync service from the provided settings
@@ -95,23 +97,30 @@ func NewResyncService(settings *Config) (Resync, error) {
 		batchNumber = super_node.DefaultMaxBatchNumber
 	}
 	return &Service{
-		Indexer:       indexer,
-		Converter:     converter,
-		Publisher:     publisher,
-		Retriever:     retriever,
-		Fetcher:       fetcher,
-		Cleaner:       cleaner,
-		BatchSize:     batchSize,
-		BatchNumber:   int64(batchNumber),
-		QuitChan:      settings.Quit,
-		chain:         settings.Chain,
-		ranges:        settings.Ranges,
-		data:          settings.ResyncType,
-		clearOldCache: settings.ClearOldCache,
+		Indexer:         indexer,
+		Converter:       converter,
+		Publisher:       publisher,
+		Retriever:       retriever,
+		Fetcher:         fetcher,
+		Cleaner:         cleaner,
+		BatchSize:       batchSize,
+		BatchNumber:     int64(batchNumber),
+		QuitChan:        settings.Quit,
+		chain:           settings.Chain,
+		ranges:          settings.Ranges,
+		data:            settings.ResyncType,
+		clearOldCache:   settings.ClearOldCache,
+		resetValidation: settings.ResetValidation,
 	}, nil
 }
 
 func (rs *Service) Resync() error {
+	if rs.resetValidation {
+		logrus.Infof("resetting validation level")
+		if err := rs.Cleaner.ResetValidation(rs.ranges); err != nil {
+			return fmt.Errorf("validation reset failed: %v", err)
+		}
+	}
 	if rs.clearOldCache {
 		logrus.Infof("cleaning out old data from Postgres")
 		if err := rs.Cleaner.Clean(rs.ranges, rs.data); err != nil {
diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go
index 39393919..77b8a8fe 100644
--- a/pkg/super_node/shared/intefaces.go
+++ b/pkg/super_node/shared/intefaces.go
@@ -79,6 +79,7 @@ type DagPutter interface {
 // Cleaner is for cleaning out data from the cache within the given ranges
 type Cleaner interface {
 	Clean(rngs [][2]uint64, t DataType) error
+	ResetValidation(rngs [][2]uint64) error
 }
 
 // SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain
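
Example usage (not part of the patch): a minimal sketch of driving the new Cleaner.ResetValidation method directly, which is the same call the resync service makes first when resync.resetValidation (RESYNC_RESET_VALIDATION) is enabled. The import paths, the package name, and an already-initialized *postgres.DB are assumptions for illustration.

package example // illustrative package name

import (
	"github.com/vulcanize/vulcanizedb/pkg/postgres"       // import path assumed
	"github.com/vulcanize/vulcanizedb/pkg/super_node/eth" // import path assumed
)

// resetRange zeroes the times_validated counter for the given block range so the
// next resync pass revalidates those headers instead of skipping them.
func resetRange(db *postgres.DB, start, stop uint64) error {
	cleaner := eth.NewCleaner(db)
	return cleaner.ResetValidation([][2]uint64{{start, stop}})
}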