From 135527101196debfb1e8d550806888c68858d392 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Tue, 17 Jul 2018 16:23:07 -0500 Subject: [PATCH] Add light sync command - Only syncs block headers (excludes block bodies, transactions, receipts, and logs) - Modifies validation window to include the most recent block - Isolates validation window to the variable defined in the cmd directory (blocks have a separate variable defined in the block_repository for determining when to set a block as final) --- README.md | 14 +- cmd/coldImport.go | 6 +- cmd/erc20.go | 2 +- cmd/lightSync.go | 91 +++++++ cmd/root.go | 14 + cmd/sync.go | 31 ++- .../1531758090_create_headers_table.down.sql | 1 + .../1531758090_create_headers_table.up.sql | 11 + db/schema.sql | 57 ++++ .../erc20_watcher/every_block/fetcher_test.go | 2 +- .../every_block/integration_test.go | 2 +- .../erc20_watcher/every_block/transformers.go | 2 +- examples/mocks/mocks.go | 8 + examples/test_helpers/database.go | 15 -- integration_test/block_rewards_test.go | 4 +- integration_test/contract_test.go | 6 +- integration_test/geth_blockchain_test.go | 4 +- pkg/core/blockchain.go | 1 + pkg/core/header.go | 7 + pkg/datastore/inmemory/header_repository.go | 30 +++ pkg/datastore/inmemory/in_memory.go | 16 +- .../repositories/header_repository.go | 80 ++++++ .../repositories/header_repository_test.go | 248 ++++++++++++++++++ pkg/datastore/repository.go | 8 +- pkg/fakes/blockchain.go | 75 ++++-- pkg/geth/blockchain.go | 61 ++--- pkg/geth/contract.go | 8 +- pkg/geth/converters/common/block_converter.go | 1 + .../converters/common/header_converter.go | 24 ++ .../common/header_converter_test.go | 47 ++++ pkg/history/block_validator.go | 29 ++ pkg/history/block_validator_test.go | 43 +++ pkg/history/header_validator.go | 27 ++ pkg/history/header_validator_test.go | 39 +++ pkg/history/populate_blocks.go | 2 +- pkg/history/populate_blocks_test.go | 12 +- pkg/history/populate_headers.go | 29 ++ pkg/history/populate_headers_test.go | 52 ++++ pkg/history/validate_blocks.go | 68 ----- pkg/history/validate_blocks_test.go | 86 ------ pkg/history/validation_window.go | 41 +++ pkg/history/validation_window_test.go | 53 ++++ test_config/test_config.go | 7 +- 43 files changed, 1092 insertions(+), 272 deletions(-) create mode 100644 cmd/lightSync.go create mode 100644 db/migrations/1531758090_create_headers_table.down.sql create mode 100644 db/migrations/1531758090_create_headers_table.up.sql create mode 100644 pkg/core/header.go create mode 100644 pkg/datastore/inmemory/header_repository.go create mode 100644 pkg/datastore/postgres/repositories/header_repository.go create mode 100644 pkg/datastore/postgres/repositories/header_repository_test.go create mode 100644 pkg/geth/converters/common/header_converter.go create mode 100644 pkg/geth/converters/common/header_converter_test.go create mode 100644 pkg/history/block_validator.go create mode 100644 pkg/history/block_validator_test.go create mode 100644 pkg/history/header_validator.go create mode 100644 pkg/history/header_validator_test.go create mode 100644 pkg/history/populate_headers.go create mode 100644 pkg/history/populate_headers_test.go delete mode 100644 pkg/history/validate_blocks.go delete mode 100644 pkg/history/validate_blocks_test.go create mode 100644 pkg/history/validation_window.go create mode 100644 pkg/history/validation_window_test.go diff --git a/README.md b/README.md index 65b41dd3..05d450eb 100644 --- a/README.md +++ b/README.md @@ -54,9 +54,10 @@ Vulcanize DB is a set of tools that make it easier for developers to write appli - See `environments/infura.toml` to configure commands to run against infura, if a local node is unavailable ## Start syncing with postgres -Syncs VulcanizeDB with the configured Ethereum node. -1. Start node (**if fast syncing wait for initial sync to finish**) -1. In a separate terminal start vulcanize_db +Syncs VulcanizeDB with the configured Ethereum node, populating blocks, transactions, receipts, and logs. +This command is useful when you want to maintain a broad cache of what's happening on the blockchain. +1. Start Ethereum node (**if fast syncing your Ethereum node, wait for initial sync to finish**) +1. In a separate terminal start VulcanizeDB: - `./vulcanizedb sync --config --starting-block-number ` ## Alternatively, sync from Geth's underlying LevelDB @@ -69,6 +70,13 @@ Sync VulcanizeDB from the LevelDB underlying a Geth node. - `--ending-block-number`/`-e`: block number to sync to - `--all`/`-a`: sync all missing blocks +## Alternatively, sync in "light" mode +Syncs VulcanizeDB with the configured Ethereum node, populating only block headers. +This command is useful when you want a minimal baseline from which to track targeted data on the blockchain (e.g. individual smart contract storage values). +1. Start Ethereum node +2. In a separate terminal start VulcanizeDB: + - `./vulcanizedb lightSync --config --starting-block-number ` + ## Start full environment in docker by single command ### Geth Rinkeby diff --git a/cmd/coldImport.go b/cmd/coldImport.go index afb86ffa..ee00e04d 100644 --- a/cmd/coldImport.go +++ b/cmd/coldImport.go @@ -1,4 +1,4 @@ -// Copyright © 2018 Rob Mulholand +// Copyright © 2018 Vulcanize // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -44,8 +44,8 @@ Geth must be synced over all of the desired blocks and must not be running in or func init() { rootCmd.AddCommand(coldImportCmd) - coldImportCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Number for first block to cold import.") - coldImportCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", 5500000, "Number for last block to cold import.") + coldImportCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "BlockNumber for first block to cold import.") + coldImportCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", 5500000, "BlockNumber for last block to cold import.") coldImportCmd.Flags().BoolVarP(&syncAll, "all", "a", false, "Option to sync all missing blocks.") } diff --git a/cmd/erc20.go b/cmd/erc20.go index 6bc3f375..e1d16531 100644 --- a/cmd/erc20.go +++ b/cmd/erc20.go @@ -49,7 +49,7 @@ Expects an ethereum node to be running and requires a .toml config file: func watchERC20s() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - blockchain := geth.NewBlockchain(ipc) + blockchain := geth.NewBlockChain(ipc) db, err := postgres.NewDB(databaseConfig, blockchain.Node()) if err != nil { log.Fatal("Failed to initialize database.") diff --git a/cmd/lightSync.go b/cmd/lightSync.go new file mode 100644 index 00000000..f59beb5a --- /dev/null +++ b/cmd/lightSync.go @@ -0,0 +1,91 @@ +// Copyright © 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/geth" + "github.com/vulcanize/vulcanizedb/pkg/history" + "github.com/vulcanize/vulcanizedb/utils" + "log" + "os" + "time" +) + +// lightSyncCmd represents the lightSync command +var lightSyncCmd = &cobra.Command{ + Use: "lightSync", + Short: "Syncs VulcanizeDB with local ethereum node's block headers", + Long: `Syncs VulcanizeDB with local ethereum node. Populates +Postgres with block headers. + +./vulcanizedb lightSync --starting-block-number 0 --config public.toml + +Expects ethereum node to be running and requires a .toml config: + + [database] + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + + [client] + ipcPath = "/Users/user/Library/Ethereum/geth.ipc" +`, + Run: func(cmd *cobra.Command, args []string) { + lightSync() + }, +} + +func init() { + rootCmd.AddCommand(lightSyncCmd) + lightSyncCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block number to start syncing from") +} + +func backFillAllHeaders(blockchain core.Blockchain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { + missingBlocksPopulated <- history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber) +} + +func lightSync() { + ticker := time.NewTicker(pollingInterval) + defer ticker.Stop() + blockChain := geth.NewBlockChain(ipc) + + lastBlock := blockChain.LastBlock().Int64() + if lastBlock == 0 { + log.Fatal("geth initial: state sync not finished") + } + if startingBlockNumber > lastBlock { + log.Fatal("starting block number > current block number") + } + + db := utils.LoadPostgres(databaseConfig, blockChain.Node()) + headerRepository := repositories.NewHeaderRepository(&db) + validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow) + missingBlocksPopulated := make(chan int) + go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber) + + for { + select { + case <-ticker.C: + window := validator.ValidateHeaders() + window.Log(os.Stdout) + case <-missingBlocksPopulated: + go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber) + } + } +} diff --git a/cmd/root.go b/cmd/root.go index 45a633ca..c0336edc 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,3 +1,17 @@ +// Copyright © 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package cmd import ( diff --git a/cmd/sync.go b/cmd/sync.go index 42a2e111..115c864f 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -1,3 +1,17 @@ +// Copyright © 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package cmd import ( @@ -19,9 +33,11 @@ import ( // syncCmd represents the sync command var syncCmd = &cobra.Command{ Use: "sync", - Short: "Syncs vulcanizedb with local ethereum node", - Long: `Syncs vulcanizedb with local ethereum node. -vulcanizedb sync --starting-block-number 0 --config public.toml + Short: "Syncs VulcanizeDB with local ethereum node", + Long: `Syncs VulcanizeDB with local ethereum node. Populates +Postgres with blocks, transactions, receipts, and logs. + +./vulcanizedb sync --starting-block-number 0 --config public.toml Expects ethereum node to be running and requires a .toml config: @@ -39,7 +55,8 @@ Expects ethereum node to be running and requires a .toml config: } const ( - pollingInterval = 7 * time.Second + pollingInterval = 7 * time.Second + validationWindow = 15 ) func init() { @@ -55,7 +72,7 @@ func backFillAllBlocks(blockchain core.Blockchain, blockRepository datastore.Blo func sync() { ticker := time.NewTicker(pollingInterval) defer ticker.Stop() - blockchain := geth.NewBlockchain(ipc) + blockchain := geth.NewBlockChain(ipc) lastBlock := blockchain.LastBlock().Int64() if lastBlock == 0 { @@ -67,7 +84,7 @@ func sync() { db := utils.LoadPostgres(databaseConfig, blockchain.Node()) blockRepository := repositories.NewBlockRepository(&db) - validator := history.NewBlockValidator(blockchain, blockRepository, 15) + validator := history.NewBlockValidator(blockchain, blockRepository, validationWindow) missingBlocksPopulated := make(chan int) go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, startingBlockNumber) @@ -75,7 +92,7 @@ func sync() { select { case <-ticker.C: window := validator.ValidateBlocks() - validator.Log(os.Stdout, window) + window.Log(os.Stdout) case <-missingBlocksPopulated: go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, startingBlockNumber) } diff --git a/db/migrations/1531758090_create_headers_table.down.sql b/db/migrations/1531758090_create_headers_table.down.sql new file mode 100644 index 00000000..d9b283f9 --- /dev/null +++ b/db/migrations/1531758090_create_headers_table.down.sql @@ -0,0 +1 @@ +DROP TABLE public.headers; \ No newline at end of file diff --git a/db/migrations/1531758090_create_headers_table.up.sql b/db/migrations/1531758090_create_headers_table.up.sql new file mode 100644 index 00000000..31876183 --- /dev/null +++ b/db/migrations/1531758090_create_headers_table.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE public.headers ( + id SERIAL PRIMARY KEY, + hash VARCHAR(66), + block_number BIGINT, + raw bytea, + eth_node_id INTEGER, + eth_node_fingerprint VARCHAR(128), + CONSTRAINT eth_nodes_fk FOREIGN KEY (eth_node_id) + REFERENCES eth_nodes (id) + ON DELETE CASCADE +); \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index c0599805..b9e03a52 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -121,6 +121,40 @@ CREATE TABLE public.eth_nodes ( ); +-- +-- Name: headers; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.headers ( + id integer NOT NULL, + hash character varying(66), + block_number bigint, + raw bytea, + eth_node_id integer, + eth_node_fingerprint character varying(128) +); + + +-- +-- Name: headers_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.headers_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: headers_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.headers_id_seq OWNED BY public.headers.id; + + -- -- Name: log_filters; Type: TABLE; Schema: public; Owner: - -- @@ -385,6 +419,13 @@ ALTER TABLE ONLY public.blocks ALTER COLUMN id SET DEFAULT nextval('public.block ALTER TABLE ONLY public.eth_nodes ALTER COLUMN id SET DEFAULT nextval('public.nodes_id_seq'::regclass); +-- +-- Name: headers id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.headers ALTER COLUMN id SET DEFAULT nextval('public.headers_id_seq'::regclass); + + -- -- Name: log_filters id; Type: DEFAULT; Schema: public; Owner: - -- @@ -459,6 +500,14 @@ ALTER TABLE ONLY public.eth_nodes ADD CONSTRAINT eth_node_uc UNIQUE (genesis_block, network_id, eth_node_id); +-- +-- Name: headers headers_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.headers + ADD CONSTRAINT headers_pkey PRIMARY KEY (id); + + -- -- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -574,6 +623,14 @@ ALTER TABLE ONLY public.token_supply ADD CONSTRAINT blocks_fk FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE; +-- +-- Name: headers eth_nodes_fk; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.headers + ADD CONSTRAINT eth_nodes_fk FOREIGN KEY (eth_node_id) REFERENCES public.eth_nodes(id) ON DELETE CASCADE; + + -- -- Name: blocks node_fk; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/examples/erc20_watcher/every_block/fetcher_test.go b/examples/erc20_watcher/every_block/fetcher_test.go index 13813ff6..10b2ac65 100644 --- a/examples/erc20_watcher/every_block/fetcher_test.go +++ b/examples/erc20_watcher/every_block/fetcher_test.go @@ -28,7 +28,7 @@ var _ = Describe("ERC20 Fetcher", func() { blockNumber := int64(5502914) infuraIPC := "https://mainnet.infura.io/J5Vd2fRtGsw0zZ0Ov3BL" - realBlockchain := geth.NewBlockchain(infuraIPC) + realBlockchain := geth.NewBlockChain(infuraIPC) realFetcher := every_block.NewFetcher(realBlockchain) fakeBlockchain := &mocks.Blockchain{} diff --git a/examples/erc20_watcher/every_block/integration_test.go b/examples/erc20_watcher/every_block/integration_test.go index 68afc441..0443d00c 100644 --- a/examples/erc20_watcher/every_block/integration_test.go +++ b/examples/erc20_watcher/every_block/integration_test.go @@ -58,7 +58,7 @@ var _ = Describe("Everyblock transformers", func() { }) It("creates a token_supply record for each block in the given range", func() { - initializer := every_block.TokenSupplyTransformerInitializer{erc20_watcher.DaiConfig} + initializer := every_block.TokenSupplyTransformerInitializer{Config: erc20_watcher.DaiConfig} transformer := initializer.NewTokenSupplyTransformer(db, &blockchain) transformer.Execute() diff --git a/examples/erc20_watcher/every_block/transformers.go b/examples/erc20_watcher/every_block/transformers.go index 2c886375..887610b0 100644 --- a/examples/erc20_watcher/every_block/transformers.go +++ b/examples/erc20_watcher/every_block/transformers.go @@ -15,8 +15,8 @@ package every_block import ( - "github.com/vulcanize/vulcanizedb/libraries/shared" "github.com/vulcanize/vulcanizedb/examples/erc20_watcher" + "github.com/vulcanize/vulcanizedb/libraries/shared" ) func TransformerInitializers() []shared.TransformerInitializer { diff --git a/examples/mocks/mocks.go b/examples/mocks/mocks.go index c7fba520..307986dc 100644 --- a/examples/mocks/mocks.go +++ b/examples/mocks/mocks.go @@ -116,6 +116,10 @@ type Blockchain struct { lastBlock *big.Int } +func (fb *Blockchain) GetHeaderByNumber(blockNumber int64) (core.Header, error) { + panic("implement me") +} + func (fb *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { fb.FetchedAbi = abiJSON fb.FetchedContractAddress = address @@ -150,6 +154,10 @@ type FailureBlockchain struct { lastBlock *big.Int } +func (fb FailureBlockchain) GetHeaderByNumber(blockNumber int64) (core.Header, error) { + panic("implement me") +} + func (FailureBlockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { return errors.New("TestError") } diff --git a/examples/test_helpers/database.go b/examples/test_helpers/database.go index 8c10f064..d4d5e9ad 100644 --- a/examples/test_helpers/database.go +++ b/examples/test_helpers/database.go @@ -35,21 +35,6 @@ type TransferDBRow struct { VulcanizeLogID int64 `db:"vulcanize_log_id"` } -func CreateLogRecord(db *postgres.DB, logRepository repositories.LogRepository, log core.Log) { - blockRepository := repositories.NewBlockRepository(db) - receiptRepository := repositories.ReceiptRepository{DB: db} - - blockNumber := log.BlockNumber - blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber}) - Expect(err).NotTo(HaveOccurred()) - - receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) - Expect(err).NotTo(HaveOccurred()) - - err = logRepository.CreateLogs([]core.Log{log}, receiptId) - Expect(err).NotTo(HaveOccurred()) -} - func CreateNewDatabase() *postgres.DB { var node core.Node node = core.Node{ diff --git a/integration_test/block_rewards_test.go b/integration_test/block_rewards_test.go index 0fd7b65b..55b05bd9 100644 --- a/integration_test/block_rewards_test.go +++ b/integration_test/block_rewards_test.go @@ -10,14 +10,14 @@ import ( var _ = Describe("Rewards calculations", func() { It("calculates a block reward for a real block", func() { - blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath) + blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath) block, err := blockchain.GetBlockByNumber(1071819) Expect(err).ToNot(HaveOccurred()) Expect(block.Reward).To(Equal(5.31355)) }) It("calculates an uncle reward for a real block", func() { - blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath) + blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath) block, err := blockchain.GetBlockByNumber(1071819) Expect(err).ToNot(HaveOccurred()) Expect(block.UnclesReward).To(Equal(6.875)) diff --git a/integration_test/contract_test.go b/integration_test/contract_test.go index 2844c9f4..8b55042d 100644 --- a/integration_test/contract_test.go +++ b/integration_test/contract_test.go @@ -27,7 +27,7 @@ var _ = Describe("Reading contracts", func() { }, Index: 19, Data: "0x0000000000000000000000000000000000000000000000000c7d713b49da0000"} - blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath) + blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath) contract := testing.SampleContract() logs, err := blockchain.GetLogs(contract, big.NewInt(4703824), nil) @@ -39,7 +39,7 @@ var _ = Describe("Reading contracts", func() { }) It("returns and empty log array when no events for a given block / contract combo", func() { - blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath) + blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath) logs, err := blockchain.GetLogs(core.Contract{Hash: "x123"}, big.NewInt(4703824), nil) @@ -52,7 +52,7 @@ var _ = Describe("Reading contracts", func() { Describe("Fetching Contract data", func() { It("returns the correct attribute for a real contract", func() { - blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath) + blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath) contract := testing.SampleContract() var balance = new(big.Int) diff --git a/integration_test/geth_blockchain_test.go b/integration_test/geth_blockchain_test.go index 598837cb..bd8dafd3 100644 --- a/integration_test/geth_blockchain_test.go +++ b/integration_test/geth_blockchain_test.go @@ -12,11 +12,11 @@ import ( var _ = Describe("Reading from the Geth blockchain", func() { - var blockchain *geth.Blockchain + var blockchain *geth.BlockChain var inMemory *inmemory.InMemory BeforeEach(func() { - blockchain = geth.NewBlockchain(test_config.InfuraClient.IPCPath) + blockchain = geth.NewBlockChain(test_config.InfuraClient.IPCPath) inMemory = inmemory.NewInMemory() }) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index e67af221..07b6e3fc 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -5,6 +5,7 @@ import "math/big" type Blockchain interface { ContractDataFetcher GetBlockByNumber(blockNumber int64) (Block, error) + GetHeaderByNumber(blockNumber int64) (Header, error) GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error) LastBlock() *big.Int Node() Node diff --git a/pkg/core/header.go b/pkg/core/header.go new file mode 100644 index 00000000..21335b6f --- /dev/null +++ b/pkg/core/header.go @@ -0,0 +1,7 @@ +package core + +type Header struct { + BlockNumber int64 `db:"block_number"` + Hash string + Raw []byte +} diff --git a/pkg/datastore/inmemory/header_repository.go b/pkg/datastore/inmemory/header_repository.go new file mode 100644 index 00000000..ba78a1d5 --- /dev/null +++ b/pkg/datastore/inmemory/header_repository.go @@ -0,0 +1,30 @@ +package inmemory + +import "github.com/vulcanize/vulcanizedb/pkg/core" + +type HeaderRepository struct { + memory *InMemory +} + +func NewHeaderRepository(memory *InMemory) *HeaderRepository { + return &HeaderRepository{memory: memory} +} + +func (repository *HeaderRepository) CreateOrUpdateHeader(header core.Header) (int64, error) { + repository.memory.headers[header.BlockNumber] = header + return 0, nil +} + +func (repository *HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { + return repository.memory.headers[blockNumber], nil +} + +func (repository *HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 { + missingNumbers := []int64{} + for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ { + if _, ok := repository.memory.headers[blockNumber]; !ok { + missingNumbers = append(missingNumbers, blockNumber) + } + } + return missingNumbers +} diff --git a/pkg/datastore/inmemory/in_memory.go b/pkg/datastore/inmemory/in_memory.go index 36e272f5..d7d9d87a 100644 --- a/pkg/datastore/inmemory/in_memory.go +++ b/pkg/datastore/inmemory/in_memory.go @@ -10,21 +10,23 @@ const ( ) type InMemory struct { - blocks map[int64]core.Block - receipts map[string]core.Receipt - contracts map[string]core.Contract - logs map[string][]core.Log - logFilters map[string]filters.LogFilter CreateOrUpdateBlockCallCount int + blocks map[int64]core.Block + contracts map[string]core.Contract + headers map[int64]core.Header + logFilters map[string]filters.LogFilter + logs map[string][]core.Log + receipts map[string]core.Receipt } func NewInMemory() *InMemory { return &InMemory{ CreateOrUpdateBlockCallCount: 0, blocks: make(map[int64]core.Block), - receipts: make(map[string]core.Receipt), contracts: make(map[string]core.Contract), - logs: make(map[string][]core.Log), + headers: make(map[int64]core.Header), logFilters: make(map[string]filters.LogFilter), + logs: make(map[string][]core.Log), + receipts: make(map[string]core.Receipt), } } diff --git a/pkg/datastore/postgres/repositories/header_repository.go b/pkg/datastore/postgres/repositories/header_repository.go new file mode 100644 index 00000000..7aa4deac --- /dev/null +++ b/pkg/datastore/postgres/repositories/header_repository.go @@ -0,0 +1,80 @@ +package repositories + +import ( + "database/sql" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +type HeaderRepository struct { + database *postgres.DB +} + +func NewHeaderRepository(database *postgres.DB) HeaderRepository { + return HeaderRepository{database: database} +} + +func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int64, error) { + hash, err := repository.getHeaderHash(header) + if err != nil { + if headerDoesNotExist(err) { + return repository.insertHeader(header) + } + return 0, err + } + if headerMustBeReplaced(hash, header) { + return repository.replaceHeader(header) + } + return 0, err +} + +func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { + var header core.Header + err := repository.database.Get(&header, `SELECT block_number, hash, raw FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, + blockNumber, repository.database.Node.ID) + return header, err +} + +func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 { + numbers := make([]int64, 0) + repository.database.Select(&numbers, `SELECT all_block_numbers + FROM ( + SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series + WHERE all_block_numbers NOT IN ( + SELECT block_number FROM headers WHERE eth_node_fingerprint = $3 + ) `, + startingBlockNumber, endingBlockNumber, nodeID) + return numbers +} + +func headerMustBeReplaced(hash string, header core.Header) bool { + return hash != header.Hash +} + +func headerDoesNotExist(err error) bool { + return err == sql.ErrNoRows +} + +func (repository HeaderRepository) getHeaderHash(header core.Header) (string, error) { + var hash string + err := repository.database.Get(&hash, `SELECT hash FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, + header.BlockNumber, repository.database.Node.ID) + return hash, err +} + +func (repository HeaderRepository) insertHeader(header core.Header) (int64, error) { + var headerId int64 + err := repository.database.QueryRowx( + `INSERT INTO public.headers (block_number, hash, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3, $4, $5) RETURNING id`, + header.BlockNumber, header.Hash, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId) + return headerId, err +} + +func (repository HeaderRepository) replaceHeader(header core.Header) (int64, error) { + _, err := repository.database.Exec(`DELETE FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, + header.BlockNumber, repository.database.Node.ID) + if err != nil { + return 0, err + } + return repository.insertHeader(header) +} diff --git a/pkg/datastore/postgres/repositories/header_repository_test.go b/pkg/datastore/postgres/repositories/header_repository_test.go new file mode 100644 index 00000000..49a74af3 --- /dev/null +++ b/pkg/datastore/postgres/repositories/header_repository_test.go @@ -0,0 +1,248 @@ +package repositories_test + +import ( + "database/sql" + "github.com/ethereum/go-ethereum/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Block header repository", func() { + Describe("creating or updating a header", func() { + + It("adds a header", func() { + node := core.Node{} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + header := core.Header{ + BlockNumber: 100, + Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), + Raw: []byte{1, 2, 3, 4, 5}, + } + + _, err := repo.CreateOrUpdateHeader(header) + + Expect(err).NotTo(HaveOccurred()) + var dbHeader core.Header + err = db.Get(&dbHeader, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(dbHeader.BlockNumber).To(Equal(header.BlockNumber)) + Expect(dbHeader.Hash).To(Equal(header.Hash)) + Expect(dbHeader.Raw).To(Equal(header.Raw)) + }) + + It("adds node data to header", func() { + node := core.Node{ID: "EthNodeFingerprint"} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + header := core.Header{BlockNumber: 100} + + _, err := repo.CreateOrUpdateHeader(header) + + Expect(err).NotTo(HaveOccurred()) + var ethNodeId int64 + err = db.Get(ðNodeId, `SELECT eth_node_id FROM public.headers WHERE block_number = $1`, header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(ethNodeId).To(Equal(db.NodeID)) + var ethNodeFingerprint string + err = db.Get(ðNodeFingerprint, `SELECT eth_node_fingerprint FROM public.headers WHERE block_number = $1`, header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(ethNodeFingerprint).To(Equal(db.Node.ID)) + }) + + It("does not duplicate headers", func() { + node := core.Node{} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + header := core.Header{ + BlockNumber: 100, + Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), + Raw: []byte{1, 2, 3, 4, 5}, + } + + _, err := repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + + _, err = repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + + var dbHeaders []core.Header + err = db.Select(&dbHeaders, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(len(dbHeaders)).To(Equal(1)) + }) + + It("replaces header if hash is different", func() { + node := core.Node{} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + header := core.Header{ + BlockNumber: 100, + Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), + Raw: []byte{1, 2, 3, 4, 5}, + } + _, err := repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + headerTwo := core.Header{ + BlockNumber: header.BlockNumber, + Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(), + Raw: []byte{5, 4, 3, 2, 1}, + } + + _, err = repo.CreateOrUpdateHeader(headerTwo) + + Expect(err).NotTo(HaveOccurred()) + var dbHeader core.Header + err = db.Get(&dbHeader, `SELECT block_number, hash, raw FROM headers WHERE block_number = $1`, header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(dbHeader.Hash).To(Equal(headerTwo.Hash)) + Expect(dbHeader.Raw).To(Equal(headerTwo.Raw)) + }) + + It("does not replace header if node fingerprint is different", func() { + node := core.Node{ID: "Fingerprint"} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + header := core.Header{ + BlockNumber: 100, + Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), + Raw: []byte{1, 2, 3, 4, 5}, + } + _, err := repo.CreateOrUpdateHeader(header) + nodeTwo := core.Node{ID: "FingerprintTwo"} + dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) + Expect(err).NotTo(HaveOccurred()) + repoTwo := repositories.NewHeaderRepository(dbTwo) + headerTwo := core.Header{ + BlockNumber: header.BlockNumber, + Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(), + Raw: []byte{5, 4, 3, 2, 1}, + } + + _, err = repoTwo.CreateOrUpdateHeader(headerTwo) + + Expect(err).NotTo(HaveOccurred()) + var dbHeaders []core.Header + err = dbTwo.Select(&dbHeaders, `SELECT block_number, hash, raw FROM headers WHERE block_number = $1`, header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(len(dbHeaders)).To(Equal(2)) + }) + + It("only replaces header with matching node fingerprint", func() { + node := core.Node{ID: "Fingerprint"} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + header := core.Header{ + BlockNumber: 100, + Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), + Raw: []byte{1, 2, 3, 4, 5}, + } + _, err := repo.CreateOrUpdateHeader(header) + nodeTwo := core.Node{ID: "FingerprintTwo"} + dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) + Expect(err).NotTo(HaveOccurred()) + repoTwo := repositories.NewHeaderRepository(dbTwo) + headerTwo := core.Header{ + BlockNumber: header.BlockNumber, + Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(), + Raw: []byte{5, 4, 3, 2, 1}, + } + _, err = repoTwo.CreateOrUpdateHeader(headerTwo) + headerThree := core.Header{ + BlockNumber: header.BlockNumber, + Hash: common.BytesToHash([]byte{1, 1, 1, 1, 1}).Hex(), + Raw: []byte{1, 1, 1, 1, 1}, + } + + _, err = repoTwo.CreateOrUpdateHeader(headerThree) + + Expect(err).NotTo(HaveOccurred()) + var dbHeaders []core.Header + err = dbTwo.Select(&dbHeaders, `SELECT block_number, hash, raw FROM headers WHERE block_number = $1`, header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(len(dbHeaders)).To(Equal(2)) + Expect(dbHeaders[0].Hash).To(Or(Equal(header.Hash), Equal(headerThree.Hash))) + Expect(dbHeaders[1].Hash).To(Or(Equal(header.Hash), Equal(headerThree.Hash))) + Expect(dbHeaders[0].Raw).To(Or(Equal(header.Raw), Equal(headerThree.Raw))) + Expect(dbHeaders[1].Raw).To(Or(Equal(header.Raw), Equal(headerThree.Raw))) + }) + }) + + Describe("Getting a header", func() { + It("returns header if it exists", func() { + node := core.Node{} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + header := core.Header{ + BlockNumber: 100, + Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), + Raw: []byte{1, 2, 3, 4, 5}, + } + _, err := repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + + dbHeader, err := repo.GetHeader(header.BlockNumber) + + Expect(err).NotTo(HaveOccurred()) + Expect(dbHeader).To(Equal(header)) + }) + + It("does not return header for a different node fingerprint", func() { + node := core.Node{} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + header := core.Header{ + BlockNumber: 100, + Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(), + Raw: []byte{1, 2, 3, 4, 5}, + } + _, err := repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + nodeTwo := core.Node{ID: "NodeFingerprintTwo"} + dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) + Expect(err).NotTo(HaveOccurred()) + repoTwo := repositories.NewHeaderRepository(dbTwo) + + _, err = repoTwo.GetHeader(header.BlockNumber) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(sql.ErrNoRows)) + }) + }) + + Describe("Getting missing headers", func() { + It("returns block numbers for headers not in the database", func() { + node := core.Node{} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + repo.CreateOrUpdateHeader(core.Header{BlockNumber: 1}) + repo.CreateOrUpdateHeader(core.Header{BlockNumber: 3}) + repo.CreateOrUpdateHeader(core.Header{BlockNumber: 5}) + + missingBlockNumbers := repo.MissingBlockNumbers(1, 5, node.ID) + + Expect(missingBlockNumbers).To(ConsistOf([]int64{2, 4})) + }) + + It("does not count headers created by a different node fingerprint", func() { + node := core.Node{ID: "NodeFingerprint"} + db := test_config.NewTestDB(node) + repo := repositories.NewHeaderRepository(db) + repo.CreateOrUpdateHeader(core.Header{BlockNumber: 1}) + repo.CreateOrUpdateHeader(core.Header{BlockNumber: 3}) + repo.CreateOrUpdateHeader(core.Header{BlockNumber: 5}) + nodeTwo := core.Node{ID: "NodeFingerprintTwo"} + dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo) + Expect(err).NotTo(HaveOccurred()) + repoTwo := repositories.NewHeaderRepository(dbTwo) + + missingBlockNumbers := repoTwo.MissingBlockNumbers(1, 5, nodeTwo.ID) + + Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5})) + }) + }) +}) diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index 4b9592e8..fd8b59cd 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -14,7 +14,7 @@ var ErrBlockDoesNotExist = func(blockNumber int64) error { type BlockRepository interface { CreateOrUpdateBlock(block core.Block) (int64, error) GetBlock(blockNumber int64) (core.Block, error) - MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64, nodeId string) []int64 + MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 SetBlocksStatus(chainHead int64) } @@ -37,6 +37,12 @@ type FilterRepository interface { GetFilter(name string) (filters.LogFilter, error) } +type HeaderRepository interface { + CreateOrUpdateHeader(header core.Header) (int64, error) + GetHeader(blockNumber int64) (core.Header, error) + MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 +} + type LogRepository interface { CreateLogs(logs []core.Log, receiptId int64) error GetLogs(address string, blockNumber int64) []core.Log diff --git a/pkg/fakes/blockchain.go b/pkg/fakes/blockchain.go index ffc8d1ab..8032f3de 100644 --- a/pkg/fakes/blockchain.go +++ b/pkg/fakes/blockchain.go @@ -6,28 +6,33 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -type Blockchain struct { - logs map[string][]core.Log - blocks map[int64]core.Block - contractAttributes map[string]map[string]string - blocksChannel chan core.Block - WasToldToStop bool - node core.Node +type BlockChain struct { ContractReturnValue []byte + WasToldToStop bool + blocks map[int64]core.Block + blocksChannel chan core.Block + contractAttributes map[string]map[string]string err error + headers map[int64]core.Header + logs map[string][]core.Log + node core.Node } -func (blockchain *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { +func (blockChain *BlockChain) GetHeaderByNumber(blockNumber int64) (core.Header, error) { + return blockChain.headers[blockNumber], nil +} + +func (blockChain *BlockChain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { panic("implement me") } -func (blockchain *Blockchain) CallContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) { - return blockchain.ContractReturnValue, nil +func (blockChain *BlockChain) CallContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) { + return blockChain.ContractReturnValue, nil } -func (blockchain *Blockchain) LastBlock() *big.Int { +func (blockChain *BlockChain) LastBlock() *big.Int { var max int64 - for blockNumber := range blockchain.blocks { + for blockNumber := range blockChain.blocks { if blockNumber > max { max = blockNumber } @@ -35,16 +40,16 @@ func (blockchain *Blockchain) LastBlock() *big.Int { return big.NewInt(max) } -func (blockchain *Blockchain) GetLogs(contract core.Contract, startingBlock *big.Int, endingBlock *big.Int) ([]core.Log, error) { - return blockchain.logs[contract.Hash], nil +func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlock *big.Int, endingBlock *big.Int) ([]core.Log, error) { + return blockChain.logs[contract.Hash], nil } -func (blockchain *Blockchain) Node() core.Node { - return blockchain.node +func (blockChain *BlockChain) Node() core.Node { + return blockChain.node } -func NewBlockchain(err error) *Blockchain { - return &Blockchain{ +func NewBlockchain(err error) *BlockChain { + return &BlockChain{ blocks: make(map[int64]core.Block), logs: make(map[string][]core.Log), contractAttributes: make(map[string]map[string]string), @@ -53,24 +58,40 @@ func NewBlockchain(err error) *Blockchain { } } -func NewBlockchainWithBlocks(blocks []core.Block) *Blockchain { +func NewBlockchainWithBlocks(blocks []core.Block) *BlockChain { blockNumberToBlocks := make(map[int64]core.Block) for _, block := range blocks { blockNumberToBlocks[block.Number] = block } - return &Blockchain{ + return &BlockChain{ blocks: blockNumberToBlocks, } } -func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) { - if blockchain.err != nil { - return core.Block{}, blockchain.err +func NewBlockChainWithHeaders(headers []core.Header) *BlockChain { + // need to create blocks and headers so that LastBlock() will work in the mock + // no reason to implement LastBlock() separately for headers since it checks + // the last header in the Node's DB already + memoryBlocks := make(map[int64]core.Block) + memoryHeaders := make(map[int64]core.Header) + for _, header := range headers { + memoryBlocks[header.BlockNumber] = core.Block{Number: header.BlockNumber} + memoryHeaders[header.BlockNumber] = header + } + return &BlockChain{ + blocks: memoryBlocks, + headers: memoryHeaders, } - return blockchain.blocks[blockNumber], nil } -func (blockchain *Blockchain) AddBlock(block core.Block) { - blockchain.blocks[block.Number] = block - blockchain.blocksChannel <- block +func (blockChain *BlockChain) GetBlockByNumber(blockNumber int64) (core.Block, error) { + if blockChain.err != nil { + return core.Block{}, blockChain.err + } + return blockChain.blocks[blockNumber], nil +} + +func (blockChain *BlockChain) AddBlock(block core.Block) { + blockChain.blocks[block.Number] = block + blockChain.blocksChannel <- block } diff --git a/pkg/geth/blockchain.go b/pkg/geth/blockchain.go index 0d10b7c3..ecc477e7 100644 --- a/pkg/geth/blockchain.go +++ b/pkg/geth/blockchain.go @@ -1,13 +1,11 @@ package geth import ( - "math/big" - "log" + "math/big" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" "golang.org/x/net/context" @@ -18,31 +16,30 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/geth/node" ) -type Blockchain struct { - client *ethclient.Client - blockConverter vulcCommon.BlockConverter - readGethHeaders chan *types.Header - outputBlocks chan core.Block - newHeadSubscription ethereum.Subscription - node core.Node +type BlockChain struct { + client *ethclient.Client + blockConverter vulcCommon.BlockConverter + headerConverter vulcCommon.HeaderConverter + node core.Node } -func NewBlockchain(ipcPath string) *Blockchain { - blockchain := Blockchain{} +func NewBlockChain(ipcPath string) *BlockChain { rpcClient, err := rpc.Dial(ipcPath) if err != nil { log.Fatal(err) } client := ethclient.NewClient(rpcClient) clientWrapper := node.ClientWrapper{ContextCaller: rpcClient, IPCPath: ipcPath} - blockchain.node = node.MakeNode(clientWrapper) - blockchain.client = client transactionConverter := vulcRpc.NewRpcTransactionConverter(client) - blockchain.blockConverter = vulcCommon.NewBlockConverter(transactionConverter) - return &blockchain + return &BlockChain{ + client: client, + blockConverter: vulcCommon.NewBlockConverter(transactionConverter), + headerConverter: vulcCommon.HeaderConverter{}, + node: node.MakeNode(clientWrapper), + } } -func (blockchain *Blockchain) GetLogs(contract core.Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]core.Log, error) { +func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) { if endingBlockNumber == nil { endingBlockNumber = startingBlockNumber } @@ -52,7 +49,7 @@ func (blockchain *Blockchain) GetLogs(contract core.Contract, startingBlockNumbe ToBlock: endingBlockNumber, Addresses: []common.Address{contractAddress}, } - gethLogs, err := blockchain.client.FilterLogs(context.Background(), fc) + gethLogs, err := blockChain.client.FilterLogs(context.Background(), fc) if err != nil { return []core.Log{}, err } @@ -60,23 +57,27 @@ func (blockchain *Blockchain) GetLogs(contract core.Contract, startingBlockNumbe return logs, nil } -func (blockchain *Blockchain) Node() core.Node { - return blockchain.node +func (blockChain *BlockChain) Node() core.Node { + return blockChain.node } -func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) { - gethBlock, err := blockchain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber)) +func (blockChain *BlockChain) GetBlockByNumber(blockNumber int64) (block core.Block, err error) { + gethBlock, err := blockChain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber)) if err != nil { - return core.Block{}, err + return block, err } - block, err := blockchain.blockConverter.ToCoreBlock(gethBlock) - if err != nil { - return core.Block{}, err - } - return block, nil + return blockChain.blockConverter.ToCoreBlock(gethBlock) } -func (blockchain *Blockchain) LastBlock() *big.Int { - block, _ := blockchain.client.HeaderByNumber(context.Background(), nil) +func (blockChain *BlockChain) GetHeaderByNumber(blockNumber int64) (header core.Header, err error) { + gethHeader, err := blockChain.client.HeaderByNumber(context.Background(), big.NewInt(blockNumber)) + if err != nil { + return header, err + } + return blockChain.headerConverter.Convert(gethHeader) +} + +func (blockChain *BlockChain) LastBlock() *big.Int { + block, _ := blockChain.client.HeaderByNumber(context.Background(), nil) return block.Number } diff --git a/pkg/geth/contract.go b/pkg/geth/contract.go index 8813f545..f91b7ee4 100644 --- a/pkg/geth/contract.go +++ b/pkg/geth/contract.go @@ -14,7 +14,7 @@ var ( ErrInvalidStateAttribute = errors.New("invalid state attribute") ) -func (blockchain *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { +func (blockChain *BlockChain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { parsed, err := ParseAbi(abiJSON) if err != nil { return err @@ -28,15 +28,15 @@ func (blockchain *Blockchain) FetchContractData(abiJSON string, address string, if err != nil { return err } - output, err := blockchain.callContract(address, input, big.NewInt(blockNumber)) + output, err := blockChain.callContract(address, input, big.NewInt(blockNumber)) if err != nil { return err } return parsed.Unpack(result, method, output) } -func (blockchain *Blockchain) callContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) { +func (blockChain *BlockChain) callContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) { to := common.HexToAddress(contractHash) msg := ethereum.CallMsg{To: &to, Data: input} - return blockchain.client.CallContract(context.Background(), msg, blockNumber) + return blockChain.client.CallContract(context.Background(), msg, blockNumber) } diff --git a/pkg/geth/converters/common/block_converter.go b/pkg/geth/converters/common/block_converter.go index 1f7699f5..6e13f474 100644 --- a/pkg/geth/converters/common/block_converter.go +++ b/pkg/geth/converters/common/block_converter.go @@ -20,6 +20,7 @@ func (bc BlockConverter) ToCoreBlock(gethBlock *types.Block) (core.Block, error) if err != nil { return core.Block{}, err } + coreBlock := core.Block{ Difficulty: gethBlock.Difficulty().Int64(), ExtraData: hexutil.Encode(gethBlock.Extra()), diff --git a/pkg/geth/converters/common/header_converter.go b/pkg/geth/converters/common/header_converter.go new file mode 100644 index 00000000..0638f75d --- /dev/null +++ b/pkg/geth/converters/common/header_converter.go @@ -0,0 +1,24 @@ +package common + +import ( + "bytes" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + "github.com/vulcanize/vulcanizedb/pkg/core" +) + +type HeaderConverter struct{} + +func (converter HeaderConverter) Convert(gethHeader *types.Header) (core.Header, error) { + writer := new(bytes.Buffer) + err := rlp.Encode(writer, &gethHeader) + if err != nil { + panic(err) + } + coreHeader := core.Header{ + Hash: gethHeader.Hash().Hex(), + BlockNumber: gethHeader.Number.Int64(), + Raw: writer.Bytes(), + } + return coreHeader, nil +} diff --git a/pkg/geth/converters/common/header_converter_test.go b/pkg/geth/converters/common/header_converter_test.go new file mode 100644 index 00000000..76ef4fcc --- /dev/null +++ b/pkg/geth/converters/common/header_converter_test.go @@ -0,0 +1,47 @@ +package common_test + +import ( + "bytes" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + common2 "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common" + "math/big" +) + +var _ = Describe("Block header converter", func() { + It("converts geth header to core header", func() { + gethHeader := &types.Header{ + Difficulty: big.NewInt(1), + Number: big.NewInt(2), + ParentHash: common.HexToHash("0xParent"), + ReceiptHash: common.HexToHash("0xReceipt"), + Root: common.HexToHash("0xRoot"), + Time: big.NewInt(3), + TxHash: common.HexToHash("0xTransaction"), + UncleHash: common.HexToHash("0xUncle"), + } + converter := common2.HeaderConverter{} + + coreHeader, err := converter.Convert(gethHeader) + + Expect(err).NotTo(HaveOccurred()) + Expect(coreHeader.BlockNumber).To(Equal(gethHeader.Number.Int64())) + Expect(coreHeader.Hash).To(Equal(gethHeader.Hash().Hex())) + }) + + It("includes raw bytes for header", func() { + headerRLP := []byte{249, 2, 23, 160, 180, 251, 173, 248, 234, 69, 43, 19, 151, 24, 226, 112, 13, 193, 19, 92, 252, 129, 20, 80, 49, 200, 75, 122, 178, 124, 215, 16, 57, 79, 123, 56, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 148, 42, 101, 172, 164, 213, 252, 91, 92, 133, 144, 144, 166, 195, 77, 22, 65, 53, 57, 130, 38, 160, 14, 6, 111, 60, 34, 151, 165, 203, 48, 5, 147, 5, 38, 23, 209, 188, 165, 148, 111, 12, 170, 6, 53, 253, 177, 184, 90, 199, 229, 35, 111, 52, 160, 101, 186, 136, 127, 203, 8, 38, 246, 22, 208, 31, 115, 108, 29, 45, 103, 123, 202, 189, 226, 247, 252, 37, 170, 145, 207, 188, 11, 59, 173, 92, 179, 160, 32, 227, 83, 69, 64, 202, 241, 99, 120, 230, 232, 106, 43, 241, 35, 109, 159, 135, 109, 50, 24, 251, 192, 57, 88, 230, 219, 28, 99, 75, 35, 51, 185, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 134, 11, 105, 222, 129, 162, 43, 131, 15, 66, 64, 131, 47, 239, 216, 130, 196, 68, 132, 86, 191, 180, 21, 152, 215, 131, 1, 3, 3, 132, 71, 101, 116, 104, 135, 103, 111, 49, 46, 53, 46, 49, 133, 108, 105, 110, 117, 120, 160, 146, 196, 18, 154, 10, 226, 54, 27, 69, 42, 158, 222, 236, 229, 92, 18, 236, 238, 171, 134, 99, 22, 25, 94, 61, 135, 252, 27, 0, 91, 102, 69, 136, 205, 76, 85, 185, 65, 207, 144, 21} + var gethHeader types.Header + err := rlp.Decode(bytes.NewReader(headerRLP), &gethHeader) + Expect(err).NotTo(HaveOccurred()) + converter := common2.HeaderConverter{} + + coreHeader, err := converter.Convert(&gethHeader) + + Expect(err).NotTo(HaveOccurred()) + Expect(coreHeader.Raw).To(Equal(headerRLP)) + }) +}) diff --git a/pkg/history/block_validator.go b/pkg/history/block_validator.go new file mode 100644 index 00000000..0166f2e2 --- /dev/null +++ b/pkg/history/block_validator.go @@ -0,0 +1,29 @@ +package history + +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore" +) + +type BlockValidator struct { + blockchain core.Blockchain + blockRepository datastore.BlockRepository + windowSize int +} + +func NewBlockValidator(blockchain core.Blockchain, blockRepository datastore.BlockRepository, windowSize int) *BlockValidator { + return &BlockValidator{ + blockchain: blockchain, + blockRepository: blockRepository, + windowSize: windowSize, + } +} + +func (bv BlockValidator) ValidateBlocks() ValidationWindow { + window := MakeValidationWindow(bv.blockchain, bv.windowSize) + blockNumbers := MakeRange(window.LowerBound, window.UpperBound) + RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers) + lastBlock := bv.blockchain.LastBlock().Int64() + bv.blockRepository.SetBlocksStatus(lastBlock) + return window +} diff --git a/pkg/history/block_validator_test.go b/pkg/history/block_validator_test.go new file mode 100644 index 00000000..adc04aa5 --- /dev/null +++ b/pkg/history/block_validator_test.go @@ -0,0 +1,43 @@ +package history_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/history" +) + +var _ = Describe("Blocks validator", func() { + + It("calls create or update for all blocks within the window", func() { + blockchain := fakes.NewBlockchainWithBlocks([]core.Block{ + {Number: 4}, + {Number: 5}, + {Number: 6}, + {Number: 7}, + }) + inMemoryDB := inmemory.NewInMemory() + blocksRepository := &inmemory.BlockRepository{InMemory: inMemoryDB} + validator := history.NewBlockValidator(blockchain, blocksRepository, 2) + + window := validator.ValidateBlocks() + + Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7})) + Expect(blocksRepository.BlockCount()).To(Equal(3)) + Expect(blocksRepository.CreateOrUpdateBlockCallCount).To(Equal(3)) + }) + + It("returns the number of largest block", func() { + blockchain := fakes.NewBlockchainWithBlocks([]core.Block{ + {Number: 1}, + {Number: 2}, + {Number: 3}, + }) + maxBlockNumber := blockchain.LastBlock() + + Expect(maxBlockNumber.Int64()).To(Equal(int64(3))) + }) +}) diff --git a/pkg/history/header_validator.go b/pkg/history/header_validator.go new file mode 100644 index 00000000..d2b549d9 --- /dev/null +++ b/pkg/history/header_validator.go @@ -0,0 +1,27 @@ +package history + +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore" +) + +type HeaderValidator struct { + blockChain core.Blockchain + headerRepository datastore.HeaderRepository + windowSize int +} + +func NewHeaderValidator(blockChain core.Blockchain, repository datastore.HeaderRepository, windowSize int) HeaderValidator { + return HeaderValidator{ + blockChain: blockChain, + headerRepository: repository, + windowSize: windowSize, + } +} + +func (validator HeaderValidator) ValidateHeaders() ValidationWindow { + window := MakeValidationWindow(validator.blockChain, validator.windowSize) + blockNumbers := MakeRange(window.LowerBound, window.UpperBound) + RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers) + return window +} diff --git a/pkg/history/header_validator_test.go b/pkg/history/header_validator_test.go new file mode 100644 index 00000000..e96ce268 --- /dev/null +++ b/pkg/history/header_validator_test.go @@ -0,0 +1,39 @@ +package history_test + +import ( + "github.com/ethereum/go-ethereum/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/history" +) + +var _ = Describe("Header validator", func() { + It("replaces headers in the validation window that have changed", func() { + blockNumber := int64(10) + oldHash := common.HexToHash("0x0987654321").Hex() + oldHeader := core.Header{ + BlockNumber: blockNumber, + Hash: oldHash, + } + inMemory := inmemory.NewInMemory() + headerRepository := inmemory.NewHeaderRepository(inMemory) + headerRepository.CreateOrUpdateHeader(oldHeader) + newHash := common.HexToHash("0x123456789").Hex() + newHeader := core.Header{ + BlockNumber: blockNumber, + Hash: newHash, + } + headers := []core.Header{newHeader} + blockChain := fakes.NewBlockChainWithHeaders(headers) + validator := history.NewHeaderValidator(blockChain, headerRepository, 1) + + validator.ValidateHeaders() + + dbHeader, _ := headerRepository.GetHeader(blockNumber) + Expect(dbHeader.Hash).NotTo(Equal(oldHash)) + Expect(dbHeader.Hash).To(Equal(newHash)) + }) +}) diff --git a/pkg/history/populate_blocks.go b/pkg/history/populate_blocks.go index 52106a75..3823a745 100644 --- a/pkg/history/populate_blocks.go +++ b/pkg/history/populate_blocks.go @@ -9,7 +9,7 @@ import ( func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int { lastBlock := blockchain.LastBlock().Int64() - blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock-1, blockchain.Node().ID) + blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) log.SetPrefix("") log.Printf("Backfilling %d blocks\n\n", len(blockRange)) RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange) diff --git a/pkg/history/populate_blocks_test.go b/pkg/history/populate_blocks_test.go index a98c76aa..0ce0e1ed 100644 --- a/pkg/history/populate_blocks_test.go +++ b/pkg/history/populate_blocks_test.go @@ -20,7 +20,7 @@ var _ = Describe("Populating blocks", func() { blockRepository = &inmemory.BlockRepository{InMemory: inMemory} }) - It("fills in the only missing block (Number 1)", func() { + It("fills in the only missing block (BlockNumber 1)", func() { blocks := []core.Block{ {Number: 1}, {Number: 2}, @@ -57,11 +57,12 @@ var _ = Describe("Populating blocks", func() { blockRepository.CreateOrUpdateBlock(core.Block{Number: 9}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 11}) blockRepository.CreateOrUpdateBlock(core.Block{Number: 12}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 13}) blocksAdded := history.PopulateMissingBlocks(blockchain, blockRepository, 5) Expect(blocksAdded).To(Equal(3)) - Expect(blockRepository.BlockCount()).To(Equal(11)) + Expect(blockRepository.BlockCount()).To(Equal(12)) _, err := blockRepository.GetBlock(4) Expect(err).To(HaveOccurred()) _, err = blockRepository.GetBlock(5) @@ -70,7 +71,7 @@ var _ = Describe("Populating blocks", func() { Expect(err).ToNot(HaveOccurred()) _, err = blockRepository.GetBlock(10) Expect(err).ToNot(HaveOccurred()) - _, err = blockRepository.GetBlock(13) + _, err = blockRepository.GetBlock(14) Expect(err).To(HaveOccurred()) }) @@ -98,8 +99,8 @@ var _ = Describe("Populating blocks", func() { }) history.RetrieveAndUpdateBlocks(blockchain, blockRepository, history.MakeRange(2, 5)) - Expect(blockRepository.BlockCount()).To(Equal(3)) - Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(3)) + Expect(blockRepository.BlockCount()).To(Equal(4)) + Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(4)) }) It("does not call repository create block when there is an error", func() { @@ -109,5 +110,4 @@ var _ = Describe("Populating blocks", func() { Expect(blockRepository.BlockCount()).To(Equal(0)) Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(0)) }) - }) diff --git a/pkg/history/populate_headers.go b/pkg/history/populate_headers.go new file mode 100644 index 00000000..140e81b1 --- /dev/null +++ b/pkg/history/populate_headers.go @@ -0,0 +1,29 @@ +package history + +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore" + "log" +) + +func PopulateMissingHeaders(blockchain core.Blockchain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) int { + lastBlock := blockchain.LastBlock().Int64() + blockRange := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) + log.SetPrefix("") + log.Printf("Backfilling %d blocks\n\n", len(blockRange)) + RetrieveAndUpdateHeaders(blockchain, headerRepository, blockRange) + return len(blockRange) +} + +func RetrieveAndUpdateHeaders(blockchain core.Blockchain, headerRepository datastore.HeaderRepository, blockNumbers []int64) int { + for _, blockNumber := range blockNumbers { + header, err := blockchain.GetHeaderByNumber(blockNumber) + if err != nil { + log.Printf("failed to retrieve block number: %d\n", blockNumber) + return 0 + } + // TODO: handle possible error here + headerRepository.CreateOrUpdateHeader(header) + } + return len(blockNumbers) +} diff --git a/pkg/history/populate_headers_test.go b/pkg/history/populate_headers_test.go new file mode 100644 index 00000000..d8e46bb7 --- /dev/null +++ b/pkg/history/populate_headers_test.go @@ -0,0 +1,52 @@ +package history_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/history" +) + +var _ = Describe("Populating headers", func() { + + var inMemory *inmemory.InMemory + var headerRepository *inmemory.HeaderRepository + + BeforeEach(func() { + inMemory = inmemory.NewInMemory() + headerRepository = inmemory.NewHeaderRepository(inMemory) + }) + + Describe("When 1 missing header", func() { + + It("returns number of headers added", func() { + headers := []core.Header{ + {BlockNumber: 1}, + {BlockNumber: 2}, + } + blockChain := fakes.NewBlockChainWithHeaders(headers) + headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: 2}) + + headersAdded := history.PopulateMissingHeaders(blockChain, headerRepository, 1) + + Expect(headersAdded).To(Equal(1)) + }) + }) + + It("adds missing headers to the db", func() { + headers := []core.Header{ + {BlockNumber: 1}, + {BlockNumber: 2}, + } + blockChain := fakes.NewBlockChainWithHeaders(headers) + dbHeader, _ := headerRepository.GetHeader(1) + Expect(dbHeader.BlockNumber).To(BeZero()) + + history.PopulateMissingHeaders(blockChain, headerRepository, 1) + + dbHeader, _ = headerRepository.GetHeader(1) + Expect(dbHeader.BlockNumber).To(Equal(int64(1))) + }) +}) diff --git a/pkg/history/validate_blocks.go b/pkg/history/validate_blocks.go deleted file mode 100644 index 1f6ecf9e..00000000 --- a/pkg/history/validate_blocks.go +++ /dev/null @@ -1,68 +0,0 @@ -package history - -import ( - "io" - "text/template" - - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore" -) - -const WindowTemplate = `Validating Blocks -|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| ({{.UpperBound}}:HEAD) - -` - -var ParsedWindowTemplate = *template.Must(template.New("window").Parse(WindowTemplate)) - -type BlockValidator struct { - blockchain core.Blockchain - blockRepository datastore.BlockRepository - windowSize int - parsedLoggingTemplate template.Template -} - -func NewBlockValidator(blockchain core.Blockchain, blockRepository datastore.BlockRepository, windowSize int) *BlockValidator { - return &BlockValidator{ - blockchain, - blockRepository, - windowSize, - ParsedWindowTemplate, - } -} - -func (bv BlockValidator) ValidateBlocks() ValidationWindow { - window := MakeValidationWindow(bv.blockchain, bv.windowSize) - blockNumbers := MakeRange(window.LowerBound, window.UpperBound) - RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers) - lastBlock := bv.blockchain.LastBlock().Int64() - bv.blockRepository.SetBlocksStatus(lastBlock) - return window -} - -func (bv BlockValidator) Log(out io.Writer, window ValidationWindow) { - bv.parsedLoggingTemplate.Execute(out, window) -} - -type ValidationWindow struct { - LowerBound int64 - UpperBound int64 -} - -func (window ValidationWindow) Size() int { - return int(window.UpperBound - window.LowerBound) -} - -func MakeValidationWindow(blockchain core.Blockchain, windowSize int) ValidationWindow { - upperBound := blockchain.LastBlock().Int64() - lowerBound := upperBound - int64(windowSize) - return ValidationWindow{lowerBound, upperBound} -} - -func MakeRange(min, max int64) []int64 { - a := make([]int64, max-min) - for i := range a { - a[i] = min + int64(i) - } - return a -} diff --git a/pkg/history/validate_blocks_test.go b/pkg/history/validate_blocks_test.go deleted file mode 100644 index fc92521e..00000000 --- a/pkg/history/validate_blocks_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package history_test - -import ( - "bytes" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory" - "github.com/vulcanize/vulcanizedb/pkg/fakes" - "github.com/vulcanize/vulcanizedb/pkg/history" -) - -var _ = Describe("Blocks validator", func() { - - It("creates a ValidationWindow equal to (HEAD-windowSize, HEAD)", func() { - blockchain := fakes.NewBlockchainWithBlocks([]core.Block{ - {Number: 1}, - {Number: 2}, - {Number: 3}, - {Number: 4}, - {Number: 5}, - }) - - validationWindow := history.MakeValidationWindow(blockchain, 2) - - Expect(validationWindow.LowerBound).To(Equal(int64(3))) - Expect(validationWindow.UpperBound).To(Equal(int64(5))) - }) - - It("returns the window size", func() { - window := history.ValidationWindow{LowerBound: 1, UpperBound: 3} - Expect(window.Size()).To(Equal(2)) - }) - - It("calls create or update for all blocks within the window", func() { - blockchain := fakes.NewBlockchainWithBlocks([]core.Block{ - {Number: 4}, - {Number: 5}, - {Number: 6}, - {Number: 7}, - }) - inMemoryDB := inmemory.NewInMemory() - blocksRepository := &inmemory.BlockRepository{InMemory: inMemoryDB} - - validator := history.NewBlockValidator(blockchain, blocksRepository, 2) - window := validator.ValidateBlocks() - Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7})) - Expect(blocksRepository.BlockCount()).To(Equal(2)) - Expect(blocksRepository.CreateOrUpdateBlockCallCount).To(Equal(2)) - }) - - It("logs window message", func() { - inMemoryDB := inmemory.NewInMemory() - blockRepository := &inmemory.BlockRepository{InMemory: inMemoryDB} - - expectedMessage := &bytes.Buffer{} - window := history.ValidationWindow{LowerBound: 5, UpperBound: 7} - history.ParsedWindowTemplate.Execute(expectedMessage, history.ValidationWindow{LowerBound: 5, UpperBound: 7}) - - blockchain := fakes.NewBlockchainWithBlocks([]core.Block{}) - validator := history.NewBlockValidator(blockchain, blockRepository, 2) - actualMessage := &bytes.Buffer{} - validator.Log(actualMessage, window) - Expect(actualMessage).To(Equal(expectedMessage)) - }) - - It("generates a range of int64s", func() { - numberOfBlocksCreated := history.MakeRange(0, 5) - expected := []int64{0, 1, 2, 3, 4} - - Expect(numberOfBlocksCreated).To(Equal(expected)) - }) - - It("returns the number of largest block", func() { - blockchain := fakes.NewBlockchainWithBlocks([]core.Block{ - {Number: 1}, - {Number: 2}, - {Number: 3}, - }) - maxBlockNumber := blockchain.LastBlock() - - Expect(maxBlockNumber.Int64()).To(Equal(int64(3))) - }) - -}) diff --git a/pkg/history/validation_window.go b/pkg/history/validation_window.go new file mode 100644 index 00000000..ee59a6c6 --- /dev/null +++ b/pkg/history/validation_window.go @@ -0,0 +1,41 @@ +package history + +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" + "io" + "text/template" +) + +const WindowTemplate = `Validating Blocks +|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| ({{.UpperBound}}:HEAD) + +` + +var ParsedWindowTemplate = *template.Must(template.New("window").Parse(WindowTemplate)) + +type ValidationWindow struct { + LowerBound int64 + UpperBound int64 +} + +func (window ValidationWindow) Size() int { + return int(window.UpperBound - window.LowerBound) +} + +func MakeValidationWindow(blockchain core.Blockchain, windowSize int) ValidationWindow { + upperBound := blockchain.LastBlock().Int64() + lowerBound := upperBound - int64(windowSize) + return ValidationWindow{lowerBound, upperBound} +} + +func MakeRange(min, max int64) []int64 { + a := make([]int64, max-min+1) + for i := range a { + a[i] = min + int64(i) + } + return a +} + +func (window ValidationWindow) Log(out io.Writer) { + ParsedWindowTemplate.Execute(out, window) +} diff --git a/pkg/history/validation_window_test.go b/pkg/history/validation_window_test.go new file mode 100644 index 00000000..07c73dba --- /dev/null +++ b/pkg/history/validation_window_test.go @@ -0,0 +1,53 @@ +package history_test + +import ( + "bytes" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/history" +) + +var _ = Describe("", func() { + It("creates a ValidationWindow equal to (HEAD-windowSize, HEAD)", func() { + blockchain := fakes.NewBlockchainWithBlocks([]core.Block{ + {Number: 1}, + {Number: 2}, + {Number: 3}, + {Number: 4}, + {Number: 5}, + }) + + validationWindow := history.MakeValidationWindow(blockchain, 2) + + Expect(validationWindow.LowerBound).To(Equal(int64(3))) + Expect(validationWindow.UpperBound).To(Equal(int64(5))) + }) + + It("returns the window size", func() { + window := history.ValidationWindow{LowerBound: 1, UpperBound: 3} + + Expect(window.Size()).To(Equal(2)) + }) + + It("generates a range of int64s", func() { + numberOfBlocksCreated := history.MakeRange(0, 5) + expected := []int64{0, 1, 2, 3, 4, 5} + + Expect(numberOfBlocksCreated).To(Equal(expected)) + }) + + It("logs window message", func() { + expectedMessage := &bytes.Buffer{} + window := history.ValidationWindow{LowerBound: 5, UpperBound: 7} + history.ParsedWindowTemplate.Execute(expectedMessage, window) + actualMessage := &bytes.Buffer{} + + window.Log(actualMessage) + + Expect(actualMessage).To(Equal(expectedMessage)) + }) +}) diff --git a/test_config/test_config.go b/test_config/test_config.go index b091a0e6..b1a59d1b 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -62,12 +62,13 @@ func setABIPath() { func NewTestDB(node core.Node) *postgres.DB { db, _ := postgres.NewDB(DBConfig, node) - db.MustExec("DELETE FROM watched_contracts") - db.MustExec("DELETE FROM transactions") db.MustExec("DELETE FROM blocks") + db.MustExec("DELETE FROM headers") + db.MustExec("DELETE FROM log_filters") db.MustExec("DELETE FROM logs") db.MustExec("DELETE FROM receipts") - db.MustExec("DELETE FROM log_filters") + db.MustExec("DELETE FROM transactions") + db.MustExec("DELETE FROM watched_contracts") return db }