Remove unnecessary async from the event watcher

- extract and delegate logs synchronously after initial goroutine fired
This commit is contained in:
Rob Mulholand 2019-08-14 17:29:04 -05:00
parent 1883a11ab1
commit d76be4962b
7 changed files with 149 additions and 217 deletions

View File

@ -27,9 +27,14 @@ import (
var ErrNoTransformers = errors.New("no event transformers configured in the log delegator") var ErrNoTransformers = errors.New("no event transformers configured in the log delegator")
const (
logsFound = true
noLogsFound = false
)
type ILogDelegator interface { type ILogDelegator interface {
AddTransformer(t transformer.EventTransformer) AddTransformer(t transformer.EventTransformer)
DelegateLogs(errs chan error, logsFound chan bool) DelegateLogs() (error, bool)
} }
type LogDelegator struct { type LogDelegator struct {
@ -43,31 +48,28 @@ func (delegator *LogDelegator) AddTransformer(t transformer.EventTransformer) {
delegator.Chunker.AddConfig(t.GetConfig()) delegator.Chunker.AddConfig(t.GetConfig())
} }
func (delegator *LogDelegator) DelegateLogs(errs chan error, logsFound chan bool) { func (delegator *LogDelegator) DelegateLogs() (error, bool) {
if len(delegator.Transformers) < 1 { if len(delegator.Transformers) < 1 {
errs <- ErrNoTransformers return ErrNoTransformers, noLogsFound
return
} }
persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs() persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs()
if fetchErr != nil { if fetchErr != nil {
logrus.Errorf("error loading logs from db: %s", fetchErr.Error()) logrus.Errorf("error loading logs from db: %s", fetchErr.Error())
errs <- fetchErr return fetchErr, noLogsFound
return
} }
if len(persistedLogs) < 1 { if len(persistedLogs) < 1 {
logsFound <- false return nil, noLogsFound
} }
transformErr := delegator.delegateLogs(persistedLogs) transformErr := delegator.delegateLogs(persistedLogs)
if transformErr != nil { if transformErr != nil {
logrus.Errorf("error transforming logs: %s", transformErr) logrus.Errorf("error transforming logs: %s", transformErr)
errs <- transformErr return transformErr, logsFound
return
} }
logsFound <- true return nil, logsFound
} }
func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error { func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error {

View File

@ -59,61 +59,49 @@ var _ = Describe("Log delegator", func() {
}) })
Describe("DelegateLogs", func() { Describe("DelegateLogs", func() {
var ( It("returns an error if no transformers configured", func() {
errsChan chan error
logsFound chan bool
)
BeforeEach(func() {
errsChan = make(chan error)
logsFound = make(chan bool)
})
It("returns an error if no transformers configured", func(done Done) {
delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{})
go delegator.DelegateLogs(errsChan, logsFound) err, _ := delegator.DelegateLogs()
Expect(<-errsChan).To(MatchError(logs.ErrNoTransformers)) Expect(err).To(HaveOccurred())
close(done) Expect(err).To(MatchError(logs.ErrNoTransformers))
}) })
It("gets untransformed logs", func(done Done) { It("gets untransformed logs", func() {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(&mocks.MockEventTransformer{}) delegator.AddTransformer(&mocks.MockEventTransformer{})
go delegator.DelegateLogs(errsChan, logsFound) err, _ := delegator.DelegateLogs()
Eventually(func() bool { Expect(err).NotTo(HaveOccurred())
return mockLogRepository.GetCalled Expect(mockLogRepository.GetCalled).To(BeTrue())
}).Should(BeTrue())
close(done)
}) })
It("emits error if getting untransformed logs fails", func(done Done) { It("emits error if getting untransformed logs fails", func() {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.GetError = fakes.FakeError mockLogRepository.GetError = fakes.FakeError
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(&mocks.MockEventTransformer{}) delegator.AddTransformer(&mocks.MockEventTransformer{})
go delegator.DelegateLogs(errsChan, logsFound) err, _ := delegator.DelegateLogs()
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(HaveOccurred())
close(done) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("emits that no logs were found if no logs returned", func(done Done) { It("emits that no logs were found if no logs returned", func() {
delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{})
delegator.AddTransformer(&mocks.MockEventTransformer{}) delegator.AddTransformer(&mocks.MockEventTransformer{})
go delegator.DelegateLogs(errsChan, logsFound) err, logsFound := delegator.DelegateLogs()
Expect(<-logsFound).To(BeFalse()) Expect(err).NotTo(HaveOccurred())
close(done) Expect(logsFound).To(BeFalse())
}) })
It("delegates chunked logs to transformers", func(done Done) { It("delegates chunked logs to transformers", func() {
fakeTransformer := &mocks.MockEventTransformer{} fakeTransformer := &mocks.MockEventTransformer{}
config := mocks.FakeTransformerConfig config := mocks.FakeTransformerConfig
fakeTransformer.SetTransformerConfig(config) fakeTransformer.SetTransformerConfig(config)
@ -127,31 +115,27 @@ var _ = Describe("Log delegator", func() {
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(fakeTransformer) delegator.AddTransformer(fakeTransformer)
go delegator.DelegateLogs(errsChan, logsFound) err, _ := delegator.DelegateLogs()
Eventually(func() bool { Expect(err).NotTo(HaveOccurred())
return fakeTransformer.ExecuteWasCalled Expect(fakeTransformer.ExecuteWasCalled).To(BeTrue())
}).Should(BeTrue()) Expect(fakeTransformer.PassedLogs).To(Equal(fakeHeaderSyncLogs))
Eventually(func() []core.HeaderSyncLog {
return fakeTransformer.PassedLogs
}).Should(Equal(fakeHeaderSyncLogs))
close(done)
}) })
It("emits error if transformer returns an error", func(done Done) { It("emits error if transformer returns an error", func() {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}} mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}}
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError} fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError}
delegator.AddTransformer(fakeTransformer) delegator.AddTransformer(fakeTransformer)
go delegator.DelegateLogs(errsChan, logsFound) err, _ := delegator.DelegateLogs()
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(HaveOccurred())
close(done) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("emits logs found when logs returned and delegated", func(done Done) { It("emits logs found when logs returned and delegated", func() {
fakeTransformer := &mocks.MockEventTransformer{} fakeTransformer := &mocks.MockEventTransformer{}
config := mocks.FakeTransformerConfig config := mocks.FakeTransformerConfig
fakeTransformer.SetTransformerConfig(config) fakeTransformer.SetTransformerConfig(config)
@ -165,13 +149,12 @@ var _ = Describe("Log delegator", func() {
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(fakeTransformer) delegator.AddTransformer(fakeTransformer)
go delegator.DelegateLogs(errsChan, logsFound) err, logsFound := delegator.DelegateLogs()
Expect(<-logsFound).To(BeTrue()) Expect(err).NotTo(HaveOccurred())
close(done) Expect(logsFound).To(BeTrue())
}) })
}) })
}) })
func newDelegator(headerSyncLogRepository *fakes.MockHeaderSyncLogRepository) *logs.LogDelegator { func newDelegator(headerSyncLogRepository *fakes.MockHeaderSyncLogRepository) *logs.LogDelegator {

View File

@ -30,9 +30,14 @@ import (
var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor") var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor")
const (
missingHeadersFound = true
noMissingHeadersFound = false
)
type ILogExtractor interface { type ILogExtractor interface {
AddTransformerConfig(config transformer.EventTransformerConfig) AddTransformerConfig(config transformer.EventTransformerConfig)
ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool)
} }
type LogExtractor struct { type LogExtractor struct {
@ -59,56 +64,50 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran
} }
// Fetch and persist watched logs // Fetch and persist watched logs
func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) { func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) {
if len(extractor.Addresses) < 1 { if len(extractor.Addresses) < 1 {
logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error())
errs <- ErrNoWatchedAddresses return ErrNoWatchedAddresses, noMissingHeadersFound
return
} }
missingHeaders, missingHeadersErr := extractor.CheckedHeadersRepository.MissingHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders)) missingHeaders, missingHeadersErr := extractor.CheckedHeadersRepository.MissingHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders))
if missingHeadersErr != nil { if missingHeadersErr != nil {
logrus.Errorf("error fetching missing headers: %s", missingHeadersErr) logrus.Errorf("error fetching missing headers: %s", missingHeadersErr)
errs <- missingHeadersErr return missingHeadersErr, noMissingHeadersFound
return
} }
if len(missingHeaders) < 1 { if len(missingHeaders) < 1 {
missingHeadersFound <- false return nil, noMissingHeadersFound
return
} }
for _, header := range missingHeaders { for _, header := range missingHeaders {
logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header)
if fetchLogsErr != nil { if fetchLogsErr != nil {
logError("error fetching logs for header: %s", fetchLogsErr, header) logError("error fetching logs for header: %s", fetchLogsErr, header)
errs <- fetchLogsErr return fetchLogsErr, missingHeadersFound
return
} }
if len(logs) > 0 { if len(logs) > 0 {
transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs) transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs)
if transactionsSyncErr != nil { if transactionsSyncErr != nil {
logError("error syncing transactions: %s", transactionsSyncErr, header) logError("error syncing transactions: %s", transactionsSyncErr, header)
errs <- transactionsSyncErr return transactionsSyncErr, missingHeadersFound
return
} }
createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs) createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs)
if createLogsErr != nil { if createLogsErr != nil {
logError("error persisting logs: %s", createLogsErr, header) logError("error persisting logs: %s", createLogsErr, header)
errs <- createLogsErr return createLogsErr, missingHeadersFound
return
} }
} }
markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id) markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id)
if markHeaderCheckedErr != nil { if markHeaderCheckedErr != nil {
logError("error marking header checked: %s", markHeaderCheckedErr, header) logError("error marking header checked: %s", markHeaderCheckedErr, header)
errs <- markHeaderCheckedErr return markHeaderCheckedErr, missingHeadersFound
} }
} }
missingHeadersFound <- true return nil, missingHeadersFound
} }
func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool {

View File

@ -81,109 +81,84 @@ var _ = Describe("Log extractor", func() {
}) })
Describe("ExtractLogs", func() { Describe("ExtractLogs", func() {
var ( It("returns error if no watched addresses configured", func() {
errsChan chan error err, _ := extractor.ExtractLogs(constants.HeaderMissing)
missingHeadersFound chan bool
)
BeforeEach(func() { Expect(err).To(HaveOccurred())
errsChan = make(chan error) Expect(err).To(MatchError(logs.ErrNoWatchedAddresses))
missingHeadersFound = make(chan bool)
})
It("returns error if no watched addresses configured", func(done Done) {
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(<-errsChan).To(MatchError(logs.ErrNoWatchedAddresses))
close(done)
}) })
Describe("when checking missing headers", func() { Describe("when checking missing headers", func() {
It("gets missing headers since configured starting block with check_count < 1", func(done Done) { It("gets missing headers since configured starting block with check_count < 1", func() {
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
startingBlockNumber := rand.Int63() startingBlockNumber := rand.Int63()
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Eventually(func() int64 { Expect(err).NotTo(HaveOccurred())
return mockCheckedHeadersRepository.StartingBlockNumber Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber))
}).Should(Equal(startingBlockNumber)) Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1)))
Eventually(func() int64 { Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(int64(1)))
return mockCheckedHeadersRepository.EndingBlockNumber
}).Should(Equal(int64(-1)))
Eventually(func() int64 {
return mockCheckedHeadersRepository.CheckCount
}).Should(Equal(int64(1)))
close(done)
}) })
}) })
Describe("when rechecking headers", func() { Describe("when rechecking headers", func() {
It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func(done Done) { It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func() {
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
startingBlockNumber := rand.Int63() startingBlockNumber := rand.Int63()
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
go extractor.ExtractLogs(constants.HeaderRecheck, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderRecheck)
Eventually(func() int64 { Expect(err).NotTo(HaveOccurred())
return mockCheckedHeadersRepository.StartingBlockNumber Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber))
}).Should(Equal(startingBlockNumber)) Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1)))
Eventually(func() int64 { Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(constants.RecheckHeaderCap))
return mockCheckedHeadersRepository.EndingBlockNumber
}).Should(Equal(int64(-1)))
Eventually(func() int64 {
return mockCheckedHeadersRepository.CheckCount
}).Should(Equal(constants.RecheckHeaderCap))
close(done)
}) })
}) })
It("emits error if getting missing headers fails", func(done Done) { It("emits error if getting missing headers fails", func() {
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.MissingHeadersReturnError = fakes.FakeError mockCheckedHeadersRepository.MissingHeadersReturnError = fakes.FakeError
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(HaveOccurred())
close(done) Expect(err).To(MatchError(fakes.FakeError))
}) })
Describe("when no missing headers", func() { Describe("when no missing headers", func() {
It("does not fetch logs", func(done Done) { It("does not fetch logs", func() {
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Consistently(func() bool { Expect(err).NotTo(HaveOccurred())
return mockLogFetcher.FetchCalled Expect(mockLogFetcher.FetchCalled).To(BeFalse())
}).Should(BeFalse())
close(done)
}) })
It("emits that no missing headers were found", func(done Done) { It("emits that no missing headers were found", func() {
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) _, missingHeadersFound := extractor.ExtractLogs(constants.HeaderMissing)
Expect(<-missingHeadersFound).To(BeFalse()) Expect(missingHeadersFound).To(BeFalse())
close(done)
}) })
}) })
Describe("when there are missing headers", func() { Describe("when there are missing headers", func() {
It("fetches logs for missing headers", func(done Done) { It("fetches logs for missing headers", func() {
addMissingHeader(extractor) addMissingHeader(extractor)
config := transformer.EventTransformerConfig{ config := transformer.EventTransformerConfig{
ContractAddresses: []string{fakes.FakeAddress.Hex()}, ContractAddresses: []string{fakes.FakeAddress.Hex()},
@ -194,68 +169,58 @@ var _ = Describe("Log extractor", func() {
mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Eventually(func() bool { Expect(err).NotTo(HaveOccurred())
return mockLogFetcher.FetchCalled Expect(mockLogFetcher.FetchCalled).To(BeTrue())
}).Should(BeTrue())
expectedTopics := []common.Hash{common.HexToHash(config.Topic)} expectedTopics := []common.Hash{common.HexToHash(config.Topic)}
Eventually(func() []common.Hash { Expect(mockLogFetcher.Topics).To(Equal(expectedTopics))
return mockLogFetcher.Topics
}).Should(Equal(expectedTopics))
expectedAddresses := transformer.HexStringsToAddresses(config.ContractAddresses) expectedAddresses := transformer.HexStringsToAddresses(config.ContractAddresses)
Eventually(func() []common.Address { Expect(mockLogFetcher.ContractAddresses).To(Equal(expectedAddresses))
return mockLogFetcher.ContractAddresses
}).Should(Equal(expectedAddresses))
close(done)
}) })
It("returns error if fetching logs fails", func(done Done) { It("returns error if fetching logs fails", func() {
addMissingHeader(extractor) addMissingHeader(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher := &mocks.MockLogFetcher{}
mockLogFetcher.ReturnError = fakes.FakeError mockLogFetcher.ReturnError = fakes.FakeError
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(HaveOccurred())
close(done) Expect(err).To(MatchError(fakes.FakeError))
}) })
Describe("when no fetched logs", func() { Describe("when no fetched logs", func() {
It("does not sync transactions", func(done Done) { It("does not sync transactions", func() {
addMissingHeader(extractor) addMissingHeader(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockTransactionSyncer := &fakes.MockTransactionSyncer{} mockTransactionSyncer := &fakes.MockTransactionSyncer{}
extractor.Syncer = mockTransactionSyncer extractor.Syncer = mockTransactionSyncer
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Consistently(func() bool { Expect(err).NotTo(HaveOccurred())
return mockTransactionSyncer.SyncTransactionsCalled Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse())
}).Should(BeFalse())
close(done)
}) })
}) })
Describe("when there are fetched logs", func() { Describe("when there are fetched logs", func() {
It("syncs transactions", func(done Done) { It("syncs transactions", func() {
addMissingHeader(extractor) addMissingHeader(extractor)
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockTransactionSyncer := &fakes.MockTransactionSyncer{} mockTransactionSyncer := &fakes.MockTransactionSyncer{}
extractor.Syncer = mockTransactionSyncer extractor.Syncer = mockTransactionSyncer
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Eventually(func() bool { Expect(err).NotTo(HaveOccurred())
return mockTransactionSyncer.SyncTransactionsCalled Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue())
}).Should(BeTrue())
close(done)
}) })
It("returns error if syncing transactions fails", func(done Done) { It("returns error if syncing transactions fails", func() {
addMissingHeader(extractor) addMissingHeader(extractor)
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
@ -263,13 +228,13 @@ var _ = Describe("Log extractor", func() {
mockTransactionSyncer.SyncTransactionsError = fakes.FakeError mockTransactionSyncer.SyncTransactionsError = fakes.FakeError
extractor.Syncer = mockTransactionSyncer extractor.Syncer = mockTransactionSyncer
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(HaveOccurred())
close(done) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("persists fetched logs", func(done Done) { It("persists fetched logs", func() {
addMissingHeader(extractor) addMissingHeader(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
fakeLogs := []types.Log{{ fakeLogs := []types.Log{{
@ -283,15 +248,13 @@ var _ = Describe("Log extractor", func() {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
extractor.LogRepository = mockLogRepository extractor.LogRepository = mockLogRepository
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Eventually(func() []types.Log { Expect(err).NotTo(HaveOccurred())
return mockLogRepository.PassedLogs Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs))
}).Should(Equal(fakeLogs))
close(done)
}) })
It("returns error if persisting logs fails", func(done Done) { It("returns error if persisting logs fails", func() {
addMissingHeader(extractor) addMissingHeader(extractor)
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
@ -299,14 +262,14 @@ var _ = Describe("Log extractor", func() {
mockLogRepository.CreateError = fakes.FakeError mockLogRepository.CreateError = fakes.FakeError
extractor.LogRepository = mockLogRepository extractor.LogRepository = mockLogRepository
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(HaveOccurred())
close(done) Expect(err).To(MatchError(fakes.FakeError))
}) })
}) })
It("marks header checked", func(done Done) { It("marks header checked", func() {
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
@ -314,15 +277,13 @@ var _ = Describe("Log extractor", func() {
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Eventually(func() int64 { Expect(err).NotTo(HaveOccurred())
return mockCheckedHeadersRepository.HeaderID Expect(mockCheckedHeadersRepository.HeaderID).To(Equal(headerID))
}).Should(Equal(headerID))
close(done)
}) })
It("returns error if marking header checked fails", func(done Done) { It("returns error if marking header checked fails", func() {
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
@ -330,20 +291,20 @@ var _ = Describe("Log extractor", func() {
mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Expect(<-errsChan).To(MatchError(fakes.FakeError)) Expect(err).To(HaveOccurred())
close(done) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("emits that missing headers were found", func(done Done) { It("emits that missing headers were found", func() {
addMissingHeader(extractor) addMissingHeader(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound) err, missingHeadersFound := extractor.ExtractLogs(constants.HeaderMissing)
Expect(<-missingHeadersFound).To(BeTrue()) Expect(err).NotTo(HaveOccurred())
close(done) Expect(missingHeadersFound).To(BeTrue())
}) })
}) })
}) })

View File

@ -31,14 +31,14 @@ func (delegator *MockLogDelegator) AddTransformer(t transformer.EventTransformer
delegator.AddedTransformers = append(delegator.AddedTransformers, t) delegator.AddedTransformers = append(delegator.AddedTransformers, t)
} }
func (delegator *MockLogDelegator) DelegateLogs(errs chan error, logsFound chan bool) { func (delegator *MockLogDelegator) DelegateLogs() (error, bool) {
delegator.DelegateCallCount++ delegator.DelegateCallCount++
var delegateErrorThisRun error var delegateErrorThisRun error
delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:] delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:]
if delegateErrorThisRun != nil { if delegateErrorThisRun != nil {
errs <- delegateErrorThisRun return delegateErrorThisRun, false
} }
var logsFoundThisRun bool var logsFoundThisRun bool
logsFoundThisRun, delegator.LogsFound = delegator.LogsFound[0], delegator.LogsFound[1:] logsFoundThisRun, delegator.LogsFound = delegator.LogsFound[0], delegator.LogsFound[1:]
logsFound <- logsFoundThisRun return nil, logsFoundThisRun
} }

View File

@ -32,14 +32,14 @@ func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.Event
extractor.AddedConfigs = append(extractor.AddedConfigs, config) extractor.AddedConfigs = append(extractor.AddedConfigs, config)
} }
func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) { func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) {
extractor.ExtractLogsCount++ extractor.ExtractLogsCount++
var errorThisRun error var errorThisRun error
errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:] errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:]
if errorThisRun != nil { if errorThisRun != nil {
errs <- errorThisRun return errorThisRun, false
} }
var missingHeadersExist bool var missingHeadersExist bool
missingHeadersExist, extractor.MissingHeadersExist = extractor.MissingHeadersExist[0], extractor.MissingHeadersExist[1:] missingHeadersExist, extractor.MissingHeadersExist = extractor.MissingHeadersExist[0], extractor.MissingHeadersExist[1:]
missingHeadersFound <- missingHeadersExist return nil, missingHeadersExist
} }

View File

@ -89,42 +89,29 @@ func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecuti
} }
func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) { func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) {
extractLogsErr := make(chan error) err, missingHeadersFound := watcher.LogExtractor.ExtractLogs(recheckHeaders)
missingHeadersFound := make(chan bool) if err != nil {
go watcher.LogExtractor.ExtractLogs(recheckHeaders, extractLogsErr, missingHeadersFound) errs <- err
}
for { if missingHeadersFound {
select { watcher.extractLogs(recheckHeaders, errs)
case err := <-extractLogsErr: } else {
errs <- err time.Sleep(NoNewDataPause)
case missingHeaders := <-missingHeadersFound: watcher.extractLogs(recheckHeaders, errs)
if missingHeaders {
go watcher.extractLogs(recheckHeaders, errs)
} else {
time.Sleep(NoNewDataPause)
go watcher.extractLogs(recheckHeaders, errs)
}
}
} }
} }
func (watcher *EventWatcher) delegateLogs(errs chan error) { func (watcher *EventWatcher) delegateLogs(errs chan error) {
delegateLogsErr := make(chan error) err, logsFound := watcher.LogDelegator.DelegateLogs()
logsFound := make(chan bool) if err != nil {
go watcher.LogDelegator.DelegateLogs(delegateLogsErr, logsFound) errs <- err
for {
select {
case err := <-delegateLogsErr:
errs <- err
case logs := <-logsFound:
if logs {
go watcher.delegateLogs(errs)
} else {
time.Sleep(NoNewDataPause)
go watcher.delegateLogs(errs)
}
}
} }
if logsFound {
watcher.delegateLogs(errs)
} else {
time.Sleep(NoNewDataPause)
watcher.delegateLogs(errs)
}
} }