Make check_count a column on public.headers

- Don't need to maintain it on public.checked_headers if we're not
  adding additional columns to that table
This commit is contained in:
Rob Mulholand 2019-08-27 16:30:50 -05:00
parent 222252f89a
commit 5ac76eee74
9 changed files with 68 additions and 70 deletions

View File

@ -1,12 +1,14 @@
-- +goose Up
CREATE TABLE public.headers (
id SERIAL PRIMARY KEY,
hash VARCHAR(66),
block_number BIGINT,
raw JSONB,
block_timestamp NUMERIC,
eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE,
eth_node_fingerprint VARCHAR(128)
CREATE TABLE public.headers
(
id SERIAL PRIMARY KEY,
hash VARCHAR(66),
block_number BIGINT,
raw JSONB,
block_timestamp NUMERIC,
check_count INTEGER NOT NULL DEFAULT 0,
eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE,
eth_node_fingerprint VARCHAR(128)
);
-- Index is removed when table is

View File

@ -1,9 +1,7 @@
-- +goose Up
CREATE TABLE public.checked_headers
(
id SERIAL PRIMARY KEY,
header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
check_count INTEGER NOT NULL DEFAULT 1
CREATE TABLE public.checked_headers (
id SERIAL PRIMARY KEY,
header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE
);
-- +goose Down

View File

@ -9,4 +9,4 @@ CREATE TABLE public.checked_logs
-- +goose Down
-- SQL in this section is executed when the migration is rolled back.
DROP TABLE public.checked_logs;
DROP TABLE public.checked_logs;

View File

@ -131,8 +131,7 @@ ALTER SEQUENCE public.blocks_id_seq OWNED BY public.blocks.id;
CREATE TABLE public.checked_headers (
id integer NOT NULL,
header_id integer NOT NULL,
check_count integer DEFAULT 1 NOT NULL
header_id integer NOT NULL
);
@ -456,6 +455,7 @@ CREATE TABLE public.headers (
block_number bigint,
raw jsonb,
block_timestamp numeric,
check_count integer DEFAULT 0 NOT NULL,
eth_node_id integer NOT NULL,
eth_node_fingerprint character varying(128)
);

View File

@ -77,17 +77,17 @@ func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerEx
return ErrNoWatchedAddresses, noMissingHeadersFound
}
missingHeaders, missingHeadersErr := extractor.CheckedHeadersRepository.MissingHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders))
if missingHeadersErr != nil {
logrus.Errorf("error fetching missing headers: %s", missingHeadersErr)
return missingHeadersErr, noMissingHeadersFound
uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders))
if uncheckedHeadersErr != nil {
logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr)
return uncheckedHeadersErr, noMissingHeadersFound
}
if len(missingHeaders) < 1 {
if len(uncheckedHeaders) < 1 {
return nil, noMissingHeadersFound
}
for _, header := range missingHeaders {
for _, header := range uncheckedHeaders {
logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header)
if fetchLogsErr != nil {
logError("error fetching logs for header: %s", fetchLogsErr, header)
@ -143,13 +143,13 @@ func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTran
return hasBeenCheckedErr
}
if !hasBeenChecked {
err := extractor.CheckedHeadersRepository.MarkHeadersUnchecked(config.StartingBlockNumber)
if err != nil {
return err
uncheckHeadersErr := extractor.CheckedHeadersRepository.MarkHeadersUnchecked(config.StartingBlockNumber)
if uncheckHeadersErr != nil {
return uncheckHeadersErr
}
nextErr := extractor.CheckedLogsRepository.MarkLogsChecked(config.ContractAddresses, config.Topic)
if nextErr != nil {
return nextErr
markLogsCheckedErr := extractor.CheckedLogsRepository.MarkLogsChecked(config.ContractAddresses, config.Topic)
if markLogsCheckedErr != nil {
return markLogsCheckedErr
}
}
return nil

View File

@ -22,9 +22,7 @@ import (
)
const (
insertCheckedHeaderQuery = `INSERT INTO public.checked_headers (header_id) VALUES ($1)
ON CONFLICT (header_id) DO UPDATE
SET check_count = (SELECT check_count FROM public.checked_headers WHERE header_id = $1) + 1`
insertCheckedHeaderQuery = `UPDATE public.headers SET check_count = (SELECT check_count WHERE id = $1) + 1 WHERE id = $1`
)
type CheckedHeadersRepository struct {
@ -43,30 +41,30 @@ func (repo CheckedHeadersRepository) MarkHeaderChecked(headerID int64) error {
// Remove checked_headers rows with block number >= starting block number
func (repo CheckedHeadersRepository) MarkHeadersUnchecked(startingBlockNumber int64) error {
_, err := repo.db.Exec(`DELETE FROM public.checked_headers WHERE header_id IN (SELECT id FROM public.headers WHERE block_number >= $1)`, startingBlockNumber)
_, err := repo.db.Exec(`UPDATE public.headers SET check_count = 0 WHERE block_number >= $1`, startingBlockNumber)
return err
}
// Return header_id if not present in checked_headers or its check_count is < passed checkCount
func (repo CheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) {
func (repo CheckedHeadersRepository) UncheckedHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) {
var result []core.Header
var query string
var err error
if endingBlockNumber == -1 {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR check_count < $2)
AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $3`
query = `SELECT id, block_number, hash
FROM headers
WHERE check_count < $2
AND block_number >= $1
AND eth_node_fingerprint = $3`
err = repo.db.Select(&result, query, startingBlockNumber, checkCount, repo.db.Node.ID)
} else {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR check_count < $3)
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $4`
query = `SELECT id, block_number, hash
FROM headers
WHERE check_count < $3
AND block_number >= $1
AND block_number <= $2
AND eth_node_fingerprint = $4`
err = repo.db.Select(&result, query, startingBlockNumber, endingBlockNumber, checkCount, repo.db.Node.ID)
}

View File

@ -50,7 +50,7 @@ var _ = Describe("Checked Headers repository", func() {
Expect(err).NotTo(HaveOccurred())
var checkedCount int
fetchErr := db.Get(&checkedCount, `SELECT check_count FROM public.checked_headers WHERE header_id = $1`, headerID)
fetchErr := db.Get(&checkedCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerID)
Expect(fetchErr).NotTo(HaveOccurred())
Expect(checkedCount).To(Equal(1))
})
@ -67,7 +67,7 @@ var _ = Describe("Checked Headers repository", func() {
Expect(updateErr).NotTo(HaveOccurred())
var checkedCount int
fetchErr := db.Get(&checkedCount, `SELECT check_count FROM public.checked_headers WHERE header_id = $1`, headerID)
fetchErr := db.Get(&checkedCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerID)
Expect(fetchErr).NotTo(HaveOccurred())
Expect(checkedCount).To(Equal(2))
})
@ -101,20 +101,20 @@ var _ = Describe("Checked Headers repository", func() {
err := repo.MarkHeadersUnchecked(blockNumberTwo)
Expect(err).NotTo(HaveOccurred())
var headerOneChecked, headerTwoChecked, headerThreeChecked bool
getHeaderOneErr := db.Get(&headerOneChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdOne)
var headerOneCheckCount, headerTwoCheckCount, headerThreeCheckCount int
getHeaderOneErr := db.Get(&headerOneCheckCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerIdOne)
Expect(getHeaderOneErr).NotTo(HaveOccurred())
Expect(headerOneChecked).To(BeTrue())
getHeaderTwoErr := db.Get(&headerTwoChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdTwo)
Expect(headerOneCheckCount).To(Equal(1))
getHeaderTwoErr := db.Get(&headerTwoCheckCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerIdTwo)
Expect(getHeaderTwoErr).NotTo(HaveOccurred())
Expect(headerTwoChecked).To(BeFalse())
getHeaderThreeErr := db.Get(&headerThreeChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdThree)
Expect(headerTwoCheckCount).To(BeZero())
getHeaderThreeErr := db.Get(&headerThreeCheckCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerIdThree)
Expect(getHeaderThreeErr).NotTo(HaveOccurred())
Expect(headerThreeChecked).To(BeFalse())
Expect(headerThreeCheckCount).To(BeZero())
})
})
Describe("MissingHeaders", func() {
Describe("UncheckedHeaders", func() {
var (
headerRepository datastore.HeaderRepository
startingBlockNumber int64
@ -151,7 +151,7 @@ var _ = Describe("Checked Headers repository", func() {
Describe("when ending block is specified", func() {
It("excludes headers that are out of range", func() {
headers, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount)
headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
// doesn't include outOfRangeBlockNumber
@ -162,10 +162,10 @@ var _ = Describe("Checked Headers repository", func() {
})
It("excludes headers that have been checked more than the check count", func() {
_, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1])
_, err = db.Exec(`UPDATE public.headers SET check_count = 1 WHERE id = $1`, headerIDs[1])
Expect(err).NotTo(HaveOccurred())
headers, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount)
headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
// doesn't include middleBlockNumber
@ -175,10 +175,10 @@ var _ = Describe("Checked Headers repository", func() {
})
It("does not exclude headers that have been checked less than the check count", func() {
_, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1])
_, err = db.Exec(`UPDATE public.headers SET check_count = 1 WHERE id = $1`, headerIDs[1])
Expect(err).NotTo(HaveOccurred())
headers, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber, recheckCheckCount)
headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, recheckCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(headers)).To(Equal(3))
@ -197,14 +197,14 @@ var _ = Describe("Checked Headers repository", func() {
}
Expect(err).NotTo(HaveOccurred())
nodeOneMissingHeaders, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount)
nodeOneMissingHeaders, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(nodeOneMissingHeaders)).To(Equal(3))
Expect(nodeOneMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber)))
Expect(nodeOneMissingHeaders[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber)))
Expect(nodeOneMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber)))
nodeTwoMissingHeaders, err := repoTwo.MissingHeaders(startingBlockNumber, endingBlockNumber+10, uncheckedCheckCount)
nodeTwoMissingHeaders, err := repoTwo.UncheckedHeaders(startingBlockNumber, endingBlockNumber+10, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(nodeTwoMissingHeaders)).To(Equal(3))
Expect(nodeTwoMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10)))
@ -217,7 +217,7 @@ var _ = Describe("Checked Headers repository", func() {
var endingBlock = int64(-1)
It("includes all non-checked headers when ending block is -1 ", func() {
headers, err := repo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(headers)).To(Equal(4))
@ -228,10 +228,10 @@ var _ = Describe("Checked Headers repository", func() {
})
It("excludes headers that have been checked more than the check count", func() {
_, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1])
_, err = db.Exec(`UPDATE public.headers SET check_count = 1 WHERE id = $1`, headerIDs[1])
Expect(err).NotTo(HaveOccurred())
headers, err := repo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
// doesn't include middleBlockNumber
@ -242,10 +242,10 @@ var _ = Describe("Checked Headers repository", func() {
})
It("does not exclude headers that have been checked less than the check count", func() {
_, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1])
_, err = db.Exec(`UPDATE public.headers SET check_count = 1 WHERE id = $1`, headerIDs[1])
Expect(err).NotTo(HaveOccurred())
headers, err := repo.MissingHeaders(startingBlockNumber, endingBlock, recheckCheckCount)
headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlock, recheckCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(headers)).To(Equal(4))
@ -265,7 +265,7 @@ var _ = Describe("Checked Headers repository", func() {
}
Expect(err).NotTo(HaveOccurred())
nodeOneMissingHeaders, err := repo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
nodeOneMissingHeaders, err := repo.UncheckedHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(nodeOneMissingHeaders)).To(Equal(4))
Expect(nodeOneMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber), Equal(outOfRangeBlockNumber)))
@ -273,7 +273,7 @@ var _ = Describe("Checked Headers repository", func() {
Expect(nodeOneMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber), Equal(outOfRangeBlockNumber)))
Expect(nodeOneMissingHeaders[3].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber), Equal(outOfRangeBlockNumber)))
nodeTwoMissingHeaders, err := repoTwo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
nodeTwoMissingHeaders, err := repoTwo.UncheckedHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(nodeTwoMissingHeaders)).To(Equal(4))
Expect(nodeTwoMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10), Equal(outOfRangeBlockNumber+10)))

View File

@ -37,7 +37,7 @@ type BlockRepository interface {
type CheckedHeadersRepository interface {
MarkHeaderChecked(headerID int64) error
MarkHeadersUnchecked(startingBlockNumber int64) error
MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error)
UncheckedHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error)
}
type CheckedLogsRepository interface {

View File

@ -44,7 +44,7 @@ func (repository *MockCheckedHeadersRepository) MarkHeaderChecked(headerID int64
return repository.MarkHeaderCheckedReturnError
}
func (repository *MockCheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) {
func (repository *MockCheckedHeadersRepository) UncheckedHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) {
repository.MissingHeadersStartingBlockNumber = startingBlockNumber
repository.MissingHeadersEndingBlockNumber = endingBlockNumber
repository.MissingHeadersCheckCount = checkCount