before joining with wdpost from shrenuj

This commit is contained in:
Andrew Jackson (Ajax) 2023-10-06 11:46:13 -05:00
parent 2b7732e5c8
commit 88518a34b7
4 changed files with 102 additions and 3 deletions

View File

@ -394,8 +394,14 @@ func (e *TaskEngine) bump(taskType string) {
}
}
// resourcesInUse requires workListsMutex to be already locked.
func (e *TaskEngine) resourcesInUse() resources.Resources {
func (e *TaskEngine) ResourcesAvailable() resources.Resources {
e.workAdderMutex.Lock()
defer e.workAdderMutex.Unlock()
return e.resoourcesAvailable()
}
// resoourcesAvailable requires workAdderMutex to be already locked.
func (e *TaskEngine) resoourcesAvailable() resources.Resources {
tmp := e.reg.Resources
copy(tmp.GpuRam, e.reg.Resources.GpuRam)
for _, t := range e.handlers {

View File

@ -220,7 +220,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done
}
func (h *taskTypeHandler) AssertMachineHasCapacity() error {
r := h.TaskEngine.resourcesInUse()
r := h.TaskEngine.resoourcesAvailable()
if r.Cpu-h.Cost.Cpu < 0 {
return errors.New("Did not accept " + h.Name + " task: out of cpu")

View File

@ -0,0 +1,15 @@
package tasks
func SliceIfFound[T any](slice []T, f func(T) bool) []T {
ct := 0
for i, v := range slice {
if f(v) {
slice[ct], slice[i] = slice[i], slice[ct]
ct++
}
}
if ct == 0 {
return slice
}
return slice[:ct]
}

View File

@ -0,0 +1,78 @@
package wdpost
import (
"context"
"github.com/samber/lo"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
type WindowPostTaskHandler struct {
max int // TODO read from Flags
*harmonytask.TaskEngine // TODO populate at setup time
Chain full.ChainModuleAPI
}
func New(chain full.ChainModuleAPI) *WindowPostTaskHandler {
// TODO
return &WindowPostTaskHandler{
Chain: chain,
}
}
func (wp *WindowPostTaskHandler) CanAccept(tids []harmonytask.TaskID) (*harmonytask.TaskID, error) {
// GetEpoch
ts, err := wp.Chain.ChainHead(context.Background())
if err != nil {
return nil, err
}
// TODO GetDeadline Epochs for tasks
type wdTaskDef struct {
abi.RegisteredSealProof
}
var tasks []wdTaskDef
// TODO accept those past deadline, then do the right thing in Do()
// TODO Exit nil if no disk available?
// Discard those too big for our free RAM
freeRAM := wp.TaskEngine.ResourcesAvailable().Ram
tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
return res[d.RegisteredSealProof].MaxMemory <= freeRAM
})
// TODO If Local Disk, discard others
// TODO If Shared Disk entries, discard others
// TODO Select the one closest to the deadline
// FUTURE: Be less greedy: let the best machine do the work.
// FUTURE: balance avoiding 2nd retries (3rd run)
return nil, nil
}
var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
func (wp *WindowPostTaskHandler) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WdPost",
Max: wp.max,
MaxFailures: 3,
Follows: nil,
Cost: resources.Resources{
Cpu: 1,
Gpu: 1,
// RAM of smallest proof's max is listed here
Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 {
if res[k].MaxMemory < i {
return res[k].MaxMemory
}
return i
}, 1<<63),
},
}
}