storageminer: More storage stats in storage list
This commit is contained in:
parent
8d1fab1e05
commit
8559c89560
@ -110,6 +110,7 @@ type StorageMiner interface {
|
|||||||
|
|
||||||
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
|
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
|
||||||
StorageLocal(ctx context.Context) (map[stores.ID]string, error)
|
StorageLocal(ctx context.Context) (map[stores.ID]string, error)
|
||||||
|
StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error)
|
||||||
|
|
||||||
// WorkerConnect tells the node to connect to workers RPC
|
// WorkerConnect tells the node to connect to workers RPC
|
||||||
WorkerConnect(context.Context, string) error
|
WorkerConnect(context.Context, string) error
|
||||||
|
@ -181,11 +181,12 @@ type StorageMinerStruct struct {
|
|||||||
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
|
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
|
||||||
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
|
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
|
||||||
|
|
||||||
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
|
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
|
||||||
WorkerStats func(context.Context) (map[uint64]api.WorkerStats, error) `perm:"admin"`
|
WorkerStats func(context.Context) (map[uint64]api.WorkerStats, error) `perm:"admin"`
|
||||||
|
|
||||||
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
|
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
|
||||||
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
|
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
|
||||||
|
StorageStat func(context.Context, stores.ID) (stores.FsStat, error) `perm:"admin"`
|
||||||
StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"`
|
StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"`
|
||||||
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
|
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
|
||||||
StorageDropSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
|
StorageDropSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
|
||||||
@ -688,6 +689,10 @@ func (c *StorageMinerStruct) StorageLocal(ctx context.Context) (map[stores.ID]st
|
|||||||
return c.Internal.StorageLocal(ctx)
|
return c.Internal.StorageLocal(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *StorageMinerStruct) StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error) {
|
||||||
|
return c.Internal.StorageStat(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) StorageInfo(ctx context.Context, id stores.ID) (stores.StorageInfo, error) {
|
func (c *StorageMinerStruct) StorageInfo(ctx context.Context, id stores.ID) (stores.StorageInfo, error) {
|
||||||
return c.Internal.StorageInfo(ctx, id)
|
return c.Internal.StorageInfo(ctx, id)
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/mitchellh/go-homedir"
|
"github.com/mitchellh/go-homedir"
|
||||||
@ -18,6 +19,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-sectorbuilder"
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
lcli "github.com/filecoin-project/lotus/cli"
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
|
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
|
||||||
)
|
)
|
||||||
@ -134,9 +136,15 @@ var storageListCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
sorted := make([]struct{stores.ID; sectors []stores.Decl}, 0, len(st))
|
sorted := make([]struct {
|
||||||
|
stores.ID
|
||||||
|
sectors []stores.Decl
|
||||||
|
}, 0, len(st))
|
||||||
for id, decls := range st {
|
for id, decls := range st {
|
||||||
sorted = append(sorted, struct{stores.ID; sectors []stores.Decl}{id, decls})
|
sorted = append(sorted, struct {
|
||||||
|
stores.ID
|
||||||
|
sectors []stores.Decl
|
||||||
|
}{id, decls})
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(sorted, func(i, j int) bool {
|
sort.Slice(sorted, func(i, j int) bool {
|
||||||
@ -154,8 +162,20 @@ var storageListCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pingStart := time.Now()
|
||||||
|
st, err := nodeApi.StorageStat(ctx, s.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ping := time.Now().Sub(pingStart)
|
||||||
|
|
||||||
fmt.Printf("%s:\n", s.ID)
|
fmt.Printf("%s:\n", s.ID)
|
||||||
fmt.Printf("\tUnsealed: %d; Sealed: %d; Caches: %d\n", cnt[0], cnt[1], cnt[2])
|
fmt.Printf("\tUnsealed: %d; Sealed: %d; Caches: %d\n", cnt[0], cnt[1], cnt[2])
|
||||||
|
fmt.Printf("\tSpace Used: %s/%s %d%% (%s avail)\n",
|
||||||
|
types.SizeStr(types.NewInt(st.Capacity-st.Available)),
|
||||||
|
types.SizeStr(types.NewInt(st.Capacity)),
|
||||||
|
(st.Capacity-st.Available)*100/st.Capacity,
|
||||||
|
types.SizeStr(types.NewInt(st.Available)))
|
||||||
|
|
||||||
si, err := nodeApi.StorageInfo(ctx, s.ID)
|
si, err := nodeApi.StorageInfo(ctx, s.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -179,8 +199,13 @@ var storageListCmd = &cli.Command{
|
|||||||
if localPath, ok := local[s.ID]; ok {
|
if localPath, ok := local[s.ID]; ok {
|
||||||
fmt.Printf("\tLocal: %s\n", localPath)
|
fmt.Printf("\tLocal: %s\n", localPath)
|
||||||
}
|
}
|
||||||
for _, l := range si.URLs {
|
for i, l := range si.URLs {
|
||||||
fmt.Printf("\tURL: %s\n", l) // TODO; try pinging maybe?? print latency?
|
var rtt string
|
||||||
|
if _, ok := local[s.ID]; !ok && i == 0 {
|
||||||
|
rtt = " (latency: " + ping.Truncate(time.Microsecond*100).String() + ")"
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("\tURL: %s%s\n", l, rtt) // TODO; try pinging maybe?? print latency?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,9 +35,15 @@ var workersListCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
st := make([]struct{id uint64; api.WorkerStats}, 0, len(stats))
|
st := make([]struct {
|
||||||
|
id uint64
|
||||||
|
api.WorkerStats
|
||||||
|
}, 0, len(stats))
|
||||||
for id, stat := range stats {
|
for id, stat := range stats {
|
||||||
st = append(st, struct{id uint64; api.WorkerStats}{id, stat})
|
st = append(st, struct {
|
||||||
|
id uint64
|
||||||
|
api.WorkerStats
|
||||||
|
}{id, stat})
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(st, func(i, j int) bool {
|
sort.Slice(st, func(i, j int) bool {
|
||||||
@ -66,14 +72,14 @@ var workersListCmd = &cli.Command{
|
|||||||
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)),
|
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)),
|
||||||
types.SizeStr(types.NewInt(stat.Info.Resources.MemSwap)),
|
types.SizeStr(types.NewInt(stat.Info.Resources.MemSwap)),
|
||||||
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved)),
|
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved)),
|
||||||
stat.Info.Resources.MemReserved * 100 / stat.Info.Resources.MemPhysical)
|
stat.Info.Resources.MemReserved*100/stat.Info.Resources.MemPhysical)
|
||||||
|
|
||||||
fmt.Printf("\t\tUsed: Physical %s (%d%% phys), Virtual %s (%d%% phys, %d%% virt)\n",
|
fmt.Printf("\t\tUsed: Physical %s (%d%% phys), Virtual %s (%d%% phys, %d%% virt)\n",
|
||||||
types.SizeStr(types.NewInt(stat.MemUsedMin)),
|
types.SizeStr(types.NewInt(stat.MemUsedMin)),
|
||||||
stat.MemUsedMin * 100 / stat.Info.Resources.MemPhysical,
|
stat.MemUsedMin*100/stat.Info.Resources.MemPhysical,
|
||||||
types.SizeStr(types.NewInt(stat.MemUsedMax)),
|
types.SizeStr(types.NewInt(stat.MemUsedMax)),
|
||||||
stat.MemUsedMax * 100 / stat.Info.Resources.MemPhysical,
|
stat.MemUsedMax*100/stat.Info.Resources.MemPhysical,
|
||||||
stat.MemUsedMax * 100 / (stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap))
|
stat.MemUsedMax*100/(stat.Info.Resources.MemPhysical+stat.Info.Resources.MemSwap))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -51,7 +51,6 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
|
|||||||
sm.StorageMgr.ServeHTTP(w, r)
|
sm.StorageMgr.ServeHTTP(w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uint64]api.WorkerStats, error) {
|
func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uint64]api.WorkerStats, error) {
|
||||||
return sm.StorageMgr.WorkerStats(), nil
|
return sm.StorageMgr.WorkerStats(), nil
|
||||||
}
|
}
|
||||||
@ -142,6 +141,10 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sm *StorageMinerAPI) StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error) {
|
||||||
|
return sm.StorageMgr.FsStat(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
|
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
|
||||||
return sm.Miner.ForceSectorState(ctx, id, state)
|
return sm.Miner.ForceSectorState(ctx, id, state)
|
||||||
}
|
}
|
||||||
|
@ -410,4 +410,8 @@ func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, error) {
|
||||||
|
return m.storage.FsStat(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
var _ SectorManager = &Manager{}
|
var _ SectorManager = &Manager{}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package stores
|
package stores
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
@ -23,12 +24,36 @@ type FetchHandler struct {
|
|||||||
func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/
|
func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/
|
||||||
mux := mux.NewRouter()
|
mux := mux.NewRouter()
|
||||||
|
|
||||||
|
mux.HandleFunc("/remote/stat/{id}", handler.remoteStatFs).Methods("GET")
|
||||||
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
|
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
|
||||||
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
|
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
|
||||||
|
|
||||||
mux.ServeHTTP(w, r)
|
mux.ServeHTTP(w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Debugf("SERVE STAT %s", r.URL)
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
id := ID(vars["id"])
|
||||||
|
|
||||||
|
st, err := handler.Local.FsStat(id)
|
||||||
|
switch err {
|
||||||
|
case errPathNotFound:
|
||||||
|
w.WriteHeader(404)
|
||||||
|
return
|
||||||
|
case nil:
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
w.WriteHeader(500)
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.NewEncoder(w).Encode(&st); err != nil {
|
||||||
|
log.Warnf("error writing stat response: %+v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
|
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
|
||||||
log.Infof("SERVE GET %s", r.URL)
|
log.Infof("SERVE GET %s", r.URL)
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
|
@ -295,8 +295,8 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.Sec
|
|||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(candidates, func(i, j int) bool {
|
sort.Slice(candidates, func(i, j int) bool {
|
||||||
iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Free)), big.NewInt(int64(candidates[i].info.Weight)))
|
iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Available)), big.NewInt(int64(candidates[i].info.Weight)))
|
||||||
jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Free)), big.NewInt(int64(candidates[j].info.Weight)))
|
jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Available)), big.NewInt(int64(candidates[j].info.Weight)))
|
||||||
|
|
||||||
return iw.GreaterThan(jw)
|
return iw.GreaterThan(jw)
|
||||||
})
|
})
|
||||||
|
@ -12,11 +12,7 @@ import (
|
|||||||
|
|
||||||
type Store interface {
|
type Store interface {
|
||||||
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
|
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
|
||||||
}
|
FsStat(ctx context.Context, id ID) (FsStat, error)
|
||||||
|
|
||||||
type FsStat struct {
|
|
||||||
Capacity uint64
|
|
||||||
Free uint64 // Free to use for sector storage
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Stat(path string) (FsStat, error) {
|
func Stat(path string) (FsStat, error) {
|
||||||
@ -26,7 +22,13 @@ func Stat(path string) (FsStat, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return FsStat{
|
return FsStat{
|
||||||
Capacity: stat.Blocks * uint64(stat.Bsize),
|
Capacity: stat.Blocks * uint64(stat.Bsize),
|
||||||
Free: stat.Bavail * uint64(stat.Bsize),
|
Available: stat.Bavail * uint64(stat.Bsize),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type FsStat struct {
|
||||||
|
Capacity uint64
|
||||||
|
Available uint64 // Available to use for sector storage
|
||||||
|
Used uint64
|
||||||
|
}
|
||||||
|
@ -309,13 +309,15 @@ func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errPathNotFound = xerrors.Errorf("fsstat: path not found")
|
||||||
|
|
||||||
func (st *Local) FsStat(id ID) (FsStat, error) {
|
func (st *Local) FsStat(id ID) (FsStat, error) {
|
||||||
st.localLk.RLock()
|
st.localLk.RLock()
|
||||||
defer st.localLk.RUnlock()
|
defer st.localLk.RUnlock()
|
||||||
|
|
||||||
p, ok := st.paths[id]
|
p, ok := st.paths[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return FsStat{}, xerrors.Errorf("fsstat: path not found")
|
return FsStat{}, errPathNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return Stat(p.local)
|
return Stat(p.local)
|
||||||
|
@ -2,9 +2,13 @@ package stores
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
"mime"
|
"mime"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
gopath "path"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -75,7 +79,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: some way to allow having duplicated sectors in the system for perf
|
// TODO: some way to allow having duplicated sectors in the system for perf
|
||||||
if err := r.deleteFromRemote(url); err != nil {
|
if err := r.deleteFromRemote(ctx, url); err != nil {
|
||||||
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
|
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -103,7 +107,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
var merr error
|
var merr error
|
||||||
for _, info := range si {
|
for _, info := range si {
|
||||||
for _, url := range info.URLs {
|
for _, url := range info.URLs {
|
||||||
err := r.fetch(url, dest)
|
err := r.fetch(ctx, url, dest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, dest, err))
|
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, dest, err))
|
||||||
continue
|
continue
|
||||||
@ -120,7 +124,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) fetch(url, outname string) error {
|
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
||||||
log.Infof("Fetch %s -> %s", url, outname)
|
log.Infof("Fetch %s -> %s", url, outname)
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
@ -128,6 +132,7 @@ func (r *Remote) fetch(url, outname string) error {
|
|||||||
return xerrors.Errorf("request: %w", err)
|
return xerrors.Errorf("request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header = r.auth
|
req.Header = r.auth
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
resp, err := http.DefaultClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -168,7 +173,7 @@ func (r *Remote) fetch(url, outname string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) deleteFromRemote(url string) error {
|
func (r *Remote) deleteFromRemote(ctx context.Context, url string) error {
|
||||||
log.Infof("Delete %s", url)
|
log.Infof("Delete %s", url)
|
||||||
|
|
||||||
req, err := http.NewRequest("DELETE", url, nil)
|
req, err := http.NewRequest("DELETE", url, nil)
|
||||||
@ -176,6 +181,7 @@ func (r *Remote) deleteFromRemote(url string) error {
|
|||||||
return xerrors.Errorf("request: %w", err)
|
return xerrors.Errorf("request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header = r.auth
|
req.Header = r.auth
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
resp, err := http.DefaultClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -190,6 +196,68 @@ func (r *Remote) deleteFromRemote(url string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
||||||
|
st, err := r.local.FsStat(id)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
return st, nil
|
||||||
|
case errPathNotFound:
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
return FsStat{}, xerrors.Errorf("local stat: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
si, err := r.index.StorageInfo(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return FsStat{}, xerrors.Errorf("getting remote storage info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(si.URLs) == 0 {
|
||||||
|
return FsStat{}, xerrors.Errorf("no known URLs for remote storage %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
rl, err := url.Parse(si.URLs[0])
|
||||||
|
if err != nil {
|
||||||
|
return FsStat{}, xerrors.Errorf("failed to parse url: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rl.Path = gopath.Join(rl.Path, "stat", string(id))
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", rl.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return FsStat{}, xerrors.Errorf("request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header = r.auth
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return FsStat{}, xerrors.Errorf("do request: %w", err)
|
||||||
|
}
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case 200:
|
||||||
|
break
|
||||||
|
case 404:
|
||||||
|
return FsStat{}, errPathNotFound
|
||||||
|
case 500:
|
||||||
|
b, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return FsStat{}, xerrors.Errorf("fsstat: got http 500, then failed to read the error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return FsStat{}, xerrors.New(string(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
var out FsStat
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return FsStat{}, xerrors.Errorf("decoding fsstat: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
func mergeDone(a func(), b func()) func() {
|
func mergeDone(a func(), b func()) func() {
|
||||||
return func() {
|
return func() {
|
||||||
a()
|
a()
|
||||||
|
Loading…
Reference in New Issue
Block a user