Update price feeds if header changed by validator

- currently not validating price feeds if the underlying header already exists
  and is valid, since price feeds should have been added when initial header
  was added
This commit is contained in:
Rob Mulholand 2018-07-26 16:15:24 -05:00
parent 6e68dc4a92
commit 72a849a272
10 changed files with 65 additions and 13 deletions

View File

@ -97,7 +97,7 @@ func lightSync() {
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
headerRepository := repositories.NewHeaderRepository(&db) headerRepository := repositories.NewHeaderRepository(&db)
validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow) validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow, []transformers.Transformer{})
missingBlocksPopulated := make(chan int) missingBlocksPopulated := make(chan int)
go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber) go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber)

View File

@ -89,14 +89,13 @@ func syncPriceFeeds() {
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
headerRepository := repositories.NewHeaderRepository(&db) headerRepository := repositories.NewHeaderRepository(&db)
// TODO: add transformers to validation so we don't miss events on new block headers
validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow)
missingBlocksPopulated := make(chan int) missingBlocksPopulated := make(chan int)
transformers := []transformers.Transformer{ transformers := []transformers.Transformer{
pep.NewPepTransformer(blockChain, &db), pep.NewPepTransformer(blockChain, &db),
pip.NewPipTransformer(blockChain, &db), pip.NewPipTransformer(blockChain, &db),
rep.NewRepTransformer(blockChain, &db), rep.NewRepTransformer(blockChain, &db),
} }
validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow, transformers)
go backFillPriceFeeds(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber, transformers) go backFillPriceFeeds(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber, transformers)
for { for {

View File

@ -2,10 +2,13 @@ package repositories
import ( import (
"database/sql" "database/sql"
"errors"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
var ErrValidHeaderExists = errors.New("valid header already exists")
type HeaderRepository struct { type HeaderRepository struct {
database *postgres.DB database *postgres.DB
} }
@ -25,7 +28,7 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int
if headerMustBeReplaced(hash, header) { if headerMustBeReplaced(hash, header) {
return repository.replaceHeader(header) return repository.replaceHeader(header)
} }
return 0, err return 0, ErrValidHeaderExists
} }
func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {

View File

@ -13,7 +13,6 @@ import (
var _ = Describe("Block header repository", func() { var _ = Describe("Block header repository", func() {
Describe("creating or updating a header", func() { Describe("creating or updating a header", func() {
It("adds a header", func() { It("adds a header", func() {
node := core.Node{} node := core.Node{}
db := test_config.NewTestDB(node) db := test_config.NewTestDB(node)
@ -56,7 +55,7 @@ var _ = Describe("Block header repository", func() {
Expect(ethNodeFingerprint).To(Equal(db.Node.ID)) Expect(ethNodeFingerprint).To(Equal(db.Node.ID))
}) })
It("does not duplicate headers", func() { It("returns valid header exists error if attempting duplicate headers", func() {
node := core.Node{} node := core.Node{}
db := test_config.NewTestDB(node) db := test_config.NewTestDB(node)
test_config.CleanTestDB(db) test_config.CleanTestDB(db)
@ -71,7 +70,8 @@ var _ = Describe("Block header repository", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(repositories.ErrValidHeaderExists))
var dbHeaders []core.Header var dbHeaders []core.Header
err = db.Select(&dbHeaders, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber) err = db.Select(&dbHeaders, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber)

View File

@ -8,6 +8,7 @@ import (
type MockHeaderRepository struct { type MockHeaderRepository struct {
createOrUpdateHeaderCallCount int createOrUpdateHeaderCallCount int
createOrUpdateHeaderErr error
createOrUpdateHeaderPassedBlockNumbers []int64 createOrUpdateHeaderPassedBlockNumbers []int64
createOrUpdateHeaderReturnID int64 createOrUpdateHeaderReturnID int64
missingBlockNumbers []int64 missingBlockNumbers []int64
@ -21,6 +22,10 @@ func (repository *MockHeaderRepository) SetCreateOrUpdateHeaderReturnID(id int64
repository.createOrUpdateHeaderReturnID = id repository.createOrUpdateHeaderReturnID = id
} }
func (repository *MockHeaderRepository) SetCreateOrUpdateHeaderReturnErr(err error) {
repository.createOrUpdateHeaderErr = err
}
func (repository *MockHeaderRepository) SetMissingBlockNumbers(blockNumbers []int64) { func (repository *MockHeaderRepository) SetMissingBlockNumbers(blockNumbers []int64) {
repository.missingBlockNumbers = blockNumbers repository.missingBlockNumbers = blockNumbers
} }
@ -28,7 +33,7 @@ func (repository *MockHeaderRepository) SetMissingBlockNumbers(blockNumbers []in
func (repository *MockHeaderRepository) CreateOrUpdateHeader(header core.Header) (int64, error) { func (repository *MockHeaderRepository) CreateOrUpdateHeader(header core.Header) (int64, error) {
repository.createOrUpdateHeaderCallCount++ repository.createOrUpdateHeaderCallCount++
repository.createOrUpdateHeaderPassedBlockNumbers = append(repository.createOrUpdateHeaderPassedBlockNumbers, header.BlockNumber) repository.createOrUpdateHeaderPassedBlockNumbers = append(repository.createOrUpdateHeaderPassedBlockNumbers, header.BlockNumber)
return repository.createOrUpdateHeaderReturnID, nil return repository.createOrUpdateHeaderReturnID, repository.createOrUpdateHeaderErr
} }
func (*MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { func (*MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {

View File

@ -9,6 +9,7 @@ import (
type MockTransformer struct { type MockTransformer struct {
passedHeader core.Header passedHeader core.Header
passedHeaderID int64 passedHeaderID int64
executeCalled bool
executeErr error executeErr error
} }
@ -16,6 +17,7 @@ func NewMockTransformer() *MockTransformer {
return &MockTransformer{ return &MockTransformer{
passedHeader: core.Header{}, passedHeader: core.Header{},
passedHeaderID: 0, passedHeaderID: 0,
executeCalled: false,
executeErr: nil, executeErr: nil,
} }
} }
@ -25,12 +27,18 @@ func (transformer *MockTransformer) SetExecuteErr(err error) {
} }
func (transformer *MockTransformer) Execute(header core.Header, headerID int64) error { func (transformer *MockTransformer) Execute(header core.Header, headerID int64) error {
transformer.executeCalled = true
transformer.passedHeader = header transformer.passedHeader = header
transformer.passedHeaderID = headerID transformer.passedHeaderID = headerID
return transformer.executeErr return transformer.executeErr
} }
func (transformer *MockTransformer) AssertExecuteCalledWith(header core.Header, headerID int64) { func (transformer *MockTransformer) AssertExecuteCalledWith(header core.Header, headerID int64) {
Expect(transformer.executeCalled).To(BeTrue())
Expect(header).To(Equal(transformer.passedHeader)) Expect(header).To(Equal(transformer.passedHeader))
Expect(headerID).To(Equal(transformer.passedHeaderID)) Expect(headerID).To(Equal(transformer.passedHeaderID))
} }
func (tranformer *MockTransformer) AssertExecuteNotCalled() {
Expect(tranformer.executeCalled).To(BeFalse())
}

View File

@ -10,19 +10,21 @@ type HeaderValidator struct {
blockChain core.BlockChain blockChain core.BlockChain
headerRepository datastore.HeaderRepository headerRepository datastore.HeaderRepository
windowSize int windowSize int
transformers []transformers.Transformer
} }
func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderRepository, windowSize int) HeaderValidator { func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderRepository, windowSize int, transformers []transformers.Transformer) HeaderValidator {
return HeaderValidator{ return HeaderValidator{
blockChain: blockChain, blockChain: blockChain,
headerRepository: repository, headerRepository: repository,
windowSize: windowSize, windowSize: windowSize,
transformers: transformers,
} }
} }
func (validator HeaderValidator) ValidateHeaders() ValidationWindow { func (validator HeaderValidator) ValidateHeaders() ValidationWindow {
window := MakeValidationWindow(validator.blockChain, validator.windowSize) window := MakeValidationWindow(validator.blockChain, validator.windowSize)
blockNumbers := MakeRange(window.LowerBound, window.UpperBound) blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers, []transformers.Transformer{}) RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers, validator.transformers)
return window return window
} }

View File

@ -2,8 +2,10 @@ package history_test
import ( import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/history" "github.com/vulcanize/vulcanizedb/pkg/history"
"github.com/vulcanize/vulcanizedb/pkg/transformers"
"math/big" "math/big"
) )
@ -13,10 +15,24 @@ var _ = Describe("Header validator", func() {
headerRepository.SetMissingBlockNumbers([]int64{}) headerRepository.SetMissingBlockNumbers([]int64{})
blockChain := fakes.NewMockBlockChain() blockChain := fakes.NewMockBlockChain()
blockChain.SetLastBlock(big.NewInt(3)) blockChain.SetLastBlock(big.NewInt(3))
validator := history.NewHeaderValidator(blockChain, headerRepository, 2) validator := history.NewHeaderValidator(blockChain, headerRepository, 2, []transformers.Transformer{})
validator.ValidateHeaders() validator.ValidateHeaders()
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(3, []int64{1, 2, 3}) headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(3, []int64{1, 2, 3})
}) })
It("passes transformers for execution on new blocks", func() {
headerRepository := fakes.NewMockHeaderRepository()
headerRepository.SetMissingBlockNumbers([]int64{})
blockChain := fakes.NewMockBlockChain()
blockChain.SetLastBlock(big.NewInt(3))
transformer := fakes.NewMockTransformer()
validator := history.NewHeaderValidator(blockChain, headerRepository, 1, []transformers.Transformer{transformer})
validator.ValidateHeaders()
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(2, []int64{2, 3})
transformer.AssertExecuteCalledWith(core.Header{BlockNumber: 3}, 0)
})
}) })

View File

@ -5,6 +5,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/transformers" "github.com/vulcanize/vulcanizedb/pkg/transformers"
) )
@ -28,6 +29,9 @@ func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore.
} }
id, err := headerRepository.CreateOrUpdateHeader(header) id, err := headerRepository.CreateOrUpdateHeader(header)
if err != nil { if err != nil {
if err == repositories.ErrValidHeaderExists {
continue
}
return 0, err return 0, err
} }
for _, transformer := range transformers { for _, transformer := range transformers {

View File

@ -7,6 +7,7 @@ import (
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/history" "github.com/vulcanize/vulcanizedb/pkg/history"
"github.com/vulcanize/vulcanizedb/pkg/transformers" "github.com/vulcanize/vulcanizedb/pkg/transformers"
@ -51,12 +52,26 @@ var _ = Describe("Populating headers", func() {
headerRepository.SetCreateOrUpdateHeaderReturnID(headerID) headerRepository.SetCreateOrUpdateHeaderReturnID(headerID)
transformer := fakes.NewMockTransformer() transformer := fakes.NewMockTransformer()
_, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, []transformers.Transformer{transformer}) _, err := history.PopulateMissingHeaders(blockChain, headerRepository, blockNumber, []transformers.Transformer{transformer})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
transformer.AssertExecuteCalledWith(core.Header{BlockNumber: blockNumber}, headerID) transformer.AssertExecuteCalledWith(core.Header{BlockNumber: blockNumber}, headerID)
}) })
It("does not execute transformer if repository indicates header already exists", func() {
blockNumber := int64(54321)
blockChain := fakes.NewMockBlockChain()
blockChain.SetLastBlock(big.NewInt(blockNumber))
headerRepository.SetMissingBlockNumbers([]int64{blockNumber})
headerRepository.SetCreateOrUpdateHeaderReturnErr(repositories.ErrValidHeaderExists)
transformer := fakes.NewMockTransformer()
_, err := history.PopulateMissingHeaders(blockChain, headerRepository, blockNumber, []transformers.Transformer{transformer})
Expect(err).NotTo(HaveOccurred())
transformer.AssertExecuteNotCalled()
})
It("returns error if executing transformer fails", func() { It("returns error if executing transformer fails", func() {
blockNumber := int64(54321) blockNumber := int64(54321)
blockChain := fakes.NewMockBlockChain() blockChain := fakes.NewMockBlockChain()
@ -67,7 +82,7 @@ var _ = Describe("Populating headers", func() {
transformer := fakes.NewMockTransformer() transformer := fakes.NewMockTransformer()
transformer.SetExecuteErr(fakes.FakeError) transformer.SetExecuteErr(fakes.FakeError)
_, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, []transformers.Transformer{transformer}) _, err := history.PopulateMissingHeaders(blockChain, headerRepository, blockNumber, []transformers.Transformer{transformer})
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))