// Source path: lotus/storage/paths/db_index.go

package paths
import (
"context"
"database/sql"
"fmt"
"net/url"
gopath "path"
"strings"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// DBIndex is a sector index whose state is kept in a harmonydb database.
// It is asserted to satisfy SectorIndex at the bottom of this file.
type DBIndex struct {
	// alerting is used to raise and resolve path-configuration alerts; it may
	// be nil, in which case no alerts are emitted.
	alerting *alerting.Alerting
	// pathAlerts holds the alert type registered for each storage path ID.
	pathAlerts map[storiface.ID]alerting.AlertType
	// harmonyDB is the database handle all queries are issued against.
	harmonyDB *harmonydb.DB
}
// NewDBIndex builds a DBIndex that stores its state in db and reports path
// configuration problems through al (which may be nil).
func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
	idx := new(DBIndex)
	idx.alerting = al
	idx.harmonyDB = db
	idx.pathAlerts = make(map[storiface.ID]alerting.AlertType)
	return idx
}
func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
var sectorEntries []struct {
StorageId string
MinerId sql.NullInt64
SectorNum sql.NullInt64
SectorFiletype sql.NullInt32
IsPrimary sql.NullBool
}
err := dbi.harmonyDB.Select(ctx, &sectorEntries,
"SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storagelocation stor LEFT JOIN sectorlocation sec on stor.storage_id=sec.storage_id")
if err != nil {
2023-08-14 13:24:00 +00:00
return nil, xerrors.Errorf("StorageList DB query fails: %v", err)
2023-08-10 22:35:35 +00:00
}
byID := map[storiface.ID]map[abi.SectorID]storiface.SectorFileType{}
for _, entry := range sectorEntries {
id := storiface.ID(entry.StorageId)
_, ok := byID[id]
if !ok {
byID[id] = map[abi.SectorID]storiface.SectorFileType{}
}
// skip sector info for storage paths with no sectors
if !entry.MinerId.Valid {
continue
}
sectorId := abi.SectorID{
Miner: abi.ActorID(entry.MinerId.Int64),
Number: abi.SectorNumber(entry.SectorNum.Int64),
}
byID[id][sectorId] |= storiface.SectorFileType(entry.SectorFiletype.Int32)
}
out := map[storiface.ID][]storiface.Decl{}
for id, m := range byID {
out[id] = []storiface.Decl{}
for sectorID, fileType := range m {
out[id] = append(out[id], storiface.Decl{
SectorID: sectorID,
SectorFileType: fileType,
})
}
}
return out, nil
}
// union appends to a every element of b that is not already present, and
// returns the extended slice. Elements of a are kept as they are (including
// any duplicates a already contains); each new element from b is added at
// most once, in the order it first appears in b.
func union(a, b []string) []string {
	seen := make(map[string]struct{}, len(a)+len(b))
	for _, elem := range a {
		seen[elem] = struct{}{}
	}

	for _, elem := range b {
		if _, ok := seen[elem]; ok {
			continue
		}
		// Record the element so a repeat later in b is not appended again.
		seen[elem] = struct{}{}
		a = append(a, elem)
	}

	return a
}
// splitString breaks a comma-separated list into its elements. Unlike
// strings.Split, an empty input yields an empty, non-nil slice rather than a
// slice holding one empty string.
func splitString(str string) []string {
	if len(str) > 0 {
		return strings.Split(str, ",")
	}
	return make([]string, 0)
}
func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes))
if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert {
dbi.pathAlerts[si.ID] = dbi.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID))
}
var hasConfigIssues bool
for id, typ := range si.AllowTypes {
_, err := storiface.TypeFromString(typ)
if err != nil {
//No need to hard-fail here, just warn the user
//(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
hasConfigIssues = true
if dbi.alerting != nil {
dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
"message": "bad path type in AllowTypes",
"path": string(si.ID),
"idx": id,
"path_type": typ,
"error": err.Error(),
})
}
continue
}
allow = append(allow, typ)
}
for id, typ := range si.DenyTypes {
_, err := storiface.TypeFromString(typ)
if err != nil {
//No need to hard-fail here, just warn the user
hasConfigIssues = true
if dbi.alerting != nil {
dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
"message": "bad path type in DenyTypes",
"path": string(si.ID),
"idx": id,
"path_type": typ,
"error": err.Error(),
})
}
continue
}
deny = append(deny, typ)
}
si.AllowTypes = allow
si.DenyTypes = deny
if dbi.alerting != nil && !hasConfigIssues && dbi.alerting.IsRaised(dbi.pathAlerts[si.ID]) {
dbi.alerting.Resolve(dbi.pathAlerts[si.ID], map[string]string{
"message": "path config is now correct",
})
}
for _, u := range si.URLs {
if _, err := url.Parse(u); err != nil {
return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err)
}
}
// Single transaction to attach storage which is not present in the DB
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var urls sql.NullString
var storageId sql.NullString
err = dbi.harmonyDB.QueryRow(ctx,
"Select storage_id, urls FROM storagelocation WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
2023-08-14 17:56:29 +00:00
return false, xerrors.Errorf("storage attach select fails: %v", err)
2023-08-10 22:35:35 +00:00
}
// Storage ID entry exists
// TODO: Consider using insert into .. on conflict do update set ... below
if storageId.Valid {
var currUrls []string
if urls.Valid {
currUrls = strings.Split(urls.String, ",")
}
currUrls = union(currUrls, si.URLs)
_, err = dbi.harmonyDB.Exec(ctx,
"UPDATE StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
strings.Join(currUrls, ","),
si.Weight,
si.MaxStorage,
si.CanSeal,
si.CanStore,
strings.Join(si.Groups, ","),
strings.Join(si.AllowTo, ","),
strings.Join(si.AllowTypes, ","),
strings.Join(si.DenyTypes, ","),
si.ID)
if err != nil {
2023-08-14 17:56:29 +00:00
return false, xerrors.Errorf("storage attach UPDATE fails: %v", err)
2023-08-10 22:35:35 +00:00
}
return true, nil
}
// Insert storage id
_, err = dbi.harmonyDB.Exec(ctx,
"INSERT INTO StorageLocation "+
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
si.ID,
strings.Join(si.URLs, ","),
si.Weight,
si.MaxStorage,
si.CanSeal,
si.CanStore,
strings.Join(si.Groups, ","),
strings.Join(si.AllowTo, ","),
strings.Join(si.AllowTypes, ","),
strings.Join(si.DenyTypes, ","),
st.Capacity,
st.Available,
st.FSAvailable,
st.Reserved,
st.Used,
time.Now())
if err != nil {
2023-08-14 17:56:29 +00:00
return false, xerrors.Errorf("StorageAttach insert fails: %v", err)
2023-08-10 22:35:35 +00:00
}
return true, nil
})
if err != nil {
return err
}
return nil
}
// StorageDetach removes url from the endpoints of storage path id.
//
//   - If url is not one of the path's URLs the call is a no-op.
//   - If other URLs remain, only the URL list is updated.
//   - If url was the last one, the storage path and every sector declaration
//     that references it are deleted in one transaction.
//
// An error reading the storage path row is returned to the caller.
func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
	var qUrls string
	err := dbi.harmonyDB.QueryRow(ctx, "SELECT COALESCE(urls,'') FROM storagelocation WHERE storage_id=$1", string(id)).Scan(&qUrls)
	if err != nil {
		return xerrors.Errorf("reading urls of storage path %s: %w", id, err)
	}
	urls := splitString(qUrls)

	modUrls := make([]string, 0, len(urls))
	for _, u := range urls {
		if u != url {
			modUrls = append(modUrls, u)
		}
	}

	// noop if url doesn't exist in urls
	if len(modUrls) == len(urls) {
		return nil
	}

	if len(modUrls) > 0 {
		newUrls := strings.Join(modUrls, ",")
		_, err := dbi.harmonyDB.Exec(ctx, "UPDATE storagelocation set urls=$1 WHERE storage_id=$2", newUrls, string(id))
		if err != nil {
			return xerrors.Errorf("updating urls of storage path %s: %w", id, err)
		}

		log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
		return nil
	}

	// Single transaction to drop storage path and sector decls which have this
	// as a storage path. The statements are issued on tx so that they really
	// are atomic; running them on dbi.harmonyDB would bypass the transaction.
	_, err = dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
		// Drop all sectors entries which use this storage path. Done first so
		// the order also works if sectorlocation references storagelocation.
		_, err = tx.Exec("DELETE FROM sectorlocation WHERE storage_id=$1", string(id))
		if err != nil {
			return false, xerrors.Errorf("dropping sector entries of storage path %s: %w", id, err)
		}

		// Drop storage path completely
		_, err = tx.Exec("DELETE FROM storagelocation WHERE storage_id=$1", string(id))
		if err != nil {
			return false, xerrors.Errorf("dropping storage path %s: %w", id, err)
		}

		return true, nil
	})
	if err != nil {
		return err
	}
	log.Warnw("Dropping sector storage", "path", id)

	return nil
}
func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
var canSeal, canStore bool
err := dbi.harmonyDB.QueryRow(ctx,
"SELECT can_seal, can_store FROM storagelocation WHERE storage_id=$1", id).Scan(&canSeal, &canStore)
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors.Errorf("Querying for storage id %s fails with err %v", id, err)
2023-08-10 22:35:35 +00:00
}
_, err = dbi.harmonyDB.Exec(ctx,
"UPDATE storagelocation set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6",
report.Stat.Capacity,
report.Stat.Available,
report.Stat.FSAvailable,
report.Stat.Reserved,
report.Stat.Used,
time.Now())
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors.Errorf("updating storage health in DB fails with err: %v", err)
2023-08-10 22:35:35 +00:00
}
if report.Stat.Capacity > 0 {
ctx, _ = tag.New(ctx,
tag.Upsert(metrics.StorageID, string(id)),
tag.Upsert(metrics.PathStorage, fmt.Sprint(canStore)),
tag.Upsert(metrics.PathSeal, fmt.Sprint(canSeal)),
)
stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity)))
stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity)))
stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity)))
stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity))
stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable))
stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available))
stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved))
if report.Stat.Max > 0 {
stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max)))
stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used))
stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max))
}
}
return nil
}
func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
ftValid := false
for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 {
ftValid = true
break
}
}
if !ftValid {
return xerrors.Errorf("Invalid filetype")
}
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var currPrimary sql.NullBool
err = dbi.harmonyDB.QueryRow(ctx,
"SELECT is_primary FROM SectorLocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
2023-08-14 17:56:29 +00:00
return false, xerrors.Errorf("DB SELECT fails: %v", err)
2023-08-10 22:35:35 +00:00
}
// If storage id already exists for this sector, update primary if need be
if currPrimary.Valid {
if !currPrimary.Bool && primary {
_, err = dbi.harmonyDB.Exec(ctx,
"UPDATE SectorLocation set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
s.Miner, s.Number, ft, storageID)
if err != nil {
2023-08-14 17:56:29 +00:00
return false, xerrors.Errorf("DB update fails: %v", err)
2023-08-10 22:35:35 +00:00
}
} else {
log.Warnf("sector %v redeclared in %s", s, storageID)
}
} else {
_, err = dbi.harmonyDB.Exec(ctx,
"INSERT INTO SectorLocation "+
"values($1, $2, $3, $4, $5)",
s.Miner, s.Number, ft, storageID, primary)
if err != nil {
2023-08-14 17:56:29 +00:00
return false, xerrors.Errorf("DB insert fails: %v", err)
2023-08-10 22:35:35 +00:00
}
}
return true, nil
})
if err != nil {
return err
}
return nil
}
func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
ftValid := false
for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 {
ftValid = true
break
}
}
if !ftValid {
return xerrors.Errorf("Invalid filetype")
}
_, err := dbi.harmonyDB.Exec(ctx,
"DELETE FROM sectorlocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
int(s.Miner), int(s.Number), int(ft), string(storageID))
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors.Errorf("StorageDropSector DELETE query fails: %v", err)
2023-08-10 22:35:35 +00:00
}
return nil
}
func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
var result []storiface.SectorStorageInfo
allowList := make(map[string]struct{})
storageWithSector := map[string]bool{}
type dbRes struct {
StorageId string
Count uint64
IsPrimary bool
Urls string
Weight uint64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
}
var rows []dbRes
fts := ft.AllSet()
// Find all storage info which already hold this sector + filetype
err := dbi.harmonyDB.Select(ctx, &rows,
` SELECT DISTINCT ON (stor.storage_id)
stor.storage_id,
COUNT(*) OVER(PARTITION BY stor.storage_id) as count,
BOOL_OR(is_primary) OVER(PARTITION BY stor.storage_id) AS is_primary,
urls,
weight,
can_seal,
can_store,
groups,
allow_to,
allow_types,
deny_types
FROM sectorlocation sec
JOIN storagelocation stor ON sec.storage_id = stor.storage_id
WHERE sec.miner_id = $1
AND sec.sector_num = $2
AND sec.sector_filetype = ANY($3)
ORDER BY stor.storage_id`,
s.Miner, s.Number, fts)
if err != nil {
2023-08-14 15:35:18 +00:00
return nil, xerrors.Errorf("Finding sector storage from DB fails with err: %v", err)
2023-08-10 22:35:35 +00:00
}
for _, row := range rows {
// Parse all urls
var urls, burls []string
for _, u := range splitString(row.Urls) {
rl, err := url.Parse(u)
if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err)
}
rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
urls = append(urls, rl.String())
burls = append(burls, u)
}
result = append(result, storiface.SectorStorageInfo{
ID: storiface.ID(row.StorageId),
URLs: urls,
BaseURLs: burls,
Weight: row.Weight * row.Count,
CanSeal: row.CanSeal,
CanStore: row.CanStore,
Primary: row.IsPrimary,
AllowTypes: splitString(row.AllowTypes),
DenyTypes: splitString(row.DenyTypes),
})
storageWithSector[row.StorageId] = true
allowTo := splitString(row.AllowTo)
if allowList != nil && len(allowTo) > 0 {
for _, group := range allowTo {
allowList[group] = struct{}{}
}
} else {
allowList = nil // allow to any
}
}
// Find all storage paths which can hold this sector if allowFetch is true
if allowFetch {
spaceReq, err := ft.SealSpaceUse(ssize)
if err != nil {
return nil, xerrors.Errorf("estimating required space: %w", err)
}
// Conditions to satisfy when choosing a sector
// 1. CanSeal is true
// 2. Available >= spaceReq
// 3. curr_time - last_heartbeat < SkippedHeartbeatThresh
// 4. heartbeat_err is NULL
// 5. not one of the earlier picked storage ids
// 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes)
// 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector
var rows []struct {
StorageId string
Urls string
Weight uint64
CanSeal bool
CanStore bool
Groups string
AllowTypes string
DenyTypes string
}
err = dbi.harmonyDB.Select(ctx, &rows,
`SELECT storage_id,
urls,
weight,
can_seal,
can_store,
groups,
allow_types,
deny_types
FROM storagelocation
WHERE can_seal=true
and available >= $1
and NOW()-last_heartbeat < $2
and heartbeat_err is null`,
spaceReq, SkippedHeartbeatThresh)
if err != nil {
2023-08-14 15:35:18 +00:00
return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: %v", err)
2023-08-10 22:35:35 +00:00
}
for _, row := range rows {
if ok := storageWithSector[row.StorageId]; ok {
continue
}
if !ft.AnyAllowed(splitString(row.AllowTypes), splitString(row.DenyTypes)) {
log.Debugf("not selecting on %s, not allowed by file type filters", row.StorageId)
continue
}
if allowList != nil {
groups := splitString(row.Groups)
allow := false
for _, group := range groups {
if _, found := allowList[group]; found {
log.Debugf("path %s in allowed group %s", row.StorageId, group)
allow = true
break
}
}
if !allow {
log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", row.StorageId, allowList, groups)
continue
}
}
var urls, burls []string
for _, u := range splitString(row.Urls) {
rl, err := url.Parse(u)
if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err)
}
rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
urls = append(urls, rl.String())
burls = append(burls, u)
}
result = append(result, storiface.SectorStorageInfo{
ID: storiface.ID(row.StorageId),
URLs: urls,
BaseURLs: burls,
Weight: row.Weight * 0,
CanSeal: row.CanSeal,
CanStore: row.CanStore,
Primary: false,
AllowTypes: splitString(row.AllowTypes),
DenyTypes: splitString(row.DenyTypes),
})
}
}
return result, nil
}
func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
var qResults []struct {
Urls string
Weight uint64
MaxStorage uint64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
}
err := dbi.harmonyDB.Select(ctx, &qResults,
"SELECT urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+
"FROM StorageLocation WHERE storage_id=$1", string(id))
if err != nil {
2023-08-14 15:35:18 +00:00
return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: %v", err)
2023-08-10 22:35:35 +00:00
}
var sinfo storiface.StorageInfo
sinfo.ID = id
sinfo.URLs = splitString(qResults[0].Urls)
sinfo.Weight = qResults[0].Weight
sinfo.MaxStorage = qResults[0].MaxStorage
sinfo.CanSeal = qResults[0].CanSeal
sinfo.CanStore = qResults[0].CanStore
sinfo.Groups = splitString(qResults[0].Groups)
sinfo.AllowTo = splitString(qResults[0].AllowTo)
sinfo.AllowTypes = splitString(qResults[0].AllowTypes)
sinfo.DenyTypes = splitString(qResults[0].DenyTypes)
return sinfo, nil
}
// StorageBestAlloc returns the storage paths that can take a new sector of
// size ssize with file type(s) allocate, best candidates first.
//
// pathType selects both the capability a path must have (can_seal for
// storiface.PathSealing, can_store for storiface.PathStorage) and how the
// required space is estimated; any other path type is an error. Paths must
// have enough available space and a recent, error-free heartbeat. Results are
// ordered by available space multiplied by weight, descending.
func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
	var (
		err         error
		spaceReq    uint64
		checkString string
	)

	switch pathType {
	case storiface.PathSealing:
		spaceReq, err = allocate.SealSpaceUse(ssize)
		checkString = "can_seal = TRUE"
	case storiface.PathStorage:
		spaceReq, err = allocate.StoreSpaceUse(ssize)
		checkString = "can_store = TRUE"
	default:
		return nil, xerrors.Errorf("Unexpected path type")
	}
	if err != nil {
		return nil, xerrors.Errorf("estimating required space: %w", err)
	}

	var rows []struct {
		StorageId  string
		Urls       string
		Weight     uint64
		MaxStorage uint64
		CanSeal    bool
		CanStore   bool
		Groups     string
		AllowTo    string
		AllowTypes string
		DenyTypes  string
	}

	// checkString is one of the two fixed literals above, never caller input,
	// so formatting it into the statement is safe. The variable is not called
	// "sql" because that would shadow the database/sql package.
	query := fmt.Sprintf(`SELECT storage_id,
    urls,
    weight,
    max_storage,
    can_seal,
    can_store,
    groups,
    allow_to,
    allow_types,
    deny_types
    FROM storagelocation
    WHERE %s and available >= $1
    and NOW()-last_heartbeat < $2
    and heartbeat_err is null
    order by (available::numeric * weight) desc`, checkString)
	err = dbi.harmonyDB.Select(ctx, &rows,
		query, spaceReq, SkippedHeartbeatThresh)
	if err != nil {
		return nil, xerrors.Errorf("Querying for best storage sectors fails with sql: %s and err: %w", query, err)
	}

	result := make([]storiface.StorageInfo, 0, len(rows))
	for _, row := range rows {
		result = append(result, storiface.StorageInfo{
			ID:         storiface.ID(row.StorageId),
			URLs:       splitString(row.Urls),
			Weight:     row.Weight,
			MaxStorage: row.MaxStorage,
			CanSeal:    row.CanSeal,
			CanStore:   row.CanStore,
			Groups:     splitString(row.Groups),
			AllowTo:    splitString(row.AllowTo),
			AllowTypes: splitString(row.AllowTypes),
			DenyTypes:  splitString(row.DenyTypes),
		})
	}

	return result, nil
}
// LockTimeOut is the timeout after which we consider a lock to be stale.
const LockTimeOut = 300 * time.Second

