From cdc861e5f234895d2db51df753b10893a5a76493 Mon Sep 17 00:00:00 2001 From: Matt Krump Date: Tue, 7 Nov 2017 09:39:44 -0600 Subject: [PATCH 1/2] Update Config to use abs path if supplied --- pkg/config/config.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 74932df4..c2a5f100 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -23,7 +23,9 @@ func NewConfig(environment string) Config { filenameWithExtension := fmt.Sprintf("%s.toml", environment) absolutePath := filepath.Join(ProjectRoot(), "pkg", "config", "environments", filenameWithExtension) config := parseConfigFile(absolutePath) - config.Client.IPCPath = filepath.Join(ProjectRoot(), config.Client.IPCPath) + if !filepath.IsAbs(config.Client.IPCPath) { + config.Client.IPCPath = filepath.Join(ProjectRoot(), config.Client.IPCPath) + } return config } From 4c84173bc0af8f4d64bd9a80c10798260aecf353 Mon Sep 17 00:00:00 2001 From: Eric Meyer Date: Mon, 6 Nov 2017 14:36:12 -0600 Subject: [PATCH 2/2] Add ability to populate missing blocks * The command populates up to the highest known block number * The anticipated use case is that the listener will be running in parallel to the populateBlocks command * This will mean that the listener is responsible for picking up new blocks, and the populateBlocks command is reposible for historical blocks * Reformat SQL statements --- Gododir/main.go | 10 +++ README.md | 6 ++ cmd/populate_blocks/main.go | 32 +++++++++ integration_test/geth_blockchain_test.go | 13 +++- .../blockchain_listener_test.go | 8 +-- pkg/core/blockchain.go | 1 + pkg/fakes/blockchain.go | 30 ++++++-- pkg/geth/geth_blockchain.go | 11 ++- pkg/history/history_suite_test.go | 13 ++++ pkg/history/populate_blocks.go | 15 ++++ pkg/history/populate_blocks_test.go | 69 +++++++++++++++++++ pkg/repositories/in_memory.go | 20 ++++++ pkg/repositories/postgres.go | 35 ++++++++-- pkg/repositories/repository.go | 2 + pkg/repositories/repository_test.go | 69 +++++++++++++++++++ 15 files changed, 315 insertions(+), 19 deletions(-) create mode 100644 cmd/populate_blocks/main.go create mode 100644 pkg/history/history_suite_test.go create mode 100644 pkg/history/populate_blocks.go create mode 100644 pkg/history/populate_blocks_test.go diff --git a/Gododir/main.go b/Gododir/main.go index fece563e..b826f067 100644 --- a/Gododir/main.go +++ b/Gododir/main.go @@ -25,6 +25,16 @@ func tasks(p *do.Project) { do.M{"environment": environment, "$in": "cmd/run"}) }) + p.Task("populateBlocks", nil, func(context *do.Context) { + environment := parseEnvironment(context) + startingNumber := context.Args.MayInt(-1, "starting-number") + if startingNumber < 0 { + log.Fatalln("--starting-number required") + } + context.Start(`go run main.go --environment={{.environment}} --starting-number={{.startingNumber}}`, + do.M{"environment": environment, "startingNumber": startingNumber, "$in": "cmd/populate_blocks"}) + }) + p.Task("migrate", nil, func(context *do.Context) { environment := parseEnvironment(context) cfg := config.NewConfig(environment) diff --git a/README.md b/README.md index c346f756..b5d9ce04 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,12 @@ The default location for Ethereum is: 1. Start a blockchain. 2. In a separate terminal start listener (ipcDir location) - `godo run -- --environment=` + +## Retrieving Historical Data + +1. Start a blockchain. +2. In a separate terminal start listener (ipcDir location) + - `godo populateBlocks -- --environment= --starting-number=` ### Configuring Additional Environments diff --git a/cmd/populate_blocks/main.go b/cmd/populate_blocks/main.go new file mode 100644 index 00000000..37d38699 --- /dev/null +++ b/cmd/populate_blocks/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "flag" + + "log" + + "fmt" + + "github.com/8thlight/vulcanizedb/pkg/config" + "github.com/8thlight/vulcanizedb/pkg/geth" + "github.com/8thlight/vulcanizedb/pkg/history" + "github.com/8thlight/vulcanizedb/pkg/repositories" + "github.com/jmoiron/sqlx" +) + +func main() { + environment := flag.String("environment", "", "Environment name") + startingBlockNumber := flag.Int("starting-number", -1, "First block to fill from") + flag.Parse() + cfg := config.NewConfig(*environment) + + blockchain := geth.NewGethBlockchain(cfg.Client.IPCPath) + connectString := config.DbConnectionString(cfg.Database) + db, err := sqlx.Connect("postgres", connectString) + if err != nil { + log.Fatalf("Error connecting to DB: %v\n", err) + } + repository := repositories.NewPostgres(db) + numberOfBlocksCreated := history.PopulateBlocks(blockchain, repository, int64(*startingBlockNumber)) + fmt.Printf("Populated %d blocks", numberOfBlocksCreated) +} diff --git a/integration_test/geth_blockchain_test.go b/integration_test/geth_blockchain_test.go index a360bbeb..e1d2979b 100644 --- a/integration_test/geth_blockchain_test.go +++ b/integration_test/geth_blockchain_test.go @@ -14,11 +14,12 @@ var _ = Describe("Reading from the Geth blockchain", func() { var listener blockchain_listener.BlockchainListener var observer *fakes.BlockchainObserver + var blockchain *geth.GethBlockchain BeforeEach(func() { observer = fakes.NewFakeBlockchainObserver() cfg := config.NewConfig("private") - blockchain := geth.NewGethBlockchain(cfg.Client.IPCPath) + blockchain = geth.NewGethBlockchain(cfg.Client.IPCPath) observers := []core.BlockchainObserver{observer} listener = blockchain_listener.NewBlockchainListener(blockchain, observers) }) @@ -43,4 +44,14 @@ var _ = Describe("Reading from the Geth blockchain", func() { close(done) }, 10) + It("retrieves the genesis block and first block", func(done Done) { + genesisBlock := blockchain.GetBlockByNumber(int64(0)) + firstBlock := blockchain.GetBlockByNumber(int64(1)) + + Expect(genesisBlock.Number).To(Equal(int64(0))) + Expect(firstBlock.Number).To(Equal(int64(1))) + + close(done) + }, 10) + }) diff --git a/pkg/blockchain_listener/blockchain_listener_test.go b/pkg/blockchain_listener/blockchain_listener_test.go index 0462518c..d7ef40c1 100644 --- a/pkg/blockchain_listener/blockchain_listener_test.go +++ b/pkg/blockchain_listener/blockchain_listener_test.go @@ -12,7 +12,7 @@ var _ = Describe("Blockchain listeners", func() { It("starts with no blocks", func(done Done) { observer := fakes.NewFakeBlockchainObserver() - blockchain := &fakes.Blockchain{} + blockchain := fakes.NewBlockchain() blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer}) @@ -22,7 +22,7 @@ var _ = Describe("Blockchain listeners", func() { It("sees when one block was added", func(done Done) { observer := fakes.NewFakeBlockchainObserver() - blockchain := &fakes.Blockchain{} + blockchain := fakes.NewBlockchain() listener := blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer}) go listener.Start() @@ -38,7 +38,7 @@ var _ = Describe("Blockchain listeners", func() { It("sees a second block", func(done Done) { observer := fakes.NewFakeBlockchainObserver() - blockchain := &fakes.Blockchain{} + blockchain := fakes.NewBlockchain() listener := blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer}) go listener.Start() @@ -56,7 +56,7 @@ var _ = Describe("Blockchain listeners", func() { It("stops listening", func(done Done) { observer := fakes.NewFakeBlockchainObserver() - blockchain := &fakes.Blockchain{} + blockchain := fakes.NewBlockchain() listener := blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer}) go listener.Start() diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 7cc99c7e..3c140426 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -1,6 +1,7 @@ package core type Blockchain interface { + GetBlockByNumber(blockNumber int64) Block SubscribeToBlocks(blocks chan Block) StartListening() StopListening() diff --git a/pkg/fakes/blockchain.go b/pkg/fakes/blockchain.go index 4e04aa53..b7b7c4f8 100644 --- a/pkg/fakes/blockchain.go +++ b/pkg/fakes/blockchain.go @@ -3,16 +3,36 @@ package fakes import "github.com/8thlight/vulcanizedb/pkg/core" type Blockchain struct { - outputBlocks chan core.Block + blocks map[int64]core.Block + blocksChannel chan core.Block WasToldToStop bool } -func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) { - blockchain.outputBlocks = outputBlocks +func NewBlockchain() *Blockchain { + return &Blockchain{blocks: make(map[int64]core.Block)} } -func (blockchain Blockchain) AddBlock(block core.Block) { - blockchain.outputBlocks <- block +func NewBlockchainWithBlocks(blocks []core.Block) *Blockchain { + blockNumberToBlocks := make(map[int64]core.Block) + for _, block := range blocks { + blockNumberToBlocks[block.Number] = block + } + return &Blockchain{ + blocks: blockNumberToBlocks, + } +} + +func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) core.Block { + return blockchain.blocks[blockNumber] +} + +func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) { + blockchain.blocksChannel = outputBlocks +} + +func (blockchain *Blockchain) AddBlock(block core.Block) { + blockchain.blocks[block.Number] = block + blockchain.blocksChannel <- block } func (*Blockchain) StartListening() {} diff --git a/pkg/geth/geth_blockchain.go b/pkg/geth/geth_blockchain.go index 95e72c82..177efe34 100644 --- a/pkg/geth/geth_blockchain.go +++ b/pkg/geth/geth_blockchain.go @@ -3,6 +3,8 @@ package geth import ( "fmt" + "math/big" + "github.com/8thlight/vulcanizedb/pkg/core" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" @@ -17,6 +19,11 @@ type GethBlockchain struct { newHeadSubscription ethereum.Subscription } +func (blockchain *GethBlockchain) GetBlockByNumber(blockNumber int64) core.Block { + gethBlock, _ := blockchain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber)) + return GethBlockToCoreBlock(gethBlock) +} + func NewGethBlockchain(ipcPath string) *GethBlockchain { fmt.Printf("Creating Geth Blockchain to: %s\n", ipcPath) blockchain := GethBlockchain{} @@ -36,10 +43,8 @@ func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan core.Block) { } func (blockchain *GethBlockchain) StartListening() { - myContext := context.Background() for header := range blockchain.readGethHeaders { - gethBlock, _ := blockchain.client.BlockByNumber(myContext, header.Number) - block := GethBlockToCoreBlock(gethBlock) + block := blockchain.GetBlockByNumber(header.Number.Int64()) blockchain.outputBlocks <- block } } diff --git a/pkg/history/history_suite_test.go b/pkg/history/history_suite_test.go new file mode 100644 index 00000000..98ce5532 --- /dev/null +++ b/pkg/history/history_suite_test.go @@ -0,0 +1,13 @@ +package history_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestHistory(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "History Suite") +} diff --git a/pkg/history/populate_blocks.go b/pkg/history/populate_blocks.go new file mode 100644 index 00000000..c3cc7c0e --- /dev/null +++ b/pkg/history/populate_blocks.go @@ -0,0 +1,15 @@ +package history + +import ( + "github.com/8thlight/vulcanizedb/pkg/core" + "github.com/8thlight/vulcanizedb/pkg/repositories" +) + +func PopulateBlocks(blockchain core.Blockchain, repository repositories.Repository, startingBlockNumber int64) int { + blockNumbers := repository.MissingBlockNumbers(startingBlockNumber, repository.MaxBlockNumber()) + for _, blockNumber := range blockNumbers { + block := blockchain.GetBlockByNumber(blockNumber) + repository.CreateBlock(block) + } + return len(blockNumbers) +} diff --git a/pkg/history/populate_blocks_test.go b/pkg/history/populate_blocks_test.go new file mode 100644 index 00000000..1b3c772b --- /dev/null +++ b/pkg/history/populate_blocks_test.go @@ -0,0 +1,69 @@ +package history_test + +import ( + "github.com/8thlight/vulcanizedb/pkg/core" + "github.com/8thlight/vulcanizedb/pkg/fakes" + "github.com/8thlight/vulcanizedb/pkg/history" + "github.com/8thlight/vulcanizedb/pkg/repositories" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Populating blocks", func() { + + It("fills in the only missing block", func() { + blocks := []core.Block{{Number: 1, Hash: "x012343"}} + blockchain := fakes.NewBlockchainWithBlocks(blocks) + repository := repositories.NewInMemory() + repository.CreateBlock(core.Block{Number: 2}) + + history.PopulateBlocks(blockchain, repository, 1) + + block := repository.FindBlockByNumber(1) + Expect(block).NotTo(BeNil()) + Expect(block.Hash).To(Equal("x012343")) + }) + + It("fills in two missing blocks", func() { + blockchain := fakes.NewBlockchainWithBlocks([]core.Block{ + {Number: 4}, + {Number: 5}, + {Number: 8}, + {Number: 10}, + {Number: 13}, + }) + repository := repositories.NewInMemory() + repository.CreateBlock(core.Block{Number: 1}) + repository.CreateBlock(core.Block{Number: 2}) + repository.CreateBlock(core.Block{Number: 3}) + repository.CreateBlock(core.Block{Number: 6}) + repository.CreateBlock(core.Block{Number: 7}) + repository.CreateBlock(core.Block{Number: 9}) + repository.CreateBlock(core.Block{Number: 11}) + repository.CreateBlock(core.Block{Number: 12}) + + history.PopulateBlocks(blockchain, repository, 5) + + Expect(repository.BlockCount()).To(Equal(11)) + Expect(repository.FindBlockByNumber(4)).To(BeNil()) + Expect(repository.FindBlockByNumber(5)).NotTo(BeNil()) + Expect(repository.FindBlockByNumber(8)).NotTo(BeNil()) + Expect(repository.FindBlockByNumber(10)).NotTo(BeNil()) + Expect(repository.FindBlockByNumber(13)).To(BeNil()) + }) + + It("returns the number of blocks created", func() { + blockchain := fakes.NewBlockchainWithBlocks([]core.Block{ + {Number: 4}, + {Number: 5}, + }) + repository := repositories.NewInMemory() + repository.CreateBlock(core.Block{Number: 3}) + repository.CreateBlock(core.Block{Number: 6}) + + numberOfBlocksCreated := history.PopulateBlocks(blockchain, repository, 3) + + Expect(numberOfBlocksCreated).To(Equal(2)) + }) + +}) diff --git a/pkg/repositories/in_memory.go b/pkg/repositories/in_memory.go index 15fdaf4d..99343784 100644 --- a/pkg/repositories/in_memory.go +++ b/pkg/repositories/in_memory.go @@ -8,6 +8,16 @@ type InMemory struct { blocks map[int64]*core.Block } +func (repository *InMemory) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 { + missingNumbers := []int64{} + for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ { + if repository.blocks[blockNumber] == nil { + missingNumbers = append(missingNumbers, blockNumber) + } + } + return missingNumbers +} + func NewInMemory() *InMemory { return &InMemory{ blocks: make(map[int64]*core.Block), @@ -25,3 +35,13 @@ func (repository *InMemory) BlockCount() int { func (repository *InMemory) FindBlockByNumber(blockNumber int64) *core.Block { return repository.blocks[blockNumber] } + +func (repository *InMemory) MaxBlockNumber() int64 { + highestBlockNumber := int64(-1) + for key := range repository.blocks { + if key > highestBlockNumber { + highestBlockNumber = key + } + } + return highestBlockNumber +} diff --git a/pkg/repositories/postgres.go b/pkg/repositories/postgres.go index fc3cbde9..5672c99d 100644 --- a/pkg/repositories/postgres.go +++ b/pkg/repositories/postgres.go @@ -12,12 +12,33 @@ type Postgres struct { Db *sqlx.DB } +func (repository Postgres) MaxBlockNumber() int64 { + var highestBlockNumber int64 + repository.Db.Get(&highestBlockNumber, `SELECT MAX(block_number) FROM blocks`) + return highestBlockNumber +} + +func (repository Postgres) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 { + numbers := []int64{} + repository.Db.Select(&numbers, + `SELECT all_block_numbers + FROM ( + SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series + LEFT JOIN blocks + ON block_number = all_block_numbers + WHERE block_number ISNULL`, + startingBlockNumber, + highestBlockNumber) + return numbers +} + func NewPostgres(db *sqlx.DB) Postgres { return Postgres{Db: db} } func (repository Postgres) FindBlockByNumber(blockNumber int64) *core.Block { - blockRows, _ := repository.Db.Query("SELECT id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash FROM blocks") + blockRows, _ := repository.Db.Query( + `SELECT id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash FROM blocks`) var savedBlocks []core.Block for blockRows.Next() { savedBlock := repository.loadBlock(blockRows) @@ -38,9 +59,9 @@ func (repository Postgres) BlockCount() int { func (repository Postgres) CreateBlock(block core.Block) { insertedBlock := repository.Db.QueryRow( - "Insert INTO blocks "+ - "(block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash) "+ - "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id", + `INSERT INTO blocks + (block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id`, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash) var blockId int64 insertedBlock.Scan(&blockId) @@ -49,8 +70,10 @@ func (repository Postgres) CreateBlock(block core.Block) { func (repository Postgres) createTransactions(blockId int64, transactions []core.Transaction) { for _, transaction := range transactions { - repository.Db.MustExec("Insert INTO transactions "+ - "(block_id, tx_hash, tx_nonce, tx_to, tx_gaslimit, tx_gasprice, tx_value) VALUES ($1, $2, $3, $4, $5, $6, $7)", + repository.Db.MustExec( + `INSERT INTO transactions + (block_id, tx_hash, tx_nonce, tx_to, tx_gaslimit, tx_gasprice, tx_value) + VALUES ($1, $2, $3, $4, $5, $6, $7)`, blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.GasLimit, transaction.GasPrice, transaction.Value) } } diff --git a/pkg/repositories/repository.go b/pkg/repositories/repository.go index aace5bf1..15d88f76 100644 --- a/pkg/repositories/repository.go +++ b/pkg/repositories/repository.go @@ -6,4 +6,6 @@ type Repository interface { CreateBlock(block core.Block) BlockCount() int FindBlockByNumber(blockNumber int64) *core.Block + MaxBlockNumber() int64 + MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 } diff --git a/pkg/repositories/repository_test.go b/pkg/repositories/repository_test.go index ca6015f1..9167ad8d 100644 --- a/pkg/repositories/repository_test.go +++ b/pkg/repositories/repository_test.go @@ -134,6 +134,75 @@ var _ = Describe("Repositories", func() { Expect(savedTransaction.Value).To(Equal(value)) }) }) + + Describe("The missing block numbers", func() { + It("is empty the starting block number is the highest known block number", func() { + repository.CreateBlock(core.Block{Number: 1}) + + Expect(len(repository.MissingBlockNumbers(1, 1))).To(Equal(0)) + }) + + It("is the only missing block number", func() { + repository.CreateBlock(core.Block{Number: 2}) + + Expect(repository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1})) + }) + + It("is both missing block numbers", func() { + repository.CreateBlock(core.Block{Number: 3}) + + Expect(repository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2})) + }) + + It("goes back to the starting block number", func() { + repository.CreateBlock(core.Block{Number: 6}) + + Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5})) + }) + + It("only includes missing block numbers", func() { + repository.CreateBlock(core.Block{Number: 4}) + repository.CreateBlock(core.Block{Number: 6}) + + Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5})) + }) + + It("is a list with multiple gaps", func() { + repository.CreateBlock(core.Block{Number: 4}) + repository.CreateBlock(core.Block{Number: 5}) + repository.CreateBlock(core.Block{Number: 8}) + repository.CreateBlock(core.Block{Number: 10}) + + Expect(repository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9})) + }) + + It("returns empty array when lower bound exceeds upper bound", func() { + Expect(repository.MissingBlockNumbers(10000, 1)).To(Equal([]int64{})) + }) + + It("only returns requested range even when other gaps exist", func() { + repository.CreateBlock(core.Block{Number: 3}) + repository.CreateBlock(core.Block{Number: 8}) + + Expect(repository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5})) + }) + + }) + + Describe("The max block numbers", func() { + It("returns the block number when a single block", func() { + repository.CreateBlock(core.Block{Number: 1}) + + Expect(repository.MaxBlockNumber()).To(Equal(int64(1))) + }) + + It("returns highest known block number when multiple blocks", func() { + repository.CreateBlock(core.Block{Number: 1}) + repository.CreateBlock(core.Block{Number: 10}) + + Expect(repository.MaxBlockNumber()).To(Equal(int64(10))) + }) + }) } Describe("In memory repository", func() {