Merge branch 'feat/decoupling-post-worker' of github.com:wusirdcenter/lotus into feat/decoupling-post-worker

This commit is contained in:
Łukasz Magiera 2022-01-10 15:06:04 +01:00
commit cf7dd36829
31 changed files with 1473 additions and 230 deletions

46
RunPoStWorkerManual.md Normal file
View File

@ -0,0 +1,46 @@
### Environment variables
Ensure that workers have access to the following environment variables when they run.
```
export TMPDIR=/fast/disk/folder3 # used when sealing
export MINER_API_INFO:<TOKEN>:/ip4/<miner_api_address>/tcp/<port>/http`
export BELLMAN_CPU_UTILIZATION=0.875 # optimal value depends on exact hardware
export FIL_PROOFS_MAXIMIZE_CACHING=1
export FIL_PROOFS_USE_GPU_COLUMN_BUILDER=1 # when GPU is available
export FIL_PROOFS_USE_GPU_TREE_BUILDER=1 # when GPU is available
export FIL_PROOFS_PARAMETER_CACHE=/fast/disk/folder # > 100GiB!
export FIL_PROOFS_PARENT_CACHE=/fast/disk/folder2 # > 50GiB!
# The following increases speed of PreCommit1 at the cost of using a full
# CPU core-complex rather than a single core.
# See https://github.com/filecoin-project/rust-fil-proofs/ and the
# "Worker co-location" section below.
export FIL_PROOFS_USE_MULTICORE_SDR=1
```
### Run the worker
```
lotus-worker run <flags>
```
These are old flags:
```
--addpiece enable addpiece (default: true)
--precommit1 enable precommit1 (32G sectors: 1 core, 128GiB RAM) (default: true)
--unseal enable unsealing (32G sectors: 1 core, 128GiB RAM) (default: true)
--precommit2 enable precommit2 (32G sectors: multiple cores, 96GiB RAM) (default: true)
--commit enable commit (32G sectors: multiple cores or GPUs, 128GiB RAM + 64GiB swap) (default: true)
```
We added two new flags:
```
--windowpost enable windowpost (default: false)
--winnningpost enable winningpost (default: false)
```
These post flags have priority over old flags. If you want this worker to be a window post machine, you can just set the windowpost flag to be `true`. Similar to winning post machine. If you set both of them to be `true`, it will be a window post machine.

View File

@ -154,6 +154,8 @@ type StorageMiner interface {
StorageLocal(ctx context.Context) (map[stores.ID]string, error) //perm:admin
StorageStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) //perm:admin
StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error) //perm:admin
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error //perm:write
MarketListDeals(ctx context.Context) ([]MarketDeal, error) //perm:read
MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) //perm:read

View File

@ -7,9 +7,11 @@ import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
)
@ -33,20 +35,22 @@ type Worker interface {
Info(context.Context) (storiface.WorkerInfo, error) //perm:admin
// storiface.WorkerCalls
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) //perm:admin
GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, postChallenges *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) //perm:admin
TaskDisable(ctx context.Context, tt sealtasks.TaskType) error //perm:admin
TaskEnable(ctx context.Context, tt sealtasks.TaskType) error //perm:admin

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -30,6 +31,7 @@ import (
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
@ -806,6 +808,8 @@ type StorageMinerStruct struct {
StorageGetLocks func(p0 context.Context) (storiface.SectorLocks, error) `perm:"admin"`
StorageGetUrl func(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) `perm:"admin"`
StorageInfo func(p0 context.Context, p1 stores.ID) (stores.StorageInfo, error) `perm:"admin"`
StorageList func(p0 context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
@ -867,6 +871,10 @@ type WorkerStruct struct {
GenerateSectorKeyFromData func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
GenerateWindowPoSt func(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) `perm:"admin"`
GenerateWinningPoSt func(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) `perm:"admin"`
Info func(p0 context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
MoveStorage func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType) (storiface.CallID, error) `perm:"admin"`
@ -4733,6 +4741,17 @@ func (s *StorageMinerStub) StorageGetLocks(p0 context.Context) (storiface.Sector
return *new(storiface.SectorLocks), ErrNotSupported
}
func (s *StorageMinerStruct) StorageGetUrl(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) {
if s.Internal.StorageGetUrl == nil {
return "", ErrNotSupported
}
return s.Internal.StorageGetUrl(p0, p1, p2)
}
func (s *StorageMinerStub) StorageGetUrl(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) {
return "", ErrNotSupported
}
func (s *StorageMinerStruct) StorageInfo(p0 context.Context, p1 stores.ID) (stores.StorageInfo, error) {
if s.Internal.StorageInfo == nil {
return *new(stores.StorageInfo), ErrNotSupported
@ -4975,6 +4994,28 @@ func (s *WorkerStub) GenerateSectorKeyFromData(p0 context.Context, p1 storage.Se
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) GenerateWindowPoSt(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
if s.Internal.GenerateWindowPoSt == nil {
return *new(ffiwrapper.WindowPoStResult), ErrNotSupported
}
return s.Internal.GenerateWindowPoSt(p0, p1, p2, p3, p4, p5, p6)
}
func (s *WorkerStub) GenerateWindowPoSt(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
return *new(ffiwrapper.WindowPoStResult), ErrNotSupported
}
func (s *WorkerStruct) GenerateWinningPoSt(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) {
if s.Internal.GenerateWinningPoSt == nil {
return *new([]proof.PoStProof), ErrNotSupported
}
return s.Internal.GenerateWinningPoSt(p0, p1, p2, p3, p4)
}
func (s *WorkerStub) GenerateWinningPoSt(p0 context.Context, p1 abi.ActorID, p2 ffiwrapper.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) {
return *new([]proof.PoStProof), ErrNotSupported
}
func (s *WorkerStruct) Info(p0 context.Context) (storiface.WorkerInfo, error) {
if s.Internal.Info == nil {
return *new(storiface.WorkerInfo), ErrNotSupported

View File

@ -173,6 +173,17 @@ var runCmd = &cli.Command{
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
},
&cli.BoolFlag{
Name: "windowpost",
Usage: "enable window post",
Value: false,
},
&cli.BoolFlag{
Name: "winningpost",
Usage: "enable winning post",
Value: false,
},
},
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
@ -269,6 +280,14 @@ var runCmd = &cli.Command{
taskTypes = append(taskTypes, sealtasks.TTCommit2)
}
if cctx.Bool("windowpost") {
taskTypes = append(taskTypes, sealtasks.TTGenerateWindowPoSt)
}
if cctx.Bool("winningpost") {
taskTypes = append(taskTypes, sealtasks.TTGenerateWinningPoSt)
}
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
}

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit 52d80081bfdd8a30bc44bcfe44cb0f299615b9f3
Subproject commit 8e377f906ae40239d71979cd2dd20f3c89952e3c

View File

@ -4,8 +4,6 @@ import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"
"golang.org/x/xerrors"
@ -14,6 +12,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
@ -26,11 +25,6 @@ type FaultTracker interface {
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
var bad = make(map[abi.SectorID]string)
ssize, err := pp.SectorSize()
if err != nil {
return nil, err
}
// TODO: More better checks
for _, sector := range sectors {
err := func() error {
@ -48,7 +42,7 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
lp, _, err := m.localStore.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
lp, _, err := m.storage.AcquireSectorPaths(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad[sector.ID] = fmt.Sprintf("acquire sector failed: %s", err)
@ -61,30 +55,6 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
toCheck := map[string]int64{
lp.Sealed: 1,
filepath.Join(lp.Cache, "p_aux"): 0,
}
addCachePathsForSectorSize(toCheck, lp.Cache, ssize)
for p, sz := range toCheck {
st, err := os.Stat(p)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: sector file stat error", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "err", err)
bad[sector.ID] = fmt.Sprintf("%s", err)
return nil
}
if sz != 0 {
if st.Size() != int64(ssize)*sz {
log.Warnw("CheckProvable Sector FAULT: sector file is wrong size", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "size", st.Size(), "expectSize", int64(ssize)*sz)
bad[sector.ID] = fmt.Sprintf("%s is wrong size (got %d, expect %d)", p, st.Size(), int64(ssize)*sz)
return nil
}
}
}
if rg != nil {
wpp, err := sector.ProofType.RegisteredWindowPoStProof()
if err != nil {
@ -111,16 +81,20 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
_, err = ffi.GenerateSingleVanillaProof(ffi.PrivateSectorInfo{
SectorInfo: proof.SectorInfo{
SealProof: sector.ProofType,
SectorNumber: sector.ID.Number,
SealedCID: commr,
psi := ffiwrapper.PrivateSectorInfo{
Psi: ffi.PrivateSectorInfo{
SectorInfo: proof.SectorInfo{
SealProof: sector.ProofType,
SectorNumber: sector.ID.Number,
SealedCID: commr,
},
CacheDirPath: lp.Cache,
PoStProofType: wpp,
SealedSectorPath: lp.Sealed,
},
CacheDirPath: lp.Cache,
PoStProofType: wpp,
SealedSectorPath: lp.Sealed,
}, ch.Challenges[sector.ID.Number])
}
_, err = m.storage.GenerateSingleVanillaProof(ctx, sector.ID.Miner, &psi, ch.Challenges[sector.ID.Number])
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "err", err)
bad[sector.ID] = fmt.Sprintf("generating vanilla proof: %s", err)
@ -138,25 +112,4 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return bad, nil
}
func addCachePathsForSectorSize(chk map[string]int64, cacheDir string, ssize abi.SectorSize) {
switch ssize {
case 2 << 10:
fallthrough
case 8 << 20:
fallthrough
case 512 << 20:
chk[filepath.Join(cacheDir, "sc-02-data-tree-r-last.dat")] = 0
case 32 << 30:
for i := 0; i < 8; i++ {
chk[filepath.Join(cacheDir, fmt.Sprintf("sc-02-data-tree-r-last-%d.dat", i))] = 0
}
case 64 << 30:
for i := 0; i < 16; i++ {
chk[filepath.Join(cacheDir, fmt.Sprintf("sc-02-data-tree-r-last-%d.dat", i))] = 0
}
default:
log.Warnf("not checking cache files of %s sectors for faults", ssize)
}
}
var _ FaultTracker = &Manager{}

View File

@ -84,3 +84,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, exis
return out, done, nil
}
func (b *Provider) AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return b.AcquireSector(ctx, id, existing, 0, ptype)
}

View File

@ -6,12 +6,13 @@ import (
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -33,6 +34,8 @@ type Storage interface {
UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
WorkerProver
}
type Verifier interface {
@ -56,6 +59,41 @@ type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}
type MinerProver interface {
PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (SortedPrivateSectorInfo, []abi.SectorID, func(), error)
SplitSortedPrivateSectorInfo(ctx context.Context, privsector SortedPrivateSectorInfo, offset int, end int) (SortedPrivateSectorInfo, error)
GeneratePoStFallbackSectorChallenges(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, sectorIds []abi.SectorNumber) (*FallbackChallenges, error)
}
type WindowPoStResult struct {
PoStProofs proof.PoStProof
Skipped []abi.SectorID
}
// type SortedPrivateSectorInfo ffi.SortedPrivateSectorInfo
type FallbackChallenges struct {
Fc ffi.FallbackChallenges
}
type SortedPrivateSectorInfo struct {
Spsi ffi.SortedPrivateSectorInfo
}
type PrivateSectorInfo struct {
Psi ffi.PrivateSectorInfo
}
type WorkerProver interface {
GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (*proof.PoStProof, error)
GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof.PoStProof, error)
}
type WorkerCalls interface {
GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, sectorChallenges *FallbackChallenges) (WindowPoStResult, error)
GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *FallbackChallenges) ([]proof.PoStProof, error)
}

View File

@ -12,6 +12,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
@ -141,7 +142,83 @@ func (proofVerifier) VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoS
return ffi.VerifyWindowPoSt(info)
}
func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proofType abi.RegisteredPoStProof, mid abi.ActorID, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
randomness[31] &= 0x3f
return ffi.GenerateWinningPoStSectorChallenge(proofType, minerID, randomness, eligibleSectorCount)
return ffi.GenerateWinningPoStSectorChallenge(proofType, mid, randomness, eligibleSectorCount)
}
func (sb *Sealer) PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
fmap := map[abi.SectorNumber]struct{}{}
for _, fault := range faults {
fmap[fault] = struct{}{}
}
var doneFuncs []func()
done := func() {
for _, df := range doneFuncs {
df()
}
}
var skipped []abi.SectorID
var out []ffi.PrivateSectorInfo
for _, s := range sectorInfo {
if _, faulty := fmap[s.SectorNumber]; faulty {
continue
}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: mid, Number: s.SectorNumber},
ProofType: s.SealProof,
}
paths, d, err := sb.sectors.AcquireSectorPaths(ctx, sid, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
if err != nil {
log.Warnw("failed to acquire sector, skipping", "sector", sid.ID, "error", err)
skipped = append(skipped, sid.ID)
continue
}
doneFuncs = append(doneFuncs, d)
postProofType, err := rpt(s.SealProof)
if err != nil {
done()
return SortedPrivateSectorInfo{}, nil, nil, xerrors.Errorf("acquiring registered PoSt proof from sector info %+v: %w", s, err)
}
out = append(out, ffi.PrivateSectorInfo{
CacheDirPath: paths.Cache,
PoStProofType: postProofType,
SealedSectorPath: paths.Sealed,
SectorInfo: s,
})
}
privsectors := ffi.NewSortedPrivateSectorInfo(out...)
return SortedPrivateSectorInfo{Spsi: privsectors}, skipped, done, nil
}
func (sb *Sealer) GeneratePoStFallbackSectorChallenges(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, sectorIds []abi.SectorNumber) (*FallbackChallenges, error) {
fc, err := ffi.GeneratePoStFallbackSectorChallenges(proofType, minerID, randomness, sectorIds)
return &FallbackChallenges{
Fc: *fc,
}, err
}
func (sb *Sealer) SplitSortedPrivateSectorInfo(ctx context.Context, privsector SortedPrivateSectorInfo, offset int, end int) (SortedPrivateSectorInfo, error) {
Spsi, err := ffi.SplitSortedPrivateSectorInfo(ctx, privsector.Spsi, offset, end)
return SortedPrivateSectorInfo{Spsi: Spsi}, err
}
func (sb *Sealer) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof5.PoStProof, error) {
return ffi.GenerateWinningPoStWithVanilla(proofType, minerID, randomness, vanillas)
}
func (sb *Sealer) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (*proof.PoStProof, error) {
pp, err := ffi.GenerateSinglePartitionWindowPoStWithVanilla(proofType, minerID, randomness, proofs, uint(partitionIdx))
return &proof.PoStProof{
PoStProof: pp.PoStProof,
ProofBytes: pp.ProofBytes,
}, err
}

View File

@ -42,6 +42,8 @@ type Worker interface {
Session(context.Context) (uuid.UUID, error)
Close() error // TODO: do we need this?
ffiwrapper.WorkerCalls
}
type SectorManager interface {
@ -49,6 +51,8 @@ type SectorManager interface {
storage.Prover
storiface.WorkerReturn
FaultTracker
ffiwrapper.MinerProver
}
var ClosedWorkerID = uuid.UUID{}
@ -73,6 +77,8 @@ type Manager struct {
results map[WorkID]result
waitRes map[WorkID]chan struct{}
ffiwrapper.MinerProver
}
type result struct {
@ -116,7 +122,7 @@ type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore
func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si, remote: stor})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
@ -137,6 +143,8 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
callRes: map[storiface.CallID]chan result{},
results: map[WorkID]result{},
waitRes: map[WorkID]chan struct{}{},
MinerProver: prover,
}
m.setupWorkTracker()

207
extern/sector-storage/manager_post.go vendored Normal file
View File

@ -0,0 +1,207 @@
package sectorstorage
import (
"context"
"sync"
"time"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
builtin6 "github.com/filecoin-project/specs-actors/v6/actors/builtin"
"github.com/hashicorp/go-multierror"
xerrors "golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
)
func (m *Manager) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
if !m.sched.winningPoStSched.CanSched(ctx) {
log.Info("GenerateWinningPoSt run at lotus-miner")
return m.Prover.GenerateWinningPoSt(ctx, minerID, sectorInfo, randomness)
}
return m.generateWinningPoSt(ctx, minerID, sectorInfo, randomness)
}
func (m *Manager) generateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
randomness[31] &= 0x3f
ps, skipped, done, err := m.PubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWinningPoStProof)
if err != nil {
return nil, err
}
defer done()
if len(skipped) > 0 {
return nil, xerrors.Errorf("pubSectorToPriv skipped sectors: %+v", skipped)
}
var sectorNums []abi.SectorNumber
for _, s := range ps.Spsi.Values() {
sectorNums = append(sectorNums, s.SectorNumber)
}
postChallenges, err := m.GeneratePoStFallbackSectorChallenges(ctx, ps.Spsi.Values()[0].PoStProofType, minerID, randomness, sectorNums)
if err != nil {
return nil, xerrors.Errorf("generating fallback challenges: %v", err)
}
var proofs []proof.PoStProof
err = m.sched.winningPoStSched.Schedule(ctx, false, func(ctx context.Context, w Worker) error {
out, err := w.GenerateWinningPoSt(ctx, minerID, ps, randomness, postChallenges)
if err != nil {
return err
}
proofs = out
return nil
})
if err != nil {
return nil, err
}
return proofs, nil
}
func (m *Manager) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error) {
if !m.sched.windowPoStSched.CanSched(ctx) {
log.Info("GenerateWindowPoSt run at lotus-miner")
return m.Prover.GenerateWindowPoSt(ctx, minerID, sectorInfo, randomness)
}
return m.generateWindowPoSt(ctx, minerID, sectorInfo, randomness)
}
func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, []abi.SectorID, error) {
var retErr error = nil
randomness[31] &= 0x3f
out := make([]proof.PoStProof, 0)
if len(sectorInfo) == 0 {
return nil, nil, xerrors.New("generate window post len(sectorInfo)=0")
}
//get window proof type
proofType, err := abi.RegisteredSealProof.RegisteredWindowPoStProof(sectorInfo[0].SealProof)
if err != nil {
return nil, nil, err
}
// Get sorted and de-duplicate sectors info
ps, skipped, done, err := m.PubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWindowPoStProof)
if err != nil {
return nil, nil, xerrors.Errorf("PubSectorToPriv failed: %+v", err)
}
if len(skipped) > 0 {
return nil, skipped, xerrors.Errorf("skipped = %d", len(skipped))
}
defer done()
partitionSectorsCount, err := builtin6.PoStProofWindowPoStPartitionSectors(proofType)
if err != nil {
return nil, nil, xerrors.Errorf("get sectors count of partition failed:%+v", err)
}
// The partitions number of this batch
partitionCount := (len(ps.Spsi.Values()) + int(partitionSectorsCount) - 1) / int(partitionSectorsCount)
log.Infof("generateWindowPoSt len(partitionSectorsCount):%d len(partitionCount):%d \n", partitionSectorsCount, partitionCount)
var faults []abi.SectorID
var flk sync.Mutex
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var sectorNums []abi.SectorNumber
for _, s := range ps.Spsi.Values() {
sectorNums = append(sectorNums, s.SectorNumber)
}
postChallenges, err := m.GeneratePoStFallbackSectorChallenges(ctx, ps.Spsi.Values()[0].PoStProofType, minerID, randomness, sectorNums)
if err != nil {
return nil, nil, xerrors.Errorf("generating fallback challenges: %v", err)
}
proofList := make([]proof.PoStProof, partitionCount)
var wg sync.WaitGroup
for i := 0; i < partitionCount; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
proof, faultsectors, err := m.generatePartitionWindowPost(cctx, minerID, n, int(partitionSectorsCount), partitionCount, ps, randomness, postChallenges)
if err != nil {
retErr = multierror.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", i, err))
if len(faultsectors) > 0 {
log.Errorf("generateWindowPost groupCount:%d, faults:%d, err: %+v", i, len(faultsectors), err)
flk.Lock()
faults = append(faults, faultsectors...)
flk.Unlock()
}
cancel()
return
}
proofList[n] = proof
}(i)
time.Sleep(1 * time.Second)
}
wg.Wait()
pl := make([]ffi.PartitionProof, 0)
for i, pp := range proofList {
pl[i] = ffi.PartitionProof(pp)
}
postProofs, err := ffi.MergeWindowPoStPartitionProofs(proofType, pl)
if err != nil {
return nil, nil, xerrors.Errorf("merge windowPoSt partition proofs: %v", err)
}
if len(faults) > 0 {
log.Warnf("GenerateWindowPoSt get faults: %d", len(faults))
return out, faults, retErr
}
out = append(out, *postProofs)
return out, skipped, retErr
}
func (m *Manager) generatePartitionWindowPost(ctx context.Context, minerID abi.ActorID, index int, psc int, groupCount int, ps ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, postChallenges *ffiwrapper.FallbackChallenges) (proof.PoStProof, []abi.SectorID, error) {
var faults []abi.SectorID
start := index * psc
end := (index + 1) * psc
if index == groupCount-1 {
end = len(ps.Spsi.Values())
}
log.Infow("generateWindowPost", "start", start, "end", end, "index", index)
privsectors, err := m.SplitSortedPrivateSectorInfo(ctx, ps, start, end)
if err != nil {
return proof.PoStProof{}, faults, xerrors.Errorf("generateWindowPost GetScopeSortedPrivateSectorInfo failed: %w", err)
}
var result *ffiwrapper.WindowPoStResult
err = m.sched.windowPoStSched.Schedule(ctx, true, func(ctx context.Context, w Worker) error {
out, err := w.GenerateWindowPoSt(ctx, minerID, privsectors, index, start, randomness, postChallenges)
if err != nil {
return err
}
result = &out
return nil
})
if err != nil {
return proof.PoStProof{}, faults, err
}
if len(result.Skipped) > 0 {
log.Warnf("generateWindowPost partition:%d, get faults:%d", index, len(result.Skipped))
return proof.PoStProof{}, result.Skipped, xerrors.Errorf("generatePartitionWindowPoStProofs partition:%d get faults:%d", index, len(result.Skipped))
}
return result.PoStProofs, faults, err
}

View File

@ -553,6 +553,26 @@ func (mgr *SectorMgr) ReturnGenerateSectorKeyFromData(ctx context.Context, callI
panic("not supported")
}
func (mgr *SectorMgr) GetPartitionSectorsCount(ctx context.Context, prooftype abi.RegisteredPoStProof) (int, error) {
panic("not supported")
}
func (mgr *SectorMgr) GetPartitionVanillaParams(ctx context.Context, proofType abi.RegisteredPoStProof) (string, error) {
panic("not supported")
}
func (mgr *SectorMgr) PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffiwrapper.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
panic("not supported")
}
func (mgr *SectorMgr) SplitSortedPrivateSectorInfo(ctx context.Context, privsector ffiwrapper.SortedPrivateSectorInfo, offset int, end int) (ffiwrapper.SortedPrivateSectorInfo, error) {
panic("not supported")
}
func (mgr *SectorMgr) GeneratePoStFallbackSectorChallenges(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, sectorIds []abi.SectorNumber) (*ffiwrapper.FallbackChallenges, error) {
panic("not supported")
}
func (m mockVerifProver) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {

View File

@ -12,8 +12,9 @@ import (
)
type readonlyProvider struct {
index stores.SectorIndex
stor *stores.Local
index stores.SectorIndex
stor *stores.Local
remote *stores.Remote
}
func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
@ -38,3 +39,23 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorR
return p, cancel, err
}
func (l *readonlyProvider) AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
ctx, cancel := context.WithCancel(ctx)
// use TryLock to avoid blocking
locked, err := l.index.StorageTryLock(ctx, id.ID, existing, storiface.FTNone)
if err != nil {
cancel()
return storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
}
if !locked {
cancel()
return storiface.SectorPaths{}, nil, xerrors.Errorf("failed to acquire sector lock")
}
p, _, err := l.remote.AcquireSectorPaths(ctx, id, existing, storiface.FTNone, sealing)
return p, cancel, err
}

View File

@ -68,6 +68,11 @@ type scheduler struct {
info chan func(interface{})
// window scheduler
windowPoStSched *poStScheduler
// winning scheduler
winningPoStSched *poStScheduler
closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing
@ -83,6 +88,8 @@ type workerHandle struct {
lk sync.Mutex // can be taken inside sched.workersLk.RLock
acceptTasks map[sealtasks.TaskType]struct{}
wndLk sync.Mutex // can be taken inside sched.workersLk.RLock
activeWindows []*schedWindow
@ -162,6 +169,9 @@ func newScheduler() *scheduler {
info: make(chan func(interface{})),
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),
closing: make(chan struct{}),
closed: make(chan struct{}),
}
@ -547,6 +557,9 @@ func (sh *scheduler) schedClose() {
for i, w := range sh.workers {
sh.workerCleanup(i, w)
}
sh.windowPoStSched.schedClose()
sh.winningPoStSched.schedClose()
}
func (sh *scheduler) Info(ctx context.Context) (interface{}, error) {

254
extern/sector-storage/sched_post.go vendored Normal file
View File

@ -0,0 +1,254 @@
package sectorstorage
import (
"context"
"sort"
"sync"
"time"
sealtasks "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
xerrors "golang.org/x/xerrors"
)
type poStScheduler struct {
lk sync.RWMutex
workers map[storiface.WorkerID]*workerHandle
cond *sync.Cond
postType sealtasks.TaskType
GPUUtilization float64
}
func newPoStScheduler(t sealtasks.TaskType) *poStScheduler {
ps := &poStScheduler{
workers: map[storiface.WorkerID]*workerHandle{},
postType: t,
GPUUtilization: storiface.GPUUtilizationProof,
}
ps.cond = sync.NewCond(&ps.lk)
return ps
}
func (ps *poStScheduler) AddWorker(wid storiface.WorkerID, w *workerHandle) bool {
if _, ok := w.acceptTasks[ps.postType]; !ok {
return false
}
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid] = w
go ps.watch(wid, w)
ps.cond.Broadcast()
return true
}
func (ps *poStScheduler) delWorker(wid storiface.WorkerID) *workerHandle {
ps.lk.Lock()
defer ps.lk.Unlock()
var w *workerHandle = nil
if wh, ok := ps.workers[wid]; ok {
w = wh
delete(ps.workers, wid)
}
return w
}
func (ps *poStScheduler) CanSched(ctx context.Context) bool {
ps.lk.RLock()
defer ps.lk.RUnlock()
if len(ps.workers) == 0 {
return false
}
for _, w := range ps.workers {
if w.enabled {
return true
}
}
return false
}
func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, work WorkerAction) error {
ps.lk.Lock()
defer ps.lk.Unlock()
if len(ps.workers) == 0 {
return xerrors.Errorf("cant find %s post worker", ps.postType)
}
// Get workers by resource
canDo, accepts := ps.canHandleWorkers()
for !canDo {
//if primary is true, it must be dispatched to a worker
if primary {
ps.cond.Wait()
canDo, accepts = ps.canHandleWorkers()
} else {
return xerrors.Errorf("cant find %s post worker", ps.postType)
}
}
return ps.withResources(accepts[0], func(worker Worker) error {
ps.lk.Unlock()
defer ps.lk.Lock()
return work(ctx, worker)
})
}
func (ps *poStScheduler) canHandleWorkers() (bool, []storiface.WorkerID) {
var accepts []storiface.WorkerID
//if the gpus of the worker are insufficient or its disable, it cannot be scheduled
for wid, wr := range ps.workers {
if wr.active.gpuUsed >= float64(len(wr.info.Resources.GPUs)) || !wr.enabled {
continue
}
accepts = append(accepts, wid)
}
freeGPU := func(i int) float64 {
w := ps.workers[accepts[i]]
return float64(len(w.info.Resources.GPUs)) - w.active.gpuUsed
}
sort.Slice(accepts[:], func(i, j int) bool {
return freeGPU(i) > freeGPU(j)
})
if len(accepts) == 0 {
return false, accepts
}
return true, accepts
}
func (ps *poStScheduler) withResources(wid storiface.WorkerID, cb func(wid Worker) error) error {
ps.addResource(wid)
worker := ps.workers[wid].workerRpc
err := cb(worker)
ps.freeResource(wid)
if ps.cond != nil {
ps.cond.Broadcast()
}
return err
}
func (ps *poStScheduler) freeResource(wid storiface.WorkerID) {
if _, ok := ps.workers[wid]; !ok {
log.Warnf("release PoSt Worker not found worker")
return
}
if ps.workers[wid].active.gpuUsed > 0 {
ps.workers[wid].active.gpuUsed -= ps.GPUUtilization
}
return
}
func (ps *poStScheduler) addResource(wid storiface.WorkerID) {
ps.workers[wid].active.gpuUsed += ps.GPUUtilization
}
func (ps *poStScheduler) disable(wid storiface.WorkerID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid].enabled = false
}
func (ps *poStScheduler) enable(wid storiface.WorkerID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid].enabled = true
}
func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *workerHandle) {
heartbeatTimer := time.NewTicker(stores.HeartbeatInterval)
defer heartbeatTimer.Stop()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
defer close(worker.closedMgr)
defer func() {
log.Warnw("Worker closing", "WorkerID", wid)
ps.delWorker(wid)
}()
for {
sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
curSes, err := worker.workerRpc.Session(sctx)
scancel()
if err != nil {
// Likely temporary error
log.Warnw("failed to check worker session", "error", err)
ps.disable(wid)
select {
case <-heartbeatTimer.C:
continue
case <-worker.closingMgr:
return
}
}
if storiface.WorkerID(curSes) != wid {
if curSes != ClosedWorkerID {
// worker restarted
log.Warnw("worker session changed (worker restarted?)", "initial", wid, "current", curSes)
}
return
}
ps.enable(wid)
}
}
func (ps *poStScheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) {
select {
case <-w.closingMgr:
default:
close(w.closingMgr)
}
ps.lk.Unlock()
select {
case <-w.closedMgr:
case <-time.After(time.Second):
log.Errorf("timeout closing worker manager goroutine %d", wid)
}
ps.lk.Lock()
}
func (ps *poStScheduler) schedClose() {
ps.lk.Lock()
defer ps.lk.Unlock()
log.Debugf("closing scheduler")
for i, w := range ps.workers {
ps.workerCleanup(i, w)
}
}
func (ps *poStScheduler) WorkerStats(cb func(wid storiface.WorkerID, worker *workerHandle)) {
ps.lk.RLock()
defer ps.lk.RUnlock()
for id, w := range ps.workers {
cb(id, w)
}
}

View File

@ -16,6 +16,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
@ -134,6 +136,14 @@ func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id st
panic("implement me")
}
func (s *schedTestWorker) GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
panic("implement me")
}
func (s *schedTestWorker) GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) {
panic("implement me")
}
func (s *schedTestWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
return s.taskTypes, nil
}

View File

@ -38,6 +38,10 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
if sessID == ClosedWorkerID {
return xerrors.Errorf("worker already closed")
}
tasks, err := w.TaskTypes(ctx)
if err != nil {
return xerrors.Errorf("getting worker tasks: %w", err)
}
worker := &workerHandle{
workerRpc: w,
@ -47,12 +51,20 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
active: &activeResources{},
enabled: true,
acceptTasks: tasks,
closingMgr: make(chan struct{}),
closedMgr: make(chan struct{}),
}
wid := storiface.WorkerID(sessID)
//add worker to post scheduler
if sh.windowPoStSched.AddWorker(wid, worker) ||
sh.winningPoStSched.AddWorker(wid, worker) {
return nil
}
sh.workersLk.Lock()
_, exist := sh.workers[wid]
if exist {

View File

@ -18,6 +18,9 @@ const (
TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1"
TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2"
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
TTGenerateWinningPoSt TaskType = "post/v0/winningproof"
)
var order = map[TaskType]int{
@ -33,6 +36,9 @@ var order = map[TaskType]int{
TTUnseal: 1,
TTFetch: -1,
TTFinalize: -2, // most priority
TTGenerateWindowPoSt: -3,
TTGenerateWinningPoSt: -4,
}
var shortNames = map[TaskType]string{
@ -52,6 +58,9 @@ var shortNames = map[TaskType]string{
TTProveReplicaUpdate1: "PR1",
TTProveReplicaUpdate2: "PR2",
TTRegenSectorKey: "GSK",
TTGenerateWindowPoSt: "WDP",
TTGenerateWinningPoSt: "WNP",
}
func (a TaskType) MuchLess(b TaskType) (bool, bool) {

View File

@ -10,11 +10,10 @@ import (
func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
m.sched.workersLk.RLock()
defer m.sched.workersLk.RUnlock()
out := map[uuid.UUID]storiface.WorkerStats{}
for id, handle := range m.sched.workers {
cb := func(id storiface.WorkerID, handle *workerHandle) {
handle.lk.Lock()
out[uuid.UUID(id)] = storiface.WorkerStats{
Info: handle.info,
@ -28,6 +27,15 @@ func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
handle.lk.Unlock()
}
for id, handle := range m.sched.workers {
cb(id, handle)
}
m.sched.workersLk.RUnlock()
//list post workers
m.sched.winningPoStSched.WorkerStats(cb)
m.sched.windowPoStSched.WorkerStats(cb)
return out
}

View File

@ -2,6 +2,7 @@ package stores
import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"strconv"
@ -10,6 +11,7 @@ import (
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -56,6 +58,9 @@ func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
//for post vanilla
mux.HandleFunc("/remote/vanilla/single", handler.generateSingleVanillaProof).Methods("POST")
mux.ServeHTTP(w, r)
}
@ -298,3 +303,35 @@ func ftFromString(t string) (storiface.SectorFileType, error) {
return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
}
}
type SingleVanillaParams struct {
PrivSector ffi.PrivateSectorInfo
Challenge []uint64
}
func (handler *FetchHandler) generateSingleVanillaProof(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
params := SingleVanillaParams{}
err = json.Unmarshal(body, &params)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
vanilla, err := ffi.GenerateSingleVanillaProof(params.PrivSector, params.Challenge)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(200)
_, err = w.Write(vanilla)
if err != nil {
log.Error("response writer: ", err)
}
}

View File

@ -42,6 +42,8 @@ type StorageInfo struct {
Groups []Group
AllowTo []Group
// storage path
Path string
}
type HealthReport struct {
@ -58,6 +60,9 @@ type SectorStorageInfo struct {
CanStore bool
Primary bool
// storage path
Path string
}
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/index.go -package=mocks . SectorIndex
@ -79,6 +84,9 @@ type SectorIndex interface { // part of storage-miner api
StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error)
StorageList(ctx context.Context) (map[ID][]Decl, error)
// get sector storage url for post worker
StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error)
}
type Decl struct {
@ -177,6 +185,8 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsS
i.stores[si.ID].info.Groups = si.Groups
i.stores[si.ID].info.AllowTo = si.AllowTo
i.stores[si.ID].info.Path = si.Path
return nil
}
i.stores[si.ID] = &storageEntry{
@ -350,6 +360,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
CanStore: st.info.CanStore,
Primary: isprimary[id],
Path: st.info.Path,
})
}
@ -419,6 +431,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
CanStore: st.info.CanStore,
Primary: false,
Path: st.info.Path,
})
}
}
@ -522,4 +536,49 @@ func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]ID,
return out, nil
}
func (i *Index) StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error) {
i.lk.RLock()
defer i.lk.RUnlock()
storageIDs := map[ID]uint64{}
for _, pathType := range storiface.PathTypes {
if ft&pathType == 0 {
continue
}
for _, id := range i.sectors[Decl{s, pathType}] {
storageIDs[id.storage]++
}
}
storages := make([]StorageInfo, 0, len(storageIDs))
for id := range storageIDs {
st, ok := i.stores[id]
if !ok {
log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s)
continue
}
urls := make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
urls[k] = u
}
if st.info.CanStore {
storages = append(storages, StorageInfo{
URLs: urls,
})
}
}
if len(storages) == 0 {
return "", xerrors.New("cant find sector storage")
}
if len(storages[0].URLs) == 0 {
return "", xerrors.New("sector storage url is nil")
}
return storages[0].URLs[0], nil
}
var _ SectorIndex = &Index{}

View File

@ -5,6 +5,7 @@ import (
"os"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/specs-storage/storage"
@ -36,6 +37,8 @@ type PartialFileHandler interface {
type Store interface {
AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
AcquireSectorPaths(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool, keepIn []ID) error
// like remove, but doesn't remove the primary sector copy, nor the last
@ -48,4 +51,6 @@ type Store interface {
FsStat(ctx context.Context, id ID) (fsutil.FsStat, error)
Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error)
GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, challenge []uint64) ([]byte, error)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
@ -221,6 +222,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
CanStore: meta.CanStore,
Groups: meta.Groups,
AllowTo: meta.AllowTo,
Path: p,
}, fst)
if err != nil {
return xerrors.Errorf("declaring storage in index: %w", err)
@ -718,4 +720,11 @@ func (st *Local) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
return p.stat(st.localStorage)
}
func (st *Local) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, challenge []uint64) ([]byte, error) {
return nil, nil
}
func (st *Local) AcquireSectorPaths(ctx context.Context, sid storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType) (storiface.SectorPaths, storiface.SectorPaths, error) {
return st.AcquireSector(ctx, sid, existing, allocate, pathType, storiface.AcquireCopy)
}
var _ Store = &Local{}

View File

@ -197,3 +197,18 @@ func (mr *MockSectorIndexMockRecorder) StorageTryLock(arg0, arg1, arg2, arg3 int
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageTryLock", reflect.TypeOf((*MockSectorIndex)(nil).StorageTryLock), arg0, arg1, arg2, arg3)
}
// StorageGetUrl mocks base method.
func (m *MockSectorIndex) StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageGetUrl", ctx, s, ft)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StorageGetUrl indicates an expected call of StorageGetUrl.
func (mr *MockSectorIndexMockRecorder) StorageGetUrl(ctx, s, ft interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageGetUrl", reflect.TypeOf((*MockSectorIndex)(nil).StorageGetUrl), ctx, s, ft)
}

View File

@ -8,7 +8,9 @@ import (
context "context"
reflect "reflect"
ffi "github.com/filecoin-project/filecoin-ffi"
abi "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
fsutil "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
stores "github.com/filecoin-project/lotus/extern/sector-storage/stores"
storiface "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -126,3 +128,64 @@ func (mr *MockStoreMockRecorder) Reserve(arg0, arg1, arg2, arg3, arg4 interface{
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reserve", reflect.TypeOf((*MockStore)(nil).Reserve), arg0, arg1, arg2, arg3, arg4)
}
// GenerateWindowPoStVanilla mocks base method.
func (m *MockStore) GenerateWindowPoStVanilla(ctx context.Context, minerID abi.ActorID, privsector *ffi.PrivateSectorInfo, vanillaParams string, randomness abi.PoStRandomness) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GenerateWindowPoStVanilla", ctx, minerID, privsector, vanillaParams, randomness)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GenerateWindowPoStVanilla indicates an expected call of GenerateWindowPoStVanilla.
func (mr *MockStoreMockRecorder) GenerateWindowPoStVanilla(ctx, minerID, privsector, vanillaParams, randomness interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateWindowPoStVanilla", reflect.TypeOf((*MockStore)(nil).GenerateWindowPoStVanilla), ctx, minerID, privsector, vanillaParams, randomness)
}
// GenerateWinningPoStVanilla mocks base method.
func (m *MockStore) GenerateWinningPoStVanilla(ctx context.Context, minerID abi.ActorID, privsector *ffi.PrivateSectorInfo, randomness abi.PoStRandomness) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GenerateWinningPoStVanilla", ctx, minerID, privsector, randomness)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GenerateWinningPoStVanilla indicates an expected call of GenerateWinningPoStVanilla.
func (mr *MockStoreMockRecorder) GenerateWinningPoStVanilla(ctx, minerID, privsector, randomness interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateWinningPoStVanilla", reflect.TypeOf((*MockStore)(nil).GenerateWinningPoStVanilla), ctx, minerID, privsector, randomness)
}
// AcquireSectorPaths mocks base method.
func (m *MockStore) AcquireSectorPaths(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, storiface.SectorPaths, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AcquireSectorPaths", ctx, s, existing, allocate, sealing)
ret0, _ := ret[0].(storiface.SectorPaths)
ret1, _ := ret[1].(storiface.SectorPaths)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// AcquireSectorPaths indicates an expected call of AcquireSectorPaths.
func (mr *MockStoreMockRecorder) AcquireSectorPaths(ctx, s, existing, allocate, sealing interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireSectorPaths", reflect.TypeOf((*MockStore)(nil).AcquireSectorPaths), ctx, s, existing, allocate, sealing)
}
// GenerateSingleVanillaProof mocks base method.
func (m *MockStore) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, challange []uint64) ([]byte, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GenerateSingleVanillaProof", ctx, minerID, privsector, challange)
ret0, _ := ret[0].([]byte)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GenerateSingleVanillaProof indicates an expected call of GenerateSingleVanillaProof.
func (mr *MockStoreMockRecorder) GenerateSingleVanillaProof(ctx, minerID, privsector, challange interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateSingleVanillaProof", reflect.TypeOf((*MockStore)(nil).GenerateSingleVanillaProof), ctx, minerID, privsector, challange)
}

View File

@ -14,8 +14,10 @@ import (
gopath "path"
"path/filepath"
"sort"
"strings"
"sync"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/sector-storage/tarutil"
@ -738,6 +740,140 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
}, nil
}
func (r *Remote) AcquireSectorPaths(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}
for {
r.fetchLk.Lock()
c, locked := r.fetching[s.ID]
if !locked {
r.fetching[s.ID] = make(chan struct{})
r.fetchLk.Unlock()
break
}
r.fetchLk.Unlock()
select {
case <-c:
continue
case <-ctx.Done():
return storiface.SectorPaths{}, storiface.SectorPaths{}, ctx.Err()
}
}
defer func() {
r.fetchLk.Lock()
close(r.fetching[s.ID])
delete(r.fetching, s.ID)
r.fetchLk.Unlock()
}()
var paths storiface.SectorPaths
var stores storiface.SectorPaths
ssize, err := s.ProofType.SectorSize()
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, err
}
for _, fileType := range storiface.PathTypes {
if fileType&existing == 0 {
continue
}
sis, err := r.index.StorageFindSector(ctx, s.ID, fileType, ssize, false)
if err != nil {
log.Warnf("finding existing sector %d failed: %+v", s.ID, err)
continue
}
for _, si := range sis {
if (pathType == storiface.PathSealing) && !si.CanSeal {
continue
}
if (pathType == storiface.PathStorage) && !si.CanStore {
continue
}
spath := filepath.Join(si.Path, fileType.String(), storiface.SectorName(s.ID))
storiface.SetPathByType(&paths, fileType, spath)
storiface.SetPathByType(&stores, fileType, string(si.ID))
if si.CanStore {
existing ^= fileType
break
}
}
}
return paths, stores, nil
}
func (r *Remote) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, challenge []uint64) ([]byte, error) {
sector := abi.SectorID{
Miner: minerID,
Number: privsector.Psi.SectorNumber,
}
storageUrl, err := r.index.StorageGetUrl(ctx, sector, storiface.FTCache|storiface.FTSealed)
if err != nil {
return nil, xerrors.Errorf("finding path for sector storage: %w", err)
}
surl, err := url.Parse(storageUrl)
if err != nil {
return nil, xerrors.Errorf("parse sector storage url failed : %w", err)
}
surl.Path = gopath.Join(surl.Path, "vanilla", "single")
requestParams := SingleVanillaParams{
PrivSector: privsector.Psi,
Challenge: challenge,
}
bytes, err := json.Marshal(requestParams)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", surl.String(), strings.NewReader(string(bytes)))
if err != nil {
return nil, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, xerrors.Errorf("do request: %w", err)
}
defer func() {
err := resp.Body.Close()
if err != nil {
log.Error("response close: ", err)
}
}()
if resp.StatusCode != 200 {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, xerrors.Errorf("resp.Body ReadAll: %w", err)
}
return nil, xerrors.Errorf("non-200 code: %w", string(body))
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, xerrors.Errorf("resp.Body ReadAll: %w", err)
}
return body, nil
}
var _ Store = &Remote{}
type funcCloser func() error

View File

@ -42,6 +42,7 @@ type Resources struct {
*/
var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100
var GPUUtilizationProof float64 = 1.0
// TODO: Take NUMA into account
func (r Resources) Threads(wcpus uint64, gpus int) uint64 {

View File

@ -31,6 +31,18 @@ func (t *testExec) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID,
panic("implement me")
}
func (t *testExec) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (*proof.PoStProof, error) {
panic("implement me")
}
func (t *testExec) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof.PoStProof, error) {
panic("implement me")
}
func (t *testExec) GetSectorVanillaParams(ctx context.Context, index int, partitionVanillaParams string) (string, error) {
panic("implement me")
}
func (t *testExec) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
panic("implement me")
}

View File

@ -20,6 +20,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
@ -155,6 +156,10 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
}, nil
}
func (l *localWorkerPathProvider) AcquireSectorPaths(ctx context.Context, sector storage.SectorRef, existing storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
return storiface.SectorPaths{}, nil, nil
}
func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l})
}
@ -508,6 +513,90 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef,
})
}
func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) ([]proof.PoStProof, error) {
sb, err := l.executor()
if err != nil {
return nil, err
}
ps := privsectors.Spsi.Values()
vanillas := make([][]byte, len(ps))
var rerr error
var wg sync.WaitGroup
for i, v := range ps {
wg.Add(1)
go func(index int, sector ffi.PrivateSectorInfo) {
defer wg.Done()
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, &ffiwrapper.PrivateSectorInfo{Psi: sector}, sectorChallenges.Fc.Challenges[sector.SectorNumber])
if err != nil {
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d,vanila failed: %w", sector.SectorNumber, err))
return
}
if vanilla == nil {
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d,vanila is nil", sector.SectorNumber))
}
vanillas[index] = vanilla
}(i, v)
}
wg.Wait()
if rerr != nil {
return nil, rerr
}
return sb.GenerateWinningPoStWithVanilla(ctx, ps[0].PoStProofType, mid, randomness, vanillas)
}
func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, sectorChallenges *ffiwrapper.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
sb, err := l.executor()
if err != nil {
return ffiwrapper.WindowPoStResult{}, err
}
var slk sync.Mutex
var skipped []abi.SectorID
ps := privsectors.Spsi.Values()
out := make([][]byte, len(ps))
var wg sync.WaitGroup
for i := range ps {
wg.Add(1)
go func(index int) {
defer wg.Done()
sector := ps[index]
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, &ffiwrapper.PrivateSectorInfo{Psi: sector}, sectorChallenges.Fc.Challenges[sector.SectorNumber])
if err != nil || vanilla == nil {
slk.Lock()
skipped = append(skipped, abi.SectorID{
Miner: mid,
Number: sector.SectorNumber,
})
slk.Unlock()
log.Errorf("get sector: %d, vanilla: %s, offset: %d", sector.SectorNumber, vanilla, offset+index)
return
}
out[index] = vanilla
}(i)
}
wg.Wait()
if len(skipped) > 0 {
return ffiwrapper.WindowPoStResult{PoStProofs: proof.PoStProof{}, Skipped: skipped}, nil
}
PoSts, err := sb.GenerateWindowPoStWithVanilla(ctx, ps[0].PoStProofType, mid, randomness, out, partitionIdx)
if err != nil {
return ffiwrapper.WindowPoStResult{PoStProofs: *PoSts, Skipped: skipped}, err
}
return ffiwrapper.WindowPoStResult{PoStProofs: *PoSts, Skipped: skipped}, nil
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
l.taskLk.Lock()
defer l.taskLk.Unlock()

View File

@ -3,11 +3,14 @@ package storage
import (
"bytes"
"context"
"sync"
"time"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/specs-storage/storage"
"github.com/hashicorp/go-multierror"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
@ -217,7 +220,14 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
})
}
bad, err := s.faultTracker.CheckProvable(ctx, s.proofType, tocheck, nil)
bad, err := s.faultTracker.CheckProvable(ctx, s.proofType, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, error) {
for _, sector := range sectorInfos {
if sector.SectorNumber == id.Number {
return sector.SealedCID, nil
}
}
return cid.Undef, xerrors.Errorf("cann't get commr for sector %d", id.Number)
})
if err != nil {
return bitfield.BitField{}, xerrors.Errorf("checking provable sectors: %w", err)
}
@ -547,8 +557,13 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
return nil, err
}
var berr error
batchCtx, batchAbort := context.WithCancel(ctx)
defer batchAbort()
// Generate proofs in batches
posts := make([]miner.SubmitWindowedPoStParams, 0, len(partitionBatches))
var batchWg sync.WaitGroup
posts := make([]miner.SubmitWindowedPoStParams, len(partitionBatches))
for batchIdx, batch := range partitionBatches {
batchPartitionStartIdx := 0
for _, batch := range partitionBatches[:batchIdx] {
@ -561,162 +576,218 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
Proofs: nil,
}
postSkipped := bitfield.New()
somethingToProve := false
batchWg.Add(1)
go func(ctx context.Context, batchIdx int, batch []api.Partition, batchPartitionStartIdx int, params miner.SubmitWindowedPoStParams) {
defer batchWg.Done()
// Retry until we run out of sectors to prove.
for retries := 0; ; retries++ {
skipCount := uint64(0)
var partitions []miner.PoStPartition
var sinfos []proof2.SectorInfo
for partIdx, partition := range batch {
// TODO: Can do this in parallel
toProve, err := bitfield.SubtractBitField(partition.LiveSectors, partition.FaultySectors)
postSkipped := bitfield.New()
somethingToProve := false
// Retry until we run out of sectors to prove.
for retries := 0; ; retries++ {
skipCount := uint64(0)
var partitions []miner.PoStPartition
var sinfos []proof2.SectorInfo
var partitionWg sync.WaitGroup
var partitionLk sync.Mutex
for partIdx, partition := range batch {
// Get sectors info in parallel
partitionWg.Add(1)
go func(partIdx int, partition api.Partition) {
defer partitionWg.Done()
cbFailed := func(err error) {
batchAbort()
log.Warn("compute post batch:", batchIdx, "parttion:", partIdx, " failed:", err)
berr = multierror.Append(berr, err)
return
}
toProve, err := bitfield.SubtractBitField(partition.LiveSectors, partition.FaultySectors)
if err != nil {
cbFailed(xerrors.Errorf("removing faults from set of sectors to prove: %w", err))
return
}
toProve, err = bitfield.MergeBitFields(toProve, partition.RecoveringSectors)
if err != nil {
cbFailed(xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err))
return
}
good, err := s.checkSectors(ctx, toProve, ts.Key())
if err != nil {
cbFailed(xerrors.Errorf("checking sectors to skip: %w", err))
return
}
good, err = bitfield.SubtractBitField(good, postSkipped)
if err != nil {
cbFailed(xerrors.Errorf("toProve - postSkipped: %w", err))
return
}
skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
cbFailed(xerrors.Errorf("toProve - good: %w", err))
return
}
sc, err := skipped.Count()
if err != nil {
cbFailed(xerrors.Errorf("getting skipped sector count: %w", err))
return
}
partitionLk.Lock()
skipCount += sc
partitionLk.Unlock()
log.Infow("skipped sectors", "batch", batchIdx, "partition", partIdx, "sectors count", sc)
ssi, err := s.sectorsForProof(ctx, good, partition.AllSectors, ts)
if err != nil {
cbFailed(xerrors.Errorf("batch: %d getting sorted sector info: %w", batchIdx, err))
return
}
if len(ssi) == 0 {
log.Warnf("getting sectors for proof batch: %d, sector info len: %d", batchIdx, len(ssi))
return
}
partitionLk.Lock()
sinfos = append(sinfos, ssi...)
partitions = append(partitions, miner.PoStPartition{
Index: uint64(batchPartitionStartIdx + partIdx),
Skipped: skipped,
})
partitionLk.Unlock()
}(partIdx, partition)
}
partitionWg.Wait()
//return when any partition fault
if berr != nil {
return
}
if len(sinfos) == 0 {
// nothing to prove for this batch
break
}
// Generate proof
log.Infow("running window post",
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount,
"batch", batchIdx)
tsStart := build.Clock.Now()
mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, xerrors.Errorf("removing faults from set of sectors to prove: %w", err)
}
toProve, err = bitfield.MergeBitFields(toProve, partition.RecoveringSectors)
if err != nil {
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
berr = multierror.Append(berr, xerrors.Errorf("batch: %d get actor address: %w", batchIdx, err))
batchAbort()
return
}
good, err := s.checkSectors(ctx, toProve, ts.Key())
if err != nil {
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
postOut, ps, err := s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, append(abi.PoStRandomness{}, rand...))
elapsed := time.Since(tsStart)
log.Infow("computing window post", "batch", batchIdx, "elapsed", elapsed)
if err == nil {
// If we proved nothing, something is very wrong.
if len(postOut) == 0 {
berr = multierror.Append(berr, xerrors.Errorf("received no proofs back from generate window post"))
batchAbort()
return
}
headTs, err := s.api.ChainHead(ctx)
if err != nil {
berr = multierror.Append(berr, xerrors.Errorf("getting current head: %w", err))
batchAbort()
return
}
checkRand, err := s.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key())
if err != nil {
berr = multierror.Append(berr, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err))
batchAbort()
return
}
if !bytes.Equal(checkRand, rand) {
log.Warnw("windowpost randomness changed", "old", rand, "new", checkRand, "ts-height", ts.Height(), "challenge-height", di.Challenge, "tsk", ts.Key())
rand = checkRand
continue
}
// If we generated an incorrect proof, try again.
if correct, err := s.verifier.VerifyWindowPoSt(ctx, proof.WindowPoStVerifyInfo{
Randomness: abi.PoStRandomness(checkRand),
Proofs: postOut,
ChallengedSectors: sinfos,
Prover: abi.ActorID(mid),
}); err != nil {
log.Errorw("window post verification failed", "post", postOut, "error", err)
time.Sleep(5 * time.Second)
continue
} else if !correct {
log.Errorw("generated incorrect window post proof", "post", postOut, "error", err)
continue
}
// Proof generation successful, stop retrying
somethingToProve = true
params.Partitions = partitions
params.Proofs = postOut
break
}
good, err = bitfield.SubtractBitField(good, postSkipped)
if err != nil {
return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
// Proof generation failed, so retry
if len(ps) == 0 {
// If we didn't skip any new sectors, we failed
// for some other reason and we need to abort.
berr = multierror.Append(berr, xerrors.Errorf("running window post failed: %w", err))
batchAbort()
return
}
// TODO: maybe mark these as faulty somewhere?
log.Warnw("generate window post skipped sectors", "sectors", ps, "error", err, "try", retries)
// Explicitly make sure we haven't aborted this PoSt
// (GenerateWindowPoSt may or may not check this).
// Otherwise, we could try to continue proving a
// deadline after the deadline has ended.
if ctx.Err() != nil {
log.Warnw("aborting PoSt due to context cancellation", "error", ctx.Err(), "deadline", di.Index)
berr = multierror.Append(berr, ctx.Err())
return
}
skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
return nil, xerrors.Errorf("toProve - good: %w", err)
skipCount += uint64(len(ps))
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
}
sc, err := skipped.Count()
if err != nil {
return nil, xerrors.Errorf("getting skipped sector count: %w", err)
}
skipCount += sc
ssi, err := s.sectorsForProof(ctx, good, partition.AllSectors, ts)
if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}
if len(ssi) == 0 {
continue
}
sinfos = append(sinfos, ssi...)
partitions = append(partitions, miner.PoStPartition{
Index: uint64(batchPartitionStartIdx + partIdx),
Skipped: skipped,
})
}
if len(sinfos) == 0 {
// nothing to prove for this batch
break
// Nothing to prove for this batch, try the next batch
if !somethingToProve {
log.Warnf("nothing to prove for batch: %d", batchIdx)
return
}
posts[batchIdx] = params
// Generate proof
log.Infow("running window post",
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount)
}(batchCtx, batchIdx, batch, batchPartitionStartIdx, params)
}
batchWg.Wait()
tsStart := build.Clock.Now()
mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, err
}
postOut, ps, err := s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, append(abi.PoStRandomness{}, rand...))
elapsed := time.Since(tsStart)
log.Infow("computing window post", "batch", batchIdx, "elapsed", elapsed)
if err == nil {
// If we proved nothing, something is very wrong.
if len(postOut) == 0 {
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
headTs, err := s.api.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("getting current head: %w", err)
}
checkRand, err := s.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key())
if err != nil {
return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err)
}
if !bytes.Equal(checkRand, rand) {
log.Warnw("windowpost randomness changed", "old", rand, "new", checkRand, "ts-height", ts.Height(), "challenge-height", di.Challenge, "tsk", ts.Key())
rand = checkRand
continue
}
// If we generated an incorrect proof, try again.
if correct, err := s.verifier.VerifyWindowPoSt(ctx, proof.WindowPoStVerifyInfo{
Randomness: abi.PoStRandomness(checkRand),
Proofs: postOut,
ChallengedSectors: sinfos,
Prover: abi.ActorID(mid),
}); err != nil {
log.Errorw("window post verification failed", "post", postOut, "error", err)
time.Sleep(5 * time.Second)
continue
} else if !correct {
log.Errorw("generated incorrect window post proof", "post", postOut, "error", err)
continue
}
// Proof generation successful, stop retrying
somethingToProve = true
params.Partitions = partitions
params.Proofs = postOut
break
}
// Proof generation failed, so retry
if len(ps) == 0 {
// If we didn't skip any new sectors, we failed
// for some other reason and we need to abort.
return nil, xerrors.Errorf("running window post failed: %w", err)
}
// TODO: maybe mark these as faulty somewhere?
log.Warnw("generate window post skipped sectors", "sectors", ps, "error", err, "try", retries)
// Explicitly make sure we haven't aborted this PoSt
// (GenerateWindowPoSt may or may not check this).
// Otherwise, we could try to continue proving a
// deadline after the deadline has ended.
if ctx.Err() != nil {
log.Warnw("aborting PoSt due to context cancellation", "error", ctx.Err(), "deadline", di.Index)
return nil, ctx.Err()
}
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
}
}
// Nothing to prove for this batch, try the next batch
if !somethingToProve {
continue
}
posts = append(posts, params)
if berr != nil {
return nil, berr
}
return posts, nil