Wire up unsealing logic, track primary sector copies

This commit is contained in:
Łukasz Magiera 2020-05-20 18:36:46 +02:00
parent 8f70192bf3
commit 33673a30c7
14 changed files with 322 additions and 165 deletions

View File

@ -315,84 +315,6 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se
return nil
}
func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealedCID cid.Cid) (io.ReadCloser, error) {
{
path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
if err != nil {
return nil, xerrors.Errorf("acquire unsealed sector path: %w", err)
}
f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644)
if err == nil {
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
doneUnsealed()
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: closerFunc(func() error {
doneUnsealed()
return f.Close()
}),
}, nil
}
doneUnsealed()
if !os.IsNotExist(err) {
return nil, err
}
}
paths, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed, false)
if err != nil {
return nil, xerrors.Errorf("acquire sealed/cache sector path: %w", err)
}
defer doneSealed()
// TODO: GC for those
// (Probably configurable count of sectors to be kept unsealed, and just
// remove last used one (or use whatever other cache policy makes sense))
err = ffi.Unseal(
sb.sealProofType,
paths.Cache,
paths.Sealed,
paths.Unsealed,
sector.Number,
sector.Miner,
ticket,
unsealedCID,
)
if err != nil {
return nil, xerrors.Errorf("unseal failed: %w", err)
}
f, err := os.OpenFile(paths.Unsealed, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
}
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, true)
if err != nil {

View File

@ -29,7 +29,8 @@ type Storage interface {
storage.Prover
StorageSealer
ReadPieceFromSealedSector(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
UnsealPiece(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) error
}
type Verifier interface {

View File

@ -56,10 +56,11 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local,
type localWorkerPathProvider struct {
w *LocalWorker
op stores.AcquireMode
}
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing)
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op)
if err != nil {
return stores.SectorPaths{}, nil, err
}
@ -76,7 +77,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
sid := stores.PathByType(storageIDs, fileType)
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil {
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType, l.op == stores.AcquireMove); err != nil {
log.Errorf("declare sector error: %+v", err)
}
}
@ -105,8 +106,8 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
return sb.AddPiece(ctx, sector, epcs, sz, r)
}
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool) error {
_, done, err := (&localWorkerPathProvider{w: l}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing)
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool, am stores.AcquireMode) error {
_, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing)
if err != nil {
return err
}
@ -182,12 +183,30 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
return nil
}
func (l *LocalWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
panic("implement me")
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
sb, err := l.sb()
if err != nil {
return err
}
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
panic("implement me")
if err := sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil {
return xerrors.Errorf("unsealing sector: %w", err)
}
if err := l.storage.RemoveCopies(ctx, sector, stores.FTSealed | stores.FTCache); err != nil {
return xerrors.Errorf("removing source data: %w", err)
}
return nil
}
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
sb, err := l.sb()
if err != nil {
return err
}
return sb.ReadPiece(ctx, writer, sector, index, size)
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {

View File

@ -29,7 +29,7 @@ type URLs []string
type Worker interface {
ffiwrapper.StorageSealer
Fetch(context.Context, abi.SectorID, stores.SectorFileType, bool) error
Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) error
UnsealPiece(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize) error
@ -183,9 +183,9 @@ func schedNop(context.Context, Worker) error {
return nil
}
func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool) func(context.Context, Worker) error {
func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) func(context.Context, Worker) error {
return func(ctx context.Context, worker Worker) error {
return worker.Fetch(ctx, sector, ft, sealing)
return worker.Fetch(ctx, sector, ft, sealing, am)
}
}
@ -207,10 +207,18 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
// TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed
// TODO!!!! make schedFetch COPY stores.FTSealed and stores.FTCache
// Moving those to a temp sealing storage may make PoSts fail
unsealFetch := func(ctx context.Context, worker Worker) error {
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, true, stores.AcquireCopy); err != nil {
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
}
err = m.sched.Schedule(ctx, sealtasks.TTUnseal, selector, schedFetch(sector, stores.FTUnsealed|stores.FTSealed|stores.FTCache, true), func(ctx context.Context, w Worker) error {
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, true, stores.AcquireMove); err != nil {
return xerrors.Errorf("copy unsealed sector data: %w", err)
}
return nil
}
err = m.sched.Schedule(ctx, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
return w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)
})
if err != nil {
@ -222,7 +230,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return xerrors.Errorf("creating readPiece selector: %w", err)
}
err = m.sched.Schedule(ctx, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
return w.ReadPiece(ctx, sink, sector, offset, size)
})
if err != nil {
@ -270,7 +278,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return nil, xerrors.Errorf("creating path selector: %w", err)
}
err = m.sched.Schedule(ctx, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
if err != nil {
return err
@ -288,7 +296,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
}
err = m.sched.Schedule(ctx, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit2(ctx, sector, phase1Out)
if err != nil {
return err
@ -309,7 +317,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
// (except, don't.. for now at least - we are using this step to bring data
// into 'provable' storage. Optimally we'd do that in commit2, in parallel
// with snark compute)
err = m.sched.Schedule(ctx, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
if err != nil {
return err
@ -342,7 +350,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
}
return m.sched.Schedule(ctx, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false),
schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false, stores.AcquireMove),
func(ctx context.Context, w Worker) error {
return w.FinalizeSector(ctx, sector)
})

View File

@ -65,6 +65,10 @@ func (t *testStorage) SetStorage(f func(*stores.StorageConfig)) error {
return nil
}
func (t *testStorage) Stat(path string) (stores.FsStat, error) {
return stores.Stat(path)
}
var _ stores.LocalStorage = &testStorage{}
func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *stores.Remote, *stores.Index) {

View File

@ -20,7 +20,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
}
p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing)
p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove)
return p, done, err
}

