From ed907535e37f4d7b4f17bcc9c01f1ee136e30147 Mon Sep 17 00:00:00 2001 From: Matt K <1036969+mkrump@users.noreply.github.com> Date: Mon, 12 Feb 2018 10:54:05 -0600 Subject: [PATCH] Separate DB access into several repos (#28) * Separate files for InMemory * Start using separate repos for collaborating objects * Before Updating schema * Separate various repos --- cmd/addFilter.go | 6 +- cmd/graphql.go | 15 ++- cmd/sync.go | 14 +- filters/example-filter.json | 32 +++-- integration_test/geth_blockchain_test.go | 9 +- pkg/contract_summary/summary.go | 4 +- pkg/contract_summary/summary_test.go | 72 +++++----- pkg/graphql_server/schema.go | 19 ++- pkg/graphql_server/schema_test.go | 56 ++++---- pkg/history/populate_blocks.go | 10 +- pkg/history/populate_blocks_test.go | 60 +++++---- pkg/history/validate_blocks.go | 10 +- pkg/history/validate_blocks_test.go | 15 ++- pkg/repositories/inmemory/block_repository.go | 51 +++++++ .../inmemory/contract_repository.go | 35 +++++ pkg/repositories/inmemory/in_memory.go | 126 ------------------ pkg/repositories/postgres/block_repository.go | 72 +++++----- .../postgres/block_repository_test.go | 93 +++++++------ .../postgres/contract_repository.go | 25 ++-- .../postgres/contract_repository_test.go | 40 +++--- pkg/repositories/postgres/helpers.go | 22 +-- .../postgres/log_filter_repository.go | 12 +- .../postgres/log_filter_repository_test.go | 25 ++-- pkg/repositories/postgres/logs.go | 28 ++-- pkg/repositories/postgres/logs_test.go | 25 ++-- pkg/repositories/postgres/node_repository.go | 27 ---- pkg/repositories/postgres/postgres.go | 20 +++ pkg/repositories/postgres/postgres_test.go | 29 ++-- .../postgres/receipt_repository.go | 8 +- .../postgres/receipts_repository_test.go | 20 +-- pkg/repositories/postgres/watched_events.go | 8 +- .../postgres/watched_events_test.go | 34 ++--- pkg/repositories/repository.go | 13 +- utils/utils.go | 4 +- 34 files changed, 538 insertions(+), 501 deletions(-) create mode 100644 pkg/repositories/inmemory/block_repository.go create mode 100644 pkg/repositories/inmemory/contract_repository.go delete mode 100644 pkg/repositories/postgres/node_repository.go diff --git a/cmd/addFilter.go b/cmd/addFilter.go index a6abf908..f4b6e367 100644 --- a/cmd/addFilter.go +++ b/cmd/addFilter.go @@ -8,6 +8,7 @@ import ( "github.com/spf13/cobra" "github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/geth" + "github.com/vulcanize/vulcanizedb/pkg/repositories/postgres" "github.com/vulcanize/vulcanizedb/utils" ) @@ -56,7 +57,8 @@ func addFilter() { } var logFilters filters.LogFilters blockchain := geth.NewBlockchain(ipc) - repository := utils.LoadPostgres(databaseConfig, blockchain.Node()) + db := utils.LoadPostgres(databaseConfig, blockchain.Node()) + filterRepository := postgres.FilterRepository{DB: &db} absFilePath := utils.AbsFilePath(filterFilepath) logFilterBytes, err := ioutil.ReadFile(absFilePath) if err != nil { @@ -67,7 +69,7 @@ func addFilter() { log.Fatal(err) } for _, filter := range logFilters { - err = repository.CreateFilter(filter) + err = filterRepository.CreateFilter(filter) if err != nil { log.Fatal(err) } diff --git a/cmd/graphql.go b/cmd/graphql.go index fc2697f3..20213127 100644 --- a/cmd/graphql.go +++ b/cmd/graphql.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cobra" "github.com/vulcanize/vulcanizedb/pkg/geth" "github.com/vulcanize/vulcanizedb/pkg/graphql_server" + "github.com/vulcanize/vulcanizedb/pkg/repositories/postgres" "github.com/vulcanize/vulcanizedb/utils" ) @@ -43,8 +44,18 @@ func init() { func parseSchema() *graphql.Schema { blockchain := geth.NewBlockchain(ipc) - repository := utils.LoadPostgres(databaseConfig, blockchain.Node()) - schema := graphql.MustParseSchema(graphql_server.Schema, graphql_server.NewResolver(repository)) + db := utils.LoadPostgres(databaseConfig, blockchain.Node()) + blockRepository := &postgres.BlockRepository{DB: &db} + logRepository := &postgres.LogRepository{DB: &db} + filterRepository := &postgres.FilterRepository{DB: &db} + watchedEventRepository := &postgres.WatchedEventRepository{DB: &db} + graphQLRepositories := graphql_server.GraphQLRepositories{ + WatchedEventRepository: watchedEventRepository, + BlockRepository: blockRepository, + LogRepository: logRepository, + FilterRepository: filterRepository, + } + schema := graphql.MustParseSchema(graphql_server.Schema, graphql_server.NewResolver(graphQLRepositories)) return schema } diff --git a/cmd/sync.go b/cmd/sync.go index 3da13f5d..84742f86 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -11,6 +11,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/geth" "github.com/vulcanize/vulcanizedb/pkg/history" + "github.com/vulcanize/vulcanizedb/pkg/repositories" "github.com/vulcanize/vulcanizedb/pkg/repositories/postgres" "github.com/vulcanize/vulcanizedb/utils" ) @@ -49,9 +50,9 @@ func init() { syncCmd.Flags().IntVarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block number to start syncing from") } -func backFillAllBlocks(blockchain core.Blockchain, repository postgres.DB, missingBlocksPopulated chan int, startingBlockNumber int64) { +func backFillAllBlocks(blockchain core.Blockchain, blockRepository repositories.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { go func() { - missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, repository, startingBlockNumber) + missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber) }() } @@ -63,12 +64,13 @@ func sync() { if blockchain.LastBlock().Int64() == 0 { log.Fatal("geth initial: state sync not finished") } - repository := utils.LoadPostgres(databaseConfig, blockchain.Node()) - validator := history.NewBlockValidator(blockchain, repository, 15) + db := utils.LoadPostgres(databaseConfig, blockchain.Node()) + blockRepository := postgres.BlockRepository{DB: &db} + validator := history.NewBlockValidator(blockchain, blockRepository, 15) missingBlocksPopulated := make(chan int) _startingBlockNumber := int64(startingBlockNumber) - go backFillAllBlocks(blockchain, repository, missingBlocksPopulated, _startingBlockNumber) + go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, _startingBlockNumber) for { select { @@ -76,7 +78,7 @@ func sync() { window := validator.ValidateBlocks() validator.Log(os.Stdout, window) case <-missingBlocksPopulated: - go backFillAllBlocks(blockchain, repository, missingBlocksPopulated, _startingBlockNumber) + go backFillAllBlocks(blockchain, blockRepository, missingBlocksPopulated, _startingBlockNumber) } } } diff --git a/filters/example-filter.json b/filters/example-filter.json index f29e9a95..1ac00c7e 100644 --- a/filters/example-filter.json +++ b/filters/example-filter.json @@ -1,14 +1,20 @@ -[{ -"name": "TransferFilter", -"fromBlock": "0x488290", -"toBlock": "0x488678", -"address": "0x06012c8cf97bead5deae237070f9587f8e7a266d", -"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"] -}, -{ - "name": "NewFilter", - "fromBlock": "0x4B34AD", - "address": "0x06012c8cf97bead5deae237070f9587f8e7a266d", - "topics": ["0x241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"] -}] +[ + { + "name": "TransferFilter", + "fromBlock": "0x488290", + "toBlock": "0x488678", + "address": "0x06012c8cf97bead5deae237070f9587f8e7a266d", + "topics": [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" + ] + }, + { + "name": "NewFilter", + "fromBlock": "0x4B34AD", + "address": "0x06012c8cf97bead5deae237070f9587f8e7a266d", + "topics": [ + "0x241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80" + ] + } +] diff --git a/integration_test/geth_blockchain_test.go b/integration_test/geth_blockchain_test.go index ce238030..515a37a7 100644 --- a/integration_test/geth_blockchain_test.go +++ b/integration_test/geth_blockchain_test.go @@ -19,18 +19,19 @@ func init() { var _ = Describe("Reading from the Geth blockchain", func() { var blockchain *geth.Blockchain - var repository *inmemory.InMemory + var inMemory *inmemory.InMemory BeforeEach(func() { cfg, _ := config.NewConfig("private") blockchain = geth.NewBlockchain(cfg.Client.IPCPath) - repository = inmemory.NewInMemory() + inMemory = inmemory.NewInMemory() }) It("reads two blocks", func(done Done) { - validator := history.NewBlockValidator(blockchain, repository, 2) + blocks := &inmemory.BlockRepository{InMemory: inMemory} + validator := history.NewBlockValidator(blockchain, blocks, 2) validator.ValidateBlocks() - Expect(repository.BlockCount()).To(Equal(2)) + Expect(blocks.BlockCount()).To(Equal(2)) close(done) }, 15) diff --git a/pkg/contract_summary/summary.go b/pkg/contract_summary/summary.go index 58908e24..10488389 100644 --- a/pkg/contract_summary/summary.go +++ b/pkg/contract_summary/summary.go @@ -17,8 +17,8 @@ type ContractSummary struct { blockChain core.Blockchain } -func NewSummary(blockchain core.Blockchain, repository repositories.Repository, contractHash string, blockNumber *big.Int) (ContractSummary, error) { - contract, err := repository.GetContract(contractHash) +func NewSummary(blockchain core.Blockchain, contractRepository repositories.ContractRepository, contractHash string, blockNumber *big.Int) (ContractSummary, error) { + contract, err := contractRepository.GetContract(contractHash) if err != nil { return ContractSummary{}, err } else { diff --git a/pkg/contract_summary/summary_test.go b/pkg/contract_summary/summary_test.go index 9b3cca13..06fb9c3f 100644 --- a/pkg/contract_summary/summary_test.go +++ b/pkg/contract_summary/summary_test.go @@ -12,18 +12,26 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/repositories/inmemory" ) -func NewCurrentContractSummary(blockchain core.Blockchain, repository repositories.Repository, contractHash string) (contract_summary.ContractSummary, error) { - return contract_summary.NewSummary(blockchain, repository, contractHash, nil) +func NewCurrentContractSummary(blockchain core.Blockchain, contractRepository repositories.ContractRepository, contractHash string) (contract_summary.ContractSummary, error) { + return contract_summary.NewSummary(blockchain, contractRepository, contractHash, nil) } var _ = Describe("The contract summary", func() { + var inMemoryDB *inmemory.InMemory + var contractRepostiory *inmemory.ContractRepostiory + + BeforeEach(func() { + inMemoryDB = inmemory.NewInMemory() + contractRepostiory = &inmemory.ContractRepostiory{InMemory: inMemoryDB} + + }) + Context("when the given contract does not exist", func() { It("returns an error", func() { - repository := inmemory.NewInMemory() blockchain := fakes.NewBlockchain() - contractSummary, err := NewCurrentContractSummary(blockchain, repository, "0x123") + contractSummary, err := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123") Expect(contractSummary).To(Equal(contract_summary.ContractSummary{})) Expect(err).NotTo(BeNil()) @@ -32,101 +40,101 @@ var _ = Describe("The contract summary", func() { Context("when the given contract exists", func() { It("returns the summary", func() { - repository := inmemory.NewInMemory() contract := core.Contract{Hash: "0x123"} - repository.CreateContract(contract) + contractRepostiory.CreateContract(contract) blockchain := fakes.NewBlockchain() - contractSummary, err := NewCurrentContractSummary(blockchain, repository, "0x123") + contractSummary, err := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123") Expect(contractSummary).NotTo(Equal(contract_summary.ContractSummary{})) Expect(err).To(BeNil()) }) It("includes the contract hash in the summary", func() { - repository := inmemory.NewInMemory() contract := core.Contract{Hash: "0x123"} - repository.CreateContract(contract) + contractRepostiory.CreateContract(contract) blockchain := fakes.NewBlockchain() - contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123") + contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123") Expect(contractSummary.ContractHash).To(Equal("0x123")) }) It("sets the number of transactions", func() { - repository := inmemory.NewInMemory() + blocks := &inmemory.BlockRepository{InMemory: inMemoryDB} + blockchain := fakes.NewBlockchain() + contract := core.Contract{Hash: "0x123"} - repository.CreateContract(contract) + contractRepostiory.CreateContract(contract) block := core.Block{ Transactions: []core.Transaction{ {To: "0x123"}, {To: "0x123"}, }, } - repository.CreateOrUpdateBlock(block) - blockchain := fakes.NewBlockchain() + blocks.CreateOrUpdateBlock(block) - contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123") + contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123") Expect(contractSummary.NumberOfTransactions).To(Equal(2)) }) It("sets the last transaction", func() { - repository := inmemory.NewInMemory() + blocks := &inmemory.BlockRepository{InMemory: inMemoryDB} + blockchain := fakes.NewBlockchain() + contract := core.Contract{Hash: "0x123"} - repository.CreateContract(contract) + contractRepostiory.CreateContract(contract) block := core.Block{ Transactions: []core.Transaction{ {Hash: "TRANSACTION2", To: "0x123"}, {Hash: "TRANSACTION1", To: "0x123"}, }, } - repository.CreateOrUpdateBlock(block) - blockchain := fakes.NewBlockchain() + blocks.CreateOrUpdateBlock(block) - contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123") + contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123") Expect(contractSummary.LastTransaction.Hash).To(Equal("TRANSACTION2")) }) It("gets contract state attribute for the contract from the blockchain", func() { - repository := inmemory.NewInMemory() - contract := core.Contract{Hash: "0x123"} - repository.CreateContract(contract) blockchain := fakes.NewBlockchain() + + contract := core.Contract{Hash: "0x123"} + contractRepostiory.CreateContract(contract) blockchain.SetContractStateAttribute("0x123", nil, "foo", "bar") - contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123") + contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123") attribute := contractSummary.GetStateAttribute("foo") Expect(attribute).To(Equal("bar")) }) It("gets contract state attribute for the contract from the blockchain at specific block height", func() { - repository := inmemory.NewInMemory() - contract := core.Contract{Hash: "0x123"} - repository.CreateContract(contract) blockchain := fakes.NewBlockchain() + + contract := core.Contract{Hash: "0x123"} + contractRepostiory.CreateContract(contract) blockNumber := big.NewInt(1000) blockchain.SetContractStateAttribute("0x123", nil, "foo", "bar") blockchain.SetContractStateAttribute("0x123", blockNumber, "foo", "baz") - contractSummary, _ := contract_summary.NewSummary(blockchain, repository, "0x123", blockNumber) + contractSummary, _ := contract_summary.NewSummary(blockchain, contractRepostiory, "0x123", blockNumber) attribute := contractSummary.GetStateAttribute("foo") Expect(attribute).To(Equal("baz")) }) It("gets attributes for the contract from the blockchain", func() { - repository := inmemory.NewInMemory() - contract := core.Contract{Hash: "0x123"} - repository.CreateContract(contract) blockchain := fakes.NewBlockchain() + + contract := core.Contract{Hash: "0x123"} + contractRepostiory.CreateContract(contract) blockchain.SetContractStateAttribute("0x123", nil, "foo", "bar") blockchain.SetContractStateAttribute("0x123", nil, "baz", "bar") - contractSummary, _ := NewCurrentContractSummary(blockchain, repository, "0x123") + contractSummary, _ := NewCurrentContractSummary(blockchain, contractRepostiory, "0x123") Expect(contractSummary.Attributes).To(Equal( core.ContractAttributes{ diff --git a/pkg/graphql_server/schema.go b/pkg/graphql_server/schema.go index 3bd9f9fe..34e51e60 100644 --- a/pkg/graphql_server/schema.go +++ b/pkg/graphql_server/schema.go @@ -41,18 +41,25 @@ var Schema = ` } ` -type Resolver struct { - repository repositories.Repository +type GraphQLRepositories struct { + repositories.BlockRepository + repositories.LogRepository + repositories.WatchedEventRepository + repositories.FilterRepository } -func NewResolver(repository repositories.Repository) *Resolver { - return &Resolver{repository: repository} +type Resolver struct { + graphQLRepositories GraphQLRepositories +} + +func NewResolver(repositories GraphQLRepositories) *Resolver { + return &Resolver{graphQLRepositories: repositories} } func (r *Resolver) LogFilter(args struct { Name string }) (*logFilterResolver, error) { - logFilter, err := r.repository.GetFilter(args.Name) + logFilter, err := r.graphQLRepositories.GetFilter(args.Name) if err != nil { return &logFilterResolver{}, err } @@ -94,7 +101,7 @@ func (lfr *logFilterResolver) Topics() []*string { func (r *Resolver) WatchedEvents(args struct { Name string }) (*watchedEventsResolver, error) { - watchedEvents, err := r.repository.GetWatchedEvents(args.Name) + watchedEvents, err := r.graphQLRepositories.GetWatchedEvents(args.Name) if err != nil { return &watchedEventsResolver{}, err } diff --git a/pkg/graphql_server/schema_test.go b/pkg/graphql_server/schema_test.go index dff3eb29..c60978d6 100644 --- a/pkg/graphql_server/schema_test.go +++ b/pkg/graphql_server/schema_test.go @@ -14,7 +14,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/graphql_server" - "github.com/vulcanize/vulcanizedb/pkg/repositories" "github.com/vulcanize/vulcanizedb/pkg/repositories/postgres" ) @@ -32,27 +31,38 @@ func formatJSON(data []byte) []byte { var _ = Describe("GraphQL", func() { var cfg config.Config - var repository repositories.Repository + var graphQLRepositories graphql_server.GraphQLRepositories BeforeEach(func() { cfg, _ = config.NewConfig("private") node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1, Id: "x123", ClientName: "geth"} - repository = postgres.BuildRepository(node) - e := repository.CreateFilter(filters.LogFilter{ + db := postgres.NewTestDB(node) + blockRepository := &postgres.BlockRepository{DB: db} + logRepository := &postgres.LogRepository{DB: db} + filterRepository := &postgres.FilterRepository{DB: db} + watchedEventRepository := &postgres.WatchedEventRepository{DB: db} + graphQLRepositories = graphql_server.GraphQLRepositories{ + WatchedEventRepository: watchedEventRepository, + BlockRepository: blockRepository, + LogRepository: logRepository, + FilterRepository: filterRepository, + } + + err := graphQLRepositories.CreateFilter(filters.LogFilter{ Name: "TestFilter1", FromBlock: 1, ToBlock: 10, Address: "0x123456789", Topics: core.Topics{0: "topic=1", 2: "topic=2"}, }) - if e != nil { - log.Fatal(e) + if err != nil { + log.Fatal(err) } - f, e := repository.GetFilter("TestFilter1") - if e != nil { - log.Println(f) - log.Fatal(e) + filter, err := graphQLRepositories.GetFilter("TestFilter1") + if err != nil { + log.Println(filter) + log.Fatal(err) } matchingEvent := core.Log{ @@ -71,16 +81,16 @@ var _ = Describe("GraphQL", func() { Index: 0, Data: "0xDATADATADATA", } - e = repository.CreateLogs([]core.Log{matchingEvent, nonMatchingEvent}) - if e != nil { - log.Fatal(e) + err = graphQLRepositories.CreateLogs([]core.Log{matchingEvent, nonMatchingEvent}) + if err != nil { + log.Fatal(err) } }) It("Queries example schema for specific log filter", func() { var variables map[string]interface{} - r := graphql_server.NewResolver(repository) - var schema = graphql.MustParseSchema(graphql_server.Schema, r) + resolver := graphql_server.NewResolver(graphQLRepositories) + var schema = graphql.MustParseSchema(graphql_server.Schema, resolver) response := schema.Exec(context.Background(), `{ logFilter(name: "TestFilter1") { @@ -108,16 +118,16 @@ var _ = Describe("GraphQL", func() { } err := json.Unmarshal(response.Data, &v) Expect(err).ToNot(HaveOccurred()) - a := formatJSON(response.Data) - e := formatJSON([]byte(expected)) - Expect(a).To(Equal(e)) + actualJSON := formatJSON(response.Data) + expectedJSON := formatJSON([]byte(expected)) + Expect(actualJSON).To(Equal(expectedJSON)) }) It("Queries example schema for specific watched event log", func() { var variables map[string]interface{} - r := graphql_server.NewResolver(repository) - var schema = graphql.MustParseSchema(graphql_server.Schema, r) + resolver := graphql_server.NewResolver(graphQLRepositories) + var schema = graphql.MustParseSchema(graphql_server.Schema, resolver) response := schema.Exec(context.Background(), `{ watchedEvents(name: "TestFilter1") { @@ -161,8 +171,8 @@ var _ = Describe("GraphQL", func() { } err := json.Unmarshal(response.Data, &v) Expect(err).ToNot(HaveOccurred()) - a := formatJSON(response.Data) - e := formatJSON([]byte(expected)) - Expect(a).To(Equal(e)) + actualJSON := formatJSON(response.Data) + expectedJSON := formatJSON([]byte(expected)) + Expect(actualJSON).To(Equal(expectedJSON)) }) }) diff --git a/pkg/history/populate_blocks.go b/pkg/history/populate_blocks.go index deb5192a..24c03536 100644 --- a/pkg/history/populate_blocks.go +++ b/pkg/history/populate_blocks.go @@ -7,19 +7,19 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/repositories" ) -func PopulateMissingBlocks(blockchain core.Blockchain, repository repositories.Repository, startingBlockNumber int64) int { +func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository repositories.BlockRepository, startingBlockNumber int64) int { lastBlock := blockchain.LastBlock().Int64() - blockRange := repository.MissingBlockNumbers(startingBlockNumber, lastBlock-1) + blockRange := blockRepository.MissingBlockNumbers(startingBlockNumber, lastBlock-1) log.SetPrefix("") log.Printf("Backfilling %d blocks\n\n", len(blockRange)) - RetrieveAndUpdateBlocks(blockchain, repository, blockRange) + RetrieveAndUpdateBlocks(blockchain, blockRepository, blockRange) return len(blockRange) } -func RetrieveAndUpdateBlocks(blockchain core.Blockchain, repository repositories.Repository, blockNumbers []int64) int { +func RetrieveAndUpdateBlocks(blockchain core.Blockchain, blockRepository repositories.BlockRepository, blockNumbers []int64) int { for _, blockNumber := range blockNumbers { block := blockchain.GetBlockByNumber(blockNumber) - repository.CreateOrUpdateBlock(block) + blockRepository.CreateOrUpdateBlock(block) } return len(blockNumbers) } diff --git a/pkg/history/populate_blocks_test.go b/pkg/history/populate_blocks_test.go index fb4670c2..bb4e3a0d 100644 --- a/pkg/history/populate_blocks_test.go +++ b/pkg/history/populate_blocks_test.go @@ -10,6 +10,13 @@ import ( ) var _ = Describe("Populating blocks", func() { + var inMemory *inmemory.InMemory + var blockRepository *inmemory.BlockRepository + + BeforeEach(func() { + inMemory = inmemory.NewInMemory() + blockRepository = &inmemory.BlockRepository{InMemory: inMemory} + }) It("fills in the only missing block (Number 1)", func() { blocks := []core.Block{ @@ -17,11 +24,11 @@ var _ = Describe("Populating blocks", func() { {Number: 2}, } blockchain := fakes.NewBlockchainWithBlocks(blocks) - repository := inmemory.NewInMemory() - repository.CreateOrUpdateBlock(core.Block{Number: 2}) - blocksAdded := history.PopulateMissingBlocks(blockchain, repository, 1) - _, err := repository.GetBlock(1) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 2}) + + blocksAdded := history.PopulateMissingBlocks(blockchain, blockRepository, 1) + _, err := blockRepository.GetBlock(1) Expect(blocksAdded).To(Equal(1)) Expect(err).ToNot(HaveOccurred()) @@ -40,29 +47,28 @@ var _ = Describe("Populating blocks", func() { {Number: 12}, {Number: 13}, }) - repository := inmemory.NewInMemory() - repository.CreateOrUpdateBlock(core.Block{Number: 1}) - repository.CreateOrUpdateBlock(core.Block{Number: 2}) - repository.CreateOrUpdateBlock(core.Block{Number: 3}) - repository.CreateOrUpdateBlock(core.Block{Number: 6}) - repository.CreateOrUpdateBlock(core.Block{Number: 7}) - repository.CreateOrUpdateBlock(core.Block{Number: 9}) - repository.CreateOrUpdateBlock(core.Block{Number: 11}) - repository.CreateOrUpdateBlock(core.Block{Number: 12}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 1}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 2}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 3}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 7}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 9}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 11}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 12}) - blocksAdded := history.PopulateMissingBlocks(blockchain, repository, 5) + blocksAdded := history.PopulateMissingBlocks(blockchain, blockRepository, 5) Expect(blocksAdded).To(Equal(3)) - Expect(repository.BlockCount()).To(Equal(11)) - _, err := repository.GetBlock(4) + Expect(blockRepository.BlockCount()).To(Equal(11)) + _, err := blockRepository.GetBlock(4) Expect(err).To(HaveOccurred()) - _, err = repository.GetBlock(5) + _, err = blockRepository.GetBlock(5) Expect(err).ToNot(HaveOccurred()) - _, err = repository.GetBlock(8) + _, err = blockRepository.GetBlock(8) Expect(err).ToNot(HaveOccurred()) - _, err = repository.GetBlock(10) + _, err = blockRepository.GetBlock(10) Expect(err).ToNot(HaveOccurred()) - _, err = repository.GetBlock(13) + _, err = blockRepository.GetBlock(13) Expect(err).To(HaveOccurred()) }) @@ -72,11 +78,10 @@ var _ = Describe("Populating blocks", func() { {Number: 5}, {Number: 6}, }) - repository := inmemory.NewInMemory() - repository.CreateOrUpdateBlock(core.Block{Number: 3}) - repository.CreateOrUpdateBlock(core.Block{Number: 6}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 3}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) - numberOfBlocksCreated := history.PopulateMissingBlocks(blockchain, repository, 3) + numberOfBlocksCreated := history.PopulateMissingBlocks(blockchain, blockRepository, 3) Expect(numberOfBlocksCreated).To(Equal(2)) }) @@ -89,11 +94,10 @@ var _ = Describe("Populating blocks", func() { {Number: 4}, {Number: 5}, }) - repository := inmemory.NewInMemory() - history.RetrieveAndUpdateBlocks(blockchain, repository, history.MakeRange(2, 5)) - Expect(repository.BlockCount()).To(Equal(3)) - Expect(repository.CreateOrUpdateBlockCallCount).To(Equal(3)) + history.RetrieveAndUpdateBlocks(blockchain, blockRepository, history.MakeRange(2, 5)) + Expect(blockRepository.BlockCount()).To(Equal(3)) + Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(3)) }) }) diff --git a/pkg/history/validate_blocks.go b/pkg/history/validate_blocks.go index e0f6ebcb..8f3be99f 100644 --- a/pkg/history/validate_blocks.go +++ b/pkg/history/validate_blocks.go @@ -17,15 +17,15 @@ var ParsedWindowTemplate = *template.Must(template.New("window").Parse(WindowTem type BlockValidator struct { blockchain core.Blockchain - repository repositories.Repository + blockRepository repositories.BlockRepository windowSize int parsedLoggingTemplate template.Template } -func NewBlockValidator(blockchain core.Blockchain, repository repositories.Repository, windowSize int) *BlockValidator { +func NewBlockValidator(blockchain core.Blockchain, blockRepository repositories.BlockRepository, windowSize int) *BlockValidator { return &BlockValidator{ blockchain, - repository, + blockRepository, windowSize, ParsedWindowTemplate, } @@ -34,9 +34,9 @@ func NewBlockValidator(blockchain core.Blockchain, repository repositories.Repos func (bv BlockValidator) ValidateBlocks() ValidationWindow { window := MakeValidationWindow(bv.blockchain, bv.windowSize) blockNumbers := MakeRange(window.LowerBound, window.UpperBound) - RetrieveAndUpdateBlocks(bv.blockchain, bv.repository, blockNumbers) + RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers) lastBlock := bv.blockchain.LastBlock().Int64() - bv.repository.SetBlocksStatus(lastBlock) + bv.blockRepository.SetBlocksStatus(lastBlock) return window } diff --git a/pkg/history/validate_blocks_test.go b/pkg/history/validate_blocks_test.go index 2ff781ab..a6194451 100644 --- a/pkg/history/validate_blocks_test.go +++ b/pkg/history/validate_blocks_test.go @@ -47,23 +47,26 @@ var _ = Describe("Blocks validator", func() { {Number: 6}, {Number: 7}, }) - repository := inmemory.NewInMemory() + inMemoryDB := inmemory.NewInMemory() + blocksRepository := &inmemory.BlockRepository{InMemory: inMemoryDB} - validator := history.NewBlockValidator(blockchain, repository, 2) + validator := history.NewBlockValidator(blockchain, blocksRepository, 2) window := validator.ValidateBlocks() Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7})) - Expect(repository.BlockCount()).To(Equal(2)) - Expect(repository.CreateOrUpdateBlockCallCount).To(Equal(2)) + Expect(blocksRepository.BlockCount()).To(Equal(2)) + Expect(blocksRepository.CreateOrUpdateBlockCallCount).To(Equal(2)) }) It("logs window message", func() { + inMemoryDB := inmemory.NewInMemory() + blockRepository := &inmemory.BlockRepository{InMemory: inMemoryDB} + expectedMessage := &bytes.Buffer{} window := history.ValidationWindow{LowerBound: 5, UpperBound: 7} history.ParsedWindowTemplate.Execute(expectedMessage, history.ValidationWindow{LowerBound: 5, UpperBound: 7}) blockchain := fakes.NewBlockchainWithBlocks([]core.Block{}) - repository := inmemory.NewInMemory() - validator := history.NewBlockValidator(blockchain, repository, 2) + validator := history.NewBlockValidator(blockchain, blockRepository, 2) actualMessage := &bytes.Buffer{} validator.Log(actualMessage, window) Expect(actualMessage).To(Equal(expectedMessage)) diff --git a/pkg/repositories/inmemory/block_repository.go b/pkg/repositories/inmemory/block_repository.go new file mode 100644 index 00000000..a9dda09d --- /dev/null +++ b/pkg/repositories/inmemory/block_repository.go @@ -0,0 +1,51 @@ +package inmemory + +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/repositories" +) + +type BlockRepository struct { + *InMemory +} + +func (blockRepository *BlockRepository) CreateOrUpdateBlock(block core.Block) error { + blockRepository.CreateOrUpdateBlockCallCount++ + blockRepository.blocks[block.Number] = block + for _, transaction := range block.Transactions { + blockRepository.receipts[transaction.Hash] = transaction.Receipt + blockRepository.logs[transaction.TxHash] = transaction.Logs + } + return nil +} + +func (blockRepository *BlockRepository) GetBlock(blockNumber int64) (core.Block, error) { + if block, ok := blockRepository.blocks[blockNumber]; ok { + return block, nil + } + return core.Block{}, repositories.ErrBlockDoesNotExist(blockNumber) +} + +func (blockRepository *BlockRepository) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 { + missingNumbers := []int64{} + for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ { + if _, ok := blockRepository.blocks[blockNumber]; !ok { + missingNumbers = append(missingNumbers, blockNumber) + } + } + return missingNumbers +} + +func (blockRepository *BlockRepository) SetBlocksStatus(chainHead int64) { + for key, block := range blockRepository.blocks { + if key < (chainHead - blocksFromHeadBeforeFinal) { + tmp := block + tmp.IsFinal = true + blockRepository.blocks[key] = tmp + } + } +} + +func (blockRepository *BlockRepository) BlockCount() int { + return len(blockRepository.blocks) +} diff --git a/pkg/repositories/inmemory/contract_repository.go b/pkg/repositories/inmemory/contract_repository.go new file mode 100644 index 00000000..21db153b --- /dev/null +++ b/pkg/repositories/inmemory/contract_repository.go @@ -0,0 +1,35 @@ +package inmemory + +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/repositories" +) + +type ContractRepostiory struct { + *InMemory +} + +func (contractRepository *ContractRepostiory) ContractExists(contractHash string) bool { + _, present := contractRepository.contracts[contractHash] + return present +} + +func (contractRepository *ContractRepostiory) GetContract(contractHash string) (core.Contract, error) { + contract, ok := contractRepository.contracts[contractHash] + if !ok { + return core.Contract{}, repositories.ErrContractDoesNotExist(contractHash) + } + for _, block := range contractRepository.blocks { + for _, transaction := range block.Transactions { + if transaction.To == contractHash { + contract.Transactions = append(contract.Transactions, transaction) + } + } + } + return contract, nil +} + +func (contractRepository *ContractRepostiory) CreateContract(contract core.Contract) error { + contractRepository.contracts[contract.Hash] = contract + return nil +} diff --git a/pkg/repositories/inmemory/in_memory.go b/pkg/repositories/inmemory/in_memory.go index 9b3f5b33..36e272f5 100644 --- a/pkg/repositories/inmemory/in_memory.go +++ b/pkg/repositories/inmemory/in_memory.go @@ -1,13 +1,8 @@ package inmemory import ( - "fmt" - - "errors" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/filters" - "github.com/vulcanize/vulcanizedb/pkg/repositories" ) const ( @@ -23,23 +18,6 @@ type InMemory struct { CreateOrUpdateBlockCallCount int } -func (repository *InMemory) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) { - panic("implement me") -} - -func (repository *InMemory) GetFilter(name string) (filters.LogFilter, error) { - panic("implement me") -} - -func (repository *InMemory) CreateFilter(filter filters.LogFilter) error { - key := filter.Name - if _, ok := repository.logFilters[key]; ok || key == "" { - return errors.New("filter name not unique") - } - repository.logFilters[key] = filter - return nil -} - func NewInMemory() *InMemory { return &InMemory{ CreateOrUpdateBlockCallCount: 0, @@ -50,107 +28,3 @@ func NewInMemory() *InMemory { logFilters: make(map[string]filters.LogFilter), } } - -func (repository *InMemory) GetReceipt(txHash string) (core.Receipt, error) { - if receipt, ok := repository.receipts[txHash]; ok { - return receipt, nil - } - return core.Receipt{}, repositories.ErrReceiptDoesNotExist(txHash) -} - -func (repository *InMemory) SetBlocksStatus(chainHead int64) { - for key, block := range repository.blocks { - if key < (chainHead - blocksFromHeadBeforeFinal) { - tmp := block - tmp.IsFinal = true - repository.blocks[key] = tmp - } - } -} - -func (repository *InMemory) CreateLogs(logs []core.Log) error { - for _, log := range logs { - key := fmt.Sprintf("%d%d", log.BlockNumber, log.Index) - var logs []core.Log - repository.logs[key] = append(logs, log) - } - return nil -} - -func (repository *InMemory) GetLogs(address string, blockNumber int64) []core.Log { - var matchingLogs []core.Log - for _, logs := range repository.logs { - for _, log := range logs { - if log.Address == address && log.BlockNumber == blockNumber { - matchingLogs = append(matchingLogs, log) - } - } - } - return matchingLogs -} - -func (repository *InMemory) CreateContract(contract core.Contract) error { - repository.contracts[contract.Hash] = contract - return nil -} - -func (repository *InMemory) ContractExists(contractHash string) bool { - _, present := repository.contracts[contractHash] - return present -} - -func (repository *InMemory) GetContract(contractHash string) (core.Contract, error) { - contract, ok := repository.contracts[contractHash] - if !ok { - return core.Contract{}, repositories.ErrContractDoesNotExist(contractHash) - } - for _, block := range repository.blocks { - for _, transaction := range block.Transactions { - if transaction.To == contractHash { - contract.Transactions = append(contract.Transactions, transaction) - } - } - } - return contract, nil -} - -func (repository *InMemory) MissingBlockNumbers(startingBlockNumber int64, endingBlockNumber int64) []int64 { - missingNumbers := []int64{} - for blockNumber := int64(startingBlockNumber); blockNumber <= endingBlockNumber; blockNumber++ { - if _, ok := repository.blocks[blockNumber]; !ok { - missingNumbers = append(missingNumbers, blockNumber) - } - } - return missingNumbers -} - -func (repository *InMemory) CreateOrUpdateBlock(block core.Block) error { - repository.CreateOrUpdateBlockCallCount++ - repository.blocks[block.Number] = block - for _, transaction := range block.Transactions { - repository.receipts[transaction.Hash] = transaction.Receipt - repository.logs[transaction.TxHash] = transaction.Logs - } - return nil -} - -func (repository *InMemory) BlockCount() int { - return len(repository.blocks) -} - -func (repository *InMemory) GetBlock(blockNumber int64) (core.Block, error) { - if block, ok := repository.blocks[blockNumber]; ok { - return block, nil - } - return core.Block{}, repositories.ErrBlockDoesNotExist(blockNumber) -} - -func (repository *InMemory) MaxBlockNumber() int64 { - highestBlockNumber := int64(-1) - for key := range repository.blocks { - if key > highestBlockNumber { - highestBlockNumber = key - } - } - return highestBlockNumber -} diff --git a/pkg/repositories/postgres/block_repository.go b/pkg/repositories/postgres/block_repository.go index d7b7eb13..e8b77465 100644 --- a/pkg/repositories/postgres/block_repository.go +++ b/pkg/repositories/postgres/block_repository.go @@ -16,35 +16,39 @@ const ( blocksFromHeadBeforeFinal = 20 ) -func (db DB) SetBlocksStatus(chainHead int64) { +type BlockRepository struct { + *DB +} + +func (blockRepository BlockRepository) SetBlocksStatus(chainHead int64) { cutoff := chainHead - blocksFromHeadBeforeFinal - db.DB.Exec(` + blockRepository.DB.Exec(` UPDATE blocks SET is_final = TRUE WHERE is_final = FALSE AND number < $1`, cutoff) } -func (db DB) CreateOrUpdateBlock(block core.Block) error { +func (blockRepository BlockRepository) CreateOrUpdateBlock(block core.Block) error { var err error - retrievedBlockHash, ok := db.getBlockHash(block) + retrievedBlockHash, ok := blockRepository.getBlockHash(block) if !ok { - err = db.insertBlock(block) + err = blockRepository.insertBlock(block) return err } if ok && retrievedBlockHash != block.Hash { - err = db.removeBlock(block.Number) + err = blockRepository.removeBlock(block.Number) if err != nil { return err } - err = db.insertBlock(block) + err = blockRepository.insertBlock(block) return err } return nil } -func (db DB) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 { +func (blockRepository BlockRepository) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 { numbers := make([]int64, 0) - db.DB.Select(&numbers, + blockRepository.DB.Select(&numbers, `SELECT all_block_numbers FROM ( SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series @@ -56,8 +60,8 @@ func (db DB) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber i return numbers } -func (db DB) GetBlock(blockNumber int64) (core.Block, error) { - blockRows := db.DB.QueryRowx( +func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, error) { + blockRows := blockRepository.DB.QueryRowx( `SELECT id, number, gaslimit, @@ -75,8 +79,8 @@ func (db DB) GetBlock(blockNumber int64) (core.Block, error) { reward, uncles_reward FROM blocks - WHERE node_id = $1 AND number = $2`, db.nodeId, blockNumber) - savedBlock, err := db.loadBlock(blockRows) + WHERE node_id = $1 AND number = $2`, blockRepository.nodeId, blockNumber) + savedBlock, err := blockRepository.loadBlock(blockRows) if err != nil { switch err { case sql.ErrNoRows: @@ -88,21 +92,21 @@ func (db DB) GetBlock(blockNumber int64) (core.Block, error) { return savedBlock, nil } -func (db DB) insertBlock(block core.Block) error { +func (blockRepository BlockRepository) insertBlock(block core.Block) error { var blockId int64 - tx, _ := db.DB.BeginTx(context.Background(), nil) + tx, _ := blockRepository.DB.BeginTx(context.Background(), nil) err := tx.QueryRow( `INSERT INTO blocks (node_id, number, gaslimit, gasused, time, difficulty, hash, nonce, parenthash, size, uncle_hash, is_final, miner, extra_data, reward, uncles_reward) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING id `, - db.nodeId, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward). + blockRepository.nodeId, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward). Scan(&blockId) if err != nil { tx.Rollback() return ErrDBInsertFailed } - err = db.createTransactions(tx, blockId, block.Transactions) + err = blockRepository.createTransactions(tx, blockId, block.Transactions) if err != nil { tx.Rollback() return ErrDBInsertFailed @@ -111,9 +115,9 @@ func (db DB) insertBlock(block core.Block) error { return nil } -func (db DB) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error { +func (blockRepository BlockRepository) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error { for _, transaction := range transactions { - err := db.createTransaction(tx, blockId, transaction) + err := blockRepository.createTransaction(tx, blockId, transaction) if err != nil { return err } @@ -131,7 +135,7 @@ func nullStringToZero(s string) string { return s } -func (db DB) createTransaction(tx *sql.Tx, blockId int64, transaction core.Transaction) error { +func (blockRepository BlockRepository) createTransaction(tx *sql.Tx, blockId int64, transaction core.Transaction) error { var transactionId int err := tx.QueryRow( `INSERT INTO transactions @@ -144,12 +148,12 @@ func (db DB) createTransaction(tx *sql.Tx, blockId int64, transaction core.Trans return err } if hasReceipt(transaction) { - receiptId, err := db.createReceipt(tx, transactionId, transaction.Receipt) + receiptId, err := blockRepository.createReceipt(tx, transactionId, transaction.Receipt) if err != nil { return err } if hasLogs(transaction) { - err = db.createLogs(tx, transaction.Receipt.Logs, receiptId) + err = blockRepository.createLogs(tx, transaction.Receipt.Logs, receiptId) if err != nil { return err } @@ -166,7 +170,7 @@ func hasReceipt(transaction core.Transaction) bool { return transaction.Receipt.TxHash != "" } -func (db DB) createReceipt(tx *sql.Tx, transactionId int, receipt core.Receipt) (int, error) { +func (blockRepository BlockRepository) createReceipt(tx *sql.Tx, transactionId int, receipt core.Receipt) (int, error) { //Not currently persisting log bloom filters var receiptId int err := tx.QueryRow( @@ -181,17 +185,17 @@ func (db DB) createReceipt(tx *sql.Tx, transactionId int, receipt core.Receipt) return receiptId, nil } -func (db DB) getBlockHash(block core.Block) (string, bool) { +func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) { var retrievedBlockHash string - db.DB.Get(&retrievedBlockHash, + blockRepository.DB.Get(&retrievedBlockHash, `SELECT hash FROM blocks WHERE number = $1 AND node_id = $2`, - block.Number, db.nodeId) + block.Number, blockRepository.nodeId) return retrievedBlockHash, blockExists(retrievedBlockHash) } -func (db DB) createLogs(tx *sql.Tx, logs []core.Log, receiptId int) error { +func (blockRepository BlockRepository) createLogs(tx *sql.Tx, logs []core.Log, receiptId int) error { for _, tlog := range logs { _, err := tx.Exec( `INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) @@ -210,19 +214,19 @@ func blockExists(retrievedBlockHash string) bool { return retrievedBlockHash != "" } -func (db DB) removeBlock(blockNumber int64) error { - _, err := db.DB.Exec( +func (blockRepository BlockRepository) removeBlock(blockNumber int64) error { + _, err := blockRepository.DB.Exec( `DELETE FROM blocks WHERE number=$1 AND node_id=$2`, - blockNumber, db.nodeId) + blockNumber, blockRepository.nodeId) if err != nil { return ErrDBDeleteFailed } return nil } -func (db DB) loadBlock(blockRows *sqlx.Row) (core.Block, error) { +func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Block, error) { type b struct { ID int core.Block @@ -232,7 +236,7 @@ func (db DB) loadBlock(blockRows *sqlx.Row) (core.Block, error) { if err != nil { return core.Block{}, err } - transactionRows, err := db.DB.Queryx(` + transactionRows, err := blockRepository.DB.Queryx(` SELECT hash, nonce, tx_to, @@ -247,11 +251,11 @@ func (db DB) loadBlock(blockRows *sqlx.Row) (core.Block, error) { if err != nil { return core.Block{}, err } - block.Transactions = db.loadTransactions(transactionRows) + block.Transactions = blockRepository.LoadTransactions(transactionRows) return block.Block, nil } -func (db DB) loadTransactions(transactionRows *sqlx.Rows) []core.Transaction { +func (blockRepository BlockRepository) LoadTransactions(transactionRows *sqlx.Rows) []core.Transaction { var transactions []core.Transaction for transactionRows.Next() { var transaction core.Transaction diff --git a/pkg/repositories/postgres/block_repository_test.go b/pkg/repositories/postgres/block_repository_test.go index 0254586f..01599bfa 100644 --- a/pkg/repositories/postgres/block_repository_test.go +++ b/pkg/repositories/postgres/block_repository_test.go @@ -12,7 +12,8 @@ import ( ) var _ = Describe("Saving blocks", func() { - var repository repositories.BlockRepository + var db *postgres.DB + var blockRepository repositories.BlockRepository BeforeEach(func() { node := core.Node{ GenesisBlock: "GENESIS", @@ -20,21 +21,24 @@ var _ = Describe("Saving blocks", func() { Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } - repository = postgres.BuildRepository(node) + db = postgres.NewTestDB(node) + blockRepository = postgres.BlockRepository{DB: db} + }) It("associates blocks to a node", func() { block := core.Block{ Number: 123, } - repository.CreateOrUpdateBlock(block) + blockRepository.CreateOrUpdateBlock(block) nodeTwo := core.Node{ GenesisBlock: "0x456", NetworkId: 1, Id: "x123456", ClientName: "Geth", } - repositoryTwo := postgres.BuildRepository(nodeTwo) + dbTwo := postgres.NewTestDB(nodeTwo) + repositoryTwo := postgres.BlockRepository{DB: dbTwo} _, err := repositoryTwo.GetBlock(123) Expect(err).To(HaveOccurred()) @@ -72,9 +76,9 @@ var _ = Describe("Saving blocks", func() { UnclesReward: unclesReward, } - repository.CreateOrUpdateBlock(block) + blockRepository.CreateOrUpdateBlock(block) - savedBlock, err := repository.GetBlock(blockNumber) + savedBlock, err := blockRepository.GetBlock(blockNumber) Expect(err).NotTo(HaveOccurred()) Expect(savedBlock.Reward).To(Equal(blockReward)) Expect(savedBlock.Difficulty).To(Equal(difficulty)) @@ -93,7 +97,7 @@ var _ = Describe("Saving blocks", func() { }) It("does not find a block when searching for a number that does not exist", func() { - _, err := repository.GetBlock(111) + _, err := blockRepository.GetBlock(111) Expect(err).To(HaveOccurred()) }) @@ -104,9 +108,9 @@ var _ = Describe("Saving blocks", func() { Transactions: []core.Transaction{{}}, } - repository.CreateOrUpdateBlock(block) + blockRepository.CreateOrUpdateBlock(block) - savedBlock, _ := repository.GetBlock(123) + savedBlock, _ := blockRepository.GetBlock(123) Expect(len(savedBlock.Transactions)).To(Equal(1)) }) @@ -116,9 +120,9 @@ var _ = Describe("Saving blocks", func() { Transactions: []core.Transaction{{}, {}}, } - repository.CreateOrUpdateBlock(block) + blockRepository.CreateOrUpdateBlock(block) - savedBlock, _ := repository.GetBlock(123) + savedBlock, _ := blockRepository.GetBlock(123) Expect(len(savedBlock.Transactions)).To(Equal(2)) }) @@ -135,10 +139,10 @@ var _ = Describe("Saving blocks", func() { Transactions: []core.Transaction{{Hash: "x678"}, {Hash: "x9ab"}}, } - repository.CreateOrUpdateBlock(blockOne) - repository.CreateOrUpdateBlock(blockTwo) + blockRepository.CreateOrUpdateBlock(blockOne) + blockRepository.CreateOrUpdateBlock(blockTwo) - savedBlock, _ := repository.GetBlock(123) + savedBlock, _ := blockRepository.GetBlock(123) Expect(len(savedBlock.Transactions)).To(Equal(2)) Expect(savedBlock.Transactions[0].Hash).To(Equal("x678")) Expect(savedBlock.Transactions[1].Hash).To(Equal("x9ab")) @@ -154,16 +158,17 @@ var _ = Describe("Saving blocks", func() { Number: 123, Transactions: []core.Transaction{{Hash: "x678"}, {Hash: "x9ab"}}, } - repository.CreateOrUpdateBlock(blockOne) + blockRepository.CreateOrUpdateBlock(blockOne) nodeTwo := core.Node{ GenesisBlock: "0x456", NetworkId: 1, } - repositoryTwo := postgres.BuildRepository(nodeTwo) + dbTwo := postgres.NewTestDB(nodeTwo) + repositoryTwo := postgres.BlockRepository{DB: dbTwo} - repository.CreateOrUpdateBlock(blockOne) + blockRepository.CreateOrUpdateBlock(blockOne) repositoryTwo.CreateOrUpdateBlock(blockTwo) - retrievedBlockOne, _ := repository.GetBlock(123) + retrievedBlockOne, _ := blockRepository.GetBlock(123) retrievedBlockTwo, _ := repositoryTwo.GetBlock(123) Expect(retrievedBlockOne.Transactions[0].Hash).To(Equal("x123")) @@ -194,9 +199,9 @@ var _ = Describe("Saving blocks", func() { Transactions: []core.Transaction{transaction}, } - repository.CreateOrUpdateBlock(block) + blockRepository.CreateOrUpdateBlock(block) - savedBlock, _ := repository.GetBlock(123) + savedBlock, _ := blockRepository.GetBlock(123) Expect(len(savedBlock.Transactions)).To(Equal(1)) savedTransaction := savedBlock.Transactions[0] Expect(savedTransaction.Data).To(Equal(transaction.Data)) @@ -211,54 +216,54 @@ var _ = Describe("Saving blocks", func() { Describe("The missing block numbers", func() { It("is empty the starting block number is the highest known block number", func() { - repository.CreateOrUpdateBlock(core.Block{Number: 1}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 1}) - Expect(len(repository.MissingBlockNumbers(1, 1))).To(Equal(0)) + Expect(len(blockRepository.MissingBlockNumbers(1, 1))).To(Equal(0)) }) It("is the only missing block number", func() { - repository.CreateOrUpdateBlock(core.Block{Number: 2}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 2}) - Expect(repository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1})) + Expect(blockRepository.MissingBlockNumbers(1, 2)).To(Equal([]int64{1})) }) It("is both missing block numbers", func() { - repository.CreateOrUpdateBlock(core.Block{Number: 3}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 3}) - Expect(repository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2})) + Expect(blockRepository.MissingBlockNumbers(1, 3)).To(Equal([]int64{1, 2})) }) It("goes back to the starting block number", func() { - repository.CreateOrUpdateBlock(core.Block{Number: 6}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) - Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5})) + Expect(blockRepository.MissingBlockNumbers(4, 6)).To(Equal([]int64{4, 5})) }) It("only includes missing block numbers", func() { - repository.CreateOrUpdateBlock(core.Block{Number: 4}) - repository.CreateOrUpdateBlock(core.Block{Number: 6}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 4}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 6}) - Expect(repository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5})) + Expect(blockRepository.MissingBlockNumbers(4, 6)).To(Equal([]int64{5})) }) It("is a list with multiple gaps", func() { - repository.CreateOrUpdateBlock(core.Block{Number: 4}) - repository.CreateOrUpdateBlock(core.Block{Number: 5}) - repository.CreateOrUpdateBlock(core.Block{Number: 8}) - repository.CreateOrUpdateBlock(core.Block{Number: 10}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 4}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 5}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 8}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 10}) - Expect(repository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9})) + Expect(blockRepository.MissingBlockNumbers(3, 10)).To(Equal([]int64{3, 6, 7, 9})) }) It("returns empty array when lower bound exceeds upper bound", func() { - Expect(repository.MissingBlockNumbers(10000, 1)).To(Equal([]int64{})) + Expect(blockRepository.MissingBlockNumbers(10000, 1)).To(Equal([]int64{})) }) It("only returns requested range even when other gaps exist", func() { - repository.CreateOrUpdateBlock(core.Block{Number: 3}) - repository.CreateOrUpdateBlock(core.Block{Number: 8}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 3}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: 8}) - Expect(repository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5})) + Expect(blockRepository.MissingBlockNumbers(1, 5)).To(Equal([]int64{1, 2, 4, 5})) }) }) @@ -266,15 +271,15 @@ var _ = Describe("Saving blocks", func() { It("sets the status of blocks within n-20 of chain HEAD as final", func() { blockNumberOfChainHead := 25 for i := 0; i < blockNumberOfChainHead; i++ { - repository.CreateOrUpdateBlock(core.Block{Number: int64(i), Hash: strconv.Itoa(i)}) + blockRepository.CreateOrUpdateBlock(core.Block{Number: int64(i), Hash: strconv.Itoa(i)}) } - repository.SetBlocksStatus(int64(blockNumberOfChainHead)) + blockRepository.SetBlocksStatus(int64(blockNumberOfChainHead)) - blockOne, err := repository.GetBlock(1) + blockOne, err := blockRepository.GetBlock(1) Expect(err).ToNot(HaveOccurred()) Expect(blockOne.IsFinal).To(Equal(true)) - blockTwo, err := repository.GetBlock(24) + blockTwo, err := blockRepository.GetBlock(24) Expect(err).ToNot(HaveOccurred()) Expect(blockTwo.IsFinal).To(BeFalse()) }) diff --git a/pkg/repositories/postgres/contract_repository.go b/pkg/repositories/postgres/contract_repository.go index c5069524..64846526 100644 --- a/pkg/repositories/postgres/contract_repository.go +++ b/pkg/repositories/postgres/contract_repository.go @@ -7,13 +7,17 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/repositories" ) -func (db DB) CreateContract(contract core.Contract) error { +type ContractRepository struct { + *DB +} + +func (contractRepository ContractRepository) CreateContract(contract core.Contract) error { abi := contract.Abi var abiToInsert *string if abi != "" { abiToInsert = &abi } - _, err := db.DB.Exec( + _, err := contractRepository.DB.Exec( `INSERT INTO watched_contracts (contract_hash, contract_abi) VALUES ($1, $2) ON CONFLICT (contract_hash) @@ -26,9 +30,9 @@ func (db DB) CreateContract(contract core.Contract) error { return nil } -func (db DB) ContractExists(contractHash string) bool { +func (contractRepository ContractRepository) ContractExists(contractHash string) bool { var exists bool - db.DB.QueryRow( + contractRepository.DB.QueryRow( `SELECT exists( SELECT 1 FROM watched_contracts @@ -36,21 +40,21 @@ func (db DB) ContractExists(contractHash string) bool { return exists } -func (db DB) GetContract(contractHash string) (core.Contract, error) { +func (contractRepository ContractRepository) GetContract(contractHash string) (core.Contract, error) { var hash string var abi string - contract := db.DB.QueryRow( + contract := contractRepository.DB.QueryRow( `SELECT contract_hash, contract_abi FROM watched_contracts WHERE contract_hash=$1`, contractHash) err := contract.Scan(&hash, &abi) if err == sql.ErrNoRows { return core.Contract{}, repositories.ErrContractDoesNotExist(contractHash) } - savedContract := db.addTransactions(core.Contract{Hash: hash, Abi: abi}) + savedContract := contractRepository.addTransactions(core.Contract{Hash: hash, Abi: abi}) return savedContract, nil } -func (db DB) addTransactions(contract core.Contract) core.Contract { - transactionRows, _ := db.DB.Queryx(` +func (contractRepository ContractRepository) addTransactions(contract core.Contract) core.Contract { + transactionRows, _ := contractRepository.DB.Queryx(` SELECT hash, nonce, tx_to, @@ -62,7 +66,8 @@ func (db DB) addTransactions(contract core.Contract) core.Contract { FROM transactions WHERE tx_to = $1 ORDER BY block_id DESC`, contract.Hash) - transactions := db.loadTransactions(transactionRows) + blockRepository := &BlockRepository{contractRepository.DB} + transactions := blockRepository.LoadTransactions(transactionRows) savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi} return savedContract } diff --git a/pkg/repositories/postgres/contract_repository_test.go b/pkg/repositories/postgres/contract_repository_test.go index 66b87f02..892318ac 100644 --- a/pkg/repositories/postgres/contract_repository_test.go +++ b/pkg/repositories/postgres/contract_repository_test.go @@ -1,4 +1,4 @@ -package postgres +package postgres_test import ( "sort" @@ -7,11 +7,14 @@ import ( . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/repositories" + "github.com/vulcanize/vulcanizedb/pkg/repositories/postgres" ) var _ = Describe("Creating contracts", func() { - var repository repositories.ContractRepository + var db *postgres.DB + var contractRepository repositories.ContractRepository var node core.Node + BeforeEach(func() { node = core.Node{ GenesisBlock: "GENESIS", @@ -19,35 +22,36 @@ var _ = Describe("Creating contracts", func() { Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } - repository = BuildRepository(node) + db = postgres.NewTestDB(node) + contractRepository = postgres.ContractRepository{DB: db} }) It("returns the contract when it exists", func() { - repository.CreateContract(core.Contract{Hash: "x123"}) + contractRepository.CreateContract(core.Contract{Hash: "x123"}) - contract, err := repository.GetContract("x123") + contract, err := contractRepository.GetContract("x123") Expect(err).NotTo(HaveOccurred()) Expect(contract.Hash).To(Equal("x123")) - Expect(repository.ContractExists("x123")).To(BeTrue()) - Expect(repository.ContractExists("x456")).To(BeFalse()) + Expect(contractRepository.ContractExists("x123")).To(BeTrue()) + Expect(contractRepository.ContractExists("x456")).To(BeFalse()) }) It("returns err if contract does not exist", func() { - _, err := repository.GetContract("x123") + _, err := contractRepository.GetContract("x123") Expect(err).To(HaveOccurred()) }) It("returns empty array when no transactions 'To' a contract", func() { - repository.CreateContract(core.Contract{Hash: "x123"}) - contract, err := repository.GetContract("x123") + contractRepository.CreateContract(core.Contract{Hash: "x123"}) + contract, err := contractRepository.GetContract("x123") Expect(err).ToNot(HaveOccurred()) Expect(contract.Transactions).To(BeEmpty()) }) It("returns transactions 'To' a contract", func() { var blockRepository repositories.BlockRepository - blockRepository = BuildRepository(node) + blockRepository = postgres.BlockRepository{DB: db} block := core.Block{ Number: 123, Transactions: []core.Transaction{ @@ -58,8 +62,8 @@ var _ = Describe("Creating contracts", func() { } blockRepository.CreateOrUpdateBlock(block) - repository.CreateContract(core.Contract{Hash: "x123"}) - contract, err := repository.GetContract("x123") + contractRepository.CreateContract(core.Contract{Hash: "x123"}) + contract, err := contractRepository.GetContract("x123") Expect(err).ToNot(HaveOccurred()) sort.Slice(contract.Transactions, func(i, j int) bool { return contract.Transactions[i].Hash < contract.Transactions[j].Hash @@ -72,25 +76,25 @@ var _ = Describe("Creating contracts", func() { }) It("stores the ABI of the contract", func() { - repository.CreateContract(core.Contract{ + contractRepository.CreateContract(core.Contract{ Abi: "{\"some\": \"json\"}", Hash: "x123", }) - contract, err := repository.GetContract("x123") + contract, err := contractRepository.GetContract("x123") Expect(err).ToNot(HaveOccurred()) Expect(contract.Abi).To(Equal("{\"some\": \"json\"}")) }) It("updates the ABI of the contract if hash already present", func() { - repository.CreateContract(core.Contract{ + contractRepository.CreateContract(core.Contract{ Abi: "{\"some\": \"json\"}", Hash: "x123", }) - repository.CreateContract(core.Contract{ + contractRepository.CreateContract(core.Contract{ Abi: "{\"some\": \"different json\"}", Hash: "x123", }) - contract, err := repository.GetContract("x123") + contract, err := contractRepository.GetContract("x123") Expect(err).ToNot(HaveOccurred()) Expect(contract.Abi).To(Equal("{\"some\": \"different json\"}")) }) diff --git a/pkg/repositories/postgres/helpers.go b/pkg/repositories/postgres/helpers.go index 88efa07a..9e5040c5 100644 --- a/pkg/repositories/postgres/helpers.go +++ b/pkg/repositories/postgres/helpers.go @@ -5,18 +5,18 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -func ClearData(postgres *DB) { - postgres.DB.MustExec("DELETE FROM watched_contracts") - postgres.DB.MustExec("DELETE FROM transactions") - postgres.DB.MustExec("DELETE FROM blocks") - postgres.DB.MustExec("DELETE FROM logs") - postgres.DB.MustExec("DELETE FROM receipts") - postgres.DB.MustExec("DELETE FROM log_filters") +func (db *DB) clearData() { + db.MustExec("DELETE FROM watched_contracts") + db.MustExec("DELETE FROM transactions") + db.MustExec("DELETE FROM blocks") + db.MustExec("DELETE FROM logs") + db.MustExec("DELETE FROM receipts") + db.MustExec("DELETE FROM log_filters") } -func BuildRepository(node core.Node) *DB { +func NewTestDB(node core.Node) *DB { cfg, _ := config.NewConfig("private") - repository, _ := NewDB(cfg.Database, node) - ClearData(repository) - return repository + db, _ := NewDB(cfg.Database, node) + db.clearData() + return db } diff --git a/pkg/repositories/postgres/log_filter_repository.go b/pkg/repositories/postgres/log_filter_repository.go index c2b6c99f..4ef40af2 100644 --- a/pkg/repositories/postgres/log_filter_repository.go +++ b/pkg/repositories/postgres/log_filter_repository.go @@ -10,8 +10,12 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/repositories" ) -func (db DB) CreateFilter(query filters.LogFilter) error { - _, err := db.DB.Exec( +type FilterRepository struct { + *DB +} + +func (filterRepository FilterRepository) CreateFilter(query filters.LogFilter) error { + _, err := filterRepository.DB.Exec( `INSERT INTO log_filters (name, from_block, to_block, address, topic0, topic1, topic2, topic3) VALUES ($1, NULLIF($2, -1), NULLIF($3, -1), $4, NULLIF($5, ''), NULLIF($6, ''), NULLIF($7, ''), NULLIF($8, ''))`, @@ -22,9 +26,9 @@ func (db DB) CreateFilter(query filters.LogFilter) error { return nil } -func (db DB) GetFilter(name string) (filters.LogFilter, error) { +func (filterRepository FilterRepository) GetFilter(name string) (filters.LogFilter, error) { lf := DBLogFilter{} - err := db.DB.Get(&lf, + err := filterRepository.DB.Get(&lf, `SELECT id, name, diff --git a/pkg/repositories/postgres/log_filter_repository_test.go b/pkg/repositories/postgres/log_filter_repository_test.go index 51692d4f..12aaa67d 100644 --- a/pkg/repositories/postgres/log_filter_repository_test.go +++ b/pkg/repositories/postgres/log_filter_repository_test.go @@ -1,4 +1,4 @@ -package postgres +package postgres_test import ( . "github.com/onsi/ginkgo" @@ -6,10 +6,12 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/repositories" + "github.com/vulcanize/vulcanizedb/pkg/repositories/postgres" ) -var _ = Describe("Logs Repository", func() { - var repository repositories.FilterRepository +var _ = Describe("Log Filters Repository", func() { + var db *postgres.DB + var filterRepository repositories.FilterRepository var node core.Node BeforeEach(func() { node = core.Node{ @@ -18,7 +20,8 @@ var _ = Describe("Logs Repository", func() { Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } - repository = BuildRepository(node) + db = postgres.NewTestDB(node) + filterRepository = postgres.FilterRepository{DB: db} }) Describe("LogFilter", func() { @@ -37,7 +40,7 @@ var _ = Describe("Logs Repository", func() { "", }, } - err := repository.CreateFilter(logFilter) + err := filterRepository.CreateFilter(logFilter) Expect(err).ToNot(HaveOccurred()) }) @@ -54,7 +57,7 @@ var _ = Describe("Logs Repository", func() { "", }, } - err := repository.CreateFilter(logFilter) + err := filterRepository.CreateFilter(logFilter) Expect(err).To(HaveOccurred()) }) @@ -72,7 +75,7 @@ var _ = Describe("Logs Repository", func() { "", }, } - err := repository.CreateFilter(logFilter1) + err := filterRepository.CreateFilter(logFilter1) Expect(err).ToNot(HaveOccurred()) logFilter2 := filters.LogFilter{ Name: "TestFilter2", @@ -86,19 +89,19 @@ var _ = Describe("Logs Repository", func() { "", }, } - err = repository.CreateFilter(logFilter2) + err = filterRepository.CreateFilter(logFilter2) Expect(err).ToNot(HaveOccurred()) - logFilter1, err = repository.GetFilter("TestFilter1") + logFilter1, err = filterRepository.GetFilter("TestFilter1") Expect(err).ToNot(HaveOccurred()) Expect(logFilter1).To(Equal(logFilter1)) - logFilter1, err = repository.GetFilter("TestFilter1") + logFilter1, err = filterRepository.GetFilter("TestFilter1") Expect(err).ToNot(HaveOccurred()) Expect(logFilter2).To(Equal(logFilter2)) }) It("returns ErrFilterDoesNotExist error when log does not exist", func() { - _, err := repository.GetFilter("TestFilter1") + _, err := filterRepository.GetFilter("TestFilter1") Expect(err).To(Equal(repositories.ErrFilterDoesNotExist("TestFilter1"))) }) }) diff --git a/pkg/repositories/postgres/logs.go b/pkg/repositories/postgres/logs.go index 9d125a74..16df0b96 100644 --- a/pkg/repositories/postgres/logs.go +++ b/pkg/repositories/postgres/logs.go @@ -8,9 +8,13 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -func (db DB) CreateLogs(logs []core.Log) error { - tx, _ := db.DB.BeginTx(context.Background(), nil) - for _, tlog := range logs { +type LogRepository struct { + *DB +} + +func (logRepository LogRepository) CreateLogs(lgs []core.Log) error { + tx, _ := logRepository.DB.BeginTx(context.Background(), nil) + for _, tlog := range lgs { _, err := tx.Exec( `INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) @@ -26,8 +30,8 @@ func (db DB) CreateLogs(logs []core.Log) error { return nil } -func (db DB) GetLogs(address string, blockNumber int64) []core.Log { - logRows, _ := db.DB.Query( +func (logRepository LogRepository) GetLogs(address string, blockNumber int64) []core.Log { + logRows, _ := logRepository.DB.Query( `SELECT block_number, address, tx_hash, @@ -40,11 +44,11 @@ func (db DB) GetLogs(address string, blockNumber int64) []core.Log { FROM logs WHERE address = $1 AND block_number = $2 ORDER BY block_number DESC`, address, blockNumber) - return db.loadLogs(logRows) + return logRepository.loadLogs(logRows) } -func (db DB) loadLogs(logsRows *sql.Rows) []core.Log { - var logs []core.Log +func (logRepository LogRepository) loadLogs(logsRows *sql.Rows) []core.Log { + var lgs []core.Log for logsRows.Next() { var blockNumber int64 var address string @@ -53,7 +57,7 @@ func (db DB) loadLogs(logsRows *sql.Rows) []core.Log { var data string var topics core.Topics logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data) - log := core.Log{ + lg := core.Log{ BlockNumber: blockNumber, TxHash: txHash, Address: address, @@ -61,9 +65,9 @@ func (db DB) loadLogs(logsRows *sql.Rows) []core.Log { Data: data, } for i, topic := range topics { - log.Topics[i] = topic + lg.Topics[i] = topic } - logs = append(logs, log) + lgs = append(lgs, lg) } - return logs + return lgs } diff --git a/pkg/repositories/postgres/logs_test.go b/pkg/repositories/postgres/logs_test.go index 85f451bd..a582e592 100644 --- a/pkg/repositories/postgres/logs_test.go +++ b/pkg/repositories/postgres/logs_test.go @@ -11,7 +11,8 @@ import ( ) var _ = Describe("Logs Repository", func() { - var repository repositories.LogsRepository + var db *postgres.DB + var logsRepository repositories.LogRepository var node core.Node BeforeEach(func() { node = core.Node{ @@ -20,12 +21,13 @@ var _ = Describe("Logs Repository", func() { Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } - repository = postgres.BuildRepository(node) + db = postgres.NewTestDB(node) + logsRepository = postgres.LogRepository{DB: db} }) Describe("Saving logs", func() { It("returns the log when it exists", func() { - repository.CreateLogs([]core.Log{{ + logsRepository.CreateLogs([]core.Log{{ BlockNumber: 1, Index: 0, Address: "x123", @@ -35,7 +37,7 @@ var _ = Describe("Logs Repository", func() { }}, ) - log := repository.GetLogs("x123", 1) + log := logsRepository.GetLogs("x123", 1) Expect(log).NotTo(BeNil()) Expect(log[0].BlockNumber).To(Equal(int64(1))) @@ -49,12 +51,12 @@ var _ = Describe("Logs Repository", func() { }) It("returns nil if log does not exist", func() { - log := repository.GetLogs("x123", 1) + log := logsRepository.GetLogs("x123", 1) Expect(log).To(BeNil()) }) It("filters to the correct block number and address", func() { - repository.CreateLogs([]core.Log{{ + logsRepository.CreateLogs([]core.Log{{ BlockNumber: 1, Index: 0, Address: "x123", @@ -63,7 +65,7 @@ var _ = Describe("Logs Repository", func() { Data: "xabc", }}, ) - repository.CreateLogs([]core.Log{{ + logsRepository.CreateLogs([]core.Log{{ BlockNumber: 1, Index: 1, Address: "x123", @@ -72,7 +74,7 @@ var _ = Describe("Logs Repository", func() { Data: "xdef", }}, ) - repository.CreateLogs([]core.Log{{ + logsRepository.CreateLogs([]core.Log{{ BlockNumber: 2, Index: 0, Address: "x123", @@ -82,7 +84,7 @@ var _ = Describe("Logs Repository", func() { }}, ) - log := repository.GetLogs("x123", 1) + log := logsRepository.GetLogs("x123", 1) type logIndex struct { blockNumber int64 @@ -114,7 +116,8 @@ var _ = Describe("Logs Repository", func() { It("saves the logs attached to a receipt", func() { var blockRepository repositories.BlockRepository - blockRepository = postgres.BuildRepository(node) + blockRepository = postgres.BlockRepository{DB: db} + logs := []core.Log{{ Address: "0x8a4774fe82c63484afef97ca8d89a6ea5e21f973", BlockNumber: 4745407, @@ -168,7 +171,7 @@ var _ = Describe("Logs Repository", func() { block := core.Block{Transactions: []core.Transaction{transaction}} err := blockRepository.CreateOrUpdateBlock(block) Expect(err).To(Not(HaveOccurred())) - retrievedLogs := repository.GetLogs("0x99041f808d598b782d5a3e498681c2452a31da08", 4745407) + retrievedLogs := logsRepository.GetLogs("0x99041f808d598b782d5a3e498681c2452a31da08", 4745407) expected := logs[1:] Expect(retrievedLogs).To(Equal(expected)) diff --git a/pkg/repositories/postgres/node_repository.go b/pkg/repositories/postgres/node_repository.go deleted file mode 100644 index 8ec0f0e2..00000000 --- a/pkg/repositories/postgres/node_repository.go +++ /dev/null @@ -1,27 +0,0 @@ -package postgres - -import "github.com/vulcanize/vulcanizedb/pkg/core" - -type NodeRepository interface { - CreateNode(node *core.Node) error -} - -func (db *DB) CreateNode(node *core.Node) error { - var nodeId int64 - err := db.DB.QueryRow( - `INSERT INTO nodes (genesis_block, network_id, node_id, client_name) - VALUES ($1, $2, $3, $4) - ON CONFLICT (genesis_block, network_id, node_id) - DO UPDATE - SET genesis_block = $1, - network_id = $2, - node_id = $3, - client_name = $4 - RETURNING id`, - node.GenesisBlock, node.NetworkId, node.Id, node.ClientName).Scan(&nodeId) - if err != nil { - return ErrUnableToSetNode - } - db.nodeId = nodeId - return nil -} diff --git a/pkg/repositories/postgres/postgres.go b/pkg/repositories/postgres/postgres.go index 996fc963..f7d8cdca 100644 --- a/pkg/repositories/postgres/postgres.go +++ b/pkg/repositories/postgres/postgres.go @@ -35,3 +35,23 @@ func NewDB(databaseConfig config.Database, node core.Node) (*DB, error) { } return &pg, nil } + +func (db *DB) CreateNode(node *core.Node) error { + var nodeId int64 + err := db.QueryRow( + `INSERT INTO nodes (genesis_block, network_id, node_id, client_name) + VALUES ($1, $2, $3, $4) + ON CONFLICT (genesis_block, network_id, node_id) + DO UPDATE + SET genesis_block = $1, + network_id = $2, + node_id = $3, + client_name = $4 + RETURNING id`, + node.GenesisBlock, node.NetworkId, node.Id, node.ClientName).Scan(&nodeId) + if err != nil { + return ErrUnableToSetNode + } + db.nodeId = nodeId + return nil +} diff --git a/pkg/repositories/postgres/postgres_test.go b/pkg/repositories/postgres/postgres_test.go index 670cea0c..775486b1 100644 --- a/pkg/repositories/postgres/postgres_test.go +++ b/pkg/repositories/postgres/postgres_test.go @@ -22,8 +22,8 @@ func init() { log.SetOutput(ioutil.Discard) } -var _ = Describe("Postgres repository", func() { - var repository *postgres.DB +var _ = Describe("Postgres DB", func() { + var db *postgres.DB It("connects to the database", func() { cfg, _ := config.NewConfig("private") @@ -40,9 +40,7 @@ var _ = Describe("Postgres repository", func() { Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } - cfg, _ := config.NewConfig("private") - repository, _ = postgres.NewDB(cfg.Database, node) - postgres.ClearData(repository) + db = postgres.NewTestDB(node) }) It("serializes big.Int to db", func() { @@ -91,10 +89,11 @@ var _ = Describe("Postgres repository", func() { } cfg, _ := config.NewConfig("private") node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1, Id: "x123", ClientName: "geth"} - repository, _ := postgres.NewDB(cfg.Database, node) + db, _ := postgres.NewDB(cfg.Database, node) + blocksRepository := postgres.BlockRepository{DB: db} - err1 := repository.CreateOrUpdateBlock(badBlock) - savedBlock, err2 := repository.GetBlock(123) + err1 := blocksRepository.CreateOrUpdateBlock(badBlock) + savedBlock, err2 := blocksRepository.GetBlock(123) Expect(err1).To(HaveOccurred()) Expect(err2).To(HaveOccurred()) @@ -126,10 +125,11 @@ var _ = Describe("Postgres repository", func() { } cfg, _ := config.NewConfig("private") node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1, Id: "x123", ClientName: "geth"} - repository, _ := postgres.NewDB(cfg.Database, node) + db, _ := postgres.NewDB(cfg.Database, node) + logRepository := postgres.LogRepository{DB: db} - err := repository.CreateLogs([]core.Log{badLog}) - savedBlock := repository.GetLogs("x123", 1) + err := logRepository.CreateLogs([]core.Log{badLog}) + savedBlock := logRepository.GetLogs("x123", 1) Expect(err).ToNot(BeNil()) Expect(savedBlock).To(BeNil()) @@ -145,10 +145,11 @@ var _ = Describe("Postgres repository", func() { } cfg, _ := config.NewConfig("private") node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1, Id: "x123", ClientName: "geth"} - repository, _ := postgres.NewDB(cfg.Database, node) + db, _ := postgres.NewDB(cfg.Database, node) + blockRepository := postgres.BlockRepository{DB: db} - err1 := repository.CreateOrUpdateBlock(block) - savedBlock, err2 := repository.GetBlock(123) + err1 := blockRepository.CreateOrUpdateBlock(block) + savedBlock, err2 := blockRepository.GetBlock(123) Expect(err1).To(HaveOccurred()) Expect(err2).To(HaveOccurred()) diff --git a/pkg/repositories/postgres/receipt_repository.go b/pkg/repositories/postgres/receipt_repository.go index 341ffb31..0b892bb7 100644 --- a/pkg/repositories/postgres/receipt_repository.go +++ b/pkg/repositories/postgres/receipt_repository.go @@ -7,8 +7,12 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/repositories" ) -func (db DB) GetReceipt(txHash string) (core.Receipt, error) { - row := db.DB.QueryRow( +type ReceiptRepository struct { + *DB +} + +func (receiptRepository ReceiptRepository) GetReceipt(txHash string) (core.Receipt, error) { + row := receiptRepository.DB.QueryRow( `SELECT contract_address, tx_hash, cumulative_gas_used, diff --git a/pkg/repositories/postgres/receipts_repository_test.go b/pkg/repositories/postgres/receipts_repository_test.go index 1e0a68e4..22f58a40 100644 --- a/pkg/repositories/postgres/receipts_repository_test.go +++ b/pkg/repositories/postgres/receipts_repository_test.go @@ -8,8 +8,9 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/repositories/postgres" ) -var _ = Describe("Logs Repository", func() { - var repository repositories.ReceiptRepository +var _ bool = Describe("Logs Repository", func() { + var receiptRepository repositories.ReceiptRepository + var db *postgres.DB var node core.Node BeforeEach(func() { node = core.Node{ @@ -18,13 +19,15 @@ var _ = Describe("Logs Repository", func() { Id: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845", ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9", } - repository = postgres.BuildRepository(node) + db = postgres.NewTestDB(node) + receiptRepository = postgres.ReceiptRepository{DB: db} }) Describe("Saving receipts", func() { It("returns the receipt when it exists", func() { var blockRepository repositories.BlockRepository - blockRepository = postgres.BuildRepository(node) + db := postgres.NewTestDB(node) + blockRepository = postgres.BlockRepository{DB: db} expected := core.Receipt{ ContractAddress: "0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae", CumulativeGasUsed: 7996119, @@ -42,7 +45,7 @@ var _ = Describe("Logs Repository", func() { block := core.Block{Transactions: []core.Transaction{transaction}} blockRepository.CreateOrUpdateBlock(block) - receipt, err := repository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223") + receipt, err := receiptRepository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223") Expect(err).ToNot(HaveOccurred()) //Not currently serializing bloom logs @@ -55,14 +58,15 @@ var _ = Describe("Logs Repository", func() { }) It("returns ErrReceiptDoesNotExist when receipt does not exist", func() { - receipt, err := repository.GetReceipt("DOES NOT EXIST") + receipt, err := receiptRepository.GetReceipt("DOES NOT EXIST") Expect(err).To(HaveOccurred()) Expect(receipt).To(BeZero()) }) It("still saves receipts without logs", func() { var blockRepository repositories.BlockRepository - blockRepository = postgres.BuildRepository(node) + db := postgres.NewTestDB(node) + blockRepository = postgres.BlockRepository{DB: db} receipt := core.Receipt{ TxHash: "0x002c4799161d809b23f67884eb6598c9df5894929fe1a9ead97ca175d360f547", } @@ -76,7 +80,7 @@ var _ = Describe("Logs Repository", func() { } blockRepository.CreateOrUpdateBlock(block) - _, err := repository.GetReceipt(receipt.TxHash) + _, err := receiptRepository.GetReceipt(receipt.TxHash) Expect(err).To(Not(HaveOccurred())) }) diff --git a/pkg/repositories/postgres/watched_events.go b/pkg/repositories/postgres/watched_events.go index 29df2faf..0ccc92af 100644 --- a/pkg/repositories/postgres/watched_events.go +++ b/pkg/repositories/postgres/watched_events.go @@ -4,8 +4,12 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -func (db DB) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) { - rows, err := db.DB.Queryx(`SELECT name, block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data FROM watched_event_logs where name=$1`, name) +type WatchedEventRepository struct { + *DB +} + +func (watchedEventRepository WatchedEventRepository) GetWatchedEvents(name string) ([]*core.WatchedEvent, error) { + rows, err := watchedEventRepository.DB.Queryx(`SELECT name, block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data FROM watched_event_logs where name=$1`, name) if err != nil { return nil, err } diff --git a/pkg/repositories/postgres/watched_events_test.go b/pkg/repositories/postgres/watched_events_test.go index 3399e79d..2cb65a53 100644 --- a/pkg/repositories/postgres/watched_events_test.go +++ b/pkg/repositories/postgres/watched_events_test.go @@ -1,30 +1,24 @@ package postgres_test import ( - "log" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/repositories/postgres" ) var _ = Describe("Watched Events Repository", func() { - var repository *postgres.DB + var db *postgres.DB + var logRepository postgres.LogRepository + var filterRepository postgres.FilterRepository + var watchedEventRepository postgres.WatchedEventRepository BeforeEach(func() { - - cfg, err := config.NewConfig("private") - if err != nil { - log.Fatal(err) - } - repository, err = postgres.NewDB(cfg.Database, core.Node{}) - if err != nil { - log.Fatal(err) - } - postgres.ClearData(repository) + db = postgres.NewTestDB(core.Node{}) + logRepository = postgres.LogRepository{DB: db} + filterRepository = postgres.FilterRepository{DB: db} + watchedEventRepository = postgres.WatchedEventRepository{DB: db} }) It("retrieves watched event logs that match the event filter", func() { @@ -57,11 +51,11 @@ var _ = Describe("Watched Events Repository", func() { Data: "", }, } - err := repository.CreateFilter(filter) + err := filterRepository.CreateFilter(filter) Expect(err).ToNot(HaveOccurred()) - err = repository.CreateLogs(logs) + err = logRepository.CreateLogs(logs) Expect(err).ToNot(HaveOccurred()) - matchingLogs, err := repository.GetWatchedEvents("Filter1") + matchingLogs, err := watchedEventRepository.GetWatchedEvents("Filter1") Expect(err).ToNot(HaveOccurred()) Expect(matchingLogs).To(Equal(expectedWatchedEventLog)) @@ -103,11 +97,11 @@ var _ = Describe("Watched Events Repository", func() { Index: 0, Data: "", }} - err := repository.CreateFilter(filter) + err := filterRepository.CreateFilter(filter) Expect(err).ToNot(HaveOccurred()) - err = repository.CreateLogs(logs) + err = logRepository.CreateLogs(logs) Expect(err).ToNot(HaveOccurred()) - matchingLogs, err := repository.GetWatchedEvents("Filter1") + matchingLogs, err := watchedEventRepository.GetWatchedEvents("Filter1") Expect(err).ToNot(HaveOccurred()) Expect(matchingLogs).To(Equal(expectedWatchedEventLog)) diff --git a/pkg/repositories/repository.go b/pkg/repositories/repository.go index fd740b97..d11d65ef 100644 --- a/pkg/repositories/repository.go +++ b/pkg/repositories/repository.go @@ -8,15 +8,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/filters" ) -type Repository interface { - BlockRepository - ContractRepository - LogsRepository - ReceiptRepository - FilterRepository - WatchedEventsRepository -} - var ErrBlockDoesNotExist = func(blockNumber int64) error { return errors.New(fmt.Sprintf("Block number %d does not exist", blockNumber)) } @@ -47,7 +38,7 @@ type FilterRepository interface { GetFilter(name string) (filters.LogFilter, error) } -type LogsRepository interface { +type LogRepository interface { CreateLogs(logs []core.Log) error GetLogs(address string, blockNumber int64) []core.Log } @@ -60,6 +51,6 @@ type ReceiptRepository interface { GetReceipt(txHash string) (core.Receipt, error) } -type WatchedEventsRepository interface { +type WatchedEventRepository interface { GetWatchedEvents(name string) ([]*core.WatchedEvent, error) } diff --git a/utils/utils.go b/utils/utils.go index 95835702..3399d949 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -24,11 +24,11 @@ func LoadConfig(environment string) config.Config { } func LoadPostgres(database config.Database, node core.Node) postgres.DB { - repository, err := postgres.NewDB(database, node) + db, err := postgres.NewDB(database, node) if err != nil { log.Fatalf("Error loading postgres\n%v", err) } - return *repository + return *db } func ReadAbiFile(abiFilepath string) string {