diff --git a/faults.go b/faults.go index 1f5475259..75143d55e 100644 --- a/faults.go +++ b/faults.go @@ -19,13 +19,14 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredProof, se var bad []abi.SectorID // TODO: More better checks + // TODO: This should live in sector-storage + // TODO: Use proper locking for _, sector := range sectors { err := func() error { - lp, _, done, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove) + lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove) if err != nil { return xerrors.Errorf("acquire sector in checkProvable: %w", err) } - defer done() if lp.Sealed == "" || lp.Cache == "" { log.Warnw("CheckProvable Sector FAULT: cache an/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache) diff --git a/localworker.go b/localworker.go index 52a1c7cb0..7e2e9ca26 100644 --- a/localworker.go +++ b/localworker.go @@ -60,7 +60,7 @@ type localWorkerPathProvider struct { } func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { - paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op) + paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op) if err != nil { return stores.SectorPaths{}, nil, err } @@ -68,8 +68,6 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi. log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) return paths, func() { - done() - for _, fileType := range pathTypes { if fileType&allocate == 0 { continue diff --git a/roprov.go b/roprov.go index 0e8950478..aa94521a3 100644 --- a/roprov.go +++ b/roprov.go @@ -26,10 +26,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e return stores.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err) } - p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove) + p, _, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove) - return p, func() { - cancel() - done() - }, err + return p, cancel, err } diff --git a/stores/http_handler.go b/stores/http_handler.go index 7e2330dbd..60f8a41c5 100644 --- a/stores/http_handler.go +++ b/stores/http_handler.go @@ -69,14 +69,15 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ return } + // The caller has a lock on this sector already, no need to get one here + // passing 0 spt because we don't allocate anything - paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove) + paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove) if err != nil { log.Error("%+v", err) w.WriteHeader(500) return } - defer done() path := PathByType(paths, ft) if path == "" { diff --git a/stores/interface.go b/stores/interface.go index 01ac2bffe..c400019aa 100644 --- a/stores/interface.go +++ b/stores/interface.go @@ -24,7 +24,7 @@ const ( ) type Store interface { - AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, done func(), err error) + AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, err error) Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error // like remove, but doesn't remove the primary sector copy, nor the last diff --git a/stores/local.go b/stores/local.go index 9c0dc4477..bf11a3418 100644 --- a/stores/local.go +++ b/stores/local.go @@ -197,12 +197,13 @@ func (st *Local) reportHealth(ctx context.Context) { } } -func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) { +func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) { if existing|allocate != existing^allocate { - return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") + return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector") } st.localLk.RLock() + defer st.localLk.RUnlock() var out SectorPaths var storageIDs SectorPaths @@ -245,7 +246,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType) if err != nil { st.localLk.RUnlock() - return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err) + return SectorPaths{}, SectorPaths{}, xerrors.Errorf("finding best storage for allocating : %w", err) } var best string @@ -277,7 +278,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re if best == "" { st.localLk.RUnlock() - return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector") + return SectorPaths{}, SectorPaths{}, xerrors.Errorf("couldn't find a suitable path for a sector") } SetPathByType(&out, fileType, best) @@ -285,7 +286,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re allocate ^= fileType } - return out, storageIDs, st.localLk.RUnlock, nil + return out, storageIDs, nil } func (st *Local) Local(ctx context.Context) ([]StoragePath, error) { @@ -399,17 +400,15 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF } func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { - dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove) + dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove) if err != nil { return xerrors.Errorf("acquire dest storage: %w", err) } - defer sdone() - src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove) + src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove) if err != nil { return xerrors.Errorf("acquire src storage: %w", err) } - defer ddone() for _, fileType := range PathTypes { if fileType&types == 0 { diff --git a/stores/remote.go b/stores/remote.go index e510d71d1..be1ebf1d2 100644 --- a/stores/remote.go +++ b/stores/remote.go @@ -50,9 +50,9 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote { } } -func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) { +func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) { if existing|allocate != existing^allocate { - return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") + return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector") } for { @@ -71,7 +71,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi case <-c: continue case <-ctx.Done(): - return SectorPaths{}, SectorPaths{}, nil, ctx.Err() + return SectorPaths{}, SectorPaths{}, ctx.Err() } } @@ -82,9 +82,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi r.fetchLk.Unlock() }() - paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op) + paths, stores, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op) if err != nil { - return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) + return SectorPaths{}, SectorPaths{}, xerrors.Errorf("local acquire error: %w", err) } for _, fileType := range PathTypes { @@ -96,13 +96,11 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi continue } - ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op) + ap, storageID, url, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op) if err != nil { - done() - return SectorPaths{}, SectorPaths{}, nil, err + return SectorPaths{}, SectorPaths{}, err } - done = mergeDone(done, rdone) SetPathByType(&paths, fileType, ap) SetPathByType(&stores, fileType, string(storageID)) @@ -118,26 +116,26 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi } } - return paths, stores, done, nil + return paths, stores, nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, func(), error) { +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, error) { si, err := r.index.StorageFindSector(ctx, s, fileType, false) if err != nil { - return "", "", "", nil, err + return "", "", "", err } if len(si) == 0 { - return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound) + return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound) } sort.Slice(si, func(i, j int) bool { return si[i].Weight < si[j].Weight }) - apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op) + apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op) if err != nil { - return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) + return "", "", "", xerrors.Errorf("allocate local sector for fetching: %w", err) } dest := PathByType(apaths, fileType) storageID := PathByType(ids, fileType) @@ -156,12 +154,11 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi. if merr != nil { log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) } - return dest, ID(storageID), url, done, nil + return dest, ID(storageID), url, nil } } - done() - return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) + return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) } func (r *Remote) fetch(ctx context.Context, url, outname string) error { @@ -215,11 +212,10 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error { func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { // Make sure we have the data local - _, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove) + _, _, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove) if err != nil { return xerrors.Errorf("acquire src storage (remote): %w", err) } - ddone() return r.local.MoveStorage(ctx, s, spt, types) } @@ -336,11 +332,4 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) { return out, nil } -func mergeDone(a func(), b func()) func() { - return func() { - a() - b() - } -} - var _ Store = &Remote{}