address more comments

This commit is contained in:
Shrenuj Bansal 2023-08-16 10:57:08 -04:00
parent 28efd718c7
commit cdf90b83ed
3 changed files with 51 additions and 37 deletions

View File

@ -7,8 +7,6 @@ create table SectorLocation
"is_primary" bool,
constraint SectorLocation_pk
primary key ("miner_id", "sector_num", "sector_filetype", "storage_id")
-- TODO: Maybe add index on above PK fields
);

View File

@ -109,6 +109,10 @@ type MinerSubsystemConfig struct {
EnableSealing bool
EnableSectorStorage bool
EnableMarkets bool
// When enabled, the sector index will reside in an external database
// as opposed to the local KV store in the miner process
// This is useful to allow workers to bypass the lotus miner to access sector information
EnableSectorIndexDB bool
SealerApiInfo string // if EnableSealing == false

View File

@ -351,17 +351,22 @@ func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, re
return nil
}
func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
// function to check if a filetype is valid
func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool {
ftValid := false
for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 {
for _, fileTypeValid := range storiface.PathTypes {
if fileTypeValid&fileType == 0 {
ftValid = true
break
}
}
if !ftValid {
return xerrors.Errorf("Invalid filetype")
return ftValid
}
func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
if !dbi.checkFileType(ft) {
return xerrors.Errorf("invalid filetype")
}
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
@ -406,15 +411,8 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
ftValid := false
for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 {
ftValid = true
break
}
}
if !ftValid {
return xerrors.Errorf("Invalid filetype")
if !dbi.checkFileType(ft) {
return xerrors.Errorf("invalid filetype")
}
_, err := dbi.harmonyDB.Exec(ctx,
@ -659,7 +657,7 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
case storiface.PathStorage:
spaceReq, err = allocate.StoreSpaceUse(ssize)
default:
return nil, xerrors.Errorf("Unexpected path type")
return nil, xerrors.Errorf("unexpected path type")
}
if err != nil {
return nil, xerrors.Errorf("estimating required space: %w", err)
@ -728,6 +726,10 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
// timeout after which we consider a lock to be stale
const LockTimeOut = 300 * time.Second
func isLocked(ts sql.NullTime) bool {
return ts.Valid && ts.Time.After(time.Now().Add(-LockTimeOut))
}
func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
if read|write == 0 {
return false, nil
@ -747,7 +749,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
fts := (read | write).AllSet()
err = dbi.harmonyDB.Select(ctx, &rows,
` SELECT sector_filetype, read_ts, read_refs, write_ts
`SELECT sector_filetype, read_ts, read_refs, write_ts
FROM sectorlocation
WHERE miner_id=$1
AND sector_num=$2
@ -773,21 +775,17 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
// Check if we can acquire write locks
// Conditions: No write lock or write lock is stale, No read lock or read lock is stale
for wft := range write.AllSet() {
if (lockMap[storiface.SectorFileType(wft)].writeTs.Valid &&
lockMap[storiface.SectorFileType(wft)].writeTs.Time.After(time.Now().Add(LockTimeOut))) ||
(lockMap[storiface.SectorFileType(wft)].readTs.Valid &&
lockMap[storiface.SectorFileType(wft)].writeTs.Time.After(time.Now().Add(LockTimeOut))) {
return false, xerrors.Errorf("Cannot acquire writelock for sector %v filetype %d already locked", sector, wft)
for _, wft := range write.AllSet() {
if isLocked(lockMap[wft].writeTs) || isLocked(lockMap[wft].readTs) {
return false, xerrors.Errorf("cannot acquire writelock for sector %v filetype %d already locked", sector, wft)
}
}
// Check if we can acquire read locks
// Conditions: No write lock or write lock is stale
for rft := range read.AllSet() {
if lockMap[storiface.SectorFileType(rft)].writeTs.Valid &&
lockMap[storiface.SectorFileType(rft)].writeTs.Time.After(time.Now().Add(LockTimeOut)) {
return false, xerrors.Errorf("Cannot acquire read lock for sector %v filetype %d already locked for writing", sector, rft)
for _, rft := range read.AllSet() {
if isLocked(lockMap[rft].writeTs) {
return false, xerrors.Errorf("cannot acquire read lock for sector %v filetype %d already locked for writing", sector, rft)
}
}
@ -800,7 +798,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
AND sector_filetype = ANY($3)`,
sector.Miner, sector.Number, write.AllSet())
if err != nil {
return false, xerrors.Errorf("Acquiring write locks for sector %v fails with err: %v", sector, err)
return false, xerrors.Errorf("acquiring write locks for sector %v fails with err: %v", sector, err)
}
// Acquire read locks
@ -812,7 +810,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
AND sector_filetype = ANY($3)`,
sector.Miner, sector.Number, read.AllSet())
if err != nil {
return false, xerrors.Errorf("Acquiring read locks for sector %v fails with err: %v", sector, err)
return false, xerrors.Errorf("acquiring read locks for sector %v fails with err: %v", sector, err)
}
return true, nil
@ -826,12 +824,19 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
_, err := dbi.lock(ctx, sector, read, write)
if err != nil {
return err
retries := 5
waitTime := 1
var err error
for i := 0; i < retries; i++ {
_, err := dbi.lock(ctx, sector, read, write)
if err == nil {
break
}
time.Sleep(time.Duration(waitTime) * time.Second)
}
return nil
return err
}
func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
@ -866,10 +871,16 @@ func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks,
Miner: abi.ActorID(row.MinerId),
Number: abi.SectorNumber(row.SectorNum),
}
var readRefs uint
if isLocked(row.ReadTs) {
readRefs = uint(row.ReadRefs)
}
sectorLocks[sector] = locks{
sectorFileType: storiface.SectorFileType(row.SectorFileType),
readRefs: uint(row.ReadRefs),
writeLk: row.WriteTs.Valid,
readRefs: readRefs,
writeLk: isLocked(row.WriteTs),
}
}
@ -883,6 +894,7 @@ func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks,
} else {
lock.Write[locks.sectorFileType] = 0
}
result.Locks = append(result.Locks, lock)
}
return result, nil