From 2d684c5aecb6c4cf408bba8d78e6071ba44ac7c7 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Wed, 24 Apr 2019 15:05:57 -0500 Subject: [PATCH] Extract storage diff fetching behind an interface - Replaces directly reading from a CSV - Simplifies testing - Should hopefully make it easier to plug in other sources for storage diffs (e.g. differently formatted CSVs, JSON RPC, etc) --- cmd/composeAndExecute.go | 12 +- cmd/execute.go | 4 +- libraries/shared/fetcher/log_fetcher.go | 12 +- libraries/shared/fetcher/log_fetcher_test.go | 12 +- libraries/shared/fetcher/storage_fetcher.go | 33 +++ .../shared/fetcher/storage_fetcher_test.go | 81 +++++++ libraries/shared/mocks/converter.go | 4 +- libraries/shared/mocks/storage_fetcher.go | 23 ++ libraries/shared/mocks/storage_queue.go | 2 + ...er_repository.go => watcher_repository.go} | 0 libraries/shared/watcher/contract_watcher.go | 12 +- libraries/shared/watcher/event_watcher.go | 4 +- libraries/shared/watcher/storage_watcher.go | 71 +++--- .../shared/watcher/storage_watcher_test.go | 223 ++++++------------ pkg/fakes/mock_tailer.go | 10 +- 15 files changed, 279 insertions(+), 224 deletions(-) create mode 100644 libraries/shared/fetcher/storage_fetcher.go create mode 100644 libraries/shared/fetcher/storage_fetcher_test.go create mode 100644 libraries/shared/mocks/storage_fetcher.go rename libraries/shared/mocks/{mock_watcher_repository.go => watcher_repository.go} (100%) diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index d2abd045..1c5ac10a 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -16,16 +16,19 @@ package cmd import ( + "os" + "plugin" + syn "sync" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fs" p2 "github.com/vulcanize/vulcanizedb/pkg/plugin" "github.com/vulcanize/vulcanizedb/pkg/plugin/helpers" "github.com/vulcanize/vulcanizedb/utils" - "os" - "plugin" - syn "sync" ) // composeAndExecuteCmd represents the composeAndExecute command @@ -170,7 +173,8 @@ func composeAndExecute() { if len(ethStorageInitializers) > 0 { tailer := fs.FileTailer{Path: storageDiffsPath} - sw := watcher.NewStorageWatcher(tailer, &db) + storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) + sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) diff --git a/cmd/execute.go b/cmd/execute.go index 3ebc15f1..160355b3 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -26,6 +26,7 @@ import ( "github.com/spf13/cobra" "github.com/vulcanize/vulcanizedb/libraries/shared/constants" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fs" @@ -118,7 +119,8 @@ func execute() { if len(ethStorageInitializers) > 0 { tailer := fs.FileTailer{Path: storageDiffsPath} - sw := watcher.NewStorageWatcher(tailer, &db) + storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) + sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) diff --git a/libraries/shared/fetcher/log_fetcher.go b/libraries/shared/fetcher/log_fetcher.go index a231b3de..a2b5f21a 100644 --- a/libraries/shared/fetcher/log_fetcher.go +++ b/libraries/shared/fetcher/log_fetcher.go @@ -24,22 +24,22 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -type LogFetcher interface { +type ILogFetcher interface { FetchLogs(contractAddresses []common.Address, topics []common.Hash, missingHeader core.Header) ([]types.Log, error) } -type Fetcher struct { +type LogFetcher struct { blockChain core.BlockChain } -func NewFetcher(blockchain core.BlockChain) *Fetcher { - return &Fetcher{ +func NewLogFetcher(blockchain core.BlockChain) *LogFetcher { + return &LogFetcher{ blockChain: blockchain, } } // Checks all topic0s, on all addresses, fetching matching logs for the given header -func (fetcher Fetcher) FetchLogs(addresses []common.Address, topic0s []common.Hash, header core.Header) ([]types.Log, error) { +func (logFetcher LogFetcher) FetchLogs(addresses []common.Address, topic0s []common.Hash, header core.Header) ([]types.Log, error) { blockHash := common.HexToHash(header.Hash) query := ethereum.FilterQuery{ BlockHash: &blockHash, @@ -48,7 +48,7 @@ func (fetcher Fetcher) FetchLogs(addresses []common.Address, topic0s []common.Ha Topics: [][]common.Hash{topic0s}, } - logs, err := fetcher.blockChain.GetEthLogsWithCustomQuery(query) + logs, err := logFetcher.blockChain.GetEthLogsWithCustomQuery(query) if err != nil { // TODO review aggregate fetching error handling return []types.Log{}, err diff --git a/libraries/shared/fetcher/log_fetcher_test.go b/libraries/shared/fetcher/log_fetcher_test.go index 1a41a5a3..8f9b36a7 100644 --- a/libraries/shared/fetcher/log_fetcher_test.go +++ b/libraries/shared/fetcher/log_fetcher_test.go @@ -22,16 +22,16 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - fetch "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/fakes" ) -var _ = Describe("Fetcher", func() { +var _ = Describe("LogFetcher", func() { Describe("FetchLogs", func() { It("fetches logs based on the given query", func() { blockChain := fakes.NewMockBlockChain() - fetcher := fetch.NewFetcher(blockChain) + logFetcher := fetcher.NewLogFetcher(blockChain) header := fakes.FakeHeader addresses := []common.Address{ @@ -41,7 +41,7 @@ var _ = Describe("Fetcher", func() { topicZeros := []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})} - _, err := fetcher.FetchLogs(addresses, topicZeros, header) + _, err := logFetcher.FetchLogs(addresses, topicZeros, header) address1 := common.HexToAddress("0xfakeAddress") address2 := common.HexToAddress("0xanotherFakeAddress") @@ -59,9 +59,9 @@ var _ = Describe("Fetcher", func() { It("returns an error if fetching the logs fails", func() { blockChain := fakes.NewMockBlockChain() blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError) - fetcher := fetch.NewFetcher(blockChain) + logFetcher := fetcher.NewLogFetcher(blockChain) - _, err := fetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{}) + _, err := logFetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) diff --git a/libraries/shared/fetcher/storage_fetcher.go b/libraries/shared/fetcher/storage_fetcher.go new file mode 100644 index 00000000..2115277b --- /dev/null +++ b/libraries/shared/fetcher/storage_fetcher.go @@ -0,0 +1,33 @@ +package fetcher + +import ( + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/fs" + "strings" +) + +type IStorageFetcher interface { + FetchStorageDiffs(chan<- utils.StorageDiffRow, chan<- error) +} + +type CsvTailStorageFetcher struct { + tailer fs.Tailer +} + +func NewCsvTailStorageFetcher(tailer fs.Tailer) CsvTailStorageFetcher { + return CsvTailStorageFetcher{tailer: tailer} +} + +func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { + t, tailErr := storageFetcher.tailer.Tail() + if tailErr != nil { + errs <- tailErr + } + for line := range t.Lines { + row, parseErr := utils.FromStrings(strings.Split(line.Text, ",")) + if parseErr != nil { + errs <- parseErr + } + out <- row + } +} diff --git a/libraries/shared/fetcher/storage_fetcher_test.go b/libraries/shared/fetcher/storage_fetcher_test.go new file mode 100644 index 00000000..1323f9d4 --- /dev/null +++ b/libraries/shared/fetcher/storage_fetcher_test.go @@ -0,0 +1,81 @@ +package fetcher_test + +import ( + "fmt" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/hpcloud/tail" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/fakes" +) + +var _ = Describe("Csv Tail Storage Fetcher", func() { + var ( + errorsChannel chan error + mockTailer *fakes.MockTailer + rowsChannel chan utils.StorageDiffRow + storageFetcher fetcher.CsvTailStorageFetcher + ) + + BeforeEach(func() { + errorsChannel = make(chan error) + rowsChannel = make(chan utils.StorageDiffRow) + mockTailer = fakes.NewMockTailer() + storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer) + }) + + It("adds error to errors channel if tailing file fails", func() { + mockTailer.TailErr = fakes.FakeError + + go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + + close(mockTailer.Lines) + returnedErr := <-errorsChannel + Expect(returnedErr).To(HaveOccurred()) + Expect(returnedErr).To(MatchError(fakes.FakeError)) + }) + + It("adds parsed csv row to rows channel for storage diff", func() { + line := getFakeLine() + + go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + mockTailer.Lines <- line + + close(mockTailer.Lines) + returnedRow := <-rowsChannel + expectedRow, err := utils.FromStrings(strings.Split(line.Text, ",")) + Expect(err).NotTo(HaveOccurred()) + Expect(expectedRow).To(Equal(returnedRow)) + }) + + It("adds error to errors channel if parsing csv fails", func() { + line := &tail.Line{Text: "invalid"} + + go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + mockTailer.Lines <- line + + close(mockTailer.Lines) + returnedErr := <-errorsChannel + Expect(returnedErr).To(HaveOccurred()) + }) +}) + +func getFakeLine() *tail.Line { + address := common.HexToAddress("0x1234567890abcdef") + blockHash := []byte{4, 5, 6} + blockHeight := int64(789) + storageKey := []byte{9, 8, 7} + storageValue := []byte{6, 5, 4} + return &tail.Line{ + Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address.Bytes()), common.Bytes2Hex(blockHash), + blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)), + Time: time.Time{}, + Err: nil, + } +} diff --git a/libraries/shared/mocks/converter.go b/libraries/shared/mocks/converter.go index 01b0e4a6..0a687718 100644 --- a/libraries/shared/mocks/converter.go +++ b/libraries/shared/mocks/converter.go @@ -53,6 +53,6 @@ func (converter *MockConverter) SetToEntityConverterError(err error) { converter.entityConverterError = err } -func (c *MockConverter) SetToModelConverterError(err error) { - c.modelConverterError = err +func (converter *MockConverter) SetToModelConverterError(err error) { + converter.modelConverterError = err } diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go new file mode 100644 index 00000000..ef555252 --- /dev/null +++ b/libraries/shared/mocks/storage_fetcher.go @@ -0,0 +1,23 @@ +package mocks + +import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + +type MockStorageFetcher struct { + RowsToReturn []utils.StorageDiffRow + ErrsToReturn []error +} + +func NewMockStorageFetcher() *MockStorageFetcher { + return &MockStorageFetcher{} +} + +func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { + for _, err := range fetcher.ErrsToReturn { + errs <- err + } + for _, row := range fetcher.RowsToReturn { + out <- row + } + close(out) + close(errs) +} diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index fa44c474..ae41f687 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -23,9 +23,11 @@ import ( type MockStorageQueue struct { AddCalled bool AddError error + PassedRow utils.StorageDiffRow } func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error { queue.AddCalled = true + queue.PassedRow = row return queue.AddError } diff --git a/libraries/shared/mocks/mock_watcher_repository.go b/libraries/shared/mocks/watcher_repository.go similarity index 100% rename from libraries/shared/mocks/mock_watcher_repository.go rename to libraries/shared/mocks/watcher_repository.go diff --git a/libraries/shared/watcher/contract_watcher.go b/libraries/shared/watcher/contract_watcher.go index ebb4cc37..194e7950 100644 --- a/libraries/shared/watcher/contract_watcher.go +++ b/libraries/shared/watcher/contract_watcher.go @@ -54,10 +54,10 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error { watcher.Transformers = append(watcher.Transformers, t) } - for _, transformer := range watcher.Transformers { - err := transformer.Init() + for _, contractTransformer := range watcher.Transformers { + err := contractTransformer.Init() if err != nil { - log.Print("Unable to initialize transformer:", transformer.GetConfig().Name, err) + log.Print("Unable to initialize transformer:", contractTransformer.GetConfig().Name, err) return err } } @@ -65,10 +65,10 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error { } func (watcher *ContractWatcher) Execute() error { - for _, transformer := range watcher.Transformers { - err := transformer.Execute() + for _, contractTransformer := range watcher.Transformers { + err := contractTransformer.Execute() if err != nil { - log.Error("Unable to execute transformer:", transformer.GetConfig().Name, err) + log.Error("Unable to execute transformer:", contractTransformer.GetConfig().Name, err) return err } } diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go index 9bad688a..1fd6afde 100644 --- a/libraries/shared/watcher/event_watcher.go +++ b/libraries/shared/watcher/event_watcher.go @@ -37,7 +37,7 @@ type EventWatcher struct { Transformers []transformer.EventTransformer BlockChain core.BlockChain DB *postgres.DB - Fetcher fetcher.LogFetcher + Fetcher fetcher.ILogFetcher Chunker chunker.Chunker Addresses []common.Address Topics []common.Hash @@ -47,7 +47,7 @@ type EventWatcher struct { func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher { logChunker := chunker.NewLogChunker() - logFetcher := fetcher.NewFetcher(bc) + logFetcher := fetcher.NewLogFetcher(bc) transactionSyncer := transactions.NewTransactionsSyncer(db, bc) return EventWatcher{ BlockChain: bc, diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 524cbcd4..e8dda2f1 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -18,33 +18,32 @@ package watcher import ( "reflect" - "strings" "github.com/ethereum/go-ethereum/common" "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/fs" ) type StorageWatcher struct { - db *postgres.DB - tailer fs.Tailer - Queue storage.IStorageQueue - Transformers map[common.Address]transformer.StorageTransformer + db *postgres.DB + StorageFetcher fetcher.IStorageFetcher + Queue storage.IStorageQueue + Transformers map[common.Address]transformer.StorageTransformer } -func NewStorageWatcher(tailer fs.Tailer, db *postgres.DB) StorageWatcher { +func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { transformers := make(map[common.Address]transformer.StorageTransformer) queue := storage.NewStorageQueue(db) return StorageWatcher{ - db: db, - tailer: tailer, - Queue: queue, - Transformers: transformers, + db: db, + StorageFetcher: fetcher, + Queue: queue, + Transformers: transformers, } } @@ -56,34 +55,36 @@ func (watcher StorageWatcher) AddTransformers(initializers []transformer.Storage } func (watcher StorageWatcher) Execute() error { - t, tailErr := watcher.tailer.Tail() - if tailErr != nil { - return tailErr + rows := make(chan utils.StorageDiffRow) + errs := make(chan error) + go watcher.StorageFetcher.FetchStorageDiffs(rows, errs) + for { + select { + case row := <-rows: + watcher.processRow(row) + case err := <-errs: + return err + } } - for line := range t.Lines { - row, parseErr := utils.FromStrings(strings.Split(line.Text, ",")) - if parseErr != nil { - return parseErr - } - storageTransformer, ok := watcher.Transformers[row.Contract] - if !ok { - logrus.Warn(utils.ErrContractNotFound{Contract: row.Contract.Hex()}.Error()) - continue - } - executeErr := storageTransformer.Execute(row) - if executeErr != nil { - if isKeyNotFound(executeErr) { - queueErr := watcher.Queue.Add(row) - if queueErr != nil { - logrus.Warn(queueErr.Error()) - } - } else { - logrus.Warn(executeErr.Error()) +} + +func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) { + storageTransformer, ok := watcher.Transformers[row.Contract] + if !ok { + // ignore rows from unwatched contracts + return + } + executeErr := storageTransformer.Execute(row) + if executeErr != nil { + if isKeyNotFound(executeErr) { + queueErr := watcher.Queue.Add(row) + if queueErr != nil { + logrus.Warn(queueErr.Error()) } - continue + } else { + logrus.Warn(executeErr.Error()) } } - return nil } func isKeyNotFound(executeErr error) bool { diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index dc74eba4..bbafa75a 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -17,15 +17,10 @@ package watcher_test import ( - "errors" - "fmt" "io/ioutil" "os" - "strings" - "time" "github.com/ethereum/go-ethereum/common" - "github.com/hpcloud/tail" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" @@ -34,7 +29,6 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/test_config" ) @@ -43,170 +37,87 @@ var _ = Describe("Storage Watcher", func() { It("adds transformers", func() { fakeAddress := common.HexToAddress("0x12345") fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} - w := watcher.NewStorageWatcher(&fakes.MockTailer{}, test_config.NewTestDB(core.Node{})) + w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) }) - It("reads the tail of the storage diffs file", func() { - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) + Describe("executing watcher", func() { + var ( + mockFetcher *mocks.MockStorageFetcher + mockQueue *mocks.MockStorageQueue + mockTransformer *mocks.MockStorageTransformer + row utils.StorageDiffRow + storageWatcher watcher.StorageWatcher + ) - assert(func(err error) { - Expect(err).To(BeNil()) - Expect(mockTailer.TailCalled).To(BeTrue()) - }, w, mockTailer, []*tail.Line{}) - }) + BeforeEach(func() { + address := common.HexToAddress("0x0123456789abcdef") + mockFetcher = mocks.NewMockStorageFetcher() + mockQueue = &mocks.MockStorageQueue{} + mockTransformer = &mocks.MockStorageTransformer{Address: address} + row = utils.StorageDiffRow{ + Contract: address, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: 0, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), + } + mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) - It("returns error if row parsing fails", func() { - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - line := &tail.Line{Text: "oops"} + It("executes transformer for recognized storage row", func() { + err := storageWatcher.Execute() - assert(func(err error) { - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(utils.ErrRowMalformed{Length: 1})) - }, w, mockTailer, []*tail.Line{line}) - }) + Expect(err).NotTo(HaveOccurred()) + Expect(mockTransformer.PassedRow).To(Equal(row)) + }) - It("logs error if no transformer can parse storage row", func() { - mockTailer := fakes.NewMockTailer() - address := common.HexToAddress("0x12345") - line := getFakeLine(address.Bytes()) - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - tempFile, err := ioutil.TempFile("", "log") - defer os.Remove(tempFile.Name()) - Expect(err).NotTo(HaveOccurred()) - logrus.SetOutput(tempFile) + It("queues row for later processing if row's key not recognized", func() { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + + err := storageWatcher.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(mockQueue.AddCalled).To(BeTrue()) + Expect(mockQueue.PassedRow).To(Equal(row)) + }) + + It("logs error if queueing row fails", func() { + mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} + mockQueue.AddError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + err := storageWatcher.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(mockQueue.AddCalled).To(BeTrue()) + logContent, readErr := ioutil.ReadFile(tempFile.Name()) + Expect(readErr).NotTo(HaveOccurred()) + Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) + }) + + It("logs error if transformer execution fails for reason other than key not found", func() { + mockTransformer.ExecuteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + err := storageWatcher.Execute() - assert(func(err error) { Expect(err).NotTo(HaveOccurred()) logContent, readErr := ioutil.ReadFile(tempFile.Name()) Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(utils.ErrContractNotFound{Contract: address.Hex()}.Error())) - }, w, mockTailer, []*tail.Line{line}) - }) - - It("executes transformer with storage row", func() { - address := []byte{1, 2, 3} - line := getFakeLine(address) - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address)} - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - - assert(func(err error) { - Expect(err).To(BeNil()) - expectedRow, err := utils.FromStrings(strings.Split(line.Text, ",")) - Expect(err).NotTo(HaveOccurred()) - Expect(fakeTransformer.PassedRow).To(Equal(expectedRow)) - }, w, mockTailer, []*tail.Line{line}) - }) - - Describe("when executing transformer fails", func() { - It("queues row when error is storage key not found", func() { - address := []byte{1, 2, 3} - line := getFakeLine(address) - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - mockQueue := &mocks.MockStorageQueue{} - w.Queue = mockQueue - keyNotFoundError := utils.ErrStorageKeyNotFound{Key: "unknown_storage_key"} - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError} - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - - assert(func(err error) { - Expect(err).NotTo(HaveOccurred()) - Expect(mockQueue.AddCalled).To(BeTrue()) - }, w, mockTailer, []*tail.Line{line}) - }) - - It("logs error if queuing row fails", func() { - address := []byte{1, 2, 3} - line := getFakeLine(address) - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - mockQueue := &mocks.MockStorageQueue{} - mockQueue.AddError = fakes.FakeError - w.Queue = mockQueue - keyNotFoundError := utils.ErrStorageKeyNotFound{Key: "unknown_storage_key"} - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError} - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - tempFile, err := ioutil.TempFile("", "log") - defer os.Remove(tempFile.Name()) - Expect(err).NotTo(HaveOccurred()) - logrus.SetOutput(tempFile) - - assert(func(err error) { - Expect(err).NotTo(HaveOccurred()) - Expect(mockQueue.AddCalled).To(BeTrue()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) - }, w, mockTailer, []*tail.Line{line}) - }) - - It("logs any other error", func() { - address := []byte{1, 2, 3} - line := getFakeLine(address) - mockTailer := fakes.NewMockTailer() - w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - executionError := errors.New("storage watcher failed attempting to execute transformer") - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError} - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - tempFile, err := ioutil.TempFile("", "log") - defer os.Remove(tempFile.Name()) - Expect(err).NotTo(HaveOccurred()) - logrus.SetOutput(tempFile) - - assert(func(err error) { - Expect(err).NotTo(HaveOccurred()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(executionError.Error())) - }, w, mockTailer, []*tail.Line{line}) + Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) }) }) }) - -func assert(assertion func(err error), watcher watcher.StorageWatcher, mockTailer *fakes.MockTailer, lines []*tail.Line) { - errs := make(chan error, 1) - done := make(chan bool, 1) - go execute(watcher, errs, done) - for _, line := range lines { - mockTailer.Lines <- line - } - close(mockTailer.Lines) - - select { - case err := <-errs: - assertion(err) - break - case <-done: - assertion(nil) - break - } -} - -func execute(w watcher.StorageWatcher, errs chan error, done chan bool) { - err := w.Execute() - if err != nil { - errs <- err - } else { - done <- true - } -} - -func getFakeLine(address []byte) *tail.Line { - blockHash := []byte{4, 5, 6} - blockHeight := int64(789) - storageKey := []byte{9, 8, 7} - storageValue := []byte{6, 5, 4} - return &tail.Line{ - Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)), - Time: time.Time{}, - Err: nil, - } -} diff --git a/pkg/fakes/mock_tailer.go b/pkg/fakes/mock_tailer.go index 888688d6..ba877644 100644 --- a/pkg/fakes/mock_tailer.go +++ b/pkg/fakes/mock_tailer.go @@ -6,24 +6,22 @@ import ( ) type MockTailer struct { - Lines chan *tail.Line - TailCalled bool + Lines chan *tail.Line + TailErr error } func NewMockTailer() *MockTailer { return &MockTailer{ - Lines: make(chan *tail.Line, 1), - TailCalled: false, + Lines: make(chan *tail.Line, 1), } } func (mock *MockTailer) Tail() (*tail.Tail, error) { - mock.TailCalled = true fakeTail := &tail.Tail{ Filename: "", Lines: mock.Lines, Config: tail.Config{}, Tomb: tomb.Tomb{}, } - return fakeTail, nil + return fakeTail, mock.TailErr }