Merge pull request #14 from vulcanize/vdb-354-queue-unrecognized-diffs

(VDB-354) queue unrecognized diffs
This commit is contained in:
Rob Mulholand 2019-02-20 16:00:28 -06:00 committed by GitHub
commit 33c271b584
11 changed files with 237 additions and 77 deletions

View File

@ -0,0 +1,12 @@
-- +goose Up
CREATE TABLE public.queued_storage (
id SERIAL PRIMARY KEY,
block_height BIGINT,
block_hash BYTEA,
contract BYTEA,
storage_key BYTEA,
storage_value BYTEA
);
-- +goose Down
DROP TABLE public.queued_storage;

View File

@ -2,8 +2,8 @@
-- PostgreSQL database dump -- PostgreSQL database dump
-- --
-- Dumped from database version 10.5 -- Dumped from database version 10.6
-- Dumped by pg_dump version 10.5 -- Dumped by pg_dump version 10.6
SET statement_timeout = 0; SET statement_timeout = 0;
SET lock_timeout = 0; SET lock_timeout = 0;
@ -2643,6 +2643,40 @@ CREATE SEQUENCE public.nodes_id_seq
ALTER SEQUENCE public.nodes_id_seq OWNED BY public.eth_nodes.id; ALTER SEQUENCE public.nodes_id_seq OWNED BY public.eth_nodes.id;
--
-- Name: queued_storage; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.queued_storage (
id integer NOT NULL,
block_height bigint,
block_hash bytea,
contract bytea,
storage_key bytea,
storage_value bytea
);
--
-- Name: queued_storage_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.queued_storage_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: queued_storage_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.queued_storage_id_seq OWNED BY public.queued_storage.id;
-- --
-- Name: receipts; Type: TABLE; Schema: public; Owner: - -- Name: receipts; Type: TABLE; Schema: public; Owner: -
-- --
@ -3321,6 +3355,13 @@ ALTER TABLE ONLY public.log_filters ALTER COLUMN id SET DEFAULT nextval('public.
ALTER TABLE ONLY public.logs ALTER COLUMN id SET DEFAULT nextval('public.logs_id_seq'::regclass); ALTER TABLE ONLY public.logs ALTER COLUMN id SET DEFAULT nextval('public.logs_id_seq'::regclass);
--
-- Name: queued_storage id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.queued_storage ALTER COLUMN id SET DEFAULT nextval('public.queued_storage_id_seq'::regclass);
-- --
-- Name: receipts id; Type: DEFAULT; Schema: public; Owner: - -- Name: receipts id; Type: DEFAULT; Schema: public; Owner: -
-- --
@ -4197,6 +4238,14 @@ ALTER TABLE ONLY public.eth_nodes
ADD CONSTRAINT nodes_pkey PRIMARY KEY (id); ADD CONSTRAINT nodes_pkey PRIMARY KEY (id);
--
-- Name: queued_storage queued_storage_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.queued_storage
ADD CONSTRAINT queued_storage_pkey PRIMARY KEY (id);
-- --
-- Name: receipts receipts_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- Name: receipts receipts_pkey; Type: CONSTRAINT; Schema: public; Owner: -
-- --

View File

@ -0,0 +1,26 @@
package shared
import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared"
)
type IStorageQueue interface {
Add(row shared.StorageDiffRow) error
}
type StorageQueue struct {
db *postgres.DB
}
func NewStorageQueue(db *postgres.DB) StorageQueue {
return StorageQueue{db: db}
}
func (queue StorageQueue) Add(row shared.StorageDiffRow) error {
_, err := queue.db.Exec(`INSERT INTO public.queued_storage (contract,
block_hash, block_height, storage_key, storage_value) VALUES
($1, $2, $3, $4, $5)`, row.Contract.Bytes(), row.BlockHash.Bytes(),
row.BlockHeight, row.StorageKey.Bytes(), row.StorageValue.Bytes())
return err
}

View File

