From 909d23e176a7cf4ad33365e0e84dffb22cb2c691 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edvard=20H=C3=BCbinette?= Date: Tue, 15 Jan 2019 11:09:13 +0100 Subject: [PATCH] VDB-284 Run watcher from earliest startingBlockNumber (#136) * Watcher computes earliest starting block number * Update tests for new watcher behaviour * Fix error oopsie * Extract conditional to helper and suppres tests logging --- libraries/shared/shared_suite_test.go | 6 ++++ libraries/shared/watcher.go | 37 ++++++++++++++------- libraries/shared/watcher_test.go | 46 ++++++++++++++++++++++----- 3 files changed, 70 insertions(+), 19 deletions(-) diff --git a/libraries/shared/shared_suite_test.go b/libraries/shared/shared_suite_test.go index b2c78ef2..62a8f3d4 100644 --- a/libraries/shared/shared_suite_test.go +++ b/libraries/shared/shared_suite_test.go @@ -1,6 +1,8 @@ package shared_test import ( + "io/ioutil" + "log" "testing" . "github.com/onsi/ginkgo" @@ -11,3 +13,7 @@ func TestShared(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Shared Suite") } + +var _ = BeforeSuite(func() { + log.SetOutput(ioutil.Discard) +}) diff --git a/libraries/shared/watcher.go b/libraries/shared/watcher.go index 8a8ded13..5e1bfdfd 100644 --- a/libraries/shared/watcher.go +++ b/libraries/shared/watcher.go @@ -1,20 +1,22 @@ package shared import ( + "fmt" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" + log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" ) type Watcher struct { - Transformers []shared.Transformer - DB *postgres.DB - Fetcher shared.LogFetcher - Chunker shared.Chunker - Addresses []common.Address - Topics []common.Hash + Transformers []shared.Transformer + DB *postgres.DB + Fetcher shared.LogFetcher + Chunker shared.Chunker + Addresses []common.Address + Topics []common.Hash + StartingBlock *int64 } func NewWatcher(db *postgres.DB, bc core.BlockChain) Watcher { @@ -40,6 +42,12 @@ func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitial config := transformer.GetConfig() configs = append(configs, config) + if watcher.StartingBlock == nil { + watcher.StartingBlock = &config.StartingBlockNumber + } else if earlierStartingBlockNumber(config.StartingBlockNumber, *watcher.StartingBlock) { + watcher.StartingBlock = &config.StartingBlockNumber + } + addresses := shared.HexStringsToAddresses(config.ContractAddresses) contractAddresses = append(contractAddresses, addresses...) topic0s = append(topic0s, common.HexToHash(config.Topic)) @@ -51,14 +59,17 @@ func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitial } func (watcher *Watcher) Execute() error { + if watcher.Transformers == nil { + return fmt.Errorf("No transformers added to watcher") + } + checkedColumnNames, err := shared.GetCheckedColumnNames(watcher.DB) if err != nil { return err } notCheckedSQL := shared.CreateNotCheckedSQL(checkedColumnNames) - // TODO Handle start and end numbers in transformers - missingHeaders, err := shared.MissingHeaders(0, -1, watcher.DB, notCheckedSQL) + missingHeaders, err := shared.MissingHeaders(*watcher.StartingBlock, -1, watcher.DB, notCheckedSQL) if err != nil { log.Error("Fetching of missing headers failed in watcher!") return err @@ -69,7 +80,7 @@ func (watcher *Watcher) Execute() error { logs, err := watcher.Fetcher.FetchLogs(watcher.Addresses, watcher.Topics, header) if err != nil { // TODO Handle fetch error in watcher - log.Error("Error while fetching logs for header %v in watcher", header.Id) + log.Errorf("Error while fetching logs for header %v in watcher", header.Id) return err } @@ -82,10 +93,14 @@ func (watcher *Watcher) Execute() error { logChunk := chunkedLogs[transformerName] err = transformer.Execute(logChunk, header) if err != nil { - log.Error("%v transformer failed to execute in watcher: %v", transformerName, err) + log.Errorf("%v transformer failed to execute in watcher: %v", transformerName, err) return err } } } return err } + +func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { + return transformerBlock < watcherBlock +} diff --git a/libraries/shared/watcher_test.go b/libraries/shared/watcher_test.go index c93c6336..8312c2b9 100644 --- a/libraries/shared/watcher_test.go +++ b/libraries/shared/watcher_test.go @@ -59,12 +59,33 @@ var _ = Describe("Watcher", func() { common.HexToAddress("FakeAddress")})) }) + It("calculates earliest starting block number", func() { + fakeTransformer1 := &mocks.MockTransformer{} + fakeTransformer1.SetTransformerConfig(shared2.TransformerConfig{StartingBlockNumber: 5}) + + fakeTransformer2 := &mocks.MockTransformer{} + fakeTransformer2.SetTransformerConfig(shared2.TransformerConfig{StartingBlockNumber: 3}) + + watcher := shared.NewWatcher(nil, nil) + watcher.AddTransformers([]shared2.TransformerInitializer{ + fakeTransformer1.FakeTransformerInitializer, + fakeTransformer2.FakeTransformerInitializer, + }) + + Expect(*watcher.StartingBlock).To(Equal(int64(3))) + }) + + It("returns an error when run without transformers", func() { + watcher := shared.NewWatcher(nil, nil) + err := watcher.Execute() + Expect(err).To(MatchError("No transformers added to watcher")) + }) + Describe("with missing headers", func() { var ( db *postgres.DB watcher shared.Watcher mockBlockChain fakes.MockBlockChain - fakeTransformer *mocks.MockTransformer headerRepository repositories.HeaderRepository repository mocks.MockWatcherRepository ) @@ -82,8 +103,8 @@ var _ = Describe("Watcher", func() { }) It("executes each transformer", func() { - fakeTransformer = &mocks.MockTransformer{} - watcher.Transformers = []shared2.Transformer{fakeTransformer} + fakeTransformer := &mocks.MockTransformer{} + watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) err := watcher.Execute() @@ -93,8 +114,8 @@ var _ = Describe("Watcher", func() { }) It("returns an error if transformer returns an error", func() { - fakeTransformer = &mocks.MockTransformer{ExecuteError: errors.New("Something bad happened")} - watcher.Transformers = []shared2.Transformer{fakeTransformer} + fakeTransformer := &mocks.MockTransformer{ExecuteError: errors.New("Something bad happened")} + watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) err := watcher.Execute() @@ -135,19 +156,27 @@ var _ = Describe("Watcher", func() { }) Describe("uses the LogFetcher correctly:", func() { + var fakeTransformer mocks.MockTransformer BeforeEach(func() { repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) + fakeTransformer = mocks.MockTransformer{} }) - It("fetches logs", func() { + It("fetches logs for added transformers", func() { + addresses := []string{"0xA", "0xB"} + topic := "0x1" + fakeTransformer.SetTransformerConfig(shared2.TransformerConfig{ + Topic: topic, ContractAddresses: addresses}) + watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) + err := watcher.Execute() Expect(err).NotTo(HaveOccurred()) fakeHash := common.HexToHash(fakes.FakeHeader.Hash) mockBlockChain.AssertGetEthLogsWithCustomQueryCalledWith(ethereum.FilterQuery{ BlockHash: &fakeHash, - Addresses: nil, - Topics: [][]common.Hash{nil}, + Addresses: shared2.HexStringsToAddresses(addresses), + Topics: [][]common.Hash{{common.HexToHash(topic)}}, }) }) @@ -155,6 +184,7 @@ var _ = Describe("Watcher", func() { fetcherError := errors.New("FetcherError") mockBlockChain.SetGetEthLogsWithCustomQueryErr(fetcherError) + watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) err := watcher.Execute() Expect(err).To(MatchError(fetcherError)) })