From 37f4a2d6035ee86ecd07c77b4fb1813e630357e0 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Thu, 24 Oct 2019 11:35:39 -0500 Subject: [PATCH] integrate backfill into storage watcher; documentation for storage backfill --- cmd/execute.go | 34 +++- documentation/custom-transformers.md | 16 ++ .../fetcher/geth_rpc_storage_fetcher.go | 34 ++-- .../fetcher/geth_rpc_storage_fetcher_test.go | 29 +-- .../shared/fetcher/state_diff_fetcher.go | 23 +-- .../shared/fetcher/state_diff_fetcher_test.go | 51 +---- libraries/shared/mocks/backfiller.go | 39 ++++ libraries/shared/mocks/batch_client.go | 64 +++++++ libraries/shared/mocks/state_diff_fetcher.go | 48 +++++ libraries/shared/mocks/storage_fetcher.go | 32 +++- libraries/shared/mocks/storage_queue.go | 22 ++- libraries/shared/mocks/storage_transformer.go | 9 +- .../repository/address_repository_test.go | 4 +- libraries/shared/storage/backfill.go | 92 +++------ libraries/shared/storage/backfill_test.go | 106 ++--------- libraries/shared/test_data/statediff.go | 4 + libraries/shared/watcher/storage_watcher.go | 37 +++- .../shared/watcher/storage_watcher_test.go | 179 +++++++++++++++--- 18 files changed, 526 insertions(+), 297 deletions(-) create mode 100644 libraries/shared/mocks/backfiller.go create mode 100644 libraries/shared/mocks/batch_client.go create mode 100644 libraries/shared/mocks/state_diff_fetcher.go diff --git a/cmd/execute.go b/cmd/execute.go index 2f2c31ea..097fa9f1 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -17,21 +17,25 @@ package cmd import ( + "errors" "fmt" "plugin" syn "sync" "time" + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" - storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/fs" "github.com/vulcanize/vulcanizedb/utils" ) @@ -186,13 +190,29 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) { defer wg.Done() // Execute over the StorageTransformerInitializer set using the storage watcher LogWithCommand.Info("executing storage transformers") - ticker := time.NewTicker(pollingInterval) - defer ticker.Stop() - for range ticker.C { - errs := make(chan error) - diffs := make(chan storageUtils.StorageDiff) - w.Execute(diffs, errs, queueRecheckInterval) + on := viper.GetBool("storageBackFill.on") + if on { + go backFillStorage(w) } + w.Execute(queueRecheckInterval, on) +} + +func backFillStorage(w watcher.IStorageWatcher) { + // configure archival rpc client + archivalRPCPath := viper.GetString("storageBackFill.rpcPath") + if archivalRPCPath == "" { + LogWithCommand.Fatal(errors.New("storage backfill is turned on but no rpc path is provided")) + } + rawRPCClient, dialErr := rpc.Dial(archivalRPCPath) + if dialErr != nil { + LogWithCommand.Fatal(dialErr) + } + rpcClient := client.NewRpcClient(rawRPCClient, archivalRPCPath) + // find min deployment block + minDeploymentBlock := viper.GetInt("storageBackFill.startingBlock") + stateDiffFetcher := fetcher.NewStateDiffFetcher(rpcClient) + backFiller := storage.NewStorageBackFiller(stateDiffFetcher) + w.BackFill(backFiller, minDeploymentBlock) } func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) { diff --git a/documentation/custom-transformers.md b/documentation/custom-transformers.md index c4782c54..007120ed 100644 --- a/documentation/custom-transformers.md +++ b/documentation/custom-transformers.md @@ -194,3 +194,19 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface } } ``` + +### Storage backfilling +Storage transformers stream data from a geth subscription or parity csv file where the storage diffs are produced and emitted as the +full sync progresses. If the transformers have missed consuming a range of diffs due to lag in the startup of the processes or due to misalignment of the sync, +we can configure our storage transformers to backfill missing diffs from a [modified archival geth client](https://github.com/vulcanize/go-ethereum/tree/statediff_at). + +To do so, add the following fields to the config file. +```toml +[storageBackFill] + on = false + rpcPath = "" + startingBlock = 0 +``` +- `on` is set to `true` to turn the backfill process on +- `rpcPath` is the websocket or ipc path to the modified archival geth node that exposes the `StateDiffAt` rpc endpoint we can use to backfill storage diffs +- `startingBlock` is the block height at which we want to begin the backfill process; the height of the earliest contract deployment diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go index 27d505a0..7dd7da9d 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go @@ -1,17 +1,18 @@ -// Copyright 2019 Vulcanize -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// VulcanizeDB +// Copyright © 2019 Vulcanize +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . package fetcher import ( @@ -62,14 +63,15 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) for _, storage := range account.Storage { diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) + if formatErr != nil { + errs <- formatErr + continue + } logrus.Trace("adding storage diff to out channel", "keccak of address: ", diff.HashedAddress.Hex(), "block height: ", diff.BlockHeight, "storage key: ", diff.StorageKey.Hex(), "storage value: ", diff.StorageValue.Hex()) - if formatErr != nil { - errs <- formatErr - } out <- diff } diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go index ca87e1b3..dff1b863 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go @@ -1,16 +1,18 @@ -// Copyright 2019 Vulcanize -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . package fetcher_test @@ -21,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" diff --git a/libraries/shared/fetcher/state_diff_fetcher.go b/libraries/shared/fetcher/state_diff_fetcher.go index 087655c2..a24f94f8 100644 --- a/libraries/shared/fetcher/state_diff_fetcher.go +++ b/libraries/shared/fetcher/state_diff_fetcher.go @@ -17,13 +17,14 @@ package fetcher import ( + "fmt" "github.com/ethereum/go-ethereum/statediff" - "github.com/vulcanize/vulcanizedb/pkg/geth/client" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" ) -// IStateDiffFetcher is the state diff fetching interface -type IStateDiffFetcher interface { +// StateDiffFetcher is the state diff fetching interface +type StateDiffFetcher interface { FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) } @@ -32,23 +33,23 @@ type BatchClient interface { BatchCall(batch []client.BatchElem) error } -// StateDiffFetcher is the state diff fetching struct -type StateDiffFetcher struct { +// stateDiffFetcher is the state diff fetching struct +type stateDiffFetcher struct { client BatchClient } const method = "statediff_stateDiffAt" // NewStateDiffFetcher returns a IStateDiffFetcher -func NewStateDiffFetcher(bc BatchClient) IStateDiffFetcher { - return &StateDiffFetcher{ +func NewStateDiffFetcher(bc BatchClient) StateDiffFetcher { + return &stateDiffFetcher{ client: bc, } } // FetchStateDiffsAt fetches the statediff payloads at the given block heights // Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) -func (sdf *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) { +func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) { batch := make([]client.BatchElem, 0) for _, height := range blockHeights { batch = append(batch, client.BatchElem{ @@ -57,14 +58,14 @@ func (sdf *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*stated Result: new(statediff.Payload), }) } - batchErr := sdf.client.BatchCall(batch) + batchErr := fetcher.client.BatchCall(batch) if batchErr != nil { - return nil, batchErr + return nil, fmt.Errorf("stateDiffFetcher err: %s", batchErr.Error()) } results := make([]*statediff.Payload, 0, len(blockHeights)) for _, batchElem := range batch { if batchElem.Error != nil { - return nil, batchElem.Error + return nil, fmt.Errorf("stateDiffFetcher err: %s", batchElem.Error.Error()) } results = append(results, batchElem.Result.(*statediff.Payload)) } diff --git a/libraries/shared/fetcher/state_diff_fetcher_test.go b/libraries/shared/fetcher/state_diff_fetcher_test.go index 7ff33899..257032cc 100644 --- a/libraries/shared/fetcher/state_diff_fetcher_test.go +++ b/libraries/shared/fetcher/state_diff_fetcher_test.go @@ -18,66 +18,25 @@ package fetcher_test import ( "bytes" - "encoding/json" - "errors" - - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" - "github.com/vulcanize/vulcanizedb/pkg/geth/client" ) -type mockClient struct { - MappedStateDiffAt map[uint64][]byte -} - -// SetReturnDiffAt method to set what statediffs the mock client returns -func (mc *mockClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error { - if mc.MappedStateDiffAt == nil { - mc.MappedStateDiffAt = make(map[uint64][]byte) - } - by, err := json.Marshal(diffPayload) - if err != nil { - return err - } - mc.MappedStateDiffAt[height] = by - return nil -} - -// BatchCall mockClient method to simulate batch call to geth -func (mc *mockClient) BatchCall(batch []client.BatchElem) error { - if mc.MappedStateDiffAt == nil { - return errors.New("mockclient needs to be initialized with statediff payloads and errors") - } - for _, batchElem := range batch { - if len(batchElem.Args) != 1 { - return errors.New("expected batch elem to contain single argument") - } - blockHeight, ok := batchElem.Args[0].(uint64) - if !ok { - return errors.New("expected batch elem argument to be a uint64") - } - err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) - if err != nil { - return err - } - } - return nil -} - var _ = Describe("StateDiffFetcher", func() { Describe("FetchStateDiffsAt", func() { var ( - mc *mockClient - stateDiffFetcher fetcher.IStateDiffFetcher + mc *mocks.BackFillerClient + stateDiffFetcher fetcher.StateDiffFetcher ) BeforeEach(func() { - mc = new(mockClient) + mc = new(mocks.BackFillerClient) setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload) Expect(setDiffAtErr1).ToNot(HaveOccurred()) setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2) diff --git a/libraries/shared/mocks/backfiller.go b/libraries/shared/mocks/backfiller.go new file mode 100644 index 00000000..26754998 --- /dev/null +++ b/libraries/shared/mocks/backfiller.go @@ -0,0 +1,39 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + +// BackFiller mock for tests +type BackFiller struct { + StorageDiffsToReturn []utils.StorageDiff + BackFillErr error + PassedStartingBlock uint64 + PassedEndingBlock uint64 +} + +// SetStorageDiffsToReturn for tests +func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) { + backFiller.StorageDiffsToReturn = diffs +} + +// BackFill mock method +func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) { + backFiller.PassedStartingBlock = startingBlock + backFiller.PassedEndingBlock = endingBlock + return backFiller.StorageDiffsToReturn, backFiller.BackFillErr +} diff --git a/libraries/shared/mocks/batch_client.go b/libraries/shared/mocks/batch_client.go new file mode 100644 index 00000000..d2156f8d --- /dev/null +++ b/libraries/shared/mocks/batch_client.go @@ -0,0 +1,64 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import ( + "encoding/json" + "errors" + + "github.com/ethereum/go-ethereum/statediff" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" +) + +// BackFillerClient is a mock client for use in backfiller tests +type BackFillerClient struct { + MappedStateDiffAt map[uint64][]byte +} + +// SetReturnDiffAt method to set what statediffs the mock client returns +func (mc *BackFillerClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error { + if mc.MappedStateDiffAt == nil { + mc.MappedStateDiffAt = make(map[uint64][]byte) + } + by, err := json.Marshal(diffPayload) + if err != nil { + return err + } + mc.MappedStateDiffAt[height] = by + return nil +} + +// BatchCall mockClient method to simulate batch call to geth +func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error { + if mc.MappedStateDiffAt == nil { + return errors.New("mockclient needs to be initialized with statediff payloads and errors") + } + for _, batchElem := range batch { + if len(batchElem.Args) != 1 { + return errors.New("expected batch elem to contain single argument") + } + blockHeight, ok := batchElem.Args[0].(uint64) + if !ok { + return errors.New("expected batch elem argument to be a uint64") + } + err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) + if err != nil { + return err + } + } + return nil +} diff --git a/libraries/shared/mocks/state_diff_fetcher.go b/libraries/shared/mocks/state_diff_fetcher.go new file mode 100644 index 00000000..52c06803 --- /dev/null +++ b/libraries/shared/mocks/state_diff_fetcher.go @@ -0,0 +1,48 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import ( + "errors" + + "github.com/ethereum/go-ethereum/statediff" +) + +// StateDiffFetcher mock for tests +type StateDiffFetcher struct { + PayloadsToReturn map[uint64]*statediff.Payload + FetchErr error + CalledAtBlockHeights [][]uint64 +} + +// SetPayloadsToReturn for tests +func (fetcher *StateDiffFetcher) SetPayloadsToReturn(payloads map[uint64]*statediff.Payload) { + fetcher.PayloadsToReturn = payloads +} + +// FetchStateDiffsAt mock method +func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) { + fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights) + if fetcher.PayloadsToReturn == nil { + return nil, errors.New("MockStateDiffFetcher needs to be initialized with payloads to return") + } + results := make([]*statediff.Payload, 0, len(blockHeights)) + for _, height := range blockHeights { + results = append(results, fetcher.PayloadsToReturn[height]) + } + return results, nil +} diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go index 16c1ad93..862470f7 100644 --- a/libraries/shared/mocks/storage_fetcher.go +++ b/libraries/shared/mocks/storage_fetcher.go @@ -18,16 +18,19 @@ package mocks import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" -type MockStorageFetcher struct { +// ClosingStorageFetcher is a mock fetcher for use in tests without backfilling +type ClosingStorageFetcher struct { DiffsToReturn []utils.StorageDiff ErrsToReturn []error } -func NewMockStorageFetcher() *MockStorageFetcher { - return &MockStorageFetcher{} +// NewClosingStorageFetcher returns a new ClosingStorageFetcher +func NewClosingStorageFetcher() *ClosingStorageFetcher { + return &ClosingStorageFetcher{} } -func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { +// FetchStorageDiffs mock method +func (fetcher *ClosingStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { defer close(out) defer close(errs) for _, err := range fetcher.ErrsToReturn { @@ -37,3 +40,24 @@ func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDif out <- diff } } + +// StorageFetcher is a mock fetcher for use in tests with backfilling +type StorageFetcher struct { + DiffsToReturn []utils.StorageDiff + ErrsToReturn []error +} + +// NewStorageFetcher returns a new StorageFetcher +func NewStorageFetcher() *StorageFetcher { + return &StorageFetcher{} +} + +// FetchStorageDiffs mock method +func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { + for _, err := range fetcher.ErrsToReturn { + errs <- err + } + for _, diff := range fetcher.DiffsToReturn { + out <- diff + } +} diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index ed2b9275..a3ecc709 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -20,27 +20,31 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" ) +// MockStorageQueue for tests type MockStorageQueue struct { - AddCalled bool - AddError error - AddPassedDiff utils.StorageDiff - DeleteErr error - DeletePassedID int - GetAllErr error - DiffsToReturn []utils.StorageDiff + AddCalled bool + AddError error + AddPassedDiffs []utils.StorageDiff + DeleteErr error + DeletePassedIds []int + GetAllErr error + DiffsToReturn []utils.StorageDiff } +// Add mock method func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error { queue.AddCalled = true - queue.AddPassedDiff = diff + queue.AddPassedDiffs = append(queue.AddPassedDiffs, diff) return queue.AddError } +// Delete mock method func (queue *MockStorageQueue) Delete(id int) error { - queue.DeletePassedID = id + queue.DeletePassedIds = append(queue.DeletePassedIds, id) return queue.DeleteErr } +// GetAll mock method func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { return queue.DiffsToReturn, queue.GetAllErr } diff --git a/libraries/shared/mocks/storage_transformer.go b/libraries/shared/mocks/storage_transformer.go index b1c3cba2..2f126fd6 100644 --- a/libraries/shared/mocks/storage_transformer.go +++ b/libraries/shared/mocks/storage_transformer.go @@ -18,26 +18,31 @@ package mocks import ( "github.com/ethereum/go-ethereum/common" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +// MockStorageTransformer for tests type MockStorageTransformer struct { KeccakOfAddress common.Hash ExecuteErr error - PassedDiff utils.StorageDiff + PassedDiffs []utils.StorageDiff } +// Execute mock method func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error { - transformer.PassedDiff = diff + transformer.PassedDiffs = append(transformer.PassedDiffs, diff) return transformer.ExecuteErr } +// KeccakContractAddress mock method func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash { return transformer.KeccakOfAddress } +// FakeTransformerInitializer mock method func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer { return transformer } diff --git a/libraries/shared/repository/address_repository_test.go b/libraries/shared/repository/address_repository_test.go index 5abb4222..7e1c8b16 100644 --- a/libraries/shared/repository/address_repository_test.go +++ b/libraries/shared/repository/address_repository_test.go @@ -17,14 +17,14 @@ package repository_test import ( - "github.com/vulcanize/vulcanizedb/libraries/shared/repository" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "strings" "github.com/jmoiron/sqlx" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/repository" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/test_config" diff --git a/libraries/shared/storage/backfill.go b/libraries/shared/storage/backfill.go index 70a14d3b..4cbe29ac 100644 --- a/libraries/shared/storage/backfill.go +++ b/libraries/shared/storage/backfill.go @@ -17,12 +17,9 @@ package storage import ( - "bytes" "errors" "fmt" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" "github.com/sirupsen/logrus" @@ -31,52 +28,39 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" ) -// IBackFiller is the backfilling interface -type IBackFiller interface { - BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.StorageDiff, error) +// BackFiller is the backfilling interface +type BackFiller interface { + BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) } -// BackFiller is the backfilling struct -type BackFiller struct { - sdf fetcher.IStateDiffFetcher +// backFiller is the backfilling struct +type backFiller struct { + fetcher fetcher.StateDiffFetcher } -// BackFillerArgs are used to pass configuration params to the backfiller -type BackFillerArgs struct { - // mapping of hashed addresses to a list of the storage key hashes we want to collect at that address - WantedStorage map[common.Hash][]common.Hash - StartingBlock uint64 - EndingBlock uint64 -} - -// NewStorageBackFiller returns a IBackFiller -func NewStorageBackFiller(bc fetcher.BatchClient) IBackFiller { - return &BackFiller{ - sdf: fetcher.NewStateDiffFetcher(bc), +// NewStorageBackFiller returns a BackFiller +func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher) BackFiller { + return &backFiller{ + fetcher: fetcher, } } // BackFill uses the provided config to fetch and return the state diff at the specified blocknumber // StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) -func (bf *BackFiller) BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.StorageDiff, error) { - results := make(map[common.Hash][]utils.StorageDiff, len(bfa.WantedStorage)) - if bfa.EndingBlock < bfa.StartingBlock { +func (bf *backFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) { + results := make([]utils.StorageDiff, 0) + if endingBlock < startingBlock { return nil, errors.New("backfill: ending block number needs to be greater than starting block number") } - blockHeights := make([]uint64, 0, bfa.EndingBlock-bfa.StartingBlock+1) - for i := bfa.StartingBlock; i <= bfa.EndingBlock; i++ { + blockHeights := make([]uint64, 0, endingBlock-startingBlock+1) + for i := startingBlock; i <= endingBlock; i++ { blockHeights = append(blockHeights, i) } - payloads, err := bf.sdf.FetchStateDiffsAt(blockHeights) + payloads, err := bf.fetcher.FetchStateDiffsAt(blockHeights) if err != nil { return nil, err } for _, payload := range payloads { - block := new(types.Block) - blockDecodeErr := rlp.DecodeBytes(payload.BlockRlp, block) - if blockDecodeErr != nil { - return nil, blockDecodeErr - } stateDiff := new(statediff.StateDiff) stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) if stateDiffDecodeErr != nil { @@ -84,42 +68,20 @@ func (bf *BackFiller) BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.Stor } accounts := utils.GetAccountsFromDiff(*stateDiff) for _, account := range accounts { - if wantedHashedAddress(bfa.WantedStorage, common.BytesToHash(account.Key)) { - logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) - for _, storage := range account.Storage { - if wantedHashedStorageKey(bfa.WantedStorage[common.BytesToHash(account.Key)], storage.Key) { - diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) - logrus.Trace("adding storage diff to out channel", - "keccak of address: ", diff.HashedAddress.Hex(), - "block height: ", diff.BlockHeight, - "storage key: ", diff.StorageKey.Hex(), - "storage value: ", diff.StorageValue.Hex()) - if formatErr != nil { - return nil, formatErr - } - results[diff.HashedAddress] = append(results[diff.HashedAddress], diff) - } + logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) + for _, storage := range account.Storage { + diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) + if formatErr != nil { + return nil, formatErr } + logrus.Trace("adding storage diff to results", + "keccak of address: ", diff.HashedAddress.Hex(), + "block height: ", diff.BlockHeight, + "storage key: ", diff.StorageKey.Hex(), + "storage value: ", diff.StorageValue.Hex()) + results = append(results, diff) } } } return results, nil } - -func wantedHashedAddress(wantedStorage map[common.Hash][]common.Hash, hashedKey common.Hash) bool { - for addrHash := range wantedStorage { - if bytes.Equal(addrHash.Bytes(), hashedKey.Bytes()) { - return true - } - } - return false -} - -func wantedHashedStorageKey(wantedKeys []common.Hash, keyBytes []byte) bool { - for _, key := range wantedKeys { - if bytes.Equal(key.Bytes(), keyBytes) { - return true - } - } - return false -} diff --git a/libraries/shared/storage/backfill_test.go b/libraries/shared/storage/backfill_test.go index 7dd24f8d..4b3304e2 100644 --- a/libraries/shared/storage/backfill_test.go +++ b/libraries/shared/storage/backfill_test.go @@ -18,94 +18,43 @@ package storage_test import ( "bytes" - "encoding/json" - "errors" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" - "github.com/vulcanize/vulcanizedb/pkg/geth/client" ) -type mockClient struct { - MappedStateDiffAt map[uint64][]byte -} - -// SetReturnDiffAt method to set what statediffs the mock client returns -func (mc *mockClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error { - if mc.MappedStateDiffAt == nil { - mc.MappedStateDiffAt = make(map[uint64][]byte) - } - by, err := json.Marshal(diffPayload) - if err != nil { - return err - } - mc.MappedStateDiffAt[height] = by - return nil -} - -// BatchCall mockClient method to simulate batch call to geth -func (mc *mockClient) BatchCall(batch []client.BatchElem) error { - if mc.MappedStateDiffAt == nil { - return errors.New("mockclient needs to be initialized with statediff payloads and errors") - } - for _, batchElem := range batch { - if len(batchElem.Args) != 1 { - return errors.New("expected batch elem to contain single argument") - } - blockHeight, ok := batchElem.Args[0].(uint64) - if !ok { - return errors.New("expected batch elem argument to be a uint64") - } - err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) - if err != nil { - return err - } - } - return nil -} - var _ = Describe("BackFiller", func() { Describe("BackFill", func() { var ( - mc *mockClient - backFiller storage.IBackFiller + fetcher *mocks.StateDiffFetcher + backFiller storage.BackFiller ) BeforeEach(func() { - mc = new(mockClient) - setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload) - Expect(setDiffAtErr1).ToNot(HaveOccurred()) - setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2) - Expect(setDiffAtErr2).ToNot(HaveOccurred()) - backFiller = storage.NewStorageBackFiller(mc) + fetcher = new(mocks.StateDiffFetcher) + fetcher.SetPayloadsToReturn(map[uint64]*statediff.Payload{ + test_data.BlockNumber.Uint64(): &test_data.MockStatediffPayload, + test_data.BlockNumber2.Uint64(): &test_data.MockStatediffPayload2, + }) + backFiller = storage.NewStorageBackFiller(fetcher) }) It("Batch calls statediff_stateDiffAt", func() { - backFillArgs := storage.BackFillerArgs{ - WantedStorage: map[common.Hash][]common.Hash{ - test_data.ContractLeafKey: {common.BytesToHash(test_data.StorageKey)}, - test_data.AnotherContractLeafKey: {common.BytesToHash(test_data.StorageKey)}, - }, - StartingBlock: test_data.BlockNumber.Uint64(), - EndingBlock: test_data.BlockNumber2.Uint64(), - } - backFillStorage, backFillErr := backFiller.BackFill(backFillArgs) + backFillStorage, backFillErr := backFiller.BackFill(test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64()) Expect(backFillErr).ToNot(HaveOccurred()) - Expect(len(backFillStorage)).To(Equal(2)) - Expect(len(backFillStorage[test_data.ContractLeafKey])).To(Equal(1)) - Expect(len(backFillStorage[test_data.AnotherContractLeafKey])).To(Equal(3)) - Expect(backFillStorage[test_data.ContractLeafKey][0]).To(Equal(test_data.CreatedExpectedStorageDiff)) + Expect(len(backFillStorage)).To(Equal(4)) // Can only rlp encode the slice of diffs as part of a struct // Rlp encoding allows us to compare content of the slices when the order in the slice may vary expectedDiffStruct := struct { diffs []utils.StorageDiff }{ []utils.StorageDiff{ + test_data.CreatedExpectedStorageDiff, test_data.UpdatedExpectedStorageDiff, test_data.UpdatedExpectedStorageDiff2, test_data.DeletedExpectedStorageDiff, @@ -116,40 +65,11 @@ var _ = Describe("BackFiller", func() { receivedDiffStruct := struct { diffs []utils.StorageDiff }{ - backFillStorage[test_data.AnotherContractLeafKey], + backFillStorage, } receivedDiffBytes, rlpErr2 := rlp.EncodeToBytes(receivedDiffStruct) Expect(rlpErr2).ToNot(HaveOccurred()) Expect(bytes.Equal(expectedDiffBytes, receivedDiffBytes)).To(BeTrue()) }) - - It("Only returns storage for provided addresses (address hashes)", func() { - backFillArgs := storage.BackFillerArgs{ - WantedStorage: map[common.Hash][]common.Hash{ - test_data.ContractLeafKey: {common.BytesToHash(test_data.StorageKey)}, - }, - StartingBlock: test_data.BlockNumber.Uint64(), - EndingBlock: test_data.BlockNumber2.Uint64(), - } - backFillStorage, backFillErr := backFiller.BackFill(backFillArgs) - Expect(backFillErr).ToNot(HaveOccurred()) - Expect(len(backFillStorage)).To(Equal(1)) - Expect(len(backFillStorage[test_data.ContractLeafKey])).To(Equal(1)) - Expect(len(backFillStorage[test_data.AnotherContractLeafKey])).To(Equal(0)) - Expect(backFillStorage[test_data.ContractLeafKey][0]).To(Equal(test_data.CreatedExpectedStorageDiff)) - }) - - It("Only returns storage for provided storage keys", func() { - backFillArgs := storage.BackFillerArgs{ - WantedStorage: map[common.Hash][]common.Hash{ - test_data.ContractLeafKey: nil, - }, - StartingBlock: test_data.BlockNumber.Uint64(), - EndingBlock: test_data.BlockNumber2.Uint64(), - } - backFillStorage, backFillErr := backFiller.BackFill(backFillArgs) - Expect(backFillErr).ToNot(HaveOccurred()) - Expect(len(backFillStorage)).To(Equal(0)) - }) }) }) diff --git a/libraries/shared/test_data/statediff.go b/libraries/shared/test_data/statediff.go index d749b773..ef709842 100644 --- a/libraries/shared/test_data/statediff.go +++ b/libraries/shared/test_data/statediff.go @@ -151,6 +151,7 @@ var ( } CreatedExpectedStorageDiff = utils.StorageDiff{ + Id: 1333, HashedAddress: common.BytesToHash(ContractLeafKey[:]), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHeight: int(BlockNumber.Int64()), @@ -158,6 +159,7 @@ var ( StorageValue: common.BytesToHash(SmallStorageValue), } UpdatedExpectedStorageDiff = utils.StorageDiff{ + Id: 1334, HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHeight: int(BlockNumber.Int64()), @@ -165,6 +167,7 @@ var ( StorageValue: common.BytesToHash(LargeStorageValue), } UpdatedExpectedStorageDiff2 = utils.StorageDiff{ + Id: 1335, HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHeight: int(BlockNumber2.Int64()), @@ -172,6 +175,7 @@ var ( StorageValue: common.BytesToHash(SmallStorageValue), } DeletedExpectedStorageDiff = utils.StorageDiff{ + Id: 1336, HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHeight: int(BlockNumber.Int64()), diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 76a99e95..6ff1f771 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" @@ -31,7 +32,8 @@ import ( type IStorageWatcher interface { AddTransformers(initializers []transformer.StorageTransformerInitializer) - Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) + Execute(queueRecheckInterval time.Duration, backFill bool) + BackFill(backFiller storage.BackFiller, minDeploymentBlock int) } type StorageWatcher struct { @@ -39,6 +41,9 @@ type StorageWatcher struct { StorageFetcher fetcher.IStorageFetcher Queue storage.IStorageQueue KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer + DiffsChan chan utils.StorageDiff + ErrsChan chan error + StartingSyncBlockChan chan int } func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { @@ -47,6 +52,9 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage return StorageWatcher{ db: db, StorageFetcher: fetcher, + DiffsChan: make(chan utils.StorageDiff), + ErrsChan: make(chan error), + StartingSyncBlockChan: make(chan int), Queue: queue, KeccakAddressTransformers: transformers, } @@ -59,14 +67,33 @@ func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer. } } -func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) { +// BackFill configures the StorageWatcher to backfill missed storage diffs using a modified archival geth client +func (storageWatcher StorageWatcher) BackFill(backFiller storage.BackFiller, minDeploymentBlock int) { + // need to learn which block to perform the backfill process up to + // this blocks until the Execute process sends us the first block number it sees + startingSyncBlock := <-storageWatcher.StartingSyncBlockChan + backFilledDiffs, err := backFiller.BackFill(uint64(minDeploymentBlock), uint64(startingSyncBlock)) + if err != nil { + storageWatcher.ErrsChan <- err + } + for _, storageDiff := range backFilledDiffs { + storageWatcher.DiffsChan <- storageDiff + } +} + +func (storageWatcher StorageWatcher) Execute(queueRecheckInterval time.Duration, backFill bool) { ticker := time.NewTicker(queueRecheckInterval) - go storageWatcher.StorageFetcher.FetchStorageDiffs(diffsChan, errsChan) + go storageWatcher.StorageFetcher.FetchStorageDiffs(storageWatcher.DiffsChan, storageWatcher.ErrsChan) + start := true for { select { - case fetchErr := <-errsChan: + case fetchErr := <-storageWatcher.ErrsChan: logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr)) - case diff := <-diffsChan: + case diff := <-storageWatcher.DiffsChan: + if start && backFill { + storageWatcher.StartingSyncBlockChan <- diff.BlockHeight - 1 + start = false + } storageWatcher.processRow(diff) case <-ticker.C: storageWatcher.processQueue() diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index 391dfd6c..29f64672 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -22,11 +22,14 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fakes" @@ -38,7 +41,7 @@ var _ = Describe("Storage Watcher", func() { It("adds transformers", func() { fakeHashedAddress := utils.HexToKeccak256Hash("0x12345") fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress} - w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) + w := watcher.NewStorageWatcher(mocks.NewClosingStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) @@ -48,21 +51,17 @@ var _ = Describe("Storage Watcher", func() { Describe("Execute", func() { var ( - errs chan error - mockFetcher *mocks.MockStorageFetcher + mockFetcher *mocks.ClosingStorageFetcher mockQueue *mocks.MockStorageQueue mockTransformer *mocks.MockStorageTransformer csvDiff utils.StorageDiff - diffs chan utils.StorageDiff storageWatcher watcher.StorageWatcher hashedAddress common.Hash ) BeforeEach(func() { - errs = make(chan error) - diffs = make(chan utils.StorageDiff) hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") - mockFetcher = mocks.NewMockStorageFetcher() + mockFetcher = mocks.NewClosingStorageFetcher() mockQueue = &mocks.MockStorageQueue{} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} csvDiff = utils.StorageDiff{ @@ -85,7 +84,7 @@ var _ = Describe("Storage Watcher", func() { defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(diffs, errs, time.Hour) + go storageWatcher.Execute(time.Hour, false) Eventually(func() (string, error) { logContent, err := ioutil.ReadFile(tempFile.Name()) @@ -103,25 +102,28 @@ var _ = Describe("Storage Watcher", func() { }) It("executes transformer for recognized storage diff", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Hour) + go storageWatcher.Execute(time.Hour, false) - Eventually(func() utils.StorageDiff { - return mockTransformer.PassedDiff - }).Should(Equal(csvDiff)) + Eventually(func() []utils.StorageDiff { + return mockTransformer.PassedDiffs + }).Should(Equal([]utils.StorageDiff{csvDiff})) close(done) }) It("queues diff for later processing if transformer execution fails", func(done Done) { mockTransformer.ExecuteErr = fakes.FakeError - go storageWatcher.Execute(diffs, errs, time.Hour) + go storageWatcher.Execute(time.Hour, false) - Expect(<-errs).To(BeNil()) + Expect(<-storageWatcher.ErrsChan).To(BeNil()) Eventually(func() bool { return mockQueue.AddCalled }).Should(BeTrue()) Eventually(func() utils.StorageDiff { - return mockQueue.AddPassedDiff + if len(mockQueue.AddPassedDiffs) > 0 { + return mockQueue.AddPassedDiffs[0] + } + return utils.StorageDiff{} }).Should(Equal(csvDiff)) close(done) }) @@ -134,7 +136,7 @@ var _ = Describe("Storage Watcher", func() { defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(diffs, errs, time.Hour) + go storageWatcher.Execute(time.Hour, false) Eventually(func() bool { return mockQueue.AddCalled @@ -156,19 +158,25 @@ var _ = Describe("Storage Watcher", func() { }) It("executes transformer for storage diff", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Nanosecond) + go storageWatcher.Execute(time.Nanosecond, false) Eventually(func() utils.StorageDiff { - return mockTransformer.PassedDiff + if len(mockTransformer.PassedDiffs) > 0 { + return mockTransformer.PassedDiffs[0] + } + return utils.StorageDiff{} }).Should(Equal(csvDiff)) close(done) }) It("deletes diff from queue if transformer execution successful", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Nanosecond) + go storageWatcher.Execute(time.Nanosecond, false) Eventually(func() int { - return mockQueue.DeletePassedID + if len(mockQueue.DeletePassedIds) > 0 { + return mockQueue.DeletePassedIds[0] + } + return 0 }).Should(Equal(csvDiff.ID)) close(done) }) @@ -180,7 +188,7 @@ var _ = Describe("Storage Watcher", func() { defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(diffs, errs, time.Nanosecond) + go storageWatcher.Execute(time.Nanosecond, false) Eventually(func() (string, error) { logContent, err := ioutil.ReadFile(tempFile.Name()) @@ -196,10 +204,13 @@ var _ = Describe("Storage Watcher", func() { } mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} - go storageWatcher.Execute(diffs, errs, time.Nanosecond) + go storageWatcher.Execute(time.Nanosecond, false) Eventually(func() int { - return mockQueue.DeletePassedID + if len(mockQueue.DeletePassedIds) > 0 { + return mockQueue.DeletePassedIds[0] + } + return 0 }).Should(Equal(obsoleteDiff.ID)) close(done) }) @@ -216,7 +227,127 @@ var _ = Describe("Storage Watcher", func() { defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(diffs, errs, time.Nanosecond) + go storageWatcher.Execute(time.Nanosecond, false) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) + }) + }) + }) + + Describe("BackFill", func() { + var ( + mockFetcher *mocks.StorageFetcher + mockBackFiller *mocks.BackFiller + mockQueue *mocks.MockStorageQueue + mockTransformer *mocks.MockStorageTransformer + csvDiff utils.StorageDiff + storageWatcher watcher.StorageWatcher + hashedAddress common.Hash + ) + + BeforeEach(func() { + mockBackFiller = new(mocks.BackFiller) + mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{ + test_data.CreatedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff2, + test_data.DeletedExpectedStorageDiff}) + hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") + mockFetcher = mocks.NewStorageFetcher() + mockQueue = &mocks.MockStorageQueue{} + mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} + csvDiff = utils.StorageDiff{ + ID: 1337, + HashedAddress: hashedAddress, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: int(test_data.BlockNumber2.Int64()) + 1, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), + } + }) + + Describe("transforming streamed and backfilled queued storage diffs", func() { + BeforeEach(func() { + mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} + mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff, + test_data.CreatedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff2, + test_data.DeletedExpectedStorageDiff} + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + }) + + It("executes transformer for storage diffs", func(done Done) { + go storageWatcher.BackFill(mockBackFiller, int(test_data.BlockNumber.Uint64())) + go storageWatcher.Execute(time.Nanosecond, true) + expectedDiffsStruct := struct { + diffs []utils.StorageDiff + }{ + []utils.StorageDiff{ + csvDiff, + test_data.CreatedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff2, + test_data.DeletedExpectedStorageDiff, + }, + } + expectedDiffsBytes, rlpErr1 := rlp.EncodeToBytes(expectedDiffsStruct) + Expect(rlpErr1).ToNot(HaveOccurred()) + Eventually(func() []byte { + diffsStruct := struct { + diffs []utils.StorageDiff + }{ + mockTransformer.PassedDiffs, + } + diffsBytes, rlpErr2 := rlp.EncodeToBytes(diffsStruct) + Expect(rlpErr2).ToNot(HaveOccurred()) + return diffsBytes + }).Should(Equal(expectedDiffsBytes)) + close(done) + }) + + It("deletes diffs from queue if transformer execution is successful", func(done Done) { + go storageWatcher.Execute(time.Nanosecond, false) + expectedIdsStruct := struct { + diffs []int + }{ + []int{ + csvDiff.ID, + test_data.CreatedExpectedStorageDiff.ID, + test_data.UpdatedExpectedStorageDiff.ID, + test_data.UpdatedExpectedStorageDiff2.ID, + test_data.DeletedExpectedStorageDiff.ID, + }, + } + expectedIdsBytes, rlpErr1 := rlp.EncodeToBytes(expectedIdsStruct) + Expect(rlpErr1).ToNot(HaveOccurred()) + Eventually(func() []byte { + idsStruct := struct { + diffs []int + }{ + mockQueue.DeletePassedIds, + } + idsBytes, rlpErr2 := rlp.EncodeToBytes(idsStruct) + Expect(rlpErr2).ToNot(HaveOccurred()) + return idsBytes + }).Should(Equal(expectedIdsBytes)) + close(done) + }) + + It("logs error if deleting persisted diff fails", func(done Done) { + mockQueue.DeleteErr = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(time.Nanosecond, false) Eventually(func() (string, error) { logContent, err := ioutil.ReadFile(tempFile.Name())