View File

@ -12,7 +12,7 @@ import (
)
type existingSelector struct {
best []stores.StorageInfo
best []stores.SectorStorageInfo
}
func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) (*existingSelector, error) {

View File

@ -70,7 +70,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
}
// passing 0 spt because we don't allocate anything
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false)
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
if err != nil {
log.Error("%+v", err)
w.WriteHeader(500)

View File

@ -38,16 +38,27 @@ type HealthReport struct {
Err error
}
type SectorStorageInfo struct {
ID ID
URLs []string // TODO: Support non-http transports
Weight uint64
CanSeal bool
CanStore bool
Primary bool
}
type SectorIndex interface { // part of storage-miner api
StorageAttach(context.Context, StorageInfo, FsStat) error
StorageInfo(context.Context, ID) (StorageInfo, error)
StorageReportHealth(context.Context, ID, HealthReport) error
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error)
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error)
}
type Decl struct {
@ -55,6 +66,11 @@ type Decl struct {
SectorFileType
}
type declMeta struct {
storage ID
primary bool
}
type storageEntry struct {
info *StorageInfo
fsi FsStat
@ -66,13 +82,13 @@ type storageEntry struct {
type Index struct {
lk sync.RWMutex
sectors map[Decl][]ID
sectors map[Decl][]*declMeta
stores map[ID]*storageEntry
}
func NewIndex() *Index {
return &Index{
sectors: map[Decl][]ID{},
sectors: map[Decl][]*declMeta{},
stores: map[ID]*storageEntry{},
}
}
@ -88,7 +104,7 @@ func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
}
for decl, ids := range i.sectors {
for _, id := range ids {
byID[id][decl.SectorID] |= decl.SectorFileType
byID[id.storage][decl.SectorID] |= decl.SectorFileType
}
}
@ -157,10 +173,11 @@ func (i *Index) StorageReportHealth(ctx context.Context, id ID, report HealthRep
return nil
}
func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error {
func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error {
i.lk.Lock()
defer i.lk.Unlock()
loop:
for _, fileType := range PathTypes {
if fileType&ft == 0 {
continue
@ -169,13 +186,20 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
d := Decl{s, fileType}
for _, sid := range i.sectors[d] {
if sid == storageId {
if sid.storage == storageId {
if !sid.primary && primary {
sid.primary = true
} else {
log.Warnf("sector %v redeclared in %s", s, storageId)
return nil
}
continue loop
}
}
i.sectors[d] = append(i.sectors[d], storageId)
i.sectors[d] = append(i.sectors[d], &declMeta{
storage: storageId,
primary: primary,
})
}
return nil
@ -196,9 +220,9 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto
return nil
}
rewritten := make([]ID, 0, len(i.sectors[d])-1)
rewritten := make([]*declMeta, 0, len(i.sectors[d])-1)
for _, sid := range i.sectors[d] {
if sid == storageId {
if sid.storage == storageId {
continue
}
@ -215,11 +239,12 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto
return nil
}
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) {
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()
storageIDs := map[ID]uint64{}
isprimary := map[ID]bool{}
for _, pathType := range PathTypes {
if ft&pathType == 0 {
@ -227,11 +252,12 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
}
for _, id := range i.sectors[Decl{s, pathType}] {
storageIDs[id]++
storageIDs[id.storage]++
isprimary[id.storage] = isprimary[id.storage] || id.primary
}
}
out := make([]StorageInfo, 0, len(storageIDs))
out := make([]SectorStorageInfo, 0, len(storageIDs))
for id, n := range storageIDs {
st, ok := i.stores[id]
@ -251,12 +277,15 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
urls[k] = rl.String()
}
out = append(out, StorageInfo{
out = append(out, SectorStorageInfo{
ID: id,
URLs: urls,
Weight: st.info.Weight * n, // storage with more sector types is better
CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore,
Primary: isprimary[id],
})
}
@ -277,12 +306,15 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
urls[k] = rl.String()
}
out = append(out, StorageInfo{
out = append(out, SectorStorageInfo{
ID: id,
URLs: urls,
Weight: st.info.Weight * 0, // TODO: something better than just '0'
CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore,
Primary: false,
})
}
}
@ -302,7 +334,7 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
return *si.info, nil
}
func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) {
func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()
@ -314,10 +346,10 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
}
for _, p := range i.stores {
if sealing && !p.info.CanSeal {
if (pathType == PathSealing) && !p.info.CanSeal {
continue
}
if !sealing && !p.info.CanStore {
if (pathType == PathStorage) && !p.info.CanStore {
continue
}
@ -362,10 +394,19 @@ func (i *Index) FindSector(id abi.SectorID, typ SectorFileType) ([]ID, error) {
i.lk.RLock()
defer i.lk.RUnlock()
return i.sectors[Decl{
f, ok := i.sectors[Decl{
SectorID: id,
SectorFileType: typ,
}], nil
}]
if !ok {
return nil, nil
}
out := make([]ID, 0, len(f))
for _, meta := range f {
out = append(out, meta.storage)
}
return out, nil
}
var _ SectorIndex = &Index{}

View File

@ -9,10 +9,26 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi"
)
type PathType bool
const (
PathStorage = false
PathSealing = true
)
type AcquireMode string
const (
AcquireMove = "move"
AcquireCopy = "copy"
)
type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error)
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, done func(), err error)
Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error
// like remove, but doesn't remove the primary sector copy, nor the last
// non-primary copy if there no primary copies
RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error
// move sectors into storage
MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error

View File

@ -47,6 +47,8 @@ type LocalPath struct {
type LocalStorage interface {
GetStorage() (StorageConfig, error)
SetStorage(func(*StorageConfig)) error
Stat(path string) (FsStat, error)
}
const MetaFile = "sectorstore.json"
@ -98,7 +100,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
local: p,
}
fst, err := Stat(p)
fst, err := st.localStorage.Stat(p)
if err != nil {
return err
}
@ -133,7 +135,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
}
if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t); err != nil {
if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t, meta.CanStore); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, meta.ID, err)
}
}
@ -177,7 +179,7 @@ func (st *Local) reportHealth(ctx context.Context) {
toReport := map[ID]HealthReport{}
for id, p := range st.paths {
stat, err := Stat(p.local)
stat, err := st.localStorage.Stat(p.local)
toReport[id] = HealthReport{
Stat: stat,
@ -195,7 +197,7 @@ func (st *Local) reportHealth(ctx context.Context) {
}
}
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
@ -240,7 +242,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
continue
}
sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, sealing)
sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType)
if err != nil {
st.localLk.RUnlock()
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err)
@ -259,11 +261,11 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
continue
}
if sealing && !si.CanSeal {
if (pathType == PathSealing) && !si.CanSeal {
continue
}
if !sealing && !si.CanStore {
if (pathType == PathStorage) && !si.CanStore {
continue
}
@ -328,16 +330,61 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
}
for _, info := range si {
p, ok := st.paths[info.ID]
if !ok {
if err := st.removeSector(ctx, sid, typ, info.ID); err != nil {
return err
}
}
return nil
}
func (st *Local) RemoveCopies(ctx context.Context, sid abi.SectorID, typ SectorFileType) error {
if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("delete expects one file type")
}
si, err := st.index.StorageFindSector(ctx, sid, typ, false)
if err != nil {
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
}
var hasPrimary bool
for _, info := range si {
if info.Primary {
hasPrimary = true
break
}
}
if !hasPrimary {
log.Warnf("RemoveCopies: no primary copies of sector %v (%s), not removing anything", sid, typ)
return nil
}
for _, info := range si {
if info.Primary {
continue
}
if err := st.removeSector(ctx, sid, typ, info.ID); err != nil {
return err
}
}
return nil
}
func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorFileType, storage ID) error {
p, ok := st.paths[storage]
if !ok {
return nil
}
if p.local == "" { // TODO: can that even be the case?
continue
return nil
}
if err := st.index.StorageDropSector(ctx, info.ID, sid, typ); err != nil {
if err := st.index.StorageDropSector(ctx, storage, sid, typ); err != nil {
return xerrors.Errorf("dropping sector from index: %w", err)
}
@ -347,19 +394,18 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
if err := os.RemoveAll(spath); err != nil {
log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err)
}
}
return nil
}
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false)
dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove)
if err != nil {
return xerrors.Errorf("acquire dest storage: %w", err)
}
defer sdone()
src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false)
src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove)
if err != nil {
return xerrors.Errorf("acquire src storage: %w", err)
}
@ -401,7 +447,7 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.Regist
return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err)
}
if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType); err != nil {
if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType, true); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(PathByType(destIds, fileType)), err)
}
}
@ -420,7 +466,7 @@ func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
return FsStat{}, errPathNotFound
}
return Stat(p.local)
return st.localStorage.Stat(p.local)
}
var _ Store = &Local{}

