working changes

This commit is contained in:
Shrenuj Bansal 2023-08-08 20:59:21 -04:00
parent 864e8219ae
commit 652ee04ce2
8 changed files with 206 additions and 145 deletions

View File

@ -7,8 +7,6 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
logging "github.com/ipfs/go-log/v2"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
@ -18,6 +16,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
@ -49,6 +48,7 @@ import (
"github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/fsjournal" "github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
storageminer "github.com/filecoin-project/lotus/miner" storageminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules"
@ -468,7 +468,10 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "", harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "",
func(s string) { logging.Logger("harmonydb").Error(s) }) func(s string) { logging.Logger("harmonydb").Error(s) })
si := paths.NewIndex(nil, harmonyDB) // TODO: get this bool from miner init cmd line
enableSectorIndexDB := true
si := paths.NewIndexProxy(nil, harmonyDB, enableSectorIndexDB)
lstor, err := paths.NewLocal(ctx, lr, si, nil) lstor, err := paths.NewLocal(ctx, lr, si, nil)
if err != nil { if err != nil {

View File

@ -145,6 +145,10 @@
# env var: LOTUS_SUBSYSTEMS_ENABLEMARKETS # env var: LOTUS_SUBSYSTEMS_ENABLEMARKETS
#EnableMarkets = false #EnableMarkets = false
# type: bool
# env var: LOTUS_SUBSYSTEMS_ENABLESECTORINDEXDB
#EnableSectorIndexDB = false
# type: string # type: string
# env var: LOTUS_SUBSYSTEMS_SEALERAPIINFO # env var: LOTUS_SUBSYSTEMS_SEALERAPIINFO
#SealerApiInfo = "" #SealerApiInfo = ""

View File

@ -15,6 +15,7 @@ import (
) )
func TestPathTypeFilters(t *testing.T) { func TestPathTypeFilters(t *testing.T) {
runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) { runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())

View File

@ -132,8 +132,8 @@ func ConfigStorageMiner(c interface{}) Option {
If(cfg.Subsystems.EnableSectorStorage, If(cfg.Subsystems.EnableSectorStorage,
// Sector storage // Sector storage
Override(new(*paths.Index), paths.NewIndex), Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(true)),
Override(new(paths.SectorIndex), From(new(*paths.Index))), Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))),
Override(new(*sectorstorage.Manager), modules.SectorStorage), Override(new(*sectorstorage.Manager), modules.SectorStorage),
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))), Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),

View File

