Use new config getter on shared.Transformer <3

This commit is contained in:
Edvard 2018-12-13 12:39:57 +01:00
parent 882a6dd7a2
commit d30fcfed80
8 changed files with 73 additions and 115 deletions

View File

@ -49,11 +49,13 @@ func backfillMakerLogs() {
repository := &shared2.Repository{} repository := &shared2.Repository{}
fetcher := shared2.NewFetcher(blockChain) fetcher := shared2.NewFetcher(blockChain)
chunker := shared2.NewLogChunker() watcher := shared.NewWatcher(db, fetcher, repository)
watcher := shared.NewWatcher(db, fetcher, repository, chunker)
watcher.AddTransformers(transformers.TransformerInitializers(), transformers.TransformerConfigs()) watcher.AddTransformers(transformers.TransformerInitializers())
watcher.Execute() err = watcher.Execute()
if err != nil {
// TODO Handle watcher error in backfillMakerLogs
}
} }
func init() { func init() {

View File

@ -60,36 +60,32 @@ func syncMakerLogs() {
fetcher := shared2.NewFetcher(blockChain) fetcher := shared2.NewFetcher(blockChain)
repository := &shared2.Repository{} repository := &shared2.Repository{}
chunker := shared2.NewLogChunker()
initializers, configs := getTransformerSubset(transformerNames) initializers := getTransformerInitializers(transformerNames)
chunker.AddConfigs(configs)
watcher := shared.NewWatcher(db, fetcher, repository, chunker) watcher := shared.NewWatcher(db, fetcher, repository)
watcher.AddTransformers(initializers, configs) watcher.AddTransformers(initializers)
for range ticker.C { for range ticker.C {
watcher.Execute() err = watcher.Execute()
if err != nil {
// TODO Handle watcher errors in ContinuousLogSync
}
} }
} }
func getTransformerSubset(transformerNames []string) ([]shared2.TransformerInitializer, []shared2.TransformerConfig) { func getTransformerInitializers(transformerNames []string) []shared2.TransformerInitializer {
var initializers []shared2.TransformerInitializer var initializers []shared2.TransformerInitializer
var configs []shared2.TransformerConfig
if transformerNames[0] == "all" { if transformerNames[0] == "all" {
initializers = transformers.TransformerInitializers() initializers = transformers.TransformerInitializers()
configs = transformers.TransformerConfigs()
} else { } else {
initializerMap := buildTransformerInitializerMap() initializerMap := buildTransformerInitializerMap()
configMap := buildTransformerConfigMap()
for _, transformerName := range transformerNames { for _, transformerName := range transformerNames {
initializers = append(initializers, initializerMap[transformerName]) initializers = append(initializers, initializerMap[transformerName])
configs = append(configs, configMap[transformerName])
} }
} }
return initializers, configs return initializers
} }
func buildTransformerInitializerMap() map[string]shared2.TransformerInitializer { func buildTransformerInitializerMap() map[string]shared2.TransformerInitializer {
@ -128,42 +124,6 @@ func buildTransformerInitializerMap() map[string]shared2.TransformerInitializer
return initializerMap return initializerMap
} }
func buildTransformerConfigMap() map[string]shared2.TransformerConfig {
configMap := make(map[string]shared2.TransformerConfig)
configMap[constants.BiteLabel] = transformers.BiteTransformer.Config
configMap[constants.BiteLabel] = transformers.BiteTransformer.Config
configMap[constants.CatFileChopLumpLabel] = transformers.CatFileChopLumpTransformer.Config
configMap[constants.CatFileFlipLabel] = transformers.CatFileFlipTransformer.Config
configMap[constants.CatFilePitVowLabel] = transformers.CatFilePitVowTransformer.Config
configMap[constants.DealLabel] = transformers.DealTransformer.Config
configMap[constants.DentLabel] = transformers.DentTransformer.Config
configMap[constants.DripDripLabel] = transformers.DripDripTransformer.Config
configMap[constants.DripFileIlkLabel] = transformers.DripFileIlkTransformer.Config
configMap[constants.DripFileRepoLabel] = transformers.DripFileRepoTransformer.Config
configMap[constants.DripFileVowLabel] = transformers.DripFileVowTransfromer.Config
configMap[constants.FlapKickLabel] = transformers.FlapKickTransformer.Config
configMap[constants.FlipKickLabel] = transformers.FlipKickTransformer.Config
configMap[constants.FlopKickLabel] = transformers.FlopKickTransformer.Config
configMap[constants.FrobLabel] = transformers.FrobTransformer.Config
configMap[constants.PitFileDebtCeilingLabel] = transformers.PitFileDebtCeilingTransformer.Config
configMap[constants.PitFileIlkLabel] = transformers.PitFileIlkTransformer.Config
configMap[constants.PriceFeedLabel] = transformers.PriceFeedTransformer.Config
configMap[constants.TendLabel] = transformers.TendTransformer.Config
configMap[constants.VatFluxLabel] = transformers.VatFluxTransformer.Config
configMap[constants.VatFoldLabel] = transformers.VatFoldTransformer.Config
configMap[constants.VatGrabLabel] = transformers.VatGrabTransformer.Config
configMap[constants.VatHealLabel] = transformers.VatHealTransformer.Config
configMap[constants.VatInitLabel] = transformers.VatInitTransformer.Config
configMap[constants.VatMoveLabel] = transformers.VatMoveTransformer.Config
configMap[constants.VatSlipLabel] = transformers.VatSlipTransformer.Config
configMap[constants.VatTollLabel] = transformers.VatTollTransformer.Config
configMap[constants.VatTuneLabel] = transformers.VatTuneTransformer.Config
configMap[constants.VowFlogLabel] = transformers.FlogTransformer.Config
return configMap
}
func init() { func init() {
rootCmd.AddCommand(continuousLogSyncCmd) rootCmd.AddCommand(continuousLogSyncCmd)
continuousLogSyncCmd.Flags().StringSliceVar(&transformerNames, "transformers", []string{"all"}, "transformer names to be run during this command") continuousLogSyncCmd.Flags().StringSliceVar(&transformerNames, "transformers", []string{"all"}, "transformer names to be run during this command")

View File

@ -24,7 +24,8 @@ type Watcher struct {
Repository WatcherRepository Repository WatcherRepository
} }
func NewWatcher(db *postgres.DB, fetcher shared.LogFetcher, repository WatcherRepository, chunker shared.Chunker) Watcher { func NewWatcher(db *postgres.DB, fetcher shared.LogFetcher, repository WatcherRepository) Watcher {
chunker := shared.NewLogChunker()
return Watcher{ return Watcher{
DB: db, DB: db,
Fetcher: fetcher, Fetcher: fetcher,
@ -33,25 +34,21 @@ func NewWatcher(db *postgres.DB, fetcher shared.LogFetcher, repository WatcherRe
} }
} }
// Adds transformers to the watcher, each needs an initializer and the associated config. // Adds transformers to the watcher and updates the chunker, so that it will consider the new transformers.
// This also changes the configuration of the chunker, so that it will consider the new transformers. func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitializer) {
func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitializer, configs []shared.TransformerConfig) { var contractAddresses []common.Address
if len(initializers) != len(configs) { var topic0s []common.Hash
panic("Mismatch in number of transformers initializers and configs!") var configs []shared.TransformerConfig
}
for _, initializer := range initializers { for _, initializer := range initializers {
transformer := initializer(watcher.DB) transformer := initializer(watcher.DB)
watcher.Transformers = append(watcher.Transformers, transformer) watcher.Transformers = append(watcher.Transformers, transformer)
}
var contractAddresses []common.Address config := transformer.GetConfig()
var topic0s []common.Hash configs = append(configs, config)
for _, config := range configs { addresses := shared.HexStringsToAddresses(config.ContractAddresses)
for _, address := range config.ContractAddresses { contractAddresses = append(contractAddresses, addresses...)
contractAddresses = append(contractAddresses, common.HexToAddress(address))
}
topic0s = append(topic0s, common.HexToHash(config.Topic)) topic0s = append(topic0s, common.HexToHash(config.Topic))
} }
@ -85,11 +82,14 @@ func (watcher *Watcher) Execute() error {
chunkedLogs := watcher.Chunker.ChunkLogs(logs) chunkedLogs := watcher.Chunker.ChunkLogs(logs)
// Can't quit early and mark as checked if there are no logs. If we are running continuousLogSync,
// not all logs we're interested in might have been fetched.
for _, transformer := range watcher.Transformers { for _, transformer := range watcher.Transformers {
logChunk := chunkedLogs[transformer.Name()] transformerName := transformer.GetConfig().TransformerName
logChunk := chunkedLogs[transformerName]
err = transformer.Execute(logChunk, header) err = transformer.Execute(logChunk, header)
if err != nil { if err != nil {
log.Error("%v transformer failed to execute in watcher: %v", transformer.Name(), err) log.Error("%v transformer failed to execute in watcher: %v", transformerName, err)
return err return err
} }
} }

View File

@ -21,7 +21,7 @@ type MockTransformer struct {
executeError error executeError error
passedLogs []types.Log passedLogs []types.Log
passedHeader core.Header passedHeader core.Header
transformerName string config shared2.TransformerConfig
} }
func (mh *MockTransformer) Execute(logs []types.Log, header core.Header) error { func (mh *MockTransformer) Execute(logs []types.Log, header core.Header) error {
@ -34,57 +34,60 @@ func (mh *MockTransformer) Execute(logs []types.Log, header core.Header) error {
return nil return nil
} }
func (mh *MockTransformer) Name() string { func (mh *MockTransformer) GetConfig() shared2.TransformerConfig {
return mh.transformerName return mh.config
} }
func (mh *MockTransformer) SetTransformerName(name string) { func (mh *MockTransformer) SetTransformerConfig(config shared2.TransformerConfig) {
mh.transformerName = name mh.config = config
} }
func fakeTransformerInitializer(db *postgres.DB) shared2.Transformer { func (mh *MockTransformer) FakeTransformerInitializer(db *postgres.DB) shared2.Transformer {
return &MockTransformer{} return mh
} }
var fakeTransformerConfig = []shared2.TransformerConfig{{ var fakeTransformerConfig = shared2.TransformerConfig{
TransformerName: "FakeTransformer", TransformerName: "FakeTransformer",
ContractAddresses: []string{"FakeAddress"}, ContractAddresses: []string{"FakeAddress"},
Topic: "FakeTopic", Topic: "FakeTopic",
}} }
var _ = Describe("Watcher", func() { var _ = Describe("Watcher", func() {
It("initialises correctly", func() { It("initialises correctly", func() {
db := test_config.NewTestDB(core.Node{ID: "testNode"}) db := test_config.NewTestDB(core.Node{ID: "testNode"})
fetcher := &mocks.MockLogFetcher{} fetcher := &mocks.MockLogFetcher{}
repository := &mocks.MockWatcherRepository{} repository := &mocks.MockWatcherRepository{}
chunker := shared2.NewLogChunker()
watcher := shared.NewWatcher(db, fetcher, repository, chunker) watcher := shared.NewWatcher(db, fetcher, repository)
Expect(watcher.DB).To(Equal(db)) Expect(watcher.DB).To(Equal(db))
Expect(watcher.Fetcher).To(Equal(fetcher)) Expect(watcher.Fetcher).To(Equal(fetcher))
Expect(watcher.Chunker).To(Equal(chunker)) Expect(watcher.Chunker).NotTo(BeNil())
Expect(watcher.Repository).To(Equal(repository)) Expect(watcher.Repository).To(Equal(repository))
}) })
It("adds transformers", func() { It("adds transformers", func() {
chunker := shared2.NewLogChunker() watcher := shared.NewWatcher(nil, nil, nil)
watcher := shared.NewWatcher(nil, nil, nil, chunker) fakeTransformer := &MockTransformer{}
fakeTransformer.SetTransformerConfig(fakeTransformerConfig)
watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformerInitializer}, fakeTransformerConfig) watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
Expect(len(watcher.Transformers)).To(Equal(1)) Expect(len(watcher.Transformers)).To(Equal(1))
Expect(watcher.Transformers).To(ConsistOf(&MockTransformer{})) Expect(watcher.Transformers).To(ConsistOf(fakeTransformer))
Expect(watcher.Topics).To(Equal([]common.Hash{common.HexToHash("FakeTopic")})) Expect(watcher.Topics).To(Equal([]common.Hash{common.HexToHash("FakeTopic")}))
Expect(watcher.Addresses).To(Equal([]common.Address{common.HexToAddress("FakeAddress")})) Expect(watcher.Addresses).To(Equal([]common.Address{common.HexToAddress("FakeAddress")}))
}) })
It("adds transformers from multiple sources", func() { It("adds transformers from multiple sources", func() {
chunker := shared2.NewLogChunker() watcher := shared.NewWatcher(nil, nil, nil)
watcher := shared.NewWatcher(nil, nil, nil, chunker) fakeTransformer1 := &MockTransformer{}
fakeTransformer1.SetTransformerConfig(fakeTransformerConfig)
watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformerInitializer}, fakeTransformerConfig) fakeTransformer2 := &MockTransformer{}
watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformerInitializer}, fakeTransformerConfig) fakeTransformer2.SetTransformerConfig(fakeTransformerConfig)
watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer1.FakeTransformerInitializer})
watcher.AddTransformers([]shared2.TransformerInitializer{fakeTransformer2.FakeTransformerInitializer})
Expect(len(watcher.Transformers)).To(Equal(2)) Expect(len(watcher.Transformers)).To(Equal(2))
Expect(watcher.Topics).To(Equal([]common.Hash{common.HexToHash("FakeTopic"), Expect(watcher.Topics).To(Equal([]common.Hash{common.HexToHash("FakeTopic"),
@ -101,7 +104,6 @@ var _ = Describe("Watcher", func() {
headerRepository repositories.HeaderRepository headerRepository repositories.HeaderRepository
mockFetcher mocks.MockLogFetcher mockFetcher mocks.MockLogFetcher
repository mocks.MockWatcherRepository repository mocks.MockWatcherRepository
chunker *shared2.LogChunker
) )
BeforeEach(func() { BeforeEach(func() {
@ -111,10 +113,9 @@ var _ = Describe("Watcher", func() {
headerRepository = repositories.NewHeaderRepository(db) headerRepository = repositories.NewHeaderRepository(db)
_, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) _, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
chunker = shared2.NewLogChunker()
repository = mocks.MockWatcherRepository{} repository = mocks.MockWatcherRepository{}
watcher = shared.NewWatcher(db, &mockFetcher, &repository, chunker) watcher = shared.NewWatcher(db, &mockFetcher, &repository)
}) })
It("executes each transformer", func() { It("executes each transformer", func() {
@ -141,9 +142,7 @@ var _ = Describe("Watcher", func() {
It("passes only relevant logs to each transformer", func() { It("passes only relevant logs to each transformer", func() {
transformerA := &MockTransformer{} transformerA := &MockTransformer{}
transformerA.SetTransformerName("transformerA")
transformerB := &MockTransformer{} transformerB := &MockTransformer{}
transformerB.SetTransformerName("transformerB")
configA := shared2.TransformerConfig{TransformerName: "transformerA", configA := shared2.TransformerConfig{TransformerName: "transformerA",
ContractAddresses: []string{"0x000000000000000000000000000000000000000A"}, ContractAddresses: []string{"0x000000000000000000000000000000000000000A"},
@ -151,7 +150,9 @@ var _ = Describe("Watcher", func() {
configB := shared2.TransformerConfig{TransformerName: "transformerB", configB := shared2.TransformerConfig{TransformerName: "transformerB",
ContractAddresses: []string{"0x000000000000000000000000000000000000000b"}, ContractAddresses: []string{"0x000000000000000000000000000000000000000b"},
Topic: "0xB"} Topic: "0xB"}
configs := []shared2.TransformerConfig{configA, configB}
transformerA.SetTransformerConfig(configA)
transformerB.SetTransformerConfig(configB)
logA := types.Log{Address: common.HexToAddress("0xA"), logA := types.Log{Address: common.HexToAddress("0xA"),
Topics: []common.Hash{common.HexToHash("0xA")}} Topics: []common.Hash{common.HexToHash("0xA")}}
@ -159,11 +160,10 @@ var _ = Describe("Watcher", func() {
Topics: []common.Hash{common.HexToHash("0xB")}} Topics: []common.Hash{common.HexToHash("0xB")}}
mockFetcher.SetFetchedLogs([]types.Log{logA, logB}) mockFetcher.SetFetchedLogs([]types.Log{logA, logB})
chunker.AddConfigs(configs)
repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) repository.SetMissingHeaders([]core.Header{fakes.FakeHeader})
watcher = shared.NewWatcher(db, &mockFetcher, &repository, chunker) watcher = shared.NewWatcher(db, &mockFetcher, &repository)
watcher.Transformers = []shared2.Transformer{transformerA, transformerB} watcher.AddTransformers([]shared2.TransformerInitializer{
transformerA.FakeTransformerInitializer, transformerB.FakeTransformerInitializer})
err := watcher.Execute() err := watcher.Execute()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())

View File

@ -61,6 +61,10 @@ func (transformer LogNoteTransformer) Execute(logs []types.Log, header core.Head
return nil return nil
} }
func (transformer LogNoteTransformer) Name() string { func (transformer LogNoteTransformer) GetName() string {
return transformer.Config.TransformerName return transformer.Config.TransformerName
} }
func (transformer LogNoteTransformer) GetConfig() shared.TransformerConfig {
return transformer.Config
}

View File

@ -68,6 +68,10 @@ func (transformer Transformer) Execute(logs []types.Log, header core.Header) err
return nil return nil
} }
func (transformer Transformer) Name() string { func (transformer Transformer) GetName() string {
return transformer.Config.TransformerName return transformer.Config.TransformerName
} }
func (transformer Transformer) GetConfig() shared.TransformerConfig {
return transformer.Config
}

View File

@ -24,7 +24,7 @@ import (
type Transformer interface { type Transformer interface {
Execute(logs []types.Log, header core.Header) error Execute(logs []types.Log, header core.Header) error
Name() string GetConfig() TransformerConfig
} }
type TransformerInitializer func(db *postgres.DB) Transformer type TransformerInitializer func(db *postgres.DB) Transformer

View File

@ -267,15 +267,3 @@ func TransformerInitializers() (initializers []shared.TransformerInitializer) {
} }
return return
} }
// `TransformerConfigs` returns the config structs for all available transformers
func TransformerConfigs() (allConfigs []shared.TransformerConfig) {
for _, transformer := range logNoteTransformers {
allConfigs = append(allConfigs, transformer.Config)
}
for _, transformer := range logNoteTransformers {
allConfigs = append(allConfigs, transformer.Config)
}
return
}