Merge branch 'staging' into VDB-125-aggregate-log-fetching
This commit is contained in:
commit
2292c94e99
@ -50,6 +50,18 @@ func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endi
|
||||
return numbers
|
||||
}
|
||||
|
||||
func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) {
|
||||
_, err := repository.GetHeader(blockNumber)
|
||||
if err != nil {
|
||||
if headerDoesNotExist(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func headerMustBeReplaced(hash string, header core.Header) bool {
|
||||
return hash != header.Hash
|
||||
}
|
||||
|
@ -19,27 +19,31 @@ var _ = Describe("Block header repository", func() {
|
||||
rawHeader []byte
|
||||
err error
|
||||
timestamp string
|
||||
node core.Node
|
||||
db *postgres.DB
|
||||
repo repositories.HeaderRepository
|
||||
header core.Header
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
rawHeader, err = json.Marshal(types.Header{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
timestamp = big.NewInt(123456789).String()
|
||||
|
||||
node = core.Node{ID: "Fingerprint"}
|
||||
db = test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo = repositories.NewHeaderRepository(db)
|
||||
header = core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
})
|
||||
|
||||
Describe("creating or updating a header", func() {
|
||||
It("adds a header", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
@ -53,16 +57,6 @@ var _ = Describe("Block header repository", func() {
|
||||
})
|
||||
|
||||
It("adds node data to header", func() {
|
||||
node := core.Node{ID: "EthNodeFingerprint"}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
@ -77,17 +71,6 @@ var _ = Describe("Block header repository", func() {
|
||||
})
|
||||
|
||||
It("returns valid header exists error if attempting duplicate headers", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
@ -102,16 +85,6 @@ var _ = Describe("Block header repository", func() {
|
||||
})
|
||||
|
||||
It("replaces header if hash is different", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
headerTwo := core.Header{
|
||||
@ -132,16 +105,6 @@ var _ = Describe("Block header repository", func() {
|
||||
})
|
||||
|
||||
It("does not replace header if node fingerprint is different", func() {
|
||||
node := core.Node{ID: "Fingerprint"}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||
@ -164,15 +127,6 @@ var _ = Describe("Block header repository", func() {
|
||||
})
|
||||
|
||||
It("only replaces header with matching node fingerprint", func() {
|
||||
node := core.Node{ID: "Fingerprint"}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||
@ -208,16 +162,6 @@ var _ = Describe("Block header repository", func() {
|
||||
|
||||
Describe("Getting a header", func() {
|
||||
It("returns header if it exists", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
@ -230,16 +174,6 @@ var _ = Describe("Block header repository", func() {
|
||||
})
|
||||
|
||||
It("does not return header for a different node fingerprint", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash(rawHeader).Hex(),
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
nodeTwo := core.Node{ID: "NodeFingerprintTwo"}
|
||||
@ -256,25 +190,26 @@ var _ = Describe("Block header repository", func() {
|
||||
|
||||
Describe("Getting missing headers", func() {
|
||||
It("returns block numbers for headers not in the database", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
repo.CreateOrUpdateHeader(core.Header{
|
||||
_, err = repo.CreateOrUpdateHeader(core.Header{
|
||||
BlockNumber: 1,
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
repo.CreateOrUpdateHeader(core.Header{
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
_, err = repo.CreateOrUpdateHeader(core.Header{
|
||||
BlockNumber: 3,
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
repo.CreateOrUpdateHeader(core.Header{
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
_, err = repo.CreateOrUpdateHeader(core.Header{
|
||||
BlockNumber: 5,
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
missingBlockNumbers := repo.MissingBlockNumbers(1, 5, node.ID)
|
||||
|
||||
@ -282,25 +217,27 @@ var _ = Describe("Block header repository", func() {
|
||||
})
|
||||
|
||||
It("does not count headers created by a different node fingerprint", func() {
|
||||
node := core.Node{ID: "NodeFingerprint"}
|
||||
db := test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
repo.CreateOrUpdateHeader(core.Header{
|
||||
_, err = repo.CreateOrUpdateHeader(core.Header{
|
||||
BlockNumber: 1,
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
repo.CreateOrUpdateHeader(core.Header{
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
_, err = repo.CreateOrUpdateHeader(core.Header{
|
||||
BlockNumber: 3,
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
repo.CreateOrUpdateHeader(core.Header{
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
_, err = repo.CreateOrUpdateHeader(core.Header{
|
||||
BlockNumber: 5,
|
||||
Raw: rawHeader,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
nodeTwo := core.Node{ID: "NodeFingerprintTwo"}
|
||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
@ -311,4 +248,21 @@ var _ = Describe("Block header repository", func() {
|
||||
Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5}))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("HeaderExists", func() {
|
||||
It("returns true if the header record exists", func() {
|
||||
_, err = repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
result, err := repo.HeaderExists(header.BlockNumber)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeTrue())
|
||||
})
|
||||
|
||||
It("returns false if the header record doesn't exist", func() {
|
||||
result, err := repo.HeaderExists(1)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeFalse())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -41,6 +41,7 @@ type HeaderRepository interface {
|
||||
CreateOrUpdateHeader(header core.Header) (int64, error)
|
||||
GetHeader(blockNumber int64) (core.Header, error)
|
||||
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64
|
||||
HeaderExists(blockNumber int64) (bool, error)
|
||||
}
|
||||
|
||||
type LogRepository interface {
|
||||
|
@ -12,6 +12,7 @@ type MockHeaderRepository struct {
|
||||
createOrUpdateHeaderPassedBlockNumbers []int64
|
||||
createOrUpdateHeaderReturnID int64
|
||||
missingBlockNumbers []int64
|
||||
headerExists bool
|
||||
}
|
||||
|
||||
func NewMockHeaderRepository() *MockHeaderRepository {
|
||||
@ -44,6 +45,14 @@ func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber,
|
||||
return repository.missingBlockNumbers
|
||||
}
|
||||
|
||||
func (repository *MockHeaderRepository) HeaderExists(blockNumber int64) (bool, error) {
|
||||
return repository.headerExists, nil
|
||||
}
|
||||
|
||||
func (repository *MockHeaderRepository) SetHeaderExists(headerExists bool) {
|
||||
repository.headerExists = headerExists
|
||||
}
|
||||
|
||||
func (repository *MockHeaderRepository) AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(times int, blockNumbers []int64) {
|
||||
Expect(repository.createOrUpdateHeaderCallCount).To(Equal(times))
|
||||
Expect(repository.createOrUpdateHeaderPassedBlockNumbers).To(Equal(blockNumbers))
|
||||
|
@ -10,6 +10,11 @@ import (
|
||||
func PopulateMissingBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int {
|
||||
lastBlock := blockchain.LastBlock().Int64()
|
||||
blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID)
|
||||
|
||||
if len(blockRange) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
log.Printf("Backfilling %d blocks\n\n", len(blockRange))
|
||||
RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange)
|
||||
return len(blockRange)
|
||||
|
@ -10,13 +10,20 @@ import (
|
||||
|
||||
func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) {
|
||||
lastBlock := blockchain.LastBlock().Int64()
|
||||
blockRange := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID)
|
||||
log.Printf("Backfilling %d blocks\n\n", len(blockRange))
|
||||
_, err := RetrieveAndUpdateHeaders(blockchain, headerRepository, blockRange)
|
||||
headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
} else if headerAlreadyExists {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
blockNumbers := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID)
|
||||
log.Printf("Backfilling %d blocks\n\n", len(blockNumbers))
|
||||
_, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(blockRange), nil
|
||||
return len(blockNumbers), nil
|
||||
}
|
||||
|
||||
func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore.HeaderRepository, blockNumbers []int64) (int, error) {
|
||||
|
@ -39,4 +39,14 @@ var _ = Describe("Populating headers", func() {
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(1, []int64{2})
|
||||
})
|
||||
|
||||
It("returns early if the db is already synced up to the head of the chain", func() {
|
||||
blockChain := fakes.NewMockBlockChain()
|
||||
blockChain.SetLastBlock(big.NewInt(2))
|
||||
headerRepository.SetHeaderExists(true)
|
||||
headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 2)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(headersAdded).To(Equal(0))
|
||||
})
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user