diff --git a/db/migrations/1537471852_add_flip_kick_to_checked_headers.down.sql b/db/migrations/1537471852_add_flip_kick_to_checked_headers.down.sql new file mode 100644 index 00000000..a9949f5f --- /dev/null +++ b/db/migrations/1537471852_add_flip_kick_to_checked_headers.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE public.checked_headers + DROP COLUMN flip_kick_checked; \ No newline at end of file diff --git a/db/migrations/1537471852_add_flip_kick_to_checked_headers.up.sql b/db/migrations/1537471852_add_flip_kick_to_checked_headers.up.sql new file mode 100644 index 00000000..b422c257 --- /dev/null +++ b/db/migrations/1537471852_add_flip_kick_to_checked_headers.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE public.checked_headers + ADD COLUMN flip_kick_checked BOOLEAN NOT NULL DEFAULT FALSE; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index 95db74be..c255742e 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -671,7 +671,8 @@ CREATE TABLE public.checked_headers ( header_id integer NOT NULL, price_feeds_checked boolean DEFAULT false NOT NULL, deal_checked boolean DEFAULT false NOT NULL, - dent_checked boolean DEFAULT false NOT NULL + dent_checked boolean DEFAULT false NOT NULL, + flip_kick_checked boolean DEFAULT false NOT NULL ); diff --git a/pkg/transformers/flip_kick/converter.go b/pkg/transformers/flip_kick/converter.go index 99a7212a..a2aed28b 100644 --- a/pkg/transformers/flip_kick/converter.go +++ b/pkg/transformers/flip_kick/converter.go @@ -28,59 +28,67 @@ import ( ) type Converter interface { - ToEntity(contractAddress string, contractAbi string, ethLog types.Log) (*FlipKickEntity, error) - ToModel(flipKick FlipKickEntity) (FlipKickModel, error) + ToEntities(contractAddress string, contractAbi string, ethLogs []types.Log) ([]FlipKickEntity, error) + ToModels(flipKicks []FlipKickEntity) ([]FlipKickModel, error) } type FlipKickConverter struct{} -func (FlipKickConverter) ToEntity(contractAddress string, contractAbi string, ethLog types.Log) (*FlipKickEntity, error) { - entity := &FlipKickEntity{} - address := common.HexToAddress(contractAddress) - abi, err := geth.ParseAbi(contractAbi) - if err != nil { - return entity, err +func (FlipKickConverter) ToEntities(contractAddress string, contractAbi string, ethLogs []types.Log) (results []FlipKickEntity, err error) { + for _, ethLog := range ethLogs { + entity := &FlipKickEntity{} + address := common.HexToAddress(contractAddress) + abi, err := geth.ParseAbi(contractAbi) + if err != nil { + return nil, err + } + + contract := bind.NewBoundContract(address, abi, nil, nil, nil) + + err = contract.UnpackLog(entity, "Kick", ethLog) + if err != nil { + return nil, err + } + entity.Raw = ethLog + entity.TransactionIndex = ethLog.TxIndex + results = append(results, *entity) } - contract := bind.NewBoundContract(address, abi, nil, nil, nil) - - err = contract.UnpackLog(entity, "Kick", ethLog) - if err != nil { - return entity, err - } - entity.Raw = ethLog - entity.TransactionIndex = ethLog.TxIndex - return entity, nil + return results, nil } -func (FlipKickConverter) ToModel(flipKick FlipKickEntity) (FlipKickModel, error) { - if flipKick.Id == nil { - return FlipKickModel{}, errors.New("FlipKick log ID cannot be nil.") - } +func (FlipKickConverter) ToModels(flipKicks []FlipKickEntity) (results []FlipKickModel, err error) { + for _, flipKick := range flipKicks { + if flipKick.Id == nil { + return nil, errors.New("FlipKick log ID cannot be nil.") + } - id := flipKick.Id.String() - lot := shared.ConvertNilToEmptyString(flipKick.Lot.String()) - bid := shared.ConvertNilToEmptyString(flipKick.Bid.String()) - gal := flipKick.Gal.String() - endValue := shared.ConvertNilToZeroTimeValue(flipKick.End) - end := time.Unix(endValue, 0) - urn := common.BytesToAddress(flipKick.Urn[:common.AddressLength]).String() - tab := shared.ConvertNilToEmptyString(flipKick.Tab.String()) - rawLogJson, err := json.Marshal(flipKick.Raw) - if err != nil { - return FlipKickModel{}, err - } - rawLogString := string(rawLogJson) + id := flipKick.Id.String() + lot := shared.ConvertNilToEmptyString(flipKick.Lot.String()) + bid := shared.ConvertNilToEmptyString(flipKick.Bid.String()) + gal := flipKick.Gal.String() + endValue := shared.ConvertNilToZeroTimeValue(flipKick.End) + end := time.Unix(endValue, 0) + urn := common.BytesToAddress(flipKick.Urn[:common.AddressLength]).String() + tab := shared.ConvertNilToEmptyString(flipKick.Tab.String()) + rawLogJson, err := json.Marshal(flipKick.Raw) + if err != nil { + return nil, err + } + rawLogString := string(rawLogJson) - return FlipKickModel{ - BidId: id, - Lot: lot, - Bid: bid, - Gal: gal, - End: end, - Urn: urn, - Tab: tab, - TransactionIndex: flipKick.TransactionIndex, - Raw: rawLogString, - }, nil + model := FlipKickModel{ + BidId: id, + Lot: lot, + Bid: bid, + Gal: gal, + End: end, + Urn: urn, + Tab: tab, + TransactionIndex: flipKick.TransactionIndex, + Raw: rawLogString, + } + results = append(results, model) + } + return results, err } diff --git a/pkg/transformers/flip_kick/converter_test.go b/pkg/transformers/flip_kick/converter_test.go index 25e281dc..f8e7f6c9 100644 --- a/pkg/transformers/flip_kick/converter_test.go +++ b/pkg/transformers/flip_kick/converter_test.go @@ -33,9 +33,11 @@ var _ = Describe("FlipKick Converter", func() { Describe("ToEntity", func() { It("converts an Eth Log to a FlipKickEntity", func() { - entity, err := converter.ToEntity(shared.FlipperContractAddress, shared.FlipperABI, test_data.EthFlipKickLog) + entities, err := converter.ToEntities(shared.FlipperContractAddress, shared.FlipperABI, []types.Log{test_data.EthFlipKickLog}) Expect(err).NotTo(HaveOccurred()) + Expect(len(entities)).To(Equal(1)) + entity := entities[0] Expect(entity.Id).To(Equal(test_data.FlipKickEntity.Id)) Expect(entity.Lot).To(Equal(test_data.FlipKickEntity.Lot)) Expect(entity.Bid).To(Equal(test_data.FlipKickEntity.Bid)) @@ -47,7 +49,7 @@ var _ = Describe("FlipKick Converter", func() { }) It("returns an error if converting log to entity fails", func() { - _, err := converter.ToEntity(shared.FlipperContractAddress, "error abi", test_data.EthFlipKickLog) + _, err := converter.ToEntities(shared.FlipperContractAddress, "error abi", []types.Log{test_data.EthFlipKickLog}) Expect(err).To(HaveOccurred()) }) @@ -71,16 +73,19 @@ var _ = Describe("FlipKick Converter", func() { }) It("converts an Entity to a Model", func() { - model, err := converter.ToModel(test_data.FlipKickEntity) + models, err := converter.ToModels([]flip_kick.FlipKickEntity{test_data.FlipKickEntity}) Expect(err).NotTo(HaveOccurred()) - Expect(model).To(Equal(test_data.FlipKickModel)) + Expect(len(models)).To(Equal(1)) + Expect(models[0]).To(Equal(test_data.FlipKickModel)) }) It("handles nil values", func() { - model, err := converter.ToModel(emptyEntity) + models, err := converter.ToModels([]flip_kick.FlipKickEntity{emptyEntity}) Expect(err).NotTo(HaveOccurred()) + Expect(len(models)).To(Equal(1)) + model := models[0] Expect(model.BidId).To(Equal("1")) Expect(model.Lot).To(Equal(emptyString)) Expect(model.Bid).To(Equal(emptyString)) @@ -93,10 +98,9 @@ var _ = Describe("FlipKick Converter", func() { It("returns an error if the flip kick event id is nil", func() { emptyEntity.Id = nil - entity, err := converter.ToModel(emptyEntity) + _, err := converter.ToModels([]flip_kick.FlipKickEntity{emptyEntity}) Expect(err).To(HaveOccurred()) - Expect(entity).To(Equal(flip_kick.FlipKickModel{})) }) }) }) diff --git a/pkg/transformers/flip_kick/repository.go b/pkg/transformers/flip_kick/repository.go index 0af06361..c43e0dca 100644 --- a/pkg/transformers/flip_kick/repository.go +++ b/pkg/transformers/flip_kick/repository.go @@ -22,7 +22,8 @@ import ( ) type Repository interface { - Create(headerId int64, flipKick FlipKickModel) error + Create(headerId int64, flipKicks []FlipKickModel) error + MarkHeaderChecked(headerId int64) error MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) } @@ -33,12 +34,38 @@ type FlipKickRepository struct { func NewFlipKickRepository(db *postgres.DB) FlipKickRepository { return FlipKickRepository{DB: db} } -func (fkr FlipKickRepository) Create(headerId int64, flipKick FlipKickModel) error { - _, err := fkr.DB.Exec( - `INSERT into maker.flip_kick (header_id, bid_id, lot, bid, gal, "end", urn, tab, tx_idx, raw_log) +func (fkr FlipKickRepository) Create(headerId int64, flipKicks []FlipKickModel) error { + tx, err := fkr.DB.Begin() + if err != nil { + return err + } + for _, flipKick := range flipKicks { + _, err := tx.Exec( + `INSERT into maker.flip_kick (header_id, bid_id, lot, bid, gal, "end", urn, tab, tx_idx, raw_log) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, - headerId, flipKick.BidId, flipKick.Lot, flipKick.Bid, flipKick.Gal, flipKick.End, flipKick.Urn, flipKick.Tab, flipKick.TransactionIndex, flipKick.Raw, - ) + headerId, flipKick.BidId, flipKick.Lot, flipKick.Bid, flipKick.Gal, flipKick.End, flipKick.Urn, flipKick.Tab, flipKick.TransactionIndex, flipKick.Raw, + ) + if err != nil { + tx.Rollback() + return err + } + } + _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, flip_kick_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET flip_kick_checked = $2`, headerId, true) + if err != nil { + tx.Rollback() + return err + } + return tx.Commit() +} + +func (fkr FlipKickRepository) MarkHeaderChecked(headerId int64) error { + _, err := fkr.DB.Exec(`INSERT INTO public.checked_headers (header_id, flip_kick_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET flip_kick_checked = $2`, headerId, true) return err } @@ -47,8 +74,8 @@ func (fkr FlipKickRepository) MissingHeaders(startingBlockNumber, endingBlockNum err := fkr.DB.Select( &result, `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN maker.flip_kick on headers.id = header_id - WHERE header_id ISNULL + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL OR flip_kick_checked IS FALSE) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3`, diff --git a/pkg/transformers/flip_kick/repository_test.go b/pkg/transformers/flip_kick/repository_test.go index a7c1624b..009e8477 100644 --- a/pkg/transformers/flip_kick/repository_test.go +++ b/pkg/transformers/flip_kick/repository_test.go @@ -54,8 +54,8 @@ var _ = Describe("FlipKick Repository", func() { Expect(err).NotTo(HaveOccurred()) }) - It("persists a flip_kick record", func() { - err := flipKickRepository.Create(headerId, flipKick) + It("persists flip_kick records", func() { + err := flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick}) Expect(err).NotTo(HaveOccurred()) assertDBRecordCount(db, "maker.flip_kick", 1) @@ -75,17 +75,27 @@ var _ = Describe("FlipKick Repository", func() { Expect(dbResult.Raw).To(MatchJSON(flipKick.Raw)) }) - It("returns an error if inserting the flip_kick record fails", func() { - err := flipKickRepository.Create(headerId, test_data.FlipKickModel) + It("marks header checked", func() { + err := flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick}) Expect(err).NotTo(HaveOccurred()) - err = flipKickRepository.Create(headerId, test_data.FlipKickModel) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT flip_kick_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("returns an error if inserting the flip_kick record fails", func() { + err := flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick}) + Expect(err).NotTo(HaveOccurred()) + + err = flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick}) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) }) It("deletes the flip_kick records if its corresponding header record is deleted", func() { - err := flipKickRepository.Create(headerId, test_data.FlipKickModel) + err := flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick}) Expect(err).NotTo(HaveOccurred()) assertDBRecordCount(db, "maker.flip_kick", 1) assertDBRecordCount(db, "headers", 1) @@ -98,6 +108,31 @@ var _ = Describe("FlipKick Repository", func() { }) }) + Describe("MarkHeaderChecked", func() { + It("creates a row for a new headerID", func() { + err := flipKickRepository.MarkHeaderChecked(headerId) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT flip_kick_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("updates row when headerID already exists", func() { + _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerId) + Expect(err).NotTo(HaveOccurred()) + + err = flipKickRepository.MarkHeaderChecked(headerId) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT flip_kick_checked FROM public.checked_headers WHERE header_id = $1`, headerId) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + }) + Describe("When there are multiple nodes", func() { var db2 *postgres.DB var flipKickRepository2 flip_kick.FlipKickRepository @@ -130,11 +165,10 @@ var _ = Describe("FlipKick Repository", func() { }) Describe("MissingHeaders", func() { - It("returns headers for which there isn't an associated flip_kick record", func() { + It("returns headers that haven't been marked as checked", func() { startingBlock := blockNumber - 3 endingBlock := blockNumber + 3 - - err := flipKickRepository.Create(headerId, test_data.FlipKickModel) + err := flipKickRepository.MarkHeaderChecked(headerId) Expect(err).NotTo(HaveOccurred()) newBlockNumber := blockNumber + 3 diff --git a/pkg/transformers/flip_kick/transformer.go b/pkg/transformers/flip_kick/transformer.go index 58a52fd3..5d10fa3d 100644 --- a/pkg/transformers/flip_kick/transformer.go +++ b/pkg/transformers/flip_kick/transformer.go @@ -95,21 +95,25 @@ func (fkt FlipKickTransformer) Execute() error { if err != nil { resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, FetcherError)) } + if len(ethLogs) < 1 { + err := fkt.Repository.MarkHeaderChecked(header.Id) + if err != nil { + return err + } + } - for _, ethLog := range ethLogs { - entity, err := fkt.Converter.ToEntity(config.ContractAddress, config.ContractAbi, ethLog) - if err != nil { - resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, LogToEntityError)) - } - model, err := fkt.Converter.ToModel(*entity) - if err != nil { - resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, EntityToModelError)) - } + entities, err := fkt.Converter.ToEntities(config.ContractAddress, config.ContractAbi, ethLogs) + if err != nil { + resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, LogToEntityError)) + } + models, err := fkt.Converter.ToModels(entities) + if err != nil { + resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, EntityToModelError)) + } - err = fkt.Repository.Create(header.Id, model) - if err != nil { - resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, RepositoryError)) - } + err = fkt.Repository.Create(header.Id, models) + if err != nil { + resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, RepositoryError)) } } diff --git a/pkg/transformers/flip_kick/transformer_test.go b/pkg/transformers/flip_kick/transformer_test.go index 7e4350df..b9fbac90 100644 --- a/pkg/transformers/flip_kick/transformer_test.go +++ b/pkg/transformers/flip_kick/transformer_test.go @@ -96,6 +96,42 @@ var _ = Describe("FlipKick Transformer", func() { Expect(err.Error()).To(ContainSubstring("error(s) transforming FlipKick event logs")) }) + It("marks header checked if no logs returned", func() { + mockConverter := &flip_kick_mocks.MockFlipKickConverter{} + mockRepository := &flip_kick_mocks.MockFlipKickRepository{} + headerID := int64(123) + mockRepository.SetHeadersToReturn([]core.Header{{Id: headerID}}) + mockFetcher := &mocks.MockLogFetcher{} + transformer := flip_kick.FlipKickTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + mockRepository.AssertMarkHeaderCheckedCalledWith(headerID) + }) + + It("returns error if marking header checked returns err", func() { + mockConverter := &flip_kick_mocks.MockFlipKickConverter{} + mockRepository := &flip_kick_mocks.MockFlipKickRepository{} + mockRepository.SetHeadersToReturn([]core.Header{{Id: int64(123)}}) + mockRepository.SetMarkHeaderCheckedErr(fakes.FakeError) + mockFetcher := &mocks.MockLogFetcher{} + transformer := flip_kick.FlipKickTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + It("converts the logs", func() { err := transformer.Execute() Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/transformers/test_data/mocks/flip_kick/converter.go b/pkg/transformers/test_data/mocks/flip_kick/converter.go index 0069ea54..0f0c2225 100644 --- a/pkg/transformers/test_data/mocks/flip_kick/converter.go +++ b/pkg/transformers/test_data/mocks/flip_kick/converter.go @@ -29,17 +29,18 @@ type MockFlipKickConverter struct { ConverterError error } -func (mfkc *MockFlipKickConverter) ToEntity(contractAddress string, contractAbi string, ethLog types.Log) (*flip_kick.FlipKickEntity, error) { +func (mfkc *MockFlipKickConverter) ToEntities(contractAddress string, contractAbi string, ethLogs []types.Log) ([]flip_kick.FlipKickEntity, error) { mfkc.ConverterContract = contractAddress mfkc.ConverterAbi = contractAbi - mfkc.LogsToConvert = append(mfkc.LogsToConvert, ethLog) - return &test_data.FlipKickEntity, mfkc.ConverterError + mfkc.LogsToConvert = append(mfkc.LogsToConvert, ethLogs...) + return []flip_kick.FlipKickEntity{test_data.FlipKickEntity}, mfkc.ConverterError } -func (mfkc *MockFlipKickConverter) ToModel(flipKickEntity flip_kick.FlipKickEntity) (flip_kick.FlipKickModel, error) { - mfkc.EntitiesToConvert = append(mfkc.EntitiesToConvert, flipKickEntity) - return test_data.FlipKickModel, nil +func (mfkc *MockFlipKickConverter) ToModels(flipKickEntities []flip_kick.FlipKickEntity) ([]flip_kick.FlipKickModel, error) { + mfkc.EntitiesToConvert = append(mfkc.EntitiesToConvert, flipKickEntities...) + return []flip_kick.FlipKickModel{test_data.FlipKickModel}, nil } + func (mfkc *MockFlipKickConverter) SetConverterError(err error) { mfkc.ConverterError = err } diff --git a/pkg/transformers/test_data/mocks/flip_kick/repository.go b/pkg/transformers/test_data/mocks/flip_kick/repository.go index f54f1aac..633c487f 100644 --- a/pkg/transformers/test_data/mocks/flip_kick/repository.go +++ b/pkg/transformers/test_data/mocks/flip_kick/repository.go @@ -15,27 +15,36 @@ package flip_kick import ( + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/transformers/flip_kick" ) type MockFlipKickRepository struct { - HeaderIds []int64 - HeadersToReturn []core.Header - StartingBlockNumber int64 - EndingBlockNumber int64 - FlipKicksCreated []flip_kick.FlipKickModel - CreateRecordError error - MissingHeadersError error + CreateRecordError error + EndingBlockNumber int64 + FlipKicksCreated []flip_kick.FlipKickModel + HeaderIds []int64 + HeadersToReturn []core.Header + MissingHeadersError error + StartingBlockNumber int64 + markHeaderCheckedErr error + markHeaderCheckedPassedHeaderId int64 } -func (mfkr *MockFlipKickRepository) Create(headerId int64, flipKick flip_kick.FlipKickModel) error { +func (mfkr *MockFlipKickRepository) Create(headerId int64, flipKick []flip_kick.FlipKickModel) error { mfkr.HeaderIds = append(mfkr.HeaderIds, headerId) - mfkr.FlipKicksCreated = append(mfkr.FlipKicksCreated, flipKick) + mfkr.FlipKicksCreated = append(mfkr.FlipKicksCreated, flipKick...) return mfkr.CreateRecordError } +func (mfkr *MockFlipKickRepository) MarkHeaderChecked(headerId int64) error { + mfkr.markHeaderCheckedPassedHeaderId = headerId + return mfkr.markHeaderCheckedErr +} + func (mfkr *MockFlipKickRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { mfkr.StartingBlockNumber = startingBlockNumber mfkr.EndingBlockNumber = endingBlockNumber @@ -50,6 +59,15 @@ func (mfkr *MockFlipKickRepository) SetHeadersToReturn(headers []core.Header) { func (mfkr *MockFlipKickRepository) SetCreateRecordError(err error) { mfkr.CreateRecordError = err } + +func (mfkr *MockFlipKickRepository) SetMarkHeaderCheckedErr(err error) { + mfkr.markHeaderCheckedErr = err +} + func (mfkr *MockFlipKickRepository) SetMissingHeadersError(err error) { mfkr.MissingHeadersError = err } + +func (mfkr *MockFlipKickRepository) AssertMarkHeaderCheckedCalledWith(headerId int64) { + Expect(mfkr.markHeaderCheckedPassedHeaderId).To(Equal(headerId)) +}