Queue storage diffs if transformer execution fails

- For any error, not just if key isn't recognized
- Means we don't lose track of diffs on random ephemeral errors
This commit is contained in:
Rob Mulholand 2019-04-29 14:20:57 -05:00
parent 76ab914bdc
commit b036053937
2 changed files with 6 additions and 25 deletions

View File

@ -79,13 +79,10 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
}
executeErr := storageTransformer.Execute(row)
if executeErr != nil {
if isKeyNotFound(executeErr) {
queueErr := storageWatcher.Queue.Add(row)
if queueErr != nil {
logrus.Warn(fmt.Sprintf("error queueing storage diff with unrecognized key: %s", queueErr))
}
} else {
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
queueErr := storageWatcher.Queue.Add(row)
if queueErr != nil {
logrus.Warn(fmt.Sprintf("error queueing storage diff: %s", queueErr))
}
}
}

View File

@ -109,8 +109,8 @@ var _ = Describe("Storage Watcher", func() {
close(done)
})
It("queues row for later processing if row's key not recognized", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
It("queues row for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
go storageWatcher.Execute(rows, errs, time.Hour)
@ -143,22 +143,6 @@ var _ = Describe("Storage Watcher", func() {
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("logs error if transformer execution fails for reason other than key not found", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(rows, errs, time.Hour)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
Describe("transforming queued storage diffs", func() {