Update specs-storage, sector removing support
This commit is contained in:
parent
ffbfa84591
commit
07cf84cbc7
@ -500,7 +500,11 @@ func (sb *Sealer) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
|
||||
return ffi.SealCommitPhase2(phase1Out, sector.Number, sector.Miner)
|
||||
}
|
||||
|
||||
func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
|
||||
func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
|
||||
if len(keepUnsealed) > 0 {
|
||||
return xerrors.Errorf("keepUnsealed unsupported") // TODO: impl for fastretrieval copies
|
||||
}
|
||||
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquiring sector cache path: %w", err)
|
||||
@ -510,6 +514,20 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID) error
|
||||
return ffi.ClearCache(uint64(sb.ssize), paths.Cache)
|
||||
}
|
||||
|
||||
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
|
||||
// This call is meant to mark storage as 'freeable'. Given that unsealing is
|
||||
// very expensive, we don't remove data as soon as we can - instead we only
|
||||
// do that when we don't have free space for data that really needs it
|
||||
|
||||
// This function should not be called at this layer, everything should be
|
||||
// handled in localworker
|
||||
return xerrors.Errorf("not supported at this layer")
|
||||
}
|
||||
|
||||
func (sb *Sealer) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
return xerrors.Errorf("not supported at this layer") // happens in localworker
|
||||
}
|
||||
|
||||
func GeneratePieceCIDFromFile(proofType abi.RegisteredSealProof, piece io.Reader, pieceSize abi.UnpaddedPieceSize) (cid.Cid, error) {
|
||||
f, werr, err := ToReadableFile(piece, int64(pieceSize))
|
||||
if err != nil {
|
||||
|
@ -297,7 +297,7 @@ func TestSealAndVerify(t *testing.T) {
|
||||
|
||||
post(t, sb, s)
|
||||
|
||||
if err := sb.FinalizeSector(context.TODO(), si); err != nil {
|
||||
if err := sb.FinalizeSector(context.TODO(), si, nil); err != nil {
|
||||
t.Fatalf("%+v", err)
|
||||
}
|
||||
|
||||
@ -358,7 +358,7 @@ func TestSealPoStNoCommit(t *testing.T) {
|
||||
|
||||
precommit := time.Now()
|
||||
|
||||
if err := sb.FinalizeSector(context.TODO(), si); err != nil {
|
||||
if err := sb.FinalizeSector(context.TODO(), si, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -10,7 +10,7 @@ require (
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
|
||||
github.com/filecoin-project/go-paramfetch v0.0.1
|
||||
github.com/filecoin-project/specs-actors v0.6.0
|
||||
github.com/filecoin-project/specs-storage v0.1.0
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/gorilla/mux v1.7.4
|
||||
github.com/hashicorp/go-multierror v1.0.0
|
||||
|
4
go.sum
4
go.sum
@ -45,8 +45,8 @@ github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifo
|
||||
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
|
||||
github.com/filecoin-project/specs-actors v0.6.0 h1:IepUsmDGY60QliENVTkBTAkwqGWw9kNbbHOcU/9oiC0=
|
||||
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
|
||||
github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sgOVdJbvFjOnD5w94=
|
||||
github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"runtime"
|
||||
|
||||
"github.com/elastic/go-sysinfo"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -160,13 +161,13 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas
|
||||
return sb.SealCommit2(ctx, sector, phase1Out)
|
||||
}
|
||||
|
||||
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
|
||||
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage2.Range) error {
|
||||
sb, err := l.sb()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := sb.FinalizeSector(ctx, sector); err != nil {
|
||||
if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil {
|
||||
return xerrors.Errorf("finalizing sector: %w", err)
|
||||
}
|
||||
|
||||
@ -177,6 +178,26 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage2.Range) error {
|
||||
return xerrors.Errorf("implement me")
|
||||
}
|
||||
|
||||
func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
var err error
|
||||
|
||||
if rerr := l.storage.Remove(ctx, sector, stores.FTSealed, true); rerr != nil {
|
||||
err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr))
|
||||
}
|
||||
if rerr := l.storage.Remove(ctx, sector, stores.FTCache, true); rerr != nil {
|
||||
err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr))
|
||||
}
|
||||
if rerr := l.storage.Remove(ctx, sector, stores.FTUnsealed, true); rerr != nil {
|
||||
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error {
|
||||
if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil {
|
||||
return xerrors.Errorf("moving sealed data to storage: %w", err)
|
||||
|
49
manager.go
49
manager.go
@ -382,7 +382,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
|
||||
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@ -410,7 +410,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||
schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathSealing, stores.AcquireMove),
|
||||
func(ctx context.Context, w Worker) error {
|
||||
return w.FinalizeSector(ctx, sector)
|
||||
return w.FinalizeSector(ctx, sector, keepUnsealed)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
@ -421,8 +421,15 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
|
||||
return xerrors.Errorf("creating fetchSel: %w", err)
|
||||
}
|
||||
|
||||
moveUnsealed := unsealed
|
||||
{
|
||||
if len(keepUnsealed) == 0 {
|
||||
unsealed = stores.FTNone
|
||||
}
|
||||
}
|
||||
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
||||
schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathStorage, stores.AcquireMove),
|
||||
schedFetch(sector, stores.FTCache|stores.FTSealed|moveUnsealed, stores.PathStorage, stores.AcquireMove),
|
||||
func(ctx context.Context, w Worker) error {
|
||||
return w.MoveStorage(ctx, sector)
|
||||
})
|
||||
@ -433,6 +440,42 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
|
||||
return xerrors.Errorf("implement me")
|
||||
}
|
||||
|
||||
func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTSealed|stores.FTUnsealed|stores.FTCache); err != nil {
|
||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
unsealed := stores.FTUnsealed
|
||||
{
|
||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding unsealed sector: %w", err)
|
||||
}
|
||||
|
||||
if len(unsealedStores) == 0 { // can be already removed
|
||||
unsealed = stores.FTNone
|
||||
}
|
||||
}
|
||||
|
||||
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("creating selector: %w", err)
|
||||
}
|
||||
|
||||
return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||
schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathStorage, stores.AcquireMove),
|
||||
func(ctx context.Context, w Worker) error {
|
||||
return w.Remove(ctx, sector)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
|
||||
l, err := m.localStore.Local(ctx)
|
||||
if err != nil {
|
||||
|
19
mock/mock.go
19
mock/mock.go
@ -315,7 +315,23 @@ func (mgr *SectorMgr) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceI
|
||||
return id, []abi.PieceInfo{pi}, nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) FinalizeSector(context.Context, abi.SectorID) error {
|
||||
func (mgr *SectorMgr) FinalizeSector(context.Context, abi.SectorID, []storage.Range) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
mgr.lk.Lock()
|
||||
defer mgr.lk.Unlock()
|
||||
|
||||
if _, has := mgr.sectors[sector]; !has {
|
||||
return xerrors.Errorf("sector not found")
|
||||
}
|
||||
|
||||
delete(mgr.sectors, sector)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -355,4 +371,5 @@ func (m mockVerif) GenerateWinningPoStSectorChallenge(ctx context.Context, proof
|
||||
|
||||
var MockVerifier = mockVerif{}
|
||||
|
||||
var _ storage.Sealer = &SectorMgr{}
|
||||
var _ ffiwrapper.Verifier = MockVerifier
|
||||
|
@ -73,7 +73,15 @@ func (t *testWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o s
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
|
||||
func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testWorker) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user