2018-02-13 16:31:57 +00:00
|
|
|
package shared
|
|
|
|
|
|
|
|
import (
|
2018-11-30 16:28:52 +00:00
|
|
|
"github.com/ethereum/go-ethereum/common"
|
2018-12-10 14:40:45 +00:00
|
|
|
"github.com/ethereum/go-ethereum/log"
|
2018-12-11 10:35:13 +00:00
|
|
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
2018-02-13 16:31:57 +00:00
|
|
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
2018-08-09 21:55:02 +00:00
|
|
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
2018-02-13 16:31:57 +00:00
|
|
|
)
|
|
|
|
|
2018-12-11 10:35:13 +00:00
|
|
|
type WatcherRepository interface {
|
|
|
|
GetCheckedColumnNames(db *postgres.DB) ([]string, error)
|
|
|
|
CreateNotCheckedSQL(boolColumns []string) string
|
|
|
|
MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, db *postgres.DB, notCheckedSQL string) ([]core.Header, error)
|
|
|
|
}
|
|
|
|
|
2018-02-13 16:31:57 +00:00
|
|
|
type Watcher struct {
|
2018-08-09 21:55:02 +00:00
|
|
|
Transformers []shared.Transformer
|
2018-12-10 15:50:13 +00:00
|
|
|
DB *postgres.DB
|
2018-11-30 16:28:52 +00:00
|
|
|
Fetcher shared.LogFetcher
|
2018-12-12 14:41:29 +00:00
|
|
|
Chunker shared.Chunker
|
2018-11-30 16:28:52 +00:00
|
|
|
Addresses []common.Address
|
|
|
|
Topics []common.Hash
|
2018-12-11 10:35:13 +00:00
|
|
|
Repository WatcherRepository
|
2018-11-30 16:28:52 +00:00
|
|
|
}
|
|
|
|
|
2018-12-13 11:39:57 +00:00
|
|
|
func NewWatcher(db *postgres.DB, fetcher shared.LogFetcher, repository WatcherRepository) Watcher {
|
|
|
|
chunker := shared.NewLogChunker()
|
2018-11-30 16:28:52 +00:00
|
|
|
return Watcher{
|
2018-12-11 10:35:13 +00:00
|
|
|
DB: db,
|
|
|
|
Fetcher: fetcher,
|
|
|
|
Chunker: chunker,
|
|
|
|
Repository: repository,
|
2018-11-30 16:28:52 +00:00
|
|
|
}
|
2018-02-13 16:31:57 +00:00
|
|
|
}
|
|
|
|
|
2018-12-13 11:39:57 +00:00
|
|
|
// Adds transformers to the watcher and updates the chunker, so that it will consider the new transformers.
|
|
|
|
func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitializer) {
|
|
|
|
var contractAddresses []common.Address
|
|
|
|
var topic0s []common.Hash
|
|
|
|
var configs []shared.TransformerConfig
|
2018-12-12 14:41:29 +00:00
|
|
|
|
|
|
|
for _, initializer := range initializers {
|
|
|
|
transformer := initializer(watcher.DB)
|
2018-03-16 19:34:18 +00:00
|
|
|
watcher.Transformers = append(watcher.Transformers, transformer)
|
2018-12-12 14:41:29 +00:00
|
|
|
|
2018-12-13 11:39:57 +00:00
|
|
|
config := transformer.GetConfig()
|
|
|
|
configs = append(configs, config)
|
2018-12-12 14:41:29 +00:00
|
|
|
|
2018-12-13 11:39:57 +00:00
|
|
|
addresses := shared.HexStringsToAddresses(config.ContractAddresses)
|
|
|
|
contractAddresses = append(contractAddresses, addresses...)
|
2018-12-12 14:41:29 +00:00
|
|
|
topic0s = append(topic0s, common.HexToHash(config.Topic))
|
|
|
|
}
|
|
|
|
|
|
|
|
watcher.Addresses = append(watcher.Addresses, contractAddresses...)
|
|
|
|
watcher.Topics = append(watcher.Topics, topic0s...)
|
|
|
|
watcher.Chunker.AddConfigs(configs)
|
2018-02-13 16:31:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (watcher *Watcher) Execute() error {
|
2018-12-11 10:35:13 +00:00
|
|
|
checkedColumnNames, err := watcher.Repository.GetCheckedColumnNames(watcher.DB)
|
2018-12-04 15:04:13 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-12-11 10:35:13 +00:00
|
|
|
notCheckedSQL := watcher.Repository.CreateNotCheckedSQL(checkedColumnNames)
|
2018-12-04 15:04:13 +00:00
|
|
|
|
2018-12-11 10:35:13 +00:00
|
|
|
// TODO Handle start and end numbers in transformers
|
|
|
|
missingHeaders, err := watcher.Repository.MissingHeaders(0, -1, watcher.DB, notCheckedSQL)
|
2018-12-10 20:12:19 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Error("Fetching of missing headers failed in watcher!")
|
|
|
|
return err
|
|
|
|
}
|
2018-11-30 16:28:52 +00:00
|
|
|
|
|
|
|
for _, header := range missingHeaders {
|
|
|
|
// TODO Extend FetchLogs for doing several blocks at a time
|
|
|
|
logs, err := watcher.Fetcher.FetchLogs(watcher.Addresses, watcher.Topics, header)
|
|
|
|
if err != nil {
|
|
|
|
// TODO Handle fetch error in watcher
|
2018-12-11 10:35:13 +00:00
|
|
|
log.Error("Error while fetching logs for header %v in watcher", header.Id)
|
2018-11-30 16:28:52 +00:00
|
|
|
return err
|
|
|
|
}
|
2018-11-28 16:00:04 +00:00
|
|
|
|
2018-11-30 16:28:52 +00:00
|
|
|
chunkedLogs := watcher.Chunker.ChunkLogs(logs)
|
2018-11-28 16:00:04 +00:00
|
|
|
|
2018-12-13 11:39:57 +00:00
|
|
|
// Can't quit early and mark as checked if there are no logs. If we are running continuousLogSync,
|
|
|
|
// not all logs we're interested in might have been fetched.
|
2018-11-30 16:28:52 +00:00
|
|
|
for _, transformer := range watcher.Transformers {
|
2018-12-13 11:39:57 +00:00
|
|
|
transformerName := transformer.GetConfig().TransformerName
|
|
|
|
logChunk := chunkedLogs[transformerName]
|
2018-12-04 16:05:34 +00:00
|
|
|
err = transformer.Execute(logChunk, header)
|
2018-12-10 14:40:45 +00:00
|
|
|
if err != nil {
|
2018-12-13 11:39:57 +00:00
|
|
|
log.Error("%v transformer failed to execute in watcher: %v", transformerName, err)
|
2018-12-10 14:40:45 +00:00
|
|
|
return err
|
|
|
|
}
|
2018-11-30 16:28:52 +00:00
|
|
|
}
|
2018-02-13 16:31:57 +00:00
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|