Reference header sync logs address via foreign key

This commit is contained in:
Rob Mulholand 2019-08-28 10:41:34 -05:00
parent 5ac76eee74
commit b9f3b9f946
6 changed files with 63 additions and 25 deletions

View File

@ -4,7 +4,7 @@ CREATE TABLE header_sync_logs
( (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
address VARCHAR(66), address INTEGER NOT NULL REFERENCES addresses (id) ON DELETE CASCADE,
topics BYTEA[], topics BYTEA[],
data BYTEA, data BYTEA,
block_number BIGINT, block_number BIGINT,

View File

@ -334,7 +334,7 @@ ALTER SEQUENCE public.goose_db_version_id_seq OWNED BY public.goose_db_version.i
CREATE TABLE public.header_sync_logs ( CREATE TABLE public.header_sync_logs (
id integer NOT NULL, id integer NOT NULL,
header_id integer NOT NULL, header_id integer NOT NULL,
address character varying(66), address integer NOT NULL,
topics bytea[], topics bytea[],
data bytea, data bytea,
block_number bigint, block_number bigint,
@ -1075,6 +1075,14 @@ ALTER TABLE ONLY public.full_sync_transactions
ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE; ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE;
--
-- Name: header_sync_logs header_sync_logs_address_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.header_sync_logs
ADD CONSTRAINT header_sync_logs_address_fkey FOREIGN KEY (address) REFERENCES public.addresses(id) ON DELETE CASCADE;
-- --
-- Name: header_sync_logs header_sync_logs_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- Name: header_sync_logs header_sync_logs_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
-- --

View File

@ -34,13 +34,18 @@ var _ = Describe("Checked Headers repository", func() {
repo datastore.CheckedHeadersRepository repo datastore.CheckedHeadersRepository
) )
Describe("MarkHeaderChecked", func() { BeforeEach(func() {
BeforeEach(func() { db = test_config.NewTestDB(test_config.NewTestNode())
db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db)
test_config.CleanTestDB(db) repo = repositories.NewCheckedHeadersRepository(db)
repo = repositories.NewCheckedHeadersRepository(db) })
})
AfterEach(func() {
closeErr := db.Close()
Expect(closeErr).NotTo(HaveOccurred())
})
Describe("MarkHeaderChecked", func() {
It("marks passed header as checked on insert", func() { It("marks passed header as checked on insert", func() {
headerRepository := repositories.NewHeaderRepository(db) headerRepository := repositories.NewHeaderRepository(db)
headerID, headerErr := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) headerID, headerErr := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader)
@ -129,10 +134,7 @@ var _ = Describe("Checked Headers repository", func() {
) )
BeforeEach(func() { BeforeEach(func() {
db = test_config.NewTestDB(test_config.NewTestNode())
test_config.CleanTestDB(db)
headerRepository = repositories.NewHeaderRepository(db) headerRepository = repositories.NewHeaderRepository(db)
repo = repositories.NewCheckedHeadersRepository(db)
startingBlockNumber = rand.Int63() startingBlockNumber = rand.Int63()
middleBlockNumber = startingBlockNumber + 1 middleBlockNumber = startingBlockNumber + 1

View File

@ -42,6 +42,11 @@ var _ = Describe("Checked logs repository", func() {
repository = repositories.NewCheckedLogsRepository(db) repository = repositories.NewCheckedLogsRepository(db)
}) })
AfterEach(func() {
closeErr := db.Close()
Expect(closeErr).NotTo(HaveOccurred())
})
Describe("HaveLogsBeenChecked", func() { Describe("HaveLogsBeenChecked", func() {
It("returns true if all addresses and the topic0 are already present in the db", func() { It("returns true if all addresses and the topic0 are already present in the db", func() {
_, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero) _, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero)

View File

@ -31,17 +31,21 @@ const insertHeaderSyncLogQuery = `INSERT INTO header_sync_logs
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT DO NOTHING` VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT DO NOTHING`
type HeaderSyncLogRepository struct { type HeaderSyncLogRepository struct {
db *postgres.DB db *postgres.DB
addressRepository AddressRepository
} }
func NewHeaderSyncLogRepository(db *postgres.DB) HeaderSyncLogRepository { func NewHeaderSyncLogRepository(db *postgres.DB) HeaderSyncLogRepository {
return HeaderSyncLogRepository{db: db} return HeaderSyncLogRepository{
db: db,
addressRepository: AddressRepository{},
}
} }
type headerSyncLog struct { type headerSyncLog struct {
ID int64 ID int64
HeaderID int64 `db:"header_id"` HeaderID int64 `db:"header_id"`
Address string Address int64
Topics pq.ByteaArray Topics pq.ByteaArray
Data []byte Data []byte
BlockNumber uint64 `db:"block_number"` BlockNumber uint64 `db:"block_number"`
@ -70,8 +74,12 @@ func (repository HeaderSyncLogRepository) GetUntransformedHeaderSyncLogs() ([]co
for _, topic := range rawLog.Topics { for _, topic := range rawLog.Topics {
logTopics = append(logTopics, common.BytesToHash(topic)) logTopics = append(logTopics, common.BytesToHash(topic))
} }
address, addrErr := repository.addressRepository.GetAddressById(repository.db, rawLog.Address)
if addrErr != nil {
return nil, addrErr
}
reconstructedLog := types.Log{ reconstructedLog := types.Log{
Address: common.HexToAddress(rawLog.Address), Address: common.HexToAddress(address),
Topics: logTopics, Topics: logTopics,
Data: rawLog.Data, Data: rawLog.Data,
BlockNumber: rawLog.BlockNumber, BlockNumber: rawLog.BlockNumber,
@ -102,7 +110,7 @@ func (repository HeaderSyncLogRepository) CreateHeaderSyncLogs(headerID int64, l
return txErr return txErr
} }
for _, log := range logs { for _, log := range logs {
err := insertLog(headerID, log, tx) err := repository.insertLog(headerID, log, tx)
if err != nil { if err != nil {
rollbackErr := tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil { if rollbackErr != nil {
@ -114,13 +122,17 @@ func (repository HeaderSyncLogRepository) CreateHeaderSyncLogs(headerID int64, l
return tx.Commit() return tx.Commit()
} }
func insertLog(headerID int64, log types.Log, tx *sqlx.Tx) error { func (repository HeaderSyncLogRepository) insertLog(headerID int64, log types.Log, tx *sqlx.Tx) error {
topics := buildTopics(log) topics := buildTopics(log)
raw, jsonErr := log.MarshalJSON() raw, jsonErr := log.MarshalJSON()
if jsonErr != nil { if jsonErr != nil {
return jsonErr return jsonErr
} }
_, insertErr := tx.Exec(insertHeaderSyncLogQuery, headerID, log.Address.Hex(), topics, log.Data, log.BlockNumber, addressID, addrErr := repository.addressRepository.GetOrCreateAddressInTransaction(tx, log.Address.Hex())
if addrErr != nil {
return addrErr
}
_, insertErr := tx.Exec(insertHeaderSyncLogQuery, headerID, addressID, topics, log.Data, log.BlockNumber,
log.BlockHash.Hex(), log.TxIndex, log.TxHash.Hex(), log.Index, raw) log.BlockHash.Hex(), log.TxIndex, log.TxHash.Hex(), log.Index, raw)
return insertErr return insertErr
} }

View File

@ -47,11 +47,16 @@ var _ = Describe("Header sync log repository", func() {
repository = repositories.NewHeaderSyncLogRepository(db) repository = repositories.NewHeaderSyncLogRepository(db)
}) })
AfterEach(func() {
closeErr := db.Close()
Expect(closeErr).NotTo(HaveOccurred())
})
Describe("CreateHeaderSyncLogs", func() { Describe("CreateHeaderSyncLogs", func() {
type HeaderSyncLog struct { type headerSyncLog struct {
ID int64 ID int64
HeaderID int64 `db:"header_id"` HeaderID int64 `db:"header_id"`
Address string Address int64
Topics pq.ByteaArray Topics pq.ByteaArray
Data []byte Data []byte
BlockNumber uint64 `db:"block_number"` BlockNumber uint64 `db:"block_number"`
@ -69,12 +74,15 @@ var _ = Describe("Header sync log repository", func() {
err := repository.CreateHeaderSyncLogs(headerID, []types.Log{log}) err := repository.CreateHeaderSyncLogs(headerID, []types.Log{log})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
var dbLog HeaderSyncLog var dbLog headerSyncLog
lookupErr := db.Get(&dbLog, `SELECT * FROM header_sync_logs`) lookupErr := db.Get(&dbLog, `SELECT * FROM header_sync_logs`)
Expect(lookupErr).NotTo(HaveOccurred()) Expect(lookupErr).NotTo(HaveOccurred())
Expect(dbLog.ID).NotTo(BeZero()) Expect(dbLog.ID).NotTo(BeZero())
Expect(dbLog.HeaderID).To(Equal(headerID)) Expect(dbLog.HeaderID).To(Equal(headerID))
Expect(dbLog.Address).To(Equal(log.Address.Hex())) addressRepository := repositories.AddressRepository{}
actualAddress, addressErr := addressRepository.GetAddressById(db, dbLog.Address)
Expect(addressErr).NotTo(HaveOccurred())
Expect(actualAddress).To(Equal(log.Address.Hex()))
Expect(dbLog.Topics[0]).To(Equal(log.Topics[0].Bytes())) Expect(dbLog.Topics[0]).To(Equal(log.Topics[0].Bytes()))
Expect(dbLog.Topics[1]).To(Equal(log.Topics[1].Bytes())) Expect(dbLog.Topics[1]).To(Equal(log.Topics[1].Bytes()))
Expect(dbLog.Data).To(Equal(log.Data)) Expect(dbLog.Data).To(Equal(log.Data))
@ -111,7 +119,7 @@ var _ = Describe("Header sync log repository", func() {
err := repository.CreateHeaderSyncLogs(headerID, []types.Log{log}) err := repository.CreateHeaderSyncLogs(headerID, []types.Log{log})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
var dbLog HeaderSyncLog var dbLog headerSyncLog
lookupErr := db.Get(&dbLog, `SELECT * FROM header_sync_logs`) lookupErr := db.Get(&dbLog, `SELECT * FROM header_sync_logs`)
Expect(lookupErr).NotTo(HaveOccurred()) Expect(lookupErr).NotTo(HaveOccurred())
@ -120,8 +128,11 @@ var _ = Describe("Header sync log repository", func() {
logTopics = append(logTopics, common.BytesToHash(topic)) logTopics = append(logTopics, common.BytesToHash(topic))
} }
addressRepository := repositories.AddressRepository{}
actualAddress, addressErr := addressRepository.GetAddressById(db, dbLog.Address)
Expect(addressErr).NotTo(HaveOccurred())
reconstructedLog := types.Log{ reconstructedLog := types.Log{
Address: common.HexToAddress(dbLog.Address), Address: common.HexToAddress(actualAddress),
Topics: logTopics, Topics: logTopics,
Data: dbLog.Data, Data: dbLog.Data,
BlockNumber: dbLog.BlockNumber, BlockNumber: dbLog.BlockNumber,
@ -147,7 +158,7 @@ var _ = Describe("Header sync log repository", func() {
}) })
}) })
Describe("GetFullSyncLogs", func() { Describe("GetUntransformedHeaderSyncLogs", func() {
Describe("when there are no logs", func() { Describe("when there are no logs", func() {
It("returns empty collection", func() { It("returns empty collection", func() {
result, err := repository.GetUntransformedHeaderSyncLogs() result, err := repository.GetUntransformedHeaderSyncLogs()