split backfill range up into smaller bins and process them concurrently; improve tests; review fixes

This commit is contained in:
Ian Norden 2019-11-01 00:29:57 -05:00
parent 563832422c
commit 8562abd180
18 changed files with 744 additions and 366 deletions

View File

@ -26,20 +26,24 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer" "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
) )
const (
PayloadChanBufferSize = 20000 // the max eth sub buffer size
)
type GethRPCStorageFetcher struct { type GethRPCStorageFetcher struct {
statediffPayloadChan chan statediff.Payload StatediffPayloadChan chan statediff.Payload
streamer streamer.Streamer streamer streamer.Streamer
} }
func NewGethRPCStorageFetcher(streamer streamer.Streamer, statediffPayloadChan chan statediff.Payload) GethRPCStorageFetcher { func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher {
return GethRPCStorageFetcher{ return GethRPCStorageFetcher{
statediffPayloadChan: statediffPayloadChan, StatediffPayloadChan: make(chan statediff.Payload, PayloadChanBufferSize),
streamer: streamer, streamer: streamer,
} }
} }
func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
ethStatediffPayloadChan := fetcher.statediffPayloadChan ethStatediffPayloadChan := fetcher.StatediffPayloadChan
clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan)
if clientSubErr != nil { if clientSubErr != nil {
errs <- clientSubErr errs <- clientSubErr

View File

@ -59,15 +59,13 @@ func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payloa
var _ = Describe("Geth RPC Storage Fetcher", func() { var _ = Describe("Geth RPC Storage Fetcher", func() {
var streamer MockStoragediffStreamer var streamer MockStoragediffStreamer
var statediffPayloadChan chan statediff.Payload
var statediffFetcher fetcher.GethRPCStorageFetcher var statediffFetcher fetcher.GethRPCStorageFetcher
var storagediffChan chan utils.StorageDiff var storagediffChan chan utils.StorageDiff
var errorChan chan error var errorChan chan error
BeforeEach(func() { BeforeEach(func() {
streamer = MockStoragediffStreamer{} streamer = MockStoragediffStreamer{}
statediffPayloadChan = make(chan statediff.Payload, 1) statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer)
statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer, statediffPayloadChan)
storagediffChan = make(chan utils.StorageDiff) storagediffChan = make(chan utils.StorageDiff)
errorChan = make(chan error) errorChan = make(chan error)
}) })
@ -91,9 +89,9 @@ var _ = Describe("Geth RPC Storage Fetcher", func() {
go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan) go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan)
streamedPayload := <-statediffPayloadChan streamedPayload := <-statediffFetcher.StatediffPayloadChan
Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload)) Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload))
Expect(streamer.PassedPayloadChan).To(Equal(statediffPayloadChan)) Expect(streamer.PassedPayloadChan).To(Equal(statediffFetcher.StatediffPayloadChan))
close(done) close(done)
}) })

View File

@ -25,7 +25,7 @@ import (
// StateDiffFetcher is the state diff fetching interface // StateDiffFetcher is the state diff fetching interface
type StateDiffFetcher interface { type StateDiffFetcher interface {
FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error)
} }
// BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion // BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion
@ -35,6 +35,8 @@ type BatchClient interface {
// stateDiffFetcher is the state diff fetching struct // stateDiffFetcher is the state diff fetching struct
type stateDiffFetcher struct { type stateDiffFetcher struct {
// stateDiffFetcher is thread-safe as long as the underlying client is thread-safe, since it has/modifies no other state
// http.Client is thread-safe
client BatchClient client BatchClient
} }
@ -49,7 +51,7 @@ func NewStateDiffFetcher(bc BatchClient) StateDiffFetcher {
// FetchStateDiffsAt fetches the statediff payloads at the given block heights // FetchStateDiffsAt fetches the statediff payloads at the given block heights
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) // Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) { func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) {
batch := make([]client.BatchElem, 0) batch := make([]client.BatchElem, 0)
for _, height := range blockHeights { for _, height := range blockHeights {
batch = append(batch, client.BatchElem{ batch = append(batch, client.BatchElem{
@ -62,12 +64,15 @@ func (fetcher *stateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*st
if batchErr != nil { if batchErr != nil {
return nil, fmt.Errorf("stateDiffFetcher err: %s", batchErr.Error()) return nil, fmt.Errorf("stateDiffFetcher err: %s", batchErr.Error())
} }
results := make([]*statediff.Payload, 0, len(blockHeights)) results := make([]statediff.Payload, 0, len(blockHeights))
for _, batchElem := range batch { for _, batchElem := range batch {
if batchElem.Error != nil { if batchElem.Error != nil {
return nil, fmt.Errorf("stateDiffFetcher err: %s", batchElem.Error.Error()) return nil, fmt.Errorf("stateDiffFetcher err: %s", batchElem.Error.Error())
} }
results = append(results, batchElem.Result.(*statediff.Payload)) payload, ok := batchElem.Result.(*statediff.Payload)
if ok {
results = append(results, *payload)
}
} }
return results, nil return results, nil
} }

View File

@ -17,10 +17,6 @@
package fetcher_test package fetcher_test
import ( import (
"bytes"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -51,26 +47,8 @@ var _ = Describe("StateDiffFetcher", func() {
stateDiffPayloads, fetchErr := stateDiffFetcher.FetchStateDiffsAt(blockHeights) stateDiffPayloads, fetchErr := stateDiffFetcher.FetchStateDiffsAt(blockHeights)
Expect(fetchErr).ToNot(HaveOccurred()) Expect(fetchErr).ToNot(HaveOccurred())
Expect(len(stateDiffPayloads)).To(Equal(2)) Expect(len(stateDiffPayloads)).To(Equal(2))
// Can only rlp encode the slice of diffs as part of a struct Expect(stateDiffPayloads[0]).To(Equal(test_data.MockStatediffPayload))
// Rlp encoding allows us to compare content of the slices when the order in the slice may vary Expect(stateDiffPayloads[1]).To(Equal(test_data.MockStatediffPayload2))
expectedPayloadsStruct := struct {
payloads []*statediff.Payload
}{
[]*statediff.Payload{
&test_data.MockStatediffPayload,
&test_data.MockStatediffPayload2,
},
}
expectedPayloadsBytes, rlpErr1 := rlp.EncodeToBytes(expectedPayloadsStruct)
Expect(rlpErr1).ToNot(HaveOccurred())
receivedPayloadsStruct := struct {
payloads []*statediff.Payload
}{
stateDiffPayloads,
}
receivedPayloadsBytes, rlpErr2 := rlp.EncodeToBytes(receivedPayloadsStruct)
Expect(rlpErr2).ToNot(HaveOccurred())
Expect(bytes.Equal(expectedPayloadsBytes, receivedPayloadsBytes)).To(BeTrue())
}) })
}) })
}) })

View File

@ -16,14 +16,18 @@
package mocks package mocks
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" import (
"errors"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
// BackFiller mock for tests // BackFiller mock for tests
type BackFiller struct { type BackFiller struct {
StorageDiffsToReturn []utils.StorageDiff StorageDiffsToReturn []utils.StorageDiff
BackFillErr error BackFillErrs []error
PassedStartingBlock uint64
PassedEndingBlock uint64 PassedEndingBlock uint64
StartingBlock uint64
} }
// SetStorageDiffsToReturn for tests // SetStorageDiffsToReturn for tests
@ -32,8 +36,24 @@ func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff)
} }
// BackFill mock method // BackFill mock method
func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) { func (backFiller *BackFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
backFiller.PassedStartingBlock = startingBlock if endingBlock < backFiller.StartingBlock {
return errors.New("backfill: ending block number needs to be greater than starting block number")
}
backFiller.PassedEndingBlock = endingBlock backFiller.PassedEndingBlock = endingBlock
return backFiller.StorageDiffsToReturn, backFiller.BackFillErr go func(backFill chan utils.StorageDiff, errChan chan error, done chan bool) {
errLen := len(backFiller.BackFillErrs)
for i, diff := range backFiller.StorageDiffsToReturn {
if i < errLen {
err := backFiller.BackFillErrs[i]
if err != nil {
errChan <- err
continue
}
}
backFill <- diff
}
done <- true
}(backFill, errChan, done)
return nil
} }

View File

@ -18,31 +18,33 @@ package mocks
import ( import (
"errors" "errors"
"sync/atomic"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
) )
// StateDiffFetcher mock for tests // StateDiffFetcher mock for tests
type StateDiffFetcher struct { type StateDiffFetcher struct {
PayloadsToReturn map[uint64]*statediff.Payload PayloadsToReturn map[uint64]statediff.Payload
FetchErr error FetchErrs map[uint64]error
CalledAtBlockHeights [][]uint64 CalledAtBlockHeights [][]uint64
} CalledTimes int64
// SetPayloadsToReturn for tests
func (fetcher *StateDiffFetcher) SetPayloadsToReturn(payloads map[uint64]*statediff.Payload) {
fetcher.PayloadsToReturn = payloads
} }
// FetchStateDiffsAt mock method // FetchStateDiffsAt mock method
func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]*statediff.Payload, error) { func (fetcher *StateDiffFetcher) FetchStateDiffsAt(blockHeights []uint64) ([]statediff.Payload, error) {
fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights)
if fetcher.PayloadsToReturn == nil { if fetcher.PayloadsToReturn == nil {
return nil, errors.New("MockStateDiffFetcher needs to be initialized with payloads to return") return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return")
} }
results := make([]*statediff.Payload, 0, len(blockHeights)) atomic.AddInt64(&fetcher.CalledTimes, 1) // thread-safe increment
fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights)
results := make([]statediff.Payload, 0, len(blockHeights))
for _, height := range blockHeights { for _, height := range blockHeights {
results = append(results, fetcher.PayloadsToReturn[height]) results = append(results, fetcher.PayloadsToReturn[height])
err, ok := fetcher.FetchErrs[height]
if ok && err != nil {
return nil, err
}
} }
return results, nil return results, nil
} }

View File

@ -0,0 +1,43 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
)
// StateDiffStreamer is the underlying struct for the Streamer interface
type StateDiffStreamer struct {
PassedPayloadChan chan statediff.Payload
ReturnSub *rpc.ClientSubscription
ReturnErr error
StreamPayloads []statediff.Payload
}
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
sds.PassedPayloadChan = payloadChan
go func() {
for _, payload := range sds.StreamPayloads {
sds.PassedPayloadChan <- payload
}
}()
return sds.ReturnSub, sds.ReturnErr
}

View File

@ -16,30 +16,9 @@
package mocks package mocks
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
// ClosingStorageFetcher is a mock fetcher for use in tests without backfilling )
type ClosingStorageFetcher struct {
DiffsToReturn []utils.StorageDiff
ErrsToReturn []error
}
// NewClosingStorageFetcher returns a new ClosingStorageFetcher
func NewClosingStorageFetcher() *ClosingStorageFetcher {
return &ClosingStorageFetcher{}
}
// FetchStorageDiffs mock method
func (fetcher *ClosingStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
defer close(out)
defer close(errs)
for _, err := range fetcher.ErrsToReturn {
errs <- err
}
for _, diff := range fetcher.DiffsToReturn {
out <- diff
}
}
// StorageFetcher is a mock fetcher for use in tests with backfilling // StorageFetcher is a mock fetcher for use in tests with backfilling
type StorageFetcher struct { type StorageFetcher struct {

View File

@ -29,6 +29,7 @@ type MockStorageQueue struct {
DeletePassedIds []int DeletePassedIds []int
GetAllErr error GetAllErr error
DiffsToReturn []utils.StorageDiff DiffsToReturn []utils.StorageDiff
GetAllCalled bool
} }
// Add mock method // Add mock method
@ -41,10 +42,18 @@ func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error {
// Delete mock method // Delete mock method
func (queue *MockStorageQueue) Delete(id int) error { func (queue *MockStorageQueue) Delete(id int) error {
queue.DeletePassedIds = append(queue.DeletePassedIds, id) queue.DeletePassedIds = append(queue.DeletePassedIds, id)
diffsToReturn := make([]utils.StorageDiff, 0)
for _, diff := range queue.DiffsToReturn {
if diff.Id != id {
diffsToReturn = append(diffsToReturn, diff)
}
}
queue.DiffsToReturn = diffsToReturn
return queue.DeleteErr return queue.DeleteErr
} }
// GetAll mock method // GetAll mock method
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) {
queue.GetAllCalled = true
return queue.DiffsToReturn, queue.GetAllErr return queue.DiffsToReturn, queue.GetAllErr
} }

View File

@ -1,87 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
// BackFiller is the backfilling interface
type BackFiller interface {
BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error)
}
// backFiller is the backfilling struct
type backFiller struct {
fetcher fetcher.StateDiffFetcher
}
// NewStorageBackFiller returns a BackFiller
func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher) BackFiller {
return &backFiller{
fetcher: fetcher,
}
}
// BackFill uses the provided config to fetch and return the state diff at the specified blocknumber
// StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
func (bf *backFiller) BackFill(startingBlock, endingBlock uint64) ([]utils.StorageDiff, error) {
results := make([]utils.StorageDiff, 0)
if endingBlock < startingBlock {
return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
}
blockHeights := make([]uint64, 0, endingBlock-startingBlock+1)
for i := startingBlock; i <= endingBlock; i++ {
blockHeights = append(blockHeights, i)
}
payloads, err := bf.fetcher.FetchStateDiffsAt(blockHeights)
if err != nil {
return nil, err
}
for _, payload := range payloads {
stateDiff := new(statediff.StateDiff)
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
if stateDiffDecodeErr != nil {
return nil, stateDiffDecodeErr
}
accounts := utils.GetAccountsFromDiff(*stateDiff)
for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil {
return nil, formatErr
}
logrus.Trace("adding storage diff to results",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
results = append(results, diff)
}
}
}
return results, nil
}

View File

@ -1,75 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package storage_test
import (
"bytes"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
)
var _ = Describe("BackFiller", func() {
Describe("BackFill", func() {
var (
fetcher *mocks.StateDiffFetcher
backFiller storage.BackFiller
)
BeforeEach(func() {
fetcher = new(mocks.StateDiffFetcher)
fetcher.SetPayloadsToReturn(map[uint64]*statediff.Payload{
test_data.BlockNumber.Uint64(): &test_data.MockStatediffPayload,
test_data.BlockNumber2.Uint64(): &test_data.MockStatediffPayload2,
})
backFiller = storage.NewStorageBackFiller(fetcher)
})
It("Batch calls statediff_stateDiffAt", func() {
backFillStorage, backFillErr := backFiller.BackFill(test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64())
Expect(backFillErr).ToNot(HaveOccurred())
Expect(len(backFillStorage)).To(Equal(4))
// Can only rlp encode the slice of diffs as part of a struct
// Rlp encoding allows us to compare content of the slices when the order in the slice may vary
expectedDiffStruct := struct {
diffs []utils.StorageDiff
}{
[]utils.StorageDiff{
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
test_data.DeletedExpectedStorageDiff,
},
}
expectedDiffBytes, rlpErr1 := rlp.EncodeToBytes(expectedDiffStruct)
Expect(rlpErr1).ToNot(HaveOccurred())
receivedDiffStruct := struct {
diffs []utils.StorageDiff
}{
backFillStorage,
}
receivedDiffBytes, rlpErr2 := rlp.EncodeToBytes(receivedDiffStruct)
Expect(rlpErr2).ToNot(HaveOccurred())
Expect(bytes.Equal(expectedDiffBytes, receivedDiffBytes)).To(BeTrue())
})
})
})

View File

@ -0,0 +1,158 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"errors"
"fmt"
"sync/atomic"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
const (
DefaultMaxBatchSize uint64 = 5000
defaultMaxBatchNumber int64 = 100
)
// BackFiller is the backfilling interface
type BackFiller interface {
BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error
}
// backFiller is the backfilling struct
type backFiller struct {
fetcher fetcher.StateDiffFetcher
batchSize uint64
startingBlock uint64
}
// NewStorageBackFiller returns a BackFiller
func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, startingBlock, batchSize uint64) BackFiller {
if batchSize == 0 {
batchSize = DefaultMaxBatchSize
}
return &backFiller{
fetcher: fetcher,
batchSize: batchSize,
startingBlock: startingBlock,
}
}
// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
if endingBlock < bf.startingBlock {
return errors.New("backfill: ending block number needs to be greater than starting block number")
}
// break the range up into bins of smaller ranges
length := endingBlock - bf.startingBlock + 1
numberOfBins := length / bf.batchSize
remainder := length % bf.batchSize
if remainder != 0 {
numberOfBins++
}
blockRangeBins := make([][]uint64, numberOfBins)
for i := range blockRangeBins {
nextBinStart := bf.startingBlock + uint64(bf.batchSize)
if nextBinStart > endingBlock {
nextBinStart = endingBlock + 1
}
blockRange := make([]uint64, 0, nextBinStart-bf.startingBlock+1)
for j := bf.startingBlock; j < nextBinStart; j++ {
blockRange = append(blockRange, j)
}
bf.startingBlock = nextBinStart
blockRangeBins[i] = blockRange
}
// int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
var activeCount int64
// channel for processing goroutines to signal when they are done
processingDone := make(chan bool)
// for each block range bin spin up a goroutine to batch fetch and process state diffs for that range
go func() {
for _, blockHeights := range blockRangeBins {
// if we have reached our limit of active goroutines
// wait for one to finish before starting the next
if atomic.AddInt64(&activeCount, 1) > defaultMaxBatchNumber {
// this blocks until a process signals it has finished
// immediately forwards the signal to the normal listener so that it keeps the correct count
processingDone <- <-processingDone
}
go func(blockHeights []uint64) {
payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights)
if fetchErr != nil {
errChan <- fetchErr
}
for _, payload := range payloads {
stateDiff := new(statediff.StateDiff)
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
if stateDiffDecodeErr != nil {
errChan <- stateDiffDecodeErr
continue
}
accounts := utils.GetAccountsFromDiff(*stateDiff)
for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage)))
for _, storage := range account.Storage {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil {
errChan <- formatErr
continue
}
logrus.Trace("adding storage diff to results",
"keccak of address: ", diff.HashedAddress.Hex(),
"block height: ", diff.BlockHeight,
"storage key: ", diff.StorageKey.Hex(),
"storage value: ", diff.StorageValue.Hex())
backFill <- diff
}
}
}
// when this goroutine is done, send out a signal
processingDone <- true
}(blockHeights)
}
}()
// goroutine that listens on the processingDone chan
// keeps track of the number of processing goroutines that have finished
// when they have all finished, sends the final signal out
go func() {
goroutinesFinished := 0
for {
select {
case <-processingDone:
atomic.AddInt64(&activeCount, -1)
goroutinesFinished++
if goroutinesFinished == int(numberOfBins) {
done <- true
return
}
}
}
}()
return nil
}

View File

@ -0,0 +1,232 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package storage_test
import (
"errors"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
)
var _ = Describe("BackFiller", func() {
Describe("BackFill", func() {
var (
mockFetcher *mocks.StateDiffFetcher
backFiller storage.BackFiller
)
BeforeEach(func() {
mockFetcher = new(mocks.StateDiffFetcher)
mockFetcher.PayloadsToReturn = map[uint64]statediff.Payload{
test_data.BlockNumber.Uint64(): test_data.MockStatediffPayload,
test_data.BlockNumber2.Uint64(): test_data.MockStatediffPayload2,
}
})
It("batch calls statediff_stateDiffAt", func() {
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 100)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).ToNot(HaveOccurred())
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(1)))
Expect(len(diffs)).To(Equal(4))
Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
})
It("has a configurable batch size", func() {
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).ToNot(HaveOccurred())
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
Expect(len(diffs)).To(Equal(4))
Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
})
It("handles bin numbers in excess of the goroutine limit (100)", func() {
payloadsToReturn := make(map[uint64]statediff.Payload, 1001)
for i := test_data.BlockNumber.Uint64(); i <= test_data.BlockNumber.Uint64()+1000; i++ {
payloadsToReturn[i] = test_data.MockStatediffPayload
}
mockFetcher.PayloadsToReturn = payloadsToReturn
// batch size of 2 with 1001 block range => 501 bins
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 2)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber.Uint64()+1000,
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).ToNot(HaveOccurred())
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(501)))
Expect(len(diffs)).To(Equal(3003))
Expect(containsDiff(diffs, test_data.CreatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff)).To(BeTrue())
Expect(containsDiff(diffs, test_data.DeletedExpectedStorageDiff)).To(BeTrue())
})
It("passes fetcher errors forward", func() {
mockFetcher.FetchErrs = map[uint64]error{
test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"),
}
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1)
backFill := make(chan utils.StorageDiff)
done := make(chan bool)
errChan := make(chan error)
backFillInitErr := backFiller.BackFill(
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
var numOfErrs int
var diffs []utils.StorageDiff
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("mock fetcher error"))
numOfErrs++
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
Expect(numOfErrs).To(Equal(1))
Expect(len(diffs)).To(Equal(1))
Expect(containsDiff(diffs, test_data.UpdatedExpectedStorageDiff2)).To(BeTrue())
mockFetcher.FetchErrs = map[uint64]error{
test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"),
test_data.BlockNumber2.Uint64(): errors.New("mock fetcher error"),
}
mockFetcher.CalledTimes = 0
backFiller = storage.NewStorageBackFiller(mockFetcher, test_data.BlockNumber.Uint64(), 1)
backFill = make(chan utils.StorageDiff)
done = make(chan bool)
errChan = make(chan error)
backFillInitErr = backFiller.BackFill(
test_data.BlockNumber2.Uint64(),
backFill,
errChan,
done)
Expect(backFillInitErr).ToNot(HaveOccurred())
numOfErrs = 0
diffs = []utils.StorageDiff{}
for {
select {
case diff := <-backFill:
diffs = append(diffs, diff)
continue
case err := <-errChan:
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("mock fetcher error"))
numOfErrs++
continue
case <-done:
break
}
break
}
Expect(mockFetcher.CalledTimes).To(Equal(int64(2)))
Expect(numOfErrs).To(Equal(2))
Expect(len(diffs)).To(Equal(0))
})
})
})
func containsDiff(diffs []utils.StorageDiff, diff utils.StorageDiff) bool {
for _, d := range diffs {
if d == diff {
return true
}
}
return false
}

View File

@ -1,16 +1,18 @@
// Copyright 2019 Vulcanize // VulcanizeDB
// // Copyright © 2019 Vulcanize
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // This program is free software: you can redistribute it and/or modify
// You may obtain a copy of the License at // it under the terms of the GNU Affero General Public License as published by
// // the Free Software Foundation, either version 3 of the License, or
// http://www.apache.org/licenses/LICENSE-2.0 // (at your option) any later version.
//
// Unless required by applicable law or agreed to in writing, software // This program is distributed in the hope that it will be useful,
// distributed under the License is distributed on an "AS IS" BASIS, // but WITHOUT ANY WARRANTY; without even the implied warranty of
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// See the License for the specific language governing permissions and // GNU Affero General Public License for more details.
// limitations under the License.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package streamer package streamer
@ -18,24 +20,29 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
) )
// Streamer is the interface for streaming a statediff subscription
type Streamer interface { type Streamer interface {
Stream(chan statediff.Payload) (*rpc.ClientSubscription, error) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error)
} }
// StateDiffStreamer is the underlying struct for the StateDiffStreamer interface
type StateDiffStreamer struct { type StateDiffStreamer struct {
client core.RPCClient Client core.RPCClient
} }
func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { // NewStateDiffStreamer creates a pointer to a new StateDiffStreamer which satisfies the IStateDiffStreamer interface
logrus.Info("streaming diffs from geth") func NewStateDiffStreamer(client core.RPCClient) Streamer {
return streamer.client.Subscribe("statediff", payloadChan, "stream") return &StateDiffStreamer{
} Client: client,
func NewStateDiffStreamer(client core.RPCClient) StateDiffStreamer {
return StateDiffStreamer{
client: client,
} }
} }
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
logrus.Info("streaming diffs from geth")
return sds.Client.Subscribe("statediff", payloadChan, "stream")
}

View File

@ -151,33 +151,33 @@ var (
} }
CreatedExpectedStorageDiff = utils.StorageDiff{ CreatedExpectedStorageDiff = utils.StorageDiff{
Id: 1333, Id: 0,
HashedAddress: common.BytesToHash(ContractLeafKey[:]), HashedAddress: common.BytesToHash(ContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash(BlockHash),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),
StorageKey: common.BytesToHash(StorageKey), StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(SmallStorageValue), StorageValue: common.BytesToHash(SmallStorageValue),
} }
UpdatedExpectedStorageDiff = utils.StorageDiff{ UpdatedExpectedStorageDiff = utils.StorageDiff{
Id: 1334, Id: 0,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash(BlockHash),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),
StorageKey: common.BytesToHash(StorageKey), StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(LargeStorageValue), StorageValue: common.BytesToHash(LargeStorageValue),
} }
UpdatedExpectedStorageDiff2 = utils.StorageDiff{ UpdatedExpectedStorageDiff2 = utils.StorageDiff{
Id: 1335, Id: 0,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash(BlockHash2),
BlockHeight: int(BlockNumber2.Int64()), BlockHeight: int(BlockNumber2.Int64()),
StorageKey: common.BytesToHash(StorageKey), StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(SmallStorageValue), StorageValue: common.BytesToHash(SmallStorageValue),
} }
DeletedExpectedStorageDiff = utils.StorageDiff{ DeletedExpectedStorageDiff = utils.StorageDiff{
Id: 1336, Id: 0,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash(BlockHash),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),
StorageKey: common.BytesToHash(StorageKey), StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(SmallStorageValue), StorageValue: common.BytesToHash(SmallStorageValue),

View File

@ -20,6 +20,7 @@ import (
"time" "time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/chunker" "github.com/vulcanize/vulcanizedb/libraries/shared/chunker"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"

View File

@ -32,8 +32,8 @@ import (
type IStorageWatcher interface { type IStorageWatcher interface {
AddTransformers(initializers []transformer.StorageTransformerInitializer) AddTransformers(initializers []transformer.StorageTransformerInitializer)
Execute(queueRecheckInterval time.Duration, backFill bool) Execute(queueRecheckInterval time.Duration, backFillOn bool)
BackFill(backFiller storage.BackFiller, minDeploymentBlock int) BackFill(backFiller storage.BackFiller)
} }
type StorageWatcher struct { type StorageWatcher struct {
@ -43,65 +43,67 @@ type StorageWatcher struct {
KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer
DiffsChan chan utils.StorageDiff DiffsChan chan utils.StorageDiff
ErrsChan chan error ErrsChan chan error
BackFillDoneChan chan bool
StartingSyncBlockChan chan int StartingSyncBlockChan chan int
} }
func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher {
queue := storage.NewStorageQueue(db) queue := storage.NewStorageQueue(db)
transformers := make(map[common.Hash]transformer.StorageTransformer) transformers := make(map[common.Hash]transformer.StorageTransformer)
return StorageWatcher{ return &StorageWatcher{
db: db, db: db,
StorageFetcher: fetcher, StorageFetcher: f,
DiffsChan: make(chan utils.StorageDiff), DiffsChan: make(chan utils.StorageDiff, fetcher.PayloadChanBufferSize),
ErrsChan: make(chan error), ErrsChan: make(chan error),
StartingSyncBlockChan: make(chan int), StartingSyncBlockChan: make(chan int),
BackFillDoneChan: make(chan bool),
Queue: queue, Queue: queue,
KeccakAddressTransformers: transformers, KeccakAddressTransformers: transformers,
} }
} }
func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { func (storageWatcher *StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
for _, initializer := range initializers { for _, initializer := range initializers {
storageTransformer := initializer(storageWatcher.db) storageTransformer := initializer(storageWatcher.db)
storageWatcher.KeccakAddressTransformers[storageTransformer.KeccakContractAddress()] = storageTransformer storageWatcher.KeccakAddressTransformers[storageTransformer.KeccakContractAddress()] = storageTransformer
} }
} }
// BackFill configures the StorageWatcher to backfill missed storage diffs using a modified archival geth client // BackFill uses a backFiller to backfill missing storage diffs for the storageWatcher
func (storageWatcher StorageWatcher) BackFill(backFiller storage.BackFiller, minDeploymentBlock int) { func (storageWatcher *StorageWatcher) BackFill(backFiller storage.BackFiller) {
// need to learn which block to perform the backfill process up to
// this blocks until the Execute process sends us the first block number it sees // this blocks until the Execute process sends us the first block number it sees
startingSyncBlock := <-storageWatcher.StartingSyncBlockChan startingSyncBlock := <-storageWatcher.StartingSyncBlockChan
backFilledDiffs, err := backFiller.BackFill(uint64(minDeploymentBlock), uint64(startingSyncBlock)) backFillInitErr := backFiller.BackFill(uint64(startingSyncBlock),
if err != nil { storageWatcher.DiffsChan, storageWatcher.ErrsChan, storageWatcher.BackFillDoneChan)
storageWatcher.ErrsChan <- err if backFillInitErr != nil {
} logrus.Warn(backFillInitErr)
for _, storageDiff := range backFilledDiffs {
storageWatcher.DiffsChan <- storageDiff
} }
} }
func (storageWatcher StorageWatcher) Execute(queueRecheckInterval time.Duration, backFill bool) { // Execute runs the StorageWatcher processes
func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration, backFillOn bool) {
ticker := time.NewTicker(queueRecheckInterval) ticker := time.NewTicker(queueRecheckInterval)
go storageWatcher.StorageFetcher.FetchStorageDiffs(storageWatcher.DiffsChan, storageWatcher.ErrsChan) go storageWatcher.StorageFetcher.FetchStorageDiffs(storageWatcher.DiffsChan, storageWatcher.ErrsChan)
start := true start := true
for { for {
select { select {
case fetchErr := <-storageWatcher.ErrsChan: case fetchErr := <-storageWatcher.ErrsChan:
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr)) logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr.Error()))
case diff := <-storageWatcher.DiffsChan: case diff := <-storageWatcher.DiffsChan:
if start && backFill { if start && backFillOn {
storageWatcher.StartingSyncBlockChan <- diff.BlockHeight - 1 storageWatcher.StartingSyncBlockChan <- diff.BlockHeight - 1
start = false start = false
} }
storageWatcher.processRow(diff) storageWatcher.processRow(diff)
case <-ticker.C: case <-ticker.C:
storageWatcher.processQueue() storageWatcher.processQueue()
case <-storageWatcher.BackFillDoneChan:
logrus.Info("storage watcher backfill process has finished")
} }
} }
} }
func (storageWatcher StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { func (storageWatcher *StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) {
storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress] storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress]
return storageTransformer, ok return storageTransformer, ok
} }

View File

@ -17,12 +17,12 @@
package watcher_test package watcher_test
import ( import (
"errors"
"io/ioutil" "io/ioutil"
"os" "os"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -41,7 +41,7 @@ var _ = Describe("Storage Watcher", func() {
It("adds transformers", func() { It("adds transformers", func() {
fakeHashedAddress := utils.HexToKeccak256Hash("0x12345") fakeHashedAddress := utils.HexToKeccak256Hash("0x12345")
fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress} fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress}
w := watcher.NewStorageWatcher(mocks.NewClosingStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) w := watcher.NewStorageWatcher(mocks.NewStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
@ -51,17 +51,17 @@ var _ = Describe("Storage Watcher", func() {
Describe("Execute", func() { Describe("Execute", func() {
var ( var (
mockFetcher *mocks.ClosingStorageFetcher mockFetcher *mocks.StorageFetcher
mockQueue *mocks.MockStorageQueue mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff csvDiff utils.StorageDiff
storageWatcher watcher.StorageWatcher storageWatcher *watcher.StorageWatcher
hashedAddress common.Hash hashedAddress common.Hash
) )
BeforeEach(func() { BeforeEach(func() {
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewClosingStorageFetcher() mockFetcher = mocks.NewStorageFetcher()
mockQueue = &mocks.MockStorageQueue{} mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
csvDiff = utils.StorageDiff{ csvDiff = utils.StorageDiff{
@ -115,7 +115,6 @@ var _ = Describe("Storage Watcher", func() {
go storageWatcher.Execute(time.Hour, false) go storageWatcher.Execute(time.Hour, false)
Expect(<-storageWatcher.ErrsChan).To(BeNil())
Eventually(func() bool { Eventually(func() bool {
return mockQueue.AddCalled return mockQueue.AddCalled
}).Should(BeTrue()) }).Should(BeTrue())
@ -240,26 +239,26 @@ var _ = Describe("Storage Watcher", func() {
Describe("BackFill", func() { Describe("BackFill", func() {
var ( var (
mockFetcher *mocks.StorageFetcher mockFetcher *mocks.StorageFetcher
mockBackFiller *mocks.BackFiller mockBackFiller *mocks.BackFiller
mockQueue *mocks.MockStorageQueue mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff mockTransformer2 *mocks.MockStorageTransformer
storageWatcher watcher.StorageWatcher mockTransformer3 *mocks.MockStorageTransformer
hashedAddress common.Hash csvDiff utils.StorageDiff
storageWatcher *watcher.StorageWatcher
hashedAddress common.Hash
) )
BeforeEach(func() { BeforeEach(func() {
mockBackFiller = new(mocks.BackFiller) mockBackFiller = new(mocks.BackFiller)
mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{ mockBackFiller.StartingBlock = test_data.BlockNumber.Uint64()
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
test_data.DeletedExpectedStorageDiff})
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewStorageFetcher() mockFetcher = mocks.NewStorageFetcher()
mockQueue = &mocks.MockStorageQueue{} mockQueue = &mocks.MockStorageQueue{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
mockTransformer2 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.ContractLeafKey[:])}
mockTransformer3 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:])}
csvDiff = utils.StorageDiff{ csvDiff = utils.StorageDiff{
ID: 1337, ID: 1337,
HashedAddress: hashedAddress, HashedAddress: hashedAddress,
@ -273,86 +272,189 @@ var _ = Describe("Storage Watcher", func() {
Describe("transforming streamed and backfilled queued storage diffs", func() { Describe("transforming streamed and backfilled queued storage diffs", func() {
BeforeEach(func() { BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff}
mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff, mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{
test_data.CreatedExpectedStorageDiff, test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff, test_data.UpdatedExpectedStorageDiff,
test_data.DeletedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2, test_data.UpdatedExpectedStorageDiff2,
test_data.DeletedExpectedStorageDiff} })
mockQueue.DiffsToReturn = []utils.StorageDiff{}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{
mockTransformer.FakeTransformerInitializer,
mockTransformer2.FakeTransformerInitializer,
mockTransformer3.FakeTransformerInitializer,
})
}) })
It("executes transformer for storage diffs", func(done Done) { It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) {
go storageWatcher.BackFill(mockBackFiller, int(test_data.BlockNumber.Uint64())) go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.Execute(time.Nanosecond, true) go storageWatcher.Execute(time.Hour, true)
expectedDiffsStruct := struct {
diffs []utils.StorageDiff Eventually(func() int {
}{ return len(mockTransformer.PassedDiffs)
[]utils.StorageDiff{ }).Should(Equal(1))
csvDiff,
test_data.CreatedExpectedStorageDiff, Eventually(func() int {
test_data.UpdatedExpectedStorageDiff, return len(mockTransformer2.PassedDiffs)
test_data.UpdatedExpectedStorageDiff2, }).Should(Equal(1))
test_data.DeletedExpectedStorageDiff,
}, Eventually(func() int {
} return len(mockTransformer3.PassedDiffs)
expectedDiffsBytes, rlpErr1 := rlp.EncodeToBytes(expectedDiffsStruct) }).Should(Equal(3))
Expect(rlpErr1).ToNot(HaveOccurred()) Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff))
Eventually(func() []byte { Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
diffsStruct := struct { Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff))
diffs []utils.StorageDiff Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff))
}{ Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff))
mockTransformer.PassedDiffs, Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2))
}
diffsBytes, rlpErr2 := rlp.EncodeToBytes(diffsStruct)
Expect(rlpErr2).ToNot(HaveOccurred())
return diffsBytes
}).Should(Equal(expectedDiffsBytes))
close(done) close(done)
}) })
It("deletes diffs from queue if transformer execution is successful", func(done Done) { It("adds diffs to the queue if transformation fails", func(done Done) {
go storageWatcher.Execute(time.Nanosecond, false) mockTransformer3.ExecuteErr = fakes.FakeError
expectedIdsStruct := struct { go storageWatcher.BackFill(mockBackFiller)
diffs []int go storageWatcher.Execute(time.Hour, true)
}{
[]int{ Eventually(func() int {
csvDiff.ID, return len(mockTransformer.PassedDiffs)
test_data.CreatedExpectedStorageDiff.ID, }).Should(Equal(1))
test_data.UpdatedExpectedStorageDiff.ID, Eventually(func() int {
test_data.UpdatedExpectedStorageDiff2.ID, return len(mockTransformer2.PassedDiffs)
test_data.DeletedExpectedStorageDiff.ID, }).Should(Equal(1))
}, Eventually(func() int {
} return len(mockTransformer3.PassedDiffs)
expectedIdsBytes, rlpErr1 := rlp.EncodeToBytes(expectedIdsStruct) }).Should(Equal(3))
Expect(rlpErr1).ToNot(HaveOccurred())
Eventually(func() []byte { Eventually(func() bool {
idsStruct := struct { return mockQueue.AddCalled
diffs []int }).Should(BeTrue())
}{ Eventually(func() []utils.StorageDiff {
mockQueue.DeletePassedIds, if len(mockQueue.AddPassedDiffs) > 2 {
return mockQueue.AddPassedDiffs
} }
idsBytes, rlpErr2 := rlp.EncodeToBytes(idsStruct) return []utils.StorageDiff{}
Expect(rlpErr2).ToNot(HaveOccurred()) }).Should(Equal([]utils.StorageDiff{
return idsBytes test_data.UpdatedExpectedStorageDiff,
}).Should(Equal(expectedIdsBytes)) test_data.DeletedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
}))
Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2))
close(done) close(done)
}) })
It("logs error if deleting persisted diff fails", func(done Done) { It("logs a backfill error", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log") tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred()) Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile) logrus.SetOutput(tempFile)
go storageWatcher.Execute(time.Nanosecond, false) mockBackFiller.BackFillErrs = []error{
nil,
nil,
nil,
errors.New("mock backfiller error"),
}
go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() int {
return len(mockTransformer.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer2.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer3.PassedDiffs)
}).Should(Equal(2))
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring("mock backfiller error"))
close(done)
})
It("logs when backfill finishes", func(done Done) {
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.Execute(time.Hour, true)
Eventually(func() (string, error) { Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name()) logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error())) }).Should(ContainSubstring("storage watcher backfill process has finished"))
close(done)
})
})
Describe("transforms storage diffs", func() {
BeforeEach(func() {
test_data.CreatedExpectedStorageDiff.ID = 1334
test_data.UpdatedExpectedStorageDiff.ID = 1335
test_data.DeletedExpectedStorageDiff.ID = 1336
test_data.UpdatedExpectedStorageDiff2.ID = 1337
mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff,
test_data.CreatedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff,
test_data.DeletedExpectedStorageDiff,
test_data.UpdatedExpectedStorageDiff2,
}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{
mockTransformer.FakeTransformerInitializer,
mockTransformer2.FakeTransformerInitializer,
mockTransformer3.FakeTransformerInitializer,
})
})
It("executes transformers on queued storage diffs", func(done Done) {
go storageWatcher.BackFill(mockBackFiller)
go storageWatcher.Execute(time.Nanosecond, true)
Eventually(func() int {
return len(mockTransformer.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer2.PassedDiffs)
}).Should(Equal(1))
Eventually(func() int {
return len(mockTransformer3.PassedDiffs)
}).Should(Equal(3))
Eventually(func() bool {
return mockQueue.GetAllCalled
}).Should(BeTrue())
Eventually(func() []int {
if len(mockQueue.DeletePassedIds) > 4 {
return mockQueue.DeletePassedIds
}
return []int{}
}).Should(Equal([]int{
csvDiff.ID,
test_data.CreatedExpectedStorageDiff.ID,
test_data.UpdatedExpectedStorageDiff.ID,
test_data.DeletedExpectedStorageDiff.ID,
test_data.UpdatedExpectedStorageDiff2.ID,
}))
Expect(mockQueue.AddCalled).To(Not(BeTrue()))
Expect(len(mockQueue.DiffsToReturn)).To(Equal(0))
Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvDiff))
Expect(mockTransformer2.PassedDiffs[0]).To(Equal(test_data.CreatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[0]).To(Equal(test_data.UpdatedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[1]).To(Equal(test_data.DeletedExpectedStorageDiff))
Expect(mockTransformer3.PassedDiffs[2]).To(Equal(test_data.UpdatedExpectedStorageDiff2))
close(done) close(done)
}) })
}) })