stores: use heartbeat info in selecting alloc storage
This commit is contained in:
parent
fc637552b7
commit
e721b8910c
@ -58,7 +58,7 @@ type localWorkerPathProvider struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
|
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
|
||||||
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing)
|
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return stores.SectorPaths{}, nil, err
|
return stores.SectorPaths{}, nil, err
|
||||||
}
|
}
|
||||||
@ -163,7 +163,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
|
|||||||
return xerrors.Errorf("removing unsealed data: %w", err)
|
return xerrors.Errorf("removing unsealed data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := l.storage.MoveStorage(ctx, sector, stores.FTSealed|stores.FTCache); err != nil {
|
if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil {
|
||||||
return xerrors.Errorf("moving sealed data to storage: %w", err)
|
return xerrors.Errorf("moving sealed data to storage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
13
resources.go
13
resources.go
@ -4,21 +4,8 @@ import (
|
|||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
|
||||||
"github.com/filecoin-project/sector-storage/sealtasks"
|
"github.com/filecoin-project/sector-storage/sealtasks"
|
||||||
"github.com/filecoin-project/sector-storage/stores"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var FSOverheadSeal = map[stores.SectorFileType]int{ // 10x overheads
|
|
||||||
stores.FTUnsealed: 10,
|
|
||||||
stores.FTSealed: 10,
|
|
||||||
stores.FTCache: 70, // TODO: confirm for 32G
|
|
||||||
}
|
|
||||||
|
|
||||||
var FsOverheadFinalized = map[stores.SectorFileType]int{
|
|
||||||
stores.FTUnsealed: 10,
|
|
||||||
stores.FTSealed: 10,
|
|
||||||
stores.FTCache: 2,
|
|
||||||
}
|
|
||||||
|
|
||||||
type Resources struct {
|
type Resources struct {
|
||||||
MinMemory uint64 // What Must be in RAM for decent perf
|
MinMemory uint64 // What Must be in RAM for decent perf
|
||||||
MaxMemory uint64 // Memory required (swap + ram)
|
MaxMemory uint64 // Memory required (swap + ram)
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
|
|
||||||
type readonlyProvider struct {
|
type readonlyProvider struct {
|
||||||
stor *stores.Local
|
stor *stores.Local
|
||||||
|
spt abi.RegisteredProof
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
|
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
|
||||||
@ -19,7 +20,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
|
|||||||
return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
|
return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
p, _, done, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing)
|
p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing)
|
||||||
|
|
||||||
return p, done, err
|
return p, done, err
|
||||||
}
|
}
|
||||||
|
6
sched.go
6
sched.go
@ -20,7 +20,7 @@ const mib = 1 << 20
|
|||||||
type WorkerAction func(ctx context.Context, w Worker) error
|
type WorkerAction func(ctx context.Context, w Worker) error
|
||||||
|
|
||||||
type WorkerSelector interface {
|
type WorkerSelector interface {
|
||||||
Ok(ctx context.Context, task sealtasks.TaskType, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task
|
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task
|
||||||
|
|
||||||
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
|
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
|
||||||
}
|
}
|
||||||
@ -178,7 +178,7 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) {
|
|||||||
for i := 0; i < sh.schedQueue.Len(); i++ {
|
for i := 0; i < sh.schedQueue.Len(); i++ {
|
||||||
req := (*sh.schedQueue)[i]
|
req := (*sh.schedQueue)[i]
|
||||||
|
|
||||||
ok, err := req.sel.Ok(req.ctx, req.taskType, w)
|
ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, w)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err)
|
log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err)
|
||||||
continue
|
continue
|
||||||
@ -212,7 +212,7 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
|
|||||||
needRes := ResourceTable[req.taskType][sh.spt]
|
needRes := ResourceTable[req.taskType][sh.spt]
|
||||||
|
|
||||||
for wid, worker := range sh.workers {
|
for wid, worker := range sh.workers {
|
||||||
ok, err := req.sel.Ok(req.ctx, req.taskType, worker)
|
ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, worker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -5,26 +5,25 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
|
||||||
"github.com/filecoin-project/sector-storage/sealtasks"
|
"github.com/filecoin-project/sector-storage/sealtasks"
|
||||||
"github.com/filecoin-project/sector-storage/stores"
|
"github.com/filecoin-project/sector-storage/stores"
|
||||||
)
|
)
|
||||||
|
|
||||||
type allocSelector struct {
|
type allocSelector struct {
|
||||||
best []stores.StorageInfo
|
index stores.SectorIndex
|
||||||
|
alloc stores.SectorFileType
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) {
|
func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) {
|
||||||
best, err := index.StorageBestAlloc(ctx, alloc, true)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &allocSelector{
|
return &allocSelector{
|
||||||
best: best,
|
index: index,
|
||||||
|
alloc: alloc,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
|
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) {
|
||||||
tasks, err := whnd.w.TaskTypes(ctx)
|
tasks, err := whnd.w.TaskTypes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
@ -43,7 +42,12 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *w
|
|||||||
have[path.ID] = struct{}{}
|
have[path.ID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, info := range s.best {
|
best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, true)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("finding best alloc storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, info := range best {
|
||||||
if _, ok := have[info.ID]; ok {
|
if _, ok := have[info.ID]; ok {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
@ -5,9 +5,10 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
|
||||||
"github.com/filecoin-project/sector-storage/sealtasks"
|
"github.com/filecoin-project/sector-storage/sealtasks"
|
||||||
"github.com/filecoin-project/sector-storage/stores"
|
"github.com/filecoin-project/sector-storage/stores"
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type existingSelector struct {
|
type existingSelector struct {
|
||||||
@ -25,7 +26,7 @@ func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector a
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
|
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) {
|
||||||
tasks, err := whnd.w.TaskTypes(ctx)
|
tasks, err := whnd.w.TaskTypes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
|
@ -5,6 +5,8 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
|
||||||
"github.com/filecoin-project/sector-storage/sealtasks"
|
"github.com/filecoin-project/sector-storage/sealtasks"
|
||||||
"github.com/filecoin-project/sector-storage/stores"
|
"github.com/filecoin-project/sector-storage/stores"
|
||||||
)
|
)
|
||||||
@ -17,7 +19,7 @@ func newTaskSelector() *taskSelector {
|
|||||||
return &taskSelector{}
|
return &taskSelector{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
|
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) {
|
||||||
tasks, err := whnd.w.TaskTypes(ctx)
|
tasks, err := whnd.w.TaskTypes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
|
@ -17,6 +17,18 @@ const (
|
|||||||
FTNone SectorFileType = 0
|
FTNone SectorFileType = 0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
|
||||||
|
FTUnsealed: 10,
|
||||||
|
FTSealed: 10,
|
||||||
|
FTCache: 70, // TODO: confirm for 32G
|
||||||
|
}
|
||||||
|
|
||||||
|
var FsOverheadFinalized = map[SectorFileType]int{
|
||||||
|
FTUnsealed: 10,
|
||||||
|
FTSealed: 10,
|
||||||
|
FTCache: 2,
|
||||||
|
}
|
||||||
|
|
||||||
type SectorFileType int
|
type SectorFileType int
|
||||||
|
|
||||||
func (t SectorFileType) String() string {
|
func (t SectorFileType) String() string {
|
||||||
@ -36,6 +48,29 @@ func (t SectorFileType) Has(singleType SectorFileType) bool {
|
|||||||
return t&singleType == singleType
|
return t&singleType == singleType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t SectorFileType) SealSpaceUse(spt abi.RegisteredProof) (uint64, error) {
|
||||||
|
ssize, err := spt.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return 0, xerrors.Errorf("getting sector size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var need uint64
|
||||||
|
for _, pathType := range PathTypes {
|
||||||
|
if !t.Has(pathType) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
oh, ok := FSOverheadSeal[pathType]
|
||||||
|
if !ok {
|
||||||
|
return 0, xerrors.Errorf("no seal overhead info for %s", pathType)
|
||||||
|
}
|
||||||
|
|
||||||
|
need += uint64(oh) * uint64(ssize) / 10
|
||||||
|
}
|
||||||
|
|
||||||
|
return need, nil
|
||||||
|
}
|
||||||
|
|
||||||
type SectorPaths struct {
|
type SectorPaths struct {
|
||||||
Id abi.SectorID
|
Id abi.SectorID
|
||||||
|
|
||||||
|
@ -68,7 +68,9 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
|
|||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, FTNone, false)
|
|
||||||
|
// passing 0 spt because we don't allocate anything
|
||||||
|
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("%+v", err)
|
log.Error("%+v", err)
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
|
@ -14,8 +14,8 @@ import (
|
|||||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
)
|
)
|
||||||
|
|
||||||
var HeartBeatInterval = 10 * time.Second
|
var HeartbeatInterval = 10 * time.Second
|
||||||
var SkippedHeartbeatThresh = HeartBeatInterval * 5
|
var SkippedHeartbeatThresh = HeartbeatInterval * 5
|
||||||
|
|
||||||
// ID identifies sector storage by UUID. One sector storage should map to one
|
// ID identifies sector storage by UUID. One sector storage should map to one
|
||||||
// filesystem, local or networked / shared by multiple machines
|
// filesystem, local or networked / shared by multiple machines
|
||||||
@ -47,7 +47,7 @@ type SectorIndex interface { // part of storage-miner api
|
|||||||
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
|
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
|
||||||
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error)
|
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error)
|
||||||
|
|
||||||
StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error)
|
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Decl struct {
|
type Decl struct {
|
||||||
@ -302,12 +302,17 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
|
|||||||
return *si.info, nil
|
return *si.info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error) {
|
func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) {
|
||||||
i.lk.RLock()
|
i.lk.RLock()
|
||||||
defer i.lk.RUnlock()
|
defer i.lk.RUnlock()
|
||||||
|
|
||||||
var candidates []storageEntry
|
var candidates []storageEntry
|
||||||
|
|
||||||
|
spaceReq, err := allocate.SealSpaceUse(spt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("estimating required space: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
for _, p := range i.stores {
|
for _, p := range i.stores {
|
||||||
if sealing && !p.info.CanSeal {
|
if sealing && !p.info.CanSeal {
|
||||||
continue
|
continue
|
||||||
@ -316,7 +321,20 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: filter out of space
|
if spaceReq > p.fsi.Available {
|
||||||
|
log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if time.Since(p.lastHeartbeat) > SkippedHeartbeatThresh {
|
||||||
|
log.Debugf("not allocating on %s, didn't receive heartbeats for %s", p.info.ID, time.Since(p.lastHeartbeat))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.heartbeatErr != nil {
|
||||||
|
log.Debugf("not allocating on %s, heartbeat error: %s", p.info.ID, p.heartbeatErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
candidates = append(candidates, *p)
|
candidates = append(candidates, *p)
|
||||||
}
|
}
|
||||||
|
@ -10,11 +10,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Store interface {
|
type Store interface {
|
||||||
AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error)
|
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error)
|
||||||
Remove(ctx context.Context, s abi.SectorID, types SectorFileType) error
|
Remove(ctx context.Context, s abi.SectorID, types SectorFileType) error
|
||||||
|
|
||||||
// move sectors into storage
|
// move sectors into storage
|
||||||
MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error
|
MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error
|
||||||
|
|
||||||
FsStat(ctx context.Context, id ID) (FsStat, error)
|
FsStat(ctx context.Context, id ID) (FsStat, error)
|
||||||
}
|
}
|
||||||
|
@ -164,7 +164,7 @@ func (st *Local) open(ctx context.Context) error {
|
|||||||
|
|
||||||
func (st *Local) reportHealth(ctx context.Context) {
|
func (st *Local) reportHealth(ctx context.Context) {
|
||||||
// randomize interval by ~10%
|
// randomize interval by ~10%
|
||||||
interval := (HeartBeatInterval*100_000 + time.Duration(rand.Int63n(10_000))) / 100_000
|
interval := (HeartbeatInterval*100_000 + time.Duration(rand.Int63n(10_000))) / 100_000
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -195,7 +195,7 @@ func (st *Local) reportHealth(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
|
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
|
||||||
if existing|allocate != existing^allocate {
|
if existing|allocate != existing^allocate {
|
||||||
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
||||||
}
|
}
|
||||||
@ -240,7 +240,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing S
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
sis, err := st.index.StorageBestAlloc(ctx, fileType, sealing)
|
sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
st.localLk.RUnlock()
|
st.localLk.RUnlock()
|
||||||
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err)
|
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err)
|
||||||
@ -352,14 +352,14 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error {
|
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
|
||||||
dest, destIds, sdone, err := st.AcquireSector(ctx, s, FTNone, types, false)
|
dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire dest storage: %w", err)
|
return xerrors.Errorf("acquire dest storage: %w", err)
|
||||||
}
|
}
|
||||||
defer sdone()
|
defer sdone()
|
||||||
|
|
||||||
src, srcIds, ddone, err := st.AcquireSector(ctx, s, types, FTNone, false)
|
src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire src storage: %w", err)
|
return xerrors.Errorf("acquire src storage: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
|
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
|
||||||
if existing|allocate != existing^allocate {
|
if existing|allocate != existing^allocate {
|
||||||
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
||||||
}
|
}
|
||||||
@ -73,7 +73,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec
|
|||||||
r.fetchLk.Unlock()
|
r.fetchLk.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
|
paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
|
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
|
||||||
}
|
}
|
||||||
@ -87,7 +87,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
|
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
done()
|
done()
|
||||||
return SectorPaths{}, SectorPaths{}, nil, err
|
return SectorPaths{}, SectorPaths{}, nil, err
|
||||||
@ -111,7 +111,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec
|
|||||||
return paths, stores, done, nil
|
return paths, stores, done, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) {
|
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) {
|
||||||
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
|
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", "", nil, err
|
return "", "", "", nil, err
|
||||||
@ -125,7 +125,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
return si[i].Weight < si[j].Weight
|
return si[i].Weight < si[j].Weight
|
||||||
})
|
})
|
||||||
|
|
||||||
apaths, ids, done, err := r.local.AcquireSector(ctx, s, FTNone, fileType, sealing)
|
apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||||
}
|
}
|
||||||
@ -203,15 +203,15 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error {
|
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
|
||||||
// Make sure we have the data local
|
// Make sure we have the data local
|
||||||
_, _, ddone, err := r.AcquireSector(ctx, s, types, FTNone, false)
|
_, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire src storage (remote): %w", err)
|
return xerrors.Errorf("acquire src storage (remote): %w", err)
|
||||||
}
|
}
|
||||||
ddone()
|
ddone()
|
||||||
|
|
||||||
return r.local.MoveStorage(ctx, s, types)
|
return r.local.MoveStorage(ctx, s, spt, types)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType) error {
|
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user