post workers: Cleanup, tests

This commit is contained in:
Łukasz Magiera 2022-01-14 14:11:04 +01:00
parent e216aefd23
commit 4a874eff70
54 changed files with 1183 additions and 1251 deletions

View File

@ -920,6 +920,11 @@ workflows:
suite: itest-wdpost
target: "./itests/wdpost_test.go"
- test:
name: test-itest-worker
suite: itest-worker
target: "./itests/worker_test.go"
- test:
name: test-unit-cli
suite: utest-unit-cli

View File

@ -1,46 +0,0 @@
### Environment variables
Ensure that workers have access to the following environment variables when they run.
```
export TMPDIR=/fast/disk/folder3 # used when sealing
export MINER_API_INFO:<TOKEN>:/ip4/<miner_api_address>/tcp/<port>/http`
export BELLMAN_CPU_UTILIZATION=0.875 # optimal value depends on exact hardware
export FIL_PROOFS_MAXIMIZE_CACHING=1
export FIL_PROOFS_USE_GPU_COLUMN_BUILDER=1 # when GPU is available
export FIL_PROOFS_USE_GPU_TREE_BUILDER=1 # when GPU is available
export FIL_PROOFS_PARAMETER_CACHE=/fast/disk/folder # > 100GiB!
export FIL_PROOFS_PARENT_CACHE=/fast/disk/folder2 # > 50GiB!
# The following increases speed of PreCommit1 at the cost of using a full
# CPU core-complex rather than a single core.
# See https://github.com/filecoin-project/rust-fil-proofs/ and the
# "Worker co-location" section below.
export FIL_PROOFS_USE_MULTICORE_SDR=1
```
### Run the worker
```
lotus-worker run <flags>
```
These are old flags:
```
--addpiece enable addpiece (default: true)
--precommit1 enable precommit1 (32G sectors: 1 core, 128GiB RAM) (default: true)
--unseal enable unsealing (32G sectors: 1 core, 128GiB RAM) (default: true)
--precommit2 enable precommit2 (32G sectors: multiple cores, 96GiB RAM) (default: true)
--commit enable commit (32G sectors: multiple cores or GPUs, 128GiB RAM + 64GiB swap) (default: true)
```
We added two new flags:
```
--windowpost enable windowpost (default: false)
--winnningpost enable winningpost (default: false)
```
These post flags have priority over old flags. If you want this worker to be a window post machine, you can just set the windowpost flag to be `true`. Similar to winning post machine. If you set both of them to be `true`, it will be a window post machine.

View File

@ -154,8 +154,6 @@ type StorageMiner interface {
StorageLocal(ctx context.Context) (map[stores.ID]string, error) //perm:admin
StorageStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) //perm:admin
StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error) //perm:admin
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error //perm:write
MarketListDeals(ctx context.Context) ([]MarketDeal, error) //perm:read
MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) //perm:read

View File

@ -7,11 +7,11 @@ import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
)
@ -49,8 +49,9 @@ type Worker interface {
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) //perm:admin
GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, postChallenges *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) //perm:admin
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) //perm:admin
GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) //perm:admin
TaskDisable(ctx context.Context, tt sealtasks.TaskType) error //perm:admin
TaskEnable(ctx context.Context, tt sealtasks.TaskType) error //perm:admin

View File

@ -300,8 +300,6 @@ func init() {
Error: "<error>",
})
addExample(storiface.ResourceTable)
addExample(map[abi.SectorNumber][]uint64{})
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

View File

@ -22,7 +22,6 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -31,7 +30,7 @@ import (
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
@ -808,8 +807,6 @@ type StorageMinerStruct struct {
StorageGetLocks func(p0 context.Context) (storiface.SectorLocks, error) `perm:"admin"`
StorageGetUrl func(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) `perm:"admin"`
StorageInfo func(p0 context.Context, p1 stores.ID) (stores.StorageInfo, error) `perm:"admin"`
StorageList func(p0 context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
@ -871,9 +868,9 @@ type WorkerStruct struct {
GenerateSectorKeyFromData func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
GenerateWindowPoSt func(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) `perm:"admin"`
GenerateWindowPoSt func(p0 context.Context, p1 abi.RegisteredPoStProof, p2 abi.ActorID, p3 []storiface.PostSectorChallenge, p4 int, p5 abi.PoStRandomness) (storiface.WindowPoStResult, error) `perm:"admin"`
GenerateWinningPoSt func(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) `perm:"admin"`
GenerateWinningPoSt func(p0 context.Context, p1 abi.RegisteredPoStProof, p2 abi.ActorID, p3 []storiface.PostSectorChallenge, p4 abi.PoStRandomness) ([]proof.PoStProof, error) `perm:"admin"`
Info func(p0 context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
@ -4741,17 +4738,6 @@ func (s *StorageMinerStub) StorageGetLocks(p0 context.Context) (storiface.Sector
return *new(storiface.SectorLocks), ErrNotSupported
}
func (s *StorageMinerStruct) StorageGetUrl(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) {
if s.Internal.StorageGetUrl == nil {
return "", ErrNotSupported
}
return s.Internal.StorageGetUrl(p0, p1, p2)
}
func (s *StorageMinerStub) StorageGetUrl(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) {
return "", ErrNotSupported
}
func (s *StorageMinerStruct) StorageInfo(p0 context.Context, p1 stores.ID) (stores.StorageInfo, error) {
if s.Internal.StorageInfo == nil {
return *new(stores.StorageInfo), ErrNotSupported
@ -4994,25 +4980,25 @@ func (s *WorkerStub) GenerateSectorKeyFromData(p0 context.Context, p1 storage.Se
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) GenerateWindowPoSt(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
func (s *WorkerStruct) GenerateWindowPoSt(p0 context.Context, p1 abi.RegisteredPoStProof, p2 abi.ActorID, p3 []storiface.PostSectorChallenge, p4 int, p5 abi.PoStRandomness) (storiface.WindowPoStResult, error) {
if s.Internal.GenerateWindowPoSt == nil {
return *new(ffiwrapper.WindowPoStResult), ErrNotSupported
return *new(storiface.WindowPoStResult), ErrNotSupported
}
return s.Internal.GenerateWindowPoSt(p0, p1, p2, p3, p4, p5, p6)
return s.Internal.GenerateWindowPoSt(p0, p1, p2, p3, p4, p5)
}
func (s *WorkerStub) GenerateWindowPoSt(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
return *new(ffiwrapper.WindowPoStResult), ErrNotSupported
func (s *WorkerStub) GenerateWindowPoSt(p0 context.Context, p1 abi.RegisteredPoStProof, p2 abi.ActorID, p3 []storiface.PostSectorChallenge, p4 int, p5 abi.PoStRandomness) (storiface.WindowPoStResult, error) {
return *new(storiface.WindowPoStResult), ErrNotSupported
}
func (s *WorkerStruct) GenerateWinningPoSt(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) {
func (s *WorkerStruct) GenerateWinningPoSt(p0 context.Context, p1 abi.RegisteredPoStProof, p2 abi.ActorID, p3 []storiface.PostSectorChallenge, p4 abi.PoStRandomness) ([]proof.PoStProof, error) {
if s.Internal.GenerateWinningPoSt == nil {
return *new([]proof.PoStProof), ErrNotSupported
}
return s.Internal.GenerateWinningPoSt(p0, p1, p2, p3, p4)
}
func (s *WorkerStub) GenerateWinningPoSt(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) {
func (s *WorkerStub) GenerateWinningPoSt(p0 context.Context, p1 abi.RegisteredPoStProof, p2 abi.ActorID, p3 []storiface.PostSectorChallenge, p4 abi.PoStRandomness) ([]proof.PoStProof, error) {
return *new([]proof.PoStProof), ErrNotSupported
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -13,7 +13,6 @@ import (
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net"
@ -22,7 +21,6 @@ import (
"go.opencensus.io/tag"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-statestore"
@ -31,13 +29,12 @@ import (
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/cmd/lotus-seal-worker/sealworker"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
)
@ -163,16 +160,6 @@ var runCmd = &cli.Command{
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true,
},
&cli.IntFlag{
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
},
&cli.StringFlag{
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
},
&cli.BoolFlag{
Name: "windowpost",
Usage: "enable window post",
@ -184,6 +171,16 @@ var runCmd = &cli.Command{
Usage: "enable winning post",
Value: false,
},
&cli.IntFlag{
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
},
&cli.StringFlag{
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
},
},
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
@ -262,7 +259,19 @@ var runCmd = &cli.Command{
var taskTypes []sealtasks.TaskType
exclusiveSet := false
if cctx.Bool("windowpost") {
exclusiveSet = true
taskTypes = append(taskTypes, sealtasks.TTGenerateWindowPoSt)
}
if cctx.Bool("winningpost") {
exclusiveSet = true
taskTypes = append(taskTypes, sealtasks.TTGenerateWinningPoSt)
}
if !exclusiveSet {
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
}
if cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
@ -280,17 +289,12 @@ var runCmd = &cli.Command{
taskTypes = append(taskTypes, sealtasks.TTCommit2)
}
if cctx.Bool("windowpost") {
taskTypes = append(taskTypes, sealtasks.TTGenerateWindowPoSt)
}
if cctx.Bool("winningpost") {
taskTypes = append(taskTypes, sealtasks.TTGenerateWinningPoSt)
}
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
}
if exclusiveSet && len(taskTypes) != 1 {
return xerrors.Errorf("PoSt workers only support a single task type; have %v", taskTypes)
}
// Open repo
@ -415,35 +419,19 @@ var runCmd = &cli.Command{
wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))
workerApi := &worker{
workerApi := &sealworker.Worker{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
}, remote, localStore, nodeApi, nodeApi, wsts),
localStore: localStore,
ls: lr,
LocalStore: localStore,
Storage: lr,
}
mux := mux.NewRouter()
log.Info("Setting up control endpoint at " + address)
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("Filecoin", api.PermissionedWorkerAPI(proxy.MetricedWorkerAPI(workerApi)))
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc(remoteHandler)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
}
srv := &http.Server{
Handler: ah,
Handler: sealworker.WorkerHandler(nodeApi.AuthVerify, remoteHandler, workerApi, true),
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
return ctx

View File

@ -1,85 +0,0 @@
package main
import (
"context"
"sync/atomic"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
apitypes "github.com/filecoin-project/lotus/api/types"
"github.com/filecoin-project/lotus/build"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type worker struct {
*sectorstorage.LocalWorker
localStore *stores.Local
ls stores.LocalStorage
disabled int64
}
func (w *worker) Version(context.Context) (api.Version, error) {
return api.WorkerAPIVersion0, nil
}
func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}
if err := w.localStore.OpenPath(ctx, path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}
if err := w.ls.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
return nil
}
func (w *worker) SetEnabled(ctx context.Context, enabled bool) error {
disabled := int64(1)
if enabled {
disabled = 0
}
atomic.StoreInt64(&w.disabled, disabled)
return nil
}
func (w *worker) Enabled(ctx context.Context) (bool, error) {
return atomic.LoadInt64(&w.disabled) == 0, nil
}
func (w *worker) WaitQuiet(ctx context.Context) error {
w.LocalWorker.WaitQuiet() // uses WaitGroup under the hood so no ctx :/
return nil
}
func (w *worker) ProcessSession(ctx context.Context) (uuid.UUID, error) {
return w.LocalWorker.Session(ctx)
}
func (w *worker) Session(ctx context.Context) (uuid.UUID, error) {
if atomic.LoadInt64(&w.disabled) == 1 {
return uuid.UUID{}, xerrors.Errorf("worker disabled")
}
return w.LocalWorker.Session(ctx)
}
func (w *worker) Discover(ctx context.Context) (apitypes.OpenRPCDocument, error) {
return build.OpenRPCDiscoverJSON_Worker(), nil
}
var _ storiface.WorkerCalls = &worker{}

View File

@ -0,0 +1,119 @@
package sealworker
import (
"context"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/gorilla/mux"
"net/http"
"sync/atomic"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
apitypes "github.com/filecoin-project/lotus/api/types"
"github.com/filecoin-project/lotus/build"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func WorkerHandler(authv func(ctx context.Context, token string) ([]auth.Permission, error), remote http.HandlerFunc, a api.Worker, permissioned bool) http.Handler {
mux := mux.NewRouter()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
wapi := proxy.MetricedWorkerAPI(a)
if permissioned {
wapi = api.PermissionedWorkerAPI(wapi)
}
rpcServer.Register("Filecoin", wapi)
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc(remote)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
if !permissioned {
return mux
}
ah := &auth.Handler{
Verify: authv,
Next: mux.ServeHTTP,
}
return ah
}
type Worker struct {
*sectorstorage.LocalWorker
LocalStore *stores.Local
Storage stores.LocalStorage
disabled int64
}
func (w *Worker) Version(context.Context) (api.Version, error) {
return api.WorkerAPIVersion0, nil
}
func (w *Worker) StorageAddLocal(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}
if err := w.LocalStore.OpenPath(ctx, path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}
if err := w.Storage.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
return nil
}
func (w *Worker) SetEnabled(ctx context.Context, enabled bool) error {
disabled := int64(1)
if enabled {
disabled = 0
}
atomic.StoreInt64(&w.disabled, disabled)
return nil
}
func (w *Worker) Enabled(ctx context.Context) (bool, error) {
return atomic.LoadInt64(&w.disabled) == 0, nil
}
func (w *Worker) WaitQuiet(ctx context.Context) error {
w.LocalWorker.WaitQuiet() // uses WaitGroup under the hood so no ctx :/
return nil
}
func (w *Worker) ProcessSession(ctx context.Context) (uuid.UUID, error) {
return w.LocalWorker.Session(ctx)
}
func (w *Worker) Session(ctx context.Context) (uuid.UUID, error) {
if atomic.LoadInt64(&w.disabled) == 1 {
return uuid.UUID{}, xerrors.Errorf("worker disabled")
}
return w.LocalWorker.Session(ctx)
}
func (w *Worker) Discover(ctx context.Context) (apitypes.OpenRPCDocument, error) {
return build.OpenRPCDiscoverJSON_Worker(), nil
}
var _ storiface.WorkerCalls = &Worker{}

View File

@ -144,7 +144,6 @@
* [StorageDropSector](#StorageDropSector)
* [StorageFindSector](#StorageFindSector)
* [StorageGetLocks](#StorageGetLocks)
* [StorageGetUrl](#StorageGetUrl)
* [StorageInfo](#StorageInfo)
* [StorageList](#StorageList)
* [StorageLocal](#StorageLocal)
@ -2877,8 +2876,7 @@ Inputs:
],
"AllowTo": [
"string value"
],
"Path": "string value"
]
},
{
"Capacity": 9,
@ -2924,8 +2922,7 @@ Response:
],
"AllowTo": [
"string value"
],
"Path": "string value"
]
}
]
```
@ -2995,11 +2992,13 @@ Response:
"URLs": [
"string value"
],
"BaseURLs": [
"string value"
],
"Weight": 42,
"CanSeal": true,
"CanStore": true,
"Primary": true,
"Path": "string value"
"Primary": true
}
]
```
@ -3039,24 +3038,6 @@ Response:
}
```
### StorageGetUrl
Perms: admin
Inputs:
```json
[
{
"Miner": 1000,
"Number": 9
},
1
]
```
Response: `"string value"`
### StorageInfo
@ -3085,8 +3066,7 @@ Response:
],
"AllowTo": [
"string value"
],
"Path": "string value"
]
}
```

View File

@ -864,22 +864,22 @@ Perms: admin
Inputs:
```json
[
8,
1000,
[
{
"Spsi": null
"SealProof": 8,
"SectorNumber": 9,
"SealedCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
123,
123,
"Bw==",
{
"Fc": {
"Sectors": [
123,
124
"Challenge": [
42
]
}
],
"Challenges": {}
}
}
123,
"Bw=="
]
```
@ -907,20 +907,21 @@ Perms: admin
Inputs:
```json
[
8,
1000,
[
{
"Spsi": null
"SealProof": 8,
"SectorNumber": 9,
"SealedCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Bw==",
{
"Fc": {
"Sectors": [
123,
124
"Challenge": [
42
]
}
],
"Challenges": {}
}
}
"Bw=="
]
```

View File

@ -44,10 +44,10 @@ OPTIONS:
--unseal enable unsealing (32G sectors: 1 core, 128GiB Memory) (default: true)
--precommit2 enable precommit2 (32G sectors: all cores, 96GiB Memory) (default: true)
--commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true)
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
--windowpost enable window post (default: false)
--winningpost enable winning post (default: false)
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
--help, -h show help (default: false)
```

View File

@ -4,15 +4,12 @@ import (
"context"
"crypto/rand"
"fmt"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
@ -23,9 +20,12 @@ type FaultTracker interface {
// CheckProvable returns unprovable sectors
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
if rg == nil {
return nil, xerrors.Errorf("rg is nil")
}
var bad = make(map[abi.SectorID]string)
// TODO: More better checks
for _, sector := range sectors {
err := func() error {
ctx, cancel := context.WithCancel(ctx)
@ -42,20 +42,6 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
lp, _, err := m.storage.AcquireSectorPaths(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad[sector.ID] = fmt.Sprintf("acquire sector failed: %s", err)
return nil
}
if lp.Sealed == "" || lp.Cache == "" {
log.Warnw("CheckProvable Sector FAULT: cache and/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache)
bad[sector.ID] = fmt.Sprintf("cache and/or sealed paths not found, cache %q, sealed %q", lp.Cache, lp.Sealed)
return nil
}
if rg != nil {
wpp, err := sector.ProofType.RegisteredWindowPoStProof()
if err != nil {
return err
@ -69,38 +55,29 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
sector.ID.Number,
})
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "err", err)
log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "err", err)
bad[sector.ID] = fmt.Sprintf("generating fallback challenges: %s", err)
return nil
}
commr, err := rg(ctx, sector.ID)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "err", err)
log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", "err", err)
bad[sector.ID] = fmt.Sprintf("getting commR: %s", err)
return nil
}
psi := ffiwrapper.PrivateSectorInfo{
Psi: ffi.PrivateSectorInfo{
SectorInfo: proof.SectorInfo{
_, err = m.storage.GenerateSingleVanillaProof(ctx, sector.ID.Miner, storiface.PostSectorChallenge{
SealProof: sector.ProofType,
SectorNumber: sector.ID.Number,
SealedCID: commr,
},
CacheDirPath: lp.Cache,
PoStProofType: wpp,
SealedSectorPath: lp.Sealed,
},
}
_, err = m.storage.GenerateSingleVanillaProof(ctx, sector.ID.Miner, &psi, ch.Challenges[sector.ID.Number])
Challenge: ch.Challenges[sector.ID.Number],
}, wpp)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "err", err)
log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "err", err)
bad[sector.ID] = fmt.Sprintf("generating vanilla proof: %s", err)
return nil
}
}
return nil
}()

View File

@ -84,7 +84,3 @@ func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, exis
return out, done, nil
}
func (b *Provider) AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return b.AcquireSector(ctx, id, existing, 0, ptype)
}

View File

@ -22,6 +22,7 @@ import (
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/detailyang/go-fallocate"
@ -920,3 +921,18 @@ func GenerateUnsealedCID(proofType abi.RegisteredSealProof, pieces []abi.PieceIn
return ffi.GenerateUnsealedCID(proofType, allPieces)
}
func (sb *Sealer) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof5.PoStProof, error) {
return ffi.GenerateWinningPoStWithVanilla(proofType, minerID, randomness, vanillas)
}
func (sb *Sealer) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (proof5.PoStProof, error) {
pp, err := ffi.GenerateSinglePartitionWindowPoStWithVanilla(proofType, minerID, randomness, proofs, uint(partitionIdx))
if err != nil || pp == nil {
return proof5.PoStProof{}, err
}
return proof5.PoStProof{
PoStProof: pp.PoStProof,
ProofBytes: pp.ProofBytes,
}, nil
}

View File

@ -6,13 +6,12 @@ import (
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -34,8 +33,6 @@ type Storage interface {
UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
WorkerProver
}
type Verifier interface {
@ -59,41 +56,6 @@ type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}
type MinerProver interface {
PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (SortedPrivateSectorInfo, []abi.SectorID, func(), error)
SplitSortedPrivateSectorInfo(ctx context.Context, privsector SortedPrivateSectorInfo, offset int, end int) (SortedPrivateSectorInfo, error)
GeneratePoStFallbackSectorChallenges(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, sectorIds []abi.SectorNumber) (*FallbackChallenges, error)
}
type WindowPoStResult struct {
PoStProofs proof.PoStProof
Skipped []abi.SectorID
}
// type SortedPrivateSectorInfo ffi.SortedPrivateSectorInfo
type FallbackChallenges struct {
Fc ffi.FallbackChallenges
}
type SortedPrivateSectorInfo struct {
Spsi ffi.SortedPrivateSectorInfo
}
type PrivateSectorInfo struct {
Psi ffi.PrivateSectorInfo
}
type WorkerProver interface {
GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (*proof.PoStProof, error)
GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof.PoStProof, error)
}
type WorkerCalls interface {
GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, sectorChallenges *FallbackChallenges) (WindowPoStResult, error)
GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *FallbackChallenges) ([]proof.PoStProof, error)
}

View File

@ -12,7 +12,6 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
@ -142,83 +141,7 @@ func (proofVerifier) VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoS
return ffi.VerifyWindowPoSt(info)
}
func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proofType abi.RegisteredPoStProof, mid abi.ActorID, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
randomness[31] &= 0x3f
return ffi.GenerateWinningPoStSectorChallenge(proofType, mid, randomness, eligibleSectorCount)
}
func (sb *Sealer) PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
fmap := map[abi.SectorNumber]struct{}{}
for _, fault := range faults {
fmap[fault] = struct{}{}
}
var doneFuncs []func()
done := func() {
for _, df := range doneFuncs {
df()
}
}
var skipped []abi.SectorID
var out []ffi.PrivateSectorInfo
for _, s := range sectorInfo {
if _, faulty := fmap[s.SectorNumber]; faulty {
continue
}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: mid, Number: s.SectorNumber},
ProofType: s.SealProof,
}
paths, d, err := sb.sectors.AcquireSectorPaths(ctx, sid, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
if err != nil {
log.Warnw("failed to acquire sector, skipping", "sector", sid.ID, "error", err)
skipped = append(skipped, sid.ID)
continue
}
doneFuncs = append(doneFuncs, d)
postProofType, err := rpt(s.SealProof)
if err != nil {
done()
return SortedPrivateSectorInfo{}, nil, nil, xerrors.Errorf("acquiring registered PoSt proof from sector info %+v: %w", s, err)
}
out = append(out, ffi.PrivateSectorInfo{
CacheDirPath: paths.Cache,
PoStProofType: postProofType,
SealedSectorPath: paths.Sealed,
SectorInfo: s,
})
}
privsectors := ffi.NewSortedPrivateSectorInfo(out...)
return SortedPrivateSectorInfo{Spsi: privsectors}, skipped, done, nil
}
func (sb *Sealer) GeneratePoStFallbackSectorChallenges(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, sectorIds []abi.SectorNumber) (*FallbackChallenges, error) {
fc, err := ffi.GeneratePoStFallbackSectorChallenges(proofType, minerID, randomness, sectorIds)
return &FallbackChallenges{
Fc: *fc,
}, err
}
func (sb *Sealer) SplitSortedPrivateSectorInfo(ctx context.Context, privsector SortedPrivateSectorInfo, offset int, end int) (SortedPrivateSectorInfo, error) {
Spsi, err := ffi.SplitSortedPrivateSectorInfo(ctx, privsector.Spsi, offset, end)
return SortedPrivateSectorInfo{Spsi: Spsi}, err
}
func (sb *Sealer) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof5.PoStProof, error) {
return ffi.GenerateWinningPoStWithVanilla(proofType, minerID, randomness, vanillas)
}
func (sb *Sealer) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (*proof.PoStProof, error) {
pp, err := ffi.GenerateSinglePartitionWindowPoStWithVanilla(proofType, minerID, randomness, proofs, uint(partitionIdx))
return &proof.PoStProof{
PoStProof: pp.PoStProof,
ProofBytes: pp.ProofBytes,
}, err
return ffi.GenerateWinningPoStSectorChallenge(proofType, minerID, randomness, eligibleSectorCount)
}

View File

@ -42,8 +42,6 @@ type Worker interface {
Session(context.Context) (uuid.UUID, error)
Close() error // TODO: do we need this?
ffiwrapper.WorkerCalls
}
type SectorManager interface {
@ -51,8 +49,6 @@ type SectorManager interface {
storage.Prover
storiface.WorkerReturn
FaultTracker
ffiwrapper.MinerProver
}
var ClosedWorkerID = uuid.UUID{}
@ -65,8 +61,10 @@ type Manager struct {
index stores.SectorIndex
sched *scheduler
windowPoStSched *poStScheduler
winningPoStSched *poStScheduler
storage.Prover
localProver storage.Prover
workLk sync.Mutex
work *statestore.StateStore
@ -77,10 +75,10 @@ type Manager struct {
results map[WorkID]result
waitRes map[WorkID]chan struct{}
ffiwrapper.MinerProver
}
var _ storage.Prover = &Manager{}
type result struct {
r interface{}
err error
@ -122,7 +120,7 @@ type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore
func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si, remote: stor})
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
@ -135,16 +133,16 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
index: si,
sched: newScheduler(),
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),
Prover: prover,
localProver: prover,
work: mss,
callToWork: map[storiface.CallID]WorkID{},
callRes: map[storiface.CallID]chan result{},
results: map[WorkID]result{},
waitRes: map[WorkID]chan struct{}{},
MinerProver: prover,
}
m.setupWorkTracker()
@ -202,7 +200,32 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
}
func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
return m.sched.runWorker(ctx, w)
sessID, err := w.Session(ctx)
if err != nil {
return xerrors.Errorf("getting worker session: %w", err)
}
if sessID == ClosedWorkerID {
return xerrors.Errorf("worker already closed")
}
wid := storiface.WorkerID(sessID)
whnd, err := newWorkerHandle(ctx, w)
if err != nil {
return err
}
tasks, err := w.TaskTypes(ctx)
if err != nil {
return xerrors.Errorf("getting worker tasks: %w", err)
}
if m.windowPoStSched.MaybeAddWorker(wid, tasks, whnd) ||
m.winningPoStSched.MaybeAddWorker(wid, tasks, whnd) {
return nil
}
return m.sched.runWorker(ctx, wid, whnd)
}
func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -948,6 +971,8 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err
}
func (m *Manager) Close(ctx context.Context) error {
m.windowPoStSched.schedClose()
m.winningPoStSched.schedClose()
return m.sched.Close(ctx)
}

View File

