diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index 0f78ca6cd..583a536fe 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -489,7 +489,7 @@ afterUnsealedMove: return nil } -func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef) error { +func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef, taskID *harmonytask.TaskID) error { // only move the unsealed file if it still exists and needs moving moveUnsealed := storiface.FTUnsealed { @@ -505,7 +505,24 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef toMove := storiface.FTCache | storiface.FTSealed | moveUnsealed - err := sb.sectors.storage.MoveStorage(ctx, sector, toMove) + var opts []storiface.AcquireOption + if taskID != nil { + resv, ok := sb.sectors.storageReservations.Load(*taskID) + if ok { + defer resv.Release() + + if resv.Alloc != storiface.FTNone { + return xerrors.Errorf("task %d has storage reservation with alloc", taskID) + } + if resv.Existing != toMove|storiface.FTUnsealed { + return xerrors.Errorf("task %d has storage reservation with different existing", taskID) + } + + opts = append(opts, storiface.AcquireInto(storiface.PathsWithIDs{Paths: resv.Paths, IDs: resv.PathIDs})) + } + } + + err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...) if err != nil { return xerrors.Errorf("moving storage: %w", err) } diff --git a/curiosrc/seal/task_movestorage.go b/curiosrc/seal/task_movestorage.go index 9092ec829..6037a390d 100644 --- a/curiosrc/seal/task_movestorage.go +++ b/curiosrc/seal/task_movestorage.go @@ -58,7 +58,7 @@ func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) ProofType: abi.RegisteredSealProof(task.RegSealProof), } - err = m.sc.MoveStorage(ctx, sector) + err = m.sc.MoveStorage(ctx, sector, &taskID) if err != nil { return false, xerrors.Errorf("moving storage: %w", err) } @@ -136,18 +136,39 @@ func (m *MoveStorageTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytas } func (m *MoveStorageTask) TypeDetails() harmonytask.TaskTypeDetails { + ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size + if isDevnet { + ssize = abi.SectorSize(2 << 20) + } + return harmonytask.TaskTypeDetails{ Max: m.max, Name: "MoveStorage", Cost: resources.Resources{ - Cpu: 1, - Gpu: 0, - Ram: 128 << 20, + Cpu: 1, + Gpu: 0, + Ram: 128 << 20, + Storage: m.sc.Storage(m.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, ssize, storiface.PathStorage), }, MaxFailures: 10, } } +func (m *MoveStorageTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) { + var refs []ffi.SectorRef + + err := m.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_move_storage = $1`, id) + if err != nil { + return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err) + } + + if len(refs) != 1 { + return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs)) + } + + return refs[0], nil +} + func (m *MoveStorageTask) Adder(taskFunc harmonytask.AddTaskFunc) { m.sp.pollers[pollerMoveStorage].Set(taskFunc) } diff --git a/storage/paths/interface.go b/storage/paths/interface.go index 27d6ee541..d3dce8886 100644 --- a/storage/paths/interface.go +++ b/storage/paths/interface.go @@ -43,7 +43,7 @@ type Store interface { RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error // move sectors into storage - MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType) error + MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType, opts ...storiface.AcquireOption) error FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error) diff --git a/storage/paths/local.go b/storage/paths/local.go index 68999940f..ccb41c2a2 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -716,22 +716,36 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ storifa return nil } -func (st *Local) MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType) error { - dest, destIds, err := st.AcquireSector(ctx, s, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove) - if err != nil { - return xerrors.Errorf("acquire dest storage: %w", err) +func (st *Local) MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType, opts ...storiface.AcquireOption) error { + settings := storiface.AcquireSettings{ + // If into is nil then we're expecting the data to be there already, but make sure here + Into: nil, + } + for _, o := range opts { + o(&settings) } + var err error + var dest, destIds storiface.SectorPaths + if settings.Into == nil { + dest, destIds, err = st.AcquireSector(ctx, s, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove) + if err != nil { + return xerrors.Errorf("acquire dest storage: %w", err) + } + } else { + // destination from settings + dest = settings.Into.Paths + destIds = settings.Into.IDs + } + + // note: this calls allocate on types - if data is already in paths of correct type, + // the returned paths are guaranteed to be the same as dest src, srcIds, err := st.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { return xerrors.Errorf("acquire src storage: %w", err) } - for _, fileType := range storiface.PathTypes { - if fileType&types == 0 { - continue - } - + for _, fileType := range types.AllSet() { sst, err := st.index.StorageInfo(ctx, storiface.ID(storiface.PathByType(srcIds, fileType))) if err != nil { return xerrors.Errorf("failed to get source storage info: %w", err) diff --git a/storage/paths/mocks/store.go b/storage/paths/mocks/store.go index d7fa226e6..1224e6b57 100644 --- a/storage/paths/mocks/store.go +++ b/storage/paths/mocks/store.go @@ -107,17 +107,22 @@ func (mr *MockStoreMockRecorder) GenerateSingleVanillaProof(arg0, arg1, arg2, ar } // MoveStorage mocks base method. -func (m *MockStore) MoveStorage(arg0 context.Context, arg1 storiface.SectorRef, arg2 storiface.SectorFileType) error { +func (m *MockStore) MoveStorage(arg0 context.Context, arg1 storiface.SectorRef, arg2 storiface.SectorFileType, arg3 ...storiface.AcquireOption) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MoveStorage", arg0, arg1, arg2) + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MoveStorage", varargs...) ret0, _ := ret[0].(error) return ret0 } // MoveStorage indicates an expected call of MoveStorage. -func (mr *MockStoreMockRecorder) MoveStorage(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockStoreMockRecorder) MoveStorage(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveStorage", reflect.TypeOf((*MockStore)(nil).MoveStorage), arg0, arg1, arg2) + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveStorage", reflect.TypeOf((*MockStore)(nil).MoveStorage), varargs...) } // Remove mocks base method. diff --git a/storage/paths/remote.go b/storage/paths/remote.go index 8532357b4..abf8622e1 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -325,14 +325,14 @@ func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.Registe } } -func (r *Remote) MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType) error { +func (r *Remote) MoveStorage(ctx context.Context, s storiface.SectorRef, types storiface.SectorFileType, opts ...storiface.AcquireOption) error { // Make sure we have the data local - _, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + _, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove, opts...) if err != nil { return xerrors.Errorf("acquire src storage (remote): %w", err) } - return r.local.MoveStorage(ctx, s, types) + return r.local.MoveStorage(ctx, s, types, opts...) } func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType, force bool, keepIn []storiface.ID) error {