Separate DB access into several repos (#28)

* Separate files for InMemory

* Start using separate repos for collaborating objects

* Before Updating schema

* Separate various repos
This commit is contained in:
Matt K 2018-02-12 10:54:05 -06:00 committed by GitHub
parent 605b0a96ae
commit ed907535e3
34 changed files with 538 additions and 501 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/filters"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/repositories/postgres"
"github.com/vulcanize/vulcanizedb/utils"
)
@ -56,7 +57,8 @@ func addFilter() {
}
var logFilters filters.LogFilters
blockchain := geth.NewBlockchain(ipc)
repository := utils.LoadPostgres(databaseConfig, blockchain.Node())
db := utils.LoadPostgres(databaseConfig, blockchain.Node())
filterRepository := postgres.FilterRepository{DB: &db}
absFilePath := utils.AbsFilePath(filterFilepath)
logFilterBytes, err := ioutil.ReadFile(absFilePath)
if err != nil {
@ -67,7 +69,7 @@ func addFilter() {
log.Fatal(err)
}
for _, filter := range logFilters {
err = repository.CreateFilter(filter)
err = filterRepository.CreateFilter(filter)
if err != nil {
log.Fatal(err)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/graphql_server"
"github.com/vulcanize/vulcanizedb/pkg/repositories/postgres"
"github.com/vulcanize/vulcanizedb/utils"
)
@ -43,8 +44,18 @@ func init() {
func parseSchema() *graphql.Schema {
blockchain := geth.NewBlockchain(ipc)
repository := utils.LoadPostgres(databaseConfig, blockchain.Node())
schema := graphql.MustParseSchema(graphql_server.Schema, graphql_server.NewResolver(repository))
db := utils.LoadPostgres(databaseConfig, blockchain.Node())
blockRepository := &postgres.BlockRepository{DB: &db}
logRepository := &postgres.LogRepository{DB: &db}
filterRepository := &postgres.FilterRepository{DB: &db}
watchedEventRepository := &postgres.WatchedEventRepository{DB: &db}
graphQLRepositories := graphql_server.GraphQLRepositories{
WatchedEventRepository: watchedEventRepository,
BlockRepository: blockRepository,
LogRepository: logRepository,
FilterRepository: filterRepository,
}
schema := graphql.MustParseSchema(graphql_server.Schema, graphql_server.NewResolver(graphQLRepositories))
return schema
}

View File

@ -11,6 +11,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/history"
"github.com/vulcanize/vulcanizedb/pkg/repositories"
"github.com/vulcanize/vulcanizedb/pkg/repositories/postgres"
"github.com/vulcanize/vulcanizedb/utils"
)
@ -49,9 +50,9 @@ func init() {
syncCmd.Flags().IntVarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block number to start syncing from")
}
func backFillAllBlocks(blockchain core.Blockchain, repository postgres.DB, missingBlocksPopulated chan int, startingBlockNumber int64) {
func backFillAllBlocks(blockchain core.Blockchain, blockRepository repositories.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
go func() {
missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, repository, startingBlockNumber)
missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber)
}()
}
@ -63,12 +64,13 @@ func sync() {
if blockchain.LastBlock().Int64() == 0 {
log.Fatal("geth initial: state sync not finished")
}
repository := utils.LoadPostgres(databaseConfig, blockchain.Node())
validator := history.NewBlockValidator(blockchain, repository, 15)
db := utils.LoadPostgres(databaseConfig, blockchain.Node())
blockRepository := postgres.BlockRepository{DB: &db}
validator := history.NewBlockValidator(blockchain, blockRepository, 15)
missingBlocksPopulated := make(chan int)
_startingBlockNumber := int64(startingBlockNumber)
go backFillAllBlocks(blockchain, repository, missingBlocksPopulated, _startingBlockNumber)
go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, _startingBlockNumber)
for {
select {
@ -76,7 +78,7 @@ func sync() {
window := validator.ValidateBlocks()
validator.Log(os.Stdout, window)
case <-missingBlocksPopulated:
go backFillAllBlocks(blockchain, repository, missingBlocksPopulated, _startingBlockNumber)
go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, _startingBlockNumber)
}
}
}

View File

@ -1,14 +1,20 @@
[{
"name": "TransferFilter",
"fromBlock": "0x488290",
"toBlock": "0x488678",
"address": "0x06012c8cf97bead5deae237070f9587f8e7a266d",
"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]
},
{
"name": "NewFilter",
"fromBlock": "0x4B34AD",
"address": "0x06012c8cf97bead5deae237070f9587f8e7a266d",
"topics": ["0x241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"]
}]
[
{
"name": "TransferFilter",
"fromBlock": "0x488290",
"toBlock": "0x488678",
"address": "0x06012c8cf97bead5deae237070f9587f8e7a266d",
"topics": [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
]
},
{
"name": "NewFilter",
"fromBlock": "0x4B34AD",
"address": "0x06012c8cf97bead5deae237070f9587f8e7a266d",
"topics": [
"0x241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"
]
}
]

View File

