diff --git a/db/migrations/00023_create_headers_table.sql b/db/migrations/00023_create_headers_table.sql index da539de8..82a2451f 100644 --- a/db/migrations/00023_create_headers_table.sql +++ b/db/migrations/00023_create_headers_table.sql @@ -8,7 +8,8 @@ CREATE TABLE public.headers block_timestamp NUMERIC, check_count INTEGER NOT NULL DEFAULT 0, eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE, - eth_node_fingerprint VARCHAR(128) + eth_node_fingerprint VARCHAR(128), + UNIQUE (block_number, hash, eth_node_fingerprint) ); -- Index is removed when table is diff --git a/db/schema.sql b/db/schema.sql index eeb1c81e..daaa7dd8 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -921,6 +921,14 @@ ALTER TABLE ONLY public.header_sync_transactions ADD CONSTRAINT header_sync_transactions_pkey PRIMARY KEY (id); +-- +-- Name: headers headers_block_number_hash_eth_node_fingerprint_key; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.headers + ADD CONSTRAINT headers_block_number_hash_eth_node_fingerprint_key UNIQUE (block_number, hash, eth_node_fingerprint); + + -- -- Name: headers headers_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- diff --git a/pkg/datastore/postgres/repositories/header_repository.go b/pkg/datastore/postgres/repositories/header_repository.go index dc0baa0f..2323f10d 100644 --- a/pkg/datastore/postgres/repositories/header_repository.go +++ b/pkg/datastore/postgres/repositories/header_repository.go @@ -41,7 +41,7 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int hash, err := repository.getHeaderHash(header) if err != nil { if headerDoesNotExist(err) { - return repository.insertHeader(header) + return repository.InternalInsertHeader(header) } log.Error("CreateOrUpdateHeader: error getting header hash: ", err) return 0, err @@ -128,13 +128,21 @@ func (repository HeaderRepository) getHeaderHash(header core.Header) (string, er return hash, err } -func (repository HeaderRepository) insertHeader(header core.Header) (int64, error) { +// Function is public so we can test insert being called for the same header +// Can happen when concurrent processes are inserting headers +// Otherwise should not occur since only called in CreateOrUpdateHeader +func (repository HeaderRepository) InternalInsertHeader(header core.Header) (int64, error) { var headerId int64 - err := repository.database.QueryRowx( - `INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint) VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) RETURNING id`, - header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID).Scan(&headerId) + row := repository.database.QueryRowx( + `INSERT INTO public.headers (block_number, hash, block_timestamp, raw, eth_node_id, eth_node_fingerprint) + VALUES ($1, $2, $3::NUMERIC, $4, $5, $6) ON CONFLICT DO NOTHING RETURNING id`, + header.BlockNumber, header.Hash, header.Timestamp, header.Raw, repository.database.NodeID, repository.database.Node.ID) + err := row.Scan(&headerId) if err != nil { - log.Error("insertHeader: error inserting header: ", err) + if err == sql.ErrNoRows { + return 0, ErrValidHeaderExists + } + log.Error("InternalInsertHeader: error inserting header: ", err) } return headerId, err } @@ -146,5 +154,5 @@ func (repository HeaderRepository) replaceHeader(header core.Header) (int64, err log.Error("replaceHeader: error deleting headers: ", err) return 0, err } - return repository.insertHeader(header) + return repository.InternalInsertHeader(header) } diff --git a/pkg/datastore/postgres/repositories/header_repository_test.go b/pkg/datastore/postgres/repositories/header_repository_test.go index dba6da09..c37812a5 100644 --- a/pkg/datastore/postgres/repositories/header_repository_test.go +++ b/pkg/datastore/postgres/repositories/header_repository_test.go @@ -98,6 +98,20 @@ var _ = Describe("Block header repository", func() { Expect(len(dbHeaders)).To(Equal(1)) }) + It("does not duplicate headers in concurrent insert", func() { + _, err = repo.InternalInsertHeader(header) + Expect(err).NotTo(HaveOccurred()) + + _, err = repo.InternalInsertHeader(header) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(repositories.ErrValidHeaderExists)) + + var dbHeaders []core.Header + err = db.Select(&dbHeaders, `SELECT block_number, hash, raw FROM public.headers WHERE block_number = $1`, header.BlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(len(dbHeaders)).To(Equal(1)) + }) + It("replaces header if hash is different", func() { _, err = repo.CreateOrUpdateHeader(header) Expect(err).NotTo(HaveOccurred())