Block categorization (#110)

* Add block categorization (is_final=)

* Add godo task for vulcanizeDB (Example of how everything could work together)

* Add unique constraint on block_number and node

* Add index on block_id for transactions_table

* Add node_id index on blocks table

* Sort transactions returned from FindBlock by tx_hash

* lowercase tx_to, tx_from like etherscan
This commit is contained in:
Matt K 2017-12-20 14:06:22 -06:00 committed by GitHub
parent 266c9587c8
commit 24bc83a448
26 changed files with 239 additions and 86 deletions

3
.gitignore vendored
View File

@ -4,3 +4,6 @@ test_data_dir/
vendor/
contracts/*
environments/*.toml
Vagrantfile
vagrant_bootstrap.sh
.vagrant

View File

@ -26,6 +26,12 @@ func tasks(p *do.Project) {
do.M{"environment": environment, "$in": "cmd/run"})
})
p.Task("vulcanizeDb", nil, func(context *do.Context) {
environment := parseEnvironment(context)
context.Start(`go run main.go --environment={{.environment}}`,
do.M{"environment": environment, "$in": "cmd/vulcanize_db"})
})
p.Task("populateBlocks", nil, func(context *do.Context) {
environment := parseEnvironment(context)
startingNumber := context.Args.MayInt(-1, "starting-number")

View File

@ -11,18 +11,17 @@
- `go get -u github.com/golang/dep/cmd/dep`
- https://github.com/go-godo/godo
- `go get -u gopkg.in/godo.v2/cmd/godo`
- Postgres 10
- Postgres 10
- Go Ethereum
- https://ethereum.github.io/go-ethereum/downloads/
### Cloning the Repository
1. `git config --global url."git@github.com:".insteadOf "https://github.com/"`
- By default, `go get` does not work for private GitHub repos. This will fix that.
2. `go get github.com/8thlight/vulcanizedb`
3. `go get github.com/ethereum/go-ethereum`
- This will take a while and gives poor indication of progress.
4. `go install github.com/ethereum/go-ethereum/cmd/geth`
5. `cd $GOPATH/src/github.com/8thlight/vulcanizedb`
6. `dep ensure`
3. `cd $GOPATH/src/github.com/8thlight/vulcanizedb`
4. `dep ensure`
### Setting up the Databases
@ -60,6 +59,11 @@ The default location for Ethereum is:
**Note the location of the ipc file is outputted when you connect to a blockchain. It is needed to for configuration**
## Start Vulcanize DB
1. Start a blockchain.
2. In a separate terminal start vulcanize_db
- `godo vulcanizeDb -- --environment=<some-environment>`
## Running Listener
1. Start a blockchain.
@ -102,13 +106,13 @@ You can create configuration files for additional environments.
## Running the Tests
### Integration Test
In order to run the integration tests, you will need to run them against a real blockchain.
1. Run `./scripts/start_private_blockchain` as a separate process.
2. `go test ./...` to run all tests.
### Unit Tests
1. `go test ./pkg/...`
### Integration Test
In order to run the integration tests, you will need to run them against a real blockchain. At the moment the integration tests require [Geth v1.7.2](https://ethereum.github.io/go-ethereum/downloads/) as they depend on the `--dev` mode, which changed in v1.7.3
1. Run `./scripts/start_private_blockchain` as a separate process.
2. `go test ./...` to run all tests.

View File

@ -1,47 +0,0 @@
package main
import (
"flag"
"time"
"os"
"text/template"
"github.com/8thlight/vulcanizedb/cmd"
"github.com/8thlight/vulcanizedb/pkg/blockchain_listener"
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/geth"
"github.com/8thlight/vulcanizedb/pkg/history"
"github.com/8thlight/vulcanizedb/pkg/observers"
)
const windowTemplate = `Validating Existing Blocks
|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| {{.MaxBlockNumber}}(HEAD)
`
func main() {
environment := flag.String("environment", "", "Environment name")
flag.Parse()
config := cmd.LoadConfig(*environment)
blockchain := geth.NewGethBlockchain(config.Client.IPCPath)
repository := cmd.LoadPostgres(config.Database, blockchain.Node())
listener := blockchain_listener.NewBlockchainListener(
blockchain,
[]core.BlockchainObserver{
observers.BlockchainLoggingObserver{},
observers.NewBlockchainDbObserver(repository),
},
)
go listener.Start()
windowSize := 10
ticker := time.NewTicker(10 * time.Second)
t := template.Must(template.New("window").Parse(windowTemplate))
for _ = range ticker.C {
window := history.UpdateBlocksWindow(blockchain, repository, windowSize)
t.Execute(os.Stdout, window)
}
ticker.Stop()
}

76
cmd/vulcanize_db/main.go Normal file
View File

@ -0,0 +1,76 @@
package main
import (
"flag"
"time"
"os"
"text/template"
"github.com/8thlight/vulcanizedb/cmd"
"github.com/8thlight/vulcanizedb/pkg/blockchain_listener"
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/geth"
"github.com/8thlight/vulcanizedb/pkg/history"
"github.com/8thlight/vulcanizedb/pkg/observers"
"github.com/8thlight/vulcanizedb/pkg/repositories"
)
const windowTemplate = `Validating Existing Blocks
|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| {{.MaxBlockNumber}}(HEAD)
`
const (
windowSize = 24
pollingInterval = 10 * time.Second
)
func createListener(blockchain *geth.GethBlockchain, repository repositories.Postgres) blockchain_listener.BlockchainListener {
listener := blockchain_listener.NewBlockchainListener(
blockchain,
[]core.BlockchainObserver{
observers.BlockchainLoggingObserver{},
observers.NewBlockchainDbObserver(repository),
},
)
return listener
}
func validateBlocks(blockchain *geth.GethBlockchain, repository repositories.Postgres, windowSize int, windowTemplate *template.Template) {
window := history.UpdateBlocksWindow(blockchain, repository, windowSize)
repository.SetBlocksStatus(blockchain.LastBlock().Int64())
windowTemplate.Execute(os.Stdout, window)
}
func main() {
parsedWindowTemplate := template.Must(template.New("window").Parse(windowTemplate))
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
environment := flag.String("environment", "", "Environment name")
flag.Parse()
config := cmd.LoadConfig(*environment)
blockchain := geth.NewGethBlockchain(config.Client.IPCPath)
repository := cmd.LoadPostgres(config.Database, blockchain.Node())
listner := createListener(blockchain, repository)
go listner.Start()
defer listner.Stop()
missingBlocksPopulated := make(chan int)
go func() {
missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, repository, 0)
}()
for range ticker.C {
validateBlocks(blockchain, repository, windowSize, parsedWindowTemplate)
select {
case <-missingBlocksPopulated:
go func() {
missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, repository, 0)
}()
default:
}
}
}

View File

@ -0,0 +1,2 @@
ALTER TABLE blocks
DROP COLUMN is_final;

View File

@ -0,0 +1,2 @@
ALTER TABLE blocks
ADD COLUMN is_final BOOLEAN;

View File

@ -0,0 +1,2 @@
ALTER TABLE blocks
DROP CONSTRAINT node_id_block_number_uc;

View File

@ -0,0 +1,2 @@
ALTER TABLE blocks
ADD CONSTRAINT node_id_block_number_uc UNIQUE (block_number, node_id);

View File

@ -0,0 +1 @@
DROP INDEX block_id_index;

View File

@ -0,0 +1 @@
CREATE INDEX block_id_index ON transactions (block_id);

View File

@ -0,0 +1 @@
DROP INDEX node_id_index;

View File

@ -0,0 +1 @@
CREATE INDEX node_id_index ON blocks (node_id);

View File

@ -50,7 +50,8 @@ CREATE TABLE blocks (
block_parenthash character varying(66),
block_size bigint,
uncle_hash character varying(66),
node_id integer NOT NULL
node_id integer NOT NULL,
is_final boolean
);
@ -283,6 +284,14 @@ ALTER TABLE ONLY logs
ADD CONSTRAINT logs_pkey PRIMARY KEY (id);
--
-- Name: blocks node_id_block_number_uc; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY blocks
ADD CONSTRAINT node_id_block_number_uc UNIQUE (block_number, node_id);
--
-- Name: nodes node_uc; Type: CONSTRAINT; Schema: public; Owner: -
--
@ -323,6 +332,13 @@ ALTER TABLE ONLY watched_contracts
ADD CONSTRAINT watched_contracts_pkey PRIMARY KEY (contract_id);
--
-- Name: block_id_index; Type: INDEX; Schema: public; Owner: -
--
CREATE INDEX block_id_index ON transactions USING btree (block_id);
--
-- Name: block_number_index; Type: INDEX; Schema: public; Owner: -
--
@ -330,6 +346,13 @@ ALTER TABLE ONLY watched_contracts
CREATE INDEX block_number_index ON blocks USING btree (block_number);
--
-- Name: node_id_index; Type: INDEX; Schema: public; Owner: -
--
CREATE INDEX node_id_index ON blocks USING btree (node_id);
--
-- Name: transactions blocks_fk; Type: FK CONSTRAINT; Schema: public; Owner: -
--

View File

@ -54,10 +54,11 @@ var _ = Describe("Reading from the Geth blockchain", func() {
It("retrieves the genesis block and first block", func(done Done) {
genesisBlock := blockchain.GetBlockByNumber(int64(0))
firstBlock := blockchain.GetBlockByNumber(int64(1))
lastBlockNumber := blockchain.LastBlock()
Expect(genesisBlock.Number).To(Equal(int64(0)))
Expect(firstBlock.Number).To(Equal(int64(1)))
Expect(lastBlockNumber.Int64()).To(BeNumerically(">", 0))
close(done)
}, 15)

View File

@ -12,4 +12,5 @@ type Block struct {
Time int64
Transactions []Transaction
UncleHash string
IsFinal bool
}

View File

@ -4,6 +4,7 @@ import "math/big"
type Blockchain interface {
GetBlockByNumber(blockNumber int64) Block
LastBlock() *big.Int
Node() Node
SubscribeToBlocks(blocks chan Block)
StartListening()

View File

@ -17,6 +17,16 @@ type Blockchain struct {
node core.Node
}
func (blockchain *Blockchain) LastBlock() *big.Int {
var max int64
for blockNumber := range blockchain.blocks {
if blockNumber > max {
max = blockNumber
}
}
return big.NewInt(max)
}
func (blockchain *Blockchain) GetLogs(contract core.Contract, blockNumber *big.Int) ([]core.Log, error) {
return blockchain.logs[contract.Hash], nil
}

View File

@ -1,6 +1,8 @@
package geth
import (
"strings"
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -39,8 +41,8 @@ func gethTransToCoreTrans(transaction *types.Transaction, from *common.Address)
Hash: transaction.Hash().Hex(),
Data: transaction.Data(),
Nonce: transaction.Nonce(),
To: addressToHex(transaction.To()),
From: addressToHex(from),
To: strings.ToLower(addressToHex(transaction.To())),
From: strings.ToLower(addressToHex(from)),
GasLimit: transaction.Gas().Int64(),
GasPrice: transaction.GasPrice().Int64(),
Value: transaction.Value().Int64(),

View File

@ -53,6 +53,7 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
Expect(gethBlock.Size).To(Equal(block.Size().Int64()))
Expect(gethBlock.Time).To(Equal(time))
Expect(gethBlock.UncleHash).To(Equal(block.UncleHash().Hex()))
Expect(gethBlock.IsFinal).To(BeFalse())
})
Describe("the converted transations", func() {

View File

@ -25,7 +25,7 @@ type GethBlockchain struct {
func (blockchain *GethBlockchain) GetLogs(contract core.Contract, blockNumber *big.Int) ([]core.Log, error) {
if blockNumber == nil {
blockNumber = blockchain.latestBlock()
blockNumber = blockchain.LastBlock()
}
contractAddress := common.HexToAddress(contract.Hash)
fc := ethereum.FilterQuery{
@ -80,7 +80,7 @@ func (blockchain *GethBlockchain) StopListening() {
blockchain.newHeadSubscription.Unsubscribe()
}
func (blockchain *GethBlockchain) latestBlock() *big.Int {
func (blockchain *GethBlockchain) LastBlock() *big.Int {
block, _ := blockchain.client.HeaderByNumber(context.Background(), nil)
return block.Number
}

View File

@ -100,4 +100,14 @@ var _ = Describe("Populating blocks", func() {
Expect(window.Size()).To(Equal(2))
})
It("returns the number of largest block", func() {
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
{Number: 1},
{Number: 2},
{Number: 3},
})
maxBlockNumber := blockchain.LastBlock()
Expect(maxBlockNumber.Int64()).To(Equal(int64(3)))
})
})

View File

@ -13,6 +13,16 @@ type InMemory struct {
HandleBlockCallCount int
}
func (repository *InMemory) SetBlocksStatus(chainHead int64) {
for key, block := range repository.blocks {
if key < (chainHead - blocksFromHeadBeforeFinal) {
tmp := block
tmp.IsFinal = true
repository.blocks[key] = tmp
}
}
}
func (repository *InMemory) CreateLogs(logs []core.Log) error {
for _, log := range logs {
key := fmt.Sprintf("%s%s", log.BlockNumber, log.Index)

View File

@ -52,6 +52,14 @@ func NewPostgres(databaseConfig config.Database, node core.Node) (Postgres, erro
return pg, nil
}
func (repository Postgres) SetBlocksStatus(chainHead int64) {
cutoff := chainHead - blocksFromHeadBeforeFinal
repository.Db.Exec(`
UPDATE blocks SET is_final = TRUE
WHERE is_final = FALSE AND block_number < $1`,
cutoff)
}
func (repository Postgres) CreateLogs(logs []core.Log) error {
tx, _ := repository.Db.BeginTx(context.Background(), nil)
for _, tlog := range logs {
@ -178,7 +186,7 @@ func (repository Postgres) MissingBlockNumbers(startingBlockNumber int64, highes
}
func (repository Postgres) FindBlockByNumber(blockNumber int64) (core.Block, error) {
blockRows, _ := repository.Db.Query(
blockRows := repository.Db.QueryRow(
`SELECT id,
block_number,
block_gaslimit,
@ -189,19 +197,20 @@ func (repository Postgres) FindBlockByNumber(blockNumber int64) (core.Block, err
block_nonce,
block_parenthash,
block_size,
uncle_hash
uncle_hash,
is_final
FROM blocks
WHERE node_id = $1`, repository.nodeId)
var savedBlocks []core.Block
for blockRows.Next() {
savedBlock := repository.loadBlock(blockRows)
savedBlocks = append(savedBlocks, savedBlock)
}
if len(savedBlocks) > 0 {
return savedBlocks[0], nil
} else {
return core.Block{}, ErrBlockDoesNotExist(blockNumber)
WHERE node_id = $1 AND block_number = $2`, repository.nodeId, blockNumber)
savedBlock, err := repository.loadBlock(blockRows)
if err != nil {
switch err {
case sql.ErrNoRows:
return core.Block{}, ErrBlockDoesNotExist(blockNumber)
default:
return savedBlock, err
}
}
return savedBlock, nil
}
func (repository Postgres) BlockCount() int {
@ -247,10 +256,10 @@ func (repository Postgres) insertBlock(block core.Block) error {
tx, _ := repository.Db.BeginTx(context.Background(), nil)
err := tx.QueryRow(
`INSERT INTO blocks
(node_id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
(node_id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash, is_final)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
RETURNING id `,
repository.nodeId, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash).
repository.nodeId, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal).
Scan(&blockId)
if err != nil {
tx.Rollback()
@ -291,7 +300,7 @@ func (repository Postgres) createTransactions(tx *sql.Tx, blockId int64, transac
return nil
}
func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block {
func (repository Postgres) loadBlock(blockRows *sql.Row) (core.Block, error) {
var blockId int64
var blockHash string
var blockNonce string
@ -303,7 +312,11 @@ func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block {
var gasLimit float64
var gasUsed float64
var uncleHash string
blockRows.Scan(&blockId, &blockNumber, &gasLimit, &gasUsed, &blockTime, &difficulty, &blockHash, &blockNonce, &blockParentHash, &blockSize, &uncleHash)
var isFinal bool
err := blockRows.Scan(&blockId, &blockNumber, &gasLimit, &gasUsed, &blockTime, &difficulty, &blockHash, &blockNonce, &blockParentHash, &blockSize, &uncleHash, &isFinal)
if err != nil {
return core.Block{}, err
}
transactionRows, _ := repository.Db.Query(`
SELECT tx_hash,
tx_nonce,
@ -313,7 +326,8 @@ func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block {
tx_gasprice,
tx_value
FROM transactions
WHERE block_id = $1`, blockId)
WHERE block_id = $1
ORDER BY tx_hash`, blockId)
transactions := repository.loadTransactions(transactionRows)
return core.Block{
Difficulty: difficulty,
@ -327,7 +341,8 @@ func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block {
Time: int64(blockTime),
Transactions: transactions,
UncleHash: uncleHash,
}
IsFinal: isFinal,
}, nil
}
func (repository Postgres) loadLogs(logsRows *sql.Rows) []core.Log {

View File

@ -2,6 +2,10 @@ package repositories
import "github.com/8thlight/vulcanizedb/pkg/core"
const (
blocksFromHeadBeforeFinal = 20
)
type Repository interface {
CreateOrUpdateBlock(block core.Block) error
BlockCount() int
@ -13,4 +17,5 @@ type Repository interface {
FindContract(contractHash string) (core.Contract, error)
CreateLogs(log []core.Log) error
FindLogs(address string, blockNumber int64) []core.Log
SetBlocksStatus(chainHead int64)
}

View File

@ -2,6 +2,7 @@ package testing
import (
"sort"
"strconv"
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/repositories"
@ -276,6 +277,25 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
})
})
Describe("The block status", func() {
It("sets the status of blocks within n-20 of chain HEAD as final", func() {
blockNumberOfChainHead := 25
for i := 0; i < blockNumberOfChainHead; i++ {
repository.CreateOrUpdateBlock(core.Block{Number: int64(i), Hash: strconv.Itoa(i)})
}
repository.SetBlocksStatus(int64(blockNumberOfChainHead))
blockOne, err := repository.FindBlockByNumber(1)
Expect(err).ToNot(HaveOccurred())
Expect(blockOne.IsFinal).To(Equal(true))
blockTwo, err := repository.FindBlockByNumber(24)
Expect(err).ToNot(HaveOccurred())
Expect(blockTwo.IsFinal).To(BeFalse())
})
})
Describe("Creating contracts", func() {
It("returns the contract when it exists", func() {
repository.CreateContract(core.Contract{Hash: "x123"})