From cb819fa9a60670f84be23ac1b60e96dcd3cb951c Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Tue, 23 Jul 2019 14:57:52 -0500 Subject: [PATCH] Write event logs to database before transforming - enables decoupling event extraction/persistence from transformation - modifies event transformer, converter, and log chunker to accept payload that includes internal log database ID with log data - remove alias for transformer pkg as shared_t - remove unused mock watcher repository --- .../00029_create_header_sync_logs_table.sql | 21 +++ db/schema.sql | 70 ++++++++ libraries/shared/chunker/log_chunker.go | 23 ++- libraries/shared/chunker/log_chunker_test.go | 83 +++++----- libraries/shared/factories/event/converter.go | 4 +- .../shared/factories/event/transformer.go | 4 +- .../factories/event/transformer_test.go | 16 +- libraries/shared/mocks/converter.go | 10 +- libraries/shared/mocks/transformer.go | 19 ++- libraries/shared/mocks/watcher_repository.go | 69 -------- libraries/shared/repository/repository.go | 64 ++++++++ .../shared/repository/repository_test.go | 152 ++++++++++++++++++ libraries/shared/test_data/generic.go | 35 ++-- .../shared/transformer/event_transformer.go | 4 +- libraries/shared/watcher/event_watcher.go | 10 +- .../shared/watcher/event_watcher_test.go | 44 +++-- pkg/core/log.go | 9 ++ test_config/test_config.go | 1 + 18 files changed, 468 insertions(+), 170 deletions(-) create mode 100644 db/migrations/00029_create_header_sync_logs_table.sql delete mode 100644 libraries/shared/mocks/watcher_repository.go diff --git a/db/migrations/00029_create_header_sync_logs_table.sql b/db/migrations/00029_create_header_sync_logs_table.sql new file mode 100644 index 00000000..1d4cd4dd --- /dev/null +++ b/db/migrations/00029_create_header_sync_logs_table.sql @@ -0,0 +1,21 @@ +-- +goose Up +-- SQL in this section is executed when the migration is applied. +CREATE TABLE header_sync_logs +( + id SERIAL PRIMARY KEY, + header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, + address VARCHAR(66), + topics BYTEA[], + data BYTEA, + block_number BIGINT, + block_hash VARCHAR(66), + tx_hash VARCHAR(66), + tx_index INTEGER, + log_index INTEGER, + raw JSONB, + UNIQUE (header_id, tx_index, log_index) +); + +-- +goose Down +-- SQL in this section is executed when the migration is rolled back. +DROP TABLE header_sync_logs; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index d58aa9a2..a8f04eb3 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -296,6 +296,45 @@ CREATE SEQUENCE public.goose_db_version_id_seq ALTER SEQUENCE public.goose_db_version_id_seq OWNED BY public.goose_db_version.id; +-- +-- Name: header_sync_logs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.header_sync_logs ( + id integer NOT NULL, + header_id integer NOT NULL, + address character varying(66), + topics bytea[], + data bytea, + block_number bigint, + block_hash character varying(66), + tx_hash character varying(66), + tx_index integer, + log_index integer, + raw jsonb +); + + +-- +-- Name: header_sync_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.header_sync_logs_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: header_sync_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.header_sync_logs_id_seq OWNED BY public.header_sync_logs.id; + + -- -- Name: header_sync_receipts; Type: TABLE; Schema: public; Owner: - -- @@ -650,6 +689,13 @@ ALTER TABLE ONLY public.full_sync_transactions ALTER COLUMN id SET DEFAULT nextv ALTER TABLE ONLY public.goose_db_version ALTER COLUMN id SET DEFAULT nextval('public.goose_db_version_id_seq'::regclass); +-- +-- Name: header_sync_logs id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.header_sync_logs ALTER COLUMN id SET DEFAULT nextval('public.header_sync_logs_id_seq'::regclass); + + -- -- Name: header_sync_receipts id; Type: DEFAULT; Schema: public; Owner: - -- @@ -787,6 +833,22 @@ ALTER TABLE ONLY public.goose_db_version ADD CONSTRAINT goose_db_version_pkey PRIMARY KEY (id); +-- +-- Name: header_sync_logs header_sync_logs_header_id_tx_index_log_index_key; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.header_sync_logs + ADD CONSTRAINT header_sync_logs_header_id_tx_index_log_index_key UNIQUE (header_id, tx_index, log_index); + + +-- +-- Name: header_sync_logs header_sync_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.header_sync_logs + ADD CONSTRAINT header_sync_logs_pkey PRIMARY KEY (id); + + -- -- Name: header_sync_receipts header_sync_receipts_header_id_transaction_id_key; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -965,6 +1027,14 @@ ALTER TABLE ONLY public.full_sync_transactions ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE; +-- +-- Name: header_sync_logs header_sync_logs_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.header_sync_logs + ADD CONSTRAINT header_sync_logs_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; + + -- -- Name: header_sync_receipts header_sync_receipts_contract_address_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/libraries/shared/chunker/log_chunker.go b/libraries/shared/chunker/log_chunker.go index 87861ba4..c3a780ba 100644 --- a/libraries/shared/chunker/log_chunker.go +++ b/libraries/shared/chunker/log_chunker.go @@ -20,14 +20,13 @@ import ( "strings" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - - shared_t "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/pkg/core" ) type Chunker interface { - AddConfigs(transformerConfigs []shared_t.EventTransformerConfig) - ChunkLogs(logs []types.Log) map[string][]types.Log + AddConfigs(transformerConfigs []transformer.EventTransformerConfig) + ChunkLogs(logs []core.HeaderSyncLog) map[string][]core.HeaderSyncLog } type LogChunker struct { @@ -45,7 +44,7 @@ func NewLogChunker() *LogChunker { } // Configures the chunker by adding more addreses and topics to consider. -func (chunker *LogChunker) AddConfigs(transformerConfigs []shared_t.EventTransformerConfig) { +func (chunker *LogChunker) AddConfigs(transformerConfigs []transformer.EventTransformerConfig) { for _, config := range transformerConfigs { for _, address := range config.ContractAddresses { var lowerCaseAddress = strings.ToLower(address) @@ -56,15 +55,15 @@ func (chunker *LogChunker) AddConfigs(transformerConfigs []shared_t.EventTransfo } // Goes through an array of logs, associating relevant logs (matching addresses and topic) with transformers -func (chunker *LogChunker) ChunkLogs(logs []types.Log) map[string][]types.Log { - chunks := map[string][]types.Log{} +func (chunker *LogChunker) ChunkLogs(logs []core.HeaderSyncLog) map[string][]core.HeaderSyncLog { + chunks := map[string][]core.HeaderSyncLog{} for _, log := range logs { // Topic0 is not unique to each transformer, also need to consider the contract address - relevantTransformers := chunker.AddressToNames[strings.ToLower(log.Address.String())] + relevantTransformers := chunker.AddressToNames[strings.ToLower(log.Log.Address.Hex())] - for _, transformer := range relevantTransformers { - if chunker.NameToTopic0[transformer] == log.Topics[0] { - chunks[transformer] = append(chunks[transformer], log) + for _, t := range relevantTransformers { + if chunker.NameToTopic0[t] == log.Log.Topics[0] { + chunks[t] = append(chunks[t], log) } } } diff --git a/libraries/shared/chunker/log_chunker_test.go b/libraries/shared/chunker/log_chunker_test.go index daf23a50..629c04b3 100644 --- a/libraries/shared/chunker/log_chunker_test.go +++ b/libraries/shared/chunker/log_chunker_test.go @@ -23,34 +23,35 @@ import ( . "github.com/onsi/gomega" chunk "github.com/vulcanize/vulcanizedb/libraries/shared/chunker" - shared_t "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/pkg/core" ) var _ = Describe("Log chunker", func() { var ( - configs []shared_t.EventTransformerConfig + configs []transformer.EventTransformerConfig chunker *chunk.LogChunker ) BeforeEach(func() { - configA := shared_t.EventTransformerConfig{ + configA := transformer.EventTransformerConfig{ TransformerName: "TransformerA", ContractAddresses: []string{"0x00000000000000000000000000000000000000A1", "0x00000000000000000000000000000000000000A2"}, Topic: "0xA", } - configB := shared_t.EventTransformerConfig{ + configB := transformer.EventTransformerConfig{ TransformerName: "TransformerB", ContractAddresses: []string{"0x00000000000000000000000000000000000000B1"}, Topic: "0xB", } - configC := shared_t.EventTransformerConfig{ + configC := transformer.EventTransformerConfig{ TransformerName: "TransformerC", ContractAddresses: []string{"0x00000000000000000000000000000000000000A2"}, Topic: "0xC", } - configs = []shared_t.EventTransformerConfig{configA, configB, configC} + configs = []transformer.EventTransformerConfig{configA, configB, configC} chunker = chunk.NewLogChunker() chunker.AddConfigs(configs) }) @@ -73,24 +74,24 @@ var _ = Describe("Log chunker", func() { Describe("AddConfigs", func() { It("can add more configs later", func() { - configD := shared_t.EventTransformerConfig{ + configD := transformer.EventTransformerConfig{ TransformerName: "TransformerD", ContractAddresses: []string{"0x000000000000000000000000000000000000000D"}, Topic: "0xD", } - chunker.AddConfigs([]shared_t.EventTransformerConfig{configD}) + chunker.AddConfigs([]transformer.EventTransformerConfig{configD}) Expect(chunker.AddressToNames).To(ContainElement([]string{"TransformerD"})) Expect(chunker.NameToTopic0).To(ContainElement(common.HexToHash("0xD"))) }) It("lower cases address", func() { - configD := shared_t.EventTransformerConfig{ + configD := transformer.EventTransformerConfig{ TransformerName: "TransformerD", ContractAddresses: []string{"0x000000000000000000000000000000000000000D"}, Topic: "0xD", } - chunker.AddConfigs([]shared_t.EventTransformerConfig{configD}) + chunker.AddConfigs([]transformer.EventTransformerConfig{configD}) Expect(chunker.AddressToNames["0x000000000000000000000000000000000000000d"]).To(Equal([]string{"TransformerD"})) }) @@ -98,7 +99,7 @@ var _ = Describe("Log chunker", func() { Describe("ChunkLogs", func() { It("only associates logs with relevant topic0 and address to transformers", func() { - logs := []types.Log{log1, log2, log3, log4, log5} + logs := []core.HeaderSyncLog{log1, log2, log3, log4, log5} chunks := chunker.ChunkLogs(logs) Expect(chunks["TransformerA"]).To(And(ContainElement(log1), ContainElement(log4))) @@ -110,43 +111,53 @@ var _ = Describe("Log chunker", func() { var ( // Match TransformerA - log1 = types.Log{ - Address: common.HexToAddress("0xA1"), - Topics: []common.Hash{ - common.HexToHash("0xA"), - common.HexToHash("0xLogTopic1"), + log1 = core.HeaderSyncLog{ + Log: types.Log{ + Address: common.HexToAddress("0xA1"), + Topics: []common.Hash{ + common.HexToHash("0xA"), + common.HexToHash("0xLogTopic1"), + }, }, } // Match TransformerA address, but not topic0 - log2 = types.Log{ - Address: common.HexToAddress("0xA1"), - Topics: []common.Hash{ - common.HexToHash("0xB"), - common.HexToHash("0xLogTopic2"), + log2 = core.HeaderSyncLog{ + Log: types.Log{ + Address: common.HexToAddress("0xA1"), + Topics: []common.Hash{ + common.HexToHash("0xB"), + common.HexToHash("0xLogTopic2"), + }, }, } // Match TransformerA topic, but TransformerB address - log3 = types.Log{ - Address: common.HexToAddress("0xB1"), - Topics: []common.Hash{ - common.HexToHash("0xA"), - common.HexToHash("0xLogTopic3"), + log3 = core.HeaderSyncLog{ + Log: types.Log{ + Address: common.HexToAddress("0xB1"), + Topics: []common.Hash{ + common.HexToHash("0xA"), + common.HexToHash("0xLogTopic3"), + }, }, } // Match TransformerA, with the other address - log4 = types.Log{ - Address: common.HexToAddress("0xA2"), - Topics: []common.Hash{ - common.HexToHash("0xA"), - common.HexToHash("0xLogTopic4"), + log4 = core.HeaderSyncLog{ + Log: types.Log{ + Address: common.HexToAddress("0xA2"), + Topics: []common.Hash{ + common.HexToHash("0xA"), + common.HexToHash("0xLogTopic4"), + }, }, } // Match TransformerC, which shares address with TransformerA - log5 = types.Log{ - Address: common.HexToAddress("0xA2"), - Topics: []common.Hash{ - common.HexToHash("0xC"), - common.HexToHash("0xLogTopic5"), + log5 = core.HeaderSyncLog{ + Log: types.Log{ + Address: common.HexToAddress("0xA2"), + Topics: []common.Hash{ + common.HexToHash("0xC"), + common.HexToHash("0xLogTopic5"), + }, }, } ) diff --git a/libraries/shared/factories/event/converter.go b/libraries/shared/factories/event/converter.go index 4fb647f2..525b628a 100644 --- a/libraries/shared/factories/event/converter.go +++ b/libraries/shared/factories/event/converter.go @@ -16,9 +16,9 @@ package event -import "github.com/ethereum/go-ethereum/core/types" +import "github.com/vulcanize/vulcanizedb/pkg/core" type Converter interface { - ToEntities(contractAbi string, ethLog []types.Log) ([]interface{}, error) + ToEntities(contractAbi string, ethLog []core.HeaderSyncLog) ([]interface{}, error) ToModels([]interface{}) ([]interface{}, error) } diff --git a/libraries/shared/factories/event/transformer.go b/libraries/shared/factories/event/transformer.go index dd2c675a..19c80eec 100644 --- a/libraries/shared/factories/event/transformer.go +++ b/libraries/shared/factories/event/transformer.go @@ -17,10 +17,10 @@ package event import ( - "github.com/ethereum/go-ethereum/core/types" "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) @@ -35,7 +35,7 @@ func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.Event return transformer } -func (transformer Transformer) Execute(logs []types.Log, headerID int64) error { +func (transformer Transformer) Execute(logs []core.HeaderSyncLog, headerID int64) error { transformerName := transformer.Config.TransformerName config := transformer.Config diff --git a/libraries/shared/factories/event/transformer_test.go b/libraries/shared/factories/event/transformer_test.go index 8a5fadab..c13b0e0b 100644 --- a/libraries/shared/factories/event/transformer_test.go +++ b/libraries/shared/factories/event/transformer_test.go @@ -19,7 +19,6 @@ package event_test import ( "math/rand" - "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -38,7 +37,7 @@ var _ = Describe("Transformer", func() { t transformer.EventTransformer headerOne core.Header config = test_data.GenericTestConfig - logs = test_data.GenericTestLogs + logs []core.HeaderSyncLog ) BeforeEach(func() { @@ -52,6 +51,13 @@ var _ = Describe("Transformer", func() { }.NewTransformer(nil) headerOne = core.Header{Id: rand.Int63(), BlockNumber: rand.Int63()} + + logs = []core.HeaderSyncLog{{ + ID: 0, + HeaderID: headerOne.Id, + Log: test_data.GenericTestLog(), + Transformed: false, + }} }) It("sets the db", func() { @@ -59,14 +65,14 @@ var _ = Describe("Transformer", func() { }) It("marks header checked if no logs returned", func() { - err := t.Execute([]types.Log{}, headerOne.Id) + err := t.Execute([]core.HeaderSyncLog{}, headerOne.Id) Expect(err).NotTo(HaveOccurred()) repository.AssertMarkHeaderCheckedCalledWith(headerOne.Id) }) It("doesn't attempt to convert or persist an empty collection when there are no logs", func() { - err := t.Execute([]types.Log{}, headerOne.Id) + err := t.Execute([]core.HeaderSyncLog{}, headerOne.Id) Expect(err).NotTo(HaveOccurred()) Expect(converter.ToEntitiesCalledCounter).To(Equal(0)) @@ -84,7 +90,7 @@ var _ = Describe("Transformer", func() { It("returns error if marking header checked returns err", func() { repository.SetMarkHeaderCheckedError(fakes.FakeError) - err := t.Execute([]types.Log{}, headerOne.Id) + err := t.Execute([]core.HeaderSyncLog{}, headerOne.Id) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) diff --git a/libraries/shared/mocks/converter.go b/libraries/shared/mocks/converter.go index 0a687718..c0f42c4e 100644 --- a/libraries/shared/mocks/converter.go +++ b/libraries/shared/mocks/converter.go @@ -16,9 +16,7 @@ package mocks -import ( - "github.com/ethereum/go-ethereum/core/types" -) +import "github.com/vulcanize/vulcanizedb/pkg/core" type MockConverter struct { ToEntitiesError error @@ -27,7 +25,7 @@ type MockConverter struct { entityConverterError error modelConverterError error ContractAbi string - LogsToConvert []types.Log + LogsToConvert []core.HeaderSyncLog EntitiesToConvert []interface{} EntitiesToReturn []interface{} ModelsToReturn []interface{} @@ -35,9 +33,9 @@ type MockConverter struct { ToModelsCalledCounter int } -func (converter *MockConverter) ToEntities(contractAbi string, ethLogs []types.Log) ([]interface{}, error) { +func (converter *MockConverter) ToEntities(contractAbi string, ethLogs []core.HeaderSyncLog) ([]interface{}, error) { for _, log := range ethLogs { - converter.PassedContractAddresses = append(converter.PassedContractAddresses, log.Address.Hex()) + converter.PassedContractAddresses = append(converter.PassedContractAddresses, log.Log.Address.Hex()) } converter.ContractAbi = contractAbi converter.LogsToConvert = ethLogs diff --git a/libraries/shared/mocks/transformer.go b/libraries/shared/mocks/transformer.go index e8bf65e4..034bbe69 100644 --- a/libraries/shared/mocks/transformer.go +++ b/libraries/shared/mocks/transformer.go @@ -17,21 +17,20 @@ package mocks import ( - "github.com/ethereum/go-ethereum/core/types" - - shared_t "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type MockTransformer struct { ExecuteWasCalled bool ExecuteError error - PassedLogs []types.Log + PassedLogs []core.HeaderSyncLog PassedHeaderID int64 - config shared_t.EventTransformerConfig + config transformer.EventTransformerConfig } -func (mh *MockTransformer) Execute(logs []types.Log, headerID int64) error { +func (mh *MockTransformer) Execute(logs []core.HeaderSyncLog, headerID int64) error { if mh.ExecuteError != nil { return mh.ExecuteError } @@ -41,19 +40,19 @@ func (mh *MockTransformer) Execute(logs []types.Log, headerID int64) error { return nil } -func (mh *MockTransformer) GetConfig() shared_t.EventTransformerConfig { +func (mh *MockTransformer) GetConfig() transformer.EventTransformerConfig { return mh.config } -func (mh *MockTransformer) SetTransformerConfig(config shared_t.EventTransformerConfig) { +func (mh *MockTransformer) SetTransformerConfig(config transformer.EventTransformerConfig) { mh.config = config } -func (mh *MockTransformer) FakeTransformerInitializer(db *postgres.DB) shared_t.EventTransformer { +func (mh *MockTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.EventTransformer { return mh } -var FakeTransformerConfig = shared_t.EventTransformerConfig{ +var FakeTransformerConfig = transformer.EventTransformerConfig{ TransformerName: "FakeTransformer", ContractAddresses: []string{"FakeAddress"}, Topic: "FakeTopic", diff --git a/libraries/shared/mocks/watcher_repository.go b/libraries/shared/mocks/watcher_repository.go deleted file mode 100644 index fb0da575..00000000 --- a/libraries/shared/mocks/watcher_repository.go +++ /dev/null @@ -1,69 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" -) - -type MockWatcherRepository struct { - ReturnCheckedColumnNames []string - GetCheckedColumnNamesError error - GetCheckedColumnNamesCalled bool - - ReturnNotCheckedSQL string - CreateNotCheckedSQLCalled bool - - ReturnMissingHeaders []core.Header - MissingHeadersError error - MissingHeadersCalled bool -} - -func (repository *MockWatcherRepository) GetCheckedColumnNames(db *postgres.DB) ([]string, error) { - repository.GetCheckedColumnNamesCalled = true - if repository.GetCheckedColumnNamesError != nil { - return []string{}, repository.GetCheckedColumnNamesError - } - - return repository.ReturnCheckedColumnNames, nil -} - -func (repository *MockWatcherRepository) SetCheckedColumnNames(checkedColumnNames []string) { - repository.ReturnCheckedColumnNames = checkedColumnNames -} - -func (repository *MockWatcherRepository) CreateNotCheckedSQL(boolColumns []string) string { - repository.CreateNotCheckedSQLCalled = true - return repository.ReturnNotCheckedSQL -} - -func (repository *MockWatcherRepository) SetNotCheckedSQL(notCheckedSql string) { - repository.ReturnNotCheckedSQL = notCheckedSql -} - -func (repository *MockWatcherRepository) MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, db *postgres.DB, notCheckedSQL string) ([]core.Header, error) { - if repository.MissingHeadersError != nil { - return []core.Header{}, repository.MissingHeadersError - } - repository.MissingHeadersCalled = true - return repository.ReturnMissingHeaders, nil -} - -func (repository *MockWatcherRepository) SetMissingHeaders(headers []core.Header) { - repository.ReturnMissingHeaders = headers -} diff --git a/libraries/shared/repository/repository.go b/libraries/shared/repository/repository.go index 2b7e2ce1..8aea8c49 100644 --- a/libraries/shared/repository/repository.go +++ b/libraries/shared/repository/repository.go @@ -18,15 +18,23 @@ package repository import ( "bytes" + "database/sql" "database/sql/driver" "fmt" + "github.com/ethereum/go-ethereum/core/types" "github.com/jmoiron/sqlx" + "github.com/lib/pq" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +const insertHeaderSyncLogQuery = `INSERT INTO header_sync_logs + (header_id, address, topics, data, block_number, block_hash, tx_index, tx_hash, log_index, raw) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT DO NOTHING RETURNING id` + func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error { _, err := db.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`) VALUES ($1, $2) @@ -113,6 +121,62 @@ func CreateHeaderCheckedPredicateSQL(boolColumns []string, recheckHeaders consta } } +func CreateLogs(headerID int64, logs []types.Log, db *postgres.DB) ([]core.HeaderSyncLog, error) { + tx, txErr := db.Beginx() + if txErr != nil { + return nil, txErr + } + var results []core.HeaderSyncLog + for _, log := range logs { + logID, err := insertLog(headerID, log, tx) + if err != nil { + if logWasADuplicate(err) { + continue + } + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.Errorf("failed to rollback header sync log insert: %s", rollbackErr.Error()) + } + return nil, err + } + results = append(results, buildLog(logID, headerID, log)) + } + return results, tx.Commit() +} + +func logWasADuplicate(err error) bool { + return err == sql.ErrNoRows +} + +func insertLog(headerID int64, log types.Log, tx *sqlx.Tx) (int64, error) { + topics := buildTopics(log) + raw, jsonErr := log.MarshalJSON() + if jsonErr != nil { + return 0, jsonErr + } + var logID int64 + err := tx.QueryRowx(insertHeaderSyncLogQuery, headerID, log.Address.Hex(), topics, log.Data, log.BlockNumber, + log.BlockHash.Hex(), log.TxIndex, log.TxHash.Hex(), log.Index, raw).Scan(&logID) + return logID, err +} + +func buildLog(logID int64, headerID int64, log types.Log) core.HeaderSyncLog { + return core.HeaderSyncLog{ + ID: logID, + HeaderID: headerID, + Log: log, + Transformed: false, + } +} + +func buildTopics(log types.Log) pq.ByteaArray { + var topics pq.ByteaArray + for _, topic := range log.Topics { + topics = append(topics, topic.Bytes()) + } + return topics +} + func createHeaderCheckedPredicateSQLForMissingHeaders(boolColumns []string) string { var result bytes.Buffer result.WriteString(" (") diff --git a/libraries/shared/repository/repository_test.go b/libraries/shared/repository/repository_test.go index 183a940f..9eb33464 100644 --- a/libraries/shared/repository/repository_test.go +++ b/libraries/shared/repository/repository_test.go @@ -18,6 +18,10 @@ package repository_test import ( "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/lib/pq" + "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" "math/rand" "strconv" @@ -291,6 +295,154 @@ var _ = Describe("Repository", func() { }) }) }) + + Describe("CreateHeaderSyncLogs", func() { + var headerID int64 + + type HeaderSyncLog struct { + ID int64 + HeaderID int64 `db:"header_id"` + Address string + Topics pq.ByteaArray + Data []byte + BlockNumber uint64 `db:"block_number"` + BlockHash string `db:"block_hash"` + TxHash string `db:"tx_hash"` + TxIndex uint `db:"tx_index"` + LogIndex uint `db:"log_index"` + Transformed bool + Raw []byte + } + + BeforeEach(func() { + db = test_config.NewTestDB(test_config.NewTestNode()) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + var headerErr error + headerID, headerErr = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) + Expect(headerErr).NotTo(HaveOccurred()) + }) + + It("writes a log to the db", func() { + log := test_data.GenericTestLog() + + _, err := shared.CreateLogs(headerID, []types.Log{log}, db) + + Expect(err).NotTo(HaveOccurred()) + var dbLog HeaderSyncLog + lookupErr := db.Get(&dbLog, `SELECT * FROM header_sync_logs`) + Expect(lookupErr).NotTo(HaveOccurred()) + Expect(dbLog.ID).NotTo(BeZero()) + Expect(dbLog.HeaderID).To(Equal(headerID)) + Expect(dbLog.Address).To(Equal(log.Address.Hex())) + Expect(dbLog.Topics[0]).To(Equal(log.Topics[0].Bytes())) + Expect(dbLog.Topics[1]).To(Equal(log.Topics[1].Bytes())) + Expect(dbLog.Data).To(Equal(log.Data)) + Expect(dbLog.BlockNumber).To(Equal(log.BlockNumber)) + Expect(dbLog.BlockHash).To(Equal(log.BlockHash.Hex())) + Expect(dbLog.TxIndex).To(Equal(log.TxIndex)) + Expect(dbLog.TxHash).To(Equal(log.TxHash.Hex())) + Expect(dbLog.LogIndex).To(Equal(log.Index)) + expectedRaw, jsonErr := log.MarshalJSON() + Expect(jsonErr).NotTo(HaveOccurred()) + Expect(dbLog.Raw).To(MatchJSON(expectedRaw)) + Expect(dbLog.Transformed).To(BeFalse()) + }) + + It("writes several logs to the db", func() { + log1 := test_data.GenericTestLog() + log2 := test_data.GenericTestLog() + logs := []types.Log{log1, log2} + + _, err := shared.CreateLogs(headerID, logs, db) + + Expect(err).NotTo(HaveOccurred()) + var count int + lookupErr := db.Get(&count, `SELECT COUNT(*) FROM header_sync_logs`) + Expect(lookupErr).NotTo(HaveOccurred()) + Expect(count).To(Equal(len(logs))) + }) + + It("persists record that can be unpacked into types.Log", func() { + // important if we want to decouple log persistence from transforming and still make use of + // tools on types.Log like abi.Unpack + + log := test_data.GenericTestLog() + + _, err := shared.CreateLogs(headerID, []types.Log{log}, db) + + Expect(err).NotTo(HaveOccurred()) + var dbLog HeaderSyncLog + lookupErr := db.Get(&dbLog, `SELECT * FROM header_sync_logs`) + Expect(lookupErr).NotTo(HaveOccurred()) + + var logTopics []common.Hash + for _, topic := range dbLog.Topics { + logTopics = append(logTopics, common.BytesToHash(topic)) + } + + reconstructedLog := types.Log{ + Address: common.HexToAddress(dbLog.Address), + Topics: logTopics, + Data: dbLog.Data, + BlockNumber: dbLog.BlockNumber, + TxHash: common.HexToHash(dbLog.TxHash), + TxIndex: dbLog.TxIndex, + BlockHash: common.HexToHash(dbLog.BlockHash), + Index: dbLog.LogIndex, + Removed: false, + } + Expect(reconstructedLog).To(Equal(log)) + }) + + It("does not duplicate logs", func() { + log := test_data.GenericTestLog() + + results, err := shared.CreateLogs(headerID, []types.Log{log, log}, db) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(results)).To(Equal(1)) + var count int + lookupErr := db.Get(&count, `SELECT COUNT(*) FROM header_sync_logs`) + Expect(lookupErr).NotTo(HaveOccurred()) + Expect(count).To(Equal(1)) + }) + + It("returns results with log id and header id for persisted logs", func() { + log1 := test_data.GenericTestLog() + log2 := test_data.GenericTestLog() + logs := []types.Log{log1, log2} + + results, err := shared.CreateLogs(headerID, logs, db) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(results)).To(Equal(len(logs))) + var log1ID, log2ID int64 + lookupErr := db.Get(&log1ID, `SELECT id FROM header_sync_logs WHERE data = $1`, log1.Data) + Expect(lookupErr).NotTo(HaveOccurred()) + lookup2Err := db.Get(&log2ID, `SELECT id FROM header_sync_logs WHERE data = $1`, log2.Data) + Expect(lookup2Err).NotTo(HaveOccurred()) + Expect(results[0].ID).To(Or(Equal(log1ID), Equal(log2ID))) + Expect(results[1].ID).To(Or(Equal(log1ID), Equal(log2ID))) + Expect(results[0].HeaderID).To(Equal(headerID)) + Expect(results[1].HeaderID).To(Equal(headerID)) + }) + + It("returns results with properties for persisted logs", func() { + log1 := test_data.GenericTestLog() + log2 := test_data.GenericTestLog() + logs := []types.Log{log1, log2} + + results, err := shared.CreateLogs(headerID, logs, db) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(results)).To(Equal(len(logs))) + Expect(results[0].Log).To(Or(Equal(log1), Equal(log2))) + Expect(results[1].Log).To(Or(Equal(log1), Equal(log2))) + Expect(results[0].Transformed).To(BeFalse()) + Expect(results[1].Transformed).To(BeFalse()) + }) + }) }) func getExpectedColumnNames() []string { diff --git a/libraries/shared/test_data/generic.go b/libraries/shared/test_data/generic.go index eb68f03a..f00b51a0 100644 --- a/libraries/shared/test_data/generic.go +++ b/libraries/shared/test_data/generic.go @@ -17,6 +17,7 @@ package test_data import ( + "github.com/ethereum/go-ethereum/common/hexutil" "math/rand" "time" @@ -30,28 +31,42 @@ type GenericModel struct{} type GenericEntity struct{} var startingBlockNumber = rand.Int63() -var topic = "0x" + randomString(64) -var address = "0x" + randomString(38) +var topic0 = "0x" + randomString(64) -var GenericTestLogs = []types.Log{{ - Address: common.HexToAddress(address), - Topics: []common.Hash{common.HexToHash(topic)}, - BlockNumber: uint64(startingBlockNumber), -}} +var GenericTestLog = func() types.Log { + return types.Log{ + Address: fakeAddress(), + Topics: []common.Hash{common.HexToHash(topic0), fakeHash()}, + Data: hexutil.MustDecode(fakeHash().Hex()), + BlockNumber: uint64(startingBlockNumber), + TxHash: fakeHash(), + TxIndex: uint(rand.Int31()), + BlockHash: fakeHash(), + Index: uint(rand.Int31()), + } +} var GenericTestConfig = transformer.EventTransformerConfig{ TransformerName: "generic-test-transformer", - ContractAddresses: []string{address}, + ContractAddresses: []string{fakeAddress().Hex()}, ContractAbi: randomString(100), - Topic: topic, + Topic: topic0, StartingBlockNumber: startingBlockNumber, EndingBlockNumber: startingBlockNumber + 1, } +func fakeAddress() common.Address { + return common.HexToAddress("0x" + randomString(40)) +} + +func fakeHash() common.Hash { + return common.HexToHash("0x" + randomString(64)) +} + func randomString(length int) string { var seededRand = rand.New( rand.NewSource(time.Now().UnixNano())) - charset := "abcdefghijklmnopqrstuvwxyz1234567890" + charset := "abcdef1234567890" b := make([]byte, length) for i := range b { b[i] = charset[seededRand.Intn(len(charset))] diff --git a/libraries/shared/transformer/event_transformer.go b/libraries/shared/transformer/event_transformer.go index 1bbb8b54..9e888ccf 100644 --- a/libraries/shared/transformer/event_transformer.go +++ b/libraries/shared/transformer/event_transformer.go @@ -18,13 +18,13 @@ package transformer import ( "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type EventTransformer interface { - Execute(logs []types.Log, headerID int64) error + Execute(logs []core.HeaderSyncLog, headerID int64) error GetConfig() EventTransformerConfig } diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go index 6c78484b..5d8f3308 100644 --- a/libraries/shared/watcher/event_watcher.go +++ b/libraries/shared/watcher/event_watcher.go @@ -21,7 +21,6 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/transactions" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/chunker" @@ -122,7 +121,12 @@ func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecuti return transactionsSyncErr } - transformErr := watcher.transformLogs(logs, header.Id) + persistedLogs, createLogsErr := repository.CreateLogs(header.Id, logs, watcher.DB) + if createLogsErr != nil { + logrus.Errorf("error persisting logs: %s", createLogsErr.Error()) + } + + transformErr := watcher.transformLogs(persistedLogs, header.Id) if transformErr != nil { logrus.Error("Could not transform logs: ", transformErr) return transformErr @@ -131,7 +135,7 @@ func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecuti return err } -func (watcher *EventWatcher) transformLogs(logs []types.Log, headerID int64) error { +func (watcher *EventWatcher) transformLogs(logs []core.HeaderSyncLog, headerID int64) error { chunkedLogs := watcher.Chunker.ChunkLogs(logs) // Can't quit early and mark as checked if there are no logs. If we are running continuousLogSync, diff --git a/libraries/shared/watcher/event_watcher_test.go b/libraries/shared/watcher/event_watcher_test.go index 6edd168d..b3e606c1 100644 --- a/libraries/shared/watcher/event_watcher_test.go +++ b/libraries/shared/watcher/event_watcher_test.go @@ -18,7 +18,6 @@ package watcher_test import ( "errors" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -106,7 +105,6 @@ var _ = Describe("Watcher", func() { w watcher.EventWatcher mockBlockChain fakes.MockBlockChain headerRepository repositories.HeaderRepository - repository mocks.MockWatcherRepository ) BeforeEach(func() { @@ -117,14 +115,12 @@ var _ = Describe("Watcher", func() { _, err := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) Expect(err).NotTo(HaveOccurred()) - repository = mocks.MockWatcherRepository{} w = watcher.NewEventWatcher(db, &mockBlockChain) }) It("syncs transactions for fetched logs", func() { fakeTransformer := &mocks.MockTransformer{} w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) mockTransactionSyncer := &fakes.MockTransactionSyncer{} w.Syncer = mockTransactionSyncer @@ -137,7 +133,6 @@ var _ = Describe("Watcher", func() { It("returns error if syncing transactions fails", func() { fakeTransformer := &mocks.MockTransformer{} w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) mockTransactionSyncer := &fakes.MockTransactionSyncer{} mockTransactionSyncer.SyncTransactionsError = fakes.FakeError w.Syncer = mockTransactionSyncer @@ -148,10 +143,30 @@ var _ = Describe("Watcher", func() { Expect(err).To(MatchError(fakes.FakeError)) }) + It("persists fetched logs", func() { + fakeTransformer := &mocks.MockTransformer{} + transformerConfig := transformer.EventTransformerConfig{TransformerName: "transformerA", + ContractAddresses: []string{"0x000000000000000000000000000000000000000A"}, + Topic: "0xA"} + fakeTransformer.SetTransformerConfig(transformerConfig) + w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) + log := types.Log{Address: common.HexToAddress("0xA"), + Topics: []common.Hash{common.HexToHash("0xA")}, + Index: 0, + } + mockBlockChain.SetGetEthLogsWithCustomQueryReturnLogs([]types.Log{log}) + + err := w.Execute(constants.HeaderMissing) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(fakeTransformer.PassedLogs)).NotTo(BeZero()) + Expect(fakeTransformer.PassedLogs[0].ID).NotTo(BeZero()) + Expect(fakeTransformer.PassedLogs[0].Log).To(Equal(log)) + }) + It("executes each transformer", func() { fakeTransformer := &mocks.MockTransformer{} w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) err := w.Execute(constants.HeaderMissing) Expect(err).NotTo(HaveOccurred()) @@ -161,7 +176,6 @@ var _ = Describe("Watcher", func() { It("returns an error if transformer returns an error", func() { fakeTransformer := &mocks.MockTransformer{ExecuteError: errors.New("Something bad happened")} w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) err := w.Execute(constants.HeaderMissing) Expect(err).To(HaveOccurred()) @@ -183,26 +197,30 @@ var _ = Describe("Watcher", func() { transformerB.SetTransformerConfig(configB) logA := types.Log{Address: common.HexToAddress("0xA"), - Topics: []common.Hash{common.HexToHash("0xA")}} + Topics: []common.Hash{common.HexToHash("0xA")}, + Index: 0, + } logB := types.Log{Address: common.HexToAddress("0xB"), - Topics: []common.Hash{common.HexToHash("0xB")}} + Topics: []common.Hash{common.HexToHash("0xB")}, + Index: 1, + } mockBlockChain.SetGetEthLogsWithCustomQueryReturnLogs([]types.Log{logA, logB}) - repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) w = watcher.NewEventWatcher(db, &mockBlockChain) w.AddTransformers([]transformer.EventTransformerInitializer{ transformerA.FakeTransformerInitializer, transformerB.FakeTransformerInitializer}) err := w.Execute(constants.HeaderMissing) Expect(err).NotTo(HaveOccurred()) - Expect(transformerA.PassedLogs).To(Equal([]types.Log{logA})) - Expect(transformerB.PassedLogs).To(Equal([]types.Log{logB})) + Expect(len(transformerA.PassedLogs)).NotTo(BeZero()) + Expect(transformerA.PassedLogs[0].Log).To(Equal(logA)) + Expect(len(transformerB.PassedLogs)).NotTo(BeZero()) + Expect(transformerB.PassedLogs[0].Log).To(Equal(logB)) }) Describe("uses the LogFetcher correctly:", func() { var fakeTransformer mocks.MockTransformer BeforeEach(func() { - repository.SetMissingHeaders([]core.Header{fakes.FakeHeader}) fakeTransformer = mocks.MockTransformer{} }) diff --git a/pkg/core/log.go b/pkg/core/log.go index 16962055..d1d576d6 100644 --- a/pkg/core/log.go +++ b/pkg/core/log.go @@ -16,6 +16,8 @@ package core +import "github.com/ethereum/go-ethereum/core/types" + type Log struct { BlockNumber int64 TxHash string @@ -24,3 +26,10 @@ type Log struct { Index int64 Data string } + +type HeaderSyncLog struct { + ID int64 + HeaderID int64 `db:"header_id"` + Log types.Log + Transformed bool +} diff --git a/test_config/test_config.go b/test_config/test_config.go index b8f62b72..0d3ee032 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -113,6 +113,7 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM full_sync_receipts") db.MustExec("DELETE FROM full_sync_transactions") db.MustExec("DELETE FROM goose_db_version") + db.MustExec("DELETE FROM header_sync_logs") db.MustExec("DELETE FROM header_sync_receipts") db.MustExec("DELETE FROM header_sync_transactions") db.MustExec("DELETE FROM headers")