package paths

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net/url"
	gopath "path"
	"strings"
	"time"

	"github.com/google/uuid"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/journal/alerting"
	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/metrics"
	"github.com/filecoin-project/lotus/storage/sealer/fsutil"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var errAlreadyLocked = errors.New("already locked")

// DBIndex is a sector index whose state lives in the storage_path and
// sector_location tables, accessed through harmonydb.
type DBIndex struct {
	// alerting may be nil; every use below is guarded by a nil check.
	alerting *alerting.Alerting
	// pathAlerts holds the alert type registered for each storage path's
	// configuration problems.
	// NOTE(review): this map is read and written in StorageAttach without any
	// locking — confirm StorageAttach is never called concurrently.
	pathAlerts map[storiface.ID]alerting.AlertType

	harmonyDB *harmonydb.DB
}

// NewDBIndex returns a DBIndex backed by db. al may be nil, in which case no
// path configuration alerts are raised.
func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
	return &DBIndex{
		harmonyDB: db,

		alerting:   al,
		pathAlerts: map[storiface.ID]alerting.AlertType{},
	}
}

// StorageList returns, for every known storage path, the sectors declared in
// it. File types of the same sector within one path are OR-ed together into a
// single Decl. Paths without any sector are present with an empty slice.
func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
	// Field names are left untouched: the DB layer presumably maps them to the
	// selected column names — verify before renaming.
	var sectorEntries []struct {
		StorageId      string
		MinerId        sql.NullInt64
		SectorNum      sql.NullInt64
		SectorFiletype sql.NullInt32 `db:"sector_filetype"`
		IsPrimary      sql.NullBool
	}

	err := dbi.harmonyDB.Select(ctx, &sectorEntries,
		"SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storage_path stor LEFT JOIN sector_location sec on stor.storage_id=sec.storage_id")
	if err != nil {
		return nil, xerrors.Errorf("StorageList DB query fails: %w", err)
	}

	byID := map[storiface.ID]map[abi.SectorID]storiface.SectorFileType{}
	for _, entry := range sectorEntries {
		id := storiface.ID(entry.StorageId)
		if _, ok := byID[id]; !ok {
			byID[id] = map[abi.SectorID]storiface.SectorFileType{}
		}

		// The LEFT JOIN yields NULL sector columns for storage paths with no
		// sectors; keep the (empty) path entry but skip the sector info.
		if !entry.MinerId.Valid {
			continue
		}

		sectorID := abi.SectorID{
			Miner:  abi.ActorID(entry.MinerId.Int64),
			Number: abi.SectorNumber(entry.SectorNum.Int64),
		}
		byID[id][sectorID] |= storiface.SectorFileType(entry.SectorFiletype.Int32)
	}

	out := map[storiface.ID][]storiface.Decl{}
	for id, m := range byID {
		// Always a non-nil slice, even for paths without sectors.
		out[id] = make([]storiface.Decl, 0, len(m))
		for sectorID, fileType := range m {
			out[id] = append(out[id], storiface.Decl{
				SectorID:       sectorID,
				SectorFileType: fileType,
			})
		}
	}

	return out, nil
}

// union appends to a every element of b that is not already present, keeping
// the order of first appearance. Duplicates within b are added only once.
// Duplicates already inside a are left as they are. The result may share a's
// backing array.
func union(a, b []string) []string {
	seen := make(map[string]struct{}, len(a)+len(b))
	for _, elem := range a {
		seen[elem] = struct{}{}
	}
	for _, elem := range b {
		if _, ok := seen[elem]; ok {
			continue
		}
		// Record the element so a repeated entry in b is not appended twice.
		seen[elem] = struct{}{}
		a = append(a, elem)
	}
	return a
}

// splitString splits a comma-separated list. Unlike strings.Split, an empty
// input yields an empty (non-nil) slice instead of a slice holding one empty
// string.
func splitString(str string) []string {
	if str == "" {
		return []string{}
	}
	return strings.Split(str, ",")
}

