storageminer: More storage stats in storage list
This commit is contained in:
parent
6842b4cb46
commit
290b7ebd26
@ -410,4 +410,8 @@ func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, error) {
|
||||
return m.storage.FsStat(ctx, id)
|
||||
}
|
||||
|
||||
var _ SectorManager = &Manager{}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package stores
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -23,12 +24,36 @@ type FetchHandler struct {
|
||||
func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/
|
||||
mux := mux.NewRouter()
|
||||
|
||||
mux.HandleFunc("/remote/stat/{id}", handler.remoteStatFs).Methods("GET")
|
||||
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
|
||||
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
|
||||
|
||||
mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debugf("SERVE STAT %s", r.URL)
|
||||
vars := mux.Vars(r)
|
||||
id := ID(vars["id"])
|
||||
|
||||
st, err := handler.Local.FsStat(id)
|
||||
switch err {
|
||||
case errPathNotFound:
|
||||
w.WriteHeader(404)
|
||||
return
|
||||
case nil:
|
||||
break
|
||||
default:
|
||||
w.WriteHeader(500)
|
||||
log.Errorf("%+v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&st); err != nil {
|
||||
log.Warnf("error writing stat response: %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
|
||||
log.Infof("SERVE GET %s", r.URL)
|
||||
vars := mux.Vars(r)
|
||||
|
@ -295,8 +295,8 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.Sec
|
||||
}
|
||||
|
||||
sort.Slice(candidates, func(i, j int) bool {
|
||||
iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Free)), big.NewInt(int64(candidates[i].info.Weight)))
|
||||
jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Free)), big.NewInt(int64(candidates[j].info.Weight)))
|
||||
iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Available)), big.NewInt(int64(candidates[i].info.Weight)))
|
||||
jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Available)), big.NewInt(int64(candidates[j].info.Weight)))
|
||||
|
||||
return iw.GreaterThan(jw)
|
||||
})
|
||||
|
@ -12,11 +12,7 @@ import (
|
||||
|
||||
type Store interface {
|
||||
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
|
||||
}
|
||||
|
||||
type FsStat struct {
|
||||
Capacity uint64
|
||||
Free uint64 // Free to use for sector storage
|
||||
FsStat(ctx context.Context, id ID) (FsStat, error)
|
||||
}
|
||||
|
||||
func Stat(path string) (FsStat, error) {
|
||||
@ -26,7 +22,13 @@ func Stat(path string) (FsStat, error) {
|
||||
}
|
||||
|
||||
return FsStat{
|
||||
Capacity: stat.Blocks * uint64(stat.Bsize),
|
||||
Free: stat.Bavail * uint64(stat.Bsize),
|
||||
Capacity: stat.Blocks * uint64(stat.Bsize),
|
||||
Available: stat.Bavail * uint64(stat.Bsize),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type FsStat struct {
|
||||
Capacity uint64
|
||||
Available uint64 // Available to use for sector storage
|
||||
Used uint64
|
||||
}
|
||||
|
@ -309,13 +309,15 @@ func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder
|
||||
return nil
|
||||
}
|
||||
|
||||
var errPathNotFound = xerrors.Errorf("fsstat: path not found")
|
||||
|
||||
func (st *Local) FsStat(id ID) (FsStat, error) {
|
||||
st.localLk.RLock()
|
||||
defer st.localLk.RUnlock()
|
||||
|
||||
p, ok := st.paths[id]
|
||||
if !ok {
|
||||
return FsStat{}, xerrors.Errorf("fsstat: path not found")
|
||||
return FsStat{}, errPathNotFound
|
||||
}
|
||||
|
||||
return Stat(p.local)
|
||||
|
@ -2,9 +2,13 @@ package stores
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"mime"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
gopath "path"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
@ -75,7 +79,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
||||
}
|
||||
|
||||
// TODO: some way to allow having duplicated sectors in the system for perf
|
||||
if err := r.deleteFromRemote(url); err != nil {
|
||||
if err := r.deleteFromRemote(ctx, url); err != nil {
|
||||
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
|
||||
}
|
||||
}
|
||||
@ -103,7 +107,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
||||
var merr error
|
||||
for _, info := range si {
|
||||
for _, url := range info.URLs {
|
||||
err := r.fetch(url, dest)
|
||||
err := r.fetch(ctx, url, dest)
|
||||
if err != nil {
|
||||
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, dest, err))
|
||||
continue
|
||||
@ -120,7 +124,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
||||
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||
}
|
||||
|
||||
func (r *Remote) fetch(url, outname string) error {
|
||||
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
||||
log.Infof("Fetch %s -> %s", url, outname)
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
@ -128,6 +132,7 @@ func (r *Remote) fetch(url, outname string) error {
|
||||
return xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
req.Header = r.auth
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
@ -168,7 +173,7 @@ func (r *Remote) fetch(url, outname string) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Remote) deleteFromRemote(url string) error {
|
||||
func (r *Remote) deleteFromRemote(ctx context.Context, url string) error {
|
||||
log.Infof("Delete %s", url)
|
||||
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
@ -176,6 +181,7 @@ func (r *Remote) deleteFromRemote(url string) error {
|
||||
return xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
req.Header = r.auth
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
@ -190,6 +196,68 @@ func (r *Remote) deleteFromRemote(url string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
||||
st, err := r.local.FsStat(id)
|
||||
switch err {
|
||||
case nil:
|
||||
return st, nil
|
||||
case errPathNotFound:
|
||||
break
|
||||
default:
|
||||
return FsStat{}, xerrors.Errorf("local stat: %w", err)
|
||||
}
|
||||
|
||||
si, err := r.index.StorageInfo(ctx, id)
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("getting remote storage info: %w", err)
|
||||
}
|
||||
|
||||
if len(si.URLs) == 0 {
|
||||
return FsStat{}, xerrors.Errorf("no known URLs for remote storage %s", id)
|
||||
}
|
||||
|
||||
rl, err := url.Parse(si.URLs[0])
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("failed to parse url: %w", err)
|
||||
}
|
||||
|
||||
rl.Path = gopath.Join(rl.Path, "stat", string(id))
|
||||
|
||||
req, err := http.NewRequest("GET", rl.String(), nil)
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
req.Header = r.auth
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("do request: %w", err)
|
||||
}
|
||||
switch resp.StatusCode {
|
||||
case 200:
|
||||
break
|
||||
case 404:
|
||||
return FsStat{}, errPathNotFound
|
||||
case 500:
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("fsstat: got http 500, then failed to read the error: %w", err)
|
||||
}
|
||||
|
||||
return FsStat{}, xerrors.New(string(b))
|
||||
}
|
||||
|
||||
var out FsStat
|
||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||
return FsStat{}, xerrors.Errorf("decoding fsstat: %w", err)
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func mergeDone(a func(), b func()) func() {
|
||||
return func() {
|
||||
a()
|
||||
|
Loading…
Reference in New Issue
Block a user