diff --git a/README.md b/README.md
index 2ccd9df8..414c06c2 100644
--- a/README.md
+++ b/README.md
@@ -9,10 +9,10 @@
Vulcanize DB is a set of tools that make it easier for developers to write application-specific indexes and caches for dapps built on Ethereum.
## Dependencies
- - Go 1.9+
+ - Go 1.11+
- Postgres 10
- Ethereum Node
- - [Go Ethereum](https://ethereum.github.io/go-ethereum/downloads/) (1.8+)
+ - [Go Ethereum](https://ethereum.github.io/go-ethereum/downloads/) (1.8.18+)
- [Parity 1.8.11+](https://github.com/paritytech/parity/releases)
## Project Setup
diff --git a/cmd/lightOmniWatcher.go b/cmd/lightOmniWatcher.go
index 4aae5bf3..199eaed2 100644
--- a/cmd/lightOmniWatcher.go
+++ b/cmd/lightOmniWatcher.go
@@ -17,11 +17,8 @@
package cmd
import (
- "bufio"
"fmt"
"log"
- "os"
- "strings"
"time"
"github.com/spf13/cobra"
@@ -59,29 +56,6 @@ func lightOmniWatcher() {
log.Fatal("Contract address required")
}
- if len(contractEvents) == 0 || len(contractMethods) == 0 {
- var str string
- for str != "y" {
- reader := bufio.NewReader(os.Stdin)
- if len(contractEvents) == 0 && len(contractMethods) == 0 {
- fmt.Print("Warning: no events or methods specified.\n Proceed to watch every event and poll no methods? (Y/n)\n> ")
- } else if len(contractEvents) == 0 {
- fmt.Print("Warning: no events specified.\n Proceed to watch every event? (Y/n)\n> ")
- } else {
- fmt.Print("Warning: no methods specified.\n Proceed to poll no methods? (Y/n)\n> ")
- }
- resp, err := reader.ReadBytes('\n')
- if err != nil {
- log.Fatal(err)
- }
-
- str = strings.ToLower(string(resp))
- if str == "n" {
- return
- }
- }
- }
-
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
@@ -114,13 +88,13 @@ func lightOmniWatcher() {
func init() {
rootCmd.AddCommand(lightOmniWatcherCmd)
- omniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for")
- omniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address")
- omniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-events", "e", []string{}, "Subset of events to watch; by default all events are watched")
- omniWatcherCmd.Flags().StringArrayVarP(&contractMethods, "contract-methods", "m", nil, "Subset of methods to poll; by default no methods are polled")
- omniWatcherCmd.Flags().StringArrayVarP(&eventAddrs, "event-filter-addresses", "f", []string{}, "Account addresses to persist event data for; default is to persist for all found token holder addresses")
- omniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses")
- omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`)
- omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists")
- omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block")
+ lightOmniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for")
+ lightOmniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address")
+ lightOmniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-events", "e", []string{}, "Subset of events to watch; by default all events are watched")
+ lightOmniWatcherCmd.Flags().StringArrayVarP(&contractMethods, "contract-methods", "m", nil, "Subset of methods to poll; by default no methods are polled")
+ lightOmniWatcherCmd.Flags().StringArrayVarP(&eventAddrs, "event-filter-addresses", "f", []string{}, "Account addresses to persist event data for; default is to persist for all found token holder addresses")
+ lightOmniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses")
+ lightOmniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`)
+ lightOmniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists")
+ lightOmniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block")
}
diff --git a/cmd/omniWatcher.go b/cmd/omniWatcher.go
index 8c30268a..566b6672 100644
--- a/cmd/omniWatcher.go
+++ b/cmd/omniWatcher.go
@@ -17,11 +17,8 @@
package cmd
import (
- "bufio"
"fmt"
"log"
- "os"
- "strings"
"time"
"github.com/spf13/cobra"
@@ -59,29 +56,6 @@ func omniWatcher() {
log.Fatal("Contract address required")
}
- if len(contractEvents) == 0 || len(contractMethods) == 0 {
- var str string
- for str != "y" {
- reader := bufio.NewReader(os.Stdin)
- if len(contractEvents) == 0 && len(contractMethods) == 0 {
- fmt.Print("Warning: no events or methods specified.\n Proceed to watch every event and poll no methods? (Y/n)\n> ")
- } else if len(contractEvents) == 0 {
- fmt.Print("Warning: no events specified.\n Proceed to watch every event? (Y/n)\n> ")
- } else {
- fmt.Print("Warning: no methods specified.\n Proceed to poll no methods? (Y/n)\n> ")
- }
- resp, err := reader.ReadBytes('\n')
- if err != nil {
- log.Fatal(err)
- }
-
- str = strings.ToLower(string(resp))
- if str == "n" {
- return
- }
- }
- }
-
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
@@ -122,5 +96,5 @@ func init() {
omniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses")
omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`)
omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists")
- omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block")
+ omniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block")
}
diff --git a/db/migrations/1532468317_create_checked_headers_table.up.sql b/db/migrations/1532468317_create_checked_headers_table.up.sql
index 6a39e0dd..638d759c 100644
--- a/db/migrations/1532468317_create_checked_headers_table.up.sql
+++ b/db/migrations/1532468317_create_checked_headers_table.up.sql
@@ -1,5 +1,4 @@
CREATE TABLE public.checked_headers (
id SERIAL PRIMARY KEY,
- header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
- price_feeds_checked BOOLEAN NOT NULL DEFAULT FALSE
+ header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE
);
\ No newline at end of file
diff --git a/pkg/omni/.DS_Store b/pkg/omni/.DS_Store
deleted file mode 100644
index 1150c6cb..00000000
Binary files a/pkg/omni/.DS_Store and /dev/null differ
diff --git a/pkg/omni/full/converter/converter.go b/pkg/omni/full/converter/converter.go
index 69b0b629..547acbff 100644
--- a/pkg/omni/full/converter/converter.go
+++ b/pkg/omni/full/converter/converter.go
@@ -74,22 +74,25 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (
// Postgres cannot handle custom types, resolve to strings
switch input.(type) {
case *big.Int:
- var b *big.Int
- b = input.(*big.Int)
+ b := input.(*big.Int)
strValues[fieldName] = b.String()
case common.Address:
- var a common.Address
- a = input.(common.Address)
+ a := input.(common.Address)
strValues[fieldName] = a.String()
c.ContractInfo.AddTokenHolderAddress(a.String()) // cache address in a list of contract's token holder addresses
case common.Hash:
- var h common.Hash
- h = input.(common.Hash)
+ h := input.(common.Hash)
strValues[fieldName] = h.String()
case string:
strValues[fieldName] = input.(string)
case bool:
strValues[fieldName] = strconv.FormatBool(input.(bool))
+ case []byte:
+ b := input.([]byte)
+ strValues[fieldName] = string(b)
+ case byte:
+ b := input.(byte)
+ strValues[fieldName] = string(b)
default:
return nil, errors.New(fmt.Sprintf("error: unhandled abi type %T", input))
}
@@ -98,7 +101,6 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (
// Only hold onto logs that pass our address filter, if any
if c.ContractInfo.PassesEventFilter(strValues) {
eventLog := &types.Log{
- Event: event,
Id: watchedEvent.LogID,
Values: strValues,
Block: watchedEvent.BlockNumber,
diff --git a/pkg/omni/full/converter/converter_test.go b/pkg/omni/full/converter/converter_test.go
index 7ce0c00b..c5bfc3e7 100644
--- a/pkg/omni/full/converter/converter_test.go
+++ b/pkg/omni/full/converter/converter_test.go
@@ -25,40 +25,41 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Converter", func() {
- var info *contract.Contract
+ var con *contract.Contract
var wantedEvents = []string{"Transfer"}
var err error
BeforeEach(func() {
- info = test_helpers.SetupTusdContract(wantedEvents, []string{})
+ con = test_helpers.SetupTusdContract(wantedEvents, []string{})
})
Describe("Update", func() {
- It("Updates contract info held by the converter", func() {
- c := converter.NewConverter(info)
- Expect(c.ContractInfo).To(Equal(info))
+ It("Updates contract con held by the converter", func() {
+ c := converter.NewConverter(con)
+ Expect(c.ContractInfo).To(Equal(con))
- info := test_helpers.SetupTusdContract([]string{}, []string{})
- c.Update(info)
- Expect(c.ContractInfo).To(Equal(info))
+ con := test_helpers.SetupTusdContract([]string{}, []string{})
+ c.Update(con)
+ Expect(c.ContractInfo).To(Equal(con))
})
})
Describe("Convert", func() {
It("Converts a watched event log to mapping of event input names to values", func() {
- _, ok := info.Events["Approval"]
+ _, ok := con.Events["Approval"]
Expect(ok).To(Equal(false))
- event, ok := info.Events["Transfer"]
+ event, ok := con.Events["Transfer"]
Expect(ok).To(Equal(true))
- err = info.GenerateFilters()
+ err = con.GenerateFilters()
Expect(err).ToNot(HaveOccurred())
- c := converter.NewConverter(info)
- log, err := c.Convert(test_helpers.MockTranferEvent, event)
+ c := converter.NewConverter(con)
+ log, err := c.Convert(mocks.MockTranferEvent, event)
Expect(err).ToNot(HaveOccurred())
from := common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21")
@@ -72,10 +73,36 @@ var _ = Describe("Converter", func() {
Expect(v).To(Equal(value.String()))
})
+ It("Keeps track of addresses it sees to grow a token holder address list for the contract", func() {
+ event, ok := con.Events["Transfer"]
+ Expect(ok).To(Equal(true))
+
+ c := converter.NewConverter(con)
+ _, err := c.Convert(mocks.MockTranferEvent, event)
+ Expect(err).ToNot(HaveOccurred())
+
+ b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"]
+ Expect(ok).To(Equal(true))
+ Expect(b).To(Equal(true))
+
+ b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"]
+ Expect(ok).To(Equal(true))
+ Expect(b).To(Equal(true))
+
+ _, ok = con.TknHolderAddrs["0x"]
+ Expect(ok).To(Equal(false))
+
+ _, ok = con.TknHolderAddrs[""]
+ Expect(ok).To(Equal(false))
+
+ _, ok = con.TknHolderAddrs["0x09THISE21a5IS5cFAKE1D82fAND43bCE06MADEUP"]
+ Expect(ok).To(Equal(false))
+ })
+
It("Fails with an empty contract", func() {
- event := info.Events["Transfer"]
+ event := con.Events["Transfer"]
c := converter.NewConverter(&contract.Contract{})
- _, err = c.Convert(test_helpers.MockTranferEvent, event)
+ _, err = c.Convert(mocks.MockTranferEvent, event)
Expect(err).To(HaveOccurred())
})
})
diff --git a/pkg/omni/full/repository/event_repository.go b/pkg/omni/full/repository/event_repository.go
deleted file mode 100644
index e0998308..00000000
--- a/pkg/omni/full/repository/event_repository.go
+++ /dev/null
@@ -1,182 +0,0 @@
-// VulcanizeDB
-// Copyright © 2018 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package repository
-
-import (
- "errors"
- "fmt"
- "strings"
-
- "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
-)
-
-// Event repository is used to persist event data into custom tables
-type EventRepository interface {
- PersistLog(event types.Log, contractAddr, contractName string) error
- CreateEventTable(contractName string, event types.Log) (bool, error)
- CreateContractSchema(contractName string) (bool, error)
-}
-
-type eventRepository struct {
- db *postgres.DB
-}
-
-func NewEventRepository(db *postgres.DB) *eventRepository {
-
- return &eventRepository{
- db: db,
- }
-}
-
-// Creates a schema for the contract if needed
-// Creates table for the watched contract event if needed
-// Persists converted event log data into this custom table
-func (d *eventRepository) PersistLog(event types.Log, contractAddr, contractName string) error {
- _, err := d.CreateContractSchema(contractAddr)
- if err != nil {
- return err
- }
-
- _, err = d.CreateEventTable(contractAddr, event)
- if err != nil {
- return err
- }
-
- return d.persistLog(event, contractAddr, contractName)
-}
-
-// Creates a custom postgres command to persist logs for the given event
-func (d *eventRepository) persistLog(event types.Log, contractAddr, contractName string) error {
- // Begin postgres string
- pgStr := fmt.Sprintf("INSERT INTO c%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name))
- pgStr = pgStr + "(vulcanize_log_id, token_name, block, tx"
-
- // Pack the corresponding variables in a slice
- var data []interface{}
- data = append(data,
- event.Id,
- contractName,
- event.Block,
- event.Tx)
-
- // Iterate over name-value pairs in the log adding
- // names to the string and pushing values to the slice
- counter := 0 // Keep track of number of inputs
- for inputName, input := range event.Values {
- counter += 1
- pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words
- data = append(data, input)
- }
-
- // Finish off the string and execute the command using the packed data
- // For each input entry we created we add its postgres command variable to the string
- pgStr = pgStr + ") VALUES ($1, $2, $3, $4"
- for i := 0; i < counter; i++ {
- pgStr = pgStr + fmt.Sprintf(", $%d", i+5)
- }
- pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING"
-
- _, err := d.db.Exec(pgStr, data...)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-// Checks for event table and creates it if it does not already exist
-func (d *eventRepository) CreateEventTable(contractAddr string, event types.Log) (bool, error) {
- tableExists, err := d.checkForTable(contractAddr, event.Name)
- if err != nil {
- return false, err
- }
-
- if !tableExists {
- err = d.newEventTable(contractAddr, event)
- if err != nil {
- return false, err
- }
- }
-
- return !tableExists, nil
-}
-
-// Creates a table for the given contract and event
-func (d *eventRepository) newEventTable(contractAddr string, event types.Log) error {
- // Begin pg string
- pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name))
- pgStr = pgStr + "(id SERIAL, vulcanize_log_id INTEGER NOT NULL UNIQUE, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL, tx CHARACTER VARYING(66) NOT NULL,"
-
- // Iterate over event fields, using their name and pgType to grow the string
- for _, field := range event.Fields {
- pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
- }
-
- pgStr = pgStr + " CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES logs (id) ON DELETE CASCADE)"
- _, err := d.db.Exec(pgStr)
-
- return err
-}
-
-// Checks if a table already exists for the given contract and event
-func (d *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) {
- pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s_event')", strings.ToLower(contractAddr), strings.ToLower(eventName))
-
- var exists bool
- err := d.db.Get(&exists, pgStr)
-
- return exists, err
-}
-
-// Checks for contract schema and creates it if it does not already exist
-func (d *eventRepository) CreateContractSchema(contractAddr string) (bool, error) {
- if contractAddr == "" {
- return false, errors.New("error: no contract address specified")
- }
-
- schemaExists, err := d.checkForSchema(contractAddr)
- if err != nil {
- return false, err
- }
-
- if !schemaExists {
- err = d.newContractSchema(contractAddr)
- if err != nil {
- return false, err
- }
- }
-
- return !schemaExists, nil
-}
-
-// Creates a schema for the given contract
-func (d *eventRepository) newContractSchema(contractAddr string) error {
- _, err := d.db.Exec("CREATE SCHEMA IF NOT EXISTS c" + strings.ToLower(contractAddr))
-
- return err
-}
-
-// Checks if a schema already exists for the given contract
-func (d *eventRepository) checkForSchema(contractAddr string) (bool, error) {
- pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'c%s')", strings.ToLower(contractAddr))
-
- var exists bool
- err := d.db.QueryRow(pgStr).Scan(&exists)
-
- return exists, err
-}
diff --git a/pkg/omni/full/repository/event_repository_test.go b/pkg/omni/full/repository/event_repository_test.go
deleted file mode 100644
index bcaa3704..00000000
--- a/pkg/omni/full/repository/event_repository_test.go
+++ /dev/null
@@ -1,171 +0,0 @@
-// VulcanizeDB
-// Copyright © 2018 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package repository_test
-
-import (
- "fmt"
-
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-
- "github.com/vulcanize/vulcanizedb/pkg/core"
- "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
- "github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
- "github.com/vulcanize/vulcanizedb/pkg/omni/full/repository"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
-)
-
-var mockEvent = core.WatchedEvent{
- Name: constants.TransferEvent.String(),
- BlockNumber: 5488076,
- Address: constants.TusdContractAddress,
- TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
- Index: 110,
- Topic0: constants.TransferEvent.Signature(),
- Topic1: "0x000000000000000000000000000000000000000000000000000000000000af21",
- Topic2: "0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391",
- Topic3: "",
- Data: "0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe",
-}
-
-var _ = Describe("Repository", func() {
- var db *postgres.DB
- var dataStore repository.EventRepository
- var err error
- var log *types.Log
- var con *contract.Contract
- var vulcanizeLogId int64
- var wantedEvents = []string{"Transfer"}
- var event types.Event
-
- BeforeEach(func() {
- db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, []string{})
- mockEvent.LogID = vulcanizeLogId
-
- event = con.Events["Transfer"]
- err = con.GenerateFilters()
- Expect(err).ToNot(HaveOccurred())
-
- c := converter.NewConverter(con)
- log, err = c.Convert(mockEvent, event)
- Expect(err).ToNot(HaveOccurred())
-
- dataStore = repository.NewEventRepository(db)
- })
-
- AfterEach(func() {
- test_helpers.TearDown(db)
- })
-
- Describe("CreateContractSchema", func() {
- It("Creates schema if it doesn't exist", func() {
- created, err := dataStore.CreateContractSchema(con.Address)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(true))
-
- created, err = dataStore.CreateContractSchema(con.Address)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(false))
- })
- })
-
- Describe("CreateEventTable", func() {
- It("Creates table if it doesn't exist", func() {
- created, err := dataStore.CreateContractSchema(con.Address)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(true))
-
- created, err = dataStore.CreateEventTable(con.Address, *log)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(true))
-
- created, err = dataStore.CreateEventTable(con.Address, *log)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(false))
- })
- })
-
- Describe("PersistLog", func() {
- It("Persists contract event log values into custom tables, adding any addresses to a growing list of contract associated addresses", func() {
- err = dataStore.PersistLog(*log, con.Address, con.Name)
- Expect(err).ToNot(HaveOccurred())
-
- b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"]
- Expect(ok).To(Equal(true))
- Expect(b).To(Equal(true))
-
- b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"]
- Expect(ok).To(Equal(true))
- Expect(b).To(Equal(true))
-
- scanLog := test_helpers.TransferLog{}
-
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog)
- expectedLog := test_helpers.TransferLog{
- Id: 1,
- VulvanizeLogId: vulcanizeLogId,
- TokenName: "TrueUSD",
- Block: 5488076,
- Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
- From: "0x000000000000000000000000000000000000Af21",
- To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391",
- Value: "1097077688018008265106216665536940668749033598146",
- }
- Expect(scanLog).To(Equal(expectedLog))
- })
-
- It("Doesn't persist duplicate event logs", func() {
- // Perist once
- err = dataStore.PersistLog(*log, con.Address, con.Name)
- Expect(err).ToNot(HaveOccurred())
-
- scanLog := test_helpers.TransferLog{}
-
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog)
- expectedLog := test_helpers.TransferLog{
- Id: 1,
- VulvanizeLogId: vulcanizeLogId,
- TokenName: "TrueUSD",
- Block: 5488076,
- Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
- From: "0x000000000000000000000000000000000000Af21",
- To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391",
- Value: "1097077688018008265106216665536940668749033598146",
- }
-
- Expect(scanLog).To(Equal(expectedLog))
-
- // Attempt to persist the same log again
- err = dataStore.PersistLog(*log, con.Address, con.Name)
- Expect(err).ToNot(HaveOccurred())
-
- // Show that no new logs were entered
- var count int
- err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM c%s.transfer_event", constants.TusdContractAddress))
- Expect(err).ToNot(HaveOccurred())
- Expect(count).To(Equal(1))
- })
-
- It("Fails with empty log", func() {
- err = dataStore.PersistLog(types.Log{}, con.Address, con.Name)
- Expect(err).To(HaveOccurred())
- })
- })
-})
diff --git a/pkg/omni/full/retriever/retriever_suite_test.go b/pkg/omni/full/retriever/retriever_suite_test.go
index d5815a7c..948f5e8e 100644
--- a/pkg/omni/full/retriever/retriever_suite_test.go
+++ b/pkg/omni/full/retriever/retriever_suite_test.go
@@ -27,7 +27,7 @@ import (
func TestRetriever(t *testing.T) {
RegisterFailHandler(Fail)
- RunSpecs(t, "Full Retriever Suite Test")
+ RunSpecs(t, "Full Block Number Retriever Suite Test")
}
var _ = BeforeSuite(func() {
diff --git a/pkg/omni/full/transformer/transformer.go b/pkg/omni/full/transformer/transformer.go
index b9987fcb..2191954f 100644
--- a/pkg/omni/full/transformer/transformer.go
+++ b/pkg/omni/full/transformer/transformer.go
@@ -19,18 +19,17 @@ package transformer
import (
"errors"
"fmt"
- "log"
-
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
- "github.com/vulcanize/vulcanizedb/pkg/omni/full/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
// Requires a fully synced vDB and a running eth node (or infura)
@@ -71,14 +70,14 @@ type transformer struct {
// Transformer takes in config for blockchain, database, and network id
func NewTransformer(network string, BC core.BlockChain, DB *postgres.DB) *transformer {
return &transformer{
- Poller: poller.NewPoller(BC, DB),
+ Poller: poller.NewPoller(BC, DB, types.FullSync),
Parser: parser.NewParser(network),
BlockRetriever: retriever.NewBlockRetriever(DB),
Converter: converter.NewConverter(&contract.Contract{}),
Contracts: map[string]*contract.Contract{},
WatchedEventRepository: repositories.WatchedEventRepository{DB: DB},
FilterRepository: repositories.FilterRepository{DB: DB},
- EventRepository: repository.NewEventRepository(DB),
+ EventRepository: repository.NewEventRepository(DB, types.FullSync),
WatchedEvents: map[string][]string{},
WantedMethods: map[string][]string{},
ContractRanges: map[string][2]int64{},
@@ -184,27 +183,26 @@ func (tr transformer) Execute() error {
tr.Update(con)
// Iterate through contract filters and get watched event logs
- for eventName, filter := range con.Filters {
+ for eventName := range con.Filters {
watchedEvents, err := tr.GetWatchedEvents(eventName)
if err != nil {
- log.Println(fmt.Sprintf("Error fetching events for %s:", filter.Name), err)
return err
}
// Iterate over watched event logs
for _, we := range watchedEvents {
// Convert them to our custom log type
- log, err := tr.Converter.Convert(*we, con.Events[eventName])
+ cstm, err := tr.Converter.Convert(*we, con.Events[eventName])
if err != nil {
return err
}
- if log == nil {
- break
+ if cstm == nil {
+ continue
}
// If log is not empty, immediately persist in repo
// Run this in seperate goroutine?
- err = tr.PersistLog(*log, con.Address, con.Name)
+ err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventName], con.Address, con.Name)
if err != nil {
return err
}
diff --git a/pkg/omni/full/transformer/transformer_test.go b/pkg/omni/full/transformer/transformer_test.go
index 83a72073..ef1f9327 100644
--- a/pkg/omni/full/transformer/transformer_test.go
+++ b/pkg/omni/full/transformer/transformer_test.go
@@ -30,6 +30,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/full/transformer"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Transformer", func() {
@@ -95,8 +96,8 @@ var _ = Describe("Transformer", func() {
Describe("Init", func() {
It("Initializes transformer's contract objects", func() {
- blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1)
- blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2)
+ blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1)
+ blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2)
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
err = t.Init()
@@ -120,8 +121,8 @@ var _ = Describe("Transformer", func() {
})
It("Does nothing if watched events are unset", func() {
- blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1)
- blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2)
+ blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1)
+ blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2)
t := transformer.NewTransformer("", blockChain, db)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
@@ -133,8 +134,8 @@ var _ = Describe("Transformer", func() {
Describe("Execute", func() {
BeforeEach(func() {
- blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1)
- blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2)
+ blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1)
+ blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2)
})
It("Transforms watched contract data into custom repositories", func() {
@@ -148,7 +149,7 @@ var _ = Describe("Transformer", func() {
Expect(err).ToNot(HaveOccurred())
log := test_helpers.TransferLog{}
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log)
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log)
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(log.Tx).To(Equal("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654eee"))
@@ -198,12 +199,12 @@ var _ = Describe("Transformer", func() {
res := test_helpers.BalanceOf{}
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res)
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res)
Expect(err).ToNot(HaveOccurred())
Expect(res.Balance).To(Equal("0"))
Expect(res.TokenName).To(Equal("TrueUSD"))
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res)
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res)
Expect(err).To(HaveOccurred())
})
diff --git a/pkg/omni/light/converter/converter.go b/pkg/omni/light/converter/converter.go
index 63348445..a92ee665 100644
--- a/pkg/omni/light/converter/converter.go
+++ b/pkg/omni/light/converter/converter.go
@@ -17,16 +17,22 @@
package converter
import (
+ "encoding/json"
"errors"
+ "fmt"
+ "math/big"
+ "strconv"
- geth "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/accounts/abi/bind"
+ "github.com/ethereum/go-ethereum/common"
+ gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
type Converter interface {
- Convert(log geth.Log, event types.Event) (*types.Log, error)
+ Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error)
Update(info *contract.Contract)
}
@@ -45,6 +51,66 @@ func (c *converter) Update(info *contract.Contract) {
}
// Convert the given watched event log into a types.Log for the given event
-func (c *converter) Convert(log geth.Log, event types.Event) (*types.Log, error) {
- return nil, errors.New("implement me")
+func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error) {
+ contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
+ returnLogs := make([]types.Log, 0, len(logs))
+ for _, log := range logs {
+ values := make(map[string]interface{})
+ for _, field := range event.Fields {
+ var i interface{}
+ values[field.Name] = i
+ }
+
+ err := contract.UnpackLogIntoMap(values, event.Name, log)
+ if err != nil {
+ return nil, err
+ }
+
+ strValues := make(map[string]string, len(values))
+ for fieldName, input := range values {
+ // Postgres cannot handle custom types, resolve everything to strings
+ switch input.(type) {
+ case *big.Int:
+ b := input.(*big.Int)
+ strValues[fieldName] = b.String()
+ case common.Address:
+ a := input.(common.Address)
+ strValues[fieldName] = a.String()
+ c.ContractInfo.AddTokenHolderAddress(a.String()) // cache address in a list of contract's token holder addresses
+ case common.Hash:
+ h := input.(common.Hash)
+ strValues[fieldName] = h.String()
+ case string:
+ strValues[fieldName] = input.(string)
+ case bool:
+ strValues[fieldName] = strconv.FormatBool(input.(bool))
+ case []byte:
+ b := input.([]byte)
+ strValues[fieldName] = string(b)
+ case byte:
+ b := input.(byte)
+ strValues[fieldName] = string(b)
+ default:
+ return nil, errors.New(fmt.Sprintf("error: unhandled abi type %T", input))
+ }
+ }
+
+ // Only hold onto logs that pass our address filter, if any
+ if c.ContractInfo.PassesEventFilter(strValues) {
+ raw, err := json.Marshal(log)
+ if err != nil {
+ return nil, err
+ }
+
+ returnLogs = append(returnLogs, types.Log{
+ LogIndex: log.Index,
+ Values: strValues,
+ Raw: raw,
+ TransactionIndex: log.TxIndex,
+ Id: headerID,
+ })
+ }
+ }
+
+ return returnLogs, nil
}
diff --git a/pkg/omni/light/converter/converter_test.go b/pkg/omni/light/converter/converter_test.go
index a4d4e8ac..96a375ca 100644
--- a/pkg/omni/light/converter/converter_test.go
+++ b/pkg/omni/light/converter/converter_test.go
@@ -15,3 +15,98 @@
// along with this program. If not, see .
package converter_test
+
+import (
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/vulcanize/vulcanizedb/pkg/omni/light/converter"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
+)
+
+var _ = Describe("Converter", func() {
+ var con *contract.Contract
+ var wantedEvents = []string{"Transfer", "Mint"}
+ var err error
+
+ BeforeEach(func() {
+ con = test_helpers.SetupTusdContract(wantedEvents, []string{})
+ })
+
+ Describe("Update", func() {
+ It("Updates contract info held by the converter", func() {
+ c := converter.NewConverter(con)
+ Expect(c.ContractInfo).To(Equal(con))
+
+ info := test_helpers.SetupTusdContract([]string{}, []string{})
+ c.Update(info)
+ Expect(c.ContractInfo).To(Equal(info))
+ })
+ })
+
+ Describe("Convert", func() {
+ It("Converts a watched event log to mapping of event input names to values", func() {
+ _, ok := con.Events["Approval"]
+ Expect(ok).To(Equal(false))
+
+ event, ok := con.Events["Transfer"]
+ Expect(ok).To(Equal(true))
+
+ c := converter.NewConverter(con)
+ logs, err := c.Convert([]types.Log{mocks.MockTransferLog1, mocks.MockTransferLog2}, event, 232)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(logs)).To(Equal(2))
+
+ sender1 := common.HexToAddress("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391")
+ sender2 := common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21")
+ value := helpers.BigFromString("1097077688018008265106216665536940668749033598146")
+
+ Expect(logs[0].Values["to"]).To(Equal(sender1.String()))
+ Expect(logs[0].Values["from"]).To(Equal(sender2.String()))
+ Expect(logs[0].Values["value"]).To(Equal(value.String()))
+ Expect(logs[0].Id).To(Equal(int64(232)))
+ Expect(logs[1].Values["to"]).To(Equal(sender2.String()))
+ Expect(logs[1].Values["from"]).To(Equal(sender1.String()))
+ Expect(logs[1].Values["value"]).To(Equal(value.String()))
+ Expect(logs[1].Id).To(Equal(int64(232)))
+ })
+
+ It("Keeps track of addresses it sees to grow a token holder address list for the contract", func() {
+ event, ok := con.Events["Transfer"]
+ Expect(ok).To(Equal(true))
+
+ c := converter.NewConverter(con)
+ _, err := c.Convert([]types.Log{mocks.MockTransferLog1, mocks.MockTransferLog2}, event, 232)
+ Expect(err).ToNot(HaveOccurred())
+
+ b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"]
+ Expect(ok).To(Equal(true))
+ Expect(b).To(Equal(true))
+
+ b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"]
+ Expect(ok).To(Equal(true))
+ Expect(b).To(Equal(true))
+
+ _, ok = con.TknHolderAddrs["0x"]
+ Expect(ok).To(Equal(false))
+
+ _, ok = con.TknHolderAddrs[""]
+ Expect(ok).To(Equal(false))
+
+ _, ok = con.TknHolderAddrs["0x09THISE21a5IS5cFAKE1D82fAND43bCE06MADEUP"]
+ Expect(ok).To(Equal(false))
+ })
+
+ It("Fails with an empty contract", func() {
+ event := con.Events["Transfer"]
+ c := converter.NewConverter(&contract.Contract{})
+ _, err = c.Convert([]types.Log{mocks.MockTransferLog1}, event, 232)
+ Expect(err).To(HaveOccurred())
+ })
+ })
+})
diff --git a/pkg/omni/light/repository/event_repository.go b/pkg/omni/light/repository/event_repository.go
deleted file mode 100644
index 14e6b256..00000000
--- a/pkg/omni/light/repository/event_repository.go
+++ /dev/null
@@ -1,181 +0,0 @@
-// VulcanizeDB
-// Copyright © 2018 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package repository
-
-import (
- "errors"
- "fmt"
- "strings"
-
- "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
-)
-
-type EventRepository interface {
- PersistLog(event types.Log, contractAddr, contractName string) error
- CreateEventTable(contractName string, event types.Log) (bool, error)
- CreateContractSchema(contractName string) (bool, error)
-}
-
-type eventRepository struct {
- db *postgres.DB
-}
-
-func NewEventRepository(db *postgres.DB) *eventRepository {
- return &eventRepository{
- db: db,
- }
-}
-
-// Creates a schema for the contract if needed
-// Creates table for the watched contract event if needed
-// Persists converted event log data into this custom table
-func (r *eventRepository) PersistLog(event types.Log, contractAddr, contractName string) error {
- _, err := r.CreateContractSchema(contractAddr)
- if err != nil {
- return err
- }
-
- _, err = r.CreateEventTable(contractAddr, event)
- if err != nil {
- return err
- }
-
- return r.persistLog(event, contractAddr, contractName)
-}
-
-// Creates a custom postgres command to persist logs for the given event
-func (r *eventRepository) persistLog(event types.Log, contractAddr, contractName string) error {
- // Begin postgres string
- pgStr := fmt.Sprintf("INSERT INTO l%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name))
- pgStr = pgStr + "(header_id, token_name, raw_log, log_idx, tx_idx"
-
- // Pack the corresponding variables in a slice
- var data []interface{}
- data = append(data,
- event.Id,
- contractName,
- event.Raw,
- event.LogIndex,
- event.TransactionIndex)
-
- // Iterate over name-value pairs in the log adding
- // names to the string and pushing values to the slice
- counter := 0 // Keep track of number of inputs
- for inputName, input := range event.Values {
- counter += 1
- pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words
- data = append(data, input)
- }
-
- // Finish off the string and execute the command using the packed data
- // For each input entry we created we add its postgres command variable to the string
- pgStr = pgStr + ") VALUES ($1, $2, $3, $4, $5"
- for i := 0; i < counter; i++ {
- pgStr = pgStr + fmt.Sprintf(", $%d", i+6)
- }
- pgStr = pgStr + ")"
-
- _, err := r.db.Exec(pgStr, data...)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-// Checks for event table and creates it if it does not already exist
-func (r *eventRepository) CreateEventTable(contractAddr string, event types.Log) (bool, error) {
- tableExists, err := r.checkForTable(contractAddr, event.Name)
- if err != nil {
- return false, err
- }
-
- if !tableExists {
- err = r.newEventTable(contractAddr, event)
- if err != nil {
- return false, err
- }
- }
-
- return !tableExists, nil
-}
-
-// Creates a table for the given contract and event
-func (r *eventRepository) newEventTable(contractAddr string, event types.Log) error {
- // Begin pg string
- pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS l%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name))
- pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL,"
-
- // Iterate over event fields, using their name and pgType to grow the string
- for _, field := range event.Fields {
- pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
- }
-
- pgStr = pgStr + " UNIQUE (header_id, tx_idx, log_idx))"
- _, err := r.db.Exec(pgStr)
-
- return err
-}
-
-// Checks if a table already exists for the given contract and event
-func (r *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) {
- pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'l%s' AND table_name = '%s_event')", strings.ToLower(contractAddr), strings.ToLower(eventName))
-
- var exists bool
- err := r.db.Get(&exists, pgStr)
-
- return exists, err
-}
-
-// Checks for contract schema and creates it if it does not already exist
-func (r *eventRepository) CreateContractSchema(contractAddr string) (bool, error) {
- if contractAddr == "" {
- return false, errors.New("error: no contract address specified")
- }
-
- schemaExists, err := r.checkForSchema(contractAddr)
- if err != nil {
- return false, err
- }
-
- if !schemaExists {
- err = r.newContractSchema(contractAddr)
- if err != nil {
- return false, err
- }
- }
-
- return !schemaExists, nil
-}
-
-// Creates a schema for the given contract
-func (r *eventRepository) newContractSchema(contractAddr string) error {
- _, err := r.db.Exec("CREATE SCHEMA IF NOT EXISTS l" + strings.ToLower(contractAddr))
-
- return err
-}
-
-// Checks if a schema already exists for the given contract
-func (r *eventRepository) checkForSchema(contractAddr string) (bool, error) {
- pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'l%s')", strings.ToLower(contractAddr))
-
- var exists bool
- err := r.db.QueryRow(pgStr).Scan(&exists)
-
- return exists, err
-}
diff --git a/pkg/omni/light/repository/event_repository_test.go b/pkg/omni/light/repository/event_repository_test.go
deleted file mode 100644
index 68165a0d..00000000
--- a/pkg/omni/light/repository/event_repository_test.go
+++ /dev/null
@@ -1,17 +0,0 @@
-// VulcanizeDB
-// Copyright © 2018 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package repository_test
diff --git a/pkg/omni/light/repository/header_repository.go b/pkg/omni/light/repository/header_repository.go
index 668e64e7..8e9ce00f 100644
--- a/pkg/omni/light/repository/header_repository.go
+++ b/pkg/omni/light/repository/header_repository.go
@@ -17,30 +17,61 @@
package repository
import (
+ "database/sql"
+ "github.com/hashicorp/golang-lru"
+
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
+const columnCacheSize = 1000
+
type HeaderRepository interface {
+ AddCheckColumn(eventID string) error
MarkHeaderChecked(headerID int64, eventID string) error
MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error)
+ CheckCache(key string) (interface{}, bool)
}
type headerRepository struct {
- db *postgres.DB
+ db *postgres.DB
+ columns *lru.Cache // Cache created columns to minimize db connections
}
func NewHeaderRepository(db *postgres.DB) *headerRepository {
+ ccs, _ := lru.New(columnCacheSize)
return &headerRepository{
- db: db,
+ db: db,
+ columns: ccs,
}
}
+func (r *headerRepository) AddCheckColumn(eventID string) error {
+ // Check cache to see if column already exists before querying pg
+ _, ok := r.columns.Get(eventID)
+ if ok {
+ return nil
+ }
+
+ pgStr := "ALTER TABLE public.checked_headers ADD COLUMN IF NOT EXISTS "
+ pgStr = pgStr + eventID + " BOOLEAN NOT NULL DEFAULT FALSE"
+ _, err := r.db.Exec(pgStr)
+ if err != nil {
+ return err
+ }
+
+ // Add column name to cache
+ r.columns.Add(eventID, true)
+
+ return nil
+}
+
func (r *headerRepository) MarkHeaderChecked(headerID int64, eventID string) error {
_, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET `+eventID+` = $2`, headerID, true)
+
return err
}
@@ -68,3 +99,15 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber int64, endingBlock
return result, err
}
+
+func (r *headerRepository) CheckCache(key string) (interface{}, bool) {
+ return r.columns.Get(key)
+}
+
+func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error {
+ _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
+ VALUES ($1, $2)
+ ON CONFLICT (header_id) DO
+ UPDATE SET `+eventID+` = $2`, headerID, true)
+ return err
+}
diff --git a/pkg/omni/light/repository/header_repository_test.go b/pkg/omni/light/repository/header_repository_test.go
index 68165a0d..f5f4ece3 100644
--- a/pkg/omni/light/repository/header_repository_test.go
+++ b/pkg/omni/light/repository/header_repository_test.go
@@ -15,3 +15,127 @@
// along with this program. If not, see .
package repository_test
+
+import (
+ "fmt"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
+ "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
+)
+
+var _ = Describe("Repository", func() {
+ var db *postgres.DB
+ var r repository.HeaderRepository
+ var headerRepository repositories.HeaderRepository
+ var eventID, query string
+
+ BeforeEach(func() {
+ db, _ = test_helpers.SetupDBandBC()
+ r = repository.NewHeaderRepository(db)
+ headerRepository = repositories.NewHeaderRepository(db)
+ eventID = "eventName_contractAddr"
+ })
+
+ AfterEach(func() {
+ test_helpers.TearDown(db)
+ })
+
+ Describe("AddCheckColumn", func() {
+ It("Creates a column for the given eventID to mark if the header has been checked for that event", func() {
+ query = fmt.Sprintf("SELECT %s FROM checked_headers", eventID)
+ _, err := db.Exec(query)
+ Expect(err).To(HaveOccurred())
+
+ err = r.AddCheckColumn(eventID)
+ Expect(err).ToNot(HaveOccurred())
+
+ _, err = db.Exec(query)
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ It("Caches column it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ _, ok := r.CheckCache(eventID)
+ Expect(ok).To(Equal(false))
+
+ err := r.AddCheckColumn(eventID)
+ Expect(err).ToNot(HaveOccurred())
+
+ v, ok := r.CheckCache(eventID)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+ })
+
+ Describe("MissingHeaders", func() {
+ It("Returns all unchecked headers for the given eventID", func() {
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
+ err := r.AddCheckColumn(eventID)
+ Expect(err).ToNot(HaveOccurred())
+
+ missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(missingHeaders)).To(Equal(3))
+ })
+
+ It("Fails if eventID does not yet exist in check_headers table", func() {
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
+ err := r.AddCheckColumn(eventID)
+ Expect(err).ToNot(HaveOccurred())
+
+ _, err = r.MissingHeaders(6194630, 6194635, "notEventId")
+ Expect(err).To(HaveOccurred())
+ })
+ })
+
+ Describe("MarkHeaderChecked", func() {
+ It("Marks the header checked for the given eventID", func() {
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
+ err := r.AddCheckColumn(eventID)
+ Expect(err).ToNot(HaveOccurred())
+
+ missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(missingHeaders)).To(Equal(3))
+
+ headerID := missingHeaders[0].Id
+ err = r.MarkHeaderChecked(headerID, eventID)
+ Expect(err).ToNot(HaveOccurred())
+
+ missingHeaders, err = r.MissingHeaders(6194630, 6194635, eventID)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(missingHeaders)).To(Equal(2))
+ })
+
+ It("Fails if eventID does not yet exist in check_headers table", func() {
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
+ err := r.AddCheckColumn(eventID)
+ Expect(err).ToNot(HaveOccurred())
+
+ missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(missingHeaders)).To(Equal(3))
+
+ headerID := missingHeaders[0].Id
+ err = r.MarkHeaderChecked(headerID, "notEventId")
+ Expect(err).To(HaveOccurred())
+
+ missingHeaders, err = r.MissingHeaders(6194630, 6194635, eventID)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(missingHeaders)).To(Equal(3))
+ })
+ })
+})
diff --git a/pkg/omni/light/retriever/block_retriever_test.go b/pkg/omni/light/retriever/block_retriever_test.go
index 09be7b77..563e3738 100644
--- a/pkg/omni/light/retriever/block_retriever_test.go
+++ b/pkg/omni/light/retriever/block_retriever_test.go
@@ -24,6 +24,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/omni/light/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Block Retriever", func() {
@@ -43,9 +44,9 @@ var _ = Describe("Block Retriever", func() {
Describe("RetrieveFirstBlock", func() {
It("Retrieves block number of earliest header in the database", func() {
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1)
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader2)
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
i, err := r.RetrieveFirstBlock()
Expect(err).NotTo(HaveOccurred())
@@ -60,9 +61,9 @@ var _ = Describe("Block Retriever", func() {
Describe("RetrieveMostRecentBlock", func() {
It("Retrieves the latest header's block number", func() {
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1)
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader2)
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
i, err := r.RetrieveMostRecentBlock()
Expect(err).ToNot(HaveOccurred())
diff --git a/pkg/omni/light/retriever/retriever_suite_test.go b/pkg/omni/light/retriever/retriever_suite_test.go
index dc577626..cb5a7313 100644
--- a/pkg/omni/light/retriever/retriever_suite_test.go
+++ b/pkg/omni/light/retriever/retriever_suite_test.go
@@ -27,7 +27,7 @@ import (
func TestRetriever(t *testing.T) {
RegisterFailHandler(Fail)
- RunSpecs(t, "Light Retriever Suite Test")
+ RunSpecs(t, "Light BLock Number Retriever Suite Test")
}
var _ = BeforeSuite(func() {
diff --git a/pkg/omni/light/transformer/transformer.go b/pkg/omni/light/transformer/transformer.go
index d6c17f63..26804ea7 100644
--- a/pkg/omni/light/transformer/transformer.go
+++ b/pkg/omni/light/transformer/transformer.go
@@ -18,7 +18,8 @@ package transformer
import (
"errors"
- "fmt"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers"
+ "strings"
"github.com/ethereum/go-ethereum/common"
@@ -31,12 +32,14 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller"
+ srep "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
// Requires a light synced vDB (headers) and a running eth node (or infura)
type transformer struct {
// Database interfaces
- repository.EventRepository // Holds transformed watched event log data
+ srep.EventRepository // Holds transformed watched event log data
repository.HeaderRepository // Interface for interaction with header repositories
// Pre-processing interfaces
@@ -72,14 +75,14 @@ type transformer struct {
func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transformer {
return &transformer{
- Poller: poller.NewPoller(bc, db),
+ Poller: poller.NewPoller(bc, db, types.LightSync),
Fetcher: fetcher.NewFetcher(bc),
Parser: parser.NewParser(network),
HeaderRepository: repository.NewHeaderRepository(db),
BlockRetriever: retriever.NewBlockRetriever(db),
Converter: converter.NewConverter(&contract.Contract{}),
Contracts: map[string]*contract.Contract{},
- EventRepository: repository.NewEventRepository(db),
+ EventRepository: srep.NewEventRepository(db, types.LightSync),
WatchedEvents: map[string][]string{},
WantedMethods: map[string][]string{},
ContractRanges: map[string][2]int64{},
@@ -93,7 +96,7 @@ func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transf
// Uses parser to pull event info from abi
// Use this info to generate event filters
func (tr *transformer) Init() error {
-
+ // Iterate through all internal contract addresses
for contractAddr, subset := range tr.WatchedEvents {
// Get Abi
err := tr.Parser.Parse(contractAddr)
@@ -101,7 +104,7 @@ func (tr *transformer) Init() error {
return err
}
- // Get first block for contract and most recent block for the chain
+ // Get first block and most recent block number in the header repo
firstBlock, err := tr.BlockRetriever.RetrieveFirstBlock()
if err != nil {
return err
@@ -111,7 +114,7 @@ func (tr *transformer) Init() error {
return err
}
- // Set to specified range if it falls within the contract's bounds
+ // Set to specified range if it falls within the bounds
if firstBlock < tr.ContractRanges[contractAddr][0] {
firstBlock = tr.ContractRanges[contractAddr][0]
}
@@ -119,14 +122,11 @@ func (tr *transformer) Init() error {
lastBlock = tr.ContractRanges[contractAddr][1]
}
- // Get contract name
+ // Get contract name if it has one
var name = new(string)
- err = tr.FetchContractData(tr.Abi(), contractAddr, "name", nil, &name, lastBlock)
- if err != nil {
- return errors.New(fmt.Sprintf("unable to fetch contract name: %v\r\n", err))
- }
+ tr.FetchContractData(tr.Abi(), contractAddr, "name", nil, &name, lastBlock)
- // Remove any accidental duplicate inputs in filter addresses
+ // Remove any potential accidental duplicate inputs in filter addresses
EventAddrs := map[string]bool{}
for _, addr := range tr.EventAddrs[contractAddr] {
EventAddrs[addr] = true
@@ -142,6 +142,7 @@ func (tr *transformer) Init() error {
Network: tr.Network,
Address: contractAddr,
Abi: tr.Abi(),
+ ParsedAbi: tr.ParsedAbi(),
StartingBlock: firstBlock,
LastBlock: lastBlock,
Events: tr.GetEvents(subset),
@@ -151,7 +152,7 @@ func (tr *transformer) Init() error {
TknHolderAddrs: map[string]bool{},
}
- // Store contract info for further processing
+ // Store contract info for execution
tr.Contracts[contractAddr] = info
}
@@ -160,53 +161,69 @@ func (tr *transformer) Init() error {
func (tr *transformer) Execute() error {
if len(tr.Contracts) == 0 {
- return errors.New("error: transformer has no initialized contracts to work with")
+ return errors.New("error: transformer has no initialized contracts")
}
// Iterate through all internal contracts
for _, con := range tr.Contracts {
-
// Update converter with current contract
tr.Update(con)
+ // Iterate through events
for _, event := range con.Events {
- topics := [][]common.Hash{{common.HexToHash(event.Sig())}}
- eventId := event.Name + "_" + con.Address
+ // Filter using the event signature
+ topics := [][]common.Hash{{common.HexToHash(helpers.GenerateSignature(event.Sig()))}}
+
+ // Generate eventID and use it to create a checked_header column if one does not already exist
+ eventId := strings.ToLower(event.Name + "_" + con.Address)
+ if err := tr.AddCheckColumn(eventId); err != nil {
+ return err
+ }
+
+ // Find unchecked headers for this event
missingHeaders, err := tr.MissingHeaders(con.StartingBlock, con.LastBlock, eventId)
if err != nil {
return err
}
+ // Iterate over headers
for _, header := range missingHeaders {
+ // And fetch event logs using the header, contract address, and topics filter
logs, err := tr.FetchLogs([]string{con.Address}, topics, header)
if err != nil {
return err
}
+ // Mark the header checked for this eventID and continue to next iteration if no logs are found
if len(logs) < 1 {
err = tr.MarkHeaderChecked(header.Id, eventId)
if err != nil {
return err
}
-
continue
}
- for _, l := range logs {
- mapping, err := tr.Convert(l, event)
- if err != nil {
- return err
- }
- if mapping == nil {
- break
- }
+ // Convert logs into custom type
+ convertedLogs, err := tr.Convert(logs, event, header.Id)
+ if err != nil {
+ return err
+ }
+ if len(convertedLogs) < 1 {
+ continue
+ }
- err = tr.PersistLog(*mapping, con.Address, con.Name)
- if err != nil {
- return err
- }
+ // If logs aren't empty, persist them
+ err = tr.PersistLogs(convertedLogs, event, con.Address, con.Name)
+ if err != nil {
+ return err
}
}
}
+ // After persisting all watched event logs
+ // poller polls select contract methods
+ // and persists the results into custom pg tables
+ if err := tr.PollContract(*con); err != nil {
+ return err
+ }
}
return nil
diff --git a/pkg/omni/light/transformer/transformer_test.go b/pkg/omni/light/transformer/transformer_test.go
index 5f465b86..31a6574e 100644
--- a/pkg/omni/light/transformer/transformer_test.go
+++ b/pkg/omni/light/transformer/transformer_test.go
@@ -17,9 +17,7 @@
package transformer_test
import (
- "math/rand"
- "time"
-
+ "fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -29,6 +27,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/light/transformer"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Transformer", func() {
@@ -36,7 +35,7 @@ var _ = Describe("Transformer", func() {
var err error
var blockChain core.BlockChain
var headerRepository repositories.HeaderRepository
- rand.Seed(time.Now().UnixNano())
+ var headerID int64
BeforeEach(func() {
db, blockChain = test_helpers.SetupDBandBC()
@@ -94,8 +93,8 @@ var _ = Describe("Transformer", func() {
Describe("Init", func() {
It("Initializes transformer's contract objects", func() {
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1)
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
err = t.Init()
@@ -111,7 +110,7 @@ var _ = Describe("Transformer", func() {
Expect(c.Address).To(Equal(constants.TusdContractAddress))
})
- It("Fails to initialize if first and most recent blocks cannot be fetched from vDB", func() {
+ It("Fails to initialize if first and most recent block numbers cannot be fetched from vDB headers table", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
err = t.Init()
@@ -119,8 +118,8 @@ var _ = Describe("Transformer", func() {
})
It("Does nothing if watched events are unset", func() {
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1)
- headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
t := transformer.NewTransformer("", blockChain, db)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
@@ -131,6 +130,96 @@ var _ = Describe("Transformer", func() {
})
Describe("Execute", func() {
+ BeforeEach(func() {
+ header1, err := blockChain.GetHeaderByNumber(6791668)
+ Expect(err).ToNot(HaveOccurred())
+ header2, err := blockChain.GetHeaderByNumber(6791669)
+ Expect(err).ToNot(HaveOccurred())
+ header3, err := blockChain.GetHeaderByNumber(6791670)
+ Expect(err).ToNot(HaveOccurred())
+ headerRepository.CreateOrUpdateHeader(header1)
+ headerID, err = headerRepository.CreateOrUpdateHeader(header2)
+ Expect(err).ToNot(HaveOccurred())
+ headerRepository.CreateOrUpdateHeader(header3)
+ })
+ It("Transforms watched contract data into custom repositories", func() {
+ t := transformer.NewTransformer("", blockChain, db)
+ t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
+ t.SetMethods(constants.TusdContractAddress, nil)
+
+ err = t.Init()
+ Expect(err).ToNot(HaveOccurred())
+
+ err = t.Execute()
+ Expect(err).ToNot(HaveOccurred())
+
+ log := test_helpers.LightTransferLog{}
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", constants.TusdContractAddress)).StructScan(&log)
+
+ // We don't know vulcID, so compare individual fields instead of complete structures
+ Expect(log.HeaderID).To(Equal(headerID))
+ Expect(log.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3"))
+ Expect(log.To).To(Equal("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0"))
+ Expect(log.Value).To(Equal("9998940000000000000000"))
+ })
+
+ It("Keeps track of contract-related addresses while transforming event data", func() {
+ t := transformer.NewTransformer("", blockChain, db)
+ t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
+ t.SetMethods(constants.TusdContractAddress, nil)
+ err = t.Init()
+ Expect(err).ToNot(HaveOccurred())
+
+ c, ok := t.Contracts[constants.TusdContractAddress]
+ Expect(ok).To(Equal(true))
+
+ err = t.Execute()
+ Expect(err).ToNot(HaveOccurred())
+
+ b, ok := c.TknHolderAddrs["0x1062a747393198f70F71ec65A582423Dba7E5Ab3"]
+ Expect(ok).To(Equal(true))
+ Expect(b).To(Equal(true))
+
+ b, ok = c.TknHolderAddrs["0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0"]
+ Expect(ok).To(Equal(true))
+ Expect(b).To(Equal(true))
+
+ _, ok = c.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843b1234567890"]
+ Expect(ok).To(Equal(false))
+
+ _, ok = c.TknHolderAddrs["0x"]
+ Expect(ok).To(Equal(false))
+ })
+
+ It("Polls given methods using generated token holder address", func() {
+ t := transformer.NewTransformer("", blockChain, db)
+ t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
+ t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"})
+ err = t.Init()
+ Expect(err).ToNot(HaveOccurred())
+
+ err = t.Execute()
+ Expect(err).ToNot(HaveOccurred())
+
+ res := test_helpers.BalanceOf{}
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(res.Balance).To(Equal("55849938025000000000000"))
+ Expect(res.TokenName).To(Equal("TrueUSD"))
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res)
+ Expect(err).To(HaveOccurred())
+ })
+
+ It("Fails if initialization has not been done", func() {
+ t := transformer.NewTransformer("", blockChain, db)
+ t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
+ t.SetMethods(constants.TusdContractAddress, nil)
+
+ err = t.Execute()
+ Expect(err).To(HaveOccurred())
+ })
})
})
diff --git a/pkg/omni/shared/contract/contract_test.go b/pkg/omni/shared/contract/contract_test.go
index dc7a5ff6..5d9da792 100644
--- a/pkg/omni/shared/contract/contract_test.go
+++ b/pkg/omni/shared/contract/contract_test.go
@@ -22,6 +22,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Contract", func() {
@@ -38,11 +39,11 @@ var _ = Describe("Contract", func() {
val, ok := info.Filters["Transfer"]
Expect(ok).To(Equal(true))
- Expect(val).To(Equal(test_helpers.ExpectedTransferFilter))
+ Expect(val).To(Equal(mocks.ExpectedTransferFilter))
val, ok = info.Filters["Approval"]
Expect(ok).To(Equal(true))
- Expect(val).To(Equal(test_helpers.ExpectedApprovalFilter))
+ Expect(val).To(Equal(mocks.ExpectedApprovalFilter))
val, ok = info.Filters["Mint"]
Expect(ok).To(Equal(false))
diff --git a/pkg/omni/shared/helpers/test_helpers/database.go b/pkg/omni/shared/helpers/test_helpers/database.go
index 581b8aa9..65a28034 100644
--- a/pkg/omni/shared/helpers/test_helpers/database.go
+++ b/pkg/omni/shared/helpers/test_helpers/database.go
@@ -33,7 +33,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
type TransferLog struct {
@@ -47,6 +47,18 @@ type TransferLog struct {
Value string `db:"value_"`
}
+type LightTransferLog struct {
+ Id int64 `db:"id"`
+ HeaderID int64 `db:"header_id"`
+ TokenName string `db:"token_name"`
+ LogIndex int64 `db:"log_idx"`
+ TxIndex int64 `db:"tx_idx"`
+ From string `db:"from_"`
+ To string `db:"to_"`
+ Value string `db:"value_"`
+ RawLog []byte `db:"raw_log"`
+}
+
type BalanceOf struct {
Id int64 `db:"id"`
TokenName string `db:"token_name"`
@@ -119,8 +131,8 @@ func SetupTusdRepo(vulcanizeLogId *int64, wantedEvents, wantedMethods []string)
}
func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract {
- p := parser.NewParser("")
- err := p.Parse(constants.TusdContractAddress)
+ p := mocks.NewParser(constants.TusdAbiString)
+ err := p.Parse()
Expect(err).ToNot(HaveOccurred())
return &contract.Contract{
@@ -139,25 +151,40 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract
}
func TearDown(db *postgres.DB) {
- _, err := db.Query(`DELETE FROM blocks`)
+ tx, err := db.Begin()
Expect(err).NotTo(HaveOccurred())
- _, err = db.Query(`DELETE FROM headers`)
+ _, err = tx.Exec(`DELETE FROM blocks`)
Expect(err).NotTo(HaveOccurred())
- _, err = db.Query(`DELETE FROM checked_headers`)
+ _, err = tx.Exec(`DELETE FROM headers`)
Expect(err).NotTo(HaveOccurred())
- _, err = db.Query(`DELETE FROM logs`)
+ _, err = tx.Exec(`DELETE FROM checked_headers`)
Expect(err).NotTo(HaveOccurred())
- _, err = db.Query(`DELETE FROM transactions`)
+ _, err = tx.Exec(`DELETE FROM logs`)
Expect(err).NotTo(HaveOccurred())
- _, err = db.Query(`DELETE FROM receipts`)
+ _, err = tx.Exec(`DELETE FROM transactions`)
Expect(err).NotTo(HaveOccurred())
- _, err = db.Query(`DROP SCHEMA IF EXISTS c0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`)
+ _, err = tx.Exec(`DELETE FROM receipts`)
+ Expect(err).NotTo(HaveOccurred())
+
+ _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr`)
+ Expect(err).NotTo(HaveOccurred())
+
+ _, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`)
+ Expect(err).NotTo(HaveOccurred())
+
+ _, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`)
+ Expect(err).NotTo(HaveOccurred())
+
+ _, err = tx.Exec(`DROP SCHEMA IF EXISTS light_0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`)
+ Expect(err).NotTo(HaveOccurred())
+
+ err = tx.Commit()
Expect(err).NotTo(HaveOccurred())
}
diff --git a/pkg/omni/shared/helpers/test_helpers/entities.go b/pkg/omni/shared/helpers/test_helpers/mocks/entities.go
similarity index 65%
rename from pkg/omni/shared/helpers/test_helpers/entities.go
rename to pkg/omni/shared/helpers/test_helpers/mocks/entities.go
index 5c3c2eb7..be092771 100644
--- a/pkg/omni/shared/helpers/test_helpers/entities.go
+++ b/pkg/omni/shared/helpers/test_helpers/mocks/entities.go
@@ -14,11 +14,15 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package test_helpers
+package mocks
import (
"encoding/json"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/core/types"
+
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
@@ -126,3 +130,44 @@ var MockHeader3 = core.Header{
Raw: rawFakeHeader,
Timestamp: "50000030",
}
+
+var MockTransferLog1 = types.Log{
+ Index: 1,
+ Address: common.HexToAddress(constants.TusdContractAddress),
+ BlockNumber: 5488076,
+ TxIndex: 110,
+ TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae"),
+ Topics: []common.Hash{
+ common.HexToHash(constants.TransferEvent.Signature()),
+ common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"),
+ common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391"),
+ },
+ Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"),
+}
+
+var MockTransferLog2 = types.Log{
+ Index: 3,
+ Address: common.HexToAddress(constants.TusdContractAddress),
+ BlockNumber: 5488077,
+ TxIndex: 2,
+ TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae"),
+ Topics: []common.Hash{
+ common.HexToHash(constants.TransferEvent.Signature()),
+ common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391"),
+ common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"),
+ },
+ Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"),
+}
+
+var MockMintLog = types.Log{
+ Index: 10,
+ Address: common.HexToAddress(constants.TusdContractAddress),
+ BlockNumber: 548808,
+ TxIndex: 50,
+ TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6minty"),
+ Topics: []common.Hash{
+ common.HexToHash(constants.MintEvent.Signature()),
+ common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"),
+ },
+ Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"),
+}
diff --git a/pkg/omni/shared/helpers/mocks/parser.go b/pkg/omni/shared/helpers/test_helpers/mocks/parser.go
similarity index 100%
rename from pkg/omni/shared/helpers/mocks/parser.go
rename to pkg/omni/shared/helpers/test_helpers/mocks/parser.go
diff --git a/pkg/omni/shared/parser/parser_test.go b/pkg/omni/shared/parser/parser_test.go
index 8d74593b..7631294d 100644
--- a/pkg/omni/shared/parser/parser_test.go
+++ b/pkg/omni/shared/parser/parser_test.go
@@ -23,7 +23,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
)
diff --git a/pkg/omni/shared/poller/poller.go b/pkg/omni/shared/poller/poller.go
index 52e0425b..bb8239e9 100644
--- a/pkg/omni/shared/poller/poller.go
+++ b/pkg/omni/shared/poller/poller.go
@@ -42,10 +42,10 @@ type poller struct {
contract contract.Contract
}
-func NewPoller(blockChain core.BlockChain, db *postgres.DB) *poller {
+func NewPoller(blockChain core.BlockChain, db *postgres.DB, mode types.Mode) *poller {
return &poller{
- MethodRepository: repository.NewMethodRepository(db),
+ MethodRepository: repository.NewMethodRepository(db, mode),
bc: blockChain,
}
}
diff --git a/pkg/omni/shared/poller/poller_test.go b/pkg/omni/shared/poller/poller_test.go
index a0ba7349..a7357a38 100644
--- a/pkg/omni/shared/poller/poller_test.go
+++ b/pkg/omni/shared/poller/poller_test.go
@@ -28,6 +28,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
var _ = Describe("Poller", func() {
@@ -37,78 +38,80 @@ var _ = Describe("Poller", func() {
var db *postgres.DB
var bc core.BlockChain
- BeforeEach(func() {
- db, bc = test_helpers.SetupDBandBC()
- p = poller.NewPoller(bc, db)
- })
-
AfterEach(func() {
test_helpers.TearDown(db)
})
- Describe("PollContract", func() {
- It("Polls specified contract methods using contract's token holder address list", func() {
- con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"})
- Expect(con.Abi).To(Equal(constants.TusdAbiString))
- con.StartingBlock = 6707322
- con.LastBlock = 6707323
- con.TknHolderAddrs = map[string]bool{
- "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true,
- "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true,
- }
-
- err := p.PollContract(*con)
- Expect(err).ToNot(HaveOccurred())
-
- scanStruct := test_helpers.BalanceOf{}
-
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
- Expect(err).ToNot(HaveOccurred())
- Expect(scanStruct.Balance).To(Equal("66386309548896882859581786"))
- Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
-
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct)
- Expect(err).ToNot(HaveOccurred())
- Expect(scanStruct.Balance).To(Equal("66386309548896882859581786"))
- Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
-
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
- Expect(err).ToNot(HaveOccurred())
- Expect(scanStruct.Balance).To(Equal("17982350181394112023885864"))
- Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
-
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct)
- Expect(err).ToNot(HaveOccurred())
- Expect(scanStruct.Balance).To(Equal("17982350181394112023885864"))
- Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
+ Describe("Full sync mode", func() {
+ BeforeEach(func() {
+ db, bc = test_helpers.SetupDBandBC()
+ p = poller.NewPoller(bc, db, types.FullSync)
})
- It("Does not poll and persist any methods if none are specified", func() {
- con = test_helpers.SetupTusdContract(nil, nil)
- Expect(con.Abi).To(Equal(constants.TusdAbiString))
- con.StartingBlock = 6707322
- con.LastBlock = 6707323
- con.TknHolderAddrs = map[string]bool{
- "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true,
- "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true,
- }
+ Describe("PollContract", func() {
+ It("Polls specified contract methods using contract's token holder address list", func() {
+ con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"})
+ Expect(con.Abi).To(Equal(constants.TusdAbiString))
+ con.StartingBlock = 6707322
+ con.LastBlock = 6707323
+ con.TknHolderAddrs = map[string]bool{
+ "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true,
+ "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true,
+ }
- err := p.PollContract(*con)
- Expect(err).ToNot(HaveOccurred())
+ err := p.PollContract(*con)
+ Expect(err).ToNot(HaveOccurred())
- scanStruct := test_helpers.BalanceOf{}
+ scanStruct := test_helpers.BalanceOf{}
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
- Expect(err).To(HaveOccurred())
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(scanStruct.Balance).To(Equal("66386309548896882859581786"))
+ Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(scanStruct.Balance).To(Equal("66386309548896882859581786"))
+ Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(scanStruct.Balance).To(Equal("17982350181394112023885864"))
+ Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(scanStruct.Balance).To(Equal("17982350181394112023885864"))
+ Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
+ })
+
+ It("Does not poll and persist any methods if none are specified", func() {
+ con = test_helpers.SetupTusdContract(nil, nil)
+ Expect(con.Abi).To(Equal(constants.TusdAbiString))
+ con.StartingBlock = 6707322
+ con.LastBlock = 6707323
+ con.TknHolderAddrs = map[string]bool{
+ "0xfE9e8709d3215310075d67E3ed32A380CCf451C8": true,
+ "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE": true,
+ }
+
+ err := p.PollContract(*con)
+ Expect(err).ToNot(HaveOccurred())
+
+ scanStruct := test_helpers.BalanceOf{}
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
+ Expect(err).To(HaveOccurred())
+ })
})
- })
- Describe("PollMethod", func() {
- It("Polls a single contract method", func() {
- var name = new(string)
- err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514)
- Expect(err).ToNot(HaveOccurred())
- Expect(*name).To(Equal("TrueUSD"))
+ Describe("PollMethod", func() {
+ It("Polls a single contract method", func() {
+ var name = new(string)
+ err := p.FetchContractData(constants.TusdAbiString, constants.TusdContractAddress, "name", nil, &name, 6197514)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(*name).To(Equal("TrueUSD"))
+ })
})
})
})
diff --git a/pkg/omni/shared/repository/event_repository.go b/pkg/omni/shared/repository/event_repository.go
new file mode 100644
index 00000000..b733bcb9
--- /dev/null
+++ b/pkg/omni/shared/repository/event_repository.go
@@ -0,0 +1,316 @@
+// VulcanizeDB
+// Copyright © 2018 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package repository
+
+import (
+ "errors"
+ "fmt"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository"
+ "strings"
+
+ "github.com/hashicorp/golang-lru"
+
+ "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
+)
+
+const (
+ // Number of contract address and method ids to keep in cache
+ contractCacheSize = 100
+ eventChacheSize = 1000
+)
+
+// Event repository is used to persist event data into custom tables
+type EventRepository interface {
+ PersistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error
+ CreateEventTable(contractAddr string, event types.Event) (bool, error)
+ CreateContractSchema(contractName string) (bool, error)
+ CheckSchemaCache(key string) (interface{}, bool)
+ CheckTableCache(key string) (interface{}, bool)
+}
+
+type eventRepository struct {
+ db *postgres.DB
+ mode types.Mode
+ schemas *lru.Cache // Cache names of recently used schemas to minimize db connections
+ tables *lru.Cache // Cache names of recently used tables to minimize db connections
+}
+
+func NewEventRepository(db *postgres.DB, mode types.Mode) *eventRepository {
+ ccs, _ := lru.New(contractCacheSize)
+ ecs, _ := lru.New(eventChacheSize)
+ return &eventRepository{
+ db: db,
+ mode: mode,
+ schemas: ccs,
+ tables: ecs,
+ }
+}
+
+// Creates a schema for the contract if needed
+// Creates table for the watched contract event if needed
+// Persists converted event log data into this custom table
+func (r *eventRepository) PersistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
+ if logs == nil {
+ return errors.New("event repository error: passed a nil log slice")
+ }
+
+ if len(logs) == 0 {
+ return errors.New("event repository error: passed an empty log slice")
+ }
+ _, err := r.CreateContractSchema(contractAddr)
+ if err != nil {
+ return err
+ }
+
+ _, err = r.CreateEventTable(contractAddr, eventInfo)
+ if err != nil {
+ return err
+ }
+
+ return r.persistLogs(logs, eventInfo, contractAddr, contractName)
+}
+
+func (r *eventRepository) persistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
+ var err error
+ switch r.mode {
+ case types.LightSync:
+ err = r.persistLightSyncLogs(logs, eventInfo, contractAddr, contractName)
+ case types.FullSync:
+ err = r.persistFullSyncLogs(logs, eventInfo, contractAddr, contractName)
+ default:
+ return errors.New("event repository error: unhandled mode")
+ }
+
+ return err
+}
+
+// Creates a custom postgres command to persist logs for the given event (compatible with light synced vDB)
+func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
+ tx, err := r.db.Begin()
+ if err != nil {
+ return err
+ }
+
+ for _, event := range logs {
+ // Begin pg query string
+ pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_event ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventInfo.Name))
+ pgStr = pgStr + "(header_id, token_name, raw_log, log_idx, tx_idx"
+ el := len(event.Values)
+
+ // Pack the corresponding variables in a slice
+ data := make([]interface{}, 0, 5+el)
+ data = append(data,
+ event.Id,
+ contractName,
+ event.Raw,
+ event.LogIndex,
+ event.TransactionIndex)
+
+ // Iterate over inputs and append name to query string and value to input data
+ for inputName, input := range event.Values {
+ pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words
+ data = append(data, input)
+ }
+
+ // For each input entry we created we add its postgres command variable to the string
+ pgStr = pgStr + ") VALUES ($1, $2, $3, $4, $5"
+ for i := 0; i < el; i++ {
+ pgStr = pgStr + fmt.Sprintf(", $%d", i+6)
+ }
+ pgStr = pgStr + ")"
+
+ _, err = tx.Exec(pgStr, data...)
+ if err != nil {
+ tx.Rollback()
+ return err
+ }
+ }
+
+ eventId := strings.ToLower(eventInfo.Name + "_" + contractAddr)
+ err = repository.MarkHeaderCheckedInTransaction(logs[0].Id, tx, eventId)
+ if err != nil {
+ tx.Rollback()
+ return err
+ }
+
+ return tx.Commit()
+}
+
+// Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB)
+func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
+ tx, err := r.db.Begin()
+ if err != nil {
+ return err
+ }
+
+ for _, event := range logs {
+ pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_event ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventInfo.Name))
+ pgStr = pgStr + "(vulcanize_log_id, token_name, block, tx"
+ el := len(event.Values)
+
+ data := make([]interface{}, 0, 4+el)
+ data = append(data,
+ event.Id,
+ contractName,
+ event.Block,
+ event.Tx)
+
+ for inputName, input := range event.Values {
+ pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName))
+ data = append(data, input)
+ }
+
+ pgStr = pgStr + ") VALUES ($1, $2, $3, $4"
+ for i := 0; i < el; i++ {
+ pgStr = pgStr + fmt.Sprintf(", $%d", i+5)
+ }
+ pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING"
+
+ _, err = tx.Exec(pgStr, data...)
+ if err != nil {
+ tx.Rollback()
+ return err
+ }
+ }
+
+ return tx.Commit()
+}
+
+// Checks for event table and creates it if it does not already exist
+// Returns true if it created a new table; returns false if table already existed
+func (r *eventRepository) CreateEventTable(contractAddr string, event types.Event) (bool, error) {
+ tableID := fmt.Sprintf("%s_%s.%s_event", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(event.Name))
+
+ // Check cache before querying pq to see if table exists
+ _, ok := r.tables.Get(tableID)
+ if ok {
+ return false, nil
+ }
+ tableExists, err := r.checkForTable(contractAddr, event.Name)
+ if err != nil {
+ return false, err
+ }
+
+ if !tableExists {
+ err = r.newEventTable(tableID, event)
+ if err != nil {
+ return false, err
+ }
+ }
+
+ // Add table id to cache
+ r.tables.Add(tableID, true)
+
+ return !tableExists, nil
+}
+
+// Creates a table for the given contract and event
+func (r *eventRepository) newEventTable(tableID string, event types.Event) error {
+ // Begin pg string
+ var pgStr = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s ", tableID)
+ var err error
+
+ // Handle different modes
+ switch r.mode {
+ case types.FullSync:
+ pgStr = pgStr + "(id SERIAL, vulcanize_log_id INTEGER NOT NULL UNIQUE, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL, tx CHARACTER VARYING(66) NOT NULL,"
+
+ // Iterate over event fields, using their name and pgType to grow the string
+ for _, field := range event.Fields {
+ pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
+ }
+ pgStr = pgStr + " CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES logs (id) ON DELETE CASCADE)"
+ case types.LightSync:
+ pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL,"
+
+ for _, field := range event.Fields {
+ pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
+ }
+ pgStr = pgStr + " UNIQUE (header_id, tx_idx, log_idx))"
+ default:
+ return errors.New("unhandled repository mode")
+ }
+
+ _, err = r.db.Exec(pgStr)
+
+ return err
+}
+
+// Checks if a table already exists for the given contract and event
+func (r *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) {
+ pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '%s_%s' AND table_name = '%s_event')", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventName))
+
+ var exists bool
+ err := r.db.Get(&exists, pgStr)
+
+ return exists, err
+}
+
+// Checks for contract schema and creates it if it does not already exist
+// Returns true if it created a new schema; returns false if schema already existed
+func (r *eventRepository) CreateContractSchema(contractAddr string) (bool, error) {
+ if contractAddr == "" {
+ return false, errors.New("error: no contract address specified")
+ }
+
+ // Check cache before querying pq to see if schema exists
+ _, ok := r.schemas.Get(contractAddr)
+ if ok {
+ return false, nil
+ }
+ schemaExists, err := r.checkForSchema(contractAddr)
+ if err != nil {
+ return false, err
+ }
+ if !schemaExists {
+ err = r.newContractSchema(contractAddr)
+ if err != nil {
+ return false, err
+ }
+ }
+
+ // Add schema name to cache
+ r.schemas.Add(contractAddr, true)
+
+ return !schemaExists, nil
+}
+
+// Creates a schema for the given contract
+func (r *eventRepository) newContractSchema(contractAddr string) error {
+ _, err := r.db.Exec("CREATE SCHEMA IF NOT EXISTS " + r.mode.String() + "_" + strings.ToLower(contractAddr))
+
+ return err
+}
+
+// Checks if a schema already exists for the given contract
+func (r *eventRepository) checkForSchema(contractAddr string) (bool, error) {
+ pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s_%s')", r.mode.String(), strings.ToLower(contractAddr))
+
+ var exists bool
+ err := r.db.QueryRow(pgStr).Scan(&exists)
+
+ return exists, err
+}
+
+func (r *eventRepository) CheckSchemaCache(key string) (interface{}, bool) {
+ return r.schemas.Get(key)
+}
+
+func (r *eventRepository) CheckTableCache(key string) (interface{}, bool) {
+ return r.tables.Get(key)
+}
diff --git a/pkg/omni/shared/repository/event_repository_test.go b/pkg/omni/shared/repository/event_repository_test.go
new file mode 100644
index 00000000..6c90711d
--- /dev/null
+++ b/pkg/omni/shared/repository/event_repository_test.go
@@ -0,0 +1,349 @@
+// VulcanizeDB
+// Copyright © 2018 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package repository_test
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ geth "github.com/ethereum/go-ethereum/core/types"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
+ "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
+ fc "github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
+ lc "github.com/vulcanize/vulcanizedb/pkg/omni/light/converter"
+ lr "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
+ sr "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
+)
+
+var _ = Describe("Repository", func() {
+ var db *postgres.DB
+ var dataStore sr.EventRepository
+ var err error
+ var log *types.Log
+ var logs []types.Log
+ var con *contract.Contract
+ var vulcanizeLogId int64
+ var wantedEvents = []string{"Transfer"}
+ var event types.Event
+ var headerID int64
+ var mockEvent = mocks.MockTranferEvent
+ var mockLog1 = mocks.MockTransferLog1
+ var mockLog2 = mocks.MockTransferLog2
+
+ BeforeEach(func() {
+ db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, []string{})
+ mockEvent.LogID = vulcanizeLogId
+
+ event = con.Events["Transfer"]
+ err = con.GenerateFilters()
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ test_helpers.TearDown(db)
+ })
+
+ Describe("Full sync mode", func() {
+ BeforeEach(func() {
+ dataStore = sr.NewEventRepository(db, types.FullSync)
+ })
+
+ Describe("CreateContractSchema", func() {
+ It("Creates schema if it doesn't exist", func() {
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(false))
+ })
+
+ It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ _, ok := dataStore.CheckSchemaCache(con.Address)
+ Expect(ok).To(Equal(false))
+
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ v, ok := dataStore.CheckSchemaCache(con.Address)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+ })
+
+ Describe("CreateEventTable", func() {
+ It("Creates table if it doesn't exist", func() {
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateEventTable(con.Address, event)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateEventTable(con.Address, event)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(false))
+ })
+
+ It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ tableID := fmt.Sprintf("%s_%s.%s_event", types.FullSync, strings.ToLower(con.Address), strings.ToLower(event.Name))
+ _, ok := dataStore.CheckTableCache(tableID)
+ Expect(ok).To(Equal(false))
+
+ created, err = dataStore.CreateEventTable(con.Address, event)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ v, ok := dataStore.CheckTableCache(tableID)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+ })
+
+ Describe("PersistLogs", func() {
+ BeforeEach(func() {
+ c := fc.NewConverter(con)
+ log, err = c.Convert(mockEvent, event)
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ It("Persists contract event log values into custom tables, adding any addresses to a growing list of contract associated addresses", func() {
+ err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name)
+ Expect(err).ToNot(HaveOccurred())
+
+ b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"]
+ Expect(ok).To(Equal(true))
+ Expect(b).To(Equal(true))
+
+ b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"]
+ Expect(ok).To(Equal(true))
+ Expect(b).To(Equal(true))
+
+ scanLog := test_helpers.TransferLog{}
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog)
+ Expect(err).ToNot(HaveOccurred())
+ expectedLog := test_helpers.TransferLog{
+ Id: 1,
+ VulvanizeLogId: vulcanizeLogId,
+ TokenName: "TrueUSD",
+ Block: 5488076,
+ Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
+ From: "0x000000000000000000000000000000000000Af21",
+ To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391",
+ Value: "1097077688018008265106216665536940668749033598146",
+ }
+ Expect(scanLog).To(Equal(expectedLog))
+ })
+
+ It("Doesn't persist duplicate event logs", func() {
+ // Try to persist the same log twice in a single call
+ err = dataStore.PersistLogs([]types.Log{*log, *log}, event, con.Address, con.Name)
+ Expect(err).ToNot(HaveOccurred())
+
+ scanLog := test_helpers.TransferLog{}
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog)
+ Expect(err).ToNot(HaveOccurred())
+ expectedLog := test_helpers.TransferLog{
+ Id: 1,
+ VulvanizeLogId: vulcanizeLogId,
+ TokenName: "TrueUSD",
+ Block: 5488076,
+ Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
+ From: "0x000000000000000000000000000000000000Af21",
+ To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391",
+ Value: "1097077688018008265106216665536940668749033598146",
+ }
+ Expect(scanLog).To(Equal(expectedLog))
+
+ // Attempt to persist the same log again in seperate call
+ err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name)
+ Expect(err).ToNot(HaveOccurred())
+
+ // Show that no new logs were entered
+ var count int
+ err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM full_%s.transfer_event", constants.TusdContractAddress))
+ Expect(err).ToNot(HaveOccurred())
+ Expect(count).To(Equal(1))
+ })
+
+ It("Fails with empty log", func() {
+ err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name)
+ Expect(err).To(HaveOccurred())
+ })
+ })
+ })
+
+ Describe("Light sync mode", func() {
+ BeforeEach(func() {
+ dataStore = sr.NewEventRepository(db, types.LightSync)
+ })
+
+ Describe("CreateContractSchema", func() {
+ It("Creates schema if it doesn't exist", func() {
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(false))
+ })
+
+ It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ _, ok := dataStore.CheckSchemaCache(con.Address)
+ Expect(ok).To(Equal(false))
+
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ v, ok := dataStore.CheckSchemaCache(con.Address)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+
+ It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ tableID := fmt.Sprintf("%s_%s.%s_event", types.LightSync, strings.ToLower(con.Address), strings.ToLower(event.Name))
+ _, ok := dataStore.CheckTableCache(tableID)
+ Expect(ok).To(Equal(false))
+
+ created, err = dataStore.CreateEventTable(con.Address, event)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ v, ok := dataStore.CheckTableCache(tableID)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+ })
+
+ Describe("CreateEventTable", func() {
+ It("Creates table if it doesn't exist", func() {
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateEventTable(con.Address, event)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateEventTable(con.Address, event)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(false))
+ })
+ })
+
+ Describe("PersistLogs", func() {
+ BeforeEach(func() {
+ headerRepository := repositories.NewHeaderRepository(db)
+ headerID, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
+ Expect(err).ToNot(HaveOccurred())
+ c := lc.NewConverter(con)
+ logs, err = c.Convert([]geth.Log{mockLog1, mockLog2}, event, headerID)
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ It("Persists contract event log values into custom tables", func() {
+ hr := lr.NewHeaderRepository(db)
+ err = hr.AddCheckColumn(event.Name + "_" + con.Address)
+ Expect(err).ToNot(HaveOccurred())
+
+ err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
+ Expect(err).ToNot(HaveOccurred())
+
+ var count int
+ err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM light_%s.transfer_event", constants.TusdContractAddress))
+ Expect(err).ToNot(HaveOccurred())
+ Expect(count).To(Equal(2))
+
+ scanLog := test_helpers.LightTransferLog{}
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event LIMIT 1", constants.TusdContractAddress)).StructScan(&scanLog)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(scanLog.HeaderID).To(Equal(headerID))
+ Expect(scanLog.TokenName).To(Equal("TrueUSD"))
+ Expect(scanLog.TxIndex).To(Equal(int64(110)))
+ Expect(scanLog.LogIndex).To(Equal(int64(1)))
+ Expect(scanLog.From).To(Equal("0x000000000000000000000000000000000000Af21"))
+ Expect(scanLog.To).To(Equal("0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"))
+ Expect(scanLog.Value).To(Equal("1097077688018008265106216665536940668749033598146"))
+
+ var expectedRawLog, rawLog geth.Log
+ err = json.Unmarshal(logs[0].Raw, &expectedRawLog)
+ Expect(err).ToNot(HaveOccurred())
+ err = json.Unmarshal(scanLog.RawLog, &rawLog)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(rawLog).To(Equal(expectedRawLog))
+ })
+
+ It("Doesn't persist duplicate event logs", func() {
+ hr := lr.NewHeaderRepository(db)
+ err = hr.AddCheckColumn(event.Name + "_" + con.Address)
+ Expect(err).ToNot(HaveOccurred())
+
+ // Try and fail to persist the same log twice in a single call
+ err = dataStore.PersistLogs([]types.Log{logs[0], logs[0]}, event, con.Address, con.Name)
+ Expect(err).To(HaveOccurred())
+
+ // Successfuly persist the two unique logs
+ err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
+ Expect(err).ToNot(HaveOccurred())
+
+ // Try and fail to persist the same logs again in separate call
+ err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name)
+ Expect(err).To(HaveOccurred())
+
+ // Show that no new logs were entered
+ var count int
+ err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM light_%s.transfer_event", constants.TusdContractAddress))
+ Expect(err).ToNot(HaveOccurred())
+ Expect(count).To(Equal(2))
+ })
+
+ It("Fails if the persisted event does not have a corresponding eventID column in the checked_headers table", func() {
+ err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
+ Expect(err).To(HaveOccurred())
+ })
+
+ It("Fails with empty log", func() {
+ err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name)
+ Expect(err).To(HaveOccurred())
+ })
+ })
+ })
+})
diff --git a/pkg/omni/shared/repository/method_repository.go b/pkg/omni/shared/repository/method_repository.go
index ede488d8..101ffaa2 100644
--- a/pkg/omni/shared/repository/method_repository.go
+++ b/pkg/omni/shared/repository/method_repository.go
@@ -21,28 +21,41 @@ import (
"fmt"
"strings"
+ "github.com/hashicorp/golang-lru"
+
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
+const methodCacheSize = 1000
+
type MethodRepository interface {
PersistResult(method types.Result, contractAddr, contractName string) error
CreateMethodTable(contractAddr string, method types.Result) (bool, error)
CreateContractSchema(contractAddr string) (bool, error)
+ CheckSchemaCache(key string) (interface{}, bool)
+ CheckTableCache(key string) (interface{}, bool)
}
type methodRepository struct {
*postgres.DB
+ mode types.Mode
+ schemas *lru.Cache // Cache names of recently used schemas to minimize db connections
+ tables *lru.Cache // Cache names of recently used tables to minimize db connections
}
-func NewMethodRepository(db *postgres.DB) *methodRepository {
-
+func NewMethodRepository(db *postgres.DB, mode types.Mode) *methodRepository {
+ ccs, _ := lru.New(contractCacheSize)
+ mcs, _ := lru.New(methodCacheSize)
return &methodRepository{
- DB: db,
+ DB: db,
+ mode: mode,
+ schemas: ccs,
+ tables: mcs,
}
}
-func (d *methodRepository) PersistResult(method types.Result, contractAddr, contractName string) error {
+func (r *methodRepository) PersistResult(method types.Result, contractAddr, contractName string) error {
if len(method.Args) != len(method.Inputs) {
return errors.New("error: given number of inputs does not match number of method arguments")
}
@@ -50,51 +63,48 @@ func (d *methodRepository) PersistResult(method types.Result, contractAddr, cont
return errors.New("error: given number of outputs does not match number of method return values")
}
- _, err := d.CreateContractSchema(contractAddr)
+ _, err := r.CreateContractSchema(contractAddr)
if err != nil {
return err
}
- _, err = d.CreateMethodTable(contractAddr, method)
+ _, err = r.CreateMethodTable(contractAddr, method)
if err != nil {
return err
}
- return d.persistResult(method, contractAddr, contractName)
+ return r.persistResult(method, contractAddr, contractName)
}
// Creates a custom postgres command to persist logs for the given event
-func (d *methodRepository) persistResult(method types.Result, contractAddr, contractName string) error {
+func (r *methodRepository) persistResult(method types.Result, contractAddr, contractName string) error {
// Begin postgres string
- pgStr := fmt.Sprintf("INSERT INTO c%s.%s_method ", strings.ToLower(contractAddr), strings.ToLower(method.Name))
+ pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_method ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(method.Name))
pgStr = pgStr + "(token_name, block"
+ ml := len(method.Args)
- // Pack the corresponding variables in a slice
- var data []interface{}
+ // Preallocate slice of needed size and proceed to pack variables into it in same order they appear in string
+ data := make([]interface{}, 0, 3+ml)
data = append(data,
contractName,
method.Block)
// Iterate over method args and return value, adding names
// to the string and pushing values to the slice
- counter := 0 // Keep track of number of inputs
for i, arg := range method.Args {
- counter += 1
pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(arg.Name)) // Add underscore after to avoid any collisions with reserved pg words
data = append(data, method.Inputs[i])
}
-
- counter += 1
pgStr = pgStr + ", returned) VALUES ($1, $2"
data = append(data, method.Output)
// For each input entry we created we add its postgres command variable to the string
- for i := 0; i < counter; i++ {
+ for i := 0; i <= ml; i++ {
pgStr = pgStr + fmt.Sprintf(", $%d", i+3)
}
pgStr = pgStr + ")"
- _, err := d.DB.Exec(pgStr, data...)
+ _, err := r.DB.Exec(pgStr, data...)
if err != nil {
return err
}
@@ -103,26 +113,35 @@ func (d *methodRepository) persistResult(method types.Result, contractAddr, cont
}
// Checks for event table and creates it if it does not already exist
-func (d *methodRepository) CreateMethodTable(contractAddr string, method types.Result) (bool, error) {
- tableExists, err := d.checkForTable(contractAddr, method.Name)
+func (r *methodRepository) CreateMethodTable(contractAddr string, method types.Result) (bool, error) {
+ tableID := fmt.Sprintf("%s_%s.%s_method", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(method.Name))
+
+ // Check cache before querying pq to see if table exists
+ _, ok := r.tables.Get(tableID)
+ if ok {
+ return false, nil
+ }
+ tableExists, err := r.checkForTable(contractAddr, method.Name)
if err != nil {
return false, err
}
-
if !tableExists {
- err = d.newMethodTable(contractAddr, method)
+ err = r.newMethodTable(tableID, method)
if err != nil {
return false, err
}
}
+ // Add schema name to cache
+ r.tables.Add(tableID, true)
+
return !tableExists, nil
}
// Creates a table for the given contract and event
-func (d *methodRepository) newMethodTable(contractAddr string, method types.Result) error {
+func (r *methodRepository) newMethodTable(tableID string, method types.Result) error {
// Begin pg string
- pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s_method ", strings.ToLower(contractAddr), strings.ToLower(method.Name))
+ pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s ", tableID)
pgStr = pgStr + "(id SERIAL, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL,"
// Iterate over method inputs and outputs, using their name and pgType to grow the string
@@ -132,54 +151,69 @@ func (d *methodRepository) newMethodTable(contractAddr string, method types.Resu
pgStr = pgStr + fmt.Sprintf(" returned %s NOT NULL)", method.Return[0].PgType)
- _, err := d.DB.Exec(pgStr)
+ _, err := r.DB.Exec(pgStr)
return err
}
// Checks if a table already exists for the given contract and event
-func (d *methodRepository) checkForTable(contractAddr string, methodName string) (bool, error) {
- pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s_method')", strings.ToLower(contractAddr), strings.ToLower(methodName))
+func (r *methodRepository) checkForTable(contractAddr string, methodName string) (bool, error) {
+ pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '%s_%s' AND table_name = '%s_method')", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(methodName))
var exists bool
- err := d.DB.Get(&exists, pgStr)
+ err := r.DB.Get(&exists, pgStr)
return exists, err
}
// Checks for contract schema and creates it if it does not already exist
-func (d *methodRepository) CreateContractSchema(contractAddr string) (bool, error) {
+func (r *methodRepository) CreateContractSchema(contractAddr string) (bool, error) {
if contractAddr == "" {
return false, errors.New("error: no contract address specified")
}
- schemaExists, err := d.checkForSchema(contractAddr)
+ // Check cache before querying pq to see if schema exists
+ _, ok := r.schemas.Get(contractAddr)
+ if ok {
+ return false, nil
+ }
+ schemaExists, err := r.checkForSchema(contractAddr)
if err != nil {
return false, err
}
-
if !schemaExists {
- err = d.newContractSchema(contractAddr)
+ err = r.newContractSchema(contractAddr)
if err != nil {
return false, err
}
}
+ // Add schema name to cache
+ r.schemas.Add(contractAddr, true)
+
return !schemaExists, nil
}
// Creates a schema for the given contract
-func (d *methodRepository) newContractSchema(contractAddr string) error {
- _, err := d.DB.Exec("CREATE SCHEMA IF NOT EXISTS c" + strings.ToLower(contractAddr))
+func (r *methodRepository) newContractSchema(contractAddr string) error {
+ _, err := r.DB.Exec("CREATE SCHEMA IF NOT EXISTS " + r.mode.String() + "_" + strings.ToLower(contractAddr))
return err
}
// Checks if a schema already exists for the given contract
-func (d *methodRepository) checkForSchema(contractAddr string) (bool, error) {
- pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'c%s')", strings.ToLower(contractAddr))
+func (r *methodRepository) checkForSchema(contractAddr string) (bool, error) {
+ pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s_%s')", r.mode.String(), strings.ToLower(contractAddr))
var exists bool
- err := d.DB.Get(&exists, pgStr)
+ err := r.DB.Get(&exists, pgStr)
return exists, err
}
+
+func (r *methodRepository) CheckSchemaCache(key string) (interface{}, bool) {
+ return r.schemas.Get(key)
+}
+
+func (r *methodRepository) CheckTableCache(key string) (interface{}, bool) {
+ return r.tables.Get(key)
+}
diff --git a/pkg/omni/shared/repository/method_repository_test.go b/pkg/omni/shared/repository/method_repository_test.go
index 80c93e53..e8bc22bf 100644
--- a/pkg/omni/shared/repository/method_repository_test.go
+++ b/pkg/omni/shared/repository/method_repository_test.go
@@ -18,6 +18,7 @@ package repository_test
import (
"fmt"
+ "strings"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -36,10 +37,11 @@ var _ = Describe("Repository", func() {
var con *contract.Contract
var err error
var mockResult types.Result
+ var method types.Method
BeforeEach(func() {
con = test_helpers.SetupTusdContract([]string{}, []string{"balanceOf"})
- method := con.Methods["balanceOf"]
+ method = con.Methods["balanceOf"]
mockResult = types.Result{
Method: method,
PgType: method.Return[0].PgType,
@@ -50,62 +52,188 @@ var _ = Describe("Repository", func() {
mockResult.Inputs[0] = "0xfE9e8709d3215310075d67E3ed32A380CCf451C8"
mockResult.Output = "66386309548896882859581786"
db, _ = test_helpers.SetupDBandBC()
- dataStore = repository.NewMethodRepository(db)
+ dataStore = repository.NewMethodRepository(db, types.FullSync)
})
AfterEach(func() {
test_helpers.TearDown(db)
})
- Describe("CreateContractSchema", func() {
- It("Creates schema if it doesn't exist", func() {
- created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(true))
+ Describe("Full Sync Mode", func() {
+ BeforeEach(func() {
+ dataStore = repository.NewMethodRepository(db, types.FullSync)
+ })
- created, err = dataStore.CreateContractSchema(constants.TusdContractAddress)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(false))
+ Describe("CreateContractSchema", func() {
+ It("Creates schema if it doesn't exist", func() {
+ created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateContractSchema(constants.TusdContractAddress)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(false))
+ })
+
+ It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ _, ok := dataStore.CheckSchemaCache(con.Address)
+ Expect(ok).To(Equal(false))
+
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ v, ok := dataStore.CheckSchemaCache(con.Address)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+ })
+
+ Describe("CreateMethodTable", func() {
+ It("Creates table if it doesn't exist", func() {
+ created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(false))
+ })
+
+ It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ tableID := fmt.Sprintf("%s_%s.%s_method", types.FullSync, strings.ToLower(con.Address), strings.ToLower(method.Name))
+ _, ok := dataStore.CheckTableCache(tableID)
+ Expect(ok).To(Equal(false))
+
+ created, err = dataStore.CreateMethodTable(con.Address, mockResult)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ v, ok := dataStore.CheckTableCache(tableID)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+ })
+
+ Describe("PersistResult", func() {
+ It("Persists result from method polling in custom pg table", func() {
+ err = dataStore.PersistResult(mockResult, con.Address, con.Name)
+ Expect(err).ToNot(HaveOccurred())
+
+ scanStruct := test_helpers.BalanceOf{}
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct)
+ expectedLog := test_helpers.BalanceOf{
+ Id: 1,
+ TokenName: "TrueUSD",
+ Block: 6707323,
+ Address: "0xfE9e8709d3215310075d67E3ed32A380CCf451C8",
+ Balance: "66386309548896882859581786",
+ }
+ Expect(scanStruct).To(Equal(expectedLog))
+ })
+
+ It("Fails with empty result", func() {
+ err = dataStore.PersistResult(types.Result{}, con.Address, con.Name)
+ Expect(err).To(HaveOccurred())
+ })
})
})
- Describe("CreateMethodTable", func() {
- It("Creates table if it doesn't exist", func() {
- created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(true))
-
- created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(true))
-
- created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult)
- Expect(err).ToNot(HaveOccurred())
- Expect(created).To(Equal(false))
- })
- })
-
- Describe("PersistResult", func() {
- It("Persists result from method polling in custom pg table", func() {
- err = dataStore.PersistResult(mockResult, con.Address, con.Name)
- Expect(err).ToNot(HaveOccurred())
-
- scanStruct := test_helpers.BalanceOf{}
-
- err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct)
- expectedLog := test_helpers.BalanceOf{
- Id: 1,
- TokenName: "TrueUSD",
- Block: 6707323,
- Address: "0xfE9e8709d3215310075d67E3ed32A380CCf451C8",
- Balance: "66386309548896882859581786",
- }
- Expect(scanStruct).To(Equal(expectedLog))
+ Describe("Light Sync Mode", func() {
+ BeforeEach(func() {
+ dataStore = repository.NewMethodRepository(db, types.LightSync)
})
- It("Fails with empty result", func() {
- err = dataStore.PersistResult(types.Result{}, con.Address, con.Name)
- Expect(err).To(HaveOccurred())
+ Describe("CreateContractSchema", func() {
+ It("Creates schema if it doesn't exist", func() {
+ created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateContractSchema(constants.TusdContractAddress)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(false))
+ })
+
+ It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ _, ok := dataStore.CheckSchemaCache(con.Address)
+ Expect(ok).To(Equal(false))
+
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ v, ok := dataStore.CheckSchemaCache(con.Address)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+ })
+
+ Describe("CreateMethodTable", func() {
+ It("Creates table if it doesn't exist", func() {
+ created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(false))
+ })
+
+ It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
+ created, err := dataStore.CreateContractSchema(con.Address)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ tableID := fmt.Sprintf("%s_%s.%s_method", types.LightSync, strings.ToLower(con.Address), strings.ToLower(method.Name))
+ _, ok := dataStore.CheckTableCache(tableID)
+ Expect(ok).To(Equal(false))
+
+ created, err = dataStore.CreateMethodTable(con.Address, mockResult)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(created).To(Equal(true))
+
+ v, ok := dataStore.CheckTableCache(tableID)
+ Expect(ok).To(Equal(true))
+ Expect(v).To(Equal(true))
+ })
+ })
+
+ Describe("PersistResult", func() {
+ It("Persists result from method polling in custom pg table for light sync mode vDB", func() {
+ err = dataStore.PersistResult(mockResult, con.Address, con.Name)
+ Expect(err).ToNot(HaveOccurred())
+
+ scanStruct := test_helpers.BalanceOf{}
+
+ err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct)
+ expectedLog := test_helpers.BalanceOf{
+ Id: 1,
+ TokenName: "TrueUSD",
+ Block: 6707323,
+ Address: "0xfE9e8709d3215310075d67E3ed32A380CCf451C8",
+ Balance: "66386309548896882859581786",
+ }
+ Expect(scanStruct).To(Equal(expectedLog))
+ })
+
+ It("Fails with empty result", func() {
+ err = dataStore.PersistResult(types.Result{}, con.Address, con.Name)
+ Expect(err).To(HaveOccurred())
+ })
})
})
})
diff --git a/pkg/omni/full/retriever/address_retriever.go b/pkg/omni/shared/retriever/address_retriever.go
similarity index 86%
rename from pkg/omni/full/retriever/address_retriever.go
rename to pkg/omni/shared/retriever/address_retriever.go
index 40609279..f56d448f 100644
--- a/pkg/omni/full/retriever/address_retriever.go
+++ b/pkg/omni/shared/retriever/address_retriever.go
@@ -18,6 +18,7 @@ package retriever
import (
"fmt"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
"strings"
"github.com/ethereum/go-ethereum/accounts/abi"
@@ -28,19 +29,19 @@ import (
)
// Address retriever is used to retrieve the addresses associated with a contract
-// It requires a vDB synced database with blocks, transactions, receipts, logs,
-// AND all of the targeted events persisted
type AddressRetriever interface {
RetrieveTokenHolderAddresses(info contract.Contract) (map[common.Address]bool, error)
}
type addressRetriever struct {
- db *postgres.DB
+ db *postgres.DB
+ mode types.Mode
}
-func NewAddressRetriever(db *postgres.DB) (r *addressRetriever) {
+func NewAddressRetriever(db *postgres.DB, mode types.Mode) (r *addressRetriever) {
return &addressRetriever{
- db: db,
+ db: db,
+ mode: mode,
}
}
@@ -84,7 +85,7 @@ func (r *addressRetriever) retrieveTransferAddresses(con contract.Contract) ([]s
if field.Type.T == abi.AddressTy { // If they have address type, retrieve those addresses
addrs := make([]string, 0)
- pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s_event", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name))
+ pgStr := fmt.Sprintf("SELECT %s_ FROM %s_%s.%s_event", strings.ToLower(field.Name), r.mode.String(), strings.ToLower(con.Address), strings.ToLower(event.Name))
err := r.db.Select(&addrs, pgStr)
if err != nil {
return []string{}, err
@@ -105,7 +106,7 @@ func (r *addressRetriever) retrieveTokenMintees(con contract.Contract) ([]string
if field.Type.T == abi.AddressTy { // If they have address type, retrieve those addresses
addrs := make([]string, 0)
- pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s_event", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name))
+ pgStr := fmt.Sprintf("SELECT %s_ FROM %s_%s.%s_event", strings.ToLower(field.Name), r.mode.String(), strings.ToLower(con.Address), strings.ToLower(event.Name))
err := r.db.Select(&addrs, pgStr)
if err != nil {
return []string{}, err
diff --git a/pkg/omni/full/retriever/address_retriever_test.go b/pkg/omni/shared/retriever/address_retriever_test.go
similarity index 92%
rename from pkg/omni/full/retriever/address_retriever_test.go
rename to pkg/omni/shared/retriever/address_retriever_test.go
index daa91d50..48f45531 100644
--- a/pkg/omni/full/retriever/address_retriever_test.go
+++ b/pkg/omni/shared/retriever/address_retriever_test.go
@@ -20,16 +20,16 @@ import (
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
- "github.com/vulcanize/vulcanizedb/pkg/omni/full/repository"
- "github.com/vulcanize/vulcanizedb/pkg/omni/full/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/retriever"
+ "github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
var mockEvent = core.WatchedEvent{
@@ -68,11 +68,11 @@ var _ = Describe("Address Retriever Test", func() {
log, err = c.Convert(mockEvent, event)
Expect(err).ToNot(HaveOccurred())
- dataStore = repository.NewEventRepository(db)
- dataStore.PersistLog(*log, info.Address, info.Name)
+ dataStore = repository.NewEventRepository(db, types.FullSync)
+ dataStore.PersistLogs([]types.Log{*log}, event, info.Address, info.Name)
Expect(err).ToNot(HaveOccurred())
- r = retriever.NewAddressRetriever(db)
+ r = retriever.NewAddressRetriever(db, types.FullSync)
})
AfterEach(func() {
diff --git a/pkg/omni/full/repository/repository_suite_test.go b/pkg/omni/shared/retriever/retriever_suite_test.go
similarity index 89%
rename from pkg/omni/full/repository/repository_suite_test.go
rename to pkg/omni/shared/retriever/retriever_suite_test.go
index 899f5b04..b46c31fd 100644
--- a/pkg/omni/full/repository/repository_suite_test.go
+++ b/pkg/omni/shared/retriever/retriever_suite_test.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package repository_test
+package retriever_test
import (
"io/ioutil"
@@ -25,9 +25,9 @@ import (
. "github.com/onsi/gomega"
)
-func TestRepository(t *testing.T) {
+func TestRetriever(t *testing.T) {
RegisterFailHandler(Fail)
- RunSpecs(t, "Full Repository Suite Test")
+ RunSpecs(t, "Address Retriever Suite Test")
}
var _ = BeforeSuite(func() {
diff --git a/pkg/omni/shared/types/event.go b/pkg/omni/shared/types/event.go
index a2b1da84..5a80bb97 100644
--- a/pkg/omni/shared/types/event.go
+++ b/pkg/omni/shared/types/event.go
@@ -21,7 +21,6 @@ import (
"strings"
"github.com/ethereum/go-ethereum/accounts/abi"
- "github.com/i-norden/go-ethereum/core/types"
)
type Event struct {
@@ -37,7 +36,6 @@ type Field struct {
// Struct to hold instance of an event log data
type Log struct {
- Event
Id int64 // VulcanizeIdLog for full sync and header ID for light sync omni watcher
Values map[string]string // Map of event input names to their values
@@ -48,7 +46,7 @@ type Log struct {
// Used for lightSync only
LogIndex uint
TransactionIndex uint
- Raw types.Log
+ Raw []byte // json.Unmarshalled byte array of geth/core/types.Log{}
}
// Unpack abi.Event into our custom Event struct
diff --git a/pkg/omni/shared/types/mode.go b/pkg/omni/shared/types/mode.go
new file mode 100644
index 00000000..71e5c531
--- /dev/null
+++ b/pkg/omni/shared/types/mode.go
@@ -0,0 +1,64 @@
+// VulcanizeDB
+// Copyright © 2018 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package types
+
+import "fmt"
+
+type Mode int
+
+const (
+ LightSync Mode = iota
+ FullSync
+)
+
+func (mode Mode) IsValid() bool {
+ return mode >= LightSync && mode <= FullSync
+}
+
+func (mode Mode) String() string {
+ switch mode {
+ case LightSync:
+ return "light"
+ case FullSync:
+ return "full"
+ default:
+ return "unknown"
+ }
+}
+
+func (mode Mode) MarshalText() ([]byte, error) {
+ switch mode {
+ case LightSync:
+ return []byte("light"), nil
+ case FullSync:
+ return []byte("full"), nil
+ default:
+ return nil, fmt.Errorf("omni watcher: unknown mode %d, want LightSync or FullSync", mode)
+ }
+}
+
+func (mode *Mode) UnmarshalText(text []byte) error {
+ switch string(text) {
+ case "light":
+ *mode = LightSync
+ case "full":
+ *mode = FullSync
+ default:
+ return fmt.Errorf(`omni watcher: unknown mode %q, want "light" or "full"`, text)
+ }
+ return nil
+}