Increase logging in contract watcher

- Focus on header mode
- Add context to errors, trace guard clauses, warn on non-returned
  errors
- Give errors distinct names so compiler will recognize if unchecked
- Remove redundant type declarations/fix typos
This commit is contained in:
Rob Mulholand 2019-09-23 22:39:04 -05:00
parent b4e16c4af5
commit 4505382590
16 changed files with 168 additions and 126 deletions

View File

@ -19,6 +19,8 @@ package transformer
import ( import (
"errors" "errors"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/converter" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/converter"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/retriever" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/retriever"
@ -106,7 +108,11 @@ func (tr *Transformer) Init() error {
// Get contract name if it has one // Get contract name if it has one
var name = new(string) var name = new(string)
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock) pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock)
if pollingErr != nil {
// can't return this error because "name" might not exist on the contract
logrus.Warnf("error fetching contract data: %s", pollingErr.Error())
}
// Remove any potential accidental duplicate inputs in arg filter values // Remove any potential accidental duplicate inputs in arg filter values
eventArgs := map[string]bool{} eventArgs := map[string]bool{}

View File

@ -132,7 +132,7 @@ func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID in
// Convert the given watched event logs into types.Logs; returns a map of event names to a slice of their converted logs // Convert the given watched event logs into types.Logs; returns a map of event names to a slice of their converted logs
func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) { func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil) boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
eventsToLogs := make(map[string][]types.Log) eventsToLogs := make(map[string][]types.Log)
for _, event := range events { for _, event := range events {
eventsToLogs[event.Name] = make([]types.Log, 0, len(logs)) eventsToLogs[event.Name] = make([]types.Log, 0, len(logs))
@ -141,7 +141,7 @@ func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
// If the log is of this event type, process it as such // If the log is of this event type, process it as such
if event.Sig() == log.Topics[0] { if event.Sig() == log.Topics[0] {
values := make(map[string]interface{}) values := make(map[string]interface{})
err := contract.UnpackLogIntoMap(values, event.Name, log) err := boundContract.UnpackLogIntoMap(values, event.Name, log)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
@ -148,7 +149,10 @@ func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids [
pgStr = pgStr[:len(pgStr)-2] pgStr = pgStr[:len(pgStr)-2]
_, err = tx.Exec(pgStr, header.Id) _, err = tx.Exec(pgStr, header.Id)
if err != nil { if err != nil {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
logrus.Warnf("error rolling back transaction: %s", rollbackErr.Error())
}
return err return err
} }
} }
@ -246,6 +250,7 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock
// Returns a continuous set of headers // Returns a continuous set of headers
func continuousHeaders(headers []core.Header) []core.Header { func continuousHeaders(headers []core.Header) []core.Header {
if len(headers) < 1 { if len(headers) < 1 {
logrus.Trace("no headers to arrange continuously")
return headers return headers
} }
previousHeader := headers[0].BlockNumber previousHeader := headers[0].BlockNumber

View File

@ -137,9 +137,9 @@ var _ = Describe("Repository", func() {
h1 := missingHeaders[0] h1 := missingHeaders[0]
h2 := missingHeaders[1] h2 := missingHeaders[1]
h3 := missingHeaders[2] h3 := missingHeaders[2]
Expect(h1.BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber))) Expect(h1.BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(h2.BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber))) Expect(h2.BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
Expect(h3.BlockNumber).To(Equal(int64(mocks.MockHeader3.BlockNumber))) Expect(h3.BlockNumber).To(Equal(mocks.MockHeader3.BlockNumber))
}) })
It("Returns only contiguous chunks of headers", func() { It("Returns only contiguous chunks of headers", func() {
@ -150,8 +150,8 @@ var _ = Describe("Repository", func() {
missingHeaders, err := contractHeaderRepo.MissingHeaders(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs[0]) missingHeaders, err := contractHeaderRepo.MissingHeaders(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs[0])
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2)) Expect(len(missingHeaders)).To(Equal(2))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber))) Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber))) Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
}) })
It("Fails if eventID does not yet exist in check_headers table", func() { It("Fails if eventID does not yet exist in check_headers table", func() {
@ -199,8 +199,8 @@ var _ = Describe("Repository", func() {
missingHeaders, err := contractHeaderRepo.MissingHeadersForAll(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs) missingHeaders, err := contractHeaderRepo.MissingHeadersForAll(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2)) Expect(len(missingHeaders)).To(Equal(2))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber))) Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber))) Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
}) })
It("returns headers after starting header if starting header not missing", func() { It("returns headers after starting header if starting header not missing", func() {

View File

@ -44,9 +44,12 @@ var _ = Describe("Block Retriever", func() {
Describe("RetrieveFirstBlock", func() { Describe("RetrieveFirstBlock", func() {
It("Retrieves block number of earliest header in the database", func() { It("Retrieves block number of earliest header in the database", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) _, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) _, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
Expect(err).ToNot(HaveOccurred())
i, err := r.RetrieveFirstBlock() i, err := r.RetrieveFirstBlock()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -61,9 +64,12 @@ var _ = Describe("Block Retriever", func() {
Describe("RetrieveMostRecentBlock", func() { Describe("RetrieveMostRecentBlock", func() {
It("Retrieves the latest header's block number", func() { It("Retrieves the latest header's block number", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1) _, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2) Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3) _, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
Expect(err).ToNot(HaveOccurred())
i, err := r.RetrieveMostRecentBlock() i, err := r.RetrieveMostRecentBlock()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -18,10 +18,12 @@ package transformer
import ( import (
"errors" "errors"
"fmt"
"strings" "strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types" gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/converter" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/converter"
@ -107,22 +109,22 @@ func (tr *Transformer) Init() error {
// Configure Abi // Configure Abi
if tr.Config.Abis[contractAddr] == "" { if tr.Config.Abis[contractAddr] == "" {
// If no abi is given in the config, this method will try fetching from internal look-up table and etherscan // If no abi is given in the config, this method will try fetching from internal look-up table and etherscan
err := tr.Parser.Parse(contractAddr) parseErr := tr.Parser.Parse(contractAddr)
if err != nil { if parseErr != nil {
return err return fmt.Errorf("error parsing contract by address: %s", parseErr.Error())
} }
} else { } else {
// If we have an abi from the config, load that into the parser // If we have an abi from the config, load that into the parser
err := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr]) parseErr := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr])
if err != nil { if parseErr != nil {
return err return fmt.Errorf("error parsing contract abi: %s", parseErr.Error())
} }
} }
// Get first block and most recent block number in the header repo // Get first block and most recent block number in the header repo
firstBlock, err := tr.Retriever.RetrieveFirstBlock() firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock()
if err != nil { if retrieveErr != nil {
return err return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error())
} }
// Set to specified range if it falls within the bounds // Set to specified range if it falls within the bounds
@ -132,7 +134,11 @@ func (tr *Transformer) Init() error {
// Get contract name if it has one // Get contract name if it has one
var name = new(string) var name = new(string)
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1) pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1)
if pollingErr != nil {
// can't return this error because "name" might not exist on the contract
logrus.Warnf("error fetching contract data: %s", pollingErr.Error())
}
// Remove any potential accidental duplicate inputs // Remove any potential accidental duplicate inputs
eventArgs := map[string]bool{} eventArgs := map[string]bool{}
@ -165,9 +171,9 @@ func (tr *Transformer) Init() error {
tr.sortedEventIds[con.Address] = make([]string, 0, len(con.Events)) tr.sortedEventIds[con.Address] = make([]string, 0, len(con.Events))
for _, event := range con.Events { for _, event := range con.Events {
eventId := strings.ToLower(event.Name + "_" + con.Address) eventId := strings.ToLower(event.Name + "_" + con.Address)
err := tr.HeaderRepository.AddCheckColumn(eventId) addColumnErr := tr.HeaderRepository.AddCheckColumn(eventId)
if err != nil { if addColumnErr != nil {
return err return fmt.Errorf("error adding check column: %s", addColumnErr.Error())
} }
// Keep track of this event id; sorted and unsorted // Keep track of this event id; sorted and unsorted
tr.sortedEventIds[con.Address] = append(tr.sortedEventIds[con.Address], eventId) tr.sortedEventIds[con.Address] = append(tr.sortedEventIds[con.Address], eventId)
@ -180,9 +186,9 @@ func (tr *Transformer) Init() error {
tr.sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods)) tr.sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods))
for _, m := range con.Methods { for _, m := range con.Methods {
methodId := strings.ToLower(m.Name + "_" + con.Address) methodId := strings.ToLower(m.Name + "_" + con.Address)
err := tr.HeaderRepository.AddCheckColumn(methodId) addColumnErr := tr.HeaderRepository.AddCheckColumn(methodId)
if err != nil { if addColumnErr != nil {
return err return fmt.Errorf("error adding check column: %s", addColumnErr.Error())
} }
tr.sortedMethodIds[con.Address] = append(tr.sortedMethodIds[con.Address], methodId) tr.sortedMethodIds[con.Address] = append(tr.sortedMethodIds[con.Address], methodId)
} }
@ -202,9 +208,9 @@ func (tr *Transformer) Execute() error {
} }
// Find unchecked headers for all events across all contracts; these are returned in asc order // Find unchecked headers for all events across all contracts; these are returned in asc order
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds) missingHeaders, missingHeadersErr := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
if err != nil { if missingHeadersErr != nil {
return err return fmt.Errorf("error getting missing headers: %s", missingHeadersErr.Error())
} }
// Iterate over headers // Iterate over headers
@ -216,23 +222,24 @@ func (tr *Transformer) Execute() error {
// Map to sort batch fetched logs by which contract they belong to, for post fetch processing // Map to sort batch fetched logs by which contract they belong to, for post fetch processing
sortedLogs := make(map[string][]gethTypes.Log) sortedLogs := make(map[string][]gethTypes.Log)
// And fetch all event logs across contracts at this header // And fetch all event logs across contracts at this header
allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header) allLogs, fetchErr := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
if err != nil { if fetchErr != nil {
return err return fmt.Errorf("error fetching logs: %s", fetchErr.Error())
} }
// If no logs are found mark the header checked for all of these eventIDs // If no logs are found mark the header checked for all of these eventIDs
// and continue to method polling and onto the next iteration // and continue to method polling and onto the next iteration
if len(allLogs) < 1 { if len(allLogs) < 1 {
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds) markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
if err != nil { if markCheckedErr != nil {
return err return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
} }
err = tr.methodPolling(header, tr.sortedMethodIds) pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
if err != nil { if pollingErr != nil {
return err return fmt.Errorf("error polling methods: %s", pollingErr.Error())
} }
tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header
logrus.Tracef("no logs found for block %d, continuing", header.BlockNumber)
continue continue
} }
@ -245,6 +252,7 @@ func (tr *Transformer) Execute() error {
// Process logs for each contract // Process logs for each contract
for conAddr, logs := range sortedLogs { for conAddr, logs := range sortedLogs {
if logs == nil { if logs == nil {
logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber)
continue continue
} }
// Configure converter with this contract // Configure converter with this contract
@ -252,34 +260,35 @@ func (tr *Transformer) Execute() error {
tr.Converter.Update(con) tr.Converter.Update(con)
// Convert logs into batches of log mappings (eventName => []types.Logs // Convert logs into batches of log mappings (eventName => []types.Logs
convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id) convertedLogs, convertErr := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
if err != nil { if convertErr != nil {
return err return fmt.Errorf("error converting logs: %s", convertErr.Error())
} }
// Cycle through each type of event log and persist them // Cycle through each type of event log and persist them
for eventName, logs := range convertedLogs { for eventName, logs := range convertedLogs {
// If logs for this event are empty, mark them checked at this header and continue // If logs for this event are empty, mark them checked at this header and continue
if len(logs) < 1 { if len(logs) < 1 {
eventId := strings.ToLower(eventName + "_" + con.Address) eventId := strings.ToLower(eventName + "_" + con.Address)
err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId) markCheckedErr := tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
if err != nil { if markCheckedErr != nil {
return err return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
} }
logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber)
continue continue
} }
// If logs aren't empty, persist them // If logs aren't empty, persist them
// Header is marked checked in the transactions // Header is marked checked in the transactions
err = tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name) persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
if err != nil { if persistErr != nil {
return err return fmt.Errorf("error persisting logs: %s", persistErr.Error())
} }
} }
} }
// Poll contracts at this block height // Poll contracts at this block height
err = tr.methodPolling(header, tr.sortedMethodIds) pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
if err != nil { if pollingErr != nil {
return err return fmt.Errorf("error polling methods: %s", pollingErr.Error())
} }
// Success; setup to start at the next header // Success; setup to start at the next header
tr.Start = header.BlockNumber + 1 tr.Start = header.BlockNumber + 1
@ -294,19 +303,20 @@ func (tr *Transformer) methodPolling(header core.Header, sortedMethodIds map[str
// Skip method polling processes if no methods are specified // Skip method polling processes if no methods are specified
// Also don't try to poll methods below this contract's specified starting block // Also don't try to poll methods below this contract's specified starting block
if len(con.Methods) == 0 || header.BlockNumber < con.StartingBlock { if len(con.Methods) == 0 || header.BlockNumber < con.StartingBlock {
logrus.Tracef("not polling contract: %s", con.Address)
continue continue
} }
// Poll all methods for this contract at this header // Poll all methods for this contract at this header
err := tr.Poller.PollContractAt(*con, header.BlockNumber) pollingErr := tr.Poller.PollContractAt(*con, header.BlockNumber)
if err != nil { if pollingErr != nil {
return err return fmt.Errorf("error polling contract %s: %s", con.Address, pollingErr.Error())
} }
// Mark this header checked for the methods // Mark this header checked for the methods
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address]) markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address])
if err != nil { if markCheckedErr != nil {
return err return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
} }
} }

View File

@ -68,7 +68,7 @@ var _ = Describe("Transformer", func() {
err := t.Init() err := t.Init()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error()))
}) })
}) })
@ -109,7 +109,7 @@ var _ = Describe("Transformer", func() {
err := t.Init() err := t.Init()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error()))
}) })
}) })
}) })

