Extract and delegate logs concurrently

This commit is contained in:
Rob Mulholand 2019-08-12 12:26:49 -05:00
parent d496dad33c
commit 63dabbb051
11 changed files with 506 additions and 254 deletions

View File

@ -157,10 +157,13 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
} else { } else {
recheck = constants.HeaderMissing recheck = constants.HeaderMissing
} }
ticker := time.NewTicker(pollingInterval) errs := make(chan error)
defer ticker.Stop() go w.Execute(recheck, errs)
for range ticker.C { for {
w.Execute(recheck) select {
case err := <-errs:
LogWithCommand.Fatalf("error executing event watcher: %s", err.Error())
}
} }
} }

View File

@ -29,7 +29,7 @@ var ErrNoTransformers = errors.New("no event transformers configured in the log
type ILogDelegator interface { type ILogDelegator interface {
AddTransformer(t transformer.EventTransformer) AddTransformer(t transformer.EventTransformer)
DelegateLogs() error DelegateLogs(errs chan error, logsFound chan bool)
} }
type LogDelegator struct { type LogDelegator struct {
@ -43,24 +43,31 @@ func (delegator *LogDelegator) AddTransformer(t transformer.EventTransformer) {
delegator.Chunker.AddConfig(t.GetConfig()) delegator.Chunker.AddConfig(t.GetConfig())
} }
func (delegator LogDelegator) DelegateLogs() error { func (delegator *LogDelegator) DelegateLogs(errs chan error, logsFound chan bool) {
if len(delegator.Transformers) < 1 { if len(delegator.Transformers) < 1 {
return ErrNoTransformers errs <- ErrNoTransformers
return
} }
persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs() persistedLogs, fetchErr := delegator.LogRepository.GetUntransformedHeaderSyncLogs()
if fetchErr != nil { if fetchErr != nil {
logrus.Errorf("error loading logs from db: %s", fetchErr.Error()) logrus.Errorf("error loading logs from db: %s", fetchErr.Error())
return fetchErr errs <- fetchErr
return
}
if len(persistedLogs) < 1 {
logsFound <- false
} }
transformErr := delegator.delegateLogs(persistedLogs) transformErr := delegator.delegateLogs(persistedLogs)
if transformErr != nil { if transformErr != nil {
logrus.Errorf("error transforming logs: %s", transformErr) logrus.Errorf("error transforming logs: %s", transformErr)
return transformErr errs <- transformErr
return
} }
return nil logsFound <- true
} }
func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error { func (delegator *LogDelegator) delegateLogs(logs []core.HeaderSyncLog) error {

View File

@ -59,75 +59,123 @@ var _ = Describe("Log delegator", func() {
}) })
Describe("DelegateLogs", func() { Describe("DelegateLogs", func() {
It("returns an error if no transformers configured", func() { var (
delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) errsChan chan error
logsFound chan bool
)
err := delegator.DelegateLogs() BeforeEach(func() {
errsChan = make(chan error)
Expect(err).To(HaveOccurred()) logsFound = make(chan bool)
Expect(err).To(MatchError(logs.ErrNoTransformers))
}) })
It("gets untransformed logs", func() { It("returns an error if no transformers configured", func(done Done) {
delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{})
go delegator.DelegateLogs(errsChan, logsFound)
Expect(<-errsChan).To(MatchError(logs.ErrNoTransformers))
close(done)
})
It("gets untransformed logs", func(done Done) {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(&mocks.MockEventTransformer{}) delegator.AddTransformer(&mocks.MockEventTransformer{})
err := delegator.DelegateLogs() go delegator.DelegateLogs(errsChan, logsFound)
Expect(err).NotTo(HaveOccurred()) Eventually(func() bool {
Expect(mockLogRepository.GetCalled).To(BeTrue()) return mockLogRepository.GetCalled
}).Should(BeTrue())
close(done)
}) })
It("returns error if getting untransformed logs fails", func() { It("emits error if getting untransformed logs fails", func(done Done) {
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.GetError = fakes.FakeError mockLogRepository.GetError = fakes.FakeError
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(&mocks.MockEventTransformer{}) delegator.AddTransformer(&mocks.MockEventTransformer{})
err := delegator.DelegateLogs() go delegator.DelegateLogs(errsChan, logsFound)
Expect(err).To(HaveOccurred()) Expect(<-errsChan).To(MatchError(fakes.FakeError))
Expect(err).To(MatchError(fakes.FakeError)) close(done)
}) })
It("delegates chunked logs to transformers", func() { It("emits that no logs were found if no logs returned", func(done Done) {
delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{})
delegator.AddTransformer(&mocks.MockEventTransformer{})
go delegator.DelegateLogs(errsChan, logsFound)
Expect(<-logsFound).To(BeFalse())
close(done)
})
It("delegates chunked logs to transformers", func(done Done) {
fakeTransformer := &mocks.MockEventTransformer{} fakeTransformer := &mocks.MockEventTransformer{}
fakeTransformer.SetTransformerConfig(mocks.FakeTransformerConfig) config := mocks.FakeTransformerConfig
fakeTransformer.SetTransformerConfig(config)
fakeGethLog := types.Log{ fakeGethLog := types.Log{
Address: common.HexToAddress(fakeTransformer.GetConfig().ContractAddresses[0]), Address: common.HexToAddress(config.ContractAddresses[0]),
Topics: []common.Hash{common.HexToHash(fakeTransformer.GetConfig().Topic)}, Topics: []common.Hash{common.HexToHash(config.Topic)},
} }
fakeHeaderSyncLog := core.HeaderSyncLog{Log: fakeGethLog} fakeHeaderSyncLogs := []core.HeaderSyncLog{{Log: fakeGethLog}}
fakeHeaderSyncLogs := []core.HeaderSyncLog{fakeHeaderSyncLog}
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.ReturnLogs = fakeHeaderSyncLogs mockLogRepository.ReturnLogs = fakeHeaderSyncLogs
delegator := newDelegator(mockLogRepository) delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(fakeTransformer) delegator.AddTransformer(fakeTransformer)
err := delegator.DelegateLogs() go delegator.DelegateLogs(errsChan, logsFound)
Expect(err).NotTo(HaveOccurred()) Eventually(func() bool {
Expect(fakeTransformer.ExecuteWasCalled).To(BeTrue()) return fakeTransformer.ExecuteWasCalled
Expect(fakeTransformer.PassedLogs).To(Equal(fakeHeaderSyncLogs)) }).Should(BeTrue())
Eventually(func() []core.HeaderSyncLog {
return fakeTransformer.PassedLogs
}).Should(Equal(fakeHeaderSyncLogs))
close(done)
}) })
It("returns an error if transformer returns an error", func() { It("emits error if transformer returns an error", func(done Done) {
delegator := newDelegator(&fakes.MockHeaderSyncLogRepository{}) mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.ReturnLogs = []core.HeaderSyncLog{{}}
delegator := newDelegator(mockLogRepository)
fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError} fakeTransformer := &mocks.MockEventTransformer{ExecuteError: fakes.FakeError}
delegator.AddTransformer(fakeTransformer) delegator.AddTransformer(fakeTransformer)
err := delegator.DelegateLogs() go delegator.DelegateLogs(errsChan, logsFound)
Expect(err).To(HaveOccurred()) Expect(<-errsChan).To(MatchError(fakes.FakeError))
Expect(err).To(MatchError(fakes.FakeError)) close(done)
})
It("emits logs found when logs returned and delegated", func(done Done) {
fakeTransformer := &mocks.MockEventTransformer{}
config := mocks.FakeTransformerConfig
fakeTransformer.SetTransformerConfig(config)
fakeGethLog := types.Log{
Address: common.HexToAddress(config.ContractAddresses[0]),
Topics: []common.Hash{common.HexToHash(config.Topic)},
}
fakeHeaderSyncLogs := []core.HeaderSyncLog{{Log: fakeGethLog}}
mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.ReturnLogs = fakeHeaderSyncLogs
delegator := newDelegator(mockLogRepository)
delegator.AddTransformer(fakeTransformer)
go delegator.DelegateLogs(errsChan, logsFound)
Expect(<-logsFound).To(BeTrue())
close(done)
}) })
}) })
}) })
func newDelegator(headerSyncLogRepository *fakes.MockHeaderSyncLogRepository) logs.LogDelegator { func newDelegator(headerSyncLogRepository *fakes.MockHeaderSyncLogRepository) *logs.LogDelegator {
return logs.LogDelegator{ return &logs.LogDelegator{
Chunker: chunker.NewLogChunker(), Chunker: chunker.NewLogChunker(),
LogRepository: headerSyncLogRepository, LogRepository: headerSyncLogRepository,
} }

View File

@ -32,7 +32,7 @@ var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the l
type ILogExtractor interface { type ILogExtractor interface {
AddTransformerConfig(config transformer.EventTransformerConfig) AddTransformerConfig(config transformer.EventTransformerConfig)
ExtractLogs(recheckHeaders constants.TransformerExecution) error ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool)
} }
type LogExtractor struct { type LogExtractor struct {
@ -59,46 +59,56 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran
} }
// Fetch and persist watched logs // Fetch and persist watched logs
func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error { func (extractor LogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) {
if len(extractor.Addresses) < 1 { if len(extractor.Addresses) < 1 {
logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error()) logrus.Errorf("error extracting logs: %s", ErrNoWatchedAddresses.Error())
return ErrNoWatchedAddresses errs <- ErrNoWatchedAddresses
return
} }
missingHeaders, missingHeadersErr := extractor.CheckedHeadersRepository.MissingHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders)) missingHeaders, missingHeadersErr := extractor.CheckedHeadersRepository.MissingHeaders(*extractor.StartingBlock, -1, getCheckCount(recheckHeaders))
if missingHeadersErr != nil { if missingHeadersErr != nil {
logrus.Errorf("error fetching missing headers: %s", missingHeadersErr) logrus.Errorf("error fetching missing headers: %s", missingHeadersErr)
return missingHeadersErr errs <- missingHeadersErr
return
}
if len(missingHeaders) < 1 {
missingHeadersFound <- false
return
} }
for _, header := range missingHeaders { for _, header := range missingHeaders {
logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header) logs, fetchLogsErr := extractor.Fetcher.FetchLogs(extractor.Addresses, extractor.Topics, header)
if fetchLogsErr != nil { if fetchLogsErr != nil {
logError("error fetching logs for header: %s", fetchLogsErr, header) logError("error fetching logs for header: %s", fetchLogsErr, header)
return fetchLogsErr errs <- fetchLogsErr
return
} }
if len(logs) > 0 { if len(logs) > 0 {
transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs) transactionsSyncErr := extractor.Syncer.SyncTransactions(header.Id, logs)
if transactionsSyncErr != nil { if transactionsSyncErr != nil {
logError("error syncing transactions: %s", transactionsSyncErr, header) logError("error syncing transactions: %s", transactionsSyncErr, header)
return transactionsSyncErr errs <- transactionsSyncErr
return
} }
createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs) createLogsErr := extractor.LogRepository.CreateHeaderSyncLogs(header.Id, logs)
if createLogsErr != nil { if createLogsErr != nil {
logError("error persisting logs: %s", createLogsErr, header) logError("error persisting logs: %s", createLogsErr, header)
return createLogsErr errs <- createLogsErr
return
} }
} }
markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id) markHeaderCheckedErr := extractor.CheckedHeadersRepository.MarkHeaderChecked(header.Id)
if markHeaderCheckedErr != nil { if markHeaderCheckedErr != nil {
logError("error marking header checked: %s", markHeaderCheckedErr, header) logError("error marking header checked: %s", markHeaderCheckedErr, header)
return markHeaderCheckedErr errs <- markHeaderCheckedErr
} }
} }
return nil missingHeadersFound <- true
} }
func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool {

View File

@ -81,201 +81,270 @@ var _ = Describe("Log extractor", func() {
}) })
Describe("ExtractLogs", func() { Describe("ExtractLogs", func() {
It("returns error if no watched addresses configured", func() { var (
err := extractor.ExtractLogs(constants.HeaderMissing) errsChan chan error
missingHeadersFound chan bool
)
Expect(err).To(HaveOccurred()) BeforeEach(func() {
Expect(err).To(MatchError(logs.ErrNoWatchedAddresses)) errsChan = make(chan error)
missingHeadersFound = make(chan bool)
})
It("returns error if no watched addresses configured", func(done Done) {
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(<-errsChan).To(MatchError(logs.ErrNoWatchedAddresses))
close(done)
}) })
Describe("when checking missing headers", func() { Describe("when checking missing headers", func() {
It("gets missing headers since configured starting block with check count < 1", func() { It("gets missing headers since configured starting block with check_count < 1", func(done Done) {
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
startingBlockNumber := rand.Int63() startingBlockNumber := rand.Int63()
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).NotTo(HaveOccurred()) Eventually(func() int64 {
Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber)) return mockCheckedHeadersRepository.StartingBlockNumber
Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1))) }).Should(Equal(startingBlockNumber))
Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(int64(1))) Eventually(func() int64 {
return mockCheckedHeadersRepository.EndingBlockNumber
}).Should(Equal(int64(-1)))
Eventually(func() int64 {
return mockCheckedHeadersRepository.CheckCount
}).Should(Equal(int64(1)))
close(done)
}) })
}) })
Describe("when rechecking headers", func() { Describe("when rechecking headers", func() {
It("gets missing headers since configured starting block with check count < 1", func() { It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func(done Done) {
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
startingBlockNumber := rand.Int63() startingBlockNumber := rand.Int63()
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
err := extractor.ExtractLogs(constants.HeaderRecheck) go extractor.ExtractLogs(constants.HeaderRecheck, errsChan, missingHeadersFound)
Expect(err).NotTo(HaveOccurred()) Eventually(func() int64 {
Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber)) return mockCheckedHeadersRepository.StartingBlockNumber
Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1))) }).Should(Equal(startingBlockNumber))
Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(constants.RecheckHeaderCap)) Eventually(func() int64 {
return mockCheckedHeadersRepository.EndingBlockNumber
}).Should(Equal(int64(-1)))
Eventually(func() int64 {
return mockCheckedHeadersRepository.CheckCount
}).Should(Equal(constants.RecheckHeaderCap))
close(done)
}) })
}) })
It("returns error if getting missing headers fails", func() { It("emits error if getting missing headers fails", func(done Done) {
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.MissingHeadersReturnError = fakes.FakeError mockCheckedHeadersRepository.MissingHeadersReturnError = fakes.FakeError
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).To(HaveOccurred()) Expect(<-errsChan).To(MatchError(fakes.FakeError))
Expect(err).To(MatchError(fakes.FakeError)) close(done)
}) })
It("does not fetch logs if no missing headers", func() { Describe("when no missing headers", func() {
addTransformerConfig(extractor) It("does not fetch logs", func(done Done) {
mockLogFetcher := &mocks.MockLogFetcher{} addTransformerConfig(extractor)
extractor.Fetcher = mockLogFetcher mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).NotTo(HaveOccurred()) Consistently(func() bool {
Expect(mockLogFetcher.FetchCalled).To(BeFalse()) return mockLogFetcher.FetchCalled
}).Should(BeFalse())
close(done)
})
It("emits that no missing headers were found", func(done Done) {
addTransformerConfig(extractor)
mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(<-missingHeadersFound).To(BeFalse())
close(done)
})
}) })
It("fetches logs for missing headers", func() { Describe("when there are missing headers", func() {
addMissingHeader(extractor) It("fetches logs for missing headers", func(done Done) {
config := transformer.EventTransformerConfig{ addMissingHeader(extractor)
ContractAddresses: []string{fakes.FakeAddress.Hex()}, config := transformer.EventTransformerConfig{
Topic: fakes.FakeHash.Hex(), ContractAddresses: []string{fakes.FakeAddress.Hex()},
StartingBlockNumber: rand.Int63(), Topic: fakes.FakeHash.Hex(),
} StartingBlockNumber: rand.Int63(),
extractor.AddTransformerConfig(config) }
mockLogFetcher := &mocks.MockLogFetcher{} extractor.AddTransformerConfig(config)
extractor.Fetcher = mockLogFetcher mockLogFetcher := &mocks.MockLogFetcher{}
extractor.Fetcher = mockLogFetcher
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).NotTo(HaveOccurred()) Eventually(func() bool {
Expect(mockLogFetcher.FetchCalled).To(BeTrue()) return mockLogFetcher.FetchCalled
Expect(mockLogFetcher.Topics).To(Equal([]common.Hash{common.HexToHash(config.Topic)})) }).Should(BeTrue())
Expect(mockLogFetcher.ContractAddresses).To(Equal(transformer.HexStringsToAddresses(config.ContractAddresses))) expectedTopics := []common.Hash{common.HexToHash(config.Topic)}
}) Eventually(func() []common.Hash {
return mockLogFetcher.Topics
}).Should(Equal(expectedTopics))
expectedAddresses := transformer.HexStringsToAddresses(config.ContractAddresses)
Eventually(func() []common.Address {
return mockLogFetcher.ContractAddresses
}).Should(Equal(expectedAddresses))
close(done)
})
It("returns error if fetching logs fails", func() { It("returns error if fetching logs fails", func(done Done) {
addMissingHeader(extractor) addMissingHeader(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockLogFetcher := &mocks.MockLogFetcher{} mockLogFetcher := &mocks.MockLogFetcher{}
mockLogFetcher.ReturnError = fakes.FakeError mockLogFetcher.ReturnError = fakes.FakeError
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).To(HaveOccurred()) Expect(<-errsChan).To(MatchError(fakes.FakeError))
Expect(err).To(MatchError(fakes.FakeError)) close(done)
}) })
It("does not sync transactions if no fetched logs", func() { Describe("when no fetched logs", func() {
addMissingHeader(extractor) It("does not sync transactions", func(done Done) {
addTransformerConfig(extractor) addMissingHeader(extractor)
mockTransactionSyncer := &fakes.MockTransactionSyncer{} addTransformerConfig(extractor)
extractor.Syncer = mockTransactionSyncer mockTransactionSyncer := &fakes.MockTransactionSyncer{}
extractor.Syncer = mockTransactionSyncer
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).NotTo(HaveOccurred()) Consistently(func() bool {
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeFalse()) return mockTransactionSyncer.SyncTransactionsCalled
}) }).Should(BeFalse())
close(done)
})
})
It("syncs transactions for fetched logs", func() { Describe("when there are fetched logs", func() {
addMissingHeader(extractor) It("syncs transactions", func(done Done) {
addFetchedLog(extractor) addMissingHeader(extractor)
addTransformerConfig(extractor) addFetchedLog(extractor)
mockTransactionSyncer := &fakes.MockTransactionSyncer{} addTransformerConfig(extractor)
extractor.Syncer = mockTransactionSyncer mockTransactionSyncer := &fakes.MockTransactionSyncer{}
extractor.Syncer = mockTransactionSyncer
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).NotTo(HaveOccurred()) Eventually(func() bool {
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue()) return mockTransactionSyncer.SyncTransactionsCalled
}) }).Should(BeTrue())
close(done)
})
It("returns error if syncing transactions fails", func() { It("returns error if syncing transactions fails", func(done Done) {
addMissingHeader(extractor) addMissingHeader(extractor)
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockTransactionSyncer := &fakes.MockTransactionSyncer{} mockTransactionSyncer := &fakes.MockTransactionSyncer{}
mockTransactionSyncer.SyncTransactionsError = fakes.FakeError mockTransactionSyncer.SyncTransactionsError = fakes.FakeError
extractor.Syncer = mockTransactionSyncer extractor.Syncer = mockTransactionSyncer
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).To(HaveOccurred()) Expect(<-errsChan).To(MatchError(fakes.FakeError))
Expect(err).To(MatchError(fakes.FakeError)) close(done)
}) })
It("persists fetched logs", func() { It("persists fetched logs", func(done Done) {
addMissingHeader(extractor) addMissingHeader(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
fakeLogs := []types.Log{{ fakeLogs := []types.Log{{
Address: common.HexToAddress("0xA"), Address: common.HexToAddress("0xA"),
Topics: []common.Hash{common.HexToHash("0xA")}, Topics: []common.Hash{common.HexToHash("0xA")},
Data: []byte{}, Data: []byte{},
Index: 0, Index: 0,
}} }}
mockLogFetcher := &mocks.MockLogFetcher{ReturnLogs: fakeLogs} mockLogFetcher := &mocks.MockLogFetcher{ReturnLogs: fakeLogs}
extractor.Fetcher = mockLogFetcher extractor.Fetcher = mockLogFetcher
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
extractor.LogRepository = mockLogRepository extractor.LogRepository = mockLogRepository
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).NotTo(HaveOccurred()) Eventually(func() []types.Log {
Expect(mockLogRepository.PassedLogs).To(Equal(fakeLogs)) return mockLogRepository.PassedLogs
}) }).Should(Equal(fakeLogs))
close(done)
})
It("returns error if persisting logs fails", func() { It("returns error if persisting logs fails", func(done Done) {
addMissingHeader(extractor) addMissingHeader(extractor)
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockLogRepository := &fakes.MockHeaderSyncLogRepository{} mockLogRepository := &fakes.MockHeaderSyncLogRepository{}
mockLogRepository.CreateError = fakes.FakeError mockLogRepository.CreateError = fakes.FakeError
extractor.LogRepository = mockLogRepository extractor.LogRepository = mockLogRepository
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).To(HaveOccurred()) Expect(<-errsChan).To(MatchError(fakes.FakeError))
Expect(err).To(MatchError(fakes.FakeError)) close(done)
}) })
})
It("marks header checked", func() { It("marks header checked", func(done Done) {
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
headerID := rand.Int63() headerID := rand.Int63()
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).NotTo(HaveOccurred()) Eventually(func() int64 {
Expect(mockCheckedHeadersRepository.HeaderID).To(Equal(headerID)) return mockCheckedHeadersRepository.HeaderID
}) }).Should(Equal(headerID))
close(done)
})
It("returns error if marking header checked fails", func() { It("returns error if marking header checked fails", func(done Done) {
addFetchedLog(extractor) addFetchedLog(extractor)
addTransformerConfig(extractor) addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: rand.Int63()}} mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: rand.Int63()}}
mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
err := extractor.ExtractLogs(constants.HeaderMissing) go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(err).To(HaveOccurred()) Expect(<-errsChan).To(MatchError(fakes.FakeError))
Expect(err).To(MatchError(fakes.FakeError)) close(done)
})
It("emits that missing headers were found", func(done Done) {
addMissingHeader(extractor)
addTransformerConfig(extractor)
go extractor.ExtractLogs(constants.HeaderMissing, errsChan, missingHeadersFound)
Expect(<-missingHeadersFound).To(BeTrue())
close(done)
})
}) })
}) })
}) })

