Sync only missing blocks on cold import

= Add eth_node_fingerprint to block that can be imitated by both hot and cold imports
- Only sync missing blocks (blocks that are missing or don't share a fingerprint) on cold import
- Set block is_final status after import
This commit is contained in:
Rob Mulholand 2018-05-07 10:41:02 -05:00
parent 5a5e08bd13
commit d5c2ab33fc
31 changed files with 616 additions and 171 deletions

View File

@ -58,6 +58,10 @@ Sync VulcanizeDB from the LevelDB underlying a Geth node.
1. Assure node is not running, and that it has synced to the desired block height. 1. Assure node is not running, and that it has synced to the desired block height.
1. Start vulcanize_db 1. Start vulcanize_db
- `./vulcanizedb coldImport --config <config.toml> --starting-block-number <block-number> --ending-block-number <block-number>` - `./vulcanizedb coldImport --config <config.toml> --starting-block-number <block-number> --ending-block-number <block-number>`
1. Optional flags:
- `--starting-block-number`/`-s`: block number to start syncing from
- `--ending-block-number`/`-e`: block number to sync to
- `--all`/`-a`: sync all missing blocks
## Running the Tests ## Running the Tests

View File

@ -17,13 +17,13 @@ package cmd
import ( import (
"log" "log"
"github.com/ethereum/go-ethereum/common"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/crypto"
"github.com/vulcanize/vulcanizedb/pkg/datastore/ethereum" "github.com/vulcanize/vulcanizedb/pkg/datastore/ethereum"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/geth" "github.com/vulcanize/vulcanizedb/pkg/fs"
"github.com/vulcanize/vulcanizedb/pkg/geth/cold_import"
"github.com/vulcanize/vulcanizedb/pkg/geth/converters/cold_db" "github.com/vulcanize/vulcanizedb/pkg/geth/converters/cold_db"
vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common" vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common"
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"
@ -44,8 +44,9 @@ Geth must be synced over all of the desired blocks and must not be running in or
func init() { func init() {
rootCmd.AddCommand(coldImportCmd) rootCmd.AddCommand(coldImportCmd)
coldImportCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Number for first block to cold import") coldImportCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Number for first block to cold import.")
coldImportCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", 5500000, "Number for last block to cold import") coldImportCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", 5500000, "Number for last block to cold import.")
coldImportCmd.Flags().BoolVarP(&syncAll, "all", "a", false, "Option to sync all missing blocks.")
} }
func coldImport() { func coldImport() {
@ -55,34 +56,38 @@ func coldImport() {
if err != nil { if err != nil {
log.Fatal("Error connecting to ethereum db: ", err) log.Fatal("Error connecting to ethereum db: ", err)
} }
mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber()
if syncAll {
startingBlockNumber = 0
endingBlockNumber = mostRecentBlockNumberInDb
}
if endingBlockNumber < startingBlockNumber { if endingBlockNumber < startingBlockNumber {
log.Fatal("Ending block number must be greater than starting block number for cold import.") log.Fatal("Ending block number must be greater than starting block number for cold import.")
} }
mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber()
if endingBlockNumber > mostRecentBlockNumberInDb { if endingBlockNumber > mostRecentBlockNumberInDb {
log.Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb) log.Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb)
} }
// init pg db // init pg db
genesisBlockHash := common.BytesToHash(ethDB.GetBlockHash(0)).String() genesisBlock := ethDB.GetBlockHash(0)
coldNode := core.Node{ reader := fs.FsReader{}
GenesisBlock: genesisBlockHash, parser := crypto.EthPublicKeyParser{}
NetworkID: 1, nodeBuilder := cold_import.NewColdImportNodeBuilder(reader, parser)
ID: "LevelDbColdImport", coldNode, err := nodeBuilder.GetNode(genesisBlock, levelDbPath)
ClientName: "LevelDbColdImport", if err != nil {
log.Fatal("Error getting node: ", err)
} }
pgDB := utils.LoadPostgres(databaseConfig, coldNode) pgDB := utils.LoadPostgres(databaseConfig, coldNode)
// init cold importer deps // init cold importer deps
blockRepository := repositories.BlockRepository{DB: &pgDB} blockRepository := repositories.NewBlockRepository(&pgDB)
receiptRepository := repositories.ReceiptRepository{DB: &pgDB} receiptRepository := repositories.ReceiptRepository{DB: &pgDB}
transactionconverter := cold_db.NewColdDbTransactionConverter() transactionconverter := cold_db.NewColdDbTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(transactionconverter) blockConverter := vulcCommon.NewBlockConverter(transactionconverter)
// init and execute cold importer // init and execute cold importer
coldImporter := geth.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter) coldImporter := cold_import.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter)
err = coldImporter.Execute(startingBlockNumber, endingBlockNumber) err = coldImporter.Execute(startingBlockNumber, endingBlockNumber, coldNode.ID)
if err != nil { if err != nil {
log.Fatal("Error executing cold import: ", err) log.Fatal("Error executing cold import: ", err)
} }

View File

@ -16,6 +16,7 @@ var (
ipc string ipc string
levelDbPath string levelDbPath string
startingBlockNumber int64 startingBlockNumber int64
syncAll bool
endingBlockNumber int64 endingBlockNumber int64
) )

View File

@ -68,7 +68,7 @@ func sync() {
} }
db := utils.LoadPostgres(databaseConfig, blockchain.Node()) db := utils.LoadPostgres(databaseConfig, blockchain.Node())
blockRepository := repositories.BlockRepository{DB: &db} blockRepository := repositories.NewBlockRepository(&db)
validator := history.NewBlockValidator(blockchain, blockRepository, 15) validator := history.NewBlockValidator(blockchain, blockRepository, 15)
missingBlocksPopulated := make(chan int) missingBlocksPopulated := make(chan int)
go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, startingBlockNumber) go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, startingBlockNumber)

View File

@ -0,0 +1,2 @@
ALTER TABLE blocks
DROP COLUMN eth_node_fingerprint;

View File

