Merge pull request #150 from vulcanize/unique-headers-constraint-v2
Add constraint to prevent duplicate headers
This commit is contained in:
commit
25f9c6c9e3
@ -8,7 +8,8 @@ CREATE TABLE public.headers
|
|||||||
block_timestamp NUMERIC,
|
block_timestamp NUMERIC,
|
||||||
check_count INTEGER NOT NULL DEFAULT 0,
|
check_count INTEGER NOT NULL DEFAULT 0,
|
||||||
eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE,
|
eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE,
|
||||||
eth_node_fingerprint VARCHAR(128)
|
eth_node_fingerprint VARCHAR(128),
|
||||||
|
UNIQUE (block_number, hash, eth_node_fingerprint)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- Index is removed when table is
|
-- Index is removed when table is
|
||||||
|
@ -921,6 +921,14 @@ ALTER TABLE ONLY public.header_sync_transactions
|
|||||||
ADD CONSTRAINT header_sync_transactions_pkey PRIMARY KEY (id);
|
ADD CONSTRAINT header_sync_transactions_pkey PRIMARY KEY (id);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: headers headers_block_number_hash_eth_node_fingerprint_key; Type: CONSTRAINT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.headers
|
||||||
|
ADD CONSTRAINT headers_block_number_hash_eth_node_fingerprint_key UNIQUE (block_number, hash, eth_node_fingerprint);
|
||||||
|
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Name: headers headers_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
-- Name: headers headers_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
||||||
--
|
--
|
||||||
|
@ -41,7 +41,7 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int
|
|||||||
hash, err := repository.getHeaderHash(header)
|
hash, err := repository.getHeaderHash(header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if headerDoesNotExist(err) {
|
if headerDoesNotExist(err) {
|
||||||
return repository.insertHeader(header)
|
return repository.InternalInsertHeader(header)
|
||||||
}
|
}
|
||||||
log.Error("CreateOrUpdateHeader: error getting header hash: ", err)
|
log.Error("CreateOrUpdateHeader: error getting header hash: ", err)
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -128,13 +128,21 @@ func (repository HeaderRepository) getHeaderHash(header core.Header) (string, er
|
|||||||
return hash, err
|
return hash, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository HeaderRepository) insertHeader(header core.Header) (int64, error) {
|
// Function is public so we can test insert being called for the same header
|
||||||
|
// Can happen when concurrent processes are inserting headers
|
||||||
|
// Otherwise should not occur since only called in CreateOrUpdateHeader
|
||||||
|
func (repository HeaderRepository) InternalInsertHeader(header core.Header) (int64, error) {
|
||||||
var headerId int64
|
var headerId int64
|
||||||
err := repository.database.QueryRowx(
|
row := repository.database.QueryRowx(
|
||||||
`INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) RETURNING id`,
|
`INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint)
|
||||||
header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId)
|
VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) ON CONFLICT DO NOTHING RETURNING id`,
|
||||||
|
header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID)
|
||||||
|
err := row.Scan(&headerId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("insertHeader: error inserting header: ", err)
|
if err == sql.ErrNoRows {
|
||||||
|
return 0, ErrValidHeaderExists
|
||||||
|
}
|
||||||
|
log.Error("InternalInsertHeader: error inserting header: ", err)
|
||||||
}
|
}
|
||||||
return headerId, err
|
return headerId, err
|
||||||
}
|
}
|
||||||
@ -146,5 +154,5 @@ func (repository HeaderRepository) replaceHeader(header core.Header) (int64, err
|
|||||||
log.Error("replaceHeader: error deleting headers: ", err)
|
log.Error("replaceHeader: error deleting headers: ", err)
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return repository.insertHeader(header)
|
return repository.InternalInsertHeader(header)
|
||||||
}
|
}
|
||||||
|
@ -98,6 +98,20 @@ var _ = Describe("Block header repository", func() {
|
|||||||
Expect(len(dbHeaders)).To(Equal(1))
|
Expect(len(dbHeaders)).To(Equal(1))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("does not duplicate headers in concurrent insert", func() {
|
||||||
|
_, err = repo.InternalInsertHeader(header)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
_, err = repo.InternalInsertHeader(header)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err).To(MatchError(repositories.ErrValidHeaderExists))
|
||||||
|
|
||||||
|
var dbHeaders []core.Header
|
||||||
|
err = db.Select(&dbHeaders, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(len(dbHeaders)).To(Equal(1))
|
||||||
|
})
|
||||||
|
|
||||||
It("replaces header if hash is different", func() {
|
It("replaces header if hash is different", func() {
|
||||||
_, err = repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Loading…
Reference in New Issue
Block a user