diff --git a/cmd/execute.go b/cmd/execute.go index 160355b3..f3eea0f3 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -27,6 +27,7 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fs" @@ -168,7 +169,9 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { ticker := time.NewTicker(pollingInterval) defer ticker.Stop() for range ticker.C { - w.Execute() + errs := make(chan error) + rows := make(chan storageUtils.StorageDiffRow) + w.Execute(rows, errs, queueRecheckInterval) } } diff --git a/cmd/root.go b/cmd/root.go index f322867e..7eee8793 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -49,8 +49,9 @@ var ( ) const ( - pollingInterval = 7 * time.Second - validationWindow = 15 + pollingInterval = 7 * time.Second + queueRecheckInterval = 5 * time.Minute + validationWindow = 15 ) var rootCmd = &cobra.Command{ diff --git a/libraries/shared/fetcher/storage_fetcher_test.go b/libraries/shared/fetcher/storage_fetcher_test.go index 1323f9d4..f5d39dcd 100644 --- a/libraries/shared/fetcher/storage_fetcher_test.go +++ b/libraries/shared/fetcher/storage_fetcher_test.go @@ -30,39 +30,35 @@ var _ = Describe("Csv Tail Storage Fetcher", func() { storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer) }) - It("adds error to errors channel if tailing file fails", func() { + It("adds error to errors channel if tailing file fails", func(done Done) { mockTailer.TailErr = fakes.FakeError go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) - close(mockTailer.Lines) - returnedErr := <-errorsChannel - Expect(returnedErr).To(HaveOccurred()) - Expect(returnedErr).To(MatchError(fakes.FakeError)) + Expect(<-errorsChannel).To(MatchError(fakes.FakeError)) + close(done) }) - It("adds parsed csv row to rows channel for storage diff", func() { + It("adds parsed csv row to rows channel for storage diff", func(done Done) { line := getFakeLine() go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) mockTailer.Lines <- line - close(mockTailer.Lines) - returnedRow := <-rowsChannel expectedRow, err := utils.FromStrings(strings.Split(line.Text, ",")) Expect(err).NotTo(HaveOccurred()) - Expect(expectedRow).To(Equal(returnedRow)) + Expect(<-rowsChannel).To(Equal(expectedRow)) + close(done) }) - It("adds error to errors channel if parsing csv fails", func() { + It("adds error to errors channel if parsing csv fails", func(done Done) { line := &tail.Line{Text: "invalid"} go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) mockTailer.Lines <- line - close(mockTailer.Lines) - returnedErr := <-errorsChannel - Expect(returnedErr).To(HaveOccurred()) + Expect(<-errorsChannel).To(HaveOccurred()) + close(done) }) }) diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go index ef555252..27c61b20 100644 --- a/libraries/shared/mocks/storage_fetcher.go +++ b/libraries/shared/mocks/storage_fetcher.go @@ -12,12 +12,12 @@ func NewMockStorageFetcher() *MockStorageFetcher { } func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { + defer close(out) + defer close(errs) for _, err := range fetcher.ErrsToReturn { errs <- err } for _, row := range fetcher.RowsToReturn { out <- row } - close(out) - close(errs) } diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index 729219a8..e667ec57 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -21,21 +21,26 @@ import ( ) type MockStorageQueue struct { - AddCalled bool - AddError error - PassedRow utils.StorageDiffRow + AddCalled bool + AddError error + AddPassedRow utils.StorageDiffRow + DeleteErr error + DeletePassedId int + GetAllErr error + RowsToReturn []utils.StorageDiffRow } func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error { queue.AddCalled = true - queue.PassedRow = row + queue.AddPassedRow = row return queue.AddError } func (queue *MockStorageQueue) Delete(id int) error { - panic("implement me") + queue.DeletePassedId = id + return queue.DeleteErr } func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiffRow, error) { - panic("implement me") + return queue.RowsToReturn, queue.GetAllErr } diff --git a/libraries/shared/storage/storage_queue_test.go b/libraries/shared/storage/storage_queue_test.go index ca05f234..81f988c0 100644 --- a/libraries/shared/storage/storage_queue_test.go +++ b/libraries/shared/storage/storage_queue_test.go @@ -4,10 +4,10 @@ import ( "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/test_config" ) diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index e8dda2f1..fdc773fa 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -17,7 +17,9 @@ package watcher import ( + "fmt" "reflect" + "time" "github.com/ethereum/go-ethereum/common" "github.com/sirupsen/logrus" @@ -47,29 +49,30 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage } } -func (watcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { +func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { for _, initializer := range initializers { - storageTransformer := initializer(watcher.db) - watcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer + storageTransformer := initializer(storageWatcher.db) + storageWatcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer } } -func (watcher StorageWatcher) Execute() error { - rows := make(chan utils.StorageDiffRow) - errs := make(chan error) - go watcher.StorageFetcher.FetchStorageDiffs(rows, errs) +func (storageWatcher StorageWatcher) Execute(rows chan utils.StorageDiffRow, errs chan error, queueRecheckInterval time.Duration) { + ticker := time.NewTicker(queueRecheckInterval) + go storageWatcher.StorageFetcher.FetchStorageDiffs(rows, errs) for { select { + case fetchErr := <-errs: + logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr)) case row := <-rows: - watcher.processRow(row) - case err := <-errs: - return err + storageWatcher.processRow(row) + case <-ticker.C: + storageWatcher.processQueue() } } } -func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) { - storageTransformer, ok := watcher.Transformers[row.Contract] +func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { + storageTransformer, ok := storageWatcher.Transformers[row.Contract] if !ok { // ignore rows from unwatched contracts return @@ -77,16 +80,42 @@ func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) { executeErr := storageTransformer.Execute(row) if executeErr != nil { if isKeyNotFound(executeErr) { - queueErr := watcher.Queue.Add(row) + queueErr := storageWatcher.Queue.Add(row) if queueErr != nil { - logrus.Warn(queueErr.Error()) + logrus.Warn(fmt.Sprintf("error queueing storage diff with unrecognized key: %s", queueErr)) } } else { - logrus.Warn(executeErr.Error()) + logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) } } } +func (storageWatcher StorageWatcher) processQueue() { + rows, fetchErr := storageWatcher.Queue.GetAll() + if fetchErr != nil { + logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr)) + } + for _, row := range rows { + storageTransformer, ok := storageWatcher.Transformers[row.Contract] + if !ok { + // delete row from queue if address no longer watched + storageWatcher.deleteRow(row.Id) + continue + } + executeErr := storageTransformer.Execute(row) + if executeErr == nil { + storageWatcher.deleteRow(row.Id) + } + } +} + +func (storageWatcher StorageWatcher) deleteRow(id int) { + deleteErr := storageWatcher.Queue.Delete(id) + if deleteErr != nil { + logrus.Warn(fmt.Sprintf("error deleting persisted row from queue: %s", deleteErr)) + } +} + func isKeyNotFound(executeErr error) bool { return reflect.TypeOf(executeErr) == reflect.TypeOf(utils.ErrStorageKeyNotFound{}) } diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index bbafa75a..3dfef667 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -19,6 +19,7 @@ package watcher_test import ( "io/ioutil" "os" + "time" "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" @@ -46,78 +47,214 @@ var _ = Describe("Storage Watcher", func() { Describe("executing watcher", func() { var ( + errs chan error mockFetcher *mocks.MockStorageFetcher mockQueue *mocks.MockStorageQueue mockTransformer *mocks.MockStorageTransformer row utils.StorageDiffRow + rows chan utils.StorageDiffRow storageWatcher watcher.StorageWatcher ) BeforeEach(func() { + errs = make(chan error) + rows = make(chan utils.StorageDiffRow) address := common.HexToAddress("0x0123456789abcdef") mockFetcher = mocks.NewMockStorageFetcher() mockQueue = &mocks.MockStorageQueue{} mockTransformer = &mocks.MockStorageTransformer{Address: address} row = utils.StorageDiffRow{ + Id: 1337, Contract: address, BlockHash: common.HexToHash("0xfedcba9876543210"), BlockHeight: 0, StorageKey: common.HexToHash("0xabcdef1234567890"), StorageValue: common.HexToHash("0x9876543210abcdef"), } - mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} + }) + + It("logs error if fetching storage diffs fails", func(done Done) { + mockFetcher.ErrsToReturn = []error{fakes.FakeError} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - }) - - It("executes transformer for recognized storage row", func() { - err := storageWatcher.Execute() - - Expect(err).NotTo(HaveOccurred()) - Expect(mockTransformer.PassedRow).To(Equal(row)) - }) - - It("queues row for later processing if row's key not recognized", func() { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} - - err := storageWatcher.Execute() - - Expect(err).NotTo(HaveOccurred()) - Expect(mockQueue.AddCalled).To(BeTrue()) - Expect(mockQueue.PassedRow).To(Equal(row)) - }) - - It("logs error if queueing row fails", func() { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} - mockQueue.AddError = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log") Expect(fileErr).NotTo(HaveOccurred()) defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - err := storageWatcher.Execute() + go storageWatcher.Execute(rows, errs, time.Hour) - Expect(err).NotTo(HaveOccurred()) - Expect(mockQueue.AddCalled).To(BeTrue()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) }) - It("logs error if transformer execution fails for reason other than key not found", func() { - mockTransformer.ExecuteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) + Describe("transforming new storage diffs", func() { + BeforeEach(func() { + mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) - err := storageWatcher.Execute() + It("executes transformer for recognized storage row", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Hour) - Expect(err).NotTo(HaveOccurred()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) + Eventually(func() utils.StorageDiffRow { + return mockTransformer.PassedRow + }).Should(Equal(row)) + close(done) + }) + + It("queues row for later processing if row's key not recognized", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + + go storageWatcher.Execute(rows, errs, time.Hour) + + Expect(<-errs).To(BeNil()) + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() utils.StorageDiffRow { + return mockQueue.AddPassedRow + }).Should(Equal(row)) + close(done) + }) + + It("logs error if queueing row fails", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + mockQueue.AddError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Hour) + + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("logs error if transformer execution fails for reason other than key not found", func(done Done) { + mockTransformer.ExecuteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Hour) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) }) + + Describe("transforming queued storage diffs", func() { + BeforeEach(func() { + mockQueue.RowsToReturn = []utils.StorageDiffRow{row} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("logs error if getting queued storage fails", func(done Done) { + mockQueue.GetAllErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("executes transformer for storage row", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() utils.StorageDiffRow { + return mockTransformer.PassedRow + }).Should(Equal(row)) + close(done) + }) + + It("deletes row from queue if transformer execution successful", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(row.Id)) + close(done) + }) + + It("logs error if deleting persisted row fails", func(done Done) { + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("deletes obsolete row from queue if contract not recognized", func(done Done) { + obsoleteRow := utils.StorageDiffRow{ + Id: row.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(obsoleteRow.Id)) + close(done) + }) + + It("logs error if deleting obsolete row fails", func(done Done) { + obsoleteRow := utils.StorageDiffRow{ + Id: row.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + }) + }) })