Improve I/O error propagation

This commit is contained in:
Edvard 2019-02-14 16:03:57 +01:00
parent f1d6e417fe
commit 6cd4e5ea95
18 changed files with 180 additions and 98 deletions

View File

@ -62,7 +62,9 @@ func init() {
func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber) populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber)
if err != nil { if err != nil {
log.Fatal("Error populating headers: ", err) // TODO Lots of possible errors in the call stack above. If errors occur, we still put
// 0 in the channel, triggering another round
log.Error("backfillAllHeaders: Error populating headers: ", err)
} }
missingBlocksPopulated <- populated missingBlocksPopulated <- populated
} }
@ -84,7 +86,7 @@ func lightSync() {
case <-ticker.C: case <-ticker.C:
window, err := validator.ValidateHeaders() window, err := validator.ValidateHeaders()
if err != nil { if err != nil {
log.Error("ValidateHeaders failed in lightSync: ", err) log.Error("lightSync: ValidateHeaders failed: ", err)
} }
log.Info(window.GetString()) log.Info(window.GetString())
case n := <-missingBlocksPopulated: case n := <-missingBlocksPopulated:
@ -97,11 +99,14 @@ func lightSync() {
} }
func validateArgs(blockChain *geth.BlockChain) { func validateArgs(blockChain *geth.BlockChain) {
lastBlock := blockChain.LastBlock().Int64() lastBlock, err := blockChain.LastBlock()
if lastBlock == 0 { if err != nil {
log.Error("validateArgs: Error getting last block: ", err)
}
if lastBlock.Int64() == 0 {
log.Fatal("geth initial: state sync not finished") log.Fatal("geth initial: state sync not finished")
} }
if startingBlockNumber > lastBlock { if startingBlockNumber > lastBlock.Int64() {
log.Fatal("starting block number > current block number") log.Fatal("starting block number > current block number")
} }
} }

View File

@ -60,7 +60,11 @@ func init() {
} }
func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber) populated, err := history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber)
if err != nil {
log.Error("backfillAllBlocks: error in populateMissingBlocks: ", err)
}
missingBlocksPopulated <- populated
} }
func sync() { func sync() {
@ -68,12 +72,15 @@ func sync() {
defer ticker.Stop() defer ticker.Stop()
blockChain := getBlockChain() blockChain := getBlockChain()
lastBlock := blockChain.LastBlock().Int64() lastBlock, err := blockChain.LastBlock()
if lastBlock == 0 { if err != nil {
log.Error("sync: Error getting last block: ", err)
}
if lastBlock.Int64() == 0 {
log.Fatal("geth initial: state sync not finished") log.Fatal("geth initial: state sync not finished")
} }
if startingBlockNumber > lastBlock { if startingBlockNumber > lastBlock.Int64() {
log.Fatal("starting block number > current block number") log.Fatal("sync: starting block number > current block number")
} }
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
@ -85,7 +92,10 @@ func sync() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
window := validator.ValidateBlocks() window, err := validator.ValidateBlocks()
if err != nil {
log.Error("sync: error in validateBlocks: ", err)
}
log.Info(window.GetString()) log.Info(window.GetString())
case <-missingBlocksPopulated: case <-missingBlocksPopulated:
go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber) go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber)

View File

@ -30,7 +30,7 @@ type BlockChain interface {
GetHeaderByNumbers(blockNumbers []int64) ([]Header, error) GetHeaderByNumbers(blockNumbers []int64) ([]Header, error)
GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error) GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error)
GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error) GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error)
LastBlock() *big.Int LastBlock() (*big.Int, error)
Node() Node Node() Node
} }

View File