@ -0,0 +1,14 @@
BEGIN;
ALTER TABLE blocks
ADD COLUMN eth_node_fingerprint VARCHAR(128);
UPDATE blocks
SET eth_node_fingerprint = (
SELECT eth_node_id FROM eth_nodes WHERE eth_nodes.id = blocks.eth_node_id
);
ALTER TABLE blocks
ALTER COLUMN eth_node_fingerprint SET NOT NULL;
COMMIT;

View File

@ -83,7 +83,8 @@ CREATE TABLE public.blocks (
miner character varying(42), miner character varying(42),
extra_data character varying, extra_data character varying,
reward double precision, reward double precision,
uncles_reward double precision uncles_reward double precision,
eth_node_fingerprint character varying(128) NOT NULL
); );

View File

@ -0,0 +1,13 @@
package crypto_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestCrypto(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Crypto Suite")
}

21
pkg/crypto/parser.go Normal file
View File

@ -0,0 +1,21 @@
package crypto
import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/discover"
)
type PublicKeyParser interface {
ParsePublicKey(privateKey string) (string, error)
}
type EthPublicKeyParser struct{}
func (EthPublicKeyParser) ParsePublicKey(privateKey string) (string, error) {
np, err := crypto.HexToECDSA(privateKey)
if err != nil {
return "", err
}
pubKey := discover.PubkeyID(&np.PublicKey)
return pubKey.String(), nil
}

19
pkg/crypto/parser_test.go Normal file
View File

@ -0,0 +1,19 @@
package crypto_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/crypto"
)
var _ = Describe("Public key parser", func() {
It("parses public key from private key", func() {
privKey := "0000000000000000000000000000000000000000000000000000000000000001"
parser := crypto.EthPublicKeyParser{}
pubKey, err := parser.ParsePublicKey(privKey)
Expect(err).NotTo(HaveOccurred())
Expect(pubKey).To(Equal("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8"))
})
})

View File

