Canonical blocks (#108)
* Update Block w/ newest Block * Add cascading delete to blocks and transactions tables * Add handling for new conflicting blocks * Command line version of sliding window n behind HEAD
This commit is contained in:
parent
84e77f259d
commit
266c9587c8
@ -17,6 +17,6 @@ func main() {
|
|||||||
config := cmd.LoadConfig(*environment)
|
config := cmd.LoadConfig(*environment)
|
||||||
blockchain := geth.NewGethBlockchain(config.Client.IPCPath)
|
blockchain := geth.NewGethBlockchain(config.Client.IPCPath)
|
||||||
repository := cmd.LoadPostgres(config.Database, blockchain.Node())
|
repository := cmd.LoadPostgres(config.Database, blockchain.Node())
|
||||||
numberOfBlocksCreated := history.PopulateBlocks(blockchain, repository, int64(*startingBlockNumber))
|
numberOfBlocksCreated := history.PopulateMissingBlocks(blockchain, repository, int64(*startingBlockNumber))
|
||||||
fmt.Printf("Populated %d blocks", numberOfBlocksCreated)
|
fmt.Printf("Populated %d blocks", numberOfBlocksCreated)
|
||||||
}
|
}
|
||||||
|
47
cmd/update_canonical/main.go
Normal file
47
cmd/update_canonical/main.go
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"os"
|
||||||
|
"text/template"
|
||||||
|
|
||||||
|
"github.com/8thlight/vulcanizedb/cmd"
|
||||||
|
"github.com/8thlight/vulcanizedb/pkg/blockchain_listener"
|
||||||
|
"github.com/8thlight/vulcanizedb/pkg/core"
|
||||||
|
"github.com/8thlight/vulcanizedb/pkg/geth"
|
||||||
|
"github.com/8thlight/vulcanizedb/pkg/history"
|
||||||
|
"github.com/8thlight/vulcanizedb/pkg/observers"
|
||||||
|
)
|
||||||
|
|
||||||
|
const windowTemplate = `Validating Existing Blocks
|
||||||
|
|{{.LowerBound}}|-- Validation Window --|{{.UpperBound}}| {{.MaxBlockNumber}}(HEAD)
|
||||||
|
|
||||||
|
`
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
environment := flag.String("environment", "", "Environment name")
|
||||||
|
flag.Parse()
|
||||||
|
config := cmd.LoadConfig(*environment)
|
||||||
|
blockchain := geth.NewGethBlockchain(config.Client.IPCPath)
|
||||||
|
repository := cmd.LoadPostgres(config.Database, blockchain.Node())
|
||||||
|
listener := blockchain_listener.NewBlockchainListener(
|
||||||
|
blockchain,
|
||||||
|
[]core.BlockchainObserver{
|
||||||
|
observers.BlockchainLoggingObserver{},
|
||||||
|
observers.NewBlockchainDbObserver(repository),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
go listener.Start()
|
||||||
|
|
||||||
|
windowSize := 10
|
||||||
|
ticker := time.NewTicker(10 * time.Second)
|
||||||
|
t := template.Must(template.New("window").Parse(windowTemplate))
|
||||||
|
for _ = range ticker.C {
|
||||||
|
window := history.UpdateBlocksWindow(blockchain, repository, windowSize)
|
||||||
|
t.Execute(os.Stdout, window)
|
||||||
|
}
|
||||||
|
ticker.Stop()
|
||||||
|
}
|
@ -0,0 +1,11 @@
|
|||||||
|
BEGIN;
|
||||||
|
|
||||||
|
ALTER TABLE transactions
|
||||||
|
DROP CONSTRAINT blocks_fk;
|
||||||
|
|
||||||
|
ALTER TABLE transactions
|
||||||
|
ADD CONSTRAINT fk_test
|
||||||
|
FOREIGN KEY (block_id)
|
||||||
|
REFERENCES blocks (id);
|
||||||
|
|
||||||
|
COMMIT;
|
@ -0,0 +1,12 @@
|
|||||||
|
BEGIN;
|
||||||
|
|
||||||
|
ALTER TABLE transactions
|
||||||
|
DROP CONSTRAINT fk_test;
|
||||||
|
|
||||||
|
ALTER TABLE transactions
|
||||||
|
ADD CONSTRAINT blocks_fk
|
||||||
|
FOREIGN KEY (block_id)
|
||||||
|
REFERENCES blocks (id)
|
||||||
|
ON DELETE CASCADE;
|
||||||
|
|
||||||
|
COMMIT;
|
@ -0,0 +1,11 @@
|
|||||||
|
BEGIN;
|
||||||
|
|
||||||
|
ALTER TABLE blocks
|
||||||
|
DROP CONSTRAINT node_fk;
|
||||||
|
|
||||||
|
ALTER TABLE blocks
|
||||||
|
ADD CONSTRAINT node_fk
|
||||||
|
FOREIGN KEY (node_id)
|
||||||
|
REFERENCES nodes (id);
|
||||||
|
|
||||||
|
COMMIT;
|
12
db/migrations/1513275969_add_cascade_delete_to_blocks.up.sql
Normal file
12
db/migrations/1513275969_add_cascade_delete_to_blocks.up.sql
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
BEGIN;
|
||||||
|
|
||||||
|
ALTER TABLE blocks
|
||||||
|
DROP CONSTRAINT node_fk;
|
||||||
|
|
||||||
|
ALTER TABLE blocks
|
||||||
|
ADD CONSTRAINT node_fk
|
||||||
|
FOREIGN KEY (node_id)
|
||||||
|
REFERENCES nodes (id)
|
||||||
|
ON DELETE CASCADE;
|
||||||
|
|
||||||
|
COMMIT;
|
@ -331,11 +331,11 @@ CREATE INDEX block_number_index ON blocks USING btree (block_number);
|
|||||||
|
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Name: transactions fk_test; Type: FK CONSTRAINT; Schema: public; Owner: -
|
-- Name: transactions blocks_fk; Type: FK CONSTRAINT; Schema: public; Owner: -
|
||||||
--
|
--
|
||||||
|
|
||||||
ALTER TABLE ONLY transactions
|
ALTER TABLE ONLY transactions
|
||||||
ADD CONSTRAINT fk_test FOREIGN KEY (block_id) REFERENCES blocks(id);
|
ADD CONSTRAINT blocks_fk FOREIGN KEY (block_id) REFERENCES blocks(id) ON DELETE CASCADE;
|
||||||
|
|
||||||
|
|
||||||
--
|
--
|
||||||
@ -343,7 +343,7 @@ ALTER TABLE ONLY transactions
|
|||||||
--
|
--
|
||||||
|
|
||||||
ALTER TABLE ONLY blocks
|
ALTER TABLE ONLY blocks
|
||||||
ADD CONSTRAINT node_fk FOREIGN KEY (node_id) REFERENCES nodes(id);
|
ADD CONSTRAINT node_fk FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE;
|
||||||
|
|
||||||
|
|
||||||
--
|
--
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
package integration_test
|
package integration_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
|
||||||
"github.com/8thlight/vulcanizedb/pkg/blockchain_listener"
|
"github.com/8thlight/vulcanizedb/pkg/blockchain_listener"
|
||||||
"github.com/8thlight/vulcanizedb/pkg/config"
|
"github.com/8thlight/vulcanizedb/pkg/config"
|
||||||
"github.com/8thlight/vulcanizedb/pkg/core"
|
"github.com/8thlight/vulcanizedb/pkg/core"
|
||||||
@ -10,6 +13,10 @@ import (
|
|||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
log.SetOutput(ioutil.Discard)
|
||||||
|
}
|
||||||
|
|
||||||
var _ = Describe("Reading from the Geth blockchain", func() {
|
var _ = Describe("Reading from the Geth blockchain", func() {
|
||||||
|
|
||||||
var listener blockchain_listener.BlockchainListener
|
var listener blockchain_listener.BlockchainListener
|
||||||
|
@ -63,7 +63,7 @@ var _ = Describe("The contract summary", func() {
|
|||||||
{To: "0x123"},
|
{To: "0x123"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
blockchain := fakes.NewBlockchain()
|
blockchain := fakes.NewBlockchain()
|
||||||
|
|
||||||
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
|
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
|
||||||
@ -81,7 +81,7 @@ var _ = Describe("The contract summary", func() {
|
|||||||
{Hash: "TRANSACTION1", To: "0x123"},
|
{Hash: "TRANSACTION1", To: "0x123"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
blockchain := fakes.NewBlockchain()
|
blockchain := fakes.NewBlockchain()
|
||||||
|
|
||||||
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
|
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
package geth
|
package geth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"math/big"
|
"math/big"
|
||||||
|
|
||||||
|
"log"
|
||||||
|
|
||||||
"github.com/8thlight/vulcanizedb/pkg/core"
|
"github.com/8thlight/vulcanizedb/pkg/core"
|
||||||
"github.com/8thlight/vulcanizedb/pkg/geth/node"
|
"github.com/8thlight/vulcanizedb/pkg/geth/node"
|
||||||
"github.com/ethereum/go-ethereum"
|
"github.com/ethereum/go-ethereum"
|
||||||
@ -61,7 +61,7 @@ func NewGethBlockchain(ipcPath string) *GethBlockchain {
|
|||||||
|
|
||||||
func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan core.Block) {
|
func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan core.Block) {
|
||||||
blockchain.outputBlocks = blocks
|
blockchain.outputBlocks = blocks
|
||||||
fmt.Println("SubscribeToBlocks")
|
log.Println("SubscribeToBlocks")
|
||||||
inputHeaders := make(chan *types.Header, 10)
|
inputHeaders := make(chan *types.Header, 10)
|
||||||
myContext := context.Background()
|
myContext := context.Background()
|
||||||
blockchain.readGethHeaders = inputHeaders
|
blockchain.readGethHeaders = inputHeaders
|
||||||
|
@ -5,11 +5,43 @@ import (
|
|||||||
"github.com/8thlight/vulcanizedb/pkg/repositories"
|
"github.com/8thlight/vulcanizedb/pkg/repositories"
|
||||||
)
|
)
|
||||||
|
|
||||||
func PopulateBlocks(blockchain core.Blockchain, repository repositories.Repository, startingBlockNumber int64) int {
|
type Window struct {
|
||||||
blockNumbers := repository.MissingBlockNumbers(startingBlockNumber, repository.MaxBlockNumber())
|
LowerBound int
|
||||||
|
UpperBound int
|
||||||
|
MaxBlockNumber int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (window Window) Size() int {
|
||||||
|
return int(window.UpperBound - window.LowerBound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func PopulateMissingBlocks(blockchain core.Blockchain, repository repositories.Repository, startingBlockNumber int64) int {
|
||||||
|
blockRange := repository.MissingBlockNumbers(startingBlockNumber, repository.MaxBlockNumber())
|
||||||
|
updateBlockRange(blockchain, repository, blockRange)
|
||||||
|
return len(blockRange)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UpdateBlocksWindow(blockchain core.Blockchain, repository repositories.Repository, windowSize int) Window {
|
||||||
|
maxBlockNumber := repository.MaxBlockNumber()
|
||||||
|
upperBound := repository.MaxBlockNumber() - int64(2)
|
||||||
|
lowerBound := upperBound - int64(windowSize)
|
||||||
|
blockRange := MakeRange(lowerBound, upperBound)
|
||||||
|
updateBlockRange(blockchain, repository, blockRange)
|
||||||
|
return Window{int(lowerBound), int(upperBound), int(maxBlockNumber)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateBlockRange(blockchain core.Blockchain, repository repositories.Repository, blockNumbers []int64) int {
|
||||||
for _, blockNumber := range blockNumbers {
|
for _, blockNumber := range blockNumbers {
|
||||||
block := blockchain.GetBlockByNumber(blockNumber)
|
block := blockchain.GetBlockByNumber(blockNumber)
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
}
|
}
|
||||||
return len(blockNumbers)
|
return len(blockNumbers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func MakeRange(min, max int64) []int64 {
|
||||||
|
a := make([]int64, max-min)
|
||||||
|
for i := range a {
|
||||||
|
a[i] = min + int64(i)
|
||||||
|
}
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
@ -15,16 +15,16 @@ var _ = Describe("Populating blocks", func() {
|
|||||||
blocks := []core.Block{{Number: 1, Hash: "x012343"}}
|
blocks := []core.Block{{Number: 1, Hash: "x012343"}}
|
||||||
blockchain := fakes.NewBlockchainWithBlocks(blocks)
|
blockchain := fakes.NewBlockchainWithBlocks(blocks)
|
||||||
repository := repositories.NewInMemory()
|
repository := repositories.NewInMemory()
|
||||||
repository.CreateBlock(core.Block{Number: 2})
|
repository.CreateOrUpdateBlock(core.Block{Number: 2})
|
||||||
|
|
||||||
history.PopulateBlocks(blockchain, repository, 1)
|
history.PopulateMissingBlocks(blockchain, repository, 1)
|
||||||
|
|
||||||
block, err := repository.FindBlockByNumber(1)
|
block, err := repository.FindBlockByNumber(1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(block.Hash).To(Equal("x012343"))
|
Expect(block.Hash).To(Equal("x012343"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("fills in two missing blocks", func() {
|
It("fills in the three missing blocks (5,8,10)", func() {
|
||||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||||
{Number: 4},
|
{Number: 4},
|
||||||
{Number: 5},
|
{Number: 5},
|
||||||
@ -33,16 +33,16 @@ var _ = Describe("Populating blocks", func() {
|
|||||||
{Number: 13},
|
{Number: 13},
|
||||||
})
|
})
|
||||||
repository := repositories.NewInMemory()
|
repository := repositories.NewInMemory()
|
||||||
repository.CreateBlock(core.Block{Number: 1})
|
repository.CreateOrUpdateBlock(core.Block{Number: 1})
|
||||||
repository.CreateBlock(core.Block{Number: 2})
|
repository.CreateOrUpdateBlock(core.Block{Number: 2})
|
||||||
repository.CreateBlock(core.Block{Number: 3})
|
repository.CreateOrUpdateBlock(core.Block{Number: 3})
|
||||||
repository.CreateBlock(core.Block{Number: 6})
|
repository.CreateOrUpdateBlock(core.Block{Number: 6})
|
||||||
repository.CreateBlock(core.Block{Number: 7})
|
repository.CreateOrUpdateBlock(core.Block{Number: 7})
|
||||||
repository.CreateBlock(core.Block{Number: 9})
|
repository.CreateOrUpdateBlock(core.Block{Number: 9})
|
||||||
repository.CreateBlock(core.Block{Number: 11})
|
repository.CreateOrUpdateBlock(core.Block{Number: 11})
|
||||||
repository.CreateBlock(core.Block{Number: 12})
|
repository.CreateOrUpdateBlock(core.Block{Number: 12})
|
||||||
|
|
||||||
history.PopulateBlocks(blockchain, repository, 5)
|
history.PopulateMissingBlocks(blockchain, repository, 5)
|
||||||
|
|
||||||
Expect(repository.BlockCount()).To(Equal(11))
|
Expect(repository.BlockCount()).To(Equal(11))
|
||||||
_, err := repository.FindBlockByNumber(4)
|
_, err := repository.FindBlockByNumber(4)
|
||||||
@ -57,18 +57,47 @@ var _ = Describe("Populating blocks", func() {
|
|||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("updates the repository with a range of blocks w/in sliding window ", func() {
|
||||||
|
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||||
|
{Number: 1},
|
||||||
|
{Number: 2},
|
||||||
|
{Number: 3},
|
||||||
|
{Number: 4},
|
||||||
|
{Number: 5},
|
||||||
|
})
|
||||||
|
repository := repositories.NewInMemory()
|
||||||
|
repository.CreateOrUpdateBlock(blockchain.GetBlockByNumber(5))
|
||||||
|
|
||||||
|
history.UpdateBlocksWindow(blockchain, repository, 2)
|
||||||
|
|
||||||
|
Expect(repository.BlockCount()).To(Equal(3))
|
||||||
|
Expect(repository.HandleBlockCallCount).To(Equal(3))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Generates a range of int64", func() {
|
||||||
|
numberOfBlocksCreated := history.MakeRange(0, 5)
|
||||||
|
expected := []int64{0, 1, 2, 3, 4}
|
||||||
|
|
||||||
|
Expect(numberOfBlocksCreated).To(Equal(expected))
|
||||||
|
})
|
||||||
|
|
||||||
It("returns the number of blocks created", func() {
|
It("returns the number of blocks created", func() {
|
||||||
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{
|
||||||
{Number: 4},
|
{Number: 4},
|
||||||
{Number: 5},
|
{Number: 5},
|
||||||
})
|
})
|
||||||
repository := repositories.NewInMemory()
|
repository := repositories.NewInMemory()
|
||||||
repository.CreateBlock(core.Block{Number: 3})
|
repository.CreateOrUpdateBlock(core.Block{Number: 3})
|
||||||
repository.CreateBlock(core.Block{Number: 6})
|
repository.CreateOrUpdateBlock(core.Block{Number: 6})
|
||||||
|
|
||||||
numberOfBlocksCreated := history.PopulateBlocks(blockchain, repository, 3)
|
numberOfBlocksCreated := history.PopulateMissingBlocks(blockchain, repository, 3)
|
||||||
|
|
||||||
Expect(numberOfBlocksCreated).To(Equal(2))
|
Expect(numberOfBlocksCreated).To(Equal(2))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("returns the window size", func() {
|
||||||
|
window := history.Window{1, 3, 10}
|
||||||
|
Expect(window.Size()).To(Equal(2))
|
||||||
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
@ -14,5 +14,5 @@ func NewBlockchainDbObserver(repository repositories.Repository) BlockchainDbObs
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (observer BlockchainDbObserver) NotifyBlockAdded(block core.Block) {
|
func (observer BlockchainDbObserver) NotifyBlockAdded(block core.Block) {
|
||||||
observer.repository.CreateBlock(block)
|
observer.repository.CreateOrUpdateBlock(block)
|
||||||
}
|
}
|
||||||
|
@ -1,18 +1,32 @@
|
|||||||
package observers
|
package observers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"os"
|
||||||
|
"text/template"
|
||||||
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/8thlight/vulcanizedb/pkg/core"
|
"github.com/8thlight/vulcanizedb/pkg/core"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const blockAddedTemplate = `
|
||||||
|
New block was added: {{.Number}}
|
||||||
|
Time: {{.Time | unix_time}}
|
||||||
|
Gas Limit: {{.GasLimit}}
|
||||||
|
Gas Used: {{.GasUsed}}
|
||||||
|
Number of Transactions {{.Transactions | len}}
|
||||||
|
|
||||||
|
`
|
||||||
|
|
||||||
|
var funcMap = template.FuncMap{
|
||||||
|
"unix_time": func(n int64) time.Time {
|
||||||
|
return time.Unix(n, 0)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var tmp = template.Must(template.New("window").Funcs(funcMap).Parse(blockAddedTemplate))
|
||||||
|
|
||||||
type BlockchainLoggingObserver struct{}
|
type BlockchainLoggingObserver struct{}
|
||||||
|
|
||||||
func (blockchainObserver BlockchainLoggingObserver) NotifyBlockAdded(block core.Block) {
|
func (blockchainObserver BlockchainLoggingObserver) NotifyBlockAdded(block core.Block) {
|
||||||
fmt.Printf("New block was added: %d\n"+
|
tmp.Execute(os.Stdout, block)
|
||||||
"\tTime: %v\n"+
|
|
||||||
"\tGas Limit: %d\n"+
|
|
||||||
"\tGas Used: %d\n"+
|
|
||||||
"\tNumber of Transactions %d\n", block.Number, time.Unix(block.Time, 0), block.GasLimit, block.GasUsed, len(block.Transactions))
|
|
||||||
}
|
}
|
||||||
|
@ -7,9 +7,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type InMemory struct {
|
type InMemory struct {
|
||||||
blocks map[int64]core.Block
|
blocks map[int64]core.Block
|
||||||
contracts map[string]core.Contract
|
contracts map[string]core.Contract
|
||||||
logs map[string][]core.Log
|
logs map[string][]core.Log
|
||||||
|
HandleBlockCallCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *InMemory) CreateLogs(logs []core.Log) error {
|
func (repository *InMemory) CreateLogs(logs []core.Log) error {
|
||||||
@ -70,13 +71,15 @@ func (repository *InMemory) MissingBlockNumbers(startingBlockNumber int64, endin
|
|||||||
|
|
||||||
func NewInMemory() *InMemory {
|
func NewInMemory() *InMemory {
|
||||||
return &InMemory{
|
return &InMemory{
|
||||||
blocks: make(map[int64]core.Block),
|
HandleBlockCallCount: 0,
|
||||||
contracts: make(map[string]core.Contract),
|
blocks: make(map[int64]core.Block),
|
||||||
logs: make(map[string][]core.Log),
|
contracts: make(map[string]core.Contract),
|
||||||
|
logs: make(map[string][]core.Log),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *InMemory) CreateBlock(block core.Block) error {
|
func (repository *InMemory) CreateOrUpdateBlock(block core.Block) error {
|
||||||
|
repository.HandleBlockCallCount++
|
||||||
repository.blocks[block.Number] = block
|
repository.blocks[block.Number] = block
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,8 @@ import (
|
|||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type BlockStatus int
|
||||||
|
|
||||||
type Postgres struct {
|
type Postgres struct {
|
||||||
Db *sqlx.DB
|
Db *sqlx.DB
|
||||||
node core.Node
|
node core.Node
|
||||||
@ -23,6 +25,7 @@ type Postgres struct {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
ErrDBInsertFailed = errors.New("postgres: insert failed")
|
ErrDBInsertFailed = errors.New("postgres: insert failed")
|
||||||
|
ErrDBDeleteFailed = errors.New("postgres: delete failed")
|
||||||
ErrDBConnectionFailed = errors.New("postgres: db connection failed")
|
ErrDBConnectionFailed = errors.New("postgres: db connection failed")
|
||||||
ErrUnableToSetNode = errors.New("postgres: unable to set node")
|
ErrUnableToSetNode = errors.New("postgres: unable to set node")
|
||||||
)
|
)
|
||||||
@ -134,8 +137,10 @@ func (repository Postgres) CreateContract(contract core.Contract) error {
|
|||||||
func (repository Postgres) ContractExists(contractHash string) bool {
|
func (repository Postgres) ContractExists(contractHash string) bool {
|
||||||
var exists bool
|
var exists bool
|
||||||
repository.Db.QueryRow(
|
repository.Db.QueryRow(
|
||||||
`SELECT exists(SELECT 1 FROM watched_contracts WHERE contract_hash=$1)
|
`SELECT exists(
|
||||||
FROM watched_contracts`, contractHash).Scan(&exists)
|
SELECT 1
|
||||||
|
FROM watched_contracts
|
||||||
|
WHERE contract_hash = $1)`, contractHash).Scan(&exists)
|
||||||
return exists
|
return exists
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,9 +210,41 @@ func (repository Postgres) BlockCount() int {
|
|||||||
return count
|
return count
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository Postgres) CreateBlock(block core.Block) error {
|
func (repository Postgres) getBlockHash(block core.Block) (string, bool) {
|
||||||
tx, _ := repository.Db.BeginTx(context.Background(), nil)
|
var retrievedBlockHash string
|
||||||
|
repository.Db.Get(&retrievedBlockHash,
|
||||||
|
`SELECT block_hash
|
||||||
|
FROM blocks
|
||||||
|
WHERE block_number = $1 AND node_id = $2`,
|
||||||
|
block.Number, repository.nodeId)
|
||||||
|
return retrievedBlockHash, blockExists(retrievedBlockHash)
|
||||||
|
}
|
||||||
|
|
||||||
|
func blockExists(retrievedBlockHash string) bool {
|
||||||
|
return retrievedBlockHash != ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (repository Postgres) CreateOrUpdateBlock(block core.Block) error {
|
||||||
|
var err error
|
||||||
|
retrievedBlockHash, ok := repository.getBlockHash(block)
|
||||||
|
if !ok {
|
||||||
|
err = repository.insertBlock(block)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if ok && retrievedBlockHash != block.Hash {
|
||||||
|
err = repository.removeBlock(block.Number)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = repository.insertBlock(block)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (repository Postgres) insertBlock(block core.Block) error {
|
||||||
var blockId int64
|
var blockId int64
|
||||||
|
tx, _ := repository.Db.BeginTx(context.Background(), nil)
|
||||||
err := tx.QueryRow(
|
err := tx.QueryRow(
|
||||||
`INSERT INTO blocks
|
`INSERT INTO blocks
|
||||||
(node_id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash)
|
(node_id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash)
|
||||||
@ -228,6 +265,18 @@ func (repository Postgres) CreateBlock(block core.Block) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (repository Postgres) removeBlock(blockNumber int64) error {
|
||||||
|
_, err := repository.Db.Exec(
|
||||||
|
`DELETE FROM
|
||||||
|
blocks
|
||||||
|
WHERE block_number=$1 AND node_id=$2`,
|
||||||
|
blockNumber, repository.nodeId)
|
||||||
|
if err != nil {
|
||||||
|
return ErrDBDeleteFailed
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (repository Postgres) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error {
|
func (repository Postgres) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error {
|
||||||
for _, transaction := range transactions {
|
for _, transaction := range transactions {
|
||||||
_, err := tx.Exec(
|
_, err := tx.Exec(
|
||||||
|
@ -4,6 +4,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
|
||||||
"github.com/8thlight/vulcanizedb/pkg/config"
|
"github.com/8thlight/vulcanizedb/pkg/config"
|
||||||
"github.com/8thlight/vulcanizedb/pkg/core"
|
"github.com/8thlight/vulcanizedb/pkg/core"
|
||||||
"github.com/8thlight/vulcanizedb/pkg/repositories"
|
"github.com/8thlight/vulcanizedb/pkg/repositories"
|
||||||
@ -14,6 +17,10 @@ import (
|
|||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
log.SetOutput(ioutil.Discard)
|
||||||
|
}
|
||||||
|
|
||||||
var _ = Describe("Postgres repository", func() {
|
var _ = Describe("Postgres repository", func() {
|
||||||
|
|
||||||
It("connects to the database", func() {
|
It("connects to the database", func() {
|
||||||
@ -43,7 +50,7 @@ var _ = Describe("Postgres repository", func() {
|
|||||||
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1}
|
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1}
|
||||||
repository, _ := repositories.NewPostgres(cfg.Database, node)
|
repository, _ := repositories.NewPostgres(cfg.Database, node)
|
||||||
|
|
||||||
err1 := repository.CreateBlock(badBlock)
|
err1 := repository.CreateOrUpdateBlock(badBlock)
|
||||||
savedBlock, err2 := repository.FindBlockByNumber(123)
|
savedBlock, err2 := repository.FindBlockByNumber(123)
|
||||||
|
|
||||||
Expect(err1).To(HaveOccurred())
|
Expect(err1).To(HaveOccurred())
|
||||||
@ -97,7 +104,7 @@ var _ = Describe("Postgres repository", func() {
|
|||||||
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1}
|
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1}
|
||||||
repository, _ := repositories.NewPostgres(cfg.Database, node)
|
repository, _ := repositories.NewPostgres(cfg.Database, node)
|
||||||
|
|
||||||
err1 := repository.CreateBlock(block)
|
err1 := repository.CreateOrUpdateBlock(block)
|
||||||
savedBlock, err2 := repository.FindBlockByNumber(123)
|
savedBlock, err2 := repository.FindBlockByNumber(123)
|
||||||
|
|
||||||
Expect(err1).To(HaveOccurred())
|
Expect(err1).To(HaveOccurred())
|
||||||
|
@ -3,7 +3,7 @@ package repositories
|
|||||||
import "github.com/8thlight/vulcanizedb/pkg/core"
|
import "github.com/8thlight/vulcanizedb/pkg/core"
|
||||||
|
|
||||||
type Repository interface {
|
type Repository interface {
|
||||||
CreateBlock(block core.Block) error
|
CreateOrUpdateBlock(block core.Block) error
|
||||||
BlockCount() int
|
BlockCount() int
|
||||||
FindBlockByNumber(blockNumber int64) (core.Block, error)
|
FindBlockByNumber(blockNumber int64) (core.Block, error)
|
||||||
MaxBlockNumber() int64
|
MaxBlockNumber() int64
|
||||||
|
@ -33,7 +33,7 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
It("increments the block count", func() {
|
It("increments the block count", func() {
|
||||||
block := core.Block{Number: 123}
|
block := core.Block{Number: 123}
|
||||||
|
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
|
|
||||||
Expect(repository.BlockCount()).To(Equal(1))
|
Expect(repository.BlockCount()).To(Equal(1))
|
||||||
})
|
})
|
||||||
@ -42,7 +42,7 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
block := core.Block{
|
block := core.Block{
|
||||||
Number: 123,
|
Number: 123,
|
||||||
}
|
}
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
nodeTwo := core.Node{
|
nodeTwo := core.Node{
|
||||||
GenesisBlock: "0x456",
|
GenesisBlock: "0x456",
|
||||||
NetworkId: 1,
|
NetworkId: 1,
|
||||||
@ -77,7 +77,7 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
UncleHash: uncleHash,
|
UncleHash: uncleHash,
|
||||||
}
|
}
|
||||||
|
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
|
|
||||||
savedBlock, err := repository.FindBlockByNumber(blockNumber)
|
savedBlock, err := repository.FindBlockByNumber(blockNumber)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
@ -105,7 +105,7 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
Transactions: []core.Transaction{{}},
|
Transactions: []core.Transaction{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
|
|
||||||
savedBlock, _ := repository.FindBlockByNumber(123)
|
savedBlock, _ := repository.FindBlockByNumber(123)
|
||||||
Expect(len(savedBlock.Transactions)).To(Equal(1))
|
Expect(len(savedBlock.Transactions)).To(Equal(1))
|
||||||
@ -117,12 +117,60 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
Transactions: []core.Transaction{{}, {}},
|
Transactions: []core.Transaction{{}, {}},
|
||||||
}
|
}
|
||||||
|
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
|
|
||||||
savedBlock, _ := repository.FindBlockByNumber(123)
|
savedBlock, _ := repository.FindBlockByNumber(123)
|
||||||
Expect(len(savedBlock.Transactions)).To(Equal(2))
|
Expect(len(savedBlock.Transactions)).To(Equal(2))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It(`replaces blocks and transactions associated to the block
|
||||||
|
when a more new block is in conflict (same block number + nodeid)`, func() {
|
||||||
|
blockOne := core.Block{
|
||||||
|
Number: 123,
|
||||||
|
Hash: "xabc",
|
||||||
|
Transactions: []core.Transaction{{Hash: "x123"}, {Hash: "x345"}},
|
||||||
|
}
|
||||||
|
blockTwo := core.Block{
|
||||||
|
Number: 123,
|
||||||
|
Hash: "xdef",
|
||||||
|
Transactions: []core.Transaction{{Hash: "x678"}, {Hash: "x9ab"}},
|
||||||
|
}
|
||||||
|
|
||||||
|
repository.CreateOrUpdateBlock(blockOne)
|
||||||
|
repository.CreateOrUpdateBlock(blockTwo)
|
||||||
|
|
||||||
|
savedBlock, _ := repository.FindBlockByNumber(123)
|
||||||
|
Expect(len(savedBlock.Transactions)).To(Equal(2))
|
||||||
|
Expect(savedBlock.Transactions[0].Hash).To(Equal("x678"))
|
||||||
|
Expect(savedBlock.Transactions[1].Hash).To(Equal("x9ab"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It(`does not replace blocks when block number is not unique
|
||||||
|
but block number + node id is`, func() {
|
||||||
|
blockOne := core.Block{
|
||||||
|
Number: 123,
|
||||||
|
Transactions: []core.Transaction{{Hash: "x123"}, {Hash: "x345"}},
|
||||||
|
}
|
||||||
|
blockTwo := core.Block{
|
||||||
|
Number: 123,
|
||||||
|
Transactions: []core.Transaction{{Hash: "x678"}, {Hash: "x9ab"}},
|
||||||
|
}
|
||||||
|
repository.CreateOrUpdateBlock(blockOne)
|
||||||
|
nodeTwo := core.Node{
|
||||||
|
GenesisBlock: "0x456",
|
||||||
|
NetworkId: 1,
|
||||||
|
}
|
||||||
|
repositoryTwo := buildRepository(nodeTwo)
|
||||||
|
|
||||||
|
repository.CreateOrUpdateBlock(blockOne)
|
||||||
|
repositoryTwo.CreateOrUpdateBlock(blockTwo)
|
||||||
|
retrievedBlockOne, _ := repository.FindBlockByNumber(123)
|
||||||
|
retrievedBlockTwo, _ := repositoryTwo.FindBlockByNumber(123)
|
||||||
|
|
||||||
|
Expect(retrievedBlockOne.Transactions[0].Hash).To(Equal("x123"))
|
||||||
|
Expect(retrievedBlockTwo.Transactions[0].Hash).To(Equal("x678"))
|
||||||
|
})
|
||||||
|
|
||||||
It("saves the attributes associated to a transaction", func() {
|
It("saves the attributes associated to a transaction", func() {
|
||||||
gasLimit := int64(5000)
|
gasLimit := int64(5000)
|
||||||
gasPrice := int64(3)
|
gasPrice := int64(3)
|
||||||
@ -144,7 +192,7 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
Transactions: []core.Transaction{transaction},
|
Transactions: []core.Transaction{transaction},
|
||||||
}
|
}
|
||||||
|
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
|
|
||||||
savedBlock, _ := repository.FindBlockByNumber(123)
|
savedBlock, _ := repository.FindBlockByNumber(123)
|
||||||
Expect(len(savedBlock.Transactions)).To(Equal(1))
|
Expect(len(savedBlock.Transactions)).To(Equal(1))
|
||||||
@ -162,41 +210,41 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
|
|
||||||
Describe("The missing block numbers", func() {
|
Describe("The missing block numbers", func() {
|
||||||
It("is empty the starting block number is the highest known block number", func() {
|
It("is empty the starting block number is the highest known block number", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 1})
|
repository.CreateOrUpdateBlock(core.Block{Number: 1})
|
||||||
|
|
||||||
Expect(len(repository.MissingBlockNumbers(1, 1))).To(Equal(0))
|
Expect(len(repository.MissingBlockNumbers(1, 1))).To(Equal(0))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("is the only missing block number", func() {
|
It("is the only missing block number", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 2})
|
repository.CreateOrUpdateBlock(core.Block{Number: 2})
|
||||||
|
|
||||||
Expect(repository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1}))
|
Expect(repository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("is both missing block numbers", func() {
|
It("is both missing block numbers", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 3})
|
repository.CreateOrUpdateBlock(core.Block{Number: 3})
|
||||||
|
|
||||||
Expect(repository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2}))
|
Expect(repository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("goes back to the starting block number", func() {
|
It("goes back to the starting block number", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 6})
|
repository.CreateOrUpdateBlock(core.Block{Number: 6})
|
||||||
|
|
||||||
Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5}))
|
Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("only includes missing block numbers", func() {
|
It("only includes missing block numbers", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 4})
|
repository.CreateOrUpdateBlock(core.Block{Number: 4})
|
||||||
repository.CreateBlock(core.Block{Number: 6})
|
repository.CreateOrUpdateBlock(core.Block{Number: 6})
|
||||||
|
|
||||||
Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5}))
|
Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("is a list with multiple gaps", func() {
|
It("is a list with multiple gaps", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 4})
|
repository.CreateOrUpdateBlock(core.Block{Number: 4})
|
||||||
repository.CreateBlock(core.Block{Number: 5})
|
repository.CreateOrUpdateBlock(core.Block{Number: 5})
|
||||||
repository.CreateBlock(core.Block{Number: 8})
|
repository.CreateOrUpdateBlock(core.Block{Number: 8})
|
||||||
repository.CreateBlock(core.Block{Number: 10})
|
repository.CreateOrUpdateBlock(core.Block{Number: 10})
|
||||||
|
|
||||||
Expect(repository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9}))
|
Expect(repository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9}))
|
||||||
})
|
})
|
||||||
@ -206,8 +254,8 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("only returns requested range even when other gaps exist", func() {
|
It("only returns requested range even when other gaps exist", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 3})
|
repository.CreateOrUpdateBlock(core.Block{Number: 3})
|
||||||
repository.CreateBlock(core.Block{Number: 8})
|
repository.CreateOrUpdateBlock(core.Block{Number: 8})
|
||||||
|
|
||||||
Expect(repository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5}))
|
Expect(repository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5}))
|
||||||
})
|
})
|
||||||
@ -215,14 +263,14 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
|
|
||||||
Describe("The max block numbers", func() {
|
Describe("The max block numbers", func() {
|
||||||
It("returns the block number when a single block", func() {
|
It("returns the block number when a single block", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 1})
|
repository.CreateOrUpdateBlock(core.Block{Number: 1})
|
||||||
|
|
||||||
Expect(repository.MaxBlockNumber()).To(Equal(int64(1)))
|
Expect(repository.MaxBlockNumber()).To(Equal(int64(1)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("returns highest known block number when multiple blocks", func() {
|
It("returns highest known block number when multiple blocks", func() {
|
||||||
repository.CreateBlock(core.Block{Number: 1})
|
repository.CreateOrUpdateBlock(core.Block{Number: 1})
|
||||||
repository.CreateBlock(core.Block{Number: 10})
|
repository.CreateOrUpdateBlock(core.Block{Number: 10})
|
||||||
|
|
||||||
Expect(repository.MaxBlockNumber()).To(Equal(int64(10)))
|
Expect(repository.MaxBlockNumber()).To(Equal(int64(10)))
|
||||||
})
|
})
|
||||||
@ -261,7 +309,7 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
|
|||||||
{Hash: "TRANSACTION3", To: "x123"},
|
{Hash: "TRANSACTION3", To: "x123"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
repository.CreateBlock(block)
|
repository.CreateOrUpdateBlock(block)
|
||||||
|
|
||||||
repository.CreateContract(core.Contract{Hash: "x123"})
|
repository.CreateContract(core.Contract{Hash: "x123"})
|
||||||
contract, err := repository.FindContract("x123")
|
contract, err := repository.FindContract("x123")
|
||||||
|
Loading…
Reference in New Issue
Block a user