@ -2,53 +2,64 @@ package sectorstorage
import (
"context"
"sort"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
builtin6 "github.com/filecoin-project/specs-actors/v6/actors/builtin"
"github.com/hashicorp/go-multierror"
xerrors "golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/v6/actors/builtin"
"github.com/filecoin-project/specs-actors/v6/actors/runtime/proof"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func (m *Manager) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
if !m.sched.winningPoStSched.CanSched(ctx) {
if !m.winningPoStSched.CanSched(ctx) {
log.Info("GenerateWinningPoSt run at lotus-miner")
return m.Prover.GenerateWinningPoSt(ctx, minerID, sectorInfo, randomness)
return m.localProver.GenerateWinningPoSt(ctx, minerID, sectorInfo, randomness)
}
return m.generateWinningPoSt(ctx, minerID, sectorInfo, randomness)
}
func (m *Manager) generateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
randomness[31] &= 0x3f
ps, skipped, done, err := m.PubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWinningPoStProof)
sectorNums := make([]abi.SectorNumber, len(sectorInfo))
for i, s := range sectorInfo {
sectorNums[i] = s.SectorNumber
}
if len(sectorInfo) == 0 {
return nil, xerrors.New("generate window post len(sectorInfo)=0")
}
ppt, err := sectorInfo[0].SealProof.RegisteredWinningPoStProof()
if err != nil {
return nil, err
}
defer done()
if len(skipped) > 0 {
return nil, xerrors.Errorf("pubSectorToPriv skipped sectors: %+v", skipped)
}
var sectorNums []abi.SectorNumber
for _, s := range ps.Spsi.Values() {
sectorNums = append(sectorNums, s.SectorNumber)
}
postChallenges, err := m.GeneratePoStFallbackSectorChallenges(ctx, ps.Spsi.Values()[0].PoStProofType, minerID, randomness, sectorNums)
postChallenges, err := ffi.GeneratePoStFallbackSectorChallenges(ppt, minerID, randomness, sectorNums)
if err != nil {
return nil, xerrors.Errorf("generating fallback challenges: %v", err)
}
sectorChallenges := make([]storiface.PostSectorChallenge, len(sectorInfo))
for i, s := range sectorInfo {
sectorChallenges[i] = storiface.PostSectorChallenge{
SealProof: s.SealProof,
SectorNumber: s.SectorNumber,
SealedCID: s.SealedCID,
Challenge: postChallenges.Challenges[s.SectorNumber],
}
}
var proofs []proof.PoStProof
err = m.sched.winningPoStSched.Schedule(ctx, false, func(ctx context.Context, w Worker) error {
out, err := w.GenerateWinningPoSt(ctx, minerID, ps, randomness, postChallenges)
err = m.winningPoStSched.Schedule(ctx, false, func(ctx context.Context, w Worker) error {
out, err := w.GenerateWinningPoSt(ctx, ppt, minerID, sectorChallenges, randomness)
if err != nil {
return err
}
@ -63,9 +74,9 @@ func (m *Manager) generateWinningPoSt(ctx context.Context, minerID abi.ActorID,
}
func (m *Manager) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error) {
if !m.sched.windowPoStSched.CanSched(ctx) {
if !m.windowPoStSched.CanSched(ctx) {
log.Info("GenerateWindowPoSt run at lotus-miner")
return m.Prover.GenerateWindowPoSt(ctx, minerID, sectorInfo, randomness)
return m.localProver.GenerateWindowPoSt(ctx, minerID, sectorInfo, randomness)
}
return m.generateWindowPoSt(ctx, minerID, sectorInfo, randomness)
@ -81,127 +92,126 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
return nil, nil, xerrors.New("generate window post len(sectorInfo)=0")
}
//get window proof type
proofType, err := abi.RegisteredSealProof.RegisteredWindowPoStProof(sectorInfo[0].SealProof)
ppt, err := sectorInfo[0].SealProof.RegisteredWindowPoStProof()
if err != nil {
return nil, nil, err
}
// Get sorted and de-duplicate sectors info
ps, skipped, done, err := m.PubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWindowPoStProof)
if err != nil {
return nil, nil, xerrors.Errorf("PubSectorToPriv failed: %+v", err)
}
if len(skipped) > 0 {
return nil, skipped, xerrors.Errorf("skipped = %d", len(skipped))
}
defer done()
partitionSectorsCount, err := builtin6.PoStProofWindowPoStPartitionSectors(proofType)
maxPartitionSize, err := builtin.PoStProofWindowPoStPartitionSectors(ppt) // todo proxy through chain/actors
if err != nil {
return nil, nil, xerrors.Errorf("get sectors count of partition failed:%+v", err)
}
// The partitions number of this batch
partitionCount := (len(ps.Spsi.Values()) + int(partitionSectorsCount) - 1) / int(partitionSectorsCount)
// ceil(sectorInfos / maxPartitionSize)
partitionCount := uint64((len(sectorInfo) + int(maxPartitionSize) - 1) / int(maxPartitionSize))
log.Infof("generateWindowPoSt len(partitionSectorsCount):%d len(partitionCount):%d \n", partitionSectorsCount, partitionCount)
log.Infof("generateWindowPoSt maxPartitionSize:%d partitionCount:%d \n", maxPartitionSize, partitionCount)
var faults []abi.SectorID
var skipped []abi.SectorID
var flk sync.Mutex
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var sectorNums []abi.SectorNumber
for _, s := range ps.Spsi.Values() {
sectorNums = append(sectorNums, s.SectorNumber)
sort.Slice(sectorInfo, func(i, j int) bool {
return sectorInfo[i].SectorNumber < sectorInfo[j].SectorNumber
})
sectorNums := make([]abi.SectorNumber, len(sectorInfo))
sectorMap := make(map[abi.SectorNumber]proof.SectorInfo)
for i, s := range sectorInfo {
sectorNums[i] = s.SectorNumber
sectorMap[s.SectorNumber] = s
}
postChallenges, err := m.GeneratePoStFallbackSectorChallenges(ctx, ps.Spsi.Values()[0].PoStProofType, minerID, randomness, sectorNums)
postChallenges, err := ffi.GeneratePoStFallbackSectorChallenges(ppt, minerID, randomness, sectorNums)
if err != nil {
return nil, nil, xerrors.Errorf("generating fallback challenges: %v", err)
}
proofList := make([]proof.PoStProof, partitionCount)
proofList := make([]ffi.PartitionProof, partitionCount)
var wg sync.WaitGroup
for i := 0; i < partitionCount; i++ {
wg.Add(1)
go func(n int) {
wg.Add(int(partitionCount))
for partIdx := uint64(0); partIdx < partitionCount; partIdx++ {
go func(partIdx uint64) {
defer wg.Done()
proof, faultsectors, err := m.generatePartitionWindowPost(cctx, minerID, n, int(partitionSectorsCount), partitionCount, ps, randomness, postChallenges)
sectors := make([]storiface.PostSectorChallenge, 0)
for i := uint64(0); i < maxPartitionSize; i++ {
si := i + partIdx*maxPartitionSize
if si >= uint64(len(postChallenges.Sectors)) {
break
}
snum := postChallenges.Sectors[si]
sinfo := sectorMap[snum]
sectors = append(sectors, storiface.PostSectorChallenge{
SealProof: sinfo.SealProof,
SectorNumber: snum,
SealedCID: sinfo.SealedCID,
Challenge: postChallenges.Challenges[snum],
})
}
p, sk, err := m.generatePartitionWindowPost(cctx, ppt, minerID, int(partIdx), sectors, randomness)
if err != nil {
retErr = multierror.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", i, err))
if len(faultsectors) > 0 {
log.Errorf("generateWindowPost groupCount:%d, faults:%d, err: %+v", i, len(faultsectors), err)
retErr = multierror.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err))
if len(sk) > 0 {
log.Errorf("generateWindowPost groupCount:%d, skipped:%d, err: %+v", partIdx, len(sk), err)
flk.Lock()
faults = append(faults, faultsectors...)
skipped = append(skipped, sk...)
flk.Unlock()
}
cancel()
return
}
proofList[n] = proof
}(i)
time.Sleep(1 * time.Second)
proofList[partIdx] = ffi.PartitionProof(p)
}(partIdx)
}
wg.Wait()
pl := make([]ffi.PartitionProof, 0)
for i, pp := range proofList {
pl[i] = ffi.PartitionProof(pp)
}
postProofs, err := ffi.MergeWindowPoStPartitionProofs(proofType, pl)
postProofs, err := ffi.MergeWindowPoStPartitionProofs(ppt, proofList)
if err != nil {
return nil, nil, xerrors.Errorf("merge windowPoSt partition proofs: %v", err)
}
if len(faults) > 0 {
log.Warnf("GenerateWindowPoSt get faults: %d", len(faults))
return out, faults, retErr
if len(skipped) > 0 {
log.Warnf("GenerateWindowPoSt get skipped: %d", len(skipped))
return out, skipped, retErr
}
out = append(out, *postProofs)
return out, skipped, retErr
}
func (m *Manager) generatePartitionWindowPost(ctx context.Context, minerID abi.ActorID, index int, psc int, groupCount int, ps ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, postChallenges *ffiwrapper.FallbackChallenges) (proof.PoStProof, []abi.SectorID, error) {
var faults []abi.SectorID
func (m *Manager) generatePartitionWindowPost(ctx context.Context, ppt abi.RegisteredPoStProof, minerID abi.ActorID, partIndex int, sc []storiface.PostSectorChallenge, randomness abi.PoStRandomness) (proof.PoStProof, []abi.SectorID, error) {
log.Infow("generateWindowPost", "index", partIndex)
start := index * psc
end := (index + 1) * psc
if index == groupCount-1 {
end = len(ps.Spsi.Values())
}
log.Infow("generateWindowPost", "start", start, "end", end, "index", index)
privsectors, err := m.SplitSortedPrivateSectorInfo(ctx, ps, start, end)
if err != nil {
return proof.PoStProof{}, faults, xerrors.Errorf("generateWindowPost GetScopeSortedPrivateSectorInfo failed: %w", err)
}
var result *ffiwrapper.WindowPoStResult
err = m.sched.windowPoStSched.Schedule(ctx, true, func(ctx context.Context, w Worker) error {
out, err := w.GenerateWindowPoSt(ctx, minerID, privsectors, index, start, randomness, postChallenges)
var result storiface.WindowPoStResult
err := m.windowPoStSched.Schedule(ctx, true, func(ctx context.Context, w Worker) error {
out, err := w.GenerateWindowPoSt(ctx, ppt, minerID, sc, partIndex, randomness)
if err != nil {
return err
}
result = &out
result = out
return nil
})
if err != nil {
return proof.PoStProof{}, faults, err
log.Warnf("generateWindowPost partition:%d, get skip count:%d", partIndex, len(result.Skipped))
return result.PoStProofs, result.Skipped, err
}
if len(result.Skipped) > 0 {
log.Warnf("generateWindowPost partition:%d, get faults:%d", index, len(result.Skipped))
return proof.PoStProof{}, result.Skipped, xerrors.Errorf("generatePartitionWindowPoStProofs partition:%d get faults:%d", index, len(result.Skipped))
func (m *Manager) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte) ([]proof.PoStProof, error) {
//TODO implement me
panic("implement me")
}
return result.PoStProofs, faults, err
func (m *Manager) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (proof.PoStProof, error) {
//TODO implement me
panic("implement me")
}

View File

@ -116,8 +116,10 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
index: si,
sched: newScheduler(),
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),
Prover: prover,
localProver: prover,
work: statestore.New(ds),
callToWork: map[storiface.CallID]WorkID{},

View File

@ -5,6 +5,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"github.com/filecoin-project/specs-actors/v6/actors/runtime/proof"
"io"
"io/ioutil"
"math/rand"
@ -409,6 +410,14 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
}
}
func (mgr *SectorMgr) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte) ([]proof.PoStProof, error) {
panic("implement me")
}
func (mgr *SectorMgr) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (proof.PoStProof, error) {
panic("implement me")
}
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 {
panic("implme")
@ -553,26 +562,6 @@ func (mgr *SectorMgr) ReturnGenerateSectorKeyFromData(ctx context.Context, callI
panic("not supported")
}
func (mgr *SectorMgr) GetPartitionSectorsCount(ctx context.Context, prooftype abi.RegisteredPoStProof) (int, error) {
panic("not supported")
}
func (mgr *SectorMgr) GetPartitionVanillaParams(ctx context.Context, proofType abi.RegisteredPoStProof) (string, error) {
panic("not supported")
}
func (mgr *SectorMgr) PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffiwrapper.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
panic("not supported")
}
func (mgr *SectorMgr) SplitSortedPrivateSectorInfo(ctx context.Context, privsector ffiwrapper.SortedPrivateSectorInfo, offset int, end int) (ffiwrapper.SortedPrivateSectorInfo, error) {
panic("not supported")
}
func (mgr *SectorMgr) GeneratePoStFallbackSectorChallenges(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, sectorIds []abi.SectorNumber) (*ffiwrapper.FallbackChallenges, error) {
panic("not supported")
}
func (m mockVerifProver) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {

View File

@ -14,7 +14,6 @@ import (
type readonlyProvider struct {
index stores.SectorIndex
stor *stores.Local
remote *stores.Remote
}
func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
@ -39,23 +38,3 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorR
return p, cancel, err
}
func (l *readonlyProvider) AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
ctx, cancel := context.WithCancel(ctx)
// use TryLock to avoid blocking
locked, err := l.index.StorageTryLock(ctx, id.ID, existing, storiface.FTNone)
if err != nil {
cancel()
return storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
}
if !locked {
cancel()
return storiface.SectorPaths{}, nil, xerrors.Errorf("failed to acquire sector lock")
}
p, _, err := l.remote.AcquireSectorPaths(ctx, id, existing, storiface.FTNone, sealing)
return p, cancel, err
}

View File

@ -68,11 +68,6 @@ type scheduler struct {
info chan func(interface{})
// window scheduler
windowPoStSched *poStScheduler
// winning scheduler
winningPoStSched *poStScheduler
closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing
@ -88,8 +83,6 @@ type workerHandle struct {
lk sync.Mutex // can be taken inside sched.workersLk.RLock
acceptTasks map[sealtasks.TaskType]struct{}
wndLk sync.Mutex // can be taken inside sched.workersLk.RLock
activeWindows []*schedWindow
@ -169,9 +162,6 @@ func newScheduler() *scheduler {
info: make(chan func(interface{})),
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),
closing: make(chan struct{}),
closed: make(chan struct{}),
}
@ -557,9 +547,6 @@ func (sh *scheduler) schedClose() {
for i, w := range sh.workers {
sh.workerCleanup(i, w)
}
sh.windowPoStSched.schedClose()
sh.winningPoStSched.schedClose()
}
func (sh *scheduler) Info(ctx context.Context) (interface{}, error) {

View File

@ -32,8 +32,8 @@ func newPoStScheduler(t sealtasks.TaskType) *poStScheduler {
return ps
}
func (ps *poStScheduler) AddWorker(wid storiface.WorkerID, w *workerHandle) bool {
if _, ok := w.acceptTasks[ps.postType]; !ok {
func (ps *poStScheduler) MaybeAddWorker(wid storiface.WorkerID, tasks map[sealtasks.TaskType]struct{}, w *workerHandle) bool {
if _, ok := tasks[ps.postType]; !ok {
return false
}
@ -230,7 +230,7 @@ func (ps *poStScheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle)
select {
case <-w.closedMgr:
case <-time.After(time.Second):
log.Errorf("timeout closing worker manager goroutine %d", wid)
log.Errorf("timeout closing worker manager goroutine %s", wid)
}
ps.lk.Lock()
}

View File

@ -16,8 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
@ -136,11 +135,11 @@ func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id st
panic("implement me")
}
func (s *schedTestWorker) GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
func (s *schedTestWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
panic("implement me")
}
func (s *schedTestWorker) GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) {
func (s *schedTestWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) {
panic("implement me")
}
@ -203,7 +202,15 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
require.NoError(t, err)
}
require.NoError(t, sched.runWorker(context.TODO(), w))
sessID, err := w.Session(context.TODO())
require.NoError(t, err)
wid := storiface.WorkerID(sessID)
wh, err := newWorkerHandle(context.TODO(), w)
require.NoError(t, err)
require.NoError(t, sched.runWorker(context.TODO(), wid, wh))
}
func TestSchedStartStop(t *testing.T) {

View File

@ -24,23 +24,10 @@ type schedWorker struct {
windowsRequested int
}
// context only used for startup
func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
func newWorkerHandle(ctx context.Context, w Worker) (*workerHandle, error) {
info, err := w.Info(ctx)
if err != nil {
return xerrors.Errorf("getting worker info: %w", err)
}
sessID, err := w.Session(ctx)
if err != nil {
return xerrors.Errorf("getting worker session: %w", err)
}
if sessID == ClosedWorkerID {
return xerrors.Errorf("worker already closed")
}
tasks, err := w.TaskTypes(ctx)
if err != nil {
return xerrors.Errorf("getting worker tasks: %w", err)
return nil, xerrors.Errorf("getting worker info: %w", err)
}
worker := &workerHandle{
@ -51,20 +38,15 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
active: &activeResources{},
enabled: true,
acceptTasks: tasks,
closingMgr: make(chan struct{}),
closedMgr: make(chan struct{}),
}
wid := storiface.WorkerID(sessID)
//add worker to post scheduler
if sh.windowPoStSched.AddWorker(wid, worker) ||
sh.winningPoStSched.AddWorker(wid, worker) {
return nil
return worker, nil
}
// context only used for startup
func (sh *scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, worker *workerHandle) error {
sh.workersLk.Lock()
_, exist := sh.workers[wid]
if exist {

View File

@ -34,11 +34,12 @@ var order = map[TaskType]int{
TTCommit2: 3,
TTCommit1: 2,
TTUnseal: 1,
TTFetch: -1,
TTFinalize: -2, // most priority
TTFinalize: -2,
TTGenerateWindowPoSt: -3,
TTGenerateWinningPoSt: -4,
TTGenerateWinningPoSt: -4, // most priority
}
var shortNames = map[TaskType]string{

View File

@ -10,6 +10,7 @@ import (
func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
m.sched.workersLk.RLock()
defer m.sched.workersLk.RUnlock()
out := map[uuid.UUID]storiface.WorkerStats{}
@ -18,7 +19,6 @@ func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
out[uuid.UUID(id)] = storiface.WorkerStats{
Info: handle.info,
Enabled: handle.enabled,
MemUsedMin: handle.active.memUsedMin,
MemUsedMax: handle.active.memUsedMax,
GpuUsed: handle.active.gpuUsed,
@ -34,8 +34,8 @@ func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
m.sched.workersLk.RUnlock()
//list post workers
m.sched.winningPoStSched.WorkerStats(cb)
m.sched.windowPoStSched.WorkerStats(cb)
m.winningPoStSched.WorkerStats(cb)
m.windowPoStSched.WorkerStats(cb)
return out
}

View File

@ -2,7 +2,6 @@ package stores
import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"strconv"
@ -11,7 +10,6 @@ import (
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -54,13 +52,11 @@ func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mux := mux.NewRouter()
mux.HandleFunc("/remote/stat/{id}", handler.remoteStatFs).Methods("GET")
mux.HandleFunc("/remote/vanilla/single", handler.generateSingleVanillaProof).Methods("POST")
mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", handler.remoteGetAllocated).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
//for post vanilla
mux.HandleFunc("/remote/vanilla/single", handler.generateSingleVanillaProof).Methods("POST")
mux.ServeHTTP(w, r)
}
@ -291,6 +287,32 @@ func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.R
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
}
type SingleVanillaParams struct {
Miner abi.ActorID
Sector storiface.PostSectorChallenge
ProofType abi.RegisteredPoStProof
}
func (handler *FetchHandler) generateSingleVanillaProof(w http.ResponseWriter, r *http.Request) {
var params SingleVanillaParams
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
http.Error(w, err.Error(), 500)
return
}
vanilla, err := handler.Local.GenerateSingleVanillaProof(r.Context(), params.Miner, params.Sector, params.ProofType)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(200)
_, err = w.Write(vanilla)
if err != nil {
log.Error("response writer: ", err)
}
}
func ftFromString(t string) (storiface.SectorFileType, error) {
switch t {
case storiface.FTUnsealed.String():
@ -303,35 +325,3 @@ func ftFromString(t string) (storiface.SectorFileType, error) {
return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
}
}
type SingleVanillaParams struct {
PrivSector ffi.PrivateSectorInfo
Challenge []uint64
}
func (handler *FetchHandler) generateSingleVanillaProof(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
params := SingleVanillaParams{}
err = json.Unmarshal(body, &params)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
vanilla, err := ffi.GenerateSingleVanillaProof(params.PrivSector, params.Challenge)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(200)
_, err = w.Write(vanilla)
if err != nil {
log.Error("response writer: ", err)
}
}

View File

@ -42,8 +42,6 @@ type StorageInfo struct {
Groups []Group
AllowTo []Group
// storage path
Path string
}
type HealthReport struct {
@ -54,15 +52,13 @@ type HealthReport struct {
type SectorStorageInfo struct {
ID ID
URLs []string // TODO: Support non-http transports
BaseURLs []string
Weight uint64
CanSeal bool
CanStore bool
Primary bool
// storage path
Path string
}
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/index.go -package=mocks . SectorIndex
@ -84,9 +80,6 @@ type SectorIndex interface { // part of storage-miner api
StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error)
StorageList(ctx context.Context) (map[ID][]Decl, error)
// get sector storage url for post worker
StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error)
}
type Decl struct {
@ -185,8 +178,6 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsS
i.stores[si.ID].info.Groups = si.Groups
i.stores[si.ID].info.AllowTo = si.AllowTo
i.stores[si.ID].info.Path = si.Path
return nil
}
i.stores[si.ID] = &storageEntry{
@ -332,7 +323,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
continue
}
urls := make([]string, len(st.info.URLs))
urls, burls := make([]string, len(st.info.URLs)), make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
rl, err := url.Parse(u)
if err != nil {
@ -341,6 +332,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
urls[k] = rl.String()
burls[k] = u
}
if allowTo != nil && len(st.info.AllowTo) > 0 {
@ -354,14 +346,13 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
out = append(out, SectorStorageInfo{
ID: id,
URLs: urls,
BaseURLs: burls,
Weight: st.info.Weight * n, // storage with more sector types is better
CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore,
Primary: isprimary[id],
Path: st.info.Path,
})
}
@ -411,7 +402,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
}
}
urls := make([]string, len(st.info.URLs))
urls, burls := make([]string, len(st.info.URLs)), make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
rl, err := url.Parse(u)
if err != nil {
@ -420,19 +411,19 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
urls[k] = rl.String()
burls[k] = u
}
out = append(out, SectorStorageInfo{
ID: id,
URLs: urls,
BaseURLs: burls,
Weight: st.info.Weight * 0, // TODO: something better than just '0'
CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore,
Primary: false,
Path: st.info.Path,
})
}
}
@ -536,49 +527,4 @@ func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]ID,
return out, nil
}
func (i *Index) StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error) {
i.lk.RLock()
defer i.lk.RUnlock()
storageIDs := map[ID]uint64{}
for _, pathType := range storiface.PathTypes {
if ft&pathType == 0 {
continue
}
for _, id := range i.sectors[Decl{s, pathType}] {
storageIDs[id.storage]++
}
}
storages := make([]StorageInfo, 0, len(storageIDs))
for id := range storageIDs {
st, ok := i.stores[id]
if !ok {
log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s)
continue
}
urls := make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
urls[k] = u
}
if st.info.CanStore {
storages = append(storages, StorageInfo{
URLs: urls,
})
}
}
if len(storages) == 0 {
return "", xerrors.New("cant find sector storage")
}
if len(storages[0].URLs) == 0 {
return "", xerrors.New("sector storage url is nil")
}
return storages[0].URLs[0], nil
}
var _ SectorIndex = &Index{}

View File

@ -5,12 +5,10 @@ import (
"os"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
@ -37,8 +35,6 @@ type PartialFileHandler interface {
type Store interface {
AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
AcquireSectorPaths(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool, keepIn []ID) error
// like remove, but doesn't remove the primary sector copy, nor the last
@ -52,5 +48,5 @@ type Store interface {
Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error)
GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, challenge []uint64) ([]byte, error)
GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, si storiface.PostSectorChallenge, ppt abi.RegisteredPoStProof) ([]byte, error)
}

View File

@ -3,6 +3,7 @@ package stores
import (
"context"
"encoding/json"
ffi "github.com/filecoin-project/filecoin-ffi"
"io/ioutil"
"math/bits"
"math/rand"
@ -14,9 +15,9 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
@ -222,7 +223,6 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
CanStore: meta.CanStore,
Groups: meta.Groups,
AllowTo: meta.AllowTo,
Path: p,
}, fst)
if err != nil {
return xerrors.Errorf("declaring storage in index: %w", err)
@ -720,11 +720,36 @@ func (st *Local) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
return p.stat(st.localStorage)
}
func (st *Local) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, challenge []uint64) ([]byte, error) {
return nil, nil
func (st *Local) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, si storiface.PostSectorChallenge, ppt abi.RegisteredPoStProof) ([]byte, error) {
sr := storage.SectorRef{
ID: abi.SectorID{
Miner: minerID,
Number: si.SectorNumber,
},
ProofType: si.SealProof,
}
func (st *Local) AcquireSectorPaths(ctx context.Context, sid storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType) (storiface.SectorPaths, storiface.SectorPaths, error) {
return st.AcquireSector(ctx, sid, existing, allocate, pathType, storiface.AcquireCopy)
src, _, err := st.AcquireSector(ctx, sr, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return nil, xerrors.Errorf("acquire sector: %w", err)
}
if src.Sealed == "" || src.Cache == "" {
return nil, errPathNotFound
}
psi := ffi.PrivateSectorInfo{
SectorInfo: proof.SectorInfo{
SealProof: si.SealProof,
SectorNumber: si.SectorNumber,
SealedCID: si.SealedCID,
},
CacheDirPath: src.Cache,
PoStProofType: ppt,
SealedSectorPath: src.Sealed,
}
return ffi.GenerateSingleVanillaProof(psi, si.Challenge)
}
var _ Store = &Local{}

View File

@ -125,21 +125,6 @@ func (mr *MockSectorIndexMockRecorder) StorageGetLocks(arg0 interface{}) *gomock
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageGetLocks", reflect.TypeOf((*MockSectorIndex)(nil).StorageGetLocks), arg0)
}
// StorageGetUrl mocks base method.
func (m *MockSectorIndex) StorageGetUrl(arg0 context.Context, arg1 abi.SectorID, arg2 storiface.SectorFileType) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageGetUrl", arg0, arg1, arg2)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StorageGetUrl indicates an expected call of StorageGetUrl.
func (mr *MockSectorIndexMockRecorder) StorageGetUrl(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageGetUrl", reflect.TypeOf((*MockSectorIndex)(nil).StorageGetUrl), arg0, arg1, arg2)
}
// StorageInfo mocks base method.
func (m *MockSectorIndex) StorageInfo(arg0 context.Context, arg1 stores.ID) (stores.StorageInfo, error) {
m.ctrl.T.Helper()

View File

@ -9,7 +9,6 @@ import (
reflect "reflect"
abi "github.com/filecoin-project/go-state-types/abi"
ffiwrapper "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
fsutil "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
stores "github.com/filecoin-project/lotus/extern/sector-storage/stores"
storiface "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -56,22 +55,6 @@ func (mr *MockStoreMockRecorder) AcquireSector(arg0, arg1, arg2, arg3, arg4, arg
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireSector", reflect.TypeOf((*MockStore)(nil).AcquireSector), arg0, arg1, arg2, arg3, arg4, arg5)
}
// AcquireSectorPaths mocks base method.
func (m *MockStore) AcquireSectorPaths(arg0 context.Context, arg1 storage.SectorRef, arg2, arg3 storiface.SectorFileType, arg4 storiface.PathType) (storiface.SectorPaths, storiface.SectorPaths, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AcquireSectorPaths", arg0, arg1, arg2, arg3, arg4)
ret0, _ := ret[0].(storiface.SectorPaths)
ret1, _ := ret[1].(storiface.SectorPaths)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// AcquireSectorPaths indicates an expected call of AcquireSectorPaths.
func (mr *MockStoreMockRecorder) AcquireSectorPaths(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireSectorPaths", reflect.TypeOf((*MockStore)(nil).AcquireSectorPaths), arg0, arg1, arg2, arg3, arg4)
}
// FsStat mocks base method.
func (m *MockStore) FsStat(arg0 context.Context, arg1 stores.ID) (fsutil.FsStat, error) {
m.ctrl.T.Helper()
@ -88,7 +71,7 @@ func (mr *MockStoreMockRecorder) FsStat(arg0, arg1 interface{}) *gomock.Call {
}
// GenerateSingleVanillaProof mocks base method.
func (m *MockStore) GenerateSingleVanillaProof(arg0 context.Context, arg1 abi.ActorID, arg2 *ffiwrapper.PrivateSectorInfo, arg3 []uint64) ([]byte, error) {
func (m *MockStore) GenerateSingleVanillaProof(arg0 context.Context, arg1 abi.ActorID, arg2 storiface.PostSectorChallenge, arg3 abi.RegisteredPoStProof) ([]byte, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GenerateSingleVanillaProof", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].([]byte)

View File

@ -17,7 +17,6 @@ import (
"strings"
"sync"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/sector-storage/tarutil"
@ -740,123 +739,50 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
}, nil
}
func (r *Remote) AcquireSectorPaths(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
func (r *Remote) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, sinfo storiface.PostSectorChallenge, ppt abi.RegisteredPoStProof) ([]byte, error) {
p, err := r.local.GenerateSingleVanillaProof(ctx, minerID, sinfo, ppt)
if err != errPathNotFound {
return p, err
}
for {
r.fetchLk.Lock()
c, locked := r.fetching[s.ID]
if !locked {
r.fetching[s.ID] = make(chan struct{})
r.fetchLk.Unlock()
break
}
r.fetchLk.Unlock()
select {
case <-c:
continue
case <-ctx.Done():
return storiface.SectorPaths{}, storiface.SectorPaths{}, ctx.Err()
}
}
defer func() {
r.fetchLk.Lock()
close(r.fetching[s.ID])
delete(r.fetching, s.ID)
r.fetchLk.Unlock()
}()
var paths storiface.SectorPaths
var stores storiface.SectorPaths
ssize, err := s.ProofType.SectorSize()
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, err
}
for _, fileType := range storiface.PathTypes {
if fileType&existing == 0 {
continue
}
sis, err := r.index.StorageFindSector(ctx, s.ID, fileType, ssize, false)
if err != nil {
log.Warnf("finding existing sector %d failed: %+v", s.ID, err)
continue
}
for _, si := range sis {
if (pathType == storiface.PathSealing) && !si.CanSeal {
continue
}
if (pathType == storiface.PathStorage) && !si.CanStore {
continue
}
spath := filepath.Join(si.Path, fileType.String(), storiface.SectorName(s.ID))
storiface.SetPathByType(&paths, fileType, spath)
storiface.SetPathByType(&stores, fileType, string(si.ID))
if si.CanStore {
existing ^= fileType
break
}
}
}
return paths, stores, nil
}
func (r *Remote) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, challenge []uint64) ([]byte, error) {
sector := abi.SectorID{
sid := abi.SectorID{
Miner: minerID,
Number: privsector.Psi.SectorNumber,
Number: sinfo.SectorNumber,
}
storageUrl, err := r.index.StorageGetUrl(ctx, sector, storiface.FTCache|storiface.FTSealed)
si, err := r.index.StorageFindSector(ctx, sid, storiface.FTSealed|storiface.FTCache, 0, false)
if err != nil {
return nil, xerrors.Errorf("finding path for sector storage: %w", err)
return nil, xerrors.Errorf("finding sector %d failed: %w", sid, err)
}
surl, err := url.Parse(storageUrl)
if err != nil {
return nil, xerrors.Errorf("parse sector storage url failed : %w", err)
}
surl.Path = gopath.Join(surl.Path, "vanilla", "single")
requestParams := SingleVanillaParams{
PrivSector: privsector.Psi,
Challenge: challenge,
Miner: minerID,
Sector: sinfo,
ProofType: ppt,
}
bytes, err := json.Marshal(requestParams)
jreq, err := json.Marshal(requestParams)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", surl.String(), strings.NewReader(string(bytes)))
for _, info := range si {
for _, u := range info.BaseURLs {
url := fmt.Sprintf("%s/vanilla/single", u)
req, err := http.NewRequest("POST", url, strings.NewReader(string(jreq)))
if err != nil {
return nil, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth
if r.auth != nil {
req.Header = r.auth.Clone()
}
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, xerrors.Errorf("do request: %w", err)
}
defer func() {
err := resp.Body.Close()
if err != nil {
log.Error("response close: ", err)
}
}()
if resp.StatusCode != 200 {
body, err := ioutil.ReadAll(resp.Body)
@ -864,15 +790,28 @@ func (r *Remote) GenerateSingleVanillaProof(ctx context.Context, minerID abi.Act
return nil, xerrors.Errorf("resp.Body ReadAll: %w", err)
}
return nil, xerrors.Errorf("non-200 code: %w", string(body))
if err := resp.Body.Close(); err != nil {
log.Error("response close: ", err)
}
return nil, xerrors.Errorf("non-200 code from %s: '%s'", url, string(body))
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
if err := resp.Body.Close(); err != nil {
log.Error("response close: ", err)
}
return nil, xerrors.Errorf("resp.Body ReadAll: %w", err)
}
return body, nil
}
}
return nil, xerrors.Errorf("sector not found")
}
var _ Store = &Remote{}

View File

@ -42,7 +42,6 @@ type Resources struct {
*/
var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100
var GPUUtilizationProof float64 = 1.0
// TODO: Take NUMA into account
func (r Resources) Threads(wcpus uint64, gpus int) uint64 {
@ -63,6 +62,8 @@ func (r Resources) Threads(wcpus uint64, gpus int) uint64 {
return uint64(mp)
}
var GPUUtilizationProof float64 = 1.0 // todo use resource tablo
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
sealtasks.TTAddPiece: {
abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"time"
"github.com/google/uuid"
@ -114,6 +115,7 @@ var _ fmt.Stringer = &CallID{}
var UndefCall CallID
type WorkerCalls interface {
// async
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error)
@ -128,6 +130,27 @@ type WorkerCalls interface {
MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
// sync
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error)
GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (WindowPoStResult, error)
}
type WindowPoStResult struct {
PoStProofs proof.PoStProof
Skipped []abi.SectorID
}
type PostSectorChallenge struct {
SealProof abi.RegisteredSealProof
SectorNumber abi.SectorNumber
SealedCID cid.Cid
Challenge []uint64
}
type FallbackChallenges struct {
Sectors []abi.SectorNumber
Challenges map[abi.SectorNumber][]uint64
}
type ErrorCode int

View File

@ -31,15 +31,11 @@ func (t *testExec) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID,
panic("implement me")
}
func (t *testExec) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (*proof.PoStProof, error) {
func (t *testExec) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte) ([]proof.PoStProof, error) {
panic("implement me")
}
func (t *testExec) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof.PoStProof, error) {
panic("implement me")
}
func (t *testExec) GetSectorVanillaParams(ctx context.Context, index int, partitionVanillaParams string) (string, error) {
func (t *testExec) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (proof.PoStProof, error) {
panic("implement me")
}

View File

@ -156,10 +156,6 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
}, nil
}
func (l *localWorkerPathProvider) AcquireSectorPaths(ctx context.Context, sector storage.SectorRef, existing storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
return storiface.SectorPaths{}, nil, nil
}
func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l})
}
@ -513,32 +509,34 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef,
})
}
func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) {
func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
sb, err := l.executor()
if err != nil {
return nil, err
}
ps := privsectors.Spsi.Values()
vanillas := make([][]byte, len(ps))
var rerr error
// todo throttle config
var wg sync.WaitGroup
for i, v := range ps {
wg.Add(1)
go func(index int, sector ffi.PrivateSectorInfo) {
wg.Add(len(sectors))
vproofs := make([][]byte, len(sectors))
var rerr error
for i, s := range sectors {
go func(i int, s storiface.PostSectorChallenge) {
defer wg.Done()
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, &ffiwrapper.PrivateSectorInfo{Psi: sector}, sectorChallenges.Fc.Challenges[sector.SectorNumber])
// todo context with tighter deadline (+config)
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, s, ppt)
if err != nil {
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d,vanila failed: %w", sector.SectorNumber, err))
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d,vanila failed: %w", s.SectorNumber, err))
return
}
if vanilla == nil {
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d,vanila is nil", sector.SectorNumber))
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d,vanila is nil", s.SectorNumber))
}
vanillas[index] = vanilla
}(i, v)
vproofs[i] = vanilla
}(i, s)
}
wg.Wait()
@ -546,55 +544,55 @@ func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, mid abi.ActorID,
return nil, rerr
}
return sb.GenerateWinningPoStWithVanilla(ctx, ps[0].PoStProofType, mid, randomness, vanillas)
return sb.GenerateWinningPoStWithVanilla(ctx, ppt, mid, randomness, vproofs)
}
func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) {
sb, err := l.executor()
if err != nil {
return ffiwrapper.WindowPoStResult{}, err
return storiface.WindowPoStResult{}, err
}
var slk sync.Mutex
var skipped []abi.SectorID
ps := privsectors.Spsi.Values()
out := make([][]byte, len(ps))
// todo throttle config
var wg sync.WaitGroup
for i := range ps {
wg.Add(1)
go func(index int) {
wg.Add(len(sectors))
vproofs := make([][]byte, len(sectors))
for i, s := range sectors {
go func(i int, s storiface.PostSectorChallenge) {
defer wg.Done()
sector := ps[index]
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, &ffiwrapper.PrivateSectorInfo{Psi: sector}, sectorChallenges.Fc.Challenges[sector.SectorNumber])
// todo context with tighter deadline (+config)
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, s, ppt)
if err != nil || vanilla == nil {
slk.Lock()
skipped = append(skipped, abi.SectorID{
Miner: mid,
Number: sector.SectorNumber,
Number: s.SectorNumber,
})
slk.Unlock()
log.Errorf("get sector: %d, vanilla: %s, offset: %d", sector.SectorNumber, vanilla, offset+index)
log.Errorf("get sector: %d, vanilla: %s, offset: %d", s.SectorNumber, vanilla)
return
}
out[index] = vanilla
}(i)
vproofs[i] = vanilla
}(i, s)
}
wg.Wait()
if len(skipped) > 0 {
return ffiwrapper.WindowPoStResult{PoStProofs: proof.PoStProof{}, Skipped: skipped}, nil
panic("todo") // big TODO
}
PoSts, err := sb.GenerateWindowPoStWithVanilla(ctx, ps[0].PoStProofType, mid, randomness, out, partitionIdx)
res, err := sb.GenerateWindowPoStWithVanilla(ctx, ppt, mid, randomness, vproofs, partitionIdx)
if err != nil {
return ffiwrapper.WindowPoStResult{PoStProofs: *PoSts, Skipped: skipped}, err
}
return ffiwrapper.WindowPoStResult{PoStProofs: *PoSts, Skipped: skipped}, nil
return storiface.WindowPoStResult{
PoStProofs: res,
Skipped: skipped,
}, err
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {

6
go.mod
View File

@ -39,7 +39,7 @@ require (
github.com/filecoin-project/go-fil-markets v1.14.1
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.2
github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53
github.com/filecoin-project/go-state-types v0.1.1
github.com/filecoin-project/go-statemachine v1.0.1
github.com/filecoin-project/go-statestore v0.2.0
@ -50,8 +50,8 @@ require (
github.com/filecoin-project/specs-actors/v4 v4.0.1
github.com/filecoin-project/specs-actors/v5 v5.0.4
github.com/filecoin-project/specs-actors/v6 v6.0.1
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec
github.com/filecoin-project/specs-storage v0.1.1-0.20211213202648-f14267c929ff
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211222192039-c83bea50c402
github.com/filecoin-project/specs-storage v0.1.1-0.20220114131651-ee969fade269
github.com/filecoin-project/test-vectors/schema v0.0.5
github.com/gbrlsnchs/jwt/v3 v3.0.1
github.com/gdamore/tcell/v2 v2.2.0

15
go.sum
View File

@ -338,10 +338,9 @@ github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.m
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs=
github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
github.com/filecoin-project/go-paramfetch v0.0.2 h1:a6W3Ij6CKhwHYYlx+5mqvBIyw4CabZH2ojdEaoAZ6/g=
github.com/filecoin-project/go-paramfetch v0.0.2/go.mod h1:1FH85P8U+DUEmWk1Jkw3Bw7FrwTVUNHk/95PSPG+dts=
github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53 h1:+nripp+UI/rhl01w9Gs4V0XDGaVPYPMGU/D/gNVLue0=
github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53/go.mod h1:1FH85P8U+DUEmWk1Jkw3Bw7FrwTVUNHk/95PSPG+dts=
github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200928172055-2df22083d8ab/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.0.0-20201102161440-c8033295a1fc/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
@ -357,7 +356,6 @@ github.com/filecoin-project/go-statestore v0.2.0 h1:cRRO0aPLrxKQCZ2UOQbzFGn4WDNd
github.com/filecoin-project/go-statestore v0.2.0/go.mod h1:8sjBYbS35HwPzct7iT4lIXjLlYyPor80aU7t7a/Kspo=
github.com/filecoin-project/go-storedcounter v0.1.0 h1:Mui6wSUBC+cQGHbDUBcO7rfh5zQkWJM/CpAZa/uOuus=
github.com/filecoin-project/go-storedcounter v0.1.0/go.mod h1:4ceukaXi4vFURIoxYMfKzaRF5Xv/Pinh2oTnoxpv+z8=
github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4=
github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
github.com/filecoin-project/specs-actors v0.9.14 h1:68PVstg2UB3ZsMLF+DKFTAs/YKsqhKWynkr0IqmVRQY=
github.com/filecoin-project/specs-actors v0.9.14/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
@ -376,10 +374,10 @@ github.com/filecoin-project/specs-actors/v6 v6.0.0/go.mod h1:V1AYfi5GkHXipx1mnVi
github.com/filecoin-project/specs-actors/v6 v6.0.1 h1:laxvHNsvrq83Y9n+W7znVCePi3oLyRf0Rkl4jFO8Wew=
github.com/filecoin-project/specs-actors/v6 v6.0.1/go.mod h1:V1AYfi5GkHXipx1mnVivoICZh3wtwPxDVuds+fbfQtk=
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211117170924-fd07a4c7dff9/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec h1:KV9vE+Sl2Y3qKsrpba4HcE7wHwK7v6O5U/S0xHbje6A=
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
github.com/filecoin-project/specs-storage v0.1.1-0.20211213202648-f14267c929ff h1:JO62nquOGhjoDf9+JkAcV+wsD5yhoyIKOMj70ZNdD3Q=
github.com/filecoin-project/specs-storage v0.1.1-0.20211213202648-f14267c929ff/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211222192039-c83bea50c402 h1:jPJlk7YpWKDNLx/3Dosgm3ie3wsNhe3YP6Xl8Gc5vqY=
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211222192039-c83bea50c402/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
github.com/filecoin-project/specs-storage v0.1.1-0.20220114131651-ee969fade269 h1:icvWX24jKnt8qCobOE5fLJhMxiYEZy5RcGSkLPTPndU=
github.com/filecoin-project/specs-storage v0.1.1-0.20220114131651-ee969fade269/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU=
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
@ -722,7 +720,6 @@ github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28
github.com/ipfs/go-graphsync v0.11.0/go.mod h1:wC+c8vGVjAHthsVIl8LKr37cUra2GOaMYcQNNmMxDqE=
github.com/ipfs/go-graphsync v0.11.5 h1:WA5hVxGBtcal6L6nqubKiqRolaZxbexOK3GumGFJRR4=
github.com/ipfs/go-graphsync v0.11.5/go.mod h1:+/sZqRwRCQRrV7NCzgBtufmr5QGpUE98XSa7NlsztmM=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.2.1/go.mod h1:jGesd8EtCM3/zPgx+qr0/feTXGUeRai6adgwC+Q+JvE=

View File

@ -5,8 +5,13 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/cmd/lotus-seal-worker/sealworker"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/ipfs/go-datastore/namespace"
"io/ioutil"
"net"
"net/http"
"sync"
"testing"
"time"
@ -109,10 +114,12 @@ type Ensemble struct {
inactive struct {
fullnodes []*TestFullNode
miners []*TestMiner
workers []*TestWorker
}
active struct {
fullnodes []*TestFullNode
miners []*TestMiner
workers []*TestWorker
bms map[*TestMiner]*BlockMiner
}
genesis struct {
@ -268,6 +275,32 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO
return n
}
// Worker enrolls a new worker, using the provided full node for chain
// interactions.
func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...NodeOpt) *Ensemble {
require.NotNil(n.t, minerNode, "miner node required when instantiating worker")
options := DefaultNodeOpts
for _, o := range opts {
err := o(&options)
require.NoError(n.t, err)
}
rl, err := net.Listen("tcp", "127.0.0.1:")
require.NoError(n.t, err)
*worker = TestWorker{
t: n.t,
MinerNode: minerNode,
RemoteListener: rl,
options: options,
}
n.inactive.workers = append(n.inactive.workers, worker)
return n
}
// Start starts all enrolled nodes.
func (n *Ensemble) Start() *Ensemble {
ctx := context.Background()
@ -437,6 +470,7 @@ func (n *Ensemble) Start() *Ensemble {
// require.NoError(n.t, err)
r := repo.NewMemory(nil)
n.t.Cleanup(r.Cleanup)
lr, err := r.Lock(repo.StorageMiner)
require.NoError(n.t, err)
@ -498,6 +532,15 @@ func (n *Ensemble) Start() *Ensemble {
_, err = nic.Next()
require.NoError(n.t, err)
// using real proofs, therefore need real sectors.
if !n.bootstrapped && !n.options.mockProofs {
err := lr.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: m.PresealDir})
})
require.NoError(n.t, err)
}
err = lr.Close()
require.NoError(n.t, err)
@ -533,6 +576,14 @@ func (n *Ensemble) Start() *Ensemble {
// regardless of system pressure.
node.Override(new(sectorstorage.SealerConfig), func() sectorstorage.SealerConfig {
scfg := config.DefaultStorageMiner()
if m.options.minerNoLocalSealing {
scfg.Storage.AllowAddPiece = false
scfg.Storage.AllowPreCommit1 = false
scfg.Storage.AllowPreCommit2 = false
scfg.Storage.AllowCommit = false
}
scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
return scfg.Storage
}),
@ -578,12 +629,6 @@ func (n *Ensemble) Start() *Ensemble {
stop, err := node.New(ctx, opts...)
require.NoError(n.t, err)
// using real proofs, therefore need real sectors.
if !n.bootstrapped && !n.options.mockProofs {
err := m.StorageAddLocal(ctx, m.PresealDir)
require.NoError(n.t, err)
}
n.t.Cleanup(func() { _ = stop(context.Background()) })
// Are we hitting this node through its RPC?
@ -611,6 +656,63 @@ func (n *Ensemble) Start() *Ensemble {
// to active, so clear the slice.
n.inactive.miners = n.inactive.miners[:0]
// ---------------------
// WORKERS
// ---------------------
// Create all inactive workers.
for i, m := range n.inactive.workers {
r := repo.NewMemory(nil)
lr, err := r.Lock(repo.Worker)
require.NoError(n.t, err)
ds, err := lr.Datastore(context.Background(), "/metadata")
require.NoError(n.t, err)
addr := m.RemoteListener.Addr().String()
localStore, err := stores.NewLocal(ctx, lr, m.MinerNode, []string{"http://" + addr + "/remote"})
require.NoError(n.t, err)
auth := http.Header(nil)
remote := stores.NewRemote(localStore, m.MinerNode, auth, 20, &stores.DefaultPartialFileHandler{})
fh := &stores.FetchHandler{Local: localStore, PfHandler: &stores.DefaultPartialFileHandler{}}
m.FetchHandler = fh.ServeHTTP
wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))
workerApi := &sealworker.Worker{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
TaskTypes: m.options.workerTasks,
NoSwap: false,
}, remote, localStore, m.MinerNode, m.MinerNode, wsts),
LocalStore: localStore,
Storage: lr,
}
m.Worker = workerApi
require.True(n.t, m.options.rpc)
withRPC := workerRpc(n.t, m)
n.inactive.workers[i] = withRPC
err = m.MinerNode.WorkerConnect(ctx, "http://"+addr+"/rpc/v0")
require.NoError(n.t, err)
n.active.workers = append(n.active.workers, m)
}
// If we are here, we have processed all inactive workers and moved them
// to active, so clear the slice.
n.inactive.workers = n.inactive.workers[:0]
// ---------------------
// MISC
// ---------------------
// Link all the nodes.
err = n.mn.LinkAll()
require.NoError(n.t, err)

View File

@ -23,6 +23,20 @@ func EnsembleMinimal(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMin
return &full, &miner, ens
}
func EnsembleWorker(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMiner, *TestWorker, *Ensemble) {
opts = append(opts, WithAllSubsystems())
eopts, nopts := siftOptions(t, opts)
var (
full TestFullNode
miner TestMiner
worker TestWorker
)
ens := NewEnsemble(t, eopts...).FullNode(&full, nopts...).Miner(&miner, &full, nopts...).Worker(&miner, &worker, nopts...).Start()
return &full, &miner, &worker, ens
}
func EnsembleWithMinerAndMarketNodes(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMiner, *TestMiner, *Ensemble) {
eopts, nopts := siftOptions(t, opts)

View File

@ -3,6 +3,7 @@ package kit
import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
@ -33,6 +34,9 @@ type nodeOpts struct {
optBuilders []OptBuilder
sectorSize abi.SectorSize
maxStagingDealsBytes int64
minerNoLocalSealing bool // use worker
workerTasks []sealtasks.TaskType
}
// DefaultNodeOpts are the default options that will be applied to test nodes.
@ -40,6 +44,8 @@ var DefaultNodeOpts = nodeOpts{
balance: big.Mul(big.NewInt(100000000), types.NewInt(build.FilecoinPrecision)),
sectors: DefaultPresealsPerBootstrapMiner,
sectorSize: abi.SectorSize(2 << 10), // 2KiB.
workerTasks: []sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize},
}
// OptBuilder is used to create an option after some other node is already
@ -76,6 +82,13 @@ func WithMaxStagingDealsBytes(size int64) NodeOpt {
}
}
func WithNoLocalSealing(nope bool) NodeOpt {
return func(opts *nodeOpts) error {
opts.minerNoLocalSealing = nope
return nil
}
}
func DisableLibp2p() NodeOpt {
return func(opts *nodeOpts) error {
opts.disableLibp2p = true
@ -154,3 +167,10 @@ func SectorSize(sectorSize abi.SectorSize) NodeOpt {
return nil
}
}
func WithTaskTypes(tt []sealtasks.TaskType) NodeOpt {
return func(opts *nodeOpts) error {
opts.workerTasks = tt
return nil
}
}

30
itests/kit/node_worker.go Normal file
View File

@ -0,0 +1,30 @@
package kit
import (
"context"
"net"
"net/http"
"testing"
"github.com/filecoin-project/lotus/api"
"github.com/multiformats/go-multiaddr"
)
// TestWorker represents a worker enrolled in an Ensemble.
type TestWorker struct {
api.Worker
t *testing.T
// ListenAddr is the address on which an API server is listening, if an
// API server is created for this Node
ListenAddr multiaddr.Multiaddr
Stop func(context.Context) error
FetchHandler http.HandlerFunc
MinerNode *TestMiner
RemoteListener net.Listener
options nodeOpts
}

View File

@ -3,6 +3,7 @@ package kit
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/cmd/lotus-seal-worker/sealworker"
"net"
"net/http"
"net/http/httptest"
@ -63,3 +64,18 @@ func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
m.ListenAddr, m.StorageMiner = maddr, cl
return m
}
func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
handler := sealworker.WorkerHandler(m.MinerNode.AuthVerify, m.FetchHandler, m.Worker, false)
srv, maddr := CreateRPCServer(t, handler, m.RemoteListener)
fmt.Println("creating RPC server for a worker at: ", srv.Listener.Addr().String())
url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"
cl, stop, err := client.NewWorkerRPCV0(context.Background(), url, nil)
require.NoError(t, err)
t.Cleanup(stop)
m.ListenAddr, m.Worker = maddr, cl
return m
}

