From 8c4a4d6587ef92c880a33cf998df03a0e6837bc6 Mon Sep 17 00:00:00 2001 From: Elizabeth Engelman Date: Wed, 10 Jul 2019 14:34:14 -0500 Subject: [PATCH] Handle different contract address format for geth vs csv --- libraries/shared/watcher/storage_watcher.go | 32 +- .../shared/watcher/storage_watcher_test.go | 371 ++++++++++++------ 2 files changed, 280 insertions(+), 123 deletions(-) diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 7b2c5362..856aac44 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -18,6 +18,7 @@ package watcher import ( "fmt" + "github.com/ethereum/go-ethereum/crypto" "reflect" "time" @@ -33,6 +34,7 @@ import ( type StorageWatcher struct { db *postgres.DB + diffSource string StorageFetcher fetcher.IStorageFetcher Queue storage.IStorageQueue Transformers map[common.Address]transformer.StorageTransformer @@ -43,12 +45,17 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage queue := storage.NewStorageQueue(db) return StorageWatcher{ db: db, + diffSource: "csv", StorageFetcher: fetcher, Queue: queue, Transformers: transformers, } } +func (storageWatcher *StorageWatcher) SetStorageDiffSource(source string) { + storageWatcher.diffSource = source +} + func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { for _, initializer := range initializers { storageTransformer := initializer(storageWatcher.db) @@ -71,10 +78,29 @@ func (storageWatcher StorageWatcher) Execute(rows chan utils.StorageDiffRow, err } } +func (storageWatcher StorageWatcher) getTransformer(contractAddress common.Address) (transformer.StorageTransformer, bool) { + if storageWatcher.diffSource == "csv" { + storageTransformer, ok := storageWatcher.Transformers[contractAddress] + return storageTransformer, ok + } else if storageWatcher.diffSource == "geth" { + logrus.Debug("number of transformers", len(storageWatcher.Transformers)) + for address, t := range storageWatcher.Transformers { + keccakOfTransformerAddress := common.BytesToAddress(crypto.Keccak256(address[:])) + if keccakOfTransformerAddress == contractAddress { + return t, true + } + } + + return nil, false + } + return nil, false +} + + func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { - storageTransformer, ok := storageWatcher.Transformers[row.Contract] + storageTransformer, ok := storageWatcher.getTransformer(row.Contract) if !ok { - // ignore rows from unwatched contracts + logrus.Debug("ignoring a row from an unwatched contract") return } executeErr := storageTransformer.Execute(row) @@ -93,7 +119,7 @@ func (storageWatcher StorageWatcher) processQueue() { logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr)) } for _, row := range rows { - storageTransformer, ok := storageWatcher.Transformers[row.Contract] + storageTransformer, ok := storageWatcher.getTransformer(row.Contract) if !ok { // delete row from queue if address no longer watched storageWatcher.deleteRow(row.Id) diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index 6c12358d..2ccbec2c 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -17,6 +17,7 @@ package watcher_test import ( + "github.com/ethereum/go-ethereum/crypto" "io/ioutil" "os" "time" @@ -51,7 +52,8 @@ var _ = Describe("Storage Watcher", func() { mockFetcher *mocks.MockStorageFetcher mockQueue *mocks.MockStorageQueue mockTransformer *mocks.MockStorageTransformer - row utils.StorageDiffRow + csvRow utils.StorageDiffRow + gethRow utils.StorageDiffRow rows chan utils.StorageDiffRow storageWatcher watcher.StorageWatcher ) @@ -63,7 +65,7 @@ var _ = Describe("Storage Watcher", func() { mockFetcher = mocks.NewMockStorageFetcher() mockQueue = &mocks.MockStorageQueue{} mockTransformer = &mocks.MockStorageTransformer{Address: address} - row = utils.StorageDiffRow{ + csvRow = utils.StorageDiffRow{ Id: 1337, Contract: address, BlockHash: common.HexToHash("0xfedcba9876543210"), @@ -71,6 +73,14 @@ var _ = Describe("Storage Watcher", func() { StorageKey: common.HexToHash("0xabcdef1234567890"), StorageValue: common.HexToHash("0x9876543210abcdef"), } + gethRow = utils.StorageDiffRow{ + Id: 1338, + Contract: common.BytesToAddress(crypto.Keccak256(address[:])), + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: 0, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), + } }) It("logs error if fetching storage diffs fails", func(done Done) { @@ -92,153 +102,274 @@ var _ = Describe("Storage Watcher", func() { close(done) }) - Describe("transforming new storage diffs", func() { - BeforeEach(func() { - mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} - storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + Describe("transforming new storage diffs from csv", func() { + Describe("where diff source is a csv file", func() { + BeforeEach(func() { + mockFetcher.RowsToReturn = []utils.StorageDiffRow{csvRow} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("executes transformer for recognized storage row", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Hour) + + Eventually(func() utils.StorageDiffRow { + return mockTransformer.PassedRow + }).Should(Equal(csvRow)) + close(done) + }) + + It("queues row for later processing if transformer execution fails", func(done Done) { + mockTransformer.ExecuteErr = fakes.FakeError + + go storageWatcher.Execute(rows, errs, time.Hour) + + Expect(<-errs).To(BeNil()) + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() utils.StorageDiffRow { + return mockQueue.AddPassedRow + }).Should(Equal(csvRow)) + close(done) + }) + + It("logs error if queueing row fails", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + mockQueue.AddError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Hour) + + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) }) - It("executes transformer for recognized storage row", func(done Done) { - go storageWatcher.Execute(rows, errs, time.Hour) + Describe("where diff source is geth RPC pub sub", func() { + BeforeEach(func() { + mockFetcher.RowsToReturn = []utils.StorageDiffRow{gethRow} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.SetStorageDiffSource("geth") + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) - Eventually(func() utils.StorageDiffRow { - return mockTransformer.PassedRow - }).Should(Equal(row)) - close(done) - }) + It("executes transformer for recognized storage row", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Hour) - It("queues row for later processing if transformer execution fails", func(done Done) { - mockTransformer.ExecuteErr = fakes.FakeError + Eventually(func() utils.StorageDiffRow { + return mockTransformer.PassedRow + }).Should(Equal(gethRow)) + close(done) + }) - go storageWatcher.Execute(rows, errs, time.Hour) + It("queues row for later processing if transformer execution fails", func(done Done) { + mockTransformer.ExecuteErr = fakes.FakeError - Expect(<-errs).To(BeNil()) - Eventually(func() bool { - return mockQueue.AddCalled - }).Should(BeTrue()) - Eventually(func() utils.StorageDiffRow { - return mockQueue.AddPassedRow - }).Should(Equal(row)) - close(done) - }) + go storageWatcher.Execute(rows, errs, time.Hour) - It("logs error if queueing row fails", func(done Done) { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} - mockQueue.AddError = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) + Expect(<-errs).To(BeNil()) + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() utils.StorageDiffRow { + return mockQueue.AddPassedRow + }).Should(Equal(gethRow)) + close(done) + }) - go storageWatcher.Execute(rows, errs, time.Hour) + It("logs error if queueing row fails", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + mockQueue.AddError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) - Eventually(func() bool { - return mockQueue.AddCalled - }).Should(BeTrue()) - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) + go storageWatcher.Execute(rows, errs, time.Hour) + + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) }) }) Describe("transforming queued storage diffs", func() { - BeforeEach(func() { - mockQueue.RowsToReturn = []utils.StorageDiffRow{row} - storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + Describe("where diff source is a csv file", func() { + BeforeEach(func() { + mockQueue.RowsToReturn = []utils.StorageDiffRow{csvRow} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("executes transformer for storage row", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() utils.StorageDiffRow { + return mockTransformer.PassedRow + }).Should(Equal(csvRow)) + close(done) + }) + + It("deletes row from queue if transformer execution successful", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(csvRow.Id)) + close(done) + }) + + It("logs error if deleting persisted row fails", func(done Done) { + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("deletes obsolete row from queue if contract not recognized", func(done Done) { + obsoleteRow := utils.StorageDiffRow{ + Id: csvRow.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(obsoleteRow.Id)) + close(done) + }) + + It("logs error if deleting obsolete row fails", func(done Done) { + obsoleteRow := utils.StorageDiffRow{ + Id: csvRow.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) }) - It("logs error if getting queued storage fails", func(done Done) { - mockQueue.GetAllErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) + Describe("where diff source is geth RPC pub sub", func() { + BeforeEach(func() { + mockQueue.RowsToReturn = []utils.StorageDiffRow{gethRow} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.SetStorageDiffSource("geth") + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) - go storageWatcher.Execute(rows, errs, time.Nanosecond) + It("executes transformer for storage row", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Nanosecond) - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) + Eventually(func() utils.StorageDiffRow { + return mockTransformer.PassedRow + }).Should(Equal(gethRow)) + close(done) + }) - It("executes transformer for storage row", func(done Done) { - go storageWatcher.Execute(rows, errs, time.Nanosecond) + It("deletes row from queue if transformer execution successful", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Nanosecond) - Eventually(func() utils.StorageDiffRow { - return mockTransformer.PassedRow - }).Should(Equal(row)) - close(done) - }) + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(gethRow.Id)) + close(done) + }) - It("deletes row from queue if transformer execution successful", func(done Done) { - go storageWatcher.Execute(rows, errs, time.Nanosecond) + It("logs error if deleting persisted row fails", func(done Done) { + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(row.Id)) - close(done) - }) + go storageWatcher.Execute(rows, errs, time.Nanosecond) - It("logs error if deleting persisted row fails", func(done Done) { - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) - go storageWatcher.Execute(rows, errs, time.Nanosecond) + It("deletes obsolete row from queue if contract not recognized", func(done Done) { + obsoleteRow := utils.StorageDiffRow{ + Id: gethRow.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) + go storageWatcher.Execute(rows, errs, time.Nanosecond) - It("deletes obsolete row from queue if contract not recognized", func(done Done) { - obsoleteRow := utils.StorageDiffRow{ - Id: row.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(obsoleteRow.Id)) + close(done) + }) - go storageWatcher.Execute(rows, errs, time.Nanosecond) + It("logs error if deleting obsolete row fails", func(done Done) { + obsoleteRow := utils.StorageDiffRow{ + Id: gethRow.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(obsoleteRow.Id)) - close(done) - }) + go storageWatcher.Execute(rows, errs, time.Nanosecond) - It("logs error if deleting obsolete row fails", func(done Done) { - obsoleteRow := utils.StorageDiffRow{ - Id: row.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(rows, errs, time.Nanosecond) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) }) }) - }) })