Rename logs to full sync logs

- Enable creating new table for logs used in event watching based on
  header sync
This commit is contained in:
Rob Mulholand 2019-07-18 16:24:25 -05:00
parent b6a2a278ce
commit 3693ed905f
11 changed files with 97 additions and 97 deletions

View File

@ -1,5 +1,5 @@
-- +goose Up -- +goose Up
CREATE TABLE logs ( CREATE TABLE full_sync_logs (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
block_number BIGINT, block_number BIGINT,
address VARCHAR(66), address VARCHAR(66),
@ -10,10 +10,10 @@ CREATE TABLE logs (
topic2 VARCHAR(66), topic2 VARCHAR(66),
topic3 VARCHAR(66), topic3 VARCHAR(66),
data TEXT, data TEXT,
CONSTRAINT log_uc UNIQUE (block_number, index) CONSTRAINT full_sync_log_uc UNIQUE (block_number, index)
); );
-- +goose Down -- +goose Down
DROP TABLE logs; DROP TABLE full_sync_logs;

View File

@ -1,11 +1,11 @@
-- +goose Up -- +goose Up
ALTER TABLE logs ALTER TABLE full_sync_logs
DROP CONSTRAINT log_uc; DROP CONSTRAINT full_sync_log_uc;
ALTER TABLE logs ALTER TABLE full_sync_logs
ADD COLUMN receipt_id INT; ADD COLUMN receipt_id INT;
ALTER TABLE logs ALTER TABLE full_sync_logs
ADD CONSTRAINT receipts_fk ADD CONSTRAINT receipts_fk
FOREIGN KEY (receipt_id) FOREIGN KEY (receipt_id)
REFERENCES full_sync_receipts (id) REFERENCES full_sync_receipts (id)
@ -13,11 +13,11 @@ ON DELETE CASCADE;
-- +goose Down -- +goose Down
ALTER TABLE logs ALTER TABLE full_sync_logs
DROP CONSTRAINT receipts_fk; DROP CONSTRAINT receipts_fk;
ALTER TABLE logs ALTER TABLE full_sync_logs
DROP COLUMN receipt_id; DROP COLUMN receipt_id;
ALTER TABLE logs ALTER TABLE full_sync_logs
ADD CONSTRAINT log_uc UNIQUE (block_number, index); ADD CONSTRAINT full_sync_log_uc UNIQUE (block_number, index);

View File

@ -3,31 +3,31 @@ CREATE VIEW block_stats AS
SELECT SELECT
max(block_number) AS max_block, max(block_number) AS max_block,
min(block_number) AS min_block min(block_number) AS min_block
FROM logs; FROM full_sync_logs;
CREATE VIEW watched_event_logs AS CREATE VIEW watched_event_logs AS
SELECT SELECT
log_filters.name, log_filters.name,
logs.id, full_sync_logs.id,
block_number, block_number,
logs.address, full_sync_logs.address,
tx_hash, tx_hash,
index, index,
logs.topic0, full_sync_logs.topic0,
logs.topic1, full_sync_logs.topic1,
logs.topic2, full_sync_logs.topic2,
logs.topic3, full_sync_logs.topic3,
data, data,
receipt_id receipt_id
FROM log_filters FROM log_filters
CROSS JOIN block_stats CROSS JOIN block_stats
JOIN logs ON logs.address = log_filters.address JOIN full_sync_logs ON full_sync_logs.address = log_filters.address
AND logs.block_number >= coalesce(log_filters.from_block, block_stats.min_block) AND full_sync_logs.block_number >= coalesce(log_filters.from_block, block_stats.min_block)
AND logs.block_number <= coalesce(log_filters.to_block, block_stats.max_block) AND full_sync_logs.block_number <= coalesce(log_filters.to_block, block_stats.max_block)
WHERE (log_filters.topic0 = logs.topic0 OR log_filters.topic0 ISNULL) WHERE (log_filters.topic0 = full_sync_logs.topic0 OR log_filters.topic0 ISNULL)
AND (log_filters.topic1 = logs.topic1 OR log_filters.topic1 ISNULL) AND (log_filters.topic1 = full_sync_logs.topic1 OR log_filters.topic1 ISNULL)
AND (log_filters.topic2 = logs.topic2 OR log_filters.topic2 ISNULL) AND (log_filters.topic2 = full_sync_logs.topic2 OR log_filters.topic2 ISNULL)
AND (log_filters.topic3 = logs.topic3 OR log_filters.topic3 ISNULL); AND (log_filters.topic3 = full_sync_logs.topic3 OR log_filters.topic3 ISNULL);
-- +goose Down -- +goose Down
DROP VIEW watched_event_logs; DROP VIEW watched_event_logs;

View File

@ -2,8 +2,8 @@
-- PostgreSQL database dump -- PostgreSQL database dump
-- --
-- Dumped from database version 11.3 -- Dumped from database version 11.4
-- Dumped by pg_dump version 11.3 -- Dumped by pg_dump version 11.4
SET statement_timeout = 0; SET statement_timeout = 0;
SET lock_timeout = 0; SET lock_timeout = 0;
@ -51,10 +51,10 @@ ALTER SEQUENCE public.addresses_id_seq OWNED BY public.addresses.id;
-- --
-- Name: logs; Type: TABLE; Schema: public; Owner: - -- Name: full_sync_logs; Type: TABLE; Schema: public; Owner: -
-- --
CREATE TABLE public.logs ( CREATE TABLE public.full_sync_logs (
id integer NOT NULL, id integer NOT NULL,
block_number bigint, block_number bigint,
address character varying(66), address character varying(66),
@ -74,9 +74,9 @@ CREATE TABLE public.logs (
-- --
CREATE VIEW public.block_stats AS CREATE VIEW public.block_stats AS
SELECT max(logs.block_number) AS max_block, SELECT max(full_sync_logs.block_number) AS max_block,
min(logs.block_number) AS min_block min(full_sync_logs.block_number) AS min_block
FROM public.logs; FROM public.full_sync_logs;
-- --
@ -168,6 +168,26 @@ CREATE TABLE public.eth_nodes (
); );
--
-- Name: full_sync_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.full_sync_logs_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: full_sync_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.full_sync_logs_id_seq OWNED BY public.full_sync_logs.id;
-- --
-- Name: full_sync_receipts; Type: TABLE; Schema: public; Owner: - -- Name: full_sync_receipts; Type: TABLE; Schema: public; Owner: -
-- --
@ -429,26 +449,6 @@ CREATE SEQUENCE public.log_filters_id_seq
ALTER SEQUENCE public.log_filters_id_seq OWNED BY public.log_filters.id; ALTER SEQUENCE public.log_filters_id_seq OWNED BY public.log_filters.id;
--
-- Name: logs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.logs_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.logs_id_seq OWNED BY public.logs.id;
-- --
-- Name: nodes_id_seq; Type: SEQUENCE; Schema: public; Owner: - -- Name: nodes_id_seq; Type: SEQUENCE; Schema: public; Owner: -
-- --
@ -577,21 +577,21 @@ ALTER SEQUENCE public.watched_contracts_contract_id_seq OWNED BY public.watched_
CREATE VIEW public.watched_event_logs AS CREATE VIEW public.watched_event_logs AS
SELECT log_filters.name, SELECT log_filters.name,
logs.id, full_sync_logs.id,
logs.block_number, full_sync_logs.block_number,
logs.address, full_sync_logs.address,
logs.tx_hash, full_sync_logs.tx_hash,
logs.index, full_sync_logs.index,
logs.topic0, full_sync_logs.topic0,
logs.topic1, full_sync_logs.topic1,
logs.topic2, full_sync_logs.topic2,
logs.topic3, full_sync_logs.topic3,
logs.data, full_sync_logs.data,
logs.receipt_id full_sync_logs.receipt_id
FROM ((public.log_filters FROM ((public.log_filters
CROSS JOIN public.block_stats) CROSS JOIN public.block_stats)
JOIN public.logs ON ((((logs.address)::text = (log_filters.address)::text) AND (logs.block_number >= COALESCE(log_filters.from_block, block_stats.min_block)) AND (logs.block_number <= COALESCE(log_filters.to_block, block_stats.max_block))))) JOIN public.full_sync_logs ON ((((full_sync_logs.address)::text = (log_filters.address)::text) AND (full_sync_logs.block_number >= COALESCE(log_filters.from_block, block_stats.min_block)) AND (full_sync_logs.block_number <= COALESCE(log_filters.to_block, block_stats.max_block)))))
WHERE ((((log_filters.topic0)::text = (logs.topic0)::text) OR (log_filters.topic0 IS NULL)) AND (((log_filters.topic1)::text = (logs.topic1)::text) OR (log_filters.topic1 IS NULL)) AND (((log_filters.topic2)::text = (logs.topic2)::text) OR (log_filters.topic2 IS NULL)) AND (((log_filters.topic3)::text = (logs.topic3)::text) OR (log_filters.topic3 IS NULL))); WHERE ((((log_filters.topic0)::text = (full_sync_logs.topic0)::text) OR (log_filters.topic0 IS NULL)) AND (((log_filters.topic1)::text = (full_sync_logs.topic1)::text) OR (log_filters.topic1 IS NULL)) AND (((log_filters.topic2)::text = (full_sync_logs.topic2)::text) OR (log_filters.topic2 IS NULL)) AND (((log_filters.topic3)::text = (full_sync_logs.topic3)::text) OR (log_filters.topic3 IS NULL)));
-- --
@ -622,6 +622,13 @@ ALTER TABLE ONLY public.checked_headers ALTER COLUMN id SET DEFAULT nextval('pub
ALTER TABLE ONLY public.eth_nodes ALTER COLUMN id SET DEFAULT nextval('public.nodes_id_seq'::regclass); ALTER TABLE ONLY public.eth_nodes ALTER COLUMN id SET DEFAULT nextval('public.nodes_id_seq'::regclass);
--
-- Name: full_sync_logs id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.full_sync_logs ALTER COLUMN id SET DEFAULT nextval('public.full_sync_logs_id_seq'::regclass);
-- --
-- Name: full_sync_receipts id; Type: DEFAULT; Schema: public; Owner: - -- Name: full_sync_receipts id; Type: DEFAULT; Schema: public; Owner: -
-- --
@ -671,13 +678,6 @@ ALTER TABLE ONLY public.headers ALTER COLUMN id SET DEFAULT nextval('public.head
ALTER TABLE ONLY public.log_filters ALTER COLUMN id SET DEFAULT nextval('public.log_filters_id_seq'::regclass); ALTER TABLE ONLY public.log_filters ALTER COLUMN id SET DEFAULT nextval('public.log_filters_id_seq'::regclass);
--
-- Name: logs id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.logs ALTER COLUMN id SET DEFAULT nextval('public.logs_id_seq'::regclass);
-- --
-- Name: queued_storage id; Type: DEFAULT; Schema: public; Owner: - -- Name: queued_storage id; Type: DEFAULT; Schema: public; Owner: -
-- --
@ -755,6 +755,14 @@ ALTER TABLE ONLY public.eth_nodes
ADD CONSTRAINT eth_node_uc UNIQUE (genesis_block, network_id, eth_node_id); ADD CONSTRAINT eth_node_uc UNIQUE (genesis_block, network_id, eth_node_id);
--
-- Name: full_sync_logs full_sync_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.full_sync_logs
ADD CONSTRAINT full_sync_logs_pkey PRIMARY KEY (id);
-- --
-- Name: full_sync_receipts full_sync_receipts_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- Name: full_sync_receipts full_sync_receipts_pkey; Type: CONSTRAINT; Schema: public; Owner: -
-- --
@ -819,14 +827,6 @@ ALTER TABLE ONLY public.headers
ADD CONSTRAINT headers_pkey PRIMARY KEY (id); ADD CONSTRAINT headers_pkey PRIMARY KEY (id);
--
-- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.logs
ADD CONSTRAINT logs_pkey PRIMARY KEY (id);
-- --
-- Name: log_filters name_uc; Type: CONSTRAINT; Schema: public; Owner: - -- Name: log_filters name_uc; Type: CONSTRAINT; Schema: public; Owner: -
-- --
@ -1014,10 +1014,10 @@ ALTER TABLE ONLY public.blocks
-- --
-- Name: logs receipts_fk; Type: FK CONSTRAINT; Schema: public; Owner: - -- Name: full_sync_logs receipts_fk; Type: FK CONSTRAINT; Schema: public; Owner: -
-- --
ALTER TABLE ONLY public.logs ALTER TABLE ONLY public.full_sync_logs
ADD CONSTRAINT receipts_fk FOREIGN KEY (receipt_id) REFERENCES public.full_sync_receipts(id) ON DELETE CASCADE; ADD CONSTRAINT receipts_fk FOREIGN KEY (receipt_id) REFERENCES public.full_sync_receipts(id) ON DELETE CASCADE;

View File

@ -81,7 +81,7 @@ func (r *blockRetriever) retrieveFirstBlockFromLogs(contractAddr string) (int64,
var firstBlock int var firstBlock int
err := r.db.Get( err := r.db.Get(
&firstBlock, &firstBlock,
"SELECT block_number FROM logs WHERE lower(address) = $1 ORDER BY block_number ASC LIMIT 1", "SELECT block_number FROM full_sync_logs WHERE lower(address) = $1 ORDER BY block_number ASC LIMIT 1",
contractAddr, contractAddr,
) )

View File

@ -149,7 +149,7 @@ func SetupTusdRepo(vulcanizeLogId *int64, wantedEvents, wantedMethods []string)
err = receiptRepository.CreateReceiptsAndLogs(blockId, receipts) err = receiptRepository.CreateReceiptsAndLogs(blockId, receipts)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = logRepository.Get(vulcanizeLogId, `SELECT id FROM logs`) err = logRepository.Get(vulcanizeLogId, `SELECT id FROM full_sync_logs`)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
info := SetupTusdContract(wantedEvents, wantedMethods) info := SetupTusdContract(wantedEvents, wantedMethods)
@ -195,7 +195,7 @@ func SetupENSRepo(vulcanizeLogId *int64, wantedEvents, wantedMethods []string) (
err = receiptRepository.CreateReceiptsAndLogs(blockId, receipts) err = receiptRepository.CreateReceiptsAndLogs(blockId, receipts)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = logRepository.Get(vulcanizeLogId, `SELECT id FROM logs`) err = logRepository.Get(vulcanizeLogId, `SELECT id FROM full_sync_logs`)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
info := SetupENSContract(wantedEvents, wantedMethods) info := SetupENSContract(wantedEvents, wantedMethods)
@ -234,7 +234,7 @@ func TearDown(db *postgres.DB) {
_, err = tx.Exec(`DELETE FROM headers`) _, err = tx.Exec(`DELETE FROM headers`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM logs`) _, err = tx.Exec(`DELETE FROM full_sync_logs`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM log_filters`) _, err = tx.Exec(`DELETE FROM log_filters`)

View File

@ -231,7 +231,7 @@ func (r *eventRepository) newEventTable(tableID string, event types.Event) error
for _, field := range event.Fields { for _, field := range event.Fields {
pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType) pgStr = pgStr + fmt.Sprintf(" %s_ %s NOT NULL,", strings.ToLower(field.Name), field.PgType)
} }
pgStr = pgStr + " CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES logs (id) ON DELETE CASCADE)" pgStr = pgStr + " CONSTRAINT log_index_fk FOREIGN KEY (vulcanize_log_id) REFERENCES full_sync_logs (id) ON DELETE CASCADE)"
case types.HeaderSync: case types.HeaderSync:
pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL," pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL,"

View File

@ -271,7 +271,7 @@ func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, b
func (blockRepository BlockRepository) createLogs(tx *sqlx.Tx, logs []core.Log, receiptId int64) error { func (blockRepository BlockRepository) createLogs(tx *sqlx.Tx, logs []core.Log, receiptId int64) error {
for _, tlog := range logs { for _, tlog := range logs {
_, err := tx.Exec( _, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) `INSERT INTO full_sync_logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`, `,
tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data, receiptId, tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data, receiptId,

View File

@ -56,7 +56,7 @@ func (receiptRepository FullSyncReceiptRepository) CreateReceiptsAndLogs(blockId
func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error { func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error {
for _, log := range logs { for _, log := range logs {
_, err := tx.Exec( _, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) `INSERT INTO full_sync_logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`, `,
log.BlockNumber, log.Address, log.TxHash, log.Index, log.Topics[0], log.Topics[1], log.Topics[2], log.Topics[3], log.Data, receiptId, log.BlockNumber, log.Address, log.TxHash, log.Index, log.Topics[0], log.Topics[1], log.Topics[2], log.Topics[3], log.Data, receiptId,

View File

@ -33,7 +33,7 @@ func (logRepository LogRepository) CreateLogs(lgs []core.Log, receiptId int64) e
tx, _ := logRepository.DB.Beginx() tx, _ := logRepository.DB.Beginx()
for _, tlog := range lgs { for _, tlog := range lgs {
_, insertLogErr := tx.Exec( _, insertLogErr := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) `INSERT INTO full_sync_logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`, `,
tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data, receiptId, tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data, receiptId,
@ -68,7 +68,7 @@ func (logRepository LogRepository) GetLogs(address string, blockNumber int64) ([
topic2, topic2,
topic3, topic3,
data data
FROM logs FROM full_sync_logs
WHERE address = $1 AND block_number = $2 WHERE address = $1 AND block_number = $2
ORDER BY block_number DESC`, address, blockNumber) ORDER BY block_number DESC`, address, blockNumber)
if err != nil { if err != nil {

View File

@ -109,15 +109,15 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM blocks") db.MustExec("DELETE FROM blocks")
db.MustExec("DELETE FROM checked_headers") db.MustExec("DELETE FROM checked_headers")
// can't delete from eth_nodes since this function is called after the required eth_node is persisted // can't delete from eth_nodes since this function is called after the required eth_node is persisted
db.MustExec("DELETE FROM full_sync_logs")
db.MustExec("DELETE FROM full_sync_receipts")
db.MustExec("DELETE FROM full_sync_transactions") db.MustExec("DELETE FROM full_sync_transactions")
db.MustExec("DELETE FROM goose_db_version") db.MustExec("DELETE FROM goose_db_version")
db.MustExec("DELETE FROM headers")
db.MustExec("DELETE FROM header_sync_transactions")
db.MustExec("DELETE FROM log_filters")
db.MustExec("DELETE FROM logs")
db.MustExec("DELETE FROM queued_storage")
db.MustExec("DELETE FROM full_sync_receipts")
db.MustExec("DELETE FROM header_sync_receipts") db.MustExec("DELETE FROM header_sync_receipts")
db.MustExec("DELETE FROM header_sync_transactions")
db.MustExec("DELETE FROM headers")
db.MustExec("DELETE FROM log_filters")
db.MustExec("DELETE FROM queued_storage")
db.MustExec("DELETE FROM watched_contracts") db.MustExec("DELETE FROM watched_contracts")
} }