diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 059f2e79d..10ddfde10 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -186,6 +186,7 @@ type StorageMinerStruct struct { StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"` StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"` StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"` + StorageDropSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"` StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType, bool) ([]stores.StorageInfo, error) `perm:"admin"` StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"` StorageBestAlloc func(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]stores.StorageInfo, error) `perm:"admin"` @@ -665,6 +666,10 @@ func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId return c.Internal.StorageDeclareSector(ctx, storageId, s, ft) } +func (c *StorageMinerStruct) StorageDropSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { + return c.Internal.StorageDropSector(ctx, storageId, s, ft) +} + func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType, allowFetch bool) ([]stores.StorageInfo, error) { return c.Internal.StorageFindSector(ctx, si, types, allowFetch) } diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 1fc5cbbfa..699a2bb04 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -237,7 +237,7 @@ var runCmd = &cli.Command{ rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi)) mux.Handle("/rpc/v0", rpcServer) - mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Store: localStore}).ServeHTTP) + mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP) mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof ah := &auth.Handler{ diff --git a/storage/sealmgr/advmgr/manager.go b/storage/sealmgr/advmgr/manager.go index 3846baa42..153ae661f 100644 --- a/storage/sealmgr/advmgr/manager.go +++ b/storage/sealmgr/advmgr/manager.go @@ -86,7 +86,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi ls: ls, storage: stor, localStore: lstor, - remoteHnd: &stores.FetchHandler{Store: lstor}, + remoteHnd: &stores.FetchHandler{Local: lstor}, index: si, nextWorker: 0, diff --git a/storage/sealmgr/stores/http_handler.go b/storage/sealmgr/stores/http_handler.go index e50c85e38..f4e519cc6 100644 --- a/storage/sealmgr/stores/http_handler.go +++ b/storage/sealmgr/stores/http_handler.go @@ -17,35 +17,37 @@ import ( var log = logging.Logger("stores") type FetchHandler struct { - Store + *Local } func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/ mux := mux.NewRouter() mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET") - - log.Infof("SERVEGETREMOTE %s", r.URL) + mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE") mux.ServeHTTP(w, r) } func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) { + log.Infof("SERVE GET %s", r.URL) vars := mux.Vars(r) id, err := sectorutil.ParseSectorID(vars["id"]) if err != nil { - log.Error(err) + log.Error("%+v", err) w.WriteHeader(500) return } ft, err := ftFromString(vars["type"]) if err != nil { + log.Error("%+v", err) return } - paths, _, done, err := handler.Store.AcquireSector(r.Context(), id, ft, 0, false) + paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, 0, false) if err != nil { + log.Error("%+v", err) return } defer done() @@ -59,7 +61,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ stat, err := os.Stat(path) if err != nil { - log.Error(err) + log.Error("%+v", err) w.WriteHeader(500) return } @@ -73,14 +75,38 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ w.Header().Set("Content-Type", "application/octet-stream") } if err != nil { - log.Error(err) + log.Error("%+v", err) w.WriteHeader(500) return } w.WriteHeader(200) if _, err := io.Copy(w, rd); err != nil { // TODO: default 32k buf may be too small - log.Error(err) + log.Error("%+v", err) + return + } +} + +func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) { + log.Infof("SERVE DELETE %s", r.URL) + vars := mux.Vars(r) + + id, err := sectorutil.ParseSectorID(vars["id"]) + if err != nil { + log.Error("%+v", err) + w.WriteHeader(500) + return + } + + ft, err := ftFromString(vars["type"]) + if err != nil { + log.Error("%+v", err) + return + } + + if err := handler.delete(r.Context(), id, ft); err != nil { + log.Error("%+v", err) + w.WriteHeader(500) return } } diff --git a/storage/sealmgr/stores/index.go b/storage/sealmgr/stores/index.go index fbacf2987..d77c1b811 100644 --- a/storage/sealmgr/stores/index.go +++ b/storage/sealmgr/stores/index.go @@ -35,7 +35,8 @@ type SectorIndex interface { // part of storage-miner api // TODO: StorageUpdateStats(FsStat) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error - StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) + StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error + StorageFindSector(ctx context.Context, sector abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error) } @@ -137,6 +138,40 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se return nil } +func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { + i.lk.Lock() + defer i.lk.Unlock() + + for _, fileType := range pathTypes { + if fileType&ft == 0 { + continue + } + + d := Decl{s, fileType} + + if len(i.sectors[d]) == 0 { + return nil + } + + rewritten := make([]ID, 0, len(i.sectors[d])-1) + for _, sid := range i.sectors[d] { + if sid == storageId { + continue + } + + rewritten = append(rewritten, sid) + } + if len(rewritten) == 0 { + delete(i.sectors, d) + return nil + } + + i.sectors[d] = rewritten + } + + return nil +} + func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() diff --git a/storage/sealmgr/stores/local.go b/storage/sealmgr/stores/local.go index 2395f840c..304c64ba4 100644 --- a/storage/sealmgr/stores/local.go +++ b/storage/sealmgr/stores/local.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "io/ioutil" + "math/bits" "os" "path/filepath" "sync" @@ -187,6 +188,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s sectorutil.SetPathByType(&storageIDs, fileType, string(info.ID)) existing ^= fileType + break } } @@ -268,6 +270,37 @@ func (st *Local) Local(ctx context.Context) ([]StoragePath, error) { return out, nil } +func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error { + if bits.OnesCount(uint(typ)) != 1 { + return xerrors.New("delete expects one file type") + } + + si, err := st.index.StorageFindSector(ctx, sid, typ, false) + if err != nil { + return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err) + } + + for _, info := range si { + p, ok := st.paths[info.ID] + if !ok { + continue + } + + if p.local == "" { // TODO: can that even be the case? + continue + } + + spath := filepath.Join(p.local, typ.String(), sectorutil.SectorName(sid)) + log.Infof("remove %s", spath) + + if err := os.RemoveAll(spath); err != nil { + log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err) + } + } + + return nil +} + func (st *Local) FsStat(id ID) (FsStat, error) { st.localLk.RLock() defer st.localLk.RUnlock() diff --git a/storage/sealmgr/stores/remote.go b/storage/sealmgr/stores/remote.go index 5866614b0..fca680e96 100644 --- a/storage/sealmgr/stores/remote.go +++ b/storage/sealmgr/stores/remote.go @@ -59,7 +59,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec continue } - ap, storageID, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing) + ap, storageID, url, foundIn, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing) if err != nil { done() return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err @@ -71,16 +71,26 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil { log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) + continue + } + + // TODO: some way to allow having duplicated sectors in the system for perf + if err := r.index.StorageDropSector(ctx, foundIn, s, fileType); err != nil { + log.Warnf("dropping sector %v from %s from sector index failed: %+v", s, storageID, err) + } + + if err := r.deleteFromRemote(url); err != nil { + log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) } } return paths, stores, done, nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, func(), error) { +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, string, ID, func(), error) { si, err := r.index.StorageFindSector(ctx, s, fileType, false) if err != nil { - return "", "", nil, err + return "", "", "", "", nil, err } sort.Slice(si, func(i, j int) bool { @@ -89,7 +99,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing) if err != nil { - return "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) + return "", "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) } dest := sectorutil.PathByType(apaths, fileType) storageID := sectorutil.PathByType(ids, fileType) @@ -106,12 +116,12 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType if merr != nil { log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) } - return dest, ID(storageID), done, nil + return dest, ID(storageID), url, info.ID, done, nil } } done() - return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) + return "", "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) } func (r *Remote) fetch(url, outname string) error { @@ -160,7 +170,28 @@ func (r *Remote) fetch(url, outname string) error { default: return xerrors.Errorf("unknown content type: '%s'", mediatype) } +} +func (r *Remote) deleteFromRemote(url string) error { + log.Infof("Delete %s", url) + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + return xerrors.Errorf("request: %w", err) + } + req.Header = r.auth + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return xerrors.Errorf("do request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 code: %d", resp.StatusCode) + } + + return nil } func mergeDone(a func(), b func()) func() {