fetch event logs in batches- all events for a contract address at a given block height are fetched together, different contract events still fetched separately

This commit is contained in:
Ian Norden 2018-12-18 11:31:21 -06:00
parent e390a97502
commit d188329661
11 changed files with 354 additions and 82 deletions

View File

@ -195,8 +195,8 @@ func (tr transformer) Execute() error {
tr.Update(con) tr.Update(con)
// Iterate through contract filters and get watched event logs // Iterate through contract filters and get watched event logs
for eventName := range con.Filters { for eventSig, filter := range con.Filters {
watchedEvents, err := tr.GetWatchedEvents(eventName) watchedEvents, err := tr.GetWatchedEvents(filter.Name)
if err != nil { if err != nil {
return err return err
} }
@ -204,7 +204,7 @@ func (tr transformer) Execute() error {
// Iterate over watched event logs // Iterate over watched event logs
for _, we := range watchedEvents { for _, we := range watchedEvents {
// Convert them to our custom log type // Convert them to our custom log type
cstm, err := tr.Converter.Convert(*we, con.Events[eventName]) cstm, err := tr.Converter.Convert(*we, con.Events[eventSig])
if err != nil { if err != nil {
return err return err
} }
@ -214,7 +214,7 @@ func (tr transformer) Execute() error {
// If log is not empty, immediately persist in repo // If log is not empty, immediately persist in repo
// Run this in seperate goroutine? // Run this in seperate goroutine?
err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventName], con.Address, con.Name) err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventSig], con.Address, con.Name)
if err != nil { if err != nil {
return err return err
} }

View File

@ -34,6 +34,7 @@ import (
type Converter interface { type Converter interface {
Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error) Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error)
ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error)
Update(info *contract.Contract) Update(info *contract.Contract)
} }
@ -129,3 +130,93 @@ func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID in
return returnLogs, nil return returnLogs, nil
} }
// Convert the given watched event logs into types.Logs
func (c *converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
eventsToLogs := make(map[string][]types.Log)
for _, event := range events {
eventsToLogs[event.Name] = make([]types.Log, 0, len(logs))
// Iterate through all event logs
for _, log := range logs {
// If the log is of this event type, process it as such
if event.Sig() == log.Topics[0] {
values := make(map[string]interface{})
for _, field := range event.Fields {
var i interface{}
values[field.Name] = i
}
err := contract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
return nil, err
}
strValues := make(map[string]string, len(values))
seenAddrs := make([]interface{}, 0, len(values))
seenHashes := make([]interface{}, 0, len(values))
for fieldName, input := range values {
// Postgres cannot handle custom types, resolve everything to strings
switch input.(type) {
case *big.Int:
b := input.(*big.Int)
strValues[fieldName] = b.String()
case common.Address:
a := input.(common.Address)
strValues[fieldName] = a.String()
seenAddrs = append(seenAddrs, a)
case common.Hash:
h := input.(common.Hash)
strValues[fieldName] = h.String()
seenHashes = append(seenHashes, h)
case string:
strValues[fieldName] = input.(string)
case bool:
strValues[fieldName] = strconv.FormatBool(input.(bool))
case []byte:
b := input.([]byte)
strValues[fieldName] = hexutil.Encode(b)
if len(b) == 32 {
seenHashes = append(seenHashes, common.HexToHash(strValues[fieldName]))
}
case byte:
b := input.(byte)
strValues[fieldName] = string(b)
default:
return nil, errors.New(fmt.Sprintf("error: unhandled abi type %T", input))
}
}
// Only hold onto logs that pass our address filter, if any
if c.ContractInfo.PassesEventFilter(strValues) {
raw, err := json.Marshal(log)
if err != nil {
return nil, err
}
eventsToLogs[event.Name] = append(eventsToLogs[event.Name], types.Log{
LogIndex: log.Index,
Values: strValues,
Raw: raw,
TransactionIndex: log.TxIndex,
Id: headerID,
})
// Cache emitted values if their caching is turned on
if c.ContractInfo.EmittedAddrs != nil {
c.ContractInfo.AddEmittedAddr(seenAddrs...)
}
if c.ContractInfo.EmittedHashes != nil {
c.ContractInfo.AddEmittedHash(seenHashes...)
}
}
}
}
}
return eventsToLogs, nil
}
func (c *converter) handleDSNote() {
}

