2023-08-10 22:35:35 +00:00
package paths
import (
"context"
"database/sql"
"fmt"
"net/url"
gopath "path"
"strings"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
type DBIndex struct {
alerting * alerting . Alerting
pathAlerts map [ storiface . ID ] alerting . AlertType
harmonyDB * harmonydb . DB
}
func NewDBIndex ( al * alerting . Alerting , db * harmonydb . DB ) * DBIndex {
return & DBIndex {
harmonyDB : db ,
alerting : al ,
pathAlerts : map [ storiface . ID ] alerting . AlertType { } ,
}
}
func ( dbi * DBIndex ) StorageList ( ctx context . Context ) ( map [ storiface . ID ] [ ] storiface . Decl , error ) {
var sectorEntries [ ] struct {
StorageId string
MinerId sql . NullInt64
SectorNum sql . NullInt64
SectorFiletype sql . NullInt32
IsPrimary sql . NullBool
}
err := dbi . harmonyDB . Select ( ctx , & sectorEntries ,
"SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storagelocation stor LEFT JOIN sectorlocation sec on stor.storage_id=sec.storage_id" )
if err != nil {
2023-08-14 13:24:00 +00:00
return nil , xerrors . Errorf ( "StorageList DB query fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
byID := map [ storiface . ID ] map [ abi . SectorID ] storiface . SectorFileType { }
for _ , entry := range sectorEntries {
id := storiface . ID ( entry . StorageId )
_ , ok := byID [ id ]
if ! ok {
byID [ id ] = map [ abi . SectorID ] storiface . SectorFileType { }
}
// skip sector info for storage paths with no sectors
if ! entry . MinerId . Valid {
continue
}
sectorId := abi . SectorID {
Miner : abi . ActorID ( entry . MinerId . Int64 ) ,
Number : abi . SectorNumber ( entry . SectorNum . Int64 ) ,
}
byID [ id ] [ sectorId ] |= storiface . SectorFileType ( entry . SectorFiletype . Int32 )
}
out := map [ storiface . ID ] [ ] storiface . Decl { }
for id , m := range byID {
out [ id ] = [ ] storiface . Decl { }
for sectorID , fileType := range m {
out [ id ] = append ( out [ id ] , storiface . Decl {
SectorID : sectorID ,
SectorFileType : fileType ,
} )
}
}
return out , nil
}
func union ( a , b [ ] string ) [ ] string {
m := make ( map [ string ] bool )
for _ , elem := range a {
m [ elem ] = true
}
for _ , elem := range b {
if _ , ok := m [ elem ] ; ! ok {
a = append ( a , elem )
}
}
return a
}
func splitString ( str string ) [ ] string {
if str == "" {
return [ ] string { }
}
return strings . Split ( str , "," )
}
func ( dbi * DBIndex ) StorageAttach ( ctx context . Context , si storiface . StorageInfo , st fsutil . FsStat ) error {
var allow , deny = make ( [ ] string , 0 , len ( si . AllowTypes ) ) , make ( [ ] string , 0 , len ( si . DenyTypes ) )
if _ , hasAlert := dbi . pathAlerts [ si . ID ] ; dbi . alerting != nil && ! hasAlert {
dbi . pathAlerts [ si . ID ] = dbi . alerting . AddAlertType ( "sector-index" , "pathconf-" + string ( si . ID ) )
}
var hasConfigIssues bool
for id , typ := range si . AllowTypes {
_ , err := storiface . TypeFromString ( typ )
if err != nil {
//No need to hard-fail here, just warn the user
//(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
hasConfigIssues = true
if dbi . alerting != nil {
dbi . alerting . Raise ( dbi . pathAlerts [ si . ID ] , map [ string ] interface { } {
"message" : "bad path type in AllowTypes" ,
"path" : string ( si . ID ) ,
"idx" : id ,
"path_type" : typ ,
"error" : err . Error ( ) ,
} )
}
continue
}
allow = append ( allow , typ )
}
for id , typ := range si . DenyTypes {
_ , err := storiface . TypeFromString ( typ )
if err != nil {
//No need to hard-fail here, just warn the user
hasConfigIssues = true
if dbi . alerting != nil {
dbi . alerting . Raise ( dbi . pathAlerts [ si . ID ] , map [ string ] interface { } {
"message" : "bad path type in DenyTypes" ,
"path" : string ( si . ID ) ,
"idx" : id ,
"path_type" : typ ,
"error" : err . Error ( ) ,
} )
}
continue
}
deny = append ( deny , typ )
}
si . AllowTypes = allow
si . DenyTypes = deny
if dbi . alerting != nil && ! hasConfigIssues && dbi . alerting . IsRaised ( dbi . pathAlerts [ si . ID ] ) {
dbi . alerting . Resolve ( dbi . pathAlerts [ si . ID ] , map [ string ] string {
"message" : "path config is now correct" ,
} )
}
for _ , u := range si . URLs {
if _ , err := url . Parse ( u ) ; err != nil {
return xerrors . Errorf ( "failed to parse url %s: %w" , si . URLs , err )
}
}
// Single transaction to attach storage which is not present in the DB
_ , err := dbi . harmonyDB . BeginTransaction ( ctx , func ( tx * harmonydb . Tx ) ( commit bool , err error ) {
var urls sql . NullString
var storageId sql . NullString
err = dbi . harmonyDB . QueryRow ( ctx ,
"Select storage_id, urls FROM storagelocation WHERE storage_id = $1" , string ( si . ID ) ) . Scan ( & storageId , & urls )
if err != nil && ! strings . Contains ( err . Error ( ) , "no rows in result set" ) {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "storage attach select fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
// Storage ID entry exists
// TODO: Consider using insert into .. on conflict do update set ... below
if storageId . Valid {
var currUrls [ ] string
if urls . Valid {
currUrls = strings . Split ( urls . String , "," )
}
currUrls = union ( currUrls , si . URLs )
_ , err = dbi . harmonyDB . Exec ( ctx ,
"UPDATE StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10" ,
strings . Join ( currUrls , "," ) ,
si . Weight ,
si . MaxStorage ,
si . CanSeal ,
si . CanStore ,
strings . Join ( si . Groups , "," ) ,
strings . Join ( si . AllowTo , "," ) ,
strings . Join ( si . AllowTypes , "," ) ,
strings . Join ( si . DenyTypes , "," ) ,
si . ID )
if err != nil {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "storage attach UPDATE fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
return true , nil
}
// Insert storage id
_ , err = dbi . harmonyDB . Exec ( ctx ,
"INSERT INTO StorageLocation " +
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)" ,
si . ID ,
strings . Join ( si . URLs , "," ) ,
si . Weight ,
si . MaxStorage ,
si . CanSeal ,
si . CanStore ,
strings . Join ( si . Groups , "," ) ,
strings . Join ( si . AllowTo , "," ) ,
strings . Join ( si . AllowTypes , "," ) ,
strings . Join ( si . DenyTypes , "," ) ,
st . Capacity ,
st . Available ,
st . FSAvailable ,
st . Reserved ,
st . Used ,
time . Now ( ) )
if err != nil {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "StorageAttach insert fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
return true , nil
} )
if err != nil {
return err
}
return nil
}
func ( dbi * DBIndex ) StorageDetach ( ctx context . Context , id storiface . ID , url string ) error {
// If url not in path urls, error out
// if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path
var qUrls string
err := dbi . harmonyDB . QueryRow ( ctx , "SELECT COALESCE(urls,'') FROM storagelocation WHERE storage_id=$1" , string ( id ) ) . Scan ( & qUrls )
if err != nil {
return err
}
urls := splitString ( qUrls )
var modUrls [ ] string
for _ , u := range urls {
if u != url {
modUrls = append ( modUrls , u )
}
}
// noop if url doesn't exist in urls
if len ( modUrls ) == len ( urls ) {
return nil
}
if len ( modUrls ) > 0 {
newUrls := strings . Join ( modUrls , "," )
_ , err := dbi . harmonyDB . Exec ( ctx , "UPDATE storagelocation set urls=$1 WHERE storage_id=$2" , newUrls , id )
if err != nil {
return err
}
log . Warnw ( "Dropping sector path endpoint" , "path" , id , "url" , url )
} else {
// Single transaction to drop storage path and sector decls which have this as a storage path
_ , err := dbi . harmonyDB . BeginTransaction ( ctx , func ( tx * harmonydb . Tx ) ( commit bool , err error ) {
// Drop storage path completely
_ , err = dbi . harmonyDB . Exec ( ctx , "DELETE FROM storagelocation WHERE storage_id=$1" , id )
if err != nil {
return false , err
}
// Drop all sectors entries which use this storage path
_ , err = dbi . harmonyDB . Exec ( ctx , "DELETE FROM sectorlocation WHERE storage_id=$1" , id )
if err != nil {
return false , err
}
return true , nil
} )
if err != nil {
return err
}
log . Warnw ( "Dropping sector storage" , "path" , id )
}
return nil
}
func ( dbi * DBIndex ) StorageReportHealth ( ctx context . Context , id storiface . ID , report storiface . HealthReport ) error {
var canSeal , canStore bool
err := dbi . harmonyDB . QueryRow ( ctx ,
"SELECT can_seal, can_store FROM storagelocation WHERE storage_id=$1" , id ) . Scan ( & canSeal , & canStore )
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors . Errorf ( "Querying for storage id %s fails with err %v" , id , err )
2023-08-10 22:35:35 +00:00
}
_ , err = dbi . harmonyDB . Exec ( ctx ,
"UPDATE storagelocation set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6" ,
report . Stat . Capacity ,
report . Stat . Available ,
report . Stat . FSAvailable ,
report . Stat . Reserved ,
report . Stat . Used ,
time . Now ( ) )
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors . Errorf ( "updating storage health in DB fails with err: %v" , err )
2023-08-10 22:35:35 +00:00
}
if report . Stat . Capacity > 0 {
ctx , _ = tag . New ( ctx ,
tag . Upsert ( metrics . StorageID , string ( id ) ) ,
tag . Upsert ( metrics . PathStorage , fmt . Sprint ( canStore ) ) ,
tag . Upsert ( metrics . PathSeal , fmt . Sprint ( canSeal ) ) ,
)
stats . Record ( ctx , metrics . StorageFSAvailable . M ( float64 ( report . Stat . FSAvailable ) / float64 ( report . Stat . Capacity ) ) )
stats . Record ( ctx , metrics . StorageAvailable . M ( float64 ( report . Stat . Available ) / float64 ( report . Stat . Capacity ) ) )
stats . Record ( ctx , metrics . StorageReserved . M ( float64 ( report . Stat . Reserved ) / float64 ( report . Stat . Capacity ) ) )
stats . Record ( ctx , metrics . StorageCapacityBytes . M ( report . Stat . Capacity ) )
stats . Record ( ctx , metrics . StorageFSAvailableBytes . M ( report . Stat . FSAvailable ) )
stats . Record ( ctx , metrics . StorageAvailableBytes . M ( report . Stat . Available ) )
stats . Record ( ctx , metrics . StorageReservedBytes . M ( report . Stat . Reserved ) )
if report . Stat . Max > 0 {
stats . Record ( ctx , metrics . StorageLimitUsed . M ( float64 ( report . Stat . Used ) / float64 ( report . Stat . Max ) ) )
stats . Record ( ctx , metrics . StorageLimitUsedBytes . M ( report . Stat . Used ) )
stats . Record ( ctx , metrics . StorageLimitMaxBytes . M ( report . Stat . Max ) )
}
}
return nil
}
func ( dbi * DBIndex ) StorageDeclareSector ( ctx context . Context , storageID storiface . ID , s abi . SectorID , ft storiface . SectorFileType , primary bool ) error {
ftValid := false
for _ , fileType := range storiface . PathTypes {
if fileType & ft == 0 {
ftValid = true
break
}
}
if ! ftValid {
return xerrors . Errorf ( "Invalid filetype" )
}
_ , err := dbi . harmonyDB . BeginTransaction ( ctx , func ( tx * harmonydb . Tx ) ( commit bool , err error ) {
var currPrimary sql . NullBool
err = dbi . harmonyDB . QueryRow ( ctx ,
"SELECT is_primary FROM SectorLocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4" ,
uint64 ( s . Miner ) , uint64 ( s . Number ) , int ( ft ) , string ( storageID ) ) . Scan ( & currPrimary )
if err != nil && ! strings . Contains ( err . Error ( ) , "no rows in result set" ) {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "DB SELECT fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
// If storage id already exists for this sector, update primary if need be
if currPrimary . Valid {
if ! currPrimary . Bool && primary {
_ , err = dbi . harmonyDB . Exec ( ctx ,
"UPDATE SectorLocation set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4" ,
s . Miner , s . Number , ft , storageID )
if err != nil {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "DB update fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
} else {
log . Warnf ( "sector %v redeclared in %s" , s , storageID )
}
} else {
_ , err = dbi . harmonyDB . Exec ( ctx ,
"INSERT INTO SectorLocation " +
"values($1, $2, $3, $4, $5)" ,
s . Miner , s . Number , ft , storageID , primary )
if err != nil {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "DB insert fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
}
return true , nil
} )
if err != nil {
return err
}
return nil
}
func ( dbi * DBIndex ) StorageDropSector ( ctx context . Context , storageID storiface . ID , s abi . SectorID , ft storiface . SectorFileType ) error {
ftValid := false
for _ , fileType := range storiface . PathTypes {
if fileType & ft == 0 {
ftValid = true
break
}
}
if ! ftValid {
return xerrors . Errorf ( "Invalid filetype" )
}
_ , err := dbi . harmonyDB . Exec ( ctx ,
"DELETE FROM sectorlocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4" ,
int ( s . Miner ) , int ( s . Number ) , int ( ft ) , string ( storageID ) )
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors . Errorf ( "StorageDropSector DELETE query fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
return nil
}
func ( dbi * DBIndex ) StorageFindSector ( ctx context . Context , s abi . SectorID , ft storiface . SectorFileType , ssize abi . SectorSize , allowFetch bool ) ( [ ] storiface . SectorStorageInfo , error ) {
var result [ ] storiface . SectorStorageInfo
allowList := make ( map [ string ] struct { } )
storageWithSector := map [ string ] bool { }
type dbRes struct {
StorageId string
Count uint64
IsPrimary bool
Urls string
Weight uint64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
}
var rows [ ] dbRes
fts := ft . AllSet ( )
// Find all storage info which already hold this sector + filetype
err := dbi . harmonyDB . Select ( ctx , & rows ,
` SELECT DISTINCT ON ( stor . storage_id )
stor . storage_id ,
COUNT ( * ) OVER ( PARTITION BY stor . storage_id ) as count ,
BOOL_OR ( is_primary ) OVER ( PARTITION BY stor . storage_id ) AS is_primary ,
urls ,
weight ,
can_seal ,
can_store ,
groups ,
allow_to ,
allow_types ,
deny_types
FROM sectorlocation sec
JOIN storagelocation stor ON sec . storage_id = stor . storage_id
WHERE sec . miner_id = $ 1
AND sec . sector_num = $ 2
AND sec . sector_filetype = ANY ( $ 3 )
ORDER BY stor . storage_id ` ,
s . Miner , s . Number , fts )
if err != nil {
2023-08-14 15:35:18 +00:00
return nil , xerrors . Errorf ( "Finding sector storage from DB fails with err: %v" , err )
2023-08-10 22:35:35 +00:00
}
for _ , row := range rows {
// Parse all urls
var urls , burls [ ] string
for _ , u := range splitString ( row . Urls ) {
rl , err := url . Parse ( u )
if err != nil {
return nil , xerrors . Errorf ( "failed to parse url: %w" , err )
}
rl . Path = gopath . Join ( rl . Path , ft . String ( ) , storiface . SectorName ( s ) )
urls = append ( urls , rl . String ( ) )
burls = append ( burls , u )
}
result = append ( result , storiface . SectorStorageInfo {
ID : storiface . ID ( row . StorageId ) ,
URLs : urls ,
BaseURLs : burls ,
Weight : row . Weight * row . Count ,
CanSeal : row . CanSeal ,
CanStore : row . CanStore ,
Primary : row . IsPrimary ,
AllowTypes : splitString ( row . AllowTypes ) ,
DenyTypes : splitString ( row . DenyTypes ) ,
} )
storageWithSector [ row . StorageId ] = true
allowTo := splitString ( row . AllowTo )
if allowList != nil && len ( allowTo ) > 0 {
for _ , group := range allowTo {
allowList [ group ] = struct { } { }
}
} else {
allowList = nil // allow to any
}
}
// Find all storage paths which can hold this sector if allowFetch is true
if allowFetch {
spaceReq , err := ft . SealSpaceUse ( ssize )
if err != nil {
return nil , xerrors . Errorf ( "estimating required space: %w" , err )
}
// Conditions to satisfy when choosing a sector
// 1. CanSeal is true
// 2. Available >= spaceReq
// 3. curr_time - last_heartbeat < SkippedHeartbeatThresh
// 4. heartbeat_err is NULL
// 5. not one of the earlier picked storage ids
// 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes)
// 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector
var rows [ ] struct {
StorageId string
Urls string
Weight uint64
CanSeal bool
CanStore bool
Groups string
AllowTypes string
DenyTypes string
}
err = dbi . harmonyDB . Select ( ctx , & rows ,
` SELECT storage_id ,
urls ,
weight ,
can_seal ,
can_store ,
groups ,
allow_types ,
deny_types
FROM storagelocation
WHERE can_seal = true
and available >= $ 1
and NOW ( ) - last_heartbeat < $ 2
and heartbeat_err is null ` ,
spaceReq , SkippedHeartbeatThresh )
if err != nil {
2023-08-14 15:35:18 +00:00
return nil , xerrors . Errorf ( "Selecting allowfetch storage paths from DB fails err: %v" , err )
2023-08-10 22:35:35 +00:00
}
for _ , row := range rows {
if ok := storageWithSector [ row . StorageId ] ; ok {
continue
}
if ! ft . AnyAllowed ( splitString ( row . AllowTypes ) , splitString ( row . DenyTypes ) ) {
log . Debugf ( "not selecting on %s, not allowed by file type filters" , row . StorageId )
continue
}
if allowList != nil {
groups := splitString ( row . Groups )
allow := false
for _ , group := range groups {
if _ , found := allowList [ group ] ; found {
log . Debugf ( "path %s in allowed group %s" , row . StorageId , group )
allow = true
break
}
}
if ! allow {
log . Debugf ( "not selecting on %s, not in allowed group, allow %+v; path has %+v" , row . StorageId , allowList , groups )
continue
}
}
var urls , burls [ ] string
for _ , u := range splitString ( row . Urls ) {
rl , err := url . Parse ( u )
if err != nil {
return nil , xerrors . Errorf ( "failed to parse url: %w" , err )
}
rl . Path = gopath . Join ( rl . Path , ft . String ( ) , storiface . SectorName ( s ) )
urls = append ( urls , rl . String ( ) )
burls = append ( burls , u )
}
result = append ( result , storiface . SectorStorageInfo {
ID : storiface . ID ( row . StorageId ) ,
URLs : urls ,
BaseURLs : burls ,
Weight : row . Weight * 0 ,
CanSeal : row . CanSeal ,
CanStore : row . CanStore ,
Primary : false ,
AllowTypes : splitString ( row . AllowTypes ) ,
DenyTypes : splitString ( row . DenyTypes ) ,
} )
}
}
return result , nil
}
func ( dbi * DBIndex ) StorageInfo ( ctx context . Context , id storiface . ID ) ( storiface . StorageInfo , error ) {
var qResults [ ] struct {
Urls string
Weight uint64
MaxStorage uint64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
}
err := dbi . harmonyDB . Select ( ctx , & qResults ,
"SELECT urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types " +
"FROM StorageLocation WHERE storage_id=$1" , string ( id ) )
if err != nil {
2023-08-14 15:35:18 +00:00
return storiface . StorageInfo { } , xerrors . Errorf ( "StorageInfo query fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
var sinfo storiface . StorageInfo
sinfo . ID = id
sinfo . URLs = splitString ( qResults [ 0 ] . Urls )
sinfo . Weight = qResults [ 0 ] . Weight
sinfo . MaxStorage = qResults [ 0 ] . MaxStorage
sinfo . CanSeal = qResults [ 0 ] . CanSeal
sinfo . CanStore = qResults [ 0 ] . CanStore
sinfo . Groups = splitString ( qResults [ 0 ] . Groups )
sinfo . AllowTo = splitString ( qResults [ 0 ] . AllowTo )
sinfo . AllowTypes = splitString ( qResults [ 0 ] . AllowTypes )
sinfo . DenyTypes = splitString ( qResults [ 0 ] . DenyTypes )
return sinfo , nil
}
func ( dbi * DBIndex ) StorageBestAlloc ( ctx context . Context , allocate storiface . SectorFileType , ssize abi . SectorSize , pathType storiface . PathType ) ( [ ] storiface . StorageInfo , error ) {
var err error
var spaceReq uint64
switch pathType {
case storiface . PathSealing :
spaceReq , err = allocate . SealSpaceUse ( ssize )
case storiface . PathStorage :
spaceReq , err = allocate . StoreSpaceUse ( ssize )
default :
return nil , xerrors . Errorf ( "Unexpected path type" )
}
if err != nil {
return nil , xerrors . Errorf ( "estimating required space: %w" , err )
}
var checkString string
if pathType == storiface . PathSealing {
checkString = "can_seal = TRUE"
} else if pathType == storiface . PathStorage {
checkString = "can_store = TRUE"
}
var rows [ ] struct {
StorageId string
Urls string
Weight uint64
MaxStorage uint64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
}
sql := fmt . Sprintf ( ` SELECT storage_id ,
urls ,
weight ,
max_storage ,
can_seal ,
can_store ,
groups ,
allow_to ,
allow_types ,
deny_types
FROM storagelocation
WHERE % s and available >= $ 1
and NOW ( ) - last_heartbeat < $ 2
and heartbeat_err is null
order by ( available : : numeric * weight ) desc ` , checkString )
err = dbi . harmonyDB . Select ( ctx , & rows ,
sql , spaceReq , SkippedHeartbeatThresh )
if err != nil {
return nil , xerrors . Errorf ( "Querying for best storage sectors fails with sql: %s and err %w: " , sql , err )
}
var result [ ] storiface . StorageInfo
for _ , row := range rows {
result = append ( result , storiface . StorageInfo {
ID : storiface . ID ( row . StorageId ) ,
URLs : splitString ( row . Urls ) ,
Weight : row . Weight ,
MaxStorage : row . MaxStorage ,
CanSeal : row . CanSeal ,
CanStore : row . CanStore ,
Groups : splitString ( row . Groups ) ,
AllowTo : splitString ( row . AllowTo ) ,
AllowTypes : splitString ( row . AllowTypes ) ,
DenyTypes : splitString ( row . DenyTypes ) ,
} )
}
return result , nil
}
func ( dbi * DBIndex ) StorageLock ( ctx context . Context , sector abi . SectorID , read storiface . SectorFileType , write storiface . SectorFileType ) error {
// TODO: implementation
return nil
}
func ( dbi * DBIndex ) StorageTryLock ( ctx context . Context , sector abi . SectorID , read storiface . SectorFileType , write storiface . SectorFileType ) ( bool , error ) {
// TODO: implementation
return false , nil
}
func ( dbi * DBIndex ) StorageGetLocks ( ctx context . Context ) ( storiface . SectorLocks , error ) {
// TODO: implementation
return storiface . SectorLocks { } , nil
}
var _ SectorIndex = & DBIndex { }