method piping

This commit is contained in:
Ian Norden 2018-12-19 12:42:59 -06:00
parent 456c735087
commit 2cbe6e7a70
17 changed files with 352 additions and 294 deletions

View File

@ -139,21 +139,35 @@ false
If you have full rinkeby chaindata you can move it to `rinkeby_vulcanizedb_geth_data` docker volume to skip long wait of sync.
## omniWatcher and lightOmniWatcher
These commands require a pre-synced (full or light, respectively) vulcanizeDB (see above sections)
To watch all events of a contract:
These commands require a pre-synced (full or light) vulcanizeDB (see above sections)
To watch all events of a contract using a light synced vDB:
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address>`
- Or `./vulcanizedb lightOmniWatcher --config <path to config.toml> --contract-address <contract address>`
Or if you are using a full synced vDB, change the mode to full:
- Execute `./vulcanizedb omniWatcher --mode full --config <path to config.toml> --contract-address <contract address>`
To watch contracts on a network other than mainnet, use the network flag:
- Execute `./vulcanizedb lightOmniWatcher --config <path to config.toml> --contract-address <contract address> --network <ropsten, kovan, or rinkeby>`
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address> --network <ropsten, kovan, or rinkeby>`
To watch events within a certain block range use the starting block and ending block flags:
- Execute `./vulcanizedb lightOmniWatcher --config <path to config.toml> --contract-address <contract address> --starting-block-number <#> --ending-block-number <#>`
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address> --starting-block-number <#> --ending-block-number <#>`
To watch only specified events use the events flag:
- Execute `./vulcanizedb lightOmniWatcher --config <path to config.toml> --contract-address <contract address> --events <EventName1> --events <EventName2>`
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address> --events <EventName1> --events <EventName2>`
To watch events and poll the specified methods with any addresses and hashes emitted by the watched events utilize the methods flag:
- Execute `./vulcanizedb lightOmniWatcher --config <path to config.toml> --contract-address <contract address> --methods <methodName1> --methods <methodName2>`
To watch specified events and poll the specified method with any addresses and hashes emiited by the watched events:
- Execute `./vulcanizedb lightOmniWatcher --config <path to config.toml> --contract-address <contract address> --events <EventName1> --events <EventName2> --methods <methodName>`
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address> --methods <methodName1> --methods <methodName2>`
To watch specified events and poll the specified method with any addresses and hashes emitted by the watched events:
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address> --events <EventName1> --events <EventName2> --methods <methodName>`
To turn on method piping so that values returned from previous method calls are cached and used as arguments in subsequent method calls:
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --piping true --contract-address <contract address> --events <EventName1> --events <EventName2> --methods <methodName>`
To watch all types of events of the contract but only persist the ones that emit one of the filtered-for argument values:
- Execute `./vulcanizedb lightOmniWatcher --config <path to config.toml> --contract-address <contract address> --event-args <arg1> --event-args <arg2>`
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address> --event-args <arg1> --event-args <arg2>`
To watch all events of the contract but only poll the specified method with specified argument values (if they are emitted from the watched events):
- Execute `./vulcanizedb lightOmniWatcher --config <path to config.toml> --contract-address <contract address> --methods <methodName> --method-args <arg1> --method-args <arg2>`
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address> --methods <methodName> --method-args <arg1> --method-args <arg2>`

View File

@ -1,61 +0,0 @@
Run lightSync, starting at the blockheight the ENS registry was published at.
`./vulcanize lightSync --config=./environments/<config.toml> --starting-block-number=3327417`
1. What is the label hash of this domain?
Q. Does this mean for a given namehash of "a.b.c" where we don't know what "c" is find keccak256(c)? Do we know the parent domain and/or owner address?
1. Watch NewOwner(bytes32 indexed node, bytes32 indexed label, address owner) events of the ENS Registry contract and filter for the parent node and/or owner address to narrow search
`./vulcanize lightOmniWacther --config=./environments/<config.toml> --starting-block-number=3327417 --contract-address=0x314159265dD8dbb310642f98f50C066173C1259b --contract-events=NewOwner`
2. For each node + label pair we find emitted, calculate the keccak256(abi.encodePacked(node, label)) and see if it matches our domain's namehash
3. If it does, keccak256(label) is our label hash
2. What is the parent domain of this domain?
1. Watch NewOwner(bytes32 indexed node, bytes32 indexed label, address owner) events of the ENS Registry contract
`./vulcanize lightOmniWacther --config=./environments/<config.toml> --starting-block-number=3327417 --contract-address=0x314159265dD8dbb310642f98f50C066173C1259b --contract-events=NewOwner`
2. If we know our label, filter for it and collect the node (parent domain namehash) that was emitted with it
3. If we don't know our label I believe we need to take a similar aqpproach as in section 1 where we try every keccak256(abi.encodePacked(node, label)) until we find the namehash for our domain
4. Call the Registry's resolver(bytes32 node) method for the parent node to find the parent domain's Resolver
5. Call its Resolver's name(bytes32 node) method for the parent node to find the parent domain's name
3. What are the subdomains of this domain?
1. Watch NewOwner(bytes32 indexed node, bytes32 indexed label, address owner) events of the ENS Registry contract
`./vulcanize lightOmniWacther --config=./environments/<config.toml> --starting-block-number=3327417 --contract-address=0x314159265dD8dbb310642f98f50C066173C1259b --contract-events=NewOwner`
2. Filter for our node (domain) and collect all the labels emitted with it
3. Calculate subdomain hashes: subnode = keccak256(abi.encodePacked(node, label))
4. Call the Registry's resolver(bytes32 node) method for a subnode to find the subdomain's Resolver
5. Call its Resolver's name(bytes32 node) method for a subnode to find the subdomain's name
4. What domains does this address own?
1. Watch NewOwner(bytes32 indexed node, bytes32 indexed label, address owner) and Transfer(bytes32 indexed node, address owner) events of the ENS Registry contract and filter for the address
`./vulcanize lightOmniWacther --config=./environments/<config.toml> --starting-block-number=3327417 --contract-address=0x314159265dD8dbb310642f98f50C066173C1259b --contract-eevents=NewOwner --contract-events=Transfer --event-filter-addresses=<address>`
2. Collect node and label values; calculate subnodes = keccak256(abi.encodePacked(node, label))
3. Aggregate nodes and subnodes into a list and check which of these they still own at a given blockheight by iterating over the list and calling the Registry's owner(bytes32 node) method
4. Call the Registry's resolver(bytes32 node) method for a node to find the domain's Resolver
5. Call its Resolver's name(bytes32 node) method for the node to find the domain's name
5. What names point to this address?
Q. Is this in terms of which ENS nodes point to a given Resolver address? E.g. All nodes where the ENS records[node].resolver == address? Or is this in terms of Resolver records? E.g. All the records[node].names where the Resolver records[node].addr == address
1. In the former case, watch NewResolver(bytes32 indexed node, address resolver) events of the Registry and filter for the account address
`./vulcanize lightOmniWacther --config=./environments/<config.toml> --starting-block-number=3327417 --contract-address=0x314159265dD8dbb310642f98f50C066173C1259b --contract-events=NewResolver --event-filter-addresses=<address>`
2. Generate a list of nodes that have pointed to this resolver address
3. Check which of these still point at the address by iterating over the list and calling the Registry's resolver(bytes32 node) method
1. In the latter case, watch AddrChanged(bytes32 indexed node, address a) events of the Resolver and filter for the account address
`./vulcanize lightOmniWacther --config=./environments/<config.toml> --starting-block-number=3648359 --contract-address=0x1da022710dF5002339274AaDEe8D58218e9D6AB5 --contract-events=AddrChanged --event-filter-addresses=<address>`
2. Generate our list of nodes that have pointed towards our address
3. Check which of these they still own at a given blockheight by iterating over the list and calling the Resolver's addr(bytes32 node) method
4. We can then fetch the string names of these nodes using the Resolver's name(bytes32 node) method.
Currently the only filtering that can be done during event watching is for addresses and the only methods
that can be polled in an automated fashion are ones that take only address-type arguments (of which there
are less than three) and return a single value. For the sake of answering these questions it would be really helpful if
we could also perform []byte filtering on the events and automate polling of events that take []byte-type arguments. I am
currently working on adding this in, and once it is in you would be able to automate more of the steps in these processes.
E.g. you will be able to run
`./vulcanize lightOmniWacther --config=./environments/<config.toml> --starting-block-number=3327417 --contract-address=0x314159265dD8dbb310642f98f50C066173C1259b --contract-events=NewOwner --contract-events=Transfer --event-args=<address> --contract-methods=owner`
To automate the process in question 4 through step 3 (it will collect node []byte values emitted from the events it watches and then use those to call the owner method, persisting the results)
Or
`./vulcanize lightOmniWacther --config=./environments/<config.toml> --starting-block-number=3327417 --contract-address=0x314159265dD8dbb310642f98f50C066173C1259b --contract-events=NewOwner --event-args=<bytes-to-filter-for>`
To provide automated filtering for node []byte values in question 3.

View File