// StorageAttach registers the storage path si, or refreshes it if a row with
// the same ID already exists (in that case the stored URLs are merged with
// si.URLs and the path attributes are overwritten; the stat columns are only
// written on first insert).
//
// Unparseable entries in si.AllowTypes / si.DenyTypes are dropped and, when
// alerting is configured, reported through a per-path alert instead of failing
// the call. An unparseable URL in si.URLs fails the call.
func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
	allow := make([]string, 0, len(si.AllowTypes))
	deny := make([]string, 0, len(si.DenyTypes))

	if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert {
		dbi.pathAlerts[si.ID] = dbi.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID))
	}

	var hasConfigIssues bool

	for id, typ := range si.AllowTypes {
		_, err := storiface.TypeFromString(typ)
		if err != nil {
			// No need to hard-fail here, just warn the user
			// (note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
			hasConfigIssues = true

			if dbi.alerting != nil {
				dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
					"message":   "bad path type in AllowTypes",
					"path":      string(si.ID),
					"idx":       id,
					"path_type": typ,
					"error":     err.Error(),
				})
			}

			continue
		}
		allow = append(allow, typ)
	}
	for id, typ := range si.DenyTypes {
		_, err := storiface.TypeFromString(typ)
		if err != nil {
			// No need to hard-fail here, just warn the user
			hasConfigIssues = true

			if dbi.alerting != nil {
				dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
					"message":   "bad path type in DenyTypes",
					"path":      string(si.ID),
					"idx":       id,
					"path_type": typ,
					"error":     err.Error(),
				})
			}

			continue
		}
		deny = append(deny, typ)
	}
	si.AllowTypes = allow
	si.DenyTypes = deny

	// A previously raised alert is cleared once the config parses cleanly.
	if dbi.alerting != nil && !hasConfigIssues && dbi.alerting.IsRaised(dbi.pathAlerts[si.ID]) {
		dbi.alerting.Resolve(dbi.pathAlerts[si.ID], map[string]string{
			"message": "path config is now correct",
		})
	}

	for _, u := range si.URLs {
		if _, err := url.Parse(u); err != nil {
			// Report the offending URL, not the whole list.
			return xerrors.Errorf("failed to parse url %s: %w", u, err)
		}
	}

	// Single transaction to attach storage which is not present in the DB
	_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
		var urls sql.NullString
		var storageId sql.NullString
		err = tx.QueryRow(
			"SELECT storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
		// A missing row is expected for a new path; it is detected by message
		// because the driver's sentinel error is not imported in this file.
		if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
			// %w keeps the driver error reachable via errors.Is/As for callers
			// (and presumably for harmonydb's retry detection — confirm).
			return false, xerrors.Errorf("storage attach select fails: %w", err)
		}

		// Storage ID entry exists
		// TODO: Consider using insert into .. on conflict do update set ... below
		if storageId.Valid {
			var currUrls []string
			if urls.Valid {
				// splitString, not strings.Split: an empty stored value must
				// not turn into a single empty URL.
				currUrls = splitString(urls.String)
			}
			currUrls = union(currUrls, si.URLs)

			_, err = tx.Exec(
				"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9, last_heartbeat=NOW() WHERE storage_id=$10",
				strings.Join(currUrls, ","),
				si.Weight,
				si.MaxStorage,
				si.CanSeal,
				si.CanStore,
				strings.Join(si.Groups, ","),
				strings.Join(si.AllowTo, ","),
				strings.Join(si.AllowTypes, ","),
				strings.Join(si.DenyTypes, ","),
				si.ID)
			if err != nil {
				return false, xerrors.Errorf("storage attach UPDATE fails: %w", err)
			}

			return true, nil
		}

		// Insert storage id
		_, err = tx.Exec(
			"INSERT INTO storage_path "+
				"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW())",
			si.ID,
			strings.Join(si.URLs, ","),
			si.Weight,
			si.MaxStorage,
			si.CanSeal,
			si.CanStore,
			strings.Join(si.Groups, ","),
			strings.Join(si.AllowTo, ","),
			strings.Join(si.AllowTypes, ","),
			strings.Join(si.DenyTypes, ","),
			st.Capacity,
			st.Available,
			st.FSAvailable,
			st.Reserved,
			st.Used)
		if err != nil {
			return false, xerrors.Errorf("StorageAttach insert fails: %w", err)
		}
		return true, nil
	}, harmonydb.OptionRetry())

	return err
}

