From d30fcfed807fdeea2eaf36c287ace82b74bf4af9 Mon Sep 17 00:00:00 2001 From: Edvard Date: Thu, 13 Dec 2018 12:39:57 +0100 Subject: [PATCH] Use new config getter on shared.Transformer <3 --- cmd/backfillMakerLogs.go | 10 +-- cmd/continuousLogSync.go | 58 +++-------------- libraries/shared/watcher.go | 32 +++++----- libraries/shared/watcher_test.go | 62 +++++++++---------- .../factories/log_note_transformer.go | 6 +- pkg/transformers/factories/transformer.go | 6 +- pkg/transformers/shared/transformer.go | 2 +- pkg/transformers/transformers.go | 12 ---- 8 files changed, 73 insertions(+), 115 deletions(-) diff --git a/cmd/backfillMakerLogs.go b/cmd/backfillMakerLogs.go index 56986c85..4f19038e 100644 --- a/cmd/backfillMakerLogs.go +++ b/cmd/backfillMakerLogs.go @@ -49,11 +49,13 @@ func backfillMakerLogs() { repository := &shared2.Repository{} fetcher := shared2.NewFetcher(blockChain) - chunker := shared2.NewLogChunker() - watcher := shared.NewWatcher(db, fetcher, repository, chunker) + watcher := shared.NewWatcher(db, fetcher, repository) - watcher.AddTransformers(transformers.TransformerInitializers(), transformers.TransformerConfigs()) - watcher.Execute() + watcher.AddTransformers(transformers.TransformerInitializers()) + err = watcher.Execute() + if err != nil { + // TODO Handle watcher error in backfillMakerLogs + } } func init() { diff --git a/cmd/continuousLogSync.go b/cmd/continuousLogSync.go index 9a70cbe4..9f3e2fec 100644 --- a/cmd/continuousLogSync.go +++ b/cmd/continuousLogSync.go @@ -60,36 +60,32 @@ func syncMakerLogs() { fetcher := shared2.NewFetcher(blockChain) repository := &shared2.Repository{} - chunker := shared2.NewLogChunker() - initializers, configs := getTransformerSubset(transformerNames) - chunker.AddConfigs(configs) + initializers := getTransformerInitializers(transformerNames) - watcher := shared.NewWatcher(db, fetcher, repository, chunker) - watcher.AddTransformers(initializers, configs) + watcher := shared.NewWatcher(db, fetcher, repository) + watcher.AddTransformers(initializers) for range ticker.C { - watcher.Execute() + err = watcher.Execute() + if err != nil { + // TODO Handle watcher errors in ContinuousLogSync + } } } -func getTransformerSubset(transformerNames []string) ([]shared2.TransformerInitializer, []shared2.TransformerConfig) { +func getTransformerInitializers(transformerNames []string) []shared2.TransformerInitializer { var initializers []shared2.TransformerInitializer - var configs []shared2.TransformerConfig if transformerNames[0] == "all" { initializers = transformers.TransformerInitializers() - configs = transformers.TransformerConfigs() } else { initializerMap := buildTransformerInitializerMap() - configMap := buildTransformerConfigMap() - for _, transformerName := range transformerNames { initializers = append(initializers, initializerMap[transformerName]) - configs = append(configs, configMap[transformerName]) } } - return initializers, configs + return initializers } func buildTransformerInitializerMap() map[string]shared2.TransformerInitializer { @@ -128,42 +124,6 @@ func buildTransformerInitializerMap() map[string]shared2.TransformerInitializer return initializerMap } -func buildTransformerConfigMap() map[string]shared2.TransformerConfig { - configMap := make(map[string]shared2.TransformerConfig) - - configMap[constants.BiteLabel] = transformers.BiteTransformer.Config - configMap[constants.BiteLabel] = transformers.BiteTransformer.Config - configMap[constants.CatFileChopLumpLabel] = transformers.CatFileChopLumpTransformer.Config - configMap[constants.CatFileFlipLabel] = transformers.CatFileFlipTransformer.Config - configMap[constants.CatFilePitVowLabel] = transformers.CatFilePitVowTransformer.Config - configMap[constants.DealLabel] = transformers.DealTransformer.Config - configMap[constants.DentLabel] = transformers.DentTransformer.Config - configMap[constants.DripDripLabel] = transformers.DripDripTransformer.Config - configMap[constants.DripFileIlkLabel] = transformers.DripFileIlkTransformer.Config - configMap[constants.DripFileRepoLabel] = transformers.DripFileRepoTransformer.Config - configMap[constants.DripFileVowLabel] = transformers.DripFileVowTransfromer.Config - configMap[constants.FlapKickLabel] = transformers.FlapKickTransformer.Config - configMap[constants.FlipKickLabel] = transformers.FlipKickTransformer.Config - configMap[constants.FlopKickLabel] = transformers.FlopKickTransformer.Config - configMap[constants.FrobLabel] = transformers.FrobTransformer.Config - configMap[constants.PitFileDebtCeilingLabel] = transformers.PitFileDebtCeilingTransformer.Config - configMap[constants.PitFileIlkLabel] = transformers.PitFileIlkTransformer.Config - configMap[constants.PriceFeedLabel] = transformers.PriceFeedTransformer.Config - configMap[constants.TendLabel] = transformers.TendTransformer.Config - configMap[constants.VatFluxLabel] = transformers.VatFluxTransformer.Config - configMap[constants.VatFoldLabel] = transformers.VatFoldTransformer.Config - configMap[constants.VatGrabLabel] = transformers.VatGrabTransformer.Config - configMap[constants.VatHealLabel] = transformers.VatHealTransformer.Config - configMap[constants.VatInitLabel] = transformers.VatInitTransformer.Config - configMap[constants.VatMoveLabel] = transformers.VatMoveTransformer.Config - configMap[constants.VatSlipLabel] = transformers.VatSlipTransformer.Config - configMap[constants.VatTollLabel] = transformers.VatTollTransformer.Config - configMap[constants.VatTuneLabel] = transformers.VatTuneTransformer.Config - configMap[constants.VowFlogLabel] = transformers.FlogTransformer.Config - - return configMap -} - func init() { rootCmd.AddCommand(continuousLogSyncCmd) continuousLogSyncCmd.Flags().StringSliceVar(&transformerNames, "transformers", []string{"all"}, "transformer names to be run during this command") diff --git a/libraries/shared/watcher.go b/libraries/shared/watcher.go index e5c96851..7b68752c 100644 --- a/libraries/shared/watcher.go +++ b/libraries/shared/watcher.go @@ -24,7 +24,8 @@ type Watcher struct { Repository WatcherRepository } -func NewWatcher(db *postgres.DB, fetcher shared.LogFetcher, repository WatcherRepository, chunker shared.Chunker) Watcher { +func NewWatcher(db *postgres.DB, fetcher shared.LogFetcher, repository WatcherRepository) Watcher { + chunker := shared.NewLogChunker() return Watcher{ DB: db, Fetcher: fetcher, @@ -33,25 +34,21 @@ func NewWatcher(db *postgres.DB, fetcher shared.LogFetcher, repository WatcherRe } } -// Adds transformers to the watcher, each needs an initializer and the associated config. -// This also changes the configuration of the chunker, so that it will consider the new transformers. -func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitializer, configs []shared.TransformerConfig) { - if len(initializers) != len(configs) { - panic("Mismatch in number of transformers initializers and configs!") - } +// Adds transformers to the watcher and updates the chunker, so that it will consider the new transformers. +func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitializer) { + var contractAddresses []common.Address + var topic0s []common.Hash + var configs []shared.TransformerConfig for _, initializer := range initializers { transformer := initializer(watcher.DB) watcher.Transformers = append(watcher.Transformers, transformer) - } - var contractAddresses []common.Address - var topic0s []common.Hash + config := transformer.GetConfig() + configs = append(configs, config) - for _, config := range configs { - for _, address := range config.ContractAddresses { - contractAddresses = append(contractAddresses, common.HexToAddress(address)) - } + addresses := shared.HexStringsToAddresses(config.ContractAddresses) + contractAddresses = append(contractAddresses, addresses...) topic0s = append(topic0s, common.HexToHash(config.Topic)) } @@ -85,11 +82,14 @@ func (watcher *Watcher) Execute() error { chunkedLogs := watcher.Chunker.ChunkLogs(logs) + // Can't quit early and mark as checked if there are no logs. If we are running continuousLogSync, + // not all logs we're interested in might have been fetched. for _, transformer := range watcher.Transformers { - logChunk := chunkedLogs[transformer.Name()] + transformerName := transformer.GetConfig().TransformerName + logChunk := chunkedLogs[transformerName] err = transformer.Execute(logChunk, header) if err != nil { - log.Error("%v transformer failed to execute in watcher: %v", transformer.Name(), err) + log.Error("%v transformer failed to execute in watcher: %v", transformerName, err) return err } } diff --git a/libraries/shared/watcher_test.go b/libraries/shared/watcher_test.go index 7b91ece9..5fc3c0c5 100644 --- a/libraries/shared/watcher_test.go +++ b/libraries/shared/watcher_test.go @@ -21,7 +21,7 @@ type MockTransformer struct { executeError error passedLogs []types.Log passedHeader core.Header - transformerName string + config shared2.TransformerConfig } func (mh *MockTransformer) Execute(logs []types.Log, header core.Header) error { @@ -34,57 +34,60 @@ func (mh *MockTransformer) Execute(logs []types.Log, header core.Header) error { return nil } -func (mh *MockTransformer) Name() string { - return mh.transformerName +func (mh *MockTransformer) GetConfig() shared2.TransformerConfig { + return mh.config } -func (mh *MockTransformer) SetTransformerName(name string) { - mh.transformerName = name +func (mh *MockTransformer) SetTransformerConfig(config shared2.TransformerConfig) { + mh.config = config } -func fakeTransformerInitializer(db *postgres.DB) shared2.Transformer { - return &MockTransformer{} +func (mh *MockTransformer) FakeTransformerInitializer(db *postgres.DB) shared2.Transformer { + return mh } -var fakeTransformerConfig = []shared2.TransformerConfig{{ +var fakeTransformerConfig = shared2.TransformerConfig{ TransformerName: "FakeTransformer", ContractAddresses: []string{"FakeAddress"}, Topic: "FakeTopic", -}} +} var _ = Describe("Watcher", func() { It("initialises correctly", func() { db := test_config.NewTestDB(core.Node{ID: "testNode"}) fetcher := &mocks.MockLogFetcher{} repository := &mocks.MockWatcherRepository{} - chunker := shared2.NewLogChunker() - watcher := shared.NewWatcher(db, fetcher, repository, chunker) + watcher := shared.NewWatcher(db, fetcher, repository) Expect(watcher.DB).To(Equal(db)) Expect(watcher.Fetcher).To(Equal(fetcher)) - Expect(watcher.Chunker).To(Equal(chunker)) + Expect(watcher.Chunker).NotTo(BeNil()) Expect(watcher.Repository).To(Equal(repository)) }) It("adds transformers", func() { - chunker := shared2.NewLogChunker() - watcher := shared.NewWatcher(nil, nil, nil, chunker) - - watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformerInitializer}, fakeTransformerConfig) + watcher := shared.NewWatcher(nil, nil, nil) + fakeTransformer := &MockTransformer{} + fakeTransformer.SetTransformerConfig(fakeTransformerConfig) + watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) Expect(len(watcher.Transformers)).To(Equal(1)) - Expect(watcher.Transformers).To(ConsistOf(&MockTransformer{})) + Expect(watcher.Transformers).To(ConsistOf(fakeTransformer)) Expect(watcher.Topics).To(Equal([]common.Hash{common.HexToHash("FakeTopic")})) Expect(watcher.Addresses).To(Equal([]common.Address{common.HexToAddress("FakeAddress")})) }) It("adds transformers from multiple sources", func() { - chunker := shared2.NewLogChunker() - watcher := shared.NewWatcher(nil, nil, nil, chunker) + watcher := shared.NewWatcher(nil, nil, nil) + fakeTransformer1 := &MockTransformer{} + fakeTransformer1.SetTransformerConfig(fakeTransformerConfig) - watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformerInitializer}, fakeTransformerConfig) - watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformerInitializer}, fakeTransformerConfig) + fakeTransformer2 := &MockTransformer{} + fakeTransformer2.SetTransformerConfig(fakeTransformerConfig) + + watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer1.FakeTransformerInitializer}) + watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer2.FakeTransformerInitializer}) Expect(len(watcher.Transformers)).To(Equal(2)) Expect(watcher.Topics).To(Equal([]common.Hash{common.HexToHash("FakeTopic"), @@ -101,7 +104,6 @@ var _ = Describe("Watcher", func() { headerRepository repositories.HeaderRepository mockFetcher mocks.MockLogFetcher repository mocks.MockWatcherRepository - chunker *shared2.LogChunker ) BeforeEach(func() { @@ -111,10 +113,9 @@ var _ = Describe("Watcher", func() { headerRepository = repositories.NewHeaderRepository(db) _, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) Expect(err).NotTo(HaveOccurred()) - chunker = shared2.NewLogChunker() repository = mocks.MockWatcherRepository{} - watcher = shared.NewWatcher(db, &mockFetcher, &repository, chunker) + watcher = shared.NewWatcher(db, &mockFetcher, &repository) }) It("executes each transformer", func() { @@ -141,9 +142,7 @@ var _ = Describe("Watcher", func() { It("passes only relevant logs to each transformer", func() { transformerA := &MockTransformer{} - transformerA.SetTransformerName("transformerA") transformerB := &MockTransformer{} - transformerB.SetTransformerName("transformerB") configA := shared2.TransformerConfig{TransformerName: "transformerA", ContractAddresses: []string{"0x000000000000000000000000000000000000000A"}, @@ -151,7 +150,9 @@ var _ = Describe("Watcher", func() { configB := shared2.TransformerConfig{TransformerName: "transformerB", ContractAddresses: []string{"0x000000000000000000000000000000000000000b"}, Topic: "0xB"} - configs := []shared2.TransformerConfig{configA, configB} + + transformerA.SetTransformerConfig(configA) + transformerB.SetTransformerConfig(configB) logA := types.Log{Address: common.HexToAddress("0xA"), Topics: []common.Hash{common.HexToHash("0xA")}} @@ -159,11 +160,10 @@ var _ = Describe("Watcher", func() { Topics: []common.Hash{common.HexToHash("0xB")}} mockFetcher.SetFetchedLogs([]types.Log{logA, logB}) - chunker.AddConfigs(configs) - repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) - watcher = shared.NewWatcher(db, &mockFetcher, &repository, chunker) - watcher.Transformers = []shared2.Transformer{transformerA, transformerB} + watcher = shared.NewWatcher(db, &mockFetcher, &repository) + watcher.AddTransformers([]shared2.TransformerInitializer{ + transformerA.FakeTransformerInitializer, transformerB.FakeTransformerInitializer}) err := watcher.Execute() Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/transformers/factories/log_note_transformer.go b/pkg/transformers/factories/log_note_transformer.go index 200f0bce..6ae63f4b 100644 --- a/pkg/transformers/factories/log_note_transformer.go +++ b/pkg/transformers/factories/log_note_transformer.go @@ -61,6 +61,10 @@ func (transformer LogNoteTransformer) Execute(logs []types.Log, header core.Head return nil } -func (transformer LogNoteTransformer) Name() string { +func (transformer LogNoteTransformer) GetName() string { return transformer.Config.TransformerName } + +func (transformer LogNoteTransformer) GetConfig() shared.TransformerConfig { + return transformer.Config +} diff --git a/pkg/transformers/factories/transformer.go b/pkg/transformers/factories/transformer.go index d6a4c385..5b164814 100644 --- a/pkg/transformers/factories/transformer.go +++ b/pkg/transformers/factories/transformer.go @@ -68,6 +68,10 @@ func (transformer Transformer) Execute(logs []types.Log, header core.Header) err return nil } -func (transformer Transformer) Name() string { +func (transformer Transformer) GetName() string { return transformer.Config.TransformerName } + +func (transformer Transformer) GetConfig() shared.TransformerConfig { + return transformer.Config +} diff --git a/pkg/transformers/shared/transformer.go b/pkg/transformers/shared/transformer.go index 2673a6f3..0cc510d4 100644 --- a/pkg/transformers/shared/transformer.go +++ b/pkg/transformers/shared/transformer.go @@ -24,7 +24,7 @@ import ( type Transformer interface { Execute(logs []types.Log, header core.Header) error - Name() string + GetConfig() TransformerConfig } type TransformerInitializer func(db *postgres.DB) Transformer diff --git a/pkg/transformers/transformers.go b/pkg/transformers/transformers.go index cb95908b..6b0b7938 100644 --- a/pkg/transformers/transformers.go +++ b/pkg/transformers/transformers.go @@ -267,15 +267,3 @@ func TransformerInitializers() (initializers []shared.TransformerInitializer) { } return } - -// `TransformerConfigs` returns the config structs for all available transformers -func TransformerConfigs() (allConfigs []shared.TransformerConfig) { - for _, transformer := range logNoteTransformers { - allConfigs = append(allConfigs, transformer.Config) - } - - for _, transformer := range logNoteTransformers { - allConfigs = append(allConfigs, transformer.Config) - } - return -}