From 07cf84cbc787fc965a887a073018f061b9fdba4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 22 Jun 2020 17:02:59 +0200 Subject: [PATCH] Update specs-storage, sector removing support --- ffiwrapper/sealer_cgo.go | 20 +++++++++++++++- ffiwrapper/sealer_test.go | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- localworker.go | 25 ++++++++++++++++++-- manager.go | 49 ++++++++++++++++++++++++++++++++++++--- mock/mock.go | 19 ++++++++++++++- testworker_test.go | 10 +++++++- 8 files changed, 120 insertions(+), 13 deletions(-) diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index d17aed272..6510f81cc 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -500,7 +500,11 @@ func (sb *Sealer) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou return ffi.SealCommitPhase2(phase1Out, sector.Number, sector.Miner) } -func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID) error { +func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { + if len(keepUnsealed) > 0 { + return xerrors.Errorf("keepUnsealed unsupported") // TODO: impl for fastretrieval copies + } + paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, false) if err != nil { return xerrors.Errorf("acquiring sector cache path: %w", err) @@ -510,6 +514,20 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID) error return ffi.ClearCache(uint64(sb.ssize), paths.Cache) } +func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { + // This call is meant to mark storage as 'freeable'. Given that unsealing is + // very expensive, we don't remove data as soon as we can - instead we only + // do that when we don't have free space for data that really needs it + + // This function should not be called at this layer, everything should be + // handled in localworker + return xerrors.Errorf("not supported at this layer") +} + +func (sb *Sealer) Remove(ctx context.Context, sector abi.SectorID) error { + return xerrors.Errorf("not supported at this layer") // happens in localworker +} + func GeneratePieceCIDFromFile(proofType abi.RegisteredSealProof, piece io.Reader, pieceSize abi.UnpaddedPieceSize) (cid.Cid, error) { f, werr, err := ToReadableFile(piece, int64(pieceSize)) if err != nil { diff --git a/ffiwrapper/sealer_test.go b/ffiwrapper/sealer_test.go index 7c1ad0474..5e6f02cd2 100644 --- a/ffiwrapper/sealer_test.go +++ b/ffiwrapper/sealer_test.go @@ -297,7 +297,7 @@ func TestSealAndVerify(t *testing.T) { post(t, sb, s) - if err := sb.FinalizeSector(context.TODO(), si); err != nil { + if err := sb.FinalizeSector(context.TODO(), si, nil); err != nil { t.Fatalf("%+v", err) } @@ -358,7 +358,7 @@ func TestSealPoStNoCommit(t *testing.T) { precommit := time.Now() - if err := sb.FinalizeSector(context.TODO(), si); err != nil { + if err := sb.FinalizeSector(context.TODO(), si, nil); err != nil { t.Fatal(err) } diff --git a/go.mod b/go.mod index 82eab906c..60f31942f 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 github.com/filecoin-project/go-paramfetch v0.0.1 github.com/filecoin-project/specs-actors v0.6.0 - github.com/filecoin-project/specs-storage v0.1.0 + github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea github.com/google/uuid v1.1.1 github.com/gorilla/mux v1.7.4 github.com/hashicorp/go-multierror v1.0.0 diff --git a/go.sum b/go.sum index 33fbe76a6..300226c5e 100644 --- a/go.sum +++ b/go.sum @@ -45,8 +45,8 @@ github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifo github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y= github.com/filecoin-project/specs-actors v0.6.0 h1:IepUsmDGY60QliENVTkBTAkwqGWw9kNbbHOcU/9oiC0= github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY= -github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sgOVdJbvFjOnD5w94= -github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k= +github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY= +github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= diff --git a/localworker.go b/localworker.go index 6056fb214..a1d82209a 100644 --- a/localworker.go +++ b/localworker.go @@ -7,6 +7,7 @@ import ( "runtime" "github.com/elastic/go-sysinfo" + "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -160,13 +161,13 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas return sb.SealCommit2(ctx, sector, phase1Out) } -func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error { +func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage2.Range) error { sb, err := l.sb() if err != nil { return err } - if err := sb.FinalizeSector(ctx, sector); err != nil { + if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil { return xerrors.Errorf("finalizing sector: %w", err) } @@ -177,6 +178,26 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e return nil } +func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage2.Range) error { + return xerrors.Errorf("implement me") +} + +func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error { + var err error + + if rerr := l.storage.Remove(ctx, sector, stores.FTSealed, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr)) + } + if rerr := l.storage.Remove(ctx, sector, stores.FTCache, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr)) + } + if rerr := l.storage.Remove(ctx, sector, stores.FTUnsealed, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr)) + } + + return err +} + func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error { if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil { return xerrors.Errorf("moving sealed data to storage: %w", err) diff --git a/manager.go b/manager.go index 383320483..ba84e579d 100644 --- a/manager.go +++ b/manager.go @@ -382,7 +382,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou return out, err } -func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error { +func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -410,7 +410,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { - return w.FinalizeSector(ctx, sector) + return w.FinalizeSector(ctx, sector, keepUnsealed) }) if err != nil { return err @@ -421,8 +421,15 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error return xerrors.Errorf("creating fetchSel: %w", err) } + moveUnsealed := unsealed + { + if len(keepUnsealed) == 0 { + unsealed = stores.FTNone + } + } + err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, - schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathStorage, stores.AcquireMove), + schedFetch(sector, stores.FTCache|stores.FTSealed|moveUnsealed, stores.PathStorage, stores.AcquireMove), func(ctx context.Context, w Worker) error { return w.MoveStorage(ctx, sector) }) @@ -433,6 +440,42 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error return nil } +func (m *Manager) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { + return xerrors.Errorf("implement me") +} + +func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTSealed|stores.FTUnsealed|stores.FTCache); err != nil { + return xerrors.Errorf("acquiring sector lock: %w", err) + } + + unsealed := stores.FTUnsealed + { + unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false) + if err != nil { + return xerrors.Errorf("finding unsealed sector: %w", err) + } + + if len(unsealedStores) == 0 { // can be already removed + unsealed = stores.FTNone + } + } + + selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false) + if err != nil { + return xerrors.Errorf("creating selector: %w", err) + } + + return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, + schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathStorage, stores.AcquireMove), + func(ctx context.Context, w Worker) error { + return w.Remove(ctx, sector) + }) +} + func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { l, err := m.localStore.Local(ctx) if err != nil { diff --git a/mock/mock.go b/mock/mock.go index eae17bd3c..cbc3a1f99 100644 --- a/mock/mock.go +++ b/mock/mock.go @@ -315,7 +315,23 @@ func (mgr *SectorMgr) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceI return id, []abi.PieceInfo{pi}, nil } -func (mgr *SectorMgr) FinalizeSector(context.Context, abi.SectorID) error { +func (mgr *SectorMgr) FinalizeSector(context.Context, abi.SectorID, []storage.Range) error { + return nil +} + +func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { + panic("implement me") +} + +func (mgr *SectorMgr) Remove(ctx context.Context, sector abi.SectorID) error { + mgr.lk.Lock() + defer mgr.lk.Unlock() + + if _, has := mgr.sectors[sector]; !has { + return xerrors.Errorf("sector not found") + } + + delete(mgr.sectors, sector) return nil } @@ -355,4 +371,5 @@ func (m mockVerif) GenerateWinningPoStSectorChallenge(ctx context.Context, proof var MockVerifier = mockVerif{} +var _ storage.Sealer = &SectorMgr{} var _ ffiwrapper.Verifier = MockVerifier diff --git a/testworker_test.go b/testworker_test.go index d28761702..5ca51b771 100644 --- a/testworker_test.go +++ b/testworker_test.go @@ -73,7 +73,15 @@ func (t *testWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o s panic("implement me") } -func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error { +func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { + panic("implement me") +} + +func (t *testWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { + panic("implement me") +} + +func (t *testWorker) Remove(ctx context.Context, sector abi.SectorID) error { panic("implement me") }