// StorageDetach removes url from the storage path id.
//
//   - If url is not one of the path's URLs the call is a no-op.
//   - If other URLs remain, only the URL list is updated.
//   - If url was the last one, the storage path and all sector declarations
//     that reference it are deleted in one transaction.
//
// An error is returned when the storage path row cannot be read.
func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
	var qUrls string
	err := dbi.harmonyDB.QueryRow(ctx, "SELECT COALESCE(urls,'') FROM storage_path WHERE storage_id=$1", string(id)).Scan(&qUrls)
	if err != nil {
		return err
	}
	urls := splitString(qUrls)

	var modUrls []string
	for _, u := range urls {
		if u != url {
			modUrls = append(modUrls, u)
		}
	}

	// noop if url doesn't exist in urls
	if len(modUrls) == len(urls) {
		return nil
	}

	if len(modUrls) > 0 {
		newUrls := strings.Join(modUrls, ",")
		_, err := dbi.harmonyDB.Exec(ctx, "UPDATE storage_path set urls=$1 WHERE storage_id=$2", newUrls, id)
		if err != nil {
			return err
		}

		log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
		return nil
	}

	// Single transaction to drop storage path and sector decls which have this as a storage path
	_, err = dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
		// Drop storage path completely
		_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id=$1", id)
		if err != nil {
			return false, err
		}

		// Drop all sectors entries which use this storage path
		_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id=$1", id)
		if err != nil {
			return false, err
		}
		return true, nil
	}, harmonydb.OptionRetry())
	if err != nil {
		return err
	}
	log.Warnw("Dropping sector storage", "path", id)

	return nil
}

// StorageReportHealth stores the stat values from report for storage path id,
// refreshes its heartbeat timestamp, and records the storage metrics.
//
// The UPDATE is retried with exponential back-off (starting at 20ms) for as
// long as harmonydb reports a serialization error; the wait is abandoned with
// ctx.Err() when ctx is cancelled.
func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
	retryWait := time.Millisecond * 20

	for {
		_, err := dbi.harmonyDB.Exec(ctx,
			"UPDATE storage_path set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=NOW() where storage_id=$6",
			report.Stat.Capacity,
			report.Stat.Available,
			report.Stat.FSAvailable,
			report.Stat.Reserved,
			report.Stat.Used,
			id)
		if err == nil {
			break
		}
		if !harmonydb.IsErrSerialization(err) {
			return err
		}

		// Back off before retrying, but do not outlive the caller's context.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryWait):
		}
		retryWait *= 2
	}

	var canSeal, canStore bool
	err := dbi.harmonyDB.QueryRow(ctx,
		"SELECT can_seal, can_store FROM storage_path WHERE storage_id=$1", id).Scan(&canSeal, &canStore)
	if err != nil {
		return xerrors.Errorf("Querying for storage id %s fails with err %w", id, err)
	}

	// Ratios below divide by Capacity, so skip metrics for empty reports.
	if report.Stat.Capacity > 0 {
		ctx, _ = tag.New(ctx,
			tag.Upsert(metrics.StorageID, string(id)),
			tag.Upsert(metrics.PathStorage, fmt.Sprint(canStore)),
			tag.Upsert(metrics.PathSeal, fmt.Sprint(canSeal)),
		)

		stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity)))
		stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity)))
		stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity)))

		stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity))
		stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable))
		stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available))
		stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved))

		if report.Stat.Max > 0 {
			stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max)))
			stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used))
			stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max))
		}
	}

	return nil
}

// checkFileType reports whether fileType contains at least one of the file
// types listed in storiface.PathTypes. A zero value, or a value made up only
// of bits not in storiface.PathTypes, is rejected.
func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool {
	for _, known := range storiface.PathTypes {
		if known&fileType != 0 {
			return true
		}
	}
	return false
}

