Return error when no logs/headers available

- Replaces bool and moots question of error/bool ordering
- Also make event watcher execution synchronous
This commit is contained in:
Rob Mulholand 2019-09-18 20:55:15 -05:00
parent 2b798e00e0
commit 4fa19be90a
9 changed files with 137 additions and 164 deletions

View File

@ -160,13 +160,9 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
} else { } else {
recheck = constants.HeaderUnchecked recheck = constants.HeaderUnchecked
} }
errs := make(chan error) err := w.Execute(recheck)
go w.Execute(recheck, errs) if err != nil {
for { LogWithCommand.Fatalf("error executing event watcher: %s", err.Error())
select {
case err := <-errs:
LogWithCommand.Fatalf("error executing event watcher: %s", err.Error())
}
} }
} }

View File

@ -25,16 +25,14 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
) )
var ErrNoTransformers = errors.New("no event transformers configured in the log delegator") var (
ErrNoLogs = errors.New("no logs available for transforming")
const ( ErrNoTransformers = errors.New("no event transformers configured in the log delegator")
logsFound = true
noLogsFound = false
) )
type ILogDelegator interface { type ILogDelegator interface {
AddTransformer(t transformer.EventTransformer) AddTransformer(t transformer.EventTransformer)
DelegateLogs() (error, bool) DelegateLogs() error
} }
type LogDelegator struct { type LogDelegator struct {
@ -48,28 +46,28 @@ func (delegator *LogDelegator) AddTransformer(t transformer.EventTransformer) {
delegator.Chunker.AddConfig(t.GetConfig()) delegator.Chunker.AddConfig(t.GetConfig())
} }
func (delegator *LogDelegator) DelegateLogs() (error, bool) { func (delegator *LogDelegator) DelegateLogs() error {
if len(delegator.Transformers) < 1 { if len(delegator.Transformers) < 1 {
return ErrNoTransformers, noLogsFound return ErrNoTransformers
} }
persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs() persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs()
if fetchErr != nil { if fetchErr != nil {
logrus.Errorf("error loading logs from db: %s", fetchErr.Error()) logrus.Errorf("error loading logs from db: %s", fetchErr.Error())
return fetchErr, noLogsFound return fetchErr
} }
if len(persistedLogs) < 1 { if len(persistedLogs) < 1 {
return nil, noLogsFound return ErrNoLogs
} }
transformErr := delegator.delegateLogs(persistedLogs) transformErr := delegator.delegateLogs(persistedLogs)
if transformErr != nil { if transformErr != nil {
logrus.Errorf("error transforming logs: %s", transformErr) logrus.Errorf("error transforming logs: %s", transformErr)
return transformErr, logsFound return transformErr
} }
return nil, logsFound return nil
} }
func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error { func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error {

View File

@ -59,10 +59,10 @@ var _ = Describe("Log delegator", func() {
}) })
Describe("DelegateLogs", func() { Describe("DelegateLogs", func() {
It("returns an error if no transformers configured", func() { It("returns error if no transformers configured", func() {
delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{})
err, _ := delegator.DelegateLogs() err := delegator.DelegateLogs()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(logs.ErrNoTransformers)) Expect(err).To(MatchError(logs.ErrNoTransformers))
@ -70,35 +70,36 @@ var _ = Describe("Log delegator", func() {
It("gets untransformed logs", func() { It("gets untransformed logs", func() {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}}
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(&mocks.MockEventTransformer{}) delegator.AddTransformer(&mocks.MockEventTransformer{})
err, _ := delegator.DelegateLogs() err := delegator.DelegateLogs()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockLogRepository.GetCalled).To(BeTrue()) Expect(mockLogRepository.GetCalled).To(BeTrue())
}) })
It("emits error if getting untransformed logs fails", func() { It("returns error if getting untransformed logs fails", func() {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.GetError = fakes.FakeError mockLogRepository.GetError = fakes.FakeError
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(&mocks.MockEventTransformer{}) delegator.AddTransformer(&mocks.MockEventTransformer{})
err, _ := delegator.DelegateLogs() err := delegator.DelegateLogs()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("emits that no logs were found if no logs returned", func() { It("returns error that no logs were found if no logs returned", func() {
delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{})
delegator.AddTransformer(&mocks.MockEventTransformer{}) delegator.AddTransformer(&mocks.MockEventTransformer{})
err, logsFound := delegator.DelegateLogs() err := delegator.DelegateLogs()
Expect(err).NotTo(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(logsFound).To(BeFalse()) Expect(err).To(MatchError(logs.ErrNoLogs))
}) })
It("delegates chunked logs to transformers", func() { It("delegates chunked logs to transformers", func() {
@ -115,27 +116,27 @@ var _ = Describe("Log delegator", func() {
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(fakeTransformer) delegator.AddTransformer(fakeTransformer)
err, _ := delegator.DelegateLogs() err := delegator.DelegateLogs()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fakeTransformer.ExecuteWasCalled).To(BeTrue()) Expect(fakeTransformer.ExecuteWasCalled).To(BeTrue())
Expect(fakeTransformer.PassedLogs).To(Equal(fakeHeaderSyncLogs)) Expect(fakeTransformer.PassedLogs).To(Equal(fakeHeaderSyncLogs))
}) })
It("emits error if transformer returns an error", func() { It("returns error if transformer returns an error", func() {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}} mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}}
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError} fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError}
delegator.AddTransformer(fakeTransformer) delegator.AddTransformer(fakeTransformer)
err, _ := delegator.DelegateLogs() err := delegator.DelegateLogs()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("emits logs found when logs returned and delegated", func() { It("returns nil for error when logs returned and delegated", func() {
fakeTransformer := &mocks.MockEventTransformer{} fakeTransformer := &mocks.MockEventTransformer{}
config := mocks.FakeTransformerConfig config := mocks.FakeTransformerConfig
fakeTransformer.SetTransformerConfig(config) fakeTransformer.SetTransformerConfig(config)
@ -149,10 +150,9 @@ var _ = Describe("Log delegator", func() {
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(fakeTransformer) delegator.AddTransformer(fakeTransformer)
err, logsFound := delegator.DelegateLogs() err := delegator.DelegateLogs()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(logsFound).To(BeTrue())
}) })
}) })
}) })

View File

@ -28,16 +28,14 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
) )
var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor") var (
ErrNoUncheckedHeaders = errors.New("no unchecked headers available for log fetching")
const ( ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor")
uncheckedHeadersFound = true
noUncheckedHeadersFound = false
) )
type ILogExtractor interface { type ILogExtractor interface {
AddTransformerConfig(config transformer.EventTransformerConfig) error AddTransformerConfig(config transformer.EventTransformerConfig) error
ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) ExtractLogs(recheckHeaders constants.TransformerExecution) error
} }
type LogExtractor struct { type LogExtractor struct {
@ -71,50 +69,50 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran
} }
// Fetch and persist watched logs // Fetch and persist watched logs
func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) { func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error {
if len(extractor.Addresses) < 1 { if len(extractor.Addresses) < 1 {
logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error())
return ErrNoWatchedAddresses, noUncheckedHeadersFound return ErrNoWatchedAddresses
} }
uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders)) uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders))
if uncheckedHeadersErr != nil { if uncheckedHeadersErr != nil {
logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr) logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr)
return uncheckedHeadersErr, noUncheckedHeadersFound return uncheckedHeadersErr
} }
if len(uncheckedHeaders) < 1 { if len(uncheckedHeaders) < 1 {
return nil, noUncheckedHeadersFound return ErrNoUncheckedHeaders
} }
for _, header := range uncheckedHeaders { for _, header := range uncheckedHeaders {
logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header)
if fetchLogsErr != nil { if fetchLogsErr != nil {
logError("error fetching logs for header: %s", fetchLogsErr, header) logError("error fetching logs for header: %s", fetchLogsErr, header)
return fetchLogsErr, uncheckedHeadersFound return fetchLogsErr
} }
if len(logs) > 0 { if len(logs) > 0 {
transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs) transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs)
if transactionsSyncErr != nil { if transactionsSyncErr != nil {
logError("error syncing transactions: %s", transactionsSyncErr, header) logError("error syncing transactions: %s", transactionsSyncErr, header)
return transactionsSyncErr, uncheckedHeadersFound return transactionsSyncErr
} }
createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs) createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs)
if createLogsErr != nil { if createLogsErr != nil {
logError("error persisting logs: %s", createLogsErr, header) logError("error persisting logs: %s", createLogsErr, header)
return createLogsErr, uncheckedHeadersFound return createLogsErr
} }
} }
markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id) markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id)
if markHeaderCheckedErr != nil { if markHeaderCheckedErr != nil {
logError("error marking header checked: %s", markHeaderCheckedErr, header) logError("error marking header checked: %s", markHeaderCheckedErr, header)
return markHeaderCheckedErr, uncheckedHeadersFound return markHeaderCheckedErr
} }
} }
return nil, uncheckedHeadersFound return nil
} }
func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool {

View File

@ -157,7 +157,7 @@ var _ = Describe("Log extractor", func() {
Describe("ExtractLogs", func() { Describe("ExtractLogs", func() {
It("returns error if no watched addresses configured", func() { It("returns error if no watched addresses configured", func() {
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(logs.ErrNoWatchedAddresses)) Expect(err).To(MatchError(logs.ErrNoWatchedAddresses))
@ -171,7 +171,7 @@ var _ = Describe("Log extractor", func() {
startingBlockNumber := rand.Int63() startingBlockNumber := rand.Int63()
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockCheckedHeadersRepository.UncheckedHeadersStartingBlockNumber).To(Equal(startingBlockNumber)) Expect(mockCheckedHeadersRepository.UncheckedHeadersStartingBlockNumber).To(Equal(startingBlockNumber))
@ -188,7 +188,7 @@ var _ = Describe("Log extractor", func() {
startingBlockNumber := rand.Int63() startingBlockNumber := rand.Int63()
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
err, _ := extractor.ExtractLogs(constants.HeaderRecheck) err := extractor.ExtractLogs(constants.HeaderRecheck)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockCheckedHeadersRepository.UncheckedHeadersStartingBlockNumber).To(Equal(startingBlockNumber)) Expect(mockCheckedHeadersRepository.UncheckedHeadersStartingBlockNumber).To(Equal(startingBlockNumber))
@ -197,13 +197,13 @@ var _ = Describe("Log extractor", func() {
}) })
}) })
It("emits error if getting unchecked headers fails", func() { It("returns error if getting unchecked headers fails", func() {
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.UncheckedHeadersReturnError = fakes.FakeError mockCheckedHeadersRepository.UncheckedHeadersReturnError = fakes.FakeError
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
@ -215,20 +215,19 @@ var _ = Describe("Log extractor", func() {
mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) _ = extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).NotTo(HaveOccurred())
Expect(mockLogFetcher.FetchCalled).To(BeFalse()) Expect(mockLogFetcher.FetchCalled).To(BeFalse())
}) })
It("emits that no unchecked headers were found", func() { It("returns error that no unchecked headers were found", func() {
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
_, uncheckedHeadersFound := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(uncheckedHeadersFound).To(BeFalse()) Expect(err).To(MatchError(logs.ErrNoUncheckedHeaders))
}) })
}) })
@ -245,7 +244,7 @@ var _ = Describe("Log extractor", func() {
mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockLogFetcher.FetchCalled).To(BeTrue()) Expect(mockLogFetcher.FetchCalled).To(BeTrue())
@ -262,7 +261,7 @@ var _ = Describe("Log extractor", func() {
mockLogFetcher.ReturnError = fakes.FakeError mockLogFetcher.ReturnError = fakes.FakeError
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
@ -275,7 +274,7 @@ var _ = Describe("Log extractor", func() {
mockTransactionSyncer := &fakes.MockTransactionSyncer{} mockTransactionSyncer := &fakes.MockTransactionSyncer{}
extractor.Syncer = mockTransactionSyncer extractor.Syncer = mockTransactionSyncer
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse()) Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse())
@ -290,7 +289,7 @@ var _ = Describe("Log extractor", func() {
mockTransactionSyncer := &fakes.MockTransactionSyncer{} mockTransactionSyncer := &fakes.MockTransactionSyncer{}
extractor.Syncer = mockTransactionSyncer extractor.Syncer = mockTransactionSyncer
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue()) Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue())
@ -304,7 +303,7 @@ var _ = Describe("Log extractor", func() {
mockTransactionSyncer.SyncTransactionsError = fakes.FakeError mockTransactionSyncer.SyncTransactionsError = fakes.FakeError
extractor.Syncer = mockTransactionSyncer extractor.Syncer = mockTransactionSyncer
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
@ -324,7 +323,7 @@ var _ = Describe("Log extractor", func() {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
extractor.LogRepository = mockLogRepository extractor.LogRepository = mockLogRepository
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs)) Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs))
@ -338,7 +337,7 @@ var _ = Describe("Log extractor", func() {
mockLogRepository.CreateError = fakes.FakeError mockLogRepository.CreateError = fakes.FakeError
extractor.LogRepository = mockLogRepository extractor.LogRepository = mockLogRepository
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
@ -353,7 +352,7 @@ var _ = Describe("Log extractor", func() {
mockCheckedHeadersRepository.UncheckedHeadersReturnHeaders = []core.Header{{Id: headerID}} mockCheckedHeadersRepository.UncheckedHeadersReturnHeaders = []core.Header{{Id: headerID}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(mockCheckedHeadersRepository.MarkHeaderCheckedHeaderID).To(Equal(headerID)) Expect(mockCheckedHeadersRepository.MarkHeaderCheckedHeaderID).To(Equal(headerID))
@ -367,20 +366,19 @@ var _ = Describe("Log extractor", func() {
mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("emits that missing headers were found", func() { It("returns nil for error if everything succeeds", func() {
addUncheckedHeader(extractor) addUncheckedHeader(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
err, missingHeadersFound := extractor.ExtractLogs(constants.HeaderUnchecked) err := extractor.ExtractLogs(constants.HeaderUnchecked)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(missingHeadersFound).To(BeTrue())
}) })
}) })
}) })

View File

@ -24,21 +24,22 @@ type MockLogDelegator struct {
AddedTransformers []transformer.EventTransformer AddedTransformers []transformer.EventTransformer
DelegateCallCount int DelegateCallCount int
DelegateErrors []error DelegateErrors []error
LogsFound []bool
} }
func (delegator *MockLogDelegator) AddTransformer(t transformer.EventTransformer) { func (delegator *MockLogDelegator) AddTransformer(t transformer.EventTransformer) {
delegator.AddedTransformers = append(delegator.AddedTransformers, t) delegator.AddedTransformers = append(delegator.AddedTransformers, t)
} }
func (delegator *MockLogDelegator) DelegateLogs() (error, bool) { func (delegator *MockLogDelegator) DelegateLogs() error {
delegator.DelegateCallCount++ delegator.DelegateCallCount++
var delegateErrorThisRun error if len(delegator.DelegateErrors) > 1 {
delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:] var delegateErrorThisRun error
if delegateErrorThisRun != nil { delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:]
return delegateErrorThisRun, false return delegateErrorThisRun
} else if len(delegator.DelegateErrors) == 1 {
thisErr := delegator.DelegateErrors[0]
delegator.DelegateErrors = []error{}
return thisErr
} }
var logsFoundThisRun bool return nil
logsFoundThisRun, delegator.LogsFound = delegator.LogsFound[0], delegator.LogsFound[1:]
return nil, logsFoundThisRun
} }

View File

@ -26,7 +26,6 @@ type MockLogExtractor struct {
AddTransformerConfigError error AddTransformerConfigError error
ExtractLogsCount int ExtractLogsCount int
ExtractLogsErrors []error ExtractLogsErrors []error
UncheckedHeadersExist []bool
} }
func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error { func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error {
@ -34,14 +33,16 @@ func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.Event
return extractor.AddTransformerConfigError return extractor.AddTransformerConfigError
} }
func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) { func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error {
extractor.ExtractLogsCount++ extractor.ExtractLogsCount++
var errorThisRun error if len(extractor.ExtractLogsErrors) > 1 {
errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:] var errorThisRun error
if errorThisRun != nil { errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:]
return errorThisRun, false return errorThisRun
} else if len(extractor.ExtractLogsErrors) == 1 {
thisErr := extractor.ExtractLogsErrors[0]
extractor.ExtractLogsErrors = []error{}
return thisErr
} }
var missingHeadersExist bool return nil
missingHeadersExist, extractor.UncheckedHeadersExist = extractor.UncheckedHeadersExist[0], extractor.UncheckedHeadersExist[1:]
return nil, missingHeadersExist
} }