@ -26,7 +26,7 @@ func (blockRepository *BlockRepository) GetBlock(blockNumber int64) (core.Block,
return core.Block{}, datastore.ErrBlockDoesNotExist(blockNumber) return core.Block{}, datastore.ErrBlockDoesNotExist(blockNumber)
} }
func (blockRepository *BlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 { func (blockRepository *BlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64, nodeId string) []int64 {
missingNumbers := []int64{} missingNumbers := []int64{}
for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ { for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ {
if _, ok := blockRepository.blocks[blockNumber]; !ok { if _, ok := blockRepository.blocks[blockNumber]; !ok {

View File

@ -76,7 +76,7 @@ var _ = Describe("Postgres DB", func() {
} }
node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"} node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"}
db := test_config.NewTestDB(node) db := test_config.NewTestDB(node)
blocksRepository := repositories.BlockRepository{DB: db} blocksRepository := repositories.NewBlockRepository(db)
_, err1 := blocksRepository.CreateOrUpdateBlock(badBlock) _, err1 := blocksRepository.CreateOrUpdateBlock(badBlock)
@ -131,7 +131,7 @@ var _ = Describe("Postgres DB", func() {
} }
node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"} node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"}
db, _ := postgres.NewDB(test_config.DBConfig, node) db, _ := postgres.NewDB(test_config.DBConfig, node)
blockRepository := repositories.BlockRepository{DB: db} blockRepository := repositories.NewBlockRepository(db)
_, err1 := blockRepository.CreateOrUpdateBlock(block) _, err1 := blockRepository.CreateOrUpdateBlock(block)

View File

@ -20,12 +20,16 @@ const (
var ErrBlockExists = errors.New("Won't add block that already exists.") var ErrBlockExists = errors.New("Won't add block that already exists.")
type BlockRepository struct { type BlockRepository struct {
*postgres.DB database *postgres.DB
}
func NewBlockRepository(database *postgres.DB) *BlockRepository {
return &BlockRepository{database: database}
} }
func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) { func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) {
cutoff := chainHead - blocksFromHeadBeforeFinal cutoff := chainHead - blocksFromHeadBeforeFinal
blockRepository.DB.Exec(` blockRepository.database.Exec(`
UPDATE blocks SET is_final = TRUE UPDATE blocks SET is_final = TRUE
WHERE is_final = FALSE AND number < $1`, WHERE is_final = FALSE AND number < $1`,
cutoff) cutoff)
@ -48,22 +52,22 @@ func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (in
return blockId, ErrBlockExists return blockId, ErrBlockExists
} }
func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 { func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64, nodeId string) []int64 {
numbers := make([]int64, 0) numbers := make([]int64, 0)
blockRepository.DB.Select(&numbers, blockRepository.database.Select(&numbers,
`SELECT all_block_numbers `SELECT all_block_numbers
FROM ( FROM (
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
LEFT JOIN blocks LEFT JOIN blocks
ON number = all_block_numbers ON number = all_block_numbers
WHERE number ISNULL`, WHERE number ISNULL OR eth_node_fingerprint != $3`,
startingBlockNumber, startingBlockNumber,
highestBlockNumber) highestBlockNumber, nodeId)
return numbers return numbers
} }
func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, error) { func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, error) {
blockRows := blockRepository.DB.QueryRowx( blockRows := blockRepository.database.QueryRowx(
`SELECT id, `SELECT id,
number, number,
gaslimit, gaslimit,
@ -81,7 +85,7 @@ func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block,
reward, reward,
uncles_reward uncles_reward
FROM blocks FROM blocks
WHERE eth_node_id = $1 AND number = $2`, blockRepository.NodeID, blockNumber) WHERE eth_node_id = $1 AND number = $2`, blockRepository.database.NodeID, blockNumber)
savedBlock, err := blockRepository.loadBlock(blockRows) savedBlock, err := blockRepository.loadBlock(blockRows)
if err != nil { if err != nil {
switch err { switch err {
@ -96,17 +100,17 @@ func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block,
func (blockRepository BlockRepository) insertBlock(block core.Block) (int64, error) { func (blockRepository BlockRepository) insertBlock(block core.Block) (int64, error) {
var blockId int64 var blockId int64
tx, _ := blockRepository.DB.BeginTx(context.Background(), nil) tx, _ := blockRepository.database.BeginTx(context.Background(), nil)
err := tx.QueryRow( err := tx.QueryRow(
`INSERT INTO blocks `INSERT INTO blocks
(eth_node_id, number, gaslimit, gasused, time, difficulty, hash, nonce, parenthash, size, uncle_hash, is_final, miner, extra_data, reward, uncles_reward) (eth_node_id, number, gaslimit, gasused, time, difficulty, hash, nonce, parenthash, size, uncle_hash, is_final, miner, extra_data, reward, uncles_reward, eth_node_fingerprint)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
RETURNING id `, RETURNING id `,
blockRepository.NodeID, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward). blockRepository.database.NodeID, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward, blockRepository.database.Node.ID).
Scan(&blockId) Scan(&blockId)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
return 0, postgres.ErrDBInsertFailed return 0, err
} }
if len(block.Transactions) > 0 { if len(block.Transactions) > 0 {
err = blockRepository.createTransactions(tx, blockId, block.Transactions) err = blockRepository.createTransactions(tx, blockId, block.Transactions)
@ -189,11 +193,11 @@ func (blockRepository BlockRepository) createReceipt(tx *sql.Tx, blockId int64,
func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) { func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) {
var retrievedBlockHash string var retrievedBlockHash string
blockRepository.DB.Get(&retrievedBlockHash, blockRepository.database.Get(&retrievedBlockHash,
`SELECT hash `SELECT hash
FROM blocks FROM blocks
WHERE number = $1 AND eth_node_id = $2`, WHERE number = $1 AND eth_node_id = $2`,
block.Number, blockRepository.NodeID) block.Number, blockRepository.database.NodeID)
return retrievedBlockHash, blockExists(retrievedBlockHash) return retrievedBlockHash, blockExists(retrievedBlockHash)
} }
@ -217,11 +221,11 @@ func blockExists(retrievedBlockHash string) bool {
} }
func (blockRepository BlockRepository) removeBlock(blockNumber int64) error { func (blockRepository BlockRepository) removeBlock(blockNumber int64) error {
_, err := blockRepository.DB.Exec( _, err := blockRepository.database.Exec(
`DELETE FROM `DELETE FROM
blocks blocks
WHERE number=$1 AND eth_node_id=$2`, WHERE number=$1 AND eth_node_id=$2`,
blockNumber, blockRepository.NodeID) blockNumber, blockRepository.database.NodeID)
if err != nil { if err != nil {
return postgres.ErrDBDeleteFailed return postgres.ErrDBDeleteFailed
} }
@ -238,7 +242,7 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc
if err != nil { if err != nil {
return core.Block{}, err return core.Block{}, err
} }
transactionRows, err := blockRepository.DB.Queryx(` transactionRows, err := blockRepository.database.Queryx(`
SELECT hash, SELECT hash,
nonce, nonce,
tx_to, tx_to,

View File

@ -15,16 +15,18 @@ import (
var _ = Describe("Saving blocks", func() { var _ = Describe("Saving blocks", func() {
var db *postgres.DB var db *postgres.DB
var node core.Node
var blockRepository datastore.BlockRepository var blockRepository datastore.BlockRepository
BeforeEach(func() { BeforeEach(func() {
node := core.Node{ node = core.Node{
GenesisBlock: "GENESIS", GenesisBlock: "GENESIS",
NetworkID: 1, NetworkID: 1,
ID: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845", ID: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
} }
db = test_config.NewTestDB(node) db = test_config.NewTestDB(node)
blockRepository = repositories.BlockRepository{DB: db} blockRepository = repositories.NewBlockRepository(db)
}) })
@ -40,7 +42,7 @@ var _ = Describe("Saving blocks", func() {
ClientName: "Geth", ClientName: "Geth",
} }
dbTwo := test_config.NewTestDB(nodeTwo) dbTwo := test_config.NewTestDB(nodeTwo)
repositoryTwo := repositories.BlockRepository{DB: dbTwo} repositoryTwo := repositories.NewBlockRepository(dbTwo)
_, err := repositoryTwo.GetBlock(123) _, err := repositoryTwo.GetBlock(123)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
@ -166,7 +168,7 @@ var _ = Describe("Saving blocks", func() {
NetworkID: 1, NetworkID: 1,
} }
dbTwo := test_config.NewTestDB(nodeTwo) dbTwo := test_config.NewTestDB(nodeTwo)
repositoryTwo := repositories.BlockRepository{DB: dbTwo} repositoryTwo := repositories.NewBlockRepository(dbTwo)
blockRepository.CreateOrUpdateBlock(blockOne) blockRepository.CreateOrUpdateBlock(blockOne)
repositoryTwo.CreateOrUpdateBlock(blockTwo) repositoryTwo.CreateOrUpdateBlock(blockTwo)
@ -236,32 +238,39 @@ var _ = Describe("Saving blocks", func() {
It("is empty the starting block number is the highest known block number", func() { It("is empty the starting block number is the highest known block number", func() {
blockRepository.CreateOrUpdateBlock(core.Block{Number: 1}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 1})
Expect(len(blockRepository.MissingBlockNumbers(1, 1))).To(Equal(0)) Expect(len(blockRepository.MissingBlockNumbers(1, 1, node.ID))).To(Equal(0))
}) })
It("is the only missing block number", func() { It("is the only missing block number", func() {
blockRepository.CreateOrUpdateBlock(core.Block{Number: 2}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 2})
Expect(blockRepository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1})) Expect(blockRepository.MissingBlockNumbers(1, 2, node.ID)).To(Equal([]int64{1}))
}) })
It("is both missing block numbers", func() { It("is both missing block numbers", func() {
blockRepository.CreateOrUpdateBlock(core.Block{Number: 3}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 3})
Expect(blockRepository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2})) Expect(blockRepository.MissingBlockNumbers(1, 3, node.ID)).To(Equal([]int64{1, 2}))
}) })
It("goes back to the starting block number", func() { It("goes back to the starting block number", func() {
blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 6})
Expect(blockRepository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5})) Expect(blockRepository.MissingBlockNumbers(4, 6, node.ID)).To(Equal([]int64{4, 5}))
}) })
It("only includes missing block numbers", func() { It("only includes missing block numbers", func() {
blockRepository.CreateOrUpdateBlock(core.Block{Number: 4}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 4})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 6})
Expect(blockRepository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5})) Expect(blockRepository.MissingBlockNumbers(4, 6, node.ID)).To(Equal([]int64{5}))
})
It("includes blocks created by a different node", func() {
blockRepository.CreateOrUpdateBlock(core.Block{Number: 4})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 6})
Expect(blockRepository.MissingBlockNumbers(4, 6, "Different node id")).To(Equal([]int64{4, 5, 6}))
}) })
It("is a list with multiple gaps", func() { It("is a list with multiple gaps", func() {
@ -270,18 +279,18 @@ var _ = Describe("Saving blocks", func() {
blockRepository.CreateOrUpdateBlock(core.Block{Number: 8}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 8})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 10}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 10})
Expect(blockRepository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9})) Expect(blockRepository.MissingBlockNumbers(3, 10, node.ID)).To(Equal([]int64{3, 6, 7, 9}))
}) })
It("returns empty array when lower bound exceeds upper bound", func() { It("returns empty array when lower bound exceeds upper bound", func() {
Expect(blockRepository.MissingBlockNumbers(10000, 1)).To(Equal([]int64{})) Expect(blockRepository.MissingBlockNumbers(10000, 1, node.ID)).To(Equal([]int64{}))
}) })
It("only returns requested range even when other gaps exist", func() { It("only returns requested range even when other gaps exist", func() {
blockRepository.CreateOrUpdateBlock(core.Block{Number: 3}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 3})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 8}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 8})
Expect(blockRepository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5})) Expect(blockRepository.MissingBlockNumbers(1, 5, node.ID)).To(Equal([]int64{1, 2, 4, 5}))
}) })
}) })

