From f7c4a6736d6cc20fa20ad91330c64cdd9aa231fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edvard=20H=C3=BCbinette?= Date: Mon, 28 Oct 2019 11:48:31 +0100 Subject: [PATCH] VDB-919 Generalise converter (#152) * Generalise transformer stack to use InsertionModel * Add tests for event repository * Restrict accepted values in InsertionModel * Add call to repository.SetDB * Improve error propagation/clarity on GetABI() * Remove maker references in example * Please golint * refactor rollback error handling in repository * Cleaner errors in repository, refactor tests --- .../integration_test_suite_test.go | 6 + libraries/shared/factories/event/converter.go | 10 +- .../shared/factories/event/repository.go | 158 +++++++++++++- .../shared/factories/event/repository_test.go | 204 ++++++++++++++++++ .../shared/factories/event/transformer.go | 9 +- .../factories/event/transformer_test.go | 26 +-- libraries/shared/mocks/event_converter.go | 40 ++-- libraries/shared/mocks/event_repository.go | 5 +- libraries/shared/test_data/generic.go | 6 +- libraries/shared/test_data/test_helpers.go | 37 ++++ .../shared/getter/getter_test.go | 3 +- .../shared/getter/interface_getter.go | 13 +- utils/utils.go | 19 +- 13 files changed, 460 insertions(+), 76 deletions(-) create mode 100644 libraries/shared/factories/event/repository_test.go create mode 100644 libraries/shared/test_data/test_helpers.go diff --git a/integration_test/integration_test_suite_test.go b/integration_test/integration_test_suite_test.go index fd5e5588..d0db85e5 100644 --- a/integration_test/integration_test_suite_test.go +++ b/integration_test/integration_test_suite_test.go @@ -17,6 +17,8 @@ package integration_test import ( + "github.com/sirupsen/logrus" + "io/ioutil" "testing" . "github.com/onsi/ginkgo" @@ -27,3 +29,7 @@ func TestIntegrationTest(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "IntegrationTest Suite") } + +var _ = BeforeSuite(func() { + logrus.SetOutput(ioutil.Discard) +}) diff --git a/libraries/shared/factories/event/converter.go b/libraries/shared/factories/event/converter.go index 525b628a..ba159e51 100644 --- a/libraries/shared/factories/event/converter.go +++ b/libraries/shared/factories/event/converter.go @@ -16,9 +16,13 @@ package event -import "github.com/vulcanize/vulcanizedb/pkg/core" +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) +// Converter transforms log data into general InsertionModels the Repository can persist__ type Converter interface { - ToEntities(contractAbi string, ethLog []core.HeaderSyncLog) ([]interface{}, error) - ToModels([]interface{}) ([]interface{}, error) + ToModels(contractAbi string, ethLog []core.HeaderSyncLog) ([]InsertionModel, error) + SetDB(db *postgres.DB) } diff --git a/libraries/shared/factories/event/repository.go b/libraries/shared/factories/event/repository.go index c537455b..c8fd397c 100644 --- a/libraries/shared/factories/event/repository.go +++ b/libraries/shared/factories/event/repository.go @@ -16,9 +16,163 @@ package event -import "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +import ( + "database/sql/driver" + "fmt" + "github.com/vulcanize/vulcanizedb/utils" + "strings" + "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +const SetLogTransformedQuery = `UPDATE public.header_sync_logs SET transformed = true WHERE id = $1` + +// Repository persists transformed values to the DB type Repository interface { - Create(models []interface{}) error + Create(models []InsertionModel) error SetDB(db *postgres.DB) } + +// LogFK is the name of log foreign key columns +const LogFK ColumnName = "log_id" + +// AddressFK is the name of address foreign key columns +const AddressFK ColumnName = "address_id" + +// HeaderFK is the name of header foreign key columns +const HeaderFK ColumnName = "header_id" + +// SchemaName is the schema to work with +type SchemaName string + +// TableName identifies the table for inserting the data +type TableName string + +// ColumnName identifies columns on the given table +type ColumnName string + +// ColumnValues maps a column to the value for insertion. This is restricted to []byte, bool, float64, int64, string, time.Time +type ColumnValues map[ColumnName]interface{} + +// ErrUnsupportedValue is thrown when a model supplies a type of value the postgres driver cannot handle. +var ErrUnsupportedValue = func(value interface{}) error { + return fmt.Errorf("unsupported type of value supplied in model: %v (%T)", value, value) +} + +// InsertionModel is the generalised data structure a converter returns, and contains everything the repository needs to +// persist the converted data. +type InsertionModel struct { + SchemaName SchemaName + TableName TableName + OrderedColumns []ColumnName // Defines the fields to insert, and in which order the table expects them + ColumnValues ColumnValues // Associated values for columns, restricted to []byte, bool, float64, int64, string, time.Time +} + +// ModelToQuery stores memoised insertion queries to minimise computation +var ModelToQuery = map[string]string{} + +// GetMemoizedQuery gets/creates a DB insertion query for the model +func GetMemoizedQuery(model InsertionModel) string { + // The schema and table name uniquely determines the insertion query, use that for memoization + queryKey := string(model.SchemaName) + string(model.TableName) + query, queryMemoized := ModelToQuery[queryKey] + if !queryMemoized { + query = GenerateInsertionQuery(model) + ModelToQuery[queryKey] = query + } + return query +} + +// GenerateInsertionQuery creates an SQL insertion query from an insertion model. +// Should be called through GetMemoizedQuery, so the query is not generated on each call to Create. +func GenerateInsertionQuery(model InsertionModel) string { + var valuePlaceholders []string + var updateOnConflict []string + for i := 0; i < len(model.OrderedColumns); i++ { + valuePlaceholder := fmt.Sprintf("$%d", 1+i) + valuePlaceholders = append(valuePlaceholders, valuePlaceholder) + updateOnConflict = append(updateOnConflict, + fmt.Sprintf("%s = %s", model.OrderedColumns[i], valuePlaceholder)) + } + + baseQuery := `INSERT INTO %v.%v (%v) VALUES(%v) + ON CONFLICT (header_id, log_id) DO UPDATE SET %v;` + + return fmt.Sprintf(baseQuery, + model.SchemaName, + model.TableName, + joinOrderedColumns(model.OrderedColumns), + strings.Join(valuePlaceholders, ", "), + strings.Join(updateOnConflict, ", ")) +} + +/* +Create generates an insertion query and persists to the DB, given a slice of InsertionModels. +ColumnValues are restricted to []byte, bool, float64, int64, string, time.Time. + +testModel = shared.InsertionModel{ + SchemaName: "public" + TableName: "testEvent", + OrderedColumns: []string{"header_id", "log_id", "variable1"}, + ColumnValues: ColumnValues{ + "header_id": 303 + "log_id": "808", + "variable1": "value1", + }, +} +*/ +func Create(models []InsertionModel, db *postgres.DB) error { + if len(models) == 0 { + return fmt.Errorf("repository got empty model slice") + } + + tx, dbErr := db.Beginx() + if dbErr != nil { + return dbErr + } + + for _, model := range models { + // Maps can't be iterated over in a reliable manner, so we rely on OrderedColumns to define the order to insert + // tx.Exec is variadically typed in the args, so if we wrap in []interface{} we can apply them all automatically + var args []interface{} + for _, col := range model.OrderedColumns { + value := model.ColumnValues[col] + // Check whether or not PG can accept the type of value in the model + okPgValue := driver.IsValue(value) + if !okPgValue { + logrus.WithField("model", model).Errorf("PG cannot handle value of this type: %T", value) + return ErrUnsupportedValue(value) + } + args = append(args, value) + } + + insertionQuery := GetMemoizedQuery(model) + _, execErr := tx.Exec(insertionQuery, args...) // couldn't pass varying types in bulk with args :: []string + + if execErr != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.Error("failed to rollback ", rollbackErr) + } + return execErr + } + + _, logErr := tx.Exec(SetLogTransformedQuery, model.ColumnValues[LogFK]) + + if logErr != nil { + utils.RollbackAndLogFailure(tx, logErr, "header_sync_logs.transformed") + return logErr + } + } + + return tx.Commit() +} + +func joinOrderedColumns(columns []ColumnName) string { + var stringColumns []string + for _, columnName := range columns { + stringColumns = append(stringColumns, string(columnName)) + } + return strings.Join(stringColumns, ", ") +} diff --git a/libraries/shared/factories/event/repository_test.go b/libraries/shared/factories/event/repository_test.go new file mode 100644 index 00000000..93dfb789 --- /dev/null +++ b/libraries/shared/factories/event/repository_test.go @@ -0,0 +1,204 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package event_test + +import ( + "fmt" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/factories/event" + "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/test_config" + "math/big" +) + +var _ = Describe("Repository", func() { + var db *postgres.DB + + BeforeEach(func() { + db = test_config.NewTestDB(test_config.NewTestNode()) + test_config.CleanTestDB(db) + }) + + Describe("Create", func() { + const createTestEventTableQuery = `CREATE TABLE public.testEvent( + id SERIAL PRIMARY KEY, + header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, + log_id BIGINT NOT NULL REFERENCES header_sync_logs (id) ON DELETE CASCADE, + variable1 TEXT, + UNIQUE (header_id, log_id) + );` + + var ( + headerID, logID int64 + headerRepository repositories.HeaderRepository + testModel event.InsertionModel + ) + + BeforeEach(func() { + _, tableErr := db.Exec(createTestEventTableQuery) + Expect(tableErr).NotTo(HaveOccurred()) + headerRepository = repositories.NewHeaderRepository(db) + var insertHeaderErr error + headerID, insertHeaderErr = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) + Expect(insertHeaderErr).NotTo(HaveOccurred()) + headerSyncLog := test_data.CreateTestLog(headerID, db) + logID = headerSyncLog.ID + + testModel = event.InsertionModel{ + SchemaName: "public", + TableName: "testEvent", + OrderedColumns: []event.ColumnName{ + event.HeaderFK, event.LogFK, "variable1", + }, + ColumnValues: event.ColumnValues{ + event.HeaderFK: headerID, + event.LogFK: logID, + "variable1": "value1", + }, + } + }) + + AfterEach(func() { + db.MustExec(`DROP TABLE public.testEvent;`) + }) + + // Needs to run before the other tests, since those insert keys in map + It("memoizes queries", func() { + Expect(len(event.ModelToQuery)).To(Equal(0)) + event.GetMemoizedQuery(testModel) + Expect(len(event.ModelToQuery)).To(Equal(1)) + event.GetMemoizedQuery(testModel) + Expect(len(event.ModelToQuery)).To(Equal(1)) + }) + + It("persists a model to postgres", func() { + createErr := event.Create([]event.InsertionModel{testModel}, db) + Expect(createErr).NotTo(HaveOccurred()) + + var res TestEvent + dbErr := db.Get(&res, `SELECT log_id, variable1 FROM public.testEvent;`) + Expect(dbErr).NotTo(HaveOccurred()) + + Expect(res.LogID).To(Equal(fmt.Sprint(testModel.ColumnValues[event.LogFK]))) + Expect(res.Variable1).To(Equal(testModel.ColumnValues["variable1"])) + }) + + Describe("returns errors", func() { + It("for empty model slice", func() { + err := event.Create([]event.InsertionModel{}, db) + Expect(err).To(MatchError("repository got empty model slice")) + }) + + It("for failed SQL inserts", func() { + header := fakes.GetFakeHeader(1) + headerID, headerErr := headerRepository.CreateOrUpdateHeader(header) + Expect(headerErr).NotTo(HaveOccurred()) + + brokenModel := event.InsertionModel{ + SchemaName: "public", + TableName: "testEvent", + // Wrong name of last column compared to DB, will generate incorrect query + OrderedColumns: []event.ColumnName{ + event.HeaderFK, event.LogFK, "variable2", + }, + ColumnValues: event.ColumnValues{ + event.HeaderFK: headerID, + event.LogFK: logID, + "variable1": "value1", + }, + } + + // Remove cached queries, or we won't generate a new (incorrect) one + delete(event.ModelToQuery, "publictestEvent") + + createErr := event.Create([]event.InsertionModel{brokenModel}, db) + // Remove incorrect query, so other tests won't get it + delete(event.ModelToQuery, "publictestEvent") + + Expect(createErr).To(HaveOccurred()) + }) + + It("for unsupported types in ColumnValue", func() { + unsupportedValue := big.NewInt(5) + testModel = event.InsertionModel{ + SchemaName: "public", + TableName: "testEvent", + OrderedColumns: []event.ColumnName{ + event.HeaderFK, event.LogFK, "variable1", + }, + ColumnValues: event.ColumnValues{ + event.HeaderFK: headerID, + event.LogFK: logID, + "variable1": unsupportedValue, + }, + } + + createErr := event.Create([]event.InsertionModel{testModel}, db) + Expect(createErr).To(MatchError(event.ErrUnsupportedValue(unsupportedValue))) + }) + }) + + It("upserts queries with conflicting source", func() { + conflictingModel := event.InsertionModel{ + SchemaName: "public", + TableName: "testEvent", + OrderedColumns: []event.ColumnName{ + event.HeaderFK, event.LogFK, "variable1", + }, + ColumnValues: event.ColumnValues{ + event.HeaderFK: headerID, + event.LogFK: logID, + "variable1": "conflictingValue", + }, + } + + createErr := event.Create([]event.InsertionModel{testModel, conflictingModel}, db) + Expect(createErr).NotTo(HaveOccurred()) + + var res TestEvent + dbErr := db.Get(&res, `SELECT log_id, variable1 FROM public.testEvent;`) + Expect(dbErr).NotTo(HaveOccurred()) + Expect(res.Variable1).To(Equal(conflictingModel.ColumnValues["variable1"])) + }) + + It("generates correct queries", func() { + actualQuery := event.GenerateInsertionQuery(testModel) + expectedQuery := `INSERT INTO public.testEvent (header_id, log_id, variable1) VALUES($1, $2, $3) + ON CONFLICT (header_id, log_id) DO UPDATE SET header_id = $1, log_id = $2, variable1 = $3;` + Expect(actualQuery).To(Equal(expectedQuery)) + }) + + It("marks log transformed", func() { + createErr := event.Create([]event.InsertionModel{testModel}, db) + Expect(createErr).NotTo(HaveOccurred()) + + var logTransformed bool + getErr := db.Get(&logTransformed, `SELECT transformed FROM public.header_sync_logs WHERE id = $1`, logID) + Expect(getErr).NotTo(HaveOccurred()) + Expect(logTransformed).To(BeTrue()) + }) + }) +}) + +type TestEvent struct { + LogID string `db:"log_id"` + Variable1 string +} diff --git a/libraries/shared/factories/event/transformer.go b/libraries/shared/factories/event/transformer.go index ee7b0a6b..70339f1c 100644 --- a/libraries/shared/factories/event/transformer.go +++ b/libraries/shared/factories/event/transformer.go @@ -30,6 +30,7 @@ type Transformer struct { } func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.EventTransformer { + transformer.Converter.SetDB(db) transformer.Repository.SetDB(db) return transformer } @@ -42,13 +43,7 @@ func (transformer Transformer) Execute(logs []core.HeaderSyncLog) error { return nil } - entities, err := transformer.Converter.ToEntities(config.ContractAbi, logs) - if err != nil { - logrus.Errorf("error converting logs to entities in %v: %v", transformerName, err) - return err - } - - models, err := transformer.Converter.ToModels(entities) + models, err := transformer.Converter.ToModels(config.ContractAbi, logs) if err != nil { logrus.Errorf("error converting entities to models in %v: %v", transformerName, err) return err diff --git a/libraries/shared/factories/event/transformer_test.go b/libraries/shared/factories/event/transformer_test.go index 2306f251..4684695b 100644 --- a/libraries/shared/factories/event/transformer_test.go +++ b/libraries/shared/factories/event/transformer_test.go @@ -66,12 +66,11 @@ var _ = Describe("Transformer", func() { err := t.Execute([]core.HeaderSyncLog{}) Expect(err).NotTo(HaveOccurred()) - Expect(converter.ToEntitiesCalledCounter).To(Equal(0)) Expect(converter.ToModelsCalledCounter).To(Equal(0)) Expect(repository.CreateCalledCounter).To(Equal(0)) }) - It("converts an eth log to an entity", func() { + It("converts an eth log to a model", func() { err := t.Execute(logs) Expect(err).NotTo(HaveOccurred()) @@ -79,26 +78,7 @@ var _ = Describe("Transformer", func() { Expect(converter.LogsToConvert).To(Equal(logs)) }) - It("returns an error if converter fails", func() { - converter.ToEntitiesError = fakes.FakeError - - err := t.Execute(logs) - - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) - }) - - It("converts an entity to a model", func() { - converter.EntitiesToReturn = []interface{}{test_data.GenericEntity{}} - - err := t.Execute(logs) - - Expect(err).NotTo(HaveOccurred()) - Expect(converter.EntitiesToConvert[0]).To(Equal(test_data.GenericEntity{})) - }) - It("returns an error if converting to models fails", func() { - converter.EntitiesToReturn = []interface{}{test_data.GenericEntity{}} converter.ToModelsError = fakes.FakeError err := t.Execute(logs) @@ -108,12 +88,12 @@ var _ = Describe("Transformer", func() { }) It("persists the record", func() { - converter.ModelsToReturn = []interface{}{test_data.GenericModel{}} + converter.ModelsToReturn = []event.InsertionModel{test_data.GenericModel} err := t.Execute(logs) Expect(err).NotTo(HaveOccurred()) - Expect(repository.PassedModels[0]).To(Equal(test_data.GenericModel{})) + Expect(repository.PassedModels[0]).To(Equal(test_data.GenericModel)) }) It("returns error if persisting the record fails", func() { diff --git a/libraries/shared/mocks/event_converter.go b/libraries/shared/mocks/event_converter.go index c0f42c4e..31b96d99 100644 --- a/libraries/shared/mocks/event_converter.go +++ b/libraries/shared/mocks/event_converter.go @@ -16,41 +16,29 @@ package mocks -import "github.com/vulcanize/vulcanizedb/pkg/core" +import ( + "github.com/vulcanize/vulcanizedb/libraries/shared/factories/event" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) type MockConverter struct { - ToEntitiesError error - PassedContractAddresses []string ToModelsError error - entityConverterError error - modelConverterError error ContractAbi string LogsToConvert []core.HeaderSyncLog - EntitiesToConvert []interface{} - EntitiesToReturn []interface{} - ModelsToReturn []interface{} - ToEntitiesCalledCounter int + ModelsToReturn []event.InsertionModel + PassedContractAddresses []string + SetDBCalled bool ToModelsCalledCounter int } -func (converter *MockConverter) ToEntities(contractAbi string, ethLogs []core.HeaderSyncLog) ([]interface{}, error) { - for _, log := range ethLogs { - converter.PassedContractAddresses = append(converter.PassedContractAddresses, log.Log.Address.Hex()) - } - converter.ContractAbi = contractAbi - converter.LogsToConvert = ethLogs - return converter.EntitiesToReturn, converter.ToEntitiesError -} - -func (converter *MockConverter) ToModels(entities []interface{}) ([]interface{}, error) { - converter.EntitiesToConvert = entities +func (converter *MockConverter) ToModels(abi string, logs []core.HeaderSyncLog) ([]event.InsertionModel, error) { + converter.LogsToConvert = logs + converter.ContractAbi = abi + converter.ToModelsCalledCounter = converter.ToModelsCalledCounter + 1 return converter.ModelsToReturn, converter.ToModelsError } -func (converter *MockConverter) SetToEntityConverterError(err error) { - converter.entityConverterError = err -} - -func (converter *MockConverter) SetToModelConverterError(err error) { - converter.modelConverterError = err +func (converter *MockConverter) SetDB(db *postgres.DB) { + converter.SetDBCalled = true } diff --git a/libraries/shared/mocks/event_repository.go b/libraries/shared/mocks/event_repository.go index d8d878a4..7b6a8a45 100644 --- a/libraries/shared/mocks/event_repository.go +++ b/libraries/shared/mocks/event_repository.go @@ -17,17 +17,18 @@ package mocks import ( + "github.com/vulcanize/vulcanizedb/libraries/shared/factories/event" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type MockEventRepository struct { createError error - PassedModels []interface{} + PassedModels []event.InsertionModel SetDbCalled bool CreateCalledCounter int } -func (repository *MockEventRepository) Create(models []interface{}) error { +func (repository *MockEventRepository) Create(models []event.InsertionModel) error { repository.PassedModels = models repository.CreateCalledCounter++ diff --git a/libraries/shared/test_data/generic.go b/libraries/shared/test_data/generic.go index c414ab0e..43d3ba12 100644 --- a/libraries/shared/test_data/generic.go +++ b/libraries/shared/test_data/generic.go @@ -20,14 +20,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/vulcanize/vulcanizedb/libraries/shared/factories/event" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "math/rand" "time" ) -type GenericModel struct{} -type GenericEntity struct{} - var startingBlockNumber = rand.Int63() var topic0 = "0x" + randomString(64) @@ -44,6 +42,8 @@ var GenericTestLog = func() types.Log { } } +var GenericModel = event.InsertionModel{} + var GenericTestConfig = transformer.EventTransformerConfig{ TransformerName: "generic-test-transformer", ContractAddresses: []string{fakeAddress().Hex()}, diff --git a/libraries/shared/test_data/test_helpers.go b/libraries/shared/test_data/test_helpers.go new file mode 100644 index 00000000..4295f4c6 --- /dev/null +++ b/libraries/shared/test_data/test_helpers.go @@ -0,0 +1,37 @@ +package test_data + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "math/rand" +) + +// Create a header sync log to reference in an event, returning inserted header sync log +func CreateTestLog(headerID int64, db *postgres.DB) core.HeaderSyncLog { + log := types.Log{ + Address: common.Address{}, + Topics: nil, + Data: nil, + BlockNumber: 0, + TxHash: common.Hash{}, + TxIndex: uint(rand.Int31()), + BlockHash: common.Hash{}, + Index: 0, + Removed: false, + } + headerSyncLogRepository := repositories.NewHeaderSyncLogRepository(db) + insertLogsErr := headerSyncLogRepository.CreateHeaderSyncLogs(headerID, []types.Log{log}) + Expect(insertLogsErr).NotTo(HaveOccurred()) + headerSyncLogs, getLogsErr := headerSyncLogRepository.GetUntransformedHeaderSyncLogs() + Expect(getLogsErr).NotTo(HaveOccurred()) + for _, headerSyncLog := range headerSyncLogs { + if headerSyncLog.Log.TxIndex == log.TxIndex { + return headerSyncLog + } + } + panic("couldn't find inserted test log") +} diff --git a/pkg/contract_watcher/shared/getter/getter_test.go b/pkg/contract_watcher/shared/getter/getter_test.go index 0f5d7cb6..80084c64 100644 --- a/pkg/contract_watcher/shared/getter/getter_test.go +++ b/pkg/contract_watcher/shared/getter/getter_test.go @@ -47,7 +47,8 @@ var _ = Describe("Interface Getter", func() { transactionConverter := rpc2.NewRpcTransactionConverter(ethClient) blockChain := geth.NewBlockChain(blockChainClient, rpcClient, node, transactionConverter) interfaceGetter := getter.NewInterfaceGetter(blockChain) - abi := interfaceGetter.GetABI(constants.PublicResolverAddress, blockNumber) + abi, err := interfaceGetter.GetABI(constants.PublicResolverAddress, blockNumber) + Expect(err).NotTo(HaveOccurred()) Expect(abi).To(Equal(expectedABI)) _, err = geth.ParseAbi(abi) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/contract_watcher/shared/getter/interface_getter.go b/pkg/contract_watcher/shared/getter/interface_getter.go index 8aee7d03..5cf39865 100644 --- a/pkg/contract_watcher/shared/getter/interface_getter.go +++ b/pkg/contract_watcher/shared/getter/interface_getter.go @@ -17,6 +17,7 @@ package getter import ( + "fmt" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/fetcher" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -40,14 +41,18 @@ func NewInterfaceGetter(blockChain core.BlockChain) *interfaceGetter { } // Used to construct a custom ABI based on the results from calling supportsInterface -func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) string { +func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) (string, error) { a := constants.SupportsInterfaceABI args := make([]interface{}, 1) args[0] = constants.MetaSig.Bytes() supports, err := g.getSupportsInterface(a, resolverAddr, blockNumber, args) - if err != nil || !supports { - return "" + if err != nil { + return "", fmt.Errorf("call to getSupportsInterface failed: %v", err) } + if !supports { + return "", fmt.Errorf("contract does not support interface") + } + abiStr := `[` args[0] = constants.AddrChangeSig.Bytes() supports, err = g.getSupportsInterface(a, resolverAddr, blockNumber, args) @@ -91,7 +96,7 @@ func (g *interfaceGetter) GetABI(resolverAddr string, blockNumber int64) string } abiStr = abiStr[:len(abiStr)-1] + `]` - return abiStr + return abiStr, nil } // Use this method to check whether or not a contract supports a given method/event interface diff --git a/utils/utils.go b/utils/utils.go index 3891a00b..91354fcf 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -17,7 +17,8 @@ package utils import ( - log "github.com/sirupsen/logrus" + "github.com/jmoiron/sqlx" + logrus "github.com/sirupsen/logrus" "math/big" "os" "path/filepath" @@ -31,7 +32,7 @@ import ( func LoadPostgres(database config.Database, node core.Node) postgres.DB { db, err := postgres.NewDB(database, node) if err != nil { - log.Fatal("Error loading postgres: ", err) + logrus.Fatal("Error loading postgres: ", err) } return *db } @@ -40,7 +41,7 @@ func ReadAbiFile(abiFilepath string) string { abiFilepath = AbsFilePath(abiFilepath) abi, err := geth.ReadAbiFile(abiFilepath) if err != nil { - log.Fatalf("Error reading ABI file at \"%s\"\n %v", abiFilepath, err) + logrus.Fatalf("Error reading ABI file at \"%s\"\n %v", abiFilepath, err) } return abi } @@ -60,12 +61,12 @@ func GetAbi(abiFilepath string, contractHash string, network string) string { } else { url := geth.GenURL(network) etherscan := geth.NewEtherScanClient(url) - log.Printf("No ABI supplied. Retrieving ABI from Etherscan: %s", url) + logrus.Printf("No ABI supplied. Retrieving ABI from Etherscan: %s", url) contractAbiString, _ = etherscan.GetAbi(contractHash) } _, err := geth.ParseAbi(contractAbiString) if err != nil { - log.Fatalln("Invalid ABI: ", err) + logrus.Fatalln("Invalid ABI: ", err) } return contractAbiString } @@ -79,3 +80,11 @@ func RequestedBlockNumber(blockNumber *int64) *big.Int { } return _blockNumber } + +func RollbackAndLogFailure(tx *sqlx.Tx, txErr error, fieldName string) { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.WithFields(logrus.Fields{"rollbackErr": rollbackErr, "txErr": txErr}). + Warnf("failed to rollback transaction after failing to insert %s", fieldName) + } +}