fix: db serialize txn - retry

This commit is contained in:
Andrew Jackson (Ajax) 2023-12-07 15:32:35 -06:00
parent cf8fed9440
commit 0e49673c49
4 changed files with 79 additions and 14 deletions

View File

@ -82,6 +82,7 @@ var wdPostTaskCmd = &cli.Command{
return xerrors.Errorf("cannot get miner id %w", err)
}
var id int64
retryAddTask:
_, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id)
if err != nil {
@ -102,6 +103,9 @@ var wdPostTaskCmd = &cli.Command{
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
goto retryAddTask
}
return xerrors.Errorf("writing SQL transaction: %w", err)
}
fmt.Printf("Inserted task %v. Waiting for success ", id)

View File

@ -3,6 +3,7 @@ package harmonydb
import (
"context"
"errors"
"runtime"
"github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgerrcode"
@ -10,6 +11,8 @@ import (
"github.com/jackc/pgx/v5/pgconn"
)
var inTxErr = errors.New("Cannot use a non-transaction func in a transaction")
// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
// In any package, raw strings will satisfy compilation. Ex:
//
@ -22,6 +25,9 @@ type rawStringOnly string
// Note, for CREATE & DROP please keep these permanent and express
// them in the ./sql/ files (next number).
func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) {
if usedInTransaction() {
return 0, inTxErr
}
res, err := db.pgx.Exec(ctx, string(sql), arguments...)
return int(res.RowsAffected()), err
}
@ -55,6 +61,9 @@ type Query struct {
// fmt.Println(id, name)
// }
func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) {
if usedInTransaction() {
return &Query{}, inTxErr
}
q, err := db.pgx.Query(ctx, string(sql), arguments...)
return &Query{q}, err
}
@ -66,6 +75,9 @@ type Row interface {
Scan(...any) error
}
type rowErr struct{}
func (rowErr) Scan(..any) error { return inTxErr }
// QueryRow gets 1 row using column order matching.
// This is a timesaver for the special case of wanting the first row returned only.
// EX:
@ -74,7 +86,10 @@ type Row interface {
// var ID = 123
// err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet)
func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row {
return db.pgx.QueryRow(ctx, string(sql), arguments...)
if usedInTransaction() {
return rowErr{}
}
return db.pgx.QueryRow(ctx, string(sql), arguments...)
}
/*
@ -92,6 +107,9 @@ Ex:
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
*/
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
if usedInTransaction() {
return inTxErr
}
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...)
}
@ -100,10 +118,30 @@ type Tx struct {
ctx context.Context
}
// usedInTransaction is a helper to prevent nesting transactions
// & non-transaction calls in transactions. In the case of a stack read error,
// it will return false, so only use true for a course of action.
func usedInTransaction() bool {
ok := true
fn := ""
for v:=2; ok; v++ {
_,_,fn,ok = runtime.Caller(v)
if strings.Contains(fn, "BeginTransaction") {
return true
}
}
return false
}
// BeginTransaction is how you can access transactions using this library.
// The entire transaction happens in the function passed in.
// The return must be true or a rollback will occur.
// Be sure to test the error for IsErrSerialization() if you want to retry
// when there is a DB serialization error.
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
if usedInTransaction() {
return 0, inTxErr
}
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return false, err
@ -156,3 +194,8 @@ func IsErrUniqueContraint(err error) bool {
var e2 *pgconn.PgError
return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
}
func IsErrSerialization(err error) bool {
var e2 *pgconn.PgError
return errors.As(err, &e2) && e2.Code == pgerrcode.SerializationFailure
}

View File

@ -25,6 +25,7 @@ type taskTypeHandler struct {
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
var tID TaskID
retryAddTask:
_, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
// create taskID (from DB)
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
@ -44,6 +45,9 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name)
return
}
if harmonydb.IsErrSerialization(err) {
goto retryAddTask
}
log.Error("Could not add task. AddTasFunc failed: %v", err)
return
}
@ -161,7 +165,7 @@ top:
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
workEnd := time.Now()
retryRecordCompletion:
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
var postedTime time.Time
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
@ -214,6 +218,9 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, wo
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
goto retryRecordCompletion
}
log.Error("Could not record transaction: ", err)
return
}

View File

@ -180,12 +180,12 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
}
}
retryAttachStorage:
// Single transaction to attach storage which is not present in the DB
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var urls sql.NullString
var storageId sql.NullString
err = dbi.harmonyDB.QueryRow(ctx,
err = tx.QueryRow(
"Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
return false, xerrors.Errorf("storage attach select fails: %v", err)
@ -200,7 +200,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
}
currUrls = union(currUrls, si.URLs)
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
strings.Join(currUrls, ","),
si.Weight,
@ -220,7 +220,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
}
// Insert storage id
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
"INSERT INTO storage_path "+
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
si.ID,
@ -245,6 +245,9 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
goto retryAttachStorage
}
return err
}
@ -284,22 +287,26 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
} else {
retryDropPath:
// Single transaction to drop storage path and sector decls which have this as a storage path
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// Drop storage path completely
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storage_path WHERE storage_id=$1", id)
_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id=$1", id)
if err != nil {
return false, err
}
// Drop all sectors entries which use this storage path
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sector_location WHERE storage_id=$1", id)
_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id=$1", id)
if err != nil {
return false, err
}
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
goto retryDropPath
}
return err
}
log.Warnw("Dropping sector storage", "path", id)
@ -373,9 +380,10 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
return xerrors.Errorf("invalid filetype")
}
retryStorageDeclareSector:
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var currPrimary sql.NullBool
err = dbi.harmonyDB.QueryRow(ctx,
err = tx.QueryRow(
"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
@ -385,7 +393,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
// If storage id already exists for this sector, update primary if need be
if currPrimary.Valid {
if !currPrimary.Bool && primary {
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
s.Miner, s.Number, ft, storageID)
if err != nil {
@ -395,7 +403,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
log.Warnf("sector %v redeclared in %s", s, storageID)
}
} else {
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
"INSERT INTO sector_location "+
"values($1, $2, $3, $4, $5)",
s.Miner, s.Number, ft, storageID, primary)
@ -407,6 +415,9 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
goto retryStorageDeclareSector
}
return err
}
@ -750,7 +761,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
fts := (read | write).AllSet()
err = dbi.harmonyDB.Select(ctx, &rows,
err = tx.Select(&rows,
`SELECT sector_filetype, read_ts, read_refs, write_ts
FROM sector_location
WHERE miner_id=$1
@ -792,7 +803,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
}
// Acquire write locks
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
`UPDATE sector_location
SET write_ts = NOW(), write_lock_owner = $1
WHERE miner_id=$2
@ -807,7 +818,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
}
// Acquire read locks
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
`UPDATE sector_location
SET read_ts = NOW(), read_refs = read_refs + 1
WHERE miner_id=$1