diff --git a/.travis.yml b/.travis.yml index 8d52cda4..bda07aa3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ dist: trusty language: go go: - - 1.9 + - 1.11 services: - postgresql addons: diff --git a/README.md b/README.md index 90a8249f..7364a2d3 100644 --- a/README.md +++ b/README.md @@ -139,21 +139,21 @@ false If you have full rinkeby chaindata you can move it to `rinkeby_vulcanizedb_geth_data` docker volume to skip long wait of sync. ## omniWatcher and lightOmniWatcher -These commands require a pre-synced (full or light, respectively) vulcanizeDB +These commands require a pre-synced (full or light, respectively) vulcanizeDB (see above sections) To watch all events of a contract: - Execute `./vulcanizedb omniWatcher --config --contract-address ` - - Execute `./vulcanizedb lightOmniWatcher --config --contract-address ` + - Or `./vulcanizedb lightOmniWatcher --config --contract-address ` To watch contracts on a network other than mainnet, use the network flag: - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --network ` To watch events within a certain block range use the starting block and ending block flags: - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --starting-block-number <#> --ending-block-number <#>` -To watch only select events use the contract events flag: - - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --contract-events --contract-events ` -To watch all events and poll select methods with the addresses emitted by those events: - - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --contract-methods --contract-methods ` -To watch select event and poll select method: - - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --contract-events --contract-methods ` -To watch all types of events of the contract but only persist the ones that emit one of the filtered-for addresses: - - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --event-filter-addresses --event-filter-addresses ` -To watch all events of the contract but only poll a select method with specified addresses: - - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --method-filter-addresses --method-filter-addresses ` \ No newline at end of file +To watch only specified events use the events flag: + - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --events --events ` +To watch events and poll the specified methods with any addresses and hashes emitted by the watched events utilize the methods flag: + - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --methods --methods ` +To watch specified events and poll the specified method with any addresses and hashes emiited by the watched events: + - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --events --events --methods ` +To watch all types of events of the contract but only persist the ones that emit one of the filtered-for argument values: + - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --event-args --event-args ` +To watch all events of the contract but only poll the specified method with specified argument values (if they are emitted from the watched events): + - Execute `./vulcanizedb lightOmniWatcher --config --contract-address --methods --method-args --method-args ` \ No newline at end of file diff --git a/cmd/lightOmniWatcher.go b/cmd/lightOmniWatcher.go deleted file mode 100644 index 98f4e6ae..00000000 --- a/cmd/lightOmniWatcher.go +++ /dev/null @@ -1,102 +0,0 @@ -// VulcanizeDB -// Copyright © 2018 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "fmt" - "log" - "time" - - "github.com/spf13/cobra" - - "github.com/vulcanize/vulcanizedb/libraries/shared" - "github.com/vulcanize/vulcanizedb/pkg/omni/light/transformer" - "github.com/vulcanize/vulcanizedb/utils" -) - -// omniWatcherCmd represents the omniWatcher command -var lightOmniWatcherCmd = &cobra.Command{ - Use: "lightOmniWatcher", - Short: "Watches events at the provided contract address using lightSynced vDB", - Long: `Uses input contract address and event filters to watch events - -Expects an ethereum node to be running -Expects lightSync to have been run and the presence of headers in the Vulcanize database -Requires a .toml config file: - - [database] - name = "vulcanize_public" - hostname = "localhost" - port = 5432 - - [client] - ipcPath = "/Users/user/Library/Ethereum/geth.ipc" -`, - Run: func(cmd *cobra.Command, args []string) { - lightOmniWatcher() - }, -} - -func lightOmniWatcher() { - if contractAddress == "" && len(contractAddresses) == 0 { - log.Fatal("Contract address required") - } - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - blockChain := getBlockChain() - db := utils.LoadPostgres(databaseConfig, blockChain.Node()) - t := transformer.NewTransformer(network, blockChain, &db) - - contractAddresses = append(contractAddresses, contractAddress) - for _, addr := range contractAddresses { - t.SetEvents(addr, contractEvents) - t.SetMethods(addr, contractMethods) - t.SetEventArgs(addr, eventArgs) - t.SetMethodArgs(addr, methodArgs) - t.SetRange(addr, [2]int64{startingBlockNumber, endingBlockNumber}) - t.SetCreateAddrList(addr, createAddrList) - } - - err := t.Init() - if err != nil { - log.Fatal(fmt.Sprintf("Failed to initialized transformer\r\nerr: %v\r\n", err)) - } - - w := shared.Watcher{} - w.AddTransformer(t) - - for range ticker.C { - w.Execute() - } -} - -func init() { - rootCmd.AddCommand(lightOmniWatcherCmd) - - lightOmniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for") - lightOmniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address") - lightOmniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "events", "e", []string{}, "Subset of events to watch; by default all events are watched") - lightOmniWatcherCmd.Flags().StringArrayVarP(&contractMethods, "methods", "m", nil, "Subset of methods to poll; by default no methods are polled") - lightOmniWatcherCmd.Flags().StringArrayVarP(&eventArgs, "event-args", "f", []string{}, "Argument values to filter event logs for; will only persist event logs that emit at least one of the value specified") - lightOmniWatcherCmd.Flags().StringArrayVarP(&methodArgs, "method-args", "g", []string{}, "Argument values to limit methods to; will only call methods with emitted values that were specified here") - lightOmniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`) - lightOmniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists") - lightOmniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block") - lightOmniWatcherCmd.Flags().BoolVarP(&createAddrList, "create-address-list", "c", false, "Set to true to persist address seen in emitted events into the database") -} diff --git a/cmd/lightSync.go b/cmd/lightSync.go index b6845a1d..7bd2f6bc 100644 --- a/cmd/lightSync.go +++ b/cmd/lightSync.go @@ -14,20 +14,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// Copyright © 2018 Vulcanize -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package cmd import ( diff --git a/cmd/omniWatcher.go b/cmd/omniWatcher.go index 50f4ccbb..cf17cadf 100644 --- a/cmd/omniWatcher.go +++ b/cmd/omniWatcher.go @@ -24,7 +24,9 @@ import ( "github.com/spf13/cobra" "github.com/vulcanize/vulcanizedb/libraries/shared" - "github.com/vulcanize/vulcanizedb/pkg/omni/full/transformer" + ft "github.com/vulcanize/vulcanizedb/pkg/omni/full/transformer" + lt "github.com/vulcanize/vulcanizedb/pkg/omni/light/transformer" + st "github.com/vulcanize/vulcanizedb/pkg/omni/shared/transformer" "github.com/vulcanize/vulcanizedb/utils" ) @@ -51,6 +53,18 @@ Requires a .toml config file: }, } +var ( + network string + contractAddress string + contractAddresses []string + contractEvents []string + contractMethods []string + eventArgs []string + methodArgs []string + methodPiping bool + mode string +) + func omniWatcher() { if contractAddress == "" && len(contractAddresses) == 0 { log.Fatal("Contract address required") @@ -61,7 +75,16 @@ func omniWatcher() { blockChain := getBlockChain() db := utils.LoadPostgres(databaseConfig, blockChain.Node()) - t := transformer.NewTransformer(network, blockChain, &db) + + var t st.Transformer + switch mode { + case "light": + t = lt.NewTransformer(network, blockChain, &db) + case "full": + t = ft.NewTransformer(network, blockChain, &db) + default: + log.Fatal("Invalid mode") + } contractAddresses = append(contractAddresses, contractAddress) for _, addr := range contractAddresses { @@ -70,6 +93,7 @@ func omniWatcher() { t.SetEventArgs(addr, eventArgs) t.SetMethodArgs(addr, methodArgs) t.SetRange(addr, [2]int64{startingBlockNumber, endingBlockNumber}) + t.SetPiping(addr, methodPiping) } err := t.Init() @@ -88,6 +112,7 @@ func omniWatcher() { func init() { rootCmd.AddCommand(omniWatcherCmd) + omniWatcherCmd.Flags().StringVarP(&mode, "mode", "o", "light", "'light' or 'full' mode to work with either light synced or fully synced vDB (default is light)") omniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for") omniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address") omniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "events", "e", []string{}, "Subset of events to watch; by default all events are watched") @@ -97,5 +122,5 @@ func init() { omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`) omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists") omniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block") - omniWatcherCmd.Flags().BoolVarP(&createAddrList, "create-address-list", "c", false, "Set to true to persist address seen in emitted events into the database") + omniWatcherCmd.Flags().BoolVarP(&methodPiping, "piping", "p", false, "Turn on method output piping: methods listed first will be polled first and their output used as input to subsequent methods") } diff --git a/cmd/root.go b/cmd/root.go index f5ae196b..49c1b69b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -42,14 +42,6 @@ var ( startingBlockNumber int64 syncAll bool endingBlockNumber int64 - network string - contractAddress string - contractAddresses []string - contractEvents []string - contractMethods []string - eventArgs []string - methodArgs []string - createAddrList bool ) var rootCmd = &cobra.Command{ diff --git a/pkg/omni/full/converter/converter.go b/pkg/omni/full/converter/converter.go index e4bc0d6a..4f6e16f8 100644 --- a/pkg/omni/full/converter/converter.go +++ b/pkg/omni/full/converter/converter.go @@ -19,6 +19,7 @@ package converter import ( "errors" "fmt" + "github.com/ethereum/go-ethereum/common/hexutil" "math/big" "strconv" @@ -69,7 +70,6 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) ( } strValues := make(map[string]string, len(values)) - seenBytes := make([]interface{}, 0, len(values)) seenAddrs := make([]interface{}, 0, len(values)) seenHashes := make([]interface{}, 0, len(values)) for fieldName, input := range values { @@ -92,8 +92,10 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) ( strValues[fieldName] = strconv.FormatBool(input.(bool)) case []byte: b := input.([]byte) - strValues[fieldName] = string(b) - seenBytes = append(seenBytes, b) + strValues[fieldName] = hexutil.Encode(b) + if len(b) == 32 { + seenHashes = append(seenHashes, common.HexToHash(strValues[fieldName])) + } case byte: b := input.(byte) strValues[fieldName] = string(b) @@ -118,9 +120,6 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) ( if c.ContractInfo.EmittedHashes != nil { c.ContractInfo.AddEmittedHash(seenHashes...) } - if c.ContractInfo.EmittedBytes != nil { - c.ContractInfo.AddEmittedBytes(seenBytes...) - } return eventLog, nil } diff --git a/pkg/omni/full/transformer/transformer.go b/pkg/omni/full/transformer/transformer.go index d18c6482..b9dbcaa7 100644 --- a/pkg/omni/full/transformer/transformer.go +++ b/pkg/omni/full/transformer/transformer.go @@ -66,8 +66,12 @@ type transformer struct { EventArgs map[string][]string MethodArgs map[string][]string - // Whether or not to create a list of token holder addresses for the contract in postgres + // Whether or not to create a list of emitted address or hashes for the contract in postgres CreateAddrList map[string]bool + CreateHashList map[string]bool + + // Method piping on/off for a contract + Piping map[string]bool } // Transformer takes in config for blockchain, database, and network id @@ -86,6 +90,9 @@ func NewTransformer(network string, BC core.BlockChain, DB *postgres.DB) *transf ContractRanges: map[string][2]int64{}, EventArgs: map[string][]string{}, MethodArgs: map[string][]string{}, + CreateAddrList: map[string]bool{}, + CreateHashList: map[string]bool{}, + Piping: map[string]bool{}, } } @@ -147,6 +154,8 @@ func (t *transformer) Init() error { FilterArgs: eventArgs, MethodArgs: methodArgs, CreateAddrList: t.CreateAddrList[contractAddr], + CreateHashList: t.CreateHashList[contractAddr], + Piping: t.Piping[contractAddr], }.Init() // Use info to create filters @@ -157,7 +166,10 @@ func (t *transformer) Init() error { // Iterate over filters and push them to the repo using filter repository interface for _, filter := range info.Filters { - t.CreateFilter(filter) + err = t.CreateFilter(filter) + if err != nil { + return err + } } // Store contract info for further processing @@ -246,7 +258,17 @@ func (tr *transformer) SetRange(contractAddr string, rng [2]int64) { tr.ContractRanges[contractAddr] = rng } -// Used to set the block range to watch for a given address +// Used to set whether or not to persist an account address list func (tr *transformer) SetCreateAddrList(contractAddr string, on bool) { tr.CreateAddrList[contractAddr] = on } + +// Used to set whether or not to persist an hash list +func (tr *transformer) SetCreateHashList(contractAddr string, on bool) { + tr.CreateHashList[contractAddr] = on +} + +// Used to turn method piping on for a contract +func (tr *transformer) SetPiping(contractAddr string, on bool) { + tr.Piping[contractAddr] = on +} diff --git a/pkg/omni/full/transformer/transformer_test.go b/pkg/omni/full/transformer/transformer_test.go index ac989c9e..d5e4ed2e 100644 --- a/pkg/omni/full/transformer/transformer_test.go +++ b/pkg/omni/full/transformer/transformer_test.go @@ -95,6 +95,22 @@ var _ = Describe("Transformer", func() { }) }) + Describe("SetCreateAddrList", func() { + It("Sets the block range that the contract should be watched within", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetCreateAddrList(constants.TusdContractAddress, true) + Expect(t.CreateAddrList[constants.TusdContractAddress]).To(Equal(true)) + }) + }) + + Describe("SetCreateHashList", func() { + It("Sets the block range that the contract should be watched within", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetCreateHashList(constants.TusdContractAddress, true) + Expect(t.CreateHashList[constants.TusdContractAddress]).To(Equal(true)) + }) + }) + Describe("Init", func() { It("Initializes transformer's contract objects", func() { blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1) @@ -206,7 +222,7 @@ var _ = Describe("Transformer", func() { res := test_helpers.BalanceOf{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843bCE061BA391' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Balance).To(Equal("0")) Expect(res.TokenName).To(Equal("TrueUSD")) @@ -229,4 +245,123 @@ var _ = Describe("Transformer", func() { Expect(err).To(HaveOccurred()) }) }) + + Describe("Execute- against ENS registry contract", func() { + BeforeEach(func() { + blockRepository.CreateOrUpdateBlock(mocks.NewOwnerBlock1) + blockRepository.CreateOrUpdateBlock(mocks.NewOwnerBlock2) + }) + + It("Transforms watched contract data into custom repositories", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, nil) + + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + log := test_helpers.NewOwnerLog{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log) + + // We don't know vulcID, so compare individual fields instead of complete structures + Expect(log.Tx).To(Equal("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654bbb")) + Expect(log.Block).To(Equal(int64(6194635))) + Expect(log.Node).To(Equal("0x0000000000000000000000000000000000000000000000000000c02aaa39b223")) + Expect(log.Label).To(Equal("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391")) + Expect(log.Owner).To(Equal("0x000000000000000000000000000000000000Af21")) + }) + + It("Keeps track of contract-related hashes while transforming event data if they need to be used for later method polling", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, []string{"owner"}) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + c, ok := t.Contracts[constants.EnsContractAddress] + Expect(ok).To(Equal(true)) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + Expect(len(c.EmittedHashes)).To(Equal(3)) + + b, ok := c.EmittedHashes[common.HexToHash("0x0000000000000000000000000000000000000000000000000000c02aaa39b223")] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + b, ok = c.EmittedHashes[common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391")] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + // Doesn't keep track of address since it wouldn't be used in calling the 'owner' method + _, ok = c.EmittedAddrs[common.HexToAddress("0x000000000000000000000000000000000000Af21")] + Expect(ok).To(Equal(false)) + }) + + It("Polls given methods using generated token holder address", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, []string{"owner"}) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + res := test_helpers.Owner{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x0000000000000000000000000000000000000000000000000000c02aaa39b223' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Address).To(Equal("0x0000000000000000000000000000000000000000")) + Expect(res.TokenName).To(Equal("")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Address).To(Equal("0x0000000000000000000000000000000000000000")) + Expect(res.TokenName).To(Equal("")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).To(HaveOccurred()) + }) + + It("It does not perist events if they do not pass the emitted arg filter", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, nil) + t.SetEventArgs(constants.EnsContractAddress, []string{"fake_filter_value"}) + + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + log := test_helpers.LightNewOwnerLog{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log) + Expect(err).To(HaveOccurred()) + }) + + It("If a method arg filter is applied, only those arguments are used in polling", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, []string{"owner"}) + t.SetMethodArgs(constants.EnsContractAddress, []string{"0x0000000000000000000000000000000000000000000000000000c02aaa39b223"}) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + res := test_helpers.Owner{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x0000000000000000000000000000000000000000000000000000c02aaa39b223' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Address).To(Equal("0x0000000000000000000000000000000000000000")) + Expect(res.TokenName).To(Equal("")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391' AND block = '6194636'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).To(HaveOccurred()) + }) + }) }) diff --git a/pkg/omni/light/converter/converter.go b/pkg/omni/light/converter/converter.go index 9fd5b7c7..bf4495c7 100644 --- a/pkg/omni/light/converter/converter.go +++ b/pkg/omni/light/converter/converter.go @@ -68,7 +68,6 @@ func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID in } strValues := make(map[string]string, len(values)) - seenBytes := make([]interface{}, 0, len(values)) seenAddrs := make([]interface{}, 0, len(values)) seenHashes := make([]interface{}, 0, len(values)) for fieldName, input := range values { @@ -92,7 +91,9 @@ func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID in case []byte: b := input.([]byte) strValues[fieldName] = hexutil.Encode(b) - seenBytes = append(seenBytes, b) + if len(b) == 32 { + seenHashes = append(seenHashes, common.HexToHash(strValues[fieldName])) + } case byte: b := input.(byte) strValues[fieldName] = string(b) @@ -123,9 +124,6 @@ func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID in if c.ContractInfo.EmittedHashes != nil { c.ContractInfo.AddEmittedHash(seenHashes...) } - if c.ContractInfo.EmittedBytes != nil { - c.ContractInfo.AddEmittedBytes(seenBytes...) - } } } diff --git a/pkg/omni/light/converter/converter_test.go b/pkg/omni/light/converter/converter_test.go index 3ee988cd..374bb699 100644 --- a/pkg/omni/light/converter/converter_test.go +++ b/pkg/omni/light/converter/converter_test.go @@ -31,15 +31,13 @@ import ( var _ = Describe("Converter", func() { var con *contract.Contract - var wantedEvents = []string{"Transfer", "Mint"} + var tusdWantedEvents = []string{"Transfer", "Mint"} + var ensWantedEvents = []string{"NewOwner"} var err error - BeforeEach(func() { - con = test_helpers.SetupTusdContract(wantedEvents, []string{}) - }) - Describe("Update", func() { It("Updates contract info held by the converter", func() { + con = test_helpers.SetupTusdContract(tusdWantedEvents, []string{}) c := converter.NewConverter(con) Expect(c.ContractInfo).To(Equal(con)) @@ -51,6 +49,7 @@ var _ = Describe("Converter", func() { Describe("Convert", func() { It("Converts a watched event log to mapping of event input names to values", func() { + con = test_helpers.SetupTusdContract(tusdWantedEvents, []string{}) _, ok := con.Events["Approval"] Expect(ok).To(Equal(false)) @@ -76,7 +75,8 @@ var _ = Describe("Converter", func() { Expect(logs[1].Id).To(Equal(int64(232))) }) - It("Keeps track of addresses it sees to grow a token holder address list for the contract", func() { + It("Keeps track of addresses it sees if they will be used for method polling", func() { + con = test_helpers.SetupTusdContract(tusdWantedEvents, []string{"balanceOf"}) event, ok := con.Events["Transfer"] Expect(ok).To(Equal(true)) @@ -100,6 +100,45 @@ var _ = Describe("Converter", func() { _, ok = con.EmittedAddrs[common.HexToAddress("0x09THISE21a5IS5cFAKE1D82fAND43bCE06MADEUP")] Expect(ok).To(Equal(false)) + + _, ok = con.EmittedHashes[common.HexToHash("0x000000000000000000000000c02aaa39b223helloa0e5c4f27ead9083c752553")] + Expect(ok).To(Equal(false)) + }) + + It("Keeps track of hashes it sees if they will be used for method polling", func() { + con = test_helpers.SetupENSContract(ensWantedEvents, []string{"owner"}) + event, ok := con.Events["NewOwner"] + Expect(ok).To(Equal(true)) + + c := converter.NewConverter(con) + _, err := c.Convert([]types.Log{mocks.MockNewOwnerLog1, mocks.MockNewOwnerLog2}, event, 232) + Expect(err).ToNot(HaveOccurred()) + Expect(len(con.EmittedHashes)).To(Equal(3)) + + b, ok := con.EmittedHashes[common.HexToHash("0x000000000000000000000000c02aaa39b223helloa0e5c4f27ead9083c752553")] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + b, ok = con.EmittedHashes[common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391")] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + b, ok = con.EmittedHashes[common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba400")] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + _, ok = con.EmittedHashes[common.HexToHash("0x9dd48thiscc444isc242510c0made03upa5975cac061dhashb843bce061ba400")] + Expect(ok).To(Equal(false)) + + _, ok = con.EmittedHashes[common.HexToAddress("0x")] + Expect(ok).To(Equal(false)) + + _, ok = con.EmittedHashes[""] + Expect(ok).To(Equal(false)) + + // Does not keep track of emitted addresses if the methods provided will not use them + _, ok = con.EmittedAddrs[common.HexToAddress("0x000000000000000000000000000000000000Af21")] + Expect(ok).To(Equal(false)) }) It("Fails with an empty contract", func() { diff --git a/pkg/omni/light/repository/header_repository.go b/pkg/omni/light/repository/header_repository.go index 8818003c..c8f23493 100644 --- a/pkg/omni/light/repository/header_repository.go +++ b/pkg/omni/light/repository/header_repository.go @@ -18,6 +18,7 @@ package repository import ( "database/sql" + "fmt" "github.com/hashicorp/golang-lru" @@ -30,7 +31,9 @@ const columnCacheSize = 1000 type HeaderRepository interface { AddCheckColumn(eventID string) error MarkHeaderChecked(headerID int64, eventID string) error + MarkHeadersChecked(headers []core.Header, ids []string) error MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error) + MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) CheckCache(key string) (interface{}, bool) } @@ -47,36 +50,66 @@ func NewHeaderRepository(db *postgres.DB) *headerRepository { } } -func (r *headerRepository) AddCheckColumn(eventID string) error { +func (r *headerRepository) AddCheckColumn(id string) error { // Check cache to see if column already exists before querying pg - _, ok := r.columns.Get(eventID) + _, ok := r.columns.Get(id) if ok { return nil } pgStr := "ALTER TABLE public.checked_headers ADD COLUMN IF NOT EXISTS " - pgStr = pgStr + eventID + " BOOLEAN NOT NULL DEFAULT FALSE" + pgStr = pgStr + id + " BOOLEAN NOT NULL DEFAULT FALSE" _, err := r.db.Exec(pgStr) if err != nil { return err } // Add column name to cache - r.columns.Add(eventID, true) + r.columns.Add(id, true) return nil } -func (r *headerRepository) MarkHeaderChecked(headerID int64, eventID string) error { - _, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`) +func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error { + _, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+id+`) VALUES ($1, $2) ON CONFLICT (header_id) DO - UPDATE SET `+eventID+` = $2`, headerID, true) + UPDATE SET `+id+` = $2`, headerID, true) return err } -func (r *headerRepository) MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error) { +func (r *headerRepository) MarkHeadersChecked(headers []core.Header, ids []string) error { + tx, err := r.db.Begin() + if err != nil { + return err + } + + for _, header := range headers { + pgStr := "INSERT INTO public.checked_headers (header_id, " + for _, id := range ids { + pgStr += id + ", " + } + pgStr = pgStr[:len(pgStr)-2] + ") VALUES ($1, " + for i := 0; i < len(ids); i++ { + pgStr += "true, " + } + pgStr = pgStr[:len(pgStr)-2] + ") ON CONFLICT (header_id) DO UPDATE SET " + for _, id := range ids { + pgStr += fmt.Sprintf("%s = true, ", id) + } + pgStr = pgStr[:len(pgStr)-2] + _, err = tx.Exec(pgStr, header.Id) + if err != nil { + tx.Rollback() + return err + } + } + + return tx.Commit() +} + +func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64, id string) ([]core.Header, error) { var result []core.Header var query string var err error @@ -84,7 +117,7 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber int64, endingBlock if endingBlockNumber == -1 { query = `SELECT headers.id, headers.block_number, headers.hash FROM headers LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR ` + eventID + ` IS FALSE) + WHERE (header_id ISNULL OR ` + id + ` IS FALSE) AND headers.block_number >= $1 AND headers.eth_node_fingerprint = $2 ORDER BY headers.block_number` @@ -92,7 +125,7 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber int64, endingBlock } else { query = `SELECT headers.id, headers.block_number, headers.hash FROM headers LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR ` + eventID + ` IS FALSE) + WHERE (header_id ISNULL OR ` + id + ` IS FALSE) AND headers.block_number >= $1 AND headers.block_number <= $2 AND headers.eth_node_fingerprint = $3 @@ -103,6 +136,40 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber int64, endingBlock return result, err } +func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) { + var result []core.Header + var query string + var err error + baseQuery := `SELECT headers.id, headers.block_number, headers.hash FROM headers + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id IS NOT NULL` + for _, id := range eventIds { + baseQuery += ` AND ` + id + ` IS TRUE` + } + baseQuery += `) AND (` + for _, id := range methodIds { + baseQuery += id + ` IS FALSE AND ` + } + baseQuery = baseQuery[:len(baseQuery)-5] + `) ` + + if endingBlockNumber == -1 { + endStr := `AND headers.block_number >= $1 + AND headers.eth_node_fingerprint = $2 + ORDER BY headers.block_number` + query = baseQuery + endStr + err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID) + } else { + endStr := `AND headers.block_number >= $1 + AND headers.block_number <= $2 + AND headers.eth_node_fingerprint = $3 + ORDER BY headers.block_number` + query = baseQuery + endStr + err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID) + } + + return result, err +} + func (r *headerRepository) CheckCache(key string) (interface{}, bool) { return r.columns.Get(key) } diff --git a/pkg/omni/light/repository/header_repository_test.go b/pkg/omni/light/repository/header_repository_test.go index f5f4ece3..4c211d6f 100644 --- a/pkg/omni/light/repository/header_repository_test.go +++ b/pkg/omni/light/repository/header_repository_test.go @@ -18,6 +18,7 @@ package repository_test import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/core" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -31,14 +32,14 @@ import ( var _ = Describe("Repository", func() { var db *postgres.DB - var r repository.HeaderRepository - var headerRepository repositories.HeaderRepository + var omniHeaderRepo repository.HeaderRepository // omni/light header repository + var coreHeaderRepo repositories.HeaderRepository // pkg/datastore header repository var eventID, query string BeforeEach(func() { db, _ = test_helpers.SetupDBandBC() - r = repository.NewHeaderRepository(db) - headerRepository = repositories.NewHeaderRepository(db) + omniHeaderRepo = repository.NewHeaderRepository(db) + coreHeaderRepo = repositories.NewHeaderRepository(db) eventID = "eventName_contractAddr" }) @@ -52,7 +53,7 @@ var _ = Describe("Repository", func() { _, err := db.Exec(query) Expect(err).To(HaveOccurred()) - err = r.AddCheckColumn(eventID) + err = omniHeaderRepo.AddCheckColumn(eventID) Expect(err).ToNot(HaveOccurred()) _, err = db.Exec(query) @@ -60,13 +61,13 @@ var _ = Describe("Repository", func() { }) It("Caches column it creates so that it does not need to repeatedly query the database to check for it's existence", func() { - _, ok := r.CheckCache(eventID) + _, ok := omniHeaderRepo.CheckCache(eventID) Expect(ok).To(Equal(false)) - err := r.AddCheckColumn(eventID) + err := omniHeaderRepo.AddCheckColumn(eventID) Expect(err).ToNot(HaveOccurred()) - v, ok := r.CheckCache(eventID) + v, ok := omniHeaderRepo.CheckCache(eventID) Expect(ok).To(Equal(true)) Expect(v).To(Equal(true)) }) @@ -74,68 +75,154 @@ var _ = Describe("Repository", func() { Describe("MissingHeaders", func() { It("Returns all unchecked headers for the given eventID", func() { - headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) - err := r.AddCheckColumn(eventID) + addHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumn(eventID) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) }) - It("Fails if eventID does not yet exist in check_headers table", func() { - headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) - err := r.AddCheckColumn(eventID) + It("Returns unchecked headers in ascending order", func() { + addHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumn(eventID) Expect(err).ToNot(HaveOccurred()) - _, err = r.MissingHeaders(6194630, 6194635, "notEventId") + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + + h1 := missingHeaders[0] + h2 := missingHeaders[1] + h3 := missingHeaders[2] + Expect(h1.BlockNumber).To(Equal(int64(6194632))) + Expect(h2.BlockNumber).To(Equal(int64(6194633))) + Expect(h3.BlockNumber).To(Equal(int64(6194634))) + }) + + It("Fails if eventID does not yet exist in check_headers table", func() { + addHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumn(eventID) + Expect(err).ToNot(HaveOccurred()) + + _, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, "notEventId") Expect(err).To(HaveOccurred()) }) }) Describe("MarkHeaderChecked", func() { It("Marks the header checked for the given eventID", func() { - headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) - err := r.AddCheckColumn(eventID) + addHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumn(eventID) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) headerID := missingHeaders[0].Id - err = r.MarkHeaderChecked(headerID, eventID) + err = omniHeaderRepo.MarkHeaderChecked(headerID, eventID) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err = r.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(2)) }) It("Fails if eventID does not yet exist in check_headers table", func() { - headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) - headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) - err := r.AddCheckColumn(eventID) + addHeaders(coreHeaderRepo) + err := omniHeaderRepo.AddCheckColumn(eventID) Expect(err).ToNot(HaveOccurred()) - missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) headerID := missingHeaders[0].Id - err = r.MarkHeaderChecked(headerID, "notEventId") + err = omniHeaderRepo.MarkHeaderChecked(headerID, "notEventId") Expect(err).To(HaveOccurred()) - missingHeaders, err = r.MissingHeaders(6194630, 6194635, eventID) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) Expect(err).ToNot(HaveOccurred()) Expect(len(missingHeaders)).To(Equal(3)) }) }) + + Describe("MarkHeadersChecked", func() { + It("Marks the headers checked for all provided column ids", func() { + addHeaders(coreHeaderRepo) + methodIDs := []string{ + "methodName_contractAddr", + "methodName_contractAddr2", + "methodName_contractAddr3", + } + + var missingHeaders []core.Header + for _, id := range methodIDs { + err := omniHeaderRepo.AddCheckColumn(id) + Expect(err).ToNot(HaveOccurred()) + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + } + + err := omniHeaderRepo.MarkHeadersChecked(missingHeaders, methodIDs) + Expect(err).ToNot(HaveOccurred()) + for _, id := range methodIDs { + missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(0)) + } + }) + }) + + Describe("MissingMethodsCheckedEventsIntersection", func() { + It("Returns headers that have been checked for all the provided events but have not been checked for all the provided methods", func() { + addHeaders(coreHeaderRepo) + eventIDs := []string{ + eventID, + "eventName_contractAddr2", + "eventName_contractAddr3", + } + methodIDs := []string{ + "methodName_contractAddr", + "methodName_contractAddr2", + "methodName_contractAddr3", + } + for i, id := range eventIDs { + err := omniHeaderRepo.AddCheckColumn(id) + Expect(err).ToNot(HaveOccurred()) + err = omniHeaderRepo.AddCheckColumn(methodIDs[i]) + Expect(err).ToNot(HaveOccurred()) + } + + missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventID) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + + headerID := missingHeaders[0].Id + headerID2 := missingHeaders[1].Id + for i, id := range eventIDs { + err = omniHeaderRepo.MarkHeaderChecked(headerID, id) + Expect(err).ToNot(HaveOccurred()) + err = omniHeaderRepo.MarkHeaderChecked(headerID2, id) + Expect(err).ToNot(HaveOccurred()) + err = omniHeaderRepo.MarkHeaderChecked(headerID, methodIDs[i]) + Expect(err).ToNot(HaveOccurred()) + } + + intersectionHeaders, err := omniHeaderRepo.MissingMethodsCheckedEventsIntersection(6194630, 6194635, methodIDs, eventIDs) + Expect(err).ToNot(HaveOccurred()) + Expect(len(intersectionHeaders)).To(Equal(1)) + Expect(intersectionHeaders[0].Id).To(Equal(headerID2)) + + }) + }) }) + +func addHeaders(coreHeaderRepo repositories.HeaderRepository) { + coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader1) + coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader2) + coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader3) +} diff --git a/pkg/omni/light/transformer/transformer.go b/pkg/omni/light/transformer/transformer.go index 161a41c7..9ad174a7 100644 --- a/pkg/omni/light/transformer/transformer.go +++ b/pkg/omni/light/transformer/transformer.go @@ -70,10 +70,20 @@ type transformer struct { EventArgs map[string][]string MethodArgs map[string][]string - // Whether or not to create a list of token holder addresses for the contract in postgres + // Whether or not to create a list of emitted address or hashes for the contract in postgres CreateAddrList map[string]bool + CreateHashList map[string]bool + + // Method piping on/off for a contract + Piping map[string]bool } +// Order-of-operations: +// 1. Create new transformer +// 2. Load contract addresses and their parameters +// 3. Init +// 3. Execute + // Transformer takes in config for blockchain, database, and network id func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transformer { @@ -92,6 +102,8 @@ func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transf EventArgs: map[string][]string{}, MethodArgs: map[string][]string{}, CreateAddrList: map[string]bool{}, + CreateHashList: map[string]bool{}, + Piping: map[string]bool{}, } } @@ -154,6 +166,8 @@ func (tr *transformer) Init() error { FilterArgs: eventArgs, MethodArgs: methodArgs, CreateAddrList: tr.CreateAddrList[contractAddr], + CreateHashList: tr.CreateHashList[contractAddr], + Piping: tr.Piping[contractAddr], }.Init() } @@ -164,24 +178,31 @@ func (tr *transformer) Execute() error { if len(tr.Contracts) == 0 { return errors.New("error: transformer has no initialized contracts") } - // Iterate through all internal contracts + // Iterate through all initialized contracts for _, con := range tr.Contracts { // Update converter with current contract - tr.Update(con) - + tr.Converter.Update(con) + // This is so that same header slice is retrieved for each event iteration + last, err := tr.BlockRetriever.RetrieveMostRecentBlock() + if err != nil { + return err + } // Iterate through events + eventIds := make([]string, 0, len(con.Events)) for _, event := range con.Events { // Filter using the event signature topics := [][]common.Hash{{common.HexToHash(helpers.GenerateSignature(event.Sig()))}} // Generate eventID and use it to create a checked_header column if one does not already exist eventId := strings.ToLower(event.Name + "_" + con.Address) - if err := tr.AddCheckColumn(eventId); err != nil { + eventIds = append(eventIds, eventId) + err := tr.HeaderRepository.AddCheckColumn(eventId) + if err != nil { return err } // Find unchecked headers for this event - missingHeaders, err := tr.MissingHeaders(con.StartingBlock, con.LastBlock, eventId) + missingHeaders, err := tr.HeaderRepository.MissingHeaders(con.StartingBlock, last, eventId) if err != nil { return err } @@ -189,14 +210,14 @@ func (tr *transformer) Execute() error { // Iterate over headers for _, header := range missingHeaders { // And fetch event logs using the header, contract address, and topics filter - logs, err := tr.FetchLogs([]string{con.Address}, topics, header) + logs, err := tr.Fetcher.FetchLogs([]string{con.Address}, topics, header) if err != nil { return err } // Mark the header checked for this eventID and continue to next iteration if no logs are found if len(logs) < 1 { - err = tr.MarkHeaderChecked(header.Id, eventId) + err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId) if err != nil { return err } @@ -204,7 +225,7 @@ func (tr *transformer) Execute() error { } // Convert logs into custom type - convertedLogs, err := tr.Convert(logs, event, header.Id) + convertedLogs, err := tr.Converter.Convert(logs, event, header.Id) if err != nil { return err } @@ -213,18 +234,46 @@ func (tr *transformer) Execute() error { } // If logs aren't empty, persist them - err = tr.PersistLogs(convertedLogs, event, con.Address, con.Name) + err = tr.EventRepository.PersistLogs(convertedLogs, event, con.Address, con.Name) if err != nil { return err } - - // Poll contract methods at this header's block height - // with arguments collected from event logs up to this point - if err := tr.PollContractAt(*con, header.BlockNumber); err != nil { - return err - } } } + + if len(con.Methods) == 0 { + continue + } + + // Create checked_headers columns for each method id + methodIds := make([]string, 0, len(con.Methods)) + for _, m := range con.Methods { + methodId := strings.ToLower(m.Name + "_" + con.Address) + err = tr.HeaderRepository.AddCheckColumn(methodId) + if err != nil { + return err + } + methodIds = append(methodIds, methodId) + } + + // Retrieve headers that have been checked for all events but haven not been checked for the methods + missingHeaders, err := tr.HeaderRepository.MissingMethodsCheckedEventsIntersection(con.StartingBlock, last, methodIds, eventIds) + if err != nil { + return err + } + // Poll over the missing headers + for _, header := range missingHeaders { + err = tr.Poller.PollContractAt(*con, header.BlockNumber) + if err != nil { + return err + } + } + // Mark those headers checked for the methods + err = tr.HeaderRepository.MarkHeadersChecked(missingHeaders, methodIds) + if err != nil { + return err + } + } return nil @@ -255,7 +304,17 @@ func (tr *transformer) SetRange(contractAddr string, rng [2]int64) { tr.ContractRanges[contractAddr] = rng } -// Used to set the block range to watch for a given address +// Used to set whether or not to persist an account address list func (tr *transformer) SetCreateAddrList(contractAddr string, on bool) { tr.CreateAddrList[contractAddr] = on } + +// Used to set whether or not to persist an hash list +func (tr *transformer) SetCreateHashList(contractAddr string, on bool) { + tr.CreateHashList[contractAddr] = on +} + +// Used to turn method piping on for a contract +func (tr *transformer) SetPiping(contractAddr string, on bool) { + tr.Piping[contractAddr] = on +} diff --git a/pkg/omni/light/transformer/transformer_test.go b/pkg/omni/light/transformer/transformer_test.go index 7df83858..6372e6e8 100644 --- a/pkg/omni/light/transformer/transformer_test.go +++ b/pkg/omni/light/transformer/transformer_test.go @@ -93,6 +93,22 @@ var _ = Describe("Transformer", func() { }) }) + Describe("SetCreateAddrList", func() { + It("Sets the block range that the contract should be watched within", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetCreateAddrList(constants.TusdContractAddress, true) + Expect(t.CreateAddrList[constants.TusdContractAddress]).To(Equal(true)) + }) + }) + + Describe("SetCreateHashList", func() { + It("Sets the block range that the contract should be watched within", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetCreateHashList(constants.TusdContractAddress, true) + Expect(t.CreateHashList[constants.TusdContractAddress]).To(Equal(true)) + }) + }) + Describe("Init", func() { It("Initializes transformer's contract objects", func() { headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) @@ -211,18 +227,6 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) res := test_helpers.BalanceOf{} - - c, ok := t.Contracts[constants.TusdContractAddress] - Expect(ok).To(Equal(true)) - - b, ok := c.EmittedAddrs[common.HexToAddress("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")] - Expect(ok).To(Equal(true)) - Expect(b).To(Equal(true)) - - b, ok = c.EmittedAddrs[common.HexToAddress("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")] - Expect(ok).To(Equal(true)) - Expect(b).To(Equal(true)) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Balance).To(Equal("55849938025000000000000")) @@ -241,4 +245,130 @@ var _ = Describe("Transformer", func() { Expect(err).To(HaveOccurred()) }) }) + + Describe("Execute- against ENS registry contract", func() { + BeforeEach(func() { + header1, err := blockChain.GetHeaderByNumber(6885695) + Expect(err).ToNot(HaveOccurred()) + header2, err := blockChain.GetHeaderByNumber(6885696) + Expect(err).ToNot(HaveOccurred()) + header3, err := blockChain.GetHeaderByNumber(6885697) + Expect(err).ToNot(HaveOccurred()) + headerRepository.CreateOrUpdateHeader(header1) + headerID, err = headerRepository.CreateOrUpdateHeader(header2) + Expect(err).ToNot(HaveOccurred()) + headerRepository.CreateOrUpdateHeader(header3) + }) + + It("Transforms watched contract data into custom repositories", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, nil) + + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + log := test_helpers.LightNewOwnerLog{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log) + + // We don't know vulcID, so compare individual fields instead of complete structures + Expect(log.HeaderID).To(Equal(headerID)) + Expect(log.Node).To(Equal("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")) + Expect(log.Label).To(Equal("0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047")) + Expect(log.Owner).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) + }) + + It("Keeps track of contract-related hashes while transforming event data if they need to be used for later method polling", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, []string{"owner"}) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + c, ok := t.Contracts[constants.EnsContractAddress] + Expect(ok).To(Equal(true)) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + Expect(len(c.EmittedHashes)).To(Equal(2)) + + b, ok := c.EmittedHashes[common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + b, ok = c.EmittedHashes[common.HexToHash("0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047")] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + // Doesn't keep track of address since it wouldn't be used in calling the 'owner' method + _, ok = c.EmittedAddrs[common.HexToAddress("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")] + Expect(ok).To(Equal(false)) + }) + + It("Polls given methods using generated token holder address", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, []string{"owner"}) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + res := test_helpers.Owner{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Address).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) + Expect(res.TokenName).To(Equal("")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Address).To(Equal("0x0000000000000000000000000000000000000000")) + Expect(res.TokenName).To(Equal("")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ceMADEUPaaf4HASHc186badTHIS288IS625bFAKE' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).To(HaveOccurred()) + }) + + It("It does not perist events if they do not pass the emitted arg filter", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, nil) + t.SetEventArgs(constants.EnsContractAddress, []string{"fake_filter_value"}) + + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + log := test_helpers.LightNewOwnerLog{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", constants.EnsContractAddress)).StructScan(&log) + Expect(err).To(HaveOccurred()) + }) + + It("If a method arg filter is applied, only those arguments are used in polling", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.EnsContractAddress, []string{"NewOwner"}) + t.SetMethods(constants.EnsContractAddress, []string{"owner"}) + t.SetMethodArgs(constants.EnsContractAddress, []string{"0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"}) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + res := test_helpers.Owner{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Address).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) + Expect(res.TokenName).To(Equal("")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047' AND block = '6885696'", constants.EnsContractAddress)).StructScan(&res) + Expect(err).To(HaveOccurred()) + }) + }) }) diff --git a/pkg/omni/shared/constants/constants.go b/pkg/omni/shared/constants/constants.go index 553cd1ce..97ed2037 100644 --- a/pkg/omni/shared/constants/constants.go +++ b/pkg/omni/shared/constants/constants.go @@ -31,6 +31,7 @@ const ( ApprovalEvent Event = 1 BurnEvent Event = 2 MintEvent Event = 3 + NewOwnerEvent Event = 4 ) func (e Event) String() string { @@ -39,9 +40,10 @@ func (e Event) String() string { "Approval", "Burn", "Mint", + "NewOwner", } - if e < TransferEvent || e > MintEvent { + if e < TransferEvent || e > NewOwnerEvent { return "Unknown" } @@ -54,9 +56,10 @@ func (e Event) Signature() string { helpers.GenerateSignature("Approval(address,address,uint256)"), helpers.GenerateSignature("Burn(address,uint256)"), helpers.GenerateSignature("Mint(address,uint256)"), + helpers.GenerateSignature("NewOwner(bytes32,bytes32,address)"), } - if e < TransferEvent || e > MintEvent { + if e < TransferEvent || e > NewOwnerEvent { return "Unknown" } diff --git a/pkg/omni/shared/contract/contract.go b/pkg/omni/shared/contract/contract.go index a7f66936..c13955e2 100644 --- a/pkg/omni/shared/contract/contract.go +++ b/pkg/omni/shared/contract/contract.go @@ -18,7 +18,6 @@ package contract import ( "errors" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -44,23 +43,22 @@ type Contract struct { FilterArgs map[string]bool // User-input list of values to filter event logs for MethodArgs map[string]bool // User-input list of values to limit method polling to EmittedAddrs map[interface{}]bool // List of all unique addresses collected from converted event logs - EmittedBytes map[interface{}]bool // List of all unique bytes collected from converted event logs EmittedHashes map[interface{}]bool // List of all unique hashes collected from converted event logs CreateAddrList bool // Whether or not to persist address list to postgres + CreateHashList bool // Whether or not to persist hash list to postgres + Piping bool // Whether or not to pipe method results forward as arguments to subsequent methods } // If we will be calling methods that use addr, hash, or byte arrays -// as arguments then we initialize map to hold these types of values +// as arguments then we initialize maps to hold these types of values func (c Contract) Init() *Contract { for _, method := range c.Methods { for _, arg := range method.Args { switch arg.Type.T { case abi.AddressTy: c.EmittedAddrs = map[interface{}]bool{} - case abi.HashTy: + case abi.HashTy, abi.BytesTy, abi.FixedBytesTy: c.EmittedHashes = map[interface{}]bool{} - case abi.BytesTy, abi.FixedBytesTy: - c.EmittedBytes = map[interface{}]bool{} default: } } @@ -160,15 +158,6 @@ func (c *Contract) AddEmittedHash(hashes ...interface{}) { } } -// Add event emitted bytes to our list if it passes filter and method polling is on -func (c *Contract) AddEmittedBytes(byteArrays ...interface{}) { - for _, bytes := range byteArrays { - if c.WantedMethodArg(bytes) && c.Methods != nil { - c.EmittedBytes[bytes] = true - } - } -} - func StringifyArg(arg interface{}) (str string) { switch arg.(type) { case string: diff --git a/pkg/omni/shared/helpers/test_helpers/database.go b/pkg/omni/shared/helpers/test_helpers/database.go index 666ac612..48aef62b 100644 --- a/pkg/omni/shared/helpers/test_helpers/database.go +++ b/pkg/omni/shared/helpers/test_helpers/database.go @@ -47,6 +47,17 @@ type TransferLog struct { Value string `db:"value_"` } +type NewOwnerLog struct { + Id int64 `db:"id"` + VulvanizeLogId int64 `db:"vulcanize_log_id"` + TokenName string `db:"token_name"` + Block int64 `db:"block"` + Tx string `db:"tx"` + Node string `db:"node_"` + Label string `db:"label_"` + Owner string `db:"owner_"` +} + type LightTransferLog struct { Id int64 `db:"id"` HeaderID int64 `db:"header_id"` @@ -59,6 +70,18 @@ type LightTransferLog struct { RawLog []byte `db:"raw_log"` } +type LightNewOwnerLog struct { + Id int64 `db:"id"` + HeaderID int64 `db:"header_id"` + TokenName string `db:"token_name"` + LogIndex int64 `db:"log_idx"` + TxIndex int64 `db:"tx_idx"` + Node string `db:"node_"` + Label string `db:"label_"` + Owner string `db:"owner_"` + RawLog []byte `db:"raw_log"` +} + type BalanceOf struct { Id int64 `db:"id"` TokenName string `db:"token_name"` @@ -67,6 +90,14 @@ type BalanceOf struct { Balance string `db:"returned"` } +type Owner struct { + Id int64 `db:"id"` + TokenName string `db:"token_name"` + Block int64 `db:"block"` + Node string `db:"node_"` + Address string `db:"returned"` +} + func SetupBC() core.BlockChain { infuraIPC := "https://mainnet.infura.io/v3/b09888c1113640cc9ab42750ce750c05" rawRpcClient, err := rpc.Dial(infuraIPC) @@ -135,7 +166,7 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract err := p.Parse() Expect(err).ToNot(HaveOccurred()) - return &contract.Contract{ + return contract.Contract{ Name: "TrueUSD", Address: constants.TusdContractAddress, Abi: p.Abi(), @@ -143,13 +174,57 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract StartingBlock: 6194634, LastBlock: 6507323, Events: p.GetEvents(wantedEvents), - Methods: p.GetMethods(wantedMethods), + Methods: p.GetSelectMethods(wantedMethods), MethodArgs: map[string]bool{}, FilterArgs: map[string]bool{}, - EmittedAddrs: map[interface{}]bool{}, - EmittedBytes: map[interface{}]bool{}, - EmittedHashes: map[interface{}]bool{}, - } + }.Init() +} + +func SetupENSRepo(vulcanizeLogId *int64, wantedEvents, wantedMethods []string) (*postgres.DB, *contract.Contract) { + db, err := postgres.NewDB(config.Database{ + Hostname: "localhost", + Name: "vulcanize_private", + Port: 5432, + }, core.Node{}) + Expect(err).NotTo(HaveOccurred()) + + receiptRepository := repositories.ReceiptRepository{DB: db} + logRepository := repositories.LogRepository{DB: db} + blockRepository := *repositories.NewBlockRepository(db) + + blockNumber := rand.Int63() + blockId := CreateBlock(blockNumber, blockRepository) + + receipts := []core.Receipt{{Logs: []core.Log{{}}}} + + err = receiptRepository.CreateReceiptsAndLogs(blockId, receipts) + Expect(err).ToNot(HaveOccurred()) + + err = logRepository.Get(vulcanizeLogId, `SELECT id FROM logs`) + Expect(err).ToNot(HaveOccurred()) + + info := SetupENSContract(wantedEvents, wantedMethods) + + return db, info +} + +func SetupENSContract(wantedEvents, wantedMethods []string) *contract.Contract { + p := mocks.NewParser(constants.ENSAbiString) + err := p.Parse() + Expect(err).ToNot(HaveOccurred()) + + return contract.Contract{ + Name: "ENS-Registry", + Address: constants.EnsContractAddress, + Abi: p.Abi(), + ParsedAbi: p.ParsedAbi(), + StartingBlock: 6194634, + LastBlock: 6507323, + Events: p.GetEvents(wantedEvents), + Methods: p.GetSelectMethods(wantedMethods), + MethodArgs: map[string]bool{}, + FilterArgs: map[string]bool{}, + }.Init() } func TearDown(db *postgres.DB) { @@ -168,6 +243,9 @@ func TearDown(db *postgres.DB) { _, err = tx.Exec(`DELETE FROM logs`) Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM log_filters`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM transactions`) Expect(err).NotTo(HaveOccurred()) @@ -186,6 +264,12 @@ func TearDown(db *postgres.DB) { _, err = tx.Exec(`DROP SCHEMA IF EXISTS light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`) Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x314159265dd8dbb310642f98f50c066173c1259b CASCADE`) + Expect(err).NotTo(HaveOccurred()) + + _, err = tx.Exec(`DROP SCHEMA IF EXISTS light_0x314159265dd8dbb310642f98f50c066173c1259b CASCADE`) + Expect(err).NotTo(HaveOccurred()) + err = tx.Commit() Expect(err).NotTo(HaveOccurred()) } diff --git a/pkg/omni/shared/helpers/test_helpers/mocks/entities.go b/pkg/omni/shared/helpers/test_helpers/mocks/entities.go index be092771..4d865382 100644 --- a/pkg/omni/shared/helpers/test_helpers/mocks/entities.go +++ b/pkg/omni/shared/helpers/test_helpers/mocks/entities.go @@ -78,6 +78,56 @@ var TransferBlock2 = core.Block{ }}, } +var NewOwnerBlock1 = core.Block{ + Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ppp", + Number: 6194635, + Transactions: []core.Transaction{{ + Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654bbb", + Receipt: core.Receipt{ + TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654bbb", + ContractAddress: "", + Logs: []core.Log{{ + BlockNumber: 6194635, + TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654bbb", + Address: constants.EnsContractAddress, + Topics: core.Topics{ + constants.NewOwnerEvent.Signature(), + "0x0000000000000000000000000000000000000000000000000000c02aaa39b223", + "0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391", + "", + }, + Index: 1, + Data: "0x000000000000000000000000000000000000000000000000000000000000af21", + }}, + }, + }}, +} + +var NewOwnerBlock2 = core.Block{ + Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ggg", + Number: 6194636, + Transactions: []core.Transaction{{ + Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654lll", + Receipt: core.Receipt{ + TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654lll", + ContractAddress: "", + Logs: []core.Log{{ + BlockNumber: 6194636, + TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654lll", + Address: constants.EnsContractAddress, + Topics: core.Topics{ + constants.NewOwnerEvent.Signature(), + "0x0000000000000000000000000000000000000000000000000000c02aaa39b223", + "0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba400", + "", + }, + Index: 1, + Data: "0x000000000000000000000000000000000000000000000000000000000000af21", + }}, + }, + }}, +} + var ExpectedTransferFilter = filters.LogFilter{ Name: "Transfer", Address: constants.TusdContractAddress, @@ -150,7 +200,7 @@ var MockTransferLog2 = types.Log{ Address: common.HexToAddress(constants.TusdContractAddress), BlockNumber: 5488077, TxIndex: 2, - TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae"), + TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546df"), Topics: []common.Hash{ common.HexToHash(constants.TransferEvent.Signature()), common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391"), @@ -162,7 +212,7 @@ var MockTransferLog2 = types.Log{ var MockMintLog = types.Log{ Index: 10, Address: common.HexToAddress(constants.TusdContractAddress), - BlockNumber: 548808, + BlockNumber: 5488080, TxIndex: 50, TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6minty"), Topics: []common.Hash{ @@ -171,3 +221,31 @@ var MockMintLog = types.Log{ }, Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"), } + +var MockNewOwnerLog1 = types.Log{ + Index: 1, + Address: common.HexToAddress(constants.EnsContractAddress), + BlockNumber: 5488076, + TxIndex: 110, + TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae"), + Topics: []common.Hash{ + common.HexToHash(constants.NewOwnerEvent.Signature()), + common.HexToHash("0x000000000000000000000000c02aaa39b223helloa0e5c4f27ead9083c752553"), + common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391"), + }, + Data: hexutil.MustDecode("0x000000000000000000000000000000000000000000000000000000000000af21"), +} + +var MockNewOwnerLog2 = types.Log{ + Index: 3, + Address: common.HexToAddress(constants.EnsContractAddress), + BlockNumber: 5488077, + TxIndex: 2, + TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546df"), + Topics: []common.Hash{ + common.HexToHash(constants.NewOwnerEvent.Signature()), + common.HexToHash("0x000000000000000000000000c02aaa39b223helloa0e5c4f27ead9083c752553"), + common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba400"), + }, + Data: hexutil.MustDecode("0x000000000000000000000000000000000000000000000000000000000000af21"), +} diff --git a/pkg/omni/shared/helpers/test_helpers/mocks/parser.go b/pkg/omni/shared/helpers/test_helpers/mocks/parser.go index 6decb484..360cc8eb 100644 --- a/pkg/omni/shared/helpers/test_helpers/mocks/parser.go +++ b/pkg/omni/shared/helpers/test_helpers/mocks/parser.go @@ -56,25 +56,18 @@ func (p *parser) Parse() error { } // Returns wanted methods, if they meet the criteria, as map of types.Methods -// Only returns specified methods -func (p *parser) GetMethods(wanted []string) map[string]types.Method { +// Empty wanted array => all methods that fit are returned +// Nil wanted array => no events are returned +func (p *parser) GetSelectMethods(wanted []string) map[string]types.Method { addrMethods := map[string]types.Method{} + if wanted == nil { + return nil + } for _, m := range p.parsedAbi.Methods { - // Only return methods that have less than 3 inputs, 1 output, and wanted - if len(m.Inputs) < 3 && len(m.Outputs) == 1 && stringInSlice(wanted, m.Name) { - addrsOnly := true - for _, input := range m.Inputs { - if input.Type.T != abi.AddressTy { - addrsOnly = false - } - } - - // Only return methods if inputs are all of type address and output is of the accepted types - if addrsOnly && wantType(m.Outputs[0]) { - method := types.NewMethod(m) - addrMethods[method.Name] = method - } + if okInputTypes(m, wanted) { + wantedMethod := types.NewMethod(m) + addrMethods[wantedMethod.Name] = wantedMethod } } @@ -116,3 +109,46 @@ func stringInSlice(list []string, s string) bool { return false } + +func okInputTypes(m abi.Method, wanted []string) bool { + // Only return method if it has less than 3 arguments, a single output value, and it is a method we want or we want all methods (empty 'wanted' slice) + if len(m.Inputs) < 3 && len(m.Outputs) == 1 && (len(wanted) == 0 || stringInSlice(wanted, m.Name)) { + // Only return methods if inputs are all of accepted types and output is of the accepted types + if !okReturnType(m.Outputs[0]) { + return false + } + for _, input := range m.Inputs { + switch input.Type.T { + case abi.AddressTy, abi.HashTy, abi.BytesTy, abi.FixedBytesTy: + default: + return false + } + } + + return true + } + + return false +} + +func okReturnType(arg abi.Argument) bool { + wantedTypes := []byte{ + abi.UintTy, + abi.IntTy, + abi.BoolTy, + abi.StringTy, + abi.AddressTy, + abi.HashTy, + abi.BytesTy, + abi.FixedBytesTy, + abi.FixedPointTy, + } + + for _, ty := range wantedTypes { + if arg.Type.T == ty { + return true + } + } + + return false +} diff --git a/pkg/omni/shared/parser/parser_test.go b/pkg/omni/shared/parser/parser_test.go index 55af1973..cbe42671 100644 --- a/pkg/omni/shared/parser/parser_test.go +++ b/pkg/omni/shared/parser/parser_test.go @@ -47,7 +47,7 @@ var _ = Describe("Parser", func() { Expect(err).ToNot(HaveOccurred()) Expect(parsedAbi).To(Equal(expectedAbi)) - methods := mp.GetMethods([]string{"balanceOf"}) + methods := mp.GetSelectMethods([]string{"balanceOf"}) _, ok := methods["totalSupply"] Expect(ok).To(Equal(false)) m, ok := methods["balanceOf"] diff --git a/pkg/omni/shared/poller/poller.go b/pkg/omni/shared/poller/poller.go index 7e5d43a9..57322f0c 100644 --- a/pkg/omni/shared/poller/poller.go +++ b/pkg/omni/shared/poller/poller.go @@ -129,9 +129,7 @@ func (p *poller) pollSingleArgAt(m types.Method, bn int64) error { // the correct argument set to iterate over var args map[interface{}]bool switch m.Args[0].Type.T { - case abi.FixedBytesTy, abi.BytesTy: - args = p.contract.EmittedBytes - case abi.HashTy: + case abi.HashTy, abi.FixedBytesTy, abi.BytesTy: args = p.contract.EmittedHashes case abi.AddressTy: args = p.contract.EmittedAddrs @@ -182,9 +180,7 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error { // the correct argument sets to iterate over var firstArgs map[interface{}]bool switch m.Args[0].Type.T { - case abi.FixedBytesTy, abi.BytesTy: - firstArgs = p.contract.EmittedBytes - case abi.HashTy: + case abi.HashTy, abi.FixedBytesTy, abi.BytesTy: firstArgs = p.contract.EmittedHashes case abi.AddressTy: firstArgs = p.contract.EmittedAddrs @@ -195,9 +191,7 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error { var secondArgs map[interface{}]bool switch m.Args[1].Type.T { - case abi.FixedBytesTy, abi.BytesTy: - secondArgs = p.contract.EmittedBytes - case abi.HashTy: + case abi.HashTy, abi.FixedBytesTy, abi.BytesTy: secondArgs = p.contract.EmittedHashes case abi.AddressTy: secondArgs = p.contract.EmittedAddrs diff --git a/pkg/omni/shared/poller/poller_test.go b/pkg/omni/shared/poller/poller_test.go index 438bbae8..1975e5db 100644 --- a/pkg/omni/shared/poller/poller_test.go +++ b/pkg/omni/shared/poller/poller_test.go @@ -83,6 +83,28 @@ var _ = Describe("Poller", func() { Expect(scanStruct.TokenName).To(Equal("TrueUSD")) }) + It("Polls specified contract methods using contract's hash list", func() { + con = test_helpers.SetupENSContract(nil, []string{"owner"}) + Expect(con.Abi).To(Equal(constants.ENSAbiString)) + Expect(len(con.Methods)).To(Equal(1)) + con.AddEmittedHash(common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"), common.HexToHash("0x7e74a86b6e146964fb965db04dc2590516da77f720bb6759337bf5632415fd86")) + + err := p.PollContractAt(*con, 6885877) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.Owner{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x7e74a86b6e146964fb965db04dc2590516da77f720bb6759337bf5632415fd86' AND block = '6885877'", constants.EnsContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Address).To(Equal("0x546aA2EaE2514494EeaDb7bbb35243348983C59d")) + Expect(scanStruct.TokenName).To(Equal("ENS-Registry")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885877'", constants.EnsContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Address).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) + Expect(scanStruct.TokenName).To(Equal("ENS-Registry")) + }) + It("Does not poll and persist any methods if none are specified", func() { con = test_helpers.SetupTusdContract(nil, nil) Expect(con.Abi).To(Equal(constants.TusdAbiString)) @@ -109,4 +131,93 @@ var _ = Describe("Poller", func() { }) }) }) + + Describe("Light sync mode", func() { + BeforeEach(func() { + db, bc = test_helpers.SetupDBandBC() + p = poller.NewPoller(bc, db, types.LightSync) + }) + + Describe("PollContract", func() { + It("Polls specified contract methods using contract's token holder address list", func() { + con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"}) + Expect(con.Abi).To(Equal(constants.TusdAbiString)) + con.StartingBlock = 6707322 + con.LastBlock = 6707323 + con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE")) + + err := p.PollContract(*con) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("66386309548896882859581786")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("66386309548896882859581786")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("17982350181394112023885864")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("17982350181394112023885864")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + }) + + It("Polls specified contract methods using contract's hash list", func() { + con = test_helpers.SetupENSContract(nil, []string{"owner"}) + Expect(con.Abi).To(Equal(constants.ENSAbiString)) + Expect(len(con.Methods)).To(Equal(1)) + con.AddEmittedHash(common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"), common.HexToHash("0x7e74a86b6e146964fb965db04dc2590516da77f720bb6759337bf5632415fd86")) + + err := p.PollContractAt(*con, 6885877) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.Owner{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x7e74a86b6e146964fb965db04dc2590516da77f720bb6759337bf5632415fd86' AND block = '6885877'", constants.EnsContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Address).To(Equal("0x546aA2EaE2514494EeaDb7bbb35243348983C59d")) + Expect(scanStruct.TokenName).To(Equal("ENS-Registry")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.owner_method WHERE node_ = '0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae' AND block = '6885877'", constants.EnsContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Address).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")) + Expect(scanStruct.TokenName).To(Equal("ENS-Registry")) + }) + + It("Does not poll and persist any methods if none are specified", func() { + con = test_helpers.SetupTusdContract(nil, nil) + Expect(con.Abi).To(Equal(constants.TusdAbiString)) + con.StartingBlock = 6707322 + con.LastBlock = 6707323 + con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE")) + + err := p.PollContract(*con) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("PollMethod", func() { + It("Polls a single contract method", func() { + var name = new(string) + err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) + Expect(err).ToNot(HaveOccurred()) + Expect(*name).To(Equal("TrueUSD")) + }) + }) + }) }) diff --git a/pkg/omni/shared/repository/method_repository.go b/pkg/omni/shared/repository/method_repository.go index cace1204..4231404b 100644 --- a/pkg/omni/shared/repository/method_repository.go +++ b/pkg/omni/shared/repository/method_repository.go @@ -112,7 +112,6 @@ func (r *methodRepository) persistResults(results []types.Result, methodInfo typ // Add this query to the transaction _, err = tx.Exec(pgStr, data...) if err != nil { - println("howdy") tx.Rollback() return err } diff --git a/pkg/omni/shared/transformer/interface.go b/pkg/omni/shared/transformer/interface.go index c69f1046..64e5d03e 100644 --- a/pkg/omni/shared/transformer/interface.go +++ b/pkg/omni/shared/transformer/interface.go @@ -20,10 +20,13 @@ package transformer // data for any contract and persists it to custom postgres tables in vDB type Transformer interface { SetEvents(contractAddr string, filterSet []string) - SetEventAddrs(contractAddr string, filterSet []string) + SetEventArgs(contractAddr string, filterSet []string) SetMethods(contractAddr string, filterSet []string) - SetMethodAddrs(contractAddr string, filterSet []string) - SetRange(contractAddr string, rng []int64) + SetMethodArgs(contractAddr string, filterSet []string) + SetRange(contractAddr string, rng [2]int64) + SetCreateAddrList(contractAddr string, on bool) + SetCreateHashList(contractAddr string, on bool) + SetPiping(contractAddr string, on bool) Init() error Execute() error }