fix a couple of bugs and address comments
This commit is contained in:
parent
9262eb4fc9
commit
95709a0583
@ -827,7 +827,9 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) unlock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType, lockUuid uuid.UUID) (bool, error) {
|
||||
func (dbi *DBIndex) unlock(sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType, lockUuid uuid.UUID) (bool, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
if read|write == 0 {
|
||||
return false, nil
|
||||
}
|
||||
@ -840,11 +842,10 @@ func (dbi *DBIndex) unlock(ctx context.Context, sector abi.SectorID, read storif
|
||||
_, err := dbi.harmonyDB.Exec(ctx,
|
||||
`UPDATE sector_location
|
||||
SET write_ts = NULL, write_lock_owner = NULL
|
||||
WHERE miner_id=$2
|
||||
AND sector_num=$3
|
||||
AND write_lock_owner=$4
|
||||
AND sector_filetype = ANY($5)`,
|
||||
lockUuid.String(),
|
||||
WHERE miner_id=$1
|
||||
AND sector_num=$2
|
||||
AND write_lock_owner=$3
|
||||
AND sector_filetype = ANY($4)`,
|
||||
sector.Miner,
|
||||
sector.Number,
|
||||
lockUuid.String(),
|
||||
@ -875,27 +876,40 @@ func (dbi *DBIndex) unlock(ctx context.Context, sector abi.SectorID, read storif
|
||||
func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
|
||||
|
||||
retries := 5
|
||||
waitTime := 1
|
||||
maxWaitTime := 300 // Set max wait time to 5 minutes
|
||||
|
||||
waitTime := 1
|
||||
// generate uuid for this lock owner
|
||||
lockUuid := uuid.New()
|
||||
|
||||
var err error
|
||||
for i := 0; i < retries; i++ {
|
||||
_, err := dbi.lock(ctx, sector, read, write, lockUuid)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(waitTime) * time.Second)
|
||||
}
|
||||
if err != nil {
|
||||
// retry with exponential backoff and block until lock is acquired
|
||||
for {
|
||||
locked, err := dbi.lock(ctx, sector, read, write, lockUuid)
|
||||
// if err is not nil and is not because we cannot acquire lock, retry
|
||||
if err != nil && !strings.Contains(err.Error(), "cannot acquire") {
|
||||
retries--
|
||||
if retries == 0 {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if locked {
|
||||
break
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(time.Duration(waitTime) * time.Second):
|
||||
if waitTime < maxWaitTime {
|
||||
waitTime *= 2
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
bgCtx := context.Background()
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
_, err := dbi.unlock(bgCtx, sector, read, write, lockUuid)
|
||||
_, err := dbi.unlock(sector, read, write, lockUuid)
|
||||
if err != nil {
|
||||
log.Errorf("unlocking sector %v for filetypes: read=%d, write=%d, fails with err: %v", sector, read, write, err)
|
||||
}
|
||||
@ -915,7 +929,7 @@ func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, rea
|
||||
if locked {
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
_, err := dbi.unlock(ctx, sector, read, write, lockUuid)
|
||||
_, err := dbi.unlock(sector, read, write, lockUuid)
|
||||
if err != nil {
|
||||
log.Errorf("unlocking sector %v for filetypes: read=%d, write=%d, fails with err: %v", sector, read, write, err)
|
||||
}
|
||||
|
@ -88,14 +88,23 @@ func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.S
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageLock(ctx, sector, read, write)
|
||||
}
|
||||
return ip.memIndex.StorageLock(ctx, sector, read, write)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageTryLock(ctx, sector, read, write)
|
||||
}
|
||||
return ip.memIndex.StorageTryLock(ctx, sector, read, write)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageGetLocks(ctx)
|
||||
}
|
||||
return ip.memIndex.StorageGetLocks(ctx)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user