Drop AcquireSector locks

This commit is contained in:
Łukasz Magiera 2020-06-04 21:00:16 +02:00
parent b5674f12f0
commit aac3c448a4
7 changed files with 34 additions and 49 deletions

View File

@ -19,13 +19,14 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredProof, se
var bad []abi.SectorID var bad []abi.SectorID
// TODO: More better checks // TODO: More better checks
// TODO: This should live in sector-storage
// TODO: Use proper locking
for _, sector := range sectors { for _, sector := range sectors {
err := func() error { err := func() error {
lp, _, done, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove) lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire sector in checkProvable: %w", err) return xerrors.Errorf("acquire sector in checkProvable: %w", err)
} }
defer done()
if lp.Sealed == "" || lp.Cache == "" { if lp.Sealed == "" || lp.Cache == "" {
log.Warnw("CheckProvable Sector FAULT: cache an/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache) log.Warnw("CheckProvable Sector FAULT: cache an/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache)

View File

@ -60,7 +60,7 @@ type localWorkerPathProvider struct {
} }
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op) paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op)
if err != nil { if err != nil {
return stores.SectorPaths{}, nil, err return stores.SectorPaths{}, nil, err
} }
@ -68,8 +68,6 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
return paths, func() { return paths, func() {
done()
for _, fileType := range pathTypes { for _, fileType := range pathTypes {
if fileType&allocate == 0 { if fileType&allocate == 0 {
continue continue

View File

@ -26,10 +26,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
return stores.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err) return stores.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
} }
p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove) p, _, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove)
return p, func() { return p, cancel, err
cancel()
done()
}, err
} }

View File

@ -69,14 +69,15 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
return return
} }
// The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything // passing 0 spt because we don't allocate anything
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove) paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
if err != nil { if err != nil {
log.Error("%+v", err) log.Error("%+v", err)
w.WriteHeader(500) w.WriteHeader(500)
return return
} }
defer done()
path := PathByType(paths, ft) path := PathByType(paths, ft)
if path == "" { if path == "" {

View File

@ -24,7 +24,7 @@ const (
) )
type Store interface { type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, done func(), err error) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error
// like remove, but doesn't remove the primary sector copy, nor the last // like remove, but doesn't remove the primary sector copy, nor the last

View File

@ -197,12 +197,13 @@ func (st *Local) reportHealth(ctx context.Context) {
} }
} }
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) { func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
} }
st.localLk.RLock() st.localLk.RLock()
defer st.localLk.RUnlock()
var out SectorPaths var out SectorPaths
var storageIDs SectorPaths var storageIDs SectorPaths
@ -245,7 +246,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType) sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType)
if err != nil { if err != nil {
st.localLk.RUnlock() st.localLk.RUnlock()
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err) return SectorPaths{}, SectorPaths{}, xerrors.Errorf("finding best storage for allocating : %w", err)
} }
var best string var best string
@ -277,7 +278,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
if best == "" { if best == "" {
st.localLk.RUnlock() st.localLk.RUnlock()
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector") return SectorPaths{}, SectorPaths{}, xerrors.Errorf("couldn't find a suitable path for a sector")
} }
SetPathByType(&out, fileType, best) SetPathByType(&out, fileType, best)
@ -285,7 +286,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
allocate ^= fileType allocate ^= fileType
} }
return out, storageIDs, st.localLk.RUnlock, nil return out, storageIDs, nil
} }
func (st *Local) Local(ctx context.Context) ([]StoragePath, error) { func (st *Local) Local(ctx context.Context) ([]StoragePath, error) {
@ -399,17 +400,15 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF
} }
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove) dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire dest storage: %w", err) return xerrors.Errorf("acquire dest storage: %w", err)
} }
defer sdone()
src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove) src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire src storage: %w", err) return xerrors.Errorf("acquire src storage: %w", err)
} }
defer ddone()
for _, fileType := range PathTypes { for _, fileType := range PathTypes {
if fileType&types == 0 { if fileType&types == 0 {

View File

@ -50,9 +50,9 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
} }
} }
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) { func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
} }
for { for {
@ -71,7 +71,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
case <-c: case <-c:
continue continue
case <-ctx.Done(): case <-ctx.Done():
return SectorPaths{}, SectorPaths{}, nil, ctx.Err() return SectorPaths{}, SectorPaths{}, ctx.Err()
} }
} }
@ -82,9 +82,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
r.fetchLk.Unlock() r.fetchLk.Unlock()
}() }()
paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op) paths, stores, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op)
if err != nil { if err != nil {
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) return SectorPaths{}, SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
} }
for _, fileType := range PathTypes { for _, fileType := range PathTypes {
@ -96,13 +96,11 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
continue continue
} }
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op) ap, storageID, url, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op)
if err != nil { if err != nil {
done() return SectorPaths{}, SectorPaths{}, err
return SectorPaths{}, SectorPaths{}, nil, err
} }
done = mergeDone(done, rdone)
SetPathByType(&paths, fileType, ap) SetPathByType(&paths, fileType, ap)
SetPathByType(&stores, fileType, string(storageID)) SetPathByType(&stores, fileType, string(storageID))
@ -118,26 +116,26 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
} }
} }
return paths, stores, done, nil return paths, stores, nil
} }
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, func(), error) { func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, false) si, err := r.index.StorageFindSector(ctx, s, fileType, false)
if err != nil { if err != nil {
return "", "", "", nil, err return "", "", "", err
} }
if len(si) == 0 { if len(si) == 0 {
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound) return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound)
} }
sort.Slice(si, func(i, j int) bool { sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight return si[i].Weight < si[j].Weight
}) })
apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op) apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op)
if err != nil { if err != nil {
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) return "", "", "", xerrors.Errorf("allocate local sector for fetching: %w", err)
} }
dest := PathByType(apaths, fileType) dest := PathByType(apaths, fileType)
storageID := PathByType(ids, fileType) storageID := PathByType(ids, fileType)
@ -156,12 +154,11 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.
if merr != nil { if merr != nil {
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
} }
return dest, ID(storageID), url, done, nil return dest, ID(storageID), url, nil
} }
} }
done() return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
} }
func (r *Remote) fetch(ctx context.Context, url, outname string) error { func (r *Remote) fetch(ctx context.Context, url, outname string) error {
@ -215,11 +212,10 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
// Make sure we have the data local // Make sure we have the data local
_, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove) _, _, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire src storage (remote): %w", err) return xerrors.Errorf("acquire src storage (remote): %w", err)
} }
ddone()
return r.local.MoveStorage(ctx, s, spt, types) return r.local.MoveStorage(ctx, s, spt, types)
} }
@ -336,11 +332,4 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
return out, nil return out, nil
} }
func mergeDone(a func(), b func()) func() {
return func() {
a()
b()
}
}
var _ Store = &Remote{} var _ Store = &Remote{}