diff --git a/pkg/contract_watcher/shared/helpers/test_helpers/database.go b/pkg/contract_watcher/shared/helpers/test_helpers/database.go index 8e8f92b0..cd1ed7c7 100644 --- a/pkg/contract_watcher/shared/helpers/test_helpers/database.go +++ b/pkg/contract_watcher/shared/helpers/test_helpers/database.go @@ -175,34 +175,6 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract }.Init() } -func SetupENSRepo(vulcanizeLogID *int64, wantedEvents, wantedMethods []string) (*postgres.DB, *contract.Contract) { - db, err := postgres.NewDB(config.Database{ - Hostname: "localhost", - Name: "vulcanize_testing", - Port: 5432, - }, core.Node{}) - Expect(err).NotTo(HaveOccurred()) - - receiptRepository := repositories.FullSyncReceiptRepository{DB: db} - logRepository := repositories.FullSyncLogRepository{DB: db} - blockRepository := *repositories.NewBlockRepository(db) - - blockNumber := rand.Int63() - blockID := CreateBlock(blockNumber, blockRepository) - - receipts := []core.Receipt{{Logs: []core.FullSyncLog{{}}}} - - err = receiptRepository.CreateReceiptsAndLogs(blockID, receipts) - Expect(err).ToNot(HaveOccurred()) - - err = logRepository.Get(vulcanizeLogID, `SELECT id FROM full_sync_logs`) - Expect(err).ToNot(HaveOccurred()) - - info := SetupENSContract(wantedEvents, wantedMethods) - - return db, info -} - func SetupENSContract(wantedEvents, wantedMethods []string) *contract.Contract { p := mocks.NewParser(constants.ENSAbiString) err := p.Parse(constants.EnsContractAddress) diff --git a/pkg/datastore/postgres/postgres.go b/pkg/datastore/postgres/postgres.go index 47fcd1bb..14a54536 100644 --- a/pkg/datastore/postgres/postgres.go +++ b/pkg/datastore/postgres/postgres.go @@ -29,8 +29,6 @@ type DB struct { NodeID int64 } -var () - func NewDB(databaseConfig config.Database, node core.Node) (*DB, error) { connectString := config.DbConnectionString(databaseConfig) db, connectErr := sqlx.Connect("postgres", connectString) diff --git a/pkg/seed_node/repository.go b/pkg/seed_node/repository.go index 1e522d4d..24357909 100644 --- a/pkg/seed_node/repository.go +++ b/pkg/seed_node/repository.go @@ -19,6 +19,7 @@ package seed_node import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" + log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/ipfs" @@ -49,24 +50,24 @@ func (repo *Repository) Index(cidPayload *ipfs.CIDPayload) error { } headerID, err := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex()) if err != nil { - tx.Rollback() + log.Error(tx.Rollback()) return err } for uncleHash, cid := range cidPayload.UncleCIDS { err = repo.indexUncleCID(tx, cid, cidPayload.BlockNumber, uncleHash.Hex()) if err != nil { - tx.Rollback() + log.Error(tx.Rollback()) return err } } err = repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID) if err != nil { - tx.Rollback() + log.Error(tx.Rollback()) return err } err = repo.indexStateAndStorageCIDs(tx, cidPayload, headerID) if err != nil { - tx.Rollback() + log.Error(tx.Rollback()) return err } return tx.Commit() diff --git a/pkg/seed_node/repository_test.go b/pkg/seed_node/repository_test.go new file mode 100644 index 00000000..7f9a623d --- /dev/null +++ b/pkg/seed_node/repository_test.go @@ -0,0 +1,110 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package seed_node_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" + "github.com/vulcanize/vulcanizedb/pkg/seed_node" +) + +var ( + db *postgres.DB + err error + repo seed_node.CIDRepository +) + +var _ = Describe("Repository", func() { + BeforeEach(func() { + db, err = seed_node.SetupDB() + Expect(err).ToNot(HaveOccurred()) + repo = seed_node.NewCIDRepository(db) + }) + AfterEach(func() { + seed_node.TearDownDB(db) + }) + Describe("Index", func() { + It("Indexes CIDs and related metadata into vulcanizedb", func() { + err = repo.Index(mocks.MockCIDPayload) + Expect(err).ToNot(HaveOccurred()) + pgStr := `SELECT cid FROM header_cids + WHERE block_number = $1 AND final IS TRUE` + // check header was properly indexed + headers := make([]string, 0) + err = db.Select(&headers, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(headers)).To(Equal(1)) + Expect(headers[0]).To(Equal("mockHeaderCID")) + // check trxs were properly indexed + trxs := make([]string, 0) + pgStr = `SELECT transaction_cids.cid FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + err = db.Select(&trxs, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(trxs)).To(Equal(2)) + Expect(trxs[0]).To(Equal("mockTrxCID1")) + Expect(trxs[1]).To(Equal("mockTrxCID2")) + // check receipts were properly indexed + rcts := make([]string, 0) + pgStr = `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids + WHERE receipt_cids.tx_id = transaction_cids.id + AND transaction_cids.header_id = header_cids.id + AND header_cids.block_number = $1` + err = db.Select(&rcts, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(rcts)).To(Equal(2)) + Expect(rcts[0]).To(Equal("mockRctCID1")) + Expect(rcts[1]).To(Equal("mockRctCID2")) + // check that state nodes were properly indexed + stateNodes := make([]ipfs.StateNodeCID, 0) + pgStr = `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + err = db.Select(&stateNodes, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(stateNodes)).To(Equal(2)) + Expect(stateNodes[0]).To(Equal(ipfs.StateNodeCID{ + CID: "mockStateCID1", + Leaf: true, + Key: mocks.ContractLeafKey.Hex(), + })) + Expect(stateNodes[1]).To(Equal(ipfs.StateNodeCID{ + CID: "mockStateCID2", + Leaf: true, + Key: mocks.AnotherContractLeafKey.Hex(), + })) + // check that storage nodes were properly indexed + storageNodes := make([]ipfs.StorageNodeCID, 0) + pgStr = `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key, storage_cids.leaf FROM storage_cids, state_cids, header_cids + WHERE storage_cids.state_id = state_cids.id + AND state_cids.header_id = header_cids.id + AND header_cids.block_number = $1` + err = db.Select(&storageNodes, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(storageNodes)).To(Equal(1)) + Expect(storageNodes[0]).To(Equal(ipfs.StorageNodeCID{ + CID: "mockStorageCID", + Leaf: true, + Key: "0x0000000000000000000000000000000000000000000000000000000000000001", + StateKey: mocks.ContractLeafKey.Hex(), + })) + }) + }) +}) diff --git a/pkg/seed_node/retreiver.go b/pkg/seed_node/retreiver.go index 0fcc4c3d..acd25c5f 100644 --- a/pkg/seed_node/retreiver.go +++ b/pkg/seed_node/retreiver.go @@ -216,7 +216,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StateNodeCID, error) { log.Debug("retrieving state cids for block ", blockNumber) args := make([]interface{}, 0, 2) - pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) + pgStr := `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) WHERE header_cids.block_number = $1` args = append(args, blockNumber) addrLen := len(streamFilters.StateFilter.Addresses) @@ -239,7 +239,7 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config. func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StorageNodeCID, error) { log.Debug("retrieving storage cids for block ", blockNumber) args := make([]interface{}, 0, 3) - pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids + pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key, storage_cids.leaf FROM storage_cids, state_cids, header_cids WHERE storage_cids.state_id = state_cids.id AND state_cids.header_id = header_cids.id AND header_cids.block_number = $1` diff --git a/pkg/seed_node/test_helpers.go b/pkg/seed_node/test_helpers.go new file mode 100644 index 00000000..ebf77028 --- /dev/null +++ b/pkg/seed_node/test_helpers.go @@ -0,0 +1,54 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package seed_node + +import ( + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +func SetupDB() (*postgres.DB, error) { + return postgres.NewDB(config.Database{ + Hostname: "localhost", + Name: "vulcanize_private", + Port: 5432, + }, core.Node{}) +} + +func TearDownDB(db *postgres.DB) { + tx, err := db.Beginx() + Expect(err).NotTo(HaveOccurred()) + + _, err = tx.Exec(`DELETE FROM header_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM transaction_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM receipt_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM state_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM storage_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM blocks`) + Expect(err).NotTo(HaveOccurred()) + + err = tx.Commit() + Expect(err).NotTo(HaveOccurred()) +}