From d5c2ab33fcc12a26afa7a4166769d5993262b0d0 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Mon, 7 May 2018 10:41:02 -0500 Subject: [PATCH] Sync only missing blocks on cold import = Add eth_node_fingerprint to block that can be imitated by both hot and cold imports - Only sync missing blocks (blocks that are missing or don't share a fingerprint) on cold import - Set block is_final status after import --- README.md | 4 + cmd/coldImport.go | 37 ++--- cmd/root.go | 1 + cmd/sync.go | 2 +- ...dd_eth_node_fingerprint_to_blocks.down.sql | 2 + ..._add_eth_node_fingerprint_to_blocks.up.sql | 14 ++ db/schema.sql | 3 +- pkg/crypto/crypto_suite_test.go | 13 ++ pkg/crypto/parser.go | 21 +++ pkg/crypto/parser_test.go | 19 +++ pkg/datastore/inmemory/block_repository.go | 2 +- pkg/datastore/postgres/postgres_test.go | 4 +- .../postgres/repositories/block_repository.go | 40 +++--- .../repositories/block_repository_test.go | 33 +++-- .../repositories/contract_repository_test.go | 2 +- .../repositories/logs_repository_test.go | 2 +- .../repositories/receipts_repository_test.go | 16 ++- .../watched_events_repository_test.go | 2 +- pkg/datastore/repository.go | 2 +- pkg/fakes/mock_block_repository.go | 57 ++++++-- pkg/fakes/mock_crypto_parser.go | 38 +++++ pkg/fakes/mock_fs_reader.go | 38 +++++ pkg/fs/reader.go | 14 ++ pkg/geth/abi.go | 5 +- .../cold_import/cold_import_suite_test.go | 13 ++ .../importer.go} | 18 ++- pkg/geth/cold_import/importer_test.go | 133 ++++++++++++++++++ pkg/geth/cold_import/node_builder.go | 68 +++++++++ pkg/geth/cold_import/node_builder_test.go | 97 +++++++++++++ pkg/geth/cold_importer_test.go | 85 ----------- pkg/history/populate_blocks.go | 2 +- 31 files changed, 616 insertions(+), 171 deletions(-) create mode 100644 db/migrations/1525464568_add_eth_node_fingerprint_to_blocks.down.sql create mode 100644 db/migrations/1525464568_add_eth_node_fingerprint_to_blocks.up.sql create mode 100644 pkg/crypto/crypto_suite_test.go create mode 100644 pkg/crypto/parser.go create mode 100644 pkg/crypto/parser_test.go create mode 100644 pkg/fakes/mock_crypto_parser.go create mode 100644 pkg/fakes/mock_fs_reader.go create mode 100644 pkg/fs/reader.go create mode 100644 pkg/geth/cold_import/cold_import_suite_test.go rename pkg/geth/{cold_importer.go => cold_import/importer.go} (78%) create mode 100644 pkg/geth/cold_import/importer_test.go create mode 100644 pkg/geth/cold_import/node_builder.go create mode 100644 pkg/geth/cold_import/node_builder_test.go delete mode 100644 pkg/geth/cold_importer_test.go diff --git a/README.md b/README.md index bfa5e1fb..10b12041 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,10 @@ Sync VulcanizeDB from the LevelDB underlying a Geth node. 1. Assure node is not running, and that it has synced to the desired block height. 1. Start vulcanize_db - `./vulcanizedb coldImport --config --starting-block-number --ending-block-number ` +1. Optional flags: + - `--starting-block-number`/`-s`: block number to start syncing from + - `--ending-block-number`/`-e`: block number to sync to + - `--all`/`-a`: sync all missing blocks ## Running the Tests diff --git a/cmd/coldImport.go b/cmd/coldImport.go index 0b2aa61c..afb86ffa 100644 --- a/cmd/coldImport.go +++ b/cmd/coldImport.go @@ -17,13 +17,13 @@ package cmd import ( "log" - "github.com/ethereum/go-ethereum/common" "github.com/spf13/cobra" - "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/crypto" "github.com/vulcanize/vulcanizedb/pkg/datastore/ethereum" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" - "github.com/vulcanize/vulcanizedb/pkg/geth" + "github.com/vulcanize/vulcanizedb/pkg/fs" + "github.com/vulcanize/vulcanizedb/pkg/geth/cold_import" "github.com/vulcanize/vulcanizedb/pkg/geth/converters/cold_db" vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common" "github.com/vulcanize/vulcanizedb/utils" @@ -44,8 +44,9 @@ Geth must be synced over all of the desired blocks and must not be running in or func init() { rootCmd.AddCommand(coldImportCmd) - coldImportCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Number for first block to cold import") - coldImportCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", 5500000, "Number for last block to cold import") + coldImportCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Number for first block to cold import.") + coldImportCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", 5500000, "Number for last block to cold import.") + coldImportCmd.Flags().BoolVarP(&syncAll, "all", "a", false, "Option to sync all missing blocks.") } func coldImport() { @@ -55,34 +56,38 @@ func coldImport() { if err != nil { log.Fatal("Error connecting to ethereum db: ", err) } - + mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber() + if syncAll { + startingBlockNumber = 0 + endingBlockNumber = mostRecentBlockNumberInDb + } if endingBlockNumber < startingBlockNumber { log.Fatal("Ending block number must be greater than starting block number for cold import.") } - mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber() if endingBlockNumber > mostRecentBlockNumberInDb { log.Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb) } // init pg db - genesisBlockHash := common.BytesToHash(ethDB.GetBlockHash(0)).String() - coldNode := core.Node{ - GenesisBlock: genesisBlockHash, - NetworkID: 1, - ID: "LevelDbColdImport", - ClientName: "LevelDbColdImport", + genesisBlock := ethDB.GetBlockHash(0) + reader := fs.FsReader{} + parser := crypto.EthPublicKeyParser{} + nodeBuilder := cold_import.NewColdImportNodeBuilder(reader, parser) + coldNode, err := nodeBuilder.GetNode(genesisBlock, levelDbPath) + if err != nil { + log.Fatal("Error getting node: ", err) } pgDB := utils.LoadPostgres(databaseConfig, coldNode) // init cold importer deps - blockRepository := repositories.BlockRepository{DB: &pgDB} + blockRepository := repositories.NewBlockRepository(&pgDB) receiptRepository := repositories.ReceiptRepository{DB: &pgDB} transactionconverter := cold_db.NewColdDbTransactionConverter() blockConverter := vulcCommon.NewBlockConverter(transactionconverter) // init and execute cold importer - coldImporter := geth.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter) - err = coldImporter.Execute(startingBlockNumber, endingBlockNumber) + coldImporter := cold_import.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter) + err = coldImporter.Execute(startingBlockNumber, endingBlockNumber, coldNode.ID) if err != nil { log.Fatal("Error executing cold import: ", err) } diff --git a/cmd/root.go b/cmd/root.go index de46227e..8ce83ff2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -16,6 +16,7 @@ var ( ipc string levelDbPath string startingBlockNumber int64 + syncAll bool endingBlockNumber int64 ) diff --git a/cmd/sync.go b/cmd/sync.go index ff620f36..9814a41c 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -68,7 +68,7 @@ func sync() { } db := utils.LoadPostgres(databaseConfig, blockchain.Node()) - blockRepository := repositories.BlockRepository{DB: &db} + blockRepository := repositories.NewBlockRepository(&db) validator := history.NewBlockValidator(blockchain, blockRepository, 15) missingBlocksPopulated := make(chan int) go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, startingBlockNumber) diff --git a/db/migrations/1525464568_add_eth_node_fingerprint_to_blocks.down.sql b/db/migrations/1525464568_add_eth_node_fingerprint_to_blocks.down.sql new file mode 100644 index 00000000..f1980bec --- /dev/null +++ b/db/migrations/1525464568_add_eth_node_fingerprint_to_blocks.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE blocks + DROP COLUMN eth_node_fingerprint; \ No newline at end of file diff --git a/db/migrations/1525464568_add_eth_node_fingerprint_to_blocks.up.sql b/db/migrations/1525464568_add_eth_node_fingerprint_to_blocks.up.sql new file mode 100644 index 00000000..947db2c7 --- /dev/null +++ b/db/migrations/1525464568_add_eth_node_fingerprint_to_blocks.up.sql @@ -0,0 +1,14 @@ +BEGIN; + +ALTER TABLE blocks + ADD COLUMN eth_node_fingerprint VARCHAR(128); + +UPDATE blocks + SET eth_node_fingerprint = ( + SELECT eth_node_id FROM eth_nodes WHERE eth_nodes.id = blocks.eth_node_id + ); + +ALTER TABLE blocks + ALTER COLUMN eth_node_fingerprint SET NOT NULL; + +COMMIT; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index 0a8e84ed..bd4e1d59 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -83,7 +83,8 @@ CREATE TABLE public.blocks ( miner character varying(42), extra_data character varying, reward double precision, - uncles_reward double precision + uncles_reward double precision, + eth_node_fingerprint character varying(128) NOT NULL ); diff --git a/pkg/crypto/crypto_suite_test.go b/pkg/crypto/crypto_suite_test.go new file mode 100644 index 00000000..e60462b9 --- /dev/null +++ b/pkg/crypto/crypto_suite_test.go @@ -0,0 +1,13 @@ +package crypto_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestCrypto(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Crypto Suite") +} diff --git a/pkg/crypto/parser.go b/pkg/crypto/parser.go new file mode 100644 index 00000000..30db1699 --- /dev/null +++ b/pkg/crypto/parser.go @@ -0,0 +1,21 @@ +package crypto + +import ( + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +type PublicKeyParser interface { + ParsePublicKey(privateKey string) (string, error) +} + +type EthPublicKeyParser struct{} + +func (EthPublicKeyParser) ParsePublicKey(privateKey string) (string, error) { + np, err := crypto.HexToECDSA(privateKey) + if err != nil { + return "", err + } + pubKey := discover.PubkeyID(&np.PublicKey) + return pubKey.String(), nil +} diff --git a/pkg/crypto/parser_test.go b/pkg/crypto/parser_test.go new file mode 100644 index 00000000..77c1478e --- /dev/null +++ b/pkg/crypto/parser_test.go @@ -0,0 +1,19 @@ +package crypto_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/crypto" +) + +var _ = Describe("Public key parser", func() { + It("parses public key from private key", func() { + privKey := "0000000000000000000000000000000000000000000000000000000000000001" + parser := crypto.EthPublicKeyParser{} + + pubKey, err := parser.ParsePublicKey(privKey) + + Expect(err).NotTo(HaveOccurred()) + Expect(pubKey).To(Equal("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8")) + }) +}) diff --git a/pkg/datastore/inmemory/block_repository.go b/pkg/datastore/inmemory/block_repository.go index 86a23ec8..ee07efd2 100644 --- a/pkg/datastore/inmemory/block_repository.go +++ b/pkg/datastore/inmemory/block_repository.go @@ -26,7 +26,7 @@ func (blockRepository *BlockRepository) GetBlock(blockNumber int64) (core.Block, return core.Block{}, datastore.ErrBlockDoesNotExist(blockNumber) } -func (blockRepository *BlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 { +func (blockRepository *BlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64, nodeId string) []int64 { missingNumbers := []int64{} for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ { if _, ok := blockRepository.blocks[blockNumber]; !ok { diff --git a/pkg/datastore/postgres/postgres_test.go b/pkg/datastore/postgres/postgres_test.go index 784cef27..7c4555a7 100644 --- a/pkg/datastore/postgres/postgres_test.go +++ b/pkg/datastore/postgres/postgres_test.go @@ -76,7 +76,7 @@ var _ = Describe("Postgres DB", func() { } node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"} db := test_config.NewTestDB(node) - blocksRepository := repositories.BlockRepository{DB: db} + blocksRepository := repositories.NewBlockRepository(db) _, err1 := blocksRepository.CreateOrUpdateBlock(badBlock) @@ -131,7 +131,7 @@ var _ = Describe("Postgres DB", func() { } node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"} db, _ := postgres.NewDB(test_config.DBConfig, node) - blockRepository := repositories.BlockRepository{DB: db} + blockRepository := repositories.NewBlockRepository(db) _, err1 := blockRepository.CreateOrUpdateBlock(block) diff --git a/pkg/datastore/postgres/repositories/block_repository.go b/pkg/datastore/postgres/repositories/block_repository.go index 752b6da5..c1c6cfbe 100644 --- a/pkg/datastore/postgres/repositories/block_repository.go +++ b/pkg/datastore/postgres/repositories/block_repository.go @@ -20,12 +20,16 @@ const ( var ErrBlockExists = errors.New("Won't add block that already exists.") type BlockRepository struct { - *postgres.DB + database *postgres.DB +} + +func NewBlockRepository(database *postgres.DB) *BlockRepository { + return &BlockRepository{database: database} } func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) { cutoff := chainHead - blocksFromHeadBeforeFinal - blockRepository.DB.Exec(` + blockRepository.database.Exec(` UPDATE blocks SET is_final = TRUE WHERE is_final = FALSE AND number < $1`, cutoff) @@ -48,22 +52,22 @@ func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) (in return blockId, ErrBlockExists } -func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 { +func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64, nodeId string) []int64 { numbers := make([]int64, 0) - blockRepository.DB.Select(&numbers, + blockRepository.database.Select(&numbers, `SELECT all_block_numbers FROM ( SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series LEFT JOIN blocks ON number = all_block_numbers - WHERE number ISNULL`, + WHERE number ISNULL OR eth_node_fingerprint != $3`, startingBlockNumber, - highestBlockNumber) + highestBlockNumber, nodeId) return numbers } func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, error) { - blockRows := blockRepository.DB.QueryRowx( + blockRows := blockRepository.database.QueryRowx( `SELECT id, number, gaslimit, @@ -81,7 +85,7 @@ func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, reward, uncles_reward FROM blocks - WHERE eth_node_id = $1 AND number = $2`, blockRepository.NodeID, blockNumber) + WHERE eth_node_id = $1 AND number = $2`, blockRepository.database.NodeID, blockNumber) savedBlock, err := blockRepository.loadBlock(blockRows) if err != nil { switch err { @@ -96,17 +100,17 @@ func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, func (blockRepository BlockRepository) insertBlock(block core.Block) (int64, error) { var blockId int64 - tx, _ := blockRepository.DB.BeginTx(context.Background(), nil) + tx, _ := blockRepository.database.BeginTx(context.Background(), nil) err := tx.QueryRow( `INSERT INTO blocks - (eth_node_id, number, gaslimit, gasused, time, difficulty, hash, nonce, parenthash, size, uncle_hash, is_final, miner, extra_data, reward, uncles_reward) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + (eth_node_id, number, gaslimit, gasused, time, difficulty, hash, nonce, parenthash, size, uncle_hash, is_final, miner, extra_data, reward, uncles_reward, eth_node_fingerprint) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING id `, - blockRepository.NodeID, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward). + blockRepository.database.NodeID, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward, blockRepository.database.Node.ID). Scan(&blockId) if err != nil { tx.Rollback() - return 0, postgres.ErrDBInsertFailed + return 0, err } if len(block.Transactions) > 0 { err = blockRepository.createTransactions(tx, blockId, block.Transactions) @@ -189,11 +193,11 @@ func (blockRepository BlockRepository) createReceipt(tx *sql.Tx, blockId int64, func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) { var retrievedBlockHash string - blockRepository.DB.Get(&retrievedBlockHash, + blockRepository.database.Get(&retrievedBlockHash, `SELECT hash FROM blocks WHERE number = $1 AND eth_node_id = $2`, - block.Number, blockRepository.NodeID) + block.Number, blockRepository.database.NodeID) return retrievedBlockHash, blockExists(retrievedBlockHash) } @@ -217,11 +221,11 @@ func blockExists(retrievedBlockHash string) bool { } func (blockRepository BlockRepository) removeBlock(blockNumber int64) error { - _, err := blockRepository.DB.Exec( + _, err := blockRepository.database.Exec( `DELETE FROM blocks WHERE number=$1 AND eth_node_id=$2`, - blockNumber, blockRepository.NodeID) + blockNumber, blockRepository.database.NodeID) if err != nil { return postgres.ErrDBDeleteFailed } @@ -238,7 +242,7 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc if err != nil { return core.Block{}, err } - transactionRows, err := blockRepository.DB.Queryx(` + transactionRows, err := blockRepository.database.Queryx(` SELECT hash, nonce, tx_to, diff --git a/pkg/datastore/postgres/repositories/block_repository_test.go b/pkg/datastore/postgres/repositories/block_repository_test.go index b1b21688..1b2c64fc 100644 --- a/pkg/datastore/postgres/repositories/block_repository_test.go +++ b/pkg/datastore/postgres/repositories/block_repository_test.go @@ -15,16 +15,18 @@ import ( var _ = Describe("Saving blocks", func() { var db *postgres.DB + var node core.Node var blockRepository datastore.BlockRepository + BeforeEach(func() { - node := core.Node{ + node = core.Node{ GenesisBlock: "GENESIS", NetworkID: 1, ID: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } db = test_config.NewTestDB(node) - blockRepository = repositories.BlockRepository{DB: db} + blockRepository = repositories.NewBlockRepository(db) }) @@ -40,7 +42,7 @@ var _ = Describe("Saving blocks", func() { ClientName: "Geth", } dbTwo := test_config.NewTestDB(nodeTwo) - repositoryTwo := repositories.BlockRepository{DB: dbTwo} + repositoryTwo := repositories.NewBlockRepository(dbTwo) _, err := repositoryTwo.GetBlock(123) Expect(err).To(HaveOccurred()) @@ -166,7 +168,7 @@ var _ = Describe("Saving blocks", func() { NetworkID: 1, } dbTwo := test_config.NewTestDB(nodeTwo) - repositoryTwo := repositories.BlockRepository{DB: dbTwo} + repositoryTwo := repositories.NewBlockRepository(dbTwo) blockRepository.CreateOrUpdateBlock(blockOne) repositoryTwo.CreateOrUpdateBlock(blockTwo) @@ -236,32 +238,39 @@ var _ = Describe("Saving blocks", func() { It("is empty the starting block number is the highest known block number", func() { blockRepository.CreateOrUpdateBlock(core.Block{Number: 1}) - Expect(len(blockRepository.MissingBlockNumbers(1, 1))).To(Equal(0)) + Expect(len(blockRepository.MissingBlockNumbers(1, 1, node.ID))).To(Equal(0)) }) It("is the only missing block number", func() { blockRepository.CreateOrUpdateBlock(core.Block{Number: 2}) - Expect(blockRepository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1})) + Expect(blockRepository.MissingBlockNumbers(1, 2, node.ID)).To(Equal([]int64{1})) }) It("is both missing block numbers", func() { blockRepository.CreateOrUpdateBlock(core.Block{Number: 3}) - Expect(blockRepository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2})) + Expect(blockRepository.MissingBlockNumbers(1, 3, node.ID)).To(Equal([]int64{1, 2})) }) It("goes back to the starting block number", func() { blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) - Expect(blockRepository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5})) + Expect(blockRepository.MissingBlockNumbers(4, 6, node.ID)).To(Equal([]int64{4, 5})) }) It("only includes missing block numbers", func() { blockRepository.CreateOrUpdateBlock(core.Block{Number: 4}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) - Expect(blockRepository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5})) + Expect(blockRepository.MissingBlockNumbers(4, 6, node.ID)).To(Equal([]int64{5})) + }) + + It("includes blocks created by a different node", func() { + blockRepository.CreateOrUpdateBlock(core.Block{Number: 4}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) + + Expect(blockRepository.MissingBlockNumbers(4, 6, "Different node id")).To(Equal([]int64{4, 5, 6})) }) It("is a list with multiple gaps", func() { @@ -270,18 +279,18 @@ var _ = Describe("Saving blocks", func() { blockRepository.CreateOrUpdateBlock(core.Block{Number: 8}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 10}) - Expect(blockRepository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9})) + Expect(blockRepository.MissingBlockNumbers(3, 10, node.ID)).To(Equal([]int64{3, 6, 7, 9})) }) It("returns empty array when lower bound exceeds upper bound", func() { - Expect(blockRepository.MissingBlockNumbers(10000, 1)).To(Equal([]int64{})) + Expect(blockRepository.MissingBlockNumbers(10000, 1, node.ID)).To(Equal([]int64{})) }) It("only returns requested range even when other gaps exist", func() { blockRepository.CreateOrUpdateBlock(core.Block{Number: 3}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 8}) - Expect(blockRepository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5})) + Expect(blockRepository.MissingBlockNumbers(1, 5, node.ID)).To(Equal([]int64{1, 2, 4, 5})) }) }) diff --git a/pkg/datastore/postgres/repositories/contract_repository_test.go b/pkg/datastore/postgres/repositories/contract_repository_test.go index 6aa1e7f1..0ecaf8bf 100644 --- a/pkg/datastore/postgres/repositories/contract_repository_test.go +++ b/pkg/datastore/postgres/repositories/contract_repository_test.go @@ -53,7 +53,7 @@ var _ = Describe("Creating contracts", func() { It("returns transactions 'To' a contract", func() { var blockRepository datastore.BlockRepository - blockRepository = repositories.BlockRepository{DB: db} + blockRepository = repositories.NewBlockRepository(db) block := core.Block{ Number: 123, Transactions: []core.Transaction{ diff --git a/pkg/datastore/postgres/repositories/logs_repository_test.go b/pkg/datastore/postgres/repositories/logs_repository_test.go index ac117845..072f49bc 100644 --- a/pkg/datastore/postgres/repositories/logs_repository_test.go +++ b/pkg/datastore/postgres/repositories/logs_repository_test.go @@ -28,7 +28,7 @@ var _ = Describe("Logs Repository", func() { ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } db = test_config.NewTestDB(node) - blockRepository = repositories.BlockRepository{DB: db} + blockRepository = repositories.NewBlockRepository(db) logsRepository = repositories.LogRepository{DB: db} receiptRepository = repositories.ReceiptRepository{DB: db} }) diff --git a/pkg/datastore/postgres/repositories/receipts_repository_test.go b/pkg/datastore/postgres/repositories/receipts_repository_test.go index 80dbffb4..2ba46e05 100644 --- a/pkg/datastore/postgres/repositories/receipts_repository_test.go +++ b/pkg/datastore/postgres/repositories/receipts_repository_test.go @@ -10,7 +10,7 @@ import ( "github.com/vulcanize/vulcanizedb/test_config" ) -var _ = Describe("Logs Repository", func() { +var _ = Describe("Receipts Repository", func() { var blockRepository datastore.BlockRepository var logRepository datastore.LogRepository var receiptRepository datastore.ReceiptRepository @@ -24,7 +24,7 @@ var _ = Describe("Logs Repository", func() { ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } db = test_config.NewTestDB(node) - blockRepository = repositories.BlockRepository{DB: db} + blockRepository = repositories.NewBlockRepository(db) logRepository = repositories.LogRepository{DB: db} receiptRepository = repositories.ReceiptRepository{DB: db} }) @@ -99,11 +99,12 @@ var _ = Describe("Logs Repository", func() { Hash: expected.TxHash, Receipt: expected, } - block := core.Block{Transactions: []core.Transaction{transaction}} - blockRepository.CreateOrUpdateBlock(block) - receipt, err := receiptRepository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223") + _, err := blockRepository.CreateOrUpdateBlock(block) + + Expect(err).NotTo(HaveOccurred()) + receipt, err := receiptRepository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223") Expect(err).ToNot(HaveOccurred()) //Not currently serializing bloom logs Expect(receipt.Bloom).To(Equal(core.Receipt{}.Bloom)) @@ -132,10 +133,11 @@ var _ = Describe("Logs Repository", func() { block := core.Block{ Transactions: []core.Transaction{transaction}, } - blockRepository.CreateOrUpdateBlock(block) - _, err := receiptRepository.GetReceipt(receipt.TxHash) + _, err := blockRepository.CreateOrUpdateBlock(block) + Expect(err).NotTo(HaveOccurred()) + _, err = receiptRepository.GetReceipt(receipt.TxHash) Expect(err).To(Not(HaveOccurred())) }) }) diff --git a/pkg/datastore/postgres/repositories/watched_events_repository_test.go b/pkg/datastore/postgres/repositories/watched_events_repository_test.go index 37eef2f9..0192dc31 100644 --- a/pkg/datastore/postgres/repositories/watched_events_repository_test.go +++ b/pkg/datastore/postgres/repositories/watched_events_repository_test.go @@ -21,7 +21,7 @@ var _ = Describe("Watched Events Repository", func() { BeforeEach(func() { db = test_config.NewTestDB(core.Node{}) - blocksRepository = repositories.BlockRepository{DB: db} + blocksRepository = repositories.NewBlockRepository(db) filterRepository = repositories.FilterRepository{DB: db} logRepository = repositories.LogRepository{DB: db} receiptRepository = repositories.ReceiptRepository{DB: db} diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index 3eda94ff..4b9592e8 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -14,7 +14,7 @@ var ErrBlockDoesNotExist = func(blockNumber int64) error { type BlockRepository interface { CreateOrUpdateBlock(block core.Block) (int64, error) GetBlock(blockNumber int64) (core.Block, error) - MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 + MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64, nodeId string) []int64 SetBlocksStatus(chainHead int64) } diff --git a/pkg/fakes/mock_block_repository.go b/pkg/fakes/mock_block_repository.go index 586522b2..6391c198 100644 --- a/pkg/fakes/mock_block_repository.go +++ b/pkg/fakes/mock_block_repository.go @@ -7,18 +7,32 @@ import ( ) type MockBlockRepository struct { - createOrUpdateBlockCalled bool - createOrUpdateBlockPassedBlock core.Block - createOrUpdateBlockReturnInt int64 - createOrUpdateBlockReturnErr error + createOrUpdateBlockCalled bool + createOrUpdateBlockPassedBlock core.Block + createOrUpdateBlockReturnInt int64 + createOrUpdateBlockReturnErr error + missingBlockNumbersCalled bool + missingBlockNumbersPassedStartingBlockNumber int64 + missingBlockNumbersPassedEndingBlockNumber int64 + missingBlockNumbersPassedNodeId string + missingBlockNumbersReturnArray []int64 + setBlockStatusCalled bool + setBlockStatusPassedChainHead int64 } func NewMockBlockRepository() *MockBlockRepository { return &MockBlockRepository{ - createOrUpdateBlockCalled: false, - createOrUpdateBlockPassedBlock: core.Block{}, - createOrUpdateBlockReturnInt: 0, - createOrUpdateBlockReturnErr: nil, + createOrUpdateBlockCalled: false, + createOrUpdateBlockPassedBlock: core.Block{}, + createOrUpdateBlockReturnInt: 0, + createOrUpdateBlockReturnErr: nil, + missingBlockNumbersCalled: false, + missingBlockNumbersPassedStartingBlockNumber: 0, + missingBlockNumbersPassedEndingBlockNumber: 0, + missingBlockNumbersPassedNodeId: "", + missingBlockNumbersReturnArray: nil, + setBlockStatusCalled: false, + setBlockStatusPassedChainHead: 0, } } @@ -27,6 +41,10 @@ func (mbr *MockBlockRepository) SetCreateOrUpdateBlockReturnVals(i int64, err er mbr.createOrUpdateBlockReturnErr = err } +func (mbr *MockBlockRepository) SetMissingBlockNumbersReturnArray(returnArray []int64) { + mbr.missingBlockNumbersReturnArray = returnArray +} + func (mbr *MockBlockRepository) CreateOrUpdateBlock(block core.Block) (int64, error) { mbr.createOrUpdateBlockCalled = true mbr.createOrUpdateBlockPassedBlock = block @@ -37,15 +55,32 @@ func (mbr *MockBlockRepository) GetBlock(blockNumber int64) (core.Block, error) panic("implement me") } -func (mbr *MockBlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 { - panic("implement me") +func (mbr *MockBlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64, nodeId string) []int64 { + mbr.missingBlockNumbersCalled = true + mbr.missingBlockNumbersPassedStartingBlockNumber = startingBlockNumber + mbr.missingBlockNumbersPassedEndingBlockNumber = endingBlockNumber + mbr.missingBlockNumbersPassedNodeId = nodeId + return mbr.missingBlockNumbersReturnArray } func (mbr *MockBlockRepository) SetBlocksStatus(chainHead int64) { - panic("implement me") + mbr.setBlockStatusCalled = true + mbr.setBlockStatusPassedChainHead = chainHead } func (mbr *MockBlockRepository) AssertCreateOrUpdateBlockCalledWith(block core.Block) { Expect(mbr.createOrUpdateBlockCalled).To(BeTrue()) Expect(mbr.createOrUpdateBlockPassedBlock).To(Equal(block)) } + +func (mbr *MockBlockRepository) AssertMissingBlockNumbersCalledWith(startingBlockNumber int64, endingBlockNumber int64, nodeId string) { + Expect(mbr.missingBlockNumbersCalled).To(BeTrue()) + Expect(mbr.missingBlockNumbersPassedStartingBlockNumber).To(Equal(startingBlockNumber)) + Expect(mbr.missingBlockNumbersPassedEndingBlockNumber).To(Equal(endingBlockNumber)) + Expect(mbr.missingBlockNumbersPassedNodeId).To(Equal(nodeId)) +} + +func (mbr *MockBlockRepository) AssertSetBlockStatusCalledWith(chainHead int64) { + Expect(mbr.setBlockStatusCalled).To(BeTrue()) + Expect(mbr.setBlockStatusPassedChainHead).To(Equal(chainHead)) +} diff --git a/pkg/fakes/mock_crypto_parser.go b/pkg/fakes/mock_crypto_parser.go new file mode 100644 index 00000000..98b21f80 --- /dev/null +++ b/pkg/fakes/mock_crypto_parser.go @@ -0,0 +1,38 @@ +package fakes + +import . "github.com/onsi/gomega" + +type MockCryptoParser struct { + parsePublicKeyCalled bool + parsePublicKeyPassedPrivateKey string + parsePublicKeyReturnString string + parsePublicKeyReturnErr error +} + +func NewMockCryptoParser() *MockCryptoParser { + return &MockCryptoParser{ + parsePublicKeyCalled: false, + parsePublicKeyPassedPrivateKey: "", + parsePublicKeyReturnString: "", + parsePublicKeyReturnErr: nil, + } +} + +func (mcp *MockCryptoParser) SetReturnVal(pubKey string) { + mcp.parsePublicKeyReturnString = pubKey +} + +func (mcp *MockCryptoParser) SetReturnErr(err error) { + mcp.parsePublicKeyReturnErr = err +} + +func (mcp *MockCryptoParser) ParsePublicKey(privateKey string) (string, error) { + mcp.parsePublicKeyCalled = true + mcp.parsePublicKeyPassedPrivateKey = privateKey + return mcp.parsePublicKeyReturnString, mcp.parsePublicKeyReturnErr +} + +func (mcp *MockCryptoParser) AssertParsePublicKeyCalledWith(privateKey string) { + Expect(mcp.parsePublicKeyCalled).To(BeTrue()) + Expect(mcp.parsePublicKeyPassedPrivateKey).To(Equal(privateKey)) +} diff --git a/pkg/fakes/mock_fs_reader.go b/pkg/fakes/mock_fs_reader.go new file mode 100644 index 00000000..c9a74466 --- /dev/null +++ b/pkg/fakes/mock_fs_reader.go @@ -0,0 +1,38 @@ +package fakes + +import . "github.com/onsi/gomega" + +type MockFsReader struct { + readCalled bool + readPassedPath string + readReturnBytes []byte + readReturnErr error +} + +func NewMockFsReader() *MockFsReader { + return &MockFsReader{ + readCalled: false, + readPassedPath: "", + readReturnBytes: nil, + readReturnErr: nil, + } +} + +func (mfr *MockFsReader) SetReturnBytes(returnBytes []byte) { + mfr.readReturnBytes = returnBytes +} + +func (mfr *MockFsReader) SetReturnErr(err error) { + mfr.readReturnErr = err +} + +func (mfr *MockFsReader) Read(path string) ([]byte, error) { + mfr.readCalled = true + mfr.readPassedPath = path + return mfr.readReturnBytes, mfr.readReturnErr +} + +func (mfr *MockFsReader) AssertReadCalledWith(path string) { + Expect(mfr.readCalled).To(BeTrue()) + Expect(mfr.readPassedPath).To(Equal(path)) +} diff --git a/pkg/fs/reader.go b/pkg/fs/reader.go new file mode 100644 index 00000000..4029b8a7 --- /dev/null +++ b/pkg/fs/reader.go @@ -0,0 +1,14 @@ +package fs + +import "io/ioutil" + +type Reader interface { + Read(path string) ([]byte, error) +} + +type FsReader struct { +} + +func (FsReader) Read(path string) ([]byte, error) { + return ioutil.ReadFile(path) +} diff --git a/pkg/geth/abi.go b/pkg/geth/abi.go index d38c6111..5211dd98 100644 --- a/pkg/geth/abi.go +++ b/pkg/geth/abi.go @@ -2,7 +2,6 @@ package geth import ( "errors" - "io/ioutil" "strings" "encoding/json" @@ -11,6 +10,7 @@ import ( "time" "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/vulcanize/vulcanizedb/pkg/fs" ) var ( @@ -80,7 +80,8 @@ func ParseAbi(abiString string) (abi.ABI, error) { } func ReadAbiFile(abiFilePath string) (string, error) { - filesBytes, err := ioutil.ReadFile(abiFilePath) + reader := fs.FsReader{} + filesBytes, err := reader.Read(abiFilePath) if err != nil { return "", ErrMissingAbiFile } diff --git a/pkg/geth/cold_import/cold_import_suite_test.go b/pkg/geth/cold_import/cold_import_suite_test.go new file mode 100644 index 00000000..8c524782 --- /dev/null +++ b/pkg/geth/cold_import/cold_import_suite_test.go @@ -0,0 +1,13 @@ +package cold_import_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestColdImport(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ColdImport Suite") +} diff --git a/pkg/geth/cold_importer.go b/pkg/geth/cold_import/importer.go similarity index 78% rename from pkg/geth/cold_importer.go rename to pkg/geth/cold_import/importer.go index 86cb9820..fbf4f90f 100644 --- a/pkg/geth/cold_importer.go +++ b/pkg/geth/cold_import/importer.go @@ -1,9 +1,8 @@ -package geth +package cold_import import ( "github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore/ethereum" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common" ) @@ -23,22 +22,21 @@ func NewColdImporter(ethDB ethereum.Database, blockRepository datastore.BlockRep } } -func (ci *ColdImporter) Execute(startingBlockNumber int64, endingBlockNumber int64) error { - for i := startingBlockNumber; i <= endingBlockNumber; i++ { - hash := ci.ethDB.GetBlockHash(i) +func (ci *ColdImporter) Execute(startingBlockNumber int64, endingBlockNumber int64, nodeId string) error { + missingBlocks := ci.blockRepository.MissingBlockNumbers(startingBlockNumber, endingBlockNumber, nodeId) + for _, n := range missingBlocks { + hash := ci.ethDB.GetBlockHash(n) - blockId, err := ci.createBlocksAndTransactions(hash, i) + blockId, err := ci.createBlocksAndTransactions(hash, n) if err != nil { - if err == repositories.ErrBlockExists { - continue - } return err } - err = ci.createReceiptsAndLogs(hash, i, blockId) + err = ci.createReceiptsAndLogs(hash, n, blockId) if err != nil { return err } } + ci.blockRepository.SetBlocksStatus(endingBlockNumber) return nil } diff --git a/pkg/geth/cold_import/importer_test.go b/pkg/geth/cold_import/importer_test.go new file mode 100644 index 00000000..5f9bf58c --- /dev/null +++ b/pkg/geth/cold_import/importer_test.go @@ -0,0 +1,133 @@ +package cold_import_test + +import ( + "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/geth/cold_import" + vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common" +) + +var _ = Describe("Geth cold importer", func() { + var fakeGethBlock *types.Block + + BeforeEach(func() { + header := &types.Header{} + transactions := []*types.Transaction{} + uncles := []*types.Header{} + receipts := []*types.Receipt{} + fakeGethBlock = types.NewBlock(header, transactions, uncles, receipts) + }) + + It("only populates missing blocks", func() { + mockEthereumDatabase := fakes.NewMockEthereumDatabase() + mockBlockRepository := fakes.NewMockBlockRepository() + mockReceiptRepository := fakes.NewMockReceiptRepository() + mockTransactionConverter := fakes.NewMockTransactionConverter() + blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter) + + nodeId := "node_id" + startingBlockNumber := int64(120) + missingBlockNumber := int64(123) + endingBlockNumber := int64(125) + fakeHash := []byte{1, 2, 3, 4, 5} + mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{missingBlockNumber}) + mockEthereumDatabase.SetReturnHash(fakeHash) + mockEthereumDatabase.SetReturnBlock(fakeGethBlock) + importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) + + importer.Execute(startingBlockNumber, endingBlockNumber, nodeId) + + mockBlockRepository.AssertMissingBlockNumbersCalledWith(startingBlockNumber, endingBlockNumber, nodeId) + mockEthereumDatabase.AssertGetBlockHashCalledWith(missingBlockNumber) + mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, missingBlockNumber) + }) + + It("fetches missing blocks from level db and persists them to pg", func() { + mockEthereumDatabase := fakes.NewMockEthereumDatabase() + mockBlockRepository := fakes.NewMockBlockRepository() + mockReceiptRepository := fakes.NewMockReceiptRepository() + mockTransactionConverter := fakes.NewMockTransactionConverter() + blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter) + + blockNumber := int64(123) + fakeHash := []byte{1, 2, 3, 4, 5} + mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{blockNumber}) + mockEthereumDatabase.SetReturnHash(fakeHash) + mockEthereumDatabase.SetReturnBlock(fakeGethBlock) + importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) + + importer.Execute(blockNumber, blockNumber, "node_id") + + mockEthereumDatabase.AssertGetBlockHashCalledWith(blockNumber) + mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, blockNumber) + mockTransactionConverter.AssertConvertTransactionsToCoreCalledWith(fakeGethBlock) + convertedBlock, err := blockConverter.ToCoreBlock(fakeGethBlock) + Expect(err).NotTo(HaveOccurred()) + mockBlockRepository.AssertCreateOrUpdateBlockCalledWith(convertedBlock) + }) + + It("sets is_final status on populated blocks", func() { + mockEthereumDatabase := fakes.NewMockEthereumDatabase() + mockBlockRepository := fakes.NewMockBlockRepository() + mockReceiptRepository := fakes.NewMockReceiptRepository() + mockTransactionConverter := fakes.NewMockTransactionConverter() + blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter) + + startingBlockNumber := int64(120) + endingBlockNumber := int64(125) + fakeHash := []byte{1, 2, 3, 4, 5} + mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{startingBlockNumber}) + mockEthereumDatabase.SetReturnHash(fakeHash) + mockEthereumDatabase.SetReturnBlock(fakeGethBlock) + importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) + + importer.Execute(startingBlockNumber, endingBlockNumber, "node_id") + + mockBlockRepository.AssertSetBlockStatusCalledWith(endingBlockNumber) + }) + + It("fetches receipts from level db and persists them to pg", func() { + mockEthereumDatabase := fakes.NewMockEthereumDatabase() + mockBlockRepository := fakes.NewMockBlockRepository() + mockReceiptRepository := fakes.NewMockReceiptRepository() + mockTransactionConverter := fakes.NewMockTransactionConverter() + blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter) + + blockNumber := int64(123) + blockId := int64(999) + mockBlockRepository.SetCreateOrUpdateBlockReturnVals(blockId, nil) + fakeReceipts := types.Receipts{{}} + mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{blockNumber}) + mockEthereumDatabase.SetReturnBlock(fakeGethBlock) + mockEthereumDatabase.SetReturnReceipts(fakeReceipts) + importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) + + importer.Execute(blockNumber, blockNumber, "node_id") + + expectedReceipts := vulcCommon.ToCoreReceipts(fakeReceipts) + mockReceiptRepository.AssertCreateReceiptsAndLogsCalledWith(blockId, expectedReceipts) + }) + + It("does not fetch receipts if block already exists", func() { + mockEthereumDatabase := fakes.NewMockEthereumDatabase() + mockBlockRepository := fakes.NewMockBlockRepository() + mockReceiptRepository := fakes.NewMockReceiptRepository() + mockTransactionConverter := fakes.NewMockTransactionConverter() + blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter) + + blockNumber := int64(123) + mockBlockRepository.SetMissingBlockNumbersReturnArray([]int64{}) + mockEthereumDatabase.SetReturnBlock(fakeGethBlock) + mockBlockRepository.SetCreateOrUpdateBlockReturnVals(0, repositories.ErrBlockExists) + importer := cold_import.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) + + err := importer.Execute(blockNumber, blockNumber, "node_id") + + Expect(err).NotTo(HaveOccurred()) + mockReceiptRepository.AssertCreateReceiptsAndLogsNotCalled() + }) +}) diff --git a/pkg/geth/cold_import/node_builder.go b/pkg/geth/cold_import/node_builder.go new file mode 100644 index 00000000..f3f79899 --- /dev/null +++ b/pkg/geth/cold_import/node_builder.go @@ -0,0 +1,68 @@ +package cold_import + +import ( + "errors" + "github.com/ethereum/go-ethereum/common" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/crypto" + "github.com/vulcanize/vulcanizedb/pkg/fs" + "strings" +) + +const ( + ColdImportClientName = "LevelDbColdImport" + ColdImportNetworkId float64 = 1 +) + +var ( + NoChainDataErr = errors.New("Level DB path does not include chaindata extension.") + NoGethRootErr = errors.New("Level DB path does not include root path to geth.") +) + +type ColdImportNodeBuilder struct { + reader fs.Reader + parser crypto.PublicKeyParser +} + +func NewColdImportNodeBuilder(reader fs.Reader, parser crypto.PublicKeyParser) ColdImportNodeBuilder { + return ColdImportNodeBuilder{reader: reader, parser: parser} +} + +func (cinb ColdImportNodeBuilder) GetNode(genesisBlock []byte, levelPath string) (core.Node, error) { + var coldNode core.Node + nodeKeyPath, err := getNodeKeyPath(levelPath) + if err != nil { + return coldNode, err + } + nodeKey, err := cinb.reader.Read(nodeKeyPath) + if err != nil { + return coldNode, err + } + nodeId, err := cinb.parser.ParsePublicKey(string(nodeKey)) + if err != nil { + return coldNode, err + } + genesisBlockHash := common.BytesToHash(genesisBlock).String() + coldNode = core.Node{ + GenesisBlock: genesisBlockHash, + NetworkID: ColdImportNetworkId, + ID: nodeId, + ClientName: ColdImportClientName, + } + return coldNode, nil +} + +func getNodeKeyPath(levelPath string) (string, error) { + chaindataExtension := "chaindata" + if !strings.Contains(levelPath, chaindataExtension) { + return "", NoChainDataErr + } + chaindataExtensionLength := len(chaindataExtension) + gethRootPathLength := len(levelPath) - chaindataExtensionLength + if gethRootPathLength <= chaindataExtensionLength { + return "", NoGethRootErr + } + gethRootPath := levelPath[:gethRootPathLength] + nodeKeyPath := gethRootPath + "nodekey" + return nodeKeyPath, nil +} diff --git a/pkg/geth/cold_import/node_builder_test.go b/pkg/geth/cold_import/node_builder_test.go new file mode 100644 index 00000000..ec19a009 --- /dev/null +++ b/pkg/geth/cold_import/node_builder_test.go @@ -0,0 +1,97 @@ +package cold_import_test + +import ( + "errors" + "github.com/ethereum/go-ethereum/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/geth/cold_import" +) + +var _ = Describe("Cold importer node builder", func() { + Describe("when level path is not valid", func() { + It("returns error if no chaindata extension", func() { + gethPath := "path/to/geth" + mockReader := fakes.NewMockFsReader() + mockParser := fakes.NewMockCryptoParser() + nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser) + + _, err := nodeBuilder.GetNode([]byte{1, 2, 3, 4, 5}, gethPath) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(cold_import.NoChainDataErr)) + }) + + It("returns error if no root geth path", func() { + chaindataPath := "chaindata" + mockReader := fakes.NewMockFsReader() + mockParser := fakes.NewMockCryptoParser() + nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser) + + _, err := nodeBuilder.GetNode([]byte{1, 2, 3, 4, 5}, chaindataPath) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(cold_import.NoGethRootErr)) + }) + }) + + Describe("when reader fails", func() { + It("returns err", func() { + mockReader := fakes.NewMockFsReader() + fakeError := errors.New("Failed") + mockReader.SetReturnErr(fakeError) + mockParser := fakes.NewMockCryptoParser() + nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser) + + _, err := nodeBuilder.GetNode([]byte{1, 2, 3, 4, 5}, "path/to/geth/chaindata") + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakeError)) + }) + }) + + Describe("when parser fails", func() { + It("returns err", func() { + mockReader := fakes.NewMockFsReader() + mockParser := fakes.NewMockCryptoParser() + fakeErr := errors.New("Failed") + mockParser.SetReturnErr(fakeErr) + nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser) + + _, err := nodeBuilder.GetNode([]byte{1, 2, 3, 4, 5}, "path/to/geth/chaindata") + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakeErr)) + }) + }) + + Describe("when path is valid and reader and parser succeed", func() { + It("builds a node", func() { + fakeGenesisBlock := []byte{1, 2, 3, 4, 5} + fakeRootGethPath := "root/path/to/geth/" + fakeLevelPath := fakeRootGethPath + "chaindata" + fakeNodeKeyPath := fakeRootGethPath + "nodekey" + fakePublicKeyBytes := []byte{5, 4, 3, 2, 1} + fakePublicKeyString := "public_key" + mockReader := fakes.NewMockFsReader() + mockReader.SetReturnBytes(fakePublicKeyBytes) + mockParser := fakes.NewMockCryptoParser() + mockParser.SetReturnVal(fakePublicKeyString) + nodeBuilder := cold_import.NewColdImportNodeBuilder(mockReader, mockParser) + + result, err := nodeBuilder.GetNode(fakeGenesisBlock, fakeLevelPath) + + Expect(err).NotTo(HaveOccurred()) + mockReader.AssertReadCalledWith(fakeNodeKeyPath) + mockParser.AssertParsePublicKeyCalledWith(string(fakePublicKeyBytes)) + Expect(result).NotTo(BeNil()) + Expect(result.ClientName).To(Equal(cold_import.ColdImportClientName)) + expectedGenesisBlock := common.BytesToHash(fakeGenesisBlock).String() + Expect(result.GenesisBlock).To(Equal(expectedGenesisBlock)) + Expect(result.ID).To(Equal(fakePublicKeyString)) + Expect(result.NetworkID).To(Equal(cold_import.ColdImportNetworkId)) + }) + }) + +}) diff --git a/pkg/geth/cold_importer_test.go b/pkg/geth/cold_importer_test.go deleted file mode 100644 index 012adade..00000000 --- a/pkg/geth/cold_importer_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package geth_test - -import ( - "github.com/ethereum/go-ethereum/core/types" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" - "github.com/vulcanize/vulcanizedb/pkg/fakes" - "github.com/vulcanize/vulcanizedb/pkg/geth" - vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common" -) - -var _ = Describe("Geth cold importer", func() { - var fakeGethBlock *types.Block - - BeforeEach(func() { - header := &types.Header{} - transactions := []*types.Transaction{} - uncles := []*types.Header{} - receipts := []*types.Receipt{} - fakeGethBlock = types.NewBlock(header, transactions, uncles, receipts) - }) - - It("fetches blocks from level db and persists them to pg", func() { - mockEthereumDatabase := fakes.NewMockEthereumDatabase() - mockBlockRepository := fakes.NewMockBlockRepository() - mockReceiptRepository := fakes.NewMockReceiptRepository() - mockTransactionConverter := fakes.NewMockTransactionConverter() - blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter) - - blockNumber := int64(123) - fakeHash := []byte{1, 2, 3, 4, 5} - mockEthereumDatabase.SetReturnHash(fakeHash) - mockEthereumDatabase.SetReturnBlock(fakeGethBlock) - importer := geth.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) - - importer.Execute(blockNumber, blockNumber) - - mockEthereumDatabase.AssertGetBlockHashCalledWith(blockNumber) - mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, blockNumber) - mockTransactionConverter.AssertConvertTransactionsToCoreCalledWith(fakeGethBlock) - convertedBlock, err := blockConverter.ToCoreBlock(fakeGethBlock) - Expect(err).NotTo(HaveOccurred()) - mockBlockRepository.AssertCreateOrUpdateBlockCalledWith(convertedBlock) - }) - - It("fetches receipts from level db and persists them to pg", func() { - mockEthereumDatabase := fakes.NewMockEthereumDatabase() - mockBlockRepository := fakes.NewMockBlockRepository() - mockReceiptRepository := fakes.NewMockReceiptRepository() - mockTransactionConverter := fakes.NewMockTransactionConverter() - blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter) - - blockNumber := int64(123) - blockId := int64(999) - mockBlockRepository.SetCreateOrUpdateBlockReturnVals(blockId, nil) - fakeReceipts := types.Receipts{{}} - mockEthereumDatabase.SetReturnBlock(fakeGethBlock) - mockEthereumDatabase.SetReturnReceipts(fakeReceipts) - importer := geth.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) - - importer.Execute(blockNumber, blockNumber) - - expectedReceipts := vulcCommon.ToCoreReceipts(fakeReceipts) - mockReceiptRepository.AssertCreateReceiptsAndLogsCalledWith(blockId, expectedReceipts) - }) - - It("does not fetch receipts if block already exists", func() { - mockEthereumDatabase := fakes.NewMockEthereumDatabase() - mockBlockRepository := fakes.NewMockBlockRepository() - mockReceiptRepository := fakes.NewMockReceiptRepository() - mockTransactionConverter := fakes.NewMockTransactionConverter() - blockConverter := vulcCommon.NewBlockConverter(mockTransactionConverter) - - mockEthereumDatabase.SetReturnBlock(fakeGethBlock) - mockBlockRepository.SetCreateOrUpdateBlockReturnVals(0, repositories.ErrBlockExists) - importer := geth.NewColdImporter(mockEthereumDatabase, mockBlockRepository, mockReceiptRepository, blockConverter) - - err := importer.Execute(1, 1) - - Expect(err).NotTo(HaveOccurred()) - mockReceiptRepository.AssertCreateReceiptsAndLogsNotCalled() - }) -}) diff --git a/pkg/history/populate_blocks.go b/pkg/history/populate_blocks.go index 89ff9a11..52106a75 100644 --- a/pkg/history/populate_blocks.go +++ b/pkg/history/populate_blocks.go @@ -9,7 +9,7 @@ import ( func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int { lastBlock := blockchain.LastBlock().Int64() - blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock-1) + blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock-1, blockchain.Node().ID) log.SetPrefix("") log.Printf("Backfilling %d blocks\n\n", len(blockRange)) RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange)