diff --git a/pkg/transformers/shared/constants/checked_headers.go b/pkg/transformers/shared/constants/checked_headers.go index 91ddd6fb..39b07214 100644 --- a/pkg/transformers/shared/constants/checked_headers.go +++ b/pkg/transformers/shared/constants/checked_headers.go @@ -16,8 +16,14 @@ var ( PitFileStabilityFeeChecked = "pit_file_stability_fee_checked" PriceFeedsChecked = "price_feeds_checked" TendChecked = "tend_checked" - VatFluxChecked = "vat_flux_checked" + VatFluxChecked = "vat_flux_checked" VatFoldChecked = "vat_fold_checked" VatGrabChecked = "vat_grab_checked" VatHealChecked = "vat_heal_checked" + VatInitChecked = "vat_init_checked" + VatMoveChecked = "vat_move_checked" + VatSlipChecked = "vat_slip_checked" + VatTollChecked = "vat_toll_checked" + VatTuneChecked = "vat_tune_checked" + VowFlogChecked = "vow_flog_checked" ) diff --git a/pkg/transformers/test_data/vow_flog.go b/pkg/transformers/test_data/vow_flog.go index 76cf1880..8fee35a4 100644 --- a/pkg/transformers/test_data/vow_flog.go +++ b/pkg/transformers/test_data/vow_flog.go @@ -23,7 +23,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/transformers/vow_flog" ) -var EthFlogLog = types.Log{ +var EthVowFlogLog = types.Log{ Address: common.HexToAddress(constants.VowContractAddress), Topics: []common.Hash{ common.HexToHash("0x35aee16f00000000000000000000000000000000000000000000000000000000"), @@ -40,10 +40,10 @@ var EthFlogLog = types.Log{ Removed: false, } -var rawFlogLog, _ = json.Marshal(EthFlogLog) -var FlogModel = vow_flog.VowFlogModel{ +var rawVowFlogLog, _ = json.Marshal(EthVowFlogLog) +var VowFlogModel = vow_flog.VowFlogModel{ Era: "1337", - LogIndex: EthFlogLog.Index, - TransactionIndex: EthFlogLog.TxIndex, - Raw: rawFlogLog, + LogIndex: EthVowFlogLog.Index, + TransactionIndex: EthVowFlogLog.TxIndex, + Raw: rawVowFlogLog, } diff --git a/pkg/transformers/vat_init/repository.go b/pkg/transformers/vat_init/repository.go index 3d405929..cee9e44a 100644 --- a/pkg/transformers/vat_init/repository.go +++ b/pkg/transformers/vat_init/repository.go @@ -18,7 +18,8 @@ import ( "fmt" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "log" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" ) type VatInitRepository struct { @@ -38,7 +39,6 @@ func (repository VatInitRepository) Create(headerID int64, models []interface{}) return fmt.Errorf("model of type %T, not %T", model, VatInitModel{}) } - log.Printf("VatInit model: %v", vatInit) _, err = tx.Exec( `INSERT INTO maker.vat_init (header_id, ilk, log_idx, tx_idx, raw_log) VALUES($1, $2, $3, $4, $5)`, @@ -46,15 +46,11 @@ func (repository VatInitRepository) Create(headerID int64, models []interface{}) ) if err != nil { tx.Rollback() - log.Printf("Error: %v \n", err) return err } } - _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, vat_init_checked) - VALUES($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vat_init_checked = $2`, headerID, true) + err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatInitChecked) if err != nil { tx.Rollback() return err @@ -64,28 +60,11 @@ func (repository VatInitRepository) Create(headerID int64, models []interface{}) } func (repository VatInitRepository) MarkHeaderChecked(headerID int64) error { - _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_init_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vat_init_checked = $2`, headerID, true) - return err + return shared.MarkHeaderChecked(headerID, repository.db, constants.VatInitChecked) } func (repository VatInitRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - var result []core.Header - err := repository.db.Select( - &result, - `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR vat_init_checked IS FALSE) - AND headers.block_number >= $1 - AND headers.block_number <= $2 - AND headers.eth_node_fingerprint = $3`, - startingBlockNumber, - endingBlockNumber, - repository.db.Node.ID, - ) - return result, err + return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatInitChecked) } func (repository *VatInitRepository) SetDB(db *postgres.DB) { diff --git a/pkg/transformers/vat_init/repository_test.go b/pkg/transformers/vat_init/repository_test.go index d732d43c..93ca26fe 100644 --- a/pkg/transformers/vat_init/repository_test.go +++ b/pkg/transformers/vat_init/repository_test.go @@ -15,13 +15,11 @@ package vat_init_test import ( - "database/sql" - "math/rand" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/shared_behaviors" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" @@ -32,30 +30,36 @@ import ( var _ = Describe("Vat init repository", func() { var ( - db *postgres.DB - vatInitRepository vat_init.VatInitRepository - headerRepository repositories.HeaderRepository - err error + db *postgres.DB + repository vat_init.VatInitRepository ) BeforeEach(func() { - db = test_config.NewTestDB(core.Node{}) + db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) - vatInitRepository = vat_init.VatInitRepository{} - vatInitRepository.SetDB(db) - headerRepository = repositories.NewHeaderRepository(db) + repository = vat_init.VatInitRepository{} + repository.SetDB(db) }) Describe("Create", func() { - var headerID int64 + modelWithDifferentLogIdx := test_data.VatInitModel + modelWithDifferentLogIdx.LogIndex++ + inputs := shared_behaviors.CreateBehaviorInputs{ + CheckedHeaderColumnName: constants.VatInitChecked, + LogEventTableName: "maker.vat_init", + TestModel: test_data.VatInitModel, + ModelWithDifferentLogIdx: modelWithDifferentLogIdx, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) + shared_behaviors.SharedRepositoryCreateBehaviors(&inputs) + + It("persists vat init records", func() { + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) Expect(err).NotTo(HaveOccurred()) - }) - It("adds a vat event", func() { - err = vatInitRepository.Create(headerID, []interface{}{test_data.VatInitModel}) + err = repository.Create(headerID, []interface{}{test_data.VatInitModel}) Expect(err).NotTo(HaveOccurred()) var dbVatInit vat_init.VatInitModel @@ -66,157 +70,23 @@ var _ = Describe("Vat init repository", func() { Expect(dbVatInit.TransactionIndex).To(Equal(test_data.VatInitModel.TransactionIndex)) Expect(dbVatInit.Raw).To(MatchJSON(test_data.VatInitModel.Raw)) }) - - It("does not duplicate vat events", func() { - err = vatInitRepository.Create(headerID, []interface{}{test_data.VatInitModel}) - Expect(err).NotTo(HaveOccurred()) - - err = vatInitRepository.Create(headerID, []interface{}{test_data.VatInitModel}) - - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) - }) - - It("removes vat if corresponding header is deleted", func() { - err = vatInitRepository.Create(headerID, []interface{}{test_data.VatInitModel}) - Expect(err).NotTo(HaveOccurred()) - - _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerID) - - Expect(err).NotTo(HaveOccurred()) - var dbVatInit vat_init.VatInitModel - err = db.Get(&dbVatInit, `SELECT ilk, log_idx, tx_idx, raw_log FROM maker.vat_init WHERE header_id = $1`, headerID) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(sql.ErrNoRows)) - }) - - It("marks the header as checked for vat init logs", func() { - err = vatInitRepository.Create(headerID, []interface{}{test_data.VatInitModel}) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_init_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates the header to checked if checked headers row already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - Expect(err).NotTo(HaveOccurred()) - - err = vatInitRepository.Create(headerID, []interface{}{test_data.VatInitModel}) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_init_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("returns an error if model is of wrong type", func() { - err = vatInitRepository.Create(headerID, []interface{}{test_data.WrongModel{}}) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("model of type")) - }) - }) - - Describe("MarkHeaderChecked", func() { - var headerID int64 - - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) - - It("creates a row for a new headerID", func() { - err = vatInitRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_init_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates row when headerID already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - - err = vatInitRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_init_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) }) Describe("MissingHeaders", func() { - var ( - startingBlock, vatInitBlock, endingBlock int64 - blockNumbers, headerIDs []int64 - ) + inputs := shared_behaviors.MissingHeadersBehaviorInputs{ + Repository: &repository, + RepositoryTwo: &vat_init.VatInitRepository{}, + } - BeforeEach(func() { - startingBlock = rand.Int63() - vatInitBlock = startingBlock + 1 - endingBlock = startingBlock + 2 + shared_behaviors.SharedRepositoryMissingHeadersBehaviors(&inputs) + }) - blockNumbers = []int64{startingBlock, vatInitBlock, endingBlock, endingBlock + 1} + Describe("MarkHeaderChecked", func() { + inputs := shared_behaviors.MarkedHeaderCheckedBehaviorInputs{ + CheckedHeaderColumnName: constants.VatInitChecked, + Repository: &repository, + } - headerIDs = []int64{} - for _, n := range blockNumbers { - headerID, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - headerIDs = append(headerIDs, headerID) - } - }) - - It("returns headers that haven't been checked", func() { - err := vatInitRepository.MarkHeaderChecked(headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatInitRepository.MissingHeaders(startingBlock, endingBlock) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(2)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock))) - }) - - It("only treats headers as checked if vat init logs have been checked", func() { - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatInitRepository.MissingHeaders(startingBlock, endingBlock) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(3)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatInitBlock))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatInitBlock))) - Expect(headers[2].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatInitBlock))) - }) - - It("only returns headers associated with the current node", func() { - dbTwo := test_config.NewTestDB(core.Node{ID: "second"}) - headerRepositoryTwo := repositories.NewHeaderRepository(dbTwo) - for _, n := range blockNumbers { - _, err = headerRepositoryTwo.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - } - - vatInitRepositoryTwo := vat_init.VatInitRepository{} - vatInitRepositoryTwo.SetDB(dbTwo) - err = vatInitRepository.MarkHeaderChecked(headerIDs[0]) - Expect(err).NotTo(HaveOccurred()) - - nodeOneMissingHeaders, err := vatInitRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeOneMissingHeaders)).To(Equal(len(blockNumbers) - 1)) - - nodeTwoMissingHeaders, err := vatInitRepositoryTwo.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeTwoMissingHeaders)).To(Equal(len(blockNumbers))) - }) + shared_behaviors.SharedRepositoryMarkHeaderCheckedBehaviors(&inputs) }) }) diff --git a/pkg/transformers/vat_move/repository.go b/pkg/transformers/vat_move/repository.go index d822ad78..f47db4d6 100644 --- a/pkg/transformers/vat_move/repository.go +++ b/pkg/transformers/vat_move/repository.go @@ -18,6 +18,8 @@ import ( "fmt" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" ) type VatMoveRepository struct { @@ -49,13 +51,7 @@ func (repository VatMoveRepository) Create(headerID int64, models []interface{}) } } - _, err = tx.Exec( - `INSERT INTO public.checked_headers (header_id, vat_move_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vat_move_checked = $2`, - headerID, true) - + err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatMoveChecked) if err != nil { tx.Rollback() return err @@ -65,29 +61,11 @@ func (repository VatMoveRepository) Create(headerID int64, models []interface{}) } func (repository VatMoveRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - var result []core.Header - err := repository.db.Select( - &result, - `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR vat_move_checked IS FALSE) - AND headers.block_number >= $1 - AND headers.block_number <= $2 - AND headers.eth_node_fingerprint = $3`, - startingBlockNumber, - endingBlockNumber, - repository.db.Node.ID, - ) - - return result, err + return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatMoveChecked) } func (repository VatMoveRepository) MarkHeaderChecked(headerID int64) error { - _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_move_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vat_move_checked = $2`, headerID, true) - return err + return shared.MarkHeaderChecked(headerID, repository.db, constants.VatMoveChecked) } func (repository *VatMoveRepository) SetDB(db *postgres.DB) { diff --git a/pkg/transformers/vat_move/repository_test.go b/pkg/transformers/vat_move/repository_test.go index d8318d2f..b4eb0bcc 100644 --- a/pkg/transformers/vat_move/repository_test.go +++ b/pkg/transformers/vat_move/repository_test.go @@ -15,44 +15,48 @@ package vat_move_test import ( - "database/sql" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "math/rand" - - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/shared_behaviors" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_move" "github.com/vulcanize/vulcanizedb/test_config" ) var _ = Describe("Vat Move", func() { var db *postgres.DB - var headerRepository repositories.HeaderRepository - var vatMoveRepository vat_move.VatMoveRepository + var repository vat_move.VatMoveRepository BeforeEach(func() { - db = test_config.NewTestDB(core.Node{}) + db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) - headerRepository = repositories.NewHeaderRepository(db) - vatMoveRepository = vat_move.VatMoveRepository{} - vatMoveRepository.SetDB(db) + repository = vat_move.VatMoveRepository{} + repository.SetDB(db) }) Describe("Create", func() { - var headerID int64 - var err error + modelWithDifferentLogIdx := test_data.VatMoveModel + modelWithDifferentLogIdx.LogIndex++ + inputs := shared_behaviors.CreateBehaviorInputs{ + CheckedHeaderColumnName: constants.VatMoveChecked, + LogEventTableName: "maker.vat_move", + TestModel: test_data.VatMoveModel, + ModelWithDifferentLogIdx: modelWithDifferentLogIdx, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) + shared_behaviors.SharedRepositoryCreateBehaviors(&inputs) + + It("persists vat move records", func() { + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) Expect(err).NotTo(HaveOccurred()) - }) - It("adds a vat move event", func() { - err = vatMoveRepository.Create(headerID, []interface{}{test_data.VatMoveModel}) + err = repository.Create(headerID, []interface{}{test_data.VatMoveModel}) Expect(err).NotTo(HaveOccurred()) var dbVatMove vat_move.VatMoveModel @@ -65,162 +69,23 @@ var _ = Describe("Vat Move", func() { Expect(dbVatMove.TransactionIndex).To(Equal(test_data.VatMoveModel.TransactionIndex)) Expect(dbVatMove.Raw).To(MatchJSON(test_data.VatMoveModel.Raw)) }) - - It("marks header id as checked for vat move logs", func() { - err = vatMoveRepository.Create(headerID, []interface{}{test_data.VatMoveModel}) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_move_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates the header to checked if checked headers row already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - Expect(err).NotTo(HaveOccurred()) - - err = vatMoveRepository.Create(headerID, []interface{}{test_data.VatMoveModel}) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_move_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("returns an error if insertion fails", func() { - err = vatMoveRepository.Create(headerID, []interface{}{test_data.VatMoveModel}) - Expect(err).NotTo(HaveOccurred()) - - err = vatMoveRepository.Create(headerID, []interface{}{test_data.VatMoveModel}) - - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) - }) - - It("removes vat move event if corresponding header is deleted", func() { - err = vatMoveRepository.Create(headerID, []interface{}{test_data.VatMoveModel}) - Expect(err).NotTo(HaveOccurred()) - - _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerID) - - Expect(err).NotTo(HaveOccurred()) - var dbVatMove vat_move.VatMoveModel - err = db.Get(&dbVatMove, `SELECT src, dst, rad, log_idx, tx_idx, raw_log FROM maker.vat_move WHERE header_id = $1`, headerID) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(sql.ErrNoRows)) - }) - - It("returns an error if model is of wrong type", func() { - err = vatMoveRepository.Create(headerID, []interface{}{test_data.WrongModel{}}) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("model of type")) - }) }) Describe("MissingHeaders", func() { - var eventBlockNumber = rand.Int63() - var startingBlockNumber = eventBlockNumber - 1 - var endingBlockNumber = eventBlockNumber + 1 - var outOfRangeBlockNumber = eventBlockNumber + 2 + inputs := shared_behaviors.MissingHeadersBehaviorInputs{ + Repository: &repository, + RepositoryTwo: &vat_move.VatMoveRepository{}, + } - It("returns headers haven't been checked", func() { - var headerIds []int64 - - for _, number := range []int64{startingBlockNumber, eventBlockNumber, endingBlockNumber, outOfRangeBlockNumber} { - headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(number)) - Expect(err).NotTo(HaveOccurred()) - headerIds = append(headerIds, headerId) - } - - err := vatMoveRepository.MarkHeaderChecked(headerIds[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatMoveRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(2)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) - }) - - It("only treats headers as checked if vat_move has been checked", func() { - var headerIds []int64 - for _, number := range []int64{startingBlockNumber, eventBlockNumber, endingBlockNumber, outOfRangeBlockNumber} { - headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(number)) - Expect(err).NotTo(HaveOccurred()) - headerIds = append(headerIds, headerId) - } - - // Just creates row, doesn't set this header as checked - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIds[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatMoveRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(3)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(eventBlockNumber))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(eventBlockNumber))) - Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(eventBlockNumber))) - }) - - It("only returns headers associated with the current node", func() { - blockNumbers := []int64{1, 2, 3} - dbTwo := test_config.NewTestDB(core.Node{ID: "second"}) - headerRepositoryTwo := repositories.NewHeaderRepository(dbTwo) - var headerIDs []int64 - for _, n := range blockNumbers { - headerID, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - headerIDs = append(headerIDs, headerID) - _, err = headerRepositoryTwo.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - } - vatMoveRepositoryTwo := vat_move.VatMoveRepository{} - vatMoveRepositoryTwo.SetDB(dbTwo) - err := vatMoveRepository.Create(headerIDs[0], []interface{}{test_data.VatMoveModel}) - Expect(err).NotTo(HaveOccurred()) - - nodeOneMissingHeaders, err := vatMoveRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeOneMissingHeaders)).To(Equal(len(blockNumbers) - 1)) - - nodeTwoMissingHeaders, err := vatMoveRepositoryTwo.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeTwoMissingHeaders)).To(Equal(len(blockNumbers))) - }) + shared_behaviors.SharedRepositoryMissingHeadersBehaviors(&inputs) }) Describe("MarkHeaderChecked", func() { - var headerID int64 - var err error + inputs := shared_behaviors.MarkedHeaderCheckedBehaviorInputs{ + CheckedHeaderColumnName: constants.VatMoveChecked, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) - - It("creates a new row for a new headerID", func() { - err = vatMoveRepository.MarkHeaderChecked(headerID) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_move_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates row when headerId already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - Expect(err).NotTo(HaveOccurred()) - err = vatMoveRepository.MarkHeaderChecked(headerID) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_move_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) + shared_behaviors.SharedRepositoryMarkHeaderCheckedBehaviors(&inputs) }) }) diff --git a/pkg/transformers/vat_slip/repository.go b/pkg/transformers/vat_slip/repository.go index beedf28c..1c6b8d06 100644 --- a/pkg/transformers/vat_slip/repository.go +++ b/pkg/transformers/vat_slip/repository.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" ) type VatSlipRepository struct { @@ -24,7 +26,7 @@ func (repository VatSlipRepository) Create(headerID int64, models []interface{}) _, err = tx.Exec( `INSERT into maker.vat_slip (header_id, ilk, guy, rad, tx_idx, log_idx, raw_log) - VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, + VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, headerID, vatSlip.Ilk, vatSlip.Guy, vatSlip.Rad, vatSlip.TransactionIndex, vatSlip.LogIndex, vatSlip.Raw, ) if err != nil { @@ -32,10 +34,8 @@ func (repository VatSlipRepository) Create(headerID int64, models []interface{}) return err } } - _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, vat_slip_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vat_slip_checked = $2`, headerID, true) + + err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatSlipChecked) if err != nil { tx.Rollback() return err @@ -45,28 +45,11 @@ func (repository VatSlipRepository) Create(headerID int64, models []interface{}) } func (repository VatSlipRepository) MarkHeaderChecked(headerID int64) error { - _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_slip_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vat_slip_checked = $2`, headerID, true) - return err + return shared.MarkHeaderChecked(headerID, repository.db, constants.VatSlipChecked) } func (repository VatSlipRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - var result []core.Header - err := repository.db.Select( - &result, - `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR vat_slip_checked IS FALSE) - AND headers.block_number >= $1 - AND headers.block_number <= $2 - AND headers.eth_node_fingerprint = $3`, - startingBlockNumber, - endingBlockNumber, - repository.db.Node.ID, - ) - return result, err + return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatSlipChecked) } func (repository *VatSlipRepository) SetDB(db *postgres.DB) { diff --git a/pkg/transformers/vat_slip/repository_test.go b/pkg/transformers/vat_slip/repository_test.go index 5ed48c68..d3f1e64e 100644 --- a/pkg/transformers/vat_slip/repository_test.go +++ b/pkg/transformers/vat_slip/repository_test.go @@ -1,14 +1,11 @@ package vat_slip_test import ( - "database/sql" - "math/rand" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/shared_behaviors" - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" @@ -19,30 +16,36 @@ import ( var _ = Describe("Vat slip repository", func() { var ( - db *postgres.DB - vatSlipRepository vat_slip.VatSlipRepository - err error - headerRepository datastore.HeaderRepository + db *postgres.DB + repository vat_slip.VatSlipRepository ) BeforeEach(func() { - db = test_config.NewTestDB(core.Node{}) + db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) - headerRepository = repositories.NewHeaderRepository(db) - vatSlipRepository = vat_slip.VatSlipRepository{} - vatSlipRepository.SetDB(db) + repository = vat_slip.VatSlipRepository{} + repository.SetDB(db) }) Describe("Create", func() { - var headerID int64 + modelWithDifferentLogIdx := test_data.VatSlipModel + modelWithDifferentLogIdx.LogIndex++ + inputs := shared_behaviors.CreateBehaviorInputs{ + CheckedHeaderColumnName: constants.VatSlipChecked, + LogEventTableName: "maker.vat_slip", + TestModel: test_data.VatSlipModel, + ModelWithDifferentLogIdx: modelWithDifferentLogIdx, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) + shared_behaviors.SharedRepositoryCreateBehaviors(&inputs) It("adds a vat slip event", func() { - err = vatSlipRepository.Create(headerID, []interface{}{test_data.VatSlipModel}) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) + Expect(err).NotTo(HaveOccurred()) + + err = repository.Create(headerID, []interface{}{test_data.VatSlipModel}) Expect(err).NotTo(HaveOccurred()) var dbVatSlip vat_slip.VatSlipModel @@ -55,166 +58,23 @@ var _ = Describe("Vat slip repository", func() { Expect(dbVatSlip.LogIndex).To(Equal(test_data.VatSlipModel.LogIndex)) Expect(dbVatSlip.Raw).To(MatchJSON(test_data.VatSlipModel.Raw)) }) - - It("marks header as checked for logs", func() { - err = vatSlipRepository.Create(headerID, []interface{}{test_data.VatSlipModel}) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_slip_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates the header to checked if checked headers row already exists", func() { - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - Expect(err).NotTo(HaveOccurred()) - err = vatSlipRepository.Create(headerID, []interface{}{test_data.VatSlipModel}) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_slip_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("does not duplicate vat slip events", func() { - err = vatSlipRepository.Create(headerID, []interface{}{test_data.VatSlipModel}) - Expect(err).NotTo(HaveOccurred()) - - err = vatSlipRepository.Create(headerID, []interface{}{test_data.VatSlipModel}) - - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) - }) - - It("allows for multiple vat slip events in one transaction if they have different log indexes", func() { - err = vatSlipRepository.Create(headerID, []interface{}{test_data.VatSlipModel}) - Expect(err).NotTo(HaveOccurred()) - - newVatSlip := test_data.VatSlipModel - newVatSlip.LogIndex = newVatSlip.LogIndex + 1 - err := vatSlipRepository.Create(headerID, []interface{}{newVatSlip}) - - Expect(err).NotTo(HaveOccurred()) - }) - - It("removes vat slip if corresponding header is deleted", func() { - err = vatSlipRepository.Create(headerID, []interface{}{test_data.VatSlipModel}) - Expect(err).NotTo(HaveOccurred()) - - _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerID) - - Expect(err).NotTo(HaveOccurred()) - var dbVatSlip vat_slip.VatSlipModel - err = db.Get(&dbVatSlip, `SELECT ilk, guy, rad, tx_idx, raw_log FROM maker.vat_slip WHERE header_id = $1`, headerID) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(sql.ErrNoRows)) - }) - - It("returns an error if model is of wrong type", func() { - err = vatSlipRepository.Create(headerID, []interface{}{test_data.WrongModel{}}) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("model of type")) - }) }) Describe("MarkHeaderChecked", func() { - var headerID int64 + inputs := shared_behaviors.MarkedHeaderCheckedBehaviorInputs{ + CheckedHeaderColumnName: constants.VatSlipChecked, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) - - It("creates a row for a new headerID", func() { - err = vatSlipRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_slip_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates row when headerID already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - - err = vatSlipRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_slip_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) + shared_behaviors.SharedRepositoryMarkHeaderCheckedBehaviors(&inputs) }) Describe("MissingHeaders", func() { - var ( - startingBlock, endingBlock, vatSlipBlock int64 - blockNumbers, headerIDs []int64 - ) + inputs := shared_behaviors.MissingHeadersBehaviorInputs{ + Repository: &repository, + RepositoryTwo: &vat_slip.VatSlipRepository{}, + } - BeforeEach(func() { - startingBlock = rand.Int63() - vatSlipBlock = startingBlock + 1 - endingBlock = startingBlock + 2 - - blockNumbers = []int64{startingBlock, vatSlipBlock, endingBlock, endingBlock + 1} - - headerIDs = []int64{} - for _, n := range blockNumbers { - headerID, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - headerIDs = append(headerIDs, headerID) - } - }) - - It("returns headers that haven't been checked", func() { - err := vatSlipRepository.MarkHeaderChecked(headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatSlipRepository.MissingHeaders(startingBlock, endingBlock) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(2)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock))) - }) - - It("only treats headers as checked if vat slip logs have been checked", func() { - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatSlipRepository.MissingHeaders(startingBlock, endingBlock) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(3)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatSlipBlock))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatSlipBlock))) - Expect(headers[2].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatSlipBlock))) - }) - - It("only returns headers associated with the current node", func() { - dbTwo := test_config.NewTestDB(core.Node{ID: "second"}) - headerRepositoryTwo := repositories.NewHeaderRepository(dbTwo) - for _, n := range blockNumbers { - _, err = headerRepositoryTwo.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - } - vatSlipRepositoryTwo := vat_slip.VatSlipRepository{} - vatSlipRepositoryTwo.SetDB(dbTwo) - err := vatSlipRepository.MarkHeaderChecked(headerIDs[0]) - Expect(err).NotTo(HaveOccurred()) - - nodeOneMissingHeaders, err := vatSlipRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeOneMissingHeaders)).To(Equal(len(blockNumbers) - 1)) - - nodeTwoMissingHeaders, err := vatSlipRepositoryTwo.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeTwoMissingHeaders)).To(Equal(len(blockNumbers))) - }) + shared_behaviors.SharedRepositoryMissingHeadersBehaviors(&inputs) }) }) diff --git a/pkg/transformers/vat_toll/repository.go b/pkg/transformers/vat_toll/repository.go index 722b99c0..d115c742 100644 --- a/pkg/transformers/vat_toll/repository.go +++ b/pkg/transformers/vat_toll/repository.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" ) type VatTollRepository struct { @@ -23,7 +25,7 @@ func (repository VatTollRepository) Create(headerID int64, models []interface{}) } _, err = tx.Exec( `INSERT into maker.vat_toll (header_id, ilk, urn, take, tx_idx, log_idx, raw_log) - VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, + VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, headerID, vatToll.Ilk, vatToll.Urn, vatToll.Take, vatToll.TransactionIndex, vatToll.LogIndex, vatToll.Raw, ) if err != nil { @@ -31,10 +33,8 @@ func (repository VatTollRepository) Create(headerID int64, models []interface{}) return err } } - _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, vat_toll_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vat_toll_checked = $2`, headerID, true) + + err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTollChecked) if err != nil { tx.Rollback() return err @@ -43,28 +43,11 @@ func (repository VatTollRepository) Create(headerID int64, models []interface{}) } func (repository VatTollRepository) MarkHeaderChecked(headerID int64) error { - _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_toll_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vat_toll_checked = $2`, headerID, true) - return err + return shared.MarkHeaderChecked(headerID, repository.db, constants.VatTollChecked) } func (repository VatTollRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - var result []core.Header - err := repository.db.Select( - &result, - `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR vat_toll_checked IS FALSE) - AND headers.block_number >= $1 - AND headers.block_number <= $2 - AND headers.eth_node_fingerprint = $3`, - startingBlockNumber, - endingBlockNumber, - repository.db.Node.ID, - ) - return result, err + return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VatTollChecked) } func (repository *VatTollRepository) SetDB(db *postgres.DB) { diff --git a/pkg/transformers/vat_toll/repository_test.go b/pkg/transformers/vat_toll/repository_test.go index 492428c6..6b3d21d4 100644 --- a/pkg/transformers/vat_toll/repository_test.go +++ b/pkg/transformers/vat_toll/repository_test.go @@ -1,48 +1,51 @@ package vat_toll_test import ( - "database/sql" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/shared_behaviors" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_toll" "github.com/vulcanize/vulcanizedb/test_config" - "math/rand" ) var _ = Describe("Vat toll repository", func() { var ( - db *postgres.DB - vatTollRepository vat_toll.VatTollRepository - err error - headerRepository datastore.HeaderRepository + db *postgres.DB + repository vat_toll.VatTollRepository ) BeforeEach(func() { - db = test_config.NewTestDB(core.Node{}) + db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) - headerRepository = repositories.NewHeaderRepository(db) - vatTollRepository = vat_toll.VatTollRepository{} - vatTollRepository.SetDB(db) + repository = vat_toll.VatTollRepository{} + repository.SetDB(db) }) Describe("Create", func() { - var headerID int64 + modelWithDifferentLogIdx := test_data.VatTollModel + modelWithDifferentLogIdx.LogIndex++ + inputs := shared_behaviors.CreateBehaviorInputs{ + CheckedHeaderColumnName: constants.VatTollChecked, + LogEventTableName: "maker.vat_toll", + TestModel: test_data.VatTollModel, + ModelWithDifferentLogIdx: modelWithDifferentLogIdx, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) + shared_behaviors.SharedRepositoryCreateBehaviors(&inputs) It("adds a vat toll event", func() { - err = vatTollRepository.Create(headerID, []interface{}{test_data.VatTollModel}) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) + Expect(err).NotTo(HaveOccurred()) + err = repository.Create(headerID, []interface{}{test_data.VatTollModel}) Expect(err).NotTo(HaveOccurred()) var dbVatToll vat_toll.VatTollModel @@ -55,163 +58,23 @@ var _ = Describe("Vat toll repository", func() { Expect(dbVatToll.LogIndex).To(Equal(test_data.VatTollModel.LogIndex)) Expect(dbVatToll.Raw).To(MatchJSON(test_data.VatTollModel.Raw)) }) - - It("marks header as checked for logs", func() { - err = vatTollRepository.Create(headerID, []interface{}{test_data.VatTollModel}) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_toll_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates the header to checked if checked headers row already exists", func() { - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - Expect(err).NotTo(HaveOccurred()) - err = vatTollRepository.Create(headerID, []interface{}{test_data.VatTollModel}) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_toll_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("does not duplicate vat toll events", func() { - err = vatTollRepository.Create(headerID, []interface{}{test_data.VatTollModel}) - Expect(err).NotTo(HaveOccurred()) - - err = vatTollRepository.Create(headerID, []interface{}{test_data.VatTollModel}) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) - }) - - It("allows for multiple vat toll events in one transaction if they have different log indexes", func() { - err = vatTollRepository.Create(headerID, []interface{}{test_data.VatTollModel}) - Expect(err).NotTo(HaveOccurred()) - - newVatToll := test_data.VatTollModel - newVatToll.LogIndex = newVatToll.LogIndex + 1 - err := vatTollRepository.Create(headerID, []interface{}{newVatToll}) - - Expect(err).NotTo(HaveOccurred()) - }) - - It("removes vat toll if corresponding header is deleted", func() { - _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerID) - - Expect(err).NotTo(HaveOccurred()) - var dbVatToll vat_toll.VatTollModel - err = db.Get(&dbVatToll, `SELECT ilk, urn, take, tx_idx, raw_log FROM maker.vat_toll WHERE header_id = $1`, headerID) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(sql.ErrNoRows)) - }) - - It("returns an error if model is of wrong type", func() { - err = vatTollRepository.Create(headerID, []interface{}{test_data.WrongModel{}}) - - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("model of type")) - }) }) Describe("MarkHeaderChecked", func() { - var headerID int64 + inputs := shared_behaviors.MarkedHeaderCheckedBehaviorInputs{ + CheckedHeaderColumnName: constants.VatTollChecked, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) - - It("creates a row for a new headerID", func() { - err = vatTollRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_toll_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates row when headerID already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - - err = vatTollRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_toll_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) + shared_behaviors.SharedRepositoryMarkHeaderCheckedBehaviors(&inputs) }) Describe("MissingHeaders", func() { - var ( - startingBlock, endingBlock, vatTollBlock int64 - blockNumbers, headerIDs []int64 - ) + inputs := shared_behaviors.MissingHeadersBehaviorInputs{ + Repository: &repository, + RepositoryTwo: &vat_toll.VatTollRepository{}, + } - BeforeEach(func() { - startingBlock = rand.Int63() - vatTollBlock = startingBlock + 1 - endingBlock = startingBlock + 2 - - blockNumbers = []int64{startingBlock, vatTollBlock, endingBlock, endingBlock + 1} - - headerIDs = []int64{} - for _, n := range blockNumbers { - headerID, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - headerIDs = append(headerIDs, headerID) - } - }) - - It("returns headers that haven't been checked", func() { - err := vatTollRepository.MarkHeaderChecked(headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatTollRepository.MissingHeaders(startingBlock, endingBlock) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(2)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock))) - }) - - It("only treats headers as checked if vat toll logs have been checked", func() { - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatTollRepository.MissingHeaders(startingBlock, endingBlock) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(3)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatTollBlock))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatTollBlock))) - Expect(headers[2].BlockNumber).To(Or(Equal(startingBlock), Equal(endingBlock), Equal(vatTollBlock))) - }) - - It("only returns headers associated with the current node", func() { - dbTwo := test_config.NewTestDB(core.Node{ID: "second"}) - headerRepositoryTwo := repositories.NewHeaderRepository(dbTwo) - for _, n := range blockNumbers { - _, err = headerRepositoryTwo.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - } - vatTollRepositoryTwo := vat_toll.VatTollRepository{} - vatTollRepositoryTwo.SetDB(dbTwo) - err := vatTollRepository.MarkHeaderChecked(headerIDs[0]) - Expect(err).NotTo(HaveOccurred()) - - nodeOneMissingHeaders, err := vatTollRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeOneMissingHeaders)).To(Equal(len(blockNumbers) - 1)) - - nodeTwoMissingHeaders, err := vatTollRepositoryTwo.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeTwoMissingHeaders)).To(Equal(len(blockNumbers))) - }) + shared_behaviors.SharedRepositoryMissingHeadersBehaviors(&inputs) }) }) diff --git a/pkg/transformers/vat_tune/repository_test.go b/pkg/transformers/vat_tune/repository_test.go index 06bcedcc..5214f9e5 100644 --- a/pkg/transformers/vat_tune/repository_test.go +++ b/pkg/transformers/vat_tune/repository_test.go @@ -1,48 +1,51 @@ package vat_tune_test import ( - "database/sql" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/shared_behaviors" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_tune" "github.com/vulcanize/vulcanizedb/test_config" - "math/rand" ) var _ = Describe("Vat tune repository", func() { var ( - db *postgres.DB - vatTuneRepository vat_tune.VatTuneRepository - err error - headerRepository datastore.HeaderRepository + db *postgres.DB + repository vat_tune.VatTuneRepository ) BeforeEach(func() { - db = test_config.NewTestDB(core.Node{}) + db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) - headerRepository = repositories.NewHeaderRepository(db) - vatTuneRepository = vat_tune.VatTuneRepository{} - vatTuneRepository.SetDB(db) + repository = vat_tune.VatTuneRepository{} + repository.SetDB(db) }) Describe("Create", func() { - var headerID int64 - - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) + modelWithDifferentLogIdx := test_data.VatTuneModel + modelWithDifferentLogIdx.LogIndex++ + inputs := shared_behaviors.CreateBehaviorInputs{ + CheckedHeaderColumnName: constants.VatTuneChecked, + LogEventTableName: "maker.vat_heal", + TestModel: test_data.VatTuneModel, + ModelWithDifferentLogIdx: modelWithDifferentLogIdx, + Repository: &repository, + } + shared_behaviors.SharedRepositoryCreateBehaviors(&inputs) It("adds a vat tune event", func() { - err = vatTuneRepository.Create(headerID, []interface{}{test_data.VatTuneModel}) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) + Expect(err).NotTo(HaveOccurred()) + + err = repository.Create(headerID, []interface{}{test_data.VatTuneModel}) Expect(err).NotTo(HaveOccurred()) var dbVatTune vat_tune.VatTuneModel @@ -58,166 +61,23 @@ var _ = Describe("Vat tune repository", func() { Expect(dbVatTune.LogIndex).To(Equal(test_data.VatTuneModel.LogIndex)) Expect(dbVatTune.Raw).To(MatchJSON(test_data.VatTuneModel.Raw)) }) - - It("marks header as checked for logs", func() { - err = vatTuneRepository.Create(headerID, []interface{}{test_data.VatTuneModel}) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_tune_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates the header to checked if checked headers row already exists", func() { - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - Expect(err).NotTo(HaveOccurred()) - err = vatTuneRepository.Create(headerID, []interface{}{test_data.VatTuneModel}) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_tune_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("does not duplicate vat tune events", func() { - err = vatTuneRepository.Create(headerID, []interface{}{test_data.VatTuneModel}) - Expect(err).NotTo(HaveOccurred()) - - err = vatTuneRepository.Create(headerID, []interface{}{test_data.VatTuneModel}) - - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) - }) - - It("allows for multiple vat tune events in one transaction if they have different log indexes", func() { - err = vatTuneRepository.Create(headerID, []interface{}{test_data.VatTuneModel}) - Expect(err).NotTo(HaveOccurred()) - - newVatTune := test_data.VatTuneModel - newVatTune.LogIndex = newVatTune.LogIndex + 1 - err := vatTuneRepository.Create(headerID, []interface{}{newVatTune}) - - Expect(err).NotTo(HaveOccurred()) - }) - - It("removes vat tune if corresponding header is deleted", func() { - err = vatTuneRepository.Create(headerID, []interface{}{test_data.VatTuneModel}) - Expect(err).NotTo(HaveOccurred()) - - _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerID) - - Expect(err).NotTo(HaveOccurred()) - var dbVatTune vat_tune.VatTuneModel - err = db.Get(&dbVatTune, `SELECT ilk, urn, v, w, dink, dart, tx_idx, raw_log FROM maker.vat_tune WHERE header_id = $1`, headerID) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(sql.ErrNoRows)) - }) - - It("returns an error if model is of wrong type", func() { - err = vatTuneRepository.Create(headerID, []interface{}{test_data.WrongModel{}}) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("model of type")) - }) }) Describe("MarkHeaderChecked", func() { - var headerID int64 + inputs := shared_behaviors.MarkedHeaderCheckedBehaviorInputs{ + CheckedHeaderColumnName: constants.VatTuneChecked, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) - - It("creates a row for a new headerID", func() { - err = vatTuneRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_tune_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates row when headerID already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - - err = vatTuneRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vat_tune_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) + shared_behaviors.SharedRepositoryMarkHeaderCheckedBehaviors(&inputs) }) Describe("MissingHeaders", func() { - var ( - startingBlockNumber, vatTuneBlockNumber, endingBlockNumber int64 - blockNumbers, headerIDs []int64 - ) + inputs := shared_behaviors.MissingHeadersBehaviorInputs{ + Repository: &repository, + RepositoryTwo: &vat_tune.VatTuneRepository{}, + } - BeforeEach(func() { - startingBlockNumber = rand.Int63() - vatTuneBlockNumber = startingBlockNumber + 1 - endingBlockNumber = startingBlockNumber + 2 - - blockNumbers = []int64{startingBlockNumber, vatTuneBlockNumber, endingBlockNumber, endingBlockNumber + 1} - - headerIDs = []int64{} - for _, n := range blockNumbers { - headerID, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - headerIDs = append(headerIDs, headerID) - } - }) - - It("returns headers that haven't been checked", func() { - err := vatTuneRepository.MarkHeaderChecked(headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatTuneRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(2)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) - }) - - It("only treats headers as checked if vat tune logs have been checked", func() { - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := vatTuneRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(3)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(vatTuneBlockNumber))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(vatTuneBlockNumber))) - Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(vatTuneBlockNumber))) - }) - - It("only returns headers associated with the current node", func() { - dbTwo := test_config.NewTestDB(core.Node{ID: "second"}) - headerRepositoryTwo := repositories.NewHeaderRepository(dbTwo) - for _, n := range blockNumbers { - _, err = headerRepositoryTwo.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - } - vatTuneRepositoryTwo := vat_tune.VatTuneRepository{} - vatTuneRepositoryTwo.SetDB(dbTwo) - err := vatTuneRepository.MarkHeaderChecked(headerIDs[0]) - Expect(err).NotTo(HaveOccurred()) - - nodeOneMissingHeaders, err := vatTuneRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeOneMissingHeaders)).To(Equal(len(blockNumbers) - 1)) - - nodeTwoMissingHeaders, err := vatTuneRepositoryTwo.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeTwoMissingHeaders)).To(Equal(len(blockNumbers))) - }) + shared_behaviors.SharedRepositoryMissingHeadersBehaviors(&inputs) }) }) diff --git a/pkg/transformers/vow_flog/converter_test.go b/pkg/transformers/vow_flog/converter_test.go index 4849f06c..895fe510 100644 --- a/pkg/transformers/vow_flog/converter_test.go +++ b/pkg/transformers/vow_flog/converter_test.go @@ -49,10 +49,10 @@ var _ = Describe("Vow flog converter", func() { }) It("converts a log to a model", func() { - models, err := converter.ToModels([]types.Log{test_data.EthFlogLog}) + models, err := converter.ToModels([]types.Log{test_data.EthVowFlogLog}) Expect(err).NotTo(HaveOccurred()) Expect(len(models)).To(Equal(1)) - Expect(models[0].(vow_flog.VowFlogModel)).To(Equal(test_data.FlogModel)) + Expect(models[0].(vow_flog.VowFlogModel)).To(Equal(test_data.VowFlogModel)) }) }) diff --git a/pkg/transformers/vow_flog/repository.go b/pkg/transformers/vow_flog/repository.go index 78a22373..0754fd5f 100644 --- a/pkg/transformers/vow_flog/repository.go +++ b/pkg/transformers/vow_flog/repository.go @@ -18,6 +18,8 @@ import ( "fmt" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" ) type VowFlogRepository struct { @@ -48,41 +50,22 @@ func (repository VowFlogRepository) Create(headerID int64, models []interface{}) return err } } - _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, vow_flog_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vow_flog_checked = $2`, headerID, true) + err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VowFlogChecked) if err != nil { tx.Rollback() return err } + return tx.Commit() } func (repository VowFlogRepository) MarkHeaderChecked(headerID int64) error { - _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vow_flog_checked) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET vow_flog_checked = $2`, headerID, true) - return err + return shared.MarkHeaderChecked(headerID, repository.db, constants.VowFlogChecked) } func (repository VowFlogRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { - var result []core.Header - err := repository.db.Select( - &result, - `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR vow_flog_checked IS FALSE) - AND headers.block_number >= $1 - AND headers.block_number <= $2 - AND headers.eth_node_fingerprint = $3`, - startingBlockNumber, - endingBlockNumber, - repository.db.Node.ID, - ) - return result, err + return shared.MissingHeaders(startingBlockNumber, endingBlockNumber, repository.db, constants.VowFlogChecked) } func (repository *VowFlogRepository) SetDB(db *postgres.DB) { diff --git a/pkg/transformers/vow_flog/repository_test.go b/pkg/transformers/vow_flog/repository_test.go index 821388ac..153b1876 100644 --- a/pkg/transformers/vow_flog/repository_test.go +++ b/pkg/transformers/vow_flog/repository_test.go @@ -15,14 +15,11 @@ package vow_flog_test import ( - "database/sql" - "math/rand" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/shared_behaviors" - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" @@ -33,186 +30,63 @@ import ( var _ = Describe("Vow flog repository", func() { var ( - db *postgres.DB - flogRepository vow_flog.VowFlogRepository - err error - headerRepository datastore.HeaderRepository + db *postgres.DB + repository vow_flog.VowFlogRepository ) BeforeEach(func() { - db = test_config.NewTestDB(core.Node{}) + db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) - headerRepository = repositories.NewHeaderRepository(db) - flogRepository = vow_flog.VowFlogRepository{} - flogRepository.SetDB(db) + repository = vow_flog.VowFlogRepository{} + repository.SetDB(db) }) Describe("Create", func() { - var headerID int64 + modelWithDifferentLogIdx := test_data.VowFlogModel + modelWithDifferentLogIdx.LogIndex++ + inputs := shared_behaviors.CreateBehaviorInputs{ + CheckedHeaderColumnName: constants.VowFlogChecked, + LogEventTableName: "maker.vow_flog", + TestModel: test_data.VowFlogModel, + ModelWithDifferentLogIdx: modelWithDifferentLogIdx, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) + shared_behaviors.SharedRepositoryCreateBehaviors(&inputs) It("adds a vow flog event", func() { - err = flogRepository.Create(headerID, []interface{}{test_data.FlogModel}) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) + Expect(err).NotTo(HaveOccurred()) + + err = repository.Create(headerID, []interface{}{test_data.VowFlogModel}) Expect(err).NotTo(HaveOccurred()) var dbFlog vow_flog.VowFlogModel err = db.Get(&dbFlog, `SELECT era, log_idx, tx_idx, raw_log FROM maker.vow_flog WHERE header_id = $1`, headerID) Expect(err).NotTo(HaveOccurred()) - Expect(dbFlog.Era).To(Equal(test_data.FlogModel.Era)) - Expect(dbFlog.LogIndex).To(Equal(test_data.FlogModel.LogIndex)) - Expect(dbFlog.TransactionIndex).To(Equal(test_data.FlogModel.TransactionIndex)) - Expect(dbFlog.Raw).To(MatchJSON(test_data.FlogModel.Raw)) - }) - - It("marks header as checked for logs", func() { - err = flogRepository.Create(headerID, []interface{}{test_data.FlogModel}) - Expect(err).NotTo(HaveOccurred()) - - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vow_flog_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates the header to checked if checked headers row already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - Expect(err).NotTo(HaveOccurred()) - - err = flogRepository.Create(headerID, []interface{}{test_data.FlogModel}) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vow_flog_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("does not duplicate vow flog events", func() { - err = flogRepository.Create(headerID, []interface{}{test_data.FlogModel}) - Expect(err).NotTo(HaveOccurred()) - - err = flogRepository.Create(headerID, []interface{}{test_data.FlogModel}) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) - }) - - It("removes vow flog events if corresponding header is deleted", func() { - _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerID) - - Expect(err).NotTo(HaveOccurred()) - var dbFlog vow_flog.VowFlogModel - err = db.Get(&dbFlog, `SELECT era, log_idx, tx_idx, raw_log FROM maker.vow_flog WHERE header_id = $1`, headerID) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(sql.ErrNoRows)) - }) - - It("returns an error if model is of wrong type", func() { - err = flogRepository.Create(headerID, []interface{}{test_data.WrongModel{}}) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("model of type")) + Expect(dbFlog.Era).To(Equal(test_data.VowFlogModel.Era)) + Expect(dbFlog.LogIndex).To(Equal(test_data.VowFlogModel.LogIndex)) + Expect(dbFlog.TransactionIndex).To(Equal(test_data.VowFlogModel.TransactionIndex)) + Expect(dbFlog.Raw).To(MatchJSON(test_data.VowFlogModel.Raw)) }) }) Describe("MarkHeaderChecked", func() { - var headerID int64 + inputs := shared_behaviors.MarkedHeaderCheckedBehaviorInputs{ + CheckedHeaderColumnName: constants.VowFlogChecked, + Repository: &repository, + } - BeforeEach(func() { - headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(err).NotTo(HaveOccurred()) - }) - - It("creates a row for a new headerID", func() { - err = flogRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vow_flog_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) - - It("updates row when headerID already exists", func() { - _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) - - err = flogRepository.MarkHeaderChecked(headerID) - - Expect(err).NotTo(HaveOccurred()) - var headerChecked bool - err = db.Get(&headerChecked, `SELECT vow_flog_checked FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(err).NotTo(HaveOccurred()) - Expect(headerChecked).To(BeTrue()) - }) + shared_behaviors.SharedRepositoryMarkHeaderCheckedBehaviors(&inputs) }) Describe("MissingHeaders", func() { - var ( - startingBlockNumber, flogBlockNumber, endingBlockNumber int64 - blockNumbers, headerIDs []int64 - ) + inputs := shared_behaviors.MissingHeadersBehaviorInputs{ + Repository: &repository, + RepositoryTwo: &vow_flog.VowFlogRepository{}, + } - BeforeEach(func() { - startingBlockNumber = rand.Int63() - flogBlockNumber = startingBlockNumber + 1 - endingBlockNumber = startingBlockNumber + 2 - - blockNumbers = []int64{startingBlockNumber, flogBlockNumber, endingBlockNumber, endingBlockNumber + 1} - - headerIDs = []int64{} - for _, n := range blockNumbers { - headerID, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - headerIDs = append(headerIDs, headerID) - } - }) - - It("returns headers that haven't been checked", func() { - err := flogRepository.MarkHeaderChecked(headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := flogRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(2)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) - }) - - It("only treats headers as checked if vow flog logs have been checked", func() { - _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) - Expect(err).NotTo(HaveOccurred()) - - headers, err := flogRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(headers)).To(Equal(3)) - Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(flogBlockNumber))) - Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(flogBlockNumber))) - Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(flogBlockNumber))) - }) - - It("only returns headers associated with the current node", func() { - dbTwo := test_config.NewTestDB(core.Node{ID: "second"}) - headerRepositoryTwo := repositories.NewHeaderRepository(dbTwo) - for _, n := range blockNumbers { - _, err = headerRepositoryTwo.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - } - flogRepositoryTwo := vow_flog.VowFlogRepository{} - flogRepositoryTwo.SetDB(dbTwo) - err := flogRepository.MarkHeaderChecked(headerIDs[0]) - Expect(err).NotTo(HaveOccurred()) - - nodeOneMissingHeaders, err := flogRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeOneMissingHeaders)).To(Equal(len(blockNumbers) - 1)) - - nodeTwoMissingHeaders, err := flogRepositoryTwo.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodeTwoMissingHeaders)).To(Equal(len(blockNumbers))) - }) + shared_behaviors.SharedRepositoryMissingHeadersBehaviors(&inputs) }) })