// StorageDeclareSector records that sector s with file type ft is stored in
// storageID.
//
// If the declaration already exists it is only touched to promote it to
// primary (when primary is true and the stored flag is false); any other
// repeat is logged as a redeclaration. It returns an error when ft fails
// checkFileType or when a DB statement fails.
func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
	if !dbi.checkFileType(ft) {
		return xerrors.Errorf("invalid filetype")
	}

	_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
		var currPrimary sql.NullBool
		err = tx.QueryRow(
			"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
			uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
		// A missing row just means the sector was not declared here before.
		if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
			return false, xerrors.Errorf("DB SELECT fails: %w", err)
		}

		// No existing declaration: insert a new one.
		if !currPrimary.Valid {
			_, err = tx.Exec(
				"INSERT INTO sector_location "+
					"values($1, $2, $3, $4, $5)",
				s.Miner, s.Number, ft, storageID, primary)
			if err != nil {
				return false, xerrors.Errorf("DB insert fails: %w", err)
			}
			return true, nil
		}

		// If storage id already exists for this sector, update primary if need be
		if !currPrimary.Bool && primary {
			_, err = tx.Exec(
				"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
				s.Miner, s.Number, ft, storageID)
			if err != nil {
				return false, xerrors.Errorf("DB update fails: %w", err)
			}
		} else {
			log.Warnf("sector %v redeclared in %s", s, storageID)
		}

		return true, nil
	}, harmonydb.OptionRetry())

	return err
}

// StorageDropSector deletes the declaration of sector s with file type ft in
// storageID. It returns an error when ft fails checkFileType or the DELETE
// fails; deleting a declaration that does not exist is not an error.
func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
	if !dbi.checkFileType(ft) {
		return xerrors.Errorf("invalid filetype")
	}

	_, err := dbi.harmonyDB.Exec(ctx,
		"DELETE FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
		int(s.Miner), int(s.Number), int(ft), string(storageID))
	if err != nil {
		return xerrors.Errorf("StorageDropSector DELETE query fails: %w", err)
	}

	return nil
}

// expandSectorURLs splits the comma-separated base URL list csv and returns,
// for every entry, the URL with "<ft>/<sector name>" joined onto its path
// (first result) together with the unmodified base URL (second result).
func expandSectorURLs(csv string, ft storiface.SectorFileType, s abi.SectorID) ([]string, []string, error) {
	var urls, burls []string
	for _, u := range splitString(csv) {
		rl, err := url.Parse(u)
		if err != nil {
			return nil, nil, xerrors.Errorf("failed to parse url: %w", err)
		}
		rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
		urls = append(urls, rl.String())
		burls = append(burls, u)
	}
	return urls, burls, nil
}

