From 2d684c5aecb6c4cf408bba8d78e6071ba44ac7c7 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Wed, 24 Apr 2019 15:05:57 -0500 Subject: [PATCH 1/7] Extract storage diff fetching behind an interface - Replaces directly reading from a CSV - Simplifies testing - Should hopefully make it easier to plug in other sources for storage diffs (e.g. differently formatted CSVs, JSON RPC, etc) --- cmd/composeAndExecute.go | 12 +- cmd/execute.go | 4 +- libraries/shared/fetcher/log_fetcher.go | 12 +- libraries/shared/fetcher/log_fetcher_test.go | 12 +- libraries/shared/fetcher/storage_fetcher.go | 33 +++ .../shared/fetcher/storage_fetcher_test.go | 81 +++++++ libraries/shared/mocks/converter.go | 4 +- libraries/shared/mocks/storage_fetcher.go | 23 ++ libraries/shared/mocks/storage_queue.go | 2 + ...er_repository.go => watcher_repository.go} | 0 libraries/shared/watcher/contract_watcher.go | 12 +- libraries/shared/watcher/event_watcher.go | 4 +- libraries/shared/watcher/storage_watcher.go | 71 +++--- .../shared/watcher/storage_watcher_test.go | 223 ++++++------------ pkg/fakes/mock_tailer.go | 10 +- 15 files changed, 279 insertions(+), 224 deletions(-) create mode 100644 libraries/shared/fetcher/storage_fetcher.go create mode 100644 libraries/shared/fetcher/storage_fetcher_test.go create mode 100644 libraries/shared/mocks/storage_fetcher.go rename libraries/shared/mocks/{mock_watcher_repository.go => watcher_repository.go} (100%) diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index d2abd045..1c5ac10a 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -16,16 +16,19 @@ package cmd import ( + "os" + "plugin" + syn "sync" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fs" p2 "github.com/vulcanize/vulcanizedb/pkg/plugin" "github.com/vulcanize/vulcanizedb/pkg/plugin/helpers" "github.com/vulcanize/vulcanizedb/utils" - "os" - "plugin" - syn "sync" ) // composeAndExecuteCmd represents the composeAndExecute command @@ -170,7 +173,8 @@ func composeAndExecute() { if len(ethStorageInitializers) > 0 { tailer := fs.FileTailer{Path: storageDiffsPath} - sw := watcher.NewStorageWatcher(tailer, &db) + storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) + sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) diff --git a/cmd/execute.go b/cmd/execute.go index 3ebc15f1..160355b3 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -26,6 +26,7 @@ import ( "github.com/spf13/cobra" "github.com/vulcanize/vulcanizedb/libraries/shared/constants" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fs" @@ -118,7 +119,8 @@ func execute() { if len(ethStorageInitializers) > 0 { tailer := fs.FileTailer{Path: storageDiffsPath} - sw := watcher.NewStorageWatcher(tailer, &db) + storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) + sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) diff --git a/libraries/shared/fetcher/log_fetcher.go b/libraries/shared/fetcher/log_fetcher.go index a231b3de..a2b5f21a 100644 --- a/libraries/shared/fetcher/log_fetcher.go +++ b/libraries/shared/fetcher/log_fetcher.go @@ -24,22 +24,22 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -type LogFetcher interface { +type ILogFetcher interface { FetchLogs(contractAddresses []common.Address, topics []common.Hash, missingHeader core.Header) ([]types.Log, error) } -type Fetcher struct { +type LogFetcher struct { blockChain core.BlockChain } -func NewFetcher(blockchain core.BlockChain) *Fetcher { - return &Fetcher{ +func NewLogFetcher(blockchain core.BlockChain) *LogFetcher { + return &LogFetcher{ blockChain: blockchain, } } // Checks all topic0s, on all addresses, fetching matching logs for the given header -func (fetcher Fetcher) FetchLogs(addresses []common.Address, topic0s []common.Hash, header core.Header) ([]types.Log, error) { +func (logFetcher LogFetcher) FetchLogs(addresses []common.Address, topic0s []common.Hash, header core.Header) ([]types.Log, error) { blockHash := common.HexToHash(header.Hash) query := ethereum.FilterQuery{ BlockHash: &blockHash, @@ -48,7 +48,7 @@ func (fetcher Fetcher) FetchLogs(addresses []common.Address, topic0s []common.Ha Topics: [][]common.Hash{topic0s}, } - logs, err := fetcher.blockChain.GetEthLogsWithCustomQuery(query) + logs, err := logFetcher.blockChain.GetEthLogsWithCustomQuery(query) if err != nil { // TODO review aggregate fetching error handling return []types.Log{}, err diff --git a/libraries/shared/fetcher/log_fetcher_test.go b/libraries/shared/fetcher/log_fetcher_test.go index 1a41a5a3..8f9b36a7 100644 --- a/libraries/shared/fetcher/log_fetcher_test.go +++ b/libraries/shared/fetcher/log_fetcher_test.go @@ -22,16 +22,16 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - fetch "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/fakes" ) -var _ = Describe("Fetcher", func() { +var _ = Describe("LogFetcher", func() { Describe("FetchLogs", func() { It("fetches logs based on the given query", func() { blockChain := fakes.NewMockBlockChain() - fetcher := fetch.NewFetcher(blockChain) + logFetcher := fetcher.NewLogFetcher(blockChain) header := fakes.FakeHeader addresses := []common.Address{ @@ -41,7 +41,7 @@ var _ = Describe("Fetcher", func() { topicZeros := []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})} - _, err := fetcher.FetchLogs(addresses, topicZeros, header) + _, err := logFetcher.FetchLogs(addresses, topicZeros, header) address1 := common.HexToAddress("0xfakeAddress") address2 := common.HexToAddress("0xanotherFakeAddress") @@ -59,9 +59,9 @@ var _ = Describe("Fetcher", func() { It("returns an error if fetching the logs fails", func() { blockChain := fakes.NewMockBlockChain() blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError) - fetcher := fetch.NewFetcher(blockChain) + logFetcher := fetcher.NewLogFetcher(blockChain) - _, err := fetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{}) + _, err := logFetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) diff --git a/libraries/shared/fetcher/storage_fetcher.go b/libraries/shared/fetcher/storage_fetcher.go new file mode 100644 index 00000000..2115277b --- /dev/null +++ b/libraries/shared/fetcher/storage_fetcher.go @@ -0,0 +1,33 @@ +package fetcher + +import ( + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/fs" + "strings" +) + +type IStorageFetcher interface { + FetchStorageDiffs(chan<- utils.StorageDiffRow, chan<- error) +} + +type CsvTailStorageFetcher struct { + tailer fs.Tailer +} + +func NewCsvTailStorageFetcher(tailer fs.Tailer) CsvTailStorageFetcher { + return CsvTailStorageFetcher{tailer: tailer} +} + +func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { + t, tailErr := storageFetcher.tailer.Tail() + if tailErr != nil { + errs <- tailErr + } + for line := range t.Lines { + row, parseErr := utils.FromStrings(strings.Split(line.Text, ",")) + if parseErr != nil { + errs <- parseErr + } + out <- row + } +} diff --git a/libraries/shared/fetcher/storage_fetcher_test.go b/libraries/shared/fetcher/storage_fetcher_test.go new file mode 100644 index 00000000..1323f9d4 --- /dev/null +++ b/libraries/shared/fetcher/storage_fetcher_test.go @@ -0,0 +1,81 @@ +package fetcher_test + +import ( + "fmt" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/hpcloud/tail" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/fakes" +) + +var _ = Describe("Csv Tail Storage Fetcher", func() { + var ( + errorsChannel chan error + mockTailer *fakes.MockTailer + rowsChannel chan utils.StorageDiffRow + storageFetcher fetcher.CsvTailStorageFetcher + ) + + BeforeEach(func() { + errorsChannel = make(chan error) + rowsChannel = make(chan utils.StorageDiffRow) + mockTailer = fakes.NewMockTailer() + storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer) + }) + + It("adds error to errors channel if tailing file fails", func() { + mockTailer.TailErr = fakes.FakeError + + go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + + close(mockTailer.Lines) + returnedErr := <-errorsChannel + Expect(returnedErr).To(HaveOccurred()) + Expect(returnedErr).To(MatchError(fakes.FakeError)) + }) + + It("adds parsed csv row to rows channel for storage diff", func() { + line := getFakeLine() + + go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + mockTailer.Lines <- line + + close(mockTailer.Lines) + returnedRow := <-rowsChannel + expectedRow, err := utils.FromStrings(strings.Split(line.Text, ",")) + Expect(err).NotTo(HaveOccurred()) + Expect(expectedRow).To(Equal(returnedRow)) + }) + + It("adds error to errors channel if parsing csv fails", func() { + line := &tail.Line{Text: "invalid"} + + go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + mockTailer.Lines <- line + + close(mockTailer.Lines) + returnedErr := <-errorsChannel + Expect(returnedErr).To(HaveOccurred()) + }) +}) + +func getFakeLine() *tail.Line { + address := common.HexToAddress("0x1234567890abcdef") + blockHash := []byte{4, 5, 6} + blockHeight := int64(789) + storageKey := []byte{9, 8, 7} + storageValue := []byte{6, 5, 4} + return &tail.Line{ + Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address.Bytes()), common.Bytes2Hex(blockHash), + blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)), + Time: time.Time{}, + Err: nil, + } +} diff --git a/libraries/shared/mocks/converter.go b/libraries/shared/mocks/converter.go index 01b0e4a6..0a687718 100644 --- a/libraries/shared/mocks/converter.go +++ b/libraries/shared/mocks/converter.go @@ -53,6 +53,6 @@ func (converter *MockConverter) SetToEntityConverterError(err error) { converter.entityConverterError = err } -func (c *MockConverter) SetToModelConverterError(err error) { - c.modelConverterError = err +func (converter *MockConverter) SetToModelConverterError(err error) { + converter.modelConverterError = err } diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go new file mode 100644 index 00000000..ef555252 --- /dev/null +++ b/libraries/shared/mocks/storage_fetcher.go @@ -0,0 +1,23 @@ +package mocks + +import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + +type MockStorageFetcher struct { + RowsToReturn []utils.StorageDiffRow + ErrsToReturn []error +} + +func NewMockStorageFetcher() *MockStorageFetcher { + return &MockStorageFetcher{} +} + +func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { + for _, err := range fetcher.ErrsToReturn { + errs <- err + } + for _, row := range fetcher.RowsToReturn { + out <- row + } + close(out) + close(errs) +} diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index fa44c474..ae41f687 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -23,9 +23,11 @@ import ( type MockStorageQueue struct { AddCalled bool AddError error + PassedRow utils.StorageDiffRow } func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error { queue.AddCalled = true + queue.PassedRow = row return queue.AddError } diff --git a/libraries/shared/mocks/mock_watcher_repository.go b/libraries/shared/mocks/watcher_repository.go similarity index 100% rename from libraries/shared/mocks/mock_watcher_repository.go rename to libraries/shared/mocks/watcher_repository.go diff --git a/libraries/shared/watcher/contract_watcher.go b/libraries/shared/watcher/contract_watcher.go index ebb4cc37..194e7950 100644 --- a/libraries/shared/watcher/contract_watcher.go +++ b/libraries/shared/watcher/contract_watcher.go @@ -54,10 +54,10 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error { watcher.Transformers = append(watcher.Transformers, t) } - for _, transformer := range watcher.Transformers { - err := transformer.Init() + for _, contractTransformer := range watcher.Transformers { + err := contractTransformer.Init() if err != nil { - log.Print("Unable to initialize transformer:", transformer.GetConfig().Name, err) + log.Print("Unable to initialize transformer:", contractTransformer.GetConfig().Name, err) return err } } @@ -65,10 +65,10 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error { } func (watcher *ContractWatcher) Execute() error { - for _, transformer := range watcher.Transformers { - err := transformer.Execute() + for _, contractTransformer := range watcher.Transformers { + err := contractTransformer.Execute() if err != nil { - log.Error("Unable to execute transformer:", transformer.GetConfig().Name, err) + log.Error("Unable to execute transformer:", contractTransformer.GetConfig().Name, err) return err } } diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go index 9bad688a..1fd6afde 100644 --- a/libraries/shared/watcher/event_watcher.go +++ b/libraries/shared/watcher/event_watcher.go @@ -37,7 +37,7 @@ type EventWatcher struct { Transformers []transformer.EventTransformer BlockChain core.BlockChain DB *postgres.DB - Fetcher fetcher.LogFetcher + Fetcher fetcher.ILogFetcher Chunker chunker.Chunker Addresses []common.Address Topics []common.Hash @@ -47,7 +47,7 @@ type EventWatcher struct { func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher { logChunker := chunker.NewLogChunker() - logFetcher := fetcher.NewFetcher(bc) + logFetcher := fetcher.NewLogFetcher(bc) transactionSyncer := transactions.NewTransactionsSyncer(db, bc) return EventWatcher{ BlockChain: bc, diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 524cbcd4..e8dda2f1 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -18,33 +18,32 @@ package watcher import ( "reflect" - "strings" "github.com/ethereum/go-ethereum/common" "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/fs" ) type StorageWatcher struct { - db *postgres.DB - tailer fs.Tailer - Queue storage.IStorageQueue - Transformers map[common.Address]transformer.StorageTransformer + db *postgres.DB + StorageFetcher fetcher.IStorageFetcher + Queue storage.IStorageQueue + Transformers map[common.Address]transformer.StorageTransformer } -func NewStorageWatcher(tailer fs.Tailer, db *postgres.DB) StorageWatcher { +func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { transformers := make(map[common.Address]transformer.StorageTransformer) queue := storage.NewStorageQueue(db) return StorageWatcher{ - db: db, - tailer: tailer, - Queue: queue, - Transformers: transformers, + db: db, + StorageFetcher: fetcher, + Queue: queue, + Transformers: transformers, } } @@ -56,34 +55,36 @@ func (watcher StorageWatcher) AddTransformers(initializers []transformer.Storage } func (watcher StorageWatcher) Execute() error { - t, tailErr := watcher.tailer.Tail() - if tailErr != nil { - return tailErr + rows := make(chan utils.StorageDiffRow) + errs := make(chan error) + go watcher.StorageFetcher.FetchStorageDiffs(rows, errs) + for { + select { + case row := <-rows: + watcher.processRow(row) + case err := <-errs: + return err + } } - for line := range t.Lines { - row, parseErr := utils.FromStrings(strings.Split(line.Text, ",")) - if parseErr != nil { - return parseErr - } - storageTransformer, ok := watcher.Transformers[row.Contract] - if !ok { - logrus.Warn(utils.ErrContractNotFound{Contract: row.Contract.Hex()}.Error()) - continue - } - executeErr := storageTransformer.Execute(row) - if executeErr != nil { - if isKeyNotFound(executeErr) { - queueErr := watcher.Queue.Add(row) - if queueErr != nil { - logrus.Warn(queueErr.Error()) - } - } else { - logrus.Warn(executeErr.Error()) +} + +func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) { + storageTransformer, ok := watcher.Transformers[row.Contract] + if !ok { + // ignore rows from unwatched contracts + return + } + executeErr := storageTransformer.Execute(row) + if executeErr != nil { + if isKeyNotFound(executeErr) { + queueErr := watcher.Queue.Add(row) + if queueErr != nil { + logrus.Warn(queueErr.Error()) } - continue + } else { + logrus.Warn(executeErr.Error()) } } - return nil } func isKeyNotFound(executeErr error) bool { diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index dc74eba4..bbafa75a 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -17,15 +17,10 @@ package watcher_test import ( - "errors" - "fmt" "io/ioutil" "os" - "strings" - "time" "github.com/ethereum/go-ethereum/common" - "github.com/hpcloud/tail" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" @@ -34,7 +29,6 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/test_config" ) @@ -43,170 +37,87 @@ var _ = Describe("Storage Watcher", func() { It("adds transformers", func() { fakeAddress := common.HexToAddress("0x12345") fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} - w := watcher.NewStorageWatcher(&fakes.MockTailer{}, test_config.NewTestDB(core.Node{})) + w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) }) - It("reads the tail of the storage diffs file", func() { - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) + Describe("executing watcher", func() { + var ( + mockFetcher *mocks.MockStorageFetcher + mockQueue *mocks.MockStorageQueue + mockTransformer *mocks.MockStorageTransformer + row utils.StorageDiffRow + storageWatcher watcher.StorageWatcher + ) - assert(func(err error) { - Expect(err).To(BeNil()) - Expect(mockTailer.TailCalled).To(BeTrue()) - }, w, mockTailer, []*tail.Line{}) - }) + BeforeEach(func() { + address := common.HexToAddress("0x0123456789abcdef") + mockFetcher = mocks.NewMockStorageFetcher() + mockQueue = &mocks.MockStorageQueue{} + mockTransformer = &mocks.MockStorageTransformer{Address: address} + row = utils.StorageDiffRow{ + Contract: address, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: 0, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), + } + mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) - It("returns error if row parsing fails", func() { - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - line := &tail.Line{Text: "oops"} + It("executes transformer for recognized storage row", func() { + err := storageWatcher.Execute() - assert(func(err error) { - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(utils.ErrRowMalformed{Length: 1})) - }, w, mockTailer, []*tail.Line{line}) - }) + Expect(err).NotTo(HaveOccurred()) + Expect(mockTransformer.PassedRow).To(Equal(row)) + }) - It("logs error if no transformer can parse storage row", func() { - mockTailer := fakes.NewMockTailer() - address := common.HexToAddress("0x12345") - line := getFakeLine(address.Bytes()) - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - tempFile, err := ioutil.TempFile("", "log") - defer os.Remove(tempFile.Name()) - Expect(err).NotTo(HaveOccurred()) - logrus.SetOutput(tempFile) + It("queues row for later processing if row's key not recognized", func() { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + + err := storageWatcher.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(mockQueue.AddCalled).To(BeTrue()) + Expect(mockQueue.PassedRow).To(Equal(row)) + }) + + It("logs error if queueing row fails", func() { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + mockQueue.AddError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + err := storageWatcher.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(mockQueue.AddCalled).To(BeTrue()) + logContent, readErr := ioutil.ReadFile(tempFile.Name()) + Expect(readErr).NotTo(HaveOccurred()) + Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) + }) + + It("logs error if transformer execution fails for reason other than key not found", func() { + mockTransformer.ExecuteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + err := storageWatcher.Execute() - assert(func(err error) { Expect(err).NotTo(HaveOccurred()) logContent, readErr := ioutil.ReadFile(tempFile.Name()) Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(utils.ErrContractNotFound{Contract: address.Hex()}.Error())) - }, w, mockTailer, []*tail.Line{line}) - }) - - It("executes transformer with storage row", func() { - address := []byte{1, 2, 3} - line := getFakeLine(address) - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address)} - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - - assert(func(err error) { - Expect(err).To(BeNil()) - expectedRow, err := utils.FromStrings(strings.Split(line.Text, ",")) - Expect(err).NotTo(HaveOccurred()) - Expect(fakeTransformer.PassedRow).To(Equal(expectedRow)) - }, w, mockTailer, []*tail.Line{line}) - }) - - Describe("when executing transformer fails", func() { - It("queues row when error is storage key not found", func() { - address := []byte{1, 2, 3} - line := getFakeLine(address) - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - mockQueue := &mocks.MockStorageQueue{} - w.Queue = mockQueue - keyNotFoundError := utils.ErrStorageKeyNotFound{Key: "unknown_storage_key"} - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError} - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - - assert(func(err error) { - Expect(err).NotTo(HaveOccurred()) - Expect(mockQueue.AddCalled).To(BeTrue()) - }, w, mockTailer, []*tail.Line{line}) - }) - - It("logs error if queuing row fails", func() { - address := []byte{1, 2, 3} - line := getFakeLine(address) - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - mockQueue := &mocks.MockStorageQueue{} - mockQueue.AddError = fakes.FakeError - w.Queue = mockQueue - keyNotFoundError := utils.ErrStorageKeyNotFound{Key: "unknown_storage_key"} - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError} - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - tempFile, err := ioutil.TempFile("", "log") - defer os.Remove(tempFile.Name()) - Expect(err).NotTo(HaveOccurred()) - logrus.SetOutput(tempFile) - - assert(func(err error) { - Expect(err).NotTo(HaveOccurred()) - Expect(mockQueue.AddCalled).To(BeTrue()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) - }, w, mockTailer, []*tail.Line{line}) - }) - - It("logs any other error", func() { - address := []byte{1, 2, 3} - line := getFakeLine(address) - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - executionError := errors.New("storage watcher failed attempting to execute transformer") - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError} - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - tempFile, err := ioutil.TempFile("", "log") - defer os.Remove(tempFile.Name()) - Expect(err).NotTo(HaveOccurred()) - logrus.SetOutput(tempFile) - - assert(func(err error) { - Expect(err).NotTo(HaveOccurred()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(executionError.Error())) - }, w, mockTailer, []*tail.Line{line}) + Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) }) }) }) - -func assert(assertion func(err error), watcher watcher.StorageWatcher, mockTailer *fakes.MockTailer, lines []*tail.Line) { - errs := make(chan error, 1) - done := make(chan bool, 1) - go execute(watcher, errs, done) - for _, line := range lines { - mockTailer.Lines <- line - } - close(mockTailer.Lines) - - select { - case err := <-errs: - assertion(err) - break - case <-done: - assertion(nil) - break - } -} - -func execute(w watcher.StorageWatcher, errs chan error, done chan bool) { - err := w.Execute() - if err != nil { - errs <- err - } else { - done <- true - } -} - -func getFakeLine(address []byte) *tail.Line { - blockHash := []byte{4, 5, 6} - blockHeight := int64(789) - storageKey := []byte{9, 8, 7} - storageValue := []byte{6, 5, 4} - return &tail.Line{ - Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)), - Time: time.Time{}, - Err: nil, - } -} diff --git a/pkg/fakes/mock_tailer.go b/pkg/fakes/mock_tailer.go index 888688d6..ba877644 100644 --- a/pkg/fakes/mock_tailer.go +++ b/pkg/fakes/mock_tailer.go @@ -6,24 +6,22 @@ import ( ) type MockTailer struct { - Lines chan *tail.Line - TailCalled bool + Lines chan *tail.Line + TailErr error } func NewMockTailer() *MockTailer { return &MockTailer{ - Lines: make(chan *tail.Line, 1), - TailCalled: false, + Lines: make(chan *tail.Line, 1), } } func (mock *MockTailer) Tail() (*tail.Tail, error) { - mock.TailCalled = true fakeTail := &tail.Tail{ Filename: "", Lines: mock.Lines, Config: tail.Config{}, Tomb: tomb.Tomb{}, } - return fakeTail, nil + return fakeTail, mock.TailErr } From bf4b1687a039cc0eb4648401aed7b663bbf80916 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Wed, 24 Apr 2019 21:49:33 -0500 Subject: [PATCH 2/7] Add get/delete functions to storage queue --- libraries/shared/mocks/storage_queue.go | 8 +++ libraries/shared/storage/storage_queue.go | 13 ++++ .../shared/storage/storage_queue_test.go | 64 +++++++++++++++++-- libraries/shared/storage/utils/row.go | 1 + 4 files changed, 80 insertions(+), 6 deletions(-) diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index ae41f687..729219a8 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -31,3 +31,11 @@ func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error { queue.PassedRow = row return queue.AddError } + +func (queue *MockStorageQueue) Delete(id int) error { + panic("implement me") +} + +func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiffRow, error) { + panic("implement me") +} diff --git a/libraries/shared/storage/storage_queue.go b/libraries/shared/storage/storage_queue.go index a077a4f9..5fe67630 100644 --- a/libraries/shared/storage/storage_queue.go +++ b/libraries/shared/storage/storage_queue.go @@ -23,6 +23,8 @@ import ( type IStorageQueue interface { Add(row utils.StorageDiffRow) error + Delete(id int) error + GetAll() ([]utils.StorageDiffRow, error) } type StorageQueue struct { @@ -40,3 +42,14 @@ func (queue StorageQueue) Add(row utils.StorageDiffRow) error { row.BlockHeight, row.StorageKey.Bytes(), row.StorageValue.Bytes()) return err } + +func (queue StorageQueue) Delete(id int) error { + _, err := queue.db.Exec(`DELETE FROM public.queued_storage WHERE id = $1`, id) + return err +} + +func (queue StorageQueue) GetAll() ([]utils.StorageDiffRow, error) { + var result []utils.StorageDiffRow + err := queue.db.Select(&result, `SELECT * FROM public.queued_storage`) + return result, err +} diff --git a/libraries/shared/storage/storage_queue_test.go b/libraries/shared/storage/storage_queue_test.go index 9157fcda..ca05f234 100644 --- a/libraries/shared/storage/storage_queue_test.go +++ b/libraries/shared/storage/storage_queue_test.go @@ -4,6 +4,7 @@ import ( "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" @@ -11,23 +12,74 @@ import ( ) var _ = Describe("Storage queue", func() { - It("adds a storage row to the db", func() { - row := utils.StorageDiffRow{ + var ( + db *postgres.DB + row utils.StorageDiffRow + queue storage.IStorageQueue + ) + + BeforeEach(func() { + row = utils.StorageDiffRow{ Contract: common.HexToAddress("0x123456"), BlockHash: common.HexToHash("0x678901"), BlockHeight: 987, StorageKey: common.HexToHash("0x654321"), StorageValue: common.HexToHash("0x198765"), } - db := test_config.NewTestDB(test_config.NewTestNode()) - queue := storage.NewStorageQueue(db) - + db = test_config.NewTestDB(test_config.NewTestNode()) + test_config.CleanTestDB(db) + queue = storage.NewStorageQueue(db) addErr := queue.Add(row) - Expect(addErr).NotTo(HaveOccurred()) + }) + + It("adds a storage row to the db", func() { var result utils.StorageDiffRow getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`) Expect(getErr).NotTo(HaveOccurred()) Expect(result).To(Equal(row)) }) + + It("deletes storage row from db", func() { + rows, getErr := queue.GetAll() + Expect(getErr).NotTo(HaveOccurred()) + Expect(len(rows)).To(Equal(1)) + + err := queue.Delete(rows[0].Id) + + Expect(err).NotTo(HaveOccurred()) + remainingRows, secondGetErr := queue.GetAll() + Expect(secondGetErr).NotTo(HaveOccurred()) + Expect(len(remainingRows)).To(BeZero()) + }) + + It("gets all storage rows from db", func() { + rowTwo := utils.StorageDiffRow{ + Contract: common.HexToAddress("0x123456"), + BlockHash: common.HexToHash("0x678902"), + BlockHeight: 988, + StorageKey: common.HexToHash("0x654322"), + StorageValue: common.HexToHash("0x198766"), + } + addErr := queue.Add(rowTwo) + Expect(addErr).NotTo(HaveOccurred()) + + rows, err := queue.GetAll() + + Expect(err).NotTo(HaveOccurred()) + Expect(len(rows)).To(Equal(2)) + Expect(rows[0]).NotTo(Equal(rows[1])) + Expect(rows[0].Id).NotTo(BeZero()) + Expect(rows[0].Contract).To(Or(Equal(row.Contract), Equal(rowTwo.Contract))) + Expect(rows[0].BlockHash).To(Or(Equal(row.BlockHash), Equal(rowTwo.BlockHash))) + Expect(rows[0].BlockHeight).To(Or(Equal(row.BlockHeight), Equal(rowTwo.BlockHeight))) + Expect(rows[0].StorageKey).To(Or(Equal(row.StorageKey), Equal(rowTwo.StorageKey))) + Expect(rows[0].StorageValue).To(Or(Equal(row.StorageValue), Equal(rowTwo.StorageValue))) + Expect(rows[1].Id).NotTo(BeZero()) + Expect(rows[1].Contract).To(Or(Equal(row.Contract), Equal(rowTwo.Contract))) + Expect(rows[1].BlockHash).To(Or(Equal(row.BlockHash), Equal(rowTwo.BlockHash))) + Expect(rows[1].BlockHeight).To(Or(Equal(row.BlockHeight), Equal(rowTwo.BlockHeight))) + Expect(rows[1].StorageKey).To(Or(Equal(row.StorageKey), Equal(rowTwo.StorageKey))) + Expect(rows[1].StorageValue).To(Or(Equal(row.StorageValue), Equal(rowTwo.StorageValue))) + }) }) diff --git a/libraries/shared/storage/utils/row.go b/libraries/shared/storage/utils/row.go index dddb4e8f..70606fa5 100644 --- a/libraries/shared/storage/utils/row.go +++ b/libraries/shared/storage/utils/row.go @@ -25,6 +25,7 @@ import ( const ExpectedRowLength = 5 type StorageDiffRow struct { + Id int Contract common.Address BlockHash common.Hash `db:"block_hash"` BlockHeight int `db:"block_height"` From 6a86de87b4b2c8a377a9a1afb0b66cffbbadfff0 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Thu, 25 Apr 2019 12:44:14 -0500 Subject: [PATCH 3/7] (VDB-371) Recheck queued storage - Iterate through queued storage at defined interval, popping rows from the queue if successfully persisted --- cmd/execute.go | 5 +- cmd/root.go | 5 +- .../shared/fetcher/storage_fetcher_test.go | 22 +- libraries/shared/mocks/storage_fetcher.go | 4 +- libraries/shared/mocks/storage_queue.go | 17 +- .../shared/storage/storage_queue_test.go | 2 +- libraries/shared/watcher/storage_watcher.go | 59 +++-- .../shared/watcher/storage_watcher_test.go | 217 ++++++++++++++---- 8 files changed, 251 insertions(+), 80 deletions(-) diff --git a/cmd/execute.go b/cmd/execute.go index 160355b3..f3eea0f3 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -27,6 +27,7 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fs" @@ -168,7 +169,9 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { ticker := time.NewTicker(pollingInterval) defer ticker.Stop() for range ticker.C { - w.Execute() + errs := make(chan error) + rows := make(chan storageUtils.StorageDiffRow) + w.Execute(rows, errs, queueRecheckInterval) } } diff --git a/cmd/root.go b/cmd/root.go index f322867e..7eee8793 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -49,8 +49,9 @@ var ( ) const ( - pollingInterval = 7 * time.Second - validationWindow = 15 + pollingInterval = 7 * time.Second + queueRecheckInterval = 5 * time.Minute + validationWindow = 15 ) var rootCmd = &cobra.Command{ diff --git a/libraries/shared/fetcher/storage_fetcher_test.go b/libraries/shared/fetcher/storage_fetcher_test.go index 1323f9d4..f5d39dcd 100644 --- a/libraries/shared/fetcher/storage_fetcher_test.go +++ b/libraries/shared/fetcher/storage_fetcher_test.go @@ -30,39 +30,35 @@ var _ = Describe("Csv Tail Storage Fetcher", func() { storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer) }) - It("adds error to errors channel if tailing file fails", func() { + It("adds error to errors channel if tailing file fails", func(done Done) { mockTailer.TailErr = fakes.FakeError go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) - close(mockTailer.Lines) - returnedErr := <-errorsChannel - Expect(returnedErr).To(HaveOccurred()) - Expect(returnedErr).To(MatchError(fakes.FakeError)) + Expect(<-errorsChannel).To(MatchError(fakes.FakeError)) + close(done) }) - It("adds parsed csv row to rows channel for storage diff", func() { + It("adds parsed csv row to rows channel for storage diff", func(done Done) { line := getFakeLine() go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) mockTailer.Lines <- line - close(mockTailer.Lines) - returnedRow := <-rowsChannel expectedRow, err := utils.FromStrings(strings.Split(line.Text, ",")) Expect(err).NotTo(HaveOccurred()) - Expect(expectedRow).To(Equal(returnedRow)) + Expect(<-rowsChannel).To(Equal(expectedRow)) + close(done) }) - It("adds error to errors channel if parsing csv fails", func() { + It("adds error to errors channel if parsing csv fails", func(done Done) { line := &tail.Line{Text: "invalid"} go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) mockTailer.Lines <- line - close(mockTailer.Lines) - returnedErr := <-errorsChannel - Expect(returnedErr).To(HaveOccurred()) + Expect(<-errorsChannel).To(HaveOccurred()) + close(done) }) }) diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go index ef555252..27c61b20 100644 --- a/libraries/shared/mocks/storage_fetcher.go +++ b/libraries/shared/mocks/storage_fetcher.go @@ -12,12 +12,12 @@ func NewMockStorageFetcher() *MockStorageFetcher { } func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { + defer close(out) + defer close(errs) for _, err := range fetcher.ErrsToReturn { errs <- err } for _, row := range fetcher.RowsToReturn { out <- row } - close(out) - close(errs) } diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index 729219a8..e667ec57 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -21,21 +21,26 @@ import ( ) type MockStorageQueue struct { - AddCalled bool - AddError error - PassedRow utils.StorageDiffRow + AddCalled bool + AddError error + AddPassedRow utils.StorageDiffRow + DeleteErr error + DeletePassedId int + GetAllErr error + RowsToReturn []utils.StorageDiffRow } func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error { queue.AddCalled = true - queue.PassedRow = row + queue.AddPassedRow = row return queue.AddError } func (queue *MockStorageQueue) Delete(id int) error { - panic("implement me") + queue.DeletePassedId = id + return queue.DeleteErr } func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiffRow, error) { - panic("implement me") + return queue.RowsToReturn, queue.GetAllErr } diff --git a/libraries/shared/storage/storage_queue_test.go b/libraries/shared/storage/storage_queue_test.go index ca05f234..81f988c0 100644 --- a/libraries/shared/storage/storage_queue_test.go +++ b/libraries/shared/storage/storage_queue_test.go @@ -4,10 +4,10 @@ import ( "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/test_config" ) diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index e8dda2f1..fdc773fa 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -17,7 +17,9 @@ package watcher import ( + "fmt" "reflect" + "time" "github.com/ethereum/go-ethereum/common" "github.com/sirupsen/logrus" @@ -47,29 +49,30 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage } } -func (watcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { +func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { for _, initializer := range initializers { - storageTransformer := initializer(watcher.db) - watcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer + storageTransformer := initializer(storageWatcher.db) + storageWatcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer } } -func (watcher StorageWatcher) Execute() error { - rows := make(chan utils.StorageDiffRow) - errs := make(chan error) - go watcher.StorageFetcher.FetchStorageDiffs(rows, errs) +func (storageWatcher StorageWatcher) Execute(rows chan utils.StorageDiffRow, errs chan error, queueRecheckInterval time.Duration) { + ticker := time.NewTicker(queueRecheckInterval) + go storageWatcher.StorageFetcher.FetchStorageDiffs(rows, errs) for { select { + case fetchErr := <-errs: + logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr)) case row := <-rows: - watcher.processRow(row) - case err := <-errs: - return err + storageWatcher.processRow(row) + case <-ticker.C: + storageWatcher.processQueue() } } } -func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) { - storageTransformer, ok := watcher.Transformers[row.Contract] +func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { + storageTransformer, ok := storageWatcher.Transformers[row.Contract] if !ok { // ignore rows from unwatched contracts return @@ -77,16 +80,42 @@ func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) { executeErr := storageTransformer.Execute(row) if executeErr != nil { if isKeyNotFound(executeErr) { - queueErr := watcher.Queue.Add(row) + queueErr := storageWatcher.Queue.Add(row) if queueErr != nil { - logrus.Warn(queueErr.Error()) + logrus.Warn(fmt.Sprintf("error queueing storage diff with unrecognized key: %s", queueErr)) } } else { - logrus.Warn(executeErr.Error()) + logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) } } } +func (storageWatcher StorageWatcher) processQueue() { + rows, fetchErr := storageWatcher.Queue.GetAll() + if fetchErr != nil { + logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr)) + } + for _, row := range rows { + storageTransformer, ok := storageWatcher.Transformers[row.Contract] + if !ok { + // delete row from queue if address no longer watched + storageWatcher.deleteRow(row.Id) + continue + } + executeErr := storageTransformer.Execute(row) + if executeErr == nil { + storageWatcher.deleteRow(row.Id) + } + } +} + +func (storageWatcher StorageWatcher) deleteRow(id int) { + deleteErr := storageWatcher.Queue.Delete(id) + if deleteErr != nil { + logrus.Warn(fmt.Sprintf("error deleting persisted row from queue: %s", deleteErr)) + } +} + func isKeyNotFound(executeErr error) bool { return reflect.TypeOf(executeErr) == reflect.TypeOf(utils.ErrStorageKeyNotFound{}) } diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index bbafa75a..3dfef667 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -19,6 +19,7 @@ package watcher_test import ( "io/ioutil" "os" + "time" "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" @@ -46,78 +47,214 @@ var _ = Describe("Storage Watcher", func() { Describe("executing watcher", func() { var ( + errs chan error mockFetcher *mocks.MockStorageFetcher mockQueue *mocks.MockStorageQueue mockTransformer *mocks.MockStorageTransformer row utils.StorageDiffRow + rows chan utils.StorageDiffRow storageWatcher watcher.StorageWatcher ) BeforeEach(func() { + errs = make(chan error) + rows = make(chan utils.StorageDiffRow) address := common.HexToAddress("0x0123456789abcdef") mockFetcher = mocks.NewMockStorageFetcher() mockQueue = &mocks.MockStorageQueue{} mockTransformer = &mocks.MockStorageTransformer{Address: address} row = utils.StorageDiffRow{ + Id: 1337, Contract: address, BlockHash: common.HexToHash("0xfedcba9876543210"), BlockHeight: 0, StorageKey: common.HexToHash("0xabcdef1234567890"), StorageValue: common.HexToHash("0x9876543210abcdef"), } - mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} + }) + + It("logs error if fetching storage diffs fails", func(done Done) { + mockFetcher.ErrsToReturn = []error{fakes.FakeError} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - }) - - It("executes transformer for recognized storage row", func() { - err := storageWatcher.Execute() - - Expect(err).NotTo(HaveOccurred()) - Expect(mockTransformer.PassedRow).To(Equal(row)) - }) - - It("queues row for later processing if row's key not recognized", func() { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} - - err := storageWatcher.Execute() - - Expect(err).NotTo(HaveOccurred()) - Expect(mockQueue.AddCalled).To(BeTrue()) - Expect(mockQueue.PassedRow).To(Equal(row)) - }) - - It("logs error if queueing row fails", func() { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} - mockQueue.AddError = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log") Expect(fileErr).NotTo(HaveOccurred()) defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - err := storageWatcher.Execute() + go storageWatcher.Execute(rows, errs, time.Hour) - Expect(err).NotTo(HaveOccurred()) - Expect(mockQueue.AddCalled).To(BeTrue()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) }) - It("logs error if transformer execution fails for reason other than key not found", func() { - mockTransformer.ExecuteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) + Describe("transforming new storage diffs", func() { + BeforeEach(func() { + mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) - err := storageWatcher.Execute() + It("executes transformer for recognized storage row", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Hour) - Expect(err).NotTo(HaveOccurred()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) + Eventually(func() utils.StorageDiffRow { + return mockTransformer.PassedRow + }).Should(Equal(row)) + close(done) + }) + + It("queues row for later processing if row's key not recognized", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + + go storageWatcher.Execute(rows, errs, time.Hour) + + Expect(<-errs).To(BeNil()) + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() utils.StorageDiffRow { + return mockQueue.AddPassedRow + }).Should(Equal(row)) + close(done) + }) + + It("logs error if queueing row fails", func(done Done) { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + mockQueue.AddError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Hour) + + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("logs error if transformer execution fails for reason other than key not found", func(done Done) { + mockTransformer.ExecuteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Hour) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) }) + + Describe("transforming queued storage diffs", func() { + BeforeEach(func() { + mockQueue.RowsToReturn = []utils.StorageDiffRow{row} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("logs error if getting queued storage fails", func(done Done) { + mockQueue.GetAllErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("executes transformer for storage row", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() utils.StorageDiffRow { + return mockTransformer.PassedRow + }).Should(Equal(row)) + close(done) + }) + + It("deletes row from queue if transformer execution successful", func(done Done) { + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(row.Id)) + close(done) + }) + + It("logs error if deleting persisted row fails", func(done Done) { + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + + It("deletes obsolete row from queue if contract not recognized", func(done Done) { + obsoleteRow := utils.StorageDiffRow{ + Id: row.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(obsoleteRow.Id)) + close(done) + }) + + It("logs error if deleting obsolete row fails", func(done Done) { + obsoleteRow := utils.StorageDiffRow{ + Id: row.Id + 1, + Contract: common.HexToAddress("0xfedcba9876543210"), + } + mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(rows, errs, time.Nanosecond) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + }) + }) }) From d77f3fe01244b0616e105a831fbdeca7f4837566 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Fri, 26 Apr 2019 10:36:42 -0500 Subject: [PATCH 4/7] Don't pass empty row to channel on error --- libraries/shared/fetcher/storage_fetcher.go | 3 ++- libraries/shared/fetcher/storage_fetcher_test.go | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/libraries/shared/fetcher/storage_fetcher.go b/libraries/shared/fetcher/storage_fetcher.go index 2115277b..e146a4ec 100644 --- a/libraries/shared/fetcher/storage_fetcher.go +++ b/libraries/shared/fetcher/storage_fetcher.go @@ -27,7 +27,8 @@ func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.S row, parseErr := utils.FromStrings(strings.Split(line.Text, ",")) if parseErr != nil { errs <- parseErr + } else { + out <- row } - out <- row } } diff --git a/libraries/shared/fetcher/storage_fetcher_test.go b/libraries/shared/fetcher/storage_fetcher_test.go index f5d39dcd..b97fc55b 100644 --- a/libraries/shared/fetcher/storage_fetcher_test.go +++ b/libraries/shared/fetcher/storage_fetcher_test.go @@ -58,6 +58,12 @@ var _ = Describe("Csv Tail Storage Fetcher", func() { mockTailer.Lines <- line Expect(<-errorsChannel).To(HaveOccurred()) + select { + case <-rowsChannel: + Fail("value passed to rows channel on error") + default: + Succeed() + } close(done) }) }) From 76ab914bdc1e9db9c527541a70963b7da6248347 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Fri, 26 Apr 2019 10:42:40 -0500 Subject: [PATCH 5/7] Add license --- libraries/shared/fetcher/storage_fetcher.go | 16 ++++++++++++++++ libraries/shared/fetcher/storage_fetcher_test.go | 16 ++++++++++++++++ libraries/shared/mocks/storage_fetcher.go | 16 ++++++++++++++++ libraries/shared/storage/storage_queue_test.go | 16 ++++++++++++++++ libraries/shared/transactions/syncer.go | 16 ++++++++++++++++ libraries/shared/transactions/syncer_test.go | 16 ++++++++++++++++ .../transactions/transactions_suite_test.go | 16 ++++++++++++++++ pkg/fakes/mock_filter_repository.go | 16 ++++++++++++++++ pkg/fakes/mock_parser.go | 16 ++++++++++++++++ pkg/fakes/mock_poller.go | 16 ++++++++++++++++ pkg/fakes/mock_tailer.go | 16 ++++++++++++++++ pkg/fakes/mock_transaction_syncer.go | 16 ++++++++++++++++ 12 files changed, 192 insertions(+) diff --git a/libraries/shared/fetcher/storage_fetcher.go b/libraries/shared/fetcher/storage_fetcher.go index e146a4ec..dea891e8 100644 --- a/libraries/shared/fetcher/storage_fetcher.go +++ b/libraries/shared/fetcher/storage_fetcher.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package fetcher import ( diff --git a/libraries/shared/fetcher/storage_fetcher_test.go b/libraries/shared/fetcher/storage_fetcher_test.go index b97fc55b..20e9a6b5 100644 --- a/libraries/shared/fetcher/storage_fetcher_test.go +++ b/libraries/shared/fetcher/storage_fetcher_test.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package fetcher_test import ( diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go index 27c61b20..5009f116 100644 --- a/libraries/shared/mocks/storage_fetcher.go +++ b/libraries/shared/mocks/storage_fetcher.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package mocks import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" diff --git a/libraries/shared/storage/storage_queue_test.go b/libraries/shared/storage/storage_queue_test.go index 81f988c0..5eed4c58 100644 --- a/libraries/shared/storage/storage_queue_test.go +++ b/libraries/shared/storage/storage_queue_test.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package storage_test import ( diff --git a/libraries/shared/transactions/syncer.go b/libraries/shared/transactions/syncer.go index f8590e2b..fb7b4e07 100644 --- a/libraries/shared/transactions/syncer.go +++ b/libraries/shared/transactions/syncer.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package transactions import ( diff --git a/libraries/shared/transactions/syncer_test.go b/libraries/shared/transactions/syncer_test.go index f38f68d2..736f5108 100644 --- a/libraries/shared/transactions/syncer_test.go +++ b/libraries/shared/transactions/syncer_test.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package transactions_test import ( diff --git a/libraries/shared/transactions/transactions_suite_test.go b/libraries/shared/transactions/transactions_suite_test.go index 1771a687..ca012b1a 100644 --- a/libraries/shared/transactions/transactions_suite_test.go +++ b/libraries/shared/transactions/transactions_suite_test.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package transactions_test import ( diff --git a/pkg/fakes/mock_filter_repository.go b/pkg/fakes/mock_filter_repository.go index e6f160a8..c4cf221a 100644 --- a/pkg/fakes/mock_filter_repository.go +++ b/pkg/fakes/mock_filter_repository.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package fakes import "github.com/vulcanize/vulcanizedb/pkg/filters" diff --git a/pkg/fakes/mock_parser.go b/pkg/fakes/mock_parser.go index dd03bbab..d7a33ebb 100644 --- a/pkg/fakes/mock_parser.go +++ b/pkg/fakes/mock_parser.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package fakes import ( diff --git a/pkg/fakes/mock_poller.go b/pkg/fakes/mock_poller.go index 2d782b18..7b2fe848 100644 --- a/pkg/fakes/mock_poller.go +++ b/pkg/fakes/mock_poller.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package fakes import ( diff --git a/pkg/fakes/mock_tailer.go b/pkg/fakes/mock_tailer.go index ba877644..e30fb922 100644 --- a/pkg/fakes/mock_tailer.go +++ b/pkg/fakes/mock_tailer.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package fakes import ( diff --git a/pkg/fakes/mock_transaction_syncer.go b/pkg/fakes/mock_transaction_syncer.go index 0753a69a..e5b49a9c 100644 --- a/pkg/fakes/mock_transaction_syncer.go +++ b/pkg/fakes/mock_transaction_syncer.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package fakes import "github.com/ethereum/go-ethereum/core/types" From b0360539371e7c0e1ff564acbeaa46eb54c052cc Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Mon, 29 Apr 2019 14:20:57 -0500 Subject: [PATCH 6/7] Queue storage diffs if transformer execution fails - For any error, not just if key isn't recognized - Means we don't lose track of diffs on random ephemeral errors --- libraries/shared/watcher/storage_watcher.go | 11 ++++------ .../shared/watcher/storage_watcher_test.go | 20 ++----------------- 2 files changed, 6 insertions(+), 25 deletions(-) diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index fdc773fa..7b2c5362 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -79,13 +79,10 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { } executeErr := storageTransformer.Execute(row) if executeErr != nil { - if isKeyNotFound(executeErr) { - queueErr := storageWatcher.Queue.Add(row) - if queueErr != nil { - logrus.Warn(fmt.Sprintf("error queueing storage diff with unrecognized key: %s", queueErr)) - } - } else { - logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) + logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) + queueErr := storageWatcher.Queue.Add(row) + if queueErr != nil { + logrus.Warn(fmt.Sprintf("error queueing storage diff: %s", queueErr)) } } } diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index 3dfef667..6c12358d 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -109,8 +109,8 @@ var _ = Describe("Storage Watcher", func() { close(done) }) - It("queues row for later processing if row's key not recognized", func(done Done) { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + It("queues row for later processing if transformer execution fails", func(done Done) { + mockTransformer.ExecuteErr = fakes.FakeError go storageWatcher.Execute(rows, errs, time.Hour) @@ -143,22 +143,6 @@ var _ = Describe("Storage Watcher", func() { }).Should(ContainSubstring(fakes.FakeError.Error())) close(done) }) - - It("logs error if transformer execution fails for reason other than key not found", func(done Done) { - mockTransformer.ExecuteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(rows, errs, time.Hour) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) }) Describe("transforming queued storage diffs", func() { From 6716c3b92addfc175df3e3f4a9fbcab38ca650a8 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Mon, 29 Apr 2019 15:21:32 -0500 Subject: [PATCH 7/7] Make queue recheck interval configurable via CLI --- cmd/composeAndExecute.go | 4 +++- cmd/execute.go | 3 ++- cmd/root.go | 2 +- documentation/composeAndExecute.md | 14 ++++++++++++++ 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index 1c5ac10a..cb902d82 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -19,6 +19,7 @@ import ( "os" "plugin" syn "sync" + "time" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -191,5 +192,6 @@ func composeAndExecute() { func init() { rootCmd.AddCommand(composeAndExecuteCmd) - composeAndExecuteCmd.Flags().BoolVar(&recheckHeadersArg, "recheckHeaders", false, "checks headers that are already checked for each transformer.") + composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events") + composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs") } diff --git a/cmd/execute.go b/cmd/execute.go index f3eea0f3..ecccdee0 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -138,7 +138,8 @@ func execute() { func init() { rootCmd.AddCommand(executeCmd) - executeCmd.Flags().BoolVar(&recheckHeadersArg, "recheckHeaders", false, "checks headers that are already checked for each transformer.") + executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events") + executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs") } type Exporter interface { diff --git a/cmd/root.go b/cmd/root.go index 7eee8793..dba91864 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -41,6 +41,7 @@ var ( genConfig config.Plugin ipc string levelDbPath string + queueRecheckInterval time.Duration startingBlockNumber int64 storageDiffsPath string syncAll bool @@ -50,7 +51,6 @@ var ( const ( pollingInterval = 7 * time.Second - queueRecheckInterval = 5 * time.Minute validationWindow = 15 ) diff --git a/documentation/composeAndExecute.md b/documentation/composeAndExecute.md index 1a0dac6b..8ea1abc9 100644 --- a/documentation/composeAndExecute.md +++ b/documentation/composeAndExecute.md @@ -36,6 +36,20 @@ composeAndExecute: `./vulcanizedb composeAndExecute --config=./environments/config_name.toml` +## Flags + +The `compose` and `composeAndExecute` commands can be passed optional flags to specify the operation of the watchers: + +- `--recheck-headers`/`-r` - specifies whether to re-check headers for events after the header has already been queried for watched logs. +Can be useful for redundancy if you suspect that your node is not always returning all desired logs on every query. +Argument is expected to be a boolean: e.g. `-r=true`. +Defaults to `false`. + +- `query-recheck-interval`/`-q` - specifies interval for re-checking storage diffs that haven been queued for later processing +(by default, the storage watched queues storage diffs if transformer execution fails, on the assumption that subsequent data derived from the event transformers may enable us to decode storage keys that we don't recognize right now). +Argument is expected to be a duration (integer measured in nanoseconds): e.g. `-q=10m30s` (for 10 minute, 30 second intervals). +Defaults to `5m` (5 minutes). + ## Configuration A .toml config file is specified when executing the commands. The config provides information for composing a set of transformers from external repositories: