From eb61a36fd7bd74f8d4ae448703db55c087aaad2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 11 Mar 2020 02:57:52 +0100 Subject: [PATCH] workers: RPC scaffolding --- api/api_storage.go | 24 ++++-- api/api_worker.go | 19 +++++ api/apistruct/struct.go | 82 +++++++++++++++---- api/client/client.go | 10 +++ chain/vm/syscalls.go | 3 +- cli/cmd.go | 1 - cmd/lotus-seal-worker/main.go | 89 ++++++++++---------- cmd/lotus-seal-worker/rpc.go | 35 ++++++++ cmd/lotus-seal-worker/storage.go | 22 +++++ cmd/lotus-seal-worker/sub.go | 132 ------------------------------ go.mod | 2 + node/impl/storminer.go | 25 ++++-- storage/sealmgr/advmgr/local.go | 4 +- storage/sealmgr/advmgr/manager.go | 19 ++++- storage/sealmgr/advmgr/remote.go | 46 +++++++++++ 15 files changed, 299 insertions(+), 214 deletions(-) create mode 100644 api/api_worker.go create mode 100644 cmd/lotus-seal-worker/rpc.go create mode 100644 cmd/lotus-seal-worker/storage.go delete mode 100644 cmd/lotus-seal-worker/sub.go create mode 100644 storage/sealmgr/advmgr/remote.go diff --git a/api/api_storage.go b/api/api_storage.go index 6b3705b79..180e8892a 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/chain/types" ) @@ -107,16 +108,12 @@ type StorageMiner interface { SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error - /*WorkerStats(context.Context) (sealsched.WorkerStats, error)*/ + // WorkerConnect tells the node to connect to workers RPC + WorkerConnect(context.Context, string) error + WorkerAttachStorage(context.Context, StorageInfo) error + WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error + WorkerFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error) - /*// WorkerQueue registers a remote worker - WorkerQueue(context.Context, WorkerCfg) (<-chan WorkerTask, error) - - // WorkerQueue registers a remote worker - WorkerQueue(context.Context, sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) - - WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error - */ MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) @@ -128,6 +125,15 @@ type StorageMiner interface { StorageAddLocal(ctx context.Context, path string) error } +type StorageInfo struct { + ID string + URLs []string // TODO: Support non-http transports + Cost int + + CanSeal bool + CanStore bool +} + type SealRes struct { Err string GoErr error `json:"-"` diff --git a/api/api_worker.go b/api/api_worker.go new file mode 100644 index 000000000..f3dbce9aa --- /dev/null +++ b/api/api_worker.go @@ -0,0 +1,19 @@ +package api + +import ( + "context" + + "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/storage/sealmgr" +) + +type WorkerApi interface { + Version(context.Context) (build.Version, error) + // TODO: Info() (name, ...) ? + + TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight + + storage.Sealer +} diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 9eb73c637..4f037dced 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -3,6 +3,7 @@ package apistruct import ( "context" + "github.com/filecoin-project/specs-storage/storage" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -16,9 +17,12 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/sealmgr" ) // All permissions are listed in permissioned.go @@ -167,6 +171,7 @@ type StorageMinerStruct struct { MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"` MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"` + /* Market */ SetPrice func(context.Context, types.BigInt) error `perm:"admin"` PledgeSector func(context.Context) error `perm:"write"` @@ -175,12 +180,10 @@ type StorageMinerStruct struct { SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"` - /* WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"` - - WorkerQueue func(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm - WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"` - */ - SetPrice func(context.Context, types.BigInt) error `perm:"admin"` + WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm + WorkerAttachStorage func(context.Context, api.StorageInfo) error `perm:"admin"` + WorkerDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"` + WorkerFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"` DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` @@ -189,6 +192,22 @@ type StorageMinerStruct struct { } } +type WorkerStruct struct { + Internal struct { + // TODO: lower perms + + Version func(context.Context) (build.Version, error) `perm:"admin"` + + TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"` + + SealPreCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` + SealPreCommit2 func(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) `perm:"admin"` + SealCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (storage.Commit1Out, error) `perm:"admin"` + SealCommit2 func(context.Context, abi.SectorNumber, storage.Commit1Out) (storage.Proof, error) `perm:"admin"` + FinalizeSector func(context.Context, abi.SectorNumber) error `perm:"admin"` + } +} + func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]api.Permission, error) { return c.Internal.AuthVerify(ctx, token) } @@ -627,17 +646,21 @@ func (c *StorageMinerStruct) SectorsUpdate(ctx context.Context, id abi.SectorNum return c.Internal.SectorsUpdate(ctx, id, state) } -/*func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sealsched.WorkerStats, error) { - return c.Internal.WorkerStats(ctx) -}*/ - -/*func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { - return c.Internal.WorkerQueue(ctx, cfg) +func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) error { + return c.Internal.WorkerConnect(ctx, url) } -func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { - return c.Internal.WorkerDone(ctx, task, res) -}*/ +func (c *StorageMinerStruct) WorkerAttachStorage(ctx context.Context, si api.StorageInfo) error { + return c.Internal.WorkerAttachStorage(ctx, si) +} + +func (c *StorageMinerStruct) WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error { + return c.Internal.WorkerDeclareSector(ctx, storageId, s) +} + +func (c *StorageMinerStruct) WorkerFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { + return c.Internal.WorkerFindSector(ctx, si, types) +} func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error { return c.Internal.MarketImportDealData(ctx, propcid, path) @@ -667,6 +690,35 @@ func (c *StorageMinerStruct) StorageAddLocal(ctx context.Context, path string) e return c.Internal.StorageAddLocal(ctx, path) } +func (w *WorkerStruct) Version(ctx context.Context) (build.Version, error) { + return w.Internal.Version(ctx) +} + +func (w *WorkerStruct) TaskTypes(ctx context.Context) (map[sealmgr.TaskType]struct{}, error) { + return w.Internal.TaskTypes(ctx) +} + +func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { + return w.Internal.SealPreCommit1(ctx, sectorNum, ticket, pieces) +} + +func (w *WorkerStruct) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, p1o storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) { + return w.Internal.SealPreCommit2(ctx, sectorNum, p1o) +} + +func (w *WorkerStruct) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (storage.Commit1Out, error) { + return w.Internal.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID) +} + +func (w *WorkerStruct) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, c1o storage.Commit1Out) (storage.Proof, error) { + return w.Internal.SealCommit2(ctx, sectorNum, c1o) +} + +func (w *WorkerStruct) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error { + return w.Internal.FinalizeSector(ctx, sectorNum) +} + var _ api.Common = &CommonStruct{} var _ api.FullNode = &FullNodeStruct{} var _ api.StorageMiner = &StorageMinerStruct{} +var _ api.WorkerApi = &WorkerStruct{} diff --git a/api/client/client.go b/api/client/client.go index 0e19f65c2..0380d45b5 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -42,3 +42,13 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine return &res, closer, err } + +func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerApi, jsonrpc.ClientCloser, error) { + var res apistruct.WorkerStruct + closer, err := jsonrpc.NewMergeClient(addr, "Filecoin", + []interface{}{ + &res.Internal, + }, requestHeader) + + return &res, closer, err +} diff --git a/chain/vm/syscalls.go b/chain/vm/syscalls.go index 63438d0c1..720aa3adf 100644 --- a/chain/vm/syscalls.go +++ b/chain/vm/syscalls.go @@ -5,13 +5,14 @@ import ( "fmt" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/runtime" "github.com/ipfs/go-cid" mh "github.com/multiformats/go-multihash" "golang.org/x/xerrors" + + "github.com/filecoin-project/go-sectorbuilder" ) func init() { diff --git a/cli/cmd.go b/cli/cmd.go index 48b93fb58..fc312f191 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -134,7 +134,6 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { } func GetRawAPI(ctx *cli.Context, t repo.RepoType) (string, http.Header, error) { - ainfo, err := GetAPIInfo(ctx, t) if err != nil { return "", nil, xerrors.Errorf("could not get API info: %w", err) diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index efa4aa015..b44abc113 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -4,9 +4,15 @@ import ( "os" logging "github.com/ipfs/go-log/v2" + manet "github.com/multiformats/go-multiaddr-net" + "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + paramfetch "github.com/filecoin-project/go-paramfetch" + + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/node/repo" ) @@ -75,60 +81,53 @@ var runCmd = &cli.Command{ Name: "run", Usage: "Start lotus worker", Action: func(cctx *cli.Context) error { - /* if !cctx.Bool("enable-gpu-proving") { - os.Setenv("BELLMAN_NO_GPU", "true") - } + if !cctx.Bool("enable-gpu-proving") { + os.Setenv("BELLMAN_NO_GPU", "true") + } - nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) - if err != nil { - return xerrors.Errorf("getting miner api: %w", err) - } - defer closer() - ctx := lcli.ReqContext(cctx) + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return xerrors.Errorf("getting miner api: %w", err) + } + defer closer() + ctx := lcli.ReqContext(cctx) - ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner) - if err != nil { - return xerrors.Errorf("could not get api info: %w", err) - } - _, storageAddr, err := manet.DialArgs(ainfo.Addr) + v, err := nodeApi.Version(ctx) + if err != nil { + return err + } + if v.APIVersion != build.APIVersion { + return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion}) + } - r, err := homedir.Expand(cctx.String("repo")) - if err != nil { - return err - } + go func() { + <-ctx.Done() + log.Warn("Shutting down..") + }() - v, err := nodeApi.Version(ctx) - if err != nil { - return err - } - if v.APIVersion != build.APIVersion { - return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion}) - } + act, err := nodeApi.ActorAddress(ctx) + if err != nil { + return err + } + ssize, err := nodeApi.ActorSectorSize(ctx, act) + if err != nil { + return err + } - go func() { - <-ctx.Done() - log.Warn("Shutting down..") - }() + if err := paramfetch.GetParams(build.ParametersJson(), uint64(ssize)); err != nil { + return xerrors.Errorf("get params: %w", err) + } - limiter := &limits{ - workLimit: make(chan struct{}, workers), - transferLimit: make(chan struct{}, transfers), - } + ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner) + if err != nil { + return xerrors.Errorf("could not get api info: %w", err) + } + _, storageAddr, err := manet.DialArgs(ainfo.Addr) - act, err := nodeApi.ActorAddress(ctx) - if err != nil { - return err - } - ssize, err := nodeApi.ActorSectorSize(ctx, act) - if err != nil { - return err - } - if err := paramfetch.GetParams(build.ParametersJson(), uint64(ssize)); err != nil { - return xerrors.Errorf("get params: %w", err) - } - /*ppt, spt, err := api.ProofTypeFromSectorSize(ssize) + /* + /*ppt, spt, err := api.ProofTypeFromSectorSize(ssize) if err != nil { return err } diff --git a/cmd/lotus-seal-worker/rpc.go b/cmd/lotus-seal-worker/rpc.go new file mode 100644 index 000000000..5f8b5e7b1 --- /dev/null +++ b/cmd/lotus-seal-worker/rpc.go @@ -0,0 +1,35 @@ +package main + +import ( + "context" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" + "github.com/ipfs/go-cid" +) + +type worker struct { + +} + +func (w *worker) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { + panic("implement me") +} + +func (w *worker) SealPreCommit2(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) { + panic("implement me") +} + +func (w *worker) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (storage.Commit1Out, error) { + panic("implement me") +} + +func (w *worker) SealCommit2(context.Context, abi.SectorNumber, storage.Commit1Out) (storage.Proof, error) { + panic("implement me") +} + +func (w *worker) FinalizeSector(context.Context, abi.SectorNumber) error { + panic("implement me") +} + +var _ storage.Sealer = &worker{} diff --git a/cmd/lotus-seal-worker/storage.go b/cmd/lotus-seal-worker/storage.go new file mode 100644 index 000000000..cb2775597 --- /dev/null +++ b/cmd/lotus-seal-worker/storage.go @@ -0,0 +1,22 @@ +package main + +import ( + "context" + + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/lotus/api" +) + +type workerStorage struct { + path string // TODO: multi-path support + + api api.StorageMiner +} + +func (w *workerStorage) AcquireSector(ctx context.Context, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { + w.api.WorkerFindSector() +} + +var _ sectorbuilder.SectorProvider = &workerStorage{} diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go deleted file mode 100644 index d11df2acc..000000000 --- a/cmd/lotus-seal-worker/sub.go +++ /dev/null @@ -1,132 +0,0 @@ -package main - -/* -import ( - "context" - "net/http" - - "github.com/filecoin-project/go-sectorbuilder" - "golang.org/x/xerrors" - - lapi "github.com/filecoin-project/lotus/api" -) - -type worker struct { - api lapi.StorageMiner - minerEndpoint string - repo string - auth http.Header - - limiter *limits - sb *sectorbuilder.SectorBuilder -} - -func acceptJobs(ctx context.Context, api lapi.StorageMiner, sb *sectorbuilder.SectorBuilder, limiter *limits, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error { - w := &worker{ - api: api, - minerEndpoint: endpoint, - auth: auth, - repo: repo, - - limiter: limiter, - sb: sb, - } - - tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{ - NoPreCommit: noprecommit, - NoCommit: nocommit, - }) - if err != nil { - return err - } - -loop: - for { - log.Infof("Waiting for new task") - - select { - case task := <-tasks: - log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorNum, task.Type) - - res := w.processTask(ctx, task) - - log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr) - - if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { - log.Error(err) - } - case <-ctx.Done(): - break loop - } - } - - log.Warn("acceptJobs exit") - return nil -} - -func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes { - switch task.Type { - case sectorbuilder.WorkerPreCommit: - case sectorbuilder.WorkerCommit: - default: - return errRes(xerrors.Errorf("unknown task type %d", task.Type)) - } - - if err := w.fetchSector(task.SectorNum, task.Type); err != nil { - return errRes(xerrors.Errorf("fetching sector: %w", err)) - } - - log.Infof("Data fetched, starting computation") - - var res sectorbuilder.SealRes - - switch task.Type { - case sectorbuilder.WorkerPreCommit: - w.limiter.workLimit <- struct{}{} - sealedCid, unsealedCid, err := w.sb.SealPreCommit(ctx, task.SectorNum, task.SealTicket, task.Pieces) - <-w.limiter.workLimit - - if err != nil { - return errRes(xerrors.Errorf("precomitting: %w", err)) - } - res.Rspco.CommD = unsealedCid - res.Rspco.CommR = sealedCid - - if err := w.push("sealed", task.SectorNum); err != nil { - return errRes(xerrors.Errorf("pushing precommited data: %w", err)) - } - - if err := w.push("cache", task.SectorNum); err != nil { - return errRes(xerrors.Errorf("pushing precommited data: %w", err)) - } - - if err := w.remove("staging", task.SectorNum); err != nil { - return errRes(xerrors.Errorf("cleaning up staged sector: %w", err)) - } - case sectorbuilder.WorkerCommit: - w.limiter.workLimit <- struct{}{} - proof, err := w.sb.SealCommit(ctx, task.SectorNum, task.SealTicket, task.SealSeed, task.Pieces, task.SealedCID, task.UnsealedCID) - <-w.limiter.workLimit - - if err != nil { - return errRes(xerrors.Errorf("comitting: %w", err)) - } - - res.Proof = proof - - if err := w.push("cache", task.SectorNum); err != nil { - return errRes(xerrors.Errorf("pushing precommited data: %w", err)) - } - - if err := w.remove("sealed", task.SectorNum); err != nil { - return errRes(xerrors.Errorf("cleaning up sealed sector: %w", err)) - } - } - - return res -} - -func errRes(err error) sectorbuilder.SealRes { - return sectorbuilder.SealRes{Err: err.Error(), GoErr: err} -} -*/ diff --git a/go.mod b/go.mod index 1ab22a12a..822874a80 100644 --- a/go.mod +++ b/go.mod @@ -117,3 +117,5 @@ replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0 + +replace github.com/filecoin-project/go-sectorbuilder => /home/magik6k/gohack/github.com/filecoin-project/go-sectorbuilder diff --git a/node/impl/storminer.go b/node/impl/storminer.go index f4bee869b..38b7cddc9 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -13,7 +13,9 @@ import ( "github.com/filecoin-project/go-address" storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/lotus/api" @@ -256,15 +258,26 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumbe return sm.Miner.ForceSectorState(ctx, id, state) } -/* -func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { - return sm.SectorBuilder.AddWorker(ctx, cfg) +func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error { + _, err := advmgr.ConnectRemote(ctx, sm.Full, url) + if err != nil { + return err + } + + panic("todo register ") } -func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { - return sm.SectorBuilder.TaskDone(ctx, task, res) +func (sm *StorageMinerAPI) WorkerAttachStorage(ctx context.Context, si api.StorageInfo) error { + panic("implement me") +} + +func (sm *StorageMinerAPI) WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error { + panic("implement me") +} + +func (sm *StorageMinerAPI) WorkerFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { + panic("implement me") } -*/ func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { fi, err := os.Open(path) diff --git a/storage/sealmgr/advmgr/local.go b/storage/sealmgr/advmgr/local.go index 63f852d7b..44e15abad 100644 --- a/storage/sealmgr/advmgr/local.go +++ b/storage/sealmgr/advmgr/local.go @@ -91,13 +91,13 @@ func (l *localWorker) FinalizeSector(ctx context.Context, sectorNum abi.SectorNu return sb.FinalizeSector(ctx, sectorNum) } -func (l *localWorker) TaskTypes() map[sealmgr.TaskType]struct{} { +func (l *localWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) { return map[sealmgr.TaskType]struct{}{ sealmgr.TTAddPiece: {}, sealmgr.TTPreCommit1: {}, sealmgr.TTPreCommit2: {}, sealmgr.TTCommit2: {}, - } + }, nil } func (l *localWorker) Paths() []Path { diff --git a/storage/sealmgr/advmgr/manager.go b/storage/sealmgr/advmgr/manager.go index 9a8a6c5a4..c97b5ebeb 100644 --- a/storage/sealmgr/advmgr/manager.go +++ b/storage/sealmgr/advmgr/manager.go @@ -5,6 +5,7 @@ import ( "io" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" "golang.org/x/xerrors" @@ -17,6 +18,8 @@ import ( "github.com/filecoin-project/lotus/storage/sealmgr" ) +var log = logging.Logger("advmgr") + type SectorIDCounter interface { Next() (abi.SectorNumber, error) } @@ -39,7 +42,7 @@ type Path struct { type Worker interface { sectorbuilder.Sealer - TaskTypes() map[sealmgr.TaskType]struct{} + TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) Paths() []Path } @@ -122,7 +125,12 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.Stor paths := map[int]config.StorageMeta{} for i, worker := range m.workers { - if _, ok := worker.TaskTypes()[task]; !ok { + tt, err := worker.TaskTypes(context.TODO()) + if err != nil { + log.Errorf("error getting supported worker task types: %+v", err) + continue + } + if _, ok := tt[task]; !ok { continue } @@ -221,7 +229,12 @@ func (m *Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, t func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) { for _, worker := range m.workers { - if _, ok := worker.TaskTypes()[sealmgr.TTCommit2]; !ok { + tt, err := worker.TaskTypes(context.TODO()) + if err != nil { + log.Errorf("error getting supported worker task types: %+v", err) + continue + } + if _, ok := tt[sealmgr.TTCommit2]; !ok { continue } diff --git a/storage/sealmgr/advmgr/remote.go b/storage/sealmgr/advmgr/remote.go new file mode 100644 index 000000000..78d7ee81d --- /dev/null +++ b/storage/sealmgr/advmgr/remote.go @@ -0,0 +1,46 @@ +package advmgr + +import ( + "context" + "net/http" + + "github.com/filecoin-project/specs-actors/actors/abi" + storage2 "github.com/filecoin-project/specs-storage/storage" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apistruct" + "github.com/filecoin-project/lotus/api/client" +) + +type remote struct { + api.WorkerApi +} + +func (r *remote) AddPiece(ctx context.Context, sector abi.SectorNumber, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) { + panic("implement me") +} + +func (r *remote) Paths() []Path { + panic("implement me") +} + +func ConnectRemote(ctx context.Context, fa api.FullNode, url string) (*remote, error) { + token, err := fa.AuthNew(ctx, []api.Permission{apistruct.PermAdmin}) + if err != nil { + return nil, xerrors.Errorf("creating auth token for remote connection: %w", err) + } + + headers := http.Header{} + headers.Add("Authorization", "Bearer "+string(token)) + + wapi, close, err := client.NewWorkerRPC(url, headers) + if err != nil { + return nil, xerrors.Errorf("creating jsonrpc client: %w", err) + } + _ = close // TODO + + return &remote{wapi}, nil +} + +var _ Worker = &remote{}