diff --git a/Makefile b/Makefile index fb5cc3f..bd5ffb6 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ integrationtest: | $(GINKGO) $(GOOSE) test: | $(GINKGO) $(GOOSE) go vet ./... go fmt ./... - $(GINKGO) -r validator_test/ -v + $(GINKGO) -r pkg/validator/ validator_test/ -v build: go fmt ./... diff --git a/pkg/validator/ref_integrity.go b/pkg/validator/ref_integrity.go new file mode 100644 index 0000000..fe4e5cc --- /dev/null +++ b/pkg/validator/ref_integrity.go @@ -0,0 +1,224 @@ +package validator + +import ( + "fmt" + + "github.com/jmoiron/sqlx" +) + +// ValidateReferentialIntegrity validates referential integrity at the given height +func ValidateReferentialIntegrity(db *sqlx.DB, blockNumber uint64) error { + + err := ValidateHeaderCIDsRef(db, blockNumber) + if err != nil { + return err + } + + err = ValidateUncleCIDsRef(db, blockNumber) + if err != nil { + return err + } + + err = ValidateTransactionCIDsRef(db, blockNumber) + if err != nil { + return err + } + + err = ValidateReceiptCIDsRef(db, blockNumber) + if err != nil { + return err + } + + err = ValidateStateCIDsRef(db, blockNumber) + if err != nil { + return err + } + + err = ValidateStorageCIDsRef(db, blockNumber) + if err != nil { + return err + } + + err = ValidateStateAccountsRef(db, blockNumber) + if err != nil { + return err + } + + err = ValidateAccessListElementsRef(db, blockNumber) + if err != nil { + return err + } + + err = ValidateLogCIDsRef(db, blockNumber) + if err != nil { + return err + } + + return nil +} + +// ValidateHeaderCIDsRef does a reference integrity check on references in eth.header_cids table +func ValidateHeaderCIDsRef(db *sqlx.DB, blockNumber uint64) error { + err := ValidateIPFSBlocks(db, blockNumber, "eth.header_cids", "mh_key") + if err != nil { + return err + } + + return nil +} + +// ValidateUncleCIDsRef does a reference integrity check on references in eth.uncle_cids table +func ValidateUncleCIDsRef(db *sqlx.DB, blockNumber uint64) error { + var exists bool + err := db.Get(&exists, UncleCIDsRefHeaderCIDs, blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids") + } + + err = ValidateIPFSBlocks(db, blockNumber, "eth.uncle_cids", "mh_key") + if err != nil { + return err + } + + return nil +} + +// ValidateTransactionCIDsRef does a reference integrity check on references in eth.header_cids table +func ValidateTransactionCIDsRef(db *sqlx.DB, blockNumber uint64) error { + var exists bool + err := db.Get(&exists, TransactionCIDsRefHeaderCIDs, blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids") + } + + err = ValidateIPFSBlocks(db, blockNumber, "eth.transaction_cids", "mh_key") + if err != nil { + return err + } + + return nil +} + +// ValidateReceiptCIDsRef does a reference integrity check on references in eth.receipt_cids table +func ValidateReceiptCIDsRef(db *sqlx.DB, blockNumber uint64) error { + var exists bool + err := db.Get(&exists, ReceiptCIDsRefTransactionCIDs, blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.transaction_cids") + } + + err = ValidateIPFSBlocks(db, blockNumber, "eth.receipt_cids", "leaf_mh_key") + if err != nil { + return err + } + + return nil +} + +// ValidateStateCIDsRef does a reference integrity check on references in eth.state_cids table +func ValidateStateCIDsRef(db *sqlx.DB, blockNumber uint64) error { + var exists bool + err := db.Get(&exists, StateCIDsRefHeaderCIDs, blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids") + } + + err = ValidateIPFSBlocks(db, blockNumber, "eth.state_cids", "mh_key") + if err != nil { + return err + } + + return nil +} + +// ValidateStorageCIDsRef does a reference integrity check on references in eth.storage_cids table +func ValidateStorageCIDsRef(db *sqlx.DB, blockNumber uint64) error { + var exists bool + err := db.Get(&exists, StorageCIDsRefStateCIDs, blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.state_cids") + } + + err = ValidateIPFSBlocks(db, blockNumber, "eth.storage_cids", "mh_key") + if err != nil { + return err + } + + return nil +} + +// ValidateStateAccountsRef does a reference integrity check on references in eth.state_accounts table +func ValidateStateAccountsRef(db *sqlx.DB, blockNumber uint64) error { + var exists bool + err := db.Get(&exists, StateAccountsRefStateCIDs, blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.state_cids") + } + + return nil +} + +// ValidateAccessListElementsRef does a reference integrity check on references in eth.access_list_elements table +func ValidateAccessListElementsRef(db *sqlx.DB, blockNumber uint64) error { + var exists bool + err := db.Get(&exists, AccessListElementsRefTransactionCIDs, blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.transaction_cids") + } + + return nil +} + +// ValidateLogCIDsRef does a reference integrity check on references in eth.log_cids table +func ValidateLogCIDsRef(db *sqlx.DB, blockNumber uint64) error { + var exists bool + err := db.Get(&exists, LogCIDsRefReceiptCIDs, blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.receipt_cids") + } + + err = ValidateIPFSBlocks(db, blockNumber, "eth.log_cids", "leaf_mh_key") + if err != nil { + return err + } + + return nil +} + +// ValidateIPFSBlocks does a reference integrity check between the given CID table and IPFS blocks table on MHKey and block number +func ValidateIPFSBlocks(db *sqlx.DB, blockNumber uint64, CIDTable string, mhKeyField string) error { + var exists bool + err := db.Get(&exists, fmt.Sprintf(CIDsRefIPLDBlocks, CIDTable, mhKeyField), blockNumber) + if err != nil { + return err + } + if exists { + return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "public.blocks") + } + + return nil +} diff --git a/pkg/validator/ref_integrity_queries.go b/pkg/validator/ref_integrity_queries.go new file mode 100644 index 0000000..cbc5f20 --- /dev/null +++ b/pkg/validator/ref_integrity_queries.go @@ -0,0 +1,119 @@ +package validator + +// Queries to validate referential integrity in the indexed data: +// At the given block number, +// In each table, for each (would be) foreign key reference, perform left join with the referenced table on the foreign key fields. +// Select rows where there are no matching rows in the referenced table. +// If any such rows exist, there are missing entries in the referenced table. + +const ( + CIDsRefIPLDBlocks = `SELECT EXISTS ( + SELECT * + FROM %[1]s + LEFT JOIN public.blocks ON ( + %[1]s.%[2]s = blocks.key + AND %[1]s.block_number = blocks.block_number + ) + WHERE + %[1]s.block_number = $1 + AND blocks.key IS NULL + )` + + UncleCIDsRefHeaderCIDs = `SELECT EXISTS ( + SELECT * + FROM eth.uncle_cids + LEFT JOIN eth.header_cids ON ( + uncle_cids.header_id = header_cids.block_hash + AND uncle_cids.block_number = header_cids.block_number + ) + WHERE + uncle_cids.block_number = $1 + AND header_cids.block_hash IS NULL + )` + + TransactionCIDsRefHeaderCIDs = `SELECT EXISTS ( + SELECT * + FROM eth.transaction_cids + LEFT JOIN eth.header_cids ON ( + transaction_cids.header_id = header_cids.block_hash + AND transaction_cids.block_number = header_cids.block_number + ) + WHERE + transaction_cids.block_number = $1 + AND header_cids.block_hash IS NULL + )` + + ReceiptCIDsRefTransactionCIDs = `SELECT EXISTS ( + SELECT * + FROM eth.receipt_cids + LEFT JOIN eth.transaction_cids ON ( + receipt_cids.tx_id = transaction_cids.tx_hash + AND receipt_cids.block_number = transaction_cids.block_number + ) + WHERE + receipt_cids.block_number = $1 + AND transaction_cids.tx_hash IS NULL + )` + + StateCIDsRefHeaderCIDs = `SELECT EXISTS ( + SELECT * + FROM eth.state_cids + LEFT JOIN eth.header_cids ON ( + state_cids.header_id = header_cids.block_hash + AND state_cids.block_number = header_cids.block_number + ) + WHERE + state_cids.block_number = $1 + AND header_cids.block_hash IS NULL + )` + + StorageCIDsRefStateCIDs = `SELECT EXISTS ( + SELECT * + FROM eth.storage_cids + LEFT JOIN eth.state_cids ON ( + storage_cids.state_path = state_cids.state_path + AND storage_cids.header_id = state_cids.header_id + AND storage_cids.block_number = state_cids.block_number + ) + WHERE + storage_cids.block_number = $1 + AND state_cids.state_path IS NULL + )` + + StateAccountsRefStateCIDs = `SELECT EXISTS ( + SELECT * + FROM eth.state_accounts + LEFT JOIN eth.state_cids ON ( + state_accounts.state_path = state_cids.state_path + AND state_accounts.header_id = state_cids.header_id + AND state_accounts.block_number = state_cids.block_number + ) + WHERE + state_accounts.block_number = $1 + AND state_cids.state_path IS NULL + )` + + AccessListElementsRefTransactionCIDs = `SELECT EXISTS ( + SELECT * + FROM eth.access_list_elements + LEFT JOIN eth.transaction_cids ON ( + access_list_elements.tx_id = transaction_cids.tx_hash + AND access_list_elements.block_number = transaction_cids.block_number + ) + WHERE + access_list_elements.block_number = $1 + AND transaction_cids.tx_hash IS NULL + )` + + LogCIDsRefReceiptCIDs = `SELECT EXISTS ( + SELECT * + FROM eth.log_cids + LEFT JOIN eth.receipt_cids ON ( + log_cids.rct_id = receipt_cids.tx_id + AND log_cids.block_number = receipt_cids.block_number + ) + WHERE + log_cids.block_number = $1 + AND receipt_cids.tx_id IS NULL + )` +) diff --git a/pkg/validator/ref_integrity_test.go b/pkg/validator/ref_integrity_test.go new file mode 100644 index 0000000..80171c7 --- /dev/null +++ b/pkg/validator/ref_integrity_test.go @@ -0,0 +1,334 @@ +package validator_test + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ethereum/go-ethereum/statediff/indexer/mocks" + "github.com/jmoiron/sqlx" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/ipld-eth-db-validator/pkg/validator" + "github.com/vulcanize/ipld-eth-server/v4/pkg/eth/test_helpers" + "github.com/vulcanize/ipld-eth-server/v4/pkg/shared" +) + +var _ = Describe("RefIntegrity", func() { + var ( + ctx = context.Background() + + db *sqlx.DB + diffIndexer interfaces.StateDiffIndexer + ) + + BeforeEach(func() { + db = shared.SetupDB() + diffIndexer = shared.SetupTestStateDiffIndexer(ctx, params.TestChainConfig, test_helpers.Genesis.Hash()) + }) + + AfterEach(func() { + shared.TearDownDB(db) + }) + + Describe("ValidateHeaderCIDsRef", func() { + BeforeEach(func() { + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of header_cids table", func() { + err := validator.ValidateHeaderCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding header IPFS block entry not found", func() { + err := deleteEntriesFrom(db, "public.blocks") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateHeaderCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "public.blocks")) + }) + }) + + Describe("ValidateUncleCIDsRef", func() { + BeforeEach(func() { + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of uncle_cids table", func() { + err := validator.ValidateUncleCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding header_cid entry not found", func() { + err := deleteEntriesFrom(db, "eth.header_cids") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateUncleCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "eth.header_cids")) + }) + + It("Throws an error if corresponding uncle IPFS block entry not found", func() { + err := deleteEntriesFrom(db, "public.blocks") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateUncleCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "public.blocks")) + }) + }) + + Describe("ValidateTransactionCIDsRef", func() { + BeforeEach(func() { + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of transaction_cids table", func() { + err := validator.ValidateTransactionCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding header_cid entry not found", func() { + err := deleteEntriesFrom(db, "eth.header_cids") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateTransactionCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "eth.header_cids")) + }) + + It("Throws an error if corresponding transaction IPFS block entry not found", func() { + err := deleteEntriesFrom(db, "public.blocks") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateTransactionCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "public.blocks")) + }) + }) + + Describe("ValidateReceiptCIDsRef", func() { + BeforeEach(func() { + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of receipt_cids table", func() { + err := validator.ValidateReceiptCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding transaction_cids entry not found", func() { + err := deleteEntriesFrom(db, "eth.transaction_cids") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateReceiptCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "eth.transaction_cids")) + }) + + It("Throws an error if corresponding receipt IPFS block entry not found", func() { + err := deleteEntriesFrom(db, "public.blocks") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateReceiptCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "public.blocks")) + }) + }) + + Describe("ValidateStateCIDsRef", func() { + BeforeEach(func() { + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + for _, node := range test_helpers.MockStateNodes { + err = diffIndexer.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String()) + Expect(err).ToNot(HaveOccurred()) + } + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of state_cids table", func() { + err := validator.ValidateStateCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding header_cids entry not found", func() { + err := deleteEntriesFrom(db, "eth.header_cids") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateStateCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "eth.header_cids")) + }) + + It("Throws an error if corresponding state IPFS block entry not found", func() { + err := deleteEntriesFrom(db, "public.blocks") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateStateCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "public.blocks")) + }) + }) + + Describe("ValidateStorageCIDsRef", func() { + BeforeEach(func() { + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + for _, node := range test_helpers.MockStateNodes { + err = diffIndexer.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String()) + Expect(err).ToNot(HaveOccurred()) + } + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of storage_cids table", func() { + err := validator.ValidateStorageCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding state_cids entry not found", func() { + err := deleteEntriesFrom(db, "eth.state_cids") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateStorageCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "eth.state_cids")) + }) + + It("Throws an error if corresponding storage IPFS block entry not found", func() { + err := deleteEntriesFrom(db, "public.blocks") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateStorageCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "public.blocks")) + }) + }) + + Describe("ValidateStateAccountsRef", func() { + BeforeEach(func() { + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + for _, node := range test_helpers.MockStateNodes { + err = diffIndexer.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String()) + Expect(err).ToNot(HaveOccurred()) + } + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of state_accounts table", func() { + err := validator.ValidateStateAccountsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding state_cids entry not found", func() { + err := deleteEntriesFrom(db, "eth.state_cids") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateStateAccountsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "eth.state_cids")) + }) + }) + + Describe("ValidateAccessListElementsRef", func() { + BeforeEach(func() { + indexAndPublisher := shared.SetupTestStateDiffIndexer(ctx, mocks.TestConfig, test_helpers.Genesis.Hash()) + + tx, err := indexAndPublisher.PushBlock(mocks.MockBlock, mocks.MockReceipts, mocks.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of access_list_elements table", func() { + err := validator.ValidateAccessListElementsRef(db, mocks.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding transaction_cids entry not found", func() { + err := deleteEntriesFrom(db, "eth.transaction_cids") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateAccessListElementsRef(db, mocks.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "eth.transaction_cids")) + }) + }) + + Describe("ValidateLogCIDsRef", func() { + BeforeEach(func() { + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + for _, node := range test_helpers.MockStateNodes { + err = diffIndexer.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String()) + Expect(err).ToNot(HaveOccurred()) + } + + err = tx.Submit(err) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Validates referential integrity of log_cids table", func() { + err := validator.ValidateLogCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Throws an error if corresponding receipt_cids entry not found", func() { + err := deleteEntriesFrom(db, "eth.receipt_cids") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateLogCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "eth.receipt_cids")) + }) + + It("Throws an error if corresponding log IPFS block entry not found", func() { + err := deleteEntriesFrom(db, "public.blocks") + Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateLogCIDsRef(db, test_helpers.MockBlock.NumberU64()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(validator.EntryNotFoundErr, "public.blocks")) + }) + }) + +}) + +func deleteEntriesFrom(db *sqlx.DB, tableName string) error { + pgStr := "DELETE FROM %s" + _, err := db.Exec(fmt.Sprintf(pgStr, tableName)) + return err +} diff --git a/pkg/validator/validator.go b/pkg/validator/validator.go index da12fb4..658e1ed 100644 --- a/pkg/validator/validator.go +++ b/pkg/validator/validator.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" + ipfsethdb "github.com/vulcanize/ipfs-ethdb/v4/postgres" ipldEth "github.com/vulcanize/ipld-eth-server/v4/pkg/eth" ethServerShared "github.com/vulcanize/ipld-eth-server/v4/pkg/shared" @@ -28,6 +29,9 @@ import ( var ( big8 = big.NewInt(8) big32 = big.NewInt(32) + + ReferentialIntegrityErr = "referential integrity check failed at block %d, entry for %s not found" + EntryNotFoundErr = "entry for %s not found" ) type service struct { @@ -129,6 +133,15 @@ func (s *service) Validate(ctx context.Context, api *ipldEth.PublicEthAPI, idxBl } s.logger.Infof("state root verified for block %d", idxBlockNum) + + err = ValidateReferentialIntegrity(s.db, idxBlockNum) + if err != nil { + s.logger.Errorf("failed to verify referential integrity at block %d", idxBlockNum) + return idxBlockNum, err + } + + s.logger.Infof("referential integrity verified for block %d", idxBlockNum) + idxBlockNum++ } else { // Sleep / wait for head to move ahead diff --git a/pkg/validator/validator_suite_test.go b/pkg/validator/validator_suite_test.go new file mode 100644 index 0000000..49fabbd --- /dev/null +++ b/pkg/validator/validator_suite_test.go @@ -0,0 +1,13 @@ +package validator_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestValidator(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Validator Suite") +} diff --git a/validator_test/validator_test.go b/validator_test/validator_test.go index 97a52fb..0c481fe 100644 --- a/validator_test/validator_test.go +++ b/validator_test/validator_test.go @@ -120,6 +120,9 @@ var _ = Describe("eth state reading tests", func() { for i := uint64(blockHeight); i <= chainLength-trail; i++ { err = validator.ValidateBlock(context.Background(), api, i) Expect(err).ToNot(HaveOccurred()) + + err = validator.ValidateReferentialIntegrity(db, i) + Expect(err).ToNot(HaveOccurred()) } }) })