From d1883296611f4ad69f23f03e25917b41906de837 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 18 Dec 2018 11:31:21 -0600 Subject: [PATCH] fetch event logs in batches- all events for a contract address at a given block height are fetched together, different contract events still fetched separately --- pkg/omni/full/transformer/transformer.go | 8 +- pkg/omni/light/converter/converter.go | 91 +++++++++++ .../light/repository/header_repository.go | 82 +++++++++- .../repository/header_repository_test.go | 153 ++++++++++++++---- pkg/omni/light/transformer/transformer.go | 69 ++++---- .../light/transformer/transformer_test.go | 2 +- pkg/omni/shared/contract/contract.go | 7 +- .../shared/helpers/test_helpers/database.go | 9 ++ pkg/omni/shared/parser/parser.go | 3 +- pkg/omni/shared/types/event.go | 6 +- pkg/omni/shared/types/method.go | 6 +- 11 files changed, 354 insertions(+), 82 deletions(-) diff --git a/pkg/omni/full/transformer/transformer.go b/pkg/omni/full/transformer/transformer.go index b9dbcaa7..b072652a 100644 --- a/pkg/omni/full/transformer/transformer.go +++ b/pkg/omni/full/transformer/transformer.go @@ -195,8 +195,8 @@ func (tr transformer) Execute() error { tr.Update(con) // Iterate through contract filters and get watched event logs - for eventName := range con.Filters { - watchedEvents, err := tr.GetWatchedEvents(eventName) + for eventSig, filter := range con.Filters { + watchedEvents, err := tr.GetWatchedEvents(filter.Name) if err != nil { return err } @@ -204,7 +204,7 @@ func (tr transformer) Execute() error { // Iterate over watched event logs for _, we := range watchedEvents { // Convert them to our custom log type - cstm, err := tr.Converter.Convert(*we, con.Events[eventName]) + cstm, err := tr.Converter.Convert(*we, con.Events[eventSig]) if err != nil { return err } @@ -214,7 +214,7 @@ func (tr transformer) Execute() error { // If log is not empty, immediately persist in repo // Run this in seperate goroutine? - err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventName], con.Address, con.Name) + err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventSig], con.Address, con.Name) if err != nil { return err } diff --git a/pkg/omni/light/converter/converter.go b/pkg/omni/light/converter/converter.go index bf4495c7..8efafbb6 100644 --- a/pkg/omni/light/converter/converter.go +++ b/pkg/omni/light/converter/converter.go @@ -34,6 +34,7 @@ import ( type Converter interface { Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error) + ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) Update(info *contract.Contract) } @@ -129,3 +130,93 @@ func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID in return returnLogs, nil } + +// Convert the given watched event logs into types.Logs +func (c *converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) { + contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) + eventsToLogs := make(map[string][]types.Log) + for _, event := range events { + eventsToLogs[event.Name] = make([]types.Log, 0, len(logs)) + // Iterate through all event logs + for _, log := range logs { + // If the log is of this event type, process it as such + if event.Sig() == log.Topics[0] { + values := make(map[string]interface{}) + for _, field := range event.Fields { + var i interface{} + values[field.Name] = i + } + + err := contract.UnpackLogIntoMap(values, event.Name, log) + if err != nil { + return nil, err + } + + strValues := make(map[string]string, len(values)) + seenAddrs := make([]interface{}, 0, len(values)) + seenHashes := make([]interface{}, 0, len(values)) + for fieldName, input := range values { + // Postgres cannot handle custom types, resolve everything to strings + switch input.(type) { + case *big.Int: + b := input.(*big.Int) + strValues[fieldName] = b.String() + case common.Address: + a := input.(common.Address) + strValues[fieldName] = a.String() + seenAddrs = append(seenAddrs, a) + case common.Hash: + h := input.(common.Hash) + strValues[fieldName] = h.String() + seenHashes = append(seenHashes, h) + case string: + strValues[fieldName] = input.(string) + case bool: + strValues[fieldName] = strconv.FormatBool(input.(bool)) + case []byte: + b := input.([]byte) + strValues[fieldName] = hexutil.Encode(b) + if len(b) == 32 { + seenHashes = append(seenHashes, common.HexToHash(strValues[fieldName])) + } + case byte: + b := input.(byte) + strValues[fieldName] = string(b) + default: + return nil, errors.New(fmt.Sprintf("error: unhandled abi type %T", input)) + } + } + + // Only hold onto logs that pass our address filter, if any + if c.ContractInfo.PassesEventFilter(strValues) { + raw, err := json.Marshal(log) + if err != nil { + return nil, err + } + + eventsToLogs[event.Name] = append(eventsToLogs[event.Name], types.Log{ + LogIndex: log.Index, + Values: strValues, + Raw: raw, + TransactionIndex: log.TxIndex, + Id: headerID, + }) + + // Cache emitted values if their caching is turned on + if c.ContractInfo.EmittedAddrs != nil { + c.ContractInfo.AddEmittedAddr(seenAddrs...) + } + if c.ContractInfo.EmittedHashes != nil { + c.ContractInfo.AddEmittedHash(seenHashes...) + } + } + } + } + } + + return eventsToLogs, nil +} + +func (c *converter) handleDSNote() { + +} diff --git a/pkg/omni/light/repository/header_repository.go b/pkg/omni/light/repository/header_repository.go index c8f23493..c26b3e60 100644 --- a/pkg/omni/light/repository/header_repository.go +++ b/pkg/omni/light/repository/header_repository.go @@ -29,11 +29,14 @@ import ( const columnCacheSize = 1000 type HeaderRepository interface { - AddCheckColumn(eventID string) error + AddCheckColumn(id string) error + AddCheckColumns(ids []string) error MarkHeaderChecked(headerID int64, eventID string) error - MarkHeadersChecked(headers []core.Header, ids []string) error + MarkHeaderCheckedForAll(headerID int64, ids []string) error + MarkHeadersCheckedForAll(headers []core.Header, ids []string) error MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) + MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error) CheckCache(key string) (interface{}, bool) } @@ -70,6 +73,29 @@ func (r *headerRepository) AddCheckColumn(id string) error { return nil } +func (r *headerRepository) AddCheckColumns(ids []string) error { + var err error + baseQuery := "ALTER TABLE public.checked_headers" + input := make([]string, 0, len(ids)) + for _, id := range ids { + _, ok := r.columns.Get(id) + if !ok { + baseQuery += " ADD COLUMN IF NOT EXISTS " + id + " BOOLEAN NOT NULL DEFAULT FALSE," + input = append(input, id) + } + } + if len(input) > 0 { + _, err = r.db.Exec(baseQuery[:len(baseQuery)-1]) + if err == nil { + for _, id := range input { + r.columns.Add(id, true) + } + } + } + + return err +} + func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error { _, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+id+`) VALUES ($1, $2) @@ -79,7 +105,26 @@ func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error { return err } -func (r *headerRepository) MarkHeadersChecked(headers []core.Header, ids []string) error { +func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) error { + pgStr := "INSERT INTO public.checked_headers (header_id, " + for _, id := range ids { + pgStr += id + ", " + } + pgStr = pgStr[:len(pgStr)-2] + ") VALUES ($1, " + for i := 0; i < len(ids); i++ { + pgStr += "true, " + } + pgStr = pgStr[:len(pgStr)-2] + ") ON CONFLICT (header_id) DO UPDATE SET " + for _, id := range ids { + pgStr += fmt.Sprintf("%s = true, ", id) + } + pgStr = pgStr[:len(pgStr)-2] + _, err := r.db.Exec(pgStr, headerID) + + return err +} + +func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error { tx, err := r.db.Begin() if err != nil { return err @@ -136,10 +181,41 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber return result, err } +func (r *headerRepository) MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error) { + var result []core.Header + var query string + var err error + + baseQuery := `SELECT headers.id, headers.block_number, headers.hash FROM headers + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL` + for _, id := range ids { + baseQuery += ` OR ` + id + ` IS FALSE` + } + + if endingBlockNumber == -1 { + endStr := `) AND headers.block_number >= $1 + AND headers.eth_node_fingerprint = $2 + ORDER BY headers.block_number` + query = baseQuery + endStr + err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID) + } else { + endStr := `) AND headers.block_number >= $1 + AND headers.block_number <= $2 + AND headers.eth_node_fingerprint = $3 + ORDER BY headers.block_number` + query = baseQuery + endStr + err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID) + } + + return result, err +} + func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) { var result []core.Header var query string var err error + baseQuery := `SELECT headers.id, headers.block_number, headers.hash FROM headers LEFT JOIN checked_headers on headers.id = header_id WHERE (header_id IS NOT NULL` diff --git a/pkg/omni/light/repository/header_repository_test.go b/pkg/omni/light/repository/header_repository_test.go index 4c211d6f..4f25cabc 100644 --- a/pkg/omni/light/repository/header_repository_test.go +++ b/pkg/omni/light/repository/header_repository_test.go @@ -34,13 +34,21 @@ var _ = Describe("Repository", func() { var db *postgres.DB var omniHeaderRepo repository.HeaderRepository // omni/light header repository var coreHeaderRepo repositories.HeaderRepository // pkg/datastore header repository - var eventID, query string + var eventIDs = []string{ + "eventName_contractAddr", + "eventName_contractAddr2", + "eventName_contractAddr3", + } + var methodIDs = []string{ + "methodName_contractAddr", + "methodName_contractAddr2", + "methodName_contractAddr3", + } BeforeEach(func() { db, _ = test_helpers.SetupDBandBC() omniHeaderRepo = repository.NewHeaderRepository(db) coreHeaderRepo = repositories.NewHeaderRepository(db) - eventID = "eventName_contractAddr" }) AfterEach(func() { @@ -49,11 +57,11 @@ var _ = Describe("Repository", func() { Describe("AddCheckColumn", func() { It("Creates a column for the given eventID to mark if the header has been checked for that event", func() { - query = fmt.Sprintf("SELECT %s FROM checked_headers", eventID) + query := fmt.Sprintf("SELECT %s FROM checked_headers", eventIDs[0]) _, err := db.Exec(query) Expect(err).To(HaveOccurred()) - err = omniHeaderRepo.AddCheckColumn(eventID) + err = omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) _, err = db.Exec(query) @@ -61,35 +69,68 @@ var _ = Describe("Repository", func() { }) It("Caches column it creates so that it does not need to repeatedly query the database to check for it's existence", func() { - _, ok := omniHeaderRepo.CheckCache(eventID) + _, ok := omniHeaderRepo.CheckCache(eventIDs[0]) Expect(ok).To(Equal(false)) - err := omniHeaderRepo.AddCheckColumn(eventID) + err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - v, ok := omniHeaderRepo.CheckCache(eventID) + v, ok := omniHeaderRepo.CheckCache(eventIDs[0]) Expect(ok).To(Equal(true)) Expect(v).To(Equal(true)) }) }) + Describe("AddCheckColumns", func() { + It("Creates a column for the given eventIDs to mark if the header has been checked for those events", func() { + for _, id := range eventIDs { + _, err := db.Exec(fmt.Sprintf("SELECT %s FROM checked_headers", id)) + Expect(err).To(HaveOccurred()) + } + + err := omniHeaderRepo.AddCheckColumns(eventIDs) + Expect(err).ToNot(HaveOccurred()) + + for _, id := range eventIDs { + _, err := db.Exec(fmt.Sprintf("SELECT %s FROM checked_headers", id)) + Expect(err).ToNot(HaveOccurred()) + } + }) + + It("Caches columns it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + for _, id := range eventIDs { + _, ok := omniHeaderRepo.CheckCache(id) + Expect(ok).To(Equal(false)) + } + + err := omniHeaderRepo.AddCheckColumns(eventIDs) + Expect(err).ToNot(HaveOccurred()) + + for _, id := range eventIDs { + v, ok := omniHeaderRepo.CheckCache(id) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + } + }) + }) + Describe("MissingHeaders", func() { It("Returns all unchecked headers for the given eventID", func() { addHeaders(coreHeaderRepo) - err := omniHeaderRepo.AddCheckColumn(eventID) + err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) }) It("Returns unchecked headers in ascending order", func() { addHeaders(coreHeaderRepo) - err := omniHeaderRepo.AddCheckColumn(eventID) + err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) @@ -103,7 +144,7 @@ var _ = Describe("Repository", func() { It("Fails if eventID does not yet exist in check_headers table", func() { addHeaders(coreHeaderRepo) - err := omniHeaderRepo.AddCheckColumn(eventID) + err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) _, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, "notEventId") @@ -111,31 +152,69 @@ var _ = Describe("Repository", func() { }) }) + Describe("MissingHeadersForAll", func() { // HERE + It("Returns all headers that have not been checked for all of the ids provided", func() { + addHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumns(eventIDs) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + + err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[0]) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + + err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[1]) + Expect(err).ToNot(HaveOccurred()) + err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[2]) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(2)) + }) + + It("Fails if one of the eventIDs does not yet exist in check_headers table", func() { + addHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumns(eventIDs) + Expect(err).ToNot(HaveOccurred()) + badEventIDs := append(eventIDs, "notEventId") + + _, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, badEventIDs) + Expect(err).To(HaveOccurred()) + }) + }) + Describe("MarkHeaderChecked", func() { It("Marks the header checked for the given eventID", func() { addHeaders(coreHeaderRepo) - err := omniHeaderRepo.AddCheckColumn(eventID) + err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) headerID := missingHeaders[0].Id - err = omniHeaderRepo.MarkHeaderChecked(headerID, eventID) + err = omniHeaderRepo.MarkHeaderChecked(headerID, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(2)) }) It("Fails if eventID does not yet exist in check_headers table", func() { addHeaders(coreHeaderRepo) - err := omniHeaderRepo.AddCheckColumn(eventID) + err := omniHeaderRepo.AddCheckColumn(eventIDs[0]) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) @@ -143,13 +222,33 @@ var _ = Describe("Repository", func() { err = omniHeaderRepo.MarkHeaderChecked(headerID, "notEventId") Expect(err).To(HaveOccurred()) - missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) }) }) - Describe("MarkHeadersChecked", func() { + Describe("MarkHeaderCheckedForAll", func() { + It("Marks the header checked for all provided column ids", func() { + addHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumns(eventIDs) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + + headerID := missingHeaders[0].Id + err = omniHeaderRepo.MarkHeaderCheckedForAll(headerID, eventIDs) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(2)) + }) + }) + + Describe("MarkHeadersCheckedForAll", func() { It("Marks the headers checked for all provided column ids", func() { addHeaders(coreHeaderRepo) methodIDs := []string{ @@ -167,7 +266,7 @@ var _ = Describe("Repository", func() { Expect(len(missingHeaders)).To(Equal(3)) } - err := omniHeaderRepo.MarkHeadersChecked(missingHeaders, methodIDs) + err := omniHeaderRepo.MarkHeadersCheckedForAll(missingHeaders, methodIDs) Expect(err).ToNot(HaveOccurred()) for _, id := range methodIDs { missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id) @@ -180,16 +279,6 @@ var _ = Describe("Repository", func() { Describe("MissingMethodsCheckedEventsIntersection", func() { It("Returns headers that have been checked for all the provided events but have not been checked for all the provided methods", func() { addHeaders(coreHeaderRepo) - eventIDs := []string{ - eventID, - "eventName_contractAddr2", - "eventName_contractAddr3", - } - methodIDs := []string{ - "methodName_contractAddr", - "methodName_contractAddr2", - "methodName_contractAddr3", - } for i, id := range eventIDs { err := omniHeaderRepo.AddCheckColumn(id) Expect(err).ToNot(HaveOccurred()) @@ -197,7 +286,7 @@ var _ = Describe("Repository", func() { Expect(err).ToNot(HaveOccurred()) } - missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0]) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) diff --git a/pkg/omni/light/transformer/transformer.go b/pkg/omni/light/transformer/transformer.go index 9ad174a7..a742030a 100644 --- a/pkg/omni/light/transformer/transformer.go +++ b/pkg/omni/light/transformer/transformer.go @@ -29,7 +29,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository" "github.com/vulcanize/vulcanizedb/pkg/omni/light/retriever" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller" srep "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository" @@ -182,59 +181,63 @@ func (tr *transformer) Execute() error { for _, con := range tr.Contracts { // Update converter with current contract tr.Converter.Update(con) - // This is so that same header slice is retrieved for each event iteration - last, err := tr.BlockRetriever.RetrieveMostRecentBlock() - if err != nil { - return err - } - // Iterate through events - eventIds := make([]string, 0, len(con.Events)) - for _, event := range con.Events { - // Filter using the event signature - topics := [][]common.Hash{{common.HexToHash(helpers.GenerateSignature(event.Sig()))}} + // Iterate through events + eLen := len(con.Events) + eventIds := make([]string, 0, eLen) + eventTopics := make([][]common.Hash, 0, eLen) + for _, event := range con.Events { + // Append this event sig to the filters + eventTopics = append(eventTopics, []common.Hash{event.Sig()}) // Generate eventID and use it to create a checked_header column if one does not already exist eventId := strings.ToLower(event.Name + "_" + con.Address) - eventIds = append(eventIds, eventId) err := tr.HeaderRepository.AddCheckColumn(eventId) if err != nil { return err } + // Keep track of this event id + eventIds = append(eventIds, eventId) + } - // Find unchecked headers for this event - missingHeaders, err := tr.HeaderRepository.MissingHeaders(con.StartingBlock, last, eventId) + // Find unchecked headers for all events + missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(con.StartingBlock, con.LastBlock, eventIds) + if err != nil { + return err + } + // Iterate over headers + for _, header := range missingHeaders { + // And fetch all event logs for this contract using this header + logs, err := tr.Fetcher.FetchLogs([]string{con.Address}, eventTopics, header) if err != nil { return err } - // Iterate over headers - for _, header := range missingHeaders { - // And fetch event logs using the header, contract address, and topics filter - logs, err := tr.Fetcher.FetchLogs([]string{con.Address}, topics, header) + // Mark the header checked for all of these eventIDs and continue to next iteration if no logs are found + if len(logs) < 1 { + err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds) if err != nil { return err } + continue + } - // Mark the header checked for this eventID and continue to next iteration if no logs are found + // Convert logs into batches of log mappings (event => []types.Log) + convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id) + if err != nil { + return err + } + for name, logs := range convertedLogs { if len(logs) < 1 { + eventId := strings.ToLower(name + "_" + con.Address) err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId) if err != nil { return err } continue } - - // Convert logs into custom type - convertedLogs, err := tr.Converter.Convert(logs, event, header.Id) - if err != nil { - return err - } - if len(convertedLogs) < 1 { - continue - } - // If logs aren't empty, persist them - err = tr.EventRepository.PersistLogs(convertedLogs, event, con.Address, con.Name) + // Headers are marked checked in the persistlogs transactions + err = tr.EventRepository.PersistLogs(logs, con.Events[name], con.Address, con.Name) if err != nil { return err } @@ -257,10 +260,11 @@ func (tr *transformer) Execute() error { } // Retrieve headers that have been checked for all events but haven not been checked for the methods - missingHeaders, err := tr.HeaderRepository.MissingMethodsCheckedEventsIntersection(con.StartingBlock, last, methodIds, eventIds) + missingHeaders, err = tr.HeaderRepository.MissingMethodsCheckedEventsIntersection(con.StartingBlock, con.LastBlock, methodIds, eventIds) if err != nil { return err } + // Poll over the missing headers for _, header := range missingHeaders { err = tr.Poller.PollContractAt(*con, header.BlockNumber) @@ -268,8 +272,9 @@ func (tr *transformer) Execute() error { return err } } + // Mark those headers checked for the methods - err = tr.HeaderRepository.MarkHeadersChecked(missingHeaders, methodIds) + err = tr.HeaderRepository.MarkHeadersCheckedForAll(missingHeaders, methodIds) if err != nil { return err } diff --git a/pkg/omni/light/transformer/transformer_test.go b/pkg/omni/light/transformer/transformer_test.go index 6372e6e8..9c027576 100644 --- a/pkg/omni/light/transformer/transformer_test.go +++ b/pkg/omni/light/transformer/transformer_test.go @@ -333,7 +333,7 @@ var _ = Describe("Transformer", func() { Expect(err).To(HaveOccurred()) }) - It("It does not perist events if they do not pass the emitted arg filter", func() { + It("It does not persist events if they do not pass the emitted arg filter", func() { t := transformer.NewTransformer("", blockChain, db) t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) t.SetMethods(constants.EnsContractAddress, nil) diff --git a/pkg/omni/shared/contract/contract.go b/pkg/omni/shared/contract/contract.go index c13955e2..d73997d5 100644 --- a/pkg/omni/shared/contract/contract.go +++ b/pkg/omni/shared/contract/contract.go @@ -24,7 +24,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/filters" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" ) @@ -39,7 +38,7 @@ type Contract struct { ParsedAbi abi.ABI // Parsed abi Events map[string]types.Event // Map of events to their names Methods map[string]types.Method // Map of methods to their names - Filters map[string]filters.LogFilter // Map of event filters to their names; used only for full sync watcher + Filters map[string]filters.LogFilter // Map of event filters to their event names; used only for full sync watcher FilterArgs map[string]bool // User-input list of values to filter event logs for MethodArgs map[string]bool // User-input list of values to limit method polling to EmittedAddrs map[interface{}]bool // List of all unique addresses collected from converted event logs @@ -79,11 +78,11 @@ func (c *Contract) GenerateFilters() error { for name, event := range c.Events { c.Filters[name] = filters.LogFilter{ - Name: name, + Name: event.Name, FromBlock: c.StartingBlock, ToBlock: -1, Address: c.Address, - Topics: core.Topics{helpers.GenerateSignature(event.Sig())}, // move generate signatrue to pkg + Topics: core.Topics{event.Sig().Hex()}, } } // If no filters were generated, throw an error (no point in continuing with this contract) diff --git a/pkg/omni/shared/helpers/test_helpers/database.go b/pkg/omni/shared/helpers/test_helpers/database.go index 48aef62b..c63f3a5d 100644 --- a/pkg/omni/shared/helpers/test_helpers/database.go +++ b/pkg/omni/shared/helpers/test_helpers/database.go @@ -255,9 +255,18 @@ func TearDown(db *postgres.DB) { _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr`) Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr2`) + Expect(err).NotTo(HaveOccurred()) + + _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr3`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`) Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS balanceof_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/omni/shared/parser/parser.go b/pkg/omni/shared/parser/parser.go index 0ca4e7e0..a637c388 100644 --- a/pkg/omni/shared/parser/parser.go +++ b/pkg/omni/shared/parser/parser.go @@ -102,8 +102,7 @@ func (p *parser) GetSelectMethods(wanted []string) map[string]types.Method { for _, m := range p.parsedAbi.Methods { if okInputTypes(m, wanted) { - wantedMethod := types.NewMethod(m) - addrMethods[wantedMethod.Name] = wantedMethod + addrMethods[m.Name] = types.NewMethod(m) } } diff --git a/pkg/omni/shared/types/event.go b/pkg/omni/shared/types/event.go index f5b5f3d4..8d801d15 100644 --- a/pkg/omni/shared/types/event.go +++ b/pkg/omni/shared/types/event.go @@ -21,6 +21,8 @@ import ( "strings" "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" ) type Event struct { @@ -83,12 +85,12 @@ func NewEvent(e abi.Event) Event { } } -func (e Event) Sig() string { +func (e Event) Sig() common.Hash { types := make([]string, len(e.Fields)) for i, input := range e.Fields { types[i] = input.Type.String() } - return fmt.Sprintf("%v(%v)", e.Name, strings.Join(types, ",")) + return common.BytesToHash(crypto.Keccak256([]byte(fmt.Sprintf("%v(%v)", e.Name, strings.Join(types, ","))))) } diff --git a/pkg/omni/shared/types/method.go b/pkg/omni/shared/types/method.go index bced188a..fbd3e25a 100644 --- a/pkg/omni/shared/types/method.go +++ b/pkg/omni/shared/types/method.go @@ -21,6 +21,8 @@ import ( "strings" "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" ) type Method struct { @@ -97,7 +99,7 @@ func NewMethod(m abi.Method) Method { } } -func (m Method) Sig() string { +func (m Method) Sig() common.Hash { types := make([]string, len(m.Args)) i := 0 for _, arg := range m.Args { @@ -105,5 +107,5 @@ func (m Method) Sig() string { i++ } - return fmt.Sprintf("%v(%v)", m.Name, strings.Join(types, ",")) + return common.BytesToHash(crypto.Keccak256([]byte(fmt.Sprintf("%v(%v)", m.Name, strings.Join(types, ","))))) }