ipld-eth-server/pkg/repositories/postgres.go

526 lines
15 KiB
Go
Raw Normal View History

2017-11-03 13:01:35 +00:00
package repositories
import (
"database/sql"
"context"
"errors"
"fmt"
"github.com/8thlight/vulcanizedb/pkg/config"
2017-11-06 18:53:43 +00:00
"github.com/8thlight/vulcanizedb/pkg/core"
2017-11-03 13:01:35 +00:00
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
type Postgres struct {
Db *sqlx.DB
node core.Node
nodeId int64
2017-11-03 13:01:35 +00:00
}
var (
ErrDBInsertFailed = errors.New("postgres: insert failed")
ErrDBDeleteFailed = errors.New("postgres: delete failed")
ErrDBConnectionFailed = errors.New("postgres: db connection failed")
ErrUnableToSetNode = errors.New("postgres: unable to set node")
)
var ErrReceiptDoesNotExist = func(txHash string) error {
return errors.New(fmt.Sprintf("Receipt for tx: %v does not exist", txHash))
}
var ErrContractDoesNotExist = func(contractHash string) error {
return errors.New(fmt.Sprintf("Contract %v does not exist", contractHash))
}
var ErrBlockDoesNotExist = func(blockNumber int64) error {
return errors.New(fmt.Sprintf("Block number %d does not exist", blockNumber))
}
func NewPostgres(databaseConfig config.Database, node core.Node) (Postgres, error) {
connectString := config.DbConnectionString(databaseConfig)
db, err := sqlx.Connect("postgres", connectString)
if err != nil {
return Postgres{}, ErrDBConnectionFailed
}
pg := Postgres{Db: db, node: node}
err = pg.CreateNode(&node)
if err != nil {
return Postgres{}, ErrUnableToSetNode
}
return pg, nil
}
func (repository Postgres) SetBlocksStatus(chainHead int64) {
cutoff := chainHead - blocksFromHeadBeforeFinal
repository.Db.Exec(`
UPDATE blocks SET is_final = TRUE
WHERE is_final = FALSE AND block_number < $1`,
cutoff)
}
func (repository Postgres) FindLogs(address string, blockNumber int64) []core.Log {
logRows, _ := repository.Db.Query(
`SELECT block_number,
address,
tx_hash,
index,
topic0,
topic1,
topic2,
topic3,
data
FROM logs
WHERE address = $1 AND block_number = $2
ORDER BY block_number DESC`, address, blockNumber)
return repository.loadLogs(logRows)
}
func (repository *Postgres) CreateNode(node *core.Node) error {
var nodeId int64
err := repository.Db.QueryRow(
2018-01-10 21:54:36 +00:00
`INSERT INTO nodes (genesis_block, network_id, node_id, client_name)
VALUES ($1, $2, $3, $4)
ON CONFLICT (genesis_block, network_id, node_id)
DO UPDATE
2018-01-10 21:54:36 +00:00
SET genesis_block = $1,
network_id = $2,
node_id = $3,
client_name = $4
RETURNING id`,
2018-01-10 21:54:36 +00:00
node.GenesisBlock, node.NetworkId, node.Id, node.ClientName).Scan(&nodeId)
if err != nil {
return ErrUnableToSetNode
}
repository.nodeId = nodeId
return nil
}
2017-12-04 22:54:35 +00:00
func (repository Postgres) CreateContract(contract core.Contract) error {
abi := contract.Abi
var abiToInsert *string
if abi != "" {
abiToInsert = &abi
}
_, err := repository.Db.Exec(
`INSERT INTO watched_contracts (contract_hash, contract_abi)
VALUES ($1, $2)
ON CONFLICT (contract_hash)
DO UPDATE
SET contract_hash = $1, contract_abi = $2
`, contract.Hash, abiToInsert)
if err != nil {
return ErrDBInsertFailed
}
return nil
}
2017-12-04 22:54:35 +00:00
func (repository Postgres) ContractExists(contractHash string) bool {
var exists bool
repository.Db.QueryRow(
`SELECT exists(
SELECT 1
FROM watched_contracts
WHERE contract_hash = $1)`, contractHash).Scan(&exists)
return exists
}
func (repository Postgres) FindContract(contractHash string) (core.Contract, error) {
var hash string
var abi string
contract := repository.Db.QueryRow(
`SELECT contract_hash, contract_abi FROM watched_contracts WHERE contract_hash=$1`, contractHash)
err := contract.Scan(&hash, &abi)
if err == sql.ErrNoRows {
return core.Contract{}, ErrContractDoesNotExist(contractHash)
}
savedContract := repository.addTransactions(core.Contract{Hash: hash, Abi: abi})
return savedContract, nil
}
func (repository Postgres) MaxBlockNumber() int64 {
var highestBlockNumber int64
repository.Db.Get(&highestBlockNumber, `SELECT MAX(block_number) FROM blocks`)
return highestBlockNumber
}
func (repository Postgres) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 {
numbers := make([]int64, 0)
repository.Db.Select(&numbers,
`SELECT all_block_numbers
FROM (
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
LEFT JOIN blocks
ON block_number = all_block_numbers
WHERE block_number ISNULL`,
startingBlockNumber,
highestBlockNumber)
return numbers
}
func (repository Postgres) FindBlockByNumber(blockNumber int64) (core.Block, error) {
blockRows := repository.Db.QueryRow(
`SELECT id,
block_number,
block_gaslimit,
block_gasused,
block_time,
block_difficulty,
block_hash,
block_nonce,
block_parenthash,
block_size,
uncle_hash,
2017-12-27 16:50:56 +00:00
is_final,
2017-12-27 18:10:08 +00:00
block_miner,
2017-12-28 16:06:13 +00:00
block_extra_data,
block_reward,
block_uncles_reward
FROM blocks
WHERE node_id = $1 AND block_number = $2`, repository.nodeId, blockNumber)
savedBlock, err := repository.loadBlock(blockRows)
if err != nil {
switch err {
case sql.ErrNoRows:
return core.Block{}, ErrBlockDoesNotExist(blockNumber)
default:
return savedBlock, err
}
2017-11-03 13:01:35 +00:00
}
return savedBlock, nil
2017-11-03 13:01:35 +00:00
}
func (repository Postgres) BlockCount() int {
var count int
repository.Db.Get(&count, `SELECT COUNT(*) FROM blocks`)
2017-11-03 13:01:35 +00:00
return count
}
func (repository Postgres) getBlockHash(block core.Block) (string, bool) {
var retrievedBlockHash string
repository.Db.Get(&retrievedBlockHash,
`SELECT block_hash
FROM blocks
WHERE block_number = $1 AND node_id = $2`,
block.Number, repository.nodeId)
return retrievedBlockHash, blockExists(retrievedBlockHash)
}
func blockExists(retrievedBlockHash string) bool {
return retrievedBlockHash != ""
}
func (repository Postgres) CreateOrUpdateBlock(block core.Block) error {
var err error
retrievedBlockHash, ok := repository.getBlockHash(block)
if !ok {
err = repository.insertBlock(block)
return err
}
if ok && retrievedBlockHash != block.Hash {
err = repository.removeBlock(block.Number)
if err != nil {
return err
}
err = repository.insertBlock(block)
return err
}
return nil
}
func (repository Postgres) insertBlock(block core.Block) error {
var blockId int64
tx, _ := repository.Db.BeginTx(context.Background(), nil)
err := tx.QueryRow(
`INSERT INTO blocks
2017-12-28 16:06:13 +00:00
(node_id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash, is_final, block_miner, block_extra_data, block_reward, block_uncles_reward)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING id `,
2017-12-28 16:06:13 +00:00
repository.nodeId, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward).
Scan(&blockId)
if err != nil {
tx.Rollback()
return ErrDBInsertFailed
}
err = repository.createTransactions(tx, blockId, block.Transactions)
if err != nil {
tx.Rollback()
return ErrDBInsertFailed
}
tx.Commit()
return nil
2017-11-03 13:01:35 +00:00
}
func (repository Postgres) removeBlock(blockNumber int64) error {
_, err := repository.Db.Exec(
`DELETE FROM
blocks
WHERE block_number=$1 AND node_id=$2`,
blockNumber, repository.nodeId)
if err != nil {
return ErrDBDeleteFailed
}
return nil
}
func (repository Postgres) FindReceipt(txHash string) (core.Receipt, error) {
row := repository.Db.QueryRow(
`SELECT contract_address,
tx_hash,
cumulative_gas_used,
gas_used,
state_root,
status
FROM receipts
WHERE tx_hash = $1`, txHash)
receipt, err := loadReceipt(row)
if err != nil {
switch err {
case sql.ErrNoRows:
return core.Receipt{}, ErrReceiptDoesNotExist(txHash)
default:
return core.Receipt{}, err
}
}
return receipt, nil
}
func (repository Postgres) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error {
2017-11-03 13:01:35 +00:00
for _, transaction := range transactions {
err := repository.createTransaction(tx, blockId, transaction)
if err != nil {
return err
}
}
return nil
}
func (repository Postgres) createTransaction(tx *sql.Tx, blockId int64, transaction core.Transaction) error {
var transactionId int
err := tx.QueryRow(
`INSERT INTO transactions
(block_id, tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value, tx_input_data)
VALUES ($1, $2, $3, $4, $5, $6, $7, cast(NULLIF($8, '') as NUMERIC), $9)
RETURNING id`,
blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.From, transaction.GasLimit, transaction.GasPrice, transaction.Value, transaction.Data).
Scan(&transactionId)
if err != nil {
return err
}
2018-01-15 21:27:45 +00:00
if hasReceipt(transaction) {
receiptId, err := repository.createReceipt(tx, transactionId, transaction.Receipt)
if err != nil {
return err
}
2018-01-15 21:27:45 +00:00
if hasLogs(transaction) {
err = repository.createLogs(tx, transaction.Receipt.Logs, receiptId)
if err != nil {
return err
}
2018-01-15 16:52:50 +00:00
}
2017-11-03 13:01:35 +00:00
}
return nil
2017-11-03 13:01:35 +00:00
}
2018-01-15 21:27:45 +00:00
func hasLogs(transaction core.Transaction) bool {
return len(transaction.Receipt.Logs) > 0
}
func hasReceipt(transaction core.Transaction) bool {
return transaction.Receipt.TxHash != ""
}
func (repository Postgres) createReceipt(tx *sql.Tx, transactionId int, receipt core.Receipt) (int, error) {
//Not currently persisting log bloom filters
2018-01-15 20:44:47 +00:00
var receiptId int
err := tx.QueryRow(
`INSERT INTO receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, transaction_id)
2018-01-15 20:44:47 +00:00
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, transactionId).Scan(&receiptId)
if err != nil {
2018-01-15 21:27:45 +00:00
return receiptId, err
}
return receiptId, nil
}
func (repository Postgres) createLogs(tx *sql.Tx, logs []core.Log, receiptId int) error {
for _, tlog := range logs {
_, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
2018-01-15 21:27:45 +00:00
`,
tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data, receiptId,
2018-01-15 21:27:45 +00:00
)
if err != nil {
return ErrDBInsertFailed
}
}
return nil
}
2018-01-15 20:44:47 +00:00
func (repository Postgres) CreateLogs(logs []core.Log) error {
tx, _ := repository.Db.BeginTx(context.Background(), nil)
for _, tlog := range logs {
_, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`,
tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data,
)
if err != nil {
tx.Rollback()
return ErrDBInsertFailed
}
}
tx.Commit()
return nil
}
func loadReceipt(receiptsRow *sql.Row) (core.Receipt, error) {
var contractAddress string
var txHash string
var cumulativeGasUsed int64
var gasUsed int64
var stateRoot string
var status int
err := receiptsRow.Scan(&contractAddress, &txHash, &cumulativeGasUsed, &gasUsed, &stateRoot, &status)
return core.Receipt{
TxHash: txHash,
ContractAddress: contractAddress,
CumulativeGasUsed: cumulativeGasUsed,
GasUsed: gasUsed,
StateRoot: stateRoot,
Status: status,
}, err
}
func (repository Postgres) loadBlock(blockRows *sql.Row) (core.Block, error) {
2017-11-03 13:01:35 +00:00
var blockId int64
var blockHash string
var blockNonce string
var blockNumber int64
2017-12-27 16:50:56 +00:00
var blockMiner string
2017-12-27 18:10:08 +00:00
var blockExtraData string
2017-11-03 13:01:35 +00:00
var blockParentHash string
var blockSize int64
var blockTime float64
2017-12-28 16:06:13 +00:00
var blockReward float64
2017-11-03 13:01:35 +00:00
var difficulty int64
var gasLimit float64
var gasUsed float64
var uncleHash string
2017-12-28 16:06:13 +00:00
var unclesReward float64
var isFinal bool
2017-12-28 16:06:13 +00:00
err := blockRows.Scan(&blockId, &blockNumber, &gasLimit, &gasUsed, &blockTime, &difficulty, &blockHash, &blockNonce, &blockParentHash, &blockSize, &uncleHash, &isFinal, &blockMiner, &blockExtraData, &blockReward, &unclesReward)
if err != nil {
return core.Block{}, err
}
transactionRows, _ := repository.Db.Query(`
SELECT tx_hash,
tx_nonce,
tx_to,
tx_from,
tx_gaslimit,
tx_gasprice,
2017-12-28 23:04:15 +00:00
tx_value,
tx_input_data
FROM transactions
WHERE block_id = $1
ORDER BY tx_hash`, blockId)
transactions := repository.loadTransactions(transactionRows)
2017-11-03 13:01:35 +00:00
return core.Block{
2017-12-28 16:06:13 +00:00
Reward: blockReward,
2017-11-03 13:01:35 +00:00
Difficulty: difficulty,
2017-12-27 18:10:08 +00:00
ExtraData: blockExtraData,
2017-11-03 13:01:35 +00:00
GasLimit: int64(gasLimit),
GasUsed: int64(gasUsed),
Hash: blockHash,
2017-12-27 18:10:08 +00:00
IsFinal: isFinal,
Miner: blockMiner,
2017-11-03 13:01:35 +00:00
Nonce: blockNonce,
Number: blockNumber,
ParentHash: blockParentHash,
Size: blockSize,
Time: int64(blockTime),
Transactions: transactions,
UncleHash: uncleHash,
2017-12-28 16:06:13 +00:00
UnclesReward: unclesReward,
}, nil
2017-11-03 13:01:35 +00:00
}
func (repository Postgres) loadLogs(logsRows *sql.Rows) []core.Log {
var logs []core.Log
for logsRows.Next() {
var blockNumber int64
var address string
var txHash string
var index int64
var data string
2018-01-15 20:44:47 +00:00
var topics core.Topics
logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data)
log := core.Log{
BlockNumber: blockNumber,
TxHash: txHash,
Address: address,
Index: index,
Data: data,
}
for i, topic := range topics {
log.Topics[i] = topic
}
logs = append(logs, log)
}
return logs
}
func (repository Postgres) loadTransactions(transactionRows *sql.Rows) []core.Transaction {
2017-11-03 13:01:35 +00:00
var transactions []core.Transaction
for transactionRows.Next() {
var hash string
var nonce uint64
var to string
2017-11-08 20:55:35 +00:00
var from string
2017-11-03 13:01:35 +00:00
var gasLimit int64
var gasPrice int64
2017-12-28 23:04:15 +00:00
var inputData string
var value string
2017-12-28 23:04:15 +00:00
transactionRows.Scan(&hash, &nonce, &to, &from, &gasLimit, &gasPrice, &value, &inputData)
2017-11-03 13:01:35 +00:00
transaction := core.Transaction{
Hash: hash,
Nonce: nonce,
To: to,
2017-11-08 20:55:35 +00:00
From: from,
2017-11-03 13:01:35 +00:00
GasLimit: gasLimit,
GasPrice: gasPrice,
Value: value,
2017-12-28 23:04:15 +00:00
Data: inputData,
2017-11-03 13:01:35 +00:00
}
transactions = append(transactions, transaction)
}
return transactions
}
func (repository Postgres) addTransactions(contract core.Contract) core.Contract {
transactionRows, _ := repository.Db.Query(`
SELECT tx_hash,
tx_nonce,
tx_to,
tx_from,
tx_gaslimit,
tx_gasprice,
2017-12-28 23:04:15 +00:00
tx_value,
tx_input_data
FROM transactions
WHERE tx_to = $1
ORDER BY block_id DESC`, contract.Hash)
transactions := repository.loadTransactions(transactionRows)
savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi}
return savedContract
}