View File

@ -74,49 +74,53 @@ func (watcher *EventWatcher) AddTransformers(initializers []transformer.EventTra
} }
// Extracts and delegates watched log events. // Extracts and delegates watched log events.
func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecution, errsChan chan error) { func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecution) error {
extractErrsChan := make(chan error)
delegateErrsChan := make(chan error) delegateErrsChan := make(chan error)
extractErrsChan := make(chan error)
defer close(delegateErrsChan)
defer close(extractErrsChan)
go watcher.extractLogs(recheckHeaders, extractErrsChan) go watcher.extractLogs(recheckHeaders, extractErrsChan)
go watcher.delegateLogs(delegateErrsChan) go watcher.delegateLogs(delegateErrsChan)
for { for {
select { select {
case extractErr := <-extractErrsChan:
logrus.Errorf("error extracting logs in event watcher: %s", extractErr.Error())
errsChan <- extractErr
case delegateErr := <-delegateErrsChan: case delegateErr := <-delegateErrsChan:
logrus.Errorf("error delegating logs in event watcher: %s", delegateErr.Error()) logrus.Errorf("error delegating logs in event watcher: %s", delegateErr.Error())
errsChan <- delegateErr return delegateErr
case extractErr := <-extractErrsChan:
logrus.Errorf("error extracting logs in event watcher: %s", extractErr.Error())
return extractErr
} }
} }
} }
func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) { func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) {
err, uncheckedHeadersFound := watcher.LogExtractor.ExtractLogs(recheckHeaders) err := watcher.LogExtractor.ExtractLogs(recheckHeaders)
if err != nil { if err != nil && err != logs.ErrNoUncheckedHeaders {
errs <- err errs <- err
return
} }
if uncheckedHeadersFound { if err == logs.ErrNoUncheckedHeaders {
time.Sleep(NoNewDataPause)
watcher.extractLogs(recheckHeaders, errs) watcher.extractLogs(recheckHeaders, errs)
} else { } else {
time.Sleep(NoNewDataPause)
watcher.extractLogs(recheckHeaders, errs) watcher.extractLogs(recheckHeaders, errs)
} }
} }
func (watcher *EventWatcher) delegateLogs(errs chan error) { func (watcher *EventWatcher) delegateLogs(errs chan error) {
err, logsFound := watcher.LogDelegator.DelegateLogs() err := watcher.LogDelegator.DelegateLogs()
if err != nil { if err != nil && err != logs.ErrNoLogs {
errs <- err errs <- err
return
} }
if logsFound { if err == logs.ErrNoLogs {
time.Sleep(NoNewDataPause)
watcher.delegateLogs(errs) watcher.delegateLogs(errs)
} else { } else {
time.Sleep(NoNewDataPause)
watcher.delegateLogs(errs) watcher.delegateLogs(errs)
} }
} }