115
itests/worker_test.go Normal file
View File

@ -0,0 +1,115 @@
package itests
import (
"context"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage"
logging "github.com/ipfs/go-log/v2"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/itests/kit"
)
func TestWorkerPledge(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
miner.PledgeSectors(ctx, 1, 0, nil)
}
func TestWinningPostWorker(t *testing.T) {
prevIns := build.InsecurePoStValidation
build.InsecurePoStValidation = false
defer func() {
build.InsecurePoStValidation = prevIns
}()
ctx := context.Background()
client, _, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})) // no mock proofs
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
client.WaitTillChain(ctx, kit.HeightAtLeast(6))
}
func TestWindowPostWorker(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = logging.SetLogLevel("storageminer", "INFO")
sectors := 2 * 48 * 2
client, miner, _, ens := kit.EnsembleWorker(t,
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1),
kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}))
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
bm := ens.InterconnectAll().BeginMining(2 * time.Millisecond)[0]
di = di.NextNotElapsed()
t.Log("Running one proving period")
waitUntil := di.Open + di.WPoStChallengeWindow*2 + storage.SubmitConfidence
ts := client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Log("Waiting for post message")
bm.Stop()
for i := 0; i < 500; i++ {
n, err := client.MpoolPending(ctx, types.EmptyTSK)
require.NoError(t, err)
if len(n) > 0 {
break
}
time.Sleep(40 * time.Millisecond)
}
n, err := client.MpoolPending(ctx, types.EmptyTSK)
require.NoError(t, err)
require.Greater(t, len(n), 0)
t.Log("post message landed")
bm.MineBlocks(ctx, 2*time.Millisecond)
waitUntil = di.Open + di.WPoStChallengeWindow*3
t.Logf("End for head.Height > %d", waitUntil)
ts = client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Logf("Now head.Height = %d", ts.Height())
p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
ssz, err := miner.ActorSectorSize(ctx, maddr)
require.NoError(t, err)
require.Equal(t, p.MinerPower, p.TotalPower)
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*uint64(sectors)))
}

