Merge pull request #101 from vulcanize/update-transformer-execute
VDB-696 Update transformer execute
This commit is contained in:
commit
d761f88b30
@ -17,14 +17,11 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Repository interface {
|
type Repository interface {
|
||||||
Create(headerID int64, models []interface{}) error
|
Create(headerID int64, models []interface{}) error
|
||||||
MarkHeaderChecked(headerID int64) error
|
MarkHeaderChecked(headerID int64) error
|
||||||
MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error)
|
|
||||||
RecheckHeaders(startingBlockNumber, endingBlockNUmber int64) ([]core.Header, error)
|
|
||||||
SetDB(db *postgres.DB)
|
SetDB(db *postgres.DB)
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
@ -37,7 +36,7 @@ func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.Event
|
|||||||
return transformer
|
return transformer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (transformer Transformer) Execute(logs []types.Log, header core.Header, recheckHeaders constants.TransformerExecution) error {
|
func (transformer Transformer) Execute(logs []types.Log, header core.Header) error {
|
||||||
transformerName := transformer.Config.TransformerName
|
transformerName := transformer.Config.TransformerName
|
||||||
config := transformer.Config
|
config := transformer.Config
|
||||||
|
|
||||||
|
@ -23,7 +23,6 @@ import (
|
|||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
|
||||||
@ -60,14 +59,14 @@ var _ = Describe("Transformer", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("marks header checked if no logs returned", func() {
|
It("marks header checked if no logs returned", func() {
|
||||||
err := t.Execute([]types.Log{}, headerOne, constants.HeaderMissing)
|
err := t.Execute([]types.Log{}, headerOne)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
repository.AssertMarkHeaderCheckedCalledWith(headerOne.Id)
|
repository.AssertMarkHeaderCheckedCalledWith(headerOne.Id)
|
||||||
})
|
})
|
||||||
|
|
||||||
It("doesn't attempt to convert or persist an empty collection when there are no logs", func() {
|
It("doesn't attempt to convert or persist an empty collection when there are no logs", func() {
|
||||||
err := t.Execute([]types.Log{}, headerOne, constants.HeaderMissing)
|
err := t.Execute([]types.Log{}, headerOne)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(converter.ToEntitiesCalledCounter).To(Equal(0))
|
Expect(converter.ToEntitiesCalledCounter).To(Equal(0))
|
||||||
@ -76,7 +75,7 @@ var _ = Describe("Transformer", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("does not call repository.MarkCheckedHeader when there are logs", func() {
|
It("does not call repository.MarkCheckedHeader when there are logs", func() {
|
||||||
err := t.Execute(logs, headerOne, constants.HeaderMissing)
|
err := t.Execute(logs, headerOne)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
repository.AssertMarkHeaderCheckedNotCalled()
|
repository.AssertMarkHeaderCheckedNotCalled()
|
||||||
@ -85,14 +84,14 @@ var _ = Describe("Transformer", func() {
|
|||||||
It("returns error if marking header checked returns err", func() {
|
It("returns error if marking header checked returns err", func() {
|
||||||
repository.SetMarkHeaderCheckedError(fakes.FakeError)
|
repository.SetMarkHeaderCheckedError(fakes.FakeError)
|
||||||
|
|
||||||
err := t.Execute([]types.Log{}, headerOne, constants.HeaderMissing)
|
err := t.Execute([]types.Log{}, headerOne)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("converts an eth log to an entity", func() {
|
It("converts an eth log to an entity", func() {
|
||||||
err := t.Execute(logs, headerOne, constants.HeaderMissing)
|
err := t.Execute(logs, headerOne)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(converter.ContractAbi).To(Equal(config.ContractAbi))
|
Expect(converter.ContractAbi).To(Equal(config.ContractAbi))
|
||||||
@ -102,7 +101,7 @@ var _ = Describe("Transformer", func() {
|
|||||||
It("returns an error if converter fails", func() {
|
It("returns an error if converter fails", func() {
|
||||||
converter.ToEntitiesError = fakes.FakeError
|
converter.ToEntitiesError = fakes.FakeError
|
||||||
|
|
||||||
err := t.Execute(logs, headerOne, constants.HeaderMissing)
|
err := t.Execute(logs, headerOne)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
@ -111,7 +110,7 @@ var _ = Describe("Transformer", func() {
|
|||||||
It("converts an entity to a model", func() {
|
It("converts an entity to a model", func() {
|
||||||
converter.EntitiesToReturn = []interface{}{test_data.GenericEntity{}}
|
converter.EntitiesToReturn = []interface{}{test_data.GenericEntity{}}
|
||||||
|
|
||||||
err := t.Execute(logs, headerOne, constants.HeaderMissing)
|
err := t.Execute(logs, headerOne)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(converter.EntitiesToConvert[0]).To(Equal(test_data.GenericEntity{}))
|
Expect(converter.EntitiesToConvert[0]).To(Equal(test_data.GenericEntity{}))
|
||||||
@ -121,7 +120,7 @@ var _ = Describe("Transformer", func() {
|
|||||||
converter.EntitiesToReturn = []interface{}{test_data.GenericEntity{}}
|
converter.EntitiesToReturn = []interface{}{test_data.GenericEntity{}}
|
||||||
converter.ToModelsError = fakes.FakeError
|
converter.ToModelsError = fakes.FakeError
|
||||||
|
|
||||||
err := t.Execute(logs, headerOne, constants.HeaderMissing)
|
err := t.Execute(logs, headerOne)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
@ -130,7 +129,7 @@ var _ = Describe("Transformer", func() {
|
|||||||
It("persists the record", func() {
|
It("persists the record", func() {
|
||||||
converter.ModelsToReturn = []interface{}{test_data.GenericModel{}}
|
converter.ModelsToReturn = []interface{}{test_data.GenericModel{}}
|
||||||
|
|
||||||
err := t.Execute(logs, headerOne, constants.HeaderMissing)
|
err := t.Execute(logs, headerOne)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(repository.PassedHeaderID).To(Equal(headerOne.Id))
|
Expect(repository.PassedHeaderID).To(Equal(headerOne.Id))
|
||||||
@ -139,7 +138,7 @@ var _ = Describe("Transformer", func() {
|
|||||||
|
|
||||||
It("returns error if persisting the record fails", func() {
|
It("returns error if persisting the record fails", func() {
|
||||||
repository.SetCreateError(fakes.FakeError)
|
repository.SetCreateError(fakes.FakeError)
|
||||||
err := t.Execute(logs, headerOne, constants.HeaderMissing)
|
err := t.Execute(logs, headerOne)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
|
@ -19,7 +19,6 @@ package mocks
|
|||||||
import (
|
import (
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
|
|
||||||
shared_t "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
shared_t "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
@ -33,7 +32,7 @@ type MockTransformer struct {
|
|||||||
config shared_t.EventTransformerConfig
|
config shared_t.EventTransformerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mh *MockTransformer) Execute(logs []types.Log, header core.Header, recheckHeaders constants.TransformerExecution) error {
|
func (mh *MockTransformer) Execute(logs []types.Log, header core.Header) error {
|
||||||
if mh.ExecuteError != nil {
|
if mh.ExecuteError != nil {
|
||||||
return mh.ExecuteError
|
return mh.ExecuteError
|
||||||
}
|
}
|
||||||
|
@ -69,31 +69,6 @@ func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.D
|
|||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func RecheckHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.DB, checkedHeadersColumn string) ([]core.Header, error) {
|
|
||||||
var result []core.Header
|
|
||||||
var query string
|
|
||||||
var err error
|
|
||||||
|
|
||||||
if endingBlockNumber == -1 {
|
|
||||||
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
|
|
||||||
LEFT JOIN checked_headers on headers.id = header_id
|
|
||||||
WHERE ` + checkedHeadersColumn + ` between 1 and ` + constants.RecheckHeaderCap + `
|
|
||||||
AND headers.block_number >= $1
|
|
||||||
AND headers.eth_node_fingerprint = $2`
|
|
||||||
err = db.Select(&result, query, startingBlockNumber, db.Node.ID)
|
|
||||||
} else {
|
|
||||||
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
|
|
||||||
LEFT JOIN checked_headers on headers.id = header_id
|
|
||||||
WHERE ` + checkedHeadersColumn + ` between 1 and ` + constants.RecheckHeaderCap + `
|
|
||||||
AND headers.block_number >= $1
|
|
||||||
AND headers.block_number <= $2
|
|
||||||
AND headers.eth_node_fingerprint = $3`
|
|
||||||
err = db.Select(&result, query, startingBlockNumber, endingBlockNumber, db.Node.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
|
func GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
|
||||||
// Query returns `[]driver.Value`, nullable polymorphic interface
|
// Query returns `[]driver.Value`, nullable polymorphic interface
|
||||||
var queryResult []driver.Value
|
var queryResult []driver.Value
|
||||||
@ -121,37 +96,47 @@ func GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
|
|||||||
return columnNames, nil
|
return columnNames, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Builds a SQL string that checks if any column value is 0, given the column names.
|
// Builds a SQL string that checks if any column should be checked/rechecked.
|
||||||
// Defaults to FALSE when no columns are provided.
|
// Defaults to FALSE when no columns are provided.
|
||||||
// Ex: ["columnA", "columnB"] => "NOT (columnA!=0 AND columnB!=0)"
|
// Ex: ["columnA", "columnB"] => "NOT (columnA!=0 AND columnB!=0)"
|
||||||
// [] => "FALSE"
|
// [] => "FALSE"
|
||||||
func CreateNotCheckedSQL(boolColumns []string, recheckHeaders constants.TransformerExecution) string {
|
func CreateHeaderCheckedPredicateSQL(boolColumns []string, recheckHeaders constants.TransformerExecution) string {
|
||||||
|
|
||||||
var result bytes.Buffer
|
|
||||||
|
|
||||||
if len(boolColumns) == 0 {
|
if len(boolColumns) == 0 {
|
||||||
return "FALSE"
|
return "FALSE"
|
||||||
}
|
}
|
||||||
|
|
||||||
result.WriteString("NOT (")
|
if recheckHeaders {
|
||||||
|
return createHeaderCheckedPredicateSQLForRecheckedHeaders(boolColumns)
|
||||||
|
} else {
|
||||||
|
return createHeaderCheckedPredicateSQLForMissingHeaders(boolColumns)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createHeaderCheckedPredicateSQLForMissingHeaders(boolColumns []string) string {
|
||||||
|
var result bytes.Buffer
|
||||||
|
result.WriteString(" (")
|
||||||
|
|
||||||
// Loop excluding last column name
|
// Loop excluding last column name
|
||||||
for _, column := range boolColumns[:len(boolColumns)-1] {
|
for _, column := range boolColumns[:len(boolColumns)-1] {
|
||||||
|
result.WriteString(fmt.Sprintf("%v=0 OR ", column))
|
||||||
if recheckHeaders {
|
|
||||||
result.WriteString(fmt.Sprintf("%v>=%s AND ", column, constants.RecheckHeaderCap))
|
|
||||||
} else {
|
|
||||||
result.WriteString(fmt.Sprintf("%v!=0 AND ", column))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// No trailing "OR" for the last column name
|
result.WriteString(fmt.Sprintf("%v=0)", boolColumns[len(boolColumns)-1]))
|
||||||
if recheckHeaders {
|
|
||||||
result.WriteString(fmt.Sprintf("%v>=%s)", boolColumns[len(boolColumns)-1], constants.RecheckHeaderCap))
|
return result.String()
|
||||||
} else {
|
}
|
||||||
result.WriteString(fmt.Sprintf("%v!=0)", boolColumns[len(boolColumns)-1]))
|
|
||||||
|
func createHeaderCheckedPredicateSQLForRecheckedHeaders(boolColumns []string) string {
|
||||||
}
|
var result bytes.Buffer
|
||||||
|
result.WriteString(" (")
|
||||||
|
|
||||||
|
// Loop excluding last column name
|
||||||
|
for _, column := range boolColumns[:len(boolColumns)-1] {
|
||||||
|
result.WriteString(fmt.Sprintf("%v<%s OR ", column, constants.RecheckHeaderCap))
|
||||||
|
}
|
||||||
|
|
||||||
|
// No trailing "OR" for the last column name
|
||||||
|
result.WriteString(fmt.Sprintf("%v<%s)", boolColumns[len(boolColumns)-1], constants.RecheckHeaderCap))
|
||||||
|
|
||||||
return result.String()
|
return result.String()
|
||||||
}
|
}
|
||||||
|
@ -36,12 +36,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Repository", func() {
|
var _ = Describe("Repository", func() {
|
||||||
Describe("MarkHeaderChecked", func() {
|
var (
|
||||||
var (
|
checkedHeadersColumn string
|
||||||
checkedHeadersColumn string
|
db *postgres.DB
|
||||||
db *postgres.DB
|
)
|
||||||
)
|
|
||||||
|
|
||||||
|
Describe("MarkHeaderChecked", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
db = test_config.NewTestDB(test_config.NewTestNode())
|
db = test_config.NewTestDB(test_config.NewTestNode())
|
||||||
test_config.CleanTestDB(db)
|
test_config.CleanTestDB(db)
|
||||||
@ -73,11 +73,6 @@ var _ = Describe("Repository", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
Describe("MarkHeaderCheckedInTransaction", func() {
|
Describe("MarkHeaderCheckedInTransaction", func() {
|
||||||
var (
|
|
||||||
checkedHeadersColumn string
|
|
||||||
db *postgres.DB
|
|
||||||
)
|
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
db = test_config.NewTestDB(test_config.NewTestNode())
|
db = test_config.NewTestDB(test_config.NewTestNode())
|
||||||
test_config.CleanTestDB(db)
|
test_config.CleanTestDB(db)
|
||||||
@ -114,16 +109,17 @@ var _ = Describe("Repository", func() {
|
|||||||
|
|
||||||
Describe("MissingHeaders", func() {
|
Describe("MissingHeaders", func() {
|
||||||
var (
|
var (
|
||||||
db *postgres.DB
|
|
||||||
headerRepository datastore.HeaderRepository
|
headerRepository datastore.HeaderRepository
|
||||||
startingBlockNumber int64
|
startingBlockNumber int64
|
||||||
endingBlockNumber int64
|
endingBlockNumber int64
|
||||||
eventSpecificBlockNumber int64
|
eventSpecificBlockNumber int64
|
||||||
|
outOfRangeBlockNumber int64
|
||||||
blockNumbers []int64
|
blockNumbers []int64
|
||||||
headerIDs []int64
|
headerIDs []int64
|
||||||
notCheckedSQL string
|
notCheckedSQL string
|
||||||
err error
|
err error
|
||||||
hr r2.HeaderRepository
|
hr r2.HeaderRepository
|
||||||
|
columnNames []string
|
||||||
)
|
)
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
@ -133,14 +129,14 @@ var _ = Describe("Repository", func() {
|
|||||||
hr = r2.NewHeaderRepository(db)
|
hr = r2.NewHeaderRepository(db)
|
||||||
hr.AddCheckColumns(getExpectedColumnNames())
|
hr.AddCheckColumns(getExpectedColumnNames())
|
||||||
|
|
||||||
columnNames, err := shared.GetCheckedColumnNames(db)
|
columnNames, err = shared.GetCheckedColumnNames(db)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
notCheckedSQL = shared.CreateNotCheckedSQL(columnNames, constants.HeaderMissing)
|
notCheckedSQL = shared.CreateHeaderCheckedPredicateSQL(columnNames, constants.HeaderMissing)
|
||||||
|
|
||||||
startingBlockNumber = rand.Int63()
|
startingBlockNumber = rand.Int63()
|
||||||
eventSpecificBlockNumber = startingBlockNumber + 1
|
eventSpecificBlockNumber = startingBlockNumber + 1
|
||||||
endingBlockNumber = startingBlockNumber + 2
|
endingBlockNumber = startingBlockNumber + 2
|
||||||
outOfRangeBlockNumber := endingBlockNumber + 1
|
outOfRangeBlockNumber = endingBlockNumber + 1
|
||||||
|
|
||||||
blockNumbers = []int64{startingBlockNumber, eventSpecificBlockNumber, endingBlockNumber, outOfRangeBlockNumber}
|
blockNumbers = []int64{startingBlockNumber, eventSpecificBlockNumber, endingBlockNumber, outOfRangeBlockNumber}
|
||||||
|
|
||||||
@ -157,6 +153,7 @@ var _ = Describe("Repository", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("only treats headers as checked if the event specific logs have been checked", func() {
|
It("only treats headers as checked if the event specific logs have been checked", func() {
|
||||||
|
//add a checked_header record, but don't mark it check for any of the columns
|
||||||
_, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1])
|
_, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1])
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
@ -191,83 +188,47 @@ var _ = Describe("Repository", func() {
|
|||||||
Expect(nodeTwoMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(eventSpecificBlockNumber+10), Equal(endingBlockNumber+10)))
|
Expect(nodeTwoMissingHeaders[0].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(eventSpecificBlockNumber+10), Equal(endingBlockNumber+10)))
|
||||||
Expect(nodeTwoMissingHeaders[1].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(eventSpecificBlockNumber+10), Equal(endingBlockNumber+10)))
|
Expect(nodeTwoMissingHeaders[1].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(eventSpecificBlockNumber+10), Equal(endingBlockNumber+10)))
|
||||||
})
|
})
|
||||||
})
|
|
||||||
|
|
||||||
Describe("RecheckHeaders", func() {
|
It("handles an ending block of -1 ", func() {
|
||||||
var (
|
endingBlock := int64(-1)
|
||||||
checkedHeadersColumn string
|
headers, err := shared.MissingHeaders(startingBlockNumber, endingBlock, db, notCheckedSQL)
|
||||||
db *postgres.DB
|
|
||||||
headerOneID, headerTwoID, headerThreeID, headerFourID int64
|
|
||||||
headerOneErr, headerTwoErr, headerThreeErr, headerFourErr error
|
|
||||||
)
|
|
||||||
|
|
||||||
BeforeEach(func() {
|
Expect(err).NotTo(HaveOccurred())
|
||||||
db = test_config.NewTestDB(test_config.NewTestNode())
|
Expect(len(headers)).To(Equal(4))
|
||||||
test_config.CleanTestDB(db)
|
Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(eventSpecificBlockNumber), Equal(outOfRangeBlockNumber)))
|
||||||
|
Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(eventSpecificBlockNumber), Equal(outOfRangeBlockNumber)))
|
||||||
|
Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(eventSpecificBlockNumber), Equal(outOfRangeBlockNumber)))
|
||||||
|
Expect(headers[3].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(eventSpecificBlockNumber), Equal(outOfRangeBlockNumber)))
|
||||||
|
|
||||||
// create header checked column
|
})
|
||||||
checkedHeadersColumn = "test_column_checked"
|
|
||||||
_, migrateErr := db.Exec(`ALTER TABLE public.checked_headers ADD COLUMN ` + checkedHeadersColumn + ` integer`)
|
|
||||||
Expect(migrateErr).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
// create headers
|
It("when a the `notCheckedSQL` argument allows for rechecks it returns headers where the checked count is less than the maximum", func() {
|
||||||
headerRepository := repositories.NewHeaderRepository(db)
|
columnName := columnNames[0]
|
||||||
headerOneID, headerOneErr = headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(1))
|
recheckedSQL := shared.CreateHeaderCheckedPredicateSQL([]string{columnName}, constants.HeaderRecheck)
|
||||||
Expect(headerOneErr).NotTo(HaveOccurred())
|
// mark every header checked at least once
|
||||||
headerTwoID, headerTwoErr = headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(2))
|
// header 4 is marked the maximum number of times, it it is not longer checked
|
||||||
Expect(headerTwoErr).NotTo(HaveOccurred())
|
|
||||||
headerThreeID, headerThreeErr = headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(3))
|
|
||||||
Expect(headerThreeErr).NotTo(HaveOccurred())
|
|
||||||
headerFourID, headerFourErr = headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(4))
|
|
||||||
Expect(headerFourErr).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
// mark every header checked at least once, with one fully rechecked (headerThree)
|
|
||||||
maxCheckCount, intConversionErr := strconv.Atoi(constants.RecheckHeaderCap)
|
maxCheckCount, intConversionErr := strconv.Atoi(constants.RecheckHeaderCap)
|
||||||
Expect(intConversionErr).NotTo(HaveOccurred())
|
Expect(intConversionErr).NotTo(HaveOccurred())
|
||||||
_, markHeaderOneCheckedErr := db.Exec(
|
|
||||||
`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`) VALUES ($1, $2)`,
|
|
||||||
headerOneID, maxCheckCount)
|
|
||||||
Expect(markHeaderOneCheckedErr).NotTo(HaveOccurred())
|
|
||||||
_, markHeaderTwoCheckedErr := db.Exec(
|
|
||||||
`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`) VALUES ($1, $2)`,
|
|
||||||
headerTwoID, maxCheckCount)
|
|
||||||
Expect(markHeaderTwoCheckedErr).NotTo(HaveOccurred())
|
|
||||||
_, markHeaderThreeCheckedErr := db.Exec(
|
|
||||||
`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`) VALUES ($1, $2)`,
|
|
||||||
headerThreeID, maxCheckCount+1)
|
|
||||||
Expect(markHeaderThreeCheckedErr).NotTo(HaveOccurred())
|
|
||||||
_, markHeaderFourCheckedErr := db.Exec(
|
|
||||||
`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`) VALUES ($1, $2)`,
|
|
||||||
headerFourID, maxCheckCount)
|
|
||||||
Expect(markHeaderFourCheckedErr).NotTo(HaveOccurred())
|
|
||||||
})
|
|
||||||
|
|
||||||
AfterEach(func() {
|
markHeaderOneErr := shared.MarkHeaderChecked(headerIDs[0], db, columnName)
|
||||||
_, cleanupMigrateErr := db.Exec(`ALTER TABLE public.checked_headers DROP COLUMN ` + checkedHeadersColumn)
|
Expect(markHeaderOneErr).NotTo(HaveOccurred())
|
||||||
Expect(cleanupMigrateErr).NotTo(HaveOccurred())
|
markHeaderTwoErr := shared.MarkHeaderChecked(headerIDs[1], db, columnName)
|
||||||
})
|
Expect(markHeaderTwoErr).NotTo(HaveOccurred())
|
||||||
|
markHeaderThreeErr := shared.MarkHeaderChecked(headerIDs[2], db, columnName)
|
||||||
|
Expect(markHeaderThreeErr).NotTo(HaveOccurred())
|
||||||
|
for i := 0; i <= maxCheckCount; i++ {
|
||||||
|
markHeaderFourErr := shared.MarkHeaderChecked(headerIDs[3], db, columnName)
|
||||||
|
Expect(markHeaderFourErr).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
Describe("when no ending block number (ending block number == -1)", func() {
|
headers, err := shared.MissingHeaders(1, -1, db, recheckedSQL)
|
||||||
It("returns all headers since starting block where checked count is less than cap", func() {
|
|
||||||
headers, err := shared.RecheckHeaders(1, -1, db, checkedHeadersColumn)
|
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(headers)).To(Equal(3))
|
Expect(len(headers)).To(Equal(3))
|
||||||
Expect(headers[0].Id).To(Or(Equal(headerOneID), Equal(headerTwoID), Equal(headerFourID)))
|
Expect(headers[0].Id).To(Or(Equal(headerIDs[0]), Equal(headerIDs[1]), Equal(headerIDs[2])))
|
||||||
Expect(headers[1].Id).To(Or(Equal(headerOneID), Equal(headerTwoID), Equal(headerFourID)))
|
Expect(headers[1].Id).To(Or(Equal(headerIDs[0]), Equal(headerIDs[1]), Equal(headerIDs[2])))
|
||||||
Expect(headers[2].Id).To(Or(Equal(headerOneID), Equal(headerTwoID), Equal(headerFourID)))
|
Expect(headers[2].Id).To(Or(Equal(headerIDs[0]), Equal(headerIDs[1]), Equal(headerIDs[2])))
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
Describe("when ending block number specified", func() {
|
|
||||||
It("returns headers between starting and ending block where checked count is less than cap", func() {
|
|
||||||
headers, err := shared.RecheckHeaders(1, 3, db, checkedHeadersColumn)
|
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(len(headers)).To(Equal(2))
|
|
||||||
Expect(headers[0].Id).To(Or(Equal(headerOneID), Equal(headerTwoID)))
|
|
||||||
Expect(headers[1].Id).To(Or(Equal(headerOneID), Equal(headerTwoID)))
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -285,32 +246,49 @@ var _ = Describe("Repository", func() {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("CreateNotCheckedSQL", func() {
|
Describe("CreateHeaderCheckedPredicateSQL", func() {
|
||||||
It("generates a correct SQL string for one column", func() {
|
Describe("for headers that haven't been checked for logs", func() {
|
||||||
columns := []string{"columnA"}
|
It("generates a correct SQL string for one column", func() {
|
||||||
expected := "NOT (columnA!=0)"
|
columns := []string{"columnA"}
|
||||||
actual := shared.CreateNotCheckedSQL(columns, constants.HeaderMissing)
|
expected := " (columnA=0)"
|
||||||
Expect(actual).To(Equal(expected))
|
actual := shared.CreateHeaderCheckedPredicateSQL(columns, constants.HeaderMissing)
|
||||||
|
Expect(actual).To(Equal(expected))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("generates a correct SQL string for several columns", func() {
|
||||||
|
columns := []string{"columnA", "columnB"}
|
||||||
|
expected := " (columnA=0 OR columnB=0)"
|
||||||
|
actual := shared.CreateHeaderCheckedPredicateSQL(columns, constants.HeaderMissing)
|
||||||
|
Expect(actual).To(Equal(expected))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("defaults to FALSE when there are no columns", func() {
|
||||||
|
expected := "FALSE"
|
||||||
|
actual := shared.CreateHeaderCheckedPredicateSQL([]string{}, constants.HeaderMissing)
|
||||||
|
Expect(actual).To(Equal(expected))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
It("generates a correct SQL string for several columns", func() {
|
Describe("for headers that are being rechecked for logs", func() {
|
||||||
columns := []string{"columnA", "columnB"}
|
It("generates a correct SQL string for rechecking headers for one column", func() {
|
||||||
expected := "NOT (columnA!=0 AND columnB!=0)"
|
columns := []string{"columnA"}
|
||||||
actual := shared.CreateNotCheckedSQL(columns, constants.HeaderMissing)
|
expected := fmt.Sprintf(" (columnA<%s)", constants.RecheckHeaderCap)
|
||||||
Expect(actual).To(Equal(expected))
|
actual := shared.CreateHeaderCheckedPredicateSQL(columns, constants.HeaderRecheck)
|
||||||
})
|
Expect(actual).To(Equal(expected))
|
||||||
|
})
|
||||||
|
|
||||||
It("defaults to FALSE when there are no columns", func() {
|
It("generates a correct SQL string for rechecking headers for several columns", func() {
|
||||||
expected := "FALSE"
|
columns := []string{"columnA", "columnB"}
|
||||||
actual := shared.CreateNotCheckedSQL([]string{}, constants.HeaderMissing)
|
expected := fmt.Sprintf(" (columnA<%s OR columnB<%s)", constants.RecheckHeaderCap, constants.RecheckHeaderCap)
|
||||||
Expect(actual).To(Equal(expected))
|
actual := shared.CreateHeaderCheckedPredicateSQL(columns, constants.HeaderRecheck)
|
||||||
})
|
Expect(actual).To(Equal(expected))
|
||||||
|
})
|
||||||
|
|
||||||
It("generates a correct SQL string for rechecking headers", func() {
|
It("defaults to FALSE when there are no columns", func() {
|
||||||
columns := []string{"columnA", "columnB"}
|
expected := "FALSE"
|
||||||
expected := fmt.Sprintf("NOT (columnA>=%s AND columnB>=%s)", constants.RecheckHeaderCap, constants.RecheckHeaderCap)
|
actual := shared.CreateHeaderCheckedPredicateSQL([]string{}, constants.HeaderRecheck)
|
||||||
actual := shared.CreateNotCheckedSQL(columns, constants.HeaderRecheck)
|
Expect(actual).To(Equal(expected))
|
||||||
Expect(actual).To(Equal(expected))
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -20,13 +20,12 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventTransformer interface {
|
type EventTransformer interface {
|
||||||
Execute(logs []types.Log, header core.Header, recheckHeaders constants.TransformerExecution) error
|
Execute(logs []types.Log, header core.Header) error
|
||||||
GetConfig() EventTransformerConfig
|
GetConfig() EventTransformerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecuti
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
notCheckedSQL := repository.CreateNotCheckedSQL(checkedColumnNames, recheckHeaders)
|
notCheckedSQL := repository.CreateHeaderCheckedPredicateSQL(checkedColumnNames, recheckHeaders)
|
||||||
|
|
||||||
missingHeaders, err := repository.MissingHeaders(*watcher.StartingBlock, -1, watcher.DB, notCheckedSQL)
|
missingHeaders, err := repository.MissingHeaders(*watcher.StartingBlock, -1, watcher.DB, notCheckedSQL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -134,7 +134,7 @@ func (watcher *EventWatcher) transformLogs(logs []types.Log, header core.Header)
|
|||||||
for _, t := range watcher.Transformers {
|
for _, t := range watcher.Transformers {
|
||||||
transformerName := t.GetConfig().TransformerName
|
transformerName := t.GetConfig().TransformerName
|
||||||
logChunk := chunkedLogs[transformerName]
|
logChunk := chunkedLogs[transformerName]
|
||||||
err := t.Execute(logChunk, header, constants.HeaderMissing)
|
err := t.Execute(logChunk, header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("%v transformer failed to execute in watcher: %v", transformerName, err)
|
logrus.Errorf("%v transformer failed to execute in watcher: %v", transformerName, err)
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user