Distinguish between missing and unchecked headers
- Missing == not in DB - Unchecked == logs haven't been fetched
This commit is contained in:
parent
c568fedd89
commit
13d503b851
@ -158,7 +158,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
|
|||||||
if recheckHeadersArg {
|
if recheckHeadersArg {
|
||||||
recheck = constants.HeaderRecheck
|
recheck = constants.HeaderRecheck
|
||||||
} else {
|
} else {
|
||||||
recheck = constants.HeaderMissing
|
recheck = constants.HeaderUnchecked
|
||||||
}
|
}
|
||||||
errs := make(chan error)
|
errs := make(chan error)
|
||||||
go w.Execute(recheck, errs)
|
go w.Execute(recheck, errs)
|
||||||
|
@ -20,6 +20,6 @@ type TransformerExecution bool
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
HeaderRecheck TransformerExecution = true
|
HeaderRecheck TransformerExecution = true
|
||||||
HeaderMissing TransformerExecution = false
|
HeaderUnchecked TransformerExecution = false
|
||||||
RecheckHeaderCap = int64(5)
|
RecheckHeaderCap = int64(5)
|
||||||
)
|
)
|
||||||
|
@ -31,8 +31,8 @@ import (
|
|||||||
var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor")
|
var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor")
|
||||||
|
|
||||||
const (
|
const (
|
||||||
missingHeadersFound = true
|
uncheckedHeadersFound = true
|
||||||
noMissingHeadersFound = false
|
noUncheckedHeadersFound = false
|
||||||
)
|
)
|
||||||
|
|
||||||
type ILogExtractor interface {
|
type ILogExtractor interface {
|
||||||
@ -74,47 +74,47 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran
|
|||||||
func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) {
|
func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) {
|
||||||
if len(extractor.Addresses) < 1 {
|
if len(extractor.Addresses) < 1 {
|
||||||
logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error())
|
logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error())
|
||||||
return ErrNoWatchedAddresses, noMissingHeadersFound
|
return ErrNoWatchedAddresses, noUncheckedHeadersFound
|
||||||
}
|
}
|
||||||
|
|
||||||
uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders))
|
uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders))
|
||||||
if uncheckedHeadersErr != nil {
|
if uncheckedHeadersErr != nil {
|
||||||
logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr)
|
logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr)
|
||||||
return uncheckedHeadersErr, noMissingHeadersFound
|
return uncheckedHeadersErr, noUncheckedHeadersFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(uncheckedHeaders) < 1 {
|
if len(uncheckedHeaders) < 1 {
|
||||||
return nil, noMissingHeadersFound
|
return nil, noUncheckedHeadersFound
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, header := range uncheckedHeaders {
|
for _, header := range uncheckedHeaders {
|
||||||
logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header)
|
logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header)
|
||||||
if fetchLogsErr != nil {
|
if fetchLogsErr != nil {
|
||||||
logError("error fetching logs for header: %s", fetchLogsErr, header)
|
logError("error fetching logs for header: %s", fetchLogsErr, header)
|
||||||
return fetchLogsErr, missingHeadersFound
|
return fetchLogsErr, uncheckedHeadersFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(logs) > 0 {
|
if len(logs) > 0 {
|
||||||
transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs)
|
transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs)
|
||||||
if transactionsSyncErr != nil {
|
if transactionsSyncErr != nil {
|
||||||
logError("error syncing transactions: %s", transactionsSyncErr, header)
|
logError("error syncing transactions: %s", transactionsSyncErr, header)
|
||||||
return transactionsSyncErr, missingHeadersFound
|
return transactionsSyncErr, uncheckedHeadersFound
|
||||||
}
|
}
|
||||||
|
|
||||||
createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs)
|
createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs)
|
||||||
if createLogsErr != nil {
|
if createLogsErr != nil {
|
||||||
logError("error persisting logs: %s", createLogsErr, header)
|
logError("error persisting logs: %s", createLogsErr, header)
|
||||||
return createLogsErr, missingHeadersFound
|
return createLogsErr, uncheckedHeadersFound
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id)
|
markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id)
|
||||||
if markHeaderCheckedErr != nil {
|
if markHeaderCheckedErr != nil {
|
||||||
logError("error marking header checked: %s", markHeaderCheckedErr, header)
|
logError("error marking header checked: %s", markHeaderCheckedErr, header)
|
||||||
return markHeaderCheckedErr, missingHeadersFound
|
return markHeaderCheckedErr, uncheckedHeadersFound
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, missingHeadersFound
|
return nil, uncheckedHeadersFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool {
|
func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool {
|
||||||
@ -130,7 +130,7 @@ func logError(description string, err error, header core.Header) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getCheckCount(recheckHeaders constants.TransformerExecution) int64 {
|
func getCheckCount(recheckHeaders constants.TransformerExecution) int64 {
|
||||||
if recheckHeaders == constants.HeaderMissing {
|
if recheckHeaders == constants.HeaderUnchecked {
|
||||||
return 1
|
return 1
|
||||||
} else {
|
} else {
|
||||||
return constants.RecheckHeaderCap
|
return constants.RecheckHeaderCap
|
||||||
|
@ -157,33 +157,33 @@ var _ = Describe("Log extractor", func() {
|
|||||||
|
|
||||||
Describe("ExtractLogs", func() {
|
Describe("ExtractLogs", func() {
|
||||||
It("returns error if no watched addresses configured", func() {
|
It("returns error if no watched addresses configured", func() {
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(logs.ErrNoWatchedAddresses))
|
Expect(err).To(MatchError(logs.ErrNoWatchedAddresses))
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("when checking missing headers", func() {
|
Describe("when checking unchecked headers", func() {
|
||||||
It("gets missing headers since configured starting block with check_count < 1", func() {
|
It("gets headers since configured starting block with check_count < 1", func() {
|
||||||
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
||||||
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}}
|
mockCheckedHeadersRepository.UncheckedHeadersReturnHeaders = []core.Header{{}}
|
||||||
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
||||||
startingBlockNumber := rand.Int63()
|
startingBlockNumber := rand.Int63()
|
||||||
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
|
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(mockCheckedHeadersRepository.MissingHeadersStartingBlockNumber).To(Equal(startingBlockNumber))
|
Expect(mockCheckedHeadersRepository.UncheckedHeadersStartingBlockNumber).To(Equal(startingBlockNumber))
|
||||||
Expect(mockCheckedHeadersRepository.MissingHeadersEndingBlockNumber).To(Equal(int64(-1)))
|
Expect(mockCheckedHeadersRepository.UncheckedHeadersEndingBlockNumber).To(Equal(int64(-1)))
|
||||||
Expect(mockCheckedHeadersRepository.MissingHeadersCheckCount).To(Equal(int64(1)))
|
Expect(mockCheckedHeadersRepository.UncheckedHeadersCheckCount).To(Equal(int64(1)))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("when rechecking headers", func() {
|
Describe("when rechecking headers", func() {
|
||||||
It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func() {
|
It("gets headers since configured starting block with check_count < RecheckHeaderCap", func() {
|
||||||
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
||||||
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}}
|
mockCheckedHeadersRepository.UncheckedHeadersReturnHeaders = []core.Header{{}}
|
||||||
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
||||||
startingBlockNumber := rand.Int63()
|
startingBlockNumber := rand.Int63()
|
||||||
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
|
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
|
||||||
@ -191,60 +191,61 @@ var _ = Describe("Log extractor", func() {
|
|||||||
err, _ := extractor.ExtractLogs(constants.HeaderRecheck)
|
err, _ := extractor.ExtractLogs(constants.HeaderRecheck)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(mockCheckedHeadersRepository.MissingHeadersStartingBlockNumber).To(Equal(startingBlockNumber))
|
Expect(mockCheckedHeadersRepository.UncheckedHeadersStartingBlockNumber).To(Equal(startingBlockNumber))
|
||||||
Expect(mockCheckedHeadersRepository.MissingHeadersEndingBlockNumber).To(Equal(int64(-1)))
|
Expect(mockCheckedHeadersRepository.UncheckedHeadersEndingBlockNumber).To(Equal(int64(-1)))
|
||||||
Expect(mockCheckedHeadersRepository.MissingHeadersCheckCount).To(Equal(constants.RecheckHeaderCap))
|
Expect(mockCheckedHeadersRepository.UncheckedHeadersCheckCount).To(Equal(constants.RecheckHeaderCap))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
It("emits error if getting missing headers fails", func() {
|
It("emits error if getting unchecked headers fails", func() {
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
||||||
mockCheckedHeadersRepository.MissingHeadersReturnError = fakes.FakeError
|
mockCheckedHeadersRepository.UncheckedHeadersReturnError = fakes.FakeError
|
||||||
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("when no missing headers", func() {
|
Describe("when no unchecked headers", func() {
|
||||||
It("does not fetch logs", func() {
|
It("does not fetch logs", func() {
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockLogFetcher := &mocks.MockLogFetcher{}
|
mockLogFetcher := &mocks.MockLogFetcher{}
|
||||||
extractor.Fetcher = mockLogFetcher
|
extractor.Fetcher = mockLogFetcher
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(mockLogFetcher.FetchCalled).To(BeFalse())
|
Expect(mockLogFetcher.FetchCalled).To(BeFalse())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("emits that no missing headers were found", func() {
|
It("emits that no unchecked headers were found", func() {
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockLogFetcher := &mocks.MockLogFetcher{}
|
mockLogFetcher := &mocks.MockLogFetcher{}
|
||||||
extractor.Fetcher = mockLogFetcher
|
extractor.Fetcher = mockLogFetcher
|
||||||
|
|
||||||
_, missingHeadersFound := extractor.ExtractLogs(constants.HeaderMissing)
|
_, uncheckedHeadersFound := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(missingHeadersFound).To(BeFalse())
|
Expect(uncheckedHeadersFound).To(BeFalse())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("when there are missing headers", func() {
|
Describe("when there are unchecked headers", func() {
|
||||||
It("fetches logs for missing headers", func() {
|
It("fetches logs for unchecked headers", func() {
|
||||||
addMissingHeader(extractor)
|
addUncheckedHeader(extractor)
|
||||||
config := transformer.EventTransformerConfig{
|
config := transformer.EventTransformerConfig{
|
||||||
ContractAddresses: []string{fakes.FakeAddress.Hex()},
|
ContractAddresses: []string{fakes.FakeAddress.Hex()},
|
||||||
Topic: fakes.FakeHash.Hex(),
|
Topic: fakes.FakeHash.Hex(),
|
||||||
StartingBlockNumber: rand.Int63(),
|
StartingBlockNumber: rand.Int63(),
|
||||||
}
|
}
|
||||||
extractor.AddTransformerConfig(config)
|
addTransformerErr := extractor.AddTransformerConfig(config)
|
||||||
|
Expect(addTransformerErr).NotTo(HaveOccurred())
|
||||||
mockLogFetcher := &mocks.MockLogFetcher{}
|
mockLogFetcher := &mocks.MockLogFetcher{}
|
||||||
extractor.Fetcher = mockLogFetcher
|
extractor.Fetcher = mockLogFetcher
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(mockLogFetcher.FetchCalled).To(BeTrue())
|
Expect(mockLogFetcher.FetchCalled).To(BeTrue())
|
||||||
@ -255,13 +256,13 @@ var _ = Describe("Log extractor", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("returns error if fetching logs fails", func() {
|
It("returns error if fetching logs fails", func() {
|
||||||
addMissingHeader(extractor)
|
addUncheckedHeader(extractor)
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockLogFetcher := &mocks.MockLogFetcher{}
|
mockLogFetcher := &mocks.MockLogFetcher{}
|
||||||
mockLogFetcher.ReturnError = fakes.FakeError
|
mockLogFetcher.ReturnError = fakes.FakeError
|
||||||
extractor.Fetcher = mockLogFetcher
|
extractor.Fetcher = mockLogFetcher
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
@ -269,12 +270,12 @@ var _ = Describe("Log extractor", func() {
|
|||||||
|
|
||||||
Describe("when no fetched logs", func() {
|
Describe("when no fetched logs", func() {
|
||||||
It("does not sync transactions", func() {
|
It("does not sync transactions", func() {
|
||||||
addMissingHeader(extractor)
|
addUncheckedHeader(extractor)
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
|
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
|
||||||
extractor.Syncer = mockTransactionSyncer
|
extractor.Syncer = mockTransactionSyncer
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse())
|
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse())
|
||||||
@ -283,34 +284,34 @@ var _ = Describe("Log extractor", func() {
|
|||||||
|
|
||||||
Describe("when there are fetched logs", func() {
|
Describe("when there are fetched logs", func() {
|
||||||
It("syncs transactions", func() {
|
It("syncs transactions", func() {
|
||||||
addMissingHeader(extractor)
|
addUncheckedHeader(extractor)
|
||||||
addFetchedLog(extractor)
|
addFetchedLog(extractor)
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
|
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
|
||||||
extractor.Syncer = mockTransactionSyncer
|
extractor.Syncer = mockTransactionSyncer
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue())
|
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("returns error if syncing transactions fails", func() {
|
It("returns error if syncing transactions fails", func() {
|
||||||
addMissingHeader(extractor)
|
addUncheckedHeader(extractor)
|
||||||
addFetchedLog(extractor)
|
addFetchedLog(extractor)
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
|
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
|
||||||
mockTransactionSyncer.SyncTransactionsError = fakes.FakeError
|
mockTransactionSyncer.SyncTransactionsError = fakes.FakeError
|
||||||
extractor.Syncer = mockTransactionSyncer
|
extractor.Syncer = mockTransactionSyncer
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("persists fetched logs", func() {
|
It("persists fetched logs", func() {
|
||||||
addMissingHeader(extractor)
|
addUncheckedHeader(extractor)
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
fakeLogs := []types.Log{{
|
fakeLogs := []types.Log{{
|
||||||
Address: common.HexToAddress("0xA"),
|
Address: common.HexToAddress("0xA"),
|
||||||
@ -323,21 +324,21 @@ var _ = Describe("Log extractor", func() {
|
|||||||
mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
|
mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
|
||||||
extractor.LogRepository = mockLogRepository
|
extractor.LogRepository = mockLogRepository
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs))
|
Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("returns error if persisting logs fails", func() {
|
It("returns error if persisting logs fails", func() {
|
||||||
addMissingHeader(extractor)
|
addUncheckedHeader(extractor)
|
||||||
addFetchedLog(extractor)
|
addFetchedLog(extractor)
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
|
mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
|
||||||
mockLogRepository.CreateError = fakes.FakeError
|
mockLogRepository.CreateError = fakes.FakeError
|
||||||
extractor.LogRepository = mockLogRepository
|
extractor.LogRepository = mockLogRepository
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
@ -349,10 +350,10 @@ var _ = Describe("Log extractor", func() {
|
|||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
||||||
headerID := rand.Int63()
|
headerID := rand.Int63()
|
||||||
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{Id: headerID}}
|
mockCheckedHeadersRepository.UncheckedHeadersReturnHeaders = []core.Header{{Id: headerID}}
|
||||||
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(mockCheckedHeadersRepository.MarkHeaderCheckedHeaderID).To(Equal(headerID))
|
Expect(mockCheckedHeadersRepository.MarkHeaderCheckedHeaderID).To(Equal(headerID))
|
||||||
@ -362,21 +363,21 @@ var _ = Describe("Log extractor", func() {
|
|||||||
addFetchedLog(extractor)
|
addFetchedLog(extractor)
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
||||||
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{Id: rand.Int63()}}
|
mockCheckedHeadersRepository.UncheckedHeadersReturnHeaders = []core.Header{{Id: rand.Int63()}}
|
||||||
mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError
|
mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError
|
||||||
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
||||||
|
|
||||||
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
|
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("emits that missing headers were found", func() {
|
It("emits that missing headers were found", func() {
|
||||||
addMissingHeader(extractor)
|
addUncheckedHeader(extractor)
|
||||||
addTransformerConfig(extractor)
|
addTransformerConfig(extractor)
|
||||||
|
|
||||||
err, missingHeadersFound := extractor.ExtractLogs(constants.HeaderMissing)
|
err, missingHeadersFound := extractor.ExtractLogs(constants.HeaderUnchecked)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(missingHeadersFound).To(BeTrue())
|
Expect(missingHeadersFound).To(BeTrue())
|
||||||
@ -394,9 +395,9 @@ func addTransformerConfig(extractor *logs.LogExtractor) {
|
|||||||
extractor.AddTransformerConfig(fakeConfig)
|
extractor.AddTransformerConfig(fakeConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func addMissingHeader(extractor *logs.LogExtractor) {
|
func addUncheckedHeader(extractor *logs.LogExtractor) {
|
||||||
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
|
||||||
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}}
|
mockCheckedHeadersRepository.UncheckedHeadersReturnHeaders = []core.Header{{}}
|
||||||
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ type MockLogExtractor struct {
|
|||||||
AddTransformerConfigError error
|
AddTransformerConfigError error
|
||||||
ExtractLogsCount int
|
ExtractLogsCount int
|
||||||
ExtractLogsErrors []error
|
ExtractLogsErrors []error
|
||||||
MissingHeadersExist []bool
|
UncheckedHeadersExist []bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error {
|
func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error {
|
||||||
@ -42,6 +42,6 @@ func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.Transfor
|
|||||||
return errorThisRun, false
|
return errorThisRun, false
|
||||||
}
|
}
|
||||||
var missingHeadersExist bool
|
var missingHeadersExist bool
|
||||||
missingHeadersExist, extractor.MissingHeadersExist = extractor.MissingHeadersExist[0], extractor.MissingHeadersExist[1:]
|
missingHeadersExist, extractor.UncheckedHeadersExist = extractor.UncheckedHeadersExist[0], extractor.UncheckedHeadersExist[1:]
|
||||||
return nil, missingHeadersExist
|
return nil, missingHeadersExist
|
||||||
}
|
}
|
||||||
|
@ -94,12 +94,12 @@ func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecuti
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) {
|
func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) {
|
||||||
err, missingHeadersFound := watcher.LogExtractor.ExtractLogs(recheckHeaders)
|
err, uncheckedHeadersFound := watcher.LogExtractor.ExtractLogs(recheckHeaders)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs <- err
|
errs <- err
|
||||||
}
|
}
|
||||||
|
|
||||||
if missingHeadersFound {
|
if uncheckedHeadersFound {
|
||||||
watcher.extractLogs(recheckHeaders, errs)
|
watcher.extractLogs(recheckHeaders, errs)
|
||||||
} else {
|
} else {
|
||||||
time.Sleep(NoNewDataPause)
|
time.Sleep(NoNewDataPause)
|
||||||
|
@ -88,9 +88,9 @@ var _ = Describe("Event Watcher", func() {
|
|||||||
delegator.DelegateErrors = []error{nil}
|
delegator.DelegateErrors = []error{nil}
|
||||||
delegator.LogsFound = []bool{false}
|
delegator.LogsFound = []bool{false}
|
||||||
extractor.ExtractLogsErrors = []error{nil}
|
extractor.ExtractLogsErrors = []error{nil}
|
||||||
extractor.MissingHeadersExist = []bool{false}
|
extractor.UncheckedHeadersExist = []bool{false}
|
||||||
|
|
||||||
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
|
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan)
|
||||||
|
|
||||||
Eventually(func() int {
|
Eventually(func() int {
|
||||||
return extractor.ExtractLogsCount
|
return extractor.ExtractLogsCount
|
||||||
@ -102,9 +102,9 @@ var _ = Describe("Event Watcher", func() {
|
|||||||
delegator.DelegateErrors = []error{nil}
|
delegator.DelegateErrors = []error{nil}
|
||||||
delegator.LogsFound = []bool{false}
|
delegator.LogsFound = []bool{false}
|
||||||
extractor.ExtractLogsErrors = []error{fakes.FakeError}
|
extractor.ExtractLogsErrors = []error{fakes.FakeError}
|
||||||
extractor.MissingHeadersExist = []bool{false}
|
extractor.UncheckedHeadersExist = []bool{false}
|
||||||
|
|
||||||
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
|
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan)
|
||||||
|
|
||||||
Expect(<-errsChan).To(MatchError(fakes.FakeError))
|
Expect(<-errsChan).To(MatchError(fakes.FakeError))
|
||||||
close(done)
|
close(done)
|
||||||
@ -114,9 +114,9 @@ var _ = Describe("Event Watcher", func() {
|
|||||||
delegator.DelegateErrors = []error{nil}
|
delegator.DelegateErrors = []error{nil}
|
||||||
delegator.LogsFound = []bool{false}
|
delegator.LogsFound = []bool{false}
|
||||||
extractor.ExtractLogsErrors = []error{nil, nil}
|
extractor.ExtractLogsErrors = []error{nil, nil}
|
||||||
extractor.MissingHeadersExist = []bool{true, false}
|
extractor.UncheckedHeadersExist = []bool{true, false}
|
||||||
|
|
||||||
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
|
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan)
|
||||||
|
|
||||||
Eventually(func() int {
|
Eventually(func() int {
|
||||||
return extractor.ExtractLogsCount
|
return extractor.ExtractLogsCount
|
||||||
@ -128,9 +128,9 @@ var _ = Describe("Event Watcher", func() {
|
|||||||
delegator.DelegateErrors = []error{nil}
|
delegator.DelegateErrors = []error{nil}
|
||||||
delegator.LogsFound = []bool{false}
|
delegator.LogsFound = []bool{false}
|
||||||
extractor.ExtractLogsErrors = []error{nil, fakes.FakeError}
|
extractor.ExtractLogsErrors = []error{nil, fakes.FakeError}
|
||||||
extractor.MissingHeadersExist = []bool{true, false}
|
extractor.UncheckedHeadersExist = []bool{true, false}
|
||||||
|
|
||||||
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
|
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan)
|
||||||
|
|
||||||
Expect(<-errsChan).To(MatchError(fakes.FakeError))
|
Expect(<-errsChan).To(MatchError(fakes.FakeError))
|
||||||
close(done)
|
close(done)
|
||||||
@ -141,9 +141,9 @@ var _ = Describe("Event Watcher", func() {
|
|||||||
delegator.DelegateErrors = []error{nil}
|
delegator.DelegateErrors = []error{nil}
|
||||||
delegator.LogsFound = []bool{false}
|
delegator.LogsFound = []bool{false}
|
||||||
extractor.ExtractLogsErrors = []error{nil}
|
extractor.ExtractLogsErrors = []error{nil}
|
||||||
extractor.MissingHeadersExist = []bool{false}
|
extractor.UncheckedHeadersExist = []bool{false}
|
||||||
|
|
||||||
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
|
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan)
|
||||||
|
|
||||||
Eventually(func() int {
|
Eventually(func() int {
|
||||||
return delegator.DelegateCallCount
|
return delegator.DelegateCallCount
|
||||||
@ -155,9 +155,9 @@ var _ = Describe("Event Watcher", func() {
|
|||||||
delegator.LogsFound = []bool{false}
|
delegator.LogsFound = []bool{false}
|
||||||
delegator.DelegateErrors = []error{fakes.FakeError}
|
delegator.DelegateErrors = []error{fakes.FakeError}
|
||||||
extractor.ExtractLogsErrors = []error{nil}
|
extractor.ExtractLogsErrors = []error{nil}
|
||||||
extractor.MissingHeadersExist = []bool{false}
|
extractor.UncheckedHeadersExist = []bool{false}
|
||||||
|
|
||||||
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
|
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan)
|
||||||
|
|
||||||
Expect(<-errsChan).To(MatchError(fakes.FakeError))
|
Expect(<-errsChan).To(MatchError(fakes.FakeError))
|
||||||
close(done)
|
close(done)
|
||||||
@ -167,9 +167,9 @@ var _ = Describe("Event Watcher", func() {
|
|||||||
delegator.DelegateErrors = []error{nil, nil}
|
delegator.DelegateErrors = []error{nil, nil}
|
||||||
delegator.LogsFound = []bool{true, false}
|
delegator.LogsFound = []bool{true, false}
|
||||||
extractor.ExtractLogsErrors = []error{nil}
|
extractor.ExtractLogsErrors = []error{nil}
|
||||||
extractor.MissingHeadersExist = []bool{false}
|
extractor.UncheckedHeadersExist = []bool{false}
|
||||||
|
|
||||||
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
|
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan)
|
||||||
|
|
||||||
Eventually(func() int {
|
Eventually(func() int {
|
||||||
return delegator.DelegateCallCount
|
return delegator.DelegateCallCount
|
||||||
@ -181,9 +181,9 @@ var _ = Describe("Event Watcher", func() {
|
|||||||
delegator.DelegateErrors = []error{nil, fakes.FakeError}
|
delegator.DelegateErrors = []error{nil, fakes.FakeError}
|
||||||
delegator.LogsFound = []bool{true, false}
|
delegator.LogsFound = []bool{true, false}
|
||||||
extractor.ExtractLogsErrors = []error{nil}
|
extractor.ExtractLogsErrors = []error{nil}
|
||||||
extractor.MissingHeadersExist = []bool{false}
|
extractor.UncheckedHeadersExist = []bool{false}
|
||||||
|
|
||||||
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
|
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan)
|
||||||
|
|
||||||
Expect(<-errsChan).To(MatchError(fakes.FakeError))
|
Expect(<-errsChan).To(MatchError(fakes.FakeError))
|
||||||
close(done)
|
close(done)
|
||||||
|
@ -26,11 +26,11 @@ type MockCheckedHeadersRepository struct {
|
|||||||
MarkHeadersUncheckedCalled bool
|
MarkHeadersUncheckedCalled bool
|
||||||
MarkHeadersUncheckedReturnError error
|
MarkHeadersUncheckedReturnError error
|
||||||
MarkHeadersUncheckedStartingBlockNumber int64
|
MarkHeadersUncheckedStartingBlockNumber int64
|
||||||
MissingHeadersCheckCount int64
|
UncheckedHeadersCheckCount int64
|
||||||
MissingHeadersEndingBlockNumber int64
|
UncheckedHeadersEndingBlockNumber int64
|
||||||
MissingHeadersReturnError error
|
UncheckedHeadersReturnError error
|
||||||
MissingHeadersReturnHeaders []core.Header
|
UncheckedHeadersReturnHeaders []core.Header
|
||||||
MissingHeadersStartingBlockNumber int64
|
UncheckedHeadersStartingBlockNumber int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *MockCheckedHeadersRepository) MarkHeadersUnchecked(startingBlockNumber int64) error {
|
func (repository *MockCheckedHeadersRepository) MarkHeadersUnchecked(startingBlockNumber int64) error {
|
||||||
@ -45,8 +45,8 @@ func (repository *MockCheckedHeadersRepository) MarkHeaderChecked(headerID int64
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository *MockCheckedHeadersRepository) UncheckedHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) {
|
func (repository *MockCheckedHeadersRepository) UncheckedHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) {
|
||||||
repository.MissingHeadersStartingBlockNumber = startingBlockNumber
|
repository.UncheckedHeadersStartingBlockNumber = startingBlockNumber
|
||||||
repository.MissingHeadersEndingBlockNumber = endingBlockNumber
|
repository.UncheckedHeadersEndingBlockNumber = endingBlockNumber
|
||||||
repository.MissingHeadersCheckCount = checkCount
|
repository.UncheckedHeadersCheckCount = checkCount
|
||||||
return repository.MissingHeadersReturnHeaders, repository.MissingHeadersReturnError
|
return repository.UncheckedHeadersReturnHeaders, repository.UncheckedHeadersReturnError
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user