From 77594442471f6512d5657c5be9ca4511eea4f60b Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Thu, 10 Aug 2023 18:35:35 -0400 Subject: [PATCH] address review comments --- itests/harmonydb_test.go | 7 +- lib/harmony/harmonydb/sql/20230712.sql | 31 +- lib/harmony/harmonydb/userfuncs.go | 7 +- storage/paths/db_index.go | 744 +++++++++++++++++++++++ storage/paths/index.go | 808 ------------------------- storage/paths/index_proxy.go | 109 ++++ 6 files changed, 877 insertions(+), 829 deletions(-) create mode 100644 storage/paths/db_index.go create mode 100644 storage/paths/index_proxy.go diff --git a/itests/harmonydb_test.go b/itests/harmonydb_test.go index 3a32ce047..7c0c22d88 100644 --- a/itests/harmonydb_test.go +++ b/itests/harmonydb_test.go @@ -42,8 +42,7 @@ func TestCrud(t *testing.T) { Animal string `db:"content"` Unpopulated int } - contentfilter := []string{"cows", "cats"} - err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch where content = ANY($1)", contentfilter) + err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch") if err != nil { t.Fatal("Could not select: ", err) } @@ -69,7 +68,7 @@ func TestTransaction(t *testing.T) { if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil { t.Fatal("E0", err) } - _, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool) { + _, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil { t.Fatal("E1", err) } @@ -91,7 +90,7 @@ func TestTransaction(t *testing.T) { if sum2 != 4+5+6+7+8+9 { t.Fatal("Expected 39, got ", sum2) } - return false // rollback + return false, nil // rollback }) if err != nil { t.Fatal("ET", err) diff --git a/lib/harmony/harmonydb/sql/20230712.sql b/lib/harmony/harmonydb/sql/20230712.sql index 4e2100554..220589803 100644 --- a/lib/harmony/harmonydb/sql/20230712.sql +++ b/lib/harmony/harmonydb/sql/20230712.sql @@ -1,12 +1,14 @@ create table SectorLocation ( - "miner_id" int8, - "sector_num" int8, + "miner_id" bigint, + "sector_num" bigint, "sector_filetype" int, "storage_id" varchar, "is_primary" bool, constraint SectorLocation_pk primary key ("miner_id", "sector_num", "sector_filetype", "storage_id") + + -- TODO: Maybe add index on above PK fields ); @@ -15,22 +17,21 @@ create table StorageLocation "storage_id" varchar not null constraint "StorageLocation_pkey" primary key, - "urls" varchar, - "weight" int8, - "max_storage" int8, + "urls" varchar, -- comma separated list of urls + "weight" bigint, + "max_storage" bigint, "can_seal" bool, "can_store" bool, - "groups" varchar, - "allow_to" varchar, - "allow_types" varchar, - "deny_types" varchar, + "groups" varchar, -- comma separated list of group names + "allow_to" varchar, -- comma separated list of allowed groups + "allow_types" varchar, -- comma separated list of allowed file types + "deny_types" varchar, -- comma separated list of denied file types - "capacity" int8, - "available" int8, - "fs_available" int8, - "reserved" int8, --- "MaxStorage" int8, - "used" int8, + "capacity" bigint, + "available" bigint, + "fs_available" bigint, + "reserved" bigint, + "used" bigint, "last_heartbeat" timestamp(6), "heartbeat_err" varchar ); diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 23dd3194b..66a564568 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -100,7 +100,7 @@ type Tx struct { // BeginTransaction is how you can access transactions using this library. // The entire transaction happens in the function passed in. // The return must be true or a rollback will occur. -func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (didCommit bool, retErr error) { +func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) { tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return false, err @@ -111,7 +111,10 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) ( retErr = tx.Rollback(ctx) } }() - commit = f(&Tx{tx, ctx}) + commit, err = f(&Tx{tx, ctx}) + if err != nil { + return false, err + } if commit { err := tx.Commit(ctx) if err != nil { diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go new file mode 100644 index 000000000..a0bd195ca --- /dev/null +++ b/storage/paths/db_index.go @@ -0,0 +1,744 @@ +package paths + +import ( + "context" + "database/sql" + "fmt" + "net/url" + gopath "path" + "strings" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/journal/alerting" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/metrics" + "github.com/filecoin-project/lotus/storage/sealer/fsutil" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +type DBIndex struct { + alerting *alerting.Alerting + pathAlerts map[storiface.ID]alerting.AlertType + + harmonyDB *harmonydb.DB +} + +func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex { + return &DBIndex{ + harmonyDB: db, + + alerting: al, + pathAlerts: map[storiface.ID]alerting.AlertType{}, + } +} + +func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { + + var sectorEntries []struct { + StorageId string + MinerId sql.NullInt64 + SectorNum sql.NullInt64 + SectorFiletype sql.NullInt32 + IsPrimary sql.NullBool + } + + err := dbi.harmonyDB.Select(ctx, §orEntries, + "SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storagelocation stor LEFT JOIN sectorlocation sec on stor.storage_id=sec.storage_id") + if err != nil { + return nil, xerrors.Errorf("StorageList DB query fails: ", err) + } + + byID := map[storiface.ID]map[abi.SectorID]storiface.SectorFileType{} + for _, entry := range sectorEntries { + id := storiface.ID(entry.StorageId) + _, ok := byID[id] + if !ok { + byID[id] = map[abi.SectorID]storiface.SectorFileType{} + } + + // skip sector info for storage paths with no sectors + if !entry.MinerId.Valid { + continue + } + + sectorId := abi.SectorID{ + Miner: abi.ActorID(entry.MinerId.Int64), + Number: abi.SectorNumber(entry.SectorNum.Int64), + } + + byID[id][sectorId] |= storiface.SectorFileType(entry.SectorFiletype.Int32) + } + + out := map[storiface.ID][]storiface.Decl{} + for id, m := range byID { + out[id] = []storiface.Decl{} + for sectorID, fileType := range m { + out[id] = append(out[id], storiface.Decl{ + SectorID: sectorID, + SectorFileType: fileType, + }) + } + } + + return out, nil +} + +func union(a, b []string) []string { + m := make(map[string]bool) + + for _, elem := range a { + m[elem] = true + } + + for _, elem := range b { + if _, ok := m[elem]; !ok { + a = append(a, elem) + } + } + return a +} + +func splitString(str string) []string { + if str == "" { + return []string{} + } + return strings.Split(str, ",") +} + +func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { + var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) + + if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert { + dbi.pathAlerts[si.ID] = dbi.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID)) + } + + var hasConfigIssues bool + + for id, typ := range si.AllowTypes { + _, err := storiface.TypeFromString(typ) + if err != nil { + //No need to hard-fail here, just warn the user + //(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path) + hasConfigIssues = true + + if dbi.alerting != nil { + dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{ + "message": "bad path type in AllowTypes", + "path": string(si.ID), + "idx": id, + "path_type": typ, + "error": err.Error(), + }) + } + + continue + } + allow = append(allow, typ) + } + for id, typ := range si.DenyTypes { + _, err := storiface.TypeFromString(typ) + if err != nil { + //No need to hard-fail here, just warn the user + hasConfigIssues = true + + if dbi.alerting != nil { + dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{ + "message": "bad path type in DenyTypes", + "path": string(si.ID), + "idx": id, + "path_type": typ, + "error": err.Error(), + }) + } + + continue + } + deny = append(deny, typ) + } + si.AllowTypes = allow + si.DenyTypes = deny + + if dbi.alerting != nil && !hasConfigIssues && dbi.alerting.IsRaised(dbi.pathAlerts[si.ID]) { + dbi.alerting.Resolve(dbi.pathAlerts[si.ID], map[string]string{ + "message": "path config is now correct", + }) + } + + for _, u := range si.URLs { + if _, err := url.Parse(u); err != nil { + return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err) + } + } + + // Single transaction to attach storage which is not present in the DB + _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + + var urls sql.NullString + var storageId sql.NullString + err = dbi.harmonyDB.QueryRow(ctx, + "Select storage_id, urls FROM storagelocation WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls) + if err != nil && !strings.Contains(err.Error(), "no rows in result set") { + return false, xerrors.Errorf("storage attach select fails: ", err) + } + + // Storage ID entry exists + // TODO: Consider using insert into .. on conflict do update set ... below + if storageId.Valid { + var currUrls []string + if urls.Valid { + currUrls = strings.Split(urls.String, ",") + } + currUrls = union(currUrls, si.URLs) + + _, err = dbi.harmonyDB.Exec(ctx, + "UPDATE StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10", + strings.Join(currUrls, ","), + si.Weight, + si.MaxStorage, + si.CanSeal, + si.CanStore, + strings.Join(si.Groups, ","), + strings.Join(si.AllowTo, ","), + strings.Join(si.AllowTypes, ","), + strings.Join(si.DenyTypes, ","), + si.ID) + if err != nil { + return false, xerrors.Errorf("storage attach UPDATE fails: ", err) + } + + return true, nil + } + + // Insert storage id + _, err = dbi.harmonyDB.Exec(ctx, + "INSERT INTO StorageLocation "+ + "Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)", + si.ID, + strings.Join(si.URLs, ","), + si.Weight, + si.MaxStorage, + si.CanSeal, + si.CanStore, + strings.Join(si.Groups, ","), + strings.Join(si.AllowTo, ","), + strings.Join(si.AllowTypes, ","), + strings.Join(si.DenyTypes, ","), + st.Capacity, + st.Available, + st.FSAvailable, + st.Reserved, + st.Used, + time.Now()) + if err != nil { + log.Errorf("StorageAttach insert fails: ", err) + return false, xerrors.Errorf("StorageAttach insert fails: ", err) + } + return true, nil + }) + if err != nil { + return err + } + + return nil +} + +func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error { + + // If url not in path urls, error out + // if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path + + var qUrls string + err := dbi.harmonyDB.QueryRow(ctx, "SELECT COALESCE(urls,'') FROM storagelocation WHERE storage_id=$1", string(id)).Scan(&qUrls) + if err != nil { + return err + } + urls := splitString(qUrls) + + var modUrls []string + for _, u := range urls { + if u != url { + modUrls = append(modUrls, u) + } + } + + // noop if url doesn't exist in urls + if len(modUrls) == len(urls) { + return nil + } + + if len(modUrls) > 0 { + newUrls := strings.Join(modUrls, ",") + _, err := dbi.harmonyDB.Exec(ctx, "UPDATE storagelocation set urls=$1 WHERE storage_id=$2", newUrls, id) + if err != nil { + return err + } + + log.Warnw("Dropping sector path endpoint", "path", id, "url", url) + } else { + // Single transaction to drop storage path and sector decls which have this as a storage path + _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + // Drop storage path completely + _, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storagelocation WHERE storage_id=$1", id) + if err != nil { + return false, err + } + + // Drop all sectors entries which use this storage path + _, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sectorlocation WHERE storage_id=$1", id) + if err != nil { + return false, err + } + return true, nil + }) + if err != nil { + return err + } + log.Warnw("Dropping sector storage", "path", id) + } + + return nil +} + +func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { + + var canSeal, canStore bool + err := dbi.harmonyDB.QueryRow(ctx, + "SELECT can_seal, can_store FROM storagelocation WHERE storage_id=$1", id).Scan(&canSeal, &canStore) + if err != nil { + return xerrors.Errorf("Querying for storage id %d fails with err %w", id, err) + } + + _, err = dbi.harmonyDB.Exec(ctx, + "UPDATE storagelocation set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6", + report.Stat.Capacity, + report.Stat.Available, + report.Stat.FSAvailable, + report.Stat.Reserved, + report.Stat.Used, + time.Now()) + if err != nil { + return xerrors.Errorf("updating storage health in DB fails with err: ", err) + } + + if report.Stat.Capacity > 0 { + ctx, _ = tag.New(ctx, + tag.Upsert(metrics.StorageID, string(id)), + tag.Upsert(metrics.PathStorage, fmt.Sprint(canStore)), + tag.Upsert(metrics.PathSeal, fmt.Sprint(canSeal)), + ) + + stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity))) + stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity))) + stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity))) + + stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity)) + stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable)) + stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available)) + stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved)) + + if report.Stat.Max > 0 { + stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max))) + stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used)) + stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max)) + } + } + + return nil +} + +func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { + + ftValid := false + for _, fileType := range storiface.PathTypes { + if fileType&ft == 0 { + ftValid = true + break + } + } + if !ftValid { + return xerrors.Errorf("Invalid filetype") + } + + _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + var currPrimary sql.NullBool + err = dbi.harmonyDB.QueryRow(ctx, + "SELECT is_primary FROM SectorLocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary) + if err != nil && !strings.Contains(err.Error(), "no rows in result set") { + return false, xerrors.Errorf("DB SELECT fails: ", err) + } + + // If storage id already exists for this sector, update primary if need be + if currPrimary.Valid { + if !currPrimary.Bool && primary { + _, err = dbi.harmonyDB.Exec(ctx, + "UPDATE SectorLocation set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + s.Miner, s.Number, ft, storageID) + if err != nil { + return false, xerrors.Errorf("DB update fails: ", err) + } + } else { + log.Warnf("sector %v redeclared in %s", s, storageID) + } + } else { + _, err = dbi.harmonyDB.Exec(ctx, + "INSERT INTO SectorLocation "+ + "values($1, $2, $3, $4, $5)", + s.Miner, s.Number, ft, storageID, primary) + if err != nil { + return false, xerrors.Errorf("DB insert fails: ", err) + } + } + + return true, nil + }) + if err != nil { + return err + } + + return nil +} + +func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { + + ftValid := false + for _, fileType := range storiface.PathTypes { + if fileType&ft == 0 { + ftValid = true + break + } + } + if !ftValid { + return xerrors.Errorf("Invalid filetype") + } + + _, err := dbi.harmonyDB.Exec(ctx, + "DELETE FROM sectorlocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + int(s.Miner), int(s.Number), int(ft), string(storageID)) + if err != nil { + return xerrors.Errorf("StorageDropSector DELETE query fails: ", err) + } + + return nil +} + +func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { + + var result []storiface.SectorStorageInfo + + allowList := make(map[string]struct{}) + storageWithSector := map[string]bool{} + + type dbRes struct { + StorageId string + Count uint64 + IsPrimary bool + Urls string + Weight uint64 + CanSeal bool + CanStore bool + Groups string + AllowTo string + AllowTypes string + DenyTypes string + } + + var rows []dbRes + + fts := ft.AllSet() + // Find all storage info which already hold this sector + filetype + err := dbi.harmonyDB.Select(ctx, &rows, + ` SELECT DISTINCT ON (stor.storage_id) + stor.storage_id, + COUNT(*) OVER(PARTITION BY stor.storage_id) as count, + BOOL_OR(is_primary) OVER(PARTITION BY stor.storage_id) AS is_primary, + urls, + weight, + can_seal, + can_store, + groups, + allow_to, + allow_types, + deny_types + FROM sectorlocation sec + JOIN storagelocation stor ON sec.storage_id = stor.storage_id + WHERE sec.miner_id = $1 + AND sec.sector_num = $2 + AND sec.sector_filetype = ANY($3) + ORDER BY stor.storage_id`, + s.Miner, s.Number, fts) + if err != nil { + return nil, xerrors.Errorf("Finding sector storage from DB fails with err: ", err) + } + + for _, row := range rows { + + // Parse all urls + var urls, burls []string + for _, u := range splitString(row.Urls) { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s)) + urls = append(urls, rl.String()) + burls = append(burls, u) + } + + result = append(result, storiface.SectorStorageInfo{ + ID: storiface.ID(row.StorageId), + URLs: urls, + BaseURLs: burls, + Weight: row.Weight * row.Count, + CanSeal: row.CanSeal, + CanStore: row.CanStore, + Primary: row.IsPrimary, + AllowTypes: splitString(row.AllowTypes), + DenyTypes: splitString(row.DenyTypes), + }) + + storageWithSector[row.StorageId] = true + + allowTo := splitString(row.AllowTo) + if allowList != nil && len(allowTo) > 0 { + for _, group := range allowTo { + allowList[group] = struct{}{} + } + } else { + allowList = nil // allow to any + } + } + + // Find all storage paths which can hold this sector if allowFetch is true + if allowFetch { + spaceReq, err := ft.SealSpaceUse(ssize) + if err != nil { + return nil, xerrors.Errorf("estimating required space: %w", err) + } + + // Conditions to satisfy when choosing a sector + // 1. CanSeal is true + // 2. Available >= spaceReq + // 3. curr_time - last_heartbeat < SkippedHeartbeatThresh + // 4. heartbeat_err is NULL + // 5. not one of the earlier picked storage ids + // 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes) + // 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector + + var rows []struct { + StorageId string + Urls string + Weight uint64 + CanSeal bool + CanStore bool + Groups string + AllowTypes string + DenyTypes string + } + err = dbi.harmonyDB.Select(ctx, &rows, + `SELECT storage_id, + urls, + weight, + can_seal, + can_store, + groups, + allow_types, + deny_types + FROM storagelocation + WHERE can_seal=true + and available >= $1 + and NOW()-last_heartbeat < $2 + and heartbeat_err is null`, + spaceReq, SkippedHeartbeatThresh) + if err != nil { + return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: ", err) + } + + for _, row := range rows { + if ok := storageWithSector[row.StorageId]; ok { + continue + } + + if !ft.AnyAllowed(splitString(row.AllowTypes), splitString(row.DenyTypes)) { + log.Debugf("not selecting on %s, not allowed by file type filters", row.StorageId) + continue + } + + if allowList != nil { + groups := splitString(row.Groups) + allow := false + for _, group := range groups { + if _, found := allowList[group]; found { + log.Debugf("path %s in allowed group %s", row.StorageId, group) + allow = true + break + } + } + + if !allow { + log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", row.StorageId, allowList, groups) + continue + } + } + + var urls, burls []string + for _, u := range splitString(row.Urls) { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s)) + urls = append(urls, rl.String()) + burls = append(burls, u) + } + + result = append(result, storiface.SectorStorageInfo{ + ID: storiface.ID(row.StorageId), + URLs: urls, + BaseURLs: burls, + Weight: row.Weight * 0, + CanSeal: row.CanSeal, + CanStore: row.CanStore, + Primary: false, + AllowTypes: splitString(row.AllowTypes), + DenyTypes: splitString(row.DenyTypes), + }) + } + } + + return result, nil +} + +func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { + + var qResults []struct { + Urls string + Weight uint64 + MaxStorage uint64 + CanSeal bool + CanStore bool + Groups string + AllowTo string + AllowTypes string + DenyTypes string + } + + err := dbi.harmonyDB.Select(ctx, &qResults, + "SELECT urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+ + "FROM StorageLocation WHERE storage_id=$1", string(id)) + if err != nil { + return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: ", err) + } + + var sinfo storiface.StorageInfo + sinfo.ID = id + sinfo.URLs = splitString(qResults[0].Urls) + sinfo.Weight = qResults[0].Weight + sinfo.MaxStorage = qResults[0].MaxStorage + sinfo.CanSeal = qResults[0].CanSeal + sinfo.CanStore = qResults[0].CanStore + sinfo.Groups = splitString(qResults[0].Groups) + sinfo.AllowTo = splitString(qResults[0].AllowTo) + sinfo.AllowTypes = splitString(qResults[0].AllowTypes) + sinfo.DenyTypes = splitString(qResults[0].DenyTypes) + + return sinfo, nil +} + +func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { + var err error + var spaceReq uint64 + switch pathType { + case storiface.PathSealing: + spaceReq, err = allocate.SealSpaceUse(ssize) + case storiface.PathStorage: + spaceReq, err = allocate.StoreSpaceUse(ssize) + default: + return nil, xerrors.Errorf("Unexpected path type") + } + if err != nil { + return nil, xerrors.Errorf("estimating required space: %w", err) + } + + var checkString string + if pathType == storiface.PathSealing { + checkString = "can_seal = TRUE" + } else if pathType == storiface.PathStorage { + checkString = "can_store = TRUE" + } + + var rows []struct { + StorageId string + Urls string + Weight uint64 + MaxStorage uint64 + CanSeal bool + CanStore bool + Groups string + AllowTo string + AllowTypes string + DenyTypes string + } + + sql := fmt.Sprintf(`SELECT storage_id, + urls, + weight, + max_storage, + can_seal, + can_store, + groups, + allow_to, + allow_types, + deny_types + FROM storagelocation + WHERE %s and available >= $1 + and NOW()-last_heartbeat < $2 + and heartbeat_err is null + order by (available::numeric * weight) desc`, checkString) + err = dbi.harmonyDB.Select(ctx, &rows, + sql, spaceReq, SkippedHeartbeatThresh) + if err != nil { + return nil, xerrors.Errorf("Querying for best storage sectors fails with sql: %s and err %w: ", sql, err) + } + + var result []storiface.StorageInfo + for _, row := range rows { + result = append(result, storiface.StorageInfo{ + ID: storiface.ID(row.StorageId), + URLs: splitString(row.Urls), + Weight: row.Weight, + MaxStorage: row.MaxStorage, + CanSeal: row.CanSeal, + CanStore: row.CanStore, + Groups: splitString(row.Groups), + AllowTo: splitString(row.AllowTo), + AllowTypes: splitString(row.AllowTypes), + DenyTypes: splitString(row.DenyTypes), + }) + } + + return result, nil +} + +func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error { + // TODO: implementation + return nil +} + +func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) { + // TODO: implementation + return false, nil +} + +func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) { + // TODO: implementation + return storiface.SectorLocks{}, nil +} + +var _ SectorIndex = &DBIndex{} diff --git a/storage/paths/index.go b/storage/paths/index.go index f17944103..9192ec428 100644 --- a/storage/paths/index.go +++ b/storage/paths/index.go @@ -2,13 +2,11 @@ package paths import ( "context" - "database/sql" "errors" "fmt" "net/url" gopath "path" "sort" - "strings" "sync" "time" @@ -20,7 +18,6 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/journal/alerting" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/storiface" @@ -64,87 +61,6 @@ type storageEntry struct { heartbeatErr error } -type IndexProxy struct { - memIndex *Index - dbIndex *DBIndex - enableSectorIndexDB bool -} - -func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageAttach(ctx, info, stat) - } - return ip.memIndex.StorageAttach(ctx, info, stat) -} - -func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDetach(ctx, id, url) - } - return ip.memIndex.StorageDetach(ctx, id, url) -} - -func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageInfo(ctx, id) - } - return ip.memIndex.StorageInfo(ctx, id) -} - -func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageReportHealth(ctx, id, report) - } - return ip.memIndex.StorageReportHealth(ctx, id, report) -} - -func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary) - } - return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary) -} - -func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft) - } - return ip.memIndex.StorageDropSector(ctx, storageID, s, ft) -} - -func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch) - } - return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch) -} - -func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType) - } - return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType) -} - -func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error { - return ip.memIndex.StorageLock(ctx, sector, read, write) -} - -func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) { - return ip.memIndex.StorageTryLock(ctx, sector, read, write) -} - -func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) { - return ip.memIndex.StorageGetLocks(ctx) -} - -func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageList(ctx) - } - return ip.memIndex.StorageList(ctx) -} - type Index struct { *indexLocks lk sync.RWMutex @@ -157,36 +73,6 @@ type Index struct { stores map[storiface.ID]*storageEntry } -type DBIndex struct { - alerting *alerting.Alerting - pathAlerts map[storiface.ID]alerting.AlertType - - harmonyDB *harmonydb.DB -} - -func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy { - return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy { - return NewIndexProxy(al, db, enableSectorIndexDB) - } -} - -func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy { - return &IndexProxy{ - memIndex: NewIndex(al), - dbIndex: NewDBIndex(al, db), - enableSectorIndexDB: enableSectorIndexDB, - } -} - -func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex { - return &DBIndex{ - harmonyDB: db, - - alerting: al, - pathAlerts: map[storiface.ID]alerting.AlertType{}, - } -} - func NewIndex(al *alerting.Alerting) *Index { return &Index{ indexLocks: &indexLocks{ @@ -201,59 +87,6 @@ func NewIndex(al *alerting.Alerting) *Index { } } -func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { - - var sectorEntries []struct { - Storage_id string - Miner_id sql.NullInt64 - Sector_num sql.NullInt64 - Sector_filetype sql.NullInt32 - Is_primary sql.NullBool - } - - err := dbi.harmonyDB.Select(ctx, §orEntries, - "select stor.storage_id, miner_id, sector_num, sector_filetype, is_primary from storagelocation stor left join sectorlocation sec on stor.storage_id=sec.storage_id") - if err != nil { - return nil, xerrors.Errorf("StorageList DB query fails: ", err) - } - - log.Errorf("Sector entries: ", sectorEntries) - - byID := map[storiface.ID]map[abi.SectorID]storiface.SectorFileType{} - for _, entry := range sectorEntries { - id := storiface.ID(entry.Storage_id) - _, ok := byID[id] - if !ok { - byID[id] = map[abi.SectorID]storiface.SectorFileType{} - } - - // skip sector info for storage paths with no sectors - if !entry.Miner_id.Valid { - continue - } - - sectorId := abi.SectorID{ - Miner: abi.ActorID(entry.Miner_id.Int64), - Number: abi.SectorNumber(entry.Sector_num.Int64), - } - - byID[id][sectorId] |= storiface.SectorFileType(entry.Sector_filetype.Int32) - } - - out := map[storiface.ID][]storiface.Decl{} - for id, m := range byID { - out[id] = []storiface.Decl{} - for sectorID, fileType := range m { - out[id] = append(out[id], storiface.Decl{ - SectorID: sectorID, - SectorFileType: fileType, - }) - } - } - - return out, nil -} - func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -283,156 +116,6 @@ func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.D return out, nil } -func union(a, b []string) []string { - m := make(map[string]bool) - - for _, elem := range a { - m[elem] = true - } - - for _, elem := range b { - if _, ok := m[elem]; !ok { - a = append(a, elem) - } - } - return a -} - -func splitString(str string) []string { - if str == "" { - return []string{} - } - return strings.Split(str, ",") -} - -func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { - var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) - - if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert { - dbi.pathAlerts[si.ID] = dbi.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID)) - } - - var hasConfigIssues bool - - for id, typ := range si.AllowTypes { - _, err := storiface.TypeFromString(typ) - if err != nil { - //No need to hard-fail here, just warn the user - //(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path) - hasConfigIssues = true - - if dbi.alerting != nil { - dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{ - "message": "bad path type in AllowTypes", - "path": string(si.ID), - "idx": id, - "path_type": typ, - "error": err.Error(), - }) - } - - continue - } - allow = append(allow, typ) - } - for id, typ := range si.DenyTypes { - _, err := storiface.TypeFromString(typ) - if err != nil { - //No need to hard-fail here, just warn the user - hasConfigIssues = true - - if dbi.alerting != nil { - dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{ - "message": "bad path type in DenyTypes", - "path": string(si.ID), - "idx": id, - "path_type": typ, - "error": err.Error(), - }) - } - - continue - } - deny = append(deny, typ) - } - si.AllowTypes = allow - si.DenyTypes = deny - - if dbi.alerting != nil && !hasConfigIssues && dbi.alerting.IsRaised(dbi.pathAlerts[si.ID]) { - dbi.alerting.Resolve(dbi.pathAlerts[si.ID], map[string]string{ - "message": "path config is now correct", - }) - } - - for _, u := range si.URLs { - if _, err := url.Parse(u); err != nil { - return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err) - } - } - - // TODO: make below part of a single transaction - var urls sql.NullString - var storageId sql.NullString - err := dbi.harmonyDB.QueryRow(ctx, - "Select storage_id, urls from StorageLocation where storage_id = $1", string(si.ID)).Scan(&storageId, &urls) - if err != nil && !strings.Contains(err.Error(), "no rows in result set") { - return xerrors.Errorf("storage attach select fails: ", err) - } - - // Storage ID entry exists - if storageId.Valid { - var currUrls []string - if urls.Valid { - currUrls = strings.Split(urls.String, ",") - } - currUrls = union(currUrls, si.URLs) - - _, err = dbi.harmonyDB.Exec(ctx, - "update StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 where storage_id=$10", - strings.Join(currUrls, ","), - si.Weight, - si.MaxStorage, - si.CanSeal, - si.CanStore, - strings.Join(si.Groups, ","), - strings.Join(si.AllowTo, ","), - strings.Join(si.AllowTypes, ","), - strings.Join(si.DenyTypes, ","), - si.ID) - if err != nil { - return xerrors.Errorf("storage attach update fails: ", err) - } - - return nil - } - - // Insert storage id - _, err = dbi.harmonyDB.Exec(ctx, - "insert into StorageLocation "+ - "Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)", - si.ID, - strings.Join(si.URLs, ","), - si.Weight, - si.MaxStorage, - si.CanSeal, - si.CanStore, - strings.Join(si.Groups, ","), - strings.Join(si.AllowTo, ","), - strings.Join(si.AllowTypes, ","), - strings.Join(si.DenyTypes, ","), - st.Capacity, - st.Available, - st.FSAvailable, - st.Reserved, - st.Used, - time.Now()) - if err != nil { - log.Errorf("StorageAttach insert fails: ", err) - return xerrors.Errorf("StorageAttach insert fails: ", err) - } - return nil -} - func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) @@ -536,62 +219,6 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st return nil } -func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error { - - // If url not in path urls, error out - // if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path - - var qUrls sql.NullString - err := dbi.harmonyDB.QueryRow(ctx, "select urls from storagelocation where storage_id=$1", string(id)).Scan(&qUrls) - if err != nil { - return err - } - - if !qUrls.Valid { - return xerrors.Errorf("No urls available for storage id: ", id) - } - urls := splitString(qUrls.String) - - var modUrls []string - for _, u := range urls { - if u != url { - modUrls = append(modUrls, u) - } - } - - // noop if url doesn't exist in urls - if len(modUrls) == len(urls) { - return nil - } - - if len(modUrls) > 0 { - newUrls := strings.Join(modUrls, ",") - _, err := dbi.harmonyDB.Exec(ctx, "update storagelocation set urls=$1 where storage_id=$2", newUrls, id) - if err != nil { - return err - } - - log.Warnw("Dropping sector path endpoint", "path", id, "url", url) - } else { - // Todo: single transaction - - // Drop storage path completely - _, err := dbi.harmonyDB.Exec(ctx, "delete from storagelocation where storage_id=$1", id) - if err != nil { - return err - } - - // Drop all sectors entries which use this storage path - _, err = dbi.harmonyDB.Exec(ctx, "delete from sectorlocation where storage_id=$1", id) - if err != nil { - return err - } - log.Warnw("Dropping sector storage", "path", id) - } - - return nil -} - func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) error { i.lk.Lock() defer i.lk.Unlock() @@ -680,53 +307,6 @@ func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) return nil } -func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { - - var canSeal, canStore bool - err := dbi.harmonyDB.QueryRow(ctx, - "select can_seal, can_store from storagelocation where storage_id=$1", id).Scan(&canSeal, &canStore) - if err != nil { - return xerrors.Errorf("Querying for storage id %d fails with err %w", id, err) - } - - _, err = dbi.harmonyDB.Exec(ctx, - "update storagelocation set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6", - report.Stat.Capacity, - report.Stat.Available, - report.Stat.FSAvailable, - report.Stat.Reserved, - report.Stat.Used, - time.Now()) - if err != nil { - return xerrors.Errorf("updating storage health in DB fails with err: ", err) - } - - if report.Stat.Capacity > 0 { - ctx, _ = tag.New(ctx, - tag.Upsert(metrics.StorageID, string(id)), - tag.Upsert(metrics.PathStorage, fmt.Sprint(canStore)), - tag.Upsert(metrics.PathSeal, fmt.Sprint(canSeal)), - ) - - stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity))) - stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity))) - stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity))) - - stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity)) - stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable)) - stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available)) - stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved)) - - if report.Stat.Max > 0 { - stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max))) - stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used)) - stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max)) - } - } - - return nil -} - func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { i.lk.Lock() defer i.lk.Unlock() @@ -770,54 +350,6 @@ func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report return nil } -func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { - - ftValid := false - for _, fileType := range storiface.PathTypes { - if fileType&ft == 0 { - ftValid = true - break - } - } - if !ftValid { - return xerrors.Errorf("Invalid filetype") - } - - // TODO: make the belowpart of a single transaction - var currPrimary sql.NullBool - err := dbi.harmonyDB.QueryRow(ctx, - "select is_primary from SectorLocation where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", - uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary) - if err != nil && !strings.Contains(err.Error(), "no rows in result set") { - return xerrors.Errorf("DB select fails: ", err) - } - - // If storage id already exists for this sector, update primary if need be - if currPrimary.Valid { - if !currPrimary.Bool && primary { - _, err = dbi.harmonyDB.Exec(ctx, - "update SectorLocation set is_primary = TRUE where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", - s.Miner, s.Number, ft, storageID) - if err != nil { - return xerrors.Errorf("DB update fails: ", err) - } - } else { - log.Warnf("sector %v redeclared in %s", s, storageID) - } - } else { - _, err = dbi.harmonyDB.Exec(ctx, - "insert into SectorLocation "+ - "values($1, $2, $3, $4, $5)", - s.Miner, s.Number, ft, storageID, primary) - if err != nil { - return xerrors.Errorf("DB insert fails: ", err) - } - } - - return nil - -} - func (i *Index) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { i.lk.Lock() defer i.lk.Unlock() @@ -850,29 +382,6 @@ loop: return nil } -func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { - - ftValid := false - for _, fileType := range storiface.PathTypes { - if fileType&ft == 0 { - ftValid = true - break - } - } - if !ftValid { - return xerrors.Errorf("Invalid filetype") - } - - _, err := dbi.harmonyDB.Exec(ctx, - "delete from sectorlocation where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", - int(s.Miner), int(s.Number), int(ft), string(storageID)) - if err != nil { - return xerrors.Errorf("StorageDropSector delete query fails: ", err) - } - - return nil -} - func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { i.lk.Lock() defer i.lk.Unlock() @@ -907,193 +416,6 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s return nil } -func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { - - var result []storiface.SectorStorageInfo - - allowList := make(map[string]struct{}) - storageWithSector := map[string]bool{} - - type dbRes struct { - Storage_id string - Count uint64 - Is_primary bool - Urls string - Weight uint64 - Can_seal bool - Can_store bool - Groups string - Allow_to string - Allow_types string - Deny_types string - } - - var rows []dbRes - - fts := ft.AllSet() - // Find all storage info which already hold this sector + filetype - err := dbi.harmonyDB.Select(ctx, &rows, - ` SELECT DISTINCT ON (stor.storage_id) - stor.storage_id, - COUNT(*) OVER(PARTITION BY stor.storage_id) as count, - BOOL_OR(is_primary) OVER(PARTITION BY stor.storage_id) AS is_primary, - urls, - weight, - can_seal, - can_store, - groups, - allow_to, - allow_types, - deny_types - FROM sectorlocation sec - JOIN storagelocation stor ON sec.storage_id = stor.storage_id - WHERE sec.miner_id = $1 - AND sec.sector_num = $2 - AND sec.sector_filetype = ANY($3) - ORDER BY stor.storage_id`, - s.Miner, s.Number, fts) - if err != nil { - return nil, xerrors.Errorf("Finding sector storage from DB fails with err: ", err) - } - - for _, row := range rows { - - // Parse all urls - var urls, burls []string - for _, u := range splitString(row.Urls) { - rl, err := url.Parse(u) - if err != nil { - return nil, xerrors.Errorf("failed to parse url: %w", err) - } - rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s)) - urls = append(urls, rl.String()) - burls = append(burls, u) - } - - result = append(result, storiface.SectorStorageInfo{ - ID: storiface.ID(row.Storage_id), - URLs: urls, - BaseURLs: burls, - Weight: row.Weight * row.Count, - CanSeal: row.Can_seal, - CanStore: row.Can_store, - Primary: row.Is_primary, - AllowTypes: splitString(row.Allow_types), - DenyTypes: splitString(row.Deny_types), - }) - - storageWithSector[row.Storage_id] = true - - allowTo := splitString(row.Allow_to) - if allowList != nil && len(allowTo) > 0 { - for _, group := range allowTo { - allowList[group] = struct{}{} - } - } else { - allowList = nil // allow to any - } - } - - // Find all storage paths which can hold this sector if allowFetch is true - if allowFetch { - spaceReq, err := ft.SealSpaceUse(ssize) - if err != nil { - return nil, xerrors.Errorf("estimating required space: %w", err) - } - - // Conditions to satisfy when choosing a sector - // 1. CanSeal is true - // 2. Available >= spaceReq - // 3. curr_time - last_heartbeat < SkippedHeartbeatThresh - // 4. heartbeat_err is NULL - // 5. not one of the earlier picked storage ids - // 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes) - // 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector - - var rows []struct { - Storage_id string - Urls string - Weight uint64 - Can_seal bool - Can_store bool - Groups string - Allow_types string - Deny_types string - } - err = dbi.harmonyDB.Select(ctx, &rows, - `select storage_id, - urls, - weight, - can_seal, - can_store, - groups, - allow_types, - deny_types - from storagelocation - where can_seal=true - and available >= $1 - and NOW()-last_heartbeat < $2 - and heartbeat_err is null`, - spaceReq, SkippedHeartbeatThresh) - if err != nil { - return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: ", err) - } - - for _, row := range rows { - if ok := storageWithSector[row.Storage_id]; ok { - continue - } - - if !ft.AnyAllowed(splitString(row.Allow_types), splitString(row.Deny_types)) { - log.Debugf("not selecting on %s, not allowed by file type filters", row.Storage_id) - continue - } - - if allowList != nil { - groups := splitString(row.Groups) - allow := false - for _, group := range groups { - if _, found := allowList[group]; found { - log.Debugf("path %s in allowed group %s", row.Storage_id, group) - allow = true - break - } - } - - if !allow { - log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", row.Storage_id, allowList, groups) - continue - } - } - - var urls, burls []string - for _, u := range splitString(row.Urls) { - rl, err := url.Parse(u) - if err != nil { - return nil, xerrors.Errorf("failed to parse url: %w", err) - } - rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s)) - urls = append(urls, rl.String()) - burls = append(burls, u) - } - - result = append(result, storiface.SectorStorageInfo{ - ID: storiface.ID(row.Storage_id), - URLs: urls, - BaseURLs: burls, - Weight: row.Weight, - CanSeal: row.Can_seal, - CanStore: row.Can_store, - Primary: false, - AllowTypes: splitString(row.Allow_types), - DenyTypes: splitString(row.Deny_types), - }) - } - } - - return result, nil -} - func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -1242,42 +564,6 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif return out, nil } -func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { - - var qResults []struct { - Urls string - Weight uint64 - Max_storage uint64 - Can_seal bool - Can_store bool - Groups string - Allow_to string - Allow_types string - Deny_types string - } - - err := dbi.harmonyDB.Select(ctx, &qResults, - "select urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+ - "from StorageLocation where storage_id=$1", string(id)) - if err != nil { - return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: ", err) - } - - var sinfo storiface.StorageInfo - sinfo.ID = id - sinfo.URLs = splitString(qResults[0].Urls) - sinfo.Weight = qResults[0].Weight - sinfo.MaxStorage = qResults[0].Max_storage - sinfo.CanSeal = qResults[0].Can_seal - sinfo.CanStore = qResults[0].Can_store - sinfo.Groups = splitString(qResults[0].Groups) - sinfo.AllowTo = splitString(qResults[0].Allow_to) - sinfo.AllowTypes = splitString(qResults[0].Allow_types) - sinfo.DenyTypes = splitString(qResults[0].Deny_types) - - return sinfo, nil -} - func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -1290,81 +576,6 @@ func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.Sto return *si.info, nil } -func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { - var err error - var spaceReq uint64 - switch pathType { - case storiface.PathSealing: - spaceReq, err = allocate.SealSpaceUse(ssize) - case storiface.PathStorage: - spaceReq, err = allocate.StoreSpaceUse(ssize) - default: - return nil, xerrors.Errorf("Unexpected path type") - } - if err != nil { - return nil, xerrors.Errorf("estimating required space: %w", err) - } - - var checkString string - if pathType == storiface.PathSealing { - checkString = "can_seal = TRUE" - } else if pathType == storiface.PathStorage { - checkString = "can_store = TRUE" - } - - var rows []struct { - Storage_id string - Urls string - Weight uint64 - Max_storage uint64 - Can_seal bool - Can_store bool - Groups string - Allow_to string - Allow_types string - Deny_types string - } - - sql := fmt.Sprintf(`select storage_id, - urls, - weight, - max_storage, - can_seal, - can_store, - groups, - allow_to, - allow_types, - deny_types - from storagelocation - where %s and available >= $1 - and NOW()-last_heartbeat < $2 - and heartbeat_err is null - order by (available::numeric * weight) desc`, checkString) - err = dbi.harmonyDB.Select(ctx, &rows, - sql, spaceReq, SkippedHeartbeatThresh) - if err != nil { - return nil, xerrors.Errorf("Querying for best storage sectors fails with sql: %s and err %w: ", sql, err) - } - - var result []storiface.StorageInfo - for _, row := range rows { - result = append(result, storiface.StorageInfo{ - ID: storiface.ID(row.Storage_id), - URLs: splitString(row.Urls), - Weight: row.Weight, - MaxStorage: row.Max_storage, - CanSeal: row.Can_seal, - CanStore: row.Can_store, - Groups: splitString(row.Groups), - AllowTo: splitString(row.Allow_to), - AllowTypes: splitString(row.Allow_types), - DenyTypes: splitString(row.Deny_types), - }) - } - - return result, nil -} - func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -1449,23 +660,4 @@ func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]sto return out, nil } -func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error { - // TODO: implementation - return nil -} - -func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) { - // TODO: implementation - return false, nil -} - -func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) { - // TODO: implementation - return storiface.SectorLocks{}, nil -} - var _ SectorIndex = &Index{} - -var _ SectorIndex = &DBIndex{} - -var _ SectorIndex = &IndexProxy{} diff --git a/storage/paths/index_proxy.go b/storage/paths/index_proxy.go new file mode 100644 index 000000000..3695c717b --- /dev/null +++ b/storage/paths/index_proxy.go @@ -0,0 +1,109 @@ +package paths + +import ( + "context" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/journal/alerting" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/storage/sealer/fsutil" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +type IndexProxy struct { + memIndex *Index + dbIndex *DBIndex + enableSectorIndexDB bool +} + +func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy { + return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy { + return NewIndexProxy(al, db, enableSectorIndexDB) + } +} + +func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy { + return &IndexProxy{ + memIndex: NewIndex(al), + dbIndex: NewDBIndex(al, db), + enableSectorIndexDB: enableSectorIndexDB, + } +} + +func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageAttach(ctx, info, stat) + } + return ip.memIndex.StorageAttach(ctx, info, stat) +} + +func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageDetach(ctx, id, url) + } + return ip.memIndex.StorageDetach(ctx, id, url) +} + +func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageInfo(ctx, id) + } + return ip.memIndex.StorageInfo(ctx, id) +} + +func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageReportHealth(ctx, id, report) + } + return ip.memIndex.StorageReportHealth(ctx, id, report) +} + +func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary) + } + return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary) +} + +func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft) + } + return ip.memIndex.StorageDropSector(ctx, storageID, s, ft) +} + +func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch) + } + return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch) +} + +func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType) + } + return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType) +} + +func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error { + return ip.memIndex.StorageLock(ctx, sector, read, write) +} + +func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) { + return ip.memIndex.StorageTryLock(ctx, sector, read, write) +} + +func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) { + return ip.memIndex.StorageGetLocks(ctx) +} + +func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { + if ip.enableSectorIndexDB { + return ip.dbIndex.StorageList(ctx) + } + return ip.memIndex.StorageList(ctx) +} + +var _ SectorIndex = &IndexProxy{}