Factor out an IStorageWatcher interface

This commit is contained in:
Elizabeth Engelman 2019-07-30 09:03:03 -05:00
parent 18f47b7b6a
commit 0cc90a1f80
8 changed files with 618 additions and 465 deletions

View File

@ -188,8 +188,7 @@ func composeAndExecute() {
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
payloadChan := make(chan statediff.Payload) payloadChan := make(chan statediff.Payload)
storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan)
sw := watcher.NewStorageWatcher(&storageFetcher, &db) sw := watcher.NewGethStorageWatcher(&storageFetcher, &db)
sw.SetStorageDiffSource("geth")
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(&sw, &wg)
@ -197,7 +196,7 @@ func composeAndExecute() {
log.Debug("fetching storage diffs from csv") log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath} tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db) sw := watcher.NewCsvStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(&sw, &wg)

View File

@ -132,8 +132,7 @@ func execute() {
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
payloadChan := make(chan statediff.Payload) payloadChan := make(chan statediff.Payload)
storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan)
sw := watcher.NewStorageWatcher(&storageFetcher, &db) sw := watcher.NewGethStorageWatcher(&storageFetcher, &db)
sw.SetStorageDiffSource("geth")
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(&sw, &wg)
@ -141,7 +140,7 @@ func execute() {
log.Debug("fetching storage diffs from csv") log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath} tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db) sw := watcher.NewCsvStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(&sw, &wg)
@ -183,7 +182,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
} }
} }
func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
defer wg.Done() defer wg.Done()
// Execute over the StorageTransformerInitializer set using the storage watcher // Execute over the StorageTransformerInitializer set using the storage watcher
LogWithCommand.Info("executing storage transformers") LogWithCommand.Info("executing storage transformers")
@ -191,8 +190,8 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
errs := make(chan error) errs := make(chan error)
rows := make(chan storageUtils.StorageDiffRow) diffs := make(chan storageUtils.StorageDiff)
w.Execute(rows, errs, queueRecheckInterval) w.Execute(diffs, errs, queueRecheckInterval)
} }
} }

View File

@ -0,0 +1,45 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package watcher
import (
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
type CsvStorageWatcher struct {
StorageWatcher
}
func NewCsvStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) CsvStorageWatcher {
queue := storage.NewStorageQueue(db)
transformers := make(map[common.Address]transformer.StorageTransformer)
storageWatcher := StorageWatcher{
db: db,
StorageFetcher: fetcher,
Queue: queue,
Transformers: transformers,
}
storageWatcher.transformerGetter = storageWatcher.getCsvTransformer
return CsvStorageWatcher{StorageWatcher: storageWatcher}
}
func (storageWatcher StorageWatcher) getCsvTransformer(contractAddress common.Address) (transformer.StorageTransformer, bool) {
storageTransformer, ok := storageWatcher.Transformers[contractAddress]
return storageTransformer, ok
}

View File

@ -0,0 +1,228 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher_test
import (
"io/ioutil"
"os"
"time"
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("Csv Storage Watcher", func() {
It("adds transformers", func() {
fakeAddress := common.HexToAddress("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress}
w := watcher.NewCsvStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer))
})
Describe("executing watcher", func() {
var (
errs chan error
mockFetcher *mocks.MockStorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff
diffs chan utils.StorageDiff
storageWatcher watcher.CsvStorageWatcher
address common.Address
)
BeforeEach(func() {
errs = make(chan error)
diffs = make(chan utils.StorageDiff)
address = common.HexToAddress("0x0123456789abcdef")
mockFetcher = mocks.NewMockStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{Address: address}
csvDiff = utils.StorageDiff{
Id: 1337,
Contract: address,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
})
It("logs error if fetching storage diffs fails", func(done Done) {
mockFetcher.ErrsToReturn = []error{fakes.FakeError}
storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
Describe("transforming new storage diffs from csv", func() {
BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff}
storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for recognized storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff
}).Should(Equal(csvDiff))
close(done)
})
It("queues diff for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
go storageWatcher.Execute(diffs, errs, time.Hour)
Expect(<-errs).To(BeNil())
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() utils.StorageDiff {
return mockQueue.AddPassedDiff
}).Should(Equal(csvDiff))
close(done)
})
It("logs error if queueing diff fails", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
mockQueue.AddError = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
Describe("transforming queued storage diffs", func() {
BeforeEach(func() {
mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff}
storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff
}).Should(Equal(csvDiff))
close(done)
})
It("deletes diff from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(csvDiff.Id))
close(done)
})
It("logs error if deleting persisted diff fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("deletes obsolete diff from queue if contract not recognized", func(done Done) {
obsoleteDiff := utils.StorageDiff{
Id: csvDiff.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(obsoleteDiff.Id))
close(done)
})
It("logs error if deleting obsolete diff fails", func(done Done) {
obsoleteDiff := utils.StorageDiff{
Id: csvDiff.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
})
})

