diff --git a/cmd/execute.go b/cmd/execute.go
index 2f2c31ea..097fa9f1 100644
--- a/cmd/execute.go
+++ b/cmd/execute.go
@@ -17,21 +17,25 @@
package cmd
import (
+ "errors"
"fmt"
"plugin"
syn "sync"
"time"
+ "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
+ "github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
- storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
+ "github.com/vulcanize/vulcanizedb/pkg/eth/client"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"github.com/vulcanize/vulcanizedb/utils"
)
@@ -186,13 +190,29 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
defer wg.Done()
// Execute over the StorageTransformerInitializer set using the storage watcher
LogWithCommand.Info("executing storage transformers")
- ticker := time.NewTicker(pollingInterval)
- defer ticker.Stop()
- for range ticker.C {
- errs := make(chan error)
- diffs := make(chan storageUtils.StorageDiff)
- w.Execute(diffs, errs, queueRecheckInterval)
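+	// if storage backfilling is turned on in the config, kick it off in its own goroutine alongside the watcher's execution loop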
+ on := viper.GetBool("storageBackFill.on")
+ if on {
+ go backFillStorage(w)
}
+ w.Execute(queueRecheckInterval, on)
+}
+
+func backFillStorage(w watcher.IStorageWatcher) {
+ // configure archival rpc client
+ archivalRPCPath := viper.GetString("storageBackFill.rpcPath")
+ if archivalRPCPath == "" {
+ LogWithCommand.Fatal(errors.New("storage backfill is turned on but no rpc path is provided"))
+ }
+ rawRPCClient, dialErr := rpc.Dial(archivalRPCPath)
+ if dialErr != nil {
+ LogWithCommand.Fatal(dialErr)
+ }
+ rpcClient := client.NewRpcClient(rawRPCClient, archivalRPCPath)
+ // find min deployment block
+ minDeploymentBlock := viper.GetInt("storageBackFill.startingBlock")
+ stateDiffFetcher := fetcher.NewStateDiffFetcher(rpcClient)
+ backFiller := storage.NewStorageBackFiller(stateDiffFetcher)
+ w.BackFill(backFiller, minDeploymentBlock)
}
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {
diff --git a/documentation/custom-transformers.md b/documentation/custom-transformers.md
index c4782c54..007120ed 100644
--- a/documentation/custom-transformers.md
+++ b/documentation/custom-transformers.md
@@ -194,3 +194,19 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface
}
}
```
+
+### Storage backfilling
+Storage transformers stream diffs from a geth subscription or a parity csv file, where the storage diffs are produced and emitted as the
+full sync progresses. If the transformers have missed a range of diffs, due to lag in process startup or misalignment with the sync,
+we can configure them to backfill the missing diffs from a [modified archival geth client](https://github.com/vulcanize/go-ethereum/tree/statediff_at).
+
+To do so, add the following fields to the config file.
+```toml
+[storageBackFill]
+ on = false
+ rpcPath = ""
+ startingBlock = 0
+```
+- `on` is set to `true` to turn the backfill process on
+- `rpcPath` is the websocket or ipc path to the modified archival geth node that exposes the `StateDiffAt` rpc endpoint (`statediff_stateDiffAt`) used to backfill storage diffs
+- `startingBlock` is the block height at which to begin the backfill process; this should be the height of the earliest contract deployment
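+
+For example, a filled-in config for backfilling from a local archival geth node over ipc might look like the following (the rpc path and starting block here are placeholders):
+```toml
+[storageBackFill]
+    on = true
+    rpcPath = "/path/to/archival/geth.ipc"
+    startingBlock = 8000000
+```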
diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go
index 27d505a0..7dd7da9d 100644
--- a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go
+++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go
@@ -1,17 +1,18 @@
-// Copyright 2019 Vulcanize
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fetcher
import (
@@ -62,14 +63,15 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
+ if formatErr != nil {
+ errs <- formatErr
+ continue
+ }
logrus.Trace("adding storage diff to out channel",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
- if formatErr != nil {
- errs <- formatErr
- }
out <- diff
}
diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go
index ca87e1b3..dff1b863 100644
--- a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go
+++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go
@@ -1,16 +1,18 @@
-// Copyright 2019 Vulcanize
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fetcher_test
@@ -21,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
diff --git a/libraries/shared/fetcher/state_diff_fetcher.go b/libraries/shared/fetcher/state_diff_fetcher.go
index 087655c2..a24f94f8 100644
--- a/libraries/shared/fetcher/state_diff_fetcher.go
+++ b/libraries/shared/fetcher/state_diff_fetcher.go
@@ -17,13 +17,14 @@
package fetcher
import (
+ "fmt"
"github.com/ethereum/go-ethereum/statediff"
- "github.com/vulcanize/vulcanizedb/pkg/geth/client"
+ "github.com/vulcanize/vulcanizedb/pkg/eth/client"
)
-// IStateDiffFetcher is the state diff fetching interface
-type IStateDiffFetcher interface {
+// StateDiffFetcher is the state diff fetching interface
+type StateDiffFetcher interface {
FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error)
}
@@ -32,23 +33,23 @@ type BatchClient interface {
BatchCall(batch []client.BatchElem) error
}
-// StateDiffFetcher is the state diff fetching struct
-type StateDiffFetcher struct {
+// stateDiffFetcher is the state diff fetching struct
+type stateDiffFetcher struct {
client BatchClient
}
const method = "statediff_stateDiffAt"
-// NewStateDiffFetcher returns a IStateDiffFetcher
+// NewStateDiffFetcher returns a StateDiffFetcher
-func NewStateDiffFetcher(bc BatchClient) IStateDiffFetcher {
- return &StateDiffFetcher{
+func NewStateDiffFetcher(bc BatchClient) StateDiffFetcher {
+ return &stateDiffFetcher{
client: bc,
}
}
// FetchStateDiffsAt fetches the statediff payloads at the given block heights
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
-func (sdf *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) {
+func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) {
batch := make([]client.BatchElem, 0)
for _, height := range blockHeights {
batch = append(batch, client.BatchElem{
@@ -57,14 +58,14 @@ func (sdf *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*stated
Result: new(statediff.Payload),
})
}
- batchErr := sdf.client.BatchCall(batch)
+ batchErr := fetcher.client.BatchCall(batch)
if batchErr != nil {
- return nil, batchErr
+ return nil, fmt.Errorf("stateDiffFetcher err: %s", batchErr.Error())
}
results := make([]*statediff.Payload, 0, len(blockHeights))
for _, batchElem := range batch {
if batchElem.Error != nil {
- return nil, batchElem.Error
+ return nil, fmt.Errorf("stateDiffFetcher err: %s", batchElem.Error.Error())
}
results = append(results, batchElem.Result.(*statediff.Payload))
}
diff --git a/libraries/shared/fetcher/state_diff_fetcher_test.go b/libraries/shared/fetcher/state_diff_fetcher_test.go
index 7ff33899..257032cc 100644
--- a/libraries/shared/fetcher/state_diff_fetcher_test.go
+++ b/libraries/shared/fetcher/state_diff_fetcher_test.go
@@ -18,66 +18,25 @@ package fetcher_test
import (
"bytes"
- "encoding/json"
- "errors"
-
- "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
- "github.com/vulcanize/vulcanizedb/pkg/geth/client"
)
-type mockClient struct {
- MappedStateDiffAt map[uint64][]byte
-}
-
-// SetReturnDiffAt method to set what statediffs the mock client returns
-func (mc *mockClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error {
- if mc.MappedStateDiffAt == nil {
- mc.MappedStateDiffAt = make(map[uint64][]byte)
- }
- by, err := json.Marshal(diffPayload)
- if err != nil {
- return err
- }
- mc.MappedStateDiffAt[height] = by
- return nil
-}
-
-// BatchCall mockClient method to simulate batch call to geth
-func (mc *mockClient) BatchCall(batch []client.BatchElem) error {
- if mc.MappedStateDiffAt == nil {
- return errors.New("mockclient needs to be initialized with statediff payloads and errors")
- }
- for _, batchElem := range batch {
- if len(batchElem.Args) != 1 {
- return errors.New("expected batch elem to contain single argument")
- }
- blockHeight, ok := batchElem.Args[0].(uint64)
- if !ok {
- return errors.New("expected batch elem argument to be a uint64")
- }
- err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
- if err != nil {
- return err
- }
- }
- return nil
-}
-
var _ = Describe("StateDiffFetcher", func() {
Describe("FetchStateDiffsAt", func() {
var (
- mc *mockClient
- stateDiffFetcher fetcher.IStateDiffFetcher
+ mc *mocks.BackFillerClient
+ stateDiffFetcher fetcher.StateDiffFetcher
)
BeforeEach(func() {
- mc = new(mockClient)
+ mc = new(mocks.BackFillerClient)
setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload)
Expect(setDiffAtErr1).ToNot(HaveOccurred())
setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
diff --git a/libraries/shared/mocks/backfiller.go b/libraries/shared/mocks/backfiller.go
new file mode 100644
index 00000000..26754998
--- /dev/null
+++ b/libraries/shared/mocks/backfiller.go
@@ -0,0 +1,39 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package mocks
+
+import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+
+// BackFiller mock for tests
+type BackFiller struct {
+ StorageDiffsToReturn []utils.StorageDiff
+ BackFillErr error
+ PassedStartingBlock uint64
+ PassedEndingBlock uint64
+}
+
+// SetStorageDiffsToReturn for tests
+func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) {
+ backFiller.StorageDiffsToReturn = diffs
+}
+
+// BackFill mock method
+func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) {
+ backFiller.PassedStartingBlock = startingBlock
+ backFiller.PassedEndingBlock = endingBlock
+ return backFiller.StorageDiffsToReturn, backFiller.BackFillErr
+}
diff --git a/libraries/shared/mocks/batch_client.go b/libraries/shared/mocks/batch_client.go
new file mode 100644
index 00000000..d2156f8d
--- /dev/null
+++ b/libraries/shared/mocks/batch_client.go
@@ -0,0 +1,64 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package mocks
+
+import (
+ "encoding/json"
+ "errors"
+
+ "github.com/ethereum/go-ethereum/statediff"
+ "github.com/vulcanize/vulcanizedb/pkg/eth/client"
+)
+
+// BackFillerClient is a mock client for use in backfiller tests
+type BackFillerClient struct {
+ MappedStateDiffAt map[uint64][]byte
+}
+
+// SetReturnDiffAt method to set what statediffs the mock client returns
+func (mc *BackFillerClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error {
+ if mc.MappedStateDiffAt == nil {
+ mc.MappedStateDiffAt = make(map[uint64][]byte)
+ }
+ by, err := json.Marshal(diffPayload)
+ if err != nil {
+ return err
+ }
+ mc.MappedStateDiffAt[height] = by
+ return nil
+}
+
+// BatchCall mockClient method to simulate batch call to geth
+func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
+ if mc.MappedStateDiffAt == nil {
+ return errors.New("mockclient needs to be initialized with statediff payloads and errors")
+ }
+ for _, batchElem := range batch {
+ if len(batchElem.Args) != 1 {
+ return errors.New("expected batch elem to contain single argument")
+ }
+ blockHeight, ok := batchElem.Args[0].(uint64)
+ if !ok {
+ return errors.New("expected batch elem argument to be a uint64")
+ }
+ err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/libraries/shared/mocks/state_diff_fetcher.go b/libraries/shared/mocks/state_diff_fetcher.go
new file mode 100644
index 00000000..52c06803
--- /dev/null
+++ b/libraries/shared/mocks/state_diff_fetcher.go
@@ -0,0 +1,48 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package mocks
+
+import (
+ "errors"
+
+ "github.com/ethereum/go-ethereum/statediff"
+)
+
+// StateDiffFetcher mock for tests
+type StateDiffFetcher struct {
+ PayloadsToReturn map[uint64]*statediff.Payload
+ FetchErr error
+ CalledAtBlockHeights [][]uint64
+}
+
+// SetPayloadsToReturn for tests
+func (fetcher *StateDiffFetcher) SetPayloadsToReturn(payloads map[uint64]*statediff.Payload) {
+ fetcher.PayloadsToReturn = payloads
+}
+
+// FetchStateDiffsAt mock method
+func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) {
+ fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights)
+ if fetcher.PayloadsToReturn == nil {
+ return nil, errors.New("MockStateDiffFetcher needs to be initialized with payloads to return")
+ }
+ results := make([]*statediff.Payload, 0, len(blockHeights))
+ for _, height := range blockHeights {
+ results = append(results, fetcher.PayloadsToReturn[height])
+ }
+ return results, nil
+}
diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go
index 16c1ad93..862470f7 100644
--- a/libraries/shared/mocks/storage_fetcher.go
+++ b/libraries/shared/mocks/storage_fetcher.go
@@ -18,16 +18,19 @@ package mocks
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
-type MockStorageFetcher struct {
+// ClosingStorageFetcher is a mock fetcher for use in tests without backfilling
+type ClosingStorageFetcher struct {
DiffsToReturn []utils.StorageDiff
ErrsToReturn []error
}
-func NewMockStorageFetcher() *MockStorageFetcher {
- return &MockStorageFetcher{}
+// NewClosingStorageFetcher returns a new ClosingStorageFetcher
+func NewClosingStorageFetcher() *ClosingStorageFetcher {
+ return &ClosingStorageFetcher{}
}
-func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
+// FetchStorageDiffs mock method
+func (fetcher *ClosingStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
defer close(out)
defer close(errs)
for _, err := range fetcher.ErrsToReturn {
@@ -37,3 +40,24 @@ func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDif
out <- diff
}
}
+
+// StorageFetcher is a mock fetcher for use in tests with backfilling
+type StorageFetcher struct {
+ DiffsToReturn []utils.StorageDiff
+ ErrsToReturn []error
+}
+
+// NewStorageFetcher returns a new StorageFetcher
+func NewStorageFetcher() *StorageFetcher {
+ return &StorageFetcher{}
+}
+
+// FetchStorageDiffs mock method
+func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
+ for _, err := range fetcher.ErrsToReturn {
+ errs <- err
+ }
+ for _, diff := range fetcher.DiffsToReturn {
+ out <- diff
+ }
+}
diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go
index ed2b9275..a3ecc709 100644
--- a/libraries/shared/mocks/storage_queue.go
+++ b/libraries/shared/mocks/storage_queue.go
@@ -20,27 +20,31 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
+// MockStorageQueue for tests
type MockStorageQueue struct {
- AddCalled bool
- AddError error
- AddPassedDiff utils.StorageDiff
- DeleteErr error
- DeletePassedID int
- GetAllErr error
- DiffsToReturn []utils.StorageDiff
+ AddCalled bool
+ AddError error
+ AddPassedDiffs []utils.StorageDiff
+ DeleteErr error
+ DeletePassedIds []int
+ GetAllErr error
+ DiffsToReturn []utils.StorageDiff
}
+// Add mock method
func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error {
queue.AddCalled = true
- queue.AddPassedDiff = diff
+ queue.AddPassedDiffs = append(queue.AddPassedDiffs, diff)
return queue.AddError
}
+// Delete mock method
func (queue *MockStorageQueue) Delete(id int) error {
- queue.DeletePassedID = id
+ queue.DeletePassedIds = append(queue.DeletePassedIds, id)
return queue.DeleteErr
}
+// GetAll mock method
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) {
return queue.DiffsToReturn, queue.GetAllErr
}
diff --git a/libraries/shared/mocks/storage_transformer.go b/libraries/shared/mocks/storage_transformer.go
index b1c3cba2..2f126fd6 100644
--- a/libraries/shared/mocks/storage_transformer.go
+++ b/libraries/shared/mocks/storage_transformer.go
@@ -18,26 +18,31 @@ package mocks
import (
"github.com/ethereum/go-ethereum/common"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
+// MockStorageTransformer for tests
type MockStorageTransformer struct {
KeccakOfAddress common.Hash
ExecuteErr error
- PassedDiff utils.StorageDiff
+ PassedDiffs []utils.StorageDiff
}
+// Execute mock method
func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error {
- transformer.PassedDiff = diff
+ transformer.PassedDiffs = append(transformer.PassedDiffs, diff)
return transformer.ExecuteErr
}
+// KeccakContractAddress mock method
func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash {
return transformer.KeccakOfAddress
}
+// FakeTransformerInitializer mock method
func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer {
return transformer
}
diff --git a/libraries/shared/repository/address_repository_test.go b/libraries/shared/repository/address_repository_test.go
index 5abb4222..7e1c8b16 100644
--- a/libraries/shared/repository/address_repository_test.go
+++ b/libraries/shared/repository/address_repository_test.go
@@ -17,14 +17,14 @@
package repository_test
import (
- "github.com/vulcanize/vulcanizedb/libraries/shared/repository"
- "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"strings"
"github.com/jmoiron/sqlx"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/repository"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
diff --git a/libraries/shared/storage/backfill.go b/libraries/shared/storage/backfill.go
index 70a14d3b..4cbe29ac 100644
--- a/libraries/shared/storage/backfill.go
+++ b/libraries/shared/storage/backfill.go
@@ -17,12 +17,9 @@
package storage
import (
- "bytes"
"errors"
"fmt"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
@@ -31,52 +28,39 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
-// IBackFiller is the backfilling interface
-type IBackFiller interface {
- BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.StorageDiff, error)
+// BackFiller is the backfilling interface
+type BackFiller interface {
+ BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error)
}
-// BackFiller is the backfilling struct
-type BackFiller struct {
- sdf fetcher.IStateDiffFetcher
+// backFiller is the backfilling struct
+type backFiller struct {
+ fetcher fetcher.StateDiffFetcher
}
-// BackFillerArgs are used to pass configuration params to the backfiller
-type BackFillerArgs struct {
- // mapping of hashed addresses to a list of the storage key hashes we want to collect at that address
- WantedStorage map[common.Hash][]common.Hash
- StartingBlock uint64
- EndingBlock uint64
-}
-
-// NewStorageBackFiller returns a IBackFiller
-func NewStorageBackFiller(bc fetcher.BatchClient) IBackFiller {
- return &BackFiller{
- sdf: fetcher.NewStateDiffFetcher(bc),
+// NewStorageBackFiller returns a BackFiller
+func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher) BackFiller {
+ return &backFiller{
+ fetcher: fetcher,
}
}
-// BackFill uses the provided config to fetch and return the state diff at the specified blocknumber
-// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
+// BackFill fetches and returns the storage diffs for the given range of block heights
+// by batch fetching the statediff payloads at those heights and extracting their storage diffs
-func (bf *BackFiller) BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.StorageDiff, error) {
- results := make(map[common.Hash][]utils.StorageDiff, len(bfa.WantedStorage))
- if bfa.EndingBlock < bfa.StartingBlock {
+func (bf *backFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) {
+ results := make([]utils.StorageDiff, 0)
+ if endingBlock < startingBlock {
return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
}
- blockHeights := make([]uint64, 0, bfa.EndingBlock-bfa.StartingBlock+1)
- for i := bfa.StartingBlock; i <= bfa.EndingBlock; i++ {
+ blockHeights := make([]uint64, 0, endingBlock-startingBlock+1)
+ for i := startingBlock; i <= endingBlock; i++ {
blockHeights = append(blockHeights, i)
}
- payloads, err := bf.sdf.FetchStateDiffsAt(blockHeights)
+ payloads, err := bf.fetcher.FetchStateDiffsAt(blockHeights)
if err != nil {
return nil, err
}
for _, payload := range payloads {
- block := new(types.Block)
- blockDecodeErr := rlp.DecodeBytes(payload.BlockRlp, block)
- if blockDecodeErr != nil {
- return nil, blockDecodeErr
- }
stateDiff := new(statediff.StateDiff)
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
if stateDiffDecodeErr != nil {
@@ -84,42 +68,20 @@ func (bf *BackFiller) BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.Stor
}
accounts := utils.GetAccountsFromDiff(*stateDiff)
for _, account := range accounts {
- if wantedHashedAddress(bfa.WantedStorage, common.BytesToHash(account.Key)) {
- logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
- for _, storage := range account.Storage {
- if wantedHashedStorageKey(bfa.WantedStorage[common.BytesToHash(account.Key)], storage.Key) {
- diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
- logrus.Trace("adding storage diff to out channel",
- "keccak of address: ", diff.HashedAddress.Hex(),
- "block height: ", diff.BlockHeight,
- "storage key: ", diff.StorageKey.Hex(),
- "storage value: ", diff.StorageValue.Hex())
- if formatErr != nil {
- return nil, formatErr
- }
- results[diff.HashedAddress] = append(results[diff.HashedAddress], diff)
- }
+ logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
+ for _, storage := range account.Storage {
+ diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
+ if formatErr != nil {
+ return nil, formatErr
}
+ logrus.Trace("adding storage diff to results",
+ "keccak of address: ", diff.HashedAddress.Hex(),
+ "block height: ", diff.BlockHeight,
+ "storage key: ", diff.StorageKey.Hex(),
+ "storage value: ", diff.StorageValue.Hex())
+ results = append(results, diff)
}
}
}
return results, nil
}
-
-func wantedHashedAddress(wantedStorage map[common.Hash][]common.Hash, hashedKey common.Hash) bool {
- for addrHash := range wantedStorage {
- if bytes.Equal(addrHash.Bytes(), hashedKey.Bytes()) {
- return true
- }
- }
- return false
-}
-
-func wantedHashedStorageKey(wantedKeys []common.Hash, keyBytes []byte) bool {
- for _, key := range wantedKeys {
- if bytes.Equal(key.Bytes(), keyBytes) {
- return true
- }
- }
- return false
-}
diff --git a/libraries/shared/storage/backfill_test.go b/libraries/shared/storage/backfill_test.go
index 7dd24f8d..4b3304e2 100644
--- a/libraries/shared/storage/backfill_test.go
+++ b/libraries/shared/storage/backfill_test.go
@@ -18,94 +18,43 @@ package storage_test
import (
"bytes"
- "encoding/json"
- "errors"
- "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
- "github.com/vulcanize/vulcanizedb/pkg/geth/client"
)
-type mockClient struct {
- MappedStateDiffAt map[uint64][]byte
-}
-
-// SetReturnDiffAt method to set what statediffs the mock client returns
-func (mc *mockClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error {
- if mc.MappedStateDiffAt == nil {
- mc.MappedStateDiffAt = make(map[uint64][]byte)
- }
- by, err := json.Marshal(diffPayload)
- if err != nil {
- return err
- }
- mc.MappedStateDiffAt[height] = by
- return nil
-}
-
-// BatchCall mockClient method to simulate batch call to geth
-func (mc *mockClient) BatchCall(batch []client.BatchElem) error {
- if mc.MappedStateDiffAt == nil {
- return errors.New("mockclient needs to be initialized with statediff payloads and errors")
- }
- for _, batchElem := range batch {
- if len(batchElem.Args) != 1 {
- return errors.New("expected batch elem to contain single argument")
- }
- blockHeight, ok := batchElem.Args[0].(uint64)
- if !ok {
- return errors.New("expected batch elem argument to be a uint64")
- }
- err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
- if err != nil {
- return err
- }
- }
- return nil
-}
-
var _ = Describe("BackFiller", func() {
Describe("BackFill", func() {
var (
- mc *mockClient
- backFiller storage.IBackFiller
+ fetcher *mocks.StateDiffFetcher
+ backFiller storage.BackFiller
)
BeforeEach(func() {
- mc = new(mockClient)
- setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload)
- Expect(setDiffAtErr1).ToNot(HaveOccurred())
- setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
- Expect(setDiffAtErr2).ToNot(HaveOccurred())
- backFiller = storage.NewStorageBackFiller(mc)
+ fetcher = new(mocks.StateDiffFetcher)
+ fetcher.SetPayloadsToReturn(map[uint64]*statediff.Payload{
+ test_data.BlockNumber.Uint64(): &test_data.MockStatediffPayload,
+ test_data.BlockNumber2.Uint64(): &test_data.MockStatediffPayload2,
+ })
+ backFiller = storage.NewStorageBackFiller(fetcher)
})
It("Batch calls statediff_stateDiffAt", func() {
- backFillArgs := storage.BackFillerArgs{
- WantedStorage: map[common.Hash][]common.Hash{
- test_data.ContractLeafKey: {common.BytesToHash(test_data.StorageKey)},
- test_data.AnotherContractLeafKey: {common.BytesToHash(test_data.StorageKey)},
- },
- StartingBlock: test_data.BlockNumber.Uint64(),
- EndingBlock: test_data.BlockNumber2.Uint64(),
- }
- backFillStorage, backFillErr := backFiller.BackFill(backFillArgs)
+ backFillStorage, backFillErr := backFiller.BackFill(test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64())
Expect(backFillErr).ToNot(HaveOccurred())
- Expect(len(backFillStorage)).To(Equal(2))
- Expect(len(backFillStorage[test_data.ContractLeafKey])).To(Equal(1))
- Expect(len(backFillStorage[test_data.AnotherContractLeafKey])).To(Equal(3))
- Expect(backFillStorage[test_data.ContractLeafKey][0]).To(Equal(test_data.CreatedExpectedStorageDiff))
+ Expect(len(backFillStorage)).To(Equal(4))
// Can only rlp encode the slice of diffs as part of a struct
// Rlp encoding allows us to compare content of the slices when the order in the slice may vary
expectedDiffStruct := struct {
diffs []utils.StorageDiff
}{
[]utils.StorageDiff{
+ test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
test_data.DeletedExpectedStorageDiff,
@@ -116,40 +65,11 @@ var _ = Describe("BackFiller", func() {
receivedDiffStruct := struct {
diffs []utils.StorageDiff
}{
- backFillStorage[test_data.AnotherContractLeafKey],
+ backFillStorage,
}
receivedDiffBytes, rlpErr2 := rlp.EncodeToBytes(receivedDiffStruct)
Expect(rlpErr2).ToNot(HaveOccurred())
Expect(bytes.Equal(expectedDiffBytes, receivedDiffBytes)).To(BeTrue())
})
-
- It("Only returns storage for provided addresses (address hashes)", func() {
- backFillArgs := storage.BackFillerArgs{
- WantedStorage: map[common.Hash][]common.Hash{
- test_data.ContractLeafKey: {common.BytesToHash(test_data.StorageKey)},
- },
- StartingBlock: test_data.BlockNumber.Uint64(),
- EndingBlock: test_data.BlockNumber2.Uint64(),
- }
- backFillStorage, backFillErr := backFiller.BackFill(backFillArgs)
- Expect(backFillErr).ToNot(HaveOccurred())
- Expect(len(backFillStorage)).To(Equal(1))
- Expect(len(backFillStorage[test_data.ContractLeafKey])).To(Equal(1))
- Expect(len(backFillStorage[test_data.AnotherContractLeafKey])).To(Equal(0))
- Expect(backFillStorage[test_data.ContractLeafKey][0]).To(Equal(test_data.CreatedExpectedStorageDiff))
- })
-
- It("Only returns storage for provided storage keys", func() {
- backFillArgs := storage.BackFillerArgs{
- WantedStorage: map[common.Hash][]common.Hash{
- test_data.ContractLeafKey: nil,
- },
- StartingBlock: test_data.BlockNumber.Uint64(),
- EndingBlock: test_data.BlockNumber2.Uint64(),
- }
- backFillStorage, backFillErr := backFiller.BackFill(backFillArgs)
- Expect(backFillErr).ToNot(HaveOccurred())
- Expect(len(backFillStorage)).To(Equal(0))
- })
})
})
diff --git a/libraries/shared/test_data/statediff.go b/libraries/shared/test_data/statediff.go
index d749b773..ef709842 100644
--- a/libraries/shared/test_data/statediff.go
+++ b/libraries/shared/test_data/statediff.go
@@ -151,6 +151,7 @@ var (
}
CreatedExpectedStorageDiff = utils.StorageDiff{
+		ID:            1333,
HashedAddress: common.BytesToHash(ContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: int(BlockNumber.Int64()),
@@ -158,6 +159,7 @@ var (
StorageValue: common.BytesToHash(SmallStorageValue),
}
UpdatedExpectedStorageDiff = utils.StorageDiff{
+		ID:            1334,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: int(BlockNumber.Int64()),
@@ -165,6 +167,7 @@ var (
StorageValue: common.BytesToHash(LargeStorageValue),
}
UpdatedExpectedStorageDiff2 = utils.StorageDiff{
+		ID:            1335,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: int(BlockNumber2.Int64()),
@@ -172,6 +175,7 @@ var (
StorageValue: common.BytesToHash(SmallStorageValue),
}
DeletedExpectedStorageDiff = utils.StorageDiff{
+		ID:            1336,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: int(BlockNumber.Int64()),
diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go
index 76a99e95..6ff1f771 100644
--- a/libraries/shared/watcher/storage_watcher.go
+++ b/libraries/shared/watcher/storage_watcher.go
@@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
@@ -31,7 +32,8 @@ import (
type IStorageWatcher interface {
AddTransformers(initializers []transformer.StorageTransformerInitializer)
- Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration)
+ Execute(queueRecheckInterval time.Duration, backFill bool)
+ BackFill(backFiller storage.BackFiller, minDeploymentBlock int)
}
type StorageWatcher struct {
@@ -39,6 +41,9 @@ type StorageWatcher struct {
StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue
KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer
+ DiffsChan chan utils.StorageDiff
+ ErrsChan chan error
+ StartingSyncBlockChan chan int
}
func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher {
@@ -47,6 +52,9 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage
return StorageWatcher{
db: db,
StorageFetcher: fetcher,
+ DiffsChan: make(chan utils.StorageDiff),
+ ErrsChan: make(chan error),
+ StartingSyncBlockChan: make(chan int),
Queue: queue,
KeccakAddressTransformers: transformers,
}
@@ -59,14 +67,33 @@ func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.
}
}
-func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) {
+// BackFill configures the StorageWatcher to backfill missed storage diffs using a modified archival geth client
+func (storageWatcher StorageWatcher) BackFill(backFiller storage.BackFiller, minDeploymentBlock int) {
+	// determine the block height to backfill up to
+	// this blocks until the Execute process sends the block height just below that of the first diff it streams
+ startingSyncBlock := <-storageWatcher.StartingSyncBlockChan
+ backFilledDiffs, err := backFiller.BackFill(uint64(minDeploymentBlock), uint64(startingSyncBlock))
+ if err != nil {
+ storageWatcher.ErrsChan <- err
+ }
+ for _, storageDiff := range backFilledDiffs {
+ storageWatcher.DiffsChan <- storageDiff
+ }
+}
+
+func (storageWatcher StorageWatcher) Execute(queueRecheckInterval time.Duration, backFill bool) {
ticker := time.NewTicker(queueRecheckInterval)
- go storageWatcher.StorageFetcher.FetchStorageDiffs(diffsChan, errsChan)
+ go storageWatcher.StorageFetcher.FetchStorageDiffs(storageWatcher.DiffsChan, storageWatcher.ErrsChan)
+ start := true
for {
select {
- case fetchErr := <-errsChan:
+ case fetchErr := <-storageWatcher.ErrsChan:
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr))
- case diff := <-diffsChan:
+ case diff := <-storageWatcher.DiffsChan:
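+			// when backfilling, hand the height just below the first streamed diff to BackFill as the upper bound of its range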
+ if start && backFill {
+ storageWatcher.StartingSyncBlockChan <- diff.BlockHeight - 1
+ start = false
+ }
storageWatcher.processRow(diff)
case <-ticker.C:
storageWatcher.processQueue()
diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go
index 391dfd6c..29f64672 100644
--- a/libraries/shared/watcher/storage_watcher_test.go
+++ b/libraries/shared/watcher/storage_watcher_test.go
@@ -22,11 +22,14 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/rlp"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
+
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
+ "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
@@ -38,7 +41,7 @@ var _ = Describe("Storage Watcher", func() {
It("adds transformers", func() {
fakeHashedAddress := utils.HexToKeccak256Hash("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress}
- w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
+ w := watcher.NewStorageWatcher(mocks.NewClosingStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
@@ -48,21 +51,17 @@ var _ = Describe("Storage Watcher", func() {
Describe("Execute", func() {
var (
- errs chan error
- mockFetcher *mocks.MockStorageFetcher
+ mockFetcher *mocks.ClosingStorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff
- diffs chan utils.StorageDiff
storageWatcher watcher.StorageWatcher
hashedAddress common.Hash
)
BeforeEach(func() {
- errs = make(chan error)
- diffs = make(chan utils.StorageDiff)
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
- mockFetcher = mocks.NewMockStorageFetcher()
+ mockFetcher = mocks.NewClosingStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
csvDiff = utils.StorageDiff{
@@ -85,7 +84,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
- go storageWatcher.Execute(diffs, errs, time.Hour)
+ go storageWatcher.Execute(time.Hour, false)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
@@ -103,25 +102,28 @@ var _ = Describe("Storage Watcher", func() {
})
It("executes transformer for recognized storage diff", func(done Done) {
- go storageWatcher.Execute(diffs, errs, time.Hour)
+ go storageWatcher.Execute(time.Hour, false)
- Eventually(func() utils.StorageDiff {
- return mockTransformer.PassedDiff
- }).Should(Equal(csvDiff))
+ Eventually(func() []utils.StorageDiff {
+ return mockTransformer.PassedDiffs
+ }).Should(Equal([]utils.StorageDiff{csvDiff}))
close(done)
})
It("queues diff for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError
- go storageWatcher.Execute(diffs, errs, time.Hour)
+ go storageWatcher.Execute(time.Hour, false)
- Expect(<-errs).To(BeNil())
+ Expect(<-storageWatcher.ErrsChan).To(BeNil())
Eventually(func() bool {
return mockQueue.AddCalled
}).Should(BeTrue())
Eventually(func() utils.StorageDiff {
- return mockQueue.AddPassedDiff
+ if len(mockQueue.AddPassedDiffs) > 0 {
+ return mockQueue.AddPassedDiffs[0]
+ }
+ return utils.StorageDiff{}
}).Should(Equal(csvDiff))
close(done)
})
@@ -134,7 +136,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
- go storageWatcher.Execute(diffs, errs, time.Hour)
+ go storageWatcher.Execute(time.Hour, false)
Eventually(func() bool {
return mockQueue.AddCalled
@@ -156,19 +158,25 @@ var _ = Describe("Storage Watcher", func() {
})
It("executes transformer for storage diff", func(done Done) {
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() utils.StorageDiff {
- return mockTransformer.PassedDiff
+ if len(mockTransformer.PassedDiffs) > 0 {
+ return mockTransformer.PassedDiffs[0]
+ }
+ return utils.StorageDiff{}
}).Should(Equal(csvDiff))
close(done)
})
It("deletes diff from queue if transformer execution successful", func(done Done) {
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int {
- return mockQueue.DeletePassedID
+ if len(mockQueue.DeletePassedIds) > 0 {
+ return mockQueue.DeletePassedIds[0]
+ }
+ return 0
}).Should(Equal(csvDiff.ID))
close(done)
})
@@ -180,7 +188,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
@@ -196,10 +204,13 @@ var _ = Describe("Storage Watcher", func() {
}
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int {
- return mockQueue.DeletePassedID
+ if len(mockQueue.DeletePassedIds) > 0 {
+ return mockQueue.DeletePassedIds[0]
+ }
+ return 0
}).Should(Equal(obsoleteDiff.ID))
close(done)
})
@@ -216,7 +227,127 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
- go storageWatcher.Execute(diffs, errs, time.Nanosecond)
+ go storageWatcher.Execute(time.Nanosecond, false)
+
+ Eventually(func() (string, error) {
+ logContent, err := ioutil.ReadFile(tempFile.Name())
+ return string(logContent), err
+ }).Should(ContainSubstring(fakes.FakeError.Error()))
+ close(done)
+ })
+ })
+ })
+
+ Describe("BackFill", func() {
+ var (
+ mockFetcher *mocks.StorageFetcher
+ mockBackFiller *mocks.BackFiller
+ mockQueue *mocks.MockStorageQueue
+ mockTransformer *mocks.MockStorageTransformer
+ csvDiff utils.StorageDiff
+ storageWatcher watcher.StorageWatcher
+ hashedAddress common.Hash
+ )
+
+ BeforeEach(func() {
+ mockBackFiller = new(mocks.BackFiller)
+ mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{
+ test_data.CreatedExpectedStorageDiff,
+ test_data.UpdatedExpectedStorageDiff,
+ test_data.UpdatedExpectedStorageDiff2,
+ test_data.DeletedExpectedStorageDiff})
+ hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
+ mockFetcher = mocks.NewStorageFetcher()
+ mockQueue = &mocks.MockStorageQueue{}
+ mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
+ csvDiff = utils.StorageDiff{
+ ID: 1337,
+ HashedAddress: hashedAddress,
+ BlockHash: common.HexToHash("0xfedcba9876543210"),
+ BlockHeight: int(test_data.BlockNumber2.Int64()) + 1,
+ StorageKey: common.HexToHash("0xabcdef1234567890"),
+ StorageValue: common.HexToHash("0x9876543210abcdef"),
+ }
+ })
+
+ Describe("transforming streamed and backfilled queued storage diffs", func() {
+ BeforeEach(func() {
+ mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff}
+ mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff,
+ test_data.CreatedExpectedStorageDiff,
+ test_data.UpdatedExpectedStorageDiff,
+ test_data.UpdatedExpectedStorageDiff2,
+ test_data.DeletedExpectedStorageDiff}
+ storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
+ storageWatcher.Queue = mockQueue
+ storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
+ })
+
+ It("executes transformer for storage diffs", func(done Done) {
+ go storageWatcher.BackFill(mockBackFiller, int(test_data.BlockNumber.Uint64()))
+ go storageWatcher.Execute(time.Nanosecond, true)
+ expectedDiffsStruct := struct {
+ diffs []utils.StorageDiff
+ }{
+ []utils.StorageDiff{
+ csvDiff,
+ test_data.CreatedExpectedStorageDiff,
+ test_data.UpdatedExpectedStorageDiff,
+ test_data.UpdatedExpectedStorageDiff2,
+ test_data.DeletedExpectedStorageDiff,
+ },
+ }
+ expectedDiffsBytes, rlpErr1 := rlp.EncodeToBytes(expectedDiffsStruct)
+ Expect(rlpErr1).ToNot(HaveOccurred())
+ Eventually(func() []byte {
+ diffsStruct := struct {
+ diffs []utils.StorageDiff
+ }{
+ mockTransformer.PassedDiffs,
+ }
+ diffsBytes, rlpErr2 := rlp.EncodeToBytes(diffsStruct)
+ Expect(rlpErr2).ToNot(HaveOccurred())
+ return diffsBytes
+ }).Should(Equal(expectedDiffsBytes))
+ close(done)
+ })
+
+ It("deletes diffs from queue if transformer execution is successful", func(done Done) {
+ go storageWatcher.Execute(time.Nanosecond, false)
+ expectedIdsStruct := struct {
+ diffs []int
+ }{
+ []int{
+ csvDiff.ID,
+ test_data.CreatedExpectedStorageDiff.ID,
+ test_data.UpdatedExpectedStorageDiff.ID,
+ test_data.UpdatedExpectedStorageDiff2.ID,
+ test_data.DeletedExpectedStorageDiff.ID,
+ },
+ }
+ expectedIdsBytes, rlpErr1 := rlp.EncodeToBytes(expectedIdsStruct)
+ Expect(rlpErr1).ToNot(HaveOccurred())
+ Eventually(func() []byte {
+ idsStruct := struct {
+ diffs []int
+ }{
+ mockQueue.DeletePassedIds,
+ }
+ idsBytes, rlpErr2 := rlp.EncodeToBytes(idsStruct)
+ Expect(rlpErr2).ToNot(HaveOccurred())
+ return idsBytes
+ }).Should(Equal(expectedIdsBytes))
+ close(done)
+ })
+
+ It("logs error if deleting persisted diff fails", func(done Done) {
+ mockQueue.DeleteErr = fakes.FakeError
+ tempFile, fileErr := ioutil.TempFile("", "log")
+ Expect(fileErr).NotTo(HaveOccurred())
+ defer os.Remove(tempFile.Name())
+ logrus.SetOutput(tempFile)
+
+ go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())