From 06881db3502a49fcf5e6ec2f799ce9910e5c997a Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Thu, 13 Sep 2018 10:05:16 -0500 Subject: [PATCH] Sync unchecked headers for price feeds - Indicate that a header has been checked for logs if no logs returned - Indicate that a header has been checked for logs when persisting a log - Fetch headers that need to be checked based on absence of the above --- ...5159_create_checked_headers_table.down.sql | 1 + ...785159_create_checked_headers_table.up.sql | 5 + db/schema.sql | 62 +++++++++++++ pkg/transformers/price_feeds/repository.go | 29 +++++- .../price_feeds/repository_test.go | 93 +++++++++++++++++-- pkg/transformers/price_feeds/transformer.go | 6 ++ .../price_feeds/transformer_test.go | 36 +++++++ .../test_data/mocks/price_feeds/repository.go | 33 +++++-- 8 files changed, 246 insertions(+), 19 deletions(-) create mode 100644 db/migrations/1536785159_create_checked_headers_table.down.sql create mode 100644 db/migrations/1536785159_create_checked_headers_table.up.sql diff --git a/db/migrations/1536785159_create_checked_headers_table.down.sql b/db/migrations/1536785159_create_checked_headers_table.down.sql new file mode 100644 index 00000000..d9bf78df --- /dev/null +++ b/db/migrations/1536785159_create_checked_headers_table.down.sql @@ -0,0 +1 @@ +DROP TABLE public.checked_headers; \ No newline at end of file diff --git a/db/migrations/1536785159_create_checked_headers_table.up.sql b/db/migrations/1536785159_create_checked_headers_table.up.sql new file mode 100644 index 00000000..6a39e0dd --- /dev/null +++ b/db/migrations/1536785159_create_checked_headers_table.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE public.checked_headers ( + id SERIAL PRIMARY KEY, + header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE, + price_feeds_checked BOOLEAN NOT NULL DEFAULT FALSE +); \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index 0c0ee729..2bb736f4 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -662,6 +662,37 @@ CREATE SEQUENCE public.blocks_id_seq ALTER SEQUENCE public.blocks_id_seq OWNED BY public.blocks.id; +-- +-- Name: checked_headers; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.checked_headers ( + id integer NOT NULL, + header_id integer NOT NULL, + price_feeds_checked boolean DEFAULT false NOT NULL +); + + +-- +-- Name: checked_headers_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.checked_headers_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: checked_headers_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.checked_headers_id_seq OWNED BY public.checked_headers.id; + + -- -- Name: eth_nodes; Type: TABLE; Schema: public; Owner: - -- @@ -1071,6 +1102,13 @@ ALTER TABLE ONLY maker.vat_init ALTER COLUMN id SET DEFAULT nextval('maker.vat_i ALTER TABLE ONLY public.blocks ALTER COLUMN id SET DEFAULT nextval('public.blocks_id_seq'::regclass); +-- +-- Name: checked_headers id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.checked_headers ALTER COLUMN id SET DEFAULT nextval('public.checked_headers_id_seq'::regclass); + + -- -- Name: eth_nodes id; Type: DEFAULT; Schema: public; Owner: - -- @@ -1375,6 +1413,22 @@ ALTER TABLE ONLY public.blocks ADD CONSTRAINT blocks_pkey PRIMARY KEY (id); +-- +-- Name: checked_headers checked_headers_header_id_key; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.checked_headers + ADD CONSTRAINT checked_headers_header_id_key UNIQUE (header_id); + + +-- +-- Name: checked_headers checked_headers_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.checked_headers + ADD CONSTRAINT checked_headers_pkey PRIMARY KEY (id); + + -- -- Name: watched_contracts contract_hash_uc; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -1649,6 +1703,14 @@ ALTER TABLE ONLY public.token_supply ADD CONSTRAINT blocks_fk FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE; +-- +-- Name: checked_headers checked_headers_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.checked_headers + ADD CONSTRAINT checked_headers_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; + + -- -- Name: headers eth_nodes_fk; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/pkg/transformers/price_feeds/repository.go b/pkg/transformers/price_feeds/repository.go index 9293927a..59071a25 100644 --- a/pkg/transformers/price_feeds/repository.go +++ b/pkg/transformers/price_feeds/repository.go @@ -21,6 +21,7 @@ import ( type IPriceFeedRepository interface { Create(headerID int64, model PriceFeedModel) error + MarkHeaderChecked(headerID int64) error MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) } @@ -33,8 +34,30 @@ func NewPriceFeedRepository(db *postgres.DB) PriceFeedRepository { } func (repository PriceFeedRepository) Create(headerID int64, model PriceFeedModel) error { - _, err := repository.db.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx, raw_log) + tx, err := repository.db.Begin() + _, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx, raw_log) VALUES ($1, $2, $3, $4::NUMERIC, $5, $6)`, model.BlockNumber, headerID, model.MedianizerAddress, model.UsdValue, model.TransactionIndex, model.Raw) + if err != nil { + tx.Rollback() + return err + } + _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, price_feeds_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET price_feeds_checked = $2`, headerID, true) + if err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil +} + +func (repository PriceFeedRepository) MarkHeaderChecked(headerID int64) error { + _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, price_feeds_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET price_feeds_checked = $2`, headerID, true) return err } @@ -43,8 +66,8 @@ func (repository PriceFeedRepository) MissingHeaders(startingBlockNumber, ending err := repository.db.Select( &result, `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN maker.price_feeds on headers.id = header_id - WHERE header_id ISNULL + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL OR price_feeds_checked IS FALSE) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3`, diff --git a/pkg/transformers/price_feeds/repository_test.go b/pkg/transformers/price_feeds/repository_test.go index e2270132..dfa62ee3 100644 --- a/pkg/transformers/price_feeds/repository_test.go +++ b/pkg/transformers/price_feeds/repository_test.go @@ -48,12 +48,28 @@ var _ = Describe("Price feeds repository", func() { Expect(dbPriceFeedUpdate.Raw).To(MatchJSON(test_data.PriceFeedModel.Raw)) }) + It("marks headerID as checked for price feed logs", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{}) + Expect(err).NotTo(HaveOccurred()) + priceFeedRepository := price_feeds.NewPriceFeedRepository(db) + + err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT price_feeds_checked FROM public.checked_headers WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + It("does not duplicate price feed updates", func() { db := test_config.NewTestDB(core.Node{}) test_config.CleanTestDB(db) headerRepository := repositories.NewHeaderRepository(db) - blockNumber := uint64(12345) - header := core.Header{BlockNumber: int64(blockNumber)} + header := core.Header{BlockNumber: int64(uint64(12345))} headerID, err := headerRepository.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) priceFeedRepository := price_feeds.NewPriceFeedRepository(db) @@ -66,10 +82,46 @@ var _ = Describe("Price feeds repository", func() { }) }) + Describe("MarkHeaderChecked", func() { + It("creates a row for a new headerID", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{}) + Expect(err).NotTo(HaveOccurred()) + priceFeedRepository := price_feeds.NewPriceFeedRepository(db) + + err = priceFeedRepository.MarkHeaderChecked(headerID) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT price_feeds_checked FROM public.checked_headers WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("updates row when headerID already exists", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{}) + Expect(err).NotTo(HaveOccurred()) + priceFeedRepository := price_feeds.NewPriceFeedRepository(db) + _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) + + err = priceFeedRepository.MarkHeaderChecked(headerID) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT price_feeds_checked FROM public.checked_headers WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + }) + Describe("MissingHeaders", func() { - It("returns headers with no associated price feed event", func() { - node := core.Node{} - db := test_config.NewTestDB(node) + It("returns headers that haven't been checked", func() { + db := test_config.NewTestDB(core.Node{}) test_config.CleanTestDB(db) headerRepository := repositories.NewHeaderRepository(db) startingBlockNumber := int64(1) @@ -83,7 +135,7 @@ var _ = Describe("Price feeds repository", func() { Expect(err).NotTo(HaveOccurred()) } priceFeedRepository := price_feeds.NewPriceFeedRepository(db) - err := priceFeedRepository.Create(headerIDs[1], test_data.PriceFeedModel) + err := priceFeedRepository.MarkHeaderChecked(headerIDs[1]) Expect(err).NotTo(HaveOccurred()) headers, err := priceFeedRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) @@ -94,6 +146,33 @@ var _ = Describe("Price feeds repository", func() { Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) }) + It("only treats headers as checked if price feeds have been checked", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + startingBlockNumber := int64(1) + priceFeedBlockNumber := int64(2) + endingBlockNumber := int64(3) + blockNumbers := []int64{startingBlockNumber, priceFeedBlockNumber, endingBlockNumber, endingBlockNumber + 1} + var headerIDs []int64 + for _, n := range blockNumbers { + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: n}) + headerIDs = append(headerIDs, headerID) + Expect(err).NotTo(HaveOccurred()) + } + priceFeedRepository := price_feeds.NewPriceFeedRepository(db) + _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) + Expect(err).NotTo(HaveOccurred()) + + headers, err := priceFeedRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(headers)).To(Equal(3)) + Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(priceFeedBlockNumber))) + Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(priceFeedBlockNumber))) + Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(priceFeedBlockNumber))) + }) + It("only returns headers associated with the current node", func() { nodeOne := core.Node{} db := test_config.NewTestDB(nodeOne) @@ -113,7 +192,7 @@ var _ = Describe("Price feeds repository", func() { } priceFeedRepository := price_feeds.NewPriceFeedRepository(db) priceFeedRepositoryTwo := price_feeds.NewPriceFeedRepository(dbTwo) - err := priceFeedRepository.Create(headerIDs[0], test_data.PriceFeedModel) + err := priceFeedRepository.MarkHeaderChecked(headerIDs[0]) Expect(err).NotTo(HaveOccurred()) nodeOneMissingHeaders, err := priceFeedRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) diff --git a/pkg/transformers/price_feeds/transformer.go b/pkg/transformers/price_feeds/transformer.go index 4cff92f9..cd3bacb6 100644 --- a/pkg/transformers/price_feeds/transformer.go +++ b/pkg/transformers/price_feeds/transformer.go @@ -53,6 +53,12 @@ func (transformer PriceFeedTransformer) Execute() error { if err != nil { return err } + if len(logs) < 1 { + err := transformer.Repository.MarkHeaderChecked(header.Id) + if err != nil { + return err + } + } for _, log := range logs { model, err := transformer.Converter.ToModel(log, header.Id) if err != nil { diff --git a/pkg/transformers/price_feeds/transformer_test.go b/pkg/transformers/price_feeds/transformer_test.go index 50ccf80e..3fbc6b26 100644 --- a/pkg/transformers/price_feeds/transformer_test.go +++ b/pkg/transformers/price_feeds/transformer_test.go @@ -96,6 +96,42 @@ var _ = Describe("Price feed transformer", func() { Expect(err).To(MatchError(fakes.FakeError)) }) + It("marks header checked if no logs returned", func() { + mockConverter := &price_feeds_mocks.MockPriceFeedConverter{} + mockRepository := &price_feeds_mocks.MockPriceFeedRepository{} + headerID := int64(123) + mockRepository.SetMissingHeaders([]core.Header{{Id: headerID}}) + mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{} + transformer := price_feeds.PriceFeedTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + mockRepository.AssertMarkHeaderCheckedCalledWith(headerID) + }) + + It("returns error if marking header checked returns err", func() { + mockConverter := &price_feeds_mocks.MockPriceFeedConverter{} + mockRepository := &price_feeds_mocks.MockPriceFeedRepository{} + mockRepository.SetMissingHeaders([]core.Header{{Id: int64(123)}}) + mockRepository.SetMarkHeaderCheckedErr(fakes.FakeError) + mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{} + transformer := price_feeds.PriceFeedTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + It("converts log to a model", func() { mockConverter := &price_feeds_mocks.MockPriceFeedConverter{} mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{} diff --git a/pkg/transformers/test_data/mocks/price_feeds/repository.go b/pkg/transformers/test_data/mocks/price_feeds/repository.go index bd685c77..f56c62a7 100644 --- a/pkg/transformers/test_data/mocks/price_feeds/repository.go +++ b/pkg/transformers/test_data/mocks/price_feeds/repository.go @@ -22,19 +22,25 @@ import ( ) type MockPriceFeedRepository struct { - createErr error - missingHeaders []core.Header - missingHeadersErr error - passedEndingBlockNumber int64 - passedHeaderID int64 - passedModel price_feeds.PriceFeedModel - passedStartingBlockNumber int64 + createErr error + createPassedHeaderID int64 + markHeaderCheckedErr error + markHeaderCheckedPassedHeaderID int64 + missingHeaders []core.Header + missingHeadersErr error + passedEndingBlockNumber int64 + passedModel price_feeds.PriceFeedModel + passedStartingBlockNumber int64 } func (repository *MockPriceFeedRepository) SetCreateErr(err error) { repository.createErr = err } +func (repository *MockPriceFeedRepository) SetMarkHeaderCheckedErr(err error) { + repository.markHeaderCheckedErr = err +} + func (repository *MockPriceFeedRepository) SetMissingHeadersErr(err error) { repository.missingHeadersErr = err } @@ -44,11 +50,16 @@ func (repository *MockPriceFeedRepository) SetMissingHeaders(headers []core.Head } func (repository *MockPriceFeedRepository) Create(headerID int64, model price_feeds.PriceFeedModel) error { - repository.passedHeaderID = headerID + repository.createPassedHeaderID = headerID repository.passedModel = model return repository.createErr } +func (repository *MockPriceFeedRepository) MarkHeaderChecked(headerID int64) error { + repository.markHeaderCheckedPassedHeaderID = headerID + return repository.markHeaderCheckedErr +} + func (repository *MockPriceFeedRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { repository.passedStartingBlockNumber = startingBlockNumber repository.passedEndingBlockNumber = endingBlockNumber @@ -56,10 +67,14 @@ func (repository *MockPriceFeedRepository) MissingHeaders(startingBlockNumber, e } func (repository *MockPriceFeedRepository) AssertCreateCalledWith(headerID int64, model price_feeds.PriceFeedModel) { - Expect(repository.passedHeaderID).To(Equal(headerID)) + Expect(repository.createPassedHeaderID).To(Equal(headerID)) Expect(repository.passedModel).To(Equal(model)) } +func (repository *MockPriceFeedRepository) AssertMarkHeaderCheckedCalledWith(headerID int64) { + Expect(repository.markHeaderCheckedPassedHeaderID).To(Equal(headerID)) +} + func (repository *MockPriceFeedRepository) AssertMissingHeadersCalledwith(startingBlockNumber, endingBlockNumber int64) { Expect(repository.passedStartingBlockNumber).To(Equal(startingBlockNumber)) Expect(repository.passedEndingBlockNumber).To(Equal(endingBlockNumber))