View File

@ -0,0 +1,60 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package watcher
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
type GethStorageWatcher struct {
StorageWatcher
}
func NewGethStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) GethStorageWatcher {
queue := storage.NewStorageQueue(db)
transformers := make(map[common.Address]transformer.StorageTransformer)
keccakAddressTransformers := make(map[common.Address]transformer.StorageTransformer)
storageWatcher := StorageWatcher{
db: db,
StorageFetcher: fetcher,
Queue: queue,
Transformers: transformers,
KeccakAddressTransformers: keccakAddressTransformers,
}
storageWatcher.transformerGetter = storageWatcher.getTransformerForGethWatcher
return GethStorageWatcher{StorageWatcher: storageWatcher}
}
func (storageWatcher StorageWatcher) getTransformerForGethWatcher(contractAddress common.Address) (transformer.StorageTransformer, bool) {
storageTransformer, ok := storageWatcher.KeccakAddressTransformers[contractAddress]
if ok {
return storageTransformer, ok
} else {
for address, transformer := range storageWatcher.Transformers {
keccakOfTransformerAddress := common.BytesToAddress(crypto.Keccak256(address[:]))
if keccakOfTransformerAddress == contractAddress {
storageWatcher.KeccakAddressTransformers[contractAddress] = transformer
return transformer, true
}
}
}
return nil, false
}

View File

@ -0,0 +1,271 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher_test
import (
"github.com/ethereum/go-ethereum/crypto"
"io/ioutil"
"os"
"time"
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("Geth Storage Watcher", func() {
It("adds transformers", func() {
fakeAddress := common.HexToAddress("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress}
w := watcher.NewGethStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer))
})
Describe("executing watcher", func() {
var (
errs chan error
mockFetcher *mocks.MockStorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
gethDiff utils.StorageDiff
diffs chan utils.StorageDiff
storageWatcher watcher.GethStorageWatcher
address common.Address
keccakOfAddress common.Address
)
BeforeEach(func() {
errs = make(chan error)
diffs = make(chan utils.StorageDiff)
address = common.HexToAddress("0x0123456789abcdef")
keccakOfAddress = common.BytesToAddress(crypto.Keccak256(address[:]))
mockFetcher = mocks.NewMockStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{Address: address}
gethDiff = utils.StorageDiff{
Id: 1338,
Contract: keccakOfAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
})
It("logs error if fetching storage diffs fails", func(done Done) {
mockFetcher.ErrsToReturn = []error{fakes.FakeError}
storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
Describe("transforming new storage diffs", func() {
BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{gethDiff}
storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.SetStorageDiffSource("geth")
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for recognized storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff
}).Should(Equal(gethDiff))
close(done)
})
It("queues diff for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
go storageWatcher.Execute(diffs, errs, time.Hour)
Expect(<-errs).To(BeNil())
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() utils.StorageDiff {
return mockQueue.AddPassedDiff
}).Should(Equal(gethDiff))
close(done)
})
It("logs error if queueing diff fails", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
mockQueue.AddError = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("keeps track transformers by the keccak256 hash of their contract address ", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Hour)
m := make(map[common.Address]transformer.StorageTransformer)
m[keccakOfAddress] = mockTransformer
Eventually(func() map[common.Address]transformer.StorageTransformer {
return storageWatcher.KeccakAddressTransformers
}).Should(Equal(m))
close(done)
})
It("gets the transformer from the known keccak address map first", func(done Done) {
anotherAddress := common.HexToAddress("0xafakeaddress")
anotherTransformer := &mocks.MockStorageTransformer{Address: anotherAddress}
keccakOfAnotherAddress := common.BytesToAddress(crypto.Keccak256(anotherAddress[:]))
anotherGethDiff := utils.StorageDiff{
Id: 1338,
Contract: keccakOfAnotherAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
mockFetcher.DiffsToReturn = []utils.StorageDiff{anotherGethDiff}
storageWatcher.KeccakAddressTransformers[keccakOfAnotherAddress] = anotherTransformer
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() utils.StorageDiff {
return anotherTransformer.PassedDiff
}).Should(Equal(anotherGethDiff))
close(done)
})
})
Describe("transforming queued storage diffs", func() {
BeforeEach(func() {
mockQueue.DiffsToReturn = []utils.StorageDiff{gethDiff}
storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.SetStorageDiffSource("geth")
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff
}).Should(Equal(gethDiff))
close(done)
})
It("deletes diff from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(gethDiff.Id))
close(done)
})
It("logs error if deleting persisted diff fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("deletes obsolete diff from queue if contract not recognized", func(done Done) {
obsoleteDiff := utils.StorageDiff{
Id: gethDiff.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(obsoleteDiff.Id))
close(done)
})
It("logs error if deleting obsolete diff fails", func(done Done) {
obsoleteDiff := utils.StorageDiff{
Id: gethDiff.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
})
})