@ -19,18 +19,19 @@ func init() {
var _ = Describe("Reading from the Geth blockchain", func() {
var blockchain *geth.Blockchain
var repository *inmemory.InMemory
var inMemory *inmemory.InMemory
BeforeEach(func() {
cfg, _ := config.NewConfig("private")
blockchain = geth.NewBlockchain(cfg.Client.IPCPath)
repository = inmemory.NewInMemory()
inMemory = inmemory.NewInMemory()
})
It("reads two blocks", func(done Done) {
validator := history.NewBlockValidator(blockchain, repository, 2)
blocks := &inmemory.BlockRepository{InMemory: inMemory}
validator := history.NewBlockValidator(blockchain, blocks, 2)
validator.ValidateBlocks()
Expect(repository.BlockCount()).To(Equal(2))
Expect(blocks.BlockCount()).To(Equal(2))
close(done)
}, 15)

View File

@ -17,8 +17,8 @@ type ContractSummary struct {
blockChain core.Blockchain
}
func NewSummary(blockchain core.Blockchain, repository repositories.Repository, contractHash string, blockNumber *big.Int) (ContractSummary, error) {
contract, err := repository.GetContract(contractHash)
func NewSummary(blockchain core.Blockchain, contractRepository repositories.ContractRepository, contractHash string, blockNumber *big.Int) (ContractSummary, error) {
contract, err := contractRepository.GetContract(contractHash)
if err != nil {
return ContractSummary{}, err
} else {

View File

@ -12,18 +12,26 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/repositories/inmemory"
)
func NewCurrentContractSummary(blockchain core.Blockchain, repository repositories.Repository, contractHash string) (contract_summary.ContractSummary, error) {
return contract_summary.NewSummary(blockchain, repository, contractHash, nil)
func NewCurrentContractSummary(blockchain core.Blockchain, contractRepository repositories.ContractRepository, contractHash string) (contract_summary.ContractSummary, error) {
return contract_summary.NewSummary(blockchain, contractRepository, contractHash, nil)
}
var _ = Describe("The contract summary", func() {
var inMemoryDB *inmemory.InMemory
var contractRepostiory *inmemory.ContractRepostiory
BeforeEach(func() {
inMemoryDB = inmemory.NewInMemory()
contractRepostiory = &inmemory.ContractRepostiory{InMemory: inMemoryDB}
})
Context("when the given contract does not exist", func() {
It("returns an error", func() {
repository := inmemory.NewInMemory()
blockchain := fakes.NewBlockchain()
contractSummary, err := NewCurrentContractSummary(blockchain, repository, "0x123")
contractSummary, err := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123")
Expect(contractSummary).To(Equal(contract_summary.ContractSummary{}))
Expect(err).NotTo(BeNil())
@ -32,101 +40,101 @@ var _ = Describe("The contract summary", func() {
Context("when the given contract exists", func() {
It("returns the summary", func() {
repository := inmemory.NewInMemory()
contract := core.Contract{Hash: "0x123"}
repository.CreateContract(contract)
contractRepostiory.CreateContract(contract)
blockchain := fakes.NewBlockchain()
contractSummary, err := NewCurrentContractSummary(blockchain, repository, "0x123")
contractSummary, err := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123")
Expect(contractSummary).NotTo(Equal(contract_summary.ContractSummary{}))
Expect(err).To(BeNil())
})
It("includes the contract hash in the summary", func() {
repository := inmemory.NewInMemory()
contract := core.Contract{Hash: "0x123"}
repository.CreateContract(contract)
contractRepostiory.CreateContract(contract)
blockchain := fakes.NewBlockchain()
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123")
Expect(contractSummary.ContractHash).To(Equal("0x123"))
})
It("sets the number of transactions", func() {
repository := inmemory.NewInMemory()
blocks := &inmemory.BlockRepository{InMemory: inMemoryDB}
blockchain := fakes.NewBlockchain()
contract := core.Contract{Hash: "0x123"}
repository.CreateContract(contract)
contractRepostiory.CreateContract(contract)
block := core.Block{
Transactions: []core.Transaction{
{To: "0x123"},
{To: "0x123"},
},
}
repository.CreateOrUpdateBlock(block)
blockchain := fakes.NewBlockchain()
blocks.CreateOrUpdateBlock(block)
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123")
Expect(contractSummary.NumberOfTransactions).To(Equal(2))
})
It("sets the last transaction", func() {
repository := inmemory.NewInMemory()
blocks := &inmemory.BlockRepository{InMemory: inMemoryDB}
blockchain := fakes.NewBlockchain()
contract := core.Contract{Hash: "0x123"}
repository.CreateContract(contract)
contractRepostiory.CreateContract(contract)
block := core.Block{
Transactions: []core.Transaction{
{Hash: "TRANSACTION2", To: "0x123"},
{Hash: "TRANSACTION1", To: "0x123"},
},
}
repository.CreateOrUpdateBlock(block)
blockchain := fakes.NewBlockchain()
blocks.CreateOrUpdateBlock(block)
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123")
Expect(contractSummary.LastTransaction.Hash).To(Equal("TRANSACTION2"))
})
It("gets contract state attribute for the contract from the blockchain", func() {
repository := inmemory.NewInMemory()
contract := core.Contract{Hash: "0x123"}
repository.CreateContract(contract)
blockchain := fakes.NewBlockchain()
contract := core.Contract{Hash: "0x123"}
contractRepostiory.CreateContract(contract)
blockchain.SetContractStateAttribute("0x123", nil, "foo", "bar")
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123")
attribute := contractSummary.GetStateAttribute("foo")
Expect(attribute).To(Equal("bar"))
})
It("gets contract state attribute for the contract from the blockchain at specific block height", func() {
repository := inmemory.NewInMemory()
contract := core.Contract{Hash: "0x123"}
repository.CreateContract(contract)
blockchain := fakes.NewBlockchain()
contract := core.Contract{Hash: "0x123"}
contractRepostiory.CreateContract(contract)
blockNumber := big.NewInt(1000)
blockchain.SetContractStateAttribute("0x123", nil, "foo", "bar")
blockchain.SetContractStateAttribute("0x123", blockNumber, "foo", "baz")
contractSummary, _ := contract_summary.NewSummary(blockchain, repository, "0x123", blockNumber)
contractSummary, _ := contract_summary.NewSummary(blockchain, contractRepostiory, "0x123", blockNumber)
attribute := contractSummary.GetStateAttribute("foo")
Expect(attribute).To(Equal("baz"))
})
It("gets attributes for the contract from the blockchain", func() {
repository := inmemory.NewInMemory()
contract := core.Contract{Hash: "0x123"}
repository.CreateContract(contract)
blockchain := fakes.NewBlockchain()
contract := core.Contract{Hash: "0x123"}
contractRepostiory.CreateContract(contract)
blockchain.SetContractStateAttribute("0x123", nil, "foo", "bar")
blockchain.SetContractStateAttribute("0x123", nil, "baz", "bar")
contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123")
contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123")
Expect(contractSummary.Attributes).To(Equal(
core.ContractAttributes{

View File

@ -41,18 +41,25 @@ var Schema = `
}
`
type Resolver struct {
repository repositories.Repository
type GraphQLRepositories struct {
repositories.BlockRepository
repositories.LogRepository
repositories.WatchedEventRepository
repositories.FilterRepository
}
func NewResolver(repository repositories.Repository) *Resolver {
return &Resolver{repository: repository}
type Resolver struct {
graphQLRepositories GraphQLRepositories
}
func NewResolver(repositories GraphQLRepositories) *Resolver {
return &Resolver{graphQLRepositories: repositories}
}
func (r *Resolver) LogFilter(args struct {
Name string
}) (*logFilterResolver, error) {
logFilter, err := r.repository.GetFilter(args.Name)
logFilter, err := r.graphQLRepositories.GetFilter(args.Name)
if err != nil {
return &logFilterResolver{}, err
}
@ -94,7 +101,7 @@ func (lfr *logFilterResolver) Topics() []*string {
func (r *Resolver) WatchedEvents(args struct {
Name string
}) (*watchedEventsResolver, error) {
watchedEvents, err := r.repository.GetWatchedEvents(args.Name)
watchedEvents, err := r.graphQLRepositories.GetWatchedEvents(args.Name)
if err != nil {
return &watchedEventsResolver{}, err
}

View File

@ -14,7 +14,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters"
"github.com/vulcanize/vulcanizedb/pkg/graphql_server"
"github.com/vulcanize/vulcanizedb/pkg/repositories"
"github.com/vulcanize/vulcanizedb/pkg/repositories/postgres"
)
@ -32,27 +31,38 @@ func formatJSON(data []byte) []byte {
var _ = Describe("GraphQL", func() {
var cfg config.Config
var repository repositories.Repository
var graphQLRepositories graphql_server.GraphQLRepositories
BeforeEach(func() {
cfg, _ = config.NewConfig("private")
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1, Id: "x123", ClientName: "geth"}
repository = postgres.BuildRepository(node)
e := repository.CreateFilter(filters.LogFilter{
db := postgres.NewTestDB(node)
blockRepository := &postgres.BlockRepository{DB: db}
logRepository := &postgres.LogRepository{DB: db}
filterRepository := &postgres.FilterRepository{DB: db}
watchedEventRepository := &postgres.WatchedEventRepository{DB: db}
graphQLRepositories = graphql_server.GraphQLRepositories{
WatchedEventRepository: watchedEventRepository,
BlockRepository: blockRepository,
LogRepository: logRepository,
FilterRepository: filterRepository,
}
err := graphQLRepositories.CreateFilter(filters.LogFilter{
Name: "TestFilter1",
FromBlock: 1,
ToBlock: 10,
Address: "0x123456789",
Topics: core.Topics{0: "topic=1", 2: "topic=2"},
})
if e != nil {
log.Fatal(e)
if err != nil {
log.Fatal(err)
}
f, e := repository.GetFilter("TestFilter1")
if e != nil {
log.Println(f)
log.Fatal(e)
filter, err := graphQLRepositories.GetFilter("TestFilter1")
if err != nil {
log.Println(filter)
log.Fatal(err)
}
matchingEvent := core.Log{
@ -71,16 +81,16 @@ var _ = Describe("GraphQL", func() {
Index: 0,
Data: "0xDATADATADATA",
}
e = repository.CreateLogs([]core.Log{matchingEvent, nonMatchingEvent})
if e != nil {
log.Fatal(e)
err = graphQLRepositories.CreateLogs([]core.Log{matchingEvent, nonMatchingEvent})
if err != nil {
log.Fatal(err)
}
})
It("Queries example schema for specific log filter", func() {
var variables map[string]interface{}
r := graphql_server.NewResolver(repository)
var schema = graphql.MustParseSchema(graphql_server.Schema, r)
resolver := graphql_server.NewResolver(graphQLRepositories)
var schema = graphql.MustParseSchema(graphql_server.Schema, resolver)
response := schema.Exec(context.Background(),
`{
logFilter(name: "TestFilter1") {
@ -108,16 +118,16 @@ var _ = Describe("GraphQL", func() {
}
err := json.Unmarshal(response.Data, &v)
Expect(err).ToNot(HaveOccurred())
a := formatJSON(response.Data)
e := formatJSON([]byte(expected))
Expect(a).To(Equal(e))
actualJSON := formatJSON(response.Data)
expectedJSON := formatJSON([]byte(expected))
Expect(actualJSON).To(Equal(expectedJSON))
})
It("Queries example schema for specific watched event log", func() {
var variables map[string]interface{}
r := graphql_server.NewResolver(repository)
var schema = graphql.MustParseSchema(graphql_server.Schema, r)
resolver := graphql_server.NewResolver(graphQLRepositories)
var schema = graphql.MustParseSchema(graphql_server.Schema, resolver)
response := schema.Exec(context.Background(),
`{
watchedEvents(name: "TestFilter1") {
@ -161,8 +171,8 @@ var _ = Describe("GraphQL", func() {
}
err := json.Unmarshal(response.Data, &v)
Expect(err).ToNot(HaveOccurred())
a := formatJSON(response.Data)
e := formatJSON([]byte(expected))
Expect(a).To(Equal(e))
actualJSON := formatJSON(response.Data)
expectedJSON := formatJSON([]byte(expected))
Expect(actualJSON).To(Equal(expectedJSON))
})
})

View File

@ -7,19 +7,19 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/repositories"
)
func PopulateMissingBlocks(blockchain core.Blockchain, repository repositories.Repository, startingBlockNumber int64) int {
func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository repositories.BlockRepository, startingBlockNumber int64) int {
lastBlock := blockchain.LastBlock().Int64()
blockRange := repository.MissingBlockNumbers(startingBlockNumber, lastBlock-1)
blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock-1)
log.SetPrefix("")
log.Printf("Backfilling %d blocks\n\n", len(blockRange))
RetrieveAndUpdateBlocks(blockchain, repository, blockRange)
RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange)
return len(blockRange)
}
func RetrieveAndUpdateBlocks(blockchain core.Blockchain, repository repositories.Repository, blockNumbers []int64) int {
func RetrieveAndUpdateBlocks(blockchain core.Blockchain, blockRepository repositories.BlockRepository, blockNumbers []int64) int {
for _, blockNumber := range blockNumbers {
block := blockchain.GetBlockByNumber(blockNumber)
repository.CreateOrUpdateBlock(block)
blockRepository.CreateOrUpdateBlock(block)
}
return len(blockNumbers)
}

View File

@ -10,6 +10,13 @@ import (
)
var _ = Describe("Populating blocks", func() {
var inMemory *inmemory.InMemory
var blockRepository *inmemory.BlockRepository
BeforeEach(func() {
inMemory = inmemory.NewInMemory()
blockRepository = &inmemory.BlockRepository{InMemory: inMemory}
})
It("fills in the only missing block (Number 1)", func() {
blocks := []core.Block{
@ -17,11 +24,11 @@ var _ = Describe("Populating blocks", func() {
{Number: 2},
}
blockchain := fakes.NewBlockchainWithBlocks(blocks)
repository := inmemory.NewInMemory()
repository.CreateOrUpdateBlock(core.Block{Number: 2})
blocksAdded := history.PopulateMissingBlocks(blockchain, repository, 1)
_, err := repository.GetBlock(1)
blockRepository.CreateOrUpdateBlock(core.Block{Number: 2})
blocksAdded := history.PopulateMissingBlocks(blockchain, blockRepository, 1)
_, err := blockRepository.GetBlock(1)
Expect(blocksAdded).To(Equal(1))
Expect(err).ToNot(HaveOccurred())
@ -40,29 +47,28 @@ var _ = Describe("Populating blocks", func() {
{Number: 12},
{Number: 13},
})
repository := inmemory.NewInMemory()
repository.CreateOrUpdateBlock(core.Block{Number: 1})
repository.CreateOrUpdateBlock(core.Block{Number: 2})
repository.CreateOrUpdateBlock(core.Block{Number: 3})
repository.CreateOrUpdateBlock(core.Block{Number: 6})
repository.CreateOrUpdateBlock(core.Block{Number: 7})
repository.CreateOrUpdateBlock(core.Block{Number: 9})
repository.CreateOrUpdateBlock(core.Block{Number: 11})
repository.CreateOrUpdateBlock(core.Block{Number: 12})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 1})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 2})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 3})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 6})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 7})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 9})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 11})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 12})
blocksAdded := history.PopulateMissingBlocks(blockchain, repository, 5)
blocksAdded := history.PopulateMissingBlocks(blockchain, blockRepository, 5)
Expect(blocksAdded).To(Equal(3))
Expect(repository.BlockCount()).To(Equal(11))
_, err := repository.GetBlock(4)
Expect(blockRepository.BlockCount()).To(Equal(11))
_, err := blockRepository.GetBlock(4)
Expect(err).To(HaveOccurred())
_, err = repository.GetBlock(5)
_, err = blockRepository.GetBlock(5)
Expect(err).ToNot(HaveOccurred())
_, err = repository.GetBlock(8)
_, err = blockRepository.GetBlock(8)
Expect(err).ToNot(HaveOccurred())
_, err = repository.GetBlock(10)
_, err = blockRepository.GetBlock(10)
Expect(err).ToNot(HaveOccurred())
_, err = repository.GetBlock(13)
_, err = blockRepository.GetBlock(13)
Expect(err).To(HaveOccurred())
})
@ -72,11 +78,10 @@ var _ = Describe("Populating blocks", func() {
{Number: 5},
{Number: 6},
})
repository := inmemory.NewInMemory()
repository.CreateOrUpdateBlock(core.Block{Number: 3})
repository.CreateOrUpdateBlock(core.Block{Number: 6})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 3})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 6})
numberOfBlocksCreated := history.PopulateMissingBlocks(blockchain, repository, 3)
numberOfBlocksCreated := history.PopulateMissingBlocks(blockchain, blockRepository, 3)
Expect(numberOfBlocksCreated).To(Equal(2))
})
@ -89,11 +94,10 @@ var _ = Describe("Populating blocks", func() {
{Number: 4},
{Number: 5},
})
repository := inmemory.NewInMemory()
history.RetrieveAndUpdateBlocks(blockchain, repository, history.MakeRange(2, 5))
Expect(repository.BlockCount()).To(Equal(3))
Expect(repository.CreateOrUpdateBlockCallCount).To(Equal(3))
history.RetrieveAndUpdateBlocks(blockchain, blockRepository, history.MakeRange(2, 5))
Expect(blockRepository.BlockCount()).To(Equal(3))
Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(3))
})
})

View File

@ -17,15 +17,15 @@ var ParsedWindowTemplate = *template.Must(template.New("window").Parse(WindowTem
type BlockValidator struct {
blockchain core.Blockchain
repository repositories.Repository
blockRepository repositories.BlockRepository
windowSize int
parsedLoggingTemplate template.Template
}
func NewBlockValidator(blockchain core.Blockchain, repository repositories.Repository, windowSize int) *BlockValidator {
func NewBlockValidator(blockchain core.Blockchain, blockRepository repositories.BlockRepository, windowSize int) *BlockValidator {
return &BlockValidator{
blockchain,
repository,
blockRepository,
windowSize,
ParsedWindowTemplate,
}
@ -34,9 +34,9 @@ func NewBlockValidator(blockchain core.Blockchain, repository repositories.Repos
func (bv BlockValidator) ValidateBlocks() ValidationWindow {
window := MakeValidationWindow(bv.blockchain, bv.windowSize)
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
RetrieveAndUpdateBlocks(bv.blockchain, bv.repository, blockNumbers)
RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers)
lastBlock := bv.blockchain.LastBlock().Int64()
bv.repository.SetBlocksStatus(lastBlock)
bv.blockRepository.SetBlocksStatus(lastBlock)
return window
}

View File

@ -47,23 +47,26 @@ var _ = Describe("Blocks validator", func() {
{Number: 6},
{Number: 7},
})
repository := inmemory.NewInMemory()
inMemoryDB := inmemory.NewInMemory()
blocksRepository := &inmemory.BlockRepository{InMemory: inMemoryDB}
validator := history.NewBlockValidator(blockchain, repository, 2)
validator := history.NewBlockValidator(blockchain, blocksRepository, 2)
window := validator.ValidateBlocks()
Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7}))
Expect(repository.BlockCount()).To(Equal(2))
Expect(repository.CreateOrUpdateBlockCallCount).To(Equal(2))
Expect(blocksRepository.BlockCount()).To(Equal(2))
Expect(blocksRepository.CreateOrUpdateBlockCallCount).To(Equal(2))
})
It("logs window message", func() {
inMemoryDB := inmemory.NewInMemory()
blockRepository := &inmemory.BlockRepository{InMemory: inMemoryDB}
expectedMessage := &bytes.Buffer{}
window := history.ValidationWindow{LowerBound: 5, UpperBound: 7}
history.ParsedWindowTemplate.Execute(expectedMessage, history.ValidationWindow{LowerBound: 5, UpperBound: 7})
blockchain := fakes.NewBlockchainWithBlocks([]core.Block{})
repository := inmemory.NewInMemory()
validator := history.NewBlockValidator(blockchain, repository, 2)
validator := history.NewBlockValidator(blockchain, blockRepository, 2)
actualMessage := &bytes.Buffer{}
validator.Log(actualMessage, window)
Expect(actualMessage).To(Equal(expectedMessage))

View File

@ -0,0 +1,51 @@
package inmemory
import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/repositories"
)
type BlockRepository struct {
*InMemory
}
func (blockRepository *BlockRepository) CreateOrUpdateBlock(block core.Block) error {
blockRepository.CreateOrUpdateBlockCallCount++
blockRepository.blocks[block.Number] = block
for _, transaction := range block.Transactions {
blockRepository.receipts[transaction.Hash] = transaction.Receipt
blockRepository.logs[transaction.TxHash] = transaction.Logs
}
return nil
}
func (blockRepository *BlockRepository) GetBlock(blockNumber int64) (core.Block, error) {
if block, ok := blockRepository.blocks[blockNumber]; ok {
return block, nil
}
return core.Block{}, repositories.ErrBlockDoesNotExist(blockNumber)
}
func (blockRepository *BlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 {
missingNumbers := []int64{}
for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ {
if _, ok := blockRepository.blocks[blockNumber]; !ok {
missingNumbers = append(missingNumbers, blockNumber)
}
}
return missingNumbers
}
func (blockRepository *BlockRepository) SetBlocksStatus(chainHead int64) {
for key, block := range blockRepository.blocks {
if key < (chainHead - blocksFromHeadBeforeFinal) {
tmp := block
tmp.IsFinal = true
blockRepository.blocks[key] = tmp
}
}
}
func (blockRepository *BlockRepository) BlockCount() int {
return len(blockRepository.blocks)
}

View File

@ -0,0 +1,35 @@
package inmemory
import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/repositories"
)
type ContractRepostiory struct {
*InMemory
}
func (contractRepository *ContractRepostiory) ContractExists(contractHash string) bool {
_, present := contractRepository.contracts[contractHash]
return present
}
func (contractRepository *ContractRepostiory) GetContract(contractHash string) (core.Contract, error) {
contract, ok := contractRepository.contracts[contractHash]
if !ok {
return core.Contract{}, repositories.ErrContractDoesNotExist(contractHash)
}
for _, block := range contractRepository.blocks {
for _, transaction := range block.Transactions {
if transaction.To == contractHash {
contract.Transactions = append(contract.Transactions, transaction)
}
}
}
return contract, nil
}
func (contractRepository *ContractRepostiory) CreateContract(contract core.Contract) error {
contractRepository.contracts[contract.Hash] = contract
return nil
}

View File

@ -1,13 +1,8 @@
package inmemory
import (
"fmt"
"errors"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters"
"github.com/vulcanize/vulcanizedb/pkg/repositories"
)
const (
@ -23,23 +18,6 @@ type InMemory struct {
CreateOrUpdateBlockCallCount int
}
func (repository *InMemory) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) {
panic("implement me")
}
func (repository *InMemory) GetFilter(name string) (filters.LogFilter, error) {
panic("implement me")
}
func (repository *InMemory) CreateFilter(filter filters.LogFilter) error {
key := filter.Name
if _, ok := repository.logFilters[key]; ok || key == "" {
return errors.New("filter name not unique")
}
repository.logFilters[key] = filter
return nil
}
func NewInMemory() *InMemory {
return &InMemory{
CreateOrUpdateBlockCallCount: 0,
@ -50,107 +28,3 @@ func NewInMemory() *InMemory {
logFilters: make(map[string]filters.LogFilter),
}
}
func (repository *InMemory) GetReceipt(txHash string) (core.Receipt, error) {
if receipt, ok := repository.receipts[txHash]; ok {
return receipt, nil
}
return core.Receipt{}, repositories.ErrReceiptDoesNotExist(txHash)
}
func (repository *InMemory) SetBlocksStatus(chainHead int64) {
for key, block := range repository.blocks {
if key < (chainHead - blocksFromHeadBeforeFinal) {
tmp := block
tmp.IsFinal = true
repository.blocks[key] = tmp
}
}
}
func (repository *InMemory) CreateLogs(logs []core.Log) error {
for _, log := range logs {
key := fmt.Sprintf("%d%d", log.BlockNumber, log.Index)
var logs []core.Log
repository.logs[key] = append(logs, log)
}
return nil
}
func (repository *InMemory) GetLogs(address string, blockNumber int64) []core.Log {
var matchingLogs []core.Log
for _, logs := range repository.logs {
for _, log := range logs {
if log.Address == address && log.BlockNumber == blockNumber {
matchingLogs = append(matchingLogs, log)
}
}
}
return matchingLogs
}
func (repository *InMemory) CreateContract(contract core.Contract) error {
repository.contracts[contract.Hash] = contract
return nil
}
func (repository *InMemory) ContractExists(contractHash string) bool {
_, present := repository.contracts[contractHash]
return present
}
func (repository *InMemory) GetContract(contractHash string) (core.Contract, error) {
contract, ok := repository.contracts[contractHash]
if !ok {
return core.Contract{}, repositories.ErrContractDoesNotExist(contractHash)
}
for _, block := range repository.blocks {
for _, transaction := range block.Transactions {
if transaction.To == contractHash {
contract.Transactions = append(contract.Transactions, transaction)
}
}
}
return contract, nil
}
func (repository *InMemory) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 {
missingNumbers := []int64{}
for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ {
if _, ok := repository.blocks[blockNumber]; !ok {
missingNumbers = append(missingNumbers, blockNumber)
}
}
return missingNumbers
}
func (repository *InMemory) CreateOrUpdateBlock(block core.Block) error {
repository.CreateOrUpdateBlockCallCount++
repository.blocks[block.Number] = block
for _, transaction := range block.Transactions {
repository.receipts[transaction.Hash] = transaction.Receipt
repository.logs[transaction.TxHash] = transaction.Logs
}
return nil
}
func (repository *InMemory) BlockCount() int {
return len(repository.blocks)
}
func (repository *InMemory) GetBlock(blockNumber int64) (core.Block, error) {
if block, ok := repository.blocks[blockNumber]; ok {
return block, nil
}
return core.Block{}, repositories.ErrBlockDoesNotExist(blockNumber)
}
func (repository *InMemory) MaxBlockNumber() int64 {
highestBlockNumber := int64(-1)
for key := range repository.blocks {
if key > highestBlockNumber {
highestBlockNumber = key
}
}
return highestBlockNumber
}

View File

@ -16,35 +16,39 @@ const (
blocksFromHeadBeforeFinal = 20
)
func (db DB) SetBlocksStatus(chainHead int64) {
type BlockRepository struct {
*DB
}
func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) {
cutoff := chainHead - blocksFromHeadBeforeFinal
db.DB.Exec(`
blockRepository.DB.Exec(`
UPDATE blocks SET is_final = TRUE
WHERE is_final = FALSE AND number < $1`,
cutoff)
}
func (db DB) CreateOrUpdateBlock(block core.Block) error {
func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) error {
var err error
retrievedBlockHash, ok := db.getBlockHash(block)
retrievedBlockHash, ok := blockRepository.getBlockHash(block)
if !ok {
err = db.insertBlock(block)
err = blockRepository.insertBlock(block)
return err
}
if ok && retrievedBlockHash != block.Hash {
err = db.removeBlock(block.Number)
err = blockRepository.removeBlock(block.Number)
if err != nil {
return err
}
err = db.insertBlock(block)
err = blockRepository.insertBlock(block)
return err
}
return nil
}
func (db DB) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 {
func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 {
numbers := make([]int64, 0)
db.DB.Select(&numbers,
blockRepository.DB.Select(&numbers,
`SELECT all_block_numbers
FROM (
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
@ -56,8 +60,8 @@ func (db DB) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber i
return numbers
}
func (db DB) GetBlock(blockNumber int64) (core.Block, error) {
blockRows := db.DB.QueryRowx(
func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, error) {
blockRows := blockRepository.DB.QueryRowx(
`SELECT id,
number,
gaslimit,
@ -75,8 +79,8 @@ func (db DB) GetBlock(blockNumber int64) (core.Block, error) {
reward,
uncles_reward
FROM blocks
WHERE node_id = $1 AND number = $2`, db.nodeId, blockNumber)
savedBlock, err := db.loadBlock(blockRows)
WHERE node_id = $1 AND number = $2`, blockRepository.nodeId, blockNumber)
savedBlock, err := blockRepository.loadBlock(blockRows)
if err != nil {
switch err {
case sql.ErrNoRows:
@ -88,21 +92,21 @@ func (db DB) GetBlock(blockNumber int64) (core.Block, error) {
return savedBlock, nil
}
func (db DB) insertBlock(block core.Block) error {
func (blockRepository BlockRepository) insertBlock(block core.Block) error {
var blockId int64
tx, _ := db.DB.BeginTx(context.Background(), nil)
tx, _ := blockRepository.DB.BeginTx(context.Background(), nil)
err := tx.QueryRow(
`INSERT INTO blocks
(node_id, number, gaslimit, gasused, time, difficulty, hash, nonce, parenthash, size, uncle_hash, is_final, miner, extra_data, reward, uncles_reward)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING id `,
db.nodeId, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward).
blockRepository.nodeId, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward).
Scan(&blockId)
if err != nil {
tx.Rollback()
return ErrDBInsertFailed
}
err = db.createTransactions(tx, blockId, block.Transactions)
err = blockRepository.createTransactions(tx, blockId, block.Transactions)
if err != nil {
tx.Rollback()
return ErrDBInsertFailed
@ -111,9 +115,9 @@ func (db DB) insertBlock(block core.Block) error {
return nil
}
func (db DB) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error {
func (blockRepository BlockRepository) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error {
for _, transaction := range transactions {
err := db.createTransaction(tx, blockId, transaction)
err := blockRepository.createTransaction(tx, blockId, transaction)
if err != nil {
return err
}
@ -131,7 +135,7 @@ func nullStringToZero(s string) string {
return s
}
func (db DB) createTransaction(tx *sql.Tx, blockId int64, transaction core.Transaction) error {
func (blockRepository BlockRepository) createTransaction(tx *sql.Tx, blockId int64, transaction core.Transaction) error {
var transactionId int
err := tx.QueryRow(
`INSERT INTO transactions
@ -144,12 +148,12 @@ func (db DB) createTransaction(tx *sql.Tx, blockId int64, transaction core.Trans
return err
}
if hasReceipt(transaction) {
receiptId, err := db.createReceipt(tx, transactionId, transaction.Receipt)
receiptId, err := blockRepository.createReceipt(tx, transactionId, transaction.Receipt)
if err != nil {
return err
}
if hasLogs(transaction) {
err = db.createLogs(tx, transaction.Receipt.Logs, receiptId)
err = blockRepository.createLogs(tx, transaction.Receipt.Logs, receiptId)
if err != nil {
return err
}
@ -166,7 +170,7 @@ func hasReceipt(transaction core.Transaction) bool {
return transaction.Receipt.TxHash != ""
}
func (db DB) createReceipt(tx *sql.Tx, transactionId int, receipt core.Receipt) (int, error) {
func (blockRepository BlockRepository) createReceipt(tx *sql.Tx, transactionId int, receipt core.Receipt) (int, error) {
//Not currently persisting log bloom filters
var receiptId int
err := tx.QueryRow(
@ -181,17 +185,17 @@ func (db DB) createReceipt(tx *sql.Tx, transactionId int, receipt core.Receipt)
return receiptId, nil
}
func (db DB) getBlockHash(block core.Block) (string, bool) {
func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) {
var retrievedBlockHash string
db.DB.Get(&retrievedBlockHash,
blockRepository.DB.Get(&retrievedBlockHash,
`SELECT hash
FROM blocks
WHERE number = $1 AND node_id = $2`,
block.Number, db.nodeId)
block.Number, blockRepository.nodeId)
return retrievedBlockHash, blockExists(retrievedBlockHash)
}
func (db DB) createLogs(tx *sql.Tx, logs []core.Log, receiptId int) error {
func (blockRepository BlockRepository) createLogs(tx *sql.Tx, logs []core.Log, receiptId int) error {
for _, tlog := range logs {
_, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id)
@ -210,19 +214,19 @@ func blockExists(retrievedBlockHash string) bool {
return retrievedBlockHash != ""
}
func (db DB) removeBlock(blockNumber int64) error {
_, err := db.DB.Exec(
func (blockRepository BlockRepository) removeBlock(blockNumber int64) error {
_, err := blockRepository.DB.Exec(
`DELETE FROM
blocks
WHERE number=$1 AND node_id=$2`,
blockNumber, db.nodeId)
blockNumber, blockRepository.nodeId)
if err != nil {
return ErrDBDeleteFailed
}
return nil
}
func (db DB) loadBlock(blockRows *sqlx.Row) (core.Block, error) {
func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Block, error) {
type b struct {
ID int
core.Block
@ -232,7 +236,7 @@ func (db DB) loadBlock(blockRows *sqlx.Row) (core.Block, error) {
if err != nil {
return core.Block{}, err
}
transactionRows, err := db.DB.Queryx(`
transactionRows, err := blockRepository.DB.Queryx(`
SELECT hash,
nonce,
tx_to,
@ -247,11 +251,11 @@ func (db DB) loadBlock(blockRows *sqlx.Row) (core.Block, error) {
if err != nil {
return core.Block{}, err
}
block.Transactions = db.loadTransactions(transactionRows)
block.Transactions = blockRepository.LoadTransactions(transactionRows)
return block.Block, nil
}
func (db DB) loadTransactions(transactionRows *sqlx.Rows) []core.Transaction {
func (blockRepository BlockRepository) LoadTransactions(transactionRows *sqlx.Rows) []core.Transaction {
var transactions []core.Transaction
for transactionRows.Next() {
var transaction core.Transaction

View File

@ -12,7 +12,8 @@ import (
)
var _ = Describe("Saving blocks", func() {
var repository repositories.BlockRepository
var db *postgres.DB
var blockRepository repositories.BlockRepository
BeforeEach(func() {
node := core.Node{
GenesisBlock: "GENESIS",
@ -20,21 +21,24 @@ var _ = Describe("Saving blocks", func() {
Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
}
repository = postgres.BuildRepository(node)
db = postgres.NewTestDB(node)
blockRepository = postgres.BlockRepository{DB: db}
})
It("associates blocks to a node", func() {
block := core.Block{
Number: 123,
}
repository.CreateOrUpdateBlock(block)
blockRepository.CreateOrUpdateBlock(block)
nodeTwo := core.Node{
GenesisBlock: "0x456",
NetworkId: 1,
Id: "x123456",
ClientName: "Geth",
}
repositoryTwo := postgres.BuildRepository(nodeTwo)
dbTwo := postgres.NewTestDB(nodeTwo)
repositoryTwo := postgres.BlockRepository{DB: dbTwo}
_, err := repositoryTwo.GetBlock(123)
Expect(err).To(HaveOccurred())
@ -72,9 +76,9 @@ var _ = Describe("Saving blocks", func() {
UnclesReward: unclesReward,
}
repository.CreateOrUpdateBlock(block)
blockRepository.CreateOrUpdateBlock(block)
savedBlock, err := repository.GetBlock(blockNumber)
savedBlock, err := blockRepository.GetBlock(blockNumber)
Expect(err).NotTo(HaveOccurred())
Expect(savedBlock.Reward).To(Equal(blockReward))
Expect(savedBlock.Difficulty).To(Equal(difficulty))
@ -93,7 +97,7 @@ var _ = Describe("Saving blocks", func() {
})
It("does not find a block when searching for a number that does not exist", func() {
_, err := repository.GetBlock(111)
_, err := blockRepository.GetBlock(111)
Expect(err).To(HaveOccurred())
})
@ -104,9 +108,9 @@ var _ = Describe("Saving blocks", func() {
Transactions: []core.Transaction{{}},
}
repository.CreateOrUpdateBlock(block)
blockRepository.CreateOrUpdateBlock(block)
savedBlock, _ := repository.GetBlock(123)
savedBlock, _ := blockRepository.GetBlock(123)
Expect(len(savedBlock.Transactions)).To(Equal(1))
})
@ -116,9 +120,9 @@ var _ = Describe("Saving blocks", func() {
Transactions: []core.Transaction{{}, {}},
}
repository.CreateOrUpdateBlock(block)
blockRepository.CreateOrUpdateBlock(block)
savedBlock, _ := repository.GetBlock(123)
savedBlock, _ := blockRepository.GetBlock(123)
Expect(len(savedBlock.Transactions)).To(Equal(2))
})
@ -135,10 +139,10 @@ var _ = Describe("Saving blocks", func() {
Transactions: []core.Transaction{{Hash: "x678"}, {Hash: "x9ab"}},
}
repository.CreateOrUpdateBlock(blockOne)
repository.CreateOrUpdateBlock(blockTwo)
blockRepository.CreateOrUpdateBlock(blockOne)
blockRepository.CreateOrUpdateBlock(blockTwo)
savedBlock, _ := repository.GetBlock(123)
savedBlock, _ := blockRepository.GetBlock(123)
Expect(len(savedBlock.Transactions)).To(Equal(2))
Expect(savedBlock.Transactions[0].Hash).To(Equal("x678"))
Expect(savedBlock.Transactions[1].Hash).To(Equal("x9ab"))
@ -154,16 +158,17 @@ var _ = Describe("Saving blocks", func() {
Number: 123,
Transactions: []core.Transaction{{Hash: "x678"}, {Hash: "x9ab"}},
}
repository.CreateOrUpdateBlock(blockOne)
blockRepository.CreateOrUpdateBlock(blockOne)
nodeTwo := core.Node{
GenesisBlock: "0x456",
NetworkId: 1,
}
repositoryTwo := postgres.BuildRepository(nodeTwo)
dbTwo := postgres.NewTestDB(nodeTwo)
repositoryTwo := postgres.BlockRepository{DB: dbTwo}
repository.CreateOrUpdateBlock(blockOne)
blockRepository.CreateOrUpdateBlock(blockOne)
repositoryTwo.CreateOrUpdateBlock(blockTwo)
retrievedBlockOne, _ := repository.GetBlock(123)
retrievedBlockOne, _ := blockRepository.GetBlock(123)
retrievedBlockTwo, _ := repositoryTwo.GetBlock(123)
Expect(retrievedBlockOne.Transactions[0].Hash).To(Equal("x123"))
@ -194,9 +199,9 @@ var _ = Describe("Saving blocks", func() {
Transactions: []core.Transaction{transaction},
}
repository.CreateOrUpdateBlock(block)
blockRepository.CreateOrUpdateBlock(block)
savedBlock, _ := repository.GetBlock(123)
savedBlock, _ := blockRepository.GetBlock(123)
Expect(len(savedBlock.Transactions)).To(Equal(1))
savedTransaction := savedBlock.Transactions[0]
Expect(savedTransaction.Data).To(Equal(transaction.Data))
@ -211,54 +216,54 @@ var _ = Describe("Saving blocks", func() {
Describe("The missing block numbers", func() {
It("is empty the starting block number is the highest known block number", func() {
repository.CreateOrUpdateBlock(core.Block{Number: 1})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 1})
Expect(len(repository.MissingBlockNumbers(1, 1))).To(Equal(0))
Expect(len(blockRepository.MissingBlockNumbers(1, 1))).To(Equal(0))
})
It("is the only missing block number", func() {
repository.CreateOrUpdateBlock(core.Block{Number: 2})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 2})
Expect(repository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1}))
Expect(blockRepository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1}))
})
It("is both missing block numbers", func() {
repository.CreateOrUpdateBlock(core.Block{Number: 3})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 3})
Expect(repository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2}))
Expect(blockRepository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2}))
})
It("goes back to the starting block number", func() {
repository.CreateOrUpdateBlock(core.Block{Number: 6})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 6})
Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5}))
Expect(blockRepository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5}))
})
It("only includes missing block numbers", func() {
repository.CreateOrUpdateBlock(core.Block{Number: 4})
repository.CreateOrUpdateBlock(core.Block{Number: 6})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 4})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 6})
Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5}))
Expect(blockRepository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5}))
})
It("is a list with multiple gaps", func() {
repository.CreateOrUpdateBlock(core.Block{Number: 4})
repository.CreateOrUpdateBlock(core.Block{Number: 5})
repository.CreateOrUpdateBlock(core.Block{Number: 8})
repository.CreateOrUpdateBlock(core.Block{Number: 10})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 4})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 5})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 8})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 10})
Expect(repository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9}))
Expect(blockRepository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9}))
})
It("returns empty array when lower bound exceeds upper bound", func() {
Expect(repository.MissingBlockNumbers(10000, 1)).To(Equal([]int64{}))
Expect(blockRepository.MissingBlockNumbers(10000, 1)).To(Equal([]int64{}))
})
It("only returns requested range even when other gaps exist", func() {
repository.CreateOrUpdateBlock(core.Block{Number: 3})
repository.CreateOrUpdateBlock(core.Block{Number: 8})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 3})
blockRepository.CreateOrUpdateBlock(core.Block{Number: 8})
Expect(repository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5}))
Expect(blockRepository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5}))
})
})
@ -266,15 +271,15 @@ var _ = Describe("Saving blocks", func() {
It("sets the status of blocks within n-20 of chain HEAD as final", func() {
blockNumberOfChainHead := 25
for i := 0; i < blockNumberOfChainHead; i++ {
repository.CreateOrUpdateBlock(core.Block{Number: int64(i), Hash: strconv.Itoa(i)})
blockRepository.CreateOrUpdateBlock(core.Block{Number: int64(i), Hash: strconv.Itoa(i)})
}
repository.SetBlocksStatus(int64(blockNumberOfChainHead))
blockRepository.SetBlocksStatus(int64(blockNumberOfChainHead))
blockOne, err := repository.GetBlock(1)
blockOne, err := blockRepository.GetBlock(1)
Expect(err).ToNot(HaveOccurred())
Expect(blockOne.IsFinal).To(Equal(true))
blockTwo, err := repository.GetBlock(24)
blockTwo, err := blockRepository.GetBlock(24)
Expect(err).ToNot(HaveOccurred())
Expect(blockTwo.IsFinal).To(BeFalse())
})

View File

@ -7,13 +7,17 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/repositories"
)
func (db DB) CreateContract(contract core.Contract) error {
type ContractRepository struct {
*DB
}
func (contractRepository ContractRepository) CreateContract(contract core.Contract) error {
abi := contract.Abi
var abiToInsert *string
if abi != "" {
abiToInsert = &abi
}
_, err := db.DB.Exec(
_, err := contractRepository.DB.Exec(
`INSERT INTO watched_contracts (contract_hash, contract_abi)
VALUES ($1, $2)
ON CONFLICT (contract_hash)
@ -26,9 +30,9 @@ func (db DB) CreateContract(contract core.Contract) error {
return nil
}
func (db DB) ContractExists(contractHash string) bool {
func (contractRepository ContractRepository) ContractExists(contractHash string) bool {
var exists bool
db.DB.QueryRow(
contractRepository.DB.QueryRow(
`SELECT exists(
SELECT 1
FROM watched_contracts
@ -36,21 +40,21 @@ func (db DB) ContractExists(contractHash string) bool {
return exists
}
func (db DB) GetContract(contractHash string) (core.Contract, error) {
func (contractRepository ContractRepository) GetContract(contractHash string) (core.Contract, error) {
var hash string
var abi string
contract := db.DB.QueryRow(
contract := contractRepository.DB.QueryRow(
`SELECT contract_hash, contract_abi FROM watched_contracts WHERE contract_hash=$1`, contractHash)
err := contract.Scan(&hash, &abi)
if err == sql.ErrNoRows {
return core.Contract{}, repositories.ErrContractDoesNotExist(contractHash)
}
savedContract := db.addTransactions(core.Contract{Hash: hash, Abi: abi})
savedContract := contractRepository.addTransactions(core.Contract{Hash: hash, Abi: abi})
return savedContract, nil
}
func (db DB) addTransactions(contract core.Contract) core.Contract {
transactionRows, _ := db.DB.Queryx(`
func (contractRepository ContractRepository) addTransactions(contract core.Contract) core.Contract {
transactionRows, _ := contractRepository.DB.Queryx(`
SELECT hash,
nonce,
tx_to,
@ -62,7 +66,8 @@ func (db DB) addTransactions(contract core.Contract) core.Contract {
FROM transactions
WHERE tx_to = $1
ORDER BY block_id DESC`, contract.Hash)
transactions := db.loadTransactions(transactionRows)
blockRepository := &BlockRepository{contractRepository.DB}
transactions := blockRepository.LoadTransactions(transactionRows)
savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi}
return savedContract
}

View File

@ -1,4 +1,4 @@
package postgres
package postgres_test
import (
"sort"
@ -7,11 +7,14 @@ import (
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/repositories"
"github.com/vulcanize/vulcanizedb/pkg/repositories/postgres"
)
var _ = Describe("Creating contracts", func() {
var repository repositories.ContractRepository
var db *postgres.DB
var contractRepository repositories.ContractRepository
var node core.Node
BeforeEach(func() {
node = core.Node{
GenesisBlock: "GENESIS",
@ -19,35 +22,36 @@ var _ = Describe("Creating contracts", func() {
Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
}
repository = BuildRepository(node)
db = postgres.NewTestDB(node)
contractRepository = postgres.ContractRepository{DB: db}
})
It("returns the contract when it exists", func() {
repository.CreateContract(core.Contract{Hash: "x123"})
contractRepository.CreateContract(core.Contract{Hash: "x123"})
contract, err := repository.GetContract("x123")
contract, err := contractRepository.GetContract("x123")
Expect(err).NotTo(HaveOccurred())
Expect(contract.Hash).To(Equal("x123"))
Expect(repository.ContractExists("x123")).To(BeTrue())
Expect(repository.ContractExists("x456")).To(BeFalse())
Expect(contractRepository.ContractExists("x123")).To(BeTrue())
Expect(contractRepository.ContractExists("x456")).To(BeFalse())
})
It("returns err if contract does not exist", func() {
_, err := repository.GetContract("x123")
_, err := contractRepository.GetContract("x123")
Expect(err).To(HaveOccurred())
})
It("returns empty array when no transactions 'To' a contract", func() {
repository.CreateContract(core.Contract{Hash: "x123"})
contract, err := repository.GetContract("x123")
contractRepository.CreateContract(core.Contract{Hash: "x123"})
contract, err := contractRepository.GetContract("x123")
Expect(err).ToNot(HaveOccurred())
Expect(contract.Transactions).To(BeEmpty())
})
It("returns transactions 'To' a contract", func() {
var blockRepository repositories.BlockRepository
blockRepository = BuildRepository(node)
blockRepository = postgres.BlockRepository{DB: db}
block := core.Block{
Number: 123,
Transactions: []core.Transaction{
@ -58,8 +62,8 @@ var _ = Describe("Creating contracts", func() {
}
blockRepository.CreateOrUpdateBlock(block)
repository.CreateContract(core.Contract{Hash: "x123"})
contract, err := repository.GetContract("x123")
contractRepository.CreateContract(core.Contract{Hash: "x123"})
contract, err := contractRepository.GetContract("x123")
Expect(err).ToNot(HaveOccurred())
sort.Slice(contract.Transactions, func(i, j int) bool {
return contract.Transactions[i].Hash < contract.Transactions[j].Hash
@ -72,25 +76,25 @@ var _ = Describe("Creating contracts", func() {
})
It("stores the ABI of the contract", func() {
repository.CreateContract(core.Contract{
contractRepository.CreateContract(core.Contract{
Abi: "{\"some\": \"json\"}",
Hash: "x123",
})
contract, err := repository.GetContract("x123")
contract, err := contractRepository.GetContract("x123")
Expect(err).ToNot(HaveOccurred())
Expect(contract.Abi).To(Equal("{\"some\": \"json\"}"))
})
It("updates the ABI of the contract if hash already present", func() {
repository.CreateContract(core.Contract{
contractRepository.CreateContract(core.Contract{
Abi: "{\"some\": \"json\"}",
Hash: "x123",
})
repository.CreateContract(core.Contract{
contractRepository.CreateContract(core.Contract{
Abi: "{\"some\": \"different json\"}",
Hash: "x123",
})
contract, err := repository.GetContract("x123")
contract, err := contractRepository.GetContract("x123")
Expect(err).ToNot(HaveOccurred())
Expect(contract.Abi).To(Equal("{\"some\": \"different json\"}"))
})

View File

@ -5,18 +5,18 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
)
func ClearData(postgres *DB) {
postgres.DB.MustExec("DELETE FROM watched_contracts")
postgres.DB.MustExec("DELETE FROM transactions")
postgres.DB.MustExec("DELETE FROM blocks")
postgres.DB.MustExec("DELETE FROM logs")
postgres.DB.MustExec("DELETE FROM receipts")
postgres.DB.MustExec("DELETE FROM log_filters")
func (db *DB) clearData() {
db.MustExec("DELETE FROM watched_contracts")
db.MustExec("DELETE FROM transactions")
db.MustExec("DELETE FROM blocks")
db.MustExec("DELETE FROM logs")
db.MustExec("DELETE FROM receipts")
db.MustExec("DELETE FROM log_filters")
}
func BuildRepository(node core.Node) *DB {
func NewTestDB(node core.Node) *DB {
cfg, _ := config.NewConfig("private")
repository, _ := NewDB(cfg.Database, node)
ClearData(repository)
return repository
db, _ := NewDB(cfg.Database, node)
db.clearData()
return db
}

View File

@ -10,8 +10,12 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/repositories"
)
func (db DB) CreateFilter(query filters.LogFilter) error {
_, err := db.DB.Exec(
type FilterRepository struct {
*DB
}
func (filterRepository FilterRepository) CreateFilter(query filters.LogFilter) error {
_, err := filterRepository.DB.Exec(
`INSERT INTO log_filters
(name, from_block, to_block, address, topic0, topic1, topic2, topic3)
VALUES ($1, NULLIF($2, -1), NULLIF($3, -1), $4, NULLIF($5, ''), NULLIF($6, ''), NULLIF($7, ''), NULLIF($8, ''))`,
@ -22,9 +26,9 @@ func (db DB) CreateFilter(query filters.LogFilter) error {
return nil
}
func (db DB) GetFilter(name string) (filters.LogFilter, error) {
func (filterRepository FilterRepository) GetFilter(name string) (filters.LogFilter, error) {
lf := DBLogFilter{}
err := db.DB.Get(&lf,
err := filterRepository.DB.Get(&lf,
`SELECT
id,
name,

View File

@ -1,4 +1,4 @@
package postgres
package postgres_test
import (
. "github.com/onsi/ginkgo"
@ -6,10 +6,12 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters"
"github.com/vulcanize/vulcanizedb/pkg/repositories"
"github.com/vulcanize/vulcanizedb/pkg/repositories/postgres"
)
var _ = Describe("Logs Repository", func() {
var repository repositories.FilterRepository
var _ = Describe("Log Filters Repository", func() {
var db *postgres.DB
var filterRepository repositories.FilterRepository
var node core.Node
BeforeEach(func() {
node = core.Node{
@ -18,7 +20,8 @@ var _ = Describe("Logs Repository", func() {
Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
}
repository = BuildRepository(node)
db = postgres.NewTestDB(node)
filterRepository = postgres.FilterRepository{DB: db}
})
Describe("LogFilter", func() {
@ -37,7 +40,7 @@ var _ = Describe("Logs Repository", func() {
"",
},
}
err := repository.CreateFilter(logFilter)
err := filterRepository.CreateFilter(logFilter)
Expect(err).ToNot(HaveOccurred())
})
@ -54,7 +57,7 @@ var _ = Describe("Logs Repository", func() {
"",
},
}
err := repository.CreateFilter(logFilter)
err := filterRepository.CreateFilter(logFilter)
Expect(err).To(HaveOccurred())
})
@ -72,7 +75,7 @@ var _ = Describe("Logs Repository", func() {
"",
},
}
err := repository.CreateFilter(logFilter1)
err := filterRepository.CreateFilter(logFilter1)
Expect(err).ToNot(HaveOccurred())
logFilter2 := filters.LogFilter{
Name: "TestFilter2",
@ -86,19 +89,19 @@ var _ = Describe("Logs Repository", func() {
"",
},
}
err = repository.CreateFilter(logFilter2)
err = filterRepository.CreateFilter(logFilter2)
Expect(err).ToNot(HaveOccurred())
logFilter1, err = repository.GetFilter("TestFilter1")
logFilter1, err = filterRepository.GetFilter("TestFilter1")
Expect(err).ToNot(HaveOccurred())
Expect(logFilter1).To(Equal(logFilter1))
logFilter1, err = repository.GetFilter("TestFilter1")
logFilter1, err = filterRepository.GetFilter("TestFilter1")
Expect(err).ToNot(HaveOccurred())
Expect(logFilter2).To(Equal(logFilter2))
})
It("returns ErrFilterDoesNotExist error when log does not exist", func() {
_, err := repository.GetFilter("TestFilter1")
_, err := filterRepository.GetFilter("TestFilter1")
Expect(err).To(Equal(repositories.ErrFilterDoesNotExist("TestFilter1")))
})
})

View File

@ -8,9 +8,13 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
)
func (db DB) CreateLogs(logs []core.Log) error {
tx, _ := db.DB.BeginTx(context.Background(), nil)
for _, tlog := range logs {
type LogRepository struct {
*DB
}
func (logRepository LogRepository) CreateLogs(lgs []core.Log) error {
tx, _ := logRepository.DB.BeginTx(context.Background(), nil)
for _, tlog := range lgs {
_, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
@ -26,8 +30,8 @@ func (db DB) CreateLogs(logs []core.Log) error {
return nil
}
func (db DB) GetLogs(address string, blockNumber int64) []core.Log {
logRows, _ := db.DB.Query(
func (logRepository LogRepository) GetLogs(address string, blockNumber int64) []core.Log {
logRows, _ := logRepository.DB.Query(
`SELECT block_number,
address,
tx_hash,
@ -40,11 +44,11 @@ func (db DB) GetLogs(address string, blockNumber int64) []core.Log {
FROM logs
WHERE address = $1 AND block_number = $2
ORDER BY block_number DESC`, address, blockNumber)
return db.loadLogs(logRows)
return logRepository.loadLogs(logRows)
}
func (db DB) loadLogs(logsRows *sql.Rows) []core.Log {
var logs []core.Log
func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) []core.Log {
var lgs []core.Log
for logsRows.Next() {
var blockNumber int64
var address string
@ -53,7 +57,7 @@ func (db DB) loadLogs(logsRows *sql.Rows) []core.Log {
var data string
var topics core.Topics
logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data)
log := core.Log{
lg := core.Log{
BlockNumber: blockNumber,
TxHash: txHash,
Address: address,
@ -61,9 +65,9 @@ func (db DB) loadLogs(logsRows *sql.Rows) []core.Log {
Data: data,
}
for i, topic := range topics {
log.Topics[i] = topic
lg.Topics[i] = topic
}
logs = append(logs, log)
lgs = append(lgs, lg)
}
return logs
return lgs
}

View File

@ -11,7 +11,8 @@ import (
)
var _ = Describe("Logs Repository", func() {
var repository repositories.LogsRepository
var db *postgres.DB
var logsRepository repositories.LogRepository
var node core.Node
BeforeEach(func() {
node = core.Node{
@ -20,12 +21,13 @@ var _ = Describe("Logs Repository", func() {
Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
}
repository = postgres.BuildRepository(node)
db = postgres.NewTestDB(node)
logsRepository = postgres.LogRepository{DB: db}
})
Describe("Saving logs", func() {
It("returns the log when it exists", func() {
repository.CreateLogs([]core.Log{{
logsRepository.CreateLogs([]core.Log{{
BlockNumber: 1,
Index: 0,
Address: "x123",
@ -35,7 +37,7 @@ var _ = Describe("Logs Repository", func() {
}},
)
log := repository.GetLogs("x123", 1)
log := logsRepository.GetLogs("x123", 1)
Expect(log).NotTo(BeNil())
Expect(log[0].BlockNumber).To(Equal(int64(1)))
@ -49,12 +51,12 @@ var _ = Describe("Logs Repository", func() {
})
It("returns nil if log does not exist", func() {
log := repository.GetLogs("x123", 1)
log := logsRepository.GetLogs("x123", 1)
Expect(log).To(BeNil())
})
It("filters to the correct block number and address", func() {
repository.CreateLogs([]core.Log{{
logsRepository.CreateLogs([]core.Log{{
BlockNumber: 1,
Index: 0,
Address: "x123",
@ -63,7 +65,7 @@ var _ = Describe("Logs Repository", func() {
Data: "xabc",
}},
)
repository.CreateLogs([]core.Log{{
logsRepository.CreateLogs([]core.Log{{
BlockNumber: 1,
Index: 1,
Address: "x123",
@ -72,7 +74,7 @@ var _ = Describe("Logs Repository", func() {
Data: "xdef",
}},
)
repository.CreateLogs([]core.Log{{
logsRepository.CreateLogs([]core.Log{{
BlockNumber: 2,
Index: 0,
Address: "x123",
@ -82,7 +84,7 @@ var _ = Describe("Logs Repository", func() {
}},
)
log := repository.GetLogs("x123", 1)
log := logsRepository.GetLogs("x123", 1)
type logIndex struct {
blockNumber int64
@ -114,7 +116,8 @@ var _ = Describe("Logs Repository", func() {
It("saves the logs attached to a receipt", func() {
var blockRepository repositories.BlockRepository
blockRepository = postgres.BuildRepository(node)
blockRepository = postgres.BlockRepository{DB: db}
logs := []core.Log{{
Address: "0x8a4774fe82c63484afef97ca8d89a6ea5e21f973",
BlockNumber: 4745407,
@ -168,7 +171,7 @@ var _ = Describe("Logs Repository", func() {
block := core.Block{Transactions: []core.Transaction{transaction}}
err := blockRepository.CreateOrUpdateBlock(block)
Expect(err).To(Not(HaveOccurred()))
retrievedLogs := repository.GetLogs("0x99041f808d598b782d5a3e498681c2452a31da08", 4745407)
retrievedLogs := logsRepository.GetLogs("0x99041f808d598b782d5a3e498681c2452a31da08", 4745407)
expected := logs[1:]
Expect(retrievedLogs).To(Equal(expected))

View File

@ -1,27 +0,0 @@
package postgres
import "github.com/vulcanize/vulcanizedb/pkg/core"
type NodeRepository interface {
CreateNode(node *core.Node) error
}
func (db *DB) CreateNode(node *core.Node) error {
var nodeId int64
err := db.DB.QueryRow(
`INSERT INTO nodes (genesis_block, network_id, node_id, client_name)
VALUES ($1, $2, $3, $4)
ON CONFLICT (genesis_block, network_id, node_id)
DO UPDATE
SET genesis_block = $1,
network_id = $2,
node_id = $3,
client_name = $4
RETURNING id`,
node.GenesisBlock, node.NetworkId, node.Id, node.ClientName).Scan(&nodeId)
if err != nil {
return ErrUnableToSetNode
}
db.nodeId = nodeId
return nil
}

View File

@ -35,3 +35,23 @@ func NewDB(databaseConfig config.Database, node core.Node) (*DB, error) {
}
return &pg, nil
}
func (db *DB) CreateNode(node *core.Node) error {
var nodeId int64
err := db.QueryRow(
`INSERT INTO nodes (genesis_block, network_id, node_id, client_name)
VALUES ($1, $2, $3, $4)
ON CONFLICT (genesis_block, network_id, node_id)
DO UPDATE
SET genesis_block = $1,
network_id = $2,
node_id = $3,
client_name = $4
RETURNING id`,
node.GenesisBlock, node.NetworkId, node.Id, node.ClientName).Scan(&nodeId)
if err != nil {
return ErrUnableToSetNode
}
db.nodeId = nodeId
return nil
}

View File

@ -22,8 +22,8 @@ func init() {
log.SetOutput(ioutil.Discard)
}
var _ = Describe("Postgres repository", func() {
var repository *postgres.DB
var _ = Describe("Postgres DB", func() {
var db *postgres.DB
It("connects to the database", func() {
cfg, _ := config.NewConfig("private")
@ -40,9 +40,7 @@ var _ = Describe("Postgres repository", func() {
Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
}
cfg, _ := config.NewConfig("private")
repository, _ = postgres.NewDB(cfg.Database, node)
postgres.ClearData(repository)
db = postgres.NewTestDB(node)
})
It("serializes big.Int to db", func() {
@ -91,10 +89,11 @@ var _ = Describe("Postgres repository", func() {
}
cfg, _ := config.NewConfig("private")
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1, Id: "x123", ClientName: "geth"}
repository, _ := postgres.NewDB(cfg.Database, node)
db, _ := postgres.NewDB(cfg.Database, node)
blocksRepository := postgres.BlockRepository{DB: db}
err1 := repository.CreateOrUpdateBlock(badBlock)
savedBlock, err2 := repository.GetBlock(123)
err1 := blocksRepository.CreateOrUpdateBlock(badBlock)
savedBlock, err2 := blocksRepository.GetBlock(123)
Expect(err1).To(HaveOccurred())
Expect(err2).To(HaveOccurred())
@ -126,10 +125,11 @@ var _ = Describe("Postgres repository", func() {
}
cfg, _ := config.NewConfig("private")
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1, Id: "x123", ClientName: "geth"}
repository, _ := postgres.NewDB(cfg.Database, node)
db, _ := postgres.NewDB(cfg.Database, node)
logRepository := postgres.LogRepository{DB: db}
err := repository.CreateLogs([]core.Log{badLog})
savedBlock := repository.GetLogs("x123", 1)
err := logRepository.CreateLogs([]core.Log{badLog})
savedBlock := logRepository.GetLogs("x123", 1)
Expect(err).ToNot(BeNil())
Expect(savedBlock).To(BeNil())
@ -145,10 +145,11 @@ var _ = Describe("Postgres repository", func() {
}
cfg, _ := config.NewConfig("private")
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1, Id: "x123", ClientName: "geth"}
repository, _ := postgres.NewDB(cfg.Database, node)
db, _ := postgres.NewDB(cfg.Database, node)
blockRepository := postgres.BlockRepository{DB: db}
err1 := repository.CreateOrUpdateBlock(block)
savedBlock, err2 := repository.GetBlock(123)
err1 := blockRepository.CreateOrUpdateBlock(block)
savedBlock, err2 := blockRepository.GetBlock(123)
Expect(err1).To(HaveOccurred())
Expect(err2).To(HaveOccurred())

View File

@ -7,8 +7,12 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/repositories"
)
func (db DB) GetReceipt(txHash string) (core.Receipt, error) {
row := db.DB.QueryRow(
type ReceiptRepository struct {
*DB
}
func (receiptRepository ReceiptRepository) GetReceipt(txHash string) (core.Receipt, error) {
row := receiptRepository.DB.QueryRow(
`SELECT contract_address,
tx_hash,
cumulative_gas_used,

View File

@ -8,8 +8,9 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/repositories/postgres"
)
var _ = Describe("Logs Repository", func() {
var repository repositories.ReceiptRepository
var _ bool = Describe("Logs Repository", func() {
var receiptRepository repositories.ReceiptRepository
var db *postgres.DB
var node core.Node
BeforeEach(func() {
node = core.Node{
@ -18,13 +19,15 @@ var _ = Describe("Logs Repository", func() {
Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
}
repository = postgres.BuildRepository(node)
db = postgres.NewTestDB(node)
receiptRepository = postgres.ReceiptRepository{DB: db}
})
Describe("Saving receipts", func() {
It("returns the receipt when it exists", func() {
var blockRepository repositories.BlockRepository
blockRepository = postgres.BuildRepository(node)
db := postgres.NewTestDB(node)
blockRepository = postgres.BlockRepository{DB: db}
expected := core.Receipt{
ContractAddress: "0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae",
CumulativeGasUsed: 7996119,
@ -42,7 +45,7 @@ var _ = Describe("Logs Repository", func() {
block := core.Block{Transactions: []core.Transaction{transaction}}
blockRepository.CreateOrUpdateBlock(block)
receipt, err := repository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223")
receipt, err := receiptRepository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223")
Expect(err).ToNot(HaveOccurred())
//Not currently serializing bloom logs
@ -55,14 +58,15 @@ var _ = Describe("Logs Repository", func() {
})
It("returns ErrReceiptDoesNotExist when receipt does not exist", func() {
receipt, err := repository.GetReceipt("DOES NOT EXIST")
receipt, err := receiptRepository.GetReceipt("DOES NOT EXIST")
Expect(err).To(HaveOccurred())
Expect(receipt).To(BeZero())
})
It("still saves receipts without logs", func() {
var blockRepository repositories.BlockRepository
blockRepository = postgres.BuildRepository(node)
db := postgres.NewTestDB(node)
blockRepository = postgres.BlockRepository{DB: db}
receipt := core.Receipt{
TxHash: "0x002c4799161d809b23f67884eb6598c9df5894929fe1a9ead97ca175d360f547",
}
@ -76,7 +80,7 @@ var _ = Describe("Logs Repository", func() {
}
blockRepository.CreateOrUpdateBlock(block)
_, err := repository.GetReceipt(receipt.TxHash)
_, err := receiptRepository.GetReceipt(receipt.TxHash)
Expect(err).To(Not(HaveOccurred()))
})

View File

@ -4,8 +4,12 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
)
func (db DB) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) {
rows, err := db.DB.Queryx(`SELECT name, block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data FROM watched_event_logs where name=$1`, name)
type WatchedEventRepository struct {
*DB
}
func (watchedEventRepository WatchedEventRepository) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) {
rows, err := watchedEventRepository.DB.Queryx(`SELECT name, block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data FROM watched_event_logs where name=$1`, name)
if err != nil {
return nil, err
}

View File

@ -1,30 +1,24 @@
package postgres_test
import (
"log"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters"
"github.com/vulcanize/vulcanizedb/pkg/repositories/postgres"
)
var _ = Describe("Watched Events Repository", func() {
var repository *postgres.DB
var db *postgres.DB
var logRepository postgres.LogRepository
var filterRepository postgres.FilterRepository
var watchedEventRepository postgres.WatchedEventRepository
BeforeEach(func() {
cfg, err := config.NewConfig("private")
if err != nil {
log.Fatal(err)
}
repository, err = postgres.NewDB(cfg.Database, core.Node{})
if err != nil {
log.Fatal(err)
}
postgres.ClearData(repository)
db = postgres.NewTestDB(core.Node{})
logRepository = postgres.LogRepository{DB: db}
filterRepository = postgres.FilterRepository{DB: db}
watchedEventRepository = postgres.WatchedEventRepository{DB: db}
})
It("retrieves watched event logs that match the event filter", func() {
@ -57,11 +51,11 @@ var _ = Describe("Watched Events Repository", func() {
Data: "",
},
}
err := repository.CreateFilter(filter)
err := filterRepository.CreateFilter(filter)
Expect(err).ToNot(HaveOccurred())
err = repository.CreateLogs(logs)
err = logRepository.CreateLogs(logs)
Expect(err).ToNot(HaveOccurred())
matchingLogs, err := repository.GetWatchedEvents("Filter1")
matchingLogs, err := watchedEventRepository.GetWatchedEvents("Filter1")
Expect(err).ToNot(HaveOccurred())
Expect(matchingLogs).To(Equal(expectedWatchedEventLog))
@ -103,11 +97,11 @@ var _ = Describe("Watched Events Repository", func() {
Index: 0,
Data: "",
}}
err := repository.CreateFilter(filter)
err := filterRepository.CreateFilter(filter)
Expect(err).ToNot(HaveOccurred())
err = repository.CreateLogs(logs)
err = logRepository.CreateLogs(logs)
Expect(err).ToNot(HaveOccurred())
matchingLogs, err := repository.GetWatchedEvents("Filter1")
matchingLogs, err := watchedEventRepository.GetWatchedEvents("Filter1")
Expect(err).ToNot(HaveOccurred())
Expect(matchingLogs).To(Equal(expectedWatchedEventLog))

View File

@ -8,15 +8,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/filters"
)
type Repository interface {
BlockRepository
ContractRepository
LogsRepository
ReceiptRepository
FilterRepository
WatchedEventsRepository
}
var ErrBlockDoesNotExist = func(blockNumber int64) error {
return errors.New(fmt.Sprintf("Block number %d does not exist", blockNumber))
}
@ -47,7 +38,7 @@ type FilterRepository interface {
GetFilter(name string) (filters.LogFilter, error)
}
type LogsRepository interface {
type LogRepository interface {
CreateLogs(logs []core.Log) error
GetLogs(address string, blockNumber int64) []core.Log
}
@ -60,6 +51,6 @@ type ReceiptRepository interface {
GetReceipt(txHash string) (core.Receipt, error)
}
type WatchedEventsRepository interface {
type WatchedEventRepository interface {
GetWatchedEvents(name string) ([]*core.WatchedEvent, error)
}

View File

@ -24,11 +24,11 @@ func LoadConfig(environment string) config.Config {
}
func LoadPostgres(database config.Database, node core.Node) postgres.DB {
repository, err := postgres.NewDB(database, node)
db, err := postgres.NewDB(database, node)
if err != nil {
log.Fatalf("Error loading postgres\n%v", err)
}
return *repository
return *db
}
func ReadAbiFile(abiFilepath string) string {