91
stores/local_test.go Normal file
View File

@ -0,0 +1,91 @@
package stores
import (
"context"
"encoding/json"
"github.com/google/uuid"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
)
const pathSize = 16 << 20
type TestingLocalStorage struct {
root string
c StorageConfig
}
func (t *TestingLocalStorage) GetStorage() (StorageConfig, error) {
return t.c, nil
}
func (t *TestingLocalStorage) SetStorage(f func(*StorageConfig)) error {
f(&t.c)
return nil
}
func (t *TestingLocalStorage) Stat(path string) (FsStat, error) {
return FsStat{
Capacity: pathSize,
Available: pathSize,
Used: 0,
}, nil
}
func (t *TestingLocalStorage) init(subpath string) error {
path := filepath.Join(t.root, subpath)
if err := os.Mkdir(path, 0755); err != nil {
return err
}
metaFile := filepath.Join(path, MetaFile)
meta := &LocalStorageMeta{
ID: ID(uuid.New().String()),
Weight: 1,
CanSeal: true,
CanStore: true,
}
mb, err := json.MarshalIndent(meta, "", " ")
if err != nil {
return err
}
if err := ioutil.WriteFile(metaFile, mb, 0644); err != nil {
return err
}
return nil
}
var _ LocalStorage = &TestingLocalStorage{}
func TestLocalStorage(t *testing.T) {
ctx := context.TODO()
root, err := ioutil.TempDir("", "sector-storage-teststorage-")
require.NoError(t, err)
tstor := &TestingLocalStorage{
root: root,
}
index := NewIndex()
st, err := NewLocal(ctx, tstor, index, nil)
require.NoError(t, err)
p1 := "1"
require.NoError(t, tstor.init("1"))
err = st.OpenPath(ctx, filepath.Join(tstor.root, p1))
require.NoError(t, err)
// TODO: put more things here
}

