review fixes

This commit is contained in:
Ian Norden 2019-10-28 08:48:06 -05:00
parent 4fbde836d4
commit a2d249ca9d
6 changed files with 19 additions and 139 deletions

View File

@ -1,27 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository
import "github.com/jmoiron/sqlx"
func MarkContractWatcherHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, checkedHeadersColumn string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET `+checkedHeadersColumn+` = checked_headers.`+checkedHeadersColumn+` + 1`, headerID, 1)
return err
}

View File

@ -1,67 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("", func() {
Describe("MarkContractWatcherHeaderCheckedInTransaction", func() {
var (
checkedHeadersColumn string
db *postgres.DB
)
BeforeEach(func() {
db = test_config.NewTestDB(test_config.NewTestNode())
test_config.CleanTestDB(db)
checkedHeadersColumn = "test_column_checked"
_, migrateErr := db.Exec(`ALTER TABLE public.checked_headers
ADD COLUMN ` + checkedHeadersColumn + ` integer`)
Expect(migrateErr).NotTo(HaveOccurred())
})
AfterEach(func() {
_, cleanupMigrateErr := db.Exec(`ALTER TABLE public.checked_headers DROP COLUMN ` + checkedHeadersColumn)
Expect(cleanupMigrateErr).NotTo(HaveOccurred())
})
It("marks passed header as checked within a passed transaction", func() {
headerRepository := repositories.NewHeaderRepository(db)
headerID, headerErr := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader)
Expect(headerErr).NotTo(HaveOccurred())
tx, txErr := db.Beginx()
Expect(txErr).NotTo(HaveOccurred())
err := repository.MarkContractWatcherHeaderCheckedInTransaction(headerID, tx, checkedHeadersColumn)
Expect(err).NotTo(HaveOccurred())
commitErr := tx.Commit()
Expect(commitErr).NotTo(HaveOccurred())
var checkedCount int
fetchErr := db.Get(&checkedCount, `SELECT COUNT(*) FROM public.checked_headers WHERE header_id = $1`, headerID)
Expect(fetchErr).NotTo(HaveOccurred())
Expect(checkedCount).To(Equal(1))
})
})
})

View File

@ -128,7 +128,7 @@ func (tr *Transformer) Init() error {
firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock() firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock()
if retrieveErr != nil { if retrieveErr != nil {
if retrieveErr == sql.ErrNoRows { if retrieveErr == sql.ErrNoRows {
logrus.Error(retrieveErr) logrus.Error(fmt.Errorf("error retrieving first block: %s", retrieveErr.Error()))
firstBlock = 0 firstBlock = 0
} else { } else {
return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error()) return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error())
@ -252,12 +252,6 @@ func (tr *Transformer) Execute() error {
continue continue
} }
// Sort logs by the contract they belong to
// Start by adding every contract addr to the map
// So that if we don't have any logs for it, it is caught and the header is still marked checked for its events
for _, addr := range tr.contractAddresses {
sortedLogs[addr] = nil
}
for _, log := range allLogs { for _, log := range allLogs {
addr := strings.ToLower(log.Address.Hex()) addr := strings.ToLower(log.Address.Hex())
sortedLogs[addr] = append(sortedLogs[addr], log) sortedLogs[addr] = append(sortedLogs[addr], log)
@ -265,20 +259,12 @@ func (tr *Transformer) Execute() error {
// Process logs for each contract // Process logs for each contract
for conAddr, logs := range sortedLogs { for conAddr, logs := range sortedLogs {
con := tr.Contracts[conAddr] if logs == nil {
if len(logs) < 1 {
eventIds := make([]string, 0)
for eventName := range con.Events {
eventIds = append(eventIds, strings.ToLower(eventName+"_"+con.Address))
}
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber) logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber)
continue continue
} }
// Configure converter with this contract // Configure converter with this contract
con := tr.Contracts[conAddr]
tr.Converter.Update(con) tr.Converter.Update(con)
// Convert logs into batches of log mappings (eventName => []types.Logs // Convert logs into batches of log mappings (eventName => []types.Logs
@ -290,16 +276,10 @@ func (tr *Transformer) Execute() error {
for eventName, logs := range convertedLogs { for eventName, logs := range convertedLogs {
// If logs for this event are empty, mark them checked at this header and continue // If logs for this event are empty, mark them checked at this header and continue
if len(logs) < 1 { if len(logs) < 1 {
eventID := strings.ToLower(eventName + "_" + con.Address)
markCheckedErr := tr.HeaderRepository.MarkHeaderChecked(header.Id, eventID)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber) logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber)
continue continue
} }
// If logs aren't empty, persist them // If logs aren't empty, persist them
// Header is marked checked in the transactions
persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name) persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
if persistErr != nil { if persistErr != nil {
return fmt.Errorf("error persisting logs: %s", persistErr.Error()) return fmt.Errorf("error persisting logs: %s", persistErr.Error())
@ -307,6 +287,11 @@ func (tr *Transformer) Execute() error {
} }
} }
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
// Poll contracts at this block height // Poll contracts at this block height
pollingErr := tr.methodPolling(header, tr.sortedMethodIds) pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
if pollingErr != nil { if pollingErr != nil {

View File

@ -17,6 +17,8 @@
package getter package getter
import ( import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/constants"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/fetcher" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/fetcher"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
@ -24,7 +26,7 @@ import (
// InterfaceGetter is used to derive the interface of a contract // InterfaceGetter is used to derive the interface of a contract
type InterfaceGetter interface { type InterfaceGetter interface {
GetABI(resolverAddr string, blockNumber int64) string GetABI(resolverAddr string, blockNumber int64) (string, error)
GetBlockChain() core.BlockChain GetBlockChain() core.BlockChain
} }
@ -42,14 +44,18 @@ func NewInterfaceGetter(blockChain core.BlockChain) InterfaceGetter {
} }
// GetABI is used to construct a custom ABI based on the results from calling supportsInterface // GetABI is used to construct a custom ABI based on the results from calling supportsInterface
func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) string { func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) (string, error) {
a := constants.SupportsInterfaceABI a := constants.SupportsInterfaceABI
args := make([]interface{}, 1) args := make([]interface{}, 1)
args[0] = constants.MetaSig.Bytes() args[0] = constants.MetaSig.Bytes()
supports, err := g.getSupportsInterface(a, resolverAddr, blockNumber, args) supports, err := g.getSupportsInterface(a, resolverAddr, blockNumber, args)
if err != nil || !supports { if err != nil {
return "" return "", fmt.Errorf("call to getSupportsInterface failed: %v", err)
} }
if !supports {
return "", fmt.Errorf("contract does not support interface")
}
abiStr := `[` abiStr := `[`
args[0] = constants.AddrChangeSig.Bytes() args[0] = constants.AddrChangeSig.Bytes()
supports, err = g.getSupportsInterface(a, resolverAddr, blockNumber, args) supports, err = g.getSupportsInterface(a, resolverAddr, blockNumber, args)
@ -93,7 +99,7 @@ func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) string
} }
abiStr = abiStr[:len(abiStr)-1] + `]` abiStr = abiStr[:len(abiStr)-1] + `]`
return abiStr return abiStr, nil
} }
// Use this method to check whether or not a contract supports a given method/event interface // Use this method to check whether or not a contract supports a given method/event interface

View File

@ -24,7 +24,6 @@ import (
"github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
@ -144,17 +143,6 @@ func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo type
} }
} }
// Mark header as checked for this eventId
eventID := strings.ToLower(eventInfo.Name + "_" + contractAddr)
markCheckedErr := repository.MarkContractWatcherHeaderCheckedInTransaction(logs[0].ID, tx, eventID) // This assumes all logs are from same block
if markCheckedErr != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
logrus.Warnf("error rolling back transaction while marking header checked: %s", rollbackErr.Error())
}
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
return tx.Commit() return tx.Commit()
} }

View File

@ -355,11 +355,6 @@ var _ = Describe("Repository", func() {
Expect(count).To(Equal(2)) Expect(count).To(Equal(2))
}) })
It("Fails if the persisted event does not have a corresponding eventID column in the checked_headers table", func() {
err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())
})
It("Fails with empty log", func() { It("Fails with empty log", func() {
err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name) err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())