Add constraint to prevent duplicate headers

- Disallow inserts of headers with the same number, hash, and node
  fingerprint, since it will enable duplicate log fetching for the
  same header
This commit is contained in:
Rob Mulholand 2019-09-26 13:58:48 -05:00
parent 62e1378e0c
commit e252229b8a
4 changed files with 39 additions and 8 deletions

View File

@ -8,7 +8,8 @@ CREATE TABLE public.headers
block_timestamp NUMERIC,
check_count INTEGER NOT NULL DEFAULT 0,
eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE,
eth_node_fingerprint VARCHAR(128)
eth_node_fingerprint VARCHAR(128),
UNIQUE (block_number, hash, eth_node_fingerprint)
);
-- Index is removed when table is

View File

@ -921,6 +921,14 @@ ALTER TABLE ONLY public.header_sync_transactions
ADD CONSTRAINT header_sync_transactions_pkey PRIMARY KEY (id);
--
-- Name: headers headers_block_number_hash_eth_node_fingerprint_key; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.headers
ADD CONSTRAINT headers_block_number_hash_eth_node_fingerprint_key UNIQUE (block_number, hash, eth_node_fingerprint);
--
-- Name: headers headers_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--

View File

@ -41,7 +41,7 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int
hash, err := repository.getHeaderHash(header)
if err != nil {
if headerDoesNotExist(err) {
return repository.insertHeader(header)
return repository.InternalInsertHeader(header)
}
log.Error("CreateOrUpdateHeader: error getting header hash: ", err)
return 0, err
@ -128,13 +128,21 @@ func (repository HeaderRepository) getHeaderHash(header core.Header) (string, er
return hash, err
}
func (repository HeaderRepository) insertHeader(header core.Header) (int64, error) {
// Function is public so we can test insert being called for the same header
// Can happen when concurrent processes are inserting headers
// Otherwise should not occur since only called in CreateOrUpdateHeader
func (repository HeaderRepository) InternalInsertHeader(header core.Header) (int64, error) {
var headerId int64
err := repository.database.QueryRowx(
`INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) RETURNING id`,
header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId)
row := repository.database.QueryRowx(
`INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint)
VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) ON CONFLICT DO NOTHING RETURNING id`,
header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID)
err := row.Scan(&headerId)
if err != nil {
log.Error("insertHeader: error inserting header: ", err)
if err == sql.ErrNoRows {
return 0, ErrValidHeaderExists
}
log.Error("InternalInsertHeader: error inserting header: ", err)
}
return headerId, err
}
@ -146,5 +154,5 @@ func (repository HeaderRepository) replaceHeader(header core.Header) (int64, err
log.Error("replaceHeader: error deleting headers: ", err)
return 0, err
}
return repository.insertHeader(header)
return repository.InternalInsertHeader(header)
}

View File

@ -98,6 +98,20 @@ var _ = Describe("Block header repository", func() {
Expect(len(dbHeaders)).To(Equal(1))
})
It("does not duplicate headers in concurrent insert", func() {
_, err = repo.InternalInsertHeader(header)
Expect(err).NotTo(HaveOccurred())
_, err = repo.InternalInsertHeader(header)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(repositories.ErrValidHeaderExists))
var dbHeaders []core.Header
err = db.Select(&dbHeaders, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber)
Expect(err).NotTo(HaveOccurred())
Expect(len(dbHeaders)).To(Equal(1))
})
It("replaces header if hash is different", func() {
_, err = repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())