View File

@ -3,12 +3,13 @@ package repo
import (
"context"
"encoding/json"
"github.com/filecoin-project/lotus/node/config"
"github.com/google/uuid"
"io/ioutil"
"os"
"path/filepath"
"sync"
"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dssync "github.com/ipfs/go-datastore/sync"
@ -19,7 +20,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/node/config"
)
type MemRepo struct {
@ -36,6 +36,9 @@ type MemRepo struct {
keystore map[string]types.KeyInfo
blockstore blockstore.Blockstore
sc *stores.StorageConfig
tempDir string
// given a repo type, produce the default config
configF func(t RepoType) interface{}
@ -51,9 +54,7 @@ type lockedMemRepo struct {
t RepoType
sync.RWMutex
tempDir string
token *byte
sc *stores.StorageConfig
}
func (lmem *lockedMemRepo) GetStorage() (stores.StorageConfig, error) {
@ -61,13 +62,13 @@ func (lmem *lockedMemRepo) GetStorage() (stores.StorageConfig, error) {
return stores.StorageConfig{}, err
}
if lmem.sc == nil {
lmem.sc = &stores.StorageConfig{StoragePaths: []stores.LocalPath{
if lmem.mem.sc == nil {
lmem.mem.sc = &stores.StorageConfig{StoragePaths: []stores.LocalPath{
{Path: lmem.Path()},
}}
}
return *lmem.sc, nil
return *lmem.mem.sc, nil
}
func (lmem *lockedMemRepo) SetStorage(c func(*stores.StorageConfig)) error {
@ -77,7 +78,7 @@ func (lmem *lockedMemRepo) SetStorage(c func(*stores.StorageConfig)) error {
_, _ = lmem.GetStorage()
c(lmem.sc)
c(lmem.mem.sc)
return nil
}
@ -97,8 +98,8 @@ func (lmem *lockedMemRepo) Path() string {
lmem.Lock()
defer lmem.Unlock()
if lmem.tempDir != "" {
return lmem.tempDir
if lmem.mem.tempDir != "" {
return lmem.mem.tempDir
}
t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-")
@ -113,6 +114,16 @@ func (lmem *lockedMemRepo) Path() string {
if err := os.MkdirAll(filepath.Join(t, "deal-staging"), 0755); err != nil {
panic(err)
}
}
if lmem.t == StorageMiner || lmem.t == Worker {
lmem.initSectorStore(t)
}
lmem.mem.tempDir = t
return t
}
func (lmem *lockedMemRepo) initSectorStore(t string) {
if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), stores.StorageConfig{
StoragePaths: []stores.LocalPath{
{Path: t},
@ -135,10 +146,6 @@ func (lmem *lockedMemRepo) Path() string {
}
}
lmem.tempDir = t
return t
}
var _ Repo = &MemRepo{}
// MemRepoOptions contains options for memory repo
@ -207,6 +214,18 @@ func (mem *MemRepo) Lock(t RepoType) (LockedRepo, error) {
}, nil
}
func (mem *MemRepo) Cleanup() {
mem.api.Lock()
defer mem.api.Unlock()
if mem.tempDir != "" {
if err := os.RemoveAll(mem.tempDir); err != nil {
log.Errorw("cleanup test memrepo", "error", err)
}
mem.tempDir = ""
}
}
func (lmem *lockedMemRepo) Readonly() bool {
return false
}
@ -231,20 +250,12 @@ func (lmem *lockedMemRepo) Close() error {
return ErrClosedRepo
}
if lmem.tempDir != "" {
if err := os.RemoveAll(lmem.tempDir); err != nil {
return err
}
lmem.tempDir = ""
}
lmem.mem.token = nil
lmem.mem.api.Lock()
lmem.mem.api.ma = nil
lmem.mem.api.Unlock()
<-lmem.mem.repoLock // unlock
return nil
}
func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Batching, error) {

View File

@ -3,14 +3,11 @@ package storage
import (
"bytes"
"context"
"sync"
"time"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/specs-storage/storage"
"github.com/hashicorp/go-multierror"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
@ -207,10 +204,10 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
return bitfield.BitField{}, err
}
sectors := make(map[abi.SectorNumber]struct{})
sectors := make(map[abi.SectorNumber]cid.Cid)
var tocheck []storage.SectorRef
for _, info := range sectorInfos {
sectors[info.SectorNumber] = struct{}{}
sectors[info.SectorNumber] = info.SealedCID
tocheck = append(tocheck, storage.SectorRef{
ProofType: info.SealProof,
ID: abi.SectorID{
@ -221,12 +218,11 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
}
bad, err := s.faultTracker.CheckProvable(ctx, s.proofType, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, error) {
for _, sector := range sectorInfos {
if sector.SectorNumber == id.Number {
return sector.SealedCID, nil
scid, ok := sectors[id.Number]
if !ok {
return cid.Undef, xerrors.Errorf("sealed CID not found")
}
}
return cid.Undef, xerrors.Errorf("cann't get commr for sector %d", id.Number)
return scid, nil
})
if err != nil {
return bitfield.BitField{}, xerrors.Errorf("checking provable sectors: %w", err)
@ -557,13 +553,8 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
return nil, err
}
var berr error
batchCtx, batchAbort := context.WithCancel(ctx)
defer batchAbort()
// Generate proofs in batches
var batchWg sync.WaitGroup
posts := make([]miner.SubmitWindowedPoStParams, len(partitionBatches))
posts := make([]miner.SubmitWindowedPoStParams, 0, len(partitionBatches))
for batchIdx, batch := range partitionBatches {
batchPartitionStartIdx := 0
for _, batch := range partitionBatches[:batchIdx] {
@ -576,10 +567,6 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
Proofs: nil,
}
batchWg.Add(1)
go func(ctx context.Context, batchIdx int, batch []api.Partition, batchPartitionStartIdx int, params miner.SubmitWindowedPoStParams) {
defer batchWg.Done()
postSkipped := bitfield.New()
somethingToProve := false
@ -588,85 +575,53 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
skipCount := uint64(0)
var partitions []miner.PoStPartition
var sinfos []proof2.SectorInfo
var partitionWg sync.WaitGroup
var partitionLk sync.Mutex
for partIdx, partition := range batch {
// Get sectors info in parallel
partitionWg.Add(1)
go func(partIdx int, partition api.Partition) {
defer partitionWg.Done()
cbFailed := func(err error) {
batchAbort()
log.Warn("compute post batch:", batchIdx, "parttion:", partIdx, " failed:", err)
berr = multierror.Append(berr, err)
return
}
// TODO: Can do this in parallel
toProve, err := bitfield.SubtractBitField(partition.LiveSectors, partition.FaultySectors)
if err != nil {
cbFailed(xerrors.Errorf("removing faults from set of sectors to prove: %w", err))
return
return nil, xerrors.Errorf("removing faults from set of sectors to prove: %w", err)
}
toProve, err = bitfield.MergeBitFields(toProve, partition.RecoveringSectors)
if err != nil {
cbFailed(xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err))
return
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
}
good, err := s.checkSectors(ctx, toProve, ts.Key())
if err != nil {
cbFailed(xerrors.Errorf("checking sectors to skip: %w", err))
return
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
}
good, err = bitfield.SubtractBitField(good, postSkipped)
if err != nil {
cbFailed(xerrors.Errorf("toProve - postSkipped: %w", err))
return
return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
}
skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
cbFailed(xerrors.Errorf("toProve - good: %w", err))
return
return nil, xerrors.Errorf("toProve - good: %w", err)
}
sc, err := skipped.Count()
if err != nil {
cbFailed(xerrors.Errorf("getting skipped sector count: %w", err))
return
return nil, xerrors.Errorf("getting skipped sector count: %w", err)
}
partitionLk.Lock()
skipCount += sc
partitionLk.Unlock()
log.Infow("skipped sectors", "batch", batchIdx, "partition", partIdx, "sectors count", sc)
ssi, err := s.sectorsForProof(ctx, good, partition.AllSectors, ts)
if err != nil {
cbFailed(xerrors.Errorf("batch: %d getting sorted sector info: %w", batchIdx, err))
return
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}
if len(ssi) == 0 {
log.Warnf("getting sectors for proof batch: %d, sector info len: %d", batchIdx, len(ssi))
return
continue
}
partitionLk.Lock()
sinfos = append(sinfos, ssi...)
partitions = append(partitions, miner.PoStPartition{
Index: uint64(batchPartitionStartIdx + partIdx),
Skipped: skipped,
})
partitionLk.Unlock()
}(partIdx, partition)
}
partitionWg.Wait()
//return when any partition fault
if berr != nil {
return
}
if len(sinfos) == 0 {
@ -679,16 +634,13 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount,
"batch", batchIdx)
"skipped", skipCount)
tsStart := build.Clock.Now()
mid, err := address.IDFromAddress(s.actor)
if err != nil {
berr = multierror.Append(berr, xerrors.Errorf("batch: %d get actor address: %w", batchIdx, err))
batchAbort()
return
return nil, err
}
postOut, ps, err := s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, append(abi.PoStRandomness{}, rand...))
@ -699,23 +651,17 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
if err == nil {
// If we proved nothing, something is very wrong.
if len(postOut) == 0 {
berr = multierror.Append(berr, xerrors.Errorf("received no proofs back from generate window post"))
batchAbort()
return
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
headTs, err := s.api.ChainHead(ctx)
if err != nil {
berr = multierror.Append(berr, xerrors.Errorf("getting current head: %w", err))
batchAbort()
return
return nil, xerrors.Errorf("getting current head: %w", err)
}
checkRand, err := s.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key())
if err != nil {
berr = multierror.Append(berr, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err))
batchAbort()
return
return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err)
}
if !bytes.Equal(checkRand, rand) {
@ -751,9 +697,7 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
if len(ps) == 0 {
// If we didn't skip any new sectors, we failed
// for some other reason and we need to abort.
berr = multierror.Append(berr, xerrors.Errorf("running window post failed: %w", err))
batchAbort()
return
return nil, xerrors.Errorf("running window post failed: %w", err)
}
// TODO: maybe mark these as faulty somewhere?
@ -765,11 +709,9 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
// deadline after the deadline has ended.
if ctx.Err() != nil {
log.Warnw("aborting PoSt due to context cancellation", "error", ctx.Err(), "deadline", di.Index)
berr = multierror.Append(berr, ctx.Err())
return
return nil, ctx.Err()
}
skipCount += uint64(len(ps))
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
}
@ -777,17 +719,10 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
// Nothing to prove for this batch, try the next batch
if !somethingToProve {
log.Warnf("nothing to prove for batch: %d", batchIdx)
return
continue
}
posts[batchIdx] = params
}(batchCtx, batchIdx, batch, batchPartitionStartIdx, params)
}
batchWg.Wait()
if berr != nil {
return nil, berr
posts = append(posts, params)
}
return posts, nil

View File

@ -3,6 +3,7 @@ package storage
import (
"bytes"
"context"
"github.com/filecoin-project/specs-actors/v6/actors/runtime/proof"
"testing"
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
@ -116,6 +117,14 @@ func (m *mockStorageMinerAPI) GasEstimateFeeCap(context.Context, *types.Message,
type mockProver struct {
}
func (m *mockProver) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte) ([]proof.PoStProof, error) {
panic("implement me")
}
func (m *mockProver) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (proof.PoStProof, error) {
panic("implement me")
}
func (m *mockProver) GenerateWinningPoSt(context.Context, abi.ActorID, []proof2.SectorInfo, abi.PoStRandomness) ([]proof2.PoStProof, error) {
panic("implement me")
}

View File

@ -429,8 +429,8 @@ github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.m
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs=
github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
github.com/filecoin-project/go-paramfetch v0.0.2 h1:a6W3Ij6CKhwHYYlx+5mqvBIyw4CabZH2ojdEaoAZ6/g=
github.com/filecoin-project/go-paramfetch v0.0.2/go.mod h1:1FH85P8U+DUEmWk1Jkw3Bw7FrwTVUNHk/95PSPG+dts=
github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53 h1:+nripp+UI/rhl01w9Gs4V0XDGaVPYPMGU/D/gNVLue0=
github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53/go.mod h1:1FH85P8U+DUEmWk1Jkw3Bw7FrwTVUNHk/95PSPG+dts=
github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200928172055-2df22083d8ab/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=