add unlock

This commit is contained in:
Shrenuj Bansal 2023-08-16 15:36:00 -04:00
parent a249f96341
commit 169f953de4
2 changed files with 106 additions and 18 deletions

View File

@ -1,15 +1,20 @@
create table SectorLocation
create table sectorlocation
(
"miner_id" bigint,
"sector_num" bigint,
"sector_filetype" int,
"storage_id" varchar,
"is_primary" bool,
constraint SectorLocation_pk
primary key ("miner_id", "sector_num", "sector_filetype", "storage_id")
miner_id bigint not null,
sector_num bigint not null,
sector_filetype int not null,
storage_id varchar not null,
is_primary bool,
read_ts timestamp(6),
read_refs int,
write_ts timestamp(6),
write_lock_owner varchar,
constraint sectorlocation_pk
primary key (miner_id, sector_num, sector_filetype, storage_id)
);
create table StorageLocation
(
"storage_id" varchar not null

View File

@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/google/uuid"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
@ -730,7 +731,7 @@ func isLocked(ts sql.NullTime) bool {
return ts.Valid && ts.Time.After(time.Now().Add(-LockTimeOut))
}
func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType, lockUuid uuid.UUID) (bool, error) {
if read|write == 0 {
return false, nil
}
@ -792,11 +793,14 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
// Acquire write locks
_, err = dbi.harmonyDB.Exec(ctx,
`UPDATE sectorlocation
SET write_ts = NOW()
WHERE miner_id=$1
AND sector_num=$2
AND sector_filetype = ANY($3)`,
sector.Miner, sector.Number, write.AllSet())
SET write_ts = NOW(), write_lock_owner = $1
WHERE miner_id=$2
AND sector_num=$3
AND sector_filetype = ANY($4)`,
lockUuid.String(),
sector.Miner,
sector.Number,
write.AllSet())
if err != nil {
return false, xerrors.Errorf("acquiring write locks for sector %v fails with err: %v", sector, err)
}
@ -808,7 +812,10 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
WHERE miner_id=$1
AND sector_num=$2
AND sector_filetype = ANY($3)`,
sector.Miner, sector.Number, read.AllSet())
sector.Miner,
sector.Number,
read.AllSet())
if err != nil {
return false, xerrors.Errorf("acquiring read locks for sector %v fails with err: %v", sector, err)
}
@ -822,25 +829,101 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
return true, nil
}
func (dbi *DBIndex) unlock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType, lockUuid uuid.UUID) (bool, error) {
if read|write == 0 {
return false, nil
}
if read|write > (1<<storiface.FileTypes)-1 {
return false, xerrors.Errorf("unknown file types specified")
}
// Relinquish write locks
_, err := dbi.harmonyDB.Exec(ctx,
`UPDATE sectorlocation
SET write_ts = NULL, write_lock_owner = NULL
WHERE miner_id=$2
AND sector_num=$3
AND write_lock_owner=$4
AND sector_filetype = ANY($5)`,
lockUuid.String(),
sector.Miner,
sector.Number,
lockUuid.String(),
write.AllSet())
if err != nil {
return false, xerrors.Errorf("relinquishing write locks for sector %v fails with err: %v", sector, err)
}
// Relinquish read locks
_, err = dbi.harmonyDB.Exec(ctx,
`UPDATE sectorlocation
SET read_refs = read_refs-1,
read_ts = CASE WHEN read_refs - 1 = 0 THEN NULL ELSE read_ts END
WHERE miner_id=$1
AND sector_num=$2
AND sector_filetype = ANY($3)`,
sector.Miner,
sector.Number,
read.AllSet())
if err != nil {
return false, xerrors.Errorf("relinquishing read locks for sector %v fails with err: %v", sector, err)
}
return true, nil
}
func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
retries := 5
waitTime := 1
// generate uuid for this lock owner
lockUuid := uuid.New()
var err error
for i := 0; i < retries; i++ {
_, err := dbi.lock(ctx, sector, read, write)
_, err := dbi.lock(ctx, sector, read, write, lockUuid)
if err == nil {
break
}
time.Sleep(time.Duration(waitTime) * time.Second)
}
if err != nil {
return err
}
return err
go func() {
<-ctx.Done()
_, err := dbi.unlock(ctx, sector, read, write, lockUuid)
if err != nil {
log.Errorf("unlocking sector %v for filetypes: read=%d, write=%d, fails with err: %v", sector, read, write, err)
}
}()
return nil
}
func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
return dbi.lock(ctx, sector, read, write)
lockUuid := uuid.New()
locked, err := dbi.lock(ctx, sector, read, write, lockUuid)
if err != nil {
return false, err
}
if locked {
go func() {
<-ctx.Done()
_, err := dbi.unlock(ctx, sector, read, write, lockUuid)
if err != nil {
log.Errorf("unlocking sector %v for filetypes: read=%d, write=%d, fails with err: %v", sector, read, write, err)
}
}()
}
return locked, nil
}
func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {