From 38c745e8c3113a27e68f7cbe82799d169f5acf02 Mon Sep 17 00:00:00 2001 From: Edvard Date: Tue, 4 Dec 2018 16:04:13 +0100 Subject: [PATCH] Refactor fetching out from repositories to log_fetcher and watcher --- libraries/shared/watcher.go | 9 +- pkg/transformers/bite/repository.go | 5 -- .../cat_file/chop_lump/repository.go | 5 -- pkg/transformers/cat_file/flip/repository.go | 5 -- .../cat_file/pit_vow/repository.go | 5 -- pkg/transformers/deal/repository.go | 5 -- pkg/transformers/dent/repository.go | 5 -- pkg/transformers/drip_drip/repository.go | 5 -- pkg/transformers/drip_file/ilk/repository.go | 5 -- pkg/transformers/drip_file/repo/repository.go | 5 -- pkg/transformers/drip_file/vow/repository.go | 5 -- pkg/transformers/factories/repository.go | 2 - pkg/transformers/flap_kick/repository.go | 5 -- pkg/transformers/flip_kick/repository.go | 5 -- pkg/transformers/flop_kick/repository.go | 5 -- pkg/transformers/frob/repository.go | 5 -- .../pit_file/debt_ceiling/repository.go | 5 -- pkg/transformers/pit_file/ilk/repository.go | 5 -- pkg/transformers/price_feeds/repository.go | 5 -- pkg/transformers/shared/log_fetcher_test.go | 12 ++- pkg/transformers/shared/repository.go | 61 ++++++++++++- .../shared/repository_utility_test.go | 90 +++++++++++++++++++ pkg/transformers/tend/repository.go | 5 -- pkg/transformers/vat_flux/repository.go | 5 -- pkg/transformers/vat_fold/repository.go | 5 -- pkg/transformers/vat_grab/repository.go | 5 -- pkg/transformers/vat_heal/repository.go | 5 -- pkg/transformers/vat_init/repository.go | 5 -- pkg/transformers/vat_move/repository.go | 5 -- pkg/transformers/vat_slip/repository.go | 5 -- pkg/transformers/vat_toll/repository.go | 5 -- pkg/transformers/vat_tune/repository.go | 5 -- pkg/transformers/vow_flog/repository.go | 5 -- 33 files changed, 163 insertions(+), 151 deletions(-) create mode 100644 pkg/transformers/shared/repository_utility_test.go diff --git a/libraries/shared/watcher.go b/libraries/shared/watcher.go index 2820ad1d..ff138dfc 100644 --- a/libraries/shared/watcher.go +++ b/libraries/shared/watcher.go @@ -51,9 +51,14 @@ func (watcher *Watcher) AddTransformers(us []shared.TransformerInitializer) { } func (watcher *Watcher) Execute() error { - // TODO Solve checkedHeadersColumn issue + checkedColumnNames, err := shared.GetCheckedColumnNames(&watcher.DB) + if err != nil { + return err + } + notCheckedSQL := shared.CreateNotCheckedSQL(checkedColumnNames) + // TODO Handle start and end numbers in transformers? - missingHeaders, err := shared.MissingHeaders(0, -1, &watcher.DB, "") + missingHeaders, err := shared.MissingHeaders(0, -1, &watcher.DB, notCheckedSQL) for _, header := range missingHeaders { // TODO Extend FetchLogs for doing several blocks at a time diff --git a/pkg/transformers/bite/repository.go b/pkg/transformers/bite/repository.go index 54cdad84..1e3d860b 100644 --- a/pkg/transformers/bite/repository.go +++ b/pkg/transformers/bite/repository.go @@ -16,7 +16,6 @@ package bite import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -65,7 +64,3 @@ func (repository BiteRepository) Create(headerID int64, models []interface{}) er func (repository BiteRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.BiteChecked) } - -func (repository BiteRepository) MissingHeaders(startingBlockNumber int64, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.BiteChecked) -} diff --git a/pkg/transformers/cat_file/chop_lump/repository.go b/pkg/transformers/cat_file/chop_lump/repository.go index 52ef1446..9dff41f7 100644 --- a/pkg/transformers/cat_file/chop_lump/repository.go +++ b/pkg/transformers/cat_file/chop_lump/repository.go @@ -16,7 +16,6 @@ package chop_lump import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -62,10 +61,6 @@ func (repository CatFileChopLumpRepository) MarkHeaderChecked(headerID int64) er return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFileChopLumpChecked) } -func (repository CatFileChopLumpRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.CatFileChopLumpChecked) -} - func (repository *CatFileChopLumpRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/cat_file/flip/repository.go b/pkg/transformers/cat_file/flip/repository.go index 8f59c20f..a07ca207 100644 --- a/pkg/transformers/cat_file/flip/repository.go +++ b/pkg/transformers/cat_file/flip/repository.go @@ -16,7 +16,6 @@ package flip import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -61,10 +60,6 @@ func (repository CatFileFlipRepository) MarkHeaderChecked(headerID int64) error return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFileFlipChecked) } -func (repository CatFileFlipRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.CatFileFlipChecked) -} - func (repository *CatFileFlipRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/cat_file/pit_vow/repository.go b/pkg/transformers/cat_file/pit_vow/repository.go index 6aa7a108..861dc2a8 100644 --- a/pkg/transformers/cat_file/pit_vow/repository.go +++ b/pkg/transformers/cat_file/pit_vow/repository.go @@ -16,7 +16,6 @@ package pit_vow import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -61,10 +60,6 @@ func (repository CatFilePitVowRepository) MarkHeaderChecked(headerID int64) erro return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFilePitVowChecked) } -func (repository CatFilePitVowRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.CatFilePitVowChecked) -} - func (repository *CatFilePitVowRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/deal/repository.go b/pkg/transformers/deal/repository.go index 84a40325..06f23230 100644 --- a/pkg/transformers/deal/repository.go +++ b/pkg/transformers/deal/repository.go @@ -16,7 +16,6 @@ package deal import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -62,10 +61,6 @@ func (repository DealRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.DealChecked) } -func (repository DealRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DealChecked) -} - func (repository *DealRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/dent/repository.go b/pkg/transformers/dent/repository.go index e237fb54..0ff5370b 100644 --- a/pkg/transformers/dent/repository.go +++ b/pkg/transformers/dent/repository.go @@ -16,7 +16,6 @@ package dent import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -67,10 +66,6 @@ func (repository DentRepository) MarkHeaderChecked(headerId int64) error { return shared.MarkHeaderChecked(headerId, repository.db, constants.DentChecked) } -func (repository DentRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DentChecked) -} - func (repository *DentRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/drip_drip/repository.go b/pkg/transformers/drip_drip/repository.go index 95780c74..edb1208a 100644 --- a/pkg/transformers/drip_drip/repository.go +++ b/pkg/transformers/drip_drip/repository.go @@ -16,7 +16,6 @@ package drip_drip import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -61,10 +60,6 @@ func (repository DripDripRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.DripDripChecked) } -func (repository DripDripRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DripDripChecked) -} - func (repository *DripDripRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/drip_file/ilk/repository.go b/pkg/transformers/drip_file/ilk/repository.go index 8c602ef9..117a28fd 100644 --- a/pkg/transformers/drip_file/ilk/repository.go +++ b/pkg/transformers/drip_file/ilk/repository.go @@ -16,7 +16,6 @@ package ilk import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -64,10 +63,6 @@ func (repository DripFileIlkRepository) MarkHeaderChecked(headerID int64) error return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileIlkChecked) } -func (repository DripFileIlkRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DripFileIlkChecked) -} - func (repository *DripFileIlkRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/drip_file/repo/repository.go b/pkg/transformers/drip_file/repo/repository.go index 592e92e6..dc9589a7 100644 --- a/pkg/transformers/drip_file/repo/repository.go +++ b/pkg/transformers/drip_file/repo/repository.go @@ -16,7 +16,6 @@ package repo import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -64,10 +63,6 @@ func (repository DripFileRepoRepository) MarkHeaderChecked(headerID int64) error return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileRepoChecked) } -func (repository DripFileRepoRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DripFileRepoChecked) -} - func (repository *DripFileRepoRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/drip_file/vow/repository.go b/pkg/transformers/drip_file/vow/repository.go index 35b089d0..aa23bea8 100644 --- a/pkg/transformers/drip_file/vow/repository.go +++ b/pkg/transformers/drip_file/vow/repository.go @@ -16,7 +16,6 @@ package vow import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -63,10 +62,6 @@ func (repository DripFileVowRepository) MarkHeaderChecked(headerID int64) error return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileVowChecked) } -func (repository DripFileVowRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DripFileVowChecked) -} - func (repository *DripFileVowRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/factories/repository.go b/pkg/transformers/factories/repository.go index 015fe49f..0ddc9274 100644 --- a/pkg/transformers/factories/repository.go +++ b/pkg/transformers/factories/repository.go @@ -15,13 +15,11 @@ package factories import ( - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type Repository interface { Create(headerID int64, models []interface{}) error MarkHeaderChecked(headerID int64) error - MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) SetDB(db *postgres.DB) } diff --git a/pkg/transformers/flap_kick/repository.go b/pkg/transformers/flap_kick/repository.go index 59f5803a..f9f11c7a 100644 --- a/pkg/transformers/flap_kick/repository.go +++ b/pkg/transformers/flap_kick/repository.go @@ -16,7 +16,6 @@ package flap_kick import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -60,10 +59,6 @@ func (repository *FlapKickRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.FlapKickChecked) } -func (repository FlapKickRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.FlapKickChecked) -} - func (repository *FlapKickRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/flip_kick/repository.go b/pkg/transformers/flip_kick/repository.go index 21de65bc..db8a8d01 100644 --- a/pkg/transformers/flip_kick/repository.go +++ b/pkg/transformers/flip_kick/repository.go @@ -17,7 +17,6 @@ package flip_kick import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -60,10 +59,6 @@ func (repository FlipKickRepository) MarkHeaderChecked(headerId int64) error { return shared.MarkHeaderChecked(headerId, repository.db, constants.FlipKickChecked) } -func (repository FlipKickRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.FlipKickChecked) -} - func (repository *FlipKickRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/flop_kick/repository.go b/pkg/transformers/flop_kick/repository.go index b175caf9..7a302c75 100644 --- a/pkg/transformers/flop_kick/repository.go +++ b/pkg/transformers/flop_kick/repository.go @@ -16,7 +16,6 @@ package flop_kick import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -62,10 +61,6 @@ func (repository FlopKickRepository) MarkHeaderChecked(headerId int64) error { return shared.MarkHeaderChecked(headerId, repository.db, constants.FlopKickChecked) } -func (repository FlopKickRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.FlopKickChecked) -} - func (repository *FlopKickRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/frob/repository.go b/pkg/transformers/frob/repository.go index afe7c4b1..bf6d4d08 100644 --- a/pkg/transformers/frob/repository.go +++ b/pkg/transformers/frob/repository.go @@ -17,7 +17,6 @@ package frob import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -59,10 +58,6 @@ func (repository FrobRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.FrobChecked) } -func (repository FrobRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.FrobChecked) -} - func (repository *FrobRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/pit_file/debt_ceiling/repository.go b/pkg/transformers/pit_file/debt_ceiling/repository.go index 34e398f8..12f69fb2 100644 --- a/pkg/transformers/pit_file/debt_ceiling/repository.go +++ b/pkg/transformers/pit_file/debt_ceiling/repository.go @@ -16,7 +16,6 @@ package debt_ceiling import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -64,10 +63,6 @@ func (repository PitFileDebtCeilingRepository) MarkHeaderChecked(headerID int64) return shared.MarkHeaderChecked(headerID, repository.db, constants.PitFileDebtCeilingChecked) } -func (repository PitFileDebtCeilingRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.PitFileDebtCeilingChecked) -} - func (repository *PitFileDebtCeilingRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/pit_file/ilk/repository.go b/pkg/transformers/pit_file/ilk/repository.go index c186a638..9a187de7 100644 --- a/pkg/transformers/pit_file/ilk/repository.go +++ b/pkg/transformers/pit_file/ilk/repository.go @@ -16,7 +16,6 @@ package ilk import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -62,10 +61,6 @@ func (repository PitFileIlkRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.PitFileIlkChecked) } -func (repository PitFileIlkRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.PitFileIlkChecked) -} - func (repository *PitFileIlkRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/price_feeds/repository.go b/pkg/transformers/price_feeds/repository.go index 59dbab8c..cbb9a012 100644 --- a/pkg/transformers/price_feeds/repository.go +++ b/pkg/transformers/price_feeds/repository.go @@ -16,7 +16,6 @@ package price_feeds import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -58,10 +57,6 @@ func (repository PriceFeedRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.PriceFeedsChecked) } -func (repository PriceFeedRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.PriceFeedsChecked) -} - func (repository *PriceFeedRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/shared/log_fetcher_test.go b/pkg/transformers/shared/log_fetcher_test.go index 8e5aa7fa..4a0c55fa 100644 --- a/pkg/transformers/shared/log_fetcher_test.go +++ b/pkg/transformers/shared/log_fetcher_test.go @@ -32,8 +32,12 @@ var _ = Describe("Fetcher", func() { fetcher := shared.NewFetcher(blockChain) header := fakes.FakeHeader - addresses := []string{"0xfakeAddress", "0xanotherFakeAddress"} - topicZeros := [][]common.Hash{{common.BytesToHash([]byte{1, 2, 3, 4, 5})}} + addresses := []common.Address{ + common.HexToAddress("0xfakeAddress"), + common.HexToAddress("0xanotherFakeAddress"), + } + + topicZeros := []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})} _, err := fetcher.FetchLogs(addresses, topicZeros, header) @@ -45,7 +49,7 @@ var _ = Describe("Fetcher", func() { expectedQuery := ethereum.FilterQuery{ BlockHash: &blockHash, Addresses: []common.Address{address1, address2}, - Topics: topicZeros, + Topics: [][]common.Hash{topicZeros}, } blockChain.AssertGetEthLogsWithCustomQueryCalledWith(expectedQuery) }) @@ -55,7 +59,7 @@ var _ = Describe("Fetcher", func() { blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError) fetcher := shared.NewFetcher(blockChain) - _, err := fetcher.FetchLogs([]string{}, [][]common.Hash{}, core.Header{}) + _, err := fetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) diff --git a/pkg/transformers/shared/repository.go b/pkg/transformers/shared/repository.go index d0c6b5f6..a1d5c8db 100644 --- a/pkg/transformers/shared/repository.go +++ b/pkg/transformers/shared/repository.go @@ -2,8 +2,11 @@ package shared import ( "database/sql" + "database/sql/driver" + "fmt" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "strings" ) func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error { @@ -22,7 +25,9 @@ func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, checkedHeadersCo return err } -func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.DB, checkedHeadersColumn string) ([]core.Header, error) { +// Treats a header as missing if it's not in the headers table, or not checked for some log type +// TODO Revisit definition of "checked header +func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.DB, notCheckedSQL string) ([]core.Header, error) { var result []core.Header var query string var err error @@ -30,14 +35,14 @@ func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.D if endingBlockNumber == -1 { query = `SELECT headers.id, headers.block_number, headers.hash FROM headers LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR ` + checkedHeadersColumn + ` IS FALSE) + WHERE (header_id ISNULL OR ` + notCheckedSQL + `) AND headers.block_number >= $1 AND headers.eth_node_fingerprint = $2` err = db.Select(&result, query, startingBlockNumber, db.Node.ID) } else { query = `SELECT headers.id, headers.block_number, headers.hash FROM headers LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR ` + checkedHeadersColumn + ` IS FALSE) + WHERE (header_id ISNULL OR ` + notCheckedSQL + `) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3` @@ -46,3 +51,53 @@ func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.D return result, err } + +func GetCheckedColumnNames(db *postgres.DB) ([]string, error) { + // Query returns `[]driver.Value`, nullable polymorphic interface + var queryResult []driver.Value + columnNamesQuery := + `SELECT column_name FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'checked_headers' + AND column_name != 'id' + AND column_name != 'header_id';` + + err := db.Select(&queryResult, columnNamesQuery) + if err != nil { + return []string{}, err + } + + // Transform column names from `driver.Value` to strings + var columnNames []string + for _, result := range queryResult { + if columnName, ok := result.(string); ok { + columnNames = append(columnNames, columnName) + } else { + return []string{}, fmt.Errorf("incorrect value for checked_headers column name") + } + } + + return columnNames, nil +} + +// Builds a SQL string that checks if any column value is FALSE, given the column names. +// Defaults to FALSE when no columns are provided. +// Ex: ["columnA", "columnB"] => "NOT (columnA AND columnB)" +// [] => "FALSE" +func CreateNotCheckedSQL(boolColumns []string) string { + var result strings.Builder + + if len(boolColumns) == 0 { + return "FALSE" + } + + result.WriteString("NOT (") + for _, column := range boolColumns[:len(boolColumns)-1] { + result.WriteString(fmt.Sprintf("%v AND ", column)) + } + + // No trailing "OR" for last column name + result.WriteString(fmt.Sprintf("%v)", boolColumns[len(boolColumns)-1])) + + return result.String() +} diff --git a/pkg/transformers/shared/repository_utility_test.go b/pkg/transformers/shared/repository_utility_test.go new file mode 100644 index 00000000..e07f939b --- /dev/null +++ b/pkg/transformers/shared/repository_utility_test.go @@ -0,0 +1,90 @@ +// Copyright 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Repository utilities", func() { + Describe("GetCheckedColumnNames", func() { + It("gets the column names from checked_headers", func() { + db := test_config.NewTestDB(test_config.NewTestNode()) + test_config.CleanTestDB(db) + expectedColumnNames := getExpectedColumnNames() + actualColumnNames, err := shared.GetCheckedColumnNames(db) + Expect(err).NotTo(HaveOccurred()) + Expect(actualColumnNames).To(Equal(expectedColumnNames)) + }) + }) + + Describe("CreateNotCheckedSQL", func() { + It("generates a correct SQL string for one column", func() { + columns := []string{"columnA"} + expected := "NOT (columnA)" + actual := shared.CreateNotCheckedSQL(columns) + Expect(actual).To(Equal(expected)) + }) + + It("generates a correct SQL string for several columns", func() { + columns := []string{"columnA", "columnB"} + expected := "NOT (columnA AND columnB)" + actual := shared.CreateNotCheckedSQL(columns) + Expect(actual).To(Equal(expected)) + }) + + It("defaults to FALSE when there are no columns", func() { + expected := "FALSE" + actual := shared.CreateNotCheckedSQL([]string{}) + Expect(actual).To(Equal(expected)) + }) + }) +}) + +func getExpectedColumnNames() []string { + return []string{ + "price_feeds_checked", + "flip_kick_checked", + "frob_checked", + "tend_checked", + "bite_checked", + "dent_checked", + "pit_file_debt_ceiling_checked", + "pit_file_ilk_checked", + "vat_init_checked", + "drip_file_ilk_checked", + "drip_file_repo_checked", + "drip_file_vow_checked", + "deal_checked", + "drip_drip_checked", + "cat_file_chop_lump_checked", + "cat_file_flip_checked", + "cat_file_pit_vow_checked", + "flop_kick_checked", + "vat_move_checked", + "vat_fold_checked", + "vat_heal_checked", + "vat_toll_checked", + "vat_tune_checked", + "vat_grab_checked", + "vat_flux_checked", + "vat_slip_checked", + "vow_flog_checked", + "flap_kick_checked", + } +} \ No newline at end of file diff --git a/pkg/transformers/tend/repository.go b/pkg/transformers/tend/repository.go index 61982e12..17620a00 100644 --- a/pkg/transformers/tend/repository.go +++ b/pkg/transformers/tend/repository.go @@ -16,7 +16,6 @@ package tend import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -68,10 +67,6 @@ func (repository TendRepository) MarkHeaderChecked(headerId int64) error { return shared.MarkHeaderChecked(headerId, repository.db, constants.TendChecked) } -func (repository TendRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.TendChecked) -} - func (repository *TendRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/vat_flux/repository.go b/pkg/transformers/vat_flux/repository.go index d87ffca7..2febe527 100644 --- a/pkg/transformers/vat_flux/repository.go +++ b/pkg/transformers/vat_flux/repository.go @@ -16,7 +16,6 @@ package vat_flux import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -61,10 +60,6 @@ func (repository VatFluxRepository) MarkHeaderChecked(headerId int64) error { return shared.MarkHeaderChecked(headerId, repository.db, constants.VatFluxChecked) } -func (repository VatFluxRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlock, endingBlock, repository.db, constants.VatFluxChecked) -} - func (repository *VatFluxRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/vat_fold/repository.go b/pkg/transformers/vat_fold/repository.go index bb14f172..5d831bca 100644 --- a/pkg/transformers/vat_fold/repository.go +++ b/pkg/transformers/vat_fold/repository.go @@ -17,7 +17,6 @@ package vat_fold import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -63,10 +62,6 @@ func (repository VatFoldRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.VatFoldChecked) } -func (repository VatFoldRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatFoldChecked) -} - func (repository *VatFoldRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/vat_grab/repository.go b/pkg/transformers/vat_grab/repository.go index b826b7c1..a8a274a4 100644 --- a/pkg/transformers/vat_grab/repository.go +++ b/pkg/transformers/vat_grab/repository.go @@ -3,7 +3,6 @@ package vat_grab import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -47,10 +46,6 @@ func (repository VatGrabRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.VatGrabChecked) } -func (repository VatGrabRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatGrabChecked) -} - func (repository *VatGrabRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/vat_heal/repository.go b/pkg/transformers/vat_heal/repository.go index 5050d6d6..aed8fcd0 100644 --- a/pkg/transformers/vat_heal/repository.go +++ b/pkg/transformers/vat_heal/repository.go @@ -17,7 +17,6 @@ package vat_heal import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -61,10 +60,6 @@ func (repository VatHealRepository) Create(headerID int64, models []interface{}) return tx.Commit() } -func (repository VatHealRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlock, endingBlock, repository.db, constants.VatHealChecked) -} - func (repository VatHealRepository) MarkHeaderChecked(headerId int64) error { return shared.MarkHeaderChecked(headerId, repository.db, constants.VatHealChecked) } diff --git a/pkg/transformers/vat_init/repository.go b/pkg/transformers/vat_init/repository.go index cee9e44a..14b2d4c8 100644 --- a/pkg/transformers/vat_init/repository.go +++ b/pkg/transformers/vat_init/repository.go @@ -16,7 +16,6 @@ package vat_init import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -63,10 +62,6 @@ func (repository VatInitRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.VatInitChecked) } -func (repository VatInitRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatInitChecked) -} - func (repository *VatInitRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/vat_move/repository.go b/pkg/transformers/vat_move/repository.go index f47db4d6..2548c2b4 100644 --- a/pkg/transformers/vat_move/repository.go +++ b/pkg/transformers/vat_move/repository.go @@ -16,7 +16,6 @@ package vat_move import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -60,10 +59,6 @@ func (repository VatMoveRepository) Create(headerID int64, models []interface{}) return tx.Commit() } -func (repository VatMoveRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatMoveChecked) -} - func (repository VatMoveRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.VatMoveChecked) } diff --git a/pkg/transformers/vat_slip/repository.go b/pkg/transformers/vat_slip/repository.go index 1c6b8d06..15f412ca 100644 --- a/pkg/transformers/vat_slip/repository.go +++ b/pkg/transformers/vat_slip/repository.go @@ -2,7 +2,6 @@ package vat_slip import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -48,10 +47,6 @@ func (repository VatSlipRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.VatSlipChecked) } -func (repository VatSlipRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatSlipChecked) -} - func (repository *VatSlipRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/vat_toll/repository.go b/pkg/transformers/vat_toll/repository.go index 39371193..2c9be8a4 100644 --- a/pkg/transformers/vat_toll/repository.go +++ b/pkg/transformers/vat_toll/repository.go @@ -2,7 +2,6 @@ package vat_toll import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -47,10 +46,6 @@ func (repository VatTollRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.VatTollChecked) } -func (repository VatTollRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatTollChecked) -} - func (repository *VatTollRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/vat_tune/repository.go b/pkg/transformers/vat_tune/repository.go index b7298cce..c9951ce6 100644 --- a/pkg/transformers/vat_tune/repository.go +++ b/pkg/transformers/vat_tune/repository.go @@ -2,7 +2,6 @@ package vat_tune import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -47,10 +46,6 @@ func (repository VatTuneRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.VatTuneChecked) } -func (repository VatTuneRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatTuneChecked) -} - func (repository *VatTuneRepository) SetDB(db *postgres.DB) { repository.db = db } diff --git a/pkg/transformers/vow_flog/repository.go b/pkg/transformers/vow_flog/repository.go index 0754fd5f..48076802 100644 --- a/pkg/transformers/vow_flog/repository.go +++ b/pkg/transformers/vow_flog/repository.go @@ -16,7 +16,6 @@ package vow_flog import ( "fmt" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" @@ -64,10 +63,6 @@ func (repository VowFlogRepository) MarkHeaderChecked(headerID int64) error { return shared.MarkHeaderChecked(headerID, repository.db, constants.VowFlogChecked) } -func (repository VowFlogRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VowFlogChecked) -} - func (repository *VowFlogRepository) SetDB(db *postgres.DB) { repository.db = db }