ipld-eth-server/pkg/omni/full/transformer/transformer.go

272 lines
9.9 KiB
Go
Raw Normal View History

// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package transformer
import (
"errors"
"strings"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
// Requires a fully synced vDB and a running eth node (or infura)
type Transformer struct {
// Database interfaces
datastore.FilterRepository // Log filters repo; accepts filters generated by Contract.GenerateFilters()
datastore.WatchedEventRepository // Watched event log views, created by the log filters
repository.EventRepository // Holds transformed watched event log data
// Pre-processing interfaces
parser.Parser // Parses events and methods out of contract abi fetched using contract address
retriever.BlockRetriever // Retrieves first block for contract and current block height
// Processing interfaces
converter.Converter // Converts watched event logs into custom log
poller.Poller // Polls methods using contract's token holder addresses and persists them using method datastore
// Ethereum network name; default "" is mainnet
Network string
// Store contract info as mapping to contract address
Contracts map[string]*contract.Contract
// Targeted subset of events/methods
// Stored as map sof contract address to events/method names of interest
WatchedEvents map[string][]string // Default/empty event list means all are watched
WantedMethods map[string][]string // Default/empty method list means none are polled
// Starting block for contracts
ContractStart map[string]int64
// Lists of addresses to filter event or method data
// before persisting; if empty no filter is applied
EventArgs map[string][]string
MethodArgs map[string][]string
// Whether or not to create a list of emitted address or hashes for the contract in postgres
CreateAddrList map[string]bool
CreateHashList map[string]bool
// Method piping on/off for a contract
Piping map[string]bool
}
// Transformer takes in config for blockchain, database, and network id
func NewTransformer(network string, BC core.BlockChain, DB *postgres.DB) *Transformer {
return &Transformer{
Poller: poller.NewPoller(BC, DB, types.FullSync),
Parser: parser.NewParser(network),
BlockRetriever: retriever.NewBlockRetriever(DB),
Converter: converter.NewConverter(&contract.Contract{}),
Contracts: map[string]*contract.Contract{},
WatchedEventRepository: repositories.WatchedEventRepository{DB: DB},
FilterRepository: repositories.FilterRepository{DB: DB},
EventRepository: repository.NewEventRepository(DB, types.FullSync),
WatchedEvents: map[string][]string{},
WantedMethods: map[string][]string{},
ContractStart: map[string]int64{},
EventArgs: map[string][]string{},
MethodArgs: map[string][]string{},
CreateAddrList: map[string]bool{},
CreateHashList: map[string]bool{},
Piping: map[string]bool{},
}
}
// Use after creating and setting transformer
// Loops over all of the addr => filter sets
// Uses parser to pull event info from abi
// Use this info to generate event filters
func (transformer *Transformer) Init() error {
for contractAddr, subset := range transformer.WatchedEvents {
// Get Abi
err := transformer.Parser.Parse(contractAddr)
if err != nil {
return err
}
// Get first block and most recent block number in the header repo
firstBlock, err := transformer.BlockRetriever.RetrieveFirstBlock(contractAddr)
if err != nil {
return err
}
lastBlock, err := transformer.BlockRetriever.RetrieveMostRecentBlock()
if err != nil {
return err
}
// Set to specified range if it falls within the bounds
if firstBlock < transformer.ContractStart[contractAddr] {
firstBlock = transformer.ContractStart[contractAddr]
}
// Get contract name if it has one
var name = new(string)
transformer.Poller.FetchContractData(transformer.Abi(), contractAddr, "name", nil, name, lastBlock)
// Remove any potential accidental duplicate inputs in arg filter values
eventArgs := map[string]bool{}
for _, arg := range transformer.EventArgs[contractAddr] {
eventArgs[arg] = true
}
methodArgs := map[string]bool{}
for _, arg := range transformer.MethodArgs[contractAddr] {
methodArgs[arg] = true
}
// Aggregate info into contract object
info := contract.Contract{
Name: *name,
Network: transformer.Network,
Address: contractAddr,
Abi: transformer.Parser.Abi(),
ParsedAbi: transformer.Parser.ParsedAbi(),
StartingBlock: firstBlock,
LastBlock: lastBlock,
Events: transformer.Parser.GetEvents(subset),
Methods: transformer.Parser.GetSelectMethods(transformer.WantedMethods[contractAddr]),
FilterArgs: eventArgs,
MethodArgs: methodArgs,
CreateAddrList: transformer.CreateAddrList[contractAddr],
CreateHashList: transformer.CreateHashList[contractAddr],
Piping: transformer.Piping[contractAddr],
}.Init()
// Use info to create filters
err = info.GenerateFilters()
if err != nil {
return err
}
// Iterate over filters and push them to the repo using filter repository interface
for _, filter := range info.Filters {
err = transformer.FilterRepository.CreateFilter(filter)
if err != nil {
return err
}
}
// Store contract info for further processing
transformer.Contracts[contractAddr] = info
}
return nil
}
// Iterates through stored, initialized contract objects
// Iterates through contract's event filters, grabbing watched event logs
// Uses converter to convert logs into custom log type
// Persists converted logs into custuom postgres tables
// Calls selected methods, using token holder address generated during event log conversion
func (transformer Transformer) Execute() error {
if len(transformer.Contracts) == 0 {
return errors.New("error: transformer has no initialized contracts to work with")
}
// Iterate through all internal contracts
for _, con := range transformer.Contracts {
// Update converter with current contract
transformer.Update(con)
// Iterate through contract filters and get watched event logs
for eventSig, filter := range con.Filters {
watchedEvents, err := transformer.GetWatchedEvents(filter.Name)
if err != nil {
return err
}
// Iterate over watched event logs
for _, we := range watchedEvents {
// Convert them to our custom log type
cstm, err := transformer.Converter.Convert(*we, con.Events[eventSig])
if err != nil {
return err
}
if cstm == nil {
continue
}
// If log is not empty, immediately persist in repo
// Run this in seperate goroutine?
err = transformer.PersistLogs([]types.Log{*cstm}, con.Events[eventSig], con.Address, con.Name)
if err != nil {
return err
}
}
}
// After persisting all watched event logs
// poller polls select contract methods
// and persists the results into custom pg tables
// Run this in seperate goroutine?
if err := transformer.PollContract(*con); err != nil {
return err
}
}
return nil
}
// Used to set which contract addresses and which of their events to watch
func (transformer *Transformer) SetEvents(contractAddr string, filterSet []string) {
transformer.WatchedEvents[strings.ToLower(contractAddr)] = filterSet
}
// Used to set subset of account addresses to watch events for
func (transformer *Transformer) SetEventArgs(contractAddr string, filterSet []string) {
transformer.EventArgs[strings.ToLower(contractAddr)] = filterSet
}
// Used to set which contract addresses and which of their methods to call
func (transformer *Transformer) SetMethods(contractAddr string, filterSet []string) {
transformer.WantedMethods[strings.ToLower(contractAddr)] = filterSet
}
// Used to set subset of account addresses to poll methods on
func (transformer *Transformer) SetMethodArgs(contractAddr string, filterSet []string) {
transformer.MethodArgs[strings.ToLower(contractAddr)] = filterSet
}
// Used to set the block range to watch for a given address
func (transformer *Transformer) SetStartingBlock(contractAddr string, start int64) {
transformer.ContractStart[strings.ToLower(contractAddr)] = start
}
// Used to set whether or not to persist an account address list
func (transformer *Transformer) SetCreateAddrList(contractAddr string, on bool) {
transformer.CreateAddrList[strings.ToLower(contractAddr)] = on
}
// Used to set whether or not to persist an hash list
func (transformer *Transformer) SetCreateHashList(contractAddr string, on bool) {
transformer.CreateHashList[strings.ToLower(contractAddr)] = on
}
// Used to turn method piping on for a contract
func (transformer *Transformer) SetPiping(contractAddr string, on bool) {
transformer.Piping[strings.ToLower(contractAddr)] = on
}