VDB-284 Run watcher from earliest startingBlockNumber (#136)

* Watcher computes earliest starting block number

* Update tests for new watcher behaviour

* Fix error oopsie

* Extract conditional to helper and suppres tests logging
This commit is contained in:
Edvard Hübinette 2019-01-15 11:09:13 +01:00 committed by GitHub
parent d41209d293
commit 909d23e176
3 changed files with 70 additions and 19 deletions

View File

@ -1,6 +1,8 @@
package shared_test package shared_test
import ( import (
"io/ioutil"
"log"
"testing" "testing"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -11,3 +13,7 @@ func TestShared(t *testing.T) {
RegisterFailHandler(Fail) RegisterFailHandler(Fail)
RunSpecs(t, "Shared Suite") RunSpecs(t, "Shared Suite")
} }
var _ = BeforeSuite(func() {
log.SetOutput(ioutil.Discard)
})

View File

@ -1,8 +1,9 @@
package shared package shared
import ( import (
"fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
@ -15,6 +16,7 @@ type Watcher struct {
Chunker shared.Chunker Chunker shared.Chunker
Addresses []common.Address Addresses []common.Address
Topics []common.Hash Topics []common.Hash
StartingBlock *int64
} }
func NewWatcher(db *postgres.DB, bc core.BlockChain) Watcher { func NewWatcher(db *postgres.DB, bc core.BlockChain) Watcher {
@ -40,6 +42,12 @@ func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitial
config := transformer.GetConfig() config := transformer.GetConfig()
configs = append(configs, config) configs = append(configs, config)
if watcher.StartingBlock == nil {
watcher.StartingBlock = &config.StartingBlockNumber
} else if earlierStartingBlockNumber(config.StartingBlockNumber, *watcher.StartingBlock) {
watcher.StartingBlock = &config.StartingBlockNumber
}
addresses := shared.HexStringsToAddresses(config.ContractAddresses) addresses := shared.HexStringsToAddresses(config.ContractAddresses)
contractAddresses = append(contractAddresses, addresses...) contractAddresses = append(contractAddresses, addresses...)
topic0s = append(topic0s, common.HexToHash(config.Topic)) topic0s = append(topic0s, common.HexToHash(config.Topic))
@ -51,14 +59,17 @@ func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitial
} }
func (watcher *Watcher) Execute() error { func (watcher *Watcher) Execute() error {
if watcher.Transformers == nil {
return fmt.Errorf("No transformers added to watcher")
}
checkedColumnNames, err := shared.GetCheckedColumnNames(watcher.DB) checkedColumnNames, err := shared.GetCheckedColumnNames(watcher.DB)
if err != nil { if err != nil {
return err return err
} }
notCheckedSQL := shared.CreateNotCheckedSQL(checkedColumnNames) notCheckedSQL := shared.CreateNotCheckedSQL(checkedColumnNames)
// TODO Handle start and end numbers in transformers missingHeaders, err := shared.MissingHeaders(*watcher.StartingBlock, -1, watcher.DB, notCheckedSQL)
missingHeaders, err := shared.MissingHeaders(0, -1, watcher.DB, notCheckedSQL)
if err != nil { if err != nil {
log.Error("Fetching of missing headers failed in watcher!") log.Error("Fetching of missing headers failed in watcher!")
return err return err
@ -69,7 +80,7 @@ func (watcher *Watcher) Execute() error {
logs, err := watcher.Fetcher.FetchLogs(watcher.Addresses, watcher.Topics, header) logs, err := watcher.Fetcher.FetchLogs(watcher.Addresses, watcher.Topics, header)
if err != nil { if err != nil {
// TODO Handle fetch error in watcher // TODO Handle fetch error in watcher
log.Error("Error while fetching logs for header %v in watcher", header.Id) log.Errorf("Error while fetching logs for header %v in watcher", header.Id)
return err return err
} }
@ -82,10 +93,14 @@ func (watcher *Watcher) Execute() error {
logChunk := chunkedLogs[transformerName] logChunk := chunkedLogs[transformerName]
err = transformer.Execute(logChunk, header) err = transformer.Execute(logChunk, header)
if err != nil { if err != nil {
log.Error("%v transformer failed to execute in watcher: %v", transformerName, err) log.Errorf("%v transformer failed to execute in watcher: %v", transformerName, err)
return err return err
} }
} }
} }
return err return err
} }
func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool {
return transformerBlock < watcherBlock
}

View File

@ -59,12 +59,33 @@ var _ = Describe("Watcher", func() {
common.HexToAddress("FakeAddress")})) common.HexToAddress("FakeAddress")}))
}) })
It("calculates earliest starting block number", func() {
fakeTransformer1 := &mocks.MockTransformer{}
fakeTransformer1.SetTransformerConfig(shared2.TransformerConfig{StartingBlockNumber: 5})
fakeTransformer2 := &mocks.MockTransformer{}
fakeTransformer2.SetTransformerConfig(shared2.TransformerConfig{StartingBlockNumber: 3})
watcher := shared.NewWatcher(nil, nil)
watcher.AddTransformers([]shared2.TransformerInitializer{
fakeTransformer1.FakeTransformerInitializer,
fakeTransformer2.FakeTransformerInitializer,
})
Expect(*watcher.StartingBlock).To(Equal(int64(3)))
})
It("returns an error when run without transformers", func() {
watcher := shared.NewWatcher(nil, nil)
err := watcher.Execute()
Expect(err).To(MatchError("No transformers added to watcher"))
})
Describe("with missing headers", func() { Describe("with missing headers", func() {
var ( var (
db *postgres.DB db *postgres.DB
watcher shared.Watcher watcher shared.Watcher
mockBlockChain fakes.MockBlockChain mockBlockChain fakes.MockBlockChain
fakeTransformer *mocks.MockTransformer
headerRepository repositories.HeaderRepository headerRepository repositories.HeaderRepository
repository mocks.MockWatcherRepository repository mocks.MockWatcherRepository
) )
@ -82,8 +103,8 @@ var _ = Describe("Watcher", func() {
}) })
It("executes each transformer", func() { It("executes each transformer", func() {
fakeTransformer = &mocks.MockTransformer{} fakeTransformer := &mocks.MockTransformer{}
watcher.Transformers = []shared2.Transformer{fakeTransformer} watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) repository.SetMissingHeaders([]core.Header{fakes.FakeHeader})
err := watcher.Execute() err := watcher.Execute()
@ -93,8 +114,8 @@ var _ = Describe("Watcher", func() {
}) })
It("returns an error if transformer returns an error", func() { It("returns an error if transformer returns an error", func() {
fakeTransformer = &mocks.MockTransformer{ExecuteError: errors.New("Something bad happened")} fakeTransformer := &mocks.MockTransformer{ExecuteError: errors.New("Something bad happened")}
watcher.Transformers = []shared2.Transformer{fakeTransformer} watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) repository.SetMissingHeaders([]core.Header{fakes.FakeHeader})
err := watcher.Execute() err := watcher.Execute()
@ -135,19 +156,27 @@ var _ = Describe("Watcher", func() {
}) })
Describe("uses the LogFetcher correctly:", func() { Describe("uses the LogFetcher correctly:", func() {
var fakeTransformer mocks.MockTransformer
BeforeEach(func() { BeforeEach(func() {
repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) repository.SetMissingHeaders([]core.Header{fakes.FakeHeader})
fakeTransformer = mocks.MockTransformer{}
}) })
It("fetches logs", func() { It("fetches logs for added transformers", func() {
addresses := []string{"0xA", "0xB"}
topic := "0x1"
fakeTransformer.SetTransformerConfig(shared2.TransformerConfig{
Topic: topic, ContractAddresses: addresses})
watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
err := watcher.Execute() err := watcher.Execute()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
fakeHash := common.HexToHash(fakes.FakeHeader.Hash) fakeHash := common.HexToHash(fakes.FakeHeader.Hash)
mockBlockChain.AssertGetEthLogsWithCustomQueryCalledWith(ethereum.FilterQuery{ mockBlockChain.AssertGetEthLogsWithCustomQueryCalledWith(ethereum.FilterQuery{
BlockHash: &fakeHash, BlockHash: &fakeHash,
Addresses: nil, Addresses: shared2.HexStringsToAddresses(addresses),
Topics: [][]common.Hash{nil}, Topics: [][]common.Hash{{common.HexToHash(topic)}},
}) })
}) })
@ -155,6 +184,7 @@ var _ = Describe("Watcher", func() {
fetcherError := errors.New("FetcherError") fetcherError := errors.New("FetcherError")
mockBlockChain.SetGetEthLogsWithCustomQueryErr(fetcherError) mockBlockChain.SetGetEthLogsWithCustomQueryErr(fetcherError)
watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
err := watcher.Execute() err := watcher.Execute()
Expect(err).To(MatchError(fetcherError)) Expect(err).To(MatchError(fetcherError))
}) })