address review comments
This commit is contained in:
parent
84d9dc5dfe
commit
7759444247
@ -42,8 +42,7 @@ func TestCrud(t *testing.T) {
|
||||
Animal string `db:"content"`
|
||||
Unpopulated int
|
||||
}
|
||||
contentfilter := []string{"cows", "cats"}
|
||||
err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch where content = ANY($1)", contentfilter)
|
||||
err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch")
|
||||
if err != nil {
|
||||
t.Fatal("Could not select: ", err)
|
||||
}
|
||||
@ -69,7 +68,7 @@ func TestTransaction(t *testing.T) {
|
||||
if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil {
|
||||
t.Fatal("E0", err)
|
||||
}
|
||||
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool) {
|
||||
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||
if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil {
|
||||
t.Fatal("E1", err)
|
||||
}
|
||||
@ -91,7 +90,7 @@ func TestTransaction(t *testing.T) {
|
||||
if sum2 != 4+5+6+7+8+9 {
|
||||
t.Fatal("Expected 39, got ", sum2)
|
||||
}
|
||||
return false // rollback
|
||||
return false, nil // rollback
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("ET", err)
|
||||
|
@ -1,12 +1,14 @@
|
||||
create table SectorLocation
|
||||
(
|
||||
"miner_id" int8,
|
||||
"sector_num" int8,
|
||||
"miner_id" bigint,
|
||||
"sector_num" bigint,
|
||||
"sector_filetype" int,
|
||||
"storage_id" varchar,
|
||||
"is_primary" bool,
|
||||
constraint SectorLocation_pk
|
||||
primary key ("miner_id", "sector_num", "sector_filetype", "storage_id")
|
||||
|
||||
-- TODO: Maybe add index on above PK fields
|
||||
);
|
||||
|
||||
|
||||
@ -15,22 +17,21 @@ create table StorageLocation
|
||||
"storage_id" varchar not null
|
||||
constraint "StorageLocation_pkey"
|
||||
primary key,
|
||||
"urls" varchar,
|
||||
"weight" int8,
|
||||
"max_storage" int8,
|
||||
"urls" varchar, -- comma separated list of urls
|
||||
"weight" bigint,
|
||||
"max_storage" bigint,
|
||||
"can_seal" bool,
|
||||
"can_store" bool,
|
||||
"groups" varchar,
|
||||
"allow_to" varchar,
|
||||
"allow_types" varchar,
|
||||
"deny_types" varchar,
|
||||
"groups" varchar, -- comma separated list of group names
|
||||
"allow_to" varchar, -- comma separated list of allowed groups
|
||||
"allow_types" varchar, -- comma separated list of allowed file types
|
||||
"deny_types" varchar, -- comma separated list of denied file types
|
||||
|
||||
"capacity" int8,
|
||||
"available" int8,
|
||||
"fs_available" int8,
|
||||
"reserved" int8,
|
||||
-- "MaxStorage" int8,
|
||||
"used" int8,
|
||||
"capacity" bigint,
|
||||
"available" bigint,
|
||||
"fs_available" bigint,
|
||||
"reserved" bigint,
|
||||
"used" bigint,
|
||||
"last_heartbeat" timestamp(6),
|
||||
"heartbeat_err" varchar
|
||||
);
|
||||
|
@ -100,7 +100,7 @@ type Tx struct {
|
||||
// BeginTransaction is how you can access transactions using this library.
|
||||
// The entire transaction happens in the function passed in.
|
||||
// The return must be true or a rollback will occur.
|
||||
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (didCommit bool, retErr error) {
|
||||
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
|
||||
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -111,7 +111,10 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (
|
||||
retErr = tx.Rollback(ctx)
|
||||
}
|
||||
}()
|
||||
commit = f(&Tx{tx, ctx})
|
||||
commit, err = f(&Tx{tx, ctx})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if commit {
|
||||
err := tx.Commit(ctx)
|
||||
if err != nil {
|
||||
|
744
storage/paths/db_index.go
Normal file
744
storage/paths/db_index.go
Normal file
@ -0,0 +1,744 @@
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/url"
|
||||
gopath "path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/tag"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/journal/alerting"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type DBIndex struct {
|
||||
alerting *alerting.Alerting
|
||||
pathAlerts map[storiface.ID]alerting.AlertType
|
||||
|
||||
harmonyDB *harmonydb.DB
|
||||
}
|
||||
|
||||
func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
|
||||
return &DBIndex{
|
||||
harmonyDB: db,
|
||||
|
||||
alerting: al,
|
||||
pathAlerts: map[storiface.ID]alerting.AlertType{},
|
||||
}
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
|
||||
|
||||
var sectorEntries []struct {
|
||||
StorageId string
|
||||
MinerId sql.NullInt64
|
||||
SectorNum sql.NullInt64
|
||||
SectorFiletype sql.NullInt32
|
||||
IsPrimary sql.NullBool
|
||||
}
|
||||
|
||||
err := dbi.harmonyDB.Select(ctx, §orEntries,
|
||||
"SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storagelocation stor LEFT JOIN sectorlocation sec on stor.storage_id=sec.storage_id")
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("StorageList DB query fails: ", err)
|
||||
}
|
||||
|
||||
byID := map[storiface.ID]map[abi.SectorID]storiface.SectorFileType{}
|
||||
for _, entry := range sectorEntries {
|
||||
id := storiface.ID(entry.StorageId)
|
||||
_, ok := byID[id]
|
||||
if !ok {
|
||||
byID[id] = map[abi.SectorID]storiface.SectorFileType{}
|
||||
}
|
||||
|
||||
// skip sector info for storage paths with no sectors
|
||||
if !entry.MinerId.Valid {
|
||||
continue
|
||||
}
|
||||
|
||||
sectorId := abi.SectorID{
|
||||
Miner: abi.ActorID(entry.MinerId.Int64),
|
||||
Number: abi.SectorNumber(entry.SectorNum.Int64),
|
||||
}
|
||||
|
||||
byID[id][sectorId] |= storiface.SectorFileType(entry.SectorFiletype.Int32)
|
||||
}
|
||||
|
||||
out := map[storiface.ID][]storiface.Decl{}
|
||||
for id, m := range byID {
|
||||
out[id] = []storiface.Decl{}
|
||||
for sectorID, fileType := range m {
|
||||
out[id] = append(out[id], storiface.Decl{
|
||||
SectorID: sectorID,
|
||||
SectorFileType: fileType,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func union(a, b []string) []string {
|
||||
m := make(map[string]bool)
|
||||
|
||||
for _, elem := range a {
|
||||
m[elem] = true
|
||||
}
|
||||
|
||||
for _, elem := range b {
|
||||
if _, ok := m[elem]; !ok {
|
||||
a = append(a, elem)
|
||||
}
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
func splitString(str string) []string {
|
||||
if str == "" {
|
||||
return []string{}
|
||||
}
|
||||
return strings.Split(str, ",")
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
|
||||
var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes))
|
||||
|
||||
if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert {
|
||||
dbi.pathAlerts[si.ID] = dbi.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID))
|
||||
}
|
||||
|
||||
var hasConfigIssues bool
|
||||
|
||||
for id, typ := range si.AllowTypes {
|
||||
_, err := storiface.TypeFromString(typ)
|
||||
if err != nil {
|
||||
//No need to hard-fail here, just warn the user
|
||||
//(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
|
||||
hasConfigIssues = true
|
||||
|
||||
if dbi.alerting != nil {
|
||||
dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
|
||||
"message": "bad path type in AllowTypes",
|
||||
"path": string(si.ID),
|
||||
"idx": id,
|
||||
"path_type": typ,
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
allow = append(allow, typ)
|
||||
}
|
||||
for id, typ := range si.DenyTypes {
|
||||
_, err := storiface.TypeFromString(typ)
|
||||
if err != nil {
|
||||
//No need to hard-fail here, just warn the user
|
||||
hasConfigIssues = true
|
||||
|
||||
if dbi.alerting != nil {
|
||||
dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
|
||||
"message": "bad path type in DenyTypes",
|
||||
"path": string(si.ID),
|
||||
"idx": id,
|
||||
"path_type": typ,
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
deny = append(deny, typ)
|
||||
}
|
||||
si.AllowTypes = allow
|
||||
si.DenyTypes = deny
|
||||
|
||||
if dbi.alerting != nil && !hasConfigIssues && dbi.alerting.IsRaised(dbi.pathAlerts[si.ID]) {
|
||||
dbi.alerting.Resolve(dbi.pathAlerts[si.ID], map[string]string{
|
||||
"message": "path config is now correct",
|
||||
})
|
||||
}
|
||||
|
||||
for _, u := range si.URLs {
|
||||
if _, err := url.Parse(u); err != nil {
|
||||
return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Single transaction to attach storage which is not present in the DB
|
||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||
|
||||
var urls sql.NullString
|
||||
var storageId sql.NullString
|
||||
err = dbi.harmonyDB.QueryRow(ctx,
|
||||
"Select storage_id, urls FROM storagelocation WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||
return false, xerrors.Errorf("storage attach select fails: ", err)
|
||||
}
|
||||
|
||||
// Storage ID entry exists
|
||||
// TODO: Consider using insert into .. on conflict do update set ... below
|
||||
if storageId.Valid {
|
||||
var currUrls []string
|
||||
if urls.Valid {
|
||||
currUrls = strings.Split(urls.String, ",")
|
||||
}
|
||||
currUrls = union(currUrls, si.URLs)
|
||||
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"UPDATE StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
|
||||
strings.Join(currUrls, ","),
|
||||
si.Weight,
|
||||
si.MaxStorage,
|
||||
si.CanSeal,
|
||||
si.CanStore,
|
||||
strings.Join(si.Groups, ","),
|
||||
strings.Join(si.AllowTo, ","),
|
||||
strings.Join(si.AllowTypes, ","),
|
||||
strings.Join(si.DenyTypes, ","),
|
||||
si.ID)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("storage attach UPDATE fails: ", err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Insert storage id
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"INSERT INTO StorageLocation "+
|
||||
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
|
||||
si.ID,
|
||||
strings.Join(si.URLs, ","),
|
||||
si.Weight,
|
||||
si.MaxStorage,
|
||||
si.CanSeal,
|
||||
si.CanStore,
|
||||
strings.Join(si.Groups, ","),
|
||||
strings.Join(si.AllowTo, ","),
|
||||
strings.Join(si.AllowTypes, ","),
|
||||
strings.Join(si.DenyTypes, ","),
|
||||
st.Capacity,
|
||||
st.Available,
|
||||
st.FSAvailable,
|
||||
st.Reserved,
|
||||
st.Used,
|
||||
time.Now())
|
||||
if err != nil {
|
||||
log.Errorf("StorageAttach insert fails: ", err)
|
||||
return false, xerrors.Errorf("StorageAttach insert fails: ", err)
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
|
||||
|
||||
// If url not in path urls, error out
|
||||
// if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path
|
||||
|
||||
var qUrls string
|
||||
err := dbi.harmonyDB.QueryRow(ctx, "SELECT COALESCE(urls,'') FROM storagelocation WHERE storage_id=$1", string(id)).Scan(&qUrls)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
urls := splitString(qUrls)
|
||||
|
||||
var modUrls []string
|
||||
for _, u := range urls {
|
||||
if u != url {
|
||||
modUrls = append(modUrls, u)
|
||||
}
|
||||
}
|
||||
|
||||
// noop if url doesn't exist in urls
|
||||
if len(modUrls) == len(urls) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(modUrls) > 0 {
|
||||
newUrls := strings.Join(modUrls, ",")
|
||||
_, err := dbi.harmonyDB.Exec(ctx, "UPDATE storagelocation set urls=$1 WHERE storage_id=$2", newUrls, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
|
||||
} else {
|
||||
// Single transaction to drop storage path and sector decls which have this as a storage path
|
||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||
// Drop storage path completely
|
||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storagelocation WHERE storage_id=$1", id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Drop all sectors entries which use this storage path
|
||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sectorlocation WHERE storage_id=$1", id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Warnw("Dropping sector storage", "path", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
|
||||
|
||||
var canSeal, canStore bool
|
||||
err := dbi.harmonyDB.QueryRow(ctx,
|
||||
"SELECT can_seal, can_store FROM storagelocation WHERE storage_id=$1", id).Scan(&canSeal, &canStore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("Querying for storage id %d fails with err %w", id, err)
|
||||
}
|
||||
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"UPDATE storagelocation set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6",
|
||||
report.Stat.Capacity,
|
||||
report.Stat.Available,
|
||||
report.Stat.FSAvailable,
|
||||
report.Stat.Reserved,
|
||||
report.Stat.Used,
|
||||
time.Now())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("updating storage health in DB fails with err: ", err)
|
||||
}
|
||||
|
||||
if report.Stat.Capacity > 0 {
|
||||
ctx, _ = tag.New(ctx,
|
||||
tag.Upsert(metrics.StorageID, string(id)),
|
||||
tag.Upsert(metrics.PathStorage, fmt.Sprint(canStore)),
|
||||
tag.Upsert(metrics.PathSeal, fmt.Sprint(canSeal)),
|
||||
)
|
||||
|
||||
stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity)))
|
||||
stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity)))
|
||||
stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity)))
|
||||
|
||||
stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity))
|
||||
stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable))
|
||||
stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available))
|
||||
stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved))
|
||||
|
||||
if report.Stat.Max > 0 {
|
||||
stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max)))
|
||||
stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used))
|
||||
stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
|
||||
|
||||
ftValid := false
|
||||
for _, fileType := range storiface.PathTypes {
|
||||
if fileType&ft == 0 {
|
||||
ftValid = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !ftValid {
|
||||
return xerrors.Errorf("Invalid filetype")
|
||||
}
|
||||
|
||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||
var currPrimary sql.NullBool
|
||||
err = dbi.harmonyDB.QueryRow(ctx,
|
||||
"SELECT is_primary FROM SectorLocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
|
||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||
return false, xerrors.Errorf("DB SELECT fails: ", err)
|
||||
}
|
||||
|
||||
// If storage id already exists for this sector, update primary if need be
|
||||
if currPrimary.Valid {
|
||||
if !currPrimary.Bool && primary {
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"UPDATE SectorLocation set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
s.Miner, s.Number, ft, storageID)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("DB update fails: ", err)
|
||||
}
|
||||
} else {
|
||||
log.Warnf("sector %v redeclared in %s", s, storageID)
|
||||
}
|
||||
} else {
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"INSERT INTO SectorLocation "+
|
||||
"values($1, $2, $3, $4, $5)",
|
||||
s.Miner, s.Number, ft, storageID, primary)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("DB insert fails: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
|
||||
|
||||
ftValid := false
|
||||
for _, fileType := range storiface.PathTypes {
|
||||
if fileType&ft == 0 {
|
||||
ftValid = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !ftValid {
|
||||
return xerrors.Errorf("Invalid filetype")
|
||||
}
|
||||
|
||||
_, err := dbi.harmonyDB.Exec(ctx,
|
||||
"DELETE FROM sectorlocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
int(s.Miner), int(s.Number), int(ft), string(storageID))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("StorageDropSector DELETE query fails: ", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
|
||||
|
||||
var result []storiface.SectorStorageInfo
|
||||
|
||||
allowList := make(map[string]struct{})
|
||||
storageWithSector := map[string]bool{}
|
||||
|
||||
type dbRes struct {
|
||||
StorageId string
|
||||
Count uint64
|
||||
IsPrimary bool
|
||||
Urls string
|
||||
Weight uint64
|
||||
CanSeal bool
|
||||
CanStore bool
|
||||
Groups string
|
||||
AllowTo string
|
||||
AllowTypes string
|
||||
DenyTypes string
|
||||
}
|
||||
|
||||
var rows []dbRes
|
||||
|
||||
fts := ft.AllSet()
|
||||
// Find all storage info which already hold this sector + filetype
|
||||
err := dbi.harmonyDB.Select(ctx, &rows,
|
||||
` SELECT DISTINCT ON (stor.storage_id)
|
||||
stor.storage_id,
|
||||
COUNT(*) OVER(PARTITION BY stor.storage_id) as count,
|
||||
BOOL_OR(is_primary) OVER(PARTITION BY stor.storage_id) AS is_primary,
|
||||
urls,
|
||||
weight,
|
||||
can_seal,
|
||||
can_store,
|
||||
groups,
|
||||
allow_to,
|
||||
allow_types,
|
||||
deny_types
|
||||
FROM sectorlocation sec
|
||||
JOIN storagelocation stor ON sec.storage_id = stor.storage_id
|
||||
WHERE sec.miner_id = $1
|
||||
AND sec.sector_num = $2
|
||||
AND sec.sector_filetype = ANY($3)
|
||||
ORDER BY stor.storage_id`,
|
||||
s.Miner, s.Number, fts)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("Finding sector storage from DB fails with err: ", err)
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
|
||||
// Parse all urls
|
||||
var urls, burls []string
|
||||
for _, u := range splitString(row.Urls) {
|
||||
rl, err := url.Parse(u)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to parse url: %w", err)
|
||||
}
|
||||
rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
|
||||
urls = append(urls, rl.String())
|
||||
burls = append(burls, u)
|
||||
}
|
||||
|
||||
result = append(result, storiface.SectorStorageInfo{
|
||||
ID: storiface.ID(row.StorageId),
|
||||
URLs: urls,
|
||||
BaseURLs: burls,
|
||||
Weight: row.Weight * row.Count,
|
||||
CanSeal: row.CanSeal,
|
||||
CanStore: row.CanStore,
|
||||
Primary: row.IsPrimary,
|
||||
AllowTypes: splitString(row.AllowTypes),
|
||||
DenyTypes: splitString(row.DenyTypes),
|
||||
})
|
||||
|
||||
storageWithSector[row.StorageId] = true
|
||||
|
||||
allowTo := splitString(row.AllowTo)
|
||||
if allowList != nil && len(allowTo) > 0 {
|
||||
for _, group := range allowTo {
|
||||
allowList[group] = struct{}{}
|
||||
}
|
||||
} else {
|
||||
allowList = nil // allow to any
|
||||
}
|
||||
}
|
||||
|
||||
// Find all storage paths which can hold this sector if allowFetch is true
|
||||
if allowFetch {
|
||||
spaceReq, err := ft.SealSpaceUse(ssize)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("estimating required space: %w", err)
|
||||
}
|
||||
|
||||
// Conditions to satisfy when choosing a sector
|
||||
// 1. CanSeal is true
|
||||
// 2. Available >= spaceReq
|
||||
// 3. curr_time - last_heartbeat < SkippedHeartbeatThresh
|
||||
// 4. heartbeat_err is NULL
|
||||
// 5. not one of the earlier picked storage ids
|
||||
// 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes)
|
||||
// 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector
|
||||
|
||||
var rows []struct {
|
||||
StorageId string
|
||||
Urls string
|
||||
Weight uint64
|
||||
CanSeal bool
|
||||
CanStore bool
|
||||
Groups string
|
||||
AllowTypes string
|
||||
DenyTypes string
|
||||
}
|
||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
||||
`SELECT storage_id,
|
||||
urls,
|
||||
weight,
|
||||
can_seal,
|
||||
can_store,
|
||||
groups,
|
||||
allow_types,
|
||||
deny_types
|
||||
FROM storagelocation
|
||||
WHERE can_seal=true
|
||||
and available >= $1
|
||||
and NOW()-last_heartbeat < $2
|
||||
and heartbeat_err is null`,
|
||||
spaceReq, SkippedHeartbeatThresh)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: ", err)
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
if ok := storageWithSector[row.StorageId]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if !ft.AnyAllowed(splitString(row.AllowTypes), splitString(row.DenyTypes)) {
|
||||
log.Debugf("not selecting on %s, not allowed by file type filters", row.StorageId)
|
||||
continue
|
||||
}
|
||||
|
||||
if allowList != nil {
|
||||
groups := splitString(row.Groups)
|
||||
allow := false
|
||||
for _, group := range groups {
|
||||
if _, found := allowList[group]; found {
|
||||
log.Debugf("path %s in allowed group %s", row.StorageId, group)
|
||||
allow = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !allow {
|
||||
log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", row.StorageId, allowList, groups)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
var urls, burls []string
|
||||
for _, u := range splitString(row.Urls) {
|
||||
rl, err := url.Parse(u)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to parse url: %w", err)
|
||||
}
|
||||
rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
|
||||
urls = append(urls, rl.String())
|
||||
burls = append(burls, u)
|
||||
}
|
||||
|
||||
result = append(result, storiface.SectorStorageInfo{
|
||||
ID: storiface.ID(row.StorageId),
|
||||
URLs: urls,
|
||||
BaseURLs: burls,
|
||||
Weight: row.Weight * 0,
|
||||
CanSeal: row.CanSeal,
|
||||
CanStore: row.CanStore,
|
||||
Primary: false,
|
||||
AllowTypes: splitString(row.AllowTypes),
|
||||
DenyTypes: splitString(row.DenyTypes),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
|
||||
|
||||
var qResults []struct {
|
||||
Urls string
|
||||
Weight uint64
|
||||
MaxStorage uint64
|
||||
CanSeal bool
|
||||
CanStore bool
|
||||
Groups string
|
||||
AllowTo string
|
||||
AllowTypes string
|
||||
DenyTypes string
|
||||
}
|
||||
|
||||
err := dbi.harmonyDB.Select(ctx, &qResults,
|
||||
"SELECT urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+
|
||||
"FROM StorageLocation WHERE storage_id=$1", string(id))
|
||||
if err != nil {
|
||||
return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: ", err)
|
||||
}
|
||||
|
||||
var sinfo storiface.StorageInfo
|
||||
sinfo.ID = id
|
||||
sinfo.URLs = splitString(qResults[0].Urls)
|
||||
sinfo.Weight = qResults[0].Weight
|
||||
sinfo.MaxStorage = qResults[0].MaxStorage
|
||||
sinfo.CanSeal = qResults[0].CanSeal
|
||||
sinfo.CanStore = qResults[0].CanStore
|
||||
sinfo.Groups = splitString(qResults[0].Groups)
|
||||
sinfo.AllowTo = splitString(qResults[0].AllowTo)
|
||||
sinfo.AllowTypes = splitString(qResults[0].AllowTypes)
|
||||
sinfo.DenyTypes = splitString(qResults[0].DenyTypes)
|
||||
|
||||
return sinfo, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
|
||||
var err error
|
||||
var spaceReq uint64
|
||||
switch pathType {
|
||||
case storiface.PathSealing:
|
||||
spaceReq, err = allocate.SealSpaceUse(ssize)
|
||||
case storiface.PathStorage:
|
||||
spaceReq, err = allocate.StoreSpaceUse(ssize)
|
||||
default:
|
||||
return nil, xerrors.Errorf("Unexpected path type")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("estimating required space: %w", err)
|
||||
}
|
||||
|
||||
var checkString string
|
||||
if pathType == storiface.PathSealing {
|
||||
checkString = "can_seal = TRUE"
|
||||
} else if pathType == storiface.PathStorage {
|
||||
checkString = "can_store = TRUE"
|
||||
}
|
||||
|
||||
var rows []struct {
|
||||
StorageId string
|
||||
Urls string
|
||||
Weight uint64
|
||||
MaxStorage uint64
|
||||
CanSeal bool
|
||||
CanStore bool
|
||||
Groups string
|
||||
AllowTo string
|
||||
AllowTypes string
|
||||
DenyTypes string
|
||||
}
|
||||
|
||||
sql := fmt.Sprintf(`SELECT storage_id,
|
||||
urls,
|
||||
weight,
|
||||
max_storage,
|
||||
can_seal,
|
||||
can_store,
|
||||
groups,
|
||||
allow_to,
|
||||
allow_types,
|
||||
deny_types
|
||||
FROM storagelocation
|
||||
WHERE %s and available >= $1
|
||||
and NOW()-last_heartbeat < $2
|
||||
and heartbeat_err is null
|
||||
order by (available::numeric * weight) desc`, checkString)
|
||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
||||
sql, spaceReq, SkippedHeartbeatThresh)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("Querying for best storage sectors fails with sql: %s and err %w: ", sql, err)
|
||||
}
|
||||
|
||||
var result []storiface.StorageInfo
|
||||
for _, row := range rows {
|
||||
result = append(result, storiface.StorageInfo{
|
||||
ID: storiface.ID(row.StorageId),
|
||||
URLs: splitString(row.Urls),
|
||||
Weight: row.Weight,
|
||||
MaxStorage: row.MaxStorage,
|
||||
CanSeal: row.CanSeal,
|
||||
CanStore: row.CanStore,
|
||||
Groups: splitString(row.Groups),
|
||||
AllowTo: splitString(row.AllowTo),
|
||||
AllowTypes: splitString(row.AllowTypes),
|
||||
DenyTypes: splitString(row.DenyTypes),
|
||||
})
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
|
||||
// TODO: implementation
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
|
||||
// TODO: implementation
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
|
||||
// TODO: implementation
|
||||
return storiface.SectorLocks{}, nil
|
||||
}
|
||||
|
||||
var _ SectorIndex = &DBIndex{}
|
@ -2,13 +2,11 @@ package paths
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
gopath "path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -20,7 +18,6 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
|
||||
"github.com/filecoin-project/lotus/journal/alerting"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
@ -64,87 +61,6 @@ type storageEntry struct {
|
||||
heartbeatErr error
|
||||
}
|
||||
|
||||
type IndexProxy struct {
|
||||
memIndex *Index
|
||||
dbIndex *DBIndex
|
||||
enableSectorIndexDB bool
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageAttach(ctx, info, stat)
|
||||
}
|
||||
return ip.memIndex.StorageAttach(ctx, info, stat)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageDetach(ctx, id, url)
|
||||
}
|
||||
return ip.memIndex.StorageDetach(ctx, id, url)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageInfo(ctx, id)
|
||||
}
|
||||
return ip.memIndex.StorageInfo(ctx, id)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageReportHealth(ctx, id, report)
|
||||
}
|
||||
return ip.memIndex.StorageReportHealth(ctx, id, report)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
|
||||
}
|
||||
return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft)
|
||||
}
|
||||
return ip.memIndex.StorageDropSector(ctx, storageID, s, ft)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
|
||||
}
|
||||
return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
|
||||
}
|
||||
return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
|
||||
return ip.memIndex.StorageLock(ctx, sector, read, write)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
|
||||
return ip.memIndex.StorageTryLock(ctx, sector, read, write)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
|
||||
return ip.memIndex.StorageGetLocks(ctx)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageList(ctx)
|
||||
}
|
||||
return ip.memIndex.StorageList(ctx)
|
||||
}
|
||||
|
||||
type Index struct {
|
||||
*indexLocks
|
||||
lk sync.RWMutex
|
||||
@ -157,36 +73,6 @@ type Index struct {
|
||||
stores map[storiface.ID]*storageEntry
|
||||
}
|
||||
|
||||
type DBIndex struct {
|
||||
alerting *alerting.Alerting
|
||||
pathAlerts map[storiface.ID]alerting.AlertType
|
||||
|
||||
harmonyDB *harmonydb.DB
|
||||
}
|
||||
|
||||
func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
|
||||
return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
|
||||
return NewIndexProxy(al, db, enableSectorIndexDB)
|
||||
}
|
||||
}
|
||||
|
||||
func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy {
|
||||
return &IndexProxy{
|
||||
memIndex: NewIndex(al),
|
||||
dbIndex: NewDBIndex(al, db),
|
||||
enableSectorIndexDB: enableSectorIndexDB,
|
||||
}
|
||||
}
|
||||
|
||||
func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
|
||||
return &DBIndex{
|
||||
harmonyDB: db,
|
||||
|
||||
alerting: al,
|
||||
pathAlerts: map[storiface.ID]alerting.AlertType{},
|
||||
}
|
||||
}
|
||||
|
||||
func NewIndex(al *alerting.Alerting) *Index {
|
||||
return &Index{
|
||||
indexLocks: &indexLocks{
|
||||
@ -201,59 +87,6 @@ func NewIndex(al *alerting.Alerting) *Index {
|
||||
}
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
|
||||
|
||||
var sectorEntries []struct {
|
||||
Storage_id string
|
||||
Miner_id sql.NullInt64
|
||||
Sector_num sql.NullInt64
|
||||
Sector_filetype sql.NullInt32
|
||||
Is_primary sql.NullBool
|
||||
}
|
||||
|
||||
err := dbi.harmonyDB.Select(ctx, §orEntries,
|
||||
"select stor.storage_id, miner_id, sector_num, sector_filetype, is_primary from storagelocation stor left join sectorlocation sec on stor.storage_id=sec.storage_id")
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("StorageList DB query fails: ", err)
|
||||
}
|
||||
|
||||
log.Errorf("Sector entries: ", sectorEntries)
|
||||
|
||||
byID := map[storiface.ID]map[abi.SectorID]storiface.SectorFileType{}
|
||||
for _, entry := range sectorEntries {
|
||||
id := storiface.ID(entry.Storage_id)
|
||||
_, ok := byID[id]
|
||||
if !ok {
|
||||
byID[id] = map[abi.SectorID]storiface.SectorFileType{}
|
||||
}
|
||||
|
||||
// skip sector info for storage paths with no sectors
|
||||
if !entry.Miner_id.Valid {
|
||||
continue
|
||||
}
|
||||
|
||||
sectorId := abi.SectorID{
|
||||
Miner: abi.ActorID(entry.Miner_id.Int64),
|
||||
Number: abi.SectorNumber(entry.Sector_num.Int64),
|
||||
}
|
||||
|
||||
byID[id][sectorId] |= storiface.SectorFileType(entry.Sector_filetype.Int32)
|
||||
}
|
||||
|
||||
out := map[storiface.ID][]storiface.Decl{}
|
||||
for id, m := range byID {
|
||||
out[id] = []storiface.Decl{}
|
||||
for sectorID, fileType := range m {
|
||||
out[id] = append(out[id], storiface.Decl{
|
||||
SectorID: sectorID,
|
||||
SectorFileType: fileType,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
|
||||
i.lk.RLock()
|
||||
defer i.lk.RUnlock()
|
||||
@ -283,156 +116,6 @@ func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.D
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func union(a, b []string) []string {
|
||||
m := make(map[string]bool)
|
||||
|
||||
for _, elem := range a {
|
||||
m[elem] = true
|
||||
}
|
||||
|
||||
for _, elem := range b {
|
||||
if _, ok := m[elem]; !ok {
|
||||
a = append(a, elem)
|
||||
}
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
func splitString(str string) []string {
|
||||
if str == "" {
|
||||
return []string{}
|
||||
}
|
||||
return strings.Split(str, ",")
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
|
||||
var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes))
|
||||
|
||||
if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert {
|
||||
dbi.pathAlerts[si.ID] = dbi.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID))
|
||||
}
|
||||
|
||||
var hasConfigIssues bool
|
||||
|
||||
for id, typ := range si.AllowTypes {
|
||||
_, err := storiface.TypeFromString(typ)
|
||||
if err != nil {
|
||||
//No need to hard-fail here, just warn the user
|
||||
//(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
|
||||
hasConfigIssues = true
|
||||
|
||||
if dbi.alerting != nil {
|
||||
dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
|
||||
"message": "bad path type in AllowTypes",
|
||||
"path": string(si.ID),
|
||||
"idx": id,
|
||||
"path_type": typ,
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
allow = append(allow, typ)
|
||||
}
|
||||
for id, typ := range si.DenyTypes {
|
||||
_, err := storiface.TypeFromString(typ)
|
||||
if err != nil {
|
||||
//No need to hard-fail here, just warn the user
|
||||
hasConfigIssues = true
|
||||
|
||||
if dbi.alerting != nil {
|
||||
dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
|
||||
"message": "bad path type in DenyTypes",
|
||||
"path": string(si.ID),
|
||||
"idx": id,
|
||||
"path_type": typ,
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
deny = append(deny, typ)
|
||||
}
|
||||
si.AllowTypes = allow
|
||||
si.DenyTypes = deny
|
||||
|
||||
if dbi.alerting != nil && !hasConfigIssues && dbi.alerting.IsRaised(dbi.pathAlerts[si.ID]) {
|
||||
dbi.alerting.Resolve(dbi.pathAlerts[si.ID], map[string]string{
|
||||
"message": "path config is now correct",
|
||||
})
|
||||
}
|
||||
|
||||
for _, u := range si.URLs {
|
||||
if _, err := url.Parse(u); err != nil {
|
||||
return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: make below part of a single transaction
|
||||
var urls sql.NullString
|
||||
var storageId sql.NullString
|
||||
err := dbi.harmonyDB.QueryRow(ctx,
|
||||
"Select storage_id, urls from StorageLocation where storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||
return xerrors.Errorf("storage attach select fails: ", err)
|
||||
}
|
||||
|
||||
// Storage ID entry exists
|
||||
if storageId.Valid {
|
||||
var currUrls []string
|
||||
if urls.Valid {
|
||||
currUrls = strings.Split(urls.String, ",")
|
||||
}
|
||||
currUrls = union(currUrls, si.URLs)
|
||||
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"update StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 where storage_id=$10",
|
||||
strings.Join(currUrls, ","),
|
||||
si.Weight,
|
||||
si.MaxStorage,
|
||||
si.CanSeal,
|
||||
si.CanStore,
|
||||
strings.Join(si.Groups, ","),
|
||||
strings.Join(si.AllowTo, ","),
|
||||
strings.Join(si.AllowTypes, ","),
|
||||
strings.Join(si.DenyTypes, ","),
|
||||
si.ID)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("storage attach update fails: ", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Insert storage id
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"insert into StorageLocation "+
|
||||
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
|
||||
si.ID,
|
||||
strings.Join(si.URLs, ","),
|
||||
si.Weight,
|
||||
si.MaxStorage,
|
||||
si.CanSeal,
|
||||
si.CanStore,
|
||||
strings.Join(si.Groups, ","),
|
||||
strings.Join(si.AllowTo, ","),
|
||||
strings.Join(si.AllowTypes, ","),
|
||||
strings.Join(si.DenyTypes, ","),
|
||||
st.Capacity,
|
||||
st.Available,
|
||||
st.FSAvailable,
|
||||
st.Reserved,
|
||||
st.Used,
|
||||
time.Now())
|
||||
if err != nil {
|
||||
log.Errorf("StorageAttach insert fails: ", err)
|
||||
return xerrors.Errorf("StorageAttach insert fails: ", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
|
||||
var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes))
|
||||
|
||||
@ -536,62 +219,6 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
|
||||
|
||||
// If url not in path urls, error out
|
||||
// if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path
|
||||
|
||||
var qUrls sql.NullString
|
||||
err := dbi.harmonyDB.QueryRow(ctx, "select urls from storagelocation where storage_id=$1", string(id)).Scan(&qUrls)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !qUrls.Valid {
|
||||
return xerrors.Errorf("No urls available for storage id: ", id)
|
||||
}
|
||||
urls := splitString(qUrls.String)
|
||||
|
||||
var modUrls []string
|
||||
for _, u := range urls {
|
||||
if u != url {
|
||||
modUrls = append(modUrls, u)
|
||||
}
|
||||
}
|
||||
|
||||
// noop if url doesn't exist in urls
|
||||
if len(modUrls) == len(urls) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(modUrls) > 0 {
|
||||
newUrls := strings.Join(modUrls, ",")
|
||||
_, err := dbi.harmonyDB.Exec(ctx, "update storagelocation set urls=$1 where storage_id=$2", newUrls, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
|
||||
} else {
|
||||
// Todo: single transaction
|
||||
|
||||
// Drop storage path completely
|
||||
_, err := dbi.harmonyDB.Exec(ctx, "delete from storagelocation where storage_id=$1", id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Drop all sectors entries which use this storage path
|
||||
_, err = dbi.harmonyDB.Exec(ctx, "delete from sectorlocation where storage_id=$1", id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Warnw("Dropping sector storage", "path", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
|
||||
i.lk.Lock()
|
||||
defer i.lk.Unlock()
|
||||
@ -680,53 +307,6 @@ func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
|
||||
|
||||
var canSeal, canStore bool
|
||||
err := dbi.harmonyDB.QueryRow(ctx,
|
||||
"select can_seal, can_store from storagelocation where storage_id=$1", id).Scan(&canSeal, &canStore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("Querying for storage id %d fails with err %w", id, err)
|
||||
}
|
||||
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"update storagelocation set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6",
|
||||
report.Stat.Capacity,
|
||||
report.Stat.Available,
|
||||
report.Stat.FSAvailable,
|
||||
report.Stat.Reserved,
|
||||
report.Stat.Used,
|
||||
time.Now())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("updating storage health in DB fails with err: ", err)
|
||||
}
|
||||
|
||||
if report.Stat.Capacity > 0 {
|
||||
ctx, _ = tag.New(ctx,
|
||||
tag.Upsert(metrics.StorageID, string(id)),
|
||||
tag.Upsert(metrics.PathStorage, fmt.Sprint(canStore)),
|
||||
tag.Upsert(metrics.PathSeal, fmt.Sprint(canSeal)),
|
||||
)
|
||||
|
||||
stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity)))
|
||||
stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity)))
|
||||
stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity)))
|
||||
|
||||
stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity))
|
||||
stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable))
|
||||
stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available))
|
||||
stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved))
|
||||
|
||||
if report.Stat.Max > 0 {
|
||||
stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max)))
|
||||
stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used))
|
||||
stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
|
||||
i.lk.Lock()
|
||||
defer i.lk.Unlock()
|
||||
@ -770,54 +350,6 @@ func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
|
||||
|
||||
ftValid := false
|
||||
for _, fileType := range storiface.PathTypes {
|
||||
if fileType&ft == 0 {
|
||||
ftValid = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !ftValid {
|
||||
return xerrors.Errorf("Invalid filetype")
|
||||
}
|
||||
|
||||
// TODO: make the belowpart of a single transaction
|
||||
var currPrimary sql.NullBool
|
||||
err := dbi.harmonyDB.QueryRow(ctx,
|
||||
"select is_primary from SectorLocation where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
|
||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||
return xerrors.Errorf("DB select fails: ", err)
|
||||
}
|
||||
|
||||
// If storage id already exists for this sector, update primary if need be
|
||||
if currPrimary.Valid {
|
||||
if !currPrimary.Bool && primary {
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"update SectorLocation set is_primary = TRUE where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
s.Miner, s.Number, ft, storageID)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("DB update fails: ", err)
|
||||
}
|
||||
} else {
|
||||
log.Warnf("sector %v redeclared in %s", s, storageID)
|
||||
}
|
||||
} else {
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"insert into SectorLocation "+
|
||||
"values($1, $2, $3, $4, $5)",
|
||||
s.Miner, s.Number, ft, storageID, primary)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("DB insert fails: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (i *Index) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
|
||||
i.lk.Lock()
|
||||
defer i.lk.Unlock()
|
||||
@ -850,29 +382,6 @@ loop:
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
|
||||
|
||||
ftValid := false
|
||||
for _, fileType := range storiface.PathTypes {
|
||||
if fileType&ft == 0 {
|
||||
ftValid = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !ftValid {
|
||||
return xerrors.Errorf("Invalid filetype")
|
||||
}
|
||||
|
||||
_, err := dbi.harmonyDB.Exec(ctx,
|
||||
"delete from sectorlocation where miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
int(s.Miner), int(s.Number), int(ft), string(storageID))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("StorageDropSector delete query fails: ", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
|
||||
i.lk.Lock()
|
||||
defer i.lk.Unlock()
|
||||
@ -907,193 +416,6 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
|
||||
|
||||
var result []storiface.SectorStorageInfo
|
||||
|
||||
allowList := make(map[string]struct{})
|
||||
storageWithSector := map[string]bool{}
|
||||
|
||||
type dbRes struct {
|
||||
Storage_id string
|
||||
Count uint64
|
||||
Is_primary bool
|
||||
Urls string
|
||||
Weight uint64
|
||||
Can_seal bool
|
||||
Can_store bool
|
||||
Groups string
|
||||
Allow_to string
|
||||
Allow_types string
|
||||
Deny_types string
|
||||
}
|
||||
|
||||
var rows []dbRes
|
||||
|
||||
fts := ft.AllSet()
|
||||
// Find all storage info which already hold this sector + filetype
|
||||
err := dbi.harmonyDB.Select(ctx, &rows,
|
||||
` SELECT DISTINCT ON (stor.storage_id)
|
||||
stor.storage_id,
|
||||
COUNT(*) OVER(PARTITION BY stor.storage_id) as count,
|
||||
BOOL_OR(is_primary) OVER(PARTITION BY stor.storage_id) AS is_primary,
|
||||
urls,
|
||||
weight,
|
||||
can_seal,
|
||||
can_store,
|
||||
groups,
|
||||
allow_to,
|
||||
allow_types,
|
||||
deny_types
|
||||
FROM sectorlocation sec
|
||||
JOIN storagelocation stor ON sec.storage_id = stor.storage_id
|
||||
WHERE sec.miner_id = $1
|
||||
AND sec.sector_num = $2
|
||||
AND sec.sector_filetype = ANY($3)
|
||||
ORDER BY stor.storage_id`,
|
||||
s.Miner, s.Number, fts)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("Finding sector storage from DB fails with err: ", err)
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
|
||||
// Parse all urls
|
||||
var urls, burls []string
|
||||
for _, u := range splitString(row.Urls) {
|
||||
rl, err := url.Parse(u)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to parse url: %w", err)
|
||||
}
|
||||
rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
|
||||
urls = append(urls, rl.String())
|
||||
burls = append(burls, u)
|
||||
}
|
||||
|
||||
result = append(result, storiface.SectorStorageInfo{
|
||||
ID: storiface.ID(row.Storage_id),
|
||||
URLs: urls,
|
||||
BaseURLs: burls,
|
||||
Weight: row.Weight * row.Count,
|
||||
CanSeal: row.Can_seal,
|
||||
CanStore: row.Can_store,
|
||||
Primary: row.Is_primary,
|
||||
AllowTypes: splitString(row.Allow_types),
|
||||
DenyTypes: splitString(row.Deny_types),
|
||||
})
|
||||
|
||||
storageWithSector[row.Storage_id] = true
|
||||
|
||||
allowTo := splitString(row.Allow_to)
|
||||
if allowList != nil && len(allowTo) > 0 {
|
||||
for _, group := range allowTo {
|
||||
allowList[group] = struct{}{}
|
||||
}
|
||||
} else {
|
||||
allowList = nil // allow to any
|
||||
}
|
||||
}
|
||||
|
||||
// Find all storage paths which can hold this sector if allowFetch is true
|
||||
if allowFetch {
|
||||
spaceReq, err := ft.SealSpaceUse(ssize)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("estimating required space: %w", err)
|
||||
}
|
||||
|
||||
// Conditions to satisfy when choosing a sector
|
||||
// 1. CanSeal is true
|
||||
// 2. Available >= spaceReq
|
||||
// 3. curr_time - last_heartbeat < SkippedHeartbeatThresh
|
||||
// 4. heartbeat_err is NULL
|
||||
// 5. not one of the earlier picked storage ids
|
||||
// 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes)
|
||||
// 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector
|
||||
|
||||
var rows []struct {
|
||||
Storage_id string
|
||||
Urls string
|
||||
Weight uint64
|
||||
Can_seal bool
|
||||
Can_store bool
|
||||
Groups string
|
||||
Allow_types string
|
||||
Deny_types string
|
||||
}
|
||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
||||
`select storage_id,
|
||||
urls,
|
||||
weight,
|
||||
can_seal,
|
||||
can_store,
|
||||
groups,
|
||||
allow_types,
|
||||
deny_types
|
||||
from storagelocation
|
||||
where can_seal=true
|
||||
and available >= $1
|
||||
and NOW()-last_heartbeat < $2
|
||||
and heartbeat_err is null`,
|
||||
spaceReq, SkippedHeartbeatThresh)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: ", err)
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
if ok := storageWithSector[row.Storage_id]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if !ft.AnyAllowed(splitString(row.Allow_types), splitString(row.Deny_types)) {
|
||||
log.Debugf("not selecting on %s, not allowed by file type filters", row.Storage_id)
|
||||
continue
|
||||
}
|
||||
|
||||
if allowList != nil {
|
||||
groups := splitString(row.Groups)
|
||||
allow := false
|
||||
for _, group := range groups {
|
||||
if _, found := allowList[group]; found {
|
||||
log.Debugf("path %s in allowed group %s", row.Storage_id, group)
|
||||
allow = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !allow {
|
||||
log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", row.Storage_id, allowList, groups)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
var urls, burls []string
|
||||
for _, u := range splitString(row.Urls) {
|
||||
rl, err := url.Parse(u)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to parse url: %w", err)
|
||||
}
|
||||
rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
|
||||
urls = append(urls, rl.String())
|
||||
burls = append(burls, u)
|
||||
}
|
||||
|
||||
result = append(result, storiface.SectorStorageInfo{
|
||||
ID: storiface.ID(row.Storage_id),
|
||||
URLs: urls,
|
||||
BaseURLs: burls,
|
||||
Weight: row.Weight,
|
||||
CanSeal: row.Can_seal,
|
||||
CanStore: row.Can_store,
|
||||
Primary: false,
|
||||
AllowTypes: splitString(row.Allow_types),
|
||||
DenyTypes: splitString(row.Deny_types),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
|
||||
i.lk.RLock()
|
||||
defer i.lk.RUnlock()
|
||||
@ -1242,42 +564,6 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
|
||||
|
||||
var qResults []struct {
|
||||
Urls string
|
||||
Weight uint64
|
||||
Max_storage uint64
|
||||
Can_seal bool
|
||||
Can_store bool
|
||||
Groups string
|
||||
Allow_to string
|
||||
Allow_types string
|
||||
Deny_types string
|
||||
}
|
||||
|
||||
err := dbi.harmonyDB.Select(ctx, &qResults,
|
||||
"select urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+
|
||||
"from StorageLocation where storage_id=$1", string(id))
|
||||
if err != nil {
|
||||
return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: ", err)
|
||||
}
|
||||
|
||||
var sinfo storiface.StorageInfo
|
||||
sinfo.ID = id
|
||||
sinfo.URLs = splitString(qResults[0].Urls)
|
||||
sinfo.Weight = qResults[0].Weight
|
||||
sinfo.MaxStorage = qResults[0].Max_storage
|
||||
sinfo.CanSeal = qResults[0].Can_seal
|
||||
sinfo.CanStore = qResults[0].Can_store
|
||||
sinfo.Groups = splitString(qResults[0].Groups)
|
||||
sinfo.AllowTo = splitString(qResults[0].Allow_to)
|
||||
sinfo.AllowTypes = splitString(qResults[0].Allow_types)
|
||||
sinfo.DenyTypes = splitString(qResults[0].Deny_types)
|
||||
|
||||
return sinfo, nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
|
||||
i.lk.RLock()
|
||||
defer i.lk.RUnlock()
|
||||
@ -1290,81 +576,6 @@ func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.Sto
|
||||
return *si.info, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
|
||||
var err error
|
||||
var spaceReq uint64
|
||||
switch pathType {
|
||||
case storiface.PathSealing:
|
||||
spaceReq, err = allocate.SealSpaceUse(ssize)
|
||||
case storiface.PathStorage:
|
||||
spaceReq, err = allocate.StoreSpaceUse(ssize)
|
||||
default:
|
||||
return nil, xerrors.Errorf("Unexpected path type")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("estimating required space: %w", err)
|
||||
}
|
||||
|
||||
var checkString string
|
||||
if pathType == storiface.PathSealing {
|
||||
checkString = "can_seal = TRUE"
|
||||
} else if pathType == storiface.PathStorage {
|
||||
checkString = "can_store = TRUE"
|
||||
}
|
||||
|
||||
var rows []struct {
|
||||
Storage_id string
|
||||
Urls string
|
||||
Weight uint64
|
||||
Max_storage uint64
|
||||
Can_seal bool
|
||||
Can_store bool
|
||||
Groups string
|
||||
Allow_to string
|
||||
Allow_types string
|
||||
Deny_types string
|
||||
}
|
||||
|
||||
sql := fmt.Sprintf(`select storage_id,
|
||||
urls,
|
||||
weight,
|
||||
max_storage,
|
||||
can_seal,
|
||||
can_store,
|
||||
groups,
|
||||
allow_to,
|
||||
allow_types,
|
||||
deny_types
|
||||
from storagelocation
|
||||
where %s and available >= $1
|
||||
and NOW()-last_heartbeat < $2
|
||||
and heartbeat_err is null
|
||||
order by (available::numeric * weight) desc`, checkString)
|
||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
||||
sql, spaceReq, SkippedHeartbeatThresh)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("Querying for best storage sectors fails with sql: %s and err %w: ", sql, err)
|
||||
}
|
||||
|
||||
var result []storiface.StorageInfo
|
||||
for _, row := range rows {
|
||||
result = append(result, storiface.StorageInfo{
|
||||
ID: storiface.ID(row.Storage_id),
|
||||
URLs: splitString(row.Urls),
|
||||
Weight: row.Weight,
|
||||
MaxStorage: row.Max_storage,
|
||||
CanSeal: row.Can_seal,
|
||||
CanStore: row.Can_store,
|
||||
Groups: splitString(row.Groups),
|
||||
AllowTo: splitString(row.Allow_to),
|
||||
AllowTypes: splitString(row.Allow_types),
|
||||
DenyTypes: splitString(row.Deny_types),
|
||||
})
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
|
||||
i.lk.RLock()
|
||||
defer i.lk.RUnlock()
|
||||
@ -1449,23 +660,4 @@ func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]sto
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
|
||||
// TODO: implementation
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
|
||||
// TODO: implementation
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
|
||||
// TODO: implementation
|
||||
return storiface.SectorLocks{}, nil
|
||||
}
|
||||
|
||||
var _ SectorIndex = &Index{}
|
||||
|
||||
var _ SectorIndex = &DBIndex{}
|
||||
|
||||
var _ SectorIndex = &IndexProxy{}
|
||||
|
109
storage/paths/index_proxy.go
Normal file
109
storage/paths/index_proxy.go
Normal file
@ -0,0 +1,109 @@
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/journal/alerting"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type IndexProxy struct {
|
||||
memIndex *Index
|
||||
dbIndex *DBIndex
|
||||
enableSectorIndexDB bool
|
||||
}
|
||||
|
||||
func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
|
||||
return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
|
||||
return NewIndexProxy(al, db, enableSectorIndexDB)
|
||||
}
|
||||
}
|
||||
|
||||
func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy {
|
||||
return &IndexProxy{
|
||||
memIndex: NewIndex(al),
|
||||
dbIndex: NewDBIndex(al, db),
|
||||
enableSectorIndexDB: enableSectorIndexDB,
|
||||
}
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageAttach(ctx, info, stat)
|
||||
}
|
||||
return ip.memIndex.StorageAttach(ctx, info, stat)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageDetach(ctx, id, url)
|
||||
}
|
||||
return ip.memIndex.StorageDetach(ctx, id, url)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageInfo(ctx, id)
|
||||
}
|
||||
return ip.memIndex.StorageInfo(ctx, id)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageReportHealth(ctx, id, report)
|
||||
}
|
||||
return ip.memIndex.StorageReportHealth(ctx, id, report)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
|
||||
}
|
||||
return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft)
|
||||
}
|
||||
return ip.memIndex.StorageDropSector(ctx, storageID, s, ft)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
|
||||
}
|
||||
return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
|
||||
}
|
||||
return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
|
||||
return ip.memIndex.StorageLock(ctx, sector, read, write)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
|
||||
return ip.memIndex.StorageTryLock(ctx, sector, read, write)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
|
||||
return ip.memIndex.StorageGetLocks(ctx)
|
||||
}
|
||||
|
||||
func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
|
||||
if ip.enableSectorIndexDB {
|
||||
return ip.dbIndex.StorageList(ctx)
|
||||
}
|
||||
return ip.memIndex.StorageList(ctx)
|
||||
}
|
||||
|
||||
var _ SectorIndex = &IndexProxy{}
|
Loading…
Reference in New Issue
Block a user