// StorageFindSector returns the storage paths that hold sector s with any of
// the file types set in ft. When allowFetch is true it additionally returns
// sealing paths the sector could be fetched to; those are reported with weight
// 0 and Primary false.
//
// Paths already holding the sector are weighted by their configured weight
// multiplied by the number of matching file types they hold.
func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
	var result []storiface.SectorStorageInfo

	// allowList collects the AllowTo groups of all paths holding the sector.
	// It becomes nil (meaning "fetch to any path is allowed") as soon as one
	// holder has an empty AllowTo list.
	allowList := make(map[string]struct{})
	storageWithSector := map[string]bool{}

	type dbRes struct {
		StorageId  string
		Count      uint64
		IsPrimary  bool
		Urls       string
		Weight     uint64
		CanSeal    bool
		CanStore   bool
		Groups     string
		AllowTo    string
		AllowTypes string
		DenyTypes  string
	}

	var rows []dbRes

	fts := ft.AllSet()
	// Find all storage info which already hold this sector + filetype
	err := dbi.harmonyDB.Select(ctx, &rows,
		` SELECT DISTINCT ON (stor.storage_id)
				stor.storage_id,
				COUNT(*) OVER(PARTITION BY stor.storage_id) as count,
				BOOL_OR(is_primary) OVER(PARTITION BY stor.storage_id) AS is_primary,
				urls,
				weight,
				can_seal,
				can_store,
				groups,
				allow_to,
				allow_types,
				deny_types
			FROM sector_location sec
			JOIN storage_path stor ON sec.storage_id = stor.storage_id
			WHERE sec.miner_id = $1
				AND sec.sector_num = $2
				AND sec.sector_filetype = ANY($3)
			ORDER BY stor.storage_id`,
		s.Miner, s.Number, fts)
	if err != nil {
		return nil, xerrors.Errorf("Finding sector storage from DB fails with err: %w", err)
	}

	for _, row := range rows {
		urls, burls, err := expandSectorURLs(row.Urls, ft, s)
		if err != nil {
			return nil, err
		}

		result = append(result, storiface.SectorStorageInfo{
			ID:         storiface.ID(row.StorageId),
			URLs:       urls,
			BaseURLs:   burls,
			Weight:     row.Weight * row.Count,
			CanSeal:    row.CanSeal,
			CanStore:   row.CanStore,
			Primary:    row.IsPrimary,
			AllowTypes: splitString(row.AllowTypes),
			DenyTypes:  splitString(row.DenyTypes),
		})

		storageWithSector[row.StorageId] = true

		allowTo := splitString(row.AllowTo)
		if allowList != nil && len(allowTo) > 0 {
			for _, group := range allowTo {
				allowList[group] = struct{}{}
			}
		} else {
			allowList = nil // allow to any
		}
	}

	if !allowFetch {
		return result, nil
	}

	// Find all storage paths which can hold this sector if allowFetch is true
	spaceReq, err := ft.SealSpaceUse(ssize)
	if err != nil {
		return nil, xerrors.Errorf("estimating required space: %w", err)
	}

	// Conditions to satisfy when choosing a sector
	// 1. CanSeal is true
	// 2. Available >= spaceReq
	// 3. curr_time - last_heartbeat < SkippedHeartbeatThresh
	// 4. heartbeat_err is NULL
	// 5. not one of the earlier picked storage ids
	// 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes)
	// 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector
	var candidates []struct {
		StorageId  string
		Urls       string
		Weight     uint64
		CanSeal    bool
		CanStore   bool
		Groups     string
		AllowTypes string
		DenyTypes  string
	}
	err = dbi.harmonyDB.Select(ctx, &candidates,
		`SELECT storage_id,
				urls,
				weight,
				can_seal,
				can_store,
				groups,
				allow_types,
				deny_types
			FROM storage_path
			WHERE can_seal=true
				and available >= $1
				and NOW()-($2 * INTERVAL '1 second') < last_heartbeat
				and heartbeat_err is null`,
		spaceReq, SkippedHeartbeatThresh.Seconds())
	if err != nil {
		return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: %w", err)
	}

	for _, row := range candidates {
		// Paths that already hold the sector were reported above.
		if storageWithSector[row.StorageId] {
			continue
		}

		if !ft.AnyAllowed(splitString(row.AllowTypes), splitString(row.DenyTypes)) {
			log.Debugf("not selecting on %s, not allowed by file type filters", row.StorageId)
			continue
		}

		if allowList != nil {
			groups := splitString(row.Groups)

			allow := false
			for _, group := range groups {
				if _, found := allowList[group]; found {
					log.Debugf("path %s in allowed group %s", row.StorageId, group)
					allow = true
					break
				}
			}

			if !allow {
				log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", row.StorageId, allowList, groups)
				continue
			}
		}

		urls, burls, err := expandSectorURLs(row.Urls, ft, s)
		if err != nil {
			return nil, err
		}

		result = append(result, storiface.SectorStorageInfo{
			ID:         storiface.ID(row.StorageId),
			URLs:       urls,
			BaseURLs:   burls,
			Weight:     row.Weight * 0, // always 0 for paths that do not hold the sector yet
			CanSeal:    row.CanSeal,
			CanStore:   row.CanStore,
			Primary:    false,
			AllowTypes: splitString(row.AllowTypes),
			DenyTypes:  splitString(row.DenyTypes),
		})
	}

	return result, nil
}

