2023-08-10 22:35:35 +00:00
package paths
import (
"context"
"database/sql"
2023-08-22 14:43:50 +00:00
"errors"
2023-08-10 22:35:35 +00:00
"fmt"
"net/url"
gopath "path"
"strings"
"time"
2023-08-16 19:36:00 +00:00
"github.com/google/uuid"
2023-08-10 22:35:35 +00:00
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
2023-08-22 14:43:50 +00:00
// errAlreadyLocked is wrapped into the errors returned by lock when a requested
// read or write lock is currently held; StorageLock detects it with errors.Is.
var errAlreadyLocked = errors . New ( "already locked" )
2023-08-10 22:35:35 +00:00
// DBIndex is a sector index whose state (storage paths, sector locations and
// sector locks) is read from and written to the harmony database.
type DBIndex struct {
// alerting is used to raise and resolve path-configuration alerts; methods
// check it for nil before use.
alerting * alerting . Alerting
// pathAlerts maps a storage path ID to the alert type registered for it in
// StorageAttach.
pathAlerts map [ storiface . ID ] alerting . AlertType
// harmonyDB is the database handle used for every query.
harmonyDB * harmonydb . DB
}
// NewDBIndex builds a DBIndex that stores its state in db and reports path
// configuration problems through al. The per-path alert table starts empty.
func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
	idx := new(DBIndex)
	idx.alerting = al
	idx.harmonyDB = db
	idx.pathAlerts = make(map[storiface.ID]alerting.AlertType)
	return idx
}
func ( dbi * DBIndex ) StorageList ( ctx context . Context ) ( map [ storiface . ID ] [ ] storiface . Decl , error ) {
var sectorEntries [ ] struct {
StorageId string
MinerId sql . NullInt64
SectorNum sql . NullInt64
2023-11-28 15:05:59 +00:00
SectorFiletype sql . NullInt32 ` db:"sector_filetype" `
2023-08-10 22:35:35 +00:00
IsPrimary sql . NullBool
}
err := dbi . harmonyDB . Select ( ctx , & sectorEntries ,
2023-08-17 17:23:03 +00:00
"SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storage_path stor LEFT JOIN sector_location sec on stor.storage_id=sec.storage_id" )
2023-08-10 22:35:35 +00:00
if err != nil {
2023-08-14 13:24:00 +00:00
return nil , xerrors . Errorf ( "StorageList DB query fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
byID := map [ storiface . ID ] map [ abi . SectorID ] storiface . SectorFileType { }
for _ , entry := range sectorEntries {
id := storiface . ID ( entry . StorageId )
_ , ok := byID [ id ]
if ! ok {
byID [ id ] = map [ abi . SectorID ] storiface . SectorFileType { }
}
// skip sector info for storage paths with no sectors
if ! entry . MinerId . Valid {
continue
}
sectorId := abi . SectorID {
Miner : abi . ActorID ( entry . MinerId . Int64 ) ,
Number : abi . SectorNumber ( entry . SectorNum . Int64 ) ,
}
byID [ id ] [ sectorId ] |= storiface . SectorFileType ( entry . SectorFiletype . Int32 )
}
out := map [ storiface . ID ] [ ] storiface . Decl { }
for id , m := range byID {
out [ id ] = [ ] storiface . Decl { }
for sectorID , fileType := range m {
out [ id ] = append ( out [ id ] , storiface . Decl {
SectorID : sectorID ,
SectorFileType : fileType ,
} )
}
}
return out , nil
}
// union returns a extended with every element of b that is not already
// present, keeping first-seen order. Elements of a are kept as they are
// (including any duplicates a already contains); each new element from b is
// appended at most once, even if b lists it several times.
//
// The result may share its backing array with a.
func union(a, b []string) []string {
	seen := make(map[string]struct{}, len(a)+len(b))
	for _, elem := range a {
		seen[elem] = struct{}{}
	}

	for _, elem := range b {
		if _, dup := seen[elem]; dup {
			continue
		}
		// Record the element so a repeat later in b is not appended again.
		seen[elem] = struct{}{}
		a = append(a, elem)
	}

	return a
}
// splitString breaks a comma-separated list into its items. The empty string
// yields an empty, non-nil slice rather than a slice holding one empty item.
func splitString(str string) []string {
	if len(str) > 0 {
		return strings.Split(str, ",")
	}
	return []string{}
}
func ( dbi * DBIndex ) StorageAttach ( ctx context . Context , si storiface . StorageInfo , st fsutil . FsStat ) error {
var allow , deny = make ( [ ] string , 0 , len ( si . AllowTypes ) ) , make ( [ ] string , 0 , len ( si . DenyTypes ) )
if _ , hasAlert := dbi . pathAlerts [ si . ID ] ; dbi . alerting != nil && ! hasAlert {
dbi . pathAlerts [ si . ID ] = dbi . alerting . AddAlertType ( "sector-index" , "pathconf-" + string ( si . ID ) )
}
var hasConfigIssues bool
for id , typ := range si . AllowTypes {
_ , err := storiface . TypeFromString ( typ )
if err != nil {
//No need to hard-fail here, just warn the user
//(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
hasConfigIssues = true
if dbi . alerting != nil {
dbi . alerting . Raise ( dbi . pathAlerts [ si . ID ] , map [ string ] interface { } {
"message" : "bad path type in AllowTypes" ,
"path" : string ( si . ID ) ,
"idx" : id ,
"path_type" : typ ,
"error" : err . Error ( ) ,
} )
}
continue
}
allow = append ( allow , typ )
}
for id , typ := range si . DenyTypes {
_ , err := storiface . TypeFromString ( typ )
if err != nil {
//No need to hard-fail here, just warn the user
hasConfigIssues = true
if dbi . alerting != nil {
dbi . alerting . Raise ( dbi . pathAlerts [ si . ID ] , map [ string ] interface { } {
"message" : "bad path type in DenyTypes" ,
"path" : string ( si . ID ) ,
"idx" : id ,
"path_type" : typ ,
"error" : err . Error ( ) ,
} )
}
continue
}
deny = append ( deny , typ )
}
si . AllowTypes = allow
si . DenyTypes = deny
if dbi . alerting != nil && ! hasConfigIssues && dbi . alerting . IsRaised ( dbi . pathAlerts [ si . ID ] ) {
dbi . alerting . Resolve ( dbi . pathAlerts [ si . ID ] , map [ string ] string {
"message" : "path config is now correct" ,
} )
}
for _ , u := range si . URLs {
if _ , err := url . Parse ( u ) ; err != nil {
return xerrors . Errorf ( "failed to parse url %s: %w" , si . URLs , err )
}
}
2023-12-11 16:50:49 +00:00
retryWait := time . Millisecond * 100
2023-12-07 21:32:35 +00:00
retryAttachStorage :
2023-08-10 22:35:35 +00:00
// Single transaction to attach storage which is not present in the DB
_ , err := dbi . harmonyDB . BeginTransaction ( ctx , func ( tx * harmonydb . Tx ) ( commit bool , err error ) {
var urls sql . NullString
var storageId sql . NullString
2023-12-07 21:32:35 +00:00
err = tx . QueryRow (
2024-01-05 15:49:56 +00:00
"SELECT storage_id, urls FROM storage_path WHERE storage_id = $1" , string ( si . ID ) ) . Scan ( & storageId , & urls )
2023-08-10 22:35:35 +00:00
if err != nil && ! strings . Contains ( err . Error ( ) , "no rows in result set" ) {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "storage attach select fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
// Storage ID entry exists
// TODO: Consider using insert into .. on conflict do update set ... below
if storageId . Valid {
var currUrls [ ] string
if urls . Valid {
currUrls = strings . Split ( urls . String , "," )
}
currUrls = union ( currUrls , si . URLs )
2023-12-07 21:32:35 +00:00
_ , err = tx . Exec (
2024-01-05 15:49:56 +00:00
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9, last_heartbeat=NOW() WHERE storage_id=$10" ,
2023-08-10 22:35:35 +00:00
strings . Join ( currUrls , "," ) ,
si . Weight ,
si . MaxStorage ,
si . CanSeal ,
si . CanStore ,
strings . Join ( si . Groups , "," ) ,
strings . Join ( si . AllowTo , "," ) ,
strings . Join ( si . AllowTypes , "," ) ,
strings . Join ( si . DenyTypes , "," ) ,
si . ID )
if err != nil {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "storage attach UPDATE fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
return true , nil
}
// Insert storage id
2023-12-07 21:32:35 +00:00
_ , err = tx . Exec (
2023-08-17 17:23:03 +00:00
"INSERT INTO storage_path " +
2024-01-05 15:49:56 +00:00
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW())" ,
2023-08-10 22:35:35 +00:00
si . ID ,
strings . Join ( si . URLs , "," ) ,
si . Weight ,
si . MaxStorage ,
si . CanSeal ,
si . CanStore ,
strings . Join ( si . Groups , "," ) ,
strings . Join ( si . AllowTo , "," ) ,
strings . Join ( si . AllowTypes , "," ) ,
strings . Join ( si . DenyTypes , "," ) ,
st . Capacity ,
st . Available ,
st . FSAvailable ,
st . Reserved ,
2024-01-05 15:49:56 +00:00
st . Used )
2023-08-10 22:35:35 +00:00
if err != nil {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "StorageAttach insert fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
return true , nil
} )
if err != nil {
2023-12-07 21:32:35 +00:00
if harmonydb . IsErrSerialization ( err ) {
2023-12-11 16:50:49 +00:00
time . Sleep ( retryWait )
retryWait *= 2
2023-12-07 21:32:35 +00:00
goto retryAttachStorage
}
2023-08-10 22:35:35 +00:00
return err
}
return nil
}
func ( dbi * DBIndex ) StorageDetach ( ctx context . Context , id storiface . ID , url string ) error {
// If url not in path urls, error out
// if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path
var qUrls string
2023-08-17 17:23:03 +00:00
err := dbi . harmonyDB . QueryRow ( ctx , "SELECT COALESCE(urls,'') FROM storage_path WHERE storage_id=$1" , string ( id ) ) . Scan ( & qUrls )
2023-08-10 22:35:35 +00:00
if err != nil {
return err
}
urls := splitString ( qUrls )
var modUrls [ ] string
for _ , u := range urls {
if u != url {
modUrls = append ( modUrls , u )
}
}
// noop if url doesn't exist in urls
if len ( modUrls ) == len ( urls ) {
return nil
}
if len ( modUrls ) > 0 {
newUrls := strings . Join ( modUrls , "," )
2023-08-17 17:23:03 +00:00
_ , err := dbi . harmonyDB . Exec ( ctx , "UPDATE storage_path set urls=$1 WHERE storage_id=$2" , newUrls , id )
2023-08-10 22:35:35 +00:00
if err != nil {
return err
}
log . Warnw ( "Dropping sector path endpoint" , "path" , id , "url" , url )
} else {
2023-12-11 16:50:49 +00:00
retryWait := time . Millisecond * 100
2023-12-07 21:32:35 +00:00
retryDropPath :
2023-08-10 22:35:35 +00:00
// Single transaction to drop storage path and sector decls which have this as a storage path
_ , err := dbi . harmonyDB . BeginTransaction ( ctx , func ( tx * harmonydb . Tx ) ( commit bool , err error ) {
// Drop storage path completely
2023-12-07 21:32:35 +00:00
_ , err = tx . Exec ( "DELETE FROM storage_path WHERE storage_id=$1" , id )
2023-08-10 22:35:35 +00:00
if err != nil {
return false , err
}
// Drop all sectors entries which use this storage path
2023-12-07 21:32:35 +00:00
_ , err = tx . Exec ( "DELETE FROM sector_location WHERE storage_id=$1" , id )
2023-08-10 22:35:35 +00:00
if err != nil {
return false , err
}
return true , nil
} )
if err != nil {
2023-12-07 21:32:35 +00:00
if harmonydb . IsErrSerialization ( err ) {
2023-12-11 16:50:49 +00:00
time . Sleep ( retryWait )
retryWait *= 2
2023-12-07 21:32:35 +00:00
goto retryDropPath
}
2023-08-10 22:35:35 +00:00
return err
}
log . Warnw ( "Dropping sector storage" , "path" , id )
}
return nil
}
func ( dbi * DBIndex ) StorageReportHealth ( ctx context . Context , id storiface . ID , report storiface . HealthReport ) error {
var canSeal , canStore bool
err := dbi . harmonyDB . QueryRow ( ctx ,
2023-08-17 17:23:03 +00:00
"SELECT can_seal, can_store FROM storage_path WHERE storage_id=$1" , id ) . Scan ( & canSeal , & canStore )
2023-08-10 22:35:35 +00:00
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors . Errorf ( "Querying for storage id %s fails with err %v" , id , err )
2023-08-10 22:35:35 +00:00
}
_ , err = dbi . harmonyDB . Exec ( ctx ,
2024-01-05 15:49:56 +00:00
"UPDATE storage_path set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=NOW()" ,
2023-08-10 22:35:35 +00:00
report . Stat . Capacity ,
report . Stat . Available ,
report . Stat . FSAvailable ,
report . Stat . Reserved ,
2024-01-05 15:49:56 +00:00
report . Stat . Used )
2023-08-10 22:35:35 +00:00
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors . Errorf ( "updating storage health in DB fails with err: %v" , err )
2023-08-10 22:35:35 +00:00
}
if report . Stat . Capacity > 0 {
ctx , _ = tag . New ( ctx ,
tag . Upsert ( metrics . StorageID , string ( id ) ) ,
tag . Upsert ( metrics . PathStorage , fmt . Sprint ( canStore ) ) ,
tag . Upsert ( metrics . PathSeal , fmt . Sprint ( canSeal ) ) ,
)
stats . Record ( ctx , metrics . StorageFSAvailable . M ( float64 ( report . Stat . FSAvailable ) / float64 ( report . Stat . Capacity ) ) )
stats . Record ( ctx , metrics . StorageAvailable . M ( float64 ( report . Stat . Available ) / float64 ( report . Stat . Capacity ) ) )
stats . Record ( ctx , metrics . StorageReserved . M ( float64 ( report . Stat . Reserved ) / float64 ( report . Stat . Capacity ) ) )
stats . Record ( ctx , metrics . StorageCapacityBytes . M ( report . Stat . Capacity ) )
stats . Record ( ctx , metrics . StorageFSAvailableBytes . M ( report . Stat . FSAvailable ) )
stats . Record ( ctx , metrics . StorageAvailableBytes . M ( report . Stat . Available ) )
stats . Record ( ctx , metrics . StorageReservedBytes . M ( report . Stat . Reserved ) )
if report . Stat . Max > 0 {
stats . Record ( ctx , metrics . StorageLimitUsed . M ( float64 ( report . Stat . Used ) / float64 ( report . Stat . Max ) ) )
stats . Record ( ctx , metrics . StorageLimitUsedBytes . M ( report . Stat . Used ) )
stats . Record ( ctx , metrics . StorageLimitMaxBytes . M ( report . Stat . Max ) )
}
}
return nil
}
2023-08-16 14:57:08 +00:00
// function to check if a filetype is valid
func ( dbi * DBIndex ) checkFileType ( fileType storiface . SectorFileType ) bool {
2023-08-10 22:35:35 +00:00
ftValid := false
2023-08-16 14:57:08 +00:00
for _ , fileTypeValid := range storiface . PathTypes {
if fileTypeValid & fileType == 0 {
2023-08-10 22:35:35 +00:00
ftValid = true
break
}
}
2023-08-16 14:57:08 +00:00
return ftValid
}
func ( dbi * DBIndex ) StorageDeclareSector ( ctx context . Context , storageID storiface . ID , s abi . SectorID , ft storiface . SectorFileType , primary bool ) error {
if ! dbi . checkFileType ( ft ) {
return xerrors . Errorf ( "invalid filetype" )
2023-08-10 22:35:35 +00:00
}
2023-12-11 16:50:49 +00:00
retryWait := time . Millisecond * 100
2023-12-07 21:32:35 +00:00
retryStorageDeclareSector :
2023-08-10 22:35:35 +00:00
_ , err := dbi . harmonyDB . BeginTransaction ( ctx , func ( tx * harmonydb . Tx ) ( commit bool , err error ) {
var currPrimary sql . NullBool
2023-12-07 21:32:35 +00:00
err = tx . QueryRow (
2023-08-17 17:23:03 +00:00
"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4" ,
2023-08-10 22:35:35 +00:00
uint64 ( s . Miner ) , uint64 ( s . Number ) , int ( ft ) , string ( storageID ) ) . Scan ( & currPrimary )
if err != nil && ! strings . Contains ( err . Error ( ) , "no rows in result set" ) {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "DB SELECT fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
// If storage id already exists for this sector, update primary if need be
if currPrimary . Valid {
if ! currPrimary . Bool && primary {
2023-12-07 21:32:35 +00:00
_ , err = tx . Exec (
2023-08-17 17:23:03 +00:00
"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4" ,
2023-08-10 22:35:35 +00:00
s . Miner , s . Number , ft , storageID )
if err != nil {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "DB update fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
} else {
log . Warnf ( "sector %v redeclared in %s" , s , storageID )
}
} else {
2023-12-07 21:32:35 +00:00
_ , err = tx . Exec (
2023-08-17 17:23:03 +00:00
"INSERT INTO sector_location " +
2023-08-10 22:35:35 +00:00
"values($1, $2, $3, $4, $5)" ,
s . Miner , s . Number , ft , storageID , primary )
if err != nil {
2023-08-14 17:56:29 +00:00
return false , xerrors . Errorf ( "DB insert fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
}
return true , nil
} )
if err != nil {
2023-12-07 21:32:35 +00:00
if harmonydb . IsErrSerialization ( err ) {
2023-12-11 16:50:49 +00:00
time . Sleep ( retryWait )
retryWait *= 2
2023-12-07 21:32:35 +00:00
goto retryStorageDeclareSector
}
2023-08-10 22:35:35 +00:00
return err
}
return nil
}
func ( dbi * DBIndex ) StorageDropSector ( ctx context . Context , storageID storiface . ID , s abi . SectorID , ft storiface . SectorFileType ) error {
2023-08-16 14:57:08 +00:00
if ! dbi . checkFileType ( ft ) {
return xerrors . Errorf ( "invalid filetype" )
2023-08-10 22:35:35 +00:00
}
_ , err := dbi . harmonyDB . Exec ( ctx ,
2023-08-17 17:23:03 +00:00
"DELETE FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4" ,
2023-08-10 22:35:35 +00:00
int ( s . Miner ) , int ( s . Number ) , int ( ft ) , string ( storageID ) )
if err != nil {
2023-08-14 13:24:00 +00:00
return xerrors . Errorf ( "StorageDropSector DELETE query fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
return nil
}
func ( dbi * DBIndex ) StorageFindSector ( ctx context . Context , s abi . SectorID , ft storiface . SectorFileType , ssize abi . SectorSize , allowFetch bool ) ( [ ] storiface . SectorStorageInfo , error ) {
var result [ ] storiface . SectorStorageInfo
allowList := make ( map [ string ] struct { } )
storageWithSector := map [ string ] bool { }
type dbRes struct {
StorageId string
Count uint64
IsPrimary bool
Urls string
Weight uint64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
}
var rows [ ] dbRes
fts := ft . AllSet ( )
// Find all storage info which already hold this sector + filetype
err := dbi . harmonyDB . Select ( ctx , & rows ,
` SELECT DISTINCT ON ( stor . storage_id )
stor . storage_id ,
COUNT ( * ) OVER ( PARTITION BY stor . storage_id ) as count ,
BOOL_OR ( is_primary ) OVER ( PARTITION BY stor . storage_id ) AS is_primary ,
urls ,
weight ,
can_seal ,
can_store ,
groups ,
allow_to ,
allow_types ,
deny_types
2023-08-17 17:23:03 +00:00
FROM sector_location sec
JOIN storage_path stor ON sec . storage_id = stor . storage_id
2023-08-10 22:35:35 +00:00
WHERE sec . miner_id = $ 1
AND sec . sector_num = $ 2
AND sec . sector_filetype = ANY ( $ 3 )
ORDER BY stor . storage_id ` ,
s . Miner , s . Number , fts )
if err != nil {
2023-08-14 15:35:18 +00:00
return nil , xerrors . Errorf ( "Finding sector storage from DB fails with err: %v" , err )
2023-08-10 22:35:35 +00:00
}
for _ , row := range rows {
// Parse all urls
var urls , burls [ ] string
for _ , u := range splitString ( row . Urls ) {
rl , err := url . Parse ( u )
if err != nil {
return nil , xerrors . Errorf ( "failed to parse url: %w" , err )
}
rl . Path = gopath . Join ( rl . Path , ft . String ( ) , storiface . SectorName ( s ) )
urls = append ( urls , rl . String ( ) )
burls = append ( burls , u )
}
result = append ( result , storiface . SectorStorageInfo {
ID : storiface . ID ( row . StorageId ) ,
URLs : urls ,
BaseURLs : burls ,
Weight : row . Weight * row . Count ,
CanSeal : row . CanSeal ,
CanStore : row . CanStore ,
Primary : row . IsPrimary ,
AllowTypes : splitString ( row . AllowTypes ) ,
DenyTypes : splitString ( row . DenyTypes ) ,
} )
storageWithSector [ row . StorageId ] = true
allowTo := splitString ( row . AllowTo )
if allowList != nil && len ( allowTo ) > 0 {
for _ , group := range allowTo {
allowList [ group ] = struct { } { }
}
} else {
allowList = nil // allow to any
}
}
// Find all storage paths which can hold this sector if allowFetch is true
if allowFetch {
spaceReq , err := ft . SealSpaceUse ( ssize )
if err != nil {
return nil , xerrors . Errorf ( "estimating required space: %w" , err )
}
// Conditions to satisfy when choosing a sector
// 1. CanSeal is true
// 2. Available >= spaceReq
// 3. curr_time - last_heartbeat < SkippedHeartbeatThresh
// 4. heartbeat_err is NULL
// 5. not one of the earlier picked storage ids
// 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes)
// 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector
var rows [ ] struct {
StorageId string
Urls string
Weight uint64
CanSeal bool
CanStore bool
Groups string
AllowTypes string
DenyTypes string
}
err = dbi . harmonyDB . Select ( ctx , & rows ,
` SELECT storage_id ,
urls ,
weight ,
can_seal ,
can_store ,
groups ,
allow_types ,
deny_types
2023-08-17 17:23:03 +00:00
FROM storage_path
2023-08-10 22:35:35 +00:00
WHERE can_seal = true
and available >= $ 1
2024-01-05 15:49:56 +00:00
and NOW ( ) - ( $ 2 * INTERVAL ' 1 second ' ) < last_heartbeat
2023-08-10 22:35:35 +00:00
and heartbeat_err is null ` ,
2024-01-05 15:49:56 +00:00
spaceReq , SkippedHeartbeatThresh . Seconds ( ) )
2023-08-10 22:35:35 +00:00
if err != nil {
2023-08-14 15:35:18 +00:00
return nil , xerrors . Errorf ( "Selecting allowfetch storage paths from DB fails err: %v" , err )
2023-08-10 22:35:35 +00:00
}
for _ , row := range rows {
if ok := storageWithSector [ row . StorageId ] ; ok {
continue
}
if ! ft . AnyAllowed ( splitString ( row . AllowTypes ) , splitString ( row . DenyTypes ) ) {
log . Debugf ( "not selecting on %s, not allowed by file type filters" , row . StorageId )
continue
}
if allowList != nil {
groups := splitString ( row . Groups )
allow := false
for _ , group := range groups {
if _ , found := allowList [ group ] ; found {
log . Debugf ( "path %s in allowed group %s" , row . StorageId , group )
allow = true
break
}
}
if ! allow {
log . Debugf ( "not selecting on %s, not in allowed group, allow %+v; path has %+v" , row . StorageId , allowList , groups )
continue
}
}
var urls , burls [ ] string
for _ , u := range splitString ( row . Urls ) {
rl , err := url . Parse ( u )
if err != nil {
return nil , xerrors . Errorf ( "failed to parse url: %w" , err )
}
rl . Path = gopath . Join ( rl . Path , ft . String ( ) , storiface . SectorName ( s ) )
urls = append ( urls , rl . String ( ) )
burls = append ( burls , u )
}
result = append ( result , storiface . SectorStorageInfo {
ID : storiface . ID ( row . StorageId ) ,
URLs : urls ,
BaseURLs : burls ,
Weight : row . Weight * 0 ,
CanSeal : row . CanSeal ,
CanStore : row . CanStore ,
Primary : false ,
AllowTypes : splitString ( row . AllowTypes ) ,
DenyTypes : splitString ( row . DenyTypes ) ,
} )
}
}
return result , nil
}
func ( dbi * DBIndex ) StorageInfo ( ctx context . Context , id storiface . ID ) ( storiface . StorageInfo , error ) {
var qResults [ ] struct {
Urls string
Weight uint64
MaxStorage uint64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
}
err := dbi . harmonyDB . Select ( ctx , & qResults ,
"SELECT urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types " +
2023-08-17 17:23:03 +00:00
"FROM storage_path WHERE storage_id=$1" , string ( id ) )
2023-08-10 22:35:35 +00:00
if err != nil {
2023-08-14 15:35:18 +00:00
return storiface . StorageInfo { } , xerrors . Errorf ( "StorageInfo query fails: %v" , err )
2023-08-10 22:35:35 +00:00
}
var sinfo storiface . StorageInfo
sinfo . ID = id
sinfo . URLs = splitString ( qResults [ 0 ] . Urls )
sinfo . Weight = qResults [ 0 ] . Weight
sinfo . MaxStorage = qResults [ 0 ] . MaxStorage
sinfo . CanSeal = qResults [ 0 ] . CanSeal
sinfo . CanStore = qResults [ 0 ] . CanStore
sinfo . Groups = splitString ( qResults [ 0 ] . Groups )
sinfo . AllowTo = splitString ( qResults [ 0 ] . AllowTo )
sinfo . AllowTypes = splitString ( qResults [ 0 ] . AllowTypes )
sinfo . DenyTypes = splitString ( qResults [ 0 ] . DenyTypes )
return sinfo , nil
}
func ( dbi * DBIndex ) StorageBestAlloc ( ctx context . Context , allocate storiface . SectorFileType , ssize abi . SectorSize , pathType storiface . PathType ) ( [ ] storiface . StorageInfo , error ) {
var err error
var spaceReq uint64
switch pathType {
case storiface . PathSealing :
spaceReq , err = allocate . SealSpaceUse ( ssize )
case storiface . PathStorage :
spaceReq , err = allocate . StoreSpaceUse ( ssize )
default :
2023-08-16 14:57:08 +00:00
return nil , xerrors . Errorf ( "unexpected path type" )
2023-08-10 22:35:35 +00:00
}
if err != nil {
return nil , xerrors . Errorf ( "estimating required space: %w" , err )
}
var rows [ ] struct {
StorageId string
Urls string
Weight uint64
MaxStorage uint64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
}
2023-08-17 17:23:03 +00:00
err = dbi . harmonyDB . Select ( ctx , & rows ,
` SELECT storage_id ,
2023-08-10 22:35:35 +00:00
urls ,
weight ,
max_storage ,
can_seal ,
can_store ,
groups ,
allow_to ,
allow_types ,
deny_types
2023-08-17 17:23:03 +00:00
FROM storage_path
WHERE available >= $ 1
2024-01-05 15:49:56 +00:00
and NOW ( ) - ( $ 2 * INTERVAL ' 1 second ' ) < last_heartbeat
2023-08-10 22:35:35 +00:00
and heartbeat_err is null
2024-01-05 15:49:56 +00:00
and ( ( $ 3 and can_seal = TRUE ) or ( $ 4 and can_store = TRUE ) )
2023-08-17 17:23:03 +00:00
order by ( available : : numeric * weight ) desc ` ,
spaceReq ,
2024-01-05 15:49:56 +00:00
SkippedHeartbeatThresh . Seconds ( ) ,
2023-08-17 17:23:03 +00:00
pathType == storiface . PathSealing ,
pathType == storiface . PathStorage ,
)
2023-08-10 22:35:35 +00:00
if err != nil {
2023-08-17 17:23:03 +00:00
return nil , xerrors . Errorf ( "Querying for best storage sectors fails with err %w: " , err )
2023-08-10 22:35:35 +00:00
}
var result [ ] storiface . StorageInfo
for _ , row := range rows {
result = append ( result , storiface . StorageInfo {
ID : storiface . ID ( row . StorageId ) ,
URLs : splitString ( row . Urls ) ,
Weight : row . Weight ,
MaxStorage : row . MaxStorage ,
CanSeal : row . CanSeal ,
CanStore : row . CanStore ,
Groups : splitString ( row . Groups ) ,
AllowTo : splitString ( row . AllowTo ) ,
AllowTypes : splitString ( row . AllowTypes ) ,
DenyTypes : splitString ( row . DenyTypes ) ,
} )
}
return result , nil
}
2023-08-15 01:47:27 +00:00
// LockTimeOut is the age after which a lock timestamp is considered stale and
// no longer counts as held.
const LockTimeOut = 300 * time.Second

// isLocked reports whether ts denotes a live lock: the timestamp must be
// non-NULL and more recent than LockTimeOut ago.
func isLocked(ts sql.NullTime) bool {
	if !ts.Valid {
		return false
	}
	staleBefore := time.Now().Add(-LockTimeOut)
	return ts.Time.After(staleBefore)
}
2023-08-16 19:36:00 +00:00
func ( dbi * DBIndex ) lock ( ctx context . Context , sector abi . SectorID , read storiface . SectorFileType , write storiface . SectorFileType , lockUuid uuid . UUID ) ( bool , error ) {
2023-08-14 22:46:41 +00:00
if read | write == 0 {
return false , nil
}
if read | write > ( 1 << storiface . FileTypes ) - 1 {
return false , xerrors . Errorf ( "unknown file types specified" )
}
var rows [ ] struct {
2023-11-28 15:05:59 +00:00
SectorFileType storiface . SectorFileType ` db:"sector_filetype" `
2023-08-14 22:46:41 +00:00
ReadTs sql . NullTime
ReadRefs int
WriteTs sql . NullTime
}
_ , err := dbi . harmonyDB . BeginTransaction ( ctx , func ( tx * harmonydb . Tx ) ( commit bool , err error ) {
fts := ( read | write ) . AllSet ( )
2023-12-07 21:32:35 +00:00
err = tx . Select ( & rows ,
2023-08-16 14:57:08 +00:00
` SELECT sector_filetype , read_ts , read_refs , write_ts
2023-08-17 17:23:03 +00:00
FROM sector_location
2023-08-14 22:46:41 +00:00
WHERE miner_id = $ 1
AND sector_num = $ 2
AND sector_filetype = ANY ( $ 3 ) ` ,
sector . Miner , sector . Number , fts )
if err != nil {
return false , xerrors . Errorf ( "StorageLock SELECT fails: %v" , err )
}
type locks struct {
readTs sql . NullTime
readRefs int
writeTs sql . NullTime
}
lockMap := make ( map [ storiface . SectorFileType ] locks )
for _ , row := range rows {
lockMap [ row . SectorFileType ] = locks {
readTs : row . ReadTs ,
readRefs : row . ReadRefs ,
writeTs : row . WriteTs ,
}
}
// Check if we can acquire write locks
2023-08-15 01:47:27 +00:00
// Conditions: No write lock or write lock is stale, No read lock or read lock is stale
2023-08-16 14:57:08 +00:00
for _ , wft := range write . AllSet ( ) {
if isLocked ( lockMap [ wft ] . writeTs ) || isLocked ( lockMap [ wft ] . readTs ) {
2023-08-22 14:43:50 +00:00
return false , xerrors . Errorf ( "cannot acquire writelock for sector %v filetype %d already locked: %w" , sector , wft , errAlreadyLocked )
2023-08-14 22:46:41 +00:00
}
}
// Check if we can acquire read locks
2023-08-15 01:47:27 +00:00
// Conditions: No write lock or write lock is stale
2023-08-16 14:57:08 +00:00
for _ , rft := range read . AllSet ( ) {
if isLocked ( lockMap [ rft ] . writeTs ) {
2023-08-22 14:43:50 +00:00
return false , xerrors . Errorf ( "cannot acquire read lock for sector %v filetype %d already locked for writing: %w" , sector , rft , errAlreadyLocked )
2023-08-14 22:46:41 +00:00
}
}
// Acquire write locks
2023-12-07 21:32:35 +00:00
_ , err = tx . Exec (
2023-08-17 17:23:03 +00:00
` UPDATE sector_location
2023-08-16 19:36:00 +00:00
SET write_ts = NOW ( ) , write_lock_owner = $ 1
WHERE miner_id = $ 2
AND sector_num = $ 3
AND sector_filetype = ANY ( $ 4 ) ` ,
lockUuid . String ( ) ,
sector . Miner ,
sector . Number ,
write . AllSet ( ) )
2023-08-14 22:46:41 +00:00
if err != nil {
2023-08-16 14:57:08 +00:00
return false , xerrors . Errorf ( "acquiring write locks for sector %v fails with err: %v" , sector , err )
2023-08-14 22:46:41 +00:00
}
// Acquire read locks
2023-12-07 21:32:35 +00:00
_ , err = tx . Exec (
2023-08-17 17:23:03 +00:00
` UPDATE sector_location
2023-08-14 22:46:41 +00:00
SET read_ts = NOW ( ) , read_refs = read_refs + 1
WHERE miner_id = $ 1
AND sector_num = $ 2
AND sector_filetype = ANY ( $ 3 ) ` ,
2023-08-16 19:36:00 +00:00
sector . Miner ,
sector . Number ,
read . AllSet ( ) )
2023-08-14 22:46:41 +00:00
if err != nil {
2023-08-16 14:57:08 +00:00
return false , xerrors . Errorf ( "acquiring read locks for sector %v fails with err: %v" , sector , err )
2023-08-14 22:46:41 +00:00
}
return true , nil
} )
if err != nil {
return false , err
}
return true , nil
}
2023-08-21 15:54:51 +00:00
func ( dbi * DBIndex ) unlock ( sector abi . SectorID , read storiface . SectorFileType , write storiface . SectorFileType , lockUuid uuid . UUID ) ( bool , error ) {
ctx := context . Background ( )
2023-08-16 19:36:00 +00:00
if read | write == 0 {
return false , nil
}
if read | write > ( 1 << storiface . FileTypes ) - 1 {
return false , xerrors . Errorf ( "unknown file types specified" )
}
// Relinquish write locks
_ , err := dbi . harmonyDB . Exec ( ctx ,
2023-08-17 17:23:03 +00:00
` UPDATE sector_location
2023-08-16 19:36:00 +00:00
SET write_ts = NULL , write_lock_owner = NULL
2023-08-21 15:54:51 +00:00
WHERE miner_id = $ 1
AND sector_num = $ 2
AND write_lock_owner = $ 3
AND sector_filetype = ANY ( $ 4 ) ` ,
2023-08-16 19:36:00 +00:00
sector . Miner ,
sector . Number ,
lockUuid . String ( ) ,
write . AllSet ( ) )
if err != nil {
return false , xerrors . Errorf ( "relinquishing write locks for sector %v fails with err: %v" , sector , err )
}
// Relinquish read locks
_ , err = dbi . harmonyDB . Exec ( ctx ,
2023-08-17 17:23:03 +00:00
` UPDATE sector_location
2023-08-16 19:36:00 +00:00
SET read_refs = read_refs - 1 ,
read_ts = CASE WHEN read_refs - 1 = 0 THEN NULL ELSE read_ts END
WHERE miner_id = $ 1
AND sector_num = $ 2
AND sector_filetype = ANY ( $ 3 ) ` ,
sector . Miner ,
sector . Number ,
read . AllSet ( ) )
if err != nil {
return false , xerrors . Errorf ( "relinquishing read locks for sector %v fails with err: %v" , sector , err )
}
return true , nil
}
2023-08-10 22:35:35 +00:00
func ( dbi * DBIndex ) StorageLock ( ctx context . Context , sector abi . SectorID , read storiface . SectorFileType , write storiface . SectorFileType ) error {
2023-08-14 22:46:41 +00:00
2023-08-16 14:57:08 +00:00
retries := 5
2023-08-21 15:54:51 +00:00
maxWaitTime := 300 // Set max wait time to 5 minutes
2023-08-16 14:57:08 +00:00
2023-08-21 15:54:51 +00:00
waitTime := 1
2023-08-16 19:36:00 +00:00
// generate uuid for this lock owner
lockUuid := uuid . New ( )
2023-08-21 15:54:51 +00:00
// retry with exponential backoff and block until lock is acquired
for {
locked , err := dbi . lock ( ctx , sector , read , write , lockUuid )
// if err is not nil and is not because we cannot acquire lock, retry
2023-08-22 16:19:18 +00:00
if err != nil && ! errors . Is ( err , errAlreadyLocked ) {
2023-08-21 15:54:51 +00:00
retries --
if retries == 0 {
return err
}
}
if locked {
2023-08-16 14:57:08 +00:00
break
}
2023-08-21 15:54:51 +00:00
select {
case <- time . After ( time . Duration ( waitTime ) * time . Second ) :
if waitTime < maxWaitTime {
waitTime *= 2
}
case <- ctx . Done ( ) :
return ctx . Err ( )
}
2023-08-16 19:36:00 +00:00
}
go func ( ) {
<- ctx . Done ( )
2023-08-21 15:54:51 +00:00
_ , err := dbi . unlock ( sector , read , write , lockUuid )
2023-08-16 19:36:00 +00:00
if err != nil {
log . Errorf ( "unlocking sector %v for filetypes: read=%d, write=%d, fails with err: %v" , sector , read , write , err )
}
2023-08-14 22:46:41 +00:00
2023-08-16 19:36:00 +00:00
} ( )
return nil
2023-08-10 22:35:35 +00:00
}
func ( dbi * DBIndex ) StorageTryLock ( ctx context . Context , sector abi . SectorID , read storiface . SectorFileType , write storiface . SectorFileType ) ( bool , error ) {
2023-08-16 19:36:00 +00:00
lockUuid := uuid . New ( )
locked , err := dbi . lock ( ctx , sector , read , write , lockUuid )
if err != nil {
return false , err
}
if locked {
go func ( ) {
<- ctx . Done ( )
2023-08-21 15:54:51 +00:00
_ , err := dbi . unlock ( sector , read , write , lockUuid )
2023-08-16 19:36:00 +00:00
if err != nil {
log . Errorf ( "unlocking sector %v for filetypes: read=%d, write=%d, fails with err: %v" , sector , read , write , err )
}
} ( )
}
return locked , nil
2023-08-10 22:35:35 +00:00
}
func ( dbi * DBIndex ) StorageGetLocks ( ctx context . Context ) ( storiface . SectorLocks , error ) {
2023-08-14 22:46:41 +00:00
var rows [ ] struct {
MinerId uint64
SectorNum uint64
2023-11-28 15:05:59 +00:00
SectorFileType int ` db:"sector_filetype" `
2023-08-14 22:46:41 +00:00
ReadTs sql . NullTime
ReadRefs int
WriteTs sql . NullTime
}
err := dbi . harmonyDB . Select ( ctx , & rows ,
2023-08-17 17:23:03 +00:00
"SELECT miner_id, sector_num, sector_filetype, read_ts, read_refs, write_ts FROM sector_location" )
2023-08-14 22:46:41 +00:00
if err != nil {
return storiface . SectorLocks { } , err
}
type locks struct {
sectorFileType storiface . SectorFileType
readRefs uint
writeLk bool
}
sectorLocks := make ( map [ abi . SectorID ] locks )
for _ , row := range rows {
sector := abi . SectorID {
Miner : abi . ActorID ( row . MinerId ) ,
Number : abi . SectorNumber ( row . SectorNum ) ,
}
2023-08-16 14:57:08 +00:00
var readRefs uint
if isLocked ( row . ReadTs ) {
readRefs = uint ( row . ReadRefs )
}
2023-08-14 22:46:41 +00:00
sectorLocks [ sector ] = locks {
sectorFileType : storiface . SectorFileType ( row . SectorFileType ) ,
2023-08-16 14:57:08 +00:00
readRefs : readRefs ,
writeLk : isLocked ( row . WriteTs ) ,
2023-08-14 22:46:41 +00:00
}
}
var result storiface . SectorLocks
for sector , locks := range sectorLocks {
var lock storiface . SectorLock
lock . Sector = sector
lock . Read [ locks . sectorFileType ] = locks . readRefs
if locks . writeLk {
lock . Write [ locks . sectorFileType ] = 1
} else {
lock . Write [ locks . sectorFileType ] = 0
}
2023-08-16 14:57:08 +00:00
result . Locks = append ( result . Locks , lock )
2023-08-14 22:46:41 +00:00
}
return result , nil
2023-08-10 22:35:35 +00:00
}
// Compile-time assertion that *DBIndex implements SectorIndex.
var _ SectorIndex = & DBIndex { }