From 33673a30c7fa07a565f06992ad7d298dfa5a9922 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 20 May 2020 18:36:46 +0200 Subject: [PATCH] Wire up unsealing logic, track primary sector copies --- ffiwrapper/sealer_cgo.go | 78 ----------------------------- ffiwrapper/types.go | 3 +- localworker.go | 35 ++++++++++--- manager.go | 30 ++++++----- manager_test.go | 4 ++ roprov.go | 2 +- selector_existing.go | 2 +- stores/http_handler.go | 2 +- stores/index.go | 87 +++++++++++++++++++++++--------- stores/interface.go | 18 ++++++- stores/local.go | 104 ++++++++++++++++++++++++++++----------- stores/local_test.go | 91 ++++++++++++++++++++++++++++++++++ stores/remote.go | 29 +++++++---- testworker_test.go | 2 +- 14 files changed, 322 insertions(+), 165 deletions(-) create mode 100644 stores/local_test.go diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index 879b04afb..1ffc10b72 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -315,84 +315,6 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se return nil } -func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealedCID cid.Cid) (io.ReadCloser, error) { - { - path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false) - if err != nil { - return nil, xerrors.Errorf("acquire unsealed sector path: %w", err) - } - - f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644) - if err == nil { - if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { - doneUnsealed() - return nil, xerrors.Errorf("seek: %w", err) - } - - lr := io.LimitReader(f, int64(size)) - - return &struct { - io.Reader - io.Closer - }{ - Reader: lr, - Closer: closerFunc(func() error { - doneUnsealed() - return f.Close() - }), - }, nil - } - - doneUnsealed() - - if !os.IsNotExist(err) { - return nil, err - } - } - - paths, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed, false) - if err != nil { - return nil, xerrors.Errorf("acquire sealed/cache sector path: %w", err) - } - defer doneSealed() - - // TODO: GC for those - // (Probably configurable count of sectors to be kept unsealed, and just - // remove last used one (or use whatever other cache policy makes sense)) - err = ffi.Unseal( - sb.sealProofType, - paths.Cache, - paths.Sealed, - paths.Unsealed, - sector.Number, - sector.Miner, - ticket, - unsealedCID, - ) - if err != nil { - return nil, xerrors.Errorf("unseal failed: %w", err) - } - - f, err := os.OpenFile(paths.Unsealed, os.O_RDONLY, 0644) - if err != nil { - return nil, err - } - - if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { - return nil, xerrors.Errorf("seek: %w", err) - } - - lr := io.LimitReader(f, int64(size)) - - return &struct { - io.Reader - io.Closer - }{ - Reader: lr, - Closer: f, - }, nil -} - func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, true) if err != nil { diff --git a/ffiwrapper/types.go b/ffiwrapper/types.go index 932dc3c42..06c07b715 100644 --- a/ffiwrapper/types.go +++ b/ffiwrapper/types.go @@ -29,7 +29,8 @@ type Storage interface { storage.Prover StorageSealer - ReadPieceFromSealedSector(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) + UnsealPiece(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error + ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) error } type Verifier interface { diff --git a/localworker.go b/localworker.go index bc4499e5b..a01623bc1 100644 --- a/localworker.go +++ b/localworker.go @@ -56,10 +56,11 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, type localWorkerPathProvider struct { w *LocalWorker + op stores.AcquireMode } func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { - paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing) + paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op) if err != nil { return stores.SectorPaths{}, nil, err } @@ -76,7 +77,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi. sid := stores.PathByType(storageIDs, fileType) - if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil { + if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType, l.op == stores.AcquireMove); err != nil { log.Errorf("declare sector error: %+v", err) } } @@ -105,8 +106,8 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs [] return sb.AddPiece(ctx, sector, epcs, sz, r) } -func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool) error { - _, done, err := (&localWorkerPathProvider{w: l}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing) +func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool, am stores.AcquireMode) error { + _, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing) if err != nil { return err } @@ -182,12 +183,30 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e return nil } -func (l *LocalWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { - panic("implement me") +func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { + sb, err := l.sb() + if err != nil { + return err + } + + if err := sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil { + return xerrors.Errorf("unsealing sector: %w", err) + } + + if err := l.storage.RemoveCopies(ctx, sector, stores.FTSealed | stores.FTCache); err != nil { + return xerrors.Errorf("removing source data: %w", err) + } + + return nil } -func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { - panic("implement me") +func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { + sb, err := l.sb() + if err != nil { + return err + } + + return sb.ReadPiece(ctx, writer, sector, index, size) } func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) { diff --git a/manager.go b/manager.go index 284ff272a..7450c130f 100644 --- a/manager.go +++ b/manager.go @@ -29,7 +29,7 @@ type URLs []string type Worker interface { ffiwrapper.StorageSealer - Fetch(context.Context, abi.SectorID, stores.SectorFileType, bool) error + Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) error UnsealPiece(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error ReadPiece(context.Context, io.Writer, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize) error @@ -183,9 +183,9 @@ func schedNop(context.Context, Worker) error { return nil } -func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool) func(context.Context, Worker) error { +func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) func(context.Context, Worker) error { return func(ctx context.Context, worker Worker) error { - return worker.Fetch(ctx, sector, ft, sealing) + return worker.Fetch(ctx, sector, ft, sealing, am) } } @@ -207,10 +207,18 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect // TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed - // TODO!!!! make schedFetch COPY stores.FTSealed and stores.FTCache - // Moving those to a temp sealing storage may make PoSts fail + unsealFetch := func(ctx context.Context, worker Worker) error { + if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, true, stores.AcquireCopy); err != nil { + return xerrors.Errorf("copy sealed/cache sector data: %w", err) + } - err = m.sched.Schedule(ctx, sealtasks.TTUnseal, selector, schedFetch(sector, stores.FTUnsealed|stores.FTSealed|stores.FTCache, true), func(ctx context.Context, w Worker) error { + if err := worker.Fetch(ctx, sector, stores.FTUnsealed, true, stores.AcquireMove); err != nil { + return xerrors.Errorf("copy unsealed sector data: %w", err) + } + return nil + } + + err = m.sched.Schedule(ctx, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error { return w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed) }) if err != nil { @@ -222,7 +230,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return xerrors.Errorf("creating readPiece selector: %w", err) } - err = m.sched.Schedule(ctx, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error { + err = m.sched.Schedule(ctx, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { return w.ReadPiece(ctx, sink, sector, offset, size) }) if err != nil { @@ -270,7 +278,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke return nil, xerrors.Errorf("creating path selector: %w", err) } - err = m.sched.Schedule(ctx, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error { + err = m.sched.Schedule(ctx, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) if err != nil { return err @@ -288,7 +296,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err) } - err = m.sched.Schedule(ctx, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error { + err = m.sched.Schedule(ctx, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealPreCommit2(ctx, sector, phase1Out) if err != nil { return err @@ -309,7 +317,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a // (except, don't.. for now at least - we are using this step to bring data // into 'provable' storage. Optimally we'd do that in commit2, in parallel // with snark compute) - err = m.sched.Schedule(ctx, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error { + err = m.sched.Schedule(ctx, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids) if err != nil { return err @@ -342,7 +350,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error } return m.sched.Schedule(ctx, sealtasks.TTFinalize, selector, - schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false), + schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false, stores.AcquireMove), func(ctx context.Context, w Worker) error { return w.FinalizeSector(ctx, sector) }) diff --git a/manager_test.go b/manager_test.go index f89989989..165ecf280 100644 --- a/manager_test.go +++ b/manager_test.go @@ -65,6 +65,10 @@ func (t *testStorage) SetStorage(f func(*stores.StorageConfig)) error { return nil } +func (t *testStorage) Stat(path string) (stores.FsStat, error) { + return stores.Stat(path) +} + var _ stores.LocalStorage = &testStorage{} func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *stores.Remote, *stores.Index) { diff --git a/roprov.go b/roprov.go index 172cf7cf8..aba6bb5d9 100644 --- a/roprov.go +++ b/roprov.go @@ -20,7 +20,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e return stores.SectorPaths{}, nil, xerrors.New("read-only storage") } - p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing) + p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove) return p, done, err } diff --git a/selector_existing.go b/selector_existing.go index 14e6dbefd..bba48b965 100644 --- a/selector_existing.go +++ b/selector_existing.go @@ -12,7 +12,7 @@ import ( ) type existingSelector struct { - best []stores.StorageInfo + best []stores.SectorStorageInfo } func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) (*existingSelector, error) { diff --git a/stores/http_handler.go b/stores/http_handler.go index 2a3e85aef..7e2330dbd 100644 --- a/stores/http_handler.go +++ b/stores/http_handler.go @@ -70,7 +70,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ } // passing 0 spt because we don't allocate anything - paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false) + paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove) if err != nil { log.Error("%+v", err) w.WriteHeader(500) diff --git a/stores/index.go b/stores/index.go index 6659a4422..0dafd87ed 100644 --- a/stores/index.go +++ b/stores/index.go @@ -38,16 +38,27 @@ type HealthReport struct { Err error } +type SectorStorageInfo struct { + ID ID + URLs []string // TODO: Support non-http transports + Weight uint64 + + CanSeal bool + CanStore bool + + Primary bool +} + type SectorIndex interface { // part of storage-miner api StorageAttach(context.Context, StorageInfo, FsStat) error StorageInfo(context.Context, ID) (StorageInfo, error) StorageReportHealth(context.Context, ID, HealthReport) error - StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error + StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error - StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) + StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error) - StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) + StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error) } type Decl struct { @@ -55,6 +66,11 @@ type Decl struct { SectorFileType } +type declMeta struct { + storage ID + primary bool +} + type storageEntry struct { info *StorageInfo fsi FsStat @@ -66,13 +82,13 @@ type storageEntry struct { type Index struct { lk sync.RWMutex - sectors map[Decl][]ID + sectors map[Decl][]*declMeta stores map[ID]*storageEntry } func NewIndex() *Index { return &Index{ - sectors: map[Decl][]ID{}, + sectors: map[Decl][]*declMeta{}, stores: map[ID]*storageEntry{}, } } @@ -88,7 +104,7 @@ func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) { } for decl, ids := range i.sectors { for _, id := range ids { - byID[id][decl.SectorID] |= decl.SectorFileType + byID[id.storage][decl.SectorID] |= decl.SectorFileType } } @@ -157,10 +173,11 @@ func (i *Index) StorageReportHealth(ctx context.Context, id ID, report HealthRep return nil } -func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error { +func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error { i.lk.Lock() defer i.lk.Unlock() +loop: for _, fileType := range PathTypes { if fileType&ft == 0 { continue @@ -169,13 +186,20 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se d := Decl{s, fileType} for _, sid := range i.sectors[d] { - if sid == storageId { - log.Warnf("sector %v redeclared in %s", s, storageId) - return nil + if sid.storage == storageId { + if !sid.primary && primary { + sid.primary = true + } else { + log.Warnf("sector %v redeclared in %s", s, storageId) + } + continue loop } } - i.sectors[d] = append(i.sectors[d], storageId) + i.sectors[d] = append(i.sectors[d], &declMeta{ + storage: storageId, + primary: primary, + }) } return nil @@ -196,9 +220,9 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto return nil } - rewritten := make([]ID, 0, len(i.sectors[d])-1) + rewritten := make([]*declMeta, 0, len(i.sectors[d])-1) for _, sid := range i.sectors[d] { - if sid == storageId { + if sid.storage == storageId { continue } @@ -215,11 +239,12 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto return nil } -func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) { +func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() storageIDs := map[ID]uint64{} + isprimary := map[ID]bool{} for _, pathType := range PathTypes { if ft&pathType == 0 { @@ -227,11 +252,12 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector } for _, id := range i.sectors[Decl{s, pathType}] { - storageIDs[id]++ + storageIDs[id.storage]++ + isprimary[id.storage] = isprimary[id.storage] || id.primary } } - out := make([]StorageInfo, 0, len(storageIDs)) + out := make([]SectorStorageInfo, 0, len(storageIDs)) for id, n := range storageIDs { st, ok := i.stores[id] @@ -251,12 +277,15 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector urls[k] = rl.String() } - out = append(out, StorageInfo{ + out = append(out, SectorStorageInfo{ ID: id, URLs: urls, Weight: st.info.Weight * n, // storage with more sector types is better + CanSeal: st.info.CanSeal, CanStore: st.info.CanStore, + + Primary: isprimary[id], }) } @@ -277,12 +306,15 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector urls[k] = rl.String() } - out = append(out, StorageInfo{ + out = append(out, SectorStorageInfo{ ID: id, URLs: urls, Weight: st.info.Weight * 0, // TODO: something better than just '0' + CanSeal: st.info.CanSeal, CanStore: st.info.CanStore, + + Primary: false, }) } } @@ -302,7 +334,7 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) { return *si.info, nil } -func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) { +func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -314,10 +346,10 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s } for _, p := range i.stores { - if sealing && !p.info.CanSeal { + if (pathType == PathSealing) && !p.info.CanSeal { continue } - if !sealing && !p.info.CanStore { + if (pathType == PathStorage) && !p.info.CanStore { continue } @@ -362,10 +394,19 @@ func (i *Index) FindSector(id abi.SectorID, typ SectorFileType) ([]ID, error) { i.lk.RLock() defer i.lk.RUnlock() - return i.sectors[Decl{ + f, ok := i.sectors[Decl{ SectorID: id, SectorFileType: typ, - }], nil + }] + if !ok { + return nil, nil + } + out := make([]ID, 0, len(f)) + for _, meta := range f { + out = append(out, meta.storage) + } + + return out, nil } var _ SectorIndex = &Index{} diff --git a/stores/interface.go b/stores/interface.go index 0735f7bf8..a818406a5 100644 --- a/stores/interface.go +++ b/stores/interface.go @@ -9,10 +9,26 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" ) +type PathType bool +const ( + PathStorage = false + PathSealing = true +) + +type AcquireMode string +const ( + AcquireMove = "move" + AcquireCopy = "copy" +) + type Store interface { - AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error) + AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, done func(), err error) Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error + // like remove, but doesn't remove the primary sector copy, nor the last + // non-primary copy if there no primary copies + RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error + // move sectors into storage MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error diff --git a/stores/local.go b/stores/local.go index 16f333ac4..9c0dc4477 100644 --- a/stores/local.go +++ b/stores/local.go @@ -47,6 +47,8 @@ type LocalPath struct { type LocalStorage interface { GetStorage() (StorageConfig, error) SetStorage(func(*StorageConfig)) error + + Stat(path string) (FsStat, error) } const MetaFile = "sectorstore.json" @@ -98,7 +100,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { local: p, } - fst, err := Stat(p) + fst, err := st.localStorage.Stat(p) if err != nil { return err } @@ -133,7 +135,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err) } - if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t); err != nil { + if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t, meta.CanStore); err != nil { return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, meta.ID, err) } } @@ -177,7 +179,7 @@ func (st *Local) reportHealth(ctx context.Context) { toReport := map[ID]HealthReport{} for id, p := range st.paths { - stat, err := Stat(p.local) + stat, err := st.localStorage.Stat(p.local) toReport[id] = HealthReport{ Stat: stat, @@ -195,7 +197,7 @@ func (st *Local) reportHealth(ctx context.Context) { } } -func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { +func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) { if existing|allocate != existing^allocate { return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } @@ -240,7 +242,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re continue } - sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, sealing) + sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType) if err != nil { st.localLk.RUnlock() return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err) @@ -259,11 +261,11 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re continue } - if sealing && !si.CanSeal { + if (pathType == PathSealing) && !si.CanSeal { continue } - if !sealing && !si.CanStore { + if (pathType == PathStorage) && !si.CanStore { continue } @@ -328,38 +330,82 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp } for _, info := range si { - p, ok := st.paths[info.ID] - if !ok { - continue - } - - if p.local == "" { // TODO: can that even be the case? - continue - } - - if err := st.index.StorageDropSector(ctx, info.ID, sid, typ); err != nil { - return xerrors.Errorf("dropping sector from index: %w", err) - } - - spath := filepath.Join(p.local, typ.String(), SectorName(sid)) - log.Infof("remove %s", spath) - - if err := os.RemoveAll(spath); err != nil { - log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err) + if err := st.removeSector(ctx, sid, typ, info.ID); err != nil { + return err } } return nil } +func (st *Local) RemoveCopies(ctx context.Context, sid abi.SectorID, typ SectorFileType) error { + if bits.OnesCount(uint(typ)) != 1 { + return xerrors.New("delete expects one file type") + } + + si, err := st.index.StorageFindSector(ctx, sid, typ, false) + if err != nil { + return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err) + } + + var hasPrimary bool + for _, info := range si { + if info.Primary { + hasPrimary = true + break + } + } + + if !hasPrimary { + log.Warnf("RemoveCopies: no primary copies of sector %v (%s), not removing anything", sid, typ) + return nil + } + + for _, info := range si { + if info.Primary { + continue + } + + if err := st.removeSector(ctx, sid, typ, info.ID); err != nil { + return err + } + } + + return nil +} + +func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorFileType, storage ID) error { + p, ok := st.paths[storage] + if !ok { + return nil + } + + if p.local == "" { // TODO: can that even be the case? + return nil + } + + if err := st.index.StorageDropSector(ctx, storage, sid, typ); err != nil { + return xerrors.Errorf("dropping sector from index: %w", err) + } + + spath := filepath.Join(p.local, typ.String(), SectorName(sid)) + log.Infof("remove %s", spath) + + if err := os.RemoveAll(spath); err != nil { + log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err) + } + + return nil +} + func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { - dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false) + dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove) if err != nil { return xerrors.Errorf("acquire dest storage: %w", err) } defer sdone() - src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false) + src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove) if err != nil { return xerrors.Errorf("acquire src storage: %w", err) } @@ -401,7 +447,7 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.Regist return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err) } - if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType); err != nil { + if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType, true); err != nil { return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(PathByType(destIds, fileType)), err) } } @@ -420,7 +466,7 @@ func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) { return FsStat{}, errPathNotFound } - return Stat(p.local) + return st.localStorage.Stat(p.local) } var _ Store = &Local{} diff --git a/stores/local_test.go b/stores/local_test.go new file mode 100644 index 000000000..d98c59182 --- /dev/null +++ b/stores/local_test.go @@ -0,0 +1,91 @@ +package stores + +import ( + "context" + "encoding/json" + "github.com/google/uuid" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +const pathSize = 16 << 20 + +type TestingLocalStorage struct { + root string + c StorageConfig +} + +func (t *TestingLocalStorage) GetStorage() (StorageConfig, error) { + return t.c, nil +} + +func (t *TestingLocalStorage) SetStorage(f func(*StorageConfig)) error { + f(&t.c) + return nil +} + +func (t *TestingLocalStorage) Stat(path string) (FsStat, error) { + return FsStat{ + Capacity: pathSize, + Available: pathSize, + Used: 0, + }, nil +} + +func (t *TestingLocalStorage) init(subpath string) error { + path := filepath.Join(t.root, subpath) + if err := os.Mkdir(path, 0755); err != nil { + return err + } + + metaFile := filepath.Join(path, MetaFile) + + meta := &LocalStorageMeta{ + ID: ID(uuid.New().String()), + Weight: 1, + CanSeal: true, + CanStore: true, + } + + mb, err := json.MarshalIndent(meta, "", " ") + if err != nil { + return err + } + + if err := ioutil.WriteFile(metaFile, mb, 0644); err != nil { + return err + } + + return nil +} + + +var _ LocalStorage = &TestingLocalStorage{} + +func TestLocalStorage(t *testing.T) { + ctx := context.TODO() + + root, err := ioutil.TempDir("", "sector-storage-teststorage-") + require.NoError(t, err) + + tstor := &TestingLocalStorage{ + root: root, + } + + index := NewIndex() + + st, err := NewLocal(ctx, tstor, index, nil) + require.NoError(t, err) + + p1 := "1" + require.NoError(t, tstor.init("1")) + + err = st.OpenPath(ctx, filepath.Join(tstor.root, p1)) + require.NoError(t, err) + + // TODO: put more things here +} diff --git a/stores/remote.go b/stores/remote.go index 151c0ed2f..e510d71d1 100644 --- a/stores/remote.go +++ b/stores/remote.go @@ -32,6 +32,14 @@ type Remote struct { fetching map[abi.SectorID]chan struct{} } +func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error { + // TODO: do this on remotes too + // (not that we really need to do that since it's always called by the + // worker which pulled the copy) + + return r.local.RemoveCopies(ctx, s, types) +} + func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote { return &Remote{ local: local, @@ -42,7 +50,7 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote { } } -func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { +func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) { if existing|allocate != existing^allocate { return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } @@ -74,7 +82,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi r.fetchLk.Unlock() }() - paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, sealing) + paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op) if err != nil { return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) } @@ -88,7 +96,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi continue } - ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, sealing) + ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op) if err != nil { done() return SectorPaths{}, SectorPaths{}, nil, err @@ -98,21 +106,22 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi SetPathByType(&paths, fileType, ap) SetPathByType(&stores, fileType, string(storageID)) - if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil { + if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType, op == AcquireMove); err != nil { log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) continue } - // TODO: some way to allow having duplicated sectors in the system for perf - if err := r.deleteFromRemote(ctx, url); err != nil { - log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) + if op == AcquireMove { + if err := r.deleteFromRemote(ctx, url); err != nil { + log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) + } } } return paths, stores, done, nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) { +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, func(), error) { si, err := r.index.StorageFindSector(ctx, s, fileType, false) if err != nil { return "", "", "", nil, err @@ -126,7 +135,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi. return si[i].Weight < si[j].Weight }) - apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, sealing) + apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op) if err != nil { return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) } @@ -206,7 +215,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error { func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { // Make sure we have the data local - _, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, false) + _, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove) if err != nil { return xerrors.Errorf("acquire src storage (remote): %w", err) } diff --git a/testworker_test.go b/testworker_test.go index cb9a82a2c..68d70c838 100644 --- a/testworker_test.go +++ b/testworker_test.go @@ -78,7 +78,7 @@ func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) er panic("implement me") } -func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, b bool) error { +func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, b bool, am stores.AcquireMode) error { return nil }