Merge pull request #9694 from filecoin-project/9159-allow-lotus-worker-to-unseal-sector-even-if-does-not-store-unsealed-data-long-term
fix: sealing: Set all path types for Unseal pipeline to sealing storage
This commit is contained in:
commit
8ba4355cab
@ -903,6 +903,11 @@ workflows:
|
||||
suite: itest-sector_terminate
|
||||
target: "./itests/sector_terminate_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-sector_unseal
|
||||
suite: itest-sector_unseal
|
||||
target: "./itests/sector_unseal_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-self_sent_txn
|
||||
suite: itest-self_sent_txn
|
||||
|
@ -708,6 +708,7 @@ func (n *Ensemble) Start() *Ensemble {
|
||||
scfg.Storage.AllowPreCommit1 = false
|
||||
scfg.Storage.AllowPreCommit2 = false
|
||||
scfg.Storage.AllowCommit = false
|
||||
scfg.Storage.AllowUnseal = false
|
||||
}
|
||||
|
||||
scfg.Storage.Assigner = assigner
|
||||
|
142
itests/sector_unseal_test.go
Normal file
142
itests/sector_unseal_test.go
Normal file
@ -0,0 +1,142 @@
|
||||
package itests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
func TestUnsealPiece(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
blockTime := 1 * time.Millisecond
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
_, miner, ens := kit.EnsembleMinimal(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
||||
kit.NoStorage(), // no storage to have better control over path settings
|
||||
kit.MutateSealingConfig(func(sc *config.SealingConfig) {
|
||||
sc.FinalizeEarly = true
|
||||
sc.AlwaysKeepUnsealedCopy = false
|
||||
})) // no mock proofs
|
||||
|
||||
var worker kit.TestWorker
|
||||
ens.Worker(miner, &worker, kit.ThroughRPC(), kit.NoStorage(), // no storage to have better control over path settings
|
||||
kit.WithTaskTypes([]sealtasks.TaskType{
|
||||
sealtasks.TTFetch, sealtasks.TTAddPiece,
|
||||
sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2,
|
||||
sealtasks.TTReplicaUpdate, sealtasks.TTUnseal, // only first update step, later steps will not run and we'll abort
|
||||
}),
|
||||
)
|
||||
|
||||
ens.Start().InterconnectAll().BeginMiningMustPost(blockTime)
|
||||
|
||||
maddr, err := miner.ActorAddress(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// get storage paths
|
||||
|
||||
// store-only path on the miner
|
||||
miner.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
|
||||
cfg.CanSeal = false
|
||||
cfg.CanStore = true
|
||||
})
|
||||
|
||||
mlocal, err := miner.StorageLocal(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, mlocal, 2) // genesis and one local
|
||||
|
||||
// we want a seal-only path on the worker disconnected from miner path
|
||||
worker.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
|
||||
cfg.CanSeal = true
|
||||
cfg.CanStore = false
|
||||
})
|
||||
|
||||
wpaths, err := worker.Paths(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, wpaths, 1)
|
||||
|
||||
// check which sectors files are present on the miner/worker storage paths
|
||||
checkSectors := func(miners, workers storiface.SectorFileType) {
|
||||
paths, err := miner.StorageList(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, paths, 3) // genesis, miner, worker
|
||||
|
||||
// first loop for debugging
|
||||
for id, decls := range paths {
|
||||
pinfo, err := miner.StorageInfo(ctx, id)
|
||||
require.NoError(t, err)
|
||||
|
||||
switch {
|
||||
case id == wpaths[0].ID: // worker path
|
||||
fmt.Println("Worker Decls ", len(decls), decls)
|
||||
case !pinfo.CanStore && !pinfo.CanSeal: // genesis path
|
||||
fmt.Println("Genesis Decls ", len(decls), decls)
|
||||
default: // miner path
|
||||
fmt.Println("Miner Decls ", len(decls), decls)
|
||||
}
|
||||
}
|
||||
|
||||
for id, decls := range paths {
|
||||
pinfo, err := miner.StorageInfo(ctx, id)
|
||||
require.NoError(t, err)
|
||||
|
||||
switch {
|
||||
case id == wpaths[0].ID: // worker path
|
||||
if workers != storiface.FTNone {
|
||||
require.Len(t, decls, 1)
|
||||
require.EqualValues(t, workers.Strings(), decls[0].SectorFileType.Strings())
|
||||
} else {
|
||||
require.Len(t, decls, 0)
|
||||
}
|
||||
case !pinfo.CanStore && !pinfo.CanSeal: // genesis path
|
||||
require.Len(t, decls, kit.DefaultPresealsPerBootstrapMiner)
|
||||
default: // miner path
|
||||
if miners != storiface.FTNone {
|
||||
require.Len(t, decls, 1)
|
||||
require.EqualValues(t, miners.Strings(), decls[0].SectorFileType.Strings())
|
||||
} else {
|
||||
require.Len(t, decls, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
checkSectors(storiface.FTNone, storiface.FTNone)
|
||||
|
||||
// get a sector for upgrading
|
||||
miner.PledgeSectors(ctx, 1, 0, nil)
|
||||
sl, err := miner.SectorsListNonGenesis(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, sl, 1, "expected 1 sector")
|
||||
|
||||
sector := sl[0]
|
||||
|
||||
checkSectors(storiface.FTCache|storiface.FTSealed, storiface.FTNone)
|
||||
|
||||
sinfo, err := miner.SectorsStatus(ctx, sector, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
minerId, err := address.IDFromAddress(maddr)
|
||||
require.NoError(t, err)
|
||||
|
||||
sectorRef := storiface.SectorRef{
|
||||
ID: abi.SectorID{Miner: abi.ActorID(minerId), Number: sector},
|
||||
ProofType: sinfo.SealProof,
|
||||
}
|
||||
|
||||
err = miner.SectorsUnsealPiece(ctx, sectorRef, 0, 0, sinfo.Ticket.Value, sinfo.CommD)
|
||||
require.NoError(t, err)
|
||||
|
||||
checkSectors(storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, storiface.FTNone)
|
||||
}
|
@ -405,7 +405,7 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err
|
||||
}
|
||||
|
||||
func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storiface.SectorRef, commD cid.Cid, unsealedPath string, randomness abi.SealRandomness) (bool, error) {
|
||||
replicaPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathStorage)
|
||||
replicaPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathSealing)
|
||||
if xerrors.Is(err, storiface.ErrSectorNotFound) {
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
@ -464,12 +464,12 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storiface.SectorRef, o
|
||||
maxPieceSize := abi.PaddedPieceSize(ssize)
|
||||
|
||||
// try finding existing
|
||||
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
|
||||
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathSealing)
|
||||
var pf *partialfile.PartialFile
|
||||
|
||||
switch {
|
||||
case xerrors.Is(err, storiface.ErrSectorNotFound):
|
||||
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathStorage)
|
||||
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
|
||||
}
|
||||
@ -516,7 +516,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storiface.SectorRef, o
|
||||
}
|
||||
|
||||
// Piece data sealed in sector
|
||||
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathStorage)
|
||||
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire sealed sector paths: %w", err)
|
||||
}
|
||||
|
@ -358,6 +358,21 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto
|
||||
return xerrors.Errorf("worker UnsealPiece call: %s", err)
|
||||
}
|
||||
|
||||
// get a selector for moving unsealed sector into long-term storage
|
||||
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTUnsealed, storiface.PathStorage, !m.disallowRemoteFinalize)
|
||||
|
||||
// move unsealed sector to long-term storage
|
||||
// Possible TODO: Add an option to not keep the unsealed sector in long term storage?
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
||||
m.schedFetch(sector, storiface.FTUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
||||
func(ctx context.Context, w Worker) error {
|
||||
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTUnsealed))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("moving unsealed sector to long term storage: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user