View File

@ -22,15 +22,23 @@ import (
type MockLogDelegator struct { type MockLogDelegator struct {
AddedTransformers []transformer.EventTransformer AddedTransformers []transformer.EventTransformer
DelegateCalled bool DelegateCallCount int
DelegateError error DelegateErrors []error
LogsFound []bool
} }
func (delegator *MockLogDelegator) AddTransformer(t transformer.EventTransformer) { func (delegator *MockLogDelegator) AddTransformer(t transformer.EventTransformer) {
delegator.AddedTransformers = append(delegator.AddedTransformers, t) delegator.AddedTransformers = append(delegator.AddedTransformers, t)
} }
func (delegator *MockLogDelegator) DelegateLogs() error { func (delegator *MockLogDelegator) DelegateLogs(errs chan error, logsFound chan bool) {
delegator.DelegateCalled = true delegator.DelegateCallCount++
return delegator.DelegateError var delegateErrorThisRun error
delegateErrorThisRun, delegator.DelegateErrors = delegator.DelegateErrors[0], delegator.DelegateErrors[1:]
if delegateErrorThisRun != nil {
errs <- delegateErrorThisRun
}
var logsFoundThisRun bool
logsFoundThisRun, delegator.LogsFound = delegator.LogsFound[0], delegator.LogsFound[1:]
logsFound <- logsFoundThisRun
} }

View File

@ -22,16 +22,24 @@ import (
) )
type MockLogExtractor struct { type MockLogExtractor struct {
AddedConfigs []transformer.EventTransformerConfig AddedConfigs []transformer.EventTransformerConfig
ExtractLogsCalled bool ExtractLogsCount int
ExtractLogsError error ExtractLogsErrors []error
MissingHeadersExist []bool
} }
func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) { func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) {
extractor.AddedConfigs = append(extractor.AddedConfigs, config) extractor.AddedConfigs = append(extractor.AddedConfigs, config)
} }
func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) error { func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution, errs chan error, missingHeadersFound chan bool) {
extractor.ExtractLogsCalled = true extractor.ExtractLogsCount++
return extractor.ExtractLogsError var errorThisRun error
errorThisRun, extractor.ExtractLogsErrors = extractor.ExtractLogsErrors[0], extractor.ExtractLogsErrors[1:]
if errorThisRun != nil {
errs <- errorThisRun
}
var missingHeadersExist bool
missingHeadersExist, extractor.MissingHeadersExist = extractor.MissingHeadersExist[0], extractor.MissingHeadersExist[1:]
missingHeadersFound <- missingHeadersExist
} }