View File

@ -18,7 +18,6 @@ package watcher
import ( import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/crypto"
"reflect" "reflect"
"time" "time"
@ -32,31 +31,18 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
type IStorageWatcher interface {
AddTransformers(initializers []transformer.StorageTransformerInitializer)
Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration)
}
type StorageWatcher struct { type StorageWatcher struct {
db *postgres.DB db *postgres.DB
diffSource string
StorageFetcher fetcher.IStorageFetcher StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue Queue storage.IStorageQueue
Transformers map[common.Address]transformer.StorageTransformer Transformers map[common.Address]transformer.StorageTransformer
KeccakAddressTransformers map[common.Address]transformer.StorageTransformer // keccak hash of an address => transformer KeccakAddressTransformers map[common.Address]transformer.StorageTransformer // keccak hash of an address => transformer
} transformerGetter func(common.Address) (transformer.StorageTransformer, bool)
func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher {
queue := storage.NewStorageQueue(db)
transformers := make(map[common.Address]transformer.StorageTransformer)
keccakAddressTransformers := make(map[common.Address]transformer.StorageTransformer)
return StorageWatcher{
db: db,
diffSource: "csv",
StorageFetcher: fetcher,
Queue: queue,
Transformers: transformers,
KeccakAddressTransformers: keccakAddressTransformers,
}
}
func (storageWatcher *StorageWatcher) SetStorageDiffSource(source string) {
storageWatcher.diffSource = source
} }
func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
@ -82,26 +68,7 @@ func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, e
} }
func (storageWatcher StorageWatcher) getTransformer(contractAddress common.Address) (transformer.StorageTransformer, bool) { func (storageWatcher StorageWatcher) getTransformer(contractAddress common.Address) (transformer.StorageTransformer, bool) {
if storageWatcher.diffSource == "csv" { return storageWatcher.transformerGetter(contractAddress)
storageTransformer, ok := storageWatcher.Transformers[contractAddress]
return storageTransformer, ok
} else if storageWatcher.diffSource == "geth" {
storageTransformer, ok := storageWatcher.KeccakAddressTransformers[contractAddress]
if ok {
return storageTransformer, ok
} else {
for address, transformer := range storageWatcher.Transformers {
keccakOfTransformerAddress := common.BytesToAddress(crypto.Keccak256(address[:]))
if keccakOfTransformerAddress == contractAddress {
storageWatcher.KeccakAddressTransformers[contractAddress] = transformer
return transformer, true
}
}
}
return nil, false
}
return nil, false
} }
func (storageWatcher StorageWatcher) processRow(diff utils.StorageDiff) { func (storageWatcher StorageWatcher) processRow(diff utils.StorageDiff) {

View File

@ -1,416 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher_test
import (
"github.com/ethereum/go-ethereum/crypto"
"io/ioutil"
"os"
"time"
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("Storage Watcher", func() {
It("adds transformers", func() {
fakeAddress := common.HexToAddress("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress}
w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer))
})
Describe("executing watcher", func() {
var (
errs chan error
mockFetcher *mocks.MockStorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff
gethDiff utils.StorageDiff
diffs chan utils.StorageDiff
storageWatcher watcher.StorageWatcher
address common.Address
keccakOfAddress common.Address
)
BeforeEach(func() {
errs = make(chan error)
diffs = make(chan utils.StorageDiff)
address = common.HexToAddress("0x0123456789abcdef")
keccakOfAddress = common.BytesToAddress(crypto.Keccak256(address[:]))
mockFetcher = mocks.NewMockStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{Address: address}
csvDiff = utils.StorageDiff{
Id: 1337,
Contract: address,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
gethDiff = utils.StorageDiff{
Id: 1338,
Contract: keccakOfAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
})
It("logs error if fetching storage diffs fails", func(done Done) {
mockFetcher.ErrsToReturn = []error{fakes.FakeError}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
Describe("transforming new storage diffs from csv", func() {
Describe("where diff source is a csv file", func() {
BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for recognized storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff
}).Should(Equal(csvDiff))
close(done)
})
It("queues diff for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
go storageWatcher.Execute(diffs, errs, time.Hour)
Expect(<-errs).To(BeNil())
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() utils.StorageDiff {
return mockQueue.AddPassedDiff
}).Should(Equal(csvDiff))
close(done)
})
It("logs error if queueing diff fails", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
mockQueue.AddError = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
Describe("where diff source is geth RPC pub sub", func() {
BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{gethDiff}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.SetStorageDiffSource("geth")
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for recognized storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff
}).Should(Equal(gethDiff))
close(done)
})
It("queues diff for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
go storageWatcher.Execute(diffs, errs, time.Hour)
Expect(<-errs).To(BeNil())
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() utils.StorageDiff {
return mockQueue.AddPassedDiff
}).Should(Equal(gethDiff))
close(done)
})
It("logs error if queueing diff fails", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
mockQueue.AddError = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("keeps track transformers by the keccak256 hash of their contract address ", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Hour)
m := make(map[common.Address]transformer.StorageTransformer)
m[keccakOfAddress] = mockTransformer
Eventually(func() map[common.Address]transformer.StorageTransformer {
return storageWatcher.KeccakAddressTransformers
}).Should(Equal(m))
close(done)
})
It("gets the transformer from the known keccak address map first", func(done Done) {
anotherAddress := common.HexToAddress("0xafakeaddress")
anotherTransformer := &mocks.MockStorageTransformer{Address: anotherAddress}
keccakOfAnotherAddress := common.BytesToAddress(crypto.Keccak256(anotherAddress[:]))
anotherGethDiff := utils.StorageDiff{
Id: 1338,
Contract: keccakOfAnotherAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
mockFetcher.DiffsToReturn = []utils.StorageDiff{anotherGethDiff}
storageWatcher.KeccakAddressTransformers[keccakOfAnotherAddress] = anotherTransformer
go storageWatcher.Execute(diffs, errs, time.Hour)
Eventually(func() utils.StorageDiff {
return anotherTransformer.PassedDiff
}).Should(Equal(anotherGethDiff))
close(done)
})
})
})
Describe("transforming queued storage diffs", func() {
Describe("where diff source is a csv file", func() {
BeforeEach(func() {
mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff
}).Should(Equal(csvDiff))
close(done)
})
It("deletes diff from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(csvDiff.Id))
close(done)
})
It("logs error if deleting persisted diff fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("deletes obsolete diff from queue if contract not recognized", func(done Done) {
obsoleteDiff := utils.StorageDiff{
Id: csvDiff.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(obsoleteDiff.Id))
close(done)
})
It("logs error if deleting obsolete diff fails", func(done Done) {
obsoleteDiff := utils.StorageDiff{
Id: csvDiff.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
Describe("where diff source is geth RPC pub sub", func() {
BeforeEach(func() {
mockQueue.DiffsToReturn = []utils.StorageDiff{gethDiff}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.SetStorageDiffSource("geth")
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff
}).Should(Equal(gethDiff))
close(done)
})
It("deletes diff from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(gethDiff.Id))
close(done)
})
It("logs error if deleting persisted diff fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
It("deletes obsolete diff from queue if contract not recognized", func(done Done) {
obsoleteDiff := utils.StorageDiff{
Id: gethDiff.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(obsoleteDiff.Id))
close(done)
})
It("logs error if deleting obsolete diff fails", func(done Done) {
obsoleteDiff := utils.StorageDiff{
Id: gethDiff.Id + 1,
Contract: common.HexToAddress("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
})
})
})