fetch headers and logs in batches across all contracts and events

This commit is contained in:
Ian Norden 2018-12-18 13:00:26 -06:00
parent d188329661
commit 456c735087
4 changed files with 254 additions and 91 deletions

View File

@ -24,39 +24,38 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
)
type LogFetcher interface {
FetchLogs(contractAddresses []string, topics [][]common.Hash, header core.Header) ([]types.Log, error)
type Fetcher interface {
FetchLogs(contractAddresses []string, topics []common.Hash, missingHeader core.Header) ([]types.Log, error)
}
type SettableLogFetcher interface {
LogFetcher
SetBC(bc core.BlockChain)
}
type Fetcher struct {
type fetcher struct {
blockChain core.BlockChain
}
func (fetcher *Fetcher) SetBC(bc core.BlockChain) {
fetcher.blockChain = bc
}
func NewFetcher(blockchain core.BlockChain) Fetcher {
return Fetcher{
func NewFetcher(blockchain core.BlockChain) *fetcher {
return &fetcher{
blockChain: blockchain,
}
}
func (fetcher Fetcher) FetchLogs(contractAddresses []string, topics [][]common.Hash, header core.Header) ([]types.Log, error) {
// Checks all topic0s, on all addresses, fetching matching logs for the given header
func (fetcher *fetcher) FetchLogs(contractAddresses []string, topic0s []common.Hash, header core.Header) ([]types.Log, error) {
addresses := hexStringsToAddresses(contractAddresses)
blockHash := common.HexToHash(header.Hash)
query := ethereum.FilterQuery{
BlockHash: &blockHash,
Addresses: addresses,
Topics: topics,
// Search for _any_ of the topics in topic0 position; see docs on `FilterQuery`
Topics: [][]common.Hash{topic0s},
}
return fetcher.blockChain.GetEthLogsWithCustomQuery(query)
logs, err := fetcher.blockChain.GetEthLogsWithCustomQuery(query)
if err != nil {
// TODO review aggregate fetching error handling
return []types.Log{}, err
}
return logs, nil
}
func hexStringsToAddresses(hexStrings []string) []common.Address {

View File

@ -37,7 +37,7 @@ var _ = Describe("Fetcher", func() {
addresses := []string{"0xfakeAddress", "0xanotherFakeAddress"}
topicZeros := [][]common.Hash{{common.BytesToHash([]byte{1, 2, 3, 4, 5})}}
_, err := fetcher.FetchLogs(addresses, topicZeros, header)
_, err := fetcher.FetchLogs(addresses, []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})}, header)
address1 := common.HexToAddress("0xfakeAddress")
address2 := common.HexToAddress("0xanotherFakeAddress")
@ -57,7 +57,7 @@ var _ = Describe("Fetcher", func() {
blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError)
fetcher := fetcher.NewFetcher(blockChain)
_, err := fetcher.FetchLogs([]string{}, [][]common.Hash{}, core.Header{})
_, err := fetcher.FetchLogs([]string{}, []common.Hash{}, core.Header{})
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))

View File

@ -21,6 +21,7 @@ import (
"strings"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
@ -177,43 +178,61 @@ func (tr *transformer) Execute() error {
if len(tr.Contracts) == 0 {
return errors.New("error: transformer has no initialized contracts")
}
// Iterate through all initialized contracts
for _, con := range tr.Contracts {
// Update converter with current contract
tr.Converter.Update(con)
// Iterate through events
cLen := len(tr.Contracts)
contractAddresses := make([]string, 0, cLen) // Holds all contract addresses, for batch fetching of logs
sortedIds := make(map[string][]string) // Map to sort event column ids by contract, for post fetch processing
eventIds := make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers
eventFilters := make([]common.Hash, 0) // Holds topic hashes across all contracts, for batch fetching of logs
sortedLogs := make(map[string][]gethTypes.Log) // Map to sort batch fetched logs by which contract they belong to, for post fetch processing
var start, end int64 // Hold the lowest starting block and the highest ending block
start = 100000000
end = -1
// Cycle through all contracts and extract info needed for fetching and post-processing
for _, con := range tr.Contracts {
eLen := len(con.Events)
eventIds := make([]string, 0, eLen)
eventTopics := make([][]common.Hash, 0, eLen)
sortedLogs[con.Address] = []gethTypes.Log{}
sortedIds[con.Address] = make([]string, 0, eLen)
for _, event := range con.Events {
// Append this event sig to the filters
eventTopics = append(eventTopics, []common.Hash{event.Sig()})
// Generate eventID and use it to create a checked_header column if one does not already exist
eventId := strings.ToLower(event.Name + "_" + con.Address)
err := tr.HeaderRepository.AddCheckColumn(eventId)
if err != nil {
return err
}
// Keep track of this event id
// Keep track of this event id; sorted and unsorted
sortedIds[con.Address] = append(sortedIds[con.Address], eventId)
eventIds = append(eventIds, eventId)
// Append this event sig to the filters
eventFilters = append(eventFilters, event.Sig())
}
contractAddresses = append(contractAddresses, con.Address)
// Update start to the lowest block and end to the highest block
if con.StartingBlock < start {
start = con.StartingBlock
}
if con.LastBlock > end {
end = con.LastBlock
}
}
// Find unchecked headers for all events
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(con.StartingBlock, con.LastBlock, eventIds)
// Find unchecked headers for all events across all contracts
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(start, end, eventIds)
if err != nil {
return err
}
// Iterate over headers
for _, header := range missingHeaders {
// And fetch all event logs for this contract using this header
logs, err := tr.Fetcher.FetchLogs([]string{con.Address}, eventTopics, header)
// And fetch all event logs across contracts at this header
allLogs, err := tr.Fetcher.FetchLogs(contractAddresses, eventFilters, header)
if err != nil {
return err
}
// Mark the header checked for all of these eventIDs and continue to next iteration if no logs are found
if len(logs) < 1 {
if len(allLogs) < 1 {
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds)
if err != nil {
return err
@ -221,14 +240,28 @@ func (tr *transformer) Execute() error {
continue
}
// Sort logs by the contract they belong to
for _, log := range allLogs {
sortedLogs[log.Address.Hex()] = append(sortedLogs[log.Address.Hex()], log)
}
// Process logs for each contract
for conAddr, logs := range sortedLogs {
// Configure converter with this contract
con := tr.Contracts[conAddr]
tr.Converter.Update(con)
// Convert logs into batches of log mappings (event => []types.Log)
convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
if err != nil {
return err
}
for name, logs := range convertedLogs {
// Cycle through each type of event log and persist them
for eventName, logs := range convertedLogs {
// If logs are empty, mark checked
if len(logs) < 1 {
eventId := strings.ToLower(name + "_" + con.Address)
eventId := strings.ToLower(eventName + "_" + con.Address)
err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
if err != nil {
return err
@ -237,18 +270,18 @@ func (tr *transformer) Execute() error {
}
// If logs aren't empty, persist them
// Headers are marked checked in the persistlogs transactions
err = tr.EventRepository.PersistLogs(logs, con.Events[name], con.Address, con.Name)
err = tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
if err != nil {
return err
}
}
}
// Skip method polling processes if no methods are specified
if len(con.Methods) == 0 {
continue
}
// Create checked_headers columns for each method id
// Create checked_headers columns for each method id and generate list of all method ids
methodIds := make([]string, 0, len(con.Methods))
for _, m := range con.Methods {
methodId := strings.ToLower(m.Name + "_" + con.Address)
@ -259,8 +292,8 @@ func (tr *transformer) Execute() error {
methodIds = append(methodIds, methodId)
}
// Retrieve headers that have been checked for all events but haven not been checked for the methods
missingHeaders, err = tr.HeaderRepository.MissingMethodsCheckedEventsIntersection(con.StartingBlock, con.LastBlock, methodIds, eventIds)
// Retrieve headers that have been checked for all of this contract's events but haven not been checked for this contract's methods
missingHeaders, err = tr.HeaderRepository.MissingMethodsCheckedEventsIntersection(con.StartingBlock, con.LastBlock, methodIds, sortedIds[conAddr])
if err != nil {
return err
}
@ -278,7 +311,7 @@ func (tr *transformer) Execute() error {
if err != nil {
return err
}
}
}
return nil

View File

@ -37,7 +37,7 @@ var _ = Describe("Transformer", func() {
var err error
var blockChain core.BlockChain
var headerRepository repositories.HeaderRepository
var headerID int64
var headerID, headerID2 int64
BeforeEach(func() {
db, blockChain = test_helpers.SetupDBandBC()
@ -147,7 +147,7 @@ var _ = Describe("Transformer", func() {
})
})
Describe("Execute", func() {
Describe("Execute- against TrueUSD contract", func() {
BeforeEach(func() {
header1, err := blockChain.GetHeaderByNumber(6791668)
Expect(err).ToNot(HaveOccurred())
@ -165,16 +165,14 @@ var _ = Describe("Transformer", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, nil)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
log := test_helpers.LightTransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", constants.TusdContractAddress)).StructScan(&log)
Expect(err).ToNot(HaveOccurred())
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(log.HeaderID).To(Equal(headerID))
Expect(log.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3"))
@ -188,12 +186,12 @@ var _ = Describe("Transformer", func() {
t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
c, ok := t.Contracts[constants.TusdContractAddress]
Expect(ok).To(Equal(true))
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
Expect(len(c.EmittedAddrs)).To(Equal(4))
Expect(len(c.EmittedHashes)).To(Equal(0))
b, ok := c.EmittedAddrs[common.HexToAddress("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")]
Expect(ok).To(Equal(true))
@ -203,6 +201,14 @@ var _ = Describe("Transformer", func() {
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = c.EmittedAddrs[common.HexToAddress("0x571A326f5B15E16917dC17761c340c1ec5d06f6d")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = c.EmittedAddrs[common.HexToAddress("0xFBb1b73C4f0BDa4f67dcA266ce6Ef42f520fBB98")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
_, ok = c.EmittedAddrs[common.HexToAddress("0x09BbBBE21a5975cAc061D82f7b843b1234567890")]
Expect(ok).To(Equal(false))
@ -222,7 +228,6 @@ var _ = Describe("Transformer", func() {
t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
@ -240,7 +245,6 @@ var _ = Describe("Transformer", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, nil)
err = t.Execute()
Expect(err).To(HaveOccurred())
})
@ -264,16 +268,14 @@ var _ = Describe("Transformer", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"})
t.SetMethods(constants.EnsContractAddress, nil)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
log := test_helpers.LightNewOwnerLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log)
Expect(err).ToNot(HaveOccurred())
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(log.HeaderID).To(Equal(headerID))
Expect(log.Node).To(Equal("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"))
@ -287,13 +289,12 @@ var _ = Describe("Transformer", func() {
t.SetMethods(constants.EnsContractAddress, []string{"owner"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
c, ok := t.Contracts[constants.EnsContractAddress]
Expect(ok).To(Equal(true))
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
Expect(len(c.EmittedHashes)).To(Equal(2))
Expect(len(c.EmittedAddrs)).To(Equal(0))
b, ok := c.EmittedHashes[common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")]
Expect(ok).To(Equal(true))
@ -308,13 +309,12 @@ var _ = Describe("Transformer", func() {
Expect(ok).To(Equal(false))
})
It("Polls given methods using generated token holder address", func() {
It("Polls given method using list of collected hashes", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"})
t.SetMethods(constants.EnsContractAddress, []string{"owner"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
@ -338,10 +338,8 @@ var _ = Describe("Transformer", func() {
t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"})
t.SetMethods(constants.EnsContractAddress, nil)
t.SetEventArgs(constants.EnsContractAddress, []string{"fake_filter_value"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
@ -357,7 +355,6 @@ var _ = Describe("Transformer", func() {
t.SetMethodArgs(constants.EnsContractAddress, []string{"0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
@ -371,4 +368,138 @@ var _ = Describe("Transformer", func() {
Expect(err).To(HaveOccurred())
})
})
Describe("Execute- against both ENS and TrueUSD", func() {
BeforeEach(func() {
header1, err := blockChain.GetHeaderByNumber(6791668)
Expect(err).ToNot(HaveOccurred())
header2, err := blockChain.GetHeaderByNumber(6791669)
Expect(err).ToNot(HaveOccurred())
header3, err := blockChain.GetHeaderByNumber(6791670)
Expect(err).ToNot(HaveOccurred())
header4, err := blockChain.GetHeaderByNumber(6885695)
Expect(err).ToNot(HaveOccurred())
header5, err := blockChain.GetHeaderByNumber(6885696)
Expect(err).ToNot(HaveOccurred())
header6, err := blockChain.GetHeaderByNumber(6885697)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header1)
headerID, err = headerRepository.CreateOrUpdateHeader(header2)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header3)
headerRepository.CreateOrUpdateHeader(header4)
headerID2, err = headerRepository.CreateOrUpdateHeader(header5)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header6)
})
It("Transforms watched contract data into custom repositories", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"})
t.SetMethods(constants.EnsContractAddress, nil)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, nil)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
newOwnerLog := test_helpers.LightNewOwnerLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", constants.EnsContractAddress)).StructScan(&newOwnerLog)
Expect(err).ToNot(HaveOccurred())
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(newOwnerLog.HeaderID).To(Equal(headerID2))
Expect(newOwnerLog.Node).To(Equal("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"))
Expect(newOwnerLog.Label).To(Equal("0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047"))
Expect(newOwnerLog.Owner).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef"))
transferLog := test_helpers.LightTransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", constants.TusdContractAddress)).StructScan(&transferLog)
Expect(err).ToNot(HaveOccurred())
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(transferLog.HeaderID).To(Equal(headerID))
Expect(transferLog.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3"))
Expect(transferLog.To).To(Equal("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0"))
Expect(transferLog.Value).To(Equal("9998940000000000000000"))
})
It("Keeps track of contract-related hashes and addresses while transforming event data if they need to be used for later method polling", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"})
t.SetMethods(constants.EnsContractAddress, []string{"owner"})
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
ens, ok := t.Contracts[constants.EnsContractAddress]
Expect(ok).To(Equal(true))
tusd, ok := t.Contracts[constants.TusdContractAddress]
Expect(ok).To(Equal(true))
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
Expect(len(ens.EmittedHashes)).To(Equal(2))
Expect(len(ens.EmittedAddrs)).To(Equal(0))
Expect(len(tusd.EmittedAddrs)).To(Equal(4))
Expect(len(tusd.EmittedHashes)).To(Equal(0))
b, ok := ens.EmittedHashes[common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = ens.EmittedHashes[common.HexToHash("0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x571A326f5B15E16917dC17761c340c1ec5d06f6d")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0xFBb1b73C4f0BDa4f67dcA266ce6Ef42f520fBB98")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
})
It("Polls given methods for each contract, using list of collected values", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"})
t.SetMethods(constants.EnsContractAddress, []string{"owner"})
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
owner := test_helpers.Owner{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&owner)
Expect(err).ToNot(HaveOccurred())
Expect(owner.Address).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef"))
Expect(owner.TokenName).To(Equal(""))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&owner)
Expect(err).ToNot(HaveOccurred())
Expect(owner.Address).To(Equal("0x0000000000000000000000000000000000000000"))
Expect(owner.TokenName).To(Equal(""))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&owner)
Expect(err).To(HaveOccurred())
bal := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&bal)
Expect(err).ToNot(HaveOccurred())
Expect(bal.Balance).To(Equal("55849938025000000000000"))
Expect(bal.TokenName).To(Equal("TrueUSD"))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&bal)
Expect(err).To(HaveOccurred())
})
})
})