View File

@ -24,7 +24,7 @@ import (
var SupportsInterfaceABI = `[{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}]` var SupportsInterfaceABI = `[{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}]`
// Individual event interfaces for constructing ABI from // Individual event interfaces for constructing ABI from
var SupportsInterace = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}` var SupportsInterface = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}`
var AddrChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"a","type":"address"}],"name":"AddrChanged","type":"event"}` var AddrChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"a","type":"address"}],"name":"AddrChanged","type":"event"}`
var ContentChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"hash","type":"bytes32"}],"name":"ContentChanged","type":"event"}` var ContentChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"hash","type":"bytes32"}],"name":"ContentChanged","type":"event"}`
var NameChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"name","type":"string"}],"name":"NameChanged","type":"event"}` var NameChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"name","type":"string"}],"name":"NameChanged","type":"event"}`

View File

@ -47,7 +47,7 @@ func newFetcherError(err error, fetchMethod string) *fetcherError {
// Fetcher struct // Fetcher struct
type Fetcher struct { type Fetcher struct {
BlockChain core.BlockChain // Underyling Blockchain BlockChain core.BlockChain // Underlying Blockchain
} }
// Fetcher error // Fetcher error

View File

@ -115,9 +115,9 @@ func SetupDBandBC() (*postgres.DB, core.BlockChain) {
rpcClient := client.NewRpcClient(rawRpcClient, infuraIPC) rpcClient := client.NewRpcClient(rawRpcClient, infuraIPC)
ethClient := ethclient.NewClient(rawRpcClient) ethClient := ethclient.NewClient(rawRpcClient)
blockChainClient := client.NewEthClient(ethClient) blockChainClient := client.NewEthClient(ethClient)
node := node.MakeNode(rpcClient) madeNode := node.MakeNode(rpcClient)
transactionConverter := rpc2.NewRpcTransactionConverter(ethClient) transactionConverter := rpc2.NewRpcTransactionConverter(ethClient)
blockChain := geth.NewBlockChain(blockChainClient, rpcClient, node, transactionConverter) blockChain := geth.NewBlockChain(blockChainClient, rpcClient, madeNode, transactionConverter)
db, err := postgres.NewDB(config.Database{ db, err := postgres.NewDB(config.Database{
Hostname: "localhost", Hostname: "localhost",

View File

@ -324,7 +324,7 @@ var MockConfig = config.ContractConfig{
"0x1234567890abcdef": "fake_abi", "0x1234567890abcdef": "fake_abi",
}, },
Events: map[string][]string{ Events: map[string][]string{
"0x1234567890abcdef": []string{"Transfer"}, "0x1234567890abcdef": {"Transfer"},
}, },
Methods: map[string][]string{ Methods: map[string][]string{
"0x1234567890abcdef": nil, "0x1234567890abcdef": nil,

View File

@ -35,7 +35,7 @@ var TusdConfig = config.ContractConfig{
tusd: "", tusd: "",
}, },
Events: map[string][]string{ Events: map[string][]string{
tusd: []string{"Transfer"}, tusd: {"Transfer"},
}, },
Methods: map[string][]string{ Methods: map[string][]string{
tusd: nil, tusd: nil,
@ -60,7 +60,7 @@ var ENSConfig = config.ContractConfig{
ens: "", ens: "",
}, },
Events: map[string][]string{ Events: map[string][]string{
ens: []string{"NewOwner"}, ens: {"NewOwner"},
}, },
Methods: map[string][]string{ Methods: map[string][]string{
ens: nil, ens: nil,
@ -87,8 +87,8 @@ var ENSandTusdConfig = config.ContractConfig{
tusd: "", tusd: "",
}, },
Events: map[string][]string{ Events: map[string][]string{
ens: []string{"NewOwner"}, ens: {"NewOwner"},
tusd: []string{"Transfer"}, tusd: {"Transfer"},
}, },
Methods: map[string][]string{ Methods: map[string][]string{
ens: nil, ens: nil,

View File

@ -22,6 +22,7 @@ import (
"strings" "strings"
"github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/repository" "github.com/vulcanize/vulcanizedb/libraries/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types"
@ -31,7 +32,7 @@ import (
const ( const (
// Number of contract address and method ids to keep in cache // Number of contract address and method ids to keep in cache
contractCacheSize = 100 contractCacheSize = 100
eventChacheSize = 1000 eventCacheSize = 1000
) )
// Event repository is used to persist event data into custom tables // Event repository is used to persist event data into custom tables
@ -52,7 +53,7 @@ type eventRepository struct {
func NewEventRepository(db *postgres.DB, mode types.Mode) *eventRepository { func NewEventRepository(db *postgres.DB, mode types.Mode) *eventRepository {
ccs, _ := lru.New(contractCacheSize) ccs, _ := lru.New(contractCacheSize)
ecs, _ := lru.New(eventChacheSize) ecs, _ := lru.New(eventCacheSize)
return &eventRepository{ return &eventRepository{
db: db, db: db,
mode: mode, mode: mode,
@ -68,14 +69,14 @@ func (r *eventRepository) PersistLogs(logs []types.Log, eventInfo types.Event, c
if len(logs) == 0 { if len(logs) == 0 {
return errors.New("event repository error: passed empty logs slice") return errors.New("event repository error: passed empty logs slice")
} }
_, err := r.CreateContractSchema(contractAddr) _, schemaErr := r.CreateContractSchema(contractAddr)
if err != nil { if schemaErr != nil {
return err return fmt.Errorf("error creating schema for contract %s: %s", contractAddr, schemaErr.Error())
} }
_, err = r.CreateEventTable(contractAddr, eventInfo) _, tableErr := r.CreateEventTable(contractAddr, eventInfo)
if err != nil { if tableErr != nil {
return err return fmt.Errorf("error creating table for event %s on contract %s: %s", eventInfo.Name, contractAddr, tableErr.Error())
} }
return r.persistLogs(logs, eventInfo, contractAddr, contractName) return r.persistLogs(logs, eventInfo, contractAddr, contractName)
@ -97,9 +98,9 @@ func (r *eventRepository) persistLogs(logs []types.Log, eventInfo types.Event, c
// Creates a custom postgres command to persist logs for the given event (compatible with header synced vDB) // Creates a custom postgres command to persist logs for the given event (compatible with header synced vDB)
func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
tx, err := r.db.Beginx() tx, txErr := r.db.Beginx()
if err != nil { if txErr != nil {
return err return fmt.Errorf("error beginning db transaction: %s", txErr.Error())
} }
for _, event := range logs { for _, event := range logs {
@ -130,20 +131,27 @@ func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo type
} }
pgStr = pgStr + ") ON CONFLICT DO NOTHING" pgStr = pgStr + ") ON CONFLICT DO NOTHING"
logrus.Tracef("query for inserting log: %s", pgStr)
// Add this query to the transaction // Add this query to the transaction
_, err = tx.Exec(pgStr, data...) _, execErr := tx.Exec(pgStr, data...)
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
logrus.Warnf("error rolling back transactions while persisting logs: %s", rollbackErr.Error())
}
return fmt.Errorf("error executing query: %s", execErr.Error())
} }
} }
// Mark header as checked for this eventId // Mark header as checked for this eventId
eventId := strings.ToLower(eventInfo.Name + "_" + contractAddr) eventId := strings.ToLower(eventInfo.Name + "_" + contractAddr)
err = repository.MarkContractWatcherHeaderCheckedInTransaction(logs[0].Id, tx, eventId) // This assumes all logs are from same block markCheckedErr := repository.MarkContractWatcherHeaderCheckedInTransaction(logs[0].Id, tx, eventId) // This assumes all logs are from same block
if err != nil { if markCheckedErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
logrus.Warnf("error rolling back transaction while marking header checked: %s", rollbackErr.Error())
}
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
} }
return tx.Commit() return tx.Commit()
@ -151,9 +159,9 @@ func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo type
// Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB) // Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB)
func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error {
tx, err := r.db.Beginx() tx, txErr := r.db.Beginx()
if err != nil { if txErr != nil {
return err return fmt.Errorf("error beginning db transaction: %s", txErr.Error())
} }
for _, event := range logs { for _, event := range logs {
@ -179,10 +187,14 @@ func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.
} }
pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING" pgStr = pgStr + ") ON CONFLICT (vulcanize_log_id) DO NOTHING"
_, err = tx.Exec(pgStr, data...) logrus.Tracef("query for inserting log: %s", pgStr)
if err != nil { _, execErr := tx.Exec(pgStr, data...)
tx.Rollback() if execErr != nil {
return err rollbackErr := tx.Rollback()
if rollbackErr != nil {
logrus.Warnf("error rolling back transactions while persisting logs: %s", rollbackErr.Error())
}
return fmt.Errorf("error executing query: %s", execErr.Error())
} }
} }
@ -198,15 +210,15 @@ func (r *eventRepository) CreateEventTable(contractAddr string, event types.Even
if ok { if ok {
return false, nil return false, nil
} }
tableExists, err := r.checkForTable(contractAddr, event.Name) tableExists, checkTableErr := r.checkForTable(contractAddr, event.Name)
if err != nil { if checkTableErr != nil {
return false, err return false, fmt.Errorf("error checking for table: %s", checkTableErr)
} }
if !tableExists { if !tableExists {
err = r.newEventTable(tableID, event) createTableErr := r.newEventTable(tableID, event)
if err != nil { if createTableErr != nil {
return false, err return false, fmt.Errorf("error creating table: %s", createTableErr.Error())
} }
} }
@ -270,14 +282,14 @@ func (r *eventRepository) CreateContractSchema(contractAddr string) (bool, error
if ok { if ok {
return false, nil return false, nil
} }
schemaExists, err := r.checkForSchema(contractAddr) schemaExists, checkSchemaErr := r.checkForSchema(contractAddr)
if err != nil { if checkSchemaErr != nil {
return false, err return false, fmt.Errorf("error checking for schema: %s", checkSchemaErr.Error())
} }
if !schemaExists { if !schemaExists {
err = r.newContractSchema(contractAddr) createSchemaErr := r.newContractSchema(contractAddr)
if err != nil { if createSchemaErr != nil {
return false, err return false, fmt.Errorf("error creating schema: %s", createSchemaErr.Error())
} }
} }

View File

@ -190,7 +190,7 @@ var _ = Describe("Repository", func() {
} }
Expect(scanLog).To(Equal(expectedLog)) Expect(scanLog).To(Equal(expectedLog))
// Attempt to persist the same log again in seperate call // Attempt to persist the same log again in separate call
err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name) err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -22,6 +22,7 @@ import (
"strings" "strings"
"github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
@ -112,7 +113,10 @@ func (r *methodRepository) persistResults(results []types.Result, methodInfo typ
// Add this query to the transaction // Add this query to the transaction
_, err = tx.Exec(pgStr, data...) _, err = tx.Exec(pgStr, data...)
if err != nil { if err != nil {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
logrus.Warnf("error rolling back transaction: %s", rollbackErr.Error())
}
return err return err
} }
} }

View File

@ -48,12 +48,10 @@ var mockEvent = core.WatchedEvent{
var _ = Describe("Address Retriever Test", func() { var _ = Describe("Address Retriever Test", func() {
var db *postgres.DB var db *postgres.DB
var dataStore repository.EventRepository var dataStore repository.EventRepository
var err error
var info *contract.Contract var info *contract.Contract
var vulcanizeLogId int64 var vulcanizeLogId int64
var log *types.Log var log *types.Log
var r retriever.AddressRetriever var r retriever.AddressRetriever
var addresses map[common.Address]bool
var wantedEvents = []string{"Transfer"} var wantedEvents = []string{"Transfer"}
BeforeEach(func() { BeforeEach(func() {
@ -61,17 +59,18 @@ var _ = Describe("Address Retriever Test", func() {
mockEvent.LogID = vulcanizeLogId mockEvent.LogID = vulcanizeLogId
event := info.Events["Transfer"] event := info.Events["Transfer"]
err = info.GenerateFilters() filterErr := info.GenerateFilters()
Expect(err).ToNot(HaveOccurred()) Expect(filterErr).ToNot(HaveOccurred())
c := converter.Converter{} c := converter.Converter{}
c.Update(info) c.Update(info)
log, err = c.Convert(mockEvent, event) var convertErr error
Expect(err).ToNot(HaveOccurred()) log, convertErr = c.Convert(mockEvent, event)
Expect(convertErr).ToNot(HaveOccurred())
dataStore = repository.NewEventRepository(db, types.FullSync) dataStore = repository.NewEventRepository(db, types.FullSync)
dataStore.PersistLogs([]types.Log{*log}, event, info.Address, info.Name) persistErr := dataStore.PersistLogs([]types.Log{*log}, event, info.Address, info.Name)
Expect(err).ToNot(HaveOccurred()) Expect(persistErr).ToNot(HaveOccurred())
r = retriever.NewAddressRetriever(db, types.FullSync) r = retriever.NewAddressRetriever(db, types.FullSync)
}) })
@ -82,8 +81,8 @@ var _ = Describe("Address Retriever Test", func() {
Describe("RetrieveTokenHolderAddresses", func() { Describe("RetrieveTokenHolderAddresses", func() {
It("Retrieves a list of token holder addresses from persisted event logs", func() { It("Retrieves a list of token holder addresses from persisted event logs", func() {
addresses, err = r.RetrieveTokenHolderAddresses(*info) addresses, retrieveErr := r.RetrieveTokenHolderAddresses(*info)
Expect(err).ToNot(HaveOccurred()) Expect(retrieveErr).ToNot(HaveOccurred())
_, ok := addresses[common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21")] _, ok := addresses[common.HexToAddress("0x000000000000000000000000000000000000000000000000000000000000af21")]
Expect(ok).To(Equal(true)) Expect(ok).To(Equal(true))
@ -100,8 +99,8 @@ var _ = Describe("Address Retriever Test", func() {
}) })
It("Returns empty list when empty contract info is used", func() { It("Returns empty list when empty contract info is used", func() {
addresses, err = r.RetrieveTokenHolderAddresses(contract.Contract{}) addresses, retrieveErr := r.RetrieveTokenHolderAddresses(contract.Contract{})
Expect(err).ToNot(HaveOccurred()) Expect(retrieveErr).ToNot(HaveOccurred())
Expect(len(addresses)).To(Equal(0)) Expect(len(addresses)).To(Equal(0))
}) })
}) })