Making sealing logic work with multiple seal proof types

This commit is contained in:
Łukasz Magiera 2020-11-04 21:29:08 +01:00
parent 470538b082
commit 6bea9dd178
39 changed files with 400 additions and 383 deletions

View File

@ -373,17 +373,17 @@ type WorkerStruct struct {
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
AddPiece func(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) `perm:"admin"`
SealPreCommit2 func(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) `perm:"admin"`
SealCommit2 func(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) `perm:"admin"`
FinalizeSector func(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) `perm:"admin"`
ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) `perm:"admin"`
MoveStorage func(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) `perm:"admin"`
UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) `perm:"admin"`
ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
Fetch func(context.Context, abi.SectorID, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
AddPiece func(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) `perm:"admin"`
SealPreCommit2 func(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) `perm:"admin"`
SealCommit2 func(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) `perm:"admin"`
FinalizeSector func(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) `perm:"admin"`
ReleaseUnsealed func(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) `perm:"admin"`
MoveStorage func(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) `perm:"admin"`
UnsealPiece func(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) `perm:"admin"`
ReadPiece func(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
Fetch func(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
@ -1513,47 +1513,47 @@ func (w *WorkerStruct) Info(ctx context.Context) (storiface.WorkerInfo, error) {
return w.Internal.Info(ctx)
}
func (w *WorkerStruct) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
func (w *WorkerStruct) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return w.Internal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
}
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces)
}
func (w *WorkerStruct) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
func (w *WorkerStruct) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return w.Internal.SealPreCommit2(ctx, sector, pc1o)
}
func (w *WorkerStruct) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
func (w *WorkerStruct) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return w.Internal.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
}
func (w *WorkerStruct) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
func (w *WorkerStruct) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return w.Internal.SealCommit2(ctx, sector, c1o)
}
func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return w.Internal.FinalizeSector(ctx, sector, keepUnsealed)
}
func (w *WorkerStruct) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) {
func (w *WorkerStruct) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) {
return w.Internal.ReleaseUnsealed(ctx, sector, safeToFree)
}
func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
func (w *WorkerStruct) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
return w.Internal.MoveStorage(ctx, sector, types)
}
func (w *WorkerStruct) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, c cid.Cid) (storiface.CallID, error) {
func (w *WorkerStruct) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, c cid.Cid) (storiface.CallID, error) {
return w.Internal.UnsealPiece(ctx, sector, offset, size, ticket, c)
}
func (w *WorkerStruct) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
func (w *WorkerStruct) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return w.Internal.ReadPiece(ctx, sink, sector, offset, size)
}
func (w *WorkerStruct) Fetch(ctx context.Context, id abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
func (w *WorkerStruct) Fetch(ctx context.Context, id storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return w.Internal.Fetch(ctx, id, fileType, ptype, am)
}

View File

@ -90,7 +90,7 @@ func init() {
addExample(&pid)
addExample(bitfield.NewFromSet([]uint64{5}))
addExample(abi.RegisteredSealProof_StackedDrg32GiBV1)
addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1)
addExample(abi.RegisteredPoStProof_StackedDrgWindow32GiBV1)
addExample(abi.ChainEpoch(10101))
addExample(crypto.SigTypeBLS)

View File

@ -43,10 +43,6 @@ const UpgradeCalicoHeight = 999999
func init() {
policy.SetConsensusMinerMinPower(abi.NewStoragePower(10 << 40))
policy.SetSupportedProofTypes(
abi.RegisteredSealProof_StackedDrg32GiBV1,
abi.RegisteredSealProof_StackedDrg64GiBV1,
)
if os.Getenv("LOTUS_USE_TEST_ADDRESSES") != "1" {
SetAddressNetwork(address.Mainnet)

View File

@ -84,8 +84,8 @@ func VersionForType(nodeType NodeType) (Version, error) {
// semver versions of the rpc api exposed
var (
FullAPIVersion = newVer(0, 17, 0)
MinerAPIVersion = newVer(0, 17, 0)
WorkerAPIVersion = newVer(0, 16, 0)
MinerAPIVersion = newVer(0, 18, 0)
WorkerAPIVersion = newVer(0, 17, 0)
)
//nolint:varcheck,deadcode

View File

@ -23,8 +23,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
builtin0 "github.com/filecoin-project/specs-actors/actors/builtin"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
power0 "github.com/filecoin-project/specs-actors/actors/builtin/power"
@ -101,11 +99,12 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid
i := i
m := m
spt, err := ffiwrapper.SealProofTypeFromSectorSize(m.SectorSize)
if err != nil {
return cid.Undef, err
if len(m.Sectors) == 0 {
return cid.Undef, xerrors.Errorf("genesis miners must have at least 1 presealed sector")
}
spt := m.Sectors[0].ProofType
{
constructorParams := &power0.CreateMinerParams{
Owner: m.Worker,

View File

@ -207,12 +207,7 @@ func GetSectorsForWinningPoSt(ctx context.Context, nv network.Version, pv ffiwra
return nil, xerrors.Errorf("getting miner info: %w", err)
}
spt, err := ffiwrapper.SealProofTypeFromSectorSize(info.SectorSize)
if err != nil {
return nil, xerrors.Errorf("getting seal proof type: %w", err)
}
wpt, err := spt.RegisteredWinningPoStProof()
wpt, err := info.SealProofType.RegisteredWinningPoStProof()
if err != nil {
return nil, xerrors.Errorf("getting window proof type: %w", err)
}
@ -252,7 +247,7 @@ func GetSectorsForWinningPoSt(ctx context.Context, nv network.Version, pv ffiwra
out := make([]builtin.SectorInfo, len(sectors))
for i, sinfo := range sectors {
out[i] = builtin.SectorInfo{
SealProof: spt,
SealProof: sinfo.SealProof,
SectorNumber: sinfo.SectorNumber,
SealedCID: sinfo.SealedCID,
}

View File

@ -42,10 +42,6 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect
return nil, nil, err
}
cfg := &ffiwrapper.Config{
SealProofType: spt,
}
if err := os.MkdirAll(sbroot, 0775); err != nil { //nolint:gosec
return nil, nil, err
}
@ -56,7 +52,7 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect
Root: sbroot,
}
sb, err := ffiwrapper.New(sbfs, cfg)
sb, err := ffiwrapper.New(sbfs)
if err != nil {
return nil, nil, err
}

View File

@ -9,17 +9,18 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
// FaultTracker TODO: Track things more actively
type FaultTracker interface {
CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []abi.SectorID) ([]abi.SectorID, error)
CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef) ([]abi.SectorID, error)
}
// CheckProvable returns unprovable sectors
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []abi.SectorID) ([]abi.SectorID, error) {
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef) ([]abi.SectorID, error) {
var bad []abi.SectorID
ssize, err := pp.SectorSize()
@ -33,27 +34,27 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
ctx, cancel := context.WithCancel(ctx)
defer cancel()
locked, err := m.index.StorageTryLock(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone)
locked, err := m.index.StorageTryLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTNone)
if err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
if !locked {
log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector, "sealed")
bad = append(bad, sector)
bad = append(bad, sector.ID)
return nil
}
lp, _, err := m.localStore.AcquireSector(ctx, sector, ssize, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
lp, _, err := m.localStore.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad = append(bad, sector)
bad = append(bad, sector.ID)
return nil
}
if lp.Sealed == "" || lp.Cache == "" {
log.Warnw("CheckProvable Sector FAULT: cache an/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache)
bad = append(bad, sector)
bad = append(bad, sector.ID)
return nil
}
@ -69,14 +70,14 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
st, err := os.Stat(p)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: sector file stat error", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "err", err)
bad = append(bad, sector)
bad = append(bad, sector.ID)
return nil
}
if sz != 0 {
if st.Size() != int64(ssize)*sz {
log.Warnw("CheckProvable Sector FAULT: sector file is wrong size", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "size", st.Size(), "expectSize", int64(ssize)*sz)
bad = append(bad, sector)
bad = append(bad, sector.ID)
return nil
}
}

View File

@ -7,6 +7,7 @@ import (
"sync"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
@ -23,7 +24,7 @@ type Provider struct {
waitSector map[sectorFile]chan struct{}
}
func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
@ -37,7 +38,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing
done := func() {}
out := storiface.SectorPaths{
ID: id,
ID: id.ID,
}
for _, fileType := range storiface.PathTypes {
@ -49,10 +50,10 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing
if b.waitSector == nil {
b.waitSector = map[sectorFile]chan struct{}{}
}
ch, found := b.waitSector[sectorFile{id, fileType}]
ch, found := b.waitSector[sectorFile{id.ID, fileType}]
if !found {
ch = make(chan struct{}, 1)
b.waitSector[sectorFile{id, fileType}] = ch
b.waitSector[sectorFile{id.ID, fileType}] = ch
}
b.lk.Unlock()
@ -63,7 +64,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing
return storiface.SectorPaths{}, nil, ctx.Err()
}
path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id))
path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id.ID))
prevDone := done
done = func() {

View File

@ -6,17 +6,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
)
type Config struct {
SealProofType abi.RegisteredSealProof
_ struct{} // guard against nameless init
}
func sizeFromConfig(cfg Config) (abi.SectorSize, error) {
return cfg.SealProofType.SectorSize()
}
func SealProofTypeFromSectorSize(ssize abi.SectorSize) (abi.RegisteredSealProof, error) {
func SealProofTypeFromSectorSizea(ssize abi.SectorSize) (abi.RegisteredSealProof, error) {
switch ssize {
case 2 << 10:
return abi.RegisteredSealProof_StackedDrg2KiBV1, nil

View File

@ -1,16 +1,12 @@
package ffiwrapper
import (
"github.com/filecoin-project/go-state-types/abi"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("ffiwrapper")
type Sealer struct {
sealProofType abi.RegisteredSealProof
ssize abi.SectorSize // a function of sealProofType and postProofType
sectors SectorProvider
stopping chan struct{}
}
@ -18,11 +14,3 @@ type Sealer struct {
func (sb *Sealer) Stop() {
close(sb.stopping)
}
func (sb *Sealer) SectorSize() abi.SectorSize {
return sb.ssize
}
func (sb *Sealer) SealProofType() abi.RegisteredSealProof {
return sb.sealProofType
}

View File

@ -27,16 +27,8 @@ import (
var _ Storage = &Sealer{}
func New(sectors SectorProvider, cfg *Config) (*Sealer, error) {
sectorSize, err := sizeFromConfig(*cfg)
if err != nil {
return nil, err
}
func New(sectors SectorProvider) (*Sealer, error) {
sb := &Sealer{
sealProofType: cfg.SealProofType,
ssize: sectorSize,
sectors: sectors,
stopping: make(chan struct{}),
@ -45,25 +37,29 @@ func New(sectors SectorProvider, cfg *Config) (*Sealer, error) {
return sb, nil
}
func (sb *Sealer) NewSector(ctx context.Context, sector abi.SectorID) error {
func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error {
// TODO: Allocate the sector here instead of in addpiece
return nil
}
func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
var offset abi.UnpaddedPieceSize
for _, size := range existingPieceSizes {
offset += size
}
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return abi.PieceInfo{}, err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
if offset.Padded()+pieceSize.Padded() > maxPieceSize {
return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset)
}
var err error
var done func()
var stagedFile *partialFile
@ -135,7 +131,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
break
}
c, err := sb.pieceCid(buf[:read])
c, err := sb.pieceCid(sector.ProofType, buf[:read])
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("pieceCid error: %w", err)
}
@ -162,7 +158,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return pieceCids[0], nil
}
pieceCID, err := ffi.GenerateUnsealedCID(sb.sealProofType, pieceCids)
pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
}
@ -178,13 +174,13 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
}, nil
}
func (sb *Sealer) pieceCid(in []byte) (cid.Cid, error) {
func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, error) {
prf, werr, err := ToReadableFile(bytes.NewReader(in), int64(len(in)))
if err != nil {
return cid.Undef, xerrors.Errorf("getting tee reader pipe: %w", err)
}
pieceCID, err := ffi.GeneratePieceCIDFromFile(sb.sealProofType, prf, abi.UnpaddedPieceSize(len(in)))
pieceCID, err := ffi.GeneratePieceCIDFromFile(spt, prf, abi.UnpaddedPieceSize(len(in)))
if err != nil {
return cid.Undef, xerrors.Errorf("generating piece commitment: %w", err)
}
@ -194,8 +190,12 @@ func (sb *Sealer) pieceCid(in []byte) (cid.Cid, error) {
return pieceCID, werr()
}
func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
// try finding existing
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
@ -317,12 +317,12 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
// </eww>
// TODO: This may be possible to do in parallel
err = ffi.UnsealRange(sb.sealProofType,
err = ffi.UnsealRange(sector.ProofType,
srcPaths.Cache,
sealed,
opw,
sector.Number,
sector.Miner,
sector.ID.Number,
sector.ID.Miner,
randomness,
commd,
uint64(at.Unpadded()),
@ -356,14 +356,18 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
return nil
}
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
path, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
if err != nil {
return false, xerrors.Errorf("acquire unsealed sector path: %w", err)
}
defer done()
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return false, err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
pf, err := openPartialFile(maxPieceSize, path.Unsealed)
if err != nil {
@ -408,7 +412,7 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se
return true, nil
}
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing)
if err != nil {
return nil, xerrors.Errorf("acquiring sector paths: %w", err)
@ -443,29 +447,33 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
for _, piece := range pieces {
sum += piece.Size.Unpadded()
}
ussize := abi.PaddedPieceSize(sb.ssize).Unpadded()
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return nil, err
}
ussize := abi.PaddedPieceSize(ssize).Unpadded()
if sum != ussize {
return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
// TODO: context cancellation respect
p1o, err := ffi.SealPreCommitPhase1(
sb.sealProofType,
sector.ProofType,
paths.Cache,
paths.Unsealed,
paths.Sealed,
sector.Number,
sector.Miner,
sector.ID.Number,
sector.ID.Miner,
ticket,
pieces,
)
if err != nil {
return nil, xerrors.Errorf("presealing sector %d (%s): %w", sector.Number, paths.Unsealed, err)
return nil, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
return p1o, nil
}
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err)
@ -474,7 +482,7 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
sealedCID, unsealedCID, err := ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.Number, paths.Unsealed, err)
return storage.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
return storage.SectorCids{
@ -483,40 +491,45 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
}, nil
}
func (sb *Sealer) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
func (sb *Sealer) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing)
if err != nil {
return nil, xerrors.Errorf("acquire sector paths: %w", err)
}
defer done()
output, err := ffi.SealCommitPhase1(
sb.sealProofType,
sector.ProofType,
cids.Sealed,
cids.Unsealed,
paths.Cache,
paths.Sealed,
sector.Number,
sector.Miner,
sector.ID.Number,
sector.ID.Miner,
ticket,
seed,
pieces,
)
if err != nil {
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("num:%d tkt:%v seed:%v, pi:%v sealedCID:%v, unsealedCID:%v", sector.Number, ticket, seed, pieces, cids.Sealed, cids.Unsealed)
log.Warnf("num:%d tkt:%v seed:%v, pi:%v sealedCID:%v, unsealedCID:%v", sector.ID.Number, ticket, seed, pieces, cids.Sealed, cids.Unsealed)
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return output, nil
}
func (sb *Sealer) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (storage.Proof, error) {
return ffi.SealCommitPhase2(phase1Out, sector.Number, sector.Miner)
func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storage.Proof, error) {
return ffi.SealCommitPhase2(phase1Out, sector.ID.Number, sector.ID.Miner)
}
func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
if len(keepUnsealed) > 0 {
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
sr := pieceRun(0, maxPieceSize)
@ -580,10 +593,10 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
}
defer done()
return ffi.ClearCache(uint64(sb.ssize), paths.Cache)
return ffi.ClearCache(uint64(ssize), paths.Cache)
}
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
// This call is meant to mark storage as 'freeable'. Given that unsealing is
// very expensive, we don't remove data as soon as we can - instead we only
// do that when we don't have free space for data that really needs it
@ -593,7 +606,7 @@ func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safe
return xerrors.Errorf("not supported at this layer")
}
func (sb *Sealer) Remove(ctx context.Context, sector abi.SectorID) error {
func (sb *Sealer) Remove(ctx context.Context, sector storage.SectorRef) error {
return xerrors.Errorf("not supported at this layer") // happens in localworker
}

View File

@ -29,8 +29,8 @@ type Storage interface {
storage.Prover
StorageSealer
UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}
type Verifier interface {
@ -44,7 +44,7 @@ type Verifier interface {
type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}

View File

@ -11,6 +11,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
@ -74,12 +75,15 @@ func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorIn
continue
}
sid := abi.SectorID{Miner: mid, Number: s.SectorNumber}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: mid, Number: s.SectorNumber},
ProofType: s.SealProof,
}
paths, d, err := sb.sectors.AcquireSector(ctx, sid, storiface.FTCache|storiface.FTSealed, 0, storiface.PathStorage)
if err != nil {
log.Warnw("failed to acquire sector, skipping", "sector", sid, "error", err)
skipped = append(skipped, sid)
log.Warnw("failed to acquire sector, skipping", "sector", sid.ID, "error", err)
skipped = append(skipped, sid.ID)
continue
}
doneFuncs = append(doneFuncs, d)

View File

@ -47,9 +47,7 @@ type Worker interface {
}
type SectorManager interface {
SectorSize() abi.SectorSize
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ffiwrapper.StorageSealer
storage.Prover
@ -61,8 +59,6 @@ type WorkerID uuid.UUID // worker session UUID
var ClosedWorkerID = uuid.UUID{}
type Manager struct {
scfg *ffiwrapper.Config
ls stores.LocalStorage
storage *stores.Remote
localStore *stores.Local
@ -105,13 +101,13 @@ type StorageAuth http.Header
type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
lstor, err := stores.NewLocal(ctx, ls, si, urls)
if err != nil {
return nil, err
}
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si, spt: cfg.SealProofType}, cfg)
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
@ -119,15 +115,13 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit)
m := &Manager{
scfg: cfg,
ls: ls,
storage: stor,
localStore: lstor,
remoteHnd: &stores.FetchHandler{Local: lstor},
index: si,
sched: newScheduler(cfg.SealProofType),
sched: newScheduler(),
Prover: prover,
@ -162,7 +156,6 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
}
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType,
TaskTypes: localTasks,
}, stor, lstor, si, m, wss))
if err != nil {
@ -198,23 +191,18 @@ func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.remoteHnd.ServeHTTP(w, r)
}
func (m *Manager) SectorSize() abi.SectorSize {
sz, _ := m.scfg.SealProofType.SectorSize()
return sz
}
func schedNop(context.Context, Worker) error {
return nil
}
func (m *Manager) schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
func (m *Manager) schedFetch(sector storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
return func(ctx context.Context, worker Worker) error {
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
return err
}
}
func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error {
func (m *Manager) readPiece(sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error {
return func(ctx context.Context, w Worker) error {
r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size))
if err != nil {
@ -227,19 +215,19 @@ func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storifac
}
}
func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) {
func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, storiface.FTUnsealed, storiface.FTNone); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
returnErr = xerrors.Errorf("acquiring read sector lock: %w", err)
return
}
// passing 0 spt because we only need it when allowFetch is true
best, err := m.index.StorageFindSector(ctx, sector, storiface.FTUnsealed, 0, false)
best, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
if err != nil {
returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
return
@ -249,7 +237,7 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect
if foundUnsealed { // append to existing
// There is unsealed sector, see if we can read from it
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
m.readPiece(sink, sector, offset, size, &readOk))
@ -262,7 +250,7 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect
return
}
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size)
if err != nil {
return err
@ -273,7 +261,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
}
@ -302,7 +290,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return err
}
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
m.readPiece(sink, sector, offset, size, &readOk))
@ -317,16 +305,16 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return nil
}
func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error {
log.Warnf("stub NewSector")
return nil
}
func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTUnsealed); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTUnsealed); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
@ -335,7 +323,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
if len(existingPieces) == 0 { // new
selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing)
} else { // use existing
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
}
var out abi.PieceInfo
@ -353,7 +341,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return out, err
}
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
func (m *Manager) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -380,7 +368,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return out, waitErr
}
if err := m.index.StorageLock(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache); err != nil {
return nil, xerrors.Errorf("acquiring sector lock: %w", err)
}
@ -404,7 +392,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return out, waitErr
}
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) {
func (m *Manager) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -431,11 +419,11 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
return out, waitErr
}
if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed, storiface.FTCache); err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true)
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, true)
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, w, wk)(w.SealPreCommit2(ctx, sector, phase1Out))
@ -453,7 +441,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
return out, waitErr
}
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
func (m *Manager) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -480,14 +468,14 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
return out, waitErr
}
if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed, storiface.FTCache); err != nil {
return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
// NOTE: We set allowFetch to false in so that we always execute on a worker
// with direct access to the data. We want to do that because this step is
// generally very cheap / fast, and transferring data is not worth the effort
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, w, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
@ -505,7 +493,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
return out, waitErr
}
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
func (m *Manager) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out)
if err != nil {
return storage.Proof{}, xerrors.Errorf("getWork: %w", err)
@ -548,17 +536,17 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
return out, waitErr
}
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
unsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector, storiface.FTUnsealed, 0, false)
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
if err != nil {
return xerrors.Errorf("finding unsealed sector: %w", err)
}
@ -568,7 +556,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
}
}
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
@ -601,28 +589,28 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
return nil
}
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
log.Warnw("ReleaseUnsealed todo")
return nil
}
func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
var err error
if rerr := m.storage.Remove(ctx, sector, storiface.FTSealed, true); rerr != nil {
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTSealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr))
}
if rerr := m.storage.Remove(ctx, sector, storiface.FTCache, true); rerr != nil {
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTCache, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr))
}
if rerr := m.storage.Remove(ctx, sector, storiface.FTUnsealed, true); rerr != nil {
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
}

