From 0e49673c4991a0a2ad74d2f8d19a4b154002625f Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 7 Dec 2023 15:32:35 -0600 Subject: [PATCH 1/5] fix: db serialize txn - retry --- cmd/lotus-provider/proving.go | 4 ++ lib/harmony/harmonydb/userfuncs.go | 45 +++++++++++++++++++- lib/harmony/harmonytask/task_type_handler.go | 9 +++- storage/paths/db_index.go | 35 +++++++++------ 4 files changed, 79 insertions(+), 14 deletions(-) diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index 577b5b5f9..621896c91 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -82,6 +82,7 @@ var wdPostTaskCmd = &cli.Command{ return xerrors.Errorf("cannot get miner id %w", err) } var id int64 + retryAddTask: _, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id) if err != nil { @@ -102,6 +103,9 @@ var wdPostTaskCmd = &cli.Command{ return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + goto retryAddTask + } return xerrors.Errorf("writing SQL transaction: %w", err) } fmt.Printf("Inserted task %v. Waiting for success ", id) diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 788ca4a34..add50c269 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -3,6 +3,7 @@ package harmonydb import ( "context" "errors" + "runtime" "github.com/georgysavva/scany/v2/pgxscan" "github.com/jackc/pgerrcode" @@ -10,6 +11,8 @@ import ( "github.com/jackc/pgx/v5/pgconn" ) +var inTxErr = errors.New("Cannot use a non-transaction func in a transaction") + // rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries. // In any package, raw strings will satisfy compilation. Ex: // @@ -22,6 +25,9 @@ type rawStringOnly string // Note, for CREATE & DROP please keep these permanent and express // them in the ./sql/ files (next number). func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) { + if usedInTransaction() { + return 0, inTxErr + } res, err := db.pgx.Exec(ctx, string(sql), arguments...) return int(res.RowsAffected()), err } @@ -55,6 +61,9 @@ type Query struct { // fmt.Println(id, name) // } func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) { + if usedInTransaction() { + return &Query{}, inTxErr + } q, err := db.pgx.Query(ctx, string(sql), arguments...) return &Query{q}, err } @@ -66,6 +75,9 @@ type Row interface { Scan(...any) error } +type rowErr struct{} +func (rowErr) Scan(..any) error { return inTxErr } + // QueryRow gets 1 row using column order matching. // This is a timesaver for the special case of wanting the first row returned only. // EX: @@ -74,7 +86,10 @@ type Row interface { // var ID = 123 // err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet) func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row { - return db.pgx.QueryRow(ctx, string(sql), arguments...) + if usedInTransaction() { + return rowErr{} + } + return db.pgx.QueryRow(ctx, string(sql), arguments...) } /* @@ -92,6 +107,9 @@ Ex: err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet) */ func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { + if usedInTransaction() { + return inTxErr + } return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...) } @@ -100,10 +118,30 @@ type Tx struct { ctx context.Context } +// usedInTransaction is a helper to prevent nesting transactions +// & non-transaction calls in transactions. In the case of a stack read error, +// it will return false, so only use true for a course of action. +func usedInTransaction() bool { + ok := true + fn := "" + for v:=2; ok; v++ { + _,_,fn,ok = runtime.Caller(v) + if strings.Contains(fn, "BeginTransaction") { + return true + } + } + return false +} + // BeginTransaction is how you can access transactions using this library. // The entire transaction happens in the function passed in. // The return must be true or a rollback will occur. +// Be sure to test the error for IsErrSerialization() if you want to retry +// when there is a DB serialization error. func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) { + if usedInTransaction() { + return 0, inTxErr + } tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return false, err @@ -156,3 +194,8 @@ func IsErrUniqueContraint(err error) bool { var e2 *pgconn.PgError return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation } + +func IsErrSerialization(err error) bool { + var e2 *pgconn.PgError + return errors.As(err, &e2) && e2.Code == pgerrcode.SerializationFailure +} diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 34f7a5c3e..6091cb61e 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -25,6 +25,7 @@ type taskTypeHandler struct { func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) { var tID TaskID +retryAddTask: _, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { // create taskID (from DB) _, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time) @@ -44,6 +45,9 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name) return } + if harmonydb.IsErrSerialization(err) { + goto retryAddTask + } log.Error("Could not add task. AddTasFunc failed: %v", err) return } @@ -161,7 +165,7 @@ top: func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) { workEnd := time.Now() - +retryRecordCompletion: cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { var postedTime time.Time err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime) @@ -214,6 +218,9 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, wo return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + goto retryRecordCompletion + } log.Error("Could not record transaction: ", err) return } diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 1e4abfab1..69a03198d 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -180,12 +180,12 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, } } +retryAttachStorage: // Single transaction to attach storage which is not present in the DB _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - var urls sql.NullString var storageId sql.NullString - err = dbi.harmonyDB.QueryRow(ctx, + err = tx.QueryRow( "Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls) if err != nil && !strings.Contains(err.Error(), "no rows in result set") { return false, xerrors.Errorf("storage attach select fails: %v", err) @@ -200,7 +200,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, } currUrls = union(currUrls, si.URLs) - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( "UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10", strings.Join(currUrls, ","), si.Weight, @@ -220,7 +220,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, } // Insert storage id - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( "INSERT INTO storage_path "+ "Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)", si.ID, @@ -245,6 +245,9 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + goto retryAttachStorage + } return err } @@ -284,22 +287,26 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri log.Warnw("Dropping sector path endpoint", "path", id, "url", url) } else { + retryDropPath: // Single transaction to drop storage path and sector decls which have this as a storage path _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { // Drop storage path completely - _, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storage_path WHERE storage_id=$1", id) + _, err = tx.Exec("DELETE FROM storage_path WHERE storage_id=$1", id) if err != nil { return false, err } // Drop all sectors entries which use this storage path - _, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sector_location WHERE storage_id=$1", id) + _, err = tx.Exec("DELETE FROM sector_location WHERE storage_id=$1", id) if err != nil { return false, err } return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + goto retryDropPath + } return err } log.Warnw("Dropping sector storage", "path", id) @@ -373,9 +380,10 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac return xerrors.Errorf("invalid filetype") } +retryStorageDeclareSector: _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { var currPrimary sql.NullBool - err = dbi.harmonyDB.QueryRow(ctx, + err = tx.QueryRow( "SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary) if err != nil && !strings.Contains(err.Error(), "no rows in result set") { @@ -385,7 +393,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac // If storage id already exists for this sector, update primary if need be if currPrimary.Valid { if !currPrimary.Bool && primary { - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( "UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", s.Miner, s.Number, ft, storageID) if err != nil { @@ -395,7 +403,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac log.Warnf("sector %v redeclared in %s", s, storageID) } } else { - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( "INSERT INTO sector_location "+ "values($1, $2, $3, $4, $5)", s.Miner, s.Number, ft, storageID, primary) @@ -407,6 +415,9 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + goto retryStorageDeclareSector + } return err } @@ -750,7 +761,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { fts := (read | write).AllSet() - err = dbi.harmonyDB.Select(ctx, &rows, + err = tx.Select(&rows, `SELECT sector_filetype, read_ts, read_refs, write_ts FROM sector_location WHERE miner_id=$1 @@ -792,7 +803,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac } // Acquire write locks - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( `UPDATE sector_location SET write_ts = NOW(), write_lock_owner = $1 WHERE miner_id=$2 @@ -807,7 +818,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac } // Acquire read locks - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( `UPDATE sector_location SET read_ts = NOW(), read_refs = read_refs + 1 WHERE miner_id=$1 From 1e09e1e9661bd67d46f62b050fd1afd622d97223 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 7 Dec 2023 16:01:28 -0600 Subject: [PATCH 2/5] detect unsafe code uses --- lib/harmony/harmonydb/harmonydb.go | 3 ++ lib/harmony/harmonydb/userfuncs.go | 48 ++++++++++++++++-------------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go index 0fed176d2..caa5d1daa 100644 --- a/lib/harmony/harmonydb/harmonydb.go +++ b/lib/harmony/harmonydb/harmonydb.go @@ -10,6 +10,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" logging "github.com/ipfs/go-log/v2" @@ -33,6 +34,8 @@ type DB struct { cfg *pgxpool.Config schema string hostnames []string + BTFPOnce sync.Once + BTFP uintptr } var logger = logging.Logger("harmonydb") diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index add50c269..8abce73d0 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -9,6 +9,7 @@ import ( "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" + "github.com/samber/lo" ) var inTxErr = errors.New("Cannot use a non-transaction func in a transaction") @@ -25,7 +26,7 @@ type rawStringOnly string // Note, for CREATE & DROP please keep these permanent and express // them in the ./sql/ files (next number). func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) { - if usedInTransaction() { + if db.usedInTransaction() { return 0, inTxErr } res, err := db.pgx.Exec(ctx, string(sql), arguments...) @@ -61,7 +62,7 @@ type Query struct { // fmt.Println(id, name) // } func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) { - if usedInTransaction() { + if db.usedInTransaction() { return &Query{}, inTxErr } q, err := db.pgx.Query(ctx, string(sql), arguments...) @@ -76,7 +77,8 @@ type Row interface { } type rowErr struct{} -func (rowErr) Scan(..any) error { return inTxErr } + +func (rowErr) Scan(_ ...any) error { return inTxErr } // QueryRow gets 1 row using column order matching. // This is a timesaver for the special case of wanting the first row returned only. @@ -86,10 +88,10 @@ func (rowErr) Scan(..any) error { return inTxErr } // var ID = 123 // err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet) func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row { - if usedInTransaction() { + if db.usedInTransaction() { return rowErr{} } - return db.pgx.QueryRow(ctx, string(sql), arguments...) + return db.pgx.QueryRow(ctx, string(sql), arguments...) } /* @@ -107,7 +109,7 @@ Ex: err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet) */ func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { - if usedInTransaction() { + if db.usedInTransaction() { return inTxErr } return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...) @@ -118,29 +120,31 @@ type Tx struct { ctx context.Context } -// usedInTransaction is a helper to prevent nesting transactions -// & non-transaction calls in transactions. In the case of a stack read error, -// it will return false, so only use true for a course of action. -func usedInTransaction() bool { - ok := true - fn := "" - for v:=2; ok; v++ { - _,_,fn,ok = runtime.Caller(v) - if strings.Contains(fn, "BeginTransaction") { - return true - } - } - return false +// usedInTransaction is a helper to prevent nesting transactions +// & non-transaction calls in transactions. It only checks 20 frames. +// Fast: This memory should all be in CPU Caches. +func (db *DB) usedInTransaction() bool { + var framePtrs = (&[20]uintptr{})[:] + runtime.Callers(3, framePtrs) + return lo.Contains(framePtrs, db.BTFP) } // BeginTransaction is how you can access transactions using this library. // The entire transaction happens in the function passed in. // The return must be true or a rollback will occur. // Be sure to test the error for IsErrSerialization() if you want to retry -// when there is a DB serialization error. +// +// when there is a DB serialization error. +// +//go:noinline func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) { - if usedInTransaction() { - return 0, inTxErr + db.BTFPOnce.Do(func() { + fp := make([]uintptr, 20) + runtime.Callers(1, fp) + db.BTFP = fp[0] + }) + if db.usedInTransaction() { + return false, inTxErr } tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{}) if err != nil { From 9dca4346b18eda186a72a7457bfa55b82b6a7512 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Sat, 9 Dec 2023 11:20:41 -0600 Subject: [PATCH 3/5] fix: lint --- lib/harmony/harmonydb/userfuncs.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 8abce73d0..eb17efd46 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -12,7 +12,7 @@ import ( "github.com/samber/lo" ) -var inTxErr = errors.New("Cannot use a non-transaction func in a transaction") +var errTx = errors.New("Cannot use a non-transaction func in a transaction") // rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries. // In any package, raw strings will satisfy compilation. Ex: @@ -27,7 +27,7 @@ type rawStringOnly string // them in the ./sql/ files (next number). func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) { if db.usedInTransaction() { - return 0, inTxErr + return 0, errTx } res, err := db.pgx.Exec(ctx, string(sql), arguments...) return int(res.RowsAffected()), err @@ -63,7 +63,7 @@ type Query struct { // } func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) { if db.usedInTransaction() { - return &Query{}, inTxErr + return &Query{}, errTx } q, err := db.pgx.Query(ctx, string(sql), arguments...) return &Query{q}, err @@ -78,7 +78,7 @@ type Row interface { type rowErr struct{} -func (rowErr) Scan(_ ...any) error { return inTxErr } +func (rowErr) Scan(_ ...any) error { return errTx } // QueryRow gets 1 row using column order matching. // This is a timesaver for the special case of wanting the first row returned only. @@ -110,7 +110,7 @@ Ex: */ func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { if db.usedInTransaction() { - return inTxErr + return errTx } return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...) } @@ -124,9 +124,9 @@ type Tx struct { // & non-transaction calls in transactions. It only checks 20 frames. // Fast: This memory should all be in CPU Caches. func (db *DB) usedInTransaction() bool { - var framePtrs = (&[20]uintptr{})[:] - runtime.Callers(3, framePtrs) - return lo.Contains(framePtrs, db.BTFP) + var framePtrs = (&[20]uintptr{})[:] // 20 can be stack-local (no alloc) + runtime.Callers(3, framePtrs) // skip past our caller. + return lo.Contains(framePtrs, db.BTFP) // Unsafe read @ beginTx overlap, but 'return false' is correct there. } // BeginTransaction is how you can access transactions using this library. @@ -144,7 +144,7 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, er db.BTFP = fp[0] }) if db.usedInTransaction() { - return false, inTxErr + return false, errTx } tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{}) if err != nil { From 96353e63ea062212b0227cb69155252764538856 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 11 Dec 2023 10:50:49 -0600 Subject: [PATCH 4/5] exp backoff, short stack err --- cmd/lotus-provider/proving.go | 3 +++ lib/harmony/harmonydb/userfuncs.go | 6 +++--- lib/harmony/harmonytask/task_type_handler.go | 6 ++++++ storage/paths/db_index.go | 9 +++++++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index 621896c91..025950586 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -82,6 +82,7 @@ var wdPostTaskCmd = &cli.Command{ return xerrors.Errorf("cannot get miner id %w", err) } var id int64 + retryDelay := time.Millisecond * 10 retryAddTask: _, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id) @@ -104,6 +105,8 @@ var wdPostTaskCmd = &cli.Command{ }) if err != nil { if harmonydb.IsErrSerialization(err) { + time.Sleep(retryDelay) + retryDelay *= 2 goto retryAddTask } return xerrors.Errorf("writing SQL transaction: %w", err) diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index eb17efd46..9e2b5a1b6 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -124,9 +124,9 @@ type Tx struct { // & non-transaction calls in transactions. It only checks 20 frames. // Fast: This memory should all be in CPU Caches. func (db *DB) usedInTransaction() bool { - var framePtrs = (&[20]uintptr{})[:] // 20 can be stack-local (no alloc) - runtime.Callers(3, framePtrs) // skip past our caller. - return lo.Contains(framePtrs, db.BTFP) // Unsafe read @ beginTx overlap, but 'return false' is correct there. + var framePtrs = (&[20]uintptr{})[:] // 20 can be stack-local (no alloc) + framePtrs = framePtrs[:runtime.Callers(3, framePtrs)] // skip past our caller. + return lo.Contains(framePtrs, db.BTFP) // Unsafe read @ beginTx overlap, but 'return false' is correct there. } // BeginTransaction is how you can access transactions using this library. diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 6091cb61e..ccfcc1f6f 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -25,6 +25,7 @@ type taskTypeHandler struct { func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) { var tID TaskID + retryWait := time.Millisecond * 100 retryAddTask: _, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { // create taskID (from DB) @@ -46,6 +47,8 @@ retryAddTask: return } if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 goto retryAddTask } log.Error("Could not add task. AddTasFunc failed: %v", err) @@ -165,6 +168,7 @@ top: func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) { workEnd := time.Now() + retryWait := time.Millisecond * 100 retryRecordCompletion: cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { var postedTime time.Time @@ -219,6 +223,8 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, wo }) if err != nil { if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 goto retryRecordCompletion } log.Error("Could not record transaction: ", err) diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 69a03198d..e6bf3e5da 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -180,6 +180,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, } } + retryWait := time.Millisecond * 100 retryAttachStorage: // Single transaction to attach storage which is not present in the DB _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { @@ -246,6 +247,8 @@ retryAttachStorage: }) if err != nil { if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 goto retryAttachStorage } return err @@ -287,6 +290,7 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri log.Warnw("Dropping sector path endpoint", "path", id, "url", url) } else { + retryWait := time.Millisecond * 100 retryDropPath: // Single transaction to drop storage path and sector decls which have this as a storage path _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { @@ -305,6 +309,8 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri }) if err != nil { if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 goto retryDropPath } return err @@ -380,6 +386,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac return xerrors.Errorf("invalid filetype") } + retryWait := time.Millisecond * 100 retryStorageDeclareSector: _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { var currPrimary sql.NullBool @@ -416,6 +423,8 @@ retryStorageDeclareSector: }) if err != nil { if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 goto retryStorageDeclareSector } return err From de38e77cfc11d471d13eb00c6ae209e6bf8abb03 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 11 Dec 2023 11:31:38 -0600 Subject: [PATCH 5/5] tx-detector use atomic --- lib/harmony/harmonydb/harmonydb.go | 3 ++- lib/harmony/harmonydb/userfuncs.go | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go index caa5d1daa..5ec9f5a25 100644 --- a/lib/harmony/harmonydb/harmonydb.go +++ b/lib/harmony/harmonydb/harmonydb.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" logging "github.com/ipfs/go-log/v2" @@ -35,7 +36,7 @@ type DB struct { schema string hostnames []string BTFPOnce sync.Once - BTFP uintptr + BTFP atomic.Uintptr } var logger = logging.Logger("harmonydb") diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 9e2b5a1b6..7fcf76dcd 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -126,7 +126,7 @@ type Tx struct { func (db *DB) usedInTransaction() bool { var framePtrs = (&[20]uintptr{})[:] // 20 can be stack-local (no alloc) framePtrs = framePtrs[:runtime.Callers(3, framePtrs)] // skip past our caller. - return lo.Contains(framePtrs, db.BTFP) // Unsafe read @ beginTx overlap, but 'return false' is correct there. + return lo.Contains(framePtrs, db.BTFP.Load()) // Unsafe read @ beginTx overlap, but 'return false' is correct there. } // BeginTransaction is how you can access transactions using this library. @@ -141,7 +141,7 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, er db.BTFPOnce.Do(func() { fp := make([]uintptr, 20) runtime.Callers(1, fp) - db.BTFP = fp[0] + db.BTFP.Store(fp[0]) }) if db.usedInTransaction() { return false, errTx