diff --git a/db/migrations/1537474562_add_tend_to_checked_headers.down.sql b/db/migrations/1537474562_add_tend_to_checked_headers.down.sql new file mode 100644 index 00000000..7e1bc235 --- /dev/null +++ b/db/migrations/1537474562_add_tend_to_checked_headers.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE public.checked_headers + DROP COLUMN tend_checked; \ No newline at end of file diff --git a/db/migrations/1537474562_add_tend_to_checked_headers.up.sql b/db/migrations/1537474562_add_tend_to_checked_headers.up.sql new file mode 100644 index 00000000..a76e05eb --- /dev/null +++ b/db/migrations/1537474562_add_tend_to_checked_headers.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE public.checked_headers + ADD COLUMN tend_checked BOOLEAN NOT NULL DEFAULT FALSE; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index c255742e..030145e7 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -672,7 +672,8 @@ CREATE TABLE public.checked_headers ( price_feeds_checked boolean DEFAULT false NOT NULL, deal_checked boolean DEFAULT false NOT NULL, dent_checked boolean DEFAULT false NOT NULL, - flip_kick_checked boolean DEFAULT false NOT NULL + flip_kick_checked boolean DEFAULT false NOT NULL, + tend_checked boolean DEFAULT false NOT NULL ); diff --git a/pkg/transformers/tend/converter.go b/pkg/transformers/tend/converter.go index f27dfef9..827a500d 100644 --- a/pkg/transformers/tend/converter.go +++ b/pkg/transformers/tend/converter.go @@ -24,7 +24,7 @@ import ( ) type Converter interface { - Convert(contractAddress string, contractAbi string, ethLog types.Log) (TendModel, error) + Convert(ethLogs []types.Log) ([]TendModel, error) } type TendConverter struct{} @@ -33,40 +33,44 @@ func NewTendConverter() TendConverter { return TendConverter{} } -func (c TendConverter) Convert(contractAddress string, contractAbi string, ethLog types.Log) (TendModel, error) { - err := validateLog(ethLog) - if err != nil { - return TendModel{}, err +func (c TendConverter) Convert(ethLogs []types.Log) (results []TendModel, err error) { + for _, ethLog := range ethLogs { + err := validateLog(ethLog) + if err != nil { + return nil, err + } + + bidId := ethLog.Topics[2].Big() + guy := common.HexToAddress(ethLog.Topics[1].Hex()).String() + lot := ethLog.Topics[3].Big().String() + + lastDataItemStartIndex := len(ethLog.Data) - 32 + lastItem := ethLog.Data[lastDataItemStartIndex:] + last := big.NewInt(0).SetBytes(lastItem) + bidValue := last.String() + tic := "0" + //TODO: it is likely that the tic value will need to be added to an emitted event, + //so this will need to be updated at that point + transactionIndex := ethLog.TxIndex + + rawJson, err := json.Marshal(ethLog) + if err != nil { + return nil, err + } + raw := string(rawJson) + + model := TendModel{ + BidId: bidId.String(), + Lot: lot, + Bid: bidValue, + Guy: guy, + Tic: tic, + TransactionIndex: transactionIndex, + Raw: raw, + } + results = append(results, model) } - - bidId := ethLog.Topics[2].Big() - guy := common.HexToAddress(ethLog.Topics[1].Hex()).String() - lot := ethLog.Topics[3].Big().String() - - lastDataItemStartIndex := len(ethLog.Data) - 32 - lastItem := ethLog.Data[lastDataItemStartIndex:] - last := big.NewInt(0).SetBytes(lastItem) - bidValue := last.String() - tic := "0" - //TODO: it is likely that the tic value will need to be added to an emitted event, - //so this will need to be updated at that point - transactionIndex := ethLog.TxIndex - - rawJson, err := json.Marshal(ethLog) - if err != nil { - return TendModel{}, err - } - raw := string(rawJson) - - return TendModel{ - BidId: bidId.String(), - Lot: lot, - Bid: bidValue, - Guy: guy, - Tic: tic, - TransactionIndex: transactionIndex, - Raw: raw, - }, nil + return results, err } func validateLog(ethLog types.Log) error { diff --git a/pkg/transformers/tend/converter_test.go b/pkg/transformers/tend/converter_test.go index decc5450..d090f9cd 100644 --- a/pkg/transformers/tend/converter_test.go +++ b/pkg/transformers/tend/converter_test.go @@ -19,7 +19,7 @@ import ( . "github.com/onsi/gomega" "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/transformers/tend" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" ) @@ -33,29 +33,28 @@ var _ = Describe("Tend TendConverter", func() { Describe("Convert", func() { It("converts an eth log to a db model", func() { - model, err := converter.Convert(shared.FlipperContractAddress, shared.FlipperABI, test_data.TendLogNote) + models, err := converter.Convert([]types.Log{test_data.TendLogNote}) Expect(err).NotTo(HaveOccurred()) - Expect(model).To(Equal(test_data.TendModel)) + Expect(len(models)).To(Equal(1)) + Expect(models[0]).To(Equal(test_data.TendModel)) }) It("returns an error if the log data is empty", func() { emptyDataLog := test_data.TendLogNote emptyDataLog.Data = []byte{} - model, err := converter.Convert(shared.FlipperContractAddress, shared.FlipperABI, emptyDataLog) + _, err := converter.Convert([]types.Log{emptyDataLog}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError("tend log note data is empty")) - Expect(model).To(Equal(tend.TendModel{})) }) It("returns an error if the expected amount of topics aren't in the log", func() { invalidLog := test_data.TendLogNote invalidLog.Topics = []common.Hash{} - model, err := converter.Convert(shared.FlipperContractAddress, shared.FlipperABI, invalidLog) + _, err := converter.Convert([]types.Log{invalidLog}) Expect(err).To(MatchError("tend log does not contain expected topics")) - Expect(model).To(Equal(tend.TendModel{})) }) }) }) diff --git a/pkg/transformers/tend/repository.go b/pkg/transformers/tend/repository.go index 75ab3386..84cf2d22 100644 --- a/pkg/transformers/tend/repository.go +++ b/pkg/transformers/tend/repository.go @@ -20,7 +20,8 @@ import ( ) type Repository interface { - Create(headerId int64, tend TendModel) error + Create(headerId int64, tends []TendModel) error + MarkHeaderChecked(headerId int64) error MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) } @@ -32,13 +33,38 @@ func NewTendRepository(db *postgres.DB) TendRepository { return TendRepository{DB: db} } -func (r TendRepository) Create(headerId int64, tend TendModel) error { - _, err := r.DB.Exec( - `INSERT into maker.tend (header_id, bid_id, lot, bid, guy, tic, tx_idx, raw_log) +func (r TendRepository) Create(headerId int64, tends []TendModel) error { + tx, err := r.DB.Begin() + if err != nil { + return err + } + for _, tend := range tends { + _, err = tx.Exec( + `INSERT into maker.tend (header_id, bid_id, lot, bid, guy, tic, tx_idx, raw_log) VALUES($1, $2, $3, $4, $5, $6, $7, $8)`, - headerId, tend.BidId, tend.Lot, tend.Bid, tend.Guy, tend.Tic, tend.TransactionIndex, tend.Raw, - ) + headerId, tend.BidId, tend.Lot, tend.Bid, tend.Guy, tend.Tic, tend.TransactionIndex, tend.Raw, + ) + if err != nil { + tx.Rollback() + return err + } + } + _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, tend_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET tend_checked = $2`, headerId, true) + if err != nil { + tx.Rollback() + return err + } + return tx.Commit() +} +func (r TendRepository) MarkHeaderChecked(headerId int64) error { + _, err := r.DB.Exec(`INSERT INTO public.checked_headers (header_id, tend_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET tend_checked = $2`, headerId, true) return err } @@ -47,8 +73,8 @@ func (r TendRepository) MissingHeaders(startingBlockNumber, endingBlockNumber in err := r.DB.Select( &result, `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN maker.tend on headers.id = header_id - WHERE header_id ISNULL + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL OR tend_checked IS FALSE) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3`, diff --git a/pkg/transformers/tend/repository_test.go b/pkg/transformers/tend/repository_test.go index 277cc79a..0f2d84ff 100644 --- a/pkg/transformers/tend/repository_test.go +++ b/pkg/transformers/tend/repository_test.go @@ -49,7 +49,7 @@ var _ = Describe("TendRepository", func() { Describe("Create", func() { It("persists a tend record", func() { - err := tendRepository.Create(headerId, test_data.TendModel) + err := tendRepository.Create(headerId, []tend.TendModel{test_data.TendModel}) Expect(err).NotTo(HaveOccurred()) var count int @@ -70,17 +70,27 @@ var _ = Describe("TendRepository", func() { Expect(dbResult.Raw).To(MatchJSON(test_data.RawLogNoteJson)) }) + It("marks header as checked", func() { + err := tendRepository.Create(headerId, []tend.TendModel{test_data.TendModel}) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT tend_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + It("returns an error if inserting a tend record fails", func() { - err := tendRepository.Create(headerId, test_data.TendModel) + err := tendRepository.Create(headerId, []tend.TendModel{test_data.TendModel}) Expect(err).NotTo(HaveOccurred()) - err = tendRepository.Create(headerId, test_data.TendModel) + err = tendRepository.Create(headerId, []tend.TendModel{test_data.TendModel}) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) }) It("deletes the tend record if its corresponding header record is deleted", func() { - err := tendRepository.Create(headerId, test_data.TendModel) + err := tendRepository.Create(headerId, []tend.TendModel{test_data.TendModel}) Expect(err).NotTo(HaveOccurred()) var count int @@ -97,6 +107,30 @@ var _ = Describe("TendRepository", func() { }) }) + Describe("MarkHeaderChecked", func() { + It("creates a row for a new headerID", func() { + err = tendRepository.MarkHeaderChecked(headerId) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT tend_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("updates row when headerID already exists", func() { + _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerId) + + err = tendRepository.MarkHeaderChecked(headerId) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT tend_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + }) + Describe("MissingHeaders", func() { var tendBlockNumber int64 var startingBlockNumber int64 @@ -119,7 +153,7 @@ var _ = Describe("TendRepository", func() { headerIds = append(headerIds, headerId) } - err = tendRepository.Create(headerIds[1], test_data.TendModel) + err = tendRepository.MarkHeaderChecked(headerIds[1]) Expect(err).NotTo(HaveOccurred()) headers, err := tendRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) @@ -129,6 +163,33 @@ var _ = Describe("TendRepository", func() { Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) }) + It("only treats headers as checked if deal have been checked", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + startingBlockNumber := int64(1) + dentBlockNumber := int64(2) + endingBlockNumber := int64(3) + blockNumbers := []int64{startingBlockNumber, dentBlockNumber, endingBlockNumber, endingBlockNumber + 1} + var headerIDs []int64 + for _, n := range blockNumbers { + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: n}) + headerIDs = append(headerIDs, headerID) + Expect(err).NotTo(HaveOccurred()) + } + dentRepository := tend.NewTendRepository(db) + _, err := db.Exec(`INSERT INTO public.checked_headers (header_id, price_feeds_checked) VALUES ($1, $2)`, headerIDs[1], true) + Expect(err).NotTo(HaveOccurred()) + + headers, err := dentRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(headers)).To(Equal(3)) + Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dentBlockNumber))) + Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dentBlockNumber))) + Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(dentBlockNumber))) + }) + It("only returns missing headers for the current node", func() { var headerIds []int64 node2 := core.Node{} @@ -145,7 +206,7 @@ var _ = Describe("TendRepository", func() { Expect(err).NotTo(HaveOccurred()) } - err = tendRepository.Create(headerIds[1], test_data.TendModel) + err = tendRepository.MarkHeaderChecked(headerIds[1]) Expect(err).NotTo(HaveOccurred()) node1MissingHeaders, err := tendRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) diff --git a/pkg/transformers/tend/transformer.go b/pkg/transformers/tend/transformer.go index f6dcc9de..67c0c648 100644 --- a/pkg/transformers/tend/transformer.go +++ b/pkg/transformers/tend/transformer.go @@ -64,19 +64,23 @@ func (t TendTransformer) Execute() error { log.Println("Error fetching matching logs:", err) return err } - - for _, ethLog := range ethLogs { - model, err := t.Converter.Convert(config.ContractAddress, config.ContractAbi, ethLog) + if len(ethLogs) < 1 { + err := t.Repository.MarkHeaderChecked(header.Id) if err != nil { - log.Println("Error converting logs:", err) return err } + } - err = t.Repository.Create(header.Id, model) - if err != nil { - log.Println("Error persisting tend record:", err) - return err - } + models, err := t.Converter.Convert(ethLogs) + if err != nil { + log.Println("Error converting logs:", err) + return err + } + + err = t.Repository.Create(header.Id, models) + if err != nil { + log.Println("Error persisting tend record:", err) + return err } } diff --git a/pkg/transformers/tend/transformer_test.go b/pkg/transformers/tend/transformer_test.go index d3fd3570..bd8ca4a9 100644 --- a/pkg/transformers/tend/transformer_test.go +++ b/pkg/transformers/tend/transformer_test.go @@ -87,14 +87,48 @@ var _ = Describe("Tend Transformer", func() { Expect(err).To(MatchError(fakes.FakeError)) }) - It("converts an eth log to an Model", func() { + It("marks header checked if no logs returned", func() { + mockConverter := &tend_mocks.MockTendConverter{} + mockRepository := &tend_mocks.MockTendRepository{} + headerID := int64(123) + mockRepository.SetMissingHeaders([]core.Header{{Id: headerID}}) + mockFetcher := &mocks.MockLogFetcher{} + transformer := tend.TendTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + mockRepository.AssertMarkHeaderCheckedCalledWith(headerID) + }) + + It("returns error if marking header checked returns err", func() { + mockConverter := &tend_mocks.MockTendConverter{} + mockRepository := &tend_mocks.MockTendRepository{} + mockRepository.SetMissingHeaders([]core.Header{{Id: int64(123)}}) + mockRepository.SetMarkHeaderCheckedErr(fakes.FakeError) + mockFetcher := &mocks.MockLogFetcher{} + transformer := tend.TendTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("converts an eth log to a Model", func() { repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) fetcher.SetFetchedLogs([]types.Log{test_data.TendLogNote}) err := transformer.Execute() Expect(err).NotTo(HaveOccurred()) - Expect(converter.ConverterContract).To(Equal(tend.TendConfig.ContractAddress)) - Expect(converter.ConverterAbi).To(Equal(tend.TendConfig.ContractAbi)) Expect(converter.LogsToConvert).To(Equal([]types.Log{test_data.TendLogNote})) }) diff --git a/pkg/transformers/test_data/mocks/tend/converter.go b/pkg/transformers/test_data/mocks/tend/converter.go index 8f0ab2b0..cb36532c 100644 --- a/pkg/transformers/test_data/mocks/tend/converter.go +++ b/pkg/transformers/test_data/mocks/tend/converter.go @@ -22,17 +22,13 @@ import ( ) type MockTendConverter struct { - ConverterContract string - ConverterAbi string - LogsToConvert []types.Log - ConverterError error + LogsToConvert []types.Log + ConverterError error } -func (c *MockTendConverter) Convert(contractAddress string, contractAbi string, ethLog types.Log) (tend.TendModel, error) { - c.ConverterContract = contractAddress - c.ConverterAbi = contractAbi - c.LogsToConvert = append(c.LogsToConvert, ethLog) - return test_data.TendModel, c.ConverterError +func (c *MockTendConverter) Convert(ethLogs []types.Log) ([]tend.TendModel, error) { + c.LogsToConvert = append(c.LogsToConvert, ethLogs...) + return []tend.TendModel{test_data.TendModel}, c.ConverterError } func (c *MockTendConverter) SetConverterError(err error) { diff --git a/pkg/transformers/test_data/mocks/tend/repository.go b/pkg/transformers/test_data/mocks/tend/repository.go index 7c2e691a..6bb7da13 100644 --- a/pkg/transformers/test_data/mocks/tend/repository.go +++ b/pkg/transformers/test_data/mocks/tend/repository.go @@ -15,23 +15,27 @@ package tend import ( + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/transformers/tend" ) type MockTendRepository struct { - createError error - PassedEndingBlockNumber int64 - PassedHeaderID int64 - PassedStartingBlockNumber int64 - PassedTendModel tend.TendModel - missingHeaders []core.Header - missingHeadersErr error + createError error + PassedEndingBlockNumber int64 + PassedHeaderID int64 + PassedStartingBlockNumber int64 + PassedTendModel tend.TendModel + markHeaderCheckedErr error + markHeaderCheckedPassedHeaderId int64 + missingHeaders []core.Header + missingHeadersErr error } -func (repository *MockTendRepository) Create(headerId int64, tend tend.TendModel) error { +func (repository *MockTendRepository) Create(headerId int64, tend []tend.TendModel) error { repository.PassedHeaderID = headerId - repository.PassedTendModel = tend + repository.PassedTendModel = tend[0] return repository.createError } @@ -39,6 +43,10 @@ func (repository *MockTendRepository) SetCreateError(err error) { repository.createError = err } +func (repository *MockTendRepository) SetMarkHeaderCheckedErr(err error) { + repository.markHeaderCheckedErr = err +} + func (repository *MockTendRepository) SetMissingHeadersErr(err error) { repository.missingHeadersErr = err } @@ -47,8 +55,17 @@ func (repository *MockTendRepository) SetMissingHeaders(headers []core.Header) { repository.missingHeaders = headers } +func (repository *MockTendRepository) MarkHeaderChecked(headerId int64) error { + repository.markHeaderCheckedPassedHeaderId = headerId + return repository.markHeaderCheckedErr +} + func (repository *MockTendRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { repository.PassedStartingBlockNumber = startingBlockNumber repository.PassedEndingBlockNumber = endingBlockNumber return repository.missingHeaders, repository.missingHeadersErr } + +func (repository *MockTendRepository) AssertMarkHeaderCheckedCalledWith(headerId int64) { + Expect(repository.markHeaderCheckedPassedHeaderId).To(Equal(headerId)) +}