diff --git a/integration_test/contract_watcher_header_sync_transformer_test.go b/integration_test/contract_watcher_header_sync_transformer_test.go index 4181b08e..08030134 100644 --- a/integration_test/contract_watcher_header_sync_transformer_test.go +++ b/integration_test/contract_watcher_header_sync_transformer_test.go @@ -40,8 +40,8 @@ var _ = Describe("contractWatcher headerSync transformer", func() { var blockChain core.BlockChain var headerRepository repositories.HeaderRepository var headerID int64 - var ensAddr = strings.ToLower(constants.EnsContractAddress) - var tusdAddr = strings.ToLower(constants.TusdContractAddress) + var ensAddr = strings.ToLower(constants.EnsContractAddress) // 0x314159265dd8dbb310642f98f50c066173c1259b + var tusdAddr = strings.ToLower(constants.TusdContractAddress) // 0x8dd5fbce2f6a956c3022ba3663759011dd51e73e BeforeEach(func() { db, blockChain = test_helpers.SetupDBandBC() @@ -377,6 +377,42 @@ var _ = Describe("contractWatcher headerSync transformer", func() { Expect(transferLog.Value).To(Equal("2800000000000000000000")) }) + It("Marks header checked for a contract that has no logs at that header", func() { + t := transformer.NewTransformer(test_helpers.ENSandTusdConfig, blockChain, db) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + Expect(t.Start).To(Equal(int64(6885702))) + + newOwnerLog := test_helpers.HeaderSyncNewOwnerLog{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM header_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog) + Expect(err).ToNot(HaveOccurred()) + transferLog := test_helpers.HeaderSyncTransferLog{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM header_%s.transfer_event", tusdAddr)).StructScan(&transferLog) + Expect(err).ToNot(HaveOccurred()) + Expect(transferLog.HeaderID).ToNot(Equal(newOwnerLog.HeaderID)) + + type checkedHeader struct { + ID int64 `db:"id"` + HeaderID int64 `db:"header_id"` + NewOwner int64 `db:"newowner_0x314159265dd8dbb310642f98f50c066173c1259b"` + Transfer int64 `db:"transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e"` + } + + transferCheckedHeader := new(checkedHeader) + err = db.QueryRowx("SELECT * FROM public.checked_headers WHERE header_id = $1", transferLog.HeaderID).StructScan(transferCheckedHeader) + Expect(err).ToNot(HaveOccurred()) + Expect(transferCheckedHeader.Transfer).To(Equal(int64(1))) + Expect(transferCheckedHeader.NewOwner).To(Equal(int64(1))) + + newOwnerCheckedHeader := new(checkedHeader) + err = db.QueryRowx("SELECT * FROM public.checked_headers WHERE header_id = $1", newOwnerLog.HeaderID).StructScan(newOwnerCheckedHeader) + Expect(err).ToNot(HaveOccurred()) + Expect(newOwnerCheckedHeader.NewOwner).To(Equal(int64(1))) + Expect(newOwnerCheckedHeader.Transfer).To(Equal(int64(1))) + }) + It("Keeps track of contract-related hashes and addresses while transforming event data if they need to be used for later method polling", func() { var testConf config.ContractConfig testConf = test_helpers.ENSandTusdConfig diff --git a/pkg/contract_watcher/header/repository/header_repository.go b/pkg/contract_watcher/header/repository/header_repository.go index 4ad817f0..11be42a9 100644 --- a/pkg/contract_watcher/header/repository/header_repository.go +++ b/pkg/contract_watcher/header/repository/header_repository.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/hashicorp/golang-lru" - "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -268,12 +267,3 @@ func continuousHeaders(headers []core.Header) []core.Header { func (r *headerRepository) CheckCache(key string) (interface{}, bool) { return r.columns.Get(key) } - -// Used to mark a header checked as part of some external transaction so as to group into one commit -func (r *headerRepository) MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, eventID string) error { - _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET `+eventID+` = checked_headers.`+eventID+` + 1`, headerID, 1) - return err -} diff --git a/pkg/contract_watcher/header/transformer/transformer.go b/pkg/contract_watcher/header/transformer/transformer.go index 34b3ddc1..6c436412 100644 --- a/pkg/contract_watcher/header/transformer/transformer.go +++ b/pkg/contract_watcher/header/transformer/transformer.go @@ -249,6 +249,11 @@ func (tr *Transformer) Execute() error { } // Sort logs by the contract they belong to + // Start by adding every contract addr to the map + // So that if we don't have any logs for it, it is caught and the header is still marked checked for its events + for _, addr := range tr.contractAddresses { + sortedLogs[addr] = nil + } for _, log := range allLogs { addr := strings.ToLower(log.Address.Hex()) sortedLogs[addr] = append(sortedLogs[addr], log) @@ -256,12 +261,20 @@ func (tr *Transformer) Execute() error { // Process logs for each contract for conAddr, logs := range sortedLogs { - if logs == nil { + con := tr.Contracts[conAddr] + if len(logs) < 1 { + eventIds := make([]string, 0) + for eventName := range con.Events { + eventIds = append(eventIds, strings.ToLower(eventName+"_"+con.Address)) + } + markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds) + if markCheckedErr != nil { + return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) + } logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber) continue } // Configure converter with this contract - con := tr.Contracts[conAddr] tr.Converter.Update(con) // Convert logs into batches of log mappings (eventName => []types.Logs diff --git a/pkg/contract_watcher/shared/helpers/test_helpers/database.go b/pkg/contract_watcher/shared/helpers/test_helpers/database.go index 352ad79b..1988060e 100644 --- a/pkg/contract_watcher/shared/helpers/test_helpers/database.go +++ b/pkg/contract_watcher/shared/helpers/test_helpers/database.go @@ -294,8 +294,7 @@ func TearDown(db *postgres.DB) { _, err = tx.Exec(`CREATE TABLE checked_headers ( id SERIAL PRIMARY KEY, - header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE, - check_count INTEGER NOT NULL DEFAULT 1);`) + header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE);`) Expect(err).NotTo(HaveOccurred()) _, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`)