Merge pull request #157 from vulcanize/statediff_at

(VDB-933) Backfill for storage diff transformer
This commit is contained in:
Ian Norden 2019-12-02 12:56:26 -06:00 committed by GitHub
commit 1a208546d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 1585 additions and 223 deletions

View File

@ -17,10 +17,6 @@
package cmd package cmd
import ( import (
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"os" "os"
"plugin" "plugin"
syn "sync" syn "sync"
@ -29,7 +25,10 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fs"
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin" p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers" "github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"
@ -186,12 +185,11 @@ func composeAndExecute() {
log.Debug("fetching storage diffs from geth pub sub") log.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients() rpcClient, _ := getClients()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
payloadChan := make(chan statediff.Payload) storageFetcher := fetcher.NewGethRPCStorageFetcher(stateDiffStreamer)
storageFetcher := fetcher.NewGethRPCStorageFetcher(&stateDiffStreamer, payloadChan)
sw := watcher.NewStorageWatcher(storageFetcher, &db) sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(sw, &wg)
default: default:
log.Debug("fetching storage diffs from csv") log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath} tailer := fs.FileTailer{Path: storageDiffsPath}
@ -199,7 +197,7 @@ func composeAndExecute() {
sw := watcher.NewStorageWatcher(storageFetcher, &db) sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(sw, &wg)
} }
} }

View File

