workers: RPC scaffolding

This commit is contained in:
Łukasz Magiera 2020-03-11 02:57:52 +01:00
parent da52596d79
commit eb61a36fd7
15 changed files with 299 additions and 214 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -107,16 +108,12 @@ type StorageMiner interface {
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
/*WorkerStats(context.Context) (sealsched.WorkerStats, error)*/ // WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error
WorkerAttachStorage(context.Context, StorageInfo) error
WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error
WorkerFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
/*// WorkerQueue registers a remote worker
WorkerQueue(context.Context, WorkerCfg) (<-chan WorkerTask, error)
// WorkerQueue registers a remote worker
WorkerQueue(context.Context, sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error)
WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error
*/
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error)
@ -128,6 +125,15 @@ type StorageMiner interface {
StorageAddLocal(ctx context.Context, path string) error StorageAddLocal(ctx context.Context, path string) error
} }
type StorageInfo struct {
ID string
URLs []string // TODO: Support non-http transports
Cost int
CanSeal bool
CanStore bool
}
type SealRes struct { type SealRes struct {
Err string Err string
GoErr error `json:"-"` GoErr error `json:"-"`

19
api/api_worker.go Normal file
View File

@ -0,0 +1,19 @@
package api
import (
"context"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
type WorkerApi interface {
Version(context.Context) (build.Version, error)
// TODO: Info() (name, ...) ?
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight
storage.Sealer
}

View File

@ -3,6 +3,7 @@ package apistruct
import ( import (
"context" "context"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
@ -16,9 +17,12 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealmgr"
) )
// All permissions are listed in permissioned.go // All permissions are listed in permissioned.go
@ -167,6 +171,7 @@ type StorageMinerStruct struct {
MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"` MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"`
MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"` MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"`
/* Market */ SetPrice func(context.Context, types.BigInt) error `perm:"admin"`
PledgeSector func(context.Context) error `perm:"write"` PledgeSector func(context.Context) error `perm:"write"`
@ -175,12 +180,10 @@ type StorageMinerStruct struct {
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"` SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
/* WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"` WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
WorkerAttachStorage func(context.Context, api.StorageInfo) error `perm:"admin"`
WorkerQueue func(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm WorkerDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"`
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"` WorkerFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"`
*/
SetPrice func(context.Context, types.BigInt) error `perm:"admin"`
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
@ -189,6 +192,22 @@ type StorageMinerStruct struct {
} }
} }
type WorkerStruct struct {
Internal struct {
// TODO: lower perms
Version func(context.Context) (build.Version, error) `perm:"admin"`
TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (storage.Commit1Out, error) `perm:"admin"`
SealCommit2 func(context.Context, abi.SectorNumber, storage.Commit1Out) (storage.Proof, error) `perm:"admin"`
FinalizeSector func(context.Context, abi.SectorNumber) error `perm:"admin"`
}
}
func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]api.Permission, error) { func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]api.Permission, error) {
return c.Internal.AuthVerify(ctx, token) return c.Internal.AuthVerify(ctx, token)
} }
@ -627,17 +646,21 @@ func (c *StorageMinerStruct) SectorsUpdate(ctx context.Context, id abi.SectorNum
return c.Internal.SectorsUpdate(ctx, id, state) return c.Internal.SectorsUpdate(ctx, id, state)
} }
/*func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sealsched.WorkerStats, error) { func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) error {
return c.Internal.WorkerStats(ctx) return c.Internal.WorkerConnect(ctx, url)
}*/
/*func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return c.Internal.WorkerQueue(ctx, cfg)
} }
func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { func (c *StorageMinerStruct) WorkerAttachStorage(ctx context.Context, si api.StorageInfo) error {
return c.Internal.WorkerDone(ctx, task, res) return c.Internal.WorkerAttachStorage(ctx, si)
}*/ }
func (c *StorageMinerStruct) WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error {
return c.Internal.WorkerDeclareSector(ctx, storageId, s)
}
func (c *StorageMinerStruct) WorkerFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) {
return c.Internal.WorkerFindSector(ctx, si, types)
}
func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error { func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error {
return c.Internal.MarketImportDealData(ctx, propcid, path) return c.Internal.MarketImportDealData(ctx, propcid, path)
@ -667,6 +690,35 @@ func (c *StorageMinerStruct) StorageAddLocal(ctx context.Context, path string) e
return c.Internal.StorageAddLocal(ctx, path) return c.Internal.StorageAddLocal(ctx, path)
} }
func (w *WorkerStruct) Version(ctx context.Context) (build.Version, error) {
return w.Internal.Version(ctx)
}
func (w *WorkerStruct) TaskTypes(ctx context.Context) (map[sealmgr.TaskType]struct{}, error) {
return w.Internal.TaskTypes(ctx)
}
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
return w.Internal.SealPreCommit1(ctx, sectorNum, ticket, pieces)
}
func (w *WorkerStruct) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, p1o storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
return w.Internal.SealPreCommit2(ctx, sectorNum, p1o)
}
func (w *WorkerStruct) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (storage.Commit1Out, error) {
return w.Internal.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
}
func (w *WorkerStruct) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, c1o storage.Commit1Out) (storage.Proof, error) {
return w.Internal.SealCommit2(ctx, sectorNum, c1o)
}
func (w *WorkerStruct) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
return w.Internal.FinalizeSector(ctx, sectorNum)
}
var _ api.Common = &CommonStruct{} var _ api.Common = &CommonStruct{}
var _ api.FullNode = &FullNodeStruct{} var _ api.FullNode = &FullNodeStruct{}
var _ api.StorageMiner = &StorageMinerStruct{} var _ api.StorageMiner = &StorageMinerStruct{}
var _ api.WorkerApi = &WorkerStruct{}

