Merge pull request #11498 from filecoin-project/fix/harmony-reclaim
harmony: Fix task reclaim on restart
This commit is contained in:
commit
cf8fed9440
@ -176,7 +176,7 @@ func New(
|
|||||||
continue // not really fatal, but not great
|
continue // not really fatal, but not great
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
|
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) {
|
||||||
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
|
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -285,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(unownedTasks) > 0 {
|
if len(unownedTasks) > 0 {
|
||||||
accepted := v.considerWork("poller", unownedTasks)
|
accepted := v.considerWork(workSourcePoller, unownedTasks)
|
||||||
if accepted {
|
if accepted {
|
||||||
return // accept new work slowly and in priority order
|
return // accept new work slowly and in priority order
|
||||||
}
|
}
|
||||||
|
@ -49,6 +49,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
workSourcePoller = "poller"
|
||||||
|
workSourceRecover = "recovered"
|
||||||
|
)
|
||||||
|
|
||||||
// considerWork is called to attempt to start work on a task-id of this task type.
|
// considerWork is called to attempt to start work on a task-id of this task type.
|
||||||
// It presumes single-threaded calling, so there should not be a multi-threaded re-entry.
|
// It presumes single-threaded calling, so there should not be a multi-threaded re-entry.
|
||||||
// The only caller should be the one work poller thread. This does spin off other threads,
|
// The only caller should be the one work poller thread. This does spin off other threads,
|
||||||
@ -87,22 +92,25 @@ top:
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Can we claim the work for our hostname?
|
// if recovering we don't need to try to claim anything because those tasks are already claimed by us
|
||||||
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
|
if from != workSourceRecover {
|
||||||
if err != nil {
|
// 4. Can we claim the work for our hostname?
|
||||||
log.Error(err)
|
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
|
||||||
return false
|
if err != nil {
|
||||||
}
|
log.Error(err)
|
||||||
if ct == 0 {
|
return false
|
||||||
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
|
}
|
||||||
var tryAgain = make([]TaskID, 0, len(ids)-1)
|
if ct == 0 {
|
||||||
for _, id := range ids {
|
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
|
||||||
if id != *tID {
|
var tryAgain = make([]TaskID, 0, len(ids)-1)
|
||||||
tryAgain = append(tryAgain, id)
|
for _, id := range ids {
|
||||||
}
|
if id != *tID {
|
||||||
|
tryAgain = append(tryAgain, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ids = tryAgain
|
||||||
|
goto top
|
||||||
}
|
}
|
||||||
ids = tryAgain
|
|
||||||
goto top
|
|
||||||
}
|
}
|
||||||
|
|
||||||
h.Count.Add(1)
|
h.Count.Add(1)
|
||||||
|
Loading…
Reference in New Issue
Block a user