View File

@ -27,8 +27,11 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"time"
) )
const NoNewDataPause = time.Second * 7
type EventWatcher struct { type EventWatcher struct {
blockChain core.BlockChain blockChain core.BlockChain
db *postgres.DB db *postgres.DB
@ -66,18 +69,62 @@ func (watcher *EventWatcher) AddTransformers(initializers []transformer.EventTra
} }
// Extracts and delegates watched log events. // Extracts and delegates watched log events.
func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecution) error { func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecution, errsChan chan error) {
extractErr := watcher.LogExtractor.ExtractLogs(recheckHeaders) extractErrsChan := make(chan error)
if extractErr != nil { delegateErrsChan := make(chan error)
logrus.Errorf("error extracting logs in event watcher: %s", extractErr.Error())
return extractErr
}
delegateErr := watcher.LogDelegator.DelegateLogs() go watcher.extractLogs(recheckHeaders, extractErrsChan)
if delegateErr != nil { go watcher.delegateLogs(delegateErrsChan)
logrus.Errorf("error delegating logs in event watcher: %s", delegateErr.Error())
return delegateErr for {
} select {
case extractErr := <-extractErrsChan:
logrus.Errorf("error extracting logs in event watcher: %s", extractErr.Error())
errsChan <- extractErr
case delegateErr := <-delegateErrsChan:
logrus.Errorf("error delegating logs in event watcher: %s", delegateErr.Error())
errsChan <- delegateErr
}
}
}
func (watcher *EventWatcher) extractLogs(recheckHeaders constants.TransformerExecution, errs chan error) {
extractLogsErr := make(chan error)
missingHeadersFound := make(chan bool)
go watcher.LogExtractor.ExtractLogs(recheckHeaders, extractLogsErr, missingHeadersFound)
for {
select {
case err := <-extractLogsErr:
errs <- err
case missingHeaders := <-missingHeadersFound:
if missingHeaders {
go watcher.extractLogs(recheckHeaders, errs)
} else {
time.Sleep(NoNewDataPause)
go watcher.extractLogs(recheckHeaders, errs)
}
}
}
}
func (watcher *EventWatcher) delegateLogs(errs chan error) {
delegateLogsErr := make(chan error)
logsFound := make(chan bool)
go watcher.LogDelegator.DelegateLogs(delegateLogsErr, logsFound)
for {
select {
case err := <-delegateLogsErr:
errs <- err
case logs := <-logsFound:
if logs {
go watcher.delegateLogs(errs)
} else {
time.Sleep(NoNewDataPause)
go watcher.delegateLogs(errs)
}
}
}
return nil
} }

