diff --git a/curiosrc/ffi/task_storage.go b/curiosrc/ffi/task_storage.go index 4bbb8e343..4cd9adffe 100644 --- a/curiosrc/ffi/task_storage.go +++ b/curiosrc/ffi/task_storage.go @@ -115,7 +115,7 @@ func (t *TaskStorage) HasCapacity() bool { return false // no path found } -func (t *TaskStorage) Claim(taskID int) error { +func (t *TaskStorage) Claim(taskID int) (func() error, error) { // TaskStorage Claim Attempts to reserve storage for the task // A: Create a reservation for files to be allocated // B: Create a reservation for existing files to be fetched into local storage @@ -125,7 +125,7 @@ func (t *TaskStorage) Claim(taskID int) error { sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID)) if err != nil { - return xerrors.Errorf("getting sector ref: %w", err) + return nil, xerrors.Errorf("getting sector ref: %w", err) } // storage writelock sector @@ -147,12 +147,12 @@ func (t *TaskStorage) Claim(taskID int) error { if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, requestedTypes); err != nil { // timer will expire - return xerrors.Errorf("claim StorageLock: %w", err) + return nil, xerrors.Errorf("claim StorageLock: %w", err) } if !lockAcquireTimer.Stop() { // timer expired, so lkctx is done, and that means the lock was acquired and dropped.. - return xerrors.Errorf("failed to acquire lock") + return nil, xerrors.Errorf("failed to acquire lock") } defer func() { // make sure we release the sector lock @@ -166,13 +166,13 @@ func (t *TaskStorage) Claim(taskID int) error { // paths to be used. 
pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, t.pathType, storiface.AcquireMove) if err != nil { - return err + return nil, err } // reserve the space release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.MinFreeStoragePercentage) if err != nil { - return err + return nil, err } var releaseOnce sync.Once @@ -197,19 +197,16 @@ func (t *TaskStorage) Claim(taskID int) error { // note: we drop the sector writelock on return; THAT IS INTENTIONAL, this code runs in CanAccept, which doesn't // guarantee that the work for this sector will happen on this node; SDR CanAccept just ensures that the node can // run the job, harmonytask is what ensures that only one SDR runs at a time - return nil + return func() error { + return t.markComplete(taskID, sectorRef) + }, nil } -func (t *TaskStorage) MarkComplete(taskID int) error { +func (t *TaskStorage) markComplete(taskID int, sectorRef SectorRef) error { // MarkComplete is ALWAYS called after the task is done or not scheduled // If Claim is called and returns without errors, MarkComplete with the same // taskID is guaranteed to eventually be called - sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID)) - if err != nil { - return xerrors.Errorf("getting sector ref: %w", err) - } - sres, ok := t.sc.sectors.storageReservations.Load(harmonytask.TaskID(taskID)) if !ok { return xerrors.Errorf("no reservation found for task %d", taskID) diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index a8c6e58b8..cfee945a6 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -105,7 +105,8 @@ canAcceptAgain: releaseStorage := func() { } if h.TaskTypeDetails.Cost.Storage != nil { - if err = h.TaskTypeDetails.Cost.Storage.Claim(int(*tID)); err != nil { + markComplete, err := 
h.TaskTypeDetails.Cost.Storage.Claim(int(*tID)) + if err != nil { log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err) if len(ids) > 1 { @@ -122,7 +123,7 @@ canAcceptAgain: return false } releaseStorage = func() { - if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(int(*tID)); err != nil { + if err := markComplete(); err != nil { log.Errorw("Could not release storage", "error", err) } } diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 33bc80d6f..9597f464e 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -31,12 +31,8 @@ type Resources struct { type Storage interface { HasCapacity() bool - // This allows some other system to claim space for this task. - Claim(taskID int) error - - // This allows some other system to consider the task done. - // It's up to the caller to remove the data, if that applies. - MarkComplete(taskID int) error + // Claim lets some other system claim space for this task. On success it returns a cleanup function that the caller invokes to release the claim. + Claim(taskID int) (func() error, error) } type Reg struct { Resources