diff --git a/cmd/execute.go b/cmd/execute.go index 27fdf88f..e23cfcbc 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -17,13 +17,11 @@ package cmd import ( - "errors" "fmt" "plugin" syn "sync" "time" - "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -34,7 +32,6 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" - "github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/fs" "github.com/vulcanize/vulcanizedb/utils" ) @@ -44,28 +41,22 @@ var executeCmd = &cobra.Command{ Use: "execute", Short: "executes a precomposed transformer initializer plugin", Long: `This command needs a config .toml file of form: - [database] name = "vulcanize_public" hostname = "localhost" user = "vulcanize" password = "vulcanize" port = 5432 - [client] ipcPath = "/Users/user/Library/Ethereum/geth.ipc" - [exporter] name = "exampleTransformerExporter" - Note: If any of the plugin transformer need additional configuration variables include them in the .toml file as well - The exporter.name is the name (without extension) of the plugin to be loaded. The plugin file needs to be located in the /plugins directory and this command assumes the db migrations remain from when the plugin was composed. Additionally, the plugin must have been composed by the same version of vulcanizedb or else it will not be compatible. 
- Specify config location when executing the command: ./vulcanizedb execute --config=./environments/config_name.toml`, Run: func(cmd *cobra.Command, args []string) { @@ -131,8 +122,8 @@ func execute() { switch storageDiffsSource { case "geth": log.Debug("fetching storage diffs from geth pub sub") - rpcClient, _ := getClients() - stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) + wsClient := getWSClient() + stateDiffStreamer := streamer.NewStateDiffStreamer(wsClient) storageFetcher := fetcher.NewGethRPCStorageFetcher(stateDiffStreamer) sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) @@ -196,22 +187,12 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) { } func backFillStorage(w watcher.IStorageWatcher) { - // configure archival rpc client - // move this all into the storage watcher? - archivalRPCPath := viper.GetString("storageBackFill.rpcPath") - if archivalRPCPath == "" { - LogWithCommand.Fatal(errors.New("storage backfill is turned on but no rpc path is provided")) - } - rawRPCClient, dialErr := rpc.Dial(archivalRPCPath) - if dialErr != nil { - LogWithCommand.Fatal(dialErr) - } - rpcClient := client.NewRPCClient(rawRPCClient, archivalRPCPath) + rpcClient, _ := getClients() // find min deployment block minDeploymentBlock := constants.GetMinDeploymentBlock() stateDiffFetcher := fetcher.NewStateDiffFetcher(rpcClient) - backFiller := storage.NewStorageBackFiller(stateDiffFetcher, uint64(minDeploymentBlock), storage.DefaultMaxBatchSize) - go w.BackFill(backFiller) + backFiller := storage.NewStorageBackFiller(stateDiffFetcher, storage.DefaultMaxBatchSize) + go w.BackFill(minDeploymentBlock, backFiller) } func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) { diff --git a/cmd/root.go b/cmd/root.go index cd8e4fd1..6b45ea52 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -17,6 +17,7 @@ package cmd import ( + "errors" "fmt" "strings" "time" @@ -28,6 +29,7 @@ import ( 
"github.com/spf13/viper" "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/eth" "github.com/vulcanize/vulcanizedb/pkg/eth/client" vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc" @@ -174,3 +176,15 @@ func getClients() (client.RPCClient, *ethclient.Client) { return rpcClient, ethClient } + +func getWSClient() core.RPCClient { + wsRPCpath := viper.GetString("client.wsPath") + if wsRPCpath == "" { + LogWithCommand.Fatal(errors.New("getWSClient() was called but no ws rpc path is provided")) + } + wsRPCClient, dialErr := rpc.Dial(wsRPCpath) + if dialErr != nil { + LogWithCommand.Fatal(dialErr) + } + return client.NewRPCClient(wsRPCClient, wsRPCpath) +} diff --git a/documentation/custom-transformers.md b/documentation/custom-transformers.md index 6e348646..2fe2a9f7 100644 --- a/documentation/custom-transformers.md +++ b/documentation/custom-transformers.md @@ -100,6 +100,7 @@ The config provides information for composing a set of transformers from externa [client] ipcPath = "/Users/user/Library/Ethereum/geth.ipc" + wsPath = "ws://127.0.0.1:8546" [exporter] home = "github.com/vulcanize/vulcanizedb" @@ -160,6 +161,7 @@ The config provides information for composing a set of transformers from externa - don't leave gaps - transformers with identical migrations/migration paths should share the same rank - Note: If any of the imported transformers need additional config variables those need to be included as well +- Note: If the storage transformers are processing storage diffs from geth, we need to configure the websocket endpoint `client.wsPath` for them This information is used to write and build a Go plugin which exports the configured transformers. These transformers are loaded onto their specified watchers and executed. @@ -204,7 +206,7 @@ To do so, add the following fields to the config file. 
```toml [storageBackFill] on = false - rpcPath = "" ``` - `on` is set to `true` to turn the backfill process on -- `rpcPath` is the websocket or ipc path to the modified archival geth node that exposes the `StateDiffAt` rpc endpoint we can use to backfill storage diffs \ No newline at end of file + +This process uses the regular `client.ipcPath` rpc path, it assumes that it is either an http or ipc path that supports the `StateDiffAt` endpoint. \ No newline at end of file diff --git a/libraries/shared/constants/external.go b/libraries/shared/constants/external.go index c53c4629..156d13c7 100644 --- a/libraries/shared/constants/external.go +++ b/libraries/shared/constants/external.go @@ -47,7 +47,7 @@ func GetMinDeploymentBlock() uint64 { log.Fatalf("No contracts supplied") } minBlock := uint64(math.MaxUint64) - for _, c := range contractNames { + for c := range contractNames { deployed := getDeploymentBlock(c) if deployed < minBlock { minBlock = deployed @@ -56,29 +56,19 @@ func GetMinDeploymentBlock() uint64 { return minBlock } -func getContractNames() []string { - initConfig() +func getContractNames() map[string]bool { transformerNames := viper.GetStringSlice("exporter.transformerNames") - contractNames := make([]string, 0) + contractNames := make(map[string]bool) for _, transformerName := range transformerNames { configKey := "exporter." + transformerName + ".contracts" names := viper.GetStringSlice(configKey) for _, name := range names { - contractNames = appendNoDuplicates(transformerNames, name) + contractNames[name] = true } } return contractNames } -func appendNoDuplicates(strSlice []string, str string) []string { - for _, strInSlice := range strSlice { - if strInSlice == str { - return strSlice - } - } - return append(strSlice, str) -} - func getDeploymentBlock(contractName string) uint64 { configKey := "contract." 
+ contractName + ".deployed" value := viper.GetInt64(configKey) diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go index a9cfd981..570a1f46 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go @@ -18,6 +18,7 @@ package fetcher import ( "fmt" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" "github.com/sirupsen/logrus" @@ -64,10 +65,11 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD accounts := utils.GetAccountsFromDiff(*stateDiff) logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber)) for _, account := range accounts { - logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) + logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.Key).Hex())) for _, storage := range account.Storage { diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) if formatErr != nil { + logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.Key), "from account with key: ", common.BytesToHash(account.Key)) errs <- formatErr continue } diff --git a/libraries/shared/mocks/backfiller.go b/libraries/shared/mocks/backfiller.go index 43436d8a..3a1a0254 100644 --- a/libraries/shared/mocks/backfiller.go +++ b/libraries/shared/mocks/backfiller.go @@ -27,7 +27,6 @@ type BackFiller struct { StorageDiffsToReturn []utils.StorageDiff BackFillErrs []error PassedEndingBlock uint64 - StartingBlock uint64 } // SetStorageDiffsToReturn for tests @@ -36,8 +35,8 @@ func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) } // BackFill mock method -func (backFiller *BackFiller) 
BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { - if endingBlock < backFiller.StartingBlock { +func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { + if endingBlock < startingBlock { return errors.New("backfill: ending block number needs to be greater than starting block number") } backFiller.PassedEndingBlock = endingBlock diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index 84a80a30..dd871cb4 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -24,36 +24,37 @@ import ( type MockStorageQueue struct { AddCalled bool AddError error - AddPassedDiffs []utils.StorageDiff + AddPassedDiffs map[int]utils.StorageDiff DeleteErr error DeletePassedIds []int GetAllErr error - DiffsToReturn []utils.StorageDiff + DiffsToReturn map[int]utils.StorageDiff GetAllCalled bool } // Add mock method func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error { queue.AddCalled = true - queue.AddPassedDiffs = append(queue.AddPassedDiffs, diff) + if queue.AddPassedDiffs == nil { + queue.AddPassedDiffs = make(map[int]utils.StorageDiff) + } + queue.AddPassedDiffs[diff.Id] = diff return queue.AddError } // Delete mock method func (queue *MockStorageQueue) Delete(id int) error { queue.DeletePassedIds = append(queue.DeletePassedIds, id) - diffsToReturn := make([]utils.StorageDiff, 0) - for _, diff := range queue.DiffsToReturn { - if diff.Id != id { - diffsToReturn = append(diffsToReturn, diff) - } - } - queue.DiffsToReturn = diffsToReturn + delete(queue.DiffsToReturn, id) return queue.DeleteErr } // GetAll mock method func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { queue.GetAllCalled = true - return queue.DiffsToReturn, queue.GetAllErr + diffs := make([]utils.StorageDiff, 0) + for _, diff := range queue.DiffsToReturn { 
+ diffs = append(diffs, diff) + } + return diffs, queue.GetAllErr } diff --git a/libraries/shared/mocks/storage_transformer.go b/libraries/shared/mocks/storage_transformer.go index 2f126fd6..798de0ac 100644 --- a/libraries/shared/mocks/storage_transformer.go +++ b/libraries/shared/mocks/storage_transformer.go @@ -28,12 +28,15 @@ import ( type MockStorageTransformer struct { KeccakOfAddress common.Hash ExecuteErr error - PassedDiffs []utils.StorageDiff + PassedDiffs map[int]utils.StorageDiff } // Execute mock method func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error { - transformer.PassedDiffs = append(transformer.PassedDiffs, diff) + if transformer.PassedDiffs == nil { + transformer.PassedDiffs = make(map[int]utils.StorageDiff) + } + transformer.PassedDiffs[diff.Id] = diff return transformer.ExecuteErr } diff --git a/libraries/shared/storage/backfiller.go b/libraries/shared/storage/backfiller.go index fe21b2e2..6b5f290c 100644 --- a/libraries/shared/storage/backfiller.go +++ b/libraries/shared/storage/backfiller.go @@ -17,10 +17,10 @@ package storage import ( - "errors" "fmt" "sync/atomic" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" "github.com/sirupsen/logrus" @@ -30,13 +30,13 @@ import ( ) const ( - DefaultMaxBatchSize uint64 = 1000 + DefaultMaxBatchSize uint64 = 100 defaultMaxBatchNumber int64 = 10 ) // BackFiller is the backfilling interface type BackFiller interface { - BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error + BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error } // backFiller is the backfilling struct @@ -47,52 +47,33 @@ type backFiller struct { } // NewStorageBackFiller returns a BackFiller -func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, startingBlock, batchSize uint64) BackFiller { +func 
NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, batchSize uint64) BackFiller { if batchSize == 0 { batchSize = DefaultMaxBatchSize } return &backFiller{ - fetcher: fetcher, - batchSize: batchSize, - startingBlock: startingBlock, + fetcher: fetcher, + batchSize: batchSize, } } // BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently -func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { - if endingBlock < bf.startingBlock { - return errors.New("backfill: ending block number needs to be greater than starting block number") - } - logrus.Infof("going to fill in gap from %d to %d", bf.startingBlock, endingBlock) - // break the range up into bins of smaller ranges - length := endingBlock - bf.startingBlock + 1 - numberOfBins := length / bf.batchSize - remainder := length % bf.batchSize - if remainder != 0 { - numberOfBins++ - } - blockRangeBins := make([][]uint64, numberOfBins) - for i := range blockRangeBins { - nextBinStart := bf.startingBlock + uint64(bf.batchSize) - if nextBinStart > endingBlock { - nextBinStart = endingBlock + 1 - } - blockRange := make([]uint64, 0, nextBinStart-bf.startingBlock+1) - for j := bf.startingBlock; j < nextBinStart; j++ { - blockRange = append(blockRange, j) - } - bf.startingBlock = nextBinStart - blockRangeBins[i] = blockRange - } +func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { + logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock) + // break the range up into bins of smaller ranges + blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bf.batchSize) + if err != nil { + return err + } // int64 for atomic incrementing and decrementing to track the number of active processing 
goroutines we have var activeCount int64 // channel for processing goroutines to signal when they are done processingDone := make(chan [2]uint64) forwardDone := make(chan bool) - // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range + // for each block range bin spin up a goroutine to batch fetch and process state diffs in that range go func() { for _, blockHeights := range blockRangeBins { // if we have reached our limit of active goroutines @@ -101,39 +82,7 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi // this blocks until a process signals it has finished <-forwardDone } - go func(blockHeights []uint64) { - payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights) - if fetchErr != nil { - errChan <- fetchErr - } - for _, payload := range payloads { - stateDiff := new(statediff.StateDiff) - stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) - if stateDiffDecodeErr != nil { - errChan <- stateDiffDecodeErr - continue - } - accounts := utils.GetAccountsFromDiff(*stateDiff) - for _, account := range accounts { - logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) - for _, storage := range account.Storage { - diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) - if formatErr != nil { - errChan <- formatErr - continue - } - logrus.Trace("adding storage diff to results", - "keccak of address: ", diff.HashedAddress.Hex(), - "block height: ", diff.BlockHeight, - "storage key: ", diff.StorageKey.Hex(), - "storage value: ", diff.StorageValue.Hex()) - backFill <- diff - } - } - } - // when this goroutine is done, send out a signal - processingDone <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]} - }(blockHeights) + go bf.backFillRange(blockHeights, backFill, errChan, processingDone) } }() @@ -153,7 +102,7 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan 
utils.StorageDi } logrus.Infof("finished fetching gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1]) goroutinesFinished++ - if goroutinesFinished == int(numberOfBins) { + if goroutinesFinished >= len(blockRangeBins) { done <- true return } @@ -163,3 +112,38 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi return nil } + +func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.StorageDiff, errChan chan error, doneChan chan [2]uint64) { + payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights) + if fetchErr != nil { + errChan <- fetchErr + } + for _, payload := range payloads { + stateDiff := new(statediff.StateDiff) + stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) + if stateDiffDecodeErr != nil { + errChan <- stateDiffDecodeErr + continue + } + accounts := utils.GetAccountsFromDiff(*stateDiff) + for _, account := range accounts { + logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.Key).Hex())) + for _, storage := range account.Storage { + diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) + if formatErr != nil { + logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.Key), "from account with key: ", common.BytesToHash(account.Key)) + errChan <- formatErr + continue + } + logrus.Trace("adding storage diff to results", + "keccak of address: ", diff.HashedAddress.Hex(), + "block height: ", diff.BlockHeight, + "storage key: ", diff.StorageKey.Hex(), + "storage value: ", diff.StorageValue.Hex()) + diffChan <- diff + } + } + } + // when this is done, send out a signal + doneChan <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]} +} diff --git a/libraries/shared/storage/backfiller_test.go b/libraries/shared/storage/backfiller_test.go index 1a893b39..0b347576 100644 --- 
a/libraries/shared/storage/backfiller_test.go +++ b/libraries/shared/storage/backfiller_test.go @@ -44,11 +44,12 @@ var _ = Describe("BackFiller", func() { }) It("batch calls statediff_stateDiffAt", func() { - backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 100) + backFiller = storage.NewStorageBackFiller(mockFetcher, 100) backFill := make(chan utils.StorageDiff) done := make(chan bool) errChan := make(chan error) backFillInitErr := backFiller.BackFill( + test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64(), backFill, errChan, @@ -77,11 +78,12 @@ var _ = Describe("BackFiller", func() { }) It("has a configurable batch size", func() { - backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1) + backFiller = storage.NewStorageBackFiller(mockFetcher, 1) backFill := make(chan utils.StorageDiff) done := make(chan bool) errChan := make(chan error) backFillInitErr := backFiller.BackFill( + test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64(), backFill, errChan, @@ -116,11 +118,12 @@ var _ = Describe("BackFiller", func() { } mockFetcher.PayloadsToReturn = payloadsToReturn // batch size of 2 with 1001 block range => 501 bins - backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 2) + backFiller = storage.NewStorageBackFiller(mockFetcher, 2) backFill := make(chan utils.StorageDiff) done := make(chan bool) errChan := make(chan error) backFillInitErr := backFiller.BackFill( + test_data.BlockNumber.Uint64(), test_data.BlockNumber.Uint64()+1000, backFill, errChan, @@ -151,11 +154,12 @@ var _ = Describe("BackFiller", func() { mockFetcher.FetchErrs = map[uint64]error{ test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"), } - backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1) + backFiller = storage.NewStorageBackFiller(mockFetcher, 1) backFill := make(chan utils.StorageDiff) done := make(chan bool) errChan 
:= make(chan error) backFillInitErr := backFiller.BackFill( + test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64(), backFill, errChan, @@ -188,11 +192,12 @@ var _ = Describe("BackFiller", func() { test_data.BlockNumber2.Uint64(): errors.New("mock fetcher error"), } mockFetcher.CalledTimes = 0 - backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1) + backFiller = storage.NewStorageBackFiller(mockFetcher, 1) backFill = make(chan utils.StorageDiff) done = make(chan bool) errChan = make(chan error) backFillInitErr = backFiller.BackFill( + test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64(), backFill, errChan, diff --git a/libraries/shared/storage/utils/bins.go b/libraries/shared/storage/utils/bins.go new file mode 100644 index 00000000..18f28c32 --- /dev/null +++ b/libraries/shared/storage/utils/bins.go @@ -0,0 +1,45 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
// GetBlockHeightBins splits the inclusive block range [startingBlock, endingBlock]
// into consecutive bins holding at most batchSize heights each. Every bin except
// possibly the last holds exactly batchSize heights; the last holds the remainder.
//
// It returns an error if endingBlock is lower than startingBlock or if batchSize is
// zero. A range of a single block (startingBlock == endingBlock) is valid.
func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) {
	if endingBlock < startingBlock {
		return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
	}
	if batchSize == 0 {
		return nil, errors.New("backfill: batchsize needs to be greater than zero")
	}
	// span is (number of heights - 1); working with it instead of the length avoids
	// wrapping uint64 when the range reaches the top of the value space
	span := endingBlock - startingBlock
	// floor(span/batchSize)+1 == ceil((span+1)/batchSize)
	numberOfBins := span/batchSize + 1
	blockRangeBins := make([][]uint64, 0, numberOfBins)
	binStart := startingBlock
	for {
		// heights left after binStart, up to and including endingBlock
		remaining := endingBlock - binStart
		isLastBin := remaining < batchSize
		size := batchSize
		if isLastBin {
			// remaining+1 cannot overflow here because remaining < batchSize
			size = remaining + 1
		}
		// allocate exactly what the bin holds rather than batchSize+1, so a large
		// batch size over a small range does not over-allocate
		blockRange := make([]uint64, size)
		for k := range blockRange {
			blockRange[k] = binStart + uint64(k)
		}
		blockRangeBins = append(blockRangeBins, blockRange)
		if isLastBin {
			break
		}
		// safe: remaining >= batchSize, so binStart+batchSize <= endingBlock
		binStart += batchSize
	}
	return blockRangeBins, nil
}
"github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" +) + +var _ = Describe("GetBlockHeightBins", func() { + It("splits a block range up into bins", func() { + var startingBlock uint64 = 1 + var endingBlock uint64 = 10101 + var batchSize uint64 = 100 + blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) + Expect(err).ToNot(HaveOccurred()) + Expect(len(blockRangeBins)).To(Equal(102)) + Expect(blockRangeBins[101]).To(Equal([]uint64{10101})) + + startingBlock = 101 + endingBlock = 10100 + batchSize = 100 + lastBin := make([]uint64, 0) + for i := 10001; i <= 10100; i++ { + lastBin = append(lastBin, uint64(i)) + } + blockRangeBins, err = utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) + Expect(err).ToNot(HaveOccurred()) + Expect(len(blockRangeBins)).To(Equal(100)) + Expect(blockRangeBins[99]).To(Equal(lastBin)) + }) + + It("throws an error if the starting block is higher than the ending block", func() { + var startingBlock uint64 = 10102 + var endingBlock uint64 = 10101 + var batchSize uint64 = 100 + _, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("ending block number needs to be greater than starting block number")) + }) + + It("throws an error if the batch size is zero", func() { + var startingBlock uint64 = 1 + var endingBlock uint64 = 10101 + var batchSize uint64 = 0 + _, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("batchsize needs to be greater than zero")) + }) +}) diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 922d9399..d4cd3067 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -33,7 +33,7 @@ import ( type IStorageWatcher interface { 
AddTransformers(initializers []transformer.StorageTransformerInitializer) Execute(queueRecheckInterval time.Duration, backFillOn bool) - BackFill(backFiller storage.BackFiller) + BackFill(startingBlock uint64, backFiller storage.BackFiller) } type StorageWatcher struct { @@ -44,7 +44,7 @@ type StorageWatcher struct { DiffsChan chan utils.StorageDiff ErrsChan chan error BackFillDoneChan chan bool - StartingSyncBlockChan chan int + StartingSyncBlockChan chan uint64 } func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher { @@ -55,7 +55,7 @@ func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatch StorageFetcher: f, DiffsChan: make(chan utils.StorageDiff, fetcher.PayloadChanBufferSize), ErrsChan: make(chan error), - StartingSyncBlockChan: make(chan int), + StartingSyncBlockChan: make(chan uint64), BackFillDoneChan: make(chan bool), Queue: queue, KeccakAddressTransformers: transformers, @@ -70,10 +70,10 @@ func (storageWatcher *StorageWatcher) AddTransformers(initializers []transformer } // BackFill uses a backFiller to backfill missing storage diffs for the storageWatcher -func (storageWatcher *StorageWatcher) BackFill(backFiller storage.BackFiller) { +func (storageWatcher *StorageWatcher) BackFill(startingBlock uint64, backFiller storage.BackFiller) { // this blocks until the Execute process sends us the first block number it sees - startingSyncBlock := <-storageWatcher.StartingSyncBlockChan - backFillInitErr := backFiller.BackFill(uint64(startingSyncBlock), + endBackFillBlock := <-storageWatcher.StartingSyncBlockChan + backFillInitErr := backFiller.BackFill(startingBlock, endBackFillBlock, storageWatcher.DiffsChan, storageWatcher.ErrsChan, storageWatcher.BackFillDoneChan) if backFillInitErr != nil { logrus.Warn(backFillInitErr) @@ -91,7 +91,7 @@ func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr.Error())) case diff := 
<-storageWatcher.DiffsChan: if start && backFillOn { - storageWatcher.StartingSyncBlockChan <- diff.BlockHeight - 1 + storageWatcher.StartingSyncBlockChan <- uint64(diff.BlockHeight - 1) start = false } storageWatcher.processRow(diff) diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index 820cc45e..c2cc44dd 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -20,6 +20,7 @@ import ( "errors" "io/ioutil" "os" + "sort" "time" "github.com/ethereum/go-ethereum/common" @@ -48,7 +49,6 @@ var _ = Describe("Storage Watcher", func() { Expect(w.KeccakAddressTransformers[fakeHashedAddress]).To(Equal(fakeTransformer)) }) }) - Describe("Execute", func() { var ( mockFetcher *mocks.StorageFetcher @@ -104,9 +104,11 @@ var _ = Describe("Storage Watcher", func() { It("executes transformer for recognized storage diff", func(done Done) { go storageWatcher.Execute(time.Hour, false) - Eventually(func() []utils.StorageDiff { + Eventually(func() map[int]utils.StorageDiff { return mockTransformer.PassedDiffs - }).Should(Equal([]utils.StorageDiff{csvDiff})) + }).Should(Equal(map[int]utils.StorageDiff{ + csvDiff.ID: csvDiff, + })) close(done) }) @@ -120,7 +122,7 @@ var _ = Describe("Storage Watcher", func() { }).Should(BeTrue()) Eventually(func() utils.StorageDiff { if len(mockQueue.AddPassedDiffs) > 0 { - return mockQueue.AddPassedDiffs[0] + return mockQueue.AddPassedDiffs[csvDiff.ID] } return utils.StorageDiff{} }).Should(Equal(csvDiff)) @@ -150,7 +152,9 @@ var _ = Describe("Storage Watcher", func() { Describe("transforming queued storage diffs", func() { BeforeEach(func() { - mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff} + mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ + csvDiff.ID: csvDiff, + } storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue 
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) @@ -161,7 +165,7 @@ var _ = Describe("Storage Watcher", func() { Eventually(func() utils.StorageDiff { if len(mockTransformer.PassedDiffs) > 0 { - return mockTransformer.PassedDiffs[0] + return mockTransformer.PassedDiffs[csvDiff.ID] } return utils.StorageDiff{} }).Should(Equal(csvDiff)) @@ -201,7 +205,9 @@ var _ = Describe("Storage Watcher", func() { ID: csvDiff.ID + 1, HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), } - mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} + mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ + obsoleteDiff.ID: obsoleteDiff, + } go storageWatcher.Execute(time.Nanosecond, false) @@ -219,7 +225,9 @@ var _ = Describe("Storage Watcher", func() { ID: csvDiff.ID + 1, HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), } - mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} + mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ + obsoleteDiff.ID: obsoleteDiff, + } mockQueue.DeleteErr = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log") Expect(fileErr).NotTo(HaveOccurred()) @@ -239,20 +247,28 @@ var _ = Describe("Storage Watcher", func() { Describe("BackFill", func() { var ( - mockFetcher *mocks.StorageFetcher - mockBackFiller *mocks.BackFiller - mockQueue *mocks.MockStorageQueue - mockTransformer *mocks.MockStorageTransformer - mockTransformer2 *mocks.MockStorageTransformer - mockTransformer3 *mocks.MockStorageTransformer - csvDiff utils.StorageDiff - storageWatcher *watcher.StorageWatcher - hashedAddress common.Hash + mockFetcher *mocks.StorageFetcher + mockBackFiller *mocks.BackFiller + mockQueue *mocks.MockStorageQueue + mockTransformer *mocks.MockStorageTransformer + mockTransformer2 *mocks.MockStorageTransformer + mockTransformer3 *mocks.MockStorageTransformer + csvDiff utils.StorageDiff + storageWatcher *watcher.StorageWatcher + hashedAddress common.Hash + 
createdDiff, updatedDiff1, deletedDiff, updatedDiff2 utils.StorageDiff ) BeforeEach(func() { + createdDiff = test_data.CreatedExpectedStorageDiff + createdDiff.ID = 1333 + updatedDiff1 = test_data.UpdatedExpectedStorageDiff + updatedDiff1.ID = 1334 + deletedDiff = test_data.DeletedExpectedStorageDiff + deletedDiff.ID = 1335 + updatedDiff2 = test_data.UpdatedExpectedStorageDiff2 + updatedDiff2.ID = 1336 mockBackFiller = new(mocks.BackFiller) - mockBackFiller.StartingBlock = test_data.BlockNumber.Uint64() hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") mockFetcher = mocks.NewStorageFetcher() mockQueue = &mocks.MockStorageQueue{} @@ -269,16 +285,16 @@ var _ = Describe("Storage Watcher", func() { } }) - Describe("transforming streamed and backfilled queued storage diffs", func() { + Describe("transforming streamed and backfilled storage diffs", func() { BeforeEach(func() { mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{ - test_data.CreatedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff, - test_data.DeletedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff2, + createdDiff, + updatedDiff1, + deletedDiff, + updatedDiff2, }) - mockQueue.DiffsToReturn = []utils.StorageDiff{} + mockQueue.DiffsToReturn = map[int]utils.StorageDiff{} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{ @@ -289,7 +305,7 @@ var _ = Describe("Storage Watcher", func() { }) It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) { - go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller) go storageWatcher.Execute(time.Hour, true) Eventually(func() int { @@ -303,18 +319,18 @@ var _ = Describe("Storage Watcher", func() { 
Eventually(func() int { return len(mockTransformer3.PassedDiffs) }).Should(Equal(3)) - Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff)) Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64()))) - Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff)) - Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff)) - Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff)) - Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2)) + Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) + Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) + Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) + Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff)) + Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2)) close(done) }) It("adds diffs to the queue if transformation fails", func(done Done) { mockTransformer3.ExecuteErr = fakes.FakeError - go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller) go storageWatcher.Execute(time.Hour, true) Eventually(func() int { @@ -330,22 +346,23 @@ var _ = Describe("Storage Watcher", func() { Eventually(func() bool { return mockQueue.AddCalled }).Should(BeTrue()) - Eventually(func() []utils.StorageDiff { + Eventually(func() map[int]utils.StorageDiff { if len(mockQueue.AddPassedDiffs) > 2 { return mockQueue.AddPassedDiffs } - return []utils.StorageDiff{} - }).Should(Equal([]utils.StorageDiff{ - test_data.UpdatedExpectedStorageDiff, - test_data.DeletedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff2, + return map[int]utils.StorageDiff{} + }).Should(Equal(map[int]utils.StorageDiff{ + updatedDiff1.ID: updatedDiff1, + deletedDiff.ID: deletedDiff, + updatedDiff2.ID: updatedDiff2, })) - 
Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff)) - Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff)) - Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff)) - Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff)) - Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2)) + Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64()))) + Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) + Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) + Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) + Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff)) + Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2)) close(done) }) @@ -362,7 +379,7 @@ var _ = Describe("Storage Watcher", func() { errors.New("mock backfiller error"), } - go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller) go storageWatcher.Execute(time.Hour, true) Eventually(func() int { @@ -387,7 +404,7 @@ var _ = Describe("Storage Watcher", func() { defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller) go storageWatcher.Execute(time.Hour, true) Eventually(func() (string, error) { @@ -398,17 +415,14 @@ var _ = Describe("Storage Watcher", func() { }) }) - Describe("transforms storage diffs", func() { + Describe("transforms queued storage diffs", func() { BeforeEach(func() { - test_data.CreatedExpectedStorageDiff.ID = 1334 - test_data.UpdatedExpectedStorageDiff.ID = 1335 - test_data.DeletedExpectedStorageDiff.ID = 1336 - test_data.UpdatedExpectedStorageDiff2.ID = 1337 - mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff, - 
test_data.CreatedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff, - test_data.DeletedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff2, + mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ + csvDiff.ID: csvDiff, + createdDiff.ID: createdDiff, + updatedDiff1.ID: updatedDiff1, + deletedDiff.ID: deletedDiff, + updatedDiff2.ID: updatedDiff2, } storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue @@ -420,7 +434,7 @@ var _ = Describe("Storage Watcher", func() { }) It("executes transformers on queued storage diffs", func(done Done) { - go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller) go storageWatcher.Execute(time.Nanosecond, true) Eventually(func() int { @@ -435,26 +449,29 @@ var _ = Describe("Storage Watcher", func() { Eventually(func() bool { return mockQueue.GetAllCalled }).Should(BeTrue()) + sortedExpectedIDs := []int{ + csvDiff.ID, + createdDiff.ID, + updatedDiff1.ID, + deletedDiff.ID, + updatedDiff2.ID, + } + sort.Ints(sortedExpectedIDs) Eventually(func() []int { if len(mockQueue.DeletePassedIds) > 4 { + sort.Ints(mockQueue.DeletePassedIds) return mockQueue.DeletePassedIds } return []int{} - }).Should(Equal([]int{ - csvDiff.ID, - test_data.CreatedExpectedStorageDiff.ID, - test_data.UpdatedExpectedStorageDiff.ID, - test_data.DeletedExpectedStorageDiff.ID, - test_data.UpdatedExpectedStorageDiff2.ID, - })) + }).Should(Equal(sortedExpectedIDs)) Expect(mockQueue.AddCalled).To(Not(BeTrue())) Expect(len(mockQueue.DiffsToReturn)).To(Equal(0)) - Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff)) - Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff)) - Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff)) - Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff)) - 
Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2)) + Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) + Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) + Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) + Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff)) + Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2)) close(done) }) }) diff --git a/pkg/eth/eth_suite_test.go b/pkg/eth/eth_suite_test.go index 6271dca1..b6c30dfa 100644 --- a/pkg/eth/eth_suite_test.go +++ b/pkg/eth/eth_suite_test.go @@ -25,5 +25,5 @@ import ( func TestGeth(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "eth Suite") + RunSpecs(t, "Eth Suite") }