Remove handling of duplicate storage diffs in watcher

- Can push this responsibility down to the transformers
- Update docs to reflect that transformers should handle duplicates
This commit is contained in:
Rob Mulholand 2019-05-15 10:12:38 -05:00
parent 79765c7998
commit e2909797fc
6 changed files with 5 additions and 30 deletions

View File

@ -193,5 +193,5 @@ func composeAndExecute() {
func init() { func init() {
rootCmd.AddCommand(composeAndExecuteCmd) rootCmd.AddCommand(composeAndExecuteCmd)
composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events") composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs") composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "interval duration for rechecking queued storage diffs (ex: 5m30s)")
} }

View File

@ -139,7 +139,7 @@ func execute() {
func init() { func init() {
rootCmd.AddCommand(executeCmd) rootCmd.AddCommand(executeCmd)
executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events") executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs") executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "interval duration for rechecking queued storage diffs (ex: 5m30s)")
} }
type Exporter interface { type Exporter interface {

View File

@ -100,6 +100,7 @@ type Repository interface {
A contract-specific implementation of the repository interface enables the transformer to write the decoded storage value to the appropriate table in postgres. A contract-specific implementation of the repository interface enables the transformer to write the decoded storage value to the appropriate table in postgres.
The `Create` function is expected to recognize and persist a given storage value by the variable's name, as indicated on the row's metadata. The `Create` function is expected to recognize and persist a given storage value by the variable's name, as indicated on the row's metadata.
Note: we advise silently discarding duplicates in `Create` - as it's possible that you may read the same diff several times, and an error will trigger the storage watcher to queue that diff for later processing.
The `SetDB` function is required for the repository to connect to the database. The `SetDB` function is required for the repository to connect to the database.

View File

@ -17,12 +17,9 @@
package utils package utils
import ( import (
"errors"
"fmt" "fmt"
) )
var ErrRowExists = errors.New("parsed row for storage diff already exists")
type ErrContractNotFound struct { type ErrContractNotFound struct {
Contract string Contract string
} }

View File

@ -78,7 +78,7 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
return return
} }
executeErr := storageTransformer.Execute(row) executeErr := storageTransformer.Execute(row)
if executeErr != nil && executeErr != utils.ErrRowExists { if executeErr != nil {
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
queueErr := storageWatcher.Queue.Add(row) queueErr := storageWatcher.Queue.Add(row)
if queueErr != nil { if queueErr != nil {
@ -100,7 +100,7 @@ func (storageWatcher StorageWatcher) processQueue() {
continue continue
} }
executeErr := storageTransformer.Execute(row) executeErr := storageTransformer.Execute(row)
if executeErr == nil || executeErr == utils.ErrRowExists { if executeErr == nil {
storageWatcher.deleteRow(row.Id) storageWatcher.deleteRow(row.Id)
} }
} }

View File

@ -109,18 +109,6 @@ var _ = Describe("Storage Watcher", func() {
close(done) close(done)
}) })
It("does not queue row if transformer execution fails because row already exists", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrRowExists
go storageWatcher.Execute(rows, errs, time.Hour)
Expect(<-errs).To(BeNil())
Consistently(func() bool {
return mockQueue.AddCalled
}).Should(BeFalse())
close(done)
})
It("queues row for later processing if transformer execution fails", func(done Done) { It("queues row for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError mockTransformer.ExecuteErr = fakes.FakeError
@ -199,17 +187,6 @@ var _ = Describe("Storage Watcher", func() {
close(done) close(done)
}) })
It("deletes row from queue if transformer execution errors because row already exists", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrRowExists
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(row.Id))
close(done)
})
It("logs error if deleting persisted row fails", func(done Done) { It("logs error if deleting persisted row fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log") tempFile, fileErr := ioutil.TempFile("", "log")