diff --git a/.travis.yml b/.travis.yml index 237dc152..1faa79d0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,28 +5,25 @@ go: services: - postgresql addons: - postgresql: "9.6" - + postgresql: '9.6' go_import_path: github.com/vulcanize/vulcanizedb - before_install: - # ginkgo golint dep goose - make installtools - bash ./scripts/install-postgres-10.sh - curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add - - echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list - sudo apt-get update && sudo apt-get install yarn - before_script: - sudo -u postgres createdb vulcanize_private - make migrate NAME=vulcanize_private - cd postgraphile && yarn - script: - yarn test - cd ../ - make test - make integrationtest - notifications: email: false +env: + matrix: + secure: hz6YPkTm59QhOQ2+05K+AWHw0wOoPjz9wqfUKZcuUi+ICdcuClXMt+hVpUDmwrGOCp8B5y8hyxr/iq7P+3MLwz1/2JYxc9S9MT1O0WXU8ZuWZR0LI1e2ZhZCF3E/JWq6c4atxFIEhcv7roBUkbUcRA8cpRdZmEmpSM8JyP0z76CezIG/HeyUdVW34DLjTBNB0TJdOZyfOh0aCvzgXR/kfKaSYNBhJY7j2UK7x0qK0UlQ/n7RHCrtjWoNWpuwl9bw1F5plMHOD9bq0oDG6gs1SFBaybfEMN71Hp0QxhD/u+1tVuHfGooYhzVgxStPSCpSkgQ7vgSZI766ErqPc3B6Wv9K+s5exPLgCykEiLorW6qI8A+mdiPIiIzLBRMcbF/kCo7gFh0kDIYbSTjS5COfjNw/fKsp59upXF4VtCDgVgjAemY6XT4lziZiVQwiK1Oyln8HrIux1aJEWgRGEQpQqwVeCUHClHus5Paf/N0Ci5f9NHh2zbkZvDuUF2uQu6Wc58wXHcVsloyfQibJrH1q2sQRqdiPfN5Y9l0igFzXILFd4itXMyMnSDQh6+GD6V0YY/hGufAs42UymjGYbEwZQP/gn8/bQpilngbmlJ7bB2nva70kXgqhuNiZM5XuFuQMIP3U2Tbm87FHEFUCoKPv2if9Ft74YkAuzVljAMo2YLQ= diff --git a/README.md b/README.md index 381db3e1..91e5cd99 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,7 @@ false If you have full rinkeby chaindata you can move it to `rinkeby_vulcanizedb_geth_data` docker volume to skip long wait of sync. ## Running the Tests +- Replace the empty `ipcPath` in the `environments/infura.toml` with a path to a full archival node's eth_jsonrpc endpoint (e.g. local geth node ipc path or infura url) - `createdb vulcanize_private` will create the test db - `make migrate NAME=vulcanize_private` will run the db migrations - `make test` will run the unit tests and skip the integration tests @@ -154,12 +155,12 @@ Contract watchers work with a light or full sync vDB to fetch raw ethereum data A watcher is composed of at least a fetcher and a transformer or set of transformers, where a fetcher is an interface for retrieving raw Ethereum data from some source (e.g. eth_jsonrpc, IPFS) and a transformer is an interface for filtering through that raw Ethereum data to extract, process, and persist data for specific contracts or accounts. -### contractWatcher +## contractWatcher The `contractWatcher` command is a built-in generic contract watcher. It can watch any and all events for a given contract provided the contract's ABI is available. It also provides some state variable coverage by automating polling of public methods, with some restrictions: 1. The method must have 2 or less arguments -2. The method's arguments must all be of type address or bytes32 (hash) -3. The method must return a single value +1. The method's arguments must all be of type address or bytes32 (hash) +1. The method must return a single value This command operates in two modes- `light` and `full`- which require a light or full-synced vulcanizeDB, respectively. @@ -177,7 +178,7 @@ This command takes a config of the form: port = 5432 [client] - ipcPath = "path_to_ethjson_rpc" + ipcPath = "/Users/user/Library/Ethereum/geth.ipc" [contract] network = "" @@ -238,7 +239,7 @@ This command takes a config of the form: At the very minimum, for each contract address an ABI and a starting block number need to be provided (or just the starting block if the ABI can be reliably fetched from Etherscan). With just this information we will be able to watch all events at the contract, but with no additional filters and no method polling. -#### contractWatcher output +### contractWatcher output Transformed events and polled method results are committed to Postgres in schemas and tables generated according to the contract abi. @@ -246,9 +247,9 @@ Schemas are created for each contract using the naming convention `_< Under this schema, tables are generated for watched events as `_event` and for polled methods as `_method` The 'method' and 'event' identifiers are tacked onto the end of the table names to prevent collisions between methods and events of the same lowercase name -Example: +### contractWatcher example: -Modify `./environments/example.toml` to replace `"path_to_ethjson_rpc"` with a path that points to an ethjson_rpc endpoint (e.g. a local geth node ipc path or an Infura url). +Modify `./environments/example.toml` to replace the empty `ipcPath` with a path that points to an ethjson_rpc endpoint (e.g. a local geth node ipc path or an Infura url). This endpoint should be for an archival eth node if we want to perform method polling as this configuration is currently set up to do. To work with a non-archival full node, remove the `balanceOf` method from the `0x8dd5fbce2f6a956c3022ba3663759011dd51e73e` (TrueUSD) contract. @@ -314,14 +315,14 @@ The addition of '_' after table names is to prevent collisions with reserved Pos Also notice that the contract address used for the schema name has been down-cased. -### composeAndExecute +## composeAndExecute The `composeAndExecute` command is used to compose and execute over an arbitrary set of custom transformers. This is accomplished by generating a Go pluggin which allows our `vulcanizedb` binary to link to external transformers, so long as they abide by our standard [interfaces](https://github.com/vulcanize/maker-vulcanizedb/tree/compose_and_execute/libraries/shared/transformer). This command requires Go 1.11+ and [Go plugins](https://golang.org/pkg/plugin/) only work on Unix based systems. -#### Writing custom transformers +### Writing custom transformers Storage Transformers * [Guide](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/factories/storage/README.md) * [Example](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/factories/storage/EXAMPLE.md) @@ -330,7 +331,7 @@ Event Transformers * [Guide](https://github.com/vulcanize/maker-vulcanizedb/blob/event_docs/libraries/shared/factories/README.md) * [Example](https://github.com/vulcanize/ens_transformers/tree/working) -#### composeAndExecute configuration +### composeAndExecute configuration A .toml config file is specified when executing the command: `./vulcanizedb composeAndExecute --config=./environments/config_name.toml` @@ -345,7 +346,7 @@ The config provides information for composing a set of transformers: port = 5432 [client] - ipcPath = "http://kovan0.vulcanize.io:8545" + ipcPath = "/Users/user/Library/Ethereum/geth.ipc" [exporter] home = "github.com/vulcanize/vulcanizedb" @@ -365,7 +366,7 @@ The config provides information for composing a set of transformers: rank = "0" [exporter.transformer2] path = "path/to/transformer2" - type = "eth_generic" + type = "eth_contract" repository = "github.com/account/repo" migrations = "db/migrations" rank = "0" @@ -441,7 +442,7 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface } ``` -#### Preparing transformer(s) to work as pluggins for composeAndExecute +### Preparing transformers to work as pluggins for composeAndExecute To plug in an external transformer we need to: * Create a [package](https://github.com/vulcanize/ens_transformers/blob/working/transformers/registry/new_owner/initializer/initializer.go) diff --git a/cmd/compose.go b/cmd/compose.go index 28de4af1..b8098b9b 100644 --- a/cmd/compose.go +++ b/cmd/compose.go @@ -42,7 +42,7 @@ var composeCmd = &cobra.Command{ port = 5432 [client] - ipcPath = "http://kovan0.vulcanize.io:8545" + ipcPath = "/Users/user/Library/Ethereum/geth.ipc" [exporter] home = "github.com/vulcanize/vulcanizedb" diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index c6aa318b..d2abd045 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -42,7 +42,7 @@ var composeAndExecuteCmd = &cobra.Command{ port = 5432 [client] - ipcPath = "http://kovan0.vulcanize.io:8545" + ipcPath = "/Users/user/Library/Ethereum/geth.ipc" [exporter] home = "github.com/vulcanize/vulcanizedb" @@ -180,7 +180,7 @@ func composeAndExecute() { gw := watcher.NewContractWatcher(&db, blockChain) gw.AddTransformers(ethContractInitializers) wg.Add(1) - go contractWatching(&gw, &wg) + go watchEthContract(&gw, &wg) } wg.Wait() } diff --git a/cmd/contractWatcher.go b/cmd/contractWatcher.go index cfdfd7d9..d27802ce 100644 --- a/cmd/contractWatcher.go +++ b/cmd/contractWatcher.go @@ -46,7 +46,7 @@ Requires a .toml config file: port = 5432 [client] - ipcPath = "path_to_ethjson_rpc" + ipcPath = "/Users/user/Library/Ethereum/geth.ipc" [contract] network = "" @@ -112,12 +112,14 @@ func contractWatcher() { } for range ticker.C { - t.Execute() + err = t.Execute() + if err != nil { + log.Error("Execution error for transformer:", t.GetConfig().Name, err) + } } } func init() { rootCmd.AddCommand(contractWatcherCmd) - contractWatcherCmd.Flags().StringVarP(&mode, "mode", "o", "light", "'light' or 'full' mode to work with either light synced or fully synced vDB (default is light)") } diff --git a/cmd/execute.go b/cmd/execute.go index 3010db59..3ebc15f1 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -46,7 +46,7 @@ var executeCmd = &cobra.Command{ port = 5432 [client] - ipcPath = "http://kovan0.vulcanize.io:8545" + ipcPath = "/Users/user/Library/Ethereum/geth.ipc" [exporter] name = "exampleTransformerExporter" @@ -128,7 +128,7 @@ func execute() { gw := watcher.NewContractWatcher(&db, blockChain) gw.AddTransformers(ethContractInitializers) wg.Add(1) - go contractWatching(&gw, &wg) + go watchEthContract(&gw, &wg) } wg.Wait() } @@ -155,10 +155,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { ticker := time.NewTicker(pollingInterval) defer ticker.Stop() for range ticker.C { - err := w.Execute(recheck) - if err != nil { - // TODO Handle watcher errors in execute - } + w.Execute(recheck) } } @@ -169,23 +166,17 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { ticker := time.NewTicker(pollingInterval) defer ticker.Stop() for range ticker.C { - err := w.Execute() - if err != nil { - // TODO Handle watcher errors in execute - } + w.Execute() } } -func contractWatching(w *watcher.ContractWatcher, wg *syn.WaitGroup) { +func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) { defer wg.Done() // Execute over the ContractTransformerInitializer set using the contract watcher log.Info("executing contract_watcher transformers") ticker := time.NewTicker(pollingInterval) defer ticker.Stop() for range ticker.C { - err := w.Execute(nil) - if err != nil { - // TODO Handle watcher errors in execute - } + w.Execute() } } diff --git a/environments/example.toml b/environments/example.toml index 9d253390..e025baaa 100644 --- a/environments/example.toml +++ b/environments/example.toml @@ -4,7 +4,7 @@ port = 5432 [client] - ipcPath = "path_to_ethjson_rpc" + ipcPath = "" [contract] network = "" diff --git a/environments/infura.toml b/environments/infura.toml index d9afd385..fb9876fb 100644 --- a/environments/infura.toml +++ b/environments/infura.toml @@ -4,4 +4,4 @@ port = 5432 [client] - ipcPath = "https://mainnet.infura.io/J5Vd2fRtGsw0zZ0Ov3BL" \ No newline at end of file + ipcPath = "" diff --git a/integration_test/contract_watcher_full_transformer_test.go b/integration_test/contract_watcher_full_transformer_test.go index 03e174b9..bcb7a29a 100644 --- a/integration_test/contract_watcher_full_transformer_test.go +++ b/integration_test/contract_watcher_full_transformer_test.go @@ -50,7 +50,6 @@ var _ = Describe("contractWatcher full transformer", func() { Expect(ok).To(Equal(true)) Expect(c.StartingBlock).To(Equal(int64(6194633))) - Expect(c.LastBlock).To(Equal(int64(6194634))) Expect(c.Abi).To(Equal(constants.TusdAbiString)) Expect(c.Name).To(Equal("TrueUSD")) Expect(c.Address).To(Equal(tusdAddr)) diff --git a/integration_test/contract_watcher_light_transformer_test.go b/integration_test/contract_watcher_light_transformer_test.go index 45323c10..cf9efa77 100644 --- a/integration_test/contract_watcher_light_transformer_test.go +++ b/integration_test/contract_watcher_light_transformer_test.go @@ -48,7 +48,6 @@ var _ = Describe("contractWatcher light transformer", func() { Expect(ok).To(Equal(true)) Expect(c.StartingBlock).To(Equal(int64(6194632))) - Expect(c.LastBlock).To(Equal(int64(-1))) Expect(c.Abi).To(Equal(constants.TusdAbiString)) Expect(c.Name).To(Equal("TrueUSD")) Expect(c.Address).To(Equal(tusdAddr)) @@ -203,6 +202,7 @@ var _ = Describe("contractWatcher light transformer", func() { Expect(err).ToNot(HaveOccurred()) err = t.Execute() Expect(err).ToNot(HaveOccurred()) + Expect(t.Start).To(Equal(int64(6885698))) log := test_helpers.LightNewOwnerLog{} err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&log) @@ -318,7 +318,7 @@ var _ = Describe("contractWatcher light transformer", func() { Describe("Execute- against both ENS and TrueUSD", func() { BeforeEach(func() { - for i := 6885692; i < 6885702; i++ { + for i := 6885692; i <= 6885701; i++ { header, err := blockChain.GetHeaderByNumber(int64(i)) Expect(err).ToNot(HaveOccurred()) _, err = headerRepository.CreateOrUpdateHeader(header) @@ -332,6 +332,7 @@ var _ = Describe("contractWatcher light transformer", func() { Expect(err).ToNot(HaveOccurred()) err = t.Execute() Expect(err).ToNot(HaveOccurred()) + Expect(t.Start).To(Equal(int64(6885702))) newOwnerLog := test_helpers.LightNewOwnerLog{} err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog) diff --git a/pkg/contract_watcher/shared/poller/poller_test.go b/integration_test/poller_test.go similarity index 92% rename from pkg/contract_watcher/shared/poller/poller_test.go rename to integration_test/poller_test.go index c4e06762..fe913176 100644 --- a/pkg/contract_watcher/shared/poller/poller_test.go +++ b/integration_test/poller_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package poller_test +package integration_test import ( "fmt" @@ -34,7 +34,7 @@ import ( var _ = Describe("Poller", func() { - var p poller.Poller + var contractPoller poller.Poller var con *contract.Contract var db *postgres.DB var bc core.BlockChain @@ -46,7 +46,7 @@ var _ = Describe("Poller", func() { Describe("Full sync mode", func() { BeforeEach(func() { db, bc = test_helpers.SetupDBandBC() - p = poller.NewPoller(bc, db, types.FullSync) + contractPoller = poller.NewPoller(bc, db, types.FullSync) }) Describe("PollContract", func() { @@ -54,10 +54,9 @@ var _ = Describe("Poller", func() { con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"}) Expect(con.Abi).To(Equal(constants.TusdAbiString)) con.StartingBlock = 6707322 - con.LastBlock = 6707323 con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE")) - err := p.PollContract(*con) + err := contractPoller.PollContract(*con, 6707323) Expect(err).ToNot(HaveOccurred()) scanStruct := test_helpers.BalanceOf{} @@ -89,7 +88,7 @@ var _ = Describe("Poller", func() { Expect(len(con.Methods)).To(Equal(1)) con.AddEmittedHash(common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"), common.HexToHash("0x7e74a86b6e146964fb965db04dc2590516da77f720bb6759337bf5632415fd86")) - err := p.PollContractAt(*con, 6885877) + err := contractPoller.PollContractAt(*con, 6885877) Expect(err).ToNot(HaveOccurred()) scanStruct := test_helpers.Owner{} @@ -109,10 +108,9 @@ var _ = Describe("Poller", func() { con = test_helpers.SetupTusdContract(nil, nil) Expect(con.Abi).To(Equal(constants.TusdAbiString)) con.StartingBlock = 6707322 - con.LastBlock = 6707323 con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE")) - err := p.PollContract(*con) + err := contractPoller.PollContract(*con, 6707323) Expect(err).ToNot(HaveOccurred()) scanStruct := test_helpers.BalanceOf{} @@ -125,7 +123,7 @@ var _ = Describe("Poller", func() { Describe("FetchContractData", func() { It("Calls a single contract method", func() { var name = new(string) - err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) + err := contractPoller.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) Expect(err).ToNot(HaveOccurred()) Expect(*name).To(Equal("TrueUSD")) }) @@ -135,7 +133,7 @@ var _ = Describe("Poller", func() { Describe("Light sync mode", func() { BeforeEach(func() { db, bc = test_helpers.SetupDBandBC() - p = poller.NewPoller(bc, db, types.LightSync) + contractPoller = poller.NewPoller(bc, db, types.LightSync) }) Describe("PollContract", func() { @@ -143,10 +141,9 @@ var _ = Describe("Poller", func() { con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"}) Expect(con.Abi).To(Equal(constants.TusdAbiString)) con.StartingBlock = 6707322 - con.LastBlock = 6707323 con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE")) - err := p.PollContract(*con) + err := contractPoller.PollContract(*con, 6707323) Expect(err).ToNot(HaveOccurred()) scanStruct := test_helpers.BalanceOf{} @@ -178,7 +175,7 @@ var _ = Describe("Poller", func() { Expect(len(con.Methods)).To(Equal(1)) con.AddEmittedHash(common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"), common.HexToHash("0x7e74a86b6e146964fb965db04dc2590516da77f720bb6759337bf5632415fd86")) - err := p.PollContractAt(*con, 6885877) + err := contractPoller.PollContractAt(*con, 6885877) Expect(err).ToNot(HaveOccurred()) scanStruct := test_helpers.Owner{} @@ -198,10 +195,9 @@ var _ = Describe("Poller", func() { con = test_helpers.SetupTusdContract(nil, nil) Expect(con.Abi).To(Equal(constants.TusdAbiString)) con.StartingBlock = 6707322 - con.LastBlock = 6707323 con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE")) - err := p.PollContract(*con) + err := contractPoller.PollContract(*con, 6707323) Expect(err).ToNot(HaveOccurred()) scanStruct := test_helpers.BalanceOf{} @@ -214,11 +210,10 @@ var _ = Describe("Poller", func() { con = test_helpers.SetupENSContract(nil, []string{"resolver"}) Expect(con.Abi).To(Equal(constants.ENSAbiString)) con.StartingBlock = 6921967 - con.LastBlock = 6921968 con.EmittedAddrs = map[interface{}]bool{} con.Piping = false con.AddEmittedHash(common.HexToHash("0x495b6e6efdedb750aa519919b5cf282bdaa86067b82a2293a3ff5723527141e8")) - err := p.PollContract(*con) + err := contractPoller.PollContract(*con, 6921968) Expect(err).ToNot(HaveOccurred()) scanStruct := test_helpers.Resolver{} @@ -230,10 +225,10 @@ var _ = Describe("Poller", func() { test_helpers.TearDown(db) db, bc = test_helpers.SetupDBandBC() - p = poller.NewPoller(bc, db, types.LightSync) + contractPoller = poller.NewPoller(bc, db, types.LightSync) con.Piping = true - err = p.PollContract(*con) + err = contractPoller.PollContract(*con, 6921968) Expect(err).ToNot(HaveOccurred()) err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.resolver_method WHERE node_ = '0x495b6e6efdedb750aa519919b5cf282bdaa86067b82a2293a3ff5723527141e8' AND block = '6921967'", constants.EnsContractAddress)).StructScan(&scanStruct) @@ -248,7 +243,7 @@ var _ = Describe("Poller", func() { Describe("FetchContractData", func() { It("Calls a single contract method", func() { var name = new(string) - err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) + err := contractPoller.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) Expect(err).ToNot(HaveOccurred()) Expect(*name).To(Equal("TrueUSD")) }) diff --git a/libraries/shared/factories/event/README.md b/libraries/shared/factories/event/README.md index 0a1aa33c..9ad2093a 100644 --- a/libraries/shared/factories/event/README.md +++ b/libraries/shared/factories/event/README.md @@ -264,7 +264,7 @@ func (repository *ExampleRepository) SetDB(db *postgres.DB) { } func (repository ExampleRepository) Create(headerID int64, models []interface{}) error { - tx, dBaseErr := repository.db.Begin() + tx, dBaseErr := repository.db.Beginx() if dBaseErr != nil { return dBaseErr } diff --git a/libraries/shared/watcher/contract_watcher.go b/libraries/shared/watcher/contract_watcher.go index 880dfb5b..ebb4cc37 100644 --- a/libraries/shared/watcher/contract_watcher.go +++ b/libraries/shared/watcher/contract_watcher.go @@ -49,7 +49,6 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error { return fmt.Errorf("initializers of type %T, not %T", inits, []transformer.ContractTransformerInitializer{}) } - watcher.Transformers = make([]transformer.ContractTransformer, 0, len(initializers)) for _, initializer := range initializers { t := initializer(watcher.DB, watcher.BlockChain) watcher.Transformers = append(watcher.Transformers, t) @@ -65,7 +64,7 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error { return nil } -func (watcher *ContractWatcher) Execute(interface{}) error { +func (watcher *ContractWatcher) Execute() error { for _, transformer := range watcher.Transformers { err := transformer.Execute() if err != nil { diff --git a/pkg/config/contract.go b/pkg/config/contract.go index a2a09d2f..50b8366a 100644 --- a/pkg/config/contract.go +++ b/pkg/config/contract.go @@ -19,6 +19,7 @@ package config import ( log "github.com/sirupsen/logrus" "github.com/spf13/viper" + "github.com/vulcanize/vulcanizedb/pkg/geth" "strings" ) @@ -65,20 +66,20 @@ type ContractConfig struct { Piping map[string]bool } -func (oc *ContractConfig) PrepConfig() { +func (contractConfig *ContractConfig) PrepConfig() { addrs := viper.GetStringSlice("contract.addresses") - oc.Network = viper.GetString("contract.network") - oc.Addresses = make(map[string]bool, len(addrs)) - oc.Abis = make(map[string]string, len(addrs)) - oc.Methods = make(map[string][]string, len(addrs)) - oc.Events = make(map[string][]string, len(addrs)) - oc.MethodArgs = make(map[string][]string, len(addrs)) - oc.EventArgs = make(map[string][]string, len(addrs)) - oc.StartingBlocks = make(map[string]int64, len(addrs)) - oc.Piping = make(map[string]bool, len(addrs)) + contractConfig.Network = viper.GetString("contract.network") + contractConfig.Addresses = make(map[string]bool, len(addrs)) + contractConfig.Abis = make(map[string]string, len(addrs)) + contractConfig.Methods = make(map[string][]string, len(addrs)) + contractConfig.Events = make(map[string][]string, len(addrs)) + contractConfig.MethodArgs = make(map[string][]string, len(addrs)) + contractConfig.EventArgs = make(map[string][]string, len(addrs)) + contractConfig.StartingBlocks = make(map[string]int64, len(addrs)) + contractConfig.Piping = make(map[string]bool, len(addrs)) // De-dupe addresses for _, addr := range addrs { - oc.Addresses[strings.ToLower(addr)] = true + contractConfig.Addresses[strings.ToLower(addr)] = true } // Iterate over addresses to pull out config info for each contract @@ -87,26 +88,29 @@ func (oc *ContractConfig) PrepConfig() { // Get and check abi var abi string - _, abiOK := transformer["abi"] + abiInterface, abiOK := transformer["abi"] if !abiOK { log.Warnf("contract %s not configured with an ABI, will attempt to fetch it from Etherscan\r\n", addr) } else { - abiInterface := transformer["abi"] abi, abiOK = abiInterface.(string) if !abiOK { log.Fatal(addr, "transformer `abi` not of type []string") } } - oc.Abis[strings.ToLower(addr)] = abi + if abi != "" { + if _, abiErr := geth.ParseAbi(abi); abiErr != nil { + log.Fatal(addr, "transformer `abi` not valid JSON") + } + } + contractConfig.Abis[strings.ToLower(addr)] = abi // Get and check events events := make([]string, 0) - _, eventsOK := transformer["events"] + eventsInterface, eventsOK := transformer["events"] if !eventsOK { log.Warnf("contract %s not configured with a list of events to watch, will watch all events\r\n", addr) events = []string{} } else { - eventsInterface := transformer["events"] eventsI, eventsOK := eventsInterface.([]interface{}) if !eventsOK { log.Fatal(addr, "transformer `events` not of type []string\r\n") @@ -119,16 +123,15 @@ func (oc *ContractConfig) PrepConfig() { events = append(events, str) } } - oc.Events[strings.ToLower(addr)] = events + contractConfig.Events[strings.ToLower(addr)] = events // Get and check methods methods := make([]string, 0) - _, methodsOK := transformer["methods"] + methodsInterface, methodsOK := transformer["methods"] if !methodsOK { log.Warnf("contract %s not configured with a list of methods to poll, will not poll any methods\r\n", addr) methods = []string{} } else { - methodsInterface := transformer["methods"] methodsI, methodsOK := methodsInterface.([]interface{}) if !methodsOK { log.Fatal(addr, "transformer `methods` not of type []string\r\n") @@ -141,16 +144,15 @@ func (oc *ContractConfig) PrepConfig() { methods = append(methods, str) } } - oc.Methods[strings.ToLower(addr)] = methods + contractConfig.Methods[strings.ToLower(addr)] = methods // Get and check eventArgs eventArgs := make([]string, 0) - _, eventArgsOK := transformer["eventArgs"] + eventArgsInterface, eventArgsOK := transformer["eventArgs"] if !eventArgsOK { log.Warnf("contract %s not configured with a list of event arguments to filter for, will not filter events for specific emitted values\r\n", addr) eventArgs = []string{} } else { - eventArgsInterface := transformer["eventArgs"] eventArgsI, eventArgsOK := eventArgsInterface.([]interface{}) if !eventArgsOK { log.Fatal(addr, "transformer `eventArgs` not of type []string\r\n") @@ -163,16 +165,15 @@ func (oc *ContractConfig) PrepConfig() { eventArgs = append(eventArgs, str) } } - oc.EventArgs[strings.ToLower(addr)] = eventArgs + contractConfig.EventArgs[strings.ToLower(addr)] = eventArgs // Get and check methodArgs methodArgs := make([]string, 0) - _, methodArgsOK := transformer["methodArgs"] + methodArgsInterface, methodArgsOK := transformer["methodArgs"] if !methodArgsOK { log.Warnf("contract %s not configured with a list of method argument values to poll with, will poll methods with all available arguments\r\n", addr) methodArgs = []string{} } else { - methodArgsInterface := transformer["methodArgs"] methodArgsI, methodArgsOK := methodArgsInterface.([]interface{}) if !methodArgsOK { log.Fatal(addr, "transformer `methodArgs` not of type []string\r\n") @@ -185,7 +186,7 @@ func (oc *ContractConfig) PrepConfig() { methodArgs = append(methodArgs, str) } } - oc.MethodArgs[strings.ToLower(addr)] = methodArgs + contractConfig.MethodArgs[strings.ToLower(addr)] = methodArgs // Get and check startingBlock startInterface, startOK := transformer["startingblock"] @@ -196,7 +197,7 @@ func (oc *ContractConfig) PrepConfig() { if !startOK { log.Fatal(addr, "transformer `startingBlock` not of type int\r\n") } - oc.StartingBlocks[strings.ToLower(addr)] = start + contractConfig.StartingBlocks[strings.ToLower(addr)] = start // Get pipping var piping bool @@ -211,6 +212,6 @@ func (oc *ContractConfig) PrepConfig() { log.Fatal(addr, "transformer `piping` not of type bool\r\n") } } - oc.Piping[strings.ToLower(addr)] = piping + contractConfig.Piping[strings.ToLower(addr)] = piping } } diff --git a/pkg/contract_watcher/full/converter/converter.go b/pkg/contract_watcher/full/converter/converter.go index 39341145..09bae256 100644 --- a/pkg/contract_watcher/full/converter/converter.go +++ b/pkg/contract_watcher/full/converter/converter.go @@ -49,10 +49,10 @@ func (c *Converter) Update(info *contract.Contract) { // Convert the given watched event log into a types.Log for the given event func (c *Converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (*types.Log, error) { - contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) + boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) values := make(map[string]interface{}) log := helpers.ConvertToLog(watchedEvent) - err := contract.UnpackLogIntoMap(values, event.Name, log) + err := boundContract.UnpackLogIntoMap(values, event.Name, log) if err != nil { return nil, err } diff --git a/pkg/contract_watcher/full/transformer/transformer.go b/pkg/contract_watcher/full/transformer/transformer.go index 132292ab..ad2bba69 100644 --- a/pkg/contract_watcher/full/transformer/transformer.go +++ b/pkg/contract_watcher/full/transformer/transformer.go @@ -36,37 +36,40 @@ import ( // Requires a fully synced vDB and a running eth node (or infura) type Transformer struct { // Database interfaces - datastore.FilterRepository // Log filters repo; accepts filters generated by Contract.GenerateFilters() - datastore.WatchedEventRepository // Watched event log views, created by the log filters - repository.EventRepository // Holds transformed watched event log data + FilterRepository datastore.FilterRepository // Log filters repo; accepts filters generated by Contract.GenerateFilters() + WatchedEventRepository datastore.WatchedEventRepository // Watched event log views, created by the log filters + TransformedEventRepository repository.EventRepository // Holds transformed watched event log data // Pre-processing interfaces - parser.Parser // Parses events and methods out of contract abi fetched using contract address - retriever.BlockRetriever // Retrieves first block for contract and current block height + Parser parser.Parser // Parses events and methods out of contract abi fetched using contract address + Retriever retriever.BlockRetriever // Retrieves first block for contract and current block height // Processing interfaces - converter.ConverterInterface // Converts watched event logs into custom log - poller.Poller // Polls methods using contract's token holder addresses and persists them using method datastore + Converter converter.ConverterInterface // Converts watched event logs into custom log + Poller poller.Poller // Polls methods using contract's token holder addresses and persists them using method datastore // Store contract configuration information Config config.ContractConfig // Store contract info as mapping to contract address Contracts map[string]*contract.Contract + + // Latest block in the block repository + LastBlock int64 } // Transformer takes in config for blockchain, database, and network id func NewTransformer(con config.ContractConfig, BC core.BlockChain, DB *postgres.DB) *Transformer { return &Transformer{ - Poller: poller.NewPoller(BC, DB, types.FullSync), - Parser: parser.NewParser(con.Network), - BlockRetriever: retriever.NewBlockRetriever(DB), - ConverterInterface: &converter.Converter{}, - Contracts: map[string]*contract.Contract{}, - WatchedEventRepository: repositories.WatchedEventRepository{DB: DB}, - FilterRepository: repositories.FilterRepository{DB: DB}, - EventRepository: repository.NewEventRepository(DB, types.FullSync), - Config: con, + Poller: poller.NewPoller(BC, DB, types.FullSync), + Parser: parser.NewParser(con.Network), + Retriever: retriever.NewBlockRetriever(DB), + Converter: &converter.Converter{}, + Contracts: map[string]*contract.Contract{}, + WatchedEventRepository: repositories.WatchedEventRepository{DB: DB}, + FilterRepository: repositories.FilterRepository{DB: DB}, + TransformedEventRepository: repository.NewEventRepository(DB, types.FullSync), + Config: con, } } @@ -92,15 +95,10 @@ func (tr *Transformer) Init() error { } // Get first block and most recent block number in the header repo - firstBlock, err := tr.BlockRetriever.RetrieveFirstBlock(contractAddr) + firstBlock, err := tr.Retriever.RetrieveFirstBlock(contractAddr) if err != nil { return err } - lastBlock, err := tr.BlockRetriever.RetrieveMostRecentBlock() - if err != nil { - return err - } - // Set to specified range if it falls within the bounds if firstBlock < tr.Config.StartingBlocks[contractAddr] { firstBlock = tr.Config.StartingBlocks[contractAddr] @@ -108,7 +106,7 @@ func (tr *Transformer) Init() error { // Get contract name if it has one var name = new(string) - tr.FetchContractData(tr.Abi(), contractAddr, "name", nil, name, lastBlock) + tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock) // Remove any potential accidental duplicate inputs in arg filter values eventArgs := map[string]bool{} @@ -128,7 +126,6 @@ func (tr *Transformer) Init() error { Abi: tr.Parser.Abi(), ParsedAbi: tr.Parser.ParsedAbi(), StartingBlock: firstBlock, - LastBlock: lastBlock, Events: tr.Parser.GetEvents(tr.Config.Events[contractAddr]), Methods: tr.Parser.GetSelectMethods(tr.Config.Methods[contractAddr]), FilterArgs: eventArgs, @@ -144,7 +141,7 @@ func (tr *Transformer) Init() error { // Iterate over filters and push them to the repo using filter repository interface for _, filter := range info.Filters { - err = tr.CreateFilter(filter) + err = tr.FilterRepository.CreateFilter(filter) if err != nil { return err } @@ -154,6 +151,13 @@ func (tr *Transformer) Init() error { tr.Contracts[contractAddr] = info } + // Get the most recent block number in the block repo + var err error + tr.LastBlock, err = tr.Retriever.RetrieveMostRecentBlock() + if err != nil { + return err + } + return nil } @@ -169,11 +173,11 @@ func (tr *Transformer) Execute() error { // Iterate through all internal contracts for _, con := range tr.Contracts { // Update converter with current contract - tr.Update(con) + tr.Converter.Update(con) // Iterate through contract filters and get watched event logs for eventSig, filter := range con.Filters { - watchedEvents, err := tr.GetWatchedEvents(filter.Name) + watchedEvents, err := tr.WatchedEventRepository.GetWatchedEvents(filter.Name) if err != nil { return err } @@ -181,7 +185,7 @@ func (tr *Transformer) Execute() error { // Iterate over watched event logs for _, we := range watchedEvents { // Convert them to our custom log type - cstm, err := tr.ConverterInterface.Convert(*we, con.Events[eventSig]) + cstm, err := tr.Converter.Convert(*we, con.Events[eventSig]) if err != nil { return err } @@ -191,7 +195,7 @@ func (tr *Transformer) Execute() error { // If log is not empty, immediately persist in repo // Run this in seperate goroutine? - err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventSig], con.Address, con.Name) + err = tr.TransformedEventRepository.PersistLogs([]types.Log{*cstm}, con.Events[eventSig], con.Address, con.Name) if err != nil { return err } @@ -201,12 +205,19 @@ func (tr *Transformer) Execute() error { // After persisting all watched event logs // poller polls select contract methods // and persists the results into custom pg tables - // Run this in seperate goroutine? - if err := tr.PollContract(*con); err != nil { + if err := tr.Poller.PollContract(*con, tr.LastBlock); err != nil { return err } } + // At the end of a transformation cycle, and before the next + // update the latest block from the block repo + var err error + tr.LastBlock, err = tr.Retriever.RetrieveMostRecentBlock() + if err != nil { + return err + } + return nil } diff --git a/pkg/contract_watcher/full/transformer/transformer_test.go b/pkg/contract_watcher/full/transformer/transformer_test.go index 0cb0fb31..ac03e1a6 100644 --- a/pkg/contract_watcher/full/transformer/transformer_test.go +++ b/pkg/contract_watcher/full/transformer/transformer_test.go @@ -17,7 +17,6 @@ package transformer_test import ( - "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/helpers/test_helpers/mocks" "math/rand" "time" @@ -27,6 +26,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/retriever" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/transformer" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/contract" + "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/helpers/test_helpers/mocks" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/parser" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/poller" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" @@ -67,7 +67,7 @@ var _ = Describe("Transformer", func() { Expect(ok).To(Equal(true)) Expect(c.StartingBlock).To(Equal(firstBlock)) - Expect(c.LastBlock).To(Equal(mostRecentBlock)) + Expect(t.LastBlock).To(Equal(mostRecentBlock)) Expect(c.Abi).To(Equal(fakeAbi)) Expect(c.Name).To(Equal(fakeContractName)) Expect(c.Address).To(Equal(fakeAddress)) @@ -90,7 +90,7 @@ func getTransformer(blockRetriever retriever.BlockRetriever, parsr parser.Parser return transformer.Transformer{ FilterRepository: &fakes.MockFilterRepository{}, Parser: parsr, - BlockRetriever: blockRetriever, + Retriever: blockRetriever, Poller: pollr, Contracts: map[string]*contract.Contract{}, Config: mocks.MockConfig, diff --git a/pkg/contract_watcher/light/converter/converter.go b/pkg/contract_watcher/light/converter/converter.go index a1b7e051..3a62c807 100644 --- a/pkg/contract_watcher/light/converter/converter.go +++ b/pkg/contract_watcher/light/converter/converter.go @@ -48,7 +48,7 @@ func (c *Converter) Update(info *contract.Contract) { // Convert the given watched event log into a types.Log for the given event func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error) { - contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) + boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) returnLogs := make([]types.Log, 0, len(logs)) for _, log := range logs { values := make(map[string]interface{}) @@ -57,7 +57,7 @@ func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID in values[field.Name] = i } - err := contract.UnpackLogIntoMap(values, event.Name, log) + err := boundContract.UnpackLogIntoMap(values, event.Name, log) if err != nil { return nil, err } diff --git a/pkg/contract_watcher/light/repository/header_repository.go b/pkg/contract_watcher/light/repository/header_repository.go index 47437f13..3af7652c 100644 --- a/pkg/contract_watcher/light/repository/header_repository.go +++ b/pkg/contract_watcher/light/repository/header_repository.go @@ -17,8 +17,8 @@ package repository import ( - "database/sql" "fmt" + "github.com/jmoiron/sqlx" "github.com/hashicorp/golang-lru" @@ -130,7 +130,7 @@ func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) // Marks all of the provided headers checked for each of the provided column ids func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error { - tx, err := r.db.Begin() + tx, err := r.db.Beginx() if err != nil { return err } @@ -280,7 +280,7 @@ func (r *headerRepository) CheckCache(key string) (interface{}, bool) { } // Used to mark a header checked as part of some external transaction so as to group into one commit -func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error { +func MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, eventID string) error { _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`) VALUES ($1, $2) ON CONFLICT (header_id) DO diff --git a/pkg/contract_watcher/light/repository/header_repository_test.go b/pkg/contract_watcher/light/repository/header_repository_test.go index 02a94bbf..01eb876a 100644 --- a/pkg/contract_watcher/light/repository/header_repository_test.go +++ b/pkg/contract_watcher/light/repository/header_repository_test.go @@ -32,7 +32,6 @@ import ( var _ = Describe("Repository", func() { var db *postgres.DB - var bc core.BlockChain var contractHeaderRepo repository.HeaderRepository // contract_watcher light header repository var coreHeaderRepo repositories.HeaderRepository // pkg/datastore header repository var eventIDs = []string{ @@ -47,7 +46,7 @@ var _ = Describe("Repository", func() { } BeforeEach(func() { - db, bc = test_helpers.SetupDBandBC() + db, _ = test_helpers.SetupDBandBC() contractHeaderRepo = repository.NewHeaderRepository(db) coreHeaderRepo = repositories.NewHeaderRepository(db) }) @@ -205,7 +204,7 @@ var _ = Describe("Repository", func() { }) It("Returns at most 100 headers", func() { - add102Headers(coreHeaderRepo, bc) + add102Headers(coreHeaderRepo) err := contractHeaderRepo.AddCheckColumns(eventIDs) Expect(err).ToNot(HaveOccurred()) @@ -359,11 +358,11 @@ func addDiscontinuousHeaders(coreHeaderRepo repositories.HeaderRepository) { coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader4) } -func add102Headers(coreHeaderRepo repositories.HeaderRepository, blockChain core.BlockChain) { +func add102Headers(coreHeaderRepo repositories.HeaderRepository) { + baseHeader := mocks.MockHeader1 for i := 6194632; i < 6194733; i++ { - header, err := blockChain.GetHeaderByNumber(int64(i)) - Expect(err).ToNot(HaveOccurred()) - _, err = coreHeaderRepo.CreateOrUpdateHeader(header) + _, err := coreHeaderRepo.CreateOrUpdateHeader(baseHeader) Expect(err).ToNot(HaveOccurred()) + baseHeader.BlockNumber++ } } diff --git a/pkg/contract_watcher/light/transformer/transformer.go b/pkg/contract_watcher/light/transformer/transformer.go index 7753e286..8ce5f064 100644 --- a/pkg/contract_watcher/light/transformer/transformer.go +++ b/pkg/contract_watcher/light/transformer/transformer.go @@ -18,12 +18,12 @@ package transformer import ( "errors" - "github.com/vulcanize/vulcanizedb/pkg/config" "strings" "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/converter" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/fetcher" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/repository" @@ -40,17 +40,17 @@ import ( // Requires a light synced vDB (headers) and a running eth node (or infura) type Transformer struct { // Database interfaces - srep.EventRepository // Holds transformed watched event log data - repository.HeaderRepository // Interface for interaction with header repositories + EventRepository srep.EventRepository // Holds transformed watched event log data + HeaderRepository repository.HeaderRepository // Interface for interaction with header repositories // Pre-processing interfaces - parser.Parser // Parses events and methods out of contract abi fetched using contract address - retriever.BlockRetriever // Retrieves first block for contract and current block height + Parser parser.Parser // Parses events and methods out of contract abi fetched using contract address + Retriever retriever.BlockRetriever // Retrieves first block for contract // Processing interfaces - fetcher.Fetcher // Fetches event logs, using header hashes - converter.ConverterInterface // Converts watched event logs into custom log - poller.Poller // Polls methods using arguments collected from events and persists them using a method datastore + Fetcher fetcher.Fetcher // Fetches event logs, using header hashes + Converter converter.ConverterInterface // Converts watched event logs into custom log + Poller poller.Poller // Polls methods using arguments collected from events and persists them using a method datastore // Store contract configuration information Config config.ContractConfig @@ -64,7 +64,7 @@ type Transformer struct { sortedMethodIds map[string][]string // Map to sort method column ids by contract, for post fetch method polling eventIds []string // Holds event column ids across all contract, for batch fetching of headers eventFilters []common.Hash // Holds topic0 hashes across all contracts, for batch fetching of logs - start int64 // Hold the lowest starting block and the highest ending block + Start int64 // Hold the lowest starting block and the highest ending block } // Order-of-operations: @@ -77,15 +77,15 @@ type Transformer struct { func NewTransformer(con config.ContractConfig, bc core.BlockChain, db *postgres.DB) *Transformer { return &Transformer{ - Poller: poller.NewPoller(bc, db, types.LightSync), - Fetcher: fetcher.NewFetcher(bc), - Parser: parser.NewParser(con.Network), - HeaderRepository: repository.NewHeaderRepository(db), - BlockRetriever: retriever.NewBlockRetriever(db), - ConverterInterface: &converter.Converter{}, - Contracts: map[string]*contract.Contract{}, - EventRepository: srep.NewEventRepository(db, types.LightSync), - Config: con, + Poller: poller.NewPoller(bc, db, types.LightSync), + Fetcher: fetcher.NewFetcher(bc), + Parser: parser.NewParser(con.Network), + HeaderRepository: repository.NewHeaderRepository(db), + Retriever: retriever.NewBlockRetriever(db), + Converter: &converter.Converter{}, + Contracts: map[string]*contract.Contract{}, + EventRepository: srep.NewEventRepository(db, types.LightSync), + Config: con, } } @@ -100,7 +100,7 @@ func (tr *Transformer) Init() error { tr.sortedMethodIds = make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling tr.eventIds = make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers tr.eventFilters = make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs - tr.start = 100000000000 // Hold the lowest starting block and the highest ending block + tr.Start = 100000000000 // Iterate through all internal contract addresses for contractAddr := range tr.Config.Addresses { @@ -120,7 +120,7 @@ func (tr *Transformer) Init() error { } // Get first block and most recent block number in the header repo - firstBlock, err := tr.BlockRetriever.RetrieveFirstBlock() + firstBlock, err := tr.Retriever.RetrieveFirstBlock() if err != nil { return err } @@ -132,7 +132,7 @@ func (tr *Transformer) Init() error { // Get contract name if it has one var name = new(string) - tr.Poller.FetchContractData(tr.Abi(), contractAddr, "name", nil, name, -1) + tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1) // Remove any potential accidental duplicate inputs eventArgs := map[string]bool{} @@ -152,7 +152,6 @@ func (tr *Transformer) Init() error { Abi: tr.Parser.Abi(), ParsedAbi: tr.Parser.ParsedAbi(), StartingBlock: firstBlock, - LastBlock: -1, Events: tr.Parser.GetEvents(tr.Config.Events[contractAddr]), Methods: tr.Parser.GetSelectMethods(tr.Config.Methods[contractAddr]), FilterArgs: eventArgs, @@ -189,8 +188,8 @@ func (tr *Transformer) Init() error { } // Update start to the lowest block - if con.StartingBlock < tr.start { - tr.start = con.StartingBlock + if con.StartingBlock < tr.Start { + tr.Start = con.StartingBlock } } @@ -202,20 +201,20 @@ func (tr *Transformer) Execute() error { return errors.New("error: transformer has no initialized contracts") } - // Map to sort batch fetched logs by which contract they belong to, for post fetch processing - sortedLogs := make(map[string][]gethTypes.Log) - for _, con := range tr.Contracts { - sortedLogs[con.Address] = []gethTypes.Log{} - } - // Find unchecked headers for all events across all contracts; these are returned in asc order - missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.start, -1, tr.eventIds) + missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds) if err != nil { return err } // Iterate over headers for _, header := range missingHeaders { + // Set `start` to this header + // This way if we throw an error but don't bring the execution cycle down (how it is currently handled) + // we restart the cycle at this header + tr.Start = header.BlockNumber + // Map to sort batch fetched logs by which contract they belong to, for post fetch processing + sortedLogs := make(map[string][]gethTypes.Log) // And fetch all event logs across contracts at this header allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header) if err != nil { @@ -233,6 +232,7 @@ func (tr *Transformer) Execute() error { if err != nil { return err } + tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header continue } @@ -249,10 +249,10 @@ func (tr *Transformer) Execute() error { } // Configure converter with this contract con := tr.Contracts[conAddr] - tr.ConverterInterface.Update(con) + tr.Converter.Update(con) // Convert logs into batches of log mappings (eventName => []types.Logs - convertedLogs, err := tr.ConverterInterface.ConvertBatch(logs, con.Events, header.Id) + convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id) if err != nil { return err } @@ -281,8 +281,8 @@ func (tr *Transformer) Execute() error { if err != nil { return err } - - tr.start = header.BlockNumber + 1 + // Success; setup to start at the next header + tr.Start = header.BlockNumber + 1 } return nil diff --git a/pkg/contract_watcher/light/transformer/transformer_test.go b/pkg/contract_watcher/light/transformer/transformer_test.go index 98a31722..8c17f570 100644 --- a/pkg/contract_watcher/light/transformer/transformer_test.go +++ b/pkg/contract_watcher/light/transformer/transformer_test.go @@ -19,6 +19,7 @@ package transformer_test import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/retriever" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/transformer" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/contract" @@ -54,7 +55,47 @@ var _ = Describe("Transformer", func() { Expect(ok).To(Equal(true)) Expect(c.StartingBlock).To(Equal(firstBlock)) - Expect(c.LastBlock).To(Equal(int64(-1))) + Expect(c.Abi).To(Equal(fakeAbi)) + Expect(c.Name).To(Equal(fakeContractName)) + Expect(c.Address).To(Equal(fakeAddress)) + }) + + It("Fails to initialize if first block cannot be fetched from vDB headers table", func() { + blockRetriever := &fakes.MockLightBlockRetriever{} + blockRetriever.FirstBlockErr = fakes.FakeError + t := getFakeTransformer(blockRetriever, &fakes.MockParser{}, &fakes.MockPoller{}) + + err := t.Init() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + }) + + Describe("Execute", func() { + It("Executes contract transformations", func() { + blockRetriever := &fakes.MockLightBlockRetriever{} + firstBlock := int64(1) + blockRetriever.FirstBlock = firstBlock + + parsr := &fakes.MockParser{} + fakeAbi := "fake_abi" + parsr.AbiToReturn = fakeAbi + + pollr := &fakes.MockPoller{} + fakeContractName := "fake_contract_name" + pollr.ContractName = fakeContractName + + t := getFakeTransformer(blockRetriever, parsr, pollr) + + err := t.Init() + + Expect(err).ToNot(HaveOccurred()) + + c, ok := t.Contracts[fakeAddress] + Expect(ok).To(Equal(true)) + + Expect(c.StartingBlock).To(Equal(firstBlock)) Expect(c.Abi).To(Equal(fakeAbi)) Expect(c.Name).To(Equal(fakeContractName)) Expect(c.Address).To(Equal(fakeAddress)) @@ -76,7 +117,7 @@ var _ = Describe("Transformer", func() { func getFakeTransformer(blockRetriever retriever.BlockRetriever, parsr parser.Parser, pollr poller.Poller) transformer.Transformer { return transformer.Transformer{ Parser: parsr, - BlockRetriever: blockRetriever, + Retriever: blockRetriever, Poller: pollr, HeaderRepository: &fakes.MockLightHeaderRepository{}, Contracts: map[string]*contract.Contract{}, diff --git a/pkg/contract_watcher/shared/contract/contract.go b/pkg/contract_watcher/shared/contract/contract.go index b3eb3b7f..6b3eb0ff 100644 --- a/pkg/contract_watcher/shared/contract/contract.go +++ b/pkg/contract_watcher/shared/contract/contract.go @@ -34,7 +34,6 @@ type Contract struct { Address string // Address of the contract Network string // Network on which the contract is deployed; default empty "" is Ethereum mainnet StartingBlock int64 // Starting block of the contract - LastBlock int64 // Most recent block on the network Abi string // Abi string ParsedAbi abi.ABI // Parsed abi Events map[string]types.Event // List of events to watch diff --git a/pkg/contract_watcher/shared/helpers/test_helpers/database.go b/pkg/contract_watcher/shared/helpers/test_helpers/database.go index 7c297bef..93c79459 100644 --- a/pkg/contract_watcher/shared/helpers/test_helpers/database.go +++ b/pkg/contract_watcher/shared/helpers/test_helpers/database.go @@ -168,7 +168,6 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract Abi: p.Abi(), ParsedAbi: p.ParsedAbi(), StartingBlock: 6194634, - LastBlock: 6507323, Events: p.GetEvents(wantedEvents), Methods: p.GetSelectMethods(wantedMethods), MethodArgs: map[string]bool{}, @@ -215,7 +214,6 @@ func SetupENSContract(wantedEvents, wantedMethods []string) *contract.Contract { Abi: p.Abi(), ParsedAbi: p.ParsedAbi(), StartingBlock: 6194634, - LastBlock: 6507323, Events: p.GetEvents(wantedEvents), Methods: p.GetSelectMethods(wantedMethods), MethodArgs: map[string]bool{}, @@ -224,7 +222,7 @@ func SetupENSContract(wantedEvents, wantedMethods []string) *contract.Contract { } func TearDown(db *postgres.DB) { - tx, err := db.Begin() + tx, err := db.Beginx() Expect(err).NotTo(HaveOccurred()) _, err = tx.Exec(`DELETE FROM blocks`) diff --git a/pkg/contract_watcher/shared/parser/parser.go b/pkg/contract_watcher/shared/parser/parser.go index cc3da5bd..77af237b 100644 --- a/pkg/contract_watcher/shared/parser/parser.go +++ b/pkg/contract_watcher/shared/parser/parser.go @@ -98,7 +98,7 @@ func (p *parser) lookUp(contractAddr string) (string, error) { return v, nil } - return "", errors.New("ABI not present in lookup tabe") + return "", errors.New("ABI not present in lookup table") } // Returns only specified methods, if they meet the criteria diff --git a/pkg/contract_watcher/shared/poller/poller.go b/pkg/contract_watcher/shared/poller/poller.go index ae4f1974..b3f1c51b 100644 --- a/pkg/contract_watcher/shared/poller/poller.go +++ b/pkg/contract_watcher/shared/poller/poller.go @@ -34,7 +34,7 @@ import ( ) type Poller interface { - PollContract(con contract.Contract) error + PollContract(con contract.Contract, lastBlock int64) error PollContractAt(con contract.Contract, blockNumber int64) error FetchContractData(contractAbi, contractAddress, method string, methodArgs []interface{}, result interface{}, blockNumber int64) error } @@ -52,8 +52,8 @@ func NewPoller(blockChain core.BlockChain, db *postgres.DB, mode types.Mode) *po } } -func (p *poller) PollContract(con contract.Contract) error { - for i := con.StartingBlock; i <= con.LastBlock; i++ { +func (p *poller) PollContract(con contract.Contract, lastBlock int64) error { + for i := con.StartingBlock; i <= lastBlock; i++ { if err := p.PollContractAt(con, i); err != nil { return err } diff --git a/pkg/contract_watcher/shared/poller/poller_suite_test.go b/pkg/contract_watcher/shared/poller/poller_suite_test.go deleted file mode 100644 index 9b4e94b5..00000000 --- a/pkg/contract_watcher/shared/poller/poller_suite_test.go +++ /dev/null @@ -1,35 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package poller_test - -import ( - "github.com/sirupsen/logrus" - "io/ioutil" - "testing" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -func TestPoller(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Poller Suite Test") -} - -var _ = BeforeSuite(func() { - logrus.SetOutput(ioutil.Discard) -}) diff --git a/pkg/contract_watcher/shared/repository/event_repository.go b/pkg/contract_watcher/shared/repository/event_repository.go index 742ca9ad..14959a49 100644 --- a/pkg/contract_watcher/shared/repository/event_repository.go +++ b/pkg/contract_watcher/shared/repository/event_repository.go @@ -97,7 +97,7 @@ func (r *eventRepository) persistLogs(logs []types.Log, eventInfo types.Event, c // Creates a custom postgres command to persist logs for the given event (compatible with light synced vDB) func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { - tx, err := r.db.Begin() + tx, err := r.db.Beginx() if err != nil { return err } @@ -151,7 +151,7 @@ func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types // Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB) func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { - tx, err := r.db.Begin() + tx, err := r.db.Beginx() if err != nil { return err } diff --git a/pkg/contract_watcher/shared/repository/method_repository.go b/pkg/contract_watcher/shared/repository/method_repository.go index 4747f141..2616675e 100644 --- a/pkg/contract_watcher/shared/repository/method_repository.go +++ b/pkg/contract_watcher/shared/repository/method_repository.go @@ -77,7 +77,7 @@ func (r *methodRepository) PersistResults(results []types.Result, methodInfo typ // Creates a custom postgres command to persist logs for the given event func (r *methodRepository) persistResults(results []types.Result, methodInfo types.Method, contractAddr, contractName string) error { - tx, err := r.DB.Begin() + tx, err := r.DB.Beginx() if err != nil { return err } diff --git a/pkg/fakes/mock_poller.go b/pkg/fakes/mock_poller.go index ab7287ee..2d782b18 100644 --- a/pkg/fakes/mock_poller.go +++ b/pkg/fakes/mock_poller.go @@ -8,7 +8,7 @@ type MockPoller struct { ContractName string } -func (*MockPoller) PollContract(con contract.Contract) error { +func (*MockPoller) PollContract(con contract.Contract, lastBlock int64) error { panic("implement me") } diff --git a/pkg/plugin/test_helpers/database.go b/pkg/plugin/test_helpers/database.go index 0ab5c63f..345683f0 100644 --- a/pkg/plugin/test_helpers/database.go +++ b/pkg/plugin/test_helpers/database.go @@ -54,7 +54,7 @@ func SetupDBandBC() (*postgres.DB, core.BlockChain) { } func TearDown(db *postgres.DB) { - tx, err := db.Begin() + tx, err := db.Beginx() Expect(err).NotTo(HaveOccurred()) _, err = tx.Exec(`DELETE FROM headers`) diff --git a/test_config/test_config.go b/test_config/test_config.go index b64a9278..0f0071ae 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -17,6 +17,7 @@ package test_config import ( + "errors" "fmt" "os" @@ -48,10 +49,10 @@ func setTestConfig() { TestConfig.SetConfigName("private") TestConfig.AddConfigPath("$GOPATH/src/github.com/vulcanize/vulcanizedb/environments/") err := TestConfig.ReadInConfig() - ipc := TestConfig.GetString("client.ipcPath") if err != nil { log.Fatal(err) } + ipc := TestConfig.GetString("client.ipcPath") hn := TestConfig.GetString("database.hostname") port := TestConfig.GetInt("database.port") name := TestConfig.GetString("database.name") @@ -71,10 +72,20 @@ func setInfuraConfig() { Infura.SetConfigName("infura") Infura.AddConfigPath("$GOPATH/src/github.com/vulcanize/vulcanizedb/environments/") err := Infura.ReadInConfig() - ipc := Infura.GetString("client.ipcpath") if err != nil { log.Fatal(err) } + ipc := Infura.GetString("client.ipcpath") + + // If we don't have an ipc path in the config file, check the env variable + if ipc == "" { + Infura.BindEnv("url", "INFURA_URL") + ipc = Infura.GetString("url") + } + if ipc == "" { + log.Fatal(errors.New("infura.toml IPC path or $INFURA_URL env variable need to be set")) + } + InfuraClient = config.Client{ IPCPath: ipc, }