diff --git a/cmd/execute.go b/cmd/execute.go index 72adf856..b416cbb5 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -160,13 +160,9 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { } else { recheck = constants.HeaderUnchecked } - errs := make(chan error) - go w.Execute(recheck, errs) - for { - select { - case err := <-errs: - LogWithCommand.Fatalf("error executing event watcher: %s", err.Error()) - } + err := w.Execute(recheck) + if err != nil { + LogWithCommand.Fatalf("error executing event watcher: %s", err.Error()) } } diff --git a/libraries/shared/logs/delegator.go b/libraries/shared/logs/delegator.go index 7d2b2a5c..79b398ad 100644 --- a/libraries/shared/logs/delegator.go +++ b/libraries/shared/logs/delegator.go @@ -25,16 +25,14 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore" ) -var ErrNoTransformers = errors.New("no event transformers configured in the log delegator") - -const ( - logsFound = true - noLogsFound = false +var ( + ErrNoLogs = errors.New("no logs available for transforming") + ErrNoTransformers = errors.New("no event transformers configured in the log delegator") ) type ILogDelegator interface { AddTransformer(t transformer.EventTransformer) - DelegateLogs() (error, bool) + DelegateLogs() error } type LogDelegator struct { @@ -48,28 +46,28 @@ func (delegator *LogDelegator) AddTransformer(t transformer.EventTransformer) { delegator.Chunker.AddConfig(t.GetConfig()) } -func (delegator *LogDelegator) DelegateLogs() (error, bool) { +func (delegator *LogDelegator) DelegateLogs() error { if len(delegator.Transformers) < 1 { - return ErrNoTransformers, noLogsFound + return ErrNoTransformers } persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs() if fetchErr != nil { logrus.Errorf("error loading logs from db: %s", fetchErr.Error()) - return fetchErr, noLogsFound + return fetchErr } if len(persistedLogs) < 1 { - return nil, noLogsFound + return ErrNoLogs } transformErr := delegator.delegateLogs(persistedLogs) if transformErr != nil { logrus.Errorf("error transforming logs: %s", transformErr) - return transformErr, logsFound + return transformErr } - return nil, logsFound + return nil } func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error { diff --git a/libraries/shared/logs/delegator_test.go b/libraries/shared/logs/delegator_test.go index f2bd581c..43813be8 100644 --- a/libraries/shared/logs/delegator_test.go +++ b/libraries/shared/logs/delegator_test.go @@ -59,10 +59,10 @@ var _ = Describe("Log delegator", func() { }) Describe("DelegateLogs", func() { - It("returns an error if no transformers configured", func() { + It("returns error if no transformers configured", func() { delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) - err, _ := delegator.DelegateLogs() + err := delegator.DelegateLogs() Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(logs.ErrNoTransformers)) @@ -70,35 +70,36 @@ var _ = Describe("Log delegator", func() { It("gets untransformed logs", func() { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} + mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}} delegator := newDelegator(mockLogRepository) delegator.AddTransformer(&mocks.MockEventTransformer{}) - err, _ := delegator.DelegateLogs() + err := delegator.DelegateLogs() Expect(err).NotTo(HaveOccurred()) Expect(mockLogRepository.GetCalled).To(BeTrue()) }) - It("emits error if getting untransformed logs fails", func() { + It("returns error if getting untransformed logs fails", func() { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository.GetError = fakes.FakeError delegator := newDelegator(mockLogRepository) delegator.AddTransformer(&mocks.MockEventTransformer{}) - err, _ := delegator.DelegateLogs() + err := delegator.DelegateLogs() Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) }) - It("emits that no logs were found if no logs returned", func() { + It("returns error that no logs were found if no logs returned", func() { delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) delegator.AddTransformer(&mocks.MockEventTransformer{}) - err, logsFound := delegator.DelegateLogs() + err := delegator.DelegateLogs() - Expect(err).NotTo(HaveOccurred()) - Expect(logsFound).To(BeFalse()) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(logs.ErrNoLogs)) }) It("delegates chunked logs to transformers", func() { @@ -115,27 +116,27 @@ var _ = Describe("Log delegator", func() { delegator := newDelegator(mockLogRepository) delegator.AddTransformer(fakeTransformer) - err, _ := delegator.DelegateLogs() + err := delegator.DelegateLogs() Expect(err).NotTo(HaveOccurred()) Expect(fakeTransformer.ExecuteWasCalled).To(BeTrue()) Expect(fakeTransformer.PassedLogs).To(Equal(fakeHeaderSyncLogs)) }) - It("emits error if transformer returns an error", func() { + It("returns error if transformer returns an error", func() { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}} delegator := newDelegator(mockLogRepository) fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError} delegator.AddTransformer(fakeTransformer) - err, _ := delegator.DelegateLogs() + err := delegator.DelegateLogs() Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) }) - It("emits logs found when logs returned and delegated", func() { + It("returns nil for error when logs returned and delegated", func() { fakeTransformer := &mocks.MockEventTransformer{} config := mocks.FakeTransformerConfig fakeTransformer.SetTransformerConfig(config) @@ -149,10 +150,9 @@ var _ = Describe("Log delegator", func() { delegator := newDelegator(mockLogRepository) delegator.AddTransformer(fakeTransformer) - err, logsFound := delegator.DelegateLogs() + err := delegator.DelegateLogs() Expect(err).NotTo(HaveOccurred()) - Expect(logsFound).To(BeTrue()) }) }) }) diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go index 36fe6eda..bbd769c2 100644 --- a/libraries/shared/logs/extractor.go +++ b/libraries/shared/logs/extractor.go @@ -28,16 +28,14 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore" ) -var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor") - -const ( - uncheckedHeadersFound = true - noUncheckedHeadersFound = false +var ( + ErrNoUncheckedHeaders = errors.New("no unchecked headers available for log fetching") + ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor") ) type ILogExtractor interface { AddTransformerConfig(config transformer.EventTransformerConfig) error - ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) + ExtractLogs(recheckHeaders constants.TransformerExecution) error } type LogExtractor struct { @@ -71,50 +69,50 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran } // Fetch and persist watched logs -func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) { +func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error { if len(extractor.Addresses) < 1 { logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) - return ErrNoWatchedAddresses, noUncheckedHeadersFound + return ErrNoWatchedAddresses } uncheckedHeaders, uncheckedHeadersErr := extractor.CheckedHeadersRepository.UncheckedHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders)) if uncheckedHeadersErr != nil { logrus.Errorf("error fetching missing headers: %s", uncheckedHeadersErr) - return uncheckedHeadersErr, noUncheckedHeadersFound + return uncheckedHeadersErr } if len(uncheckedHeaders) < 1 { - return nil, noUncheckedHeadersFound + return ErrNoUncheckedHeaders } for _, header := range uncheckedHeaders { logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) if fetchLogsErr != nil { logError("error fetching logs for header: %s", fetchLogsErr, header) - return fetchLogsErr, uncheckedHeadersFound + return fetchLogsErr } if len(logs) > 0 { transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs) if transactionsSyncErr != nil { logError("error syncing transactions: %s", transactionsSyncErr, header) - return transactionsSyncErr, uncheckedHeadersFound + return transactionsSyncErr } createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs) if createLogsErr != nil { logError("error persisting logs: %s", createLogsErr, header) - return createLogsErr, uncheckedHeadersFound + return createLogsErr } } markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id) if markHeaderCheckedErr != nil { logError("error marking header checked: %s", markHeaderCheckedErr, header) - return markHeaderCheckedErr, uncheckedHeadersFound + return markHeaderCheckedErr } } - return nil, uncheckedHeadersFound + return nil } func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { diff --git a/libraries/shared/logs/extractor_test.go b/libraries/shared/logs/extractor_test.go index aecd8e28..c9f5b317 100644 --- a/libraries/shared/logs/extractor_test.go +++ b/libraries/shared/logs/extractor_test.go @@ -157,7 +157,7 @@ var _ = Describe("Log extractor", func() { Describe("ExtractLogs", func() { It("returns error if no watched addresses configured", func() { - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(logs.ErrNoWatchedAddresses)) @@ -171,7 +171,7 @@ var _ = Describe("Log extractor", func() { startingBlockNumber := rand.Int63() extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).NotTo(HaveOccurred()) Expect(mockCheckedHeadersRepository.UncheckedHeadersStartingBlockNumber).To(Equal(startingBlockNumber)) @@ -188,7 +188,7 @@ var _ = Describe("Log extractor", func() { startingBlockNumber := rand.Int63() extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) - err, _ := extractor.ExtractLogs(constants.HeaderRecheck) + err := extractor.ExtractLogs(constants.HeaderRecheck) Expect(err).NotTo(HaveOccurred()) Expect(mockCheckedHeadersRepository.UncheckedHeadersStartingBlockNumber).To(Equal(startingBlockNumber)) @@ -197,13 +197,13 @@ var _ = Describe("Log extractor", func() { }) }) - It("emits error if getting unchecked headers fails", func() { + It("returns error if getting unchecked headers fails", func() { addTransformerConfig(extractor) mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository.UncheckedHeadersReturnError = fakes.FakeError extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -215,20 +215,19 @@ var _ = Describe("Log extractor", func() { mockLogFetcher := &mocks.MockLogFetcher{} extractor.Fetcher = mockLogFetcher - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + _ = extractor.ExtractLogs(constants.HeaderUnchecked) - Expect(err).NotTo(HaveOccurred()) Expect(mockLogFetcher.FetchCalled).To(BeFalse()) }) - It("emits that no unchecked headers were found", func() { + It("returns error that no unchecked headers were found", func() { addTransformerConfig(extractor) mockLogFetcher := &mocks.MockLogFetcher{} extractor.Fetcher = mockLogFetcher - _, uncheckedHeadersFound := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) - Expect(uncheckedHeadersFound).To(BeFalse()) + Expect(err).To(MatchError(logs.ErrNoUncheckedHeaders)) }) }) @@ -245,7 +244,7 @@ var _ = Describe("Log extractor", func() { mockLogFetcher := &mocks.MockLogFetcher{} extractor.Fetcher = mockLogFetcher - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).NotTo(HaveOccurred()) Expect(mockLogFetcher.FetchCalled).To(BeTrue()) @@ -262,7 +261,7 @@ var _ = Describe("Log extractor", func() { mockLogFetcher.ReturnError = fakes.FakeError extractor.Fetcher = mockLogFetcher - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -275,7 +274,7 @@ var _ = Describe("Log extractor", func() { mockTransactionSyncer := &fakes.MockTransactionSyncer{} extractor.Syncer = mockTransactionSyncer - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).NotTo(HaveOccurred()) Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse()) @@ -290,7 +289,7 @@ var _ = Describe("Log extractor", func() { mockTransactionSyncer := &fakes.MockTransactionSyncer{} extractor.Syncer = mockTransactionSyncer - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).NotTo(HaveOccurred()) Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue()) @@ -304,7 +303,7 @@ var _ = Describe("Log extractor", func() { mockTransactionSyncer.SyncTransactionsError = fakes.FakeError extractor.Syncer = mockTransactionSyncer - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -324,7 +323,7 @@ var _ = Describe("Log extractor", func() { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} extractor.LogRepository = mockLogRepository - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).NotTo(HaveOccurred()) Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs)) @@ -338,7 +337,7 @@ var _ = Describe("Log extractor", func() { mockLogRepository.CreateError = fakes.FakeError extractor.LogRepository = mockLogRepository - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -353,7 +352,7 @@ var _ = Describe("Log extractor", func() { mockCheckedHeadersRepository.UncheckedHeadersReturnHeaders = []core.Header{{Id: headerID}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).NotTo(HaveOccurred()) Expect(mockCheckedHeadersRepository.MarkHeaderCheckedHeaderID).To(Equal(headerID)) @@ -367,20 +366,19 @@ var _ = Describe("Log extractor", func() { mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - err, _ := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) }) - It("emits that missing headers were found", func() { + It("returns nil for error if everything succeeds", func() { addUncheckedHeader(extractor) addTransformerConfig(extractor) - err, missingHeadersFound := extractor.ExtractLogs(constants.HeaderUnchecked) + err := extractor.ExtractLogs(constants.HeaderUnchecked) Expect(err).NotTo(HaveOccurred()) - Expect(missingHeadersFound).To(BeTrue()) }) }) }) diff --git a/libraries/shared/mocks/log_delegator.go b/libraries/shared/mocks/log_delegator.go index ad6b926e..b627877d 100644 --- a/libraries/shared/mocks/log_delegator.go +++ b/libraries/shared/mocks/log_delegator.go @@ -24,21 +24,22 @@ type MockLogDelegator struct { AddedTransformers []transformer.EventTransformer DelegateCallCount int DelegateErrors []error - LogsFound []bool } func (delegator *MockLogDelegator) AddTransformer(t transformer.EventTransformer) { delegator.AddedTransformers = append(delegator.AddedTransformers, t) } -func (delegator *MockLogDelegator) DelegateLogs() (error, bool) { +func (delegator *MockLogDelegator) DelegateLogs() error { delegator.DelegateCallCount++ - var delegateErrorThisRun error - delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:] - if delegateErrorThisRun != nil { - return delegateErrorThisRun, false + if len(delegator.DelegateErrors) > 1 { + var delegateErrorThisRun error + delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:] + return delegateErrorThisRun + } else if len(delegator.DelegateErrors) == 1 { + thisErr := delegator.DelegateErrors[0] + delegator.DelegateErrors = []error{} + return thisErr } - var logsFoundThisRun bool - logsFoundThisRun, delegator.LogsFound = delegator.LogsFound[0], delegator.LogsFound[1:] - return nil, logsFoundThisRun + return nil } diff --git a/libraries/shared/mocks/log_extractor.go b/libraries/shared/mocks/log_extractor.go index 363d27b5..8a7be9bc 100644 --- a/libraries/shared/mocks/log_extractor.go +++ b/libraries/shared/mocks/log_extractor.go @@ -26,7 +26,6 @@ type MockLogExtractor struct { AddTransformerConfigError error ExtractLogsCount int ExtractLogsErrors []error - UncheckedHeadersExist []bool } func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error { @@ -34,14 +33,16 @@ func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.Event return extractor.AddTransformerConfigError } -func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) { +func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error { extractor.ExtractLogsCount++ - var errorThisRun error - errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:] - if errorThisRun != nil { - return errorThisRun, false + if len(extractor.ExtractLogsErrors) > 1 { + var errorThisRun error + errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:] + return errorThisRun + } else if len(extractor.ExtractLogsErrors) == 1 { + thisErr := extractor.ExtractLogsErrors[0] + extractor.ExtractLogsErrors = []error{} + return thisErr } - var missingHeadersExist bool - missingHeadersExist, extractor.UncheckedHeadersExist = extractor.UncheckedHeadersExist[0], extractor.UncheckedHeadersExist[1:] - return nil, missingHeadersExist + return nil } diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go index 80b0a023..d6028c5c 100644 --- a/libraries/shared/watcher/event_watcher.go +++ b/libraries/shared/watcher/event_watcher.go @@ -74,49 +74,53 @@ func (watcher *EventWatcher) AddTransformers(initializers []transformer.EventTra } // Extracts and delegates watched log events. -func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecution, errsChan chan error) { - extractErrsChan := make(chan error) +func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecution) error { delegateErrsChan := make(chan error) + extractErrsChan := make(chan error) + defer close(delegateErrsChan) + defer close(extractErrsChan) go watcher.extractLogs(recheckHeaders, extractErrsChan) go watcher.delegateLogs(delegateErrsChan) for { select { - case extractErr := <-extractErrsChan: - logrus.Errorf("error extracting logs in event watcher: %s", extractErr.Error()) - errsChan <- extractErr case delegateErr := <-delegateErrsChan: logrus.Errorf("error delegating logs in event watcher: %s", delegateErr.Error()) - errsChan <- delegateErr + return delegateErr + case extractErr := <-extractErrsChan: + logrus.Errorf("error extracting logs in event watcher: %s", extractErr.Error()) + return extractErr } } } func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) { - err, uncheckedHeadersFound := watcher.LogExtractor.ExtractLogs(recheckHeaders) - if err != nil { + err := watcher.LogExtractor.ExtractLogs(recheckHeaders) + if err != nil && err != logs.ErrNoUncheckedHeaders { errs <- err + return } - if uncheckedHeadersFound { + if err == logs.ErrNoUncheckedHeaders { + time.Sleep(NoNewDataPause) watcher.extractLogs(recheckHeaders, errs) } else { - time.Sleep(NoNewDataPause) watcher.extractLogs(recheckHeaders, errs) } } func (watcher *EventWatcher) delegateLogs(errs chan error) { - err, logsFound := watcher.LogDelegator.DelegateLogs() - if err != nil { + err := watcher.LogDelegator.DelegateLogs() + if err != nil && err != logs.ErrNoLogs { errs <- err + return } - if logsFound { + if err == logs.ErrNoLogs { + time.Sleep(NoNewDataPause) watcher.delegateLogs(errs) } else { - time.Sleep(NoNewDataPause) watcher.delegateLogs(errs) } } diff --git a/libraries/shared/watcher/event_watcher_test.go b/libraries/shared/watcher/event_watcher_test.go index 08206d39..69f9b5e7 100644 --- a/libraries/shared/watcher/event_watcher_test.go +++ b/libraries/shared/watcher/event_watcher_test.go @@ -17,6 +17,7 @@ package watcher_test import ( + "errors" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/libraries/shared/constants" @@ -26,6 +27,8 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/fakes" ) +var errExecuteClosed = errors.New("this error means the mocks were finished executing") + var _ = Describe("Event Watcher", func() { var ( delegator *mocks.MockLogDelegator @@ -57,7 +60,8 @@ var _ = Describe("Event Watcher", func() { fakeTransformerTwo.FakeTransformerInitializer, } - eventWatcher.AddTransformers(initializers) + err := eventWatcher.AddTransformers(initializers) + Expect(err).NotTo(HaveOccurred()) }) It("adds initialized transformer to log delegator", func() { @@ -78,114 +82,87 @@ var _ = Describe("Event Watcher", func() { }) Describe("Execute", func() { - var errsChan chan error - - BeforeEach(func() { - errsChan = make(chan error) - }) It("extracts watched logs", func(done Done) { - delegator.DelegateErrors = []error{nil} - delegator.LogsFound = []bool{false} - extractor.ExtractLogsErrors = []error{nil} - extractor.UncheckedHeadersExist = []bool{false} + extractor.ExtractLogsErrors = []error{nil, errExecuteClosed} - go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) + err := eventWatcher.Execute(constants.HeaderUnchecked) - Eventually(func() int { - return extractor.ExtractLogsCount - }).Should(Equal(1)) + Expect(err).To(MatchError(errExecuteClosed)) + Eventually(func() bool { + return extractor.ExtractLogsCount > 0 + }).Should(BeTrue()) close(done) }) It("returns error if extracting logs fails", func(done Done) { - delegator.DelegateErrors = []error{nil} - delegator.LogsFound = []bool{false} extractor.ExtractLogsErrors = []error{fakes.FakeError} - extractor.UncheckedHeadersExist = []bool{false} - go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) + err := eventWatcher.Execute(constants.HeaderUnchecked) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) + Expect(err).To(MatchError(fakes.FakeError)) close(done) }) It("extracts watched logs again if missing headers found", func(done Done) { - delegator.DelegateErrors = []error{nil} - delegator.LogsFound = []bool{false} - extractor.ExtractLogsErrors = []error{nil, nil} - extractor.UncheckedHeadersExist = []bool{true, false} + extractor.ExtractLogsErrors = []error{nil, errExecuteClosed} - go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) + err := eventWatcher.Execute(constants.HeaderUnchecked) - Eventually(func() int { - return extractor.ExtractLogsCount - }).Should(Equal(2)) + Expect(err).To(MatchError(errExecuteClosed)) + Eventually(func() bool { + return extractor.ExtractLogsCount > 1 + }).Should(BeTrue()) close(done) }) It("returns error if extracting logs fails on subsequent run", func(done Done) { - delegator.DelegateErrors = []error{nil} - delegator.LogsFound = []bool{false} extractor.ExtractLogsErrors = []error{nil, fakes.FakeError} - extractor.UncheckedHeadersExist = []bool{true, false} - go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) + err := eventWatcher.Execute(constants.HeaderUnchecked) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) + Expect(err).To(MatchError(fakes.FakeError)) close(done) - }) - It("delegates untransformed logs", func(done Done) { - delegator.DelegateErrors = []error{nil} - delegator.LogsFound = []bool{false} - extractor.ExtractLogsErrors = []error{nil} - extractor.UncheckedHeadersExist = []bool{false} + It("delegates untransformed logs", func() { + delegator.DelegateErrors = []error{nil, errExecuteClosed} - go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) + err := eventWatcher.Execute(constants.HeaderUnchecked) - Eventually(func() int { - return delegator.DelegateCallCount - }).Should(Equal(1)) - close(done) + Expect(err).To(MatchError(errExecuteClosed)) + Eventually(func() bool { + return delegator.DelegateCallCount > 0 + }).Should(BeTrue()) }) It("returns error if delegating logs fails", func(done Done) { - delegator.LogsFound = []bool{false} delegator.DelegateErrors = []error{fakes.FakeError} - extractor.ExtractLogsErrors = []error{nil} - extractor.UncheckedHeadersExist = []bool{false} - go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) + err := eventWatcher.Execute(constants.HeaderUnchecked) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) + Expect(err).To(MatchError(fakes.FakeError)) close(done) }) It("delegates logs again if untransformed logs found", func(done Done) { - delegator.DelegateErrors = []error{nil, nil} - delegator.LogsFound = []bool{true, false} - extractor.ExtractLogsErrors = []error{nil} - extractor.UncheckedHeadersExist = []bool{false} + delegator.DelegateErrors = []error{nil, nil, nil, errExecuteClosed} - go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) + err := eventWatcher.Execute(constants.HeaderUnchecked) - Eventually(func() int { - return delegator.DelegateCallCount - }).Should(Equal(2)) + Expect(err).To(MatchError(errExecuteClosed)) + Eventually(func() bool { + return delegator.DelegateCallCount > 1 + }).Should(BeTrue()) close(done) }) It("returns error if delegating logs fails on subsequent run", func(done Done) { delegator.DelegateErrors = []error{nil, fakes.FakeError} - delegator.LogsFound = []bool{true, false} - extractor.ExtractLogsErrors = []error{nil} - extractor.UncheckedHeadersExist = []bool{false} - go eventWatcher.Execute(constants.HeaderUnchecked, errsChan) + err := eventWatcher.Execute(constants.HeaderUnchecked) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) + Expect(err).To(MatchError(fakes.FakeError)) close(done) }) })