diff --git a/db/migrations/00025_create_queued_storage.sql b/db/migrations/00025_create_queued_storage.sql deleted file mode 100644 index 79a3a548..00000000 --- a/db/migrations/00025_create_queued_storage.sql +++ /dev/null @@ -1,13 +0,0 @@ --- +goose Up -CREATE TABLE public.queued_storage ( - id SERIAL PRIMARY KEY, - block_height BIGINT, - block_hash BYTEA, - contract BYTEA, - storage_key BYTEA, - storage_value BYTEA, - UNIQUE (block_height, block_hash, contract, storage_key, storage_value) -); - --- +goose Down -DROP TABLE public.queued_storage; diff --git a/db/migrations/00025_create_storage_diffs_table.sql b/db/migrations/00025_create_storage_diffs_table.sql new file mode 100644 index 00000000..18deef11 --- /dev/null +++ b/db/migrations/00025_create_storage_diffs_table.sql @@ -0,0 +1,14 @@ +-- +goose Up +CREATE TABLE public.storage_diff +( + id SERIAL PRIMARY KEY, + block_height BIGINT, + block_hash BYTEA, + hashed_address BYTEA, + storage_key BYTEA, + storage_value BYTEA, + UNIQUE (block_height, block_hash, hashed_address, storage_key, storage_value) +); + +-- +goose Down +DROP TABLE public.storage_diff; \ No newline at end of file diff --git a/db/migrations/00026_create_queued_storage_diffs_table.sql b/db/migrations/00026_create_queued_storage_diffs_table.sql new file mode 100644 index 00000000..b184d641 --- /dev/null +++ b/db/migrations/00026_create_queued_storage_diffs_table.sql @@ -0,0 +1,9 @@ +-- +goose Up +CREATE TABLE public.queued_storage +( + id SERIAL PRIMARY KEY, + diff_id BIGINT UNIQUE NOT NULL REFERENCES public.storage_diff (id) +); + +-- +goose Down +DROP TABLE public.queued_storage; diff --git a/db/migrations/00026_create_header_sync_transactions_table.sql b/db/migrations/00027_create_header_sync_transactions_table.sql similarity index 100% rename from db/migrations/00026_create_header_sync_transactions_table.sql rename to db/migrations/00027_create_header_sync_transactions_table.sql diff --git a/db/migrations/00027_create_header_sync_receipts_table.sql b/db/migrations/00028_create_header_sync_receipts_table.sql similarity index 100% rename from db/migrations/00027_create_header_sync_receipts_table.sql rename to db/migrations/00028_create_header_sync_receipts_table.sql diff --git a/db/migrations/00028_create_uncles_table.sql b/db/migrations/00029_create_uncles_table.sql similarity index 100% rename from db/migrations/00028_create_uncles_table.sql rename to db/migrations/00029_create_uncles_table.sql diff --git a/db/migrations/00029_create_header_sync_logs_table.sql b/db/migrations/00030_create_header_sync_logs_table.sql similarity index 100% rename from db/migrations/00029_create_header_sync_logs_table.sql rename to db/migrations/00030_create_header_sync_logs_table.sql diff --git a/db/migrations/00030_create_watched_logs_table.sql b/db/migrations/00031_create_watched_logs_table.sql similarity index 100% rename from db/migrations/00030_create_watched_logs_table.sql rename to db/migrations/00031_create_watched_logs_table.sql diff --git a/db/migrations/00031_create_header_cids_table.sql b/db/migrations/00032_create_header_cids_table.sql similarity index 100% rename from db/migrations/00031_create_header_cids_table.sql rename to db/migrations/00032_create_header_cids_table.sql diff --git a/db/migrations/00032_create_transaction_cids_table.sql b/db/migrations/00033_create_transaction_cids_table.sql similarity index 100% rename from db/migrations/00032_create_transaction_cids_table.sql rename to db/migrations/00033_create_transaction_cids_table.sql diff --git a/db/migrations/00033_create_receipt_cids_table.sql b/db/migrations/00034_create_receipt_cids_table.sql similarity index 100% rename from db/migrations/00033_create_receipt_cids_table.sql rename to db/migrations/00034_create_receipt_cids_table.sql diff --git a/db/migrations/00034_create_state_cids_table.sql b/db/migrations/00035_create_state_cids_table.sql similarity index 100% rename from db/migrations/00034_create_state_cids_table.sql rename to db/migrations/00035_create_state_cids_table.sql diff --git a/db/migrations/00035_create_storage_cids_table.sql b/db/migrations/00036_create_storage_cids_table.sql similarity index 100% rename from db/migrations/00035_create_storage_cids_table.sql rename to db/migrations/00036_create_storage_cids_table.sql diff --git a/db/migrations/00036_create_ipfs_blocks_table.sql b/db/migrations/00037_create_ipfs_blocks_table.sql similarity index 100% rename from db/migrations/00036_create_ipfs_blocks_table.sql rename to db/migrations/00037_create_ipfs_blocks_table.sql diff --git a/db/schema.sql b/db/schema.sql index fdd00130..8db21b45 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -2,7 +2,7 @@ -- PostgreSQL database dump -- --- Dumped from database version 11.5 +-- Dumped from database version 10.10 -- Dumped by pg_dump version 11.5 SET statement_timeout = 0; @@ -560,11 +560,7 @@ ALTER SEQUENCE public.nodes_id_seq OWNED BY public.eth_nodes.id; CREATE TABLE public.queued_storage ( id integer NOT NULL, - block_height bigint, - block_hash bytea, - contract bytea, - storage_key bytea, - storage_value bytea + diff_id bigint NOT NULL ); @@ -687,6 +683,40 @@ CREATE SEQUENCE public.storage_cids_id_seq ALTER SEQUENCE public.storage_cids_id_seq OWNED BY public.storage_cids.id; +-- +-- Name: storage_diff; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.storage_diff ( + id integer NOT NULL, + block_height bigint, + block_hash bytea, + hashed_address bytea, + storage_key bytea, + storage_value bytea +); + + +-- +-- Name: storage_diff_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.storage_diff_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: storage_diff_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.storage_diff_id_seq OWNED BY public.storage_diff.id; + + -- -- Name: transaction_cids; Type: TABLE; Schema: public; Owner: - -- @@ -969,6 +999,13 @@ ALTER TABLE ONLY public.state_cids ALTER COLUMN id SET DEFAULT nextval('public.s ALTER TABLE ONLY public.storage_cids ALTER COLUMN id SET DEFAULT nextval('public.storage_cids_id_seq'::regclass); +-- +-- Name: storage_diff id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.storage_diff ALTER COLUMN id SET DEFAULT nextval('public.storage_diff_id_seq'::regclass); + + -- -- Name: transaction_cids id; Type: DEFAULT; Schema: public; Owner: - -- @@ -1190,11 +1227,11 @@ ALTER TABLE ONLY public.eth_nodes -- --- Name: queued_storage queued_storage_block_height_block_hash_contract_storage_key_key; Type: CONSTRAINT; Schema: public; Owner: - +-- Name: queued_storage queued_storage_diff_id_key; Type: CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.queued_storage - ADD CONSTRAINT queued_storage_block_height_block_hash_contract_storage_key_key UNIQUE (block_height, block_hash, contract, storage_key, storage_value); + ADD CONSTRAINT queued_storage_diff_id_key UNIQUE (diff_id); -- @@ -1245,6 +1282,22 @@ ALTER TABLE ONLY public.storage_cids ADD CONSTRAINT storage_cids_state_id_storage_key_key UNIQUE (state_id, storage_key); +-- +-- Name: storage_diff storage_diff_block_height_block_hash_hashed_address_storage_key; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.storage_diff + ADD CONSTRAINT storage_diff_block_height_block_hash_hashed_address_storage_key UNIQUE (block_height, block_hash, hashed_address, storage_key, storage_value); + + +-- +-- Name: storage_diff storage_diff_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.storage_diff + ADD CONSTRAINT storage_diff_pkey PRIMARY KEY (id); + + -- -- Name: transaction_cids transaction_cids_header_id_tx_hash_key; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -1474,6 +1527,14 @@ ALTER TABLE ONLY public.eth_blocks ADD CONSTRAINT node_fk FOREIGN KEY (eth_node_id) REFERENCES public.eth_nodes(id) ON DELETE CASCADE; +-- +-- Name: queued_storage queued_storage_diff_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.queued_storage + ADD CONSTRAINT queued_storage_diff_id_fkey FOREIGN KEY (diff_id) REFERENCES public.storage_diff(id); + + -- -- Name: receipt_cids receipt_cids_tx_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/libraries/shared/factories/storage/repository.go b/libraries/shared/factories/storage/repository.go index 3146f3ba..be8f7b12 100644 --- a/libraries/shared/factories/storage/repository.go +++ b/libraries/shared/factories/storage/repository.go @@ -22,6 +22,6 @@ import ( ) type Repository interface { - Create(blockNumber int, blockHash string, metadata utils.StorageValueMetadata, value interface{}) error + Create(diffID int64, metadata utils.StorageValueMetadata, value interface{}) error SetDB(db *postgres.DB) } diff --git a/libraries/shared/factories/storage/transformer.go b/libraries/shared/factories/storage/transformer.go index 25e39811..006c0e68 100644 --- a/libraries/shared/factories/storage/transformer.go +++ b/libraries/shared/factories/storage/transformer.go @@ -39,7 +39,7 @@ func (transformer Transformer) KeccakContractAddress() common.Hash { return transformer.HashedAddress } -func (transformer Transformer) Execute(diff utils.StorageDiff) error { +func (transformer Transformer) Execute(diff utils.PersistedStorageDiff) error { metadata, lookupErr := transformer.StorageKeysLookup.Lookup(diff.StorageKey) if lookupErr != nil { return lookupErr @@ -48,5 +48,5 @@ func (transformer Transformer) Execute(diff utils.StorageDiff) error { if decodeErr != nil { return decodeErr } - return transformer.Repository.Create(diff.BlockHeight, diff.BlockHash.Hex(), metadata, value) + return transformer.Repository.Create(diff.ID, metadata, value) } diff --git a/libraries/shared/factories/storage/transformer_test.go b/libraries/shared/factories/storage/transformer_test.go index fddb082f..644d15d8 100644 --- a/libraries/shared/factories/storage/transformer_test.go +++ b/libraries/shared/factories/storage/transformer_test.go @@ -17,6 +17,8 @@ package storage_test import ( + "math/rand" + "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -51,7 +53,7 @@ var _ = Describe("Storage transformer", func() { }) It("looks up metadata for storage key", func() { - t.Execute(utils.StorageDiff{}) + t.Execute(utils.PersistedStorageDiff{}) Expect(storageKeysLookup.LookupCalled).To(BeTrue()) }) @@ -59,7 +61,7 @@ var _ = Describe("Storage transformer", func() { It("returns error if lookup fails", func() { storageKeysLookup.LookupErr = fakes.FakeError - err := t.Execute(utils.StorageDiff{}) + err := t.Execute(utils.PersistedStorageDiff{}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -71,19 +73,21 @@ var _ = Describe("Storage transformer", func() { rawValue := common.HexToAddress("0x12345") fakeBlockNumber := 123 fakeBlockHash := "0x67890" - fakeRow := utils.StorageDiff{ - HashedAddress: common.Hash{}, - BlockHash: common.HexToHash(fakeBlockHash), - BlockHeight: fakeBlockNumber, - StorageKey: common.Hash{}, - StorageValue: rawValue.Hash(), + fakeRow := utils.PersistedStorageDiff{ + ID: rand.Int63(), + StorageDiffInput: utils.StorageDiffInput{ + HashedAddress: common.Hash{}, + BlockHash: common.HexToHash(fakeBlockHash), + BlockHeight: fakeBlockNumber, + StorageKey: common.Hash{}, + StorageValue: rawValue.Hash(), + }, } err := t.Execute(fakeRow) Expect(err).NotTo(HaveOccurred()) - Expect(repository.PassedBlockNumber).To(Equal(fakeBlockNumber)) - Expect(repository.PassedBlockHash).To(Equal(common.HexToHash(fakeBlockHash).Hex())) + Expect(repository.PassedDiffID).To(Equal(fakeRow.ID)) Expect(repository.PassedMetadata).To(Equal(fakeMetadata)) Expect(repository.PassedValue.(string)).To(Equal(rawValue.Hex())) }) @@ -93,8 +97,9 @@ var _ = Describe("Storage transformer", func() { fakeMetadata := utils.StorageValueMetadata{Type: utils.Address} storageKeysLookup.Metadata = fakeMetadata repository.CreateErr = fakes.FakeError + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: rawValue.Hash()}} - err := t.Execute(utils.StorageDiff{StorageValue: rawValue.Hash()}) + err := t.Execute(diff) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -119,19 +124,21 @@ var _ = Describe("Storage transformer", func() { It("passes the decoded data items to the repository", func() { storageKeysLookup.Metadata = fakeMetadata - fakeRow := utils.StorageDiff{ - HashedAddress: common.Hash{}, - BlockHash: common.HexToHash(fakeBlockHash), - BlockHeight: fakeBlockNumber, - StorageKey: common.Hash{}, - StorageValue: rawValue.Hash(), + fakeRow := utils.PersistedStorageDiff{ + ID: rand.Int63(), + StorageDiffInput: utils.StorageDiffInput{ + HashedAddress: common.Hash{}, + BlockHash: common.HexToHash(fakeBlockHash), + BlockHeight: fakeBlockNumber, + StorageKey: common.Hash{}, + StorageValue: rawValue.Hash(), + }, } err := t.Execute(fakeRow) Expect(err).NotTo(HaveOccurred()) - Expect(repository.PassedBlockNumber).To(Equal(fakeBlockNumber)) - Expect(repository.PassedBlockHash).To(Equal(common.HexToHash(fakeBlockHash).Hex())) + Expect(repository.PassedDiffID).To(Equal(fakeRow.ID)) Expect(repository.PassedMetadata).To(Equal(fakeMetadata)) expectedPassedValue := make(map[int]string) expectedPassedValue[0] = "10800" @@ -142,8 +149,9 @@ var _ = Describe("Storage transformer", func() { It("returns error if creating a row fails", func() { storageKeysLookup.Metadata = fakeMetadata repository.CreateErr = fakes.FakeError + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: rawValue.Hash()}} - err := t.Execute(utils.StorageDiff{StorageValue: rawValue.Hash()}) + err := t.Execute(diff) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) diff --git a/libraries/shared/fetcher/csv_tail_storage_fetcher.go b/libraries/shared/fetcher/csv_tail_storage_fetcher.go index 2205ef38..3d855c19 100644 --- a/libraries/shared/fetcher/csv_tail_storage_fetcher.go +++ b/libraries/shared/fetcher/csv_tail_storage_fetcher.go @@ -32,7 +32,7 @@ func NewCsvTailStorageFetcher(tailer fs.Tailer) CsvTailStorageFetcher { return CsvTailStorageFetcher{tailer: tailer} } -func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { +func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) { t, tailErr := storageFetcher.tailer.Tail() if tailErr != nil { errs <- tailErr diff --git a/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go b/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go index cf0f06f5..6a13d28b 100644 --- a/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go +++ b/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go @@ -34,13 +34,13 @@ var _ = Describe("Csv Tail Storage Fetcher", func() { var ( errorsChannel chan error mockTailer *fakes.MockTailer - diffsChannel chan utils.StorageDiff + diffsChannel chan utils.StorageDiffInput storageFetcher fetcher.CsvTailStorageFetcher ) BeforeEach(func() { errorsChannel = make(chan error) - diffsChannel = make(chan utils.StorageDiff) + diffsChannel = make(chan utils.StorageDiffInput) mockTailer = fakes.NewMockTailer() storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer) }) diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go index 570a1f46..b4809cf6 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go @@ -43,7 +43,7 @@ func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher } } -func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { +func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) { ethStatediffPayloadChan := fetcher.StatediffPayloadChan clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) if clientSubErr != nil { diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go index becfaeb2..9244f38b 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go @@ -60,13 +60,13 @@ func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payloa var _ = Describe("Geth RPC Storage Fetcher", func() { var streamer MockStoragediffStreamer var statediffFetcher fetcher.GethRPCStorageFetcher - var storagediffChan chan utils.StorageDiff + var storagediffChan chan utils.StorageDiffInput var errorChan chan error BeforeEach(func() { streamer = MockStoragediffStreamer{} statediffFetcher = fetcher.NewGethRPCStorageFetcher(&streamer) - storagediffChan = make(chan utils.StorageDiff) + storagediffChan = make(chan utils.StorageDiffInput) errorChan = make(chan error) }) @@ -113,21 +113,21 @@ var _ = Describe("Geth RPC Storage Fetcher", func() { height := test_data.BlockNumber intHeight := int(height.Int64()) - createdExpectedStorageDiff := utils.StorageDiff{ + createdExpectedStorageDiff := utils.StorageDiffInput{ HashedAddress: common.BytesToHash(test_data.ContractLeafKey[:]), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHeight: intHeight, StorageKey: common.BytesToHash(test_data.StorageKey), StorageValue: common.BytesToHash(test_data.SmallStorageValue), } - updatedExpectedStorageDiff := utils.StorageDiff{ + updatedExpectedStorageDiff := utils.StorageDiffInput{ HashedAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:]), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHeight: intHeight, StorageKey: common.BytesToHash(test_data.StorageKey), StorageValue: common.BytesToHash(test_data.LargeStorageValue), } - deletedExpectedStorageDiff := utils.StorageDiff{ + deletedExpectedStorageDiff := utils.StorageDiffInput{ HashedAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:]), BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), BlockHeight: intHeight, diff --git a/libraries/shared/fetcher/storage_fetcher_interface.go b/libraries/shared/fetcher/storage_fetcher_interface.go index 8999589c..34e0b5ae 100644 --- a/libraries/shared/fetcher/storage_fetcher_interface.go +++ b/libraries/shared/fetcher/storage_fetcher_interface.go @@ -17,5 +17,5 @@ package fetcher import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" type IStorageFetcher interface { - FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) + FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) } diff --git a/libraries/shared/mocks/backfiller.go b/libraries/shared/mocks/backfiller.go index 3a1a0254..81a09e06 100644 --- a/libraries/shared/mocks/backfiller.go +++ b/libraries/shared/mocks/backfiller.go @@ -24,23 +24,23 @@ import ( // BackFiller mock for tests type BackFiller struct { - StorageDiffsToReturn []utils.StorageDiff + StorageDiffsToReturn []utils.StorageDiffInput BackFillErrs []error PassedEndingBlock uint64 } // SetStorageDiffsToReturn for tests -func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiff) { +func (backFiller *BackFiller) SetStorageDiffsToReturn(diffs []utils.StorageDiffInput) { backFiller.StorageDiffsToReturn = diffs } // BackFill mock method -func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { +func (backFiller *BackFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiffInput, errChan chan error, done chan bool) error { if endingBlock < startingBlock { return errors.New("backfill: ending block number needs to be greater than starting block number") } backFiller.PassedEndingBlock = endingBlock - go func(backFill chan utils.StorageDiff, errChan chan error, done chan bool) { + go func(backFill chan utils.StorageDiffInput, errChan chan error, done chan bool) { errLen := len(backFiller.BackFillErrs) for i, diff := range backFiller.StorageDiffsToReturn { if i < errLen { diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go index d4928eeb..7eb62555 100644 --- a/libraries/shared/mocks/storage_fetcher.go +++ b/libraries/shared/mocks/storage_fetcher.go @@ -22,7 +22,7 @@ import ( // StorageFetcher is a mock fetcher for use in tests with backfilling type StorageFetcher struct { - DiffsToReturn []utils.StorageDiff + DiffsToReturn []utils.StorageDiffInput ErrsToReturn []error } @@ -32,7 +32,7 @@ func NewStorageFetcher() *StorageFetcher { } // FetchStorageDiffs mock method -func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { +func (fetcher *StorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) { for _, err := range fetcher.ErrsToReturn { errs <- err } diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index 889be21e..c13585dd 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -24,37 +24,36 @@ import ( type MockStorageQueue struct { AddCalled bool AddError error - AddPassedDiffs map[int]utils.StorageDiff + AddPassedDiffs []utils.PersistedStorageDiff DeleteErr error - DeletePassedIds []int + DeletePassedIds []int64 GetAllErr error - DiffsToReturn map[int]utils.StorageDiff + DiffsToReturn []utils.PersistedStorageDiff GetAllCalled bool } // Add mock method -func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error { +func (queue *MockStorageQueue) Add(diff utils.PersistedStorageDiff) error { queue.AddCalled = true - if queue.AddPassedDiffs == nil { - queue.AddPassedDiffs = make(map[int]utils.StorageDiff) - } - queue.AddPassedDiffs[diff.ID] = diff + queue.AddPassedDiffs = append(queue.AddPassedDiffs, diff) return queue.AddError } // Delete mock method -func (queue *MockStorageQueue) Delete(id int) error { +func (queue *MockStorageQueue) Delete(id int64) error { queue.DeletePassedIds = append(queue.DeletePassedIds, id) - delete(queue.DiffsToReturn, id) + var diffs []utils.PersistedStorageDiff + for _, diff := range queue.DiffsToReturn { + if diff.ID != id { + diffs = append(diffs, diff) + } + } + queue.DiffsToReturn = diffs return queue.DeleteErr } // GetAll mock method -func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { +func (queue *MockStorageQueue) GetAll() ([]utils.PersistedStorageDiff, error) { queue.GetAllCalled = true - diffs := make([]utils.StorageDiff, 0) - for _, diff := range queue.DiffsToReturn { - diffs = append(diffs, diff) - } - return diffs, queue.GetAllErr + return queue.DiffsToReturn, queue.GetAllErr } diff --git a/libraries/shared/mocks/storage_repository.go b/libraries/shared/mocks/storage_repository.go index c4e351be..acf36076 100644 --- a/libraries/shared/mocks/storage_repository.go +++ b/libraries/shared/mocks/storage_repository.go @@ -22,16 +22,14 @@ import ( ) type MockStorageRepository struct { - CreateErr error - PassedBlockNumber int - PassedBlockHash string - PassedMetadata utils.StorageValueMetadata - PassedValue interface{} + CreateErr error + PassedDiffID int64 + PassedMetadata utils.StorageValueMetadata + PassedValue interface{} } -func (repository *MockStorageRepository) Create(blockNumber int, blockHash string, metadata utils.StorageValueMetadata, value interface{}) error { - repository.PassedBlockNumber = blockNumber - repository.PassedBlockHash = blockHash +func (repository *MockStorageRepository) Create(diffID int64, metadata utils.StorageValueMetadata, value interface{}) error { + repository.PassedDiffID = diffID repository.PassedMetadata = metadata repository.PassedValue = value return repository.CreateErr diff --git a/libraries/shared/mocks/storage_transformer.go b/libraries/shared/mocks/storage_transformer.go index 04a091de..217d990b 100644 --- a/libraries/shared/mocks/storage_transformer.go +++ b/libraries/shared/mocks/storage_transformer.go @@ -28,15 +28,12 @@ import ( type MockStorageTransformer struct { KeccakOfAddress common.Hash ExecuteErr error - PassedDiffs map[int]utils.StorageDiff + PassedDiffs []utils.PersistedStorageDiff } // Execute mock method -func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error { - if transformer.PassedDiffs == nil { - transformer.PassedDiffs = make(map[int]utils.StorageDiff) - } - transformer.PassedDiffs[diff.ID] = diff +func (transformer *MockStorageTransformer) Execute(diff utils.PersistedStorageDiff) error { + transformer.PassedDiffs = append(transformer.PassedDiffs, diff) return transformer.ExecuteErr } diff --git a/libraries/shared/storage/backfiller.go b/libraries/shared/storage/backfiller.go index 6b5f290c..e9cc6aa5 100644 --- a/libraries/shared/storage/backfiller.go +++ b/libraries/shared/storage/backfiller.go @@ -36,7 +36,7 @@ const ( // BackFiller is the backfilling interface type BackFiller interface { - BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error + BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiffInput, errChan chan error, done chan bool) error } // backFiller is the backfilling struct @@ -59,7 +59,7 @@ func NewStorageBackFiller(fetcher fetcher.StateDiffFetcher, batchSize uint64) Ba // BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently -func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error { +func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiffInput, errChan chan error, done chan bool) error { logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock) // break the range up into bins of smaller ranges @@ -113,7 +113,7 @@ func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan return nil } -func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.StorageDiff, errChan chan error, doneChan chan [2]uint64) { +func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.StorageDiffInput, errChan chan error, doneChan chan [2]uint64) { payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights) if fetchErr != nil { errChan <- fetchErr diff --git a/libraries/shared/storage/backfiller_test.go b/libraries/shared/storage/backfiller_test.go index 0b347576..75a14c40 100644 --- a/libraries/shared/storage/backfiller_test.go +++ b/libraries/shared/storage/backfiller_test.go @@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() { It("batch calls statediff_stateDiffAt", func() { backFiller = storage.NewStorageBackFiller(mockFetcher, 100) - backFill := make(chan utils.StorageDiff) + backFill := make(chan utils.StorageDiffInput) done := make(chan bool) errChan := make(chan error) backFillInitErr := backFiller.BackFill( @@ -55,7 +55,7 @@ var _ = Describe("BackFiller", func() { errChan, done) Expect(backFillInitErr).ToNot(HaveOccurred()) - var diffs []utils.StorageDiff + var diffs []utils.StorageDiffInput for { select { case diff := <-backFill: @@ -79,7 +79,7 @@ var _ = Describe("BackFiller", func() { It("has a configurable batch size", func() { backFiller = storage.NewStorageBackFiller(mockFetcher, 1) - backFill := make(chan utils.StorageDiff) + backFill := make(chan utils.StorageDiffInput) done := make(chan bool) errChan := make(chan error) backFillInitErr := backFiller.BackFill( @@ -89,7 +89,7 @@ var _ = Describe("BackFiller", func() { errChan, done) Expect(backFillInitErr).ToNot(HaveOccurred()) - var diffs []utils.StorageDiff + var diffs []utils.StorageDiffInput for { select { case diff := <-backFill: @@ -119,7 +119,7 @@ var _ = Describe("BackFiller", func() { mockFetcher.PayloadsToReturn = payloadsToReturn // batch size of 2 with 1001 block range => 501 bins backFiller = storage.NewStorageBackFiller(mockFetcher, 2) - backFill := make(chan utils.StorageDiff) + backFill := make(chan utils.StorageDiffInput) done := make(chan bool) errChan := make(chan error) backFillInitErr := backFiller.BackFill( @@ -129,7 +129,7 @@ var _ = Describe("BackFiller", func() { errChan, done) Expect(backFillInitErr).ToNot(HaveOccurred()) - var diffs []utils.StorageDiff + var diffs []utils.StorageDiffInput for { select { case diff := <-backFill: @@ -155,7 +155,7 @@ var _ = Describe("BackFiller", func() { test_data.BlockNumber.Uint64(): errors.New("mock fetcher error"), } backFiller = storage.NewStorageBackFiller(mockFetcher, 1) - backFill := make(chan utils.StorageDiff) + backFill := make(chan utils.StorageDiffInput) done := make(chan bool) errChan := make(chan error) backFillInitErr := backFiller.BackFill( @@ -166,7 +166,7 @@ var _ = Describe("BackFiller", func() { done) Expect(backFillInitErr).ToNot(HaveOccurred()) var numOfErrs int - var diffs []utils.StorageDiff + var diffs []utils.StorageDiffInput for { select { case diff := <-backFill: @@ -193,7 +193,7 @@ var _ = Describe("BackFiller", func() { } mockFetcher.CalledTimes = 0 backFiller = storage.NewStorageBackFiller(mockFetcher, 1) - backFill = make(chan utils.StorageDiff) + backFill = make(chan utils.StorageDiffInput) done = make(chan bool) errChan = make(chan error) backFillInitErr = backFiller.BackFill( @@ -204,7 +204,7 @@ var _ = Describe("BackFiller", func() { done) Expect(backFillInitErr).ToNot(HaveOccurred()) numOfErrs = 0 - diffs = []utils.StorageDiff{} + diffs = []utils.StorageDiffInput{} for { select { case diff := <-backFill: @@ -227,7 +227,7 @@ var _ = Describe("BackFiller", func() { }) }) -func containsDiff(diffs []utils.StorageDiff, diff utils.StorageDiff) bool { +func containsDiff(diffs []utils.StorageDiffInput, diff utils.StorageDiffInput) bool { for _, d := range diffs { if d == diff { return true diff --git a/libraries/shared/storage/storage_queue.go b/libraries/shared/storage/storage_queue.go index c36b9c2d..bb83a905 100644 --- a/libraries/shared/storage/storage_queue.go +++ b/libraries/shared/storage/storage_queue.go @@ -22,9 +22,9 @@ import ( ) type IStorageQueue interface { - Add(diff utils.StorageDiff) error - Delete(id int) error - GetAll() ([]utils.StorageDiff, error) + Add(diff utils.PersistedStorageDiff) error + Delete(id int64) error + GetAll() ([]utils.PersistedStorageDiff, error) } type StorageQueue struct { @@ -35,21 +35,21 @@ func NewStorageQueue(db *postgres.DB) StorageQueue { return StorageQueue{db: db} } -func (queue StorageQueue) Add(diff utils.StorageDiff) error { - _, err := queue.db.Exec(`INSERT INTO public.queued_storage (contract, - block_hash, block_height, storage_key, storage_value) VALUES - ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`, diff.HashedAddress.Bytes(), diff.BlockHash.Bytes(), - diff.BlockHeight, diff.StorageKey.Bytes(), diff.StorageValue.Bytes()) +func (queue StorageQueue) Add(diff utils.PersistedStorageDiff) error { + _, err := queue.db.Exec(`INSERT INTO public.queued_storage (diff_id) VALUES + ($1) ON CONFLICT DO NOTHING`, diff.ID) return err } -func (queue StorageQueue) Delete(id int) error { - _, err := queue.db.Exec(`DELETE FROM public.queued_storage WHERE id = $1`, id) +func (queue StorageQueue) Delete(diffID int64) error { + _, err := queue.db.Exec(`DELETE FROM public.queued_storage WHERE diff_id = $1`, diffID) return err } -func (queue StorageQueue) GetAll() ([]utils.StorageDiff, error) { - var result []utils.StorageDiff - err := queue.db.Select(&result, `SELECT * FROM public.queued_storage`) +func (queue StorageQueue) GetAll() ([]utils.PersistedStorageDiff, error) { + var result []utils.PersistedStorageDiff + err := queue.db.Select(&result, `SELECT storage_diff.id, hashed_address, block_height, block_hash, storage_key, storage_value + FROM public.queued_storage + LEFT JOIN public.storage_diff ON queued_storage.diff_id = storage_diff.id`) return result, err } diff --git a/libraries/shared/storage/storage_queue_test.go b/libraries/shared/storage/storage_queue_test.go index d347a93b..2978437c 100644 --- a/libraries/shared/storage/storage_queue_test.go +++ b/libraries/shared/storage/storage_queue_test.go @@ -23,19 +23,21 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/test_config" ) var _ = Describe("Storage queue", func() { var ( - db *postgres.DB - diff utils.StorageDiff - queue storage.IStorageQueue + db *postgres.DB + diff utils.PersistedStorageDiff + diffRepository repositories.StorageDiffRepository + queue storage.IStorageQueue ) BeforeEach(func() { fakeAddr := "0x123456" - diff = utils.StorageDiff{ + rawDiff := utils.StorageDiffInput{ HashedAddress: utils.HexToKeccak256Hash(fakeAddr), BlockHash: common.HexToHash("0x678901"), BlockHeight: 987, @@ -44,6 +46,10 @@ var _ = Describe("Storage queue", func() { } db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) + diffRepository = repositories.NewStorageDiffRepository(db) + diffID, insertDiffErr := diffRepository.CreateStorageDiff(rawDiff) + Expect(insertDiffErr).NotTo(HaveOccurred()) + diff = utils.ToPersistedDiff(rawDiff, diffID) queue = storage.NewStorageQueue(db) addErr := queue.Add(diff) Expect(addErr).NotTo(HaveOccurred()) @@ -51,8 +57,10 @@ var _ = Describe("Storage queue", func() { Describe("Add", func() { It("adds a storage diff to the db", func() { - var result utils.StorageDiff - getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`) + var result utils.PersistedStorageDiff + getErr := db.Get(&result, `SELECT storage_diff.id, hashed_address, block_hash, block_height, storage_key, storage_value + FROM public.queued_storage + LEFT JOIN public.storage_diff ON queued_storage.diff_id = storage_diff.id`) Expect(getErr).NotTo(HaveOccurred()) Expect(result).To(Equal(diff)) }) @@ -82,14 +90,17 @@ var _ = Describe("Storage queue", func() { It("gets all storage diffs from db", func() { fakeAddr := "0x234567" - diffTwo := utils.StorageDiff{ + diffTwo := utils.StorageDiffInput{ HashedAddress: utils.HexToKeccak256Hash(fakeAddr), BlockHash: common.HexToHash("0x678902"), BlockHeight: 988, StorageKey: common.HexToHash("0x654322"), StorageValue: common.HexToHash("0x198766"), } - addErr := queue.Add(diffTwo) + persistedDiffTwoID, insertDiffErr := diffRepository.CreateStorageDiff(diffTwo) + Expect(insertDiffErr).NotTo(HaveOccurred()) + persistedDiffTwo := utils.ToPersistedDiff(diffTwo, persistedDiffTwoID) + addErr := queue.Add(persistedDiffTwo) Expect(addErr).NotTo(HaveOccurred()) diffs, err := queue.GetAll() diff --git a/libraries/shared/storage/utils/decoder.go b/libraries/shared/storage/utils/decoder.go index 94ea8b16..ff0273fb 100644 --- a/libraries/shared/storage/utils/decoder.go +++ b/libraries/shared/storage/utils/decoder.go @@ -27,7 +27,7 @@ const ( bitsPerByte = 8 ) -func Decode(diff StorageDiff, metadata StorageValueMetadata) (interface{}, error) { +func Decode(diff PersistedStorageDiff, metadata StorageValueMetadata) (interface{}, error) { switch metadata.Type { case Uint256: return decodeInteger(diff.StorageValue.Bytes()), nil diff --git a/libraries/shared/storage/utils/decoder_test.go b/libraries/shared/storage/utils/decoder_test.go index 6650965c..87a602d1 100644 --- a/libraries/shared/storage/utils/decoder_test.go +++ b/libraries/shared/storage/utils/decoder_test.go @@ -29,7 +29,7 @@ import ( var _ = Describe("Storage decoder", func() { It("decodes uint256", func() { fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000000539") - diff := utils.StorageDiff{StorageValue: fakeInt} + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: fakeInt}} metadata := utils.StorageValueMetadata{Type: utils.Uint256} result, err := utils.Decode(diff, metadata) @@ -40,7 +40,7 @@ var _ = Describe("Storage decoder", func() { It("decodes uint128", func() { fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000011123") - diff := utils.StorageDiff{StorageValue: fakeInt} + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: fakeInt}} metadata := utils.StorageValueMetadata{Type: utils.Uint128} result, err := utils.Decode(diff, metadata) @@ -51,7 +51,7 @@ var _ = Describe("Storage decoder", func() { It("decodes uint48", func() { fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000000123") - diff := utils.StorageDiff{StorageValue: fakeInt} + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: fakeInt}} metadata := utils.StorageValueMetadata{Type: utils.Uint48} result, err := utils.Decode(diff, metadata) @@ -62,7 +62,7 @@ var _ = Describe("Storage decoder", func() { It("decodes address", func() { fakeAddress := common.HexToAddress("0x12345") - diff := utils.StorageDiff{StorageValue: fakeAddress.Hash()} + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: fakeAddress.Hash()}} metadata := utils.StorageValueMetadata{Type: utils.Address} result, err := utils.Decode(diff, metadata) @@ -75,7 +75,7 @@ var _ = Describe("Storage decoder", func() { It("decodes uint48 items", func() { //this is a real storage data example packedStorage := common.HexToHash("000000000000000000000000000000000000000000000002a300000000002a30") - diff := utils.StorageDiff{StorageValue: packedStorage} + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: packedStorage}} packedTypes := map[int]utils.ValueType{} packedTypes[0] = utils.Uint48 packedTypes[1] = utils.Uint48 @@ -99,7 +99,7 @@ var _ = Describe("Storage decoder", func() { packedStorageHex := "0000000A5D1AFFFFFFFFFFFE00000009F3C600000002A300000000002A30" packedStorage := common.HexToHash(packedStorageHex) - diff := utils.StorageDiff{StorageValue: packedStorage} + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: packedStorage}} packedTypes := map[int]utils.ValueType{} packedTypes[0] = utils.Uint48 packedTypes[1] = utils.Uint48 @@ -129,7 +129,7 @@ var _ = Describe("Storage decoder", func() { packedStorageHex := "000000038D7EA4C67FF8E502B6730000" + "0000000000000000AB54A98CEB1F0AD2" packedStorage := common.HexToHash(packedStorageHex) - diff := utils.StorageDiff{StorageValue: packedStorage} + diff := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: packedStorage}} packedTypes := map[int]utils.ValueType{} packedTypes[0] = utils.Uint128 packedTypes[1] = utils.Uint128 @@ -151,7 +151,7 @@ var _ = Describe("Storage decoder", func() { //TODO: replace with real data when available addressHex := "0000000000000000000000000000000000012345" packedStorage := common.HexToHash("00000002a300" + "000000002a30" + addressHex) - row := utils.StorageDiff{StorageValue: packedStorage} + row := utils.PersistedStorageDiff{StorageDiffInput: utils.StorageDiffInput{StorageValue: packedStorage}} packedTypes := map[int]utils.ValueType{} packedTypes[0] = utils.Address packedTypes[1] = utils.Uint48 diff --git a/libraries/shared/storage/utils/diff.go b/libraries/shared/storage/utils/diff.go index 45e8ffa2..331ebf96 100644 --- a/libraries/shared/storage/utils/diff.go +++ b/libraries/shared/storage/utils/diff.go @@ -27,24 +27,28 @@ import ( const ExpectedRowLength = 5 -type StorageDiff struct { - ID int - HashedAddress common.Hash `db:"contract"` +type StorageDiffInput struct { + HashedAddress common.Hash `db:"hashed_address"` BlockHash common.Hash `db:"block_hash"` BlockHeight int `db:"block_height"` StorageKey common.Hash `db:"storage_key"` StorageValue common.Hash `db:"storage_value"` } -func FromParityCsvRow(csvRow []string) (StorageDiff, error) { +type PersistedStorageDiff struct { + StorageDiffInput + ID int64 +} + +func FromParityCsvRow(csvRow []string) (StorageDiffInput, error) { if len(csvRow) != ExpectedRowLength { - return StorageDiff{}, ErrRowMalformed{Length: len(csvRow)} + return StorageDiffInput{}, ErrRowMalformed{Length: len(csvRow)} } height, err := strconv.Atoi(csvRow[2]) if err != nil { - return StorageDiff{}, err + return StorageDiffInput{}, err } - return StorageDiff{ + return StorageDiffInput{ HashedAddress: HexToKeccak256Hash(csvRow[0]), BlockHash: common.HexToHash(csvRow[1]), BlockHeight: height, @@ -53,14 +57,14 @@ func FromParityCsvRow(csvRow []string) (StorageDiff, error) { }, nil } -func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) (StorageDiff, error) { +func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) (StorageDiffInput, error) { var decodedValue []byte err := rlp.DecodeBytes(storage.Value, &decodedValue) if err != nil { - return StorageDiff{}, err + return StorageDiffInput{}, err } - return StorageDiff{ + return StorageDiffInput{ HashedAddress: common.BytesToHash(account.Key), BlockHash: stateDiff.BlockHash, BlockHeight: int(stateDiff.BlockNumber.Int64()), @@ -69,6 +73,13 @@ func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.State }, nil } +func ToPersistedDiff(raw StorageDiffInput, id int64) PersistedStorageDiff { + return PersistedStorageDiff{ + StorageDiffInput: raw, + ID: id, + } +} + func HexToKeccak256Hash(hex string) common.Hash { return crypto.Keccak256Hash(common.FromHex(hex)) } diff --git a/libraries/shared/test_data/generic.go b/libraries/shared/test_data/generic.go index 07be69b8..8a876bca 100644 --- a/libraries/shared/test_data/generic.go +++ b/libraries/shared/test_data/generic.go @@ -33,12 +33,12 @@ var topic0 = "0x" + randomString(64) var GenericTestLog = func() types.Log { return types.Log{ Address: fakeAddress(), - Topics: []common.Hash{common.HexToHash(topic0), fakeHash()}, - Data: hexutil.MustDecode(fakeHash().Hex()), + Topics: []common.Hash{common.HexToHash(topic0), FakeHash()}, + Data: hexutil.MustDecode(FakeHash().Hex()), BlockNumber: uint64(startingBlockNumber), - TxHash: fakeHash(), + TxHash: FakeHash(), TxIndex: uint(rand.Int31()), - BlockHash: fakeHash(), + BlockHash: FakeHash(), Index: uint(rand.Int31()), } } @@ -58,7 +58,7 @@ func fakeAddress() common.Address { return common.HexToAddress("0x" + randomString(40)) } -func fakeHash() common.Hash { +func FakeHash() common.Hash { return common.HexToHash("0x" + randomString(64)) } diff --git a/libraries/shared/test_data/statediff.go b/libraries/shared/test_data/statediff.go index f9bfd8fe..bc54431f 100644 --- a/libraries/shared/test_data/statediff.go +++ b/libraries/shared/test_data/statediff.go @@ -150,32 +150,28 @@ var ( StateDiffRlp: MockStateDiff2Bytes, } - CreatedExpectedStorageDiff = utils.StorageDiff{ - ID: 0, + CreatedExpectedStorageDiff = utils.StorageDiffInput{ HashedAddress: common.BytesToHash(ContractLeafKey[:]), BlockHash: common.HexToHash(BlockHash), BlockHeight: int(BlockNumber.Int64()), StorageKey: common.BytesToHash(StorageKey), StorageValue: common.BytesToHash(SmallStorageValue), } - UpdatedExpectedStorageDiff = utils.StorageDiff{ - ID: 0, + UpdatedExpectedStorageDiff = utils.StorageDiffInput{ HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), BlockHash: common.HexToHash(BlockHash), BlockHeight: int(BlockNumber.Int64()), StorageKey: common.BytesToHash(StorageKey), StorageValue: common.BytesToHash(LargeStorageValue), } - UpdatedExpectedStorageDiff2 = utils.StorageDiff{ - ID: 0, + UpdatedExpectedStorageDiff2 = utils.StorageDiffInput{ HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), BlockHash: common.HexToHash(BlockHash2), BlockHeight: int(BlockNumber2.Int64()), StorageKey: common.BytesToHash(StorageKey), StorageValue: common.BytesToHash(SmallStorageValue), } - DeletedExpectedStorageDiff = utils.StorageDiff{ - ID: 0, + DeletedExpectedStorageDiff = utils.StorageDiffInput{ HashedAddress: common.BytesToHash(AnotherContractLeafKey[:]), BlockHash: common.HexToHash(BlockHash), BlockHeight: int(BlockNumber.Int64()), diff --git a/libraries/shared/transformer/storage_transformer.go b/libraries/shared/transformer/storage_transformer.go index 698ef841..3ee7a168 100644 --- a/libraries/shared/transformer/storage_transformer.go +++ b/libraries/shared/transformer/storage_transformer.go @@ -23,7 +23,7 @@ import ( ) type StorageTransformer interface { - Execute(diff utils.StorageDiff) error + Execute(diff utils.PersistedStorageDiff) error KeccakContractAddress() common.Hash } diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 813d7aa7..39e93357 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -27,7 +27,9 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" ) type IStorageWatcher interface { @@ -40,8 +42,9 @@ type StorageWatcher struct { db *postgres.DB StorageFetcher fetcher.IStorageFetcher Queue storage.IStorageQueue + StorageDiffRepository datastore.StorageDiffRepository KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer - DiffsChan chan utils.StorageDiff + DiffsChan chan utils.StorageDiffInput ErrsChan chan error BackFillDoneChan chan bool StartingSyncBlockChan chan uint64 @@ -49,15 +52,17 @@ type StorageWatcher struct { func NewStorageWatcher(f fetcher.IStorageFetcher, db *postgres.DB) *StorageWatcher { queue := storage.NewStorageQueue(db) + storageDiffRepository := repositories.NewStorageDiffRepository(db) transformers := make(map[common.Hash]transformer.StorageTransformer) return &StorageWatcher{ db: db, StorageFetcher: f, - DiffsChan: make(chan utils.StorageDiff, fetcher.PayloadChanBufferSize), + DiffsChan: make(chan utils.StorageDiffInput, fetcher.PayloadChanBufferSize), ErrsChan: make(chan error), StartingSyncBlockChan: make(chan uint64), BackFillDoneChan: make(chan bool), Queue: queue, + StorageDiffRepository: storageDiffRepository, KeccakAddressTransformers: transformers, } } @@ -87,8 +92,8 @@ func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration start := true for { select { - case fetchErr := <-storageWatcher.ErrsChan: - logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr.Error())) + case err := <-storageWatcher.ErrsChan: + logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", err.Error())) case diff := <-storageWatcher.DiffsChan: if start && backFillOn { storageWatcher.StartingSyncBlockChan <- uint64(diff.BlockHeight - 1) @@ -103,27 +108,37 @@ func (storageWatcher *StorageWatcher) Execute(queueRecheckInterval time.Duration } } -func (storageWatcher *StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { +func (storageWatcher *StorageWatcher) getTransformer(diff utils.PersistedStorageDiff) (transformer.StorageTransformer, bool) { storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress] return storageTransformer, ok } -func (storageWatcher StorageWatcher) processRow(diff utils.StorageDiff) { - storageTransformer, ok := storageWatcher.getTransformer(diff) +func (storageWatcher StorageWatcher) processRow(diffInput utils.StorageDiffInput) { + diffID, err := storageWatcher.StorageDiffRepository.CreateStorageDiff(diffInput) + if err != nil { + if err == repositories.ErrDuplicateDiff { + logrus.Warn("ignoring duplicate diff") + return + } + logrus.Warnf("failed to persist storage diff: %s", err.Error()) + // TODO: bail? Should we continue attempting to transform a diff we didn't persist + } + persistedDiff := utils.ToPersistedDiff(diffInput, diffID) + storageTransformer, ok := storageWatcher.getTransformer(persistedDiff) if !ok { - logrus.Debug("ignoring a diff from an unwatched contract") + logrus.Debug("ignoring diff from unwatched contract") return } - executeErr := storageTransformer.Execute(diff) + executeErr := storageTransformer.Execute(persistedDiff) if executeErr != nil { logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) - queueErr := storageWatcher.Queue.Add(diff) + queueErr := storageWatcher.Queue.Add(persistedDiff) if queueErr != nil { logrus.Warn(fmt.Sprintf("error queueing storage diff: %s", queueErr)) } return } - logrus.Debugf("Storage diff persisted at block height: %d", diff.BlockHeight) + logrus.Debugf("Storage diff persisted at block height: %d", diffInput.BlockHeight) } func (storageWatcher StorageWatcher) processQueue() { @@ -145,8 +160,8 @@ func (storageWatcher StorageWatcher) processQueue() { } } -func (storageWatcher StorageWatcher) deleteRow(id int) { - deleteErr := storageWatcher.Queue.Delete(id) +func (storageWatcher StorageWatcher) deleteRow(diffID int64) { + deleteErr := storageWatcher.Queue.Delete(diffID) if deleteErr != nil { logrus.Warn(fmt.Sprintf("error deleting persisted diff from queue: %s", deleteErr)) } diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index c2cc44dd..a54b45e6 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -19,8 +19,8 @@ package watcher_test import ( "errors" "io/ioutil" + "math/rand" "os" - "sort" "time" "github.com/ethereum/go-ethereum/common" @@ -33,11 +33,22 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/test_config" ) var _ = Describe("Storage Watcher", func() { + var ( + mockFetcher *mocks.StorageFetcher + mockQueue *mocks.MockStorageQueue + mockTransformer *mocks.MockStorageTransformer + storageWatcher *watcher.StorageWatcher + mockStorageDiffRepository *fakes.MockStorageDiffRepository + fakeDiffId = rand.Int63() + hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") + csvDiff utils.StorageDiffInput + ) Describe("AddTransformer", func() { It("adds transformers", func() { fakeHashedAddress := utils.HexToKeccak256Hash("0x12345") @@ -50,22 +61,12 @@ var _ = Describe("Storage Watcher", func() { }) }) Describe("Execute", func() { - var ( - mockFetcher *mocks.StorageFetcher - mockQueue *mocks.MockStorageQueue - mockTransformer *mocks.MockStorageTransformer - csvDiff utils.StorageDiff - storageWatcher *watcher.StorageWatcher - hashedAddress common.Hash - ) - BeforeEach(func() { - hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") mockFetcher = mocks.NewStorageFetcher() mockQueue = &mocks.MockStorageQueue{} + mockStorageDiffRepository = &fakes.MockStorageDiffRepository{} mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} - csvDiff = utils.StorageDiff{ - ID: 1337, + csvDiff = utils.StorageDiffInput{ HashedAddress: hashedAddress, BlockHash: common.HexToHash("0xfedcba9876543210"), BlockHeight: 0, @@ -79,6 +80,7 @@ var _ = Describe("Storage Watcher", func() { storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + storageWatcher.StorageDiffRepository = mockStorageDiffRepository tempFile, fileErr := ioutil.TempFile("", "log") Expect(fileErr).NotTo(HaveOccurred()) defer os.Remove(tempFile.Name()) @@ -94,20 +96,69 @@ var _ = Describe("Storage Watcher", func() { }) Describe("transforming new storage diffs from csv", func() { + var fakePersistedDiff utils.PersistedStorageDiff BeforeEach(func() { - mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} + mockFetcher.DiffsToReturn = []utils.StorageDiffInput{csvDiff} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) + fakePersistedDiff = utils.PersistedStorageDiff{ + ID: fakeDiffId, + StorageDiffInput: utils.StorageDiffInput{ + HashedAddress: csvDiff.HashedAddress, + BlockHash: csvDiff.BlockHash, + BlockHeight: csvDiff.BlockHeight, + StorageValue: csvDiff.StorageValue, + StorageKey: csvDiff.StorageKey, + }, + } + mockStorageDiffRepository.CreateReturnID = fakeDiffId + storageWatcher.StorageDiffRepository = mockStorageDiffRepository + }) + + It("writes raw diff before processing", func(done Done) { + go storageWatcher.Execute(time.Hour, false) + + Eventually(func() []utils.StorageDiffInput { + return mockStorageDiffRepository.CreatePassedInputs + }).Should(ContainElement(csvDiff)) + close(done) + }) + + It("discards raw diff if it's already been persisted", func(done Done) { + mockStorageDiffRepository.CreateReturnError = repositories.ErrDuplicateDiff + + go storageWatcher.Execute(time.Hour, false) + + Consistently(func() []utils.PersistedStorageDiff { + return mockTransformer.PassedDiffs + }).Should(BeZero()) + close(done) + }) + + It("logs error if persisting raw diff fails", func(done Done) { + mockStorageDiffRepository.CreateReturnError = fakes.FakeError + tempFile, fileErr := ioutil.TempFile("", "log") + Expect(fileErr).NotTo(HaveOccurred()) + defer os.Remove(tempFile.Name()) + logrus.SetOutput(tempFile) + + go storageWatcher.Execute(time.Hour, false) + + Eventually(func() (string, error) { + logContent, err := ioutil.ReadFile(tempFile.Name()) + return string(logContent), err + }).Should(ContainSubstring(fakes.FakeError.Error())) + close(done) }) It("executes transformer for recognized storage diff", func(done Done) { go storageWatcher.Execute(time.Hour, false) - Eventually(func() map[int]utils.StorageDiff { + Eventually(func() []utils.PersistedStorageDiff { return mockTransformer.PassedDiffs - }).Should(Equal(map[int]utils.StorageDiff{ - csvDiff.ID: csvDiff, + }).Should(Equal([]utils.PersistedStorageDiff{ + fakePersistedDiff, })) close(done) }) @@ -120,12 +171,12 @@ var _ = Describe("Storage Watcher", func() { Eventually(func() bool { return mockQueue.AddCalled }).Should(BeTrue()) - Eventually(func() utils.StorageDiff { + Eventually(func() utils.PersistedStorageDiff { if len(mockQueue.AddPassedDiffs) > 0 { - return mockQueue.AddPassedDiffs[csvDiff.ID] + return mockQueue.AddPassedDiffs[0] } - return utils.StorageDiff{} - }).Should(Equal(csvDiff)) + return utils.PersistedStorageDiff{} + }).Should(Equal(fakePersistedDiff)) close(done) }) @@ -151,10 +202,19 @@ var _ = Describe("Storage Watcher", func() { }) Describe("transforming queued storage diffs", func() { + var queuedDiff utils.PersistedStorageDiff BeforeEach(func() { - mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ - csvDiff.ID: csvDiff, + queuedDiff = utils.PersistedStorageDiff{ + ID: 1337, + StorageDiffInput: utils.StorageDiffInput{ + HashedAddress: hashedAddress, + BlockHash: test_data.FakeHash(), + BlockHeight: rand.Int(), + StorageKey: test_data.FakeHash(), + StorageValue: test_data.FakeHash(), + }, } + mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{queuedDiff} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) @@ -163,24 +223,24 @@ var _ = Describe("Storage Watcher", func() { It("executes transformer for storage diff", func(done Done) { go storageWatcher.Execute(time.Nanosecond, false) - Eventually(func() utils.StorageDiff { + Eventually(func() utils.PersistedStorageDiff { if len(mockTransformer.PassedDiffs) > 0 { - return mockTransformer.PassedDiffs[csvDiff.ID] + return mockTransformer.PassedDiffs[0] } - return utils.StorageDiff{} - }).Should(Equal(csvDiff)) + return utils.PersistedStorageDiff{} + }).Should(Equal(queuedDiff)) close(done) }) It("deletes diff from queue if transformer execution successful", func(done Done) { go storageWatcher.Execute(time.Nanosecond, false) - Eventually(func() int { + Eventually(func() int64 { if len(mockQueue.DeletePassedIds) > 0 { return mockQueue.DeletePassedIds[0] } return 0 - }).Should(Equal(csvDiff.ID)) + }).Should(Equal(queuedDiff.ID)) close(done) }) @@ -201,17 +261,15 @@ var _ = Describe("Storage Watcher", func() { }) It("deletes obsolete diff from queue if contract not recognized", func(done Done) { - obsoleteDiff := utils.StorageDiff{ - ID: csvDiff.ID + 1, - HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), - } - mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ - obsoleteDiff.ID: obsoleteDiff, + obsoleteDiff := utils.PersistedStorageDiff{ + ID: queuedDiff.ID + 1, + StorageDiffInput: utils.StorageDiffInput{HashedAddress: test_data.FakeHash()}, } + mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{obsoleteDiff} go storageWatcher.Execute(time.Nanosecond, false) - Eventually(func() int { + Eventually(func() int64 { if len(mockQueue.DeletePassedIds) > 0 { return mockQueue.DeletePassedIds[0] } @@ -221,13 +279,11 @@ var _ = Describe("Storage Watcher", func() { }) It("logs error if deleting obsolete diff fails", func(done Done) { - obsoleteDiff := utils.StorageDiff{ - ID: csvDiff.ID + 1, - HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), - } - mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ - obsoleteDiff.ID: obsoleteDiff, + obsoleteDiff := utils.PersistedStorageDiff{ + ID: queuedDiff.ID + 1, + StorageDiffInput: utils.StorageDiffInput{HashedAddress: test_data.FakeHash()}, } + mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{obsoleteDiff} mockQueue.DeleteErr = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log") Expect(fileErr).NotTo(HaveOccurred()) @@ -247,27 +303,39 @@ var _ = Describe("Storage Watcher", func() { Describe("BackFill", func() { var ( - mockFetcher *mocks.StorageFetcher - mockBackFiller *mocks.BackFiller - mockQueue *mocks.MockStorageQueue - mockTransformer *mocks.MockStorageTransformer - mockTransformer2 *mocks.MockStorageTransformer - mockTransformer3 *mocks.MockStorageTransformer - csvDiff utils.StorageDiff - storageWatcher *watcher.StorageWatcher - hashedAddress common.Hash - createdDiff, updatedDiff1, deletedDiff, updatedDiff2 utils.StorageDiff + mockBackFiller *mocks.BackFiller + mockTransformer2 *mocks.MockStorageTransformer + mockTransformer3 *mocks.MockStorageTransformer + createdPersistedDiff = utils.PersistedStorageDiff{ + ID: fakeDiffId, + StorageDiffInput: test_data.CreatedExpectedStorageDiff, + } + updatedPersistedDiff1 = utils.PersistedStorageDiff{ + ID: fakeDiffId, + StorageDiffInput: test_data.UpdatedExpectedStorageDiff, + } + deletedPersistedDiff = utils.PersistedStorageDiff{ + ID: fakeDiffId, + StorageDiffInput: test_data.DeletedExpectedStorageDiff, + } + updatedPersistedDiff2 = utils.PersistedStorageDiff{ + ID: fakeDiffId, + StorageDiffInput: test_data.UpdatedExpectedStorageDiff2, + } + csvDiff = utils.StorageDiffInput{ + HashedAddress: hashedAddress, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: int(test_data.BlockNumber2.Int64()) + 1, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), + } + csvPersistedDiff = utils.PersistedStorageDiff{ + ID: fakeDiffId, + StorageDiffInput: csvDiff, + } ) BeforeEach(func() { - createdDiff = test_data.CreatedExpectedStorageDiff - createdDiff.ID = 1333 - updatedDiff1 = test_data.UpdatedExpectedStorageDiff - updatedDiff1.ID = 1334 - deletedDiff = test_data.DeletedExpectedStorageDiff - deletedDiff.ID = 1335 - updatedDiff2 = test_data.UpdatedExpectedStorageDiff2 - updatedDiff2.ID = 1336 mockBackFiller = new(mocks.BackFiller) hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") mockFetcher = mocks.NewStorageFetcher() @@ -275,26 +343,19 @@ var _ = Describe("Storage Watcher", func() { mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} mockTransformer2 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.ContractLeafKey[:])} mockTransformer3 = &mocks.MockStorageTransformer{KeccakOfAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:])} - csvDiff = utils.StorageDiff{ - ID: 1337, - HashedAddress: hashedAddress, - BlockHash: common.HexToHash("0xfedcba9876543210"), - BlockHeight: int(test_data.BlockNumber2.Int64()) + 1, - StorageKey: common.HexToHash("0xabcdef1234567890"), - StorageValue: common.HexToHash("0x9876543210abcdef"), - } + mockStorageDiffRepository = &fakes.MockStorageDiffRepository{} }) Describe("transforming streamed and backfilled storage diffs", func() { BeforeEach(func() { - mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} - mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiff{ - createdDiff, - updatedDiff1, - deletedDiff, - updatedDiff2, + mockFetcher.DiffsToReturn = []utils.StorageDiffInput{csvDiff} + mockBackFiller.SetStorageDiffsToReturn([]utils.StorageDiffInput{ + test_data.CreatedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff, + test_data.DeletedExpectedStorageDiff, + test_data.UpdatedExpectedStorageDiff2, }) - mockQueue.DiffsToReturn = map[int]utils.StorageDiff{} + mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{ @@ -302,6 +363,9 @@ var _ = Describe("Storage Watcher", func() { mockTransformer2.FakeTransformerInitializer, mockTransformer3.FakeTransformerInitializer, }) + mockStorageDiffRepository.CreateReturnID = fakeDiffId + + storageWatcher.StorageDiffRepository = mockStorageDiffRepository }) It("executes transformer for storage diffs received from fetcher and backfiller", func(done Done) { @@ -311,20 +375,17 @@ var _ = Describe("Storage Watcher", func() { Eventually(func() int { return len(mockTransformer.PassedDiffs) }).Should(Equal(1)) - Eventually(func() int { return len(mockTransformer2.PassedDiffs) }).Should(Equal(1)) - Eventually(func() int { return len(mockTransformer3.PassedDiffs) }).Should(Equal(3)) + Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64()))) - Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) - Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) - Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) - Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff)) - Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2)) + Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvPersistedDiff)) + Expect(mockTransformer2.PassedDiffs[0]).To(Equal(createdPersistedDiff)) + Expect(mockTransformer3.PassedDiffs).To(ConsistOf(updatedPersistedDiff1, deletedPersistedDiff, updatedPersistedDiff2)) close(done) }) @@ -346,23 +407,17 @@ var _ = Describe("Storage Watcher", func() { Eventually(func() bool { return mockQueue.AddCalled }).Should(BeTrue()) - Eventually(func() map[int]utils.StorageDiff { + Eventually(func() []utils.PersistedStorageDiff { if len(mockQueue.AddPassedDiffs) > 2 { return mockQueue.AddPassedDiffs } - return map[int]utils.StorageDiff{} - }).Should(Equal(map[int]utils.StorageDiff{ - updatedDiff1.ID: updatedDiff1, - deletedDiff.ID: deletedDiff, - updatedDiff2.ID: updatedDiff2, - })) + return []utils.PersistedStorageDiff{} + }).Should(ConsistOf(updatedPersistedDiff1, deletedPersistedDiff, updatedPersistedDiff2)) Expect(mockBackFiller.PassedEndingBlock).To(Equal(uint64(test_data.BlockNumber2.Int64()))) - Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) - Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) - Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) - Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff)) - Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2)) + Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvPersistedDiff)) + Expect(mockTransformer2.PassedDiffs[0]).To(Equal(createdPersistedDiff)) + Expect(mockTransformer3.PassedDiffs).To(ConsistOf(updatedPersistedDiff1, deletedPersistedDiff, updatedPersistedDiff2)) close(done) }) @@ -417,12 +472,12 @@ var _ = Describe("Storage Watcher", func() { Describe("transforms queued storage diffs", func() { BeforeEach(func() { - mockQueue.DiffsToReturn = map[int]utils.StorageDiff{ - csvDiff.ID: csvDiff, - createdDiff.ID: createdDiff, - updatedDiff1.ID: updatedDiff1, - deletedDiff.ID: deletedDiff, - updatedDiff2.ID: updatedDiff2, + mockQueue.DiffsToReturn = []utils.PersistedStorageDiff{ + csvPersistedDiff, + createdPersistedDiff, + updatedPersistedDiff1, + deletedPersistedDiff, + updatedPersistedDiff2, } storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue @@ -449,29 +504,25 @@ var _ = Describe("Storage Watcher", func() { Eventually(func() bool { return mockQueue.GetAllCalled }).Should(BeTrue()) - sortedExpectedIDs := []int{ - csvDiff.ID, - createdDiff.ID, - updatedDiff1.ID, - deletedDiff.ID, - updatedDiff2.ID, + expectedIDs := []int64{ + fakeDiffId, + fakeDiffId, + fakeDiffId, + fakeDiffId, + fakeDiffId, } - sort.Ints(sortedExpectedIDs) - Eventually(func() []int { + Eventually(func() []int64 { if len(mockQueue.DeletePassedIds) > 4 { - sort.Ints(mockQueue.DeletePassedIds) return mockQueue.DeletePassedIds } - return []int{} - }).Should(Equal(sortedExpectedIDs)) + return []int64{} + }).Should(Equal(expectedIDs)) Expect(mockQueue.AddCalled).To(Not(BeTrue())) Expect(len(mockQueue.DiffsToReturn)).To(Equal(0)) - Expect(mockTransformer.PassedDiffs[csvDiff.ID]).To(Equal(csvDiff)) - Expect(mockTransformer2.PassedDiffs[createdDiff.ID]).To(Equal(createdDiff)) - Expect(mockTransformer3.PassedDiffs[updatedDiff1.ID]).To(Equal(updatedDiff1)) - Expect(mockTransformer3.PassedDiffs[deletedDiff.ID]).To(Equal(deletedDiff)) - Expect(mockTransformer3.PassedDiffs[updatedDiff2.ID]).To(Equal(updatedDiff2)) + Expect(mockTransformer.PassedDiffs[0]).To(Equal(csvPersistedDiff)) + Expect(mockTransformer2.PassedDiffs[0]).To(Equal(createdPersistedDiff)) + Expect(mockTransformer3.PassedDiffs).To(ConsistOf(updatedPersistedDiff1, deletedPersistedDiff, updatedPersistedDiff2)) close(done) }) }) diff --git a/pkg/datastore/postgres/repositories/storage_diff_repository.go b/pkg/datastore/postgres/repositories/storage_diff_repository.go new file mode 100644 index 00000000..c5c76e46 --- /dev/null +++ b/pkg/datastore/postgres/repositories/storage_diff_repository.go @@ -0,0 +1,47 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package repositories + +import ( + "database/sql" + + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +var ErrDuplicateDiff = sql.ErrNoRows + +type StorageDiffRepository struct { + db *postgres.DB +} + +func NewStorageDiffRepository(db *postgres.DB) StorageDiffRepository { + return StorageDiffRepository{db: db} +} + +func (repository StorageDiffRepository) CreateStorageDiff(input utils.StorageDiffInput) (int64, error) { + var storageDiffID int64 + row := repository.db.QueryRowx(`INSERT INTO public.storage_diff + (hashed_address, block_height, block_hash, storage_key, storage_value) VALUES ($1, $2, $3, $4, $5) + ON CONFLICT DO NOTHING RETURNING id`, input.HashedAddress.Bytes(), input.BlockHeight, input.BlockHash.Bytes(), + input.StorageKey.Bytes(), input.StorageValue.Bytes()) + err := row.Scan(&storageDiffID) + if err != nil && err == sql.ErrNoRows { + return 0, ErrDuplicateDiff + } + return storageDiffID, err +} diff --git a/pkg/datastore/postgres/repositories/storage_diff_repository_test.go b/pkg/datastore/postgres/repositories/storage_diff_repository_test.go new file mode 100644 index 00000000..d6577dec --- /dev/null +++ b/pkg/datastore/postgres/repositories/storage_diff_repository_test.go @@ -0,0 +1,83 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package repositories_test + +import ( + "database/sql" + "math/rand" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Storage diffs repository", func() { + var ( + db *postgres.DB + repo repositories.StorageDiffRepository + fakeStorageDiff utils.StorageDiffInput + ) + + BeforeEach(func() { + db = test_config.NewTestDB(test_config.NewTestNode()) + test_config.CleanTestDB(db) + repo = repositories.NewStorageDiffRepository(db) + fakeStorageDiff = utils.StorageDiffInput{ + HashedAddress: test_data.FakeHash(), + BlockHash: test_data.FakeHash(), + BlockHeight: rand.Int(), + StorageKey: test_data.FakeHash(), + StorageValue: test_data.FakeHash(), + } + }) + + Describe("CreateStorageDiff", func() { + It("adds a storage diff to the db, returning id", func() { + id, createErr := repo.CreateStorageDiff(fakeStorageDiff) + + Expect(createErr).NotTo(HaveOccurred()) + Expect(id).NotTo(BeZero()) + var persisted utils.PersistedStorageDiff + getErr := db.Get(&persisted, `SELECT * FROM public.storage_diff`) + Expect(getErr).NotTo(HaveOccurred()) + Expect(persisted.ID).To(Equal(id)) + Expect(persisted.HashedAddress).To(Equal(fakeStorageDiff.HashedAddress)) + Expect(persisted.BlockHash).To(Equal(fakeStorageDiff.BlockHash)) + Expect(persisted.BlockHeight).To(Equal(fakeStorageDiff.BlockHeight)) + Expect(persisted.StorageKey).To(Equal(fakeStorageDiff.StorageKey)) + Expect(persisted.StorageValue).To(Equal(fakeStorageDiff.StorageValue)) + }) + + It("does not duplicate storage diffs", func() { + _, createErr := repo.CreateStorageDiff(fakeStorageDiff) + Expect(createErr).NotTo(HaveOccurred()) + + _, createTwoErr := repo.CreateStorageDiff(fakeStorageDiff) + Expect(createTwoErr).To(HaveOccurred()) + Expect(createTwoErr).To(MatchError(sql.ErrNoRows)) + + var count int + getErr := db.Get(&count, `SELECT count(*) FROM public.storage_diff`) + Expect(getErr).NotTo(HaveOccurred()) + Expect(count).To(Equal(1)) + }) + }) +}) diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index a9431c81..51d4b8a1 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -19,6 +19,7 @@ package datastore import ( "github.com/ethereum/go-ethereum/core/types" "github.com/jmoiron/sqlx" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/filters" ) @@ -83,6 +84,10 @@ type HeaderSyncReceiptRepository interface { CreateFullSyncReceiptInTx(blockID int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error) } +type StorageDiffRepository interface { + CreateStorageDiff(input utils.StorageDiffInput) (int64, error) +} + type WatchedEventRepository interface { GetWatchedEvents(name string) ([]*core.WatchedEvent, error) } diff --git a/pkg/fakes/mock_storage_diff_repository.go b/pkg/fakes/mock_storage_diff_repository.go new file mode 100644 index 00000000..642286aa --- /dev/null +++ b/pkg/fakes/mock_storage_diff_repository.go @@ -0,0 +1,32 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package fakes + +import ( + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" +) + +type MockStorageDiffRepository struct { + CreatePassedInputs []utils.StorageDiffInput + CreateReturnID int64 + CreateReturnError error +} + +func (repository *MockStorageDiffRepository) CreateStorageDiff(input utils.StorageDiffInput) (int64, error) { + repository.CreatePassedInputs = append(repository.CreatePassedInputs, input) + return repository.CreateReturnID, repository.CreateReturnError +} diff --git a/test_config/test_config.go b/test_config/test_config.go index f2dac3fa..fc8dc1fc 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -101,6 +101,7 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM headers") db.MustExec("DELETE FROM log_filters") db.MustExec("DELETE FROM queued_storage") + db.MustExec("DELETE FROM storage_diff") db.MustExec("DELETE FROM watched_contracts") db.MustExec("DELETE FROM watched_logs") }