Make price feed log persistence atomic per block

- Prevents marking header as checked if only one of several price feed
  logs is successfully persisted
This commit is contained in:
Rob Mulholand 2018-09-20 17:04:53 -05:00
parent ac362650db
commit 92525ca575
8 changed files with 54 additions and 43 deletions

View File

@ -21,18 +21,25 @@ import (
) )
type Converter interface { type Converter interface {
ToModel(log types.Log, headerID int64) (PriceFeedModel, error) ToModels(logs []types.Log, headerID int64) ([]PriceFeedModel, error)
} }
type PriceFeedConverter struct{} type PriceFeedConverter struct{}
func (converter PriceFeedConverter) ToModel(log types.Log, headerID int64) (PriceFeedModel, error) { func (converter PriceFeedConverter) ToModels(logs []types.Log, headerID int64) (results []PriceFeedModel, err error) {
for _, log := range logs {
raw, err := json.Marshal(log) raw, err := json.Marshal(log)
return PriceFeedModel{ if err != nil {
return nil, err
}
model := PriceFeedModel{
BlockNumber: log.BlockNumber, BlockNumber: log.BlockNumber,
MedianizerAddress: log.Address.Bytes(), MedianizerAddress: log.Address.Bytes(),
UsdValue: Convert("wad", hexutil.Encode(log.Data), 15), UsdValue: Convert("wad", hexutil.Encode(log.Data), 15),
TransactionIndex: log.TxIndex, TransactionIndex: log.TxIndex,
Raw: raw, Raw: raw,
}, err }
results = append(results, model)
}
return results, err
} }

View File

@ -18,6 +18,7 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds" "github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
) )
@ -27,9 +28,10 @@ var _ = Describe("Price feed Converter", func() {
converter := price_feeds.PriceFeedConverter{} converter := price_feeds.PriceFeedConverter{}
headerID := int64(123) headerID := int64(123)
model, err := converter.ToModel(test_data.EthPriceFeedLog, headerID) models, err := converter.ToModels([]types.Log{test_data.EthPriceFeedLog}, headerID)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(model).To(Equal(test_data.PriceFeedModel)) Expect(len(models)).To(Equal(1))
Expect(models[0]).To(Equal(test_data.PriceFeedModel))
}) })
}) })

View File

@ -20,7 +20,7 @@ import (
) )
type IPriceFeedRepository interface { type IPriceFeedRepository interface {
Create(headerID int64, model PriceFeedModel) error Create(headerID int64, models []PriceFeedModel) error
MarkHeaderChecked(headerID int64) error MarkHeaderChecked(headerID int64) error
MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error)
} }
@ -33,14 +33,19 @@ func NewPriceFeedRepository(db *postgres.DB) PriceFeedRepository {
return PriceFeedRepository{db: db} return PriceFeedRepository{db: db}
} }
func (repository PriceFeedRepository) Create(headerID int64, model PriceFeedModel) error { func (repository PriceFeedRepository) Create(headerID int64, models []PriceFeedModel) error {
tx, err := repository.db.Begin() tx, err := repository.db.Begin()
if err != nil {
return err
}
for _, model := range models {
_, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx, raw_log) _, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx, raw_log)
VALUES ($1, $2, $3, $4::NUMERIC, $5, $6)`, model.BlockNumber, headerID, model.MedianizerAddress, model.UsdValue, model.TransactionIndex, model.Raw) VALUES ($1, $2, $3, $4::NUMERIC, $5, $6)`, model.BlockNumber, headerID, model.MedianizerAddress, model.UsdValue, model.TransactionIndex, model.Raw)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
return err return err
} }
}
_, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, price_feeds_checked) _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, price_feeds_checked)
VALUES ($1, $2) VALUES ($1, $2)
ON CONFLICT (header_id) DO ON CONFLICT (header_id) DO
@ -49,8 +54,7 @@ func (repository PriceFeedRepository) Create(headerID int64, model PriceFeedMode
tx.Rollback() tx.Rollback()
return err return err
} }
tx.Commit() return tx.Commit()
return nil
} }
func (repository PriceFeedRepository) MarkHeaderChecked(headerID int64) error { func (repository PriceFeedRepository) MarkHeaderChecked(headerID int64) error {

View File

@ -35,7 +35,7 @@ var _ = Describe("Price feeds repository", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
priceFeedRepository := price_feeds.NewPriceFeedRepository(db) priceFeedRepository := price_feeds.NewPriceFeedRepository(db)
err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
var dbPriceFeedUpdate price_feeds.PriceFeedModel var dbPriceFeedUpdate price_feeds.PriceFeedModel
@ -56,7 +56,7 @@ var _ = Describe("Price feeds repository", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
priceFeedRepository := price_feeds.NewPriceFeedRepository(db) priceFeedRepository := price_feeds.NewPriceFeedRepository(db)
err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
var headerChecked bool var headerChecked bool
@ -73,10 +73,10 @@ var _ = Describe("Price feeds repository", func() {
headerID, err := headerRepository.CreateOrUpdateHeader(header) headerID, err := headerRepository.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
priceFeedRepository := price_feeds.NewPriceFeedRepository(db) priceFeedRepository := price_feeds.NewPriceFeedRepository(db)
err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel) err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel})
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })

View File

@ -62,16 +62,14 @@ func (transformer PriceFeedTransformer) Execute() error {
return err return err
} }
} }
for _, log := range logs { models, err := transformer.Converter.ToModels(logs, header.Id)
model, err := transformer.Converter.ToModel(log, header.Id)
if err != nil { if err != nil {
return err return err
} }
err = transformer.Repository.Create(header.Id, model) err = transformer.Repository.Create(header.Id, models)
if err != nil { if err != nil {
return err return err
} }
} }
}
return nil return nil
} }

