decoupling winningpost and windowpost from lotus-miner

This commit is contained in:
mz-sirius 2021-07-27 11:15:53 +08:00
parent 1bc987772c
commit 3fd55fa56b
30 changed files with 1601 additions and 199 deletions

46
RunPoStWorkerManual.md Normal file
View File

@ -0,0 +1,46 @@
### Environment variables
Ensure that workers have access to the following environment variables when they run.
```
export TMPDIR=/fast/disk/folder3 # used when sealing
export MINER_API_INFO:<TOKEN>:/ip4/<miner_api_address>/tcp/<port>/http`
export BELLMAN_CPU_UTILIZATION=0.875 # optimal value depends on exact hardware
export FIL_PROOFS_MAXIMIZE_CACHING=1
export FIL_PROOFS_USE_GPU_COLUMN_BUILDER=1 # when GPU is available
export FIL_PROOFS_USE_GPU_TREE_BUILDER=1 # when GPU is available
export FIL_PROOFS_PARAMETER_CACHE=/fast/disk/folder # > 100GiB!
export FIL_PROOFS_PARENT_CACHE=/fast/disk/folder2 # > 50GiB!
# The following increases speed of PreCommit1 at the cost of using a full
# CPU core-complex rather than a single core.
# See https://github.com/filecoin-project/rust-fil-proofs/ and the
# "Worker co-location" section below.
export FIL_PROOFS_USE_MULTICORE_SDR=1
```
### Run the worker
```
lotus-worker run <flags>
```
These are old flags:
```
--addpiece enable addpiece (default: true)
--precommit1 enable precommit1 (32G sectors: 1 core, 128GiB RAM) (default: true)
--unseal enable unsealing (32G sectors: 1 core, 128GiB RAM) (default: true)
--precommit2 enable precommit2 (32G sectors: multiple cores, 96GiB RAM) (default: true)
--commit enable commit (32G sectors: multiple cores or GPUs, 128GiB RAM + 64GiB swap) (default: true)
```
We added two new flags:
```
--windowpost enable windowpost (default: false)
--winnningpost enable winningpost (default: false)
```
These post flags have priority over old flags. If you want this worker to be a window post machine, you can just set the windowpost flag to be `true`. Similar to winning post machine. If you set both of them to be `true`, it will be a window post machine.

View File

@ -154,6 +154,8 @@ type StorageMiner interface {
StorageLocal(ctx context.Context) (map[stores.ID]string, error) //perm:admin
StorageStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) //perm:admin
StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error) //perm:admin
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error //perm:write
MarketListDeals(ctx context.Context) ([]MarketDeal, error) //perm:read
MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) //perm:read

View File

@ -6,10 +6,13 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-cid"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
)
@ -33,20 +36,22 @@ type Worker interface {
Info(context.Context) (storiface.WorkerInfo, error) //perm:admin
// storiface.WorkerCalls
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffi.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffi.FallbackChallenges) ([]proof.PoStProof, error) //perm:admin
GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffi.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, postChallenges *ffi.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) //perm:admin
TaskDisable(ctx context.Context, tt sealtasks.TaskType) error //perm:admin
TaskEnable(ctx context.Context, tt sealtasks.TaskType) error //perm:admin

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"time"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
datatransfer "github.com/filecoin-project/go-data-transfer"
@ -22,6 +23,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -30,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
@ -804,6 +807,8 @@ type StorageMinerStruct struct {
StorageGetLocks func(p0 context.Context) (storiface.SectorLocks, error) `perm:"admin"`
StorageGetUrl func(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) `perm:"admin"`
StorageInfo func(p0 context.Context, p1 stores.ID) (stores.StorageInfo, error) `perm:"admin"`
StorageList func(p0 context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
@ -865,6 +870,10 @@ type WorkerStruct struct {
GenerateSectorKeyFromData func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
GenerateWindowPoSt func(p0 context.Context, p1 abi.ActorID, p2 ffi.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffi.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) `perm:"admin"`
GenerateWinningPoSt func(p0 context.Context, p1 abi.ActorID, p2 ffi.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffi.FallbackChallenges) ([]proof.PoStProof, error) `perm:"admin"`
Info func(p0 context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
MoveStorage func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType) (storiface.CallID, error) `perm:"admin"`
@ -4720,6 +4729,17 @@ func (s *StorageMinerStub) StorageGetLocks(p0 context.Context) (storiface.Sector
return *new(storiface.SectorLocks), ErrNotSupported
}
func (s *StorageMinerStruct) StorageGetUrl(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) {
if s.Internal.StorageGetUrl == nil {
return "", ErrNotSupported
}
return s.Internal.StorageGetUrl(p0, p1, p2)
}
func (s *StorageMinerStub) StorageGetUrl(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType) (string, error) {
return "", ErrNotSupported
}
func (s *StorageMinerStruct) StorageInfo(p0 context.Context, p1 stores.ID) (stores.StorageInfo, error) {
if s.Internal.StorageInfo == nil {
return *new(stores.StorageInfo), ErrNotSupported
@ -4962,6 +4982,28 @@ func (s *WorkerStub) GenerateSectorKeyFromData(p0 context.Context, p1 storage.Se
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) GenerateWindowPoSt(p0 context.Context, p1 abi.ActorID, p2 ffi.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffi.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
if s.Internal.GenerateWindowPoSt == nil {
return *new(ffiwrapper.WindowPoStResult), ErrNotSupported
}
return s.Internal.GenerateWindowPoSt(p0, p1, p2, p3, p4, p5, p6)
}
func (s *WorkerStub) GenerateWindowPoSt(p0 context.Context, p1 abi.ActorID, p2 ffi.SortedPrivateSectorInfo, p3 int, p4 int, p5 abi.PoStRandomness, p6 *ffi.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
return *new(ffiwrapper.WindowPoStResult), ErrNotSupported
}
func (s *WorkerStruct) GenerateWinningPoSt(p0 context.Context, p1 abi.ActorID, p2 ffi.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffi.FallbackChallenges) ([]proof.PoStProof, error) {
if s.Internal.GenerateWinningPoSt == nil {
return *new([]proof.PoStProof), ErrNotSupported
}
return s.Internal.GenerateWinningPoSt(p0, p1, p2, p3, p4)
}
func (s *WorkerStub) GenerateWinningPoSt(p0 context.Context, p1 abi.ActorID, p2 ffi.SortedPrivateSectorInfo, p3 abi.PoStRandomness, p4 *ffi.FallbackChallenges) ([]proof.PoStProof, error) {
return *new([]proof.PoStProof), ErrNotSupported
}
func (s *WorkerStruct) Info(p0 context.Context) (storiface.WorkerInfo, error) {
if s.Internal.Info == nil {
return *new(storiface.WorkerInfo), ErrNotSupported

View File

@ -173,6 +173,17 @@ var runCmd = &cli.Command{
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
},
&cli.BoolFlag{
Name: "windowpost",
Usage: "enable window post",
Value: false,
},
&cli.BoolFlag{
Name: "winningpost",
Usage: "enable winning post",
Value: false,
},
},
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
@ -269,6 +280,14 @@ var runCmd = &cli.Command{
taskTypes = append(taskTypes, sealtasks.TTCommit2)
}
if cctx.Bool("windowpost") {
taskTypes = append(taskTypes, sealtasks.TTGenerateWindowPoSt)
}
if cctx.Bool("winningpost") {
taskTypes = append(taskTypes, sealtasks.TTGenerateWinningPoSt)
}
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
}

View File

@ -4,7 +4,6 @@ import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"
"golang.org/x/xerrors"
@ -26,11 +25,6 @@ type FaultTracker interface {
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
var bad = make(map[abi.SectorID]string)
ssize, err := pp.SectorSize()
if err != nil {
return nil, err
}
// TODO: More better checks
for _, sector := range sectors {
err := func() error {
@ -48,7 +42,7 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
lp, _, err := m.localStore.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
lp, _, err := m.storage.AcquireSectorPaths(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad[sector.ID] = fmt.Sprintf("acquire sector failed: %s", err)
@ -61,30 +55,6 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
toCheck := map[string]int64{
lp.Sealed: 1,
filepath.Join(lp.Cache, "p_aux"): 0,
}
addCachePathsForSectorSize(toCheck, lp.Cache, ssize)
for p, sz := range toCheck {
st, err := os.Stat(p)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: sector file stat error", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "err", err)
bad[sector.ID] = fmt.Sprintf("%s", err)
return nil
}
if sz != 0 {
if st.Size() != int64(ssize)*sz {
log.Warnw("CheckProvable Sector FAULT: sector file is wrong size", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "size", st.Size(), "expectSize", int64(ssize)*sz)
bad[sector.ID] = fmt.Sprintf("%s is wrong size (got %d, expect %d)", p, st.Size(), int64(ssize)*sz)
return nil
}
}
}
if rg != nil {
wpp, err := sector.ProofType.RegisteredWindowPoStProof()
if err != nil {
@ -111,7 +81,7 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
_, err = ffi.GenerateSingleVanillaProof(ffi.PrivateSectorInfo{
_, err = m.storage.GenerateSingleVanillaProof(ctx, sector.ID.Miner, &ffi.PrivateSectorInfo{
SectorInfo: proof.SectorInfo{
SealProof: sector.ProofType,
SectorNumber: sector.ID.Number,

View File

@ -84,3 +84,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, exis
return out, done, nil
}
func (b *Provider) AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return b.AcquireSector(ctx, id, existing, 0, ptype)
}

View File

@ -6,12 +6,13 @@ import (
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -33,6 +34,8 @@ type Storage interface {
UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
WorkerProver
}
type Verifier interface {
@ -56,6 +59,28 @@ type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}
type MinerProver interface {
PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffi.SortedPrivateSectorInfo, []abi.SectorID, func(), error)
SplitSortedPrivateSectorInfo(ctx context.Context, privsector ffi.SortedPrivateSectorInfo, offset int, end int) (ffi.SortedPrivateSectorInfo, error)
GeneratePoStFallbackSectorChallenges(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, sectorIds []abi.SectorNumber) (*ffi.FallbackChallenges, error)
}
type WindowPoStResult struct {
PoStProofs ffi.PartitionProof
Skipped []abi.SectorID
}
type WorkerProver interface {
GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (*ffi.PartitionProof, error)
GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof.PoStProof, error)
}
type WorkerCalls interface {
GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffi.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, sectorChallenges *ffi.FallbackChallenges) (WindowPoStResult, error)
GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffi.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffi.FallbackChallenges) ([]proof.PoStProof, error)
}

View File

@ -53,6 +53,7 @@ func (sb *Sealer) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
Number: f,
})
}
log.Infof("========result:%v", proof)
return proof, faultyIDs, err
}
@ -141,7 +142,75 @@ func (proofVerifier) VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoS
return ffi.VerifyWindowPoSt(info)
}
func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proofType abi.RegisteredPoStProof, mid abi.ActorID, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
randomness[31] &= 0x3f
return ffi.GenerateWinningPoStSectorChallenge(proofType, minerID, randomness, eligibleSectorCount)
return ffi.GenerateWinningPoStSectorChallenge(proofType, mid, randomness, eligibleSectorCount)
}
func (sb *Sealer) PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffi.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
fmap := map[abi.SectorNumber]struct{}{}
for _, fault := range faults {
fmap[fault] = struct{}{}
}
var doneFuncs []func()
done := func() {
for _, df := range doneFuncs {
df()
}
}
var skipped []abi.SectorID
var out []ffi.PrivateSectorInfo
for _, s := range sectorInfo {
if _, faulty := fmap[s.SectorNumber]; faulty {
continue
}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: mid, Number: s.SectorNumber},
ProofType: s.SealProof,
}
paths, d, err := sb.sectors.AcquireSectorPaths(ctx, sid, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
if err != nil {
log.Warnw("failed to acquire sector, skipping", "sector", sid.ID, "error", err)
skipped = append(skipped, sid.ID)
continue
}
doneFuncs = append(doneFuncs, d)
postProofType, err := rpt(s.SealProof)
if err != nil {
done()
return ffi.SortedPrivateSectorInfo{}, nil, nil, xerrors.Errorf("acquiring registered PoSt proof from sector info %+v: %w", s, err)
}
out = append(out, ffi.PrivateSectorInfo{
CacheDirPath: paths.Cache,
PoStProofType: postProofType,
SealedSectorPath: paths.Sealed,
SectorInfo: s,
})
}
privsectors := ffi.NewSortedPrivateSectorInfo(out...)
return privsectors, skipped, done, nil
}
func (sb *Sealer) GeneratePoStFallbackSectorChallenges(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, sectorIds []abi.SectorNumber) (*ffi.FallbackChallenges, error) {
return ffi.GeneratePoStFallbackSectorChallenges(proofType, minerID, randomness, sectorIds)
}
func (sb *Sealer) SplitSortedPrivateSectorInfo(ctx context.Context, privsector ffi.SortedPrivateSectorInfo, offset int, end int) (ffi.SortedPrivateSectorInfo, error) {
return ffi.SplitSortedPrivateSectorInfo(ctx, privsector, offset, end)
}
func (sb *Sealer) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof5.PoStProof, error) {
return ffi.GenerateWinningPoStWithVanilla(proofType, minerID, randomness, vanillas)
}
func (sb *Sealer) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (*ffi.PartitionProof, error) {
return ffi.GenerateSinglePartitionWindowPoStWithVanilla(proofType, minerID, randomness, proofs, uint(partitionIdx))
}

View File

@ -42,6 +42,8 @@ type Worker interface {
Session(context.Context) (uuid.UUID, error)
Close() error // TODO: do we need this?
ffiwrapper.WorkerCalls
}
type SectorManager interface {
@ -49,6 +51,8 @@ type SectorManager interface {
storage.Prover
storiface.WorkerReturn
FaultTracker
ffiwrapper.MinerProver
}
var ClosedWorkerID = uuid.UUID{}
@ -73,6 +77,8 @@ type Manager struct {
results map[WorkID]result
waitRes map[WorkID]chan struct{}
ffiwrapper.MinerProver
}
type result struct {
@ -116,7 +122,7 @@ type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore
func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si, remote: stor})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
@ -137,6 +143,8 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
callRes: map[storiface.CallID]chan result{},
results: map[WorkID]result{},
waitRes: map[WorkID]chan struct{}{},
MinerProver: prover,
}
m.setupWorkTracker()

199
extern/sector-storage/manager_post.go vendored Normal file
View File

@ -0,0 +1,199 @@
package sectorstorage
import (
"context"
"sync"
"time"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
builtin6 "github.com/filecoin-project/specs-actors/v6/actors/builtin"
"github.com/hashicorp/go-multierror"
xerrors "golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
)
func (m *Manager) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
if !m.sched.winningPoStSched.CanSched(ctx) {
log.Info("GenerateWinningPoSt run at lotus-miner")
return m.Prover.GenerateWinningPoSt(ctx, minerID, sectorInfo, randomness)
}
return m.generateWinningPoSt(ctx, minerID, sectorInfo, randomness)
}
func (m *Manager) generateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
randomness[31] &= 0x3f
ps, skipped, done, err := m.PubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWinningPoStProof)
if err != nil {
return nil, err
}
defer done()
if len(skipped) > 0 {
return nil, xerrors.Errorf("pubSectorToPriv skipped sectors: %+v", skipped)
}
var sectorNums []abi.SectorNumber
for _, s := range ps.Values() {
sectorNums = append(sectorNums, s.SectorNumber)
}
postChallenges, err := m.GeneratePoStFallbackSectorChallenges(ctx, ps.Values()[0].PoStProofType, minerID, randomness, sectorNums)
if err != nil {
return nil, xerrors.Errorf("generating fallback challenges: %v", err)
}
var proofs []proof.PoStProof
err = m.sched.winningPoStSched.Schedule(ctx, false, func(ctx context.Context, w Worker) error {
out, err := w.GenerateWinningPoSt(ctx, minerID, ps, randomness, postChallenges)
if err != nil {
return err
}
proofs = out
return nil
})
if err != nil {
return nil, err
}
return proofs, nil
}
func (m *Manager) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error) {
if !m.sched.windowPoStSched.CanSched(ctx) {
log.Info("GenerateWindowPoSt run at lotus-miner")
return m.Prover.GenerateWindowPoSt(ctx, minerID, sectorInfo, randomness)
}
return m.generateWindowPoSt(ctx, minerID, sectorInfo, randomness)
}
func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, []abi.SectorID, error) {
var retErr error = nil
randomness[31] &= 0x3f
out := make([]proof.PoStProof, 0)
if len(sectorInfo) == 0 {
return nil, nil, xerrors.New("generate window post len(sectorInfo)=0")
}
//get window proof type
proofType, err := abi.RegisteredSealProof.RegisteredWindowPoStProof(sectorInfo[0].SealProof)
if err != nil {
return nil, nil, err
}
// Get sorted and de-duplicate sectors info
ps, skipped, done, err := m.PubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWindowPoStProof)
if err != nil {
return nil, nil, xerrors.Errorf("PubSectorToPriv failed: %+v", err)
}
if len(skipped) > 0 {
return nil, skipped, xerrors.Errorf("skipped = %d", len(skipped))
}
defer done()
partitionSectorsCount, err := builtin6.PoStProofWindowPoStPartitionSectors(proofType)
if err != nil {
return nil, nil, xerrors.Errorf("get sectors count of partition failed:%+v", err)
}
// The partitions number of this batch
partitionCount := (len(ps.Values()) + int(partitionSectorsCount) - 1) / int(partitionSectorsCount)
log.Infof("generateWindowPoSt len(partitionSectorsCount):%d len(partitionCount):%d \n", partitionSectorsCount, partitionCount)
var faults []abi.SectorID
var flk sync.Mutex
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var sectorNums []abi.SectorNumber
for _, s := range ps.Values() {
sectorNums = append(sectorNums, s.SectorNumber)
}
postChallenges, err := m.GeneratePoStFallbackSectorChallenges(ctx, ps.Values()[0].PoStProofType, minerID, randomness, sectorNums)
if err != nil {
return nil, nil, xerrors.Errorf("generating fallback challenges: %v", err)
}
proofList := make([]ffi.PartitionProof, partitionCount)
var wg sync.WaitGroup
for i := 0; i < partitionCount; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
proof, faultsectors, err := m.generatePartitionWindowPost(cctx, minerID, n, int(partitionSectorsCount), partitionCount, ps, randomness, postChallenges)
if err != nil {
retErr = multierror.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", i, err))
if len(faultsectors) > 0 {
log.Errorf("generateWindowPost groupCount:%d, faults:%d, err: %+v", i, len(faultsectors), err)
flk.Lock()
faults = append(faults, faultsectors...)
flk.Unlock()
}
cancel()
return
}
proofList[n] = proof
}(i)
time.Sleep(1 * time.Second)
}
wg.Wait()
postProofs, err := ffi.MergeWindowPoStPartitionProofs(proofType, proofList)
if len(faults) > 0 {
log.Warnf("GenerateWindowPoSt get faults: %d", len(faults))
return out, faults, retErr
}
out = append(out, *postProofs)
return out, skipped, retErr
}
func (m *Manager) generatePartitionWindowPost(ctx context.Context, minerID abi.ActorID, index int, psc int, groupCount int, ps ffi.SortedPrivateSectorInfo, randomness abi.PoStRandomness, postChallenges *ffi.FallbackChallenges) (ffi.PartitionProof, []abi.SectorID, error) {
var faults []abi.SectorID
start := index * psc
end := (index + 1) * psc
if index == groupCount-1 {
end = len(ps.Values())
}
log.Infow("generateWindowPost", "start", start, "end", end, "index", index)
privsectors, err := m.SplitSortedPrivateSectorInfo(ctx, ps, start, end)
if err != nil {
return ffi.PartitionProof{}, faults, xerrors.Errorf("generateWindowPost GetScopeSortedPrivateSectorInfo failed: %w", err)
}
var result *ffiwrapper.WindowPoStResult
err = m.sched.windowPoStSched.Schedule(ctx, true, func(ctx context.Context, w Worker) error {
out, err := w.GenerateWindowPoSt(ctx, minerID, privsectors, index, start, randomness, postChallenges)
if err != nil {
return err
}
result = &out
return nil
})
if err != nil {
return ffi.PartitionProof{}, faults, err
}
if len(result.Skipped) > 0 {
log.Warnf("generateWindowPost partition:%d, get faults:%d", index, len(result.Skipped))
return ffi.PartitionProof{}, result.Skipped, xerrors.Errorf("generatePartitionWindowPoStProofs partition:%d get faults:%d", index, len(result.Skipped))
}
return result.PoStProofs, faults, err
}

View File

@ -10,6 +10,7 @@ import (
"math/rand"
"sync"
ffi "github.com/filecoin-project/filecoin-ffi"
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
@ -553,6 +554,22 @@ func (mgr *SectorMgr) ReturnGenerateSectorKeyFromData(ctx context.Context, callI
panic("not supported")
}
func (mgr *SectorMgr) GetPartitionSectorsCount(ctx context.Context, prooftype abi.RegisteredPoStProof) (int, error) {
panic("not supported")
}
func (mgr *SectorMgr) GetPartitionVanillaParams(ctx context.Context, proofType abi.RegisteredPoStProof) (string, error) {
panic("not supported")
}
func (mgr *SectorMgr) PubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffi.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
panic("not supported")
}
func (mgr *SectorMgr) SplitSortedPrivateSectorInfo(ctx context.Context, privsector ffi.SortedPrivateSectorInfo, offset int, end int) (ffi.SortedPrivateSectorInfo, error) {
panic("not supported")
}
func (m mockVerifProver) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {

View File

@ -12,8 +12,9 @@ import (
)
type readonlyProvider struct {
index stores.SectorIndex
stor *stores.Local
index stores.SectorIndex
stor *stores.Local
remote *stores.Remote
}
func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
@ -38,3 +39,23 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorR
return p, cancel, err
}
func (l *readonlyProvider) AcquireSectorPaths(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
ctx, cancel := context.WithCancel(ctx)
// use TryLock to avoid blocking
locked, err := l.index.StorageTryLock(ctx, id.ID, existing, storiface.FTNone)
if err != nil {
cancel()
return storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
}
if !locked {
cancel()
return storiface.SectorPaths{}, nil, xerrors.Errorf("failed to acquire sector lock")
}
p, _, err := l.remote.AcquireSectorPaths(ctx, id, existing, storiface.FTNone, sealing)
return p, cancel, err
}

View File

@ -68,6 +68,11 @@ type scheduler struct {
info chan func(interface{})
// window scheduler
windowPoStSched *poStScheduler
// winning scheduler
winningPoStSched *poStScheduler
closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing
@ -83,6 +88,8 @@ type workerHandle struct {
lk sync.Mutex // can be taken inside sched.workersLk.RLock
acceptTasks map[sealtasks.TaskType]struct{}
wndLk sync.Mutex // can be taken inside sched.workersLk.RLock
activeWindows []*schedWindow
@ -162,6 +169,9 @@ func newScheduler() *scheduler {
info: make(chan func(interface{})),
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),
closing: make(chan struct{}),
closed: make(chan struct{}),
}
@ -547,6 +557,9 @@ func (sh *scheduler) schedClose() {
for i, w := range sh.workers {
sh.workerCleanup(i, w)
}
sh.windowPoStSched.schedClose()
sh.winningPoStSched.schedClose()
}
func (sh *scheduler) Info(ctx context.Context) (interface{}, error) {

254
extern/sector-storage/sched_post.go vendored Normal file
View File

@ -0,0 +1,254 @@
package sectorstorage
import (
"context"
"sort"
"sync"
"time"
sealtasks "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
xerrors "golang.org/x/xerrors"
)
type poStScheduler struct {
lk sync.RWMutex
workers map[storiface.WorkerID]*workerHandle
cond *sync.Cond
postType sealtasks.TaskType
GPUUtilization float64
}
func newPoStScheduler(t sealtasks.TaskType) *poStScheduler {
ps := &poStScheduler{
workers: map[storiface.WorkerID]*workerHandle{},
postType: t,
GPUUtilization: storiface.GPUUtilizationProof,
}
ps.cond = sync.NewCond(&ps.lk)
return ps
}
func (ps *poStScheduler) AddWorker(wid storiface.WorkerID, w *workerHandle) bool {
if _, ok := w.acceptTasks[ps.postType]; !ok {
return false
}
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid] = w
go ps.watch(wid, w)
ps.cond.Broadcast()
return true
}
func (ps *poStScheduler) delWorker(wid storiface.WorkerID) *workerHandle {
ps.lk.Lock()
defer ps.lk.Unlock()
var w *workerHandle = nil
if wh, ok := ps.workers[wid]; ok {
w = wh
delete(ps.workers, wid)
}
return w
}
func (ps *poStScheduler) CanSched(ctx context.Context) bool {
ps.lk.RLock()
defer ps.lk.RUnlock()
if len(ps.workers) == 0 {
return false
}
for _, w := range ps.workers {
if w.enabled {
return true
}
}
return false
}
func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, work WorkerAction) error {
ps.lk.Lock()
defer ps.lk.Unlock()
if len(ps.workers) == 0 {
return xerrors.Errorf("cant find %s post worker", ps.postType)
}
// Get workers by resource
canDo, accepts := ps.canHandleWorkers()
for !canDo {
//if primary is true, it must be dispatched to a worker
if primary {
ps.cond.Wait()
canDo, accepts = ps.canHandleWorkers()
} else {
return xerrors.Errorf("cant find %s post worker", ps.postType)
}
}
return ps.withResources(accepts[0], func(worker Worker) error {
ps.lk.Unlock()
defer ps.lk.Lock()
return work(ctx, worker)
})
}
func (ps *poStScheduler) canHandleWorkers() (bool, []storiface.WorkerID) {
var accepts []storiface.WorkerID
//if the gpus of the worker are insufficient or its disable, it cannot be scheduled
for wid, wr := range ps.workers {
if wr.active.gpuUsed >= float64(len(wr.info.Resources.GPUs)) || !wr.enabled {
continue
}
accepts = append(accepts, wid)
}
freeGPU := func(i int) float64 {
w := ps.workers[accepts[i]]
return float64(len(w.info.Resources.GPUs)) - w.active.gpuUsed
}
sort.Slice(accepts[:], func(i, j int) bool {
return freeGPU(i) > freeGPU(j)
})
if len(accepts) == 0 {
return false, accepts
}
return true, accepts
}
func (ps *poStScheduler) withResources(wid storiface.WorkerID, cb func(wid Worker) error) error {
ps.addResource(wid)
worker := ps.workers[wid].workerRpc
err := cb(worker)
ps.freeResource(wid)
if ps.cond != nil {
ps.cond.Broadcast()
}
return err
}
func (ps *poStScheduler) freeResource(wid storiface.WorkerID) {
if _, ok := ps.workers[wid]; !ok {
log.Warnf("release PoSt Worker not found worker")
return
}
if ps.workers[wid].active.gpuUsed > 0 {
ps.workers[wid].active.gpuUsed -= ps.GPUUtilization
}
return
}
func (ps *poStScheduler) addResource(wid storiface.WorkerID) {
ps.workers[wid].active.gpuUsed += ps.GPUUtilization
}
func (ps *poStScheduler) disable(wid storiface.WorkerID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid].enabled = false
}
func (ps *poStScheduler) enable(wid storiface.WorkerID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid].enabled = true
}
func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *workerHandle) {
heartbeatTimer := time.NewTicker(stores.HeartbeatInterval)
defer heartbeatTimer.Stop()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
defer close(worker.closedMgr)
defer func() {
log.Warnw("Worker closing", "WorkerID", wid)
ps.delWorker(wid)
}()
for {
sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
curSes, err := worker.workerRpc.Session(sctx)
scancel()
if err != nil {
// Likely temporary error
log.Warnw("failed to check worker session", "error", err)
ps.disable(wid)
select {
case <-heartbeatTimer.C:
continue
case <-worker.closingMgr:
return
}
}
if storiface.WorkerID(curSes) != wid {
if curSes != ClosedWorkerID {
// worker restarted
log.Warnw("worker session changed (worker restarted?)", "initial", wid, "current", curSes)
}
return
}
ps.enable(wid)
}
}
func (ps *poStScheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) {
select {
case <-w.closingMgr:
default:
close(w.closingMgr)
}
ps.lk.Unlock()
select {
case <-w.closedMgr:
case <-time.After(time.Second):
log.Errorf("timeout closing worker manager goroutine %d", wid)
}
ps.lk.Lock()
}
func (ps *poStScheduler) schedClose() {
ps.lk.Lock()
defer ps.lk.Unlock()
log.Debugf("closing scheduler")
for i, w := range ps.workers {
ps.workerCleanup(i, w)
}
}
func (ps *poStScheduler) WorkerStats(cb func(wid storiface.WorkerID, worker *workerHandle)) {
ps.lk.RLock()
defer ps.lk.RUnlock()
for id, w := range ps.workers {
cb(id, w)
}
}

View File

@ -16,6 +16,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
@ -134,6 +136,14 @@ func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id st
panic("implement me")
}
func (s *schedTestWorker) GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, partitionVanillaParams string, offset int, randomness abi.PoStRandomness) (ffiwrapper.WindowPoStResult, error) {
panic("implement me")
}
func (s *schedTestWorker) GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
panic("implement me")
}
func (s *schedTestWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
return s.taskTypes, nil
}

View File

@ -38,6 +38,10 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
if sessID == ClosedWorkerID {
return xerrors.Errorf("worker already closed")
}
tasks, err := w.TaskTypes(ctx)
if err != nil {
return xerrors.Errorf("getting worker tasks: %w", err)
}
worker := &workerHandle{
workerRpc: w,
@ -47,12 +51,20 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
active: &activeResources{},
enabled: true,
acceptTasks: tasks,
closingMgr: make(chan struct{}),
closedMgr: make(chan struct{}),
}
wid := storiface.WorkerID(sessID)
//add worker to post scheduler
if sh.windowPoStSched.AddWorker(wid, worker) ||
sh.winningPoStSched.AddWorker(wid, worker) {
return nil
}
sh.workersLk.Lock()
_, exist := sh.workers[wid]
if exist {

View File

@ -18,6 +18,9 @@ const (
TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1"
TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2"
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
TTGenerateWinningPoSt TaskType = "post/v0/winningproof"
)
var order = map[TaskType]int{
@ -33,6 +36,9 @@ var order = map[TaskType]int{
TTUnseal: 1,
TTFetch: -1,
TTFinalize: -2, // most priority
TTGenerateWindowPoSt: -3,
TTGenerateWinningPoSt: -4,
}
var shortNames = map[TaskType]string{
@ -52,6 +58,9 @@ var shortNames = map[TaskType]string{
TTProveReplicaUpdate1: "PR1",
TTProveReplicaUpdate2: "PR2",
TTRegenSectorKey: "GSK",
TTGenerateWindowPoSt: "WDP",
TTGenerateWinningPoSt: "WNP",
}
func (a TaskType) MuchLess(b TaskType) (bool, bool) {

View File

@ -10,11 +10,10 @@ import (
func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
m.sched.workersLk.RLock()
defer m.sched.workersLk.RUnlock()
out := map[uuid.UUID]storiface.WorkerStats{}
for id, handle := range m.sched.workers {
cb := func(id storiface.WorkerID, handle *workerHandle) {
handle.lk.Lock()
out[uuid.UUID(id)] = storiface.WorkerStats{
Info: handle.info,
@ -28,6 +27,15 @@ func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
handle.lk.Unlock()
}
for id, handle := range m.sched.workers {
cb(id, handle)
}
m.sched.workersLk.RUnlock()
//list post workers
m.sched.winningPoStSched.WorkerStats(cb)
m.sched.windowPoStSched.WorkerStats(cb)
return out
}

View File

@ -2,6 +2,7 @@ package stores
import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"strconv"
@ -10,6 +11,7 @@ import (
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -56,6 +58,9 @@ func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
//for post vanilla
mux.HandleFunc("/remote/vanilla/single", handler.generateSingleVanillaProof).Methods("POST")
mux.ServeHTTP(w, r)
}
@ -298,3 +303,32 @@ func ftFromString(t string) (storiface.SectorFileType, error) {
return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
}
}
type SingleVanillaParams struct {
PrivSector ffi.PrivateSectorInfo
Challange []uint64
}
func (handler *FetchHandler) generateSingleVanillaProof(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
params := SingleVanillaParams{}
err = json.Unmarshal(body, &params)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
vanilla, err := ffi.GenerateSingleVanillaProof(params.PrivSector, params.Challange)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(200)
w.Write(vanilla)
}

View File

@ -42,6 +42,8 @@ type StorageInfo struct {
Groups []Group
AllowTo []Group
// storage path
Path string
}
type HealthReport struct {
@ -58,6 +60,9 @@ type SectorStorageInfo struct {
CanStore bool
Primary bool
// storage path
Path string
}
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/index.go -package=mocks . SectorIndex
@ -79,6 +84,9 @@ type SectorIndex interface { // part of storage-miner api
StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error)
StorageList(ctx context.Context) (map[ID][]Decl, error)
// get sector storage url for post worker
StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error)
}
type Decl struct {
@ -177,6 +185,8 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsS
i.stores[si.ID].info.Groups = si.Groups
i.stores[si.ID].info.AllowTo = si.AllowTo
i.stores[si.ID].info.Path = si.Path
return nil
}
i.stores[si.ID] = &storageEntry{
@ -350,6 +360,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
CanStore: st.info.CanStore,
Primary: isprimary[id],
Path: st.info.Path,
})
}
@ -419,6 +431,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
CanStore: st.info.CanStore,
Primary: false,
Path: st.info.Path,
})
}
}
@ -522,4 +536,49 @@ func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]ID,
return out, nil
}
func (i *Index) StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error) {
i.lk.RLock()
defer i.lk.RUnlock()
storageIDs := map[ID]uint64{}
for _, pathType := range storiface.PathTypes {
if ft&pathType == 0 {
continue
}
for _, id := range i.sectors[Decl{s, pathType}] {
storageIDs[id.storage]++
}
}
storages := make([]StorageInfo, 0, len(storageIDs))
for id, _ := range storageIDs {
st, ok := i.stores[id]
if !ok {
log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s)
continue
}
urls := make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
urls[k] = u
}
if st.info.CanStore {
storages = append(storages, StorageInfo{
URLs: urls,
})
}
}
if len(storages) == 0 {
return "", xerrors.New("cant find sector storage")
}
if len(storages[0].URLs) == 0 {
return "", xerrors.New("sector storage url is nil")
}
return storages[0].URLs[0], nil
}
var _ SectorIndex = &Index{}

View File

@ -4,6 +4,7 @@ import (
"context"
"os"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
@ -36,6 +37,8 @@ type PartialFileHandler interface {
type Store interface {
AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
AcquireSectorPaths(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool, keepIn []ID) error
// like remove, but doesn't remove the primary sector copy, nor the last
@ -48,4 +51,6 @@ type Store interface {
FsStat(ctx context.Context, id ID) (fsutil.FsStat, error)
Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error)
GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffi.PrivateSectorInfo, challange []uint64) ([]byte, error)
}

View File

@ -13,6 +13,7 @@ import (
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
@ -221,6 +222,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
CanStore: meta.CanStore,
Groups: meta.Groups,
AllowTo: meta.AllowTo,
Path: p,
}, fst)
if err != nil {
return xerrors.Errorf("declaring storage in index: %w", err)
@ -718,4 +720,11 @@ func (st *Local) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
return p.stat(st.localStorage)
}
func (st *Local) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffi.PrivateSectorInfo, challange []uint64) ([]byte, error) {
return nil, nil
}
func (st *Local) AcquireSectorPaths(ctx context.Context, sid storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType) (storiface.SectorPaths, storiface.SectorPaths, error) {
return st.AcquireSector(ctx, sid, existing, allocate, pathType, storiface.AcquireCopy)
}
var _ Store = &Local{}

View File

@ -197,3 +197,18 @@ func (mr *MockSectorIndexMockRecorder) StorageTryLock(arg0, arg1, arg2, arg3 int
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageTryLock", reflect.TypeOf((*MockSectorIndex)(nil).StorageTryLock), arg0, arg1, arg2, arg3)
}
// StorageGetUrl mocks base method.
func (m *MockSectorIndex) StorageGetUrl(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageGetUrl", ctx, s, ft)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StorageGetUrl indicates an expected call of StorageGetUrl.
func (mr *MockSectorIndexMockRecorder) StorageGetUrl(ctx, s, ft interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageGetUrl", reflect.TypeOf((*MockSectorIndex)(nil).StorageGetUrl), ctx, s, ft)
}

View File

@ -0,0 +1,243 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: interface.go
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
os "os"
reflect "reflect"
abi "github.com/filecoin-project/go-state-types/abi"
ffiwrapper "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
fsutil "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
partialfile "github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
stores "github.com/filecoin-project/lotus/extern/sector-storage/stores"
storiface "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
storage "github.com/filecoin-project/specs-storage/storage"
gomock "github.com/golang/mock/gomock"
)
// MockpartialFileHandler is a mock of partialFileHandler interface.
type MockpartialFileHandler struct {
ctrl *gomock.Controller
recorder *MockpartialFileHandlerMockRecorder
}
// MockpartialFileHandlerMockRecorder is the mock recorder for MockpartialFileHandler.
type MockpartialFileHandlerMockRecorder struct {
mock *MockpartialFileHandler
}
// NewMockpartialFileHandler creates a new mock instance.
func NewMockpartialFileHandler(ctrl *gomock.Controller) *MockpartialFileHandler {
mock := &MockpartialFileHandler{ctrl: ctrl}
mock.recorder = &MockpartialFileHandlerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockpartialFileHandler) EXPECT() *MockpartialFileHandlerMockRecorder {
return m.recorder
}
// Close mocks base method.
func (m *MockpartialFileHandler) Close(pf *partialfile.PartialFile) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close", pf)
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockpartialFileHandlerMockRecorder) Close(pf interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockpartialFileHandler)(nil).Close), pf)
}
// HasAllocated mocks base method.
func (m *MockpartialFileHandler) HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HasAllocated", pf, offset, size)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// HasAllocated indicates an expected call of HasAllocated.
func (mr *MockpartialFileHandlerMockRecorder) HasAllocated(pf, offset, size interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasAllocated", reflect.TypeOf((*MockpartialFileHandler)(nil).HasAllocated), pf, offset, size)
}
// OpenPartialFile mocks base method.
func (m *MockpartialFileHandler) OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialfile.PartialFile, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "OpenPartialFile", maxPieceSize, path)
ret0, _ := ret[0].(*partialfile.PartialFile)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// OpenPartialFile indicates an expected call of OpenPartialFile.
func (mr *MockpartialFileHandlerMockRecorder) OpenPartialFile(maxPieceSize, path interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenPartialFile", reflect.TypeOf((*MockpartialFileHandler)(nil).OpenPartialFile), maxPieceSize, path)
}
// Reader mocks base method.
func (m *MockpartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Reader", pf, offset, size)
ret0, _ := ret[0].(*os.File)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Reader indicates an expected call of Reader.
func (mr *MockpartialFileHandlerMockRecorder) Reader(pf, offset, size interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reader", reflect.TypeOf((*MockpartialFileHandler)(nil).Reader), pf, offset, size)
}
// MockStore is a mock of Store interface.
type MockStore struct {
ctrl *gomock.Controller
recorder *MockStoreMockRecorder
}
// MockStoreMockRecorder is the mock recorder for MockStore.
type MockStoreMockRecorder struct {
mock *MockStore
}
// NewMockStore creates a new mock instance.
func NewMockStore(ctrl *gomock.Controller) *MockStore {
mock := &MockStore{ctrl: ctrl}
mock.recorder = &MockStoreMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockStore) EXPECT() *MockStoreMockRecorder {
return m.recorder
}
// AcquireSector mocks base method.
func (m *MockStore) AcquireSector(ctx context.Context, s storage.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AcquireSector", ctx, s, existing, allocate, sealing, op)
ret0, _ := ret[0].(storiface.SectorPaths)
ret1, _ := ret[1].(storiface.SectorPaths)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// AcquireSector indicates an expected call of AcquireSector.
func (mr *MockStoreMockRecorder) AcquireSector(ctx, s, existing, allocate, sealing, op interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireSector", reflect.TypeOf((*MockStore)(nil).AcquireSector), ctx, s, existing, allocate, sealing, op)
}
// FsStat mocks base method.
func (m *MockStore) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FsStat", ctx, id)
ret0, _ := ret[0].(fsutil.FsStat)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FsStat indicates an expected call of FsStat.
func (mr *MockStoreMockRecorder) FsStat(ctx, id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FsStat", reflect.TypeOf((*MockStore)(nil).FsStat), ctx, id)
}
// MoveStorage mocks base method.
func (m *MockStore) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MoveStorage", ctx, s, types)
ret0, _ := ret[0].(error)
return ret0
}
// MoveStorage indicates an expected call of MoveStorage.
func (mr *MockStoreMockRecorder) MoveStorage(ctx, s, types interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveStorage", reflect.TypeOf((*MockStore)(nil).MoveStorage), ctx, s, types)
}
// Remove mocks base method.
func (m *MockStore) Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Remove", ctx, s, types, force)
ret0, _ := ret[0].(error)
return ret0
}
// Remove indicates an expected call of Remove.
func (mr *MockStoreMockRecorder) Remove(ctx, s, types, force interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockStore)(nil).Remove), ctx, s, types, force)
}
// RemoveCopies mocks base method.
func (m *MockStore) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RemoveCopies", ctx, s, types)
ret0, _ := ret[0].(error)
return ret0
}
// RemoveCopies indicates an expected call of RemoveCopies.
func (mr *MockStoreMockRecorder) RemoveCopies(ctx, s, types interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveCopies", reflect.TypeOf((*MockStore)(nil).RemoveCopies), ctx, s, types)
}
// Reserve mocks base method.
func (m *MockStore) Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Reserve", ctx, sid, ft, storageIDs, overheadTab)
ret0, _ := ret[0].(func())
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Reserve indicates an expected call of Reserve.
func (mr *MockStoreMockRecorder) Reserve(ctx, sid, ft, storageIDs, overheadTab interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reserve", reflect.TypeOf((*MockStore)(nil).Reserve), ctx, sid, ft, storageIDs, overheadTab)
}
// GenerateWindowPoStVanilla mocks base method.
func (m *MockStore) GenerateWindowPoStVanilla(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, vanillaParams string, randomness abi.PoStRandomness) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GenerateWindowPoStVanilla", ctx, minerID, privsector, vanillaParams, randomness)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GenerateWindowPoStVanilla indicates an expected call of GenerateWindowPoStVanilla.
func (mr *MockStoreMockRecorder) GenerateWindowPoStVanilla(ctx, minerID, privsector, vanillaParams, randomness interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateWindowPoStVanilla", reflect.TypeOf((*MockStore)(nil).GenerateWindowPoStVanilla), ctx, minerID, privsector, vanillaParams, randomness)
}
// GenerateWinningPoStVanilla mocks base method.
func (m *MockStore) GenerateWinningPoStVanilla(ctx context.Context, minerID abi.ActorID, privsector *ffiwrapper.PrivateSectorInfo, randomness abi.PoStRandomness) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GenerateWinningPoStVanilla", ctx, minerID, privsector, randomness)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GenerateWinningPoStVanilla indicates an expected call of GenerateWinningPoStVanilla.
func (mr *MockStoreMockRecorder) GenerateWinningPoStVanilla(ctx, minerID, privsector, randomness interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateWinningPoStVanilla", reflect.TypeOf((*MockStore)(nil).GenerateWinningPoStVanilla), ctx, minerID, privsector, randomness)
}

View File

@ -14,8 +14,10 @@ import (
gopath "path"
"path/filepath"
"sort"
"strings"
"sync"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/sector-storage/tarutil"
@ -738,6 +740,135 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
}, nil
}
func (r *Remote) AcquireSectorPaths(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}
for {
r.fetchLk.Lock()
c, locked := r.fetching[s.ID]
if !locked {
r.fetching[s.ID] = make(chan struct{})
r.fetchLk.Unlock()
break
}
r.fetchLk.Unlock()
select {
case <-c:
continue
case <-ctx.Done():
return storiface.SectorPaths{}, storiface.SectorPaths{}, ctx.Err()
}
}
defer func() {
r.fetchLk.Lock()
close(r.fetching[s.ID])
delete(r.fetching, s.ID)
r.fetchLk.Unlock()
}()
var paths storiface.SectorPaths
var stores storiface.SectorPaths
ssize, err := s.ProofType.SectorSize()
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, err
}
for _, fileType := range storiface.PathTypes {
if fileType&existing == 0 {
continue
}
sis, err := r.index.StorageFindSector(ctx, s.ID, fileType, ssize, false)
if err != nil {
log.Warnf("finding existing sector %d failed: %+v", s.ID, err)
continue
}
for _, si := range sis {
if (pathType == storiface.PathSealing) && !si.CanSeal {
continue
}
if (pathType == storiface.PathStorage) && !si.CanStore {
continue
}
spath := filepath.Join(si.Path, fileType.String(), storiface.SectorName(s.ID))
storiface.SetPathByType(&paths, fileType, spath)
storiface.SetPathByType(&stores, fileType, string(si.ID))
if si.CanStore {
existing ^= fileType
break
}
}
}
return paths, stores, nil
}
func (r *Remote) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, privsector *ffi.PrivateSectorInfo, challange []uint64) ([]byte, error) {
sector := abi.SectorID{
Miner: minerID,
Number: privsector.SectorNumber,
}
storageUrl, err := r.index.StorageGetUrl(ctx, sector, storiface.FTCache|storiface.FTSealed)
if err != nil {
return nil, xerrors.Errorf("finding path for sector storage: %w", err)
}
surl, err := url.Parse(storageUrl)
if err != nil {
return nil, xerrors.Errorf("parse sector storage url failed : %w", err)
}
surl.Path = gopath.Join(surl.Path, "vanilla", "single")
requestParams := SingleVanillaParams{
PrivSector: *privsector,
Challange: challange,
}
bytes, err := json.Marshal(requestParams)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", surl.String(), strings.NewReader(string(bytes)))
if err != nil {
return nil, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, xerrors.Errorf("resp.Body ReadAll: %w", err)
}
return nil, xerrors.Errorf("non-200 code: %w", string(body))
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, xerrors.Errorf("resp.Body ReadAll: %w", err)
}
return body, nil
}
var _ Store = &Remote{}
type funcCloser func() error

View File

@ -42,6 +42,7 @@ type Resources struct {
*/
var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100
var GPUUtilizationProof float64 = 1.0
// TODO: Take NUMA into account
func (r Resources) Threads(wcpus uint64, gpus int) uint64 {

View File

@ -31,6 +31,18 @@ func (t *testExec) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID,
panic("implement me")
}
func (t *testExec) GenerateWindowPoStWithVanilla(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, vanillas []string) ([]proof.PoStProof, error) {
panic("implement me")
}
func (t *testExec) GenerateWinningPoStWithVanilla(ctx context.Context, mid abi.ActorID, privsectors ffiwrapper.SortedPrivateSectorInfo, randomness abi.PoStRandomness, vanillas []string) ([]proof.PoStProof, error) {
panic("implement me")
}
func (t *testExec) GetSectorVanillaParams(ctx context.Context, index int, partitionVanillaParams string) (string, error) {
panic("implement me")
}
func (t *testExec) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
panic("implement me")
}

View File

@ -20,6 +20,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
@ -155,6 +156,10 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
}, nil
}
func (l *localWorkerPathProvider) AcquireSectorPaths(ctx context.Context, sector storage.SectorRef, existing storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
return storiface.SectorPaths{}, nil, nil
}
func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l})
}
@ -508,6 +513,90 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef,
})
}
func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, mid abi.ActorID, privsectors ffi.SortedPrivateSectorInfo, randomness abi.PoStRandomness, sectorChallenges *ffi.FallbackChallenges) ([]proof.PoStProof, error) {
sb, err := l.executor()
if err != nil {
return nil, err
}
ps := privsectors.Values()
vanillas := make([][]byte, len(ps))
var rerr error
var wg sync.WaitGroup
for i, v := range ps {
wg.Add(1)
go func(index int, sector ffi.PrivateSectorInfo) {
defer wg.Done()
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, &sector, sectorChallenges.Challenges[sector.SectorNumber])
if err != nil {
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d,vanila failed: %w", sector.SectorNumber, err))
return
}
if vanilla == nil {
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d,vanila is nil", sector.SectorNumber))
}
vanillas[index] = vanilla
}(i, ffi.PrivateSectorInfo(v))
}
wg.Wait()
if rerr != nil {
return nil, rerr
}
return sb.GenerateWinningPoStWithVanilla(ctx, ps[0].PoStProofType, mid, randomness, vanillas)
}
func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, mid abi.ActorID, privsectors ffi.SortedPrivateSectorInfo, partitionIdx int, offset int, randomness abi.PoStRandomness, sectorChallenges *ffi.FallbackChallenges) (ffiwrapper.WindowPoStResult, error) {
sb, err := l.executor()
if err != nil {
return ffiwrapper.WindowPoStResult{}, err
}
var slk sync.Mutex
var skipped []abi.SectorID
ps := privsectors.Values()
out := make([][]byte, len(ps))
var wg sync.WaitGroup
for i := range ps {
wg.Add(1)
go func(index int) {
defer wg.Done()
sector := ps[index]
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, &sector, sectorChallenges.Challenges[sector.SectorNumber])
if err != nil || vanilla == nil {
slk.Lock()
skipped = append(skipped, abi.SectorID{
Miner: mid,
Number: sector.SectorNumber,
})
slk.Unlock()
log.Errorf("get sector: %d, vanilla: %s, offset: %d", sector.SectorNumber, vanilla, offset+index)
return
}
out[index] = vanilla
}(i)
}
wg.Wait()
if len(skipped) > 0 {
return ffiwrapper.WindowPoStResult{PoStProofs: ffi.PartitionProof{}, Skipped: skipped}, nil
}
PoSts, err := sb.GenerateWindowPoStWithVanilla(ctx, ps[0].PoStProofType, mid, randomness, out, partitionIdx)
if err != nil {
return ffiwrapper.WindowPoStResult{PoStProofs: *PoSts, Skipped: skipped}, err
}
return ffiwrapper.WindowPoStResult{PoStProofs: *PoSts, Skipped: skipped}, nil
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
l.taskLk.Lock()
defer l.taskLk.Unlock()

View File

@ -3,11 +3,14 @@ package storage
import (
"bytes"
"context"
"sync"
"time"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/specs-storage/storage"
"github.com/hashicorp/go-multierror"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
@ -216,7 +219,14 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
})
}
bad, err := s.faultTracker.CheckProvable(ctx, s.proofType, tocheck, nil)
bad, err := s.faultTracker.CheckProvable(ctx, s.proofType, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, error) {
for _, sector := range sectorInfos {
if sector.SectorNumber == id.Number {
return sector.SealedCID, nil
}
}
return cid.Undef, xerrors.Errorf("cann't get commr for sector %d", id.Number)
})
if err != nil {
return bitfield.BitField{}, xerrors.Errorf("checking provable sectors: %w", err)
}
@ -546,8 +556,13 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
return nil, err
}
var berr error
batchCtx, batchAbort := context.WithCancel(ctx)
defer batchAbort()
// Generate proofs in batches
posts := make([]miner.SubmitWindowedPoStParams, 0, len(partitionBatches))
var batchWg sync.WaitGroup
posts := make([]miner.SubmitWindowedPoStParams, len(partitionBatches))
for batchIdx, batch := range partitionBatches {
batchPartitionStartIdx := 0
for _, batch := range partitionBatches[:batchIdx] {
@ -560,162 +575,218 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
Proofs: nil,
}
postSkipped := bitfield.New()
somethingToProve := false
batchWg.Add(1)
go func(ctx context.Context, batchIdx int, batch []api.Partition, batchPartitionStartIdx int, params miner.SubmitWindowedPoStParams) {
defer batchWg.Done()
// Retry until we run out of sectors to prove.
for retries := 0; ; retries++ {
skipCount := uint64(0)
var partitions []miner.PoStPartition
var sinfos []proof2.SectorInfo
for partIdx, partition := range batch {
// TODO: Can do this in parallel
toProve, err := bitfield.SubtractBitField(partition.LiveSectors, partition.FaultySectors)
postSkipped := bitfield.New()
somethingToProve := false
// Retry until we run out of sectors to prove.
for retries := 0; ; retries++ {
skipCount := uint64(0)
var partitions []miner.PoStPartition
var sinfos []proof2.SectorInfo
var partitionWg sync.WaitGroup
var partitionLk sync.Mutex
for partIdx, partition := range batch {
// Get sectors info in parallel
partitionWg.Add(1)
go func(partIdx int, partition api.Partition) {
defer partitionWg.Done()
cbFailed := func(err error) {
batchAbort()
log.Warn("compute post batch:", batchIdx, "parttion:", partIdx, " failed:", err)
berr = multierror.Append(berr, err)
return
}
toProve, err := bitfield.SubtractBitField(partition.LiveSectors, partition.FaultySectors)
if err != nil {
cbFailed(xerrors.Errorf("removing faults from set of sectors to prove: %w", err))
return
}
toProve, err = bitfield.MergeBitFields(toProve, partition.RecoveringSectors)
if err != nil {
cbFailed(xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err))
return
}
good, err := s.checkSectors(ctx, toProve, ts.Key())
if err != nil {
cbFailed(xerrors.Errorf("checking sectors to skip: %w", err))
return
}
good, err = bitfield.SubtractBitField(good, postSkipped)
if err != nil {
cbFailed(xerrors.Errorf("toProve - postSkipped: %w", err))
return
}
skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
cbFailed(xerrors.Errorf("toProve - good: %w", err))
return
}
sc, err := skipped.Count()
if err != nil {
cbFailed(xerrors.Errorf("getting skipped sector count: %w", err))
return
}
partitionLk.Lock()
skipCount += sc
partitionLk.Unlock()
log.Infow("skipped sectors", "batch", batchIdx, "partition", partIdx, "sectors count", sc)
ssi, err := s.sectorsForProof(ctx, good, partition.AllSectors, ts)
if err != nil {
cbFailed(xerrors.Errorf("batch: %d getting sorted sector info: %w", batchIdx, err))
return
}
if len(ssi) == 0 {
log.Warnf("getting sectors for proof batch: %d, sector info len: %d", batchIdx, len(ssi))
return
}
partitionLk.Lock()
sinfos = append(sinfos, ssi...)
partitions = append(partitions, miner.PoStPartition{
Index: uint64(batchPartitionStartIdx + partIdx),
Skipped: skipped,
})
partitionLk.Unlock()
}(partIdx, partition)
}
partitionWg.Wait()
//return when any partition fault
if berr != nil {
return
}
if len(sinfos) == 0 {
// nothing to prove for this batch
break
}
// Generate proof
log.Infow("running window post",
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount,
"batch", batchIdx)
tsStart := build.Clock.Now()
mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, xerrors.Errorf("removing faults from set of sectors to prove: %w", err)
}
toProve, err = bitfield.MergeBitFields(toProve, partition.RecoveringSectors)
if err != nil {
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
berr = multierror.Append(berr, xerrors.Errorf("batch: %d get actor address: %w", batchIdx, err))
batchAbort()
return
}
good, err := s.checkSectors(ctx, toProve, ts.Key())
if err != nil {
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
postOut, ps, err := s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, append(abi.PoStRandomness{}, rand...))
elapsed := time.Since(tsStart)
log.Infow("computing window post", "batch", batchIdx, "elapsed", elapsed)
if err == nil {
// If we proved nothing, something is very wrong.
if len(postOut) == 0 {
berr = multierror.Append(berr, xerrors.Errorf("received no proofs back from generate window post"))
batchAbort()
return
}
headTs, err := s.api.ChainHead(ctx)
if err != nil {
berr = multierror.Append(berr, xerrors.Errorf("getting current head: %w", err))
batchAbort()
return
}
checkRand, err := s.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key())
if err != nil {
berr = multierror.Append(berr, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err))
batchAbort()
return
}
if !bytes.Equal(checkRand, rand) {
log.Warnw("windowpost randomness changed", "old", rand, "new", checkRand, "ts-height", ts.Height(), "challenge-height", di.Challenge, "tsk", ts.Key())
rand = checkRand
continue
}
// If we generated an incorrect proof, try again.
if correct, err := s.verifier.VerifyWindowPoSt(ctx, proof.WindowPoStVerifyInfo{
Randomness: abi.PoStRandomness(checkRand),
Proofs: postOut,
ChallengedSectors: sinfos,
Prover: abi.ActorID(mid),
}); err != nil {
log.Errorw("window post verification failed", "post", postOut, "error", err)
time.Sleep(5 * time.Second)
continue
} else if !correct {
log.Errorw("generated incorrect window post proof", "post", postOut, "error", err)
continue
}
// Proof generation successful, stop retrying
somethingToProve = true
params.Partitions = partitions
params.Proofs = postOut
break
}
good, err = bitfield.SubtractBitField(good, postSkipped)
if err != nil {
return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
// Proof generation failed, so retry
if len(ps) == 0 {
// If we didn't skip any new sectors, we failed
// for some other reason and we need to abort.
berr = multierror.Append(berr, xerrors.Errorf("running window post failed: %w", err))
batchAbort()
return
}
// TODO: maybe mark these as faulty somewhere?
log.Warnw("generate window post skipped sectors", "sectors", ps, "error", err, "try", retries)
// Explicitly make sure we haven't aborted this PoSt
// (GenerateWindowPoSt may or may not check this).
// Otherwise, we could try to continue proving a
// deadline after the deadline has ended.
if ctx.Err() != nil {
log.Warnw("aborting PoSt due to context cancellation", "error", ctx.Err(), "deadline", di.Index)
berr = multierror.Append(berr, ctx.Err())
return
}
skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
return nil, xerrors.Errorf("toProve - good: %w", err)
skipCount += uint64(len(ps))
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
}
sc, err := skipped.Count()
if err != nil {
return nil, xerrors.Errorf("getting skipped sector count: %w", err)
}
skipCount += sc
ssi, err := s.sectorsForProof(ctx, good, partition.AllSectors, ts)
if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}
if len(ssi) == 0 {
continue
}
sinfos = append(sinfos, ssi...)
partitions = append(partitions, miner.PoStPartition{
Index: uint64(batchPartitionStartIdx + partIdx),
Skipped: skipped,
})
}
if len(sinfos) == 0 {
// nothing to prove for this batch
break
// Nothing to prove for this batch, try the next batch
if !somethingToProve {
log.Warnf("nothing to prove for batch: %d", batchIdx)
return
}
posts[batchIdx] = params
// Generate proof
log.Infow("running window post",
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount)
}(batchCtx, batchIdx, batch, batchPartitionStartIdx, params)
}
batchWg.Wait()
tsStart := build.Clock.Now()
mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, err
}
postOut, ps, err := s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, append(abi.PoStRandomness{}, rand...))
elapsed := time.Since(tsStart)
log.Infow("computing window post", "batch", batchIdx, "elapsed", elapsed)
if err == nil {
// If we proved nothing, something is very wrong.
if len(postOut) == 0 {
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
headTs, err := s.api.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("getting current head: %w", err)
}
checkRand, err := s.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key())
if err != nil {
return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err)
}
if !bytes.Equal(checkRand, rand) {
log.Warnw("windowpost randomness changed", "old", rand, "new", checkRand, "ts-height", ts.Height(), "challenge-height", di.Challenge, "tsk", ts.Key())
rand = checkRand
continue
}
// If we generated an incorrect proof, try again.
if correct, err := s.verifier.VerifyWindowPoSt(ctx, proof.WindowPoStVerifyInfo{
Randomness: abi.PoStRandomness(checkRand),
Proofs: postOut,
ChallengedSectors: sinfos,
Prover: abi.ActorID(mid),
}); err != nil {
log.Errorw("window post verification failed", "post", postOut, "error", err)
time.Sleep(5 * time.Second)
continue
} else if !correct {
log.Errorw("generated incorrect window post proof", "post", postOut, "error", err)
continue
}
// Proof generation successful, stop retrying
somethingToProve = true
params.Partitions = partitions
params.Proofs = postOut
break
}
// Proof generation failed, so retry
if len(ps) == 0 {
// If we didn't skip any new sectors, we failed
// for some other reason and we need to abort.
return nil, xerrors.Errorf("running window post failed: %w", err)
}
// TODO: maybe mark these as faulty somewhere?
log.Warnw("generate window post skipped sectors", "sectors", ps, "error", err, "try", retries)
// Explicitly make sure we haven't aborted this PoSt
// (GenerateWindowPoSt may or may not check this).
// Otherwise, we could try to continue proving a
// deadline after the deadline has ended.
if ctx.Err() != nil {
log.Warnw("aborting PoSt due to context cancellation", "error", ctx.Err(), "deadline", di.Index)
return nil, ctx.Err()
}
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
}
}
// Nothing to prove for this batch, try the next batch
if !somethingToProve {
continue
}
posts = append(posts, params)
if berr != nil {
return nil, berr
}
return posts, nil