@ -18,13 +18,13 @@ package cmd
import ( import (
"fmt" "fmt"
"github.com/vulcanize/vulcanizedb/pkg/config"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
st "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" st "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/config"
ft "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/transformer" ft "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/transformer"
ht "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/transformer" ht "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/transformer"
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"

View File

@ -18,21 +18,21 @@ package cmd
import ( import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"plugin" "plugin"
syn "sync" syn "sync"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"
storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"
) )
@ -41,28 +41,22 @@ var executeCmd = &cobra.Command{
Use: "execute", Use: "execute",
Short: "executes a precomposed transformer initializer plugin", Short: "executes a precomposed transformer initializer plugin",
Long: `This command needs a config .toml file of form: Long: `This command needs a config .toml file of form:
[database] [database]
name = "vulcanize_public" name = "vulcanize_public"
hostname = "localhost" hostname = "localhost"
user = "vulcanize" user = "vulcanize"
password = "vulcanize" password = "vulcanize"
port = 5432 port = 5432
[client] [client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc" ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter] [exporter]
name = "exampleTransformerExporter" name = "exampleTransformerExporter"
Note: If any of the plugin transformer need additional Note: If any of the plugin transformer need additional
configuration variables include them in the .toml file as well configuration variables include them in the .toml file as well
The exporter.name is the name (without extension) of the plugin to be loaded. The exporter.name is the name (without extension) of the plugin to be loaded.
The plugin file needs to be located in the /plugins directory and this command assumes The plugin file needs to be located in the /plugins directory and this command assumes
the db migrations remain from when the plugin was composed. Additionally, the plugin the db migrations remain from when the plugin was composed. Additionally, the plugin
must have been composed by the same version of vulcanizedb or else it will not be compatible. must have been composed by the same version of vulcanizedb or else it will not be compatible.
Specify config location when executing the command: Specify config location when executing the command:
./vulcanizedb execute --config=./environments/config_name.toml`, ./vulcanizedb execute --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
@ -128,14 +122,13 @@ func execute() {
switch storageDiffsSource { switch storageDiffsSource {
case "geth": case "geth":
log.Debug("fetching storage diffs from geth pub sub") log.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients() wsClient := getWSClient()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) stateDiffStreamer := streamer.NewStateDiffStreamer(wsClient)
payloadChan := make(chan statediff.Payload) storageFetcher := fetcher.NewGethRPCStorageFetcher(stateDiffStreamer)
storageFetcher := fetcher.NewGethRPCStorageFetcher(&stateDiffStreamer, payloadChan)
sw := watcher.NewStorageWatcher(storageFetcher, &db) sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(sw, &wg)
default: default:
log.Debug("fetching storage diffs from csv") log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath} tailer := fs.FileTailer{Path: storageDiffsPath}
@ -143,7 +136,7 @@ func execute() {
sw := watcher.NewStorageWatcher(storageFetcher, &db) sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers) sw.AddTransformers(ethStorageInitializers)
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(sw, &wg)
} }
} }
@ -186,13 +179,20 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
defer wg.Done() defer wg.Done()
// Execute over the StorageTransformerInitializer set using the storage watcher // Execute over the StorageTransformerInitializer set using the storage watcher
LogWithCommand.Info("executing storage transformers") LogWithCommand.Info("executing storage transformers")
ticker := time.NewTicker(pollingInterval) on := viper.GetBool("storageBackFill.on")
defer ticker.Stop() if on {
for range ticker.C { backFillStorage(w)
errs := make(chan error)
diffs := make(chan storageUtils.StorageDiff)
w.Execute(diffs, errs, queueRecheckInterval)
} }
w.Execute(queueRecheckInterval, on)
}
func backFillStorage(w watcher.IStorageWatcher) {
rpcClient, _ := getClients()
// find min deployment block
minDeploymentBlock := constants.GetMinDeploymentBlock()
stateDiffFetcher := fetcher.NewStateDiffFetcher(rpcClient)
backFiller := storage.NewStorageBackFiller(stateDiffFetcher, storage.DefaultMaxBatchSize)
go w.BackFill(minDeploymentBlock, backFiller)
} }
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) { func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {

View File

@ -17,6 +17,7 @@
package cmd package cmd
import ( import (
"errors"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -28,6 +29,7 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/eth" "github.com/vulcanize/vulcanizedb/pkg/eth"
"github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/eth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc" vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc"
@ -174,3 +176,15 @@ func getClients() (client.RPCClient, *ethclient.Client) {
return rpcClient, ethClient return rpcClient, ethClient
} }
func getWSClient() core.RPCClient {
wsRPCpath := viper.GetString("client.wsPath")
if wsRPCpath == "" {
LogWithCommand.Fatal(errors.New("getWSClient() was called but no ws rpc path is provided"))
}
wsRPCClient, dialErr := rpc.Dial(wsRPCpath)
if dialErr != nil {
LogWithCommand.Fatal(dialErr)
}
return client.NewRPCClient(wsRPCClient, wsRPCpath)
}

View File

@ -100,6 +100,7 @@ The config provides information for composing a set of transformers from externa
[client] [client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc" ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
wsPath = "ws://127.0.0.1:8546"
[exporter] [exporter]
home = "github.com/vulcanize/vulcanizedb" home = "github.com/vulcanize/vulcanizedb"
@ -160,6 +161,7 @@ The config provides information for composing a set of transformers from externa
- don't leave gaps - don't leave gaps
- transformers with identical migrations/migration paths should share the same rank - transformers with identical migrations/migration paths should share the same rank
- Note: If any of the imported transformers need additional config variables those need to be included as well - Note: If any of the imported transformers need additional config variables those need to be included as well
- Note: If the storage transformers are processing storage diffs from geth, we need to configure the websocket endpoint `client.wsPath` for them
This information is used to write and build a Go plugin which exports the configured transformers. This information is used to write and build a Go plugin which exports the configured transformers.
These transformers are loaded onto their specified watchers and executed. These transformers are loaded onto their specified watchers and executed.
@ -194,3 +196,17 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface
} }
} }
``` ```
### Storage backfilling
Storage transformers stream data from a geth subscription or parity csv file where the storage diffs are produced and emitted as the
full sync progresses. If the transformers have missed consuming a range of diffs due to lag in the startup of the processes or due to misalignment of the sync,
we can configure our storage transformers to backfill missing diffs from a [modified archival geth client](https://github.com/vulcanize/go-ethereum/tree/statediff_at).
To do so, add the following fields to the config file.
```toml
[storageBackFill]
on = false
```
- `on` is set to `true` to turn the backfill process on
This process uses the regular `client.ipcPath` rpc path, it assumes that it is either an http or ipc path that supports the `StateDiffAt` endpoint.

View File

@ -19,10 +19,11 @@ package chunker_test
import ( import (
"testing" "testing"
"io/ioutil"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"io/ioutil"
) )
func TestFactories(t *testing.T) { func TestFactories(t *testing.T) {

View File

@ -17,10 +17,11 @@
package chunker package chunker
import ( import (
"strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"strings"
) )
type Chunker interface { type Chunker interface {

View File

@ -0,0 +1,80 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package constants
import (
"fmt"
"math"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
var initialized = false
func initConfig() {
if initialized {
return
}
if err := viper.ReadInConfig(); err == nil {
log.Info("Using config file:", viper.ConfigFileUsed())
} else {
panic(fmt.Sprintf("Could not find environment file: %v", err))
}
initialized = true
}
// GetMinDeploymentBlock gets the minimum deployment block for multiple contracts from config
func GetMinDeploymentBlock() uint64 {
initConfig()
contractNames := getContractNames()
if len(contractNames) < 1 {
log.Fatalf("No contracts supplied")
}
minBlock := uint64(math.MaxUint64)
for c := range contractNames {
deployed := getDeploymentBlock(c)
if deployed < minBlock {
minBlock = deployed
}
}
return minBlock
}
func getContractNames() map[string]bool {
transformerNames := viper.GetStringSlice("exporter.transformerNames")
contractNames := make(map[string]bool)
for _, transformerName := range transformerNames {
configKey := "exporter." + transformerName + ".contracts"
names := viper.GetStringSlice(configKey)
for _, name := range names {
contractNames[name] = true
}
}
return contractNames
}
func getDeploymentBlock(contractName string) uint64 {
configKey := "contract." + contractName + ".deployed"
value := viper.GetInt64(configKey)
if value < 0 {
log.Infof("No deployment block configured for contract \"%v\", defaulting to 0.", contractName)
return 0
}
return uint64(value)
}

View File

@ -19,10 +19,11 @@ package event_test
import ( import (
"testing" "testing"
"io/ioutil"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"io/ioutil"
) )
func TestFactories(t *testing.T) { func TestFactories(t *testing.T) {

View File

@ -19,9 +19,10 @@ package event
import ( import (
"database/sql/driver" "database/sql/driver"
"fmt" "fmt"
"github.com/vulcanize/vulcanizedb/utils"
"strings" "strings"
"github.com/vulcanize/vulcanizedb/utils"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )

View File

@ -18,6 +18,8 @@ package event_test
import ( import (
"fmt" "fmt"
"math/big"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event" "github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
@ -26,7 +28,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
"math/big"
) )
var _ = Describe("Repository", func() { var _ = Describe("Repository", func() {

View File

@ -17,6 +17,8 @@
package event_test package event_test
import ( import (
"math/rand"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event" "github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
@ -25,7 +27,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"math/rand"
) )
var _ = Describe("Transformer", func() { var _ = Describe("Transformer", func() {

View File

@ -17,10 +17,11 @@
package fetcher package fetcher
import ( import (
"strings"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/fs" "github.com/vulcanize/vulcanizedb/pkg/fs"
"strings"
) )
type CsvTailStorageFetcher struct { type CsvTailStorageFetcher struct {

View File

@ -18,6 +18,9 @@ package fetcher_test
import ( import (
"fmt" "fmt"
"strings"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/hpcloud/tail" "github.com/hpcloud/tail"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -25,8 +28,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"strings"
"time"
) )
var _ = Describe("Csv Tail Storage Fetcher", func() { var _ = Describe("Csv Tail Storage Fetcher", func() {

View File

@ -19,10 +19,11 @@ package fetcher_test
import ( import (
"testing" "testing"
"io/ioutil"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"io/ioutil"
) )
func TestFactories(t *testing.T) { func TestFactories(t *testing.T) {

View File

@ -1,42 +1,50 @@
// Copyright 2019 Vulcanize // VulcanizeDB
// // Copyright © 2019 Vulcanize
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fetcher package fetcher
import ( import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer" "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
) )
const (
PayloadChanBufferSize = 20000 // the max eth sub buffer size
)
type GethRPCStorageFetcher struct { type GethRPCStorageFetcher struct {
statediffPayloadChan chan statediff.Payload StatediffPayloadChan chan statediff.Payload
streamer streamer.Streamer streamer streamer.Streamer
} }
func NewGethRPCStorageFetcher(streamer streamer.Streamer, statediffPayloadChan chan statediff.Payload) GethRPCStorageFetcher { func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher {
return GethRPCStorageFetcher{ return GethRPCStorageFetcher{
statediffPayloadChan: statediffPayloadChan, StatediffPayloadChan: make(chan statediff.Payload, PayloadChanBufferSize),
streamer: streamer, streamer: streamer,
} }
} }
func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
ethStatediffPayloadChan := fetcher.statediffPayloadChan ethStatediffPayloadChan := fetcher.StatediffPayloadChan
clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan)
if clientSubErr != nil { if clientSubErr != nil {
errs <- clientSubErr errs <- clientSubErr
@ -54,28 +62,25 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
errs <- decodeErr errs <- decodeErr
} }
accounts := getAccountsFromDiff(*stateDiff) accounts := utils.GetAccountsFromDiff(*stateDiff)
logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber)) logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber))
for _, account := range accounts { for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.Key).Hex()))
for _, storage := range account.Storage { for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil {
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.Key), "from account with key: ", common.BytesToHash(account.Key))
errs <- formatErr
continue
}
logrus.Trace("adding storage diff to out channel", logrus.Trace("adding storage diff to out channel",
"keccak of address: ", diff.HashedAddress.Hex(), "keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight, "block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(), "storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex()) "storage value: ", diff.StorageValue.Hex())
if formatErr != nil {
errs <- formatErr
}
out <- diff out <- diff
} }
} }
} }
} }
func getAccountsFromDiff(stateDiff statediff.StateDiff) []statediff.AccountDiff {
accounts := append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...)
return append(accounts, stateDiff.DeletedAccounts...)
}

View File

@ -1,16 +1,18 @@
// Copyright 2019 Vulcanize // VulcanizeDB
// // Copyright © 2019 Vulcanize
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // This program is free software: you can redistribute it and/or modify
// You may obtain a copy of the License at // it under the terms of the GNU Affero General Public License as published by
// // the Free Software Foundation, either version 3 of the License, or
// http://www.apache.org/licenses/LICENSE-2.0 // (at your option) any later version.
//
// Unless required by applicable law or agreed to in writing, software // This program is distributed in the hope that it will be useful,
// distributed under the License is distributed on an "AS IS" BASIS, // but WITHOUT ANY WARRANTY; without even the implied warranty of
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// See the License for the specific language governing permissions and // GNU Affero General Public License for more details.
// limitations under the License.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fetcher_test package fetcher_test
@ -21,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
@ -56,15 +59,13 @@ func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payloa
var _ = Describe("Geth RPC Storage Fetcher", func() { var _ = Describe("Geth RPC Storage Fetcher", func() {
var streamer MockStoragediffStreamer var streamer MockStoragediffStreamer
var statediffPayloadChan chan statediff.Payload
var statediffFetcher fetcher.GethRPCStorageFetcher var statediffFetcher fetcher.GethRPCStorageFetcher
var storagediffChan chan utils.StorageDiff var storagediffChan chan utils.StorageDiff
var errorChan chan error var errorChan chan error
BeforeEach(func() { BeforeEach(func() {
streamer = MockStoragediffStreamer{} streamer = MockStoragediffStreamer{}
statediffPayloadChan = make(chan statediff.Payload, 1) statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer)
statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer, statediffPayloadChan)
storagediffChan = make(chan utils.StorageDiff) storagediffChan = make(chan utils.StorageDiff)
errorChan = make(chan error) errorChan = make(chan error)
}) })
@ -88,9 +89,9 @@ var _ = Describe("Geth RPC Storage Fetcher", func() {
go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan) go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan)
streamedPayload := <-statediffPayloadChan streamedPayload := <-statediffFetcher.StatediffPayloadChan
Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload)) Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload))
Expect(streamer.PassedPayloadChan).To(Equal(statediffPayloadChan)) Expect(streamer.PassedPayloadChan).To(Equal(statediffFetcher.StatediffPayloadChan))
close(done) close(done)
}) })

View File

@ -0,0 +1,79 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fetcher
import (
"fmt"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
)
// StateDiffFetcher is the state diff fetching interface
type StateDiffFetcher interface {
FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error)
}
// BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion
type BatchClient interface {
BatchCall(batch []client.BatchElem) error
}
// stateDiffFetcher is the state diff fetching struct
type stateDiffFetcher struct {
// stateDiffFetcher is thread-safe as long as the underlying client is thread-safe, since it has/modifies no other state
// http.Client is thread-safe
client BatchClient
}
const method = "statediff_stateDiffAt"
// NewStateDiffFetcher returns a IStateDiffFetcher
func NewStateDiffFetcher(bc BatchClient) StateDiffFetcher {
return &stateDiffFetcher{
client: bc,
}
}
// FetchStateDiffsAt fetches the statediff payloads at the given block heights
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) {
batch := make([]client.BatchElem, 0)
for _, height := range blockHeights {
batch = append(batch, client.BatchElem{
Method: method,
Args: []interface{}{height},
Result: new(statediff.Payload),
})
}
batchErr := fetcher.client.BatchCall(batch)
if batchErr != nil {
return nil, fmt.Errorf("stateDiffFetcher err: %s", batchErr.Error())
}
results := make([]statediff.Payload, 0, len(blockHeights))
for _, batchElem := range batch {
if batchElem.Error != nil {
return nil, fmt.Errorf("stateDiffFetcher err: %s", batchElem.Error.Error())
}
payload, ok := batchElem.Result.(*statediff.Payload)
if ok {
results = append(results, *payload)
}
}
return results, nil
}

View File

@ -0,0 +1,54 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fetcher_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
)
var _ = Describe("StateDiffFetcher", func() {
Describe("FetchStateDiffsAt", func() {
var (
mc *mocks.BackFillerClient
stateDiffFetcher fetcher.StateDiffFetcher
)
BeforeEach(func() {
mc = new(mocks.BackFillerClient)
setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload)
Expect(setDiffAtErr1).ToNot(HaveOccurred())
setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
Expect(setDiffAtErr2).ToNot(HaveOccurred())
stateDiffFetcher = fetcher.NewStateDiffFetcher(mc)
})
It("Batch calls statediff_stateDiffAt", func() {
blockHeights := []uint64{
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
}
stateDiffPayloads, fetchErr := stateDiffFetcher.FetchStateDiffsAt(blockHeights)
Expect(fetchErr).ToNot(HaveOccurred())
Expect(len(stateDiffPayloads)).To(Equal(2))
Expect(stateDiffPayloads[0]).To(Equal(test_data.MockStatediffPayload))
Expect(stateDiffPayloads[1]).To(Equal(test_data.MockStatediffPayload2))
})
})
})

View File

@ -18,6 +18,7 @@ package logs
import ( import (
"errors" "errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/chunker" "github.com/vulcanize/vulcanizedb/libraries/shared/chunker"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"

View File

@ -17,6 +17,8 @@
package logs_test package logs_test
import ( import (
"strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -27,7 +29,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"strings"
) )
var _ = Describe("Log delegator", func() { var _ = Describe("Log delegator", func() {

View File

@ -18,6 +18,7 @@ package logs
import ( import (
"errors" "errors"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"

View File

@ -17,6 +17,8 @@
package logs_test package logs_test
import ( import (
"math/rand"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -27,7 +29,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"math/rand"
) )
var _ = Describe("Log extractor", func() { var _ = Describe("Log extractor", func() {

View File

@ -17,10 +17,11 @@
package logs_test package logs_test
import ( import (
"github.com/sirupsen/logrus"
"io/ioutil" "io/ioutil"
"testing" "testing"
"github.com/sirupsen/logrus"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )

View File

@ -0,0 +1,58 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"errors"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
// BackFiller mock for tests
type BackFiller struct {
StorageDiffsToReturn []utils.StorageDiff
BackFillErrs []error
PassedEndingBlock uint64
}
// SetStorageDiffsToReturn for tests
func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) {
backFiller.StorageDiffsToReturn = diffs
}
// BackFill mock method
func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
if endingBlock < startingBlock {
return errors.New("backfill: ending block number needs to be greater than starting block number")
}
backFiller.PassedEndingBlock = endingBlock
go func(backFill chan utils.StorageDiff, errChan chan error, done chan bool) {
errLen := len(backFiller.BackFillErrs)
for i, diff := range backFiller.StorageDiffsToReturn {
if i < errLen {
err := backFiller.BackFillErrs[i]
if err != nil {
errChan <- err
continue
}
}
backFill <- diff
}
done <- true
}(backFill, errChan, done)
return nil
}

View File

@ -0,0 +1,64 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
)
// BackFillerClient is a mock client for use in backfiller tests
type BackFillerClient struct {
MappedStateDiffAt map[uint64][]byte
}
// SetReturnDiffAt method to set what statediffs the mock client returns
func (mc *BackFillerClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error {
if mc.MappedStateDiffAt == nil {
mc.MappedStateDiffAt = make(map[uint64][]byte)
}
by, err := json.Marshal(diffPayload)
if err != nil {
return err
}
mc.MappedStateDiffAt[height] = by
return nil
}
// BatchCall mockClient method to simulate batch call to geth
func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
if mc.MappedStateDiffAt == nil {
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
}
for _, batchElem := range batch {
if len(batchElem.Args) != 1 {
return errors.New("expected batch elem to contain single argument")
}
blockHeight, ok := batchElem.Args[0].(uint64)
if !ok {
return errors.New("expected batch elem argument to be a uint64")
}
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,50 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"errors"
"sync/atomic"
"github.com/ethereum/go-ethereum/statediff"
)
// StateDiffFetcher mock for tests
type StateDiffFetcher struct {
PayloadsToReturn map[uint64]statediff.Payload
FetchErrs map[uint64]error
CalledAtBlockHeights [][]uint64
CalledTimes int64
}
// FetchStateDiffsAt mock method
func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) {
if fetcher.PayloadsToReturn == nil {
return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return")
}
atomic.AddInt64(&fetcher.CalledTimes, 1) // thread-safe increment
fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights)
results := make([]statediff.Payload, 0, len(blockHeights))
for _, height := range blockHeights {
results = append(results, fetcher.PayloadsToReturn[height])
err, ok := fetcher.FetchErrs[height]
if ok && err != nil {
return nil, err
}
}
return results, nil
}

View File

@ -0,0 +1,43 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
)
// StateDiffStreamer is the underlying struct for the Streamer interface
type StateDiffStreamer struct {
PassedPayloadChan chan statediff.Payload
ReturnSub *rpc.ClientSubscription
ReturnErr error
StreamPayloads []statediff.Payload
}
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
sds.PassedPayloadChan = payloadChan
go func() {
for _, payload := range sds.StreamPayloads {
sds.PassedPayloadChan <- payload
}
}()
return sds.ReturnSub, sds.ReturnErr
}

View File

@ -16,20 +16,23 @@
package mocks package mocks
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
type MockStorageFetcher struct { // StorageFetcher is a mock fetcher for use in tests with backfilling
type StorageFetcher struct {
DiffsToReturn []utils.StorageDiff DiffsToReturn []utils.StorageDiff
ErrsToReturn []error ErrsToReturn []error
} }
func NewMockStorageFetcher() *MockStorageFetcher { // NewStorageFetcher returns a new StorageFetcher
return &MockStorageFetcher{} func NewStorageFetcher() *StorageFetcher {
return &StorageFetcher{}
} }
func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { // FetchStorageDiffs mock method
defer close(out) func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
defer close(errs)
for _, err := range fetcher.ErrsToReturn { for _, err := range fetcher.ErrsToReturn {
errs <- err errs <- err
} }

View File

@ -20,27 +20,41 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
) )
// MockStorageQueue for tests
type MockStorageQueue struct { type MockStorageQueue struct {
AddCalled bool AddCalled bool
AddError error AddError error
AddPassedDiff utils.StorageDiff AddPassedDiffs map[int]utils.StorageDiff
DeleteErr error DeleteErr error
DeletePassedID int DeletePassedIds []int
GetAllErr error GetAllErr error
DiffsToReturn []utils.StorageDiff DiffsToReturn map[int]utils.StorageDiff
GetAllCalled bool
} }
// Add mock method
func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error { func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error {
queue.AddCalled = true queue.AddCalled = true
queue.AddPassedDiff = diff if queue.AddPassedDiffs == nil {
queue.AddPassedDiffs = make(map[int]utils.StorageDiff)
}
queue.AddPassedDiffs[diff.ID] = diff
return queue.AddError return queue.AddError
} }
// Delete mock method
func (queue *MockStorageQueue) Delete(id int) error { func (queue *MockStorageQueue) Delete(id int) error {
queue.DeletePassedID = id queue.DeletePassedIds = append(queue.DeletePassedIds, id)
delete(queue.DiffsToReturn, id)
return queue.DeleteErr return queue.DeleteErr
} }
// GetAll mock method
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) {
return queue.DiffsToReturn, queue.GetAllErr queue.GetAllCalled = true
diffs := make([]utils.StorageDiff, 0)
for _, diff := range queue.DiffsToReturn {
diffs = append(diffs, diff)
}
return diffs, queue.GetAllErr
} }

View File

@ -18,26 +18,34 @@ package mocks
import ( import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
// MockStorageTransformer for tests
type MockStorageTransformer struct { type MockStorageTransformer struct {
KeccakOfAddress common.Hash KeccakOfAddress common.Hash
ExecuteErr error ExecuteErr error
PassedDiff utils.StorageDiff PassedDiffs map[int]utils.StorageDiff
} }
// Execute mock method
func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error { func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error {
transformer.PassedDiff = diff if transformer.PassedDiffs == nil {
transformer.PassedDiffs = make(map[int]utils.StorageDiff)
}
transformer.PassedDiffs[diff.ID] = diff
return transformer.ExecuteErr return transformer.ExecuteErr
} }
// KeccakContractAddress mock method
func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash { func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash {
return transformer.KeccakOfAddress return transformer.KeccakOfAddress
} }
// FakeTransformerInitializer mock method
func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer { func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer {
return transformer return transformer
} }

View File

@ -17,14 +17,14 @@
package repository_test package repository_test
import ( import (
"github.com/vulcanize/vulcanizedb/libraries/shared/repository"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"strings" "strings"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/repository"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"

View File

@ -19,10 +19,11 @@ package repository_test
import ( import (
"testing" "testing"
"io/ioutil"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"io/ioutil"
) )
func TestFactories(t *testing.T) { func TestFactories(t *testing.T) {

View File

@ -0,0 +1,149 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"fmt"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
const (
DefaultMaxBatchSize uint64 = 100
defaultMaxBatchNumber int64 = 10
)
// BackFiller is the backfilling interface
type BackFiller interface {
BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error
}
// backFiller is the backfilling struct
type backFiller struct {
fetcher fetcher.StateDiffFetcher
batchSize uint64
startingBlock uint64
}
// NewStorageBackFiller returns a BackFiller
func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, batchSize uint64) BackFiller {
if batchSize == 0 {
batchSize = DefaultMaxBatchSize
}
return &backFiller{
fetcher: fetcher,
batchSize: batchSize,
}
}
// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock)
// break the range up into bins of smaller ranges
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bf.batchSize)
if err != nil {
return err
}
// int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
var activeCount int64
// channel for processing goroutines to signal when they are done
processingDone := make(chan [2]uint64)
forwardDone := make(chan bool)
// for each block range bin spin up a goroutine to batch fetch and process state diffs in that range
go func() {
for _, blockHeights := range blockRangeBins {
// if we have reached our limit of active goroutines
// wait for one to finish before starting the next
if atomic.AddInt64(&activeCount, 1) > defaultMaxBatchNumber {
// this blocks until a process signals it has finished
<-forwardDone
}
go bf.backFillRange(blockHeights, backFill, errChan, processingDone)
}
}()
// goroutine that listens on the processingDone chan
// keeps track of the number of processing goroutines that have finished
// when they have all finished, sends the final signal out
go func() {
goroutinesFinished := 0
for {
select {
case doneWithHeights := <-processingDone:
atomic.AddInt64(&activeCount, -1)
select {
// if we are waiting for a process to finish, signal that one has
case forwardDone <- true:
default:
}
logrus.Infof("finished fetching gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1])
goroutinesFinished++
if goroutinesFinished >= len(blockRangeBins) {
done <- true
return
}
}
}
}()
return nil
}
func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.StorageDiff, errChan chan error, doneChan chan [2]uint64) {
payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights)
if fetchErr != nil {
errChan <- fetchErr
}
for _, payload := range payloads {
stateDiff := new(statediff.StateDiff)
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
if stateDiffDecodeErr != nil {
errChan <- stateDiffDecodeErr
continue
}
accounts := utils.GetAccountsFromDiff(*stateDiff)
for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.Key).Hex()))
for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil {
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.Key), "from account with key: ", common.BytesToHash(account.Key))
errChan <- formatErr
continue
}
logrus.Trace("adding storage diff to results",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
diffChan <- diff
}
}
}
// when this is done, send out a signal
doneChan <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]}
}

View File

@ -0,0 +1,237 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package storage_test
import (
"errors"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
)
var _ = Describe("BackFiller", func() {
Describe("BackFill", func() {
var (
mockFetcher *mocks.StateDiffFetcher
backFiller storage.BackFiller
)
BeforeEach(func() {
mockFetcher = new(mocks.StateDiffFetcher)
mockFetcher.PayloadsToReturn = map[uint64]statediff.Payload{
test_data.BlockNumber.Uint64(): test_data.MockStatediffPayload,
test_data.BlockNumber2.Uint64(): test_data.MockStatediffPayload2,
}
})
It("batch calls statediff_stateDiffAt", func() {
backFiller = storage.NewStorageBackFiller(mockFetcher, 100)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).ToNot(HaveOccurred())
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(1)))
Expect(len(diffs)).To(Equal(4))
Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
})
It("has a configurable batch size", func() {
backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).ToNot(HaveOccurred())
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
Expect(len(diffs)).To(Equal(4))
Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
})
It("handles bin numbers in excess of the goroutine limit (100)", func() {
payloadsToReturn := make(map[uint64]statediff.Payload, 1001)
for i := test_data.BlockNumber.Uint64(); i <= test_data.BlockNumber.Uint64()+1000; i++ {
payloadsToReturn[i] = test_data.MockStatediffPayload
}
mockFetcher.PayloadsToReturn = payloadsToReturn
// batch size of 2 with 1001 block range => 501 bins
backFiller = storage.NewStorageBackFiller(mockFetcher, 2)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber.Uint64()+1000,
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).ToNot(HaveOccurred())
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(501)))
Expect(len(diffs)).To(Equal(3003))
Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
})
It("passes fetcher errors forward", func() {
mockFetcher.FetchErrs = map[uint64]error{
test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"),
}
backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
var numOfErrs int
var diffs []utils.StorageDiff
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("mock fetcher error"))
numOfErrs++
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
Expect(numOfErrs).To(Equal(1))
Expect(len(diffs)).To(Equal(1))
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
mockFetcher.FetchErrs = map[uint64]error{
test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"),
test_data.BlockNumber2.Uint64(): errors.New("mock fetcher error"),
}
mockFetcher.CalledTimes = 0
backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill = make(chan utils.StorageDiff)
done = make(chan bool)
errChan = make(chan error)
backFillInitErr = backFiller.BackFill(
test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
numOfErrs = 0
diffs = []utils.StorageDiff{}
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("mock fetcher error"))
numOfErrs++
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
Expect(numOfErrs).To(Equal(2))
Expect(len(diffs)).To(Equal(0))
})
})
})
func containsDiff(diffs []utils.StorageDiff, diff utils.StorageDiff) bool {
for _, d := range diffs {
if d == diff {
return true
}
}
return false
}

View File

@ -19,10 +19,11 @@ package storage_test
import ( import (
"testing" "testing"
"io/ioutil"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"io/ioutil"
) )
func TestFactories(t *testing.T) { func TestFactories(t *testing.T) {

View File

@ -0,0 +1,45 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package utils
import "errors"
func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) {
if endingBlock < startingBlock {
return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
}
if batchSize == 0 {
return nil, errors.New("backfill: batchsize needs to be greater than zero")
}
length := endingBlock - startingBlock + 1
numberOfBins := length / batchSize
remainder := length % batchSize
if remainder != 0 {
numberOfBins++
}
blockRangeBins := make([][]uint64, numberOfBins)
for i := range blockRangeBins {
nextBinStart := startingBlock + batchSize
blockRange := make([]uint64, 0, nextBinStart-startingBlock+1)
for j := startingBlock; j < nextBinStart && j <= endingBlock; j++ {
blockRange = append(blockRange, j)
}
startingBlock = nextBinStart
blockRangeBins[i] = blockRange
}
return blockRangeBins, nil
}

View File

@ -0,0 +1,66 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package utils_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
var _ = Describe("GetBlockHeightBins", func() {
It("splits a block range up into bins", func() {
var startingBlock uint64 = 1
var endingBlock uint64 = 10101
var batchSize uint64 = 100
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
Expect(err).ToNot(HaveOccurred())
Expect(len(blockRangeBins)).To(Equal(102))
Expect(blockRangeBins[101]).To(Equal([]uint64{10101}))
startingBlock = 101
endingBlock = 10100
batchSize = 100
lastBin := make([]uint64, 0)
for i := 10001; i <= 10100; i++ {
lastBin = append(lastBin, uint64(i))
}
blockRangeBins, err = utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
Expect(err).ToNot(HaveOccurred())
Expect(len(blockRangeBins)).To(Equal(100))
Expect(blockRangeBins[99]).To(Equal(lastBin))
})
It("throws an error if the starting block is higher than the ending block", func() {
var startingBlock uint64 = 10102
var endingBlock uint64 = 10101
var batchSize uint64 = 100
_, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("ending block number needs to be greater than starting block number"))
})
It("throws an error if the batch size is zero", func() {
var startingBlock uint64 = 1
var endingBlock uint64 = 10101
var batchSize uint64 = 0
_, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("batchsize needs to be greater than zero"))
})
})

View File

@ -17,11 +17,12 @@
package utils package utils
import ( import (
"strconv"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"strconv"
) )
const ExpectedRowLength = 5 const ExpectedRowLength = 5
@ -71,3 +72,8 @@ func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.State
func HexToKeccak256Hash(hex string) common.Hash { func HexToKeccak256Hash(hex string) common.Hash {
return crypto.Keccak256Hash(common.FromHex(hex)) return crypto.Keccak256Hash(common.FromHex(hex))
} }
func GetAccountsFromDiff(stateDiff statediff.StateDiff) []statediff.AccountDiff {
accounts := append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...)
return append(accounts, stateDiff.DeletedAccounts...)
}

View File

@ -17,6 +17,9 @@
package utils_test package utils_test
import ( import (
"math/big"
"math/rand"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
@ -25,8 +28,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"math/big"
"math/rand"
) )
var _ = Describe("Storage row parsing", func() { var _ = Describe("Storage row parsing", func() {

View File

@ -17,9 +17,10 @@
package utils package utils
import ( import (
"math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"math/big"
) )
const ( const (

View File

@ -17,10 +17,11 @@
package utils_test package utils_test
import ( import (
"github.com/sirupsen/logrus"
"io/ioutil" "io/ioutil"
"testing" "testing"
"github.com/sirupsen/logrus"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )

View File

@ -1,16 +1,18 @@
// Copyright 2019 Vulcanize // VulcanizeDB
// // Copyright © 2019 Vulcanize
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // This program is free software: you can redistribute it and/or modify
// You may obtain a copy of the License at // it under the terms of the GNU Affero General Public License as published by
// // the Free Software Foundation, either version 3 of the License, or
// http://www.apache.org/licenses/LICENSE-2.0 // (at your option) any later version.
//
// Unless required by applicable law or agreed to in writing, software // This program is distributed in the hope that it will be useful,
// distributed under the License is distributed on an "AS IS" BASIS, // but WITHOUT ANY WARRANTY; without even the implied warranty of
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// See the License for the specific language governing permissions and // GNU Affero General Public License for more details.
// limitations under the License.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package streamer package streamer
@ -18,24 +20,29 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
) )
// Streamer is the interface for streaming a statediff subscription
type Streamer interface { type Streamer interface {
Stream(chan statediff.Payload) (*rpc.ClientSubscription, error) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error)
} }
// StateDiffStreamer is the underlying struct for the StateDiffStreamer interface
type StateDiffStreamer struct { type StateDiffStreamer struct {
client core.RPCClient Client core.RPCClient
} }
func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { // NewStateDiffStreamer creates a pointer to a new StateDiffStreamer which satisfies the IStateDiffStreamer interface
logrus.Info("streaming diffs from geth") func NewStateDiffStreamer(client core.RPCClient) Streamer {
return streamer.client.Subscribe("statediff", payloadChan, "stream") return &StateDiffStreamer{
} Client: client,
func NewStateDiffStreamer(client core.RPCClient) StateDiffStreamer {
return StateDiffStreamer{
client: client,
} }
} }
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
logrus.Info("streaming diffs from geth")
return sds.Client.Subscribe("statediff", payloadChan, "stream")
}

View File

@ -17,13 +17,14 @@
package test_data package test_data
import ( import (
"math/rand"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event" "github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"math/rand"
"time"
) )
var startingBlockNumber = rand.Int63() var startingBlockNumber = rand.Int63()

View File

@ -15,20 +15,23 @@
package test_data package test_data
import ( import (
"errors" "math/big"
"math/rand"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"math/big" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"math/rand"
) )
var ( var (
BlockNumber = big.NewInt(rand.Int63()) BlockNumber = big.NewInt(rand.Int63())
BlockNumber2 = big.NewInt(0).Add(BlockNumber, big.NewInt(1))
BlockHash = "0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73" BlockHash = "0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"
BlockHash2 = "0xaa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f72"
CodeHash = common.Hex2Bytes("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") CodeHash = common.Hex2Bytes("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
NewNonceValue = rand.Uint64() NewNonceValue = rand.Uint64()
NewBalanceValue = rand.Int63() NewBalanceValue = rand.Int63()
@ -44,14 +47,13 @@ var (
Proof: [][]byte{}, Proof: [][]byte{},
}} }}
LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000") LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000")
LargeStorageValueRlp, rlpErr = rlp.EncodeToBytes(LargeStorageValue) LargeStorageValueRlp, _ = rlp.EncodeToBytes(LargeStorageValue)
storageWithLargeValue = []statediff.StorageDiff{{ storageWithLargeValue = []statediff.StorageDiff{{
Key: StorageKey, Key: StorageKey,
Value: LargeStorageValueRlp, Value: LargeStorageValueRlp,
Path: StoragePath, Path: StoragePath,
Proof: [][]byte{}, Proof: [][]byte{},
}} }}
EmptyStorage = make([]statediff.StorageDiff, 0)
StorageWithBadValue = statediff.StorageDiff{ StorageWithBadValue = statediff.StorageDiff{
Key: StorageKey, Key: StorageKey,
Value: []byte{0, 1, 2}, Value: []byte{0, 1, 2},
@ -83,6 +85,11 @@ var (
Value: valueBytes, Value: valueBytes,
Storage: storageWithLargeValue, Storage: storageWithLargeValue,
}} }}
UpdatedAccountDiffs2 = []statediff.AccountDiff{{
Key: AnotherContractLeafKey.Bytes(),
Value: valueBytes,
Storage: storageWithSmallValue,
}}
DeletedAccountDiffs = []statediff.AccountDiff{{ DeletedAccountDiffs = []statediff.AccountDiff{{
Key: AnotherContractLeafKey.Bytes(), Key: AnotherContractLeafKey.Bytes(),
@ -97,7 +104,15 @@ var (
DeletedAccounts: DeletedAccountDiffs, DeletedAccounts: DeletedAccountDiffs,
UpdatedAccounts: UpdatedAccountDiffs, UpdatedAccounts: UpdatedAccountDiffs,
} }
MockStateDiff2 = statediff.StateDiff{
BlockNumber: BlockNumber2,
BlockHash: common.HexToHash(BlockHash2),
CreatedAccounts: nil,
DeletedAccounts: nil,
UpdatedAccounts: UpdatedAccountDiffs2,
}
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
MockStateDiff2Bytes, _ = rlp.EncodeToBytes(MockStateDiff2)
mockTransaction1 = types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil) mockTransaction1 = types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil)
mockTransaction2 = types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil) mockTransaction2 = types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil)
@ -114,24 +129,57 @@ var (
TxHash: common.HexToHash("0x0"), TxHash: common.HexToHash("0x0"),
ReceiptHash: common.HexToHash("0x0"), ReceiptHash: common.HexToHash("0x0"),
} }
MockHeader2 = types.Header{
Time: 0,
Number: BlockNumber2,
Root: common.HexToHash("0x1"),
TxHash: common.HexToHash("0x1"),
ReceiptHash: common.HexToHash("0x1"),
}
MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts) MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts)
MockBlock2 = types.NewBlock(&MockHeader2, MockTransactions, nil, MockReceipts)
MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock) MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock)
MockBlockRlp2, _ = rlp.EncodeToBytes(MockBlock2)
MockStatediffPayload = statediff.Payload{ MockStatediffPayload = statediff.Payload{
BlockRlp: MockBlockRlp, BlockRlp: MockBlockRlp,
StateDiffRlp: MockStateDiffBytes, StateDiffRlp: MockStateDiffBytes,
Err: nil, }
MockStatediffPayload2 = statediff.Payload{
BlockRlp: MockBlockRlp2,
StateDiffRlp: MockStateDiff2Bytes,
} }
EmptyStatediffPayload = statediff.Payload{ CreatedExpectedStorageDiff = utils.StorageDiff{
BlockRlp: []byte{}, ID: 0,
StateDiffRlp: []byte{}, HashedAddress: common.BytesToHash(ContractLeafKey[:]),
Err: nil, BlockHash: common.HexToHash(BlockHash),
BlockHeight: int(BlockNumber.Int64()),
StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(SmallStorageValue),
} }
UpdatedExpectedStorageDiff = utils.StorageDiff{
ErrStatediffPayload = statediff.Payload{ ID: 0,
BlockRlp: []byte{}, HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
StateDiffRlp: []byte{}, BlockHash: common.HexToHash(BlockHash),
Err: errors.New("mock error"), BlockHeight: int(BlockNumber.Int64()),
StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(LargeStorageValue),
}
UpdatedExpectedStorageDiff2 = utils.StorageDiff{
ID: 0,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash(BlockHash2),
BlockHeight: int(BlockNumber2.Int64()),
StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(SmallStorageValue),
}
DeletedExpectedStorageDiff = utils.StorageDiff{
ID: 0,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash(BlockHash),
BlockHeight: int(BlockNumber.Int64()),
StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(SmallStorageValue),
} }
) )

View File

@ -1,13 +1,14 @@
package test_data package test_data
import ( import (
"math/rand"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"math/rand"
) )
// Create a header sync log to reference in an event, returning inserted header sync log // Create a header sync log to reference in an event, returning inserted header sync log

View File

@ -17,7 +17,10 @@
package watcher package watcher
import ( import (
"time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/chunker" "github.com/vulcanize/vulcanizedb/libraries/shared/chunker"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
@ -27,7 +30,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"time"
) )
const NoNewDataPause = time.Second * 7 const NoNewDataPause = time.Second * 7

View File

@ -18,6 +18,7 @@ package watcher_test
import ( import (
"errors" "errors"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"

View File

@ -18,19 +18,22 @@ package watcher
import ( import (
"fmt" "fmt"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"time"
) )
type IStorageWatcher interface { type IStorageWatcher interface {
AddTransformers(initializers []transformer.StorageTransformerInitializer) AddTransformers(initializers []transformer.StorageTransformerInitializer)
Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) Execute(queueRecheckInterval time.Duration, backFillOn bool)
BackFill(startingBlock uint64, backFiller storage.BackFiller)
} }
type StorageWatcher struct { type StorageWatcher struct {
@ -38,42 +41,69 @@ type StorageWatcher struct {
StorageFetcher fetcher.IStorageFetcher StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue Queue storage.IStorageQueue
KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer
DiffsChan chan utils.StorageDiff
ErrsChan chan error
BackFillDoneChan chan bool
StartingSyncBlockChan chan uint64
} }
func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher {
queue := storage.NewStorageQueue(db) queue := storage.NewStorageQueue(db)
transformers := make(map[common.Hash]transformer.StorageTransformer) transformers := make(map[common.Hash]transformer.StorageTransformer)
return StorageWatcher{ return &StorageWatcher{
db: db, db: db,
StorageFetcher: fetcher, StorageFetcher: f,
DiffsChan: make(chan utils.StorageDiff, fetcher.PayloadChanBufferSize),
ErrsChan: make(chan error),
StartingSyncBlockChan: make(chan uint64),
BackFillDoneChan: make(chan bool),
Queue: queue, Queue: queue,
KeccakAddressTransformers: transformers, KeccakAddressTransformers: transformers,
} }
} }
func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { func (storageWatcher *StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
for _, initializer := range initializers { for _, initializer := range initializers {
storageTransformer := initializer(storageWatcher.db) storageTransformer := initializer(storageWatcher.db)
storageWatcher.KeccakAddressTransformers[storageTransformer.KeccakContractAddress()] = storageTransformer storageWatcher.KeccakAddressTransformers[storageTransformer.KeccakContractAddress()] = storageTransformer
} }
} }
func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) { // BackFill uses a backFiller to backfill missing storage diffs for the storageWatcher
func (storageWatcher *StorageWatcher) BackFill(startingBlock uint64, backFiller storage.BackFiller) {
// this blocks until the Execute process sends us the first block number it sees
endBackFillBlock := <-storageWatcher.StartingSyncBlockChan
backFillInitErr := backFiller.BackFill(startingBlock, endBackFillBlock,
storageWatcher.DiffsChan, storageWatcher.ErrsChan, storageWatcher.BackFillDoneChan)
if backFillInitErr != nil {
logrus.Warn(backFillInitErr)
}
}
// Execute runs the StorageWatcher processes
func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration, backFillOn bool) {
ticker := time.NewTicker(queueRecheckInterval) ticker := time.NewTicker(queueRecheckInterval)
go storageWatcher.StorageFetcher.FetchStorageDiffs(diffsChan, errsChan) go storageWatcher.StorageFetcher.FetchStorageDiffs(storageWatcher.DiffsChan, storageWatcher.ErrsChan)
start := true
for { for {
select { select {
case fetchErr := <-errsChan: case fetchErr := <-storageWatcher.ErrsChan:
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr)) logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr.Error()))
case diff := <-diffsChan: case diff := <-storageWatcher.DiffsChan:
if start && backFillOn {
storageWatcher.StartingSyncBlockChan <- uint64(diff.BlockHeight - 1)
start = false
}
storageWatcher.processRow(diff) storageWatcher.processRow(diff)
case <-ticker.C: case <-ticker.C:
storageWatcher.processQueue() storageWatcher.processQueue()
case <-storageWatcher.BackFillDoneChan:
logrus.Info("storage watcher backfill process has finished")
} }
} }
} }
func (storageWatcher StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { func (storageWatcher *StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) {
storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress] storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress]
return storageTransformer, ok return storageTransformer, ok
} }

View File

@ -17,19 +17,24 @@
package watcher_test package watcher_test
import ( import (
"errors"
"io/ioutil"
"os"
"sort"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
"io/ioutil"
"os"
"time"
) )
var _ = Describe("Storage Watcher", func() { var _ = Describe("Storage Watcher", func() {
@ -37,31 +42,26 @@ var _ = Describe("Storage Watcher", func() {
It("adds transformers", func() { It("adds transformers", func() {
fakeHashedAddress := utils.HexToKeccak256Hash("0x12345") fakeHashedAddress := utils.HexToKeccak256Hash("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress} fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress}
w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) w := watcher.NewStorageWatcher(mocks.NewStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
Expect(w.KeccakAddressTransformers[fakeHashedAddress]).To(Equal(fakeTransformer)) Expect(w.KeccakAddressTransformers[fakeHashedAddress]).To(Equal(fakeTransformer))
}) })
}) })
Describe("Execute", func() { Describe("Execute", func() {
var ( var (
errs chan error mockFetcher *mocks.StorageFetcher
mockFetcher *mocks.MockStorageFetcher
mockQueue *mocks.MockStorageQueue mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff csvDiff utils.StorageDiff
diffs chan utils.StorageDiff storageWatcher *watcher.StorageWatcher
storageWatcher watcher.StorageWatcher
hashedAddress common.Hash hashedAddress common.Hash
) )
BeforeEach(func() { BeforeEach(func() {
errs = make(chan error)
diffs = make(chan utils.StorageDiff)
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewMockStorageFetcher() mockFetcher = mocks.NewStorageFetcher()
mockQueue = &mocks.MockStorageQueue{} mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
csvDiff = utils.StorageDiff{ csvDiff = utils.StorageDiff{
@ -84,7 +84,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour) go storageWatcher.Execute(time.Hour, false)
Eventually(func() (string, error) { Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name()) logContent, err := ioutil.ReadFile(tempFile.Name())
@ -102,25 +102,29 @@ var _ = Describe("Storage Watcher", func() {
}) })
It("executes transformer for recognized storage diff", func(done Done) { It("executes transformer for recognized storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Hour) go storageWatcher.Execute(time.Hour, false)
Eventually(func() utils.StorageDiff { Eventually(func() map[int]utils.StorageDiff {
return mockTransformer.PassedDiff return mockTransformer.PassedDiffs
}).Should(Equal(csvDiff)) }).Should(Equal(map[int]utils.StorageDiff{
csvDiff.ID: csvDiff,
}))
close(done) close(done)
}) })
It("queues diff for later processing if transformer execution fails", func(done Done) { It("queues diff for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError mockTransformer.ExecuteErr = fakes.FakeError
go storageWatcher.Execute(diffs, errs, time.Hour) go storageWatcher.Execute(time.Hour, false)
Expect(<-errs).To(BeNil())
Eventually(func() bool { Eventually(func() bool {
return mockQueue.AddCalled return mockQueue.AddCalled
}).Should(BeTrue()) }).Should(BeTrue())
Eventually(func() utils.StorageDiff { Eventually(func() utils.StorageDiff {
return mockQueue.AddPassedDiff if len(mockQueue.AddPassedDiffs) > 0 {
return mockQueue.AddPassedDiffs[csvDiff.ID]
}
return utils.StorageDiff{}
}).Should(Equal(csvDiff)) }).Should(Equal(csvDiff))
close(done) close(done)
}) })
@ -133,7 +137,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour) go storageWatcher.Execute(time.Hour, false)
Eventually(func() bool { Eventually(func() bool {
return mockQueue.AddCalled return mockQueue.AddCalled
@ -148,26 +152,34 @@ var _ = Describe("Storage Watcher", func() {
Describe("transforming queued storage diffs", func() { Describe("transforming queued storage diffs", func() {
BeforeEach(func() { BeforeEach(func() {
mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff} mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
csvDiff.ID: csvDiff,
}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
}) })
It("executes transformer for storage diff", func(done Done) { It("executes transformer for storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() utils.StorageDiff { Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff if len(mockTransformer.PassedDiffs) > 0 {
return mockTransformer.PassedDiffs[csvDiff.ID]
}
return utils.StorageDiff{}
}).Should(Equal(csvDiff)) }).Should(Equal(csvDiff))
close(done) close(done)
}) })
It("deletes diff from queue if transformer execution successful", func(done Done) { It("deletes diff from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int { Eventually(func() int {
return mockQueue.DeletePassedID if len(mockQueue.DeletePassedIds) > 0 {
return mockQueue.DeletePassedIds[0]
}
return 0
}).Should(Equal(csvDiff.ID)) }).Should(Equal(csvDiff.ID))
close(done) close(done)
}) })
@ -179,7 +191,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) { Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name()) logContent, err := ioutil.ReadFile(tempFile.Name())
@ -193,12 +205,17 @@ var _ = Describe("Storage Watcher", func() {
ID: csvDiff.ID + 1, ID: csvDiff.ID + 1,
HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"),
} }
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
obsoleteDiff.ID: obsoleteDiff,
}
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int { Eventually(func() int {
return mockQueue.DeletePassedID if len(mockQueue.DeletePassedIds) > 0 {
return mockQueue.DeletePassedIds[0]
}
return 0
}).Should(Equal(obsoleteDiff.ID)) }).Should(Equal(obsoleteDiff.ID))
close(done) close(done)
}) })
@ -208,14 +225,16 @@ var _ = Describe("Storage Watcher", func() {
ID: csvDiff.ID + 1, ID: csvDiff.ID + 1,
HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"),
} }
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
obsoleteDiff.ID: obsoleteDiff,
}
mockQueue.DeleteErr = fakes.FakeError mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log") tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred()) Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) { Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name()) logContent, err := ioutil.ReadFile(tempFile.Name())
@ -225,4 +244,236 @@ var _ = Describe("Storage Watcher", func() {
}) })
}) })
}) })
Describe("BackFill", func() {
var (
mockFetcher *mocks.StorageFetcher
mockBackFiller *mocks.BackFiller
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
mockTransformer2 *mocks.MockStorageTransformer
mockTransformer3 *mocks.MockStorageTransformer
csvDiff utils.StorageDiff
storageWatcher *watcher.StorageWatcher
hashedAddress common.Hash
createdDiff, updatedDiff1, deletedDiff, updatedDiff2 utils.StorageDiff
)
BeforeEach(func() {
createdDiff = test_data.CreatedExpectedStorageDiff
createdDiff.ID = 1333
updatedDiff1 = test_data.UpdatedExpectedStorageDiff
updatedDiff1.ID = 1334
deletedDiff = test_data.DeletedExpectedStorageDiff
deletedDiff.ID = 1335
updatedDiff2 = test_data.UpdatedExpectedStorageDiff2
updatedDiff2.ID = 1336
mockBackFiller = new(mocks.BackFiller)
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
mockTransformer2 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.ContractLeafKey[:])}
mockTransformer3 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:])}
csvDiff = utils.StorageDiff{
ID: 1337,
HashedAddress: hashedAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: int(test_data.BlockNumber2.Int64()) + 1,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
})
Describe("transforming streamed and backfilled storage diffs", func() {
BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff}
mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{
createdDiff,
updatedDiff1,
deletedDiff,
updatedDiff2,
})
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{
mockTransformer.FakeTransformerInitializer,
mockTransformer2.FakeTransformerInitializer,
mockTransformer3.FakeTransformerInitializer,
})
})
It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) {
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() int {
return len(mockTransformer.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer2.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer3.PassedDiffs)
}).Should(Equal(3))
Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done)
})
It("adds diffs to the queue if transformation fails", func(done Done) {
mockTransformer3.ExecuteErr = fakes.FakeError
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() int {
return len(mockTransformer.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer2.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer3.PassedDiffs)
}).Should(Equal(3))
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() map[int]utils.StorageDiff {
if len(mockQueue.AddPassedDiffs) > 2 {
return mockQueue.AddPassedDiffs
}
return map[int]utils.StorageDiff{}
}).Should(Equal(map[int]utils.StorageDiff{
updatedDiff1.ID: updatedDiff1,
deletedDiff.ID: deletedDiff,
updatedDiff2.ID: updatedDiff2,
}))
Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done)
})
It("logs a backfill error", func(done Done) {
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
mockBackFiller.BackFillErrs = []error{
nil,
nil,
nil,
errors.New("mock backfiller error"),
}
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() int {
return len(mockTransformer.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer2.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer3.PassedDiffs)
}).Should(Equal(2))
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring("mock backfiller error"))
close(done)
})
It("logs when backfill finishes", func(done Done) {
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring("storage watcher backfill process has finished"))
close(done)
})
})
Describe("transforms queued storage diffs", func() {
BeforeEach(func() {
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
csvDiff.ID: csvDiff,
createdDiff.ID: createdDiff,
updatedDiff1.ID: updatedDiff1,
deletedDiff.ID: deletedDiff,
updatedDiff2.ID: updatedDiff2,
}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{
mockTransformer.FakeTransformerInitializer,
mockTransformer2.FakeTransformerInitializer,
mockTransformer3.FakeTransformerInitializer,
})
})
It("executes transformers on queued storage diffs", func(done Done) {
go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
go storageWatcher.Execute(time.Nanosecond, true)
Eventually(func() int {
return len(mockTransformer.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer2.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer3.PassedDiffs)
}).Should(Equal(3))
Eventually(func() bool {
return mockQueue.GetAllCalled
}).Should(BeTrue())
sortedExpectedIDs := []int{
csvDiff.ID,
createdDiff.ID,
updatedDiff1.ID,
deletedDiff.ID,
updatedDiff2.ID,
}
sort.Ints(sortedExpectedIDs)
Eventually(func() []int {
if len(mockQueue.DeletePassedIds) > 4 {
sort.Ints(mockQueue.DeletePassedIds)
return mockQueue.DeletePassedIds
}
return []int{}
}).Should(Equal(sortedExpectedIDs))
Expect(mockQueue.AddCalled).To(Not(BeTrue()))
Expect(len(mockQueue.DiffsToReturn)).To(Equal(0))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done)
})
})
})
}) })

View File

@ -17,10 +17,11 @@
package watcher_test package watcher_test
import ( import (
log "github.com/sirupsen/logrus"
"io/ioutil" "io/ioutil"
"testing" "testing"
log "github.com/sirupsen/logrus"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )

View File

@ -20,6 +20,7 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"math"
"strings" "strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -105,7 +106,7 @@ func (tr *Transformer) Init() error {
tr.sortedMethodIds = make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling tr.sortedMethodIds = make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling
tr.eventIds = make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers tr.eventIds = make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers
tr.eventFilters = make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs tr.eventFilters = make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs
tr.Start = 100000000000 tr.Start = math.MaxInt64
// Iterate through all internal contract addresses // Iterate through all internal contract addresses
for contractAddr := range tr.Config.Addresses { for contractAddr := range tr.Config.Addresses {

View File

@ -25,5 +25,5 @@ import (
func TestGeth(t *testing.T) { func TestGeth(t *testing.T) {
RegisterFailHandler(Fail) RegisterFailHandler(Fail)
RunSpecs(t, "eth Suite") RunSpecs(t, "Eth Suite")
} }

View File

@ -18,17 +18,15 @@ package node
import ( import (
"context" "context"
"strconv"
"regexp" "regexp"
"strconv"
"log" "strings"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"strings"
) )
type IPropertiesReader interface { type IPropertiesReader interface {
@ -101,7 +99,7 @@ func (reader PropertiesReader) NetworkID() float64 {
var version string var version string
err := reader.client.CallContext(context.Background(), &version, "net_version") err := reader.client.CallContext(context.Background(), &version, "net_version")
if err != nil { if err != nil {
log.Println(err) log.Error(err)
} }
networkID, _ := strconv.ParseFloat(version, 64) networkID, _ := strconv.ParseFloat(version, 64)
return networkID return networkID
@ -111,13 +109,19 @@ func (reader PropertiesReader) GenesisBlock() string {
var header *types.Header var header *types.Header
blockZero := "0x0" blockZero := "0x0"
includeTransactions := false includeTransactions := false
reader.client.CallContext(context.Background(), &header, "eth_getBlockByNumber", blockZero, includeTransactions) err := reader.client.CallContext(context.Background(), &header, "eth_getBlockByNumber", blockZero, includeTransactions)
if err != nil {
log.Error(err)
}
return header.Hash().Hex() return header.Hash().Hex()
} }
func (reader PropertiesReader) NodeInfo() (string, string) { func (reader PropertiesReader) NodeInfo() (string, string) {
var info p2p.NodeInfo var info p2p.NodeInfo
reader.client.CallContext(context.Background(), &info, "admin_nodeInfo") err := reader.client.CallContext(context.Background(), &info, "admin_nodeInfo")
if err != nil {
log.Error(err)
}
return info.ID, info.Name return info.ID, info.Name
} }
@ -137,14 +141,20 @@ func (client GanacheClient) NodeInfo() (string, string) {
func (client ParityClient) parityNodeInfo() string { func (client ParityClient) parityNodeInfo() string {
var nodeInfo core.ParityNodeInfo var nodeInfo core.ParityNodeInfo
client.client.CallContext(context.Background(), &nodeInfo, "parity_versionInfo") err := client.client.CallContext(context.Background(), &nodeInfo, "parity_versionInfo")
if err != nil {
log.Error(err)
}
return nodeInfo.String() return nodeInfo.String()
} }
func (client ParityClient) parityID() string { func (client ParityClient) parityID() string {
var enodeID = regexp.MustCompile(`^enode://(.+)@.+$`) var enodeID = regexp.MustCompile(`^enode://(.+)@.+$`)
var enodeURL string var enodeURL string
client.client.CallContext(context.Background(), &enodeURL, "parity_enode") err := client.client.CallContext(context.Background(), &enodeURL, "parity_enode")
if err != nil {
log.Error(err)
}
enode := enodeID.FindStringSubmatch(enodeURL) enode := enodeID.FindStringSubmatch(enodeURL)
if len(enode) < 2 { if len(enode) < 2 {
return "" return ""