View File

@ -30,13 +30,13 @@ var _ = Describe("Event Watcher", func() {
var ( var (
delegator *mocks.MockLogDelegator delegator *mocks.MockLogDelegator
extractor *mocks.MockLogExtractor extractor *mocks.MockLogExtractor
eventWatcher watcher.EventWatcher eventWatcher *watcher.EventWatcher
) )
BeforeEach(func() { BeforeEach(func() {
delegator = &mocks.MockLogDelegator{} delegator = &mocks.MockLogDelegator{}
extractor = &mocks.MockLogExtractor{} extractor = &mocks.MockLogExtractor{}
eventWatcher = watcher.EventWatcher{ eventWatcher = &watcher.EventWatcher{
LogDelegator: delegator, LogDelegator: delegator,
LogExtractor: extractor, LogExtractor: extractor,
} }
@ -78,36 +78,115 @@ var _ = Describe("Event Watcher", func() {
}) })
Describe("Execute", func() { Describe("Execute", func() {
It("extracts watched logs", func() { var errsChan chan error
err := eventWatcher.Execute(constants.HeaderMissing)
Expect(err).NotTo(HaveOccurred()) BeforeEach(func() {
Expect(extractor.ExtractLogsCalled).To(BeTrue()) errsChan = make(chan error)
}) })
It("returns error if extracting logs fails", func() { It("extracts watched logs", func(done Done) {
extractor.ExtractLogsError = fakes.FakeError delegator.DelegateErrors = []error{nil}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{nil}
extractor.MissingHeadersExist = []bool{false}
err := eventWatcher.Execute(constants.HeaderMissing) go eventWatcher.Execute(constants.HeaderMissing, errsChan)
Expect(err).To(HaveOccurred()) Eventually(func() int {
Expect(err).To(MatchError(fakes.FakeError)) return extractor.ExtractLogsCount
}).Should(Equal(1))
close(done)
}) })
It("delegates untransformed logs", func() { It("returns error if extracting logs fails", func(done Done) {
err := eventWatcher.Execute(constants.HeaderMissing) delegator.DelegateErrors = []error{nil}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{fakes.FakeError}
extractor.MissingHeadersExist = []bool{false}
Expect(err).NotTo(HaveOccurred()) go eventWatcher.Execute(constants.HeaderMissing, errsChan)
Expect(delegator.DelegateCalled).To(BeTrue())
Expect(<-errsChan).To(MatchError(fakes.FakeError))
close(done)
}) })
It("returns error if delegating logs fails", func() { It("extracts watched logs again if missing headers found", func(done Done) {
delegator.DelegateError = fakes.FakeError delegator.DelegateErrors = []error{nil}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{nil, nil}
extractor.MissingHeadersExist = []bool{true, false}
err := eventWatcher.Execute(constants.HeaderMissing) go eventWatcher.Execute(constants.HeaderMissing, errsChan)
Expect(err).To(HaveOccurred()) Eventually(func() int {
Expect(err).To(MatchError(fakes.FakeError)) return extractor.ExtractLogsCount
}).Should(Equal(2))
close(done)
})
It("returns error if extracting logs fails on subsequent run", func(done Done) {
delegator.DelegateErrors = []error{nil}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{nil, fakes.FakeError}
extractor.MissingHeadersExist = []bool{true, false}
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
Expect(<-errsChan).To(MatchError(fakes.FakeError))
close(done)
})
It("delegates untransformed logs", func(done Done) {
delegator.DelegateErrors = []error{nil}
delegator.LogsFound = []bool{false}
extractor.ExtractLogsErrors = []error{nil}
extractor.MissingHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
Eventually(func() int {
return delegator.DelegateCallCount
}).Should(Equal(1))
close(done)
})
It("returns error if delegating logs fails", func(done Done) {
delegator.LogsFound = []bool{false}
delegator.DelegateErrors = []error{fakes.FakeError}
extractor.ExtractLogsErrors = []error{nil}
extractor.MissingHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
Expect(<-errsChan).To(MatchError(fakes.FakeError))
close(done)
})
It("delegates logs again if untransformed logs found", func(done Done) {
delegator.DelegateErrors = []error{nil, nil}
delegator.LogsFound = []bool{true, false}
extractor.ExtractLogsErrors = []error{nil}
extractor.MissingHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
Eventually(func() int {
return delegator.DelegateCallCount
}).Should(Equal(2))
close(done)
})
It("returns error if delegating logs fails on subsequent run", func(done Done) {
delegator.DelegateErrors = []error{nil, fakes.FakeError}
delegator.LogsFound = []bool{true, false}
extractor.ExtractLogsErrors = []error{nil}
extractor.MissingHeadersExist = []bool{false}
go eventWatcher.Execute(constants.HeaderMissing, errsChan)
Expect(<-errsChan).To(MatchError(fakes.FakeError))
close(done)
}) })
}) })
}) })

