diff --git a/cmd/execute.go b/cmd/execute.go index c6d7bb15..108eba82 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -157,10 +157,13 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { } else { recheck = constants.HeaderMissing } - ticker := time.NewTicker(pollingInterval) - defer ticker.Stop() - for range ticker.C { - w.Execute(recheck) + errs := make(chan error) + go w.Execute(recheck, errs) + for { + select { + case err := <-errs: + LogWithCommand.Fatalf("error executing event watcher: %s", err.Error()) + } } } diff --git a/libraries/shared/logs/delegator.go b/libraries/shared/logs/delegator.go index 67637a53..f04023f2 100644 --- a/libraries/shared/logs/delegator.go +++ b/libraries/shared/logs/delegator.go @@ -29,7 +29,7 @@ var ErrNoTransformers = errors.New("no event transformers configured in the log type ILogDelegator interface { AddTransformer(t transformer.EventTransformer) - DelegateLogs() error + DelegateLogs(errs chan error, logsFound chan bool) } type LogDelegator struct { @@ -43,24 +43,31 @@ func (delegator *LogDelegator) AddTransformer(t transformer.EventTransformer) { delegator.Chunker.AddConfig(t.GetConfig()) } -func (delegator LogDelegator) DelegateLogs() error { +func (delegator *LogDelegator) DelegateLogs(errs chan error, logsFound chan bool) { if len(delegator.Transformers) < 1 { - return ErrNoTransformers + errs <- ErrNoTransformers + return } persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs() if fetchErr != nil { logrus.Errorf("error loading logs from db: %s", fetchErr.Error()) - return fetchErr + errs <- fetchErr + return + } + + if len(persistedLogs) < 1 { + logsFound <- false } transformErr := delegator.delegateLogs(persistedLogs) if transformErr != nil { logrus.Errorf("error transforming logs: %s", transformErr) - return transformErr + errs <- transformErr + return } - return nil + logsFound <- true } func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error { diff --git a/libraries/shared/logs/delegator_test.go b/libraries/shared/logs/delegator_test.go index 15e59a9d..c1132bd5 100644 --- a/libraries/shared/logs/delegator_test.go +++ b/libraries/shared/logs/delegator_test.go @@ -59,75 +59,123 @@ var _ = Describe("Log delegator", func() { }) Describe("DelegateLogs", func() { - It("returns an error if no transformers configured", func() { - delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) + var ( + errsChan chan error + logsFound chan bool + ) - err := delegator.DelegateLogs() - - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(logs.ErrNoTransformers)) + BeforeEach(func() { + errsChan = make(chan error) + logsFound = make(chan bool) }) - It("gets untransformed logs", func() { + It("returns an error if no transformers configured", func(done Done) { + delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) + + go delegator.DelegateLogs(errsChan, logsFound) + + Expect(<-errsChan).To(MatchError(logs.ErrNoTransformers)) + close(done) + }) + + It("gets untransformed logs", func(done Done) { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} delegator := newDelegator(mockLogRepository) delegator.AddTransformer(&mocks.MockEventTransformer{}) - err := delegator.DelegateLogs() + go delegator.DelegateLogs(errsChan, logsFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockLogRepository.GetCalled).To(BeTrue()) + Eventually(func() bool { + return mockLogRepository.GetCalled + }).Should(BeTrue()) + close(done) }) - It("returns error if getting untransformed logs fails", func() { + It("emits error if getting untransformed logs fails", func(done Done) { mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository.GetError = fakes.FakeError delegator := newDelegator(mockLogRepository) delegator.AddTransformer(&mocks.MockEventTransformer{}) - err := delegator.DelegateLogs() + go delegator.DelegateLogs(errsChan, logsFound) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) }) - It("delegates chunked logs to transformers", func() { + It("emits that no logs were found if no logs returned", func(done Done) { + delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) + delegator.AddTransformer(&mocks.MockEventTransformer{}) + + go delegator.DelegateLogs(errsChan, logsFound) + + Expect(<-logsFound).To(BeFalse()) + close(done) + }) + + It("delegates chunked logs to transformers", func(done Done) { fakeTransformer := &mocks.MockEventTransformer{} - fakeTransformer.SetTransformerConfig(mocks.FakeTransformerConfig) + config := mocks.FakeTransformerConfig + fakeTransformer.SetTransformerConfig(config) fakeGethLog := types.Log{ - Address: common.HexToAddress(fakeTransformer.GetConfig().ContractAddresses[0]), - Topics: []common.Hash{common.HexToHash(fakeTransformer.GetConfig().Topic)}, + Address: common.HexToAddress(config.ContractAddresses[0]), + Topics: []common.Hash{common.HexToHash(config.Topic)}, } - fakeHeaderSyncLog := core.HeaderSyncLog{Log: fakeGethLog} - fakeHeaderSyncLogs := []core.HeaderSyncLog{fakeHeaderSyncLog} + fakeHeaderSyncLogs := []core.HeaderSyncLog{{Log: fakeGethLog}} mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository.ReturnLogs = fakeHeaderSyncLogs delegator := newDelegator(mockLogRepository) delegator.AddTransformer(fakeTransformer) - err := delegator.DelegateLogs() + go delegator.DelegateLogs(errsChan, logsFound) - Expect(err).NotTo(HaveOccurred()) - Expect(fakeTransformer.ExecuteWasCalled).To(BeTrue()) - Expect(fakeTransformer.PassedLogs).To(Equal(fakeHeaderSyncLogs)) + Eventually(func() bool { + return fakeTransformer.ExecuteWasCalled + }).Should(BeTrue()) + Eventually(func() []core.HeaderSyncLog { + return fakeTransformer.PassedLogs + }).Should(Equal(fakeHeaderSyncLogs)) + close(done) }) - It("returns an error if transformer returns an error", func() { - delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) + It("emits error if transformer returns an error", func(done Done) { + mockLogRepository := &fakes.MockHeaderSyncLogRepository{} + mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}} + delegator := newDelegator(mockLogRepository) fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError} delegator.AddTransformer(fakeTransformer) - err := delegator.DelegateLogs() + go delegator.DelegateLogs(errsChan, logsFound) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) + }) + + It("emits logs found when logs returned and delegated", func(done Done) { + fakeTransformer := &mocks.MockEventTransformer{} + config := mocks.FakeTransformerConfig + fakeTransformer.SetTransformerConfig(config) + fakeGethLog := types.Log{ + Address: common.HexToAddress(config.ContractAddresses[0]), + Topics: []common.Hash{common.HexToHash(config.Topic)}, + } + fakeHeaderSyncLogs := []core.HeaderSyncLog{{Log: fakeGethLog}} + mockLogRepository := &fakes.MockHeaderSyncLogRepository{} + mockLogRepository.ReturnLogs = fakeHeaderSyncLogs + delegator := newDelegator(mockLogRepository) + delegator.AddTransformer(fakeTransformer) + + go delegator.DelegateLogs(errsChan, logsFound) + + Expect(<-logsFound).To(BeTrue()) + close(done) }) }) }) -func newDelegator(headerSyncLogRepository *fakes.MockHeaderSyncLogRepository) logs.LogDelegator { - return logs.LogDelegator{ +func newDelegator(headerSyncLogRepository *fakes.MockHeaderSyncLogRepository) *logs.LogDelegator { + return &logs.LogDelegator{ Chunker: chunker.NewLogChunker(), LogRepository: headerSyncLogRepository, } diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go index fccce1b5..5c42dd8a 100644 --- a/libraries/shared/logs/extractor.go +++ b/libraries/shared/logs/extractor.go @@ -32,7 +32,7 @@ var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the l type ILogExtractor interface { AddTransformerConfig(config transformer.EventTransformerConfig) - ExtractLogs(recheckHeaders constants.TransformerExecution) error + ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) } type LogExtractor struct { @@ -59,46 +59,56 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran } // Fetch and persist watched logs -func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error { +func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) { if len(extractor.Addresses) < 1 { logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) - return ErrNoWatchedAddresses + errs <- ErrNoWatchedAddresses + return } missingHeaders, missingHeadersErr := extractor.CheckedHeadersRepository.MissingHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders)) if missingHeadersErr != nil { logrus.Errorf("error fetching missing headers: %s", missingHeadersErr) - return missingHeadersErr + errs <- missingHeadersErr + return + } + + if len(missingHeaders) < 1 { + missingHeadersFound <- false + return } for _, header := range missingHeaders { logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) if fetchLogsErr != nil { logError("error fetching logs for header: %s", fetchLogsErr, header) - return fetchLogsErr + errs <- fetchLogsErr + return } if len(logs) > 0 { transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs) if transactionsSyncErr != nil { logError("error syncing transactions: %s", transactionsSyncErr, header) - return transactionsSyncErr + errs <- transactionsSyncErr + return } createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs) if createLogsErr != nil { logError("error persisting logs: %s", createLogsErr, header) - return createLogsErr + errs <- createLogsErr + return } } markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id) if markHeaderCheckedErr != nil { logError("error marking header checked: %s", markHeaderCheckedErr, header) - return markHeaderCheckedErr + errs <- markHeaderCheckedErr } } - return nil + missingHeadersFound <- true } func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { diff --git a/libraries/shared/logs/extractor_test.go b/libraries/shared/logs/extractor_test.go index 151086be..539c4c29 100644 --- a/libraries/shared/logs/extractor_test.go +++ b/libraries/shared/logs/extractor_test.go @@ -81,201 +81,270 @@ var _ = Describe("Log extractor", func() { }) Describe("ExtractLogs", func() { - It("returns error if no watched addresses configured", func() { - err := extractor.ExtractLogs(constants.HeaderMissing) + var ( + errsChan chan error + missingHeadersFound chan bool + ) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(logs.ErrNoWatchedAddresses)) + BeforeEach(func() { + errsChan = make(chan error) + missingHeadersFound = make(chan bool) + }) + + It("returns error if no watched addresses configured", func(done Done) { + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + + Expect(<-errsChan).To(MatchError(logs.ErrNoWatchedAddresses)) + close(done) }) Describe("when checking missing headers", func() { - It("gets missing headers since configured starting block with check count < 1", func() { + It("gets missing headers since configured starting block with check_count < 1", func(done Done) { mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository startingBlockNumber := rand.Int63() extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber)) - Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1))) - Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(int64(1))) + Eventually(func() int64 { + return mockCheckedHeadersRepository.StartingBlockNumber + }).Should(Equal(startingBlockNumber)) + Eventually(func() int64 { + return mockCheckedHeadersRepository.EndingBlockNumber + }).Should(Equal(int64(-1))) + Eventually(func() int64 { + return mockCheckedHeadersRepository.CheckCount + }).Should(Equal(int64(1))) + close(done) }) }) Describe("when rechecking headers", func() { - It("gets missing headers since configured starting block with check count < 1", func() { + It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func(done Done) { mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository startingBlockNumber := rand.Int63() extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) - err := extractor.ExtractLogs(constants.HeaderRecheck) + go extractor.ExtractLogs(constants.HeaderRecheck, errsChan, missingHeadersFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber)) - Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1))) - Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(constants.RecheckHeaderCap)) + Eventually(func() int64 { + return mockCheckedHeadersRepository.StartingBlockNumber + }).Should(Equal(startingBlockNumber)) + Eventually(func() int64 { + return mockCheckedHeadersRepository.EndingBlockNumber + }).Should(Equal(int64(-1))) + Eventually(func() int64 { + return mockCheckedHeadersRepository.CheckCount + }).Should(Equal(constants.RecheckHeaderCap)) + close(done) }) }) - It("returns error if getting missing headers fails", func() { + It("emits error if getting missing headers fails", func(done Done) { addTransformerConfig(extractor) mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository.MissingHeadersReturnError = fakes.FakeError extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) }) - It("does not fetch logs if no missing headers", func() { - addTransformerConfig(extractor) - mockLogFetcher := &mocks.MockLogFetcher{} - extractor.Fetcher = mockLogFetcher + Describe("when no missing headers", func() { + It("does not fetch logs", func(done Done) { + addTransformerConfig(extractor) + mockLogFetcher := &mocks.MockLogFetcher{} + extractor.Fetcher = mockLogFetcher - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockLogFetcher.FetchCalled).To(BeFalse()) + Consistently(func() bool { + return mockLogFetcher.FetchCalled + }).Should(BeFalse()) + close(done) + }) + + It("emits that no missing headers were found", func(done Done) { + addTransformerConfig(extractor) + mockLogFetcher := &mocks.MockLogFetcher{} + extractor.Fetcher = mockLogFetcher + + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + + Expect(<-missingHeadersFound).To(BeFalse()) + close(done) + }) }) - It("fetches logs for missing headers", func() { - addMissingHeader(extractor) - config := transformer.EventTransformerConfig{ - ContractAddresses: []string{fakes.FakeAddress.Hex()}, - Topic: fakes.FakeHash.Hex(), - StartingBlockNumber: rand.Int63(), - } - extractor.AddTransformerConfig(config) - mockLogFetcher := &mocks.MockLogFetcher{} - extractor.Fetcher = mockLogFetcher + Describe("when there are missing headers", func() { + It("fetches logs for missing headers", func(done Done) { + addMissingHeader(extractor) + config := transformer.EventTransformerConfig{ + ContractAddresses: []string{fakes.FakeAddress.Hex()}, + Topic: fakes.FakeHash.Hex(), + StartingBlockNumber: rand.Int63(), + } + extractor.AddTransformerConfig(config) + mockLogFetcher := &mocks.MockLogFetcher{} + extractor.Fetcher = mockLogFetcher - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockLogFetcher.FetchCalled).To(BeTrue()) - Expect(mockLogFetcher.Topics).To(Equal([]common.Hash{common.HexToHash(config.Topic)})) - Expect(mockLogFetcher.ContractAddresses).To(Equal(transformer.HexStringsToAddresses(config.ContractAddresses))) - }) + Eventually(func() bool { + return mockLogFetcher.FetchCalled + }).Should(BeTrue()) + expectedTopics := []common.Hash{common.HexToHash(config.Topic)} + Eventually(func() []common.Hash { + return mockLogFetcher.Topics + }).Should(Equal(expectedTopics)) + expectedAddresses := transformer.HexStringsToAddresses(config.ContractAddresses) + Eventually(func() []common.Address { + return mockLogFetcher.ContractAddresses + }).Should(Equal(expectedAddresses)) + close(done) + }) - It("returns error if fetching logs fails", func() { - addMissingHeader(extractor) - addTransformerConfig(extractor) - mockLogFetcher := &mocks.MockLogFetcher{} - mockLogFetcher.ReturnError = fakes.FakeError - extractor.Fetcher = mockLogFetcher + It("returns error if fetching logs fails", func(done Done) { + addMissingHeader(extractor) + addTransformerConfig(extractor) + mockLogFetcher := &mocks.MockLogFetcher{} + mockLogFetcher.ReturnError = fakes.FakeError + extractor.Fetcher = mockLogFetcher - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) - }) + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) + }) - It("does not sync transactions if no fetched logs", func() { - addMissingHeader(extractor) - addTransformerConfig(extractor) - mockTransactionSyncer := &fakes.MockTransactionSyncer{} - extractor.Syncer = mockTransactionSyncer + Describe("when no fetched logs", func() { + It("does not sync transactions", func(done Done) { + addMissingHeader(extractor) + addTransformerConfig(extractor) + mockTransactionSyncer := &fakes.MockTransactionSyncer{} + extractor.Syncer = mockTransactionSyncer - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse()) - }) + Consistently(func() bool { + return mockTransactionSyncer.SyncTransactionsCalled + }).Should(BeFalse()) + close(done) + }) + }) - It("syncs transactions for fetched logs", func() { - addMissingHeader(extractor) - addFetchedLog(extractor) - addTransformerConfig(extractor) - mockTransactionSyncer := &fakes.MockTransactionSyncer{} - extractor.Syncer = mockTransactionSyncer + Describe("when there are fetched logs", func() { + It("syncs transactions", func(done Done) { + addMissingHeader(extractor) + addFetchedLog(extractor) + addTransformerConfig(extractor) + mockTransactionSyncer := &fakes.MockTransactionSyncer{} + extractor.Syncer = mockTransactionSyncer - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue()) - }) + Eventually(func() bool { + return mockTransactionSyncer.SyncTransactionsCalled + }).Should(BeTrue()) + close(done) + }) - It("returns error if syncing transactions fails", func() { - addMissingHeader(extractor) - addFetchedLog(extractor) - addTransformerConfig(extractor) - mockTransactionSyncer := &fakes.MockTransactionSyncer{} - mockTransactionSyncer.SyncTransactionsError = fakes.FakeError - extractor.Syncer = mockTransactionSyncer + It("returns error if syncing transactions fails", func(done Done) { + addMissingHeader(extractor) + addFetchedLog(extractor) + addTransformerConfig(extractor) + mockTransactionSyncer := &fakes.MockTransactionSyncer{} + mockTransactionSyncer.SyncTransactionsError = fakes.FakeError + extractor.Syncer = mockTransactionSyncer - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) - }) + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) + }) - It("persists fetched logs", func() { - addMissingHeader(extractor) - addTransformerConfig(extractor) - fakeLogs := []types.Log{{ - Address: common.HexToAddress("0xA"), - Topics: []common.Hash{common.HexToHash("0xA")}, - Data: []byte{}, - Index: 0, - }} - mockLogFetcher := &mocks.MockLogFetcher{ReturnLogs: fakeLogs} - extractor.Fetcher = mockLogFetcher - mockLogRepository := &fakes.MockHeaderSyncLogRepository{} - extractor.LogRepository = mockLogRepository + It("persists fetched logs", func(done Done) { + addMissingHeader(extractor) + addTransformerConfig(extractor) + fakeLogs := []types.Log{{ + Address: common.HexToAddress("0xA"), + Topics: []common.Hash{common.HexToHash("0xA")}, + Data: []byte{}, + Index: 0, + }} + mockLogFetcher := &mocks.MockLogFetcher{ReturnLogs: fakeLogs} + extractor.Fetcher = mockLogFetcher + mockLogRepository := &fakes.MockHeaderSyncLogRepository{} + extractor.LogRepository = mockLogRepository - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs)) - }) + Eventually(func() []types.Log { + return mockLogRepository.PassedLogs + }).Should(Equal(fakeLogs)) + close(done) + }) - It("returns error if persisting logs fails", func() { - addMissingHeader(extractor) - addFetchedLog(extractor) - addTransformerConfig(extractor) - mockLogRepository := &fakes.MockHeaderSyncLogRepository{} - mockLogRepository.CreateError = fakes.FakeError - extractor.LogRepository = mockLogRepository + It("returns error if persisting logs fails", func(done Done) { + addMissingHeader(extractor) + addFetchedLog(extractor) + addTransformerConfig(extractor) + mockLogRepository := &fakes.MockHeaderSyncLogRepository{} + mockLogRepository.CreateError = fakes.FakeError + extractor.LogRepository = mockLogRepository - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) - }) + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) + }) + }) - It("marks header checked", func() { - addFetchedLog(extractor) - addTransformerConfig(extractor) - mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} - headerID := rand.Int63() - mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}} - extractor.CheckedHeadersRepository = mockCheckedHeadersRepository + It("marks header checked", func(done Done) { + addFetchedLog(extractor) + addTransformerConfig(extractor) + mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} + headerID := rand.Int63() + mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}} + extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).NotTo(HaveOccurred()) - Expect(mockCheckedHeadersRepository.HeaderID).To(Equal(headerID)) - }) + Eventually(func() int64 { + return mockCheckedHeadersRepository.HeaderID + }).Should(Equal(headerID)) + close(done) + }) - It("returns error if marking header checked fails", func() { - addFetchedLog(extractor) - addTransformerConfig(extractor) - mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} - mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: rand.Int63()}} - mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError - extractor.CheckedHeadersRepository = mockCheckedHeadersRepository + It("returns error if marking header checked fails", func(done Done) { + addFetchedLog(extractor) + addTransformerConfig(extractor) + mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} + mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: rand.Int63()}} + mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError + extractor.CheckedHeadersRepository = mockCheckedHeadersRepository - err := extractor.ExtractLogs(constants.HeaderMissing) + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) + }) + + It("emits that missing headers were found", func(done Done) { + addMissingHeader(extractor) + addTransformerConfig(extractor) + + go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) + + Expect(<-missingHeadersFound).To(BeTrue()) + close(done) + }) }) }) }) diff --git a/libraries/shared/mocks/log_delegator.go b/libraries/shared/mocks/log_delegator.go index 95ad5fe8..8a2338a4 100644 --- a/libraries/shared/mocks/log_delegator.go +++ b/libraries/shared/mocks/log_delegator.go @@ -22,15 +22,23 @@ import ( type MockLogDelegator struct { AddedTransformers []transformer.EventTransformer - DelegateCalled bool - DelegateError error + DelegateCallCount int + DelegateErrors []error + LogsFound []bool } func (delegator *MockLogDelegator) AddTransformer(t transformer.EventTransformer) { delegator.AddedTransformers = append(delegator.AddedTransformers, t) } -func (delegator *MockLogDelegator) DelegateLogs() error { - delegator.DelegateCalled = true - return delegator.DelegateError +func (delegator *MockLogDelegator) DelegateLogs(errs chan error, logsFound chan bool) { + delegator.DelegateCallCount++ + var delegateErrorThisRun error + delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:] + if delegateErrorThisRun != nil { + errs <- delegateErrorThisRun + } + var logsFoundThisRun bool + logsFoundThisRun, delegator.LogsFound = delegator.LogsFound[0], delegator.LogsFound[1:] + logsFound <- logsFoundThisRun } diff --git a/libraries/shared/mocks/log_extractor.go b/libraries/shared/mocks/log_extractor.go index b179b1ea..0fba9d27 100644 --- a/libraries/shared/mocks/log_extractor.go +++ b/libraries/shared/mocks/log_extractor.go @@ -22,16 +22,24 @@ import ( ) type MockLogExtractor struct { - AddedConfigs []transformer.EventTransformerConfig - ExtractLogsCalled bool - ExtractLogsError error + AddedConfigs []transformer.EventTransformerConfig + ExtractLogsCount int + ExtractLogsErrors []error + MissingHeadersExist []bool } func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) { extractor.AddedConfigs = append(extractor.AddedConfigs, config) } -func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error { - extractor.ExtractLogsCalled = true - return extractor.ExtractLogsError +func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) { + extractor.ExtractLogsCount++ + var errorThisRun error + errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:] + if errorThisRun != nil { + errs <- errorThisRun + } + var missingHeadersExist bool + missingHeadersExist, extractor.MissingHeadersExist = extractor.MissingHeadersExist[0], extractor.MissingHeadersExist[1:] + missingHeadersFound <- missingHeadersExist } diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go index c682400c..ff40e302 100644 --- a/libraries/shared/watcher/event_watcher.go +++ b/libraries/shared/watcher/event_watcher.go @@ -27,8 +27,11 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "time" ) +const NoNewDataPause = time.Second * 7 + type EventWatcher struct { blockChain core.BlockChain db *postgres.DB @@ -66,18 +69,62 @@ func (watcher *EventWatcher) AddTransformers(initializers []transformer.EventTra } // Extracts and delegates watched log events. -func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecution) error { - extractErr := watcher.LogExtractor.ExtractLogs(recheckHeaders) - if extractErr != nil { - logrus.Errorf("error extracting logs in event watcher: %s", extractErr.Error()) - return extractErr - } +func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecution, errsChan chan error) { + extractErrsChan := make(chan error) + delegateErrsChan := make(chan error) - delegateErr := watcher.LogDelegator.DelegateLogs() - if delegateErr != nil { - logrus.Errorf("error delegating logs in event watcher: %s", delegateErr.Error()) - return delegateErr - } + go watcher.extractLogs(recheckHeaders, extractErrsChan) + go watcher.delegateLogs(delegateErrsChan) + + for { + select { + case extractErr := <-extractErrsChan: + logrus.Errorf("error extracting logs in event watcher: %s", extractErr.Error()) + errsChan <- extractErr + case delegateErr := <-delegateErrsChan: + logrus.Errorf("error delegating logs in event watcher: %s", delegateErr.Error()) + errsChan <- delegateErr + } + } +} + +func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) { + extractLogsErr := make(chan error) + missingHeadersFound := make(chan bool) + go watcher.LogExtractor.ExtractLogs(recheckHeaders, extractLogsErr, missingHeadersFound) + + for { + select { + case err := <-extractLogsErr: + errs <- err + case missingHeaders := <-missingHeadersFound: + if missingHeaders { + go watcher.extractLogs(recheckHeaders, errs) + } else { + time.Sleep(NoNewDataPause) + go watcher.extractLogs(recheckHeaders, errs) + } + } + } +} + +func (watcher *EventWatcher) delegateLogs(errs chan error) { + delegateLogsErr := make(chan error) + logsFound := make(chan bool) + go watcher.LogDelegator.DelegateLogs(delegateLogsErr, logsFound) + + for { + select { + case err := <-delegateLogsErr: + errs <- err + case logs := <-logsFound: + if logs { + go watcher.delegateLogs(errs) + } else { + time.Sleep(NoNewDataPause) + go watcher.delegateLogs(errs) + } + } + } - return nil } diff --git a/libraries/shared/watcher/event_watcher_test.go b/libraries/shared/watcher/event_watcher_test.go index 041aca42..be165396 100644 --- a/libraries/shared/watcher/event_watcher_test.go +++ b/libraries/shared/watcher/event_watcher_test.go @@ -30,13 +30,13 @@ var _ = Describe("Event Watcher", func() { var ( delegator *mocks.MockLogDelegator extractor *mocks.MockLogExtractor - eventWatcher watcher.EventWatcher + eventWatcher *watcher.EventWatcher ) BeforeEach(func() { delegator = &mocks.MockLogDelegator{} extractor = &mocks.MockLogExtractor{} - eventWatcher = watcher.EventWatcher{ + eventWatcher = &watcher.EventWatcher{ LogDelegator: delegator, LogExtractor: extractor, } @@ -78,36 +78,115 @@ var _ = Describe("Event Watcher", func() { }) Describe("Execute", func() { - It("extracts watched logs", func() { - err := eventWatcher.Execute(constants.HeaderMissing) + var errsChan chan error - Expect(err).NotTo(HaveOccurred()) - Expect(extractor.ExtractLogsCalled).To(BeTrue()) + BeforeEach(func() { + errsChan = make(chan error) }) - It("returns error if extracting logs fails", func() { - extractor.ExtractLogsError = fakes.FakeError + It("extracts watched logs", func(done Done) { + delegator.DelegateErrors = []error{nil} + delegator.LogsFound = []bool{false} + extractor.ExtractLogsErrors = []error{nil} + extractor.MissingHeadersExist = []bool{false} - err := eventWatcher.Execute(constants.HeaderMissing) + go eventWatcher.Execute(constants.HeaderMissing, errsChan) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) + Eventually(func() int { + return extractor.ExtractLogsCount + }).Should(Equal(1)) + close(done) }) - It("delegates untransformed logs", func() { - err := eventWatcher.Execute(constants.HeaderMissing) + It("returns error if extracting logs fails", func(done Done) { + delegator.DelegateErrors = []error{nil} + delegator.LogsFound = []bool{false} + extractor.ExtractLogsErrors = []error{fakes.FakeError} + extractor.MissingHeadersExist = []bool{false} - Expect(err).NotTo(HaveOccurred()) - Expect(delegator.DelegateCalled).To(BeTrue()) + go eventWatcher.Execute(constants.HeaderMissing, errsChan) + + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) }) - It("returns error if delegating logs fails", func() { - delegator.DelegateError = fakes.FakeError + It("extracts watched logs again if missing headers found", func(done Done) { + delegator.DelegateErrors = []error{nil} + delegator.LogsFound = []bool{false} + extractor.ExtractLogsErrors = []error{nil, nil} + extractor.MissingHeadersExist = []bool{true, false} - err := eventWatcher.Execute(constants.HeaderMissing) + go eventWatcher.Execute(constants.HeaderMissing, errsChan) - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(fakes.FakeError)) + Eventually(func() int { + return extractor.ExtractLogsCount + }).Should(Equal(2)) + close(done) + }) + + It("returns error if extracting logs fails on subsequent run", func(done Done) { + delegator.DelegateErrors = []error{nil} + delegator.LogsFound = []bool{false} + extractor.ExtractLogsErrors = []error{nil, fakes.FakeError} + extractor.MissingHeadersExist = []bool{true, false} + + go eventWatcher.Execute(constants.HeaderMissing, errsChan) + + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) + + }) + + It("delegates untransformed logs", func(done Done) { + delegator.DelegateErrors = []error{nil} + delegator.LogsFound = []bool{false} + extractor.ExtractLogsErrors = []error{nil} + extractor.MissingHeadersExist = []bool{false} + + go eventWatcher.Execute(constants.HeaderMissing, errsChan) + + Eventually(func() int { + return delegator.DelegateCallCount + }).Should(Equal(1)) + close(done) + }) + + It("returns error if delegating logs fails", func(done Done) { + delegator.LogsFound = []bool{false} + delegator.DelegateErrors = []error{fakes.FakeError} + extractor.ExtractLogsErrors = []error{nil} + extractor.MissingHeadersExist = []bool{false} + + go eventWatcher.Execute(constants.HeaderMissing, errsChan) + + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) + }) + + It("delegates logs again if untransformed logs found", func(done Done) { + delegator.DelegateErrors = []error{nil, nil} + delegator.LogsFound = []bool{true, false} + extractor.ExtractLogsErrors = []error{nil} + extractor.MissingHeadersExist = []bool{false} + + go eventWatcher.Execute(constants.HeaderMissing, errsChan) + + Eventually(func() int { + return delegator.DelegateCallCount + }).Should(Equal(2)) + close(done) + }) + + It("returns error if delegating logs fails on subsequent run", func(done Done) { + delegator.DelegateErrors = []error{nil, fakes.FakeError} + delegator.LogsFound = []bool{true, false} + extractor.ExtractLogsErrors = []error{nil} + extractor.MissingHeadersExist = []bool{false} + + go eventWatcher.Execute(constants.HeaderMissing, errsChan) + + Expect(<-errsChan).To(MatchError(fakes.FakeError)) + close(done) }) }) }) diff --git a/pkg/datastore/postgres/repositories/checked_headers_repository.go b/pkg/datastore/postgres/repositories/checked_headers_repository.go index 272a8dae..341cb272 100644 --- a/pkg/datastore/postgres/repositories/checked_headers_repository.go +++ b/pkg/datastore/postgres/repositories/checked_headers_repository.go @@ -52,8 +52,7 @@ func (repo CheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingB LEFT JOIN checked_headers on headers.id = header_id WHERE (header_id ISNULL OR check_count < $2) AND headers.block_number >= $1 - AND headers.eth_node_fingerprint = $3 - LIMIT 100` + AND headers.eth_node_fingerprint = $3` err = repo.db.Select(&result, query, startingBlockNumber, checkCount, repo.db.Node.ID) } else { query = `SELECT headers.id, headers.block_number, headers.hash FROM headers @@ -61,8 +60,7 @@ func (repo CheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingB WHERE (header_id ISNULL OR check_count < $3) AND headers.block_number >= $1 AND headers.block_number <= $2 - AND headers.eth_node_fingerprint = $4 - LIMIT 100` + AND headers.eth_node_fingerprint = $4` err = repo.db.Select(&result, query, startingBlockNumber, endingBlockNumber, checkCount, repo.db.Node.ID) } diff --git a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go index 422d00b3..c9a9284a 100644 --- a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go +++ b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go @@ -170,18 +170,6 @@ var _ = Describe("Checked Headers repository", func() { Expect(nodeTwoMissingHeaders[1].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10))) Expect(nodeTwoMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10))) }) - - It("only returns 100 results to prevent blocking log delegation", func() { - for n := outOfRangeBlockNumber + 1; n < outOfRangeBlockNumber+100; n++ { - _, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - } - - missingHeaders, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber+200, uncheckedCheckCount) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(missingHeaders)).To(Equal(100)) - }) }) Describe("when ending block is -1", func() { @@ -252,19 +240,6 @@ var _ = Describe("Checked Headers repository", func() { Expect(nodeTwoMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10), Equal(outOfRangeBlockNumber+10))) Expect(nodeTwoMissingHeaders[3].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10), Equal(outOfRangeBlockNumber+10))) }) - - It("only returns 100 results to prevent blocking log delegation", func() { - for n := outOfRangeBlockNumber + 1; n < outOfRangeBlockNumber+100; n++ { - _, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - } - - missingHeaders, err := repo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount) - - Expect(err).NotTo(HaveOccurred()) - Expect(len(missingHeaders)).To(Equal(100)) - }) }) - }) })