forked from cerc-io/ipld-eth-server
Remove injection of fetcher and repository to watcher
This commit is contained in:
parent
e54699c039
commit
833dde62cf
@ -17,8 +17,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
shared2 "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared"
|
"github.com/vulcanize/vulcanizedb/libraries/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers"
|
||||||
@ -47,9 +45,7 @@ func backfillMakerLogs() {
|
|||||||
log.Fatal("Failed to initialize database.")
|
log.Fatal("Failed to initialize database.")
|
||||||
}
|
}
|
||||||
|
|
||||||
repository := &shared2.Repository{}
|
watcher := shared.NewWatcher(db, blockChain)
|
||||||
fetcher := shared2.NewFetcher(blockChain)
|
|
||||||
watcher := shared.NewWatcher(db, fetcher, repository)
|
|
||||||
|
|
||||||
watcher.AddTransformers(transformers.TransformerInitializers())
|
watcher.AddTransformers(transformers.TransformerInitializers())
|
||||||
err = watcher.Execute()
|
err = watcher.Execute()
|
||||||
|
@ -58,12 +58,9 @@ func syncMakerLogs() {
|
|||||||
log.Fatal("Failed to initialize database.")
|
log.Fatal("Failed to initialize database.")
|
||||||
}
|
}
|
||||||
|
|
||||||
fetcher := shared2.NewFetcher(blockChain)
|
|
||||||
repository := &shared2.Repository{}
|
|
||||||
|
|
||||||
initializers := getTransformerInitializers(transformerNames)
|
initializers := getTransformerInitializers(transformerNames)
|
||||||
|
|
||||||
watcher := shared.NewWatcher(db, fetcher, repository)
|
watcher := shared.NewWatcher(db, blockChain)
|
||||||
watcher.AddTransformers(initializers)
|
watcher.AddTransformers(initializers)
|
||||||
|
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
|
@ -6,7 +6,7 @@ password = "vulcanize"
|
|||||||
port = 5432
|
port = 5432
|
||||||
|
|
||||||
[client]
|
[client]
|
||||||
ipcPath = "http://147.75.199.135:8545"
|
ipcPath = "http://kovan0.vulcanize.io:8545"
|
||||||
|
|
||||||
[datadog]
|
[datadog]
|
||||||
name = "maker_vdb_staging"
|
name = "maker_vdb_staging"
|
||||||
|
@ -8,12 +8,6 @@ import (
|
|||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WatcherRepository interface {
|
|
||||||
GetCheckedColumnNames(db *postgres.DB) ([]string, error)
|
|
||||||
CreateNotCheckedSQL(boolColumns []string) string
|
|
||||||
MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, db *postgres.DB, notCheckedSQL string) ([]core.Header, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Watcher struct {
|
type Watcher struct {
|
||||||
Transformers []shared.Transformer
|
Transformers []shared.Transformer
|
||||||
DB *postgres.DB
|
DB *postgres.DB
|
||||||
@ -21,16 +15,15 @@ type Watcher struct {
|
|||||||
Chunker shared.Chunker
|
Chunker shared.Chunker
|
||||||
Addresses []common.Address
|
Addresses []common.Address
|
||||||
Topics []common.Hash
|
Topics []common.Hash
|
||||||
Repository WatcherRepository
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWatcher(db *postgres.DB, fetcher shared.LogFetcher, repository WatcherRepository) Watcher {
|
func NewWatcher(db *postgres.DB, bc core.BlockChain) Watcher {
|
||||||
chunker := shared.NewLogChunker()
|
chunker := shared.NewLogChunker()
|
||||||
|
fetcher := shared.NewFetcher(bc)
|
||||||
return Watcher{
|
return Watcher{
|
||||||
DB: db,
|
DB: db,
|
||||||
Fetcher: fetcher,
|
Fetcher: fetcher,
|
||||||
Chunker: chunker,
|
Chunker: chunker,
|
||||||
Repository: repository,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,14 +51,14 @@ func (watcher *Watcher) AddTransformers(initializers []shared.TransformerInitial
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (watcher *Watcher) Execute() error {
|
func (watcher *Watcher) Execute() error {
|
||||||
checkedColumnNames, err := watcher.Repository.GetCheckedColumnNames(watcher.DB)
|
checkedColumnNames, err := shared.GetCheckedColumnNames(watcher.DB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
notCheckedSQL := watcher.Repository.CreateNotCheckedSQL(checkedColumnNames)
|
notCheckedSQL := shared.CreateNotCheckedSQL(checkedColumnNames)
|
||||||
|
|
||||||
// TODO Handle start and end numbers in transformers
|
// TODO Handle start and end numbers in transformers
|
||||||
missingHeaders, err := watcher.Repository.MissingHeaders(0, -1, watcher.DB, notCheckedSQL)
|
missingHeaders, err := shared.MissingHeaders(0, -1, watcher.DB, notCheckedSQL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Fetching of missing headers failed in watcher!")
|
log.Error("Fetching of missing headers failed in watcher!")
|
||||||
return err
|
return err
|
||||||
|
@ -52,7 +52,7 @@ func (repository BiteRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.BiteChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.BiteChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -62,5 +62,5 @@ func (repository BiteRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository BiteRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository BiteRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.BiteChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.BiteChecked)
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,7 @@ func (repository CatFileChopLumpRepository) Create(headerID int64, models []inte
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileChopLumpChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileChopLumpChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -58,7 +58,7 @@ func (repository CatFileChopLumpRepository) Create(headerID int64, models []inte
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository CatFileChopLumpRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository CatFileChopLumpRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.CatFileChopLumpChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFileChopLumpChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *CatFileChopLumpRepository) SetDB(db *postgres.DB) {
|
func (repository *CatFileChopLumpRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -48,7 +48,7 @@ func (repository CatFileFlipRepository) Create(headerID int64, models []interfac
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileFlipChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileFlipChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -57,7 +57,7 @@ func (repository CatFileFlipRepository) Create(headerID int64, models []interfac
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository CatFileFlipRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository CatFileFlipRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.CatFileFlipChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFileFlipChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *CatFileFlipRepository) SetDB(db *postgres.DB) {
|
func (repository *CatFileFlipRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -48,7 +48,7 @@ func (repository CatFilePitVowRepository) Create(headerID int64, models []interf
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFilePitVowChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFilePitVowChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -57,7 +57,7 @@ func (repository CatFilePitVowRepository) Create(headerID int64, models []interf
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository CatFilePitVowRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository CatFilePitVowRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.CatFilePitVowChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFilePitVowChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *CatFilePitVowRepository) SetDB(db *postgres.DB) {
|
func (repository *CatFilePitVowRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -49,7 +49,7 @@ func (repository DealRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.DealChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DealChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -58,7 +58,7 @@ func (repository DealRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DealRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository DealRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.DealChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.DealChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *DealRepository) SetDB(db *postgres.DB) {
|
func (repository *DealRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -54,7 +54,7 @@ func (repository DentRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.DentChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DentChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -63,7 +63,7 @@ func (repository DentRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DentRepository) MarkHeaderChecked(headerId int64) error {
|
func (repository DentRepository) MarkHeaderChecked(headerId int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerId, repository.db, constants.DentChecked)
|
return shared.MarkHeaderChecked(headerId, repository.db, constants.DentChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *DentRepository) SetDB(db *postgres.DB) {
|
func (repository *DentRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -48,7 +48,7 @@ func (repository DripDripRepository) Create(headerID int64, models []interface{}
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripDripChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripDripChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -57,7 +57,7 @@ func (repository DripDripRepository) Create(headerID int64, models []interface{}
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DripDripRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository DripDripRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.DripDripChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.DripDripChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *DripDripRepository) SetDB(db *postgres.DB) {
|
func (repository *DripDripRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -50,7 +50,7 @@ func (repository DripFileIlkRepository) Create(headerID int64, models []interfac
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileIlkChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileIlkChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -60,7 +60,7 @@ func (repository DripFileIlkRepository) Create(headerID int64, models []interfac
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DripFileIlkRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository DripFileIlkRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.DripFileIlkChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileIlkChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *DripFileIlkRepository) SetDB(db *postgres.DB) {
|
func (repository *DripFileIlkRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -50,7 +50,7 @@ func (repository DripFileRepoRepository) Create(headerID int64, models []interfa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileRepoChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileRepoChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -60,7 +60,7 @@ func (repository DripFileRepoRepository) Create(headerID int64, models []interfa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DripFileRepoRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository DripFileRepoRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.DripFileRepoChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileRepoChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *DripFileRepoRepository) SetDB(db *postgres.DB) {
|
func (repository *DripFileRepoRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -49,7 +49,7 @@ func (repository DripFileVowRepository) Create(headerID int64, models []interfac
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileVowChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileVowChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -59,7 +59,7 @@ func (repository DripFileVowRepository) Create(headerID int64, models []interfac
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DripFileVowRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository DripFileVowRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.DripFileVowChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileVowChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *DripFileVowRepository) SetDB(db *postgres.DB) {
|
func (repository *DripFileVowRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -47,7 +47,7 @@ func (repository *FlapKickRepository) Create(headerID int64, models []interface{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlapKickChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlapKickChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -56,7 +56,7 @@ func (repository *FlapKickRepository) Create(headerID int64, models []interface{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository *FlapKickRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository *FlapKickRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.FlapKickChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.FlapKickChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *FlapKickRepository) SetDB(db *postgres.DB) {
|
func (repository *FlapKickRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -47,7 +47,7 @@ func (repository FlipKickRepository) Create(headerID int64, models []interface{}
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlipKickChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlipKickChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -56,7 +56,7 @@ func (repository FlipKickRepository) Create(headerID int64, models []interface{}
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository FlipKickRepository) MarkHeaderChecked(headerId int64) error {
|
func (repository FlipKickRepository) MarkHeaderChecked(headerId int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerId, repository.db, constants.FlipKickChecked)
|
return shared.MarkHeaderChecked(headerId, repository.db, constants.FlipKickChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *FlipKickRepository) SetDB(db *postgres.DB) {
|
func (repository *FlipKickRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -48,7 +48,7 @@ func (repository FlopKickRepository) Create(headerID int64, models []interface{}
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlopKickChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlopKickChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -58,7 +58,7 @@ func (repository FlopKickRepository) Create(headerID int64, models []interface{}
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository FlopKickRepository) MarkHeaderChecked(headerId int64) error {
|
func (repository FlopKickRepository) MarkHeaderChecked(headerId int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerId, repository.db, constants.FlopKickChecked)
|
return shared.MarkHeaderChecked(headerId, repository.db, constants.FlopKickChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *FlopKickRepository) SetDB(db *postgres.DB) {
|
func (repository *FlopKickRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -46,7 +46,7 @@ func (repository FrobRepository) Create(headerID int64, models []interface{}) er
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.FrobChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FrobChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -55,7 +55,7 @@ func (repository FrobRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository FrobRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository FrobRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.FrobChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.FrobChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *FrobRepository) SetDB(db *postgres.DB) {
|
func (repository *FrobRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -50,7 +50,7 @@ func (repository PitFileDebtCeilingRepository) Create(headerID int64, models []i
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileDebtCeilingChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileDebtCeilingChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -60,7 +60,7 @@ func (repository PitFileDebtCeilingRepository) Create(headerID int64, models []i
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository PitFileDebtCeilingRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository PitFileDebtCeilingRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.PitFileDebtCeilingChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.PitFileDebtCeilingChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *PitFileDebtCeilingRepository) SetDB(db *postgres.DB) {
|
func (repository *PitFileDebtCeilingRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -49,7 +49,7 @@ func (repository PitFileIlkRepository) Create(headerID int64, models []interface
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileIlkChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileIlkChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -58,7 +58,7 @@ func (repository PitFileIlkRepository) Create(headerID int64, models []interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository PitFileIlkRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository PitFileIlkRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.PitFileIlkChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.PitFileIlkChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *PitFileIlkRepository) SetDB(db *postgres.DB) {
|
func (repository *PitFileIlkRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -45,7 +45,7 @@ func (repository PriceFeedRepository) Create(headerID int64, models []interface{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.PriceFeedsChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PriceFeedsChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -54,7 +54,7 @@ func (repository PriceFeedRepository) Create(headerID int64, models []interface{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository PriceFeedRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository PriceFeedRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.PriceFeedsChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.PriceFeedsChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *PriceFeedRepository) SetDB(db *postgres.DB) {
|
func (repository *PriceFeedRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -9,9 +9,7 @@ import (
|
|||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Repository struct{}
|
func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error {
|
||||||
|
|
||||||
func (_ Repository) MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error {
|
|
||||||
_, err := db.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`)
|
_, err := db.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`)
|
||||||
VALUES ($1, $2)
|
VALUES ($1, $2)
|
||||||
ON CONFLICT (header_id) DO
|
ON CONFLICT (header_id) DO
|
||||||
@ -19,7 +17,7 @@ func (_ Repository) MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHe
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_ Repository) MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, checkedHeadersColumn string) error {
|
func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, checkedHeadersColumn string) error {
|
||||||
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`)
|
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`)
|
||||||
VALUES ($1, $2)
|
VALUES ($1, $2)
|
||||||
ON CONFLICT (header_id) DO
|
ON CONFLICT (header_id) DO
|
||||||
@ -28,7 +26,7 @@ func (_ Repository) MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Treats a header as missing if it's not in the headers table, or not checked for some log type
|
// Treats a header as missing if it's not in the headers table, or not checked for some log type
|
||||||
func (_ Repository) MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.DB, notCheckedSQL string) ([]core.Header, error) {
|
func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.DB, notCheckedSQL string) ([]core.Header, error) {
|
||||||
var result []core.Header
|
var result []core.Header
|
||||||
var query string
|
var query string
|
||||||
var err error
|
var err error
|
||||||
@ -53,7 +51,7 @@ func (_ Repository) MissingHeaders(startingBlockNumber, endingBlockNumber int64,
|
|||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_ Repository) GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
|
func GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
|
||||||
// Query returns `[]driver.Value`, nullable polymorphic interface
|
// Query returns `[]driver.Value`, nullable polymorphic interface
|
||||||
var queryResult []driver.Value
|
var queryResult []driver.Value
|
||||||
columnNamesQuery :=
|
columnNamesQuery :=
|
||||||
@ -84,7 +82,7 @@ func (_ Repository) GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
|
|||||||
// Defaults to FALSE when no columns are provided.
|
// Defaults to FALSE when no columns are provided.
|
||||||
// Ex: ["columnA", "columnB"] => "NOT (columnA AND columnB)"
|
// Ex: ["columnA", "columnB"] => "NOT (columnA AND columnB)"
|
||||||
// [] => "FALSE"
|
// [] => "FALSE"
|
||||||
func (_ Repository) CreateNotCheckedSQL(boolColumns []string) string {
|
func CreateNotCheckedSQL(boolColumns []string) string {
|
||||||
var result bytes.Buffer
|
var result bytes.Buffer
|
||||||
|
|
||||||
if len(boolColumns) == 0 {
|
if len(boolColumns) == 0 {
|
||||||
|
@ -55,7 +55,7 @@ func (repository TendRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.TendChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.TendChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -64,7 +64,7 @@ func (repository TendRepository) Create(headerID int64, models []interface{}) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository TendRepository) MarkHeaderChecked(headerId int64) error {
|
func (repository TendRepository) MarkHeaderChecked(headerId int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerId, repository.db, constants.TendChecked)
|
return shared.MarkHeaderChecked(headerId, repository.db, constants.TendChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *TendRepository) SetDB(db *postgres.DB) {
|
func (repository *TendRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -47,7 +47,7 @@ func (repository VatFluxRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFluxChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFluxChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -57,7 +57,7 @@ func (repository VatFluxRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatFluxRepository) MarkHeaderChecked(headerId int64) error {
|
func (repository VatFluxRepository) MarkHeaderChecked(headerId int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerId, repository.db, constants.VatFluxChecked)
|
return shared.MarkHeaderChecked(headerId, repository.db, constants.VatFluxChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VatFluxRepository) SetDB(db *postgres.DB) {
|
func (repository *VatFluxRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -49,7 +49,7 @@ func (repository VatFoldRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFoldChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFoldChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -59,7 +59,7 @@ func (repository VatFoldRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatFoldRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository VatFoldRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.VatFoldChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatFoldChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VatFoldRepository) SetDB(db *postgres.DB) {
|
func (repository *VatFoldRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -34,7 +34,7 @@ func (repository VatGrabRepository) Create(headerID int64, models []interface{})
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatGrabChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatGrabChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -43,7 +43,7 @@ func (repository VatGrabRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatGrabRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository VatGrabRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.VatGrabChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatGrabChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VatGrabRepository) SetDB(db *postgres.DB) {
|
func (repository *VatGrabRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -52,7 +52,7 @@ func (repository VatHealRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatHealChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatHealChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -61,5 +61,5 @@ func (repository VatHealRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatHealRepository) MarkHeaderChecked(headerId int64) error {
|
func (repository VatHealRepository) MarkHeaderChecked(headerId int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerId, repository.db, constants.VatHealChecked)
|
return shared.MarkHeaderChecked(headerId, repository.db, constants.VatHealChecked)
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,7 @@ func (repository VatInitRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatInitChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatInitChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -59,7 +59,7 @@ func (repository VatInitRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatInitRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository VatInitRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.VatInitChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatInitChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VatInitRepository) SetDB(db *postgres.DB) {
|
func (repository *VatInitRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -50,7 +50,7 @@ func (repository VatMoveRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatMoveChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatMoveChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -60,7 +60,7 @@ func (repository VatMoveRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatMoveRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository VatMoveRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.VatMoveChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatMoveChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VatMoveRepository) SetDB(db *postgres.DB) {
|
func (repository *VatMoveRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -34,7 +34,7 @@ func (repository VatSlipRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatSlipChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatSlipChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -44,7 +44,7 @@ func (repository VatSlipRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatSlipRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository VatSlipRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.VatSlipChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatSlipChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VatSlipRepository) SetDB(db *postgres.DB) {
|
func (repository *VatSlipRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -34,7 +34,7 @@ func (repository VatTollRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTollChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTollChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -43,7 +43,7 @@ func (repository VatTollRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatTollRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository VatTollRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.VatTollChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatTollChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VatTollRepository) SetDB(db *postgres.DB) {
|
func (repository *VatTollRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -34,7 +34,7 @@ func (repository VatTuneRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTuneChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTuneChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -43,7 +43,7 @@ func (repository VatTuneRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatTuneRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository VatTuneRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.VatTuneChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatTuneChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VatTuneRepository) SetDB(db *postgres.DB) {
|
func (repository *VatTuneRepository) SetDB(db *postgres.DB) {
|
||||||
|
@ -50,7 +50,7 @@ func (repository VowFlogRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.Repository{}.MarkHeaderCheckedInTransaction(headerID, tx, constants.VowFlogChecked)
|
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VowFlogChecked)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
@ -60,7 +60,7 @@ func (repository VowFlogRepository) Create(headerID int64, models []interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VowFlogRepository) MarkHeaderChecked(headerID int64) error {
|
func (repository VowFlogRepository) MarkHeaderChecked(headerID int64) error {
|
||||||
return shared.Repository{}.MarkHeaderChecked(headerID, repository.db, constants.VowFlogChecked)
|
return shared.MarkHeaderChecked(headerID, repository.db, constants.VowFlogChecked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *VowFlogRepository) SetDB(db *postgres.DB) {
|
func (repository *VowFlogRepository) SetDB(db *postgres.DB) {
|
||||||
|
Loading…
Reference in New Issue
Block a user