diff --git a/db/migrations/00029_create_header_sync_logs_table.sql b/db/migrations/00029_create_header_sync_logs_table.sql index 83313ca7..4a6ec73b 100644 --- a/db/migrations/00029_create_header_sync_logs_table.sql +++ b/db/migrations/00029_create_header_sync_logs_table.sql @@ -4,7 +4,7 @@ CREATE TABLE header_sync_logs ( id SERIAL PRIMARY KEY, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, - address VARCHAR(66), + address INTEGER NOT NULL REFERENCES addresses (id) ON DELETE CASCADE, topics BYTEA[], data BYTEA, block_number BIGINT, diff --git a/db/schema.sql b/db/schema.sql index f78a8d1c..0a5bb3a1 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -334,7 +334,7 @@ ALTER SEQUENCE public.goose_db_version_id_seq OWNED BY public.goose_db_version.i CREATE TABLE public.header_sync_logs ( id integer NOT NULL, header_id integer NOT NULL, - address character varying(66), + address integer NOT NULL, topics bytea[], data bytea, block_number bigint, @@ -1075,6 +1075,14 @@ ALTER TABLE ONLY public.full_sync_transactions ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE; +-- +-- Name: header_sync_logs header_sync_logs_address_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.header_sync_logs + ADD CONSTRAINT header_sync_logs_address_fkey FOREIGN KEY (address) REFERENCES public.addresses(id) ON DELETE CASCADE; + + -- -- Name: header_sync_logs header_sync_logs_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go index b2c9948a..613432b7 100644 --- a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go +++ b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go @@ -34,13 +34,18 @@ var _ = Describe("Checked Headers repository", func() { repo datastore.CheckedHeadersRepository ) - Describe("MarkHeaderChecked", func() { - BeforeEach(func() { - db = test_config.NewTestDB(test_config.NewTestNode()) - test_config.CleanTestDB(db) - repo = repositories.NewCheckedHeadersRepository(db) - }) + BeforeEach(func() { + db = test_config.NewTestDB(test_config.NewTestNode()) + test_config.CleanTestDB(db) + repo = repositories.NewCheckedHeadersRepository(db) + }) + AfterEach(func() { + closeErr := db.Close() + Expect(closeErr).NotTo(HaveOccurred()) + }) + + Describe("MarkHeaderChecked", func() { It("marks passed header as checked on insert", func() { headerRepository := repositories.NewHeaderRepository(db) headerID, headerErr := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) @@ -129,10 +134,7 @@ var _ = Describe("Checked Headers repository", func() { ) BeforeEach(func() { - db = test_config.NewTestDB(test_config.NewTestNode()) - test_config.CleanTestDB(db) headerRepository = repositories.NewHeaderRepository(db) - repo = repositories.NewCheckedHeadersRepository(db) startingBlockNumber = rand.Int63() middleBlockNumber = startingBlockNumber + 1 diff --git a/pkg/datastore/postgres/repositories/checked_logs_repository_test.go b/pkg/datastore/postgres/repositories/checked_logs_repository_test.go index 351bbcaf..8597d5fe 100644 --- a/pkg/datastore/postgres/repositories/checked_logs_repository_test.go +++ b/pkg/datastore/postgres/repositories/checked_logs_repository_test.go @@ -42,6 +42,11 @@ var _ = Describe("Checked logs repository", func() { repository = repositories.NewCheckedLogsRepository(db) }) + AfterEach(func() { + closeErr := db.Close() + Expect(closeErr).NotTo(HaveOccurred()) + }) + Describe("HaveLogsBeenChecked", func() { It("returns true if all addresses and the topic0 are already present in the db", func() { _, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero) diff --git a/pkg/datastore/postgres/repositories/header_sync_log_repository.go b/pkg/datastore/postgres/repositories/header_sync_log_repository.go index fef7e6b3..90aceab5 100644 --- a/pkg/datastore/postgres/repositories/header_sync_log_repository.go +++ b/pkg/datastore/postgres/repositories/header_sync_log_repository.go @@ -31,17 +31,21 @@ const insertHeaderSyncLogQuery = `INSERT INTO header_sync_logs VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT DO NOTHING` type HeaderSyncLogRepository struct { - db *postgres.DB + db *postgres.DB + addressRepository AddressRepository } func NewHeaderSyncLogRepository(db *postgres.DB) HeaderSyncLogRepository { - return HeaderSyncLogRepository{db: db} + return HeaderSyncLogRepository{ + db: db, + addressRepository: AddressRepository{}, + } } type headerSyncLog struct { ID int64 HeaderID int64 `db:"header_id"` - Address string + Address int64 Topics pq.ByteaArray Data []byte BlockNumber uint64 `db:"block_number"` @@ -70,8 +74,12 @@ func (repository HeaderSyncLogRepository) GetUntransformedHeaderSyncLogs() ([]co for _, topic := range rawLog.Topics { logTopics = append(logTopics, common.BytesToHash(topic)) } + address, addrErr := repository.addressRepository.GetAddressById(repository.db, rawLog.Address) + if addrErr != nil { + return nil, addrErr + } reconstructedLog := types.Log{ - Address: common.HexToAddress(rawLog.Address), + Address: common.HexToAddress(address), Topics: logTopics, Data: rawLog.Data, BlockNumber: rawLog.BlockNumber, @@ -102,7 +110,7 @@ func (repository HeaderSyncLogRepository) CreateHeaderSyncLogs(headerID int64, l return txErr } for _, log := range logs { - err := insertLog(headerID, log, tx) + err := repository.insertLog(headerID, log, tx) if err != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { @@ -114,13 +122,17 @@ func (repository HeaderSyncLogRepository) CreateHeaderSyncLogs(headerID int64, l return tx.Commit() } -func insertLog(headerID int64, log types.Log, tx *sqlx.Tx) error { +func (repository HeaderSyncLogRepository) insertLog(headerID int64, log types.Log, tx *sqlx.Tx) error { topics := buildTopics(log) raw, jsonErr := log.MarshalJSON() if jsonErr != nil { return jsonErr } - _, insertErr := tx.Exec(insertHeaderSyncLogQuery, headerID, log.Address.Hex(), topics, log.Data, log.BlockNumber, + addressID, addrErr := repository.addressRepository.GetOrCreateAddressInTransaction(tx, log.Address.Hex()) + if addrErr != nil { + return addrErr + } + _, insertErr := tx.Exec(insertHeaderSyncLogQuery, headerID, addressID, topics, log.Data, log.BlockNumber, log.BlockHash.Hex(), log.TxIndex, log.TxHash.Hex(), log.Index, raw) return insertErr } diff --git a/pkg/datastore/postgres/repositories/header_sync_log_repository_test.go b/pkg/datastore/postgres/repositories/header_sync_log_repository_test.go index 2891b6ba..5cd062a3 100644 --- a/pkg/datastore/postgres/repositories/header_sync_log_repository_test.go +++ b/pkg/datastore/postgres/repositories/header_sync_log_repository_test.go @@ -47,11 +47,16 @@ var _ = Describe("Header sync log repository", func() { repository = repositories.NewHeaderSyncLogRepository(db) }) + AfterEach(func() { + closeErr := db.Close() + Expect(closeErr).NotTo(HaveOccurred()) + }) + Describe("CreateHeaderSyncLogs", func() { - type HeaderSyncLog struct { + type headerSyncLog struct { ID int64 HeaderID int64 `db:"header_id"` - Address string + Address int64 Topics pq.ByteaArray Data []byte BlockNumber uint64 `db:"block_number"` @@ -69,12 +74,15 @@ var _ = Describe("Header sync log repository", func() { err := repository.CreateHeaderSyncLogs(headerID, []types.Log{log}) Expect(err).NotTo(HaveOccurred()) - var dbLog HeaderSyncLog + var dbLog headerSyncLog lookupErr := db.Get(&dbLog, `SELECT * FROM header_sync_logs`) Expect(lookupErr).NotTo(HaveOccurred()) Expect(dbLog.ID).NotTo(BeZero()) Expect(dbLog.HeaderID).To(Equal(headerID)) - Expect(dbLog.Address).To(Equal(log.Address.Hex())) + addressRepository := repositories.AddressRepository{} + actualAddress, addressErr := addressRepository.GetAddressById(db, dbLog.Address) + Expect(addressErr).NotTo(HaveOccurred()) + Expect(actualAddress).To(Equal(log.Address.Hex())) Expect(dbLog.Topics[0]).To(Equal(log.Topics[0].Bytes())) Expect(dbLog.Topics[1]).To(Equal(log.Topics[1].Bytes())) Expect(dbLog.Data).To(Equal(log.Data)) @@ -111,7 +119,7 @@ var _ = Describe("Header sync log repository", func() { err := repository.CreateHeaderSyncLogs(headerID, []types.Log{log}) Expect(err).NotTo(HaveOccurred()) - var dbLog HeaderSyncLog + var dbLog headerSyncLog lookupErr := db.Get(&dbLog, `SELECT * FROM header_sync_logs`) Expect(lookupErr).NotTo(HaveOccurred()) @@ -120,8 +128,11 @@ var _ = Describe("Header sync log repository", func() { logTopics = append(logTopics, common.BytesToHash(topic)) } + addressRepository := repositories.AddressRepository{} + actualAddress, addressErr := addressRepository.GetAddressById(db, dbLog.Address) + Expect(addressErr).NotTo(HaveOccurred()) reconstructedLog := types.Log{ - Address: common.HexToAddress(dbLog.Address), + Address: common.HexToAddress(actualAddress), Topics: logTopics, Data: dbLog.Data, BlockNumber: dbLog.BlockNumber, @@ -147,7 +158,7 @@ var _ = Describe("Header sync log repository", func() { }) }) - Describe("GetFullSyncLogs", func() { + Describe("GetUntransformedHeaderSyncLogs", func() { Describe("when there are no logs", func() { It("returns empty collection", func() { result, err := repository.GetUntransformedHeaderSyncLogs()