VDB-302 Sleep when no missing blocks in lightSync (#129)

* Sleep when no missing blocks in lightSync

* Fix tests and error propagation

* Correct geth.log to logrus
This commit is contained in:
Edvard Hübinette 2019-01-11 10:58:41 +01:00 committed by GitHub
parent 77209d3b62
commit ebca338b1e
9 changed files with 83 additions and 30 deletions

View File

@ -81,9 +81,15 @@ func lightSync() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
window := validator.ValidateHeaders() window, err := validator.ValidateHeaders()
if err != nil {
log.Error("ValidateHeaders failed in lightSync: ", err)
}
window.Log(os.Stdout) window.Log(os.Stdout)
case <-missingBlocksPopulated: case n := <-missingBlocksPopulated:
if n == 0 {
time.Sleep(3 * time.Second)
}
go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber) go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber)
} }
} }

View File

@ -3,6 +3,7 @@ package repositories
import ( import (
"database/sql" "database/sql"
"errors" "errors"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
@ -38,16 +39,21 @@ func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, er
return header, err return header, err
} }
func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 { func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) {
numbers := make([]int64, 0) numbers := make([]int64, 0)
repository.database.Select(&numbers, `SELECT all_block_numbers err := repository.database.Select(&numbers, `SELECT all_block_numbers
FROM ( FROM (
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
WHERE all_block_numbers NOT IN ( WHERE all_block_numbers NOT IN (
SELECT block_number FROM headers WHERE eth_node_fingerprint = $3 SELECT block_number FROM headers WHERE eth_node_fingerprint = $3
) `, ) `,
startingBlockNumber, endingBlockNumber, nodeID) startingBlockNumber, endingBlockNumber, nodeID)
return numbers if err != nil {
log.Errorf("MissingBlockNumbers failed to get blocks between %v - %v for node %v",
startingBlockNumber, endingBlockNumber, nodeID)
return []int64{}, err
}
return numbers, nil
} }
func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) { func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) {

View File

@ -19,7 +19,6 @@ var _ = Describe("Block header repository", func() {
rawHeader []byte rawHeader []byte
err error err error
timestamp string timestamp string
node core.Node
db *postgres.DB db *postgres.DB
repo repositories.HeaderRepository repo repositories.HeaderRepository
header core.Header header core.Header
@ -30,8 +29,7 @@ var _ = Describe("Block header repository", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
timestamp = big.NewInt(123456789).String() timestamp = big.NewInt(123456789).String()
node = core.Node{ID: "Fingerprint"} db = test_config.NewTestDB(test_config.NewTestNode())
db = test_config.NewTestDB(node)
test_config.CleanTestDB(db) test_config.CleanTestDB(db)
repo = repositories.NewHeaderRepository(db) repo = repositories.NewHeaderRepository(db)
header = core.Header{ header = core.Header{
@ -44,7 +42,7 @@ var _ = Describe("Block header repository", func() {
Describe("creating or updating a header", func() { Describe("creating or updating a header", func() {
It("adds a header", func() { It("adds a header", func() {
_, err := repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
var dbHeader core.Header var dbHeader core.Header
@ -57,7 +55,7 @@ var _ = Describe("Block header repository", func() {
}) })
It("adds node data to header", func() { It("adds node data to header", func() {
_, err := repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
var ethNodeId int64 var ethNodeId int64
@ -71,7 +69,7 @@ var _ = Describe("Block header repository", func() {
}) })
It("returns valid header exists error if attempting duplicate headers", func() { It("returns valid header exists error if attempting duplicate headers", func() {
_, err := repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
@ -85,8 +83,9 @@ var _ = Describe("Block header repository", func() {
}) })
It("replaces header if hash is different", func() { It("replaces header if hash is different", func() {
_, err := repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
headerTwo := core.Header{ headerTwo := core.Header{
BlockNumber: header.BlockNumber, BlockNumber: header.BlockNumber,
Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(), Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(),
@ -105,10 +104,12 @@ var _ = Describe("Block header repository", func() {
}) })
It("does not replace header if node fingerprint is different", func() { It("does not replace header if node fingerprint is different", func() {
_, err := repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
nodeTwo := core.Node{ID: "FingerprintTwo"} nodeTwo := core.Node{ID: "FingerprintTwo"}
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
repoTwo := repositories.NewHeaderRepository(dbTwo) repoTwo := repositories.NewHeaderRepository(dbTwo)
headerTwo := core.Header{ headerTwo := core.Header{
BlockNumber: header.BlockNumber, BlockNumber: header.BlockNumber,
@ -127,10 +128,13 @@ var _ = Describe("Block header repository", func() {
}) })
It("only replaces header with matching node fingerprint", func() { It("only replaces header with matching node fingerprint", func() {
_, err := repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
nodeTwo := core.Node{ID: "FingerprintTwo"} nodeTwo := core.Node{ID: "FingerprintTwo"}
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
repoTwo := repositories.NewHeaderRepository(dbTwo) repoTwo := repositories.NewHeaderRepository(dbTwo)
headerTwo := core.Header{ headerTwo := core.Header{
BlockNumber: header.BlockNumber, BlockNumber: header.BlockNumber,
@ -162,7 +166,7 @@ var _ = Describe("Block header repository", func() {
Describe("Getting a header", func() { Describe("Getting a header", func() {
It("returns header if it exists", func() { It("returns header if it exists", func() {
_, err := repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
dbHeader, err := repo.GetHeader(header.BlockNumber) dbHeader, err := repo.GetHeader(header.BlockNumber)
@ -174,9 +178,10 @@ var _ = Describe("Block header repository", func() {
}) })
It("does not return header for a different node fingerprint", func() { It("does not return header for a different node fingerprint", func() {
_, err := repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
nodeTwo := core.Node{ID: "NodeFingerprintTwo"}
nodeTwo := core.Node{ID: "FingerprintTwo"}
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
repoTwo := repositories.NewHeaderRepository(dbTwo) repoTwo := repositories.NewHeaderRepository(dbTwo)
@ -211,7 +216,8 @@ var _ = Describe("Block header repository", func() {
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
missingBlockNumbers := repo.MissingBlockNumbers(1, 5, node.ID) missingBlockNumbers, err := repo.MissingBlockNumbers(1, 5, db.Node.ID)
Expect(err).NotTo(HaveOccurred())
Expect(missingBlockNumbers).To(ConsistOf([]int64{2, 4})) Expect(missingBlockNumbers).To(ConsistOf([]int64{2, 4}))
}) })
@ -238,12 +244,13 @@ var _ = Describe("Block header repository", func() {
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
nodeTwo := core.Node{ID: "NodeFingerprintTwo"} nodeTwo := core.Node{ID: "FingerprintTwo"}
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
repoTwo := repositories.NewHeaderRepository(dbTwo) repoTwo := repositories.NewHeaderRepository(dbTwo)
missingBlockNumbers := repoTwo.MissingBlockNumbers(1, 5, nodeTwo.ID) missingBlockNumbers, err := repoTwo.MissingBlockNumbers(1, 5, nodeTwo.ID)
Expect(err).NotTo(HaveOccurred())
Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5})) Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5}))
}) })

View File

@ -40,7 +40,7 @@ type FilterRepository interface {
type HeaderRepository interface { type HeaderRepository interface {
CreateOrUpdateHeader(header core.Header) (int64, error) CreateOrUpdateHeader(header core.Header) (int64, error)
GetHeader(blockNumber int64) (core.Header, error) GetHeader(blockNumber int64) (core.Header, error)
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error)
HeaderExists(blockNumber int64) (bool, error) HeaderExists(blockNumber int64) (bool, error)
} }

View File

@ -41,8 +41,8 @@ func (*MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {
return core.Header{BlockNumber: blockNumber}, nil return core.Header{BlockNumber: blockNumber}, nil
} }
func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 { func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) {
return repository.missingBlockNumbers return repository.missingBlockNumbers, nil
} }
func (repository *MockHeaderRepository) HeaderExists(blockNumber int64) (bool, error) { func (repository *MockHeaderRepository) HeaderExists(blockNumber int64) (bool, error) {

View File

@ -1,6 +1,7 @@
package history package history
import ( import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
) )
@ -19,9 +20,13 @@ func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderR
} }
} }
func (validator HeaderValidator) ValidateHeaders() ValidationWindow { func (validator HeaderValidator) ValidateHeaders() (ValidationWindow, error) {
window := MakeValidationWindow(validator.blockChain, validator.windowSize) window := MakeValidationWindow(validator.blockChain, validator.windowSize)
blockNumbers := MakeRange(window.LowerBound, window.UpperBound) blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers) _, err := RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers)
return window if err != nil {
log.Error("Error in ValidateHeaders: ", err)
return ValidationWindow{}, err
}
return window, nil
} }

View File

@ -1,22 +1,43 @@
package history_test package history_test
import ( import (
"errors"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/history" "github.com/vulcanize/vulcanizedb/pkg/history"
"math/big" "math/big"
) )
var _ = Describe("Header validator", func() { var _ = Describe("Header validator", func() {
var (
headerRepository *fakes.MockHeaderRepository
blockChain *fakes.MockBlockChain
)
BeforeEach(func() {
headerRepository = fakes.NewMockHeaderRepository()
blockChain = fakes.NewMockBlockChain()
})
It("attempts to create every header in the validation window", func() { It("attempts to create every header in the validation window", func() {
headerRepository := fakes.NewMockHeaderRepository()
headerRepository.SetMissingBlockNumbers([]int64{}) headerRepository.SetMissingBlockNumbers([]int64{})
blockChain := fakes.NewMockBlockChain()
blockChain.SetLastBlock(big.NewInt(3)) blockChain.SetLastBlock(big.NewInt(3))
validator := history.NewHeaderValidator(blockChain, headerRepository, 2) validator := history.NewHeaderValidator(blockChain, headerRepository, 2)
validator.ValidateHeaders() _, err := validator.ValidateHeaders()
Expect(err).NotTo(HaveOccurred())
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(3, []int64{1, 2, 3}) headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(3, []int64{1, 2, 3})
}) })
It("propagates header repository errors", func() {
blockChain.SetLastBlock(big.NewInt(3))
headerRepositoryError := errors.New("CreateOrUpdate")
headerRepository.SetCreateOrUpdateHeaderReturnErr(headerRepositoryError)
validator := history.NewHeaderValidator(blockChain, headerRepository, 2)
_, err := validator.ValidateHeaders()
Expect(err).To(MatchError(headerRepositoryError))
})
}) })

View File

@ -11,13 +11,20 @@ import (
func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) { func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) {
lastBlock := blockchain.LastBlock().Int64() lastBlock := blockchain.LastBlock().Int64()
headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock) headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock)
if err != nil { if err != nil {
log.Error("Error in checking header in PopulateMissingHeaders: ", err)
return 0, err return 0, err
} else if headerAlreadyExists { } else if headerAlreadyExists {
return 0, nil return 0, nil
} }
blockNumbers := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID)
if err != nil {
log.Error("Error getting missing block numbers in PopulateMissingHeaders: ", err)
return 0, err
}
log.Printf("Backfilling %d blocks\n\n", len(blockNumbers)) log.Printf("Backfilling %d blocks\n\n", len(blockNumbers))
_, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers) _, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers)
if err != nil { if err != nil {

View File

@ -111,6 +111,7 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM watched_contracts") db.MustExec("DELETE FROM watched_contracts")
} }
// Returns a new test node, with the same ID
func NewTestNode() core.Node { func NewTestNode() core.Node {
return core.Node{ return core.Node{
GenesisBlock: "GENESIS", GenesisBlock: "GENESIS",