View File

@ -42,3 +42,13 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine
return &res, closer, err return &res, closer, err
} }
func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerApi, jsonrpc.ClientCloser, error) {
var res apistruct.WorkerStruct
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
[]interface{}{
&res.Internal,
}, requestHeader)
return &res, closer, err
}

View File

@ -5,13 +5,14 @@ import (
"fmt" "fmt"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/specs-actors/actors/runtime" "github.com/filecoin-project/specs-actors/actors/runtime"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
) )
func init() { func init() {

View File

@ -134,7 +134,6 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) {
} }
func GetRawAPI(ctx *cli.Context, t repo.RepoType) (string, http.Header, error) { func GetRawAPI(ctx *cli.Context, t repo.RepoType) (string, http.Header, error) {
ainfo, err := GetAPIInfo(ctx, t) ainfo, err := GetAPIInfo(ctx, t)
if err != nil { if err != nil {
return "", nil, xerrors.Errorf("could not get API info: %w", err) return "", nil, xerrors.Errorf("could not get API info: %w", err)

View File

@ -4,9 +4,15 @@ import (
"os" "os"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )
@ -75,7 +81,7 @@ var runCmd = &cli.Command{
Name: "run", Name: "run",
Usage: "Start lotus worker", Usage: "Start lotus worker",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
/* if !cctx.Bool("enable-gpu-proving") { if !cctx.Bool("enable-gpu-proving") {
os.Setenv("BELLMAN_NO_GPU", "true") os.Setenv("BELLMAN_NO_GPU", "true")
} }
@ -86,17 +92,6 @@ var runCmd = &cli.Command{
defer closer() defer closer()
ctx := lcli.ReqContext(cctx) ctx := lcli.ReqContext(cctx)
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
_, storageAddr, err := manet.DialArgs(ainfo.Addr)
r, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}
v, err := nodeApi.Version(ctx) v, err := nodeApi.Version(ctx)
if err != nil { if err != nil {
return err return err
@ -110,11 +105,6 @@ var runCmd = &cli.Command{
log.Warn("Shutting down..") log.Warn("Shutting down..")
}() }()
limiter := &limits{
workLimit: make(chan struct{}, workers),
transferLimit: make(chan struct{}, transfers),
}
act, err := nodeApi.ActorAddress(ctx) act, err := nodeApi.ActorAddress(ctx)
if err != nil { if err != nil {
return err return err
@ -128,6 +118,15 @@ var runCmd = &cli.Command{
return xerrors.Errorf("get params: %w", err) return xerrors.Errorf("get params: %w", err)
} }
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
_, storageAddr, err := manet.DialArgs(ainfo.Addr)
/*
/*ppt, spt, err := api.ProofTypeFromSectorSize(ssize) /*ppt, spt, err := api.ProofTypeFromSectorSize(ssize)
if err != nil { if err != nil {
return err return err

View File

@ -0,0 +1,35 @@
package main
import (
"context"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
)
type worker struct {
}
func (w *worker) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
panic("implement me")
}
func (w *worker) SealPreCommit2(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
panic("implement me")
}
func (w *worker) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (storage.Commit1Out, error) {
panic("implement me")
}
func (w *worker) SealCommit2(context.Context, abi.SectorNumber, storage.Commit1Out) (storage.Proof, error) {
panic("implement me")
}
func (w *worker) FinalizeSector(context.Context, abi.SectorNumber) error {
panic("implement me")
}
var _ storage.Sealer = &worker{}

View File

@ -0,0 +1,22 @@
package main
import (
"context"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
)
type workerStorage struct {
path string // TODO: multi-path support
api api.StorageMiner
}
func (w *workerStorage) AcquireSector(ctx context.Context, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
w.api.WorkerFindSector()
}
var _ sectorbuilder.SectorProvider = &workerStorage{}

View File

@ -1,132 +0,0 @@
package main
/*
import (
"context"
"net/http"
"github.com/filecoin-project/go-sectorbuilder"
"golang.org/x/xerrors"
lapi "github.com/filecoin-project/lotus/api"
)
type worker struct {
api lapi.StorageMiner
minerEndpoint string
repo string
auth http.Header
limiter *limits
sb *sectorbuilder.SectorBuilder
}
func acceptJobs(ctx context.Context, api lapi.StorageMiner, sb *sectorbuilder.SectorBuilder, limiter *limits, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error {
w := &worker{
api: api,
minerEndpoint: endpoint,
auth: auth,
repo: repo,
limiter: limiter,
sb: sb,
}
tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{
NoPreCommit: noprecommit,
NoCommit: nocommit,
})
if err != nil {
return err
}
loop:
for {
log.Infof("Waiting for new task")
select {
case task := <-tasks:
log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorNum, task.Type)
res := w.processTask(ctx, task)
log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr)
if err := api.WorkerDone(ctx, task.TaskID, res); err != nil {
log.Error(err)
}
case <-ctx.Done():
break loop
}
}
log.Warn("acceptJobs exit")
return nil
}
func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes {
switch task.Type {
case sectorbuilder.WorkerPreCommit:
case sectorbuilder.WorkerCommit:
default:
return errRes(xerrors.Errorf("unknown task type %d", task.Type))
}
if err := w.fetchSector(task.SectorNum, task.Type); err != nil {
return errRes(xerrors.Errorf("fetching sector: %w", err))
}
log.Infof("Data fetched, starting computation")
var res sectorbuilder.SealRes
switch task.Type {
case sectorbuilder.WorkerPreCommit:
w.limiter.workLimit <- struct{}{}
sealedCid, unsealedCid, err := w.sb.SealPreCommit(ctx, task.SectorNum, task.SealTicket, task.Pieces)
<-w.limiter.workLimit
if err != nil {
return errRes(xerrors.Errorf("precomitting: %w", err))
}
res.Rspco.CommD = unsealedCid
res.Rspco.CommR = sealedCid
if err := w.push("sealed", task.SectorNum); err != nil {
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
if err := w.push("cache", task.SectorNum); err != nil {
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
if err := w.remove("staging", task.SectorNum); err != nil {
return errRes(xerrors.Errorf("cleaning up staged sector: %w", err))
}
case sectorbuilder.WorkerCommit:
w.limiter.workLimit <- struct{}{}
proof, err := w.sb.SealCommit(ctx, task.SectorNum, task.SealTicket, task.SealSeed, task.Pieces, task.SealedCID, task.UnsealedCID)
<-w.limiter.workLimit
if err != nil {
return errRes(xerrors.Errorf("comitting: %w", err))
}
res.Proof = proof
if err := w.push("cache", task.SectorNum); err != nil {
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
if err := w.remove("sealed", task.SectorNum); err != nil {
return errRes(xerrors.Errorf("cleaning up sealed sector: %w", err))
}
}
return res
}
func errRes(err error) sectorbuilder.SealRes {
return sectorbuilder.SealRes{Err: err.Error(), GoErr: err}
}
*/

2
go.mod
View File

@ -117,3 +117,5 @@ replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0 replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
replace github.com/filecoin-project/go-sectorbuilder => /home/magik6k/gohack/github.com/filecoin-project/go-sectorbuilder

View File

@ -13,7 +13,9 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
@ -256,15 +258,26 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumbe
return sm.Miner.ForceSectorState(ctx, id, state) return sm.Miner.ForceSectorState(ctx, id, state)
} }
/* func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error {
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { _, err := advmgr.ConnectRemote(ctx, sm.Full, url)
return sm.SectorBuilder.AddWorker(ctx, cfg) if err != nil {
return err
} }
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { panic("todo register ")
return sm.SectorBuilder.TaskDone(ctx, task, res) }
func (sm *StorageMinerAPI) WorkerAttachStorage(ctx context.Context, si api.StorageInfo) error {
panic("implement me")
}
func (sm *StorageMinerAPI) WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error {
panic("implement me")
}
func (sm *StorageMinerAPI) WorkerFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) {
panic("implement me")
} }
*/
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path) fi, err := os.Open(path)

View File

@ -91,13 +91,13 @@ func (l *localWorker) FinalizeSector(ctx context.Context, sectorNum abi.SectorNu
return sb.FinalizeSector(ctx, sectorNum) return sb.FinalizeSector(ctx, sectorNum)
} }
func (l *localWorker) TaskTypes() map[sealmgr.TaskType]struct{} { func (l *localWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) {
return map[sealmgr.TaskType]struct{}{ return map[sealmgr.TaskType]struct{}{
sealmgr.TTAddPiece: {}, sealmgr.TTAddPiece: {},
sealmgr.TTPreCommit1: {}, sealmgr.TTPreCommit1: {},
sealmgr.TTPreCommit2: {}, sealmgr.TTPreCommit2: {},
sealmgr.TTCommit2: {}, sealmgr.TTCommit2: {},
} }, nil
} }
func (l *localWorker) Paths() []Path { func (l *localWorker) Paths() []Path {

View File

@ -5,6 +5,7 @@ import (
"io" "io"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -17,6 +18,8 @@ import (
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sealmgr"
) )
var log = logging.Logger("advmgr")
type SectorIDCounter interface { type SectorIDCounter interface {
Next() (abi.SectorNumber, error) Next() (abi.SectorNumber, error)
} }
@ -39,7 +42,7 @@ type Path struct {
type Worker interface { type Worker interface {
sectorbuilder.Sealer sectorbuilder.Sealer
TaskTypes() map[sealmgr.TaskType]struct{} TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error)
Paths() []Path Paths() []Path
} }
@ -122,7 +125,12 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.Stor
paths := map[int]config.StorageMeta{} paths := map[int]config.StorageMeta{}
for i, worker := range m.workers { for i, worker := range m.workers {
if _, ok := worker.TaskTypes()[task]; !ok { tt, err := worker.TaskTypes(context.TODO())
if err != nil {
log.Errorf("error getting supported worker task types: %+v", err)
continue
}
if _, ok := tt[task]; !ok {
continue continue
} }
@ -221,7 +229,12 @@ func (m *Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, t
func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) { func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
for _, worker := range m.workers { for _, worker := range m.workers {
if _, ok := worker.TaskTypes()[sealmgr.TTCommit2]; !ok { tt, err := worker.TaskTypes(context.TODO())
if err != nil {
log.Errorf("error getting supported worker task types: %+v", err)
continue
}
if _, ok := tt[sealmgr.TTCommit2]; !ok {
continue continue
} }

View File

@ -0,0 +1,46 @@
package advmgr
import (
"context"
"net/http"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/api/client"
)
type remote struct {
api.WorkerApi
}
func (r *remote) AddPiece(ctx context.Context, sector abi.SectorNumber, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
panic("implement me")
}
func (r *remote) Paths() []Path {
panic("implement me")
}
func ConnectRemote(ctx context.Context, fa api.FullNode, url string) (*remote, error) {
token, err := fa.AuthNew(ctx, []api.Permission{apistruct.PermAdmin})
if err != nil {
return nil, xerrors.Errorf("creating auth token for remote connection: %w", err)
}
headers := http.Header{}
headers.Add("Authorization", "Bearer "+string(token))
wapi, close, err := client.NewWorkerRPC(url, headers)
if err != nil {
return nil, xerrors.Errorf("creating jsonrpc client: %w", err)
}
_ = close // TODO
return &remote{wapi}, nil
}
var _ Worker = &remote{}