Merge pull request #126 from vulcanize/vdb-585-create-address-table

Vdb-585 create address table
This commit is contained in:
Elizabeth 2019-08-26 15:13:07 -05:00 committed by GitHub
commit b6a2a278ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 709 additions and 148 deletions

View File

@ -84,7 +84,7 @@ func coldImport() {
// init cold importer deps // init cold importer deps
blockRepository := repositories.NewBlockRepository(&pgDB) blockRepository := repositories.NewBlockRepository(&pgDB)
receiptRepository := repositories.ReceiptRepository{DB: &pgDB} receiptRepository := repositories.FullSyncReceiptRepository{DB: &pgDB}
transactionConverter := cold_db.NewColdDbTransactionConverter() transactionConverter := cold_db.NewColdDbTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(transactionConverter) blockConverter := vulcCommon.NewBlockConverter(transactionConverter)

View File

@ -1,15 +1,16 @@
// Copyright © 2019 Vulcanize, Inc // VulcanizeDB
// // Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify // This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by // it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or // the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. // (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, // This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of // but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details. // GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@ -1,15 +1,16 @@
// Copyright © 2019 Vulcanize, Inc // VulcanizeDB
// // Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify // This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by // it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or // the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. // (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, // This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of // but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details. // GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@ -1,15 +1,16 @@
// Copyright © 2019 Vulcanize, Inc // VulcanizeDB
// // Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify // This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by // it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or // the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. // (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, // This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of // but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details. // GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@ -0,0 +1,10 @@
-- +goose Up
CREATE TABLE public.addresses
(
id SERIAL PRIMARY KEY,
address character varying(42),
UNIQUE (address)
);
-- +goose Down
DROP TABLE public.addresses;

View File

@ -1,16 +0,0 @@
-- +goose Up
CREATE TABLE full_sync_receipts
(
id SERIAL PRIMARY KEY,
transaction_id INTEGER NOT NULL REFERENCES full_sync_transactions (id) ON DELETE CASCADE,
contract_address VARCHAR(42),
cumulative_gas_used NUMERIC,
gas_used NUMERIC,
state_root VARCHAR(66),
status INTEGER,
tx_hash VARCHAR(66)
);
-- +goose Down
DROP TABLE full_sync_receipts;

View File

@ -0,0 +1,16 @@
-- +goose Up
CREATE TABLE full_sync_receipts
(
id SERIAL PRIMARY KEY,
transaction_id INTEGER NOT NULL REFERENCES full_sync_transactions (id) ON DELETE CASCADE,
contract_address_id INTEGER NOT NULL REFERENCES addresses (id) ON DELETE CASCADE,
cumulative_gas_used NUMERIC,
gas_used NUMERIC,
state_root VARCHAR(66),
status INTEGER,
tx_hash VARCHAR(66)
);
-- +goose Down
DROP TABLE full_sync_receipts;

View File

@ -1,18 +0,0 @@
-- +goose Up
CREATE TABLE header_sync_receipts(
id SERIAL PRIMARY KEY,
transaction_id INTEGER NOT NULL REFERENCES header_sync_transactions(id) ON DELETE CASCADE,
header_id INTEGER NOT NULL REFERENCES headers(id) ON DELETE CASCADE,
contract_address VARCHAR(42),
cumulative_gas_used NUMERIC,
gas_used NUMERIC,
state_root VARCHAR(66),
status INTEGER,
tx_hash VARCHAR(66),
rlp BYTEA,
UNIQUE(header_id, transaction_id)
);
-- +goose Down
DROP TABLE header_sync_receipts;

View File

@ -0,0 +1,19 @@
-- +goose Up
CREATE TABLE header_sync_receipts
(
id SERIAL PRIMARY KEY,
transaction_id INTEGER NOT NULL REFERENCES header_sync_transactions (id) ON DELETE CASCADE,
header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
contract_address_id INTEGER NOT NULL REFERENCES addresses (id) ON DELETE CASCADE,
cumulative_gas_used NUMERIC,
gas_used NUMERIC,
state_root VARCHAR(66),
status INTEGER,
tx_hash VARCHAR(66),
rlp BYTEA,
UNIQUE (header_id, transaction_id)
);
-- +goose Down
DROP TABLE header_sync_receipts;

View File

@ -20,6 +20,36 @@ SET default_tablespace = '';
SET default_with_oids = false; SET default_with_oids = false;
--
-- Name: addresses; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.addresses (
id integer NOT NULL,
address character varying(42)
);
--
-- Name: addresses_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.addresses_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: addresses_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.addresses_id_seq OWNED BY public.addresses.id;
-- --
-- Name: logs; Type: TABLE; Schema: public; Owner: - -- Name: logs; Type: TABLE; Schema: public; Owner: -
-- --
@ -144,7 +174,7 @@ CREATE TABLE public.eth_nodes (
CREATE TABLE public.full_sync_receipts ( CREATE TABLE public.full_sync_receipts (
id integer NOT NULL, id integer NOT NULL,
contract_address character varying(42), contract_address_id integer NOT NULL,
cumulative_gas_used numeric, cumulative_gas_used numeric,
gas_used numeric, gas_used numeric,
state_root character varying(66), state_root character varying(66),
@ -254,7 +284,7 @@ CREATE TABLE public.header_sync_receipts (
id integer NOT NULL, id integer NOT NULL,
transaction_id integer NOT NULL, transaction_id integer NOT NULL,
header_id integer NOT NULL, header_id integer NOT NULL,
contract_address character varying(42), contract_address_id integer NOT NULL,
cumulative_gas_used numeric, cumulative_gas_used numeric,
gas_used numeric, gas_used numeric,
state_root character varying(66), state_root character varying(66),
@ -564,6 +594,13 @@ CREATE VIEW public.watched_event_logs AS
WHERE ((((log_filters.topic0)::text = (logs.topic0)::text) OR (log_filters.topic0 IS NULL)) AND (((log_filters.topic1)::text = (logs.topic1)::text) OR (log_filters.topic1 IS NULL)) AND (((log_filters.topic2)::text = (logs.topic2)::text) OR (log_filters.topic2 IS NULL)) AND (((log_filters.topic3)::text = (logs.topic3)::text) OR (log_filters.topic3 IS NULL))); WHERE ((((log_filters.topic0)::text = (logs.topic0)::text) OR (log_filters.topic0 IS NULL)) AND (((log_filters.topic1)::text = (logs.topic1)::text) OR (log_filters.topic1 IS NULL)) AND (((log_filters.topic2)::text = (logs.topic2)::text) OR (log_filters.topic2 IS NULL)) AND (((log_filters.topic3)::text = (logs.topic3)::text) OR (log_filters.topic3 IS NULL)));
--
-- Name: addresses id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.addresses ALTER COLUMN id SET DEFAULT nextval('public.addresses_id_seq'::regclass);
-- --
-- Name: blocks id; Type: DEFAULT; Schema: public; Owner: - -- Name: blocks id; Type: DEFAULT; Schema: public; Owner: -
-- --
@ -662,6 +699,22 @@ ALTER TABLE ONLY public.uncles ALTER COLUMN id SET DEFAULT nextval('public.uncle
ALTER TABLE ONLY public.watched_contracts ALTER COLUMN contract_id SET DEFAULT nextval('public.watched_contracts_contract_id_seq'::regclass); ALTER TABLE ONLY public.watched_contracts ALTER COLUMN contract_id SET DEFAULT nextval('public.watched_contracts_contract_id_seq'::regclass);
--
-- Name: addresses addresses_address_key; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.addresses
ADD CONSTRAINT addresses_address_key UNIQUE (address);
--
-- Name: addresses addresses_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.addresses
ADD CONSTRAINT addresses_pkey PRIMARY KEY (id);
-- --
-- Name: blocks blocks_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- Name: blocks blocks_pkey; Type: CONSTRAINT; Schema: public; Owner: -
-- --
@ -896,6 +949,14 @@ ALTER TABLE ONLY public.checked_headers
ADD CONSTRAINT checked_headers_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; ADD CONSTRAINT checked_headers_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
--
-- Name: full_sync_receipts full_sync_receipts_contract_address_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.full_sync_receipts
ADD CONSTRAINT full_sync_receipts_contract_address_id_fkey FOREIGN KEY (contract_address_id) REFERENCES public.addresses(id) ON DELETE CASCADE;
-- --
-- Name: full_sync_transactions full_sync_transactions_block_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- Name: full_sync_transactions full_sync_transactions_block_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
-- --
@ -904,6 +965,14 @@ ALTER TABLE ONLY public.full_sync_transactions
ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE; ADD CONSTRAINT full_sync_transactions_block_id_fkey FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE;
--
-- Name: header_sync_receipts header_sync_receipts_contract_address_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.header_sync_receipts
ADD CONSTRAINT header_sync_receipts_contract_address_id_fkey FOREIGN KEY (contract_address_id) REFERENCES public.addresses(id) ON DELETE CASCADE;
-- --
-- Name: header_sync_receipts header_sync_receipts_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- Name: header_sync_receipts header_sync_receipts_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
-- --

View File

@ -1,3 +1,19 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package integration package integration
import ( import (

View File

@ -1,3 +1,19 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package integration package integration
import ( import (

View File

@ -17,7 +17,9 @@
package retriever package retriever
import ( import (
"database/sql"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
) )
// Block retriever is used to retrieve the first block for a given contract and the most recent block // Block retriever is used to retrieve the first block for a given contract and the most recent block
@ -41,26 +43,37 @@ func NewBlockRetriever(db *postgres.DB) (r *blockRetriever) {
func (r *blockRetriever) RetrieveFirstBlock(contractAddr string) (int64, error) { func (r *blockRetriever) RetrieveFirstBlock(contractAddr string) (int64, error) {
i, err := r.retrieveFirstBlockFromReceipts(contractAddr) i, err := r.retrieveFirstBlockFromReceipts(contractAddr)
if err != nil { if err != nil {
if err == sql.ErrNoRows {
i, err = r.retrieveFirstBlockFromLogs(contractAddr) i, err = r.retrieveFirstBlockFromLogs(contractAddr)
} }
return i, err
}
return i, err return i, err
} }
// For some contracts the contract creation transaction receipt doesn't have the contract address so this doesn't work (e.g. Sai) // For some contracts the contract creation transaction receipt doesn't have the contract address so this doesn't work (e.g. Sai)
func (r *blockRetriever) retrieveFirstBlockFromReceipts(contractAddr string) (int64, error) { func (r *blockRetriever) retrieveFirstBlockFromReceipts(contractAddr string) (int64, error) {
var firstBlock int var firstBlock int64
addressId, getAddressErr := addressRepository().GetOrCreateAddress(r.db, contractAddr)
if getAddressErr != nil {
return firstBlock, getAddressErr
}
err := r.db.Get( err := r.db.Get(
&firstBlock, &firstBlock,
`SELECT number FROM blocks `SELECT number FROM blocks
WHERE id = (SELECT block_id FROM full_sync_receipts WHERE id = (SELECT block_id FROM full_sync_receipts
WHERE lower(contract_address) = $1 WHERE contract_address_id = $1
ORDER BY block_id ASC ORDER BY block_id ASC
LIMIT 1)`, LIMIT 1)`,
contractAddr, addressId,
) )
return int64(firstBlock), err return firstBlock, err
}
func addressRepository() repositories.AddressRepository {
return repositories.AddressRepository{}
} }
// In which case this servers as a heuristic to find the first block by finding the first contract event log // In which case this servers as a heuristic to find the first block by finding the first contract event log

View File

@ -137,7 +137,7 @@ func SetupTusdRepo(vulcanizeLogId *int64, wantedEvents, wantedMethods []string)
}, core.Node{}) }, core.Node{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptRepository := repositories.ReceiptRepository{DB: db} receiptRepository := repositories.FullSyncReceiptRepository{DB: db}
logRepository := repositories.LogRepository{DB: db} logRepository := repositories.LogRepository{DB: db}
blockRepository := *repositories.NewBlockRepository(db) blockRepository := *repositories.NewBlockRepository(db)
@ -183,7 +183,7 @@ func SetupENSRepo(vulcanizeLogId *int64, wantedEvents, wantedMethods []string) (
}, core.Node{}) }, core.Node{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptRepository := repositories.ReceiptRepository{DB: db} receiptRepository := repositories.FullSyncReceiptRepository{DB: db}
logRepository := repositories.LogRepository{DB: db} logRepository := repositories.LogRepository{DB: db}
blockRepository := *repositories.NewBlockRepository(db) blockRepository := *repositories.NewBlockRepository(db)
@ -225,6 +225,9 @@ func TearDown(db *postgres.DB) {
tx, err := db.Beginx() tx, err := db.Beginx()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM addresses`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM blocks`) _, err = tx.Exec(`DELETE FROM blocks`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())

View File

@ -0,0 +1,62 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repositories
import (
"database/sql"
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
type AddressRepository struct{}
func (AddressRepository) GetOrCreateAddress(db *postgres.DB, address string) (int64, error) {
stringAddressToCommonAddress := common.HexToAddress(address)
hexAddress := stringAddressToCommonAddress.Hex()
var addressId int64
getErr := db.Get(&addressId, `SELECT id FROM public.addresses WHERE address = $1`, hexAddress)
if getErr == sql.ErrNoRows {
insertErr := db.QueryRow(`INSERT INTO public.addresses (address) VALUES($1) RETURNING id`, hexAddress).Scan(&addressId)
return addressId, insertErr
}
return addressId, getErr
}
func (AddressRepository) GetOrCreateAddressInTransaction(tx *sqlx.Tx, address string) (int64, error) {
stringAddressToCommonAddress := common.HexToAddress(address)
hexAddress := stringAddressToCommonAddress.Hex()
var addressId int64
getErr := tx.Get(&addressId, `SELECT id FROM public.addresses WHERE address = $1`, hexAddress)
if getErr == sql.ErrNoRows {
insertErr := tx.QueryRow(`INSERT INTO public.addresses (address) VALUES($1) RETURNING id`, hexAddress).Scan(&addressId)
return addressId, insertErr
}
return addressId, getErr
}
func (AddressRepository) GetAddressById(db *postgres.DB, id int64) (string, error) {
var address string
getErr := db.Get(&address, `SELECT address FROM public.addresses WHERE id = $1`, id)
return address, getErr
}

View File

@ -0,0 +1,181 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repositories_test
import (
"strings"
"github.com/jmoiron/sqlx"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("address lookup", func() {
var (
db *postgres.DB
repo repositories.AddressRepository
address = fakes.FakeAddress.Hex()
)
BeforeEach(func() {
db = test_config.NewTestDB(test_config.NewTestNode())
test_config.CleanTestDB(db)
repo = repositories.AddressRepository{}
})
AfterEach(func() {
test_config.CleanTestDB(db)
})
type dbAddress struct {
Id int64
Address string
}
Describe("GetOrCreateAddress", func() {
It("creates an address record", func() {
addressId, createErr := repo.GetOrCreateAddress(db, address)
Expect(createErr).NotTo(HaveOccurred())
var actualAddress dbAddress
getErr := db.Get(&actualAddress, `SELECT id, address FROM public.addresses LIMIT 1`)
Expect(getErr).NotTo(HaveOccurred())
expectedAddress := dbAddress{Id: addressId, Address: address}
Expect(actualAddress).To(Equal(expectedAddress))
})
It("returns the existing record id if the address already exists", func() {
createId, createErr := repo.GetOrCreateAddress(db, address)
Expect(createErr).NotTo(HaveOccurred())
getId, getErr := repo.GetOrCreateAddress(db, address)
Expect(getErr).NotTo(HaveOccurred())
var addressCount int
addressErr := db.Get(&addressCount, `SELECT count(*) FROM public.addresses`)
Expect(addressErr).NotTo(HaveOccurred())
Expect(addressCount).To(Equal(1))
Expect(createId).To(Equal(getId))
})
It("gets upper-cased addresses", func() {
upperAddress := strings.ToUpper(address)
upperAddressId, createErr := repo.GetOrCreateAddress(db, upperAddress)
Expect(createErr).NotTo(HaveOccurred())
mixedCaseAddressId, getErr := repo.GetOrCreateAddress(db, address)
Expect(getErr).NotTo(HaveOccurred())
Expect(upperAddressId).To(Equal(mixedCaseAddressId))
})
It("gets lower-cased addresses", func() {
lowerAddress := strings.ToLower(address)
upperAddressId, createErr := repo.GetOrCreateAddress(db, lowerAddress)
Expect(createErr).NotTo(HaveOccurred())
mixedCaseAddressId, getErr := repo.GetOrCreateAddress(db, address)
Expect(getErr).NotTo(HaveOccurred())
Expect(upperAddressId).To(Equal(mixedCaseAddressId))
})
})
Describe("GetOrCreateAddressInTransaction", func() {
var (
tx *sqlx.Tx
txErr error
)
BeforeEach(func() {
tx, txErr = db.Beginx()
Expect(txErr).NotTo(HaveOccurred())
})
AfterEach(func() {
tx.Rollback()
})
It("creates an address record", func() {
addressId, createErr := repo.GetOrCreateAddressInTransaction(tx, address)
Expect(createErr).NotTo(HaveOccurred())
commitErr := tx.Commit()
Expect(commitErr).NotTo(HaveOccurred())
var actualAddress dbAddress
getErr := db.Get(&actualAddress, `SELECT id, address FROM public.addresses LIMIT 1`)
Expect(getErr).NotTo(HaveOccurred())
expectedAddress := dbAddress{Id: addressId, Address: address}
Expect(actualAddress).To(Equal(expectedAddress))
})
It("returns the existing record id if the address already exists", func() {
_, createErr := repo.GetOrCreateAddressInTransaction(tx, address)
Expect(createErr).NotTo(HaveOccurred())
_, getErr := repo.GetOrCreateAddressInTransaction(tx, address)
Expect(getErr).NotTo(HaveOccurred())
tx.Commit()
var addressCount int
addressErr := db.Get(&addressCount, `SELECT count(*) FROM public.addresses`)
Expect(addressErr).NotTo(HaveOccurred())
})
It("gets upper-cased addresses", func() {
upperAddress := strings.ToUpper(address)
upperAddressId, createErr := repo.GetOrCreateAddressInTransaction(tx, upperAddress)
Expect(createErr).NotTo(HaveOccurred())
mixedCaseAddressId, getErr := repo.GetOrCreateAddressInTransaction(tx, address)
Expect(getErr).NotTo(HaveOccurred())
tx.Commit()
Expect(upperAddressId).To(Equal(mixedCaseAddressId))
})
It("gets lower-cased addresses", func() {
lowerAddress := strings.ToLower(address)
upperAddressId, createErr := repo.GetOrCreateAddressInTransaction(tx, lowerAddress)
Expect(createErr).NotTo(HaveOccurred())
mixedCaseAddressId, getErr := repo.GetOrCreateAddressInTransaction(tx, address)
Expect(getErr).NotTo(HaveOccurred())
tx.Commit()
Expect(upperAddressId).To(Equal(mixedCaseAddressId))
})
})
Describe("GetAddressById", func() {
It("gets and address by it's id", func() {
addressId, createErr := repo.GetOrCreateAddress(db, address)
Expect(createErr).NotTo(HaveOccurred())
actualAddress, getErr := repo.GetAddressById(db, addressId)
Expect(getErr).NotTo(HaveOccurred())
Expect(actualAddress).To(Equal(address))
})
It("returns an error if the id doesn't exist", func() {
_, getErr := repo.GetAddressById(db, 0)
Expect(getErr).To(HaveOccurred())
Expect(getErr).To(MatchError("sql: no rows in result set"))
})
})
})

View File

@ -234,7 +234,8 @@ func (blockRepository BlockRepository) createTransaction(tx *sqlx.Tx, blockId in
return err return err
} }
if hasReceipt(transaction) { if hasReceipt(transaction) {
receiptId, err := blockRepository.createReceipt(tx, blockId, transaction.Receipt) receiptRepo := FullSyncReceiptRepository{}
receiptId, err := receiptRepo.CreateFullSyncReceiptInTx(blockId, transaction.Receipt, tx)
if err != nil { if err != nil {
return err return err
} }
@ -256,22 +257,6 @@ func hasReceipt(transaction core.TransactionModel) bool {
return transaction.Receipt.TxHash != "" return transaction.Receipt.TxHash != ""
} }
func (blockRepository BlockRepository) createReceipt(tx *sqlx.Tx, blockId int64, receipt core.Receipt) (int, error) {
//Not currently persisting log bloom filters
var receiptId int
err := tx.QueryRow(
`INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId)
if err != nil {
log.Error("createReceipt: error inserting receipt: ", err)
return receiptId, err
}
return receiptId, nil
}
func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) { func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) {
var retrievedBlockHash string var retrievedBlockHash string
// TODO: handle possible error // TODO: handle possible error
@ -283,7 +268,7 @@ func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, b
return retrievedBlockHash, blockExists(retrievedBlockHash) return retrievedBlockHash, blockExists(retrievedBlockHash)
} }
func (blockRepository BlockRepository) createLogs(tx *sqlx.Tx, logs []core.Log, receiptId int) error { func (blockRepository BlockRepository) createLogs(tx *sqlx.Tx, logs []core.Log, receiptId int64) error {
for _, tlog := range logs { for _, tlog := range logs {
_, err := tx.Exec( _, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) `INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id)

View File

@ -26,17 +26,17 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
type ReceiptRepository struct { type FullSyncReceiptRepository struct {
*postgres.DB *postgres.DB
} }
func (receiptRepository ReceiptRepository) CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error { func (receiptRepository FullSyncReceiptRepository) CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error {
tx, err := receiptRepository.DB.Beginx() tx, err := receiptRepository.DB.Beginx()
if err != nil { if err != nil {
return err return err
} }
for _, receipt := range receipts { for _, receipt := range receipts {
receiptId, err := createReceipt(receipt, blockId, tx) receiptId, err := receiptRepository.CreateFullSyncReceiptInTx(blockId, receipt, tx)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
return err return err
@ -53,21 +53,6 @@ func (receiptRepository ReceiptRepository) CreateReceiptsAndLogs(blockId int64,
return nil return nil
} }
func createReceipt(receipt core.Receipt, blockId int64, tx *sqlx.Tx) (int64, error) {
var receiptId int64
err := tx.QueryRow(
`INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId,
).Scan(&receiptId)
if err != nil {
logrus.Error("createReceipt: Error inserting: ", err)
}
return receiptId, err
}
func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error { func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error {
for _, log := range logs { for _, log := range logs {
_, err := tx.Exec( _, err := tx.Exec(
@ -83,27 +68,30 @@ func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error {
return nil return nil
} }
func (receiptRepository ReceiptRepository) CreateReceipt(blockId int64, receipt core.Receipt) (int64, error) { func (FullSyncReceiptRepository) CreateFullSyncReceiptInTx(blockId int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error) {
tx, _ := receiptRepository.DB.Beginx()
var receiptId int64 var receiptId int64
addressId, getAddressErr := AddressRepository{}.GetOrCreateAddressInTransaction(tx, receipt.ContractAddress)
if getAddressErr != nil {
logrus.Error("createReceipt: Error getting address id: ", getAddressErr)
return receiptId, getAddressErr
}
err := tx.QueryRow( err := tx.QueryRow(
`INSERT INTO full_sync_receipts `INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id) (contract_address_id, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`, RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId) addressId, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
logrus.Warning("CreateReceipt: error inserting receipt: ", err) logrus.Warning("CreateReceipt: error inserting receipt: ", err)
return receiptId, err return receiptId, err
} }
tx.Commit()
return receiptId, nil return receiptId, nil
} }
func (receiptRepository ReceiptRepository) GetReceipt(txHash string) (core.Receipt, error) { func (receiptRepository FullSyncReceiptRepository) GetFullSyncReceipt(txHash string) (core.Receipt, error) {
row := receiptRepository.DB.QueryRow( row := receiptRepository.DB.QueryRow(
`SELECT contract_address, `SELECT contract_address_id,
tx_hash, tx_hash,
cumulative_gas_used, cumulative_gas_used,
gas_used, gas_used,

View File

@ -30,7 +30,7 @@ import (
var _ = Describe("Receipt Repository", func() { var _ = Describe("Receipt Repository", func() {
var blockRepository datastore.BlockRepository var blockRepository datastore.BlockRepository
var logRepository datastore.LogRepository var logRepository datastore.LogRepository
var receiptRepository datastore.ReceiptRepository var receiptRepository datastore.FullSyncReceiptRepository
var db *postgres.DB var db *postgres.DB
var node core.Node var node core.Node
BeforeEach(func() { BeforeEach(func() {
@ -44,7 +44,7 @@ var _ = Describe("Receipt Repository", func() {
test_config.CleanTestDB(db) test_config.CleanTestDB(db)
blockRepository = repositories.NewBlockRepository(db) blockRepository = repositories.NewBlockRepository(db)
logRepository = repositories.LogRepository{DB: db} logRepository = repositories.LogRepository{DB: db}
receiptRepository = repositories.ReceiptRepository{DB: db} receiptRepository = repositories.FullSyncReceiptRepository{DB: db}
}) })
Describe("Saving multiple receipts", func() { Describe("Saving multiple receipts", func() {
@ -84,12 +84,12 @@ var _ = Describe("Receipt Repository", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
persistedReceiptOne, err := receiptRepository.GetReceipt(txHashOne) persistedReceiptOne, err := receiptRepository.GetFullSyncReceipt(txHashOne)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(persistedReceiptOne).NotTo(BeNil()) Expect(persistedReceiptOne).NotTo(BeNil())
Expect(persistedReceiptOne.TxHash).To(Equal(txHashOne)) Expect(persistedReceiptOne.TxHash).To(Equal(txHashOne))
persistedReceiptTwo, err := receiptRepository.GetReceipt(txHashTwo) persistedReceiptTwo, err := receiptRepository.GetFullSyncReceipt(txHashTwo)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(persistedReceiptTwo).NotTo(BeNil()) Expect(persistedReceiptTwo).NotTo(BeNil())
Expect(persistedReceiptTwo.TxHash).To(Equal(txHashTwo)) Expect(persistedReceiptTwo.TxHash).To(Equal(txHashTwo))
@ -124,7 +124,7 @@ var _ = Describe("Receipt Repository", func() {
_, err := blockRepository.CreateOrUpdateBlock(block) _, err := blockRepository.CreateOrUpdateBlock(block)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receipt, err := receiptRepository.GetReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223") receipt, err := receiptRepository.GetFullSyncReceipt("0xe340558980f89d5f86045ac11e5cc34e4bcec20f9f1e2a427aa39d87114e8223")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
//Not currently serializing bloom logs //Not currently serializing bloom logs
Expect(receipt.Bloom).To(Equal(core.Receipt{}.Bloom)) Expect(receipt.Bloom).To(Equal(core.Receipt{}.Bloom))
@ -136,7 +136,7 @@ var _ = Describe("Receipt Repository", func() {
}) })
It("returns ErrReceiptDoesNotExist when receipt does not exist", func() { It("returns ErrReceiptDoesNotExist when receipt does not exist", func() {
receipt, err := receiptRepository.GetReceipt("DOES NOT EXIST") receipt, err := receiptRepository.GetFullSyncReceipt("DOES NOT EXIST")
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(receipt).To(BeZero()) Expect(receipt).To(BeZero())
}) })
@ -154,7 +154,7 @@ var _ = Describe("Receipt Repository", func() {
_, err := blockRepository.CreateOrUpdateBlock(block) _, err := blockRepository.CreateOrUpdateBlock(block)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = receiptRepository.GetReceipt(receipt.TxHash) _, err = receiptRepository.GetFullSyncReceipt(receipt.TxHash)
Expect(err).To(Not(HaveOccurred())) Expect(err).To(Not(HaveOccurred()))
}) })
}) })

View File

@ -85,22 +85,6 @@ func (repository HeaderRepository) CreateTransactionInTx(tx *sqlx.Tx, headerID i
return txId, err return txId, err
} }
func (repository HeaderRepository) CreateReceiptInTx(tx *sqlx.Tx, headerID, transactionID int64, receipt core.Receipt) (int64, error) {
var receiptId int64
err := tx.QueryRowx(`INSERT INTO public.header_sync_receipts
(header_id, transaction_id, contract_address, cumulative_gas_used, gas_used, state_root, status, tx_hash, rlp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, transaction_id) DO UPDATE
SET (contract_address, cumulative_gas_used, gas_used, state_root, status, tx_hash, rlp) = ($3, $4::NUMERIC, $5::NUMERIC, $6, $7, $8, $9)
RETURNING id`,
headerID, transactionID, receipt.ContractAddress, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, receipt.TxHash, receipt.Rlp).Scan(&receiptId)
if err != nil {
log.Error("header_repository: error inserting receipt: ", err)
return receiptId, err
}
return receiptId, err
}
func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {
var header core.Header var header core.Header
err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,

View File

@ -216,26 +216,40 @@ var _ = Describe("Block header repository", func() {
Rlp: []byte{1, 2, 3}, Rlp: []byte{1, 2, 3},
} }
_, receiptErr := repo.CreateReceiptInTx(tx, headerID, txId, receipt) receiptRepo := repositories.HeaderSyncReceiptRepository{}
_, receiptErr := receiptRepo.CreateHeaderSyncReceiptInTx(headerID, txId, receipt, tx)
Expect(receiptErr).ToNot(HaveOccurred())
commitErr := tx.Commit() commitErr := tx.Commit()
Expect(commitErr).ToNot(HaveOccurred()) Expect(commitErr).ToNot(HaveOccurred())
Expect(receiptErr).ToNot(HaveOccurred())
type idModel struct { type idModel struct {
TransactionId int64 `db:"transaction_id"` TransactionId int64 `db:"transaction_id"`
core.Receipt ContractAddressId int64 `db:"contract_address_id"`
CumulativeGasUsed uint64 `db:"cumulative_gas_used"`
GasUsed uint64 `db:"gas_used"`
StateRoot string `db:"state_root"`
Status int
TxHash string `db:"tx_hash"`
Rlp []byte `db:"rlp"`
} }
var addressId int64
getAddressErr := db.Get(&addressId, `SELECT id FROM addresses WHERE address = $1`, contractAddr.Hex())
Expect(getAddressErr).NotTo(HaveOccurred())
var dbReceipt idModel var dbReceipt idModel
err = db.Get(&dbReceipt, getReceiptErr := db.Get(&dbReceipt,
`SELECT transaction_id, contract_address, cumulative_gas_used, gas_used, state_root, status, tx_hash, rlp `SELECT transaction_id, contract_address_id, cumulative_gas_used, gas_used, state_root, status, tx_hash, rlp
FROM public.header_sync_receipts WHERE header_id = $1`, headerID) FROM public.header_sync_receipts WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred()) Expect(getReceiptErr).NotTo(HaveOccurred())
Expect(dbReceipt.TransactionId).To(Equal(txId)) Expect(dbReceipt.TransactionId).To(Equal(txId))
Expect(dbReceipt.TxHash).To(Equal(txHash.Hex())) Expect(dbReceipt.TxHash).To(Equal(txHash.Hex()))
Expect(dbReceipt.ContractAddress).To(Equal(contractAddr.Hex())) Expect(dbReceipt.ContractAddressId).To(Equal(addressId))
Expect(dbReceipt.CumulativeGasUsed).To(Equal(uint64(100))) Expect(dbReceipt.CumulativeGasUsed).To(Equal(uint64(100)))
Expect(dbReceipt.GasUsed).To(Equal(uint64(10))) Expect(dbReceipt.GasUsed).To(Equal(uint64(10)))
Expect(dbReceipt.StateRoot).To(Equal(stateRoot.Hex())) Expect(dbReceipt.StateRoot).To(Equal(stateRoot.Hex()))
Expect(dbReceipt.Status).To(Equal(0))
Expect(dbReceipt.Rlp).To(Equal([]byte{1, 2, 3})) Expect(dbReceipt.Rlp).To(Equal([]byte{1, 2, 3}))
}) })
}) })

View File

@ -0,0 +1,46 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repositories
import (
"github.com/ethereum/go-ethereum/log"
"github.com/jmoiron/sqlx"
"github.com/vulcanize/vulcanizedb/pkg/core"
)
type HeaderSyncReceiptRepository struct{}
func (HeaderSyncReceiptRepository) CreateHeaderSyncReceiptInTx(headerID, transactionID int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error) {
var receiptId int64
addressId, getAddressErr := AddressRepository{}.GetOrCreateAddressInTransaction(tx, receipt.ContractAddress)
if getAddressErr != nil {
log.Error("createReceipt: Error getting address id: ", getAddressErr)
return receiptId, getAddressErr
}
err := tx.QueryRowx(`INSERT INTO public.header_sync_receipts
(header_id, transaction_id, contract_address_id, cumulative_gas_used, gas_used, state_root, status, tx_hash, rlp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, transaction_id) DO UPDATE
SET (contract_address_id, cumulative_gas_used, gas_used, state_root, status, tx_hash, rlp) = ($3, $4::NUMERIC, $5::NUMERIC, $6, $7, $8, $9)
RETURNING id`,
headerID, transactionID, addressId, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, receipt.TxHash, receipt.Rlp).Scan(&receiptId)
if err != nil {
log.Error("header_repository: error inserting receipt: ", err)
return receiptId, err
}
return receiptId, err
}

View File

@ -0,0 +1,133 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repositories_test
import (
"encoding/json"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("Header Sync Receipt Repo", func() {
var (
rawHeader []byte
err error
timestamp string
db *postgres.DB
receiptRepo repositories.HeaderSyncReceiptRepository
headerRepo repositories.HeaderRepository
header core.Header
)
BeforeEach(func() {
rawHeader, err = json.Marshal(types.Header{})
Expect(err).NotTo(HaveOccurred())
timestamp = big.NewInt(123456789).String()
db = test_config.NewTestDB(test_config.NewTestNode())
test_config.CleanTestDB(db)
receiptRepo = repositories.HeaderSyncReceiptRepository{}
headerRepo = repositories.NewHeaderRepository(db)
header = core.Header{
BlockNumber: 100,
Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex(),
Raw: rawHeader,
Timestamp: timestamp,
}
})
Describe("creating a receipt", func() {
It("adds a receipt in a tx", func() {
headerID, err := headerRepo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
fromAddress := common.HexToAddress("0x1234")
toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123)
transaction := core.TransactionModel{
Data: []byte{},
From: fromAddress.Hex(),
GasLimit: 0,
GasPrice: 0,
Hash: txHash.Hex(),
Nonce: 0,
Raw: []byte{},
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
Value: "0",
}
tx, err := db.Beginx()
Expect(err).ToNot(HaveOccurred())
txId, txErr := headerRepo.CreateTransactionInTx(tx, headerID, transaction)
Expect(txErr).ToNot(HaveOccurred())
contractAddr := common.HexToAddress("0x1234")
stateRoot := common.HexToHash("0x5678")
receipt := core.Receipt{
ContractAddress: contractAddr.Hex(),
TxHash: txHash.Hex(),
GasUsed: 10,
CumulativeGasUsed: 100,
StateRoot: stateRoot.Hex(),
Rlp: []byte{1, 2, 3},
}
_, receiptErr := receiptRepo.CreateHeaderSyncReceiptInTx(headerID, txId, receipt, tx)
Expect(receiptErr).ToNot(HaveOccurred())
commitErr := tx.Commit()
Expect(commitErr).ToNot(HaveOccurred())
type idModel struct {
TransactionId int64 `db:"transaction_id"`
ContractAddressId int64 `db:"contract_address_id"`
CumulativeGasUsed uint64 `db:"cumulative_gas_used"`
GasUsed uint64 `db:"gas_used"`
StateRoot string `db:"state_root"`
Status int
TxHash string `db:"tx_hash"`
Rlp []byte `db:"rlp"`
}
var addressId int64
getAddressErr := db.Get(&addressId, `SELECT id FROM addresses WHERE address = $1`, contractAddr.Hex())
Expect(getAddressErr).NotTo(HaveOccurred())
var dbReceipt idModel
getReceiptErr := db.Get(&dbReceipt,
`SELECT transaction_id, contract_address_id, cumulative_gas_used, gas_used, state_root, status, tx_hash, rlp
FROM public.header_sync_receipts WHERE header_id = $1`, headerID)
Expect(getReceiptErr).NotTo(HaveOccurred())
Expect(dbReceipt.TransactionId).To(Equal(txId))
Expect(dbReceipt.TxHash).To(Equal(txHash.Hex()))
Expect(dbReceipt.ContractAddressId).To(Equal(addressId))
Expect(dbReceipt.CumulativeGasUsed).To(Equal(uint64(100)))
Expect(dbReceipt.GasUsed).To(Equal(uint64(10)))
Expect(dbReceipt.StateRoot).To(Equal(stateRoot.Hex()))
Expect(dbReceipt.Status).To(Equal(0))
Expect(dbReceipt.Rlp).To(Equal([]byte{1, 2, 3}))
})
})
})

View File

@ -34,7 +34,7 @@ var _ = Describe("Logs Repository", func() {
var db *postgres.DB var db *postgres.DB
var blockRepository datastore.BlockRepository var blockRepository datastore.BlockRepository
var logsRepository datastore.LogRepository var logsRepository datastore.LogRepository
var receiptRepository datastore.ReceiptRepository var receiptRepository datastore.FullSyncReceiptRepository
var node core.Node var node core.Node
BeforeEach(func() { BeforeEach(func() {
@ -48,14 +48,16 @@ var _ = Describe("Logs Repository", func() {
test_config.CleanTestDB(db) test_config.CleanTestDB(db)
blockRepository = repositories.NewBlockRepository(db) blockRepository = repositories.NewBlockRepository(db)
logsRepository = repositories.LogRepository{DB: db} logsRepository = repositories.LogRepository{DB: db}
receiptRepository = repositories.ReceiptRepository{DB: db} receiptRepository = repositories.FullSyncReceiptRepository{DB: db}
}) })
It("returns the log when it exists", func() { It("returns the log when it exists", func() {
blockNumber := int64(12345) blockNumber := int64(12345)
blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber}) blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) tx, _ := db.Beginx()
receiptId, err := receiptRepository.CreateFullSyncReceiptInTx(blockId, core.Receipt{}, tx)
tx.Commit()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = logsRepository.CreateLogs([]core.Log{{ err = logsRepository.CreateLogs([]core.Log{{
BlockNumber: blockNumber, BlockNumber: blockNumber,
@ -91,7 +93,9 @@ var _ = Describe("Logs Repository", func() {
blockNumber := int64(12345) blockNumber := int64(12345)
blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber}) blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) tx, _ := db.Beginx()
receiptId, err := receiptRepository.CreateFullSyncReceiptInTx(blockId, core.Receipt{}, tx)
tx.Commit()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = logsRepository.CreateLogs([]core.Log{{ err = logsRepository.CreateLogs([]core.Log{{

View File

@ -19,6 +19,7 @@ package repositories_test
import ( import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
@ -32,7 +33,7 @@ var _ = Describe("Watched Events Repository", func() {
var blocksRepository datastore.BlockRepository var blocksRepository datastore.BlockRepository
var filterRepository datastore.FilterRepository var filterRepository datastore.FilterRepository
var logRepository datastore.LogRepository var logRepository datastore.LogRepository
var receiptRepository datastore.ReceiptRepository var receiptRepository datastore.FullSyncReceiptRepository
var watchedEventRepository datastore.WatchedEventRepository var watchedEventRepository datastore.WatchedEventRepository
BeforeEach(func() { BeforeEach(func() {
@ -41,7 +42,7 @@ var _ = Describe("Watched Events Repository", func() {
blocksRepository = repositories.NewBlockRepository(db) blocksRepository = repositories.NewBlockRepository(db)
filterRepository = repositories.FilterRepository{DB: db} filterRepository = repositories.FilterRepository{DB: db}
logRepository = repositories.LogRepository{DB: db} logRepository = repositories.LogRepository{DB: db}
receiptRepository = repositories.ReceiptRepository{DB: db} receiptRepository = repositories.FullSyncReceiptRepository{DB: db}
watchedEventRepository = repositories.WatchedEventRepository{DB: db} watchedEventRepository = repositories.WatchedEventRepository{DB: db}
}) })
@ -79,7 +80,10 @@ var _ = Describe("Watched Events Repository", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
blockId, err := blocksRepository.CreateOrUpdateBlock(core.Block{}) blockId, err := blocksRepository.CreateOrUpdateBlock(core.Block{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) tx, txBeginErr := db.Beginx()
Expect(txBeginErr).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateFullSyncReceiptInTx(blockId, core.Receipt{}, tx)
tx.Commit()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = logRepository.CreateLogs(logs, receiptId) err = logRepository.CreateLogs(logs, receiptId)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -136,7 +140,9 @@ var _ = Describe("Watched Events Repository", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
blockId, err := blocksRepository.CreateOrUpdateBlock(core.Block{Hash: "Ox123"}) blockId, err := blocksRepository.CreateOrUpdateBlock(core.Block{Hash: "Ox123"})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{TxHash: "0x123"}) tx, _ := db.Beginx()
receiptId, err := receiptRepository.CreateFullSyncReceiptInTx(blockId, core.Receipt{}, tx)
tx.Commit()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = logRepository.CreateLogs(logs, receiptId) err = logRepository.CreateLogs(logs, receiptId)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -17,10 +17,15 @@
package datastore package datastore
import ( import (
"github.com/jmoiron/sqlx"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/filters"
) )
type AddressRepository interface {
GetOrCreateAddress(address string) (int, error)
}
type BlockRepository interface { type BlockRepository interface {
CreateOrUpdateBlock(block core.Block) (int64, error) CreateOrUpdateBlock(block core.Block) (int64, error)
GetBlock(blockNumber int64) (core.Block, error) GetBlock(blockNumber int64) (core.Block, error)
@ -51,10 +56,14 @@ type LogRepository interface {
GetLogs(address string, blockNumber int64) ([]core.Log, error) GetLogs(address string, blockNumber int64) ([]core.Log, error)
} }
type ReceiptRepository interface { type FullSyncReceiptRepository interface {
CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error
CreateReceipt(blockId int64, receipt core.Receipt) (int64, error) CreateFullSyncReceiptInTx(blockId int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error)
GetReceipt(txHash string) (core.Receipt, error) GetFullSyncReceipt(txHash string) (core.Receipt, error)
}
type HeaderSyncReceiptRepository interface {
CreateFullSyncReceiptInTx(blockId int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error)
} }
type WatchedEventRepository interface { type WatchedEventRepository interface {

View File

@ -17,6 +17,7 @@
package fakes package fakes
import ( import (
"github.com/jmoiron/sqlx"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
@ -49,11 +50,11 @@ func (mrr *MockReceiptRepository) CreateReceiptsAndLogs(blockId int64, receipts
return mrr.createReceiptsAndLogsReturnErr return mrr.createReceiptsAndLogsReturnErr
} }
func (mrr *MockReceiptRepository) CreateReceipt(blockId int64, receipt core.Receipt) (int64, error) { func (mrr *MockReceiptRepository) CreateFullSyncReceiptInTx(blockId int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error) {
panic("implement me") panic("implement me")
} }
func (mrr *MockReceiptRepository) GetReceipt(txHash string) (core.Receipt, error) { func (mrr *MockReceiptRepository) GetFullSyncReceipt(txHash string) (core.Receipt, error) {
panic("implement me") panic("implement me")
} }

View File

@ -26,10 +26,10 @@ type ColdImporter struct {
blockRepository datastore.BlockRepository blockRepository datastore.BlockRepository
converter common.BlockConverter converter common.BlockConverter
ethDB ethereum.Database ethDB ethereum.Database
receiptRepository datastore.ReceiptRepository receiptRepository datastore.FullSyncReceiptRepository
} }
func NewColdImporter(ethDB ethereum.Database, blockRepository datastore.BlockRepository, receiptRepository datastore.ReceiptRepository, converter common.BlockConverter) *ColdImporter { func NewColdImporter(ethDB ethereum.Database, blockRepository datastore.BlockRepository, receiptRepository datastore.FullSyncReceiptRepository, converter common.BlockConverter) *ColdImporter {
return &ColdImporter{ return &ColdImporter{
blockRepository: blockRepository, blockRepository: blockRepository,
converter: converter, converter: converter,

View File

@ -1,3 +1,19 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package rpc_test package rpc_test
import ( import (

View File

@ -105,6 +105,7 @@ func NewTestDB(node core.Node) *postgres.DB {
} }
func CleanTestDB(db *postgres.DB) { func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM addresses")
db.MustExec("DELETE FROM blocks") db.MustExec("DELETE FROM blocks")
db.MustExec("DELETE FROM checked_headers") db.MustExec("DELETE FROM checked_headers")
// can't delete from eth_nodes since this function is called after the required eth_node is persisted // can't delete from eth_nodes since this function is called after the required eth_node is persisted