diff --git a/cmd/lightSync.go b/cmd/lightSync.go index 76f2976e..630f487f 100644 --- a/cmd/lightSync.go +++ b/cmd/lightSync.go @@ -81,9 +81,15 @@ func lightSync() { for { select { case <-ticker.C: - window := validator.ValidateHeaders() + window, err := validator.ValidateHeaders() + if err != nil { + log.Error("ValidateHeaders failed in lightSync: ", err) + } window.Log(os.Stdout) - case <-missingBlocksPopulated: + case n := <-missingBlocksPopulated: + if n == 0 { + time.Sleep(3 * time.Second) + } go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber) } } diff --git a/pkg/datastore/postgres/repositories/header_repository.go b/pkg/datastore/postgres/repositories/header_repository.go index 207afc3b..bd9635ca 100644 --- a/pkg/datastore/postgres/repositories/header_repository.go +++ b/pkg/datastore/postgres/repositories/header_repository.go @@ -3,6 +3,7 @@ package repositories import ( "database/sql" "errors" + log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) @@ -38,16 +39,21 @@ func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, er return header, err } -func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 { +func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) { numbers := make([]int64, 0) - repository.database.Select(&numbers, `SELECT all_block_numbers + err := repository.database.Select(&numbers, `SELECT all_block_numbers FROM ( SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series WHERE all_block_numbers NOT IN ( SELECT block_number FROM headers WHERE eth_node_fingerprint = $3 ) `, startingBlockNumber, endingBlockNumber, nodeID) - return numbers + if err != nil { + log.Errorf("MissingBlockNumbers failed to get blocks between %v - %v for node %v", + startingBlockNumber, endingBlockNumber, nodeID) + return []int64{}, err + } + return numbers, nil } func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) { diff --git a/pkg/datastore/postgres/repositories/header_repository_test.go b/pkg/datastore/postgres/repositories/header_repository_test.go index 1b059a4b..9aa440df 100644 --- a/pkg/datastore/postgres/repositories/header_repository_test.go +++ b/pkg/datastore/postgres/repositories/header_repository_test.go @@ -19,7 +19,6 @@ var _ = Describe("Block header repository", func() { rawHeader []byte err error timestamp string - node core.Node db *postgres.DB repo repositories.HeaderRepository header core.Header @@ -30,8 +29,7 @@ var _ = Describe("Block header repository", func() { Expect(err).NotTo(HaveOccurred()) timestamp = big.NewInt(123456789).String() - node = core.Node{ID: "Fingerprint"} - db = test_config.NewTestDB(node) + db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) repo = repositories.NewHeaderRepository(db) header = core.Header{ @@ -44,7 +42,7 @@ var _ = Describe("Block header repository", func() { Describe("creating or updating a header", func() { It("adds a header", func() { - _, err := repo.CreateOrUpdateHeader(header) + _, err = repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) var dbHeader core.Header @@ -57,7 +55,7 @@ var _ = Describe("Block header repository", func() { }) It("adds node data to header", func() { - _, err := repo.CreateOrUpdateHeader(header) + _, err = repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) var ethNodeId int64 @@ -71,7 +69,7 @@ var _ = Describe("Block header repository", func() { }) It("returns valid header exists error if attempting duplicate headers", func() { - _, err := repo.CreateOrUpdateHeader(header) + _, err = repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) _, err = repo.CreateOrUpdateHeader(header) @@ -85,8 +83,9 @@ var _ = Describe("Block header repository", func() { }) It("replaces header if hash is different", func() { - _, err := repo.CreateOrUpdateHeader(header) + _, err = repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) + headerTwo := core.Header{ BlockNumber: header.BlockNumber, Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(), @@ -105,10 +104,12 @@ var _ = Describe("Block header repository", func() { }) It("does not replace header if node fingerprint is different", func() { - _, err := repo.CreateOrUpdateHeader(header) + _, err = repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) nodeTwo := core.Node{ID: "FingerprintTwo"} dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) Expect(err).NotTo(HaveOccurred()) + repoTwo := repositories.NewHeaderRepository(dbTwo) headerTwo := core.Header{ BlockNumber: header.BlockNumber, @@ -127,10 +128,13 @@ var _ = Describe("Block header repository", func() { }) It("only replaces header with matching node fingerprint", func() { - _, err := repo.CreateOrUpdateHeader(header) + _, err = repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + nodeTwo := core.Node{ID: "FingerprintTwo"} dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) Expect(err).NotTo(HaveOccurred()) + repoTwo := repositories.NewHeaderRepository(dbTwo) headerTwo := core.Header{ BlockNumber: header.BlockNumber, @@ -162,7 +166,7 @@ var _ = Describe("Block header repository", func() { Describe("Getting a header", func() { It("returns header if it exists", func() { - _, err := repo.CreateOrUpdateHeader(header) + _, err = repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) dbHeader, err := repo.GetHeader(header.BlockNumber) @@ -174,9 +178,10 @@ var _ = Describe("Block header repository", func() { }) It("does not return header for a different node fingerprint", func() { - _, err := repo.CreateOrUpdateHeader(header) + _, err = repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) - nodeTwo := core.Node{ID: "NodeFingerprintTwo"} + + nodeTwo := core.Node{ID: "FingerprintTwo"} dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) Expect(err).NotTo(HaveOccurred()) repoTwo := repositories.NewHeaderRepository(dbTwo) @@ -211,7 +216,8 @@ var _ = Describe("Block header repository", func() { }) Expect(err).NotTo(HaveOccurred()) - missingBlockNumbers := repo.MissingBlockNumbers(1, 5, node.ID) + missingBlockNumbers, err := repo.MissingBlockNumbers(1, 5, db.Node.ID) + Expect(err).NotTo(HaveOccurred()) Expect(missingBlockNumbers).To(ConsistOf([]int64{2, 4})) }) @@ -238,12 +244,13 @@ var _ = Describe("Block header repository", func() { }) Expect(err).NotTo(HaveOccurred()) - nodeTwo := core.Node{ID: "NodeFingerprintTwo"} + nodeTwo := core.Node{ID: "FingerprintTwo"} dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) Expect(err).NotTo(HaveOccurred()) repoTwo := repositories.NewHeaderRepository(dbTwo) - missingBlockNumbers := repoTwo.MissingBlockNumbers(1, 5, nodeTwo.ID) + missingBlockNumbers, err := repoTwo.MissingBlockNumbers(1, 5, nodeTwo.ID) + Expect(err).NotTo(HaveOccurred()) Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5})) }) diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index 5cf0f667..277b8184 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -40,7 +40,7 @@ type FilterRepository interface { type HeaderRepository interface { CreateOrUpdateHeader(header core.Header) (int64, error) GetHeader(blockNumber int64) (core.Header, error) - MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 + MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) HeaderExists(blockNumber int64) (bool, error) } diff --git a/pkg/fakes/mock_header_repository.go b/pkg/fakes/mock_header_repository.go index ee5bab6f..97fad441 100644 --- a/pkg/fakes/mock_header_repository.go +++ b/pkg/fakes/mock_header_repository.go @@ -41,8 +41,8 @@ func (*MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { return core.Header{BlockNumber: blockNumber}, nil } -func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 { - return repository.missingBlockNumbers +func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) { + return repository.missingBlockNumbers, nil } func (repository *MockHeaderRepository) HeaderExists(blockNumber int64) (bool, error) { diff --git a/pkg/history/header_validator.go b/pkg/history/header_validator.go index b733aaee..5d10eab0 100644 --- a/pkg/history/header_validator.go +++ b/pkg/history/header_validator.go @@ -1,6 +1,7 @@ package history import ( + log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" ) @@ -19,9 +20,13 @@ func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderR } } -func (validator HeaderValidator) ValidateHeaders() ValidationWindow { +func (validator HeaderValidator) ValidateHeaders() (ValidationWindow, error) { window := MakeValidationWindow(validator.blockChain, validator.windowSize) blockNumbers := MakeRange(window.LowerBound, window.UpperBound) - RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers) - return window + _, err := RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers) + if err != nil { + log.Error("Error in ValidateHeaders: ", err) + return ValidationWindow{}, err + } + return window, nil } diff --git a/pkg/history/header_validator_test.go b/pkg/history/header_validator_test.go index 379f6739..937c04ef 100644 --- a/pkg/history/header_validator_test.go +++ b/pkg/history/header_validator_test.go @@ -1,22 +1,43 @@ package history_test import ( + "errors" . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/history" "math/big" ) var _ = Describe("Header validator", func() { + var ( + headerRepository *fakes.MockHeaderRepository + blockChain *fakes.MockBlockChain + ) + + BeforeEach(func() { + headerRepository = fakes.NewMockHeaderRepository() + blockChain = fakes.NewMockBlockChain() + }) + It("attempts to create every header in the validation window", func() { - headerRepository := fakes.NewMockHeaderRepository() headerRepository.SetMissingBlockNumbers([]int64{}) - blockChain := fakes.NewMockBlockChain() blockChain.SetLastBlock(big.NewInt(3)) validator := history.NewHeaderValidator(blockChain, headerRepository, 2) - validator.ValidateHeaders() + _, err := validator.ValidateHeaders() + Expect(err).NotTo(HaveOccurred()) headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(3, []int64{1, 2, 3}) }) + + It("propagates header repository errors", func() { + blockChain.SetLastBlock(big.NewInt(3)) + headerRepositoryError := errors.New("CreateOrUpdate") + headerRepository.SetCreateOrUpdateHeaderReturnErr(headerRepositoryError) + validator := history.NewHeaderValidator(blockChain, headerRepository, 2) + + _, err := validator.ValidateHeaders() + Expect(err).To(MatchError(headerRepositoryError)) + }) }) diff --git a/pkg/history/populate_headers.go b/pkg/history/populate_headers.go index 2a3be3a1..77043921 100644 --- a/pkg/history/populate_headers.go +++ b/pkg/history/populate_headers.go @@ -11,13 +11,20 @@ import ( func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) { lastBlock := blockchain.LastBlock().Int64() headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock) + if err != nil { + log.Error("Error in checking header in PopulateMissingHeaders: ", err) return 0, err } else if headerAlreadyExists { return 0, nil } - blockNumbers := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) + blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) + if err != nil { + log.Error("Error getting missing block numbers in PopulateMissingHeaders: ", err) + return 0, err + } + log.Printf("Backfilling %d blocks\n\n", len(blockNumbers)) _, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers) if err != nil { diff --git a/test_config/test_config.go b/test_config/test_config.go index 335b6149..2d31916b 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -111,6 +111,7 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM watched_contracts") } +// Returns a new test node, with the same ID func NewTestNode() core.Node { return core.Node{ GenesisBlock: "GENESIS",