Add ability to populate missing blocks

* The command populates up to the highest known block number
 * The anticipated use case is that the listener will be running
   in parallel to the populateBlocks command
    * This will mean that the listener is responsible for picking up
      new blocks, and the populateBlocks command is reposible for
      historical blocks
 * Reformat SQL statements
This commit is contained in:
Eric Meyer 2017-11-06 14:36:12 -06:00
parent cdc861e5f2
commit 4c84173bc0
15 changed files with 315 additions and 19 deletions

View File

@ -25,6 +25,16 @@ func tasks(p *do.Project) {
do.M{"environment": environment, "$in": "cmd/run"})
})
p.Task("populateBlocks", nil, func(context *do.Context) {
environment := parseEnvironment(context)
startingNumber := context.Args.MayInt(-1, "starting-number")
if startingNumber < 0 {
log.Fatalln("--starting-number required")
}
context.Start(`go run main.go --environment={{.environment}} --starting-number={{.startingNumber}}`,
do.M{"environment": environment, "startingNumber": startingNumber, "$in": "cmd/populate_blocks"})
})
p.Task("migrate", nil, func(context *do.Context) {
environment := parseEnvironment(context)
cfg := config.NewConfig(environment)

View File

@ -65,6 +65,12 @@ The default location for Ethereum is:
1. Start a blockchain.
2. In a separate terminal start listener (ipcDir location)
- `godo run -- --environment=<some-environment>`
## Retrieving Historical Data
1. Start a blockchain.
2. In a separate terminal start listener (ipcDir location)
- `godo populateBlocks -- --environment=<some-environment> --starting-number=<starting-block-number>`
### Configuring Additional Environments

View File

@ -0,0 +1,32 @@
package main
import (
"flag"
"log"
"fmt"
"github.com/8thlight/vulcanizedb/pkg/config"
"github.com/8thlight/vulcanizedb/pkg/geth"
"github.com/8thlight/vulcanizedb/pkg/history"
"github.com/8thlight/vulcanizedb/pkg/repositories"
"github.com/jmoiron/sqlx"
)
func main() {
environment := flag.String("environment", "", "Environment name")
startingBlockNumber := flag.Int("starting-number", -1, "First block to fill from")
flag.Parse()
cfg := config.NewConfig(*environment)
blockchain := geth.NewGethBlockchain(cfg.Client.IPCPath)
connectString := config.DbConnectionString(cfg.Database)
db, err := sqlx.Connect("postgres", connectString)
if err != nil {
log.Fatalf("Error connecting to DB: %v\n", err)
}
repository := repositories.NewPostgres(db)
numberOfBlocksCreated := history.PopulateBlocks(blockchain, repository, int64(*startingBlockNumber))
fmt.Printf("Populated %d blocks", numberOfBlocksCreated)
}

View File

@ -14,11 +14,12 @@ var _ = Describe("Reading from the Geth blockchain", func() {
var listener blockchain_listener.BlockchainListener
var observer *fakes.BlockchainObserver
var blockchain *geth.GethBlockchain
BeforeEach(func() {
observer = fakes.NewFakeBlockchainObserver()
cfg := config.NewConfig("private")
blockchain := geth.NewGethBlockchain(cfg.Client.IPCPath)
blockchain = geth.NewGethBlockchain(cfg.Client.IPCPath)
observers := []core.BlockchainObserver{observer}
listener = blockchain_listener.NewBlockchainListener(blockchain, observers)
})
@ -43,4 +44,14 @@ var _ = Describe("Reading from the Geth blockchain", func() {
close(done)
}, 10)
It("retrieves the genesis block and first block", func(done Done) {
genesisBlock := blockchain.GetBlockByNumber(int64(0))
firstBlock := blockchain.GetBlockByNumber(int64(1))
Expect(genesisBlock.Number).To(Equal(int64(0)))
Expect(firstBlock.Number).To(Equal(int64(1)))
close(done)
}, 10)
})

View File

@ -12,7 +12,7 @@ var _ = Describe("Blockchain listeners", func() {
It("starts with no blocks", func(done Done) {
observer := fakes.NewFakeBlockchainObserver()
blockchain := &fakes.Blockchain{}
blockchain := fakes.NewBlockchain()
blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
@ -22,7 +22,7 @@ var _ = Describe("Blockchain listeners", func() {
It("sees when one block was added", func(done Done) {
observer := fakes.NewFakeBlockchainObserver()
blockchain := &fakes.Blockchain{}
blockchain := fakes.NewBlockchain()
listener := blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
go listener.Start()
@ -38,7 +38,7 @@ var _ = Describe("Blockchain listeners", func() {
It("sees a second block", func(done Done) {
observer := fakes.NewFakeBlockchainObserver()
blockchain := &fakes.Blockchain{}
blockchain := fakes.NewBlockchain()
listener := blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
go listener.Start()
@ -56,7 +56,7 @@ var _ = Describe("Blockchain listeners", func() {
It("stops listening", func(done Done) {
observer := fakes.NewFakeBlockchainObserver()
blockchain := &fakes.Blockchain{}
blockchain := fakes.NewBlockchain()
listener := blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
go listener.Start()

View File

@ -1,6 +1,7 @@
package core
type Blockchain interface {
GetBlockByNumber(blockNumber int64) Block
SubscribeToBlocks(blocks chan Block)
StartListening()
StopListening()

View File

@ -3,16 +3,36 @@ package fakes
import "github.com/8thlight/vulcanizedb/pkg/core"
type Blockchain struct {
outputBlocks chan core.Block
blocks map[int64]core.Block
blocksChannel chan core.Block
WasToldToStop bool
}
func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) {
blockchain.outputBlocks = outputBlocks
func NewBlockchain() *Blockchain {
return &Blockchain{blocks: make(map[int64]core.Block)}
}
func (blockchain Blockchain) AddBlock(block core.Block) {
blockchain.outputBlocks <- block
func NewBlockchainWithBlocks(blocks []core.Block) *Blockchain {
blockNumberToBlocks := make(map[int64]core.Block)
for _, block := range blocks {
blockNumberToBlocks[block.Number] = block
}
return &Blockchain{
blocks: blockNumberToBlocks,
}
}
func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) core.Block {
return blockchain.blocks[blockNumber]
}
func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) {
blockchain.blocksChannel = outputBlocks
}
func (blockchain *Blockchain) AddBlock(block core.Block) {
blockchain.blocks[block.Number] = block
blockchain.blocksChannel <- block
}
func (*Blockchain) StartListening() {}

View File

@ -3,6 +3,8 @@ package geth
import (
"fmt"
"math/big"
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
@ -17,6 +19,11 @@ type GethBlockchain struct {
newHeadSubscription ethereum.Subscription
}
func (blockchain *GethBlockchain) GetBlockByNumber(blockNumber int64) core.Block {
gethBlock, _ := blockchain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber))
return GethBlockToCoreBlock(gethBlock)
}
func NewGethBlockchain(ipcPath string) *GethBlockchain {
fmt.Printf("Creating Geth Blockchain to: %s\n", ipcPath)
blockchain := GethBlockchain{}
@ -36,10 +43,8 @@ func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan core.Block) {
}
func (blockchain *GethBlockchain) StartListening() {
myContext := context.Background()
for header := range blockchain.readGethHeaders {
gethBlock, _ := blockchain.client.BlockByNumber(myContext, header.Number)
block := GethBlockToCoreBlock(gethBlock)
block := blockchain.GetBlockByNumber(header.Number.Int64())
blockchain.outputBlocks <- block
}
}

View File

@ -0,0 +1,13 @@
package history_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"testing"
)
func TestHistory(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "History Suite")
}

View File

@ -0,0 +1,15 @@
package history
import (
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/repositories"
)
func PopulateBlocks(blockchain core.Blockchain, repository repositories.Repository, startingBlockNumber int64) int {
blockNumbers := repository.MissingBlockNumbers(startingBlockNumber, repository.MaxBlockNumber())
for _, blockNumber := range blockNumbers {
block := blockchain.GetBlockByNumber(blockNumber)
repository.CreateBlock(block)
}
return len(blockNumbers)
}

View File

@ -0,0 +1,69 @@
package history_test
import (
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/fakes"
"github.com/8thlight/vulcanizedb/pkg/history"
"github.com/8thlight/vulcanizedb/pkg/repositories"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Populating blocks", func() {
It("fills in the only missing block", func() {
blocks := []core.Block{{Number: 1, Hash: "x012343"}}
blockchain := fakes.NewBlockchainWithBlocks(blocks)
repository := repositories.NewInMemory()
repository.CreateBlock(core.Block{Number: 2})
history.PopulateBlocks(blockchain, repository, 1)
block := repository.FindBlockByNumber(1)
Expect(block).NotTo(BeNil())
Expect(block.Hash).To(Equal("x012343"))
})
It("fills in two missing blocks", func() {
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
{Number: 4},
{Number: 5},
{Number: 8},
{Number: 10},
{Number: 13},
})
repository := repositories.NewInMemory()
repository.CreateBlock(core.Block{Number: 1})
repository.CreateBlock(core.Block{Number: 2})
repository.CreateBlock(core.Block{Number: 3})
repository.CreateBlock(core.Block{Number: 6})
repository.CreateBlock(core.Block{Number: 7})
repository.CreateBlock(core.Block{Number: 9})
repository.CreateBlock(core.Block{Number: 11})
repository.CreateBlock(core.Block{Number: 12})
history.PopulateBlocks(blockchain, repository, 5)
Expect(repository.BlockCount()).To(Equal(11))
Expect(repository.FindBlockByNumber(4)).To(BeNil())
Expect(repository.FindBlockByNumber(5)).NotTo(BeNil())
Expect(repository.FindBlockByNumber(8)).NotTo(BeNil())
Expect(repository.FindBlockByNumber(10)).NotTo(BeNil())
Expect(repository.FindBlockByNumber(13)).To(BeNil())
})
It("returns the number of blocks created", func() {
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
{Number: 4},
{Number: 5},
})
repository := repositories.NewInMemory()
repository.CreateBlock(core.Block{Number: 3})
repository.CreateBlock(core.Block{Number: 6})
numberOfBlocksCreated := history.PopulateBlocks(blockchain, repository, 3)
Expect(numberOfBlocksCreated).To(Equal(2))
})
})

View File

@ -8,6 +8,16 @@ type InMemory struct {
blocks map[int64]*core.Block
}
func (repository *InMemory) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 {
missingNumbers := []int64{}
for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ {
if repository.blocks[blockNumber] == nil {
missingNumbers = append(missingNumbers, blockNumber)
}
}
return missingNumbers
}
func NewInMemory() *InMemory {
return &InMemory{
blocks: make(map[int64]*core.Block),
@ -25,3 +35,13 @@ func (repository *InMemory) BlockCount() int {
func (repository *InMemory) FindBlockByNumber(blockNumber int64) *core.Block {
return repository.blocks[blockNumber]
}
func (repository *InMemory) MaxBlockNumber() int64 {
highestBlockNumber := int64(-1)
for key := range repository.blocks {
if key > highestBlockNumber {
highestBlockNumber = key
}
}
return highestBlockNumber
}

View File

@ -12,12 +12,33 @@ type Postgres struct {
Db *sqlx.DB
}
func (repository Postgres) MaxBlockNumber() int64 {
var highestBlockNumber int64
repository.Db.Get(&highestBlockNumber, `SELECT MAX(block_number) FROM blocks`)
return highestBlockNumber
}
func (repository Postgres) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 {
numbers := []int64{}
repository.Db.Select(&numbers,
`SELECT all_block_numbers
FROM (
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
LEFT JOIN blocks
ON block_number = all_block_numbers
WHERE block_number ISNULL`,
startingBlockNumber,
highestBlockNumber)
return numbers
}
func NewPostgres(db *sqlx.DB) Postgres {
return Postgres{Db: db}
}
func (repository Postgres) FindBlockByNumber(blockNumber int64) *core.Block {
blockRows, _ := repository.Db.Query("SELECT id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash FROM blocks")
blockRows, _ := repository.Db.Query(
`SELECT id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash FROM blocks`)
var savedBlocks []core.Block
for blockRows.Next() {
savedBlock := repository.loadBlock(blockRows)
@ -38,9 +59,9 @@ func (repository Postgres) BlockCount() int {
func (repository Postgres) CreateBlock(block core.Block) {
insertedBlock := repository.Db.QueryRow(
"Insert INTO blocks "+
"(block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash) "+
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id",
`INSERT INTO blocks
(block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id`,
block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash)
var blockId int64
insertedBlock.Scan(&blockId)
@ -49,8 +70,10 @@ func (repository Postgres) CreateBlock(block core.Block) {
func (repository Postgres) createTransactions(blockId int64, transactions []core.Transaction) {
for _, transaction := range transactions {
repository.Db.MustExec("Insert INTO transactions "+
"(block_id, tx_hash, tx_nonce, tx_to, tx_gaslimit, tx_gasprice, tx_value) VALUES ($1, $2, $3, $4, $5, $6, $7)",
repository.Db.MustExec(
`INSERT INTO transactions
(block_id, tx_hash, tx_nonce, tx_to, tx_gaslimit, tx_gasprice, tx_value)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.GasLimit, transaction.GasPrice, transaction.Value)
}
}

View File

@ -6,4 +6,6 @@ type Repository interface {
CreateBlock(block core.Block)
BlockCount() int
FindBlockByNumber(blockNumber int64) *core.Block
MaxBlockNumber() int64
MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64
}

View File

@ -134,6 +134,75 @@ var _ = Describe("Repositories", func() {
Expect(savedTransaction.Value).To(Equal(value))
})
})
Describe("The missing block numbers", func() {
It("is empty the starting block number is the highest known block number", func() {
repository.CreateBlock(core.Block{Number: 1})
Expect(len(repository.MissingBlockNumbers(1, 1))).To(Equal(0))
})
It("is the only missing block number", func() {
repository.CreateBlock(core.Block{Number: 2})
Expect(repository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1}))
})
It("is both missing block numbers", func() {
repository.CreateBlock(core.Block{Number: 3})
Expect(repository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2}))
})
It("goes back to the starting block number", func() {
repository.CreateBlock(core.Block{Number: 6})
Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5}))
})
It("only includes missing block numbers", func() {
repository.CreateBlock(core.Block{Number: 4})
repository.CreateBlock(core.Block{Number: 6})
Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5}))
})
It("is a list with multiple gaps", func() {
repository.CreateBlock(core.Block{Number: 4})
repository.CreateBlock(core.Block{Number: 5})
repository.CreateBlock(core.Block{Number: 8})
repository.CreateBlock(core.Block{Number: 10})
Expect(repository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9}))
})
It("returns empty array when lower bound exceeds upper bound", func() {
Expect(repository.MissingBlockNumbers(10000, 1)).To(Equal([]int64{}))
})
It("only returns requested range even when other gaps exist", func() {
repository.CreateBlock(core.Block{Number: 3})
repository.CreateBlock(core.Block{Number: 8})
Expect(repository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5}))
})
})
Describe("The max block numbers", func() {
It("returns the block number when a single block", func() {
repository.CreateBlock(core.Block{Number: 1})
Expect(repository.MaxBlockNumber()).To(Equal(int64(1)))
})
It("returns highest known block number when multiple blocks", func() {
repository.CreateBlock(core.Block{Number: 1})
repository.CreateBlock(core.Block{Number: 10})
Expect(repository.MaxBlockNumber()).To(Equal(int64(10)))
})
})
}
Describe("In memory repository", func() {