510 lines
15 KiB
Go
510 lines
15 KiB
Go
package repositories
|
|
|
|
import (
|
|
"database/sql"
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"github.com/8thlight/vulcanizedb/pkg/config"
|
|
"github.com/8thlight/vulcanizedb/pkg/core"
|
|
"github.com/jmoiron/sqlx"
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
type Postgres struct {
|
|
Db *sqlx.DB
|
|
node core.Node
|
|
nodeId int64
|
|
}
|
|
|
|
var (
|
|
ErrDBInsertFailed = errors.New("postgres: insert failed")
|
|
ErrDBDeleteFailed = errors.New("postgres: delete failed")
|
|
ErrDBConnectionFailed = errors.New("postgres: db connection failed")
|
|
ErrUnableToSetNode = errors.New("postgres: unable to set node")
|
|
)
|
|
|
|
var ErrReceiptDoesNotExist = func(txHash string) error {
|
|
return errors.New(fmt.Sprintf("Receipt for tx: %v does not exist", txHash))
|
|
}
|
|
|
|
var ErrContractDoesNotExist = func(contractHash string) error {
|
|
return errors.New(fmt.Sprintf("Contract %v does not exist", contractHash))
|
|
}
|
|
|
|
var ErrBlockDoesNotExist = func(blockNumber int64) error {
|
|
return errors.New(fmt.Sprintf("Block number %d does not exist", blockNumber))
|
|
}
|
|
|
|
func NewPostgres(databaseConfig config.Database, node core.Node) (Postgres, error) {
|
|
connectString := config.DbConnectionString(databaseConfig)
|
|
db, err := sqlx.Connect("postgres", connectString)
|
|
if err != nil {
|
|
return Postgres{}, ErrDBConnectionFailed
|
|
}
|
|
pg := Postgres{Db: db, node: node}
|
|
err = pg.CreateNode(&node)
|
|
if err != nil {
|
|
return Postgres{}, ErrUnableToSetNode
|
|
}
|
|
return pg, nil
|
|
}
|
|
|
|
func (repository Postgres) SetBlocksStatus(chainHead int64) {
|
|
cutoff := chainHead - blocksFromHeadBeforeFinal
|
|
repository.Db.Exec(`
|
|
UPDATE blocks SET is_final = TRUE
|
|
WHERE is_final = FALSE AND block_number < $1`,
|
|
cutoff)
|
|
}
|
|
|
|
func (repository Postgres) CreateLogs(logs []core.Log) error {
|
|
tx, _ := repository.Db.BeginTx(context.Background(), nil)
|
|
for _, tlog := range logs {
|
|
_, err := tx.Exec(
|
|
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
ON CONFLICT (index, block_number)
|
|
DO UPDATE
|
|
SET block_number = $1,
|
|
address = $2,
|
|
tx_hash = $3,
|
|
index = $4,
|
|
topic0 = $5,
|
|
topic1 = $6,
|
|
topic2 = $7,
|
|
topic3 = $8,
|
|
data = $9
|
|
`,
|
|
tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data,
|
|
)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return ErrDBInsertFailed
|
|
}
|
|
}
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
func (repository Postgres) FindLogs(address string, blockNumber int64) []core.Log {
|
|
logRows, _ := repository.Db.Query(
|
|
`SELECT block_number,
|
|
address,
|
|
tx_hash,
|
|
index,
|
|
topic0,
|
|
topic1,
|
|
topic2,
|
|
topic3,
|
|
data
|
|
FROM logs
|
|
WHERE address = $1 AND block_number = $2
|
|
ORDER BY block_number DESC`, address, blockNumber)
|
|
return repository.loadLogs(logRows)
|
|
}
|
|
|
|
func (repository *Postgres) CreateNode(node *core.Node) error {
|
|
var nodeId int64
|
|
err := repository.Db.QueryRow(
|
|
`INSERT INTO nodes (genesis_block, network_id, node_id, client_name)
|
|
VALUES ($1, $2, $3, $4)
|
|
ON CONFLICT (genesis_block, network_id, node_id)
|
|
DO UPDATE
|
|
SET genesis_block = $1,
|
|
network_id = $2,
|
|
node_id = $3,
|
|
client_name = $4
|
|
RETURNING id`,
|
|
node.GenesisBlock, node.NetworkId, node.Id, node.ClientName).Scan(&nodeId)
|
|
if err != nil {
|
|
return ErrUnableToSetNode
|
|
}
|
|
repository.nodeId = nodeId
|
|
return nil
|
|
}
|
|
|
|
func (repository Postgres) CreateContract(contract core.Contract) error {
|
|
abi := contract.Abi
|
|
var abiToInsert *string
|
|
if abi != "" {
|
|
abiToInsert = &abi
|
|
}
|
|
_, err := repository.Db.Exec(
|
|
`INSERT INTO watched_contracts (contract_hash, contract_abi)
|
|
VALUES ($1, $2)
|
|
ON CONFLICT (contract_hash)
|
|
DO UPDATE
|
|
SET contract_hash = $1, contract_abi = $2
|
|
`, contract.Hash, abiToInsert)
|
|
if err != nil {
|
|
return ErrDBInsertFailed
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (repository Postgres) ContractExists(contractHash string) bool {
|
|
var exists bool
|
|
repository.Db.QueryRow(
|
|
`SELECT exists(
|
|
SELECT 1
|
|
FROM watched_contracts
|
|
WHERE contract_hash = $1)`, contractHash).Scan(&exists)
|
|
return exists
|
|
}
|
|
|
|
func (repository Postgres) FindContract(contractHash string) (core.Contract, error) {
|
|
var hash string
|
|
var abi string
|
|
contract := repository.Db.QueryRow(
|
|
`SELECT contract_hash, contract_abi FROM watched_contracts WHERE contract_hash=$1`, contractHash)
|
|
err := contract.Scan(&hash, &abi)
|
|
if err == sql.ErrNoRows {
|
|
return core.Contract{}, ErrContractDoesNotExist(contractHash)
|
|
}
|
|
savedContract := repository.addTransactions(core.Contract{Hash: hash, Abi: abi})
|
|
return savedContract, nil
|
|
}
|
|
|
|
func (repository Postgres) MaxBlockNumber() int64 {
|
|
var highestBlockNumber int64
|
|
repository.Db.Get(&highestBlockNumber, `SELECT MAX(block_number) FROM blocks`)
|
|
return highestBlockNumber
|
|
}
|
|
|
|
func (repository Postgres) MissingBlockNumbers(startingBlockNumber int64, highestBlockNumber int64) []int64 {
|
|
numbers := make([]int64, 0)
|
|
repository.Db.Select(&numbers,
|
|
`SELECT all_block_numbers
|
|
FROM (
|
|
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
|
|
LEFT JOIN blocks
|
|
ON block_number = all_block_numbers
|
|
WHERE block_number ISNULL`,
|
|
startingBlockNumber,
|
|
highestBlockNumber)
|
|
return numbers
|
|
}
|
|
|
|
func (repository Postgres) FindBlockByNumber(blockNumber int64) (core.Block, error) {
|
|
blockRows := repository.Db.QueryRow(
|
|
`SELECT id,
|
|
block_number,
|
|
block_gaslimit,
|
|
block_gasused,
|
|
block_time,
|
|
block_difficulty,
|
|
block_hash,
|
|
block_nonce,
|
|
block_parenthash,
|
|
block_size,
|
|
uncle_hash,
|
|
is_final,
|
|
block_miner,
|
|
block_extra_data,
|
|
block_reward,
|
|
block_uncles_reward
|
|
FROM blocks
|
|
WHERE node_id = $1 AND block_number = $2`, repository.nodeId, blockNumber)
|
|
savedBlock, err := repository.loadBlock(blockRows)
|
|
if err != nil {
|
|
switch err {
|
|
case sql.ErrNoRows:
|
|
return core.Block{}, ErrBlockDoesNotExist(blockNumber)
|
|
default:
|
|
return savedBlock, err
|
|
}
|
|
}
|
|
return savedBlock, nil
|
|
}
|
|
|
|
func (repository Postgres) BlockCount() int {
|
|
var count int
|
|
repository.Db.Get(&count, `SELECT COUNT(*) FROM blocks`)
|
|
return count
|
|
}
|
|
|
|
func (repository Postgres) getBlockHash(block core.Block) (string, bool) {
|
|
var retrievedBlockHash string
|
|
repository.Db.Get(&retrievedBlockHash,
|
|
`SELECT block_hash
|
|
FROM blocks
|
|
WHERE block_number = $1 AND node_id = $2`,
|
|
block.Number, repository.nodeId)
|
|
return retrievedBlockHash, blockExists(retrievedBlockHash)
|
|
}
|
|
|
|
func blockExists(retrievedBlockHash string) bool {
|
|
return retrievedBlockHash != ""
|
|
}
|
|
|
|
func (repository Postgres) CreateOrUpdateBlock(block core.Block) error {
|
|
var err error
|
|
retrievedBlockHash, ok := repository.getBlockHash(block)
|
|
if !ok {
|
|
err = repository.insertBlock(block)
|
|
return err
|
|
}
|
|
if ok && retrievedBlockHash != block.Hash {
|
|
err = repository.removeBlock(block.Number)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = repository.insertBlock(block)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (repository Postgres) insertBlock(block core.Block) error {
|
|
var blockId int64
|
|
tx, _ := repository.Db.BeginTx(context.Background(), nil)
|
|
err := tx.QueryRow(
|
|
`INSERT INTO blocks
|
|
(node_id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash, is_final, block_miner, block_extra_data, block_reward, block_uncles_reward)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
|
RETURNING id `,
|
|
repository.nodeId, block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash, block.IsFinal, block.Miner, block.ExtraData, block.Reward, block.UnclesReward).
|
|
Scan(&blockId)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return ErrDBInsertFailed
|
|
}
|
|
err = repository.createTransactions(tx, blockId, block.Transactions)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return ErrDBInsertFailed
|
|
}
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
func (repository Postgres) removeBlock(blockNumber int64) error {
|
|
_, err := repository.Db.Exec(
|
|
`DELETE FROM
|
|
blocks
|
|
WHERE block_number=$1 AND node_id=$2`,
|
|
blockNumber, repository.nodeId)
|
|
if err != nil {
|
|
return ErrDBDeleteFailed
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (repository Postgres) FindReceipt(txHash string) (core.Receipt, error) {
|
|
row := repository.Db.QueryRow(
|
|
`SELECT contract_address,
|
|
tx_hash,
|
|
cumulative_gas_used,
|
|
gas_used,
|
|
state_root,
|
|
status
|
|
FROM receipts
|
|
WHERE tx_hash = $1`, txHash)
|
|
receipt, err := loadReceipt(row)
|
|
if err != nil {
|
|
switch err {
|
|
case sql.ErrNoRows:
|
|
return core.Receipt{}, ErrReceiptDoesNotExist(txHash)
|
|
default:
|
|
return core.Receipt{}, err
|
|
}
|
|
}
|
|
return receipt, nil
|
|
}
|
|
|
|
func (repository Postgres) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error {
|
|
for _, transaction := range transactions {
|
|
err := repository.createTransaction(tx, blockId, transaction)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (repository Postgres) createTransaction(tx *sql.Tx, blockId int64, transaction core.Transaction) error {
|
|
var transactionId int
|
|
err := tx.QueryRow(
|
|
`INSERT INTO transactions
|
|
(block_id, tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value, tx_input_data)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
RETURNING id`,
|
|
blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.From, transaction.GasLimit, transaction.GasPrice, transaction.Value, transaction.Data).
|
|
Scan(&transactionId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if transaction.Receipt.TxHash != "" {
|
|
err = repository.createReceipt(tx, transactionId, transaction.Receipt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(transaction.Receipt.Logs) > 0 {
|
|
err = repository.CreateLogs(transaction.Receipt.Logs)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (repository Postgres) createReceipt(tx *sql.Tx, transactionId int, receipt core.Receipt) error {
|
|
//Not currently persisting log bloom filters
|
|
_, err := tx.Exec(
|
|
`INSERT INTO receipts
|
|
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, transaction_id)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
|
|
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, transactionId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func loadReceipt(receiptsRow *sql.Row) (core.Receipt, error) {
|
|
var contractAddress string
|
|
var txHash string
|
|
var cumulativeGasUsed int64
|
|
var gasUsed int64
|
|
var stateRoot string
|
|
var status int
|
|
|
|
err := receiptsRow.Scan(&contractAddress, &txHash, &cumulativeGasUsed, &gasUsed, &stateRoot, &status)
|
|
return core.Receipt{
|
|
TxHash: txHash,
|
|
ContractAddress: contractAddress,
|
|
CumulativeGasUsed: cumulativeGasUsed,
|
|
GasUsed: gasUsed,
|
|
StateRoot: stateRoot,
|
|
Status: status,
|
|
}, err
|
|
}
|
|
|
|
func (repository Postgres) loadBlock(blockRows *sql.Row) (core.Block, error) {
|
|
var blockId int64
|
|
var blockHash string
|
|
var blockNonce string
|
|
var blockNumber int64
|
|
var blockMiner string
|
|
var blockExtraData string
|
|
var blockParentHash string
|
|
var blockSize int64
|
|
var blockTime float64
|
|
var blockReward float64
|
|
var difficulty int64
|
|
var gasLimit float64
|
|
var gasUsed float64
|
|
var uncleHash string
|
|
var unclesReward float64
|
|
var isFinal bool
|
|
err := blockRows.Scan(&blockId, &blockNumber, &gasLimit, &gasUsed, &blockTime, &difficulty, &blockHash, &blockNonce, &blockParentHash, &blockSize, &uncleHash, &isFinal, &blockMiner, &blockExtraData, &blockReward, &unclesReward)
|
|
if err != nil {
|
|
return core.Block{}, err
|
|
}
|
|
transactionRows, _ := repository.Db.Query(`
|
|
SELECT tx_hash,
|
|
tx_nonce,
|
|
tx_to,
|
|
tx_from,
|
|
tx_gaslimit,
|
|
tx_gasprice,
|
|
tx_value,
|
|
tx_input_data
|
|
FROM transactions
|
|
WHERE block_id = $1
|
|
ORDER BY tx_hash`, blockId)
|
|
transactions := repository.loadTransactions(transactionRows)
|
|
return core.Block{
|
|
Reward: blockReward,
|
|
Difficulty: difficulty,
|
|
ExtraData: blockExtraData,
|
|
GasLimit: int64(gasLimit),
|
|
GasUsed: int64(gasUsed),
|
|
Hash: blockHash,
|
|
IsFinal: isFinal,
|
|
Miner: blockMiner,
|
|
Nonce: blockNonce,
|
|
Number: blockNumber,
|
|
ParentHash: blockParentHash,
|
|
Size: blockSize,
|
|
Time: int64(blockTime),
|
|
Transactions: transactions,
|
|
UncleHash: uncleHash,
|
|
UnclesReward: unclesReward,
|
|
}, nil
|
|
}
|
|
|
|
func (repository Postgres) loadLogs(logsRows *sql.Rows) []core.Log {
|
|
var logs []core.Log
|
|
for logsRows.Next() {
|
|
var blockNumber int64
|
|
var address string
|
|
var txHash string
|
|
var index int64
|
|
var data string
|
|
topics := make([]string, 4)
|
|
logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data)
|
|
log := core.Log{
|
|
BlockNumber: blockNumber,
|
|
TxHash: txHash,
|
|
Address: address,
|
|
Index: index,
|
|
Data: data,
|
|
}
|
|
log.Topics = make(map[int]string)
|
|
for i, topic := range topics {
|
|
log.Topics[i] = topic
|
|
}
|
|
logs = append(logs, log)
|
|
}
|
|
return logs
|
|
}
|
|
|
|
func (repository Postgres) loadTransactions(transactionRows *sql.Rows) []core.Transaction {
|
|
var transactions []core.Transaction
|
|
for transactionRows.Next() {
|
|
var hash string
|
|
var nonce uint64
|
|
var to string
|
|
var from string
|
|
var gasLimit int64
|
|
var gasPrice int64
|
|
var inputData string
|
|
var value int64
|
|
transactionRows.Scan(&hash, &nonce, &to, &from, &gasLimit, &gasPrice, &value, &inputData)
|
|
transaction := core.Transaction{
|
|
Hash: hash,
|
|
Nonce: nonce,
|
|
To: to,
|
|
From: from,
|
|
GasLimit: gasLimit,
|
|
GasPrice: gasPrice,
|
|
Value: value,
|
|
Data: inputData,
|
|
}
|
|
transactions = append(transactions, transaction)
|
|
}
|
|
return transactions
|
|
}
|
|
|
|
func (repository Postgres) addTransactions(contract core.Contract) core.Contract {
|
|
transactionRows, _ := repository.Db.Query(`
|
|
SELECT tx_hash,
|
|
tx_nonce,
|
|
tx_to,
|
|
tx_from,
|
|
tx_gaslimit,
|
|
tx_gasprice,
|
|
tx_value,
|
|
tx_input_data
|
|
FROM transactions
|
|
WHERE tx_to = $1
|
|
ORDER BY block_id DESC`, contract.Hash)
|
|
transactions := repository.loadTransactions(transactionRows)
|
|
savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi}
|
|
return savedContract
|
|
}
|