Don't duplicate queued storage diffs

- currently, if we don't recognize the same diff several times (e.g.
  if you restart the storage diff watcher pointed at the same file),
  we'll add the same row to the queue on each run.
- these changes assure we only queue an unrecognized diff once.
This commit is contained in:
Rob Mulholand 2019-06-14 11:26:15 -05:00
parent eb2b3a0263
commit e11f2c8c59
4 changed files with 30 additions and 9 deletions

View File

@ -5,7 +5,8 @@ CREATE TABLE public.queued_storage (
block_hash BYTEA,
contract BYTEA,
storage_key BYTEA,
storage_value BYTEA
storage_value BYTEA,
UNIQUE (block_height, block_hash, contract, storage_key, storage_value)
);
-- +goose Down

View File

@ -2,8 +2,8 @@
-- PostgreSQL database dump
--
-- Dumped from database version 11.2
-- Dumped by pg_dump version 11.2
-- Dumped from database version 11.3
-- Dumped by pg_dump version 11.3
SET statement_timeout = 0;
SET lock_timeout = 0;
@ -12,6 +12,7 @@ SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;
@ -789,6 +790,14 @@ ALTER TABLE ONLY public.eth_nodes
ADD CONSTRAINT nodes_pkey PRIMARY KEY (id);
--
-- Name: queued_storage queued_storage_block_height_block_hash_contract_storage_key_key; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.queued_storage
ADD CONSTRAINT queued_storage_block_height_block_hash_contract_storage_key_key UNIQUE (block_height, block_hash, contract, storage_key, storage_value);
--
-- Name: queued_storage queued_storage_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--

View File

@ -38,7 +38,7 @@ func NewStorageQueue(db *postgres.DB) StorageQueue {
func (queue StorageQueue) Add(row utils.StorageDiffRow) error {
_, err := queue.db.Exec(`INSERT INTO public.queued_storage (contract,
block_hash, block_height, storage_key, storage_value) VALUES
($1, $2, $3, $4, $5)`, row.Contract.Bytes(), row.BlockHash.Bytes(),
($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`, row.Contract.Bytes(), row.BlockHash.Bytes(),
row.BlockHeight, row.StorageKey.Bytes(), row.StorageValue.Bytes())
return err
}

View File

@ -49,11 +49,22 @@ var _ = Describe("Storage queue", func() {
Expect(addErr).NotTo(HaveOccurred())
})
It("adds a storage row to the db", func() {
var result utils.StorageDiffRow
getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`)
Expect(getErr).NotTo(HaveOccurred())
Expect(result).To(Equal(row))
Describe("Add", func() {
It("adds a storage row to the db", func() {
var result utils.StorageDiffRow
getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`)
Expect(getErr).NotTo(HaveOccurred())
Expect(result).To(Equal(row))
})
It("does not duplicate storage rows", func() {
addErr := queue.Add(row)
Expect(addErr).NotTo(HaveOccurred())
var count int
getErr := db.Get(&count, `SELECT count(*) FROM public.queued_storage`)
Expect(getErr).NotTo(HaveOccurred())
Expect(count).To(Equal(1))
})
})
It("deletes storage row from db", func() {