From 864e8219ae1b4a640eafa6bfe18642affe7c55b8 Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Fri, 4 Aug 2023 11:35:31 -0400 Subject: [PATCH] WIP: sector index yugabyte impl --- cmd/lotus-miner/init.go | 7 +- itests/harmonydb_test.go | 3 +- lib/harmony/harmonydb/userfuncs.go | 2 +- node/builder_miner.go | 7 +- storage/paths/index.go | 772 ++++++++++++++++++++++++++++- 5 files changed, 782 insertions(+), 9 deletions(-) diff --git a/cmd/lotus-miner/init.go b/cmd/lotus-miner/init.go index c109e85b9..71d9de3d3 100644 --- a/cmd/lotus-miner/init.go +++ b/cmd/lotus-miner/init.go @@ -7,6 +7,8 @@ import ( "encoding/binary" "encoding/json" "fmt" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + logging "github.com/ipfs/go-log/v2" "net/http" "os" "path/filepath" @@ -463,7 +465,10 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix)) - si := paths.NewIndex(nil) + harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "", + func(s string) { logging.Logger("harmonydb").Error(s) }) + + si := paths.NewIndex(nil, harmonyDB) lstor, err := paths.NewLocal(ctx, lr, si, nil) if err != nil { diff --git a/itests/harmonydb_test.go b/itests/harmonydb_test.go index b52a2aa8f..3a32ce047 100644 --- a/itests/harmonydb_test.go +++ b/itests/harmonydb_test.go @@ -42,7 +42,8 @@ func TestCrud(t *testing.T) { Animal string `db:"content"` Unpopulated int } - err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch") + contentfilter := []string{"cows", "cats"} + err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch where content = ANY($1)", contentfilter) if err != nil { t.Fatal("Could not select: ", err) } diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 4d35fd8ca..23dd3194b 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -88,7 +88,7 @@ Ex: pet := "cat" err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet) */ -func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { +func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql string, arguments ...any) error { return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...) } diff --git a/node/builder_miner.go b/node/builder_miner.go index bd81f4265..a99091059 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -126,6 +126,10 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))), ), + Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) { + return harmonydb.NewFromConfigWithITestID(cfg)(id) + }), + If(cfg.Subsystems.EnableSectorStorage, // Sector storage Override(new(*paths.Index), paths.NewIndex), @@ -234,9 +238,6 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(config.HarmonyDB), cfg.HarmonyDB), Override(new(harmonydb.ITestID), harmonydb.ITestID("")), Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)), - Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) { - return harmonydb.NewFromConfigWithITestID(cfg)(id) - }), ) } diff --git a/storage/paths/index.go b/storage/paths/index.go index ce11eec9c..17e78cbde 100644 --- a/storage/paths/index.go +++ b/storage/paths/index.go @@ -2,11 +2,14 @@ package paths import ( "context" + "database/sql" "errors" "fmt" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "net/url" gopath "path" "sort" + "strings" "sync" "time" @@ -71,9 +74,27 @@ type Index struct { sectors map[storiface.Decl][]*declMeta stores map[storiface.ID]*storageEntry + + dbi *DBIndex } -func NewIndex(al *alerting.Alerting) *Index { +type DBIndex struct { + alerting *alerting.Alerting + pathAlerts map[storiface.ID]alerting.AlertType + + harmonyDB *harmonydb.DB +} + +func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex { + return &DBIndex{ + harmonyDB: db, + + alerting: al, + pathAlerts: map[storiface.ID]alerting.AlertType{}, + } +} + +func NewIndex(al *alerting.Alerting, db *harmonydb.DB) *Index { return &Index{ indexLocks: &indexLocks{ locks: map[abi.SectorID]*sectorLock{}, @@ -84,9 +105,88 @@ func NewIndex(al *alerting.Alerting) *Index { sectors: map[storiface.Decl][]*declMeta{}, stores: map[storiface.ID]*storageEntry{}, + + dbi: NewDBIndex(al, db), } } +func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { + + var sectorEntries []struct { + Storage_id string + Miner_id sql.NullInt64 + Sector_num sql.NullInt64 + Sector_filetype sql.NullInt32 + Is_primary sql.NullBool + } + + err := dbi.harmonyDB.Select(ctx, §orEntries, + "select stor.storage_id, miner_id, sector_num, sector_filetype, is_primary from storagelocation stor left join sectorlocation sec on stor.storage_id=sec.storage_id") + if err != nil { + return nil, xerrors.Errorf("StorageList DB query fails: ", err) + } + + log.Errorf("Sector entries: ", sectorEntries) + + byID := map[storiface.ID]map[abi.SectorID]storiface.SectorFileType{} + for _, entry := range sectorEntries { + id := storiface.ID(entry.Storage_id) + _, ok := byID[id] + if !ok { + byID[id] = map[abi.SectorID]storiface.SectorFileType{} + } + + // skip sector info for storage paths with no sectors + if !entry.Miner_id.Valid { + continue + } + + sectorId := abi.SectorID{ + Miner: abi.ActorID(entry.Miner_id.Int64), + Number: abi.SectorNumber(entry.Sector_num.Int64), + } + + byID[id][sectorId] |= storiface.SectorFileType(entry.Sector_filetype.Int32) + } + + out := map[storiface.ID][]storiface.Decl{} + for id, m := range byID { + out[id] = []storiface.Decl{} + for sectorID, fileType := range m { + out[id] = append(out[id], storiface.Decl{ + SectorID: sectorID, + SectorFileType: fileType, + }) + } + } + //for _, entry := range sectorEntries { + // + // id := storiface.ID(entry.Storage_id) + // _, ok := out[id] + // if !ok { + // out[id] = []storiface.Decl{} + // } + // + // // skip sector info for storage paths with no sectors + // if !entry.Miner_id.Valid { + // continue + // } + // + // sectorId := abi.SectorID{ + // Miner: abi.ActorID(entry.Miner_id.Int64), + // Number: abi.SectorNumber(entry.Sector_num.Int64), + // } + // decl := storiface.Decl{ + // SectorID: sectorId, + // SectorFileType: storiface.SectorFileType(entry.Sector_filetype.Int32), + // } + // + // out[id] = append(out[id], decl) + //} + + return out, nil +} + func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -113,7 +213,160 @@ func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.D } } - return out, nil + log.Errorf("In memory index storage list: ", out) + + dbout, err := i.dbi.StorageList(ctx) + if err != nil { + log.Errorf("DB Index storage list error: ", err) + return nil, err + } else { + log.Errorf("DB index storage list: ", dbout) + } + + return dbout, nil +} + +func union(a, b []string) []string { + m := make(map[string]bool) + + for _, elem := range a { + m[elem] = true + } + + for _, elem := range b { + if _, ok := m[elem]; !ok { + a = append(a, elem) + } + } + return a +} + +func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { + var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) + + //if _, hasAlert := dbi.pathAlerts[si.ID]; i.alerting != nil && !hasAlert { + // i.pathAlerts[si.ID] = i.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID)) + //} + // + //var hasConfigIssues bool + + for _, typ := range si.AllowTypes { + _, err := storiface.TypeFromString(typ) + if err != nil { + // No need to hard-fail here, just warn the user + // (note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path) + //hasConfigIssues = true + + //if i.alerting != nil { + // i.alerting.Raise(i.pathAlerts[si.ID], map[string]interface{}{ + // "message": "bad path type in AllowTypes", + // "path": string(si.ID), + // "idx": id, + // "path_type": typ, + // "error": err.Error(), + // }) + //} + + continue + } + allow = append(allow, typ) + } + for _, typ := range si.DenyTypes { + _, err := storiface.TypeFromString(typ) + if err != nil { + // No need to hard-fail here, just warn the user + //hasConfigIssues = true + + //if i.alerting != nil { + // i.alerting.Raise(i.pathAlerts[si.ID], map[string]interface{}{ + // "message": "bad path type in DenyTypes", + // "path": string(si.ID), + // "idx": id, + // "path_type": typ, + // "error": err.Error(), + // }) + //} + + continue + } + deny = append(deny, typ) + } + si.AllowTypes = allow + si.DenyTypes = deny + + //if i.alerting != nil && !hasConfigIssues && i.alerting.IsRaised(i.pathAlerts[si.ID]) { + // i.alerting.Resolve(i.pathAlerts[si.ID], map[string]string{ + // "message": "path config is now correct", + // }) + //} + + for _, u := range si.URLs { + if _, err := url.Parse(u); err != nil { + return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err) + } + } + + var urls sql.NullString + var storageId sql.NullString + err := dbi.harmonyDB.QueryRow(ctx, + "Select storage_id, urls from StorageLocation where storage_id = $1", string(si.ID)).Scan(&storageId, &urls) + if err != nil && !strings.Contains(err.Error(), "no rows in result set") { + return xerrors.Errorf("storage attach select fails: ", err) + } + + // Storage ID entry exists + if storageId.Valid { + var currUrls []string + if urls.Valid { + currUrls = strings.Split(urls.String, ",") + } + currUrls = union(currUrls, si.URLs) + //updatedUrls := strings.Join(currUrls, ",") + + _, err = dbi.harmonyDB.Exec(ctx, + "update StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 where storage_id=$10", + strings.Join(currUrls, ","), + si.Weight, + si.MaxStorage, + si.CanSeal, + si.CanStore, + strings.Join(si.Groups, ","), + strings.Join(si.AllowTo, ","), + strings.Join(si.AllowTypes, ","), + strings.Join(si.DenyTypes, ","), + si.ID) + if err != nil { + return xerrors.Errorf("storage attach update fails: ", err) + } + + return nil + } + + // Insert storage id + _, err = dbi.harmonyDB.Exec(ctx, + "insert into StorageLocation "+ + "Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)", + si.ID, + strings.Join(si.URLs, ","), + si.Weight, + si.MaxStorage, + si.CanSeal, + si.CanStore, + strings.Join(si.Groups, ","), + strings.Join(si.AllowTo, ","), + strings.Join(si.AllowTypes, ","), + strings.Join(si.DenyTypes, ","), + st.Capacity, + st.Available, + st.FSAvailable, + st.Reserved, + st.Used, + time.Now()) + if err != nil { + log.Errorf("StorageAttach insert fails: ", err) + return xerrors.Errorf("StorageAttach insert fails: ", err) + } + return nil } func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { @@ -215,6 +468,68 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st lastHeartbeat: time.Now(), } + + err := i.dbi.StorageAttach(ctx, si, st) + if err != nil { + print("db index storage attach error: ", err) + return err + } + + return nil +} + +func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error { + + // If url not in path urls, error out + // if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path + // record stats: droppedEntries, primaryEntries, droppedDecls + + var qUrls sql.NullString + err := dbi.harmonyDB.QueryRow(ctx, "select urls from storagelocation where storage_id=$1", string(id)).Scan(&qUrls) + if err != nil { + return err + } + + if !qUrls.Valid { + return xerrors.Errorf("No urls available for storage id: ", id) + } + urls := strings.Split(qUrls.String, ",") + + var modUrls []string + for _, u := range urls { + if u != url { + modUrls = append(modUrls, u) + } + } + + // noop if url doesn't exist in urls + if len(modUrls) == len(urls) { + return nil + } + + if len(modUrls) > 0 { + newUrls := strings.Join(modUrls, ",") + _, err := dbi.harmonyDB.Exec(ctx, "update storagelocation set urls=$1 where storage_id=$2", newUrls, id) + if err != nil { + return err + } + + log.Warnw("Dropping sector path endpoint", "path", id, "url", url) + } else { + // Drop storage path completely + _, err := dbi.harmonyDB.Exec(ctx, "delete from storagelocation where storage_id=$1", id) + if err != nil { + return err + } + + // Drop all sectors entries which use this storage path + _, err = dbi.harmonyDB.Exec(ctx, "delete from sectorlocation where storage_id=$1", id) + if err != nil { + return err + } + log.Warnw("Dropping sector storage", "path", id) + } + return nil } @@ -303,6 +618,58 @@ func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) log.Warnw("Dropping sector path endpoint", "path", id, "url", url) } + err := i.dbi.StorageDetach(ctx, id, url) + if err != nil { + return xerrors.Errorf("storage detach fails with the error: ", err) + } + + return nil +} + +func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { + + var canSeal, canStore bool + err := dbi.harmonyDB.QueryRow(ctx, + "select can_seal, can_store from storagelocation where storage_id=$1", id).Scan(&canSeal, &canStore) + if err != nil { + return xerrors.Errorf("Querying for storage id %d fails with err %w", id, err) + } + + _, err = dbi.harmonyDB.Exec(ctx, + "update storagelocation set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6", + report.Stat.Capacity, + report.Stat.Available, + report.Stat.FSAvailable, + report.Stat.Reserved, + report.Stat.Used, + time.Now()) + if err != nil { + return xerrors.Errorf("updating storage health in DB fails with err: ", err) + } + + if report.Stat.Capacity > 0 { + ctx, _ = tag.New(ctx, + tag.Upsert(metrics.StorageID, string(id)), + tag.Upsert(metrics.PathStorage, fmt.Sprint(canStore)), + tag.Upsert(metrics.PathSeal, fmt.Sprint(canSeal)), + ) + + stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity))) + stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity))) + stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity))) + + stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity)) + stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable)) + stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available)) + stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved)) + + if report.Stat.Max > 0 { + stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max))) + stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used)) + stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max)) + } + } + return nil } @@ -349,10 +716,60 @@ func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report return nil } +func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { + + ftValid := false + for _, fileType := range storiface.PathTypes { + if fileType&ft == 0 { + ftValid = true + break + } + } + if !ftValid { + return xerrors.Errorf("Invalid filetype") + } + + // TODO: make the belowpart of a single transaction + var currPrimary sql.NullBool + err := dbi.harmonyDB.QueryRow(ctx, + "select is_primary from SectorLocation where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary) + if err != nil && !strings.Contains(err.Error(), "no rows in result set") { + return xerrors.Errorf("DB select fails: ", err) + } + + // If storage id already exists for this sector, update primary if need be + if currPrimary.Valid { + if !currPrimary.Bool && primary { + _, err = dbi.harmonyDB.Exec(ctx, + "update SectorLocation set is_primary = TRUE where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + s.Miner, s.Number, ft, storageID) + if err != nil { + return xerrors.Errorf("DB update fails: ", err) + } + } else { + log.Warnf("sector %v redeclared in %s", s, storageID) + } + } else { + _, err = dbi.harmonyDB.Exec(ctx, + "insert into SectorLocation "+ + "values($1, $2, $3, $4, $5)", + s.Miner, s.Number, ft, storageID, primary) + if err != nil { + return xerrors.Errorf("DB insert fails: ", err) + } + } + + return nil + +} + func (i *Index) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { i.lk.Lock() defer i.lk.Unlock() + log.Errorf("StorageDeclareSector: %v, %v, %v, %v", storageID, s, ft, primary) + loop: for _, fileType := range storiface.PathTypes { if fileType&ft == 0 { @@ -378,6 +795,35 @@ loop: }) } + err := i.dbi.StorageDeclareSector(ctx, storageID, s, ft, primary) + if err != nil { + print("DB index StorageDeclareSector err: ", err) + return err + } + + return nil +} + +func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { + + ftValid := false + for _, fileType := range storiface.PathTypes { + if fileType&ft == 0 { + ftValid = true + break + } + } + if !ftValid { + return xerrors.Errorf("Invalid filetype") + } + + _, err := dbi.harmonyDB.Exec(ctx, + "delete from sectorlocation where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + int(s.Miner), int(s.Number), int(ft), string(storageID)) + if err != nil { + return xerrors.Errorf("StorageDropSector delete query fails: ", err) + } + return nil } @@ -385,6 +831,8 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s i.lk.Lock() defer i.lk.Unlock() + log.Errorf("StorageDropSector: %v, %v, %v", storageID, s, ft) + for _, fileType := range storiface.PathTypes { if fileType&ft == 0 { continue @@ -412,13 +860,207 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s i.sectors[d] = rewritten } + err := i.dbi.StorageDropSector(ctx, storageID, s, ft) + if err != nil { + return xerrors.Errorf("DB StorageDropSector fails with err: ", err) + } + return nil } +func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { + + var result []storiface.SectorStorageInfo + + allowList := make(map[string]struct{}) + storageWithSector := map[string]bool{} + + type dbRes struct { + Storage_id string + Count uint64 + Is_primary bool + Urls string + Weight uint64 + Can_seal bool + Can_store bool + Groups string + Allow_to string + Allow_types string + Deny_types string + } + + var rows []dbRes + + fts := ft.AllSet() + // Find all storage info which already hold this sector + filetype + err := dbi.harmonyDB.Select(ctx, &rows, + ` SELECT DISTINCT ON (stor.storage_id) + stor.storage_id, + COUNT(*) OVER(PARTITION BY stor.storage_id) as count, + BOOL_OR(is_primary) OVER(PARTITION BY stor.storage_id) AS is_primary, + urls, + weight, + can_seal, + can_store, + groups, + allow_to, + allow_types, + deny_types + FROM sectorlocation sec + JOIN storagelocation stor ON sec.storage_id = stor.storage_id + WHERE sec.miner_id = $1 + AND sec.sector_num = $2 + AND sec.sector_filetype = ANY($3) + ORDER BY stor.storage_id`, + s.Miner, s.Number, fts) + if err != nil { + return nil, xerrors.Errorf("Finding sector storage from DB fails with err: ", err) + } + + for _, row := range rows { + + // Parse all urls + var urls, burls []string + for _, u := range strings.Split(row.Urls, ",") { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s)) + urls = append(urls, rl.String()) + burls = append(burls, u) + } + + result = append(result, storiface.SectorStorageInfo{ + ID: storiface.ID(row.Storage_id), + URLs: urls, + BaseURLs: burls, + Weight: row.Weight * row.Count, + CanSeal: row.Can_seal, + CanStore: row.Can_store, + Primary: row.Is_primary, + AllowTypes: strings.Split(row.Allow_types, ","), + DenyTypes: strings.Split(row.Deny_types, ","), + }) + + storageWithSector[row.Storage_id] = true + + allowTo := strings.Split(row.Allow_to, ",") + if allowList != nil && len(allowTo) > 0 { + for _, group := range allowTo { + allowList[group] = struct{}{} + } + } else { + allowList = nil // allow to any + } + } + + // Find all storage paths which can hold this sector if allowFetch is true + if allowFetch { + spaceReq, err := ft.SealSpaceUse(ssize) + if err != nil { + return nil, xerrors.Errorf("estimating required space: %w", err) + } + + // Conditions to satisfy when choosing a sector + // 1. CanSeal is true + // 2. Available >= spaceReq + // 3. curr_time - last_heartbeat < SkippedHeartbeatThresh + // 4. heartbeat_err is NULL + // 5. not one of the earlier picked storage ids + // 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes) + // 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector + + var rows []struct { + Storage_id string + Urls string + Weight uint64 + Can_seal bool + Can_store bool + Groups string + Allow_types string + Deny_types string + } + err = dbi.harmonyDB.Select(ctx, &rows, + `select storage_id, + urls, + weight, + can_seal, + can_store, + groups, + allow_types, + deny_types + from storagelocation + where can_seal=true + and available >= $1 + and NOW()-last_heartbeat < $2 + and heartbeat_err is null`, + spaceReq, SkippedHeartbeatThresh) + if err != nil { + return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: ", err) + } + + for _, row := range rows { + if ok := storageWithSector[row.Storage_id]; ok { + continue + } + + if !ft.AnyAllowed(strings.Split(row.Allow_types, ","), strings.Split(row.Deny_types, ",")) { + log.Debugf("not selecting on %s, not allowed by file type filters", row.Storage_id) + continue + } + + if allowList != nil { + groups := strings.Split(row.Groups, ",") + allow := false + for _, group := range groups { + if _, found := allowList[group]; found { + log.Debugf("path %s in allowed group %s", row.Storage_id, group) + allow = true + break + } + } + + if !allow { + log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", row.Storage_id, allowList, groups) + continue + } + } + + var urls, burls []string + for _, u := range strings.Split(row.Urls, ",") { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s)) + urls = append(urls, rl.String()) + burls = append(burls, u) + } + + result = append(result, storiface.SectorStorageInfo{ + ID: storiface.ID(row.Storage_id), + URLs: urls, + BaseURLs: burls, + Weight: row.Weight, + CanSeal: row.Can_seal, + CanStore: row.Can_store, + Primary: false, + AllowTypes: strings.Split(row.Allow_types, ","), + DenyTypes: strings.Split(row.Deny_types, ","), + }) + } + } + + return result, nil +} + func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() + log.Errorf("StorageFindSector: %v, %v, %v, %v, %v", s, ft, ssize, allowFetch, i.sectors) + storageIDs := map[storiface.ID]uint64{} isprimary := map[storiface.ID]bool{} @@ -560,7 +1202,52 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif } } - return out, nil + log.Errorf("StorageFindSector out: ", out) + + dbout, err := i.dbi.StorageFindSector(ctx, s, ft, ssize, allowFetch) + if err != nil { + return nil, xerrors.Errorf("DB StorageFindSector fails err: ", err) + } + + log.Errorf("StorageFindSector dbout: ", dbout) + + return dbout, nil +} + +func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { + + var qResults []struct { + Urls string + Weight uint64 + Max_storage uint64 + Can_seal bool + Can_store bool + Groups string + Allow_to string + Allow_types string + Deny_types string + } + + err := dbi.harmonyDB.Select(ctx, &qResults, + "select urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+ + "from StorageLocation where storage_id=$1", string(id)) + if err != nil { + return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: ", err) + } + + var sinfo storiface.StorageInfo + sinfo.ID = id + sinfo.URLs = strings.Split(qResults[0].Urls, ",") + sinfo.Weight = qResults[0].Weight + sinfo.MaxStorage = qResults[0].Max_storage + sinfo.CanSeal = qResults[0].Can_seal + sinfo.CanStore = qResults[0].Can_store + sinfo.Groups = strings.Split(qResults[0].Groups, ",") + sinfo.AllowTo = strings.Split(qResults[0].Allow_to, ",") + sinfo.AllowTypes = strings.Split(qResults[0].Allow_types, ",") + sinfo.DenyTypes = strings.Split(qResults[0].Deny_types, ",") + + return sinfo, nil } func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { @@ -575,6 +1262,83 @@ func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.Sto return *si.info, nil } +func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { + var err error + var spaceReq uint64 + switch pathType { + case storiface.PathSealing: + spaceReq, err = allocate.SealSpaceUse(ssize) + case storiface.PathStorage: + spaceReq, err = allocate.StoreSpaceUse(ssize) + default: + return nil, xerrors.Errorf("Unexpected path type") + } + if err != nil { + return nil, xerrors.Errorf("estimating required space: %w", err) + } + + var checkString string + if pathType == storiface.PathSealing { + checkString = "can_seal = TRUE" + } else if pathType == storiface.PathStorage { + checkString = "can_store = TRUE" + } + + var rows []struct { + Storage_id string + Urls string + Weight uint64 + Max_storage uint64 + Can_seal bool + Can_store bool + Groups string + Allow_to string + Allow_types string + Deny_types string + } + + sql := fmt.Sprintf(`select storage_id, + urls, + weight, + max_storage, + can_seal, + can_store, + groups, + allow_to, + allow_types, + deny_types + from storagelocation + where %s and available >= $1 + and NOW()-last_heartbeat < $2 + and heartbeat_err is null + order by available * weight desc`, checkString) + err = dbi.harmonyDB.Select(ctx, &rows, + sql, spaceReq, SkippedHeartbeatThresh) + if err != nil { + return nil, xerrors.Errorf("Querying for best storage sectors fails with sql: %s and err %w: ", sql, err) + } + + var result []storiface.StorageInfo + for _, row := range rows { + result = append(result, storiface.StorageInfo{ + ID: storiface.ID(row.Storage_id), + URLs: strings.Split(row.Urls, ","), + Weight: row.Weight, + MaxStorage: row.Max_storage, + CanSeal: row.Can_seal, + CanStore: row.Can_store, + Groups: strings.Split(row.Groups, ","), + AllowTo: strings.Split(row.Allow_to, ","), + AllowTypes: strings.Split(row.Allow_types, ","), + DenyTypes: strings.Split(row.Deny_types, ","), + }) + } + + log.Errorf("StorageBestAlloc result: ", result) + + return result, nil +} + func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -637,6 +1401,8 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorF out[i] = *candidate.info } + return i.dbi.StorageBestAlloc(ctx, allocate, ssize, pathType) + return out, nil }