sectorstorage: wire up closing logic
This commit is contained in:
parent
d8f357b01c
commit
817e699738
@ -38,6 +38,8 @@ type Worker interface {
|
||||
Paths(context.Context) ([]stores.StoragePath, error)
|
||||
|
||||
Info(context.Context) (api.WorkerInfo, error)
|
||||
|
||||
Close() error
|
||||
}
|
||||
|
||||
type SectorManager interface {
|
||||
@ -425,4 +427,9 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, erro
|
||||
return m.storage.FsStat(ctx, id)
|
||||
}
|
||||
|
||||
func (m *Manager) Close() error {
|
||||
close(m.closing)
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ SectorManager = &Manager{}
|
||||
|
14
sched.go
14
sched.go
@ -67,6 +67,9 @@ func (m *Manager) runSched() {
|
||||
m.schedQueue.PushBack(req)
|
||||
case wid := <-m.workerFree:
|
||||
m.onWorkerFreed(wid)
|
||||
case <-m.closing:
|
||||
m.schedClose()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -240,3 +243,14 @@ func (m *Manager) schedNewWorker(w *workerHandle) {
|
||||
m.workers[id] = w
|
||||
m.nextWorker++
|
||||
}
|
||||
|
||||
func (m *Manager) schedClose() {
|
||||
m.workersLk.Lock()
|
||||
defer m.workersLk.Unlock()
|
||||
|
||||
for i, w := range m.workers {
|
||||
if err := w.w.Close(); err != nil {
|
||||
log.Errorf("closing worker %d: %+v", i, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -203,4 +203,8 @@ func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (l *LocalWorker) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ Worker = &LocalWorker{}
|
||||
|
@ -10,10 +10,12 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/client"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
)
|
||||
|
||||
type remote struct {
|
||||
api.WorkerApi
|
||||
closer jsonrpc.ClientCloser
|
||||
}
|
||||
|
||||
func (r *remote) NewSector(ctx context.Context, sector abi.SectorID) error {
|
||||
@ -33,13 +35,17 @@ func ConnectRemote(ctx context.Context, fa api.Common, url string) (*remote, err
|
||||
headers := http.Header{}
|
||||
headers.Add("Authorization", "Bearer "+string(token))
|
||||
|
||||
wapi, close, err := client.NewWorkerRPC(url, headers)
|
||||
wapi, closer, err := client.NewWorkerRPC(url, headers)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("creating jsonrpc client: %w", err)
|
||||
}
|
||||
_ = close // TODO
|
||||
|
||||
return &remote{wapi}, nil
|
||||
return &remote{wapi, closer}, nil
|
||||
}
|
||||
|
||||
func (r *remote) Close() error {
|
||||
r.closer()
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ Worker = &remote{}
|
||||
|
Loading…
Reference in New Issue
Block a user