diff --git a/db/migrations/00030_create_checked_logs_table.sql b/db/migrations/00030_create_watched_logs_table.sql similarity index 81% rename from db/migrations/00030_create_checked_logs_table.sql rename to db/migrations/00030_create_watched_logs_table.sql index 1a77560a..4268a68a 100644 --- a/db/migrations/00030_create_checked_logs_table.sql +++ b/db/migrations/00030_create_watched_logs_table.sql @@ -1,6 +1,6 @@ -- +goose Up -- SQL in this section is executed when the migration is applied. -CREATE TABLE public.checked_logs +CREATE TABLE public.watched_logs ( id SERIAL PRIMARY KEY, contract_address VARCHAR(42), @@ -9,4 +9,4 @@ CREATE TABLE public.checked_logs -- +goose Down -- SQL in this section is executed when the migration is rolled back. -DROP TABLE public.checked_logs; +DROP TABLE public.watched_logs; diff --git a/db/schema.sql b/db/schema.sql index 0a5bb3a1..eeb1c81e 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -155,37 +155,6 @@ CREATE SEQUENCE public.checked_headers_id_seq ALTER SEQUENCE public.checked_headers_id_seq OWNED BY public.checked_headers.id; --- --- Name: checked_logs; Type: TABLE; Schema: public; Owner: - --- - -CREATE TABLE public.checked_logs ( - id integer NOT NULL, - contract_address character varying(42), - topic_zero character varying(66) -); - - --- --- Name: checked_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: - --- - -CREATE SEQUENCE public.checked_logs_id_seq - AS integer - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - --- --- Name: checked_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - --- - -ALTER SEQUENCE public.checked_logs_id_seq OWNED BY public.checked_logs.id; - - -- -- Name: eth_nodes; Type: TABLE; Schema: public; Owner: - -- @@ -666,6 +635,37 @@ CREATE VIEW public.watched_event_logs AS WHERE ((((log_filters.topic0)::text = (full_sync_logs.topic0)::text) OR (log_filters.topic0 IS NULL)) AND (((log_filters.topic1)::text = (full_sync_logs.topic1)::text) OR (log_filters.topic1 IS NULL)) AND (((log_filters.topic2)::text = (full_sync_logs.topic2)::text) OR (log_filters.topic2 IS NULL)) AND (((log_filters.topic3)::text = (full_sync_logs.topic3)::text) OR (log_filters.topic3 IS NULL))); +-- +-- Name: watched_logs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.watched_logs ( + id integer NOT NULL, + contract_address character varying(42), + topic_zero character varying(66) +); + + +-- +-- Name: watched_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.watched_logs_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: watched_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.watched_logs_id_seq OWNED BY public.watched_logs.id; + + -- -- Name: addresses id; Type: DEFAULT; Schema: public; Owner: - -- @@ -687,13 +687,6 @@ ALTER TABLE ONLY public.blocks ALTER COLUMN id SET DEFAULT nextval('public.block ALTER TABLE ONLY public.checked_headers ALTER COLUMN id SET DEFAULT nextval('public.checked_headers_id_seq'::regclass); --- --- Name: checked_logs id; Type: DEFAULT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.checked_logs ALTER COLUMN id SET DEFAULT nextval('public.checked_logs_id_seq'::regclass); - - -- -- Name: eth_nodes id; Type: DEFAULT; Schema: public; Owner: - -- @@ -785,6 +778,13 @@ ALTER TABLE ONLY public.uncles ALTER COLUMN id SET DEFAULT nextval('public.uncle ALTER TABLE ONLY public.watched_contracts ALTER COLUMN contract_id SET DEFAULT nextval('public.watched_contracts_contract_id_seq'::regclass); +-- +-- Name: watched_logs id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.watched_logs ALTER COLUMN id SET DEFAULT nextval('public.watched_logs_id_seq'::regclass); + + -- -- Name: addresses addresses_address_key; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -825,14 +825,6 @@ ALTER TABLE ONLY public.checked_headers ADD CONSTRAINT checked_headers_pkey PRIMARY KEY (id); --- --- Name: checked_logs checked_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.checked_logs - ADD CONSTRAINT checked_logs_pkey PRIMARY KEY (id); - - -- -- Name: blocks eth_node_id_block_number_uc; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -1001,6 +993,14 @@ ALTER TABLE ONLY public.watched_contracts ADD CONSTRAINT watched_contracts_pkey PRIMARY KEY (contract_id); +-- +-- Name: watched_logs watched_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.watched_logs + ADD CONSTRAINT watched_logs_pkey PRIMARY KEY (id); + + -- -- Name: block_id_index; Type: INDEX; Schema: public; Owner: - -- diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go index d2452870..36fe6eda 100644 --- a/libraries/shared/logs/extractor.go +++ b/libraries/shared/logs/extractor.go @@ -138,18 +138,18 @@ func getCheckCount(recheckHeaders constants.TransformerExecution) int64 { } func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTransformerConfig) error { - hasBeenChecked, hasBeenCheckedErr := extractor.CheckedLogsRepository.HaveLogsBeenChecked(config.ContractAddresses, config.Topic) - if hasBeenCheckedErr != nil { - return hasBeenCheckedErr + alreadyWatchingLog, watchingLogErr := extractor.CheckedLogsRepository.AlreadyWatchingLog(config.ContractAddresses, config.Topic) + if watchingLogErr != nil { + return watchingLogErr } - if !hasBeenChecked { + if !alreadyWatchingLog { uncheckHeadersErr := extractor.CheckedHeadersRepository.MarkHeadersUnchecked(config.StartingBlockNumber) if uncheckHeadersErr != nil { return uncheckHeadersErr } - markLogsCheckedErr := extractor.CheckedLogsRepository.MarkLogsChecked(config.ContractAddresses, config.Topic) - if markLogsCheckedErr != nil { - return markLogsCheckedErr + markLogWatchedErr := extractor.CheckedLogsRepository.MarkLogWatched(config.ContractAddresses, config.Topic) + if markLogWatchedErr != nil { + return markLogWatchedErr } } return nil diff --git a/libraries/shared/logs/extractor_test.go b/libraries/shared/logs/extractor_test.go index 2a3ce5e9..aecd8e28 100644 --- a/libraries/shared/logs/extractor_test.go +++ b/libraries/shared/logs/extractor_test.go @@ -91,7 +91,7 @@ var _ = Describe("Log extractor", func() { }) It("returns error if checking whether log has been checked returns error", func() { - checkedLogsRepository.HasLogBeenCheckedError = fakes.FakeError + checkedLogsRepository.AlreadyWatchingLogError = fakes.FakeError err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63())) @@ -101,7 +101,7 @@ var _ = Describe("Log extractor", func() { Describe("when log has previously been checked", func() { It("does not mark any headers unchecked", func() { - checkedLogsRepository.HasLogBeenCheckedReturn = true + checkedLogsRepository.AlreadyWatchingLogReturn = true err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63())) @@ -112,7 +112,7 @@ var _ = Describe("Log extractor", func() { Describe("when log has not previously been checked", func() { BeforeEach(func() { - checkedLogsRepository.HasLogBeenCheckedReturn = false + checkedLogsRepository.AlreadyWatchingLogReturn = false }) It("marks headers since transformer's starting block number as unchecked", func() { @@ -140,12 +140,12 @@ var _ = Describe("Log extractor", func() { err := extractor.AddTransformerConfig(config) Expect(err).NotTo(HaveOccurred()) - Expect(checkedLogsRepository.MarkLogCheckedAddresses).To(Equal(config.ContractAddresses)) - Expect(checkedLogsRepository.MarkLogCheckedTopicZero).To(Equal(config.Topic)) + Expect(checkedLogsRepository.MarkLogWatchedAddresses).To(Equal(config.ContractAddresses)) + Expect(checkedLogsRepository.MarkLogWatchedTopicZero).To(Equal(config.Topic)) }) It("returns error if marking logs checked returns error", func() { - checkedLogsRepository.MarkLogCheckedError = fakes.FakeError + checkedLogsRepository.MarkLogWatchedError = fakes.FakeError err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63())) diff --git a/pkg/datastore/postgres/repositories/checked_logs_repository.go b/pkg/datastore/postgres/repositories/checked_logs_repository.go index 113be3ed..b3f012cb 100644 --- a/pkg/datastore/postgres/repositories/checked_logs_repository.go +++ b/pkg/datastore/postgres/repositories/checked_logs_repository.go @@ -30,10 +30,10 @@ func NewCheckedLogsRepository(db *postgres.DB) CheckedLogsRepository { } // Return whether a given address + topic0 has been fetched on a previous run of vDB -func (repository CheckedLogsRepository) HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) { +func (repository CheckedLogsRepository) AlreadyWatchingLog(addresses []string, topic0 string) (bool, error) { for _, address := range addresses { var addressExists bool - getAddressExistsErr := repository.db.Get(&addressExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1)`, address) + getAddressExistsErr := repository.db.Get(&addressExists, `SELECT EXISTS(SELECT 1 FROM public.watched_logs WHERE contract_address = $1)`, address) if getAddressExistsErr != nil { return false, getAddressExistsErr } @@ -42,7 +42,7 @@ func (repository CheckedLogsRepository) HaveLogsBeenChecked(addresses []string, } } var topicZeroExists bool - getTopicZeroExistsErr := repository.db.Get(&topicZeroExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE topic_zero = $1)`, topic0) + getTopicZeroExistsErr := repository.db.Get(&topicZeroExists, `SELECT EXISTS(SELECT 1 FROM public.watched_logs WHERE topic_zero = $1)`, topic0) if getTopicZeroExistsErr != nil { return false, getTopicZeroExistsErr } @@ -50,13 +50,13 @@ func (repository CheckedLogsRepository) HaveLogsBeenChecked(addresses []string, } // Persist that a given address + topic0 has is being fetched on this run of vDB -func (repository CheckedLogsRepository) MarkLogsChecked(addresses []string, topic0 string) error { +func (repository CheckedLogsRepository) MarkLogWatched(addresses []string, topic0 string) error { tx, txErr := repository.db.Beginx() if txErr != nil { return txErr } for _, address := range addresses { - _, insertErr := tx.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, address, topic0) + _, insertErr := tx.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, address, topic0) if insertErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { diff --git a/pkg/datastore/postgres/repositories/checked_logs_repository_test.go b/pkg/datastore/postgres/repositories/checked_logs_repository_test.go index 8597d5fe..6bb9a8db 100644 --- a/pkg/datastore/postgres/repositories/checked_logs_repository_test.go +++ b/pkg/datastore/postgres/repositories/checked_logs_repository_test.go @@ -47,12 +47,12 @@ var _ = Describe("Checked logs repository", func() { Expect(closeErr).NotTo(HaveOccurred()) }) - Describe("HaveLogsBeenChecked", func() { + Describe("AlreadyWatchingLog", func() { It("returns true if all addresses and the topic0 are already present in the db", func() { - _, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero) + _, insertErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero) Expect(insertErr).NotTo(HaveOccurred()) - hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero) + hasBeenChecked, err := repository.AlreadyWatchingLog(fakeAddresses, fakeTopicZero) Expect(err).NotTo(HaveOccurred()) Expect(hasBeenChecked).To(BeTrue()) @@ -62,13 +62,13 @@ var _ = Describe("Checked logs repository", func() { anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex() anotherFakeTopicZero := common.HexToHash("0x" + fakes.RandomString(64)).Hex() // insert row with matching address but different topic0 - _, insertOneErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero) + _, insertOneErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero) Expect(insertOneErr).NotTo(HaveOccurred()) // insert row with matching topic0 but different address - _, insertTwoErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, anotherFakeAddress, fakeTopicZero) + _, insertTwoErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, anotherFakeAddress, fakeTopicZero) Expect(insertTwoErr).NotTo(HaveOccurred()) - hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero) + hasBeenChecked, err := repository.AlreadyWatchingLog(fakeAddresses, fakeTopicZero) Expect(err).NotTo(HaveOccurred()) Expect(hasBeenChecked).To(BeTrue()) @@ -76,10 +76,10 @@ var _ = Describe("Checked logs repository", func() { It("returns false if any address has not been checked", func() { anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex() - _, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero) + _, insertErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero) Expect(insertErr).NotTo(HaveOccurred()) - hasBeenChecked, err := repository.HaveLogsBeenChecked(append(fakeAddresses, anotherFakeAddress), fakeTopicZero) + hasBeenChecked, err := repository.AlreadyWatchingLog(append(fakeAddresses, anotherFakeAddress), fakeTopicZero) Expect(err).NotTo(HaveOccurred()) Expect(hasBeenChecked).To(BeFalse()) @@ -87,27 +87,27 @@ var _ = Describe("Checked logs repository", func() { It("returns false if topic0 has not been checked", func() { anotherFakeTopicZero := common.HexToHash("0x" + fakes.RandomString(64)).Hex() - _, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero) + _, insertErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero) Expect(insertErr).NotTo(HaveOccurred()) - hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero) + hasBeenChecked, err := repository.AlreadyWatchingLog(fakeAddresses, fakeTopicZero) Expect(err).NotTo(HaveOccurred()) Expect(hasBeenChecked).To(BeFalse()) }) }) - Describe("MarkLogsChecked", func() { + Describe("MarkLogWatched", func() { It("adds a row for all of transformer's addresses + topic0", func() { anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex() - err := repository.MarkLogsChecked(append(fakeAddresses, anotherFakeAddress), fakeTopicZero) + err := repository.MarkLogWatched(append(fakeAddresses, anotherFakeAddress), fakeTopicZero) Expect(err).NotTo(HaveOccurred()) var comboOneExists, comboTwoExists bool - getComboOneErr := db.Get(&comboOneExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1 AND topic_zero = $2)`, fakeAddress, fakeTopicZero) + getComboOneErr := db.Get(&comboOneExists, `SELECT EXISTS(SELECT 1 FROM public.watched_logs WHERE contract_address = $1 AND topic_zero = $2)`, fakeAddress, fakeTopicZero) Expect(getComboOneErr).NotTo(HaveOccurred()) Expect(comboOneExists).To(BeTrue()) - getComboTwoErr := db.Get(&comboTwoExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1 AND topic_zero = $2)`, anotherFakeAddress, fakeTopicZero) + getComboTwoErr := db.Get(&comboTwoExists, `SELECT EXISTS(SELECT 1 FROM public.watched_logs WHERE contract_address = $1 AND topic_zero = $2)`, anotherFakeAddress, fakeTopicZero) Expect(getComboTwoErr).NotTo(HaveOccurred()) Expect(comboTwoExists).To(BeTrue()) }) diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index ff0f02dd..4ea1e293 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -41,8 +41,8 @@ type CheckedHeadersRepository interface { } type CheckedLogsRepository interface { - HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) - MarkLogsChecked(addresses []string, topic0 string) error + AlreadyWatchingLog(addresses []string, topic0 string) (bool, error) + MarkLogWatched(addresses []string, topic0 string) error } type ContractRepository interface { diff --git a/pkg/fakes/checked_logs_repository.go b/pkg/fakes/checked_logs_repository.go index fca57e96..8506746a 100644 --- a/pkg/fakes/checked_logs_repository.go +++ b/pkg/fakes/checked_logs_repository.go @@ -17,23 +17,23 @@ package fakes type MockCheckedLogsRepository struct { - HasLogBeenCheckedAddresses []string - HasLogBeenCheckedError error - HasLogBeenCheckedReturn bool - HasLogBeenCheckedTopicZero string - MarkLogCheckedAddresses []string - MarkLogCheckedError error - MarkLogCheckedTopicZero string + AlreadyWatchingLogAddresses []string + AlreadyWatchingLogError error + AlreadyWatchingLogReturn bool + AlreadyWatchingLogTopicZero string + MarkLogWatchedAddresses []string + MarkLogWatchedError error + MarkLogWatchedTopicZero string } -func (repository *MockCheckedLogsRepository) HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) { - repository.HasLogBeenCheckedAddresses = addresses - repository.HasLogBeenCheckedTopicZero = topic0 - return repository.HasLogBeenCheckedReturn, repository.HasLogBeenCheckedError +func (repository *MockCheckedLogsRepository) AlreadyWatchingLog(addresses []string, topic0 string) (bool, error) { + repository.AlreadyWatchingLogAddresses = addresses + repository.AlreadyWatchingLogTopicZero = topic0 + return repository.AlreadyWatchingLogReturn, repository.AlreadyWatchingLogError } -func (repository *MockCheckedLogsRepository) MarkLogsChecked(addresses []string, topic0 string) error { - repository.MarkLogCheckedAddresses = addresses - repository.MarkLogCheckedTopicZero = topic0 - return repository.MarkLogCheckedError +func (repository *MockCheckedLogsRepository) MarkLogWatched(addresses []string, topic0 string) error { + repository.MarkLogWatchedAddresses = addresses + repository.MarkLogWatchedTopicZero = topic0 + return repository.MarkLogWatchedError } diff --git a/test_config/test_config.go b/test_config/test_config.go index f3dad64e..eb830d34 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -106,7 +106,6 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM addresses") db.MustExec("DELETE FROM blocks") db.MustExec("DELETE FROM checked_headers") - db.MustExec("DELETE FROM checked_logs") // can't delete from eth_nodes since this function is called after the required eth_node is persisted db.MustExec("DELETE FROM full_sync_logs") db.MustExec("DELETE FROM full_sync_receipts") @@ -119,6 +118,7 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM log_filters") db.MustExec("DELETE FROM queued_storage") db.MustExec("DELETE FROM watched_contracts") + db.MustExec("DELETE FROM watched_logs") } func CleanCheckedHeadersTable(db *postgres.DB, columnNames []string) {