Add light sync command
- Only syncs block headers (excludes block bodies, transactions, receipts, and logs) - Modifies validation window to include the most recent block - Isolates validation window to the variable defined in the cmd directory (blocks have a separate variable defined in the block_repository for determining when to set a block as final)
This commit is contained in:
parent
a620fc6a51
commit
1355271011
14
README.md
14
README.md
@ -54,9 +54,10 @@ Vulcanize DB is a set of tools that make it easier for developers to write appli
|
||||
- See `environments/infura.toml` to configure commands to run against infura, if a local node is unavailable
|
||||
|
||||
## Start syncing with postgres
|
||||
Syncs VulcanizeDB with the configured Ethereum node.
|
||||
1. Start node (**if fast syncing wait for initial sync to finish**)
|
||||
1. In a separate terminal start vulcanize_db
|
||||
Syncs VulcanizeDB with the configured Ethereum node, populating blocks, transactions, receipts, and logs.
|
||||
This command is useful when you want to maintain a broad cache of what's happening on the blockchain.
|
||||
1. Start Ethereum node (**if fast syncing your Ethereum node, wait for initial sync to finish**)
|
||||
1. In a separate terminal start VulcanizeDB:
|
||||
- `./vulcanizedb sync --config <config.toml> --starting-block-number <block-number>`
|
||||
|
||||
## Alternatively, sync from Geth's underlying LevelDB
|
||||
@ -69,6 +70,13 @@ Sync VulcanizeDB from the LevelDB underlying a Geth node.
|
||||
- `--ending-block-number`/`-e`: block number to sync to
|
||||
- `--all`/`-a`: sync all missing blocks
|
||||
|
||||
## Alternatively, sync in "light" mode
|
||||
Syncs VulcanizeDB with the configured Ethereum node, populating only block headers.
|
||||
This command is useful when you want a minimal baseline from which to track targeted data on the blockchain (e.g. individual smart contract storage values).
|
||||
1. Start Ethereum node
|
||||
2. In a separate terminal start VulcanizeDB:
|
||||
- `./vulcanizedb lightSync --config <config.toml> --starting-block-number <block-number>`
|
||||
|
||||
## Start full environment in docker by single command
|
||||
|
||||
### Geth Rinkeby
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright © 2018 Rob Mulholand <rmulholand@8thlight.com>
|
||||
// Copyright © 2018 Vulcanize
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
@ -44,8 +44,8 @@ Geth must be synced over all of the desired blocks and must not be running in or
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(coldImportCmd)
|
||||
coldImportCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Number for first block to cold import.")
|
||||
coldImportCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", 5500000, "Number for last block to cold import.")
|
||||
coldImportCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "BlockNumber for first block to cold import.")
|
||||
coldImportCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "e", 5500000, "BlockNumber for last block to cold import.")
|
||||
coldImportCmd.Flags().BoolVarP(&syncAll, "all", "a", false, "Option to sync all missing blocks.")
|
||||
}
|
||||
|
||||
|
@ -49,7 +49,7 @@ Expects an ethereum node to be running and requires a .toml config file:
|
||||
func watchERC20s() {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
blockchain := geth.NewBlockchain(ipc)
|
||||
blockchain := geth.NewBlockChain(ipc)
|
||||
db, err := postgres.NewDB(databaseConfig, blockchain.Node())
|
||||
if err != nil {
|
||||
log.Fatal("Failed to initialize database.")
|
||||
|
91
cmd/lightSync.go
Normal file
91
cmd/lightSync.go
Normal file
@ -0,0 +1,91 @@
|
||||
// Copyright © 2018 Vulcanize
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/geth"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/history"
|
||||
"github.com/vulcanize/vulcanizedb/utils"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// lightSyncCmd represents the lightSync command
|
||||
var lightSyncCmd = &cobra.Command{
|
||||
Use: "lightSync",
|
||||
Short: "Syncs VulcanizeDB with local ethereum node's block headers",
|
||||
Long: `Syncs VulcanizeDB with local ethereum node. Populates
|
||||
Postgres with block headers.
|
||||
|
||||
./vulcanizedb lightSync --starting-block-number 0 --config public.toml
|
||||
|
||||
Expects ethereum node to be running and requires a .toml config:
|
||||
|
||||
[database]
|
||||
name = "vulcanize_public"
|
||||
hostname = "localhost"
|
||||
port = 5432
|
||||
|
||||
[client]
|
||||
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
|
||||
`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
lightSync()
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(lightSyncCmd)
|
||||
lightSyncCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block number to start syncing from")
|
||||
}
|
||||
|
||||
func backFillAllHeaders(blockchain core.Blockchain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
|
||||
missingBlocksPopulated <- history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber)
|
||||
}
|
||||
|
||||
func lightSync() {
|
||||
ticker := time.NewTicker(pollingInterval)
|
||||
defer ticker.Stop()
|
||||
blockChain := geth.NewBlockChain(ipc)
|
||||
|
||||
lastBlock := blockChain.LastBlock().Int64()
|
||||
if lastBlock == 0 {
|
||||
log.Fatal("geth initial: state sync not finished")
|
||||
}
|
||||
if startingBlockNumber > lastBlock {
|
||||
log.Fatal("starting block number > current block number")
|
||||
}
|
||||
|
||||
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
||||
headerRepository := repositories.NewHeaderRepository(&db)
|
||||
validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow)
|
||||
missingBlocksPopulated := make(chan int)
|
||||
go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
window := validator.ValidateHeaders()
|
||||
window.Log(os.Stdout)
|
||||
case <-missingBlocksPopulated:
|
||||
go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber)
|
||||
}
|
||||
}
|
||||
}
|
14
cmd/root.go
14
cmd/root.go
@ -1,3 +1,17 @@
|
||||
// Copyright © 2018 Vulcanize
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
|
29
cmd/sync.go
29
cmd/sync.go
@ -1,3 +1,17 @@
|
||||
// Copyright © 2018 Vulcanize
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
@ -19,9 +33,11 @@ import (
|
||||
// syncCmd represents the sync command
|
||||
var syncCmd = &cobra.Command{
|
||||
Use: "sync",
|
||||
Short: "Syncs vulcanizedb with local ethereum node",
|
||||
Long: `Syncs vulcanizedb with local ethereum node.
|
||||
vulcanizedb sync --starting-block-number 0 --config public.toml
|
||||
Short: "Syncs VulcanizeDB with local ethereum node",
|
||||
Long: `Syncs VulcanizeDB with local ethereum node. Populates
|
||||
Postgres with blocks, transactions, receipts, and logs.
|
||||
|
||||
./vulcanizedb sync --starting-block-number 0 --config public.toml
|
||||
|
||||
Expects ethereum node to be running and requires a .toml config:
|
||||
|
||||
@ -40,6 +56,7 @@ Expects ethereum node to be running and requires a .toml config:
|
||||
|
||||
const (
|
||||
pollingInterval = 7 * time.Second
|
||||
validationWindow = 15
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -55,7 +72,7 @@ func backFillAllBlocks(blockchain core.Blockchain, blockRepository datastore.Blo
|
||||
func sync() {
|
||||
ticker := time.NewTicker(pollingInterval)
|
||||
defer ticker.Stop()
|
||||
blockchain := geth.NewBlockchain(ipc)
|
||||
blockchain := geth.NewBlockChain(ipc)
|
||||
|
||||
lastBlock := blockchain.LastBlock().Int64()
|
||||
if lastBlock == 0 {
|
||||
@ -67,7 +84,7 @@ func sync() {
|
||||
|
||||
db := utils.LoadPostgres(databaseConfig, blockchain.Node())
|
||||
blockRepository := repositories.NewBlockRepository(&db)
|
||||
validator := history.NewBlockValidator(blockchain, blockRepository, 15)
|
||||
validator := history.NewBlockValidator(blockchain, blockRepository, validationWindow)
|
||||
missingBlocksPopulated := make(chan int)
|
||||
go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, startingBlockNumber)
|
||||
|
||||
@ -75,7 +92,7 @@ func sync() {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
window := validator.ValidateBlocks()
|
||||
validator.Log(os.Stdout, window)
|
||||
window.Log(os.Stdout)
|
||||
case <-missingBlocksPopulated:
|
||||
go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, startingBlockNumber)
|
||||
}
|
||||
|
1
db/migrations/1531758090_create_headers_table.down.sql
Normal file
1
db/migrations/1531758090_create_headers_table.down.sql
Normal file
@ -0,0 +1 @@
|
||||
DROP TABLE public.headers;
|
11
db/migrations/1531758090_create_headers_table.up.sql
Normal file
11
db/migrations/1531758090_create_headers_table.up.sql
Normal file
@ -0,0 +1,11 @@
|
||||
CREATE TABLE public.headers (
|
||||
id SERIAL PRIMARY KEY,
|
||||
hash VARCHAR(66),
|
||||
block_number BIGINT,
|
||||
raw bytea,
|
||||
eth_node_id INTEGER,
|
||||
eth_node_fingerprint VARCHAR(128),
|
||||
CONSTRAINT eth_nodes_fk FOREIGN KEY (eth_node_id)
|
||||
REFERENCES eth_nodes (id)
|
||||
ON DELETE CASCADE
|
||||
);
|
@ -121,6 +121,40 @@ CREATE TABLE public.eth_nodes (
|
||||
);
|
||||
|
||||
|
||||
--
|
||||
-- Name: headers; Type: TABLE; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
CREATE TABLE public.headers (
|
||||
id integer NOT NULL,
|
||||
hash character varying(66),
|
||||
block_number bigint,
|
||||
raw bytea,
|
||||
eth_node_id integer,
|
||||
eth_node_fingerprint character varying(128)
|
||||
);
|
||||
|
||||
|
||||
--
|
||||
-- Name: headers_id_seq; Type: SEQUENCE; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
CREATE SEQUENCE public.headers_id_seq
|
||||
AS integer
|
||||
START WITH 1
|
||||
INCREMENT BY 1
|
||||
NO MINVALUE
|
||||
NO MAXVALUE
|
||||
CACHE 1;
|
||||
|
||||
|
||||
--
|
||||
-- Name: headers_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
ALTER SEQUENCE public.headers_id_seq OWNED BY public.headers.id;
|
||||
|
||||
|
||||
--
|
||||
-- Name: log_filters; Type: TABLE; Schema: public; Owner: -
|
||||
--
|
||||
@ -385,6 +419,13 @@ ALTER TABLE ONLY public.blocks ALTER COLUMN id SET DEFAULT nextval('public.block
|
||||
ALTER TABLE ONLY public.eth_nodes ALTER COLUMN id SET DEFAULT nextval('public.nodes_id_seq'::regclass);
|
||||
|
||||
|
||||
--
|
||||
-- Name: headers id; Type: DEFAULT; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
ALTER TABLE ONLY public.headers ALTER COLUMN id SET DEFAULT nextval('public.headers_id_seq'::regclass);
|
||||
|
||||
|
||||
--
|
||||
-- Name: log_filters id; Type: DEFAULT; Schema: public; Owner: -
|
||||
--
|
||||
@ -459,6 +500,14 @@ ALTER TABLE ONLY public.eth_nodes
|
||||
ADD CONSTRAINT eth_node_uc UNIQUE (genesis_block, network_id, eth_node_id);
|
||||
|
||||
|
||||
--
|
||||
-- Name: headers headers_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
ALTER TABLE ONLY public.headers
|
||||
ADD CONSTRAINT headers_pkey PRIMARY KEY (id);
|
||||
|
||||
|
||||
--
|
||||
-- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
||||
--
|
||||
@ -574,6 +623,14 @@ ALTER TABLE ONLY public.token_supply
|
||||
ADD CONSTRAINT blocks_fk FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE;
|
||||
|
||||
|
||||
--
|
||||
-- Name: headers eth_nodes_fk; Type: FK CONSTRAINT; Schema: public; Owner: -
|
||||
--
|
||||
|
||||
ALTER TABLE ONLY public.headers
|
||||
ADD CONSTRAINT eth_nodes_fk FOREIGN KEY (eth_node_id) REFERENCES public.eth_nodes(id) ON DELETE CASCADE;
|
||||
|
||||
|
||||
--
|
||||
-- Name: blocks node_fk; Type: FK CONSTRAINT; Schema: public; Owner: -
|
||||
--
|
||||
|
@ -28,7 +28,7 @@ var _ = Describe("ERC20 Fetcher", func() {
|
||||
blockNumber := int64(5502914)
|
||||
|
||||
infuraIPC := "https://mainnet.infura.io/J5Vd2fRtGsw0zZ0Ov3BL"
|
||||
realBlockchain := geth.NewBlockchain(infuraIPC)
|
||||
realBlockchain := geth.NewBlockChain(infuraIPC)
|
||||
realFetcher := every_block.NewFetcher(realBlockchain)
|
||||
|
||||
fakeBlockchain := &mocks.Blockchain{}
|
||||
|
@ -58,7 +58,7 @@ var _ = Describe("Everyblock transformers", func() {
|
||||
})
|
||||
|
||||
It("creates a token_supply record for each block in the given range", func() {
|
||||
initializer := every_block.TokenSupplyTransformerInitializer{erc20_watcher.DaiConfig}
|
||||
initializer := every_block.TokenSupplyTransformerInitializer{Config: erc20_watcher.DaiConfig}
|
||||
transformer := initializer.NewTokenSupplyTransformer(db, &blockchain)
|
||||
transformer.Execute()
|
||||
|
||||
|
@ -15,8 +15,8 @@
|
||||
package every_block
|
||||
|
||||
import (
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared"
|
||||
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher"
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared"
|
||||
)
|
||||
|
||||
func TransformerInitializers() []shared.TransformerInitializer {
|
||||
|
@ -116,6 +116,10 @@ type Blockchain struct {
|
||||
lastBlock *big.Int
|
||||
}
|
||||
|
||||
func (fb *Blockchain) GetHeaderByNumber(blockNumber int64) (core.Header, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (fb *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
|
||||
fb.FetchedAbi = abiJSON
|
||||
fb.FetchedContractAddress = address
|
||||
@ -150,6 +154,10 @@ type FailureBlockchain struct {
|
||||
lastBlock *big.Int
|
||||
}
|
||||
|
||||
func (fb FailureBlockchain) GetHeaderByNumber(blockNumber int64) (core.Header, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (FailureBlockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
|
||||
return errors.New("TestError")
|
||||
}
|
||||
|
@ -35,21 +35,6 @@ type TransferDBRow struct {
|
||||
VulcanizeLogID int64 `db:"vulcanize_log_id"`
|
||||
}
|
||||
|
||||
func CreateLogRecord(db *postgres.DB, logRepository repositories.LogRepository, log core.Log) {
|
||||
blockRepository := repositories.NewBlockRepository(db)
|
||||
receiptRepository := repositories.ReceiptRepository{DB: db}
|
||||
|
||||
blockNumber := log.BlockNumber
|
||||
blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
err = logRepository.CreateLogs([]core.Log{log}, receiptId)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
func CreateNewDatabase() *postgres.DB {
|
||||
var node core.Node
|
||||
node = core.Node{
|
||||
|
@ -10,14 +10,14 @@ import (
|
||||
var _ = Describe("Rewards calculations", func() {
|
||||
|
||||
It("calculates a block reward for a real block", func() {
|
||||
blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath)
|
||||
blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath)
|
||||
block, err := blockchain.GetBlockByNumber(1071819)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(block.Reward).To(Equal(5.31355))
|
||||
})
|
||||
|
||||
It("calculates an uncle reward for a real block", func() {
|
||||
blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath)
|
||||
blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath)
|
||||
block, err := blockchain.GetBlockByNumber(1071819)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(block.UnclesReward).To(Equal(6.875))
|
||||
|
@ -27,7 +27,7 @@ var _ = Describe("Reading contracts", func() {
|
||||
},
|
||||
Index: 19,
|
||||
Data: "0x0000000000000000000000000000000000000000000000000c7d713b49da0000"}
|
||||
blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath)
|
||||
blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath)
|
||||
contract := testing.SampleContract()
|
||||
|
||||
logs, err := blockchain.GetLogs(contract, big.NewInt(4703824), nil)
|
||||
@ -39,7 +39,7 @@ var _ = Describe("Reading contracts", func() {
|
||||
})
|
||||
|
||||
It("returns and empty log array when no events for a given block / contract combo", func() {
|
||||
blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath)
|
||||
blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath)
|
||||
|
||||
logs, err := blockchain.GetLogs(core.Contract{Hash: "x123"}, big.NewInt(4703824), nil)
|
||||
|
||||
@ -52,7 +52,7 @@ var _ = Describe("Reading contracts", func() {
|
||||
|
||||
Describe("Fetching Contract data", func() {
|
||||
It("returns the correct attribute for a real contract", func() {
|
||||
blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath)
|
||||
blockchain := geth.NewBlockChain(test_config.InfuraClient.IPCPath)
|
||||
|
||||
contract := testing.SampleContract()
|
||||
var balance = new(big.Int)
|
||||
|
@ -12,11 +12,11 @@ import (
|
||||
|
||||
var _ = Describe("Reading from the Geth blockchain", func() {
|
||||
|
||||
var blockchain *geth.Blockchain
|
||||
var blockchain *geth.BlockChain
|
||||
var inMemory *inmemory.InMemory
|
||||
|
||||
BeforeEach(func() {
|
||||
blockchain = geth.NewBlockchain(test_config.InfuraClient.IPCPath)
|
||||
blockchain = geth.NewBlockChain(test_config.InfuraClient.IPCPath)
|
||||
inMemory = inmemory.NewInMemory()
|
||||
})
|
||||
|
||||
|
@ -5,6 +5,7 @@ import "math/big"
|
||||
type Blockchain interface {
|
||||
ContractDataFetcher
|
||||
GetBlockByNumber(blockNumber int64) (Block, error)
|
||||
GetHeaderByNumber(blockNumber int64) (Header, error)
|
||||
GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error)
|
||||
LastBlock() *big.Int
|
||||
Node() Node
|
||||
|
7
pkg/core/header.go
Normal file
7
pkg/core/header.go
Normal file
@ -0,0 +1,7 @@
|
||||
package core
|
||||
|
||||
type Header struct {
|
||||
BlockNumber int64 `db:"block_number"`
|
||||
Hash string
|
||||
Raw []byte
|
||||
}
|
30
pkg/datastore/inmemory/header_repository.go
Normal file
30
pkg/datastore/inmemory/header_repository.go
Normal file
@ -0,0 +1,30 @@
|
||||
package inmemory
|
||||
|
||||
import "github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
|
||||
type HeaderRepository struct {
|
||||
memory *InMemory
|
||||
}
|
||||
|
||||
func NewHeaderRepository(memory *InMemory) *HeaderRepository {
|
||||
return &HeaderRepository{memory: memory}
|
||||
}
|
||||
|
||||
func (repository *HeaderRepository) CreateOrUpdateHeader(header core.Header) (int64, error) {
|
||||
repository.memory.headers[header.BlockNumber] = header
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (repository *HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {
|
||||
return repository.memory.headers[blockNumber], nil
|
||||
}
|
||||
|
||||
func (repository *HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 {
|
||||
missingNumbers := []int64{}
|
||||
for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ {
|
||||
if _, ok := repository.memory.headers[blockNumber]; !ok {
|
||||
missingNumbers = append(missingNumbers, blockNumber)
|
||||
}
|
||||
}
|
||||
return missingNumbers
|
||||
}
|
@ -10,21 +10,23 @@ const (
|
||||
)
|
||||
|
||||
type InMemory struct {
|
||||
blocks map[int64]core.Block
|
||||
receipts map[string]core.Receipt
|
||||
contracts map[string]core.Contract
|
||||
logs map[string][]core.Log
|
||||
logFilters map[string]filters.LogFilter
|
||||
CreateOrUpdateBlockCallCount int
|
||||
blocks map[int64]core.Block
|
||||
contracts map[string]core.Contract
|
||||
headers map[int64]core.Header
|
||||
logFilters map[string]filters.LogFilter
|
||||
logs map[string][]core.Log
|
||||
receipts map[string]core.Receipt
|
||||
}
|
||||
|
||||
func NewInMemory() *InMemory {
|
||||
return &InMemory{
|
||||
CreateOrUpdateBlockCallCount: 0,
|
||||
blocks: make(map[int64]core.Block),
|
||||
receipts: make(map[string]core.Receipt),
|
||||
contracts: make(map[string]core.Contract),
|
||||
logs: make(map[string][]core.Log),
|
||||
headers: make(map[int64]core.Header),
|
||||
logFilters: make(map[string]filters.LogFilter),
|
||||
logs: make(map[string][]core.Log),
|
||||
receipts: make(map[string]core.Receipt),
|
||||
}
|
||||
}
|
||||
|
80
pkg/datastore/postgres/repositories/header_repository.go
Normal file
80
pkg/datastore/postgres/repositories/header_repository.go
Normal file
@ -0,0 +1,80 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
)
|
||||
|
||||
type HeaderRepository struct {
|
||||
database *postgres.DB
|
||||
}
|
||||
|
||||
func NewHeaderRepository(database *postgres.DB) HeaderRepository {
|
||||
return HeaderRepository{database: database}
|
||||
}
|
||||
|
||||
func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int64, error) {
|
||||
hash, err := repository.getHeaderHash(header)
|
||||
if err != nil {
|
||||
if headerDoesNotExist(err) {
|
||||
return repository.insertHeader(header)
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
if headerMustBeReplaced(hash, header) {
|
||||
return repository.replaceHeader(header)
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
|
||||
func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {
|
||||
var header core.Header
|
||||
err := repository.database.Get(&header, `SELECT block_number, hash, raw FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
|
||||
blockNumber, repository.database.Node.ID)
|
||||
return header, err
|
||||
}
|
||||
|
||||
func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 {
|
||||
numbers := make([]int64, 0)
|
||||
repository.database.Select(&numbers, `SELECT all_block_numbers
|
||||
FROM (
|
||||
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
|
||||
WHERE all_block_numbers NOT IN (
|
||||
SELECT block_number FROM headers WHERE eth_node_fingerprint = $3
|
||||
) `,
|
||||
startingBlockNumber, endingBlockNumber, nodeID)
|
||||
return numbers
|
||||
}
|
||||
|
||||
func headerMustBeReplaced(hash string, header core.Header) bool {
|
||||
return hash != header.Hash
|
||||
}
|
||||
|
||||
func headerDoesNotExist(err error) bool {
|
||||
return err == sql.ErrNoRows
|
||||
}
|
||||
|
||||
func (repository HeaderRepository) getHeaderHash(header core.Header) (string, error) {
|
||||
var hash string
|
||||
err := repository.database.Get(&hash, `SELECT hash FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
|
||||
header.BlockNumber, repository.database.Node.ID)
|
||||
return hash, err
|
||||
}
|
||||
|
||||
func (repository HeaderRepository) insertHeader(header core.Header) (int64, error) {
|
||||
var headerId int64
|
||||
err := repository.database.QueryRowx(
|
||||
`INSERT INTO public.headers (block_number, hash, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3, $4, $5) RETURNING id`,
|
||||
header.BlockNumber, header.Hash, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId)
|
||||
return headerId, err
|
||||
}
|
||||
|
||||
func (repository HeaderRepository) replaceHeader(header core.Header) (int64, error) {
|
||||
_, err := repository.database.Exec(`DELETE FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
|
||||
header.BlockNumber, repository.database.Node.ID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return repository.insertHeader(header)
|
||||
}
|
248
pkg/datastore/postgres/repositories/header_repository_test.go
Normal file
248
pkg/datastore/postgres/repositories/header_repository_test.go
Normal file
@ -0,0 +1,248 @@
|
||||
package repositories_test
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
|
||||
"github.com/vulcanize/vulcanizedb/test_config"
|
||||
)
|
||||
|
||||
var _ = Describe("Block header repository", func() {
|
||||
Describe("creating or updating a header", func() {
|
||||
|
||||
It("adds a header", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: []byte{1, 2, 3, 4, 5},
|
||||
}
|
||||
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
var dbHeader core.Header
|
||||
err = db.Get(&dbHeader, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(dbHeader.BlockNumber).To(Equal(header.BlockNumber))
|
||||
Expect(dbHeader.Hash).To(Equal(header.Hash))
|
||||
Expect(dbHeader.Raw).To(Equal(header.Raw))
|
||||
})
|
||||
|
||||
It("adds node data to header", func() {
|
||||
node := core.Node{ID: "EthNodeFingerprint"}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{BlockNumber: 100}
|
||||
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
var ethNodeId int64
|
||||
err = db.Get(ðNodeId, `SELECT eth_node_id FROM public.headers WHERE block_number = $1`, header.BlockNumber)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(ethNodeId).To(Equal(db.NodeID))
|
||||
var ethNodeFingerprint string
|
||||
err = db.Get(ðNodeFingerprint, `SELECT eth_node_fingerprint FROM public.headers WHERE block_number = $1`, header.BlockNumber)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(ethNodeFingerprint).To(Equal(db.Node.ID))
|
||||
})
|
||||
|
||||
It("does not duplicate headers", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: []byte{1, 2, 3, 4, 5},
|
||||
}
|
||||
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
_, err = repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
var dbHeaders []core.Header
|
||||
err = db.Select(&dbHeaders, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(dbHeaders)).To(Equal(1))
|
||||
})
|
||||
|
||||
It("replaces header if hash is different", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: []byte{1, 2, 3, 4, 5},
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
headerTwo := core.Header{
|
||||
BlockNumber: header.BlockNumber,
|
||||
Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(),
|
||||
Raw: []byte{5, 4, 3, 2, 1},
|
||||
}
|
||||
|
||||
_, err = repo.CreateOrUpdateHeader(headerTwo)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
var dbHeader core.Header
|
||||
err = db.Get(&dbHeader, `SELECT block_number, hash, raw FROM headers WHERE block_number = $1`, header.BlockNumber)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(dbHeader.Hash).To(Equal(headerTwo.Hash))
|
||||
Expect(dbHeader.Raw).To(Equal(headerTwo.Raw))
|
||||
})
|
||||
|
||||
It("does not replace header if node fingerprint is different", func() {
|
||||
node := core.Node{ID: "Fingerprint"}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: []byte{1, 2, 3, 4, 5},
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
||||
headerTwo := core.Header{
|
||||
BlockNumber: header.BlockNumber,
|
||||
Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(),
|
||||
Raw: []byte{5, 4, 3, 2, 1},
|
||||
}
|
||||
|
||||
_, err = repoTwo.CreateOrUpdateHeader(headerTwo)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
var dbHeaders []core.Header
|
||||
err = dbTwo.Select(&dbHeaders, `SELECT block_number, hash, raw FROM headers WHERE block_number = $1`, header.BlockNumber)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(dbHeaders)).To(Equal(2))
|
||||
})
|
||||
|
||||
It("only replaces header with matching node fingerprint", func() {
|
||||
node := core.Node{ID: "Fingerprint"}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: []byte{1, 2, 3, 4, 5},
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
||||
headerTwo := core.Header{
|
||||
BlockNumber: header.BlockNumber,
|
||||
Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(),
|
||||
Raw: []byte{5, 4, 3, 2, 1},
|
||||
}
|
||||
_, err = repoTwo.CreateOrUpdateHeader(headerTwo)
|
||||
headerThree := core.Header{
|
||||
BlockNumber: header.BlockNumber,
|
||||
Hash: common.BytesToHash([]byte{1, 1, 1, 1, 1}).Hex(),
|
||||
Raw: []byte{1, 1, 1, 1, 1},
|
||||
}
|
||||
|
||||
_, err = repoTwo.CreateOrUpdateHeader(headerThree)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
var dbHeaders []core.Header
|
||||
err = dbTwo.Select(&dbHeaders, `SELECT block_number, hash, raw FROM headers WHERE block_number = $1`, header.BlockNumber)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(dbHeaders)).To(Equal(2))
|
||||
Expect(dbHeaders[0].Hash).To(Or(Equal(header.Hash), Equal(headerThree.Hash)))
|
||||
Expect(dbHeaders[1].Hash).To(Or(Equal(header.Hash), Equal(headerThree.Hash)))
|
||||
Expect(dbHeaders[0].Raw).To(Or(Equal(header.Raw), Equal(headerThree.Raw)))
|
||||
Expect(dbHeaders[1].Raw).To(Or(Equal(header.Raw), Equal(headerThree.Raw)))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Getting a header", func() {
|
||||
It("returns header if it exists", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: []byte{1, 2, 3, 4, 5},
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
dbHeader, err := repo.GetHeader(header.BlockNumber)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(dbHeader).To(Equal(header))
|
||||
})
|
||||
|
||||
It("does not return header for a different node fingerprint", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
header := core.Header{
|
||||
BlockNumber: 100,
|
||||
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
|
||||
Raw: []byte{1, 2, 3, 4, 5},
|
||||
}
|
||||
_, err := repo.CreateOrUpdateHeader(header)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
nodeTwo := core.Node{ID: "NodeFingerprintTwo"}
|
||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
||||
|
||||
_, err = repoTwo.GetHeader(header.BlockNumber)
|
||||
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err).To(MatchError(sql.ErrNoRows))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Getting missing headers", func() {
|
||||
It("returns block numbers for headers not in the database", func() {
|
||||
node := core.Node{}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
repo.CreateOrUpdateHeader(core.Header{BlockNumber: 1})
|
||||
repo.CreateOrUpdateHeader(core.Header{BlockNumber: 3})
|
||||
repo.CreateOrUpdateHeader(core.Header{BlockNumber: 5})
|
||||
|
||||
missingBlockNumbers := repo.MissingBlockNumbers(1, 5, node.ID)
|
||||
|
||||
Expect(missingBlockNumbers).To(ConsistOf([]int64{2, 4}))
|
||||
})
|
||||
|
||||
It("does not count headers created by a different node fingerprint", func() {
|
||||
node := core.Node{ID: "NodeFingerprint"}
|
||||
db := test_config.NewTestDB(node)
|
||||
repo := repositories.NewHeaderRepository(db)
|
||||
repo.CreateOrUpdateHeader(core.Header{BlockNumber: 1})
|
||||
repo.CreateOrUpdateHeader(core.Header{BlockNumber: 3})
|
||||
repo.CreateOrUpdateHeader(core.Header{BlockNumber: 5})
|
||||
nodeTwo := core.Node{ID: "NodeFingerprintTwo"}
|
||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
||||
|
||||
missingBlockNumbers := repoTwo.MissingBlockNumbers(1, 5, nodeTwo.ID)
|
||||
|
||||
Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5}))
|
||||
})
|
||||
})
|
||||
})
|
@ -14,7 +14,7 @@ var ErrBlockDoesNotExist = func(blockNumber int64) error {
|
||||
type BlockRepository interface {
|
||||
CreateOrUpdateBlock(block core.Block) (int64, error)
|
||||
GetBlock(blockNumber int64) (core.Block, error)
|
||||
MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64, nodeId string) []int64
|
||||
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64
|
||||
SetBlocksStatus(chainHead int64)
|
||||
}
|
||||
|
||||
@ -37,6 +37,12 @@ type FilterRepository interface {
|
||||
GetFilter(name string) (filters.LogFilter, error)
|
||||
}
|
||||
|
||||
type HeaderRepository interface {
|
||||
CreateOrUpdateHeader(header core.Header) (int64, error)
|
||||
GetHeader(blockNumber int64) (core.Header, error)
|
||||
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64
|
||||
}
|
||||
|
||||
type LogRepository interface {
|
||||
CreateLogs(logs []core.Log, receiptId int64) error
|
||||
GetLogs(address string, blockNumber int64) []core.Log
|
||||
|
@ -6,28 +6,33 @@ import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
)
|
||||
|
||||
type Blockchain struct {
|
||||
logs map[string][]core.Log
|
||||
blocks map[int64]core.Block
|
||||
contractAttributes map[string]map[string]string
|
||||
blocksChannel chan core.Block
|
||||
WasToldToStop bool
|
||||
node core.Node
|
||||
type BlockChain struct {
|
||||
ContractReturnValue []byte
|
||||
WasToldToStop bool
|
||||
blocks map[int64]core.Block
|
||||
blocksChannel chan core.Block
|
||||
contractAttributes map[string]map[string]string
|
||||
err error
|
||||
headers map[int64]core.Header
|
||||
logs map[string][]core.Log
|
||||
node core.Node
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
|
||||
func (blockChain *BlockChain) GetHeaderByNumber(blockNumber int64) (core.Header, error) {
|
||||
return blockChain.headers[blockNumber], nil
|
||||
}
|
||||
|
||||
func (blockChain *BlockChain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) CallContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) {
|
||||
return blockchain.ContractReturnValue, nil
|
||||
func (blockChain *BlockChain) CallContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) {
|
||||
return blockChain.ContractReturnValue, nil
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) LastBlock() *big.Int {
|
||||
func (blockChain *BlockChain) LastBlock() *big.Int {
|
||||
var max int64
|
||||
for blockNumber := range blockchain.blocks {
|
||||
for blockNumber := range blockChain.blocks {
|
||||
if blockNumber > max {
|
||||
max = blockNumber
|
||||
}
|
||||
@ -35,16 +40,16 @@ func (blockchain *Blockchain) LastBlock() *big.Int {
|
||||
return big.NewInt(max)
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) GetLogs(contract core.Contract, startingBlock *big.Int, endingBlock *big.Int) ([]core.Log, error) {
|
||||
return blockchain.logs[contract.Hash], nil
|
||||
func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlock *big.Int, endingBlock *big.Int) ([]core.Log, error) {
|
||||
return blockChain.logs[contract.Hash], nil
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) Node() core.Node {
|
||||
return blockchain.node
|
||||
func (blockChain *BlockChain) Node() core.Node {
|
||||
return blockChain.node
|
||||
}
|
||||
|
||||
func NewBlockchain(err error) *Blockchain {
|
||||
return &Blockchain{
|
||||
func NewBlockchain(err error) *BlockChain {
|
||||
return &BlockChain{
|
||||
blocks: make(map[int64]core.Block),
|
||||
logs: make(map[string][]core.Log),
|
||||
contractAttributes: make(map[string]map[string]string),
|
||||
@ -53,24 +58,40 @@ func NewBlockchain(err error) *Blockchain {
|
||||
}
|
||||
}
|
||||
|
||||
func NewBlockchainWithBlocks(blocks []core.Block) *Blockchain {
|
||||
func NewBlockchainWithBlocks(blocks []core.Block) *BlockChain {
|
||||
blockNumberToBlocks := make(map[int64]core.Block)
|
||||
for _, block := range blocks {
|
||||
blockNumberToBlocks[block.Number] = block
|
||||
}
|
||||
return &Blockchain{
|
||||
return &BlockChain{
|
||||
blocks: blockNumberToBlocks,
|
||||
}
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) {
|
||||
if blockchain.err != nil {
|
||||
return core.Block{}, blockchain.err
|
||||
func NewBlockChainWithHeaders(headers []core.Header) *BlockChain {
|
||||
// need to create blocks and headers so that LastBlock() will work in the mock
|
||||
// no reason to implement LastBlock() separately for headers since it checks
|
||||
// the last header in the Node's DB already
|
||||
memoryBlocks := make(map[int64]core.Block)
|
||||
memoryHeaders := make(map[int64]core.Header)
|
||||
for _, header := range headers {
|
||||
memoryBlocks[header.BlockNumber] = core.Block{Number: header.BlockNumber}
|
||||
memoryHeaders[header.BlockNumber] = header
|
||||
}
|
||||
return &BlockChain{
|
||||
blocks: memoryBlocks,
|
||||
headers: memoryHeaders,
|
||||
}
|
||||
return blockchain.blocks[blockNumber], nil
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) AddBlock(block core.Block) {
|
||||
blockchain.blocks[block.Number] = block
|
||||
blockchain.blocksChannel <- block
|
||||
func (blockChain *BlockChain) GetBlockByNumber(blockNumber int64) (core.Block, error) {
|
||||
if blockChain.err != nil {
|
||||
return core.Block{}, blockChain.err
|
||||
}
|
||||
return blockChain.blocks[blockNumber], nil
|
||||
}
|
||||
|
||||
func (blockChain *BlockChain) AddBlock(block core.Block) {
|
||||
blockChain.blocks[block.Number] = block
|
||||
blockChain.blocksChannel <- block
|
||||
}
|
||||
|
@ -1,13 +1,11 @@
|
||||
package geth
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
"log"
|
||||
"math/big"
|
||||
|
||||
"github.com/ethereum/go-ethereum"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethclient"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"golang.org/x/net/context"
|
||||
@ -18,31 +16,30 @@ import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
|
||||
)
|
||||
|
||||
type Blockchain struct {
|
||||
type BlockChain struct {
|
||||
client *ethclient.Client
|
||||
blockConverter vulcCommon.BlockConverter
|
||||
readGethHeaders chan *types.Header
|
||||
outputBlocks chan core.Block
|
||||
newHeadSubscription ethereum.Subscription
|
||||
headerConverter vulcCommon.HeaderConverter
|
||||
node core.Node
|
||||
}
|
||||
|
||||
func NewBlockchain(ipcPath string) *Blockchain {
|
||||
blockchain := Blockchain{}
|
||||
func NewBlockChain(ipcPath string) *BlockChain {
|
||||
rpcClient, err := rpc.Dial(ipcPath)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
client := ethclient.NewClient(rpcClient)
|
||||
clientWrapper := node.ClientWrapper{ContextCaller: rpcClient, IPCPath: ipcPath}
|
||||
blockchain.node = node.MakeNode(clientWrapper)
|
||||
blockchain.client = client
|
||||
transactionConverter := vulcRpc.NewRpcTransactionConverter(client)
|
||||
blockchain.blockConverter = vulcCommon.NewBlockConverter(transactionConverter)
|
||||
return &blockchain
|
||||
return &BlockChain{
|
||||
client: client,
|
||||
blockConverter: vulcCommon.NewBlockConverter(transactionConverter),
|
||||
headerConverter: vulcCommon.HeaderConverter{},
|
||||
node: node.MakeNode(clientWrapper),
|
||||
}
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) GetLogs(contract core.Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]core.Log, error) {
|
||||
func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) {
|
||||
if endingBlockNumber == nil {
|
||||
endingBlockNumber = startingBlockNumber
|
||||
}
|
||||
@ -52,7 +49,7 @@ func (blockchain *Blockchain) GetLogs(contract core.Contract, startingBlockNumbe
|
||||
ToBlock: endingBlockNumber,
|
||||
Addresses: []common.Address{contractAddress},
|
||||
}
|
||||
gethLogs, err := blockchain.client.FilterLogs(context.Background(), fc)
|
||||
gethLogs, err := blockChain.client.FilterLogs(context.Background(), fc)
|
||||
if err != nil {
|
||||
return []core.Log{}, err
|
||||
}
|
||||
@ -60,23 +57,27 @@ func (blockchain *Blockchain) GetLogs(contract core.Contract, startingBlockNumbe
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) Node() core.Node {
|
||||
return blockchain.node
|
||||
func (blockChain *BlockChain) Node() core.Node {
|
||||
return blockChain.node
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) {
|
||||
gethBlock, err := blockchain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber))
|
||||
func (blockChain *BlockChain) GetBlockByNumber(blockNumber int64) (block core.Block, err error) {
|
||||
gethBlock, err := blockChain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber))
|
||||
if err != nil {
|
||||
return core.Block{}, err
|
||||
return block, err
|
||||
}
|
||||
block, err := blockchain.blockConverter.ToCoreBlock(gethBlock)
|
||||
if err != nil {
|
||||
return core.Block{}, err
|
||||
}
|
||||
return block, nil
|
||||
return blockChain.blockConverter.ToCoreBlock(gethBlock)
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) LastBlock() *big.Int {
|
||||
block, _ := blockchain.client.HeaderByNumber(context.Background(), nil)
|
||||
func (blockChain *BlockChain) GetHeaderByNumber(blockNumber int64) (header core.Header, err error) {
|
||||
gethHeader, err := blockChain.client.HeaderByNumber(context.Background(), big.NewInt(blockNumber))
|
||||
if err != nil {
|
||||
return header, err
|
||||
}
|
||||
return blockChain.headerConverter.Convert(gethHeader)
|
||||
}
|
||||
|
||||
func (blockChain *BlockChain) LastBlock() *big.Int {
|
||||
block, _ := blockChain.client.HeaderByNumber(context.Background(), nil)
|
||||
return block.Number
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ var (
|
||||
ErrInvalidStateAttribute = errors.New("invalid state attribute")
|
||||
)
|
||||
|
||||
func (blockchain *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
|
||||
func (blockChain *BlockChain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
|
||||
parsed, err := ParseAbi(abiJSON)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -28,15 +28,15 @@ func (blockchain *Blockchain) FetchContractData(abiJSON string, address string,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
output, err := blockchain.callContract(address, input, big.NewInt(blockNumber))
|
||||
output, err := blockChain.callContract(address, input, big.NewInt(blockNumber))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return parsed.Unpack(result, method, output)
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) callContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) {
|
||||
func (blockChain *BlockChain) callContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) {
|
||||
to := common.HexToAddress(contractHash)
|
||||
msg := ethereum.CallMsg{To: &to, Data: input}
|
||||
return blockchain.client.CallContract(context.Background(), msg, blockNumber)
|
||||
return blockChain.client.CallContract(context.Background(), msg, blockNumber)
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ func (bc BlockConverter) ToCoreBlock(gethBlock *types.Block) (core.Block, error)
|
||||
if err != nil {
|
||||
return core.Block{}, err
|
||||
}
|
||||
|
||||
coreBlock := core.Block{
|
||||
Difficulty: gethBlock.Difficulty().Int64(),
|
||||
ExtraData: hexutil.Encode(gethBlock.Extra()),
|
||||
|
24
pkg/geth/converters/common/header_converter.go
Normal file
24
pkg/geth/converters/common/header_converter.go
Normal file
@ -0,0 +1,24 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
)
|
||||
|
||||
type HeaderConverter struct{}
|
||||
|
||||
func (converter HeaderConverter) Convert(gethHeader *types.Header) (core.Header, error) {
|
||||
writer := new(bytes.Buffer)
|
||||
err := rlp.Encode(writer, &gethHeader)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
coreHeader := core.Header{
|
||||
Hash: gethHeader.Hash().Hex(),
|
||||
BlockNumber: gethHeader.Number.Int64(),
|
||||
Raw: writer.Bytes(),
|
||||
}
|
||||
return coreHeader, nil
|
||||
}
|
47
pkg/geth/converters/common/header_converter_test.go
Normal file
47
pkg/geth/converters/common/header_converter_test.go
Normal file
@ -0,0 +1,47 @@
|
||||
package common_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
common2 "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common"
|
||||
"math/big"
|
||||
)
|
||||
|
||||
var _ = Describe("Block header converter", func() {
|
||||
It("converts geth header to core header", func() {
|
||||
gethHeader := &types.Header{
|
||||
Difficulty: big.NewInt(1),
|
||||
Number: big.NewInt(2),
|
||||
ParentHash: common.HexToHash("0xParent"),
|
||||
ReceiptHash: common.HexToHash("0xReceipt"),
|
||||
Root: common.HexToHash("0xRoot"),
|
||||
Time: big.NewInt(3),
|
||||
TxHash: common.HexToHash("0xTransaction"),
|
||||
UncleHash: common.HexToHash("0xUncle"),
|
||||
}
|
||||
converter := common2.HeaderConverter{}
|
||||
|
||||
coreHeader, err := converter.Convert(gethHeader)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(coreHeader.BlockNumber).To(Equal(gethHeader.Number.Int64()))
|
||||
Expect(coreHeader.Hash).To(Equal(gethHeader.Hash().Hex()))
|
||||
})
|
||||
|
||||
It("includes raw bytes for header", func() {
|
||||
headerRLP := []byte{249, 2, 23, 160, 180, 251, 173, 248, 234, 69, 43, 19, 151, 24, 226, 112, 13, 193, 19, 92, 252, 129, 20, 80, 49, 200, 75, 122, 178, 124, 215, 16, 57, 79, 123, 56, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 148, 42, 101, 172, 164, 213, 252, 91, 92, 133, 144, 144, 166, 195, 77, 22, 65, 53, 57, 130, 38, 160, 14, 6, 111, 60, 34, 151, 165, 203, 48, 5, 147, 5, 38, 23, 209, 188, 165, 148, 111, 12, 170, 6, 53, 253, 177, 184, 90, 199, 229, 35, 111, 52, 160, 101, 186, 136, 127, 203, 8, 38, 246, 22, 208, 31, 115, 108, 29, 45, 103, 123, 202, 189, 226, 247, 252, 37, 170, 145, 207, 188, 11, 59, 173, 92, 179, 160, 32, 227, 83, 69, 64, 202, 241, 99, 120, 230, 232, 106, 43, 241, 35, 109, 159, 135, 109, 50, 24, 251, 192, 57, 88, 230, 219, 28, 99, 75, 35, 51, 185, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 134, 11, 105, 222, 129, 162, 43, 131, 15, 66, 64, 131, 47, 239, 216, 130, 196, 68, 132, 86, 191, 180, 21, 152, 215, 131, 1, 3, 3, 132, 71, 101, 116, 104, 135, 103, 111, 49, 46, 53, 46, 49, 133, 108, 105, 110, 117, 120, 160, 146, 196, 18, 154, 10, 226, 54, 27, 69, 42, 158, 222, 236, 229, 92, 18, 236, 238, 171, 134, 99, 22, 25, 94, 61, 135, 252, 27, 0, 91, 102, 69, 136, 205, 76, 85, 185, 65, 207, 144, 21}
|
||||
var gethHeader types.Header
|
||||
err := rlp.Decode(bytes.NewReader(headerRLP), &gethHeader)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
converter := common2.HeaderConverter{}
|
||||
|
||||
coreHeader, err := converter.Convert(&gethHeader)
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(coreHeader.Raw).To(Equal(headerRLP))
|
||||
})
|
||||
})
|
29
pkg/history/block_validator.go
Normal file
29
pkg/history/block_validator.go
Normal file
@ -0,0 +1,29 @@
|
||||
package history
|
||||
|
||||
import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
||||
)
|
||||
|
||||
type BlockValidator struct {
|
||||
blockchain core.Blockchain
|
||||
blockRepository datastore.BlockRepository
|
||||
windowSize int
|
||||
}
|
||||
|
||||
func NewBlockValidator(blockchain core.Blockchain, blockRepository datastore.BlockRepository, windowSize int) *BlockValidator {
|
||||
return &BlockValidator{
|
||||
blockchain: blockchain,
|
||||
blockRepository: blockRepository,
|
||||
windowSize: windowSize,
|
||||
}
|
||||
}
|
||||
|
||||
func (bv BlockValidator) ValidateBlocks() ValidationWindow {
|
||||
window := MakeValidationWindow(bv.blockchain, bv.windowSize)
|
||||
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
|
||||
RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers)
|
||||
lastBlock := bv.blockchain.LastBlock().Int64()
|
||||
bv.blockRepository.SetBlocksStatus(lastBlock)
|
||||
return window
|
||||
}
|
43
pkg/history/block_validator_test.go
Normal file
43
pkg/history/block_validator_test.go
Normal file
@ -0,0 +1,43 @@
|
||||
package history_test
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/history"
|
||||
)
|
||||
|
||||
var _ = Describe("Blocks validator", func() {
|
||||
|
||||
It("calls create or update for all blocks within the window", func() {
|
||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||
{Number: 4},
|
||||
{Number: 5},
|
||||
{Number: 6},
|
||||
{Number: 7},
|
||||
})
|
||||
inMemoryDB := inmemory.NewInMemory()
|
||||
blocksRepository := &inmemory.BlockRepository{InMemory: inMemoryDB}
|
||||
validator := history.NewBlockValidator(blockchain, blocksRepository, 2)
|
||||
|
||||
window := validator.ValidateBlocks()
|
||||
|
||||
Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7}))
|
||||
Expect(blocksRepository.BlockCount()).To(Equal(3))
|
||||
Expect(blocksRepository.CreateOrUpdateBlockCallCount).To(Equal(3))
|
||||
})
|
||||
|
||||
It("returns the number of largest block", func() {
|
||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||
{Number: 1},
|
||||
{Number: 2},
|
||||
{Number: 3},
|
||||
})
|
||||
maxBlockNumber := blockchain.LastBlock()
|
||||
|
||||
Expect(maxBlockNumber.Int64()).To(Equal(int64(3)))
|
||||
})
|
||||
})
|
27
pkg/history/header_validator.go
Normal file
27
pkg/history/header_validator.go
Normal file
@ -0,0 +1,27 @@
|
||||
package history
|
||||
|
||||
import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
||||
)
|
||||
|
||||
type HeaderValidator struct {
|
||||
blockChain core.Blockchain
|
||||
headerRepository datastore.HeaderRepository
|
||||
windowSize int
|
||||
}
|
||||
|
||||
func NewHeaderValidator(blockChain core.Blockchain, repository datastore.HeaderRepository, windowSize int) HeaderValidator {
|
||||
return HeaderValidator{
|
||||
blockChain: blockChain,
|
||||
headerRepository: repository,
|
||||
windowSize: windowSize,
|
||||
}
|
||||
}
|
||||
|
||||
func (validator HeaderValidator) ValidateHeaders() ValidationWindow {
|
||||
window := MakeValidationWindow(validator.blockChain, validator.windowSize)
|
||||
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
|
||||
RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers)
|
||||
return window
|
||||
}
|
39
pkg/history/header_validator_test.go
Normal file
39
pkg/history/header_validator_test.go
Normal file
@ -0,0 +1,39 @@
|
||||
package history_test
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/history"
|
||||
)
|
||||
|
||||
var _ = Describe("Header validator", func() {
|
||||
It("replaces headers in the validation window that have changed", func() {
|
||||
blockNumber := int64(10)
|
||||
oldHash := common.HexToHash("0x0987654321").Hex()
|
||||
oldHeader := core.Header{
|
||||
BlockNumber: blockNumber,
|
||||
Hash: oldHash,
|
||||
}
|
||||
inMemory := inmemory.NewInMemory()
|
||||
headerRepository := inmemory.NewHeaderRepository(inMemory)
|
||||
headerRepository.CreateOrUpdateHeader(oldHeader)
|
||||
newHash := common.HexToHash("0x123456789").Hex()
|
||||
newHeader := core.Header{
|
||||
BlockNumber: blockNumber,
|
||||
Hash: newHash,
|
||||
}
|
||||
headers := []core.Header{newHeader}
|
||||
blockChain := fakes.NewBlockChainWithHeaders(headers)
|
||||
validator := history.NewHeaderValidator(blockChain, headerRepository, 1)
|
||||
|
||||
validator.ValidateHeaders()
|
||||
|
||||
dbHeader, _ := headerRepository.GetHeader(blockNumber)
|
||||
Expect(dbHeader.Hash).NotTo(Equal(oldHash))
|
||||
Expect(dbHeader.Hash).To(Equal(newHash))
|
||||
})
|
||||
})
|
@ -9,7 +9,7 @@ import (
|
||||
|
||||
func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository datastore.BlockRepository, startingBlockNumber int64) int {
|
||||
lastBlock := blockchain.LastBlock().Int64()
|
||||
blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock-1, blockchain.Node().ID)
|
||||
blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID)
|
||||
log.SetPrefix("")
|
||||
log.Printf("Backfilling %d blocks\n\n", len(blockRange))
|
||||
RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange)
|
||||
|
@ -20,7 +20,7 @@ var _ = Describe("Populating blocks", func() {
|
||||
blockRepository = &inmemory.BlockRepository{InMemory: inMemory}
|
||||
})
|
||||
|
||||
It("fills in the only missing block (Number 1)", func() {
|
||||
It("fills in the only missing block (BlockNumber 1)", func() {
|
||||
blocks := []core.Block{
|
||||
{Number: 1},
|
||||
{Number: 2},
|
||||
@ -57,11 +57,12 @@ var _ = Describe("Populating blocks", func() {
|
||||
blockRepository.CreateOrUpdateBlock(core.Block{Number: 9})
|
||||
blockRepository.CreateOrUpdateBlock(core.Block{Number: 11})
|
||||
blockRepository.CreateOrUpdateBlock(core.Block{Number: 12})
|
||||
blockRepository.CreateOrUpdateBlock(core.Block{Number: 13})
|
||||
|
||||
blocksAdded := history.PopulateMissingBlocks(blockchain, blockRepository, 5)
|
||||
|
||||
Expect(blocksAdded).To(Equal(3))
|
||||
Expect(blockRepository.BlockCount()).To(Equal(11))
|
||||
Expect(blockRepository.BlockCount()).To(Equal(12))
|
||||
_, err := blockRepository.GetBlock(4)
|
||||
Expect(err).To(HaveOccurred())
|
||||
_, err = blockRepository.GetBlock(5)
|
||||
@ -70,7 +71,7 @@ var _ = Describe("Populating blocks", func() {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
_, err = blockRepository.GetBlock(10)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
_, err = blockRepository.GetBlock(13)
|
||||
_, err = blockRepository.GetBlock(14)
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
|
||||
@ -98,8 +99,8 @@ var _ = Describe("Populating blocks", func() {
|
||||
})
|
||||
|
||||
history.RetrieveAndUpdateBlocks(blockchain, blockRepository, history.MakeRange(2, 5))
|
||||
Expect(blockRepository.BlockCount()).To(Equal(3))
|
||||
Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(3))
|
||||
Expect(blockRepository.BlockCount()).To(Equal(4))
|
||||
Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(4))
|
||||
})
|
||||
|
||||
It("does not call repository create block when there is an error", func() {
|
||||
@ -109,5 +110,4 @@ var _ = Describe("Populating blocks", func() {
|
||||
Expect(blockRepository.BlockCount()).To(Equal(0))
|
||||
Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(0))
|
||||
})
|
||||
|
||||
})
|
||||
|
29
pkg/history/populate_headers.go
Normal file
29
pkg/history/populate_headers.go
Normal file
@ -0,0 +1,29 @@
|
||||
package history
|
||||
|
||||
import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
||||
"log"
|
||||
)
|
||||
|
||||
func PopulateMissingHeaders(blockchain core.Blockchain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) int {
|
||||
lastBlock := blockchain.LastBlock().Int64()
|
||||
blockRange := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID)
|
||||
log.SetPrefix("")
|
||||
log.Printf("Backfilling %d blocks\n\n", len(blockRange))
|
||||
RetrieveAndUpdateHeaders(blockchain, headerRepository, blockRange)
|
||||
return len(blockRange)
|
||||
}
|
||||
|
||||
func RetrieveAndUpdateHeaders(blockchain core.Blockchain, headerRepository datastore.HeaderRepository, blockNumbers []int64) int {
|
||||
for _, blockNumber := range blockNumbers {
|
||||
header, err := blockchain.GetHeaderByNumber(blockNumber)
|
||||
if err != nil {
|
||||
log.Printf("failed to retrieve block number: %d\n", blockNumber)
|
||||
return 0
|
||||
}
|
||||
// TODO: handle possible error here
|
||||
headerRepository.CreateOrUpdateHeader(header)
|
||||
}
|
||||
return len(blockNumbers)
|
||||
}
|
52
pkg/history/populate_headers_test.go
Normal file
52
pkg/history/populate_headers_test.go
Normal file
@ -0,0 +1,52 @@
|
||||
package history_test
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/history"
|
||||
)
|
||||
|
||||
var _ = Describe("Populating headers", func() {
|
||||
|
||||
var inMemory *inmemory.InMemory
|
||||
var headerRepository *inmemory.HeaderRepository
|
||||
|
||||
BeforeEach(func() {
|
||||
inMemory = inmemory.NewInMemory()
|
||||
headerRepository = inmemory.NewHeaderRepository(inMemory)
|
||||
})
|
||||
|
||||
Describe("When 1 missing header", func() {
|
||||
|
||||
It("returns number of headers added", func() {
|
||||
headers := []core.Header{
|
||||
{BlockNumber: 1},
|
||||
{BlockNumber: 2},
|
||||
}
|
||||
blockChain := fakes.NewBlockChainWithHeaders(headers)
|
||||
headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: 2})
|
||||
|
||||
headersAdded := history.PopulateMissingHeaders(blockChain, headerRepository, 1)
|
||||
|
||||
Expect(headersAdded).To(Equal(1))
|
||||
})
|
||||
})
|
||||
|
||||
It("adds missing headers to the db", func() {
|
||||
headers := []core.Header{
|
||||
{BlockNumber: 1},
|
||||
{BlockNumber: 2},
|
||||
}
|
||||
blockChain := fakes.NewBlockChainWithHeaders(headers)
|
||||
dbHeader, _ := headerRepository.GetHeader(1)
|
||||
Expect(dbHeader.BlockNumber).To(BeZero())
|
||||
|
||||
history.PopulateMissingHeaders(blockChain, headerRepository, 1)
|
||||
|
||||
dbHeader, _ = headerRepository.GetHeader(1)
|
||||
Expect(dbHeader.BlockNumber).To(Equal(int64(1)))
|
||||
})
|
||||
})
|
@ -1,68 +0,0 @@
|
||||
package history
|
||||
|
||||
import (
|
||||
"io"
|
||||
"text/template"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
||||
)
|
||||
|
||||
const WindowTemplate = `Validating Blocks
|
||||
|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| ({{.UpperBound}}:HEAD)
|
||||
|
||||
`
|
||||
|
||||
var ParsedWindowTemplate = *template.Must(template.New("window").Parse(WindowTemplate))
|
||||
|
||||
type BlockValidator struct {
|
||||
blockchain core.Blockchain
|
||||
blockRepository datastore.BlockRepository
|
||||
windowSize int
|
||||
parsedLoggingTemplate template.Template
|
||||
}
|
||||
|
||||
func NewBlockValidator(blockchain core.Blockchain, blockRepository datastore.BlockRepository, windowSize int) *BlockValidator {
|
||||
return &BlockValidator{
|
||||
blockchain,
|
||||
blockRepository,
|
||||
windowSize,
|
||||
ParsedWindowTemplate,
|
||||
}
|
||||
}
|
||||
|
||||
func (bv BlockValidator) ValidateBlocks() ValidationWindow {
|
||||
window := MakeValidationWindow(bv.blockchain, bv.windowSize)
|
||||
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
|
||||
RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers)
|
||||
lastBlock := bv.blockchain.LastBlock().Int64()
|
||||
bv.blockRepository.SetBlocksStatus(lastBlock)
|
||||
return window
|
||||
}
|
||||
|
||||
func (bv BlockValidator) Log(out io.Writer, window ValidationWindow) {
|
||||
bv.parsedLoggingTemplate.Execute(out, window)
|
||||
}
|
||||
|
||||
type ValidationWindow struct {
|
||||
LowerBound int64
|
||||
UpperBound int64
|
||||
}
|
||||
|
||||
func (window ValidationWindow) Size() int {
|
||||
return int(window.UpperBound - window.LowerBound)
|
||||
}
|
||||
|
||||
func MakeValidationWindow(blockchain core.Blockchain, windowSize int) ValidationWindow {
|
||||
upperBound := blockchain.LastBlock().Int64()
|
||||
lowerBound := upperBound - int64(windowSize)
|
||||
return ValidationWindow{lowerBound, upperBound}
|
||||
}
|
||||
|
||||
func MakeRange(min, max int64) []int64 {
|
||||
a := make([]int64, max-min)
|
||||
for i := range a {
|
||||
a[i] = min + int64(i)
|
||||
}
|
||||
return a
|
||||
}
|
@ -1,86 +0,0 @@
|
||||
package history_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/history"
|
||||
)
|
||||
|
||||
var _ = Describe("Blocks validator", func() {
|
||||
|
||||
It("creates a ValidationWindow equal to (HEAD-windowSize, HEAD)", func() {
|
||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||
{Number: 1},
|
||||
{Number: 2},
|
||||
{Number: 3},
|
||||
{Number: 4},
|
||||
{Number: 5},
|
||||
})
|
||||
|
||||
validationWindow := history.MakeValidationWindow(blockchain, 2)
|
||||
|
||||
Expect(validationWindow.LowerBound).To(Equal(int64(3)))
|
||||
Expect(validationWindow.UpperBound).To(Equal(int64(5)))
|
||||
})
|
||||
|
||||
It("returns the window size", func() {
|
||||
window := history.ValidationWindow{LowerBound: 1, UpperBound: 3}
|
||||
Expect(window.Size()).To(Equal(2))
|
||||
})
|
||||
|
||||
It("calls create or update for all blocks within the window", func() {
|
||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||
{Number: 4},
|
||||
{Number: 5},
|
||||
{Number: 6},
|
||||
{Number: 7},
|
||||
})
|
||||
inMemoryDB := inmemory.NewInMemory()
|
||||
blocksRepository := &inmemory.BlockRepository{InMemory: inMemoryDB}
|
||||
|
||||
validator := history.NewBlockValidator(blockchain, blocksRepository, 2)
|
||||
window := validator.ValidateBlocks()
|
||||
Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7}))
|
||||
Expect(blocksRepository.BlockCount()).To(Equal(2))
|
||||
Expect(blocksRepository.CreateOrUpdateBlockCallCount).To(Equal(2))
|
||||
})
|
||||
|
||||
It("logs window message", func() {
|
||||
inMemoryDB := inmemory.NewInMemory()
|
||||
blockRepository := &inmemory.BlockRepository{InMemory: inMemoryDB}
|
||||
|
||||
expectedMessage := &bytes.Buffer{}
|
||||
window := history.ValidationWindow{LowerBound: 5, UpperBound: 7}
|
||||
history.ParsedWindowTemplate.Execute(expectedMessage, history.ValidationWindow{LowerBound: 5, UpperBound: 7})
|
||||
|
||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{})
|
||||
validator := history.NewBlockValidator(blockchain, blockRepository, 2)
|
||||
actualMessage := &bytes.Buffer{}
|
||||
validator.Log(actualMessage, window)
|
||||
Expect(actualMessage).To(Equal(expectedMessage))
|
||||
})
|
||||
|
||||
It("generates a range of int64s", func() {
|
||||
numberOfBlocksCreated := history.MakeRange(0, 5)
|
||||
expected := []int64{0, 1, 2, 3, 4}
|
||||
|
||||
Expect(numberOfBlocksCreated).To(Equal(expected))
|
||||
})
|
||||
|
||||
It("returns the number of largest block", func() {
|
||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||
{Number: 1},
|
||||
{Number: 2},
|
||||
{Number: 3},
|
||||
})
|
||||
maxBlockNumber := blockchain.LastBlock()
|
||||
|
||||
Expect(maxBlockNumber.Int64()).To(Equal(int64(3)))
|
||||
})
|
||||
|
||||
})
|
41
pkg/history/validation_window.go
Normal file
41
pkg/history/validation_window.go
Normal file
@ -0,0 +1,41 @@
|
||||
package history
|
||||
|
||||
import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"io"
|
||||
"text/template"
|
||||
)
|
||||
|
||||
const WindowTemplate = `Validating Blocks
|
||||
|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| ({{.UpperBound}}:HEAD)
|
||||
|
||||
`
|
||||
|
||||
var ParsedWindowTemplate = *template.Must(template.New("window").Parse(WindowTemplate))
|
||||
|
||||
type ValidationWindow struct {
|
||||
LowerBound int64
|
||||
UpperBound int64
|
||||
}
|
||||
|
||||
func (window ValidationWindow) Size() int {
|
||||
return int(window.UpperBound - window.LowerBound)
|
||||
}
|
||||
|
||||
func MakeValidationWindow(blockchain core.Blockchain, windowSize int) ValidationWindow {
|
||||
upperBound := blockchain.LastBlock().Int64()
|
||||
lowerBound := upperBound - int64(windowSize)
|
||||
return ValidationWindow{lowerBound, upperBound}
|
||||
}
|
||||
|
||||
func MakeRange(min, max int64) []int64 {
|
||||
a := make([]int64, max-min+1)
|
||||
for i := range a {
|
||||
a[i] = min + int64(i)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
func (window ValidationWindow) Log(out io.Writer) {
|
||||
ParsedWindowTemplate.Execute(out, window)
|
||||
}
|
53
pkg/history/validation_window_test.go
Normal file
53
pkg/history/validation_window_test.go
Normal file
@ -0,0 +1,53 @@
|
||||
package history_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/history"
|
||||
)
|
||||
|
||||
var _ = Describe("", func() {
|
||||
It("creates a ValidationWindow equal to (HEAD-windowSize, HEAD)", func() {
|
||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||
{Number: 1},
|
||||
{Number: 2},
|
||||
{Number: 3},
|
||||
{Number: 4},
|
||||
{Number: 5},
|
||||
})
|
||||
|
||||
validationWindow := history.MakeValidationWindow(blockchain, 2)
|
||||
|
||||
Expect(validationWindow.LowerBound).To(Equal(int64(3)))
|
||||
Expect(validationWindow.UpperBound).To(Equal(int64(5)))
|
||||
})
|
||||
|
||||
It("returns the window size", func() {
|
||||
window := history.ValidationWindow{LowerBound: 1, UpperBound: 3}
|
||||
|
||||
Expect(window.Size()).To(Equal(2))
|
||||
})
|
||||
|
||||
It("generates a range of int64s", func() {
|
||||
numberOfBlocksCreated := history.MakeRange(0, 5)
|
||||
expected := []int64{0, 1, 2, 3, 4, 5}
|
||||
|
||||
Expect(numberOfBlocksCreated).To(Equal(expected))
|
||||
})
|
||||
|
||||
It("logs window message", func() {
|
||||
expectedMessage := &bytes.Buffer{}
|
||||
window := history.ValidationWindow{LowerBound: 5, UpperBound: 7}
|
||||
history.ParsedWindowTemplate.Execute(expectedMessage, window)
|
||||
actualMessage := &bytes.Buffer{}
|
||||
|
||||
window.Log(actualMessage)
|
||||
|
||||
Expect(actualMessage).To(Equal(expectedMessage))
|
||||
})
|
||||
})
|
@ -62,12 +62,13 @@ func setABIPath() {
|
||||
|
||||
func NewTestDB(node core.Node) *postgres.DB {
|
||||
db, _ := postgres.NewDB(DBConfig, node)
|
||||
db.MustExec("DELETE FROM watched_contracts")
|
||||
db.MustExec("DELETE FROM transactions")
|
||||
db.MustExec("DELETE FROM blocks")
|
||||
db.MustExec("DELETE FROM headers")
|
||||
db.MustExec("DELETE FROM log_filters")
|
||||
db.MustExec("DELETE FROM logs")
|
||||
db.MustExec("DELETE FROM receipts")
|
||||
db.MustExec("DELETE FROM log_filters")
|
||||
db.MustExec("DELETE FROM transactions")
|
||||
db.MustExec("DELETE FROM watched_contracts")
|
||||
return db
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user