diff --git a/api/api_storage.go b/api/api_storage.go index 0f3b07bd5..4712ed98c 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -121,7 +121,7 @@ type StorageMiner interface { MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) - SetPrice(context.Context, types.BigInt) error + MarketSetPrice(context.Context, types.BigInt) error DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index b6bc3f1b3..1f6f90b37 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -172,7 +172,7 @@ type StorageMinerStruct struct { MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"` MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"` - /* Market */ SetPrice func(context.Context, types.BigInt) error `perm:"admin"` + MarketSetPrice func(context.Context, types.BigInt) error `perm:"admin"` PledgeSector func(context.Context) error `perm:"write"` @@ -713,8 +713,8 @@ func (c *StorageMinerStruct) MarketListIncompleteDeals(ctx context.Context) ([]s return c.Internal.MarketListIncompleteDeals(ctx) } -func (c *StorageMinerStruct) SetPrice(ctx context.Context, p types.BigInt) error { - return c.Internal.SetPrice(ctx, p) +func (c *StorageMinerStruct) MarketSetPrice(ctx context.Context, p types.BigInt) error { + return c.Internal.MarketSetPrice(ctx, p) } func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error { diff --git a/api/client/client.go b/api/client/client.go index 0380d45b5..d0facc55d 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -14,7 +14,9 @@ func NewCommonRPC(addr string, requestHeader http.Header) (api.Common, jsonrpc.C closer, err := jsonrpc.NewMergeClient(addr, "Filecoin", []interface{}{ &res.Internal, - }, requestHeader) + }, + requestHeader, + ) return &res, closer, err } @@ -38,7 +40,9 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine []interface{}{ &res.CommonStruct.Internal, &res.Internal, - }, requestHeader) + }, + requestHeader, + ) return &res, closer, err } @@ -48,7 +52,9 @@ func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerApi, jsonrp closer, err := jsonrpc.NewMergeClient(addr, "Filecoin", []interface{}{ &res.Internal, - }, requestHeader) + }, + requestHeader, + ) return &res, closer, err } diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index c3717735d..dd0972099 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -277,8 +277,6 @@ var runCmd = &cli.Command{ } }() - // todo go register - return srv.Serve(nl) }, } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 5a62c4e19..8bf135479 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -404,7 +404,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, return xerrors.Errorf("getting id address: %w", err) } - smgr, err := sectorstorage.New(lr, stores.NewIndex(), §orbuilder.Config{ + smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), §orbuilder.Config{ SealProofType: spt, PoStProofType: ppt, }, nil, api) diff --git a/cmd/lotus-storage-miner/market.go b/cmd/lotus-storage-miner/market.go index ea5de6345..13713ee2a 100644 --- a/cmd/lotus-storage-miner/market.go +++ b/cmd/lotus-storage-miner/market.go @@ -32,7 +32,7 @@ var setPriceCmd = &cli.Command{ return err } - return api.SetPrice(ctx, types.BigInt(fp)) + return api.MarketSetPrice(ctx, types.BigInt(fp)) }, } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 897e60f27..3d2a21c87 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -40,6 +40,11 @@ var runCmd = &cli.Command{ Name: "nosync", Usage: "don't check full-node sync status", }, + &cli.BoolFlag{ + Name: "manage-fdlimit", + Usage: "manage open file limit", + Value: true, + }, }, Action: func(cctx *cli.Context) error { if !cctx.Bool("enable-gpu-proving") { @@ -58,8 +63,10 @@ var runCmd = &cli.Command{ return err } - if _, _, err := ulimit.ManageFdLimit(); err != nil { - log.Errorf("setting file descriptor limit: %s", err) + if cctx.Bool("manage-fdlimit") { + if _, _, err := ulimit.ManageFdLimit(); err != nil { + log.Errorf("setting file descriptor limit: %s", err) + } } if v.APIVersion != build.APIVersion { diff --git a/node/builder.go b/node/builder.go index ec8c11f63..730b1b9ef 100644 --- a/node/builder.go +++ b/node/builder.go @@ -272,7 +272,7 @@ func Online() Option { Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig), Override(new(stores.LocalStorage), From(new(repo.LockedRepo))), Override(new(sealing.SectorIDCounter), modules.SectorIDCounter), - Override(new(*sectorstorage.Manager), sectorstorage.New), + Override(new(*sectorstorage.Manager), modules.SectorStorage), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), Override(new(storage2.Prover), From(new(sectorstorage.SectorManager))), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 24027bba5..8579bdcf9 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -178,7 +178,7 @@ func (sm *StorageMinerAPI) MarketListIncompleteDeals(ctx context.Context) ([]sto return sm.StorageProvider.ListIncompleteDeals() } -func (sm *StorageMinerAPI) SetPrice(ctx context.Context, p types.BigInt) error { +func (sm *StorageMinerAPI) MarketSetPrice(ctx context.Context, p types.BigInt) error { return sm.StorageProvider.AddAsk(abi.TokenAmount(p), 60*60*24*100) // lasts for 100 days? } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 2c6a4c805..16713e745 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -50,6 +50,7 @@ import ( "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sealing" "github.com/filecoin-project/lotus/storage/sectorstorage" + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -336,3 +337,9 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S network := rmnet.NewFromLibp2pHost(h) return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs, ds) } + +func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls sectorstorage.URLs, ca lapi.Common) (*sectorstorage.Manager, error) { + ctx := helpers.LifecycleCtx(mctx, lc) + + return sectorstorage.New(ctx, ls, si, cfg, urls, ca) +} diff --git a/storage/sectorstorage/manager.go b/storage/sectorstorage/manager.go index f3aa90964..d198c88ec 100644 --- a/storage/sectorstorage/manager.go +++ b/storage/sectorstorage/manager.go @@ -3,6 +3,7 @@ package sectorstorage import ( "container/list" "context" + "errors" "io" "net/http" "sync" @@ -24,6 +25,8 @@ import ( var log = logging.Logger("advmgr") +var ErrNoWorkers = errors.New("no suitable workers found") + type URLs []string type Worker interface { @@ -71,9 +74,7 @@ type Manager struct { schedQueue *list.List // List[*workerRequest] } -func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) { - ctx := context.TODO() - +func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) { lstor, err := stores.NewLocal(ctx, ls, si, urls) if err != nil { return nil, err @@ -84,7 +85,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi return nil, xerrors.Errorf("creating prover instance: %w", err) } - token, err := ca.AuthNew(context.TODO(), []api.Permission{"admin"}) + token, err := ca.AuthNew(ctx, []api.Permission{"admin"}) headers := http.Header{} headers.Add("Authorization", "Bearer "+string(token)) stor := stores.NewRemote(lstor, si, headers) @@ -272,7 +273,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best) if len(candidateWorkers) == 0 { - return abi.PieceInfo{}, xerrors.New("no worker found") + return abi.PieceInfo{}, ErrNoWorkers } worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers) @@ -296,7 +297,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best) if len(candidateWorkers) == 0 { - return nil, xerrors.New("no suitable workers found") + return nil, ErrNoWorkers } worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers) @@ -320,7 +321,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best) if len(candidateWorkers) == 0 { - return storage.SectorCids{}, xerrors.New("no suitable workers found") + return storage.SectorCids{}, ErrNoWorkers } worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers) @@ -342,7 +343,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best) if len(candidateWorkers) == 0 { - return nil, xerrors.New("no suitable workers found") // TODO: wait? + return nil, ErrNoWorkers } // TODO: Try very hard to execute on worker with access to the sectors @@ -373,6 +374,9 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou candidateWorkers = append(candidateWorkers, id) } m.workersLk.Unlock() + if len(candidateWorkers) == 0 { + return nil, ErrNoWorkers + } worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers) if err != nil { @@ -390,6 +394,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error } candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best) + if len(candidateWorkers) == 0 { + return ErrNoWorkers + } // TODO: Remove sector from sealing stores // TODO: Move the sector to long-term storage