Merge pull request #7453 from filecoin-project/feat/storage-groups
Sector storage groups
This commit is contained in:
commit
37141779ac
Binary file not shown.
@ -95,6 +95,14 @@ over time
|
||||
Name: "max-storage",
|
||||
Usage: "(for init) limit storage space for sectors (expensive for very large paths!)",
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "groups",
|
||||
Usage: "path group names",
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "allow-to",
|
||||
Usage: "path groups allowed to pull data from this path (allow all if not specified)",
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
@ -142,6 +150,8 @@ over time
|
||||
CanSeal: cctx.Bool("seal"),
|
||||
CanStore: cctx.Bool("store"),
|
||||
MaxStorage: uint64(maxStor),
|
||||
Groups: cctx.StringSlice("groups"),
|
||||
AllowTo: cctx.StringSlice("allow-to"),
|
||||
}
|
||||
|
||||
if !(cfg.CanStore || cfg.CanSeal) {
|
||||
@ -322,10 +332,17 @@ var storageListCmd = &cli.Command{
|
||||
if si.CanStore {
|
||||
fmt.Print(color.CyanString("Store"))
|
||||
}
|
||||
fmt.Println("")
|
||||
} else {
|
||||
fmt.Print(color.HiYellowString("Use: ReadOnly"))
|
||||
}
|
||||
fmt.Println()
|
||||
|
||||
if len(si.Groups) > 0 {
|
||||
fmt.Printf("\tGroups: %s\n", strings.Join(si.Groups, ", "))
|
||||
}
|
||||
if len(si.AllowTo) > 0 {
|
||||
fmt.Printf("\tAllowTo: %s\n", strings.Join(si.AllowTo, ", "))
|
||||
}
|
||||
|
||||
if localPath, ok := local[s.ID]; ok {
|
||||
fmt.Printf("\tLocal: %s\n", color.GreenString(localPath))
|
||||
|
@ -51,6 +51,14 @@ var storageAttachCmd = &cli.Command{
|
||||
Name: "max-storage",
|
||||
Usage: "(for init) limit storage space for sectors (expensive for very large paths!)",
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "groups",
|
||||
Usage: "path group names",
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "allow-to",
|
||||
Usage: "path groups allowed to pull data from this path (allow all if not specified)",
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
nodeApi, closer, err := lcli.GetWorkerAPI(cctx)
|
||||
@ -98,6 +106,8 @@ var storageAttachCmd = &cli.Command{
|
||||
CanSeal: cctx.Bool("seal"),
|
||||
CanStore: cctx.Bool("store"),
|
||||
MaxStorage: uint64(maxStor),
|
||||
Groups: cctx.StringSlice("groups"),
|
||||
AllowTo: cctx.StringSlice("allow-to"),
|
||||
}
|
||||
|
||||
if !(cfg.CanStore || cfg.CanSeal) {
|
||||
|
@ -2148,7 +2148,9 @@ Inputs:
|
||||
"Weight": 42,
|
||||
"MaxStorage": 42,
|
||||
"CanSeal": true,
|
||||
"CanStore": true
|
||||
"CanStore": true,
|
||||
"Groups": null,
|
||||
"AllowTo": null
|
||||
},
|
||||
{
|
||||
"Capacity": 9,
|
||||
@ -2258,7 +2260,9 @@ Response:
|
||||
"Weight": 42,
|
||||
"MaxStorage": 42,
|
||||
"CanSeal": true,
|
||||
"CanStore": true
|
||||
"CanStore": true,
|
||||
"Groups": null,
|
||||
"AllowTo": null
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -1983,6 +1983,8 @@ OPTIONS:
|
||||
--seal (for init) use path for sealing (default: false)
|
||||
--store (for init) use path for long-term storage (default: false)
|
||||
--max-storage value (for init) limit storage space for sectors (expensive for very large paths!)
|
||||
--groups value path group names
|
||||
--allow-to value path groups allowed to pull data from this path (allow all if not specified)
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
@ -94,6 +94,8 @@ OPTIONS:
|
||||
--seal (for init) use path for sealing (default: false)
|
||||
--store (for init) use path for long-term storage (default: false)
|
||||
--max-storage value (for init) limit storage space for sectors (expensive for very large paths!)
|
||||
--groups value path group names
|
||||
--allow-to value path groups allowed to pull data from this path (allow all if not specified)
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
33
extern/sector-storage/stores/index.go
vendored
33
extern/sector-storage/stores/index.go
vendored
@ -29,6 +29,8 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5
|
||||
// filesystem, local or networked / shared by multiple machines
|
||||
type ID string
|
||||
|
||||
type Group = string
|
||||
|
||||
type StorageInfo struct {
|
||||
ID ID
|
||||
URLs []string // TODO: Support non-http transports
|
||||
@ -37,6 +39,9 @@ type StorageInfo struct {
|
||||
|
||||
CanSeal bool
|
||||
CanStore bool
|
||||
|
||||
Groups []Group
|
||||
AllowTo []Group
|
||||
}
|
||||
|
||||
type HealthReport struct {
|
||||
@ -168,6 +173,8 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsS
|
||||
i.stores[si.ID].info.MaxStorage = si.MaxStorage
|
||||
i.stores[si.ID].info.CanSeal = si.CanSeal
|
||||
i.stores[si.ID].info.CanStore = si.CanStore
|
||||
i.stores[si.ID].info.Groups = si.Groups
|
||||
i.stores[si.ID].info.AllowTo = si.AllowTo
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -292,6 +299,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
|
||||
storageIDs := map[ID]uint64{}
|
||||
isprimary := map[ID]bool{}
|
||||
|
||||
allowTo := map[Group]struct{}{}
|
||||
|
||||
for _, pathType := range storiface.PathTypes {
|
||||
if ft&pathType == 0 {
|
||||
continue
|
||||
@ -323,6 +332,14 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
|
||||
urls[k] = rl.String()
|
||||
}
|
||||
|
||||
if allowTo != nil && len(st.info.AllowTo) > 0 {
|
||||
for _, group := range st.info.AllowTo {
|
||||
allowTo[group] = struct{}{}
|
||||
}
|
||||
} else {
|
||||
allowTo = nil // allow to any
|
||||
}
|
||||
|
||||
out = append(out, SectorStorageInfo{
|
||||
ID: id,
|
||||
URLs: urls,
|
||||
@ -365,6 +382,22 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
|
||||
continue
|
||||
}
|
||||
|
||||
if allowTo != nil {
|
||||
allow := false
|
||||
for _, group := range st.info.Groups {
|
||||
if _, found := allowTo[group]; found {
|
||||
log.Debugf("path %s in allowed group %s", st.info.ID, group)
|
||||
allow = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !allow {
|
||||
log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", st.info.ID, allowTo, st.info.Groups)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
urls := make([]string, len(st.info.URLs))
|
||||
for k, u := range st.info.URLs {
|
||||
rl, err := url.Parse(u)
|
||||
|
154
extern/sector-storage/stores/index_test.go
vendored
Normal file
154
extern/sector-storage/stores/index_test.go
vendored
Normal file
@ -0,0 +1,154 @@
|
||||
package stores
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
)
|
||||
|
||||
func init() {
|
||||
logging.SetLogLevel("stores", "DEBUG")
|
||||
}
|
||||
|
||||
func newTestStorage() StorageInfo {
|
||||
return StorageInfo{
|
||||
ID: ID(uuid.New().String()),
|
||||
CanSeal: true,
|
||||
CanStore: true,
|
||||
Groups: nil,
|
||||
AllowTo: nil,
|
||||
}
|
||||
}
|
||||
|
||||
var bigFsStat = fsutil.FsStat{
|
||||
Capacity: 1 << 40,
|
||||
Available: 1 << 40,
|
||||
FSAvailable: 1 << 40,
|
||||
Reserved: 0,
|
||||
Max: 0,
|
||||
Used: 0,
|
||||
}
|
||||
|
||||
const s32g = 32 << 30
|
||||
|
||||
func TestFindSimple(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
i := NewIndex()
|
||||
stor1 := newTestStorage()
|
||||
stor2 := newTestStorage()
|
||||
|
||||
require.NoError(t, i.StorageAttach(ctx, stor1, bigFsStat))
|
||||
require.NoError(t, i.StorageAttach(ctx, stor2, bigFsStat))
|
||||
|
||||
s1 := abi.SectorID{
|
||||
Miner: 12,
|
||||
Number: 34,
|
||||
}
|
||||
|
||||
{
|
||||
si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, si, 0)
|
||||
}
|
||||
|
||||
require.NoError(t, i.StorageDeclareSector(ctx, stor1.ID, s1, storiface.FTSealed, true))
|
||||
|
||||
{
|
||||
si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, si, 1)
|
||||
require.Equal(t, stor1.ID, si[0].ID)
|
||||
}
|
||||
|
||||
{
|
||||
si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, si, 2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindNoAllow(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
i := NewIndex()
|
||||
stor1 := newTestStorage()
|
||||
stor1.AllowTo = []Group{"grp1"}
|
||||
stor2 := newTestStorage()
|
||||
|
||||
require.NoError(t, i.StorageAttach(ctx, stor1, bigFsStat))
|
||||
require.NoError(t, i.StorageAttach(ctx, stor2, bigFsStat))
|
||||
|
||||
s1 := abi.SectorID{
|
||||
Miner: 12,
|
||||
Number: 34,
|
||||
}
|
||||
require.NoError(t, i.StorageDeclareSector(ctx, stor1.ID, s1, storiface.FTSealed, true))
|
||||
|
||||
{
|
||||
si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, si, 1)
|
||||
require.Equal(t, stor1.ID, si[0].ID)
|
||||
}
|
||||
|
||||
{
|
||||
si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, si, 1)
|
||||
require.Equal(t, stor1.ID, si[0].ID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindAllow(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
i := NewIndex()
|
||||
|
||||
stor1 := newTestStorage()
|
||||
stor1.AllowTo = []Group{"grp1"}
|
||||
|
||||
stor2 := newTestStorage()
|
||||
stor2.Groups = []Group{"grp1"}
|
||||
|
||||
stor3 := newTestStorage()
|
||||
stor3.Groups = []Group{"grp2"}
|
||||
|
||||
require.NoError(t, i.StorageAttach(ctx, stor1, bigFsStat))
|
||||
require.NoError(t, i.StorageAttach(ctx, stor2, bigFsStat))
|
||||
require.NoError(t, i.StorageAttach(ctx, stor3, bigFsStat))
|
||||
|
||||
s1 := abi.SectorID{
|
||||
Miner: 12,
|
||||
Number: 34,
|
||||
}
|
||||
require.NoError(t, i.StorageDeclareSector(ctx, stor1.ID, s1, storiface.FTSealed, true))
|
||||
|
||||
{
|
||||
si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, si, 1)
|
||||
require.Equal(t, stor1.ID, si[0].ID)
|
||||
}
|
||||
|
||||
{
|
||||
si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, si, 2)
|
||||
if si[0].ID == stor1.ID {
|
||||
require.Equal(t, stor1.ID, si[0].ID)
|
||||
require.Equal(t, stor2.ID, si[1].ID)
|
||||
} else {
|
||||
require.Equal(t, stor1.ID, si[1].ID)
|
||||
require.Equal(t, stor2.ID, si[0].ID)
|
||||
}
|
||||
}
|
||||
}
|
11
extern/sector-storage/stores/local.go
vendored
11
extern/sector-storage/stores/local.go
vendored
@ -46,6 +46,13 @@ type LocalStorageMeta struct {
|
||||
// MaxStorage specifies the maximum number of bytes to use for sector storage
|
||||
// (0 = unlimited)
|
||||
MaxStorage uint64
|
||||
|
||||
// List of storage groups this path belongs to
|
||||
Groups []string
|
||||
|
||||
// List of storage groups to which data from this path can be moved. If none
|
||||
// are specified, allow to all
|
||||
AllowTo []string
|
||||
}
|
||||
|
||||
// StorageConfig .lotusstorage/storage.json
|
||||
@ -212,6 +219,8 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
|
||||
MaxStorage: meta.MaxStorage,
|
||||
CanSeal: meta.CanSeal,
|
||||
CanStore: meta.CanStore,
|
||||
Groups: meta.Groups,
|
||||
AllowTo: meta.AllowTo,
|
||||
}, fst)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("declaring storage in index: %w", err)
|
||||
@ -276,6 +285,8 @@ func (st *Local) Redeclare(ctx context.Context) error {
|
||||
MaxStorage: meta.MaxStorage,
|
||||
CanSeal: meta.CanSeal,
|
||||
CanStore: meta.CanStore,
|
||||
Groups: meta.Groups,
|
||||
AllowTo: meta.AllowTo,
|
||||
}, fst)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("redeclaring storage in index: %w", err)
|
||||
|
Loading…
Reference in New Issue
Block a user