Merge pull request #93 from vulcanize/vdb-570-storage-duplicates
(VDB-570) Handle duplicate storage diffs
This commit is contained in:
commit
3b531950f2
@ -193,5 +193,5 @@ func composeAndExecute() {
|
|||||||
func init() {
|
func init() {
|
||||||
rootCmd.AddCommand(composeAndExecuteCmd)
|
rootCmd.AddCommand(composeAndExecuteCmd)
|
||||||
composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
|
composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
|
||||||
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs")
|
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs")
|
||||||
}
|
}
|
||||||
|
@ -139,7 +139,7 @@ func execute() {
|
|||||||
func init() {
|
func init() {
|
||||||
rootCmd.AddCommand(executeCmd)
|
rootCmd.AddCommand(executeCmd)
|
||||||
executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
|
executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
|
||||||
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs")
|
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs")
|
||||||
}
|
}
|
||||||
|
|
||||||
type Exporter interface {
|
type Exporter interface {
|
||||||
|
24
cmd/root.go
24
cmd/root.go
@ -36,22 +36,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
cfgFile string
|
cfgFile string
|
||||||
databaseConfig config.Database
|
databaseConfig config.Database
|
||||||
genConfig config.Plugin
|
genConfig config.Plugin
|
||||||
ipc string
|
ipc string
|
||||||
levelDbPath string
|
levelDbPath string
|
||||||
queueRecheckInterval time.Duration
|
queueRecheckInterval time.Duration
|
||||||
startingBlockNumber int64
|
startingBlockNumber int64
|
||||||
storageDiffsPath string
|
storageDiffsPath string
|
||||||
syncAll bool
|
syncAll bool
|
||||||
endingBlockNumber int64
|
endingBlockNumber int64
|
||||||
recheckHeadersArg bool
|
recheckHeadersArg bool
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
pollingInterval = 7 * time.Second
|
pollingInterval = 7 * time.Second
|
||||||
validationWindow = 15
|
validationWindow = 15
|
||||||
)
|
)
|
||||||
|
|
||||||
var rootCmd = &cobra.Command{
|
var rootCmd = &cobra.Command{
|
||||||
|
@ -17,9 +17,12 @@
|
|||||||
package utils
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrRowExists = errors.New("parsed row for storage diff already exists")
|
||||||
|
|
||||||
type ErrContractNotFound struct {
|
type ErrContractNotFound struct {
|
||||||
Contract string
|
Contract string
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,7 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
executeErr := storageTransformer.Execute(row)
|
executeErr := storageTransformer.Execute(row)
|
||||||
if executeErr != nil {
|
if executeErr != nil && executeErr != utils.ErrRowExists {
|
||||||
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
|
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
|
||||||
queueErr := storageWatcher.Queue.Add(row)
|
queueErr := storageWatcher.Queue.Add(row)
|
||||||
if queueErr != nil {
|
if queueErr != nil {
|
||||||
@ -100,7 +100,7 @@ func (storageWatcher StorageWatcher) processQueue() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
executeErr := storageTransformer.Execute(row)
|
executeErr := storageTransformer.Execute(row)
|
||||||
if executeErr == nil {
|
if executeErr == nil || executeErr == utils.ErrRowExists {
|
||||||
storageWatcher.deleteRow(row.Id)
|
storageWatcher.deleteRow(row.Id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -109,6 +109,18 @@ var _ = Describe("Storage Watcher", func() {
|
|||||||
close(done)
|
close(done)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("does not queue row if transformer execution fails because row already exists", func(done Done) {
|
||||||
|
mockTransformer.ExecuteErr = utils.ErrRowExists
|
||||||
|
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Hour)
|
||||||
|
|
||||||
|
Expect(<-errs).To(BeNil())
|
||||||
|
Consistently(func() bool {
|
||||||
|
return mockQueue.AddCalled
|
||||||
|
}).Should(BeFalse())
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
It("queues row for later processing if transformer execution fails", func(done Done) {
|
It("queues row for later processing if transformer execution fails", func(done Done) {
|
||||||
mockTransformer.ExecuteErr = fakes.FakeError
|
mockTransformer.ExecuteErr = fakes.FakeError
|
||||||
|
|
||||||
@ -187,6 +199,17 @@ var _ = Describe("Storage Watcher", func() {
|
|||||||
close(done)
|
close(done)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("deletes row from queue if transformer execution errors because row already exists", func(done Done) {
|
||||||
|
mockTransformer.ExecuteErr = utils.ErrRowExists
|
||||||
|
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Nanosecond)
|
||||||
|
|
||||||
|
Eventually(func() int {
|
||||||
|
return mockQueue.DeletePassedId
|
||||||
|
}).Should(Equal(row.Id))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
It("logs error if deleting persisted row fails", func(done Done) {
|
It("logs error if deleting persisted row fails", func(done Done) {
|
||||||
mockQueue.DeleteErr = fakes.FakeError
|
mockQueue.DeleteErr = fakes.FakeError
|
||||||
tempFile, fileErr := ioutil.TempFile("", "log")
|
tempFile, fileErr := ioutil.TempFile("", "log")
|
||||||
|
Loading…
Reference in New Issue
Block a user