harmony: BeginTransaction Retry Serialization option

This commit is contained in:
Łukasz Magiera 2024-02-20 13:14:58 +01:00
parent 648ac7adaf
commit 94a31c6db6
2 changed files with 44 additions and 2 deletions

View File

@ -36,7 +36,7 @@ type DB struct {
schema string schema string
hostnames []string hostnames []string
BTFPOnce sync.Once BTFPOnce sync.Once
BTFP atomic.Uintptr BTFP atomic.Uintptr // BeginTransactionFramePointer
} }
var logger = logging.Logger("harmonydb") var logger = logging.Logger("harmonydb")

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"runtime" "runtime"
"time"
"github.com/georgysavva/scany/v2/pgxscan" "github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgerrcode" "github.com/jackc/pgerrcode"
@ -129,6 +130,25 @@ func (db *DB) usedInTransaction() bool {
return lo.Contains(framePtrs, db.BTFP.Load()) // Unsafe read @ beginTx overlap, but 'return false' is correct there. return lo.Contains(framePtrs, db.BTFP.Load()) // Unsafe read @ beginTx overlap, but 'return false' is correct there.
} }
type TransactionOptions struct {
RetrySerializationError bool
InitialSerializationErrorRetryWait time.Duration
}
type TransactionOption func(*TransactionOptions)
func RetrySerializationErr() TransactionOption {
return func(o *TransactionOptions) {
o.RetrySerializationError = true
}
}
func InitialSerializationErrorRetryWait(d time.Duration) TransactionOption {
return func(o *TransactionOptions) {
o.InitialSerializationErrorRetryWait = d
}
}
// BeginTransaction is how you can access transactions using this library. // BeginTransaction is how you can access transactions using this library.
// The entire transaction happens in the function passed in. // The entire transaction happens in the function passed in.
// The return must be true or a rollback will occur. // The return must be true or a rollback will occur.
@ -137,7 +157,7 @@ func (db *DB) usedInTransaction() bool {
// when there is a DB serialization error. // when there is a DB serialization error.
// //
//go:noinline //go:noinline
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) { func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error), opt ...TransactionOption) (didCommit bool, retErr error) {
db.BTFPOnce.Do(func() { db.BTFPOnce.Do(func() {
fp := make([]uintptr, 20) fp := make([]uintptr, 20)
runtime.Callers(1, fp) runtime.Callers(1, fp)
@ -146,6 +166,28 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, er
if db.usedInTransaction() { if db.usedInTransaction() {
return false, errTx return false, errTx
} }
opts := TransactionOptions{
RetrySerializationError: false,
InitialSerializationErrorRetryWait: 10 * time.Millisecond,
}
for _, o := range opt {
o(&opts)
}
retry:
comm, err := db.transactionInner(ctx, f)
if err != nil && opts.RetrySerializationError && IsErrSerialization(err) {
time.Sleep(opts.InitialSerializationErrorRetryWait)
opts.InitialSerializationErrorRetryWait *= 2
goto retry
}
return comm, err
}
func (db *DB) transactionInner(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{}) tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
if err != nil { if err != nil {
return false, err return false, err