diff --git a/cmd/omniWatcher.go b/cmd/omniWatcher.go index c7967ffe..6e293d7b 100644 --- a/cmd/omniWatcher.go +++ b/cmd/omniWatcher.go @@ -117,7 +117,7 @@ func init() { omniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for") omniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address") omniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-events", "e", []string{}, "Subset of events to watch; by default all events are watched") - omniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-methods", "m", nil, "Subset of methods to poll; by default no methods are polled") + omniWatcherCmd.Flags().StringArrayVarP(&contractMethods, "contract-methods", "m", nil, "Subset of methods to poll; by default no methods are polled") omniWatcherCmd.Flags().StringArrayVarP(&eventAddrs, "event-filter-addresses", "f", []string{}, "Account addresses to persist event data for; default is to persist for all found token holder addresses") omniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses") omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`) diff --git a/pkg/omni/contract/contract.go b/pkg/omni/contract/contract.go index ac8d3d38..a3faf494 100644 --- a/pkg/omni/contract/contract.go +++ b/pkg/omni/contract/contract.go @@ -34,8 +34,8 @@ type Contract struct { LastBlock int64 Abi string ParsedAbi abi.ABI - Events map[string]*types.Event // Map of events to their names - Methods map[string]*types.Method // Map of methods to their names + Events map[string]types.Event // Map of events to their names + Methods map[string]types.Method // Map of methods to their names Filters map[string]filters.LogFilter // Map of event filters to their names EventAddrs map[string]bool // User-input list of account addresses to watch events for MethodAddrs map[string]bool // User-input list of account addresses to poll methods for diff --git a/pkg/omni/converter/converter.go b/pkg/omni/converter/converter.go index 9ab38468..70464337 100644 --- a/pkg/omni/converter/converter.go +++ b/pkg/omni/converter/converter.go @@ -33,7 +33,7 @@ import ( // Converter is used to convert watched event logs to // custom logs containing event input name => value maps type Converter interface { - Convert(watchedEvent core.WatchedEvent, event *types.Event) error + Convert(watchedEvent core.WatchedEvent, event types.Event) (*types.Log, error) Update(info *contract.Contract) } @@ -57,7 +57,7 @@ func (c *converter) CheckInfo() *contract.Contract { } // Convert the given watched event log into a types.Log for the given event -func (c *converter) Convert(watchedEvent core.WatchedEvent, event *types.Event) error { +func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (*types.Log, error) { contract := bind.NewBoundContract(common.HexToAddress(c.contractInfo.Address), c.contractInfo.ParsedAbi, nil, nil, nil) values := make(map[string]interface{}) @@ -69,7 +69,7 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event *types.Event) log := helpers.ConvertToLog(watchedEvent) err := contract.UnpackLogIntoMap(values, event.Name, log) if err != nil { - return err + return nil, err } strValues := make(map[string]string, len(values)) @@ -95,22 +95,22 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event *types.Event) case bool: strValues[fieldName] = strconv.FormatBool(input.(bool)) default: - return errors.New("error: unhandled abi type") + return nil, errors.New("error: unhandled abi type") } } // Only hold onto logs that pass our address filter, if any - // Persist log here and don't hold onto it if c.contractInfo.PassesEventFilter(strValues) { - eventLog := types.Log{ + eventLog := &types.Log{ + Event: event, Id: watchedEvent.LogID, Values: strValues, Block: watchedEvent.BlockNumber, Tx: watchedEvent.TxHash, } - event.Logs[watchedEvent.LogID] = eventLog + return eventLog, err } - return nil + return nil, nil } diff --git a/pkg/omni/converter/converter_test.go b/pkg/omni/converter/converter_test.go index d73cb240..9ef7b3b0 100644 --- a/pkg/omni/converter/converter_test.go +++ b/pkg/omni/converter/converter_test.go @@ -76,24 +76,24 @@ var _ = Describe("Converter", func() { Expect(err).ToNot(HaveOccurred()) c := converter.NewConverter(info) - err = c.Convert(mockEvent, event) + log, err := c.Convert(mockEvent, event) Expect(err).ToNot(HaveOccurred()) from := common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21") to := common.HexToAddress("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391") value := helpers.BigFromString("1097077688018008265106216665536940668749033598146") - v := event.Logs[1].Values["value"] + v := log.Values["value"] - Expect(event.Logs[1].Values["to"]).To(Equal(to.String())) - Expect(event.Logs[1].Values["from"]).To(Equal(from.String())) + Expect(log.Values["to"]).To(Equal(to.String())) + Expect(log.Values["from"]).To(Equal(from.String())) Expect(v).To(Equal(value.String())) }) It("Fails with an empty contract", func() { event := info.Events["Transfer"] c := converter.NewConverter(&contract.Contract{}) - err = c.Convert(mockEvent, event) + _, err = c.Convert(mockEvent, event) Expect(err).To(HaveOccurred()) }) }) diff --git a/pkg/omni/helpers/mocks/parser.go b/pkg/omni/helpers/mocks/parser.go index 54a61229..d6ca7ee0 100644 --- a/pkg/omni/helpers/mocks/parser.go +++ b/pkg/omni/helpers/mocks/parser.go @@ -57,8 +57,8 @@ func (p *parser) Parse() error { // Returns wanted methods, if they meet the criteria, as map of types.Methods // Only returns specified methods -func (p *parser) GetMethods(wanted []string) map[string]*types.Method { - addrMethods := map[string]*types.Method{} +func (p *parser) GetMethods(wanted []string) map[string]types.Method { + addrMethods := map[string]types.Method{} for _, m := range p.parsedAbi.Methods { // Only return methods that have less than 3 inputs, 1 output, and wanted @@ -83,8 +83,8 @@ func (p *parser) GetMethods(wanted []string) map[string]*types.Method { // Returns wanted events as map of types.Events // If no events are specified, all events are returned -func (p *parser) GetEvents(wanted []string) map[string]*types.Event { - events := map[string]*types.Event{} +func (p *parser) GetEvents(wanted []string) map[string]types.Event { + events := map[string]types.Event{} for _, e := range p.parsedAbi.Events { if len(wanted) == 0 || stringInSlice(wanted, e.Name) { diff --git a/pkg/omni/helpers/test_helpers/database.go b/pkg/omni/helpers/test_helpers/database.go index 29a9bf5a..6c04dd71 100644 --- a/pkg/omni/helpers/test_helpers/database.go +++ b/pkg/omni/helpers/test_helpers/database.go @@ -41,7 +41,7 @@ var ExpectedTransferFilter = filters.LogFilter{ Name: "Transfer", Address: constants.TusdContractAddress, ToBlock: -1, - FromBlock: 5197514, + FromBlock: 6194634, Topics: core.Topics{constants.TransferEvent.Signature()}, } @@ -49,7 +49,7 @@ var ExpectedApprovalFilter = filters.LogFilter{ Name: "Approval", Address: constants.TusdContractAddress, ToBlock: -1, - FromBlock: 5197514, + FromBlock: 6194634, Topics: core.Topics{constants.ApprovalEvent.Signature()}, } @@ -57,8 +57,6 @@ type TransferLog struct { Id int64 `db:"id"` VulvanizeLogId int64 `db:"vulcanize_log_id"` TokenName string `db:"token_name"` - TokenAddress string `db:"token_address"` - EventName string `db:"event_name"` Block int64 `db:"block"` Tx string `db:"tx"` From string `db:"from_"` @@ -66,6 +64,14 @@ type TransferLog struct { Value string `db:"value_"` } +type BalanceOf struct { + Id int64 `db:"id"` + TokenName string `db:"token_name"` + Block int64 `db:"block"` + Address string `db:"who_"` + Balance string `db:"returned"` +} + func SetupBC() core.BlockChain { infuraIPC := "https://mainnet.infura.io/v3/b09888c1113640cc9ab42750ce750c05" rawRpcClient, err := rpc.Dial(infuraIPC) @@ -139,7 +145,7 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract Address: constants.TusdContractAddress, Abi: p.Abi(), ParsedAbi: p.ParsedAbi(), - StartingBlock: 5197514, + StartingBlock: 6194634, LastBlock: 6507323, Events: p.GetEvents(wantedEvents), Methods: p.GetMethods(wantedMethods), diff --git a/pkg/omni/parser/parser.go b/pkg/omni/parser/parser.go index 7022d79d..bf49c1c5 100644 --- a/pkg/omni/parser/parser.go +++ b/pkg/omni/parser/parser.go @@ -29,9 +29,9 @@ type Parser interface { Parse(contractAddr string) error Abi() string ParsedAbi() abi.ABI - GetMethods(wanted []string) map[string]*types.Method - GetAddrMethods(wanted []string) map[string]*types.Method - GetEvents(wanted []string) map[string]*types.Event + GetMethods(wanted []string) map[string]types.Method + GetAddrMethods(wanted []string) map[string]types.Method + GetEvents(wanted []string) map[string]types.Event } type parser struct { @@ -73,8 +73,8 @@ func (p *parser) Parse(contractAddr string) error { // Returns wanted methods, if they meet the criteria, as map of types.Methods // Empty wanted array => all methods that fit are returned // Nil wanted array => no events are returned -func (p *parser) GetAddrMethods(wanted []string) map[string]*types.Method { - addrMethods := map[string]*types.Method{} +func (p *parser) GetAddrMethods(wanted []string) map[string]types.Method { + addrMethods := map[string]types.Method{} if wanted == nil { return addrMethods } @@ -103,8 +103,8 @@ func (p *parser) GetAddrMethods(wanted []string) map[string]*types.Method { // Returns wanted methods as map of types.Methods // Empty wanted array => all events are returned // Nil wanted array => no events are returned -func (p *parser) GetMethods(wanted []string) map[string]*types.Method { - methods := map[string]*types.Method{} +func (p *parser) GetMethods(wanted []string) map[string]types.Method { + methods := map[string]types.Method{} if wanted == nil { return methods } @@ -122,8 +122,8 @@ func (p *parser) GetMethods(wanted []string) map[string]*types.Method { // Returns wanted events as map of types.Events // Empty wanted array => all events are returned // Nil wanted array => no events are returned -func (p *parser) GetEvents(wanted []string) map[string]*types.Event { - events := map[string]*types.Event{} +func (p *parser) GetEvents(wanted []string) map[string]types.Event { + events := map[string]types.Event{} if wanted == nil { return events } diff --git a/pkg/omni/parser/parser_test.go b/pkg/omni/parser/parser_test.go index 14c4c803..aba71c93 100644 --- a/pkg/omni/parser/parser_test.go +++ b/pkg/omni/parser/parser_test.go @@ -52,8 +52,8 @@ var _ = Describe("Parser", func() { Expect(ok).To(Equal(false)) m, ok := methods["balanceOf"] Expect(ok).To(Equal(true)) - Expect(len(m.Inputs)).To(Equal(1)) - Expect(len(m.Outputs)).To(Equal(1)) + Expect(len(m.Args)).To(Equal(1)) + Expect(len(m.Return)).To(Equal(1)) events := mp.GetEvents([]string{"Transfer"}) _, ok = events["Mint"] @@ -130,16 +130,16 @@ var _ = Describe("Parser", func() { m, ok := selectMethods["balanceOf"] Expect(ok).To(Equal(true)) - abiTy := m.Inputs[0].Type.T + abiTy := m.Args[0].Type.T Expect(abiTy).To(Equal(abi.AddressTy)) - pgTy := m.Inputs[0].PgType + pgTy := m.Args[0].PgType Expect(pgTy).To(Equal("CHARACTER VARYING(66)")) - abiTy = m.Outputs[0].Type.T + abiTy = m.Return[0].Type.T Expect(abiTy).To(Equal(abi.UintTy)) - pgTy = m.Outputs[0].PgType + pgTy = m.Return[0].PgType Expect(pgTy).To(Equal("DECIMAL")) _, ok = selectMethods["totalSupply"] diff --git a/pkg/omni/poller/poller.go b/pkg/omni/poller/poller.go index a75b3601..71ee2d49 100644 --- a/pkg/omni/poller/poller.go +++ b/pkg/omni/poller/poller.go @@ -18,37 +18,44 @@ package poller import ( "errors" + "fmt" + "math/big" + "strconv" + "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/pkg/omni/constants" "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/omni/contract" + "github.com/vulcanize/vulcanizedb/pkg/omni/repository" "github.com/vulcanize/vulcanizedb/pkg/omni/types" ) type Poller interface { - PollContract(con *contract.Contract) error - PollMethod(contractAbi, contractAddress, method string, methodArgs []interface{}, result interface{}, blockNumber int64) error + PollContract(con contract.Contract) error + FetchContractData(contractAbi, contractAddress, method string, methodArgs []interface{}, result interface{}, blockNumber int64) error } type poller struct { + repository.MethodDatastore bc core.BlockChain - contract *contract.Contract + contract contract.Contract } -func NewPoller(blockChain core.BlockChain) *poller { +func NewPoller(blockChain core.BlockChain, db *postgres.DB) *poller { return &poller{ - bc: blockChain, + MethodDatastore: repository.NewMethodDatastore(db), + bc: blockChain, } } // Used to call contract's methods found in abi using list of contract-related addresses -func (p *poller) PollContract(con *contract.Contract) error { +func (p *poller) PollContract(con contract.Contract) error { p.contract = con // Iterate over each of the contracts methods for _, m := range con.Methods { - switch len(m.Inputs) { + switch len(m.Args) { case 0: if err := p.pollNoArg(m); err != nil { return err @@ -71,76 +78,122 @@ func (p *poller) PollContract(con *contract.Contract) error { } // Poll methods that take no arguments -func (p *poller) pollNoArg(m *types.Method) error { - result := &types.Result{ - Inputs: nil, - Outputs: map[int64]interface{}{}, - PgType: m.Outputs[0].PgType, +func (p *poller) pollNoArg(m types.Method) error { + result := types.Result{ + Method: m, + Inputs: nil, + PgType: m.Return[0].PgType, } for i := p.contract.StartingBlock; i <= p.contract.LastBlock; i++ { - var res interface{} - err := p.bc.FetchContractData(p.contract.Abi, p.contract.Address, m.Name, result.Inputs, &res, i) + var out interface{} + err := p.bc.FetchContractData(p.contract.Abi, p.contract.Address, m.Name, nil, &out, i) + if err != nil { + return errors.New(fmt.Sprintf("poller error calling 0 argument method\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", i, m.Name, p.contract.Address, err)) + } + + strOut, err := stringify(out) if err != nil { return err } - result.Outputs[i] = res - } - // Persist results now instead of holding onto them - m.Results = append(m.Results, result) + result.Output = strOut + result.Block = i + + // Persist result immediately + err = p.PersistResult(result, p.contract.Address, p.contract.Name) + if err != nil { + return errors.New(fmt.Sprintf("poller error persisting 0 argument method result\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", i, m.Name, p.contract.Address, err)) + } + } return nil } // Use token holder address to poll methods that take 1 address argument (e.g. balanceOf) -func (p *poller) pollSingleArg(m *types.Method) error { - for addr := range p.contract.TknHolderAddrs { - result := &types.Result{ - Inputs: make([]interface{}, 1), - Outputs: map[int64]interface{}{}, - PgType: m.Outputs[0].PgType, - } - result.Inputs[0] = common.HexToAddress(addr) +func (p *poller) pollSingleArg(m types.Method) error { + result := types.Result{ + Method: m, + Inputs: make([]interface{}, 1), + PgType: m.Return[0].PgType, + } + for addr := range p.contract.TknHolderAddrs { for i := p.contract.StartingBlock; i <= p.contract.LastBlock; i++ { - var res interface{} - err := p.bc.FetchContractData(constants.TusdAbiString, p.contract.Address, m.Name, result.Inputs, &res, i) + hashArgs := []common.Address{common.HexToAddress(addr)} + in := make([]interface{}, len(hashArgs)) + strIn := make([]interface{}, len(hashArgs)) + for i, s := range hashArgs { + in[i] = s + strIn[i] = s.String() + } + + var out interface{} + err := p.bc.FetchContractData(p.contract.Abi, p.contract.Address, m.Name, in, &out, i) + if err != nil { + return errors.New(fmt.Sprintf("poller error calling 1 argument method\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", i, m.Name, p.contract.Address, err)) + } + + strOut, err := stringify(out) if err != nil { return err } - result.Outputs[i] = res - } - m.Results = append(m.Results, result) + result.Output = strOut + result.Block = i + result.Inputs = strIn + + err = p.PersistResult(result, p.contract.Address, p.contract.Name) + if err != nil { + return errors.New(fmt.Sprintf("poller error persisting 1 argument method result\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", i, m.Name, p.contract.Address, err)) + } + } } return nil } // Use token holder address to poll methods that take 2 address arguments (e.g. allowance) -func (p *poller) pollDoubleArg(m *types.Method) error { +func (p *poller) pollDoubleArg(m types.Method) error { // For a large block range and address list this will take a really, really long time- maybe we should only do 1 arg methods + result := types.Result{ + Method: m, + Inputs: make([]interface{}, 2), + PgType: m.Return[0].PgType, + } + for addr1 := range p.contract.TknHolderAddrs { for addr2 := range p.contract.TknHolderAddrs { - result := &types.Result{ - Inputs: make([]interface{}, 2), - Outputs: map[int64]interface{}{}, - PgType: m.Outputs[0].PgType, - } - result.Inputs[0] = common.HexToAddress(addr1) - result.Inputs[1] = common.HexToAddress(addr2) - for i := p.contract.StartingBlock; i <= p.contract.LastBlock; i++ { - var res interface{} - err := p.bc.FetchContractData(p.contract.Abi, p.contract.Address, m.Name, result.Inputs, &res, i) + hashArgs := []common.Address{common.HexToAddress(addr1), common.HexToAddress(addr2)} + in := make([]interface{}, len(hashArgs)) + strIn := make([]interface{}, len(hashArgs)) + for i, s := range hashArgs { + in[i] = s + strIn[i] = s.String() + } + + var out interface{} + err := p.bc.FetchContractData(p.contract.Abi, p.contract.Address, m.Name, in, &out, i) + if err != nil { + return errors.New(fmt.Sprintf("poller error calling 2 argument method\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", i, m.Name, p.contract.Address, err)) + } + + strOut, err := stringify(out) if err != nil { return err } - result.Outputs[i] = res + + result.Output = strOut + result.Block = i + result.Inputs = strIn + + err = p.PersistResult(result, p.contract.Address, p.contract.Name) + if err != nil { + return errors.New(fmt.Sprintf("poller error persisting 2 argument method result\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", i, m.Name, p.contract.Address, err)) + } } - m.Results = append(m.Results, result) } } @@ -148,6 +201,29 @@ func (p *poller) pollDoubleArg(m *types.Method) error { } // This is just a wrapper around the poller blockchain's FetchContractData method -func (p *poller) PollMethod(contractAbi, contractAddress, method string, methodArgs []interface{}, result interface{}, blockNumber int64) error { +func (p *poller) FetchContractData(contractAbi, contractAddress, method string, methodArgs []interface{}, result interface{}, blockNumber int64) error { return p.bc.FetchContractData(contractAbi, contractAddress, method, methodArgs, result, blockNumber) } + +func stringify(input interface{}) (string, error) { + switch input.(type) { + case *big.Int: + var b *big.Int + b = input.(*big.Int) + return b.String(), nil + case common.Address: + var a common.Address + a = input.(common.Address) + return a.String(), nil + case common.Hash: + var h common.Hash + h = input.(common.Hash) + return h.String(), nil + case string: + return input.(string), nil + case bool: + return strconv.FormatBool(input.(bool)), nil + default: + return "", errors.New("error: unhandled return type") + } +} diff --git a/pkg/omni/poller/poller_test.go b/pkg/omni/poller/poller_test.go index 19c346c0..a71f1f27 100644 --- a/pkg/omni/poller/poller_test.go +++ b/pkg/omni/poller/poller_test.go @@ -17,12 +17,13 @@ package poller_test import ( - "github.com/ethereum/go-ethereum/common" + "fmt" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/omni/helpers" - "math/big" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/omni/constants" "github.com/vulcanize/vulcanizedb/pkg/omni/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/helpers/test_helpers" @@ -33,13 +34,20 @@ var _ = Describe("Poller", func() { var p poller.Poller var con *contract.Contract + var db *postgres.DB + var bc core.BlockChain BeforeEach(func() { - p = poller.NewPoller(test_helpers.SetupBC()) + db, bc = test_helpers.SetupDBandBC() + p = poller.NewPoller(bc, db) + }) + + AfterEach(func() { + test_helpers.TearDown(db) }) Describe("PollContract", func() { - It("Polls contract methods using token holder address list", func() { + It("Polls specified contract methods using contract's token holder address list", func() { con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"}) Expect(con.Abi).To(Equal(constants.TusdAbiString)) con.StartingBlock = 6707322 @@ -49,36 +57,56 @@ var _ = Describe("Poller", func() { "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true, } - err := p.PollContract(con) + err := p.PollContract(*con) Expect(err).ToNot(HaveOccurred()) - Expect(len(con.Methods["balanceOf"].Results)).To(Equal(2)) - res1 := con.Methods["balanceOf"].Results[0] - res2 := con.Methods["balanceOf"].Results[1] - expectedRes11 := helpers.BigFromString("66386309548896882859581786") - expectedRes12 := helpers.BigFromString("66386309548896882859581786") - expectedRes21 := helpers.BigFromString("17982350181394112023885864") - expectedRes22 := helpers.BigFromString("17982350181394112023885864") - if res1.Inputs[0].(common.Address).String() == "0xfE9e8709d3215310075d67E3ed32A380CCf451C8" { - Expect(res2.Inputs[0].(common.Address).String()).To(Equal("0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE")) - Expect(res1.Outputs[6707322].(*big.Int).String()).To(Equal(expectedRes11.String())) - Expect(res1.Outputs[6707323].(*big.Int).String()).To(Equal(expectedRes12.String())) - Expect(res2.Outputs[6707322].(*big.Int).String()).To(Equal(expectedRes21.String())) - Expect(res2.Outputs[6707323].(*big.Int).String()).To(Equal(expectedRes22.String())) - } else { - Expect(res2.Inputs[0].(common.Address).String()).To(Equal("0xfE9e8709d3215310075d67E3ed32A380CCf451C8")) - Expect(res2.Outputs[6707322].(*big.Int).String()).To(Equal(expectedRes11.String())) - Expect(res2.Outputs[6707323].(*big.Int).String()).To(Equal(expectedRes12.String())) - Expect(res1.Outputs[6707322].(*big.Int).String()).To(Equal(expectedRes21.String())) - Expect(res1.Outputs[6707323].(*big.Int).String()).To(Equal(expectedRes22.String())) + scanStruct := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("66386309548896882859581786")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("66386309548896882859581786")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("17982350181394112023885864")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("17982350181394112023885864")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + }) + + It("Does not poll and persist any methods if none are specified", func() { + con = test_helpers.SetupTusdContract(nil, nil) + Expect(con.Abi).To(Equal(constants.TusdAbiString)) + con.StartingBlock = 6707322 + con.LastBlock = 6707323 + con.TknHolderAddrs = map[string]bool{ + "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true, + "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true, } + + err := p.PollContract(*con) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).To(HaveOccurred()) }) }) Describe("PollMethod", func() { It("Polls a single contract method", func() { var name = new(string) - err := p.PollMethod(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) + err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) Expect(err).ToNot(HaveOccurred()) Expect(*name).To(Equal("TrueUSD")) }) diff --git a/pkg/omni/repository/event_repository.go b/pkg/omni/repository/event_repository.go index b6208f84..e38672db 100644 --- a/pkg/omni/repository/event_repository.go +++ b/pkg/omni/repository/event_repository.go @@ -22,14 +22,13 @@ import ( "strings" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/omni/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/types" ) // Event datastore is used to persist event data into custom tables type EventDatastore interface { - PersistContractEvents(con *contract.Contract) error - CreateEventTable(contractName string, event *types.Event) (bool, error) + PersistLog(event types.Log, contractAddr, contractName string) error + CreateEventTable(contractName string, event types.Log) (bool, error) CreateContractSchema(contractName string) (bool, error) } @@ -44,93 +43,71 @@ func NewEventDataStore(db *postgres.DB) *eventDatastore { } } -// Creates a schema for the contract -// Creates tables for the watched contract events -// Persists converted event log data into these custom tables -// Edit this method to accept a single event -func (d *eventDatastore) PersistContractEvents(con *contract.Contract) error { - _, err := d.CreateContractSchema(con.Address) +// Creates a schema for the contract if needed +// Creates table for the watched contract event if needed +// Persists converted event log data into this custom table +func (d *eventDatastore) PersistLog(event types.Log, contractAddr, contractName string) error { + _, err := d.CreateContractSchema(contractAddr) if err != nil { return err } - for eventName := range con.Filters { - event := con.Events[eventName] - - // Move to next event if there are no logs to process - if len(event.Logs) == 0 { - break - } - - // Create a pg table for the event if one does not already exist - _, err = d.CreateEventTable(con.Address, event) - if err != nil { - return err - } - - // Persist event log data in this table - err = d.persistLogs(eventName, con) - if err != nil { - return err - } - - // Clear logs when we are done using them - event.Logs = map[int64]types.Log{} + _, err = d.CreateEventTable(contractAddr, event) + if err != nil { + return err } - return nil + return d.persistLog(event, contractAddr, contractName) } // Creates a custom postgres command to persist logs for the given event -func (d *eventDatastore) persistLogs(eventName string, con *contract.Contract) error { - for id, log := range con.Events[eventName].Logs { - // Begin postgres string - pgStr := fmt.Sprintf("INSERT INTO c%s.%s ", strings.ToLower(con.Address), strings.ToLower(eventName)) - pgStr = pgStr + "(vulcanize_log_id, token_name, block, tx" +func (d *eventDatastore) persistLog(event types.Log, contractAddr, contractName string) error { + // Begin postgres string + pgStr := fmt.Sprintf("INSERT INTO c%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name)) + pgStr = pgStr + "(vulcanize_log_id, token_name, block, tx" - // Pack the corresponding variables in a slice - var data []interface{} - data = append(data, - id, - con.Name, - log.Block, - log.Tx) + // Pack the corresponding variables in a slice + var data []interface{} + data = append(data, + event.Id, + contractName, + event.Block, + event.Tx) - // Iterate over name-value pairs in the log adding - // names to the string and pushing values to the slice - counter := 0 // Keep track of number of inputs - for inputName, input := range log.Values { - counter += 1 - pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words - data = append(data, input) - } + // Iterate over name-value pairs in the log adding + // names to the string and pushing values to the slice + counter := 0 // Keep track of number of inputs + for inputName, input := range event.Values { + counter += 1 + pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words + data = append(data, input) + } - // Finish off the string and execute the command using the packed data - // For each input entry we created we add its postgres command variable to the string - pgStr = pgStr + ") VALUES ($1, $2, $3, $4" - for i := 0; i < counter; i++ { - pgStr = pgStr + fmt.Sprintf(", $%d", i+5) - } - pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING" + // Finish off the string and execute the command using the packed data + // For each input entry we created we add its postgres command variable to the string + pgStr = pgStr + ") VALUES ($1, $2, $3, $4" + for i := 0; i < counter; i++ { + pgStr = pgStr + fmt.Sprintf(", $%d", i+5) + } + pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING" - _, err := d.DB.Exec(pgStr, data...) - if err != nil { - return err - } + _, err := d.DB.Exec(pgStr, data...) + if err != nil { + return err } return nil } // Checks for event table and creates it if it does not already exist -func (d *eventDatastore) CreateEventTable(contractName string, event *types.Event) (bool, error) { - tableExists, err := d.checkForTable(contractName, event.Name) +func (d *eventDatastore) CreateEventTable(contractAddr string, event types.Log) (bool, error) { + tableExists, err := d.checkForTable(contractAddr, event.Name) if err != nil { return false, err } if !tableExists { - err = d.newEventTable(contractName, event) + err = d.newEventTable(contractAddr, event) if err != nil { return false, err } @@ -140,9 +117,9 @@ func (d *eventDatastore) CreateEventTable(contractName string, event *types.Even } // Creates a table for the given contract and event -func (d *eventDatastore) newEventTable(contractAddr string, event *types.Event) error { +func (d *eventDatastore) newEventTable(contractAddr string, event types.Log) error { // Begin pg string - pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s ", strings.ToLower(contractAddr), strings.ToLower(event.Name)) + pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name)) pgStr = pgStr + "(id SERIAL, vulcanize_log_id INTEGER NOT NULL UNIQUE, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL, tx CHARACTER VARYING(66) NOT NULL," // Iterate over event fields, using their name and pgType to grow the string @@ -158,7 +135,7 @@ func (d *eventDatastore) newEventTable(contractAddr string, event *types.Event) // Checks if a table already exists for the given contract and event func (d *eventDatastore) checkForTable(contractAddr string, eventName string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s')", strings.ToLower(contractAddr), strings.ToLower(eventName)) + pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s_event')", strings.ToLower(contractAddr), strings.ToLower(eventName)) var exists bool err := d.DB.Get(&exists, pgStr) @@ -171,6 +148,7 @@ func (d *eventDatastore) CreateContractSchema(contractAddr string) (bool, error) if contractAddr == "" { return false, errors.New("error: no contract address specified") } + schemaExists, err := d.checkForSchema(contractAddr) if err != nil { return false, err diff --git a/pkg/omni/repository/event_repository_test.go b/pkg/omni/repository/event_repository_test.go index b5c343f9..16d4829a 100644 --- a/pkg/omni/repository/event_repository_test.go +++ b/pkg/omni/repository/event_repository_test.go @@ -18,8 +18,6 @@ package repository_test import ( "fmt" - "math/rand" - "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -51,11 +49,11 @@ var _ = Describe("Repository", func() { var db *postgres.DB var dataStore repository.EventDatastore var err error + var log *types.Log var con *contract.Contract var vulcanizeLogId int64 var wantedEvents = []string{"Transfer"} - var event *types.Event - rand.Seed(time.Now().UnixNano()) + var event types.Event BeforeEach(func() { db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, []string{}) @@ -66,7 +64,7 @@ var _ = Describe("Repository", func() { Expect(err).ToNot(HaveOccurred()) c := converter.NewConverter(con) - err = c.Convert(mockEvent, event) + log, err = c.Convert(mockEvent, event) Expect(err).ToNot(HaveOccurred()) dataStore = repository.NewEventDataStore(db) @@ -88,25 +86,25 @@ var _ = Describe("Repository", func() { }) }) - Describe("CreateContractTable", func() { + Describe("CreateEventTable", func() { It("Creates table if it doesn't exist", func() { created, err := dataStore.CreateContractSchema(con.Address) Expect(err).ToNot(HaveOccurred()) Expect(created).To(Equal(true)) - created, err = dataStore.CreateEventTable(con.Address, event) + created, err = dataStore.CreateEventTable(con.Address, *log) Expect(err).ToNot(HaveOccurred()) Expect(created).To(Equal(true)) - created, err = dataStore.CreateEventTable(con.Address, event) + created, err = dataStore.CreateEventTable(con.Address, *log) Expect(err).ToNot(HaveOccurred()) Expect(created).To(Equal(false)) }) }) - Describe("PersistContractEvents", func() { - It("Persists contract event values into custom tables, adding any addresses to a growing list of contract associated addresses", func() { - err = dataStore.PersistContractEvents(con) + Describe("PersistLog", func() { + It("Persists contract event log values into custom tables, adding any addresses to a growing list of contract associated addresses", func() { + err = dataStore.PersistLog(*log, con.Address, con.Name) Expect(err).ToNot(HaveOccurred()) b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"] @@ -117,9 +115,9 @@ var _ = Describe("Repository", func() { Expect(ok).To(Equal(true)) Expect(b).To(Equal(true)) - log := test_helpers.TransferLog{} + scanLog := test_helpers.TransferLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.Transfer", constants.TusdContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog) expectedLog := test_helpers.TransferLog{ Id: 1, VulvanizeLogId: vulcanizeLogId, @@ -130,17 +128,17 @@ var _ = Describe("Repository", func() { To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391", Value: "1097077688018008265106216665536940668749033598146", } - Expect(log).To(Equal(expectedLog)) + Expect(scanLog).To(Equal(expectedLog)) }) It("Doesn't persist duplicate event logs", func() { // Perist once - err = dataStore.PersistContractEvents(con) + err = dataStore.PersistLog(*log, con.Address, con.Name) Expect(err).ToNot(HaveOccurred()) - log := test_helpers.TransferLog{} + scanLog := test_helpers.TransferLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.Transfer", constants.TusdContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog) expectedLog := test_helpers.TransferLog{ Id: 1, VulvanizeLogId: vulcanizeLogId, @@ -152,21 +150,21 @@ var _ = Describe("Repository", func() { Value: "1097077688018008265106216665536940668749033598146", } - Expect(log).To(Equal(expectedLog)) + Expect(scanLog).To(Equal(expectedLog)) - // Attempt to persist the same contract again - err = dataStore.PersistContractEvents(con) + // Attempt to persist the same log again + err = dataStore.PersistLog(*log, con.Address, con.Name) Expect(err).ToNot(HaveOccurred()) // Show that no new logs were entered var count int - err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM c%s.Transfer", constants.TusdContractAddress)) + err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM c%s.transfer_event", constants.TusdContractAddress)) Expect(err).ToNot(HaveOccurred()) Expect(count).To(Equal(1)) }) - It("Fails with empty contract", func() { - err = dataStore.PersistContractEvents(&contract.Contract{}) + It("Fails with empty log", func() { + err = dataStore.PersistLog(types.Log{}, con.Address, con.Name) Expect(err).To(HaveOccurred()) }) }) diff --git a/pkg/omni/repository/method_repository.go b/pkg/omni/repository/method_repository.go index 7a9f5eca..24af230e 100644 --- a/pkg/omni/repository/method_repository.go +++ b/pkg/omni/repository/method_repository.go @@ -17,18 +17,18 @@ package repository import ( + "errors" "fmt" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/omni/contract" - "github.com/vulcanize/vulcanizedb/pkg/omni/types" "strings" + + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/omni/types" ) type MethodDatastore interface { - PersistContractMethods(con *contract.Contract) error - PersistResults(methodName string, con *contract.Contract) error - CreateMethodTable(contractName string, method *types.Method) error - CreateContractSchema(contractName string) error + PersistResult(method types.Result, contractAddr, contractName string) error + CreateMethodTable(contractAddr string, method types.Result) (bool, error) + CreateContractSchema(contractAddr string) (bool, error) } type methodDatastore struct { @@ -42,68 +42,96 @@ func NewMethodDatastore(db *postgres.DB) *methodDatastore { } } -func (d *methodDatastore) PersistContractMethods(con *contract.Contract) error { - err := d.CreateContractSchema(con.Name) +func (d *methodDatastore) PersistResult(method types.Result, contractAddr, contractName string) error { + if len(method.Args) != len(method.Inputs) { + return errors.New("error: given number of inputs does not match number of method arguments") + } + if len(method.Return) != 1 { + return errors.New("error: given number of outputs does not match number of method return values") + } + + _, err := d.CreateContractSchema(contractAddr) if err != nil { return err } - for _, method := range con.Methods { - err = d.CreateMethodTable(con.Name, method) - if err != nil { - return err - } - - //TODO: Persist method data - + _, err = d.CreateMethodTable(contractAddr, method) + if err != nil { + return err } - return nil + return d.persistResult(method, contractAddr, contractName) } // Creates a custom postgres command to persist logs for the given event -func (d *methodDatastore) PersistResults(methodName string, con *contract.Contract) error { - for _, result := range con.Methods[methodName].Results { - println(result) - //TODO: Persist result data +func (d *methodDatastore) persistResult(method types.Result, contractAddr, contractName string) error { + // Begin postgres string + pgStr := fmt.Sprintf("INSERT INTO c%s.%s_method ", strings.ToLower(contractAddr), strings.ToLower(method.Name)) + pgStr = pgStr + "(token_name, block" + + // Pack the corresponding variables in a slice + var data []interface{} + data = append(data, + contractName, + method.Block) + + // Iterate over method args and return value, adding names + // to the string and pushing values to the slice + counter := 0 // Keep track of number of inputs + for i, arg := range method.Args { + counter += 1 + pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(arg.Name)) // Add underscore after to avoid any collisions with reserved pg words + data = append(data, method.Inputs[i]) + } + + counter += 1 + pgStr = pgStr + ", returned) VALUES ($1, $2" + data = append(data, method.Output) + + // For each input entry we created we add its postgres command variable to the string + for i := 0; i < counter; i++ { + pgStr = pgStr + fmt.Sprintf(", $%d", i+3) + } + pgStr = pgStr + ")" + + _, err := d.DB.Exec(pgStr, data...) + if err != nil { + return err } return nil } // Checks for event table and creates it if it does not already exist -func (d *methodDatastore) CreateMethodTable(contractAddr string, method *types.Method) error { +func (d *methodDatastore) CreateMethodTable(contractAddr string, method types.Result) (bool, error) { tableExists, err := d.checkForTable(contractAddr, method.Name) if err != nil { - return err + return false, err } if !tableExists { err = d.newMethodTable(contractAddr, method) if err != nil { - return err + return false, err } } - return nil + return !tableExists, nil } // Creates a table for the given contract and event -func (d *methodDatastore) newMethodTable(contractAddr string, method *types.Method) error { +func (d *methodDatastore) newMethodTable(contractAddr string, method types.Result) error { // Begin pg string - pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS _%s.%s ", strings.ToLower(contractAddr), strings.ToLower(method.Name)) + pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s_method ", strings.ToLower(contractAddr), strings.ToLower(method.Name)) pgStr = pgStr + "(id SERIAL, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL," // Iterate over method inputs and outputs, using their name and pgType to grow the string - for _, input := range method.Inputs { - pgStr = pgStr + fmt.Sprintf("%s_ %s NOT NULL,", strings.ToLower(input.Name), input.PgType) + for _, arg := range method.Args { + pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(arg.Name), arg.PgType) } - for _, output := range method.Outputs { - pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(output.Name), output.PgType) - } + pgStr = pgStr + fmt.Sprintf(" returned %s NOT NULL)", method.Return[0].PgType) - pgStr = pgStr[:len(pgStr)-1] + ")" _, err := d.DB.Exec(pgStr) return err @@ -111,7 +139,7 @@ func (d *methodDatastore) newMethodTable(contractAddr string, method *types.Meth // Checks if a table already exists for the given contract and event func (d *methodDatastore) checkForTable(contractAddr string, methodName string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '_%s' AND table_name = '%s')", strings.ToLower(contractAddr), strings.ToLower(methodName)) + pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s_method')", strings.ToLower(contractAddr), strings.ToLower(methodName)) var exists bool err := d.DB.Get(&exists, pgStr) @@ -119,32 +147,36 @@ func (d *methodDatastore) checkForTable(contractAddr string, methodName string) } // Checks for contract schema and creates it if it does not already exist -func (d *methodDatastore) CreateContractSchema(contractName string) error { - schemaExists, err := d.checkForSchema(contractName) +func (d *methodDatastore) CreateContractSchema(contractAddr string) (bool, error) { + if contractAddr == "" { + return false, errors.New("error: no contract address specified") + } + + schemaExists, err := d.checkForSchema(contractAddr) if err != nil { - return err + return false, err } if !schemaExists { - err = d.newContractSchema(contractName) + err = d.newContractSchema(contractAddr) if err != nil { - return err + return false, err } } - return nil + return !schemaExists, nil } // Creates a schema for the given contract func (d *methodDatastore) newContractSchema(contractAddr string) error { - _, err := d.DB.Exec("CREATE SCHEMA IF NOT EXISTS _" + strings.ToLower(contractAddr)) + _, err := d.DB.Exec("CREATE SCHEMA IF NOT EXISTS c" + strings.ToLower(contractAddr)) return err } // Checks if a schema already exists for the given contract func (d *methodDatastore) checkForSchema(contractAddr string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '_%s')", strings.ToLower(contractAddr)) + pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'c%s')", strings.ToLower(contractAddr)) var exists bool err := d.DB.Get(&exists, pgStr) diff --git a/pkg/omni/repository/method_repository_test.go b/pkg/omni/repository/method_repository_test.go index 49683f17..b153ef75 100644 --- a/pkg/omni/repository/method_repository_test.go +++ b/pkg/omni/repository/method_repository_test.go @@ -14,4 +14,98 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package repository +package repository_test + +import ( + "fmt" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/omni/constants" + "github.com/vulcanize/vulcanizedb/pkg/omni/contract" + "github.com/vulcanize/vulcanizedb/pkg/omni/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/repository" + "github.com/vulcanize/vulcanizedb/pkg/omni/types" +) + +var _ = Describe("Repository", func() { + var db *postgres.DB + var dataStore repository.MethodDatastore + var con *contract.Contract + var err error + var mockResult types.Result + + BeforeEach(func() { + con = test_helpers.SetupTusdContract([]string{}, []string{"balanceOf"}) + method := con.Methods["balanceOf"] + mockResult = types.Result{ + Method: method, + PgType: method.Return[0].PgType, + Inputs: make([]interface{}, 1), + Output: new(interface{}), + Block: 6707323, + } + mockResult.Inputs[0] = "0xfE9e8709d3215310075d67E3ed32A380CCf451C8" + mockResult.Output = "66386309548896882859581786" + db, _ = test_helpers.SetupDBandBC() + dataStore = repository.NewMethodDatastore(db) + }) + + AfterEach(func() { + test_helpers.TearDown(db) + }) + + Describe("CreateContractSchema", func() { + It("Creates schema if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + }) + + Describe("CreateMethodTable", func() { + It("Creates table if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + }) + + Describe("PersistResult", func() { + It("Persists result from method polling in custom pg table", func() { + err = dataStore.PersistResult(mockResult, con.Address, con.Name) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct) + expectedLog := test_helpers.BalanceOf{ + Id: 1, + TokenName: "TrueUSD", + Block: 6707323, + Address: "0xfE9e8709d3215310075d67E3ed32A380CCf451C8", + Balance: "66386309548896882859581786", + } + Expect(scanStruct).To(Equal(expectedLog)) + }) + + It("Fails with empty result", func() { + err = dataStore.PersistResult(types.Result{}, con.Address, con.Name) + Expect(err).To(HaveOccurred()) + }) + }) +}) diff --git a/pkg/omni/retriever/address_retriever.go b/pkg/omni/retriever/address_retriever.go index 122edf5f..d9388abb 100644 --- a/pkg/omni/retriever/address_retriever.go +++ b/pkg/omni/retriever/address_retriever.go @@ -85,7 +85,7 @@ func (r *addressRetriever) retrieveTransferAddresses(con contract.Contract) ([]s if field.Type.T == abi.AddressTy { // If they have address type, retrieve those addresses addrs := make([]string, 0) - pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name)) + pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s_event", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name)) err := r.DB.Select(&addrs, pgStr) if err != nil { return []string{}, err @@ -106,7 +106,7 @@ func (r *addressRetriever) retrieveTokenMintees(con contract.Contract) ([]string if field.Type.T == abi.AddressTy { // If they have address type, retrieve those addresses addrs := make([]string, 0) - pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name)) + pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s_event", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name)) err := r.DB.Select(&addrs, pgStr) if err != nil { return []string{}, err diff --git a/pkg/omni/retriever/address_retriever_test.go b/pkg/omni/retriever/address_retriever_test.go index d61627bd..f95ab5fa 100644 --- a/pkg/omni/retriever/address_retriever_test.go +++ b/pkg/omni/retriever/address_retriever_test.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/omni/types" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" @@ -50,6 +51,7 @@ var _ = Describe("Address Retriever Test", func() { var err error var info *contract.Contract var vulcanizeLogId int64 + var log *types.Log var r retriever.AddressRetriever var addresses map[common.Address]bool var wantedEvents = []string{"Transfer"} @@ -63,11 +65,11 @@ var _ = Describe("Address Retriever Test", func() { Expect(err).ToNot(HaveOccurred()) c := converter.NewConverter(info) - err = c.Convert(mockEvent, event) + log, err = c.Convert(mockEvent, event) Expect(err).ToNot(HaveOccurred()) dataStore = repository.NewEventDataStore(db) - dataStore.PersistContractEvents(info) + dataStore.PersistLog(*log, info.Address, info.Name) Expect(err).ToNot(HaveOccurred()) r = retriever.NewAddressRetriever(db) diff --git a/pkg/omni/transformer/integration_test.go b/pkg/omni/transformer/integration_test.go deleted file mode 100644 index bc6e0255..00000000 --- a/pkg/omni/transformer/integration_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2018 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package transformer_test diff --git a/pkg/omni/transformer/transformer.go b/pkg/omni/transformer/transformer.go index 3ced8b67..f9afff05 100644 --- a/pkg/omni/transformer/transformer.go +++ b/pkg/omni/transformer/transformer.go @@ -49,19 +49,21 @@ type Transformer interface { } type transformer struct { - Network string // Ethereum network name; default/"" is mainnet - // Database interfaces - datastore.FilterRepository // Holds watched event log filters - datastore.WatchedEventRepository // Holds watched event log views, created with the filters + datastore.FilterRepository // Log filters repo; accepts filters generated by Contract.GenerateFilters() + datastore.WatchedEventRepository // Watched event log views, created by the log filters repository.EventDatastore // Holds transformed watched event log data + // Pre-processing interfaces + parser.Parser // Parses events and methods out of contract abi fetched using contract address + retriever.BlockRetriever // Retrieves first block for contract and current block height + // Processing interfaces - parser.Parser // Parses events out of contract abi fetched with addr - retriever.BlockRetriever // Retrieves first block with contract addr referenced - retriever.AddressRetriever // Retrieves token holder addresses - converter.Converter // Converts watched event logs into custom log - poller.Poller // Polls methods using contract's token holder addresses + converter.Converter // Converts watched event logs into custom log + poller.Poller // Polls methods using contract's token holder addresses and persists them using method datastore + + // Ethereum network name; default "" is mainnet + Network string // Store contract info as mapping to contract address Contracts map[string]*contract.Contract @@ -84,7 +86,7 @@ type transformer struct { func NewTransformer(network string, BC core.BlockChain, DB *postgres.DB) *transformer { return &transformer{ - Poller: poller.NewPoller(BC), + Poller: poller.NewPoller(BC, DB), Parser: parser.NewParser(network), BlockRetriever: retriever.NewBlockRetriever(DB), Converter: converter.NewConverter(&contract.Contract{}), @@ -133,7 +135,7 @@ func (t *transformer) Init() error { // Get contract name var name = new(string) - err = t.PollMethod(t.Abi(), contractAddr, "name", nil, &name, lastBlock) + err = t.FetchContractData(t.Abi(), contractAddr, "name", nil, &name, lastBlock) if err != nil { return errors.New(fmt.Sprintf("unable to fetch contract name: %v\r\n", err)) } @@ -169,22 +171,27 @@ func (t *transformer) Init() error { return err } - // Iterate over filters and push them to the repo + // Iterate over filters and push them to the repo using filter repository interface for _, filter := range info.Filters { t.CreateFilter(filter) } + // Store contract info for further processing t.Contracts[contractAddr] = info } return nil } -// Iterate through contracts, updating the -// converter with each one and using it to -// convert watched event logs. -// Then persist them into the postgres db +// Iterates through stored, initialized contract objects +// Iterates through contract's event filters, grabbing watched event logs +// Uses converter to convert logs into custom log type +// Persists converted logs into custuom postgres tables +// Calls selected methods, using token holder address generated during event log conversion func (tr transformer) Execute() error { + if len(tr.Contracts) == 0 { + return errors.New("error: transformer has no initialized contracts to work with") + } // Iterate through all internal contracts for _, con := range tr.Contracts { @@ -199,20 +206,28 @@ func (tr transformer) Execute() error { return err } - // Iterate over watched event logs and convert them + // Iterate over watched event logs for _, we := range watchedEvents { - err = tr.Converter.Convert(*we, con.Events[eventName]) + // Convert them to our custom log type + log, err := tr.Converter.Convert(*we, con.Events[eventName]) + if err != nil { + return err + } + + // And immediately persist converted logs in repo + // Run this in seperate goroutine? + err = tr.PersistLog(*log, con.Address, con.Name) if err != nil { return err } } } - // After converting all logs for events of interest, persist all of the data - // Change this so that event logs are persisted immediately after being created - // So as to prevent a huge growth of data in the contract memory - err := tr.PersistContractEvents(con) - if err != nil { + // After persisting all watched event logs + // poller polls select contract methods + // and persists the results into custom pg tables + // Run this in seperate goroutine? + if err := tr.PollContract(*con); err != nil { return err } } diff --git a/pkg/omni/transformer/transformer_test.go b/pkg/omni/transformer/transformer_test.go index b6ba48dd..7f3d01eb 100644 --- a/pkg/omni/transformer/transformer_test.go +++ b/pkg/omni/transformer/transformer_test.go @@ -34,14 +34,14 @@ import ( var block1 = core.Block{ Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert", - Number: 5194634, + Number: 6194633, Transactions: []core.Transaction{{ Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654aaa", Receipt: core.Receipt{ TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654aaa", ContractAddress: "", Logs: []core.Log{{ - BlockNumber: 5194634, + BlockNumber: 6194633, TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654aaa", Address: constants.TusdContractAddress, Topics: core.Topics{ @@ -155,7 +155,7 @@ var _ = Describe("Transformer", func() { c, ok := t.Contracts[constants.TusdContractAddress] Expect(ok).To(Equal(true)) - Expect(c.StartingBlock).To(Equal(int64(5194634))) + Expect(c.StartingBlock).To(Equal(int64(6194633))) Expect(c.LastBlock).To(Equal(int64(6194634))) Expect(c.Abi).To(Equal(constants.TusdAbiString)) Expect(c.Name).To(Equal("TrueUSD")) @@ -190,6 +190,7 @@ var _ = Describe("Transformer", func() { It("Transforms watched contract data into custom repositories", func() { t := transformer.NewTransformer("", blockChain, db) t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) + t.SetMethods(constants.TusdContractAddress, nil) err = t.Init() Expect(err).ToNot(HaveOccurred()) @@ -197,7 +198,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) log := test_helpers.TransferLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log) // We don't know vulcID, so compare individual fields instead of complete structures Expect(log.Tx).To(Equal("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654eee")) @@ -210,6 +211,7 @@ var _ = Describe("Transformer", func() { It("Keeps track of contract-related addresses while transforming event data", func() { t := transformer.NewTransformer("", blockChain, db) t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) + t.SetMethods(constants.TusdContractAddress, nil) err = t.Init() Expect(err).ToNot(HaveOccurred()) @@ -233,5 +235,35 @@ var _ = Describe("Transformer", func() { _, ok = c.TknHolderAddrs["0x"] Expect(ok).To(Equal(false)) }) + + It("Polls given methods using generated token holder address", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) + t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"}) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + res := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Balance).To(Equal("0")) + Expect(res.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) + Expect(err).To(HaveOccurred()) + }) + + It("Fails if initialization has not been done", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) + t.SetMethods(constants.TusdContractAddress, nil) + + err = t.Execute() + Expect(err).To(HaveOccurred()) + }) }) }) diff --git a/pkg/omni/types/entities.go b/pkg/omni/types/entities.go index 520c5a0c..c59a00ac 100644 --- a/pkg/omni/types/entities.go +++ b/pkg/omni/types/entities.go @@ -26,16 +26,14 @@ import ( type Event struct { Name string Anonymous bool - Fields []*Field - Logs map[int64]Log // Map of VulcanizeIdLog to parsed event log + Fields []Field } type Method struct { - Name string - Const bool - Inputs []*Field - Outputs []*Field - Results []*Result + Name string + Const bool + Args []Field + Return []Field } type Field struct { @@ -43,15 +41,18 @@ type Field struct { PgType string // Holds type used when committing data held in this field to postgres } -// Struct to hold results from method call with given inputs across different blocks +// Struct to hold instance of result from method call with given inputs and block type Result struct { - Inputs []interface{} // Will only use addresses - Outputs map[int64]interface{} - PgType string // Holds output pg type + Method + Inputs []interface{} // Will only use addresses + Output interface{} + PgType string // Holds output pg type + Block int64 } -// Struct to hold event log data data +// Struct to hold instance of an event log data type Log struct { + Event Id int64 // VulcanizeIdLog Values map[string]string // Map of event input names to their values Block int64 @@ -59,10 +60,10 @@ type Log struct { } // Unpack abi.Event into our custom Event struct -func NewEvent(e abi.Event) *Event { - fields := make([]*Field, len(e.Inputs)) +func NewEvent(e abi.Event) Event { + fields := make([]Field, len(e.Inputs)) for i, input := range e.Inputs { - fields[i] = &Field{} + fields[i] = Field{} fields[i].Name = input.Name fields[i].Type = input.Type fields[i].Indexed = input.Indexed @@ -87,19 +88,18 @@ func NewEvent(e abi.Event) *Event { } } - return &Event{ + return Event{ Name: e.Name, Anonymous: e.Anonymous, Fields: fields, - Logs: map[int64]Log{}, } } // Unpack abi.Method into our custom Method struct -func NewMethod(m abi.Method) *Method { - inputs := make([]*Field, len(m.Inputs)) +func NewMethod(m abi.Method) Method { + inputs := make([]Field, len(m.Inputs)) for i, input := range m.Inputs { - inputs[i] = &Field{} + inputs[i] = Field{} inputs[i].Name = input.Name inputs[i].Type = input.Type inputs[i].Indexed = input.Indexed @@ -123,9 +123,9 @@ func NewMethod(m abi.Method) *Method { } } - outputs := make([]*Field, len(m.Outputs)) + outputs := make([]Field, len(m.Outputs)) for i, output := range m.Outputs { - outputs[i] = &Field{} + outputs[i] = Field{} outputs[i].Name = output.Name outputs[i].Type = output.Type outputs[i].Indexed = output.Indexed @@ -149,12 +149,11 @@ func NewMethod(m abi.Method) *Method { } } - return &Method{ - Name: m.Name, - Const: m.Const, - Inputs: inputs, - Outputs: outputs, - Results: make([]*Result, 0), + return Method{ + Name: m.Name, + Const: m.Const, + Args: inputs, + Return: outputs, } } @@ -169,10 +168,10 @@ func (e Event) Sig() string { } func (m Method) Sig() string { - types := make([]string, len(m.Inputs)) + types := make([]string, len(m.Args)) i := 0 - for _, input := range m.Inputs { - types[i] = input.Type.String() + for _, arg := range m.Args { + types[i] = arg.Type.String() i++ }