@ -18,6 +18,7 @@ package ethereum
import ( import (
"fmt" "fmt"
"github.com/sirupsen/logrus"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
@ -37,6 +38,7 @@ func CreateDatabase(config DatabaseConfig) (Database, error) {
case Level: case Level:
levelDBConnection, err := ethdb.NewLDBDatabase(config.Path, 128, 1024) levelDBConnection, err := ethdb.NewLDBDatabase(config.Path, 128, 1024)
if err != nil { if err != nil {
logrus.Error("CreateDatabase: error connecting to new LDBD: ", err)
return nil, err return nil, err
} }
levelDBReader := level.NewLevelDatabaseReader(levelDBConnection) levelDBReader := level.NewLevelDatabaseReader(levelDBConnection)

View File

@ -43,12 +43,16 @@ func NewBlockRepository(database *postgres.DB) *BlockRepository {
return &BlockRepository{database: database} return &BlockRepository{database: database}
} }
func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) { func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) error {
cutoff := chainHead - blocksFromHeadBeforeFinal cutoff := chainHead - blocksFromHeadBeforeFinal
blockRepository.database.Exec(` _, err := blockRepository.database.Exec(`
UPDATE blocks SET is_final = TRUE UPDATE blocks SET is_final = TRUE
WHERE is_final = FALSE AND number < $1`, WHERE is_final = FALSE AND number < $1`,
cutoff) cutoff)
if err != nil {
return err
}
return nil
} }
func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (int64, error) { func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (int64, error) {
@ -70,7 +74,7 @@ func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (in
func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64, nodeId string) []int64 { func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64, nodeId string) []int64 {
numbers := make([]int64, 0) numbers := make([]int64, 0)
blockRepository.database.Select(&numbers, err := blockRepository.database.Select(&numbers,
`SELECT all_block_numbers `SELECT all_block_numbers
FROM ( FROM (
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
@ -79,6 +83,9 @@ func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber i
) `, ) `,
startingBlockNumber, startingBlockNumber,
highestBlockNumber, nodeId) highestBlockNumber, nodeId)
if err != nil {
log.Error("MissingBlockNumbers: error getting blocks: ", err)
}
return numbers return numbers
} }
@ -108,6 +115,7 @@ func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block,
case sql.ErrNoRows: case sql.ErrNoRows:
return core.Block{}, datastore.ErrBlockDoesNotExist(blockNumber) return core.Block{}, datastore.ErrBlockDoesNotExist(blockNumber)
default: default:
log.Error("GetBlock: error loading blocks: ", err)
return savedBlock, err return savedBlock, err
} }
} }
@ -202,6 +210,7 @@ func (blockRepository BlockRepository) createReceipt(tx *sql.Tx, blockId int64,
RETURNING id`, RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId) receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId)
if err != nil { if err != nil {
log.Error("createReceipt: error inserting receipt: ", err)
return receiptId, err return receiptId, err
} }
return receiptId, nil return receiptId, nil
@ -256,6 +265,7 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc
var block b var block b
err := blockRows.StructScan(&block) err := blockRows.StructScan(&block)
if err != nil { if err != nil {
log.Error("loadBlock: error loading block: ", err)
return core.Block{}, err return core.Block{}, err
} }
transactionRows, err := blockRepository.database.Queryx(` transactionRows, err := blockRepository.database.Queryx(`
@ -271,6 +281,7 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc
WHERE block_id = $1 WHERE block_id = $1
ORDER BY hash`, block.ID) ORDER BY hash`, block.ID)
if err != nil { if err != nil {
log.Error("loadBlock: error fetting transactions: ", err)
return core.Block{}, err return core.Block{}, err
} }
block.Transactions = blockRepository.LoadTransactions(transactionRows) block.Transactions = blockRepository.LoadTransactions(transactionRows)

View File

@ -47,14 +47,17 @@ func (contractRepository ContractRepository) CreateContract(contract core.Contra
return nil return nil
} }
func (contractRepository ContractRepository) ContractExists(contractHash string) bool { func (contractRepository ContractRepository) ContractExists(contractHash string) (bool, error) {
var exists bool var exists bool
contractRepository.DB.QueryRow( err := contractRepository.DB.QueryRow(
`SELECT exists( `SELECT exists(
SELECT 1 SELECT 1
FROM watched_contracts FROM watched_contracts
WHERE contract_hash = $1)`, contractHash).Scan(&exists) WHERE contract_hash = $1)`, contractHash).Scan(&exists)
return exists if err != nil {
return false, err
}
return exists, nil
} }
func (contractRepository ContractRepository) GetContract(contractHash string) (core.Contract, error) { func (contractRepository ContractRepository) GetContract(contractHash string) (core.Contract, error) {
@ -66,12 +69,15 @@ func (contractRepository ContractRepository) GetContract(contractHash string) (c
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return core.Contract{}, datastore.ErrContractDoesNotExist(contractHash) return core.Contract{}, datastore.ErrContractDoesNotExist(contractHash)
} }
savedContract := contractRepository.addTransactions(core.Contract{Hash: hash, Abi: abi}) savedContract, err := contractRepository.addTransactions(core.Contract{Hash: hash, Abi: abi})
if err != nil {
return core.Contract{}, err
}
return savedContract, nil return savedContract, nil
} }
func (contractRepository ContractRepository) addTransactions(contract core.Contract) core.Contract { func (contractRepository ContractRepository) addTransactions(contract core.Contract) (core.Contract, error) {
transactionRows, _ := contractRepository.DB.Queryx(` transactionRows, err := contractRepository.DB.Queryx(`
SELECT hash, SELECT hash,
nonce, nonce,
tx_to, tx_to,
@ -83,8 +89,11 @@ func (contractRepository ContractRepository) addTransactions(contract core.Contr
FROM transactions FROM transactions
WHERE tx_to = $1 WHERE tx_to = $1
ORDER BY block_id DESC`, contract.Hash) ORDER BY block_id DESC`, contract.Hash)
if err != nil {
return core.Contract{}, err
}
blockRepository := &BlockRepository{contractRepository.DB} blockRepository := &BlockRepository{contractRepository.DB}
transactions := blockRepository.LoadTransactions(transactionRows) transactions := blockRepository.LoadTransactions(transactionRows)
savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi} savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi}
return savedContract return savedContract, nil
} }

View File

@ -42,6 +42,7 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int
if headerDoesNotExist(err) { if headerDoesNotExist(err) {
return repository.insertHeader(header) return repository.insertHeader(header)
} }
log.Error("CreateOrUpdateHeader: error getting header hash: ", err)
return 0, err return 0, err
} }
if headerMustBeReplaced(hash, header) { if headerMustBeReplaced(hash, header) {
@ -54,6 +55,7 @@ func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, er
var header core.Header var header core.Header
err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
blockNumber, repository.database.Node.ID) blockNumber, repository.database.Node.ID)
log.Error("GetHeader: error getting headers: ", err)
return header, err return header, err
} }
@ -74,18 +76,6 @@ func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endi
return numbers, nil return numbers, nil
} }
func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) {
_, err := repository.GetHeader(blockNumber)
if err != nil {
if headerDoesNotExist(err) {
return false, nil
}
return false, err
}
return true, nil
}
func headerMustBeReplaced(hash string, header core.Header) bool { func headerMustBeReplaced(hash string, header core.Header) bool {
return hash != header.Hash return hash != header.Hash
} }
@ -98,6 +88,7 @@ func (repository HeaderRepository) getHeaderHash(header core.Header) (string, er
var hash string var hash string
err := repository.database.Get(&hash, `SELECT hash FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, err := repository.database.Get(&hash, `SELECT hash FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
header.BlockNumber, repository.database.Node.ID) header.BlockNumber, repository.database.Node.ID)
log.Error("getHeaderHash: error getting headers: ", err)
return hash, err return hash, err
} }
@ -106,6 +97,9 @@ func (repository HeaderRepository) insertHeader(header core.Header) (int64, erro
err := repository.database.QueryRowx( err := repository.database.QueryRowx(
`INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) RETURNING id`, `INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) RETURNING id`,
header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId) header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId)
if err != nil {
log.Error("insertHeader: error inserting header: ", err)
}
return headerId, err return headerId, err
} }
@ -113,6 +107,7 @@ func (repository HeaderRepository) replaceHeader(header core.Header) (int64, err
_, err := repository.database.Exec(`DELETE FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, _, err := repository.database.Exec(`DELETE FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
header.BlockNumber, repository.database.Node.ID) header.BlockNumber, repository.database.Node.ID)
if err != nil { if err != nil {
log.Error("replaceHeader: error deleting headers: ", err)
return 0, err return 0, err
} }
return repository.insertHeader(header) return repository.insertHeader(header)

View File

@ -18,6 +18,7 @@ package repositories
import ( import (
"context" "context"
"github.com/sirupsen/logrus"
"database/sql" "database/sql"
@ -43,12 +44,16 @@ func (logRepository LogRepository) CreateLogs(lgs []core.Log, receiptId int64) e
return postgres.ErrDBInsertFailed return postgres.ErrDBInsertFailed
} }
} }
tx.Commit() err := tx.Commit()
if err != nil {
tx.Rollback()
return postgres.ErrDBInsertFailed
}
return nil return nil
} }
func (logRepository LogRepository) GetLogs(address string, blockNumber int64) []core.Log { func (logRepository LogRepository) GetLogs(address string, blockNumber int64) ([]core.Log, error) {
logRows, _ := logRepository.DB.Query( logRows, err := logRepository.DB.Query(
`SELECT block_number, `SELECT block_number,
address, address,
tx_hash, tx_hash,
@ -61,10 +66,13 @@ func (logRepository LogRepository) GetLogs(address string, blockNumber int64) []
FROM logs FROM logs
WHERE address = $1 AND block_number = $2 WHERE address = $1 AND block_number = $2
ORDER BY block_number DESC`, address, blockNumber) ORDER BY block_number DESC`, address, blockNumber)
if err != nil {
return []core.Log{}, err
}
return logRepository.loadLogs(logRows) return logRepository.loadLogs(logRows)
} }
func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) []core.Log { func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) ([]core.Log, error) {
var lgs []core.Log var lgs []core.Log
for logsRows.Next() { for logsRows.Next() {
var blockNumber int64 var blockNumber int64
@ -73,7 +81,10 @@ func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) []core.Log {
var index int64 var index int64
var data string var data string
var topics core.Topics var topics core.Topics
logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data) err := logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data)
if err != nil {
logrus.Warn("loadLogs: Error scanning a row in logRows")
}
lg := core.Log{ lg := core.Log{
BlockNumber: blockNumber, BlockNumber: blockNumber,
TxHash: txHash, TxHash: txHash,
@ -86,5 +97,5 @@ func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) []core.Log {
} }
lgs = append(lgs, lg) lgs = append(lgs, lg)
} }
return lgs return lgs, nil
} }

View File

@ -19,6 +19,7 @@ package repositories
import ( import (
"context" "context"
"database/sql" "database/sql"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
@ -61,6 +62,9 @@ func createReceipt(receipt core.Receipt, blockId int64, tx *sql.Tx) (int64, erro
RETURNING id`, RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId, receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId,
).Scan(&receiptId) ).Scan(&receiptId)
if err != nil {
logrus.Error("createReceipt: Error inserting: ", err)
}
return receiptId, err return receiptId, err
} }
@ -90,6 +94,7 @@ func (receiptRepository ReceiptRepository) CreateReceipt(blockId int64, receipt
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId) receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
logrus.Warning("CreateReceipt: error inserting receipt: ", err)
return receiptId, err return receiptId, err
} }
tx.Commit() tx.Commit()

