From c57c0e7f551a02dd39f4c35332f28cc7d6cbc1a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 20 Mar 2020 23:30:17 +0100 Subject: [PATCH] workers: Memory based scheduling --- api/api_worker.go | 18 +- api/apistruct/struct.go | 5 + cmd/lotus-bench/main.go | 4 +- go.mod | 1 + go.sum | 11 ++ node/impl/storminer.go | 2 +- storage/sealmgr/advmgr/manager.go | 158 ++++++++++++---- storage/sealmgr/advmgr/resources.go | 135 ++++++++++++++ storage/sealmgr/advmgr/sched.go | 239 +++++++++++++++++++++++++ storage/sealmgr/advmgr/worker_local.go | 38 ++++ 10 files changed, 575 insertions(+), 36 deletions(-) create mode 100644 storage/sealmgr/advmgr/resources.go create mode 100644 storage/sealmgr/advmgr/sched.go diff --git a/api/api_worker.go b/api/api_worker.go index bafd6ca83..7a0004656 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -2,12 +2,12 @@ package api import ( "context" - "github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/storage/sealmgr" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" ) type WorkerApi interface { @@ -16,6 +16,22 @@ type WorkerApi interface { TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight Paths(context.Context) ([]stores.StoragePath, error) + Info(context.Context) (WorkerInfo, error) storage.Sealer } + +type WorkerResources struct { + MemPhysical uint64 + MemSwap uint64 + + MemReserved uint64 // Used by system / other processes + + GPUs []string +} + +type WorkerInfo struct { + Hostname string + + Resources WorkerResources +} diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index eb833db86..059f2e79d 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -205,6 +205,7 @@ type WorkerStruct struct { TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"` Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"` + Info func(context.Context) (api.WorkerInfo, error) `perm:"admin"` SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"` @@ -724,6 +725,10 @@ func (w *WorkerStruct) Paths(ctx context.Context) ([]stores.StoragePath, error) return w.Internal.Paths(ctx) } +func (w *WorkerStruct) Info(ctx context.Context) (api.WorkerInfo, error) { + return w.Internal.Info(ctx) +} + func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces) } diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go index 54a8a7c36..a4e52a2df 100644 --- a/cmd/lotus-bench/main.go +++ b/cmd/lotus-bench/main.go @@ -137,11 +137,11 @@ func main() { if err != nil { return err } - defer func() { + /*defer func() { if err := os.RemoveAll(tsdir); err != nil { log.Warn("remove all: ", err) } - }() + }()*/ sbdir = tsdir } else { exp, err := homedir.Expand(robench) diff --git a/go.mod b/go.mod index bfa28557b..1233a04d9 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/coreos/go-systemd/v22 v22.0.0 github.com/docker/go-units v0.4.0 + 
github.com/elastic/go-sysinfo v1.3.0 github.com/filecoin-project/chain-validation v0.0.6-0.20200318065243-0ccb5ec3afc5 github.com/filecoin-project/filecoin-ffi v0.0.0-20200304181354-4446ff8a1bb9 github.com/filecoin-project/go-address v0.0.2-0.20200218010043-eb9bb40ed5be diff --git a/go.sum b/go.sum index 7e4c8b895..2b1d2a531 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,10 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elastic/go-sysinfo v1.3.0 h1:eb2XFGTMlSwG/yyU9Y8jVAYLIzU2sFzWXwo2gmetyrE= +github.com/elastic/go-sysinfo v1.3.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= +github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY= +github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= github.com/fatih/color v1.8.0 h1:5bzFgL+oy7JITMTxUPJ00n7VxmYd/PdMp5mHFX40/RY= github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGjnw8= github.com/filecoin-project/chain-validation v0.0.6-0.20200318065243-0ccb5ec3afc5 h1:cr9+8iX+u9fDV53MWqqZw820EyeWVX+h/HCz56JUWb0= @@ -363,6 +367,8 @@ github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZl github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= +github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= @@ -702,6 +708,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 h1:/K3IL0Z1quvmJ7X0A1AwNEK7CRkVK3YwfOU/QAL4WGg= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190425082905-87a4384529e0 h1:c8R11WC8m7KNMkTv/0+Be8vvwo4I3/Ut9AC2FW8fX3U= +github.com/prometheus/procfs v0.0.0-20190425082905-87a4384529e0/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= @@ -887,6 +895,7 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191210023423-ac6580df4449 h1:gSbV7h1NRL2G1xTg/owz62CST1oJBmxy4QpMMregXVQ= golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -955,5 +964,7 @@ honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= +howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqpnJdvzuiHsl/dnxl11M= +howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0= launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54= launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM= diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b260096b7..45fb726e2 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -155,7 +155,7 @@ func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error log.Infof("Connected to a remote worker at %s", url) - return sm.StorageMgr.AddWorker(w) + return sm.StorageMgr.AddWorker(ctx, w) } func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { diff --git a/storage/sealmgr/advmgr/manager.go b/storage/sealmgr/advmgr/manager.go index b6228bf01..3846baa42 100644 --- a/storage/sealmgr/advmgr/manager.go +++ b/storage/sealmgr/advmgr/manager.go @@ -1,6 +1,7 @@ package advmgr import ( + "container/list" "context" "io" "net/http" @@ -32,11 +33,14 @@ type Worker interface { // Returns paths accessible to the worker Paths(context.Context) ([]stores.StoragePath, error) + + Info(context.Context) (api.WorkerInfo, error) } +type workerID uint64 + type Manager struct { - workers []Worker - scfg *sectorbuilder.Config + scfg *sectorbuilder.Config ls stores.LocalStorage storage *stores.Remote @@ -46,7 +50,16 @@ type Manager struct { storage2.Prover - lk sync.Mutex + workersLk sync.Mutex + nextWorker workerID + workers map[workerID]*workerHandle + + newWorkers chan *workerHandle + schedule chan *workerRequest + workerFree chan workerID + closing chan struct{} + + schedQueue list.List // List[*workerRequest] } func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) { @@ -68,12 +81,6 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi stor := stores.NewRemote(lstor, si, headers) m := &Manager{ - workers: []Worker{ - NewLocalWorker(WorkerConfig{ - SealProof: cfg.SealProofType, - TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece, sealmgr.TTCommit1, sealmgr.TTFinalize}, - }, stor, lstor, si), - }, scfg: cfg, ls: ls, @@ -82,9 +89,27 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi remoteHnd: &stores.FetchHandler{Store: lstor}, index: si, + nextWorker: 0, + workers: 
map[workerID]*workerHandle{}, + + newWorkers: make(chan *workerHandle), + schedule: make(chan *workerRequest), + workerFree: make(chan workerID), + closing: make(chan struct{}), + Prover: prover, } + go m.runSched() + + err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ + SealProof: cfg.SealProofType, + TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece, sealmgr.TTCommit1, sealmgr.TTFinalize}, + }, stor, lstor, si)) + if err != nil { + return nil, xerrors.Errorf("adding local worker: %w", err) + } + return m, nil } @@ -106,11 +131,16 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error { return nil } -func (m *Manager) AddWorker(w Worker) error { - m.lk.Lock() - defer m.lk.Unlock() +func (m *Manager) AddWorker(ctx context.Context, w Worker) error { + info, err := w.Info(ctx) + if err != nil { + return xerrors.Errorf("getting worker info: %w", err) + } - m.workers = append(m.workers, w) + m.newWorkers <- &workerHandle{ + w: w, + info: info, + } return nil } @@ -127,12 +157,15 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, secto panic("implement me") } -func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageInfo) ([]Worker, map[int]stores.StorageInfo) { - var workers []Worker - paths := map[int]stores.StorageInfo{} +func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageInfo) ([]workerID, map[workerID]stores.StorageInfo) { + m.workersLk.Lock() + defer m.workersLk.Unlock() + + var workers []workerID + paths := map[workerID]stores.StorageInfo{} for i, worker := range m.workers { - tt, err := worker.TaskTypes(context.TODO()) + tt, err := worker.w.TaskTypes(context.TODO()) if err != nil { log.Errorf("error getting supported worker task types: %+v", err) continue @@ -142,7 +175,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor continue } - phs, err := worker.Paths(context.TODO()) + phs, err := worker.w.Paths(context.TODO()) if err != nil { log.Errorf("error getting worker paths: %+v", err) continue @@ -169,12 +202,39 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor } paths[i] = *st - workers = append(workers, worker) + workers = append(workers, i) } return workers, paths } +func (m *Manager) getWorker(ctx context.Context, taskType sealmgr.TaskType, accept []workerID) (Worker, func(), error) { + ret := make(chan workerResponse) + + select { + case m.schedule <- &workerRequest{ + taskType: taskType, + accept: accept, + + cancel: ctx.Done(), + ret: ret, + }: + case <-m.closing: + return nil, nil, xerrors.New("closing") + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + + select { + case resp := <-ret: + return resp.worker, resp.done, resp.err + case <-m.closing: + return nil, nil, xerrors.New("closing") + case <-ctx.Done(): + return nil, nil, ctx.Err() + } +} + func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error { log.Warnf("stub NewSector") return nil @@ -201,9 +261,15 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.New("no worker found") } + worker, done, err := m.getWorker(ctx, sealmgr.TTAddPiece, candidateWorkers) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + // TODO: select(candidateWorkers, ...) 
// TODO: remove the sectorbuilder abstraction, pass path directly - return candidateWorkers[0].AddPiece(ctx, sector, existingPieces, sz, r) + return worker.AddPiece(ctx, sector, existingPieces, sz, r) } func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) { @@ -216,12 +282,18 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit1, best) if len(candidateWorkers) == 0 { - return nil, xerrors.New("no suitable workers found") // TODO: wait? + return nil, xerrors.New("no suitable workers found") } + worker, done, err := m.getWorker(ctx, sealmgr.TTPreCommit1, candidateWorkers) + if err != nil { + return nil, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + // TODO: select(candidateWorkers, ...) // TODO: remove the sectorbuilder abstraction, pass path directly - return candidateWorkers[0].SealPreCommit1(ctx, sector, ticket, pieces) + return worker.SealPreCommit1(ctx, sector, ticket, pieces) } func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) { @@ -234,12 +306,18 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) if len(candidateWorkers) == 0 { - return storage2.SectorCids{}, xerrors.New("no suitable workers found") // TODO: wait? + return storage2.SectorCids{}, xerrors.New("no suitable workers found") } + worker, done, err := m.getWorker(ctx, sealmgr.TTPreCommit2, candidateWorkers) + if err != nil { + return storage2.SectorCids{}, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + // TODO: select(candidateWorkers, ...) // TODO: remove the sectorbuilder abstraction, pass path directly - return candidateWorkers[0].SealPreCommit2(ctx, sector, phase1Out) + return worker.SealPreCommit2(ctx, sector, phase1Out) } func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) { @@ -253,14 +331,24 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a return nil, xerrors.New("no suitable workers found") // TODO: wait? } + // TODO: Try very hard to execute on worker with access to the sectors + worker, done, err := m.getWorker(ctx, sealmgr.TTCommit1, candidateWorkers) + if err != nil { + return nil, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + // TODO: select(candidateWorkers, ...) 
// TODO: remove the sectorbuilder abstraction, pass path directly - return candidateWorkers[0].SealCommit1(ctx, sector, ticket, seed, pieces, cids) + return worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids) } func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) { - for _, worker := range m.workers { - tt, err := worker.TaskTypes(context.TODO()) + var candidateWorkers []workerID + + m.workersLk.Lock() + for id, worker := range m.workers { + tt, err := worker.w.TaskTypes(ctx) if err != nil { log.Errorf("error getting supported worker task types: %+v", err) continue @@ -268,11 +356,17 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou if _, ok := tt[sealmgr.TTCommit2]; !ok { continue } - - return worker.SealCommit2(ctx, sector, phase1Out) + candidateWorkers = append(candidateWorkers, id) } + m.workersLk.Unlock() - return nil, xerrors.New("no worker found") + worker, done, err := m.getWorker(ctx, sealmgr.TTCommit2, candidateWorkers) + if err != nil { + return nil, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + + return worker.SealCommit2(ctx, sector, phase1Out) } func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error { @@ -281,11 +375,11 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error return xerrors.Errorf("finding sealed sector: %w", err) } - candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTCommit1, best) + candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTFinalize, best) // TODO: Remove sector from sealing stores // TODO: Move the sector to long-term storage - return candidateWorkers[0].FinalizeSector(ctx, sector) + return m.workers[candidateWorkers[0]].w.FinalizeSector(ctx, sector) } func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { diff --git a/storage/sealmgr/advmgr/resources.go b/storage/sealmgr/advmgr/resources.go new file mode 100644 index 000000000..a80435ef9 --- /dev/null +++ b/storage/sealmgr/advmgr/resources.go @@ -0,0 +1,135 @@ +package advmgr + +import ( + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/lotus/storage/sealmgr" + "github.com/filecoin-project/specs-actors/actors/abi" +) + +var FSOverheadSeal = map[sectorbuilder.SectorFileType]int{ // 10x overheads + sectorbuilder.FTUnsealed: 10, + sectorbuilder.FTSealed: 10, + sectorbuilder.FTCache: 70, // TODO: confirm for 32G +} + +var FsOverheadFinalized = map[sectorbuilder.SectorFileType]int{ + sectorbuilder.FTUnsealed: 10, + sectorbuilder.FTSealed: 10, + sectorbuilder.FTCache: 2, +} + +type Resources struct { + MinMemory uint64 // What Must be in RAM for decent perf + MaxMemory uint64 // Mamory required (swap + ram) + + MultiThread bool + CanGPU bool + + BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads) +} + +const MaxCachingOverhead = 32 << 30 + +var ResourceTable = map[sealmgr.TaskType]map[abi.RegisteredProof]Resources{ + sealmgr.TTAddPiece: { + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ // This is probably a bit conservative + MaxMemory: 32 << 30, + MinMemory: 32 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 1 << 30, + MinMemory: 1 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + }, + sealmgr.TTPreCommit1: { + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ + MaxMemory: 64 << 30, + MinMemory: 32 << 30, 
+ + MultiThread: false, + + BaseMinMemory: 30 << 30, + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 3 << 29, // 1.5G + MinMemory: 1 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + }, + sealmgr.TTPreCommit2: { + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ + MaxMemory: 96 << 30, + MinMemory: 64 << 30, + + MultiThread: true, + + BaseMinMemory: 30 << 30, + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 3 << 29, // 1.5G + MinMemory: 1 << 30, + + MultiThread: true, + + BaseMinMemory: 1 << 30, + }, + }, + sealmgr.TTCommit1: { // Very short (~100ms), so params are very light + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ + MaxMemory: 1 << 30, + MinMemory: 1 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 1 << 30, + MinMemory: 1 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + }, + sealmgr.TTCommit2: { // TODO: Measure more accurately + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ + MaxMemory: 110 << 30, + MinMemory: 60 << 30, + + MultiThread: true, + CanGPU: true, + + BaseMinMemory: 64 << 30, // params + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 3 << 29, // 1.5G + MinMemory: 1 << 30, + + MultiThread: false, // This is fine + CanGPU: true, + + BaseMinMemory: 10 << 30, + }, + }, +} + +func init() { + // for now we just reuse params for 2kib and 8mib from 512mib + + for taskType := range ResourceTable { + ResourceTable[taskType][abi.RegisteredProof_StackedDRG8MiBSeal] = ResourceTable[taskType][abi.RegisteredProof_StackedDRG512MiBSeal] + ResourceTable[taskType][abi.RegisteredProof_StackedDRG2KiBSeal] = ResourceTable[taskType][abi.RegisteredProof_StackedDRG512MiBSeal] + } +} diff --git a/storage/sealmgr/advmgr/sched.go b/storage/sealmgr/advmgr/sched.go new file mode 100644 index 000000000..6badc7663 --- /dev/null +++ b/storage/sealmgr/advmgr/sched.go @@ -0,0 +1,239 @@ +package advmgr + +import ( + "github.com/filecoin-project/specs-actors/actors/abi" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/sealmgr" +) + +const mib = 1 << 20 + +type workerRequest struct { + taskType sealmgr.TaskType + accept []workerID // ordered by preference + + ret chan<- workerResponse + cancel <-chan struct{} +} + +type workerResponse struct { + err error + + worker Worker + done func() +} + +func (r *workerRequest) respond(resp workerResponse) { + select { + case r.ret <- resp: + case <-r.cancel: + log.Warnf("request got cancelled before we could respond") + if resp.done != nil { + resp.done() + } + } +} + +type workerHandle struct { + w Worker + + info api.WorkerInfo + + memUsedMin uint64 + memUsedMax uint64 + gpuUsed bool + cpuUse int // -1 - multicore thing; 0 - free; 1+ - singlecore things +} + +func (m *Manager) runSched() { + for { + select { + case w := <-m.newWorkers: + m.schedNewWorker(w) + case req := <-m.schedule: + resp, err := m.maybeSchedRequest(req) + if err != nil { + req.respond(workerResponse{err: err}) + continue + } + + if resp != nil { + req.respond(*resp) + continue + } + + m.schedQueue.PushBack(req) + case wid := <-m.workerFree: + m.onWorkerFreed(wid) + } + } +} + +func (m *Manager) onWorkerFreed(wid workerID) { + for e := m.schedQueue.Front(); e != nil; e = e.Next() { + req := e.Value.(*workerRequest) + var ok bool + for _, id := range req.accept { + if id == wid { + ok = true + break + } + } + if !ok { + continue + } 
+ + resp, err := m.maybeSchedRequest(req) + if err != nil { + req.respond(workerResponse{err: err}) + continue + } + + if resp != nil { + req.respond(*resp) + + pe := e.Prev() + m.schedQueue.Remove(e) + if pe == nil { + pe = m.schedQueue.Front() + } + e = pe + continue + } + } +} + +func (m *Manager) maybeSchedRequest(req *workerRequest) (*workerResponse, error) { + m.workersLk.Lock() + defer m.workersLk.Unlock() + + tried := 0 + + for _, id := range req.accept { + w, ok := m.workers[id] + if !ok { + log.Warnf("requested worker %d is not in scheduler", id) + } + tried++ + + canDo, err := m.canHandleRequest(id, w, req) + if err != nil { + return nil, err + } + + if !canDo { + continue + } + + return m.makeResponse(id, w, req), nil + } + + if tried == 0 { + return nil, xerrors.New("maybeSchedRequest didn't find any good workers") + } + + return nil, nil // put in waiting queue +} + +func (m *Manager) makeResponse(wid workerID, w *workerHandle, req *workerRequest) *workerResponse { + needRes := ResourceTable[req.taskType][m.scfg.SealProofType] + + w.gpuUsed = needRes.CanGPU + if needRes.MultiThread { + w.cpuUse = -1 + } else { + if w.cpuUse != -1 { + w.cpuUse++ + } else { + log.Warnf("sched: makeResponse for worker %d: worker cpu is in multicore use, but a single core task was scheduled", wid) + } + } + + w.memUsedMin += needRes.MinMemory + w.memUsedMax += needRes.MaxMemory + + return &workerResponse{ + err: nil, + worker: w.w, + done: func() { + m.workersLk.Lock() + + if needRes.CanGPU { + w.gpuUsed = false + } + + if needRes.MultiThread { + w.cpuUse = 0 + } else if w.cpuUse != -1 { + w.cpuUse-- + } + + w.memUsedMin -= needRes.MinMemory + w.memUsedMax -= needRes.MaxMemory + + m.workersLk.Unlock() + + select { + case m.workerFree <- wid: + case <-m.closing: + } + }, + } +} + +func (m *Manager) canHandleRequest(wid workerID, w *workerHandle, req *workerRequest) (bool, error) { + needRes, ok := ResourceTable[req.taskType][m.scfg.SealProofType] + if !ok { + return false, xerrors.Errorf("canHandleRequest: missing ResourceTable entry for %s/%d", req.taskType, m.scfg.SealProofType) + } + + res := w.info.Resources + + // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) + minNeedMem := res.MemReserved + w.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory + if minNeedMem > res.MemPhysical { + log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) + return false, nil + } + + maxNeedMem := res.MemReserved + w.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory + if m.scfg.SealProofType == abi.RegisteredProof_StackedDRG32GiBSeal { + maxNeedMem += MaxCachingOverhead + } + if maxNeedMem > res.MemSwap+res.MemPhysical { + log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) + return false, nil + } + + if needRes.MultiThread { + if w.cpuUse != 0 { + log.Debugf("sched: not scheduling on worker %d; multicore process needs free CPU", wid) + return false, nil + } + } else { + if w.cpuUse == -1 { + log.Debugf("sched: not scheduling on worker %d; CPU in use by a multicore process", wid) + return false, nil + } + } + + if len(res.GPUs) > 0 && needRes.CanGPU { + if w.gpuUsed { + log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) + return false, nil + } + } + + return true, nil +} + +func (m *Manager) schedNewWorker(w *workerHandle) { + m.workersLk.Lock() + defer 
m.workersLk.Unlock() + + id := m.nextWorker + m.workers[id] = w + m.nextWorker++ +} diff --git a/storage/sealmgr/advmgr/worker_local.go b/storage/sealmgr/advmgr/worker_local.go index 4c7ebbc26..f62ad58c9 100644 --- a/storage/sealmgr/advmgr/worker_local.go +++ b/storage/sealmgr/advmgr/worker_local.go @@ -3,11 +3,17 @@ package advmgr import ( "context" "io" + "os" + "github.com/elastic/go-sysinfo" + "golang.org/x/xerrors" + + ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" storage2 "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sealmgr/stores" @@ -157,4 +163,36 @@ func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { return l.localStore.Local(ctx) } +func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) { + hostname, err := os.Hostname() // TODO: allow overriding from config + if err != nil { + panic(err) + } + + gpus, err := ffi.GetGPUDevices() + if err != nil { + log.Errorf("getting gpu devices failed: %+v", err) + } + + h, err := sysinfo.Host() + if err != nil { + return api.WorkerInfo{}, xerrors.Errorf("getting host info: %w", err) + } + + mem, err := h.Memory() + if err != nil { + return api.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err) + } + + return api.WorkerInfo{ + Hostname: hostname, + Resources: api.WorkerResources{ + MemPhysical: mem.Total, + MemSwap: mem.VirtualTotal, + MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process + GPUs: gpus, + }, + }, nil +} + var _ Worker = &LocalWorker{}
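
A quick illustrative note (not part of the patch): the admission decision in canHandleRequest reduces to two memory checks — the minimum working set must fit in physical RAM, and the maximum allocation (plus MaxCachingOverhead for the 32GiB proof) must fit in RAM plus swap. The standalone sketch below mirrors that arithmetic with hypothetical local types; the CPU and GPU checks are left out, and the numbers are the ResourceTable values for TTPreCommit1 on a 32GiB sector.

// sketch_admission.go — illustrative only; these names are hypothetical, not from the patch.
package main

import "fmt"

type workerResources struct {
	MemPhysical uint64 // total RAM
	MemSwap     uint64 // total swap
	MemReserved uint64 // memory already used by the system / other processes
}

type taskResources struct {
	MinMemory     uint64 // must fit in RAM for decent performance
	MaxMemory     uint64 // may spill into swap
	BaseMinMemory uint64 // per-task baseline (params etc.)
}

// canFit mirrors the two memory checks in canHandleRequest: the minimum
// working set has to fit in physical RAM, and the maximum allocation
// (plus any caching overhead) has to fit in RAM + swap.
func canFit(w workerResources, memUsedMin, memUsedMax uint64, need taskResources, cachingOverhead uint64) bool {
	minNeed := w.MemReserved + memUsedMin + need.MinMemory + need.BaseMinMemory
	if minNeed > w.MemPhysical {
		return false
	}
	maxNeed := w.MemReserved + memUsedMax + need.MaxMemory + need.BaseMinMemory + cachingOverhead
	return maxNeed <= w.MemSwap+w.MemPhysical
}

func main() {
	// Hypothetical worker: 128GiB RAM, 64GiB swap, 2GiB reserved.
	w := workerResources{MemPhysical: 128 << 30, MemSwap: 64 << 30, MemReserved: 2 << 30}
	// TTPreCommit1 for a 32GiB sector, per ResourceTable.
	pc1 := taskResources{MinMemory: 32 << 30, MaxMemory: 64 << 30, BaseMinMemory: 30 << 30}
	const overhead = 32 << 30 // MaxCachingOverhead, applied for the 32GiB proof type

	fmt.Println(canFit(w, 0, 0, pc1, overhead))            // true: an idle worker takes a first PreCommit1
	fmt.Println(canFit(w, 32<<30, 64<<30, pc1, overhead))  // true: a second PreCommit1 still fits (maxNeed == RAM+swap)
	fmt.Println(canFit(w, 64<<30, 128<<30, pc1, overhead)) // false: a third one overflows RAM+swap
}

With these (hypothetical) numbers, a 128GiB/64GiB-swap worker accepts two concurrent 32GiB PreCommit1 tasks and rejects the third on the virtual-memory check.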
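
Also illustrative, not part of the patch: the cpuUse field on workerHandle encodes core ownership as -1 (a multithreaded task holds every core), 0 (idle), or N > 0 (N single-threaded tasks). A condensed and slightly simplified sketch of the claim/release bookkeeping that makeResponse and its done() callback perform:

// sketch_cpu_accounting.go — illustrative only.
package main

import "fmt"

type slots struct {
	cpuUse  int  // -1: a multithreaded task owns every core; 0: free; N>0: N single-threaded tasks
	gpuUsed bool
}

func (s *slots) claim(multiThread, wantGPU bool) {
	if multiThread {
		s.cpuUse = -1
	} else if s.cpuUse != -1 {
		s.cpuUse++
	}
	if wantGPU {
		s.gpuUsed = true
	}
}

func (s *slots) release(multiThread, wantGPU bool) {
	if multiThread {
		s.cpuUse = 0
	} else if s.cpuUse != -1 {
		s.cpuUse--
	}
	if wantGPU {
		s.gpuUsed = false
	}
}

func main() {
	var s slots
	s.claim(false, false) // e.g. TTCommit1: single-threaded, no GPU
	s.claim(false, false)
	fmt.Println(s) // {2 false}
	s.release(false, false)
	s.release(false, false)
	s.claim(true, true) // e.g. TTCommit2 for 32GiB: multithreaded, GPU
	fmt.Println(s) // {-1 true}
}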