View File

@ -17,6 +17,7 @@
package watcher_test package watcher_test
import ( import (
"errors"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"
@ -26,6 +27,8 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
) )
var errExecuteClosed = errors.New("this error means the mocks were finished executing")
var _ = Describe("Event Watcher", func() { var _ = Describe("Event Watcher", func() {
var ( var (
delegator *mocks.MockLogDelegator delegator *mocks.MockLogDelegator
@ -57,7 +60,8 @@ var _ = Describe("Event Watcher", func() {
fakeTransformerTwo.FakeTransformerInitializer, fakeTransformerTwo.FakeTransformerInitializer,
} }
eventWatcher.AddTransformers(initializers) err := eventWatcher.AddTransformers(initializers)
Expect(err).NotTo(HaveOccurred())
}) })
It("adds initialized transformer to log delegator", func() { It("adds initialized transformer to log delegator", func() {
@ -78,114 +82,87 @@ var _ = Describe("Event Watcher", func() {
}) })
Describe("Execute", func() { Describe("Execute", func() {
var errsChan chan error
BeforeEach(func() {
errsChan = make(chan error)
})
It("extracts watched logs", func(done Done) { It("extracts watched logs", func(done Done) {
delegator.DelegateErrors = []error{nil} extractor.ExtractLogsErrors = []error{nil, errExecuteClosed}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{nil}
extractor.UncheckedHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) err := eventWatcher.Execute(constants.HeaderUnchecked)
Eventually(func() int { Expect(err).To(MatchError(errExecuteClosed))
return extractor.ExtractLogsCount Eventually(func() bool {
}).Should(Equal(1)) return extractor.ExtractLogsCount > 0
}).Should(BeTrue())
close(done) close(done)
}) })
It("returns error if extracting logs fails", func(done Done) { It("returns error if extracting logs fails", func(done Done) {
delegator.DelegateErrors = []error{nil}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{fakes.FakeError} extractor.ExtractLogsErrors = []error{fakes.FakeError}
extractor.UncheckedHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) err := eventWatcher.Execute(constants.HeaderUnchecked)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
close(done) close(done)
}) })
It("extracts watched logs again if missing headers found", func(done Done) { It("extracts watched logs again if missing headers found", func(done Done) {
delegator.DelegateErrors = []error{nil} extractor.ExtractLogsErrors = []error{nil, errExecuteClosed}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{nil, nil}
extractor.UncheckedHeadersExist = []bool{true, false}
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) err := eventWatcher.Execute(constants.HeaderUnchecked)
Eventually(func() int { Expect(err).To(MatchError(errExecuteClosed))
return extractor.ExtractLogsCount Eventually(func() bool {
}).Should(Equal(2)) return extractor.ExtractLogsCount > 1
}).Should(BeTrue())
close(done) close(done)
}) })
It("returns error if extracting logs fails on subsequent run", func(done Done) { It("returns error if extracting logs fails on subsequent run", func(done Done) {
delegator.DelegateErrors = []error{nil}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{nil, fakes.FakeError} extractor.ExtractLogsErrors = []error{nil, fakes.FakeError}
extractor.UncheckedHeadersExist = []bool{true, false}
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) err := eventWatcher.Execute(constants.HeaderUnchecked)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
close(done) close(done)
}) })
It("delegates untransformed logs", func(done Done) { It("delegates untransformed logs", func() {
delegator.DelegateErrors = []error{nil} delegator.DelegateErrors = []error{nil, errExecuteClosed}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{nil}
extractor.UncheckedHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) err := eventWatcher.Execute(constants.HeaderUnchecked)
Eventually(func() int { Expect(err).To(MatchError(errExecuteClosed))
return delegator.DelegateCallCount Eventually(func() bool {
}).Should(Equal(1)) return delegator.DelegateCallCount > 0
close(done) }).Should(BeTrue())
}) })
It("returns error if delegating logs fails", func(done Done) { It("returns error if delegating logs fails", func(done Done) {
delegator.LogsFound = []bool{false}
delegator.DelegateErrors = []error{fakes.FakeError} delegator.DelegateErrors = []error{fakes.FakeError}
extractor.ExtractLogsErrors = []error{nil}
extractor.UncheckedHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) err := eventWatcher.Execute(constants.HeaderUnchecked)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
close(done) close(done)
}) })
It("delegates logs again if untransformed logs found", func(done Done) { It("delegates logs again if untransformed logs found", func(done Done) {
delegator.DelegateErrors = []error{nil, nil} delegator.DelegateErrors = []error{nil, nil, nil, errExecuteClosed}
delegator.LogsFound = []bool{true, false}
extractor.ExtractLogsErrors = []error{nil}
extractor.UncheckedHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) err := eventWatcher.Execute(constants.HeaderUnchecked)
Eventually(func() int { Expect(err).To(MatchError(errExecuteClosed))
return delegator.DelegateCallCount Eventually(func() bool {
}).Should(Equal(2)) return delegator.DelegateCallCount > 1
}).Should(BeTrue())
close(done) close(done)
}) })
It("returns error if delegating logs fails on subsequent run", func(done Done) { It("returns error if delegating logs fails on subsequent run", func(done Done) {
delegator.DelegateErrors = []error{nil, fakes.FakeError} delegator.DelegateErrors = []error{nil, fakes.FakeError}
delegator.LogsFound = []bool{true, false}
extractor.ExtractLogsErrors = []error{nil}
extractor.UncheckedHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) err := eventWatcher.Execute(constants.HeaderUnchecked)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
close(done) close(done)
}) })
}) })