From 5ac76eee74d0483ef06b8d5c079ebca44ee25f87 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Tue, 27 Aug 2019 16:30:50 -0500 Subject: [PATCH] Make check_count a column on public.headers - Don't need to maintain it on public.checked_headers if we're not adding additional columns to that table --- db/migrations/00023_create_headers_table.sql | 18 +++---- .../00024_create_checked_headers_table.sql | 8 ++-- ...ql => 00030_create_checked_logs_table.sql} | 2 +- db/schema.sql | 4 +- libraries/shared/logs/extractor.go | 24 +++++----- .../checked_headers_repository.go | 30 ++++++------ .../checked_headers_repository_test.go | 48 +++++++++---------- pkg/datastore/repository.go | 2 +- pkg/fakes/mock_checked_headers_repository.go | 2 +- 9 files changed, 68 insertions(+), 70 deletions(-) rename db/migrations/{00029_create_checked_logs_table.sql => 00030_create_checked_logs_table.sql} (90%) diff --git a/db/migrations/00023_create_headers_table.sql b/db/migrations/00023_create_headers_table.sql index 925c202b..da539de8 100644 --- a/db/migrations/00023_create_headers_table.sql +++ b/db/migrations/00023_create_headers_table.sql @@ -1,12 +1,14 @@ -- +goose Up -CREATE TABLE public.headers ( - id SERIAL PRIMARY KEY, - hash VARCHAR(66), - block_number BIGINT, - raw JSONB, - block_timestamp NUMERIC, - eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE, - eth_node_fingerprint VARCHAR(128) +CREATE TABLE public.headers +( + id SERIAL PRIMARY KEY, + hash VARCHAR(66), + block_number BIGINT, + raw JSONB, + block_timestamp NUMERIC, + check_count INTEGER NOT NULL DEFAULT 0, + eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE, + eth_node_fingerprint VARCHAR(128) ); -- Index is removed when table is diff --git a/db/migrations/00024_create_checked_headers_table.sql b/db/migrations/00024_create_checked_headers_table.sql index 95cedf22..acf0fbdb 100644 --- a/db/migrations/00024_create_checked_headers_table.sql +++ b/db/migrations/00024_create_checked_headers_table.sql @@ -1,9 +1,7 @@ -- +goose Up -CREATE TABLE public.checked_headers -( - id SERIAL PRIMARY KEY, - header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE, - check_count INTEGER NOT NULL DEFAULT 1 +CREATE TABLE public.checked_headers ( + id SERIAL PRIMARY KEY, + header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE ); -- +goose Down diff --git a/db/migrations/00029_create_checked_logs_table.sql b/db/migrations/00030_create_checked_logs_table.sql similarity index 90% rename from db/migrations/00029_create_checked_logs_table.sql rename to db/migrations/00030_create_checked_logs_table.sql index 91445cd9..1a77560a 100644 --- a/db/migrations/00029_create_checked_logs_table.sql +++ b/db/migrations/00030_create_checked_logs_table.sql @@ -9,4 +9,4 @@ CREATE TABLE public.checked_logs -- +goose Down -- SQL in this section is executed when the migration is rolled back. -DROP TABLE public.checked_logs; \ No newline at end of file +DROP TABLE public.checked_logs; diff --git a/db/schema.sql b/db/schema.sql index 16791a83..f78a8d1c 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -131,8 +131,7 @@ ALTER SEQUENCE public.blocks_id_seq OWNED BY public.blocks.id; CREATE TABLE public.checked_headers ( id integer NOT NULL, - header_id integer NOT NULL, - check_count integer DEFAULT 1 NOT NULL + header_id integer NOT NULL ); @@ -456,6 +455,7 @@ CREATE TABLE public.headers ( block_number bigint, raw jsonb, block_timestamp numeric, + check_count integer DEFAULT 0 NOT NULL, eth_node_id integer NOT NULL, eth_node_fingerprint character varying(128) ); diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go index aed39d02..3e4df88a 100644 --- a/libraries/shared/logs/extractor.go +++ b/libraries/shared/logs/extractor.go @@ -77,17 +77,17 @@ func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerEx return ErrNoWatchedAddresses, noMissingHeadersFound } - missingHeaders, missingHeadersErr := extractor.CheckedHeadersRepository.MissingHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders)) - if missingHeadersErr != nil { - logrus.Errorf("error fetching missing headers: %s", missingHeadersErr) - return missingHeadersErr, noMissingHeadersFound + uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders)) + if uncheckedHeadersErr != nil { + logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr) + return uncheckedHeadersErr, noMissingHeadersFound } - if len(missingHeaders) < 1 { + if len(uncheckedHeaders) < 1 { return nil, noMissingHeadersFound } - for _, header := range missingHeaders { + for _, header := range uncheckedHeaders { logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) if fetchLogsErr != nil { logError("error fetching logs for header: %s", fetchLogsErr, header) @@ -143,13 +143,13 @@ func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTran return hasBeenCheckedErr } if !hasBeenChecked { - err := extractor.CheckedHeadersRepository.MarkHeadersUnchecked(config.StartingBlockNumber) - if err != nil { - return err + uncheckHeadersErr := extractor.CheckedHeadersRepository.MarkHeadersUnchecked(config.StartingBlockNumber) + if uncheckHeadersErr != nil { + return uncheckHeadersErr } - nextErr := extractor.CheckedLogsRepository.MarkLogsChecked(config.ContractAddresses, config.Topic) - if nextErr != nil { - return nextErr + markLogsCheckedErr := extractor.CheckedLogsRepository.MarkLogsChecked(config.ContractAddresses, config.Topic) + if markLogsCheckedErr != nil { + return markLogsCheckedErr } } return nil diff --git a/pkg/datastore/postgres/repositories/checked_headers_repository.go b/pkg/datastore/postgres/repositories/checked_headers_repository.go index 1e33e425..8d870461 100644 --- a/pkg/datastore/postgres/repositories/checked_headers_repository.go +++ b/pkg/datastore/postgres/repositories/checked_headers_repository.go @@ -22,9 +22,7 @@ import ( ) const ( - insertCheckedHeaderQuery = `INSERT INTO public.checked_headers (header_id) VALUES ($1) - ON CONFLICT (header_id) DO UPDATE - SET check_count = (SELECT check_count FROM public.checked_headers WHERE header_id = $1) + 1` + insertCheckedHeaderQuery = `UPDATE public.headers SET check_count = (SELECT check_count WHERE id = $1) + 1 WHERE id = $1` ) type CheckedHeadersRepository struct { @@ -43,30 +41,30 @@ func (repo CheckedHeadersRepository) MarkHeaderChecked(headerID int64) error { // Remove checked_headers rows with block number >= starting block number func (repo CheckedHeadersRepository) MarkHeadersUnchecked(startingBlockNumber int64) error { - _, err := repo.db.Exec(`DELETE FROM public.checked_headers WHERE header_id IN (SELECT id FROM public.headers WHERE block_number >= $1)`, startingBlockNumber) + _, err := repo.db.Exec(`UPDATE public.headers SET check_count = 0 WHERE block_number >= $1`, startingBlockNumber) return err } // Return header_id if not present in checked_headers or its check_count is < passed checkCount -func (repo CheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) { +func (repo CheckedHeadersRepository) UncheckedHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) { var result []core.Header var query string var err error if endingBlockNumber == -1 { - query = `SELECT headers.id, headers.block_number, headers.hash FROM headers - LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR check_count < $2) - AND headers.block_number >= $1 - AND headers.eth_node_fingerprint = $3` + query = `SELECT id, block_number, hash + FROM headers + WHERE check_count < $2 + AND block_number >= $1 + AND eth_node_fingerprint = $3` err = repo.db.Select(&result, query, startingBlockNumber, checkCount, repo.db.Node.ID) } else { - query = `SELECT headers.id, headers.block_number, headers.hash FROM headers - LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR check_count < $3) - AND headers.block_number >= $1 - AND headers.block_number <= $2 - AND headers.eth_node_fingerprint = $4` + query = `SELECT id, block_number, hash + FROM headers + WHERE check_count < $3 + AND block_number >= $1 + AND block_number <= $2 + AND eth_node_fingerprint = $4` err = repo.db.Select(&result, query, startingBlockNumber, endingBlockNumber, checkCount, repo.db.Node.ID) } diff --git a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go index 5ee5780b..b2c9948a 100644 --- a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go +++ b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go @@ -50,7 +50,7 @@ var _ = Describe("Checked Headers repository", func() { Expect(err).NotTo(HaveOccurred()) var checkedCount int - fetchErr := db.Get(&checkedCount, `SELECT check_count FROM public.checked_headers WHERE header_id = $1`, headerID) + fetchErr := db.Get(&checkedCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerID) Expect(fetchErr).NotTo(HaveOccurred()) Expect(checkedCount).To(Equal(1)) }) @@ -67,7 +67,7 @@ var _ = Describe("Checked Headers repository", func() { Expect(updateErr).NotTo(HaveOccurred()) var checkedCount int - fetchErr := db.Get(&checkedCount, `SELECT check_count FROM public.checked_headers WHERE header_id = $1`, headerID) + fetchErr := db.Get(&checkedCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerID) Expect(fetchErr).NotTo(HaveOccurred()) Expect(checkedCount).To(Equal(2)) }) @@ -101,20 +101,20 @@ var _ = Describe("Checked Headers repository", func() { err := repo.MarkHeadersUnchecked(blockNumberTwo) Expect(err).NotTo(HaveOccurred()) - var headerOneChecked, headerTwoChecked, headerThreeChecked bool - getHeaderOneErr := db.Get(&headerOneChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdOne) + var headerOneCheckCount, headerTwoCheckCount, headerThreeCheckCount int + getHeaderOneErr := db.Get(&headerOneCheckCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerIdOne) Expect(getHeaderOneErr).NotTo(HaveOccurred()) - Expect(headerOneChecked).To(BeTrue()) - getHeaderTwoErr := db.Get(&headerTwoChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdTwo) + Expect(headerOneCheckCount).To(Equal(1)) + getHeaderTwoErr := db.Get(&headerTwoCheckCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerIdTwo) Expect(getHeaderTwoErr).NotTo(HaveOccurred()) - Expect(headerTwoChecked).To(BeFalse()) - getHeaderThreeErr := db.Get(&headerThreeChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdThree) + Expect(headerTwoCheckCount).To(BeZero()) + getHeaderThreeErr := db.Get(&headerThreeCheckCount, `SELECT check_count FROM public.headers WHERE id = $1`, headerIdThree) Expect(getHeaderThreeErr).NotTo(HaveOccurred()) - Expect(headerThreeChecked).To(BeFalse()) + Expect(headerThreeCheckCount).To(BeZero()) }) }) - Describe("MissingHeaders", func() { + Describe("UncheckedHeaders", func() { var ( headerRepository datastore.HeaderRepository startingBlockNumber int64 @@ -151,7 +151,7 @@ var _ = Describe("Checked Headers repository", func() { Describe("when ending block is specified", func() { It("excludes headers that are out of range", func() { - headers, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount) + headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount) Expect(err).NotTo(HaveOccurred()) // doesn't include outOfRangeBlockNumber @@ -162,10 +162,10 @@ var _ = Describe("Checked Headers repository", func() { }) It("excludes headers that have been checked more than the check count", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) + _, err = db.Exec(`UPDATE public.headers SET check_count = 1 WHERE id = $1`, headerIDs[1]) Expect(err).NotTo(HaveOccurred()) - headers, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount) + headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount) Expect(err).NotTo(HaveOccurred()) // doesn't include middleBlockNumber @@ -175,10 +175,10 @@ var _ = Describe("Checked Headers repository", func() { }) It("does not exclude headers that have been checked less than the check count", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) + _, err = db.Exec(`UPDATE public.headers SET check_count = 1 WHERE id = $1`, headerIDs[1]) Expect(err).NotTo(HaveOccurred()) - headers, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber, recheckCheckCount) + headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, recheckCheckCount) Expect(err).NotTo(HaveOccurred()) Expect(len(headers)).To(Equal(3)) @@ -197,14 +197,14 @@ var _ = Describe("Checked Headers repository", func() { } Expect(err).NotTo(HaveOccurred()) - nodeOneMissingHeaders, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount) + nodeOneMissingHeaders, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount) Expect(err).NotTo(HaveOccurred()) Expect(len(nodeOneMissingHeaders)).To(Equal(3)) Expect(nodeOneMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber))) Expect(nodeOneMissingHeaders[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber))) Expect(nodeOneMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber))) - nodeTwoMissingHeaders, err := repoTwo.MissingHeaders(startingBlockNumber, endingBlockNumber+10, uncheckedCheckCount) + nodeTwoMissingHeaders, err := repoTwo.UncheckedHeaders(startingBlockNumber, endingBlockNumber+10, uncheckedCheckCount) Expect(err).NotTo(HaveOccurred()) Expect(len(nodeTwoMissingHeaders)).To(Equal(3)) Expect(nodeTwoMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10))) @@ -217,7 +217,7 @@ var _ = Describe("Checked Headers repository", func() { var endingBlock = int64(-1) It("includes all non-checked headers when ending block is -1 ", func() { - headers, err := repo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) + headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) Expect(err).NotTo(HaveOccurred()) Expect(len(headers)).To(Equal(4)) @@ -228,10 +228,10 @@ var _ = Describe("Checked Headers repository", func() { }) It("excludes headers that have been checked more than the check count", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) + _, err = db.Exec(`UPDATE public.headers SET check_count = 1 WHERE id = $1`, headerIDs[1]) Expect(err).NotTo(HaveOccurred()) - headers, err := repo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) + headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) Expect(err).NotTo(HaveOccurred()) // doesn't include middleBlockNumber @@ -242,10 +242,10 @@ var _ = Describe("Checked Headers repository", func() { }) It("does not exclude headers that have been checked less than the check count", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) + _, err = db.Exec(`UPDATE public.headers SET check_count = 1 WHERE id = $1`, headerIDs[1]) Expect(err).NotTo(HaveOccurred()) - headers, err := repo.MissingHeaders(startingBlockNumber, endingBlock, recheckCheckCount) + headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlock, recheckCheckCount) Expect(err).NotTo(HaveOccurred()) Expect(len(headers)).To(Equal(4)) @@ -265,7 +265,7 @@ var _ = Describe("Checked Headers repository", func() { } Expect(err).NotTo(HaveOccurred()) - nodeOneMissingHeaders, err := repo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) + nodeOneMissingHeaders, err := repo.UncheckedHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) Expect(err).NotTo(HaveOccurred()) Expect(len(nodeOneMissingHeaders)).To(Equal(4)) Expect(nodeOneMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber), Equal(outOfRangeBlockNumber))) @@ -273,7 +273,7 @@ var _ = Describe("Checked Headers repository", func() { Expect(nodeOneMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber), Equal(outOfRangeBlockNumber))) Expect(nodeOneMissingHeaders[3].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(middleBlockNumber), Equal(endingBlockNumber), Equal(outOfRangeBlockNumber))) - nodeTwoMissingHeaders, err := repoTwo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) + nodeTwoMissingHeaders, err := repoTwo.UncheckedHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) Expect(err).NotTo(HaveOccurred()) Expect(len(nodeTwoMissingHeaders)).To(Equal(4)) Expect(nodeTwoMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10), Equal(outOfRangeBlockNumber+10))) diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index f762bb12..ff0f02dd 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -37,7 +37,7 @@ type BlockRepository interface { type CheckedHeadersRepository interface { MarkHeaderChecked(headerID int64) error MarkHeadersUnchecked(startingBlockNumber int64) error - MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) + UncheckedHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) } type CheckedLogsRepository interface { diff --git a/pkg/fakes/mock_checked_headers_repository.go b/pkg/fakes/mock_checked_headers_repository.go index 687bf160..9781829c 100644 --- a/pkg/fakes/mock_checked_headers_repository.go +++ b/pkg/fakes/mock_checked_headers_repository.go @@ -44,7 +44,7 @@ func (repository *MockCheckedHeadersRepository) MarkHeaderChecked(headerID int64 return repository.MarkHeaderCheckedReturnError } -func (repository *MockCheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) { +func (repository *MockCheckedHeadersRepository) UncheckedHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) { repository.MissingHeadersStartingBlockNumber = startingBlockNumber repository.MissingHeadersEndingBlockNumber = endingBlockNumber repository.MissingHeadersCheckCount = checkCount