Add database transactions to postgres repository

* Repository returns error when CreateBlock fails.
This commit is contained in:
Matt Krump 2017-11-07 11:56:49 -06:00 committed by Eric Meyer
parent d3b3da7f17
commit df9e7ebcc5
4 changed files with 82 additions and 12 deletions

View File

@ -24,8 +24,9 @@ func NewInMemory() *InMemory {
} }
} }
func (repository *InMemory) CreateBlock(block core.Block) { func (repository *InMemory) CreateBlock(block core.Block) error {
repository.blocks[block.Number] = &block repository.blocks[block.Number] = &block
return nil
} }
func (repository *InMemory) BlockCount() int { func (repository *InMemory) BlockCount() int {

View File

@ -3,6 +3,9 @@ package repositories
import ( import (
"database/sql" "database/sql"
"context"
"errors"
"github.com/8thlight/vulcanizedb/pkg/core" "github.com/8thlight/vulcanizedb/pkg/core"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
_ "github.com/lib/pq" _ "github.com/lib/pq"
@ -12,6 +15,10 @@ type Postgres struct {
Db *sqlx.DB Db *sqlx.DB
} }
var (
ErrDBInsertFailed = errors.New("postgres: insert failed")
)
func (repository Postgres) MaxBlockNumber() int64 { func (repository Postgres) MaxBlockNumber() int64 {
var highestBlockNumber int64 var highestBlockNumber int64
repository.Db.Get(&highestBlockNumber, `SELECT MAX(block_number) FROM blocks`) repository.Db.Get(&highestBlockNumber, `SELECT MAX(block_number) FROM blocks`)
@ -57,26 +64,42 @@ func (repository Postgres) BlockCount() int {
return count return count
} }
func (repository Postgres) CreateBlock(block core.Block) { func (repository Postgres) CreateBlock(block core.Block) error {
insertedBlock := repository.Db.QueryRow( tx, _ := repository.Db.BeginTx(context.Background(), nil)
var blockId int64
err := tx.QueryRow(
`INSERT INTO blocks `INSERT INTO blocks
(block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash) (block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id`, VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash) RETURNING id `,
var blockId int64 block.Number, block.GasLimit, block.GasUsed, block.Time, block.Difficulty, block.Hash, block.Nonce, block.ParentHash, block.Size, block.UncleHash).
insertedBlock.Scan(&blockId) Scan(&blockId)
repository.createTransactions(blockId, block.Transactions) if err != nil {
tx.Rollback()
return ErrDBInsertFailed
}
err = repository.createTransactions(tx, blockId, block.Transactions)
if err != nil {
tx.Rollback()
return ErrDBInsertFailed
}
tx.Commit()
return nil
} }
func (repository Postgres) createTransactions(blockId int64, transactions []core.Transaction) { func (repository Postgres) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error {
for _, transaction := range transactions { for _, transaction := range transactions {
repository.Db.MustExec( _, err := tx.Exec(
`INSERT INTO transactions `INSERT INTO transactions
(block_id, tx_hash, tx_nonce, tx_to, tx_gaslimit, tx_gasprice, tx_value) (block_id, tx_hash, tx_nonce, tx_to, tx_gaslimit, tx_gasprice, tx_value)
VALUES ($1, $2, $3, $4, $5, $6, $7)`, VALUES ($1, $2, $3, $4, $5, $6, $7)`,
blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.GasLimit, transaction.GasPrice, transaction.Value) blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.GasLimit, transaction.GasPrice, transaction.Value)
if err != nil {
return err
} }
} }
return nil
}
func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block { func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block {
var blockId int64 var blockId int64
@ -107,7 +130,7 @@ func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block {
} }
} }
func (repository Postgres) loadTransactions(blockId int64) []core.Transaction { func (repository Postgres) loadTransactions(blockId int64) []core.Transaction {
transactionRows, _ := repository.Db.Query("SELECT tx_hash, tx_nonce, tx_to, tx_gaslimit, tx_gasprice, tx_value FROM transactions") transactionRows, _ := repository.Db.Query(`SELECT tx_hash, tx_nonce, tx_to, tx_gaslimit, tx_gasprice, tx_value FROM transactions`)
var transactions []core.Transaction var transactions []core.Transaction
for transactionRows.Next() { for transactionRows.Next() {
var hash string var hash string

View File

@ -3,7 +3,7 @@ package repositories
import "github.com/8thlight/vulcanizedb/pkg/core" import "github.com/8thlight/vulcanizedb/pkg/core"
type Repository interface { type Repository interface {
CreateBlock(block core.Block) CreateBlock(block core.Block) error
BlockCount() int BlockCount() int
FindBlockByNumber(blockNumber int64) *core.Block FindBlockByNumber(blockNumber int64) *core.Block
MaxBlockNumber() int64 MaxBlockNumber() int64

View File

@ -1,6 +1,9 @@
package repositories_test package repositories_test
import ( import (
"fmt"
"strings"
"github.com/8thlight/vulcanizedb/pkg/config" "github.com/8thlight/vulcanizedb/pkg/config"
"github.com/8thlight/vulcanizedb/pkg/core" "github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/repositories" "github.com/8thlight/vulcanizedb/pkg/repositories"
@ -133,6 +136,7 @@ var _ = Describe("Repositories", func() {
Expect(savedTransaction.GasPrice).To(Equal(gasPrice)) Expect(savedTransaction.GasPrice).To(Equal(gasPrice))
Expect(savedTransaction.Value).To(Equal(value)) Expect(savedTransaction.Value).To(Equal(value))
}) })
}) })
Describe("The missing block numbers", func() { Describe("The missing block numbers", func() {
@ -219,6 +223,48 @@ var _ = Describe("Repositories", func() {
Expect(db).ShouldNot(BeNil()) Expect(db).ShouldNot(BeNil())
}) })
It("does not commit block if block is invalid", func() {
//badNonce violates db Nonce field length
badNonce := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badBlock := core.Block{
Number: 123,
Nonce: badNonce,
Transactions: []core.Transaction{},
}
pgConfig := config.DbConnectionString(config.NewConfig("private").Database)
db, _ := sqlx.Connect("postgres", pgConfig)
Expect(db).ShouldNot(BeNil())
repository := repositories.NewPostgres(db)
err := repository.CreateBlock(badBlock)
savedBlock := repository.FindBlockByNumber(123)
Expect(err).ToNot(BeNil())
Expect(savedBlock).To(BeNil())
})
It("does not commit block or transactions if transaction is invalid", func() {
//badHash violates db To field length
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badTransaction := core.Transaction{To: badHash}
pgConfig := config.DbConnectionString(config.NewConfig("private").Database)
block := core.Block{
Number: 123,
Transactions: []core.Transaction{badTransaction},
}
db, _ := sqlx.Connect("postgres", pgConfig)
Expect(db).ShouldNot(BeNil())
repository := repositories.NewPostgres(db)
err := repository.CreateBlock(block)
savedBlock := repository.FindBlockByNumber(123)
Expect(err).ToNot(BeNil())
Expect(savedBlock).To(BeNil())
})
AssertRepositoryBehavior(func() repositories.Repository { AssertRepositoryBehavior(func() repositories.Repository {
pgConfig := config.DbConnectionString(config.NewConfig("private").Database) pgConfig := config.DbConnectionString(config.NewConfig("private").Database)
db, _ := sqlx.Connect("postgres", pgConfig) db, _ := sqlx.Connect("postgres", pgConfig)