post workers: Share resource accounting code

Łukasz Magiera 2022-01-18 15:53:13 +01:00
parent 8ac20305df
commit fa09b9afb1
8 changed files with 482 additions and 77 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -3280,6 +3280,170 @@ Response:
     "aGPU 1337"
   ],
   "Resources": {
+    "post/v0/windowproof": {
+      "0": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 2048
+      },
+      "1": {
+        "MinMemory": 8388608,
+        "MaxMemory": 8388608,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 8388608
+      },
+      "2": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1610612736,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 10737418240
+      },
+      "3": {
+        "MinMemory": 32212254720,
+        "MaxMemory": 103079215104,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 34359738368
+      },
+      "4": {
+        "MinMemory": 64424509440,
+        "MaxMemory": 128849018880,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 68719476736
+      },
+      "5": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 2048
+      },
+      "6": {
+        "MinMemory": 8388608,
+        "MaxMemory": 8388608,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 8388608
+      },
+      "7": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1610612736,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 10737418240
+      },
+      "8": {
+        "MinMemory": 32212254720,
+        "MaxMemory": 103079215104,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 34359738368
+      },
+      "9": {
+        "MinMemory": 64424509440,
+        "MaxMemory": 128849018880,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 68719476736
+      }
+    },
+    "post/v0/winningproof": {
+      "0": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 2048
+      },
+      "1": {
+        "MinMemory": 8388608,
+        "MaxMemory": 8388608,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 8388608
+      },
+      "2": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 10737418240
+      },
+      "3": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1073741824,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 34359738368
+      },
+      "4": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1073741824,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 68719476736
+      },
+      "5": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 2048
+      },
+      "6": {
+        "MinMemory": 8388608,
+        "MaxMemory": 8388608,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 8388608
+      },
+      "7": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 10737418240
+      },
+      "8": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1073741824,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 34359738368
+      },
+      "9": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1073741824,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 68719476736
+      }
+    },
     "seal/v0/addpiece": {
       "0": {
         "MinMemory": 2048,

View File

@@ -109,6 +109,170 @@ Response:
     "string value"
   ],
   "Resources": {
+    "post/v0/windowproof": {
+      "0": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 2048
+      },
+      "1": {
+        "MinMemory": 8388608,
+        "MaxMemory": 8388608,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 8388608
+      },
+      "2": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1610612736,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 10737418240
+      },
+      "3": {
+        "MinMemory": 32212254720,
+        "MaxMemory": 103079215104,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 34359738368
+      },
+      "4": {
+        "MinMemory": 64424509440,
+        "MaxMemory": 128849018880,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 68719476736
+      },
+      "5": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 2048
+      },
+      "6": {
+        "MinMemory": 8388608,
+        "MaxMemory": 8388608,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 8388608
+      },
+      "7": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1610612736,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 10737418240
+      },
+      "8": {
+        "MinMemory": 32212254720,
+        "MaxMemory": 103079215104,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 34359738368
+      },
+      "9": {
+        "MinMemory": 64424509440,
+        "MaxMemory": 128849018880,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 68719476736
+      }
+    },
+    "post/v0/winningproof": {
+      "0": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 2048
+      },
+      "1": {
+        "MinMemory": 8388608,
+        "MaxMemory": 8388608,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 8388608
+      },
+      "2": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 10737418240
+      },
+      "3": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1073741824,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 34359738368
+      },
+      "4": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1073741824,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 68719476736
+      },
+      "5": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 2048
+      },
+      "6": {
+        "MinMemory": 8388608,
+        "MaxMemory": 8388608,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 8388608
+      },
+      "7": {
+        "MinMemory": 2048,
+        "MaxMemory": 2048,
+        "GPUUtilization": 1,
+        "MaxParallelism": 1,
+        "MaxParallelismGPU": 0,
+        "BaseMinMemory": 10737418240
+      },
+      "8": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1073741824,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 34359738368
+      },
+      "9": {
+        "MinMemory": 1073741824,
+        "MaxMemory": 1073741824,
+        "GPUUtilization": 1,
+        "MaxParallelism": -1,
+        "MaxParallelismGPU": 6,
+        "BaseMinMemory": 68719476736
+      }
+    },
     "seal/v0/addpiece": {
       "0": {
         "MinMemory": 2048,

View File

@@ -37,7 +37,9 @@ func (m *Manager) generateWinningPoSt(ctx context.Context, minerID abi.ActorID,
         return nil, xerrors.New("generate window post len(sectorInfo)=0")
     }
 
-    ppt, err := sectorInfo[0].SealProof.RegisteredWinningPoStProof()
+    spt := sectorInfo[0].SealProof
+
+    ppt, err := spt.RegisteredWinningPoStProof()
     if err != nil {
         return nil, err
     }
@@ -59,7 +61,7 @@
     }
 
     var proofs []proof.PoStProof
-    err = m.winningPoStSched.Schedule(ctx, false, func(ctx context.Context, w Worker) error {
+    err = m.winningPoStSched.Schedule(ctx, false, spt, func(ctx context.Context, w Worker) error {
         out, err := w.GenerateWinningPoSt(ctx, ppt, minerID, sectorChallenges, randomness)
         if err != nil {
             return err
@@ -93,7 +95,9 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
         return nil, nil, xerrors.New("generate window post len(sectorInfo)=0")
     }
 
-    ppt, err := sectorInfo[0].SealProof.RegisteredWindowPoStProof()
+    spt := sectorInfo[0].SealProof
+
+    ppt, err := spt.RegisteredWindowPoStProof()
     if err != nil {
         return nil, nil, err
     }
@@ -157,7 +161,7 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
         })
     }
 
-    p, sk, err := m.generatePartitionWindowPost(cctx, ppt, minerID, int(partIdx), sectors, randomness)
+    p, sk, err := m.generatePartitionWindowPost(cctx, spt, ppt, minerID, int(partIdx), sectors, randomness)
     if err != nil {
         retErr = multierror.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err))
         if len(sk) > 0 {
@@ -189,11 +193,11 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
     return out, skipped, retErr
 }
 
-func (m *Manager) generatePartitionWindowPost(ctx context.Context, ppt abi.RegisteredPoStProof, minerID abi.ActorID, partIndex int, sc []storiface.PostSectorChallenge, randomness abi.PoStRandomness) (proof.PoStProof, []abi.SectorID, error) {
+func (m *Manager) generatePartitionWindowPost(ctx context.Context, spt abi.RegisteredSealProof, ppt abi.RegisteredPoStProof, minerID abi.ActorID, partIndex int, sc []storiface.PostSectorChallenge, randomness abi.PoStRandomness) (proof.PoStProof, []abi.SectorID, error) {
     log.Infow("generateWindowPost", "index", partIndex)
 
     var result storiface.WindowPoStResult
-    err := m.windowPoStSched.Schedule(ctx, true, func(ctx context.Context, w Worker) error {
+    err := m.windowPoStSched.Schedule(ctx, true, spt, func(ctx context.Context, w Worker) error {
         out, err := w.GenerateWindowPoSt(ctx, ppt, minerID, sc, partIndex, randomness)
         if err != nil {
             return err

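The hunks above thread the sector's RegisteredSealProof (`spt`) into both PoSt schedulers, so the scheduler can derive a per-proof-type resource envelope instead of treating every PoSt job the same. A minimal self-contained sketch of that pattern, using simplified stand-in types rather than the lotus API:

```go
package main

import "fmt"

// Simplified stand-ins for abi.RegisteredSealProof and sealtasks.TaskType.
type SealProof int
type TaskType string

const (
	StackedDrg32GiBV1 SealProof = iota
	StackedDrg64GiBV1
)

// Resources mirrors the shape of the storiface.Resources entries in this diff.
type Resources struct {
	MinMemory, MaxMemory uint64
	MaxParallelism       int
	GPUUtilization       float64
}

// A table keyed by task type, then seal proof, like storiface.ResourceTable.
var resourceTable = map[TaskType]map[SealProof]Resources{
	"post/v0/windowproof": {
		StackedDrg32GiBV1: {MinMemory: 30 << 30, MaxMemory: 96 << 30, MaxParallelism: -1, GPUUtilization: 1.0},
		StackedDrg64GiBV1: {MinMemory: 60 << 30, MaxMemory: 120 << 30, MaxParallelism: -1, GPUUtilization: 1.0},
	},
}

// schedule receives the seal proof type alongside the task, mirroring how
// Schedule(ctx, primary, spt, work) lets the scheduler look up the resource
// envelope before choosing a worker.
func schedule(spt SealProof, task TaskType) {
	need := resourceTable[task][spt]
	fmt.Printf("%s needs %d..%d bytes of memory\n", task, need.MinMemory, need.MaxMemory)
}

func main() {
	schedule(StackedDrg32GiBV1, "post/v0/windowproof")
}
```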
View File

@@ -2,31 +2,31 @@ package sectorstorage
 
 import (
     "context"
-    "sort"
+    "math/rand"
     "sync"
     "time"
 
     xerrors "golang.org/x/xerrors"
 
+    "github.com/filecoin-project/go-state-types/abi"
+
     sealtasks "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
     "github.com/filecoin-project/lotus/extern/sector-storage/stores"
     "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
 )
 
 type poStScheduler struct {
     lk      sync.RWMutex
     workers map[storiface.WorkerID]*workerHandle
     cond    *sync.Cond
 
-    postType       sealtasks.TaskType
-    GPUUtilization float64
+    postType sealtasks.TaskType
 }
 
 func newPoStScheduler(t sealtasks.TaskType) *poStScheduler {
     ps := &poStScheduler{
         workers:  map[storiface.WorkerID]*workerHandle{},
         postType: t,
-        GPUUtilization: storiface.GPUUtilizationProof,
     }
     ps.cond = sync.NewCond(&ps.lk)
     return ps
@@ -76,7 +76,7 @@ func (ps *poStScheduler) CanSched(ctx context.Context) bool {
     return false
 }
 
-func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, work WorkerAction) error {
+func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.RegisteredSealProof, work WorkerAction) error {
     ps.lk.Lock()
     defer ps.lk.Unlock()
@@ -85,83 +85,61 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, work Worker
     }
 
     // Get workers by resource
-    canDo, accepts := ps.canHandleWorkers()
+    canDo, candidates := ps.readyWorkers(spt)
     for !canDo {
         //if primary is true, it must be dispatched to a worker
         if primary {
             ps.cond.Wait()
-            canDo, accepts = ps.canHandleWorkers()
+            canDo, candidates = ps.readyWorkers(spt)
         } else {
-            return xerrors.Errorf("cant find %s post worker", ps.postType)
+            return xerrors.Errorf("can't find %s post worker", ps.postType)
         }
     }
 
-    return ps.withResources(accepts[0], func(worker Worker) error {
+    defer func() {
+        if ps.cond != nil {
+            ps.cond.Broadcast()
+        }
+    }()
+
+    selected := candidates[0]
+    worker := ps.workers[selected.id]
+
+    return worker.active.withResources(selected.id, worker.info, selected.res, &ps.lk, func() error {
         ps.lk.Unlock()
         defer ps.lk.Lock()
 
-        return work(ctx, worker)
+        return work(ctx, worker.workerRpc)
     })
 }
 
-func (ps *poStScheduler) canHandleWorkers() (bool, []storiface.WorkerID) {
-    var accepts []storiface.WorkerID
+type candidateWorker struct {
+    id  storiface.WorkerID
+    res storiface.Resources
+}
+
+func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []candidateWorker) {
+    var accepts []candidateWorker
     //if the gpus of the worker are insufficient or its disable, it cannot be scheduled
     for wid, wr := range ps.workers {
-        if wr.active.gpuUsed >= float64(len(wr.info.Resources.GPUs)) || !wr.enabled {
+        needRes := wr.info.Resources.ResourceSpec(spt, ps.postType)
+        if !wr.active.canHandleRequest(needRes, wid, "post-readyWorkers", wr.info) {
             continue
         }
-        accepts = append(accepts, wid)
+
+        accepts = append(accepts, candidateWorker{
+            id:  wid,
+            res: needRes,
+        })
     }
 
-    freeGPU := func(i int) float64 {
-        w := ps.workers[accepts[i]]
-        return float64(len(w.info.Resources.GPUs)) - w.active.gpuUsed
-    }
-    sort.Slice(accepts[:], func(i, j int) bool {
-        return freeGPU(i) > freeGPU(j)
+    // todo: round robin or something
+    rand.Shuffle(len(accepts), func(i, j int) {
+        accepts[i], accepts[j] = accepts[j], accepts[i]
     })
 
-    if len(accepts) == 0 {
-        return false, accepts
-    }
-
-    return true, accepts
-}
-
-func (ps *poStScheduler) withResources(wid storiface.WorkerID, cb func(wid Worker) error) error {
-    ps.addResource(wid)
-    worker := ps.workers[wid].workerRpc
-    err := cb(worker)
-    ps.freeResource(wid)
-    if ps.cond != nil {
-        ps.cond.Broadcast()
-    }
-    return err
-}
-
-func (ps *poStScheduler) freeResource(wid storiface.WorkerID) {
-    if _, ok := ps.workers[wid]; !ok {
-        log.Warnf("release PoSt Worker not found worker")
-        return
-    }
-    if ps.workers[wid].active.gpuUsed > 0 {
-        ps.workers[wid].active.gpuUsed -= ps.GPUUtilization
-    }
-    return
-}
-
-func (ps *poStScheduler) addResource(wid storiface.WorkerID) {
-    ps.workers[wid].active.gpuUsed += ps.GPUUtilization
-}
+    return len(accepts) != 0, accepts
 }
 
 func (ps *poStScheduler) disable(wid storiface.WorkerID) {

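The rewritten scheduler above drops the ad-hoc GPU counting (withResources/addResource/freeResource) in favour of the shared resource accounting also used by the seal scheduler: each worker is checked against the per-proof resource spec, and the candidate list is shuffled instead of sorted by free GPUs. A self-contained sketch of that selection pattern, again with simplified stand-in types rather than the lotus API:

```go
package main

import (
	"fmt"
	"math/rand"
)

// Simplified stand-ins for the worker bookkeeping in sched_post.go.
type workerID int

type worker struct {
	freeMemory uint64
	enabled    bool
}

type candidate struct {
	id   workerID
	need uint64
}

// readyWorkers mirrors the diff's pattern: filter out workers that cannot
// hold the per-proof resource requirement right now, then shuffle the rest
// so load spreads across equally capable PoSt workers.
func readyWorkers(workers map[workerID]*worker, need uint64) (bool, []candidate) {
	var accepts []candidate
	for id, w := range workers {
		if !w.enabled || w.freeMemory < need {
			continue // cannot handle this request
		}
		accepts = append(accepts, candidate{id: id, need: need})
	}
	// A random shuffle is a cheap stand-in for round-robin selection.
	rand.Shuffle(len(accepts), func(i, j int) {
		accepts[i], accepts[j] = accepts[j], accepts[i]
	})
	return len(accepts) != 0, accepts
}

func main() {
	ws := map[workerID]*worker{
		1: {freeMemory: 128 << 30, enabled: true},
		2: {freeMemory: 8 << 30, enabled: true},
	}
	ok, cands := readyWorkers(ws, 96<<30)
	fmt.Println(ok, cands) // only worker 1 qualifies for a 96 GiB request
}
```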
View File

@@ -62,8 +62,6 @@ func (r Resources) Threads(wcpus uint64, gpus int) uint64 {
     return uint64(mp)
 }
 
-var GPUUtilizationProof float64 = 1.0 // todo use resource tablo
-
 var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
     sealtasks.TTAddPiece: {
         abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
@@ -333,6 +331,104 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
             BaseMinMemory: 0,
         },
     },
+    sealtasks.TTGenerateWindowPoSt: {
+        abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
+            MaxMemory: 120 << 30, // TODO: Confirm
+            MinMemory: 60 << 30,
+
+            MaxParallelism:    -1,
+            MaxParallelismGPU: 6,
+            GPUUtilization:    1.0,
+
+            BaseMinMemory: 64 << 30, // params
+        },
+        abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
+            MaxMemory: 96 << 30,
+            MinMemory: 30 << 30,
+
+            MaxParallelism:    -1,
+            MaxParallelismGPU: 6,
+            GPUUtilization:    1.0,
+
+            BaseMinMemory: 32 << 30, // params
+        },
+        abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
+            MaxMemory: 3 << 29, // 1.5G
+            MinMemory: 1 << 30,
+
+            MaxParallelism: 1, // This is fine
+            GPUUtilization: 1.0,
+
+            BaseMinMemory: 10 << 30,
+        },
+        abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
+            MaxMemory: 2 << 10,
+            MinMemory: 2 << 10,
+
+            MaxParallelism: 1,
+            GPUUtilization: 1.0,
+
+            BaseMinMemory: 2 << 10,
+        },
+        abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
+            MaxMemory: 8 << 20,
+            MinMemory: 8 << 20,
+
+            MaxParallelism: 1,
+            GPUUtilization: 1.0,
+
+            BaseMinMemory: 8 << 20,
+        },
+    },
+    sealtasks.TTGenerateWinningPoSt: {
+        abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
+            MaxMemory: 1 << 30,
+            MinMemory: 1 << 30,
+
+            MaxParallelism:    -1,
+            MaxParallelismGPU: 6,
+            GPUUtilization:    1.0,
+
+            BaseMinMemory: 64 << 30, // params
+        },
+        abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
+            MaxMemory: 1 << 30,
+            MinMemory: 1 << 30,
+
+            MaxParallelism:    -1,
+            MaxParallelismGPU: 6,
+            GPUUtilization:    1.0,
+
+            BaseMinMemory: 32 << 30, // params
+        },
+        abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
+            MaxMemory: 2 << 10,
+            MinMemory: 2 << 10,
+
+            MaxParallelism: 1, // This is fine
+            GPUUtilization: 1.0,
+
+            BaseMinMemory: 10 << 30,
+        },
+        abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
+            MaxMemory: 2 << 10,
+            MinMemory: 2 << 10,
+
+            MaxParallelism: 1,
+            GPUUtilization: 1.0,
+
+            BaseMinMemory: 2 << 10,
+        },
+        abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
+            MaxMemory: 8 << 20,
+            MinMemory: 8 << 20,
+
+            MaxParallelism: 1,
+            GPUUtilization: 1.0,
+
+            BaseMinMemory: 8 << 20,
+        },
+    },
 }
 
 func init() {

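The memory figures in the new TTGenerateWindowPoSt and TTGenerateWinningPoSt entries are raw byte counts written as bit shifts, and they correspond directly to the numbers in the API documentation hunks above. A quick sanity check:

```go
package main

import "fmt"

func main() {
	// The table entries above are byte counts written as shifts; these match
	// the values shown in the API documentation earlier in this commit.
	fmt.Println(uint64(96) << 30) // 103079215104 = 96 GiB (window PoSt MaxMemory, 32 GiB sectors)
	fmt.Println(uint64(3) << 29)  // 1610612736 = 1.5 GiB (window PoSt MaxMemory, 512 MiB sectors)
	fmt.Println(uint64(10) << 30) // 10737418240 = 10 GiB (BaseMinMemory for params)
}
```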
View File

@@ -80,19 +80,18 @@ func TestWindowPostWorker(t *testing.T) {
     t.Log("Waiting for post message")
     bm.Stop()
 
+    var lastPending []*types.SignedMessage
     for i := 0; i < 500; i++ {
-        n, err := client.MpoolPending(ctx, types.EmptyTSK)
+        lastPending, err = client.MpoolPending(ctx, types.EmptyTSK)
         require.NoError(t, err)
-        if len(n) > 0 {
+        if len(lastPending) > 0 {
             break
         }
         time.Sleep(40 * time.Millisecond)
     }
-
-    n, err := client.MpoolPending(ctx, types.EmptyTSK)
-    require.NoError(t, err)
-    require.Greater(t, len(n), 0)
+    require.Greater(t, len(lastPending), 0)
 
     t.Log("post message landed")