Refactor fetching out from repositories to log_fetcher and watcher
This commit is contained in:
parent
c26736dc9e
commit
38c745e8c3
@ -51,9 +51,14 @@ func (watcher *Watcher) AddTransformers(us []shared.TransformerInitializer) {
|
||||
}
|
||||
|
||||
func (watcher *Watcher) Execute() error {
|
||||
// TODO Solve checkedHeadersColumn issue
|
||||
checkedColumnNames, err := shared.GetCheckedColumnNames(&watcher.DB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
notCheckedSQL := shared.CreateNotCheckedSQL(checkedColumnNames)
|
||||
|
||||
// TODO Handle start and end numbers in transformers?
|
||||
missingHeaders, err := shared.MissingHeaders(0, -1, &watcher.DB, "")
|
||||
missingHeaders, err := shared.MissingHeaders(0, -1, &watcher.DB, notCheckedSQL)
|
||||
|
||||
for _, header := range missingHeaders {
|
||||
// TODO Extend FetchLogs for doing several blocks at a time
|
||||
|
@ -16,7 +16,6 @@ package bite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -65,7 +64,3 @@ func (repository BiteRepository) Create(headerID int64, models []interface{}) er
|
||||
func (repository BiteRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.BiteChecked)
|
||||
}
|
||||
|
||||
func (repository BiteRepository) MissingHeaders(startingBlockNumber int64, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.BiteChecked)
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package chop_lump
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -62,10 +61,6 @@ func (repository CatFileChopLumpRepository) MarkHeaderChecked(headerID int64) er
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFileChopLumpChecked)
|
||||
}
|
||||
|
||||
func (repository CatFileChopLumpRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.CatFileChopLumpChecked)
|
||||
}
|
||||
|
||||
func (repository *CatFileChopLumpRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package flip
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -61,10 +60,6 @@ func (repository CatFileFlipRepository) MarkHeaderChecked(headerID int64) error
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFileFlipChecked)
|
||||
}
|
||||
|
||||
func (repository CatFileFlipRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.CatFileFlipChecked)
|
||||
}
|
||||
|
||||
func (repository *CatFileFlipRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package pit_vow
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -61,10 +60,6 @@ func (repository CatFilePitVowRepository) MarkHeaderChecked(headerID int64) erro
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.CatFilePitVowChecked)
|
||||
}
|
||||
|
||||
func (repository CatFilePitVowRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.CatFilePitVowChecked)
|
||||
}
|
||||
|
||||
func (repository *CatFilePitVowRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package deal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -62,10 +61,6 @@ func (repository DealRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.DealChecked)
|
||||
}
|
||||
|
||||
func (repository DealRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DealChecked)
|
||||
}
|
||||
|
||||
func (repository *DealRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package dent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -67,10 +66,6 @@ func (repository DentRepository) MarkHeaderChecked(headerId int64) error {
|
||||
return shared.MarkHeaderChecked(headerId, repository.db, constants.DentChecked)
|
||||
}
|
||||
|
||||
func (repository DentRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DentChecked)
|
||||
}
|
||||
|
||||
func (repository *DentRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package drip_drip
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -61,10 +60,6 @@ func (repository DripDripRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.DripDripChecked)
|
||||
}
|
||||
|
||||
func (repository DripDripRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DripDripChecked)
|
||||
}
|
||||
|
||||
func (repository *DripDripRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package ilk
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -64,10 +63,6 @@ func (repository DripFileIlkRepository) MarkHeaderChecked(headerID int64) error
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileIlkChecked)
|
||||
}
|
||||
|
||||
func (repository DripFileIlkRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DripFileIlkChecked)
|
||||
}
|
||||
|
||||
func (repository *DripFileIlkRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package repo
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -64,10 +63,6 @@ func (repository DripFileRepoRepository) MarkHeaderChecked(headerID int64) error
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileRepoChecked)
|
||||
}
|
||||
|
||||
func (repository DripFileRepoRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DripFileRepoChecked)
|
||||
}
|
||||
|
||||
func (repository *DripFileRepoRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package vow
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -63,10 +62,6 @@ func (repository DripFileVowRepository) MarkHeaderChecked(headerID int64) error
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.DripFileVowChecked)
|
||||
}
|
||||
|
||||
func (repository DripFileVowRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.DripFileVowChecked)
|
||||
}
|
||||
|
||||
func (repository *DripFileVowRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -15,13 +15,11 @@
|
||||
package factories
|
||||
|
||||
import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
)
|
||||
|
||||
type Repository interface {
|
||||
Create(headerID int64, models []interface{}) error
|
||||
MarkHeaderChecked(headerID int64) error
|
||||
MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error)
|
||||
SetDB(db *postgres.DB)
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package flap_kick
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -60,10 +59,6 @@ func (repository *FlapKickRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.FlapKickChecked)
|
||||
}
|
||||
|
||||
func (repository FlapKickRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.FlapKickChecked)
|
||||
}
|
||||
|
||||
func (repository *FlapKickRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ package flip_kick
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -60,10 +59,6 @@ func (repository FlipKickRepository) MarkHeaderChecked(headerId int64) error {
|
||||
return shared.MarkHeaderChecked(headerId, repository.db, constants.FlipKickChecked)
|
||||
}
|
||||
|
||||
func (repository FlipKickRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.FlipKickChecked)
|
||||
}
|
||||
|
||||
func (repository *FlipKickRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package flop_kick
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -62,10 +61,6 @@ func (repository FlopKickRepository) MarkHeaderChecked(headerId int64) error {
|
||||
return shared.MarkHeaderChecked(headerId, repository.db, constants.FlopKickChecked)
|
||||
}
|
||||
|
||||
func (repository FlopKickRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.FlopKickChecked)
|
||||
}
|
||||
|
||||
func (repository *FlopKickRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ package frob
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -59,10 +58,6 @@ func (repository FrobRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.FrobChecked)
|
||||
}
|
||||
|
||||
func (repository FrobRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.FrobChecked)
|
||||
}
|
||||
|
||||
func (repository *FrobRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package debt_ceiling
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -64,10 +63,6 @@ func (repository PitFileDebtCeilingRepository) MarkHeaderChecked(headerID int64)
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.PitFileDebtCeilingChecked)
|
||||
}
|
||||
|
||||
func (repository PitFileDebtCeilingRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.PitFileDebtCeilingChecked)
|
||||
}
|
||||
|
||||
func (repository *PitFileDebtCeilingRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package ilk
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -62,10 +61,6 @@ func (repository PitFileIlkRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.PitFileIlkChecked)
|
||||
}
|
||||
|
||||
func (repository PitFileIlkRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.PitFileIlkChecked)
|
||||
}
|
||||
|
||||
func (repository *PitFileIlkRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package price_feeds
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -58,10 +57,6 @@ func (repository PriceFeedRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.PriceFeedsChecked)
|
||||
}
|
||||
|
||||
func (repository PriceFeedRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.PriceFeedsChecked)
|
||||
}
|
||||
|
||||
func (repository *PriceFeedRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -32,8 +32,12 @@ var _ = Describe("Fetcher", func() {
|
||||
fetcher := shared.NewFetcher(blockChain)
|
||||
header := fakes.FakeHeader
|
||||
|
||||
addresses := []string{"0xfakeAddress", "0xanotherFakeAddress"}
|
||||
topicZeros := [][]common.Hash{{common.BytesToHash([]byte{1, 2, 3, 4, 5})}}
|
||||
addresses := []common.Address{
|
||||
common.HexToAddress("0xfakeAddress"),
|
||||
common.HexToAddress("0xanotherFakeAddress"),
|
||||
}
|
||||
|
||||
topicZeros := []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})}
|
||||
|
||||
_, err := fetcher.FetchLogs(addresses, topicZeros, header)
|
||||
|
||||
@ -45,7 +49,7 @@ var _ = Describe("Fetcher", func() {
|
||||
expectedQuery := ethereum.FilterQuery{
|
||||
BlockHash: &blockHash,
|
||||
Addresses: []common.Address{address1, address2},
|
||||
Topics: topicZeros,
|
||||
Topics: [][]common.Hash{topicZeros},
|
||||
}
|
||||
blockChain.AssertGetEthLogsWithCustomQueryCalledWith(expectedQuery)
|
||||
})
|
||||
@ -55,7 +59,7 @@ var _ = Describe("Fetcher", func() {
|
||||
blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError)
|
||||
fetcher := shared.NewFetcher(blockChain)
|
||||
|
||||
_, err := fetcher.FetchLogs([]string{}, [][]common.Hash{}, core.Header{})
|
||||
_, err := fetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{})
|
||||
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err).To(MatchError(fakes.FakeError))
|
||||
|
@ -2,8 +2,11 @@ package shared
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error {
|
||||
@ -22,7 +25,9 @@ func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, checkedHeadersCo
|
||||
return err
|
||||
}
|
||||
|
||||
func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.DB, checkedHeadersColumn string) ([]core.Header, error) {
|
||||
// Treats a header as missing if it's not in the headers table, or not checked for some log type
|
||||
// TODO Revisit definition of "checked header
|
||||
func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.DB, notCheckedSQL string) ([]core.Header, error) {
|
||||
var result []core.Header
|
||||
var query string
|
||||
var err error
|
||||
@ -30,14 +35,14 @@ func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.D
|
||||
if endingBlockNumber == -1 {
|
||||
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
|
||||
LEFT JOIN checked_headers on headers.id = header_id
|
||||
WHERE (header_id ISNULL OR ` + checkedHeadersColumn + ` IS FALSE)
|
||||
WHERE (header_id ISNULL OR ` + notCheckedSQL + `)
|
||||
AND headers.block_number >= $1
|
||||
AND headers.eth_node_fingerprint = $2`
|
||||
err = db.Select(&result, query, startingBlockNumber, db.Node.ID)
|
||||
} else {
|
||||
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
|
||||
LEFT JOIN checked_headers on headers.id = header_id
|
||||
WHERE (header_id ISNULL OR ` + checkedHeadersColumn + ` IS FALSE)
|
||||
WHERE (header_id ISNULL OR ` + notCheckedSQL + `)
|
||||
AND headers.block_number >= $1
|
||||
AND headers.block_number <= $2
|
||||
AND headers.eth_node_fingerprint = $3`
|
||||
@ -46,3 +51,53 @@ func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.D
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
func GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
|
||||
// Query returns `[]driver.Value`, nullable polymorphic interface
|
||||
var queryResult []driver.Value
|
||||
columnNamesQuery :=
|
||||
`SELECT column_name FROM information_schema.columns
|
||||
WHERE table_schema = 'public'
|
||||
AND table_name = 'checked_headers'
|
||||
AND column_name != 'id'
|
||||
AND column_name != 'header_id';`
|
||||
|
||||
err := db.Select(&queryResult, columnNamesQuery)
|
||||
if err != nil {
|
||||
return []string{}, err
|
||||
}
|
||||
|
||||
// Transform column names from `driver.Value` to strings
|
||||
var columnNames []string
|
||||
for _, result := range queryResult {
|
||||
if columnName, ok := result.(string); ok {
|
||||
columnNames = append(columnNames, columnName)
|
||||
} else {
|
||||
return []string{}, fmt.Errorf("incorrect value for checked_headers column name")
|
||||
}
|
||||
}
|
||||
|
||||
return columnNames, nil
|
||||
}
|
||||
|
||||
// Builds a SQL string that checks if any column value is FALSE, given the column names.
|
||||
// Defaults to FALSE when no columns are provided.
|
||||
// Ex: ["columnA", "columnB"] => "NOT (columnA AND columnB)"
|
||||
// [] => "FALSE"
|
||||
func CreateNotCheckedSQL(boolColumns []string) string {
|
||||
var result strings.Builder
|
||||
|
||||
if len(boolColumns) == 0 {
|
||||
return "FALSE"
|
||||
}
|
||||
|
||||
result.WriteString("NOT (")
|
||||
for _, column := range boolColumns[:len(boolColumns)-1] {
|
||||
result.WriteString(fmt.Sprintf("%v AND ", column))
|
||||
}
|
||||
|
||||
// No trailing "OR" for last column name
|
||||
result.WriteString(fmt.Sprintf("%v)", boolColumns[len(boolColumns)-1]))
|
||||
|
||||
return result.String()
|
||||
}
|
||||
|
90
pkg/transformers/shared/repository_utility_test.go
Normal file
90
pkg/transformers/shared/repository_utility_test.go
Normal file
@ -0,0 +1,90 @@
|
||||
// Copyright 2018 Vulcanize
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package shared_test
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/test_config"
|
||||
)
|
||||
|
||||
var _ = Describe("Repository utilities", func() {
|
||||
Describe("GetCheckedColumnNames", func() {
|
||||
It("gets the column names from checked_headers", func() {
|
||||
db := test_config.NewTestDB(test_config.NewTestNode())
|
||||
test_config.CleanTestDB(db)
|
||||
expectedColumnNames := getExpectedColumnNames()
|
||||
actualColumnNames, err := shared.GetCheckedColumnNames(db)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(actualColumnNames).To(Equal(expectedColumnNames))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("CreateNotCheckedSQL", func() {
|
||||
It("generates a correct SQL string for one column", func() {
|
||||
columns := []string{"columnA"}
|
||||
expected := "NOT (columnA)"
|
||||
actual := shared.CreateNotCheckedSQL(columns)
|
||||
Expect(actual).To(Equal(expected))
|
||||
})
|
||||
|
||||
It("generates a correct SQL string for several columns", func() {
|
||||
columns := []string{"columnA", "columnB"}
|
||||
expected := "NOT (columnA AND columnB)"
|
||||
actual := shared.CreateNotCheckedSQL(columns)
|
||||
Expect(actual).To(Equal(expected))
|
||||
})
|
||||
|
||||
It("defaults to FALSE when there are no columns", func() {
|
||||
expected := "FALSE"
|
||||
actual := shared.CreateNotCheckedSQL([]string{})
|
||||
Expect(actual).To(Equal(expected))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
func getExpectedColumnNames() []string {
|
||||
return []string{
|
||||
"price_feeds_checked",
|
||||
"flip_kick_checked",
|
||||
"frob_checked",
|
||||
"tend_checked",
|
||||
"bite_checked",
|
||||
"dent_checked",
|
||||
"pit_file_debt_ceiling_checked",
|
||||
"pit_file_ilk_checked",
|
||||
"vat_init_checked",
|
||||
"drip_file_ilk_checked",
|
||||
"drip_file_repo_checked",
|
||||
"drip_file_vow_checked",
|
||||
"deal_checked",
|
||||
"drip_drip_checked",
|
||||
"cat_file_chop_lump_checked",
|
||||
"cat_file_flip_checked",
|
||||
"cat_file_pit_vow_checked",
|
||||
"flop_kick_checked",
|
||||
"vat_move_checked",
|
||||
"vat_fold_checked",
|
||||
"vat_heal_checked",
|
||||
"vat_toll_checked",
|
||||
"vat_tune_checked",
|
||||
"vat_grab_checked",
|
||||
"vat_flux_checked",
|
||||
"vat_slip_checked",
|
||||
"vow_flog_checked",
|
||||
"flap_kick_checked",
|
||||
}
|
||||
}
|
@ -16,7 +16,6 @@ package tend
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -68,10 +67,6 @@ func (repository TendRepository) MarkHeaderChecked(headerId int64) error {
|
||||
return shared.MarkHeaderChecked(headerId, repository.db, constants.TendChecked)
|
||||
}
|
||||
|
||||
func (repository TendRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.TendChecked)
|
||||
}
|
||||
|
||||
func (repository *TendRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package vat_flux
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -61,10 +60,6 @@ func (repository VatFluxRepository) MarkHeaderChecked(headerId int64) error {
|
||||
return shared.MarkHeaderChecked(headerId, repository.db, constants.VatFluxChecked)
|
||||
}
|
||||
|
||||
func (repository VatFluxRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlock, endingBlock, repository.db, constants.VatFluxChecked)
|
||||
}
|
||||
|
||||
func (repository *VatFluxRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ package vat_fold
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -63,10 +62,6 @@ func (repository VatFoldRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatFoldChecked)
|
||||
}
|
||||
|
||||
func (repository VatFoldRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatFoldChecked)
|
||||
}
|
||||
|
||||
func (repository *VatFoldRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package vat_grab
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -47,10 +46,6 @@ func (repository VatGrabRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatGrabChecked)
|
||||
}
|
||||
|
||||
func (repository VatGrabRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatGrabChecked)
|
||||
}
|
||||
|
||||
func (repository *VatGrabRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ package vat_heal
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -61,10 +60,6 @@ func (repository VatHealRepository) Create(headerID int64, models []interface{})
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (repository VatHealRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlock, endingBlock, repository.db, constants.VatHealChecked)
|
||||
}
|
||||
|
||||
func (repository VatHealRepository) MarkHeaderChecked(headerId int64) error {
|
||||
return shared.MarkHeaderChecked(headerId, repository.db, constants.VatHealChecked)
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package vat_init
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -63,10 +62,6 @@ func (repository VatInitRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatInitChecked)
|
||||
}
|
||||
|
||||
func (repository VatInitRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatInitChecked)
|
||||
}
|
||||
|
||||
func (repository *VatInitRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package vat_move
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -60,10 +59,6 @@ func (repository VatMoveRepository) Create(headerID int64, models []interface{})
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (repository VatMoveRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatMoveChecked)
|
||||
}
|
||||
|
||||
func (repository VatMoveRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatMoveChecked)
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package vat_slip
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -48,10 +47,6 @@ func (repository VatSlipRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatSlipChecked)
|
||||
}
|
||||
|
||||
func (repository VatSlipRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatSlipChecked)
|
||||
}
|
||||
|
||||
func (repository *VatSlipRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package vat_toll
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -47,10 +46,6 @@ func (repository VatTollRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatTollChecked)
|
||||
}
|
||||
|
||||
func (repository VatTollRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatTollChecked)
|
||||
}
|
||||
|
||||
func (repository *VatTollRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package vat_tune
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -47,10 +46,6 @@ func (repository VatTuneRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.VatTuneChecked)
|
||||
}
|
||||
|
||||
func (repository VatTuneRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatTuneChecked)
|
||||
}
|
||||
|
||||
func (repository *VatTuneRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package vow_flog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||
@ -64,10 +63,6 @@ func (repository VowFlogRepository) MarkHeaderChecked(headerID int64) error {
|
||||
return shared.MarkHeaderChecked(headerID, repository.db, constants.VowFlogChecked)
|
||||
}
|
||||
|
||||
func (repository VowFlogRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VowFlogChecked)
|
||||
}
|
||||
|
||||
func (repository *VowFlogRepository) SetDB(db *postgres.DB) {
|
||||
repository.db = db
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user