From e02b33547d08294a9f50814bb6585af05e445de5 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 23 Nov 2018 22:26:07 -0600 Subject: [PATCH] finishing porting omni watcher to work with light sync; split into full, light, and shared directories and refactor as much into shared; finish lightSync omni watcher tests --- README.md | 4 +- cmd/lightOmniWatcher.go | 44 +-- cmd/omniWatcher.go | 28 +- ...468317_create_checked_headers_table.up.sql | 3 +- pkg/omni/.DS_Store | Bin 6148 -> 0 bytes pkg/omni/full/converter/converter.go | 16 +- pkg/omni/full/converter/converter_test.go | 57 ++- pkg/omni/full/repository/event_repository.go | 182 --------- .../full/repository/event_repository_test.go | 171 --------- .../full/retriever/retriever_suite_test.go | 2 +- pkg/omni/full/transformer/transformer.go | 20 +- pkg/omni/full/transformer/transformer_test.go | 19 +- pkg/omni/light/converter/converter.go | 74 +++- pkg/omni/light/converter/converter_test.go | 95 +++++ pkg/omni/light/repository/event_repository.go | 181 --------- .../light/repository/event_repository_test.go | 17 - .../light/repository/header_repository.go | 47 ++- .../repository/header_repository_test.go | 124 +++++++ .../light/retriever/block_retriever_test.go | 13 +- .../light/retriever/retriever_suite_test.go | 2 +- pkg/omni/light/transformer/transformer.go | 79 ++-- .../light/transformer/transformer_test.go | 107 +++++- pkg/omni/shared/contract/contract_test.go | 5 +- .../shared/helpers/test_helpers/database.go | 47 ++- .../test_helpers/{ => mocks}/entities.go | 47 ++- .../{ => test_helpers}/mocks/parser.go | 0 pkg/omni/shared/parser/parser_test.go | 2 +- pkg/omni/shared/poller/poller.go | 4 +- pkg/omni/shared/poller/poller_test.go | 125 ++++--- .../shared/repository/event_repository.go | 316 ++++++++++++++++ .../repository/event_repository_test.go | 349 ++++++++++++++++++ .../shared/repository/method_repository.go | 106 ++++-- .../repository/method_repository_test.go | 218 ++++++++--- .../retriever/address_retriever.go | 15 +- .../retriever/address_retriever_test.go | 12 +- .../retriever/retriever_suite_test.go} | 6 +- pkg/omni/shared/types/event.go | 4 +- pkg/omni/shared/types/mode.go | 64 ++++ 38 files changed, 1715 insertions(+), 890 deletions(-) delete mode 100644 pkg/omni/.DS_Store delete mode 100644 pkg/omni/full/repository/event_repository.go delete mode 100644 pkg/omni/full/repository/event_repository_test.go delete mode 100644 pkg/omni/light/repository/event_repository.go delete mode 100644 pkg/omni/light/repository/event_repository_test.go rename pkg/omni/shared/helpers/test_helpers/{ => mocks}/entities.go (65%) rename pkg/omni/shared/helpers/{ => test_helpers}/mocks/parser.go (100%) create mode 100644 pkg/omni/shared/repository/event_repository.go create mode 100644 pkg/omni/shared/repository/event_repository_test.go rename pkg/omni/{full => shared}/retriever/address_retriever.go (86%) rename pkg/omni/{full => shared}/retriever/address_retriever_test.go (92%) rename pkg/omni/{full/repository/repository_suite_test.go => shared/retriever/retriever_suite_test.go} (89%) create mode 100644 pkg/omni/shared/types/mode.go diff --git a/README.md b/README.md index 2ccd9df8..414c06c2 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,10 @@ Vulcanize DB is a set of tools that make it easier for developers to write application-specific indexes and caches for dapps built on Ethereum. ## Dependencies - - Go 1.9+ + - Go 1.11+ - Postgres 10 - Ethereum Node - - [Go Ethereum](https://ethereum.github.io/go-ethereum/downloads/) (1.8+) + - [Go Ethereum](https://ethereum.github.io/go-ethereum/downloads/) (1.8.18+) - [Parity 1.8.11+](https://github.com/paritytech/parity/releases) ## Project Setup diff --git a/cmd/lightOmniWatcher.go b/cmd/lightOmniWatcher.go index 4aae5bf3..199eaed2 100644 --- a/cmd/lightOmniWatcher.go +++ b/cmd/lightOmniWatcher.go @@ -17,11 +17,8 @@ package cmd import ( - "bufio" "fmt" "log" - "os" - "strings" "time" "github.com/spf13/cobra" @@ -59,29 +56,6 @@ func lightOmniWatcher() { log.Fatal("Contract address required") } - if len(contractEvents) == 0 || len(contractMethods) == 0 { - var str string - for str != "y" { - reader := bufio.NewReader(os.Stdin) - if len(contractEvents) == 0 && len(contractMethods) == 0 { - fmt.Print("Warning: no events or methods specified.\n Proceed to watch every event and poll no methods? (Y/n)\n> ") - } else if len(contractEvents) == 0 { - fmt.Print("Warning: no events specified.\n Proceed to watch every event? (Y/n)\n> ") - } else { - fmt.Print("Warning: no methods specified.\n Proceed to poll no methods? (Y/n)\n> ") - } - resp, err := reader.ReadBytes('\n') - if err != nil { - log.Fatal(err) - } - - str = strings.ToLower(string(resp)) - if str == "n" { - return - } - } - } - ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() @@ -114,13 +88,13 @@ func lightOmniWatcher() { func init() { rootCmd.AddCommand(lightOmniWatcherCmd) - omniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for") - omniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address") - omniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-events", "e", []string{}, "Subset of events to watch; by default all events are watched") - omniWatcherCmd.Flags().StringArrayVarP(&contractMethods, "contract-methods", "m", nil, "Subset of methods to poll; by default no methods are polled") - omniWatcherCmd.Flags().StringArrayVarP(&eventAddrs, "event-filter-addresses", "f", []string{}, "Account addresses to persist event data for; default is to persist for all found token holder addresses") - omniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses") - omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`) - omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists") - omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block") + lightOmniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for") + lightOmniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address") + lightOmniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-events", "e", []string{}, "Subset of events to watch; by default all events are watched") + lightOmniWatcherCmd.Flags().StringArrayVarP(&contractMethods, "contract-methods", "m", nil, "Subset of methods to poll; by default no methods are polled") + lightOmniWatcherCmd.Flags().StringArrayVarP(&eventAddrs, "event-filter-addresses", "f", []string{}, "Account addresses to persist event data for; default is to persist for all found token holder addresses") + lightOmniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses") + lightOmniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`) + lightOmniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists") + lightOmniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block") } diff --git a/cmd/omniWatcher.go b/cmd/omniWatcher.go index 8c30268a..566b6672 100644 --- a/cmd/omniWatcher.go +++ b/cmd/omniWatcher.go @@ -17,11 +17,8 @@ package cmd import ( - "bufio" "fmt" "log" - "os" - "strings" "time" "github.com/spf13/cobra" @@ -59,29 +56,6 @@ func omniWatcher() { log.Fatal("Contract address required") } - if len(contractEvents) == 0 || len(contractMethods) == 0 { - var str string - for str != "y" { - reader := bufio.NewReader(os.Stdin) - if len(contractEvents) == 0 && len(contractMethods) == 0 { - fmt.Print("Warning: no events or methods specified.\n Proceed to watch every event and poll no methods? (Y/n)\n> ") - } else if len(contractEvents) == 0 { - fmt.Print("Warning: no events specified.\n Proceed to watch every event? (Y/n)\n> ") - } else { - fmt.Print("Warning: no methods specified.\n Proceed to poll no methods? (Y/n)\n> ") - } - resp, err := reader.ReadBytes('\n') - if err != nil { - log.Fatal(err) - } - - str = strings.ToLower(string(resp)) - if str == "n" { - return - } - } - } - ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() @@ -122,5 +96,5 @@ func init() { omniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses") omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`) omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists") - omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block") + omniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block") } diff --git a/db/migrations/1532468317_create_checked_headers_table.up.sql b/db/migrations/1532468317_create_checked_headers_table.up.sql index 6a39e0dd..638d759c 100644 --- a/db/migrations/1532468317_create_checked_headers_table.up.sql +++ b/db/migrations/1532468317_create_checked_headers_table.up.sql @@ -1,5 +1,4 @@ CREATE TABLE public.checked_headers ( id SERIAL PRIMARY KEY, - header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE, - price_feeds_checked BOOLEAN NOT NULL DEFAULT FALSE + header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE ); \ No newline at end of file diff --git a/pkg/omni/.DS_Store b/pkg/omni/.DS_Store deleted file mode 100644 index 1150c6cb414550592fb53c701aef0ab740abd16c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKIc~#13>+pd47hQba=(xtEDYxb{y+lcXc8Gvj#`!9m8WHRAb=pJNRcKO0_5y+ zIV)%iQ3PPS_xo#L3t&xm#L2_b{M>zE7nLz0op-$A5l4)8!Na2Ze8RaKypeN#_^K@mUfE17dQa}nwfnO@%y_dG$Bq~Y)DIf*D74YvvqdWG(DKS1B3^4)_ zS4@X-9kT?nd4kvrr$lCGmQ-R=twsz>I`ggSdf}9qbXW}^R!_E?P%NI#`&*R5dZMBf zkOFfBE_1u}{(nRNVg8?!w37l-;9n_VtNmfW<11BfUA&z4+D5;nd(9`^jq9K=L^~!% iJLbmQ@l_OMUGp{1d*PHAbmoIj)X#wHB9j7tt-u$6lom1o diff --git a/pkg/omni/full/converter/converter.go b/pkg/omni/full/converter/converter.go index 69b0b629..547acbff 100644 --- a/pkg/omni/full/converter/converter.go +++ b/pkg/omni/full/converter/converter.go @@ -74,22 +74,25 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) ( // Postgres cannot handle custom types, resolve to strings switch input.(type) { case *big.Int: - var b *big.Int - b = input.(*big.Int) + b := input.(*big.Int) strValues[fieldName] = b.String() case common.Address: - var a common.Address - a = input.(common.Address) + a := input.(common.Address) strValues[fieldName] = a.String() c.ContractInfo.AddTokenHolderAddress(a.String()) // cache address in a list of contract's token holder addresses case common.Hash: - var h common.Hash - h = input.(common.Hash) + h := input.(common.Hash) strValues[fieldName] = h.String() case string: strValues[fieldName] = input.(string) case bool: strValues[fieldName] = strconv.FormatBool(input.(bool)) + case []byte: + b := input.([]byte) + strValues[fieldName] = string(b) + case byte: + b := input.(byte) + strValues[fieldName] = string(b) default: return nil, errors.New(fmt.Sprintf("error: unhandled abi type %T", input)) } @@ -98,7 +101,6 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) ( // Only hold onto logs that pass our address filter, if any if c.ContractInfo.PassesEventFilter(strValues) { eventLog := &types.Log{ - Event: event, Id: watchedEvent.LogID, Values: strValues, Block: watchedEvent.BlockNumber, diff --git a/pkg/omni/full/converter/converter_test.go b/pkg/omni/full/converter/converter_test.go index 7ce0c00b..c5bfc3e7 100644 --- a/pkg/omni/full/converter/converter_test.go +++ b/pkg/omni/full/converter/converter_test.go @@ -25,40 +25,41 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" ) var _ = Describe("Converter", func() { - var info *contract.Contract + var con *contract.Contract var wantedEvents = []string{"Transfer"} var err error BeforeEach(func() { - info = test_helpers.SetupTusdContract(wantedEvents, []string{}) + con = test_helpers.SetupTusdContract(wantedEvents, []string{}) }) Describe("Update", func() { - It("Updates contract info held by the converter", func() { - c := converter.NewConverter(info) - Expect(c.ContractInfo).To(Equal(info)) + It("Updates contract con held by the converter", func() { + c := converter.NewConverter(con) + Expect(c.ContractInfo).To(Equal(con)) - info := test_helpers.SetupTusdContract([]string{}, []string{}) - c.Update(info) - Expect(c.ContractInfo).To(Equal(info)) + con := test_helpers.SetupTusdContract([]string{}, []string{}) + c.Update(con) + Expect(c.ContractInfo).To(Equal(con)) }) }) Describe("Convert", func() { It("Converts a watched event log to mapping of event input names to values", func() { - _, ok := info.Events["Approval"] + _, ok := con.Events["Approval"] Expect(ok).To(Equal(false)) - event, ok := info.Events["Transfer"] + event, ok := con.Events["Transfer"] Expect(ok).To(Equal(true)) - err = info.GenerateFilters() + err = con.GenerateFilters() Expect(err).ToNot(HaveOccurred()) - c := converter.NewConverter(info) - log, err := c.Convert(test_helpers.MockTranferEvent, event) + c := converter.NewConverter(con) + log, err := c.Convert(mocks.MockTranferEvent, event) Expect(err).ToNot(HaveOccurred()) from := common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21") @@ -72,10 +73,36 @@ var _ = Describe("Converter", func() { Expect(v).To(Equal(value.String())) }) + It("Keeps track of addresses it sees to grow a token holder address list for the contract", func() { + event, ok := con.Events["Transfer"] + Expect(ok).To(Equal(true)) + + c := converter.NewConverter(con) + _, err := c.Convert(mocks.MockTranferEvent, event) + Expect(err).ToNot(HaveOccurred()) + + b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + _, ok = con.TknHolderAddrs["0x"] + Expect(ok).To(Equal(false)) + + _, ok = con.TknHolderAddrs[""] + Expect(ok).To(Equal(false)) + + _, ok = con.TknHolderAddrs["0x09THISE21a5IS5cFAKE1D82fAND43bCE06MADEUP"] + Expect(ok).To(Equal(false)) + }) + It("Fails with an empty contract", func() { - event := info.Events["Transfer"] + event := con.Events["Transfer"] c := converter.NewConverter(&contract.Contract{}) - _, err = c.Convert(test_helpers.MockTranferEvent, event) + _, err = c.Convert(mocks.MockTranferEvent, event) Expect(err).To(HaveOccurred()) }) }) diff --git a/pkg/omni/full/repository/event_repository.go b/pkg/omni/full/repository/event_repository.go deleted file mode 100644 index e0998308..00000000 --- a/pkg/omni/full/repository/event_repository.go +++ /dev/null @@ -1,182 +0,0 @@ -// VulcanizeDB -// Copyright © 2018 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package repository - -import ( - "errors" - "fmt" - "strings" - - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" -) - -// Event repository is used to persist event data into custom tables -type EventRepository interface { - PersistLog(event types.Log, contractAddr, contractName string) error - CreateEventTable(contractName string, event types.Log) (bool, error) - CreateContractSchema(contractName string) (bool, error) -} - -type eventRepository struct { - db *postgres.DB -} - -func NewEventRepository(db *postgres.DB) *eventRepository { - - return &eventRepository{ - db: db, - } -} - -// Creates a schema for the contract if needed -// Creates table for the watched contract event if needed -// Persists converted event log data into this custom table -func (d *eventRepository) PersistLog(event types.Log, contractAddr, contractName string) error { - _, err := d.CreateContractSchema(contractAddr) - if err != nil { - return err - } - - _, err = d.CreateEventTable(contractAddr, event) - if err != nil { - return err - } - - return d.persistLog(event, contractAddr, contractName) -} - -// Creates a custom postgres command to persist logs for the given event -func (d *eventRepository) persistLog(event types.Log, contractAddr, contractName string) error { - // Begin postgres string - pgStr := fmt.Sprintf("INSERT INTO c%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name)) - pgStr = pgStr + "(vulcanize_log_id, token_name, block, tx" - - // Pack the corresponding variables in a slice - var data []interface{} - data = append(data, - event.Id, - contractName, - event.Block, - event.Tx) - - // Iterate over name-value pairs in the log adding - // names to the string and pushing values to the slice - counter := 0 // Keep track of number of inputs - for inputName, input := range event.Values { - counter += 1 - pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words - data = append(data, input) - } - - // Finish off the string and execute the command using the packed data - // For each input entry we created we add its postgres command variable to the string - pgStr = pgStr + ") VALUES ($1, $2, $3, $4" - for i := 0; i < counter; i++ { - pgStr = pgStr + fmt.Sprintf(", $%d", i+5) - } - pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING" - - _, err := d.db.Exec(pgStr, data...) - if err != nil { - return err - } - - return nil -} - -// Checks for event table and creates it if it does not already exist -func (d *eventRepository) CreateEventTable(contractAddr string, event types.Log) (bool, error) { - tableExists, err := d.checkForTable(contractAddr, event.Name) - if err != nil { - return false, err - } - - if !tableExists { - err = d.newEventTable(contractAddr, event) - if err != nil { - return false, err - } - } - - return !tableExists, nil -} - -// Creates a table for the given contract and event -func (d *eventRepository) newEventTable(contractAddr string, event types.Log) error { - // Begin pg string - pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name)) - pgStr = pgStr + "(id SERIAL, vulcanize_log_id INTEGER NOT NULL UNIQUE, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL, tx CHARACTER VARYING(66) NOT NULL," - - // Iterate over event fields, using their name and pgType to grow the string - for _, field := range event.Fields { - pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType) - } - - pgStr = pgStr + " CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES logs (id) ON DELETE CASCADE)" - _, err := d.db.Exec(pgStr) - - return err -} - -// Checks if a table already exists for the given contract and event -func (d *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s_event')", strings.ToLower(contractAddr), strings.ToLower(eventName)) - - var exists bool - err := d.db.Get(&exists, pgStr) - - return exists, err -} - -// Checks for contract schema and creates it if it does not already exist -func (d *eventRepository) CreateContractSchema(contractAddr string) (bool, error) { - if contractAddr == "" { - return false, errors.New("error: no contract address specified") - } - - schemaExists, err := d.checkForSchema(contractAddr) - if err != nil { - return false, err - } - - if !schemaExists { - err = d.newContractSchema(contractAddr) - if err != nil { - return false, err - } - } - - return !schemaExists, nil -} - -// Creates a schema for the given contract -func (d *eventRepository) newContractSchema(contractAddr string) error { - _, err := d.db.Exec("CREATE SCHEMA IF NOT EXISTS c" + strings.ToLower(contractAddr)) - - return err -} - -// Checks if a schema already exists for the given contract -func (d *eventRepository) checkForSchema(contractAddr string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'c%s')", strings.ToLower(contractAddr)) - - var exists bool - err := d.db.QueryRow(pgStr).Scan(&exists) - - return exists, err -} diff --git a/pkg/omni/full/repository/event_repository_test.go b/pkg/omni/full/repository/event_repository_test.go deleted file mode 100644 index bcaa3704..00000000 --- a/pkg/omni/full/repository/event_repository_test.go +++ /dev/null @@ -1,171 +0,0 @@ -// VulcanizeDB -// Copyright © 2018 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package repository_test - -import ( - "fmt" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/omni/full/converter" - "github.com/vulcanize/vulcanizedb/pkg/omni/full/repository" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" -) - -var mockEvent = core.WatchedEvent{ - Name: constants.TransferEvent.String(), - BlockNumber: 5488076, - Address: constants.TusdContractAddress, - TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae", - Index: 110, - Topic0: constants.TransferEvent.Signature(), - Topic1: "0x000000000000000000000000000000000000000000000000000000000000af21", - Topic2: "0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391", - Topic3: "", - Data: "0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe", -} - -var _ = Describe("Repository", func() { - var db *postgres.DB - var dataStore repository.EventRepository - var err error - var log *types.Log - var con *contract.Contract - var vulcanizeLogId int64 - var wantedEvents = []string{"Transfer"} - var event types.Event - - BeforeEach(func() { - db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, []string{}) - mockEvent.LogID = vulcanizeLogId - - event = con.Events["Transfer"] - err = con.GenerateFilters() - Expect(err).ToNot(HaveOccurred()) - - c := converter.NewConverter(con) - log, err = c.Convert(mockEvent, event) - Expect(err).ToNot(HaveOccurred()) - - dataStore = repository.NewEventRepository(db) - }) - - AfterEach(func() { - test_helpers.TearDown(db) - }) - - Describe("CreateContractSchema", func() { - It("Creates schema if it doesn't exist", func() { - created, err := dataStore.CreateContractSchema(con.Address) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(true)) - - created, err = dataStore.CreateContractSchema(con.Address) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(false)) - }) - }) - - Describe("CreateEventTable", func() { - It("Creates table if it doesn't exist", func() { - created, err := dataStore.CreateContractSchema(con.Address) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(true)) - - created, err = dataStore.CreateEventTable(con.Address, *log) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(true)) - - created, err = dataStore.CreateEventTable(con.Address, *log) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(false)) - }) - }) - - Describe("PersistLog", func() { - It("Persists contract event log values into custom tables, adding any addresses to a growing list of contract associated addresses", func() { - err = dataStore.PersistLog(*log, con.Address, con.Name) - Expect(err).ToNot(HaveOccurred()) - - b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"] - Expect(ok).To(Equal(true)) - Expect(b).To(Equal(true)) - - b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"] - Expect(ok).To(Equal(true)) - Expect(b).To(Equal(true)) - - scanLog := test_helpers.TransferLog{} - - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog) - expectedLog := test_helpers.TransferLog{ - Id: 1, - VulvanizeLogId: vulcanizeLogId, - TokenName: "TrueUSD", - Block: 5488076, - Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae", - From: "0x000000000000000000000000000000000000Af21", - To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391", - Value: "1097077688018008265106216665536940668749033598146", - } - Expect(scanLog).To(Equal(expectedLog)) - }) - - It("Doesn't persist duplicate event logs", func() { - // Perist once - err = dataStore.PersistLog(*log, con.Address, con.Name) - Expect(err).ToNot(HaveOccurred()) - - scanLog := test_helpers.TransferLog{} - - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog) - expectedLog := test_helpers.TransferLog{ - Id: 1, - VulvanizeLogId: vulcanizeLogId, - TokenName: "TrueUSD", - Block: 5488076, - Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae", - From: "0x000000000000000000000000000000000000Af21", - To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391", - Value: "1097077688018008265106216665536940668749033598146", - } - - Expect(scanLog).To(Equal(expectedLog)) - - // Attempt to persist the same log again - err = dataStore.PersistLog(*log, con.Address, con.Name) - Expect(err).ToNot(HaveOccurred()) - - // Show that no new logs were entered - var count int - err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM c%s.transfer_event", constants.TusdContractAddress)) - Expect(err).ToNot(HaveOccurred()) - Expect(count).To(Equal(1)) - }) - - It("Fails with empty log", func() { - err = dataStore.PersistLog(types.Log{}, con.Address, con.Name) - Expect(err).To(HaveOccurred()) - }) - }) -}) diff --git a/pkg/omni/full/retriever/retriever_suite_test.go b/pkg/omni/full/retriever/retriever_suite_test.go index d5815a7c..948f5e8e 100644 --- a/pkg/omni/full/retriever/retriever_suite_test.go +++ b/pkg/omni/full/retriever/retriever_suite_test.go @@ -27,7 +27,7 @@ import ( func TestRetriever(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Full Retriever Suite Test") + RunSpecs(t, "Full Block Number Retriever Suite Test") } var _ = BeforeSuite(func() { diff --git a/pkg/omni/full/transformer/transformer.go b/pkg/omni/full/transformer/transformer.go index b9987fcb..2191954f 100644 --- a/pkg/omni/full/transformer/transformer.go +++ b/pkg/omni/full/transformer/transformer.go @@ -19,18 +19,17 @@ package transformer import ( "errors" "fmt" - "log" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/omni/full/converter" - "github.com/vulcanize/vulcanizedb/pkg/omni/full/repository" "github.com/vulcanize/vulcanizedb/pkg/omni/full/retriever" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" ) // Requires a fully synced vDB and a running eth node (or infura) @@ -71,14 +70,14 @@ type transformer struct { // Transformer takes in config for blockchain, database, and network id func NewTransformer(network string, BC core.BlockChain, DB *postgres.DB) *transformer { return &transformer{ - Poller: poller.NewPoller(BC, DB), + Poller: poller.NewPoller(BC, DB, types.FullSync), Parser: parser.NewParser(network), BlockRetriever: retriever.NewBlockRetriever(DB), Converter: converter.NewConverter(&contract.Contract{}), Contracts: map[string]*contract.Contract{}, WatchedEventRepository: repositories.WatchedEventRepository{DB: DB}, FilterRepository: repositories.FilterRepository{DB: DB}, - EventRepository: repository.NewEventRepository(DB), + EventRepository: repository.NewEventRepository(DB, types.FullSync), WatchedEvents: map[string][]string{}, WantedMethods: map[string][]string{}, ContractRanges: map[string][2]int64{}, @@ -184,27 +183,26 @@ func (tr transformer) Execute() error { tr.Update(con) // Iterate through contract filters and get watched event logs - for eventName, filter := range con.Filters { + for eventName := range con.Filters { watchedEvents, err := tr.GetWatchedEvents(eventName) if err != nil { - log.Println(fmt.Sprintf("Error fetching events for %s:", filter.Name), err) return err } // Iterate over watched event logs for _, we := range watchedEvents { // Convert them to our custom log type - log, err := tr.Converter.Convert(*we, con.Events[eventName]) + cstm, err := tr.Converter.Convert(*we, con.Events[eventName]) if err != nil { return err } - if log == nil { - break + if cstm == nil { + continue } // If log is not empty, immediately persist in repo // Run this in seperate goroutine? - err = tr.PersistLog(*log, con.Address, con.Name) + err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventName], con.Address, con.Name) if err != nil { return err } diff --git a/pkg/omni/full/transformer/transformer_test.go b/pkg/omni/full/transformer/transformer_test.go index 83a72073..ef1f9327 100644 --- a/pkg/omni/full/transformer/transformer_test.go +++ b/pkg/omni/full/transformer/transformer_test.go @@ -30,6 +30,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/omni/full/transformer" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" ) var _ = Describe("Transformer", func() { @@ -95,8 +96,8 @@ var _ = Describe("Transformer", func() { Describe("Init", func() { It("Initializes transformer's contract objects", func() { - blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1) - blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2) + blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1) + blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2) t := transformer.NewTransformer("", blockChain, db) t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) err = t.Init() @@ -120,8 +121,8 @@ var _ = Describe("Transformer", func() { }) It("Does nothing if watched events are unset", func() { - blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1) - blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2) + blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1) + blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2) t := transformer.NewTransformer("", blockChain, db) err = t.Init() Expect(err).ToNot(HaveOccurred()) @@ -133,8 +134,8 @@ var _ = Describe("Transformer", func() { Describe("Execute", func() { BeforeEach(func() { - blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1) - blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2) + blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1) + blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2) }) It("Transforms watched contract data into custom repositories", func() { @@ -148,7 +149,7 @@ var _ = Describe("Transformer", func() { Expect(err).ToNot(HaveOccurred()) log := test_helpers.TransferLog{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log) // We don't know vulcID, so compare individual fields instead of complete structures Expect(log.Tx).To(Equal("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654eee")) @@ -198,12 +199,12 @@ var _ = Describe("Transformer", func() { res := test_helpers.BalanceOf{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) Expect(err).ToNot(HaveOccurred()) Expect(res.Balance).To(Equal("0")) Expect(res.TokenName).To(Equal("TrueUSD")) - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res) Expect(err).To(HaveOccurred()) }) diff --git a/pkg/omni/light/converter/converter.go b/pkg/omni/light/converter/converter.go index 63348445..a92ee665 100644 --- a/pkg/omni/light/converter/converter.go +++ b/pkg/omni/light/converter/converter.go @@ -17,16 +17,22 @@ package converter import ( + "encoding/json" "errors" + "fmt" + "math/big" + "strconv" - geth "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" ) type Converter interface { - Convert(log geth.Log, event types.Event) (*types.Log, error) + Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error) Update(info *contract.Contract) } @@ -45,6 +51,66 @@ func (c *converter) Update(info *contract.Contract) { } // Convert the given watched event log into a types.Log for the given event -func (c *converter) Convert(log geth.Log, event types.Event) (*types.Log, error) { - return nil, errors.New("implement me") +func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error) { + contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) + returnLogs := make([]types.Log, 0, len(logs)) + for _, log := range logs { + values := make(map[string]interface{}) + for _, field := range event.Fields { + var i interface{} + values[field.Name] = i + } + + err := contract.UnpackLogIntoMap(values, event.Name, log) + if err != nil { + return nil, err + } + + strValues := make(map[string]string, len(values)) + for fieldName, input := range values { + // Postgres cannot handle custom types, resolve everything to strings + switch input.(type) { + case *big.Int: + b := input.(*big.Int) + strValues[fieldName] = b.String() + case common.Address: + a := input.(common.Address) + strValues[fieldName] = a.String() + c.ContractInfo.AddTokenHolderAddress(a.String()) // cache address in a list of contract's token holder addresses + case common.Hash: + h := input.(common.Hash) + strValues[fieldName] = h.String() + case string: + strValues[fieldName] = input.(string) + case bool: + strValues[fieldName] = strconv.FormatBool(input.(bool)) + case []byte: + b := input.([]byte) + strValues[fieldName] = string(b) + case byte: + b := input.(byte) + strValues[fieldName] = string(b) + default: + return nil, errors.New(fmt.Sprintf("error: unhandled abi type %T", input)) + } + } + + // Only hold onto logs that pass our address filter, if any + if c.ContractInfo.PassesEventFilter(strValues) { + raw, err := json.Marshal(log) + if err != nil { + return nil, err + } + + returnLogs = append(returnLogs, types.Log{ + LogIndex: log.Index, + Values: strValues, + Raw: raw, + TransactionIndex: log.TxIndex, + Id: headerID, + }) + } + } + + return returnLogs, nil } diff --git a/pkg/omni/light/converter/converter_test.go b/pkg/omni/light/converter/converter_test.go index a4d4e8ac..96a375ca 100644 --- a/pkg/omni/light/converter/converter_test.go +++ b/pkg/omni/light/converter/converter_test.go @@ -15,3 +15,98 @@ // along with this program. If not, see . package converter_test + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/omni/light/converter" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" +) + +var _ = Describe("Converter", func() { + var con *contract.Contract + var wantedEvents = []string{"Transfer", "Mint"} + var err error + + BeforeEach(func() { + con = test_helpers.SetupTusdContract(wantedEvents, []string{}) + }) + + Describe("Update", func() { + It("Updates contract info held by the converter", func() { + c := converter.NewConverter(con) + Expect(c.ContractInfo).To(Equal(con)) + + info := test_helpers.SetupTusdContract([]string{}, []string{}) + c.Update(info) + Expect(c.ContractInfo).To(Equal(info)) + }) + }) + + Describe("Convert", func() { + It("Converts a watched event log to mapping of event input names to values", func() { + _, ok := con.Events["Approval"] + Expect(ok).To(Equal(false)) + + event, ok := con.Events["Transfer"] + Expect(ok).To(Equal(true)) + + c := converter.NewConverter(con) + logs, err := c.Convert([]types.Log{mocks.MockTransferLog1, mocks.MockTransferLog2}, event, 232) + Expect(err).ToNot(HaveOccurred()) + Expect(len(logs)).To(Equal(2)) + + sender1 := common.HexToAddress("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391") + sender2 := common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21") + value := helpers.BigFromString("1097077688018008265106216665536940668749033598146") + + Expect(logs[0].Values["to"]).To(Equal(sender1.String())) + Expect(logs[0].Values["from"]).To(Equal(sender2.String())) + Expect(logs[0].Values["value"]).To(Equal(value.String())) + Expect(logs[0].Id).To(Equal(int64(232))) + Expect(logs[1].Values["to"]).To(Equal(sender2.String())) + Expect(logs[1].Values["from"]).To(Equal(sender1.String())) + Expect(logs[1].Values["value"]).To(Equal(value.String())) + Expect(logs[1].Id).To(Equal(int64(232))) + }) + + It("Keeps track of addresses it sees to grow a token holder address list for the contract", func() { + event, ok := con.Events["Transfer"] + Expect(ok).To(Equal(true)) + + c := converter.NewConverter(con) + _, err := c.Convert([]types.Log{mocks.MockTransferLog1, mocks.MockTransferLog2}, event, 232) + Expect(err).ToNot(HaveOccurred()) + + b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + _, ok = con.TknHolderAddrs["0x"] + Expect(ok).To(Equal(false)) + + _, ok = con.TknHolderAddrs[""] + Expect(ok).To(Equal(false)) + + _, ok = con.TknHolderAddrs["0x09THISE21a5IS5cFAKE1D82fAND43bCE06MADEUP"] + Expect(ok).To(Equal(false)) + }) + + It("Fails with an empty contract", func() { + event := con.Events["Transfer"] + c := converter.NewConverter(&contract.Contract{}) + _, err = c.Convert([]types.Log{mocks.MockTransferLog1}, event, 232) + Expect(err).To(HaveOccurred()) + }) + }) +}) diff --git a/pkg/omni/light/repository/event_repository.go b/pkg/omni/light/repository/event_repository.go deleted file mode 100644 index 14e6b256..00000000 --- a/pkg/omni/light/repository/event_repository.go +++ /dev/null @@ -1,181 +0,0 @@ -// VulcanizeDB -// Copyright © 2018 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package repository - -import ( - "errors" - "fmt" - "strings" - - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" -) - -type EventRepository interface { - PersistLog(event types.Log, contractAddr, contractName string) error - CreateEventTable(contractName string, event types.Log) (bool, error) - CreateContractSchema(contractName string) (bool, error) -} - -type eventRepository struct { - db *postgres.DB -} - -func NewEventRepository(db *postgres.DB) *eventRepository { - return &eventRepository{ - db: db, - } -} - -// Creates a schema for the contract if needed -// Creates table for the watched contract event if needed -// Persists converted event log data into this custom table -func (r *eventRepository) PersistLog(event types.Log, contractAddr, contractName string) error { - _, err := r.CreateContractSchema(contractAddr) - if err != nil { - return err - } - - _, err = r.CreateEventTable(contractAddr, event) - if err != nil { - return err - } - - return r.persistLog(event, contractAddr, contractName) -} - -// Creates a custom postgres command to persist logs for the given event -func (r *eventRepository) persistLog(event types.Log, contractAddr, contractName string) error { - // Begin postgres string - pgStr := fmt.Sprintf("INSERT INTO l%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name)) - pgStr = pgStr + "(header_id, token_name, raw_log, log_idx, tx_idx" - - // Pack the corresponding variables in a slice - var data []interface{} - data = append(data, - event.Id, - contractName, - event.Raw, - event.LogIndex, - event.TransactionIndex) - - // Iterate over name-value pairs in the log adding - // names to the string and pushing values to the slice - counter := 0 // Keep track of number of inputs - for inputName, input := range event.Values { - counter += 1 - pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words - data = append(data, input) - } - - // Finish off the string and execute the command using the packed data - // For each input entry we created we add its postgres command variable to the string - pgStr = pgStr + ") VALUES ($1, $2, $3, $4, $5" - for i := 0; i < counter; i++ { - pgStr = pgStr + fmt.Sprintf(", $%d", i+6) - } - pgStr = pgStr + ")" - - _, err := r.db.Exec(pgStr, data...) - if err != nil { - return err - } - - return nil -} - -// Checks for event table and creates it if it does not already exist -func (r *eventRepository) CreateEventTable(contractAddr string, event types.Log) (bool, error) { - tableExists, err := r.checkForTable(contractAddr, event.Name) - if err != nil { - return false, err - } - - if !tableExists { - err = r.newEventTable(contractAddr, event) - if err != nil { - return false, err - } - } - - return !tableExists, nil -} - -// Creates a table for the given contract and event -func (r *eventRepository) newEventTable(contractAddr string, event types.Log) error { - // Begin pg string - pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS l%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name)) - pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL," - - // Iterate over event fields, using their name and pgType to grow the string - for _, field := range event.Fields { - pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType) - } - - pgStr = pgStr + " UNIQUE (header_id, tx_idx, log_idx))" - _, err := r.db.Exec(pgStr) - - return err -} - -// Checks if a table already exists for the given contract and event -func (r *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'l%s' AND table_name = '%s_event')", strings.ToLower(contractAddr), strings.ToLower(eventName)) - - var exists bool - err := r.db.Get(&exists, pgStr) - - return exists, err -} - -// Checks for contract schema and creates it if it does not already exist -func (r *eventRepository) CreateContractSchema(contractAddr string) (bool, error) { - if contractAddr == "" { - return false, errors.New("error: no contract address specified") - } - - schemaExists, err := r.checkForSchema(contractAddr) - if err != nil { - return false, err - } - - if !schemaExists { - err = r.newContractSchema(contractAddr) - if err != nil { - return false, err - } - } - - return !schemaExists, nil -} - -// Creates a schema for the given contract -func (r *eventRepository) newContractSchema(contractAddr string) error { - _, err := r.db.Exec("CREATE SCHEMA IF NOT EXISTS l" + strings.ToLower(contractAddr)) - - return err -} - -// Checks if a schema already exists for the given contract -func (r *eventRepository) checkForSchema(contractAddr string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'l%s')", strings.ToLower(contractAddr)) - - var exists bool - err := r.db.QueryRow(pgStr).Scan(&exists) - - return exists, err -} diff --git a/pkg/omni/light/repository/event_repository_test.go b/pkg/omni/light/repository/event_repository_test.go deleted file mode 100644 index 68165a0d..00000000 --- a/pkg/omni/light/repository/event_repository_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2018 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package repository_test diff --git a/pkg/omni/light/repository/header_repository.go b/pkg/omni/light/repository/header_repository.go index 668e64e7..8e9ce00f 100644 --- a/pkg/omni/light/repository/header_repository.go +++ b/pkg/omni/light/repository/header_repository.go @@ -17,30 +17,61 @@ package repository import ( + "database/sql" + "github.com/hashicorp/golang-lru" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +const columnCacheSize = 1000 + type HeaderRepository interface { + AddCheckColumn(eventID string) error MarkHeaderChecked(headerID int64, eventID string) error MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error) + CheckCache(key string) (interface{}, bool) } type headerRepository struct { - db *postgres.DB + db *postgres.DB + columns *lru.Cache // Cache created columns to minimize db connections } func NewHeaderRepository(db *postgres.DB) *headerRepository { + ccs, _ := lru.New(columnCacheSize) return &headerRepository{ - db: db, + db: db, + columns: ccs, } } +func (r *headerRepository) AddCheckColumn(eventID string) error { + // Check cache to see if column already exists before querying pg + _, ok := r.columns.Get(eventID) + if ok { + return nil + } + + pgStr := "ALTER TABLE public.checked_headers ADD COLUMN IF NOT EXISTS " + pgStr = pgStr + eventID + " BOOLEAN NOT NULL DEFAULT FALSE" + _, err := r.db.Exec(pgStr) + if err != nil { + return err + } + + // Add column name to cache + r.columns.Add(eventID, true) + + return nil +} + func (r *headerRepository) MarkHeaderChecked(headerID int64, eventID string) error { _, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`) VALUES ($1, $2) ON CONFLICT (header_id) DO UPDATE SET `+eventID+` = $2`, headerID, true) + return err } @@ -68,3 +99,15 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber int64, endingBlock return result, err } + +func (r *headerRepository) CheckCache(key string) (interface{}, bool) { + return r.columns.Get(key) +} + +func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error { + _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET `+eventID+` = $2`, headerID, true) + return err +} diff --git a/pkg/omni/light/repository/header_repository_test.go b/pkg/omni/light/repository/header_repository_test.go index 68165a0d..f5f4ece3 100644 --- a/pkg/omni/light/repository/header_repository_test.go +++ b/pkg/omni/light/repository/header_repository_test.go @@ -15,3 +15,127 @@ // along with this program. If not, see . package repository_test + +import ( + "fmt" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" +) + +var _ = Describe("Repository", func() { + var db *postgres.DB + var r repository.HeaderRepository + var headerRepository repositories.HeaderRepository + var eventID, query string + + BeforeEach(func() { + db, _ = test_helpers.SetupDBandBC() + r = repository.NewHeaderRepository(db) + headerRepository = repositories.NewHeaderRepository(db) + eventID = "eventName_contractAddr" + }) + + AfterEach(func() { + test_helpers.TearDown(db) + }) + + Describe("AddCheckColumn", func() { + It("Creates a column for the given eventID to mark if the header has been checked for that event", func() { + query = fmt.Sprintf("SELECT %s FROM checked_headers", eventID) + _, err := db.Exec(query) + Expect(err).To(HaveOccurred()) + + err = r.AddCheckColumn(eventID) + Expect(err).ToNot(HaveOccurred()) + + _, err = db.Exec(query) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Caches column it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + _, ok := r.CheckCache(eventID) + Expect(ok).To(Equal(false)) + + err := r.AddCheckColumn(eventID) + Expect(err).ToNot(HaveOccurred()) + + v, ok := r.CheckCache(eventID) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + }) + + Describe("MissingHeaders", func() { + It("Returns all unchecked headers for the given eventID", func() { + headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) + err := r.AddCheckColumn(eventID) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + }) + + It("Fails if eventID does not yet exist in check_headers table", func() { + headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) + err := r.AddCheckColumn(eventID) + Expect(err).ToNot(HaveOccurred()) + + _, err = r.MissingHeaders(6194630, 6194635, "notEventId") + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("MarkHeaderChecked", func() { + It("Marks the header checked for the given eventID", func() { + headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) + err := r.AddCheckColumn(eventID) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + + headerID := missingHeaders[0].Id + err = r.MarkHeaderChecked(headerID, eventID) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err = r.MissingHeaders(6194630, 6194635, eventID) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(2)) + }) + + It("Fails if eventID does not yet exist in check_headers table", func() { + headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) + err := r.AddCheckColumn(eventID) + Expect(err).ToNot(HaveOccurred()) + + missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + + headerID := missingHeaders[0].Id + err = r.MarkHeaderChecked(headerID, "notEventId") + Expect(err).To(HaveOccurred()) + + missingHeaders, err = r.MissingHeaders(6194630, 6194635, eventID) + Expect(err).ToNot(HaveOccurred()) + Expect(len(missingHeaders)).To(Equal(3)) + }) + }) +}) diff --git a/pkg/omni/light/retriever/block_retriever_test.go b/pkg/omni/light/retriever/block_retriever_test.go index 09be7b77..563e3738 100644 --- a/pkg/omni/light/retriever/block_retriever_test.go +++ b/pkg/omni/light/retriever/block_retriever_test.go @@ -24,6 +24,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/omni/light/retriever" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" ) var _ = Describe("Block Retriever", func() { @@ -43,9 +44,9 @@ var _ = Describe("Block Retriever", func() { Describe("RetrieveFirstBlock", func() { It("Retrieves block number of earliest header in the database", func() { - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1) - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader2) - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) i, err := r.RetrieveFirstBlock() Expect(err).NotTo(HaveOccurred()) @@ -60,9 +61,9 @@ var _ = Describe("Block Retriever", func() { Describe("RetrieveMostRecentBlock", func() { It("Retrieves the latest header's block number", func() { - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1) - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader2) - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) i, err := r.RetrieveMostRecentBlock() Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/omni/light/retriever/retriever_suite_test.go b/pkg/omni/light/retriever/retriever_suite_test.go index dc577626..cb5a7313 100644 --- a/pkg/omni/light/retriever/retriever_suite_test.go +++ b/pkg/omni/light/retriever/retriever_suite_test.go @@ -27,7 +27,7 @@ import ( func TestRetriever(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Light Retriever Suite Test") + RunSpecs(t, "Light BLock Number Retriever Suite Test") } var _ = BeforeSuite(func() { diff --git a/pkg/omni/light/transformer/transformer.go b/pkg/omni/light/transformer/transformer.go index d6c17f63..26804ea7 100644 --- a/pkg/omni/light/transformer/transformer.go +++ b/pkg/omni/light/transformer/transformer.go @@ -18,7 +18,8 @@ package transformer import ( "errors" - "fmt" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers" + "strings" "github.com/ethereum/go-ethereum/common" @@ -31,12 +32,14 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller" + srep "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" ) // Requires a light synced vDB (headers) and a running eth node (or infura) type transformer struct { // Database interfaces - repository.EventRepository // Holds transformed watched event log data + srep.EventRepository // Holds transformed watched event log data repository.HeaderRepository // Interface for interaction with header repositories // Pre-processing interfaces @@ -72,14 +75,14 @@ type transformer struct { func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transformer { return &transformer{ - Poller: poller.NewPoller(bc, db), + Poller: poller.NewPoller(bc, db, types.LightSync), Fetcher: fetcher.NewFetcher(bc), Parser: parser.NewParser(network), HeaderRepository: repository.NewHeaderRepository(db), BlockRetriever: retriever.NewBlockRetriever(db), Converter: converter.NewConverter(&contract.Contract{}), Contracts: map[string]*contract.Contract{}, - EventRepository: repository.NewEventRepository(db), + EventRepository: srep.NewEventRepository(db, types.LightSync), WatchedEvents: map[string][]string{}, WantedMethods: map[string][]string{}, ContractRanges: map[string][2]int64{}, @@ -93,7 +96,7 @@ func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transf // Uses parser to pull event info from abi // Use this info to generate event filters func (tr *transformer) Init() error { - + // Iterate through all internal contract addresses for contractAddr, subset := range tr.WatchedEvents { // Get Abi err := tr.Parser.Parse(contractAddr) @@ -101,7 +104,7 @@ func (tr *transformer) Init() error { return err } - // Get first block for contract and most recent block for the chain + // Get first block and most recent block number in the header repo firstBlock, err := tr.BlockRetriever.RetrieveFirstBlock() if err != nil { return err @@ -111,7 +114,7 @@ func (tr *transformer) Init() error { return err } - // Set to specified range if it falls within the contract's bounds + // Set to specified range if it falls within the bounds if firstBlock < tr.ContractRanges[contractAddr][0] { firstBlock = tr.ContractRanges[contractAddr][0] } @@ -119,14 +122,11 @@ func (tr *transformer) Init() error { lastBlock = tr.ContractRanges[contractAddr][1] } - // Get contract name + // Get contract name if it has one var name = new(string) - err = tr.FetchContractData(tr.Abi(), contractAddr, "name", nil, &name, lastBlock) - if err != nil { - return errors.New(fmt.Sprintf("unable to fetch contract name: %v\r\n", err)) - } + tr.FetchContractData(tr.Abi(), contractAddr, "name", nil, &name, lastBlock) - // Remove any accidental duplicate inputs in filter addresses + // Remove any potential accidental duplicate inputs in filter addresses EventAddrs := map[string]bool{} for _, addr := range tr.EventAddrs[contractAddr] { EventAddrs[addr] = true @@ -142,6 +142,7 @@ func (tr *transformer) Init() error { Network: tr.Network, Address: contractAddr, Abi: tr.Abi(), + ParsedAbi: tr.ParsedAbi(), StartingBlock: firstBlock, LastBlock: lastBlock, Events: tr.GetEvents(subset), @@ -151,7 +152,7 @@ func (tr *transformer) Init() error { TknHolderAddrs: map[string]bool{}, } - // Store contract info for further processing + // Store contract info for execution tr.Contracts[contractAddr] = info } @@ -160,53 +161,69 @@ func (tr *transformer) Init() error { func (tr *transformer) Execute() error { if len(tr.Contracts) == 0 { - return errors.New("error: transformer has no initialized contracts to work with") + return errors.New("error: transformer has no initialized contracts") } // Iterate through all internal contracts for _, con := range tr.Contracts { - // Update converter with current contract tr.Update(con) + // Iterate through events for _, event := range con.Events { - topics := [][]common.Hash{{common.HexToHash(event.Sig())}} - eventId := event.Name + "_" + con.Address + // Filter using the event signature + topics := [][]common.Hash{{common.HexToHash(helpers.GenerateSignature(event.Sig()))}} + + // Generate eventID and use it to create a checked_header column if one does not already exist + eventId := strings.ToLower(event.Name + "_" + con.Address) + if err := tr.AddCheckColumn(eventId); err != nil { + return err + } + + // Find unchecked headers for this event missingHeaders, err := tr.MissingHeaders(con.StartingBlock, con.LastBlock, eventId) if err != nil { return err } + // Iterate over headers for _, header := range missingHeaders { + // And fetch event logs using the header, contract address, and topics filter logs, err := tr.FetchLogs([]string{con.Address}, topics, header) if err != nil { return err } + // Mark the header checked for this eventID and continue to next iteration if no logs are found if len(logs) < 1 { err = tr.MarkHeaderChecked(header.Id, eventId) if err != nil { return err } - continue } - for _, l := range logs { - mapping, err := tr.Convert(l, event) - if err != nil { - return err - } - if mapping == nil { - break - } + // Convert logs into custom type + convertedLogs, err := tr.Convert(logs, event, header.Id) + if err != nil { + return err + } + if len(convertedLogs) < 1 { + continue + } - err = tr.PersistLog(*mapping, con.Address, con.Name) - if err != nil { - return err - } + // If logs aren't empty, persist them + err = tr.PersistLogs(convertedLogs, event, con.Address, con.Name) + if err != nil { + return err } } } + // After persisting all watched event logs + // poller polls select contract methods + // and persists the results into custom pg tables + if err := tr.PollContract(*con); err != nil { + return err + } } return nil diff --git a/pkg/omni/light/transformer/transformer_test.go b/pkg/omni/light/transformer/transformer_test.go index 5f465b86..31a6574e 100644 --- a/pkg/omni/light/transformer/transformer_test.go +++ b/pkg/omni/light/transformer/transformer_test.go @@ -17,9 +17,7 @@ package transformer_test import ( - "math/rand" - "time" - + "fmt" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -29,6 +27,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/omni/light/transformer" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" ) var _ = Describe("Transformer", func() { @@ -36,7 +35,7 @@ var _ = Describe("Transformer", func() { var err error var blockChain core.BlockChain var headerRepository repositories.HeaderRepository - rand.Seed(time.Now().UnixNano()) + var headerID int64 BeforeEach(func() { db, blockChain = test_helpers.SetupDBandBC() @@ -94,8 +93,8 @@ var _ = Describe("Transformer", func() { Describe("Init", func() { It("Initializes transformer's contract objects", func() { - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1) - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) t := transformer.NewTransformer("", blockChain, db) t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) err = t.Init() @@ -111,7 +110,7 @@ var _ = Describe("Transformer", func() { Expect(c.Address).To(Equal(constants.TusdContractAddress)) }) - It("Fails to initialize if first and most recent blocks cannot be fetched from vDB", func() { + It("Fails to initialize if first and most recent block numbers cannot be fetched from vDB headers table", func() { t := transformer.NewTransformer("", blockChain, db) t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) err = t.Init() @@ -119,8 +118,8 @@ var _ = Describe("Transformer", func() { }) It("Does nothing if watched events are unset", func() { - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1) - headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) t := transformer.NewTransformer("", blockChain, db) err = t.Init() Expect(err).ToNot(HaveOccurred()) @@ -131,6 +130,96 @@ var _ = Describe("Transformer", func() { }) Describe("Execute", func() { + BeforeEach(func() { + header1, err := blockChain.GetHeaderByNumber(6791668) + Expect(err).ToNot(HaveOccurred()) + header2, err := blockChain.GetHeaderByNumber(6791669) + Expect(err).ToNot(HaveOccurred()) + header3, err := blockChain.GetHeaderByNumber(6791670) + Expect(err).ToNot(HaveOccurred()) + headerRepository.CreateOrUpdateHeader(header1) + headerID, err = headerRepository.CreateOrUpdateHeader(header2) + Expect(err).ToNot(HaveOccurred()) + headerRepository.CreateOrUpdateHeader(header3) + }) + It("Transforms watched contract data into custom repositories", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) + t.SetMethods(constants.TusdContractAddress, nil) + + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + log := test_helpers.LightTransferLog{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", constants.TusdContractAddress)).StructScan(&log) + + // We don't know vulcID, so compare individual fields instead of complete structures + Expect(log.HeaderID).To(Equal(headerID)) + Expect(log.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")) + Expect(log.To).To(Equal("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")) + Expect(log.Value).To(Equal("9998940000000000000000")) + }) + + It("Keeps track of contract-related addresses while transforming event data", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) + t.SetMethods(constants.TusdContractAddress, nil) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + c, ok := t.Contracts[constants.TusdContractAddress] + Expect(ok).To(Equal(true)) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + b, ok := c.TknHolderAddrs["0x1062a747393198f70F71ec65A582423Dba7E5Ab3"] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + b, ok = c.TknHolderAddrs["0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0"] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + _, ok = c.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843b1234567890"] + Expect(ok).To(Equal(false)) + + _, ok = c.TknHolderAddrs["0x"] + Expect(ok).To(Equal(false)) + }) + + It("Polls given methods using generated token holder address", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) + t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"}) + err = t.Init() + Expect(err).ToNot(HaveOccurred()) + + err = t.Execute() + Expect(err).ToNot(HaveOccurred()) + + res := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Balance).To(Equal("55849938025000000000000")) + Expect(res.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res) + Expect(err).To(HaveOccurred()) + }) + + It("Fails if initialization has not been done", func() { + t := transformer.NewTransformer("", blockChain, db) + t.SetEvents(constants.TusdContractAddress, []string{"Transfer"}) + t.SetMethods(constants.TusdContractAddress, nil) + + err = t.Execute() + Expect(err).To(HaveOccurred()) + }) }) }) diff --git a/pkg/omni/shared/contract/contract_test.go b/pkg/omni/shared/contract/contract_test.go index dc7a5ff6..5d9da792 100644 --- a/pkg/omni/shared/contract/contract_test.go +++ b/pkg/omni/shared/contract/contract_test.go @@ -22,6 +22,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" ) var _ = Describe("Contract", func() { @@ -38,11 +39,11 @@ var _ = Describe("Contract", func() { val, ok := info.Filters["Transfer"] Expect(ok).To(Equal(true)) - Expect(val).To(Equal(test_helpers.ExpectedTransferFilter)) + Expect(val).To(Equal(mocks.ExpectedTransferFilter)) val, ok = info.Filters["Approval"] Expect(ok).To(Equal(true)) - Expect(val).To(Equal(test_helpers.ExpectedApprovalFilter)) + Expect(val).To(Equal(mocks.ExpectedApprovalFilter)) val, ok = info.Filters["Mint"] Expect(ok).To(Equal(false)) diff --git a/pkg/omni/shared/helpers/test_helpers/database.go b/pkg/omni/shared/helpers/test_helpers/database.go index 581b8aa9..65a28034 100644 --- a/pkg/omni/shared/helpers/test_helpers/database.go +++ b/pkg/omni/shared/helpers/test_helpers/database.go @@ -33,7 +33,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/geth/node" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" ) type TransferLog struct { @@ -47,6 +47,18 @@ type TransferLog struct { Value string `db:"value_"` } +type LightTransferLog struct { + Id int64 `db:"id"` + HeaderID int64 `db:"header_id"` + TokenName string `db:"token_name"` + LogIndex int64 `db:"log_idx"` + TxIndex int64 `db:"tx_idx"` + From string `db:"from_"` + To string `db:"to_"` + Value string `db:"value_"` + RawLog []byte `db:"raw_log"` +} + type BalanceOf struct { Id int64 `db:"id"` TokenName string `db:"token_name"` @@ -119,8 +131,8 @@ func SetupTusdRepo(vulcanizeLogId *int64, wantedEvents, wantedMethods []string) } func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract { - p := parser.NewParser("") - err := p.Parse(constants.TusdContractAddress) + p := mocks.NewParser(constants.TusdAbiString) + err := p.Parse() Expect(err).ToNot(HaveOccurred()) return &contract.Contract{ @@ -139,25 +151,40 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract } func TearDown(db *postgres.DB) { - _, err := db.Query(`DELETE FROM blocks`) + tx, err := db.Begin() Expect(err).NotTo(HaveOccurred()) - _, err = db.Query(`DELETE FROM headers`) + _, err = tx.Exec(`DELETE FROM blocks`) Expect(err).NotTo(HaveOccurred()) - _, err = db.Query(`DELETE FROM checked_headers`) + _, err = tx.Exec(`DELETE FROM headers`) Expect(err).NotTo(HaveOccurred()) - _, err = db.Query(`DELETE FROM logs`) + _, err = tx.Exec(`DELETE FROM checked_headers`) Expect(err).NotTo(HaveOccurred()) - _, err = db.Query(`DELETE FROM transactions`) + _, err = tx.Exec(`DELETE FROM logs`) Expect(err).NotTo(HaveOccurred()) - _, err = db.Query(`DELETE FROM receipts`) + _, err = tx.Exec(`DELETE FROM transactions`) Expect(err).NotTo(HaveOccurred()) - _, err = db.Query(`DROP SCHEMA IF EXISTS c0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`) + _, err = tx.Exec(`DELETE FROM receipts`) + Expect(err).NotTo(HaveOccurred()) + + _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr`) + Expect(err).NotTo(HaveOccurred()) + + _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`) + Expect(err).NotTo(HaveOccurred()) + + _, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`) + Expect(err).NotTo(HaveOccurred()) + + _, err = tx.Exec(`DROP SCHEMA IF EXISTS light_0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`) + Expect(err).NotTo(HaveOccurred()) + + err = tx.Commit() Expect(err).NotTo(HaveOccurred()) } diff --git a/pkg/omni/shared/helpers/test_helpers/entities.go b/pkg/omni/shared/helpers/test_helpers/mocks/entities.go similarity index 65% rename from pkg/omni/shared/helpers/test_helpers/entities.go rename to pkg/omni/shared/helpers/test_helpers/mocks/entities.go index 5c3c2eb7..be092771 100644 --- a/pkg/omni/shared/helpers/test_helpers/entities.go +++ b/pkg/omni/shared/helpers/test_helpers/mocks/entities.go @@ -14,11 +14,15 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package test_helpers +package mocks import ( "encoding/json" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants" @@ -126,3 +130,44 @@ var MockHeader3 = core.Header{ Raw: rawFakeHeader, Timestamp: "50000030", } + +var MockTransferLog1 = types.Log{ + Index: 1, + Address: common.HexToAddress(constants.TusdContractAddress), + BlockNumber: 5488076, + TxIndex: 110, + TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae"), + Topics: []common.Hash{ + common.HexToHash(constants.TransferEvent.Signature()), + common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"), + common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391"), + }, + Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"), +} + +var MockTransferLog2 = types.Log{ + Index: 3, + Address: common.HexToAddress(constants.TusdContractAddress), + BlockNumber: 5488077, + TxIndex: 2, + TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae"), + Topics: []common.Hash{ + common.HexToHash(constants.TransferEvent.Signature()), + common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391"), + common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"), + }, + Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"), +} + +var MockMintLog = types.Log{ + Index: 10, + Address: common.HexToAddress(constants.TusdContractAddress), + BlockNumber: 548808, + TxIndex: 50, + TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6minty"), + Topics: []common.Hash{ + common.HexToHash(constants.MintEvent.Signature()), + common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"), + }, + Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"), +} diff --git a/pkg/omni/shared/helpers/mocks/parser.go b/pkg/omni/shared/helpers/test_helpers/mocks/parser.go similarity index 100% rename from pkg/omni/shared/helpers/mocks/parser.go rename to pkg/omni/shared/helpers/test_helpers/mocks/parser.go diff --git a/pkg/omni/shared/parser/parser_test.go b/pkg/omni/shared/parser/parser_test.go index 8d74593b..7631294d 100644 --- a/pkg/omni/shared/parser/parser_test.go +++ b/pkg/omni/shared/parser/parser_test.go @@ -23,7 +23,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/geth" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/mocks" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser" ) diff --git a/pkg/omni/shared/poller/poller.go b/pkg/omni/shared/poller/poller.go index 52e0425b..bb8239e9 100644 --- a/pkg/omni/shared/poller/poller.go +++ b/pkg/omni/shared/poller/poller.go @@ -42,10 +42,10 @@ type poller struct { contract contract.Contract } -func NewPoller(blockChain core.BlockChain, db *postgres.DB) *poller { +func NewPoller(blockChain core.BlockChain, db *postgres.DB, mode types.Mode) *poller { return &poller{ - MethodRepository: repository.NewMethodRepository(db), + MethodRepository: repository.NewMethodRepository(db, mode), bc: blockChain, } } diff --git a/pkg/omni/shared/poller/poller_test.go b/pkg/omni/shared/poller/poller_test.go index a0ba7349..a7357a38 100644 --- a/pkg/omni/shared/poller/poller_test.go +++ b/pkg/omni/shared/poller/poller_test.go @@ -28,6 +28,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" ) var _ = Describe("Poller", func() { @@ -37,78 +38,80 @@ var _ = Describe("Poller", func() { var db *postgres.DB var bc core.BlockChain - BeforeEach(func() { - db, bc = test_helpers.SetupDBandBC() - p = poller.NewPoller(bc, db) - }) - AfterEach(func() { test_helpers.TearDown(db) }) - Describe("PollContract", func() { - It("Polls specified contract methods using contract's token holder address list", func() { - con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"}) - Expect(con.Abi).To(Equal(constants.TusdAbiString)) - con.StartingBlock = 6707322 - con.LastBlock = 6707323 - con.TknHolderAddrs = map[string]bool{ - "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true, - "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true, - } - - err := p.PollContract(*con) - Expect(err).ToNot(HaveOccurred()) - - scanStruct := test_helpers.BalanceOf{} - - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) - Expect(err).ToNot(HaveOccurred()) - Expect(scanStruct.Balance).To(Equal("66386309548896882859581786")) - Expect(scanStruct.TokenName).To(Equal("TrueUSD")) - - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct) - Expect(err).ToNot(HaveOccurred()) - Expect(scanStruct.Balance).To(Equal("66386309548896882859581786")) - Expect(scanStruct.TokenName).To(Equal("TrueUSD")) - - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) - Expect(err).ToNot(HaveOccurred()) - Expect(scanStruct.Balance).To(Equal("17982350181394112023885864")) - Expect(scanStruct.TokenName).To(Equal("TrueUSD")) - - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct) - Expect(err).ToNot(HaveOccurred()) - Expect(scanStruct.Balance).To(Equal("17982350181394112023885864")) - Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + Describe("Full sync mode", func() { + BeforeEach(func() { + db, bc = test_helpers.SetupDBandBC() + p = poller.NewPoller(bc, db, types.FullSync) }) - It("Does not poll and persist any methods if none are specified", func() { - con = test_helpers.SetupTusdContract(nil, nil) - Expect(con.Abi).To(Equal(constants.TusdAbiString)) - con.StartingBlock = 6707322 - con.LastBlock = 6707323 - con.TknHolderAddrs = map[string]bool{ - "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true, - "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true, - } + Describe("PollContract", func() { + It("Polls specified contract methods using contract's token holder address list", func() { + con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"}) + Expect(con.Abi).To(Equal(constants.TusdAbiString)) + con.StartingBlock = 6707322 + con.LastBlock = 6707323 + con.TknHolderAddrs = map[string]bool{ + "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true, + "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true, + } - err := p.PollContract(*con) - Expect(err).ToNot(HaveOccurred()) + err := p.PollContract(*con) + Expect(err).ToNot(HaveOccurred()) - scanStruct := test_helpers.BalanceOf{} + scanStruct := test_helpers.BalanceOf{} - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) - Expect(err).To(HaveOccurred()) + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("66386309548896882859581786")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("66386309548896882859581786")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("17982350181394112023885864")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).ToNot(HaveOccurred()) + Expect(scanStruct.Balance).To(Equal("17982350181394112023885864")) + Expect(scanStruct.TokenName).To(Equal("TrueUSD")) + }) + + It("Does not poll and persist any methods if none are specified", func() { + con = test_helpers.SetupTusdContract(nil, nil) + Expect(con.Abi).To(Equal(constants.TusdAbiString)) + con.StartingBlock = 6707322 + con.LastBlock = 6707323 + con.TknHolderAddrs = map[string]bool{ + "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true, + "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true, + } + + err := p.PollContract(*con) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct) + Expect(err).To(HaveOccurred()) + }) }) - }) - Describe("PollMethod", func() { - It("Polls a single contract method", func() { - var name = new(string) - err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) - Expect(err).ToNot(HaveOccurred()) - Expect(*name).To(Equal("TrueUSD")) + Describe("PollMethod", func() { + It("Polls a single contract method", func() { + var name = new(string) + err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514) + Expect(err).ToNot(HaveOccurred()) + Expect(*name).To(Equal("TrueUSD")) + }) }) }) }) diff --git a/pkg/omni/shared/repository/event_repository.go b/pkg/omni/shared/repository/event_repository.go new file mode 100644 index 00000000..b733bcb9 --- /dev/null +++ b/pkg/omni/shared/repository/event_repository.go @@ -0,0 +1,316 @@ +// VulcanizeDB +// Copyright © 2018 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package repository + +import ( + "errors" + "fmt" + "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository" + "strings" + + "github.com/hashicorp/golang-lru" + + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" +) + +const ( + // Number of contract address and method ids to keep in cache + contractCacheSize = 100 + eventChacheSize = 1000 +) + +// Event repository is used to persist event data into custom tables +type EventRepository interface { + PersistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error + CreateEventTable(contractAddr string, event types.Event) (bool, error) + CreateContractSchema(contractName string) (bool, error) + CheckSchemaCache(key string) (interface{}, bool) + CheckTableCache(key string) (interface{}, bool) +} + +type eventRepository struct { + db *postgres.DB + mode types.Mode + schemas *lru.Cache // Cache names of recently used schemas to minimize db connections + tables *lru.Cache // Cache names of recently used tables to minimize db connections +} + +func NewEventRepository(db *postgres.DB, mode types.Mode) *eventRepository { + ccs, _ := lru.New(contractCacheSize) + ecs, _ := lru.New(eventChacheSize) + return &eventRepository{ + db: db, + mode: mode, + schemas: ccs, + tables: ecs, + } +} + +// Creates a schema for the contract if needed +// Creates table for the watched contract event if needed +// Persists converted event log data into this custom table +func (r *eventRepository) PersistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { + if logs == nil { + return errors.New("event repository error: passed a nil log slice") + } + + if len(logs) == 0 { + return errors.New("event repository error: passed an empty log slice") + } + _, err := r.CreateContractSchema(contractAddr) + if err != nil { + return err + } + + _, err = r.CreateEventTable(contractAddr, eventInfo) + if err != nil { + return err + } + + return r.persistLogs(logs, eventInfo, contractAddr, contractName) +} + +func (r *eventRepository) persistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { + var err error + switch r.mode { + case types.LightSync: + err = r.persistLightSyncLogs(logs, eventInfo, contractAddr, contractName) + case types.FullSync: + err = r.persistFullSyncLogs(logs, eventInfo, contractAddr, contractName) + default: + return errors.New("event repository error: unhandled mode") + } + + return err +} + +// Creates a custom postgres command to persist logs for the given event (compatible with light synced vDB) +func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { + tx, err := r.db.Begin() + if err != nil { + return err + } + + for _, event := range logs { + // Begin pg query string + pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_event ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventInfo.Name)) + pgStr = pgStr + "(header_id, token_name, raw_log, log_idx, tx_idx" + el := len(event.Values) + + // Pack the corresponding variables in a slice + data := make([]interface{}, 0, 5+el) + data = append(data, + event.Id, + contractName, + event.Raw, + event.LogIndex, + event.TransactionIndex) + + // Iterate over inputs and append name to query string and value to input data + for inputName, input := range event.Values { + pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words + data = append(data, input) + } + + // For each input entry we created we add its postgres command variable to the string + pgStr = pgStr + ") VALUES ($1, $2, $3, $4, $5" + for i := 0; i < el; i++ { + pgStr = pgStr + fmt.Sprintf(", $%d", i+6) + } + pgStr = pgStr + ")" + + _, err = tx.Exec(pgStr, data...) + if err != nil { + tx.Rollback() + return err + } + } + + eventId := strings.ToLower(eventInfo.Name + "_" + contractAddr) + err = repository.MarkHeaderCheckedInTransaction(logs[0].Id, tx, eventId) + if err != nil { + tx.Rollback() + return err + } + + return tx.Commit() +} + +// Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB) +func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { + tx, err := r.db.Begin() + if err != nil { + return err + } + + for _, event := range logs { + pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_event ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventInfo.Name)) + pgStr = pgStr + "(vulcanize_log_id, token_name, block, tx" + el := len(event.Values) + + data := make([]interface{}, 0, 4+el) + data = append(data, + event.Id, + contractName, + event.Block, + event.Tx) + + for inputName, input := range event.Values { + pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) + data = append(data, input) + } + + pgStr = pgStr + ") VALUES ($1, $2, $3, $4" + for i := 0; i < el; i++ { + pgStr = pgStr + fmt.Sprintf(", $%d", i+5) + } + pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING" + + _, err = tx.Exec(pgStr, data...) + if err != nil { + tx.Rollback() + return err + } + } + + return tx.Commit() +} + +// Checks for event table and creates it if it does not already exist +// Returns true if it created a new table; returns false if table already existed +func (r *eventRepository) CreateEventTable(contractAddr string, event types.Event) (bool, error) { + tableID := fmt.Sprintf("%s_%s.%s_event", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(event.Name)) + + // Check cache before querying pq to see if table exists + _, ok := r.tables.Get(tableID) + if ok { + return false, nil + } + tableExists, err := r.checkForTable(contractAddr, event.Name) + if err != nil { + return false, err + } + + if !tableExists { + err = r.newEventTable(tableID, event) + if err != nil { + return false, err + } + } + + // Add table id to cache + r.tables.Add(tableID, true) + + return !tableExists, nil +} + +// Creates a table for the given contract and event +func (r *eventRepository) newEventTable(tableID string, event types.Event) error { + // Begin pg string + var pgStr = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s ", tableID) + var err error + + // Handle different modes + switch r.mode { + case types.FullSync: + pgStr = pgStr + "(id SERIAL, vulcanize_log_id INTEGER NOT NULL UNIQUE, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL, tx CHARACTER VARYING(66) NOT NULL," + + // Iterate over event fields, using their name and pgType to grow the string + for _, field := range event.Fields { + pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType) + } + pgStr = pgStr + " CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES logs (id) ON DELETE CASCADE)" + case types.LightSync: + pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL," + + for _, field := range event.Fields { + pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType) + } + pgStr = pgStr + " UNIQUE (header_id, tx_idx, log_idx))" + default: + return errors.New("unhandled repository mode") + } + + _, err = r.db.Exec(pgStr) + + return err +} + +// Checks if a table already exists for the given contract and event +func (r *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) { + pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '%s_%s' AND table_name = '%s_event')", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventName)) + + var exists bool + err := r.db.Get(&exists, pgStr) + + return exists, err +} + +// Checks for contract schema and creates it if it does not already exist +// Returns true if it created a new schema; returns false if schema already existed +func (r *eventRepository) CreateContractSchema(contractAddr string) (bool, error) { + if contractAddr == "" { + return false, errors.New("error: no contract address specified") + } + + // Check cache before querying pq to see if schema exists + _, ok := r.schemas.Get(contractAddr) + if ok { + return false, nil + } + schemaExists, err := r.checkForSchema(contractAddr) + if err != nil { + return false, err + } + if !schemaExists { + err = r.newContractSchema(contractAddr) + if err != nil { + return false, err + } + } + + // Add schema name to cache + r.schemas.Add(contractAddr, true) + + return !schemaExists, nil +} + +// Creates a schema for the given contract +func (r *eventRepository) newContractSchema(contractAddr string) error { + _, err := r.db.Exec("CREATE SCHEMA IF NOT EXISTS " + r.mode.String() + "_" + strings.ToLower(contractAddr)) + + return err +} + +// Checks if a schema already exists for the given contract +func (r *eventRepository) checkForSchema(contractAddr string) (bool, error) { + pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s_%s')", r.mode.String(), strings.ToLower(contractAddr)) + + var exists bool + err := r.db.QueryRow(pgStr).Scan(&exists) + + return exists, err +} + +func (r *eventRepository) CheckSchemaCache(key string) (interface{}, bool) { + return r.schemas.Get(key) +} + +func (r *eventRepository) CheckTableCache(key string) (interface{}, bool) { + return r.tables.Get(key) +} diff --git a/pkg/omni/shared/repository/event_repository_test.go b/pkg/omni/shared/repository/event_repository_test.go new file mode 100644 index 00000000..6c90711d --- /dev/null +++ b/pkg/omni/shared/repository/event_repository_test.go @@ -0,0 +1,349 @@ +// VulcanizeDB +// Copyright © 2018 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package repository_test + +import ( + "encoding/json" + "fmt" + "strings" + + geth "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + fc "github.com/vulcanize/vulcanizedb/pkg/omni/full/converter" + lc "github.com/vulcanize/vulcanizedb/pkg/omni/light/converter" + lr "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks" + sr "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" +) + +var _ = Describe("Repository", func() { + var db *postgres.DB + var dataStore sr.EventRepository + var err error + var log *types.Log + var logs []types.Log + var con *contract.Contract + var vulcanizeLogId int64 + var wantedEvents = []string{"Transfer"} + var event types.Event + var headerID int64 + var mockEvent = mocks.MockTranferEvent + var mockLog1 = mocks.MockTransferLog1 + var mockLog2 = mocks.MockTransferLog2 + + BeforeEach(func() { + db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, []string{}) + mockEvent.LogID = vulcanizeLogId + + event = con.Events["Transfer"] + err = con.GenerateFilters() + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + test_helpers.TearDown(db) + }) + + Describe("Full sync mode", func() { + BeforeEach(func() { + dataStore = sr.NewEventRepository(db, types.FullSync) + }) + + Describe("CreateContractSchema", func() { + It("Creates schema if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + + It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + _, ok := dataStore.CheckSchemaCache(con.Address) + Expect(ok).To(Equal(false)) + + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + v, ok := dataStore.CheckSchemaCache(con.Address) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + }) + + Describe("CreateEventTable", func() { + It("Creates table if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateEventTable(con.Address, event) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateEventTable(con.Address, event) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + + It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + tableID := fmt.Sprintf("%s_%s.%s_event", types.FullSync, strings.ToLower(con.Address), strings.ToLower(event.Name)) + _, ok := dataStore.CheckTableCache(tableID) + Expect(ok).To(Equal(false)) + + created, err = dataStore.CreateEventTable(con.Address, event) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + v, ok := dataStore.CheckTableCache(tableID) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + }) + + Describe("PersistLogs", func() { + BeforeEach(func() { + c := fc.NewConverter(con) + log, err = c.Convert(mockEvent, event) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Persists contract event log values into custom tables, adding any addresses to a growing list of contract associated addresses", func() { + err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name) + Expect(err).ToNot(HaveOccurred()) + + b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"] + Expect(ok).To(Equal(true)) + Expect(b).To(Equal(true)) + + scanLog := test_helpers.TransferLog{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog) + Expect(err).ToNot(HaveOccurred()) + expectedLog := test_helpers.TransferLog{ + Id: 1, + VulvanizeLogId: vulcanizeLogId, + TokenName: "TrueUSD", + Block: 5488076, + Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae", + From: "0x000000000000000000000000000000000000Af21", + To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391", + Value: "1097077688018008265106216665536940668749033598146", + } + Expect(scanLog).To(Equal(expectedLog)) + }) + + It("Doesn't persist duplicate event logs", func() { + // Try to persist the same log twice in a single call + err = dataStore.PersistLogs([]types.Log{*log, *log}, event, con.Address, con.Name) + Expect(err).ToNot(HaveOccurred()) + + scanLog := test_helpers.TransferLog{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog) + Expect(err).ToNot(HaveOccurred()) + expectedLog := test_helpers.TransferLog{ + Id: 1, + VulvanizeLogId: vulcanizeLogId, + TokenName: "TrueUSD", + Block: 5488076, + Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae", + From: "0x000000000000000000000000000000000000Af21", + To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391", + Value: "1097077688018008265106216665536940668749033598146", + } + Expect(scanLog).To(Equal(expectedLog)) + + // Attempt to persist the same log again in seperate call + err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name) + Expect(err).ToNot(HaveOccurred()) + + // Show that no new logs were entered + var count int + err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM full_%s.transfer_event", constants.TusdContractAddress)) + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(1)) + }) + + It("Fails with empty log", func() { + err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name) + Expect(err).To(HaveOccurred()) + }) + }) + }) + + Describe("Light sync mode", func() { + BeforeEach(func() { + dataStore = sr.NewEventRepository(db, types.LightSync) + }) + + Describe("CreateContractSchema", func() { + It("Creates schema if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + + It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + _, ok := dataStore.CheckSchemaCache(con.Address) + Expect(ok).To(Equal(false)) + + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + v, ok := dataStore.CheckSchemaCache(con.Address) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + + It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + tableID := fmt.Sprintf("%s_%s.%s_event", types.LightSync, strings.ToLower(con.Address), strings.ToLower(event.Name)) + _, ok := dataStore.CheckTableCache(tableID) + Expect(ok).To(Equal(false)) + + created, err = dataStore.CreateEventTable(con.Address, event) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + v, ok := dataStore.CheckTableCache(tableID) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + }) + + Describe("CreateEventTable", func() { + It("Creates table if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateEventTable(con.Address, event) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateEventTable(con.Address, event) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + }) + + Describe("PersistLogs", func() { + BeforeEach(func() { + headerRepository := repositories.NewHeaderRepository(db) + headerID, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) + Expect(err).ToNot(HaveOccurred()) + c := lc.NewConverter(con) + logs, err = c.Convert([]geth.Log{mockLog1, mockLog2}, event, headerID) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Persists contract event log values into custom tables", func() { + hr := lr.NewHeaderRepository(db) + err = hr.AddCheckColumn(event.Name + "_" + con.Address) + Expect(err).ToNot(HaveOccurred()) + + err = dataStore.PersistLogs(logs, event, con.Address, con.Name) + Expect(err).ToNot(HaveOccurred()) + + var count int + err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM light_%s.transfer_event", constants.TusdContractAddress)) + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(2)) + + scanLog := test_helpers.LightTransferLog{} + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event LIMIT 1", constants.TusdContractAddress)).StructScan(&scanLog) + Expect(err).ToNot(HaveOccurred()) + Expect(scanLog.HeaderID).To(Equal(headerID)) + Expect(scanLog.TokenName).To(Equal("TrueUSD")) + Expect(scanLog.TxIndex).To(Equal(int64(110))) + Expect(scanLog.LogIndex).To(Equal(int64(1))) + Expect(scanLog.From).To(Equal("0x000000000000000000000000000000000000Af21")) + Expect(scanLog.To).To(Equal("0x09BbBBE21a5975cAc061D82f7b843bCE061BA391")) + Expect(scanLog.Value).To(Equal("1097077688018008265106216665536940668749033598146")) + + var expectedRawLog, rawLog geth.Log + err = json.Unmarshal(logs[0].Raw, &expectedRawLog) + Expect(err).ToNot(HaveOccurred()) + err = json.Unmarshal(scanLog.RawLog, &rawLog) + Expect(err).ToNot(HaveOccurred()) + Expect(rawLog).To(Equal(expectedRawLog)) + }) + + It("Doesn't persist duplicate event logs", func() { + hr := lr.NewHeaderRepository(db) + err = hr.AddCheckColumn(event.Name + "_" + con.Address) + Expect(err).ToNot(HaveOccurred()) + + // Try and fail to persist the same log twice in a single call + err = dataStore.PersistLogs([]types.Log{logs[0], logs[0]}, event, con.Address, con.Name) + Expect(err).To(HaveOccurred()) + + // Successfuly persist the two unique logs + err = dataStore.PersistLogs(logs, event, con.Address, con.Name) + Expect(err).ToNot(HaveOccurred()) + + // Try and fail to persist the same logs again in separate call + err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name) + Expect(err).To(HaveOccurred()) + + // Show that no new logs were entered + var count int + err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM light_%s.transfer_event", constants.TusdContractAddress)) + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(2)) + }) + + It("Fails if the persisted event does not have a corresponding eventID column in the checked_headers table", func() { + err = dataStore.PersistLogs(logs, event, con.Address, con.Name) + Expect(err).To(HaveOccurred()) + }) + + It("Fails with empty log", func() { + err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name) + Expect(err).To(HaveOccurred()) + }) + }) + }) +}) diff --git a/pkg/omni/shared/repository/method_repository.go b/pkg/omni/shared/repository/method_repository.go index ede488d8..101ffaa2 100644 --- a/pkg/omni/shared/repository/method_repository.go +++ b/pkg/omni/shared/repository/method_repository.go @@ -21,28 +21,41 @@ import ( "fmt" "strings" + "github.com/hashicorp/golang-lru" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" ) +const methodCacheSize = 1000 + type MethodRepository interface { PersistResult(method types.Result, contractAddr, contractName string) error CreateMethodTable(contractAddr string, method types.Result) (bool, error) CreateContractSchema(contractAddr string) (bool, error) + CheckSchemaCache(key string) (interface{}, bool) + CheckTableCache(key string) (interface{}, bool) } type methodRepository struct { *postgres.DB + mode types.Mode + schemas *lru.Cache // Cache names of recently used schemas to minimize db connections + tables *lru.Cache // Cache names of recently used tables to minimize db connections } -func NewMethodRepository(db *postgres.DB) *methodRepository { - +func NewMethodRepository(db *postgres.DB, mode types.Mode) *methodRepository { + ccs, _ := lru.New(contractCacheSize) + mcs, _ := lru.New(methodCacheSize) return &methodRepository{ - DB: db, + DB: db, + mode: mode, + schemas: ccs, + tables: mcs, } } -func (d *methodRepository) PersistResult(method types.Result, contractAddr, contractName string) error { +func (r *methodRepository) PersistResult(method types.Result, contractAddr, contractName string) error { if len(method.Args) != len(method.Inputs) { return errors.New("error: given number of inputs does not match number of method arguments") } @@ -50,51 +63,48 @@ func (d *methodRepository) PersistResult(method types.Result, contractAddr, cont return errors.New("error: given number of outputs does not match number of method return values") } - _, err := d.CreateContractSchema(contractAddr) + _, err := r.CreateContractSchema(contractAddr) if err != nil { return err } - _, err = d.CreateMethodTable(contractAddr, method) + _, err = r.CreateMethodTable(contractAddr, method) if err != nil { return err } - return d.persistResult(method, contractAddr, contractName) + return r.persistResult(method, contractAddr, contractName) } // Creates a custom postgres command to persist logs for the given event -func (d *methodRepository) persistResult(method types.Result, contractAddr, contractName string) error { +func (r *methodRepository) persistResult(method types.Result, contractAddr, contractName string) error { // Begin postgres string - pgStr := fmt.Sprintf("INSERT INTO c%s.%s_method ", strings.ToLower(contractAddr), strings.ToLower(method.Name)) + pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_method ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(method.Name)) pgStr = pgStr + "(token_name, block" + ml := len(method.Args) - // Pack the corresponding variables in a slice - var data []interface{} + // Preallocate slice of needed size and proceed to pack variables into it in same order they appear in string + data := make([]interface{}, 0, 3+ml) data = append(data, contractName, method.Block) // Iterate over method args and return value, adding names // to the string and pushing values to the slice - counter := 0 // Keep track of number of inputs for i, arg := range method.Args { - counter += 1 pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(arg.Name)) // Add underscore after to avoid any collisions with reserved pg words data = append(data, method.Inputs[i]) } - - counter += 1 pgStr = pgStr + ", returned) VALUES ($1, $2" data = append(data, method.Output) // For each input entry we created we add its postgres command variable to the string - for i := 0; i < counter; i++ { + for i := 0; i <= ml; i++ { pgStr = pgStr + fmt.Sprintf(", $%d", i+3) } pgStr = pgStr + ")" - _, err := d.DB.Exec(pgStr, data...) + _, err := r.DB.Exec(pgStr, data...) if err != nil { return err } @@ -103,26 +113,35 @@ func (d *methodRepository) persistResult(method types.Result, contractAddr, cont } // Checks for event table and creates it if it does not already exist -func (d *methodRepository) CreateMethodTable(contractAddr string, method types.Result) (bool, error) { - tableExists, err := d.checkForTable(contractAddr, method.Name) +func (r *methodRepository) CreateMethodTable(contractAddr string, method types.Result) (bool, error) { + tableID := fmt.Sprintf("%s_%s.%s_method", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(method.Name)) + + // Check cache before querying pq to see if table exists + _, ok := r.tables.Get(tableID) + if ok { + return false, nil + } + tableExists, err := r.checkForTable(contractAddr, method.Name) if err != nil { return false, err } - if !tableExists { - err = d.newMethodTable(contractAddr, method) + err = r.newMethodTable(tableID, method) if err != nil { return false, err } } + // Add schema name to cache + r.tables.Add(tableID, true) + return !tableExists, nil } // Creates a table for the given contract and event -func (d *methodRepository) newMethodTable(contractAddr string, method types.Result) error { +func (r *methodRepository) newMethodTable(tableID string, method types.Result) error { // Begin pg string - pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s_method ", strings.ToLower(contractAddr), strings.ToLower(method.Name)) + pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s ", tableID) pgStr = pgStr + "(id SERIAL, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL," // Iterate over method inputs and outputs, using their name and pgType to grow the string @@ -132,54 +151,69 @@ func (d *methodRepository) newMethodTable(contractAddr string, method types.Resu pgStr = pgStr + fmt.Sprintf(" returned %s NOT NULL)", method.Return[0].PgType) - _, err := d.DB.Exec(pgStr) + _, err := r.DB.Exec(pgStr) return err } // Checks if a table already exists for the given contract and event -func (d *methodRepository) checkForTable(contractAddr string, methodName string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s_method')", strings.ToLower(contractAddr), strings.ToLower(methodName)) +func (r *methodRepository) checkForTable(contractAddr string, methodName string) (bool, error) { + pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '%s_%s' AND table_name = '%s_method')", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(methodName)) var exists bool - err := d.DB.Get(&exists, pgStr) + err := r.DB.Get(&exists, pgStr) return exists, err } // Checks for contract schema and creates it if it does not already exist -func (d *methodRepository) CreateContractSchema(contractAddr string) (bool, error) { +func (r *methodRepository) CreateContractSchema(contractAddr string) (bool, error) { if contractAddr == "" { return false, errors.New("error: no contract address specified") } - schemaExists, err := d.checkForSchema(contractAddr) + // Check cache before querying pq to see if schema exists + _, ok := r.schemas.Get(contractAddr) + if ok { + return false, nil + } + schemaExists, err := r.checkForSchema(contractAddr) if err != nil { return false, err } - if !schemaExists { - err = d.newContractSchema(contractAddr) + err = r.newContractSchema(contractAddr) if err != nil { return false, err } } + // Add schema name to cache + r.schemas.Add(contractAddr, true) + return !schemaExists, nil } // Creates a schema for the given contract -func (d *methodRepository) newContractSchema(contractAddr string) error { - _, err := d.DB.Exec("CREATE SCHEMA IF NOT EXISTS c" + strings.ToLower(contractAddr)) +func (r *methodRepository) newContractSchema(contractAddr string) error { + _, err := r.DB.Exec("CREATE SCHEMA IF NOT EXISTS " + r.mode.String() + "_" + strings.ToLower(contractAddr)) return err } // Checks if a schema already exists for the given contract -func (d *methodRepository) checkForSchema(contractAddr string) (bool, error) { - pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'c%s')", strings.ToLower(contractAddr)) +func (r *methodRepository) checkForSchema(contractAddr string) (bool, error) { + pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s_%s')", r.mode.String(), strings.ToLower(contractAddr)) var exists bool - err := d.DB.Get(&exists, pgStr) + err := r.DB.Get(&exists, pgStr) return exists, err } + +func (r *methodRepository) CheckSchemaCache(key string) (interface{}, bool) { + return r.schemas.Get(key) +} + +func (r *methodRepository) CheckTableCache(key string) (interface{}, bool) { + return r.tables.Get(key) +} diff --git a/pkg/omni/shared/repository/method_repository_test.go b/pkg/omni/shared/repository/method_repository_test.go index 80c93e53..e8bc22bf 100644 --- a/pkg/omni/shared/repository/method_repository_test.go +++ b/pkg/omni/shared/repository/method_repository_test.go @@ -18,6 +18,7 @@ package repository_test import ( "fmt" + "strings" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -36,10 +37,11 @@ var _ = Describe("Repository", func() { var con *contract.Contract var err error var mockResult types.Result + var method types.Method BeforeEach(func() { con = test_helpers.SetupTusdContract([]string{}, []string{"balanceOf"}) - method := con.Methods["balanceOf"] + method = con.Methods["balanceOf"] mockResult = types.Result{ Method: method, PgType: method.Return[0].PgType, @@ -50,62 +52,188 @@ var _ = Describe("Repository", func() { mockResult.Inputs[0] = "0xfE9e8709d3215310075d67E3ed32A380CCf451C8" mockResult.Output = "66386309548896882859581786" db, _ = test_helpers.SetupDBandBC() - dataStore = repository.NewMethodRepository(db) + dataStore = repository.NewMethodRepository(db, types.FullSync) }) AfterEach(func() { test_helpers.TearDown(db) }) - Describe("CreateContractSchema", func() { - It("Creates schema if it doesn't exist", func() { - created, err := dataStore.CreateContractSchema(constants.TusdContractAddress) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(true)) + Describe("Full Sync Mode", func() { + BeforeEach(func() { + dataStore = repository.NewMethodRepository(db, types.FullSync) + }) - created, err = dataStore.CreateContractSchema(constants.TusdContractAddress) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(false)) + Describe("CreateContractSchema", func() { + It("Creates schema if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + + It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + _, ok := dataStore.CheckSchemaCache(con.Address) + Expect(ok).To(Equal(false)) + + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + v, ok := dataStore.CheckSchemaCache(con.Address) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + }) + + Describe("CreateMethodTable", func() { + It("Creates table if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + + It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + tableID := fmt.Sprintf("%s_%s.%s_method", types.FullSync, strings.ToLower(con.Address), strings.ToLower(method.Name)) + _, ok := dataStore.CheckTableCache(tableID) + Expect(ok).To(Equal(false)) + + created, err = dataStore.CreateMethodTable(con.Address, mockResult) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + v, ok := dataStore.CheckTableCache(tableID) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + }) + + Describe("PersistResult", func() { + It("Persists result from method polling in custom pg table", func() { + err = dataStore.PersistResult(mockResult, con.Address, con.Name) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct) + expectedLog := test_helpers.BalanceOf{ + Id: 1, + TokenName: "TrueUSD", + Block: 6707323, + Address: "0xfE9e8709d3215310075d67E3ed32A380CCf451C8", + Balance: "66386309548896882859581786", + } + Expect(scanStruct).To(Equal(expectedLog)) + }) + + It("Fails with empty result", func() { + err = dataStore.PersistResult(types.Result{}, con.Address, con.Name) + Expect(err).To(HaveOccurred()) + }) }) }) - Describe("CreateMethodTable", func() { - It("Creates table if it doesn't exist", func() { - created, err := dataStore.CreateContractSchema(constants.TusdContractAddress) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(true)) - - created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(true)) - - created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult) - Expect(err).ToNot(HaveOccurred()) - Expect(created).To(Equal(false)) - }) - }) - - Describe("PersistResult", func() { - It("Persists result from method polling in custom pg table", func() { - err = dataStore.PersistResult(mockResult, con.Address, con.Name) - Expect(err).ToNot(HaveOccurred()) - - scanStruct := test_helpers.BalanceOf{} - - err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct) - expectedLog := test_helpers.BalanceOf{ - Id: 1, - TokenName: "TrueUSD", - Block: 6707323, - Address: "0xfE9e8709d3215310075d67E3ed32A380CCf451C8", - Balance: "66386309548896882859581786", - } - Expect(scanStruct).To(Equal(expectedLog)) + Describe("Light Sync Mode", func() { + BeforeEach(func() { + dataStore = repository.NewMethodRepository(db, types.LightSync) }) - It("Fails with empty result", func() { - err = dataStore.PersistResult(types.Result{}, con.Address, con.Name) - Expect(err).To(HaveOccurred()) + Describe("CreateContractSchema", func() { + It("Creates schema if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + + It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + _, ok := dataStore.CheckSchemaCache(con.Address) + Expect(ok).To(Equal(false)) + + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + v, ok := dataStore.CheckSchemaCache(con.Address) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + }) + + Describe("CreateMethodTable", func() { + It("Creates table if it doesn't exist", func() { + created, err := dataStore.CreateContractSchema(constants.TusdContractAddress) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(false)) + }) + + It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() { + created, err := dataStore.CreateContractSchema(con.Address) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + tableID := fmt.Sprintf("%s_%s.%s_method", types.LightSync, strings.ToLower(con.Address), strings.ToLower(method.Name)) + _, ok := dataStore.CheckTableCache(tableID) + Expect(ok).To(Equal(false)) + + created, err = dataStore.CreateMethodTable(con.Address, mockResult) + Expect(err).ToNot(HaveOccurred()) + Expect(created).To(Equal(true)) + + v, ok := dataStore.CheckTableCache(tableID) + Expect(ok).To(Equal(true)) + Expect(v).To(Equal(true)) + }) + }) + + Describe("PersistResult", func() { + It("Persists result from method polling in custom pg table for light sync mode vDB", func() { + err = dataStore.PersistResult(mockResult, con.Address, con.Name) + Expect(err).ToNot(HaveOccurred()) + + scanStruct := test_helpers.BalanceOf{} + + err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct) + expectedLog := test_helpers.BalanceOf{ + Id: 1, + TokenName: "TrueUSD", + Block: 6707323, + Address: "0xfE9e8709d3215310075d67E3ed32A380CCf451C8", + Balance: "66386309548896882859581786", + } + Expect(scanStruct).To(Equal(expectedLog)) + }) + + It("Fails with empty result", func() { + err = dataStore.PersistResult(types.Result{}, con.Address, con.Name) + Expect(err).To(HaveOccurred()) + }) }) }) }) diff --git a/pkg/omni/full/retriever/address_retriever.go b/pkg/omni/shared/retriever/address_retriever.go similarity index 86% rename from pkg/omni/full/retriever/address_retriever.go rename to pkg/omni/shared/retriever/address_retriever.go index 40609279..f56d448f 100644 --- a/pkg/omni/full/retriever/address_retriever.go +++ b/pkg/omni/shared/retriever/address_retriever.go @@ -18,6 +18,7 @@ package retriever import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" "strings" "github.com/ethereum/go-ethereum/accounts/abi" @@ -28,19 +29,19 @@ import ( ) // Address retriever is used to retrieve the addresses associated with a contract -// It requires a vDB synced database with blocks, transactions, receipts, logs, -// AND all of the targeted events persisted type AddressRetriever interface { RetrieveTokenHolderAddresses(info contract.Contract) (map[common.Address]bool, error) } type addressRetriever struct { - db *postgres.DB + db *postgres.DB + mode types.Mode } -func NewAddressRetriever(db *postgres.DB) (r *addressRetriever) { +func NewAddressRetriever(db *postgres.DB, mode types.Mode) (r *addressRetriever) { return &addressRetriever{ - db: db, + db: db, + mode: mode, } } @@ -84,7 +85,7 @@ func (r *addressRetriever) retrieveTransferAddresses(con contract.Contract) ([]s if field.Type.T == abi.AddressTy { // If they have address type, retrieve those addresses addrs := make([]string, 0) - pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s_event", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name)) + pgStr := fmt.Sprintf("SELECT %s_ FROM %s_%s.%s_event", strings.ToLower(field.Name), r.mode.String(), strings.ToLower(con.Address), strings.ToLower(event.Name)) err := r.db.Select(&addrs, pgStr) if err != nil { return []string{}, err @@ -105,7 +106,7 @@ func (r *addressRetriever) retrieveTokenMintees(con contract.Contract) ([]string if field.Type.T == abi.AddressTy { // If they have address type, retrieve those addresses addrs := make([]string, 0) - pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s_event", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name)) + pgStr := fmt.Sprintf("SELECT %s_ FROM %s_%s.%s_event", strings.ToLower(field.Name), r.mode.String(), strings.ToLower(con.Address), strings.ToLower(event.Name)) err := r.db.Select(&addrs, pgStr) if err != nil { return []string{}, err diff --git a/pkg/omni/full/retriever/address_retriever_test.go b/pkg/omni/shared/retriever/address_retriever_test.go similarity index 92% rename from pkg/omni/full/retriever/address_retriever_test.go rename to pkg/omni/shared/retriever/address_retriever_test.go index daa91d50..48f45531 100644 --- a/pkg/omni/full/retriever/address_retriever_test.go +++ b/pkg/omni/shared/retriever/address_retriever_test.go @@ -20,16 +20,16 @@ import ( "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/omni/full/converter" - "github.com/vulcanize/vulcanizedb/pkg/omni/full/repository" - "github.com/vulcanize/vulcanizedb/pkg/omni/full/retriever" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract" "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/retriever" + "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types" ) var mockEvent = core.WatchedEvent{ @@ -68,11 +68,11 @@ var _ = Describe("Address Retriever Test", func() { log, err = c.Convert(mockEvent, event) Expect(err).ToNot(HaveOccurred()) - dataStore = repository.NewEventRepository(db) - dataStore.PersistLog(*log, info.Address, info.Name) + dataStore = repository.NewEventRepository(db, types.FullSync) + dataStore.PersistLogs([]types.Log{*log}, event, info.Address, info.Name) Expect(err).ToNot(HaveOccurred()) - r = retriever.NewAddressRetriever(db) + r = retriever.NewAddressRetriever(db, types.FullSync) }) AfterEach(func() { diff --git a/pkg/omni/full/repository/repository_suite_test.go b/pkg/omni/shared/retriever/retriever_suite_test.go similarity index 89% rename from pkg/omni/full/repository/repository_suite_test.go rename to pkg/omni/shared/retriever/retriever_suite_test.go index 899f5b04..b46c31fd 100644 --- a/pkg/omni/full/repository/repository_suite_test.go +++ b/pkg/omni/shared/retriever/retriever_suite_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package repository_test +package retriever_test import ( "io/ioutil" @@ -25,9 +25,9 @@ import ( . "github.com/onsi/gomega" ) -func TestRepository(t *testing.T) { +func TestRetriever(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Full Repository Suite Test") + RunSpecs(t, "Address Retriever Suite Test") } var _ = BeforeSuite(func() { diff --git a/pkg/omni/shared/types/event.go b/pkg/omni/shared/types/event.go index a2b1da84..5a80bb97 100644 --- a/pkg/omni/shared/types/event.go +++ b/pkg/omni/shared/types/event.go @@ -21,7 +21,6 @@ import ( "strings" "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/i-norden/go-ethereum/core/types" ) type Event struct { @@ -37,7 +36,6 @@ type Field struct { // Struct to hold instance of an event log data type Log struct { - Event Id int64 // VulcanizeIdLog for full sync and header ID for light sync omni watcher Values map[string]string // Map of event input names to their values @@ -48,7 +46,7 @@ type Log struct { // Used for lightSync only LogIndex uint TransactionIndex uint - Raw types.Log + Raw []byte // json.Unmarshalled byte array of geth/core/types.Log{} } // Unpack abi.Event into our custom Event struct diff --git a/pkg/omni/shared/types/mode.go b/pkg/omni/shared/types/mode.go new file mode 100644 index 00000000..71e5c531 --- /dev/null +++ b/pkg/omni/shared/types/mode.go @@ -0,0 +1,64 @@ +// VulcanizeDB +// Copyright © 2018 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package types + +import "fmt" + +type Mode int + +const ( + LightSync Mode = iota + FullSync +) + +func (mode Mode) IsValid() bool { + return mode >= LightSync && mode <= FullSync +} + +func (mode Mode) String() string { + switch mode { + case LightSync: + return "light" + case FullSync: + return "full" + default: + return "unknown" + } +} + +func (mode Mode) MarshalText() ([]byte, error) { + switch mode { + case LightSync: + return []byte("light"), nil + case FullSync: + return []byte("full"), nil + default: + return nil, fmt.Errorf("omni watcher: unknown mode %d, want LightSync or FullSync", mode) + } +} + +func (mode *Mode) UnmarshalText(text []byte) error { + switch string(text) { + case "light": + *mode = LightSync + case "full": + *mode = FullSync + default: + return fmt.Errorf(`omni watcher: unknown mode %q, want "light" or "full"`, text) + } + return nil +}