From e2909797fc0129bb88cc7f8101604ad19461d38c Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Wed, 15 May 2019 10:12:38 -0500 Subject: [PATCH] Remove handling of duplicate storage diffs in watcher - Can push this responsibility down to the transformers - Update docs to reflect that transformers should handle duplicates --- cmd/composeAndExecute.go | 2 +- cmd/execute.go | 2 +- libraries/shared/factories/storage/README.md | 1 + libraries/shared/storage/utils/errors.go | 3 --- libraries/shared/watcher/storage_watcher.go | 4 ++-- .../shared/watcher/storage_watcher_test.go | 23 ------------------- 6 files changed, 5 insertions(+), 30 deletions(-) diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index 1efb887b..47a8ac74 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -193,5 +193,5 @@ func composeAndExecute() { func init() { rootCmd.AddCommand(composeAndExecuteCmd) composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events") - composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs") + composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "interval duration for rechecking queued storage diffs (ex: 5m30s)") } diff --git a/cmd/execute.go b/cmd/execute.go index 6579ffde..e8b5e513 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -139,7 +139,7 @@ func execute() { func init() { rootCmd.AddCommand(executeCmd) executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events") - executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs") + executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "interval duration for rechecking queued storage diffs (ex: 5m30s)") } type Exporter interface { diff --git a/libraries/shared/factories/storage/README.md b/libraries/shared/factories/storage/README.md index cd8dc792..e22eb31b 100644 --- a/libraries/shared/factories/storage/README.md +++ b/libraries/shared/factories/storage/README.md @@ -100,6 +100,7 @@ type Repository interface { A contract-specific implementation of the repository interface enables the transformer to write the decoded storage value to the appropriate table in postgres. The `Create` function is expected to recognize and persist a given storage value by the variable's name, as indicated on the row's metadata. +Note: we advise silently discarding duplicates in `Create` - as it's possible that you may read the same diff several times, and an error will trigger the storage watcher to queue that diff for later processing. The `SetDB` function is required for the repository to connect to the database. diff --git a/libraries/shared/storage/utils/errors.go b/libraries/shared/storage/utils/errors.go index bf2bcc4a..0c87c642 100644 --- a/libraries/shared/storage/utils/errors.go +++ b/libraries/shared/storage/utils/errors.go @@ -17,12 +17,9 @@ package utils import ( - "errors" "fmt" ) -var ErrRowExists = errors.New("parsed row for storage diff already exists") - type ErrContractNotFound struct { Contract string } diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index ac5b8a8d..7b2c5362 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -78,7 +78,7 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { return } executeErr := storageTransformer.Execute(row) - if executeErr != nil && executeErr != utils.ErrRowExists { + if executeErr != nil { logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) queueErr := storageWatcher.Queue.Add(row) if queueErr != nil { @@ -100,7 +100,7 @@ func (storageWatcher StorageWatcher) processQueue() { continue } executeErr := storageTransformer.Execute(row) - if executeErr == nil || executeErr == utils.ErrRowExists { + if executeErr == nil { storageWatcher.deleteRow(row.Id) } } diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index e3d804d9..6c12358d 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -109,18 +109,6 @@ var _ = Describe("Storage Watcher", func() { close(done) }) - It("does not queue row if transformer execution fails because row already exists", func(done Done) { - mockTransformer.ExecuteErr = utils.ErrRowExists - - go storageWatcher.Execute(rows, errs, time.Hour) - - Expect(<-errs).To(BeNil()) - Consistently(func() bool { - return mockQueue.AddCalled - }).Should(BeFalse()) - close(done) - }) - It("queues row for later processing if transformer execution fails", func(done Done) { mockTransformer.ExecuteErr = fakes.FakeError @@ -199,17 +187,6 @@ var _ = Describe("Storage Watcher", func() { close(done) }) - It("deletes row from queue if transformer execution errors because row already exists", func(done Done) { - mockTransformer.ExecuteErr = utils.ErrRowExists - - go storageWatcher.Execute(rows, errs, time.Nanosecond) - - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(row.Id)) - close(done) - }) - It("logs error if deleting persisted row fails", func(done Done) { mockQueue.DeleteErr = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log")