From c3f1fcf7968a83fcf71bd70939118b4d7e8363bf Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Wed, 19 Sep 2018 15:59:17 -0500 Subject: [PATCH] Log checked headers for deal events - mark header as checked if no matching logs - mark header as checked if all matching logs persisted successfully - batch convert and insert matching logs to enable transactional persist - enable skipping previously checked block headers when restarting a sync --- ...89741_add_deal_to_checked_headers.down.sql | 2 + ...7389741_add_deal_to_checked_headers.up.sql | 2 + db/schema.sql | 3 +- pkg/transformers/deal/converter.go | 36 +++++----- pkg/transformers/deal/converter_test.go | 13 ++-- pkg/transformers/deal/repository.go | 47 ++++++++++--- pkg/transformers/deal/repository_test.go | 66 ++++++++++++++++++- pkg/transformers/deal/transformer.go | 20 +++--- pkg/transformers/deal/transformer_test.go | 36 ++++++++++ .../test_data/mocks/deal/converter.go | 6 +- .../test_data/mocks/deal/repository.go | 35 +++++++--- 11 files changed, 211 insertions(+), 55 deletions(-) create mode 100644 db/migrations/1537389741_add_deal_to_checked_headers.down.sql create mode 100644 db/migrations/1537389741_add_deal_to_checked_headers.up.sql diff --git a/db/migrations/1537389741_add_deal_to_checked_headers.down.sql b/db/migrations/1537389741_add_deal_to_checked_headers.down.sql new file mode 100644 index 00000000..be0aedb0 --- /dev/null +++ b/db/migrations/1537389741_add_deal_to_checked_headers.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE public.checked_headers + DROP COLUMN deal_checked; \ No newline at end of file diff --git a/db/migrations/1537389741_add_deal_to_checked_headers.up.sql b/db/migrations/1537389741_add_deal_to_checked_headers.up.sql new file mode 100644 index 00000000..4461287e --- /dev/null +++ b/db/migrations/1537389741_add_deal_to_checked_headers.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE public.checked_headers + ADD COLUMN deal_checked BOOLEAN NOT NULL DEFAULT FALSE; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index 2e9a909c..b2d4dbad 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -669,7 +669,8 @@ ALTER SEQUENCE public.blocks_id_seq OWNED BY public.blocks.id; CREATE TABLE public.checked_headers ( id integer NOT NULL, header_id integer NOT NULL, - price_feeds_checked boolean DEFAULT false NOT NULL + price_feeds_checked boolean DEFAULT false NOT NULL, + deal_checked boolean DEFAULT false NOT NULL ); diff --git a/pkg/transformers/deal/converter.go b/pkg/transformers/deal/converter.go index fffda063..a544b190 100644 --- a/pkg/transformers/deal/converter.go +++ b/pkg/transformers/deal/converter.go @@ -22,7 +22,7 @@ import ( ) type Converter interface { - ToModel(ethLog types.Log) (DealModel, error) + ToModels(ethLog []types.Log) ([]DealModel, error) } type DealConverter struct{} @@ -31,23 +31,27 @@ func NewDealConverter() DealConverter { return DealConverter{} } -func (DealConverter) ToModel(ethLog types.Log) (DealModel, error) { - err := validateLog(ethLog) - if err != nil { - return DealModel{}, err - } +func (DealConverter) ToModels(ethLogs []types.Log) (result []DealModel, err error) { + for _, log := range ethLogs { + err := validateLog(log) + if err != nil { + return nil, err + } - bidId := ethLog.Topics[2].Big() - raw, err := json.Marshal(ethLog) - if err != nil { - return DealModel{}, err - } + bidId := log.Topics[2].Big() + raw, err := json.Marshal(log) + if err != nil { + return nil, err + } - return DealModel{ - BidId: bidId.String(), - TransactionIndex: ethLog.TxIndex, - Raw: raw, - }, nil + model := DealModel{ + BidId: bidId.String(), + TransactionIndex: log.TxIndex, + Raw: raw, + } + result = append(result, model) + } + return result, nil } func validateLog(ethLog types.Log) error { diff --git a/pkg/transformers/deal/converter_test.go b/pkg/transformers/deal/converter_test.go index f28478cd..8de0b58b 100644 --- a/pkg/transformers/deal/converter_test.go +++ b/pkg/transformers/deal/converter_test.go @@ -19,27 +19,30 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/transformers/deal" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" ) var _ = Describe("Flip Deal Converter", func() { - It("converts a log to a model", func() { + It("converts logs to models", func() { converter := deal.DealConverter{} - model, err := converter.ToModel(test_data.DealLogNote) + models, err := converter.ToModels([]types.Log{test_data.DealLogNote}) Expect(err).NotTo(HaveOccurred()) - Expect(model).To(Equal(test_data.DealModel)) + Expect(len(models)).To(Equal(1)) + Expect(models[0]).To(Equal(test_data.DealModel)) }) It("returns an error if the expected amount of topics aren't in the log", func() { converter := deal.DealConverter{} invalidLog := test_data.DealLogNote invalidLog.Topics = []common.Hash{} - model, err := converter.ToModel(invalidLog) + + _, err := converter.ToModels([]types.Log{invalidLog}) + Expect(err).To(HaveOccurred()) Expect(err).To(MatchError("deal log does not contain expected topics")) - Expect(model).To(Equal(deal.DealModel{})) }) }) diff --git a/pkg/transformers/deal/repository.go b/pkg/transformers/deal/repository.go index 4f53467b..c851518c 100644 --- a/pkg/transformers/deal/repository.go +++ b/pkg/transformers/deal/repository.go @@ -20,7 +20,8 @@ import ( ) type Repository interface { - Create(headerId int64, model DealModel) error + Create(headerId int64, models []DealModel) error + MarkHeaderChecked(headerID int64) error MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) } type DealRepository struct { @@ -30,21 +31,47 @@ type DealRepository struct { func NewDealRepository(database *postgres.DB) DealRepository { return DealRepository{db: database} } -func (r DealRepository) Create(headerId int64, model DealModel) error { - _, err := r.db.Exec( - `INSERT into maker.deal (header_id, bid_id, tx_idx, raw_log) +func (r DealRepository) Create(headerId int64, models []DealModel) error { + tx, err := r.db.Begin() + if err != nil { + return err + } + for _, model := range models { + _, err = tx.Exec( + `INSERT into maker.deal (header_id, bid_id, tx_idx, raw_log) VALUES($1, $2, $3, $4)`, - headerId, model.BidId, model.TransactionIndex, model.Raw, - ) + headerId, model.BidId, model.TransactionIndex, model.Raw, + ) + if err != nil { + tx.Rollback() + return err + } + } + _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, deal_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET deal_checked = $2`, headerId, true) + if err != nil { + tx.Rollback() + return err + } + return tx.Commit() +} + +func (r DealRepository) MarkHeaderChecked(headerID int64) error { + _, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, deal_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET deal_checked = $2`, headerID, true) return err } + func (r DealRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { var missingHeaders []core.Header - err := r.db.Select( - &missingHeaders, + err := r.db.Select(&missingHeaders, `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN maker.deal on headers.id = header_id - WHERE header_id ISNULL + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL OR deal_checked IS FALSE) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3`, diff --git a/pkg/transformers/deal/repository_test.go b/pkg/transformers/deal/repository_test.go index 4bb8539d..f0614f57 100644 --- a/pkg/transformers/deal/repository_test.go +++ b/pkg/transformers/deal/repository_test.go @@ -43,11 +43,12 @@ var _ = Describe("Deal Repository", func() { dealRepository = deal.NewDealRepository(db) headerRepository = repositories.NewHeaderRepository(db) }) + Describe("Create", func() { BeforeEach(func() { headerId, err = headerRepository.CreateOrUpdateHeader(core.Header{}) Expect(err).NotTo(HaveOccurred()) - err := dealRepository.Create(headerId, test_data.DealModel) + err := dealRepository.Create(headerId, []deal.DealModel{test_data.DealModel}) Expect(err).NotTo(HaveOccurred()) }) @@ -63,8 +64,15 @@ var _ = Describe("Deal Repository", func() { Expect(dbResult.Raw).To(MatchJSON(test_data.DealModel.Raw)) }) + It("marks header as checked for logs", func() { + var headerChecked bool + err = db.Get(&headerChecked, `SELECT deal_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + It("returns an error if inserting a deal record fails", func() { - err = dealRepository.Create(headerId, test_data.DealModel) + err = dealRepository.Create(headerId, []deal.DealModel{test_data.DealModel}) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) }) @@ -82,6 +90,35 @@ var _ = Describe("Deal Repository", func() { }) }) + Describe("MarkHeaderChecked", func() { + BeforeEach(func() { + headerId, err = headerRepository.CreateOrUpdateHeader(core.Header{}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("creates a row for a new headerID", func() { + err = dealRepository.MarkHeaderChecked(headerId) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT deal_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("updates row when headerID already exists", func() { + _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerId) + + err = dealRepository.MarkHeaderChecked(headerId) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT deal_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + }) + Describe("MissingHeaders", func() { var dealBlockNumber int64 var startingBlockNumber int64 @@ -100,7 +137,7 @@ var _ = Describe("Deal Repository", func() { Expect(err).NotTo(HaveOccurred()) headerIds = append(headerIds, headerId) } - dealRepository.Create(headerIds[1], test_data.DealModel) + dealRepository.MarkHeaderChecked(headerIds[1]) }) It("returns header records that don't have a corresponding deals", func() { @@ -111,6 +148,29 @@ var _ = Describe("Deal Repository", func() { Expect(missingHeaders[1].BlockNumber).To(Equal(endingBlockNumber)) }) + It("only treats headers as checked if deal have been checked", func() { + startingBlockNumber := int64(1) + dealBlockNumber := int64(2) + endingBlockNumber := int64(3) + blockNumbers := []int64{startingBlockNumber, dealBlockNumber, endingBlockNumber, endingBlockNumber + 1} + var headerIDs []int64 + for _, n := range blockNumbers { + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: n}) + headerIDs = append(headerIDs, headerID) + Expect(err).NotTo(HaveOccurred()) + } + _, err := db.Exec(`INSERT INTO public.checked_headers (header_id, price_feeds_checked) VALUES ($1, $2)`, headerIDs[1], true) + Expect(err).NotTo(HaveOccurred()) + + headers, err := dealRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(headers)).To(Equal(3)) + Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dealBlockNumber))) + Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dealBlockNumber))) + Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dealBlockNumber))) + }) + It("only returns missing headers for the given node", func() { node2 := core.Node{} db2 := test_config.NewTestDB(node2) diff --git a/pkg/transformers/deal/transformer.go b/pkg/transformers/deal/transformer.go index 4a8047c6..1325619a 100644 --- a/pkg/transformers/deal/transformer.go +++ b/pkg/transformers/deal/transformer.go @@ -63,18 +63,22 @@ func (t DealTransformer) Execute() error { log.Println("Error fetching deal logs:", err) return err } - for _, ethLog := range ethLogs { - model, err := t.Converter.ToModel(ethLog) + if len(ethLogs) < 1 { + err := t.Repository.MarkHeaderChecked(header.Id) if err != nil { - log.Println("Error converting deal log", err) - return err - } - err = t.Repository.Create(header.Id, model) - if err != nil { - log.Println("Error persisting deal record", err) return err } } + models, err := t.Converter.ToModels(ethLogs) + if err != nil { + log.Println("Error converting deal log", err) + return err + } + err = t.Repository.Create(header.Id, models) + if err != nil { + log.Println("Error persisting deal record", err) + return err + } } return err } diff --git a/pkg/transformers/deal/transformer_test.go b/pkg/transformers/deal/transformer_test.go index 95c0e7b5..fd0c0ac0 100644 --- a/pkg/transformers/deal/transformer_test.go +++ b/pkg/transformers/deal/transformer_test.go @@ -82,6 +82,42 @@ var _ = Describe("DealTransformer", func() { Expect(err).To(MatchError(fakes.FakeError)) }) + It("marks header checked if no logs returned", func() { + mockConverter := &deal_mocks.MockDealConverter{} + mockRepository := &deal_mocks.MockDealRepository{} + headerID := int64(123) + mockRepository.SetMissingHeaders([]core.Header{{Id: headerID}}) + mockFetcher := &mocks.MockLogFetcher{} + transformer := deal.DealTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + mockRepository.AssertMarkHeaderCheckedCalledWith(headerID) + }) + + It("returns error if marking header checked returns err", func() { + mockConverter := &deal_mocks.MockDealConverter{} + mockRepository := &deal_mocks.MockDealRepository{} + mockRepository.SetMissingHeaders([]core.Header{{Id: int64(123)}}) + mockRepository.SetMarkHeaderCheckedErr(fakes.FakeError) + mockFetcher := &mocks.MockLogFetcher{} + transformer := deal.DealTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + It("converts each eth log to a Model", func() { dealRepository.SetMissingHeaders([]core.Header{{}}) fetcher.SetFetchedLogs([]types.Log{test_data.DealLogNote}) diff --git a/pkg/transformers/test_data/mocks/deal/converter.go b/pkg/transformers/test_data/mocks/deal/converter.go index 083da8ee..2aa7bbf0 100644 --- a/pkg/transformers/test_data/mocks/deal/converter.go +++ b/pkg/transformers/test_data/mocks/deal/converter.go @@ -28,9 +28,9 @@ type MockDealConverter struct { ConverterError error } -func (c *MockDealConverter) ToModel(ethLog types.Log) (deal.DealModel, error) { - c.LogsToConvert = append(c.LogsToConvert, ethLog) - return test_data.DealModel, c.ConverterError +func (c *MockDealConverter) ToModels(ethLogs []types.Log) ([]deal.DealModel, error) { + c.LogsToConvert = append(c.LogsToConvert, ethLogs...) + return []deal.DealModel{test_data.DealModel}, c.ConverterError } func (c *MockDealConverter) SetConverterError(err error) { diff --git a/pkg/transformers/test_data/mocks/deal/repository.go b/pkg/transformers/test_data/mocks/deal/repository.go index b99d3628..49dfc69b 100644 --- a/pkg/transformers/test_data/mocks/deal/repository.go +++ b/pkg/transformers/test_data/mocks/deal/repository.go @@ -15,23 +15,27 @@ package deal import ( + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/transformers/deal" ) type MockDealRepository struct { - createError error - PassedEndingBlockNumber int64 - PassedHeaderIDs []int64 - PassedStartingBlockNumber int64 - PassedDealModels []deal.DealModel - missingHeaders []core.Header - missingHeadersErr error + createError error + PassedEndingBlockNumber int64 + PassedHeaderIDs []int64 + PassedStartingBlockNumber int64 + PassedDealModels []deal.DealModel + markHeaderCheckedErr error + markHeaderCheckedPassedHeaderID int64 + missingHeaders []core.Header + missingHeadersErr error } -func (repository *MockDealRepository) Create(headerId int64, deal deal.DealModel) error { +func (repository *MockDealRepository) Create(headerId int64, deals []deal.DealModel) error { repository.PassedHeaderIDs = append(repository.PassedHeaderIDs, headerId) - repository.PassedDealModels = append(repository.PassedDealModels, deal) + repository.PassedDealModels = append(repository.PassedDealModels, deals...) return repository.createError } @@ -39,6 +43,10 @@ func (repository *MockDealRepository) SetCreateError(err error) { repository.createError = err } +func (repository *MockDealRepository) SetMarkHeaderCheckedErr(err error) { + repository.markHeaderCheckedErr = err +} + func (repository *MockDealRepository) SetMissingHeadersErr(err error) { repository.missingHeadersErr = err } @@ -47,8 +55,17 @@ func (repository *MockDealRepository) SetMissingHeaders(headers []core.Header) { repository.missingHeaders = headers } +func (repository *MockDealRepository) MarkHeaderChecked(headerID int64) error { + repository.markHeaderCheckedPassedHeaderID = headerID + return repository.markHeaderCheckedErr +} + func (repository *MockDealRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { repository.PassedStartingBlockNumber = startingBlockNumber repository.PassedEndingBlockNumber = endingBlockNumber return repository.missingHeaders, repository.missingHeadersErr } + +func (repository *MockDealRepository) AssertMarkHeaderCheckedCalledWith(headerID int64) { + Expect(repository.markHeaderCheckedPassedHeaderID).To(Equal(headerID)) +}