sectorstorage: Fix remote sector deletion
This commit is contained in:
parent
a3ba8eb0d7
commit
0bb37d1fd6
@ -280,6 +280,10 @@ func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder
|
||||
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
||||
}
|
||||
|
||||
if len(si) == 0 {
|
||||
return xerrors.Errorf("can't delete sector %v(%d), not found", sid, typ)
|
||||
}
|
||||
|
||||
for _, info := range si {
|
||||
p, ok := st.paths[info.ID]
|
||||
if !ok {
|
||||
@ -290,6 +294,10 @@ func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder
|
||||
continue
|
||||
}
|
||||
|
||||
if err := st.index.StorageDropSector(ctx, info.ID, sid, typ); err != nil {
|
||||
return xerrors.Errorf("dropping sector from index: %w", err)
|
||||
}
|
||||
|
||||
spath := filepath.Join(p.local, typ.String(), sectorutil.SectorName(sid))
|
||||
log.Infof("remove %s", spath)
|
||||
|
||||
|
@ -59,7 +59,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
||||
continue
|
||||
}
|
||||
|
||||
ap, storageID, url, foundIn, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
|
||||
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
|
||||
if err != nil {
|
||||
done()
|
||||
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err
|
||||
@ -75,10 +75,6 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
||||
}
|
||||
|
||||
// TODO: some way to allow having duplicated sectors in the system for perf
|
||||
if err := r.index.StorageDropSector(ctx, foundIn, s, fileType); err != nil {
|
||||
log.Warnf("dropping sector %v from %s from sector index failed: %+v", s, storageID, err)
|
||||
}
|
||||
|
||||
if err := r.deleteFromRemote(url); err != nil {
|
||||
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
|
||||
}
|
||||
@ -87,10 +83,10 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
||||
return paths, stores, done, nil
|
||||
}
|
||||
|
||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, string, ID, func(), error) {
|
||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, string, func(), error) {
|
||||
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
|
||||
if err != nil {
|
||||
return "", "", "", "", nil, err
|
||||
return "", "", "", nil, err
|
||||
}
|
||||
|
||||
sort.Slice(si, func(i, j int) bool {
|
||||
@ -99,7 +95,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
||||
|
||||
apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing)
|
||||
if err != nil {
|
||||
return "", "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||
}
|
||||
dest := sectorutil.PathByType(apaths, fileType)
|
||||
storageID := sectorutil.PathByType(ids, fileType)
|
||||
@ -116,12 +112,12 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
||||
if merr != nil {
|
||||
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
|
||||
}
|
||||
return dest, ID(storageID), url, info.ID, done, nil
|
||||
return dest, ID(storageID), url, done, nil
|
||||
}
|
||||
}
|
||||
|
||||
done()
|
||||
return "", "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||
}
|
||||
|
||||
func (r *Remote) fetch(url, outname string) error {
|
||||
|
Loading…
Reference in New Issue
Block a user