View File

@ -53,7 +53,7 @@ var _ = Describe("Creating contracts", func() {
It("returns transactions 'To' a contract", func() { It("returns transactions 'To' a contract", func() {
var blockRepository datastore.BlockRepository var blockRepository datastore.BlockRepository
blockRepository = repositories.BlockRepository{DB: db} blockRepository = repositories.NewBlockRepository(db)
block := core.Block{ block := core.Block{
Number: 123, Number: 123,
Transactions: []core.Transaction{ Transactions: []core.Transaction{

View File

@ -28,7 +28,7 @@ var _ = Describe("Logs Repository", func() {
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
} }
db = test_config.NewTestDB(node) db = test_config.NewTestDB(node)
blockRepository = repositories.BlockRepository{DB: db} blockRepository = repositories.NewBlockRepository(db)
logsRepository = repositories.LogRepository{DB: db} logsRepository = repositories.LogRepository{DB: db}
receiptRepository = repositories.ReceiptRepository{DB: db} receiptRepository = repositories.ReceiptRepository{DB: db}
}) })

View File

@ -10,7 +10,7 @@ import (
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
) )
var _ = Describe("Logs Repository", func() { var _ = Describe("Receipts Repository", func() {
var blockRepository datastore.BlockRepository var blockRepository datastore.BlockRepository
var logRepository datastore.LogRepository var logRepository datastore.LogRepository
var receiptRepository datastore.ReceiptRepository var receiptRepository datastore.ReceiptRepository
@ -24,7 +24,7 @@ var _ = Describe("Logs Repository", func() {
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
} }
db = test_config.NewTestDB(node) db = test_config.NewTestDB(node)
blockRepository = repositories.BlockRepository{DB: db} blockRepository = repositories.NewBlockRepository(db)
logRepository = repositories.LogRepository{DB: db} logRepository = repositories.LogRepository{DB: db}
receiptRepository = repositories.ReceiptRepository{DB: db} receiptRepository = repositories.ReceiptRepository{DB: db}
}) })
@ -99,11 +99,12 @@ var _ = Describe("Logs Repository", func() {
Hash: expected.TxHash, Hash: expected.TxHash,
Receipt: expected, Receipt: expected,
} }
block := core.Block{Transactions: []core.Transaction{transaction}} block := core.Block{Transactions: []core.Transaction{transaction}}
blockRepository.CreateOrUpdateBlock(block)
receipt, err := receiptRepository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223")
_, err := blockRepository.CreateOrUpdateBlock(block)
Expect(err).NotTo(HaveOccurred())
receipt, err := receiptRepository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
//Not currently serializing bloom logs //Not currently serializing bloom logs
Expect(receipt.Bloom).To(Equal(core.Receipt{}.Bloom)) Expect(receipt.Bloom).To(Equal(core.Receipt{}.Bloom))
@ -132,10 +133,11 @@ var _ = Describe("Logs Repository", func() {
block := core.Block{ block := core.Block{
Transactions: []core.Transaction{transaction}, Transactions: []core.Transaction{transaction},
} }
blockRepository.CreateOrUpdateBlock(block)
_, err := receiptRepository.GetReceipt(receipt.TxHash) _, err := blockRepository.CreateOrUpdateBlock(block)
Expect(err).NotTo(HaveOccurred())
_, err = receiptRepository.GetReceipt(receipt.TxHash)
Expect(err).To(Not(HaveOccurred())) Expect(err).To(Not(HaveOccurred()))
}) })
}) })

View File

