Rename checked_logs => watched_logs

- We're logging that a given log has been included in any fetch calls
  for checked headers, rather than that we have already checked for
  that log
This commit is contained in:
Rob Mulholand 2019-09-10 21:22:14 -05:00
parent 13d503b851
commit 3f9b034c4c
9 changed files with 98 additions and 98 deletions

View File

@ -1,6 +1,6 @@
-- +goose Up
-- SQL in this section is executed when the migration is applied.
CREATE TABLE public.checked_logs
CREATE TABLE public.watched_logs
(
id SERIAL PRIMARY KEY,
contract_address VARCHAR(42),
@ -9,4 +9,4 @@ CREATE TABLE public.checked_logs
-- +goose Down
-- SQL in this section is executed when the migration is rolled back.
DROP TABLE public.checked_logs;
DROP TABLE public.watched_logs;

View File

@ -155,37 +155,6 @@ CREATE SEQUENCE public.checked_headers_id_seq
ALTER SEQUENCE public.checked_headers_id_seq OWNED BY public.checked_headers.id;
--
-- Name: checked_logs; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.checked_logs (
id integer NOT NULL,
contract_address character varying(42),
topic_zero character varying(66)
);
--
-- Name: checked_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.checked_logs_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: checked_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.checked_logs_id_seq OWNED BY public.checked_logs.id;
--
-- Name: eth_nodes; Type: TABLE; Schema: public; Owner: -
--
@ -666,6 +635,37 @@ CREATE VIEW public.watched_event_logs AS
WHERE ((((log_filters.topic0)::text = (full_sync_logs.topic0)::text) OR (log_filters.topic0 IS NULL)) AND (((log_filters.topic1)::text = (full_sync_logs.topic1)::text) OR (log_filters.topic1 IS NULL)) AND (((log_filters.topic2)::text = (full_sync_logs.topic2)::text) OR (log_filters.topic2 IS NULL)) AND (((log_filters.topic3)::text = (full_sync_logs.topic3)::text) OR (log_filters.topic3 IS NULL)));
--
-- Name: watched_logs; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.watched_logs (
id integer NOT NULL,
contract_address character varying(42),
topic_zero character varying(66)
);
--
-- Name: watched_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.watched_logs_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: watched_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.watched_logs_id_seq OWNED BY public.watched_logs.id;
--
-- Name: addresses id; Type: DEFAULT; Schema: public; Owner: -
--
@ -687,13 +687,6 @@ ALTER TABLE ONLY public.blocks ALTER COLUMN id SET DEFAULT nextval('public.block
ALTER TABLE ONLY public.checked_headers ALTER COLUMN id SET DEFAULT nextval('public.checked_headers_id_seq'::regclass);
--
-- Name: checked_logs id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.checked_logs ALTER COLUMN id SET DEFAULT nextval('public.checked_logs_id_seq'::regclass);
--
-- Name: eth_nodes id; Type: DEFAULT; Schema: public; Owner: -
--
@ -785,6 +778,13 @@ ALTER TABLE ONLY public.uncles ALTER COLUMN id SET DEFAULT nextval('public.uncle
ALTER TABLE ONLY public.watched_contracts ALTER COLUMN contract_id SET DEFAULT nextval('public.watched_contracts_contract_id_seq'::regclass);
--
-- Name: watched_logs id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.watched_logs ALTER COLUMN id SET DEFAULT nextval('public.watched_logs_id_seq'::regclass);
--
-- Name: addresses addresses_address_key; Type: CONSTRAINT; Schema: public; Owner: -
--
@ -825,14 +825,6 @@ ALTER TABLE ONLY public.checked_headers
ADD CONSTRAINT checked_headers_pkey PRIMARY KEY (id);
--
-- Name: checked_logs checked_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.checked_logs
ADD CONSTRAINT checked_logs_pkey PRIMARY KEY (id);
--
-- Name: blocks eth_node_id_block_number_uc; Type: CONSTRAINT; Schema: public; Owner: -
--
@ -1001,6 +993,14 @@ ALTER TABLE ONLY public.watched_contracts
ADD CONSTRAINT watched_contracts_pkey PRIMARY KEY (contract_id);
--
-- Name: watched_logs watched_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.watched_logs
ADD CONSTRAINT watched_logs_pkey PRIMARY KEY (id);
--
-- Name: block_id_index; Type: INDEX; Schema: public; Owner: -
--

View File

@ -138,18 +138,18 @@ func getCheckCount(recheckHeaders constants.TransformerExecution) int64 {
}
func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTransformerConfig) error {
hasBeenChecked, hasBeenCheckedErr := extractor.CheckedLogsRepository.HaveLogsBeenChecked(config.ContractAddresses, config.Topic)
if hasBeenCheckedErr != nil {
return hasBeenCheckedErr
alreadyWatchingLog, watchingLogErr := extractor.CheckedLogsRepository.AlreadyWatchingLog(config.ContractAddresses, config.Topic)
if watchingLogErr != nil {
return watchingLogErr
}
if !hasBeenChecked {
if !alreadyWatchingLog {
uncheckHeadersErr := extractor.CheckedHeadersRepository.MarkHeadersUnchecked(config.StartingBlockNumber)
if uncheckHeadersErr != nil {
return uncheckHeadersErr
}
markLogsCheckedErr := extractor.CheckedLogsRepository.MarkLogsChecked(config.ContractAddresses, config.Topic)
if markLogsCheckedErr != nil {
return markLogsCheckedErr
markLogWatchedErr := extractor.CheckedLogsRepository.MarkLogWatched(config.ContractAddresses, config.Topic)
if markLogWatchedErr != nil {
return markLogWatchedErr
}
}
return nil

View File

@ -91,7 +91,7 @@ var _ = Describe("Log extractor", func() {
})
It("returns error if checking whether log has been checked returns error", func() {
checkedLogsRepository.HasLogBeenCheckedError = fakes.FakeError
checkedLogsRepository.AlreadyWatchingLogError = fakes.FakeError
err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63()))
@ -101,7 +101,7 @@ var _ = Describe("Log extractor", func() {
Describe("when log has previously been checked", func() {
It("does not mark any headers unchecked", func() {
checkedLogsRepository.HasLogBeenCheckedReturn = true
checkedLogsRepository.AlreadyWatchingLogReturn = true
err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63()))
@ -112,7 +112,7 @@ var _ = Describe("Log extractor", func() {
Describe("when log has not previously been checked", func() {
BeforeEach(func() {
checkedLogsRepository.HasLogBeenCheckedReturn = false
checkedLogsRepository.AlreadyWatchingLogReturn = false
})
It("marks headers since transformer's starting block number as unchecked", func() {
@ -140,12 +140,12 @@ var _ = Describe("Log extractor", func() {
err := extractor.AddTransformerConfig(config)
Expect(err).NotTo(HaveOccurred())
Expect(checkedLogsRepository.MarkLogCheckedAddresses).To(Equal(config.ContractAddresses))
Expect(checkedLogsRepository.MarkLogCheckedTopicZero).To(Equal(config.Topic))
Expect(checkedLogsRepository.MarkLogWatchedAddresses).To(Equal(config.ContractAddresses))
Expect(checkedLogsRepository.MarkLogWatchedTopicZero).To(Equal(config.Topic))
})
It("returns error if marking logs checked returns error", func() {
checkedLogsRepository.MarkLogCheckedError = fakes.FakeError
checkedLogsRepository.MarkLogWatchedError = fakes.FakeError
err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63()))

View File

@ -30,10 +30,10 @@ func NewCheckedLogsRepository(db *postgres.DB) CheckedLogsRepository {
}
// Return whether a given address + topic0 has been fetched on a previous run of vDB
func (repository CheckedLogsRepository) HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) {
func (repository CheckedLogsRepository) AlreadyWatchingLog(addresses []string, topic0 string) (bool, error) {
for _, address := range addresses {
var addressExists bool
getAddressExistsErr := repository.db.Get(&addressExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1)`, address)
getAddressExistsErr := repository.db.Get(&addressExists, `SELECT EXISTS(SELECT 1 FROM public.watched_logs WHERE contract_address = $1)`, address)
if getAddressExistsErr != nil {
return false, getAddressExistsErr
}
@ -42,7 +42,7 @@ func (repository CheckedLogsRepository) HaveLogsBeenChecked(addresses []string,
}
}
var topicZeroExists bool
getTopicZeroExistsErr := repository.db.Get(&topicZeroExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE topic_zero = $1)`, topic0)
getTopicZeroExistsErr := repository.db.Get(&topicZeroExists, `SELECT EXISTS(SELECT 1 FROM public.watched_logs WHERE topic_zero = $1)`, topic0)
if getTopicZeroExistsErr != nil {
return false, getTopicZeroExistsErr
}
@ -50,13 +50,13 @@ func (repository CheckedLogsRepository) HaveLogsBeenChecked(addresses []string,
}
// Persist that a given address + topic0 has is being fetched on this run of vDB
func (repository CheckedLogsRepository) MarkLogsChecked(addresses []string, topic0 string) error {
func (repository CheckedLogsRepository) MarkLogWatched(addresses []string, topic0 string) error {
tx, txErr := repository.db.Beginx()
if txErr != nil {
return txErr
}
for _, address := range addresses {
_, insertErr := tx.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, address, topic0)
_, insertErr := tx.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, address, topic0)
if insertErr != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {

View File

@ -47,12 +47,12 @@ var _ = Describe("Checked logs repository", func() {
Expect(closeErr).NotTo(HaveOccurred())
})
Describe("HaveLogsBeenChecked", func() {
Describe("AlreadyWatchingLog", func() {
It("returns true if all addresses and the topic0 are already present in the db", func() {
_, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero)
_, insertErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero)
Expect(insertErr).NotTo(HaveOccurred())
hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero)
hasBeenChecked, err := repository.AlreadyWatchingLog(fakeAddresses, fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
Expect(hasBeenChecked).To(BeTrue())
@ -62,13 +62,13 @@ var _ = Describe("Checked logs repository", func() {
anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex()
anotherFakeTopicZero := common.HexToHash("0x" + fakes.RandomString(64)).Hex()
// insert row with matching address but different topic0
_, insertOneErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero)
_, insertOneErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero)
Expect(insertOneErr).NotTo(HaveOccurred())
// insert row with matching topic0 but different address
_, insertTwoErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, anotherFakeAddress, fakeTopicZero)
_, insertTwoErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, anotherFakeAddress, fakeTopicZero)
Expect(insertTwoErr).NotTo(HaveOccurred())
hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero)
hasBeenChecked, err := repository.AlreadyWatchingLog(fakeAddresses, fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
Expect(hasBeenChecked).To(BeTrue())
@ -76,10 +76,10 @@ var _ = Describe("Checked logs repository", func() {
It("returns false if any address has not been checked", func() {
anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex()
_, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero)
_, insertErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero)
Expect(insertErr).NotTo(HaveOccurred())
hasBeenChecked, err := repository.HaveLogsBeenChecked(append(fakeAddresses, anotherFakeAddress), fakeTopicZero)
hasBeenChecked, err := repository.AlreadyWatchingLog(append(fakeAddresses, anotherFakeAddress), fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
Expect(hasBeenChecked).To(BeFalse())
@ -87,27 +87,27 @@ var _ = Describe("Checked logs repository", func() {
It("returns false if topic0 has not been checked", func() {
anotherFakeTopicZero := common.HexToHash("0x" + fakes.RandomString(64)).Hex()
_, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero)
_, insertErr := db.Exec(`INSERT INTO public.watched_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero)
Expect(insertErr).NotTo(HaveOccurred())
hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero)
hasBeenChecked, err := repository.AlreadyWatchingLog(fakeAddresses, fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
Expect(hasBeenChecked).To(BeFalse())
})
})
Describe("MarkLogsChecked", func() {
Describe("MarkLogWatched", func() {
It("adds a row for all of transformer's addresses + topic0", func() {
anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex()
err := repository.MarkLogsChecked(append(fakeAddresses, anotherFakeAddress), fakeTopicZero)
err := repository.MarkLogWatched(append(fakeAddresses, anotherFakeAddress), fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
var comboOneExists, comboTwoExists bool
getComboOneErr := db.Get(&comboOneExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1 AND topic_zero = $2)`, fakeAddress, fakeTopicZero)
getComboOneErr := db.Get(&comboOneExists, `SELECT EXISTS(SELECT 1 FROM public.watched_logs WHERE contract_address = $1 AND topic_zero = $2)`, fakeAddress, fakeTopicZero)
Expect(getComboOneErr).NotTo(HaveOccurred())
Expect(comboOneExists).To(BeTrue())
getComboTwoErr := db.Get(&comboTwoExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1 AND topic_zero = $2)`, anotherFakeAddress, fakeTopicZero)
getComboTwoErr := db.Get(&comboTwoExists, `SELECT EXISTS(SELECT 1 FROM public.watched_logs WHERE contract_address = $1 AND topic_zero = $2)`, anotherFakeAddress, fakeTopicZero)
Expect(getComboTwoErr).NotTo(HaveOccurred())
Expect(comboTwoExists).To(BeTrue())
})

View File

@ -41,8 +41,8 @@ type CheckedHeadersRepository interface {
}
type CheckedLogsRepository interface {
HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error)
MarkLogsChecked(addresses []string, topic0 string) error
AlreadyWatchingLog(addresses []string, topic0 string) (bool, error)
MarkLogWatched(addresses []string, topic0 string) error
}
type ContractRepository interface {

View File

@ -17,23 +17,23 @@
package fakes
type MockCheckedLogsRepository struct {
HasLogBeenCheckedAddresses []string
HasLogBeenCheckedError error
HasLogBeenCheckedReturn bool
HasLogBeenCheckedTopicZero string
MarkLogCheckedAddresses []string
MarkLogCheckedError error
MarkLogCheckedTopicZero string
AlreadyWatchingLogAddresses []string
AlreadyWatchingLogError error
AlreadyWatchingLogReturn bool
AlreadyWatchingLogTopicZero string
MarkLogWatchedAddresses []string
MarkLogWatchedError error
MarkLogWatchedTopicZero string
}
func (repository *MockCheckedLogsRepository) HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) {
repository.HasLogBeenCheckedAddresses = addresses
repository.HasLogBeenCheckedTopicZero = topic0
return repository.HasLogBeenCheckedReturn, repository.HasLogBeenCheckedError
func (repository *MockCheckedLogsRepository) AlreadyWatchingLog(addresses []string, topic0 string) (bool, error) {
repository.AlreadyWatchingLogAddresses = addresses
repository.AlreadyWatchingLogTopicZero = topic0
return repository.AlreadyWatchingLogReturn, repository.AlreadyWatchingLogError
}
func (repository *MockCheckedLogsRepository) MarkLogsChecked(addresses []string, topic0 string) error {
repository.MarkLogCheckedAddresses = addresses
repository.MarkLogCheckedTopicZero = topic0
return repository.MarkLogCheckedError
func (repository *MockCheckedLogsRepository) MarkLogWatched(addresses []string, topic0 string) error {
repository.MarkLogWatchedAddresses = addresses
repository.MarkLogWatchedTopicZero = topic0
return repository.MarkLogWatchedError
}

View File

@ -106,7 +106,6 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM addresses")
db.MustExec("DELETE FROM blocks")
db.MustExec("DELETE FROM checked_headers")
db.MustExec("DELETE FROM checked_logs")
// can't delete from eth_nodes since this function is called after the required eth_node is persisted
db.MustExec("DELETE FROM full_sync_logs")
db.MustExec("DELETE FROM full_sync_receipts")
@ -119,6 +118,7 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM log_filters")
db.MustExec("DELETE FROM queued_storage")
db.MustExec("DELETE FROM watched_contracts")
db.MustExec("DELETE FROM watched_logs")
}
func CleanCheckedHeadersTable(db *postgres.DB, columnNames []string) {