From 8d06cca073a49911a7018296b11951aceb32b484 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 18 Oct 2020 12:35:44 +0200 Subject: [PATCH] sched: Handle workers using sessions instead of connections --- api/api_storage.go | 5 +- api/api_worker.go | 4 +- api/apistruct/struct.go | 16 +- cmd/lotus-seal-worker/main.go | 2 +- cmd/lotus-storage-miner/sealing.go | 15 +- extern/sector-storage/manager.go | 21 +- extern/sector-storage/sched.go | 317 +++++++++++++++------- extern/sector-storage/sched_test.go | 32 +-- extern/sector-storage/sched_watch.go | 100 ------- extern/sector-storage/stats.go | 22 +- extern/sector-storage/storiface/worker.go | 3 +- extern/sector-storage/testworker_test.go | 8 +- extern/sector-storage/worker_local.go | 11 +- node/impl/storminer.go | 5 +- storage/wdpost_run_test.go | 6 +- 15 files changed, 294 insertions(+), 273 deletions(-) delete mode 100644 extern/sector-storage/sched_watch.go diff --git a/api/api_storage.go b/api/api_storage.go index 79d538fe5..5520ad114 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -5,6 +5,7 @@ import ( "context" "time" + "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/filecoin-project/go-address" @@ -62,8 +63,8 @@ type StorageMiner interface { // WorkerConnect tells the node to connect to workers RPC WorkerConnect(context.Context, string) error - WorkerStats(context.Context) (map[uint64]storiface.WorkerStats, error) - WorkerJobs(context.Context) (map[int64][]storiface.WorkerJob, error) + WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) + WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) storiface.WorkerReturn // SealingSchedDiag dumps internal sealing scheduler state diff --git a/api/api_worker.go b/api/api_worker.go index 42eea9289..036748ec6 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -3,6 +3,8 @@ package api import ( "context" + "github.com/google/uuid" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" @@ -26,5 +28,5 @@ type WorkerAPI interface { StorageAddLocal(ctx context.Context, path string) error - Closing(context.Context) (<-chan struct{}, error) + Session(context.Context) (uuid.UUID, error) } diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 180570742..70eb518e4 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -296,9 +296,9 @@ type StorageMinerStruct struct { SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"` - WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm - WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"` - WorkerJobs func(context.Context) (map[int64][]storiface.WorkerJob, error) `perm:"admin"` + WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm + WorkerStats func(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"` + WorkerJobs func(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) `perm:"admin"` ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error `perm:"admin" retry:"true"` ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error `perm:"admin" 
retry:"true"` @@ -376,7 +376,7 @@ type WorkerStruct struct { Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` - Closing func(context.Context) (<-chan struct{}, error) `perm:"admin"` + Session func(context.Context) (uuid.UUID, error) `perm:"admin"` } } @@ -1200,11 +1200,11 @@ func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) erro return c.Internal.WorkerConnect(ctx, url) } -func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (map[uint64]storiface.WorkerStats, error) { +func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (map[uuid.UUID]storiface.WorkerStats, error) { return c.Internal.WorkerStats(ctx) } -func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[int64][]storiface.WorkerJob, error) { +func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) { return c.Internal.WorkerJobs(ctx) } @@ -1490,8 +1490,8 @@ func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error { return w.Internal.StorageAddLocal(ctx, path) } -func (w *WorkerStruct) Closing(ctx context.Context) (<-chan struct{}, error) { - return w.Internal.Closing(ctx) +func (w *WorkerStruct) Session(ctx context.Context) (uuid.UUID, error) { + return w.Internal.Session(ctx) } func (g GatewayStruct) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index bf7ce1e52..3472192e8 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -449,7 +449,7 @@ var runCmd = &cli.Command{ // TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly workerApi.LocalWorker.WaitQuiet() - if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil { + if err := nodeApi.WorkerConnect(ctx, "http://"+address+"/rpc/v0"); err != nil { log.Errorf("Registering worker failed: %+v", err) cancel() return diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 3e33f2185..8649ad7d4 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -11,6 +11,7 @@ import ( "time" "github.com/fatih/color" + "github.com/google/uuid" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -53,7 +54,7 @@ var sealingWorkersCmd = &cli.Command{ } type sortableStat struct { - id uint64 + id uuid.UUID storiface.WorkerStats } @@ -63,7 +64,7 @@ var sealingWorkersCmd = &cli.Command{ } sort.Slice(st, func(i, j int) bool { - return st[i].id < st[j].id + return st[i].id.String() < st[j].id.String() }) for _, stat := range st { @@ -74,7 +75,7 @@ var sealingWorkersCmd = &cli.Command{ gpuUse = "" } - fmt.Printf("Worker %d, host %s\n", stat.id, color.MagentaString(stat.Info.Hostname)) + fmt.Printf("Worker %s, host %s\n", stat.id, color.MagentaString(stat.Info.Hostname)) var barCols = uint64(64) cpuBars := int(stat.CpuUse * barCols / stat.Info.Resources.CPUs) @@ -140,7 +141,7 @@ var sealingJobsCmd = &cli.Command{ type line struct { storiface.WorkerJob - wid int64 + wid uuid.UUID } lines := make([]line, 0) @@ -165,7 +166,7 @@ var sealingJobsCmd = &cli.Command{ return lines[i].Start.Before(lines[j].Start) }) - workerHostnames := map[int64]string{} + workerHostnames := map[uuid.UUID]string{} wst, err := nodeApi.WorkerStats(ctx) if err != nil { @@ -173,7 +174,7 @@ var sealingJobsCmd = &cli.Command{ } for wid, st := range wst { - 
workerHostnames[int64(wid)] = st.Info.Hostname + workerHostnames[wid] = st.Info.Hostname } tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) @@ -192,7 +193,7 @@ var sealingJobsCmd = &cli.Command{ dur = time.Now().Sub(l.Start).Truncate(time.Millisecond * 100).String() } - _, _ = fmt.Fprintf(tw, "%s\t%d\t%d\t%s\t%s\t%s\t%s\n", hex.EncodeToString(l.ID.ID[10:]), l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, dur) + _, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n", hex.EncodeToString(l.ID.ID[10:]), l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, dur) } return tw.Flush() diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 9445bdd2a..0a8ff4339 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -7,6 +7,7 @@ import ( "net/http" "sync" + "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -40,8 +41,7 @@ type Worker interface { Info(context.Context) (storiface.WorkerInfo, error) - // returns channel signalling worker shutdown - Closing(context.Context) (<-chan struct{}, error) + Session(context.Context) (uuid.UUID, error) Close() error // TODO: do we need this? } @@ -57,7 +57,8 @@ type SectorManager interface { FaultTracker } -type WorkerID int64 +type WorkerID uuid.UUID // worker session UUID +var ClosedWorkerID = uuid.UUID{} type Manager struct { scfg *ffiwrapper.Config @@ -190,19 +191,7 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error { } func (m *Manager) AddWorker(ctx context.Context, w Worker) error { - info, err := w.Info(ctx) - if err != nil { - return xerrors.Errorf("getting worker info: %w", err) - } - - m.sched.newWorkers <- &workerHandle{ - w: w, - - info: info, - preparing: &activeResources{}, - active: &activeResources{}, - } - return nil + return m.sched.runWorker(ctx, w) } func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index e91c92525..a4e6a6239 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -2,7 +2,6 @@ package sectorstorage import ( "context" - "fmt" "math/rand" "sort" "sync" @@ -13,6 +12,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) @@ -53,17 +53,13 @@ type WorkerSelector interface { type scheduler struct { spt abi.RegisteredSealProof - workersLk sync.RWMutex - nextWorker WorkerID - workers map[WorkerID]*workerHandle - - newWorkers chan *workerHandle - - watchClosing chan WorkerID - workerClosing chan WorkerID + workersLk sync.RWMutex + workers map[WorkerID]*workerHandle schedule chan *workerRequest windowRequests chan *schedWindowRequest + workerChange chan struct{} // worker added / changed/freed resources + workerDisable chan workerDisableReq // owned by the sh.runSched goroutine schedQueue *requestQueue @@ -91,6 +87,8 @@ type workerHandle struct { wndLk sync.Mutex activeWindows []*schedWindow + enabled bool + // for sync manager goroutine closing cleanupStarted bool closedMgr chan struct{} @@ -108,6 +106,12 @@ type schedWindow struct { todo []*workerRequest } +type workerDisableReq struct { + activeWindows []*schedWindow + wid WorkerID + done func() +} + type activeResources struct { memUsedMin 
uint64 memUsedMax uint64 @@ -143,16 +147,12 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { return &scheduler{ spt: spt, - nextWorker: 0, - workers: map[WorkerID]*workerHandle{}, - - newWorkers: make(chan *workerHandle), - - watchClosing: make(chan WorkerID), - workerClosing: make(chan WorkerID), + workers: map[WorkerID]*workerHandle{}, schedule: make(chan *workerRequest), windowRequests: make(chan *schedWindowRequest, 20), + workerChange: make(chan struct{}, 20), + workerDisable: make(chan workerDisableReq), schedQueue: &requestQueue{}, @@ -224,21 +224,19 @@ type SchedDiagInfo struct { func (sh *scheduler) runSched() { defer close(sh.closed) - go sh.runWorkerWatcher() - iw := time.After(InitWait) var initialised bool for { var doSched bool + var toDisable []workerDisableReq select { - case w := <-sh.newWorkers: - sh.newWorker(w) - - case wid := <-sh.workerClosing: - sh.dropWorker(wid) - + case <-sh.workerChange: + doSched = true + case dreq := <-sh.workerDisable: + toDisable = append(toDisable, dreq) + doSched = true case req := <-sh.schedule: sh.schedQueue.Push(req) doSched = true @@ -267,6 +265,9 @@ func (sh *scheduler) runSched() { loop: for { select { + case <-sh.workerChange: + case dreq := <-sh.workerDisable: + toDisable = append(toDisable, dreq) case req := <-sh.schedule: sh.schedQueue.Push(req) if sh.testSync != nil { @@ -279,6 +280,28 @@ func (sh *scheduler) runSched() { } } + for _, req := range toDisable { + for _, window := range req.activeWindows { + for _, request := range window.todo { + sh.schedQueue.Push(request) + } + } + + openWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)) + for _, window := range sh.openWindows { + if window.worker != req.wid { + openWindows = append(openWindows, window) + } + } + sh.openWindows = openWindows + + sh.workersLk.Lock() + sh.workers[req.wid].enabled = false + sh.workersLk.Unlock() + + req.done() + } + sh.trySched() } @@ -298,6 +321,9 @@ func (sh *scheduler) diag() SchedDiagInfo { }) } + sh.workersLk.RLock() + defer sh.workersLk.RUnlock() + for _, window := range sh.openWindows { out.OpenWindows = append(out.OpenWindows, window.worker) } @@ -322,13 +348,14 @@ func (sh *scheduler) trySched() { */ + sh.workersLk.RLock() + defer sh.workersLk.RUnlock() + windows := make([]schedWindow, len(sh.openWindows)) acceptableWindows := make([][]int, sh.schedQueue.Len()) log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) - sh.workersLk.RLock() - defer sh.workersLk.RUnlock() if len(sh.openWindows) == 0 { // nothing to schedule on return @@ -357,11 +384,16 @@ func (sh *scheduler) trySched() { for wnd, windowRequest := range sh.openWindows { worker, ok := sh.workers[windowRequest.worker] if !ok { - log.Errorf("worker referenced by windowRequest not found (worker: %d)", windowRequest.worker) + log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.worker) // TODO: How to move forward here? 
continue } + if !worker.enabled { + log.Debugw("skipping disabled worker", "worker", windowRequest.worker) + continue + } + // TODO: allow bigger windows if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) { continue @@ -499,21 +531,48 @@ func (sh *scheduler) trySched() { sh.openWindows = newOpenWindows } -func (sh *scheduler) runWorker(wid WorkerID) { - var ready sync.WaitGroup - ready.Add(1) - defer ready.Wait() +// context only used for startup +func (sh *scheduler) runWorker(ctx context.Context, w Worker) error { + info, err := w.Info(ctx) + if err != nil { + return xerrors.Errorf("getting worker info: %w", err) + } + + sessID, err := w.Session(ctx) + if err != nil { + return xerrors.Errorf("getting worker session: %w", err) + } + if sessID == ClosedWorkerID { + return xerrors.Errorf("worker already closed") + } + + worker := &workerHandle{ + w: w, + info: info, + + preparing: &activeResources{}, + active: &activeResources{}, + enabled: true, + + closingMgr: make(chan struct{}), + closedMgr: make(chan struct{}), + } + + wid := WorkerID(sessID) + + sh.workersLk.Lock() + _, exist := sh.workers[wid] + if exist { + // this is ok, we're already handling this worker in a different goroutine + return nil + } + + sh.workers[wid] = worker + sh.workersLk.Unlock() go func() { - sh.workersLk.RLock() - worker, found := sh.workers[wid] - sh.workersLk.RUnlock() - - ready.Done() - - if !found { - panic(fmt.Sprintf("worker %d not found", wid)) - } + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() defer close(worker.closedMgr) @@ -521,23 +580,60 @@ func (sh *scheduler) runWorker(wid WorkerID) { taskDone := make(chan struct{}, 1) windowsRequested := 0 - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() + disable := func(ctx context.Context) error { + done := make(chan struct{}) - workerClosing, err := worker.w.Closing(ctx) - if err != nil { - return + // request cleanup in the main scheduler goroutine + select { + case sh.workerDisable <- workerDisableReq{ + activeWindows: worker.activeWindows, + wid: wid, + done: func() { + close(done) + }, + }: + case <-ctx.Done(): + return ctx.Err() + case <-sh.closing: + return nil + } + + // wait for cleanup to complete + select { + case <-done: + case <-ctx.Done(): + return ctx.Err() + case <-sh.closing: + return nil + } + + worker.activeWindows = worker.activeWindows[:0] + windowsRequested = 0 + return nil } defer func() { - log.Warnw("Worker closing", "workerid", wid) + log.Warnw("Worker closing", "workerid", sessID) - // TODO: close / return all queued tasks + if err := disable(ctx); err != nil { + log.Warnw("failed to disable worker", "worker", wid, "error", err) + } + + sh.workersLk.Lock() + delete(sh.workers, wid) + sh.workersLk.Unlock() }() + heartbeatTimer := time.NewTicker(stores.HeartbeatInterval) + defer heartbeatTimer.Stop() + for { - // ask for more windows if we need them - for ; windowsRequested < SchedWindows; windowsRequested++ { + sh.workersLk.Lock() + enabled := worker.enabled + sh.workersLk.Unlock() + + // ask for more windows if we need them (non-blocking) + for ; enabled && windowsRequested < SchedWindows; windowsRequested++ { select { case sh.windowRequests <- &schedWindowRequest{ worker: wid, @@ -545,33 +641,90 @@ func (sh *scheduler) runWorker(wid WorkerID) { }: case <-sh.closing: return - case <-workerClosing: - return case <-worker.closingMgr: return } } - select { - case w := <-scheduledWindows: - worker.wndLk.Lock() - 
worker.activeWindows = append(worker.activeWindows, w) - worker.wndLk.Unlock() - case <-taskDone: - log.Debugw("task done", "workerid", wid) - case <-sh.closing: - return - case <-workerClosing: - return - case <-worker.closingMgr: - return + // wait for more windows to come in, or for tasks to get finished (blocking) + for { + + // first ping the worker and check session + { + sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2) + curSes, err := worker.w.Session(sctx) + scancel() + if err != nil { + // Likely temporary error + + log.Warnw("failed to check worker session", "error", err) + + if err := disable(ctx); err != nil { + log.Warnw("failed to disable worker with session error", "worker", wid, "error", err) + } + + select { + case <-heartbeatTimer.C: + continue + case w := <-scheduledWindows: + // was in flight when initially disabled, return + worker.wndLk.Lock() + worker.activeWindows = append(worker.activeWindows, w) + worker.wndLk.Unlock() + + if err := disable(ctx); err != nil { + log.Warnw("failed to disable worker with session error", "worker", wid, "error", err) + } + case <-sh.closing: + return + case <-worker.closingMgr: + return + } + continue + } + + if curSes != sessID { + if curSes != ClosedWorkerID { + // worker restarted + log.Warnw("worker session changed (worker restarted?)", "initial", sessID, "current", curSes) + } + + return + } + + // session looks good + if !enabled { + sh.workersLk.Lock() + worker.enabled = true + sh.workersLk.Unlock() + + // we'll send window requests on the next loop + } + } + + select { + case <-heartbeatTimer.C: + continue + case w := <-scheduledWindows: + worker.wndLk.Lock() + worker.activeWindows = append(worker.activeWindows, w) + worker.wndLk.Unlock() + case <-taskDone: + log.Debugw("task done", "workerid", wid) + case <-sh.closing: + return + case <-worker.closingMgr: + return + } + + break } + // process assigned windows (non-blocking) sh.workersLk.RLock() worker.wndLk.Lock() windowsRequested -= sh.workerCompactWindows(worker, wid) - assignLoop: // process windows in order for len(worker.activeWindows) > 0 { @@ -622,6 +775,8 @@ func (sh *scheduler) runWorker(wid WorkerID) { sh.workersLk.RUnlock() } }() + + return nil } func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int { @@ -745,38 +900,6 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke return nil } -func (sh *scheduler) newWorker(w *workerHandle) { - w.closedMgr = make(chan struct{}) - w.closingMgr = make(chan struct{}) - - sh.workersLk.Lock() - - id := sh.nextWorker - sh.workers[id] = w - sh.nextWorker++ - - sh.workersLk.Unlock() - - sh.runWorker(id) - - select { - case sh.watchClosing <- id: - case <-sh.closing: - return - } -} - -func (sh *scheduler) dropWorker(wid WorkerID) { - sh.workersLk.Lock() - defer sh.workersLk.Unlock() - - w := sh.workers[wid] - - sh.workerCleanup(wid, w) - - delete(sh.workers, wid) -} - func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) { select { case <-w.closingMgr: diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 3a198bad5..1afa92b64 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" @@ -43,7 +44,7 @@ type schedTestWorker struct { paths []stores.StoragePath closed bool - closing chan struct{} + session 
uuid.UUID } func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { @@ -121,15 +122,15 @@ func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error }, nil } -func (s *schedTestWorker) Closing(ctx context.Context) (<-chan struct{}, error) { - return s.closing, nil +func (s *schedTestWorker) Session(context.Context) (uuid.UUID, error) { + return s.session, nil } func (s *schedTestWorker) Close() error { if !s.closed { log.Info("close schedTestWorker") s.closed = true - close(s.closing) + s.session = uuid.UUID{} } return nil } @@ -142,7 +143,7 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str taskTypes: taskTypes, paths: []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "food", CanSeal: true, CanStore: true}}, - closing: make(chan struct{}), + session: uuid.New(), } for _, path := range w.paths { @@ -160,16 +161,7 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str require.NoError(t, err) } - info, err := w.Info(context.TODO()) - require.NoError(t, err) - - sched.newWorkers <- &workerHandle{ - w: w, - - info: info, - preparing: &activeResources{}, - active: &activeResources{}, - } + require.NoError(t, sched.runWorker(context.TODO(), w)) } func TestSchedStartStop(t *testing.T) { @@ -433,7 +425,7 @@ func TestSched(t *testing.T) { type line struct { storiface.WorkerJob - wid uint64 + wid uuid.UUID } lines := make([]line, 0) @@ -442,7 +434,7 @@ func TestSched(t *testing.T) { for _, job := range jobs { lines = append(lines, line{ WorkerJob: job, - wid: uint64(wid), + wid: wid, }) } } @@ -537,7 +529,7 @@ func BenchmarkTrySched(b *testing.B) { b.StopTimer() sched := newScheduler(spt) - sched.workers[0] = &workerHandle{ + sched.workers[WorkerID{}] = &workerHandle{ w: nil, info: storiface.WorkerInfo{ Hostname: "t", @@ -549,7 +541,7 @@ func BenchmarkTrySched(b *testing.B) { for i := 0; i < windows; i++ { sched.openWindows = append(sched.openWindows, &schedWindowRequest{ - worker: 0, + worker: WorkerID{}, done: make(chan *schedWindow, 1000), }) } @@ -599,7 +591,7 @@ func TestWindowCompact(t *testing.T) { wh.activeWindows = append(wh.activeWindows, window) } - n := sh.workerCompactWindows(wh, 0) + n := sh.workerCompactWindows(wh, WorkerID{}) require.Equal(t, len(start)-len(expect), n) for wi, tasks := range expect { diff --git a/extern/sector-storage/sched_watch.go b/extern/sector-storage/sched_watch.go deleted file mode 100644 index 2dd9875d7..000000000 --- a/extern/sector-storage/sched_watch.go +++ /dev/null @@ -1,100 +0,0 @@ -package sectorstorage - -import ( - "context" - "reflect" -) - -func (sh *scheduler) runWorkerWatcher() { - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - nilch := reflect.ValueOf(new(chan struct{})).Elem() - - cases := []reflect.SelectCase{ - { - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(sh.closing), - }, - { - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(sh.watchClosing), - }, - } - - caseToWorker := map[int]WorkerID{} - - for { - n, rv, ok := reflect.Select(cases) - - switch { - case n == 0: // sh.closing - return - case n == 1: // sh.watchClosing - if !ok { - log.Errorf("watchClosing channel closed") - return - } - - wid, ok := rv.Interface().(WorkerID) - if !ok { - panic("got a non-WorkerID message") - } - - sh.workersLk.Lock() - workerClosing, err := sh.workers[wid].w.Closing(ctx) - sh.workersLk.Unlock() - if err != nil { - 
log.Errorf("getting worker closing channel: %+v", err) - select { - case sh.workerClosing <- wid: - case <-sh.closing: - return - } - - continue - } - - toSet := -1 - for i, sc := range cases { - if sc.Chan == nilch { - toSet = i - break - } - } - if toSet == -1 { - toSet = len(cases) - cases = append(cases, reflect.SelectCase{}) - } - - cases[toSet] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(workerClosing), - } - - caseToWorker[toSet] = wid - default: - wid, found := caseToWorker[n] - if !found { - log.Errorf("worker ID not found for case %d", n) - continue - } - - delete(caseToWorker, n) - cases[n] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: nilch, - } - - log.Warnf("worker %d dropped", wid) - // send in a goroutine to avoid a deadlock between workerClosing / watchClosing - go func() { - select { - case sh.workerClosing <- wid: - case <-sh.closing: - return - } - }() - } - } -} diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index bba47d169..f9d96fc5d 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -3,18 +3,22 @@ package sectorstorage import ( "time" + "github.com/google/uuid" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) -func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats { +func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats { m.sched.workersLk.RLock() defer m.sched.workersLk.RUnlock() - out := map[uint64]storiface.WorkerStats{} + out := map[uuid.UUID]storiface.WorkerStats{} for id, handle := range m.sched.workers { - out[uint64(id)] = storiface.WorkerStats{ - Info: handle.info, + out[uuid.UUID(id)] = storiface.WorkerStats{ + Info: handle.info, + Enabled: handle.enabled, + MemUsedMin: handle.active.memUsedMin, MemUsedMax: handle.active.memUsedMax, GpuUsed: handle.active.gpuUsed, @@ -25,12 +29,12 @@ func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats { return out } -func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob { - out := map[int64][]storiface.WorkerJob{} +func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob { + out := map[uuid.UUID][]storiface.WorkerJob{} calls := map[storiface.CallID]struct{}{} for _, t := range m.sched.wt.Running() { - out[int64(t.worker)] = append(out[int64(t.worker)], t.job) + out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job) calls[t.job.ID] = struct{}{} } @@ -40,7 +44,7 @@ func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob { handle.wndLk.Lock() for wi, window := range handle.activeWindows { for _, request := range window.todo { - out[int64(id)] = append(out[int64(id)], storiface.WorkerJob{ + out[uuid.UUID(id)] = append(out[uuid.UUID(id)], storiface.WorkerJob{ ID: storiface.UndefCall, Sector: request.sector, Task: request.taskType, @@ -63,7 +67,7 @@ func (m *Manager) WorkerJobs() map[int64][]storiface.WorkerJob { continue } - out[-1] = append(out[-1], storiface.WorkerJob{ + out[uuid.UUID{}] = append(out[uuid.UUID{}], storiface.WorkerJob{ ID: id, Sector: id.Sector, Task: work.Method, diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index e6ab2246f..bbc9ca554 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -32,7 +32,8 @@ type WorkerResources struct { } type WorkerStats struct { - Info WorkerInfo + Info WorkerInfo + Enabled bool MemUsedMin uint64 MemUsedMax uint64 diff --git a/extern/sector-storage/testworker_test.go 
b/extern/sector-storage/testworker_test.go index 94a87cdd2..fda25643a 100644 --- a/extern/sector-storage/testworker_test.go +++ b/extern/sector-storage/testworker_test.go @@ -27,6 +27,8 @@ type testWorker struct { pc1s int pc1lk sync.Mutex pc1wait *sync.WaitGroup + + session uuid.UUID } func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerReturn) *testWorker { @@ -46,6 +48,8 @@ func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerR ret: ret, mockSeal: mock.NewMockSectorMgr(ssize, nil), + + session: uuid.New(), } } @@ -158,8 +162,8 @@ func (t *testWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) { }, nil } -func (t *testWorker) Closing(ctx context.Context) (<-chan struct{}, error) { - return ctx.Done(), nil +func (t *testWorker) Session(context.Context) (uuid.UUID, error) { + return t.session, nil } func (t *testWorker) Close() error { diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index e38b84d40..739f70fa0 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -48,6 +48,7 @@ type LocalWorker struct { acceptTasks map[sealtasks.TaskType]struct{} running sync.WaitGroup + session uuid.UUID closing chan struct{} } @@ -73,6 +74,7 @@ func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConf executor: executor, noSwap: wcfg.NoSwap, + session: uuid.New(), closing: make(chan struct{}), } @@ -465,8 +467,13 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) { }, nil } -func (l *LocalWorker) Closing(ctx context.Context) (<-chan struct{}, error) { - return l.closing, nil +func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error) { + select { + case <-l.closing: + return ClosedWorkerID, nil + default: + return l.session, nil + } } func (l *LocalWorker) Close() error { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index f7da91711..b27ea9edb 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -8,6 +8,7 @@ import ( "strconv" "time" + "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/host" "golang.org/x/xerrors" @@ -85,11 +86,11 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { sm.StorageMgr.ServeHTTP(w, r) } -func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uint64]storiface.WorkerStats, error) { +func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) { return sm.StorageMgr.WorkerStats(), nil } -func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[int64][]storiface.WorkerJob, error) { +func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) { return sm.StorageMgr.WorkerJobs(), nil } diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go index dd7ac4c24..10dfbd281 100644 --- a/storage/wdpost_run_test.go +++ b/storage/wdpost_run_test.go @@ -16,7 +16,6 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/dline" - "github.com/filecoin-project/go-state-types/network" builtin0 "github.com/filecoin-project/specs-actors/actors/builtin" miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" proof0 "github.com/filecoin-project/specs-actors/actors/runtime/proof" @@ -31,6 +30,7 @@ import ( type mockStorageMinerAPI struct { partitions []api.Partition pushedMessages 
chan *types.Message + storageMinerApi } func newMockStorageMinerAPI() *mockStorageMinerAPI { @@ -46,10 +46,6 @@ func (m *mockStorageMinerAPI) StateMinerInfo(ctx context.Context, a address.Addr }, nil } -func (m *mockStorageMinerAPI) StateNetworkVersion(ctx context.Context, key types.TipSetKey) (network.Version, error) { - panic("implement me") -} - func (m *mockStorageMinerAPI) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { return abi.Randomness("ticket rand"), nil }
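Note (illustrative, not part of the patch): the core idea of this change is that a worker is identified by a session UUID rather than by a long-lived "Closing" channel. The scheduler pings Session() on every heartbeat; an RPC error disables the worker and requeues its windows, a changed UUID means the worker restarted and must be dropped, and the zero UUID (ClosedWorkerID) means a clean shutdown. The sketch below shows that pattern in isolation. It uses github.com/google/uuid, which the patch itself imports; the names sessionWorker, watchWorker, closedWorkerID and fakeWorker are invented for the example and do not appear in the patch.

// Minimal sketch of the session-based liveness check introduced by this patch.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
)

// sessionWorker is a stand-in for the Worker interface's new Session method.
type sessionWorker interface {
	Session(context.Context) (uuid.UUID, error)
}

// closedWorkerID mirrors ClosedWorkerID in the patch: the zero UUID signals a clean shutdown.
var closedWorkerID = uuid.UUID{}

// watchWorker polls the worker's session on a heartbeat and returns once the
// worker is gone (restarted or closed).
func watchWorker(ctx context.Context, w sessionWorker, initial uuid.UUID, heartbeat time.Duration) error {
	ticker := time.NewTicker(heartbeat)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		// Bound each ping, as the patch does with stores.HeartbeatInterval/2.
		sctx, cancel := context.WithTimeout(ctx, heartbeat/2)
		cur, err := w.Session(sctx)
		cancel()

		switch {
		case err != nil:
			// Likely a temporary network problem. The real scheduler disables
			// the worker and requeues its windows; this sketch just logs and retries.
			fmt.Println("session check failed, disabling worker:", err)
		case cur == closedWorkerID:
			return errors.New("worker closed")
		case cur != initial:
			// A new session UUID means the worker process restarted and lost
			// its in-flight state, so it has to be dropped and re-registered.
			return fmt.Errorf("worker restarted: session %s -> %s", initial, cur)
		default:
			// Session unchanged: the worker is healthy. The real scheduler
			// re-enables it here if it had been disabled.
		}
	}
}

// fakeWorker is a toy worker that pretends to restart after three pings.
type fakeWorker struct {
	session uuid.UUID
	pings   int
}

func (f *fakeWorker) Session(context.Context) (uuid.UUID, error) {
	f.pings++
	if f.pings > 3 {
		f.session = uuid.New() // simulate a restart with a fresh session
	}
	return f.session, nil
}

func main() {
	w := &fakeWorker{session: uuid.New()}
	err := watchWorker(context.Background(), w, w.session, 100*time.Millisecond)
	fmt.Println("watch ended:", err)
}

The trade-off the patch makes: unlike the old Closing channel, a session UUID survives RPC reconnects (workers now register over plain http:// instead of ws://), lets the scheduler treat a failed ping as a temporary outage (the worker is disabled and its queued windows are pushed back onto the scheduling queue), and turns a changed or zeroed UUID into a reliable signal that the worker restarted or shut down and must be dropped.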