(VDB-371) Recheck queued storage

- Iterate through queued storage at defined interval, popping rows
  from the queue if successfully persisted
This commit is contained in:
Rob Mulholand 2019-04-25 12:44:14 -05:00
parent bf4b1687a0
commit 6a86de87b4
8 changed files with 251 additions and 80 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fs"
@ -168,7 +169,9 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
w.Execute()
errs := make(chan error)
rows := make(chan storageUtils.StorageDiffRow)
w.Execute(rows, errs, queueRecheckInterval)
}
}

View File

@ -50,6 +50,7 @@ var (
const (
pollingInterval = 7 * time.Second
queueRecheckInterval = 5 * time.Minute
validationWindow = 15
)

View File

@ -30,39 +30,35 @@ var _ = Describe("Csv Tail Storage Fetcher", func() {
storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer)
})
It("adds error to errors channel if tailing file fails", func() {
It("adds error to errors channel if tailing file fails", func(done Done) {
mockTailer.TailErr = fakes.FakeError
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
close(mockTailer.Lines)
returnedErr := <-errorsChannel
Expect(returnedErr).To(HaveOccurred())
Expect(returnedErr).To(MatchError(fakes.FakeError))
Expect(<-errorsChannel).To(MatchError(fakes.FakeError))
close(done)
})
It("adds parsed csv row to rows channel for storage diff", func() {
It("adds parsed csv row to rows channel for storage diff", func(done Done) {
line := getFakeLine()
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
mockTailer.Lines <- line
close(mockTailer.Lines)
returnedRow := <-rowsChannel
expectedRow, err := utils.FromStrings(strings.Split(line.Text, ","))
Expect(err).NotTo(HaveOccurred())
Expect(expectedRow).To(Equal(returnedRow))
Expect(<-rowsChannel).To(Equal(expectedRow))
close(done)
})
It("adds error to errors channel if parsing csv fails", func() {
It("adds error to errors channel if parsing csv fails", func(done Done) {
line := &tail.Line{Text: "invalid"}
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
mockTailer.Lines <- line
close(mockTailer.Lines)
returnedErr := <-errorsChannel
Expect(returnedErr).To(HaveOccurred())
Expect(<-errorsChannel).To(HaveOccurred())
close(done)
})
})

View File

@ -12,12 +12,12 @@ func NewMockStorageFetcher() *MockStorageFetcher {
}
func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) {
defer close(out)
defer close(errs)
for _, err := range fetcher.ErrsToReturn {
errs <- err
}
for _, row := range fetcher.RowsToReturn {
out <- row
}
close(out)
close(errs)
}

View File

@ -23,19 +23,24 @@ import (
type MockStorageQueue struct {
AddCalled bool
AddError error
PassedRow utils.StorageDiffRow
AddPassedRow utils.StorageDiffRow
DeleteErr error
DeletePassedId int
GetAllErr error
RowsToReturn []utils.StorageDiffRow
}
func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error {
queue.AddCalled = true
queue.PassedRow = row
queue.AddPassedRow = row
return queue.AddError
}
func (queue *MockStorageQueue) Delete(id int) error {
panic("implement me")
queue.DeletePassedId = id
return queue.DeleteErr
}
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiffRow, error) {
panic("implement me")
return queue.RowsToReturn, queue.GetAllErr
}

View File

@ -4,10 +4,10 @@ import (
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/test_config"
)

View File

@ -17,7 +17,9 @@
package watcher
import (
"fmt"
"reflect"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus"
@ -47,29 +49,30 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage
}
}
func (watcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
for _, initializer := range initializers {
storageTransformer := initializer(watcher.db)
watcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer
storageTransformer := initializer(storageWatcher.db)
storageWatcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer
}
}
func (watcher StorageWatcher) Execute() error {
rows := make(chan utils.StorageDiffRow)
errs := make(chan error)
go watcher.StorageFetcher.FetchStorageDiffs(rows, errs)
func (storageWatcher StorageWatcher) Execute(rows chan utils.StorageDiffRow, errs chan error, queueRecheckInterval time.Duration) {
ticker := time.NewTicker(queueRecheckInterval)
go storageWatcher.StorageFetcher.FetchStorageDiffs(rows, errs)
for {
select {
case fetchErr := <-errs:
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr))
case row := <-rows:
watcher.processRow(row)
case err := <-errs:
return err
storageWatcher.processRow(row)
case <-ticker.C:
storageWatcher.processQueue()
}
}
}
func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) {
storageTransformer, ok := watcher.Transformers[row.Contract]
func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
storageTransformer, ok := storageWatcher.Transformers[row.Contract]
if !ok {
// ignore rows from unwatched contracts
return
@ -77,16 +80,42 @@ func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) {
executeErr := storageTransformer.Execute(row)
if executeErr != nil {
if isKeyNotFound(executeErr) {
queueErr := watcher.Queue.Add(row)
queueErr := storageWatcher.Queue.Add(row)
if queueErr != nil {
logrus.Warn(queueErr.Error())
logrus.Warn(fmt.Sprintf("error queueing storage diff with unrecognized key: %s", queueErr))
}
} else {
logrus.Warn(executeErr.Error())
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
}
}
}
func (storageWatcher StorageWatcher) processQueue() {
rows, fetchErr := storageWatcher.Queue.GetAll()
if fetchErr != nil {
logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr))
}
for _, row := range rows {
storageTransformer, ok := storageWatcher.Transformers[row.Contract]
if !ok {
// delete row from queue if address no longer watched
storageWatcher.deleteRow(row.Id)
continue
}
executeErr := storageTransformer.Execute(row)
if executeErr == nil {
storageWatcher.deleteRow(row.Id)
}
}
}
func (storageWatcher StorageWatcher) deleteRow(id int) {
deleteErr := storageWatcher.Queue.Delete(id)
if deleteErr != nil {
logrus.Warn(fmt.Sprintf("error deleting persisted row from queue: %s", deleteErr))
}
}
func isKeyNotFound(executeErr error) bool {
return reflect.TypeOf(executeErr) == reflect.TypeOf(utils.ErrStorageKeyNotFound{})
}

View File

@ -19,6 +19,7 @@ package watcher_test
import (
"io/ioutil"
"os"
"time"
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
@ -46,49 +47,84 @@ var _ = Describe("Storage Watcher", func() {
Describe("executing watcher", func() {
var (
errs chan error
mockFetcher *mocks.MockStorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
row utils.StorageDiffRow
rows chan utils.StorageDiffRow
storageWatcher watcher.StorageWatcher
)
BeforeEach(func() {
errs = make(chan error)
rows = make(chan utils.StorageDiffRow)
address := common.HexToAddress("0x0123456789abcdef")
mockFetcher = mocks.NewMockStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{Address: address}
row = utils.StorageDiffRow{
Id: 1337,
Contract: address,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
})
It("logs error if fetching storage diffs fails", func(done Done) {
mockFetcher.ErrsToReturn = []error{fakes.FakeError}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(rows, errs, time.Hour)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
Describe("transforming new storage diffs", func() {
BeforeEach(func() {
mockFetcher.RowsToReturn = []utils.StorageDiffRow{row}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for recognized storage row", func() {
err := storageWatcher.Execute()
It("executes transformer for recognized storage row", func(done Done) {
go storageWatcher.Execute(rows, errs, time.Hour)
Expect(err).NotTo(HaveOccurred())
Expect(mockTransformer.PassedRow).To(Equal(row))
Eventually(func() utils.StorageDiffRow {
return mockTransformer.PassedRow
}).Should(Equal(row))
close(done)
})
It("queues row for later processing if row's key not recognized", func() {
It("queues row for later processing if row's key not recognized", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
err := storageWatcher.Execute()
go storageWatcher.Execute(rows, errs, time.Hour)
Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
Expect(mockQueue.PassedRow).To(Equal(row))
Expect(<-errs).To(BeNil())
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() utils.StorageDiffRow {
return mockQueue.AddPassedRow
}).Should(Equal(row))
close(done)
})
It("logs error if queueing row fails", func() {
It("logs error if queueing row fails", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
mockQueue.AddError = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
@ -96,28 +132,129 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
err := storageWatcher.Execute()
go storageWatcher.Execute(rows, errs, time.Hour)
Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error()))
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("logs error if transformer execution fails for reason other than key not found", func() {
It("logs error if transformer execution fails for reason other than key not found", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
err := storageWatcher.Execute()
go storageWatcher.Execute(rows, errs, time.Hour)
Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error()))
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
Describe("transforming queued storage diffs", func() {
BeforeEach(func() {
mockQueue.RowsToReturn = []utils.StorageDiffRow{row}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("logs error if getting queued storage fails", func(done Done) {
mockQueue.GetAllErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("executes transformer for storage row", func(done Done) {
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() utils.StorageDiffRow {
return mockTransformer.PassedRow
}).Should(Equal(row))
close(done)
})
It("deletes row from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(row.Id))
close(done)
})
It("logs error if deleting persisted row fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("deletes obsolete row from queue if contract not recognized", func(done Done) {
obsoleteRow := utils.StorageDiffRow{
Id: row.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow}
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(obsoleteRow.Id))
close(done)
})
It("logs error if deleting obsolete row fails", func(done Done) {
obsoleteRow := utils.StorageDiffRow{
Id: row.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow}
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
})
})