@ -19,12 +19,12 @@ package converter
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
"math/big"
"strconv"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
@ -57,12 +57,6 @@ func (c *converter) Update(info *contract.Contract) {
func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (*types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
values := make(map[string]interface{})
for _, field := range event.Fields {
var i interface{}
values[field.Name] = i
}
log := helpers.ConvertToLog(watchedEvent)
err := contract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
@ -93,7 +87,7 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (
case []byte:
b := input.([]byte)
strValues[fieldName] = hexutil.Encode(b)
if len(b) == 32 {
if len(b) == 32 { // collect byte arrays of size 32 as hashes
seenHashes = append(seenHashes, common.HexToHash(strValues[fieldName]))
}
case byte:

View File

@ -34,7 +34,7 @@ var _ = Describe("Converter", func() {
var err error
BeforeEach(func() {
con = test_helpers.SetupTusdContract(wantedEvents, []string{})
con = test_helpers.SetupTusdContract(wantedEvents, []string{"balanceOf"})
})
Describe("Update", func() {

View File

@ -145,12 +145,12 @@ func (t *transformer) Init() error {
Name: *name,
Network: t.Network,
Address: contractAddr,
Abi: t.Abi(),
ParsedAbi: t.ParsedAbi(),
Abi: t.Parser.Abi(),
ParsedAbi: t.Parser.ParsedAbi(),
StartingBlock: firstBlock,
LastBlock: lastBlock,
Events: t.GetEvents(subset),
Methods: t.GetSelectMethods(t.WantedMethods[contractAddr]),
Events: t.Parser.GetEvents(subset),
Methods: t.Parser.GetSelectMethods(t.WantedMethods[contractAddr]),
FilterArgs: eventArgs,
MethodArgs: methodArgs,
CreateAddrList: t.CreateAddrList[contractAddr],

View File

@ -131,7 +131,7 @@ func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID in
return returnLogs, nil
}
// Convert the given watched event logs into types.Logs
// Convert the given watched event logs into types.Logs; returns a map of event names to a slice of their converted logs
func (c *converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
eventsToLogs := make(map[string][]types.Log)
@ -142,21 +142,16 @@ func (c *converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
// If the log is of this event type, process it as such
if event.Sig() == log.Topics[0] {
values := make(map[string]interface{})
for _, field := range event.Fields {
var i interface{}
values[field.Name] = i
}
err := contract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
return nil, err
}
// Postgres cannot handle custom types, so we will resolve everything to strings
strValues := make(map[string]string, len(values))
// Keep track of addresses and hashes emitted from events
seenAddrs := make([]interface{}, 0, len(values))
seenHashes := make([]interface{}, 0, len(values))
for fieldName, input := range values {
// Postgres cannot handle custom types, resolve everything to strings
switch input.(type) {
case *big.Int:
b := input.(*big.Int)
@ -176,8 +171,8 @@ func (c *converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
case []byte:
b := input.([]byte)
strValues[fieldName] = hexutil.Encode(b)
if len(b) == 32 {
seenHashes = append(seenHashes, common.HexToHash(strValues[fieldName]))
if len(b) == 32 { // collect byte arrays of size 32 as hashes
seenHashes = append(seenHashes, common.BytesToHash(b))
}
case byte:
b := input.(byte)
@ -187,7 +182,7 @@ func (c *converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
}
}
// Only hold onto logs that pass our address filter, if any
// Only hold onto logs that pass our argument filter, if any
if c.ContractInfo.PassesEventFilter(strValues) {
raw, err := json.Marshal(log)
if err != nil {
@ -202,7 +197,7 @@ func (c *converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
Id: headerID,
})
// Cache emitted values if their caching is turned on
// Cache emitted values that pass the argument filter if their caching is turned on
if c.ContractInfo.EmittedAddrs != nil {
c.ContractInfo.AddEmittedAddr(seenAddrs...)
}
@ -216,7 +211,3 @@ func (c *converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
return eventsToLogs, nil
}
func (c *converter) handleDSNote() {
}

View File

@ -157,12 +157,12 @@ func (tr *transformer) Init() error {
Name: *name,
Network: tr.Network,
Address: contractAddr,
Abi: tr.Abi(),
ParsedAbi: tr.ParsedAbi(),
Abi: tr.Parser.Abi(),
ParsedAbi: tr.Parser.ParsedAbi(),
StartingBlock: firstBlock,
LastBlock: lastBlock,
Events: tr.GetEvents(subset),
Methods: tr.GetSelectMethods(tr.WantedMethods[contractAddr]),
Events: tr.Parser.GetEvents(subset),
Methods: tr.Parser.GetSelectMethods(tr.WantedMethods[contractAddr]),
FilterArgs: eventArgs,
MethodArgs: methodArgs,
CreateAddrList: tr.CreateAddrList[contractAddr],
@ -175,25 +175,25 @@ func (tr *transformer) Init() error {
}
func (tr *transformer) Execute() error {
if len(tr.Contracts) == 0 {
cLen := len(tr.Contracts)
if cLen == 0 {
return errors.New("error: transformer has no initialized contracts")
}
cLen := len(tr.Contracts)
contractAddresses := make([]string, 0, cLen) // Holds all contract addresses, for batch fetching of logs
sortedIds := make(map[string][]string) // Map to sort event column ids by contract, for post fetch processing
sortedEventIds := make(map[string][]string) // Map to sort event column ids by contract, for post fetch processing and persisting of logs
sortedMethodIds := make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling
eventIds := make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers
eventFilters := make([]common.Hash, 0) // Holds topic hashes across all contracts, for batch fetching of logs
eventFilters := make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs
sortedLogs := make(map[string][]gethTypes.Log) // Map to sort batch fetched logs by which contract they belong to, for post fetch processing
var start, end int64 // Hold the lowest starting block and the highest ending block
start = 100000000
start = 100000000000
end = -1
// Cycle through all contracts and extract info needed for fetching and post-processing
for _, con := range tr.Contracts {
eLen := len(con.Events)
sortedLogs[con.Address] = []gethTypes.Log{}
sortedIds[con.Address] = make([]string, 0, eLen)
sortedEventIds[con.Address] = make([]string, 0, len(con.Events))
contractAddresses = append(contractAddresses, con.Address)
for _, event := range con.Events {
// Generate eventID and use it to create a checked_header column if one does not already exist
eventId := strings.ToLower(event.Name + "_" + con.Address)
@ -202,12 +202,23 @@ func (tr *transformer) Execute() error {
return err
}
// Keep track of this event id; sorted and unsorted
sortedIds[con.Address] = append(sortedIds[con.Address], eventId)
sortedEventIds[con.Address] = append(sortedEventIds[con.Address], eventId)
eventIds = append(eventIds, eventId)
// Append this event sig to the filters
eventFilters = append(eventFilters, event.Sig())
}
contractAddresses = append(contractAddresses, con.Address)
// Create checked_headers columns for each method id and generate list of all method ids
sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods))
for _, m := range con.Methods {
methodId := strings.ToLower(m.Name + "_" + con.Address)
err := tr.HeaderRepository.AddCheckColumn(methodId)
if err != nil {
return err
}
sortedMethodIds[con.Address] = append(sortedMethodIds[con.Address], methodId)
}
// Update start to the lowest block and end to the highest block
if con.StartingBlock < start {
start = con.StartingBlock
@ -217,7 +228,7 @@ func (tr *transformer) Execute() error {
}
}
// Find unchecked headers for all events across all contracts
// Find unchecked headers for all events across all contracts; these are returned in asc order
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(start, end, eventIds)
if err != nil {
return err
@ -231,13 +242,13 @@ func (tr *transformer) Execute() error {
return err
}
// Mark the header checked for all of these eventIDs and continue to next iteration if no logs are found
// Mark the header checked for all of these eventIDs and continue to method polling and then the next iteration if no logs are found
if len(allLogs) < 1 {
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds)
if err != nil {
return err
}
continue
goto Polling
}
// Sort logs by the contract they belong to
@ -259,7 +270,7 @@ func (tr *transformer) Execute() error {
// Cycle through each type of event log and persist them
for eventName, logs := range convertedLogs {
// If logs are empty, mark checked
// If logs for this event are empty, mark them checked at this header and continue
if len(logs) < 1 {
eventId := strings.ToLower(eventName + "_" + con.Address)
err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
@ -269,51 +280,47 @@ func (tr *transformer) Execute() error {
continue
}
// If logs aren't empty, persist them
// Headers are marked checked in the persistlogs transactions
// Header is marked checked in the transactions
err = tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
if err != nil {
return err
}
}
}
Polling:
// Poll contracts at this block height
err = tr.pollContracts(header, sortedMethodIds)
if err != nil {
return err
}
}
return nil
}
// Used to poll contract methods at a given header
func (tr *transformer) pollContracts(header core.Header, sortedMethodIds map[string][]string) error {
for _, con := range tr.Contracts {
// Skip method polling processes if no methods are specified
if len(con.Methods) == 0 {
// Also don't try to poll methods below this contract's specified starting block
if len(con.Methods) == 0 || header.BlockNumber < con.StartingBlock {
continue
}
// Create checked_headers columns for each method id and generate list of all method ids
methodIds := make([]string, 0, len(con.Methods))
for _, m := range con.Methods {
methodId := strings.ToLower(m.Name + "_" + con.Address)
err = tr.HeaderRepository.AddCheckColumn(methodId)
if err != nil {
return err
}
methodIds = append(methodIds, methodId)
}
// Retrieve headers that have been checked for all of this contract's events but haven not been checked for this contract's methods
missingHeaders, err = tr.HeaderRepository.MissingMethodsCheckedEventsIntersection(con.StartingBlock, con.LastBlock, methodIds, sortedIds[conAddr])
// Poll all methods for this contract at this header
err := tr.Poller.PollContractAt(*con, header.BlockNumber)
if err != nil {
return err
}
// Poll over the missing headers
for _, header := range missingHeaders {
err = tr.Poller.PollContractAt(*con, header.BlockNumber)
// Mark this header checked for the methods
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address])
if err != nil {
return err
}
}
// Mark those headers checked for the methods
err = tr.HeaderRepository.MarkHeadersCheckedForAll(missingHeaders, methodIds)
if err != nil {
return err
}
}
}
return nil
}

View File

@ -36,8 +36,8 @@ type Contract struct {
LastBlock int64 // Most recent block on the network
Abi string // Abi string
ParsedAbi abi.ABI // Parsed abi
Events map[string]types.Event // Map of events to their names
Methods map[string]types.Method // Map of methods to their names
Events map[string]types.Event // List of events to watch
Methods []types.Method // List of methods to poll
Filters map[string]filters.LogFilter // Map of event filters to their event names; used only for full sync watcher
FilterArgs map[string]bool // User-input list of values to filter event logs for
MethodArgs map[string]bool // User-input list of values to limit method polling to

View File

@ -208,7 +208,7 @@ var _ = Describe("Contract", func() {
info = &contract.Contract{}
info.FilterArgs = map[string]bool{}
info.MethodArgs = map[string]bool{}
info.Methods = map[string]types.Method{}
info.Methods = []types.Method{}
info.EmittedAddrs = map[interface{}]bool{}
})

View File

@ -90,6 +90,14 @@ type BalanceOf struct {
Balance string `db:"returned"`
}
type Resolver struct {
Id int64 `db:"id"`
TokenName string `db:"token_name"`
Block int64 `db:"block"`
Node string `db:"node_"`
Address string `db:"returned"`
}
type Owner struct {
Id int64 `db:"id"`
TokenName string `db:"token_name"`
@ -261,12 +269,30 @@ func TearDown(db *postgres.DB) {
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr3`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS methodname_contractaddr`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS methodname_contractaddr2`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS methodname_contractaddr3`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS balanceof_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS newowner_0x314159265dd8dbb310642f98f50c066173c1259b`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS owner_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS owner_0x314159265dd8dbb310642f98f50c066173c1259b`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`)
Expect(err).NotTo(HaveOccurred())

View File

@ -55,23 +55,42 @@ func (p *parser) Parse() error {
return err
}
// Returns wanted methods, if they meet the criteria, as map of types.Methods
// Empty wanted array => all methods that fit are returned
// Returns only specified methods, if they meet the criteria
// Returns as array with methods in same order they were specified
// Nil wanted array => no events are returned
func (p *parser) GetSelectMethods(wanted []string) map[string]types.Method {
addrMethods := map[string]types.Method{}
func (p *parser) GetSelectMethods(wanted []string) []types.Method {
wLen := len(wanted)
if wLen == 0 {
return nil
}
methods := make([]types.Method, wLen)
for _, m := range p.parsedAbi.Methods {
for i, name := range wanted {
if name == m.Name && okTypes(m, wanted) {
methods[i] = types.NewMethod(m)
}
}
}
return methods
}
// Returns wanted methods
// Empty wanted array => all methods are returned
// Nil wanted array => no methods are returned
func (p *parser) GetMethods(wanted []string) []types.Method {
if wanted == nil {
return nil
}
methods := make([]types.Method, 0)
length := len(wanted)
for _, m := range p.parsedAbi.Methods {
if okInputTypes(m, wanted) {
wantedMethod := types.NewMethod(m)
addrMethods[wantedMethod.Name] = wantedMethod
if length == 0 || stringInSlice(wanted, m.Name) {
methods = append(methods, types.NewMethod(m))
}
}
return addrMethods
return methods
}
// Returns wanted events as map of types.Events
@ -89,17 +108,6 @@ func (p *parser) GetEvents(wanted []string) map[string]types.Event {
return events
}
func wantType(arg abi.Argument) bool {
wanted := []byte{abi.UintTy, abi.IntTy, abi.BoolTy, abi.StringTy, abi.AddressTy, abi.HashTy}
for _, ty := range wanted {
if arg.Type.T == ty {
return true
}
}
return false
}
func stringInSlice(list []string, s string) bool {
for _, b := range list {
if b == s {
@ -110,7 +118,7 @@ func stringInSlice(list []string, s string) bool {
return false
}
func okInputTypes(m abi.Method, wanted []string) bool {
func okTypes(m abi.Method, wanted []string) bool {
// Only return method if it has less than 3 arguments, a single output value, and it is a method we want or we want all methods (empty 'wanted' slice)
if len(m.Inputs) < 3 && len(m.Outputs) == 1 && (len(wanted) == 0 || stringInSlice(wanted, m.Name)) {
// Only return methods if inputs are all of accepted types and output is of the accepted types

View File

@ -33,8 +33,8 @@ type Parser interface {
Parse(contractAddr string) error
Abi() string
ParsedAbi() abi.ABI
GetMethods(wanted []string) map[string]types.Method
GetSelectMethods(wanted []string) map[string]types.Method
GetMethods(wanted []string) []types.Method
GetSelectMethods(wanted []string) []types.Method
GetEvents(wanted []string) map[string]types.Event
}
@ -91,37 +91,38 @@ func (p *parser) lookUp(contractAddr string) (string, error) {
return "", errors.New("ABI not present in lookup tabe")
}
// Returns wanted methods, if they meet the criteria, as map of types.Methods
// Empty wanted array => all methods that fit are returned
// Returns only specified methods, if they meet the criteria
// Returns as array with methods in same order they were specified
// Nil wanted array => no events are returned
func (p *parser) GetSelectMethods(wanted []string) map[string]types.Method {
addrMethods := map[string]types.Method{}
func (p *parser) GetSelectMethods(wanted []string) []types.Method {
wLen := len(wanted)
if wLen == 0 {
return nil
}
methods := make([]types.Method, wLen)
for _, m := range p.parsedAbi.Methods {
for i, name := range wanted {
if name == m.Name && okTypes(m, wanted) {
methods[i] = types.NewMethod(m)
}
}
}
return methods
}
// Returns wanted methods
// Empty wanted array => all methods are returned
// Nil wanted array => no methods are returned
func (p *parser) GetMethods(wanted []string) []types.Method {
if wanted == nil {
return nil
}
for _, m := range p.parsedAbi.Methods {
if okInputTypes(m, wanted) {
addrMethods[m.Name] = types.NewMethod(m)
}
}
return addrMethods
}
// Returns wanted methods as map of types.Methods
// Empty wanted array => all events are returned
// Nil wanted array => no events are returned
func (p *parser) GetMethods(wanted []string) map[string]types.Method {
methods := map[string]types.Method{}
if wanted == nil {
return methods
}
methods := make([]types.Method, 0)
length := len(wanted)
for _, m := range p.parsedAbi.Methods {
if length == 0 || stringInSlice(wanted, m.Name) {
methods[m.Name] = types.NewMethod(m)
methods = append(methods, types.NewMethod(m))
}
}
@ -169,7 +170,7 @@ func okReturnType(arg abi.Argument) bool {
return false
}
func okInputTypes(m abi.Method, wanted []string) bool {
func okTypes(m abi.Method, wanted []string) bool {
// Only return method if it has less than 3 arguments, a single output value, and it is a method we want or we want all methods (empty 'wanted' slice)
if len(m.Inputs) < 3 && len(m.Outputs) == 1 && (len(wanted) == 0 || stringInSlice(wanted, m.Name)) {
// Only return methods if inputs are all of accepted types and output is of the accepted types
@ -178,12 +179,18 @@ func okInputTypes(m abi.Method, wanted []string) bool {
}
for _, input := range m.Inputs {
switch input.Type.T {
case abi.AddressTy, abi.HashTy, abi.BytesTy, abi.FixedBytesTy:
// Addresses are properly labeled and caught
// But hashes tend to not be explicitly labeled and caught
// Instead bytes32 are assumed to be hashes
case abi.AddressTy, abi.HashTy:
case abi.FixedBytesTy:
if input.Type.Size != 32 {
return false
}
default:
return false
}
}
return true
}

View File

@ -25,6 +25,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
var _ = Describe("Parser", func() {
@ -48,15 +49,14 @@ var _ = Describe("Parser", func() {
Expect(parsedAbi).To(Equal(expectedAbi))
methods := mp.GetSelectMethods([]string{"balanceOf"})
_, ok := methods["totalSupply"]
Expect(ok).To(Equal(false))
m, ok := methods["balanceOf"]
Expect(ok).To(Equal(true))
Expect(len(m.Args)).To(Equal(1))
Expect(len(m.Return)).To(Equal(1))
Expect(len(methods)).To(Equal(1))
balOf := methods[0]
Expect(balOf.Name).To(Equal("balanceOf"))
Expect(len(balOf.Args)).To(Equal(1))
Expect(len(balOf.Return)).To(Equal(1))
events := mp.GetEvents([]string{"Transfer"})
_, ok = events["Mint"]
_, ok := events["Mint"]
Expect(ok).To(Equal(false))
e, ok := events["Transfer"]
Expect(ok).To(Equal(true))
@ -119,99 +119,108 @@ var _ = Describe("Parser", func() {
})
})
Describe("GetSelectMethods", func() {
It("Parses and returns only methods specified in passed array", func() {
contractAddr := "0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359"
err = p.Parse(contractAddr)
Expect(err).ToNot(HaveOccurred())
methods := p.GetSelectMethods([]string{"balanceOf"})
Expect(len(methods)).To(Equal(1))
balOf := methods[0]
Expect(balOf.Name).To(Equal("balanceOf"))
Expect(len(balOf.Args)).To(Equal(1))
Expect(len(balOf.Return)).To(Equal(1))
abiTy := balOf.Args[0].Type.T
Expect(abiTy).To(Equal(abi.AddressTy))
pgTy := balOf.Args[0].PgType
Expect(pgTy).To(Equal("CHARACTER VARYING(66)"))
abiTy = balOf.Return[0].Type.T
Expect(abiTy).To(Equal(abi.UintTy))
pgTy = balOf.Return[0].PgType
Expect(pgTy).To(Equal("DECIMAL"))
})
It("Parses and returns methods in the order they were specified", func() {
contractAddr := "0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359"
err = p.Parse(contractAddr)
Expect(err).ToNot(HaveOccurred())
selectMethods := p.GetSelectMethods([]string{"balanceOf", "allowance"})
Expect(len(selectMethods)).To(Equal(2))
balOf := selectMethods[0]
allow := selectMethods[1]
Expect(balOf.Name).To(Equal("balanceOf"))
Expect(allow.Name).To(Equal("allowance"))
})
It("Returns nil if given a nil or empty array", func() {
contractAddr := "0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359"
err = p.Parse(contractAddr)
Expect(err).ToNot(HaveOccurred())
var nilArr []types.Method
selectMethods := p.GetSelectMethods([]string{})
Expect(selectMethods).To(Equal(nilArr))
selectMethods = p.GetMethods(nil)
Expect(selectMethods).To(Equal(nilArr))
})
})
Describe("GetMethods", func() {
It("Parses and returns only methods specified in passed array", func() {
contractAddr := "0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359"
err = p.Parse(contractAddr)
Expect(err).ToNot(HaveOccurred())
selectMethods := p.GetMethods([]string{"balanceOf"})
methods := p.GetMethods([]string{"balanceOf"})
Expect(len(methods)).To(Equal(1))
m, ok := selectMethods["balanceOf"]
Expect(ok).To(Equal(true))
balOf := methods[0]
Expect(balOf.Name).To(Equal("balanceOf"))
Expect(len(balOf.Args)).To(Equal(1))
Expect(len(balOf.Return)).To(Equal(1))
abiTy := m.Args[0].Type.T
abiTy := balOf.Args[0].Type.T
Expect(abiTy).To(Equal(abi.AddressTy))
pgTy := m.Args[0].PgType
pgTy := balOf.Args[0].PgType
Expect(pgTy).To(Equal("CHARACTER VARYING(66)"))
abiTy = m.Return[0].Type.T
abiTy = balOf.Return[0].Type.T
Expect(abiTy).To(Equal(abi.UintTy))
pgTy = m.Return[0].PgType
pgTy = balOf.Return[0].PgType
Expect(pgTy).To(Equal("DECIMAL"))
_, ok = selectMethods["totalSupply"]
Expect(ok).To(Equal(false))
})
It("Parses and returns all methods if passed an empty array", func() {
It("Returns nil if given a nil array", func() {
contractAddr := "0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359"
err = p.Parse(contractAddr)
Expect(err).ToNot(HaveOccurred())
var nilArr []types.Method
selectMethods := p.GetMethods(nil)
Expect(selectMethods).To(Equal(nilArr))
})
It("Returns every method if given an empty array", func() {
contractAddr := "0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359"
err = p.Parse(contractAddr)
Expect(err).ToNot(HaveOccurred())
selectMethods := p.GetMethods([]string{})
_, ok := selectMethods["balanceOf"]
Expect(ok).To(Equal(true))
_, ok = selectMethods["totalSupply"]
Expect(ok).To(Equal(true))
_, ok = selectMethods["allowance"]
Expect(ok).To(Equal(true))
})
It("Parses and returns no methods if pass a nil array", func() {
contractAddr := "0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359"
err = p.Parse(contractAddr)
Expect(err).ToNot(HaveOccurred())
selectMethods := p.GetMethods(nil)
Expect(len(selectMethods)).To(Equal(0))
})
})
Describe("GetAddrMethods", func() {
It("Parses and returns only methods whose inputs, if any, are all of type address, hash or []byte", func() {
contractAddr := "0xDdE2D979e8d39BB8416eAfcFC1758f3CaB2C9C72"
err = p.Parse(contractAddr)
Expect(err).ToNot(HaveOccurred())
wanted := []string{"isApprovedForAll", "supportsInterface", "getApproved", "totalSupply", "balanceOf"}
methods := p.GetMethods(wanted)
selectMethods := p.GetSelectMethods(wanted)
_, ok := selectMethods["totalSupply"]
Expect(ok).To(Equal(true))
_, ok = methods["totalSupply"]
Expect(ok).To(Equal(true))
_, ok = selectMethods["balanceOf"]
Expect(ok).To(Equal(true))
_, ok = methods["balanceOf"]
Expect(ok).To(Equal(true))
_, ok = selectMethods["isApprovedForAll"]
Expect(ok).To(Equal(true))
_, ok = methods["isApprovedForAll"]
Expect(ok).To(Equal(true))
_, ok = selectMethods["supportsInterface"]
Expect(ok).To(Equal(true))
_, ok = methods["supportsInterface"]
Expect(ok).To(Equal(true))
_, ok = selectMethods["getApproved"]
Expect(ok).To(Equal(false))
_, ok = methods["getApproved"]
Expect(ok).To(Equal(true))
_, ok = selectMethods["name"]
Expect(ok).To(Equal(false))
_, ok = methods["name"]
Expect(ok).To(Equal(false))
Expect(len(selectMethods)).To(Equal(22))
})
})
})

View File

@ -46,7 +46,6 @@ type poller struct {
}
func NewPoller(blockChain core.BlockChain, db *postgres.DB, mode types.Mode) *poller {
return &poller{
MethodRepository: repository.NewMethodRepository(db, mode),
bc: blockChain,
@ -105,6 +104,8 @@ func (p *poller) pollNoArgAt(m types.Method, bn int64) error {
return err
}
// Cache returned value if piping is turned on
p.cache(out)
result.Output = strOut
// Persist result immediately
@ -129,7 +130,7 @@ func (p *poller) pollSingleArgAt(m types.Method, bn int64) error {
// the correct argument set to iterate over
var args map[interface{}]bool
switch m.Args[0].Type.T {
case abi.HashTy, abi.FixedBytesTy, abi.BytesTy:
case abi.HashTy, abi.FixedBytesTy:
args = p.contract.EmittedHashes
case abi.AddressTy:
args = p.contract.EmittedAddrs
@ -152,13 +153,15 @@ func (p *poller) pollSingleArgAt(m types.Method, bn int64) error {
if err != nil {
return err
}
p.cache(out)
// Write inputs and outputs to result and append result to growing set
result.Inputs = strIn
result.Output = strOut
results = append(results, result)
}
// Persist results as batch
// Persist result set as batch
err := p.PersistResults(results, m, p.contract.Address, p.contract.Name)
if err != nil {
return errors.New(fmt.Sprintf("poller error persisting 1 argument method result\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", bn, m.Name, p.contract.Address, err))
@ -180,7 +183,7 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error {
// the correct argument sets to iterate over
var firstArgs map[interface{}]bool
switch m.Args[0].Type.T {
case abi.HashTy, abi.FixedBytesTy, abi.BytesTy:
case abi.HashTy, abi.FixedBytesTy:
firstArgs = p.contract.EmittedHashes
case abi.AddressTy:
firstArgs = p.contract.EmittedAddrs
@ -191,7 +194,7 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error {
var secondArgs map[interface{}]bool
switch m.Args[1].Type.T {
case abi.HashTy, abi.FixedBytesTy, abi.BytesTy:
case abi.HashTy, abi.FixedBytesTy:
secondArgs = p.contract.EmittedHashes
case abi.AddressTy:
secondArgs = p.contract.EmittedAddrs
@ -218,6 +221,8 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error {
return err
}
p.cache(out)
result.Output = strOut
result.Inputs = strIn
results = append(results, result)
@ -238,6 +243,28 @@ func (p *poller) FetchContractData(contractAbi, contractAddress, method string,
return p.bc.FetchContractData(contractAbi, contractAddress, method, methodArgs, result, blockNumber)
}
// This is used to cache an method return value if method piping is turned on
func (p *poller) cache(out interface{}) {
// Cache returned value if piping is turned on
if p.contract.Piping {
switch out.(type) {
case common.Hash:
if p.contract.EmittedHashes != nil {
p.contract.AddEmittedHash(out.(common.Hash))
}
case []byte:
if p.contract.EmittedHashes != nil && len(out.([]byte)) == 32 {
p.contract.AddEmittedHash(common.BytesToHash(out.([]byte)))
}
case common.Address:
if p.contract.EmittedAddrs != nil {
p.contract.AddEmittedAddr(out.(common.Address))
}
default:
}
}
}
func stringify(input interface{}) (string, error) {
switch input.(type) {
case *big.Int:

View File

@ -50,7 +50,7 @@ var _ = Describe("Poller", func() {
})
Describe("PollContract", func() {
It("Polls specified contract methods using contract's token holder address list", func() {
It("Polls specified contract methods using contract's argument list", func() {
con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"})
Expect(con.Abi).To(Equal(constants.TusdAbiString))
con.StartingBlock = 6707322
@ -122,8 +122,8 @@ var _ = Describe("Poller", func() {
})
})
Describe("PollMethod", func() {
It("Polls a single contract method", func() {
Describe("FetchContractData", func() {
It("Calls a single contract method", func() {
var name = new(string)
err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514)
Expect(err).ToNot(HaveOccurred())
@ -209,10 +209,44 @@ var _ = Describe("Poller", func() {
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
Expect(err).To(HaveOccurred())
})
It("Caches returned values of the appropriate types for downstream method polling if method piping is turned on", func() {
con = test_helpers.SetupENSContract(nil, []string{"resolver"})
Expect(con.Abi).To(Equal(constants.ENSAbiString))
con.StartingBlock = 6921967
con.LastBlock = 6921968
con.EmittedAddrs = map[interface{}]bool{}
con.Piping = false
con.AddEmittedHash(common.HexToHash("0x495b6e6efdedb750aa519919b5cf282bdaa86067b82a2293a3ff5723527141e8"))
err := p.PollContract(*con)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.Resolver{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.resolver_method WHERE node_ = '0x495b6e6efdedb750aa519919b5cf282bdaa86067b82a2293a3ff5723527141e8' AND block = '6921967'", constants.EnsContractAddress)).StructScan(&scanStruct)
Expect(err).ToNot(HaveOccurred())
Expect(scanStruct.Address).To(Equal("0x5FfC014343cd971B7eb70732021E26C35B744cc4"))
Expect(scanStruct.TokenName).To(Equal("ENS-Registry"))
Expect(len(con.EmittedAddrs)).To(Equal(0)) // With piping off the address is not saved
test_helpers.TearDown(db)
db, bc = test_helpers.SetupDBandBC()
p = poller.NewPoller(bc, db, types.LightSync)
con.Piping = true
err = p.PollContract(*con)
Expect(err).ToNot(HaveOccurred())
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.resolver_method WHERE node_ = '0x495b6e6efdedb750aa519919b5cf282bdaa86067b82a2293a3ff5723527141e8' AND block = '6921967'", constants.EnsContractAddress)).StructScan(&scanStruct)
Expect(err).ToNot(HaveOccurred())
Expect(scanStruct.Address).To(Equal("0x5FfC014343cd971B7eb70732021E26C35B744cc4"))
Expect(scanStruct.TokenName).To(Equal("ENS-Registry"))
Expect(len(con.EmittedAddrs)).To(Equal(1)) // With piping on it is saved
Expect(con.EmittedAddrs[common.HexToAddress("0x5FfC014343cd971B7eb70732021E26C35B744cc4")]).To(Equal(true))
})
})
Describe("PollMethod", func() {
It("Polls a single contract method", func() {
Describe("FetchContractData", func() {
It("Calls a single contract method", func() {
var name = new(string)
err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514)
Expect(err).ToNot(HaveOccurred())

View File

@ -48,6 +48,7 @@ var _ = Describe("Repository", func() {
var con *contract.Contract
var vulcanizeLogId int64
var wantedEvents = []string{"Transfer"}
var wantedMethods = []string{"balanceOf"}
var event types.Event
var headerID int64
var mockEvent = mocks.MockTranferEvent
@ -55,7 +56,7 @@ var _ = Describe("Repository", func() {
var mockLog2 = mocks.MockTransferLog2
BeforeEach(func() {
db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, []string{})
db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, wantedMethods)
mockEvent.LogID = vulcanizeLogId
event = con.Events["Transfer"]

View File

@ -41,7 +41,8 @@ var _ = Describe("Repository", func() {
BeforeEach(func() {
con = test_helpers.SetupTusdContract([]string{}, []string{"balanceOf"})
method = con.Methods["balanceOf"]
Expect(len(con.Methods)).To(Equal(1))
method = con.Methods[0]
mockResult = types.Result{
Method: method,
PgType: method.Return[0].PgType,