workers: Address review
This commit is contained in:
parent
8559c89560
commit
726c9c1fe7
@ -121,7 +121,7 @@ type StorageMiner interface {
|
|||||||
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
|
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
|
||||||
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
||||||
MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error)
|
MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error)
|
||||||
SetPrice(context.Context, types.BigInt) error
|
MarketSetPrice(context.Context, types.BigInt) error
|
||||||
|
|
||||||
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
|
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
|
||||||
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
||||||
|
@ -172,7 +172,7 @@ type StorageMinerStruct struct {
|
|||||||
MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"`
|
MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"`
|
||||||
MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
||||||
MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"`
|
MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"`
|
||||||
/* Market */ SetPrice func(context.Context, types.BigInt) error `perm:"admin"`
|
MarketSetPrice func(context.Context, types.BigInt) error `perm:"admin"`
|
||||||
|
|
||||||
PledgeSector func(context.Context) error `perm:"write"`
|
PledgeSector func(context.Context) error `perm:"write"`
|
||||||
|
|
||||||
@ -713,8 +713,8 @@ func (c *StorageMinerStruct) MarketListIncompleteDeals(ctx context.Context) ([]s
|
|||||||
return c.Internal.MarketListIncompleteDeals(ctx)
|
return c.Internal.MarketListIncompleteDeals(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) SetPrice(ctx context.Context, p types.BigInt) error {
|
func (c *StorageMinerStruct) MarketSetPrice(ctx context.Context, p types.BigInt) error {
|
||||||
return c.Internal.SetPrice(ctx, p)
|
return c.Internal.MarketSetPrice(ctx, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
|
func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
|
||||||
|
@ -14,7 +14,9 @@ func NewCommonRPC(addr string, requestHeader http.Header) (api.Common, jsonrpc.C
|
|||||||
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
|
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
|
||||||
[]interface{}{
|
[]interface{}{
|
||||||
&res.Internal,
|
&res.Internal,
|
||||||
}, requestHeader)
|
},
|
||||||
|
requestHeader,
|
||||||
|
)
|
||||||
|
|
||||||
return &res, closer, err
|
return &res, closer, err
|
||||||
}
|
}
|
||||||
@ -38,7 +40,9 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine
|
|||||||
[]interface{}{
|
[]interface{}{
|
||||||
&res.CommonStruct.Internal,
|
&res.CommonStruct.Internal,
|
||||||
&res.Internal,
|
&res.Internal,
|
||||||
}, requestHeader)
|
},
|
||||||
|
requestHeader,
|
||||||
|
)
|
||||||
|
|
||||||
return &res, closer, err
|
return &res, closer, err
|
||||||
}
|
}
|
||||||
@ -48,7 +52,9 @@ func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerApi, jsonrp
|
|||||||
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
|
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
|
||||||
[]interface{}{
|
[]interface{}{
|
||||||
&res.Internal,
|
&res.Internal,
|
||||||
}, requestHeader)
|
},
|
||||||
|
requestHeader,
|
||||||
|
)
|
||||||
|
|
||||||
return &res, closer, err
|
return &res, closer, err
|
||||||
}
|
}
|
||||||
|
@ -277,8 +277,6 @@ var runCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// todo go register
|
|
||||||
|
|
||||||
return srv.Serve(nl)
|
return srv.Serve(nl)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -404,7 +404,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
|
|||||||
return xerrors.Errorf("getting id address: %w", err)
|
return xerrors.Errorf("getting id address: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
smgr, err := sectorstorage.New(lr, stores.NewIndex(), §orbuilder.Config{
|
smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), §orbuilder.Config{
|
||||||
SealProofType: spt,
|
SealProofType: spt,
|
||||||
PoStProofType: ppt,
|
PoStProofType: ppt,
|
||||||
}, nil, api)
|
}, nil, api)
|
||||||
|
@ -32,7 +32,7 @@ var setPriceCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return api.SetPrice(ctx, types.BigInt(fp))
|
return api.MarketSetPrice(ctx, types.BigInt(fp))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,6 +40,11 @@ var runCmd = &cli.Command{
|
|||||||
Name: "nosync",
|
Name: "nosync",
|
||||||
Usage: "don't check full-node sync status",
|
Usage: "don't check full-node sync status",
|
||||||
},
|
},
|
||||||
|
&cli.BoolFlag{
|
||||||
|
Name: "manage-fdlimit",
|
||||||
|
Usage: "manage open file limit",
|
||||||
|
Value: true,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
if !cctx.Bool("enable-gpu-proving") {
|
if !cctx.Bool("enable-gpu-proving") {
|
||||||
@ -58,8 +63,10 @@ var runCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, _, err := ulimit.ManageFdLimit(); err != nil {
|
if cctx.Bool("manage-fdlimit") {
|
||||||
log.Errorf("setting file descriptor limit: %s", err)
|
if _, _, err := ulimit.ManageFdLimit(); err != nil {
|
||||||
|
log.Errorf("setting file descriptor limit: %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.APIVersion != build.APIVersion {
|
if v.APIVersion != build.APIVersion {
|
||||||
|
@ -272,7 +272,7 @@ func Online() Option {
|
|||||||
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig),
|
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig),
|
||||||
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
|
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
|
||||||
Override(new(sealing.SectorIDCounter), modules.SectorIDCounter),
|
Override(new(sealing.SectorIDCounter), modules.SectorIDCounter),
|
||||||
Override(new(*sectorstorage.Manager), sectorstorage.New),
|
Override(new(*sectorstorage.Manager), modules.SectorStorage),
|
||||||
|
|
||||||
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
|
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
|
||||||
Override(new(storage2.Prover), From(new(sectorstorage.SectorManager))),
|
Override(new(storage2.Prover), From(new(sectorstorage.SectorManager))),
|
||||||
|
@ -178,7 +178,7 @@ func (sm *StorageMinerAPI) MarketListIncompleteDeals(ctx context.Context) ([]sto
|
|||||||
return sm.StorageProvider.ListIncompleteDeals()
|
return sm.StorageProvider.ListIncompleteDeals()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SetPrice(ctx context.Context, p types.BigInt) error {
|
func (sm *StorageMinerAPI) MarketSetPrice(ctx context.Context, p types.BigInt) error {
|
||||||
return sm.StorageProvider.AddAsk(abi.TokenAmount(p), 60*60*24*100) // lasts for 100 days?
|
return sm.StorageProvider.AddAsk(abi.TokenAmount(p), 60*60*24*100) // lasts for 100 days?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,6 +50,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
"github.com/filecoin-project/lotus/storage/sealing"
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorstorage"
|
"github.com/filecoin-project/lotus/storage/sectorstorage"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
|
||||||
)
|
)
|
||||||
|
|
||||||
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
||||||
@ -336,3 +337,9 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S
|
|||||||
network := rmnet.NewFromLibp2pHost(h)
|
network := rmnet.NewFromLibp2pHost(h)
|
||||||
return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs, ds)
|
return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs, ds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls sectorstorage.URLs, ca lapi.Common) (*sectorstorage.Manager, error) {
|
||||||
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
|
return sectorstorage.New(ctx, ls, si, cfg, urls, ca)
|
||||||
|
}
|
||||||
|
@ -3,6 +3,7 @@ package sectorstorage
|
|||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
@ -24,6 +25,8 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("advmgr")
|
var log = logging.Logger("advmgr")
|
||||||
|
|
||||||
|
var ErrNoWorkers = errors.New("no suitable workers found")
|
||||||
|
|
||||||
type URLs []string
|
type URLs []string
|
||||||
|
|
||||||
type Worker interface {
|
type Worker interface {
|
||||||
@ -71,9 +74,7 @@ type Manager struct {
|
|||||||
schedQueue *list.List // List[*workerRequest]
|
schedQueue *list.List // List[*workerRequest]
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) {
|
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) {
|
||||||
ctx := context.TODO()
|
|
||||||
|
|
||||||
lstor, err := stores.NewLocal(ctx, ls, si, urls)
|
lstor, err := stores.NewLocal(ctx, ls, si, urls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -84,7 +85,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
|
|||||||
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
token, err := ca.AuthNew(context.TODO(), []api.Permission{"admin"})
|
token, err := ca.AuthNew(ctx, []api.Permission{"admin"})
|
||||||
headers := http.Header{}
|
headers := http.Header{}
|
||||||
headers.Add("Authorization", "Bearer "+string(token))
|
headers.Add("Authorization", "Bearer "+string(token))
|
||||||
stor := stores.NewRemote(lstor, si, headers)
|
stor := stores.NewRemote(lstor, si, headers)
|
||||||
@ -272,7 +273,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
|
|||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best)
|
||||||
|
|
||||||
if len(candidateWorkers) == 0 {
|
if len(candidateWorkers) == 0 {
|
||||||
return abi.PieceInfo{}, xerrors.New("no worker found")
|
return abi.PieceInfo{}, ErrNoWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers)
|
worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers)
|
||||||
@ -296,7 +297,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
|
|||||||
|
|
||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best)
|
||||||
if len(candidateWorkers) == 0 {
|
if len(candidateWorkers) == 0 {
|
||||||
return nil, xerrors.New("no suitable workers found")
|
return nil, ErrNoWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers)
|
worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers)
|
||||||
@ -320,7 +321,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
|
|||||||
|
|
||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best)
|
||||||
if len(candidateWorkers) == 0 {
|
if len(candidateWorkers) == 0 {
|
||||||
return storage.SectorCids{}, xerrors.New("no suitable workers found")
|
return storage.SectorCids{}, ErrNoWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers)
|
worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers)
|
||||||
@ -342,7 +343,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
|
|||||||
|
|
||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best)
|
||||||
if len(candidateWorkers) == 0 {
|
if len(candidateWorkers) == 0 {
|
||||||
return nil, xerrors.New("no suitable workers found") // TODO: wait?
|
return nil, ErrNoWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Try very hard to execute on worker with access to the sectors
|
// TODO: Try very hard to execute on worker with access to the sectors
|
||||||
@ -373,6 +374,9 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
|
|||||||
candidateWorkers = append(candidateWorkers, id)
|
candidateWorkers = append(candidateWorkers, id)
|
||||||
}
|
}
|
||||||
m.workersLk.Unlock()
|
m.workersLk.Unlock()
|
||||||
|
if len(candidateWorkers) == 0 {
|
||||||
|
return nil, ErrNoWorkers
|
||||||
|
}
|
||||||
|
|
||||||
worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers)
|
worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -390,6 +394,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best)
|
||||||
|
if len(candidateWorkers) == 0 {
|
||||||
|
return ErrNoWorkers
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Remove sector from sealing stores
|
// TODO: Remove sector from sealing stores
|
||||||
// TODO: Move the sector to long-term storage
|
// TODO: Move the sector to long-term storage
|
||||||
|
Loading…
Reference in New Issue
Block a user