Generic watcher that takes a contract address, grabs the contract abi and starting block number, creates custom event filters, and extracts and transforms event data into postgres. Can configure to look at only a subset of events through CLI flag. Building but needs testing.

This commit is contained in:
Ian Norden 2018-11-03 14:00:25 -05:00
parent 57820ff473
commit 8ce75fe5ad
13 changed files with 1011 additions and 1 deletions

122
cmd/omniWatcher.go Normal file
View File

@ -0,0 +1,122 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"bufio"
"fmt"
"log"
"os"
"strings"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
"github.com/vulcanize/vulcanizedb/pkg/omni/transformer"
"github.com/vulcanize/vulcanizedb/pkg/omni/types"
)
// omniWatcherCmd represents the omniWatcher command
var omniWatcherCmd = &cobra.Command{
Use: "omniWatcher",
Short: "Watches events at the provided contract address",
Long: `Uses input contract address and event filters to watch events
Expects an ethereum node to be running
Expects an archival node synced into vulcanizeDB
Requires a .toml config file:
[database]
name = "vulcanize_public"
hostname = "localhost"
port = 5432
[client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
`,
Run: func(cmd *cobra.Command, args []string) {
omniWatcher()
},
}
func omniWatcher() {
if contractAddress == "" {
log.Fatal("Contract address required")
}
if contractEvents == nil {
var str string
for str != "y" {
reader := bufio.NewReader(os.Stdin)
fmt.Print("Warning: no events specified, proceeding to watch every event at address" + contractAddress + "? (Y/n)\n> ")
resp, err := reader.ReadBytes('\n')
if err != nil {
log.Fatal(err)
}
str = strings.ToLower(string(resp))
if str == "n" {
return
}
}
}
rawRpcClient, err := rpc.Dial(ipc)
if err != nil {
log.Fatal(err)
}
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient)
client := client.NewEthClient(ethClient)
node := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
blockChain := geth.NewBlockChain(client, node, transactionConverter)
db, err := postgres.NewDB(databaseConfig, blockChain.Node())
if err != nil {
log.Fatal(fmt.Sprintf("Failed to initialize database\r\nerr: %v\r\n", err))
}
con := types.Config{
DB: db,
BC: blockChain,
Network: network,
}
t := transformer.NewTransformer(&con)
t.Set(contractAddress, contractEvents)
err = t.Init()
if err != nil {
log.Fatal(fmt.Sprintf("Failed to initialized generator\r\nerr: %v\r\n", err))
}
log.Fatal(t.Execute())
}
func init() {
rootCmd.AddCommand(omniWatcherCmd)
omniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for")
omniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-events", "e", []string{}, "Subset of events to watch- use only with single address")
omniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "Addresses of the contracts to generate watchers for")
omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`)
}

View File

@ -32,6 +32,10 @@ var (
startingBlockNumber int64 startingBlockNumber int64
syncAll bool syncAll bool
endingBlockNumber int64 endingBlockNumber int64
network string
contractAddress string
contractAddresses []string
contractEvents []string
) )
var rootCmd = &cobra.Command{ var rootCmd = &cobra.Command{

View File

@ -1,5 +1,5 @@
[database] [database]
name = "vulcanize_private" name = "vulcanize_infura"
hostname = "localhost" hostname = "localhost"
port = 5432 port = 5432

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,94 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package converter
import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/examples/generic/helpers"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/omni/types"
)
type Converter interface {
Convert(watchedEvent core.WatchedEvent, event *types.Event) error
Update(info types.ContractInfo)
}
type converter struct {
contractInfo types.ContractInfo
}
func NewConverter(info types.ContractInfo) *converter {
return &converter{
contractInfo: info,
}
}
func (c converter) Update(info types.ContractInfo) {
c.contractInfo = info
}
func (c converter) Convert(watchedEvent core.WatchedEvent, event *types.Event) error {
contract := bind.NewBoundContract(common.HexToAddress(c.contractInfo.Address), c.contractInfo.ParsedAbi, nil, nil, nil)
values := make(map[string]interface{})
for _, field := range event.Fields {
var i interface{}
values[field.Name] = i
switch field.Type.T {
case abi.StringTy:
field.PgType = "CHARACTER VARYING(66) NOT NULL"
case abi.IntTy, abi.UintTy:
field.PgType = "DECIMAL NOT NULL"
case abi.BoolTy:
field.PgType = "BOOLEAN NOT NULL"
case abi.BytesTy, abi.FixedBytesTy:
field.PgType = "BYTEA NOT NULL"
case abi.AddressTy:
field.PgType = "CHARACTER VARYING(66) NOT NULL"
case abi.HashTy:
field.PgType = "CHARACTER VARYING(66) NOT NULL"
case abi.ArrayTy:
field.PgType = "TEXT[] NOT NULL"
case abi.FixedPointTy:
field.PgType = "MONEY NOT NULL" // use shopspring/decimal for fixed point numbers in go and money type in postgres?
case abi.FunctionTy:
field.PgType = "TEXT NOT NULL"
default:
field.PgType = "TEXT NOT NULL"
}
}
log := helpers.ConvertToLog(watchedEvent)
err := contract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
return err
}
eventLog := types.Log{
Values: values,
Block: watchedEvent.BlockNumber,
Tx: watchedEvent.TxHash,
}
event.Logs[watchedEvent.LogID] = eventLog
return nil
}

128
pkg/omni/fetcher/fetcher.go Normal file
View File

@ -0,0 +1,128 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fetcher
import (
"fmt"
"log"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/pkg/core"
)
// Fetcher serves as the lower level data fetcher that calls the underlying
// blockchain's FetchConctractData method for a given return type
// Interface definition for a Fetcher
type Fetcher interface {
FetchBigInt(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (big.Int, error)
FetchBool(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (bool, error)
FetchAddress(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (common.Address, error)
FetchString(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (string, error)
FetchHash(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (common.Hash, error)
}
// Fetcher struct
type fetcher struct {
BlockChain core.BlockChain // Underyling Blockchain
}
// Fetcher error
type fetcherError struct {
err string
fetchMethod string
}
func NewFetcher(blockChain core.BlockChain) *fetcher {
return &fetcher{
BlockChain: blockChain,
}
}
// Fetcher error method
func (fe *fetcherError) Error() string {
return fmt.Sprintf("Error fetching %s: %s", fe.fetchMethod, fe.err)
}
// Used to create a new Fetcher error for a given error and fetch method
func newFetcherError(err error, fetchMethod string) *fetcherError {
e := fetcherError{err.Error(), fetchMethod}
log.Println(e.Error())
return &e
}
// Generic Fetcher methods used by Getters to call contract methods
// Method used to fetch big.Int value from contract
func (f fetcher) FetchBigInt(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (big.Int, error) {
var result = new(big.Int)
err := f.BlockChain.FetchContractData(contractAbi, contractAddress, method, methodArgs, &result, blockNumber)
if err != nil {
return *result, newFetcherError(err, method)
}
return *result, nil
}
// Method used to fetch bool value from contract
func (f fetcher) FetchBool(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (bool, error) {
var result = new(bool)
err := f.BlockChain.FetchContractData(contractAbi, contractAddress, method, methodArgs, &result, blockNumber)
if err != nil {
return *result, newFetcherError(err, method)
}
return *result, nil
}
// Method used to fetch address value from contract
func (f fetcher) FetchAddress(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (common.Address, error) {
var result = new(common.Address)
err := f.BlockChain.FetchContractData(contractAbi, contractAddress, method, methodArgs, &result, blockNumber)
if err != nil {
return *result, newFetcherError(err, method)
}
return *result, nil
}
// Method used to fetch string value from contract
func (f fetcher) FetchString(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (string, error) {
var result = new(string)
err := f.BlockChain.FetchContractData(contractAbi, contractAddress, method, methodArgs, &result, blockNumber)
if err != nil {
return *result, newFetcherError(err, method)
}
return *result, nil
}
// Method used to fetch hash value from contract
func (f fetcher) FetchHash(method, contractAbi, contractAddress string, blockNumber int64, methodArgs []interface{}) (common.Hash, error) {
var result = new(common.Hash)
err := f.BlockChain.FetchContractData(contractAbi, contractAddress, method, methodArgs, &result, blockNumber)
if err != nil {
return *result, newFetcherError(err, method)
}
return *result, nil
}

85
pkg/omni/parser/parser.go Normal file
View File

@ -0,0 +1,85 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parser
import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/omni/types"
)
type Parser interface {
Parse(contractAddr string) error
GetAbi() string
GetParsedAbi() abi.ABI
GetMethods() map[string]*types.Method
GetEvents() map[string]*types.Event
}
type parser struct {
client *geth.EtherScanAPI
abi string
parsedAbi abi.ABI
}
func NewParser(network string) *parser {
url := geth.GenURL(network)
return &parser{
client: geth.NewEtherScanClient(url),
}
}
func (p *parser) GetAbi() string {
return p.abi
}
func (p *parser) GetParsedAbi() abi.ABI {
return p.parsedAbi
}
func (p *parser) Parse(contractAddr string) error {
abiStr, err := p.client.GetAbi(contractAddr)
if err != nil {
return err
}
p.abi = abiStr
p.parsedAbi, err = geth.ParseAbi(abiStr)
return err
}
func (p *parser) GetMethods() map[string]*types.Method {
methods := map[string]*types.Method{}
for _, m := range p.parsedAbi.Methods {
method := types.NewMethod(m)
methods[m.Name] = method
}
return methods
}
func (p *parser) GetEvents() map[string]*types.Event {
events := map[string]*types.Event{}
for _, e := range p.parsedAbi.Events {
event := types.NewEvent(e)
events[e.Name] = event
}
return events
}

View File

@ -0,0 +1,150 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package repository
import (
"fmt"
"strings"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/types"
)
type DataStore interface {
PersistEvents(info types.ContractInfo) error
}
type dataStore struct {
*postgres.DB
}
func NewDataStore(db *postgres.DB) *dataStore {
return &dataStore{
DB: db,
}
}
func (d *dataStore) PersistEvents(contract types.ContractInfo) error {
schemaExists, err := d.CheckForSchema(contract.Name)
if err != nil {
return err
}
if !schemaExists {
err = d.CreateContractSchema(contract.Name)
if err != nil {
return err
}
}
for eventName, event := range contract.Events {
tableExists, err := d.CheckForTable(contract.Name, eventName)
if err != nil {
return err
}
if !tableExists {
err = d.CreateEventTable(contract.Name, event)
if err != nil {
return err
}
}
for id, log := range event.Logs {
// Create postgres command to persist any given event
pgStr := fmt.Sprintf("INSERT INTO %s.%s ", strings.ToLower(contract.Name), strings.ToLower(eventName))
pgStr = pgStr + "(vulcanize_log_id, token_name, token_address, event_name, block, tx"
var data []interface{}
data = append(data,
id,
strings.ToLower(contract.Name),
strings.ToLower(contract.Address),
strings.ToLower(eventName),
log.Block,
log.Tx)
counter := 0
for inputName, input := range log.Values {
counter += 1
pgStr = pgStr + fmt.Sprintf(", %s", strings.ToLower(inputName))
data = append(data, input)
}
pgStr = pgStr + ") "
appendStr := "VALUES ($1, $2, $3, $4, $5, $6"
for i := 0; i < counter; i++ {
appendStr = appendStr + fmt.Sprintf(", $%d", i+7)
}
appendStr = appendStr + ") "
appendStr = appendStr + "ON CONFLICT (vulcanize_log_id) DO NOTHING"
pgStr = pgStr + fmt.Sprintf(") %s", appendStr)
_, err := d.DB.Exec(pgStr, data...)
if err != nil {
return err
}
}
}
return nil
}
func (d *dataStore) CreateEventTable(contractName string, event *types.Event) error {
// Create postgres command to create table for any given event
pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s ", strings.ToLower(contractName), strings.ToLower(event.Name))
pgStr = pgStr + `(id SERIAL,
vulcanize_log_id INTEGER NOT NULL UNIQUE,
token_name CHARACTER VARYING(66) NOT NULL,
token_address CHARACTER VARYING(66) NOT NULL,
event_name CHARACTER VARYING(66) NOT NULL,
block INTEGER NOT NULL,
tx CHARACTER VARYING(66) NOT NULL, `
for _, field := range event.Fields {
pgStr = pgStr + fmt.Sprintf("%s %s NOT NULL, ", field.Name, field.PgType)
}
pgStr = pgStr + "CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES logs (id) ON DELETE CASCADE)"
_, err := d.DB.Exec(pgStr)
return err
}
func (d *dataStore) CheckForTable(contractName string, eventName string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s')", contractName, eventName)
var exists bool
err := d.DB.Get(exists, pgStr)
return exists, err
}
func (d *dataStore) CreateContractSchema(contractName string) error {
_, err := d.DB.Exec("CREATE SCHEMA IF NOT EXISTS " + contractName)
return err
}
func (d *dataStore) CheckForSchema(contractName string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s')", contractName)
var exists bool
err := d.DB.Get(exists, pgStr)
return exists, err
}

View File

@ -0,0 +1,74 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retriever
import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
type Retriever interface {
RetrieveFirstBlock(contractAddr string) (int64, error)
RetrieveFirstBlockFromLogs(contractAddr string) (int64, error)
RetrieveFirstBlockFromReceipts(contractAddr string) (int64, error)
}
type retriever struct {
*postgres.DB
}
func NewRetriever(db *postgres.DB) (r *retriever) {
return &retriever{
DB: db,
}
}
// For some contracts the creation transaction receipt doesn't have the contract address so this doesn't work (e.g. Sai)
func (r *retriever) RetrieveFirstBlockFromReceipts(contractAddr string) (int64, error) {
var firstBlock int
err := r.DB.Get(
&firstBlock,
`SELECT number FROM blocks
WHERE id = (SELECT block_id FROM receipts
WHERE contract_address = $1
ORDER BY block_id ASC
LIMIT 1)`,
contractAddr,
)
return int64(firstBlock), err
}
// This servers as a heuristic to find the first block by finding the first contract event log
func (r *retriever) RetrieveFirstBlockFromLogs(contractAddr string) (int64, error) {
var firstBlock int
err := r.DB.Get(
&firstBlock,
"SELECT block_number FROM logs WHERE address = $1 ORDER BY block_number ASC LIMIT 1",
contractAddr,
)
return int64(firstBlock), err
}
// Try both methods of finding the first block, with the receipt method taking precedence
func (r *retriever) RetrieveFirstBlock(contractAddr string) (int64, error) {
i, err := r.RetrieveFirstBlockFromReceipts(contractAddr)
if err != nil {
i, err = r.RetrieveFirstBlockFromLogs(contractAddr)
}
return i, err
}

View File

@ -0,0 +1,160 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transformer
import (
"errors"
"fmt"
"log"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/omni/converter"
"github.com/vulcanize/vulcanizedb/pkg/omni/fetcher"
"github.com/vulcanize/vulcanizedb/pkg/omni/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/types"
)
// Top-level object similar to generator
// but attempts to solve problem without
// automated code generation
type EventTransformer interface {
Init(contractAddr string) error
}
type eventTransformer struct {
// Network, database, and blockchain config
*types.Config
// Underlying databases
datastore.WatchedEventRepository
datastore.FilterRepository
repository.DataStore
// Underlying interfaces
parser.Parser // Parses events out of contract abi fetched with addr
retriever.Retriever // Retrieves first block with contract addr referenced
fetcher.Fetcher // Fetches data from contract methods
// Store contract info as mapping to contract address
ContractInfo map[string]types.ContractInfo
// Subset of events of interest, stored as map of contract address to events
// By default this
sets map[string][]string
}
// Transformer takes in config for blockchain, database, and network id
func NewTransformer(c *types.Config) (t *eventTransformer) {
t.Parser = parser.NewParser(c.Network)
t.Retriever = retriever.NewRetriever(c.DB)
t.Fetcher = fetcher.NewFetcher(c.BC)
t.ContractInfo = map[string]types.ContractInfo{}
t.WatchedEventRepository = repositories.WatchedEventRepository{DB: c.DB}
t.FilterRepository = repositories.FilterRepository{DB: c.DB}
t.DataStore = repository.NewDataStore(c.DB)
t.sets = map[string][]string{}
return t
}
// Used to set which contract addresses and which of their events to watch
func (t *eventTransformer) Set(contractAddr string, filterSet []string) {
t.sets[contractAddr] = filterSet
}
// Use after creating and setting transformer
// Loops over all of the addr => filter sets
// Uses parser to pull event info from abi
// Use this info to generate event filters
func (t *eventTransformer) Init() error {
for contractAddr, subset := range t.sets {
err := t.Parser.Parse(contractAddr)
if err != nil {
return err
}
var ctrName string
strName, err1 := t.Fetcher.FetchString("name", t.Parser.GetAbi(), contractAddr, -1, nil)
if err1 != nil || strName == "" {
hashName, err2 := t.Fetcher.FetchHash("name", t.Parser.GetAbi(), contractAddr, -1, nil)
if err2 != nil || hashName.String() == "" {
return errors.New(fmt.Sprintf("fetching string: %s and hash: %s names failed\r\nerr1: %v\r\nerr2: %v\r\n ", strName, hashName, err1, err2))
}
ctrName = hashName.String()
} else {
ctrName = strName
}
firstBlock, err := t.Retriever.RetrieveFirstBlock(contractAddr)
if err != nil {
return err
}
info := types.ContractInfo{
Name: ctrName,
Address: contractAddr,
Abi: t.Parser.GetAbi(),
ParsedAbi: t.Parser.GetParsedAbi(),
StartingBlock: firstBlock,
Events: t.Parser.GetEvents(),
Methods: t.Parser.GetMethods(),
}
info.GenerateFilters(subset)
for _, filter := range info.Filters {
t.CreateFilter(filter)
}
t.ContractInfo[contractAddr] = info
}
return nil
}
func (tr eventTransformer) Execute() error {
for _, contract := range tr.ContractInfo {
c := converter.NewConverter(contract)
for eventName, filter := range contract.Filters {
watchedEvents, err := tr.GetWatchedEvents(eventName)
if err != nil {
log.Println(fmt.Sprintf("Error fetching events for %s:", filter.Name), err)
return err
}
for _, we := range watchedEvents {
err = c.Convert(*we, contract.Events[eventName])
if err != nil {
return err
}
}
}
err := tr.PersistEvents(contract)
if err != nil {
return err
}
}
return nil
}

26
pkg/omni/types/config.go Normal file
View File

@ -0,0 +1,26 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
type Config struct {
Network string
BC core.BlockChain
DB *postgres.DB
}

View File

@ -0,0 +1,58 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters"
)
type ContractInfo struct {
Name string
Address string
StartingBlock int64
Abi string
ParsedAbi abi.ABI
Events map[string]*Event // Map of events to their names
Methods map[string]*Method // Map of methods to their names
Filters map[string]filters.LogFilter // Map of event filters to their names
}
func (i *ContractInfo) GenerateFilters(subset []string) {
i.Filters = map[string]filters.LogFilter{}
for name, event := range i.Events {
if len(subset) == 0 || stringInSlice(subset, name) {
i.Filters[name] = filters.LogFilter{
Name: name,
FromBlock: i.StartingBlock,
ToBlock: -1,
Address: i.Address,
Topics: core.Topics{event.Sig()},
}
}
}
}
func stringInSlice(list []string, s string) bool {
for _, b := range list {
if b == s {
return true
}
}
return false
}

108
pkg/omni/types/entities.go Normal file
View File

@ -0,0 +1,108 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"fmt"
"strings"
"github.com/ethereum/go-ethereum/accounts/abi"
)
type Event struct {
Name string
Anonymous bool
Fields []Field
Logs map[int64]Log // Map of VulcanizeIdLog to parsed event log
}
type Method struct {
Name string
Const bool
Inputs []Field
Outputs []Field
}
type Field struct {
abi.Argument
Value interface{}
PgType string
}
type Log struct {
Values map[string]interface{} // Map of event input names to their values
Block int64
Tx string
}
func NewEvent(e abi.Event) *Event {
fields := make([]Field, len(e.Inputs))
for i, input := range e.Inputs {
fields[i].Name = input.Name
fields[i].Type = input.Type
fields[i].Indexed = input.Indexed
}
return &Event{
Name: e.Name,
Anonymous: e.Anonymous,
Fields: fields,
Logs: map[int64]Log{},
}
}
func NewMethod(m abi.Method) *Method {
inputs := make([]Field, len(m.Inputs))
for i, input := range m.Inputs {
inputs[i].Name = input.Name
inputs[i].Type = input.Type
inputs[i].Indexed = input.Indexed
}
outputs := make([]Field, len(m.Outputs))
for i, output := range m.Outputs {
outputs[i].Name = output.Name
outputs[i].Type = output.Type
outputs[i].Indexed = output.Indexed
}
return &Method{
Name: m.Name,
Const: m.Const,
Inputs: inputs,
Outputs: outputs,
}
}
func (e Event) Sig() string {
types := make([]string, len(e.Fields))
for i, input := range e.Fields {
types[i] = input.Type.String()
}
return fmt.Sprintf("%v(%v)", e.Name, strings.Join(types, ","))
}
func (m Method) Sig() string {
types := make([]string, len(m.Inputs))
i := 0
for _, input := range m.Inputs {
types[i] = input.Type.String()
i++
}
return fmt.Sprintf("%v(%v)", m.Name, strings.Join(types, ","))
}