diff --git a/pkg/datastore/postgres/repositories/header_repository.go b/pkg/datastore/postgres/repositories/header_repository.go index c7948337..ccc99198 100644 --- a/pkg/datastore/postgres/repositories/header_repository.go +++ b/pkg/datastore/postgres/repositories/header_repository.go @@ -51,6 +51,18 @@ func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endi return numbers } +func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) { + _, err := repository.GetHeader(blockNumber) + if err != nil { + if headerDoesNotExist(err) { + return false, nil + } + return false, err + } + + return true, nil +} + func headerMustBeReplaced(hash string, header core.Header) bool { return hash != header.Hash } diff --git a/pkg/datastore/postgres/repositories/header_repository_test.go b/pkg/datastore/postgres/repositories/header_repository_test.go index c85b8a34..1b059a4b 100644 --- a/pkg/datastore/postgres/repositories/header_repository_test.go +++ b/pkg/datastore/postgres/repositories/header_repository_test.go @@ -19,27 +19,31 @@ var _ = Describe("Block header repository", func() { rawHeader []byte err error timestamp string + node core.Node + db *postgres.DB + repo repositories.HeaderRepository + header core.Header ) BeforeEach(func() { rawHeader, err = json.Marshal(types.Header{}) Expect(err).NotTo(HaveOccurred()) timestamp = big.NewInt(123456789).String() + + node = core.Node{ID: "Fingerprint"} + db = test_config.NewTestDB(node) + test_config.CleanTestDB(db) + repo = repositories.NewHeaderRepository(db) + header = core.Header{ + BlockNumber: 100, + Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), + Raw: rawHeader, + Timestamp: timestamp, + } }) Describe("creating or updating a header", func() { It("adds a header", func() { - node := core.Node{} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - header := core.Header{ - BlockNumber: 100, - Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), - Raw: rawHeader, - Timestamp: timestamp, - } - _, err := repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) @@ -53,16 +57,6 @@ var _ = Describe("Block header repository", func() { }) It("adds node data to header", func() { - node := core.Node{ID: "EthNodeFingerprint"} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - header := core.Header{ - BlockNumber: 100, - Raw: rawHeader, - Timestamp: timestamp, - } - _, err := repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) @@ -77,17 +71,6 @@ var _ = Describe("Block header repository", func() { }) It("returns valid header exists error if attempting duplicate headers", func() { - node := core.Node{} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - header := core.Header{ - BlockNumber: 100, - Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), - Raw: rawHeader, - Timestamp: timestamp, - } - _, err := repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) @@ -102,16 +85,6 @@ var _ = Describe("Block header repository", func() { }) It("replaces header if hash is different", func() { - node := core.Node{} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - header := core.Header{ - BlockNumber: 100, - Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), - Raw: rawHeader, - Timestamp: timestamp, - } _, err := repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) headerTwo := core.Header{ @@ -132,16 +105,6 @@ var _ = Describe("Block header repository", func() { }) It("does not replace header if node fingerprint is different", func() { - node := core.Node{ID: "Fingerprint"} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - header := core.Header{ - BlockNumber: 100, - Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), - Raw: rawHeader, - Timestamp: timestamp, - } _, err := repo.CreateOrUpdateHeader(header) nodeTwo := core.Node{ID: "FingerprintTwo"} dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) @@ -164,15 +127,6 @@ var _ = Describe("Block header repository", func() { }) It("only replaces header with matching node fingerprint", func() { - node := core.Node{ID: "Fingerprint"} - db := test_config.NewTestDB(node) - repo := repositories.NewHeaderRepository(db) - header := core.Header{ - BlockNumber: 100, - Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), - Raw: rawHeader, - Timestamp: timestamp, - } _, err := repo.CreateOrUpdateHeader(header) nodeTwo := core.Node{ID: "FingerprintTwo"} dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) @@ -208,16 +162,6 @@ var _ = Describe("Block header repository", func() { Describe("Getting a header", func() { It("returns header if it exists", func() { - node := core.Node{} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - header := core.Header{ - BlockNumber: 100, - Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), - Raw: rawHeader, - Timestamp: timestamp, - } _, err := repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) @@ -230,16 +174,6 @@ var _ = Describe("Block header repository", func() { }) It("does not return header for a different node fingerprint", func() { - node := core.Node{} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - header := core.Header{ - BlockNumber: 100, - Hash: common.BytesToHash(rawHeader).Hex(), - Raw: rawHeader, - Timestamp: timestamp, - } _, err := repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred()) nodeTwo := core.Node{ID: "NodeFingerprintTwo"} @@ -256,25 +190,26 @@ var _ = Describe("Block header repository", func() { Describe("Getting missing headers", func() { It("returns block numbers for headers not in the database", func() { - node := core.Node{} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - repo.CreateOrUpdateHeader(core.Header{ + _, err = repo.CreateOrUpdateHeader(core.Header{ BlockNumber: 1, Raw: rawHeader, Timestamp: timestamp, }) - repo.CreateOrUpdateHeader(core.Header{ + Expect(err).NotTo(HaveOccurred()) + + _, err = repo.CreateOrUpdateHeader(core.Header{ BlockNumber: 3, Raw: rawHeader, Timestamp: timestamp, }) - repo.CreateOrUpdateHeader(core.Header{ + Expect(err).NotTo(HaveOccurred()) + + _, err = repo.CreateOrUpdateHeader(core.Header{ BlockNumber: 5, Raw: rawHeader, Timestamp: timestamp, }) + Expect(err).NotTo(HaveOccurred()) missingBlockNumbers := repo.MissingBlockNumbers(1, 5, node.ID) @@ -282,25 +217,27 @@ var _ = Describe("Block header repository", func() { }) It("does not count headers created by a different node fingerprint", func() { - node := core.Node{ID: "NodeFingerprint"} - db := test_config.NewTestDB(node) - test_config.CleanTestDB(db) - repo := repositories.NewHeaderRepository(db) - repo.CreateOrUpdateHeader(core.Header{ + _, err = repo.CreateOrUpdateHeader(core.Header{ BlockNumber: 1, Raw: rawHeader, Timestamp: timestamp, }) - repo.CreateOrUpdateHeader(core.Header{ + Expect(err).NotTo(HaveOccurred()) + + _, err = repo.CreateOrUpdateHeader(core.Header{ BlockNumber: 3, Raw: rawHeader, Timestamp: timestamp, }) - repo.CreateOrUpdateHeader(core.Header{ + Expect(err).NotTo(HaveOccurred()) + + _, err = repo.CreateOrUpdateHeader(core.Header{ BlockNumber: 5, Raw: rawHeader, Timestamp: timestamp, }) + Expect(err).NotTo(HaveOccurred()) + nodeTwo := core.Node{ID: "NodeFingerprintTwo"} dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) Expect(err).NotTo(HaveOccurred()) @@ -311,4 +248,21 @@ var _ = Describe("Block header repository", func() { Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5})) }) }) + + Describe("HeaderExists", func() { + It("returns true if the header record exists", func() { + _, err = repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + + result, err := repo.HeaderExists(header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeTrue()) + }) + + It("returns false if the header record doesn't exist", func() { + result, err := repo.HeaderExists(1) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeFalse()) + }) + }) }) diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index fd8b59cd..5cf0f667 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -41,6 +41,7 @@ type HeaderRepository interface { CreateOrUpdateHeader(header core.Header) (int64, error) GetHeader(blockNumber int64) (core.Header, error) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 + HeaderExists(blockNumber int64) (bool, error) } type LogRepository interface { diff --git a/pkg/fakes/mock_header_repository.go b/pkg/fakes/mock_header_repository.go index 140df85e..ee5bab6f 100644 --- a/pkg/fakes/mock_header_repository.go +++ b/pkg/fakes/mock_header_repository.go @@ -12,6 +12,7 @@ type MockHeaderRepository struct { createOrUpdateHeaderPassedBlockNumbers []int64 createOrUpdateHeaderReturnID int64 missingBlockNumbers []int64 + headerExists bool } func NewMockHeaderRepository() *MockHeaderRepository { @@ -44,6 +45,14 @@ func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, return repository.missingBlockNumbers } +func (repository *MockHeaderRepository) HeaderExists(blockNumber int64) (bool, error) { + return repository.headerExists, nil +} + +func (repository *MockHeaderRepository) SetHeaderExists(headerExists bool) { + repository.headerExists = headerExists +} + func (repository *MockHeaderRepository) AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(times int, blockNumbers []int64) { Expect(repository.createOrUpdateHeaderCallCount).To(Equal(times)) Expect(repository.createOrUpdateHeaderPassedBlockNumbers).To(Equal(blockNumbers)) diff --git a/pkg/history/populate_blocks.go b/pkg/history/populate_blocks.go index 10526789..884e5521 100644 --- a/pkg/history/populate_blocks.go +++ b/pkg/history/populate_blocks.go @@ -10,6 +10,11 @@ import ( func PopulateMissingBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int { lastBlock := blockchain.LastBlock().Int64() blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) + + if len(blockRange) == 0 { + return 0 + } + log.Printf("Backfilling %d blocks\n\n", len(blockRange)) RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange) return len(blockRange) diff --git a/pkg/history/populate_headers.go b/pkg/history/populate_headers.go index 01584956..2a3be3a1 100644 --- a/pkg/history/populate_headers.go +++ b/pkg/history/populate_headers.go @@ -10,13 +10,20 @@ import ( func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) { lastBlock := blockchain.LastBlock().Int64() - blockRange := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) - log.Printf("Backfilling %d blocks\n\n", len(blockRange)) - _, err := RetrieveAndUpdateHeaders(blockchain, headerRepository, blockRange) + headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock) + if err != nil { + return 0, err + } else if headerAlreadyExists { + return 0, nil + } + + blockNumbers := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) + log.Printf("Backfilling %d blocks\n\n", len(blockNumbers)) + _, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers) if err != nil { return 0, err } - return len(blockRange), nil + return len(blockNumbers), nil } func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore.HeaderRepository, blockNumbers []int64) (int, error) { diff --git a/pkg/history/populate_headers_test.go b/pkg/history/populate_headers_test.go index 009a620c..b2bd569b 100644 --- a/pkg/history/populate_headers_test.go +++ b/pkg/history/populate_headers_test.go @@ -39,4 +39,14 @@ var _ = Describe("Populating headers", func() { Expect(err).NotTo(HaveOccurred()) headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(1, []int64{2}) }) + + It("returns early if the db is already synced up to the head of the chain", func() { + blockChain := fakes.NewMockBlockChain() + blockChain.SetLastBlock(big.NewInt(2)) + headerRepository.SetHeaderExists(true) + headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 2) + + Expect(err).NotTo(HaveOccurred()) + Expect(headersAdded).To(Equal(0)) + }) })