workers: Drop sectors from origin after transfers
This commit is contained in:
parent
709fd034b4
commit
971fe6fdfd
@ -186,6 +186,7 @@ type StorageMinerStruct struct {
|
||||
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
|
||||
StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"`
|
||||
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
|
||||
StorageDropSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
|
||||
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType, bool) ([]stores.StorageInfo, error) `perm:"admin"`
|
||||
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
|
||||
StorageBestAlloc func(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]stores.StorageInfo, error) `perm:"admin"`
|
||||
@ -665,6 +666,10 @@ func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId
|
||||
return c.Internal.StorageDeclareSector(ctx, storageId, s, ft)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) StorageDropSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
|
||||
return c.Internal.StorageDropSector(ctx, storageId, s, ft)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType, allowFetch bool) ([]stores.StorageInfo, error) {
|
||||
return c.Internal.StorageFindSector(ctx, si, types, allowFetch)
|
||||
}
|
||||
|
@ -237,7 +237,7 @@ var runCmd = &cli.Command{
|
||||
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
|
||||
|
||||
mux.Handle("/rpc/v0", rpcServer)
|
||||
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Store: localStore}).ServeHTTP)
|
||||
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
|
||||
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
|
||||
|
||||
ah := &auth.Handler{
|
||||
|
@ -86,7 +86,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
|
||||
ls: ls,
|
||||
storage: stor,
|
||||
localStore: lstor,
|
||||
remoteHnd: &stores.FetchHandler{Store: lstor},
|
||||
remoteHnd: &stores.FetchHandler{Local: lstor},
|
||||
index: si,
|
||||
|
||||
nextWorker: 0,
|
||||
|
@ -17,35 +17,37 @@ import (
|
||||
var log = logging.Logger("stores")
|
||||
|
||||
type FetchHandler struct {
|
||||
Store
|
||||
*Local
|
||||
}
|
||||
|
||||
func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/
|
||||
mux := mux.NewRouter()
|
||||
|
||||
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
|
||||
|
||||
log.Infof("SERVEGETREMOTE %s", r.URL)
|
||||
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
|
||||
|
||||
mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
|
||||
log.Infof("SERVE GET %s", r.URL)
|
||||
vars := mux.Vars(r)
|
||||
|
||||
id, err := sectorutil.ParseSectorID(vars["id"])
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
log.Error("%+v", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
ft, err := ftFromString(vars["type"])
|
||||
if err != nil {
|
||||
log.Error("%+v", err)
|
||||
return
|
||||
}
|
||||
paths, _, done, err := handler.Store.AcquireSector(r.Context(), id, ft, 0, false)
|
||||
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, 0, false)
|
||||
if err != nil {
|
||||
log.Error("%+v", err)
|
||||
return
|
||||
}
|
||||
defer done()
|
||||
@ -59,7 +61,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
|
||||
|
||||
stat, err := os.Stat(path)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
log.Error("%+v", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
@ -73,14 +75,38 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
}
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
log.Error("%+v", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(200)
|
||||
if _, err := io.Copy(w, rd); err != nil { // TODO: default 32k buf may be too small
|
||||
log.Error(err)
|
||||
log.Error("%+v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) {
|
||||
log.Infof("SERVE DELETE %s", r.URL)
|
||||
vars := mux.Vars(r)
|
||||
|
||||
id, err := sectorutil.ParseSectorID(vars["id"])
|
||||
if err != nil {
|
||||
log.Error("%+v", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
ft, err := ftFromString(vars["type"])
|
||||
if err != nil {
|
||||
log.Error("%+v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := handler.delete(r.Context(), id, ft); err != nil {
|
||||
log.Error("%+v", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,8 @@ type SectorIndex interface { // part of storage-miner api
|
||||
// TODO: StorageUpdateStats(FsStat)
|
||||
|
||||
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
|
||||
StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error)
|
||||
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
|
||||
StorageFindSector(ctx context.Context, sector abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error)
|
||||
|
||||
StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error)
|
||||
}
|
||||
@ -137,6 +138,40 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
|
||||
i.lk.Lock()
|
||||
defer i.lk.Unlock()
|
||||
|
||||
for _, fileType := range pathTypes {
|
||||
if fileType&ft == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
d := Decl{s, fileType}
|
||||
|
||||
if len(i.sectors[d]) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
rewritten := make([]ID, 0, len(i.sectors[d])-1)
|
||||
for _, sid := range i.sectors[d] {
|
||||
if sid == storageId {
|
||||
continue
|
||||
}
|
||||
|
||||
rewritten = append(rewritten, sid)
|
||||
}
|
||||
if len(rewritten) == 0 {
|
||||
delete(i.sectors, d)
|
||||
return nil
|
||||
}
|
||||
|
||||
i.sectors[d] = rewritten
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) {
|
||||
i.lk.RLock()
|
||||
defer i.lk.RUnlock()
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"math/bits"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
@ -187,6 +188,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
|
||||
sectorutil.SetPathByType(&storageIDs, fileType, string(info.ID))
|
||||
|
||||
existing ^= fileType
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@ -268,6 +270,37 @@ func (st *Local) Local(ctx context.Context) ([]StoragePath, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
|
||||
if bits.OnesCount(uint(typ)) != 1 {
|
||||
return xerrors.New("delete expects one file type")
|
||||
}
|
||||
|
||||
si, err := st.index.StorageFindSector(ctx, sid, typ, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
||||
}
|
||||
|
||||
for _, info := range si {
|
||||
p, ok := st.paths[info.ID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if p.local == "" { // TODO: can that even be the case?
|
||||
continue
|
||||
}
|
||||
|
||||
spath := filepath.Join(p.local, typ.String(), sectorutil.SectorName(sid))
|
||||
log.Infof("remove %s", spath)
|
||||
|
||||
if err := os.RemoveAll(spath); err != nil {
|
||||
log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *Local) FsStat(id ID) (FsStat, error) {
|
||||
st.localLk.RLock()
|
||||
defer st.localLk.RUnlock()
|
||||
|
@ -59,7 +59,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
||||
continue
|
||||
}
|
||||
|
||||
ap, storageID, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
|
||||
ap, storageID, url, foundIn, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
|
||||
if err != nil {
|
||||
done()
|
||||
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err
|
||||
@ -71,16 +71,26 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
||||
|
||||
if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil {
|
||||
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: some way to allow having duplicated sectors in the system for perf
|
||||
if err := r.index.StorageDropSector(ctx, foundIn, s, fileType); err != nil {
|
||||
log.Warnf("dropping sector %v from %s from sector index failed: %+v", s, storageID, err)
|
||||
}
|
||||
|
||||
if err := r.deleteFromRemote(url); err != nil {
|
||||
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
|
||||
}
|
||||
}
|
||||
|
||||
return paths, stores, done, nil
|
||||
}
|
||||
|
||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, func(), error) {
|
||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, string, ID, func(), error) {
|
||||
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
return "", "", "", "", nil, err
|
||||
}
|
||||
|
||||
sort.Slice(si, func(i, j int) bool {
|
||||
@ -89,7 +99,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
||||
|
||||
apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing)
|
||||
if err != nil {
|
||||
return "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||
return "", "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||
}
|
||||
dest := sectorutil.PathByType(apaths, fileType)
|
||||
storageID := sectorutil.PathByType(ids, fileType)
|
||||
@ -106,12 +116,12 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
||||
if merr != nil {
|
||||
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
|
||||
}
|
||||
return dest, ID(storageID), done, nil
|
||||
return dest, ID(storageID), url, info.ID, done, nil
|
||||
}
|
||||
}
|
||||
|
||||
done()
|
||||
return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||
return "", "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||
}
|
||||
|
||||
func (r *Remote) fetch(url, outname string) error {
|
||||
@ -160,7 +170,28 @@ func (r *Remote) fetch(url, outname string) error {
|
||||
default:
|
||||
return xerrors.Errorf("unknown content type: '%s'", mediatype)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Remote) deleteFromRemote(url string) error {
|
||||
log.Infof("Delete %s", url)
|
||||
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
req.Header = r.auth
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("do request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func mergeDone(a func(), b func()) func() {
|
||||
|
Loading…
Reference in New Issue
Block a user