View File

@ -29,11 +29,14 @@ import (
const columnCacheSize = 1000 const columnCacheSize = 1000
type HeaderRepository interface { type HeaderRepository interface {
AddCheckColumn(eventID string) error AddCheckColumn(id string) error
AddCheckColumns(ids []string) error
MarkHeaderChecked(headerID int64, eventID string) error MarkHeaderChecked(headerID int64, eventID string) error
MarkHeadersChecked(headers []core.Header, ids []string) error MarkHeaderCheckedForAll(headerID int64, ids []string) error
MarkHeadersCheckedForAll(headers []core.Header, ids []string) error
MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error) MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error)
MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error)
MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error)
CheckCache(key string) (interface{}, bool) CheckCache(key string) (interface{}, bool)
} }
@ -70,6 +73,29 @@ func (r *headerRepository) AddCheckColumn(id string) error {
return nil return nil
} }
func (r *headerRepository) AddCheckColumns(ids []string) error {
var err error
baseQuery := "ALTER TABLE public.checked_headers"
input := make([]string, 0, len(ids))
for _, id := range ids {
_, ok := r.columns.Get(id)
if !ok {
baseQuery += " ADD COLUMN IF NOT EXISTS " + id + " BOOLEAN NOT NULL DEFAULT FALSE,"
input = append(input, id)
}
}
if len(input) > 0 {
_, err = r.db.Exec(baseQuery[:len(baseQuery)-1])
if err == nil {
for _, id := range input {
r.columns.Add(id, true)
}
}
}
return err
}
func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error { func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error {
_, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+id+`) _, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+id+`)
VALUES ($1, $2) VALUES ($1, $2)
@ -79,7 +105,26 @@ func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error {
return err return err
} }
func (r *headerRepository) MarkHeadersChecked(headers []core.Header, ids []string) error { func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) error {
pgStr := "INSERT INTO public.checked_headers (header_id, "
for _, id := range ids {
pgStr += id + ", "
}
pgStr = pgStr[:len(pgStr)-2] + ") VALUES ($1, "
for i := 0; i < len(ids); i++ {
pgStr += "true, "
}
pgStr = pgStr[:len(pgStr)-2] + ") ON CONFLICT (header_id) DO UPDATE SET "
for _, id := range ids {
pgStr += fmt.Sprintf("%s = true, ", id)
}
pgStr = pgStr[:len(pgStr)-2]
_, err := r.db.Exec(pgStr, headerID)
return err
}
func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error {
tx, err := r.db.Begin() tx, err := r.db.Begin()
if err != nil { if err != nil {
return err return err
@ -136,10 +181,41 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber
return result, err return result, err
} }
func (r *headerRepository) MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error) {
var result []core.Header
var query string
var err error
baseQuery := `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL`
for _, id := range ids {
baseQuery += ` OR ` + id + ` IS FALSE`
}
if endingBlockNumber == -1 {
endStr := `) AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
endStr := `) AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}
return result, err
}
func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) { func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) {
var result []core.Header var result []core.Header
var query string var query string
var err error var err error
baseQuery := `SELECT headers.id, headers.block_number, headers.hash FROM headers baseQuery := `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id IS NOT NULL` WHERE (header_id IS NOT NULL`

View File

@ -34,13 +34,21 @@ var _ = Describe("Repository", func() {
var db *postgres.DB var db *postgres.DB
var omniHeaderRepo repository.HeaderRepository // omni/light header repository var omniHeaderRepo repository.HeaderRepository // omni/light header repository
var coreHeaderRepo repositories.HeaderRepository // pkg/datastore header repository var coreHeaderRepo repositories.HeaderRepository // pkg/datastore header repository
var eventID, query string var eventIDs = []string{
"eventName_contractAddr",
"eventName_contractAddr2",
"eventName_contractAddr3",
}
var methodIDs = []string{
"methodName_contractAddr",
"methodName_contractAddr2",
"methodName_contractAddr3",
}
BeforeEach(func() { BeforeEach(func() {
db, _ = test_helpers.SetupDBandBC() db, _ = test_helpers.SetupDBandBC()
omniHeaderRepo = repository.NewHeaderRepository(db) omniHeaderRepo = repository.NewHeaderRepository(db)
coreHeaderRepo = repositories.NewHeaderRepository(db) coreHeaderRepo = repositories.NewHeaderRepository(db)
eventID = "eventName_contractAddr"
}) })
AfterEach(func() { AfterEach(func() {
@ -49,11 +57,11 @@ var _ = Describe("Repository", func() {
Describe("AddCheckColumn", func() { Describe("AddCheckColumn", func() {
It("Creates a column for the given eventID to mark if the header has been checked for that event", func() { It("Creates a column for the given eventID to mark if the header has been checked for that event", func() {
query = fmt.Sprintf("SELECT %s FROM checked_headers", eventID) query := fmt.Sprintf("SELECT %s FROM checked_headers", eventIDs[0])
_, err := db.Exec(query) _, err := db.Exec(query)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
err = omniHeaderRepo.AddCheckColumn(eventID) err = omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
_, err = db.Exec(query) _, err = db.Exec(query)
@ -61,35 +69,68 @@ var _ = Describe("Repository", func() {
}) })
It("Caches column it creates so that it does not need to repeatedly query the database to check for it's existence", func() { It("Caches column it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
_, ok := omniHeaderRepo.CheckCache(eventID) _, ok := omniHeaderRepo.CheckCache(eventIDs[0])
Expect(ok).To(Equal(false)) Expect(ok).To(Equal(false))
err := omniHeaderRepo.AddCheckColumn(eventID) err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
v, ok := omniHeaderRepo.CheckCache(eventID) v, ok := omniHeaderRepo.CheckCache(eventIDs[0])
Expect(ok).To(Equal(true)) Expect(ok).To(Equal(true))
Expect(v).To(Equal(true)) Expect(v).To(Equal(true))
}) })
}) })
Describe("AddCheckColumns", func() {
It("Creates a column for the given eventIDs to mark if the header has been checked for those events", func() {
for _, id := range eventIDs {
_, err := db.Exec(fmt.Sprintf("SELECT %s FROM checked_headers", id))
Expect(err).To(HaveOccurred())
}
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
for _, id := range eventIDs {
_, err := db.Exec(fmt.Sprintf("SELECT %s FROM checked_headers", id))
Expect(err).ToNot(HaveOccurred())
}
})
It("Caches columns it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
for _, id := range eventIDs {
_, ok := omniHeaderRepo.CheckCache(id)
Expect(ok).To(Equal(false))
}
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
for _, id := range eventIDs {
v, ok := omniHeaderRepo.CheckCache(id)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
}
})
})
Describe("MissingHeaders", func() { Describe("MissingHeaders", func() {
It("Returns all unchecked headers for the given eventID", func() { It("Returns all unchecked headers for the given eventID", func() {
addHeaders(coreHeaderRepo) addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumn(eventID) err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3)) Expect(len(missingHeaders)).To(Equal(3))
}) })
It("Returns unchecked headers in ascending order", func() { It("Returns unchecked headers in ascending order", func() {
addHeaders(coreHeaderRepo) addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumn(eventID) err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3)) Expect(len(missingHeaders)).To(Equal(3))
@ -103,7 +144,7 @@ var _ = Describe("Repository", func() {
It("Fails if eventID does not yet exist in check_headers table", func() { It("Fails if eventID does not yet exist in check_headers table", func() {
addHeaders(coreHeaderRepo) addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumn(eventID) err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
_, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, "notEventId") _, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, "notEventId")
@ -111,31 +152,69 @@ var _ = Describe("Repository", func() {
}) })
}) })
Describe("MissingHeadersForAll", func() { // HERE
It("Returns all headers that have not been checked for all of the ids provided", func() {
addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[1])
Expect(err).ToNot(HaveOccurred())
err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[2])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
})
It("Fails if one of the eventIDs does not yet exist in check_headers table", func() {
addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
badEventIDs := append(eventIDs, "notEventId")
_, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, badEventIDs)
Expect(err).To(HaveOccurred())
})
})
Describe("MarkHeaderChecked", func() { Describe("MarkHeaderChecked", func() {
It("Marks the header checked for the given eventID", func() { It("Marks the header checked for the given eventID", func() {
addHeaders(coreHeaderRepo) addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumn(eventID) err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3)) Expect(len(missingHeaders)).To(Equal(3))
headerID := missingHeaders[0].Id headerID := missingHeaders[0].Id
err = omniHeaderRepo.MarkHeaderChecked(headerID, eventID) err = omniHeaderRepo.MarkHeaderChecked(headerID, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2)) Expect(len(missingHeaders)).To(Equal(2))
}) })
It("Fails if eventID does not yet exist in check_headers table", func() { It("Fails if eventID does not yet exist in check_headers table", func() {
addHeaders(coreHeaderRepo) addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumn(eventID) err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3)) Expect(len(missingHeaders)).To(Equal(3))
@ -143,13 +222,33 @@ var _ = Describe("Repository", func() {
err = omniHeaderRepo.MarkHeaderChecked(headerID, "notEventId") err = omniHeaderRepo.MarkHeaderChecked(headerID, "notEventId")
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3)) Expect(len(missingHeaders)).To(Equal(3))
}) })
}) })
Describe("MarkHeadersChecked", func() { Describe("MarkHeaderCheckedForAll", func() {
It("Marks the header checked for all provided column ids", func() {
addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
headerID := missingHeaders[0].Id
err = omniHeaderRepo.MarkHeaderCheckedForAll(headerID, eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
})
})
Describe("MarkHeadersCheckedForAll", func() {
It("Marks the headers checked for all provided column ids", func() { It("Marks the headers checked for all provided column ids", func() {
addHeaders(coreHeaderRepo) addHeaders(coreHeaderRepo)
methodIDs := []string{ methodIDs := []string{
@ -167,7 +266,7 @@ var _ = Describe("Repository", func() {
Expect(len(missingHeaders)).To(Equal(3)) Expect(len(missingHeaders)).To(Equal(3))
} }
err := omniHeaderRepo.MarkHeadersChecked(missingHeaders, methodIDs) err := omniHeaderRepo.MarkHeadersCheckedForAll(missingHeaders, methodIDs)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
for _, id := range methodIDs { for _, id := range methodIDs {
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id) missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id)
@ -180,16 +279,6 @@ var _ = Describe("Repository", func() {
Describe("MissingMethodsCheckedEventsIntersection", func() { Describe("MissingMethodsCheckedEventsIntersection", func() {
It("Returns headers that have been checked for all the provided events but have not been checked for all the provided methods", func() { It("Returns headers that have been checked for all the provided events but have not been checked for all the provided methods", func() {
addHeaders(coreHeaderRepo) addHeaders(coreHeaderRepo)
eventIDs := []string{
eventID,
"eventName_contractAddr2",
"eventName_contractAddr3",
}
methodIDs := []string{
"methodName_contractAddr",
"methodName_contractAddr2",
"methodName_contractAddr3",
}
for i, id := range eventIDs { for i, id := range eventIDs {
err := omniHeaderRepo.AddCheckColumn(id) err := omniHeaderRepo.AddCheckColumn(id)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -197,7 +286,7 @@ var _ = Describe("Repository", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
} }
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3)) Expect(len(missingHeaders)).To(Equal(3))

View File

@ -29,7 +29,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/light/repository" "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/light/retriever" "github.com/vulcanize/vulcanizedb/pkg/omni/light/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller"
srep "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository" srep "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
@ -182,59 +181,63 @@ func (tr *transformer) Execute() error {
for _, con := range tr.Contracts { for _, con := range tr.Contracts {
// Update converter with current contract // Update converter with current contract
tr.Converter.Update(con) tr.Converter.Update(con)
// This is so that same header slice is retrieved for each event iteration
last, err := tr.BlockRetriever.RetrieveMostRecentBlock()
if err != nil {
return err
}
// Iterate through events
eventIds := make([]string, 0, len(con.Events))
for _, event := range con.Events {
// Filter using the event signature
topics := [][]common.Hash{{common.HexToHash(helpers.GenerateSignature(event.Sig()))}}
// Iterate through events
eLen := len(con.Events)
eventIds := make([]string, 0, eLen)
eventTopics := make([][]common.Hash, 0, eLen)
for _, event := range con.Events {
// Append this event sig to the filters
eventTopics = append(eventTopics, []common.Hash{event.Sig()})
// Generate eventID and use it to create a checked_header column if one does not already exist // Generate eventID and use it to create a checked_header column if one does not already exist
eventId := strings.ToLower(event.Name + "_" + con.Address) eventId := strings.ToLower(event.Name + "_" + con.Address)
eventIds = append(eventIds, eventId)
err := tr.HeaderRepository.AddCheckColumn(eventId) err := tr.HeaderRepository.AddCheckColumn(eventId)
if err != nil { if err != nil {
return err return err
} }
// Keep track of this event id
eventIds = append(eventIds, eventId)
}
// Find unchecked headers for this event // Find unchecked headers for all events
missingHeaders, err := tr.HeaderRepository.MissingHeaders(con.StartingBlock, last, eventId) missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(con.StartingBlock, con.LastBlock, eventIds)
if err != nil {
return err
}
// Iterate over headers
for _, header := range missingHeaders {
// And fetch all event logs for this contract using this header
logs, err := tr.Fetcher.FetchLogs([]string{con.Address}, eventTopics, header)
if err != nil { if err != nil {
return err return err
} }
// Iterate over headers // Mark the header checked for all of these eventIDs and continue to next iteration if no logs are found
for _, header := range missingHeaders { if len(logs) < 1 {
// And fetch event logs using the header, contract address, and topics filter err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds)
logs, err := tr.Fetcher.FetchLogs([]string{con.Address}, topics, header)
if err != nil { if err != nil {
return err return err
} }
continue
}
// Mark the header checked for this eventID and continue to next iteration if no logs are found // Convert logs into batches of log mappings (event => []types.Log)
convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
if err != nil {
return err
}
for name, logs := range convertedLogs {
if len(logs) < 1 { if len(logs) < 1 {
eventId := strings.ToLower(name + "_" + con.Address)
err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId) err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
if err != nil { if err != nil {
return err return err
} }
continue continue
} }
// Convert logs into custom type
convertedLogs, err := tr.Converter.Convert(logs, event, header.Id)
if err != nil {
return err
}
if len(convertedLogs) < 1 {
continue
}
// If logs aren't empty, persist them // If logs aren't empty, persist them
err = tr.EventRepository.PersistLogs(convertedLogs, event, con.Address, con.Name) // Headers are marked checked in the persistlogs transactions
err = tr.EventRepository.PersistLogs(logs, con.Events[name], con.Address, con.Name)
if err != nil { if err != nil {
return err return err
} }
@ -257,10 +260,11 @@ func (tr *transformer) Execute() error {
} }
// Retrieve headers that have been checked for all events but haven not been checked for the methods // Retrieve headers that have been checked for all events but haven not been checked for the methods
missingHeaders, err := tr.HeaderRepository.MissingMethodsCheckedEventsIntersection(con.StartingBlock, last, methodIds, eventIds) missingHeaders, err = tr.HeaderRepository.MissingMethodsCheckedEventsIntersection(con.StartingBlock, con.LastBlock, methodIds, eventIds)
if err != nil { if err != nil {
return err return err
} }
// Poll over the missing headers // Poll over the missing headers
for _, header := range missingHeaders { for _, header := range missingHeaders {
err = tr.Poller.PollContractAt(*con, header.BlockNumber) err = tr.Poller.PollContractAt(*con, header.BlockNumber)
@ -268,8 +272,9 @@ func (tr *transformer) Execute() error {
return err return err
} }
} }
// Mark those headers checked for the methods // Mark those headers checked for the methods
err = tr.HeaderRepository.MarkHeadersChecked(missingHeaders, methodIds) err = tr.HeaderRepository.MarkHeadersCheckedForAll(missingHeaders, methodIds)
if err != nil { if err != nil {
return err return err
} }

View File

@ -333,7 +333,7 @@ var _ = Describe("Transformer", func() {
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
It("It does not perist events if they do not pass the emitted arg filter", func() { It("It does not persist events if they do not pass the emitted arg filter", func() {
t := transformer.NewTransformer("", blockChain, db) t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"})
t.SetMethods(constants.EnsContractAddress, nil) t.SetMethods(constants.EnsContractAddress, nil)

View File

@ -24,7 +24,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/filters"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
) )
@ -39,7 +38,7 @@ type Contract struct {
ParsedAbi abi.ABI // Parsed abi ParsedAbi abi.ABI // Parsed abi
Events map[string]types.Event // Map of events to their names Events map[string]types.Event // Map of events to their names
Methods map[string]types.Method // Map of methods to their names Methods map[string]types.Method // Map of methods to their names
Filters map[string]filters.LogFilter // Map of event filters to their names; used only for full sync watcher Filters map[string]filters.LogFilter // Map of event filters to their event names; used only for full sync watcher
FilterArgs map[string]bool // User-input list of values to filter event logs for FilterArgs map[string]bool // User-input list of values to filter event logs for
MethodArgs map[string]bool // User-input list of values to limit method polling to MethodArgs map[string]bool // User-input list of values to limit method polling to
EmittedAddrs map[interface{}]bool // List of all unique addresses collected from converted event logs EmittedAddrs map[interface{}]bool // List of all unique addresses collected from converted event logs
@ -79,11 +78,11 @@ func (c *Contract) GenerateFilters() error {
for name, event := range c.Events { for name, event := range c.Events {
c.Filters[name] = filters.LogFilter{ c.Filters[name] = filters.LogFilter{
Name: name, Name: event.Name,
FromBlock: c.StartingBlock, FromBlock: c.StartingBlock,
ToBlock: -1, ToBlock: -1,
Address: c.Address, Address: c.Address,
Topics: core.Topics{helpers.GenerateSignature(event.Sig())}, // move generate signatrue to pkg Topics: core.Topics{event.Sig().Hex()},
} }
} }
// If no filters were generated, throw an error (no point in continuing with this contract) // If no filters were generated, throw an error (no point in continuing with this contract)

View File

@ -255,9 +255,18 @@ func TearDown(db *postgres.DB) {
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr`) _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr2`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr3`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`) _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS balanceof_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`) _, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())

View File

@ -102,8 +102,7 @@ func (p *parser) GetSelectMethods(wanted []string) map[string]types.Method {
for _, m := range p.parsedAbi.Methods { for _, m := range p.parsedAbi.Methods {
if okInputTypes(m, wanted) { if okInputTypes(m, wanted) {
wantedMethod := types.NewMethod(m) addrMethods[m.Name] = types.NewMethod(m)
addrMethods[wantedMethod.Name] = wantedMethod
} }
} }

View File

@ -21,6 +21,8 @@ import (
"strings" "strings"
"github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
) )
type Event struct { type Event struct {
@ -83,12 +85,12 @@ func NewEvent(e abi.Event) Event {
} }
} }
func (e Event) Sig() string { func (e Event) Sig() common.Hash {
types := make([]string, len(e.Fields)) types := make([]string, len(e.Fields))
for i, input := range e.Fields { for i, input := range e.Fields {
types[i] = input.Type.String() types[i] = input.Type.String()
} }
return fmt.Sprintf("%v(%v)", e.Name, strings.Join(types, ",")) return common.BytesToHash(crypto.Keccak256([]byte(fmt.Sprintf("%v(%v)", e.Name, strings.Join(types, ",")))))
} }

View File

@ -21,6 +21,8 @@ import (
"strings" "strings"
"github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
) )
type Method struct { type Method struct {
@ -97,7 +99,7 @@ func NewMethod(m abi.Method) Method {
} }
} }
func (m Method) Sig() string { func (m Method) Sig() common.Hash {
types := make([]string, len(m.Args)) types := make([]string, len(m.Args))
i := 0 i := 0
for _, arg := range m.Args { for _, arg := range m.Args {
@ -105,5 +107,5 @@ func (m Method) Sig() string {
i++ i++
} }
return fmt.Sprintf("%v(%v)", m.Name, strings.Join(types, ",")) return common.BytesToHash(crypto.Keccak256([]byte(fmt.Sprintf("%v(%v)", m.Name, strings.Join(types, ",")))))
} }