diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go index 7dd7da9d..a9cfd981 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go @@ -26,20 +26,24 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" ) +const ( + PayloadChanBufferSize = 20000 // the max eth sub buffer size +) + type GethRPCStorageFetcher struct { - statediffPayloadChan chan statediff.Payload + StatediffPayloadChan chan statediff.Payload streamer streamer.Streamer } -func NewGethRPCStorageFetcher(streamer streamer.Streamer, statediffPayloadChan chan statediff.Payload) GethRPCStorageFetcher { +func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher { return GethRPCStorageFetcher{ - statediffPayloadChan: statediffPayloadChan, + StatediffPayloadChan: make(chan statediff.Payload, PayloadChanBufferSize), streamer: streamer, } } func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { - ethStatediffPayloadChan := fetcher.statediffPayloadChan + ethStatediffPayloadChan := fetcher.StatediffPayloadChan clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) if clientSubErr != nil { errs <- clientSubErr diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go index dff1b863..becfaeb2 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go @@ -59,15 +59,13 @@ func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payloa var _ = Describe("Geth RPC Storage Fetcher", func() { var streamer MockStoragediffStreamer - var statediffPayloadChan chan statediff.Payload var statediffFetcher fetcher.GethRPCStorageFetcher var storagediffChan chan utils.StorageDiff var errorChan chan error BeforeEach(func() { streamer = MockStoragediffStreamer{} - statediffPayloadChan = make(chan statediff.Payload, 1) - statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer, statediffPayloadChan) + statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer) storagediffChan = make(chan utils.StorageDiff) errorChan = make(chan error) }) @@ -91,9 +89,9 @@ var _ = Describe("Geth RPC Storage Fetcher", func() { go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan) - streamedPayload := <-statediffPayloadChan + streamedPayload := <-statediffFetcher.StatediffPayloadChan Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload)) - Expect(streamer.PassedPayloadChan).To(Equal(statediffPayloadChan)) + Expect(streamer.PassedPayloadChan).To(Equal(statediffFetcher.StatediffPayloadChan)) close(done) }) diff --git a/libraries/shared/fetcher/state_diff_fetcher.go b/libraries/shared/fetcher/state_diff_fetcher.go index a24f94f8..fe4c16c6 100644 --- a/libraries/shared/fetcher/state_diff_fetcher.go +++ b/libraries/shared/fetcher/state_diff_fetcher.go @@ -25,7 +25,7 @@ import ( // StateDiffFetcher is the state diff fetching interface type StateDiffFetcher interface { - FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) + FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) } // BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion @@ -35,6 +35,8 @@ type BatchClient interface { // stateDiffFetcher is the state diff fetching struct type stateDiffFetcher struct { + // stateDiffFetcher is thread-safe as long as the underlying client is thread-safe, since it has/modifies no other state + // http.Client is thread-safe client BatchClient } @@ -49,7 +51,7 @@ func NewStateDiffFetcher(bc BatchClient) StateDiffFetcher { // FetchStateDiffsAt fetches the statediff payloads at the given block heights // Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) -func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) { +func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) { batch := make([]client.BatchElem, 0) for _, height := range blockHeights { batch = append(batch, client.BatchElem{ @@ -62,12 +64,15 @@ func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*st if batchErr != nil { return nil, fmt.Errorf("stateDiffFetcher err: %s", batchErr.Error()) } - results := make([]*statediff.Payload, 0, len(blockHeights)) + results := make([]statediff.Payload, 0, len(blockHeights)) for _, batchElem := range batch { if batchElem.Error != nil { return nil, fmt.Errorf("stateDiffFetcher err: %s", batchElem.Error.Error()) } - results = append(results, batchElem.Result.(*statediff.Payload)) + payload, ok := batchElem.Result.(*statediff.Payload) + if ok { + results = append(results, *payload) + } } return results, nil } diff --git a/libraries/shared/fetcher/state_diff_fetcher_test.go b/libraries/shared/fetcher/state_diff_fetcher_test.go index 257032cc..96c1dc8d 100644 --- a/libraries/shared/fetcher/state_diff_fetcher_test.go +++ b/libraries/shared/fetcher/state_diff_fetcher_test.go @@ -17,10 +17,6 @@ package fetcher_test import ( - "bytes" - - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -51,26 +47,8 @@ var _ = Describe("StateDiffFetcher", func() { stateDiffPayloads, fetchErr := stateDiffFetcher.FetchStateDiffsAt(blockHeights) Expect(fetchErr).ToNot(HaveOccurred()) Expect(len(stateDiffPayloads)).To(Equal(2)) - // Can only rlp encode the slice of diffs as part of a struct - // Rlp encoding allows us to compare content of the slices when the order in the slice may vary - expectedPayloadsStruct := struct { - payloads []*statediff.Payload - }{ - []*statediff.Payload{ - &test_data.MockStatediffPayload, - &test_data.MockStatediffPayload2, - }, - } - expectedPayloadsBytes, rlpErr1 := rlp.EncodeToBytes(expectedPayloadsStruct) - Expect(rlpErr1).ToNot(HaveOccurred()) - receivedPayloadsStruct := struct { - payloads []*statediff.Payload - }{ - stateDiffPayloads, - } - receivedPayloadsBytes, rlpErr2 := rlp.EncodeToBytes(receivedPayloadsStruct) - Expect(rlpErr2).ToNot(HaveOccurred()) - Expect(bytes.Equal(expectedPayloadsBytes, receivedPayloadsBytes)).To(BeTrue()) + Expect(stateDiffPayloads[0]).To(Equal(test_data.MockStatediffPayload)) + Expect(stateDiffPayloads[1]).To(Equal(test_data.MockStatediffPayload2)) }) }) }) diff --git a/libraries/shared/mocks/backfiller.go b/libraries/shared/mocks/backfiller.go index 26754998..43436d8a 100644 --- a/libraries/shared/mocks/backfiller.go +++ b/libraries/shared/mocks/backfiller.go @@ -16,14 +16,18 @@ package mocks -import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" +import ( + "errors" + + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" +) // BackFiller mock for tests type BackFiller struct { StorageDiffsToReturn []utils.StorageDiff - BackFillErr error - PassedStartingBlock uint64 + BackFillErrs []error PassedEndingBlock uint64 + StartingBlock uint64 } // SetStorageDiffsToReturn for tests @@ -32,8 +36,24 @@ func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) } // BackFill mock method -func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) { - backFiller.PassedStartingBlock = startingBlock +func (backFiller *BackFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { + if endingBlock < backFiller.StartingBlock { + return errors.New("backfill: ending block number needs to be greater than starting block number") + } backFiller.PassedEndingBlock = endingBlock - return backFiller.StorageDiffsToReturn, backFiller.BackFillErr + go func(backFill chan utils.StorageDiff, errChan chan error, done chan bool) { + errLen := len(backFiller.BackFillErrs) + for i, diff := range backFiller.StorageDiffsToReturn { + if i < errLen { + err := backFiller.BackFillErrs[i] + if err != nil { + errChan <- err + continue + } + } + backFill <- diff + } + done <- true + }(backFill, errChan, done) + return nil } diff --git a/libraries/shared/mocks/state_diff_fetcher.go b/libraries/shared/mocks/state_diff_fetcher.go index 52c06803..03a38dc8 100644 --- a/libraries/shared/mocks/state_diff_fetcher.go +++ b/libraries/shared/mocks/state_diff_fetcher.go @@ -18,31 +18,33 @@ package mocks import ( "errors" + "sync/atomic" "github.com/ethereum/go-ethereum/statediff" ) // StateDiffFetcher mock for tests type StateDiffFetcher struct { - PayloadsToReturn map[uint64]*statediff.Payload - FetchErr error + PayloadsToReturn map[uint64]statediff.Payload + FetchErrs map[uint64]error CalledAtBlockHeights [][]uint64 -} - -// SetPayloadsToReturn for tests -func (fetcher *StateDiffFetcher) SetPayloadsToReturn(payloads map[uint64]*statediff.Payload) { - fetcher.PayloadsToReturn = payloads + CalledTimes int64 } // FetchStateDiffsAt mock method -func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) { - fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights) +func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) { if fetcher.PayloadsToReturn == nil { - return nil, errors.New("MockStateDiffFetcher needs to be initialized with payloads to return") + return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return") } - results := make([]*statediff.Payload, 0, len(blockHeights)) + atomic.AddInt64(&fetcher.CalledTimes, 1) // thread-safe increment + fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights) + results := make([]statediff.Payload, 0, len(blockHeights)) for _, height := range blockHeights { results = append(results, fetcher.PayloadsToReturn[height]) + err, ok := fetcher.FetchErrs[height] + if ok && err != nil { + return nil, err + } } return results, nil } diff --git a/libraries/shared/mocks/statediff_streamer.go b/libraries/shared/mocks/statediff_streamer.go new file mode 100644 index 00000000..cd387ee6 --- /dev/null +++ b/libraries/shared/mocks/statediff_streamer.go @@ -0,0 +1,43 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import ( + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" +) + +// StateDiffStreamer is the underlying struct for the Streamer interface +type StateDiffStreamer struct { + PassedPayloadChan chan statediff.Payload + ReturnSub *rpc.ClientSubscription + ReturnErr error + StreamPayloads []statediff.Payload +} + +// Stream is the main loop for subscribing to data from the Geth state diff process +func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { + sds.PassedPayloadChan = payloadChan + + go func() { + for _, payload := range sds.StreamPayloads { + sds.PassedPayloadChan <- payload + } + }() + + return sds.ReturnSub, sds.ReturnErr +} diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go index 862470f7..d4928eeb 100644 --- a/libraries/shared/mocks/storage_fetcher.go +++ b/libraries/shared/mocks/storage_fetcher.go @@ -16,30 +16,9 @@ package mocks -import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" - -// ClosingStorageFetcher is a mock fetcher for use in tests without backfilling -type ClosingStorageFetcher struct { - DiffsToReturn []utils.StorageDiff - ErrsToReturn []error -} - -// NewClosingStorageFetcher returns a new ClosingStorageFetcher -func NewClosingStorageFetcher() *ClosingStorageFetcher { - return &ClosingStorageFetcher{} -} - -// FetchStorageDiffs mock method -func (fetcher *ClosingStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { - defer close(out) - defer close(errs) - for _, err := range fetcher.ErrsToReturn { - errs <- err - } - for _, diff := range fetcher.DiffsToReturn { - out <- diff - } -} +import ( + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" +) // StorageFetcher is a mock fetcher for use in tests with backfilling type StorageFetcher struct { diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index a3ecc709..84a80a30 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -29,6 +29,7 @@ type MockStorageQueue struct { DeletePassedIds []int GetAllErr error DiffsToReturn []utils.StorageDiff + GetAllCalled bool } // Add mock method @@ -41,10 +42,18 @@ func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error { // Delete mock method func (queue *MockStorageQueue) Delete(id int) error { queue.DeletePassedIds = append(queue.DeletePassedIds, id) + diffsToReturn := make([]utils.StorageDiff, 0) + for _, diff := range queue.DiffsToReturn { + if diff.Id != id { + diffsToReturn = append(diffsToReturn, diff) + } + } + queue.DiffsToReturn = diffsToReturn return queue.DeleteErr } // GetAll mock method func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { + queue.GetAllCalled = true return queue.DiffsToReturn, queue.GetAllErr } diff --git a/libraries/shared/storage/backfill.go b/libraries/shared/storage/backfill.go deleted file mode 100644 index 4cbe29ac..00000000 --- a/libraries/shared/storage/backfill.go +++ /dev/null @@ -1,87 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package storage - -import ( - "errors" - "fmt" - - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff" - "github.com/sirupsen/logrus" - - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" -) - -// BackFiller is the backfilling interface -type BackFiller interface { - BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) -} - -// backFiller is the backfilling struct -type backFiller struct { - fetcher fetcher.StateDiffFetcher -} - -// NewStorageBackFiller returns a BackFiller -func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher) BackFiller { - return &backFiller{ - fetcher: fetcher, - } -} - -// BackFill uses the provided config to fetch and return the state diff at the specified blocknumber -// StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) -func (bf *backFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) { - results := make([]utils.StorageDiff, 0) - if endingBlock < startingBlock { - return nil, errors.New("backfill: ending block number needs to be greater than starting block number") - } - blockHeights := make([]uint64, 0, endingBlock-startingBlock+1) - for i := startingBlock; i <= endingBlock; i++ { - blockHeights = append(blockHeights, i) - } - payloads, err := bf.fetcher.FetchStateDiffsAt(blockHeights) - if err != nil { - return nil, err - } - for _, payload := range payloads { - stateDiff := new(statediff.StateDiff) - stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) - if stateDiffDecodeErr != nil { - return nil, stateDiffDecodeErr - } - accounts := utils.GetAccountsFromDiff(*stateDiff) - for _, account := range accounts { - logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) - for _, storage := range account.Storage { - diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) - if formatErr != nil { - return nil, formatErr - } - logrus.Trace("adding storage diff to results", - "keccak of address: ", diff.HashedAddress.Hex(), - "block height: ", diff.BlockHeight, - "storage key: ", diff.StorageKey.Hex(), - "storage value: ", diff.StorageValue.Hex()) - results = append(results, diff) - } - } - } - return results, nil -} diff --git a/libraries/shared/storage/backfill_test.go b/libraries/shared/storage/backfill_test.go deleted file mode 100644 index 4b3304e2..00000000 --- a/libraries/shared/storage/backfill_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package storage_test - -import ( - "bytes" - - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" - "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" -) - -var _ = Describe("BackFiller", func() { - Describe("BackFill", func() { - var ( - fetcher *mocks.StateDiffFetcher - backFiller storage.BackFiller - ) - BeforeEach(func() { - fetcher = new(mocks.StateDiffFetcher) - fetcher.SetPayloadsToReturn(map[uint64]*statediff.Payload{ - test_data.BlockNumber.Uint64(): &test_data.MockStatediffPayload, - test_data.BlockNumber2.Uint64(): &test_data.MockStatediffPayload2, - }) - backFiller = storage.NewStorageBackFiller(fetcher) - }) - It("Batch calls statediff_stateDiffAt", func() { - backFillStorage, backFillErr := backFiller.BackFill(test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64()) - Expect(backFillErr).ToNot(HaveOccurred()) - Expect(len(backFillStorage)).To(Equal(4)) - // Can only rlp encode the slice of diffs as part of a struct - // Rlp encoding allows us to compare content of the slices when the order in the slice may vary - expectedDiffStruct := struct { - diffs []utils.StorageDiff - }{ - []utils.StorageDiff{ - test_data.CreatedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff2, - test_data.DeletedExpectedStorageDiff, - }, - } - expectedDiffBytes, rlpErr1 := rlp.EncodeToBytes(expectedDiffStruct) - Expect(rlpErr1).ToNot(HaveOccurred()) - receivedDiffStruct := struct { - diffs []utils.StorageDiff - }{ - backFillStorage, - } - receivedDiffBytes, rlpErr2 := rlp.EncodeToBytes(receivedDiffStruct) - Expect(rlpErr2).ToNot(HaveOccurred()) - Expect(bytes.Equal(expectedDiffBytes, receivedDiffBytes)).To(BeTrue()) - }) - }) -}) diff --git a/libraries/shared/storage/backfiller.go b/libraries/shared/storage/backfiller.go new file mode 100644 index 00000000..ffdb9386 --- /dev/null +++ b/libraries/shared/storage/backfiller.go @@ -0,0 +1,158 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package storage + +import ( + "errors" + "fmt" + "sync/atomic" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff" + "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" +) + +const ( + DefaultMaxBatchSize uint64 = 5000 + defaultMaxBatchNumber int64 = 100 +) + +// BackFiller is the backfilling interface +type BackFiller interface { + BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error +} + +// backFiller is the backfilling struct +type backFiller struct { + fetcher fetcher.StateDiffFetcher + batchSize uint64 + startingBlock uint64 +} + +// NewStorageBackFiller returns a BackFiller +func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, startingBlock, batchSize uint64) BackFiller { + if batchSize == 0 { + batchSize = DefaultMaxBatchSize + } + return &backFiller{ + fetcher: fetcher, + batchSize: batchSize, + startingBlock: startingBlock, + } +} + +// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks +// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently +func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { + if endingBlock < bf.startingBlock { + return errors.New("backfill: ending block number needs to be greater than starting block number") + } + // break the range up into bins of smaller ranges + length := endingBlock - bf.startingBlock + 1 + numberOfBins := length / bf.batchSize + remainder := length % bf.batchSize + if remainder != 0 { + numberOfBins++ + } + blockRangeBins := make([][]uint64, numberOfBins) + for i := range blockRangeBins { + nextBinStart := bf.startingBlock + uint64(bf.batchSize) + if nextBinStart > endingBlock { + nextBinStart = endingBlock + 1 + } + blockRange := make([]uint64, 0, nextBinStart-bf.startingBlock+1) + for j := bf.startingBlock; j < nextBinStart; j++ { + blockRange = append(blockRange, j) + } + bf.startingBlock = nextBinStart + blockRangeBins[i] = blockRange + } + + // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have + var activeCount int64 + // channel for processing goroutines to signal when they are done + processingDone := make(chan bool) + + // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range + go func() { + for _, blockHeights := range blockRangeBins { + // if we have reached our limit of active goroutines + // wait for one to finish before starting the next + if atomic.AddInt64(&activeCount, 1) > defaultMaxBatchNumber { + // this blocks until a process signals it has finished + // immediately forwards the signal to the normal listener so that it keeps the correct count + processingDone <- <-processingDone + } + go func(blockHeights []uint64) { + payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights) + if fetchErr != nil { + errChan <- fetchErr + } + for _, payload := range payloads { + stateDiff := new(statediff.StateDiff) + stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) + if stateDiffDecodeErr != nil { + errChan <- stateDiffDecodeErr + continue + } + accounts := utils.GetAccountsFromDiff(*stateDiff) + for _, account := range accounts { + logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) + for _, storage := range account.Storage { + diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) + if formatErr != nil { + errChan <- formatErr + continue + } + logrus.Trace("adding storage diff to results", + "keccak of address: ", diff.HashedAddress.Hex(), + "block height: ", diff.BlockHeight, + "storage key: ", diff.StorageKey.Hex(), + "storage value: ", diff.StorageValue.Hex()) + backFill <- diff + } + } + } + // when this goroutine is done, send out a signal + processingDone <- true + }(blockHeights) + } + }() + + // goroutine that listens on the processingDone chan + // keeps track of the number of processing goroutines that have finished + // when they have all finished, sends the final signal out + go func() { + goroutinesFinished := 0 + for { + select { + case <-processingDone: + atomic.AddInt64(&activeCount, -1) + goroutinesFinished++ + if goroutinesFinished == int(numberOfBins) { + done <- true + return + } + } + } + }() + + return nil +} diff --git a/libraries/shared/storage/backfiller_test.go b/libraries/shared/storage/backfiller_test.go new file mode 100644 index 00000000..1a893b39 --- /dev/null +++ b/libraries/shared/storage/backfiller_test.go @@ -0,0 +1,232 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package storage_test + +import ( + "errors" + + "github.com/ethereum/go-ethereum/statediff" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" +) + +var _ = Describe("BackFiller", func() { + Describe("BackFill", func() { + var ( + mockFetcher *mocks.StateDiffFetcher + backFiller storage.BackFiller + ) + BeforeEach(func() { + mockFetcher = new(mocks.StateDiffFetcher) + mockFetcher.PayloadsToReturn = map[uint64]statediff.Payload{ + test_data.BlockNumber.Uint64(): test_data.MockStatediffPayload, + test_data.BlockNumber2.Uint64(): test_data.MockStatediffPayload2, + } + }) + + It("batch calls statediff_stateDiffAt", func() { + backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 100) + backFill := make(chan utils.StorageDiff) + done := make(chan bool) + errChan := make(chan error) + backFillInitErr := backFiller.BackFill( + test_data.BlockNumber2.Uint64(), + backFill, + errChan, + done) + Expect(backFillInitErr).ToNot(HaveOccurred()) + var diffs []utils.StorageDiff + for { + select { + case diff := <-backFill: + diffs = append(diffs, diff) + continue + case err := <-errChan: + Expect(err).ToNot(HaveOccurred()) + continue + case <-done: + break + } + break + } + Expect(mockFetcher.CalledTimes).To(Equal(int64(1))) + Expect(len(diffs)).To(Equal(4)) + Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue()) + Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue()) + Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue()) + Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue()) + }) + + It("has a configurable batch size", func() { + backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1) + backFill := make(chan utils.StorageDiff) + done := make(chan bool) + errChan := make(chan error) + backFillInitErr := backFiller.BackFill( + test_data.BlockNumber2.Uint64(), + backFill, + errChan, + done) + Expect(backFillInitErr).ToNot(HaveOccurred()) + var diffs []utils.StorageDiff + for { + select { + case diff := <-backFill: + diffs = append(diffs, diff) + continue + case err := <-errChan: + Expect(err).ToNot(HaveOccurred()) + continue + case <-done: + break + } + break + } + Expect(mockFetcher.CalledTimes).To(Equal(int64(2))) + Expect(len(diffs)).To(Equal(4)) + Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue()) + Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue()) + Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue()) + Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue()) + }) + + It("handles bin numbers in excess of the goroutine limit (100)", func() { + payloadsToReturn := make(map[uint64]statediff.Payload, 1001) + for i := test_data.BlockNumber.Uint64(); i <= test_data.BlockNumber.Uint64()+1000; i++ { + payloadsToReturn[i] = test_data.MockStatediffPayload + } + mockFetcher.PayloadsToReturn = payloadsToReturn + // batch size of 2 with 1001 block range => 501 bins + backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 2) + backFill := make(chan utils.StorageDiff) + done := make(chan bool) + errChan := make(chan error) + backFillInitErr := backFiller.BackFill( + test_data.BlockNumber.Uint64()+1000, + backFill, + errChan, + done) + Expect(backFillInitErr).ToNot(HaveOccurred()) + var diffs []utils.StorageDiff + for { + select { + case diff := <-backFill: + diffs = append(diffs, diff) + continue + case err := <-errChan: + Expect(err).ToNot(HaveOccurred()) + continue + case <-done: + break + } + break + } + Expect(mockFetcher.CalledTimes).To(Equal(int64(501))) + Expect(len(diffs)).To(Equal(3003)) + Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue()) + Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue()) + Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue()) + }) + + It("passes fetcher errors forward", func() { + mockFetcher.FetchErrs = map[uint64]error{ + test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"), + } + backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1) + backFill := make(chan utils.StorageDiff) + done := make(chan bool) + errChan := make(chan error) + backFillInitErr := backFiller.BackFill( + test_data.BlockNumber2.Uint64(), + backFill, + errChan, + done) + Expect(backFillInitErr).ToNot(HaveOccurred()) + var numOfErrs int + var diffs []utils.StorageDiff + for { + select { + case diff := <-backFill: + diffs = append(diffs, diff) + continue + case err := <-errChan: + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("mock fetcher error")) + numOfErrs++ + continue + case <-done: + break + } + break + } + Expect(mockFetcher.CalledTimes).To(Equal(int64(2))) + Expect(numOfErrs).To(Equal(1)) + Expect(len(diffs)).To(Equal(1)) + Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue()) + + mockFetcher.FetchErrs = map[uint64]error{ + test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"), + test_data.BlockNumber2.Uint64(): errors.New("mock fetcher error"), + } + mockFetcher.CalledTimes = 0 + backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1) + backFill = make(chan utils.StorageDiff) + done = make(chan bool) + errChan = make(chan error) + backFillInitErr = backFiller.BackFill( + test_data.BlockNumber2.Uint64(), + backFill, + errChan, + done) + Expect(backFillInitErr).ToNot(HaveOccurred()) + numOfErrs = 0 + diffs = []utils.StorageDiff{} + for { + select { + case diff := <-backFill: + diffs = append(diffs, diff) + continue + case err := <-errChan: + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("mock fetcher error")) + numOfErrs++ + continue + case <-done: + break + } + break + } + Expect(mockFetcher.CalledTimes).To(Equal(int64(2))) + Expect(numOfErrs).To(Equal(2)) + Expect(len(diffs)).To(Equal(0)) + }) + }) +}) + +func containsDiff(diffs []utils.StorageDiff, diff utils.StorageDiff) bool { + for _, d := range diffs { + if d == diff { + return true + } + } + return false +} diff --git a/libraries/shared/streamer/statediff_streamer.go b/libraries/shared/streamer/statediff_streamer.go index 564a4e15..458cb2b7 100644 --- a/libraries/shared/streamer/statediff_streamer.go +++ b/libraries/shared/streamer/statediff_streamer.go @@ -1,16 +1,18 @@ -// Copyright 2019 Vulcanize -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . package streamer @@ -18,24 +20,29 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/pkg/core" ) +// Streamer is the interface for streaming a statediff subscription type Streamer interface { - Stream(chan statediff.Payload) (*rpc.ClientSubscription, error) + Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) } +// StateDiffStreamer is the underlying struct for the StateDiffStreamer interface type StateDiffStreamer struct { - client core.RPCClient + Client core.RPCClient } -func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { - logrus.Info("streaming diffs from geth") - return streamer.client.Subscribe("statediff", payloadChan, "stream") -} - -func NewStateDiffStreamer(client core.RPCClient) StateDiffStreamer { - return StateDiffStreamer{ - client: client, +// NewStateDiffStreamer creates a pointer to a new StateDiffStreamer which satisfies the IStateDiffStreamer interface +func NewStateDiffStreamer(client core.RPCClient) Streamer { + return &StateDiffStreamer{ + Client: client, } } + +// Stream is the main loop for subscribing to data from the Geth state diff process +func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { + logrus.Info("streaming diffs from geth") + return sds.Client.Subscribe("statediff", payloadChan, "stream") +} diff --git a/libraries/shared/test_data/statediff.go b/libraries/shared/test_data/statediff.go index ef709842..46edd932 100644 --- a/libraries/shared/test_data/statediff.go +++ b/libraries/shared/test_data/statediff.go @@ -151,33 +151,33 @@ var ( } CreatedExpectedStorageDiff = utils.StorageDiff{ - Id: 1333, + Id: 0, HashedAddress: common.BytesToHash(ContractLeafKey[:]), - BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHash: common.HexToHash(BlockHash), BlockHeight: int(BlockNumber.Int64()), StorageKey: common.BytesToHash(StorageKey), StorageValue: common.BytesToHash(SmallStorageValue), } UpdatedExpectedStorageDiff = utils.StorageDiff{ - Id: 1334, + Id: 0, HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), - BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHash: common.HexToHash(BlockHash), BlockHeight: int(BlockNumber.Int64()), StorageKey: common.BytesToHash(StorageKey), StorageValue: common.BytesToHash(LargeStorageValue), } UpdatedExpectedStorageDiff2 = utils.StorageDiff{ - Id: 1335, + Id: 0, HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), - BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHash: common.HexToHash(BlockHash2), BlockHeight: int(BlockNumber2.Int64()), StorageKey: common.BytesToHash(StorageKey), StorageValue: common.BytesToHash(SmallStorageValue), } DeletedExpectedStorageDiff = utils.StorageDiff{ - Id: 1336, + Id: 0, HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), - BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHash: common.HexToHash(BlockHash), BlockHeight: int(BlockNumber.Int64()), StorageKey: common.BytesToHash(StorageKey), StorageValue: common.BytesToHash(SmallStorageValue), diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go index 32fd873a..7ab70897 100644 --- a/libraries/shared/watcher/event_watcher.go +++ b/libraries/shared/watcher/event_watcher.go @@ -20,6 +20,7 @@ import ( "time" "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/libraries/shared/chunker" "github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 6ff1f771..922d9399 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -32,8 +32,8 @@ import ( type IStorageWatcher interface { AddTransformers(initializers []transformer.StorageTransformerInitializer) - Execute(queueRecheckInterval time.Duration, backFill bool) - BackFill(backFiller storage.BackFiller, minDeploymentBlock int) + Execute(queueRecheckInterval time.Duration, backFillOn bool) + BackFill(backFiller storage.BackFiller) } type StorageWatcher struct { @@ -43,65 +43,67 @@ type StorageWatcher struct { KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer DiffsChan chan utils.StorageDiff ErrsChan chan error + BackFillDoneChan chan bool StartingSyncBlockChan chan int } -func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { +func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher { queue := storage.NewStorageQueue(db) transformers := make(map[common.Hash]transformer.StorageTransformer) - return StorageWatcher{ + return &StorageWatcher{ db: db, - StorageFetcher: fetcher, - DiffsChan: make(chan utils.StorageDiff), + StorageFetcher: f, + DiffsChan: make(chan utils.StorageDiff, fetcher.PayloadChanBufferSize), ErrsChan: make(chan error), StartingSyncBlockChan: make(chan int), + BackFillDoneChan: make(chan bool), Queue: queue, KeccakAddressTransformers: transformers, } } -func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { +func (storageWatcher *StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { for _, initializer := range initializers { storageTransformer := initializer(storageWatcher.db) storageWatcher.KeccakAddressTransformers[storageTransformer.KeccakContractAddress()] = storageTransformer } } -// BackFill configures the StorageWatcher to backfill missed storage diffs using a modified archival geth client -func (storageWatcher StorageWatcher) BackFill(backFiller storage.BackFiller, minDeploymentBlock int) { - // need to learn which block to perform the backfill process up to +// BackFill uses a backFiller to backfill missing storage diffs for the storageWatcher +func (storageWatcher *StorageWatcher) BackFill(backFiller storage.BackFiller) { // this blocks until the Execute process sends us the first block number it sees startingSyncBlock := <-storageWatcher.StartingSyncBlockChan - backFilledDiffs, err := backFiller.BackFill(uint64(minDeploymentBlock), uint64(startingSyncBlock)) - if err != nil { - storageWatcher.ErrsChan <- err - } - for _, storageDiff := range backFilledDiffs { - storageWatcher.DiffsChan <- storageDiff + backFillInitErr := backFiller.BackFill(uint64(startingSyncBlock), + storageWatcher.DiffsChan, storageWatcher.ErrsChan, storageWatcher.BackFillDoneChan) + if backFillInitErr != nil { + logrus.Warn(backFillInitErr) } } -func (storageWatcher StorageWatcher) Execute(queueRecheckInterval time.Duration, backFill bool) { +// Execute runs the StorageWatcher processes +func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration, backFillOn bool) { ticker := time.NewTicker(queueRecheckInterval) go storageWatcher.StorageFetcher.FetchStorageDiffs(storageWatcher.DiffsChan, storageWatcher.ErrsChan) start := true for { select { case fetchErr := <-storageWatcher.ErrsChan: - logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr)) + logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr.Error())) case diff := <-storageWatcher.DiffsChan: - if start && backFill { + if start && backFillOn { storageWatcher.StartingSyncBlockChan <- diff.BlockHeight - 1 start = false } storageWatcher.processRow(diff) case <-ticker.C: storageWatcher.processQueue() + case <-storageWatcher.BackFillDoneChan: + logrus.Info("storage watcher backfill process has finished") } } } -func (storageWatcher StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { +func (storageWatcher *StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress] return storageTransformer, ok } diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index 29f64672..820cc45e 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -17,12 +17,12 @@ package watcher_test import ( + "errors" "io/ioutil" "os" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rlp" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" @@ -41,7 +41,7 @@ var _ = Describe("Storage Watcher", func() { It("adds transformers", func() { fakeHashedAddress := utils.HexToKeccak256Hash("0x12345") fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress} - w := watcher.NewStorageWatcher(mocks.NewClosingStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) + w := watcher.NewStorageWatcher(mocks.NewStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) @@ -51,17 +51,17 @@ var _ = Describe("Storage Watcher", func() { Describe("Execute", func() { var ( - mockFetcher *mocks.ClosingStorageFetcher + mockFetcher *mocks.StorageFetcher mockQueue *mocks.MockStorageQueue mockTransformer *mocks.MockStorageTransformer csvDiff utils.StorageDiff - storageWatcher watcher.StorageWatcher + storageWatcher *watcher.StorageWatcher hashedAddress common.Hash ) BeforeEach(func() { hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") - mockFetcher = mocks.NewClosingStorageFetcher() + mockFetcher = mocks.NewStorageFetcher() mockQueue = &mocks.MockStorageQueue{} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} csvDiff = utils.StorageDiff{ @@ -115,7 +115,6 @@ var _ = Describe("Storage Watcher", func() { go storageWatcher.Execute(time.Hour, false) - Expect(<-storageWatcher.ErrsChan).To(BeNil()) Eventually(func() bool { return mockQueue.AddCalled }).Should(BeTrue()) @@ -240,26 +239,26 @@ var _ = Describe("Storage Watcher", func() { Describe("BackFill", func() { var ( - mockFetcher *mocks.StorageFetcher - mockBackFiller *mocks.BackFiller - mockQueue *mocks.MockStorageQueue - mockTransformer *mocks.MockStorageTransformer - csvDiff utils.StorageDiff - storageWatcher watcher.StorageWatcher - hashedAddress common.Hash + mockFetcher *mocks.StorageFetcher + mockBackFiller *mocks.BackFiller + mockQueue *mocks.MockStorageQueue + mockTransformer *mocks.MockStorageTransformer + mockTransformer2 *mocks.MockStorageTransformer + mockTransformer3 *mocks.MockStorageTransformer + csvDiff utils.StorageDiff + storageWatcher *watcher.StorageWatcher + hashedAddress common.Hash ) BeforeEach(func() { mockBackFiller = new(mocks.BackFiller) - mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{ - test_data.CreatedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff2, - test_data.DeletedExpectedStorageDiff}) + mockBackFiller.StartingBlock = test_data.BlockNumber.Uint64() hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") mockFetcher = mocks.NewStorageFetcher() mockQueue = &mocks.MockStorageQueue{} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} + mockTransformer2 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.ContractLeafKey[:])} + mockTransformer3 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:])} csvDiff = utils.StorageDiff{ ID: 1337, HashedAddress: hashedAddress, @@ -273,86 +272,189 @@ var _ = Describe("Storage Watcher", func() { Describe("transforming streamed and backfilled queued storage diffs", func() { BeforeEach(func() { mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} - mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff, + mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{ test_data.CreatedExpectedStorageDiff, test_data.UpdatedExpectedStorageDiff, + test_data.DeletedExpectedStorageDiff, test_data.UpdatedExpectedStorageDiff2, - test_data.DeletedExpectedStorageDiff} + }) + mockQueue.DiffsToReturn = []utils.StorageDiff{} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{ + mockTransformer.FakeTransformerInitializer, + mockTransformer2.FakeTransformerInitializer, + mockTransformer3.FakeTransformerInitializer, + }) }) - It("executes transformer for storage diffs", func(done Done) { - go storageWatcher.BackFill(mockBackFiller, int(test_data.BlockNumber.Uint64())) - go storageWatcher.Execute(time.Nanosecond, true) - expectedDiffsStruct := struct { - diffs []utils.StorageDiff - }{ - []utils.StorageDiff{ - csvDiff, - test_data.CreatedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff, - test_data.UpdatedExpectedStorageDiff2, - test_data.DeletedExpectedStorageDiff, - }, - } - expectedDiffsBytes, rlpErr1 := rlp.EncodeToBytes(expectedDiffsStruct) - Expect(rlpErr1).ToNot(HaveOccurred()) - Eventually(func() []byte { - diffsStruct := struct { - diffs []utils.StorageDiff - }{ - mockTransformer.PassedDiffs, - } - diffsBytes, rlpErr2 := rlp.EncodeToBytes(diffsStruct) - Expect(rlpErr2).ToNot(HaveOccurred()) - return diffsBytes - }).Should(Equal(expectedDiffsBytes)) + It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) { + go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.Execute(time.Hour, true) + + Eventually(func() int { + return len(mockTransformer.PassedDiffs) + }).Should(Equal(1)) + + Eventually(func() int { + return len(mockTransformer2.PassedDiffs) + }).Should(Equal(1)) + + Eventually(func() int { + return len(mockTransformer3.PassedDiffs) + }).Should(Equal(3)) + Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff)) + Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64()))) + Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2)) close(done) }) - It("deletes diffs from queue if transformer execution is successful", func(done Done) { - go storageWatcher.Execute(time.Nanosecond, false) - expectedIdsStruct := struct { - diffs []int - }{ - []int{ - csvDiff.ID, - test_data.CreatedExpectedStorageDiff.ID, - test_data.UpdatedExpectedStorageDiff.ID, - test_data.UpdatedExpectedStorageDiff2.ID, - test_data.DeletedExpectedStorageDiff.ID, - }, - } - expectedIdsBytes, rlpErr1 := rlp.EncodeToBytes(expectedIdsStruct) - Expect(rlpErr1).ToNot(HaveOccurred()) - Eventually(func() []byte { - idsStruct := struct { - diffs []int - }{ - mockQueue.DeletePassedIds, + It("adds diffs to the queue if transformation fails", func(done Done) { + mockTransformer3.ExecuteErr = fakes.FakeError + go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.Execute(time.Hour, true) + + Eventually(func() int { + return len(mockTransformer.PassedDiffs) + }).Should(Equal(1)) + Eventually(func() int { + return len(mockTransformer2.PassedDiffs) + }).Should(Equal(1)) + Eventually(func() int { + return len(mockTransformer3.PassedDiffs) + }).Should(Equal(3)) + + Eventually(func() bool { + return mockQueue.AddCalled + }).Should(BeTrue()) + Eventually(func() []utils.StorageDiff { + if len(mockQueue.AddPassedDiffs) > 2 { + return mockQueue.AddPassedDiffs } - idsBytes, rlpErr2 := rlp.EncodeToBytes(idsStruct) - Expect(rlpErr2).ToNot(HaveOccurred()) - return idsBytes - }).Should(Equal(expectedIdsBytes)) + return []utils.StorageDiff{} + }).Should(Equal([]utils.StorageDiff{ + test_data.UpdatedExpectedStorageDiff, + test_data.DeletedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff2, + })) + + Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff)) + Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2)) close(done) }) - It("logs error if deleting persisted diff fails", func(done Done) { - mockQueue.DeleteErr = fakes.FakeError + It("logs a backfill error", func(done Done) { tempFile, fileErr := ioutil.TempFile("", "log") Expect(fileErr).NotTo(HaveOccurred()) defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(time.Nanosecond, false) + mockBackFiller.BackFillErrs = []error{ + nil, + nil, + nil, + errors.New("mock backfiller error"), + } + + go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.Execute(time.Hour, true) + + Eventually(func() int { + return len(mockTransformer.PassedDiffs) + }).Should(Equal(1)) + Eventually(func() int { + return len(mockTransformer2.PassedDiffs) + }).Should(Equal(1)) + Eventually(func() int { + return len(mockTransformer3.PassedDiffs) + }).Should(Equal(2)) + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring("mock backfiller error")) + close(done) + }) + + It("logs when backfill finishes", func(done Done) { + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.Execute(time.Hour, true) Eventually(func() (string, error) { logContent, err := ioutil.ReadFile(tempFile.Name()) return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) + }).Should(ContainSubstring("storage watcher backfill process has finished")) + close(done) + }) + }) + + Describe("transforms storage diffs", func() { + BeforeEach(func() { + test_data.CreatedExpectedStorageDiff.ID = 1334 + test_data.UpdatedExpectedStorageDiff.ID = 1335 + test_data.DeletedExpectedStorageDiff.ID = 1336 + test_data.UpdatedExpectedStorageDiff2.ID = 1337 + mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff, + test_data.CreatedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff, + test_data.DeletedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff2, + } + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher.Queue = mockQueue + storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{ + mockTransformer.FakeTransformerInitializer, + mockTransformer2.FakeTransformerInitializer, + mockTransformer3.FakeTransformerInitializer, + }) + }) + + It("executes transformers on queued storage diffs", func(done Done) { + go storageWatcher.BackFill(mockBackFiller) + go storageWatcher.Execute(time.Nanosecond, true) + + Eventually(func() int { + return len(mockTransformer.PassedDiffs) + }).Should(Equal(1)) + Eventually(func() int { + return len(mockTransformer2.PassedDiffs) + }).Should(Equal(1)) + Eventually(func() int { + return len(mockTransformer3.PassedDiffs) + }).Should(Equal(3)) + Eventually(func() bool { + return mockQueue.GetAllCalled + }).Should(BeTrue()) + Eventually(func() []int { + if len(mockQueue.DeletePassedIds) > 4 { + return mockQueue.DeletePassedIds + } + return []int{} + }).Should(Equal([]int{ + csvDiff.ID, + test_data.CreatedExpectedStorageDiff.ID, + test_data.UpdatedExpectedStorageDiff.ID, + test_data.DeletedExpectedStorageDiff.ID, + test_data.UpdatedExpectedStorageDiff2.ID, + })) + + Expect(mockQueue.AddCalled).To(Not(BeTrue())) + Expect(len(mockQueue.DiffsToReturn)).To(Equal(0)) + Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff)) + Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff)) + Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2)) close(done) }) })