Merge pull request #11501 from filecoin-project/serializationFix
fix: lotus-provider: Serialization fix
This commit is contained in:
commit
c912f3bcd5
@ -82,6 +82,8 @@ var wdPostTaskCmd = &cli.Command{
|
|||||||
return xerrors.Errorf("cannot get miner id %w", err)
|
return xerrors.Errorf("cannot get miner id %w", err)
|
||||||
}
|
}
|
||||||
var id int64
|
var id int64
|
||||||
|
retryDelay := time.Millisecond * 10
|
||||||
|
retryAddTask:
|
||||||
_, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id)
|
err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -102,6 +104,11 @@ var wdPostTaskCmd = &cli.Command{
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
retryDelay *= 2
|
||||||
|
goto retryAddTask
|
||||||
|
}
|
||||||
return xerrors.Errorf("writing SQL transaction: %w", err)
|
return xerrors.Errorf("writing SQL transaction: %w", err)
|
||||||
}
|
}
|
||||||
fmt.Printf("Inserted task %v. Waiting for success ", id)
|
fmt.Printf("Inserted task %v. Waiting for success ", id)
|
||||||
|
@ -10,6 +10,8 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
@ -33,6 +35,8 @@ type DB struct {
|
|||||||
cfg *pgxpool.Config
|
cfg *pgxpool.Config
|
||||||
schema string
|
schema string
|
||||||
hostnames []string
|
hostnames []string
|
||||||
|
BTFPOnce sync.Once
|
||||||
|
BTFP atomic.Uintptr
|
||||||
}
|
}
|
||||||
|
|
||||||
var logger = logging.Logger("harmonydb")
|
var logger = logging.Logger("harmonydb")
|
||||||
|
@ -3,13 +3,17 @@ package harmonydb
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
"github.com/georgysavva/scany/v2/pgxscan"
|
"github.com/georgysavva/scany/v2/pgxscan"
|
||||||
"github.com/jackc/pgerrcode"
|
"github.com/jackc/pgerrcode"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
|
"github.com/samber/lo"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errTx = errors.New("Cannot use a non-transaction func in a transaction")
|
||||||
|
|
||||||
// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
|
// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
|
||||||
// In any package, raw strings will satisfy compilation. Ex:
|
// In any package, raw strings will satisfy compilation. Ex:
|
||||||
//
|
//
|
||||||
@ -22,6 +26,9 @@ type rawStringOnly string
|
|||||||
// Note, for CREATE & DROP please keep these permanent and express
|
// Note, for CREATE & DROP please keep these permanent and express
|
||||||
// them in the ./sql/ files (next number).
|
// them in the ./sql/ files (next number).
|
||||||
func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) {
|
func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) {
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return 0, errTx
|
||||||
|
}
|
||||||
res, err := db.pgx.Exec(ctx, string(sql), arguments...)
|
res, err := db.pgx.Exec(ctx, string(sql), arguments...)
|
||||||
return int(res.RowsAffected()), err
|
return int(res.RowsAffected()), err
|
||||||
}
|
}
|
||||||
@ -55,6 +62,9 @@ type Query struct {
|
|||||||
// fmt.Println(id, name)
|
// fmt.Println(id, name)
|
||||||
// }
|
// }
|
||||||
func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) {
|
func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) {
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return &Query{}, errTx
|
||||||
|
}
|
||||||
q, err := db.pgx.Query(ctx, string(sql), arguments...)
|
q, err := db.pgx.Query(ctx, string(sql), arguments...)
|
||||||
return &Query{q}, err
|
return &Query{q}, err
|
||||||
}
|
}
|
||||||
@ -66,6 +76,10 @@ type Row interface {
|
|||||||
Scan(...any) error
|
Scan(...any) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type rowErr struct{}
|
||||||
|
|
||||||
|
func (rowErr) Scan(_ ...any) error { return errTx }
|
||||||
|
|
||||||
// QueryRow gets 1 row using column order matching.
|
// QueryRow gets 1 row using column order matching.
|
||||||
// This is a timesaver for the special case of wanting the first row returned only.
|
// This is a timesaver for the special case of wanting the first row returned only.
|
||||||
// EX:
|
// EX:
|
||||||
@ -74,6 +88,9 @@ type Row interface {
|
|||||||
// var ID = 123
|
// var ID = 123
|
||||||
// err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet)
|
// err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet)
|
||||||
func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row {
|
func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row {
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return rowErr{}
|
||||||
|
}
|
||||||
return db.pgx.QueryRow(ctx, string(sql), arguments...)
|
return db.pgx.QueryRow(ctx, string(sql), arguments...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,6 +109,9 @@ Ex:
|
|||||||
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
|
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
|
||||||
*/
|
*/
|
||||||
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
|
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return errTx
|
||||||
|
}
|
||||||
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...)
|
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,10 +120,32 @@ type Tx struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// usedInTransaction is a helper to prevent nesting transactions
|
||||||
|
// & non-transaction calls in transactions. It only checks 20 frames.
|
||||||
|
// Fast: This memory should all be in CPU Caches.
|
||||||
|
func (db *DB) usedInTransaction() bool {
|
||||||
|
var framePtrs = (&[20]uintptr{})[:] // 20 can be stack-local (no alloc)
|
||||||
|
framePtrs = framePtrs[:runtime.Callers(3, framePtrs)] // skip past our caller.
|
||||||
|
return lo.Contains(framePtrs, db.BTFP.Load()) // Unsafe read @ beginTx overlap, but 'return false' is correct there.
|
||||||
|
}
|
||||||
|
|
||||||
// BeginTransaction is how you can access transactions using this library.
|
// BeginTransaction is how you can access transactions using this library.
|
||||||
// The entire transaction happens in the function passed in.
|
// The entire transaction happens in the function passed in.
|
||||||
// The return must be true or a rollback will occur.
|
// The return must be true or a rollback will occur.
|
||||||
|
// Be sure to test the error for IsErrSerialization() if you want to retry
|
||||||
|
//
|
||||||
|
// when there is a DB serialization error.
|
||||||
|
//
|
||||||
|
//go:noinline
|
||||||
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
|
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
|
||||||
|
db.BTFPOnce.Do(func() {
|
||||||
|
fp := make([]uintptr, 20)
|
||||||
|
runtime.Callers(1, fp)
|
||||||
|
db.BTFP.Store(fp[0])
|
||||||
|
})
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return false, errTx
|
||||||
|
}
|
||||||
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
|
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
@ -156,3 +198,8 @@ func IsErrUniqueContraint(err error) bool {
|
|||||||
var e2 *pgconn.PgError
|
var e2 *pgconn.PgError
|
||||||
return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
|
return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func IsErrSerialization(err error) bool {
|
||||||
|
var e2 *pgconn.PgError
|
||||||
|
return errors.As(err, &e2) && e2.Code == pgerrcode.SerializationFailure
|
||||||
|
}
|
||||||
|
@ -25,6 +25,8 @@ type taskTypeHandler struct {
|
|||||||
|
|
||||||
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
|
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
|
||||||
var tID TaskID
|
var tID TaskID
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryAddTask:
|
||||||
_, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
_, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
||||||
// create taskID (from DB)
|
// create taskID (from DB)
|
||||||
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
|
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
|
||||||
@ -44,6 +46,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
|
|||||||
log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name)
|
log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryAddTask
|
||||||
|
}
|
||||||
log.Error("Could not add task. AddTasFunc failed: %v", err)
|
log.Error("Could not add task. AddTasFunc failed: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -161,7 +168,8 @@ top:
|
|||||||
|
|
||||||
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
|
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
|
||||||
workEnd := time.Now()
|
workEnd := time.Now()
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryRecordCompletion:
|
||||||
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
||||||
var postedTime time.Time
|
var postedTime time.Time
|
||||||
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
|
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
|
||||||
@ -214,6 +222,11 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, wo
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryRecordCompletion
|
||||||
|
}
|
||||||
log.Error("Could not record transaction: ", err)
|
log.Error("Could not record transaction: ", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -180,12 +180,13 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryAttachStorage:
|
||||||
// Single transaction to attach storage which is not present in the DB
|
// Single transaction to attach storage which is not present in the DB
|
||||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
|
|
||||||
var urls sql.NullString
|
var urls sql.NullString
|
||||||
var storageId sql.NullString
|
var storageId sql.NullString
|
||||||
err = dbi.harmonyDB.QueryRow(ctx,
|
err = tx.QueryRow(
|
||||||
"Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
"Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
||||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||||
return false, xerrors.Errorf("storage attach select fails: %v", err)
|
return false, xerrors.Errorf("storage attach select fails: %v", err)
|
||||||
@ -200,7 +201,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
|||||||
}
|
}
|
||||||
currUrls = union(currUrls, si.URLs)
|
currUrls = union(currUrls, si.URLs)
|
||||||
|
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
|
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
|
||||||
strings.Join(currUrls, ","),
|
strings.Join(currUrls, ","),
|
||||||
si.Weight,
|
si.Weight,
|
||||||
@ -220,7 +221,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Insert storage id
|
// Insert storage id
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
"INSERT INTO storage_path "+
|
"INSERT INTO storage_path "+
|
||||||
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
|
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
|
||||||
si.ID,
|
si.ID,
|
||||||
@ -245,6 +246,11 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryAttachStorage
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,22 +290,29 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
|
|||||||
|
|
||||||
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
|
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
|
||||||
} else {
|
} else {
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryDropPath:
|
||||||
// Single transaction to drop storage path and sector decls which have this as a storage path
|
// Single transaction to drop storage path and sector decls which have this as a storage path
|
||||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
// Drop storage path completely
|
// Drop storage path completely
|
||||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storage_path WHERE storage_id=$1", id)
|
_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id=$1", id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drop all sectors entries which use this storage path
|
// Drop all sectors entries which use this storage path
|
||||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sector_location WHERE storage_id=$1", id)
|
_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id=$1", id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryDropPath
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Warnw("Dropping sector storage", "path", id)
|
log.Warnw("Dropping sector storage", "path", id)
|
||||||
@ -373,9 +386,11 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
|||||||
return xerrors.Errorf("invalid filetype")
|
return xerrors.Errorf("invalid filetype")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryStorageDeclareSector:
|
||||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
var currPrimary sql.NullBool
|
var currPrimary sql.NullBool
|
||||||
err = dbi.harmonyDB.QueryRow(ctx,
|
err = tx.QueryRow(
|
||||||
"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||||
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
|
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
|
||||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||||
@ -385,7 +400,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
|||||||
// If storage id already exists for this sector, update primary if need be
|
// If storage id already exists for this sector, update primary if need be
|
||||||
if currPrimary.Valid {
|
if currPrimary.Valid {
|
||||||
if !currPrimary.Bool && primary {
|
if !currPrimary.Bool && primary {
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||||
s.Miner, s.Number, ft, storageID)
|
s.Miner, s.Number, ft, storageID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -395,7 +410,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
|||||||
log.Warnf("sector %v redeclared in %s", s, storageID)
|
log.Warnf("sector %v redeclared in %s", s, storageID)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
"INSERT INTO sector_location "+
|
"INSERT INTO sector_location "+
|
||||||
"values($1, $2, $3, $4, $5)",
|
"values($1, $2, $3, $4, $5)",
|
||||||
s.Miner, s.Number, ft, storageID, primary)
|
s.Miner, s.Number, ft, storageID, primary)
|
||||||
@ -407,6 +422,11 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryStorageDeclareSector
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -750,7 +770,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
|
|
||||||
fts := (read | write).AllSet()
|
fts := (read | write).AllSet()
|
||||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
err = tx.Select(&rows,
|
||||||
`SELECT sector_filetype, read_ts, read_refs, write_ts
|
`SELECT sector_filetype, read_ts, read_refs, write_ts
|
||||||
FROM sector_location
|
FROM sector_location
|
||||||
WHERE miner_id=$1
|
WHERE miner_id=$1
|
||||||
@ -792,7 +812,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Acquire write locks
|
// Acquire write locks
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
`UPDATE sector_location
|
`UPDATE sector_location
|
||||||
SET write_ts = NOW(), write_lock_owner = $1
|
SET write_ts = NOW(), write_lock_owner = $1
|
||||||
WHERE miner_id=$2
|
WHERE miner_id=$2
|
||||||
@ -807,7 +827,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Acquire read locks
|
// Acquire read locks
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
`UPDATE sector_location
|
`UPDATE sector_location
|
||||||
SET read_ts = NOW(), read_refs = read_refs + 1
|
SET read_ts = NOW(), read_refs = read_refs + 1
|
||||||
WHERE miner_id=$1
|
WHERE miner_id=$1
|
||||||
|
Loading…
Reference in New Issue
Block a user