review fixes

This commit is contained in:
Ian Norden 2019-11-04 14:50:10 -06:00
parent a834e55b9f
commit db0f024088
15 changed files with 313 additions and 204 deletions

View File

@ -17,13 +17,11 @@
package cmd
import (
"errors"
"fmt"
"plugin"
syn "sync"
"time"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -34,7 +32,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"github.com/vulcanize/vulcanizedb/utils"
)
@ -44,28 +41,22 @@ var executeCmd = &cobra.Command{
Use: "execute",
Short: "executes a precomposed transformer initializer plugin",
Long: `This command needs a config .toml file of form:
[database]
name = "vulcanize_public"
hostname = "localhost"
user = "vulcanize"
password = "vulcanize"
port = 5432
[client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter]
name = "exampleTransformerExporter"
Note: If any of the plugin transformer need additional
configuration variables include them in the .toml file as well
The exporter.name is the name (without extension) of the plugin to be loaded.
The plugin file needs to be located in the /plugins directory and this command assumes
the db migrations remain from when the plugin was composed. Additionally, the plugin
must have been composed by the same version of vulcanizedb or else it will not be compatible.
Specify config location when executing the command:
./vulcanizedb execute --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) {
@ -131,8 +122,8 @@ func execute() {
switch storageDiffsSource {
case "geth":
log.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
wsClient := getWSClient()
stateDiffStreamer := streamer.NewStateDiffStreamer(wsClient)
storageFetcher := fetcher.NewGethRPCStorageFetcher(stateDiffStreamer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
@ -196,22 +187,12 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
}
func backFillStorage(w watcher.IStorageWatcher) {
// configure archival rpc client
// move this all into the storage watcher?
archivalRPCPath := viper.GetString("storageBackFill.rpcPath")
if archivalRPCPath == "" {
LogWithCommand.Fatal(errors.New("storage backfill is turned on but no rpc path is provided"))
}
rawRPCClient, dialErr := rpc.Dial(archivalRPCPath)
if dialErr != nil {
LogWithCommand.Fatal(dialErr)
}
rpcClient := client.NewRPCClient(rawRPCClient, archivalRPCPath)
rpcClient, _ := getClients()
// find min deployment block
minDeploymentBlock := constants.GetMinDeploymentBlock()
stateDiffFetcher := fetcher.NewStateDiffFetcher(rpcClient)
backFiller := storage.NewStorageBackFiller(stateDiffFetcher, uint64(minDeploymentBlock), storage.DefaultMaxBatchSize)
go w.BackFill(backFiller)
backFiller := storage.NewStorageBackFiller(stateDiffFetcher, storage.DefaultMaxBatchSize)
go w.BackFill(minDeploymentBlock, backFiller)
}
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {

View File

@ -17,6 +17,7 @@
package cmd
import (
"errors"
"fmt"
"strings"
"time"
@ -28,6 +29,7 @@ import (
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/eth"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc"
@ -174,3 +176,15 @@ func getClients() (client.RPCClient, *ethclient.Client) {
return rpcClient, ethClient
}
func getWSClient() core.RPCClient {
wsRPCpath := viper.GetString("client.wsPath")
if wsRPCpath == "" {
LogWithCommand.Fatal(errors.New("getWSClient() was called but no ws rpc path is provided"))
}
wsRPCClient, dialErr := rpc.Dial(wsRPCpath)
if dialErr != nil {
LogWithCommand.Fatal(dialErr)
}
return client.NewRPCClient(wsRPCClient, wsRPCpath)
}

View File

@ -100,6 +100,7 @@ The config provides information for composing a set of transformers from externa
[client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
wsPath = "ws://127.0.0.1:8546"
[exporter]
home = "github.com/vulcanize/vulcanizedb"
@ -160,6 +161,7 @@ The config provides information for composing a set of transformers from externa
- don't leave gaps
- transformers with identical migrations/migration paths should share the same rank
- Note: If any of the imported transformers need additional config variables those need to be included as well
- Note: If the storage transformers are processing storage diffs from geth, we need to configure the websocket endpoint `client.wsPath` for them
This information is used to write and build a Go plugin which exports the configured transformers.
These transformers are loaded onto their specified watchers and executed.
@ -204,7 +206,7 @@ To do so, add the following fields to the config file.
```toml
[storageBackFill]
on = false
rpcPath = ""
```
- `on` is set to `true` to turn the backfill process on
- `rpcPath` is the websocket or ipc path to the modified archival geth node that exposes the `StateDiffAt` rpc endpoint we can use to backfill storage diffs
This process uses the regular `client.ipcPath` rpc path, it assumes that it is either an http or ipc path that supports the `StateDiffAt` endpoint.

View File

@ -47,7 +47,7 @@ func GetMinDeploymentBlock() uint64 {
log.Fatalf("No contracts supplied")
}
minBlock := uint64(math.MaxUint64)
for _, c := range contractNames {
for c := range contractNames {
deployed := getDeploymentBlock(c)
if deployed < minBlock {
minBlock = deployed
@ -56,29 +56,19 @@ func GetMinDeploymentBlock() uint64 {
return minBlock
}
func getContractNames() []string {
initConfig()
func getContractNames() map[string]bool {
transformerNames := viper.GetStringSlice("exporter.transformerNames")
contractNames := make([]string, 0)
contractNames := make(map[string]bool)
for _, transformerName := range transformerNames {
configKey := "exporter." + transformerName + ".contracts"
names := viper.GetStringSlice(configKey)
for _, name := range names {
contractNames = appendNoDuplicates(transformerNames, name)
contractNames[name] = true
}
}
return contractNames
}
func appendNoDuplicates(strSlice []string, str string) []string {
for _, strInSlice := range strSlice {
if strInSlice == str {
return strSlice
}
}
return append(strSlice, str)
}
func getDeploymentBlock(contractName string) uint64 {
configKey := "contract." + contractName + ".deployed"
value := viper.GetInt64(configKey)

View File

@ -18,6 +18,7 @@ package fetcher
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
@ -64,10 +65,11 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
accounts := utils.GetAccountsFromDiff(*stateDiff)
logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber))
for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.Key).Hex()))
for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil {
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.Key), "from account with key: ", common.BytesToHash(account.Key))
errs <- formatErr
continue
}

View File

@ -27,7 +27,6 @@ type BackFiller struct {
StorageDiffsToReturn []utils.StorageDiff
BackFillErrs []error
PassedEndingBlock uint64
StartingBlock uint64
}
// SetStorageDiffsToReturn for tests
@ -36,8 +35,8 @@ func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff)
}
// BackFill mock method
func (backFiller *BackFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
if endingBlock < backFiller.StartingBlock {
func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
if endingBlock < startingBlock {
return errors.New("backfill: ending block number needs to be greater than starting block number")
}
backFiller.PassedEndingBlock = endingBlock

View File

@ -24,36 +24,37 @@ import (
type MockStorageQueue struct {
AddCalled bool
AddError error
AddPassedDiffs []utils.StorageDiff
AddPassedDiffs map[int]utils.StorageDiff
DeleteErr error
DeletePassedIds []int
GetAllErr error
DiffsToReturn []utils.StorageDiff
DiffsToReturn map[int]utils.StorageDiff
GetAllCalled bool
}
// Add mock method
func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error {
queue.AddCalled = true
queue.AddPassedDiffs = append(queue.AddPassedDiffs, diff)
if queue.AddPassedDiffs == nil {
queue.AddPassedDiffs = make(map[int]utils.StorageDiff)
}
queue.AddPassedDiffs[diff.Id] = diff
return queue.AddError
}
// Delete mock method
func (queue *MockStorageQueue) Delete(id int) error {
queue.DeletePassedIds = append(queue.DeletePassedIds, id)
diffsToReturn := make([]utils.StorageDiff, 0)
for _, diff := range queue.DiffsToReturn {
if diff.Id != id {
diffsToReturn = append(diffsToReturn, diff)
}
}
queue.DiffsToReturn = diffsToReturn
delete(queue.DiffsToReturn, id)
return queue.DeleteErr
}
// GetAll mock method
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) {
queue.GetAllCalled = true
return queue.DiffsToReturn, queue.GetAllErr
diffs := make([]utils.StorageDiff, 0)
for _, diff := range queue.DiffsToReturn {
diffs = append(diffs, diff)
}
return diffs, queue.GetAllErr
}

View File

@ -28,12 +28,15 @@ import (
type MockStorageTransformer struct {
KeccakOfAddress common.Hash
ExecuteErr error
PassedDiffs []utils.StorageDiff
PassedDiffs map[int]utils.StorageDiff
}
// Execute mock method
func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error {
transformer.PassedDiffs = append(transformer.PassedDiffs, diff)
if transformer.PassedDiffs == nil {
transformer.PassedDiffs = make(map[int]utils.StorageDiff)
}
transformer.PassedDiffs[diff.Id] = diff
return transformer.ExecuteErr
}

View File

@ -17,10 +17,10 @@
package storage
import (
"errors"
"fmt"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
@ -30,13 +30,13 @@ import (
)
const (
DefaultMaxBatchSize uint64 = 1000
DefaultMaxBatchSize uint64 = 100
defaultMaxBatchNumber int64 = 10
)
// BackFiller is the backfilling interface
type BackFiller interface {
BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error
BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error
}
// backFiller is the backfilling struct
@ -47,52 +47,33 @@ type backFiller struct {
}
// NewStorageBackFiller returns a BackFiller
func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, startingBlock, batchSize uint64) BackFiller {
func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, batchSize uint64) BackFiller {
if batchSize == 0 {
batchSize = DefaultMaxBatchSize
}
return &backFiller{
fetcher: fetcher,
batchSize: batchSize,
startingBlock: startingBlock,
}
}
// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
if endingBlock < bf.startingBlock {
return errors.New("backfill: ending block number needs to be greater than starting block number")
}
logrus.Infof("going to fill in gap from %d to %d", bf.startingBlock, endingBlock)
// break the range up into bins of smaller ranges
length := endingBlock - bf.startingBlock + 1
numberOfBins := length / bf.batchSize
remainder := length % bf.batchSize
if remainder != 0 {
numberOfBins++
}
blockRangeBins := make([][]uint64, numberOfBins)
for i := range blockRangeBins {
nextBinStart := bf.startingBlock + uint64(bf.batchSize)
if nextBinStart > endingBlock {
nextBinStart = endingBlock + 1
}
blockRange := make([]uint64, 0, nextBinStart-bf.startingBlock+1)
for j := bf.startingBlock; j < nextBinStart; j++ {
blockRange = append(blockRange, j)
}
bf.startingBlock = nextBinStart
blockRangeBins[i] = blockRange
}
func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock)
// break the range up into bins of smaller ranges
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bf.batchSize)
if err != nil {
return err
}
// int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
var activeCount int64
// channel for processing goroutines to signal when they are done
processingDone := make(chan [2]uint64)
forwardDone := make(chan bool)
// for each block range bin spin up a goroutine to batch fetch and process state diffs for that range
// for each block range bin spin up a goroutine to batch fetch and process state diffs in that range
go func() {
for _, blockHeights := range blockRangeBins {
// if we have reached our limit of active goroutines
@ -101,39 +82,7 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi
// this blocks until a process signals it has finished
<-forwardDone
}
go func(blockHeights []uint64) {
payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights)
if fetchErr != nil {
errChan <- fetchErr
}
for _, payload := range payloads {
stateDiff := new(statediff.StateDiff)
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
if stateDiffDecodeErr != nil {
errChan <- stateDiffDecodeErr
continue
}
accounts := utils.GetAccountsFromDiff(*stateDiff)
for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil {
errChan <- formatErr
continue
}
logrus.Trace("adding storage diff to results",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
backFill <- diff
}
}
}
// when this goroutine is done, send out a signal
processingDone <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]}
}(blockHeights)
go bf.backFillRange(blockHeights, backFill, errChan, processingDone)
}
}()
@ -153,7 +102,7 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi
}
logrus.Infof("finished fetching gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1])
goroutinesFinished++
if goroutinesFinished == int(numberOfBins) {
if goroutinesFinished >= len(blockRangeBins) {
done <- true
return
}
@ -163,3 +112,38 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi
return nil
}
func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.StorageDiff, errChan chan error, doneChan chan [2]uint64) {
payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights)
if fetchErr != nil {
errChan <- fetchErr
}
for _, payload := range payloads {
stateDiff := new(statediff.StateDiff)
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
if stateDiffDecodeErr != nil {
errChan <- stateDiffDecodeErr
continue
}
accounts := utils.GetAccountsFromDiff(*stateDiff)
for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.Key).Hex()))
for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil {
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.Key), "from account with key: ", common.BytesToHash(account.Key))
errChan <- formatErr
continue
}
logrus.Trace("adding storage diff to results",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
diffChan <- diff
}
}
}
// when this is done, send out a signal
doneChan <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]}
}

View File

@ -44,11 +44,12 @@ var _ = Describe("BackFiller", func() {
})
It("batch calls statediff_stateDiffAt", func() {
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 100)
backFiller = storage.NewStorageBackFiller(mockFetcher, 100)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
@ -77,11 +78,12 @@ var _ = Describe("BackFiller", func() {
})
It("has a configurable batch size", func() {
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1)
backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
@ -116,11 +118,12 @@ var _ = Describe("BackFiller", func() {
}
mockFetcher.PayloadsToReturn = payloadsToReturn
// batch size of 2 with 1001 block range => 501 bins
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 2)
backFiller = storage.NewStorageBackFiller(mockFetcher, 2)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber.Uint64()+1000,
backFill,
errChan,
@ -151,11 +154,12 @@ var _ = Describe("BackFiller", func() {
mockFetcher.FetchErrs = map[uint64]error{
test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"),
}
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1)
backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
@ -188,11 +192,12 @@ var _ = Describe("BackFiller", func() {
test_data.BlockNumber2.Uint64(): errors.New("mock fetcher error"),
}
mockFetcher.CalledTimes = 0
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1)
backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill = make(chan utils.StorageDiff)
done = make(chan bool)
errChan = make(chan error)
backFillInitErr = backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
backFill,
errChan,

View File

@ -0,0 +1,45 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package utils
import "errors"
func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) {
if endingBlock < startingBlock {
return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
}
if batchSize == 0 {
return nil, errors.New("backfill: batchsize needs to be greater than zero")
}
length := endingBlock - startingBlock + 1
numberOfBins := length / batchSize
remainder := length % batchSize
if remainder != 0 {
numberOfBins++
}
blockRangeBins := make([][]uint64, numberOfBins)
for i := range blockRangeBins {
nextBinStart := startingBlock + batchSize
blockRange := make([]uint64, 0, nextBinStart-startingBlock+1)
for j := startingBlock; j < nextBinStart && j <= endingBlock; j++ {
blockRange = append(blockRange, j)
}
startingBlock = nextBinStart
blockRangeBins[i] = blockRange
}
return blockRangeBins, nil
}

View File

@ -0,0 +1,66 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package utils_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
var _ = Describe("GetBlockHeightBins", func() {
It("splits a block range up into bins", func() {
var startingBlock uint64 = 1
var endingBlock uint64 = 10101
var batchSize uint64 = 100
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
Expect(err).ToNot(HaveOccurred())
Expect(len(blockRangeBins)).To(Equal(102))
Expect(blockRangeBins[101]).To(Equal([]uint64{10101}))
startingBlock = 101
endingBlock = 10100
batchSize = 100
lastBin := make([]uint64, 0)
for i := 10001; i <= 10100; i++ {
lastBin = append(lastBin, uint64(i))
}
blockRangeBins, err = utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
Expect(err).ToNot(HaveOccurred())
Expect(len(blockRangeBins)).To(Equal(100))
Expect(blockRangeBins[99]).To(Equal(lastBin))
})
It("throws an error if the starting block is higher than the ending block", func() {
var startingBlock uint64 = 10102
var endingBlock uint64 = 10101
var batchSize uint64 = 100
_, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("ending block number needs to be greater than starting block number"))
})
It("throws an error if the batch size is zero", func() {
var startingBlock uint64 = 1
var endingBlock uint64 = 10101
var batchSize uint64 = 0
_, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("batchsize needs to be greater than zero"))
})
})

View File

@ -33,7 +33,7 @@ import (
type IStorageWatcher interface {
AddTransformers(initializers []transformer.StorageTransformerInitializer)
Execute(queueRecheckInterval time.Duration, backFillOn bool)
BackFill(backFiller storage.BackFiller)
BackFill(startingBlock uint64, backFiller storage.BackFiller)
}
type StorageWatcher struct {
@ -44,7 +44,7 @@ type StorageWatcher struct {
DiffsChan chan utils.StorageDiff
ErrsChan chan error
BackFillDoneChan chan bool
StartingSyncBlockChan chan int
StartingSyncBlockChan chan uint64
}
func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher {
@ -55,7 +55,7 @@ func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatch
StorageFetcher: f,
DiffsChan: make(chan utils.StorageDiff, fetcher.PayloadChanBufferSize),
ErrsChan: make(chan error),
StartingSyncBlockChan: make(chan int),
StartingSyncBlockChan: make(chan uint64),
BackFillDoneChan: make(chan bool),
Queue: queue,
KeccakAddressTransformers: transformers,
@ -70,10 +70,10 @@ func (storageWatcher *StorageWatcher) AddTransformers(initializers []transformer
}
// BackFill uses a backFiller to backfill missing storage diffs for the storageWatcher
func (storageWatcher *StorageWatcher) BackFill(backFiller storage.BackFiller) {
func (storageWatcher *StorageWatcher) BackFill(startingBlock uint64, backFiller storage.BackFiller) {
// this blocks until the Execute process sends us the first block number it sees
startingSyncBlock := <-storageWatcher.StartingSyncBlockChan
backFillInitErr := backFiller.BackFill(uint64(startingSyncBlock),
endBackFillBlock := <-storageWatcher.StartingSyncBlockChan
backFillInitErr := backFiller.BackFill(startingBlock, endBackFillBlock,
storageWatcher.DiffsChan, storageWatcher.ErrsChan, storageWatcher.BackFillDoneChan)
if backFillInitErr != nil {
logrus.Warn(backFillInitErr)
@ -91,7 +91,7 @@ func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr.Error()))
case diff := <-storageWatcher.DiffsChan:
if start && backFillOn {
storageWatcher.StartingSyncBlockChan <- diff.BlockHeight - 1
storageWatcher.StartingSyncBlockChan <- uint64(diff.BlockHeight - 1)
start = false
}
storageWatcher.processRow(diff)

View File

@ -20,6 +20,7 @@ import (
"errors"
"io/ioutil"
"os"
"sort"
"time"
"github.com/ethereum/go-ethereum/common"
@ -48,7 +49,6 @@ var _ = Describe("Storage Watcher", func() {
Expect(w.KeccakAddressTransformers[fakeHashedAddress]).To(Equal(fakeTransformer))
})
})
Describe("Execute", func() {
var (
mockFetcher *mocks.StorageFetcher
@ -104,9 +104,11 @@ var _ = Describe("Storage Watcher", func() {
It("executes transformer for recognized storage diff", func(done Done) {
go storageWatcher.Execute(time.Hour, false)
Eventually(func() []utils.StorageDiff {
Eventually(func() map[int]utils.StorageDiff {
return mockTransformer.PassedDiffs
}).Should(Equal([]utils.StorageDiff{csvDiff}))
}).Should(Equal(map[int]utils.StorageDiff{
csvDiff.ID: csvDiff,
}))
close(done)
})
@ -120,7 +122,7 @@ var _ = Describe("Storage Watcher", func() {
}).Should(BeTrue())
Eventually(func() utils.StorageDiff {
if len(mockQueue.AddPassedDiffs) > 0 {
return mockQueue.AddPassedDiffs[0]
return mockQueue.AddPassedDiffs[csvDiff.ID]
}
return utils.StorageDiff{}
}).Should(Equal(csvDiff))
@ -150,7 +152,9 @@ var _ = Describe("Storage Watcher", func() {
Describe("transforming queued storage diffs", func() {
BeforeEach(func() {
mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff}
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
csvDiff.ID: csvDiff,
}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
@ -161,7 +165,7 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() utils.StorageDiff {
if len(mockTransformer.PassedDiffs) > 0 {
return mockTransformer.PassedDiffs[0]
return mockTransformer.PassedDiffs[csvDiff.ID]
}
return utils.StorageDiff{}
}).Should(Equal(csvDiff))
@ -201,7 +205,9 @@ var _ = Describe("Storage Watcher", func() {
ID: csvDiff.ID + 1,
HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
obsoleteDiff.ID: obsoleteDiff,
}
go storageWatcher.Execute(time.Nanosecond, false)
@ -219,7 +225,9 @@ var _ = Describe("Storage Watcher", func() {
ID: csvDiff.ID + 1,
HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"),
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
obsoleteDiff.ID: obsoleteDiff,
}
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
@ -248,11 +256,19 @@ var _ = Describe("Storage Watcher", func() {
csvDiff utils.StorageDiff
storageWatcher *watcher.StorageWatcher
hashedAddress common.Hash
createdDiff, updatedDiff1, deletedDiff, updatedDiff2 utils.StorageDiff
)
BeforeEach(func() {
createdDiff = test_data.CreatedExpectedStorageDiff
createdDiff.ID = 1333
updatedDiff1 = test_data.UpdatedExpectedStorageDiff
updatedDiff1.ID = 1334
deletedDiff = test_data.DeletedExpectedStorageDiff
deletedDiff.ID = 1335
updatedDiff2 = test_data.UpdatedExpectedStorageDiff2
updatedDiff2.ID = 1336
mockBackFiller = new(mocks.BackFiller)
mockBackFiller.StartingBlock = test_data.BlockNumber.Uint64()
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
@ -269,16 +285,16 @@ var _ = Describe("Storage Watcher", func() {
}
})
Describe("transforming streamed and backfilled queued storage diffs", func() {
Describe("transforming streamed and backfilled storage diffs", func() {
BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff}
mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.DeletedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
createdDiff,
updatedDiff1,
deletedDiff,
updatedDiff2,
})
mockQueue.DiffsToReturn = []utils.StorageDiff{}
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{
@ -289,7 +305,7 @@ var _ = Describe("Storage Watcher", func() {
})
It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) {
go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() int {
@ -303,18 +319,18 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() int {
return len(mockTransformer3.PassedDiffs)
}).Should(Equal(3))
Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff))
Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done)
})
It("adds diffs to the queue if transformation fails", func(done Done) {
mockTransformer3.ExecuteErr = fakes.FakeError
go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() int {
@ -330,22 +346,23 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() []utils.StorageDiff {
Eventually(func() map[int]utils.StorageDiff {
if len(mockQueue.AddPassedDiffs) > 2 {
return mockQueue.AddPassedDiffs
}
return []utils.StorageDiff{}
}).Should(Equal([]utils.StorageDiff{
test_data.UpdatedExpectedStorageDiff,
test_data.DeletedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
return map[int]utils.StorageDiff{}
}).Should(Equal(map[int]utils.StorageDiff{
updatedDiff1.ID: updatedDiff1,
deletedDiff.ID: deletedDiff,
updatedDiff2.ID: updatedDiff2,
}))
Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2))
Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done)
})
@ -362,7 +379,7 @@ var _ = Describe("Storage Watcher", func() {
errors.New("mock backfiller error"),
}
go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() int {
@ -387,7 +404,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() (string, error) {
@ -398,17 +415,14 @@ var _ = Describe("Storage Watcher", func() {
})
})
Describe("transforms storage diffs", func() {
Describe("transforms queued storage diffs", func() {
BeforeEach(func() {
test_data.CreatedExpectedStorageDiff.ID = 1334
test_data.UpdatedExpectedStorageDiff.ID = 1335
test_data.DeletedExpectedStorageDiff.ID = 1336
test_data.UpdatedExpectedStorageDiff2.ID = 1337
mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff,
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.DeletedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
csvDiff.ID: csvDiff,
createdDiff.ID: createdDiff,
updatedDiff1.ID: updatedDiff1,
deletedDiff.ID: deletedDiff,
updatedDiff2.ID: updatedDiff2,
}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
@ -420,7 +434,7 @@ var _ = Describe("Storage Watcher", func() {
})
It("executes transformers on queued storage diffs", func(done Done) {
go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Nanosecond, true)
Eventually(func() int {
@ -435,26 +449,29 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() bool {
return mockQueue.GetAllCalled
}).Should(BeTrue())
sortedExpectedIDs := []int{
csvDiff.ID,
createdDiff.ID,
updatedDiff1.ID,
deletedDiff.ID,
updatedDiff2.ID,
}
sort.Ints(sortedExpectedIDs)
Eventually(func() []int {
if len(mockQueue.DeletePassedIds) > 4 {
sort.Ints(mockQueue.DeletePassedIds)
return mockQueue.DeletePassedIds
}
return []int{}
}).Should(Equal([]int{
csvDiff.ID,
test_data.CreatedExpectedStorageDiff.ID,
test_data.UpdatedExpectedStorageDiff.ID,
test_data.DeletedExpectedStorageDiff.ID,
test_data.UpdatedExpectedStorageDiff2.ID,
}))
}).Should(Equal(sortedExpectedIDs))
Expect(mockQueue.AddCalled).To(Not(BeTrue()))
Expect(len(mockQueue.DiffsToReturn)).To(Equal(0))
Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done)
})
})

View File

@ -25,5 +25,5 @@ import (
func TestGeth(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "eth Suite")
RunSpecs(t, "Eth Suite")
}