@ -21,7 +21,7 @@ var _ = Describe("Watched Events Repository", func() {
BeforeEach(func() { BeforeEach(func() {
db = test_config.NewTestDB(core.Node{}) db = test_config.NewTestDB(core.Node{})
blocksRepository = repositories.BlockRepository{DB: db} blocksRepository = repositories.NewBlockRepository(db)
filterRepository = repositories.FilterRepository{DB: db} filterRepository = repositories.FilterRepository{DB: db}
logRepository = repositories.LogRepository{DB: db} logRepository = repositories.LogRepository{DB: db}
receiptRepository = repositories.ReceiptRepository{DB: db} receiptRepository = repositories.ReceiptRepository{DB: db}

View File

@ -14,7 +14,7 @@ var ErrBlockDoesNotExist = func(blockNumber int64) error {
type BlockRepository interface { type BlockRepository interface {
CreateOrUpdateBlock(block core.Block) (int64, error) CreateOrUpdateBlock(block core.Block) (int64, error)
GetBlock(blockNumber int64) (core.Block, error) GetBlock(blockNumber int64) (core.Block, error)
MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64, nodeId string) []int64
SetBlocksStatus(chainHead int64) SetBlocksStatus(chainHead int64)
} }

View File

@ -11,6 +11,13 @@ type MockBlockRepository struct {
createOrUpdateBlockPassedBlock core.Block createOrUpdateBlockPassedBlock core.Block
createOrUpdateBlockReturnInt int64 createOrUpdateBlockReturnInt int64
createOrUpdateBlockReturnErr error createOrUpdateBlockReturnErr error
missingBlockNumbersCalled bool
missingBlockNumbersPassedStartingBlockNumber int64
missingBlockNumbersPassedEndingBlockNumber int64
missingBlockNumbersPassedNodeId string
missingBlockNumbersReturnArray []int64
setBlockStatusCalled bool
setBlockStatusPassedChainHead int64
} }
func NewMockBlockRepository() *MockBlockRepository { func NewMockBlockRepository() *MockBlockRepository {
@ -19,6 +26,13 @@ func NewMockBlockRepository() *MockBlockRepository {
createOrUpdateBlockPassedBlock: core.Block{}, createOrUpdateBlockPassedBlock: core.Block{},
createOrUpdateBlockReturnInt: 0, createOrUpdateBlockReturnInt: 0,
createOrUpdateBlockReturnErr: nil, createOrUpdateBlockReturnErr: nil,
missingBlockNumbersCalled: false,
missingBlockNumbersPassedStartingBlockNumber: 0,
missingBlockNumbersPassedEndingBlockNumber: 0,
missingBlockNumbersPassedNodeId: "",
missingBlockNumbersReturnArray: nil,
setBlockStatusCalled: false,
setBlockStatusPassedChainHead: 0,
} }
} }
@ -27,6 +41,10 @@ func (mbr *MockBlockRepository) SetCreateOrUpdateBlockReturnVals(i int64, err er
mbr.createOrUpdateBlockReturnErr = err mbr.createOrUpdateBlockReturnErr = err
} }
func (mbr *MockBlockRepository) SetMissingBlockNumbersReturnArray(returnArray []int64) {
mbr.missingBlockNumbersReturnArray = returnArray
}
func (mbr *MockBlockRepository) CreateOrUpdateBlock(block core.Block) (int64, error) { func (mbr *MockBlockRepository) CreateOrUpdateBlock(block core.Block) (int64, error) {
mbr.createOrUpdateBlockCalled = true mbr.createOrUpdateBlockCalled = true
mbr.createOrUpdateBlockPassedBlock = block mbr.createOrUpdateBlockPassedBlock = block
@ -37,15 +55,32 @@ func (mbr *MockBlockRepository) GetBlock(blockNumber int64) (core.Block, error)
panic("implement me") panic("implement me")
} }
func (mbr *MockBlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 { func (mbr *MockBlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64, nodeId string) []int64 {
panic("implement me") mbr.missingBlockNumbersCalled = true
mbr.missingBlockNumbersPassedStartingBlockNumber = startingBlockNumber
mbr.missingBlockNumbersPassedEndingBlockNumber = endingBlockNumber
mbr.missingBlockNumbersPassedNodeId = nodeId
return mbr.missingBlockNumbersReturnArray
} }
func (mbr *MockBlockRepository) SetBlocksStatus(chainHead int64) { func (mbr *MockBlockRepository) SetBlocksStatus(chainHead int64) {
panic("implement me") mbr.setBlockStatusCalled = true
mbr.setBlockStatusPassedChainHead = chainHead
} }
func (mbr *MockBlockRepository) AssertCreateOrUpdateBlockCalledWith(block core.Block) { func (mbr *MockBlockRepository) AssertCreateOrUpdateBlockCalledWith(block core.Block) {
Expect(mbr.createOrUpdateBlockCalled).To(BeTrue()) Expect(mbr.createOrUpdateBlockCalled).To(BeTrue())
Expect(mbr.createOrUpdateBlockPassedBlock).To(Equal(block)) Expect(mbr.createOrUpdateBlockPassedBlock).To(Equal(block))
} }
func (mbr *MockBlockRepository) AssertMissingBlockNumbersCalledWith(startingBlockNumber int64, endingBlockNumber int64, nodeId string) {
Expect(mbr.missingBlockNumbersCalled).To(BeTrue())
Expect(mbr.missingBlockNumbersPassedStartingBlockNumber).To(Equal(startingBlockNumber))
Expect(mbr.missingBlockNumbersPassedEndingBlockNumber).To(Equal(endingBlockNumber))
Expect(mbr.missingBlockNumbersPassedNodeId).To(Equal(nodeId))
}
func (mbr *MockBlockRepository) AssertSetBlockStatusCalledWith(chainHead int64) {
Expect(mbr.setBlockStatusCalled).To(BeTrue())
Expect(mbr.setBlockStatusPassedChainHead).To(Equal(chainHead))
}

View File

@ -0,0 +1,38 @@
package fakes
import . "github.com/onsi/gomega"
type MockCryptoParser struct {
parsePublicKeyCalled bool
parsePublicKeyPassedPrivateKey string
parsePublicKeyReturnString string
parsePublicKeyReturnErr error
}
func NewMockCryptoParser() *MockCryptoParser {
return &MockCryptoParser{
parsePublicKeyCalled: false,
parsePublicKeyPassedPrivateKey: "",
parsePublicKeyReturnString: "",
parsePublicKeyReturnErr: nil,
}
}
func (mcp *MockCryptoParser) SetReturnVal(pubKey string) {
mcp.parsePublicKeyReturnString = pubKey
}
func (mcp *MockCryptoParser) SetReturnErr(err error) {
mcp.parsePublicKeyReturnErr = err
}
func (mcp *MockCryptoParser) ParsePublicKey(privateKey string) (string, error) {
mcp.parsePublicKeyCalled = true
mcp.parsePublicKeyPassedPrivateKey = privateKey
return mcp.parsePublicKeyReturnString, mcp.parsePublicKeyReturnErr
}
func (mcp *MockCryptoParser) AssertParsePublicKeyCalledWith(privateKey string) {
Expect(mcp.parsePublicKeyCalled).To(BeTrue())
Expect(mcp.parsePublicKeyPassedPrivateKey).To(Equal(privateKey))
}

View File

@ -0,0 +1,38 @@
package fakes
import . "github.com/onsi/gomega"
type MockFsReader struct {
readCalled bool
readPassedPath string
readReturnBytes []byte
readReturnErr error
}
func NewMockFsReader() *MockFsReader {
return &MockFsReader{
readCalled: false,
readPassedPath: "",
readReturnBytes: nil,
readReturnErr: nil,
}
}
func (mfr *MockFsReader) SetReturnBytes(returnBytes []byte) {
mfr.readReturnBytes = returnBytes
}
func (mfr *MockFsReader) SetReturnErr(err error) {
mfr.readReturnErr = err
}
func (mfr *MockFsReader) Read(path string) ([]byte, error) {
mfr.readCalled = true
mfr.readPassedPath = path
return mfr.readReturnBytes, mfr.readReturnErr
}
func (mfr *MockFsReader) AssertReadCalledWith(path string) {
Expect(mfr.readCalled).To(BeTrue())
Expect(mfr.readPassedPath).To(Equal(path))
}

14
pkg/fs/reader.go Normal file
View File

@ -0,0 +1,14 @@
package fs
import "io/ioutil"
type Reader interface {
Read(path string) ([]byte, error)
}
type FsReader struct {
}
func (FsReader) Read(path string) ([]byte, error) {
return ioutil.ReadFile(path)
}

View File

@ -2,7 +2,6 @@ package geth
import ( import (
"errors" "errors"
"io/ioutil"
"strings" "strings"
"encoding/json" "encoding/json"
@ -11,6 +10,7 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi"
"github.com/vulcanize/vulcanizedb/pkg/fs"
) )
var ( var (
@ -80,7 +80,8 @@ func ParseAbi(abiString string) (abi.ABI, error) {
} }
func ReadAbiFile(abiFilePath string) (string, error) { func ReadAbiFile(abiFilePath string) (string, error) {
filesBytes, err := ioutil.ReadFile(abiFilePath) reader := fs.FsReader{}
filesBytes, err := reader.Read(abiFilePath)
if err != nil { if err != nil {
return "", ErrMissingAbiFile return "", ErrMissingAbiFile
} }

View File

@ -0,0 +1,13 @@
package cold_import_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestColdImport(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ColdImport Suite")
}

View File

@ -1,9 +1,8 @@
package geth package cold_import
import ( import (
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/ethereum" "github.com/vulcanize/vulcanizedb/pkg/datastore/ethereum"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/geth/converters/common" "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common"
) )
@ -23,22 +22,21 @@ func NewColdImporter(ethDB ethereum.Database, blockRepository datastore.BlockRep
} }
} }
func (ci *ColdImporter) Execute(startingBlockNumber int64, endingBlockNumber int64) error { func (ci *ColdImporter) Execute(startingBlockNumber int64, endingBlockNumber int64, nodeId string) error {
for i := startingBlockNumber; i <= endingBlockNumber; i++ { missingBlocks := ci.blockRepository.MissingBlockNumbers(startingBlockNumber, endingBlockNumber, nodeId)
hash := ci.ethDB.GetBlockHash(i) for _, n := range missingBlocks {
hash := ci.ethDB.GetBlockHash(n)
blockId, err := ci.createBlocksAndTransactions(hash, i) blockId, err := ci.createBlocksAndTransactions(hash, n)
if err != nil { if err != nil {
if err == repositories.ErrBlockExists {
continue
}
return err return err
} }
err = ci.createReceiptsAndLogs(hash, i, blockId) err = ci.createReceiptsAndLogs(hash, n, blockId)
if err != nil { if err != nil {
return err return err
} }
} }
ci.blockRepository.SetBlocksStatus(endingBlockNumber)
return nil return nil
} }

View File

@ -0,0 +1,133 @@
package cold_import_test
import (
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/geth/cold_import"
vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common"
)
var _ = Describe("Geth cold importer", func() {
var fakeGethBlock *types.Block
BeforeEach(func() {
header := &types.Header{}
transactions := []*types.Transaction{}
uncles := []*types.Header{}
receipts := []*types.Receipt{}
fakeGethBlock = types.NewBlock(header, transactions, uncles, receipts)
})
It("only populates missing blocks", func() {
mockEthereumDatabase := fakes.NewMockEthereumDatabase()
mockBlockRepository := fakes.NewMockBlockRepository()
mockReceiptRepository := fakes.NewMockReceiptRepository()
mockTransactionConverter := fakes.NewMockTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter)
nodeId := "node_id"
startingBlockNumber := int64(120)
missingBlockNumber := int64(123)
endingBlockNumber := int64(125)
fakeHash := []byte{1, 2, 3, 4, 5}
mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{missingBlockNumber})
mockEthereumDatabase.SetReturnHash(fakeHash)
mockEthereumDatabase.SetReturnBlock(fakeGethBlock)
importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter)
importer.Execute(startingBlockNumber, endingBlockNumber, nodeId)
mockBlockRepository.AssertMissingBlockNumbersCalledWith(startingBlockNumber, endingBlockNumber, nodeId)
mockEthereumDatabase.AssertGetBlockHashCalledWith(missingBlockNumber)
mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, missingBlockNumber)
})
It("fetches missing blocks from level db and persists them to pg", func() {
mockEthereumDatabase := fakes.NewMockEthereumDatabase()
mockBlockRepository := fakes.NewMockBlockRepository()
mockReceiptRepository := fakes.NewMockReceiptRepository()
mockTransactionConverter := fakes.NewMockTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter)
blockNumber := int64(123)
fakeHash := []byte{1, 2, 3, 4, 5}
mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{blockNumber})
mockEthereumDatabase.SetReturnHash(fakeHash)
mockEthereumDatabase.SetReturnBlock(fakeGethBlock)
importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter)
importer.Execute(blockNumber, blockNumber, "node_id")
mockEthereumDatabase.AssertGetBlockHashCalledWith(blockNumber)
mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, blockNumber)
mockTransactionConverter.AssertConvertTransactionsToCoreCalledWith(fakeGethBlock)
convertedBlock, err := blockConverter.ToCoreBlock(fakeGethBlock)
Expect(err).NotTo(HaveOccurred())
mockBlockRepository.AssertCreateOrUpdateBlockCalledWith(convertedBlock)
})
It("sets is_final status on populated blocks", func() {
mockEthereumDatabase := fakes.NewMockEthereumDatabase()
mockBlockRepository := fakes.NewMockBlockRepository()
mockReceiptRepository := fakes.NewMockReceiptRepository()
mockTransactionConverter := fakes.NewMockTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter)
startingBlockNumber := int64(120)
endingBlockNumber := int64(125)
fakeHash := []byte{1, 2, 3, 4, 5}
mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{startingBlockNumber})
mockEthereumDatabase.SetReturnHash(fakeHash)
mockEthereumDatabase.SetReturnBlock(fakeGethBlock)
importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter)
importer.Execute(startingBlockNumber, endingBlockNumber, "node_id")
mockBlockRepository.AssertSetBlockStatusCalledWith(endingBlockNumber)
})
It("fetches receipts from level db and persists them to pg", func() {
mockEthereumDatabase := fakes.NewMockEthereumDatabase()
mockBlockRepository := fakes.NewMockBlockRepository()
mockReceiptRepository := fakes.NewMockReceiptRepository()
mockTransactionConverter := fakes.NewMockTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter)
blockNumber := int64(123)
blockId := int64(999)
mockBlockRepository.SetCreateOrUpdateBlockReturnVals(blockId, nil)
fakeReceipts := types.Receipts{{}}
mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{blockNumber})
mockEthereumDatabase.SetReturnBlock(fakeGethBlock)
mockEthereumDatabase.SetReturnReceipts(fakeReceipts)
importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter)
importer.Execute(blockNumber, blockNumber, "node_id")
expectedReceipts := vulcCommon.ToCoreReceipts(fakeReceipts)
mockReceiptRepository.AssertCreateReceiptsAndLogsCalledWith(blockId, expectedReceipts)
})
It("does not fetch receipts if block already exists", func() {
mockEthereumDatabase := fakes.NewMockEthereumDatabase()
mockBlockRepository := fakes.NewMockBlockRepository()
mockReceiptRepository := fakes.NewMockReceiptRepository()
mockTransactionConverter := fakes.NewMockTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter)
blockNumber := int64(123)
mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{})
mockEthereumDatabase.SetReturnBlock(fakeGethBlock)
mockBlockRepository.SetCreateOrUpdateBlockReturnVals(0, repositories.ErrBlockExists)
importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter)
err := importer.Execute(blockNumber, blockNumber, "node_id")
Expect(err).NotTo(HaveOccurred())
mockReceiptRepository.AssertCreateReceiptsAndLogsNotCalled()
})
})