View File

@ -17,6 +17,7 @@
package repositories package repositories
import ( import (
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
@ -28,6 +29,7 @@ type WatchedEventRepository struct {
func (watchedEventRepository WatchedEventRepository) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) { func (watchedEventRepository WatchedEventRepository) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) {
rows, err := watchedEventRepository.DB.Queryx(`SELECT id, name, block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data FROM watched_event_logs where name=$1`, name) rows, err := watchedEventRepository.DB.Queryx(`SELECT id, name, block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data FROM watched_event_logs where name=$1`, name)
if err != nil { if err != nil {
logrus.Error("GetWatchedEvents: error getting watched events: ", err)
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
@ -37,11 +39,13 @@ func (watchedEventRepository WatchedEventRepository) GetWatchedEvents(name strin
lg := new(core.WatchedEvent) lg := new(core.WatchedEvent)
err = rows.StructScan(lg) err = rows.StructScan(lg)
if err != nil { if err != nil {
logrus.Warn("GetWatchedEvents: error scanning log: ", err)
return nil, err return nil, err
} }
lgs = append(lgs, lg) lgs = append(lgs, lg)
} }
if err = rows.Err(); err != nil { if err = rows.Err(); err != nil {
logrus.Warn("GetWatchedEvents: error scanning logs: ", err)
return nil, err return nil, err
} }
return lgs, nil return lgs, nil

View File

@ -31,7 +31,7 @@ type BlockRepository interface {
CreateOrUpdateBlock(block core.Block) (int64, error) CreateOrUpdateBlock(block core.Block) (int64, error)
GetBlock(blockNumber int64) (core.Block, error) GetBlock(blockNumber int64) (core.Block, error)
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64
SetBlocksStatus(chainHead int64) SetBlocksStatus(chainHead int64) error
} }
var ErrContractDoesNotExist = func(contractHash string) error { var ErrContractDoesNotExist = func(contractHash string) error {
@ -41,7 +41,7 @@ var ErrContractDoesNotExist = func(contractHash string) error {
type ContractRepository interface { type ContractRepository interface {
CreateContract(contract core.Contract) error CreateContract(contract core.Contract) error
GetContract(contractHash string) (core.Contract, error) GetContract(contractHash string) (core.Contract, error)
ContractExists(contractHash string) bool ContractExists(contractHash string) (bool, error)
} }
var ErrFilterDoesNotExist = func(name string) error { var ErrFilterDoesNotExist = func(name string) error {
@ -57,12 +57,11 @@ type HeaderRepository interface {
CreateOrUpdateHeader(header core.Header) (int64, error) CreateOrUpdateHeader(header core.Header) (int64, error)
GetHeader(blockNumber int64) (core.Header, error) GetHeader(blockNumber int64) (core.Header, error)
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error)
HeaderExists(blockNumber int64) (bool, error)
} }
type LogRepository interface { type LogRepository interface {
CreateLogs(logs []core.Log, receiptId int64) error CreateLogs(logs []core.Log, receiptId int64) error
GetLogs(address string, blockNumber int64) []core.Log GetLogs(address string, blockNumber int64) ([]core.Log, error)
} }
var ErrReceiptDoesNotExist = func(txHash string) error { var ErrReceiptDoesNotExist = func(txHash string) error {

View File

@ -81,7 +81,7 @@ func (blockChain *BlockChain) getPOWHeader(blockNumber int64) (header core.Heade
if err != nil { if err != nil {
return header, err return header, err
} }
return blockChain.headerConverter.Convert(gethHeader, gethHeader.Hash().String()) return blockChain.headerConverter.Convert(gethHeader, gethHeader.Hash().String()), nil
} }
func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []core.Header, err error) { func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []core.Header, err error) {
@ -113,8 +113,7 @@ func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []cor
for _, POWHeader := range POWHeaders { for _, POWHeader := range POWHeaders {
if POWHeader.Number != nil { if POWHeader.Number != nil {
header, _ := blockChain.headerConverter.Convert(&POWHeader, POWHeader.Hash().String()) header := blockChain.headerConverter.Convert(&POWHeader, POWHeader.Hash().String())
headers = append(headers, header) headers = append(headers, header)
} }
} }
@ -147,7 +146,7 @@ func (blockChain *BlockChain) getPOAHeader(blockNumber int64) (header core.Heade
GasUsed: uint64(POAHeader.GasUsed), GasUsed: uint64(POAHeader.GasUsed),
Time: POAHeader.Time.ToInt(), Time: POAHeader.Time.ToInt(),
Extra: POAHeader.Extra, Extra: POAHeader.Extra,
}, POAHeader.Hash.String()) }, POAHeader.Hash.String()), nil
} }
func (blockChain *BlockChain) getPOAHeaders(blockNumbers []int64) (headers []core.Header, err error) { func (blockChain *BlockChain) getPOAHeaders(blockNumbers []int64) (headers []core.Header, err error) {
@ -182,7 +181,7 @@ func (blockChain *BlockChain) getPOAHeaders(blockNumbers []int64) (headers []cor
var header core.Header var header core.Header
//Header.Number of the newest block will return nil. //Header.Number of the newest block will return nil.
if _, err := strconv.ParseUint(POAHeader.Number.ToInt().String(), 16, 64); err == nil { if _, err := strconv.ParseUint(POAHeader.Number.ToInt().String(), 16, 64); err == nil {
header, _ = blockChain.headerConverter.Convert(&types.Header{ header = blockChain.headerConverter.Convert(&types.Header{
ParentHash: POAHeader.ParentHash, ParentHash: POAHeader.ParentHash,
UncleHash: POAHeader.UncleHash, UncleHash: POAHeader.UncleHash,
Coinbase: POAHeader.Coinbase, Coinbase: POAHeader.Coinbase,
@ -232,9 +231,9 @@ func (blockChain *BlockChain) GetEthLogsWithCustomQuery(query ethereum.FilterQue
return gethLogs, nil return gethLogs, nil
} }
func (blockChain *BlockChain) LastBlock() *big.Int { func (blockChain *BlockChain) LastBlock() (*big.Int, error) {
block, _ := blockChain.ethClient.HeaderByNumber(context.Background(), nil) block, err := blockChain.ethClient.HeaderByNumber(context.Background(), nil)
return block.Number return block.Number, err
} }
func (blockChain *BlockChain) Node() core.Node { func (blockChain *BlockChain) Node() core.Node {

View File

@ -25,7 +25,7 @@ import (
type HeaderConverter struct{} type HeaderConverter struct{}
func (converter HeaderConverter) Convert(gethHeader *types.Header, blockHash string) (core.Header, error) { func (converter HeaderConverter) Convert(gethHeader *types.Header, blockHash string) core.Header {
rawHeader, err := json.Marshal(gethHeader) rawHeader, err := json.Marshal(gethHeader)
if err != nil { if err != nil {
panic(err) panic(err)
@ -36,5 +36,5 @@ func (converter HeaderConverter) Convert(gethHeader *types.Header, blockHash str
Raw: rawHeader, Raw: rawHeader,
Timestamp: gethHeader.Time.String(), Timestamp: gethHeader.Time.String(),
} }
return coreHeader, nil return coreHeader
} }

View File

@ -17,6 +17,7 @@
package history package history
import ( import (
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
) )
@ -35,11 +36,30 @@ func NewBlockValidator(blockchain core.BlockChain, blockRepository datastore.Blo
} }
} }
func (bv BlockValidator) ValidateBlocks() ValidationWindow { func (bv BlockValidator) ValidateBlocks() (ValidationWindow, error) {
window := MakeValidationWindow(bv.blockchain, bv.windowSize) window, err := MakeValidationWindow(bv.blockchain, bv.windowSize)
blockNumbers := MakeRange(window.LowerBound, window.UpperBound) if err != nil {
RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers) logrus.Error("ValidateBlocks: error creating validation window: ", err)
lastBlock := bv.blockchain.LastBlock().Int64() return ValidationWindow{}, err
bv.blockRepository.SetBlocksStatus(lastBlock) }
return window
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
_, err = RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers)
if err != nil {
logrus.Error("ValidateBlocks: error getting and updating blocks: ", err)
return ValidationWindow{}, err
}
lastBlock, err := bv.blockchain.LastBlock()
if err != nil {
logrus.Error("ValidateBlocks: error getting last block: ", err)
return ValidationWindow{}, err
}
err = bv.blockRepository.SetBlocksStatus(lastBlock.Int64())
if err != nil {
logrus.Error("ValidateBlocks: error setting block status: ", err)
return ValidationWindow{}, err
}
return window, nil
} }

View File

@ -17,8 +17,7 @@
package history package history
import ( import (
log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
) )
@ -38,11 +37,15 @@ func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderR
} }
func (validator HeaderValidator) ValidateHeaders() (ValidationWindow, error) { func (validator HeaderValidator) ValidateHeaders() (ValidationWindow, error) {
window := MakeValidationWindow(validator.blockChain, validator.windowSize) window, err := MakeValidationWindow(validator.blockChain, validator.windowSize)
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
_, err := RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers)
if err != nil { if err != nil {
log.Error("Error in ValidateHeaders: ", err) logrus.Error("ValidateHeaders: error creating validation window: ", err)
return ValidationWindow{}, err
}
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
_, err = RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers)
if err != nil {
logrus.Error("ValidateHeaders: error getting/updating headers: ", err)
return ValidationWindow{}, err return ValidationWindow{}, err
} }
return window, nil return window, nil

View File

@ -23,28 +23,41 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
) )
func PopulateMissingBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int { func PopulateMissingBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, startingBlockNumber int64) (int, error) {
lastBlock := blockchain.LastBlock().Int64() lastBlock, err := blockchain.LastBlock()
blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) if err != nil {
log.Error("PopulateMissingBlocks: error getting last block: ", err)
return 0, err
}
blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock.Int64(), blockchain.Node().ID)
if len(blockRange) == 0 { if len(blockRange) == 0 {
return 0 return 0, nil
} }
log.Printf("Backfilling %d blocks\n\n", len(blockRange)) log.Printf("Backfilling %d blocks\n\n", len(blockRange))
RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange) _, err = RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange)
return len(blockRange) if err != nil {
log.Error("PopulateMissingBlocks: error gettings/updating blocks: ", err)
return 0, err
}
return len(blockRange), nil
} }
func RetrieveAndUpdateBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, blockNumbers []int64) int { func RetrieveAndUpdateBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, blockNumbers []int64) (int, error) {
for _, blockNumber := range blockNumbers { for _, blockNumber := range blockNumbers {
block, err := blockchain.GetBlockByNumber(blockNumber) block, err := blockchain.GetBlockByNumber(blockNumber)
if err != nil { if err != nil {
log.Printf("failed to retrieve block number: %d\n", blockNumber) log.Error("RetrieveAndUpdateBlocks: error getting block: ", err)
return 0 return 0, err
} }
// TODO: handle possible error here
blockRepository.CreateOrUpdateBlock(block) _, err = blockRepository.CreateOrUpdateBlock(block)
if err != nil {
log.Error("RetrieveAndUpdateBlocks: error creating/updating block: ", err)
return 0, err
} }
return len(blockNumbers)
}
return len(blockNumbers), nil
} }

