diff --git a/manager.go b/manager.go index 843d841cb..3ef8e30c6 100644 --- a/manager.go +++ b/manager.go @@ -38,6 +38,8 @@ type Worker interface { Paths(context.Context) ([]stores.StoragePath, error) Info(context.Context) (api.WorkerInfo, error) + + Close() error } type SectorManager interface { @@ -425,4 +427,9 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, erro return m.storage.FsStat(ctx, id) } +func (m *Manager) Close() error { + close(m.closing) + return nil +} + var _ SectorManager = &Manager{} diff --git a/sched.go b/sched.go index a38707f74..d8e3d35a0 100644 --- a/sched.go +++ b/sched.go @@ -67,6 +67,9 @@ func (m *Manager) runSched() { m.schedQueue.PushBack(req) case wid := <-m.workerFree: m.onWorkerFreed(wid) + case <-m.closing: + m.schedClose() + return } } } @@ -240,3 +243,14 @@ func (m *Manager) schedNewWorker(w *workerHandle) { m.workers[id] = w m.nextWorker++ } + +func (m *Manager) schedClose() { + m.workersLk.Lock() + defer m.workersLk.Unlock() + + for i, w := range m.workers { + if err := w.w.Close(); err != nil { + log.Errorf("closing worker %d: %+v", i, err) + } + } +} diff --git a/worker_local.go b/worker_local.go index 18a0305fd..175106bad 100644 --- a/worker_local.go +++ b/worker_local.go @@ -203,4 +203,8 @@ func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) { }, nil } +func (l *LocalWorker) Close() error { + return nil +} + var _ Worker = &LocalWorker{} diff --git a/worker_remote.go b/worker_remote.go index f49ea4dc6..ffd96f188 100644 --- a/worker_remote.go +++ b/worker_remote.go @@ -10,10 +10,12 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" + "github.com/filecoin-project/lotus/lib/jsonrpc" ) type remote struct { api.WorkerApi + closer jsonrpc.ClientCloser } func (r *remote) NewSector(ctx context.Context, sector abi.SectorID) error { @@ -33,13 +35,17 @@ func ConnectRemote(ctx context.Context, fa api.Common, url string) (*remote, err headers := http.Header{} headers.Add("Authorization", "Bearer "+string(token)) - wapi, close, err := client.NewWorkerRPC(url, headers) + wapi, closer, err := client.NewWorkerRPC(url, headers) if err != nil { return nil, xerrors.Errorf("creating jsonrpc client: %w", err) } - _ = close // TODO - return &remote{wapi}, nil + return &remote{wapi, closer}, nil +} + +func (r *remote) Close() error { + r.closer() + return nil } var _ Worker = &remote{}