View File

@ -0,0 +1,68 @@
package cold_import
import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/crypto"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"strings"
)
const (
ColdImportClientName = "LevelDbColdImport"
ColdImportNetworkId float64 = 1
)
var (
NoChainDataErr = errors.New("Level DB path does not include chaindata extension.")
NoGethRootErr = errors.New("Level DB path does not include root path to geth.")
)
type ColdImportNodeBuilder struct {
reader fs.Reader
parser crypto.PublicKeyParser
}
func NewColdImportNodeBuilder(reader fs.Reader, parser crypto.PublicKeyParser) ColdImportNodeBuilder {
return ColdImportNodeBuilder{reader: reader, parser: parser}
}
func (cinb ColdImportNodeBuilder) GetNode(genesisBlock []byte, levelPath string) (core.Node, error) {
var coldNode core.Node
nodeKeyPath, err := getNodeKeyPath(levelPath)
if err != nil {
return coldNode, err
}
nodeKey, err := cinb.reader.Read(nodeKeyPath)
if err != nil {
return coldNode, err
}
nodeId, err := cinb.parser.ParsePublicKey(string(nodeKey))
if err != nil {
return coldNode, err
}
genesisBlockHash := common.BytesToHash(genesisBlock).String()
coldNode = core.Node{
GenesisBlock: genesisBlockHash,
NetworkID: ColdImportNetworkId,
ID: nodeId,
ClientName: ColdImportClientName,
}
return coldNode, nil
}
func getNodeKeyPath(levelPath string) (string, error) {
chaindataExtension := "chaindata"
if !strings.Contains(levelPath, chaindataExtension) {
return "", NoChainDataErr
}
chaindataExtensionLength := len(chaindataExtension)
gethRootPathLength := len(levelPath) - chaindataExtensionLength
if gethRootPathLength <= chaindataExtensionLength {
return "", NoGethRootErr
}
gethRootPath := levelPath[:gethRootPathLength]
nodeKeyPath := gethRootPath + "nodekey"
return nodeKeyPath, nil
}

