diff --git a/db/migrations/00025_create_light_sync_transactions_table.sql b/db/migrations/00025_create_light_sync_transactions_table.sql new file mode 100644 index 00000000..f72f503e --- /dev/null +++ b/db/migrations/00025_create_light_sync_transactions_table.sql @@ -0,0 +1,14 @@ +-- +goose Up +CREATE TABLE light_sync_transactions ( + id SERIAL PRIMARY KEY, + header_id INTEGER NOT NULL REFERENCES headers(id) ON DELETE CASCADE, + hash TEXT, + raw JSONB, + tx_index INTEGER, + tx_from TEXT, + tx_to TEXT, + UNIQUE (header_id, hash) +); + +-- +goose Down +DROP TABLE light_sync_transactions; diff --git a/db/schema.sql b/db/schema.sql index d6a05de6..d5496079 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -258,6 +258,41 @@ CREATE SEQUENCE public.headers_id_seq ALTER SEQUENCE public.headers_id_seq OWNED BY public.headers.id; +-- +-- Name: light_sync_transactions; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.light_sync_transactions ( + id integer NOT NULL, + header_id integer NOT NULL, + hash text, + raw jsonb, + tx_index integer, + tx_from text, + tx_to text +); + + +-- +-- Name: light_sync_transactions_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.light_sync_transactions_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: light_sync_transactions_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.light_sync_transactions_id_seq OWNED BY public.light_sync_transactions.id; + + -- -- Name: log_filters; Type: TABLE; Schema: public; Owner: - -- @@ -504,6 +539,13 @@ ALTER TABLE ONLY public.goose_db_version ALTER COLUMN id SET DEFAULT nextval('pu ALTER TABLE ONLY public.headers ALTER COLUMN id SET DEFAULT nextval('public.headers_id_seq'::regclass); +-- +-- Name: light_sync_transactions id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.light_sync_transactions ALTER COLUMN id SET DEFAULT nextval('public.light_sync_transactions_id_seq'::regclass); + + -- -- Name: log_filters id; Type: DEFAULT; Schema: public; Owner: - -- @@ -603,6 +645,22 @@ ALTER TABLE ONLY public.headers ADD CONSTRAINT headers_pkey PRIMARY KEY (id); +-- +-- Name: light_sync_transactions light_sync_transactions_header_id_hash_key; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.light_sync_transactions + ADD CONSTRAINT light_sync_transactions_header_id_hash_key UNIQUE (header_id, hash); + + +-- +-- Name: light_sync_transactions light_sync_transactions_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.light_sync_transactions + ADD CONSTRAINT light_sync_transactions_pkey PRIMARY KEY (id); + + -- -- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -733,6 +791,14 @@ ALTER TABLE ONLY public.full_sync_transactions ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE; +-- +-- Name: light_sync_transactions light_sync_transactions_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.light_sync_transactions + ADD CONSTRAINT light_sync_transactions_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; + + -- -- Name: blocks node_fk; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/pkg/contract_watcher/shared/helpers/test_helpers/database.go b/pkg/contract_watcher/shared/helpers/test_helpers/database.go index c33a304e..8459c21c 100644 --- a/pkg/contract_watcher/shared/helpers/test_helpers/database.go +++ b/pkg/contract_watcher/shared/helpers/test_helpers/database.go @@ -240,6 +240,9 @@ func TearDown(db *postgres.DB) { _, err = tx.Exec(`DELETE FROM full_sync_transactions`) Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec("DELETE FROM light_sync_transactions") + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM receipts`) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/datastore/postgres/repositories/header_repository.go b/pkg/datastore/postgres/repositories/header_repository.go index 83126e1f..1f886003 100644 --- a/pkg/datastore/postgres/repositories/header_repository.go +++ b/pkg/datastore/postgres/repositories/header_repository.go @@ -49,11 +49,21 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int return 0, ErrValidHeaderExists } +func (repository HeaderRepository) CreateTransaction(headerID int64, transaction core.Transaction) error { + _, err := repository.database.Exec(`INSERT INTO public.light_sync_transactions + (header_id, hash, tx_to, tx_from, tx_index) VALUES ($1, $2, $3, $4, $5) + ON CONFLICT DO NOTHING`, + headerID, transaction.Hash, transaction.To, transaction.From, transaction.TxIndex) + return err +} + func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { var header core.Header err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, blockNumber, repository.database.Node.ID) - log.Error("GetHeader: error getting headers: ", err) + if err != nil { + log.Error("GetHeader: error getting headers: ", err) + } return header, err } diff --git a/pkg/datastore/postgres/repositories/header_repository_test.go b/pkg/datastore/postgres/repositories/header_repository_test.go index 17dd5dbb..d2dbd33c 100644 --- a/pkg/datastore/postgres/repositories/header_repository_test.go +++ b/pkg/datastore/postgres/repositories/header_repository_test.go @@ -180,6 +180,57 @@ var _ = Describe("Block header repository", func() { }) }) + Describe("creating a transaction", func() { + It("adds a transaction", func() { + headerID, err := repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + fromAddress := common.HexToAddress("0x1234") + toAddress := common.HexToAddress("0x5678") + txHash := common.HexToHash("0x9876") + txIndex := big.NewInt(123) + transaction := core.Transaction{ + From: fromAddress.Hex(), + Hash: txHash.Hex(), + To: toAddress.Hex(), + TxIndex: txIndex.Int64(), + } + + insertErr := repo.CreateTransaction(headerID, transaction) + + Expect(insertErr).NotTo(HaveOccurred()) + var dbTransaction core.Transaction + err = db.Get(&dbTransaction, `SELECT hash, tx_from, tx_to, tx_index FROM public.light_sync_transactions WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(dbTransaction).To(Equal(transaction)) + }) + + It("silently ignores duplicate inserts", func() { + headerID, err := repo.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + fromAddress := common.HexToAddress("0x1234") + toAddress := common.HexToAddress("0x5678") + txHash := common.HexToHash("0x9876") + txIndex := big.NewInt(123) + transaction := core.Transaction{ + From: fromAddress.Hex(), + Hash: txHash.Hex(), + To: toAddress.Hex(), + TxIndex: txIndex.Int64(), + } + + insertErr := repo.CreateTransaction(headerID, transaction) + Expect(insertErr).NotTo(HaveOccurred()) + + insertTwoErr := repo.CreateTransaction(headerID, transaction) + Expect(insertTwoErr).NotTo(HaveOccurred()) + + var dbTransactions []core.Transaction + err = db.Select(&dbTransactions, `SELECT hash, tx_from, tx_to, tx_index FROM public.light_sync_transactions WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(len(dbTransactions)).To(Equal(1)) + }) + }) + Describe("Getting a header", func() { It("returns header if it exists", func() { _, err = repo.CreateOrUpdateHeader(header) diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index 7f942051..b26bb8ce 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -55,6 +55,7 @@ type FilterRepository interface { type HeaderRepository interface { CreateOrUpdateHeader(header core.Header) (int64, error) + CreateTransaction(headerID int64, transaction core.Transaction) error GetHeader(blockNumber int64) (core.Header, error) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) } diff --git a/pkg/fakes/mock_header_repository.go b/pkg/fakes/mock_header_repository.go index 42a21f5c..ef196949 100644 --- a/pkg/fakes/mock_header_repository.go +++ b/pkg/fakes/mock_header_repository.go @@ -56,6 +56,10 @@ func (repository *MockHeaderRepository) CreateOrUpdateHeader(header core.Header) return repository.createOrUpdateHeaderReturnID, repository.createOrUpdateHeaderErr } +func (repository *MockHeaderRepository) CreateTransaction(headerID int64, transaction core.Transaction) error { + panic("implement me") +} + func (repository *MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { repository.GetHeaderPassedBlockNumber = blockNumber return core.Header{BlockNumber: blockNumber, Hash: repository.getHeaderReturnBlockHash}, repository.getHeaderError diff --git a/pkg/plugin/test_helpers/database.go b/pkg/plugin/test_helpers/database.go index 6fa82ad7..e4b941af 100644 --- a/pkg/plugin/test_helpers/database.go +++ b/pkg/plugin/test_helpers/database.go @@ -69,6 +69,9 @@ func TearDown(db *postgres.DB) { _, err = tx.Exec(`DELETE FROM full_sync_transactions`) Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec("DELETE FROM light_sync_transactions") + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM receipts`) Expect(err).NotTo(HaveOccurred()) diff --git a/test_config/test_config.go b/test_config/test_config.go index 60a131f9..1e2fbdc3 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -111,6 +111,7 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM full_sync_transactions") db.MustExec("DELETE FROM goose_db_version") db.MustExec("DELETE FROM headers") + db.MustExec("DELETE FROM light_sync_transactions") db.MustExec("DELETE FROM log_filters") db.MustExec("DELETE FROM logs") db.MustExec("DELETE FROM queued_storage")