Merge pull request #9013 from filecoin-project/feat/path-type-filters

feat: storage: Path type filters
This commit is contained in:
Łukasz Magiera 2022-07-15 13:08:43 +02:00 committed by GitHub
commit 9eb8f4ee9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 733 additions and 57 deletions

View File

@ -903,6 +903,11 @@ workflows:
suite: itest-nonce
target: "./itests/nonce_test.go"
- test:
name: test-itest-path_type_filters
suite: itest-path_type_filters
target: "./itests/path_type_filters_test.go"
- test:
name: test-itest-paych_api
suite: itest-paych_api
@ -978,6 +983,11 @@ workflows:
suite: itest-wdpost_dispute
target: "./itests/wdpost_dispute_test.go"
- test:
name: test-itest-wdpost_no_miner_storage
suite: itest-wdpost_no_miner_storage
target: "./itests/wdpost_no_miner_storage_test.go"
- test:
name: test-itest-wdpost
suite: itest-wdpost

View File

@ -146,18 +146,29 @@ type StorageMiner interface {
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
// SectorIndex
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error) //perm:admin
StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error //perm:admin
StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error //perm:admin
StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error //perm:admin
// paths.SectorIndex
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error) //perm:admin
StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error //perm:admin
StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error //perm:admin
StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error //perm:admin
// StorageFindSector returns list of paths where the specified sector files exist.
//
// If allowFetch is set, list of paths to which the sector can be fetched will also be returned.
// - Paths which have sector files locally (don't require fetching) will be listed first.
// - Paths which have sector files locally will not be filtered based on based on AllowTypes/DenyTypes.
// - Paths which require fetching will be filtered based on AllowTypes/DenyTypes. If multiple
// file types are specified, each type will be considered individually, and a union of all paths
// which can accommodate each file type will be returned.
StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) //perm:admin
StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) //perm:admin
StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error //perm:admin
StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) //perm:admin
StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) //perm:admin
StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) //perm:admin
// StorageBestAlloc returns list of paths where sector files of the specified type can be allocated, ordered by preference.
// Paths with more weight and more % of free space are preferred.
// Note: This method doesn't filter paths based on AllowTypes/DenyTypes.
StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) //perm:admin
StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error //perm:admin
StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) //perm:admin
StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) //perm:admin
StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) //perm:admin
StorageLocal(ctx context.Context) (map[storiface.ID]string, error) //perm:admin
StorageStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error) //perm:admin

Binary file not shown.

View File

@ -458,7 +458,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
si := paths.NewIndex()
si := paths.NewIndex(nil)
lstor, err := paths.NewLocal(ctx, lr, si, nil)
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/bits"
"os"
"path/filepath"
"sort"
@ -345,6 +346,20 @@ var storageListCmd = &cli.Command{
fmt.Printf("\tAllowTo: %s\n", strings.Join(si.AllowTo, ", "))
}
if len(si.AllowTypes) > 0 || len(si.DenyTypes) > 0 {
denied := storiface.FTAll.SubAllowed(si.AllowTypes, si.DenyTypes)
allowed := storiface.FTAll ^ denied
switch {
case bits.OnesCount64(uint64(allowed)) == 0:
fmt.Printf("\tAllow Types: %s\n", color.RedString("None"))
case bits.OnesCount64(uint64(allowed)) < bits.OnesCount64(uint64(denied)):
fmt.Printf("\tAllow Types: %s\n", color.GreenString(strings.Join(allowed.Strings(), " ")))
default:
fmt.Printf("\tDeny Types: %s\n", color.RedString(strings.Join(denied.Strings(), " ")))
}
}
if localPath, ok := local[s.ID]; ok {
fmt.Printf("\tLocal: %s\n", color.GreenString(localPath))
}

View File

