diff --git a/cmd/lightSync.go b/cmd/lightSync.go index bb191df9..dbdcf25f 100644 --- a/cmd/lightSync.go +++ b/cmd/lightSync.go @@ -97,7 +97,7 @@ func lightSync() { db := utils.LoadPostgres(databaseConfig, blockChain.Node()) headerRepository := repositories.NewHeaderRepository(&db) - validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow) + validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow, []transformers.Transformer{}) missingBlocksPopulated := make(chan int) go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber) diff --git a/cmd/syncPriceFeeds.go b/cmd/syncPriceFeeds.go index fadade4a..ae16a262 100644 --- a/cmd/syncPriceFeeds.go +++ b/cmd/syncPriceFeeds.go @@ -89,14 +89,13 @@ func syncPriceFeeds() { db := utils.LoadPostgres(databaseConfig, blockChain.Node()) headerRepository := repositories.NewHeaderRepository(&db) - // TODO: add transformers to validation so we don't miss events on new block headers - validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow) missingBlocksPopulated := make(chan int) transformers := []transformers.Transformer{ pep.NewPepTransformer(blockChain, &db), pip.NewPipTransformer(blockChain, &db), rep.NewRepTransformer(blockChain, &db), } + validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow, transformers) go backFillPriceFeeds(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber, transformers) for { diff --git a/pkg/datastore/postgres/repositories/header_repository.go b/pkg/datastore/postgres/repositories/header_repository.go index 7aa4deac..981888ba 100644 --- a/pkg/datastore/postgres/repositories/header_repository.go +++ b/pkg/datastore/postgres/repositories/header_repository.go @@ -2,10 +2,13 @@ package repositories import ( "database/sql" + "errors" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +var ErrValidHeaderExists = errors.New("valid header already exists") + type HeaderRepository struct { database *postgres.DB } @@ -25,7 +28,7 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int if headerMustBeReplaced(hash, header) { return repository.replaceHeader(header) } - return 0, err + return 0, ErrValidHeaderExists } func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { diff --git a/pkg/datastore/postgres/repositories/header_repository_test.go b/pkg/datastore/postgres/repositories/header_repository_test.go index e2bc17ab..c3ba4abb 100644 --- a/pkg/datastore/postgres/repositories/header_repository_test.go +++ b/pkg/datastore/postgres/repositories/header_repository_test.go @@ -13,7 +13,6 @@ import ( var _ = Describe("Block header repository", func() { Describe("creating or updating a header", func() { - It("adds a header", func() { node := core.Node{} db := test_config.NewTestDB(node) @@ -56,7 +55,7 @@ var _ = Describe("Block header repository", func() { Expect(ethNodeFingerprint).To(Equal(db.Node.ID)) }) - It("does not duplicate headers", func() { + It("returns valid header exists error if attempting duplicate headers", func() { node := core.Node{} db := test_config.NewTestDB(node) test_config.CleanTestDB(db) @@ -71,7 +70,8 @@ var _ = Describe("Block header repository", func() { Expect(err).NotTo(HaveOccurred()) _, err = repo.CreateOrUpdateHeader(header) - Expect(err).NotTo(HaveOccurred()) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(repositories.ErrValidHeaderExists)) var dbHeaders []core.Header err = db.Select(&dbHeaders, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber) diff --git a/pkg/fakes/mock_header_repository.go b/pkg/fakes/mock_header_repository.go index 9f188937..140df85e 100644 --- a/pkg/fakes/mock_header_repository.go +++ b/pkg/fakes/mock_header_repository.go @@ -8,6 +8,7 @@ import ( type MockHeaderRepository struct { createOrUpdateHeaderCallCount int + createOrUpdateHeaderErr error createOrUpdateHeaderPassedBlockNumbers []int64 createOrUpdateHeaderReturnID int64 missingBlockNumbers []int64 @@ -21,6 +22,10 @@ func (repository *MockHeaderRepository) SetCreateOrUpdateHeaderReturnID(id int64 repository.createOrUpdateHeaderReturnID = id } +func (repository *MockHeaderRepository) SetCreateOrUpdateHeaderReturnErr(err error) { + repository.createOrUpdateHeaderErr = err +} + func (repository *MockHeaderRepository) SetMissingBlockNumbers(blockNumbers []int64) { repository.missingBlockNumbers = blockNumbers } @@ -28,7 +33,7 @@ func (repository *MockHeaderRepository) SetMissingBlockNumbers(blockNumbers []in func (repository *MockHeaderRepository) CreateOrUpdateHeader(header core.Header) (int64, error) { repository.createOrUpdateHeaderCallCount++ repository.createOrUpdateHeaderPassedBlockNumbers = append(repository.createOrUpdateHeaderPassedBlockNumbers, header.BlockNumber) - return repository.createOrUpdateHeaderReturnID, nil + return repository.createOrUpdateHeaderReturnID, repository.createOrUpdateHeaderErr } func (*MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { diff --git a/pkg/fakes/mock_transformer.go b/pkg/fakes/mock_transformer.go index 9c47df76..23a40e37 100644 --- a/pkg/fakes/mock_transformer.go +++ b/pkg/fakes/mock_transformer.go @@ -9,6 +9,7 @@ import ( type MockTransformer struct { passedHeader core.Header passedHeaderID int64 + executeCalled bool executeErr error } @@ -16,6 +17,7 @@ func NewMockTransformer() *MockTransformer { return &MockTransformer{ passedHeader: core.Header{}, passedHeaderID: 0, + executeCalled: false, executeErr: nil, } } @@ -25,12 +27,18 @@ func (transformer *MockTransformer) SetExecuteErr(err error) { } func (transformer *MockTransformer) Execute(header core.Header, headerID int64) error { + transformer.executeCalled = true transformer.passedHeader = header transformer.passedHeaderID = headerID return transformer.executeErr } func (transformer *MockTransformer) AssertExecuteCalledWith(header core.Header, headerID int64) { + Expect(transformer.executeCalled).To(BeTrue()) Expect(header).To(Equal(transformer.passedHeader)) Expect(headerID).To(Equal(transformer.passedHeaderID)) } + +func (tranformer *MockTransformer) AssertExecuteNotCalled() { + Expect(tranformer.executeCalled).To(BeFalse()) +} diff --git a/pkg/history/header_validator.go b/pkg/history/header_validator.go index 2db28603..add52dc9 100644 --- a/pkg/history/header_validator.go +++ b/pkg/history/header_validator.go @@ -10,19 +10,21 @@ type HeaderValidator struct { blockChain core.BlockChain headerRepository datastore.HeaderRepository windowSize int + transformers []transformers.Transformer } -func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderRepository, windowSize int) HeaderValidator { +func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderRepository, windowSize int, transformers []transformers.Transformer) HeaderValidator { return HeaderValidator{ blockChain: blockChain, headerRepository: repository, windowSize: windowSize, + transformers: transformers, } } func (validator HeaderValidator) ValidateHeaders() ValidationWindow { window := MakeValidationWindow(validator.blockChain, validator.windowSize) blockNumbers := MakeRange(window.LowerBound, window.UpperBound) - RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers, []transformers.Transformer{}) + RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers, validator.transformers) return window } diff --git a/pkg/history/header_validator_test.go b/pkg/history/header_validator_test.go index 379f6739..fdc8fb08 100644 --- a/pkg/history/header_validator_test.go +++ b/pkg/history/header_validator_test.go @@ -2,8 +2,10 @@ package history_test import ( . "github.com/onsi/ginkgo" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/history" + "github.com/vulcanize/vulcanizedb/pkg/transformers" "math/big" ) @@ -13,10 +15,24 @@ var _ = Describe("Header validator", func() { headerRepository.SetMissingBlockNumbers([]int64{}) blockChain := fakes.NewMockBlockChain() blockChain.SetLastBlock(big.NewInt(3)) - validator := history.NewHeaderValidator(blockChain, headerRepository, 2) + validator := history.NewHeaderValidator(blockChain, headerRepository, 2, []transformers.Transformer{}) validator.ValidateHeaders() headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(3, []int64{1, 2, 3}) }) + + It("passes transformers for execution on new blocks", func() { + headerRepository := fakes.NewMockHeaderRepository() + headerRepository.SetMissingBlockNumbers([]int64{}) + blockChain := fakes.NewMockBlockChain() + blockChain.SetLastBlock(big.NewInt(3)) + transformer := fakes.NewMockTransformer() + validator := history.NewHeaderValidator(blockChain, headerRepository, 1, []transformers.Transformer{transformer}) + + validator.ValidateHeaders() + + headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(2, []int64{2, 3}) + transformer.AssertExecuteCalledWith(core.Header{BlockNumber: 3}, 0) + }) }) diff --git a/pkg/history/populate_headers.go b/pkg/history/populate_headers.go index 2e2fa9c8..183bbf4d 100644 --- a/pkg/history/populate_headers.go +++ b/pkg/history/populate_headers.go @@ -5,6 +5,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/transformers" ) @@ -28,6 +29,9 @@ func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore. } id, err := headerRepository.CreateOrUpdateHeader(header) if err != nil { + if err == repositories.ErrValidHeaderExists { + continue + } return 0, err } for _, transformer := range transformers { diff --git a/pkg/history/populate_headers_test.go b/pkg/history/populate_headers_test.go index 3a5466e7..72a0cec9 100644 --- a/pkg/history/populate_headers_test.go +++ b/pkg/history/populate_headers_test.go @@ -7,6 +7,7 @@ import ( . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/history" "github.com/vulcanize/vulcanizedb/pkg/transformers" @@ -51,12 +52,26 @@ var _ = Describe("Populating headers", func() { headerRepository.SetCreateOrUpdateHeaderReturnID(headerID) transformer := fakes.NewMockTransformer() - _, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, []transformers.Transformer{transformer}) + _, err := history.PopulateMissingHeaders(blockChain, headerRepository, blockNumber, []transformers.Transformer{transformer}) Expect(err).NotTo(HaveOccurred()) transformer.AssertExecuteCalledWith(core.Header{BlockNumber: blockNumber}, headerID) }) + It("does not execute transformer if repository indicates header already exists", func() { + blockNumber := int64(54321) + blockChain := fakes.NewMockBlockChain() + blockChain.SetLastBlock(big.NewInt(blockNumber)) + headerRepository.SetMissingBlockNumbers([]int64{blockNumber}) + headerRepository.SetCreateOrUpdateHeaderReturnErr(repositories.ErrValidHeaderExists) + transformer := fakes.NewMockTransformer() + + _, err := history.PopulateMissingHeaders(blockChain, headerRepository, blockNumber, []transformers.Transformer{transformer}) + + Expect(err).NotTo(HaveOccurred()) + transformer.AssertExecuteNotCalled() + }) + It("returns error if executing transformer fails", func() { blockNumber := int64(54321) blockChain := fakes.NewMockBlockChain() @@ -67,7 +82,7 @@ var _ = Describe("Populating headers", func() { transformer := fakes.NewMockTransformer() transformer.SetExecuteErr(fakes.FakeError) - _, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, []transformers.Transformer{transformer}) + _, err := history.PopulateMissingHeaders(blockChain, headerRepository, blockNumber, []transformers.Transformer{transformer}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError))