PR fixes; remove all infura token references and setup travis to use encrypted env

variable; rest of the ethjson_rpc dependent tests extracted to integration_test
This commit is contained in:
Ian Norden 2019-03-14 16:49:27 -05:00
parent d3e172d9ab
commit 1aa849bcb4
32 changed files with 242 additions and 232 deletions

View File

@ -5,28 +5,25 @@ go:
services:
- postgresql
addons:
postgresql: "9.6"
postgresql: '9.6'
go_import_path: github.com/vulcanize/vulcanizedb
before_install:
# ginkgo golint dep goose
- make installtools
- bash ./scripts/install-postgres-10.sh
- curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add -
- echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list
- sudo apt-get update && sudo apt-get install yarn
before_script:
- sudo -u postgres createdb vulcanize_private
- make migrate NAME=vulcanize_private
- cd postgraphile && yarn
script:
- yarn test
- cd ../
- make test
- make integrationtest
notifications:
email: false
env:
matrix:
secure: hz6YPkTm59QhOQ2+05K+AWHw0wOoPjz9wqfUKZcuUi+ICdcuClXMt+hVpUDmwrGOCp8B5y8hyxr/iq7P+3MLwz1/2JYxc9S9MT1O0WXU8ZuWZR0LI1e2ZhZCF3E/JWq6c4atxFIEhcv7roBUkbUcRA8cpRdZmEmpSM8JyP0z76CezIG/HeyUdVW34DLjTBNB0TJdOZyfOh0aCvzgXR/kfKaSYNBhJY7j2UK7x0qK0UlQ/n7RHCrtjWoNWpuwl9bw1F5plMHOD9bq0oDG6gs1SFBaybfEMN71Hp0QxhD/u+1tVuHfGooYhzVgxStPSCpSkgQ7vgSZI766ErqPc3B6Wv9K+s5exPLgCykEiLorW6qI8A+mdiPIiIzLBRMcbF/kCo7gFh0kDIYbSTjS5COfjNw/fKsp59upXF4VtCDgVgjAemY6XT4lziZiVQwiK1Oyln8HrIux1aJEWgRGEQpQqwVeCUHClHus5Paf/N0Ci5f9NHh2zbkZvDuUF2uQu6Wc58wXHcVsloyfQibJrH1q2sQRqdiPfN5Y9l0igFzXILFd4itXMyMnSDQh6+GD6V0YY/hGufAs42UymjGYbEwZQP/gn8/bQpilngbmlJ7bB2nva70kXgqhuNiZM5XuFuQMIP3U2Tbm87FHEFUCoKPv2if9Ft74YkAuzVljAMo2YLQ=

View File

@ -138,6 +138,7 @@ false
If you have full rinkeby chaindata you can move it to `rinkeby_vulcanizedb_geth_data` docker volume to skip long wait of sync.
## Running the Tests
- Replace the empty `ipcPath` in the `environments/infura.toml` with a path to a full archival node's eth_jsonrpc endpoint (e.g. local geth node ipc path or infura url)
- `createdb vulcanize_private` will create the test db
- `make migrate NAME=vulcanize_private` will run the db migrations
- `make test` will run the unit tests and skip the integration tests
@ -154,12 +155,12 @@ Contract watchers work with a light or full sync vDB to fetch raw ethereum data
A watcher is composed of at least a fetcher and a transformer or set of transformers, where a fetcher is an interface for retrieving raw Ethereum data from some source (e.g. eth_jsonrpc, IPFS)
and a transformer is an interface for filtering through that raw Ethereum data to extract, process, and persist data for specific contracts or accounts.
### contractWatcher
## contractWatcher
The `contractWatcher` command is a built-in generic contract watcher. It can watch any and all events for a given contract provided the contract's ABI is available.
It also provides some state variable coverage by automating polling of public methods, with some restrictions:
1. The method must have 2 or less arguments
2. The method's arguments must all be of type address or bytes32 (hash)
3. The method must return a single value
1. The method's arguments must all be of type address or bytes32 (hash)
1. The method must return a single value
This command operates in two modes- `light` and `full`- which require a light or full-synced vulcanizeDB, respectively.
@ -177,7 +178,7 @@ This command takes a config of the form:
port = 5432
[client]
ipcPath = "path_to_ethjson_rpc"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[contract]
network = ""
@ -238,7 +239,7 @@ This command takes a config of the form:
At the very minimum, for each contract address an ABI and a starting block number need to be provided (or just the starting block if the ABI can be reliably fetched from Etherscan).
With just this information we will be able to watch all events at the contract, but with no additional filters and no method polling.
#### contractWatcher output
### contractWatcher output
Transformed events and polled method results are committed to Postgres in schemas and tables generated according to the contract abi.
@ -246,9 +247,9 @@ Schemas are created for each contract using the naming convention `<sync-type>_<
Under this schema, tables are generated for watched events as `<lowercase event name>_event` and for polled methods as `<lowercase method name>_method`
The 'method' and 'event' identifiers are tacked onto the end of the table names to prevent collisions between methods and events of the same lowercase name
Example:
### contractWatcher example:
Modify `./environments/example.toml` to replace `"path_to_ethjson_rpc"` with a path that points to an ethjson_rpc endpoint (e.g. a local geth node ipc path or an Infura url).
Modify `./environments/example.toml` to replace the empty `ipcPath` with a path that points to an ethjson_rpc endpoint (e.g. a local geth node ipc path or an Infura url).
This endpoint should be for an archival eth node if we want to perform method polling as this configuration is currently set up to do. To work with a non-archival full node,
remove the `balanceOf` method from the `0x8dd5fbce2f6a956c3022ba3663759011dd51e73e` (TrueUSD) contract.
@ -314,14 +315,14 @@ The addition of '_' after table names is to prevent collisions with reserved Pos
Also notice that the contract address used for the schema name has been down-cased.
### composeAndExecute
## composeAndExecute
The `composeAndExecute` command is used to compose and execute over an arbitrary set of custom transformers.
This is accomplished by generating a Go pluggin which allows our `vulcanizedb` binary to link to external transformers, so
long as they abide by our standard [interfaces](https://github.com/vulcanize/maker-vulcanizedb/tree/compose_and_execute/libraries/shared/transformer).
This command requires Go 1.11+ and [Go plugins](https://golang.org/pkg/plugin/) only work on Unix based systems.
#### Writing custom transformers
### Writing custom transformers
Storage Transformers
* [Guide](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/factories/storage/README.md)
* [Example](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/factories/storage/EXAMPLE.md)
@ -330,7 +331,7 @@ Event Transformers
* [Guide](https://github.com/vulcanize/maker-vulcanizedb/blob/event_docs/libraries/shared/factories/README.md)
* [Example](https://github.com/vulcanize/ens_transformers/tree/working)
#### composeAndExecute configuration
### composeAndExecute configuration
A .toml config file is specified when executing the command:
`./vulcanizedb composeAndExecute --config=./environments/config_name.toml`
@ -345,7 +346,7 @@ The config provides information for composing a set of transformers:
port = 5432
[client]
ipcPath = "http://kovan0.vulcanize.io:8545"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter]
home = "github.com/vulcanize/vulcanizedb"
@ -365,7 +366,7 @@ The config provides information for composing a set of transformers:
rank = "0"
[exporter.transformer2]
path = "path/to/transformer2"
type = "eth_generic"
type = "eth_contract"
repository = "github.com/account/repo"
migrations = "db/migrations"
rank = "0"
@ -441,7 +442,7 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface
}
```
#### Preparing transformer(s) to work as pluggins for composeAndExecute
### Preparing transformers to work as pluggins for composeAndExecute
To plug in an external transformer we need to:
* Create a [package](https://github.com/vulcanize/ens_transformers/blob/working/transformers/registry/new_owner/initializer/initializer.go)

View File

@ -42,7 +42,7 @@ var composeCmd = &cobra.Command{
port = 5432
[client]
ipcPath = "http://kovan0.vulcanize.io:8545"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter]
home = "github.com/vulcanize/vulcanizedb"

View File

@ -42,7 +42,7 @@ var composeAndExecuteCmd = &cobra.Command{
port = 5432
[client]
ipcPath = "http://kovan0.vulcanize.io:8545"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter]
home = "github.com/vulcanize/vulcanizedb"
@ -180,7 +180,7 @@ func composeAndExecute() {
gw := watcher.NewContractWatcher(&db, blockChain)
gw.AddTransformers(ethContractInitializers)
wg.Add(1)
go contractWatching(&gw, &wg)
go watchEthContract(&gw, &wg)
}
wg.Wait()
}

View File

@ -46,7 +46,7 @@ Requires a .toml config file:
port = 5432
[client]
ipcPath = "path_to_ethjson_rpc"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[contract]
network = ""
@ -112,12 +112,14 @@ func contractWatcher() {
}
for range ticker.C {
t.Execute()
err = t.Execute()
if err != nil {
log.Error("Execution error for transformer:", t.GetConfig().Name, err)
}
}
}
func init() {
rootCmd.AddCommand(contractWatcherCmd)
contractWatcherCmd.Flags().StringVarP(&mode, "mode", "o", "light", "'light' or 'full' mode to work with either light synced or fully synced vDB (default is light)")
}

View File

@ -46,7 +46,7 @@ var executeCmd = &cobra.Command{
port = 5432
[client]
ipcPath = "http://kovan0.vulcanize.io:8545"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter]
name = "exampleTransformerExporter"
@ -128,7 +128,7 @@ func execute() {
gw := watcher.NewContractWatcher(&db, blockChain)
gw.AddTransformers(ethContractInitializers)
wg.Add(1)
go contractWatching(&gw, &wg)
go watchEthContract(&gw, &wg)
}
wg.Wait()
}
@ -155,10 +155,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
err := w.Execute(recheck)
if err != nil {
// TODO Handle watcher errors in execute
}
w.Execute(recheck)
}
}
@ -169,23 +166,17 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
err := w.Execute()
if err != nil {
// TODO Handle watcher errors in execute
}
w.Execute()
}
}
func contractWatching(w *watcher.ContractWatcher, wg *syn.WaitGroup) {
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {
defer wg.Done()
// Execute over the ContractTransformerInitializer set using the contract watcher
log.Info("executing contract_watcher transformers")
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
err := w.Execute(nil)
if err != nil {
// TODO Handle watcher errors in execute
}
w.Execute()
}
}

View File

@ -4,7 +4,7 @@
port = 5432
[client]
ipcPath = "path_to_ethjson_rpc"
ipcPath = ""
[contract]
network = ""

View File

@ -4,4 +4,4 @@
port = 5432
[client]
ipcPath = "https://mainnet.infura.io/J5Vd2fRtGsw0zZ0Ov3BL"
ipcPath = ""

View File

@ -50,7 +50,6 @@ var _ = Describe("contractWatcher full transformer", func() {
Expect(ok).To(Equal(true))
Expect(c.StartingBlock).To(Equal(int64(6194633)))
Expect(c.LastBlock).To(Equal(int64(6194634)))
Expect(c.Abi).To(Equal(constants.TusdAbiString))
Expect(c.Name).To(Equal("TrueUSD"))
Expect(c.Address).To(Equal(tusdAddr))

View File

@ -48,7 +48,6 @@ var _ = Describe("contractWatcher light transformer", func() {
Expect(ok).To(Equal(true))
Expect(c.StartingBlock).To(Equal(int64(6194632)))
Expect(c.LastBlock).To(Equal(int64(-1)))
Expect(c.Abi).To(Equal(constants.TusdAbiString))
Expect(c.Name).To(Equal("TrueUSD"))
Expect(c.Address).To(Equal(tusdAddr))
@ -203,6 +202,7 @@ var _ = Describe("contractWatcher light transformer", func() {
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
Expect(t.Start).To(Equal(int64(6885698)))
log := test_helpers.LightNewOwnerLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&log)
@ -318,7 +318,7 @@ var _ = Describe("contractWatcher light transformer", func() {
Describe("Execute- against both ENS and TrueUSD", func() {
BeforeEach(func() {
for i := 6885692; i < 6885702; i++ {
for i := 6885692; i <= 6885701; i++ {
header, err := blockChain.GetHeaderByNumber(int64(i))
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(header)
@ -332,6 +332,7 @@ var _ = Describe("contractWatcher light transformer", func() {
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
Expect(t.Start).To(Equal(int64(6885702)))
newOwnerLog := test_helpers.LightNewOwnerLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog)

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package poller_test
package integration_test
import (
"fmt"
@ -34,7 +34,7 @@ import (
var _ = Describe("Poller", func() {
var p poller.Poller
var contractPoller poller.Poller
var con *contract.Contract
var db *postgres.DB
var bc core.BlockChain
@ -46,7 +46,7 @@ var _ = Describe("Poller", func() {
Describe("Full sync mode", func() {
BeforeEach(func() {
db, bc = test_helpers.SetupDBandBC()
p = poller.NewPoller(bc, db, types.FullSync)
contractPoller = poller.NewPoller(bc, db, types.FullSync)
})
Describe("PollContract", func() {
@ -54,10 +54,9 @@ var _ = Describe("Poller", func() {
con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"})
Expect(con.Abi).To(Equal(constants.TusdAbiString))
con.StartingBlock = 6707322
con.LastBlock = 6707323
con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE"))
err := p.PollContract(*con)
err := contractPoller.PollContract(*con, 6707323)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.BalanceOf{}
@ -89,7 +88,7 @@ var _ = Describe("Poller", func() {
Expect(len(con.Methods)).To(Equal(1))
con.AddEmittedHash(common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"), common.HexToHash("0x7e74a86b6e146964fb965db04dc2590516da77f720bb6759337bf5632415fd86"))
err := p.PollContractAt(*con, 6885877)
err := contractPoller.PollContractAt(*con, 6885877)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.Owner{}
@ -109,10 +108,9 @@ var _ = Describe("Poller", func() {
con = test_helpers.SetupTusdContract(nil, nil)
Expect(con.Abi).To(Equal(constants.TusdAbiString))
con.StartingBlock = 6707322
con.LastBlock = 6707323
con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE"))
err := p.PollContract(*con)
err := contractPoller.PollContract(*con, 6707323)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.BalanceOf{}
@ -125,7 +123,7 @@ var _ = Describe("Poller", func() {
Describe("FetchContractData", func() {
It("Calls a single contract method", func() {
var name = new(string)
err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514)
err := contractPoller.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514)
Expect(err).ToNot(HaveOccurred())
Expect(*name).To(Equal("TrueUSD"))
})
@ -135,7 +133,7 @@ var _ = Describe("Poller", func() {
Describe("Light sync mode", func() {
BeforeEach(func() {
db, bc = test_helpers.SetupDBandBC()
p = poller.NewPoller(bc, db, types.LightSync)
contractPoller = poller.NewPoller(bc, db, types.LightSync)
})
Describe("PollContract", func() {
@ -143,10 +141,9 @@ var _ = Describe("Poller", func() {
con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"})
Expect(con.Abi).To(Equal(constants.TusdAbiString))
con.StartingBlock = 6707322
con.LastBlock = 6707323
con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE"))
err := p.PollContract(*con)
err := contractPoller.PollContract(*con, 6707323)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.BalanceOf{}
@ -178,7 +175,7 @@ var _ = Describe("Poller", func() {
Expect(len(con.Methods)).To(Equal(1))
con.AddEmittedHash(common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"), common.HexToHash("0x7e74a86b6e146964fb965db04dc2590516da77f720bb6759337bf5632415fd86"))
err := p.PollContractAt(*con, 6885877)
err := contractPoller.PollContractAt(*con, 6885877)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.Owner{}
@ -198,10 +195,9 @@ var _ = Describe("Poller", func() {
con = test_helpers.SetupTusdContract(nil, nil)
Expect(con.Abi).To(Equal(constants.TusdAbiString))
con.StartingBlock = 6707322
con.LastBlock = 6707323
con.AddEmittedAddr(common.HexToAddress("0xfE9e8709d3215310075d67E3ed32A380CCf451C8"), common.HexToAddress("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE"))
err := p.PollContract(*con)
err := contractPoller.PollContract(*con, 6707323)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.BalanceOf{}
@ -214,11 +210,10 @@ var _ = Describe("Poller", func() {
con = test_helpers.SetupENSContract(nil, []string{"resolver"})
Expect(con.Abi).To(Equal(constants.ENSAbiString))
con.StartingBlock = 6921967
con.LastBlock = 6921968
con.EmittedAddrs = map[interface{}]bool{}
con.Piping = false
con.AddEmittedHash(common.HexToHash("0x495b6e6efdedb750aa519919b5cf282bdaa86067b82a2293a3ff5723527141e8"))
err := p.PollContract(*con)
err := contractPoller.PollContract(*con, 6921968)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.Resolver{}
@ -230,10 +225,10 @@ var _ = Describe("Poller", func() {
test_helpers.TearDown(db)
db, bc = test_helpers.SetupDBandBC()
p = poller.NewPoller(bc, db, types.LightSync)
contractPoller = poller.NewPoller(bc, db, types.LightSync)
con.Piping = true
err = p.PollContract(*con)
err = contractPoller.PollContract(*con, 6921968)
Expect(err).ToNot(HaveOccurred())
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.resolver_method WHERE node_ = '0x495b6e6efdedb750aa519919b5cf282bdaa86067b82a2293a3ff5723527141e8' AND block = '6921967'", constants.EnsContractAddress)).StructScan(&scanStruct)
@ -248,7 +243,7 @@ var _ = Describe("Poller", func() {
Describe("FetchContractData", func() {
It("Calls a single contract method", func() {
var name = new(string)
err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514)
err := contractPoller.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514)
Expect(err).ToNot(HaveOccurred())
Expect(*name).To(Equal("TrueUSD"))
})

View File

@ -264,7 +264,7 @@ func (repository *ExampleRepository) SetDB(db *postgres.DB) {
}
func (repository ExampleRepository) Create(headerID int64, models []interface{}) error {
tx, dBaseErr := repository.db.Begin()
tx, dBaseErr := repository.db.Beginx()
if dBaseErr != nil {
return dBaseErr
}

View File

@ -49,7 +49,6 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error {
return fmt.Errorf("initializers of type %T, not %T", inits, []transformer.ContractTransformerInitializer{})
}
watcher.Transformers = make([]transformer.ContractTransformer, 0, len(initializers))
for _, initializer := range initializers {
t := initializer(watcher.DB, watcher.BlockChain)
watcher.Transformers = append(watcher.Transformers, t)
@ -65,7 +64,7 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error {
return nil
}
func (watcher *ContractWatcher) Execute(interface{}) error {
func (watcher *ContractWatcher) Execute() error {
for _, transformer := range watcher.Transformers {
err := transformer.Execute()
if err != nil {

View File

@ -19,6 +19,7 @@ package config
import (
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"strings"
)
@ -65,20 +66,20 @@ type ContractConfig struct {
Piping map[string]bool
}
func (oc *ContractConfig) PrepConfig() {
func (contractConfig *ContractConfig) PrepConfig() {
addrs := viper.GetStringSlice("contract.addresses")
oc.Network = viper.GetString("contract.network")
oc.Addresses = make(map[string]bool, len(addrs))
oc.Abis = make(map[string]string, len(addrs))
oc.Methods = make(map[string][]string, len(addrs))
oc.Events = make(map[string][]string, len(addrs))
oc.MethodArgs = make(map[string][]string, len(addrs))
oc.EventArgs = make(map[string][]string, len(addrs))
oc.StartingBlocks = make(map[string]int64, len(addrs))
oc.Piping = make(map[string]bool, len(addrs))
contractConfig.Network = viper.GetString("contract.network")
contractConfig.Addresses = make(map[string]bool, len(addrs))
contractConfig.Abis = make(map[string]string, len(addrs))
contractConfig.Methods = make(map[string][]string, len(addrs))
contractConfig.Events = make(map[string][]string, len(addrs))
contractConfig.MethodArgs = make(map[string][]string, len(addrs))
contractConfig.EventArgs = make(map[string][]string, len(addrs))
contractConfig.StartingBlocks = make(map[string]int64, len(addrs))
contractConfig.Piping = make(map[string]bool, len(addrs))
// De-dupe addresses
for _, addr := range addrs {
oc.Addresses[strings.ToLower(addr)] = true
contractConfig.Addresses[strings.ToLower(addr)] = true
}
// Iterate over addresses to pull out config info for each contract
@ -87,26 +88,29 @@ func (oc *ContractConfig) PrepConfig() {
// Get and check abi
var abi string
_, abiOK := transformer["abi"]
abiInterface, abiOK := transformer["abi"]
if !abiOK {
log.Warnf("contract %s not configured with an ABI, will attempt to fetch it from Etherscan\r\n", addr)
} else {
abiInterface := transformer["abi"]
abi, abiOK = abiInterface.(string)
if !abiOK {
log.Fatal(addr, "transformer `abi` not of type []string")
}
}
oc.Abis[strings.ToLower(addr)] = abi
if abi != "" {
if _, abiErr := geth.ParseAbi(abi); abiErr != nil {
log.Fatal(addr, "transformer `abi` not valid JSON")
}
}
contractConfig.Abis[strings.ToLower(addr)] = abi
// Get and check events
events := make([]string, 0)
_, eventsOK := transformer["events"]
eventsInterface, eventsOK := transformer["events"]
if !eventsOK {
log.Warnf("contract %s not configured with a list of events to watch, will watch all events\r\n", addr)
events = []string{}
} else {
eventsInterface := transformer["events"]
eventsI, eventsOK := eventsInterface.([]interface{})
if !eventsOK {
log.Fatal(addr, "transformer `events` not of type []string\r\n")
@ -119,16 +123,15 @@ func (oc *ContractConfig) PrepConfig() {
events = append(events, str)
}
}
oc.Events[strings.ToLower(addr)] = events
contractConfig.Events[strings.ToLower(addr)] = events
// Get and check methods
methods := make([]string, 0)
_, methodsOK := transformer["methods"]
methodsInterface, methodsOK := transformer["methods"]
if !methodsOK {
log.Warnf("contract %s not configured with a list of methods to poll, will not poll any methods\r\n", addr)
methods = []string{}
} else {
methodsInterface := transformer["methods"]
methodsI, methodsOK := methodsInterface.([]interface{})
if !methodsOK {
log.Fatal(addr, "transformer `methods` not of type []string\r\n")
@ -141,16 +144,15 @@ func (oc *ContractConfig) PrepConfig() {
methods = append(methods, str)
}
}
oc.Methods[strings.ToLower(addr)] = methods
contractConfig.Methods[strings.ToLower(addr)] = methods
// Get and check eventArgs
eventArgs := make([]string, 0)
_, eventArgsOK := transformer["eventArgs"]
eventArgsInterface, eventArgsOK := transformer["eventArgs"]
if !eventArgsOK {
log.Warnf("contract %s not configured with a list of event arguments to filter for, will not filter events for specific emitted values\r\n", addr)
eventArgs = []string{}
} else {
eventArgsInterface := transformer["eventArgs"]
eventArgsI, eventArgsOK := eventArgsInterface.([]interface{})
if !eventArgsOK {
log.Fatal(addr, "transformer `eventArgs` not of type []string\r\n")
@ -163,16 +165,15 @@ func (oc *ContractConfig) PrepConfig() {
eventArgs = append(eventArgs, str)
}
}
oc.EventArgs[strings.ToLower(addr)] = eventArgs
contractConfig.EventArgs[strings.ToLower(addr)] = eventArgs
// Get and check methodArgs
methodArgs := make([]string, 0)
_, methodArgsOK := transformer["methodArgs"]
methodArgsInterface, methodArgsOK := transformer["methodArgs"]
if !methodArgsOK {
log.Warnf("contract %s not configured with a list of method argument values to poll with, will poll methods with all available arguments\r\n", addr)
methodArgs = []string{}
} else {
methodArgsInterface := transformer["methodArgs"]
methodArgsI, methodArgsOK := methodArgsInterface.([]interface{})
if !methodArgsOK {
log.Fatal(addr, "transformer `methodArgs` not of type []string\r\n")
@ -185,7 +186,7 @@ func (oc *ContractConfig) PrepConfig() {
methodArgs = append(methodArgs, str)
}
}
oc.MethodArgs[strings.ToLower(addr)] = methodArgs
contractConfig.MethodArgs[strings.ToLower(addr)] = methodArgs
// Get and check startingBlock
startInterface, startOK := transformer["startingblock"]
@ -196,7 +197,7 @@ func (oc *ContractConfig) PrepConfig() {
if !startOK {
log.Fatal(addr, "transformer `startingBlock` not of type int\r\n")
}
oc.StartingBlocks[strings.ToLower(addr)] = start
contractConfig.StartingBlocks[strings.ToLower(addr)] = start
// Get pipping
var piping bool
@ -211,6 +212,6 @@ func (oc *ContractConfig) PrepConfig() {
log.Fatal(addr, "transformer `piping` not of type bool\r\n")
}
}
oc.Piping[strings.ToLower(addr)] = piping
contractConfig.Piping[strings.ToLower(addr)] = piping
}
}

View File

@ -49,10 +49,10 @@ func (c *Converter) Update(info *contract.Contract) {
// Convert the given watched event log into a types.Log for the given event
func (c *Converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (*types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
values := make(map[string]interface{})
log := helpers.ConvertToLog(watchedEvent)
err := contract.UnpackLogIntoMap(values, event.Name, log)
err := boundContract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
return nil, err
}

View File

@ -36,37 +36,40 @@ import (
// Requires a fully synced vDB and a running eth node (or infura)
type Transformer struct {
// Database interfaces
datastore.FilterRepository // Log filters repo; accepts filters generated by Contract.GenerateFilters()
datastore.WatchedEventRepository // Watched event log views, created by the log filters
repository.EventRepository // Holds transformed watched event log data
FilterRepository datastore.FilterRepository // Log filters repo; accepts filters generated by Contract.GenerateFilters()
WatchedEventRepository datastore.WatchedEventRepository // Watched event log views, created by the log filters
TransformedEventRepository repository.EventRepository // Holds transformed watched event log data
// Pre-processing interfaces
parser.Parser // Parses events and methods out of contract abi fetched using contract address
retriever.BlockRetriever // Retrieves first block for contract and current block height
Parser parser.Parser // Parses events and methods out of contract abi fetched using contract address
Retriever retriever.BlockRetriever // Retrieves first block for contract and current block height
// Processing interfaces
converter.ConverterInterface // Converts watched event logs into custom log
poller.Poller // Polls methods using contract's token holder addresses and persists them using method datastore
Converter converter.ConverterInterface // Converts watched event logs into custom log
Poller poller.Poller // Polls methods using contract's token holder addresses and persists them using method datastore
// Store contract configuration information
Config config.ContractConfig
// Store contract info as mapping to contract address
Contracts map[string]*contract.Contract
// Latest block in the block repository
LastBlock int64
}
// Transformer takes in config for blockchain, database, and network id
func NewTransformer(con config.ContractConfig, BC core.BlockChain, DB *postgres.DB) *Transformer {
return &Transformer{
Poller: poller.NewPoller(BC, DB, types.FullSync),
Parser: parser.NewParser(con.Network),
BlockRetriever: retriever.NewBlockRetriever(DB),
ConverterInterface: &converter.Converter{},
Contracts: map[string]*contract.Contract{},
WatchedEventRepository: repositories.WatchedEventRepository{DB: DB},
FilterRepository: repositories.FilterRepository{DB: DB},
EventRepository: repository.NewEventRepository(DB, types.FullSync),
Config: con,
Poller: poller.NewPoller(BC, DB, types.FullSync),
Parser: parser.NewParser(con.Network),
Retriever: retriever.NewBlockRetriever(DB),
Converter: &converter.Converter{},
Contracts: map[string]*contract.Contract{},
WatchedEventRepository: repositories.WatchedEventRepository{DB: DB},
FilterRepository: repositories.FilterRepository{DB: DB},
TransformedEventRepository: repository.NewEventRepository(DB, types.FullSync),
Config: con,
}
}
@ -92,15 +95,10 @@ func (tr *Transformer) Init() error {
}
// Get first block and most recent block number in the header repo
firstBlock, err := tr.BlockRetriever.RetrieveFirstBlock(contractAddr)
firstBlock, err := tr.Retriever.RetrieveFirstBlock(contractAddr)
if err != nil {
return err
}
lastBlock, err := tr.BlockRetriever.RetrieveMostRecentBlock()
if err != nil {
return err
}
// Set to specified range if it falls within the bounds
if firstBlock < tr.Config.StartingBlocks[contractAddr] {
firstBlock = tr.Config.StartingBlocks[contractAddr]
@ -108,7 +106,7 @@ func (tr *Transformer) Init() error {
// Get contract name if it has one
var name = new(string)
tr.FetchContractData(tr.Abi(), contractAddr, "name", nil, name, lastBlock)
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock)
// Remove any potential accidental duplicate inputs in arg filter values
eventArgs := map[string]bool{}
@ -128,7 +126,6 @@ func (tr *Transformer) Init() error {
Abi: tr.Parser.Abi(),
ParsedAbi: tr.Parser.ParsedAbi(),
StartingBlock: firstBlock,
LastBlock: lastBlock,
Events: tr.Parser.GetEvents(tr.Config.Events[contractAddr]),
Methods: tr.Parser.GetSelectMethods(tr.Config.Methods[contractAddr]),
FilterArgs: eventArgs,
@ -144,7 +141,7 @@ func (tr *Transformer) Init() error {
// Iterate over filters and push them to the repo using filter repository interface
for _, filter := range info.Filters {
err = tr.CreateFilter(filter)
err = tr.FilterRepository.CreateFilter(filter)
if err != nil {
return err
}
@ -154,6 +151,13 @@ func (tr *Transformer) Init() error {
tr.Contracts[contractAddr] = info
}
// Get the most recent block number in the block repo
var err error
tr.LastBlock, err = tr.Retriever.RetrieveMostRecentBlock()
if err != nil {
return err
}
return nil
}
@ -169,11 +173,11 @@ func (tr *Transformer) Execute() error {
// Iterate through all internal contracts
for _, con := range tr.Contracts {
// Update converter with current contract
tr.Update(con)
tr.Converter.Update(con)
// Iterate through contract filters and get watched event logs
for eventSig, filter := range con.Filters {
watchedEvents, err := tr.GetWatchedEvents(filter.Name)
watchedEvents, err := tr.WatchedEventRepository.GetWatchedEvents(filter.Name)
if err != nil {
return err
}
@ -181,7 +185,7 @@ func (tr *Transformer) Execute() error {
// Iterate over watched event logs
for _, we := range watchedEvents {
// Convert them to our custom log type
cstm, err := tr.ConverterInterface.Convert(*we, con.Events[eventSig])
cstm, err := tr.Converter.Convert(*we, con.Events[eventSig])
if err != nil {
return err
}
@ -191,7 +195,7 @@ func (tr *Transformer) Execute() error {
// If log is not empty, immediately persist in repo
// Run this in seperate goroutine?
err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventSig], con.Address, con.Name)
err = tr.TransformedEventRepository.PersistLogs([]types.Log{*cstm}, con.Events[eventSig], con.Address, con.Name)
if err != nil {
return err
}
@ -201,12 +205,19 @@ func (tr *Transformer) Execute() error {
// After persisting all watched event logs
// poller polls select contract methods
// and persists the results into custom pg tables
// Run this in seperate goroutine?
if err := tr.PollContract(*con); err != nil {
if err := tr.Poller.PollContract(*con, tr.LastBlock); err != nil {
return err
}
}
// At the end of a transformation cycle, and before the next
// update the latest block from the block repo
var err error
tr.LastBlock, err = tr.Retriever.RetrieveMostRecentBlock()
if err != nil {
return err
}
return nil
}

View File

@ -17,7 +17,6 @@
package transformer_test
import (
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/helpers/test_helpers/mocks"
"math/rand"
"time"
@ -27,6 +26,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/retriever"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/transformer"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/helpers/test_helpers/mocks"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/poller"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types"
@ -67,7 +67,7 @@ var _ = Describe("Transformer", func() {
Expect(ok).To(Equal(true))
Expect(c.StartingBlock).To(Equal(firstBlock))
Expect(c.LastBlock).To(Equal(mostRecentBlock))
Expect(t.LastBlock).To(Equal(mostRecentBlock))
Expect(c.Abi).To(Equal(fakeAbi))
Expect(c.Name).To(Equal(fakeContractName))
Expect(c.Address).To(Equal(fakeAddress))
@ -90,7 +90,7 @@ func getTransformer(blockRetriever retriever.BlockRetriever, parsr parser.Parser
return transformer.Transformer{
FilterRepository: &fakes.MockFilterRepository{},
Parser: parsr,
BlockRetriever: blockRetriever,
Retriever: blockRetriever,
Poller: pollr,
Contracts: map[string]*contract.Contract{},
Config: mocks.MockConfig,

View File

@ -48,7 +48,7 @@ func (c *Converter) Update(info *contract.Contract) {
// Convert the given watched event log into a types.Log for the given event
func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
returnLogs := make([]types.Log, 0, len(logs))
for _, log := range logs {
values := make(map[string]interface{})
@ -57,7 +57,7 @@ func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID in
values[field.Name] = i
}
err := contract.UnpackLogIntoMap(values, event.Name, log)
err := boundContract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
return nil, err
}

View File

@ -17,8 +17,8 @@
package repository
import (
"database/sql"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/hashicorp/golang-lru"
@ -130,7 +130,7 @@ func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string)
// Marks all of the provided headers checked for each of the provided column ids
func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error {
tx, err := r.db.Begin()
tx, err := r.db.Beginx()
if err != nil {
return err
}
@ -280,7 +280,7 @@ func (r *headerRepository) CheckCache(key string) (interface{}, bool) {
}
// Used to mark a header checked as part of some external transaction so as to group into one commit
func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error {
func MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, eventID string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO

View File

@ -32,7 +32,6 @@ import (
var _ = Describe("Repository", func() {
var db *postgres.DB
var bc core.BlockChain
var contractHeaderRepo repository.HeaderRepository // contract_watcher light header repository
var coreHeaderRepo repositories.HeaderRepository // pkg/datastore header repository
var eventIDs = []string{
@ -47,7 +46,7 @@ var _ = Describe("Repository", func() {
}
BeforeEach(func() {
db, bc = test_helpers.SetupDBandBC()
db, _ = test_helpers.SetupDBandBC()
contractHeaderRepo = repository.NewHeaderRepository(db)
coreHeaderRepo = repositories.NewHeaderRepository(db)
})
@ -205,7 +204,7 @@ var _ = Describe("Repository", func() {
})
It("Returns at most 100 headers", func() {
add102Headers(coreHeaderRepo, bc)
add102Headers(coreHeaderRepo)
err := contractHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
@ -359,11 +358,11 @@ func addDiscontinuousHeaders(coreHeaderRepo repositories.HeaderRepository) {
coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader4)
}
func add102Headers(coreHeaderRepo repositories.HeaderRepository, blockChain core.BlockChain) {
func add102Headers(coreHeaderRepo repositories.HeaderRepository) {
baseHeader := mocks.MockHeader1
for i := 6194632; i < 6194733; i++ {
header, err := blockChain.GetHeaderByNumber(int64(i))
Expect(err).ToNot(HaveOccurred())
_, err = coreHeaderRepo.CreateOrUpdateHeader(header)
_, err := coreHeaderRepo.CreateOrUpdateHeader(baseHeader)
Expect(err).ToNot(HaveOccurred())
baseHeader.BlockNumber++
}
}

View File

@ -18,12 +18,12 @@ package transformer
import (
"errors"
"github.com/vulcanize/vulcanizedb/pkg/config"
"strings"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/converter"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/fetcher"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/repository"
@ -40,17 +40,17 @@ import (
// Requires a light synced vDB (headers) and a running eth node (or infura)
type Transformer struct {
// Database interfaces
srep.EventRepository // Holds transformed watched event log data
repository.HeaderRepository // Interface for interaction with header repositories
EventRepository srep.EventRepository // Holds transformed watched event log data
HeaderRepository repository.HeaderRepository // Interface for interaction with header repositories
// Pre-processing interfaces
parser.Parser // Parses events and methods out of contract abi fetched using contract address
retriever.BlockRetriever // Retrieves first block for contract and current block height
Parser parser.Parser // Parses events and methods out of contract abi fetched using contract address
Retriever retriever.BlockRetriever // Retrieves first block for contract
// Processing interfaces
fetcher.Fetcher // Fetches event logs, using header hashes
converter.ConverterInterface // Converts watched event logs into custom log
poller.Poller // Polls methods using arguments collected from events and persists them using a method datastore
Fetcher fetcher.Fetcher // Fetches event logs, using header hashes
Converter converter.ConverterInterface // Converts watched event logs into custom log
Poller poller.Poller // Polls methods using arguments collected from events and persists them using a method datastore
// Store contract configuration information
Config config.ContractConfig
@ -64,7 +64,7 @@ type Transformer struct {
sortedMethodIds map[string][]string // Map to sort method column ids by contract, for post fetch method polling
eventIds []string // Holds event column ids across all contract, for batch fetching of headers
eventFilters []common.Hash // Holds topic0 hashes across all contracts, for batch fetching of logs
start int64 // Hold the lowest starting block and the highest ending block
Start int64 // Hold the lowest starting block and the highest ending block
}
// Order-of-operations:
@ -77,15 +77,15 @@ type Transformer struct {
func NewTransformer(con config.ContractConfig, bc core.BlockChain, db *postgres.DB) *Transformer {
return &Transformer{
Poller: poller.NewPoller(bc, db, types.LightSync),
Fetcher: fetcher.NewFetcher(bc),
Parser: parser.NewParser(con.Network),
HeaderRepository: repository.NewHeaderRepository(db),
BlockRetriever: retriever.NewBlockRetriever(db),
ConverterInterface: &converter.Converter{},
Contracts: map[string]*contract.Contract{},
EventRepository: srep.NewEventRepository(db, types.LightSync),
Config: con,
Poller: poller.NewPoller(bc, db, types.LightSync),
Fetcher: fetcher.NewFetcher(bc),
Parser: parser.NewParser(con.Network),
HeaderRepository: repository.NewHeaderRepository(db),
Retriever: retriever.NewBlockRetriever(db),
Converter: &converter.Converter{},
Contracts: map[string]*contract.Contract{},
EventRepository: srep.NewEventRepository(db, types.LightSync),
Config: con,
}
}
@ -100,7 +100,7 @@ func (tr *Transformer) Init() error {
tr.sortedMethodIds = make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling
tr.eventIds = make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers
tr.eventFilters = make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs
tr.start = 100000000000 // Hold the lowest starting block and the highest ending block
tr.Start = 100000000000
// Iterate through all internal contract addresses
for contractAddr := range tr.Config.Addresses {
@ -120,7 +120,7 @@ func (tr *Transformer) Init() error {
}
// Get first block and most recent block number in the header repo
firstBlock, err := tr.BlockRetriever.RetrieveFirstBlock()
firstBlock, err := tr.Retriever.RetrieveFirstBlock()
if err != nil {
return err
}
@ -132,7 +132,7 @@ func (tr *Transformer) Init() error {
// Get contract name if it has one
var name = new(string)
tr.Poller.FetchContractData(tr.Abi(), contractAddr, "name", nil, name, -1)
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1)
// Remove any potential accidental duplicate inputs
eventArgs := map[string]bool{}
@ -152,7 +152,6 @@ func (tr *Transformer) Init() error {
Abi: tr.Parser.Abi(),
ParsedAbi: tr.Parser.ParsedAbi(),
StartingBlock: firstBlock,
LastBlock: -1,
Events: tr.Parser.GetEvents(tr.Config.Events[contractAddr]),
Methods: tr.Parser.GetSelectMethods(tr.Config.Methods[contractAddr]),
FilterArgs: eventArgs,
@ -189,8 +188,8 @@ func (tr *Transformer) Init() error {
}
// Update start to the lowest block
if con.StartingBlock < tr.start {
tr.start = con.StartingBlock
if con.StartingBlock < tr.Start {
tr.Start = con.StartingBlock
}
}
@ -202,20 +201,20 @@ func (tr *Transformer) Execute() error {
return errors.New("error: transformer has no initialized contracts")
}
// Map to sort batch fetched logs by which contract they belong to, for post fetch processing
sortedLogs := make(map[string][]gethTypes.Log)
for _, con := range tr.Contracts {
sortedLogs[con.Address] = []gethTypes.Log{}
}
// Find unchecked headers for all events across all contracts; these are returned in asc order
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.start, -1, tr.eventIds)
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
if err != nil {
return err
}
// Iterate over headers
for _, header := range missingHeaders {
// Set `start` to this header
// This way if we throw an error but don't bring the execution cycle down (how it is currently handled)
// we restart the cycle at this header
tr.Start = header.BlockNumber
// Map to sort batch fetched logs by which contract they belong to, for post fetch processing
sortedLogs := make(map[string][]gethTypes.Log)
// And fetch all event logs across contracts at this header
allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
if err != nil {
@ -233,6 +232,7 @@ func (tr *Transformer) Execute() error {
if err != nil {
return err
}
tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header
continue
}
@ -249,10 +249,10 @@ func (tr *Transformer) Execute() error {
}
// Configure converter with this contract
con := tr.Contracts[conAddr]
tr.ConverterInterface.Update(con)
tr.Converter.Update(con)
// Convert logs into batches of log mappings (eventName => []types.Logs
convertedLogs, err := tr.ConverterInterface.ConvertBatch(logs, con.Events, header.Id)
convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
if err != nil {
return err
}
@ -281,8 +281,8 @@ func (tr *Transformer) Execute() error {
if err != nil {
return err
}
tr.start = header.BlockNumber + 1
// Success; setup to start at the next header
tr.Start = header.BlockNumber + 1
}
return nil

View File

@ -19,6 +19,7 @@ package transformer_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/retriever"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/transformer"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/contract"
@ -54,7 +55,47 @@ var _ = Describe("Transformer", func() {
Expect(ok).To(Equal(true))
Expect(c.StartingBlock).To(Equal(firstBlock))
Expect(c.LastBlock).To(Equal(int64(-1)))
Expect(c.Abi).To(Equal(fakeAbi))
Expect(c.Name).To(Equal(fakeContractName))
Expect(c.Address).To(Equal(fakeAddress))
})
It("Fails to initialize if first block cannot be fetched from vDB headers table", func() {
blockRetriever := &fakes.MockLightBlockRetriever{}
blockRetriever.FirstBlockErr = fakes.FakeError
t := getFakeTransformer(blockRetriever, &fakes.MockParser{}, &fakes.MockPoller{})
err := t.Init()
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
})
Describe("Execute", func() {
It("Executes contract transformations", func() {
blockRetriever := &fakes.MockLightBlockRetriever{}
firstBlock := int64(1)
blockRetriever.FirstBlock = firstBlock
parsr := &fakes.MockParser{}
fakeAbi := "fake_abi"
parsr.AbiToReturn = fakeAbi
pollr := &fakes.MockPoller{}
fakeContractName := "fake_contract_name"
pollr.ContractName = fakeContractName
t := getFakeTransformer(blockRetriever, parsr, pollr)
err := t.Init()
Expect(err).ToNot(HaveOccurred())
c, ok := t.Contracts[fakeAddress]
Expect(ok).To(Equal(true))
Expect(c.StartingBlock).To(Equal(firstBlock))
Expect(c.Abi).To(Equal(fakeAbi))
Expect(c.Name).To(Equal(fakeContractName))
Expect(c.Address).To(Equal(fakeAddress))
@ -76,7 +117,7 @@ var _ = Describe("Transformer", func() {
func getFakeTransformer(blockRetriever retriever.BlockRetriever, parsr parser.Parser, pollr poller.Poller) transformer.Transformer {
return transformer.Transformer{
Parser: parsr,
BlockRetriever: blockRetriever,
Retriever: blockRetriever,
Poller: pollr,
HeaderRepository: &fakes.MockLightHeaderRepository{},
Contracts: map[string]*contract.Contract{},

View File

@ -34,7 +34,6 @@ type Contract struct {
Address string // Address of the contract
Network string // Network on which the contract is deployed; default empty "" is Ethereum mainnet
StartingBlock int64 // Starting block of the contract
LastBlock int64 // Most recent block on the network
Abi string // Abi string
ParsedAbi abi.ABI // Parsed abi
Events map[string]types.Event // List of events to watch

View File

@ -168,7 +168,6 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract
Abi: p.Abi(),
ParsedAbi: p.ParsedAbi(),
StartingBlock: 6194634,
LastBlock: 6507323,
Events: p.GetEvents(wantedEvents),
Methods: p.GetSelectMethods(wantedMethods),
MethodArgs: map[string]bool{},
@ -215,7 +214,6 @@ func SetupENSContract(wantedEvents, wantedMethods []string) *contract.Contract {
Abi: p.Abi(),
ParsedAbi: p.ParsedAbi(),
StartingBlock: 6194634,
LastBlock: 6507323,
Events: p.GetEvents(wantedEvents),
Methods: p.GetSelectMethods(wantedMethods),
MethodArgs: map[string]bool{},
@ -224,7 +222,7 @@ func SetupENSContract(wantedEvents, wantedMethods []string) *contract.Contract {
}
func TearDown(db *postgres.DB) {
tx, err := db.Begin()
tx, err := db.Beginx()
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM blocks`)

View File

@ -98,7 +98,7 @@ func (p *parser) lookUp(contractAddr string) (string, error) {
return v, nil
}
return "", errors.New("ABI not present in lookup tabe")
return "", errors.New("ABI not present in lookup table")
}
// Returns only specified methods, if they meet the criteria

View File

@ -34,7 +34,7 @@ import (
)
type Poller interface {
PollContract(con contract.Contract) error
PollContract(con contract.Contract, lastBlock int64) error
PollContractAt(con contract.Contract, blockNumber int64) error
FetchContractData(contractAbi, contractAddress, method string, methodArgs []interface{}, result interface{}, blockNumber int64) error
}
@ -52,8 +52,8 @@ func NewPoller(blockChain core.BlockChain, db *postgres.DB, mode types.Mode) *po
}
}
func (p *poller) PollContract(con contract.Contract) error {
for i := con.StartingBlock; i <= con.LastBlock; i++ {
func (p *poller) PollContract(con contract.Contract, lastBlock int64) error {
for i := con.StartingBlock; i <= lastBlock; i++ {
if err := p.PollContractAt(con, i); err != nil {
return err
}

View File

@ -1,35 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package poller_test
import (
"github.com/sirupsen/logrus"
"io/ioutil"
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestPoller(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Poller Suite Test")
}
var _ = BeforeSuite(func() {
logrus.SetOutput(ioutil.Discard)
})

View File

@ -97,7 +97,7 @@ func (r *eventRepository) persistLogs(logs []types.Log, eventInfo types.Event, c
// Creates a custom postgres command to persist logs for the given event (compatible with light synced vDB)
func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
tx, err := r.db.Begin()
tx, err := r.db.Beginx()
if err != nil {
return err
}
@ -151,7 +151,7 @@ func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types
// Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB)
func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
tx, err := r.db.Begin()
tx, err := r.db.Beginx()
if err != nil {
return err
}

View File

@ -77,7 +77,7 @@ func (r *methodRepository) PersistResults(results []types.Result, methodInfo typ
// Creates a custom postgres command to persist logs for the given event
func (r *methodRepository) persistResults(results []types.Result, methodInfo types.Method, contractAddr, contractName string) error {
tx, err := r.DB.Begin()
tx, err := r.DB.Beginx()
if err != nil {
return err
}

View File

@ -8,7 +8,7 @@ type MockPoller struct {
ContractName string
}
func (*MockPoller) PollContract(con contract.Contract) error {
func (*MockPoller) PollContract(con contract.Contract, lastBlock int64) error {
panic("implement me")
}

View File

@ -54,7 +54,7 @@ func SetupDBandBC() (*postgres.DB, core.BlockChain) {
}
func TearDown(db *postgres.DB) {
tx, err := db.Begin()
tx, err := db.Beginx()
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM headers`)

View File

@ -17,6 +17,7 @@
package test_config
import (
"errors"
"fmt"
"os"
@ -48,10 +49,10 @@ func setTestConfig() {
TestConfig.SetConfigName("private")
TestConfig.AddConfigPath("$GOPATH/src/github.com/vulcanize/vulcanizedb/environments/")
err := TestConfig.ReadInConfig()
ipc := TestConfig.GetString("client.ipcPath")
if err != nil {
log.Fatal(err)
}
ipc := TestConfig.GetString("client.ipcPath")
hn := TestConfig.GetString("database.hostname")
port := TestConfig.GetInt("database.port")
name := TestConfig.GetString("database.name")
@ -71,10 +72,20 @@ func setInfuraConfig() {
Infura.SetConfigName("infura")
Infura.AddConfigPath("$GOPATH/src/github.com/vulcanize/vulcanizedb/environments/")
err := Infura.ReadInConfig()
ipc := Infura.GetString("client.ipcpath")
if err != nil {
log.Fatal(err)
}
ipc := Infura.GetString("client.ipcpath")
// If we don't have an ipc path in the config file, check the env variable
if ipc == "" {
Infura.BindEnv("url", "INFURA_URL")
ipc = Infura.GetString("url")
}
if ipc == "" {
log.Fatal(errors.New("infura.toml IPC path or $INFURA_URL env variable need to be set"))
}
InfuraClient = config.Client{
IPCPath: ipc,
}