diff --git a/README.md b/README.md index e4d7b2a6..dc80404a 100644 --- a/README.md +++ b/README.md @@ -150,8 +150,8 @@ Or if you are using a full synced vDB, change the mode to full: To watch contracts on a network other than mainnet, use the network flag: - Execute `./vulcanizedb omniWatcher --config --contract-address --network ` -To watch events within a certain block range use the starting block and ending block flags: - - Execute `./vulcanizedb omniWatcher --config --contract-address --starting-block-number <#> --ending-block-number <#>` +To watch events starting at a certain block use the starting block flag: + - Execute `./vulcanizedb omniWatcher --config --contract-address --starting-block-number <#>` To watch only specified events use the events flag: - Execute `./vulcanizedb omniWatcher --config --contract-address --events --events ` diff --git a/cmd/omniWatcher.go b/cmd/omniWatcher.go index cf17cadf..dbb9beb9 100644 --- a/cmd/omniWatcher.go +++ b/cmd/omniWatcher.go @@ -92,8 +92,8 @@ func omniWatcher() { t.SetMethods(addr, contractMethods) t.SetEventArgs(addr, eventArgs) t.SetMethodArgs(addr, methodArgs) - t.SetRange(addr, [2]int64{startingBlockNumber, endingBlockNumber}) t.SetPiping(addr, methodPiping) + t.SetStartingBlock(addr, startingBlockNumber) } err := t.Init() @@ -121,6 +121,5 @@ func init() { omniWatcherCmd.Flags().StringArrayVarP(&methodArgs, "method-args", "g", []string{}, "Argument values to limit methods to; will only call methods with emitted values that were specified here") omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`) omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists") - omniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block") omniWatcherCmd.Flags().BoolVarP(&methodPiping, "piping", "p", false, "Turn on method output piping: methods listed first will be polled first and their output used as input to subsequent methods") } diff --git a/examples/erc20_watcher/every_block/getter.go b/examples/erc20_watcher/every_block/getter.go index 02cb15d1..7afbf3bb 100644 --- a/examples/erc20_watcher/every_block/getter.go +++ b/examples/erc20_watcher/every_block/getter.go @@ -17,9 +17,9 @@ package every_block import ( - "github.com/vulcanize/vulcanizedb/examples/generic" "math/big" + "github.com/vulcanize/vulcanizedb/examples/generic" "github.com/vulcanize/vulcanizedb/pkg/core" ) diff --git a/pkg/omni/full/retriever/block_retriever.go b/pkg/omni/full/retriever/block_retriever.go index 2eb0337d..f4c3ac5d 100644 --- a/pkg/omni/full/retriever/block_retriever.go +++ b/pkg/omni/full/retriever/block_retriever.go @@ -54,7 +54,7 @@ func (r *blockRetriever) retrieveFirstBlockFromReceipts(contractAddr string) (in &firstBlock, `SELECT number FROM blocks WHERE id = (SELECT block_id FROM receipts - WHERE contract_address = $1 + WHERE lower(contract_address) = $1 ORDER BY block_id ASC LIMIT 1)`, contractAddr, @@ -68,7 +68,7 @@ func (r *blockRetriever) retrieveFirstBlockFromLogs(contractAddr string) (int64, var firstBlock int err := r.db.Get( &firstBlock, - "SELECT block_number FROM logs WHERE address = $1 ORDER BY block_number ASC LIMIT 1", + "SELECT block_number FROM logs WHERE lower(address) = $1 ORDER BY block_number ASC LIMIT 1", contractAddr, ) diff --git a/pkg/omni/full/retriever/block_retriever_test.go b/pkg/omni/full/retriever/block_retriever_test.go index eaef15a2..caa1a1ed 100644 --- a/pkg/omni/full/retriever/block_retriever_test.go +++ b/pkg/omni/full/retriever/block_retriever_test.go @@ -19,6 +19,7 @@ package retriever_test import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "strings" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" @@ -97,7 +98,7 @@ var _ = Describe("Block Retriever", func() { blockRepository.CreateOrUpdateBlock(block2) blockRepository.CreateOrUpdateBlock(block3) - i, err := r.RetrieveFirstBlock(constants.TusdContractAddress) + i, err := r.RetrieveFirstBlock(strings.ToLower(constants.TusdContractAddress)) Expect(err).NotTo(HaveOccurred()) Expect(i).To(Equal(int64(2))) }) diff --git a/pkg/omni/full/transformer/transformer.go b/pkg/omni/full/transformer/transformer.go index ae948960..5c8a4337 100644 --- a/pkg/omni/full/transformer/transformer.go +++ b/pkg/omni/full/transformer/transformer.go @@ -18,6 +18,7 @@ package transformer import ( "errors" + "strings" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" @@ -58,8 +59,8 @@ type transformer struct { WatchedEvents map[string][]string // Default/empty event list means all are watched WantedMethods map[string][]string // Default/empty method list means none are polled - // Block ranges to watch contracts - ContractRanges map[string][2]int64 + // Starting block for contracts + ContractStart map[string]int64 // Lists of addresses to filter event or method data // before persisting; if empty no filter is applied @@ -87,7 +88,7 @@ func NewTransformer(network string, BC core.BlockChain, DB *postgres.DB) *transf EventRepository: repository.NewEventRepository(DB, types.FullSync), WatchedEvents: map[string][]string{}, WantedMethods: map[string][]string{}, - ContractRanges: map[string][2]int64{}, + ContractStart: map[string]int64{}, EventArgs: map[string][]string{}, MethodArgs: map[string][]string{}, CreateAddrList: map[string]bool{}, @@ -108,7 +109,7 @@ func (t *transformer) Init() error { return err } - // Get first block for contract and most recent block for the chain + // Get first block and most recent block number in the header repo firstBlock, err := t.BlockRetriever.RetrieveFirstBlock(contractAddr) if err != nil { return err @@ -118,12 +119,9 @@ func (t *transformer) Init() error { return err } - // Set to specified range if it falls within the contract's bounds - if firstBlock < t.ContractRanges[contractAddr][0] { - firstBlock = t.ContractRanges[contractAddr][0] - } - if lastBlock > t.ContractRanges[contractAddr][1] && t.ContractRanges[contractAddr][1] > firstBlock { - lastBlock = t.ContractRanges[contractAddr][1] + // Set to specified range if it falls within the bounds + if firstBlock < t.ContractStart[contractAddr] { + firstBlock = t.ContractStart[contractAddr] } // Get contract name if it has one @@ -190,7 +188,6 @@ func (tr transformer) Execute() error { } // Iterate through all internal contracts for _, con := range tr.Contracts { - // Update converter with current contract tr.Update(con) @@ -235,40 +232,40 @@ func (tr transformer) Execute() error { // Used to set which contract addresses and which of their events to watch func (tr *transformer) SetEvents(contractAddr string, filterSet []string) { - tr.WatchedEvents[contractAddr] = filterSet + tr.WatchedEvents[strings.ToLower(contractAddr)] = filterSet } // Used to set subset of account addresses to watch events for func (tr *transformer) SetEventArgs(contractAddr string, filterSet []string) { - tr.EventArgs[contractAddr] = filterSet + tr.EventArgs[strings.ToLower(contractAddr)] = filterSet } // Used to set which contract addresses and which of their methods to call func (tr *transformer) SetMethods(contractAddr string, filterSet []string) { - tr.WantedMethods[contractAddr] = filterSet + tr.WantedMethods[strings.ToLower(contractAddr)] = filterSet } // Used to set subset of account addresses to poll methods on func (tr *transformer) SetMethodArgs(contractAddr string, filterSet []string) { - tr.MethodArgs[contractAddr] = filterSet + tr.MethodArgs[strings.ToLower(contractAddr)] = filterSet } // Used to set the block range to watch for a given address -func (tr *transformer) SetRange(contractAddr string, rng [2]int64) { - tr.ContractRanges[contractAddr] = rng +func (tr *transformer) SetStartingBlock(contractAddr string, start int64) { + tr.ContractStart[strings.ToLower(contractAddr)] = start } // Used to set whether or not to persist an account address list func (tr *transformer) SetCreateAddrList(contractAddr string, on bool) { - tr.CreateAddrList[contractAddr] = on + tr.CreateAddrList[strings.ToLower(contractAddr)] = on } // Used to set whether or not to persist an hash list func (tr *transformer) SetCreateHashList(contractAddr string, on bool) { - tr.CreateHashList[contractAddr] = on + tr.CreateHashList[strings.ToLower(contractAddr)] = on } // Used to turn method piping on for a contract func (tr *transformer) SetPiping(contractAddr string, on bool) { - tr.Piping[contractAddr] = on + tr.Piping[strings.ToLower(contractAddr)] = on } diff --git a/pkg/omni/full/transformer/transformer_test.go b/pkg/omni/full/transformer/transformer_test.go index d5e4ed2e..9d68143c 100644 --- a/pkg/omni/full/transformer/transformer_test.go +++ b/pkg/omni/full/transformer/transformer_test.go @@ -19,6 +19,7 @@ package transformer_test import ( "fmt" "math/rand" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -39,6 +40,8 @@ var _ = Describe("Transformer", func() { var err error var blockChain core.BlockChain var blockRepository repositories.BlockRepository + var ensAddr = strings.ToLower(constants.EnsContractAddress) + var tusdAddr = strings.ToLower(constants.TusdContractAddress) rand.Seed(time.Now().UnixNano()) BeforeEach(func() { @@ -55,7 +58,7 @@ var _ = Describe("Transformer", func() { watchedEvents := []string{"Transfer", "Mint"} t := transformer.NewTransformer("", blockChain, db) t.SetEvents(constants.TusdContractAddress, watchedEvents) - Expect(t.WatchedEvents[constants.TusdContractAddress]).To(Equal(watchedEvents)) + Expect(t.WatchedEvents[tusdAddr]).To(Equal(watchedEvents)) }) }) @@ -64,7 +67,7 @@ var _ = Describe("Transformer", func() { eventAddrs := []string{"test1", "test2"} t := transformer.NewTransformer("", blockChain, db) t.SetEventArgs(constants.TusdContractAddress, eventAddrs) - Expect(t.EventArgs[constants.TusdContractAddress]).To(Equal(eventAddrs)) + Expect(t.EventArgs[tusdAddr]).To(Equal(eventAddrs)) }) }) @@ -73,7 +76,7 @@ var _ = Describe("Transformer", func() { watchedMethods := []string{"balanceOf", "totalSupply"} t := transformer.NewTransformer("", blockChain, db) t.SetMethods(constants.TusdContractAddress, watchedMethods) - Expect(t.WantedMethods[constants.TusdContractAddress]).To(Equal(watchedMethods)) + Expect(t.WantedMethods[tusdAddr]).To(Equal(watchedMethods)) }) }) @@ -82,16 +85,15 @@ var _ = Describe("Transformer", func() { methodAddrs := []string{"test1", "test2"} t := transformer.NewTransformer("", blockChain, db) t.SetMethodArgs(constants.TusdContractAddress, methodAddrs) - Expect(t.MethodArgs[constants.TusdContractAddress]).To(Equal(methodAddrs)) + Expect(t.MethodArgs[tusdAddr]).To(Equal(methodAddrs)) }) }) - Describe("SetRange", func() { + Describe("SetStartingBlock", func() { It("Sets the block range that the contract should be watched within", func() { - rng := [2]int64{1, 100000} t := transformer.NewTransformer("", blockChain, db) - t.SetRange(constants.TusdContractAddress, rng) - Expect(t.ContractRanges[constants.TusdContractAddress]).To(Equal(rng)) + t.SetStartingBlock(constants.TusdContractAddress, 11) + Expect(t.ContractStart[tusdAddr]).To(Equal(int64(11))) }) }) @@ -99,7 +101,7 @@ var _ = Describe("Transformer", func() { It("Sets the block range that the contract should be watched within", func() { t := transformer.NewTransformer("", blockChain, db) t.SetCreateAddrList(constants.TusdContractAddress, true) - Expect(t.CreateAddrList[constants.TusdContractAddress]).To(Equal(true)) + Expect(t.CreateAddrList[tusdAddr]).To(Equal(true)) }) }) @@ -107,7 +109,7 @@ var _ = Describe("Transformer", func() { It("Sets the block range that the contract should be watched within", func() { t := transformer.NewTransformer("", blockChain, db) t.SetCreateHashList(constants.TusdContractAddress, true) - Expect(t.CreateHashList[constants.TusdContractAddress]).To(Equal(true)) + Expect(t.CreateHashList[tusdAddr]).To(Equal(true)) }) }) @@ -120,14 +122,14 @@ var _ = Describe("Transformer", func() { err = t.Init() Expect(err).ToNot(HaveOccurred()) - c, ok := t.Contracts[constants.TusdContractAddress] + c, ok := t.Contracts[tusdAddr] Expect(ok).To(Equal(true)) Expect(c.StartingBlock).To(Equal(int64(6194633))) Expect(c.LastBlock).To(Equal(int64(6194634))) Expect(c.Abi).To(Equal(constants.TusdAbiString)) Expect(c.Name).To(Equal("TrueUSD")) - Expect(c.Address).To(Equal(constants.TusdContractAddress)) + Expect(c.Address).To(Equal(tusdAddr)) }) It("Fails to initialize if first and most recent blocks cannot be fetched from vDB", func() { @@ -144,7 +146,7 @@ var _ = Describe("Transformer", func() { err = t.Init() Expect(err).ToNot(HaveOccurred()) - _, ok := t.Contracts[constants.TusdContractAddress] + _, ok := t.Contracts[tusdAddr] Expect(ok).To(Equal(false)) }) }) @@ -166,7 +168,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) log := test_helpers.TransferLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event WHERE block = 6194634", tusdAddr)).StructScan(&log) // We don't know vulcID, so compare individual fields instead of complete structures Expect(log.Tx).To(Equal("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654eee")) @@ -183,7 +185,7 @@ var _ = Describe("Transformer", func() { err = t.Init() Expect(err).ToNot(HaveOccurred()) - c, ok := t.Contracts[constants.TusdContractAddress] + c, ok := t.Contracts[tusdAddr] Expect(ok).To(Equal(true)) err = t.Execute() @@ -222,17 +224,17 @@ var _ = Describe("Transformer", func() { res := test_helpers.BalanceOf{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", tusdAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Balance).To(Equal("0")) Expect(res.TokenName).To(Equal("TrueUSD")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843bCE061BA391' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843bCE061BA391' AND block = '6194634'", tusdAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Balance).To(Equal("0")) Expect(res.TokenName).To(Equal("TrueUSD")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", tusdAddr)).StructScan(&res) Expect(err).To(HaveOccurred()) }) @@ -264,7 +266,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) log := test_helpers.NewOwnerLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.newowner_event", ensAddr)).StructScan(&log) // We don't know vulcID, so compare individual fields instead of complete structures Expect(log.Tx).To(Equal("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654bbb")) @@ -281,7 +283,7 @@ var _ = Describe("Transformer", func() { err = t.Init() Expect(err).ToNot(HaveOccurred()) - c, ok := t.Contracts[constants.EnsContractAddress] + c, ok := t.Contracts[ensAddr] Expect(ok).To(Equal(true)) err = t.Execute() @@ -312,17 +314,17 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) res := test_helpers.Owner{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x0000000000000000000000000000000000000000000000000000c02aaa39b223' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x0000000000000000000000000000000000000000000000000000c02aaa39b223' AND block = '6194636'", ensAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Address).To(Equal("0x0000000000000000000000000000000000000000")) Expect(res.TokenName).To(Equal("")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391' AND block = '6194636'", ensAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Address).To(Equal("0x0000000000000000000000000000000000000000")) Expect(res.TokenName).To(Equal("")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6194636'", ensAddr)).StructScan(&res) Expect(err).To(HaveOccurred()) }) @@ -339,7 +341,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) log := test_helpers.LightNewOwnerLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.newowner_event", ensAddr)).StructScan(&log) Expect(err).To(HaveOccurred()) }) @@ -355,12 +357,12 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) res := test_helpers.Owner{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x0000000000000000000000000000000000000000000000000000c02aaa39b223' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x0000000000000000000000000000000000000000000000000000c02aaa39b223' AND block = '6194636'", ensAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Address).To(Equal("0x0000000000000000000000000000000000000000")) Expect(res.TokenName).To(Equal("")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391' AND block = '6194636'", ensAddr)).StructScan(&res) Expect(err).To(HaveOccurred()) }) }) diff --git a/pkg/omni/light/transformer/transformer.go b/pkg/omni/light/transformer/transformer.go index a0684d65..565e6966 100644 --- a/pkg/omni/light/transformer/transformer.go +++ b/pkg/omni/light/transformer/transformer.go @@ -49,7 +49,7 @@ type transformer struct { // Processing interfaces fetcher.Fetcher // Fetches event logs, using header hashes converter.Converter // Converts watched event logs into custom log - poller.Poller // Polls methods using contract's token holder addresses and persists them using method datastore + poller.Poller // Polls methods using arguments collected from events and persists them using a method datastore // Ethereum network name; default "" is mainnet Network string @@ -58,15 +58,15 @@ type transformer struct { Contracts map[string]*contract.Contract // Targeted subset of events/methods - // Stored as map sof contract address to events/method names of interest + // Stored as maps of contract address to events/method names of interest WatchedEvents map[string][]string // Default/empty event list means all are watched WantedMethods map[string][]string // Default/empty method list means none are polled - // Block ranges to watch contracts - ContractRanges map[string][2]int64 + // Starting block number for each contract + ContractStart map[string]int64 - // Lists of addresses to filter event or method data - // before persisting; if empty no filter is applied + // Lists of argument values to filter event or + // method data with; if empty no filter is applied EventArgs map[string][]string MethodArgs map[string][]string @@ -76,13 +76,21 @@ type transformer struct { // Method piping on/off for a contract Piping map[string]bool + + // Internally configured transformer variables + contractAddresses []string // Holds all contract addresses, for batch fetching of logs + sortedEventIds map[string][]string // Map to sort event column ids by contract, for post fetch processing and persisting of logs + sortedMethodIds map[string][]string // Map to sort method column ids by contract, for post fetch method polling + eventIds []string // Holds event column ids across all contract, for batch fetching of headers + eventFilters []common.Hash // Holds topic0 hashes across all contracts, for batch fetching of logs + start int64 // Hold the lowest starting block and the highest ending block } // Order-of-operations: // 1. Create new transformer // 2. Load contract addresses and their parameters // 3. Init -// 3. Execute +// 4. Execute // Transformer takes in config for blockchain, database, and network id func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transformer { @@ -98,12 +106,13 @@ func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transf EventRepository: srep.NewEventRepository(db, types.LightSync), WatchedEvents: map[string][]string{}, WantedMethods: map[string][]string{}, - ContractRanges: map[string][2]int64{}, + ContractStart: map[string]int64{}, EventArgs: map[string][]string{}, MethodArgs: map[string][]string{}, CreateAddrList: map[string]bool{}, CreateHashList: map[string]bool{}, Piping: map[string]bool{}, + Network: network, } } @@ -112,6 +121,14 @@ func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transf // Uses parser to pull event info from abi // Use this info to generate event filters func (tr *transformer) Init() error { + // Initialize internally configured transformer settings + tr.contractAddresses = make([]string, 0) // Holds all contract addresses, for batch fetching of logs + tr.sortedEventIds = make(map[string][]string) // Map to sort event column ids by contract, for post fetch processing and persisting of logs + tr.sortedMethodIds = make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling + tr.eventIds = make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers + tr.eventFilters = make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs + tr.start = 100000000000 // Hold the lowest starting block and the highest ending block + // Iterate through all internal contract addresses for contractAddr, subset := range tr.WatchedEvents { // Get Abi @@ -131,11 +148,8 @@ func (tr *transformer) Init() error { } // Set to specified range if it falls within the bounds - if firstBlock < tr.ContractRanges[contractAddr][0] { - firstBlock = tr.ContractRanges[contractAddr][0] - } - if lastBlock > tr.ContractRanges[contractAddr][1] && tr.ContractRanges[contractAddr][1] > firstBlock { - lastBlock = tr.ContractRanges[contractAddr][1] + if firstBlock < tr.ContractStart[contractAddr] { + firstBlock = tr.ContractStart[contractAddr] } // Get contract name if it has one @@ -153,14 +167,14 @@ func (tr *transformer) Init() error { } // Aggregate info into contract object and store for execution - tr.Contracts[contractAddr] = contract.Contract{ + con := contract.Contract{ Name: *name, Network: tr.Network, Address: contractAddr, Abi: tr.Parser.Abi(), ParsedAbi: tr.Parser.ParsedAbi(), StartingBlock: firstBlock, - LastBlock: lastBlock, + LastBlock: -1, Events: tr.Parser.GetEvents(subset), Methods: tr.Parser.GetSelectMethods(tr.WantedMethods[contractAddr]), FilterArgs: eventArgs, @@ -169,67 +183,57 @@ func (tr *transformer) Init() error { CreateHashList: tr.CreateHashList[contractAddr], Piping: tr.Piping[contractAddr], }.Init() - } + tr.Contracts[contractAddr] = con + tr.contractAddresses = append(tr.contractAddresses, con.Address) - return nil -} - -func (tr *transformer) Execute() error { - cLen := len(tr.Contracts) - if cLen == 0 { - return errors.New("error: transformer has no initialized contracts") - } - contractAddresses := make([]string, 0, cLen) // Holds all contract addresses, for batch fetching of logs - sortedEventIds := make(map[string][]string) // Map to sort event column ids by contract, for post fetch processing and persisting of logs - sortedMethodIds := make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling - eventIds := make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers - eventFilters := make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs - sortedLogs := make(map[string][]gethTypes.Log) // Map to sort batch fetched logs by which contract they belong to, for post fetch processing - var start, end int64 // Hold the lowest starting block and the highest ending block - start = 100000000000 - end = -1 - - // Cycle through all contracts and extract info needed for fetching and post-processing - for _, con := range tr.Contracts { - sortedLogs[con.Address] = []gethTypes.Log{} - sortedEventIds[con.Address] = make([]string, 0, len(con.Events)) - contractAddresses = append(contractAddresses, con.Address) + // Create checked_headers columns for each event id and append to list of all event ids + tr.sortedEventIds[con.Address] = make([]string, 0, len(con.Events)) for _, event := range con.Events { - // Generate eventID and use it to create a checked_header column if one does not already exist eventId := strings.ToLower(event.Name + "_" + con.Address) err := tr.HeaderRepository.AddCheckColumn(eventId) if err != nil { return err } // Keep track of this event id; sorted and unsorted - sortedEventIds[con.Address] = append(sortedEventIds[con.Address], eventId) - eventIds = append(eventIds, eventId) + tr.sortedEventIds[con.Address] = append(tr.sortedEventIds[con.Address], eventId) + tr.eventIds = append(tr.eventIds, eventId) // Append this event sig to the filters - eventFilters = append(eventFilters, event.Sig()) + tr.eventFilters = append(tr.eventFilters, event.Sig()) } - // Create checked_headers columns for each method id and generate list of all method ids - sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods)) + // Create checked_headers columns for each method id and append list of all method ids + tr.sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods)) for _, m := range con.Methods { methodId := strings.ToLower(m.Name + "_" + con.Address) err := tr.HeaderRepository.AddCheckColumn(methodId) if err != nil { return err } - sortedMethodIds[con.Address] = append(sortedMethodIds[con.Address], methodId) + tr.sortedMethodIds[con.Address] = append(tr.sortedMethodIds[con.Address], methodId) } - // Update start to the lowest block and end to the highest block - if con.StartingBlock < start { - start = con.StartingBlock - } - if con.LastBlock > end { - end = con.LastBlock + // Update start to the lowest block + if con.StartingBlock < tr.start { + tr.start = con.StartingBlock } } + return nil +} + +func (tr *transformer) Execute() error { + if len(tr.Contracts) == 0 { + return errors.New("error: transformer has no initialized contracts") + } + + // Map to sort batch fetched logs by which contract they belong to, for post fetch processing + sortedLogs := make(map[string][]gethTypes.Log) + for _, con := range tr.Contracts { + sortedLogs[con.Address] = []gethTypes.Log{} + } + // Find unchecked headers for all events across all contracts; these are returned in asc order - missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(start, end, eventIds) + missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.start, -1, tr.eventIds) if err != nil { return err } @@ -237,37 +241,45 @@ func (tr *transformer) Execute() error { // Iterate over headers for _, header := range missingHeaders { // And fetch all event logs across contracts at this header - allLogs, err := tr.Fetcher.FetchLogs(contractAddresses, eventFilters, header) + allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header) if err != nil { return err } - // Mark the header checked for all of these eventIDs and continue to method polling and then the next iteration if no logs are found + // If no logs are found mark the header checked for all of these eventIDs + // and continue to method polling and onto the next iteration if len(allLogs) < 1 { - err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds) + err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds) if err != nil { return err } - goto Polling + err = tr.methodPolling(header, tr.sortedMethodIds) + if err != nil { + return err + } + continue } // Sort logs by the contract they belong to for _, log := range allLogs { - sortedLogs[log.Address.Hex()] = append(sortedLogs[log.Address.Hex()], log) + addr := strings.ToLower(log.Address.Hex()) + sortedLogs[addr] = append(sortedLogs[addr], log) } // Process logs for each contract for conAddr, logs := range sortedLogs { + if logs == nil { + continue + } // Configure converter with this contract con := tr.Contracts[conAddr] tr.Converter.Update(con) - // Convert logs into batches of log mappings (event => []types.Log) + // Convert logs into batches of log mappings (eventName => []types.Logs convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id) if err != nil { return err } - // Cycle through each type of event log and persist them for eventName, logs := range convertedLogs { // If logs for this event are empty, mark them checked at this header and continue @@ -288,9 +300,8 @@ func (tr *transformer) Execute() error { } } - Polling: // Poll contracts at this block height - err = tr.pollContracts(header, sortedMethodIds) + err = tr.methodPolling(header, tr.sortedMethodIds) if err != nil { return err } @@ -300,7 +311,7 @@ func (tr *transformer) Execute() error { } // Used to poll contract methods at a given header -func (tr *transformer) pollContracts(header core.Header, sortedMethodIds map[string][]string) error { +func (tr *transformer) methodPolling(header core.Header, sortedMethodIds map[string][]string) error { for _, con := range tr.Contracts { // Skip method polling processes if no methods are specified // Also don't try to poll methods below this contract's specified starting block @@ -326,40 +337,40 @@ func (tr *transformer) pollContracts(header core.Header, sortedMethodIds map[str // Used to set which contract addresses and which of their events to watch func (tr *transformer) SetEvents(contractAddr string, filterSet []string) { - tr.WatchedEvents[contractAddr] = filterSet + tr.WatchedEvents[strings.ToLower(contractAddr)] = filterSet } // Used to set subset of account addresses to watch events for func (tr *transformer) SetEventArgs(contractAddr string, filterSet []string) { - tr.EventArgs[contractAddr] = filterSet + tr.EventArgs[strings.ToLower(contractAddr)] = filterSet } // Used to set which contract addresses and which of their methods to call func (tr *transformer) SetMethods(contractAddr string, filterSet []string) { - tr.WantedMethods[contractAddr] = filterSet + tr.WantedMethods[strings.ToLower(contractAddr)] = filterSet } // Used to set subset of account addresses to poll methods on func (tr *transformer) SetMethodArgs(contractAddr string, filterSet []string) { - tr.MethodArgs[contractAddr] = filterSet + tr.MethodArgs[strings.ToLower(contractAddr)] = filterSet } // Used to set the block range to watch for a given address -func (tr *transformer) SetRange(contractAddr string, rng [2]int64) { - tr.ContractRanges[contractAddr] = rng +func (tr *transformer) SetStartingBlock(contractAddr string, start int64) { + tr.ContractStart[strings.ToLower(contractAddr)] = start } // Used to set whether or not to persist an account address list func (tr *transformer) SetCreateAddrList(contractAddr string, on bool) { - tr.CreateAddrList[contractAddr] = on + tr.CreateAddrList[strings.ToLower(contractAddr)] = on } // Used to set whether or not to persist an hash list func (tr *transformer) SetCreateHashList(contractAddr string, on bool) { - tr.CreateHashList[contractAddr] = on + tr.CreateHashList[strings.ToLower(contractAddr)] = on } // Used to turn method piping on for a contract func (tr *transformer) SetPiping(contractAddr string, on bool) { - tr.Piping[contractAddr] = on + tr.Piping[strings.ToLower(contractAddr)] = on } diff --git a/pkg/omni/light/transformer/transformer_test.go b/pkg/omni/light/transformer/transformer_test.go index abf20ff9..db805532 100644 --- a/pkg/omni/light/transformer/transformer_test.go +++ b/pkg/omni/light/transformer/transformer_test.go @@ -18,6 +18,7 @@ package transformer_test import ( "fmt" + "strings" "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" @@ -38,6 +39,8 @@ var _ = Describe("Transformer", func() { var blockChain core.BlockChain var headerRepository repositories.HeaderRepository var headerID, headerID2 int64 + var ensAddr = strings.ToLower(constants.EnsContractAddress) + var tusdAddr = strings.ToLower(constants.TusdContractAddress) BeforeEach(func() { db, blockChain = test_helpers.SetupDBandBC() @@ -53,7 +56,7 @@ var _ = Describe("Transformer", func() { watchedEvents := []string{"Transfer", "Mint"} t := transformer.NewTransformer("", blockChain, db) t.SetEvents(constants.TusdContractAddress, watchedEvents) - Expect(t.WatchedEvents[constants.TusdContractAddress]).To(Equal(watchedEvents)) + Expect(t.WatchedEvents[tusdAddr]).To(Equal(watchedEvents)) }) }) @@ -62,7 +65,7 @@ var _ = Describe("Transformer", func() { eventAddrs := []string{"test1", "test2"} t := transformer.NewTransformer("", blockChain, db) t.SetEventArgs(constants.TusdContractAddress, eventAddrs) - Expect(t.EventArgs[constants.TusdContractAddress]).To(Equal(eventAddrs)) + Expect(t.EventArgs[tusdAddr]).To(Equal(eventAddrs)) }) }) @@ -71,7 +74,7 @@ var _ = Describe("Transformer", func() { watchedMethods := []string{"balanceOf", "totalSupply"} t := transformer.NewTransformer("", blockChain, db) t.SetMethods(constants.TusdContractAddress, watchedMethods) - Expect(t.WantedMethods[constants.TusdContractAddress]).To(Equal(watchedMethods)) + Expect(t.WantedMethods[tusdAddr]).To(Equal(watchedMethods)) }) }) @@ -80,16 +83,15 @@ var _ = Describe("Transformer", func() { methodAddrs := []string{"test1", "test2"} t := transformer.NewTransformer("", blockChain, db) t.SetMethodArgs(constants.TusdContractAddress, methodAddrs) - Expect(t.MethodArgs[constants.TusdContractAddress]).To(Equal(methodAddrs)) + Expect(t.MethodArgs[tusdAddr]).To(Equal(methodAddrs)) }) }) - Describe("SetRange", func() { + Describe("SetStartingBlock", func() { It("Sets the block range that the contract should be watched within", func() { - rng := [2]int64{1, 100000} t := transformer.NewTransformer("", blockChain, db) - t.SetRange(constants.TusdContractAddress, rng) - Expect(t.ContractRanges[constants.TusdContractAddress]).To(Equal(rng)) + t.SetStartingBlock(constants.TusdContractAddress, 11) + Expect(t.ContractStart[tusdAddr]).To(Equal(int64(11))) }) }) @@ -97,7 +99,7 @@ var _ = Describe("Transformer", func() { It("Sets the block range that the contract should be watched within", func() { t := transformer.NewTransformer("", blockChain, db) t.SetCreateAddrList(constants.TusdContractAddress, true) - Expect(t.CreateAddrList[constants.TusdContractAddress]).To(Equal(true)) + Expect(t.CreateAddrList[tusdAddr]).To(Equal(true)) }) }) @@ -105,7 +107,7 @@ var _ = Describe("Transformer", func() { It("Sets the block range that the contract should be watched within", func() { t := transformer.NewTransformer("", blockChain, db) t.SetCreateHashList(constants.TusdContractAddress, true) - Expect(t.CreateHashList[constants.TusdContractAddress]).To(Equal(true)) + Expect(t.CreateHashList[tusdAddr]).To(Equal(true)) }) }) @@ -118,14 +120,14 @@ var _ = Describe("Transformer", func() { err = t.Init() Expect(err).ToNot(HaveOccurred()) - c, ok := t.Contracts[constants.TusdContractAddress] + c, ok := t.Contracts[tusdAddr] Expect(ok).To(Equal(true)) Expect(c.StartingBlock).To(Equal(int64(6194632))) - Expect(c.LastBlock).To(Equal(int64(6194634))) + Expect(c.LastBlock).To(Equal(int64(-1))) Expect(c.Abi).To(Equal(constants.TusdAbiString)) Expect(c.Name).To(Equal("TrueUSD")) - Expect(c.Address).To(Equal(constants.TusdContractAddress)) + Expect(c.Address).To(Equal(tusdAddr)) }) It("Fails to initialize if first and most recent block numbers cannot be fetched from vDB headers table", func() { @@ -142,7 +144,7 @@ var _ = Describe("Transformer", func() { err = t.Init() Expect(err).ToNot(HaveOccurred()) - _, ok := t.Contracts[constants.TusdContractAddress] + _, ok := t.Contracts[tusdAddr] Expect(ok).To(Equal(false)) }) }) @@ -171,7 +173,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) log := test_helpers.LightTransferLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", constants.TusdContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", tusdAddr)).StructScan(&log) Expect(err).ToNot(HaveOccurred()) // We don't know vulcID, so compare individual fields instead of complete structures Expect(log.HeaderID).To(Equal(headerID)) @@ -186,7 +188,7 @@ var _ = Describe("Transformer", func() { t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"}) err = t.Init() Expect(err).ToNot(HaveOccurred()) - c, ok := t.Contracts[constants.TusdContractAddress] + c, ok := t.Contracts[tusdAddr] Expect(ok).To(Equal(true)) err = t.Execute() Expect(err).ToNot(HaveOccurred()) @@ -232,12 +234,12 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) res := test_helpers.BalanceOf{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", tusdAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Balance).To(Equal("55849938025000000000000")) Expect(res.TokenName).To(Equal("TrueUSD")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", tusdAddr)).StructScan(&res) Expect(err).To(HaveOccurred()) }) @@ -274,7 +276,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) log := test_helpers.LightNewOwnerLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&log) Expect(err).ToNot(HaveOccurred()) // We don't know vulcID, so compare individual fields instead of complete structures Expect(log.HeaderID).To(Equal(headerID)) @@ -289,7 +291,7 @@ var _ = Describe("Transformer", func() { t.SetMethods(constants.EnsContractAddress, []string{"owner"}) err = t.Init() Expect(err).ToNot(HaveOccurred()) - c, ok := t.Contracts[constants.EnsContractAddress] + c, ok := t.Contracts[ensAddr] Expect(ok).To(Equal(true)) err = t.Execute() Expect(err).ToNot(HaveOccurred()) @@ -319,17 +321,17 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) res := test_helpers.Owner{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", ensAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Address).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) Expect(res.TokenName).To(Equal("")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", ensAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Address).To(Equal("0x0000000000000000000000000000000000000000")) Expect(res.TokenName).To(Equal("")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6885696'", ensAddr)).StructScan(&res) Expect(err).To(HaveOccurred()) }) @@ -344,7 +346,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) log := test_helpers.LightNewOwnerLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&log) Expect(err).To(HaveOccurred()) }) @@ -359,12 +361,12 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) res := test_helpers.Owner{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", ensAddr)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Address).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) Expect(res.TokenName).To(Equal("")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", ensAddr)).StructScan(&res) Expect(err).To(HaveOccurred()) }) }) @@ -405,7 +407,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) newOwnerLog := test_helpers.LightNewOwnerLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", constants.EnsContractAddress)).StructScan(&newOwnerLog) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog) Expect(err).ToNot(HaveOccurred()) // We don't know vulcID, so compare individual fields instead of complete structures Expect(newOwnerLog.HeaderID).To(Equal(headerID2)) @@ -414,7 +416,7 @@ var _ = Describe("Transformer", func() { Expect(newOwnerLog.Owner).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) transferLog := test_helpers.LightTransferLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", constants.TusdContractAddress)).StructScan(&transferLog) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", tusdAddr)).StructScan(&transferLog) Expect(err).ToNot(HaveOccurred()) // We don't know vulcID, so compare individual fields instead of complete structures Expect(transferLog.HeaderID).To(Equal(headerID)) @@ -431,9 +433,9 @@ var _ = Describe("Transformer", func() { t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"}) err = t.Init() Expect(err).ToNot(HaveOccurred()) - ens, ok := t.Contracts[constants.EnsContractAddress] + ens, ok := t.Contracts[ensAddr] Expect(ok).To(Equal(true)) - tusd, ok := t.Contracts[constants.TusdContractAddress] + tusd, ok := t.Contracts[tusdAddr] Expect(ok).To(Equal(true)) err = t.Execute() Expect(err).ToNot(HaveOccurred()) @@ -479,26 +481,26 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) owner := test_helpers.Owner{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&owner) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", ensAddr)).StructScan(&owner) Expect(err).ToNot(HaveOccurred()) Expect(owner.Address).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) Expect(owner.TokenName).To(Equal("")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&owner) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", ensAddr)).StructScan(&owner) Expect(err).ToNot(HaveOccurred()) Expect(owner.Address).To(Equal("0x0000000000000000000000000000000000000000")) Expect(owner.TokenName).To(Equal("")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&owner) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6885696'", ensAddr)).StructScan(&owner) Expect(err).To(HaveOccurred()) bal := test_helpers.BalanceOf{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&bal) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", tusdAddr)).StructScan(&bal) Expect(err).ToNot(HaveOccurred()) Expect(bal.Balance).To(Equal("55849938025000000000000")) Expect(bal.TokenName).To(Equal("TrueUSD")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&bal) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", tusdAddr)).StructScan(&bal) Expect(err).To(HaveOccurred()) }) }) diff --git a/pkg/omni/shared/contract/contract.go b/pkg/omni/shared/contract/contract.go index afda9487..a916e734 100644 --- a/pkg/omni/shared/contract/contract.go +++ b/pkg/omni/shared/contract/contract.go @@ -81,7 +81,7 @@ func (c *Contract) GenerateFilters() error { Name: event.Name, FromBlock: c.StartingBlock, ToBlock: -1, - Address: c.Address, + Address: common.HexToAddress(c.Address).Hex(), Topics: core.Topics{event.Sig().Hex()}, } } diff --git a/pkg/omni/shared/helpers/test_helpers/database.go b/pkg/omni/shared/helpers/test_helpers/database.go index a3e04dfb..75db6497 100644 --- a/pkg/omni/shared/helpers/test_helpers/database.go +++ b/pkg/omni/shared/helpers/test_helpers/database.go @@ -260,37 +260,10 @@ func TearDown(db *postgres.DB) { _, err = tx.Exec(`DELETE FROM receipts`) Expect(err).NotTo(HaveOccurred()) - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr`) + _, err = tx.Exec(`DROP TABLE public.checked_headers`) Expect(err).NotTo(HaveOccurred()) - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr2`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr3`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS methodname_contractaddr`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS methodname_contractaddr2`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS methodname_contractaddr3`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS balanceof_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS newowner_0x314159265dd8dbb310642f98f50c066173c1259b`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS owner_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`) - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS owner_0x314159265dd8dbb310642f98f50c066173c1259b`) + _, err = tx.Exec(`CREATE TABLE public.checked_headers (id SERIAL PRIMARY KEY, header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE);`) Expect(err).NotTo(HaveOccurred()) _, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`) diff --git a/pkg/omni/shared/parser/parser.go b/pkg/omni/shared/parser/parser.go index 568be2ca..85e4a50f 100644 --- a/pkg/omni/shared/parser/parser.go +++ b/pkg/omni/shared/parser/parser.go @@ -93,7 +93,7 @@ func (p *parser) lookUp(contractAddr string) (string, error) { // Returns only specified methods, if they meet the criteria // Returns as array with methods in same order they were specified -// Nil wanted array => no events are returned +// Nil or empty wanted array => no events are returned func (p *parser) GetSelectMethods(wanted []string) []types.Method { wLen := len(wanted) if wLen == 0 { diff --git a/pkg/omni/shared/repository/event_repository.go b/pkg/omni/shared/repository/event_repository.go index b1b4241e..4b599447 100644 --- a/pkg/omni/shared/repository/event_repository.go +++ b/pkg/omni/shared/repository/event_repository.go @@ -193,7 +193,6 @@ func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types. // Returns true if it created a new table; returns false if table already existed func (r *eventRepository) CreateEventTable(contractAddr string, event types.Event) (bool, error) { tableID := fmt.Sprintf("%s_%s.%s_event", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(event.Name)) - // Check cache before querying pq to see if table exists _, ok := r.tables.Get(tableID) if ok { diff --git a/pkg/omni/shared/transformer/interface.go b/pkg/omni/shared/transformer/interface.go index 64e5d03e..42a241df 100644 --- a/pkg/omni/shared/transformer/interface.go +++ b/pkg/omni/shared/transformer/interface.go @@ -23,7 +23,7 @@ type Transformer interface { SetEventArgs(contractAddr string, filterSet []string) SetMethods(contractAddr string, filterSet []string) SetMethodArgs(contractAddr string, filterSet []string) - SetRange(contractAddr string, rng [2]int64) + SetStartingBlock(contractAddr string, start int64) SetCreateAddrList(contractAddr string, on bool) SetCreateHashList(contractAddr string, on bool) SetPiping(contractAddr string, on bool)