sectorstorage: wire up closing logic
This commit is contained in:
		
							parent
							
								
									ecf53f88ce
								
							
						
					
					
						commit
						cfc65f525a
					
				| @ -35,15 +35,14 @@ var workersListCmd = &cli.Command{ | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		st := make([]struct { | ||||
| 		type sortableStat struct { | ||||
| 			id uint64 | ||||
| 			api.WorkerStats | ||||
| 		}, 0, len(stats)) | ||||
| 		} | ||||
| 
 | ||||
| 		st := make([]sortableStat, 0, len(stats)) | ||||
| 		for id, stat := range stats { | ||||
| 			st = append(st, struct { | ||||
| 				id uint64 | ||||
| 				api.WorkerStats | ||||
| 			}{id, stat}) | ||||
| 			st = append(st, sortableStat{id, stat}) | ||||
| 		} | ||||
| 
 | ||||
| 		sort.Slice(st, func(i, j int) bool { | ||||
|  | ||||
							
								
								
									
										2
									
								
								extern/filecoin-ffi
									
									
									
									
										vendored
									
									
								
							
							
								
								
								
								
								
								
							
						
						
									
										2
									
								
								extern/filecoin-ffi
									
									
									
									
										vendored
									
									
								
							| @ -1 +1 @@ | ||||
| Subproject commit eb91e8c461452a685ba0d0765f996d2117dbd314 | ||||
| Subproject commit 41b20ed16500eb5b4bacd07ec8aee386257e56da | ||||
| @ -342,5 +342,20 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S | ||||
| func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, sc config.Storage, urls sectorstorage.URLs, ca lapi.Common) (*sectorstorage.Manager, error) { | ||||
| 	ctx := helpers.LifecycleCtx(mctx, lc) | ||||
| 
 | ||||
| 	return sectorstorage.New(ctx, ls, si, cfg, sc, urls, ca) | ||||
| 	sst, err := sectorstorage.New(ctx, ls, si, cfg, sc, urls, ca) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	lc.Append(fx.Hook{ | ||||
| 		OnStop: func(_ context.Context) error { | ||||
| 			if err := sst.Close(); err != nil { | ||||
| 				log.Errorf("%+v", err) | ||||
| 			} | ||||
| 
 | ||||
| 			return nil | ||||
| 		}, | ||||
| 	}) | ||||
| 
 | ||||
| 	return sst, nil | ||||
| } | ||||
|  | ||||
| @ -38,6 +38,8 @@ type Worker interface { | ||||
| 	Paths(context.Context) ([]stores.StoragePath, error) | ||||
| 
 | ||||
| 	Info(context.Context) (api.WorkerInfo, error) | ||||
| 
 | ||||
| 	Close() error | ||||
| } | ||||
| 
 | ||||
| type SectorManager interface { | ||||
| @ -425,4 +427,9 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, erro | ||||
| 	return m.storage.FsStat(ctx, id) | ||||
| } | ||||
| 
 | ||||
| func (m *Manager) Close() error { | ||||
| 	close(m.closing) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| var _ SectorManager = &Manager{} | ||||
|  | ||||
| @ -67,6 +67,9 @@ func (m *Manager) runSched() { | ||||
| 			m.schedQueue.PushBack(req) | ||||
| 		case wid := <-m.workerFree: | ||||
| 			m.onWorkerFreed(wid) | ||||
| 		case <-m.closing: | ||||
| 			m.schedClose() | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @ -240,3 +243,14 @@ func (m *Manager) schedNewWorker(w *workerHandle) { | ||||
| 	m.workers[id] = w | ||||
| 	m.nextWorker++ | ||||
| } | ||||
| 
 | ||||
| func (m *Manager) schedClose() { | ||||
| 	m.workersLk.Lock() | ||||
| 	defer m.workersLk.Unlock() | ||||
| 
 | ||||
| 	for i, w := range m.workers { | ||||
| 		if err := w.w.Close(); err != nil { | ||||
| 			log.Errorf("closing worker %d: %+v", i, err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -203,4 +203,8 @@ func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) { | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| func (l *LocalWorker) Close() error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| var _ Worker = &LocalWorker{} | ||||
|  | ||||
| @ -10,10 +10,12 @@ import ( | ||||
| 
 | ||||
| 	"github.com/filecoin-project/lotus/api" | ||||
| 	"github.com/filecoin-project/lotus/api/client" | ||||
| 	"github.com/filecoin-project/lotus/lib/jsonrpc" | ||||
| ) | ||||
| 
 | ||||
| type remote struct { | ||||
| 	api.WorkerApi | ||||
| 	closer jsonrpc.ClientCloser | ||||
| } | ||||
| 
 | ||||
| func (r *remote) NewSector(ctx context.Context, sector abi.SectorID) error { | ||||
| @ -33,13 +35,17 @@ func ConnectRemote(ctx context.Context, fa api.Common, url string) (*remote, err | ||||
| 	headers := http.Header{} | ||||
| 	headers.Add("Authorization", "Bearer "+string(token)) | ||||
| 
 | ||||
| 	wapi, close, err := client.NewWorkerRPC(url, headers) | ||||
| 	wapi, closer, err := client.NewWorkerRPC(url, headers) | ||||
| 	if err != nil { | ||||
| 		return nil, xerrors.Errorf("creating jsonrpc client: %w", err) | ||||
| 	} | ||||
| 	_ = close // TODO
 | ||||
| 
 | ||||
| 	return &remote{wapi}, nil | ||||
| 	return &remote{wapi, closer}, nil | ||||
| } | ||||
| 
 | ||||
| func (r *remote) Close() error { | ||||
| 	r.closer() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| var _ Worker = &remote{} | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user