func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
if read|write == 0 {
return false, nil
}
if read|write > (1<<storiface.FileTypes)-1 {
return false, xerrors.Errorf("unknown file types specified")
}
var rows []struct {
SectorFileType storiface.SectorFileType
ReadTs sql.NullTime
ReadRefs int
WriteTs sql.NullTime
}
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
fts := (read | write).AllSet()
err = dbi.harmonyDB.Select(ctx, &rows,
` SELECT sector_filetype, read_ts, read_refs, write_ts
FROM sectorlocation
WHERE miner_id=$1
AND sector_num=$2
AND sector_filetype = ANY($3)`,
sector.Miner, sector.Number, fts)
if err != nil {
return false, xerrors.Errorf("StorageLock SELECT fails: %v", err)
}
type locks struct {
readTs sql.NullTime
readRefs int
writeTs sql.NullTime
}
lockMap := make(map[storiface.SectorFileType]locks)
for _, row := range rows {
lockMap[row.SectorFileType] = locks{
readTs: row.ReadTs,
readRefs: row.ReadRefs,
writeTs: row.WriteTs,
}
}
// Check if we can acquire write locks
2023-08-15 01:47:27 +00:00
// Conditions: No write lock or write lock is stale, No read lock or read lock is stale
2023-08-14 22:46:41 +00:00
for wft := range write.AllSet() {
2023-08-15 01:47:27 +00:00
if (lockMap[storiface.SectorFileType(wft)].writeTs.Valid &&
lockMap[storiface.SectorFileType(wft)].writeTs.Time.After(time.Now().Add(LockTimeOut))) ||
(lockMap[storiface.SectorFileType(wft)].readTs.Valid &&
lockMap[storiface.SectorFileType(wft)].writeTs.Time.After(time.Now().Add(LockTimeOut))) {
2023-08-14 22:46:41 +00:00
return false, xerrors.Errorf("Cannot acquire writelock for sector %v filetype %d already locked", sector, wft)
}
}
// Check if we can acquire read locks
2023-08-15 01:47:27 +00:00
// Conditions: No write lock or write lock is stale
2023-08-14 22:46:41 +00:00
for rft := range read.AllSet() {
2023-08-15 01:47:27 +00:00
if lockMap[storiface.SectorFileType(rft)].writeTs.Valid &&
lockMap[storiface.SectorFileType(rft)].writeTs.Time.After(time.Now().Add(LockTimeOut)) {
2023-08-14 22:46:41 +00:00
return false, xerrors.Errorf("Cannot acquire read lock for sector %v filetype %d already locked for writing", sector, rft)
}
}
// Acquire write locks
_, err = dbi.harmonyDB.Exec(ctx,
`UPDATE sectorlocation
SET write_ts = NOW()
WHERE miner_id=$1
AND sector_num=$2
AND sector_filetype = ANY($3)`,
sector.Miner, sector.Number, write.AllSet())
if err != nil {
return false, xerrors.Errorf("Acquiring write locks for sector %v fails with err: %v", sector, err)
}
// Acquire read locks
_, err = dbi.harmonyDB.Exec(ctx,
`UPDATE sectorlocation
SET read_ts = NOW(), read_refs = read_refs + 1
WHERE miner_id=$1
AND sector_num=$2
AND sector_filetype = ANY($3)`,
sector.Miner, sector.Number, read.AllSet())
if err != nil {
return false, xerrors.Errorf("Acquiring read locks for sector %v fails with err: %v", sector, err)
}
return true, nil
})
if err != nil {
return false, err
}
return true, nil
}
2023-08-10 22:35:35 +00:00
func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
2023-08-14 22:46:41 +00:00
_, err := dbi.lock(ctx, sector, read, write)
if err != nil {
return err
}
2023-08-10 22:35:35 +00:00
return nil
}
func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
2023-08-14 22:46:41 +00:00
return dbi.lock(ctx, sector, read, write)
2023-08-10 22:35:35 +00:00
}
func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
2023-08-14 22:46:41 +00:00
var rows []struct {
MinerId uint64
SectorNum uint64
SectorFileType int
ReadTs sql.NullTime
ReadRefs int
WriteTs sql.NullTime
}
err := dbi.harmonyDB.Select(ctx, &rows,
"SELECT miner_id, sector_num, sector_filetype, read_ts, read_refs, write_ts FROM sectorlocation")
if err != nil {
return storiface.SectorLocks{}, err
}
type locks struct {
sectorFileType storiface.SectorFileType
readRefs uint
writeLk bool
}
sectorLocks := make(map[abi.SectorID]locks)
for _, row := range rows {
sector := abi.SectorID{
Miner: abi.ActorID(row.MinerId),
Number: abi.SectorNumber(row.SectorNum),
}
sectorLocks[sector] = locks{
sectorFileType: storiface.SectorFileType(row.SectorFileType),
readRefs: uint(row.ReadRefs),
writeLk: row.WriteTs.Valid,
}
}
var result storiface.SectorLocks
for sector, locks := range sectorLocks {
var lock storiface.SectorLock
lock.Sector = sector
lock.Read[locks.sectorFileType] = locks.readRefs
if locks.writeLk {
lock.Write[locks.sectorFileType] = 1
} else {
lock.Write[locks.sectorFileType] = 0
}
}
return result, nil
2023-08-10 22:35:35 +00:00
}
// Compile-time check that *DBIndex satisfies SectorIndex.
var _ SectorIndex = (*DBIndex)(nil)