View File

@ -20,7 +20,7 @@ func (q requestQueue) Less(i, j int) bool {
return q[i].taskType.Less(q[j].taskType)
}
return q[i].sector.Number < q[j].sector.Number // optimize minerActor.NewSectors bitfield
return q[i].sector.ID.Number < q[j].sector.ID.Number // optimize minerActor.NewSectors bitfield
}
func (q requestQueue) Swap(i, j int) {

View File

@ -314,4 +314,13 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
func init() {
ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately
ResourceTable[sealtasks.TTReadUnsealed] = ResourceTable[sealtasks.TTFetch]
// V1_1 is the same as V1
for _, m := range ResourceTable {
m[abi.RegisteredSealProof_StackedDrg2KiBV1_1] = m[abi.RegisteredSealProof_StackedDrg2KiBV1]
m[abi.RegisteredSealProof_StackedDrg8MiBV1_1] = m[abi.RegisteredSealProof_StackedDrg8MiBV1]
m[abi.RegisteredSealProof_StackedDrg512MiBV1_1] = m[abi.RegisteredSealProof_StackedDrg512MiBV1]
m[abi.RegisteredSealProof_StackedDrg32GiBV1_1] = m[abi.RegisteredSealProof_StackedDrg32GiBV1]
m[abi.RegisteredSealProof_StackedDrg64GiBV1_1] = m[abi.RegisteredSealProof_StackedDrg64GiBV1]
}
}

View File

@ -5,7 +5,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -14,23 +14,17 @@ import (
type readonlyProvider struct {
index stores.SectorIndex
stor *stores.Local
spt abi.RegisteredSealProof
}
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
if allocate != storiface.FTNone {
return storiface.SectorPaths{}, nil, xerrors.New("read-only storage")
}
ssize, err := l.spt.SectorSize()
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("failed to determine sector size: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
// use TryLock to avoid blocking
locked, err := l.index.StorageTryLock(ctx, id, existing, storiface.FTNone)
locked, err := l.index.StorageTryLock(ctx, id.ID, existing, storiface.FTNone)
if err != nil {
cancel()
return storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
@ -40,7 +34,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
return storiface.SectorPaths{}, nil, xerrors.Errorf("failed to acquire sector lock")
}
p, _, err := l.stor.AcquireSector(ctx, id, ssize, existing, allocate, sealing, storiface.AcquireMove)
p, _, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing, storiface.AcquireMove)
return p, cancel, err
}

View File

@ -11,6 +11,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -51,8 +52,6 @@ type WorkerSelector interface {
}
type scheduler struct {
spt abi.RegisteredSealProof
workersLk sync.RWMutex
workers map[WorkerID]*workerHandle
@ -122,7 +121,7 @@ type activeResources struct {
}
type workerRequest struct {
sector abi.SectorID
sector storage.SectorRef
taskType sealtasks.TaskType
priority int // larger values more important
sel WorkerSelector
@ -143,10 +142,8 @@ type workerResponse struct {
err error
}
func newScheduler(spt abi.RegisteredSealProof) *scheduler {
func newScheduler() *scheduler {
return &scheduler{
spt: spt,
workers: map[WorkerID]*workerHandle{},
schedule: make(chan *workerRequest),
@ -168,7 +165,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
}
}
func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
func (sh *scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
ret := make(chan workerResponse)
select {
@ -315,7 +312,7 @@ func (sh *scheduler) diag() SchedDiagInfo {
task := (*sh.schedQueue)[sqi]
out.Requests = append(out.Requests, SchedDiagRequestInfo{
Sector: task.sector,
Sector: task.sector.ID,
TaskType: task.taskType,
Priority: task.priority,
})
@ -378,7 +375,7 @@ func (sh *scheduler) trySched() {
}()
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][sh.spt]
needRes := ResourceTable[task.taskType][task.sector.ProofType]
task.indexHeap = sqi
for wnd, windowRequest := range sh.openWindows {
@ -400,7 +397,7 @@ func (sh *scheduler) trySched() {
}
rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout)
ok, err := task.sel.Ok(rpcCtx, task.taskType, sh.spt, worker)
ok, err := task.sel.Ok(rpcCtx, task.taskType, task.sector.ProofType, worker)
cancel()
if err != nil {
log.Errorf("trySched(1) req.sel.Ok error: %+v", err)
@ -456,21 +453,21 @@ func (sh *scheduler) trySched() {
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][sh.spt]
needRes := ResourceTable[task.taskType][task.sector.ProofType]
selectedWindow := -1
for _, wnd := range acceptableWindows[task.indexHeap] {
wid := sh.openWindows[wnd].worker
wr := sh.workers[wid].info.Resources
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) {
continue
}
log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.Number, task.taskType, wnd)
log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd)
windows[wnd].allocated.add(wr, needRes)
// TODO: We probably want to re-sort acceptableWindows here based on new

View File

@ -294,7 +294,7 @@ func (sw *schedWorker) workerCompactWindows() {
var moved []int
for ti, todo := range window.todo {
needRes := ResourceTable[todo.taskType][sw.sched.spt]
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info.Resources) {
continue
}
@ -350,7 +350,7 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.todo {
needRes := ResourceTable[todo.taskType][sw.sched.spt]
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info.Resources) {
tidx = t
break
@ -364,7 +364,7 @@ assignLoop:
todo := firstWindow.todo[tidx]
log.Debugf("assign worker sector %d", todo.sector.Number)
log.Debugf("assign worker sector %d", todo.sector.ID.Number)
err := sw.startProcessingTask(sw.taskDone, todo)
if err != nil {
@ -389,7 +389,7 @@ assignLoop:
func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
w, sh := sw.worker, sw.sched
needRes := ResourceTable[req.taskType][sh.spt]
needRes := ResourceTable[req.taskType][req.sector.ProofType]
w.lk.Lock()
w.preparing.add(w.info.Resources, needRes)

View File

@ -46,7 +46,7 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
for _, request := range window.todo {
out[uuid.UUID(id)] = append(out[uuid.UUID(id)], storiface.WorkerJob{
ID: storiface.UndefCall,
Sector: request.sector,
Sector: request.sector.ID,
Task: request.taskType,
RunWait: wi + 1,
Start: request.start,

View File

@ -2,6 +2,7 @@ package stores
import (
"encoding/json"
"github.com/filecoin-project/specs-storage/storage"
"io"
"net/http"
"os"
@ -73,7 +74,12 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
// The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything
paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
si := storage.SectorRef{
ID: id,
ProofType: 0,
}
paths, _, err := handler.Local.AcquireSector(r.Context(), si, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)

View File

@ -2,15 +2,16 @@ package stores
import (
"context"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool) error
// like remove, but doesn't remove the primary sector copy, nor the last
@ -18,7 +19,7 @@ type Store interface {
RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error
// move sectors into storage
MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, types storiface.SectorFileType) error
MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error
FsStat(ctx context.Context, id ID) (fsutil.FsStat, error)
}

View File

@ -14,6 +14,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -325,7 +326,12 @@ func (st *Local) reportStorage(ctx context.Context) {
}
}
func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, ssize abi.SectorSize, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
func (st *Local) Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
ssize, err := sid.ProofType.SectorSize()
if err != nil {
return nil, err
}
st.localLk.Lock()
done := func() {}
@ -375,11 +381,16 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, ssize abi.Sector
return done, nil
}
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi.SectorSize, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
func (st *Local) AcquireSector(ctx context.Context, sid storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}
ssize, err := sid.ProofType.SectorSize()
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, err
}
st.localLk.RLock()
defer st.localLk.RUnlock()
@ -391,7 +402,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi.
continue
}
si, err := st.index.StorageFindSector(ctx, sid, fileType, ssize, false)
si, err := st.index.StorageFindSector(ctx, sid.ID, fileType, ssize, false)
if err != nil {
log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err)
continue
@ -407,7 +418,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi.
continue
}
spath := p.sectorPath(sid, fileType)
spath := p.sectorPath(sid.ID, fileType)
storiface.SetPathByType(&out, fileType, spath)
storiface.SetPathByType(&storageIDs, fileType, string(info.ID))
@ -449,7 +460,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi.
// TODO: Check free space
best = p.sectorPath(sid, fileType)
best = p.sectorPath(sid.ID, fileType)
bestID = si.ID
break
}
@ -578,13 +589,13 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ storifa
return nil
}
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, types storiface.SectorFileType) error {
dest, destIds, err := st.AcquireSector(ctx, s, ssize, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove)
func (st *Local) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
dest, destIds, err := st.AcquireSector(ctx, s, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return xerrors.Errorf("acquire dest storage: %w", err)
}
src, srcIds, err := st.AcquireSector(ctx, s, ssize, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
src, srcIds, err := st.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return xerrors.Errorf("acquire src storage: %w", err)
}
@ -616,7 +627,7 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.Sect
log.Debugf("moving %v(%d) to storage: %s(se:%t; st:%t) -> %s(se:%t; st:%t)", s, fileType, sst.ID, sst.CanSeal, sst.CanStore, dst.ID, dst.CanSeal, dst.CanStore)
if err := st.index.StorageDropSector(ctx, ID(storiface.PathByType(srcIds, fileType)), s, fileType); err != nil {
if err := st.index.StorageDropSector(ctx, ID(storiface.PathByType(srcIds, fileType)), s.ID, fileType); err != nil {
return xerrors.Errorf("dropping source sector from index: %w", err)
}
@ -625,7 +636,7 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.Sect
return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err)
}
if err := st.index.StorageDeclareSector(ctx, ID(storiface.PathByType(destIds, fileType)), s, fileType, true); err != nil {
if err := st.index.StorageDeclareSector(ctx, ID(storiface.PathByType(destIds, fileType)), s.ID, fileType, true); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(storiface.PathByType(destIds, fileType)), err)
}
}

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/tarutil"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/hashicorp/go-multierror"
files "github.com/ipfs/go-ipfs-files"
@ -58,7 +59,7 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int
}
}
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}
@ -66,9 +67,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se
for {
r.fetchLk.Lock()
c, locked := r.fetching[s]
c, locked := r.fetching[s.ID]
if !locked {
r.fetching[s] = make(chan struct{})
r.fetching[s.ID] = make(chan struct{})
r.fetchLk.Unlock()
break
}
@ -85,12 +86,12 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se
defer func() {
r.fetchLk.Lock()
close(r.fetching[s])
delete(r.fetching, s)
close(r.fetching[s.ID])
delete(r.fetching, s.ID)
r.fetchLk.Unlock()
}()
paths, stores, err := r.local.AcquireSector(ctx, s, ssize, existing, allocate, pathType, op)
paths, stores, err := r.local.AcquireSector(ctx, s, existing, allocate, pathType, op)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
}
@ -106,7 +107,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se
}
}
apaths, ids, err := r.local.AcquireSector(ctx, s, ssize, storiface.FTNone, toFetch, pathType, op)
apaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
}
@ -116,7 +117,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se
odt = storiface.FsOverheadFinalized
}
releaseStorage, err := r.local.Reserve(ctx, s, ssize, toFetch, ids, odt)
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
}
@ -134,7 +135,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se
dest := storiface.PathByType(apaths, fileType)
storageID := storiface.PathByType(ids, fileType)
url, err := r.acquireFromRemote(ctx, s, fileType, dest)
url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, err
}
@ -142,7 +143,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se
storiface.SetPathByType(&paths, fileType, dest)
storiface.SetPathByType(&stores, fileType, storageID)
if err := r.index.StorageDeclareSector(ctx, ID(storageID), s, fileType, op == storiface.AcquireMove); err != nil {
if err := r.index.StorageDeclareSector(ctx, ID(storageID), s.ID, fileType, op == storiface.AcquireMove); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
continue
}
@ -281,14 +282,14 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
}
}
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, types storiface.SectorFileType) error {
func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
// Make sure we have the data local
_, _, err := r.AcquireSector(ctx, s, ssize, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
_, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return xerrors.Errorf("acquire src storage (remote): %w", err)
}
return r.local.MoveStorage(ctx, s, ssize, types)
return r.local.MoveStorage(ctx, s, types)
}
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType, force bool) error {

View File

@ -77,17 +77,17 @@ var _ fmt.Stringer = &CallID{}
var UndefCall CallID
type WorkerCalls interface {
AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (CallID, error)
SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error)
SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (CallID, error)
FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (CallID, error)
ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (CallID, error)
MoveStorage(ctx context.Context, sector abi.SectorID, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
ReadPiece(context.Context, io.Writer, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize) (CallID, error)
Fetch(context.Context, abi.SectorID, SectorFileType, PathType, AcquireMode) (CallID, error)
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error)
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error)
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error)
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error)
MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
ReadPiece(context.Context, io.Writer, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize) (CallID, error)
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
}
type WorkerReturn interface {

View File

@ -20,7 +20,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
storage2 "github.com/filecoin-project/specs-storage/storage"
storage "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
@ -31,7 +31,6 @@ import (
var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSealed, storiface.FTCache}
type WorkerConfig struct {
SealProof abi.RegisteredSealProof
TaskTypes []sealtasks.TaskType
NoSwap bool
}
@ -40,7 +39,6 @@ type WorkerConfig struct {
type ExecutorFunc func() (ffiwrapper.Storage, error)
type LocalWorker struct {
scfg *ffiwrapper.Config
storage stores.Store
localStore *stores.Local
sindex stores.SectorIndex
@ -64,9 +62,6 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
}
w := &LocalWorker{
scfg: &ffiwrapper.Config{
SealProofType: wcfg.SealProof,
},
storage: store,
localStore: local,
sindex: sindex,
@ -119,18 +114,13 @@ type localWorkerPathProvider struct {
op storiface.AcquireMode
}
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
ssize, err := l.w.scfg.SealProofType.SectorSize()
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing, l.op)
if err != nil {
return storiface.SectorPaths{}, nil, err
}
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, ssize, existing, allocate, sealing, l.op)
if err != nil {
return storiface.SectorPaths{}, nil, err
}
releaseStorage, err := l.w.localStore.Reserve(ctx, sector, ssize, allocate, storageIDs, storiface.FSOverheadSeal)
releaseStorage, err := l.w.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
}
@ -147,7 +137,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
sid := storiface.PathByType(storageIDs, fileType)
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType, l.op == storiface.AcquireMove); err != nil {
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector.ID, fileType, l.op == storiface.AcquireMove); err != nil {
log.Errorf("declare sector error: %+v", err)
}
}
@ -155,7 +145,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
}
func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg)
return ffiwrapper.New(&localWorkerPathProvider{w: l})
}
type ReturnType string
@ -222,9 +212,9 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
}
func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
ci := storiface.CallID{
Sector: sector,
Sector: sector.ID,
ID: uuid.New(),
}
@ -297,7 +287,7 @@ func errstr(err error) string {
return ""
}
func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error {
sb, err := l.executor()
if err != nil {
return err
@ -306,7 +296,7 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error
return sb.NewSector(ctx, sector)
}
func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) {
func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
@ -317,7 +307,7 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
})
}
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
func (l *LocalWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return l.asyncCall(ctx, sector, Fetch, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
_, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype)
if err == nil {
@ -328,16 +318,16 @@ func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType s
})
}
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return l.asyncCall(ctx, sector, SealPreCommit1, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
{
// cleanup previous failed attempts if they exist
if err := l.storage.Remove(ctx, sector, storiface.FTSealed, true); err != nil {
if err := l.storage.Remove(ctx, sector.ID, storiface.FTSealed, true); err != nil {
return nil, xerrors.Errorf("cleaning up sealed data: %w", err)
}
if err := l.storage.Remove(ctx, sector, storiface.FTCache, true); err != nil {
if err := l.storage.Remove(ctx, sector.ID, storiface.FTCache, true); err != nil {
return nil, xerrors.Errorf("cleaning up cache data: %w", err)
}
}
@ -351,7 +341,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, t
})
}
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (storiface.CallID, error) {
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
@ -362,7 +352,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, p
})
}
func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (storiface.CallID, error) {
func (l *LocalWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
@ -373,7 +363,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, tick
})
}
func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (storiface.CallID, error) {
func (l *LocalWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
@ -384,7 +374,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas
})
}
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage2.Range) (storiface.CallID, error) {
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
@ -396,7 +386,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, k
}
if len(keepUnsealed) == 0 {
if err := l.storage.Remove(ctx, sector, storiface.FTUnsealed, true); err != nil {
if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true); err != nil {
return nil, xerrors.Errorf("removing unsealed data: %w", err)
}
}
@ -405,7 +395,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, k
})
}
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage2.Range) (storiface.CallID, error) {
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) {
return storiface.UndefCall, xerrors.Errorf("implement me")
}
@ -425,18 +415,13 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
return err
}
func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
return l.asyncCall(ctx, sector, MoveStorage, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
ssize, err := l.scfg.SealProofType.SectorSize()
if err != nil {
return nil, err
}
return nil, l.storage.MoveStorage(ctx, sector, ssize, types)
return nil, l.storage.MoveStorage(ctx, sector, types)
})
}
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
@ -447,11 +432,11 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde
return nil, xerrors.Errorf("unsealing sector: %w", err)
}
if err = l.storage.RemoveCopies(ctx, sector, storiface.FTSealed); err != nil {
if err = l.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil {
return nil, xerrors.Errorf("removing source data: %w", err)
}
if err = l.storage.RemoveCopies(ctx, sector, storiface.FTCache); err != nil {
if err = l.storage.RemoveCopies(ctx, sector.ID, storiface.FTCache); err != nil {
return nil, xerrors.Errorf("removing source data: %w", err)
}
@ -459,7 +444,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde
})
}
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err

