fix: curio taskstorage: Don't try to free reservations by nulled TaskID (#12018)
commit db105f1cb6 (parent 1afe58dcad)
@@ -115,7 +115,7 @@ func (t *TaskStorage) HasCapacity() bool {
 	return false // no path found
 }
 
-func (t *TaskStorage) Claim(taskID int) error {
+func (t *TaskStorage) Claim(taskID int) (func() error, error) {
 	// TaskStorage Claim Attempts to reserve storage for the task
 	// A: Create a reservation for files to be allocated
 	// B: Create a reservation for existing files to be fetched into local storage
@@ -125,7 +125,7 @@ func (t *TaskStorage) Claim(taskID int) error {
 
 	sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
 	if err != nil {
-		return xerrors.Errorf("getting sector ref: %w", err)
+		return nil, xerrors.Errorf("getting sector ref: %w", err)
 	}
 
 	// storage writelock sector
@@ -147,12 +147,12 @@ func (t *TaskStorage) Claim(taskID int) error {
 
 	if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, requestedTypes); err != nil {
 		// timer will expire
-		return xerrors.Errorf("claim StorageLock: %w", err)
+		return nil, xerrors.Errorf("claim StorageLock: %w", err)
 	}
 
 	if !lockAcquireTimer.Stop() {
 		// timer expired, so lkctx is done, and that means the lock was acquired and dropped..
-		return xerrors.Errorf("failed to acquire lock")
+		return nil, xerrors.Errorf("failed to acquire lock")
 	}
 	defer func() {
 		// make sure we release the sector lock
@@ -166,13 +166,13 @@ func (t *TaskStorage) Claim(taskID int) error {
 	// paths to be used.
 	pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, t.pathType, storiface.AcquireMove)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	// reserve the space
 	release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.MinFreeStoragePercentage)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	var releaseOnce sync.Once
@@ -197,19 +197,16 @@ func (t *TaskStorage) Claim(taskID int) error {
 	// note: we drop the sector writelock on return; THAT IS INTENTIONAL, this code runs in CanAccept, which doesn't
 	// guarantee that the work for this sector will happen on this node; SDR CanAccept just ensures that the node can
 	// run the job, harmonytask is what ensures that only one SDR runs at a time
-	return nil
+	return func() error {
+		return t.markComplete(taskID, sectorRef)
+	}, nil
 }
 
-func (t *TaskStorage) MarkComplete(taskID int) error {
+func (t *TaskStorage) markComplete(taskID int, sectorRef SectorRef) error {
 	// MarkComplete is ALWAYS called after the task is done or not scheduled
 	// If Claim is called and returns without errors, MarkComplete with the same
 	// taskID is guaranteed to eventually be called
 
-	sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
-	if err != nil {
-		return xerrors.Errorf("getting sector ref: %w", err)
-	}
-
 	sres, ok := t.sc.sectors.storageReservations.Load(harmonytask.TaskID(taskID))
 	if !ok {
 		return xerrors.Errorf("no reservation found for task %d", taskID)
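The substance of the fix is visible in the last hunk above: the release path used to re-derive the sector by calling taskToSectorRef on the task row, which fails once the scheduler has nulled the task's ID out of the database. Claim now returns a closure that captured taskID and sectorRef while they were still valid. Below is a minimal, self-contained sketch of that closure-capture pattern; the reservations type and its fields are illustrative stand-ins, not the real curio types.

package main

import (
	"fmt"
	"sync"
)

// reservations is a hypothetical reservation store keyed by task ID,
// playing the role of storageReservations in the real code.
type reservations struct {
	mu   sync.Mutex
	byID map[int]string // taskID -> reserved path (illustrative payload)
}

// claim reserves space and returns a release func bound to this claim.
func (r *reservations) claim(taskID int, path string) (func() error, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.byID[taskID]; ok {
		return nil, fmt.Errorf("task %d already has a reservation", taskID)
	}
	r.byID[taskID] = path

	// The closure captures taskID (and, in the real code, sectorRef), so
	// release never re-reads mutable task state that may have been nulled
	// or reassigned in the meantime.
	return func() error {
		r.mu.Lock()
		defer r.mu.Unlock()
		if _, ok := r.byID[taskID]; !ok {
			return fmt.Errorf("no reservation found for task %d", taskID)
		}
		delete(r.byID, taskID)
		return nil
	}, nil
}

func main() {
	r := &reservations{byID: map[int]string{}}
	release, err := r.claim(7, "/cache/sector-7")
	if err != nil {
		panic(err)
	}
	if err := release(); err != nil {
		panic(err)
	}
	fmt.Println("claimed and released without re-deriving state at release time")
}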
@@ -105,7 +105,8 @@ canAcceptAgain:
 	releaseStorage := func() {
 	}
 	if h.TaskTypeDetails.Cost.Storage != nil {
-		if err = h.TaskTypeDetails.Cost.Storage.Claim(int(*tID)); err != nil {
+		markComplete, err := h.TaskTypeDetails.Cost.Storage.Claim(int(*tID))
+		if err != nil {
 			log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err)
 
 			if len(ids) > 1 {
@@ -122,7 +123,7 @@ canAcceptAgain:
 			return false
 		}
 		releaseStorage = func() {
-			if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(int(*tID)); err != nil {
+			if err := markComplete(); err != nil {
 				log.Errorw("Could not release storage", "error", err)
 			}
 		}
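On the scheduler side, the same pattern shows up as a no-op release that is upgraded only after a successful claim. A compile-ready sketch of that wiring under the new signature follows; the Storage interface matches the diff, while tryAccept and noopStorage are simplified stand-ins for the harmonytask code.

package main

import "fmt"

// Storage matches the new interface shape from the diff.
type Storage interface {
	HasCapacity() bool
	Claim(taskID int) (func() error, error)
}

func tryAccept(s Storage, taskID int) (accepted bool, releaseStorage func()) {
	releaseStorage = func() {} // no-op until a claim actually succeeds
	if s != nil {
		markComplete, err := s.Claim(taskID)
		if err != nil {
			fmt.Println("did not accept task:", err)
			return false, releaseStorage
		}
		// Bind the release path to this exact claim; no by-taskID call
		// back into the Storage implementation is needed later.
		releaseStorage = func() {
			if err := markComplete(); err != nil {
				fmt.Println("could not release storage:", err)
			}
		}
	}
	return true, releaseStorage
}

// noopStorage is a trivial implementation for demonstration.
type noopStorage struct{}

func (noopStorage) HasCapacity() bool { return true }
func (noopStorage) Claim(taskID int) (func() error, error) {
	return func() error { return nil }, nil
}

func main() {
	ok, release := tryAccept(noopStorage{}, 42)
	fmt.Println("accepted:", ok)
	release()
}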
@@ -31,12 +31,8 @@ type Resources struct {
 type Storage interface {
 	HasCapacity() bool
 
-	// This allows some other system to claim space for this task.
-	Claim(taskID int) error
-
-	// This allows some other system to consider the task done.
-	// It's up to the caller to remove the data, if that applies.
-	MarkComplete(taskID int) error
+	// This allows some other system to claim space for this task. Returns a cleanup function
+	Claim(taskID int) (func() error, error)
 }
 type Reg struct {
 	Resources
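With MarkComplete dropped from the Storage interface, a reservation can only be released through the closure returned by the Claim that created it, so the old failure mode of looking up a reservation by a TaskID that has since been nulled can no longer occur.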