View File

@ -0,0 +1,97 @@
package cold_import_test
import (
"errors"
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/geth/cold_import"
)
var _ = Describe("Cold importer node builder", func() {
Describe("when level path is not valid", func() {
It("returns error if no chaindata extension", func() {
gethPath := "path/to/geth"
mockReader := fakes.NewMockFsReader()
mockParser := fakes.NewMockCryptoParser()
nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser)
_, err := nodeBuilder.GetNode([]byte{1, 2, 3, 4, 5}, gethPath)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(cold_import.NoChainDataErr))
})
It("returns error if no root geth path", func() {
chaindataPath := "chaindata"
mockReader := fakes.NewMockFsReader()
mockParser := fakes.NewMockCryptoParser()
nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser)
_, err := nodeBuilder.GetNode([]byte{1, 2, 3, 4, 5}, chaindataPath)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(cold_import.NoGethRootErr))
})
})
Describe("when reader fails", func() {
It("returns err", func() {
mockReader := fakes.NewMockFsReader()
fakeError := errors.New("Failed")
mockReader.SetReturnErr(fakeError)
mockParser := fakes.NewMockCryptoParser()
nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser)
_, err := nodeBuilder.GetNode([]byte{1, 2, 3, 4, 5}, "path/to/geth/chaindata")
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakeError))
})
})
Describe("when parser fails", func() {
It("returns err", func() {
mockReader := fakes.NewMockFsReader()
mockParser := fakes.NewMockCryptoParser()
fakeErr := errors.New("Failed")
mockParser.SetReturnErr(fakeErr)
nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser)
_, err := nodeBuilder.GetNode([]byte{1, 2, 3, 4, 5}, "path/to/geth/chaindata")
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakeErr))
})
})
Describe("when path is valid and reader and parser succeed", func() {
It("builds a node", func() {
fakeGenesisBlock := []byte{1, 2, 3, 4, 5}
fakeRootGethPath := "root/path/to/geth/"
fakeLevelPath := fakeRootGethPath + "chaindata"
fakeNodeKeyPath := fakeRootGethPath + "nodekey"
fakePublicKeyBytes := []byte{5, 4, 3, 2, 1}
fakePublicKeyString := "public_key"
mockReader := fakes.NewMockFsReader()
mockReader.SetReturnBytes(fakePublicKeyBytes)
mockParser := fakes.NewMockCryptoParser()
mockParser.SetReturnVal(fakePublicKeyString)
nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser)
result, err := nodeBuilder.GetNode(fakeGenesisBlock, fakeLevelPath)
Expect(err).NotTo(HaveOccurred())
mockReader.AssertReadCalledWith(fakeNodeKeyPath)
mockParser.AssertParsePublicKeyCalledWith(string(fakePublicKeyBytes))
Expect(result).NotTo(BeNil())
Expect(result.ClientName).To(Equal(cold_import.ColdImportClientName))
expectedGenesisBlock := common.BytesToHash(fakeGenesisBlock).String()
Expect(result.GenesisBlock).To(Equal(expectedGenesisBlock))
Expect(result.ID).To(Equal(fakePublicKeyString))
Expect(result.NetworkID).To(Equal(cold_import.ColdImportNetworkId))
})
})
})

