diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index ff7581b4..945b9cc0 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -188,8 +188,7 @@ func composeAndExecute() { stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) payloadChan := make(chan statediff.Payload) storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) - sw := watcher.NewStorageWatcher(&storageFetcher, &db) - sw.SetStorageDiffSource("geth") + sw := watcher.NewGethStorageWatcher(&storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) @@ -197,7 +196,7 @@ func composeAndExecute() { log.Debug("fetching storage diffs from csv") tailer := fs.FileTailer{Path: storageDiffsPath} storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) - sw := watcher.NewStorageWatcher(storageFetcher, &db) + sw := watcher.NewCsvStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) diff --git a/cmd/execute.go b/cmd/execute.go index 61943124..aeffe1ce 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -132,8 +132,7 @@ func execute() { stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) payloadChan := make(chan statediff.Payload) storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) - sw := watcher.NewStorageWatcher(&storageFetcher, &db) - sw.SetStorageDiffSource("geth") + sw := watcher.NewGethStorageWatcher(&storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) @@ -141,7 +140,7 @@ func execute() { log.Debug("fetching storage diffs from csv") tailer := fs.FileTailer{Path: storageDiffsPath} storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) - sw := watcher.NewStorageWatcher(storageFetcher, &db) + sw := watcher.NewCsvStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) @@ -183,7 +182,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { } } -func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { +func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) { defer wg.Done() // Execute over the StorageTransformerInitializer set using the storage watcher LogWithCommand.Info("executing storage transformers") @@ -191,8 +190,8 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { defer ticker.Stop() for range ticker.C { errs := make(chan error) - rows := make(chan storageUtils.StorageDiffRow) - w.Execute(rows, errs, queueRecheckInterval) + diffs := make(chan storageUtils.StorageDiff) + w.Execute(diffs, errs, queueRecheckInterval) } } diff --git a/libraries/shared/watcher/csv_storage_watcher.go b/libraries/shared/watcher/csv_storage_watcher.go new file mode 100644 index 00000000..a08fc938 --- /dev/null +++ b/libraries/shared/watcher/csv_storage_watcher.go @@ -0,0 +1,45 @@ +// Copyright 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watcher + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage" + "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +type CsvStorageWatcher struct { + StorageWatcher +} + +func NewCsvStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) CsvStorageWatcher { + queue := storage.NewStorageQueue(db) + transformers := make(map[common.Address]transformer.StorageTransformer) + storageWatcher := StorageWatcher{ + db: db, + StorageFetcher: fetcher, + Queue: queue, + Transformers: transformers, + } + storageWatcher.transformerGetter = storageWatcher.getCsvTransformer + return CsvStorageWatcher{StorageWatcher: storageWatcher} +} + +func (storageWatcher StorageWatcher) getCsvTransformer(contractAddress common.Address) (transformer.StorageTransformer, bool) { + storageTransformer, ok := storageWatcher.Transformers[contractAddress] + return storageTransformer, ok +} diff --git a/libraries/shared/watcher/csv_storage_watcher_test.go b/libraries/shared/watcher/csv_storage_watcher_test.go new file mode 100644 index 00000000..f8617cdf --- /dev/null +++ b/libraries/shared/watcher/csv_storage_watcher_test.go @@ -0,0 +1,228 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package watcher_test + +import ( + "io/ioutil" + "os" + "time" + + "github.com/ethereum/go-ethereum/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Csv Storage Watcher", func() { + It("adds transformers", func() { + fakeAddress := common.HexToAddress("0x12345") + fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} + w := watcher.NewCsvStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) + + w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) + + Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) + }) + + Describe("executing watcher", func() { + var ( + errs chan error + mockFetcher *mocks.MockStorageFetcher + mockQueue *mocks.MockStorageQueue + mockTransformer *mocks.MockStorageTransformer + csvDiff utils.StorageDiff + diffs chan utils.StorageDiff + storageWatcher watcher.CsvStorageWatcher + address common.Address + ) + + BeforeEach(func() { + errs = make(chan error) + diffs = make(chan utils.StorageDiff) + address = common.HexToAddress("0x0123456789abcdef") + mockFetcher = mocks.NewMockStorageFetcher() + mockQueue = &mocks.MockStorageQueue{} + mockTransformer = &mocks.MockStorageTransformer{Address: address} + csvDiff = utils.StorageDiff{ + Id: 1337, + Contract: address, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: 0, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), + } + }) + + It("logs error if fetching storage diffs fails", func(done Done) { + mockFetcher.ErrsToReturn = []error{fakes.FakeError} + storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(diffs, errs, time.Hour) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + Describe("transforming new storage diffs from csv", func() { + BeforeEach(func() { + mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} + storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("executes transformer for recognized storage diff", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Hour) + + Eventually(func() utils.StorageDiff { + return mockTransformer.PassedDiff + }).Should(Equal(csvDiff)) + close(done) + }) + + It("queues diff for later processing if transformer execution fails", func(done Done) { + mockTransformer.ExecuteErr = fakes.FakeError + + go storageWatcher.Execute(diffs, errs, time.Hour) + + Expect(<-errs).To(BeNil()) + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() utils.StorageDiff { + return mockQueue.AddPassedDiff + }).Should(Equal(csvDiff)) + close(done) + }) + + It("logs error if queueing diff fails", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + mockQueue.AddError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(diffs, errs, time.Hour) + + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + }) + + Describe("transforming queued storage diffs", func() { + BeforeEach(func() { + mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff} + storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("executes transformer for storage diff", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() utils.StorageDiff { + return mockTransformer.PassedDiff + }).Should(Equal(csvDiff)) + close(done) + }) + + It("deletes diff from queue if transformer execution successful", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(csvDiff.Id)) + close(done) + }) + + It("logs error if deleting persisted diff fails", func(done Done) { + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("deletes obsolete diff from queue if contract not recognized", func(done Done) { + obsoleteDiff := utils.StorageDiff{ + Id: csvDiff.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} + + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(obsoleteDiff.Id)) + close(done) + }) + + It("logs error if deleting obsolete diff fails", func(done Done) { + obsoleteDiff := utils.StorageDiff{ + Id: csvDiff.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + }) + }) +}) diff --git a/libraries/shared/watcher/geth_storage_watcher.go b/libraries/shared/watcher/geth_storage_watcher.go new file mode 100644 index 00000000..58076590 --- /dev/null +++ b/libraries/shared/watcher/geth_storage_watcher.go @@ -0,0 +1,60 @@ +// Copyright 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watcher + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage" + "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +type GethStorageWatcher struct { + StorageWatcher +} + +func NewGethStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) GethStorageWatcher { + queue := storage.NewStorageQueue(db) + transformers := make(map[common.Address]transformer.StorageTransformer) + keccakAddressTransformers := make(map[common.Address]transformer.StorageTransformer) + storageWatcher := StorageWatcher{ + db: db, + StorageFetcher: fetcher, + Queue: queue, + Transformers: transformers, + KeccakAddressTransformers: keccakAddressTransformers, + } + storageWatcher.transformerGetter = storageWatcher.getTransformerForGethWatcher + return GethStorageWatcher{StorageWatcher: storageWatcher} +} + +func (storageWatcher StorageWatcher) getTransformerForGethWatcher(contractAddress common.Address) (transformer.StorageTransformer, bool) { + storageTransformer, ok := storageWatcher.KeccakAddressTransformers[contractAddress] + if ok { + return storageTransformer, ok + } else { + for address, transformer := range storageWatcher.Transformers { + keccakOfTransformerAddress := common.BytesToAddress(crypto.Keccak256(address[:])) + if keccakOfTransformerAddress == contractAddress { + storageWatcher.KeccakAddressTransformers[contractAddress] = transformer + return transformer, true + } + } + } + + return nil, false +} diff --git a/libraries/shared/watcher/geth_storage_watcher_test.go b/libraries/shared/watcher/geth_storage_watcher_test.go new file mode 100644 index 00000000..99eee605 --- /dev/null +++ b/libraries/shared/watcher/geth_storage_watcher_test.go @@ -0,0 +1,271 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package watcher_test + +import ( + "github.com/ethereum/go-ethereum/crypto" + "io/ioutil" + "os" + "time" + + "github.com/ethereum/go-ethereum/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Geth Storage Watcher", func() { + It("adds transformers", func() { + fakeAddress := common.HexToAddress("0x12345") + fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} + w := watcher.NewGethStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) + + w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) + + Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) + }) + + Describe("executing watcher", func() { + var ( + errs chan error + mockFetcher *mocks.MockStorageFetcher + mockQueue *mocks.MockStorageQueue + mockTransformer *mocks.MockStorageTransformer + gethDiff utils.StorageDiff + diffs chan utils.StorageDiff + storageWatcher watcher.GethStorageWatcher + address common.Address + keccakOfAddress common.Address + ) + + BeforeEach(func() { + errs = make(chan error) + diffs = make(chan utils.StorageDiff) + address = common.HexToAddress("0x0123456789abcdef") + keccakOfAddress = common.BytesToAddress(crypto.Keccak256(address[:])) + mockFetcher = mocks.NewMockStorageFetcher() + mockQueue = &mocks.MockStorageQueue{} + mockTransformer = &mocks.MockStorageTransformer{Address: address} + gethDiff = utils.StorageDiff{ + Id: 1338, + Contract: keccakOfAddress, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: 0, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), + } + }) + + It("logs error if fetching storage diffs fails", func(done Done) { + mockFetcher.ErrsToReturn = []error{fakes.FakeError} + storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(diffs, errs, time.Hour) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + Describe("transforming new storage diffs", func() { + BeforeEach(func() { + mockFetcher.DiffsToReturn = []utils.StorageDiff{gethDiff} + storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.SetStorageDiffSource("geth") + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("executes transformer for recognized storage diff", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Hour) + + Eventually(func() utils.StorageDiff { + return mockTransformer.PassedDiff + }).Should(Equal(gethDiff)) + close(done) + }) + + It("queues diff for later processing if transformer execution fails", func(done Done) { + mockTransformer.ExecuteErr = fakes.FakeError + + go storageWatcher.Execute(diffs, errs, time.Hour) + + Expect(<-errs).To(BeNil()) + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() utils.StorageDiff { + return mockQueue.AddPassedDiff + }).Should(Equal(gethDiff)) + close(done) + }) + + It("logs error if queueing diff fails", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + mockQueue.AddError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(diffs, errs, time.Hour) + + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("keeps track transformers by the keccak256 hash of their contract address ", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Hour) + + m := make(map[common.Address]transformer.StorageTransformer) + m[keccakOfAddress] = mockTransformer + + Eventually(func() map[common.Address]transformer.StorageTransformer { + return storageWatcher.KeccakAddressTransformers + }).Should(Equal(m)) + + close(done) + }) + + It("gets the transformer from the known keccak address map first", func(done Done) { + anotherAddress := common.HexToAddress("0xafakeaddress") + anotherTransformer := &mocks.MockStorageTransformer{Address: anotherAddress} + keccakOfAnotherAddress := common.BytesToAddress(crypto.Keccak256(anotherAddress[:])) + + anotherGethDiff := utils.StorageDiff{ + Id: 1338, + Contract: keccakOfAnotherAddress, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: 0, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), + } + mockFetcher.DiffsToReturn = []utils.StorageDiff{anotherGethDiff} + storageWatcher.KeccakAddressTransformers[keccakOfAnotherAddress] = anotherTransformer + + go storageWatcher.Execute(diffs, errs, time.Hour) + + Eventually(func() utils.StorageDiff { + return anotherTransformer.PassedDiff + }).Should(Equal(anotherGethDiff)) + + close(done) + }) + }) + + Describe("transforming queued storage diffs", func() { + BeforeEach(func() { + mockQueue.DiffsToReturn = []utils.StorageDiff{gethDiff} + storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.SetStorageDiffSource("geth") + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("executes transformer for storage diff", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() utils.StorageDiff { + return mockTransformer.PassedDiff + }).Should(Equal(gethDiff)) + close(done) + }) + + It("deletes diff from queue if transformer execution successful", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(gethDiff.Id)) + close(done) + }) + + It("logs error if deleting persisted diff fails", func(done Done) { + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("deletes obsolete diff from queue if contract not recognized", func(done Done) { + obsoleteDiff := utils.StorageDiff{ + Id: gethDiff.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} + + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(obsoleteDiff.Id)) + close(done) + }) + + It("logs error if deleting obsolete diff fails", func(done Done) { + obsoleteDiff := utils.StorageDiff{ + Id: gethDiff.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + }) + }) +}) diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 4ac2a3cf..9d38cb91 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -18,7 +18,6 @@ package watcher import ( "fmt" - "github.com/ethereum/go-ethereum/crypto" "reflect" "time" @@ -32,31 +31,18 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +type IStorageWatcher interface { + AddTransformers(initializers []transformer.StorageTransformerInitializer) + Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) +} + type StorageWatcher struct { db *postgres.DB - diffSource string StorageFetcher fetcher.IStorageFetcher Queue storage.IStorageQueue Transformers map[common.Address]transformer.StorageTransformer KeccakAddressTransformers map[common.Address]transformer.StorageTransformer // keccak hash of an address => transformer -} - -func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { - queue := storage.NewStorageQueue(db) - transformers := make(map[common.Address]transformer.StorageTransformer) - keccakAddressTransformers := make(map[common.Address]transformer.StorageTransformer) - return StorageWatcher{ - db: db, - diffSource: "csv", - StorageFetcher: fetcher, - Queue: queue, - Transformers: transformers, - KeccakAddressTransformers: keccakAddressTransformers, - } -} - -func (storageWatcher *StorageWatcher) SetStorageDiffSource(source string) { - storageWatcher.diffSource = source + transformerGetter func(common.Address) (transformer.StorageTransformer, bool) } func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { @@ -82,26 +68,7 @@ func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, e } func (storageWatcher StorageWatcher) getTransformer(contractAddress common.Address) (transformer.StorageTransformer, bool) { - if storageWatcher.diffSource == "csv" { - storageTransformer, ok := storageWatcher.Transformers[contractAddress] - return storageTransformer, ok - } else if storageWatcher.diffSource == "geth" { - storageTransformer, ok := storageWatcher.KeccakAddressTransformers[contractAddress] - if ok { - return storageTransformer, ok - } else { - for address, transformer := range storageWatcher.Transformers { - keccakOfTransformerAddress := common.BytesToAddress(crypto.Keccak256(address[:])) - if keccakOfTransformerAddress == contractAddress { - storageWatcher.KeccakAddressTransformers[contractAddress] = transformer - return transformer, true - } - } - } - - return nil, false - } - return nil, false + return storageWatcher.transformerGetter(contractAddress) } func (storageWatcher StorageWatcher) processRow(diff utils.StorageDiff) { diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go deleted file mode 100644 index 6e54fabb..00000000 --- a/libraries/shared/watcher/storage_watcher_test.go +++ /dev/null @@ -1,416 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package watcher_test - -import ( - "github.com/ethereum/go-ethereum/crypto" - "io/ioutil" - "os" - "time" - - "github.com/ethereum/go-ethereum/common" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/sirupsen/logrus" - - "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" - "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" - "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" - "github.com/vulcanize/vulcanizedb/pkg/fakes" - "github.com/vulcanize/vulcanizedb/test_config" -) - -var _ = Describe("Storage Watcher", func() { - It("adds transformers", func() { - fakeAddress := common.HexToAddress("0x12345") - fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} - w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) - - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - - Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) - }) - - Describe("executing watcher", func() { - var ( - errs chan error - mockFetcher *mocks.MockStorageFetcher - mockQueue *mocks.MockStorageQueue - mockTransformer *mocks.MockStorageTransformer - csvDiff utils.StorageDiff - gethDiff utils.StorageDiff - diffs chan utils.StorageDiff - storageWatcher watcher.StorageWatcher - address common.Address - keccakOfAddress common.Address - ) - - BeforeEach(func() { - errs = make(chan error) - diffs = make(chan utils.StorageDiff) - address = common.HexToAddress("0x0123456789abcdef") - keccakOfAddress = common.BytesToAddress(crypto.Keccak256(address[:])) - mockFetcher = mocks.NewMockStorageFetcher() - mockQueue = &mocks.MockStorageQueue{} - mockTransformer = &mocks.MockStorageTransformer{Address: address} - csvDiff = utils.StorageDiff{ - Id: 1337, - Contract: address, - BlockHash: common.HexToHash("0xfedcba9876543210"), - BlockHeight: 0, - StorageKey: common.HexToHash("0xabcdef1234567890"), - StorageValue: common.HexToHash("0x9876543210abcdef"), - } - gethDiff = utils.StorageDiff{ - Id: 1338, - Contract: keccakOfAddress, - BlockHash: common.HexToHash("0xfedcba9876543210"), - BlockHeight: 0, - StorageKey: common.HexToHash("0xabcdef1234567890"), - StorageValue: common.HexToHash("0x9876543210abcdef"), - } - }) - - It("logs error if fetching storage diffs fails", func(done Done) { - mockFetcher.ErrsToReturn = []error{fakes.FakeError} - storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - - Describe("transforming new storage diffs from csv", func() { - Describe("where diff source is a csv file", func() { - BeforeEach(func() { - mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} - storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - }) - - It("executes transformer for recognized storage diff", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() utils.StorageDiff { - return mockTransformer.PassedDiff - }).Should(Equal(csvDiff)) - close(done) - }) - - It("queues diff for later processing if transformer execution fails", func(done Done) { - mockTransformer.ExecuteErr = fakes.FakeError - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Expect(<-errs).To(BeNil()) - Eventually(func() bool { - return mockQueue.AddCalled - }).Should(BeTrue()) - Eventually(func() utils.StorageDiff { - return mockQueue.AddPassedDiff - }).Should(Equal(csvDiff)) - close(done) - }) - - It("logs error if queueing diff fails", func(done Done) { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} - mockQueue.AddError = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() bool { - return mockQueue.AddCalled - }).Should(BeTrue()) - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - }) - - Describe("where diff source is geth RPC pub sub", func() { - BeforeEach(func() { - mockFetcher.DiffsToReturn = []utils.StorageDiff{gethDiff} - storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.SetStorageDiffSource("geth") - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - }) - - It("executes transformer for recognized storage diff", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() utils.StorageDiff { - return mockTransformer.PassedDiff - }).Should(Equal(gethDiff)) - close(done) - }) - - It("queues diff for later processing if transformer execution fails", func(done Done) { - mockTransformer.ExecuteErr = fakes.FakeError - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Expect(<-errs).To(BeNil()) - Eventually(func() bool { - return mockQueue.AddCalled - }).Should(BeTrue()) - Eventually(func() utils.StorageDiff { - return mockQueue.AddPassedDiff - }).Should(Equal(gethDiff)) - close(done) - }) - - It("logs error if queueing diff fails", func(done Done) { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} - mockQueue.AddError = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() bool { - return mockQueue.AddCalled - }).Should(BeTrue()) - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - - It("keeps track transformers by the keccak256 hash of their contract address ", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Hour) - - m := make(map[common.Address]transformer.StorageTransformer) - m[keccakOfAddress] = mockTransformer - - Eventually(func() map[common.Address]transformer.StorageTransformer { - return storageWatcher.KeccakAddressTransformers - }).Should(Equal(m)) - - close(done) - }) - - It("gets the transformer from the known keccak address map first", func(done Done) { - anotherAddress := common.HexToAddress("0xafakeaddress") - anotherTransformer := &mocks.MockStorageTransformer{Address: anotherAddress} - keccakOfAnotherAddress := common.BytesToAddress(crypto.Keccak256(anotherAddress[:])) - - anotherGethDiff := utils.StorageDiff{ - Id: 1338, - Contract: keccakOfAnotherAddress, - BlockHash: common.HexToHash("0xfedcba9876543210"), - BlockHeight: 0, - StorageKey: common.HexToHash("0xabcdef1234567890"), - StorageValue: common.HexToHash("0x9876543210abcdef"), - } - mockFetcher.DiffsToReturn = []utils.StorageDiff{anotherGethDiff} - storageWatcher.KeccakAddressTransformers[keccakOfAnotherAddress] = anotherTransformer - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() utils.StorageDiff { - return anotherTransformer.PassedDiff - }).Should(Equal(anotherGethDiff)) - - close(done) - }) - }) - }) - - Describe("transforming queued storage diffs", func() { - Describe("where diff source is a csv file", func() { - BeforeEach(func() { - mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff} - storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - }) - - It("executes transformer for storage diff", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() utils.StorageDiff { - return mockTransformer.PassedDiff - }).Should(Equal(csvDiff)) - close(done) - }) - - It("deletes diff from queue if transformer execution successful", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(csvDiff.Id)) - close(done) - }) - - It("logs error if deleting persisted diff fails", func(done Done) { - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - - It("deletes obsolete diff from queue if contract not recognized", func(done Done) { - obsoleteDiff := utils.StorageDiff{ - Id: csvDiff.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(obsoleteDiff.Id)) - close(done) - }) - - It("logs error if deleting obsolete diff fails", func(done Done) { - obsoleteDiff := utils.StorageDiff{ - Id: csvDiff.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - }) - - Describe("where diff source is geth RPC pub sub", func() { - BeforeEach(func() { - mockQueue.DiffsToReturn = []utils.StorageDiff{gethDiff} - storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.SetStorageDiffSource("geth") - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - }) - - It("executes transformer for storage diff", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() utils.StorageDiff { - return mockTransformer.PassedDiff - }).Should(Equal(gethDiff)) - close(done) - }) - - It("deletes diff from queue if transformer execution successful", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(gethDiff.Id)) - close(done) - }) - - It("logs error if deleting persisted diff fails", func(done Done) { - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - - It("deletes obsolete diff from queue if contract not recognized", func(done Done) { - obsoleteDiff := utils.StorageDiff{ - Id: gethDiff.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(obsoleteDiff.Id)) - close(done) - }) - - It("logs error if deleting obsolete diff fails", func(done Done) { - obsoleteDiff := utils.StorageDiff{ - Id: gethDiff.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - }) - }) - }) -})