diff --git a/db/migrations/20190219134901_create_queued_storage.sql b/db/migrations/20190219134901_create_queued_storage.sql new file mode 100644 index 00000000..bd5e468e --- /dev/null +++ b/db/migrations/20190219134901_create_queued_storage.sql @@ -0,0 +1,12 @@ +-- +goose Up +CREATE TABLE public.queued_storage ( + id SERIAL PRIMARY KEY, + block_height BIGINT, + block_hash BYTEA, + contract BYTEA, + storage_key BYTEA, + storage_value BYTEA +); + +-- +goose Down +DROP TABLE public.queued_storage; diff --git a/db/schema.sql b/db/schema.sql index 6cbdcd76..dd46db2f 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -2,8 +2,8 @@ -- PostgreSQL database dump -- --- Dumped from database version 10.5 --- Dumped by pg_dump version 10.5 +-- Dumped from database version 10.6 +-- Dumped by pg_dump version 10.6 SET statement_timeout = 0; SET lock_timeout = 0; @@ -2643,6 +2643,40 @@ CREATE SEQUENCE public.nodes_id_seq ALTER SEQUENCE public.nodes_id_seq OWNED BY public.eth_nodes.id; +-- +-- Name: queued_storage; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.queued_storage ( + id integer NOT NULL, + block_height bigint, + block_hash bytea, + contract bytea, + storage_key bytea, + storage_value bytea +); + + +-- +-- Name: queued_storage_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.queued_storage_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: queued_storage_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.queued_storage_id_seq OWNED BY public.queued_storage.id; + + -- -- Name: receipts; Type: TABLE; Schema: public; Owner: - -- @@ -3321,6 +3355,13 @@ ALTER TABLE ONLY public.log_filters ALTER COLUMN id SET DEFAULT nextval('public. ALTER TABLE ONLY public.logs ALTER COLUMN id SET DEFAULT nextval('public.logs_id_seq'::regclass); +-- +-- Name: queued_storage id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.queued_storage ALTER COLUMN id SET DEFAULT nextval('public.queued_storage_id_seq'::regclass); + + -- -- Name: receipts id; Type: DEFAULT; Schema: public; Owner: - -- @@ -4197,6 +4238,14 @@ ALTER TABLE ONLY public.eth_nodes ADD CONSTRAINT nodes_pkey PRIMARY KEY (id); +-- +-- Name: queued_storage queued_storage_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.queued_storage + ADD CONSTRAINT queued_storage_pkey PRIMARY KEY (id); + + -- -- Name: receipts receipts_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- diff --git a/libraries/shared/storage_queue.go b/libraries/shared/storage_queue.go new file mode 100644 index 00000000..485c1a76 --- /dev/null +++ b/libraries/shared/storage_queue.go @@ -0,0 +1,26 @@ +package shared + +import ( + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared" +) + +type IStorageQueue interface { + Add(row shared.StorageDiffRow) error +} + +type StorageQueue struct { + db *postgres.DB +} + +func NewStorageQueue(db *postgres.DB) StorageQueue { + return StorageQueue{db: db} +} + +func (queue StorageQueue) Add(row shared.StorageDiffRow) error { + _, err := queue.db.Exec(`INSERT INTO public.queued_storage (contract, + block_hash, block_height, storage_key, storage_value) VALUES + ($1, $2, $3, $4, $5)`, row.Contract.Bytes(), row.BlockHash.Bytes(), + row.BlockHeight, row.StorageKey.Bytes(), row.StorageValue.Bytes()) + return err +} diff --git a/libraries/shared/storage_queue_test.go b/libraries/shared/storage_queue_test.go new file mode 100644 index 00000000..c1d7349f --- /dev/null +++ b/libraries/shared/storage_queue_test.go @@ -0,0 +1,32 @@ +package shared_test + +import ( + "github.com/ethereum/go-ethereum/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + shared2 "github.com/vulcanize/vulcanizedb/libraries/shared" + "github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Storage queue", func() { + It("adds a storage row to the db", func() { + row := shared.StorageDiffRow{ + Contract: common.HexToAddress("0x123456"), + BlockHash: common.HexToHash("0x678901"), + BlockHeight: 987, + StorageKey: common.HexToHash("0x654321"), + StorageValue: common.HexToHash("0x198765"), + } + db := test_config.NewTestDB(test_config.NewTestNode()) + queue := shared2.NewStorageQueue(db) + + addErr := queue.Add(row) + + Expect(addErr).NotTo(HaveOccurred()) + var result shared.StorageDiffRow + getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`) + Expect(getErr).NotTo(HaveOccurred()) + Expect(result).To(Equal(row)) + }) +}) diff --git a/libraries/shared/storage_watcher.go b/libraries/shared/storage_watcher.go index 50e042b2..49dea19e 100644 --- a/libraries/shared/storage_watcher.go +++ b/libraries/shared/storage_watcher.go @@ -22,6 +22,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/storage" "github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared" + "reflect" "strings" "github.com/vulcanize/vulcanizedb/pkg/fs" @@ -30,14 +31,17 @@ import ( type StorageWatcher struct { db *postgres.DB tailer fs.Tailer + Queue IStorageQueue Transformers map[common.Address]storage.Transformer } func NewStorageWatcher(tailer fs.Tailer, db *postgres.DB) StorageWatcher { transformers := make(map[common.Address]storage.Transformer) + queue := NewStorageQueue(db) return StorageWatcher{ db: db, tailer: tailer, + Queue: queue, Transformers: transformers, } } @@ -66,9 +70,20 @@ func (watcher StorageWatcher) Execute() error { } executeErr := transformer.Execute(row) if executeErr != nil { - logrus.Warn(executeErr.Error()) + if isKeyNotFound(executeErr) { + queueErr := watcher.Queue.Add(row) + if queueErr != nil { + logrus.Warn(queueErr.Error()) + } + } else { + logrus.Warn(executeErr.Error()) + } continue } } return nil } + +func isKeyNotFound(executeErr error) bool { + return reflect.TypeOf(executeErr) == reflect.TypeOf(shared.ErrStorageKeyNotFound{}) +} diff --git a/libraries/shared/storage_watcher_test.go b/libraries/shared/storage_watcher_test.go index 3f7b13d0..e0c67740 100644 --- a/libraries/shared/storage_watcher_test.go +++ b/libraries/shared/storage_watcher_test.go @@ -73,11 +73,8 @@ var _ = Describe("Storage Watcher", func() { It("logs error if no transformer can parse storage row", func() { mockTailer := fakes.NewMockTailer() - line := &tail.Line{ - Text: "12345,block_hash,123,storage_key,storage_value", - Time: time.Time{}, - Err: nil, - } + address := common.HexToAddress("0x12345") + line := getFakeLine(address.Bytes()) watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) tempFile, err := ioutil.TempFile("", "log") defer os.Remove(tempFile.Name()) @@ -88,22 +85,14 @@ var _ = Describe("Storage Watcher", func() { Expect(err).NotTo(HaveOccurred()) logContent, readErr := ioutil.ReadFile(tempFile.Name()) Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(shared2.ErrContractNotFound{Contract: common.HexToAddress("0x12345").Hex()}.Error())) + Expect(string(logContent)).To(ContainSubstring(shared2.ErrContractNotFound{Contract: address.Hex()}.Error())) }, watcher, mockTailer, []*tail.Line{line}) }) It("executes transformer with storage row", func() { address := []byte{1, 2, 3} - blockHash := []byte{4, 5, 6} - blockHeight := int64(789) - storageKey := []byte{9, 8, 7} - storageValue := []byte{6, 5, 4} + line := getFakeLine(address) mockTailer := fakes.NewMockTailer() - line := &tail.Line{ - Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)), - Time: time.Time{}, - Err: nil, - } watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address)} watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) @@ -116,40 +105,76 @@ var _ = Describe("Storage Watcher", func() { }, watcher, mockTailer, []*tail.Line{line}) }) - It("logs error if executing transformer fails", func() { - address := []byte{1, 2, 3} - blockHash := []byte{4, 5, 6} - blockHeight := int64(789) - storageKey := []byte{9, 8, 7} - storageValue := []byte{6, 5, 4} - mockTailer := fakes.NewMockTailer() - line := &tail.Line{ - Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)), - Time: time.Time{}, - Err: nil, - } - watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) - executionError := errors.New("storage watcher failed attempting to execute transformer") - fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError} - watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - tempFile, err := ioutil.TempFile("", "log") - defer os.Remove(tempFile.Name()) - Expect(err).NotTo(HaveOccurred()) - logrus.SetOutput(tempFile) + Describe("when executing transformer fails", func() { + It("queues row when error is storage key not found", func() { + address := []byte{1, 2, 3} + line := getFakeLine(address) + mockTailer := fakes.NewMockTailer() + watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) + mockQueue := &mocks.MockStorageQueue{} + watcher.Queue = mockQueue + keyNotFoundError := shared2.ErrStorageKeyNotFound{Key: "unknown_storage_key"} + fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError} + watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - assert(func(err error) { + assert(func(err error) { + Expect(err).NotTo(HaveOccurred()) + Expect(mockQueue.AddCalled).To(BeTrue()) + }, watcher, mockTailer, []*tail.Line{line}) + }) + + It("logs error if queuing row fails", func() { + address := []byte{1, 2, 3} + line := getFakeLine(address) + mockTailer := fakes.NewMockTailer() + watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) + mockQueue := &mocks.MockStorageQueue{} + mockQueue.AddError = fakes.FakeError + watcher.Queue = mockQueue + keyNotFoundError := shared2.ErrStorageKeyNotFound{Key: "unknown_storage_key"} + fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError} + watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) + tempFile, err := ioutil.TempFile("", "log") + defer os.Remove(tempFile.Name()) Expect(err).NotTo(HaveOccurred()) - logContent, readErr := ioutil.ReadFile(tempFile.Name()) - Expect(readErr).NotTo(HaveOccurred()) - Expect(string(logContent)).To(ContainSubstring(executionError.Error())) - }, watcher, mockTailer, []*tail.Line{line}) + logrus.SetOutput(tempFile) + + assert(func(err error) { + Expect(err).NotTo(HaveOccurred()) + Expect(mockQueue.AddCalled).To(BeTrue()) + logContent, readErr := ioutil.ReadFile(tempFile.Name()) + Expect(readErr).NotTo(HaveOccurred()) + Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error())) + }, watcher, mockTailer, []*tail.Line{line}) + }) + + It("logs any other error", func() { + address := []byte{1, 2, 3} + line := getFakeLine(address) + mockTailer := fakes.NewMockTailer() + watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) + executionError := errors.New("storage watcher failed attempting to execute transformer") + fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError} + watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) + tempFile, err := ioutil.TempFile("", "log") + defer os.Remove(tempFile.Name()) + Expect(err).NotTo(HaveOccurred()) + logrus.SetOutput(tempFile) + + assert(func(err error) { + Expect(err).NotTo(HaveOccurred()) + logContent, readErr := ioutil.ReadFile(tempFile.Name()) + Expect(readErr).NotTo(HaveOccurred()) + Expect(string(logContent)).To(ContainSubstring(executionError.Error())) + }, watcher, mockTailer, []*tail.Line{line}) + }) }) }) func assert(assertion func(err error), watcher shared.StorageWatcher, mockTailer *fakes.MockTailer, lines []*tail.Line) { errs := make(chan error, 1) done := make(chan bool, 1) - go execute(watcher, mockTailer, errs, done) + go execute(watcher, errs, done) for _, line := range lines { mockTailer.Lines <- line } @@ -165,7 +190,7 @@ func assert(assertion func(err error), watcher shared.StorageWatcher, mockTailer } } -func execute(watcher shared.StorageWatcher, tailer *fakes.MockTailer, errs chan error, done chan bool) { +func execute(watcher shared.StorageWatcher, errs chan error, done chan bool) { err := watcher.Execute() if err != nil { errs <- err @@ -173,3 +198,15 @@ func execute(watcher shared.StorageWatcher, tailer *fakes.MockTailer, errs chan done <- true } } + +func getFakeLine(address []byte) *tail.Line { + blockHash := []byte{4, 5, 6} + blockHeight := int64(789) + storageKey := []byte{9, 8, 7} + storageValue := []byte{6, 5, 4} + return &tail.Line{ + Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)), + Time: time.Time{}, + Err: nil, + } +} diff --git a/pkg/transformers/storage_diffs/shared/decoder.go b/pkg/transformers/storage_diffs/shared/decoder.go index 7591f8c0..2fcc39ef 100644 --- a/pkg/transformers/storage_diffs/shared/decoder.go +++ b/pkg/transformers/storage_diffs/shared/decoder.go @@ -17,6 +17,7 @@ package shared import ( + "fmt" "github.com/ethereum/go-ethereum/common" "math/big" ) @@ -30,7 +31,7 @@ func Decode(row StorageDiffRow, metadata StorageValueMetadata) (interface{}, err case Bytes32: return row.StorageValue.Hex(), nil default: - return nil, ErrTypeNotFound{} + panic(fmt.Sprintf("can't decode unknown type: %d", metadata.Type)) } } diff --git a/pkg/transformers/storage_diffs/shared/decoder_test.go b/pkg/transformers/storage_diffs/shared/decoder_test.go index 710e1e16..9e1e3a6c 100644 --- a/pkg/transformers/storage_diffs/shared/decoder_test.go +++ b/pkg/transformers/storage_diffs/shared/decoder_test.go @@ -46,13 +46,4 @@ var _ = Describe("Storage decoder", func() { Expect(err).NotTo(HaveOccurred()) Expect(result).To(Equal(fakeAddress.Hex())) }) - - It("returns error if attempting to decode unknown type", func() { - metadata := shared.StorageValueMetadata{Type: 100} - - _, err := shared.Decode(shared.StorageDiffRow{}, metadata) - - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(shared.ErrTypeNotFound{})) - }) }) diff --git a/pkg/transformers/storage_diffs/shared/errors.go b/pkg/transformers/storage_diffs/shared/errors.go index 29118a63..b33fc3ee 100644 --- a/pkg/transformers/storage_diffs/shared/errors.go +++ b/pkg/transformers/storage_diffs/shared/errors.go @@ -28,16 +28,6 @@ func (e ErrContractNotFound) Error() string { return fmt.Sprintf("transformer not found for contract: %s", e.Contract) } -type ErrHeaderMismatch struct { - BlockHeight int - DbHash string - DiffHash string -} - -func (e ErrHeaderMismatch) Error() string { - return fmt.Sprintf("header hash in row does not match db at height %d - row: %s, db: %s", e.BlockHeight, e.DbHash, e.DiffHash) -} - type ErrMetadataMalformed struct { MissingData Key } @@ -61,11 +51,3 @@ type ErrStorageKeyNotFound struct { func (e ErrStorageKeyNotFound) Error() string { return fmt.Sprintf("unknown storage key: %s", e.Key) } - -type ErrTypeNotFound struct { - Type int -} - -func (e ErrTypeNotFound) Error() string { - return fmt.Sprintf("no decoder for type: %d", e.Type) -} diff --git a/pkg/transformers/storage_diffs/shared/row.go b/pkg/transformers/storage_diffs/shared/row.go index 3de11d41..3f981def 100644 --- a/pkg/transformers/storage_diffs/shared/row.go +++ b/pkg/transformers/storage_diffs/shared/row.go @@ -25,10 +25,10 @@ const ExpectedRowLength = 5 type StorageDiffRow struct { Contract common.Address - BlockHash common.Hash - BlockHeight int - StorageKey common.Hash - StorageValue common.Hash + BlockHash common.Hash `db:"block_hash"` + BlockHeight int `db:"block_height"` + StorageKey common.Hash `db:"storage_key"` + StorageValue common.Hash `db:"storage_value"` } func FromStrings(csvRow []string) (StorageDiffRow, error) { diff --git a/pkg/transformers/test_data/mocks/storage_queue.go b/pkg/transformers/test_data/mocks/storage_queue.go new file mode 100644 index 00000000..6a4cda74 --- /dev/null +++ b/pkg/transformers/test_data/mocks/storage_queue.go @@ -0,0 +1,15 @@ +package mocks + +import ( + "github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared" +) + +type MockStorageQueue struct { + AddCalled bool + AddError error +} + +func (queue *MockStorageQueue) Add(row shared.StorageDiffRow) error { + queue.AddCalled = true + return queue.AddError +}