diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 08b3ba9a9..5b0ffcd6e 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -373,17 +373,17 @@ type WorkerStruct struct { Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"` Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"` - AddPiece func(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) `perm:"admin"` - SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) `perm:"admin"` - SealPreCommit2 func(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) `perm:"admin"` - SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) `perm:"admin"` - SealCommit2 func(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) `perm:"admin"` - FinalizeSector func(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) `perm:"admin"` - ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) `perm:"admin"` - MoveStorage func(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) `perm:"admin"` - UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) `perm:"admin"` - ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"` - Fetch func(context.Context, abi.SectorID, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"` + AddPiece func(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) `perm:"admin"` + SealPreCommit1 func(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) `perm:"admin"` + SealPreCommit2 func(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) `perm:"admin"` + SealCommit1 func(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) `perm:"admin"` + SealCommit2 func(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) `perm:"admin"` + FinalizeSector func(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) `perm:"admin"` + ReleaseUnsealed func(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) `perm:"admin"` + MoveStorage func(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) `perm:"admin"` + UnsealPiece func(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) `perm:"admin"` + ReadPiece func(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"` + Fetch func(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"` Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` @@ -1513,47 +1513,47 @@ func (w *WorkerStruct) Info(ctx context.Context) (storiface.WorkerInfo, error) { return w.Internal.Info(ctx) } -func (w *WorkerStruct) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { +func (w *WorkerStruct) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { return w.Internal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) } -func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { +func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces) } -func (w *WorkerStruct) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) { +func (w *WorkerStruct) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) { return w.Internal.SealPreCommit2(ctx, sector, pc1o) } -func (w *WorkerStruct) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { +func (w *WorkerStruct) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { return w.Internal.SealCommit1(ctx, sector, ticket, seed, pieces, cids) } -func (w *WorkerStruct) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) { +func (w *WorkerStruct) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) { return w.Internal.SealCommit2(ctx, sector, c1o) } -func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) { +func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { return w.Internal.FinalizeSector(ctx, sector, keepUnsealed) } -func (w *WorkerStruct) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) { +func (w *WorkerStruct) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) { return w.Internal.ReleaseUnsealed(ctx, sector, safeToFree) } -func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) { +func (w *WorkerStruct) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) { return w.Internal.MoveStorage(ctx, sector, types) } -func (w *WorkerStruct) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, c cid.Cid) (storiface.CallID, error) { +func (w *WorkerStruct) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, c cid.Cid) (storiface.CallID, error) { return w.Internal.UnsealPiece(ctx, sector, offset, size, ticket, c) } -func (w *WorkerStruct) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { +func (w *WorkerStruct) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { return w.Internal.ReadPiece(ctx, sink, sector, offset, size) } -func (w *WorkerStruct) Fetch(ctx context.Context, id abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { +func (w *WorkerStruct) Fetch(ctx context.Context, id storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { return w.Internal.Fetch(ctx, id, fileType, ptype, am) } diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 3eaa6359e..e82ca882f 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -90,7 +90,7 @@ func init() { addExample(&pid) addExample(bitfield.NewFromSet([]uint64{5})) - addExample(abi.RegisteredSealProof_StackedDrg32GiBV1) + addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1) addExample(abi.RegisteredPoStProof_StackedDrgWindow32GiBV1) addExample(abi.ChainEpoch(10101)) addExample(crypto.SigTypeBLS) diff --git a/build/params_mainnet.go b/build/params_mainnet.go index 9d5b643b7..895e0b30d 100644 --- a/build/params_mainnet.go +++ b/build/params_mainnet.go @@ -43,10 +43,6 @@ const UpgradeCalicoHeight = 999999 func init() { policy.SetConsensusMinerMinPower(abi.NewStoragePower(10 << 40)) - policy.SetSupportedProofTypes( - abi.RegisteredSealProof_StackedDrg32GiBV1, - abi.RegisteredSealProof_StackedDrg64GiBV1, - ) if os.Getenv("LOTUS_USE_TEST_ADDRESSES") != "1" { SetAddressNetwork(address.Mainnet) diff --git a/build/version.go b/build/version.go index 0e63b2a27..65eb5be63 100644 --- a/build/version.go +++ b/build/version.go @@ -84,8 +84,8 @@ func VersionForType(nodeType NodeType) (Version, error) { // semver versions of the rpc api exposed var ( FullAPIVersion = newVer(0, 17, 0) - MinerAPIVersion = newVer(0, 17, 0) - WorkerAPIVersion = newVer(0, 16, 0) + MinerAPIVersion = newVer(0, 18, 0) + WorkerAPIVersion = newVer(0, 17, 0) ) //nolint:varcheck,deadcode diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go index be83a8711..877f35397 100644 --- a/chain/gen/genesis/miners.go +++ b/chain/gen/genesis/miners.go @@ -23,8 +23,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" - builtin0 "github.com/filecoin-project/specs-actors/actors/builtin" miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" power0 "github.com/filecoin-project/specs-actors/actors/builtin/power" @@ -101,11 +99,12 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid i := i m := m - spt, err := ffiwrapper.SealProofTypeFromSectorSize(m.SectorSize) - if err != nil { - return cid.Undef, err + if len(m.Sectors) == 0 { + return cid.Undef, xerrors.Errorf("genesis miners must have at least 1 presealed sector") } + spt := m.Sectors[0].ProofType + { constructorParams := &power0.CreateMinerParams{ Owner: m.Worker, diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 19fb5f35b..fb0b91378 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -207,12 +207,7 @@ func GetSectorsForWinningPoSt(ctx context.Context, nv network.Version, pv ffiwra return nil, xerrors.Errorf("getting miner info: %w", err) } - spt, err := ffiwrapper.SealProofTypeFromSectorSize(info.SectorSize) - if err != nil { - return nil, xerrors.Errorf("getting seal proof type: %w", err) - } - - wpt, err := spt.RegisteredWinningPoStProof() + wpt, err := info.SealProofType.RegisteredWinningPoStProof() if err != nil { return nil, xerrors.Errorf("getting window proof type: %w", err) } @@ -252,7 +247,7 @@ func GetSectorsForWinningPoSt(ctx context.Context, nv network.Version, pv ffiwra out := make([]builtin.SectorInfo, len(sectors)) for i, sinfo := range sectors { out[i] = builtin.SectorInfo{ - SealProof: spt, + SealProof: sinfo.SealProof, SectorNumber: sinfo.SectorNumber, SealedCID: sinfo.SealedCID, } diff --git a/cmd/lotus-seed/seed/seed.go b/cmd/lotus-seed/seed/seed.go index ab8e5a52a..2dc10ed8b 100644 --- a/cmd/lotus-seed/seed/seed.go +++ b/cmd/lotus-seed/seed/seed.go @@ -42,10 +42,6 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect return nil, nil, err } - cfg := &ffiwrapper.Config{ - SealProofType: spt, - } - if err := os.MkdirAll(sbroot, 0775); err != nil { //nolint:gosec return nil, nil, err } @@ -56,7 +52,7 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect Root: sbroot, } - sb, err := ffiwrapper.New(sbfs, cfg) + sb, err := ffiwrapper.New(sbfs) if err != nil { return nil, nil, err } diff --git a/extern/sector-storage/faults.go b/extern/sector-storage/faults.go index c4e1364ad..6f0dcfa13 100644 --- a/extern/sector-storage/faults.go +++ b/extern/sector-storage/faults.go @@ -9,17 +9,18 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) // FaultTracker TODO: Track things more actively type FaultTracker interface { - CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []abi.SectorID) ([]abi.SectorID, error) + CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef) ([]abi.SectorID, error) } // CheckProvable returns unprovable sectors -func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []abi.SectorID) ([]abi.SectorID, error) { +func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef) ([]abi.SectorID, error) { var bad []abi.SectorID ssize, err := pp.SectorSize() @@ -33,27 +34,27 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, ctx, cancel := context.WithCancel(ctx) defer cancel() - locked, err := m.index.StorageTryLock(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone) + locked, err := m.index.StorageTryLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTNone) if err != nil { return xerrors.Errorf("acquiring sector lock: %w", err) } if !locked { log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector, "sealed") - bad = append(bad, sector) + bad = append(bad, sector.ID) return nil } - lp, _, err := m.localStore.AcquireSector(ctx, sector, ssize, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + lp, _, err := m.localStore.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err) - bad = append(bad, sector) + bad = append(bad, sector.ID) return nil } if lp.Sealed == "" || lp.Cache == "" { log.Warnw("CheckProvable Sector FAULT: cache an/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache) - bad = append(bad, sector) + bad = append(bad, sector.ID) return nil } @@ -69,14 +70,14 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, st, err := os.Stat(p) if err != nil { log.Warnw("CheckProvable Sector FAULT: sector file stat error", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "err", err) - bad = append(bad, sector) + bad = append(bad, sector.ID) return nil } if sz != 0 { if st.Size() != int64(ssize)*sz { log.Warnw("CheckProvable Sector FAULT: sector file is wrong size", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "size", st.Size(), "expectSize", int64(ssize)*sz) - bad = append(bad, sector) + bad = append(bad, sector.ID) return nil } } diff --git a/extern/sector-storage/ffiwrapper/basicfs/fs.go b/extern/sector-storage/ffiwrapper/basicfs/fs.go index 7ae303d9c..a833f728c 100644 --- a/extern/sector-storage/ffiwrapper/basicfs/fs.go +++ b/extern/sector-storage/ffiwrapper/basicfs/fs.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) @@ -23,7 +24,7 @@ type Provider struct { waitSector map[sectorFile]chan struct{} } -func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { +func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint return storiface.SectorPaths{}, nil, err } @@ -37,7 +38,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing done := func() {} out := storiface.SectorPaths{ - ID: id, + ID: id.ID, } for _, fileType := range storiface.PathTypes { @@ -49,10 +50,10 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing if b.waitSector == nil { b.waitSector = map[sectorFile]chan struct{}{} } - ch, found := b.waitSector[sectorFile{id, fileType}] + ch, found := b.waitSector[sectorFile{id.ID, fileType}] if !found { ch = make(chan struct{}, 1) - b.waitSector[sectorFile{id, fileType}] = ch + b.waitSector[sectorFile{id.ID, fileType}] = ch } b.lk.Unlock() @@ -63,7 +64,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing return storiface.SectorPaths{}, nil, ctx.Err() } - path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id)) + path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id.ID)) prevDone := done done = func() { diff --git a/extern/sector-storage/ffiwrapper/config.go b/extern/sector-storage/ffiwrapper/config.go index ca32b1191..a7669b8bd 100644 --- a/extern/sector-storage/ffiwrapper/config.go +++ b/extern/sector-storage/ffiwrapper/config.go @@ -6,17 +6,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" ) -type Config struct { - SealProofType abi.RegisteredSealProof - - _ struct{} // guard against nameless init -} - -func sizeFromConfig(cfg Config) (abi.SectorSize, error) { - return cfg.SealProofType.SectorSize() -} - -func SealProofTypeFromSectorSize(ssize abi.SectorSize) (abi.RegisteredSealProof, error) { +func SealProofTypeFromSectorSizea(ssize abi.SectorSize) (abi.RegisteredSealProof, error) { switch ssize { case 2 << 10: return abi.RegisteredSealProof_StackedDrg2KiBV1, nil diff --git a/extern/sector-storage/ffiwrapper/sealer.go b/extern/sector-storage/ffiwrapper/sealer.go index c1b558d9a..39cb8fa1b 100644 --- a/extern/sector-storage/ffiwrapper/sealer.go +++ b/extern/sector-storage/ffiwrapper/sealer.go @@ -1,16 +1,12 @@ package ffiwrapper import ( - "github.com/filecoin-project/go-state-types/abi" logging "github.com/ipfs/go-log/v2" ) var log = logging.Logger("ffiwrapper") type Sealer struct { - sealProofType abi.RegisteredSealProof - ssize abi.SectorSize // a function of sealProofType and postProofType - sectors SectorProvider stopping chan struct{} } @@ -18,11 +14,3 @@ type Sealer struct { func (sb *Sealer) Stop() { close(sb.stopping) } - -func (sb *Sealer) SectorSize() abi.SectorSize { - return sb.ssize -} - -func (sb *Sealer) SealProofType() abi.RegisteredSealProof { - return sb.sealProofType -} diff --git a/extern/sector-storage/ffiwrapper/sealer_cgo.go b/extern/sector-storage/ffiwrapper/sealer_cgo.go index b48b0bfd5..0887bc329 100644 --- a/extern/sector-storage/ffiwrapper/sealer_cgo.go +++ b/extern/sector-storage/ffiwrapper/sealer_cgo.go @@ -27,16 +27,8 @@ import ( var _ Storage = &Sealer{} -func New(sectors SectorProvider, cfg *Config) (*Sealer, error) { - sectorSize, err := sizeFromConfig(*cfg) - if err != nil { - return nil, err - } - +func New(sectors SectorProvider) (*Sealer, error) { sb := &Sealer{ - sealProofType: cfg.SealProofType, - ssize: sectorSize, - sectors: sectors, stopping: make(chan struct{}), @@ -45,25 +37,29 @@ func New(sectors SectorProvider, cfg *Config) (*Sealer, error) { return sb, nil } -func (sb *Sealer) NewSector(ctx context.Context, sector abi.SectorID) error { +func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error { // TODO: Allocate the sector here instead of in addpiece return nil } -func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) { +func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) { var offset abi.UnpaddedPieceSize for _, size := range existingPieceSizes { offset += size } - maxPieceSize := abi.PaddedPieceSize(sb.ssize) + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return abi.PieceInfo{}, err + } + + maxPieceSize := abi.PaddedPieceSize(ssize) if offset.Padded()+pieceSize.Padded() > maxPieceSize { return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset) } - var err error var done func() var stagedFile *partialFile @@ -135,7 +131,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie break } - c, err := sb.pieceCid(buf[:read]) + c, err := sb.pieceCid(sector.ProofType, buf[:read]) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("pieceCid error: %w", err) } @@ -162,7 +158,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return pieceCids[0], nil } - pieceCID, err := ffi.GenerateUnsealedCID(sb.sealProofType, pieceCids) + pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err) } @@ -178,13 +174,13 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie }, nil } -func (sb *Sealer) pieceCid(in []byte) (cid.Cid, error) { +func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, error) { prf, werr, err := ToReadableFile(bytes.NewReader(in), int64(len(in))) if err != nil { return cid.Undef, xerrors.Errorf("getting tee reader pipe: %w", err) } - pieceCID, err := ffi.GeneratePieceCIDFromFile(sb.sealProofType, prf, abi.UnpaddedPieceSize(len(in))) + pieceCID, err := ffi.GeneratePieceCIDFromFile(spt, prf, abi.UnpaddedPieceSize(len(in))) if err != nil { return cid.Undef, xerrors.Errorf("generating piece commitment: %w", err) } @@ -194,8 +190,12 @@ func (sb *Sealer) pieceCid(in []byte) (cid.Cid, error) { return pieceCID, werr() } -func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { - maxPieceSize := abi.PaddedPieceSize(sb.ssize) +func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return err + } + maxPieceSize := abi.PaddedPieceSize(ssize) // try finding existing unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage) @@ -317,12 +317,12 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s // // TODO: This may be possible to do in parallel - err = ffi.UnsealRange(sb.sealProofType, + err = ffi.UnsealRange(sector.ProofType, srcPaths.Cache, sealed, opw, - sector.Number, - sector.Miner, + sector.ID.Number, + sector.ID.Miner, randomness, commd, uint64(at.Unpadded()), @@ -356,14 +356,18 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s return nil } -func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { +func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { path, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage) if err != nil { return false, xerrors.Errorf("acquire unsealed sector path: %w", err) } defer done() - maxPieceSize := abi.PaddedPieceSize(sb.ssize) + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return false, err + } + maxPieceSize := abi.PaddedPieceSize(ssize) pf, err := openPartialFile(maxPieceSize, path.Unsealed) if err != nil { @@ -408,7 +412,7 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se return true, nil } -func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { +func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing) if err != nil { return nil, xerrors.Errorf("acquiring sector paths: %w", err) @@ -443,29 +447,33 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke for _, piece := range pieces { sum += piece.Size.Unpadded() } - ussize := abi.PaddedPieceSize(sb.ssize).Unpadded() + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return nil, err + } + ussize := abi.PaddedPieceSize(ssize).Unpadded() if sum != ussize { return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum)) } // TODO: context cancellation respect p1o, err := ffi.SealPreCommitPhase1( - sb.sealProofType, + sector.ProofType, paths.Cache, paths.Unsealed, paths.Sealed, - sector.Number, - sector.Miner, + sector.ID.Number, + sector.ID.Miner, ticket, pieces, ) if err != nil { - return nil, xerrors.Errorf("presealing sector %d (%s): %w", sector.Number, paths.Unsealed, err) + return nil, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err) } return p1o, nil } -func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) { +func (sb *Sealer) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) { paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing) if err != nil { return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err) @@ -474,7 +482,7 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase sealedCID, unsealedCID, err := ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed) if err != nil { - return storage.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.Number, paths.Unsealed, err) + return storage.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err) } return storage.SectorCids{ @@ -483,40 +491,45 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase }, nil } -func (sb *Sealer) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { +func (sb *Sealer) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing) if err != nil { return nil, xerrors.Errorf("acquire sector paths: %w", err) } defer done() output, err := ffi.SealCommitPhase1( - sb.sealProofType, + sector.ProofType, cids.Sealed, cids.Unsealed, paths.Cache, paths.Sealed, - sector.Number, - sector.Miner, + sector.ID.Number, + sector.ID.Miner, ticket, seed, pieces, ) if err != nil { log.Warn("StandaloneSealCommit error: ", err) - log.Warnf("num:%d tkt:%v seed:%v, pi:%v sealedCID:%v, unsealedCID:%v", sector.Number, ticket, seed, pieces, cids.Sealed, cids.Unsealed) + log.Warnf("num:%d tkt:%v seed:%v, pi:%v sealedCID:%v, unsealedCID:%v", sector.ID.Number, ticket, seed, pieces, cids.Sealed, cids.Unsealed) return nil, xerrors.Errorf("StandaloneSealCommit: %w", err) } return output, nil } -func (sb *Sealer) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (storage.Proof, error) { - return ffi.SealCommitPhase2(phase1Out, sector.Number, sector.Miner) +func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storage.Proof, error) { + return ffi.SealCommitPhase2(phase1Out, sector.ID.Number, sector.ID.Miner) } -func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { +func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error { + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return err + } + maxPieceSize := abi.PaddedPieceSize(ssize) + if len(keepUnsealed) > 0 { - maxPieceSize := abi.PaddedPieceSize(sb.ssize) sr := pieceRun(0, maxPieceSize) @@ -580,10 +593,10 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU } defer done() - return ffi.ClearCache(uint64(sb.ssize), paths.Cache) + return ffi.ClearCache(uint64(ssize), paths.Cache) } -func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { +func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error { // This call is meant to mark storage as 'freeable'. Given that unsealing is // very expensive, we don't remove data as soon as we can - instead we only // do that when we don't have free space for data that really needs it @@ -593,7 +606,7 @@ func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safe return xerrors.Errorf("not supported at this layer") } -func (sb *Sealer) Remove(ctx context.Context, sector abi.SectorID) error { +func (sb *Sealer) Remove(ctx context.Context, sector storage.SectorRef) error { return xerrors.Errorf("not supported at this layer") // happens in localworker } diff --git a/extern/sector-storage/ffiwrapper/types.go b/extern/sector-storage/ffiwrapper/types.go index b67f9c595..b7e96636a 100644 --- a/extern/sector-storage/ffiwrapper/types.go +++ b/extern/sector-storage/ffiwrapper/types.go @@ -29,8 +29,8 @@ type Storage interface { storage.Prover StorageSealer - UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error - ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) + UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error + ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) } type Verifier interface { @@ -44,7 +44,7 @@ type Verifier interface { type SectorProvider interface { // * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist // * returns an error when allocate is set, and existing isn't, and the sector exists - AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) + AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) } var _ SectorProvider = &basicfs.Provider{} diff --git a/extern/sector-storage/ffiwrapper/verifier_cgo.go b/extern/sector-storage/ffiwrapper/verifier_cgo.go index 9dab7103e..15e0e6ab3 100644 --- a/extern/sector-storage/ffiwrapper/verifier_cgo.go +++ b/extern/sector-storage/ffiwrapper/verifier_cgo.go @@ -11,6 +11,7 @@ import ( ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-state-types/abi" proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) @@ -74,12 +75,15 @@ func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorIn continue } - sid := abi.SectorID{Miner: mid, Number: s.SectorNumber} + sid := storage.SectorRef{ + ID: abi.SectorID{Miner: mid, Number: s.SectorNumber}, + ProofType: s.SealProof, + } paths, d, err := sb.sectors.AcquireSector(ctx, sid, storiface.FTCache|storiface.FTSealed, 0, storiface.PathStorage) if err != nil { - log.Warnw("failed to acquire sector, skipping", "sector", sid, "error", err) - skipped = append(skipped, sid) + log.Warnw("failed to acquire sector, skipping", "sector", sid.ID, "error", err) + skipped = append(skipped, sid.ID) continue } doneFuncs = append(doneFuncs, d) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index a3d6a4131..eec19c9f2 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -47,9 +47,7 @@ type Worker interface { } type SectorManager interface { - SectorSize() abi.SectorSize - - ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error + ReadPiece(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error ffiwrapper.StorageSealer storage.Prover @@ -61,8 +59,6 @@ type WorkerID uuid.UUID // worker session UUID var ClosedWorkerID = uuid.UUID{} type Manager struct { - scfg *ffiwrapper.Config - ls stores.LocalStorage storage *stores.Remote localStore *stores.Local @@ -105,13 +101,13 @@ type StorageAuth http.Header type WorkerStateStore *statestore.StateStore type ManagerStateStore *statestore.StateStore -func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { +func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { lstor, err := stores.NewLocal(ctx, ls, si, urls) if err != nil { return nil, err } - prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si, spt: cfg.SealProofType}, cfg) + prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}) if err != nil { return nil, xerrors.Errorf("creating prover instance: %w", err) } @@ -119,15 +115,13 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit) m := &Manager{ - scfg: cfg, - ls: ls, storage: stor, localStore: lstor, remoteHnd: &stores.FetchHandler{Local: lstor}, index: si, - sched: newScheduler(cfg.SealProofType), + sched: newScheduler(), Prover: prover, @@ -162,7 +156,6 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg } err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ - SealProof: cfg.SealProofType, TaskTypes: localTasks, }, stor, lstor, si, m, wss)) if err != nil { @@ -198,23 +191,18 @@ func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.remoteHnd.ServeHTTP(w, r) } -func (m *Manager) SectorSize() abi.SectorSize { - sz, _ := m.scfg.SealProofType.SectorSize() - return sz -} - func schedNop(context.Context, Worker) error { return nil } -func (m *Manager) schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error { +func (m *Manager) schedFetch(sector storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error { return func(ctx context.Context, worker Worker) error { _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) return err } } -func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error { +func (m *Manager) readPiece(sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error { return func(ctx context.Context, w Worker) error { r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size)) if err != nil { @@ -227,19 +215,19 @@ func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storifac } } -func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) { +func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) { // acquire a lock purely for reading unsealed sectors ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := m.index.StorageLock(ctx, sector, storiface.FTUnsealed, storiface.FTNone); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { returnErr = xerrors.Errorf("acquiring read sector lock: %w", err) return } // passing 0 spt because we only need it when allowFetch is true - best, err := m.index.StorageFindSector(ctx, sector, storiface.FTUnsealed, 0, false) + best, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) if err != nil { returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err) return @@ -249,7 +237,7 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect if foundUnsealed { // append to existing // There is unsealed sector, see if we can read from it - selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) + selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) @@ -262,7 +250,7 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect return } -func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { +func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size) if err != nil { return err @@ -273,7 +261,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := m.index.StorageLock(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil { return xerrors.Errorf("acquiring unseal sector lock: %w", err) } @@ -302,7 +290,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return err } - selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) + selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) @@ -317,16 +305,16 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return nil } -func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error { +func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error { log.Warnf("stub NewSector") return nil } -func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { +func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTUnsealed); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTUnsealed); err != nil { return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err) } @@ -335,7 +323,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie if len(existingPieces) == 0 { // new selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing) } else { // use existing - selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) + selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false) } var out abi.PieceInfo @@ -353,7 +341,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return out, err } -func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { +func (m *Manager) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -380,7 +368,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke return out, waitErr } - if err := m.index.StorageLock(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache); err != nil { return nil, xerrors.Errorf("acquiring sector lock: %w", err) } @@ -404,7 +392,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke return out, waitErr } -func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) { +func (m *Manager) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -431,11 +419,11 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase return out, waitErr } - if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed, storiface.FTCache); err != nil { return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err) } - selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true) + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, true) err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { err := m.startWork(ctx, w, wk)(w.SealPreCommit2(ctx, sector, phase1Out)) @@ -453,7 +441,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase return out, waitErr } -func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) { +func (m *Manager) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -480,14 +468,14 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a return out, waitErr } - if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed, storiface.FTCache); err != nil { return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err) } // NOTE: We set allowFetch to false in so that we always execute on a worker // with direct access to the data. We want to do that because this step is // generally very cheap / fast, and transferring data is not worth the effort - selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { err := m.startWork(ctx, w, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) @@ -505,7 +493,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a return out, waitErr } -func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) { +func (m *Manager) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (out storage.Proof, err error) { wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out) if err != nil { return storage.Proof{}, xerrors.Errorf("getWork: %w", err) @@ -548,17 +536,17 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou return out, waitErr } -func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { +func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil { return xerrors.Errorf("acquiring sector lock: %w", err) } unsealed := storiface.FTUnsealed { - unsealedStores, err := m.index.StorageFindSector(ctx, sector, storiface.FTUnsealed, 0, false) + unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) if err != nil { return xerrors.Errorf("finding unsealed sector: %w", err) } @@ -568,7 +556,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU } } - selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), @@ -601,28 +589,28 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU return nil } -func (m *Manager) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { +func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error { log.Warnw("ReleaseUnsealed todo") return nil } -func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error { +func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil { + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil { return xerrors.Errorf("acquiring sector lock: %w", err) } var err error - if rerr := m.storage.Remove(ctx, sector, storiface.FTSealed, true); rerr != nil { + if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTSealed, true); rerr != nil { err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr)) } - if rerr := m.storage.Remove(ctx, sector, storiface.FTCache, true); rerr != nil { + if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTCache, true); rerr != nil { err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr)) } - if rerr := m.storage.Remove(ctx, sector, storiface.FTUnsealed, true); rerr != nil { + if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true); rerr != nil { err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr)) } diff --git a/extern/sector-storage/request_queue.go b/extern/sector-storage/request_queue.go index 9247ce24a..925c44fa8 100644 --- a/extern/sector-storage/request_queue.go +++ b/extern/sector-storage/request_queue.go @@ -20,7 +20,7 @@ func (q requestQueue) Less(i, j int) bool { return q[i].taskType.Less(q[j].taskType) } - return q[i].sector.Number < q[j].sector.Number // optimize minerActor.NewSectors bitfield + return q[i].sector.ID.Number < q[j].sector.ID.Number // optimize minerActor.NewSectors bitfield } func (q requestQueue) Swap(i, j int) { diff --git a/extern/sector-storage/resources.go b/extern/sector-storage/resources.go index 6b531e82b..7da3e96a6 100644 --- a/extern/sector-storage/resources.go +++ b/extern/sector-storage/resources.go @@ -314,4 +314,13 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources func init() { ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately ResourceTable[sealtasks.TTReadUnsealed] = ResourceTable[sealtasks.TTFetch] + + // V1_1 is the same as V1 + for _, m := range ResourceTable { + m[abi.RegisteredSealProof_StackedDrg2KiBV1_1] = m[abi.RegisteredSealProof_StackedDrg2KiBV1] + m[abi.RegisteredSealProof_StackedDrg8MiBV1_1] = m[abi.RegisteredSealProof_StackedDrg8MiBV1] + m[abi.RegisteredSealProof_StackedDrg512MiBV1_1] = m[abi.RegisteredSealProof_StackedDrg512MiBV1] + m[abi.RegisteredSealProof_StackedDrg32GiBV1_1] = m[abi.RegisteredSealProof_StackedDrg32GiBV1] + m[abi.RegisteredSealProof_StackedDrg64GiBV1_1] = m[abi.RegisteredSealProof_StackedDrg64GiBV1] + } } diff --git a/extern/sector-storage/roprov.go b/extern/sector-storage/roprov.go index 7f051b549..ebc7610d7 100644 --- a/extern/sector-storage/roprov.go +++ b/extern/sector-storage/roprov.go @@ -5,7 +5,7 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" @@ -14,23 +14,17 @@ import ( type readonlyProvider struct { index stores.SectorIndex stor *stores.Local - spt abi.RegisteredSealProof } -func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) { +func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) { if allocate != storiface.FTNone { return storiface.SectorPaths{}, nil, xerrors.New("read-only storage") } - ssize, err := l.spt.SectorSize() - if err != nil { - return storiface.SectorPaths{}, nil, xerrors.Errorf("failed to determine sector size: %w", err) - } - ctx, cancel := context.WithCancel(ctx) // use TryLock to avoid blocking - locked, err := l.index.StorageTryLock(ctx, id, existing, storiface.FTNone) + locked, err := l.index.StorageTryLock(ctx, id.ID, existing, storiface.FTNone) if err != nil { cancel() return storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err) @@ -40,7 +34,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e return storiface.SectorPaths{}, nil, xerrors.Errorf("failed to acquire sector lock") } - p, _, err := l.stor.AcquireSector(ctx, id, ssize, existing, allocate, sealing, storiface.AcquireMove) + p, _, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing, storiface.AcquireMove) return p, cancel, err } diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 549a16a96..79761a65e 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -11,6 +11,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" @@ -51,8 +52,6 @@ type WorkerSelector interface { } type scheduler struct { - spt abi.RegisteredSealProof - workersLk sync.RWMutex workers map[WorkerID]*workerHandle @@ -122,7 +121,7 @@ type activeResources struct { } type workerRequest struct { - sector abi.SectorID + sector storage.SectorRef taskType sealtasks.TaskType priority int // larger values more important sel WorkerSelector @@ -143,10 +142,8 @@ type workerResponse struct { err error } -func newScheduler(spt abi.RegisteredSealProof) *scheduler { +func newScheduler() *scheduler { return &scheduler{ - spt: spt, - workers: map[WorkerID]*workerHandle{}, schedule: make(chan *workerRequest), @@ -168,7 +165,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { } } -func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error { +func (sh *scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error { ret := make(chan workerResponse) select { @@ -315,7 +312,7 @@ func (sh *scheduler) diag() SchedDiagInfo { task := (*sh.schedQueue)[sqi] out.Requests = append(out.Requests, SchedDiagRequestInfo{ - Sector: task.sector, + Sector: task.sector.ID, TaskType: task.taskType, Priority: task.priority, }) @@ -378,7 +375,7 @@ func (sh *scheduler) trySched() { }() task := (*sh.schedQueue)[sqi] - needRes := ResourceTable[task.taskType][sh.spt] + needRes := ResourceTable[task.taskType][task.sector.ProofType] task.indexHeap = sqi for wnd, windowRequest := range sh.openWindows { @@ -400,7 +397,7 @@ func (sh *scheduler) trySched() { } rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) - ok, err := task.sel.Ok(rpcCtx, task.taskType, sh.spt, worker) + ok, err := task.sel.Ok(rpcCtx, task.taskType, task.sector.ProofType, worker) cancel() if err != nil { log.Errorf("trySched(1) req.sel.Ok error: %+v", err) @@ -456,21 +453,21 @@ func (sh *scheduler) trySched() { for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { task := (*sh.schedQueue)[sqi] - needRes := ResourceTable[task.taskType][sh.spt] + needRes := ResourceTable[task.taskType][task.sector.ProofType] selectedWindow := -1 for _, wnd := range acceptableWindows[task.indexHeap] { wid := sh.openWindows[wnd].worker wr := sh.workers[wid].info.Resources - log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd) // TODO: allow bigger windows if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) { continue } - log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.Number, task.taskType, wnd) + log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd) windows[wnd].allocated.add(wr, needRes) // TODO: We probably want to re-sort acceptableWindows here based on new diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go index 29c730b03..67bddca3a 100644 --- a/extern/sector-storage/sched_worker.go +++ b/extern/sector-storage/sched_worker.go @@ -294,7 +294,7 @@ func (sw *schedWorker) workerCompactWindows() { var moved []int for ti, todo := range window.todo { - needRes := ResourceTable[todo.taskType][sw.sched.spt] + needRes := ResourceTable[todo.taskType][todo.sector.ProofType] if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info.Resources) { continue } @@ -350,7 +350,7 @@ assignLoop: worker.lk.Lock() for t, todo := range firstWindow.todo { - needRes := ResourceTable[todo.taskType][sw.sched.spt] + needRes := ResourceTable[todo.taskType][todo.sector.ProofType] if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info.Resources) { tidx = t break @@ -364,7 +364,7 @@ assignLoop: todo := firstWindow.todo[tidx] - log.Debugf("assign worker sector %d", todo.sector.Number) + log.Debugf("assign worker sector %d", todo.sector.ID.Number) err := sw.startProcessingTask(sw.taskDone, todo) if err != nil { @@ -389,7 +389,7 @@ assignLoop: func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error { w, sh := sw.worker, sw.sched - needRes := ResourceTable[req.taskType][sh.spt] + needRes := ResourceTable[req.taskType][req.sector.ProofType] w.lk.Lock() w.preparing.add(w.info.Resources, needRes) diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index a58d515c9..df3b4eed0 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -46,7 +46,7 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob { for _, request := range window.todo { out[uuid.UUID(id)] = append(out[uuid.UUID(id)], storiface.WorkerJob{ ID: storiface.UndefCall, - Sector: request.sector, + Sector: request.sector.ID, Task: request.taskType, RunWait: wi + 1, Start: request.start, diff --git a/extern/sector-storage/stores/http_handler.go b/extern/sector-storage/stores/http_handler.go index 2237bd407..a225d1b7e 100644 --- a/extern/sector-storage/stores/http_handler.go +++ b/extern/sector-storage/stores/http_handler.go @@ -2,6 +2,7 @@ package stores import ( "encoding/json" + "github.com/filecoin-project/specs-storage/storage" "io" "net/http" "os" @@ -73,7 +74,12 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ // The caller has a lock on this sector already, no need to get one here // passing 0 spt because we don't allocate anything - paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + si := storage.SectorRef{ + ID: id, + ProofType: 0, + } + + paths, _, err := handler.Local.AcquireSector(r.Context(), si, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { log.Errorf("%+v", err) w.WriteHeader(500) diff --git a/extern/sector-storage/stores/interface.go b/extern/sector-storage/stores/interface.go index 574ec599e..991ff0f0f 100644 --- a/extern/sector-storage/stores/interface.go +++ b/extern/sector-storage/stores/interface.go @@ -2,15 +2,16 @@ package stores import ( "context" - "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) type Store interface { - AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error) + AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error) Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool) error // like remove, but doesn't remove the primary sector copy, nor the last @@ -18,7 +19,7 @@ type Store interface { RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error // move sectors into storage - MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, types storiface.SectorFileType) error + MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) } diff --git a/extern/sector-storage/stores/local.go b/extern/sector-storage/stores/local.go index 7de3103b9..026f096eb 100644 --- a/extern/sector-storage/stores/local.go +++ b/extern/sector-storage/stores/local.go @@ -14,6 +14,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" @@ -325,7 +326,12 @@ func (st *Local) reportStorage(ctx context.Context) { } } -func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, ssize abi.SectorSize, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) { +func (st *Local) Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) { + ssize, err := sid.ProofType.SectorSize() + if err != nil { + return nil, err + } + st.localLk.Lock() done := func() {} @@ -375,11 +381,16 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, ssize abi.Sector return done, nil } -func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi.SectorSize, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { +func (st *Local) AcquireSector(ctx context.Context, sid storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { if existing|allocate != existing^allocate { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector") } + ssize, err := sid.ProofType.SectorSize() + if err != nil { + return storiface.SectorPaths{}, storiface.SectorPaths{}, err + } + st.localLk.RLock() defer st.localLk.RUnlock() @@ -391,7 +402,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi. continue } - si, err := st.index.StorageFindSector(ctx, sid, fileType, ssize, false) + si, err := st.index.StorageFindSector(ctx, sid.ID, fileType, ssize, false) if err != nil { log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err) continue @@ -407,7 +418,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi. continue } - spath := p.sectorPath(sid, fileType) + spath := p.sectorPath(sid.ID, fileType) storiface.SetPathByType(&out, fileType, spath) storiface.SetPathByType(&storageIDs, fileType, string(info.ID)) @@ -449,7 +460,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, ssize abi. // TODO: Check free space - best = p.sectorPath(sid, fileType) + best = p.sectorPath(sid.ID, fileType) bestID = si.ID break } @@ -578,13 +589,13 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ storifa return nil } -func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, types storiface.SectorFileType) error { - dest, destIds, err := st.AcquireSector(ctx, s, ssize, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove) +func (st *Local) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error { + dest, destIds, err := st.AcquireSector(ctx, s, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove) if err != nil { return xerrors.Errorf("acquire dest storage: %w", err) } - src, srcIds, err := st.AcquireSector(ctx, s, ssize, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + src, srcIds, err := st.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { return xerrors.Errorf("acquire src storage: %w", err) } @@ -616,7 +627,7 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.Sect log.Debugf("moving %v(%d) to storage: %s(se:%t; st:%t) -> %s(se:%t; st:%t)", s, fileType, sst.ID, sst.CanSeal, sst.CanStore, dst.ID, dst.CanSeal, dst.CanStore) - if err := st.index.StorageDropSector(ctx, ID(storiface.PathByType(srcIds, fileType)), s, fileType); err != nil { + if err := st.index.StorageDropSector(ctx, ID(storiface.PathByType(srcIds, fileType)), s.ID, fileType); err != nil { return xerrors.Errorf("dropping source sector from index: %w", err) } @@ -625,7 +636,7 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.Sect return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err) } - if err := st.index.StorageDeclareSector(ctx, ID(storiface.PathByType(destIds, fileType)), s, fileType, true); err != nil { + if err := st.index.StorageDeclareSector(ctx, ID(storiface.PathByType(destIds, fileType)), s.ID, fileType, true); err != nil { return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(storiface.PathByType(destIds, fileType)), err) } } diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index 37dde910d..bf66c1bb5 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/tarutil" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/hashicorp/go-multierror" files "github.com/ipfs/go-ipfs-files" @@ -58,7 +59,7 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int } } -func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { +func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { if existing|allocate != existing^allocate { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector") } @@ -66,9 +67,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se for { r.fetchLk.Lock() - c, locked := r.fetching[s] + c, locked := r.fetching[s.ID] if !locked { - r.fetching[s] = make(chan struct{}) + r.fetching[s.ID] = make(chan struct{}) r.fetchLk.Unlock() break } @@ -85,12 +86,12 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se defer func() { r.fetchLk.Lock() - close(r.fetching[s]) - delete(r.fetching, s) + close(r.fetching[s.ID]) + delete(r.fetching, s.ID) r.fetchLk.Unlock() }() - paths, stores, err := r.local.AcquireSector(ctx, s, ssize, existing, allocate, pathType, op) + paths, stores, err := r.local.AcquireSector(ctx, s, existing, allocate, pathType, op) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err) } @@ -106,7 +107,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se } } - apaths, ids, err := r.local.AcquireSector(ctx, s, ssize, storiface.FTNone, toFetch, pathType, op) + apaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err) } @@ -116,7 +117,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se odt = storiface.FsOverheadFinalized } - releaseStorage, err := r.local.Reserve(ctx, s, ssize, toFetch, ids, odt) + releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err) } @@ -134,7 +135,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se dest := storiface.PathByType(apaths, fileType) storageID := storiface.PathByType(ids, fileType) - url, err := r.acquireFromRemote(ctx, s, fileType, dest) + url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, err } @@ -142,7 +143,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, ssize abi.Se storiface.SetPathByType(&paths, fileType, dest) storiface.SetPathByType(&stores, fileType, storageID) - if err := r.index.StorageDeclareSector(ctx, ID(storageID), s, fileType, op == storiface.AcquireMove); err != nil { + if err := r.index.StorageDeclareSector(ctx, ID(storageID), s.ID, fileType, op == storiface.AcquireMove); err != nil { log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) continue } @@ -281,14 +282,14 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error { } } -func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.SectorSize, types storiface.SectorFileType) error { +func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error { // Make sure we have the data local - _, _, err := r.AcquireSector(ctx, s, ssize, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + _, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { return xerrors.Errorf("acquire src storage (remote): %w", err) } - return r.local.MoveStorage(ctx, s, ssize, types) + return r.local.MoveStorage(ctx, s, types) } func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType, force bool) error { diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index 1f2c316b8..4e52fa04c 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -77,17 +77,17 @@ var _ fmt.Stringer = &CallID{} var UndefCall CallID type WorkerCalls interface { - AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error) - SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error) - SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (CallID, error) - SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error) - SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (CallID, error) - FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (CallID, error) - ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (CallID, error) - MoveStorage(ctx context.Context, sector abi.SectorID, types SectorFileType) (CallID, error) - UnsealPiece(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error) - ReadPiece(context.Context, io.Writer, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize) (CallID, error) - Fetch(context.Context, abi.SectorID, SectorFileType, PathType, AcquireMode) (CallID, error) + AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error) + SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error) + SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error) + SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error) + SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error) + FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error) + ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error) + MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error) + UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error) + ReadPiece(context.Context, io.Writer, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize) (CallID, error) + Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error) } type WorkerReturn interface { diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 07cc9b5f9..7ab23e335 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -20,7 +20,7 @@ import ( ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-statestore" - storage2 "github.com/filecoin-project/specs-storage/storage" + storage "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" @@ -31,7 +31,6 @@ import ( var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSealed, storiface.FTCache} type WorkerConfig struct { - SealProof abi.RegisteredSealProof TaskTypes []sealtasks.TaskType NoSwap bool } @@ -40,7 +39,6 @@ type WorkerConfig struct { type ExecutorFunc func() (ffiwrapper.Storage, error) type LocalWorker struct { - scfg *ffiwrapper.Config storage stores.Store localStore *stores.Local sindex stores.SectorIndex @@ -64,9 +62,6 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store } w := &LocalWorker{ - scfg: &ffiwrapper.Config{ - SealProofType: wcfg.SealProof, - }, storage: store, localStore: local, sindex: sindex, @@ -119,18 +114,13 @@ type localWorkerPathProvider struct { op storiface.AcquireMode } -func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) { - ssize, err := l.w.scfg.SealProofType.SectorSize() +func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) { + paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing, l.op) if err != nil { return storiface.SectorPaths{}, nil, err } - paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, ssize, existing, allocate, sealing, l.op) - if err != nil { - return storiface.SectorPaths{}, nil, err - } - - releaseStorage, err := l.w.localStore.Reserve(ctx, sector, ssize, allocate, storageIDs, storiface.FSOverheadSeal) + releaseStorage, err := l.w.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal) if err != nil { return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) } @@ -147,7 +137,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi. sid := storiface.PathByType(storageIDs, fileType) - if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType, l.op == storiface.AcquireMove); err != nil { + if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector.ID, fileType, l.op == storiface.AcquireMove); err != nil { log.Errorf("declare sector error: %+v", err) } } @@ -155,7 +145,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi. } func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) { - return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg) + return ffiwrapper.New(&localWorkerPathProvider{w: l}) } type ReturnType string @@ -222,9 +212,9 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac Fetch: rfunc(storiface.WorkerReturn.ReturnFetch), } -func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) { +func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) { ci := storiface.CallID{ - Sector: sector, + Sector: sector.ID, ID: uuid.New(), } @@ -297,7 +287,7 @@ func errstr(err error) string { return "" } -func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error { +func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error { sb, err := l.executor() if err != nil { return err @@ -306,7 +296,7 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error return sb.NewSector(ctx, sector) } -func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) { +func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err @@ -317,7 +307,7 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs [] }) } -func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { +func (l *LocalWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { return l.asyncCall(ctx, sector, Fetch, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { _, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype) if err == nil { @@ -328,16 +318,16 @@ func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType s }) } -func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { +func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { return l.asyncCall(ctx, sector, SealPreCommit1, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { { // cleanup previous failed attempts if they exist - if err := l.storage.Remove(ctx, sector, storiface.FTSealed, true); err != nil { + if err := l.storage.Remove(ctx, sector.ID, storiface.FTSealed, true); err != nil { return nil, xerrors.Errorf("cleaning up sealed data: %w", err) } - if err := l.storage.Remove(ctx, sector, storiface.FTCache, true); err != nil { + if err := l.storage.Remove(ctx, sector.ID, storiface.FTCache, true); err != nil { return nil, xerrors.Errorf("cleaning up cache data: %w", err) } } @@ -351,7 +341,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, t }) } -func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (storiface.CallID, error) { +func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err @@ -362,7 +352,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, p }) } -func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (storiface.CallID, error) { +func (l *LocalWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err @@ -373,7 +363,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, tick }) } -func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (storiface.CallID, error) { +func (l *LocalWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err @@ -384,7 +374,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas }) } -func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage2.Range) (storiface.CallID, error) { +func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err @@ -396,7 +386,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, k } if len(keepUnsealed) == 0 { - if err := l.storage.Remove(ctx, sector, storiface.FTUnsealed, true); err != nil { + if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true); err != nil { return nil, xerrors.Errorf("removing unsealed data: %w", err) } } @@ -405,7 +395,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, k }) } -func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage2.Range) (storiface.CallID, error) { +func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) { return storiface.UndefCall, xerrors.Errorf("implement me") } @@ -425,18 +415,13 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error { return err } -func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) { +func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) { return l.asyncCall(ctx, sector, MoveStorage, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { - ssize, err := l.scfg.SealProofType.SectorSize() - if err != nil { - return nil, err - } - - return nil, l.storage.MoveStorage(ctx, sector, ssize, types) + return nil, l.storage.MoveStorage(ctx, sector, types) }) } -func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { +func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err @@ -447,11 +432,11 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde return nil, xerrors.Errorf("unsealing sector: %w", err) } - if err = l.storage.RemoveCopies(ctx, sector, storiface.FTSealed); err != nil { + if err = l.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil { return nil, xerrors.Errorf("removing source data: %w", err) } - if err = l.storage.RemoveCopies(ctx, sector, storiface.FTCache); err != nil { + if err = l.storage.RemoveCopies(ctx, sector.ID, storiface.FTCache); err != nil { return nil, xerrors.Errorf("removing source data: %w", err) } @@ -459,7 +444,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde }) } -func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { +func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { sb, err := l.executor() if err != nil { return storiface.UndefCall, err diff --git a/extern/sector-storage/worker_tracked.go b/extern/sector-storage/worker_tracked.go index 4a22fcca7..febb190c5 100644 --- a/extern/sector-storage/worker_tracked.go +++ b/extern/sector-storage/worker_tracked.go @@ -42,7 +42,7 @@ func (wt *workTracker) onDone(callID storiface.CallID) { delete(wt.running, callID) } -func (wt *workTracker) track(wid WorkerID, sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { +func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { return func(callID storiface.CallID, err error) (storiface.CallID, error) { if err != nil { return callID, err @@ -60,7 +60,7 @@ func (wt *workTracker) track(wid WorkerID, sid abi.SectorID, task sealtasks.Task wt.running[callID] = trackedWork{ job: storiface.WorkerJob{ ID: callID, - Sector: sid, + Sector: sid.ID, Task: task, Start: time.Now(), }, @@ -99,39 +99,39 @@ type trackedWorker struct { tracker *workTracker } -func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { +func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) } -func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) { +func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) { return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) } -func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { +func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { return t.tracker.track(t.wid, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) } -func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) { +func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) { return t.tracker.track(t.wid, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) } -func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) { +func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { return t.tracker.track(t.wid, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) } -func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { +func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { return t.tracker.track(t.wid, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) } -func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { +func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { return t.tracker.track(t.wid, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) } -func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { +func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { return t.tracker.track(t.wid, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) } -func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { +func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { return t.tracker.track(t.wid, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size)) } diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index ed7a691ef..56a55bb61 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -14,7 +14,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" ) @@ -166,23 +165,14 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, return &ErrBadSeed{xerrors.Errorf("seed has changed")} } - ss, err := m.api.StateMinerSectorSize(ctx, m.maddr, tok) - if err != nil { - return &ErrApi{err} - } - spt, err := ffiwrapper.SealProofTypeFromSectorSize(ss) - if err != nil { - return err - } - if *si.CommR != pci.Info.SealedCID { log.Warn("on-chain sealed CID doesn't match!") } ok, err := m.verif.VerifySeal(proof2.SealVerifyInfo{ - SectorID: m.minerSector(si.SectorNumber), + SectorID: m.minerSectorID(si.SectorNumber), SealedCID: pci.Info.SealedCID, - SealProof: spt, + SealProof: pci.Info.SealProof, Proof: proof, Randomness: si.TicketValue, InteractiveRandomness: si.SeedValue, diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 482181dc6..83f4e2390 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -267,7 +267,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta */ - m.stats.updateSector(m.minerSector(state.SectorNumber), state.State) + m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State) switch state.State { // Happy path @@ -394,6 +394,15 @@ func (m *Sealing) restartSectors(ctx context.Context) error { return xerrors.Errorf("getting the sealing delay: %w", err) } + spt, err := m.currentSealProof(ctx) + if err != nil { + return xerrors.Errorf("getting current seal proof: %w", err) + } + ssize, err := spt.SectorSize() + if err != nil { + return err + } + m.unsealedInfoMap.lk.Lock() defer m.unsealedInfoMap.lk.Unlock() for _, sector := range trackedSectors { @@ -408,7 +417,9 @@ func (m *Sealing) restartSectors(ctx context.Context) error { // something's funky here, but probably safe to move on log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber) } else { - ui := UnsealedSectorInfo{} + ui := UnsealedSectorInfo{ + ssize: ssize, + } for _, p := range sector.Pieces { if p.DealInfo != nil { ui.numDeals++ diff --git a/extern/storage-sealing/garbage.go b/extern/storage-sealing/garbage.go index fc8e86c55..c3b282d79 100644 --- a/extern/storage-sealing/garbage.go +++ b/extern/storage-sealing/garbage.go @@ -6,9 +6,10 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" ) -func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) { +func (m *Sealing) pledgeSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) { if len(sizes) == 0 { return nil, nil } @@ -47,21 +48,31 @@ func (m *Sealing) PledgeSector() error { // this, as we run everything here async, and it's cancelled when the // command exits - size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() + spt, err := m.currentSealProof(ctx) + if err != nil { + log.Errorf("%+v", err) + return + } + + size, err := spt.SectorSize() + if err != nil { + log.Errorf("%+v", err) + return + } sid, err := m.sc.Next() if err != nil { log.Errorf("%+v", err) return } - sectorID := m.minerSector(sid) + sectorID := m.minerSector(spt, sid) err = m.sealer.NewSector(ctx, sectorID) if err != nil { log.Errorf("%+v", err) return } - pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, size) + pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, abi.PaddedPieceSize(size).Unpadded()) if err != nil { log.Errorf("%+v", err) return @@ -75,7 +86,7 @@ func (m *Sealing) PledgeSector() error { } } - if err := m.newSectorCC(sid, ps); err != nil { + if err := m.newSectorCC(ctx, sid, ps); err != nil { log.Errorf("%+v", err) return } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index d9953eee0..5211f8bbe 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -8,13 +8,15 @@ import ( "sync" "time" - "github.com/filecoin-project/go-state-types/network" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" + "github.com/filecoin-project/go-state-types/network" + "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/go-address" padreader "github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-state-types/abi" @@ -53,6 +55,7 @@ type SealingAPI interface { StateMinerWorkerAddress(ctx context.Context, maddr address.Address, tok TipSetToken) (address.Address, error) StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error) StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error) + StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error) StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error) StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error) @@ -105,6 +108,7 @@ type UnsealedSectorInfo struct { // stored should always equal sum of pieceSizes.Padded() stored abi.PaddedPieceSize pieceSizes []abi.UnpaddedPieceSize + ssize abi.SectorSize } func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee) *Sealing { @@ -151,19 +155,30 @@ func (m *Sealing) Run(ctx context.Context) error { func (m *Sealing) Stop(ctx context.Context) error { return m.sectors.Stop(ctx) } + func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid) if (padreader.PaddedSize(uint64(size))) != size { return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") } - if size > abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() { + sp, err := m.currentSealProof(ctx) + if err != nil { + return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) + } + + ssize, err := sp.SectorSize() + if err != nil { + return 0, 0, err + } + + if size > abi.PaddedPieceSize(ssize).Unpadded() { return 0, 0, xerrors.Errorf("piece cannot fit into a sector") } m.unsealedInfoMap.lk.Lock() - sid, pads, err := m.getSectorAndPadding(size) + sid, pads, err := m.getSectorAndPadding(ctx, size) if err != nil { m.unsealedInfoMap.lk.Unlock() return 0, 0, xerrors.Errorf("getting available sector: %w", err) @@ -185,7 +200,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec return 0, 0, xerrors.Errorf("adding piece to sector: %w", err) } - startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(m.sealer.SectorSize()) + startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(ssize) m.unsealedInfoMap.lk.Unlock() @@ -201,7 +216,16 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec // Caller should hold m.unsealedInfoMap.lk func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error { log.Infof("Adding piece to sector %d", sectorID) - ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r) + sp, err := m.currentSealProof(ctx) + if err != nil { + return xerrors.Errorf("getting current seal proof type: %w", err) + } + ssize, err := sp.SectorSize() + if err != nil { + return err + } + + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sp, sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r) if err != nil { return xerrors.Errorf("writing piece: %w", err) } @@ -224,6 +248,7 @@ func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size numDeals: num, stored: ui.stored + piece.Piece.Size, pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()), + ssize: ssize, } return nil @@ -257,16 +282,16 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { } // Caller should hold m.unsealedInfoMap.lk -func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { - ss := abi.PaddedPieceSize(m.sealer.SectorSize()) +func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { for k, v := range m.unsealedInfoMap.infos { pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded()) - if v.stored+size.Padded()+padLength <= ss { + + if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) { return k, pads, nil } } - ns, err := m.newDealSector() + ns, ssize, err := m.newDealSector(ctx) if err != nil { return 0, nil, err } @@ -275,23 +300,24 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum numDeals: 0, stored: 0, pieceSizes: nil, + ssize: ssize, } return ns, nil, nil } // newDealSector creates a new sector for deal storage -func (m *Sealing) newDealSector() (abi.SectorNumber, error) { +func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) { // First make sure we don't have too many 'open' sectors cfg, err := m.getConfig() if err != nil { - return 0, xerrors.Errorf("getting config: %w", err) + return 0, 0, xerrors.Errorf("getting config: %w", err) } if cfg.MaxSealingSectorsForDeals > 0 { if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { - return 0, ErrTooManySectorsSealing + return 0, 0, ErrTooManySectorsSealing } } @@ -338,36 +364,36 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { } } + spt, err := m.currentSealProof(ctx) + if err != nil { + return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) + } + // Now actually create a new sector sid, err := m.sc.Next() if err != nil { - return 0, xerrors.Errorf("getting sector number: %w", err) + return 0, 0, xerrors.Errorf("getting sector number: %w", err) } - err = m.sealer.NewSector(context.TODO(), m.minerSector(sid)) + err = m.sealer.NewSector(context.TODO(), m.minerSector(spt, sid)) if err != nil { - return 0, xerrors.Errorf("initializing sector: %w", err) - } - - rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize()) - if err != nil { - return 0, xerrors.Errorf("bad sector size: %w", err) + return 0, 0, xerrors.Errorf("initializing sector: %w", err) } log.Infof("Creating sector %d", sid) err = m.sectors.Send(uint64(sid), SectorStart{ ID: sid, - SectorType: rt, + SectorType: spt, }) if err != nil { - return 0, xerrors.Errorf("starting the sector fsm: %w", err) + return 0, 0, xerrors.Errorf("starting the sector fsm: %w", err) } cf, err := m.getConfig() if err != nil { - return 0, xerrors.Errorf("getting the sealing delay: %w", err) + return 0, 0, xerrors.Errorf("getting the sealing delay: %w", err) } if cf.WaitDealsDelay > 0 { @@ -380,25 +406,42 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { }() } - return sid, nil + ssize, err := spt.SectorSize() + return sid, ssize, err } // newSectorCC accepts a slice of pieces with no deal (junk data) -func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error { - rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize()) +func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error { + spt, err := m.currentSealProof(ctx) if err != nil { - return xerrors.Errorf("bad sector size: %w", err) + return xerrors.Errorf("getting current seal proof type: %w", err) } log.Infof("Creating CC sector %d", sid) return m.sectors.Send(uint64(sid), SectorStartCC{ ID: sid, Pieces: pieces, - SectorType: rt, + SectorType: spt, }) } -func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID { +func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) { + mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil) + if err != nil { + return 0, err + } + + return mi.SealProofType, nil +} + +func (m *Sealing) minerSector(spt abi.RegisteredSealProof, num abi.SectorNumber) storage.SectorRef { + return storage.SectorRef{ + ID: m.minerSectorID(num), + ProofType: spt, + } +} + +func (m *Sealing) minerSectorID(num abi.SectorNumber) abi.SectorID { mid, err := address.IDFromAddress(m.maddr) if err != nil { panic(err) diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index 6684c714d..de7e6c8d0 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -32,7 +32,7 @@ func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInf } func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error { - if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorNumber)); err != nil { + if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil { return ctx.Send(SectorRemoveFailed{err}) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index a1aee4cde..fca4a8699 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -31,7 +31,12 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err allocated += piece.Piece.Size.Unpadded() } - ubytes := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() + ssize, err := sector.SectorType.SectorSize() + if err != nil { + return err + } + + ubytes := abi.PaddedPieceSize(ssize).Unpadded() if allocated > ubytes { return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) @@ -46,7 +51,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber) } - fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...) + fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...) if err != nil { return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) } @@ -148,7 +153,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) // process has just restarted and the worker had the result ready) } - pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.pieceInfos()) + pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos()) if err != nil { return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) } @@ -159,7 +164,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) } func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error { - cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.PreCommit1Out) + cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out) if err != nil { return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)}) } @@ -386,12 +391,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) Unsealed: *sector.CommD, Sealed: *sector.CommR, } - c2in, err := m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids) + c2in, err := m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)}) } - proof, err := m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), c2in) + proof, err := m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), c2in) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)}) } @@ -492,7 +497,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { // TODO: Maybe wait for some finality - if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.keepUnsealedRanges(false)); err != nil { + if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(false)); err != nil { return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) } @@ -503,7 +508,7 @@ func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInf // TODO: track sector health / expiration log.Infof("Proving sector %d", sector.SectorNumber) - if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorNumber), sector.keepUnsealedRanges(true)); err != nil { + if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(true)); err != nil { log.Error(err) } diff --git a/go.mod b/go.mod index 2becc2453..471efa7d0 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/filecoin-project/specs-actors v0.9.13 github.com/filecoin-project/specs-actors/v2 v2.3.0 - github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796 + github.com/filecoin-project/specs-storage v0.1.1-0.20201104202311-4d056083cd27 github.com/filecoin-project/test-vectors/schema v0.0.5 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-kit/kit v0.10.0 diff --git a/go.sum b/go.sum index b84a1ad3d..0a72659b3 100644 --- a/go.sum +++ b/go.sum @@ -293,6 +293,8 @@ github.com/filecoin-project/specs-actors/v2 v2.3.0 h1:V7lHeF2ylfFi84F4y80u5FE4Bp github.com/filecoin-project/specs-actors/v2 v2.3.0/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y= github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796 h1:dJsTPWpG2pcTeojO2pyn0c6l+x/3MZYCBgo/9d11JEk= github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g= +github.com/filecoin-project/specs-storage v0.1.1-0.20201104202311-4d056083cd27 h1:GLr3UKAqSjFjm5TimjLw6rRXkiZ2gPkrPv6xWsYw66Y= +github.com/filecoin-project/specs-storage v0.1.1-0.20201104202311-4d056083cd27/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g= github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg= github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 1e3374950..348137a47 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -141,11 +141,6 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) return nil, xerrors.Errorf("failed getting miner's deadline info: %w", err) } - rt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize) - if err != nil { - return nil, xerrors.Errorf("bad sector size: %w", err) - } - if uint64(params.Data.PieceSize.Padded()) > uint64(mi.SectorSize) { return nil, xerrors.New("data doesn't fit in a sector") } @@ -171,7 +166,7 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), Price: params.EpochPrice, Collateral: params.ProviderCollateral, - Rt: rt, + Rt: abi.RegisteredSealProof_StackedDrg32GiBV1_1, // all proof types have the same D tree FastRetrieval: params.FastRetrieval, VerifiedDeal: params.VerifiedDeal, StoreID: storeID, @@ -647,7 +642,7 @@ func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Addre func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) { - // Hard-code the sector size to 32GiB, because: + // Hard-code the sector type to 32GiBV1_1, because: // - pieceio.GeneratePieceCommitment requires a RegisteredSealProof // - commP itself is sector-size independent, with rather low probability of that changing // ( note how the final rust call is identical for every RegSP type ) @@ -655,12 +650,7 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet // // IF/WHEN this changes in the future we will have to be able to calculate // "old style" commP, and thus will need to introduce a version switch or similar - arbitrarySectorSize := abi.SectorSize(32 << 30) - - rt, err := ffiwrapper.SealProofTypeFromSectorSize(arbitrarySectorSize) - if err != nil { - return nil, xerrors.Errorf("bad sector size: %w", err) - } + arbitraryProofType := abi.RegisteredSealProof_StackedDrg32GiBV1_1 rdr, err := os.Open(inpath) if err != nil { @@ -673,7 +663,7 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet return nil, err } - commP, pieceSize, err := pieceio.GeneratePieceCommitment(rt, rdr, uint64(stat.Size())) + commP, pieceSize, err := pieceio.GeneratePieceCommitment(arbitraryProofType, rdr, uint64(stat.Size())) if err != nil { return nil, xerrors.Errorf("computing commP failed: %w", err) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index a612b142b..4ed6b3838 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -125,11 +125,6 @@ func ProofsConfig(maddr dtypes.MinerAddress, fnapi lapi.FullNode) (*ffiwrapper.C return nil, err } - spt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize) - if err != nil { - return nil, xerrors.Errorf("bad sector size: %w", err) - } - sb := &ffiwrapper.Config{ SealProofType: spt, } diff --git a/storage/miner.go b/storage/miner.go index 378c12b84..daeb0ef20 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -214,12 +214,7 @@ func NewWinningPoStProver(api api.FullNode, prover storage.Prover, verifier ffiw return nil, xerrors.Errorf("getting sector size: %w", err) } - spt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize) - if err != nil { - return nil, err - } - - wpt, err := spt.RegisteredWinningPoStProof() + wpt, err := mi.SealProofType.RegisteredWinningPoStProof() if err != nil { return nil, err }