@ -0,0 +1,32 @@
package shared_test
import (
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
shared2 "github.com/vulcanize/vulcanizedb/libraries/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("Storage queue", func() {
It("adds a storage row to the db", func() {
row := shared.StorageDiffRow{
Contract: common.HexToAddress("0x123456"),
BlockHash: common.HexToHash("0x678901"),
BlockHeight: 987,
StorageKey: common.HexToHash("0x654321"),
StorageValue: common.HexToHash("0x198765"),
}
db := test_config.NewTestDB(test_config.NewTestNode())
queue := shared2.NewStorageQueue(db)
addErr := queue.Add(row)
Expect(addErr).NotTo(HaveOccurred())
var result shared.StorageDiffRow
getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`)
Expect(getErr).NotTo(HaveOccurred())
Expect(result).To(Equal(row))
})
})

View File

@ -22,6 +22,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/storage" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/storage"
"github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared"
"reflect"
"strings" "strings"
"github.com/vulcanize/vulcanizedb/pkg/fs" "github.com/vulcanize/vulcanizedb/pkg/fs"
@ -30,14 +31,17 @@ import (
type StorageWatcher struct { type StorageWatcher struct {
db *postgres.DB db *postgres.DB
tailer fs.Tailer tailer fs.Tailer
Queue IStorageQueue
Transformers map[common.Address]storage.Transformer Transformers map[common.Address]storage.Transformer
} }
func NewStorageWatcher(tailer fs.Tailer, db *postgres.DB) StorageWatcher { func NewStorageWatcher(tailer fs.Tailer, db *postgres.DB) StorageWatcher {
transformers := make(map[common.Address]storage.Transformer) transformers := make(map[common.Address]storage.Transformer)
queue := NewStorageQueue(db)
return StorageWatcher{ return StorageWatcher{
db: db, db: db,
tailer: tailer, tailer: tailer,
Queue: queue,
Transformers: transformers, Transformers: transformers,
} }
} }
@ -66,9 +70,20 @@ func (watcher StorageWatcher) Execute() error {
} }
executeErr := transformer.Execute(row) executeErr := transformer.Execute(row)
if executeErr != nil { if executeErr != nil {
logrus.Warn(executeErr.Error()) if isKeyNotFound(executeErr) {
queueErr := watcher.Queue.Add(row)
if queueErr != nil {
logrus.Warn(queueErr.Error())
}
} else {
logrus.Warn(executeErr.Error())
}
continue continue
} }
} }
return nil return nil
} }
func isKeyNotFound(executeErr error) bool {
return reflect.TypeOf(executeErr) == reflect.TypeOf(shared.ErrStorageKeyNotFound{})
}

View File

@ -73,11 +73,8 @@ var _ = Describe("Storage Watcher", func() {
It("logs error if no transformer can parse storage row", func() { It("logs error if no transformer can parse storage row", func() {
mockTailer := fakes.NewMockTailer() mockTailer := fakes.NewMockTailer()
line := &tail.Line{ address := common.HexToAddress("0x12345")
Text: "12345,block_hash,123,storage_key,storage_value", line := getFakeLine(address.Bytes())
Time: time.Time{},
Err: nil,
}
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
tempFile, err := ioutil.TempFile("", "log") tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
@ -88,22 +85,14 @@ var _ = Describe("Storage Watcher", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name()) logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred()) Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(shared2.ErrContractNotFound{Contract: common.HexToAddress("0x12345").Hex()}.Error())) Expect(string(logContent)).To(ContainSubstring(shared2.ErrContractNotFound{Contract: address.Hex()}.Error()))
}, watcher, mockTailer, []*tail.Line{line}) }, watcher, mockTailer, []*tail.Line{line})
}) })
It("executes transformer with storage row", func() { It("executes transformer with storage row", func() {
address := []byte{1, 2, 3} address := []byte{1, 2, 3}
blockHash := []byte{4, 5, 6} line := getFakeLine(address)
blockHeight := int64(789)
storageKey := []byte{9, 8, 7}
storageValue := []byte{6, 5, 4}
mockTailer := fakes.NewMockTailer() mockTailer := fakes.NewMockTailer()
line := &tail.Line{
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
Time: time.Time{},
Err: nil,
}
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{})) watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address)} fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address)}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer}) watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
@ -116,40 +105,76 @@ var _ = Describe("Storage Watcher", func() {
}, watcher, mockTailer, []*tail.Line{line}) }, watcher, mockTailer, []*tail.Line{line})
}) })
It("logs error if executing transformer fails", func() { Describe("when executing transformer fails", func() {
address := []byte{1, 2, 3} It("queues row when error is storage key not found", func() {
blockHash := []byte{4, 5, 6} address := []byte{1, 2, 3}
blockHeight := int64(789) line := getFakeLine(address)
storageKey := []byte{9, 8, 7} mockTailer := fakes.NewMockTailer()
storageValue := []byte{6, 5, 4} watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
mockTailer := fakes.NewMockTailer() mockQueue := &mocks.MockStorageQueue{}
line := &tail.Line{ watcher.Queue = mockQueue
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)), keyNotFoundError := shared2.ErrStorageKeyNotFound{Key: "unknown_storage_key"}
Time: time.Time{}, fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError}
Err: nil, watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
}
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
executionError := errors.New("storage watcher failed attempting to execute transformer")
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expect(err).NotTo(HaveOccurred())
logrus.SetOutput(tempFile)
assert(func(err error) { assert(func(err error) {
Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
}, watcher, mockTailer, []*tail.Line{line})
})
It("logs error if queuing row fails", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
mockQueue := &mocks.MockStorageQueue{}
mockQueue.AddError = fakes.FakeError
watcher.Queue = mockQueue
keyNotFoundError := shared2.ErrStorageKeyNotFound{Key: "unknown_storage_key"}
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name()) logrus.SetOutput(tempFile)
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(executionError.Error())) assert(func(err error) {
}, watcher, mockTailer, []*tail.Line{line}) Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error()))
}, watcher, mockTailer, []*tail.Line{line})
})
It("logs any other error", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
executionError := errors.New("storage watcher failed attempting to execute transformer")
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expect(err).NotTo(HaveOccurred())
logrus.SetOutput(tempFile)
assert(func(err error) {
Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(executionError.Error()))
}, watcher, mockTailer, []*tail.Line{line})
})
}) })
}) })
func assert(assertion func(err error), watcher shared.StorageWatcher, mockTailer *fakes.MockTailer, lines []*tail.Line) { func assert(assertion func(err error), watcher shared.StorageWatcher, mockTailer *fakes.MockTailer, lines []*tail.Line) {
errs := make(chan error, 1) errs := make(chan error, 1)
done := make(chan bool, 1) done := make(chan bool, 1)
go execute(watcher, mockTailer, errs, done) go execute(watcher, errs, done)
for _, line := range lines { for _, line := range lines {
mockTailer.Lines <- line mockTailer.Lines <- line
} }
@ -165,7 +190,7 @@ func assert(assertion func(err error), watcher shared.StorageWatcher, mockTailer
} }
} }
func execute(watcher shared.StorageWatcher, tailer *fakes.MockTailer, errs chan error, done chan bool) { func execute(watcher shared.StorageWatcher, errs chan error, done chan bool) {
err := watcher.Execute() err := watcher.Execute()
if err != nil { if err != nil {
errs <- err errs <- err
@ -173,3 +198,15 @@ func execute(watcher shared.StorageWatcher, tailer *fakes.MockTailer, errs chan
done <- true done <- true
} }
} }
func getFakeLine(address []byte) *tail.Line {
blockHash := []byte{4, 5, 6}
blockHeight := int64(789)
storageKey := []byte{9, 8, 7}
storageValue := []byte{6, 5, 4}
return &tail.Line{
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
Time: time.Time{},
Err: nil,
}
}

View File

@ -17,6 +17,7 @@
package shared package shared
import ( import (
"fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"math/big" "math/big"
) )
@ -30,7 +31,7 @@ func Decode(row StorageDiffRow, metadata StorageValueMetadata) (interface{}, err
case Bytes32: case Bytes32:
return row.StorageValue.Hex(), nil return row.StorageValue.Hex(), nil
default: default:
return nil, ErrTypeNotFound{} panic(fmt.Sprintf("can't decode unknown type: %d", metadata.Type))
} }
} }

View File

@ -46,13 +46,4 @@ var _ = Describe("Storage decoder", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(result).To(Equal(fakeAddress.Hex())) Expect(result).To(Equal(fakeAddress.Hex()))
}) })
It("returns error if attempting to decode unknown type", func() {
metadata := shared.StorageValueMetadata{Type: 100}
_, err := shared.Decode(shared.StorageDiffRow{}, metadata)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(shared.ErrTypeNotFound{}))
})
}) })

View File

@ -28,16 +28,6 @@ func (e ErrContractNotFound) Error() string {
return fmt.Sprintf("transformer not found for contract: %s", e.Contract) return fmt.Sprintf("transformer not found for contract: %s", e.Contract)
} }
type ErrHeaderMismatch struct {
BlockHeight int
DbHash string
DiffHash string
}
func (e ErrHeaderMismatch) Error() string {
return fmt.Sprintf("header hash in row does not match db at height %d - row: %s, db: %s", e.BlockHeight, e.DbHash, e.DiffHash)
}
type ErrMetadataMalformed struct { type ErrMetadataMalformed struct {
MissingData Key MissingData Key
} }
@ -61,11 +51,3 @@ type ErrStorageKeyNotFound struct {
func (e ErrStorageKeyNotFound) Error() string { func (e ErrStorageKeyNotFound) Error() string {
return fmt.Sprintf("unknown storage key: %s", e.Key) return fmt.Sprintf("unknown storage key: %s", e.Key)
} }
type ErrTypeNotFound struct {
Type int
}
func (e ErrTypeNotFound) Error() string {
return fmt.Sprintf("no decoder for type: %d", e.Type)
}

View File

@ -25,10 +25,10 @@ const ExpectedRowLength = 5
type StorageDiffRow struct { type StorageDiffRow struct {
Contract common.Address Contract common.Address
BlockHash common.Hash BlockHash common.Hash `db:"block_hash"`
BlockHeight int BlockHeight int `db:"block_height"`
StorageKey common.Hash StorageKey common.Hash `db:"storage_key"`
StorageValue common.Hash StorageValue common.Hash `db:"storage_value"`
} }
func FromStrings(csvRow []string) (StorageDiffRow, error) { func FromStrings(csvRow []string) (StorageDiffRow, error) {

View File

@ -0,0 +1,15 @@
package mocks
import (
"github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared"
)
type MockStorageQueue struct {
AddCalled bool
AddError error
}
func (queue *MockStorageQueue) Add(row shared.StorageDiffRow) error {
queue.AddCalled = true
return queue.AddError
}