resync reset validation level
parent 060e3430c1
commit 00031e2b83
@@ -38,6 +38,24 @@ func NewCleaner(db *postgres.DB) *Cleaner {
 	}
 }
 
+// ResetValidation resets the validation level to 0 to enable revalidation
+func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
+	tx, err := c.db.Beginx()
+	if err != nil {
+		return err
+	}
+	for _, rng := range rngs {
+		logrus.Infof("btc db cleaner resetting validation level to 0 for block range %d to %d", rng[0], rng[1])
+		pgStr := `UPDATE btc.header_cids
+				SET times_validated = 0
+				WHERE block_number BETWEEN $1 AND $2`
+		if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
+			return err
+		}
+	}
+	return tx.Commit()
+}
+
 // Clean removes the specified data from the db within the provided block range
 func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error {
 	tx, err := c.db.Beginx()
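For context, the transactional update that the new ResetValidation method issues can be reproduced with the standard library alone. The sketch below is illustrative only: it assumes the github.com/lib/pq driver and a placeholder connection string, and it adds an explicit rollback, whereas the cleaner itself goes through the project's postgres.DB wrapper shown above.

// Minimal sketch of the per-range UPDATE, assuming a reachable Postgres
// instance and the github.com/lib/pq driver (neither is part of this commit).
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func resetValidation(db *sql.DB, rngs [][2]uint64) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	for _, rng := range rngs {
		// Same statement as the cleaner: zero the validation counter for the range.
		if _, err := tx.Exec(
			`UPDATE btc.header_cids SET times_validated = 0 WHERE block_number BETWEEN $1 AND $2`,
			rng[0], rng[1],
		); err != nil {
			tx.Rollback()
			return err
		}
	}
	return tx.Commit()
}

func main() {
	// Placeholder DSN; substitute your own database settings.
	db, err := sql.Open("postgres", "postgres://localhost:5432/postgres?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := resetValidation(db, [][2]uint64{{0, 100}}); err != nil {
		log.Fatal(err)
	}
}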
@@ -285,4 +285,58 @@ var _ = Describe("Cleaner", func() {
 			Expect(headerCount).To(Equal(2))
 		})
 	})
+
+	Describe("ResetValidation", func() {
+		BeforeEach(func() {
+			err := repo.Index(mockCIDPayload1)
+			Expect(err).ToNot(HaveOccurred())
+			err = repo.Index(mockCIDPayload2)
+			Expect(err).ToNot(HaveOccurred())
+
+			var validationTimes []int
+			pgStr := `SELECT times_validated FROM btc.header_cids`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(1))
+			Expect(validationTimes[1]).To(Equal(1))
+
+			err = repo.Index(mockCIDPayload1)
+			Expect(err).ToNot(HaveOccurred())
+
+			validationTimes = []int{}
+			pgStr = `SELECT times_validated FROM btc.header_cids ORDER BY block_number`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(2))
+			Expect(validationTimes[1]).To(Equal(1))
+		})
+		AfterEach(func() {
+			btc.TearDownDB(db)
+		})
+		It("Resets the validation level", func() {
+			err := cleaner.ResetValidation(rngs)
+			Expect(err).ToNot(HaveOccurred())
+
+			var validationTimes []int
+			pgStr := `SELECT times_validated FROM btc.header_cids`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(0))
+			Expect(validationTimes[1]).To(Equal(0))
+
+			err = repo.Index(mockCIDPayload2)
+			Expect(err).ToNot(HaveOccurred())
+
+			validationTimes = []int{}
+			pgStr = `SELECT times_validated FROM btc.header_cids ORDER BY block_number`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(0))
+			Expect(validationTimes[1]).To(Equal(1))
+		})
+	})
 })
@@ -38,6 +38,24 @@ func NewCleaner(db *postgres.DB) *Cleaner {
 	}
 }
 
+// ResetValidation resets the validation level to 0 to enable revalidation
+func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
+	tx, err := c.db.Beginx()
+	if err != nil {
+		return err
+	}
+	for _, rng := range rngs {
+		logrus.Infof("eth db cleaner resetting validation level to 0 for block range %d to %d", rng[0], rng[1])
+		pgStr := `UPDATE eth.header_cids
+				SET times_validated = 0
+				WHERE block_number BETWEEN $1 AND $2`
+		if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
+			return err
+		}
+	}
+	return tx.Commit()
+}
+
 // Clean removes the specified data from the db within the provided block range
 func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error {
 	tx, err := c.db.Beginx()
@@ -611,4 +611,58 @@ var _ = Describe("Cleaner", func() {
 			Expect(blocksCount).To(Equal(12))
 		})
 	})
+
+	Describe("ResetValidation", func() {
+		BeforeEach(func() {
+			err := repo.Index(mockCIDPayload1)
+			Expect(err).ToNot(HaveOccurred())
+			err = repo.Index(mockCIDPayload2)
+			Expect(err).ToNot(HaveOccurred())
+
+			var validationTimes []int
+			pgStr := `SELECT times_validated FROM eth.header_cids`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(1))
+			Expect(validationTimes[1]).To(Equal(1))
+
+			err = repo.Index(mockCIDPayload1)
+			Expect(err).ToNot(HaveOccurred())
+
+			validationTimes = []int{}
+			pgStr = `SELECT times_validated FROM eth.header_cids ORDER BY block_number`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(2))
+			Expect(validationTimes[1]).To(Equal(1))
+		})
+		AfterEach(func() {
+			eth.TearDownDB(db)
+		})
+		It("Resets the validation level", func() {
+			err := cleaner.ResetValidation(rngs)
+			Expect(err).ToNot(HaveOccurred())
+
+			var validationTimes []int
+			pgStr := `SELECT times_validated FROM eth.header_cids`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(0))
+			Expect(validationTimes[1]).To(Equal(0))
+
+			err = repo.Index(mockCIDPayload2)
+			Expect(err).ToNot(HaveOccurred())
+
+			validationTimes = []int{}
+			pgStr = `SELECT times_validated FROM eth.header_cids ORDER BY block_number`
+			err = db.Select(&validationTimes, pgStr)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(len(validationTimes)).To(Equal(2))
+			Expect(validationTimes[0]).To(Equal(0))
+			Expect(validationTimes[1]).To(Equal(1))
+		})
+	})
 })
@@ -30,20 +30,22 @@ import (
 
 // Env variables
 const (
 	RESYNC_CHAIN = "RESYNC_CHAIN"
 	RESYNC_START = "RESYNC_START"
 	RESYNC_STOP = "RESYNC_STOP"
 	RESYNC_BATCH_SIZE = "RESYNC_BATCH_SIZE"
 	RESYNC_BATCH_NUMBER = "RESYNC_BATCH_NUMBER"
 	RESYNC_CLEAR_OLD_CACHE = "RESYNC_CLEAR_OLD_CACHE"
 	RESYNC_TYPE = "RESYNC_TYPE"
+	RESYNC_RESET_VALIDATION = "RESYNC_RESET_VALIDATION"
 )
 
 // Config holds the parameters needed to perform a resync
 type Config struct {
 	Chain shared.ChainType // The type of resync to perform
 	ResyncType shared.DataType // The type of data to resync
 	ClearOldCache bool // Resync will first clear all the data within the range
+	ResetValidation bool // If true, resync will reset the validation level to 0 for the given range
 
 	// DB info
 	DB *postgres.DB
@@ -73,11 +75,13 @@ func NewReSyncConfig() (*Config, error) {
 		viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH)
 	viper.BindEnv("resync.batchSize", RESYNC_BATCH_SIZE)
 	viper.BindEnv("resync.batchNumber", RESYNC_BATCH_NUMBER)
+	viper.BindEnv("resync.resetValidation", RESYNC_RESET_VALIDATION)
 
 	start := uint64(viper.GetInt64("resync.start"))
 	stop := uint64(viper.GetInt64("resync.stop"))
 	c.Ranges = [][2]uint64{{start, stop}}
 	c.ClearOldCache = viper.GetBool("resync.clearOldCache")
+	c.ResetValidation = viper.GetBool("resync.resetValidation")
 
 	c.IPFSPath, err = shared.GetIPFSPath()
 	if err != nil {
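A hedged sketch of how the new toggle surfaces to an operator: the viper key and env-var name come from the hunk above, while the standalone main function and the env-var value are illustrative and not part of this commit.

// Minimal sketch of the resync.resetValidation toggle, using only the
// spf13/viper calls shown above; everything else here is an assumption.
package main

import (
	"fmt"
	"os"

	"github.com/spf13/viper"
)

func main() {
	// Operators would export RESYNC_RESET_VALIDATION=true in the environment.
	os.Setenv("RESYNC_RESET_VALIDATION", "true")

	// Mirrors the binding added in NewReSyncConfig.
	viper.BindEnv("resync.resetValidation", "RESYNC_RESET_VALIDATION")

	// Mirrors the read added in NewReSyncConfig; this value feeds Config.ResetValidation.
	resetValidation := viper.GetBool("resync.resetValidation")
	fmt.Println("reset validation:", resetValidation) // prints: reset validation: true
}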
@@ -58,6 +58,8 @@ type Service struct {
 	ranges [][2]uint64
 	// Flag to turn on or off old cache destruction
 	clearOldCache bool
+	// Flag to turn on or off validation level reset
+	resetValidation bool
 }
 
 // NewResyncService creates and returns a resync service from the provided settings
@@ -95,23 +97,30 @@ func NewResyncService(settings *Config) (Resync, error) {
 		batchNumber = super_node.DefaultMaxBatchNumber
 	}
 	return &Service{
 		Indexer: indexer,
 		Converter: converter,
 		Publisher: publisher,
 		Retriever: retriever,
 		Fetcher: fetcher,
 		Cleaner: cleaner,
 		BatchSize: batchSize,
 		BatchNumber: int64(batchNumber),
 		QuitChan: settings.Quit,
 		chain: settings.Chain,
 		ranges: settings.Ranges,
 		data: settings.ResyncType,
 		clearOldCache: settings.ClearOldCache,
+		resetValidation: settings.ResetValidation,
 	}, nil
 }
 
 func (rs *Service) Resync() error {
+	if rs.resetValidation {
+		logrus.Infof("resetting validation level")
+		if err := rs.Cleaner.ResetValidation(rs.ranges); err != nil {
+			return fmt.Errorf("validation reset failed: %v", err)
+		}
+	}
 	if rs.clearOldCache {
 		logrus.Infof("cleaning out old data from Postgres")
 		if err := rs.Cleaner.Clean(rs.ranges, rs.data); err != nil {
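The change to Resync() establishes an order of operations: when both flags are set, the validation reset runs before the old cache is cleared. The self-contained sketch below illustrates that gating; stubCleaner, the local Cleaner interface, DataType, and the error wording are stand-ins for the project's types, not part of this commit.

// Illustrative sketch of the gating added to Service.Resync(), using local
// stand-in types so it compiles on its own.
package main

import "fmt"

type DataType int // stand-in for shared.DataType

type Cleaner interface {
	Clean(rngs [][2]uint64, t DataType) error
	ResetValidation(rngs [][2]uint64) error
}

// stubCleaner records the order in which its methods are called.
type stubCleaner struct{ calls []string }

func (s *stubCleaner) Clean(rngs [][2]uint64, t DataType) error {
	s.calls = append(s.calls, "Clean")
	return nil
}

func (s *stubCleaner) ResetValidation(rngs [][2]uint64) error {
	s.calls = append(s.calls, "ResetValidation")
	return nil
}

// resync mirrors the control flow: reset validation first, then clear old data.
func resync(c Cleaner, rngs [][2]uint64, resetValidation, clearOldCache bool, data DataType) error {
	if resetValidation {
		if err := c.ResetValidation(rngs); err != nil {
			return fmt.Errorf("validation reset failed: %v", err)
		}
	}
	if clearOldCache {
		if err := c.Clean(rngs, data); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	c := &stubCleaner{}
	if err := resync(c, [][2]uint64{{0, 100}}, true, true, DataType(0)); err != nil {
		panic(err)
	}
	fmt.Println(c.calls) // [ResetValidation Clean]
}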
@@ -79,6 +79,7 @@ type DagPutter interface {
 // Cleaner is for cleaning out data from the cache within the given ranges
 type Cleaner interface {
 	Clean(rngs [][2]uint64, t DataType) error
+	ResetValidation(rngs [][2]uint64) error
 }
 
 // SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain
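Because the shared Cleaner interface now requires ResetValidation, every concrete cleaner (the btc and eth ones above) must provide it. One common way to make that requirement explicit is a compile-time assertion, sketched here against local stand-in types rather than the project's packages.

// Hypothetical compile-time check; DataType, Cleaner, and myCleaner are
// local stand-ins, not the project's definitions.
package main

type DataType int

type Cleaner interface {
	Clean(rngs [][2]uint64, t DataType) error
	ResetValidation(rngs [][2]uint64) error
}

type myCleaner struct{}

func (myCleaner) Clean(rngs [][2]uint64, t DataType) error { return nil }
func (myCleaner) ResetValidation(rngs [][2]uint64) error   { return nil }

// Compilation fails here if myCleaner ever stops satisfying Cleaner,
// e.g. if ResetValidation were removed or its signature changed.
var _ Cleaner = myCleaner{}

func main() {}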