workers: Memory based scheduling

This commit is contained in:
Łukasz Magiera 2020-03-20 23:30:17 +01:00
parent 076cc428af
commit c57c0e7f55
10 changed files with 575 additions and 36 deletions

View File

@ -2,12 +2,12 @@ package api
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
) )
type WorkerApi interface { type WorkerApi interface {
@ -16,6 +16,22 @@ type WorkerApi interface {
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight
Paths(context.Context) ([]stores.StoragePath, error) Paths(context.Context) ([]stores.StoragePath, error)
Info(context.Context) (WorkerInfo, error)
storage.Sealer storage.Sealer
} }
type WorkerResources struct {
MemPhysical uint64
MemSwap uint64
MemReserved uint64 // Used by system / other processes
GPUs []string
}
type WorkerInfo struct {
Hostname string
Resources WorkerResources
}

View File

@ -205,6 +205,7 @@ type WorkerStruct struct {
TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"` TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"`
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"` Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
Info func(context.Context) (api.WorkerInfo, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"` SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"`
@ -724,6 +725,10 @@ func (w *WorkerStruct) Paths(ctx context.Context) ([]stores.StoragePath, error)
return w.Internal.Paths(ctx) return w.Internal.Paths(ctx)
} }
func (w *WorkerStruct) Info(ctx context.Context) (api.WorkerInfo, error) {
return w.Internal.Info(ctx)
}
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces) return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces)
} }

View File

@ -137,11 +137,11 @@ func main() {
if err != nil { if err != nil {
return err return err
} }
defer func() { /*defer func() {
if err := os.RemoveAll(tsdir); err != nil { if err := os.RemoveAll(tsdir); err != nil {
log.Warn("remove all: ", err) log.Warn("remove all: ", err)
} }
}() }()*/
sbdir = tsdir sbdir = tsdir
} else { } else {
exp, err := homedir.Expand(robench) exp, err := homedir.Expand(robench)

1
go.mod
View File

@ -11,6 +11,7 @@ require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/coreos/go-systemd/v22 v22.0.0 github.com/coreos/go-systemd/v22 v22.0.0
github.com/docker/go-units v0.4.0 github.com/docker/go-units v0.4.0
github.com/elastic/go-sysinfo v1.3.0
github.com/filecoin-project/chain-validation v0.0.6-0.20200318065243-0ccb5ec3afc5 github.com/filecoin-project/chain-validation v0.0.6-0.20200318065243-0ccb5ec3afc5
github.com/filecoin-project/filecoin-ffi v0.0.0-20200304181354-4446ff8a1bb9 github.com/filecoin-project/filecoin-ffi v0.0.0-20200304181354-4446ff8a1bb9
github.com/filecoin-project/go-address v0.0.2-0.20200218010043-eb9bb40ed5be github.com/filecoin-project/go-address v0.0.2-0.20200218010043-eb9bb40ed5be

11
go.sum
View File

@ -96,6 +96,10 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/go-sysinfo v1.3.0 h1:eb2XFGTMlSwG/yyU9Y8jVAYLIzU2sFzWXwo2gmetyrE=
github.com/elastic/go-sysinfo v1.3.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY=
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
github.com/fatih/color v1.8.0 h1:5bzFgL+oy7JITMTxUPJ00n7VxmYd/PdMp5mHFX40/RY= github.com/fatih/color v1.8.0 h1:5bzFgL+oy7JITMTxUPJ00n7VxmYd/PdMp5mHFX40/RY=
github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGjnw8= github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGjnw8=
github.com/filecoin-project/chain-validation v0.0.6-0.20200318065243-0ccb5ec3afc5 h1:cr9+8iX+u9fDV53MWqqZw820EyeWVX+h/HCz56JUWb0= github.com/filecoin-project/chain-validation v0.0.6-0.20200318065243-0ccb5ec3afc5 h1:cr9+8iX+u9fDV53MWqqZw820EyeWVX+h/HCz56JUWb0=
@ -363,6 +367,8 @@ github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZl
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
@ -702,6 +708,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 h1:/K3IL0Z1quvmJ7X0A1AwNEK7CRkVK3YwfOU/QAL4WGg= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 h1:/K3IL0Z1quvmJ7X0A1AwNEK7CRkVK3YwfOU/QAL4WGg=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190425082905-87a4384529e0 h1:c8R11WC8m7KNMkTv/0+Be8vvwo4I3/Ut9AC2FW8fX3U=
github.com/prometheus/procfs v0.0.0-20190425082905-87a4384529e0/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
@ -887,6 +895,7 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191210023423-ac6580df4449 h1:gSbV7h1NRL2G1xTg/owz62CST1oJBmxy4QpMMregXVQ= golang.org/x/sys v0.0.0-20191210023423-ac6580df4449 h1:gSbV7h1NRL2G1xTg/owz62CST1oJBmxy4QpMMregXVQ=
golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -955,5 +964,7 @@ honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqpnJdvzuiHsl/dnxl11M=
howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54= launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM= launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=

View File

@ -155,7 +155,7 @@ func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error
log.Infof("Connected to a remote worker at %s", url) log.Infof("Connected to a remote worker at %s", url)
return sm.StorageMgr.AddWorker(w) return sm.StorageMgr.AddWorker(ctx, w)
} }
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {

View File

@ -1,6 +1,7 @@
package advmgr package advmgr
import ( import (
"container/list"
"context" "context"
"io" "io"
"net/http" "net/http"
@ -32,10 +33,13 @@ type Worker interface {
// Returns paths accessible to the worker // Returns paths accessible to the worker
Paths(context.Context) ([]stores.StoragePath, error) Paths(context.Context) ([]stores.StoragePath, error)
Info(context.Context) (api.WorkerInfo, error)
} }
type workerID uint64
type Manager struct { type Manager struct {
workers []Worker
scfg *sectorbuilder.Config scfg *sectorbuilder.Config
ls stores.LocalStorage ls stores.LocalStorage
@ -46,7 +50,16 @@ type Manager struct {
storage2.Prover storage2.Prover
lk sync.Mutex workersLk sync.Mutex
nextWorker workerID
workers map[workerID]*workerHandle
newWorkers chan *workerHandle
schedule chan *workerRequest
workerFree chan workerID
closing chan struct{}
schedQueue list.List // List[*workerRequest]
} }
func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) { func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) {
@ -68,12 +81,6 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
stor := stores.NewRemote(lstor, si, headers) stor := stores.NewRemote(lstor, si, headers)
m := &Manager{ m := &Manager{
workers: []Worker{
NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType,
TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece, sealmgr.TTCommit1, sealmgr.TTFinalize},
}, stor, lstor, si),
},
scfg: cfg, scfg: cfg,
ls: ls, ls: ls,
@ -82,9 +89,27 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
remoteHnd: &stores.FetchHandler{Store: lstor}, remoteHnd: &stores.FetchHandler{Store: lstor},
index: si, index: si,
nextWorker: 0,
workers: map[workerID]*workerHandle{},
newWorkers: make(chan *workerHandle),
schedule: make(chan *workerRequest),
workerFree: make(chan workerID),
closing: make(chan struct{}),
Prover: prover, Prover: prover,
} }
go m.runSched()
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType,
TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece, sealmgr.TTCommit1, sealmgr.TTFinalize},
}, stor, lstor, si))
if err != nil {
return nil, xerrors.Errorf("adding local worker: %w", err)
}
return m, nil return m, nil
} }
@ -106,11 +131,16 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
return nil return nil
} }
func (m *Manager) AddWorker(w Worker) error { func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
m.lk.Lock() info, err := w.Info(ctx)
defer m.lk.Unlock() if err != nil {
return xerrors.Errorf("getting worker info: %w", err)
}
m.workers = append(m.workers, w) m.newWorkers <- &workerHandle{
w: w,
info: info,
}
return nil return nil
} }
@ -127,12 +157,15 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, secto
panic("implement me") panic("implement me")
} }
func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageInfo) ([]Worker, map[int]stores.StorageInfo) { func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageInfo) ([]workerID, map[workerID]stores.StorageInfo) {
var workers []Worker m.workersLk.Lock()
paths := map[int]stores.StorageInfo{} defer m.workersLk.Unlock()
var workers []workerID
paths := map[workerID]stores.StorageInfo{}
for i, worker := range m.workers { for i, worker := range m.workers {
tt, err := worker.TaskTypes(context.TODO()) tt, err := worker.w.TaskTypes(context.TODO())
if err != nil { if err != nil {
log.Errorf("error getting supported worker task types: %+v", err) log.Errorf("error getting supported worker task types: %+v", err)
continue continue
@ -142,7 +175,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor
continue continue
} }
phs, err := worker.Paths(context.TODO()) phs, err := worker.w.Paths(context.TODO())
if err != nil { if err != nil {
log.Errorf("error getting worker paths: %+v", err) log.Errorf("error getting worker paths: %+v", err)
continue continue
@ -169,12 +202,39 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor
} }
paths[i] = *st paths[i] = *st
workers = append(workers, worker) workers = append(workers, i)
} }
return workers, paths return workers, paths
} }
func (m *Manager) getWorker(ctx context.Context, taskType sealmgr.TaskType, accept []workerID) (Worker, func(), error) {
ret := make(chan workerResponse)
select {
case m.schedule <- &workerRequest{
taskType: taskType,
accept: accept,
cancel: ctx.Done(),
ret: ret,
}:
case <-m.closing:
return nil, nil, xerrors.New("closing")
case <-ctx.Done():
return nil, nil, ctx.Err()
}
select {
case resp := <-ret:
return resp.worker, resp.done, resp.err
case <-m.closing:
return nil, nil, xerrors.New("closing")
case <-ctx.Done():
return nil, nil, ctx.Err()
}
}
func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error { func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
log.Warnf("stub NewSector") log.Warnf("stub NewSector")
return nil return nil
@ -201,9 +261,15 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.New("no worker found") return abi.PieceInfo{}, xerrors.New("no worker found")
} }
worker, done, err := m.getWorker(ctx, sealmgr.TTAddPiece, candidateWorkers)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("scheduling worker: %w", err)
}
defer done()
// TODO: select(candidateWorkers, ...) // TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly // TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].AddPiece(ctx, sector, existingPieces, sz, r) return worker.AddPiece(ctx, sector, existingPieces, sz, r)
} }
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) { func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
@ -216,12 +282,18 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit1, best) candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit1, best)
if len(candidateWorkers) == 0 { if len(candidateWorkers) == 0 {
return nil, xerrors.New("no suitable workers found") // TODO: wait? return nil, xerrors.New("no suitable workers found")
} }
worker, done, err := m.getWorker(ctx, sealmgr.TTPreCommit1, candidateWorkers)
if err != nil {
return nil, xerrors.Errorf("scheduling worker: %w", err)
}
defer done()
// TODO: select(candidateWorkers, ...) // TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly // TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealPreCommit1(ctx, sector, ticket, pieces) return worker.SealPreCommit1(ctx, sector, ticket, pieces)
} }
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) { func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) {
@ -234,12 +306,18 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
if len(candidateWorkers) == 0 { if len(candidateWorkers) == 0 {
return storage2.SectorCids{}, xerrors.New("no suitable workers found") // TODO: wait? return storage2.SectorCids{}, xerrors.New("no suitable workers found")
} }
worker, done, err := m.getWorker(ctx, sealmgr.TTPreCommit2, candidateWorkers)
if err != nil {
return storage2.SectorCids{}, xerrors.Errorf("scheduling worker: %w", err)
}
defer done()
// TODO: select(candidateWorkers, ...) // TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly // TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealPreCommit2(ctx, sector, phase1Out) return worker.SealPreCommit2(ctx, sector, phase1Out)
} }
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) { func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) {
@ -253,14 +331,24 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
return nil, xerrors.New("no suitable workers found") // TODO: wait? return nil, xerrors.New("no suitable workers found") // TODO: wait?
} }
// TODO: Try very hard to execute on worker with access to the sectors
worker, done, err := m.getWorker(ctx, sealmgr.TTCommit1, candidateWorkers)
if err != nil {
return nil, xerrors.Errorf("scheduling worker: %w", err)
}
defer done()
// TODO: select(candidateWorkers, ...) // TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly // TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealCommit1(ctx, sector, ticket, seed, pieces, cids) return worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
} }
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) { func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
for _, worker := range m.workers { var candidateWorkers []workerID
tt, err := worker.TaskTypes(context.TODO())
m.workersLk.Lock()
for id, worker := range m.workers {
tt, err := worker.w.TaskTypes(ctx)
if err != nil { if err != nil {
log.Errorf("error getting supported worker task types: %+v", err) log.Errorf("error getting supported worker task types: %+v", err)
continue continue
@ -268,11 +356,17 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
if _, ok := tt[sealmgr.TTCommit2]; !ok { if _, ok := tt[sealmgr.TTCommit2]; !ok {
continue continue
} }
candidateWorkers = append(candidateWorkers, id)
}
m.workersLk.Unlock()
worker, done, err := m.getWorker(ctx, sealmgr.TTCommit2, candidateWorkers)
if err != nil {
return nil, xerrors.Errorf("scheduling worker: %w", err)
}
defer done()
return worker.SealCommit2(ctx, sector, phase1Out) return worker.SealCommit2(ctx, sector, phase1Out)
}
return nil, xerrors.New("no worker found")
} }
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error { func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
@ -281,11 +375,11 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
return xerrors.Errorf("finding sealed sector: %w", err) return xerrors.Errorf("finding sealed sector: %w", err)
} }
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTCommit1, best) candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTFinalize, best)
// TODO: Remove sector from sealing stores // TODO: Remove sector from sealing stores
// TODO: Move the sector to long-term storage // TODO: Move the sector to long-term storage
return candidateWorkers[0].FinalizeSector(ctx, sector) return m.workers[candidateWorkers[0]].w.FinalizeSector(ctx, sector)
} }
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {

View File

@ -0,0 +1,135 @@
package advmgr
import (
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/specs-actors/actors/abi"
)
var FSOverheadSeal = map[sectorbuilder.SectorFileType]int{ // 10x overheads
sectorbuilder.FTUnsealed: 10,
sectorbuilder.FTSealed: 10,
sectorbuilder.FTCache: 70, // TODO: confirm for 32G
}
var FsOverheadFinalized = map[sectorbuilder.SectorFileType]int{
sectorbuilder.FTUnsealed: 10,
sectorbuilder.FTSealed: 10,
sectorbuilder.FTCache: 2,
}
type Resources struct {
MinMemory uint64 // What Must be in RAM for decent perf
MaxMemory uint64 // Mamory required (swap + ram)
MultiThread bool
CanGPU bool
BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
}
const MaxCachingOverhead = 32 << 30
var ResourceTable = map[sealmgr.TaskType]map[abi.RegisteredProof]Resources{
sealmgr.TTAddPiece: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ // This is probably a bit conservative
MaxMemory: 32 << 30,
MinMemory: 32 << 30,
MultiThread: false,
BaseMinMemory: 1 << 30,
},
abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
MultiThread: false,
BaseMinMemory: 1 << 30,
},
},
sealmgr.TTPreCommit1: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 64 << 30,
MinMemory: 32 << 30,
MultiThread: false,
BaseMinMemory: 30 << 30,
},
abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
MaxMemory: 3 << 29, // 1.5G
MinMemory: 1 << 30,
MultiThread: false,
BaseMinMemory: 1 << 30,
},
},
sealmgr.TTPreCommit2: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 96 << 30,
MinMemory: 64 << 30,
MultiThread: true,
BaseMinMemory: 30 << 30,
},
abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
MaxMemory: 3 << 29, // 1.5G
MinMemory: 1 << 30,
MultiThread: true,
BaseMinMemory: 1 << 30,
},
},
sealmgr.TTCommit1: { // Very short (~100ms), so params are very light
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
MultiThread: false,
BaseMinMemory: 1 << 30,
},
abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
MultiThread: false,
BaseMinMemory: 1 << 30,
},
},
sealmgr.TTCommit2: { // TODO: Measure more accurately
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 110 << 30,
MinMemory: 60 << 30,
MultiThread: true,
CanGPU: true,
BaseMinMemory: 64 << 30, // params
},
abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
MaxMemory: 3 << 29, // 1.5G
MinMemory: 1 << 30,
MultiThread: false, // This is fine
CanGPU: true,
BaseMinMemory: 10 << 30,
},
},
}
func init() {
// for now we just reuse params for 2kib and 8mib from 512mib
for taskType := range ResourceTable {
ResourceTable[taskType][abi.RegisteredProof_StackedDRG8MiBSeal] = ResourceTable[taskType][abi.RegisteredProof_StackedDRG512MiBSeal]
ResourceTable[taskType][abi.RegisteredProof_StackedDRG2KiBSeal] = ResourceTable[taskType][abi.RegisteredProof_StackedDRG512MiBSeal]
}
}

View File

@ -0,0 +1,239 @@
package advmgr
import (
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
const mib = 1 << 20
type workerRequest struct {
taskType sealmgr.TaskType
accept []workerID // ordered by preference
ret chan<- workerResponse
cancel <-chan struct{}
}
type workerResponse struct {
err error
worker Worker
done func()
}
func (r *workerRequest) respond(resp workerResponse) {
select {
case r.ret <- resp:
case <-r.cancel:
log.Warnf("request got cancelled before we could respond")
if resp.done != nil {
resp.done()
}
}
}
type workerHandle struct {
w Worker
info api.WorkerInfo
memUsedMin uint64
memUsedMax uint64
gpuUsed bool
cpuUse int // -1 - multicore thing; 0 - free; 1+ - singlecore things
}
func (m *Manager) runSched() {
for {
select {
case w := <-m.newWorkers:
m.schedNewWorker(w)
case req := <-m.schedule:
resp, err := m.maybeSchedRequest(req)
if err != nil {
req.respond(workerResponse{err: err})
continue
}
if resp != nil {
req.respond(*resp)
continue
}
m.schedQueue.PushBack(req)
case wid := <-m.workerFree:
m.onWorkerFreed(wid)
}
}
}
func (m *Manager) onWorkerFreed(wid workerID) {
for e := m.schedQueue.Front(); e != nil; e = e.Next() {
req := e.Value.(*workerRequest)
var ok bool
for _, id := range req.accept {
if id == wid {
ok = true
break
}
}
if !ok {
continue
}
resp, err := m.maybeSchedRequest(req)
if err != nil {
req.respond(workerResponse{err: err})
continue
}
if resp != nil {
req.respond(*resp)
pe := e.Prev()
m.schedQueue.Remove(e)
if pe == nil {
pe = m.schedQueue.Front()
}
e = pe
continue
}
}
}
func (m *Manager) maybeSchedRequest(req *workerRequest) (*workerResponse, error) {
m.workersLk.Lock()
defer m.workersLk.Unlock()
tried := 0
for _, id := range req.accept {
w, ok := m.workers[id]
if !ok {
log.Warnf("requested worker %d is not in scheduler", id)
}
tried++
canDo, err := m.canHandleRequest(id, w, req)
if err != nil {
return nil, err
}
if !canDo {
continue
}
return m.makeResponse(id, w, req), nil
}
if tried == 0 {
return nil, xerrors.New("maybeSchedRequest didn't find any good workers")
}
return nil, nil // put in waiting queue
}
func (m *Manager) makeResponse(wid workerID, w *workerHandle, req *workerRequest) *workerResponse {
needRes := ResourceTable[req.taskType][m.scfg.SealProofType]
w.gpuUsed = needRes.CanGPU
if needRes.MultiThread {
w.cpuUse = -1
} else {
if w.cpuUse != -1 {
w.cpuUse++
} else {
log.Warnf("sched: makeResponse for worker %d: worker cpu is in multicore use, but a single core task was scheduled", wid)
}
}
w.memUsedMin += needRes.MinMemory
w.memUsedMax += needRes.MaxMemory
return &workerResponse{
err: nil,
worker: w.w,
done: func() {
m.workersLk.Lock()
if needRes.CanGPU {
w.gpuUsed = false
}
if needRes.MultiThread {
w.cpuUse = 0
} else if w.cpuUse != -1 {
w.cpuUse--
}
w.memUsedMin -= needRes.MinMemory
w.memUsedMax -= needRes.MaxMemory
m.workersLk.Unlock()
select {
case m.workerFree <- wid:
case <-m.closing:
}
},
}
}
func (m *Manager) canHandleRequest(wid workerID, w *workerHandle, req *workerRequest) (bool, error) {
needRes, ok := ResourceTable[req.taskType][m.scfg.SealProofType]
if !ok {
return false, xerrors.Errorf("canHandleRequest: missing ResourceTable entry for %s/%d", req.taskType, m.scfg.SealProofType)
}
res := w.info.Resources
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
minNeedMem := res.MemReserved + w.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
if minNeedMem > res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
return false, nil
}
maxNeedMem := res.MemReserved + w.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
if m.scfg.SealProofType == abi.RegisteredProof_StackedDRG32GiBSeal {
maxNeedMem += MaxCachingOverhead
}
if maxNeedMem > res.MemSwap+res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
return false, nil
}
if needRes.MultiThread {
if w.cpuUse != 0 {
log.Debugf("sched: not scheduling on worker %d; multicore process needs free CPU", wid)
return false, nil
}
} else {
if w.cpuUse == -1 {
log.Debugf("sched: not scheduling on worker %d; CPU in use by a multicore process", wid)
return false, nil
}
}
if len(res.GPUs) > 0 && needRes.CanGPU {
if w.gpuUsed {
log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
return false, nil
}
}
return true, nil
}
func (m *Manager) schedNewWorker(w *workerHandle) {
m.workersLk.Lock()
defer m.workersLk.Unlock()
id := m.nextWorker
m.workers[id] = w
m.nextWorker++
}

View File

@ -3,11 +3,17 @@ package advmgr
import ( import (
"context" "context"
"io" "io"
"os"
"github.com/elastic/go-sysinfo"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage" storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sealmgr/stores"
@ -157,4 +163,36 @@ func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(ctx) return l.localStore.Local(ctx)
} }
func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) {
hostname, err := os.Hostname() // TODO: allow overriding from config
if err != nil {
panic(err)
}
gpus, err := ffi.GetGPUDevices()
if err != nil {
log.Errorf("getting gpu devices failed: %+v", err)
}
h, err := sysinfo.Host()
if err != nil {
return api.WorkerInfo{}, xerrors.Errorf("getting host info: %w", err)
}
mem, err := h.Memory()
if err != nil {
return api.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
}
return api.WorkerInfo{
Hostname: hostname,
Resources: api.WorkerResources{
MemPhysical: mem.Total,
MemSwap: mem.VirtualTotal,
MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process
GPUs: gpus,
},
}, nil
}
var _ Worker = &LocalWorker{} var _ Worker = &LocalWorker{}