diff --git a/integration_test/omni_full_transformer_test.go b/integration_test/contract_watcher_full_transformer_test.go similarity index 99% rename from integration_test/omni_full_transformer_test.go rename to integration_test/contract_watcher_full_transformer_test.go index 1da30e73..cecf11eb 100644 --- a/integration_test/omni_full_transformer_test.go +++ b/integration_test/contract_watcher_full_transformer_test.go @@ -20,7 +20,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" ) -var _ = Describe("Omni full transformer", func() { +var _ = Describe("contractWatcher full transformer", func() { var db *postgres.DB var err error var blockChain core.BlockChain diff --git a/integration_test/omni_light_transformer_test.go b/integration_test/contract_watcher_light_transformer_test.go similarity index 88% rename from integration_test/omni_light_transformer_test.go rename to integration_test/contract_watcher_light_transformer_test.go index c0efd79d..3ca326c3 100644 --- a/integration_test/omni_light_transformer_test.go +++ b/integration_test/contract_watcher_light_transformer_test.go @@ -18,12 +18,12 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" ) -var _ = Describe("Omnit light transformer", func() { +var _ = Describe("contractWatcher light transformer", func() { var db *postgres.DB var err error var blockChain core.BlockChain var headerRepository repositories.HeaderRepository - var headerID, headerID2 int64 + var headerID int64 var ensAddr = strings.ToLower(constants.EnsContractAddress) var tusdAddr = strings.ToLower(constants.TusdContractAddress) @@ -292,26 +292,12 @@ var _ = Describe("Omnit light transformer", func() { Describe("Execute- against both ENS and TrueUSD", func() { BeforeEach(func() { - header1, err := blockChain.GetHeaderByNumber(6791668) - Expect(err).ToNot(HaveOccurred()) - header2, err := blockChain.GetHeaderByNumber(6791669) - Expect(err).ToNot(HaveOccurred()) - header3, err := blockChain.GetHeaderByNumber(6791670) - Expect(err).ToNot(HaveOccurred()) - header4, err := blockChain.GetHeaderByNumber(6885695) - Expect(err).ToNot(HaveOccurred()) - header5, err := blockChain.GetHeaderByNumber(6885696) - Expect(err).ToNot(HaveOccurred()) - header6, err := blockChain.GetHeaderByNumber(6885697) - Expect(err).ToNot(HaveOccurred()) - headerRepository.CreateOrUpdateHeader(header1) - headerID, err = headerRepository.CreateOrUpdateHeader(header2) - Expect(err).ToNot(HaveOccurred()) - headerRepository.CreateOrUpdateHeader(header3) - headerRepository.CreateOrUpdateHeader(header4) - headerID2, err = headerRepository.CreateOrUpdateHeader(header5) - Expect(err).ToNot(HaveOccurred()) - headerRepository.CreateOrUpdateHeader(header6) + for i := 6885692; i < 6885702; i++ { + header, err := blockChain.GetHeaderByNumber(int64(i)) + Expect(err).ToNot(HaveOccurred()) + _, err = headerRepository.CreateOrUpdateHeader(header) + Expect(err).ToNot(HaveOccurred()) + } }) It("Transforms watched contract data into custom repositories", func() { @@ -325,7 +311,6 @@ var _ = Describe("Omnit light transformer", func() { err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog) Expect(err).ToNot(HaveOccurred()) // We don't know vulcID, so compare individual fields instead of complete structures - Expect(newOwnerLog.HeaderID).To(Equal(headerID2)) Expect(newOwnerLog.Node).To(Equal("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")) Expect(newOwnerLog.Label).To(Equal("0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047")) Expect(newOwnerLog.Owner).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) @@ -334,10 +319,9 @@ var _ = Describe("Omnit light transformer", func() { err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", tusdAddr)).StructScan(&transferLog) Expect(err).ToNot(HaveOccurred()) // We don't know vulcID, so compare individual fields instead of complete structures - Expect(transferLog.HeaderID).To(Equal(headerID)) - Expect(transferLog.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")) - Expect(transferLog.To).To(Equal("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")) - Expect(transferLog.Value).To(Equal("9998940000000000000000")) + Expect(transferLog.From).To(Equal("0x8cA465764873E71CEa525F5EB6AE973d650c22C2")) + Expect(transferLog.To).To(Equal("0xc338482360651E5D30BEd77b7c85358cbBFB2E0e")) + Expect(transferLog.Value).To(Equal("2800000000000000000000")) }) It("Keeps track of contract-related hashes and addresses while transforming event data if they need to be used for later method polling", func() { @@ -358,7 +342,7 @@ var _ = Describe("Omnit light transformer", func() { Expect(err).ToNot(HaveOccurred()) Expect(len(ens.EmittedHashes)).To(Equal(2)) Expect(len(ens.EmittedAddrs)).To(Equal(0)) - Expect(len(tusd.EmittedAddrs)).To(Equal(4)) + Expect(len(tusd.EmittedAddrs)).To(Equal(2)) Expect(len(tusd.EmittedHashes)).To(Equal(0)) b, ok := ens.EmittedHashes[common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")] @@ -369,21 +353,16 @@ var _ = Describe("Omnit light transformer", func() { Expect(ok).To(Equal(true)) Expect(b).To(Equal(true)) - b, ok = tusd.EmittedAddrs[common.HexToAddress("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")] + b, ok = tusd.EmittedAddrs[common.HexToAddress("0x8cA465764873E71CEa525F5EB6AE973d650c22C2")] Expect(ok).To(Equal(true)) Expect(b).To(Equal(true)) - b, ok = tusd.EmittedAddrs[common.HexToAddress("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")] + b, ok = tusd.EmittedAddrs[common.HexToAddress("0xc338482360651E5D30BEd77b7c85358cbBFB2E0e")] Expect(ok).To(Equal(true)) Expect(b).To(Equal(true)) - b, ok = tusd.EmittedAddrs[common.HexToAddress("0x571A326f5B15E16917dC17761c340c1ec5d06f6d")] - Expect(ok).To(Equal(true)) - Expect(b).To(Equal(true)) - - b, ok = tusd.EmittedAddrs[common.HexToAddress("0xFBb1b73C4f0BDa4f67dcA266ce6Ef42f520fBB98")] - Expect(ok).To(Equal(true)) - Expect(b).To(Equal(true)) + _, ok = tusd.EmittedAddrs[common.HexToAddress("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")] + Expect(ok).To(Equal(false)) }) It("Polls given methods for each contract, using list of collected values", func() { @@ -414,12 +393,12 @@ var _ = Describe("Omnit light transformer", func() { Expect(err).To(HaveOccurred()) bal := test_helpers.BalanceOf{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", tusdAddr)).StructScan(&bal) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x8cA465764873E71CEa525F5EB6AE973d650c22C2' AND block = '6885701'", tusdAddr)).StructScan(&bal) Expect(err).ToNot(HaveOccurred()) - Expect(bal.Balance).To(Equal("55849938025000000000000")) + Expect(bal.Balance).To(Equal("1954436000000000000000")) Expect(bal.TokenName).To(Equal("TrueUSD")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", tusdAddr)).StructScan(&bal) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6885701'", tusdAddr)).StructScan(&bal) Expect(err).To(HaveOccurred()) }) }) diff --git a/pkg/contract_watcher/light/repository/header_repository.go b/pkg/contract_watcher/light/repository/header_repository.go index d2aeac3d..47437f13 100644 --- a/pkg/contract_watcher/light/repository/header_repository.go +++ b/pkg/contract_watcher/light/repository/header_repository.go @@ -53,6 +53,7 @@ func NewHeaderRepository(db *postgres.DB) *headerRepository { } } +// Adds a checked_header column for the provided column id func (r *headerRepository) AddCheckColumn(id string) error { // Check cache to see if column already exists before querying pg _, ok := r.columns.Get(id) @@ -73,6 +74,7 @@ func (r *headerRepository) AddCheckColumn(id string) error { return nil } +// Adds a checked_header column for all of the provided column ids func (r *headerRepository) AddCheckColumns(ids []string) error { var err error baseQuery := "ALTER TABLE public.checked_headers" @@ -96,6 +98,7 @@ func (r *headerRepository) AddCheckColumns(ids []string) error { return err } +// Marks the header checked for the provided column id func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error { _, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+id+`) VALUES ($1, $2) @@ -105,6 +108,7 @@ func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error { return err } +// Marks the header checked for all of the provided column ids func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) error { pgStr := "INSERT INTO public.checked_headers (header_id, " for _, id := range ids { @@ -124,6 +128,7 @@ func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) return err } +// Marks all of the provided headers checked for each of the provided column ids func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error { tx, err := r.db.Begin() if err != nil { @@ -154,6 +159,7 @@ func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids [ return tx.Commit() } +// Returns missing headers for the provided checked_headers column id func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64, id string) ([]core.Header, error) { var result []core.Header var query string @@ -165,7 +171,7 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber WHERE (header_id ISNULL OR checked_headers.` + id + `=0) AND headers.block_number >= $1 AND headers.eth_node_fingerprint = $2 - ORDER BY headers.block_number` + ORDER BY headers.block_number LIMIT 100` err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID) } else { query = `SELECT headers.id, headers.block_number, headers.hash FROM headers @@ -174,13 +180,14 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3 - ORDER BY headers.block_number` + ORDER BY headers.block_number LIMIT 100` err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID) } - return result, err + return contiguousHeaders(result, startingBlockNumber), err } +// Returns missing headers for all of the provided checked_headers column ids func (r *headerRepository) MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error) { var result []core.Header var query string @@ -196,21 +203,42 @@ func (r *headerRepository) MissingHeadersForAll(startingBlockNumber, endingBlock if endingBlockNumber == -1 { endStr := `) AND headers.block_number >= $1 AND headers.eth_node_fingerprint = $2 - ORDER BY headers.block_number` + ORDER BY headers.block_number LIMIT 100` query = baseQuery + endStr err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID) } else { endStr := `) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3 - ORDER BY headers.block_number` + ORDER BY headers.block_number LIMIT 100` query = baseQuery + endStr err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID) } - return result, err + return contiguousHeaders(result, startingBlockNumber), err } +// Takes in an ordered sequence of headers and returns only the first contiguous segment +// Enforce continuity with previous segment with the appropriate startingBlockNumber +func contiguousHeaders(headers []core.Header, startingBlockNumber int64) []core.Header { + if len(headers) < 1 { + return headers + } + previousHeader := headers[0].BlockNumber + if previousHeader != startingBlockNumber { + return []core.Header{} + } + for i := 1; i < len(headers); i++ { + previousHeader++ + if headers[i].BlockNumber != previousHeader { + return headers[:i] + } + } + + return headers +} + +// Returns headers that have been checked for all of the provided event ids but not for the provided method ids func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) { var result []core.Header var query string @@ -231,14 +259,14 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock if endingBlockNumber == -1 { endStr := `AND headers.block_number >= $1 AND headers.eth_node_fingerprint = $2 - ORDER BY headers.block_number` + ORDER BY headers.block_number LIMIT 100` query = baseQuery + endStr err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID) } else { endStr := `AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3 - ORDER BY headers.block_number` + ORDER BY headers.block_number LIMIT 100` query = baseQuery + endStr err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID) } @@ -246,10 +274,12 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock return result, err } +// Check the repositories column id cache for a value func (r *headerRepository) CheckCache(key string) (interface{}, bool) { return r.columns.Get(key) } +// Used to mark a header checked as part of some external transaction so as to group into one commit func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error { _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`) VALUES ($1, $2) diff --git a/pkg/contract_watcher/light/repository/header_repository_test.go b/pkg/contract_watcher/light/repository/header_repository_test.go index 60aabf39..1ca2e1ac 100644 --- a/pkg/contract_watcher/light/repository/header_repository_test.go +++ b/pkg/contract_watcher/light/repository/header_repository_test.go @@ -32,6 +32,7 @@ import ( var _ = Describe("Repository", func() { var db *postgres.DB + var bc core.BlockChain var omniHeaderRepo repository.HeaderRepository // omni/light header repository var coreHeaderRepo repositories.HeaderRepository // pkg/datastore header repository var eventIDs = []string{ @@ -46,7 +47,7 @@ var _ = Describe("Repository", func() { } BeforeEach(func() { - db, _ = test_helpers.SetupDBandBC() + db, bc = test_helpers.SetupDBandBC() omniHeaderRepo = repository.NewHeaderRepository(db) coreHeaderRepo = repositories.NewHeaderRepository(db) }) @@ -120,7 +121,7 @@ var _ = Describe("Repository", func() { err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) }) @@ -130,7 +131,7 @@ var _ = Describe("Repository", func() { err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) @@ -142,12 +143,24 @@ var _ = Describe("Repository", func() { Expect(h3.BlockNumber).To(Equal(int64(6194634))) }) + It("Returns only contiguous chunks of headers", func() { + addDiscontinuousHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumns(eventIDs) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0]) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(2)) + Expect(missingHeaders[0].BlockNumber).To(Equal(int64(6194632))) + Expect(missingHeaders[1].BlockNumber).To(Equal(int64(6194633))) + }) + It("Fails if eventID does not yet exist in check_headers table", func() { addHeaders(coreHeaderRepo) err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - _, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, "notEventId") + _, err = omniHeaderRepo.MissingHeaders(6194632, 6194635, "notEventId") Expect(err).To(HaveOccurred()) }) }) @@ -158,14 +171,14 @@ var _ = Describe("Repository", func() { err := omniHeaderRepo.AddCheckColumns(eventIDs) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs) + missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, eventIDs) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs) + missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, eventIDs) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) @@ -174,18 +187,42 @@ var _ = Describe("Repository", func() { err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[2]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs) + missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194633, 6194635, eventIDs) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(2)) }) + It("Returns only contiguous chunks of headers", func() { + addDiscontinuousHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumns(eventIDs) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, eventIDs) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(2)) + Expect(missingHeaders[0].BlockNumber).To(Equal(int64(6194632))) + Expect(missingHeaders[1].BlockNumber).To(Equal(int64(6194633))) + }) + + It("Returns at most 100 headers", func() { + add102Headers(coreHeaderRepo, bc) + err := omniHeaderRepo.AddCheckColumns(eventIDs) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194632, 6194733, eventIDs) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(100)) + Expect(missingHeaders[0].BlockNumber).To(Equal(int64(6194632))) + Expect(missingHeaders[1].BlockNumber).To(Equal(int64(6194633))) + }) + It("Fails if one of the eventIDs does not yet exist in check_headers table", func() { addHeaders(coreHeaderRepo) err := omniHeaderRepo.AddCheckColumns(eventIDs) Expect(err).ToNot(HaveOccurred()) badEventIDs := append(eventIDs, "notEventId") - _, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, badEventIDs) + _, err = omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, badEventIDs) Expect(err).To(HaveOccurred()) }) }) @@ -196,7 +233,7 @@ var _ = Describe("Repository", func() { err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) @@ -204,7 +241,7 @@ var _ = Describe("Repository", func() { err = omniHeaderRepo.MarkHeaderChecked(headerID, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194633, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(2)) }) @@ -214,7 +251,7 @@ var _ = Describe("Repository", func() { err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) @@ -222,7 +259,7 @@ var _ = Describe("Repository", func() { err = omniHeaderRepo.MarkHeaderChecked(headerID, "notEventId") Expect(err).To(HaveOccurred()) - missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) }) @@ -234,7 +271,7 @@ var _ = Describe("Repository", func() { err := omniHeaderRepo.AddCheckColumns(eventIDs) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs) + missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, eventIDs) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) @@ -242,7 +279,7 @@ var _ = Describe("Repository", func() { err = omniHeaderRepo.MarkHeaderCheckedForAll(headerID, eventIDs) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194633, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(2)) }) @@ -261,7 +298,7 @@ var _ = Describe("Repository", func() { for _, id := range methodIDs { err := omniHeaderRepo.AddCheckColumn(id) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194632, 6194635, id) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) } @@ -269,7 +306,7 @@ var _ = Describe("Repository", func() { err := omniHeaderRepo.MarkHeadersCheckedForAll(missingHeaders, methodIDs) Expect(err).ToNot(HaveOccurred()) for _, id := range methodIDs { - missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194632, 6194635, id) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(0)) } @@ -286,7 +323,7 @@ var _ = Describe("Repository", func() { Expect(err).ToNot(HaveOccurred()) } - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) @@ -301,7 +338,7 @@ var _ = Describe("Repository", func() { Expect(err).ToNot(HaveOccurred()) } - intersectionHeaders, err := omniHeaderRepo.MissingMethodsCheckedEventsIntersection(6194630, 6194635, methodIDs, eventIDs) + intersectionHeaders, err := omniHeaderRepo.MissingMethodsCheckedEventsIntersection(6194632, 6194635, methodIDs, eventIDs) Expect(err).ToNot(HaveOccurred()) Expect(len(intersectionHeaders)).To(Equal(1)) Expect(intersectionHeaders[0].Id).To(Equal(headerID2)) @@ -315,3 +352,18 @@ func addHeaders(coreHeaderRepo repositories.HeaderRepository) { coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader2) coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader3) } + +func addDiscontinuousHeaders(coreHeaderRepo repositories.HeaderRepository) { + coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader1) + coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader2) + coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader4) +} + +func add102Headers(coreHeaderRepo repositories.HeaderRepository, blockChain core.BlockChain) { + for i := 6194632; i < 6194733; i++ { + header, err := blockChain.GetHeaderByNumber(int64(i)) + Expect(err).ToNot(HaveOccurred()) + _, err = coreHeaderRepo.CreateOrUpdateHeader(header) + Expect(err).ToNot(HaveOccurred()) + } +} diff --git a/pkg/contract_watcher/light/transformer/transformer.go b/pkg/contract_watcher/light/transformer/transformer.go index d3daa48a..080ca040 100644 --- a/pkg/contract_watcher/light/transformer/transformer.go +++ b/pkg/contract_watcher/light/transformer/transformer.go @@ -285,6 +285,8 @@ func (tr *transformer) Execute() error { if err != nil { return err } + + tr.start = header.BlockNumber + 1 } return nil diff --git a/pkg/contract_watcher/light/transformer/transformer_test.go b/pkg/contract_watcher/light/transformer/transformer_test.go index 8e5902e4..9fe3d3a1 100644 --- a/pkg/contract_watcher/light/transformer/transformer_test.go +++ b/pkg/contract_watcher/light/transformer/transformer_test.go @@ -39,7 +39,7 @@ var _ = Describe("Transformer", func() { var err error var blockChain core.BlockChain var headerRepository repositories.HeaderRepository - var headerID, headerID2 int64 + var headerID int64 var ensAddr = strings.ToLower(constants.EnsContractAddress) var tusdAddr = strings.ToLower(constants.TusdContractAddress) @@ -328,26 +328,12 @@ var _ = Describe("Transformer", func() { Describe("Execute- against both ENS and TrueUSD", func() { BeforeEach(func() { - header1, err := blockChain.GetHeaderByNumber(6791668) - Expect(err).ToNot(HaveOccurred()) - header2, err := blockChain.GetHeaderByNumber(6791669) - Expect(err).ToNot(HaveOccurred()) - header3, err := blockChain.GetHeaderByNumber(6791670) - Expect(err).ToNot(HaveOccurred()) - header4, err := blockChain.GetHeaderByNumber(6885695) - Expect(err).ToNot(HaveOccurred()) - header5, err := blockChain.GetHeaderByNumber(6885696) - Expect(err).ToNot(HaveOccurred()) - header6, err := blockChain.GetHeaderByNumber(6885697) - Expect(err).ToNot(HaveOccurred()) - headerRepository.CreateOrUpdateHeader(header1) - headerID, err = headerRepository.CreateOrUpdateHeader(header2) - Expect(err).ToNot(HaveOccurred()) - headerRepository.CreateOrUpdateHeader(header3) - headerRepository.CreateOrUpdateHeader(header4) - headerID2, err = headerRepository.CreateOrUpdateHeader(header5) - Expect(err).ToNot(HaveOccurred()) - headerRepository.CreateOrUpdateHeader(header6) + for i := 6885692; i < 6885702; i++ { + header, err := blockChain.GetHeaderByNumber(int64(i)) + Expect(err).ToNot(HaveOccurred()) + _, err = headerRepository.CreateOrUpdateHeader(header) + Expect(err).ToNot(HaveOccurred()) + } }) It("Transforms watched contract data into custom repositories", func() { @@ -361,7 +347,6 @@ var _ = Describe("Transformer", func() { err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog) Expect(err).ToNot(HaveOccurred()) // We don't know vulcID, so compare individual fields instead of complete structures - Expect(newOwnerLog.HeaderID).To(Equal(headerID2)) Expect(newOwnerLog.Node).To(Equal("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")) Expect(newOwnerLog.Label).To(Equal("0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047")) Expect(newOwnerLog.Owner).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) @@ -370,10 +355,9 @@ var _ = Describe("Transformer", func() { err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", tusdAddr)).StructScan(&transferLog) Expect(err).ToNot(HaveOccurred()) // We don't know vulcID, so compare individual fields instead of complete structures - Expect(transferLog.HeaderID).To(Equal(headerID)) - Expect(transferLog.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")) - Expect(transferLog.To).To(Equal("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")) - Expect(transferLog.Value).To(Equal("9998940000000000000000")) + Expect(transferLog.From).To(Equal("0x8cA465764873E71CEa525F5EB6AE973d650c22C2")) + Expect(transferLog.To).To(Equal("0xc338482360651E5D30BEd77b7c85358cbBFB2E0e")) + Expect(transferLog.Value).To(Equal("2800000000000000000000")) }) It("Keeps track of contract-related hashes and addresses while transforming event data if they need to be used for later method polling", func() { @@ -394,7 +378,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) Expect(len(ens.EmittedHashes)).To(Equal(2)) Expect(len(ens.EmittedAddrs)).To(Equal(0)) - Expect(len(tusd.EmittedAddrs)).To(Equal(4)) + Expect(len(tusd.EmittedAddrs)).To(Equal(2)) Expect(len(tusd.EmittedHashes)).To(Equal(0)) b, ok := ens.EmittedHashes[common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")] @@ -405,21 +389,16 @@ var _ = Describe("Transformer", func() { Expect(ok).To(Equal(true)) Expect(b).To(Equal(true)) - b, ok = tusd.EmittedAddrs[common.HexToAddress("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")] + b, ok = tusd.EmittedAddrs[common.HexToAddress("0x8cA465764873E71CEa525F5EB6AE973d650c22C2")] Expect(ok).To(Equal(true)) Expect(b).To(Equal(true)) - b, ok = tusd.EmittedAddrs[common.HexToAddress("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")] + b, ok = tusd.EmittedAddrs[common.HexToAddress("0xc338482360651E5D30BEd77b7c85358cbBFB2E0e")] Expect(ok).To(Equal(true)) Expect(b).To(Equal(true)) - b, ok = tusd.EmittedAddrs[common.HexToAddress("0x571A326f5B15E16917dC17761c340c1ec5d06f6d")] - Expect(ok).To(Equal(true)) - Expect(b).To(Equal(true)) - - b, ok = tusd.EmittedAddrs[common.HexToAddress("0xFBb1b73C4f0BDa4f67dcA266ce6Ef42f520fBB98")] - Expect(ok).To(Equal(true)) - Expect(b).To(Equal(true)) + _, ok = tusd.EmittedAddrs[common.HexToAddress("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")] + Expect(ok).To(Equal(false)) }) It("Polls given methods for each contract, using list of collected values", func() { @@ -450,12 +429,12 @@ var _ = Describe("Transformer", func() { Expect(err).To(HaveOccurred()) bal := test_helpers.BalanceOf{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", tusdAddr)).StructScan(&bal) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x8cA465764873E71CEa525F5EB6AE973d650c22C2' AND block = '6885701'", tusdAddr)).StructScan(&bal) Expect(err).ToNot(HaveOccurred()) - Expect(bal.Balance).To(Equal("55849938025000000000000")) + Expect(bal.Balance).To(Equal("1954436000000000000000")) Expect(bal.TokenName).To(Equal("TrueUSD")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", tusdAddr)).StructScan(&bal) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6885701'", tusdAddr)).StructScan(&bal) Expect(err).To(HaveOccurred()) }) }) diff --git a/pkg/contract_watcher/shared/helpers/test_helpers/mocks/entities.go b/pkg/contract_watcher/shared/helpers/test_helpers/mocks/entities.go index debcc8dc..e65db708 100644 --- a/pkg/contract_watcher/shared/helpers/test_helpers/mocks/entities.go +++ b/pkg/contract_watcher/shared/helpers/test_helpers/mocks/entities.go @@ -183,6 +183,13 @@ var MockHeader3 = core.Header{ Timestamp: "50000030", } +var MockHeader4 = core.Header{ + Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad234hfs", + BlockNumber: 6194635, + Raw: rawFakeHeader, + Timestamp: "50000030", +} + var MockTransferLog1 = types.Log{ Index: 1, Address: common.HexToAddress(constants.TusdContractAddress), diff --git a/pkg/contract_watcher/shared/poller/poller.go b/pkg/contract_watcher/shared/poller/poller.go index 0a38316c..ae4f1974 100644 --- a/pkg/contract_watcher/shared/poller/poller.go +++ b/pkg/contract_watcher/shared/poller/poller.go @@ -54,7 +54,9 @@ func NewPoller(blockChain core.BlockChain, db *postgres.DB, mode types.Mode) *po func (p *poller) PollContract(con contract.Contract) error { for i := con.StartingBlock; i <= con.LastBlock; i++ { - p.PollContractAt(con, i) + if err := p.PollContractAt(con, i); err != nil { + return err + } } return nil @@ -98,7 +100,6 @@ func (p *poller) pollNoArgAt(m types.Method, bn int64) error { if err != nil { return errors.New(fmt.Sprintf("poller error calling 0 argument method\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", bn, m.Name, p.contract.Address, err)) } - strOut, err := stringify(out) if err != nil { return err @@ -138,8 +139,8 @@ func (p *poller) pollSingleArgAt(m types.Method, bn int64) error { if len(args) == 0 { // If we haven't collected any args by now we can't call the method return nil } - results := make([]types.Result, 0, len(args)) + results := make([]types.Result, 0, len(args)) for arg := range args { in := []interface{}{arg} strIn := []interface{}{contract.StringifyArg(arg)} @@ -160,7 +161,6 @@ func (p *poller) pollSingleArgAt(m types.Method, bn int64) error { result.Output = strOut results = append(results, result) } - // Persist result set as batch err := p.PersistResults(results, m, p.contract.Address, p.contract.Name) if err != nil { @@ -204,7 +204,6 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error { } results := make([]types.Result, 0, len(firstArgs)*len(secondArgs)) - for arg1 := range firstArgs { for arg2 := range secondArgs { in := []interface{}{arg1, arg2} @@ -215,18 +214,15 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error { if err != nil { return errors.New(fmt.Sprintf("poller error calling 2 argument method\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", bn, m.Name, p.contract.Address, err)) } - strOut, err := stringify(out) if err != nil { return err } - p.cache(out) result.Output = strOut result.Inputs = strIn results = append(results, result) - } } @@ -243,9 +239,8 @@ func (p *poller) FetchContractData(contractAbi, contractAddress, method string, return p.bc.FetchContractData(contractAbi, contractAddress, method, methodArgs, result, blockNumber) } -// This is used to cache an method return value if method piping is turned on +// This is used to cache a method return value if method piping is turned on func (p *poller) cache(out interface{}) { - // Cache returned value if piping is turned on if p.contract.Piping { switch out.(type) { case common.Hash: diff --git a/pkg/geth/contract.go b/pkg/geth/contract.go index f59cab5f..b5f968ec 100644 --- a/pkg/geth/contract.go +++ b/pkg/geth/contract.go @@ -43,7 +43,11 @@ func (blockChain *BlockChain) FetchContractData(abiJSON string, address string, if err != nil { return err } - output, err := blockChain.callContract(address, input, big.NewInt(blockNumber)) + var bn *big.Int + if blockNumber > 0 { + bn = big.NewInt(blockNumber) + } + output, err := blockChain.callContract(address, input, bn) if err != nil { return err }