Extract storage diff fetching behind an interface

- Replaces directly reading from a CSV
- Simplifies testing
- Should hopefully make it easier to plug in other sources for storage
  diffs (e.g. differently formatted CSVs, JSON RPC, etc)
This commit is contained in:
Rob Mulholand 2019-04-24 15:05:57 -05:00
parent c93e6adb27
commit 2d684c5aec
15 changed files with 279 additions and 224 deletions

View File

@ -16,16 +16,19 @@
package cmd package cmd
import ( import (
"os"
"plugin"
syn "sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fs" "github.com/vulcanize/vulcanizedb/pkg/fs"
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin" p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers" "github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"
"os"
"plugin"
syn "sync"
) )
// composeAndExecuteCmd represents the composeAndExecute command // composeAndExecuteCmd represents the composeAndExecute command
@ -170,7 +173,8 @@ func composeAndExecute() {
if len(ethStorageInitializers) > 0 { if len(ethStorageInitializers) > 0 {
tailer := fs.FileTailer{Path: storageDiffsPath} tailer := fs.FileTailer{Path: storageDiffsPath}
sw := watcher.NewStorageWatcher(tailer, &db) storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(&sw, &wg)

View File

@ -26,6 +26,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fs" "github.com/vulcanize/vulcanizedb/pkg/fs"
@ -118,7 +119,8 @@ func execute() {
if len(ethStorageInitializers) > 0 { if len(ethStorageInitializers) > 0 {
tailer := fs.FileTailer{Path: storageDiffsPath} tailer := fs.FileTailer{Path: storageDiffsPath}
sw := watcher.NewStorageWatcher(tailer, &db) storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(&sw, &wg)

View File

@ -24,22 +24,22 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
) )
type LogFetcher interface { type ILogFetcher interface {
FetchLogs(contractAddresses []common.Address, topics []common.Hash, missingHeader core.Header) ([]types.Log, error) FetchLogs(contractAddresses []common.Address, topics []common.Hash, missingHeader core.Header) ([]types.Log, error)
} }
type Fetcher struct { type LogFetcher struct {
blockChain core.BlockChain blockChain core.BlockChain
} }
func NewFetcher(blockchain core.BlockChain) *Fetcher { func NewLogFetcher(blockchain core.BlockChain) *LogFetcher {
return &Fetcher{ return &LogFetcher{
blockChain: blockchain, blockChain: blockchain,
} }
} }
// Checks all topic0s, on all addresses, fetching matching logs for the given header // Checks all topic0s, on all addresses, fetching matching logs for the given header
func (fetcher Fetcher) FetchLogs(addresses []common.Address, topic0s []common.Hash, header core.Header) ([]types.Log, error) { func (logFetcher LogFetcher) FetchLogs(addresses []common.Address, topic0s []common.Hash, header core.Header) ([]types.Log, error) {
blockHash := common.HexToHash(header.Hash) blockHash := common.HexToHash(header.Hash)
query := ethereum.FilterQuery{ query := ethereum.FilterQuery{
BlockHash: &blockHash, BlockHash: &blockHash,
@ -48,7 +48,7 @@ func (fetcher Fetcher) FetchLogs(addresses []common.Address, topic0s []common.Ha
Topics: [][]common.Hash{topic0s}, Topics: [][]common.Hash{topic0s},
} }
logs, err := fetcher.blockChain.GetEthLogsWithCustomQuery(query) logs, err := logFetcher.blockChain.GetEthLogsWithCustomQuery(query)
if err != nil { if err != nil {
// TODO review aggregate fetching error handling // TODO review aggregate fetching error handling
return []types.Log{}, err return []types.Log{}, err

View File

@ -22,16 +22,16 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
fetch "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
) )
var _ = Describe("Fetcher", func() { var _ = Describe("LogFetcher", func() {
Describe("FetchLogs", func() { Describe("FetchLogs", func() {
It("fetches logs based on the given query", func() { It("fetches logs based on the given query", func() {
blockChain := fakes.NewMockBlockChain() blockChain := fakes.NewMockBlockChain()
fetcher := fetch.NewFetcher(blockChain) logFetcher := fetcher.NewLogFetcher(blockChain)
header := fakes.FakeHeader header := fakes.FakeHeader
addresses := []common.Address{ addresses := []common.Address{
@ -41,7 +41,7 @@ var _ = Describe("Fetcher", func() {
topicZeros := []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})} topicZeros := []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})}
_, err := fetcher.FetchLogs(addresses, topicZeros, header) _, err := logFetcher.FetchLogs(addresses, topicZeros, header)
address1 := common.HexToAddress("0xfakeAddress") address1 := common.HexToAddress("0xfakeAddress")
address2 := common.HexToAddress("0xanotherFakeAddress") address2 := common.HexToAddress("0xanotherFakeAddress")
@ -59,9 +59,9 @@ var _ = Describe("Fetcher", func() {
It("returns an error if fetching the logs fails", func() { It("returns an error if fetching the logs fails", func() {
blockChain := fakes.NewMockBlockChain() blockChain := fakes.NewMockBlockChain()
blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError) blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError)
fetcher := fetch.NewFetcher(blockChain) logFetcher := fetcher.NewLogFetcher(blockChain)
_, err := fetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{}) _, err := logFetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{})
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))

View File

@ -0,0 +1,33 @@
package fetcher
import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"strings"
)
type IStorageFetcher interface {
FetchStorageDiffs(chan<- utils.StorageDiffRow, chan<- error)
}
type CsvTailStorageFetcher struct {
tailer fs.Tailer
}
func NewCsvTailStorageFetcher(tailer fs.Tailer) CsvTailStorageFetcher {
return CsvTailStorageFetcher{tailer: tailer}
}
func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) {
t, tailErr := storageFetcher.tailer.Tail()
if tailErr != nil {
errs <- tailErr
}
for line := range t.Lines {
row, parseErr := utils.FromStrings(strings.Split(line.Text, ","))
if parseErr != nil {
errs <- parseErr
}
out <- row
}
}

View File

@ -0,0 +1,81 @@
package fetcher_test
import (
"fmt"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/hpcloud/tail"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
)
var _ = Describe("Csv Tail Storage Fetcher", func() {
var (
errorsChannel chan error
mockTailer *fakes.MockTailer
rowsChannel chan utils.StorageDiffRow
storageFetcher fetcher.CsvTailStorageFetcher
)
BeforeEach(func() {
errorsChannel = make(chan error)
rowsChannel = make(chan utils.StorageDiffRow)
mockTailer = fakes.NewMockTailer()
storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer)
})
It("adds error to errors channel if tailing file fails", func() {
mockTailer.TailErr = fakes.FakeError
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
close(mockTailer.Lines)
returnedErr := <-errorsChannel
Expect(returnedErr).To(HaveOccurred())
Expect(returnedErr).To(MatchError(fakes.FakeError))
})
It("adds parsed csv row to rows channel for storage diff", func() {
line := getFakeLine()
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
mockTailer.Lines <- line
close(mockTailer.Lines)
returnedRow := <-rowsChannel
expectedRow, err := utils.FromStrings(strings.Split(line.Text, ","))
Expect(err).NotTo(HaveOccurred())
Expect(expectedRow).To(Equal(returnedRow))
})
It("adds error to errors channel if parsing csv fails", func() {
line := &tail.Line{Text: "invalid"}
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
mockTailer.Lines <- line
close(mockTailer.Lines)
returnedErr := <-errorsChannel
Expect(returnedErr).To(HaveOccurred())
})
})
func getFakeLine() *tail.Line {
address := common.HexToAddress("0x1234567890abcdef")
blockHash := []byte{4, 5, 6}
blockHeight := int64(789)
storageKey := []byte{9, 8, 7}
storageValue := []byte{6, 5, 4}
return &tail.Line{
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address.Bytes()), common.Bytes2Hex(blockHash),
blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
Time: time.Time{},
Err: nil,
}
}

View File

@ -53,6 +53,6 @@ func (converter *MockConverter) SetToEntityConverterError(err error) {
converter.entityConverterError = err converter.entityConverterError = err
} }
func (c *MockConverter) SetToModelConverterError(err error) { func (converter *MockConverter) SetToModelConverterError(err error) {
c.modelConverterError = err converter.modelConverterError = err
} }

View File

@ -0,0 +1,23 @@
package mocks
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
type MockStorageFetcher struct {
RowsToReturn []utils.StorageDiffRow
ErrsToReturn []error
}
func NewMockStorageFetcher() *MockStorageFetcher {
return &MockStorageFetcher{}
}
func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) {
for _, err := range fetcher.ErrsToReturn {
errs <- err
}
for _, row := range fetcher.RowsToReturn {
out <- row
}
close(out)
close(errs)
}

View File

@ -23,9 +23,11 @@ import (
type MockStorageQueue struct { type MockStorageQueue struct {
AddCalled bool AddCalled bool
AddError error AddError error
PassedRow utils.StorageDiffRow
} }
func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error { func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error {
queue.AddCalled = true queue.AddCalled = true
queue.PassedRow = row
return queue.AddError return queue.AddError
} }

View File

@ -54,10 +54,10 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error {
watcher.Transformers = append(watcher.Transformers, t) watcher.Transformers = append(watcher.Transformers, t)
} }
for _, transformer := range watcher.Transformers { for _, contractTransformer := range watcher.Transformers {
err := transformer.Init() err := contractTransformer.Init()
if err != nil { if err != nil {
log.Print("Unable to initialize transformer:", transformer.GetConfig().Name, err) log.Print("Unable to initialize transformer:", contractTransformer.GetConfig().Name, err)
return err return err
} }
} }
@ -65,10 +65,10 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error {
} }
func (watcher *ContractWatcher) Execute() error { func (watcher *ContractWatcher) Execute() error {
for _, transformer := range watcher.Transformers { for _, contractTransformer := range watcher.Transformers {
err := transformer.Execute() err := contractTransformer.Execute()
if err != nil { if err != nil {
log.Error("Unable to execute transformer:", transformer.GetConfig().Name, err) log.Error("Unable to execute transformer:", contractTransformer.GetConfig().Name, err)
return err return err
} }
} }

View File

@ -37,7 +37,7 @@ type EventWatcher struct {
Transformers []transformer.EventTransformer Transformers []transformer.EventTransformer
BlockChain core.BlockChain BlockChain core.BlockChain
DB *postgres.DB DB *postgres.DB
Fetcher fetcher.LogFetcher Fetcher fetcher.ILogFetcher
Chunker chunker.Chunker Chunker chunker.Chunker
Addresses []common.Address Addresses []common.Address
Topics []common.Hash Topics []common.Hash
@ -47,7 +47,7 @@ type EventWatcher struct {
func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher { func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher {
logChunker := chunker.NewLogChunker() logChunker := chunker.NewLogChunker()
logFetcher := fetcher.NewFetcher(bc) logFetcher := fetcher.NewLogFetcher(bc)
transactionSyncer := transactions.NewTransactionsSyncer(db, bc) transactionSyncer := transactions.NewTransactionsSyncer(db, bc)
return EventWatcher{ return EventWatcher{
BlockChain: bc, BlockChain: bc,

View File

@ -18,33 +18,32 @@ package watcher
import ( import (
"reflect" "reflect"
"strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/fs"
) )
type StorageWatcher struct { type StorageWatcher struct {
db *postgres.DB db *postgres.DB
tailer fs.Tailer StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue Queue storage.IStorageQueue
Transformers map[common.Address]transformer.StorageTransformer Transformers map[common.Address]transformer.StorageTransformer
} }
func NewStorageWatcher(tailer fs.Tailer, db *postgres.DB) StorageWatcher { func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher {
transformers := make(map[common.Address]transformer.StorageTransformer) transformers := make(map[common.Address]transformer.StorageTransformer)
queue := storage.NewStorageQueue(db) queue := storage.NewStorageQueue(db)
return StorageWatcher{ return StorageWatcher{
db: db, db: db,
tailer: tailer, StorageFetcher: fetcher,
Queue: queue, Queue: queue,
Transformers: transformers, Transformers: transformers,
} }
} }
@ -56,34 +55,36 @@ func (watcher StorageWatcher) AddTransformers(initializers []transformer.Storage
} }
func (watcher StorageWatcher) Execute() error { func (watcher StorageWatcher) Execute() error {
t, tailErr := watcher.tailer.Tail() rows := make(chan utils.StorageDiffRow)
if tailErr != nil { errs := make(chan error)
return tailErr go watcher.StorageFetcher.FetchStorageDiffs(rows, errs)
for {
select {
case row := <-rows:
watcher.processRow(row)
case err := <-errs:
return err
}
} }
for line := range t.Lines { }
row, parseErr := utils.FromStrings(strings.Split(line.Text, ","))
if parseErr != nil { func (watcher StorageWatcher) processRow(row utils.StorageDiffRow) {
return parseErr storageTransformer, ok := watcher.Transformers[row.Contract]
} if !ok {
storageTransformer, ok := watcher.Transformers[row.Contract] // ignore rows from unwatched contracts
if !ok { return
logrus.Warn(utils.ErrContractNotFound{Contract: row.Contract.Hex()}.Error()) }
continue executeErr := storageTransformer.Execute(row)
} if executeErr != nil {
executeErr := storageTransformer.Execute(row) if isKeyNotFound(executeErr) {
if executeErr != nil { queueErr := watcher.Queue.Add(row)
if isKeyNotFound(executeErr) { if queueErr != nil {
queueErr := watcher.Queue.Add(row) logrus.Warn(queueErr.Error())
if queueErr != nil {
logrus.Warn(queueErr.Error())
}
} else {
logrus.Warn(executeErr.Error())
} }
continue } else {
logrus.Warn(executeErr.Error())
} }
} }
return nil
} }
func isKeyNotFound(executeErr error) bool { func isKeyNotFound(executeErr error) bool {

View File

@ -17,15 +17,10 @@
package watcher_test package watcher_test
import ( import (
"errors"
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"strings"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/hpcloud/tail"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -34,7 +29,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
) )
@ -43,170 +37,87 @@ var _ = Describe("Storage Watcher", func() {
It("adds transformers", func() { It("adds transformers", func() {
fakeAddress := common.HexToAddress("0x12345") fakeAddress := common.HexToAddress("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress}
w := watcher.NewStorageWatcher(&fakes.MockTailer{}, test_config.NewTestDB(core.Node{})) w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer))
}) })
It("reads the tail of the storage diffs file", func() { Describe("executing watcher", func() {
mockTailer := fakes.NewMockTailer() var (
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) mockFetcher *mocks.MockStorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
row utils.StorageDiffRow
storageWatcher watcher.StorageWatcher
)
assert(func(err error) { BeforeEach(func() {
Expect(err).To(BeNil()) address := common.HexToAddress("0x0123456789abcdef")
Expect(mockTailer.TailCalled).To(BeTrue()) mockFetcher = mocks.NewMockStorageFetcher()
}, w, mockTailer, []*tail.Line{}) mockQueue = &mocks.MockStorageQueue{}
}) mockTransformer = &mocks.MockStorageTransformer{Address: address}
row = utils.StorageDiffRow{
Contract: address,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
mockFetcher.RowsToReturn = []utils.StorageDiffRow{row}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("returns error if row parsing fails", func() { It("executes transformer for recognized storage row", func() {
mockTailer := fakes.NewMockTailer() err := storageWatcher.Execute()
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
line := &tail.Line{Text: "oops"}
assert(func(err error) { Expect(err).NotTo(HaveOccurred())
Expect(err).To(HaveOccurred()) Expect(mockTransformer.PassedRow).To(Equal(row))
Expect(err).To(MatchError(utils.ErrRowMalformed{Length: 1})) })
}, w, mockTailer, []*tail.Line{line})
})
It("logs error if no transformer can parse storage row", func() { It("queues row for later processing if row's key not recognized", func() {
mockTailer := fakes.NewMockTailer() mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
address := common.HexToAddress("0x12345")
line := getFakeLine(address.Bytes()) err := storageWatcher.Execute()
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
tempFile, err := ioutil.TempFile("", "log") Expect(err).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name()) Expect(mockQueue.AddCalled).To(BeTrue())
Expect(err).NotTo(HaveOccurred()) Expect(mockQueue.PassedRow).To(Equal(row))
logrus.SetOutput(tempFile) })
It("logs error if queueing row fails", func() {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
mockQueue.AddError = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
err := storageWatcher.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error()))
})
It("logs error if transformer execution fails for reason other than key not found", func() {
mockTransformer.ExecuteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
err := storageWatcher.Execute()
assert(func(err error) {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name()) logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred()) Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(utils.ErrContractNotFound{Contract: address.Hex()}.Error())) Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error()))
}, w, mockTailer, []*tail.Line{line})
})
It("executes transformer with storage row", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address)}
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
assert(func(err error) {
Expect(err).To(BeNil())
expectedRow, err := utils.FromStrings(strings.Split(line.Text, ","))
Expect(err).NotTo(HaveOccurred())
Expect(fakeTransformer.PassedRow).To(Equal(expectedRow))
}, w, mockTailer, []*tail.Line{line})
})
Describe("when executing transformer fails", func() {
It("queues row when error is storage key not found", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
mockQueue := &mocks.MockStorageQueue{}
w.Queue = mockQueue
keyNotFoundError := utils.ErrStorageKeyNotFound{Key: "unknown_storage_key"}
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError}
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
assert(func(err error) {
Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
}, w, mockTailer, []*tail.Line{line})
})
It("logs error if queuing row fails", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
mockQueue := &mocks.MockStorageQueue{}
mockQueue.AddError = fakes.FakeError
w.Queue = mockQueue
keyNotFoundError := utils.ErrStorageKeyNotFound{Key: "unknown_storage_key"}
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError}
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expect(err).NotTo(HaveOccurred())
logrus.SetOutput(tempFile)
assert(func(err error) {
Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error()))
}, w, mockTailer, []*tail.Line{line})
})
It("logs any other error", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
executionError := errors.New("storage watcher failed attempting to execute transformer")
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError}
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expect(err).NotTo(HaveOccurred())
logrus.SetOutput(tempFile)
assert(func(err error) {
Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(executionError.Error()))
}, w, mockTailer, []*tail.Line{line})
}) })
}) })
}) })
func assert(assertion func(err error), watcher watcher.StorageWatcher, mockTailer *fakes.MockTailer, lines []*tail.Line) {
errs := make(chan error, 1)
done := make(chan bool, 1)
go execute(watcher, errs, done)
for _, line := range lines {
mockTailer.Lines <- line
}
close(mockTailer.Lines)
select {
case err := <-errs:
assertion(err)
break
case <-done:
assertion(nil)
break
}
}
func execute(w watcher.StorageWatcher, errs chan error, done chan bool) {
err := w.Execute()
if err != nil {
errs <- err
} else {
done <- true
}
}
func getFakeLine(address []byte) *tail.Line {
blockHash := []byte{4, 5, 6}
blockHeight := int64(789)
storageKey := []byte{9, 8, 7}
storageValue := []byte{6, 5, 4}
return &tail.Line{
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
Time: time.Time{},
Err: nil,
}
}

View File

@ -6,24 +6,22 @@ import (
) )
type MockTailer struct { type MockTailer struct {
Lines chan *tail.Line Lines chan *tail.Line
TailCalled bool TailErr error
} }
func NewMockTailer() *MockTailer { func NewMockTailer() *MockTailer {
return &MockTailer{ return &MockTailer{
Lines: make(chan *tail.Line, 1), Lines: make(chan *tail.Line, 1),
TailCalled: false,
} }
} }
func (mock *MockTailer) Tail() (*tail.Tail, error) { func (mock *MockTailer) Tail() (*tail.Tail, error) {
mock.TailCalled = true
fakeTail := &tail.Tail{ fakeTail := &tail.Tail{
Filename: "", Filename: "",
Lines: mock.Lines, Lines: mock.Lines,
Config: tail.Config{}, Config: tail.Config{},
Tomb: tomb.Tomb{}, Tomb: tomb.Tomb{},
} }
return fakeTail, nil return fakeTail, mock.TailErr
} }