View File

@ -149,7 +149,7 @@ var _ = Describe("Price feed transformer", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockConverter.PassedHeaderID).To(Equal(headerID)) Expect(mockConverter.PassedHeaderID).To(Equal(headerID))
Expect(mockConverter.PassedLog).To(Equal(test_data.EthPriceFeedLog)) Expect(mockConverter.PassedLogs).To(Equal([]types.Log{test_data.EthPriceFeedLog}))
}) })
It("returns err if converter returns err", func() { It("returns err if converter returns err", func() {
@ -188,7 +188,7 @@ var _ = Describe("Price feed transformer", func() {
err := transformer.Execute() err := transformer.Execute()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
mockRepository.AssertCreateCalledWith(headerID, test_data.PriceFeedModel) mockRepository.AssertCreateCalledWith(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel})
}) })
It("returns error if creating price feed update returns error", func() { It("returns error if creating price feed update returns error", func() {

View File

@ -22,14 +22,14 @@ import (
type MockPriceFeedConverter struct { type MockPriceFeedConverter struct {
converterErr error converterErr error
PassedLog types.Log PassedLogs []types.Log
PassedHeaderID int64 PassedHeaderID int64
} }
func (converter *MockPriceFeedConverter) ToModel(log types.Log, headerID int64) (price_feeds.PriceFeedModel, error) { func (converter *MockPriceFeedConverter) ToModels(logs []types.Log, headerID int64) ([]price_feeds.PriceFeedModel, error) {
converter.PassedLog = log converter.PassedLogs = logs
converter.PassedHeaderID = headerID converter.PassedHeaderID = headerID
return test_data.PriceFeedModel, converter.converterErr return []price_feeds.PriceFeedModel{test_data.PriceFeedModel}, converter.converterErr
} }
func (converter *MockPriceFeedConverter) SetConverterErr(e error) { func (converter *MockPriceFeedConverter) SetConverterErr(e error) {

View File

@ -29,7 +29,7 @@ type MockPriceFeedRepository struct {
missingHeaders []core.Header missingHeaders []core.Header
missingHeadersErr error missingHeadersErr error
passedEndingBlockNumber int64 passedEndingBlockNumber int64
passedModel price_feeds.PriceFeedModel passedModels []price_feeds.PriceFeedModel
passedStartingBlockNumber int64 passedStartingBlockNumber int64
} }
@ -49,9 +49,9 @@ func (repository *MockPriceFeedRepository) SetMissingHeaders(headers []core.Head
repository.missingHeaders = headers repository.missingHeaders = headers
} }
func (repository *MockPriceFeedRepository) Create(headerID int64, model price_feeds.PriceFeedModel) error { func (repository *MockPriceFeedRepository) Create(headerID int64, models []price_feeds.PriceFeedModel) error {
repository.createPassedHeaderID = headerID repository.createPassedHeaderID = headerID
repository.passedModel = model repository.passedModels = models
return repository.createErr return repository.createErr
} }
@ -66,9 +66,9 @@ func (repository *MockPriceFeedRepository) MissingHeaders(startingBlockNumber, e
return repository.missingHeaders, repository.missingHeadersErr return repository.missingHeaders, repository.missingHeadersErr
} }
func (repository *MockPriceFeedRepository) AssertCreateCalledWith(headerID int64, model price_feeds.PriceFeedModel) { func (repository *MockPriceFeedRepository) AssertCreateCalledWith(headerID int64, models []price_feeds.PriceFeedModel) {
Expect(repository.createPassedHeaderID).To(Equal(headerID)) Expect(repository.createPassedHeaderID).To(Equal(headerID))
Expect(repository.passedModel).To(Equal(model)) Expect(repository.passedModels).To(Equal(models))
} }
func (repository *MockPriceFeedRepository) AssertMarkHeaderCheckedCalledWith(headerID int64) { func (repository *MockPriceFeedRepository) AssertMarkHeaderCheckedCalledWith(headerID int64) {