diff --git a/api/api_storage.go b/api/api_storage.go index d8c33f4f7..0f3b07bd5 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -110,6 +110,7 @@ type StorageMiner interface { StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) StorageLocal(ctx context.Context) (map[stores.ID]string, error) + StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error) // WorkerConnect tells the node to connect to workers RPC WorkerConnect(context.Context, string) error diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 0a2c297b7..b6bc3f1b3 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -181,11 +181,12 @@ type StorageMinerStruct struct { SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"` - WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm - WorkerStats func(context.Context) (map[uint64]api.WorkerStats, error) `perm:"admin"` + WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm + WorkerStats func(context.Context) (map[uint64]api.WorkerStats, error) `perm:"admin"` StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"` StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"` + StorageStat func(context.Context, stores.ID) (stores.FsStat, error) `perm:"admin"` StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"` StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"` StorageDropSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"` @@ -688,6 +689,10 @@ func (c *StorageMinerStruct) StorageLocal(ctx context.Context) (map[stores.ID]st return c.Internal.StorageLocal(ctx) } +func (c *StorageMinerStruct) StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error) { + return c.Internal.StorageStat(ctx, id) +} + func (c *StorageMinerStruct) StorageInfo(ctx context.Context, id stores.ID) (stores.StorageInfo, error) { return c.Internal.StorageInfo(ctx, id) } diff --git a/cmd/lotus-storage-miner/storage.go b/cmd/lotus-storage-miner/storage.go index 397894edf..98cc96754 100644 --- a/cmd/lotus-storage-miner/storage.go +++ b/cmd/lotus-storage-miner/storage.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sort" "strconv" + "time" "github.com/google/uuid" "github.com/mitchellh/go-homedir" @@ -18,6 +19,7 @@ import ( "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/storage/sectorstorage/stores" ) @@ -134,9 +136,15 @@ var storageListCmd = &cli.Command{ return err } - sorted := make([]struct{stores.ID; sectors []stores.Decl}, 0, len(st)) + sorted := make([]struct { + stores.ID + sectors []stores.Decl + }, 0, len(st)) for id, decls := range st { - sorted = append(sorted, struct{stores.ID; sectors []stores.Decl}{id, decls}) + sorted = append(sorted, struct { + stores.ID + sectors []stores.Decl + }{id, decls}) } sort.Slice(sorted, func(i, j int) bool { @@ -154,8 +162,20 @@ var storageListCmd = &cli.Command{ } } + pingStart := time.Now() + st, err := nodeApi.StorageStat(ctx, s.ID) + if err != nil { + return err + } + ping := time.Now().Sub(pingStart) + fmt.Printf("%s:\n", s.ID) fmt.Printf("\tUnsealed: %d; Sealed: %d; Caches: %d\n", cnt[0], cnt[1], cnt[2]) + fmt.Printf("\tSpace Used: %s/%s %d%% (%s avail)\n", + types.SizeStr(types.NewInt(st.Capacity-st.Available)), + types.SizeStr(types.NewInt(st.Capacity)), + (st.Capacity-st.Available)*100/st.Capacity, + types.SizeStr(types.NewInt(st.Available))) si, err := nodeApi.StorageInfo(ctx, s.ID) if err != nil { @@ -179,8 +199,13 @@ var storageListCmd = &cli.Command{ if localPath, ok := local[s.ID]; ok { fmt.Printf("\tLocal: %s\n", localPath) } - for _, l := range si.URLs { - fmt.Printf("\tURL: %s\n", l) // TODO; try pinging maybe?? print latency? + for i, l := range si.URLs { + var rtt string + if _, ok := local[s.ID]; !ok && i == 0 { + rtt = " (latency: " + ping.Truncate(time.Microsecond*100).String() + ")" + } + + fmt.Printf("\tURL: %s%s\n", l, rtt) // TODO; try pinging maybe?? print latency? } } diff --git a/cmd/lotus-storage-miner/workers.go b/cmd/lotus-storage-miner/workers.go index 77f3cc929..a29de1c6b 100644 --- a/cmd/lotus-storage-miner/workers.go +++ b/cmd/lotus-storage-miner/workers.go @@ -35,9 +35,15 @@ var workersListCmd = &cli.Command{ return err } - st := make([]struct{id uint64; api.WorkerStats}, 0, len(stats)) + st := make([]struct { + id uint64 + api.WorkerStats + }, 0, len(stats)) for id, stat := range stats { - st = append(st, struct{id uint64; api.WorkerStats}{id, stat}) + st = append(st, struct { + id uint64 + api.WorkerStats + }{id, stat}) } sort.Slice(st, func(i, j int) bool { @@ -66,16 +72,16 @@ var workersListCmd = &cli.Command{ types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)), types.SizeStr(types.NewInt(stat.Info.Resources.MemSwap)), types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved)), - stat.Info.Resources.MemReserved * 100 / stat.Info.Resources.MemPhysical) + stat.Info.Resources.MemReserved*100/stat.Info.Resources.MemPhysical) fmt.Printf("\t\tUsed: Physical %s (%d%% phys), Virtual %s (%d%% phys, %d%% virt)\n", types.SizeStr(types.NewInt(stat.MemUsedMin)), - stat.MemUsedMin * 100 / stat.Info.Resources.MemPhysical, + stat.MemUsedMin*100/stat.Info.Resources.MemPhysical, types.SizeStr(types.NewInt(stat.MemUsedMax)), - stat.MemUsedMax * 100 / stat.Info.Resources.MemPhysical, - stat.MemUsedMax * 100 / (stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap)) + stat.MemUsedMax*100/stat.Info.Resources.MemPhysical, + stat.MemUsedMax*100/(stat.Info.Resources.MemPhysical+stat.Info.Resources.MemSwap)) } return nil }, -} \ No newline at end of file +} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 2c29766a3..24027bba5 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -51,7 +51,6 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { sm.StorageMgr.ServeHTTP(w, r) } - func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uint64]api.WorkerStats, error) { return sm.StorageMgr.WorkerStats(), nil } @@ -142,6 +141,10 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed return out, nil } +func (sm *StorageMinerAPI) StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error) { + return sm.StorageMgr.FsStat(ctx, id) +} + func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error { return sm.Miner.ForceSectorState(ctx, id, state) } diff --git a/storage/sectorstorage/manager.go b/storage/sectorstorage/manager.go index 6a38c3f4d..f3aa90964 100644 --- a/storage/sectorstorage/manager.go +++ b/storage/sectorstorage/manager.go @@ -410,4 +410,8 @@ func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error return out, nil } +func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, error) { + return m.storage.FsStat(ctx, id) +} + var _ SectorManager = &Manager{} diff --git a/storage/sectorstorage/stores/http_handler.go b/storage/sectorstorage/stores/http_handler.go index daa81061e..97c6c34f6 100644 --- a/storage/sectorstorage/stores/http_handler.go +++ b/storage/sectorstorage/stores/http_handler.go @@ -1,6 +1,7 @@ package stores import ( + "encoding/json" "io" "net/http" "os" @@ -23,12 +24,36 @@ type FetchHandler struct { func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/ mux := mux.NewRouter() + mux.HandleFunc("/remote/stat/{id}", handler.remoteStatFs).Methods("GET") mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET") mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE") mux.ServeHTTP(w, r) } +func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request) { + log.Debugf("SERVE STAT %s", r.URL) + vars := mux.Vars(r) + id := ID(vars["id"]) + + st, err := handler.Local.FsStat(id) + switch err { + case errPathNotFound: + w.WriteHeader(404) + return + case nil: + break + default: + w.WriteHeader(500) + log.Errorf("%+v", err) + return + } + + if err := json.NewEncoder(w).Encode(&st); err != nil { + log.Warnf("error writing stat response: %+v", err) + } +} + func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) { log.Infof("SERVE GET %s", r.URL) vars := mux.Vars(r) diff --git a/storage/sectorstorage/stores/index.go b/storage/sectorstorage/stores/index.go index da7c8212e..ccad8ba7f 100644 --- a/storage/sectorstorage/stores/index.go +++ b/storage/sectorstorage/stores/index.go @@ -295,8 +295,8 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.Sec } sort.Slice(candidates, func(i, j int) bool { - iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Free)), big.NewInt(int64(candidates[i].info.Weight))) - jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Free)), big.NewInt(int64(candidates[j].info.Weight))) + iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Available)), big.NewInt(int64(candidates[i].info.Weight))) + jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Available)), big.NewInt(int64(candidates[j].info.Weight))) return iw.GreaterThan(jw) }) diff --git a/storage/sectorstorage/stores/interface.go b/storage/sectorstorage/stores/interface.go index 67c18b16e..149cb9e5f 100644 --- a/storage/sectorstorage/stores/interface.go +++ b/storage/sectorstorage/stores/interface.go @@ -12,11 +12,7 @@ import ( type Store interface { AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error) -} - -type FsStat struct { - Capacity uint64 - Free uint64 // Free to use for sector storage + FsStat(ctx context.Context, id ID) (FsStat, error) } func Stat(path string) (FsStat, error) { @@ -26,7 +22,13 @@ func Stat(path string) (FsStat, error) { } return FsStat{ - Capacity: stat.Blocks * uint64(stat.Bsize), - Free: stat.Bavail * uint64(stat.Bsize), + Capacity: stat.Blocks * uint64(stat.Bsize), + Available: stat.Bavail * uint64(stat.Bsize), }, nil } + +type FsStat struct { + Capacity uint64 + Available uint64 // Available to use for sector storage + Used uint64 +} diff --git a/storage/sectorstorage/stores/local.go b/storage/sectorstorage/stores/local.go index 64924ad00..581afb13f 100644 --- a/storage/sectorstorage/stores/local.go +++ b/storage/sectorstorage/stores/local.go @@ -309,13 +309,15 @@ func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder return nil } +var errPathNotFound = xerrors.Errorf("fsstat: path not found") + func (st *Local) FsStat(id ID) (FsStat, error) { st.localLk.RLock() defer st.localLk.RUnlock() p, ok := st.paths[id] if !ok { - return FsStat{}, xerrors.Errorf("fsstat: path not found") + return FsStat{}, errPathNotFound } return Stat(p.local) diff --git a/storage/sectorstorage/stores/remote.go b/storage/sectorstorage/stores/remote.go index d2ce1626a..a0648d972 100644 --- a/storage/sectorstorage/stores/remote.go +++ b/storage/sectorstorage/stores/remote.go @@ -2,9 +2,13 @@ package stores import ( "context" + "encoding/json" + "io/ioutil" "mime" "net/http" + "net/url" "os" + gopath "path" "sort" "sync" @@ -75,7 +79,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec } // TODO: some way to allow having duplicated sectors in the system for perf - if err := r.deleteFromRemote(url); err != nil { + if err := r.deleteFromRemote(ctx, url); err != nil { log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) } } @@ -103,7 +107,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType var merr error for _, info := range si { for _, url := range info.URLs { - err := r.fetch(url, dest) + err := r.fetch(ctx, url, dest) if err != nil { merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, dest, err)) continue @@ -120,7 +124,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) } -func (r *Remote) fetch(url, outname string) error { +func (r *Remote) fetch(ctx context.Context, url, outname string) error { log.Infof("Fetch %s -> %s", url, outname) req, err := http.NewRequest("GET", url, nil) @@ -128,6 +132,7 @@ func (r *Remote) fetch(url, outname string) error { return xerrors.Errorf("request: %w", err) } req.Header = r.auth + req = req.WithContext(ctx) resp, err := http.DefaultClient.Do(req) if err != nil { @@ -168,7 +173,7 @@ func (r *Remote) fetch(url, outname string) error { } } -func (r *Remote) deleteFromRemote(url string) error { +func (r *Remote) deleteFromRemote(ctx context.Context, url string) error { log.Infof("Delete %s", url) req, err := http.NewRequest("DELETE", url, nil) @@ -176,6 +181,7 @@ func (r *Remote) deleteFromRemote(url string) error { return xerrors.Errorf("request: %w", err) } req.Header = r.auth + req = req.WithContext(ctx) resp, err := http.DefaultClient.Do(req) if err != nil { @@ -190,6 +196,68 @@ func (r *Remote) deleteFromRemote(url string) error { return nil } +func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) { + st, err := r.local.FsStat(id) + switch err { + case nil: + return st, nil + case errPathNotFound: + break + default: + return FsStat{}, xerrors.Errorf("local stat: %w", err) + } + + si, err := r.index.StorageInfo(ctx, id) + if err != nil { + return FsStat{}, xerrors.Errorf("getting remote storage info: %w", err) + } + + if len(si.URLs) == 0 { + return FsStat{}, xerrors.Errorf("no known URLs for remote storage %s", id) + } + + rl, err := url.Parse(si.URLs[0]) + if err != nil { + return FsStat{}, xerrors.Errorf("failed to parse url: %w", err) + } + + rl.Path = gopath.Join(rl.Path, "stat", string(id)) + + req, err := http.NewRequest("GET", rl.String(), nil) + if err != nil { + return FsStat{}, xerrors.Errorf("request: %w", err) + } + req.Header = r.auth + req = req.WithContext(ctx) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return FsStat{}, xerrors.Errorf("do request: %w", err) + } + switch resp.StatusCode { + case 200: + break + case 404: + return FsStat{}, errPathNotFound + case 500: + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return FsStat{}, xerrors.Errorf("fsstat: got http 500, then failed to read the error: %w", err) + } + + return FsStat{}, xerrors.New(string(b)) + } + + var out FsStat + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return FsStat{}, xerrors.Errorf("decoding fsstat: %w", err) + } + + defer resp.Body.Close() + + return out, nil +} + func mergeDone(a func(), b func()) func() { return func() { a()