sched: Handle workers using sessions instead of connections

Łukasz Magiera 2020-10-18 12:35:44 +02:00
parent 7ac5dc55d0
commit 8d06cca073
15 changed files with 294 additions and 273 deletions
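The change in a nutshell: the scheduler no longer identifies a worker by its RPC connection (and a per-connection Closing channel); it identifies it by a session UUID that the worker itself reports. An illustrative sketch of the resulting core types, condensed from the diffs below (task and storage methods elided):

// A worker is keyed by the session UUID it reports, not by its connection.
type WorkerID uuid.UUID

// The zero UUID is the sentinel a worker returns after a graceful shutdown.
var ClosedWorkerID = uuid.UUID{}

type Worker interface {
    Info(context.Context) (storiface.WorkerInfo, error)
    // Session returns a new UUID each time the worker process starts,
    // and ClosedWorkerID once the worker has been closed.
    Session(context.Context) (uuid.UUID, error)
    Close() error
    // ... sealing and storage methods elided ...
}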

View File

@ -5,6 +5,7 @@ import (
"context"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
@ -62,8 +63,8 @@ type StorageMiner interface {
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error
WorkerStats(context.Context) (map[uint64]storiface.WorkerStats, error)
WorkerJobs(context.Context) (map[int64][]storiface.WorkerJob, error)
WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error)
WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error)
storiface.WorkerReturn
// SealingSchedDiag dumps internal sealing scheduler state
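WorkerStats and WorkerJobs are now keyed by the worker's session UUID rather than by the old numeric worker index. A minimal consumer sketch (the helper name is hypothetical, not part of this commit; assumes an api.StorageMiner client):

// printWorkers is an illustrative helper, not part of this commit.
func printWorkers(ctx context.Context, nodeApi api.StorageMiner) error {
    stats, err := nodeApi.WorkerStats(ctx)
    if err != nil {
        return err
    }
    for id, st := range stats {
        // id is the worker's session UUID; it changes when the worker restarts.
        fmt.Printf("worker %s\thost %s\tenabled %t\n", id, st.Info.Hostname, st.Enabled)
    }
    return nil
}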

View File

@ -3,6 +3,8 @@ package api
import (
"context"
"github.com/google/uuid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -26,5 +28,5 @@ type WorkerAPI interface {
StorageAddLocal(ctx context.Context, path string) error
Closing(context.Context) (<-chan struct{}, error)
Session(context.Context) (uuid.UUID, error)
}
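On the worker API, Session replaces the Closing channel: instead of blocking on a channel that dies with the connection, the miner asks the worker for its session UUID and compares it with the one recorded at registration. A hedged sketch of that comparison (helper name is illustrative, not part of this commit):

// sameSession reports whether the worker still runs under the session it registered with.
// A different UUID means the worker restarted; the zero UUID means it shut down cleanly.
func sameSession(ctx context.Context, w api.WorkerAPI, registered uuid.UUID) (bool, error) {
    cur, err := w.Session(ctx)
    if err != nil {
        return false, err // most likely a transient RPC failure
    }
    return cur == registered, nil
}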

View File

@ -296,9 +296,9 @@ type StorageMinerStruct struct {
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm
WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"`
WorkerJobs func(context.Context) (map[int64][]storiface.WorkerJob, error) `perm:"admin"`
WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm
WorkerStats func(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"`
WorkerJobs func(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) `perm:"admin"`
ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error `perm:"admin" retry:"true"`
ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error `perm:"admin" retry:"true"`
@ -376,7 +376,7 @@ type WorkerStruct struct {
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
Closing func(context.Context) (<-chan struct{}, error) `perm:"admin"`
Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
}
}
@ -1200,11 +1200,11 @@ func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) erro
return c.Internal.WorkerConnect(ctx, url)
}
func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (map[uint64]storiface.WorkerStats, error) {
func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (map[uuid.UUID]storiface.WorkerStats, error) {
return c.Internal.WorkerStats(ctx)
}
func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[int64][]storiface.WorkerJob, error) {
func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) {
return c.Internal.WorkerJobs(ctx)
}
@ -1490,8 +1490,8 @@ func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error {
return w.Internal.StorageAddLocal(ctx, path)
}
func (w *WorkerStruct) Closing(ctx context.Context) (<-chan struct{}, error) {
return w.Internal.Closing(ctx)
func (w *WorkerStruct) Session(ctx context.Context) (uuid.UUID, error) {
return w.Internal.Session(ctx)
}
func (g GatewayStruct) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {

View File

@ -449,7 +449,7 @@ var runCmd = &cli.Command{
// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
workerApi.LocalWorker.WaitQuiet()
if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
if err := nodeApi.WorkerConnect(ctx, "http://"+address+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
cancel()
return

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/fatih/color"
"github.com/google/uuid"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
@ -53,7 +54,7 @@ var sealingWorkersCmd = &cli.Command{
}
type sortableStat struct {
id uint64
id uuid.UUID
storiface.WorkerStats
}
@ -63,7 +64,7 @@ var sealingWorkersCmd = &cli.Command{
}
sort.Slice(st, func(i, j int) bool {
return st[i].id < st[j].id
return st[i].id.String() < st[j].id.String()
})
for _, stat := range st {
@ -74,7 +75,7 @@ var sealingWorkersCmd = &cli.Command{
gpuUse = ""
}
fmt.Printf("Worker %d, host %s\n", stat.id, color.MagentaString(stat.Info.Hostname))
fmt.Printf("Worker %s, host %s\n", stat.id, color.MagentaString(stat.Info.Hostname))
var barCols = uint64(64)
cpuBars := int(stat.CpuUse * barCols / stat.Info.Resources.CPUs)
@ -140,7 +141,7 @@ var sealingJobsCmd = &cli.Command{
type line struct {
storiface.WorkerJob
wid int64
wid uuid.UUID
}
lines := make([]line, 0)
@ -165,7 +166,7 @@ var sealingJobsCmd = &cli.Command{
return lines[i].Start.Before(lines[j].Start)
})
workerHostnames := map[int64]string{}
workerHostnames := map[uuid.UUID]string{}
wst, err := nodeApi.WorkerStats(ctx)
if err != nil {
@ -173,7 +174,7 @@ var sealingJobsCmd = &cli.Command{
}
for wid, st := range wst {
workerHostnames[int64(wid)] = st.Info.Hostname
workerHostnames[wid] = st.Info.Hostname
}
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
@ -192,7 +193,7 @@ var sealingJobsCmd = &cli.Command{
dur = time.Now().Sub(l.Start).Truncate(time.Millisecond * 100).String()
}
_, _ = fmt.Fprintf(tw, "%s\t%d\t%d\t%s\t%s\t%s\t%s\n", hex.EncodeToString(l.ID.ID[10:]), l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, dur)
_, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n", hex.EncodeToString(l.ID.ID[10:]), l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, dur)
}
return tw.Flush()

View File

@ -7,6 +7,7 @@ import (
"net/http"
"sync"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
@ -40,8 +41,7 @@ type Worker interface {
Info(context.Context) (storiface.WorkerInfo, error)
// returns channel signalling worker shutdown
Closing(context.Context) (<-chan struct{}, error)
Session(context.Context) (uuid.UUID, error)
Close() error // TODO: do we need this?
}
@ -57,7 +57,8 @@ type SectorManager interface {
FaultTracker
}
type WorkerID int64
type WorkerID uuid.UUID // worker session UUID
var ClosedWorkerID = uuid.UUID{}
type Manager struct {
scfg *ffiwrapper.Config
@ -190,19 +191,7 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
}
func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
info, err := w.Info(ctx)
if err != nil {
return xerrors.Errorf("getting worker info: %w", err)
}
m.sched.newWorkers <- &workerHandle{
w: w,
info: info,
preparing: &activeResources{},
active: &activeResources{},
}
return nil
return m.sched.runWorker(ctx, w)
}
func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
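With the worker-handle channel gone, Manager.AddWorker is a thin wrapper around sched.runWorker, so registration is synchronous: fetching the worker's info or session can fail, and that error now reaches the caller. Illustrative call site (variable names are hypothetical):

// Attach a worker; an error means it was never entered into the scheduler.
if err := mgr.AddWorker(ctx, w); err != nil {
    return xerrors.Errorf("adding worker to manager: %w", err)
}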

View File

@ -2,7 +2,6 @@ package sectorstorage
import (
"context"
"fmt"
"math/rand"
"sort"
"sync"
@ -13,6 +12,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
@ -53,17 +53,13 @@ type WorkerSelector interface {
type scheduler struct {
spt abi.RegisteredSealProof
workersLk sync.RWMutex
nextWorker WorkerID
workers map[WorkerID]*workerHandle
newWorkers chan *workerHandle
watchClosing chan WorkerID
workerClosing chan WorkerID
workersLk sync.RWMutex
workers map[WorkerID]*workerHandle
schedule chan *workerRequest
windowRequests chan *schedWindowRequest
workerChange chan struct{} // worker added / changed/freed resources
workerDisable chan workerDisableReq
// owned by the sh.runSched goroutine
schedQueue *requestQueue
@ -91,6 +87,8 @@ type workerHandle struct {
wndLk sync.Mutex
activeWindows []*schedWindow
enabled bool
// for sync manager goroutine closing
cleanupStarted bool
closedMgr chan struct{}
@ -108,6 +106,12 @@ type schedWindow struct {
todo []*workerRequest
}
type workerDisableReq struct {
activeWindows []*schedWindow
wid WorkerID
done func()
}
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
@ -143,16 +147,12 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
return &scheduler{
spt: spt,
nextWorker: 0,
workers: map[WorkerID]*workerHandle{},
newWorkers: make(chan *workerHandle),
watchClosing: make(chan WorkerID),
workerClosing: make(chan WorkerID),
workers: map[WorkerID]*workerHandle{},
schedule: make(chan *workerRequest),
windowRequests: make(chan *schedWindowRequest, 20),
workerChange: make(chan struct{}, 20),
workerDisable: make(chan workerDisableReq),
schedQueue: &requestQueue{},
@ -224,21 +224,19 @@ type SchedDiagInfo struct {
func (sh *scheduler) runSched() {
defer close(sh.closed)
go sh.runWorkerWatcher()
iw := time.After(InitWait)
var initialised bool
for {
var doSched bool
var toDisable []workerDisableReq
select {
case w := <-sh.newWorkers:
sh.newWorker(w)
case wid := <-sh.workerClosing:
sh.dropWorker(wid)
case <-sh.workerChange:
doSched = true
case dreq := <-sh.workerDisable:
toDisable = append(toDisable, dreq)
doSched = true
case req := <-sh.schedule:
sh.schedQueue.Push(req)
doSched = true
@ -267,6 +265,9 @@ func (sh *scheduler) runSched() {
loop:
for {
select {
case <-sh.workerChange:
case dreq := <-sh.workerDisable:
toDisable = append(toDisable, dreq)
case req := <-sh.schedule:
sh.schedQueue.Push(req)
if sh.testSync != nil {
@ -279,6 +280,28 @@ func (sh *scheduler) runSched() {
}
}
for _, req := range toDisable {
for _, window := range req.activeWindows {
for _, request := range window.todo {
sh.schedQueue.Push(request)
}
}
openWindows := make([]*schedWindowRequest, 0, len(sh.openWindows))
for _, window := range sh.openWindows {
if window.worker != req.wid {
openWindows = append(openWindows, window)
}
}
sh.openWindows = openWindows
sh.workersLk.Lock()
sh.workers[req.wid].enabled = false
sh.workersLk.Unlock()
req.done()
}
sh.trySched()
}
@ -298,6 +321,9 @@ func (sh *scheduler) diag() SchedDiagInfo {
})
}
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
for _, window := range sh.openWindows {
out.OpenWindows = append(out.OpenWindows, window.worker)
}
@ -322,13 +348,14 @@ func (sh *scheduler) trySched() {
*/
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
windows := make([]schedWindow, len(sh.openWindows))
acceptableWindows := make([][]int, sh.schedQueue.Len())
log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
if len(sh.openWindows) == 0 {
// nothing to schedule on
return
@ -357,11 +384,16 @@ func (sh *scheduler) trySched() {
for wnd, windowRequest := range sh.openWindows {
worker, ok := sh.workers[windowRequest.worker]
if !ok {
log.Errorf("worker referenced by windowRequest not found (worker: %d)", windowRequest.worker)
log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.worker)
// TODO: How to move forward here?
continue
}
if !worker.enabled {
log.Debugw("skipping disabled worker", "worker", windowRequest.worker)
continue
}
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
continue
@ -499,21 +531,48 @@ func (sh *scheduler) trySched() {
sh.openWindows = newOpenWindows
}
func (sh *scheduler) runWorker(wid WorkerID) {
var ready sync.WaitGroup
ready.Add(1)
defer ready.Wait()
// context only used for startup
func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
info, err := w.Info(ctx)
if err != nil {
return xerrors.Errorf("getting worker info: %w", err)
}
sessID, err := w.Session(ctx)
if err != nil {
return xerrors.Errorf("getting worker session: %w", err)
}
if sessID == ClosedWorkerID {
return xerrors.Errorf("worker already closed")
}
worker := &workerHandle{
w: w,
info: info,
preparing: &activeResources{},
active: &activeResources{},
enabled: true,
closingMgr: make(chan struct{}),
closedMgr: make(chan struct{}),
}
wid := WorkerID(sessID)
sh.workersLk.Lock()
_, exist := sh.workers[wid]
if exist {
// this is ok, we're already handling this worker in a different goroutine
return nil
}
sh.workers[wid] = worker
sh.workersLk.Unlock()
go func() {
sh.workersLk.RLock()
worker, found := sh.workers[wid]
sh.workersLk.RUnlock()
ready.Done()
if !found {
panic(fmt.Sprintf("worker %d not found", wid))
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
defer close(worker.closedMgr)
@ -521,23 +580,60 @@ func (sh *scheduler) runWorker(wid WorkerID) {
taskDone := make(chan struct{}, 1)
windowsRequested := 0
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
disable := func(ctx context.Context) error {
done := make(chan struct{})
workerClosing, err := worker.w.Closing(ctx)
if err != nil {
return
// request cleanup in the main scheduler goroutine
select {
case sh.workerDisable <- workerDisableReq{
activeWindows: worker.activeWindows,
wid: wid,
done: func() {
close(done)
},
}:
case <-ctx.Done():
return ctx.Err()
case <-sh.closing:
return nil
}
// wait for cleanup to complete
select {
case <-done:
case <-ctx.Done():
return ctx.Err()
case <-sh.closing:
return nil
}
worker.activeWindows = worker.activeWindows[:0]
windowsRequested = 0
return nil
}
defer func() {
log.Warnw("Worker closing", "workerid", wid)
log.Warnw("Worker closing", "workerid", sessID)
// TODO: close / return all queued tasks
if err := disable(ctx); err != nil {
log.Warnw("failed to disable worker", "worker", wid, "error", err)
}
sh.workersLk.Lock()
delete(sh.workers, wid)
sh.workersLk.Unlock()
}()
heartbeatTimer := time.NewTicker(stores.HeartbeatInterval)
defer heartbeatTimer.Stop()
for {
// ask for more windows if we need them
for ; windowsRequested < SchedWindows; windowsRequested++ {
sh.workersLk.Lock()
enabled := worker.enabled
sh.workersLk.Unlock()
// ask for more windows if we need them (non-blocking)
for ; enabled && windowsRequested < SchedWindows; windowsRequested++ {
select {
case sh.windowRequests <- &schedWindowRequest{
worker: wid,
@ -545,33 +641,90 @@ func (sh *scheduler) runWorker(wid WorkerID) {
}:
case <-sh.closing:
return
case <-workerClosing:
return
case <-worker.closingMgr:
return
}
}
select {
case w := <-scheduledWindows:
worker.wndLk.Lock()
worker.activeWindows = append(worker.activeWindows, w)
worker.wndLk.Unlock()
case <-taskDone:
log.Debugw("task done", "workerid", wid)
case <-sh.closing:
return
case <-workerClosing:
return
case <-worker.closingMgr:
return
// wait for more windows to come in, or for tasks to get finished (blocking)
for {
// first ping the worker and check session
{
sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
curSes, err := worker.w.Session(sctx)
scancel()
if err != nil {
// Likely temporary error
log.Warnw("failed to check worker session", "error", err)
if err := disable(ctx); err != nil {
log.Warnw("failed to disable worker with session error", "worker", wid, "error", err)
}
select {
case <-heartbeatTimer.C:
continue
case w := <-scheduledWindows:
// was in flight when initially disabled, return
worker.wndLk.Lock()
worker.activeWindows = append(worker.activeWindows, w)
worker.wndLk.Unlock()
if err := disable(ctx); err != nil {
log.Warnw("failed to disable worker with session error", "worker", wid, "error", err)
}
case <-sh.closing:
return
case <-worker.closingMgr:
return
}
continue
}
if curSes != sessID {
if curSes != ClosedWorkerID {
// worker restarted
log.Warnw("worker session changed (worker restarted?)", "initial", sessID, "current", curSes)
}
return
}
// session looks good
if !enabled {
sh.workersLk.Lock()
worker.enabled = true
sh.workersLk.Unlock()
// we'll send window requests on the next loop
}
}
select {
case <-heartbeatTimer.C:
continue
case w := <-scheduledWindows:
worker.wndLk.Lock()
worker.activeWindows = append(worker.activeWindows, w)
worker.wndLk.Unlock()
case <-taskDone:
log.Debugw("task done", "workerid", wid)
case <-sh.closing:
return
case <-worker.closingMgr:
return
}
break
}
// process assigned windows (non-blocking)
sh.workersLk.RLock()
worker.wndLk.Lock()
windowsRequested -= sh.workerCompactWindows(worker, wid)
assignLoop:
// process windows in order
for len(worker.activeWindows) > 0 {
@ -622,6 +775,8 @@ func (sh *scheduler) runWorker(wid WorkerID) {
sh.workersLk.RUnlock()
}
}()
return nil
}
func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int {
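The per-worker goroutine above now drives liveness with a session heartbeat instead of waiting on a Closing channel: probe Session on each tick, disable the worker on a probe error, drop it when the session changes or reports closed, and re-enable it once a probe succeeds again. A condensed sketch of that pattern (window and task plumbing omitted; the disable/enable hooks stand in for the scheduler's workerDisableReq handling):

// heartbeatLoop is an illustrative condensation of the loop above, not the actual scheduler code.
func heartbeatLoop(ctx context.Context, w Worker, sessID uuid.UUID, closing <-chan struct{}, disable, enable func()) {
    t := time.NewTicker(stores.HeartbeatInterval)
    defer t.Stop()

    for {
        sctx, cancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
        cur, err := w.Session(sctx)
        cancel()

        switch {
        case err != nil:
            disable() // likely a temporary RPC failure: stop handing out windows, keep probing
        case cur == ClosedWorkerID:
            return // graceful shutdown: the caller drops the worker for good
        case cur != sessID:
            return // session changed: the worker restarted and will re-register under a new WorkerID
        default:
            enable() // healthy (again): windows can be requested on the next scheduler pass
        }

        select {
        case <-t.C:
        case <-closing:
            return
        case <-ctx.Done():
            return
        }
    }
}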
@ -745,38 +900,6 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
return nil
}
func (sh *scheduler) newWorker(w *workerHandle) {
w.closedMgr = make(chan struct{})
w.closingMgr = make(chan struct{})
sh.workersLk.Lock()
id := sh.nextWorker
sh.workers[id] = w
sh.nextWorker++
sh.workersLk.Unlock()
sh.runWorker(id)
select {
case sh.watchClosing <- id:
case <-sh.closing:
return
}
}
func (sh *scheduler) dropWorker(wid WorkerID) {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
w := sh.workers[wid]
sh.workerCleanup(wid, w)
delete(sh.workers, wid)
}
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
select {
case <-w.closingMgr:

View File

@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
@ -43,7 +44,7 @@ type schedTestWorker struct {
paths []stores.StoragePath
closed bool
closing chan struct{}
session uuid.UUID
}
func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
@ -121,15 +122,15 @@ func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error
}, nil
}
func (s *schedTestWorker) Closing(ctx context.Context) (<-chan struct{}, error) {
return s.closing, nil
func (s *schedTestWorker) Session(context.Context) (uuid.UUID, error) {
return s.session, nil
}
func (s *schedTestWorker) Close() error {
if !s.closed {
log.Info("close schedTestWorker")
s.closed = true
close(s.closing)
s.session = uuid.UUID{}
}
return nil
}
@ -142,7 +143,7 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
taskTypes: taskTypes,
paths: []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "food", CanSeal: true, CanStore: true}},
closing: make(chan struct{}),
session: uuid.New(),
}
for _, path := range w.paths {
@ -160,16 +161,7 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
require.NoError(t, err)
}
info, err := w.Info(context.TODO())
require.NoError(t, err)
sched.newWorkers <- &workerHandle{
w: w,
info: info,
preparing: &activeResources{},
active: &activeResources{},
}
require.NoError(t, sched.runWorker(context.TODO(), w))
}
func TestSchedStartStop(t *testing.T) {
@ -433,7 +425,7 @@ func TestSched(t *testing.T) {
type line struct {
storiface.WorkerJob
wid uint64
wid uuid.UUID
}
lines := make([]line, 0)
@ -442,7 +434,7 @@ func TestSched(t *testing.T) {
for _, job := range jobs {
lines = append(lines, line{
WorkerJob: job,
wid: uint64(wid),
wid: wid,
})
}
}
@ -537,7 +529,7 @@ func BenchmarkTrySched(b *testing.B) {
b.StopTimer()
sched := newScheduler(spt)
sched.workers[0] = &workerHandle{
sched.workers[WorkerID{}] = &workerHandle{
w: nil,
info: storiface.WorkerInfo{
Hostname: "t",
@ -549,7 +541,7 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < windows; i++ {
sched.openWindows = append(sched.openWindows, &schedWindowRequest{
worker: 0,
worker: WorkerID{},
done: make(chan *schedWindow, 1000),
})
}
@ -599,7 +591,7 @@ func TestWindowCompact(t *testing.T) {
wh.activeWindows = append(wh.activeWindows, window)
}
n := sh.workerCompactWindows(wh, 0)
n := sh.workerCompactWindows(wh, WorkerID{})
require.Equal(t, len(start)-len(expect), n)
for wi, tasks := range expect {

View File

@ -1,100 +0,0 @@
package sectorstorage
import (
"context"
"reflect"
)
func (sh *scheduler) runWorkerWatcher() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
nilch := reflect.ValueOf(new(chan struct{})).Elem()
cases := []reflect.SelectCase{
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(sh.closing),
},
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(sh.watchClosing),
},
}
caseToWorker := map[int]WorkerID{}
for {
n, rv, ok := reflect.Select(cases)
switch {
case n == 0: // sh.closing
return
case n == 1: // sh.watchClosing
if !ok {
log.Errorf("watchClosing channel closed")
return
}
wid, ok := rv.Interface().(WorkerID)
if !ok {
panic("got a non-WorkerID message")
}
sh.workersLk.Lock()
workerClosing, err := sh.workers[wid].w.Closing(ctx)
sh.workersLk.Unlock()
if err != nil {
log.Errorf("getting worker closing channel: %+v", err)
select {
case sh.workerClosing <- wid:
case <-sh.closing:
return
}
continue
}
toSet := -1
for i, sc := range cases {
if sc.Chan == nilch {
toSet = i
break
}
}
if toSet == -1 {
toSet = len(cases)
cases = append(cases, reflect.SelectCase{})
}
cases[toSet] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(workerClosing),
}
caseToWorker[toSet] = wid
default:
wid, found := caseToWorker[n]
if !found {
log.Errorf("worker ID not found for case %d", n)
continue
}
delete(caseToWorker, n)
cases[n] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: nilch,
}
log.Warnf("worker %d dropped", wid)
// send in a goroutine to avoid a deadlock between workerClosing / watchClosing
go func() {
select {
case sh.workerClosing <- wid:
case <-sh.closing:
return
}
}()
}
}
}

View File

@ -3,18 +3,22 @@ package sectorstorage
import (
"time"
"github.com/google/uuid"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
m.sched.workersLk.RLock()
defer m.sched.workersLk.RUnlock()
out := map[uint64]storiface.WorkerStats{}
out := map[uuid.UUID]storiface.WorkerStats{}
for id, handle := range m.sched.workers {
out[uint64(id)] = storiface.WorkerStats{
Info: handle.info,
out[uuid.UUID(id)] = storiface.WorkerStats{
Info: handle.info,
Enabled: handle.enabled,
MemUsedMin: handle.active.memUsedMin,
MemUsedMax: handle.active.memUsedMax,
GpuUsed: handle.active.gpuUsed,
@ -25,12 +29,12 @@ func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
return out
}
func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob {
out := map[int64][]storiface.WorkerJob{}
func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
out := map[uuid.UUID][]storiface.WorkerJob{}
calls := map[storiface.CallID]struct{}{}
for _, t := range m.sched.wt.Running() {
out[int64(t.worker)] = append(out[int64(t.worker)], t.job)
out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job)
calls[t.job.ID] = struct{}{}
}
@ -40,7 +44,7 @@ func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob {
handle.wndLk.Lock()
for wi, window := range handle.activeWindows {
for _, request := range window.todo {
out[int64(id)] = append(out[int64(id)], storiface.WorkerJob{
out[uuid.UUID(id)] = append(out[uuid.UUID(id)], storiface.WorkerJob{
ID: storiface.UndefCall,
Sector: request.sector,
Task: request.taskType,
@ -63,7 +67,7 @@ func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob {
continue
}
out[-1] = append(out[-1], storiface.WorkerJob{
out[uuid.UUID{}] = append(out[uuid.UUID{}], storiface.WorkerJob{
ID: id,
Sector: id.Sector,
Task: work.Method,

View File

@ -32,7 +32,8 @@ type WorkerResources struct {
}
type WorkerStats struct {
Info WorkerInfo
Info WorkerInfo
Enabled bool
MemUsedMin uint64
MemUsedMax uint64

View File

@ -27,6 +27,8 @@ type testWorker struct {
pc1s int
pc1lk sync.Mutex
pc1wait *sync.WaitGroup
session uuid.UUID
}
func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerReturn) *testWorker {
@ -46,6 +48,8 @@ func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerR
ret: ret,
mockSeal: mock.NewMockSectorMgr(ssize, nil),
session: uuid.New(),
}
}
@ -158,8 +162,8 @@ func (t *testWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
}, nil
}
func (t *testWorker) Closing(ctx context.Context) (<-chan struct{}, error) {
return ctx.Done(), nil
func (t *testWorker) Session(context.Context) (uuid.UUID, error) {
return t.session, nil
}
func (t *testWorker) Close() error {

View File

@ -48,6 +48,7 @@ type LocalWorker struct {
acceptTasks map[sealtasks.TaskType]struct{}
running sync.WaitGroup
session uuid.UUID
closing chan struct{}
}
@ -73,6 +74,7 @@ func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConf
executor: executor,
noSwap: wcfg.NoSwap,
session: uuid.New(),
closing: make(chan struct{}),
}
@ -465,8 +467,13 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
}, nil
}
func (l *LocalWorker) Closing(ctx context.Context) (<-chan struct{}, error) {
return l.closing, nil
func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error) {
select {
case <-l.closing:
return ClosedWorkerID, nil
default:
return l.session, nil
}
}
func (l *LocalWorker) Close() error {
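LocalWorker answers the session query from a UUID generated at construction and switches to ClosedWorkerID once its closing channel is closed, which lets the scheduler tell a deliberate shutdown apart from a crash or restart. A minimal sketch of the same pattern for any Worker implementation (illustrative type, assuming Close simply closes the channel once):

type sessionWorker struct {
    session uuid.UUID     // generated once, e.g. with uuid.New(), when the worker starts
    closing chan struct{} // closed exactly once on shutdown
}

func (w *sessionWorker) Session(ctx context.Context) (uuid.UUID, error) {
    select {
    case <-w.closing:
        // Reporting the zero UUID (ClosedWorkerID) signals a graceful shutdown.
        return uuid.UUID{}, nil
    default:
        return w.session, nil
    }
}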

View File

@ -8,6 +8,7 @@ import (
"strconv"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"golang.org/x/xerrors"
@ -85,11 +86,11 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
sm.StorageMgr.ServeHTTP(w, r)
}
func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uint64]storiface.WorkerStats, error) {
func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) {
return sm.StorageMgr.WorkerStats(), nil
}
func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[int64][]storiface.WorkerJob, error) {
func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) {
return sm.StorageMgr.WorkerJobs(), nil
}

View File

@ -16,7 +16,6 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"
builtin0 "github.com/filecoin-project/specs-actors/actors/builtin"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
proof0 "github.com/filecoin-project/specs-actors/actors/runtime/proof"
@ -31,6 +30,7 @@ import (
type mockStorageMinerAPI struct {
partitions []api.Partition
pushedMessages chan *types.Message
storageMinerApi
}
func newMockStorageMinerAPI() *mockStorageMinerAPI {
@ -46,10 +46,6 @@ func (m *mockStorageMinerAPI) StateMinerInfo(ctx context.Context, a address.Addr
}, nil
}
func (m *mockStorageMinerAPI) StateNetworkVersion(ctx context.Context, key types.TipSetKey) (network.Version, error) {
panic("implement me")
}
func (m *mockStorageMinerAPI) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) {
return abi.Randomness("ticket rand"), nil
}