diff --git a/db/migrations/1537467309_add_dent_to_checked_headers.down.sql b/db/migrations/1537467309_add_dent_to_checked_headers.down.sql new file mode 100644 index 00000000..95edcfef --- /dev/null +++ b/db/migrations/1537467309_add_dent_to_checked_headers.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE public.checked_headers + DROP COLUMN dent_checked; \ No newline at end of file diff --git a/db/migrations/1537467309_add_dent_to_checked_headers.up.sql b/db/migrations/1537467309_add_dent_to_checked_headers.up.sql new file mode 100644 index 00000000..5cc89a91 --- /dev/null +++ b/db/migrations/1537467309_add_dent_to_checked_headers.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE public.checked_headers + ADD COLUMN dent_checked BOOLEAN NOT NULL DEFAULT FALSE; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index b2d4dbad..95db74be 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -670,7 +670,8 @@ CREATE TABLE public.checked_headers ( id integer NOT NULL, header_id integer NOT NULL, price_feeds_checked boolean DEFAULT false NOT NULL, - deal_checked boolean DEFAULT false NOT NULL + deal_checked boolean DEFAULT false NOT NULL, + dent_checked boolean DEFAULT false NOT NULL ); diff --git a/pkg/transformers/dent/converter.go b/pkg/transformers/dent/converter.go index 8b7d4acb..801db498 100644 --- a/pkg/transformers/dent/converter.go +++ b/pkg/transformers/dent/converter.go @@ -24,7 +24,7 @@ import ( ) type Converter interface { - ToModel(ethLog types.Log) (DentModel, error) + ToModels(ethLogs []types.Log) ([]DentModel, error) } type DentConverter struct{} @@ -33,36 +33,40 @@ func NewDentConverter() DentConverter { return DentConverter{} } -func (c DentConverter) ToModel(ethLog types.Log) (DentModel, error) { - err := validateLog(ethLog) - if err != nil { - return DentModel{}, err +func (c DentConverter) ToModels(ethLogs []types.Log) (result []DentModel, err error) { + for _, log := range ethLogs { + err := validateLog(log) + if err != nil { + return nil, err + } + + bidId := log.Topics[2].Big() + lot := log.Topics[3].Big().String() + bidValue := getBidValue(log) + guy := common.HexToAddress(log.Topics[1].Hex()).String() + tic := "0" + //TODO: it is likely that the tic value will need to be added to an emitted event, + //so this will need to be updated at that point + + transactionIndex := log.TxIndex + + raw, err := json.Marshal(log) + if err != nil { + return nil, err + } + + model := DentModel{ + BidId: bidId.String(), + Lot: lot, + Bid: bidValue, + Guy: guy, + Tic: tic, + TransactionIndex: transactionIndex, + Raw: raw, + } + result = append(result, model) } - - bidId := ethLog.Topics[2].Big() - lot := ethLog.Topics[3].Big().String() - bidValue := getBidValue(ethLog) - guy := common.HexToAddress(ethLog.Topics[1].Hex()).String() - tic := "0" - //TODO: it is likely that the tic value will need to be added to an emitted event, - //so this will need to be updated at that point - - transactionIndex := ethLog.TxIndex - - raw, err := json.Marshal(ethLog) - if err != nil { - return DentModel{}, err - } - - return DentModel{ - BidId: bidId.String(), - Lot: lot, - Bid: bidValue, - Guy: guy, - Tic: tic, - TransactionIndex: transactionIndex, - Raw: raw, - }, nil + return result, err } func validateLog(ethLog types.Log) error { diff --git a/pkg/transformers/dent/converter_test.go b/pkg/transformers/dent/converter_test.go index 8e470188..4337eabb 100644 --- a/pkg/transformers/dent/converter_test.go +++ b/pkg/transformers/dent/converter_test.go @@ -19,6 +19,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/transformers/dent" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" ) @@ -31,27 +32,26 @@ var _ = Describe("Dent Converter", func() { }) It("converts an eth log to a db model", func() { - model, err := converter.ToModel(test_data.DentLog) + models, err := converter.ToModels([]types.Log{test_data.DentLog}) Expect(err).NotTo(HaveOccurred()) - Expect(model).To(Equal(test_data.DentModel)) + Expect(len(models)).To(Equal(1)) + Expect(models[0]).To(Equal(test_data.DentModel)) }) It("returns an error if the expected amount of topics aren't in the log", func() { invalidLog := test_data.DentLog invalidLog.Topics = []common.Hash{} - model, err := converter.ToModel(invalidLog) + _, err := converter.ToModels([]types.Log{invalidLog}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError("dent log does not contain expected topics")) - Expect(model).To(Equal(dent.DentModel{})) }) It("returns an error if the log data is empty", func() { emptyDataLog := test_data.DentLog emptyDataLog.Data = []byte{} - model, err := converter.ToModel(emptyDataLog) + _, err := converter.ToModels([]types.Log{emptyDataLog}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError("dent log data is empty")) - Expect(model).To(Equal(dent.DentModel{})) }) }) diff --git a/pkg/transformers/dent/repository.go b/pkg/transformers/dent/repository.go index 47678c66..c4d8b2cd 100644 --- a/pkg/transformers/dent/repository.go +++ b/pkg/transformers/dent/repository.go @@ -20,7 +20,8 @@ import ( ) type Repository interface { - Create(headerId int64, model DentModel) error + Create(headerId int64, models []DentModel) error + MarkHeaderChecked(headerId int64) error MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) } @@ -32,13 +33,38 @@ func NewDentRepository(database *postgres.DB) DentRepository { return DentRepository{db: database} } -func (r DentRepository) Create(headerId int64, model DentModel) error { - _, err := r.db.Exec( - `INSERT into maker.dent (header_id, bid_id, lot, bid, guy, tic, tx_idx, raw_log) +func (r DentRepository) Create(headerId int64, models []DentModel) error { + tx, err := r.db.Begin() + if err != nil { + return err + } + for _, model := range models { + _, err = tx.Exec( + `INSERT into maker.dent (header_id, bid_id, lot, bid, guy, tic, tx_idx, raw_log) VALUES($1, $2, $3, $4, $5, $6, $7, $8)`, - headerId, model.BidId, model.Lot, model.Bid, model.Guy, model.Tic, model.TransactionIndex, model.Raw, - ) + headerId, model.BidId, model.Lot, model.Bid, model.Guy, model.Tic, model.TransactionIndex, model.Raw, + ) + if err != nil { + tx.Rollback() + return err + } + } + _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, dent_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET dent_checked = $2`, headerId, true) + if err != nil { + tx.Rollback() + return err + } + return tx.Commit() +} +func (r DentRepository) MarkHeaderChecked(headerId int64) error { + _, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, dent_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET dent_checked = $2`, headerId, true) return err } @@ -48,8 +74,8 @@ func (r DentRepository) MissingHeaders(startingBlockNumber, endingBlockNumber in err := r.db.Select( &missingHeaders, `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN maker.dent on headers.id = header_id - WHERE header_id ISNULL + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL OR dent_checked IS FALSE) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3`, diff --git a/pkg/transformers/dent/repository_test.go b/pkg/transformers/dent/repository_test.go index 6d286187..b173ddb4 100644 --- a/pkg/transformers/dent/repository_test.go +++ b/pkg/transformers/dent/repository_test.go @@ -49,7 +49,7 @@ var _ = Describe("Dent Repository", func() { headerId, err = headerRepository.CreateOrUpdateHeader(core.Header{}) Expect(err).NotTo(HaveOccurred()) - err := dentRepository.Create(headerId, test_data.DentModel) + err := dentRepository.Create(headerId, []dent.DentModel{test_data.DentModel}) Expect(err).NotTo(HaveOccurred()) }) @@ -70,8 +70,15 @@ var _ = Describe("Dent Repository", func() { Expect(dbResult.Raw).To(MatchJSON(test_data.DentModel.Raw)) }) + It("marks header as checked for logs", func() { + var headerChecked bool + err = db.Get(&headerChecked, `SELECT dent_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + It("returns an error if inserting a dent record fails", func() { - err = dentRepository.Create(headerId, test_data.DentModel) + err = dentRepository.Create(headerId, []dent.DentModel{test_data.DentModel}) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) }) @@ -91,6 +98,36 @@ var _ = Describe("Dent Repository", func() { }) }) + Describe("MarkHeaderChecked", func() { + BeforeEach(func() { + headerId, err = headerRepository.CreateOrUpdateHeader(core.Header{}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("creates a row for a new headerId", func() { + err = dentRepository.MarkHeaderChecked(headerId) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT dent_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("updates row when headerId already exists", func() { + _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerId) + Expect(err).NotTo(HaveOccurred()) + + err = dentRepository.MarkHeaderChecked(headerId) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT dent_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + }) + Describe("MissingHeaders", func() { var dentBlockNumber int64 var startingBlockNumber int64 @@ -112,10 +149,10 @@ var _ = Describe("Dent Repository", func() { headerIds = append(headerIds, headerId) } - dentRepository.Create(headerIds[1], test_data.DentModel) + dentRepository.MarkHeaderChecked(headerIds[1]) }) - It("returns header records that don't have a corresponding dents", func() { + It("returns header records that haven't been checked", func() { missingHeaders, err := dentRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) Expect(err).NotTo(HaveOccurred()) @@ -124,6 +161,29 @@ var _ = Describe("Dent Repository", func() { Expect(missingHeaders[1].BlockNumber).To(Equal(endingBlockNumber)) }) + It("only treats headers as checked if deal have been checked", func() { + startingBlockNumber := int64(1) + dentBlockNumber := int64(2) + endingBlockNumber := int64(3) + blockNumbers := []int64{startingBlockNumber, dentBlockNumber, endingBlockNumber, endingBlockNumber + 1} + var headerIDs []int64 + for _, n := range blockNumbers { + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: n}) + headerIDs = append(headerIDs, headerID) + Expect(err).NotTo(HaveOccurred()) + } + _, err := db.Exec(`INSERT INTO public.checked_headers (header_id, price_feeds_checked) VALUES ($1, $2)`, headerIDs[1], true) + Expect(err).NotTo(HaveOccurred()) + + headers, err := dentRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(headers)).To(Equal(3)) + Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dentBlockNumber))) + Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dentBlockNumber))) + Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dentBlockNumber))) + }) + It("only returns missing headers for the given node", func() { node2 := core.Node{} db2 := test_config.NewTestDB(node2) diff --git a/pkg/transformers/dent/transformer.go b/pkg/transformers/dent/transformer.go index 8c047c89..e0ab0679 100644 --- a/pkg/transformers/dent/transformer.go +++ b/pkg/transformers/dent/transformer.go @@ -54,26 +54,27 @@ func (t DentTransformer) Execute() error { log.Printf("Fetching dent event logs for %d headers \n", len(headers)) for _, header := range headers { ethLogs, err := t.Fetcher.FetchLogs(config.ContractAddress, topics, header.BlockNumber) - if err != nil { log.Println("Error fetching dent logs:", err) return err } - - for _, ethLog := range ethLogs { - model, err := t.Converter.ToModel(ethLog) - + if len(ethLogs) < 1 { + err := t.Repository.MarkHeaderChecked(header.Id) if err != nil { - log.Println("Error converting dent log", err) return err } + } - err = t.Repository.Create(header.Id, model) + models, err := t.Converter.ToModels(ethLogs) + if err != nil { + log.Println("Error converting dent log", err) + return err + } - if err != nil { - log.Println("Error persisting dent record", err) - return err - } + err = t.Repository.Create(header.Id, models) + if err != nil { + log.Println("Error persisting dent record", err) + return err } } diff --git a/pkg/transformers/dent/transformer_test.go b/pkg/transformers/dent/transformer_test.go index 90146894..fae5c0f1 100644 --- a/pkg/transformers/dent/transformer_test.go +++ b/pkg/transformers/dent/transformer_test.go @@ -63,6 +63,7 @@ var _ = Describe("DentTransformer", func() { err := transformer.Execute() Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) }) It("fetches logs for each missing header", func() { @@ -87,6 +88,42 @@ var _ = Describe("DentTransformer", func() { Expect(err).To(MatchError(fakes.FakeError)) }) + It("marks header checked if no logs returned", func() { + mockConverter := &dent_mocks.MockDentConverter{} + mockRepository := &dent_mocks.MockDentRepository{} + headerID := int64(123) + mockRepository.SetMissingHeaders([]core.Header{{Id: headerID}}) + mockFetcher := &mocks.MockLogFetcher{} + transformer := dent.DentTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + mockRepository.AssertMarkHeaderCheckedCalledWith(headerID) + }) + + It("returns error if marking header checked returns err", func() { + mockConverter := &dent_mocks.MockDentConverter{} + mockRepository := &dent_mocks.MockDentRepository{} + mockRepository.SetMissingHeaders([]core.Header{{Id: int64(123)}}) + mockRepository.SetMarkHeaderCheckedErr(fakes.FakeError) + mockFetcher := &mocks.MockLogFetcher{} + transformer := dent.DentTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + It("converts each eth log to a Model", func() { dentRepository.SetMissingHeaders([]core.Header{{}}) fetcher.SetFetchedLogs([]types.Log{test_data.DentLog}) diff --git a/pkg/transformers/test_data/mocks/dent/converter.go b/pkg/transformers/test_data/mocks/dent/converter.go index 22701ee9..5e50756e 100644 --- a/pkg/transformers/test_data/mocks/dent/converter.go +++ b/pkg/transformers/test_data/mocks/dent/converter.go @@ -28,9 +28,9 @@ type MockDentConverter struct { LogsToConvert []types.Log } -func (c *MockDentConverter) ToModel(ethLog types.Log) (dent.DentModel, error) { - c.LogsToConvert = append(c.LogsToConvert, ethLog) - return test_data.DentModel, c.converterError +func (c *MockDentConverter) ToModels(ethLogs []types.Log) ([]dent.DentModel, error) { + c.LogsToConvert = append(c.LogsToConvert, ethLogs...) + return []dent.DentModel{test_data.DentModel}, c.converterError } func (c *MockDentConverter) SetConverterError(err error) { diff --git a/pkg/transformers/test_data/mocks/dent/repository.go b/pkg/transformers/test_data/mocks/dent/repository.go index 48dc657e..30b7e1d6 100644 --- a/pkg/transformers/test_data/mocks/dent/repository.go +++ b/pkg/transformers/test_data/mocks/dent/repository.go @@ -15,27 +15,36 @@ package dent import ( + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/transformers/dent" ) type MockDentRepository struct { - PassedStartingBlockNumber int64 - PassedEndingBlockNumber int64 - PassedDentModels []dent.DentModel - PassedHeaderIds []int64 - missingHeaders []core.Header - missingHeadersError error - createError error + PassedStartingBlockNumber int64 + PassedEndingBlockNumber int64 + PassedDentModels []dent.DentModel + PassedHeaderIds []int64 + markHeaderCheckedErr error + markHeaderCheckedPassedHeaderId int64 + missingHeaders []core.Header + missingHeadersError error + createError error } -func (r *MockDentRepository) Create(headerId int64, model dent.DentModel) error { +func (r *MockDentRepository) Create(headerId int64, models []dent.DentModel) error { r.PassedHeaderIds = append(r.PassedHeaderIds, headerId) - r.PassedDentModels = append(r.PassedDentModels, model) + r.PassedDentModels = append(r.PassedDentModels, models...) return r.createError } +func (r *MockDentRepository) MarkHeaderChecked(headerId int64) error { + r.markHeaderCheckedPassedHeaderId = headerId + return r.markHeaderCheckedErr +} + func (r *MockDentRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { r.PassedStartingBlockNumber = startingBlockNumber r.PassedEndingBlockNumber = endingBlockNumber @@ -43,6 +52,10 @@ func (r *MockDentRepository) MissingHeaders(startingBlockNumber, endingBlockNumb return r.missingHeaders, r.missingHeadersError } +func (r *MockDentRepository) SetMarkHeaderCheckedErr(err error) { + r.markHeaderCheckedErr = err +} + func (r *MockDentRepository) SetMissingHeadersError(err error) { r.missingHeadersError = err } @@ -54,3 +67,7 @@ func (r *MockDentRepository) SetMissingHeaders(headers []core.Header) { func (r *MockDentRepository) SetCreateError(err error) { r.createError = err } + +func (r *MockDentRepository) AssertMarkHeaderCheckedCalledWith(headerId int64) { + Expect(r.markHeaderCheckedPassedHeaderId).To(Equal(headerId)) +}