From 92525ca5758de1be8de13dffda2599410a72d7ce Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Thu, 20 Sep 2018 17:04:53 -0500 Subject: [PATCH] Make price feed log persistence atomic per block - Prevents marking header as checked if only one of several price feed logs is successfully persisted --- pkg/transformers/price_feeds/converter.go | 27 ++++++++++++------- .../price_feeds/converter_test.go | 6 +++-- pkg/transformers/price_feeds/repository.go | 18 ++++++++----- .../price_feeds/repository_test.go | 8 +++--- pkg/transformers/price_feeds/transformer.go | 16 +++++------ .../price_feeds/transformer_test.go | 4 +-- .../test_data/mocks/price_feeds/converter.go | 8 +++--- .../test_data/mocks/price_feeds/repository.go | 10 +++---- 8 files changed, 54 insertions(+), 43 deletions(-) diff --git a/pkg/transformers/price_feeds/converter.go b/pkg/transformers/price_feeds/converter.go index 25d1bc6c..83fc5f7a 100644 --- a/pkg/transformers/price_feeds/converter.go +++ b/pkg/transformers/price_feeds/converter.go @@ -21,18 +21,25 @@ import ( ) type Converter interface { - ToModel(log types.Log, headerID int64) (PriceFeedModel, error) + ToModels(logs []types.Log, headerID int64) ([]PriceFeedModel, error) } type PriceFeedConverter struct{} -func (converter PriceFeedConverter) ToModel(log types.Log, headerID int64) (PriceFeedModel, error) { - raw, err := json.Marshal(log) - return PriceFeedModel{ - BlockNumber: log.BlockNumber, - MedianizerAddress: log.Address.Bytes(), - UsdValue: Convert("wad", hexutil.Encode(log.Data), 15), - TransactionIndex: log.TxIndex, - Raw: raw, - }, err +func (converter PriceFeedConverter) ToModels(logs []types.Log, headerID int64) (results []PriceFeedModel, err error) { + for _, log := range logs { + raw, err := json.Marshal(log) + if err != nil { + return nil, err + } + model := PriceFeedModel{ + BlockNumber: log.BlockNumber, + MedianizerAddress: log.Address.Bytes(), + UsdValue: Convert("wad", hexutil.Encode(log.Data), 15), + TransactionIndex: log.TxIndex, + Raw: raw, + } + results = append(results, model) + } + return results, err } diff --git a/pkg/transformers/price_feeds/converter_test.go b/pkg/transformers/price_feeds/converter_test.go index 9baa9025..e16142c3 100644 --- a/pkg/transformers/price_feeds/converter_test.go +++ b/pkg/transformers/price_feeds/converter_test.go @@ -18,6 +18,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" ) @@ -27,9 +28,10 @@ var _ = Describe("Price feed Converter", func() { converter := price_feeds.PriceFeedConverter{} headerID := int64(123) - model, err := converter.ToModel(test_data.EthPriceFeedLog, headerID) + models, err := converter.ToModels([]types.Log{test_data.EthPriceFeedLog}, headerID) Expect(err).NotTo(HaveOccurred()) - Expect(model).To(Equal(test_data.PriceFeedModel)) + Expect(len(models)).To(Equal(1)) + Expect(models[0]).To(Equal(test_data.PriceFeedModel)) }) }) diff --git a/pkg/transformers/price_feeds/repository.go b/pkg/transformers/price_feeds/repository.go index 59071a25..383411c2 100644 --- a/pkg/transformers/price_feeds/repository.go +++ b/pkg/transformers/price_feeds/repository.go @@ -20,7 +20,7 @@ import ( ) type IPriceFeedRepository interface { - Create(headerID int64, model PriceFeedModel) error + Create(headerID int64, models []PriceFeedModel) error MarkHeaderChecked(headerID int64) error MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) } @@ -33,14 +33,19 @@ func NewPriceFeedRepository(db *postgres.DB) PriceFeedRepository { return PriceFeedRepository{db: db} } -func (repository PriceFeedRepository) Create(headerID int64, model PriceFeedModel) error { +func (repository PriceFeedRepository) Create(headerID int64, models []PriceFeedModel) error { tx, err := repository.db.Begin() - _, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx, raw_log) - VALUES ($1, $2, $3, $4::NUMERIC, $5, $6)`, model.BlockNumber, headerID, model.MedianizerAddress, model.UsdValue, model.TransactionIndex, model.Raw) if err != nil { - tx.Rollback() return err } + for _, model := range models { + _, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx, raw_log) + VALUES ($1, $2, $3, $4::NUMERIC, $5, $6)`, model.BlockNumber, headerID, model.MedianizerAddress, model.UsdValue, model.TransactionIndex, model.Raw) + if err != nil { + tx.Rollback() + return err + } + } _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, price_feeds_checked) VALUES ($1, $2) ON CONFLICT (header_id) DO @@ -49,8 +54,7 @@ func (repository PriceFeedRepository) Create(headerID int64, model PriceFeedMode tx.Rollback() return err } - tx.Commit() - return nil + return tx.Commit() } func (repository PriceFeedRepository) MarkHeaderChecked(headerID int64) error { diff --git a/pkg/transformers/price_feeds/repository_test.go b/pkg/transformers/price_feeds/repository_test.go index dfa62ee3..2e338821 100644 --- a/pkg/transformers/price_feeds/repository_test.go +++ b/pkg/transformers/price_feeds/repository_test.go @@ -35,7 +35,7 @@ var _ = Describe("Price feeds repository", func() { Expect(err).NotTo(HaveOccurred()) priceFeedRepository := price_feeds.NewPriceFeedRepository(db) - err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) Expect(err).NotTo(HaveOccurred()) var dbPriceFeedUpdate price_feeds.PriceFeedModel @@ -56,7 +56,7 @@ var _ = Describe("Price feeds repository", func() { Expect(err).NotTo(HaveOccurred()) priceFeedRepository := price_feeds.NewPriceFeedRepository(db) - err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) Expect(err).NotTo(HaveOccurred()) var headerChecked bool @@ -73,10 +73,10 @@ var _ = Describe("Price feeds repository", func() { headerID, err := headerRepository.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) priceFeedRepository := price_feeds.NewPriceFeedRepository(db) - err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) Expect(err).NotTo(HaveOccurred()) - err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) Expect(err).To(HaveOccurred()) }) diff --git a/pkg/transformers/price_feeds/transformer.go b/pkg/transformers/price_feeds/transformer.go index a3e1c3e0..0958e61c 100644 --- a/pkg/transformers/price_feeds/transformer.go +++ b/pkg/transformers/price_feeds/transformer.go @@ -62,15 +62,13 @@ func (transformer PriceFeedTransformer) Execute() error { return err } } - for _, log := range logs { - model, err := transformer.Converter.ToModel(log, header.Id) - if err != nil { - return err - } - err = transformer.Repository.Create(header.Id, model) - if err != nil { - return err - } + models, err := transformer.Converter.ToModels(logs, header.Id) + if err != nil { + return err + } + err = transformer.Repository.Create(header.Id, models) + if err != nil { + return err } } return nil diff --git a/pkg/transformers/price_feeds/transformer_test.go b/pkg/transformers/price_feeds/transformer_test.go index 3fbc6b26..39f97cc8 100644 --- a/pkg/transformers/price_feeds/transformer_test.go +++ b/pkg/transformers/price_feeds/transformer_test.go @@ -149,7 +149,7 @@ var _ = Describe("Price feed transformer", func() { Expect(err).NotTo(HaveOccurred()) Expect(mockConverter.PassedHeaderID).To(Equal(headerID)) - Expect(mockConverter.PassedLog).To(Equal(test_data.EthPriceFeedLog)) + Expect(mockConverter.PassedLogs).To(Equal([]types.Log{test_data.EthPriceFeedLog})) }) It("returns err if converter returns err", func() { @@ -188,7 +188,7 @@ var _ = Describe("Price feed transformer", func() { err := transformer.Execute() Expect(err).NotTo(HaveOccurred()) - mockRepository.AssertCreateCalledWith(headerID, test_data.PriceFeedModel) + mockRepository.AssertCreateCalledWith(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) }) It("returns error if creating price feed update returns error", func() { diff --git a/pkg/transformers/test_data/mocks/price_feeds/converter.go b/pkg/transformers/test_data/mocks/price_feeds/converter.go index ef5115ca..c4f800da 100644 --- a/pkg/transformers/test_data/mocks/price_feeds/converter.go +++ b/pkg/transformers/test_data/mocks/price_feeds/converter.go @@ -22,14 +22,14 @@ import ( type MockPriceFeedConverter struct { converterErr error - PassedLog types.Log + PassedLogs []types.Log PassedHeaderID int64 } -func (converter *MockPriceFeedConverter) ToModel(log types.Log, headerID int64) (price_feeds.PriceFeedModel, error) { - converter.PassedLog = log +func (converter *MockPriceFeedConverter) ToModels(logs []types.Log, headerID int64) ([]price_feeds.PriceFeedModel, error) { + converter.PassedLogs = logs converter.PassedHeaderID = headerID - return test_data.PriceFeedModel, converter.converterErr + return []price_feeds.PriceFeedModel{test_data.PriceFeedModel}, converter.converterErr } func (converter *MockPriceFeedConverter) SetConverterErr(e error) { diff --git a/pkg/transformers/test_data/mocks/price_feeds/repository.go b/pkg/transformers/test_data/mocks/price_feeds/repository.go index f56c62a7..a5c39d86 100644 --- a/pkg/transformers/test_data/mocks/price_feeds/repository.go +++ b/pkg/transformers/test_data/mocks/price_feeds/repository.go @@ -29,7 +29,7 @@ type MockPriceFeedRepository struct { missingHeaders []core.Header missingHeadersErr error passedEndingBlockNumber int64 - passedModel price_feeds.PriceFeedModel + passedModels []price_feeds.PriceFeedModel passedStartingBlockNumber int64 } @@ -49,9 +49,9 @@ func (repository *MockPriceFeedRepository) SetMissingHeaders(headers []core.Head repository.missingHeaders = headers } -func (repository *MockPriceFeedRepository) Create(headerID int64, model price_feeds.PriceFeedModel) error { +func (repository *MockPriceFeedRepository) Create(headerID int64, models []price_feeds.PriceFeedModel) error { repository.createPassedHeaderID = headerID - repository.passedModel = model + repository.passedModels = models return repository.createErr } @@ -66,9 +66,9 @@ func (repository *MockPriceFeedRepository) MissingHeaders(startingBlockNumber, e return repository.missingHeaders, repository.missingHeadersErr } -func (repository *MockPriceFeedRepository) AssertCreateCalledWith(headerID int64, model price_feeds.PriceFeedModel) { +func (repository *MockPriceFeedRepository) AssertCreateCalledWith(headerID int64, models []price_feeds.PriceFeedModel) { Expect(repository.createPassedHeaderID).To(Equal(headerID)) - Expect(repository.passedModel).To(Equal(model)) + Expect(repository.passedModels).To(Equal(models)) } func (repository *MockPriceFeedRepository) AssertMarkHeaderCheckedCalledWith(headerID int64) {