Implement Sector Index locks APIs
This commit is contained in:
parent
e4c2bfc150
commit
16fb7cc0da
@ -725,19 +725,158 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
|
||||
if read|write == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if read|write > (1<<storiface.FileTypes)-1 {
|
||||
return false, xerrors.Errorf("unknown file types specified")
|
||||
}
|
||||
|
||||
var rows []struct {
|
||||
SectorFileType storiface.SectorFileType
|
||||
ReadTs sql.NullTime
|
||||
ReadRefs int
|
||||
WriteTs sql.NullTime
|
||||
}
|
||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||
|
||||
fts := (read | write).AllSet()
|
||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
||||
` SELECT sector_filetype, read_ts, read_refs, write_ts
|
||||
FROM sectorlocation
|
||||
WHERE miner_id=$1
|
||||
AND sector_num=$2
|
||||
AND sector_filetype = ANY($3)`,
|
||||
sector.Miner, sector.Number, fts)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("StorageLock SELECT fails: %v", err)
|
||||
}
|
||||
|
||||
type locks struct {
|
||||
readTs sql.NullTime
|
||||
readRefs int
|
||||
writeTs sql.NullTime
|
||||
}
|
||||
lockMap := make(map[storiface.SectorFileType]locks)
|
||||
for _, row := range rows {
|
||||
lockMap[row.SectorFileType] = locks{
|
||||
readTs: row.ReadTs,
|
||||
readRefs: row.ReadRefs,
|
||||
writeTs: row.WriteTs,
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we can acquire write locks
|
||||
for wft := range write.AllSet() {
|
||||
if lockMap[storiface.SectorFileType(wft)].writeTs.Valid || lockMap[storiface.SectorFileType(wft)].readTs.Valid {
|
||||
return false, xerrors.Errorf("Cannot acquire writelock for sector %v filetype %d already locked", sector, wft)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we can acquire read locks
|
||||
for rft := range read.AllSet() {
|
||||
if lockMap[storiface.SectorFileType(rft)].writeTs.Valid {
|
||||
return false, xerrors.Errorf("Cannot acquire read lock for sector %v filetype %d already locked for writing", sector, rft)
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire write locks
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
`UPDATE sectorlocation
|
||||
SET write_ts = NOW()
|
||||
WHERE miner_id=$1
|
||||
AND sector_num=$2
|
||||
AND sector_filetype = ANY($3)`,
|
||||
sector.Miner, sector.Number, write.AllSet())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("Acquiring write locks for sector %v fails with err: %v", sector, err)
|
||||
}
|
||||
|
||||
// Acquire read locks
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
`UPDATE sectorlocation
|
||||
SET read_ts = NOW(), read_refs = read_refs + 1
|
||||
WHERE miner_id=$1
|
||||
AND sector_num=$2
|
||||
AND sector_filetype = ANY($3)`,
|
||||
sector.Miner, sector.Number, read.AllSet())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("Acquiring read locks for sector %v fails with err: %v", sector, err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
|
||||
// TODO: implementation
|
||||
|
||||
_, err := dbi.lock(ctx, sector, read, write)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
|
||||
// TODO: implementation
|
||||
return false, nil
|
||||
return dbi.lock(ctx, sector, read, write)
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
|
||||
// TODO: implementation
|
||||
return storiface.SectorLocks{}, nil
|
||||
|
||||
var rows []struct {
|
||||
MinerId uint64
|
||||
SectorNum uint64
|
||||
SectorFileType int
|
||||
ReadTs sql.NullTime
|
||||
ReadRefs int
|
||||
WriteTs sql.NullTime
|
||||
}
|
||||
|
||||
err := dbi.harmonyDB.Select(ctx, &rows,
|
||||
"SELECT miner_id, sector_num, sector_filetype, read_ts, read_refs, write_ts FROM sectorlocation")
|
||||
if err != nil {
|
||||
return storiface.SectorLocks{}, err
|
||||
}
|
||||
|
||||
type locks struct {
|
||||
sectorFileType storiface.SectorFileType
|
||||
readRefs uint
|
||||
writeLk bool
|
||||
}
|
||||
sectorLocks := make(map[abi.SectorID]locks)
|
||||
for _, row := range rows {
|
||||
sector := abi.SectorID{
|
||||
Miner: abi.ActorID(row.MinerId),
|
||||
Number: abi.SectorNumber(row.SectorNum),
|
||||
}
|
||||
sectorLocks[sector] = locks{
|
||||
sectorFileType: storiface.SectorFileType(row.SectorFileType),
|
||||
readRefs: uint(row.ReadRefs),
|
||||
writeLk: row.WriteTs.Valid,
|
||||
}
|
||||
}
|
||||
|
||||
var result storiface.SectorLocks
|
||||
for sector, locks := range sectorLocks {
|
||||
var lock storiface.SectorLock
|
||||
lock.Sector = sector
|
||||
lock.Read[locks.sectorFileType] = locks.readRefs
|
||||
if locks.writeLk {
|
||||
lock.Write[locks.sectorFileType] = 1
|
||||
} else {
|
||||
lock.Write[locks.sectorFileType] = 0
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var _ SectorIndex = &DBIndex{}
|
||||
|
@ -16,9 +16,9 @@ const (
|
||||
AcquireCopy AcquireMode = "copy"
|
||||
)
|
||||
|
||||
type Refs struct {
|
||||
RefCount [FileTypes]uint
|
||||
}
|
||||
//type Refs struct {
|
||||
// RefCount [FileTypes]uint
|
||||
//}
|
||||
|
||||
type SectorLock struct {
|
||||
Sector abi.SectorID
|
||||
|
Loading…
Reference in New Issue
Block a user