diff --git a/libraries/shared/repository/repository.go b/libraries/shared/repository/repository.go deleted file mode 100644 index 155e27ae..00000000 --- a/libraries/shared/repository/repository.go +++ /dev/null @@ -1,27 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package repository - -import "github.com/jmoiron/sqlx" - -func MarkContractWatcherHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, checkedHeadersColumn string) error { - _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET `+checkedHeadersColumn+` = checked_headers.`+checkedHeadersColumn+` + 1`, headerID, 1) - return err -} diff --git a/libraries/shared/repository/repository_test.go b/libraries/shared/repository/repository_test.go deleted file mode 100644 index 72a613af..00000000 --- a/libraries/shared/repository/repository_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package repository_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/repository" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" - "github.com/vulcanize/vulcanizedb/pkg/fakes" - "github.com/vulcanize/vulcanizedb/test_config" -) - -var _ = Describe("", func() { - Describe("MarkContractWatcherHeaderCheckedInTransaction", func() { - var ( - checkedHeadersColumn string - db *postgres.DB - ) - - BeforeEach(func() { - db = test_config.NewTestDB(test_config.NewTestNode()) - test_config.CleanTestDB(db) - checkedHeadersColumn = "test_column_checked" - _, migrateErr := db.Exec(`ALTER TABLE public.checked_headers - ADD COLUMN ` + checkedHeadersColumn + ` integer`) - Expect(migrateErr).NotTo(HaveOccurred()) - }) - - AfterEach(func() { - _, cleanupMigrateErr := db.Exec(`ALTER TABLE public.checked_headers DROP COLUMN ` + checkedHeadersColumn) - Expect(cleanupMigrateErr).NotTo(HaveOccurred()) - }) - - It("marks passed header as checked within a passed transaction", func() { - headerRepository := repositories.NewHeaderRepository(db) - headerID, headerErr := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(headerErr).NotTo(HaveOccurred()) - tx, txErr := db.Beginx() - Expect(txErr).NotTo(HaveOccurred()) - - err := repository.MarkContractWatcherHeaderCheckedInTransaction(headerID, tx, checkedHeadersColumn) - Expect(err).NotTo(HaveOccurred()) - commitErr := tx.Commit() - Expect(commitErr).NotTo(HaveOccurred()) - var checkedCount int - fetchErr := db.Get(&checkedCount, `SELECT COUNT(*) FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(fetchErr).NotTo(HaveOccurred()) - Expect(checkedCount).To(Equal(1)) - }) - }) -}) diff --git a/pkg/contract_watcher/header/transformer/transformer.go b/pkg/contract_watcher/header/transformer/transformer.go index b91d98c8..f3fe0188 100644 --- a/pkg/contract_watcher/header/transformer/transformer.go +++ b/pkg/contract_watcher/header/transformer/transformer.go @@ -128,7 +128,7 @@ func (tr *Transformer) Init() error { firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock() if retrieveErr != nil { if retrieveErr == sql.ErrNoRows { - logrus.Error(retrieveErr) + logrus.Error(fmt.Errorf("error retrieving first block: %s", retrieveErr.Error())) firstBlock = 0 } else { return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error()) @@ -252,12 +252,6 @@ func (tr *Transformer) Execute() error { continue } - // Sort logs by the contract they belong to - // Start by adding every contract addr to the map - // So that if we don't have any logs for it, it is caught and the header is still marked checked for its events - for _, addr := range tr.contractAddresses { - sortedLogs[addr] = nil - } for _, log := range allLogs { addr := strings.ToLower(log.Address.Hex()) sortedLogs[addr] = append(sortedLogs[addr], log) @@ -265,20 +259,12 @@ func (tr *Transformer) Execute() error { // Process logs for each contract for conAddr, logs := range sortedLogs { - con := tr.Contracts[conAddr] - if len(logs) < 1 { - eventIds := make([]string, 0) - for eventName := range con.Events { - eventIds = append(eventIds, strings.ToLower(eventName+"_"+con.Address)) - } - markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds) - if markCheckedErr != nil { - return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) - } + if logs == nil { logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber) continue } // Configure converter with this contract + con := tr.Contracts[conAddr] tr.Converter.Update(con) // Convert logs into batches of log mappings (eventName => []types.Logs @@ -290,16 +276,10 @@ func (tr *Transformer) Execute() error { for eventName, logs := range convertedLogs { // If logs for this event are empty, mark them checked at this header and continue if len(logs) < 1 { - eventID := strings.ToLower(eventName + "_" + con.Address) - markCheckedErr := tr.HeaderRepository.MarkHeaderChecked(header.Id, eventID) - if markCheckedErr != nil { - return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) - } logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber) continue } // If logs aren't empty, persist them - // Header is marked checked in the transactions persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name) if persistErr != nil { return fmt.Errorf("error persisting logs: %s", persistErr.Error()) @@ -307,6 +287,11 @@ func (tr *Transformer) Execute() error { } } + markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds) + if markCheckedErr != nil { + return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) + } + // Poll contracts at this block height pollingErr := tr.methodPolling(header, tr.sortedMethodIds) if pollingErr != nil { diff --git a/pkg/contract_watcher/shared/getter/interface_getter.go b/pkg/contract_watcher/shared/getter/interface_getter.go index bc2502b4..273940e9 100644 --- a/pkg/contract_watcher/shared/getter/interface_getter.go +++ b/pkg/contract_watcher/shared/getter/interface_getter.go @@ -17,6 +17,8 @@ package getter import ( + "fmt" + "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/fetcher" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -24,7 +26,7 @@ import ( // InterfaceGetter is used to derive the interface of a contract type InterfaceGetter interface { - GetABI(resolverAddr string, blockNumber int64) string + GetABI(resolverAddr string, blockNumber int64) (string, error) GetBlockChain() core.BlockChain } @@ -42,14 +44,18 @@ func NewInterfaceGetter(blockChain core.BlockChain) InterfaceGetter { } // GetABI is used to construct a custom ABI based on the results from calling supportsInterface -func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) string { +func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) (string, error) { a := constants.SupportsInterfaceABI args := make([]interface{}, 1) args[0] = constants.MetaSig.Bytes() supports, err := g.getSupportsInterface(a, resolverAddr, blockNumber, args) - if err != nil || !supports { - return "" + if err != nil { + return "", fmt.Errorf("call to getSupportsInterface failed: %v", err) } + if !supports { + return "", fmt.Errorf("contract does not support interface") + } + abiStr := `[` args[0] = constants.AddrChangeSig.Bytes() supports, err = g.getSupportsInterface(a, resolverAddr, blockNumber, args) @@ -93,7 +99,7 @@ func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) string } abiStr = abiStr[:len(abiStr)-1] + `]` - return abiStr + return abiStr, nil } // Use this method to check whether or not a contract supports a given method/event interface diff --git a/pkg/contract_watcher/shared/repository/event_repository.go b/pkg/contract_watcher/shared/repository/event_repository.go index d6d3c601..ab5756df 100644 --- a/pkg/contract_watcher/shared/repository/event_repository.go +++ b/pkg/contract_watcher/shared/repository/event_repository.go @@ -24,7 +24,6 @@ import ( "github.com/hashicorp/golang-lru" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/libraries/shared/repository" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) @@ -144,17 +143,6 @@ func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo type } } - // Mark header as checked for this eventId - eventID := strings.ToLower(eventInfo.Name + "_" + contractAddr) - markCheckedErr := repository.MarkContractWatcherHeaderCheckedInTransaction(logs[0].ID, tx, eventID) // This assumes all logs are from same block - if markCheckedErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - logrus.Warnf("error rolling back transaction while marking header checked: %s", rollbackErr.Error()) - } - return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) - } - return tx.Commit() } diff --git a/pkg/contract_watcher/shared/repository/event_repository_test.go b/pkg/contract_watcher/shared/repository/event_repository_test.go index 40f1a783..bc61d776 100644 --- a/pkg/contract_watcher/shared/repository/event_repository_test.go +++ b/pkg/contract_watcher/shared/repository/event_repository_test.go @@ -355,11 +355,6 @@ var _ = Describe("Repository", func() { Expect(count).To(Equal(2)) }) - It("Fails if the persisted event does not have a corresponding eventID column in the checked_headers table", func() { - err = dataStore.PersistLogs(logs, event, con.Address, con.Name) - Expect(err).To(HaveOccurred()) - }) - It("Fails with empty log", func() { err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name) Expect(err).To(HaveOccurred())