View File

@ -42,7 +42,7 @@ func (wt *workTracker) onDone(callID storiface.CallID) {
delete(wt.running, callID)
}
func (wt *workTracker) track(wid WorkerID, sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) {
if err != nil {
return callID, err
@ -60,7 +60,7 @@ func (wt *workTracker) track(wid WorkerID, sid abi.SectorID, task sealtasks.Task
wt.running[callID] = trackedWork{
job: storiface.WorkerJob{
ID: callID,
Sector: sid,
Sector: sid.ID,
Task: task,
Start: time.Now(),
},
@ -99,39 +99,39 @@ type trackedWorker struct {
tracker *workTracker
}
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
}
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
}
func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
}
func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
}
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
}
func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
}
func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(t.wid, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
}
func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(t.wid, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
}
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return t.tracker.track(t.wid, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
}

View File

@ -14,7 +14,6 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
)
@ -166,23 +165,14 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
return &ErrBadSeed{xerrors.Errorf("seed has changed")}
}
ss, err := m.api.StateMinerSectorSize(ctx, m.maddr, tok)
if err != nil {
return &ErrApi{err}
}
spt, err := ffiwrapper.SealProofTypeFromSectorSize(ss)
if err != nil {
return err
}
if *si.CommR != pci.Info.SealedCID {
log.Warn("on-chain sealed CID doesn't match!")
}
ok, err := m.verif.VerifySeal(proof2.SealVerifyInfo{
SectorID: m.minerSector(si.SectorNumber),
SectorID: m.minerSectorID(si.SectorNumber),
SealedCID: pci.Info.SealedCID,
SealProof: spt,
SealProof: pci.Info.SealProof,
Proof: proof,
Randomness: si.TicketValue,
InteractiveRandomness: si.SeedValue,

View File

@ -267,7 +267,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
*/
m.stats.updateSector(m.minerSector(state.SectorNumber), state.State)
m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
switch state.State {
// Happy path
@ -394,6 +394,15 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
return xerrors.Errorf("getting the sealing delay: %w", err)
}
spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof: %w", err)
}
ssize, err := spt.SectorSize()
if err != nil {
return err
}
m.unsealedInfoMap.lk.Lock()
defer m.unsealedInfoMap.lk.Unlock()
for _, sector := range trackedSectors {
@ -408,7 +417,9 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
// something's funky here, but probably safe to move on
log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber)
} else {
ui := UnsealedSectorInfo{}
ui := UnsealedSectorInfo{
ssize: ssize,
}
for _, p := range sector.Pieces {
if p.DealInfo != nil {
ui.numDeals++

View File

@ -6,9 +6,10 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
)
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
func (m *Sealing) pledgeSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
}
@ -47,21 +48,31 @@ func (m *Sealing) PledgeSector() error {
// this, as we run everything here async, and it's cancelled when the
// command exits
size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
spt, err := m.currentSealProof(ctx)
if err != nil {
log.Errorf("%+v", err)
return
}
size, err := spt.SectorSize()
if err != nil {
log.Errorf("%+v", err)
return
}
sid, err := m.sc.Next()
if err != nil {
log.Errorf("%+v", err)
return
}
sectorID := m.minerSector(sid)
sectorID := m.minerSector(spt, sid)
err = m.sealer.NewSector(ctx, sectorID)
if err != nil {
log.Errorf("%+v", err)
return
}
pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, size)
pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, abi.PaddedPieceSize(size).Unpadded())
if err != nil {
log.Errorf("%+v", err)
return
@ -75,7 +86,7 @@ func (m *Sealing) PledgeSector() error {
}
}
if err := m.newSectorCC(sid, ps); err != nil {
if err := m.newSectorCC(ctx, sid, ps); err != nil {
log.Errorf("%+v", err)
return
}

View File

@ -8,13 +8,15 @@ import (
"sync"
"time"
"github.com/filecoin-project/go-state-types/network"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-address"
padreader "github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
@ -53,6 +55,7 @@ type SealingAPI interface {
StateMinerWorkerAddress(ctx context.Context, maddr address.Address, tok TipSetToken) (address.Address, error)
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error)
StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error)
StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error)
@ -105,6 +108,7 @@ type UnsealedSectorInfo struct {
// stored should always equal sum of pieceSizes.Padded()
stored abi.PaddedPieceSize
pieceSizes []abi.UnpaddedPieceSize
ssize abi.SectorSize
}
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee) *Sealing {
@ -151,19 +155,30 @@ func (m *Sealing) Run(ctx context.Context) error {
func (m *Sealing) Stop(ctx context.Context) error {
return m.sectors.Stop(ctx)
}
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid)
if (padreader.PaddedSize(uint64(size))) != size {
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
if size > abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() {
sp, err := m.currentSealProof(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
}
ssize, err := sp.SectorSize()
if err != nil {
return 0, 0, err
}
if size > abi.PaddedPieceSize(ssize).Unpadded() {
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
}
m.unsealedInfoMap.lk.Lock()
sid, pads, err := m.getSectorAndPadding(size)
sid, pads, err := m.getSectorAndPadding(ctx, size)
if err != nil {
m.unsealedInfoMap.lk.Unlock()
return 0, 0, xerrors.Errorf("getting available sector: %w", err)
@ -185,7 +200,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
}
startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(m.sealer.SectorSize())
startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(ssize)
m.unsealedInfoMap.lk.Unlock()
@ -201,7 +216,16 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
// Caller should hold m.unsealedInfoMap.lk
func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
log.Infof("Adding piece to sector %d", sectorID)
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
sp, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof type: %w", err)
}
ssize, err := sp.SectorSize()
if err != nil {
return err
}
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sp, sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
if err != nil {
return xerrors.Errorf("writing piece: %w", err)
}
@ -224,6 +248,7 @@ func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size
numDeals: num,
stored: ui.stored + piece.Piece.Size,
pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()),
ssize: ssize,
}
return nil
@ -257,16 +282,16 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
}
// Caller should hold m.unsealedInfoMap.lk
func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
ss := abi.PaddedPieceSize(m.sealer.SectorSize())
func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
for k, v := range m.unsealedInfoMap.infos {
pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded())
if v.stored+size.Padded()+padLength <= ss {
if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) {
return k, pads, nil
}
}
ns, err := m.newDealSector()
ns, ssize, err := m.newDealSector(ctx)
if err != nil {
return 0, nil, err
}
@ -275,23 +300,24 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum
numDeals: 0,
stored: 0,
pieceSizes: nil,
ssize: ssize,
}
return ns, nil, nil
}
// newDealSector creates a new sector for deal storage
func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) {
// First make sure we don't have too many 'open' sectors
cfg, err := m.getConfig()
if err != nil {
return 0, xerrors.Errorf("getting config: %w", err)
return 0, 0, xerrors.Errorf("getting config: %w", err)
}
if cfg.MaxSealingSectorsForDeals > 0 {
if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
return 0, ErrTooManySectorsSealing
return 0, 0, ErrTooManySectorsSealing
}
}
@ -338,36 +364,36 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
}
}
spt, err := m.currentSealProof(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
}
// Now actually create a new sector
sid, err := m.sc.Next()
if err != nil {
return 0, xerrors.Errorf("getting sector number: %w", err)
return 0, 0, xerrors.Errorf("getting sector number: %w", err)
}
err = m.sealer.NewSector(context.TODO(), m.minerSector(sid))
err = m.sealer.NewSector(context.TODO(), m.minerSector(spt, sid))
if err != nil {
return 0, xerrors.Errorf("initializing sector: %w", err)
}
rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
return 0, xerrors.Errorf("bad sector size: %w", err)
return 0, 0, xerrors.Errorf("initializing sector: %w", err)
}
log.Infof("Creating sector %d", sid)
err = m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: rt,
SectorType: spt,
})
if err != nil {
return 0, xerrors.Errorf("starting the sector fsm: %w", err)
return 0, 0, xerrors.Errorf("starting the sector fsm: %w", err)
}
cf, err := m.getConfig()
if err != nil {
return 0, xerrors.Errorf("getting the sealing delay: %w", err)
return 0, 0, xerrors.Errorf("getting the sealing delay: %w", err)
}
if cf.WaitDealsDelay > 0 {
@ -380,25 +406,42 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
}()
}
return sid, nil
ssize, err := spt.SectorSize()
return sid, ssize, err
}
// newSectorCC accepts a slice of pieces with no deal (junk data)
func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error {
rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize())
func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error {
spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("bad sector size: %w", err)
return xerrors.Errorf("getting current seal proof type: %w", err)
}
log.Infof("Creating CC sector %d", sid)
return m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
Pieces: pieces,
SectorType: rt,
SectorType: spt,
})
}
func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID {
func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil)
if err != nil {
return 0, err
}
return mi.SealProofType, nil
}
func (m *Sealing) minerSector(spt abi.RegisteredSealProof, num abi.SectorNumber) storage.SectorRef {
return storage.SectorRef{
ID: m.minerSectorID(num),
ProofType: spt,
}
}
func (m *Sealing) minerSectorID(num abi.SectorNumber) abi.SectorID {
mid, err := address.IDFromAddress(m.maddr)
if err != nil {
panic(err)

View File

@ -32,7 +32,7 @@ func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInf
}
func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error {
if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorNumber)); err != nil {
if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
return ctx.Send(SectorRemoveFailed{err})
}

View File

@ -31,7 +31,12 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
allocated += piece.Piece.Size.Unpadded()
}
ubytes := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
ssize, err := sector.SectorType.SectorSize()
if err != nil {
return err
}
ubytes := abi.PaddedPieceSize(ssize).Unpadded()
if allocated > ubytes {
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
@ -46,7 +51,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber)
}
fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
if err != nil {
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
}
@ -148,7 +153,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
// process has just restarted and the worker had the result ready)
}
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
}
@ -159,7 +164,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
}
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.PreCommit1Out)
cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out)
if err != nil {
return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
}
@ -386,12 +391,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
Unsealed: *sector.CommD,
Sealed: *sector.CommR,
}
c2in, err := m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids)
c2in, err := m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)})
}
proof, err := m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), c2in)
proof, err := m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
}
@ -492,7 +497,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.keepUnsealedRanges(false)); err != nil {
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(false)); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
@ -503,7 +508,7 @@ func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInf
// TODO: track sector health / expiration
log.Infof("Proving sector %d", sector.SectorNumber)
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorNumber), sector.keepUnsealedRanges(true)); err != nil {
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(true)); err != nil {
log.Error(err)
}

2
go.mod
View File

@ -41,7 +41,7 @@ require (
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/specs-actors v0.9.13
github.com/filecoin-project/specs-actors/v2 v2.3.0
github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796
github.com/filecoin-project/specs-storage v0.1.1-0.20201104202311-4d056083cd27
github.com/filecoin-project/test-vectors/schema v0.0.5
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-kit/kit v0.10.0

2
go.sum
View File

@ -293,6 +293,8 @@ github.com/filecoin-project/specs-actors/v2 v2.3.0 h1:V7lHeF2ylfFi84F4y80u5FE4Bp
github.com/filecoin-project/specs-actors/v2 v2.3.0/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y=
github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796 h1:dJsTPWpG2pcTeojO2pyn0c6l+x/3MZYCBgo/9d11JEk=
github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
github.com/filecoin-project/specs-storage v0.1.1-0.20201104202311-4d056083cd27 h1:GLr3UKAqSjFjm5TimjLw6rRXkiZ2gPkrPv6xWsYw66Y=
github.com/filecoin-project/specs-storage v0.1.1-0.20201104202311-4d056083cd27/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=

View File

@ -141,11 +141,6 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams)
return nil, xerrors.Errorf("failed getting miner's deadline info: %w", err)
}
rt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize)
if err != nil {
return nil, xerrors.Errorf("bad sector size: %w", err)
}
if uint64(params.Data.PieceSize.Padded()) > uint64(mi.SectorSize) {
return nil, xerrors.New("data doesn't fit in a sector")
}
@ -171,7 +166,7 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams)
EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart),
Price: params.EpochPrice,
Collateral: params.ProviderCollateral,
Rt: rt,
Rt: abi.RegisteredSealProof_StackedDrg32GiBV1_1, // all proof types have the same D tree
FastRetrieval: params.FastRetrieval,
VerifiedDeal: params.VerifiedDeal,
StoreID: storeID,
@ -647,7 +642,7 @@ func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Addre
func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {
// Hard-code the sector size to 32GiB, because:
// Hard-code the sector type to 32GiBV1_1, because:
// - pieceio.GeneratePieceCommitment requires a RegisteredSealProof
// - commP itself is sector-size independent, with rather low probability of that changing
// ( note how the final rust call is identical for every RegSP type )
@ -655,12 +650,7 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet
//
// IF/WHEN this changes in the future we will have to be able to calculate
// "old style" commP, and thus will need to introduce a version switch or similar
arbitrarySectorSize := abi.SectorSize(32 << 30)
rt, err := ffiwrapper.SealProofTypeFromSectorSize(arbitrarySectorSize)
if err != nil {
return nil, xerrors.Errorf("bad sector size: %w", err)
}
arbitraryProofType := abi.RegisteredSealProof_StackedDrg32GiBV1_1
rdr, err := os.Open(inpath)
if err != nil {
@ -673,7 +663,7 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet
return nil, err
}
commP, pieceSize, err := pieceio.GeneratePieceCommitment(rt, rdr, uint64(stat.Size()))
commP, pieceSize, err := pieceio.GeneratePieceCommitment(arbitraryProofType, rdr, uint64(stat.Size()))
if err != nil {
return nil, xerrors.Errorf("computing commP failed: %w", err)

View File

@ -125,11 +125,6 @@ func ProofsConfig(maddr dtypes.MinerAddress, fnapi lapi.FullNode) (*ffiwrapper.C
return nil, err
}
spt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize)
if err != nil {
return nil, xerrors.Errorf("bad sector size: %w", err)
}
sb := &ffiwrapper.Config{
SealProofType: spt,
}

View File

@ -214,12 +214,7 @@ func NewWinningPoStProver(api api.FullNode, prover storage.Prover, verifier ffiw
return nil, xerrors.Errorf("getting sector size: %w", err)
}
spt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize)
if err != nil {
return nil, err
}
wpt, err := spt.RegisteredWinningPoStProof()
wpt, err := mi.SealProofType.RegisteredWinningPoStProof()
if err != nil {
return nil, err
}