From 3e9901864b0b5a3c45c82736c003b74404bd0c39 Mon Sep 17 00:00:00 2001 From: David Terry Date: Tue, 9 Oct 2018 13:34:22 +0300 Subject: [PATCH] vat.fold: switch to batched interface --- .../test_data/mocks/vat_fold/converter.go | 8 ++-- .../test_data/mocks/vat_fold/repository.go | 35 ++++++++++++---- pkg/transformers/vat_fold/converter.go | 42 +++++++++++-------- pkg/transformers/vat_fold/converter_test.go | 6 +-- pkg/transformers/vat_fold/repository.go | 37 ++++++++++++---- pkg/transformers/vat_fold/repository_test.go | 9 ++-- pkg/transformers/vat_fold/transformer.go | 16 ++++--- pkg/transformers/vat_fold/transformer_test.go | 4 +- 8 files changed, 104 insertions(+), 53 deletions(-) diff --git a/pkg/transformers/test_data/mocks/vat_fold/converter.go b/pkg/transformers/test_data/mocks/vat_fold/converter.go index 9625cf78..c9b992bf 100644 --- a/pkg/transformers/test_data/mocks/vat_fold/converter.go +++ b/pkg/transformers/test_data/mocks/vat_fold/converter.go @@ -23,12 +23,12 @@ import ( type MockVatFoldConverter struct { converterErr error - PassedLog types.Log + PassedLogs []types.Log } -func (converter *MockVatFoldConverter) ToModel(ethLog types.Log) (vat_fold.VatFoldModel, error) { - converter.PassedLog = ethLog - return test_data.VatFoldModel, converter.converterErr +func (converter *MockVatFoldConverter) ToModels(ethLogs []types.Log) ([]vat_fold.VatFoldModel, error) { + converter.PassedLogs = ethLogs + return []vat_fold.VatFoldModel{test_data.VatFoldModel}, converter.converterErr } func (converter *MockVatFoldConverter) SetConverterError(e error) { diff --git a/pkg/transformers/test_data/mocks/vat_fold/repository.go b/pkg/transformers/test_data/mocks/vat_fold/repository.go index 79e973d0..55deb829 100644 --- a/pkg/transformers/test_data/mocks/vat_fold/repository.go +++ b/pkg/transformers/test_data/mocks/vat_fold/repository.go @@ -15,32 +15,45 @@ package vat_fold import ( + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_fold" ) type MockVatFoldRepository struct { - createErr error - missingHeaders []core.Header - missingHeadersErr error - PassedStartingBlockNumber int64 - PassedEndingBlockNumber int64 - PassedHeaderID int64 - PassedModel vat_fold.VatFoldModel + createErr error + markHeaderCheckedErr error + markHeaderCheckedPassedHeaderID int64 + missingHeaders []core.Header + missingHeadersErr error + PassedStartingBlockNumber int64 + PassedEndingBlockNumber int64 + PassedHeaderID int64 + PassedModels []vat_fold.VatFoldModel } -func (repository *MockVatFoldRepository) Create(headerID int64, model vat_fold.VatFoldModel) error { +func (repository *MockVatFoldRepository) Create(headerID int64, models []vat_fold.VatFoldModel) error { repository.PassedHeaderID = headerID - repository.PassedModel = model + repository.PassedModels = models return repository.createErr } +func (repository *MockVatFoldRepository) MarkHeaderChecked(headerID int64) error { + repository.markHeaderCheckedPassedHeaderID = headerID + return repository.markHeaderCheckedErr +} + func (repository *MockVatFoldRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { repository.PassedStartingBlockNumber = startingBlockNumber repository.PassedEndingBlockNumber = endingBlockNumber return repository.missingHeaders, repository.missingHeadersErr } +func (repository *MockVatFoldRepository) SetMarkHeaderCheckedErr(e error) { + repository.markHeaderCheckedErr = e +} + func (repository *MockVatFoldRepository) SetMissingHeadersErr(e error) { repository.missingHeadersErr = e } @@ -52,3 +65,7 @@ func (repository *MockVatFoldRepository) SetMissingHeaders(headers []core.Header func (repository *MockVatFoldRepository) SetCreateError(e error) { repository.createErr = e } + +func (repository *MockVatFoldRepository) AssertMarkHeaderCheckedCalledWith(i int64) { + Expect(repository.markHeaderCheckedPassedHeaderID).To(Equal(i)) +} diff --git a/pkg/transformers/vat_fold/converter.go b/pkg/transformers/vat_fold/converter.go index c8bf0e1d..4f57919c 100644 --- a/pkg/transformers/vat_fold/converter.go +++ b/pkg/transformers/vat_fold/converter.go @@ -26,29 +26,35 @@ import ( ) type Converter interface { - ToModel(ethLog types.Log) (VatFoldModel, error) + ToModels(ethLogs []types.Log) ([]VatFoldModel, error) } type VatFoldConverter struct{} -func (VatFoldConverter) ToModel(ethLog types.Log) (VatFoldModel, error) { - err := verifyLog(ethLog) - if err != nil { - return VatFoldModel{}, err +func (VatFoldConverter) ToModels(ethLogs []types.Log) ([]VatFoldModel, error) { + var models []VatFoldModel + for _, ethLog := range ethLogs { + err := verifyLog(ethLog) + if err != nil { + return nil, err + } + + ilk := string(bytes.Trim(ethLog.Topics[1].Bytes(), "\x00")) + urn := common.BytesToAddress(ethLog.Topics[2].Bytes()).String() + rate := big.NewInt(0).SetBytes(ethLog.Topics[3].Bytes()).String() + raw, err := json.Marshal(ethLog) + + model := VatFoldModel{ + Ilk: ilk, + Urn: urn, + Rate: rate, + TransactionIndex: ethLog.TxIndex, + Raw: raw, + } + + models = append(models, model) } - - ilk := string(bytes.Trim(ethLog.Topics[1].Bytes(), "\x00")) - urn := common.HexToAddress(ethLog.Topics[2].String()).String() - rate := big.NewInt(0).SetBytes(ethLog.Topics[3].Bytes()).String() - raw, err := json.Marshal(ethLog) - - return VatFoldModel{ - Ilk: ilk, - Urn: urn, - Rate: rate, - TransactionIndex: ethLog.TxIndex, - Raw: raw, - }, err + return models, nil } func verifyLog(log types.Log) error { diff --git a/pkg/transformers/vat_fold/converter_test.go b/pkg/transformers/vat_fold/converter_test.go index 021096db..97d687ad 100644 --- a/pkg/transformers/vat_fold/converter_test.go +++ b/pkg/transformers/vat_fold/converter_test.go @@ -28,7 +28,7 @@ var _ = Describe("Vat fold converter", func() { converter := vat_fold.VatFoldConverter{} badLog := types.Log{} - _, err := converter.ToModel(badLog) + _, err := converter.ToModels([]types.Log{badLog}) Expect(err).To(HaveOccurred()) }) @@ -36,9 +36,9 @@ var _ = Describe("Vat fold converter", func() { It("converts a log to an model", func() { converter := vat_fold.VatFoldConverter{} - model, err := converter.ToModel(test_data.EthVatFoldLog) + model, err := converter.ToModels([]types.Log{test_data.EthVatFoldLog}) Expect(err).NotTo(HaveOccurred()) - Expect(model).To(Equal(test_data.VatFoldModel)) + Expect(model).To(Equal([]vat_fold.VatFoldModel{test_data.VatFoldModel})) }) }) diff --git a/pkg/transformers/vat_fold/repository.go b/pkg/transformers/vat_fold/repository.go index 19fac0a1..d68b93a8 100644 --- a/pkg/transformers/vat_fold/repository.go +++ b/pkg/transformers/vat_fold/repository.go @@ -20,7 +20,8 @@ import ( ) type Repository interface { - Create(headerID int64, model VatFoldModel) error + Create(headerID int64, models []VatFoldModel) error + MarkHeaderChecked(headerID int64) error MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) } @@ -34,10 +35,31 @@ func NewVatFoldRepository(db *postgres.DB) VatFoldRepository { } } -func (repository VatFoldRepository) Create(headerID int64, model VatFoldModel) error { - _, err := repository.db.Exec(`INSERT INTO maker.vat_fold (header_id, ilk, urn, rate, raw_log, tx_idx) - VALUES($1, $2, $3, $4::NUMERIC, $5, $6)`, - headerID, model.Ilk, model.Urn, model.Rate, model.Raw, model.TransactionIndex) +func (repository VatFoldRepository) Create(headerID int64, models []VatFoldModel) error { + tx, err := repository.db.Begin() + if err != nil { + return err + } + for _, model := range models { + _, err = tx.Exec( + `INSERT into maker.vat_fold (header_id, ilk, urn, rate, tx_idx, raw_log) + VALUES($1, $2, $3, $4::NUMERIC, $5, $6)`, + headerID, model.Ilk, model.Urn, model.Rate, model.TransactionIndex, model.Raw, + ) + if err != nil { + tx.Rollback() + return err + } + } + + return tx.Commit() +} + +func (repository VatFoldRepository) MarkHeaderChecked(headerID int64) error { + _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_fold_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET vat_fold_checked = $2`, headerID, true) return err } @@ -46,8 +68,8 @@ func (repository VatFoldRepository) MissingHeaders(startingBlockNumber, endingBl err := repository.db.Select( &result, `SELECT headers.id, headers.block_number FROM headers - LEFT JOIN maker.vat_fold on headers.id = header_id - WHERE header_id ISNULL + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL OR vat_fold_checked IS FALSE) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3`, @@ -55,6 +77,5 @@ func (repository VatFoldRepository) MissingHeaders(startingBlockNumber, endingBl endingBlockNumber, repository.db.Node.ID, ) - return result, err } diff --git a/pkg/transformers/vat_fold/repository_test.go b/pkg/transformers/vat_fold/repository_test.go index 6de7b3f4..a8c17c16 100644 --- a/pkg/transformers/vat_fold/repository_test.go +++ b/pkg/transformers/vat_fold/repository_test.go @@ -46,7 +46,7 @@ var _ = Describe("", func() { headerID = id vatFoldRepository = vat_fold.NewVatFoldRepository(db) - err = vatFoldRepository.Create(headerID, test_data.VatFoldModel) + err = vatFoldRepository.Create(headerID, []vat_fold.VatFoldModel{test_data.VatFoldModel}) Expect(err).NotTo(HaveOccurred()) }) @@ -63,7 +63,7 @@ var _ = Describe("", func() { }) It("does not duplicate vat events", func() { - err := vatFoldRepository.Create(headerID, test_data.VatFoldModel) + err := vatFoldRepository.Create(headerID, []vat_fold.VatFoldModel{test_data.VatFoldModel}) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) @@ -164,7 +164,10 @@ func initRepository(options repositoryOptions) vat_fold.VatFoldRepository { } if options.storeEvent { - err := vatfoldRepository.Create(headerIDs[options.storedEventBlockNumber], test_data.VatFoldModel) + err := vatfoldRepository.Create( + headerIDs[options.storedEventBlockNumber], + []vat_fold.VatFoldModel{test_data.VatFoldModel}, + ) Expect(err).NotTo(HaveOccurred()) } diff --git a/pkg/transformers/vat_fold/transformer.go b/pkg/transformers/vat_fold/transformer.go index 78237fb0..17c6a321 100644 --- a/pkg/transformers/vat_fold/transformer.go +++ b/pkg/transformers/vat_fold/transformer.go @@ -59,16 +59,20 @@ func (transformer VatFoldTransformer) Execute() error { if err != nil { return err } - for _, log := range matchingLogs { - model, err := transformer.Converter.ToModel(log) - if err != nil { - return err - } - err = transformer.Repository.Create(header.Id, model) + if len(matchingLogs) < 1 { + err = transformer.Repository.MarkHeaderChecked(header.Id) if err != nil { return err } } + models, err := transformer.Converter.ToModels(matchingLogs) + if err != nil { + return err + } + err = transformer.Repository.Create(header.Id, models) + if err != nil { + return err + } } return nil } diff --git a/pkg/transformers/vat_fold/transformer_test.go b/pkg/transformers/vat_fold/transformer_test.go index 57370d2b..f263bc90 100644 --- a/pkg/transformers/vat_fold/transformer_test.go +++ b/pkg/transformers/vat_fold/transformer_test.go @@ -134,7 +134,7 @@ var _ = Describe("Vat fold transformer", func() { err := transformer.Execute() Expect(err).NotTo(HaveOccurred()) - Expect(converter.PassedLog).To(Equal(test_data.EthVatFoldLog)) + Expect(converter.PassedLogs).To(Equal([]types.Log{test_data.EthVatFoldLog})) }) It("returns error if converter returns error", func() { @@ -161,7 +161,7 @@ var _ = Describe("Vat fold transformer", func() { Expect(err).NotTo(HaveOccurred()) Expect(repository.PassedHeaderID).To(Equal(fakeHeader.Id)) - Expect(repository.PassedModel).To(Equal(test_data.VatFoldModel)) + Expect(repository.PassedModels).To(Equal([]vat_fold.VatFoldModel{test_data.VatFoldModel})) }) It("returns error if repository returns error for create", func() {