View File

@ -25,25 +25,24 @@ import (
) )
func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) { func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) {
lastBlock := blockchain.LastBlock().Int64() lastBlock, err := blockchain.LastBlock()
headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock)
if err != nil { if err != nil {
log.Error("Error in checking header in PopulateMissingHeaders: ", err) log.Error("PopulateMissingHeaders: Error getting last block: ", err)
return 0, err return 0, err
} else if headerAlreadyExists {
return 0, nil
} }
blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock.Int64(), blockchain.Node().ID)
if err != nil { if err != nil {
log.Error("Error getting missing block numbers in PopulateMissingHeaders: ", err) log.Error("PopulateMissingHeaders: Error getting missing block numbers: ", err)
return 0, err return 0, err
} else if len(blockNumbers) == 0 {
return 0, nil
} }
log.Printf("Backfilling %d blocks\n\n", len(blockNumbers)) log.Printf("Backfilling %d blocks\n\n", len(blockNumbers))
_, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers) _, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers)
if err != nil { if err != nil {
log.Error("PopulateMissingHeaders: Error getting/updating headers:", err)
return 0, err return 0, err
} }
return len(blockNumbers), nil return len(blockNumbers), nil

View File

@ -18,17 +18,10 @@ package history
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"text/template"
) )
const WindowTemplate = `Validating Blocks
|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| ({{.UpperBound}}:HEAD)
`
var ParsedWindowTemplate = *template.Must(template.New("window").Parse(WindowTemplate))
type ValidationWindow struct { type ValidationWindow struct {
LowerBound int64 LowerBound int64
UpperBound int64 UpperBound int64
@ -38,10 +31,14 @@ func (window ValidationWindow) Size() int {
return int(window.UpperBound - window.LowerBound) return int(window.UpperBound - window.LowerBound)
} }
func MakeValidationWindow(blockchain core.BlockChain, windowSize int) ValidationWindow { func MakeValidationWindow(blockchain core.BlockChain, windowSize int) (ValidationWindow, error) {
upperBound := blockchain.LastBlock().Int64() upperBound, err := blockchain.LastBlock()
lowerBound := upperBound - int64(windowSize) if err != nil {
return ValidationWindow{lowerBound, upperBound} log.Error("MakeValidationWindow: error getting LastBlock: ", err)
return ValidationWindow{}, err
}
lowerBound := upperBound.Int64() - int64(windowSize)
return ValidationWindow{lowerBound, upperBound.Int64()}, nil
} }
func MakeRange(min, max int64) []int64 { func MakeRange(min, max int64) []int64 {