@ -3271,7 +3271,7 @@ Inputs:
Response: `{}`
### StorageAttach
SectorIndex
paths.SectorIndex
Perms: admin
@ -3293,6 +3293,12 @@ Inputs:
],
"AllowTo": [
"string value"
],
"AllowTypes": [
"string value"
],
"DenyTypes": [
"string value"
]
},
{
@ -3328,6 +3334,9 @@ Response:
```
### StorageBestAlloc
StorageBestAlloc returns list of paths where sector files of the specified type can be allocated, ordered by preference.
Paths with more weight and more % of free space are preferred.
Note: This method doesn't filter paths based on AllowTypes/DenyTypes.
Perms: admin
@ -3358,6 +3367,12 @@ Response:
],
"AllowTo": [
"string value"
],
"AllowTypes": [
"string value"
],
"DenyTypes": [
"string value"
]
}
]
@ -3403,6 +3418,14 @@ Inputs:
Response: `{}`
### StorageFindSector
StorageFindSector returns list of paths where the specified sector files exist.
If allowFetch is set, list of paths to which the sector can be fetched will also be returned.
- Paths which have sector files locally (don't require fetching) will be listed first.
- Paths which have sector files locally will not be filtered based on based on AllowTypes/DenyTypes.
- Paths which require fetching will be filtered based on AllowTypes/DenyTypes. If multiple
file types are specified, each type will be considered individually, and a union of all paths
which can accommodate each file type will be returned.
Perms: admin
@ -3434,7 +3457,13 @@ Response:
"Weight": 42,
"CanSeal": true,
"CanStore": true,
"Primary": true
"Primary": true,
"AllowTypes": [
"string value"
],
"DenyTypes": [
"string value"
]
}
]
```
@ -3502,6 +3531,12 @@ Response:
],
"AllowTo": [
"string value"
],
"AllowTypes": [
"string value"
],
"DenyTypes": [
"string value"
]
}
```

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
@ -184,7 +185,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
var target abi.ChainEpoch
reportSuccessFn := func(success bool, epoch abi.ChainEpoch, err error) {
require.NoError(bm.t, err)
// if api shuts down before mining, we may get an error which we should probably just ignore
// (fixing it will require rewriting most of the mining loop)
if err != nil && !strings.Contains(err.Error(), "websocket connection closed") {
require.NoError(bm.t, err)
}
target = epoch
wait <- success
}

View File

@ -546,7 +546,12 @@ func (n *Ensemble) Start() *Ensemble {
// using real proofs, therefore need real sectors.
if !n.bootstrapped && !n.options.mockProofs {
psd := m.PresealDir
noPaths := m.options.noStorage
err := lr.SetStorage(func(sc *paths.StorageConfig) {
if noPaths {
sc.StoragePaths = []paths.LocalPath{}
}
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{Path: psd})
})
@ -693,6 +698,13 @@ func (n *Ensemble) Start() *Ensemble {
lr, err := r.Lock(repo.Worker)
require.NoError(n.t, err)
if m.options.noStorage {
err := lr.SetStorage(func(sc *paths.StorageConfig) {
sc.StoragePaths = []paths.LocalPath{}
})
require.NoError(n.t, err)
}
ds, err := lr.Datastore(context.Background(), "/metadata")
require.NoError(n.t, err)

View File

@ -167,9 +167,8 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
const metaFile = "sectorstore.json"
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
p, err := ioutil.TempDir("", "lotus-testsectors-")
require.NoError(t, err)
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, conf func(*paths.LocalStorageMeta)) storiface.ID {
p := t.TempDir()
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
@ -177,18 +176,20 @@ func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64
}
}
_, err = os.Stat(filepath.Join(p, metaFile))
_, err := os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
require.NoError(t, err)
}
cfg := &paths.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: weight,
CanSeal: seal,
CanStore: store,
Weight: 10,
CanSeal: false,
CanStore: false,
}
conf(cfg)
if !(cfg.CanStore || cfg.CanSeal) {
t.Fatal("must specify at least one of CanStore or cfg.CanSeal")
}
@ -201,4 +202,6 @@ func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64
err = tm.StorageAddLocal(ctx, p)
require.NoError(t, err)
return cfg.ID
}

View File

@ -43,6 +43,7 @@ type nodeOpts struct {
minerNoLocalSealing bool // use worker
minerAssigner string
disallowRemoteFinalize bool
noStorage bool
workerTasks []sealtasks.TaskType
workerStorageOpt func(paths.Store) paths.Store
@ -154,6 +155,14 @@ func PresealSectors(sectors int) NodeOpt {
}
}
// NoStorage initializes miners with no writable storage paths (just read-only preseal paths)
func NoStorage() NodeOpt {
return func(opts *nodeOpts) error {
opts.noStorage = true
return nil
}
}
// ThroughRPC makes interactions with this node throughout the test flow through
// the JSON-RPC API.
func ThroughRPC() NodeOpt {
@ -210,6 +219,8 @@ func WithTaskTypes(tt []sealtasks.TaskType) NodeOpt {
}
}
var WithSealWorkerTasks = WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})
func WithWorkerStorage(transform func(paths.Store) paths.Store) NodeOpt {
return func(opts *nodeOpts) error {
opts.workerStorageOpt = transform

View File

@ -0,0 +1,199 @@
package itests
import (
"context"
"strings"
"testing"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func TestPathTypeFilters(t *testing.T) {
runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = logging.SetLogLevel("storageminer", "INFO")
var (
client kit.TestFullNode
miner kit.TestMiner
wiw, wdw kit.TestWorker
)
ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)).
FullNode(&client, kit.ThroughRPC()).
Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(2), kit.NoStorage()).
Worker(&miner, &wiw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})).
Worker(&miner, &wdw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})).
Start()
ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)
asserts(t, ctx, &miner, func() {
dh := kit.NewDealHarness(t, &client, &miner, &miner)
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
})
})
}
runTest(t, "invalid-type-alert", func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func()) {
slU := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanSeal = true
meta.AllowTypes = []string{"unsealed", "seeled"}
})
storlist, err := miner.StorageList(ctx)
require.NoError(t, err)
require.Len(t, storlist, 2) // 1 path we've added + preseal
si, err := miner.StorageInfo(ctx, slU)
require.NoError(t, err)
// check that bad entries are filtered
require.Len(t, si.DenyTypes, 0)
require.Len(t, si.AllowTypes, 1)
require.Equal(t, "unsealed", si.AllowTypes[0])
as, err := miner.LogAlerts(ctx)
require.NoError(t, err)
var found bool
for _, a := range as {
if a.Active && a.Type.System == "sector-index" && strings.HasPrefix(a.Type.Subsystem, "pathconf-") {
require.False(t, found)
require.Contains(t, string(a.LastActive.Message), "unknown sector file type 'seeled'")
found = true
}
}
require.True(t, found)
})
runTest(t, "seal-to-stor-unseal-allowdeny", func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func()) {
// allow all types in the sealing path
sealScratch := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanSeal = true
})
// unsealed storage
unsStor := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanStore = true
meta.AllowTypes = []string{"unsealed"}
})
// other storage
sealStor := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanStore = true
meta.DenyTypes = []string{"unsealed"}
})
storlist, err := miner.StorageList(ctx)
require.NoError(t, err)
require.Len(t, storlist, 4) // 3 paths we've added + preseal
run()
storlist, err = miner.StorageList(ctx)
require.NoError(t, err)
require.Len(t, storlist[sealScratch], 0)
require.Len(t, storlist[unsStor], 1)
require.Len(t, storlist[sealStor], 1)
require.Equal(t, storiface.FTUnsealed, storlist[unsStor][0].SectorFileType)
require.Equal(t, storiface.FTSealed|storiface.FTCache, storlist[sealStor][0].SectorFileType)
})
runTest(t, "sealstor-unseal-allowdeny", func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func()) {
// unsealed storage
unsStor := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanStore = true
meta.CanSeal = true
meta.AllowTypes = []string{"unsealed"}
})
// other storage
sealStor := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanStore = true
meta.CanSeal = true
meta.DenyTypes = []string{"unsealed"}
})
storlist, err := miner.StorageList(ctx)
require.NoError(t, err)
require.Len(t, storlist, 3) // 2 paths we've added + preseal
run()
storlist, err = miner.StorageList(ctx)
require.NoError(t, err)
require.Len(t, storlist[unsStor], 1)
require.Len(t, storlist[sealStor], 1)
require.Equal(t, storiface.FTUnsealed, storlist[unsStor][0].SectorFileType)
require.Equal(t, storiface.FTSealed|storiface.FTCache, storlist[sealStor][0].SectorFileType)
})
runTest(t, "seal-store-allseparate", func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func()) {
// sealing stores
slU := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanSeal = true
meta.AllowTypes = []string{"unsealed"}
})
slS := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanSeal = true
meta.AllowTypes = []string{"sealed"}
})
slC := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanSeal = true
meta.AllowTypes = []string{"cache"}
})
// storage stores
stU := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanStore = true
meta.AllowTypes = []string{"unsealed"}
})
stS := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanStore = true
meta.AllowTypes = []string{"sealed"}
})
stC := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.CanStore = true
meta.AllowTypes = []string{"cache"}
})
storlist, err := miner.StorageList(ctx)
require.NoError(t, err)
require.Len(t, storlist, 7) // 6 paths we've added + preseal
run()
storlist, err = miner.StorageList(ctx)
require.NoError(t, err)
require.Len(t, storlist[slU], 0)
require.Len(t, storlist[slS], 0)
require.Len(t, storlist[slC], 0)
require.Len(t, storlist[stU], 1)
require.Len(t, storlist[stS], 1)
require.Len(t, storlist[stC], 1)
require.Equal(t, storiface.FTUnsealed, storlist[stU][0].SectorFileType)
require.Equal(t, storiface.FTSealed, storlist[stS][0].SectorFileType)
require.Equal(t, storiface.FTCache, storlist[stC][0].SectorFileType)
})
}

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/paths"
)
func TestDealsWithFinalizeEarly(t *testing.T) {
@ -35,8 +36,14 @@ func TestDealsWithFinalizeEarly(t *testing.T) {
ctx := context.Background()
miner.AddStorage(ctx, t, 1000000000, true, false)
miner.AddStorage(ctx, t, 1000000000, false, true)
miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.Weight = 1000000000
meta.CanSeal = true
})
miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
meta.Weight = 1000000000
meta.CanStore = true
})
//stm: @STORAGE_LIST_001
sl, err := miner.StorageList(ctx)

View File

@ -0,0 +1,66 @@
package itests
import (
"context"
"testing"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
)
func TestWindowPostNoMinerStorage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = logging.SetLogLevel("storageminer", "INFO")
sealSectors := 2
presealSectors := 2*48*2 - sealSectors
sectors := presealSectors + sealSectors
var (
client kit.TestFullNode
miner kit.TestMiner
wiw, wdw, sealw kit.TestWorker
)
ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)).
FullNode(&client, kit.ThroughRPC()).
Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(presealSectors), kit.NoStorage()).
Worker(&miner, &wiw, kit.ThroughRPC(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})).
Worker(&miner, &wdw, kit.ThroughRPC(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})).
Worker(&miner, &sealw, kit.ThroughRPC(), kit.WithSealWorkerTasks).
Start()
ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)
miner.PledgeSectors(ctx, sealSectors, 0, nil)
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
di = di.NextNotElapsed()
// wait for new sectors to become active
waitUntil := di.Close + di.WPoStChallengeWindow*2 + di.WPoStProvingPeriod
t.Logf("Wait Height > %d", waitUntil)
ts := client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Logf("Now Height = %d", ts.Height())
p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
ssz, err := miner.ActorSectorSize(ctx, maddr)
require.NoError(t, err)
require.Equal(t, p.MinerPower, p.TotalPower)
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*uint64(sectors)))
}

View File

@ -29,7 +29,7 @@ import (
func TestWorkerPledge(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
kit.WithSealWorkerTasks) // no mock proofs
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
@ -43,7 +43,7 @@ func TestWorkerPledge(t *testing.T) {
func TestWorkerPledgeSpread(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}),
kit.WithSealWorkerTasks,
kit.WithAssigner("spread"),
) // no mock proofs
@ -59,7 +59,7 @@ func TestWorkerPledgeSpread(t *testing.T) {
func TestWorkerPledgeLocalFin(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}),
kit.WithSealWorkerTasks,
kit.WithDisallowRemoteFinalize(true),
) // no mock proofs

View File

@ -160,3 +160,10 @@ func (a *Alerting) GetAlerts() []Alert {
return out
}
func (a *Alerting) IsRaised(at AlertType) bool {
a.lk.Lock()
defer a.lk.Unlock()
return a.alerts[at].Active
}

View File

@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
@ -63,15 +64,23 @@ type Index struct {
*indexLocks
lk sync.RWMutex
// optional
alerting *alerting.Alerting
pathAlerts map[storiface.ID]alerting.AlertType
sectors map[storiface.Decl][]*declMeta
stores map[storiface.ID]*storageEntry
}
func NewIndex() *Index {
func NewIndex(al *alerting.Alerting) *Index {
return &Index{
indexLocks: &indexLocks{
locks: map[abi.SectorID]*sectorLock{},
},
alerting: al,
pathAlerts: map[storiface.ID]alerting.AlertType{},
sectors: map[storiface.Decl][]*declMeta{},
stores: map[storiface.ID]*storageEntry{},
}
@ -107,6 +116,64 @@ func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.D
}
func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes))
if _, hasAlert := i.pathAlerts[si.ID]; i.alerting != nil && !hasAlert {
i.pathAlerts[si.ID] = i.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID))
}
var hasConfigIsses bool
for id, typ := range si.AllowTypes {
_, err := storiface.TypeFromString(typ)
if err != nil {
// No need to hard-fail here, just warn the user
// (note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
hasConfigIsses = true
if i.alerting != nil {
i.alerting.Raise(i.pathAlerts[si.ID], map[string]interface{}{
"message": "bad path type in AllowTypes",
"path": string(si.ID),
"idx": id,
"path_type": typ,
"error": err.Error(),
})
}
continue
}
allow = append(allow, typ)
}
for id, typ := range si.DenyTypes {
_, err := storiface.TypeFromString(typ)
if err != nil {
// No need to hard-fail here, just warn the user
hasConfigIsses = true
if i.alerting != nil {
i.alerting.Raise(i.pathAlerts[si.ID], map[string]interface{}{
"message": "bad path type in DenyTypes",
"path": string(si.ID),
"idx": id,
"path_type": typ,
"error": err.Error(),
})
}
continue
}
deny = append(deny, typ)
}
si.AllowTypes = allow
si.DenyTypes = deny
if i.alerting != nil && !hasConfigIsses && i.alerting.IsRaised(i.pathAlerts[si.ID]) {
i.alerting.Resolve(i.pathAlerts[si.ID], map[string]string{
"message": "path config is now correct",
})
}
i.lk.Lock()
defer i.lk.Unlock()
@ -136,6 +203,8 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st
i.stores[si.ID].info.CanStore = si.CanStore
i.stores[si.ID].info.Groups = si.Groups
i.stores[si.ID].info.AllowTo = si.AllowTo
i.stores[si.ID].info.AllowTypes = allow
i.stores[si.ID].info.DenyTypes = deny
return nil
}
@ -312,6 +381,9 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
CanStore: st.info.CanStore,
Primary: isprimary[id],
AllowTypes: st.info.AllowTypes,
DenyTypes: st.info.DenyTypes,
})
}
@ -345,6 +417,11 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
continue
}
if !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes) {
log.Debugf("not selecting on %s, not allowed by file type filters", st.info.ID)
continue
}
if allowTo != nil {
allow := false
for _, group := range st.info.Groups {
@ -383,6 +460,9 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
CanStore: st.info.CanStore,
Primary: false,
AllowTypes: st.info.AllowTypes,
DenyTypes: st.info.DenyTypes,
})
}
}

View File

@ -42,7 +42,7 @@ const s32g = 32 << 30
func TestFindSimple(t *testing.T) {
ctx := context.Background()
i := NewIndex()
i := NewIndex(nil)
stor1 := newTestStorage()
stor2 := newTestStorage()
@ -79,7 +79,7 @@ func TestFindSimple(t *testing.T) {
func TestFindNoAllow(t *testing.T) {
ctx := context.Background()
i := NewIndex()
i := NewIndex(nil)
stor1 := newTestStorage()
stor1.AllowTo = []storiface.Group{"grp1"}
stor2 := newTestStorage()
@ -111,7 +111,7 @@ func TestFindNoAllow(t *testing.T) {
func TestFindAllow(t *testing.T) {
ctx := context.Background()
i := NewIndex()
i := NewIndex(nil)
stor1 := newTestStorage()
stor1.AllowTo = []storiface.Group{"grp1"}

View File

@ -44,6 +44,30 @@ type LocalStorageMeta struct {
// List of storage groups to which data from this path can be moved. If none
// are specified, allow to all
AllowTo []string
// AllowTypes lists sector file types which are allowed to be put into this
// path. If empty, all file types are allowed.
//
// Valid values:
// - "unsealed"
// - "sealed"
// - "cache"
// - "update"
// - "update-cache"
// Any other value will generate a warning and be ignored.
AllowTypes []string
// DenyTypes lists sector file types which aren't allowed to be put into this
// path.
//
// Valid values:
// - "unsealed"
// - "sealed"
// - "cache"
// - "update"
// - "update-cache"
// Any other value will generate a warning and be ignored.
DenyTypes []string
}
// StorageConfig .lotusstorage/storage.json
@ -218,6 +242,8 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
CanStore: meta.CanStore,
Groups: meta.Groups,
AllowTo: meta.AllowTo,
AllowTypes: meta.AllowTypes,
DenyTypes: meta.DenyTypes,
}, fst)
if err != nil {
return xerrors.Errorf("declaring storage in index: %w", err)
@ -284,6 +310,8 @@ func (st *Local) Redeclare(ctx context.Context) error {
CanStore: meta.CanStore,
Groups: meta.Groups,
AllowTo: meta.AllowTo,
AllowTypes: meta.AllowTypes,
DenyTypes: meta.DenyTypes,
}, fst)
if err != nil {
return xerrors.Errorf("redeclaring storage in index: %w", err)
@ -506,6 +534,10 @@ func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, exi
continue
}
if !fileType.Allowed(si.AllowTypes, si.DenyTypes) {
continue
}
// TODO: Check free space
best = p.sectorPath(sid.ID, fileType)

View File

@ -81,7 +81,7 @@ func TestLocalStorage(t *testing.T) {
root: root,
}
index := NewIndex()
index := NewIndex(nil)
st, err := NewLocal(ctx, tstor, index, nil)
require.NoError(t, err)

View File

@ -140,20 +140,21 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist
}
}
apaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op)
// get a list of paths to fetch data into. Note: file type filters will apply inside this call.
fetchPaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
}
odt := storiface.FSOverheadSeal
overheadTable := storiface.FSOverheadSeal
if pathType == storiface.PathStorage {
odt = storiface.FsOverheadFinalized
overheadTable = storiface.FsOverheadFinalized
}
// If any path types weren't found in local storage, try fetching them
// First reserve storage
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt)
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, overheadTable)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
}
@ -168,7 +169,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist
continue
}
dest := storiface.PathByType(apaths, fileType)
dest := storiface.PathByType(fetchPaths, fileType)
storageID := storiface.PathByType(ids, fileType)
url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest)

View File

@ -60,7 +60,7 @@ func createTestStorage(t *testing.T, p string, seal bool, att ...*paths.Local) s
func TestMoveShared(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
index := paths.NewIndex()
index := paths.NewIndex(nil)
ctx := context.Background()

View File

@ -100,7 +100,7 @@ var _ paths.LocalStorage = &testStorage{}
func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Manager, *paths.Local, *paths.Remote, *paths.Index, func()) {
st := newTestStorage(t)
si := paths.NewIndex()
si := paths.NewIndex(nil)
lstor, err := paths.NewLocal(ctx, st, si, nil)
require.NoError(t, err)

View File

@ -206,7 +206,7 @@ func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType
require.NoError(t, err)
// create index, storage, local store & remote store.
index := paths.NewIndex()
index := paths.NewIndex(nil)
storage := newTestStorage(t)
localStore, err := paths.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"})
require.NoError(t, err)

View File

@ -226,7 +226,7 @@ func TestSchedStartStop(t *testing.T) {
require.NoError(t, err)
go sched.runSched()
addTestWorker(t, sched, paths.NewIndex(), "fred", nil, decentWorkerResources, false)
addTestWorker(t, sched, paths.NewIndex(nil), "fred", nil, decentWorkerResources, false)
require.NoError(t, sched.Close(context.TODO()))
}
@ -350,7 +350,7 @@ func TestSched(t *testing.T) {
testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) {
return func(t *testing.T) {
index := paths.NewIndex()
index := paths.NewIndex(nil)
sched, err := newScheduler("")
require.NoError(t, err)

View File

@ -55,13 +55,20 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return false, false, xerrors.Errorf("finding best alloc storage: %w", err)
}
requested := s.alloc
for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, false, nil
requested = requested.SubAllowed(info.AllowTypes, info.DenyTypes)
// got all paths
if requested == storiface.FTNone {
break
}
}
}
return false, false, nil
return requested == storiface.FTNone, false, nil
}
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {

View File

@ -15,7 +15,7 @@ import (
type existingSelector struct {
index paths.SectorIndex
sector abi.SectorID
alloc storiface.SectorFileType
fileType storiface.SectorFileType
allowFetch bool
}
@ -23,7 +23,7 @@ func newExistingSelector(index paths.SectorIndex, sector abi.SectorID, alloc sto
return &existingSelector{
index: index,
sector: sector,
alloc: alloc,
fileType: alloc,
allowFetch: allowFetch,
}
}
@ -52,18 +52,30 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return false, false, xerrors.Errorf("getting sector size: %w", err)
}
best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
best, err := s.index.StorageFindSector(ctx, s.sector, s.fileType, ssize, s.allowFetch)
if err != nil {
return false, false, xerrors.Errorf("finding best storage: %w", err)
}
requested := s.fileType
for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, false, nil
// we're not putting new sector files anywhere
if !s.allowFetch {
return true, false, nil
}
requested = requested.SubAllowed(info.AllowTypes, info.DenyTypes)
// got all paths
if requested == storiface.FTNone {
break
}
}
}
return false, false, nil
return requested == storiface.FTNone, false, nil
}
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {

View File

@ -72,7 +72,8 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return false, false, xerrors.Errorf("finding best dest storage: %w", err)
}
var ok bool
var ok, pref bool
requested := s.alloc
for _, info := range best {
if n, has := workerPaths[info.ID]; has {
@ -83,12 +84,19 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
// either a no-op because the sector is already in the correct path,
// or the move a local move.
if n > 0 {
return true, true, nil
pref = true
}
requested = requested.SubAllowed(info.AllowTypes, info.DenyTypes)
// got all paths
if requested == storiface.FTNone {
break
}
}
}
return ok && s.allowRemote, false, nil
return (ok && s.allowRemote) || pref, pref, nil
}
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {

View File

@ -24,6 +24,13 @@ const (
FTNone SectorFileType = 0
)
var FTAll = func() (out SectorFileType) {
for _, pathType := range PathTypes {
out |= pathType
}
return out
}()
const FSOverheadDen = 10
var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
@ -46,6 +53,23 @@ var FsOverheadFinalized = map[SectorFileType]int{
type SectorFileType int
func TypeFromString(s string) (SectorFileType, error) {
switch s {
case "unsealed":
return FTUnsealed, nil
case "sealed":
return FTSealed, nil
case "cache":
return FTCache, nil
case "update":
return FTUpdate, nil
case "update-cache":
return FTUpdateCache, nil
default:
return 0, xerrors.Errorf("unknown sector file type '%s'", s)
}
}
func (t SectorFileType) String() string {
switch t {
case FTUnsealed:
@ -63,6 +87,18 @@ func (t SectorFileType) String() string {
}
}
func (t SectorFileType) Strings() []string {
var out []string
for _, fileType := range PathTypes {
if fileType&t == 0 {
continue
}
out = append(out, fileType.String())
}
return out
}
func (t SectorFileType) Has(singleType SectorFileType) bool {
return t&singleType == singleType
}
@ -85,6 +121,43 @@ func (t SectorFileType) SealSpaceUse(ssize abi.SectorSize) (uint64, error) {
return need, nil
}
func (t SectorFileType) SubAllowed(allowTypes []string, denyTypes []string) SectorFileType {
var denyMask SectorFileType // 1s deny
if len(allowTypes) > 0 {
denyMask = ^denyMask
for _, allowType := range allowTypes {
pt, err := TypeFromString(allowType)
if err != nil {
// we've told the user about this already, don't spam logs and ignore
continue
}
denyMask = denyMask & (^pt) // unset allowed types
}
}
for _, denyType := range denyTypes {
pt, err := TypeFromString(denyType)
if err != nil {
// we've told the user about this already, don't spam logs and ignore
continue
}
denyMask |= pt
}
return t & denyMask
}
func (t SectorFileType) AnyAllowed(allowTypes []string, denyTypes []string) bool {
return t.SubAllowed(allowTypes, denyTypes) != t
}
func (t SectorFileType) Allowed(allowTypes []string, denyTypes []string) bool {
return t.SubAllowed(allowTypes, denyTypes) == 0
}
func (t SectorFileType) StoreSpaceUse(ssize abi.SectorSize) (uint64, error) {
var need uint64
for _, pathType := range PathTypes {

View File

@ -0,0 +1,39 @@
package storiface
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestFileTypeAllow(t *testing.T) {
// no filters = allow all
require.True(t, FTCache.Allowed(nil, nil))
// allow allows matching type
require.True(t, FTCache.Allowed((FTCache).Strings(), nil))
// deny denies matching type
require.False(t, FTCache.Allowed(nil, (FTCache).Strings()))
// deny has precedence over allow
require.False(t, FTCache.Allowed((FTCache).Strings(), (FTCache).Strings()))
// deny allows non-matching types
require.True(t, FTUnsealed.Allowed(nil, (FTCache).Strings()))
// allow denies non-matching types
require.False(t, FTUnsealed.Allowed((FTCache).Strings(), nil))
}
func TestFileTypeAnyAllow(t *testing.T) {
// no filters = allow all
require.True(t, FTCache.AnyAllowed(nil, nil))
// one denied
require.False(t, FTCache.AnyAllowed(nil, (FTCache).Strings()))
require.True(t, FTCache.AnyAllowed(nil, (FTUnsealed).Strings()))
// one denied, one allowed = allowed
require.True(t, (FTCache|FTUpdateCache).AnyAllowed(nil, (FTCache).Strings()))
}

View File

@ -36,16 +36,55 @@ func ParseIDList(s string) IDList {
type Group = string
type StorageInfo struct {
ID ID
URLs []string // TODO: Support non-http transports
Weight uint64
// ID is the UUID of the storage path
ID ID
// URLs for remote access
URLs []string // TODO: Support non-http transports
// Storage path weight; higher number means that the path will be preferred more often
Weight uint64
// MaxStorage is the number of bytes allowed to be used by files in the
// storage path
MaxStorage uint64
CanSeal bool
// CanStore is true when the path is allowed to be used for io-intensive
// sealing operations
CanSeal bool
// CanStore is true when the path is allowed to be used for long-term storage
CanStore bool
Groups []Group
// Groups is the list of path groups this path belongs to
Groups []Group
// AllowTo is the list of paths to which data from this path can be moved to
AllowTo []Group
// AllowTypes lists sector file types which are allowed to be put into this
// path. If empty, all file types are allowed.
//
// Valid values:
// - "unsealed"
// - "sealed"
// - "cache"
// - "update"
// - "update-cache"
// Any other value will generate a warning and be ignored.
AllowTypes []string
// DenyTypes lists sector file types which aren't allowed to be put into this
// path.
//
// Valid values:
// - "unsealed"
// - "sealed"
// - "cache"
// - "update"
// - "update-cache"
// Any other value will generate a warning and be ignored.
DenyTypes []string
}
type HealthReport struct {
@ -63,6 +102,9 @@ type SectorStorageInfo struct {
CanStore bool
Primary bool
AllowTypes []string
DenyTypes []string
}
type Decl struct {