diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go
index cb244e17..4784b92c 100644
--- a/cmd/composeAndExecute.go
+++ b/cmd/composeAndExecute.go
@@ -17,10 +17,6 @@
package cmd
import (
- "github.com/ethereum/go-ethereum/statediff"
- "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
- "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
- "github.com/vulcanize/vulcanizedb/pkg/fs"
"os"
"plugin"
syn "sync"
@@ -29,7 +25,10 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
+ "github.com/vulcanize/vulcanizedb/pkg/fs"
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
"github.com/vulcanize/vulcanizedb/utils"
@@ -186,12 +185,11 @@ func composeAndExecute() {
log.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
- payloadChan := make(chan statediff.Payload)
- storageFetcher := fetcher.NewGethRPCStorageFetcher(&stateDiffStreamer, payloadChan)
+ storageFetcher := fetcher.NewGethRPCStorageFetcher(stateDiffStreamer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
- go watchEthStorage(&sw, &wg)
+ go watchEthStorage(sw, &wg)
default:
log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath}
@@ -199,7 +197,7 @@ func composeAndExecute() {
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
- go watchEthStorage(&sw, &wg)
+ go watchEthStorage(sw, &wg)
}
}
diff --git a/cmd/contractWatcher.go b/cmd/contractWatcher.go
index 2e2a8814..11238869 100644
--- a/cmd/contractWatcher.go
+++ b/cmd/contractWatcher.go
@@ -18,13 +18,13 @@ package cmd
import (
"fmt"
- "github.com/vulcanize/vulcanizedb/pkg/config"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
st "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
+ "github.com/vulcanize/vulcanizedb/pkg/config"
ft "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/transformer"
ht "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/transformer"
"github.com/vulcanize/vulcanizedb/utils"
diff --git a/cmd/execute.go b/cmd/execute.go
index c68b7e08..e23cfcbc 100644
--- a/cmd/execute.go
+++ b/cmd/execute.go
@@ -18,21 +18,21 @@ package cmd
import (
"fmt"
- "github.com/ethereum/go-ethereum/statediff"
- "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
- "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
- "github.com/vulcanize/vulcanizedb/pkg/fs"
"plugin"
syn "sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
+ "github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
- storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
+ "github.com/vulcanize/vulcanizedb/pkg/fs"
"github.com/vulcanize/vulcanizedb/utils"
)
@@ -41,28 +41,22 @@ var executeCmd = &cobra.Command{
Use: "execute",
Short: "executes a precomposed transformer initializer plugin",
Long: `This command needs a config .toml file of form:
-
[database]
name = "vulcanize_public"
hostname = "localhost"
user = "vulcanize"
password = "vulcanize"
port = 5432
-
[client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
-
[exporter]
name = "exampleTransformerExporter"
-
Note: If any of the plugin transformer need additional
configuration variables include them in the .toml file as well
-
The exporter.name is the name (without extension) of the plugin to be loaded.
The plugin file needs to be located in the /plugins directory and this command assumes
the db migrations remain from when the plugin was composed. Additionally, the plugin
must have been composed by the same version of vulcanizedb or else it will not be compatible.
-
Specify config location when executing the command:
./vulcanizedb execute --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) {
@@ -128,14 +122,13 @@ func execute() {
switch storageDiffsSource {
case "geth":
log.Debug("fetching storage diffs from geth pub sub")
- rpcClient, _ := getClients()
- stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
- payloadChan := make(chan statediff.Payload)
- storageFetcher := fetcher.NewGethRPCStorageFetcher(&stateDiffStreamer, payloadChan)
+ wsClient := getWSClient()
+ stateDiffStreamer := streamer.NewStateDiffStreamer(wsClient)
+ storageFetcher := fetcher.NewGethRPCStorageFetcher(stateDiffStreamer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
- go watchEthStorage(&sw, &wg)
+ go watchEthStorage(sw, &wg)
default:
log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath}
@@ -143,7 +136,7 @@ func execute() {
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
- go watchEthStorage(&sw, &wg)
+ go watchEthStorage(sw, &wg)
}
}
@@ -186,13 +179,20 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
defer wg.Done()
// Execute over the StorageTransformerInitializer set using the storage watcher
LogWithCommand.Info("executing storage transformers")
- ticker := time.NewTicker(pollingInterval)
- defer ticker.Stop()
- for range ticker.C {
- errs := make(chan error)
- diffs := make(chan storageUtils.StorageDiff)
- w.Execute(diffs, errs, queueRecheckInterval)
+ on := viper.GetBool("storageBackFill.on")
+ if on {
+ backFillStorage(w)
}
+ w.Execute(queueRecheckInterval, on)
+}
+
+func backFillStorage(w watcher.IStorageWatcher) {
+ rpcClient, _ := getClients()
+ // find min deployment block
+ minDeploymentBlock := constants.GetMinDeploymentBlock()
+ stateDiffFetcher := fetcher.NewStateDiffFetcher(rpcClient)
+ backFiller := storage.NewStorageBackFiller(stateDiffFetcher, storage.DefaultMaxBatchSize)
+ go w.BackFill(minDeploymentBlock, backFiller)
}
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {
diff --git a/cmd/root.go b/cmd/root.go
index cd8e4fd1..6b45ea52 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -17,6 +17,7 @@
package cmd
import (
+ "errors"
"fmt"
"strings"
"time"
@@ -28,6 +29,7 @@ import (
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/config"
+ "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/eth"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc"
@@ -174,3 +176,15 @@ func getClients() (client.RPCClient, *ethclient.Client) {
return rpcClient, ethClient
}
+
+func getWSClient() core.RPCClient {
+ wsRPCpath := viper.GetString("client.wsPath")
+ if wsRPCpath == "" {
+ LogWithCommand.Fatal(errors.New("getWSClient() was called but no ws rpc path is provided"))
+ }
+ wsRPCClient, dialErr := rpc.Dial(wsRPCpath)
+ if dialErr != nil {
+ LogWithCommand.Fatal(dialErr)
+ }
+ return client.NewRPCClient(wsRPCClient, wsRPCpath)
+}
diff --git a/documentation/custom-transformers.md b/documentation/custom-transformers.md
index c4782c54..2fe2a9f7 100644
--- a/documentation/custom-transformers.md
+++ b/documentation/custom-transformers.md
@@ -100,6 +100,7 @@ The config provides information for composing a set of transformers from externa
[client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
+ wsPath = "ws://127.0.0.1:8546"
[exporter]
home = "github.com/vulcanize/vulcanizedb"
@@ -160,6 +161,7 @@ The config provides information for composing a set of transformers from externa
- don't leave gaps
- transformers with identical migrations/migration paths should share the same rank
- Note: If any of the imported transformers need additional config variables those need to be included as well
+- Note: If the storage transformers are processing storage diffs from geth, the websocket endpoint `client.wsPath` also needs to be configured for them
This information is used to write and build a Go plugin which exports the configured transformers.
These transformers are loaded onto their specified watchers and executed.
@@ -194,3 +196,17 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface
}
}
```
+
+### Storage backfilling
+Storage transformers stream data from a geth subscription or a parity csv file, where storage diffs are produced and emitted as the
+full sync progresses. If the transformers have missed a range of diffs, due to lag in process startup or misalignment of the sync,
+the storage transformers can be configured to backfill the missing diffs from a [modified archival geth client](https://github.com/vulcanize/go-ethereum/tree/statediff_at).
+
+To do so, add the following fields to the config file.
+```toml
+[storageBackFill]
+ on = false
+```
+- `on` is set to `true` to turn the backfill process on
+
+This process uses the regular `client.ipcPath` rpc path; it assumes that this is either an http or ipc path that supports the `StateDiffAt` endpoint.
\ No newline at end of file
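
As a minimal sketch of how the pieces above fit together, the relevant config sections might look like the following; the endpoint values are illustrative placeholders, and only the field names (`client.ipcPath`, `client.wsPath`, `storageBackFill.on`) come from the documentation above.

```toml
[client]
    # http or ipc endpoint that supports the StateDiffAt endpoint, used for backfilling
    ipcPath = "http://127.0.0.1:8545"
    # websocket endpoint for the geth statediff subscription, used for streaming new diffs
    wsPath = "ws://127.0.0.1:8546"

[storageBackFill]
    on = true
```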
diff --git a/libraries/shared/chunker/chunker_suite_test.go b/libraries/shared/chunker/chunker_suite_test.go
index 0c5d4562..b2189eef 100644
--- a/libraries/shared/chunker/chunker_suite_test.go
+++ b/libraries/shared/chunker/chunker_suite_test.go
@@ -19,10 +19,11 @@ package chunker_test
import (
"testing"
+ "io/ioutil"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
- "io/ioutil"
)
func TestFactories(t *testing.T) {
diff --git a/libraries/shared/chunker/log_chunker.go b/libraries/shared/chunker/log_chunker.go
index 38edd5af..67771ab6 100644
--- a/libraries/shared/chunker/log_chunker.go
+++ b/libraries/shared/chunker/log_chunker.go
@@ -17,10 +17,11 @@
package chunker
import (
+ "strings"
+
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core"
- "strings"
)
type Chunker interface {
diff --git a/libraries/shared/constants/external.go b/libraries/shared/constants/external.go
new file mode 100644
index 00000000..156d13c7
--- /dev/null
+++ b/libraries/shared/constants/external.go
@@ -0,0 +1,80 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package constants
+
+import (
+ "fmt"
+ "math"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+)
+
+var initialized = false
+
+func initConfig() {
+ if initialized {
+ return
+ }
+
+ if err := viper.ReadInConfig(); err == nil {
+ log.Info("Using config file:", viper.ConfigFileUsed())
+ } else {
+ panic(fmt.Sprintf("Could not find environment file: %v", err))
+ }
+ initialized = true
+}
+
+// GetMinDeploymentBlock gets the minimum deployment block for multiple contracts from config
+func GetMinDeploymentBlock() uint64 {
+ initConfig()
+ contractNames := getContractNames()
+ if len(contractNames) < 1 {
+ log.Fatalf("No contracts supplied")
+ }
+ minBlock := uint64(math.MaxUint64)
+ for c := range contractNames {
+ deployed := getDeploymentBlock(c)
+ if deployed < minBlock {
+ minBlock = deployed
+ }
+ }
+ return minBlock
+}
+
+func getContractNames() map[string]bool {
+ transformerNames := viper.GetStringSlice("exporter.transformerNames")
+ contractNames := make(map[string]bool)
+ for _, transformerName := range transformerNames {
+ configKey := "exporter." + transformerName + ".contracts"
+ names := viper.GetStringSlice(configKey)
+ for _, name := range names {
+ contractNames[name] = true
+ }
+ }
+ return contractNames
+}
+
+func getDeploymentBlock(contractName string) uint64 {
+ configKey := "contract." + contractName + ".deployed"
+ value := viper.GetInt64(configKey)
+ if value < 0 {
+ log.Infof("No deployment block configured for contract \"%v\", defaulting to 0.", contractName)
+ return 0
+ }
+ return uint64(value)
+}
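
The helpers above read the minimum deployment block out of the same .toml config used by the exporter. A hedged sketch of the config shape they expect, with hypothetical transformer and contract names, based on the viper keys in this file (`exporter.transformerNames`, `exporter.<transformer>.contracts`, `contract.<contract>.deployed`):

```toml
[exporter]
    transformerNames = ["transformerA"]
    [exporter.transformerA]
        contracts = ["ContractOne", "ContractTwo"]

[contract]
    [contract.ContractOne]
        deployed = 8500000
    [contract.ContractTwo]
        deployed = 8600000
```

With a config like this, `GetMinDeploymentBlock` would return 8500000, the smaller of the two configured deployment blocks; a contract with no configured `deployed` block defaults to 0.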
diff --git a/libraries/shared/factories/event/factories_suite_test.go b/libraries/shared/factories/event/factories_suite_test.go
index f2687b75..d205475e 100644
--- a/libraries/shared/factories/event/factories_suite_test.go
+++ b/libraries/shared/factories/event/factories_suite_test.go
@@ -19,10 +19,11 @@ package event_test
import (
"testing"
+ "io/ioutil"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
- "io/ioutil"
)
func TestFactories(t *testing.T) {
diff --git a/libraries/shared/factories/event/repository.go b/libraries/shared/factories/event/repository.go
index c8fd397c..a4bb5b3e 100644
--- a/libraries/shared/factories/event/repository.go
+++ b/libraries/shared/factories/event/repository.go
@@ -19,9 +19,10 @@ package event
import (
"database/sql/driver"
"fmt"
- "github.com/vulcanize/vulcanizedb/utils"
"strings"
+ "github.com/vulcanize/vulcanizedb/utils"
+
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
diff --git a/libraries/shared/factories/event/repository_test.go b/libraries/shared/factories/event/repository_test.go
index 93dfb789..10f3c0e9 100644
--- a/libraries/shared/factories/event/repository_test.go
+++ b/libraries/shared/factories/event/repository_test.go
@@ -18,6 +18,8 @@ package event_test
import (
"fmt"
+ "math/big"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
@@ -26,7 +28,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
- "math/big"
)
var _ = Describe("Repository", func() {
diff --git a/libraries/shared/factories/event/transformer_test.go b/libraries/shared/factories/event/transformer_test.go
index 0b452409..dad7d138 100644
--- a/libraries/shared/factories/event/transformer_test.go
+++ b/libraries/shared/factories/event/transformer_test.go
@@ -17,6 +17,8 @@
package event_test
import (
+ "math/rand"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
@@ -25,7 +27,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
- "math/rand"
)
var _ = Describe("Transformer", func() {
diff --git a/libraries/shared/fetcher/csv_tail_storage_fetcher.go b/libraries/shared/fetcher/csv_tail_storage_fetcher.go
index 01dc3caa..2205ef38 100644
--- a/libraries/shared/fetcher/csv_tail_storage_fetcher.go
+++ b/libraries/shared/fetcher/csv_tail_storage_fetcher.go
@@ -17,10 +17,11 @@
package fetcher
import (
+ "strings"
+
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/fs"
- "strings"
)
type CsvTailStorageFetcher struct {
diff --git a/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go b/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go
index eaca0eb3..cf0f06f5 100644
--- a/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go
+++ b/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go
@@ -18,6 +18,9 @@ package fetcher_test
import (
"fmt"
+ "strings"
+ "time"
+
"github.com/ethereum/go-ethereum/common"
"github.com/hpcloud/tail"
. "github.com/onsi/ginkgo"
@@ -25,8 +28,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
- "strings"
- "time"
)
var _ = Describe("Csv Tail Storage Fetcher", func() {
diff --git a/libraries/shared/fetcher/fetcher_suite_test.go b/libraries/shared/fetcher/fetcher_suite_test.go
index e1bb832a..691fb502 100644
--- a/libraries/shared/fetcher/fetcher_suite_test.go
+++ b/libraries/shared/fetcher/fetcher_suite_test.go
@@ -19,10 +19,11 @@ package fetcher_test
import (
"testing"
+ "io/ioutil"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
- "io/ioutil"
)
func TestFactories(t *testing.T) {
diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go
index 6168b81c..570a1f46 100644
--- a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go
+++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go
@@ -1,42 +1,50 @@
-// Copyright 2019 Vulcanize
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
package fetcher
import (
"fmt"
+
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
)
+const (
+ PayloadChanBufferSize = 20000 // the max eth sub buffer size
+)
+
type GethRPCStorageFetcher struct {
- statediffPayloadChan chan statediff.Payload
+ StatediffPayloadChan chan statediff.Payload
streamer streamer.Streamer
}
-func NewGethRPCStorageFetcher(streamer streamer.Streamer, statediffPayloadChan chan statediff.Payload) GethRPCStorageFetcher {
+func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher {
return GethRPCStorageFetcher{
- statediffPayloadChan: statediffPayloadChan,
+ StatediffPayloadChan: make(chan statediff.Payload, PayloadChanBufferSize),
streamer: streamer,
}
}
func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
- ethStatediffPayloadChan := fetcher.statediffPayloadChan
+ ethStatediffPayloadChan := fetcher.StatediffPayloadChan
clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan)
if clientSubErr != nil {
errs <- clientSubErr
@@ -54,28 +62,25 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
errs <- decodeErr
}
- accounts := getAccountsFromDiff(*stateDiff)
+ accounts := utils.GetAccountsFromDiff(*stateDiff)
logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber))
for _, account := range accounts {
- logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
+ logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.Key).Hex()))
for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
+ if formatErr != nil {
+ logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.Key), " from account with key: ", common.BytesToHash(account.Key))
+ errs <- formatErr
+ continue
+ }
logrus.Trace("adding storage diff to out channel",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
- if formatErr != nil {
- errs <- formatErr
- }
out <- diff
}
}
}
}
-
-func getAccountsFromDiff(stateDiff statediff.StateDiff) []statediff.AccountDiff {
- accounts := append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...)
- return append(accounts, stateDiff.DeletedAccounts...)
-}
diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go
index ca87e1b3..becfaeb2 100644
--- a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go
+++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go
@@ -1,16 +1,18 @@
-// Copyright 2019 Vulcanize
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
package fetcher_test
@@ -21,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
@@ -56,15 +59,13 @@ func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payloa
var _ = Describe("Geth RPC Storage Fetcher", func() {
var streamer MockStoragediffStreamer
- var statediffPayloadChan chan statediff.Payload
var statediffFetcher fetcher.GethRPCStorageFetcher
var storagediffChan chan utils.StorageDiff
var errorChan chan error
BeforeEach(func() {
streamer = MockStoragediffStreamer{}
- statediffPayloadChan = make(chan statediff.Payload, 1)
- statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer, statediffPayloadChan)
+ statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer)
storagediffChan = make(chan utils.StorageDiff)
errorChan = make(chan error)
})
@@ -88,9 +89,9 @@ var _ = Describe("Geth RPC Storage Fetcher", func() {
go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan)
- streamedPayload := <-statediffPayloadChan
+ streamedPayload := <-statediffFetcher.StatediffPayloadChan
Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload))
- Expect(streamer.PassedPayloadChan).To(Equal(statediffPayloadChan))
+ Expect(streamer.PassedPayloadChan).To(Equal(statediffFetcher.StatediffPayloadChan))
close(done)
})
diff --git a/libraries/shared/fetcher/state_diff_fetcher.go b/libraries/shared/fetcher/state_diff_fetcher.go
new file mode 100644
index 00000000..fa5ebdfc
--- /dev/null
+++ b/libraries/shared/fetcher/state_diff_fetcher.go
@@ -0,0 +1,79 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package fetcher
+
+import (
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/statediff"
+
+ "github.com/vulcanize/vulcanizedb/pkg/eth/client"
+)
+
+// StateDiffFetcher is the state diff fetching interface
+type StateDiffFetcher interface {
+ FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error)
+}
+
+// BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion
+type BatchClient interface {
+ BatchCall(batch []client.BatchElem) error
+}
+
+// stateDiffFetcher is the state diff fetching struct
+type stateDiffFetcher struct {
+ // stateDiffFetcher is thread-safe as long as the underlying client is thread-safe, since it has/modifies no other state
+ // http.Client is thread-safe
+ client BatchClient
+}
+
+const method = "statediff_stateDiffAt"
+
+// NewStateDiffFetcher returns a StateDiffFetcher
+func NewStateDiffFetcher(bc BatchClient) StateDiffFetcher {
+ return &stateDiffFetcher{
+ client: bc,
+ }
+}
+
+// FetchStateDiffsAt fetches the statediff payloads at the given block heights
+// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
+func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) {
+ batch := make([]client.BatchElem, 0)
+ for _, height := range blockHeights {
+ batch = append(batch, client.BatchElem{
+ Method: method,
+ Args: []interface{}{height},
+ Result: new(statediff.Payload),
+ })
+ }
+ batchErr := fetcher.client.BatchCall(batch)
+ if batchErr != nil {
+ return nil, fmt.Errorf("stateDiffFetcher err: %s", batchErr.Error())
+ }
+ results := make([]statediff.Payload, 0, len(blockHeights))
+ for _, batchElem := range batch {
+ if batchElem.Error != nil {
+ return nil, fmt.Errorf("stateDiffFetcher err: %s", batchElem.Error.Error())
+ }
+ payload, ok := batchElem.Result.(*statediff.Payload)
+ if ok {
+ results = append(results, *payload)
+ }
+ }
+ return results, nil
+}
diff --git a/libraries/shared/fetcher/state_diff_fetcher_test.go b/libraries/shared/fetcher/state_diff_fetcher_test.go
new file mode 100644
index 00000000..96c1dc8d
--- /dev/null
+++ b/libraries/shared/fetcher/state_diff_fetcher_test.go
@@ -0,0 +1,54 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package fetcher_test
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
+)
+
+var _ = Describe("StateDiffFetcher", func() {
+ Describe("FetchStateDiffsAt", func() {
+ var (
+ mc *mocks.BackFillerClient
+ stateDiffFetcher fetcher.StateDiffFetcher
+ )
+ BeforeEach(func() {
+ mc = new(mocks.BackFillerClient)
+ setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload)
+ Expect(setDiffAtErr1).ToNot(HaveOccurred())
+ setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
+ Expect(setDiffAtErr2).ToNot(HaveOccurred())
+ stateDiffFetcher = fetcher.NewStateDiffFetcher(mc)
+ })
+ It("Batch calls statediff_stateDiffAt", func() {
+ blockHeights := []uint64{
+ test_data.BlockNumber.Uint64(),
+ test_data.BlockNumber2.Uint64(),
+ }
+ stateDiffPayloads, fetchErr := stateDiffFetcher.FetchStateDiffsAt(blockHeights)
+ Expect(fetchErr).ToNot(HaveOccurred())
+ Expect(len(stateDiffPayloads)).To(Equal(2))
+ Expect(stateDiffPayloads[0]).To(Equal(test_data.MockStatediffPayload))
+ Expect(stateDiffPayloads[1]).To(Equal(test_data.MockStatediffPayload2))
+ })
+ })
+})
diff --git a/libraries/shared/logs/delegator.go b/libraries/shared/logs/delegator.go
index 79b398ad..35b31601 100644
--- a/libraries/shared/logs/delegator.go
+++ b/libraries/shared/logs/delegator.go
@@ -18,6 +18,7 @@ package logs
import (
"errors"
+
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/chunker"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
diff --git a/libraries/shared/logs/delegator_test.go b/libraries/shared/logs/delegator_test.go
index 43813be8..9a56e831 100644
--- a/libraries/shared/logs/delegator_test.go
+++ b/libraries/shared/logs/delegator_test.go
@@ -17,6 +17,8 @@
package logs_test
import (
+ "strings"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
@@ -27,7 +29,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
- "strings"
)
var _ = Describe("Log delegator", func() {
diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go
index a28802ae..89f5dec6 100644
--- a/libraries/shared/logs/extractor.go
+++ b/libraries/shared/logs/extractor.go
@@ -18,6 +18,7 @@ package logs
import (
"errors"
+
"github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
diff --git a/libraries/shared/logs/extractor_test.go b/libraries/shared/logs/extractor_test.go
index f3b623d2..954baf29 100644
--- a/libraries/shared/logs/extractor_test.go
+++ b/libraries/shared/logs/extractor_test.go
@@ -17,6 +17,8 @@
package logs_test
import (
+ "math/rand"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
@@ -27,7 +29,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
- "math/rand"
)
var _ = Describe("Log extractor", func() {
diff --git a/libraries/shared/logs/logs_suite_test.go b/libraries/shared/logs/logs_suite_test.go
index 5227bac7..9a04f74f 100644
--- a/libraries/shared/logs/logs_suite_test.go
+++ b/libraries/shared/logs/logs_suite_test.go
@@ -17,10 +17,11 @@
package logs_test
import (
- "github.com/sirupsen/logrus"
"io/ioutil"
"testing"
+ "github.com/sirupsen/logrus"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
diff --git a/libraries/shared/mocks/backfiller.go b/libraries/shared/mocks/backfiller.go
new file mode 100644
index 00000000..3a1a0254
--- /dev/null
+++ b/libraries/shared/mocks/backfiller.go
@@ -0,0 +1,58 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package mocks
+
+import (
+ "errors"
+
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+)
+
+// BackFiller mock for tests
+type BackFiller struct {
+ StorageDiffsToReturn []utils.StorageDiff
+ BackFillErrs []error
+ PassedEndingBlock uint64
+}
+
+// SetStorageDiffsToReturn for tests
+func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) {
+ backFiller.StorageDiffsToReturn = diffs
+}
+
+// BackFill mock method
+func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
+ if endingBlock < startingBlock {
+ return errors.New("backfill: ending block number needs to be greater than starting block number")
+ }
+ backFiller.PassedEndingBlock = endingBlock
+ go func(backFill chan utils.StorageDiff, errChan chan error, done chan bool) {
+ errLen := len(backFiller.BackFillErrs)
+ for i, diff := range backFiller.StorageDiffsToReturn {
+ if i < errLen {
+ err := backFiller.BackFillErrs[i]
+ if err != nil {
+ errChan <- err
+ continue
+ }
+ }
+ backFill <- diff
+ }
+ done <- true
+ }(backFill, errChan, done)
+ return nil
+}
diff --git a/libraries/shared/mocks/batch_client.go b/libraries/shared/mocks/batch_client.go
new file mode 100644
index 00000000..d2156f8d
--- /dev/null
+++ b/libraries/shared/mocks/batch_client.go
@@ -0,0 +1,64 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package mocks
+
+import (
+ "encoding/json"
+ "errors"
+
+ "github.com/ethereum/go-ethereum/statediff"
+ "github.com/vulcanize/vulcanizedb/pkg/eth/client"
+)
+
+// BackFillerClient is a mock client for use in backfiller tests
+type BackFillerClient struct {
+ MappedStateDiffAt map[uint64][]byte
+}
+
+// SetReturnDiffAt method to set what statediffs the mock client returns
+func (mc *BackFillerClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error {
+ if mc.MappedStateDiffAt == nil {
+ mc.MappedStateDiffAt = make(map[uint64][]byte)
+ }
+ by, err := json.Marshal(diffPayload)
+ if err != nil {
+ return err
+ }
+ mc.MappedStateDiffAt[height] = by
+ return nil
+}
+
+// BatchCall mockClient method to simulate batch call to geth
+func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
+ if mc.MappedStateDiffAt == nil {
+ return errors.New("mockclient needs to be initialized with statediff payloads and errors")
+ }
+ for _, batchElem := range batch {
+ if len(batchElem.Args) != 1 {
+ return errors.New("expected batch elem to contain single argument")
+ }
+ blockHeight, ok := batchElem.Args[0].(uint64)
+ if !ok {
+ return errors.New("expected batch elem argument to be a uint64")
+ }
+ err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/libraries/shared/mocks/state_diff_fetcher.go b/libraries/shared/mocks/state_diff_fetcher.go
new file mode 100644
index 00000000..03a38dc8
--- /dev/null
+++ b/libraries/shared/mocks/state_diff_fetcher.go
@@ -0,0 +1,50 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package mocks
+
+import (
+ "errors"
+ "sync/atomic"
+
+ "github.com/ethereum/go-ethereum/statediff"
+)
+
+// StateDiffFetcher mock for tests
+type StateDiffFetcher struct {
+ PayloadsToReturn map[uint64]statediff.Payload
+ FetchErrs map[uint64]error
+ CalledAtBlockHeights [][]uint64
+ CalledTimes int64
+}
+
+// FetchStateDiffsAt mock method
+func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) {
+ if fetcher.PayloadsToReturn == nil {
+ return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return")
+ }
+ atomic.AddInt64(&fetcher.CalledTimes, 1) // thread-safe increment
+ fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights)
+ results := make([]statediff.Payload, 0, len(blockHeights))
+ for _, height := range blockHeights {
+ results = append(results, fetcher.PayloadsToReturn[height])
+ err, ok := fetcher.FetchErrs[height]
+ if ok && err != nil {
+ return nil, err
+ }
+ }
+ return results, nil
+}
diff --git a/libraries/shared/mocks/statediff_streamer.go b/libraries/shared/mocks/statediff_streamer.go
new file mode 100644
index 00000000..cd387ee6
--- /dev/null
+++ b/libraries/shared/mocks/statediff_streamer.go
@@ -0,0 +1,43 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package mocks
+
+import (
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/statediff"
+)
+
+// StateDiffStreamer is a mock implementation of the Streamer interface for tests
+type StateDiffStreamer struct {
+ PassedPayloadChan chan statediff.Payload
+ ReturnSub *rpc.ClientSubscription
+ ReturnErr error
+ StreamPayloads []statediff.Payload
+}
+
+// Stream mock method; it records the passed payload channel and streams any configured payloads into it
+func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
+ sds.PassedPayloadChan = payloadChan
+
+ go func() {
+ for _, payload := range sds.StreamPayloads {
+ sds.PassedPayloadChan <- payload
+ }
+ }()
+
+ return sds.ReturnSub, sds.ReturnErr
+}
diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go
index 16c1ad93..d4928eeb 100644
--- a/libraries/shared/mocks/storage_fetcher.go
+++ b/libraries/shared/mocks/storage_fetcher.go
@@ -16,20 +16,23 @@
package mocks
-import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+import (
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+)
-type MockStorageFetcher struct {
+// StorageFetcher is a mock fetcher for use in tests with backfilling
+type StorageFetcher struct {
DiffsToReturn []utils.StorageDiff
ErrsToReturn []error
}
-func NewMockStorageFetcher() *MockStorageFetcher {
- return &MockStorageFetcher{}
+// NewStorageFetcher returns a new StorageFetcher
+func NewStorageFetcher() *StorageFetcher {
+ return &StorageFetcher{}
}
-func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
- defer close(out)
- defer close(errs)
+// FetchStorageDiffs mock method
+func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
for _, err := range fetcher.ErrsToReturn {
errs <- err
}
diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go
index ed2b9275..889be21e 100644
--- a/libraries/shared/mocks/storage_queue.go
+++ b/libraries/shared/mocks/storage_queue.go
@@ -20,27 +20,41 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
+// MockStorageQueue for tests
type MockStorageQueue struct {
- AddCalled bool
- AddError error
- AddPassedDiff utils.StorageDiff
- DeleteErr error
- DeletePassedID int
- GetAllErr error
- DiffsToReturn []utils.StorageDiff
+ AddCalled bool
+ AddError error
+ AddPassedDiffs map[int]utils.StorageDiff
+ DeleteErr error
+ DeletePassedIds []int
+ GetAllErr error
+ DiffsToReturn map[int]utils.StorageDiff
+ GetAllCalled bool
}
+// Add mock method
func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error {
queue.AddCalled = true
- queue.AddPassedDiff = diff
+ if queue.AddPassedDiffs == nil {
+ queue.AddPassedDiffs = make(map[int]utils.StorageDiff)
+ }
+ queue.AddPassedDiffs[diff.ID] = diff
return queue.AddError
}
+// Delete mock method
func (queue *MockStorageQueue) Delete(id int) error {
- queue.DeletePassedID = id
+ queue.DeletePassedIds = append(queue.DeletePassedIds, id)
+ delete(queue.DiffsToReturn, id)
return queue.DeleteErr
}
+// GetAll mock method
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) {
- return queue.DiffsToReturn, queue.GetAllErr
+ queue.GetAllCalled = true
+ diffs := make([]utils.StorageDiff, 0)
+ for _, diff := range queue.DiffsToReturn {
+ diffs = append(diffs, diff)
+ }
+ return diffs, queue.GetAllErr
}
diff --git a/libraries/shared/mocks/storage_transformer.go b/libraries/shared/mocks/storage_transformer.go
index b1c3cba2..04a091de 100644
--- a/libraries/shared/mocks/storage_transformer.go
+++ b/libraries/shared/mocks/storage_transformer.go
@@ -18,26 +18,34 @@ package mocks
import (
"github.com/ethereum/go-ethereum/common"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
+// MockStorageTransformer for tests
type MockStorageTransformer struct {
KeccakOfAddress common.Hash
ExecuteErr error
- PassedDiff utils.StorageDiff
+ PassedDiffs map[int]utils.StorageDiff
}
+// Execute mock method
func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error {
- transformer.PassedDiff = diff
+ if transformer.PassedDiffs == nil {
+ transformer.PassedDiffs = make(map[int]utils.StorageDiff)
+ }
+ transformer.PassedDiffs[diff.ID] = diff
return transformer.ExecuteErr
}
+// KeccakContractAddress mock method
func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash {
return transformer.KeccakOfAddress
}
+// FakeTransformerInitializer mock method
func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer {
return transformer
}
diff --git a/libraries/shared/repository/address_repository_test.go b/libraries/shared/repository/address_repository_test.go
index 5abb4222..7e1c8b16 100644
--- a/libraries/shared/repository/address_repository_test.go
+++ b/libraries/shared/repository/address_repository_test.go
@@ -17,14 +17,14 @@
package repository_test
import (
- "github.com/vulcanize/vulcanizedb/libraries/shared/repository"
- "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"strings"
"github.com/jmoiron/sqlx"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/repository"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
diff --git a/libraries/shared/repository/repository_suite_test.go b/libraries/shared/repository/repository_suite_test.go
index d934f9f8..0868f7fc 100644
--- a/libraries/shared/repository/repository_suite_test.go
+++ b/libraries/shared/repository/repository_suite_test.go
@@ -19,10 +19,11 @@ package repository_test
import (
"testing"
+ "io/ioutil"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
- "io/ioutil"
)
func TestFactories(t *testing.T) {
diff --git a/libraries/shared/storage/backfiller.go b/libraries/shared/storage/backfiller.go
new file mode 100644
index 00000000..6b5f290c
--- /dev/null
+++ b/libraries/shared/storage/backfiller.go
@@ -0,0 +1,149 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package storage
+
+import (
+ "fmt"
+ "sync/atomic"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/statediff"
+ "github.com/sirupsen/logrus"
+
+ "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+)
+
+const (
+ DefaultMaxBatchSize uint64 = 100
+ defaultMaxBatchNumber int64 = 10
+)
+
+// BackFiller is the backfilling interface
+type BackFiller interface {
+ BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error
+}
+
+// backFiller is the backfilling struct
+type backFiller struct {
+ fetcher fetcher.StateDiffFetcher
+ batchSize uint64
+ startingBlock uint64
+}
+
+// NewStorageBackFiller returns a BackFiller
+func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, batchSize uint64) BackFiller {
+ if batchSize == 0 {
+ batchSize = DefaultMaxBatchSize
+ }
+ return &backFiller{
+ fetcher: fetcher,
+ batchSize: batchSize,
+ }
+}
+
+// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
+// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
+func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
+ logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock)
+
+ // break the range up into bins of smaller ranges
+ blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bf.batchSize)
+ if err != nil {
+ return err
+ }
+ // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
+ var activeCount int64
+ // channel for processing goroutines to signal when they are done
+ processingDone := make(chan [2]uint64)
+ forwardDone := make(chan bool)
+
+ // for each block range bin spin up a goroutine to batch fetch and process state diffs in that range
+ go func() {
+ for _, blockHeights := range blockRangeBins {
+ // if we have reached our limit of active goroutines
+ // wait for one to finish before starting the next
+ if atomic.AddInt64(&activeCount, 1) > defaultMaxBatchNumber {
+ // this blocks until a process signals it has finished
+ <-forwardDone
+ }
+ go bf.backFillRange(blockHeights, backFill, errChan, processingDone)
+ }
+ }()
+
+ // goroutine that listens on the processingDone chan
+ // keeps track of the number of processing goroutines that have finished
+ // when they have all finished, sends the final signal out
+ go func() {
+ goroutinesFinished := 0
+ for {
+ select {
+ case doneWithHeights := <-processingDone:
+ atomic.AddInt64(&activeCount, -1)
+ select {
+ // if we are waiting for a process to finish, signal that one has
+ case forwardDone <- true:
+ default:
+ }
+ logrus.Infof("finished fetching gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1])
+ goroutinesFinished++
+ if goroutinesFinished >= len(blockRangeBins) {
+ done <- true
+ return
+ }
+ }
+ }
+ }()
+
+ return nil
+}
+
+func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.StorageDiff, errChan chan error, doneChan chan [2]uint64) {
+ payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights)
+ if fetchErr != nil {
+ errChan <- fetchErr
+ }
+ for _, payload := range payloads {
+ stateDiff := new(statediff.StateDiff)
+ stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
+ if stateDiffDecodeErr != nil {
+ errChan <- stateDiffDecodeErr
+ continue
+ }
+ accounts := utils.GetAccountsFromDiff(*stateDiff)
+ for _, account := range accounts {
+ logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.Key).Hex()))
+ for _, storage := range account.Storage {
+ diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
+ if formatErr != nil {
+ logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.Key), " from account with key: ", common.BytesToHash(account.Key))
+ errChan <- formatErr
+ continue
+ }
+ logrus.Trace("adding storage diff to results",
+ "keccak of address: ", diff.HashedAddress.Hex(),
+ "block height: ", diff.BlockHeight,
+ "storage key: ", diff.StorageKey.Hex(),
+ "storage value: ", diff.StorageValue.Hex())
+ diffChan <- diff
+ }
+ }
+ }
+ // when this is done, send out a signal
+ doneChan <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]}
+}
diff --git a/libraries/shared/storage/backfiller_test.go b/libraries/shared/storage/backfiller_test.go
new file mode 100644
index 00000000..0b347576
--- /dev/null
+++ b/libraries/shared/storage/backfiller_test.go
@@ -0,0 +1,237 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package storage_test
+
+import (
+ "errors"
+
+ "github.com/ethereum/go-ethereum/statediff"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
+)
+
+var _ = Describe("BackFiller", func() {
+ Describe("BackFill", func() {
+ var (
+ mockFetcher *mocks.StateDiffFetcher
+ backFiller storage.BackFiller
+ )
+ BeforeEach(func() {
+ mockFetcher = new(mocks.StateDiffFetcher)
+ mockFetcher.PayloadsToReturn = map[uint64]statediff.Payload{
+ test_data.BlockNumber.Uint64(): test_data.MockStatediffPayload,
+ test_data.BlockNumber2.Uint64(): test_data.MockStatediffPayload2,
+ }
+ })
+
+ It("batch calls statediff_stateDiffAt", func() {
+ backFiller = storage.NewStorageBackFiller(mockFetcher, 100)
+ backFill := make(chan utils.StorageDiff)
+ done := make(chan bool)
+ errChan := make(chan error)
+ backFillInitErr := backFiller.BackFill(
+ test_data.BlockNumber.Uint64(),
+ test_data.BlockNumber2.Uint64(),
+ backFill,
+ errChan,
+ done)
+ Expect(backFillInitErr).ToNot(HaveOccurred())
+ var diffs []utils.StorageDiff
+ for {
+ select {
+ case diff := <-backFill:
+ diffs = append(diffs, diff)
+ continue
+ case err := <-errChan:
+ Expect(err).ToNot(HaveOccurred())
+ continue
+ case <-done:
+ break
+ }
+ break
+ }
+ Expect(mockFetcher.CalledTimes).To(Equal(int64(1)))
+ Expect(len(diffs)).To(Equal(4))
+ Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
+ Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
+ Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
+ Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
+ })
+
+ It("has a configurable batch size", func() {
+ backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
+ backFill := make(chan utils.StorageDiff)
+ done := make(chan bool)
+ errChan := make(chan error)
+ backFillInitErr := backFiller.BackFill(
+ test_data.BlockNumber.Uint64(),
+ test_data.BlockNumber2.Uint64(),
+ backFill,
+ errChan,
+ done)
+ Expect(backFillInitErr).ToNot(HaveOccurred())
+ var diffs []utils.StorageDiff
+ for {
+ select {
+ case diff := <-backFill:
+ diffs = append(diffs, diff)
+ continue
+ case err := <-errChan:
+ Expect(err).ToNot(HaveOccurred())
+ continue
+ case <-done:
+ break
+ }
+ break
+ }
+ Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
+ Expect(len(diffs)).To(Equal(4))
+ Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
+ Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
+ Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
+ Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
+ })
+
+ It("handles bin numbers in excess of the goroutine limit (100)", func() {
+ payloadsToReturn := make(map[uint64]statediff.Payload, 1001)
+ for i := test_data.BlockNumber.Uint64(); i <= test_data.BlockNumber.Uint64()+1000; i++ {
+ payloadsToReturn[i] = test_data.MockStatediffPayload
+ }
+ mockFetcher.PayloadsToReturn = payloadsToReturn
+ // batch size of 2 with 1001 block range => 501 bins
+ backFiller = storage.NewStorageBackFiller(mockFetcher, 2)
+ backFill := make(chan utils.StorageDiff)
+ done := make(chan bool)
+ errChan := make(chan error)
+ backFillInitErr := backFiller.BackFill(
+ test_data.BlockNumber.Uint64(),
+ test_data.BlockNumber.Uint64()+1000,
+ backFill,
+ errChan,
+ done)
+ Expect(backFillInitErr).ToNot(HaveOccurred())
+ var diffs []utils.StorageDiff
+ for {
+ select {
+ case diff := <-backFill:
+ diffs = append(diffs, diff)
+ continue
+ case err := <-errChan:
+ Expect(err).ToNot(HaveOccurred())
+ continue
+ case <-done:
+ break
+ }
+ break
+ }
+ Expect(mockFetcher.CalledTimes).To(Equal(int64(501)))
+ Expect(len(diffs)).To(Equal(3003))
+ Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
+ Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
+ Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
+ })
+
+ It("passes fetcher errors forward", func() {
+ mockFetcher.FetchErrs = map[uint64]error{
+ test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"),
+ }
+ backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
+ backFill := make(chan utils.StorageDiff)
+ done := make(chan bool)
+ errChan := make(chan error)
+ backFillInitErr := backFiller.BackFill(
+ test_data.BlockNumber.Uint64(),
+ test_data.BlockNumber2.Uint64(),
+ backFill,
+ errChan,
+ done)
+ Expect(backFillInitErr).ToNot(HaveOccurred())
+ var numOfErrs int
+ var diffs []utils.StorageDiff
+ for {
+ select {
+ case diff := <-backFill:
+ diffs = append(diffs, diff)
+ continue
+ case err := <-errChan:
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(Equal("mock fetcher error"))
+ numOfErrs++
+ continue
+ case <-done:
+ break
+ }
+ break
+ }
+ Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
+ Expect(numOfErrs).To(Equal(1))
+ Expect(len(diffs)).To(Equal(1))
+ Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
+
+ mockFetcher.FetchErrs = map[uint64]error{
+ test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"),
+ test_data.BlockNumber2.Uint64(): errors.New("mock fetcher error"),
+ }
+ mockFetcher.CalledTimes = 0
+ backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
+ backFill = make(chan utils.StorageDiff)
+ done = make(chan bool)
+ errChan = make(chan error)
+ backFillInitErr = backFiller.BackFill(
+ test_data.BlockNumber.Uint64(),
+ test_data.BlockNumber2.Uint64(),
+ backFill,
+ errChan,
+ done)
+ Expect(backFillInitErr).ToNot(HaveOccurred())
+ numOfErrs = 0
+ diffs = []utils.StorageDiff{}
+ for {
+ select {
+ case diff := <-backFill:
+ diffs = append(diffs, diff)
+ continue
+ case err := <-errChan:
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(Equal("mock fetcher error"))
+ numOfErrs++
+ continue
+ case <-done:
+ break
+ }
+ break
+ }
+ Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
+ Expect(numOfErrs).To(Equal(2))
+ Expect(len(diffs)).To(Equal(0))
+ })
+ })
+})
+
+func containsDiff(diffs []utils.StorageDiff, diff utils.StorageDiff) bool {
+ for _, d := range diffs {
+ if d == diff {
+ return true
+ }
+ }
+ return false
+}
diff --git a/libraries/shared/storage/storage_suite_test.go b/libraries/shared/storage/storage_suite_test.go
index a24f35d3..c22b8dba 100644
--- a/libraries/shared/storage/storage_suite_test.go
+++ b/libraries/shared/storage/storage_suite_test.go
@@ -19,10 +19,11 @@ package storage_test
import (
"testing"
+ "io/ioutil"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
- "io/ioutil"
)
func TestFactories(t *testing.T) {
diff --git a/libraries/shared/storage/utils/bins.go b/libraries/shared/storage/utils/bins.go
new file mode 100644
index 00000000..18f28c32
--- /dev/null
+++ b/libraries/shared/storage/utils/bins.go
@@ -0,0 +1,45 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package utils
+
+import "errors"
+
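+// GetBlockHeightBins splits the inclusive block range [startingBlock, endingBlock] into
+// consecutive bins of at most batchSize block heights; e.g. blocks 0-10 with a batch
+// size of 3 yield the bins [0 1 2], [3 4 5], [6 7 8], and [9 10].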
+func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) {
+ if endingBlock < startingBlock {
+ return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
+ }
+ if batchSize == 0 {
+ return nil, errors.New("backfill: batchsize needs to be greater than zero")
+ }
+ length := endingBlock - startingBlock + 1
+ numberOfBins := length / batchSize
+ remainder := length % batchSize
+ if remainder != 0 {
+ numberOfBins++
+ }
+ blockRangeBins := make([][]uint64, numberOfBins)
+ for i := range blockRangeBins {
+ nextBinStart := startingBlock + batchSize
+ blockRange := make([]uint64, 0, nextBinStart-startingBlock+1)
+ for j := startingBlock; j < nextBinStart && j <= endingBlock; j++ {
+ blockRange = append(blockRange, j)
+ }
+ startingBlock = nextBinStart
+ blockRangeBins[i] = blockRange
+ }
+ return blockRangeBins, nil
+}
diff --git a/libraries/shared/storage/utils/bins_test.go b/libraries/shared/storage/utils/bins_test.go
new file mode 100644
index 00000000..84f9405e
--- /dev/null
+++ b/libraries/shared/storage/utils/bins_test.go
@@ -0,0 +1,66 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package utils_test
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+)
+
+var _ = Describe("GetBlockHeightBins", func() {
+ It("splits a block range up into bins", func() {
+ var startingBlock uint64 = 1
+ var endingBlock uint64 = 10101
+ var batchSize uint64 = 100
+ blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(blockRangeBins)).To(Equal(102))
+ Expect(blockRangeBins[101]).To(Equal([]uint64{10101}))
+
+ startingBlock = 101
+ endingBlock = 10100
+ batchSize = 100
+ lastBin := make([]uint64, 0)
+ for i := 10001; i <= 10100; i++ {
+ lastBin = append(lastBin, uint64(i))
+ }
+ blockRangeBins, err = utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(blockRangeBins)).To(Equal(100))
+ Expect(blockRangeBins[99]).To(Equal(lastBin))
+ })
+
+ It("throws an error if the starting block is higher than the ending block", func() {
+ var startingBlock uint64 = 10102
+ var endingBlock uint64 = 10101
+ var batchSize uint64 = 100
+ _, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("ending block number needs to be greater than starting block number"))
+ })
+
+ It("throws an error if the batch size is zero", func() {
+ var startingBlock uint64 = 1
+ var endingBlock uint64 = 10101
+ var batchSize uint64 = 0
+ _, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("batchsize needs to be greater than zero"))
+ })
+})
diff --git a/libraries/shared/storage/utils/diff.go b/libraries/shared/storage/utils/diff.go
index 991c9828..45e8ffa2 100644
--- a/libraries/shared/storage/utils/diff.go
+++ b/libraries/shared/storage/utils/diff.go
@@ -17,11 +17,12 @@
package utils
import (
+ "strconv"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
- "strconv"
)
const ExpectedRowLength = 5
@@ -71,3 +72,8 @@ func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.State
func HexToKeccak256Hash(hex string) common.Hash {
return crypto.Keccak256Hash(common.FromHex(hex))
}
+
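+// GetAccountsFromDiff combines the created, updated, and deleted accounts of a statediff into a single slice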
+func GetAccountsFromDiff(stateDiff statediff.StateDiff) []statediff.AccountDiff {
+ accounts := append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...)
+ return append(accounts, stateDiff.DeletedAccounts...)
+}
diff --git a/libraries/shared/storage/utils/diff_test.go b/libraries/shared/storage/utils/diff_test.go
index 2de496ac..1c530a7c 100644
--- a/libraries/shared/storage/utils/diff_test.go
+++ b/libraries/shared/storage/utils/diff_test.go
@@ -17,6 +17,9 @@
package utils_test
import (
+ "math/big"
+ "math/rand"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
@@ -25,8 +28,6 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
- "math/big"
- "math/rand"
)
var _ = Describe("Storage row parsing", func() {
diff --git a/libraries/shared/storage/utils/keys_loader.go b/libraries/shared/storage/utils/keys_loader.go
index 0142b564..e1252643 100644
--- a/libraries/shared/storage/utils/keys_loader.go
+++ b/libraries/shared/storage/utils/keys_loader.go
@@ -17,9 +17,10 @@
package utils
import (
+ "math/big"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
- "math/big"
)
const (
diff --git a/libraries/shared/storage/utils/utils_suite_test.go b/libraries/shared/storage/utils/utils_suite_test.go
index f77d6860..51a53ac6 100644
--- a/libraries/shared/storage/utils/utils_suite_test.go
+++ b/libraries/shared/storage/utils/utils_suite_test.go
@@ -17,10 +17,11 @@
package utils_test
import (
- "github.com/sirupsen/logrus"
"io/ioutil"
"testing"
+ "github.com/sirupsen/logrus"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
diff --git a/libraries/shared/streamer/statediff_streamer.go b/libraries/shared/streamer/statediff_streamer.go
index 564a4e15..458cb2b7 100644
--- a/libraries/shared/streamer/statediff_streamer.go
+++ b/libraries/shared/streamer/statediff_streamer.go
@@ -1,16 +1,18 @@
-// Copyright 2019 Vulcanize
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
package streamer
@@ -18,24 +20,29 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
+
"github.com/vulcanize/vulcanizedb/pkg/core"
)
+// Streamer is the interface for subscribing to a stream of statediff payloads
type Streamer interface {
- Stream(chan statediff.Payload) (*rpc.ClientSubscription, error)
+ Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error)
}
+// StateDiffStreamer is the underlying struct that satisfies the Streamer interface
type StateDiffStreamer struct {
- client core.RPCClient
+ Client core.RPCClient
}
-func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
- logrus.Info("streaming diffs from geth")
- return streamer.client.Subscribe("statediff", payloadChan, "stream")
-}
-
-func NewStateDiffStreamer(client core.RPCClient) StateDiffStreamer {
- return StateDiffStreamer{
- client: client,
+// NewStateDiffStreamer creates a pointer to a new StateDiffStreamer, which satisfies the Streamer interface
+func NewStateDiffStreamer(client core.RPCClient) Streamer {
+ return &StateDiffStreamer{
+ Client: client,
}
}
+
+// Stream subscribes to the geth statediff service, delivering state diff payloads over the provided channel
+func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
+ logrus.Info("streaming diffs from geth")
+ return sds.Client.Subscribe("statediff", payloadChan, "stream")
+}
diff --git a/libraries/shared/test_data/generic.go b/libraries/shared/test_data/generic.go
index 43d3ba12..07be69b8 100644
--- a/libraries/shared/test_data/generic.go
+++ b/libraries/shared/test_data/generic.go
@@ -17,13 +17,14 @@
package test_data
import (
+ "math/rand"
+ "time"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
- "math/rand"
- "time"
)
var startingBlockNumber = rand.Int63()
diff --git a/libraries/shared/test_data/statediff.go b/libraries/shared/test_data/statediff.go
index 3e0a2219..f9bfd8fe 100644
--- a/libraries/shared/test_data/statediff.go
+++ b/libraries/shared/test_data/statediff.go
@@ -15,20 +15,23 @@
package test_data
import (
- "errors"
+ "math/big"
+ "math/rand"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
- "math/big"
- "math/rand"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
var (
BlockNumber = big.NewInt(rand.Int63())
+ BlockNumber2 = big.NewInt(0).Add(BlockNumber, big.NewInt(1))
BlockHash = "0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"
+ BlockHash2 = "0xaa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f72"
CodeHash = common.Hex2Bytes("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
NewNonceValue = rand.Uint64()
NewBalanceValue = rand.Int63()
@@ -43,15 +46,14 @@ var (
Path: StoragePath,
Proof: [][]byte{},
}}
- LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000")
- LargeStorageValueRlp, rlpErr = rlp.EncodeToBytes(LargeStorageValue)
- storageWithLargeValue = []statediff.StorageDiff{{
+ LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000")
+ LargeStorageValueRlp, _ = rlp.EncodeToBytes(LargeStorageValue)
+ storageWithLargeValue = []statediff.StorageDiff{{
Key: StorageKey,
Value: LargeStorageValueRlp,
Path: StoragePath,
Proof: [][]byte{},
}}
- EmptyStorage = make([]statediff.StorageDiff, 0)
StorageWithBadValue = statediff.StorageDiff{
Key: StorageKey,
Value: []byte{0, 1, 2},
@@ -83,6 +85,11 @@ var (
Value: valueBytes,
Storage: storageWithLargeValue,
}}
+ UpdatedAccountDiffs2 = []statediff.AccountDiff{{
+ Key: AnotherContractLeafKey.Bytes(),
+ Value: valueBytes,
+ Storage: storageWithSmallValue,
+ }}
DeletedAccountDiffs = []statediff.AccountDiff{{
Key: AnotherContractLeafKey.Bytes(),
@@ -97,7 +104,15 @@ var (
DeletedAccounts: DeletedAccountDiffs,
UpdatedAccounts: UpdatedAccountDiffs,
}
- MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
+ MockStateDiff2 = statediff.StateDiff{
+ BlockNumber: BlockNumber2,
+ BlockHash: common.HexToHash(BlockHash2),
+ CreatedAccounts: nil,
+ DeletedAccounts: nil,
+ UpdatedAccounts: UpdatedAccountDiffs2,
+ }
+ MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
+ MockStateDiff2Bytes, _ = rlp.EncodeToBytes(MockStateDiff2)
mockTransaction1 = types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil)
mockTransaction2 = types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil)
@@ -114,24 +129,57 @@ var (
TxHash: common.HexToHash("0x0"),
ReceiptHash: common.HexToHash("0x0"),
}
- MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts)
- MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock)
+ MockHeader2 = types.Header{
+ Time: 0,
+ Number: BlockNumber2,
+ Root: common.HexToHash("0x1"),
+ TxHash: common.HexToHash("0x1"),
+ ReceiptHash: common.HexToHash("0x1"),
+ }
+ MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts)
+ MockBlock2 = types.NewBlock(&MockHeader2, MockTransactions, nil, MockReceipts)
+ MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock)
+ MockBlockRlp2, _ = rlp.EncodeToBytes(MockBlock2)
MockStatediffPayload = statediff.Payload{
BlockRlp: MockBlockRlp,
StateDiffRlp: MockStateDiffBytes,
- Err: nil,
+ }
+ MockStatediffPayload2 = statediff.Payload{
+ BlockRlp: MockBlockRlp2,
+ StateDiffRlp: MockStateDiff2Bytes,
}
- EmptyStatediffPayload = statediff.Payload{
- BlockRlp: []byte{},
- StateDiffRlp: []byte{},
- Err: nil,
+ CreatedExpectedStorageDiff = utils.StorageDiff{
+ ID: 0,
+ HashedAddress: common.BytesToHash(ContractLeafKey[:]),
+ BlockHash: common.HexToHash(BlockHash),
+ BlockHeight: int(BlockNumber.Int64()),
+ StorageKey: common.BytesToHash(StorageKey),
+ StorageValue: common.BytesToHash(SmallStorageValue),
}
-
- ErrStatediffPayload = statediff.Payload{
- BlockRlp: []byte{},
- StateDiffRlp: []byte{},
- Err: errors.New("mock error"),
+ UpdatedExpectedStorageDiff = utils.StorageDiff{
+ ID: 0,
+ HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
+ BlockHash: common.HexToHash(BlockHash),
+ BlockHeight: int(BlockNumber.Int64()),
+ StorageKey: common.BytesToHash(StorageKey),
+ StorageValue: common.BytesToHash(LargeStorageValue),
+ }
+ UpdatedExpectedStorageDiff2 = utils.StorageDiff{
+ ID: 0,
+ HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
+ BlockHash: common.HexToHash(BlockHash2),
+ BlockHeight: int(BlockNumber2.Int64()),
+ StorageKey: common.BytesToHash(StorageKey),
+ StorageValue: common.BytesToHash(SmallStorageValue),
+ }
+ DeletedExpectedStorageDiff = utils.StorageDiff{
+ ID: 0,
+ HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
+ BlockHash: common.HexToHash(BlockHash),
+ BlockHeight: int(BlockNumber.Int64()),
+ StorageKey: common.BytesToHash(StorageKey),
+ StorageValue: common.BytesToHash(SmallStorageValue),
}
)
diff --git a/libraries/shared/test_data/test_helpers.go b/libraries/shared/test_data/test_helpers.go
index 4295f4c6..b819635e 100644
--- a/libraries/shared/test_data/test_helpers.go
+++ b/libraries/shared/test_data/test_helpers.go
@@ -1,13 +1,14 @@
package test_data
import (
+ "math/rand"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
- "math/rand"
)
// Create a header sync log to reference in an event, returning inserted header sync log
diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go
index d6028c5c..7ab70897 100644
--- a/libraries/shared/watcher/event_watcher.go
+++ b/libraries/shared/watcher/event_watcher.go
@@ -17,7 +17,10 @@
package watcher
import (
+ "time"
+
"github.com/sirupsen/logrus"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/chunker"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
@@ -27,7 +30,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
- "time"
)
const NoNewDataPause = time.Second * 7
diff --git a/libraries/shared/watcher/event_watcher_test.go b/libraries/shared/watcher/event_watcher_test.go
index 78221db9..9315684b 100644
--- a/libraries/shared/watcher/event_watcher_test.go
+++ b/libraries/shared/watcher/event_watcher_test.go
@@ -18,6 +18,7 @@ package watcher_test
import (
"errors"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go
index 38467c43..d4cd3067 100644
--- a/libraries/shared/watcher/storage_watcher.go
+++ b/libraries/shared/watcher/storage_watcher.go
@@ -18,19 +18,22 @@ package watcher
import (
"fmt"
+ "time"
+
"github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
- "time"
)
type IStorageWatcher interface {
AddTransformers(initializers []transformer.StorageTransformerInitializer)
- Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration)
+ Execute(queueRecheckInterval time.Duration, backFillOn bool)
+ BackFill(startingBlock uint64, backFiller storage.BackFiller)
}
type StorageWatcher struct {
@@ -38,42 +41,69 @@ type StorageWatcher struct {
StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue
KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer
+ DiffsChan chan utils.StorageDiff
+ ErrsChan chan error
+ BackFillDoneChan chan bool
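+ // StartingSyncBlockChan passes the first streamed block height (minus one) from Execute to BackFill as the backfill's ending block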
+ StartingSyncBlockChan chan uint64
}
-func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher {
+func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher {
queue := storage.NewStorageQueue(db)
transformers := make(map[common.Hash]transformer.StorageTransformer)
- return StorageWatcher{
+ return &StorageWatcher{
db: db,
- StorageFetcher: fetcher,
+ StorageFetcher: f,
+ DiffsChan: make(chan utils.StorageDiff, fetcher.PayloadChanBufferSize),
+ ErrsChan: make(chan error),
+ StartingSyncBlockChan: make(chan uint64),
+ BackFillDoneChan: make(chan bool),
Queue: queue,
KeccakAddressTransformers: transformers,
}
}
-func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
+func (storageWatcher *StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
for _, initializer := range initializers {
storageTransformer := initializer(storageWatcher.db)
storageWatcher.KeccakAddressTransformers[storageTransformer.KeccakContractAddress()] = storageTransformer
}
}
-func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) {
+// BackFill uses a backFiller to backfill missing storage diffs for the storageWatcher
+func (storageWatcher *StorageWatcher) BackFill(startingBlock uint64, backFiller storage.BackFiller) {
+ // this blocks until the Execute process sends us the first block number it sees
+ endBackFillBlock := <-storageWatcher.StartingSyncBlockChan
+ backFillInitErr := backFiller.BackFill(startingBlock, endBackFillBlock,
+ storageWatcher.DiffsChan, storageWatcher.ErrsChan, storageWatcher.BackFillDoneChan)
+ if backFillInitErr != nil {
+ logrus.Warn(backFillInitErr)
+ }
+}
+
+// Execute runs the StorageWatcher's main loop, processing streamed, backfilled, and previously queued storage diffs
+func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration, backFillOn bool) {
ticker := time.NewTicker(queueRecheckInterval)
- go storageWatcher.StorageFetcher.FetchStorageDiffs(diffsChan, errsChan)
+ go storageWatcher.StorageFetcher.FetchStorageDiffs(storageWatcher.DiffsChan, storageWatcher.ErrsChan)
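+ // start tracks whether the next streamed diff is the first one seen;
+ // its block height determines where the backfill range ends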
+ start := true
for {
select {
- case fetchErr := <-errsChan:
- logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr))
- case diff := <-diffsChan:
+ case fetchErr := <-storageWatcher.ErrsChan:
+ logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr.Error()))
+ case diff := <-storageWatcher.DiffsChan:
+ if start && backFillOn {
+ storageWatcher.StartingSyncBlockChan <- uint64(diff.BlockHeight - 1)
+ start = false
+ }
storageWatcher.processRow(diff)
case <-ticker.C:
storageWatcher.processQueue()
+ case <-storageWatcher.BackFillDoneChan:
+ logrus.Info("storage watcher backfill process has finished")
}
}
}
-func (storageWatcher StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) {
+func (storageWatcher *StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) {
storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress]
return storageTransformer, ok
}
diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go
index 7cc9a115..c2cc44dd 100644
--- a/libraries/shared/watcher/storage_watcher_test.go
+++ b/libraries/shared/watcher/storage_watcher_test.go
@@ -17,19 +17,24 @@
package watcher_test
import (
+ "errors"
+ "io/ioutil"
+ "os"
+ "sort"
+ "time"
+
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
- "io/ioutil"
- "os"
- "time"
)
var _ = Describe("Storage Watcher", func() {
@@ -37,31 +42,26 @@ var _ = Describe("Storage Watcher", func() {
It("adds transformers", func() {
fakeHashedAddress := utils.HexToKeccak256Hash("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress}
- w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
+ w := watcher.NewStorageWatcher(mocks.NewStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
Expect(w.KeccakAddressTransformers[fakeHashedAddress]).To(Equal(fakeTransformer))
})
})
-
Describe("Execute", func() {
var (
- errs chan error
- mockFetcher *mocks.MockStorageFetcher
+ mockFetcher *mocks.StorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff
- diffs chan utils.StorageDiff
- storageWatcher watcher.StorageWatcher
+ storageWatcher *watcher.StorageWatcher
hashedAddress common.Hash
)
BeforeEach(func() {
- errs = make(chan error)
- diffs = make(chan utils.StorageDiff)
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
- mockFetcher = mocks.NewMockStorageFetcher()
+ mockFetcher = mocks.NewStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
csvDiff = utils.StorageDiff{
@@ -84,7 +84,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
- go storageWatcher.Execute(diffs, errs, time.Hour)
+ go storageWatcher.Execute(time.Hour, false)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
@@ -102,25 +102,29 @@ var _ = Describe("Storage Watcher", func() {
})
It("executes transformer for recognized storage diff", func(done Done) {
- go storageWatcher.Execute(diffs, errs, time.Hour)
+ go storageWatcher.Execute(time.Hour, false)
- Eventually(func() utils.StorageDiff {
- return mockTransformer.PassedDiff
- }).Should(Equal(csvDiff))
+ Eventually(func() map[int]utils.StorageDiff {
+ return mockTransformer.PassedDiffs
+ }).Should(Equal(map[int]utils.StorageDiff{
+ csvDiff.ID: csvDiff,
+ }))
close(done)
})
It("queues diff for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
- go storageWatcher.Execute(diffs, errs, time.Hour)
+ go storageWatcher.Execute(time.Hour, false)
- Expect(<-errs).To(BeNil())
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() utils.StorageDiff {
- return mockQueue.AddPassedDiff
+ if len(mockQueue.AddPassedDiffs) > 0 {
+ return mockQueue.AddPassedDiffs[csvDiff.ID]
+ }
+ return utils.StorageDiff{}
}).Should(Equal(csvDiff))
close(done)
})
@@ -133,7 +137,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
- go storageWatcher.Execute(diffs, errs, time.Hour)
+ go storageWatcher.Execute(time.Hour, false)
Eventually(func() bool {
return mockQueue.AddCalled
@@ -148,26 +152,34 @@ var _ = Describe("Storage Watcher", func() {
Describe("transforming queued storage diffs", func() {
BeforeEach(func() {
- mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff}
+ mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
+ csvDiff.ID: csvDiff,
+ }
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for storage diff", func(done Done) {
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() utils.StorageDiff {
- return mockTransformer.PassedDiff
+ if len(mockTransformer.PassedDiffs) > 0 {
+ return mockTransformer.PassedDiffs[csvDiff.ID]
+ }
+ return utils.StorageDiff{}
}).Should(Equal(csvDiff))
close(done)
})
It("deletes diff from queue if transformer execution successful", func(done Done) {
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int {
- return mockQueue.DeletePassedID
+ if len(mockQueue.DeletePassedIds) > 0 {
+ return mockQueue.DeletePassedIds[0]
+ }
+ return 0
}).Should(Equal(csvDiff.ID))
close(done)
})
@@ -179,7 +191,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
@@ -193,12 +205,17 @@ var _ = Describe("Storage Watcher", func() {
ID: csvDiff.ID + 1,
HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"),
}
- mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
+ mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
+ obsoleteDiff.ID: obsoleteDiff,
+ }
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int {
- return mockQueue.DeletePassedID
+ if len(mockQueue.DeletePassedIds) > 0 {
+ return mockQueue.DeletePassedIds[0]
+ }
+ return 0
}).Should(Equal(obsoleteDiff.ID))
close(done)
})
@@ -208,14 +225,16 @@ var _ = Describe("Storage Watcher", func() {
ID: csvDiff.ID + 1,
HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"),
}
- mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
+ mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
+ obsoleteDiff.ID: obsoleteDiff,
+ }
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
@@ -225,4 +244,236 @@ var _ = Describe("Storage Watcher", func() {
})
})
})
+
+ Describe("BackFill", func() {
+ var (
+ mockFetcher *mocks.StorageFetcher
+ mockBackFiller *mocks.BackFiller
+ mockQueue *mocks.MockStorageQueue
+ mockTransformer *mocks.MockStorageTransformer
+ mockTransformer2 *mocks.MockStorageTransformer
+ mockTransformer3 *mocks.MockStorageTransformer
+ csvDiff utils.StorageDiff
+ storageWatcher *watcher.StorageWatcher
+ hashedAddress common.Hash
+ createdDiff, updatedDiff1, deletedDiff, updatedDiff2 utils.StorageDiff
+ )
+
+ BeforeEach(func() {
+ createdDiff = test_data.CreatedExpectedStorageDiff
+ createdDiff.ID = 1333
+ updatedDiff1 = test_data.UpdatedExpectedStorageDiff
+ updatedDiff1.ID = 1334
+ deletedDiff = test_data.DeletedExpectedStorageDiff
+ deletedDiff.ID = 1335
+ updatedDiff2 = test_data.UpdatedExpectedStorageDiff2
+ updatedDiff2.ID = 1336
+ mockBackFiller = new(mocks.BackFiller)
+ hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
+ mockFetcher = mocks.NewStorageFetcher()
+ mockQueue = &mocks.MockStorageQueue{}
+ mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
+ mockTransformer2 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.ContractLeafKey[:])}
+ mockTransformer3 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:])}
+ csvDiff = utils.StorageDiff{
+ ID: 1337,
+ HashedAddress: hashedAddress,
+ BlockHash: common.HexToHash("0xfedcba9876543210"),
+ BlockHeight: int(test_data.BlockNumber2.Int64()) + 1,
+ StorageKey: common.HexToHash("0xabcdef1234567890"),
+ StorageValue: common.HexToHash("0x9876543210abcdef"),
+ }
+ })
+
+ Describe("transforming streamed and backfilled storage diffs", func() {
+ BeforeEach(func() {
+ mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff}
+ mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{
+ createdDiff,
+ updatedDiff1,
+ deletedDiff,
+ updatedDiff2,
+ })
+ mockQueue.DiffsToReturn = map[int]utils.StorageDiff{}
+ storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
+ storageWatcher.Queue = mockQueue
+ storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{
+ mockTransformer.FakeTransformerInitializer,
+ mockTransformer2.FakeTransformerInitializer,
+ mockTransformer3.FakeTransformerInitializer,
+ })
+ })
+
+ It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) {
+ go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
+ go storageWatcher.Execute(time.Hour, true)
+
+ Eventually(func() int {
+ return len(mockTransformer.PassedDiffs)
+ }).Should(Equal(1))
+
+ Eventually(func() int {
+ return len(mockTransformer2.PassedDiffs)
+ }).Should(Equal(1))
+
+ Eventually(func() int {
+ return len(mockTransformer3.PassedDiffs)
+ }).Should(Equal(3))
+ Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
+ Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
+ Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
+ Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
+ Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
+ Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
+ close(done)
+ })
+
+ It("adds diffs to the queue if transformation fails", func(done Done) {
+ mockTransformer3.ExecuteErr = fakes.FakeError
+ go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
+ go storageWatcher.Execute(time.Hour, true)
+
+ Eventually(func() int {
+ return len(mockTransformer.PassedDiffs)
+ }).Should(Equal(1))
+ Eventually(func() int {
+ return len(mockTransformer2.PassedDiffs)
+ }).Should(Equal(1))
+ Eventually(func() int {
+ return len(mockTransformer3.PassedDiffs)
+ }).Should(Equal(3))
+
+ Eventually(func() bool {
+ return mockQueue.AddCalled
+ }).Should(BeTrue())
+ Eventually(func() map[int]utils.StorageDiff {
+ if len(mockQueue.AddPassedDiffs) > 2 {
+ return mockQueue.AddPassedDiffs
+ }
+ return map[int]utils.StorageDiff{}
+ }).Should(Equal(map[int]utils.StorageDiff{
+ updatedDiff1.ID: updatedDiff1,
+ deletedDiff.ID: deletedDiff,
+ updatedDiff2.ID: updatedDiff2,
+ }))
+
+ Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
+ Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
+ Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
+ Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
+ Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
+ Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
+ close(done)
+ })
+
+ It("logs a backfill error", func(done Done) {
+ tempFile, fileErr := ioutil.TempFile("", "log")
+ Expect(fileErr).NotTo(HaveOccurred())
+ defer os.Remove(tempFile.Name())
+ logrus.SetOutput(tempFile)
+
+ mockBackFiller.BackFillErrs = []error{
+ nil,
+ nil,
+ nil,
+ errors.New("mock backfiller error"),
+ }
+
+ go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
+ go storageWatcher.Execute(time.Hour, true)
+
+ Eventually(func() int {
+ return len(mockTransformer.PassedDiffs)
+ }).Should(Equal(1))
+ Eventually(func() int {
+ return len(mockTransformer2.PassedDiffs)
+ }).Should(Equal(1))
+ Eventually(func() int {
+ return len(mockTransformer3.PassedDiffs)
+ }).Should(Equal(2))
+ Eventually(func() (string, error) {
+ logContent, err := ioutil.ReadFile(tempFile.Name())
+ return string(logContent), err
+ }).Should(ContainSubstring("mock backfiller error"))
+ close(done)
+ })
+
+ It("logs when backfill finishes", func(done Done) {
+ tempFile, fileErr := ioutil.TempFile("", "log")
+ Expect(fileErr).NotTo(HaveOccurred())
+ defer os.Remove(tempFile.Name())
+ logrus.SetOutput(tempFile)
+
+ go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
+ go storageWatcher.Execute(time.Hour, true)
+
+ Eventually(func() (string, error) {
+ logContent, err := ioutil.ReadFile(tempFile.Name())
+ return string(logContent), err
+ }).Should(ContainSubstring("storage watcher backfill process has finished"))
+ close(done)
+ })
+ })
+
+ Describe("transforms queued storage diffs", func() {
+ BeforeEach(func() {
+ mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
+ csvDiff.ID: csvDiff,
+ createdDiff.ID: createdDiff,
+ updatedDiff1.ID: updatedDiff1,
+ deletedDiff.ID: deletedDiff,
+ updatedDiff2.ID: updatedDiff2,
+ }
+ storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
+ storageWatcher.Queue = mockQueue
+ storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{
+ mockTransformer.FakeTransformerInitializer,
+ mockTransformer2.FakeTransformerInitializer,
+ mockTransformer3.FakeTransformerInitializer,
+ })
+ })
+
+ It("executes transformers on queued storage diffs", func(done Done) {
+ go storageWatcher.BackFill(test_data.BlockNumber.Uint64(), mockBackFiller)
+ go storageWatcher.Execute(time.Nanosecond, true)
+
+ Eventually(func() int {
+ return len(mockTransformer.PassedDiffs)
+ }).Should(Equal(1))
+ Eventually(func() int {
+ return len(mockTransformer2.PassedDiffs)
+ }).Should(Equal(1))
+ Eventually(func() int {
+ return len(mockTransformer3.PassedDiffs)
+ }).Should(Equal(3))
+ Eventually(func() bool {
+ return mockQueue.GetAllCalled
+ }).Should(BeTrue())
+ sortedExpectedIDs := []int{
+ csvDiff.ID,
+ createdDiff.ID,
+ updatedDiff1.ID,
+ deletedDiff.ID,
+ updatedDiff2.ID,
+ }
+ sort.Ints(sortedExpectedIDs)
+ Eventually(func() []int {
+ if len(mockQueue.DeletePassedIds) > 4 {
+ sort.Ints(mockQueue.DeletePassedIds)
+ return mockQueue.DeletePassedIds
+ }
+ return []int{}
+ }).Should(Equal(sortedExpectedIDs))
+
+ Expect(mockQueue.AddCalled).To(Not(BeTrue()))
+ Expect(len(mockQueue.DiffsToReturn)).To(Equal(0))
+ Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff))
+ Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff))
+ Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1))
+ Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
+ Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
+ close(done)
+ })
+ })
+ })
})
diff --git a/libraries/shared/watcher/watcher_suite_test.go b/libraries/shared/watcher/watcher_suite_test.go
index 9685a721..f480cc3a 100644
--- a/libraries/shared/watcher/watcher_suite_test.go
+++ b/libraries/shared/watcher/watcher_suite_test.go
@@ -17,10 +17,11 @@
package watcher_test
import (
- log "github.com/sirupsen/logrus"
"io/ioutil"
"testing"
+ log "github.com/sirupsen/logrus"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
diff --git a/pkg/contract_watcher/header/transformer/transformer.go b/pkg/contract_watcher/header/transformer/transformer.go
index b77870c7..df1f4e56 100644
--- a/pkg/contract_watcher/header/transformer/transformer.go
+++ b/pkg/contract_watcher/header/transformer/transformer.go
@@ -20,6 +20,7 @@ import (
"database/sql"
"errors"
"fmt"
+ "math"
"strings"
"github.com/ethereum/go-ethereum/common"
@@ -105,7 +106,7 @@ func (tr *Transformer) Init() error {
tr.sortedMethodIds = make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling
tr.eventIds = make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers
tr.eventFilters = make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs
- tr.Start = 100000000000
+ tr.Start = math.MaxInt64
// Iterate through all internal contract addresses
for contractAddr := range tr.Config.Addresses {
diff --git a/pkg/eth/eth_suite_test.go b/pkg/eth/eth_suite_test.go
index 6271dca1..b6c30dfa 100644
--- a/pkg/eth/eth_suite_test.go
+++ b/pkg/eth/eth_suite_test.go
@@ -25,5 +25,5 @@ import (
func TestGeth(t *testing.T) {
RegisterFailHandler(Fail)
- RunSpecs(t, "eth Suite")
+ RunSpecs(t, "Eth Suite")
}
diff --git a/pkg/eth/node/node.go b/pkg/eth/node/node.go
index 90ffb329..a60d271e 100644
--- a/pkg/eth/node/node.go
+++ b/pkg/eth/node/node.go
@@ -18,17 +18,15 @@ package node
import (
"context"
-
- "strconv"
-
"regexp"
-
- "log"
+ "strconv"
+ "strings"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
+ log "github.com/sirupsen/logrus"
+
"github.com/vulcanize/vulcanizedb/pkg/core"
- "strings"
)
type IPropertiesReader interface {
@@ -101,7 +99,7 @@ func (reader PropertiesReader) NetworkID() float64 {
var version string
err := reader.client.CallContext(context.Background(), &version, "net_version")
if err != nil {
- log.Println(err)
+ log.Error(err)
}
networkID, _ := strconv.ParseFloat(version, 64)
return networkID
@@ -111,13 +109,19 @@ func (reader PropertiesReader) GenesisBlock() string {
var header *types.Header
blockZero := "0x0"
includeTransactions := false
- reader.client.CallContext(context.Background(), &header, "eth_getBlockByNumber", blockZero, includeTransactions)
+ err := reader.client.CallContext(context.Background(), &header, "eth_getBlockByNumber", blockZero, includeTransactions)
+ if err != nil {
+ log.Error(err)
+ }
return header.Hash().Hex()
}
func (reader PropertiesReader) NodeInfo() (string, string) {
var info p2p.NodeInfo
- reader.client.CallContext(context.Background(), &info, "admin_nodeInfo")
+ err := reader.client.CallContext(context.Background(), &info, "admin_nodeInfo")
+ if err != nil {
+ log.Error(err)
+ }
return info.ID, info.Name
}
@@ -137,14 +141,20 @@ func (client GanacheClient) NodeInfo() (string, string) {
func (client ParityClient) parityNodeInfo() string {
var nodeInfo core.ParityNodeInfo
- client.client.CallContext(context.Background(), &nodeInfo, "parity_versionInfo")
+ err := client.client.CallContext(context.Background(), &nodeInfo, "parity_versionInfo")
+ if err != nil {
+ log.Error(err)
+ }
return nodeInfo.String()
}
func (client ParityClient) parityID() string {
var enodeID = regexp.MustCompile(`^enode://(.+)@.+$`)
var enodeURL string
- client.client.CallContext(context.Background(), &enodeURL, "parity_enode")
+ err := client.client.CallContext(context.Background(), &enodeURL, "parity_enode")
+ if err != nil {
+ log.Error(err)
+ }
enode := enodeID.FindStringSubmatch(enodeURL)
if len(enode) < 2 {
return ""