sectorstorage: Remove unsealed sector in FinalizeSector
This commit is contained in:
parent
2900bb099f
commit
cd892105f6
@ -36,7 +36,7 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request
|
|||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
id := ID(vars["id"])
|
id := ID(vars["id"])
|
||||||
|
|
||||||
st, err := handler.Local.FsStat(id)
|
st, err := handler.Local.FsStat(r.Context(), id)
|
||||||
switch err {
|
switch err {
|
||||||
case errPathNotFound:
|
case errPathNotFound:
|
||||||
w.WriteHeader(404)
|
w.WriteHeader(404)
|
||||||
@ -129,7 +129,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := handler.delete(r.Context(), id, ft); err != nil {
|
if err := handler.Remove(r.Context(), id, ft); err != nil {
|
||||||
log.Error("%+v", err)
|
log.Error("%+v", err)
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
|
|
||||||
type Store interface {
|
type Store interface {
|
||||||
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
|
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
|
||||||
|
Remove(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error
|
||||||
FsStat(ctx context.Context, id ID) (FsStat, error)
|
FsStat(ctx context.Context, id ID) (FsStat, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,7 +270,7 @@ func (st *Local) Local(ctx context.Context) ([]StoragePath, error) {
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
|
func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
|
||||||
if bits.OnesCount(uint(typ)) != 1 {
|
if bits.OnesCount(uint(typ)) != 1 {
|
||||||
return xerrors.New("delete expects one file type")
|
return xerrors.New("delete expects one file type")
|
||||||
}
|
}
|
||||||
@ -311,7 +311,7 @@ func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder
|
|||||||
|
|
||||||
var errPathNotFound = xerrors.Errorf("fsstat: path not found")
|
var errPathNotFound = xerrors.Errorf("fsstat: path not found")
|
||||||
|
|
||||||
func (st *Local) FsStat(id ID) (FsStat, error) {
|
func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
||||||
st.localLk.RLock()
|
st.localLk.RLock()
|
||||||
defer st.localLk.RUnlock()
|
defer st.localLk.RUnlock()
|
||||||
|
|
||||||
@ -322,3 +322,5 @@ func (st *Local) FsStat(id ID) (FsStat, error) {
|
|||||||
|
|
||||||
return Stat(p.local)
|
return Stat(p.local)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ Store = &Local{}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"math/bits"
|
||||||
"mime"
|
"mime"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
@ -173,6 +174,33 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
|
||||||
|
if bits.OnesCount(uint(typ)) != 1 {
|
||||||
|
return xerrors.New("delete expects one file type")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.local.Remove(ctx, sid, typ); err != nil {
|
||||||
|
return xerrors.Errorf("remove from local: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
si, err := r.index.StorageFindSector(ctx, sid, typ, false)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, info := range si {
|
||||||
|
for _, url := range info.URLs {
|
||||||
|
if err := r.deleteFromRemote(ctx, url); err != nil {
|
||||||
|
log.Warnf("remove %s: %+v", url, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Remote) deleteFromRemote(ctx context.Context, url string) error {
|
func (r *Remote) deleteFromRemote(ctx context.Context, url string) error {
|
||||||
log.Infof("Delete %s", url)
|
log.Infof("Delete %s", url)
|
||||||
|
|
||||||
@ -197,7 +225,7 @@ func (r *Remote) deleteFromRemote(ctx context.Context, url string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
||||||
st, err := r.local.FsStat(id)
|
st, err := r.local.FsStat(ctx, id)
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
return st, nil
|
return st, nil
|
||||||
|
@ -152,7 +152,15 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return sb.FinalizeSector(ctx, sector)
|
if err := sb.FinalizeSector(ctx, sector); err != nil {
|
||||||
|
return xerrors.Errorf("finalizing sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := l.storage.Remove(ctx, sector, sectorbuilder.FTUnsealed); err != nil {
|
||||||
|
return xerrors.Errorf("removing unsealed data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
|
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user