View File

@ -32,6 +32,14 @@ type Remote struct {
fetching map[abi.SectorID]chan struct{}
}
func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error {
// TODO: do this on remotes too
// (not that we really need to do that since it's always called by the
// worker which pulled the copy)
return r.local.RemoveCopies(ctx, s, types)
}
func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
return &Remote{
local: local,
@ -42,7 +50,7 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
}
}
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
@ -74,7 +82,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
r.fetchLk.Unlock()
}()
paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, sealing)
paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op)
if err != nil {
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
}
@ -88,7 +96,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
continue
}
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, sealing)
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op)
if err != nil {
done()
return SectorPaths{}, SectorPaths{}, nil, err
@ -98,21 +106,22 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
SetPathByType(&paths, fileType, ap)
SetPathByType(&stores, fileType, string(storageID))
if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil {
if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType, op == AcquireMove); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
continue
}
// TODO: some way to allow having duplicated sectors in the system for perf
if op == AcquireMove {
if err := r.deleteFromRemote(ctx, url); err != nil {
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
}
}
}
return paths, stores, done, nil
}
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) {
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, func(), error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
if err != nil {
return "", "", "", nil, err
@ -126,7 +135,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.
return si[i].Weight < si[j].Weight
})
apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, sealing)
apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op)
if err != nil {
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
}
@ -206,7 +215,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
// Make sure we have the data local
_, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, false)
_, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove)
if err != nil {
return xerrors.Errorf("acquire src storage (remote): %w", err)
}

View File

@ -78,7 +78,7 @@ func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) er
panic("implement me")
}
func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, b bool) error {
func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, b bool, am stores.AcquireMode) error {
return nil
}