// StorageInfo returns the stored configuration of storage path id. It returns
// an error when the query fails or when no such storage path exists.
func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
	var qResults []struct {
		Urls       string
		Weight     uint64
		MaxStorage uint64
		CanSeal    bool
		CanStore   bool
		Groups     string
		AllowTo    string
		AllowTypes string
		DenyTypes  string
	}

	err := dbi.harmonyDB.Select(ctx, &qResults,
		"SELECT urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+
			"FROM storage_path WHERE storage_id=$1", string(id))
	if err != nil {
		return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: %w", err)
	}

	// An unknown id yields zero rows; report it instead of indexing into an
	// empty slice.
	if len(qResults) == 0 {
		return storiface.StorageInfo{}, xerrors.Errorf("storage path %s not found", id)
	}
	row := qResults[0]

	return storiface.StorageInfo{
		ID:         id,
		URLs:       splitString(row.Urls),
		Weight:     row.Weight,
		MaxStorage: row.MaxStorage,
		CanSeal:    row.CanSeal,
		CanStore:   row.CanStore,
		Groups:     splitString(row.Groups),
		AllowTo:    splitString(row.AllowTo),
		AllowTypes: splitString(row.AllowTypes),
		DenyTypes:  splitString(row.DenyTypes),
	}, nil
}

// StorageBestAlloc returns the storage paths able to take a new sector of size
// ssize with file types allocate, ordered by available space times weight
// (largest first).
//
// pathType selects whether sealing or long-term storage paths are wanted and
// which space estimate is used; any other value is an error. Only paths with
// enough available space, a recent heartbeat and no heartbeat error qualify.
func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
	var err error
	var spaceReq uint64
	switch pathType {
	case storiface.PathSealing:
		spaceReq, err = allocate.SealSpaceUse(ssize)
	case storiface.PathStorage:
		spaceReq, err = allocate.StoreSpaceUse(ssize)
	default:
		return nil, xerrors.Errorf("unexpected path type")
	}
	if err != nil {
		return nil, xerrors.Errorf("estimating required space: %w", err)
	}

	var rows []struct {
		StorageId  string
		Urls       string
		Weight     uint64
		MaxStorage uint64
		CanSeal    bool
		CanStore   bool
		Groups     string
		AllowTo    string
		AllowTypes string
		DenyTypes  string
	}

	err = dbi.harmonyDB.Select(ctx, &rows,
		`SELECT storage_id,
				urls,
				weight,
				max_storage,
				can_seal,
				can_store,
				groups,
				allow_to,
				allow_types,
				deny_types
			FROM storage_path
			WHERE available >= $1
				and NOW()-($2 * INTERVAL '1 second') < last_heartbeat
				and heartbeat_err is null
				and (($3 and can_seal = TRUE) or ($4 and can_store = TRUE))
			order by (available::numeric * weight) desc`,
		spaceReq,
		SkippedHeartbeatThresh.Seconds(),
		pathType == storiface.PathSealing,
		pathType == storiface.PathStorage,
	)
	if err != nil {
		return nil, xerrors.Errorf("Querying for best storage sectors fails with err: %w", err)
	}

	var result []storiface.StorageInfo
	for _, row := range rows {
		result = append(result, storiface.StorageInfo{
			ID:         storiface.ID(row.StorageId),
			URLs:       splitString(row.Urls),
			Weight:     row.Weight,
			MaxStorage: row.MaxStorage,
			CanSeal:    row.CanSeal,
			CanStore:   row.CanStore,
			Groups:     splitString(row.Groups),
			AllowTo:    splitString(row.AllowTo),
			AllowTypes: splitString(row.AllowTypes),
			DenyTypes:  splitString(row.DenyTypes),
		})
	}

	return result, nil
}

// LockTimeOut is the timeout after which we consider a lock to be stale.
const LockTimeOut = 300 * time.Second

// isLocked reports whether ts denotes a live lock: it must be non-NULL and
// less than LockTimeOut in the past.
func isLocked(ts sql.NullTime) bool {
	return ts.Valid && ts.Time.After(time.Now().Add(-LockTimeOut))
}

func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType, lockUuid uuid.UUID) (bool, error) { if read|write == 0 { return false, nil } if read|write > (1< (1<