Handle different contract address format for geth vs csv

This commit is contained in:
Elizabeth Engelman 2019-07-10 14:34:14 -05:00
parent ee244ac6f5
commit 8c4a4d6587
2 changed files with 280 additions and 123 deletions

View File

@ -18,6 +18,7 @@ package watcher
import ( import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/crypto"
"reflect" "reflect"
"time" "time"
@ -33,6 +34,7 @@ import (
type StorageWatcher struct { type StorageWatcher struct {
db *postgres.DB db *postgres.DB
diffSource string
StorageFetcher fetcher.IStorageFetcher StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue Queue storage.IStorageQueue
Transformers map[common.Address]transformer.StorageTransformer Transformers map[common.Address]transformer.StorageTransformer
@ -43,12 +45,17 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage
queue := storage.NewStorageQueue(db) queue := storage.NewStorageQueue(db)
return StorageWatcher{ return StorageWatcher{
db: db, db: db,
diffSource: "csv",
StorageFetcher: fetcher, StorageFetcher: fetcher,
Queue: queue, Queue: queue,
Transformers: transformers, Transformers: transformers,
} }
} }
func (storageWatcher *StorageWatcher) SetStorageDiffSource(source string) {
storageWatcher.diffSource = source
}
func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
for _, initializer := range initializers { for _, initializer := range initializers {
storageTransformer := initializer(storageWatcher.db) storageTransformer := initializer(storageWatcher.db)
@ -71,10 +78,29 @@ func (storageWatcher StorageWatcher) Execute(rows chan utils.StorageDiffRow, err
} }
} }
func (storageWatcher StorageWatcher) getTransformer(contractAddress common.Address) (transformer.StorageTransformer, bool) {
if storageWatcher.diffSource == "csv" {
storageTransformer, ok := storageWatcher.Transformers[contractAddress]
return storageTransformer, ok
} else if storageWatcher.diffSource == "geth" {
logrus.Debug("number of transformers", len(storageWatcher.Transformers))
for address, t := range storageWatcher.Transformers {
keccakOfTransformerAddress := common.BytesToAddress(crypto.Keccak256(address[:]))
if keccakOfTransformerAddress == contractAddress {
return t, true
}
}
return nil, false
}
return nil, false
}
func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
storageTransformer, ok := storageWatcher.Transformers[row.Contract] storageTransformer, ok := storageWatcher.getTransformer(row.Contract)
if !ok { if !ok {
// ignore rows from unwatched contracts logrus.Debug("ignoring a row from an unwatched contract")
return return
} }
executeErr := storageTransformer.Execute(row) executeErr := storageTransformer.Execute(row)
@ -93,7 +119,7 @@ func (storageWatcher StorageWatcher) processQueue() {
logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr)) logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr))
} }
for _, row := range rows { for _, row := range rows {
storageTransformer, ok := storageWatcher.Transformers[row.Contract] storageTransformer, ok := storageWatcher.getTransformer(row.Contract)
if !ok { if !ok {
// delete row from queue if address no longer watched // delete row from queue if address no longer watched
storageWatcher.deleteRow(row.Id) storageWatcher.deleteRow(row.Id)

View File

@ -17,6 +17,7 @@
package watcher_test package watcher_test
import ( import (
"github.com/ethereum/go-ethereum/crypto"
"io/ioutil" "io/ioutil"
"os" "os"
"time" "time"
@ -51,7 +52,8 @@ var _ = Describe("Storage Watcher", func() {
mockFetcher *mocks.MockStorageFetcher mockFetcher *mocks.MockStorageFetcher
mockQueue *mocks.MockStorageQueue mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer mockTransformer *mocks.MockStorageTransformer
row utils.StorageDiffRow csvRow utils.StorageDiffRow
gethRow utils.StorageDiffRow
rows chan utils.StorageDiffRow rows chan utils.StorageDiffRow
storageWatcher watcher.StorageWatcher storageWatcher watcher.StorageWatcher
) )
@ -63,7 +65,7 @@ var _ = Describe("Storage Watcher", func() {
mockFetcher = mocks.NewMockStorageFetcher() mockFetcher = mocks.NewMockStorageFetcher()
mockQueue = &mocks.MockStorageQueue{} mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{Address: address} mockTransformer = &mocks.MockStorageTransformer{Address: address}
row = utils.StorageDiffRow{ csvRow = utils.StorageDiffRow{
Id: 1337, Id: 1337,
Contract: address, Contract: address,
BlockHash: common.HexToHash("0xfedcba9876543210"), BlockHash: common.HexToHash("0xfedcba9876543210"),
@ -71,6 +73,14 @@ var _ = Describe("Storage Watcher", func() {
StorageKey: common.HexToHash("0xabcdef1234567890"), StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"), StorageValue: common.HexToHash("0x9876543210abcdef"),
} }
gethRow = utils.StorageDiffRow{
Id: 1338,
Contract: common.BytesToAddress(crypto.Keccak256(address[:])),
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
}) })
It("logs error if fetching storage diffs fails", func(done Done) { It("logs error if fetching storage diffs fails", func(done Done) {
@ -92,9 +102,10 @@ var _ = Describe("Storage Watcher", func() {
close(done) close(done)
}) })
Describe("transforming new storage diffs", func() { Describe("transforming new storage diffs from csv", func() {
Describe("where diff source is a csv file", func() {
BeforeEach(func() { BeforeEach(func() {
mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} mockFetcher.RowsToReturn = []utils.StorageDiffRow{csvRow}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
@ -105,7 +116,7 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() utils.StorageDiffRow { Eventually(func() utils.StorageDiffRow {
return mockTransformer.PassedRow return mockTransformer.PassedRow
}).Should(Equal(row)) }).Should(Equal(csvRow))
close(done) close(done)
}) })
@ -120,7 +131,7 @@ var _ = Describe("Storage Watcher", func() {
}).Should(BeTrue()) }).Should(BeTrue())
Eventually(func() utils.StorageDiffRow { Eventually(func() utils.StorageDiffRow {
return mockQueue.AddPassedRow return mockQueue.AddPassedRow
}).Should(Equal(row)) }).Should(Equal(csvRow))
close(done) close(done)
}) })
@ -145,36 +156,76 @@ var _ = Describe("Storage Watcher", func() {
}) })
}) })
Describe("transforming queued storage diffs", func() { Describe("where diff source is geth RPC pub sub", func() {
BeforeEach(func() { BeforeEach(func() {
mockQueue.RowsToReturn = []utils.StorageDiffRow{row} mockFetcher.RowsToReturn = []utils.StorageDiffRow{gethRow}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.SetStorageDiffSource("geth")
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
}) })
It("logs error if getting queued storage fails", func(done Done) { It("executes transformer for recognized storage row", func(done Done) {
mockQueue.GetAllErr = fakes.FakeError go storageWatcher.Execute(rows, errs, time.Hour)
Eventually(func() utils.StorageDiffRow {
return mockTransformer.PassedRow
}).Should(Equal(gethRow))
close(done)
})
It("queues row for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
go storageWatcher.Execute(rows, errs, time.Hour)
Expect(<-errs).To(BeNil())
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() utils.StorageDiffRow {
return mockQueue.AddPassedRow
}).Should(Equal(gethRow))
close(done)
})
It("logs error if queueing row fails", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
mockQueue.AddError = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log") tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred()) Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(rows, errs, time.Nanosecond) go storageWatcher.Execute(rows, errs, time.Hour)
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() (string, error) { Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name()) logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error())) }).Should(ContainSubstring(fakes.FakeError.Error()))
close(done) close(done)
}) })
})
})
Describe("transforming queued storage diffs", func() {
Describe("where diff source is a csv file", func() {
BeforeEach(func() {
mockQueue.RowsToReturn = []utils.StorageDiffRow{csvRow}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for storage row", func(done Done) { It("executes transformer for storage row", func(done Done) {
go storageWatcher.Execute(rows, errs, time.Nanosecond) go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() utils.StorageDiffRow { Eventually(func() utils.StorageDiffRow {
return mockTransformer.PassedRow return mockTransformer.PassedRow
}).Should(Equal(row)) }).Should(Equal(csvRow))
close(done) close(done)
}) })
@ -183,7 +234,7 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() int { Eventually(func() int {
return mockQueue.DeletePassedId return mockQueue.DeletePassedId
}).Should(Equal(row.Id)) }).Should(Equal(csvRow.Id))
close(done) close(done)
}) })
@ -205,7 +256,7 @@ var _ = Describe("Storage Watcher", func() {
It("deletes obsolete row from queue if contract not recognized", func(done Done) { It("deletes obsolete row from queue if contract not recognized", func(done Done) {
obsoleteRow := utils.StorageDiffRow{ obsoleteRow := utils.StorageDiffRow{
Id: row.Id + 1, Id: csvRow.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"), Contract: common.HexToAddress("0xfedcba9876543210"),
} }
mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow}
@ -220,7 +271,7 @@ var _ = Describe("Storage Watcher", func() {
It("logs error if deleting obsolete row fails", func(done Done) { It("logs error if deleting obsolete row fails", func(done Done) {
obsoleteRow := utils.StorageDiffRow{ obsoleteRow := utils.StorageDiffRow{
Id: row.Id + 1, Id: csvRow.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"), Contract: common.HexToAddress("0xfedcba9876543210"),
} }
mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow}
@ -240,5 +291,85 @@ var _ = Describe("Storage Watcher", func() {
}) })
}) })
Describe("where diff source is geth RPC pub sub", func() {
BeforeEach(func() {
mockQueue.RowsToReturn = []utils.StorageDiffRow{gethRow}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.SetStorageDiffSource("geth")
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for storage row", func(done Done) {
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() utils.StorageDiffRow {
return mockTransformer.PassedRow
}).Should(Equal(gethRow))
close(done)
})
It("deletes row from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(gethRow.Id))
close(done)
})
It("logs error if deleting persisted row fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("deletes obsolete row from queue if contract not recognized", func(done Done) {
obsoleteRow := utils.StorageDiffRow{
Id: gethRow.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow}
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(obsoleteRow.Id))
close(done)
})
It("logs error if deleting obsolete row fails", func(done Done) {
obsoleteRow := utils.StorageDiffRow{
Id: gethRow.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow}
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(rows, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
})
}) })
}) })