integrate backfill into storage watcher; documentation for storage backfill

This commit is contained in:
Ian Norden 2019-10-24 11:35:39 -05:00
parent b454b61777
commit 37f4a2d603
18 changed files with 526 additions and 297 deletions

View File

@ -17,21 +17,25 @@
package cmd package cmd
import ( import (
"errors"
"fmt" "fmt"
"plugin" "plugin"
syn "sync" syn "sync"
"time" "time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer" "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
"github.com/vulcanize/vulcanizedb/pkg/fs" "github.com/vulcanize/vulcanizedb/pkg/fs"
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"
) )
@ -186,13 +190,29 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
defer wg.Done() defer wg.Done()
// Execute over the StorageTransformerInitializer set using the storage watcher // Execute over the StorageTransformerInitializer set using the storage watcher
LogWithCommand.Info("executing storage transformers") LogWithCommand.Info("executing storage transformers")
ticker := time.NewTicker(pollingInterval) on := viper.GetBool("storageBackFill.on")
defer ticker.Stop() if on {
for range ticker.C { go backFillStorage(w)
errs := make(chan error)
diffs := make(chan storageUtils.StorageDiff)
w.Execute(diffs, errs, queueRecheckInterval)
} }
w.Execute(queueRecheckInterval, on)
}
func backFillStorage(w watcher.IStorageWatcher) {
// configure archival rpc client
archivalRPCPath := viper.GetString("storageBackFill.rpcPath")
if archivalRPCPath == "" {
LogWithCommand.Fatal(errors.New("storage backfill is turned on but no rpc path is provided"))
}
rawRPCClient, dialErr := rpc.Dial(archivalRPCPath)
if dialErr != nil {
LogWithCommand.Fatal(dialErr)
}
rpcClient := client.NewRpcClient(rawRPCClient, archivalRPCPath)
// find min deployment block
minDeploymentBlock := viper.GetInt("storageBackFill.startingBlock")
stateDiffFetcher := fetcher.NewStateDiffFetcher(rpcClient)
backFiller := storage.NewStorageBackFiller(stateDiffFetcher)
w.BackFill(backFiller, minDeploymentBlock)
} }
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) { func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {

View File

@ -194,3 +194,19 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface
} }
} }
``` ```
### Storage backfilling
Storage transformers stream data from a geth subscription or parity csv file where the storage diffs are produced and emitted as the
full sync progresses. If the transformers have missed consuming a range of diffs due to lag in the startup of the processes or due to misalignment of the sync,
we can configure our storage transformers to backfill missing diffs from a [modified archival geth client](https://github.com/vulcanize/go-ethereum/tree/statediff_at).
To do so, add the following fields to the config file.
```toml
[storageBackFill]
on = false
rpcPath = ""
startingBlock = 0
```
- `on` is set to `true` to turn the backfill process on
- `rpcPath` is the websocket or ipc path to the modified archival geth node that exposes the `StateDiffAt` rpc endpoint we can use to backfill storage diffs
- `startingBlock` is the block height at which we want to begin the backfill process; the height of the earliest contract deployment

View File

@ -1,17 +1,18 @@
// Copyright 2019 Vulcanize // VulcanizeDB
// // Copyright © 2019 Vulcanize
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fetcher package fetcher
import ( import (
@ -62,14 +63,15 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
for _, storage := range account.Storage { for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil {
errs <- formatErr
continue
}
logrus.Trace("adding storage diff to out channel", logrus.Trace("adding storage diff to out channel",
"keccak of address: ", diff.HashedAddress.Hex(), "keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight, "block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(), "storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex()) "storage value: ", diff.StorageValue.Hex())
if formatErr != nil {
errs <- formatErr
}
out <- diff out <- diff
} }

View File

@ -1,16 +1,18 @@
// Copyright 2019 Vulcanize // VulcanizeDB
// // Copyright © 2019 Vulcanize
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // This program is free software: you can redistribute it and/or modify
// You may obtain a copy of the License at // it under the terms of the GNU Affero General Public License as published by
// // the Free Software Foundation, either version 3 of the License, or
// http://www.apache.org/licenses/LICENSE-2.0 // (at your option) any later version.
//
// Unless required by applicable law or agreed to in writing, software // This program is distributed in the hope that it will be useful,
// distributed under the License is distributed on an "AS IS" BASIS, // but WITHOUT ANY WARRANTY; without even the implied warranty of
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// See the License for the specific language governing permissions and // GNU Affero General Public License for more details.
// limitations under the License.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fetcher_test package fetcher_test
@ -21,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"

View File

@ -17,13 +17,14 @@
package fetcher package fetcher
import ( import (
"fmt"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/geth/client" "github.com/vulcanize/vulcanizedb/pkg/eth/client"
) )
// IStateDiffFetcher is the state diff fetching interface // StateDiffFetcher is the state diff fetching interface
type IStateDiffFetcher interface { type StateDiffFetcher interface {
FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error)
} }
@ -32,23 +33,23 @@ type BatchClient interface {
BatchCall(batch []client.BatchElem) error BatchCall(batch []client.BatchElem) error
} }
// StateDiffFetcher is the state diff fetching struct // stateDiffFetcher is the state diff fetching struct
type StateDiffFetcher struct { type stateDiffFetcher struct {
client BatchClient client BatchClient
} }
const method = "statediff_stateDiffAt" const method = "statediff_stateDiffAt"
// NewStateDiffFetcher returns a IStateDiffFetcher // NewStateDiffFetcher returns a IStateDiffFetcher
func NewStateDiffFetcher(bc BatchClient) IStateDiffFetcher { func NewStateDiffFetcher(bc BatchClient) StateDiffFetcher {
return &StateDiffFetcher{ return &stateDiffFetcher{
client: bc, client: bc,
} }
} }
// FetchStateDiffsAt fetches the statediff payloads at the given block heights // FetchStateDiffsAt fetches the statediff payloads at the given block heights
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) // Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
func (sdf *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) { func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) {
batch := make([]client.BatchElem, 0) batch := make([]client.BatchElem, 0)
for _, height := range blockHeights { for _, height := range blockHeights {
batch = append(batch, client.BatchElem{ batch = append(batch, client.BatchElem{
@ -57,14 +58,14 @@ func (sdf *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*stated
Result: new(statediff.Payload), Result: new(statediff.Payload),
}) })
} }
batchErr := sdf.client.BatchCall(batch) batchErr := fetcher.client.BatchCall(batch)
if batchErr != nil { if batchErr != nil {
return nil, batchErr return nil, fmt.Errorf("stateDiffFetcher err: %s", batchErr.Error())
} }
results := make([]*statediff.Payload, 0, len(blockHeights)) results := make([]*statediff.Payload, 0, len(blockHeights))
for _, batchElem := range batch { for _, batchElem := range batch {
if batchElem.Error != nil { if batchElem.Error != nil {
return nil, batchElem.Error return nil, fmt.Errorf("stateDiffFetcher err: %s", batchElem.Error.Error())
} }
results = append(results, batchElem.Result.(*statediff.Payload)) results = append(results, batchElem.Result.(*statediff.Payload))
} }

View File

@ -18,66 +18,25 @@ package fetcher_test
import ( import (
"bytes" "bytes"
"encoding/json"
"errors"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
) )
type mockClient struct {
MappedStateDiffAt map[uint64][]byte
}
// SetReturnDiffAt method to set what statediffs the mock client returns
func (mc *mockClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error {
if mc.MappedStateDiffAt == nil {
mc.MappedStateDiffAt = make(map[uint64][]byte)
}
by, err := json.Marshal(diffPayload)
if err != nil {
return err
}
mc.MappedStateDiffAt[height] = by
return nil
}
// BatchCall mockClient method to simulate batch call to geth
func (mc *mockClient) BatchCall(batch []client.BatchElem) error {
if mc.MappedStateDiffAt == nil {
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
}
for _, batchElem := range batch {
if len(batchElem.Args) != 1 {
return errors.New("expected batch elem to contain single argument")
}
blockHeight, ok := batchElem.Args[0].(uint64)
if !ok {
return errors.New("expected batch elem argument to be a uint64")
}
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
if err != nil {
return err
}
}
return nil
}
var _ = Describe("StateDiffFetcher", func() { var _ = Describe("StateDiffFetcher", func() {
Describe("FetchStateDiffsAt", func() { Describe("FetchStateDiffsAt", func() {
var ( var (
mc *mockClient mc *mocks.BackFillerClient
stateDiffFetcher fetcher.IStateDiffFetcher stateDiffFetcher fetcher.StateDiffFetcher
) )
BeforeEach(func() { BeforeEach(func() {
mc = new(mockClient) mc = new(mocks.BackFillerClient)
setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload) setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload)
Expect(setDiffAtErr1).ToNot(HaveOccurred()) Expect(setDiffAtErr1).ToNot(HaveOccurred())
setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2) setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)

View File

@ -0,0 +1,39 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
// BackFiller mock for tests
type BackFiller struct {
StorageDiffsToReturn []utils.StorageDiff
BackFillErr error
PassedStartingBlock uint64
PassedEndingBlock uint64
}
// SetStorageDiffsToReturn for tests
func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) {
backFiller.StorageDiffsToReturn = diffs
}
// BackFill mock method
func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) {
backFiller.PassedStartingBlock = startingBlock
backFiller.PassedEndingBlock = endingBlock
return backFiller.StorageDiffsToReturn, backFiller.BackFillErr
}

View File

@ -0,0 +1,64 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
)
// BackFillerClient is a mock client for use in backfiller tests
type BackFillerClient struct {
MappedStateDiffAt map[uint64][]byte
}
// SetReturnDiffAt method to set what statediffs the mock client returns
func (mc *BackFillerClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error {
if mc.MappedStateDiffAt == nil {
mc.MappedStateDiffAt = make(map[uint64][]byte)
}
by, err := json.Marshal(diffPayload)
if err != nil {
return err
}
mc.MappedStateDiffAt[height] = by
return nil
}
// BatchCall mockClient method to simulate batch call to geth
func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
if mc.MappedStateDiffAt == nil {
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
}
for _, batchElem := range batch {
if len(batchElem.Args) != 1 {
return errors.New("expected batch elem to contain single argument")
}
blockHeight, ok := batchElem.Args[0].(uint64)
if !ok {
return errors.New("expected batch elem argument to be a uint64")
}
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,48 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"errors"
"github.com/ethereum/go-ethereum/statediff"
)
// StateDiffFetcher mock for tests
type StateDiffFetcher struct {
PayloadsToReturn map[uint64]*statediff.Payload
FetchErr error
CalledAtBlockHeights [][]uint64
}
// SetPayloadsToReturn for tests
func (fetcher *StateDiffFetcher) SetPayloadsToReturn(payloads map[uint64]*statediff.Payload) {
fetcher.PayloadsToReturn = payloads
}
// FetchStateDiffsAt mock method
func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) {
fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights)
if fetcher.PayloadsToReturn == nil {
return nil, errors.New("MockStateDiffFetcher needs to be initialized with payloads to return")
}
results := make([]*statediff.Payload, 0, len(blockHeights))
for _, height := range blockHeights {
results = append(results, fetcher.PayloadsToReturn[height])
}
return results, nil
}

View File

@ -18,16 +18,19 @@ package mocks
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
type MockStorageFetcher struct { // ClosingStorageFetcher is a mock fetcher for use in tests without backfilling
type ClosingStorageFetcher struct {
DiffsToReturn []utils.StorageDiff DiffsToReturn []utils.StorageDiff
ErrsToReturn []error ErrsToReturn []error
} }
func NewMockStorageFetcher() *MockStorageFetcher { // NewClosingStorageFetcher returns a new ClosingStorageFetcher
return &MockStorageFetcher{} func NewClosingStorageFetcher() *ClosingStorageFetcher {
return &ClosingStorageFetcher{}
} }
func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { // FetchStorageDiffs mock method
func (fetcher *ClosingStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
defer close(out) defer close(out)
defer close(errs) defer close(errs)
for _, err := range fetcher.ErrsToReturn { for _, err := range fetcher.ErrsToReturn {
@ -37,3 +40,24 @@ func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDif
out <- diff out <- diff
} }
} }
// StorageFetcher is a mock fetcher for use in tests with backfilling
type StorageFetcher struct {
DiffsToReturn []utils.StorageDiff
ErrsToReturn []error
}
// NewStorageFetcher returns a new StorageFetcher
func NewStorageFetcher() *StorageFetcher {
return &StorageFetcher{}
}
// FetchStorageDiffs mock method
func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
for _, err := range fetcher.ErrsToReturn {
errs <- err
}
for _, diff := range fetcher.DiffsToReturn {
out <- diff
}
}

View File

@ -20,27 +20,31 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
) )
// MockStorageQueue for tests
type MockStorageQueue struct { type MockStorageQueue struct {
AddCalled bool AddCalled bool
AddError error AddError error
AddPassedDiff utils.StorageDiff AddPassedDiffs []utils.StorageDiff
DeleteErr error DeleteErr error
DeletePassedID int DeletePassedIds []int
GetAllErr error GetAllErr error
DiffsToReturn []utils.StorageDiff DiffsToReturn []utils.StorageDiff
} }
// Add mock method
func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error { func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error {
queue.AddCalled = true queue.AddCalled = true
queue.AddPassedDiff = diff queue.AddPassedDiffs = append(queue.AddPassedDiffs, diff)
return queue.AddError return queue.AddError
} }
// Delete mock method
func (queue *MockStorageQueue) Delete(id int) error { func (queue *MockStorageQueue) Delete(id int) error {
queue.DeletePassedID = id queue.DeletePassedIds = append(queue.DeletePassedIds, id)
return queue.DeleteErr return queue.DeleteErr
} }
// GetAll mock method
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) {
return queue.DiffsToReturn, queue.GetAllErr return queue.DiffsToReturn, queue.GetAllErr
} }

View File

@ -18,26 +18,31 @@ package mocks
import ( import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
// MockStorageTransformer for tests
type MockStorageTransformer struct { type MockStorageTransformer struct {
KeccakOfAddress common.Hash KeccakOfAddress common.Hash
ExecuteErr error ExecuteErr error
PassedDiff utils.StorageDiff PassedDiffs []utils.StorageDiff
} }
// Execute mock method
func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error { func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error {
transformer.PassedDiff = diff transformer.PassedDiffs = append(transformer.PassedDiffs, diff)
return transformer.ExecuteErr return transformer.ExecuteErr
} }
// KeccakContractAddress mock method
func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash { func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash {
return transformer.KeccakOfAddress return transformer.KeccakOfAddress
} }
// FakeTransformerInitializer mock method
func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer { func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer {
return transformer return transformer
} }

View File

@ -17,14 +17,14 @@
package repository_test package repository_test
import ( import (
"github.com/vulcanize/vulcanizedb/libraries/shared/repository"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"strings" "strings"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/repository"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"

View File

@ -17,12 +17,9 @@
package storage package storage
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -31,52 +28,39 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
) )
// IBackFiller is the backfilling interface // BackFiller is the backfilling interface
type IBackFiller interface { type BackFiller interface {
BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.StorageDiff, error) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error)
} }
// BackFiller is the backfilling struct // backFiller is the backfilling struct
type BackFiller struct { type backFiller struct {
sdf fetcher.IStateDiffFetcher fetcher fetcher.StateDiffFetcher
} }
// BackFillerArgs are used to pass configuration params to the backfiller // NewStorageBackFiller returns a BackFiller
type BackFillerArgs struct { func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher) BackFiller {
// mapping of hashed addresses to a list of the storage key hashes we want to collect at that address return &backFiller{
WantedStorage map[common.Hash][]common.Hash fetcher: fetcher,
StartingBlock uint64
EndingBlock uint64
}
// NewStorageBackFiller returns a IBackFiller
func NewStorageBackFiller(bc fetcher.BatchClient) IBackFiller {
return &BackFiller{
sdf: fetcher.NewStateDiffFetcher(bc),
} }
} }
// BackFill uses the provided config to fetch and return the state diff at the specified blocknumber // BackFill uses the provided config to fetch and return the state diff at the specified blocknumber
// StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) // StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
func (bf *BackFiller) BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.StorageDiff, error) { func (bf *backFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) {
results := make(map[common.Hash][]utils.StorageDiff, len(bfa.WantedStorage)) results := make([]utils.StorageDiff, 0)
if bfa.EndingBlock < bfa.StartingBlock { if endingBlock < startingBlock {
return nil, errors.New("backfill: ending block number needs to be greater than starting block number") return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
} }
blockHeights := make([]uint64, 0, bfa.EndingBlock-bfa.StartingBlock+1) blockHeights := make([]uint64, 0, endingBlock-startingBlock+1)
for i := bfa.StartingBlock; i <= bfa.EndingBlock; i++ { for i := startingBlock; i <= endingBlock; i++ {
blockHeights = append(blockHeights, i) blockHeights = append(blockHeights, i)
} }
payloads, err := bf.sdf.FetchStateDiffsAt(blockHeights) payloads, err := bf.fetcher.FetchStateDiffsAt(blockHeights)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, payload := range payloads { for _, payload := range payloads {
block := new(types.Block)
blockDecodeErr := rlp.DecodeBytes(payload.BlockRlp, block)
if blockDecodeErr != nil {
return nil, blockDecodeErr
}
stateDiff := new(statediff.StateDiff) stateDiff := new(statediff.StateDiff)
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
if stateDiffDecodeErr != nil { if stateDiffDecodeErr != nil {
@ -84,42 +68,20 @@ func (bf *BackFiller) BackFill(bfa BackFillerArgs) (map[common.Hash][]utils.Stor
} }
accounts := utils.GetAccountsFromDiff(*stateDiff) accounts := utils.GetAccountsFromDiff(*stateDiff)
for _, account := range accounts { for _, account := range accounts {
if wantedHashedAddress(bfa.WantedStorage, common.BytesToHash(account.Key)) { logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) for _, storage := range account.Storage {
for _, storage := range account.Storage { diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if wantedHashedStorageKey(bfa.WantedStorage[common.BytesToHash(account.Key)], storage.Key) { if formatErr != nil {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) return nil, formatErr
logrus.Trace("adding storage diff to out channel",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
if formatErr != nil {
return nil, formatErr
}
results[diff.HashedAddress] = append(results[diff.HashedAddress], diff)
}
} }
logrus.Trace("adding storage diff to results",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
results = append(results, diff)
} }
} }
} }
return results, nil return results, nil
} }
func wantedHashedAddress(wantedStorage map[common.Hash][]common.Hash, hashedKey common.Hash) bool {
for addrHash := range wantedStorage {
if bytes.Equal(addrHash.Bytes(), hashedKey.Bytes()) {
return true
}
}
return false
}
func wantedHashedStorageKey(wantedKeys []common.Hash, keyBytes []byte) bool {
for _, key := range wantedKeys {
if bytes.Equal(key.Bytes(), keyBytes) {
return true
}
}
return false
}

View File

@ -18,94 +18,43 @@ package storage_test
import ( import (
"bytes" "bytes"
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
) )
type mockClient struct {
MappedStateDiffAt map[uint64][]byte
}
// SetReturnDiffAt method to set what statediffs the mock client returns
func (mc *mockClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error {
if mc.MappedStateDiffAt == nil {
mc.MappedStateDiffAt = make(map[uint64][]byte)
}
by, err := json.Marshal(diffPayload)
if err != nil {
return err
}
mc.MappedStateDiffAt[height] = by
return nil
}
// BatchCall mockClient method to simulate batch call to geth
func (mc *mockClient) BatchCall(batch []client.BatchElem) error {
if mc.MappedStateDiffAt == nil {
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
}
for _, batchElem := range batch {
if len(batchElem.Args) != 1 {
return errors.New("expected batch elem to contain single argument")
}
blockHeight, ok := batchElem.Args[0].(uint64)
if !ok {
return errors.New("expected batch elem argument to be a uint64")
}
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
if err != nil {
return err
}
}
return nil
}
var _ = Describe("BackFiller", func() { var _ = Describe("BackFiller", func() {
Describe("BackFill", func() { Describe("BackFill", func() {
var ( var (
mc *mockClient fetcher *mocks.StateDiffFetcher
backFiller storage.IBackFiller backFiller storage.BackFiller
) )
BeforeEach(func() { BeforeEach(func() {
mc = new(mockClient) fetcher = new(mocks.StateDiffFetcher)
setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload) fetcher.SetPayloadsToReturn(map[uint64]*statediff.Payload{
Expect(setDiffAtErr1).ToNot(HaveOccurred()) test_data.BlockNumber.Uint64(): &test_data.MockStatediffPayload,
setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2) test_data.BlockNumber2.Uint64(): &test_data.MockStatediffPayload2,
Expect(setDiffAtErr2).ToNot(HaveOccurred()) })
backFiller = storage.NewStorageBackFiller(mc) backFiller = storage.NewStorageBackFiller(fetcher)
}) })
It("Batch calls statediff_stateDiffAt", func() { It("Batch calls statediff_stateDiffAt", func() {
backFillArgs := storage.BackFillerArgs{ backFillStorage, backFillErr := backFiller.BackFill(test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64())
WantedStorage: map[common.Hash][]common.Hash{
test_data.ContractLeafKey: {common.BytesToHash(test_data.StorageKey)},
test_data.AnotherContractLeafKey: {common.BytesToHash(test_data.StorageKey)},
},
StartingBlock: test_data.BlockNumber.Uint64(),
EndingBlock: test_data.BlockNumber2.Uint64(),
}
backFillStorage, backFillErr := backFiller.BackFill(backFillArgs)
Expect(backFillErr).ToNot(HaveOccurred()) Expect(backFillErr).ToNot(HaveOccurred())
Expect(len(backFillStorage)).To(Equal(2)) Expect(len(backFillStorage)).To(Equal(4))
Expect(len(backFillStorage[test_data.ContractLeafKey])).To(Equal(1))
Expect(len(backFillStorage[test_data.AnotherContractLeafKey])).To(Equal(3))
Expect(backFillStorage[test_data.ContractLeafKey][0]).To(Equal(test_data.CreatedExpectedStorageDiff))
// Can only rlp encode the slice of diffs as part of a struct // Can only rlp encode the slice of diffs as part of a struct
// Rlp encoding allows us to compare content of the slices when the order in the slice may vary // Rlp encoding allows us to compare content of the slices when the order in the slice may vary
expectedDiffStruct := struct { expectedDiffStruct := struct {
diffs []utils.StorageDiff diffs []utils.StorageDiff
}{ }{
[]utils.StorageDiff{ []utils.StorageDiff{
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff, test_data.UpdatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2, test_data.UpdatedExpectedStorageDiff2,
test_data.DeletedExpectedStorageDiff, test_data.DeletedExpectedStorageDiff,
@ -116,40 +65,11 @@ var _ = Describe("BackFiller", func() {
receivedDiffStruct := struct { receivedDiffStruct := struct {
diffs []utils.StorageDiff diffs []utils.StorageDiff
}{ }{
backFillStorage[test_data.AnotherContractLeafKey], backFillStorage,
} }
receivedDiffBytes, rlpErr2 := rlp.EncodeToBytes(receivedDiffStruct) receivedDiffBytes, rlpErr2 := rlp.EncodeToBytes(receivedDiffStruct)
Expect(rlpErr2).ToNot(HaveOccurred()) Expect(rlpErr2).ToNot(HaveOccurred())
Expect(bytes.Equal(expectedDiffBytes, receivedDiffBytes)).To(BeTrue()) Expect(bytes.Equal(expectedDiffBytes, receivedDiffBytes)).To(BeTrue())
}) })
It("Only returns storage for provided addresses (address hashes)", func() {
backFillArgs := storage.BackFillerArgs{
WantedStorage: map[common.Hash][]common.Hash{
test_data.ContractLeafKey: {common.BytesToHash(test_data.StorageKey)},
},
StartingBlock: test_data.BlockNumber.Uint64(),
EndingBlock: test_data.BlockNumber2.Uint64(),
}
backFillStorage, backFillErr := backFiller.BackFill(backFillArgs)
Expect(backFillErr).ToNot(HaveOccurred())
Expect(len(backFillStorage)).To(Equal(1))
Expect(len(backFillStorage[test_data.ContractLeafKey])).To(Equal(1))
Expect(len(backFillStorage[test_data.AnotherContractLeafKey])).To(Equal(0))
Expect(backFillStorage[test_data.ContractLeafKey][0]).To(Equal(test_data.CreatedExpectedStorageDiff))
})
It("Only returns storage for provided storage keys", func() {
backFillArgs := storage.BackFillerArgs{
WantedStorage: map[common.Hash][]common.Hash{
test_data.ContractLeafKey: nil,
},
StartingBlock: test_data.BlockNumber.Uint64(),
EndingBlock: test_data.BlockNumber2.Uint64(),
}
backFillStorage, backFillErr := backFiller.BackFill(backFillArgs)
Expect(backFillErr).ToNot(HaveOccurred())
Expect(len(backFillStorage)).To(Equal(0))
})
}) })
}) })

View File

@ -151,6 +151,7 @@ var (
} }
CreatedExpectedStorageDiff = utils.StorageDiff{ CreatedExpectedStorageDiff = utils.StorageDiff{
Id: 1333,
HashedAddress: common.BytesToHash(ContractLeafKey[:]), HashedAddress: common.BytesToHash(ContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),
@ -158,6 +159,7 @@ var (
StorageValue: common.BytesToHash(SmallStorageValue), StorageValue: common.BytesToHash(SmallStorageValue),
} }
UpdatedExpectedStorageDiff = utils.StorageDiff{ UpdatedExpectedStorageDiff = utils.StorageDiff{
Id: 1334,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),
@ -165,6 +167,7 @@ var (
StorageValue: common.BytesToHash(LargeStorageValue), StorageValue: common.BytesToHash(LargeStorageValue),
} }
UpdatedExpectedStorageDiff2 = utils.StorageDiff{ UpdatedExpectedStorageDiff2 = utils.StorageDiff{
Id: 1335,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: int(BlockNumber2.Int64()), BlockHeight: int(BlockNumber2.Int64()),
@ -172,6 +175,7 @@ var (
StorageValue: common.BytesToHash(SmallStorageValue), StorageValue: common.BytesToHash(SmallStorageValue),
} }
DeletedExpectedStorageDiff = utils.StorageDiff{ DeletedExpectedStorageDiff = utils.StorageDiff{
Id: 1336,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),

View File

@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
@ -31,7 +32,8 @@ import (
type IStorageWatcher interface { type IStorageWatcher interface {
AddTransformers(initializers []transformer.StorageTransformerInitializer) AddTransformers(initializers []transformer.StorageTransformerInitializer)
Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) Execute(queueRecheckInterval time.Duration, backFill bool)
BackFill(backFiller storage.BackFiller, minDeploymentBlock int)
} }
type StorageWatcher struct { type StorageWatcher struct {
@ -39,6 +41,9 @@ type StorageWatcher struct {
StorageFetcher fetcher.IStorageFetcher StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue Queue storage.IStorageQueue
KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer
DiffsChan chan utils.StorageDiff
ErrsChan chan error
StartingSyncBlockChan chan int
} }
func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher {
@ -47,6 +52,9 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage
return StorageWatcher{ return StorageWatcher{
db: db, db: db,
StorageFetcher: fetcher, StorageFetcher: fetcher,
DiffsChan: make(chan utils.StorageDiff),
ErrsChan: make(chan error),
StartingSyncBlockChan: make(chan int),
Queue: queue, Queue: queue,
KeccakAddressTransformers: transformers, KeccakAddressTransformers: transformers,
} }
@ -59,14 +67,33 @@ func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.
} }
} }
func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) { // BackFill configures the StorageWatcher to backfill missed storage diffs using a modified archival geth client
func (storageWatcher StorageWatcher) BackFill(backFiller storage.BackFiller, minDeploymentBlock int) {
// need to learn which block to perform the backfill process up to
// this blocks until the Execute process sends us the first block number it sees
startingSyncBlock := <-storageWatcher.StartingSyncBlockChan
backFilledDiffs, err := backFiller.BackFill(uint64(minDeploymentBlock), uint64(startingSyncBlock))
if err != nil {
storageWatcher.ErrsChan <- err
}
for _, storageDiff := range backFilledDiffs {
storageWatcher.DiffsChan <- storageDiff
}
}
func (storageWatcher StorageWatcher) Execute(queueRecheckInterval time.Duration, backFill bool) {
ticker := time.NewTicker(queueRecheckInterval) ticker := time.NewTicker(queueRecheckInterval)
go storageWatcher.StorageFetcher.FetchStorageDiffs(diffsChan, errsChan) go storageWatcher.StorageFetcher.FetchStorageDiffs(storageWatcher.DiffsChan, storageWatcher.ErrsChan)
start := true
for { for {
select { select {
case fetchErr := <-errsChan: case fetchErr := <-storageWatcher.ErrsChan:
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr)) logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr))
case diff := <-diffsChan: case diff := <-storageWatcher.DiffsChan:
if start && backFill {
storageWatcher.StartingSyncBlockChan <- diff.BlockHeight - 1
start = false
}
storageWatcher.processRow(diff) storageWatcher.processRow(diff)
case <-ticker.C: case <-ticker.C:
storageWatcher.processQueue() storageWatcher.processQueue()

View File

@ -22,11 +22,14 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
@ -38,7 +41,7 @@ var _ = Describe("Storage Watcher", func() {
It("adds transformers", func() { It("adds transformers", func() {
fakeHashedAddress := utils.HexToKeccak256Hash("0x12345") fakeHashedAddress := utils.HexToKeccak256Hash("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress} fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress}
w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) w := watcher.NewStorageWatcher(mocks.NewClosingStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
@ -48,21 +51,17 @@ var _ = Describe("Storage Watcher", func() {
Describe("Execute", func() { Describe("Execute", func() {
var ( var (
errs chan error mockFetcher *mocks.ClosingStorageFetcher
mockFetcher *mocks.MockStorageFetcher
mockQueue *mocks.MockStorageQueue mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff csvDiff utils.StorageDiff
diffs chan utils.StorageDiff
storageWatcher watcher.StorageWatcher storageWatcher watcher.StorageWatcher
hashedAddress common.Hash hashedAddress common.Hash
) )
BeforeEach(func() { BeforeEach(func() {
errs = make(chan error)
diffs = make(chan utils.StorageDiff)
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewMockStorageFetcher() mockFetcher = mocks.NewClosingStorageFetcher()
mockQueue = &mocks.MockStorageQueue{} mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
csvDiff = utils.StorageDiff{ csvDiff = utils.StorageDiff{
@ -85,7 +84,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour) go storageWatcher.Execute(time.Hour, false)
Eventually(func() (string, error) { Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name()) logContent, err := ioutil.ReadFile(tempFile.Name())
@ -103,25 +102,28 @@ var _ = Describe("Storage Watcher", func() {
}) })
It("executes transformer for recognized storage diff", func(done Done) { It("executes transformer for recognized storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Hour) go storageWatcher.Execute(time.Hour, false)
Eventually(func() utils.StorageDiff { Eventually(func() []utils.StorageDiff {
return mockTransformer.PassedDiff return mockTransformer.PassedDiffs
}).Should(Equal(csvDiff)) }).Should(Equal([]utils.StorageDiff{csvDiff}))
close(done) close(done)
}) })
It("queues diff for later processing if transformer execution fails", func(done Done) { It("queues diff for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError mockTransformer.ExecuteErr = fakes.FakeError
go storageWatcher.Execute(diffs, errs, time.Hour) go storageWatcher.Execute(time.Hour, false)
Expect(<-errs).To(BeNil()) Expect(<-storageWatcher.ErrsChan).To(BeNil())
Eventually(func() bool { Eventually(func() bool {
return mockQueue.AddCalled return mockQueue.AddCalled
}).Should(BeTrue()) }).Should(BeTrue())
Eventually(func() utils.StorageDiff { Eventually(func() utils.StorageDiff {
return mockQueue.AddPassedDiff if len(mockQueue.AddPassedDiffs) > 0 {
return mockQueue.AddPassedDiffs[0]
}
return utils.StorageDiff{}
}).Should(Equal(csvDiff)) }).Should(Equal(csvDiff))
close(done) close(done)
}) })
@ -134,7 +136,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Hour) go storageWatcher.Execute(time.Hour, false)
Eventually(func() bool { Eventually(func() bool {
return mockQueue.AddCalled return mockQueue.AddCalled
@ -156,19 +158,25 @@ var _ = Describe("Storage Watcher", func() {
}) })
It("executes transformer for storage diff", func(done Done) { It("executes transformer for storage diff", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() utils.StorageDiff { Eventually(func() utils.StorageDiff {
return mockTransformer.PassedDiff if len(mockTransformer.PassedDiffs) > 0 {
return mockTransformer.PassedDiffs[0]
}
return utils.StorageDiff{}
}).Should(Equal(csvDiff)) }).Should(Equal(csvDiff))
close(done) close(done)
}) })
It("deletes diff from queue if transformer execution successful", func(done Done) { It("deletes diff from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int { Eventually(func() int {
return mockQueue.DeletePassedID if len(mockQueue.DeletePassedIds) > 0 {
return mockQueue.DeletePassedIds[0]
}
return 0
}).Should(Equal(csvDiff.ID)) }).Should(Equal(csvDiff.ID))
close(done) close(done)
}) })
@ -180,7 +188,7 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) { Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name()) logContent, err := ioutil.ReadFile(tempFile.Name())
@ -196,10 +204,13 @@ var _ = Describe("Storage Watcher", func() {
} }
mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff}
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int { Eventually(func() int {
return mockQueue.DeletePassedID if len(mockQueue.DeletePassedIds) > 0 {
return mockQueue.DeletePassedIds[0]
}
return 0
}).Should(Equal(obsoleteDiff.ID)) }).Should(Equal(obsoleteDiff.ID))
close(done) close(done)
}) })
@ -216,7 +227,127 @@ var _ = Describe("Storage Watcher", func() {
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(diffs, errs, time.Nanosecond) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
})
})
})
Describe("BackFill", func() {
var (
mockFetcher *mocks.StorageFetcher
mockBackFiller *mocks.BackFiller
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff
storageWatcher watcher.StorageWatcher
hashedAddress common.Hash
)
BeforeEach(func() {
mockBackFiller = new(mocks.BackFiller)
mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
test_data.DeletedExpectedStorageDiff})
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewStorageFetcher()
mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
csvDiff = utils.StorageDiff{
ID: 1337,
HashedAddress: hashedAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: int(test_data.BlockNumber2.Int64()) + 1,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
})
Describe("transforming streamed and backfilled queued storage diffs", func() {
BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff}
mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff,
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
test_data.DeletedExpectedStorageDiff}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
})
It("executes transformer for storage diffs", func(done Done) {
go storageWatcher.BackFill(mockBackFiller, int(test_data.BlockNumber.Uint64()))
go storageWatcher.Execute(time.Nanosecond, true)
expectedDiffsStruct := struct {
diffs []utils.StorageDiff
}{
[]utils.StorageDiff{
csvDiff,
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
test_data.DeletedExpectedStorageDiff,
},
}
expectedDiffsBytes, rlpErr1 := rlp.EncodeToBytes(expectedDiffsStruct)
Expect(rlpErr1).ToNot(HaveOccurred())
Eventually(func() []byte {
diffsStruct := struct {
diffs []utils.StorageDiff
}{
mockTransformer.PassedDiffs,
}
diffsBytes, rlpErr2 := rlp.EncodeToBytes(diffsStruct)
Expect(rlpErr2).ToNot(HaveOccurred())
return diffsBytes
}).Should(Equal(expectedDiffsBytes))
close(done)
})
It("deletes diffs from queue if transformer execution is successful", func(done Done) {
go storageWatcher.Execute(time.Nanosecond, false)
expectedIdsStruct := struct {
diffs []int
}{
[]int{
csvDiff.ID,
test_data.CreatedExpectedStorageDiff.ID,
test_data.UpdatedExpectedStorageDiff.ID,
test_data.UpdatedExpectedStorageDiff2.ID,
test_data.DeletedExpectedStorageDiff.ID,
},
}
expectedIdsBytes, rlpErr1 := rlp.EncodeToBytes(expectedIdsStruct)
Expect(rlpErr1).ToNot(HaveOccurred())
Eventually(func() []byte {
idsStruct := struct {
diffs []int
}{
mockQueue.DeletePassedIds,
}
idsBytes, rlpErr2 := rlp.EncodeToBytes(idsStruct)
Expect(rlpErr2).ToNot(HaveOccurred())
return idsBytes
}).Should(Equal(expectedIdsBytes))
close(done)
})
It("logs error if deleting persisted diff fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() (string, error) { Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name()) logContent, err := ioutil.ReadFile(tempFile.Name())