diff --git a/libraries/shared/logs/delegator.go b/libraries/shared/logs/delegator.go index f04023f2..7d2b2a5c 100644 --- a/libraries/shared/logs/delegator.go +++ b/libraries/shared/logs/delegator.go @@ -27,9 +27,14 @@ import ( var ErrNoTransformers = errors.New("no event transformers configured in the log delegator") +const ( + logsFound = true + noLogsFound = false +) + type ILogDelegator interface { AddTransformer(t transformer.EventTransformer) - DelegateLogs(errs chan error, logsFound chan bool) + DelegateLogs() (error, bool) } type LogDelegator struct { @@ -43,31 +48,28 @@ func (delegator *LogDelegator) AddTransformer(t transformer.EventTransformer) { delegator.Chunker.AddConfig(t.GetConfig()) } -func (delegator *LogDelegator) DelegateLogs(errs chan error, logsFound chan bool) { +func (delegator *LogDelegator) DelegateLogs() (error, bool) { if len(delegator.Transformers) < 1 { - errs <- ErrNoTransformers - return + return ErrNoTransformers, noLogsFound } persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs() if fetchErr != nil { logrus.Errorf("error loading logs from db: %s", fetchErr.Error()) - errs <- fetchErr - return + return fetchErr, noLogsFound } if len(persistedLogs) < 1 { - logsFound <- false + return nil, noLogsFound } transformErr := delegator.delegateLogs(persistedLogs) if transformErr != nil { logrus.Errorf("error transforming logs: %s", transformErr) - errs <- transformErr - return + return transformErr, logsFound } - logsFound <- true + return nil, logsFound } func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error { diff --git a/libraries/shared/logs/delegator_test.go b/libraries/shared/logs/delegator_test.go index c1132bd5..f2bd581c 100644 --- a/libraries/shared/logs/delegator_test.go +++ b/libraries/shared/logs/delegator_test.go @@ -59,61 +59,49 @@ var _ = Describe("Log delegator", func() { }) Describe("DelegateLogs", func() { - var ( - errsChan chan error - logsFound chan bool - ) - - BeforeEach(func() { - errsChan = make(chan error) - logsFound = make(chan bool) - }) - - It("returns an error if no transformers configured", func(done Done) { + It("returns an error if no transformers configured", func() { delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) - go delegator.DelegateLogs(errsChan, logsFound) + err, _ := delegator.DelegateLogs() - Expect(<-errsChan).To(MatchError(logs.ErrNoTransformers)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(logs.ErrNoTransformers)) }) - It("gets untransformed logs", func(done Done) { + It("gets untransformed logs", func() { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} delegator := newDelegator(mockLogRepository) delegator.AddTransformer(&mocks.MockEventTransformer{}) - go delegator.DelegateLogs(errsChan, logsFound) + err, _ := delegator.DelegateLogs() - Eventually(func() bool { - return mockLogRepository.GetCalled - }).Should(BeTrue()) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(mockLogRepository.GetCalled).To(BeTrue()) }) - It("emits error if getting untransformed logs fails", func(done Done) { + It("emits error if getting untransformed logs fails", func() { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository.GetError = fakes.FakeError delegator := newDelegator(mockLogRepository) delegator.AddTransformer(&mocks.MockEventTransformer{}) - go delegator.DelegateLogs(errsChan, logsFound) + err, _ := delegator.DelegateLogs() - Expect(<-errsChan).To(MatchError(fakes.FakeError)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) }) - It("emits that no logs were found if no logs returned", func(done Done) { + It("emits that no logs were found if no logs returned", func() { delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) delegator.AddTransformer(&mocks.MockEventTransformer{}) - go delegator.DelegateLogs(errsChan, logsFound) + err, logsFound := delegator.DelegateLogs() - Expect(<-logsFound).To(BeFalse()) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(logsFound).To(BeFalse()) }) - It("delegates chunked logs to transformers", func(done Done) { + It("delegates chunked logs to transformers", func() { fakeTransformer := &mocks.MockEventTransformer{} config := mocks.FakeTransformerConfig fakeTransformer.SetTransformerConfig(config) @@ -127,31 +115,27 @@ var _ = Describe("Log delegator", func() { delegator := newDelegator(mockLogRepository) delegator.AddTransformer(fakeTransformer) - go delegator.DelegateLogs(errsChan, logsFound) + err, _ := delegator.DelegateLogs() - Eventually(func() bool { - return fakeTransformer.ExecuteWasCalled - }).Should(BeTrue()) - Eventually(func() []core.HeaderSyncLog { - return fakeTransformer.PassedLogs - }).Should(Equal(fakeHeaderSyncLogs)) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(fakeTransformer.ExecuteWasCalled).To(BeTrue()) + Expect(fakeTransformer.PassedLogs).To(Equal(fakeHeaderSyncLogs)) }) - It("emits error if transformer returns an error", func(done Done) { + It("emits error if transformer returns an error", func() { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}} delegator := newDelegator(mockLogRepository) fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError} delegator.AddTransformer(fakeTransformer) - go delegator.DelegateLogs(errsChan, logsFound) + err, _ := delegator.DelegateLogs() - Expect(<-errsChan).To(MatchError(fakes.FakeError)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) }) - It("emits logs found when logs returned and delegated", func(done Done) { + It("emits logs found when logs returned and delegated", func() { fakeTransformer := &mocks.MockEventTransformer{} config := mocks.FakeTransformerConfig fakeTransformer.SetTransformerConfig(config) @@ -165,13 +149,12 @@ var _ = Describe("Log delegator", func() { delegator := newDelegator(mockLogRepository) delegator.AddTransformer(fakeTransformer) - go delegator.DelegateLogs(errsChan, logsFound) + err, logsFound := delegator.DelegateLogs() - Expect(<-logsFound).To(BeTrue()) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(logsFound).To(BeTrue()) }) }) - }) func newDelegator(headerSyncLogRepository *fakes.MockHeaderSyncLogRepository) *logs.LogDelegator { diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go index 5c42dd8a..f8132352 100644 --- a/libraries/shared/logs/extractor.go +++ b/libraries/shared/logs/extractor.go @@ -30,9 +30,14 @@ import ( var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor") +const ( + missingHeadersFound = true + noMissingHeadersFound = false +) + type ILogExtractor interface { AddTransformerConfig(config transformer.EventTransformerConfig) - ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) + ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) } type LogExtractor struct { @@ -59,56 +64,50 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran } // Fetch and persist watched logs -func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) { +func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) { if len(extractor.Addresses) < 1 { logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) - errs <- ErrNoWatchedAddresses - return + return ErrNoWatchedAddresses, noMissingHeadersFound } missingHeaders, missingHeadersErr := extractor.CheckedHeadersRepository.MissingHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders)) if missingHeadersErr != nil { logrus.Errorf("error fetching missing headers: %s", missingHeadersErr) - errs <- missingHeadersErr - return + return missingHeadersErr, noMissingHeadersFound } if len(missingHeaders) < 1 { - missingHeadersFound <- false - return + return nil, noMissingHeadersFound } for _, header := range missingHeaders { logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) if fetchLogsErr != nil { logError("error fetching logs for header: %s", fetchLogsErr, header) - errs <- fetchLogsErr - return + return fetchLogsErr, missingHeadersFound } if len(logs) > 0 { transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs) if transactionsSyncErr != nil { logError("error syncing transactions: %s", transactionsSyncErr, header) - errs <- transactionsSyncErr - return + return transactionsSyncErr, missingHeadersFound } createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs) if createLogsErr != nil { logError("error persisting logs: %s", createLogsErr, header) - errs <- createLogsErr - return + return createLogsErr, missingHeadersFound } } markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id) if markHeaderCheckedErr != nil { logError("error marking header checked: %s", markHeaderCheckedErr, header) - errs <- markHeaderCheckedErr + return markHeaderCheckedErr, missingHeadersFound } } - missingHeadersFound <- true + return nil, missingHeadersFound } func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { diff --git a/libraries/shared/logs/extractor_test.go b/libraries/shared/logs/extractor_test.go index 539c4c29..d21f2f6a 100644 --- a/libraries/shared/logs/extractor_test.go +++ b/libraries/shared/logs/extractor_test.go @@ -81,109 +81,84 @@ var _ = Describe("Log extractor", func() { }) Describe("ExtractLogs", func() { - var ( - errsChan chan error - missingHeadersFound chan bool - ) + It("returns error if no watched addresses configured", func() { + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - BeforeEach(func() { - errsChan = make(chan error) - missingHeadersFound = make(chan bool) - }) - - It("returns error if no watched addresses configured", func(done Done) { - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - - Expect(<-errsChan).To(MatchError(logs.ErrNoWatchedAddresses)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(logs.ErrNoWatchedAddresses)) }) Describe("when checking missing headers", func() { - It("gets missing headers since configured starting block with check_count < 1", func(done Done) { + It("gets missing headers since configured starting block with check_count < 1", func() { mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository startingBlockNumber := rand.Int63() extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Eventually(func() int64 { - return mockCheckedHeadersRepository.StartingBlockNumber - }).Should(Equal(startingBlockNumber)) - Eventually(func() int64 { - return mockCheckedHeadersRepository.EndingBlockNumber - }).Should(Equal(int64(-1))) - Eventually(func() int64 { - return mockCheckedHeadersRepository.CheckCount - }).Should(Equal(int64(1))) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber)) + Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1))) + Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(int64(1))) }) }) Describe("when rechecking headers", func() { - It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func(done Done) { + It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func() { mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository startingBlockNumber := rand.Int63() extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) - go extractor.ExtractLogs(constants.HeaderRecheck, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderRecheck) - Eventually(func() int64 { - return mockCheckedHeadersRepository.StartingBlockNumber - }).Should(Equal(startingBlockNumber)) - Eventually(func() int64 { - return mockCheckedHeadersRepository.EndingBlockNumber - }).Should(Equal(int64(-1))) - Eventually(func() int64 { - return mockCheckedHeadersRepository.CheckCount - }).Should(Equal(constants.RecheckHeaderCap)) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber)) + Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1))) + Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(constants.RecheckHeaderCap)) }) }) - It("emits error if getting missing headers fails", func(done Done) { + It("emits error if getting missing headers fails", func() { addTransformerConfig(extractor) mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository.MissingHeadersReturnError = fakes.FakeError extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) }) Describe("when no missing headers", func() { - It("does not fetch logs", func(done Done) { + It("does not fetch logs", func() { addTransformerConfig(extractor) mockLogFetcher := &mocks.MockLogFetcher{} extractor.Fetcher = mockLogFetcher - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Consistently(func() bool { - return mockLogFetcher.FetchCalled - }).Should(BeFalse()) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(mockLogFetcher.FetchCalled).To(BeFalse()) }) - It("emits that no missing headers were found", func(done Done) { + It("emits that no missing headers were found", func() { addTransformerConfig(extractor) mockLogFetcher := &mocks.MockLogFetcher{} extractor.Fetcher = mockLogFetcher - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + _, missingHeadersFound := extractor.ExtractLogs(constants.HeaderMissing) - Expect(<-missingHeadersFound).To(BeFalse()) - close(done) + Expect(missingHeadersFound).To(BeFalse()) }) }) Describe("when there are missing headers", func() { - It("fetches logs for missing headers", func(done Done) { + It("fetches logs for missing headers", func() { addMissingHeader(extractor) config := transformer.EventTransformerConfig{ ContractAddresses: []string{fakes.FakeAddress.Hex()}, @@ -194,68 +169,58 @@ var _ = Describe("Log extractor", func() { mockLogFetcher := &mocks.MockLogFetcher{} extractor.Fetcher = mockLogFetcher - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Eventually(func() bool { - return mockLogFetcher.FetchCalled - }).Should(BeTrue()) + Expect(err).NotTo(HaveOccurred()) + Expect(mockLogFetcher.FetchCalled).To(BeTrue()) expectedTopics := []common.Hash{common.HexToHash(config.Topic)} - Eventually(func() []common.Hash { - return mockLogFetcher.Topics - }).Should(Equal(expectedTopics)) + Expect(mockLogFetcher.Topics).To(Equal(expectedTopics)) expectedAddresses := transformer.HexStringsToAddresses(config.ContractAddresses) - Eventually(func() []common.Address { - return mockLogFetcher.ContractAddresses - }).Should(Equal(expectedAddresses)) - close(done) + Expect(mockLogFetcher.ContractAddresses).To(Equal(expectedAddresses)) }) - It("returns error if fetching logs fails", func(done Done) { + It("returns error if fetching logs fails", func() { addMissingHeader(extractor) addTransformerConfig(extractor) mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher.ReturnError = fakes.FakeError extractor.Fetcher = mockLogFetcher - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) }) Describe("when no fetched logs", func() { - It("does not sync transactions", func(done Done) { + It("does not sync transactions", func() { addMissingHeader(extractor) addTransformerConfig(extractor) mockTransactionSyncer := &fakes.MockTransactionSyncer{} extractor.Syncer = mockTransactionSyncer - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Consistently(func() bool { - return mockTransactionSyncer.SyncTransactionsCalled - }).Should(BeFalse()) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse()) }) }) Describe("when there are fetched logs", func() { - It("syncs transactions", func(done Done) { + It("syncs transactions", func() { addMissingHeader(extractor) addFetchedLog(extractor) addTransformerConfig(extractor) mockTransactionSyncer := &fakes.MockTransactionSyncer{} extractor.Syncer = mockTransactionSyncer - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Eventually(func() bool { - return mockTransactionSyncer.SyncTransactionsCalled - }).Should(BeTrue()) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue()) }) - It("returns error if syncing transactions fails", func(done Done) { + It("returns error if syncing transactions fails", func() { addMissingHeader(extractor) addFetchedLog(extractor) addTransformerConfig(extractor) @@ -263,13 +228,13 @@ var _ = Describe("Log extractor", func() { mockTransactionSyncer.SyncTransactionsError = fakes.FakeError extractor.Syncer = mockTransactionSyncer - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) }) - It("persists fetched logs", func(done Done) { + It("persists fetched logs", func() { addMissingHeader(extractor) addTransformerConfig(extractor) fakeLogs := []types.Log{{ @@ -283,15 +248,13 @@ var _ = Describe("Log extractor", func() { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} extractor.LogRepository = mockLogRepository - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Eventually(func() []types.Log { - return mockLogRepository.PassedLogs - }).Should(Equal(fakeLogs)) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs)) }) - It("returns error if persisting logs fails", func(done Done) { + It("returns error if persisting logs fails", func() { addMissingHeader(extractor) addFetchedLog(extractor) addTransformerConfig(extractor) @@ -299,14 +262,14 @@ var _ = Describe("Log extractor", func() { mockLogRepository.CreateError = fakes.FakeError extractor.LogRepository = mockLogRepository - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) }) }) - It("marks header checked", func(done Done) { + It("marks header checked", func() { addFetchedLog(extractor) addTransformerConfig(extractor) mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} @@ -314,15 +277,13 @@ var _ = Describe("Log extractor", func() { mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Eventually(func() int64 { - return mockCheckedHeadersRepository.HeaderID - }).Should(Equal(headerID)) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(mockCheckedHeadersRepository.HeaderID).To(Equal(headerID)) }) - It("returns error if marking header checked fails", func(done Done) { + It("returns error if marking header checked fails", func() { addFetchedLog(extractor) addTransformerConfig(extractor) mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} @@ -330,20 +291,20 @@ var _ = Describe("Log extractor", func() { mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, _ := extractor.ExtractLogs(constants.HeaderMissing) - Expect(<-errsChan).To(MatchError(fakes.FakeError)) - close(done) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) }) - It("emits that missing headers were found", func(done Done) { + It("emits that missing headers were found", func() { addMissingHeader(extractor) addTransformerConfig(extractor) - go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + err, missingHeadersFound := extractor.ExtractLogs(constants.HeaderMissing) - Expect(<-missingHeadersFound).To(BeTrue()) - close(done) + Expect(err).NotTo(HaveOccurred()) + Expect(missingHeadersFound).To(BeTrue()) }) }) }) diff --git a/libraries/shared/mocks/log_delegator.go b/libraries/shared/mocks/log_delegator.go index 8a2338a4..ad6b926e 100644 --- a/libraries/shared/mocks/log_delegator.go +++ b/libraries/shared/mocks/log_delegator.go @@ -31,14 +31,14 @@ func (delegator *MockLogDelegator) AddTransformer(t transformer.EventTransformer delegator.AddedTransformers = append(delegator.AddedTransformers, t) } -func (delegator *MockLogDelegator) DelegateLogs(errs chan error, logsFound chan bool) { +func (delegator *MockLogDelegator) DelegateLogs() (error, bool) { delegator.DelegateCallCount++ var delegateErrorThisRun error delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:] if delegateErrorThisRun != nil { - errs <- delegateErrorThisRun + return delegateErrorThisRun, false } var logsFoundThisRun bool logsFoundThisRun, delegator.LogsFound = delegator.LogsFound[0], delegator.LogsFound[1:] - logsFound <- logsFoundThisRun + return nil, logsFoundThisRun } diff --git a/libraries/shared/mocks/log_extractor.go b/libraries/shared/mocks/log_extractor.go index 0fba9d27..1cb26132 100644 --- a/libraries/shared/mocks/log_extractor.go +++ b/libraries/shared/mocks/log_extractor.go @@ -32,14 +32,14 @@ func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.Event extractor.AddedConfigs = append(extractor.AddedConfigs, config) } -func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) { +func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) { extractor.ExtractLogsCount++ var errorThisRun error errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:] if errorThisRun != nil { - errs <- errorThisRun + return errorThisRun, false } var missingHeadersExist bool missingHeadersExist, extractor.MissingHeadersExist = extractor.MissingHeadersExist[0], extractor.MissingHeadersExist[1:] - missingHeadersFound <- missingHeadersExist + return nil, missingHeadersExist } diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go index ff40e302..e10da6c6 100644 --- a/libraries/shared/watcher/event_watcher.go +++ b/libraries/shared/watcher/event_watcher.go @@ -89,42 +89,29 @@ func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecuti } func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) { - extractLogsErr := make(chan error) - missingHeadersFound := make(chan bool) - go watcher.LogExtractor.ExtractLogs(recheckHeaders, extractLogsErr, missingHeadersFound) + err, missingHeadersFound := watcher.LogExtractor.ExtractLogs(recheckHeaders) + if err != nil { + errs <- err + } - for { - select { - case err := <-extractLogsErr: - errs <- err - case missingHeaders := <-missingHeadersFound: - if missingHeaders { - go watcher.extractLogs(recheckHeaders, errs) - } else { - time.Sleep(NoNewDataPause) - go watcher.extractLogs(recheckHeaders, errs) - } - } + if missingHeadersFound { + watcher.extractLogs(recheckHeaders, errs) + } else { + time.Sleep(NoNewDataPause) + watcher.extractLogs(recheckHeaders, errs) } } func (watcher *EventWatcher) delegateLogs(errs chan error) { - delegateLogsErr := make(chan error) - logsFound := make(chan bool) - go watcher.LogDelegator.DelegateLogs(delegateLogsErr, logsFound) - - for { - select { - case err := <-delegateLogsErr: - errs <- err - case logs := <-logsFound: - if logs { - go watcher.delegateLogs(errs) - } else { - time.Sleep(NoNewDataPause) - go watcher.delegateLogs(errs) - } - } + err, logsFound := watcher.LogDelegator.DelegateLogs() + if err != nil { + errs <- err } + if logsFound { + watcher.delegateLogs(errs) + } else { + time.Sleep(NoNewDataPause) + watcher.delegateLogs(errs) + } }