Add light sync transactions table

- Foreign key to header instead of block
- Can use same Transaction struct for both
This commit is contained in:
Rob Mulholand 2019-03-19 14:39:26 -05:00
parent d93006321b
commit 79e011aad2
9 changed files with 154 additions and 1 deletions

View File

@ -0,0 +1,14 @@
-- +goose Up
CREATE TABLE light_sync_transactions (
id SERIAL PRIMARY KEY,
header_id INTEGER NOT NULL REFERENCES headers(id) ON DELETE CASCADE,
hash TEXT,
raw JSONB,
tx_index INTEGER,
tx_from TEXT,
tx_to TEXT,
UNIQUE (header_id, hash)
);
-- +goose Down
DROP TABLE light_sync_transactions;

View File

@ -258,6 +258,41 @@ CREATE SEQUENCE public.headers_id_seq
ALTER SEQUENCE public.headers_id_seq OWNED BY public.headers.id; ALTER SEQUENCE public.headers_id_seq OWNED BY public.headers.id;
--
-- Name: light_sync_transactions; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.light_sync_transactions (
id integer NOT NULL,
header_id integer NOT NULL,
hash text,
raw jsonb,
tx_index integer,
tx_from text,
tx_to text
);
--
-- Name: light_sync_transactions_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.light_sync_transactions_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: light_sync_transactions_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.light_sync_transactions_id_seq OWNED BY public.light_sync_transactions.id;
-- --
-- Name: log_filters; Type: TABLE; Schema: public; Owner: - -- Name: log_filters; Type: TABLE; Schema: public; Owner: -
-- --
@ -504,6 +539,13 @@ ALTER TABLE ONLY public.goose_db_version ALTER COLUMN id SET DEFAULT nextval('pu
ALTER TABLE ONLY public.headers ALTER COLUMN id SET DEFAULT nextval('public.headers_id_seq'::regclass); ALTER TABLE ONLY public.headers ALTER COLUMN id SET DEFAULT nextval('public.headers_id_seq'::regclass);
--
-- Name: light_sync_transactions id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.light_sync_transactions ALTER COLUMN id SET DEFAULT nextval('public.light_sync_transactions_id_seq'::regclass);
-- --
-- Name: log_filters id; Type: DEFAULT; Schema: public; Owner: - -- Name: log_filters id; Type: DEFAULT; Schema: public; Owner: -
-- --
@ -603,6 +645,22 @@ ALTER TABLE ONLY public.headers
ADD CONSTRAINT headers_pkey PRIMARY KEY (id); ADD CONSTRAINT headers_pkey PRIMARY KEY (id);
--
-- Name: light_sync_transactions light_sync_transactions_header_id_hash_key; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.light_sync_transactions
ADD CONSTRAINT light_sync_transactions_header_id_hash_key UNIQUE (header_id, hash);
--
-- Name: light_sync_transactions light_sync_transactions_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.light_sync_transactions
ADD CONSTRAINT light_sync_transactions_pkey PRIMARY KEY (id);
-- --
-- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
-- --
@ -733,6 +791,14 @@ ALTER TABLE ONLY public.full_sync_transactions
ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE; ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE;
--
-- Name: light_sync_transactions light_sync_transactions_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.light_sync_transactions
ADD CONSTRAINT light_sync_transactions_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
-- --
-- Name: blocks node_fk; Type: FK CONSTRAINT; Schema: public; Owner: - -- Name: blocks node_fk; Type: FK CONSTRAINT; Schema: public; Owner: -
-- --

View File

@ -240,6 +240,9 @@ func TearDown(db *postgres.DB) {
_, err = tx.Exec(`DELETE FROM full_sync_transactions`) _, err = tx.Exec(`DELETE FROM full_sync_transactions`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec("DELETE FROM light_sync_transactions")
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM receipts`) _, err = tx.Exec(`DELETE FROM receipts`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())

View File

@ -49,11 +49,21 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int
return 0, ErrValidHeaderExists return 0, ErrValidHeaderExists
} }
func (repository HeaderRepository) CreateTransaction(headerID int64, transaction core.Transaction) error {
_, err := repository.database.Exec(`INSERT INTO public.light_sync_transactions
(header_id, hash, tx_to, tx_from, tx_index) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING`,
headerID, transaction.Hash, transaction.To, transaction.From, transaction.TxIndex)
return err
}
func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {
var header core.Header var header core.Header
err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,
blockNumber, repository.database.Node.ID) blockNumber, repository.database.Node.ID)
if err != nil {
log.Error("GetHeader: error getting headers: ", err) log.Error("GetHeader: error getting headers: ", err)
}
return header, err return header, err
} }

View File

@ -180,6 +180,57 @@ var _ = Describe("Block header repository", func() {
}) })
}) })
Describe("creating a transaction", func() {
It("adds a transaction", func() {
headerID, err := repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
fromAddress := common.HexToAddress("0x1234")
toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123)
transaction := core.Transaction{
From: fromAddress.Hex(),
Hash: txHash.Hex(),
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
}
insertErr := repo.CreateTransaction(headerID, transaction)
Expect(insertErr).NotTo(HaveOccurred())
var dbTransaction core.Transaction
err = db.Get(&dbTransaction, `SELECT hash, tx_from, tx_to, tx_index FROM public.light_sync_transactions WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred())
Expect(dbTransaction).To(Equal(transaction))
})
It("silently ignores duplicate inserts", func() {
headerID, err := repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
fromAddress := common.HexToAddress("0x1234")
toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123)
transaction := core.Transaction{
From: fromAddress.Hex(),
Hash: txHash.Hex(),
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
}
insertErr := repo.CreateTransaction(headerID, transaction)
Expect(insertErr).NotTo(HaveOccurred())
insertTwoErr := repo.CreateTransaction(headerID, transaction)
Expect(insertTwoErr).NotTo(HaveOccurred())
var dbTransactions []core.Transaction
err = db.Select(&dbTransactions, `SELECT hash, tx_from, tx_to, tx_index FROM public.light_sync_transactions WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred())
Expect(len(dbTransactions)).To(Equal(1))
})
})
Describe("Getting a header", func() { Describe("Getting a header", func() {
It("returns header if it exists", func() { It("returns header if it exists", func() {
_, err = repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)

View File

@ -55,6 +55,7 @@ type FilterRepository interface {
type HeaderRepository interface { type HeaderRepository interface {
CreateOrUpdateHeader(header core.Header) (int64, error) CreateOrUpdateHeader(header core.Header) (int64, error)
CreateTransaction(headerID int64, transaction core.Transaction) error
GetHeader(blockNumber int64) (core.Header, error) GetHeader(blockNumber int64) (core.Header, error)
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error)
} }

View File

@ -56,6 +56,10 @@ func (repository *MockHeaderRepository) CreateOrUpdateHeader(header core.Header)
return repository.createOrUpdateHeaderReturnID, repository.createOrUpdateHeaderErr return repository.createOrUpdateHeaderReturnID, repository.createOrUpdateHeaderErr
} }
func (repository *MockHeaderRepository) CreateTransaction(headerID int64, transaction core.Transaction) error {
panic("implement me")
}
func (repository *MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { func (repository *MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {
repository.GetHeaderPassedBlockNumber = blockNumber repository.GetHeaderPassedBlockNumber = blockNumber
return core.Header{BlockNumber: blockNumber, Hash: repository.getHeaderReturnBlockHash}, repository.getHeaderError return core.Header{BlockNumber: blockNumber, Hash: repository.getHeaderReturnBlockHash}, repository.getHeaderError

View File

@ -69,6 +69,9 @@ func TearDown(db *postgres.DB) {
_, err = tx.Exec(`DELETE FROM full_sync_transactions`) _, err = tx.Exec(`DELETE FROM full_sync_transactions`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec("DELETE FROM light_sync_transactions")
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM receipts`) _, err = tx.Exec(`DELETE FROM receipts`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())

View File

@ -111,6 +111,7 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM full_sync_transactions") db.MustExec("DELETE FROM full_sync_transactions")
db.MustExec("DELETE FROM goose_db_version") db.MustExec("DELETE FROM goose_db_version")
db.MustExec("DELETE FROM headers") db.MustExec("DELETE FROM headers")
db.MustExec("DELETE FROM light_sync_transactions")
db.MustExec("DELETE FROM log_filters") db.MustExec("DELETE FROM log_filters")
db.MustExec("DELETE FROM logs") db.MustExec("DELETE FROM logs")
db.MustExec("DELETE FROM queued_storage") db.MustExec("DELETE FROM queued_storage")