Change transformer.Execute to single header

This commit is contained in:
Edvard 2018-12-04 17:05:34 +01:00
parent 38c745e8c3
commit 8bebcdc064
4 changed files with 38 additions and 47 deletions

View File

@ -74,7 +74,7 @@ func (watcher *Watcher) Execute() error {
// TODO delegate log chunks to respective transformers // TODO delegate log chunks to respective transformers
// Need to get the transformer name... :/ // Need to get the transformer name... :/
logChunk := chunkedLogs["transformerName"] logChunk := chunkedLogs["transformerName"]
err = transformer.Execute(logChunk, missingHeaders) err = transformer.Execute(logChunk, header)
} }
} }
return err return err

View File

@ -34,32 +34,28 @@ func (transformer LogNoteTransformer) NewLogNoteTransformer(db *postgres.DB) sha
return transformer return transformer
} }
func (transformer LogNoteTransformer) Execute(logs []types.Log, missingHeaders []core.Header) error { func (transformer LogNoteTransformer) Execute(logs []types.Log, header core.Header) error {
transformerName := transformer.Config.TransformerName transformerName := transformer.Config.TransformerName
for _, header := range missingHeaders { // No matching logs, mark the header as checked for this type of logs
// No matching logs, mark the header as checked for this type of logs if len(logs) < 1 {
if len(logs) < 1 { err := transformer.Repository.MarkHeaderChecked(header.Id)
err := transformer.Repository.MarkHeaderChecked(header.Id)
if err != nil {
log.Printf("Error marking header as checked in %v: %v", transformerName, err)
return err
}
// Continue with the next header; nothing to persist
continue
}
models, err := transformer.Converter.ToModels(logs)
if err != nil { if err != nil {
log.Printf("Error converting logs in %v: %v", transformerName, err) log.Printf("Error marking header as checked in %v: %v", transformerName, err)
return err
}
err = transformer.Repository.Create(header.Id, models)
if err != nil {
log.Printf("Error persisting %v record: %v", transformerName, err)
return err return err
} }
} }
models, err := transformer.Converter.ToModels(logs)
if err != nil {
log.Printf("Error converting logs in %v: %v", transformerName, err)
return err
}
err = transformer.Repository.Create(header.Id, models)
if err != nil {
log.Printf("Error persisting %v record: %v", transformerName, err)
return err
}
return nil return nil
} }

View File

@ -34,39 +34,34 @@ func (transformer Transformer) NewTransformer(db *postgres.DB) shared.Transforme
return transformer return transformer
} }
func (transformer Transformer) Execute(logs []types.Log, missingHeaders []core.Header) error { func (transformer Transformer) Execute(logs []types.Log, header core.Header) error {
transformerName := transformer.Config.TransformerName transformerName := transformer.Config.TransformerName
config := transformer.Config config := transformer.Config
for _, header := range missingHeaders { if len(logs) < 1 {
if len(logs) < 1 { err := transformer.Repository.MarkHeaderChecked(header.Id)
err := transformer.Repository.MarkHeaderChecked(header.Id)
if err != nil {
log.Printf("Error marking header as checked in %v: %v", transformerName, err)
return err
}
continue
}
entities, err := transformer.Converter.ToEntities(config.ContractAbi, logs)
if err != nil { if err != nil {
log.Printf("Error converting logs to entities in %v: %v", transformerName, err) log.Printf("Error marking header as checked in %v: %v", transformerName, err)
return err return err
} }
}
models, err := transformer.Converter.ToModels(entities) entities, err := transformer.Converter.ToEntities(config.ContractAbi, logs)
if err != nil { if err != nil {
log.Printf("Error converting entities to models in %v: %v", transformerName, err) log.Printf("Error converting logs to entities in %v: %v", transformerName, err)
return err return err
} }
err = transformer.Repository.Create(header.Id, models) models, err := transformer.Converter.ToModels(entities)
if err != nil { if err != nil {
log.Printf("Error persisting %v record: %v", transformerName, err) log.Printf("Error converting entities to models in %v: %v", transformerName, err)
return err return err
} }
err = transformer.Repository.Create(header.Id, models)
if err != nil {
log.Printf("Error persisting %v record: %v", transformerName, err)
return err
} }
return nil return nil

View File

@ -23,7 +23,7 @@ import (
) )
type Transformer interface { type Transformer interface {
Execute(logs []types.Log, missingHeaders []core.Header) error Execute(logs []types.Log, header core.Header) error
} }
type TransformerInitializer func(db *postgres.DB) Transformer type TransformerInitializer func(db *postgres.DB) Transformer