View File

@ -1,85 +0,0 @@
package geth_test
import (
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/geth"
vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common"
)
var _ = Describe("Geth cold importer", func() {
var fakeGethBlock *types.Block
BeforeEach(func() {
header := &types.Header{}
transactions := []*types.Transaction{}
uncles := []*types.Header{}
receipts := []*types.Receipt{}
fakeGethBlock = types.NewBlock(header, transactions, uncles, receipts)
})
It("fetches blocks from level db and persists them to pg", func() {
mockEthereumDatabase := fakes.NewMockEthereumDatabase()
mockBlockRepository := fakes.NewMockBlockRepository()
mockReceiptRepository := fakes.NewMockReceiptRepository()
mockTransactionConverter := fakes.NewMockTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter)
blockNumber := int64(123)
fakeHash := []byte{1, 2, 3, 4, 5}
mockEthereumDatabase.SetReturnHash(fakeHash)
mockEthereumDatabase.SetReturnBlock(fakeGethBlock)
importer := geth.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter)
importer.Execute(blockNumber, blockNumber)
mockEthereumDatabase.AssertGetBlockHashCalledWith(blockNumber)
mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, blockNumber)
mockTransactionConverter.AssertConvertTransactionsToCoreCalledWith(fakeGethBlock)
convertedBlock, err := blockConverter.ToCoreBlock(fakeGethBlock)
Expect(err).NotTo(HaveOccurred())
mockBlockRepository.AssertCreateOrUpdateBlockCalledWith(convertedBlock)
})
It("fetches receipts from level db and persists them to pg", func() {
mockEthereumDatabase := fakes.NewMockEthereumDatabase()
mockBlockRepository := fakes.NewMockBlockRepository()
mockReceiptRepository := fakes.NewMockReceiptRepository()
mockTransactionConverter := fakes.NewMockTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter)
blockNumber := int64(123)
blockId := int64(999)
mockBlockRepository.SetCreateOrUpdateBlockReturnVals(blockId, nil)
fakeReceipts := types.Receipts{{}}
mockEthereumDatabase.SetReturnBlock(fakeGethBlock)
mockEthereumDatabase.SetReturnReceipts(fakeReceipts)
importer := geth.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter)
importer.Execute(blockNumber, blockNumber)
expectedReceipts := vulcCommon.ToCoreReceipts(fakeReceipts)
mockReceiptRepository.AssertCreateReceiptsAndLogsCalledWith(blockId, expectedReceipts)
})
It("does not fetch receipts if block already exists", func() {
mockEthereumDatabase := fakes.NewMockEthereumDatabase()
mockBlockRepository := fakes.NewMockBlockRepository()
mockReceiptRepository := fakes.NewMockReceiptRepository()
mockTransactionConverter := fakes.NewMockTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter)
mockEthereumDatabase.SetReturnBlock(fakeGethBlock)
mockBlockRepository.SetCreateOrUpdateBlockReturnVals(0, repositories.ErrBlockExists)
importer := geth.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter)
err := importer.Execute(1, 1)
Expect(err).NotTo(HaveOccurred())
mockReceiptRepository.AssertCreateReceiptsAndLogsNotCalled()
})
})

View File

@ -9,7 +9,7 @@ import (
func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int { func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int {
lastBlock := blockchain.LastBlock().Int64() lastBlock := blockchain.LastBlock().Int64()
blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock-1) blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock-1, blockchain.Node().ID)
log.SetPrefix("") log.SetPrefix("")
log.Printf("Backfilling %d blocks\n\n", len(blockRange)) log.Printf("Backfilling %d blocks\n\n", len(blockRange))
RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange) RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange)