(VDB-950) Write raw diffs before transforming

- Raw field we can reference by FK for related data
- Enables replay for unwatched or mistransformed diffs
This commit is contained in:
Rob Mulholand 2019-10-31 11:04:53 -05:00 committed by Ian Norden
parent 1178940047
commit 56ce8bdb41
45 changed files with 543 additions and 1815 deletions

View File

@ -1,13 +0,0 @@
-- +goose Up
CREATE TABLE public.queued_storage (
id SERIAL PRIMARY KEY,
block_height BIGINT,
block_hash BYTEA,
contract BYTEA,
storage_key BYTEA,
storage_value BYTEA,
UNIQUE (block_height, block_hash, contract, storage_key, storage_value)
);
-- +goose Down
DROP TABLE public.queued_storage;

View File

@ -0,0 +1,14 @@
-- +goose Up
CREATE TABLE public.storage_diff
(
id SERIAL PRIMARY KEY,
block_height BIGINT,
block_hash BYTEA,
hashed_address BYTEA,
storage_key BYTEA,
storage_value BYTEA,
UNIQUE (block_height, block_hash, hashed_address, storage_key, storage_value)
);
-- +goose Down
DROP TABLE public.storage_diff;

View File

@ -0,0 +1,9 @@
-- +goose Up
CREATE TABLE public.queued_storage
(
id SERIAL PRIMARY KEY,
diff_id BIGINT UNIQUE NOT NULL REFERENCES public.storage_diff (id)
);
-- +goose Down
DROP TABLE public.queued_storage;

File diff suppressed because it is too large Load Diff

View File

@ -22,6 +22,6 @@ import (
) )
type Repository interface { type Repository interface {
Create(blockNumber int, blockHash string, metadata utils.StorageValueMetadata, value interface{}) error Create(diffID int64, metadata utils.StorageValueMetadata, value interface{}) error
SetDB(db *postgres.DB) SetDB(db *postgres.DB)
} }

View File

@ -39,7 +39,7 @@ func (transformer Transformer) KeccakContractAddress() common.Hash {
return transformer.HashedAddress return transformer.HashedAddress
} }
func (transformer Transformer) Execute(diff utils.StorageDiff) error { func (transformer Transformer) Execute(diff utils.PersistedStorageDiff) error {
metadata, lookupErr := transformer.StorageKeysLookup.Lookup(diff.StorageKey) metadata, lookupErr := transformer.StorageKeysLookup.Lookup(diff.StorageKey)
if lookupErr != nil { if lookupErr != nil {
return lookupErr return lookupErr
@ -48,5 +48,5 @@ func (transformer Transformer) Execute(diff utils.StorageDiff) error {
if decodeErr != nil { if decodeErr != nil {
return decodeErr return decodeErr
} }
return transformer.Repository.Create(diff.BlockHeight, diff.BlockHash.Hex(), metadata, value) return transformer.Repository.Create(diff.ID, metadata, value)
} }

View File

@ -17,6 +17,8 @@
package storage_test package storage_test
import ( import (
"math/rand"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -51,7 +53,7 @@ var _ = Describe("Storage transformer", func() {
}) })
It("looks up metadata for storage key", func() { It("looks up metadata for storage key", func() {
t.Execute(utils.StorageDiff{}) t.Execute(utils.PersistedStorageDiff{})
Expect(storageKeysLookup.LookupCalled).To(BeTrue()) Expect(storageKeysLookup.LookupCalled).To(BeTrue())
}) })
@ -59,7 +61,7 @@ var _ = Describe("Storage transformer", func() {
It("returns error if lookup fails", func() { It("returns error if lookup fails", func() {
storageKeysLookup.LookupErr = fakes.FakeError storageKeysLookup.LookupErr = fakes.FakeError
err := t.Execute(utils.StorageDiff{}) err := t.Execute(utils.PersistedStorageDiff{})
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
@ -71,19 +73,21 @@ var _ = Describe("Storage transformer", func() {
rawValue := common.HexToAddress("0x12345") rawValue := common.HexToAddress("0x12345")
fakeBlockNumber := 123 fakeBlockNumber := 123
fakeBlockHash := "0x67890" fakeBlockHash := "0x67890"
fakeRow := utils.StorageDiff{ fakeRow := utils.PersistedStorageDiff{
HashedAddress: common.Hash{}, ID: rand.Int63(),
BlockHash: common.HexToHash(fakeBlockHash), StorageDiffInput: utils.StorageDiffInput{
BlockHeight: fakeBlockNumber, HashedAddress: common.Hash{},
StorageKey: common.Hash{}, BlockHash: common.HexToHash(fakeBlockHash),
StorageValue: rawValue.Hash(), BlockHeight: fakeBlockNumber,
StorageKey: common.Hash{},
StorageValue: rawValue.Hash(),
},
} }
err := t.Execute(fakeRow) err := t.Execute(fakeRow)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedBlockNumber).To(Equal(fakeBlockNumber)) Expect(repository.PassedDiffID).To(Equal(fakeRow.ID))
Expect(repository.PassedBlockHash).To(Equal(common.HexToHash(fakeBlockHash).Hex()))
Expect(repository.PassedMetadata).To(Equal(fakeMetadata)) Expect(repository.PassedMetadata).To(Equal(fakeMetadata))
Expect(repository.PassedValue.(string)).To(Equal(rawValue.Hex())) Expect(repository.PassedValue.(string)).To(Equal(rawValue.Hex()))
}) })
@ -93,8 +97,9 @@ var _ = Describe("Storage transformer", func() {
fakeMetadata := utils.StorageValueMetadata{Type: utils.Address} fakeMetadata := utils.StorageValueMetadata{Type: utils.Address}
storageKeysLookup.Metadata = fakeMetadata storageKeysLookup.Metadata = fakeMetadata
repository.CreateErr = fakes.FakeError repository.CreateErr = fakes.FakeError
diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: rawValue.Hash()}}
err := t.Execute(utils.StorageDiff{StorageValue: rawValue.Hash()}) err := t.Execute(diff)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
@ -119,19 +124,21 @@ var _ = Describe("Storage transformer", func() {
It("passes the decoded data items to the repository", func() { It("passes the decoded data items to the repository", func() {
storageKeysLookup.Metadata = fakeMetadata storageKeysLookup.Metadata = fakeMetadata
fakeRow := utils.StorageDiff{ fakeRow := utils.PersistedStorageDiff{
HashedAddress: common.Hash{}, ID: rand.Int63(),
BlockHash: common.HexToHash(fakeBlockHash), StorageDiffInput: utils.StorageDiffInput{
BlockHeight: fakeBlockNumber, HashedAddress: common.Hash{},
StorageKey: common.Hash{}, BlockHash: common.HexToHash(fakeBlockHash),
StorageValue: rawValue.Hash(), BlockHeight: fakeBlockNumber,
StorageKey: common.Hash{},
StorageValue: rawValue.Hash(),
},
} }
err := t.Execute(fakeRow) err := t.Execute(fakeRow)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedBlockNumber).To(Equal(fakeBlockNumber)) Expect(repository.PassedDiffID).To(Equal(fakeRow.ID))
Expect(repository.PassedBlockHash).To(Equal(common.HexToHash(fakeBlockHash).Hex()))
Expect(repository.PassedMetadata).To(Equal(fakeMetadata)) Expect(repository.PassedMetadata).To(Equal(fakeMetadata))
expectedPassedValue := make(map[int]string) expectedPassedValue := make(map[int]string)
expectedPassedValue[0] = "10800" expectedPassedValue[0] = "10800"
@ -142,8 +149,9 @@ var _ = Describe("Storage transformer", func() {
It("returns error if creating a row fails", func() { It("returns error if creating a row fails", func() {
storageKeysLookup.Metadata = fakeMetadata storageKeysLookup.Metadata = fakeMetadata
repository.CreateErr = fakes.FakeError repository.CreateErr = fakes.FakeError
diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: rawValue.Hash()}}
err := t.Execute(utils.StorageDiff{StorageValue: rawValue.Hash()}) err := t.Execute(diff)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))

View File

@ -32,7 +32,7 @@ func NewCsvTailStorageFetcher(tailer fs.Tailer) CsvTailStorageFetcher {
return CsvTailStorageFetcher{tailer: tailer} return CsvTailStorageFetcher{tailer: tailer}
} }
func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) {
t, tailErr := storageFetcher.tailer.Tail() t, tailErr := storageFetcher.tailer.Tail()
if tailErr != nil { if tailErr != nil {
errs <- tailErr errs <- tailErr

View File

@ -34,13 +34,13 @@ var _ = Describe("Csv Tail Storage Fetcher", func() {
var ( var (
errorsChannel chan error errorsChannel chan error
mockTailer *fakes.MockTailer mockTailer *fakes.MockTailer
diffsChannel chan utils.StorageDiff diffsChannel chan utils.StorageDiffInput
storageFetcher fetcher.CsvTailStorageFetcher storageFetcher fetcher.CsvTailStorageFetcher
) )
BeforeEach(func() { BeforeEach(func() {
errorsChannel = make(chan error) errorsChannel = make(chan error)
diffsChannel = make(chan utils.StorageDiff) diffsChannel = make(chan utils.StorageDiffInput)
mockTailer = fakes.NewMockTailer() mockTailer = fakes.NewMockTailer()
storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer) storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer)
}) })

View File

@ -43,7 +43,7 @@ func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher
} }
} }
func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) {
ethStatediffPayloadChan := fetcher.StatediffPayloadChan ethStatediffPayloadChan := fetcher.StatediffPayloadChan
clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan)
if clientSubErr != nil { if clientSubErr != nil {

View File

@ -60,13 +60,13 @@ func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payloa
var _ = Describe("Geth RPC Storage Fetcher", func() { var _ = Describe("Geth RPC Storage Fetcher", func() {
var streamer MockStoragediffStreamer var streamer MockStoragediffStreamer
var statediffFetcher fetcher.GethRPCStorageFetcher var statediffFetcher fetcher.GethRPCStorageFetcher
var storagediffChan chan utils.StorageDiff var storagediffChan chan utils.StorageDiffInput
var errorChan chan error var errorChan chan error
BeforeEach(func() { BeforeEach(func() {
streamer = MockStoragediffStreamer{} streamer = MockStoragediffStreamer{}
statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer) statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer)
storagediffChan = make(chan utils.StorageDiff) storagediffChan = make(chan utils.StorageDiffInput)
errorChan = make(chan error) errorChan = make(chan error)
}) })
@ -113,21 +113,21 @@ var _ = Describe("Geth RPC Storage Fetcher", func() {
height := test_data.BlockNumber height := test_data.BlockNumber
intHeight := int(height.Int64()) intHeight := int(height.Int64())
createdExpectedStorageDiff := utils.StorageDiff{ createdExpectedStorageDiff := utils.StorageDiffInput{
HashedAddress: common.BytesToHash(test_data.ContractLeafKey[:]), HashedAddress: common.BytesToHash(test_data.ContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: intHeight, BlockHeight: intHeight,
StorageKey: common.BytesToHash(test_data.StorageKey), StorageKey: common.BytesToHash(test_data.StorageKey),
StorageValue: common.BytesToHash(test_data.SmallStorageValue), StorageValue: common.BytesToHash(test_data.SmallStorageValue),
} }
updatedExpectedStorageDiff := utils.StorageDiff{ updatedExpectedStorageDiff := utils.StorageDiffInput{
HashedAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: intHeight, BlockHeight: intHeight,
StorageKey: common.BytesToHash(test_data.StorageKey), StorageKey: common.BytesToHash(test_data.StorageKey),
StorageValue: common.BytesToHash(test_data.LargeStorageValue), StorageValue: common.BytesToHash(test_data.LargeStorageValue),
} }
deletedExpectedStorageDiff := utils.StorageDiff{ deletedExpectedStorageDiff := utils.StorageDiffInput{
HashedAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: intHeight, BlockHeight: intHeight,

View File

@ -17,5 +17,5 @@ package fetcher
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
type IStorageFetcher interface { type IStorageFetcher interface {
FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error)
} }

View File

@ -24,23 +24,23 @@ import (
// BackFiller mock for tests // BackFiller mock for tests
type BackFiller struct { type BackFiller struct {
StorageDiffsToReturn []utils.StorageDiff StorageDiffsToReturn []utils.StorageDiffInput
BackFillErrs []error BackFillErrs []error
PassedEndingBlock uint64 PassedEndingBlock uint64
} }
// SetStorageDiffsToReturn for tests // SetStorageDiffsToReturn for tests
func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) { func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiffInput) {
backFiller.StorageDiffsToReturn = diffs backFiller.StorageDiffsToReturn = diffs
} }
// BackFill mock method // BackFill mock method
func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiffInput, errChan chan error, done chan bool) error {
if endingBlock < startingBlock { if endingBlock < startingBlock {
return errors.New("backfill: ending block number needs to be greater than starting block number") return errors.New("backfill: ending block number needs to be greater than starting block number")
} }
backFiller.PassedEndingBlock = endingBlock backFiller.PassedEndingBlock = endingBlock
go func(backFill chan utils.StorageDiff, errChan chan error, done chan bool) { go func(backFill chan utils.StorageDiffInput, errChan chan error, done chan bool) {
errLen := len(backFiller.BackFillErrs) errLen := len(backFiller.BackFillErrs)
for i, diff := range backFiller.StorageDiffsToReturn { for i, diff := range backFiller.StorageDiffsToReturn {
if i < errLen { if i < errLen {

View File

@ -22,7 +22,7 @@ import (
// StorageFetcher is a mock fetcher for use in tests with backfilling // StorageFetcher is a mock fetcher for use in tests with backfilling
type StorageFetcher struct { type StorageFetcher struct {
DiffsToReturn []utils.StorageDiff DiffsToReturn []utils.StorageDiffInput
ErrsToReturn []error ErrsToReturn []error
} }
@ -32,7 +32,7 @@ func NewStorageFetcher() *StorageFetcher {
} }
// FetchStorageDiffs mock method // FetchStorageDiffs mock method
func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) {
for _, err := range fetcher.ErrsToReturn { for _, err := range fetcher.ErrsToReturn {
errs <- err errs <- err
} }

View File

@ -24,37 +24,36 @@ import (
type MockStorageQueue struct { type MockStorageQueue struct {
AddCalled bool AddCalled bool
AddError error AddError error
AddPassedDiffs map[int]utils.StorageDiff AddPassedDiffs []utils.PersistedStorageDiff
DeleteErr error DeleteErr error
DeletePassedIds []int DeletePassedIds []int64
GetAllErr error GetAllErr error
DiffsToReturn map[int]utils.StorageDiff DiffsToReturn []utils.PersistedStorageDiff
GetAllCalled bool GetAllCalled bool
} }
// Add mock method // Add mock method
func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error { func (queue *MockStorageQueue) Add(diff utils.PersistedStorageDiff) error {
queue.AddCalled = true queue.AddCalled = true
if queue.AddPassedDiffs == nil { queue.AddPassedDiffs = append(queue.AddPassedDiffs, diff)
queue.AddPassedDiffs = make(map[int]utils.StorageDiff)
}
queue.AddPassedDiffs[diff.ID] = diff
return queue.AddError return queue.AddError
} }
// Delete mock method // Delete mock method
func (queue *MockStorageQueue) Delete(id int) error { func (queue *MockStorageQueue) Delete(id int64) error {
queue.DeletePassedIds = append(queue.DeletePassedIds, id) queue.DeletePassedIds = append(queue.DeletePassedIds, id)
delete(queue.DiffsToReturn, id) var diffs []utils.PersistedStorageDiff
for _, diff := range queue.DiffsToReturn {
if diff.ID != id {
diffs = append(diffs, diff)
}
}
queue.DiffsToReturn = diffs
return queue.DeleteErr return queue.DeleteErr
} }
// GetAll mock method // GetAll mock method
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { func (queue *MockStorageQueue) GetAll() ([]utils.PersistedStorageDiff, error) {
queue.GetAllCalled = true queue.GetAllCalled = true
diffs := make([]utils.StorageDiff, 0) return queue.DiffsToReturn, queue.GetAllErr
for _, diff := range queue.DiffsToReturn {
diffs = append(diffs, diff)
}
return diffs, queue.GetAllErr
} }

View File

@ -22,16 +22,14 @@ import (
) )
type MockStorageRepository struct { type MockStorageRepository struct {
CreateErr error CreateErr error
PassedBlockNumber int PassedDiffID int64
PassedBlockHash string PassedMetadata utils.StorageValueMetadata
PassedMetadata utils.StorageValueMetadata PassedValue interface{}
PassedValue interface{}
} }
func (repository *MockStorageRepository) Create(blockNumber int, blockHash string, metadata utils.StorageValueMetadata, value interface{}) error { func (repository *MockStorageRepository) Create(diffID int64, metadata utils.StorageValueMetadata, value interface{}) error {
repository.PassedBlockNumber = blockNumber repository.PassedDiffID = diffID
repository.PassedBlockHash = blockHash
repository.PassedMetadata = metadata repository.PassedMetadata = metadata
repository.PassedValue = value repository.PassedValue = value
return repository.CreateErr return repository.CreateErr

View File

@ -28,15 +28,12 @@ import (
type MockStorageTransformer struct { type MockStorageTransformer struct {
KeccakOfAddress common.Hash KeccakOfAddress common.Hash
ExecuteErr error ExecuteErr error
PassedDiffs map[int]utils.StorageDiff PassedDiffs []utils.PersistedStorageDiff
} }
// Execute mock method // Execute mock method
func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error { func (transformer *MockStorageTransformer) Execute(diff utils.PersistedStorageDiff) error {
if transformer.PassedDiffs == nil { transformer.PassedDiffs = append(transformer.PassedDiffs, diff)
transformer.PassedDiffs = make(map[int]utils.StorageDiff)
}
transformer.PassedDiffs[diff.ID] = diff
return transformer.ExecuteErr return transformer.ExecuteErr
} }

View File

@ -36,7 +36,7 @@ const (
// BackFiller is the backfilling interface // BackFiller is the backfilling interface
type BackFiller interface { type BackFiller interface {
BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiffInput, errChan chan error, done chan bool) error
} }
// backFiller is the backfilling struct // backFiller is the backfilling struct
@ -59,7 +59,7 @@ func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, batchSize uint64) Ba
// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiffInput, errChan chan error, done chan bool) error {
logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock) logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock)
// break the range up into bins of smaller ranges // break the range up into bins of smaller ranges
@ -113,7 +113,7 @@ func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan
return nil return nil
} }
func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.StorageDiff, errChan chan error, doneChan chan [2]uint64) { func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.StorageDiffInput, errChan chan error, doneChan chan [2]uint64) {
payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights) payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights)
if fetchErr != nil { if fetchErr != nil {
errChan <- fetchErr errChan <- fetchErr

View File

@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() {
It("batch calls statediff_stateDiffAt", func() { It("batch calls statediff_stateDiffAt", func() {
backFiller = storage.NewStorageBackFiller(mockFetcher, 100) backFiller = storage.NewStorageBackFiller(mockFetcher, 100)
backFill := make(chan utils.StorageDiff) backFill := make(chan utils.StorageDiffInput)
done := make(chan bool) done := make(chan bool)
errChan := make(chan error) errChan := make(chan error)
backFillInitErr := backFiller.BackFill( backFillInitErr := backFiller.BackFill(
@ -55,7 +55,7 @@ var _ = Describe("BackFiller", func() {
errChan, errChan,
done) done)
Expect(backFillInitErr).ToNot(HaveOccurred()) Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff var diffs []utils.StorageDiffInput
for { for {
select { select {
case diff := <-backFill: case diff := <-backFill:
@ -79,7 +79,7 @@ var _ = Describe("BackFiller", func() {
It("has a configurable batch size", func() { It("has a configurable batch size", func() {
backFiller = storage.NewStorageBackFiller(mockFetcher, 1) backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill := make(chan utils.StorageDiff) backFill := make(chan utils.StorageDiffInput)
done := make(chan bool) done := make(chan bool)
errChan := make(chan error) errChan := make(chan error)
backFillInitErr := backFiller.BackFill( backFillInitErr := backFiller.BackFill(
@ -89,7 +89,7 @@ var _ = Describe("BackFiller", func() {
errChan, errChan,
done) done)
Expect(backFillInitErr).ToNot(HaveOccurred()) Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff var diffs []utils.StorageDiffInput
for { for {
select { select {
case diff := <-backFill: case diff := <-backFill:
@ -119,7 +119,7 @@ var _ = Describe("BackFiller", func() {
mockFetcher.PayloadsToReturn = payloadsToReturn mockFetcher.PayloadsToReturn = payloadsToReturn
// batch size of 2 with 1001 block range => 501 bins // batch size of 2 with 1001 block range => 501 bins
backFiller = storage.NewStorageBackFiller(mockFetcher, 2) backFiller = storage.NewStorageBackFiller(mockFetcher, 2)
backFill := make(chan utils.StorageDiff) backFill := make(chan utils.StorageDiffInput)
done := make(chan bool) done := make(chan bool)
errChan := make(chan error) errChan := make(chan error)
backFillInitErr := backFiller.BackFill( backFillInitErr := backFiller.BackFill(
@ -129,7 +129,7 @@ var _ = Describe("BackFiller", func() {
errChan, errChan,
done) done)
Expect(backFillInitErr).ToNot(HaveOccurred()) Expect(backFillInitErr).ToNot(HaveOccurred())
var diffs []utils.StorageDiff var diffs []utils.StorageDiffInput
for { for {
select { select {
case diff := <-backFill: case diff := <-backFill:
@ -155,7 +155,7 @@ var _ = Describe("BackFiller", func() {
test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"), test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"),
} }
backFiller = storage.NewStorageBackFiller(mockFetcher, 1) backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill := make(chan utils.StorageDiff) backFill := make(chan utils.StorageDiffInput)
done := make(chan bool) done := make(chan bool)
errChan := make(chan error) errChan := make(chan error)
backFillInitErr := backFiller.BackFill( backFillInitErr := backFiller.BackFill(
@ -166,7 +166,7 @@ var _ = Describe("BackFiller", func() {
done) done)
Expect(backFillInitErr).ToNot(HaveOccurred()) Expect(backFillInitErr).ToNot(HaveOccurred())
var numOfErrs int var numOfErrs int
var diffs []utils.StorageDiff var diffs []utils.StorageDiffInput
for { for {
select { select {
case diff := <-backFill: case diff := <-backFill:
@ -193,7 +193,7 @@ var _ = Describe("BackFiller", func() {
} }
mockFetcher.CalledTimes = 0 mockFetcher.CalledTimes = 0
backFiller = storage.NewStorageBackFiller(mockFetcher, 1) backFiller = storage.NewStorageBackFiller(mockFetcher, 1)
backFill = make(chan utils.StorageDiff) backFill = make(chan utils.StorageDiffInput)
done = make(chan bool) done = make(chan bool)
errChan = make(chan error) errChan = make(chan error)
backFillInitErr = backFiller.BackFill( backFillInitErr = backFiller.BackFill(
@ -204,7 +204,7 @@ var _ = Describe("BackFiller", func() {
done) done)
Expect(backFillInitErr).ToNot(HaveOccurred()) Expect(backFillInitErr).ToNot(HaveOccurred())
numOfErrs = 0 numOfErrs = 0
diffs = []utils.StorageDiff{} diffs = []utils.StorageDiffInput{}
for { for {
select { select {
case diff := <-backFill: case diff := <-backFill:
@ -227,7 +227,7 @@ var _ = Describe("BackFiller", func() {
}) })
}) })
func containsDiff(diffs []utils.StorageDiff, diff utils.StorageDiff) bool { func containsDiff(diffs []utils.StorageDiffInput, diff utils.StorageDiffInput) bool {
for _, d := range diffs { for _, d := range diffs {
if d == diff { if d == diff {
return true return true

View File

@ -22,9 +22,9 @@ import (
) )
type IStorageQueue interface { type IStorageQueue interface {
Add(diff utils.StorageDiff) error Add(diff utils.PersistedStorageDiff) error
Delete(id int) error Delete(id int64) error
GetAll() ([]utils.StorageDiff, error) GetAll() ([]utils.PersistedStorageDiff, error)
} }
type StorageQueue struct { type StorageQueue struct {
@ -35,21 +35,21 @@ func NewStorageQueue(db *postgres.DB) StorageQueue {
return StorageQueue{db: db} return StorageQueue{db: db}
} }
func (queue StorageQueue) Add(diff utils.StorageDiff) error { func (queue StorageQueue) Add(diff utils.PersistedStorageDiff) error {
_, err := queue.db.Exec(`INSERT INTO public.queued_storage (contract, _, err := queue.db.Exec(`INSERT INTO public.queued_storage (diff_id) VALUES
block_hash, block_height, storage_key, storage_value) VALUES ($1) ON CONFLICT DO NOTHING`, diff.ID)
($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`, diff.HashedAddress.Bytes(), diff.BlockHash.Bytes(),
diff.BlockHeight, diff.StorageKey.Bytes(), diff.StorageValue.Bytes())
return err return err
} }
func (queue StorageQueue) Delete(id int) error { func (queue StorageQueue) Delete(diffID int64) error {
_, err := queue.db.Exec(`DELETE FROM public.queued_storage WHERE id = $1`, id) _, err := queue.db.Exec(`DELETE FROM public.queued_storage WHERE diff_id = $1`, diffID)
return err return err
} }
func (queue StorageQueue) GetAll() ([]utils.StorageDiff, error) { func (queue StorageQueue) GetAll() ([]utils.PersistedStorageDiff, error) {
var result []utils.StorageDiff var result []utils.PersistedStorageDiff
err := queue.db.Select(&result, `SELECT * FROM public.queued_storage`) err := queue.db.Select(&result, `SELECT storage_diff.id, hashed_address, block_height, block_hash, storage_key, storage_value
FROM public.queued_storage
LEFT JOIN public.storage_diff ON queued_storage.diff_id = storage_diff.id`)
return result, err return result, err
} }

View File

@ -23,19 +23,21 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
) )
var _ = Describe("Storage queue", func() { var _ = Describe("Storage queue", func() {
var ( var (
db *postgres.DB db *postgres.DB
diff utils.StorageDiff diff utils.PersistedStorageDiff
queue storage.IStorageQueue diffRepository repositories.StorageDiffRepository
queue storage.IStorageQueue
) )
BeforeEach(func() { BeforeEach(func() {
fakeAddr := "0x123456" fakeAddr := "0x123456"
diff = utils.StorageDiff{ rawDiff := utils.StorageDiffInput{
HashedAddress: utils.HexToKeccak256Hash(fakeAddr), HashedAddress: utils.HexToKeccak256Hash(fakeAddr),
BlockHash: common.HexToHash("0x678901"), BlockHash: common.HexToHash("0x678901"),
BlockHeight: 987, BlockHeight: 987,
@ -44,6 +46,10 @@ var _ = Describe("Storage queue", func() {
} }
db = test_config.NewTestDB(test_config.NewTestNode()) db = test_config.NewTestDB(test_config.NewTestNode())
test_config.CleanTestDB(db) test_config.CleanTestDB(db)
diffRepository = repositories.NewStorageDiffRepository(db)
diffID, insertDiffErr := diffRepository.CreateStorageDiff(rawDiff)
Expect(insertDiffErr).NotTo(HaveOccurred())
diff = utils.ToPersistedDiff(rawDiff, diffID)
queue = storage.NewStorageQueue(db) queue = storage.NewStorageQueue(db)
addErr := queue.Add(diff) addErr := queue.Add(diff)
Expect(addErr).NotTo(HaveOccurred()) Expect(addErr).NotTo(HaveOccurred())
@ -51,8 +57,10 @@ var _ = Describe("Storage queue", func() {
Describe("Add", func() { Describe("Add", func() {
It("adds a storage diff to the db", func() { It("adds a storage diff to the db", func() {
var result utils.StorageDiff var result utils.PersistedStorageDiff
getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`) getErr := db.Get(&result, `SELECT storage_diff.id, hashed_address, block_hash, block_height, storage_key, storage_value
FROM public.queued_storage
LEFT JOIN public.storage_diff ON queued_storage.diff_id = storage_diff.id`)
Expect(getErr).NotTo(HaveOccurred()) Expect(getErr).NotTo(HaveOccurred())
Expect(result).To(Equal(diff)) Expect(result).To(Equal(diff))
}) })
@ -82,14 +90,17 @@ var _ = Describe("Storage queue", func() {
It("gets all storage diffs from db", func() { It("gets all storage diffs from db", func() {
fakeAddr := "0x234567" fakeAddr := "0x234567"
diffTwo := utils.StorageDiff{ diffTwo := utils.StorageDiffInput{
HashedAddress: utils.HexToKeccak256Hash(fakeAddr), HashedAddress: utils.HexToKeccak256Hash(fakeAddr),
BlockHash: common.HexToHash("0x678902"), BlockHash: common.HexToHash("0x678902"),
BlockHeight: 988, BlockHeight: 988,
StorageKey: common.HexToHash("0x654322"), StorageKey: common.HexToHash("0x654322"),
StorageValue: common.HexToHash("0x198766"), StorageValue: common.HexToHash("0x198766"),
} }
addErr := queue.Add(diffTwo) persistedDiffTwoID, insertDiffErr := diffRepository.CreateStorageDiff(diffTwo)
Expect(insertDiffErr).NotTo(HaveOccurred())
persistedDiffTwo := utils.ToPersistedDiff(diffTwo, persistedDiffTwoID)
addErr := queue.Add(persistedDiffTwo)
Expect(addErr).NotTo(HaveOccurred()) Expect(addErr).NotTo(HaveOccurred())
diffs, err := queue.GetAll() diffs, err := queue.GetAll()

View File

@ -27,7 +27,7 @@ const (
bitsPerByte = 8 bitsPerByte = 8
) )
func Decode(diff StorageDiff, metadata StorageValueMetadata) (interface{}, error) { func Decode(diff PersistedStorageDiff, metadata StorageValueMetadata) (interface{}, error) {
switch metadata.Type { switch metadata.Type {
case Uint256: case Uint256:
return decodeInteger(diff.StorageValue.Bytes()), nil return decodeInteger(diff.StorageValue.Bytes()), nil

View File

@ -29,7 +29,7 @@ import (
var _ = Describe("Storage decoder", func() { var _ = Describe("Storage decoder", func() {
It("decodes uint256", func() { It("decodes uint256", func() {
fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000000539") fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000000539")
diff := utils.StorageDiff{StorageValue: fakeInt} diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: fakeInt}}
metadata := utils.StorageValueMetadata{Type: utils.Uint256} metadata := utils.StorageValueMetadata{Type: utils.Uint256}
result, err := utils.Decode(diff, metadata) result, err := utils.Decode(diff, metadata)
@ -40,7 +40,7 @@ var _ = Describe("Storage decoder", func() {
It("decodes uint128", func() { It("decodes uint128", func() {
fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000011123") fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000011123")
diff := utils.StorageDiff{StorageValue: fakeInt} diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: fakeInt}}
metadata := utils.StorageValueMetadata{Type: utils.Uint128} metadata := utils.StorageValueMetadata{Type: utils.Uint128}
result, err := utils.Decode(diff, metadata) result, err := utils.Decode(diff, metadata)
@ -51,7 +51,7 @@ var _ = Describe("Storage decoder", func() {
It("decodes uint48", func() { It("decodes uint48", func() {
fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000000123") fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000000123")
diff := utils.StorageDiff{StorageValue: fakeInt} diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: fakeInt}}
metadata := utils.StorageValueMetadata{Type: utils.Uint48} metadata := utils.StorageValueMetadata{Type: utils.Uint48}
result, err := utils.Decode(diff, metadata) result, err := utils.Decode(diff, metadata)
@ -62,7 +62,7 @@ var _ = Describe("Storage decoder", func() {
It("decodes address", func() { It("decodes address", func() {
fakeAddress := common.HexToAddress("0x12345") fakeAddress := common.HexToAddress("0x12345")
diff := utils.StorageDiff{StorageValue: fakeAddress.Hash()} diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: fakeAddress.Hash()}}
metadata := utils.StorageValueMetadata{Type: utils.Address} metadata := utils.StorageValueMetadata{Type: utils.Address}
result, err := utils.Decode(diff, metadata) result, err := utils.Decode(diff, metadata)
@ -75,7 +75,7 @@ var _ = Describe("Storage decoder", func() {
It("decodes uint48 items", func() { It("decodes uint48 items", func() {
//this is a real storage data example //this is a real storage data example
packedStorage := common.HexToHash("000000000000000000000000000000000000000000000002a300000000002a30") packedStorage := common.HexToHash("000000000000000000000000000000000000000000000002a300000000002a30")
diff := utils.StorageDiff{StorageValue: packedStorage} diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: packedStorage}}
packedTypes := map[int]utils.ValueType{} packedTypes := map[int]utils.ValueType{}
packedTypes[0] = utils.Uint48 packedTypes[0] = utils.Uint48
packedTypes[1] = utils.Uint48 packedTypes[1] = utils.Uint48
@ -99,7 +99,7 @@ var _ = Describe("Storage decoder", func() {
packedStorageHex := "0000000A5D1AFFFFFFFFFFFE00000009F3C600000002A300000000002A30" packedStorageHex := "0000000A5D1AFFFFFFFFFFFE00000009F3C600000002A300000000002A30"
packedStorage := common.HexToHash(packedStorageHex) packedStorage := common.HexToHash(packedStorageHex)
diff := utils.StorageDiff{StorageValue: packedStorage} diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: packedStorage}}
packedTypes := map[int]utils.ValueType{} packedTypes := map[int]utils.ValueType{}
packedTypes[0] = utils.Uint48 packedTypes[0] = utils.Uint48
packedTypes[1] = utils.Uint48 packedTypes[1] = utils.Uint48
@ -129,7 +129,7 @@ var _ = Describe("Storage decoder", func() {
packedStorageHex := "000000038D7EA4C67FF8E502B6730000" + packedStorageHex := "000000038D7EA4C67FF8E502B6730000" +
"0000000000000000AB54A98CEB1F0AD2" "0000000000000000AB54A98CEB1F0AD2"
packedStorage := common.HexToHash(packedStorageHex) packedStorage := common.HexToHash(packedStorageHex)
diff := utils.StorageDiff{StorageValue: packedStorage} diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: packedStorage}}
packedTypes := map[int]utils.ValueType{} packedTypes := map[int]utils.ValueType{}
packedTypes[0] = utils.Uint128 packedTypes[0] = utils.Uint128
packedTypes[1] = utils.Uint128 packedTypes[1] = utils.Uint128
@ -151,7 +151,7 @@ var _ = Describe("Storage decoder", func() {
//TODO: replace with real data when available //TODO: replace with real data when available
addressHex := "0000000000000000000000000000000000012345" addressHex := "0000000000000000000000000000000000012345"
packedStorage := common.HexToHash("00000002a300" + "000000002a30" + addressHex) packedStorage := common.HexToHash("00000002a300" + "000000002a30" + addressHex)
row := utils.StorageDiff{StorageValue: packedStorage} row := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: packedStorage}}
packedTypes := map[int]utils.ValueType{} packedTypes := map[int]utils.ValueType{}
packedTypes[0] = utils.Address packedTypes[0] = utils.Address
packedTypes[1] = utils.Uint48 packedTypes[1] = utils.Uint48

View File

@ -27,24 +27,28 @@ import (
const ExpectedRowLength = 5 const ExpectedRowLength = 5
type StorageDiff struct { type StorageDiffInput struct {
ID int HashedAddress common.Hash `db:"hashed_address"`
HashedAddress common.Hash `db:"contract"`
BlockHash common.Hash `db:"block_hash"` BlockHash common.Hash `db:"block_hash"`
BlockHeight int `db:"block_height"` BlockHeight int `db:"block_height"`
StorageKey common.Hash `db:"storage_key"` StorageKey common.Hash `db:"storage_key"`
StorageValue common.Hash `db:"storage_value"` StorageValue common.Hash `db:"storage_value"`
} }
func FromParityCsvRow(csvRow []string) (StorageDiff, error) { type PersistedStorageDiff struct {
StorageDiffInput
ID int64
}
func FromParityCsvRow(csvRow []string) (StorageDiffInput, error) {
if len(csvRow) != ExpectedRowLength { if len(csvRow) != ExpectedRowLength {
return StorageDiff{}, ErrRowMalformed{Length: len(csvRow)} return StorageDiffInput{}, ErrRowMalformed{Length: len(csvRow)}
} }
height, err := strconv.Atoi(csvRow[2]) height, err := strconv.Atoi(csvRow[2])
if err != nil { if err != nil {
return StorageDiff{}, err return StorageDiffInput{}, err
} }
return StorageDiff{ return StorageDiffInput{
HashedAddress: HexToKeccak256Hash(csvRow[0]), HashedAddress: HexToKeccak256Hash(csvRow[0]),
BlockHash: common.HexToHash(csvRow[1]), BlockHash: common.HexToHash(csvRow[1]),
BlockHeight: height, BlockHeight: height,
@ -53,14 +57,14 @@ func FromParityCsvRow(csvRow []string) (StorageDiff, error) {
}, nil }, nil
} }
func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) (StorageDiff, error) { func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) (StorageDiffInput, error) {
var decodedValue []byte var decodedValue []byte
err := rlp.DecodeBytes(storage.Value, &decodedValue) err := rlp.DecodeBytes(storage.Value, &decodedValue)
if err != nil { if err != nil {
return StorageDiff{}, err return StorageDiffInput{}, err
} }
return StorageDiff{ return StorageDiffInput{
HashedAddress: common.BytesToHash(account.Key), HashedAddress: common.BytesToHash(account.Key),
BlockHash: stateDiff.BlockHash, BlockHash: stateDiff.BlockHash,
BlockHeight: int(stateDiff.BlockNumber.Int64()), BlockHeight: int(stateDiff.BlockNumber.Int64()),
@ -69,6 +73,13 @@ func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.State
}, nil }, nil
} }
func ToPersistedDiff(raw StorageDiffInput, id int64) PersistedStorageDiff {
return PersistedStorageDiff{
StorageDiffInput: raw,
ID: id,
}
}
func HexToKeccak256Hash(hex string) common.Hash { func HexToKeccak256Hash(hex string) common.Hash {
return crypto.Keccak256Hash(common.FromHex(hex)) return crypto.Keccak256Hash(common.FromHex(hex))
} }

View File

@ -33,12 +33,12 @@ var topic0 = "0x" + randomString(64)
var GenericTestLog = func() types.Log { var GenericTestLog = func() types.Log {
return types.Log{ return types.Log{
Address: fakeAddress(), Address: fakeAddress(),
Topics: []common.Hash{common.HexToHash(topic0), fakeHash()}, Topics: []common.Hash{common.HexToHash(topic0), FakeHash()},
Data: hexutil.MustDecode(fakeHash().Hex()), Data: hexutil.MustDecode(FakeHash().Hex()),
BlockNumber: uint64(startingBlockNumber), BlockNumber: uint64(startingBlockNumber),
TxHash: fakeHash(), TxHash: FakeHash(),
TxIndex: uint(rand.Int31()), TxIndex: uint(rand.Int31()),
BlockHash: fakeHash(), BlockHash: FakeHash(),
Index: uint(rand.Int31()), Index: uint(rand.Int31()),
} }
} }
@ -58,7 +58,7 @@ func fakeAddress() common.Address {
return common.HexToAddress("0x" + randomString(40)) return common.HexToAddress("0x" + randomString(40))
} }
func fakeHash() common.Hash { func FakeHash() common.Hash {
return common.HexToHash("0x" + randomString(64)) return common.HexToHash("0x" + randomString(64))
} }

View File

@ -150,32 +150,28 @@ var (
StateDiffRlp: MockStateDiff2Bytes, StateDiffRlp: MockStateDiff2Bytes,
} }
CreatedExpectedStorageDiff = utils.StorageDiff{ CreatedExpectedStorageDiff = utils.StorageDiffInput{
ID: 0,
HashedAddress: common.BytesToHash(ContractLeafKey[:]), HashedAddress: common.BytesToHash(ContractLeafKey[:]),
BlockHash: common.HexToHash(BlockHash), BlockHash: common.HexToHash(BlockHash),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),
StorageKey: common.BytesToHash(StorageKey), StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(SmallStorageValue), StorageValue: common.BytesToHash(SmallStorageValue),
} }
UpdatedExpectedStorageDiff = utils.StorageDiff{ UpdatedExpectedStorageDiff = utils.StorageDiffInput{
ID: 0,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash(BlockHash), BlockHash: common.HexToHash(BlockHash),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),
StorageKey: common.BytesToHash(StorageKey), StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(LargeStorageValue), StorageValue: common.BytesToHash(LargeStorageValue),
} }
UpdatedExpectedStorageDiff2 = utils.StorageDiff{ UpdatedExpectedStorageDiff2 = utils.StorageDiffInput{
ID: 0,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash(BlockHash2), BlockHash: common.HexToHash(BlockHash2),
BlockHeight: int(BlockNumber2.Int64()), BlockHeight: int(BlockNumber2.Int64()),
StorageKey: common.BytesToHash(StorageKey), StorageKey: common.BytesToHash(StorageKey),
StorageValue: common.BytesToHash(SmallStorageValue), StorageValue: common.BytesToHash(SmallStorageValue),
} }
DeletedExpectedStorageDiff = utils.StorageDiff{ DeletedExpectedStorageDiff = utils.StorageDiffInput{
ID: 0,
HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]),
BlockHash: common.HexToHash(BlockHash), BlockHash: common.HexToHash(BlockHash),
BlockHeight: int(BlockNumber.Int64()), BlockHeight: int(BlockNumber.Int64()),

View File

@ -23,7 +23,7 @@ import (
) )
type StorageTransformer interface { type StorageTransformer interface {
Execute(diff utils.StorageDiff) error Execute(diff utils.PersistedStorageDiff) error
KeccakContractAddress() common.Hash KeccakContractAddress() common.Hash
} }

View File

@ -27,7 +27,9 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
) )
type IStorageWatcher interface { type IStorageWatcher interface {
@ -40,8 +42,9 @@ type StorageWatcher struct {
db *postgres.DB db *postgres.DB
StorageFetcher fetcher.IStorageFetcher StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue Queue storage.IStorageQueue
StorageDiffRepository datastore.StorageDiffRepository
KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer
DiffsChan chan utils.StorageDiff DiffsChan chan utils.StorageDiffInput
ErrsChan chan error ErrsChan chan error
BackFillDoneChan chan bool BackFillDoneChan chan bool
StartingSyncBlockChan chan uint64 StartingSyncBlockChan chan uint64
@ -49,15 +52,17 @@ type StorageWatcher struct {
func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher { func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher {
queue := storage.NewStorageQueue(db) queue := storage.NewStorageQueue(db)
storageDiffRepository := repositories.NewStorageDiffRepository(db)
transformers := make(map[common.Hash]transformer.StorageTransformer) transformers := make(map[common.Hash]transformer.StorageTransformer)
return &StorageWatcher{ return &StorageWatcher{
db: db, db: db,
StorageFetcher: f, StorageFetcher: f,
DiffsChan: make(chan utils.StorageDiff, fetcher.PayloadChanBufferSize), DiffsChan: make(chan utils.StorageDiffInput, fetcher.PayloadChanBufferSize),
ErrsChan: make(chan error), ErrsChan: make(chan error),
StartingSyncBlockChan: make(chan uint64), StartingSyncBlockChan: make(chan uint64),
BackFillDoneChan: make(chan bool), BackFillDoneChan: make(chan bool),
Queue: queue, Queue: queue,
StorageDiffRepository: storageDiffRepository,
KeccakAddressTransformers: transformers, KeccakAddressTransformers: transformers,
} }
} }
@ -87,8 +92,8 @@ func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration
start := true start := true
for { for {
select { select {
case fetchErr := <-storageWatcher.ErrsChan: case err := <-storageWatcher.ErrsChan:
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr.Error())) logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", err.Error()))
case diff := <-storageWatcher.DiffsChan: case diff := <-storageWatcher.DiffsChan:
if start && backFillOn { if start && backFillOn {
storageWatcher.StartingSyncBlockChan <- uint64(diff.BlockHeight - 1) storageWatcher.StartingSyncBlockChan <- uint64(diff.BlockHeight - 1)
@ -103,27 +108,37 @@ func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration
} }
} }
func (storageWatcher *StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { func (storageWatcher *StorageWatcher) getTransformer(diff utils.PersistedStorageDiff) (transformer.StorageTransformer, bool) {
storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress] storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress]
return storageTransformer, ok return storageTransformer, ok
} }
func (storageWatcher StorageWatcher) processRow(diff utils.StorageDiff) { func (storageWatcher StorageWatcher) processRow(diffInput utils.StorageDiffInput) {
storageTransformer, ok := storageWatcher.getTransformer(diff) diffID, err := storageWatcher.StorageDiffRepository.CreateStorageDiff(diffInput)
if err != nil {
if err == repositories.ErrDuplicateDiff {
logrus.Warn("ignoring duplicate diff")
return
}
logrus.Warnf("failed to persist storage diff: %s", err.Error())
// TODO: bail? Should we continue attempting to transform a diff we didn't persist
}
persistedDiff := utils.ToPersistedDiff(diffInput, diffID)
storageTransformer, ok := storageWatcher.getTransformer(persistedDiff)
if !ok { if !ok {
logrus.Debug("ignoring a diff from an unwatched contract") logrus.Debug("ignoring diff from unwatched contract")
return return
} }
executeErr := storageTransformer.Execute(diff) executeErr := storageTransformer.Execute(persistedDiff)
if executeErr != nil { if executeErr != nil {
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
queueErr := storageWatcher.Queue.Add(diff) queueErr := storageWatcher.Queue.Add(persistedDiff)
if queueErr != nil { if queueErr != nil {
logrus.Warn(fmt.Sprintf("error queueing storage diff: %s", queueErr)) logrus.Warn(fmt.Sprintf("error queueing storage diff: %s", queueErr))
} }
return return
} }
logrus.Debugf("Storage diff persisted at block height: %d", diff.BlockHeight) logrus.Debugf("Storage diff persisted at block height: %d", diffInput.BlockHeight)
} }
func (storageWatcher StorageWatcher) processQueue() { func (storageWatcher StorageWatcher) processQueue() {
@ -145,8 +160,8 @@ func (storageWatcher StorageWatcher) processQueue() {
} }
} }
func (storageWatcher StorageWatcher) deleteRow(id int) { func (storageWatcher StorageWatcher) deleteRow(diffID int64) {
deleteErr := storageWatcher.Queue.Delete(id) deleteErr := storageWatcher.Queue.Delete(diffID)
if deleteErr != nil { if deleteErr != nil {
logrus.Warn(fmt.Sprintf("error deleting persisted diff from queue: %s", deleteErr)) logrus.Warn(fmt.Sprintf("error deleting persisted diff from queue: %s", deleteErr))
} }

View File

@ -19,8 +19,8 @@ package watcher_test
import ( import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"math/rand"
"os" "os"
"sort"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -33,11 +33,22 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
) )
var _ = Describe("Storage Watcher", func() { var _ = Describe("Storage Watcher", func() {
var (
mockFetcher *mocks.StorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
storageWatcher *watcher.StorageWatcher
mockStorageDiffRepository *fakes.MockStorageDiffRepository
fakeDiffId = rand.Int63()
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
csvDiff utils.StorageDiffInput
)
Describe("AddTransformer", func() { Describe("AddTransformer", func() {
It("adds transformers", func() { It("adds transformers", func() {
fakeHashedAddress := utils.HexToKeccak256Hash("0x12345") fakeHashedAddress := utils.HexToKeccak256Hash("0x12345")
@ -50,22 +61,12 @@ var _ = Describe("Storage Watcher", func() {
}) })
}) })
Describe("Execute", func() { Describe("Execute", func() {
var (
mockFetcher *mocks.StorageFetcher
mockQueue *mocks.MockStorageQueue
mockTransformer *mocks.MockStorageTransformer
csvDiff utils.StorageDiff
storageWatcher *watcher.StorageWatcher
hashedAddress common.Hash
)
BeforeEach(func() { BeforeEach(func() {
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewStorageFetcher() mockFetcher = mocks.NewStorageFetcher()
mockQueue = &mocks.MockStorageQueue{} mockQueue = &mocks.MockStorageQueue{}
mockStorageDiffRepository = &fakes.MockStorageDiffRepository{}
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
csvDiff = utils.StorageDiff{ csvDiff = utils.StorageDiffInput{
ID: 1337,
HashedAddress: hashedAddress, HashedAddress: hashedAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"), BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: 0, BlockHeight: 0,
@ -79,6 +80,7 @@ var _ = Describe("Storage Watcher", func() {
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
storageWatcher.StorageDiffRepository = mockStorageDiffRepository
tempFile, fileErr := ioutil.TempFile("", "log") tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred()) Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
@ -94,20 +96,69 @@ var _ = Describe("Storage Watcher", func() {
}) })
Describe("transforming new storage diffs from csv", func() { Describe("transforming new storage diffs from csv", func() {
var fakePersistedDiff utils.PersistedStorageDiff
BeforeEach(func() { BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} mockFetcher.DiffsToReturn = []utils.StorageDiffInput{csvDiff}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
fakePersistedDiff = utils.PersistedStorageDiff{
ID: fakeDiffId,
StorageDiffInput: utils.StorageDiffInput{
HashedAddress: csvDiff.HashedAddress,
BlockHash: csvDiff.BlockHash,
BlockHeight: csvDiff.BlockHeight,
StorageValue: csvDiff.StorageValue,
StorageKey: csvDiff.StorageKey,
},
}
mockStorageDiffRepository.CreateReturnID = fakeDiffId
storageWatcher.StorageDiffRepository = mockStorageDiffRepository
})
It("writes raw diff before processing", func(done Done) {
go storageWatcher.Execute(time.Hour, false)
Eventually(func() []utils.StorageDiffInput {
return mockStorageDiffRepository.CreatePassedInputs
}).Should(ContainElement(csvDiff))
close(done)
})
It("discards raw diff if it's already been persisted", func(done Done) {
mockStorageDiffRepository.CreateReturnError = repositories.ErrDuplicateDiff
go storageWatcher.Execute(time.Hour, false)
Consistently(func() []utils.PersistedStorageDiff {
return mockTransformer.PassedDiffs
}).Should(BeZero())
close(done)
})
It("logs error if persisting raw diff fails", func(done Done) {
mockStorageDiffRepository.CreateReturnError = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred())
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)
go storageWatcher.Execute(time.Hour, false)
Eventually(func() (string, error) {
logContent, err := ioutil.ReadFile(tempFile.Name())
return string(logContent), err
}).Should(ContainSubstring(fakes.FakeError.Error()))
close(done)
}) })
It("executes transformer for recognized storage diff", func(done Done) { It("executes transformer for recognized storage diff", func(done Done) {
go storageWatcher.Execute(time.Hour, false) go storageWatcher.Execute(time.Hour, false)
Eventually(func() map[int]utils.StorageDiff { Eventually(func() []utils.PersistedStorageDiff {
return mockTransformer.PassedDiffs return mockTransformer.PassedDiffs
}).Should(Equal(map[int]utils.StorageDiff{ }).Should(Equal([]utils.PersistedStorageDiff{
csvDiff.ID: csvDiff, fakePersistedDiff,
})) }))
close(done) close(done)
}) })
@ -120,12 +171,12 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() bool { Eventually(func() bool {
return mockQueue.AddCalled return mockQueue.AddCalled
}).Should(BeTrue()) }).Should(BeTrue())
Eventually(func() utils.StorageDiff { Eventually(func() utils.PersistedStorageDiff {
if len(mockQueue.AddPassedDiffs) > 0 { if len(mockQueue.AddPassedDiffs) > 0 {
return mockQueue.AddPassedDiffs[csvDiff.ID] return mockQueue.AddPassedDiffs[0]
} }
return utils.StorageDiff{} return utils.PersistedStorageDiff{}
}).Should(Equal(csvDiff)) }).Should(Equal(fakePersistedDiff))
close(done) close(done)
}) })
@ -151,10 +202,19 @@ var _ = Describe("Storage Watcher", func() {
}) })
Describe("transforming queued storage diffs", func() { Describe("transforming queued storage diffs", func() {
var queuedDiff utils.PersistedStorageDiff
BeforeEach(func() { BeforeEach(func() {
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ queuedDiff = utils.PersistedStorageDiff{
csvDiff.ID: csvDiff, ID: 1337,
StorageDiffInput: utils.StorageDiffInput{
HashedAddress: hashedAddress,
BlockHash: test_data.FakeHash(),
BlockHeight: rand.Int(),
StorageKey: test_data.FakeHash(),
StorageValue: test_data.FakeHash(),
},
} }
mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{queuedDiff}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
@ -163,24 +223,24 @@ var _ = Describe("Storage Watcher", func() {
It("executes transformer for storage diff", func(done Done) { It("executes transformer for storage diff", func(done Done) {
go storageWatcher.Execute(time.Nanosecond, false) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() utils.StorageDiff { Eventually(func() utils.PersistedStorageDiff {
if len(mockTransformer.PassedDiffs) > 0 { if len(mockTransformer.PassedDiffs) > 0 {
return mockTransformer.PassedDiffs[csvDiff.ID] return mockTransformer.PassedDiffs[0]
} }
return utils.StorageDiff{} return utils.PersistedStorageDiff{}
}).Should(Equal(csvDiff)) }).Should(Equal(queuedDiff))
close(done) close(done)
}) })
It("deletes diff from queue if transformer execution successful", func(done Done) { It("deletes diff from queue if transformer execution successful", func(done Done) {
go storageWatcher.Execute(time.Nanosecond, false) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int { Eventually(func() int64 {
if len(mockQueue.DeletePassedIds) > 0 { if len(mockQueue.DeletePassedIds) > 0 {
return mockQueue.DeletePassedIds[0] return mockQueue.DeletePassedIds[0]
} }
return 0 return 0
}).Should(Equal(csvDiff.ID)) }).Should(Equal(queuedDiff.ID))
close(done) close(done)
}) })
@ -201,17 +261,15 @@ var _ = Describe("Storage Watcher", func() {
}) })
It("deletes obsolete diff from queue if contract not recognized", func(done Done) { It("deletes obsolete diff from queue if contract not recognized", func(done Done) {
obsoleteDiff := utils.StorageDiff{ obsoleteDiff := utils.PersistedStorageDiff{
ID: csvDiff.ID + 1, ID: queuedDiff.ID + 1,
HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), StorageDiffInput: utils.StorageDiffInput{HashedAddress: test_data.FakeHash()},
}
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
obsoleteDiff.ID: obsoleteDiff,
} }
mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{obsoleteDiff}
go storageWatcher.Execute(time.Nanosecond, false) go storageWatcher.Execute(time.Nanosecond, false)
Eventually(func() int { Eventually(func() int64 {
if len(mockQueue.DeletePassedIds) > 0 { if len(mockQueue.DeletePassedIds) > 0 {
return mockQueue.DeletePassedIds[0] return mockQueue.DeletePassedIds[0]
} }
@ -221,13 +279,11 @@ var _ = Describe("Storage Watcher", func() {
}) })
It("logs error if deleting obsolete diff fails", func(done Done) { It("logs error if deleting obsolete diff fails", func(done Done) {
obsoleteDiff := utils.StorageDiff{ obsoleteDiff := utils.PersistedStorageDiff{
ID: csvDiff.ID + 1, ID: queuedDiff.ID + 1,
HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), StorageDiffInput: utils.StorageDiffInput{HashedAddress: test_data.FakeHash()},
}
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{
obsoleteDiff.ID: obsoleteDiff,
} }
mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{obsoleteDiff}
mockQueue.DeleteErr = fakes.FakeError mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log") tempFile, fileErr := ioutil.TempFile("", "log")
Expect(fileErr).NotTo(HaveOccurred()) Expect(fileErr).NotTo(HaveOccurred())
@ -247,27 +303,39 @@ var _ = Describe("Storage Watcher", func() {
Describe("BackFill", func() { Describe("BackFill", func() {
var ( var (
mockFetcher *mocks.StorageFetcher mockBackFiller *mocks.BackFiller
mockBackFiller *mocks.BackFiller mockTransformer2 *mocks.MockStorageTransformer
mockQueue *mocks.MockStorageQueue mockTransformer3 *mocks.MockStorageTransformer
mockTransformer *mocks.MockStorageTransformer createdPersistedDiff = utils.PersistedStorageDiff{
mockTransformer2 *mocks.MockStorageTransformer ID: fakeDiffId,
mockTransformer3 *mocks.MockStorageTransformer StorageDiffInput: test_data.CreatedExpectedStorageDiff,
csvDiff utils.StorageDiff }
storageWatcher *watcher.StorageWatcher updatedPersistedDiff1 = utils.PersistedStorageDiff{
hashedAddress common.Hash ID: fakeDiffId,
createdDiff, updatedDiff1, deletedDiff, updatedDiff2 utils.StorageDiff StorageDiffInput: test_data.UpdatedExpectedStorageDiff,
}
deletedPersistedDiff = utils.PersistedStorageDiff{
ID: fakeDiffId,
StorageDiffInput: test_data.DeletedExpectedStorageDiff,
}
updatedPersistedDiff2 = utils.PersistedStorageDiff{
ID: fakeDiffId,
StorageDiffInput: test_data.UpdatedExpectedStorageDiff2,
}
csvDiff = utils.StorageDiffInput{
HashedAddress: hashedAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: int(test_data.BlockNumber2.Int64()) + 1,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
csvPersistedDiff = utils.PersistedStorageDiff{
ID: fakeDiffId,
StorageDiffInput: csvDiff,
}
) )
BeforeEach(func() { BeforeEach(func() {
createdDiff = test_data.CreatedExpectedStorageDiff
createdDiff.ID = 1333
updatedDiff1 = test_data.UpdatedExpectedStorageDiff
updatedDiff1.ID = 1334
deletedDiff = test_data.DeletedExpectedStorageDiff
deletedDiff.ID = 1335
updatedDiff2 = test_data.UpdatedExpectedStorageDiff2
updatedDiff2.ID = 1336
mockBackFiller = new(mocks.BackFiller) mockBackFiller = new(mocks.BackFiller)
hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef")
mockFetcher = mocks.NewStorageFetcher() mockFetcher = mocks.NewStorageFetcher()
@ -275,26 +343,19 @@ var _ = Describe("Storage Watcher", func() {
mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress}
mockTransformer2 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.ContractLeafKey[:])} mockTransformer2 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.ContractLeafKey[:])}
mockTransformer3 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:])} mockTransformer3 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:])}
csvDiff = utils.StorageDiff{ mockStorageDiffRepository = &fakes.MockStorageDiffRepository{}
ID: 1337,
HashedAddress: hashedAddress,
BlockHash: common.HexToHash("0xfedcba9876543210"),
BlockHeight: int(test_data.BlockNumber2.Int64()) + 1,
StorageKey: common.HexToHash("0xabcdef1234567890"),
StorageValue: common.HexToHash("0x9876543210abcdef"),
}
}) })
Describe("transforming streamed and backfilled storage diffs", func() { Describe("transforming streamed and backfilled storage diffs", func() {
BeforeEach(func() { BeforeEach(func() {
mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} mockFetcher.DiffsToReturn = []utils.StorageDiffInput{csvDiff}
mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{ mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiffInput{
createdDiff, test_data.CreatedExpectedStorageDiff,
updatedDiff1, test_data.UpdatedExpectedStorageDiff,
deletedDiff, test_data.DeletedExpectedStorageDiff,
updatedDiff2, test_data.UpdatedExpectedStorageDiff2,
}) })
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{} mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{}
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{ storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{
@ -302,6 +363,9 @@ var _ = Describe("Storage Watcher", func() {
mockTransformer2.FakeTransformerInitializer, mockTransformer2.FakeTransformerInitializer,
mockTransformer3.FakeTransformerInitializer, mockTransformer3.FakeTransformerInitializer,
}) })
mockStorageDiffRepository.CreateReturnID = fakeDiffId
storageWatcher.StorageDiffRepository = mockStorageDiffRepository
}) })
It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) { It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) {
@ -311,20 +375,17 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() int { Eventually(func() int {
return len(mockTransformer.PassedDiffs) return len(mockTransformer.PassedDiffs)
}).Should(Equal(1)) }).Should(Equal(1))
Eventually(func() int { Eventually(func() int {
return len(mockTransformer2.PassedDiffs) return len(mockTransformer2.PassedDiffs)
}).Should(Equal(1)) }).Should(Equal(1))
Eventually(func() int { Eventually(func() int {
return len(mockTransformer3.PassedDiffs) return len(mockTransformer3.PassedDiffs)
}).Should(Equal(3)) }).Should(Equal(3))
Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64()))) Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvPersistedDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) Expect(mockTransformer2.PassedDiffs[0]).To(Equal(createdPersistedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) Expect(mockTransformer3.PassedDiffs).To(ConsistOf(updatedPersistedDiff1, deletedPersistedDiff, updatedPersistedDiff2))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done) close(done)
}) })
@ -346,23 +407,17 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() bool { Eventually(func() bool {
return mockQueue.AddCalled return mockQueue.AddCalled
}).Should(BeTrue()) }).Should(BeTrue())
Eventually(func() map[int]utils.StorageDiff { Eventually(func() []utils.PersistedStorageDiff {
if len(mockQueue.AddPassedDiffs) > 2 { if len(mockQueue.AddPassedDiffs) > 2 {
return mockQueue.AddPassedDiffs return mockQueue.AddPassedDiffs
} }
return map[int]utils.StorageDiff{} return []utils.PersistedStorageDiff{}
}).Should(Equal(map[int]utils.StorageDiff{ }).Should(ConsistOf(updatedPersistedDiff1, deletedPersistedDiff, updatedPersistedDiff2))
updatedDiff1.ID: updatedDiff1,
deletedDiff.ID: deletedDiff,
updatedDiff2.ID: updatedDiff2,
}))
Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64()))) Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64())))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvPersistedDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) Expect(mockTransformer2.PassedDiffs[0]).To(Equal(createdPersistedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) Expect(mockTransformer3.PassedDiffs).To(ConsistOf(updatedPersistedDiff1, deletedPersistedDiff, updatedPersistedDiff2))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done) close(done)
}) })
@ -417,12 +472,12 @@ var _ = Describe("Storage Watcher", func() {
Describe("transforms queued storage diffs", func() { Describe("transforms queued storage diffs", func() {
BeforeEach(func() { BeforeEach(func() {
mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{
csvDiff.ID: csvDiff, csvPersistedDiff,
createdDiff.ID: createdDiff, createdPersistedDiff,
updatedDiff1.ID: updatedDiff1, updatedPersistedDiff1,
deletedDiff.ID: deletedDiff, deletedPersistedDiff,
updatedDiff2.ID: updatedDiff2, updatedPersistedDiff2,
} }
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
storageWatcher.Queue = mockQueue storageWatcher.Queue = mockQueue
@ -449,29 +504,25 @@ var _ = Describe("Storage Watcher", func() {
Eventually(func() bool { Eventually(func() bool {
return mockQueue.GetAllCalled return mockQueue.GetAllCalled
}).Should(BeTrue()) }).Should(BeTrue())
sortedExpectedIDs := []int{ expectedIDs := []int64{
csvDiff.ID, fakeDiffId,
createdDiff.ID, fakeDiffId,
updatedDiff1.ID, fakeDiffId,
deletedDiff.ID, fakeDiffId,
updatedDiff2.ID, fakeDiffId,
} }
sort.Ints(sortedExpectedIDs) Eventually(func() []int64 {
Eventually(func() []int {
if len(mockQueue.DeletePassedIds) > 4 { if len(mockQueue.DeletePassedIds) > 4 {
sort.Ints(mockQueue.DeletePassedIds)
return mockQueue.DeletePassedIds return mockQueue.DeletePassedIds
} }
return []int{} return []int64{}
}).Should(Equal(sortedExpectedIDs)) }).Should(Equal(expectedIDs))
Expect(mockQueue.AddCalled).To(Not(BeTrue())) Expect(mockQueue.AddCalled).To(Not(BeTrue()))
Expect(len(mockQueue.DiffsToReturn)).To(Equal(0)) Expect(len(mockQueue.DiffsToReturn)).To(Equal(0))
Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvPersistedDiff))
Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) Expect(mockTransformer2.PassedDiffs[0]).To(Equal(createdPersistedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) Expect(mockTransformer3.PassedDiffs).To(ConsistOf(updatedPersistedDiff1, deletedPersistedDiff, updatedPersistedDiff2))
Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff))
Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2))
close(done) close(done)
}) })
}) })

View File

@ -0,0 +1,47 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repositories
import (
"database/sql"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
var ErrDuplicateDiff = sql.ErrNoRows
type StorageDiffRepository struct {
db *postgres.DB
}
func NewStorageDiffRepository(db *postgres.DB) StorageDiffRepository {
return StorageDiffRepository{db: db}
}
func (repository StorageDiffRepository) CreateStorageDiff(input utils.StorageDiffInput) (int64, error) {
var storageDiffID int64
row := repository.db.QueryRowx(`INSERT INTO public.storage_diff
(hashed_address, block_height, block_hash, storage_key, storage_value) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING RETURNING id`, input.HashedAddress.Bytes(), input.BlockHeight, input.BlockHash.Bytes(),
input.StorageKey.Bytes(), input.StorageValue.Bytes())
err := row.Scan(&storageDiffID)
if err != nil && err == sql.ErrNoRows {
return 0, ErrDuplicateDiff
}
return storageDiffID, err
}

View File

@ -0,0 +1,83 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repositories_test
import (
"database/sql"
"math/rand"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("Storage diffs repository", func() {
var (
db *postgres.DB
repo repositories.StorageDiffRepository
fakeStorageDiff utils.StorageDiffInput
)
BeforeEach(func() {
db = test_config.NewTestDB(test_config.NewTestNode())
test_config.CleanTestDB(db)
repo = repositories.NewStorageDiffRepository(db)
fakeStorageDiff = utils.StorageDiffInput{
HashedAddress: test_data.FakeHash(),
BlockHash: test_data.FakeHash(),
BlockHeight: rand.Int(),
StorageKey: test_data.FakeHash(),
StorageValue: test_data.FakeHash(),
}
})
Describe("CreateStorageDiff", func() {
It("adds a storage diff to the db, returning id", func() {
id, createErr := repo.CreateStorageDiff(fakeStorageDiff)
Expect(createErr).NotTo(HaveOccurred())
Expect(id).NotTo(BeZero())
var persisted utils.PersistedStorageDiff
getErr := db.Get(&persisted, `SELECT * FROM public.storage_diff`)
Expect(getErr).NotTo(HaveOccurred())
Expect(persisted.ID).To(Equal(id))
Expect(persisted.HashedAddress).To(Equal(fakeStorageDiff.HashedAddress))
Expect(persisted.BlockHash).To(Equal(fakeStorageDiff.BlockHash))
Expect(persisted.BlockHeight).To(Equal(fakeStorageDiff.BlockHeight))
Expect(persisted.StorageKey).To(Equal(fakeStorageDiff.StorageKey))
Expect(persisted.StorageValue).To(Equal(fakeStorageDiff.StorageValue))
})
It("does not duplicate storage diffs", func() {
_, createErr := repo.CreateStorageDiff(fakeStorageDiff)
Expect(createErr).NotTo(HaveOccurred())
_, createTwoErr := repo.CreateStorageDiff(fakeStorageDiff)
Expect(createTwoErr).To(HaveOccurred())
Expect(createTwoErr).To(MatchError(sql.ErrNoRows))
var count int
getErr := db.Get(&count, `SELECT count(*) FROM public.storage_diff`)
Expect(getErr).NotTo(HaveOccurred())
Expect(count).To(Equal(1))
})
})
})

View File

@ -19,6 +19,7 @@ package datastore
import ( import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/filters"
) )
@ -83,6 +84,10 @@ type HeaderSyncReceiptRepository interface {
CreateFullSyncReceiptInTx(blockID int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error) CreateFullSyncReceiptInTx(blockID int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error)
} }
type StorageDiffRepository interface {
CreateStorageDiff(input utils.StorageDiffInput) (int64, error)
}
type WatchedEventRepository interface { type WatchedEventRepository interface {
GetWatchedEvents(name string) ([]*core.WatchedEvent, error) GetWatchedEvents(name string) ([]*core.WatchedEvent, error)
} }

View File

@ -0,0 +1,32 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fakes
import (
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
type MockStorageDiffRepository struct {
CreatePassedInputs []utils.StorageDiffInput
CreateReturnID int64
CreateReturnError error
}
func (repository *MockStorageDiffRepository) CreateStorageDiff(input utils.StorageDiffInput) (int64, error) {
repository.CreatePassedInputs = append(repository.CreatePassedInputs, input)
return repository.CreateReturnID, repository.CreateReturnError
}

View File

@ -101,6 +101,7 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM headers") db.MustExec("DELETE FROM headers")
db.MustExec("DELETE FROM log_filters") db.MustExec("DELETE FROM log_filters")
db.MustExec("DELETE FROM queued_storage") db.MustExec("DELETE FROM queued_storage")
db.MustExec("DELETE FROM storage_diff")
db.MustExec("DELETE FROM watched_contracts") db.MustExec("DELETE FROM watched_contracts")
db.MustExec("DELETE FROM watched_logs") db.MustExec("DELETE FROM watched_logs")
} }