View File

@ -52,8 +52,7 @@ func (repo CheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingB
LEFT JOIN checked_headers on headers.id = header_id LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR check_count < $2) WHERE (header_id ISNULL OR check_count < $2)
AND headers.block_number >= $1 AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $3 AND headers.eth_node_fingerprint = $3`
LIMIT 100`
err = repo.db.Select(&result, query, startingBlockNumber, checkCount, repo.db.Node.ID) err = repo.db.Select(&result, query, startingBlockNumber, checkCount, repo.db.Node.ID)
} else { } else {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
@ -61,8 +60,7 @@ func (repo CheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingB
WHERE (header_id ISNULL OR check_count < $3) WHERE (header_id ISNULL OR check_count < $3)
AND headers.block_number >= $1 AND headers.block_number >= $1
AND headers.block_number <= $2 AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $4 AND headers.eth_node_fingerprint = $4`
LIMIT 100`
err = repo.db.Select(&result, query, startingBlockNumber, endingBlockNumber, checkCount, repo.db.Node.ID) err = repo.db.Select(&result, query, startingBlockNumber, endingBlockNumber, checkCount, repo.db.Node.ID)
} }

View File

@ -170,18 +170,6 @@ var _ = Describe("Checked Headers repository", func() {
Expect(nodeTwoMissingHeaders[1].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10))) Expect(nodeTwoMissingHeaders[1].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10)))
Expect(nodeTwoMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10))) Expect(nodeTwoMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10)))
}) })
It("only returns 100 results to prevent blocking log delegation", func() {
for n := outOfRangeBlockNumber + 1; n < outOfRangeBlockNumber+100; n++ {
_, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred())
}
missingHeaders, err := repo.MissingHeaders(startingBlockNumber, endingBlockNumber+200, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(100))
})
}) })
Describe("when ending block is -1", func() { Describe("when ending block is -1", func() {
@ -252,19 +240,6 @@ var _ = Describe("Checked Headers repository", func() {
Expect(nodeTwoMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10), Equal(outOfRangeBlockNumber+10))) Expect(nodeTwoMissingHeaders[2].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10), Equal(outOfRangeBlockNumber+10)))
Expect(nodeTwoMissingHeaders[3].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10), Equal(outOfRangeBlockNumber+10))) Expect(nodeTwoMissingHeaders[3].BlockNumber).To(Or(Equal(startingBlockNumber+10), Equal(middleBlockNumber+10), Equal(endingBlockNumber+10), Equal(outOfRangeBlockNumber+10)))
}) })
It("only returns 100 results to prevent blocking log delegation", func() {
for n := outOfRangeBlockNumber + 1; n < outOfRangeBlockNumber+100; n++ {
_, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred())
}
missingHeaders, err := repo.MissingHeaders(startingBlockNumber, endingBlock, uncheckedCheckCount)
Expect(err).NotTo(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(100))
})
}) })
}) })
}) })