finishing porting omni watcher to work with light sync; split into full, light,

and shared directories and refactor as much into shared; finish
lightSync omni watcher tests
This commit is contained in:
Ian Norden 2018-11-23 22:26:07 -06:00
parent 58b34548f3
commit e02b33547d
38 changed files with 1715 additions and 890 deletions

View File

@ -9,10 +9,10 @@
Vulcanize DB is a set of tools that make it easier for developers to write application-specific indexes and caches for dapps built on Ethereum.
## Dependencies
- Go 1.9+
- Go 1.11+
- Postgres 10
- Ethereum Node
- [Go Ethereum](https://ethereum.github.io/go-ethereum/downloads/) (1.8+)
- [Go Ethereum](https://ethereum.github.io/go-ethereum/downloads/) (1.8.18+)
- [Parity 1.8.11+](https://github.com/paritytech/parity/releases)
## Project Setup

View File

@ -17,11 +17,8 @@
package cmd
import (
"bufio"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/spf13/cobra"
@ -59,29 +56,6 @@ func lightOmniWatcher() {
log.Fatal("Contract address required")
}
if len(contractEvents) == 0 || len(contractMethods) == 0 {
var str string
for str != "y" {
reader := bufio.NewReader(os.Stdin)
if len(contractEvents) == 0 && len(contractMethods) == 0 {
fmt.Print("Warning: no events or methods specified.\n Proceed to watch every event and poll no methods? (Y/n)\n> ")
} else if len(contractEvents) == 0 {
fmt.Print("Warning: no events specified.\n Proceed to watch every event? (Y/n)\n> ")
} else {
fmt.Print("Warning: no methods specified.\n Proceed to poll no methods? (Y/n)\n> ")
}
resp, err := reader.ReadBytes('\n')
if err != nil {
log.Fatal(err)
}
str = strings.ToLower(string(resp))
if str == "n" {
return
}
}
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
@ -114,13 +88,13 @@ func lightOmniWatcher() {
func init() {
rootCmd.AddCommand(lightOmniWatcherCmd)
omniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for")
omniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address")
omniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-events", "e", []string{}, "Subset of events to watch; by default all events are watched")
omniWatcherCmd.Flags().StringArrayVarP(&contractMethods, "contract-methods", "m", nil, "Subset of methods to poll; by default no methods are polled")
omniWatcherCmd.Flags().StringArrayVarP(&eventAddrs, "event-filter-addresses", "f", []string{}, "Account addresses to persist event data for; default is to persist for all found token holder addresses")
omniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses")
omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`)
omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists")
omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block")
lightOmniWatcherCmd.Flags().StringVarP(&contractAddress, "contract-address", "a", "", "Single address to generate watchers for")
lightOmniWatcherCmd.Flags().StringArrayVarP(&contractAddresses, "contract-addresses", "l", []string{}, "list of addresses to use; warning: watcher targets the same events and methods for each address")
lightOmniWatcherCmd.Flags().StringArrayVarP(&contractEvents, "contract-events", "e", []string{}, "Subset of events to watch; by default all events are watched")
lightOmniWatcherCmd.Flags().StringArrayVarP(&contractMethods, "contract-methods", "m", nil, "Subset of methods to poll; by default no methods are polled")
lightOmniWatcherCmd.Flags().StringArrayVarP(&eventAddrs, "event-filter-addresses", "f", []string{}, "Account addresses to persist event data for; default is to persist for all found token holder addresses")
lightOmniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses")
lightOmniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`)
lightOmniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists")
lightOmniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block")
}

View File

@ -17,11 +17,8 @@
package cmd
import (
"bufio"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/spf13/cobra"
@ -59,29 +56,6 @@ func omniWatcher() {
log.Fatal("Contract address required")
}
if len(contractEvents) == 0 || len(contractMethods) == 0 {
var str string
for str != "y" {
reader := bufio.NewReader(os.Stdin)
if len(contractEvents) == 0 && len(contractMethods) == 0 {
fmt.Print("Warning: no events or methods specified.\n Proceed to watch every event and poll no methods? (Y/n)\n> ")
} else if len(contractEvents) == 0 {
fmt.Print("Warning: no events specified.\n Proceed to watch every event? (Y/n)\n> ")
} else {
fmt.Print("Warning: no methods specified.\n Proceed to poll no methods? (Y/n)\n> ")
}
resp, err := reader.ReadBytes('\n')
if err != nil {
log.Fatal(err)
}
str = strings.ToLower(string(resp))
if str == "n" {
return
}
}
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
@ -122,5 +96,5 @@ func init() {
omniWatcherCmd.Flags().StringArrayVarP(&methodAddrs, "method-filter-addresses", "g", []string{}, "Account addresses to poll methods with; default is to poll with all found token holder addresses")
omniWatcherCmd.Flags().StringVarP(&network, "network", "n", "", `Network the contract is deployed on; options: "ropsten", "kovan", and "rinkeby"; default is mainnet"`)
omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block to begin watching- default is first block the contract exists")
omniWatcherCmd.Flags().Int64VarP(&startingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block")
omniWatcherCmd.Flags().Int64VarP(&endingBlockNumber, "ending-block-number", "d", -1, "Block to end watching- default is most recent block")
}

View File

@ -1,5 +1,4 @@
CREATE TABLE public.checked_headers (
id SERIAL PRIMARY KEY,
header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
price_feeds_checked BOOLEAN NOT NULL DEFAULT FALSE
header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE
);

BIN
pkg/omni/.DS_Store vendored

Binary file not shown.

View File

@ -74,22 +74,25 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (
// Postgres cannot handle custom types, resolve to strings
switch input.(type) {
case *big.Int:
var b *big.Int
b = input.(*big.Int)
b := input.(*big.Int)
strValues[fieldName] = b.String()
case common.Address:
var a common.Address
a = input.(common.Address)
a := input.(common.Address)
strValues[fieldName] = a.String()
c.ContractInfo.AddTokenHolderAddress(a.String()) // cache address in a list of contract's token holder addresses
case common.Hash:
var h common.Hash
h = input.(common.Hash)
h := input.(common.Hash)
strValues[fieldName] = h.String()
case string:
strValues[fieldName] = input.(string)
case bool:
strValues[fieldName] = strconv.FormatBool(input.(bool))
case []byte:
b := input.([]byte)
strValues[fieldName] = string(b)
case byte:
b := input.(byte)
strValues[fieldName] = string(b)
default:
return nil, errors.New(fmt.Sprintf("error: unhandled abi type %T", input))
}
@ -98,7 +101,6 @@ func (c *converter) Convert(watchedEvent core.WatchedEvent, event types.Event) (
// Only hold onto logs that pass our address filter, if any
if c.ContractInfo.PassesEventFilter(strValues) {
eventLog := &types.Log{
Event: event,
Id: watchedEvent.LogID,
Values: strValues,
Block: watchedEvent.BlockNumber,

View File

@ -25,40 +25,41 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Converter", func() {
var info *contract.Contract
var con *contract.Contract
var wantedEvents = []string{"Transfer"}
var err error
BeforeEach(func() {
info = test_helpers.SetupTusdContract(wantedEvents, []string{})
con = test_helpers.SetupTusdContract(wantedEvents, []string{})
})
Describe("Update", func() {
It("Updates contract info held by the converter", func() {
c := converter.NewConverter(info)
Expect(c.ContractInfo).To(Equal(info))
It("Updates contract con held by the converter", func() {
c := converter.NewConverter(con)
Expect(c.ContractInfo).To(Equal(con))
info := test_helpers.SetupTusdContract([]string{}, []string{})
c.Update(info)
Expect(c.ContractInfo).To(Equal(info))
con := test_helpers.SetupTusdContract([]string{}, []string{})
c.Update(con)
Expect(c.ContractInfo).To(Equal(con))
})
})
Describe("Convert", func() {
It("Converts a watched event log to mapping of event input names to values", func() {
_, ok := info.Events["Approval"]
_, ok := con.Events["Approval"]
Expect(ok).To(Equal(false))
event, ok := info.Events["Transfer"]
event, ok := con.Events["Transfer"]
Expect(ok).To(Equal(true))
err = info.GenerateFilters()
err = con.GenerateFilters()
Expect(err).ToNot(HaveOccurred())
c := converter.NewConverter(info)
log, err := c.Convert(test_helpers.MockTranferEvent, event)
c := converter.NewConverter(con)
log, err := c.Convert(mocks.MockTranferEvent, event)
Expect(err).ToNot(HaveOccurred())
from := common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21")
@ -72,10 +73,36 @@ var _ = Describe("Converter", func() {
Expect(v).To(Equal(value.String()))
})
It("Keeps track of addresses it sees to grow a token holder address list for the contract", func() {
event, ok := con.Events["Transfer"]
Expect(ok).To(Equal(true))
c := converter.NewConverter(con)
_, err := c.Convert(mocks.MockTranferEvent, event)
Expect(err).ToNot(HaveOccurred())
b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
_, ok = con.TknHolderAddrs["0x"]
Expect(ok).To(Equal(false))
_, ok = con.TknHolderAddrs[""]
Expect(ok).To(Equal(false))
_, ok = con.TknHolderAddrs["0x09THISE21a5IS5cFAKE1D82fAND43bCE06MADEUP"]
Expect(ok).To(Equal(false))
})
It("Fails with an empty contract", func() {
event := info.Events["Transfer"]
event := con.Events["Transfer"]
c := converter.NewConverter(&contract.Contract{})
_, err = c.Convert(test_helpers.MockTranferEvent, event)
_, err = c.Convert(mocks.MockTranferEvent, event)
Expect(err).To(HaveOccurred())
})
})

View File

@ -1,182 +0,0 @@
// VulcanizeDB
// Copyright © 2018 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository
import (
"errors"
"fmt"
"strings"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
// Event repository is used to persist event data into custom tables
type EventRepository interface {
PersistLog(event types.Log, contractAddr, contractName string) error
CreateEventTable(contractName string, event types.Log) (bool, error)
CreateContractSchema(contractName string) (bool, error)
}
type eventRepository struct {
db *postgres.DB
}
func NewEventRepository(db *postgres.DB) *eventRepository {
return &eventRepository{
db: db,
}
}
// Creates a schema for the contract if needed
// Creates table for the watched contract event if needed
// Persists converted event log data into this custom table
func (d *eventRepository) PersistLog(event types.Log, contractAddr, contractName string) error {
_, err := d.CreateContractSchema(contractAddr)
if err != nil {
return err
}
_, err = d.CreateEventTable(contractAddr, event)
if err != nil {
return err
}
return d.persistLog(event, contractAddr, contractName)
}
// Creates a custom postgres command to persist logs for the given event
func (d *eventRepository) persistLog(event types.Log, contractAddr, contractName string) error {
// Begin postgres string
pgStr := fmt.Sprintf("INSERT INTO c%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name))
pgStr = pgStr + "(vulcanize_log_id, token_name, block, tx"
// Pack the corresponding variables in a slice
var data []interface{}
data = append(data,
event.Id,
contractName,
event.Block,
event.Tx)
// Iterate over name-value pairs in the log adding
// names to the string and pushing values to the slice
counter := 0 // Keep track of number of inputs
for inputName, input := range event.Values {
counter += 1
pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words
data = append(data, input)
}
// Finish off the string and execute the command using the packed data
// For each input entry we created we add its postgres command variable to the string
pgStr = pgStr + ") VALUES ($1, $2, $3, $4"
for i := 0; i < counter; i++ {
pgStr = pgStr + fmt.Sprintf(", $%d", i+5)
}
pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING"
_, err := d.db.Exec(pgStr, data...)
if err != nil {
return err
}
return nil
}
// Checks for event table and creates it if it does not already exist
func (d *eventRepository) CreateEventTable(contractAddr string, event types.Log) (bool, error) {
tableExists, err := d.checkForTable(contractAddr, event.Name)
if err != nil {
return false, err
}
if !tableExists {
err = d.newEventTable(contractAddr, event)
if err != nil {
return false, err
}
}
return !tableExists, nil
}
// Creates a table for the given contract and event
func (d *eventRepository) newEventTable(contractAddr string, event types.Log) error {
// Begin pg string
pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name))
pgStr = pgStr + "(id SERIAL, vulcanize_log_id INTEGER NOT NULL UNIQUE, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL, tx CHARACTER VARYING(66) NOT NULL,"
// Iterate over event fields, using their name and pgType to grow the string
for _, field := range event.Fields {
pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
}
pgStr = pgStr + " CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES logs (id) ON DELETE CASCADE)"
_, err := d.db.Exec(pgStr)
return err
}
// Checks if a table already exists for the given contract and event
func (d *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s_event')", strings.ToLower(contractAddr), strings.ToLower(eventName))
var exists bool
err := d.db.Get(&exists, pgStr)
return exists, err
}
// Checks for contract schema and creates it if it does not already exist
func (d *eventRepository) CreateContractSchema(contractAddr string) (bool, error) {
if contractAddr == "" {
return false, errors.New("error: no contract address specified")
}
schemaExists, err := d.checkForSchema(contractAddr)
if err != nil {
return false, err
}
if !schemaExists {
err = d.newContractSchema(contractAddr)
if err != nil {
return false, err
}
}
return !schemaExists, nil
}
// Creates a schema for the given contract
func (d *eventRepository) newContractSchema(contractAddr string) error {
_, err := d.db.Exec("CREATE SCHEMA IF NOT EXISTS c" + strings.ToLower(contractAddr))
return err
}
// Checks if a schema already exists for the given contract
func (d *eventRepository) checkForSchema(contractAddr string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'c%s')", strings.ToLower(contractAddr))
var exists bool
err := d.db.QueryRow(pgStr).Scan(&exists)
return exists, err
}

View File

@ -1,171 +0,0 @@
// VulcanizeDB
// Copyright © 2018 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository_test
import (
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
var mockEvent = core.WatchedEvent{
Name: constants.TransferEvent.String(),
BlockNumber: 5488076,
Address: constants.TusdContractAddress,
TxHash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
Index: 110,
Topic0: constants.TransferEvent.Signature(),
Topic1: "0x000000000000000000000000000000000000000000000000000000000000af21",
Topic2: "0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391",
Topic3: "",
Data: "0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe",
}
var _ = Describe("Repository", func() {
var db *postgres.DB
var dataStore repository.EventRepository
var err error
var log *types.Log
var con *contract.Contract
var vulcanizeLogId int64
var wantedEvents = []string{"Transfer"}
var event types.Event
BeforeEach(func() {
db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, []string{})
mockEvent.LogID = vulcanizeLogId
event = con.Events["Transfer"]
err = con.GenerateFilters()
Expect(err).ToNot(HaveOccurred())
c := converter.NewConverter(con)
log, err = c.Convert(mockEvent, event)
Expect(err).ToNot(HaveOccurred())
dataStore = repository.NewEventRepository(db)
})
AfterEach(func() {
test_helpers.TearDown(db)
})
Describe("CreateContractSchema", func() {
It("Creates schema if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
})
Describe("CreateEventTable", func() {
It("Creates table if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateEventTable(con.Address, *log)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateEventTable(con.Address, *log)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
})
Describe("PersistLog", func() {
It("Persists contract event log values into custom tables, adding any addresses to a growing list of contract associated addresses", func() {
err = dataStore.PersistLog(*log, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
scanLog := test_helpers.TransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog)
expectedLog := test_helpers.TransferLog{
Id: 1,
VulvanizeLogId: vulcanizeLogId,
TokenName: "TrueUSD",
Block: 5488076,
Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
From: "0x000000000000000000000000000000000000Af21",
To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391",
Value: "1097077688018008265106216665536940668749033598146",
}
Expect(scanLog).To(Equal(expectedLog))
})
It("Doesn't persist duplicate event logs", func() {
// Perist once
err = dataStore.PersistLog(*log, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
scanLog := test_helpers.TransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog)
expectedLog := test_helpers.TransferLog{
Id: 1,
VulvanizeLogId: vulcanizeLogId,
TokenName: "TrueUSD",
Block: 5488076,
Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
From: "0x000000000000000000000000000000000000Af21",
To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391",
Value: "1097077688018008265106216665536940668749033598146",
}
Expect(scanLog).To(Equal(expectedLog))
// Attempt to persist the same log again
err = dataStore.PersistLog(*log, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
// Show that no new logs were entered
var count int
err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM c%s.transfer_event", constants.TusdContractAddress))
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(1))
})
It("Fails with empty log", func() {
err = dataStore.PersistLog(types.Log{}, con.Address, con.Name)
Expect(err).To(HaveOccurred())
})
})
})

View File

@ -27,7 +27,7 @@ import (
func TestRetriever(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Full Retriever Suite Test")
RunSpecs(t, "Full Block Number Retriever Suite Test")
}
var _ = BeforeSuite(func() {

View File

@ -19,18 +19,17 @@ package transformer
import (
"errors"
"fmt"
"log"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
// Requires a fully synced vDB and a running eth node (or infura)
@ -71,14 +70,14 @@ type transformer struct {
// Transformer takes in config for blockchain, database, and network id
func NewTransformer(network string, BC core.BlockChain, DB *postgres.DB) *transformer {
return &transformer{
Poller: poller.NewPoller(BC, DB),
Poller: poller.NewPoller(BC, DB, types.FullSync),
Parser: parser.NewParser(network),
BlockRetriever: retriever.NewBlockRetriever(DB),
Converter: converter.NewConverter(&contract.Contract{}),
Contracts: map[string]*contract.Contract{},
WatchedEventRepository: repositories.WatchedEventRepository{DB: DB},
FilterRepository: repositories.FilterRepository{DB: DB},
EventRepository: repository.NewEventRepository(DB),
EventRepository: repository.NewEventRepository(DB, types.FullSync),
WatchedEvents: map[string][]string{},
WantedMethods: map[string][]string{},
ContractRanges: map[string][2]int64{},
@ -184,27 +183,26 @@ func (tr transformer) Execute() error {
tr.Update(con)
// Iterate through contract filters and get watched event logs
for eventName, filter := range con.Filters {
for eventName := range con.Filters {
watchedEvents, err := tr.GetWatchedEvents(eventName)
if err != nil {
log.Println(fmt.Sprintf("Error fetching events for %s:", filter.Name), err)
return err
}
// Iterate over watched event logs
for _, we := range watchedEvents {
// Convert them to our custom log type
log, err := tr.Converter.Convert(*we, con.Events[eventName])
cstm, err := tr.Converter.Convert(*we, con.Events[eventName])
if err != nil {
return err
}
if log == nil {
break
if cstm == nil {
continue
}
// If log is not empty, immediately persist in repo
// Run this in seperate goroutine?
err = tr.PersistLog(*log, con.Address, con.Name)
err = tr.PersistLogs([]types.Log{*cstm}, con.Events[eventName], con.Address, con.Name)
if err != nil {
return err
}

View File

@ -30,6 +30,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/full/transformer"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Transformer", func() {
@ -95,8 +96,8 @@ var _ = Describe("Transformer", func() {
Describe("Init", func() {
It("Initializes transformer's contract objects", func() {
blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1)
blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2)
blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1)
blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2)
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
err = t.Init()
@ -120,8 +121,8 @@ var _ = Describe("Transformer", func() {
})
It("Does nothing if watched events are unset", func() {
blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1)
blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2)
blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1)
blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2)
t := transformer.NewTransformer("", blockChain, db)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
@ -133,8 +134,8 @@ var _ = Describe("Transformer", func() {
Describe("Execute", func() {
BeforeEach(func() {
blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock1)
blockRepository.CreateOrUpdateBlock(test_helpers.TransferBlock2)
blockRepository.CreateOrUpdateBlock(mocks.TransferBlock1)
blockRepository.CreateOrUpdateBlock(mocks.TransferBlock2)
})
It("Transforms watched contract data into custom repositories", func() {
@ -148,7 +149,7 @@ var _ = Describe("Transformer", func() {
Expect(err).ToNot(HaveOccurred())
log := test_helpers.TransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.transfer_event WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event WHERE block = 6194634", constants.TusdContractAddress)).StructScan(&log)
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(log.Tx).To(Equal("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654eee"))
@ -198,12 +199,12 @@ var _ = Describe("Transformer", func() {
res := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x000000000000000000000000000000000000Af21' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res)
Expect(err).ToNot(HaveOccurred())
Expect(res.Balance).To(Equal("0"))
Expect(res.TokenName).To(Equal("TrueUSD"))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6194634'", constants.TusdContractAddress)).StructScan(&res)
Expect(err).To(HaveOccurred())
})

View File

@ -17,16 +17,22 @@
package converter
import (
"encoding/json"
"errors"
"fmt"
"math/big"
"strconv"
geth "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
type Converter interface {
Convert(log geth.Log, event types.Event) (*types.Log, error)
Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error)
Update(info *contract.Contract)
}
@ -45,6 +51,66 @@ func (c *converter) Update(info *contract.Contract) {
}
// Convert the given watched event log into a types.Log for the given event
func (c *converter) Convert(log geth.Log, event types.Event) (*types.Log, error) {
return nil, errors.New("implement me")
func (c *converter) Convert(logs []gethTypes.Log, event types.Event, headerID int64) ([]types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
returnLogs := make([]types.Log, 0, len(logs))
for _, log := range logs {
values := make(map[string]interface{})
for _, field := range event.Fields {
var i interface{}
values[field.Name] = i
}
err := contract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
return nil, err
}
strValues := make(map[string]string, len(values))
for fieldName, input := range values {
// Postgres cannot handle custom types, resolve everything to strings
switch input.(type) {
case *big.Int:
b := input.(*big.Int)
strValues[fieldName] = b.String()
case common.Address:
a := input.(common.Address)
strValues[fieldName] = a.String()
c.ContractInfo.AddTokenHolderAddress(a.String()) // cache address in a list of contract's token holder addresses
case common.Hash:
h := input.(common.Hash)
strValues[fieldName] = h.String()
case string:
strValues[fieldName] = input.(string)
case bool:
strValues[fieldName] = strconv.FormatBool(input.(bool))
case []byte:
b := input.([]byte)
strValues[fieldName] = string(b)
case byte:
b := input.(byte)
strValues[fieldName] = string(b)
default:
return nil, errors.New(fmt.Sprintf("error: unhandled abi type %T", input))
}
}
// Only hold onto logs that pass our address filter, if any
if c.ContractInfo.PassesEventFilter(strValues) {
raw, err := json.Marshal(log)
if err != nil {
return nil, err
}
returnLogs = append(returnLogs, types.Log{
LogIndex: log.Index,
Values: strValues,
Raw: raw,
TransactionIndex: log.TxIndex,
Id: headerID,
})
}
}
return returnLogs, nil
}

View File

@ -15,3 +15,98 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package converter_test
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/omni/light/converter"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Converter", func() {
var con *contract.Contract
var wantedEvents = []string{"Transfer", "Mint"}
var err error
BeforeEach(func() {
con = test_helpers.SetupTusdContract(wantedEvents, []string{})
})
Describe("Update", func() {
It("Updates contract info held by the converter", func() {
c := converter.NewConverter(con)
Expect(c.ContractInfo).To(Equal(con))
info := test_helpers.SetupTusdContract([]string{}, []string{})
c.Update(info)
Expect(c.ContractInfo).To(Equal(info))
})
})
Describe("Convert", func() {
It("Converts a watched event log to mapping of event input names to values", func() {
_, ok := con.Events["Approval"]
Expect(ok).To(Equal(false))
event, ok := con.Events["Transfer"]
Expect(ok).To(Equal(true))
c := converter.NewConverter(con)
logs, err := c.Convert([]types.Log{mocks.MockTransferLog1, mocks.MockTransferLog2}, event, 232)
Expect(err).ToNot(HaveOccurred())
Expect(len(logs)).To(Equal(2))
sender1 := common.HexToAddress("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391")
sender2 := common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21")
value := helpers.BigFromString("1097077688018008265106216665536940668749033598146")
Expect(logs[0].Values["to"]).To(Equal(sender1.String()))
Expect(logs[0].Values["from"]).To(Equal(sender2.String()))
Expect(logs[0].Values["value"]).To(Equal(value.String()))
Expect(logs[0].Id).To(Equal(int64(232)))
Expect(logs[1].Values["to"]).To(Equal(sender2.String()))
Expect(logs[1].Values["from"]).To(Equal(sender1.String()))
Expect(logs[1].Values["value"]).To(Equal(value.String()))
Expect(logs[1].Id).To(Equal(int64(232)))
})
It("Keeps track of addresses it sees to grow a token holder address list for the contract", func() {
event, ok := con.Events["Transfer"]
Expect(ok).To(Equal(true))
c := converter.NewConverter(con)
_, err := c.Convert([]types.Log{mocks.MockTransferLog1, mocks.MockTransferLog2}, event, 232)
Expect(err).ToNot(HaveOccurred())
b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
_, ok = con.TknHolderAddrs["0x"]
Expect(ok).To(Equal(false))
_, ok = con.TknHolderAddrs[""]
Expect(ok).To(Equal(false))
_, ok = con.TknHolderAddrs["0x09THISE21a5IS5cFAKE1D82fAND43bCE06MADEUP"]
Expect(ok).To(Equal(false))
})
It("Fails with an empty contract", func() {
event := con.Events["Transfer"]
c := converter.NewConverter(&contract.Contract{})
_, err = c.Convert([]types.Log{mocks.MockTransferLog1}, event, 232)
Expect(err).To(HaveOccurred())
})
})
})

View File

@ -1,181 +0,0 @@
// VulcanizeDB
// Copyright © 2018 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository
import (
"errors"
"fmt"
"strings"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
type EventRepository interface {
PersistLog(event types.Log, contractAddr, contractName string) error
CreateEventTable(contractName string, event types.Log) (bool, error)
CreateContractSchema(contractName string) (bool, error)
}
type eventRepository struct {
db *postgres.DB
}
func NewEventRepository(db *postgres.DB) *eventRepository {
return &eventRepository{
db: db,
}
}
// Creates a schema for the contract if needed
// Creates table for the watched contract event if needed
// Persists converted event log data into this custom table
func (r *eventRepository) PersistLog(event types.Log, contractAddr, contractName string) error {
_, err := r.CreateContractSchema(contractAddr)
if err != nil {
return err
}
_, err = r.CreateEventTable(contractAddr, event)
if err != nil {
return err
}
return r.persistLog(event, contractAddr, contractName)
}
// Creates a custom postgres command to persist logs for the given event
func (r *eventRepository) persistLog(event types.Log, contractAddr, contractName string) error {
// Begin postgres string
pgStr := fmt.Sprintf("INSERT INTO l%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name))
pgStr = pgStr + "(header_id, token_name, raw_log, log_idx, tx_idx"
// Pack the corresponding variables in a slice
var data []interface{}
data = append(data,
event.Id,
contractName,
event.Raw,
event.LogIndex,
event.TransactionIndex)
// Iterate over name-value pairs in the log adding
// names to the string and pushing values to the slice
counter := 0 // Keep track of number of inputs
for inputName, input := range event.Values {
counter += 1
pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words
data = append(data, input)
}
// Finish off the string and execute the command using the packed data
// For each input entry we created we add its postgres command variable to the string
pgStr = pgStr + ") VALUES ($1, $2, $3, $4, $5"
for i := 0; i < counter; i++ {
pgStr = pgStr + fmt.Sprintf(", $%d", i+6)
}
pgStr = pgStr + ")"
_, err := r.db.Exec(pgStr, data...)
if err != nil {
return err
}
return nil
}
// Checks for event table and creates it if it does not already exist
func (r *eventRepository) CreateEventTable(contractAddr string, event types.Log) (bool, error) {
tableExists, err := r.checkForTable(contractAddr, event.Name)
if err != nil {
return false, err
}
if !tableExists {
err = r.newEventTable(contractAddr, event)
if err != nil {
return false, err
}
}
return !tableExists, nil
}
// Creates a table for the given contract and event
func (r *eventRepository) newEventTable(contractAddr string, event types.Log) error {
// Begin pg string
pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS l%s.%s_event ", strings.ToLower(contractAddr), strings.ToLower(event.Name))
pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL,"
// Iterate over event fields, using their name and pgType to grow the string
for _, field := range event.Fields {
pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
}
pgStr = pgStr + " UNIQUE (header_id, tx_idx, log_idx))"
_, err := r.db.Exec(pgStr)
return err
}
// Checks if a table already exists for the given contract and event
func (r *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'l%s' AND table_name = '%s_event')", strings.ToLower(contractAddr), strings.ToLower(eventName))
var exists bool
err := r.db.Get(&exists, pgStr)
return exists, err
}
// Checks for contract schema and creates it if it does not already exist
func (r *eventRepository) CreateContractSchema(contractAddr string) (bool, error) {
if contractAddr == "" {
return false, errors.New("error: no contract address specified")
}
schemaExists, err := r.checkForSchema(contractAddr)
if err != nil {
return false, err
}
if !schemaExists {
err = r.newContractSchema(contractAddr)
if err != nil {
return false, err
}
}
return !schemaExists, nil
}
// Creates a schema for the given contract
func (r *eventRepository) newContractSchema(contractAddr string) error {
_, err := r.db.Exec("CREATE SCHEMA IF NOT EXISTS l" + strings.ToLower(contractAddr))
return err
}
// Checks if a schema already exists for the given contract
func (r *eventRepository) checkForSchema(contractAddr string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'l%s')", strings.ToLower(contractAddr))
var exists bool
err := r.db.QueryRow(pgStr).Scan(&exists)
return exists, err
}

View File

@ -1,17 +0,0 @@
// VulcanizeDB
// Copyright © 2018 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository_test

View File

@ -17,30 +17,61 @@
package repository
import (
"database/sql"
"github.com/hashicorp/golang-lru"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
const columnCacheSize = 1000
type HeaderRepository interface {
AddCheckColumn(eventID string) error
MarkHeaderChecked(headerID int64, eventID string) error
MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error)
CheckCache(key string) (interface{}, bool)
}
type headerRepository struct {
db *postgres.DB
columns *lru.Cache // Cache created columns to minimize db connections
}
func NewHeaderRepository(db *postgres.DB) *headerRepository {
ccs, _ := lru.New(columnCacheSize)
return &headerRepository{
db: db,
columns: ccs,
}
}
func (r *headerRepository) AddCheckColumn(eventID string) error {
// Check cache to see if column already exists before querying pg
_, ok := r.columns.Get(eventID)
if ok {
return nil
}
pgStr := "ALTER TABLE public.checked_headers ADD COLUMN IF NOT EXISTS "
pgStr = pgStr + eventID + " BOOLEAN NOT NULL DEFAULT FALSE"
_, err := r.db.Exec(pgStr)
if err != nil {
return err
}
// Add column name to cache
r.columns.Add(eventID, true)
return nil
}
func (r *headerRepository) MarkHeaderChecked(headerID int64, eventID string) error {
_, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET `+eventID+` = $2`, headerID, true)
return err
}
@ -68,3 +99,15 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber int64, endingBlock
return result, err
}
func (r *headerRepository) CheckCache(key string) (interface{}, bool) {
return r.columns.Get(key)
}
func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET `+eventID+` = $2`, headerID, true)
return err
}

View File

@ -15,3 +15,127 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository_test
import (
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/omni/light/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Repository", func() {
var db *postgres.DB
var r repository.HeaderRepository
var headerRepository repositories.HeaderRepository
var eventID, query string
BeforeEach(func() {
db, _ = test_helpers.SetupDBandBC()
r = repository.NewHeaderRepository(db)
headerRepository = repositories.NewHeaderRepository(db)
eventID = "eventName_contractAddr"
})
AfterEach(func() {
test_helpers.TearDown(db)
})
Describe("AddCheckColumn", func() {
It("Creates a column for the given eventID to mark if the header has been checked for that event", func() {
query = fmt.Sprintf("SELECT %s FROM checked_headers", eventID)
_, err := db.Exec(query)
Expect(err).To(HaveOccurred())
err = r.AddCheckColumn(eventID)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec(query)
Expect(err).ToNot(HaveOccurred())
})
It("Caches column it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
_, ok := r.CheckCache(eventID)
Expect(ok).To(Equal(false))
err := r.AddCheckColumn(eventID)
Expect(err).ToNot(HaveOccurred())
v, ok := r.CheckCache(eventID)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
})
Describe("MissingHeaders", func() {
It("Returns all unchecked headers for the given eventID", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
err := r.AddCheckColumn(eventID)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
})
It("Fails if eventID does not yet exist in check_headers table", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
err := r.AddCheckColumn(eventID)
Expect(err).ToNot(HaveOccurred())
_, err = r.MissingHeaders(6194630, 6194635, "notEventId")
Expect(err).To(HaveOccurred())
})
})
Describe("MarkHeaderChecked", func() {
It("Marks the header checked for the given eventID", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
err := r.AddCheckColumn(eventID)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
headerID := missingHeaders[0].Id
err = r.MarkHeaderChecked(headerID, eventID)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = r.MissingHeaders(6194630, 6194635, eventID)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
})
It("Fails if eventID does not yet exist in check_headers table", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
err := r.AddCheckColumn(eventID)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := r.MissingHeaders(6194630, 6194635, eventID)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
headerID := missingHeaders[0].Id
err = r.MarkHeaderChecked(headerID, "notEventId")
Expect(err).To(HaveOccurred())
missingHeaders, err = r.MissingHeaders(6194630, 6194635, eventID)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
})
})
})

View File

@ -24,6 +24,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/omni/light/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Block Retriever", func() {
@ -43,9 +44,9 @@ var _ = Describe("Block Retriever", func() {
Describe("RetrieveFirstBlock", func() {
It("Retrieves block number of earliest header in the database", func() {
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1)
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader2)
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
i, err := r.RetrieveFirstBlock()
Expect(err).NotTo(HaveOccurred())
@ -60,9 +61,9 @@ var _ = Describe("Block Retriever", func() {
Describe("RetrieveMostRecentBlock", func() {
It("Retrieves the latest header's block number", func() {
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1)
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader2)
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
i, err := r.RetrieveMostRecentBlock()
Expect(err).ToNot(HaveOccurred())

View File

@ -27,7 +27,7 @@ import (
func TestRetriever(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Light Retriever Suite Test")
RunSpecs(t, "Light BLock Number Retriever Suite Test")
}
var _ = BeforeSuite(func() {

View File

@ -18,7 +18,8 @@ package transformer
import (
"errors"
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers"
"strings"
"github.com/ethereum/go-ethereum/common"
@ -31,12 +32,14 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller"
srep "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
// Requires a light synced vDB (headers) and a running eth node (or infura)
type transformer struct {
// Database interfaces
repository.EventRepository // Holds transformed watched event log data
srep.EventRepository // Holds transformed watched event log data
repository.HeaderRepository // Interface for interaction with header repositories
// Pre-processing interfaces
@ -72,14 +75,14 @@ type transformer struct {
func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transformer {
return &transformer{
Poller: poller.NewPoller(bc, db),
Poller: poller.NewPoller(bc, db, types.LightSync),
Fetcher: fetcher.NewFetcher(bc),
Parser: parser.NewParser(network),
HeaderRepository: repository.NewHeaderRepository(db),
BlockRetriever: retriever.NewBlockRetriever(db),
Converter: converter.NewConverter(&contract.Contract{}),
Contracts: map[string]*contract.Contract{},
EventRepository: repository.NewEventRepository(db),
EventRepository: srep.NewEventRepository(db, types.LightSync),
WatchedEvents: map[string][]string{},
WantedMethods: map[string][]string{},
ContractRanges: map[string][2]int64{},
@ -93,7 +96,7 @@ func NewTransformer(network string, bc core.BlockChain, db *postgres.DB) *transf
// Uses parser to pull event info from abi
// Use this info to generate event filters
func (tr *transformer) Init() error {
// Iterate through all internal contract addresses
for contractAddr, subset := range tr.WatchedEvents {
// Get Abi
err := tr.Parser.Parse(contractAddr)
@ -101,7 +104,7 @@ func (tr *transformer) Init() error {
return err
}
// Get first block for contract and most recent block for the chain
// Get first block and most recent block number in the header repo
firstBlock, err := tr.BlockRetriever.RetrieveFirstBlock()
if err != nil {
return err
@ -111,7 +114,7 @@ func (tr *transformer) Init() error {
return err
}
// Set to specified range if it falls within the contract's bounds
// Set to specified range if it falls within the bounds
if firstBlock < tr.ContractRanges[contractAddr][0] {
firstBlock = tr.ContractRanges[contractAddr][0]
}
@ -119,14 +122,11 @@ func (tr *transformer) Init() error {
lastBlock = tr.ContractRanges[contractAddr][1]
}
// Get contract name
// Get contract name if it has one
var name = new(string)
err = tr.FetchContractData(tr.Abi(), contractAddr, "name", nil, &name, lastBlock)
if err != nil {
return errors.New(fmt.Sprintf("unable to fetch contract name: %v\r\n", err))
}
tr.FetchContractData(tr.Abi(), contractAddr, "name", nil, &name, lastBlock)
// Remove any accidental duplicate inputs in filter addresses
// Remove any potential accidental duplicate inputs in filter addresses
EventAddrs := map[string]bool{}
for _, addr := range tr.EventAddrs[contractAddr] {
EventAddrs[addr] = true
@ -142,6 +142,7 @@ func (tr *transformer) Init() error {
Network: tr.Network,
Address: contractAddr,
Abi: tr.Abi(),
ParsedAbi: tr.ParsedAbi(),
StartingBlock: firstBlock,
LastBlock: lastBlock,
Events: tr.GetEvents(subset),
@ -151,7 +152,7 @@ func (tr *transformer) Init() error {
TknHolderAddrs: map[string]bool{},
}
// Store contract info for further processing
// Store contract info for execution
tr.Contracts[contractAddr] = info
}
@ -160,52 +161,68 @@ func (tr *transformer) Init() error {
func (tr *transformer) Execute() error {
if len(tr.Contracts) == 0 {
return errors.New("error: transformer has no initialized contracts to work with")
return errors.New("error: transformer has no initialized contracts")
}
// Iterate through all internal contracts
for _, con := range tr.Contracts {
// Update converter with current contract
tr.Update(con)
// Iterate through events
for _, event := range con.Events {
topics := [][]common.Hash{{common.HexToHash(event.Sig())}}
eventId := event.Name + "_" + con.Address
// Filter using the event signature
topics := [][]common.Hash{{common.HexToHash(helpers.GenerateSignature(event.Sig()))}}
// Generate eventID and use it to create a checked_header column if one does not already exist
eventId := strings.ToLower(event.Name + "_" + con.Address)
if err := tr.AddCheckColumn(eventId); err != nil {
return err
}
// Find unchecked headers for this event
missingHeaders, err := tr.MissingHeaders(con.StartingBlock, con.LastBlock, eventId)
if err != nil {
return err
}
// Iterate over headers
for _, header := range missingHeaders {
// And fetch event logs using the header, contract address, and topics filter
logs, err := tr.FetchLogs([]string{con.Address}, topics, header)
if err != nil {
return err
}
// Mark the header checked for this eventID and continue to next iteration if no logs are found
if len(logs) < 1 {
err = tr.MarkHeaderChecked(header.Id, eventId)
if err != nil {
return err
}
continue
}
for _, l := range logs {
mapping, err := tr.Convert(l, event)
// Convert logs into custom type
convertedLogs, err := tr.Convert(logs, event, header.Id)
if err != nil {
return err
}
if mapping == nil {
break
if len(convertedLogs) < 1 {
continue
}
err = tr.PersistLog(*mapping, con.Address, con.Name)
// If logs aren't empty, persist them
err = tr.PersistLogs(convertedLogs, event, con.Address, con.Name)
if err != nil {
return err
}
}
}
// After persisting all watched event logs
// poller polls select contract methods
// and persists the results into custom pg tables
if err := tr.PollContract(*con); err != nil {
return err
}
}

View File

@ -17,9 +17,7 @@
package transformer_test
import (
"math/rand"
"time"
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -29,6 +27,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/light/transformer"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Transformer", func() {
@ -36,7 +35,7 @@ var _ = Describe("Transformer", func() {
var err error
var blockChain core.BlockChain
var headerRepository repositories.HeaderRepository
rand.Seed(time.Now().UnixNano())
var headerID int64
BeforeEach(func() {
db, blockChain = test_helpers.SetupDBandBC()
@ -94,8 +93,8 @@ var _ = Describe("Transformer", func() {
Describe("Init", func() {
It("Initializes transformer's contract objects", func() {
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1)
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
err = t.Init()
@ -111,7 +110,7 @@ var _ = Describe("Transformer", func() {
Expect(c.Address).To(Equal(constants.TusdContractAddress))
})
It("Fails to initialize if first and most recent blocks cannot be fetched from vDB", func() {
It("Fails to initialize if first and most recent block numbers cannot be fetched from vDB headers table", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
err = t.Init()
@ -119,8 +118,8 @@ var _ = Describe("Transformer", func() {
})
It("Does nothing if watched events are unset", func() {
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader1)
headerRepository.CreateOrUpdateHeader(test_helpers.MockHeader3)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
t := transformer.NewTransformer("", blockChain, db)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
@ -131,6 +130,96 @@ var _ = Describe("Transformer", func() {
})
Describe("Execute", func() {
BeforeEach(func() {
header1, err := blockChain.GetHeaderByNumber(6791668)
Expect(err).ToNot(HaveOccurred())
header2, err := blockChain.GetHeaderByNumber(6791669)
Expect(err).ToNot(HaveOccurred())
header3, err := blockChain.GetHeaderByNumber(6791670)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header1)
headerID, err = headerRepository.CreateOrUpdateHeader(header2)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header3)
})
It("Transforms watched contract data into custom repositories", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, nil)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
log := test_helpers.LightTransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", constants.TusdContractAddress)).StructScan(&log)
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(log.HeaderID).To(Equal(headerID))
Expect(log.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3"))
Expect(log.To).To(Equal("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0"))
Expect(log.Value).To(Equal("9998940000000000000000"))
})
It("Keeps track of contract-related addresses while transforming event data", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, nil)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
c, ok := t.Contracts[constants.TusdContractAddress]
Expect(ok).To(Equal(true))
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
b, ok := c.TknHolderAddrs["0x1062a747393198f70F71ec65A582423Dba7E5Ab3"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = c.TknHolderAddrs["0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
_, ok = c.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843b1234567890"]
Expect(ok).To(Equal(false))
_, ok = c.TknHolderAddrs["0x"]
Expect(ok).To(Equal(false))
})
It("Polls given methods using generated token holder address", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, []string{"balanceOf"})
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
res := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res)
Expect(err).ToNot(HaveOccurred())
Expect(res.Balance).To(Equal("55849938025000000000000"))
Expect(res.TokenName).To(Equal("TrueUSD"))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", constants.TusdContractAddress)).StructScan(&res)
Expect(err).To(HaveOccurred())
})
It("Fails if initialization has not been done", func() {
t := transformer.NewTransformer("", blockChain, db)
t.SetEvents(constants.TusdContractAddress, []string{"Transfer"})
t.SetMethods(constants.TusdContractAddress, nil)
err = t.Execute()
Expect(err).To(HaveOccurred())
})
})
})

View File

@ -22,6 +22,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
var _ = Describe("Contract", func() {
@ -38,11 +39,11 @@ var _ = Describe("Contract", func() {
val, ok := info.Filters["Transfer"]
Expect(ok).To(Equal(true))
Expect(val).To(Equal(test_helpers.ExpectedTransferFilter))
Expect(val).To(Equal(mocks.ExpectedTransferFilter))
val, ok = info.Filters["Approval"]
Expect(ok).To(Equal(true))
Expect(val).To(Equal(test_helpers.ExpectedApprovalFilter))
Expect(val).To(Equal(mocks.ExpectedApprovalFilter))
val, ok = info.Filters["Mint"]
Expect(ok).To(Equal(false))

View File

@ -33,7 +33,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
)
type TransferLog struct {
@ -47,6 +47,18 @@ type TransferLog struct {
Value string `db:"value_"`
}
type LightTransferLog struct {
Id int64 `db:"id"`
HeaderID int64 `db:"header_id"`
TokenName string `db:"token_name"`
LogIndex int64 `db:"log_idx"`
TxIndex int64 `db:"tx_idx"`
From string `db:"from_"`
To string `db:"to_"`
Value string `db:"value_"`
RawLog []byte `db:"raw_log"`
}
type BalanceOf struct {
Id int64 `db:"id"`
TokenName string `db:"token_name"`
@ -119,8 +131,8 @@ func SetupTusdRepo(vulcanizeLogId *int64, wantedEvents, wantedMethods []string)
}
func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract {
p := parser.NewParser("")
err := p.Parse(constants.TusdContractAddress)
p := mocks.NewParser(constants.TusdAbiString)
err := p.Parse()
Expect(err).ToNot(HaveOccurred())
return &contract.Contract{
@ -139,25 +151,40 @@ func SetupTusdContract(wantedEvents, wantedMethods []string) *contract.Contract
}
func TearDown(db *postgres.DB) {
_, err := db.Query(`DELETE FROM blocks`)
tx, err := db.Begin()
Expect(err).NotTo(HaveOccurred())
_, err = db.Query(`DELETE FROM headers`)
_, err = tx.Exec(`DELETE FROM blocks`)
Expect(err).NotTo(HaveOccurred())
_, err = db.Query(`DELETE FROM checked_headers`)
_, err = tx.Exec(`DELETE FROM headers`)
Expect(err).NotTo(HaveOccurred())
_, err = db.Query(`DELETE FROM logs`)
_, err = tx.Exec(`DELETE FROM checked_headers`)
Expect(err).NotTo(HaveOccurred())
_, err = db.Query(`DELETE FROM transactions`)
_, err = tx.Exec(`DELETE FROM logs`)
Expect(err).NotTo(HaveOccurred())
_, err = db.Query(`DELETE FROM receipts`)
_, err = tx.Exec(`DELETE FROM transactions`)
Expect(err).NotTo(HaveOccurred())
_, err = db.Query(`DROP SCHEMA IF EXISTS c0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`)
_, err = tx.Exec(`DELETE FROM receipts`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS eventName_contractAddr`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`ALTER TABLE public.checked_headers DROP COLUMN IF EXISTS transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DROP SCHEMA IF EXISTS light_0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E CASCADE`)
Expect(err).NotTo(HaveOccurred())
err = tx.Commit()
Expect(err).NotTo(HaveOccurred())
}

View File

@ -14,11 +14,15 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package test_helpers
package mocks
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
@ -126,3 +130,44 @@ var MockHeader3 = core.Header{
Raw: rawFakeHeader,
Timestamp: "50000030",
}
var MockTransferLog1 = types.Log{
Index: 1,
Address: common.HexToAddress(constants.TusdContractAddress),
BlockNumber: 5488076,
TxIndex: 110,
TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae"),
Topics: []common.Hash{
common.HexToHash(constants.TransferEvent.Signature()),
common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"),
common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391"),
},
Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"),
}
var MockTransferLog2 = types.Log{
Index: 3,
Address: common.HexToAddress(constants.TusdContractAddress),
BlockNumber: 5488077,
TxIndex: 2,
TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae"),
Topics: []common.Hash{
common.HexToHash(constants.TransferEvent.Signature()),
common.HexToHash("0x9dd48110dcc444fdc242510c09bbbbe21a5975cac061d82f7b843bce061ba391"),
common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"),
},
Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"),
}
var MockMintLog = types.Log{
Index: 10,
Address: common.HexToAddress(constants.TusdContractAddress),
BlockNumber: 548808,
TxIndex: 50,
TxHash: common.HexToHash("0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6minty"),
Topics: []common.Hash{
common.HexToHash(constants.MintEvent.Signature()),
common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000af21"),
},
Data: hexutil.MustDecode("0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000089d24a6b4ccb1b6faa2625fe562bdd9a23260359000000000000000000000000000000000000000000000000392d2e2bda9c00000000000000000000000000000000000000000000000000927f41fa0a4a418000000000000000000000000000000000000000000000000000000000005adcfebe"),
}

View File

@ -23,7 +23,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/mocks"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/parser"
)

View File

@ -42,10 +42,10 @@ type poller struct {
contract contract.Contract
}
func NewPoller(blockChain core.BlockChain, db *postgres.DB) *poller {
func NewPoller(blockChain core.BlockChain, db *postgres.DB, mode types.Mode) *poller {
return &poller{
MethodRepository: repository.NewMethodRepository(db),
MethodRepository: repository.NewMethodRepository(db, mode),
bc: blockChain,
}
}

View File

@ -28,6 +28,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/poller"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
var _ = Describe("Poller", func() {
@ -37,15 +38,16 @@ var _ = Describe("Poller", func() {
var db *postgres.DB
var bc core.BlockChain
BeforeEach(func() {
db, bc = test_helpers.SetupDBandBC()
p = poller.NewPoller(bc, db)
})
AfterEach(func() {
test_helpers.TearDown(db)
})
Describe("Full sync mode", func() {
BeforeEach(func() {
db, bc = test_helpers.SetupDBandBC()
p = poller.NewPoller(bc, db, types.FullSync)
})
Describe("PollContract", func() {
It("Polls specified contract methods using contract's token holder address list", func() {
con = test_helpers.SetupTusdContract(nil, []string{"balanceOf"})
@ -62,22 +64,22 @@ var _ = Describe("Poller", func() {
scanStruct := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
Expect(err).ToNot(HaveOccurred())
Expect(scanStruct.Balance).To(Equal("66386309548896882859581786"))
Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct)
Expect(err).ToNot(HaveOccurred())
Expect(scanStruct.Balance).To(Equal("66386309548896882859581786"))
Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
Expect(err).ToNot(HaveOccurred())
Expect(scanStruct.Balance).To(Equal("17982350181394112023885864"))
Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE' AND block = '6707323'", constants.TusdContractAddress)).StructScan(&scanStruct)
Expect(err).ToNot(HaveOccurred())
Expect(scanStruct.Balance).To(Equal("17982350181394112023885864"))
Expect(scanStruct.TokenName).To(Equal("TrueUSD"))
@ -98,7 +100,7 @@ var _ = Describe("Poller", func() {
scanStruct := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method WHERE who_ = '0xfE9e8709d3215310075d67E3ed32A380CCf451C8' AND block = '6707322'", constants.TusdContractAddress)).StructScan(&scanStruct)
Expect(err).To(HaveOccurred())
})
})
@ -111,4 +113,5 @@ var _ = Describe("Poller", func() {
Expect(*name).To(Equal("TrueUSD"))
})
})
})
})

View File

@ -0,0 +1,316 @@
// VulcanizeDB
// Copyright © 2018 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository
import (
"errors"
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/omni/light/repository"
"strings"
"github.com/hashicorp/golang-lru"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
const (
// Number of contract address and method ids to keep in cache
contractCacheSize = 100
eventChacheSize = 1000
)
// Event repository is used to persist event data into custom tables
type EventRepository interface {
PersistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error
CreateEventTable(contractAddr string, event types.Event) (bool, error)
CreateContractSchema(contractName string) (bool, error)
CheckSchemaCache(key string) (interface{}, bool)
CheckTableCache(key string) (interface{}, bool)
}
type eventRepository struct {
db *postgres.DB
mode types.Mode
schemas *lru.Cache // Cache names of recently used schemas to minimize db connections
tables *lru.Cache // Cache names of recently used tables to minimize db connections
}
func NewEventRepository(db *postgres.DB, mode types.Mode) *eventRepository {
ccs, _ := lru.New(contractCacheSize)
ecs, _ := lru.New(eventChacheSize)
return &eventRepository{
db: db,
mode: mode,
schemas: ccs,
tables: ecs,
}
}
// Creates a schema for the contract if needed
// Creates table for the watched contract event if needed
// Persists converted event log data into this custom table
func (r *eventRepository) PersistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
if logs == nil {
return errors.New("event repository error: passed a nil log slice")
}
if len(logs) == 0 {
return errors.New("event repository error: passed an empty log slice")
}
_, err := r.CreateContractSchema(contractAddr)
if err != nil {
return err
}
_, err = r.CreateEventTable(contractAddr, eventInfo)
if err != nil {
return err
}
return r.persistLogs(logs, eventInfo, contractAddr, contractName)
}
func (r *eventRepository) persistLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
var err error
switch r.mode {
case types.LightSync:
err = r.persistLightSyncLogs(logs, eventInfo, contractAddr, contractName)
case types.FullSync:
err = r.persistFullSyncLogs(logs, eventInfo, contractAddr, contractName)
default:
return errors.New("event repository error: unhandled mode")
}
return err
}
// Creates a custom postgres command to persist logs for the given event (compatible with light synced vDB)
func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
tx, err := r.db.Begin()
if err != nil {
return err
}
for _, event := range logs {
// Begin pg query string
pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_event ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventInfo.Name))
pgStr = pgStr + "(header_id, token_name, raw_log, log_idx, tx_idx"
el := len(event.Values)
// Pack the corresponding variables in a slice
data := make([]interface{}, 0, 5+el)
data = append(data,
event.Id,
contractName,
event.Raw,
event.LogIndex,
event.TransactionIndex)
// Iterate over inputs and append name to query string and value to input data
for inputName, input := range event.Values {
pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName)) // Add underscore after to avoid any collisions with reserved pg words
data = append(data, input)
}
// For each input entry we created we add its postgres command variable to the string
pgStr = pgStr + ") VALUES ($1, $2, $3, $4, $5"
for i := 0; i < el; i++ {
pgStr = pgStr + fmt.Sprintf(", $%d", i+6)
}
pgStr = pgStr + ")"
_, err = tx.Exec(pgStr, data...)
if err != nil {
tx.Rollback()
return err
}
}
eventId := strings.ToLower(eventInfo.Name + "_" + contractAddr)
err = repository.MarkHeaderCheckedInTransaction(logs[0].Id, tx, eventId)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
// Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB)
func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
tx, err := r.db.Begin()
if err != nil {
return err
}
for _, event := range logs {
pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_event ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventInfo.Name))
pgStr = pgStr + "(vulcanize_log_id, token_name, block, tx"
el := len(event.Values)
data := make([]interface{}, 0, 4+el)
data = append(data,
event.Id,
contractName,
event.Block,
event.Tx)
for inputName, input := range event.Values {
pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(inputName))
data = append(data, input)
}
pgStr = pgStr + ") VALUES ($1, $2, $3, $4"
for i := 0; i < el; i++ {
pgStr = pgStr + fmt.Sprintf(", $%d", i+5)
}
pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING"
_, err = tx.Exec(pgStr, data...)
if err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}
// Checks for event table and creates it if it does not already exist
// Returns true if it created a new table; returns false if table already existed
func (r *eventRepository) CreateEventTable(contractAddr string, event types.Event) (bool, error) {
tableID := fmt.Sprintf("%s_%s.%s_event", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(event.Name))
// Check cache before querying pq to see if table exists
_, ok := r.tables.Get(tableID)
if ok {
return false, nil
}
tableExists, err := r.checkForTable(contractAddr, event.Name)
if err != nil {
return false, err
}
if !tableExists {
err = r.newEventTable(tableID, event)
if err != nil {
return false, err
}
}
// Add table id to cache
r.tables.Add(tableID, true)
return !tableExists, nil
}
// Creates a table for the given contract and event
func (r *eventRepository) newEventTable(tableID string, event types.Event) error {
// Begin pg string
var pgStr = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s ", tableID)
var err error
// Handle different modes
switch r.mode {
case types.FullSync:
pgStr = pgStr + "(id SERIAL, vulcanize_log_id INTEGER NOT NULL UNIQUE, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL, tx CHARACTER VARYING(66) NOT NULL,"
// Iterate over event fields, using their name and pgType to grow the string
for _, field := range event.Fields {
pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
}
pgStr = pgStr + " CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES logs (id) ON DELETE CASCADE)"
case types.LightSync:
pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL,"
for _, field := range event.Fields {
pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
}
pgStr = pgStr + " UNIQUE (header_id, tx_idx, log_idx))"
default:
return errors.New("unhandled repository mode")
}
_, err = r.db.Exec(pgStr)
return err
}
// Checks if a table already exists for the given contract and event
func (r *eventRepository) checkForTable(contractAddr string, eventName string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '%s_%s' AND table_name = '%s_event')", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(eventName))
var exists bool
err := r.db.Get(&exists, pgStr)
return exists, err
}
// Checks for contract schema and creates it if it does not already exist
// Returns true if it created a new schema; returns false if schema already existed
func (r *eventRepository) CreateContractSchema(contractAddr string) (bool, error) {
if contractAddr == "" {
return false, errors.New("error: no contract address specified")
}
// Check cache before querying pq to see if schema exists
_, ok := r.schemas.Get(contractAddr)
if ok {
return false, nil
}
schemaExists, err := r.checkForSchema(contractAddr)
if err != nil {
return false, err
}
if !schemaExists {
err = r.newContractSchema(contractAddr)
if err != nil {
return false, err
}
}
// Add schema name to cache
r.schemas.Add(contractAddr, true)
return !schemaExists, nil
}
// Creates a schema for the given contract
func (r *eventRepository) newContractSchema(contractAddr string) error {
_, err := r.db.Exec("CREATE SCHEMA IF NOT EXISTS " + r.mode.String() + "_" + strings.ToLower(contractAddr))
return err
}
// Checks if a schema already exists for the given contract
func (r *eventRepository) checkForSchema(contractAddr string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s_%s')", r.mode.String(), strings.ToLower(contractAddr))
var exists bool
err := r.db.QueryRow(pgStr).Scan(&exists)
return exists, err
}
func (r *eventRepository) CheckSchemaCache(key string) (interface{}, bool) {
return r.schemas.Get(key)
}
func (r *eventRepository) CheckTableCache(key string) (interface{}, bool) {
return r.tables.Get(key)
}

View File

@ -0,0 +1,349 @@
// VulcanizeDB
// Copyright © 2018 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository_test
import (
"encoding/json"
"fmt"
"strings"
geth "github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
fc "github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
lc "github.com/vulcanize/vulcanizedb/pkg/omni/light/converter"
lr "github.com/vulcanize/vulcanizedb/pkg/omni/light/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers/mocks"
sr "github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
var _ = Describe("Repository", func() {
var db *postgres.DB
var dataStore sr.EventRepository
var err error
var log *types.Log
var logs []types.Log
var con *contract.Contract
var vulcanizeLogId int64
var wantedEvents = []string{"Transfer"}
var event types.Event
var headerID int64
var mockEvent = mocks.MockTranferEvent
var mockLog1 = mocks.MockTransferLog1
var mockLog2 = mocks.MockTransferLog2
BeforeEach(func() {
db, con = test_helpers.SetupTusdRepo(&vulcanizeLogId, wantedEvents, []string{})
mockEvent.LogID = vulcanizeLogId
event = con.Events["Transfer"]
err = con.GenerateFilters()
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
test_helpers.TearDown(db)
})
Describe("Full sync mode", func() {
BeforeEach(func() {
dataStore = sr.NewEventRepository(db, types.FullSync)
})
Describe("CreateContractSchema", func() {
It("Creates schema if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
_, ok := dataStore.CheckSchemaCache(con.Address)
Expect(ok).To(Equal(false))
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
v, ok := dataStore.CheckSchemaCache(con.Address)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
})
Describe("CreateEventTable", func() {
It("Creates table if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateEventTable(con.Address, event)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateEventTable(con.Address, event)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
tableID := fmt.Sprintf("%s_%s.%s_event", types.FullSync, strings.ToLower(con.Address), strings.ToLower(event.Name))
_, ok := dataStore.CheckTableCache(tableID)
Expect(ok).To(Equal(false))
created, err = dataStore.CreateEventTable(con.Address, event)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
v, ok := dataStore.CheckTableCache(tableID)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
})
Describe("PersistLogs", func() {
BeforeEach(func() {
c := fc.NewConverter(con)
log, err = c.Convert(mockEvent, event)
Expect(err).ToNot(HaveOccurred())
})
It("Persists contract event log values into custom tables, adding any addresses to a growing list of contract associated addresses", func() {
err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
b, ok := con.TknHolderAddrs["0x000000000000000000000000000000000000Af21"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = con.TknHolderAddrs["0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
scanLog := test_helpers.TransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog)
Expect(err).ToNot(HaveOccurred())
expectedLog := test_helpers.TransferLog{
Id: 1,
VulvanizeLogId: vulcanizeLogId,
TokenName: "TrueUSD",
Block: 5488076,
Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
From: "0x000000000000000000000000000000000000Af21",
To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391",
Value: "1097077688018008265106216665536940668749033598146",
}
Expect(scanLog).To(Equal(expectedLog))
})
It("Doesn't persist duplicate event logs", func() {
// Try to persist the same log twice in a single call
err = dataStore.PersistLogs([]types.Log{*log, *log}, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
scanLog := test_helpers.TransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.transfer_event", constants.TusdContractAddress)).StructScan(&scanLog)
Expect(err).ToNot(HaveOccurred())
expectedLog := test_helpers.TransferLog{
Id: 1,
VulvanizeLogId: vulcanizeLogId,
TokenName: "TrueUSD",
Block: 5488076,
Tx: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
From: "0x000000000000000000000000000000000000Af21",
To: "0x09BbBBE21a5975cAc061D82f7b843bCE061BA391",
Value: "1097077688018008265106216665536940668749033598146",
}
Expect(scanLog).To(Equal(expectedLog))
// Attempt to persist the same log again in seperate call
err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
// Show that no new logs were entered
var count int
err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM full_%s.transfer_event", constants.TusdContractAddress))
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(1))
})
It("Fails with empty log", func() {
err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())
})
})
})
Describe("Light sync mode", func() {
BeforeEach(func() {
dataStore = sr.NewEventRepository(db, types.LightSync)
})
Describe("CreateContractSchema", func() {
It("Creates schema if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
_, ok := dataStore.CheckSchemaCache(con.Address)
Expect(ok).To(Equal(false))
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
v, ok := dataStore.CheckSchemaCache(con.Address)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
tableID := fmt.Sprintf("%s_%s.%s_event", types.LightSync, strings.ToLower(con.Address), strings.ToLower(event.Name))
_, ok := dataStore.CheckTableCache(tableID)
Expect(ok).To(Equal(false))
created, err = dataStore.CreateEventTable(con.Address, event)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
v, ok := dataStore.CheckTableCache(tableID)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
})
Describe("CreateEventTable", func() {
It("Creates table if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateEventTable(con.Address, event)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateEventTable(con.Address, event)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
})
Describe("PersistLogs", func() {
BeforeEach(func() {
headerRepository := repositories.NewHeaderRepository(db)
headerID, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
Expect(err).ToNot(HaveOccurred())
c := lc.NewConverter(con)
logs, err = c.Convert([]geth.Log{mockLog1, mockLog2}, event, headerID)
Expect(err).ToNot(HaveOccurred())
})
It("Persists contract event log values into custom tables", func() {
hr := lr.NewHeaderRepository(db)
err = hr.AddCheckColumn(event.Name + "_" + con.Address)
Expect(err).ToNot(HaveOccurred())
err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
var count int
err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM light_%s.transfer_event", constants.TusdContractAddress))
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(2))
scanLog := test_helpers.LightTransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event LIMIT 1", constants.TusdContractAddress)).StructScan(&scanLog)
Expect(err).ToNot(HaveOccurred())
Expect(scanLog.HeaderID).To(Equal(headerID))
Expect(scanLog.TokenName).To(Equal("TrueUSD"))
Expect(scanLog.TxIndex).To(Equal(int64(110)))
Expect(scanLog.LogIndex).To(Equal(int64(1)))
Expect(scanLog.From).To(Equal("0x000000000000000000000000000000000000Af21"))
Expect(scanLog.To).To(Equal("0x09BbBBE21a5975cAc061D82f7b843bCE061BA391"))
Expect(scanLog.Value).To(Equal("1097077688018008265106216665536940668749033598146"))
var expectedRawLog, rawLog geth.Log
err = json.Unmarshal(logs[0].Raw, &expectedRawLog)
Expect(err).ToNot(HaveOccurred())
err = json.Unmarshal(scanLog.RawLog, &rawLog)
Expect(err).ToNot(HaveOccurred())
Expect(rawLog).To(Equal(expectedRawLog))
})
It("Doesn't persist duplicate event logs", func() {
hr := lr.NewHeaderRepository(db)
err = hr.AddCheckColumn(event.Name + "_" + con.Address)
Expect(err).ToNot(HaveOccurred())
// Try and fail to persist the same log twice in a single call
err = dataStore.PersistLogs([]types.Log{logs[0], logs[0]}, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())
// Successfuly persist the two unique logs
err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
// Try and fail to persist the same logs again in separate call
err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())
// Show that no new logs were entered
var count int
err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM light_%s.transfer_event", constants.TusdContractAddress))
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(2))
})
It("Fails if the persisted event does not have a corresponding eventID column in the checked_headers table", func() {
err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())
})
It("Fails with empty log", func() {
err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())
})
})
})
})

View File

@ -21,28 +21,41 @@ import (
"fmt"
"strings"
"github.com/hashicorp/golang-lru"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
const methodCacheSize = 1000
type MethodRepository interface {
PersistResult(method types.Result, contractAddr, contractName string) error
CreateMethodTable(contractAddr string, method types.Result) (bool, error)
CreateContractSchema(contractAddr string) (bool, error)
CheckSchemaCache(key string) (interface{}, bool)
CheckTableCache(key string) (interface{}, bool)
}
type methodRepository struct {
*postgres.DB
mode types.Mode
schemas *lru.Cache // Cache names of recently used schemas to minimize db connections
tables *lru.Cache // Cache names of recently used tables to minimize db connections
}
func NewMethodRepository(db *postgres.DB) *methodRepository {
func NewMethodRepository(db *postgres.DB, mode types.Mode) *methodRepository {
ccs, _ := lru.New(contractCacheSize)
mcs, _ := lru.New(methodCacheSize)
return &methodRepository{
DB: db,
mode: mode,
schemas: ccs,
tables: mcs,
}
}
func (d *methodRepository) PersistResult(method types.Result, contractAddr, contractName string) error {
func (r *methodRepository) PersistResult(method types.Result, contractAddr, contractName string) error {
if len(method.Args) != len(method.Inputs) {
return errors.New("error: given number of inputs does not match number of method arguments")
}
@ -50,51 +63,48 @@ func (d *methodRepository) PersistResult(method types.Result, contractAddr, cont
return errors.New("error: given number of outputs does not match number of method return values")
}
_, err := d.CreateContractSchema(contractAddr)
_, err := r.CreateContractSchema(contractAddr)
if err != nil {
return err
}
_, err = d.CreateMethodTable(contractAddr, method)
_, err = r.CreateMethodTable(contractAddr, method)
if err != nil {
return err
}
return d.persistResult(method, contractAddr, contractName)
return r.persistResult(method, contractAddr, contractName)
}
// Creates a custom postgres command to persist logs for the given event
func (d *methodRepository) persistResult(method types.Result, contractAddr, contractName string) error {
func (r *methodRepository) persistResult(method types.Result, contractAddr, contractName string) error {
// Begin postgres string
pgStr := fmt.Sprintf("INSERT INTO c%s.%s_method ", strings.ToLower(contractAddr), strings.ToLower(method.Name))
pgStr := fmt.Sprintf("INSERT INTO %s_%s.%s_method ", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(method.Name))
pgStr = pgStr + "(token_name, block"
ml := len(method.Args)
// Pack the corresponding variables in a slice
var data []interface{}
// Preallocate slice of needed size and proceed to pack variables into it in same order they appear in string
data := make([]interface{}, 0, 3+ml)
data = append(data,
contractName,
method.Block)
// Iterate over method args and return value, adding names
// to the string and pushing values to the slice
counter := 0 // Keep track of number of inputs
for i, arg := range method.Args {
counter += 1
pgStr = pgStr + fmt.Sprintf(", %s_", strings.ToLower(arg.Name)) // Add underscore after to avoid any collisions with reserved pg words
data = append(data, method.Inputs[i])
}
counter += 1
pgStr = pgStr + ", returned) VALUES ($1, $2"
data = append(data, method.Output)
// For each input entry we created we add its postgres command variable to the string
for i := 0; i < counter; i++ {
for i := 0; i <= ml; i++ {
pgStr = pgStr + fmt.Sprintf(", $%d", i+3)
}
pgStr = pgStr + ")"
_, err := d.DB.Exec(pgStr, data...)
_, err := r.DB.Exec(pgStr, data...)
if err != nil {
return err
}
@ -103,26 +113,35 @@ func (d *methodRepository) persistResult(method types.Result, contractAddr, cont
}
// Checks for event table and creates it if it does not already exist
func (d *methodRepository) CreateMethodTable(contractAddr string, method types.Result) (bool, error) {
tableExists, err := d.checkForTable(contractAddr, method.Name)
func (r *methodRepository) CreateMethodTable(contractAddr string, method types.Result) (bool, error) {
tableID := fmt.Sprintf("%s_%s.%s_method", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(method.Name))
// Check cache before querying pq to see if table exists
_, ok := r.tables.Get(tableID)
if ok {
return false, nil
}
tableExists, err := r.checkForTable(contractAddr, method.Name)
if err != nil {
return false, err
}
if !tableExists {
err = r.newMethodTable(tableID, method)
if err != nil {
return false, err
}
}
if !tableExists {
err = d.newMethodTable(contractAddr, method)
if err != nil {
return false, err
}
}
// Add schema name to cache
r.tables.Add(tableID, true)
return !tableExists, nil
}
// Creates a table for the given contract and event
func (d *methodRepository) newMethodTable(contractAddr string, method types.Result) error {
func (r *methodRepository) newMethodTable(tableID string, method types.Result) error {
// Begin pg string
pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS c%s.%s_method ", strings.ToLower(contractAddr), strings.ToLower(method.Name))
pgStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s ", tableID)
pgStr = pgStr + "(id SERIAL, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL,"
// Iterate over method inputs and outputs, using their name and pgType to grow the string
@ -132,54 +151,69 @@ func (d *methodRepository) newMethodTable(contractAddr string, method types.Resu
pgStr = pgStr + fmt.Sprintf(" returned %s NOT NULL)", method.Return[0].PgType)
_, err := d.DB.Exec(pgStr)
_, err := r.DB.Exec(pgStr)
return err
}
// Checks if a table already exists for the given contract and event
func (d *methodRepository) checkForTable(contractAddr string, methodName string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'c%s' AND table_name = '%s_method')", strings.ToLower(contractAddr), strings.ToLower(methodName))
func (r *methodRepository) checkForTable(contractAddr string, methodName string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '%s_%s' AND table_name = '%s_method')", r.mode.String(), strings.ToLower(contractAddr), strings.ToLower(methodName))
var exists bool
err := d.DB.Get(&exists, pgStr)
err := r.DB.Get(&exists, pgStr)
return exists, err
}
// Checks for contract schema and creates it if it does not already exist
func (d *methodRepository) CreateContractSchema(contractAddr string) (bool, error) {
func (r *methodRepository) CreateContractSchema(contractAddr string) (bool, error) {
if contractAddr == "" {
return false, errors.New("error: no contract address specified")
}
schemaExists, err := d.checkForSchema(contractAddr)
// Check cache before querying pq to see if schema exists
_, ok := r.schemas.Get(contractAddr)
if ok {
return false, nil
}
schemaExists, err := r.checkForSchema(contractAddr)
if err != nil {
return false, err
}
if !schemaExists {
err = r.newContractSchema(contractAddr)
if err != nil {
return false, err
}
}
if !schemaExists {
err = d.newContractSchema(contractAddr)
if err != nil {
return false, err
}
}
// Add schema name to cache
r.schemas.Add(contractAddr, true)
return !schemaExists, nil
}
// Creates a schema for the given contract
func (d *methodRepository) newContractSchema(contractAddr string) error {
_, err := d.DB.Exec("CREATE SCHEMA IF NOT EXISTS c" + strings.ToLower(contractAddr))
func (r *methodRepository) newContractSchema(contractAddr string) error {
_, err := r.DB.Exec("CREATE SCHEMA IF NOT EXISTS " + r.mode.String() + "_" + strings.ToLower(contractAddr))
return err
}
// Checks if a schema already exists for the given contract
func (d *methodRepository) checkForSchema(contractAddr string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'c%s')", strings.ToLower(contractAddr))
func (r *methodRepository) checkForSchema(contractAddr string) (bool, error) {
pgStr := fmt.Sprintf("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s_%s')", r.mode.String(), strings.ToLower(contractAddr))
var exists bool
err := d.DB.Get(&exists, pgStr)
err := r.DB.Get(&exists, pgStr)
return exists, err
}
func (r *methodRepository) CheckSchemaCache(key string) (interface{}, bool) {
return r.schemas.Get(key)
}
func (r *methodRepository) CheckTableCache(key string) (interface{}, bool) {
return r.tables.Get(key)
}

View File

@ -18,6 +18,7 @@ package repository_test
import (
"fmt"
"strings"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -36,10 +37,11 @@ var _ = Describe("Repository", func() {
var con *contract.Contract
var err error
var mockResult types.Result
var method types.Method
BeforeEach(func() {
con = test_helpers.SetupTusdContract([]string{}, []string{"balanceOf"})
method := con.Methods["balanceOf"]
method = con.Methods["balanceOf"]
mockResult = types.Result{
Method: method,
PgType: method.Return[0].PgType,
@ -50,13 +52,18 @@ var _ = Describe("Repository", func() {
mockResult.Inputs[0] = "0xfE9e8709d3215310075d67E3ed32A380CCf451C8"
mockResult.Output = "66386309548896882859581786"
db, _ = test_helpers.SetupDBandBC()
dataStore = repository.NewMethodRepository(db)
dataStore = repository.NewMethodRepository(db, types.FullSync)
})
AfterEach(func() {
test_helpers.TearDown(db)
})
Describe("Full Sync Mode", func() {
BeforeEach(func() {
dataStore = repository.NewMethodRepository(db, types.FullSync)
})
Describe("CreateContractSchema", func() {
It("Creates schema if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
@ -67,6 +74,19 @@ var _ = Describe("Repository", func() {
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
_, ok := dataStore.CheckSchemaCache(con.Address)
Expect(ok).To(Equal(false))
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
v, ok := dataStore.CheckSchemaCache(con.Address)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
})
Describe("CreateMethodTable", func() {
@ -83,6 +103,24 @@ var _ = Describe("Repository", func() {
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
tableID := fmt.Sprintf("%s_%s.%s_method", types.FullSync, strings.ToLower(con.Address), strings.ToLower(method.Name))
_, ok := dataStore.CheckTableCache(tableID)
Expect(ok).To(Equal(false))
created, err = dataStore.CreateMethodTable(con.Address, mockResult)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
v, ok := dataStore.CheckTableCache(tableID)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
})
Describe("PersistResult", func() {
@ -92,7 +130,7 @@ var _ = Describe("Repository", func() {
scanStruct := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM c%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM full_%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct)
expectedLog := test_helpers.BalanceOf{
Id: 1,
TokenName: "TrueUSD",
@ -108,4 +146,94 @@ var _ = Describe("Repository", func() {
Expect(err).To(HaveOccurred())
})
})
})
Describe("Light Sync Mode", func() {
BeforeEach(func() {
dataStore = repository.NewMethodRepository(db, types.LightSync)
})
Describe("CreateContractSchema", func() {
It("Creates schema if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateContractSchema(constants.TusdContractAddress)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
It("Caches schema it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
_, ok := dataStore.CheckSchemaCache(con.Address)
Expect(ok).To(Equal(false))
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
v, ok := dataStore.CheckSchemaCache(con.Address)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
})
Describe("CreateMethodTable", func() {
It("Creates table if it doesn't exist", func() {
created, err := dataStore.CreateContractSchema(constants.TusdContractAddress)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
created, err = dataStore.CreateMethodTable(constants.TusdContractAddress, mockResult)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(false))
})
It("Caches table it creates so that it does not need to repeatedly query the database to check for it's existence", func() {
created, err := dataStore.CreateContractSchema(con.Address)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
tableID := fmt.Sprintf("%s_%s.%s_method", types.LightSync, strings.ToLower(con.Address), strings.ToLower(method.Name))
_, ok := dataStore.CheckTableCache(tableID)
Expect(ok).To(Equal(false))
created, err = dataStore.CreateMethodTable(con.Address, mockResult)
Expect(err).ToNot(HaveOccurred())
Expect(created).To(Equal(true))
v, ok := dataStore.CheckTableCache(tableID)
Expect(ok).To(Equal(true))
Expect(v).To(Equal(true))
})
})
Describe("PersistResult", func() {
It("Persists result from method polling in custom pg table for light sync mode vDB", func() {
err = dataStore.PersistResult(mockResult, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
scanStruct := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method", constants.TusdContractAddress)).StructScan(&scanStruct)
expectedLog := test_helpers.BalanceOf{
Id: 1,
TokenName: "TrueUSD",
Block: 6707323,
Address: "0xfE9e8709d3215310075d67E3ed32A380CCf451C8",
Balance: "66386309548896882859581786",
}
Expect(scanStruct).To(Equal(expectedLog))
})
It("Fails with empty result", func() {
err = dataStore.PersistResult(types.Result{}, con.Address, con.Name)
Expect(err).To(HaveOccurred())
})
})
})
})

View File

@ -18,6 +18,7 @@ package retriever
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
"strings"
"github.com/ethereum/go-ethereum/accounts/abi"
@ -28,19 +29,19 @@ import (
)
// Address retriever is used to retrieve the addresses associated with a contract
// It requires a vDB synced database with blocks, transactions, receipts, logs,
// AND all of the targeted events persisted
type AddressRetriever interface {
RetrieveTokenHolderAddresses(info contract.Contract) (map[common.Address]bool, error)
}
type addressRetriever struct {
db *postgres.DB
mode types.Mode
}
func NewAddressRetriever(db *postgres.DB) (r *addressRetriever) {
func NewAddressRetriever(db *postgres.DB, mode types.Mode) (r *addressRetriever) {
return &addressRetriever{
db: db,
mode: mode,
}
}
@ -84,7 +85,7 @@ func (r *addressRetriever) retrieveTransferAddresses(con contract.Contract) ([]s
if field.Type.T == abi.AddressTy { // If they have address type, retrieve those addresses
addrs := make([]string, 0)
pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s_event", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name))
pgStr := fmt.Sprintf("SELECT %s_ FROM %s_%s.%s_event", strings.ToLower(field.Name), r.mode.String(), strings.ToLower(con.Address), strings.ToLower(event.Name))
err := r.db.Select(&addrs, pgStr)
if err != nil {
return []string{}, err
@ -105,7 +106,7 @@ func (r *addressRetriever) retrieveTokenMintees(con contract.Contract) ([]string
if field.Type.T == abi.AddressTy { // If they have address type, retrieve those addresses
addrs := make([]string, 0)
pgStr := fmt.Sprintf("SELECT %s_ FROM c%s.%s_event", strings.ToLower(field.Name), strings.ToLower(con.Address), strings.ToLower(event.Name))
pgStr := fmt.Sprintf("SELECT %s_ FROM %s_%s.%s_event", strings.ToLower(field.Name), r.mode.String(), strings.ToLower(con.Address), strings.ToLower(event.Name))
err := r.db.Select(&addrs, pgStr)
if err != nil {
return []string{}, err

View File

@ -20,16 +20,16 @@ import (
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/converter"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/full/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/helpers/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/retriever"
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
)
var mockEvent = core.WatchedEvent{
@ -68,11 +68,11 @@ var _ = Describe("Address Retriever Test", func() {
log, err = c.Convert(mockEvent, event)
Expect(err).ToNot(HaveOccurred())
dataStore = repository.NewEventRepository(db)
dataStore.PersistLog(*log, info.Address, info.Name)
dataStore = repository.NewEventRepository(db, types.FullSync)
dataStore.PersistLogs([]types.Log{*log}, event, info.Address, info.Name)
Expect(err).ToNot(HaveOccurred())
r = retriever.NewAddressRetriever(db)
r = retriever.NewAddressRetriever(db, types.FullSync)
})
AfterEach(func() {

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository_test
package retriever_test
import (
"io/ioutil"
@ -25,9 +25,9 @@ import (
. "github.com/onsi/gomega"
)
func TestRepository(t *testing.T) {
func TestRetriever(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Full Repository Suite Test")
RunSpecs(t, "Address Retriever Suite Test")
}
var _ = BeforeSuite(func() {

View File

@ -21,7 +21,6 @@ import (
"strings"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/i-norden/go-ethereum/core/types"
)
type Event struct {
@ -37,7 +36,6 @@ type Field struct {
// Struct to hold instance of an event log data
type Log struct {
Event
Id int64 // VulcanizeIdLog for full sync and header ID for light sync omni watcher
Values map[string]string // Map of event input names to their values
@ -48,7 +46,7 @@ type Log struct {
// Used for lightSync only
LogIndex uint
TransactionIndex uint
Raw types.Log
Raw []byte // json.Unmarshalled byte array of geth/core/types.Log{}
}
// Unpack abi.Event into our custom Event struct

View File

@ -0,0 +1,64 @@
// VulcanizeDB
// Copyright © 2018 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package types
import "fmt"
type Mode int
const (
LightSync Mode = iota
FullSync
)
func (mode Mode) IsValid() bool {
return mode >= LightSync && mode <= FullSync
}
func (mode Mode) String() string {
switch mode {
case LightSync:
return "light"
case FullSync:
return "full"
default:
return "unknown"
}
}
func (mode Mode) MarshalText() ([]byte, error) {
switch mode {
case LightSync:
return []byte("light"), nil
case FullSync:
return []byte("full"), nil
default:
return nil, fmt.Errorf("omni watcher: unknown mode %d, want LightSync or FullSync", mode)
}
}
func (mode *Mode) UnmarshalText(text []byte) error {
switch string(text) {
case "light":
*mode = LightSync
case "full":
*mode = FullSync
default:
return fmt.Errorf(`omni watcher: unknown mode %q, want "light" or "full"`, text)
}
return nil
}