@ -235,6 +235,7 @@ func DefaultStorageMiner() *StorageMiner {
EnableSealing: true, EnableSealing: true,
EnableSectorStorage: true, EnableSectorStorage: true,
EnableMarkets: false, EnableMarkets: false,
EnableSectorIndexDB: false,
}, },
Fees: MinerFeeConfig{ Fees: MinerFeeConfig{

View File

@ -747,6 +747,12 @@ over the worker address if this flag is set.`,
Comment: ``, Comment: ``,
}, },
{
Name: "EnableSectorIndexDB",
Type: "bool",
Comment: ``,
},
{ {
Name: "SealerApiInfo", Name: "SealerApiInfo",
Type: "string", Type: "string",

View File

@ -109,6 +109,7 @@ type MinerSubsystemConfig struct {
EnableSealing bool EnableSealing bool
EnableSectorStorage bool EnableSectorStorage bool
EnableMarkets bool EnableMarkets bool
EnableSectorIndexDB bool
SealerApiInfo string // if EnableSealing == false SealerApiInfo string // if EnableSealing == false
SectorIndexApiInfo string // if EnableSectorStorage == false SectorIndexApiInfo string // if EnableSectorStorage == false

View File

@ -5,7 +5,6 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"net/url" "net/url"
gopath "path" gopath "path"
"sort" "sort"
@ -21,6 +20,7 @@ import (
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/journal/alerting" "github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
@ -64,6 +64,87 @@ type storageEntry struct {
heartbeatErr error heartbeatErr error
} }
type IndexProxy struct {
memIndex *Index
dbIndex *DBIndex
enableSectorIndexDB bool
}
func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageAttach(ctx, info, stat)
}
return ip.memIndex.StorageAttach(ctx, info, stat)
}
func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageDetach(ctx, id, url)
}
return ip.memIndex.StorageDetach(ctx, id, url)
}
func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageInfo(ctx, id)
}
return ip.memIndex.StorageInfo(ctx, id)
}
func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageReportHealth(ctx, id, report)
}
return ip.memIndex.StorageReportHealth(ctx, id, report)
}
func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
}
return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
}
func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft)
}
return ip.memIndex.StorageDropSector(ctx, storageID, s, ft)
}
func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
}
return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
}
func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
}
return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
}
func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
return ip.memIndex.StorageLock(ctx, sector, read, write)
}
func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
return ip.memIndex.StorageTryLock(ctx, sector, read, write)
}
func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
return ip.memIndex.StorageGetLocks(ctx)
}
func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageList(ctx)
}
return ip.memIndex.StorageList(ctx)
}
type Index struct { type Index struct {
*indexLocks *indexLocks
lk sync.RWMutex lk sync.RWMutex
@ -74,8 +155,6 @@ type Index struct {
sectors map[storiface.Decl][]*declMeta sectors map[storiface.Decl][]*declMeta
stores map[storiface.ID]*storageEntry stores map[storiface.ID]*storageEntry
dbi *DBIndex
} }
type DBIndex struct { type DBIndex struct {
@ -85,6 +164,20 @@ type DBIndex struct {
harmonyDB *harmonydb.DB harmonyDB *harmonydb.DB
} }
func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
return NewIndexProxy(al, db, enableSectorIndexDB)
}
}
func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy {
return &IndexProxy{
memIndex: NewIndex(al),
dbIndex: NewDBIndex(al, db),
enableSectorIndexDB: enableSectorIndexDB,
}
}
func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex { func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
return &DBIndex{ return &DBIndex{
harmonyDB: db, harmonyDB: db,
@ -94,7 +187,7 @@ func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
} }
} }
func NewIndex(al *alerting.Alerting, db *harmonydb.DB) *Index { func NewIndex(al *alerting.Alerting) *Index {
return &Index{ return &Index{
indexLocks: &indexLocks{ indexLocks: &indexLocks{
locks: map[abi.SectorID]*sectorLock{}, locks: map[abi.SectorID]*sectorLock{},
@ -105,8 +198,6 @@ func NewIndex(al *alerting.Alerting, db *harmonydb.DB) *Index {
sectors: map[storiface.Decl][]*declMeta{}, sectors: map[storiface.Decl][]*declMeta{},
stores: map[storiface.ID]*storageEntry{}, stores: map[storiface.ID]*storageEntry{},
dbi: NewDBIndex(al, db),
} }
} }
@ -159,30 +250,6 @@ func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storifa
}) })
} }
} }
//for _, entry := range sectorEntries {
//
// id := storiface.ID(entry.Storage_id)
// _, ok := out[id]
// if !ok {
// out[id] = []storiface.Decl{}
// }
//
// // skip sector info for storage paths with no sectors
// if !entry.Miner_id.Valid {
// continue
// }
//
// sectorId := abi.SectorID{
// Miner: abi.ActorID(entry.Miner_id.Int64),
// Number: abi.SectorNumber(entry.Sector_num.Int64),
// }
// decl := storiface.Decl{
// SectorID: sectorId,
// SectorFileType: storiface.SectorFileType(entry.Sector_filetype.Int32),
// }
//
// out[id] = append(out[id], decl)
//}
return out, nil return out, nil
} }
@ -213,17 +280,7 @@ func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.D
} }
} }
log.Errorf("In memory index storage list: ", out) return out, nil
dbout, err := i.dbi.StorageList(ctx)
if err != nil {
log.Errorf("DB Index storage list error: ", err)
return nil, err
} else {
log.Errorf("DB index storage list: ", dbout)
}
return dbout, nil
} }
func union(a, b []string) []string { func union(a, b []string) []string {
@ -241,51 +298,58 @@ func union(a, b []string) []string {
return a return a
} }
func splitString(str string) []string {
if str == "" {
return []string{}
}
return strings.Split(str, ",")
}
func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes))
//if _, hasAlert := dbi.pathAlerts[si.ID]; i.alerting != nil && !hasAlert { if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert {
// i.pathAlerts[si.ID] = i.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID)) dbi.pathAlerts[si.ID] = dbi.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID))
//} }
//
//var hasConfigIssues bool
for _, typ := range si.AllowTypes { var hasConfigIssues bool
for id, typ := range si.AllowTypes {
_, err := storiface.TypeFromString(typ) _, err := storiface.TypeFromString(typ)
if err != nil { if err != nil {
// No need to hard-fail here, just warn the user //No need to hard-fail here, just warn the user
// (note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path) //(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
//hasConfigIssues = true hasConfigIssues = true
//if i.alerting != nil { if dbi.alerting != nil {
// i.alerting.Raise(i.pathAlerts[si.ID], map[string]interface{}{ dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
// "message": "bad path type in AllowTypes", "message": "bad path type in AllowTypes",
// "path": string(si.ID), "path": string(si.ID),
// "idx": id, "idx": id,
// "path_type": typ, "path_type": typ,
// "error": err.Error(), "error": err.Error(),
// }) })
//} }
continue continue
} }
allow = append(allow, typ) allow = append(allow, typ)
} }
for _, typ := range si.DenyTypes { for id, typ := range si.DenyTypes {
_, err := storiface.TypeFromString(typ) _, err := storiface.TypeFromString(typ)
if err != nil { if err != nil {
// No need to hard-fail here, just warn the user //No need to hard-fail here, just warn the user
//hasConfigIssues = true hasConfigIssues = true
//if i.alerting != nil { if dbi.alerting != nil {
// i.alerting.Raise(i.pathAlerts[si.ID], map[string]interface{}{ dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{
// "message": "bad path type in DenyTypes", "message": "bad path type in DenyTypes",
// "path": string(si.ID), "path": string(si.ID),
// "idx": id, "idx": id,
// "path_type": typ, "path_type": typ,
// "error": err.Error(), "error": err.Error(),
// }) })
//} }
continue continue
} }
@ -294,11 +358,11 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
si.AllowTypes = allow si.AllowTypes = allow
si.DenyTypes = deny si.DenyTypes = deny
//if i.alerting != nil && !hasConfigIssues && i.alerting.IsRaised(i.pathAlerts[si.ID]) { if dbi.alerting != nil && !hasConfigIssues && dbi.alerting.IsRaised(dbi.pathAlerts[si.ID]) {
// i.alerting.Resolve(i.pathAlerts[si.ID], map[string]string{ dbi.alerting.Resolve(dbi.pathAlerts[si.ID], map[string]string{
// "message": "path config is now correct", "message": "path config is now correct",
// }) })
//} }
for _, u := range si.URLs { for _, u := range si.URLs {
if _, err := url.Parse(u); err != nil { if _, err := url.Parse(u); err != nil {
@ -306,6 +370,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
} }
} }
// TODO: make below part of a single transaction
var urls sql.NullString var urls sql.NullString
var storageId sql.NullString var storageId sql.NullString
err := dbi.harmonyDB.QueryRow(ctx, err := dbi.harmonyDB.QueryRow(ctx,
@ -321,7 +386,6 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
currUrls = strings.Split(urls.String, ",") currUrls = strings.Split(urls.String, ",")
} }
currUrls = union(currUrls, si.URLs) currUrls = union(currUrls, si.URLs)
//updatedUrls := strings.Join(currUrls, ",")
_, err = dbi.harmonyDB.Exec(ctx, _, err = dbi.harmonyDB.Exec(ctx,
"update StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 where storage_id=$10", "update StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 where storage_id=$10",
@ -469,12 +533,6 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st
lastHeartbeat: time.Now(), lastHeartbeat: time.Now(),
} }
err := i.dbi.StorageAttach(ctx, si, st)
if err != nil {
print("db index storage attach error: ", err)
return err
}
return nil return nil
} }
@ -482,7 +540,6 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
// If url not in path urls, error out // If url not in path urls, error out
// if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path // if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path
// record stats: droppedEntries, primaryEntries, droppedDecls
var qUrls sql.NullString var qUrls sql.NullString
err := dbi.harmonyDB.QueryRow(ctx, "select urls from storagelocation where storage_id=$1", string(id)).Scan(&qUrls) err := dbi.harmonyDB.QueryRow(ctx, "select urls from storagelocation where storage_id=$1", string(id)).Scan(&qUrls)
@ -493,7 +550,7 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
if !qUrls.Valid { if !qUrls.Valid {
return xerrors.Errorf("No urls available for storage id: ", id) return xerrors.Errorf("No urls available for storage id: ", id)
} }
urls := strings.Split(qUrls.String, ",") urls := splitString(qUrls.String)
var modUrls []string var modUrls []string
for _, u := range urls { for _, u := range urls {
@ -516,6 +573,8 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
log.Warnw("Dropping sector path endpoint", "path", id, "url", url) log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
} else { } else {
// Todo: single transaction
// Drop storage path completely // Drop storage path completely
_, err := dbi.harmonyDB.Exec(ctx, "delete from storagelocation where storage_id=$1", id) _, err := dbi.harmonyDB.Exec(ctx, "delete from storagelocation where storage_id=$1", id)
if err != nil { if err != nil {
@ -618,11 +677,6 @@ func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string)
log.Warnw("Dropping sector path endpoint", "path", id, "url", url) log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
} }
err := i.dbi.StorageDetach(ctx, id, url)
if err != nil {
return xerrors.Errorf("storage detach fails with the error: ", err)
}
return nil return nil
} }
@ -768,8 +822,6 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageID storiface.ID
i.lk.Lock() i.lk.Lock()
defer i.lk.Unlock() defer i.lk.Unlock()
log.Errorf("StorageDeclareSector: %v, %v, %v, %v", storageID, s, ft, primary)
loop: loop:
for _, fileType := range storiface.PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 { if fileType&ft == 0 {
@ -795,12 +847,6 @@ loop:
}) })
} }
err := i.dbi.StorageDeclareSector(ctx, storageID, s, ft, primary)
if err != nil {
print("DB index StorageDeclareSector err: ", err)
return err
}
return nil return nil
} }
@ -831,8 +877,6 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s
i.lk.Lock() i.lk.Lock()
defer i.lk.Unlock() defer i.lk.Unlock()
log.Errorf("StorageDropSector: %v, %v, %v", storageID, s, ft)
for _, fileType := range storiface.PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 { if fileType&ft == 0 {
continue continue
@ -860,11 +904,6 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s
i.sectors[d] = rewritten i.sectors[d] = rewritten
} }
err := i.dbi.StorageDropSector(ctx, storageID, s, ft)
if err != nil {
return xerrors.Errorf("DB StorageDropSector fails with err: ", err)
}
return nil return nil
} }
@ -921,7 +960,7 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft st
// Parse all urls // Parse all urls
var urls, burls []string var urls, burls []string
for _, u := range strings.Split(row.Urls, ",") { for _, u := range splitString(row.Urls) {
rl, err := url.Parse(u) rl, err := url.Parse(u)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err) return nil, xerrors.Errorf("failed to parse url: %w", err)
@ -939,13 +978,13 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft st
CanSeal: row.Can_seal, CanSeal: row.Can_seal,
CanStore: row.Can_store, CanStore: row.Can_store,
Primary: row.Is_primary, Primary: row.Is_primary,
AllowTypes: strings.Split(row.Allow_types, ","), AllowTypes: splitString(row.Allow_types),
DenyTypes: strings.Split(row.Deny_types, ","), DenyTypes: splitString(row.Deny_types),
}) })
storageWithSector[row.Storage_id] = true storageWithSector[row.Storage_id] = true
allowTo := strings.Split(row.Allow_to, ",") allowTo := splitString(row.Allow_to)
if allowList != nil && len(allowTo) > 0 { if allowList != nil && len(allowTo) > 0 {
for _, group := range allowTo { for _, group := range allowTo {
allowList[group] = struct{}{} allowList[group] = struct{}{}
@ -1005,13 +1044,13 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft st
continue continue
} }
if !ft.AnyAllowed(strings.Split(row.Allow_types, ","), strings.Split(row.Deny_types, ",")) { if !ft.AnyAllowed(splitString(row.Allow_types), splitString(row.Deny_types)) {
log.Debugf("not selecting on %s, not allowed by file type filters", row.Storage_id) log.Debugf("not selecting on %s, not allowed by file type filters", row.Storage_id)
continue continue
} }
if allowList != nil { if allowList != nil {
groups := strings.Split(row.Groups, ",") groups := splitString(row.Groups)
allow := false allow := false
for _, group := range groups { for _, group := range groups {
if _, found := allowList[group]; found { if _, found := allowList[group]; found {
@ -1028,7 +1067,7 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft st
} }
var urls, burls []string var urls, burls []string
for _, u := range strings.Split(row.Urls, ",") { for _, u := range splitString(row.Urls) {
rl, err := url.Parse(u) rl, err := url.Parse(u)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err) return nil, xerrors.Errorf("failed to parse url: %w", err)
@ -1046,8 +1085,8 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft st
CanSeal: row.Can_seal, CanSeal: row.Can_seal,
CanStore: row.Can_store, CanStore: row.Can_store,
Primary: false, Primary: false,
AllowTypes: strings.Split(row.Allow_types, ","), AllowTypes: splitString(row.Allow_types),
DenyTypes: strings.Split(row.Deny_types, ","), DenyTypes: splitString(row.Deny_types),
}) })
} }
} }
@ -1059,8 +1098,6 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()
log.Errorf("StorageFindSector: %v, %v, %v, %v, %v", s, ft, ssize, allowFetch, i.sectors)
storageIDs := map[storiface.ID]uint64{} storageIDs := map[storiface.ID]uint64{}
isprimary := map[storiface.ID]bool{} isprimary := map[storiface.ID]bool{}
@ -1202,16 +1239,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
} }
} }
log.Errorf("StorageFindSector out: ", out) return out, nil
dbout, err := i.dbi.StorageFindSector(ctx, s, ft, ssize, allowFetch)
if err != nil {
return nil, xerrors.Errorf("DB StorageFindSector fails err: ", err)
}
log.Errorf("StorageFindSector dbout: ", dbout)
return dbout, nil
} }
func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
@ -1237,15 +1265,15 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface
var sinfo storiface.StorageInfo var sinfo storiface.StorageInfo
sinfo.ID = id sinfo.ID = id
sinfo.URLs = strings.Split(qResults[0].Urls, ",") sinfo.URLs = splitString(qResults[0].Urls)
sinfo.Weight = qResults[0].Weight sinfo.Weight = qResults[0].Weight
sinfo.MaxStorage = qResults[0].Max_storage sinfo.MaxStorage = qResults[0].Max_storage
sinfo.CanSeal = qResults[0].Can_seal sinfo.CanSeal = qResults[0].Can_seal
sinfo.CanStore = qResults[0].Can_store sinfo.CanStore = qResults[0].Can_store
sinfo.Groups = strings.Split(qResults[0].Groups, ",") sinfo.Groups = splitString(qResults[0].Groups)
sinfo.AllowTo = strings.Split(qResults[0].Allow_to, ",") sinfo.AllowTo = splitString(qResults[0].Allow_to)
sinfo.AllowTypes = strings.Split(qResults[0].Allow_types, ",") sinfo.AllowTypes = splitString(qResults[0].Allow_types)
sinfo.DenyTypes = strings.Split(qResults[0].Deny_types, ",") sinfo.DenyTypes = splitString(qResults[0].Deny_types)
return sinfo, nil return sinfo, nil
} }
@ -1254,6 +1282,8 @@ func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.Sto
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()
log.Errorf("StorageInfo called id: ", id)
si, found := i.stores[id] si, found := i.stores[id]
if !found { if !found {
return storiface.StorageInfo{}, xerrors.Errorf("sector store not found") return storiface.StorageInfo{}, xerrors.Errorf("sector store not found")
@ -1322,20 +1352,18 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
for _, row := range rows { for _, row := range rows {
result = append(result, storiface.StorageInfo{ result = append(result, storiface.StorageInfo{
ID: storiface.ID(row.Storage_id), ID: storiface.ID(row.Storage_id),
URLs: strings.Split(row.Urls, ","), URLs: splitString(row.Urls),
Weight: row.Weight, Weight: row.Weight,
MaxStorage: row.Max_storage, MaxStorage: row.Max_storage,
CanSeal: row.Can_seal, CanSeal: row.Can_seal,
CanStore: row.Can_store, CanStore: row.Can_store,
Groups: strings.Split(row.Groups, ","), Groups: splitString(row.Groups),
AllowTo: strings.Split(row.Allow_to, ","), AllowTo: splitString(row.Allow_to),
AllowTypes: strings.Split(row.Allow_types, ","), AllowTypes: splitString(row.Allow_types),
DenyTypes: strings.Split(row.Deny_types, ","), DenyTypes: splitString(row.Deny_types),
}) })
} }
log.Errorf("StorageBestAlloc result: ", result)
return result, nil return result, nil
} }
@ -1401,8 +1429,6 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorF
out[i] = *candidate.info out[i] = *candidate.info
} }
return i.dbi.StorageBestAlloc(ctx, allocate, ssize, pathType)
return out, nil return out, nil
} }
@ -1425,4 +1451,23 @@ func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]sto
return out, nil return out, nil
} }
func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
// TODO: implementation
return nil
}
func (dbi *DBIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
// TODO: implementation
return false, nil
}
func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
// TODO: implementation
return storiface.SectorLocks{}, nil
}
var _ SectorIndex = &Index{} var _ SectorIndex = &Index{}
var _ SectorIndex = &DBIndex{}
var _ SectorIndex = &IndexProxy{}