feat: curio: Storage reservations in MoveStorage
This commit is contained in:
parent
466966c72d
commit
2ccc0db1cc
@ -489,7 +489,7 @@ afterUnsealedMove:
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef) error {
|
||||
func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef, taskID *harmonytask.TaskID) error {
|
||||
// only move the unsealed file if it still exists and needs moving
|
||||
moveUnsealed := storiface.FTUnsealed
|
||||
{
|
||||
@ -505,7 +505,24 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
|
||||
|
||||
toMove := storiface.FTCache | storiface.FTSealed | moveUnsealed
|
||||
|
||||
err := sb.sectors.storage.MoveStorage(ctx, sector, toMove)
|
||||
var opts []storiface.AcquireOption
|
||||
if taskID != nil {
|
||||
resv, ok := sb.sectors.storageReservations.Load(*taskID)
|
||||
if ok {
|
||||
defer resv.Release()
|
||||
|
||||
if resv.Alloc != storiface.FTNone {
|
||||
return xerrors.Errorf("task %d has storage reservation with alloc", taskID)
|
||||
}
|
||||
if resv.Existing != toMove|storiface.FTUnsealed {
|
||||
return xerrors.Errorf("task %d has storage reservation with different existing", taskID)
|
||||
}
|
||||
|
||||
opts = append(opts, storiface.AcquireInto(storiface.PathsWithIDs{Paths: resv.Paths, IDs: resv.PathIDs}))
|
||||
}
|
||||
}
|
||||
|
||||
err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("moving storage: %w", err)
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
|
||||
ProofType: abi.RegisteredSealProof(task.RegSealProof),
|
||||
}
|
||||
|
||||
err = m.sc.MoveStorage(ctx, sector)
|
||||
err = m.sc.MoveStorage(ctx, sector, &taskID)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("moving storage: %w", err)
|
||||
}
|
||||
@ -136,18 +136,39 @@ func (m *MoveStorageTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytas
|
||||
}
|
||||
|
||||
func (m *MoveStorageTask) TypeDetails() harmonytask.TaskTypeDetails {
|
||||
ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size
|
||||
if isDevnet {
|
||||
ssize = abi.SectorSize(2 << 20)
|
||||
}
|
||||
|
||||
return harmonytask.TaskTypeDetails{
|
||||
Max: m.max,
|
||||
Name: "MoveStorage",
|
||||
Cost: resources.Resources{
|
||||
Cpu: 1,
|
||||
Gpu: 0,
|
||||
Ram: 128 << 20,
|
||||
Cpu: 1,
|
||||
Gpu: 0,
|
||||
Ram: 128 << 20,
|
||||
Storage: m.sc.Storage(m.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, ssize, storiface.PathStorage),
|
||||
},
|
||||
MaxFailures: 10,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MoveStorageTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) {
|
||||
var refs []ffi.SectorRef
|
||||
|
||||
err := m.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_move_storage = $1`, id)
|
||||
if err != nil {
|
||||
return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err)
|
||||
}
|
||||
|
||||
if len(refs) != 1 {
|
||||
return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs))
|
||||
}
|
||||
|
||||
return refs[0], nil
|
||||
}
|
||||
|
||||
func (m *MoveStorageTask) Adder(taskFunc harmonytask.AddTaskFunc) {
|
||||
m.sp.pollers[pollerMoveStorage].Set(taskFunc)
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ type Store interface {
|
||||
RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error
|
||||
|
||||
// move sectors into storage
|
||||
MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType) error
|
||||
MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType, opts ...storiface.AcquireOption) error
|
||||
|
||||
FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error)
|
||||
|
||||
|
@ -716,22 +716,36 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ storifa
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *Local) MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType) error {
|
||||
dest, destIds, err := st.AcquireSector(ctx, s, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire dest storage: %w", err)
|
||||
func (st *Local) MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType, opts ...storiface.AcquireOption) error {
|
||||
settings := storiface.AcquireSettings{
|
||||
// If into is nil then we're expecting the data to be there already, but make sure here
|
||||
Into: nil,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&settings)
|
||||
}
|
||||
|
||||
var err error
|
||||
var dest, destIds storiface.SectorPaths
|
||||
if settings.Into == nil {
|
||||
dest, destIds, err = st.AcquireSector(ctx, s, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire dest storage: %w", err)
|
||||
}
|
||||
} else {
|
||||
// destination from settings
|
||||
dest = settings.Into.Paths
|
||||
destIds = settings.Into.IDs
|
||||
}
|
||||
|
||||
// note: this calls allocate on types - if data is already in paths of correct type,
|
||||
// the returned paths are guaranteed to be the same as dest
|
||||
src, srcIds, err := st.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire src storage: %w", err)
|
||||
}
|
||||
|
||||
for _, fileType := range storiface.PathTypes {
|
||||
if fileType&types == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, fileType := range types.AllSet() {
|
||||
sst, err := st.index.StorageInfo(ctx, storiface.ID(storiface.PathByType(srcIds, fileType)))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to get source storage info: %w", err)
|
||||
|
@ -107,17 +107,22 @@ func (mr *MockStoreMockRecorder) GenerateSingleVanillaProof(arg0, arg1, arg2, ar
|
||||
}
|
||||
|
||||
// MoveStorage mocks base method.
|
||||
func (m *MockStore) MoveStorage(arg0 context.Context, arg1 storiface.SectorRef, arg2 storiface.SectorFileType) error {
|
||||
func (m *MockStore) MoveStorage(arg0 context.Context, arg1 storiface.SectorRef, arg2 storiface.SectorFileType, arg3 ...storiface.AcquireOption) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "MoveStorage", arg0, arg1, arg2)
|
||||
varargs := []interface{}{arg0, arg1, arg2}
|
||||
for _, a := range arg3 {
|
||||
varargs = append(varargs, a)
|
||||
}
|
||||
ret := m.ctrl.Call(m, "MoveStorage", varargs...)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// MoveStorage indicates an expected call of MoveStorage.
|
||||
func (mr *MockStoreMockRecorder) MoveStorage(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
func (mr *MockStoreMockRecorder) MoveStorage(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveStorage", reflect.TypeOf((*MockStore)(nil).MoveStorage), arg0, arg1, arg2)
|
||||
varargs := append([]interface{}{arg0, arg1, arg2}, arg3...)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveStorage", reflect.TypeOf((*MockStore)(nil).MoveStorage), varargs...)
|
||||
}
|
||||
|
||||
// Remove mocks base method.
|
||||
|
@ -325,14 +325,14 @@ func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.Registe
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Remote) MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType) error {
|
||||
func (r *Remote) MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType, opts ...storiface.AcquireOption) error {
|
||||
// Make sure we have the data local
|
||||
_, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
|
||||
_, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove, opts...)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire src storage (remote): %w", err)
|
||||
}
|
||||
|
||||
return r.local.MoveStorage(ctx, s, types)
|
||||
return r.local.MoveStorage(ctx, s, types, opts...)
|
||||
}
|
||||
|
||||
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType, force bool, keepIn []storiface.ID) error {
|
||||
|
Loading…
Reference in New Issue
Block a user